From 41c6a3737ec2b328f78a5f516e8d8ac954e4e34d Mon Sep 17 00:00:00 2001
From: Mark Haines
Date: Fri, 3 Feb 2017 13:52:32 +0000
Subject: [PATCH] A kafkaesque room event consumer for the roomserver. (#1)

* A kafkaesque room event consumer for the roomserver.

Implement the main input loop for the roomserver. It will receive events
from a kafkaesque event source and track where it is in the stream.

It currently does nothing with the events it consumes.
---
 .gitignore                                  |  28 +++++
 hooks/install.sh                            |   5 +
 hooks/pre-commit                            |   9 ++
 .../dendrite/roomserver/api/input.go        |  35 ++++++
 .../dendrite/roomserver/input/consumer.go   | 103 ++++++++++++++++++
 .../roomserver/roomserver/roomserver.go     |  44 ++++++++
 .../dendrite/roomserver/storage/sql.go      |  70 ++++++++++++
 .../dendrite/roomserver/storage/storage.go  |  37 +++++++
 .../dendrite/roomserver/types/types.go      |  10 ++
 9 files changed, 341 insertions(+)
 create mode 100644 .gitignore
 create mode 100755 hooks/install.sh
 create mode 100755 hooks/pre-commit
 create mode 100644 src/github.com/matrix-org/dendrite/roomserver/api/input.go
 create mode 100644 src/github.com/matrix-org/dendrite/roomserver/input/consumer.go
 create mode 100644 src/github.com/matrix-org/dendrite/roomserver/roomserver/roomserver.go
 create mode 100644 src/github.com/matrix-org/dendrite/roomserver/storage/sql.go
 create mode 100644 src/github.com/matrix-org/dendrite/roomserver/storage/storage.go
 create mode 100644 src/github.com/matrix-org/dendrite/roomserver/types/types.go

diff --git a/.gitignore b/.gitignore
new file mode 100644
index 00000000..a11dc931
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,28 @@
+.*.swp
+
+# Compiled Object files, Static and Dynamic libs (Shared Objects)
+*.o
+*.a
+*.so
+
+# Folders
+bin
+pkg
+_obj
+_test
+
+# Architecture specific extensions/prefixes
+*.[568vq]
+[568vq].out
+
+*.cgo1.go
+*.cgo2.c
+_cgo_defun.c
+_cgo_gotypes.go
+_cgo_export.*
+
+_testmain.go
+
+*.exe
+*.test
+*.prof
diff --git a/hooks/install.sh b/hooks/install.sh
new file mode 100755
index 00000000..f8aa331f
--- /dev/null
+++ b/hooks/install.sh
@@ -0,0 +1,5 @@
+#! /bin/bash
+
+DOT_GIT="$(dirname $0)/../.git"
+
+ln -s "../../hooks/pre-commit" "$DOT_GIT/hooks/pre-commit"
\ No newline at end of file
diff --git a/hooks/pre-commit b/hooks/pre-commit
new file mode 100755
index 00000000..d9ffbfba
--- /dev/null
+++ b/hooks/pre-commit
@@ -0,0 +1,9 @@
+#! /bin/bash
+
+set -eu
+
+golint src/...
+go fmt ./src/...
+go tool vet --shadow ./src
+gocyclo -over 12 src/
+gb test
diff --git a/src/github.com/matrix-org/dendrite/roomserver/api/input.go b/src/github.com/matrix-org/dendrite/roomserver/api/input.go
new file mode 100644
index 00000000..366541af
--- /dev/null
+++ b/src/github.com/matrix-org/dendrite/roomserver/api/input.go
@@ -0,0 +1,35 @@
+// Package api provides the types that are used to communicate with the roomserver.
+package api
+
+const (
+	// KindOutlier events fall outside the contiguous event graph.
+	// We do not have the state for these events.
+	// These events are state events used to authenticate other events.
+	// They can become part of the contiguous event graph via backfill.
+	KindOutlier = 1
+	// KindJoin events start a new contiguous event graph. The event must be a
+	// m.room.member event joining this server to the room. This must come with
+	// the state at the event. If the event is contiguous with the existing
+	// graph for the room then it is treated as a normal new event.
+	KindJoin = 2
+	// KindNew events extend the contiguous graph going forwards.
+	// They usually don't need state, but may include state if
+	// there was a new event that references an event that we don't
+	// have a copy of.
+	KindNew = 3
+	// KindBackfill events extend the contiguous graph going backwards.
+	// They always have state.
+	KindBackfill = 4
+)
+
+// InputRoomEvent is a matrix room event to add to the room server database.
+type InputRoomEvent struct {
+	// Whether this event is new, backfilled or an outlier.
+	// This controls how the event is processed.
+	Kind int
+	// The event JSON for the event to add.
+	Event []byte
+	// Optional list of state event IDs forming the state before this event.
+	// These state events must have already been persisted.
+	State []string
+}
diff --git a/src/github.com/matrix-org/dendrite/roomserver/input/consumer.go b/src/github.com/matrix-org/dendrite/roomserver/input/consumer.go
new file mode 100644
index 00000000..6d2783f7
--- /dev/null
+++ b/src/github.com/matrix-org/dendrite/roomserver/input/consumer.go
@@ -0,0 +1,103 @@
+// Package input contains the code that writes new room events from the kafkaesque stream to the database.
+package input
+
+import (
+	"github.com/matrix-org/dendrite/roomserver/types"
+	sarama "gopkg.in/Shopify/sarama.v1"
+)
+
+// A ConsumerDatabase has the storage APIs needed by the consumer.
+type ConsumerDatabase interface {
+	// PartitionOffsets returns the offsets the consumer has reached for each partition.
+	PartitionOffsets(topic string) ([]types.PartitionOffset, error)
+	// SetPartitionOffset records where the consumer has reached for a partition.
+	SetPartitionOffset(topic string, partition int32, offset int64) error
+}
+
+// An ErrorLogger handles the errors encountered by the consumer.
+type ErrorLogger interface {
+	OnError(message *sarama.ConsumerMessage, err error)
+}
+
+// A Consumer consumes a kafkaesque stream of room events.
+// The room events are supplied as api.InputRoomEvent structs serialised as JSON.
+// The events should be valid matrix events.
+// The events needed to authenticate the event should already be stored on the roomserver.
+// The events needed to construct the state at the event should already be stored on the roomserver.
+// If the event is not valid then it will be discarded and an error will be logged.
+type Consumer struct {
+	// A kafkaesque stream consumer providing the APIs for talking to the event source.
+	// The interface is taken from a client library for Apache Kafka.
+	// But any equivalent event streaming protocol could be made to implement the same interface.
+	Consumer sarama.Consumer
+	// The database used to store the room events.
+	DB ConsumerDatabase
+	// The kafkaesque topic to consume room events from.
+	// This is the name used in kafka to identify the stream to consume events from.
+	RoomEventTopic string
+	// The ErrorLogger for this consumer.
+	// If left as nil then the consumer will panic when it encounters an error.
+	ErrorLogger ErrorLogger
+}
+
+// Start starts the consumer consuming.
+// Starts up a goroutine for each partition in the kafka stream.
+// Returns nil once all the goroutines are started.
+// Returns an error if it can't start consuming for any of the partitions.
+func (c *Consumer) Start() error {
+	offsets := map[int32]int64{}
+
+	partitions, err := c.Consumer.Partitions(c.RoomEventTopic)
+	if err != nil {
+		return err
+	}
+	for _, partition := range partitions {
+		// Default all the offsets to the beginning of the stream.
+		offsets[partition] = sarama.OffsetOldest
+	}
+
+	storedOffsets, err := c.DB.PartitionOffsets(c.RoomEventTopic)
+	if err != nil {
+		return err
+	}
+	for _, offset := range storedOffsets {
+		// We've already processed events from this partition so advance the offset to where we got to.
+		offsets[offset.Partition] = offset.Offset
+	}
+
+	var partitionConsumers []sarama.PartitionConsumer
+	for partition, offset := range offsets {
+		pc, err := c.Consumer.ConsumePartition(c.RoomEventTopic, partition, offset)
+		if err != nil {
+			for _, p := range partitionConsumers {
+				p.Close()
+			}
+			return err
+		}
+		partitionConsumers = append(partitionConsumers, pc)
+	}
+	for _, pc := range partitionConsumers {
+		go c.consumePartition(pc)
+	}
+
+	return nil
+}
+
+// consumePartition consumes the room events for a single partition of the kafkaesque stream.
+func (c *Consumer) consumePartition(pc sarama.PartitionConsumer) {
+	defer pc.Close()
+	for message := range pc.Messages() {
+		// TODO: Do stuff with message.
+		if err := c.DB.SetPartitionOffset(c.RoomEventTopic, message.Partition, message.Offset); err != nil {
+			c.logError(message, err)
+		}
+	}
+}
+
+// logError is a convenience method for logging errors.
+func (c *Consumer) logError(message *sarama.ConsumerMessage, err error) {
+	if c.ErrorLogger == nil {
+		panic(err)
+	}
+	c.ErrorLogger.OnError(message, err)
+}
diff --git a/src/github.com/matrix-org/dendrite/roomserver/roomserver/roomserver.go b/src/github.com/matrix-org/dendrite/roomserver/roomserver/roomserver.go
new file mode 100644
index 00000000..0205ff00
--- /dev/null
+++ b/src/github.com/matrix-org/dendrite/roomserver/roomserver/roomserver.go
@@ -0,0 +1,44 @@
+package main
+
+import (
+	"fmt"
+	"github.com/matrix-org/dendrite/roomserver/input"
+	"github.com/matrix-org/dendrite/roomserver/storage"
+	sarama "gopkg.in/Shopify/sarama.v1"
+	"os"
+	"strings"
+)
+
+var (
+	database       = os.Getenv("DATABASE")
+	kafkaURIs      = strings.Split(os.Getenv("KAFKA_URIS"), ",")
+	roomEventTopic = os.Getenv("TOPIC_ROOM_EVENT")
+)
+
+func main() {
+	db, err := storage.Open(database)
+	if err != nil {
+		panic(err)
+	}
+
+	kafkaConsumer, err := sarama.NewConsumer(kafkaURIs, nil)
+	if err != nil {
+		panic(err)
+	}
+
+	consumer := input.Consumer{
+		Consumer:       kafkaConsumer,
+		DB:             db,
+		RoomEventTopic: roomEventTopic,
+	}
+
+	if err = consumer.Start(); err != nil {
+		panic(err)
+	}
+
+	fmt.Println("Started roomserver")
+
+	// Wait forever.
+	// TODO: Implement clean shutdown.
+	select {}
+}
diff --git a/src/github.com/matrix-org/dendrite/roomserver/storage/sql.go b/src/github.com/matrix-org/dendrite/roomserver/storage/sql.go
new file mode 100644
index 00000000..3e988f21
--- /dev/null
+++ b/src/github.com/matrix-org/dendrite/roomserver/storage/sql.go
@@ -0,0 +1,70 @@
+package storage
+
+import (
+	"database/sql"
+	"github.com/matrix-org/dendrite/roomserver/types"
+)
+
+type statements struct {
+	selectPartitionOffsetsStmt *sql.Stmt
+	upsertPartitionOffsetStmt  *sql.Stmt
+}
+
+func (s *statements) prepare(db *sql.DB) error {
+	var err error
+
+	_, err = db.Exec(partitionOffsetsSchema)
+	if err != nil {
+		return err
+	}
+
+	if s.selectPartitionOffsetsStmt, err = db.Prepare(selectPartitionOffsetsSQL); err != nil {
+		return err
+	}
+	if s.upsertPartitionOffsetStmt, err = db.Prepare(upsertPartitionOffsetsSQL); err != nil {
+		return err
+	}
+	return nil
+}
+
+const partitionOffsetsSchema = `
+-- The offsets that the server has processed up to.
+CREATE TABLE IF NOT EXISTS partition_offsets (
+    -- The name of the topic.
+    topic TEXT NOT NULL,
+    -- The 32-bit partition ID.
+    partition INTEGER NOT NULL,
+    -- The 64-bit offset.
+    partition_offset BIGINT NOT NULL,
+    CONSTRAINT topic_partition_unique UNIQUE (topic, partition)
+);
+`

+const selectPartitionOffsetsSQL = "" +
+	"SELECT partition, partition_offset FROM partition_offsets WHERE topic = $1"
+
+const upsertPartitionOffsetsSQL = "" +
+	"INSERT INTO partition_offsets (topic, partition, partition_offset) VALUES ($1, $2, $3)" +
+	" ON CONFLICT ON CONSTRAINT topic_partition_unique" +
+	" DO UPDATE SET partition_offset = $3"
+
+func (s *statements) selectPartitionOffsets(topic string) ([]types.PartitionOffset, error) {
+	rows, err := s.selectPartitionOffsetsStmt.Query(topic)
+	if err != nil {
+		return nil, err
+	}
+	defer rows.Close()
+	var results []types.PartitionOffset
+	for rows.Next() {
+		var offset types.PartitionOffset
+		if err := rows.Scan(&offset.Partition, &offset.Offset); err != nil {
+			return nil, err
+		}
+		results = append(results, offset)
+	}
+	return results, nil
+}
+
+func (s *statements) upsertPartitionOffset(topic string, partition int32, offset int64) error {
+	_, err := s.upsertPartitionOffsetStmt.Exec(topic, partition, offset)
+	return err
+}
diff --git a/src/github.com/matrix-org/dendrite/roomserver/storage/storage.go b/src/github.com/matrix-org/dendrite/roomserver/storage/storage.go
new file mode 100644
index 00000000..2b162a81
--- /dev/null
+++ b/src/github.com/matrix-org/dendrite/roomserver/storage/storage.go
@@ -0,0 +1,37 @@
+package storage
+
+import (
+	"database/sql"
+	// Import the postgres database driver.
+	_ "github.com/lib/pq"
+	"github.com/matrix-org/dendrite/roomserver/types"
+)
+
+// A Database is used to store room events and stream offsets.
+type Database struct {
+	statements statements
+	db         *sql.DB
+}
+
+// Open a postgres database.
+func Open(dataSourceName string) (*Database, error) {
+	var d Database
+	var err error
+	if d.db, err = sql.Open("postgres", dataSourceName); err != nil {
+		return nil, err
+	}
+	if err = d.statements.prepare(d.db); err != nil {
+		return nil, err
+	}
+	return &d, nil
+}
+
+// PartitionOffsets implements input.ConsumerDatabase
+func (d *Database) PartitionOffsets(topic string) ([]types.PartitionOffset, error) {
+	return d.statements.selectPartitionOffsets(topic)
+}
+
+// SetPartitionOffset implements input.ConsumerDatabase
+func (d *Database) SetPartitionOffset(topic string, partition int32, offset int64) error {
+	return d.statements.upsertPartitionOffset(topic, partition, offset)
+}
diff --git a/src/github.com/matrix-org/dendrite/roomserver/types/types.go b/src/github.com/matrix-org/dendrite/roomserver/types/types.go
new file mode 100644
index 00000000..3c6dd5bb
--- /dev/null
+++ b/src/github.com/matrix-org/dendrite/roomserver/types/types.go
@@ -0,0 +1,10 @@
+// Package types provides the types that are used internally within the roomserver.
+package types
+
+// A PartitionOffset is the offset into a partition of the input log.
+type PartitionOffset struct {
+	// The ID of the partition.
+	Partition int32
+	// The offset into the partition.
+	Offset int64
+}
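
A note on the wire format: the consumer expects each message on the room event topic to be an api.InputRoomEvent serialised as JSON. The sketch below shows one way a producer could feed it, assuming a sarama SyncProducer; the broker address, topic name and event payload are placeholders and are not part of this patch.

package main

import (
	"encoding/json"

	"github.com/matrix-org/dendrite/roomserver/api"
	sarama "gopkg.in/Shopify/sarama.v1"
)

// sendInputEvent is a hypothetical helper: it marshals an api.InputRoomEvent
// to JSON and publishes it to the topic the roomserver consumer reads from.
func sendInputEvent(producer sarama.SyncProducer, topic string, eventJSON []byte, stateIDs []string) error {
	input := api.InputRoomEvent{
		Kind:  api.KindNew,
		Event: eventJSON,
		State: stateIDs,
	}
	value, err := json.Marshal(input)
	if err != nil {
		return err
	}
	_, _, err = producer.SendMessage(&sarama.ProducerMessage{
		Topic: topic,
		Value: sarama.ByteEncoder(value),
	})
	return err
}

func main() {
	config := sarama.NewConfig()
	// The sync producer needs successes returned on its internal channel.
	config.Producer.Return.Successes = true
	producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config)
	if err != nil {
		panic(err)
	}
	defer producer.Close()

	// A placeholder payload; a real caller would pass a full matrix event.
	event := []byte(`{"type":"m.room.message"}`)
	if err := sendInputEvent(producer, "roomserverInput", event, nil); err != nil {
		panic(err)
	}
}

KindNew is used here purely as an example; the Kind field is what will steer how the roomserver treats the event once the processing logic lands.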
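Consumer.ErrorLogger is optional, and the consumer panics on error when it is nil. A minimal implementation backed by the standard library log package might look like the following; the type name stderrLogger is illustrative only.

package main

import (
	"log"

	sarama "gopkg.in/Shopify/sarama.v1"
)

// stderrLogger is a hypothetical input.ErrorLogger implementation that
// records which message failed and why, using the standard library logger.
type stderrLogger struct{}

func (stderrLogger) OnError(message *sarama.ConsumerMessage, err error) {
	log.Printf(
		"roomserver: error handling message (topic=%q partition=%d offset=%d): %v",
		message.Topic, message.Partition, message.Offset, err,
	)
}

It could be wired in when the consumer is constructed in roomserver.go, for example input.Consumer{Consumer: kafkaConsumer, DB: db, RoomEventTopic: roomEventTopic, ErrorLogger: stderrLogger{}}.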
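Because the consumer only depends on the ConsumerDatabase interface, the offset-tracking logic in Start can be exercised without Postgres. A hypothetical in-memory implementation, suitable for a unit test and not part of the patch, could look like this:

package input

import (
	"sync"

	"github.com/matrix-org/dendrite/roomserver/types"
)

// memoryDatabase is a hypothetical in-memory ConsumerDatabase for tests.
// It keeps the per-topic, per-partition offsets in a map guarded by a mutex.
type memoryDatabase struct {
	mu      sync.Mutex
	offsets map[string]map[int32]int64
}

// PartitionOffsets returns the offsets recorded so far for the topic.
func (m *memoryDatabase) PartitionOffsets(topic string) ([]types.PartitionOffset, error) {
	m.mu.Lock()
	defer m.mu.Unlock()
	var results []types.PartitionOffset
	for partition, offset := range m.offsets[topic] {
		results = append(results, types.PartitionOffset{Partition: partition, Offset: offset})
	}
	return results, nil
}

// SetPartitionOffset records the offset reached for a partition of the topic.
func (m *memoryDatabase) SetPartitionOffset(topic string, partition int32, offset int64) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	if m.offsets == nil {
		m.offsets = map[string]map[int32]int64{}
	}
	if m.offsets[topic] == nil {
		m.offsets[topic] = map[int32]int64{}
	}
	m.offsets[topic][partition] = offset
	return nil
}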
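main currently blocks forever on an empty select, and the TODO notes that clean shutdown is unimplemented. One possible shape for that TODO is sketched below; it only waits for SIGINT or SIGTERM and then returns, since the Consumer in this patch does not yet expose a way to stop its partition goroutines.

package main

import (
	"fmt"
	"os"
	"os/signal"
	"syscall"
)

// waitForShutdown blocks until the process receives SIGINT or SIGTERM.
func waitForShutdown() {
	sigs := make(chan os.Signal, 1)
	signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
	sig := <-sigs
	fmt.Println("Shutting down roomserver:", sig)
	// A fuller implementation would also close the kafka consumer and wait
	// for the partition goroutines to finish before returning.
}

func main() {
	// ... consumer setup as in roomserver.go ...
	fmt.Println("Started roomserver")
	waitForShutdown()
}

In that shape, the select {} at the end of main would be replaced by the call to waitForShutdown().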