Fetching missing state from the roomserver. (#135)

* Fetching missing state from the roomserver.

Whenever the syncserver receives an event from the room server that adds
state that isn't in the syncserver's local database it should fetch
those state events from the roomserver.

* Fix append

* Put comment back

* Comments

* s/addsStateEvents/lookupStateEvents/

* Fix spelling

* Include the stream position that a state event was added at in the current state tables

* Fix comment

* Review comments
This commit is contained in:
Mark Haines 2017-06-07 16:35:41 +01:00 committed by GitHub
parent 515cce1a45
commit b184a48897
5 changed files with 200 additions and 48 deletions

View File

@ -20,6 +20,8 @@ import (
// Sync contains the config information necessary to spin up a sync-server process. // Sync contains the config information necessary to spin up a sync-server process.
type Sync struct { type Sync struct {
// Where the room server is listening for queries.
RoomserverURL string `yaml:"roomserver_url"`
// The topic for events which are written by the room server output log. // The topic for events which are written by the room server output log.
RoomserverOutputTopic string `yaml:"roomserver_topic"` RoomserverOutputTopic string `yaml:"roomserver_topic"`
// A list of URIs to consume events from. These kafka logs should be produced by a Room Server. // A list of URIs to consume events from. These kafka logs should be produced by a Room Server.

View File

@ -16,6 +16,7 @@ package consumers
import ( import (
"encoding/json" "encoding/json"
"fmt"
log "github.com/Sirupsen/logrus" log "github.com/Sirupsen/logrus"
"github.com/matrix-org/dendrite/common" "github.com/matrix-org/dendrite/common"
@ -33,6 +34,7 @@ type OutputRoomEvent struct {
roomServerConsumer *common.ContinualConsumer roomServerConsumer *common.ContinualConsumer
db *storage.SyncServerDatabase db *storage.SyncServerDatabase
notifier *sync.Notifier notifier *sync.Notifier
query api.RoomserverQueryAPI
} }
// NewOutputRoomEvent creates a new OutputRoomEvent consumer. Call Start() to begin consuming from room servers. // NewOutputRoomEvent creates a new OutputRoomEvent consumer. Call Start() to begin consuming from room servers.
@ -51,6 +53,7 @@ func NewOutputRoomEvent(cfg *config.Sync, n *sync.Notifier, store *storage.SyncS
roomServerConsumer: &consumer, roomServerConsumer: &consumer,
db: store, db: store,
notifier: n, notifier: n,
query: api.NewRoomserverQueryAPIHTTP(cfg.RoomserverURL, nil),
} }
consumer.ProcessMessage = s.onMessage consumer.ProcessMessage = s.onMessage
@ -84,7 +87,19 @@ func (s *OutputRoomEvent) onMessage(msg *sarama.ConsumerMessage) error {
"room_id": ev.RoomID(), "room_id": ev.RoomID(),
}).Info("received event from roomserver") }).Info("received event from roomserver")
syncStreamPos, err := s.db.WriteEvent(&ev, output.AddsStateEventIDs, output.RemovesStateEventIDs) addsStateEvents, err := s.lookupStateEvents(output.AddsStateEventIDs, ev)
if err != nil {
log.WithFields(log.Fields{
"event": string(ev.JSON()),
log.ErrorKey: err,
"add": output.AddsStateEventIDs,
"del": output.RemovesStateEventIDs,
}).Panicf("roomserver output log: state event lookup failure")
}
syncStreamPos, err := s.db.WriteEvent(
&ev, addsStateEvents, output.AddsStateEventIDs, output.RemovesStateEventIDs,
)
if err != nil { if err != nil {
// panic rather than continue with an inconsistent database // panic rather than continue with an inconsistent database
@ -100,3 +115,74 @@ func (s *OutputRoomEvent) onMessage(msg *sarama.ConsumerMessage) error {
return nil return nil
} }
// lookupStateEvents looks up the state events that are added by a new event.
func (s *OutputRoomEvent) lookupStateEvents(
addsStateEventIDs []string, event gomatrixserverlib.Event,
) ([]gomatrixserverlib.Event, error) {
// Fast path if there aren't any new state events.
if len(addsStateEventIDs) == 0 {
return nil, nil
}
// Fast path if the only state event added is the event itself.
if len(addsStateEventIDs) == 1 && addsStateEventIDs[0] == event.EventID() {
return []gomatrixserverlib.Event{event}, nil
}
// Check if this is re-adding a state events that we previously processed
// If we have previously received a state event it may still be in
// our event database.
result, err := s.db.Events(addsStateEventIDs)
if err != nil {
return nil, err
}
missing := missingEventsFrom(result, addsStateEventIDs)
// Check if event itself is being added.
for _, eventID := range missing {
if eventID == event.EventID() {
result = append(result, event)
break
}
}
missing = missingEventsFrom(result, addsStateEventIDs)
if len(missing) == 0 {
return result, nil
}
// At this point the missing events are neither the event itself nor are
// they present in our local database. Our only option is to fetch them
// from the roomserver using the query API.
eventReq := api.QueryEventsByIDRequest{EventIDs: missing}
var eventResp api.QueryEventsByIDResponse
if err := s.query.QueryEventsByID(&eventReq, &eventResp); err != nil {
return nil, err
}
result = append(result, eventResp.Events...)
missing = missingEventsFrom(result, addsStateEventIDs)
if len(missing) != 0 {
return nil, fmt.Errorf(
"missing %d state events IDs at event %q", len(missing), event.EventID(),
)
}
return result, nil
}
func missingEventsFrom(events []gomatrixserverlib.Event, required []string) []string {
have := map[string]bool{}
for _, event := range events {
have[event.EventID()] = true
}
var missing []string
for _, eventID := range required {
if !have[eventID] {
missing = append(missing, eventID)
}
}
return missing
}

View File

@ -16,6 +16,7 @@ package storage
import ( import (
"database/sql" "database/sql"
"github.com/lib/pq"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
) )
@ -35,6 +36,9 @@ CREATE TABLE IF NOT EXISTS current_room_state (
-- The 'content.membership' value if this event is an m.room.member event. For other -- The 'content.membership' value if this event is an m.room.member event. For other
-- events, this will be NULL. -- events, this will be NULL.
membership TEXT, membership TEXT,
-- The serial ID of the output_room_events table when this event became
-- part of the current state of the room.
added_at BIGINT,
-- Clobber based on 3-uple of room_id, type and state_key -- Clobber based on 3-uple of room_id, type and state_key
CONSTRAINT room_state_unique UNIQUE (room_id, type, state_key) CONSTRAINT room_state_unique UNIQUE (room_id, type, state_key)
); );
@ -45,9 +49,10 @@ CREATE INDEX IF NOT EXISTS membership_idx ON current_room_state(type, state_key,
` `
const upsertRoomStateSQL = "" + const upsertRoomStateSQL = "" +
"INSERT INTO current_room_state (room_id, event_id, type, state_key, event_json, membership) VALUES ($1, $2, $3, $4, $5, $6)" + "INSERT INTO current_room_state (room_id, event_id, type, state_key, event_json, membership, added_at)" +
" VALUES ($1, $2, $3, $4, $5, $6, $7)" +
" ON CONFLICT ON CONSTRAINT room_state_unique" + " ON CONFLICT ON CONSTRAINT room_state_unique" +
" DO UPDATE SET event_id = $2, event_json = $5, membership = $6" " DO UPDATE SET event_id = $2, event_json = $5, membership = $6, added_at = $7"
const deleteRoomStateByEventIDSQL = "" + const deleteRoomStateByEventIDSQL = "" +
"DELETE FROM current_room_state WHERE event_id = $1" "DELETE FROM current_room_state WHERE event_id = $1"
@ -61,12 +66,16 @@ const selectCurrentStateSQL = "" +
const selectJoinedUsersSQL = "" + const selectJoinedUsersSQL = "" +
"SELECT room_id, state_key FROM current_room_state WHERE type = 'm.room.member' AND membership = 'join'" "SELECT room_id, state_key FROM current_room_state WHERE type = 'm.room.member' AND membership = 'join'"
const selectEventsWithEventIDsSQL = "" +
"SELECT added_at, event_json FROM current_room_state WHERE event_id = ANY($1)"
type currentRoomStateStatements struct { type currentRoomStateStatements struct {
upsertRoomStateStmt *sql.Stmt upsertRoomStateStmt *sql.Stmt
deleteRoomStateByEventIDStmt *sql.Stmt deleteRoomStateByEventIDStmt *sql.Stmt
selectRoomIDsWithMembershipStmt *sql.Stmt selectRoomIDsWithMembershipStmt *sql.Stmt
selectCurrentStateStmt *sql.Stmt selectCurrentStateStmt *sql.Stmt
selectJoinedUsersStmt *sql.Stmt selectJoinedUsersStmt *sql.Stmt
selectEventsWithEventIDsStmt *sql.Stmt
} }
func (s *currentRoomStateStatements) prepare(db *sql.DB) (err error) { func (s *currentRoomStateStatements) prepare(db *sql.DB) (err error) {
@ -89,6 +98,9 @@ func (s *currentRoomStateStatements) prepare(db *sql.DB) (err error) {
if s.selectJoinedUsersStmt, err = db.Prepare(selectJoinedUsersSQL); err != nil { if s.selectJoinedUsersStmt, err = db.Prepare(selectJoinedUsersSQL); err != nil {
return return
} }
if s.selectEventsWithEventIDsStmt, err = db.Prepare(selectEventsWithEventIDsSQL); err != nil {
return
}
return return
} }
@ -141,6 +153,33 @@ func (s *currentRoomStateStatements) selectCurrentState(txn *sql.Tx, roomID stri
} }
defer rows.Close() defer rows.Close()
return rowsToEvents(rows)
}
func (s *currentRoomStateStatements) deleteRoomStateByEventID(txn *sql.Tx, eventID string) error {
_, err := txn.Stmt(s.deleteRoomStateByEventIDStmt).Exec(eventID)
return err
}
func (s *currentRoomStateStatements) upsertRoomState(
txn *sql.Tx, event gomatrixserverlib.Event, membership *string, addedAt int64,
) error {
_, err := txn.Stmt(s.upsertRoomStateStmt).Exec(
event.RoomID(), event.EventID(), event.Type(), *event.StateKey(), event.JSON(), membership, addedAt,
)
return err
}
func (s *currentRoomStateStatements) selectEventsWithEventIDs(txn *sql.Tx, eventIDs []string) ([]streamEvent, error) {
rows, err := txn.Stmt(s.selectEventsWithEventIDsStmt).Query(pq.StringArray(eventIDs))
if err != nil {
return nil, err
}
defer rows.Close()
return rowsToStreamEvents(rows)
}
func rowsToEvents(rows *sql.Rows) ([]gomatrixserverlib.Event, error) {
var result []gomatrixserverlib.Event var result []gomatrixserverlib.Event
for rows.Next() { for rows.Next() {
var eventBytes []byte var eventBytes []byte
@ -156,15 +195,3 @@ func (s *currentRoomStateStatements) selectCurrentState(txn *sql.Tx, roomID stri
} }
return result, nil return result, nil
} }
func (s *currentRoomStateStatements) deleteRoomStateByEventID(txn *sql.Tx, eventID string) error {
_, err := txn.Stmt(s.deleteRoomStateByEventIDStmt).Exec(eventID)
return err
}
func (s *currentRoomStateStatements) upsertRoomState(txn *sql.Tx, event gomatrixserverlib.Event, membership *string) error {
_, err := txn.Stmt(s.upsertRoomStateStmt).Exec(
event.RoomID(), event.EventID(), event.Type(), *event.StateKey(), event.JSON(), membership,
)
return err
}

View File

@ -16,7 +16,6 @@ package storage
import ( import (
"database/sql" "database/sql"
"fmt"
log "github.com/Sirupsen/logrus" log "github.com/Sirupsen/logrus"
"github.com/lib/pq" "github.com/lib/pq"
@ -193,7 +192,7 @@ func (s *outputRoomEventsStatements) selectRecentEvents(
return nil, err return nil, err
} }
defer rows.Close() defer rows.Close()
events, err := rowsToEvents(rows) events, err := rowsToStreamEvents(rows)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -205,23 +204,19 @@ func (s *outputRoomEventsStatements) selectRecentEvents(
// Events returns the events for the given event IDs. Returns an error if any one of the event IDs given are missing // Events returns the events for the given event IDs. Returns an error if any one of the event IDs given are missing
// from the database. // from the database.
func (s *outputRoomEventsStatements) selectEvents(txn *sql.Tx, eventIDs []string) ([]streamEvent, error) { func (s *outputRoomEventsStatements) selectEvents(txn *sql.Tx, eventIDs []string) ([]streamEvent, error) {
rows, err := txn.Stmt(s.selectEventsStmt).Query(pq.StringArray(eventIDs)) stmt := s.selectEventsStmt
if txn != nil {
stmt = txn.Stmt(stmt)
}
rows, err := stmt.Query(pq.StringArray(eventIDs))
if err != nil { if err != nil {
return nil, err return nil, err
} }
defer rows.Close() defer rows.Close()
result, err := rowsToEvents(rows) return rowsToStreamEvents(rows)
if err != nil {
return nil, err
}
if len(result) != len(eventIDs) {
return nil, fmt.Errorf("failed to map all event IDs to events: (got %d, wanted %d)", len(result), len(eventIDs))
}
return result, nil
} }
func rowsToEvents(rows *sql.Rows) ([]streamEvent, error) { func rowsToStreamEvents(rows *sql.Rows) ([]streamEvent, error) {
var result []streamEvent var result []streamEvent
for rows.Next() { for rows.Next() {
var ( var (

View File

@ -17,6 +17,7 @@ package storage
import ( import (
"database/sql" "database/sql"
"encoding/json" "encoding/json"
"fmt"
// Import the postgres database driver. // Import the postgres database driver.
_ "github.com/lib/pq" _ "github.com/lib/pq"
"github.com/matrix-org/dendrite/clientapi/events" "github.com/matrix-org/dendrite/clientapi/events"
@ -75,10 +76,24 @@ func (d *SyncServerDatabase) AllJoinedUsersInRooms() (map[string][]string, error
return d.roomstate.selectJoinedUsers() return d.roomstate.selectJoinedUsers()
} }
// Events lookups a list of event by their event ID.
// Returns a list of events matching the requested IDs found in the database.
// If an event is not found in the database then it will be omitted from the list.
// Returns an error if there was a problem talking with the database
func (d *SyncServerDatabase) Events(eventIDs []string) ([]gomatrixserverlib.Event, error) {
streamEvents, err := d.events.selectEvents(nil, eventIDs)
if err != nil {
return nil, err
}
return streamEventsToEvents(streamEvents), nil
}
// WriteEvent into the database. It is not safe to call this function from multiple goroutines, as it would create races // WriteEvent into the database. It is not safe to call this function from multiple goroutines, as it would create races
// when generating the stream position for this event. Returns the sync stream position for the inserted event. // when generating the stream position for this event. Returns the sync stream position for the inserted event.
// Returns an error if there was a problem inserting this event. // Returns an error if there was a problem inserting this event.
func (d *SyncServerDatabase) WriteEvent(ev *gomatrixserverlib.Event, addStateEventIDs, removeStateEventIDs []string) (streamPos types.StreamPosition, returnErr error) { func (d *SyncServerDatabase) WriteEvent(
ev *gomatrixserverlib.Event, addStateEvents []gomatrixserverlib.Event, addStateEventIDs, removeStateEventIDs []string,
) (streamPos types.StreamPosition, returnErr error) {
returnErr = runTransaction(d.db, func(txn *sql.Tx) error { returnErr = runTransaction(d.db, func(txn *sql.Tx) error {
var err error var err error
pos, err := d.events.insertEvent(txn, ev, addStateEventIDs, removeStateEventIDs) pos, err := d.events.insertEvent(txn, ev, addStateEventIDs, removeStateEventIDs)
@ -87,31 +102,19 @@ func (d *SyncServerDatabase) WriteEvent(ev *gomatrixserverlib.Event, addStateEve
} }
streamPos = types.StreamPosition(pos) streamPos = types.StreamPosition(pos)
if len(addStateEventIDs) == 0 && len(removeStateEventIDs) == 0 { if len(addStateEvents) == 0 && len(removeStateEventIDs) == 0 {
// Nothing to do, the event may have just been a message event. // Nothing to do, the event may have just been a message event.
return nil return nil
} }
// Update the current room state based on the added/removed state event IDs. return d.updateRoomState(txn, removeStateEventIDs, addStateEvents, streamPos)
// In the common case there is a single added event ID which is the state event itself, assuming `ev` is a state event.
// However, conflict resolution may result in there being different events being added, or even some removed.
if len(removeStateEventIDs) == 0 && len(addStateEventIDs) == 1 && addStateEventIDs[0] == ev.EventID() {
// common case
return d.updateRoomState(txn, nil, []gomatrixserverlib.Event{*ev})
}
// uncommon case: we need to fetch the full event for each event ID mentioned, then update room state
added, err := d.events.selectEvents(txn, addStateEventIDs)
if err != nil {
return err
}
return d.updateRoomState(txn, removeStateEventIDs, streamEventsToEvents(added))
}) })
return return
} }
func (d *SyncServerDatabase) updateRoomState(txn *sql.Tx, removedEventIDs []string, addedEvents []gomatrixserverlib.Event) error { func (d *SyncServerDatabase) updateRoomState(
txn *sql.Tx, removedEventIDs []string, addedEvents []gomatrixserverlib.Event, streamPos types.StreamPosition,
) error {
// remove first, then add, as we do not ever delete state, but do replace state which is a remove followed by an add. // remove first, then add, as we do not ever delete state, but do replace state which is a remove followed by an add.
for _, eventID := range removedEventIDs { for _, eventID := range removedEventIDs {
if err := d.roomstate.deleteRoomStateByEventID(txn, eventID); err != nil { if err := d.roomstate.deleteRoomStateByEventID(txn, eventID); err != nil {
@ -132,7 +135,7 @@ func (d *SyncServerDatabase) updateRoomState(txn *sql.Tx, removedEventIDs []stri
} }
membership = &memberContent.Membership membership = &memberContent.Membership
} }
if err := d.roomstate.upsertRoomState(txn, event, membership); err != nil { if err := d.roomstate.upsertRoomState(txn, event, membership, int64(streamPos)); err != nil {
return err return err
} }
} }
@ -310,7 +313,7 @@ func (d *SyncServerDatabase) fetchStateEvents(txn *sql.Tx, roomIDToEventIDSet ma
for _, missingEvIDs := range missingEvents { for _, missingEvIDs := range missingEvents {
allMissingEventIDs = append(allMissingEventIDs, missingEvIDs...) allMissingEventIDs = append(allMissingEventIDs, missingEvIDs...)
} }
evs, err := d.events.selectEvents(txn, allMissingEventIDs) evs, err := d.fetchMissingStateEvents(txn, allMissingEventIDs)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -323,6 +326,45 @@ func (d *SyncServerDatabase) fetchStateEvents(txn *sql.Tx, roomIDToEventIDSet ma
return stateBetween, nil return stateBetween, nil
} }
func (d *SyncServerDatabase) fetchMissingStateEvents(txn *sql.Tx, eventIDs []string) ([]streamEvent, error) {
// Fetch from the events table first so we pick up the stream ID for the
// event.
events, err := d.events.selectEvents(txn, eventIDs)
if err != nil {
return nil, err
}
have := map[string]bool{}
for _, event := range events {
have[event.EventID()] = true
}
var missing []string
for _, eventID := range eventIDs {
if !have[eventID] {
missing = append(missing, eventID)
}
}
if len(missing) == 0 {
return events, nil
}
// If they are missing from the events table then they should be state
// events that we received from outside the main event stream.
// These should be in the room state table.
stateEvents, err := d.roomstate.selectEventsWithEventIDs(txn, missing)
if err != nil {
return nil, err
}
if len(stateEvents) != len(missing) {
return nil, fmt.Errorf("failed to map all event IDs to events: (got %d, wanted %d)", len(stateEvents), len(missing))
}
for _, e := range stateEvents {
events = append(events, e)
}
return events, nil
}
func (d *SyncServerDatabase) getStateDeltas(txn *sql.Tx, fromPos, toPos types.StreamPosition, userID string) ([]stateDelta, error) { func (d *SyncServerDatabase) getStateDeltas(txn *sql.Tx, fromPos, toPos types.StreamPosition, userID string) ([]stateDelta, error) {
// Implement membership change algorithm: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L821 // Implement membership change algorithm: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L821
// - Get membership list changes for this user in this sync response // - Get membership list changes for this user in this sync response