diff --git a/federationapi/routing/backfill.go b/federationapi/routing/backfill.go index 651a4a2d..1f46b240 100644 --- a/federationapi/routing/backfill.go +++ b/federationapi/routing/backfill.go @@ -69,9 +69,14 @@ func Backfill( // Populate the request. req := api.QueryBackfillRequest{ - RoomID: roomID, - EarliestEventsIDs: eIDs, - ServerName: request.Origin(), + RoomID: roomID, + // we don't know who the successors are for these events, which won't + // be a problem because we don't use that information when servicing /backfill requests, + // only when making them. TODO: Think of a better API shape + BackwardsExtremities: map[string][]string{ + "": eIDs, + }, + ServerName: request.Origin(), } if req.Limit, err = strconv.Atoi(limit); err != nil { util.GetLogger(httpReq.Context()).WithError(err).Error("strconv.Atoi failed") @@ -105,6 +110,12 @@ func Backfill( eventJSONs = append(eventJSONs, e.JSON()) } + // sytest wants these in reversed order, similar to /messages, so reverse them now. + for i := len(eventJSONs)/2 - 1; i >= 0; i-- { + opp := len(eventJSONs) - 1 - i + eventJSONs[i], eventJSONs[opp] = eventJSONs[opp], eventJSONs[i] + } + txn := gomatrixserverlib.Transaction{ Origin: cfg.Matrix.ServerName, PDUs: eventJSONs, diff --git a/roomserver/api/query.go b/roomserver/api/query.go index 9afc51f4..e92d2a99 100644 --- a/roomserver/api/query.go +++ b/roomserver/api/query.go @@ -21,6 +21,7 @@ import ( commonHTTP "github.com/matrix-org/dendrite/common/http" "github.com/matrix-org/gomatrixserverlib" + "github.com/matrix-org/util" opentracing "github.com/opentracing/opentracing-go" ) @@ -228,14 +229,24 @@ type QueryStateAndAuthChainResponse struct { type QueryBackfillRequest struct { // The room to backfill RoomID string `json:"room_id"` - // Events to start paginating from. - EarliestEventsIDs []string `json:"earliest_event_ids"` + // A map of backwards extremity event ID to a list of its prev_event IDs. + BackwardsExtremities map[string][]string `json:"backwards_extremities"` // The maximum number of events to retrieve. Limit int `json:"limit"` // The server interested in the events. ServerName gomatrixserverlib.ServerName `json:"server_name"` } +// PrevEventIDs returns the prev_event IDs of all backwards extremities, de-duplicated in a lexicographically sorted order. +func (r *QueryBackfillRequest) PrevEventIDs() []string { + var prevEventIDs []string + for _, pes := range r.BackwardsExtremities { + prevEventIDs = append(prevEventIDs, pes...) + } + prevEventIDs = util.UniqueStrings(prevEventIDs) + return prevEventIDs +} + // QueryBackfillResponse is a response to QueryBackfill. type QueryBackfillResponse struct { // Missing events, arbritrary order. diff --git a/roomserver/internal/query.go b/roomserver/internal/query.go index 8043746c..2d1c21c5 100644 --- a/roomserver/internal/query.go +++ b/roomserver/internal/query.go @@ -495,14 +495,8 @@ func (r *RoomserverInternalAPI) QueryBackfill( // defines the highest number of elements in the map below. visited := make(map[string]bool, request.Limit) - // The provided event IDs have already been seen by the request's emitter, - // and will be retrieved anyway, so there's no need to care about them if - // they appear in our exploration of the event tree. - for _, id := range request.EarliestEventsIDs { - visited[id] = true - } - - front = request.EarliestEventsIDs + // this will include these events which is what we want + front = request.PrevEventIDs() // Scan the event tree for events to send back. resultNIDs, err := r.scanEventTree(ctx, front, visited, request.Limit, request.ServerName) @@ -534,7 +528,7 @@ func (r *RoomserverInternalAPI) backfillViaFederation(ctx context.Context, req * if err != nil { return fmt.Errorf("backfillViaFederation: unknown room version for room %s : %w", req.RoomID, err) } - requester := newBackfillRequester(r.DB, r.FedClient, r.ServerName) + requester := newBackfillRequester(r.DB, r.FedClient, r.ServerName, req.BackwardsExtremities) // Request 100 items regardless of what the query asks for. // We don't want to go much higher than this. // We can't honour exactly the limit as some sytests rely on requesting more for tests to pass @@ -542,7 +536,7 @@ func (r *RoomserverInternalAPI) backfillViaFederation(ctx context.Context, req * // Specifically the test "Outbound federation can backfill events" events, err := gomatrixserverlib.RequestBackfill( ctx, requester, - r.KeyRing, req.RoomID, roomVer, req.EarliestEventsIDs, 100) + r.KeyRing, req.RoomID, roomVer, req.PrevEventIDs(), 100) if err != nil { return err } diff --git a/roomserver/internal/query_backfill.go b/roomserver/internal/query_backfill.go index 82f7238d..49e0af34 100644 --- a/roomserver/internal/query_backfill.go +++ b/roomserver/internal/query_backfill.go @@ -16,6 +16,7 @@ type backfillRequester struct { db storage.Database fedClient *gomatrixserverlib.FederationClient thisServer gomatrixserverlib.ServerName + bwExtrems map[string][]string // per-request state servers []gomatrixserverlib.ServerName @@ -23,13 +24,14 @@ type backfillRequester struct { eventIDMap map[string]gomatrixserverlib.Event } -func newBackfillRequester(db storage.Database, fedClient *gomatrixserverlib.FederationClient, thisServer gomatrixserverlib.ServerName) *backfillRequester { +func newBackfillRequester(db storage.Database, fedClient *gomatrixserverlib.FederationClient, thisServer gomatrixserverlib.ServerName, bwExtrems map[string][]string) *backfillRequester { return &backfillRequester{ db: db, fedClient: fedClient, thisServer: thisServer, eventIDToBeforeStateIDs: make(map[string][]string), eventIDMap: make(map[string]gomatrixserverlib.Event), + bwExtrems: bwExtrems, } } @@ -160,26 +162,44 @@ func (b *backfillRequester) StateBeforeEvent(ctx context.Context, roomVer gomatr // It returns a list of servers which can be queried for backfill requests. These servers // will be servers that are in the room already. The entries at the beginning are preferred servers // and will be tried first. An empty list will fail the request. -func (b *backfillRequester) ServersAtEvent(ctx context.Context, roomID, eventID string) (servers []gomatrixserverlib.ServerName) { +func (b *backfillRequester) ServersAtEvent(ctx context.Context, roomID, eventID string) []gomatrixserverlib.ServerName { + // eventID will be a prev_event ID of a backwards extremity, meaning we will not have a database entry for it. Instead, use + // its successor, so look it up. + successor := "" +FindSuccessor: + for sucID, prevEventIDs := range b.bwExtrems { + for _, pe := range prevEventIDs { + if pe == eventID { + successor = sucID + break FindSuccessor + } + } + } + if successor == "" { + logrus.WithField("event_id", eventID).Error("ServersAtEvent: failed to find successor of this event to determine room state") + return nil + } + eventID = successor + // getMembershipsBeforeEventNID requires a NID, so retrieving the NID for // the event is necessary. NIDs, err := b.db.EventNIDs(ctx, []string{eventID}) if err != nil { logrus.WithField("event_id", eventID).WithError(err).Error("ServersAtEvent: failed to get event NID for event") - return + return nil } stateEntries, err := stateBeforeEvent(ctx, b.db, NIDs[eventID]) if err != nil { logrus.WithField("event_id", eventID).WithError(err).Error("ServersAtEvent: failed to load state before event") - return + return nil } // possibly return all joined servers depending on history visiblity memberEventsFromVis, err := joinEventsFromHistoryVisibility(ctx, b.db, roomID, stateEntries) if err != nil { logrus.WithError(err).Error("ServersAtEvent: failed calculate servers from history visibility rules") - return + return nil } logrus.Infof("ServersAtEvent including %d current events from history visibility", len(memberEventsFromVis)) @@ -189,7 +209,7 @@ func (b *backfillRequester) ServersAtEvent(ctx context.Context, roomID, eventID memberEvents, err := getMembershipsAtState(ctx, b.db, stateEntries, true) if err != nil { logrus.WithField("event_id", eventID).WithError(err).Error("ServersAtEvent: failed to get memberships before event") - return + return nil } memberEvents = append(memberEvents, memberEventsFromVis...) @@ -198,6 +218,7 @@ func (b *backfillRequester) ServersAtEvent(ctx context.Context, roomID, eventID for _, event := range memberEvents { serverSet[event.Origin()] = true } + var servers []gomatrixserverlib.ServerName for server := range serverSet { if server == b.thisServer { continue @@ -205,7 +226,7 @@ func (b *backfillRequester) ServersAtEvent(ctx context.Context, roomID, eventID servers = append(servers, server) } b.servers = servers - return + return servers } // Backfill performs a backfill request to the given server. diff --git a/syncapi/routing/messages.go b/syncapi/routing/messages.go index 811188cc..88e16fe5 100644 --- a/syncapi/routing/messages.go +++ b/syncapi/routing/messages.go @@ -205,6 +205,7 @@ func (r *messagesReq) retrieveEvents() ( } var events []gomatrixserverlib.HeaderedEvent + util.GetLogger(r.ctx).WithField("start", start).WithField("end", end).Infof("Fetched %d events locally", len(streamEvents)) // There can be two reasons for streamEvents to be empty: either we've // reached the oldest event in the room (or the most recent one, depending @@ -373,13 +374,13 @@ func (e eventsByDepth) Less(i, j int) bool { // event, or if there is no remote homeserver to contact. // Returns an error if there was an issue with retrieving the list of servers in // the room or sending the request. -func (r *messagesReq) backfill(roomID string, fromEventIDs []string, limit int) ([]gomatrixserverlib.HeaderedEvent, error) { +func (r *messagesReq) backfill(roomID string, backwardsExtremities map[string][]string, limit int) ([]gomatrixserverlib.HeaderedEvent, error) { var res api.QueryBackfillResponse err := r.rsAPI.QueryBackfill(context.Background(), &api.QueryBackfillRequest{ - RoomID: roomID, - EarliestEventsIDs: fromEventIDs, - Limit: limit, - ServerName: r.cfg.Matrix.ServerName, + RoomID: roomID, + BackwardsExtremities: backwardsExtremities, + Limit: limit, + ServerName: r.cfg.Matrix.ServerName, }, &res) if err != nil { return nil, fmt.Errorf("QueryBackfill failed: %w", err) diff --git a/syncapi/storage/interface.go b/syncapi/storage/interface.go index eba008b3..121e94c7 100644 --- a/syncapi/storage/interface.go +++ b/syncapi/storage/interface.go @@ -94,9 +94,8 @@ type Database interface { GetEventsInTopologicalRange(ctx context.Context, from, to *types.TopologyToken, roomID string, limit int, backwardOrdering bool) (events []types.StreamEvent, err error) // EventPositionInTopology returns the depth and stream position of the given event. EventPositionInTopology(ctx context.Context, eventID string) (types.TopologyToken, error) - // BackwardExtremitiesForRoom returns the event IDs of all of the backward - // extremities we know of for a given room. - BackwardExtremitiesForRoom(ctx context.Context, roomID string) (backwardExtremities []string, err error) + // BackwardExtremitiesForRoom returns a map of backwards extremity event ID to a list of its prev_events. + BackwardExtremitiesForRoom(ctx context.Context, roomID string) (backwardExtremities map[string][]string, err error) // MaxTopologicalPosition returns the highest topological position for a given room. MaxTopologicalPosition(ctx context.Context, roomID string) (types.TopologyToken, error) // StreamEventsToEvents converts streamEvent to Event. If device is non-nil and diff --git a/syncapi/storage/postgres/backwards_extremities_table.go b/syncapi/storage/postgres/backwards_extremities_table.go index 3dd1b234..fa498a40 100644 --- a/syncapi/storage/postgres/backwards_extremities_table.go +++ b/syncapi/storage/postgres/backwards_extremities_table.go @@ -41,7 +41,7 @@ const insertBackwardExtremitySQL = "" + " ON CONFLICT DO NOTHING" const selectBackwardExtremitiesForRoomSQL = "" + - "SELECT DISTINCT event_id FROM syncapi_backward_extremities WHERE room_id = $1" + "SELECT event_id, prev_event_id FROM syncapi_backward_extremities WHERE room_id = $1" const deleteBackwardExtremitySQL = "" + "DELETE FROM syncapi_backward_extremities WHERE room_id = $1 AND prev_event_id = $2" @@ -79,23 +79,24 @@ func (s *backwardExtremitiesStatements) InsertsBackwardExtremity( func (s *backwardExtremitiesStatements) SelectBackwardExtremitiesForRoom( ctx context.Context, roomID string, -) (eventIDs []string, err error) { +) (bwExtrems map[string][]string, err error) { rows, err := s.selectBackwardExtremitiesForRoomStmt.QueryContext(ctx, roomID) if err != nil { return } defer common.CloseAndLogIfError(ctx, rows, "selectBackwardExtremitiesForRoom: rows.close() failed") + bwExtrems = make(map[string][]string) for rows.Next() { var eID string - if err = rows.Scan(&eID); err != nil { + var prevEventID string + if err = rows.Scan(&eID, &prevEventID); err != nil { return } - - eventIDs = append(eventIDs, eID) + bwExtrems[eID] = append(bwExtrems[eID], prevEventID) } - return eventIDs, rows.Err() + return bwExtrems, rows.Err() } func (s *backwardExtremitiesStatements) DeleteBackwardExtremity( diff --git a/syncapi/storage/shared/syncserver.go b/syncapi/storage/shared/syncserver.go index be1f9c7a..543e5b4a 100644 --- a/syncapi/storage/shared/syncserver.go +++ b/syncapi/storage/shared/syncserver.go @@ -310,6 +310,7 @@ func (d *Database) updateRoomState( } membership = &value } + if err := d.CurrentRoomState.UpsertRoomState(ctx, txn, event, membership, pduPosition); err != nil { return err } @@ -367,7 +368,7 @@ func (d *Database) SyncPosition(ctx context.Context) (tok types.StreamingToken, func (d *Database) BackwardExtremitiesForRoom( ctx context.Context, roomID string, -) (backwardExtremities []string, err error) { +) (backwardExtremities map[string][]string, err error) { return d.BackwardExtremities.SelectBackwardExtremitiesForRoom(ctx, roomID) } diff --git a/syncapi/storage/sqlite3/backwards_extremities_table.go b/syncapi/storage/sqlite3/backwards_extremities_table.go index a172f6bf..c5d9cae5 100644 --- a/syncapi/storage/sqlite3/backwards_extremities_table.go +++ b/syncapi/storage/sqlite3/backwards_extremities_table.go @@ -41,7 +41,7 @@ const insertBackwardExtremitySQL = "" + " ON CONFLICT (room_id, event_id, prev_event_id) DO NOTHING" const selectBackwardExtremitiesForRoomSQL = "" + - "SELECT DISTINCT event_id FROM syncapi_backward_extremities WHERE room_id = $1" + "SELECT event_id, prev_event_id FROM syncapi_backward_extremities WHERE room_id = $1" const deleteBackwardExtremitySQL = "" + "DELETE FROM syncapi_backward_extremities WHERE room_id = $1 AND prev_event_id = $2" @@ -79,23 +79,24 @@ func (s *backwardExtremitiesStatements) InsertsBackwardExtremity( func (s *backwardExtremitiesStatements) SelectBackwardExtremitiesForRoom( ctx context.Context, roomID string, -) (eventIDs []string, err error) { +) (bwExtrems map[string][]string, err error) { rows, err := s.selectBackwardExtremitiesForRoomStmt.QueryContext(ctx, roomID) if err != nil { return } defer common.CloseAndLogIfError(ctx, rows, "selectBackwardExtremitiesForRoom: rows.close() failed") + bwExtrems = make(map[string][]string) for rows.Next() { var eID string - if err = rows.Scan(&eID); err != nil { + var prevEventID string + if err = rows.Scan(&eID, &prevEventID); err != nil { return } - - eventIDs = append(eventIDs, eID) + bwExtrems[eID] = append(bwExtrems[eID], prevEventID) } - return eventIDs, rows.Err() + return bwExtrems, rows.Err() } func (s *backwardExtremitiesStatements) DeleteBackwardExtremity( diff --git a/syncapi/storage/tables/interface.go b/syncapi/storage/tables/interface.go index f31bdc2e..bc3b6941 100644 --- a/syncapi/storage/tables/interface.go +++ b/syncapi/storage/tables/interface.go @@ -89,8 +89,8 @@ type CurrentRoomState interface { type BackwardsExtremities interface { // InsertsBackwardExtremity inserts a new backwards extremity. InsertsBackwardExtremity(ctx context.Context, txn *sql.Tx, roomID, eventID string, prevEventID string) (err error) - // SelectBackwardExtremitiesForRoom retrieves all backwards extremities for the room. - SelectBackwardExtremitiesForRoom(ctx context.Context, roomID string) (eventIDs []string, err error) + // SelectBackwardExtremitiesForRoom retrieves all backwards extremities for the room, as a map of event_id to list of prev_event_ids. + SelectBackwardExtremitiesForRoom(ctx context.Context, roomID string) (bwExtrems map[string][]string, err error) // DeleteBackwardExtremity removes a backwards extremity for a room, if one existed. DeleteBackwardExtremity(ctx context.Context, txn *sql.Tx, roomID, knownEventID string) (err error) } diff --git a/sytest-whitelist b/sytest-whitelist index 4a8af13a..2e8205dc 100644 --- a/sytest-whitelist +++ b/sytest-whitelist @@ -280,3 +280,4 @@ Inbound federation can get public room list An event which redacts itself should be ignored A pair of events which redact each other should be ignored Outbound federation can backfill events +Inbound federation can backfill events