diff --git a/dendrite-config.yaml b/dendrite-config.yaml index 30babd8e..d04707a6 100644 --- a/dendrite-config.yaml +++ b/dendrite-config.yaml @@ -56,6 +56,7 @@ kafka: topics: input_room_event: roomserverInput output_room_event: roomserverOutput + output_client_data: clientapiOutput user_updates: userUpdates # The postgres connection configs for connecting to the databases e.g a postgres:// URI diff --git a/src/github.com/matrix-org/dendrite/clientapi/auth/storage/accounts/account_data_table.go b/src/github.com/matrix-org/dendrite/clientapi/auth/storage/accounts/account_data_table.go index 63e84a66..0c1fc0ff 100644 --- a/src/github.com/matrix-org/dendrite/clientapi/auth/storage/accounts/account_data_table.go +++ b/src/github.com/matrix-org/dendrite/clientapi/auth/storage/accounts/account_data_table.go @@ -44,12 +44,16 @@ const insertAccountDataSQL = ` const selectAccountDataSQL = "" + "SELECT room_id, type, content FROM account_data WHERE localpart = $1" +const selectAccountDataByTypeSQL = "" + + "SELECT content FROM account_data WHERE localpart = $1 AND room_id = $2 AND type = $3" + const deleteAccountDataSQL = "" + "DELETE FROM account_data WHERE localpart = $1 AND room_id = $2 AND type = $3" type accountDataStatements struct { - insertAccountDataStmt *sql.Stmt - selectAccountDataStmt *sql.Stmt + insertAccountDataStmt *sql.Stmt + selectAccountDataStmt *sql.Stmt + selectAccountDataByTypeStmt *sql.Stmt } func (s *accountDataStatements) prepare(db *sql.DB) (err error) { @@ -63,6 +67,9 @@ func (s *accountDataStatements) prepare(db *sql.DB) (err error) { if s.selectAccountDataStmt, err = db.Prepare(selectAccountDataSQL); err != nil { return } + if s.selectAccountDataByTypeStmt, err = db.Prepare(selectAccountDataByTypeSQL); err != nil { + return + } return } @@ -107,3 +114,31 @@ func (s *accountDataStatements) selectAccountData(localpart string) ( return } + +func (s *accountDataStatements) selectAccountDataByType( + localpart string, roomID string, dataType string, +) (data []gomatrixserverlib.ClientEvent, err error) { + data = []gomatrixserverlib.ClientEvent{} + + rows, err := s.selectAccountDataByTypeStmt.Query(localpart, roomID, dataType) + if err != nil { + return + } + + for rows.Next() { + var content []byte + + if err = rows.Scan(&content); err != nil { + return + } + + ac := gomatrixserverlib.ClientEvent{ + Type: dataType, + Content: content, + } + + data = append(data, ac) + } + + return +} diff --git a/src/github.com/matrix-org/dendrite/clientapi/auth/storage/accounts/storage.go b/src/github.com/matrix-org/dendrite/clientapi/auth/storage/accounts/storage.go index a7b2c786..ca9deac0 100644 --- a/src/github.com/matrix-org/dendrite/clientapi/auth/storage/accounts/storage.go +++ b/src/github.com/matrix-org/dendrite/clientapi/auth/storage/accounts/storage.go @@ -224,6 +224,14 @@ func (d *Database) GetAccountData(localpart string) ( return d.accountDatas.selectAccountData(localpart) } +// GetAccountDataByType returns account data matching a given +// localpart, room ID and type. +// If no account data could be found, returns an empty array +// Returns an error if there was an issue with the retrieval +func (d *Database) GetAccountDataByType(localpart string, roomID string, dataType string) (data []gomatrixserverlib.ClientEvent, err error) { + return d.accountDatas.selectAccountDataByType(localpart, roomID, dataType) +} + func hashPassword(plaintext string) (hash string, err error) { hashBytes, err := bcrypt.GenerateFromPassword([]byte(plaintext), bcrypt.DefaultCost) return string(hashBytes), err diff --git a/src/github.com/matrix-org/dendrite/clientapi/producers/syncapi.go b/src/github.com/matrix-org/dendrite/clientapi/producers/syncapi.go new file mode 100644 index 00000000..2597089e --- /dev/null +++ b/src/github.com/matrix-org/dendrite/clientapi/producers/syncapi.go @@ -0,0 +1,65 @@ +// Copyright 2017 Vector Creations Ltd +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package producers + +import ( + "encoding/json" + + "github.com/matrix-org/dendrite/common" + + sarama "gopkg.in/Shopify/sarama.v1" +) + +// SyncAPIProducer produces events for the sync API server to consume +type SyncAPIProducer struct { + Topic string + Producer sarama.SyncProducer +} + +// NewSyncAPIProducer creates a new SyncAPIProducer +func NewSyncAPIProducer(kafkaURIs []string, topic string) (*SyncAPIProducer, error) { + producer, err := sarama.NewSyncProducer(kafkaURIs, nil) + if err != nil { + return nil, err + } + return &SyncAPIProducer{ + Topic: topic, + Producer: producer, + }, nil +} + +// SendData sends account data to the sync API server +func (p *SyncAPIProducer) SendData(userID string, roomID string, dataType string) error { + var m sarama.ProducerMessage + + data := common.AccountData{ + RoomID: roomID, + Type: dataType, + } + value, err := json.Marshal(data) + if err != nil { + return err + } + + m.Topic = string(p.Topic) + m.Key = sarama.StringEncoder(userID) + m.Value = sarama.ByteEncoder(value) + + if _, _, err := p.Producer.SendMessage(&m); err != nil { + return err + } + + return nil +} diff --git a/src/github.com/matrix-org/dendrite/clientapi/readers/account_data.go b/src/github.com/matrix-org/dendrite/clientapi/readers/account_data.go index ca2c2232..d3bb932d 100644 --- a/src/github.com/matrix-org/dendrite/clientapi/readers/account_data.go +++ b/src/github.com/matrix-org/dendrite/clientapi/readers/account_data.go @@ -22,6 +22,7 @@ import ( "github.com/matrix-org/dendrite/clientapi/auth/storage/accounts" "github.com/matrix-org/dendrite/clientapi/httputil" "github.com/matrix-org/dendrite/clientapi/jsonerror" + "github.com/matrix-org/dendrite/clientapi/producers" "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/util" @@ -30,7 +31,7 @@ import ( // SaveAccountData implements PUT /user/{userId}/[rooms/{roomId}/]account_data/{type} func SaveAccountData( req *http.Request, accountDB *accounts.Database, device *authtypes.Device, - userID string, roomID string, dataType string, + userID string, roomID string, dataType string, syncProducer *producers.SyncAPIProducer, ) util.JSONResponse { if req.Method != "PUT" { return util.JSONResponse{ @@ -62,6 +63,10 @@ func SaveAccountData( return httputil.LogThenError(req, err) } + if err := syncProducer.SendData(userID, roomID, dataType); err != nil { + return httputil.LogThenError(req, err) + } + return util.JSONResponse{ Code: 200, JSON: struct{}{}, diff --git a/src/github.com/matrix-org/dendrite/clientapi/routing/routing.go b/src/github.com/matrix-org/dendrite/clientapi/routing/routing.go index 5a2935e8..aeb23164 100644 --- a/src/github.com/matrix-org/dendrite/clientapi/routing/routing.go +++ b/src/github.com/matrix-org/dendrite/clientapi/routing/routing.go @@ -48,6 +48,7 @@ func Setup( federation *gomatrixserverlib.FederationClient, keyRing gomatrixserverlib.KeyRing, userUpdateProducer *producers.UserUpdateProducer, + syncProducer *producers.SyncAPIProducer, ) { apiMux := mux.NewRouter() @@ -291,14 +292,14 @@ func Setup( r0mux.Handle("/user/{userID}/account_data/{type}", common.MakeAuthAPI("user_account_data", deviceDB, func(req *http.Request, device *authtypes.Device) util.JSONResponse { vars := mux.Vars(req) - return readers.SaveAccountData(req, accountDB, device, vars["userID"], "", vars["type"]) + return readers.SaveAccountData(req, accountDB, device, vars["userID"], "", vars["type"], syncProducer) }), ) r0mux.Handle("/user/{userID}/rooms/{roomID}/account_data/{type}", common.MakeAuthAPI("user_account_data", deviceDB, func(req *http.Request, device *authtypes.Device) util.JSONResponse { vars := mux.Vars(req) - return readers.SaveAccountData(req, accountDB, device, vars["userID"], vars["roomID"], vars["type"]) + return readers.SaveAccountData(req, accountDB, device, vars["userID"], vars["roomID"], vars["type"], syncProducer) }), ) diff --git a/src/github.com/matrix-org/dendrite/cmd/dendrite-client-api-server/main.go b/src/github.com/matrix-org/dendrite/cmd/dendrite-client-api-server/main.go index 0f8c035f..6f568b1a 100644 --- a/src/github.com/matrix-org/dendrite/cmd/dendrite-client-api-server/main.go +++ b/src/github.com/matrix-org/dendrite/cmd/dendrite-client-api-server/main.go @@ -61,6 +61,12 @@ func main() { if err != nil { log.Panicf("Failed to setup kafka producers(%q): %s", cfg.Kafka.Addresses, err) } + syncProducer, err := producers.NewSyncAPIProducer( + cfg.Kafka.Addresses, string(cfg.Kafka.Topics.OutputClientData), + ) + if err != nil { + log.Panicf("Failed to setup kafka producers(%q): %s", cfg.Kafka.Addresses, err) + } federation := gomatrixserverlib.NewFederationClient( cfg.Matrix.ServerName, cfg.Matrix.KeyID, cfg.Matrix.PrivateKey, @@ -99,7 +105,7 @@ func main() { routing.Setup( http.DefaultServeMux, http.DefaultClient, *cfg, roomserverProducer, queryAPI, aliasAPI, accountDB, deviceDB, federation, keyRing, - userUpdateProducer, + userUpdateProducer, syncProducer, ) log.Fatal(http.ListenAndServe(string(cfg.Listen.ClientAPI), nil)) } diff --git a/src/github.com/matrix-org/dendrite/cmd/dendrite-sync-api-server/main.go b/src/github.com/matrix-org/dendrite/cmd/dendrite-sync-api-server/main.go index c7870684..77ada412 100644 --- a/src/github.com/matrix-org/dendrite/cmd/dendrite-sync-api-server/main.go +++ b/src/github.com/matrix-org/dendrite/cmd/dendrite-sync-api-server/main.go @@ -73,12 +73,19 @@ func main() { if err = n.Load(db); err != nil { log.Panicf("startup: failed to set up notifier: %s", err) } - consumer, err := consumers.NewOutputRoomEvent(cfg, n, db) + roomConsumer, err := consumers.NewOutputRoomEvent(cfg, n, db) if err != nil { log.Panicf("startup: failed to create room server consumer: %s", err) } - if err = consumer.Start(); err != nil { - log.Panicf("startup: failed to start room server consumer") + if err = roomConsumer.Start(); err != nil { + log.Panicf("startup: failed to start room server consumer: %s", err) + } + clientConsumer, err := consumers.NewOutputClientData(cfg, n, db) + if err != nil { + log.Panicf("startup: failed to create client API server consumer: %s", err) + } + if err = clientConsumer.Start(); err != nil { + log.Panicf("startup: failed to start client API server consumer: %s", err) } log.Info("Starting sync server on ", cfg.Listen.SyncAPI) diff --git a/src/github.com/matrix-org/dendrite/cmd/syncserver-integration-tests/main.go b/src/github.com/matrix-org/dendrite/cmd/syncserver-integration-tests/main.go index b0e36c42..e39a8980 100644 --- a/src/github.com/matrix-org/dendrite/cmd/syncserver-integration-tests/main.go +++ b/src/github.com/matrix-org/dendrite/cmd/syncserver-integration-tests/main.go @@ -54,6 +54,7 @@ var ( ) const inputTopic = "syncserverInput" +const clientTopic = "clientapiserverOutput" var exe = test.KafkaExecutor{ ZookeeperURI: zookeeperURI, @@ -134,6 +135,7 @@ func startSyncServer() (*exec.Cmd, chan error) { cfg.Matrix.ServerName = "localhost" cfg.Listen.SyncAPI = config.Address(syncserverAddr) cfg.Kafka.Topics.OutputRoomEvent = config.Topic(inputTopic) + cfg.Kafka.Topics.OutputClientData = config.Topic(clientTopic) if err := test.WriteConfig(cfg, dir); err != nil { panic(err) @@ -177,6 +179,10 @@ func prepareKafka() { if err := exe.CreateTopic(inputTopic); err != nil { panic(err) } + exe.DeleteTopic(clientTopic) + if err := exe.CreateTopic(clientTopic); err != nil { + panic(err) + } } func testSyncServer(syncServerCmdChan chan error, userID, since, want string) { diff --git a/src/github.com/matrix-org/dendrite/common/config/config.go b/src/github.com/matrix-org/dendrite/common/config/config.go index 4b362b5f..324561f6 100644 --- a/src/github.com/matrix-org/dendrite/common/config/config.go +++ b/src/github.com/matrix-org/dendrite/common/config/config.go @@ -98,6 +98,8 @@ type Dendrite struct { Topics struct { // Topic for roomserver/api.OutputRoomEvent events. OutputRoomEvent Topic `yaml:"output_room_event"` + // Topic for sending account data from client API to sync API + OutputClientData Topic `yaml:"output_client_data"` // Topic for user updates (profile, presence) UserUpdates Topic `yaml:"user_updates"` } @@ -298,6 +300,7 @@ func (config *Dendrite) check() error { checkNotZero("kafka.addresses", int64(len(config.Kafka.Addresses))) checkNotEmpty("kafka.topics.output_room_event", string(config.Kafka.Topics.OutputRoomEvent)) + checkNotEmpty("kafka.topics.output_client_data", string(config.Kafka.Topics.OutputClientData)) checkNotEmpty("database.account", string(config.Database.Account)) checkNotEmpty("database.device", string(config.Database.Device)) checkNotEmpty("database.server_key", string(config.Database.ServerKey)) diff --git a/src/github.com/matrix-org/dendrite/common/config/config_test.go b/src/github.com/matrix-org/dendrite/common/config/config_test.go index 7af61968..4275e3d4 100644 --- a/src/github.com/matrix-org/dendrite/common/config/config_test.go +++ b/src/github.com/matrix-org/dendrite/common/config/config_test.go @@ -44,6 +44,7 @@ kafka: topics: input_room_event: input.room output_room_event: output.room + output_client_data: output.client database: media_api: "postgresql:///media_api" account: "postgresql:///account" diff --git a/src/github.com/matrix-org/dendrite/common/test/config.go b/src/github.com/matrix-org/dendrite/common/test/config.go index e429d06b..a28a08d5 100644 --- a/src/github.com/matrix-org/dendrite/common/test/config.go +++ b/src/github.com/matrix-org/dendrite/common/test/config.go @@ -82,6 +82,7 @@ func MakeConfig(configDir, kafkaURI, database, host string, startPort int) (*con // TODO: Different servers should be using different topics. // Make this configurable somehow? cfg.Kafka.Topics.OutputRoomEvent = "test.room.output" + cfg.Kafka.Topics.OutputClientData = "test.clientapi.output" // TODO: Use different databases for the different schemas. // Using the same database for every schema currently works because diff --git a/src/github.com/matrix-org/dendrite/common/types.go b/src/github.com/matrix-org/dendrite/common/types.go new file mode 100644 index 00000000..471a2f30 --- /dev/null +++ b/src/github.com/matrix-org/dendrite/common/types.go @@ -0,0 +1,22 @@ +// Copyright 2017 Vector Creations Ltd +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package common + +// AccountData represents account data sent from the client API server to the +// sync API server +type AccountData struct { + RoomID string `json:"room_id"` + Type string `json:"type"` +} diff --git a/src/github.com/matrix-org/dendrite/syncapi/consumers/clientapi.go b/src/github.com/matrix-org/dendrite/syncapi/consumers/clientapi.go new file mode 100644 index 00000000..a2a240ff --- /dev/null +++ b/src/github.com/matrix-org/dendrite/syncapi/consumers/clientapi.go @@ -0,0 +1,91 @@ +// Copyright 2017 Vector Creations Ltd +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package consumers + +import ( + "encoding/json" + + log "github.com/Sirupsen/logrus" + "github.com/matrix-org/dendrite/common" + "github.com/matrix-org/dendrite/common/config" + "github.com/matrix-org/dendrite/syncapi/storage" + "github.com/matrix-org/dendrite/syncapi/sync" + sarama "gopkg.in/Shopify/sarama.v1" +) + +// OutputClientData consumes events that originated in the client API server. +type OutputClientData struct { + clientAPIConsumer *common.ContinualConsumer + db *storage.SyncServerDatabase + notifier *sync.Notifier +} + +// NewOutputClientData creates a new OutputClientData consumer. Call Start() to begin consuming from room servers. +func NewOutputClientData(cfg *config.Dendrite, n *sync.Notifier, store *storage.SyncServerDatabase) (*OutputClientData, error) { + kafkaConsumer, err := sarama.NewConsumer(cfg.Kafka.Addresses, nil) + if err != nil { + return nil, err + } + + consumer := common.ContinualConsumer{ + Topic: string(cfg.Kafka.Topics.OutputClientData), + Consumer: kafkaConsumer, + PartitionStore: store, + } + s := &OutputClientData{ + clientAPIConsumer: &consumer, + db: store, + notifier: n, + } + consumer.ProcessMessage = s.onMessage + + return s, nil +} + +// Start consuming from room servers +func (s *OutputClientData) Start() error { + return s.clientAPIConsumer.Start() +} + +// onMessage is called when the sync server receives a new event from the client API server output log. +// It is not safe for this function to be called from multiple goroutines, or else the +// sync stream position may race and be incorrectly calculated. +func (s *OutputClientData) onMessage(msg *sarama.ConsumerMessage) error { + // Parse out the event JSON + var output common.AccountData + if err := json.Unmarshal(msg.Value, &output); err != nil { + // If the message was invalid, log it and move on to the next message in the stream + log.WithError(err).Errorf("client API server output log: message parse failure") + return nil + } + + log.WithFields(log.Fields{ + "type": output.Type, + "room_id": output.RoomID, + }).Info("received data from client API server") + + syncStreamPos, err := s.db.UpsertAccountData(string(msg.Key), output.RoomID, output.Type) + if err != nil { + log.WithFields(log.Fields{ + "type": output.Type, + "room_id": output.RoomID, + log.ErrorKey: err, + }).Panicf("could not save account data") + } + + s.notifier.OnNewEvent(nil, string(msg.Key), syncStreamPos) + + return nil +} diff --git a/src/github.com/matrix-org/dendrite/syncapi/consumers/roomserver.go b/src/github.com/matrix-org/dendrite/syncapi/consumers/roomserver.go index 13159c87..c846705f 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/consumers/roomserver.go +++ b/src/github.com/matrix-org/dendrite/syncapi/consumers/roomserver.go @@ -35,12 +35,9 @@ type OutputRoomEvent struct { db *storage.SyncServerDatabase notifier *sync.Notifier query api.RoomserverQueryAPI - serverName gomatrixserverlib.ServerName - keyID gomatrixserverlib.KeyID - privateKey []byte } -type prevMembership struct { +type prevEventRef struct { PrevContent json.RawMessage `json:"prev_content"` PrevID string `json:"replaces_state"` UserID string `json:"prev_sender"` @@ -64,9 +61,6 @@ func NewOutputRoomEvent(cfg *config.Dendrite, n *sync.Notifier, store *storage.S db: store, notifier: n, query: api.NewRoomserverQueryAPIHTTP(roomServerURL, nil), - serverName: cfg.Matrix.ServerName, - keyID: cfg.Matrix.KeyID, - privateKey: cfg.Matrix.PrivateKey, } consumer.ProcessMessage = s.onMessage @@ -113,13 +107,13 @@ func (s *OutputRoomEvent) onMessage(msg *sarama.ConsumerMessage) error { }).Panicf("roomserver output log: state event lookup failure") } - ev, err = s.updateStateEvent(ev, s.keyID, s.privateKey) + ev, err = s.updateStateEvent(ev) if err != nil { return err } for i := range addsStateEvents { - addsStateEvents[i], err = s.updateStateEvent(addsStateEvents[i], s.keyID, s.privateKey) + addsStateEvents[i], err = s.updateStateEvent(addsStateEvents[i]) if err != nil { return err } @@ -139,7 +133,7 @@ func (s *OutputRoomEvent) onMessage(msg *sarama.ConsumerMessage) error { }).Panicf("roomserver output log: write event failure") return nil } - s.notifier.OnNewEvent(&ev, types.StreamPosition(syncStreamPos)) + s.notifier.OnNewEvent(&ev, "", types.StreamPosition(syncStreamPos)) return nil } @@ -201,10 +195,7 @@ func (s *OutputRoomEvent) lookupStateEvents( return result, nil } -func (s *OutputRoomEvent) updateStateEvent( - event gomatrixserverlib.Event, keyID gomatrixserverlib.KeyID, - privateKey []byte, -) (gomatrixserverlib.Event, error) { +func (s *OutputRoomEvent) updateStateEvent(event gomatrixserverlib.Event) (gomatrixserverlib.Event, error) { var stateKey string if event.StateKey() == nil { stateKey = "" @@ -221,7 +212,7 @@ func (s *OutputRoomEvent) updateStateEvent( return event, nil } - prev := prevMembership{ + prev := prevEventRef{ PrevContent: prevEvent.Content(), PrevID: prevEvent.EventID(), UserID: prevEvent.Sender(), diff --git a/src/github.com/matrix-org/dendrite/syncapi/storage/account_data_table.go b/src/github.com/matrix-org/dendrite/syncapi/storage/account_data_table.go new file mode 100644 index 00000000..f95e4858 --- /dev/null +++ b/src/github.com/matrix-org/dendrite/syncapi/storage/account_data_table.go @@ -0,0 +1,113 @@ +// Copyright 2017 Vector Creations Ltd +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package storage + +import ( + "database/sql" + + "github.com/matrix-org/dendrite/syncapi/types" +) + +const accountDataSchema = ` +-- Stores the users account data +CREATE TABLE IF NOT EXISTS account_data_type ( + -- The highest numeric ID from the output_room_events at the time of saving the data + id BIGINT, + -- ID of the user the data belongs to + user_id TEXT NOT NULL, + -- ID of the room the data is related to (empty string if not related to a specific room) + room_id TEXT NOT NULL, + -- Type of the data + type TEXT NOT NULL, + + PRIMARY KEY(user_id, room_id, type), + + -- We don't want two entries of the same type for the same user + CONSTRAINT account_data_unique UNIQUE (user_id, room_id, type) +); + +CREATE UNIQUE INDEX IF NOT EXISTS account_data_id_idx ON account_data_type(id); +` + +const insertAccountDataSQL = "" + + "INSERT INTO account_data_type (id, user_id, room_id, type) VALUES ($1, $2, $3, $4)" + + " ON CONFLICT ON CONSTRAINT account_data_unique" + + " DO UPDATE SET id = EXCLUDED.id" + +const selectAccountDataInRangeSQL = "" + + "SELECT room_id, type FROM account_data_type" + + " WHERE user_id = $1 AND id > $2 AND id <= $3" + + " ORDER BY id ASC" + +type accountDataStatements struct { + insertAccountDataStmt *sql.Stmt + selectAccountDataInRangeStmt *sql.Stmt +} + +func (s *accountDataStatements) prepare(db *sql.DB) (err error) { + _, err = db.Exec(accountDataSchema) + if err != nil { + return + } + if s.insertAccountDataStmt, err = db.Prepare(insertAccountDataSQL); err != nil { + return + } + if s.selectAccountDataInRangeStmt, err = db.Prepare(selectAccountDataInRangeSQL); err != nil { + return + } + return +} + +func (s *accountDataStatements) insertAccountData( + pos types.StreamPosition, userID string, roomID string, dataType string, +) (err error) { + _, err = s.insertAccountDataStmt.Exec(pos, userID, roomID, dataType) + return +} + +func (s *accountDataStatements) selectAccountDataInRange( + userID string, oldPos types.StreamPosition, newPos types.StreamPosition, +) (data map[string][]string, err error) { + data = make(map[string][]string) + + // If both positions are the same, it means that the data was saved after the + // latest room event. In that case, we need to decrement the old position as + // it would prevent the SQL request from returning anything. + if oldPos == newPos { + oldPos-- + } + + rows, err := s.selectAccountDataInRangeStmt.Query(userID, oldPos, newPos) + if err != nil { + return + } + + for rows.Next() { + var dataType string + var roomID string + + if err = rows.Scan(&roomID, &dataType); err != nil { + return + } + + if len(data[roomID]) > 0 { + data[roomID] = append(data[roomID], dataType) + } else { + data[roomID] = []string{dataType} + } + } + + return +} diff --git a/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go b/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go index 46231c77..2433b68c 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go +++ b/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go @@ -41,10 +41,11 @@ type streamEvent struct { // SyncServerDatabase represents a sync server database type SyncServerDatabase struct { - db *sql.DB - partitions common.PartitionOffsetStatements - events outputRoomEventsStatements - roomstate currentRoomStateStatements + db *sql.DB + partitions common.PartitionOffsetStatements + accountData accountDataStatements + events outputRoomEventsStatements + roomstate currentRoomStateStatements } // NewSyncServerDatabase creates a new sync server database @@ -58,6 +59,10 @@ func NewSyncServerDatabase(dataSourceName string) (*SyncServerDatabase, error) { if err = partitions.Prepare(db); err != nil { return nil, err } + accountData := accountDataStatements{} + if err = accountData.prepare(db); err != nil { + return nil, err + } events := outputRoomEventsStatements{} if err = events.prepare(db); err != nil { return nil, err @@ -66,7 +71,7 @@ func NewSyncServerDatabase(dataSourceName string) (*SyncServerDatabase, error) { if err := state.prepare(db); err != nil { return nil, err } - return &SyncServerDatabase{db, partitions, events, state}, nil + return &SyncServerDatabase{db, partitions, accountData, events, state}, nil } // AllJoinedUsersInRooms returns a map of room ID to a list of all joined user IDs. @@ -274,6 +279,33 @@ func (d *SyncServerDatabase) CompleteSync(userID string, numRecentEventsPerRoom return } +// GetAccountDataInRange returns all account data for a given user inserted or +// updated between two given positions +// Returns a map following the format data[roomID] = []dataTypes +// If no data is retrieved, returns an empty map +// If there was an issue with the retrieval, returns an error +func (d *SyncServerDatabase) GetAccountDataInRange( + userID string, oldPos types.StreamPosition, newPos types.StreamPosition, +) (map[string][]string, error) { + return d.accountData.selectAccountDataInRange(userID, oldPos, newPos) +} + +// UpsertAccountData keeps track of new or updated account data, by saving the type +// of the new/updated data, and the user ID and room ID the data is related to (empty) +// room ID means the data isn't specific to any room) +// If no data with the given type, user ID and room ID exists in the database, +// creates a new row, else update the existing one +// Returns an error if there was an issue with the upsert +func (d *SyncServerDatabase) UpsertAccountData(userID string, roomID string, dataType string) (types.StreamPosition, error) { + pos, err := d.SyncStreamPosition() + if err != nil { + return pos, err + } + + err = d.accountData.insertAccountData(pos, userID, roomID, dataType) + return pos, err +} + func (d *SyncServerDatabase) addInvitesToResponse(txn *sql.Tx, userID string, res *types.Response) error { // Add invites - TODO: This will break over federation as they won't be in the current state table according to Mark. roomIDs, err := d.roomstate.selectRoomIDsWithMembership(txn, userID, "invite") diff --git a/src/github.com/matrix-org/dendrite/syncapi/sync/notifier.go b/src/github.com/matrix-org/dendrite/syncapi/sync/notifier.go index 814660a7..c2fdd8f0 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/sync/notifier.go +++ b/src/github.com/matrix-org/dendrite/syncapi/sync/notifier.go @@ -54,39 +54,44 @@ func NewNotifier(pos types.StreamPosition) *Notifier { // OnNewEvent is called when a new event is received from the room server. Must only be // called from a single goroutine, to avoid races between updates which could set the // current position in the stream incorrectly. -func (n *Notifier) OnNewEvent(ev *gomatrixserverlib.Event, pos types.StreamPosition) { +// Can be called either with a *gomatrixserverlib.Event, or with an user ID +func (n *Notifier) OnNewEvent(ev *gomatrixserverlib.Event, userID string, pos types.StreamPosition) { // update the current position then notify relevant /sync streams. // This needs to be done PRIOR to waking up users as they will read this value. n.streamLock.Lock() defer n.streamLock.Unlock() n.currPos = pos - // Map this event's room_id to a list of joined users, and wake them up. - userIDs := n.joinedUsers(ev.RoomID()) - // If this is an invite, also add in the invitee to this list. - if ev.Type() == "m.room.member" && ev.StateKey() != nil { - userID := *ev.StateKey() - membership, err := ev.Membership() - if err != nil { - log.WithError(err).WithField("event_id", ev.EventID()).Errorf( - "Notifier.OnNewEvent: Failed to unmarshal member event", - ) - } else { - // Keep the joined user map up-to-date - switch membership { - case "invite": - userIDs = append(userIDs, userID) - case "join": - n.addJoinedUser(ev.RoomID(), userID) - case "leave": - fallthrough - case "ban": - n.removeJoinedUser(ev.RoomID(), userID) + if ev != nil { + // Map this event's room_id to a list of joined users, and wake them up. + userIDs := n.joinedUsers(ev.RoomID()) + // If this is an invite, also add in the invitee to this list. + if ev.Type() == "m.room.member" && ev.StateKey() != nil { + userID := *ev.StateKey() + membership, err := ev.Membership() + if err != nil { + log.WithError(err).WithField("event_id", ev.EventID()).Errorf( + "Notifier.OnNewEvent: Failed to unmarshal member event", + ) + } else { + // Keep the joined user map up-to-date + switch membership { + case "invite": + userIDs = append(userIDs, userID) + case "join": + n.addJoinedUser(ev.RoomID(), userID) + case "leave": + fallthrough + case "ban": + n.removeJoinedUser(ev.RoomID(), userID) + } } } - } - for _, userID := range userIDs { + for _, userID := range userIDs { + n.wakeupUser(userID, pos) + } + } else if len(userID) > 0 { n.wakeupUser(userID, pos) } } diff --git a/src/github.com/matrix-org/dendrite/syncapi/sync/notifier_test.go b/src/github.com/matrix-org/dendrite/syncapi/sync/notifier_test.go index 03e39da0..358243bc 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/sync/notifier_test.go +++ b/src/github.com/matrix-org/dendrite/syncapi/sync/notifier_test.go @@ -123,7 +123,7 @@ func TestNewEventAndJoinedToRoom(t *testing.T) { stream := n.fetchUserStream(bob, true) waitForBlocking(stream, 1) - n.OnNewEvent(&randomMessageEvent, streamPositionAfter) + n.OnNewEvent(&randomMessageEvent, "", streamPositionAfter) wg.Wait() } @@ -151,7 +151,7 @@ func TestNewInviteEventForUser(t *testing.T) { stream := n.fetchUserStream(bob, true) waitForBlocking(stream, 1) - n.OnNewEvent(&aliceInviteBobEvent, streamPositionAfter) + n.OnNewEvent(&aliceInviteBobEvent, "", streamPositionAfter) wg.Wait() } @@ -182,7 +182,7 @@ func TestMultipleRequestWakeup(t *testing.T) { stream := n.fetchUserStream(bob, true) waitForBlocking(stream, 3) - n.OnNewEvent(&randomMessageEvent, streamPositionAfter) + n.OnNewEvent(&randomMessageEvent, "", streamPositionAfter) wg.Wait() @@ -217,7 +217,7 @@ func TestNewEventAndWasPreviouslyJoinedToRoom(t *testing.T) { }() bobStream := n.fetchUserStream(bob, true) waitForBlocking(bobStream, 1) - n.OnNewEvent(&bobLeaveEvent, streamPositionAfter) + n.OnNewEvent(&bobLeaveEvent, "", streamPositionAfter) leaveWG.Wait() // send an event into the room. Make sure alice gets it. Bob should not. @@ -246,7 +246,7 @@ func TestNewEventAndWasPreviouslyJoinedToRoom(t *testing.T) { waitForBlocking(aliceStream, 1) waitForBlocking(bobStream, 1) - n.OnNewEvent(&randomMessageEvent, streamPositionAfter2) + n.OnNewEvent(&randomMessageEvent, "", streamPositionAfter2) aliceWG.Wait() // it's possible that at this point alice has been informed and bob is about to be informed, so wait diff --git a/src/github.com/matrix-org/dendrite/syncapi/sync/requestpool.go b/src/github.com/matrix-org/dendrite/syncapi/sync/requestpool.go index 953e5f4f..a207b815 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/sync/requestpool.go +++ b/src/github.com/matrix-org/dendrite/syncapi/sync/requestpool.go @@ -80,7 +80,7 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *authtype if err != nil { res = httputil.LogThenError(req, err) } else { - syncData, err = rp.appendAccountData(syncData, device.UserID) + syncData, err = rp.appendAccountData(syncData, device.UserID, *syncReq, currentPos) if err != nil { res = httputil.LogThenError(req, err) } else { @@ -113,7 +113,9 @@ func (rp *RequestPool) currentSyncForUser(req syncRequest, currentPos types.Stre return rp.db.IncrementalSync(req.userID, req.since, currentPos, req.limit) } -func (rp *RequestPool) appendAccountData(data *types.Response, userID string) (*types.Response, error) { +func (rp *RequestPool) appendAccountData( + data *types.Response, userID string, req syncRequest, currentPos types.StreamPosition, +) (*types.Response, error) { // TODO: We currently send all account data on every sync response, we should instead send data // that has changed on incremental sync responses localpart, _, err := gomatrixserverlib.SplitID('@', userID) @@ -121,16 +123,56 @@ func (rp *RequestPool) appendAccountData(data *types.Response, userID string) (* return nil, err } - global, rooms, err := rp.accountDB.GetAccountData(localpart) + if req.since == types.StreamPosition(0) { + // If this is the initial sync, we don't need to check if a data has + // already been sent. Instead, we send the whole batch. + var global []gomatrixserverlib.ClientEvent + var rooms map[string][]gomatrixserverlib.ClientEvent + global, rooms, err = rp.accountDB.GetAccountData(localpart) + if err != nil { + return nil, err + } + data.AccountData.Events = global + + for r, j := range data.Rooms.Join { + if len(rooms[r]) > 0 { + j.AccountData.Events = rooms[r] + data.Rooms.Join[r] = j + } + } + + return data, nil + } + + // Sync is not initial, get all account data since the latest sync + dataTypes, err := rp.db.GetAccountDataInRange(userID, req.since, currentPos) if err != nil { return nil, err } - data.AccountData.Events = global - for r, j := range data.Rooms.Join { - if len(rooms[r]) > 0 { - j.AccountData.Events = rooms[r] - data.Rooms.Join[r] = j + if len(dataTypes) == 0 { + return data, nil + } + + // Iterate over the rooms + for roomID, dataTypes := range dataTypes { + events := []gomatrixserverlib.ClientEvent{} + // Request the missing data from the database + for _, dataType := range dataTypes { + evs, err := rp.accountDB.GetAccountDataByType(localpart, roomID, dataType) + if err != nil { + return nil, err + } + events = append(events, evs...) + } + + // Append the data to the response + if len(roomID) > 0 { + jr := data.Rooms.Join[roomID] + jr.AccountData.Events = events + data.Rooms.Join[roomID] = jr + } else { + data.AccountData.Events = events } }