From a572f4db034e13aa38e122d1c4233b15e2356494 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Mon, 7 Feb 2022 19:10:01 +0000 Subject: [PATCH] Fix bugs that could wedge rooms (#2154) * Don't flake so badly for rejected events * Moar * Fix panic * Don't count rejected events as missing * Don't treat rejected events without state as missing * Revert "Don't count rejected events as missing" This reverts commit 4b6139b62eb91ba059b47415b0275964b37d9b43. * Missing events should be KindOld * If we have state, use it, regardless of memberships which could be stale now * Fetch missing state for KindOld too * Tweak the condition again * Clean up a bit * Use room updater to get latest events in a race-free way * Return the correct error * Improve errors --- roomserver/internal/input/input_events.go | 14 +++----- roomserver/internal/input/input_missing.go | 41 +++++++++++----------- roomserver/internal/query/query.go | 3 +- roomserver/types/types.go | 6 ++++ 4 files changed, 33 insertions(+), 31 deletions(-) diff --git a/roomserver/internal/input/input_events.go b/roomserver/internal/input/input_events.go index 0ca5c31a..85189e47 100644 --- a/roomserver/internal/input/input_events.go +++ b/roomserver/internal/input/input_events.go @@ -297,7 +297,10 @@ func (r *Inputer) processRoomEvent( "soft_fail": softfail, "missing_prev": missingPrev, }).Warn("Stored rejected event") - return commitTransaction, rejectionErr + if rejectionErr != nil { + return commitTransaction, types.RejectedError(rejectionErr.Error()) + } + return commitTransaction, nil } switch input.Kind { @@ -483,16 +486,7 @@ func (r *Inputer) calculateAndSetState( roomState := state.NewStateResolution(updater, roomInfo) if input.HasState { - // Check here if we think we're in the room already. stateAtEvent.Overwrite = true - var joinEventNIDs []types.EventNID - // Request join memberships only for local users only. - if joinEventNIDs, err = updater.GetMembershipEventNIDsForRoom(ctx, roomInfo.RoomNID, true, true); err == nil { - // If we have no local users that are joined to the room then any state about - // the room that we have is quite possibly out of date. Therefore in that case - // we should overwrite it rather than merge it. - stateAtEvent.Overwrite = len(joinEventNIDs) == 0 - } // We've been told what the state at the event is so we don't need to calculate it. // Check that those state events are in the database and store the state. diff --git a/roomserver/internal/input/input_missing.go b/roomserver/internal/input/input_missing.go index 4d330666..7a72b038 100644 --- a/roomserver/internal/input/input_missing.go +++ b/roomserver/internal/input/input_missing.go @@ -12,6 +12,7 @@ import ( "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/internal/query" "github.com/matrix-org/dendrite/roomserver/storage/shared" + "github.com/matrix-org/dendrite/roomserver/types" "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/util" "github.com/sirupsen/logrus" @@ -75,13 +76,15 @@ func (t *missingStateReq) processEventWithMissingState( // in the gap in the DAG for _, newEvent := range newEvents { _, err = t.inputer.processRoomEvent(ctx, t.db, &api.InputRoomEvent{ - Kind: api.KindNew, + Kind: api.KindOld, Event: newEvent.Headered(roomVersion), Origin: t.origin, SendAsServer: api.DoNotSendToOtherServers, }) if err != nil { - return fmt.Errorf("t.inputer.processRoomEvent: %w", err) + if _, ok := err.(types.RejectedError); !ok { + return fmt.Errorf("t.inputer.processRoomEvent (filling gap): %w", err) + } } } return nil @@ -183,8 +186,11 @@ func (t *missingStateReq) processEventWithMissingState( } // TODO: we could do this concurrently? for _, ire := range outlierRoomEvents { - if _, err = t.inputer.processRoomEvent(ctx, t.db, &ire); err != nil { - return fmt.Errorf("t.inputer.processRoomEvent[outlier]: %w", err) + _, err = t.inputer.processRoomEvent(ctx, t.db, &ire) + if err != nil { + if _, ok := err.(types.RejectedError); !ok { + return fmt.Errorf("t.inputer.processRoomEvent (outlier): %w", err) + } } } @@ -205,7 +211,9 @@ func (t *missingStateReq) processEventWithMissingState( SendAsServer: api.DoNotSendToOtherServers, }) if err != nil { - return fmt.Errorf("t.inputer.processRoomEvent: %w", err) + if _, ok := err.(types.RejectedError); !ok { + return fmt.Errorf("t.inputer.processRoomEvent (backward extremity): %w", err) + } } // Then send all of the newer backfilled events, of which will all be newer @@ -220,7 +228,9 @@ func (t *missingStateReq) processEventWithMissingState( SendAsServer: api.DoNotSendToOtherServers, }) if err != nil { - return fmt.Errorf("t.inputer.processRoomEvent: %w", err) + if _, ok := err.(types.RejectedError); !ok { + return fmt.Errorf("t.inputer.processRoomEvent (fast forward): %w", err) + } } } @@ -395,20 +405,11 @@ retryAllowedState: // without `e`. If `isGapFilled=false` then `newEvents` contains the response to /get_missing_events func (t *missingStateReq) getMissingEvents(ctx context.Context, e *gomatrixserverlib.Event, roomVersion gomatrixserverlib.RoomVersion) (newEvents []*gomatrixserverlib.Event, isGapFilled bool, err error) { logger := util.GetLogger(ctx).WithField("event_id", e.EventID()).WithField("room_id", e.RoomID()) - needed := gomatrixserverlib.StateNeededForAuth([]*gomatrixserverlib.Event{e}) - // query latest events (our trusted forward extremities) - req := api.QueryLatestEventsAndStateRequest{ - RoomID: e.RoomID(), - StateToFetch: needed.Tuples(), - } - var res api.QueryLatestEventsAndStateResponse - if err = t.queryer.QueryLatestEventsAndState(ctx, &req, &res); err != nil { - logger.WithError(err).Warn("Failed to query latest events") - return nil, false, err - } - latestEvents := make([]string, len(res.LatestEvents)) - for i, ev := range res.LatestEvents { - latestEvents[i] = res.LatestEvents[i].EventID + + latest := t.db.LatestEvents() + latestEvents := make([]string, len(latest)) + for i, ev := range latest { + latestEvents[i] = ev.EventID t.hadEvent(ev.EventID) } diff --git a/roomserver/internal/query/query.go b/roomserver/internal/query/query.go index 6b4cb581..84553303 100644 --- a/roomserver/internal/query/query.go +++ b/roomserver/internal/query/query.go @@ -149,7 +149,8 @@ func (r *Queryer) QueryMissingAuthPrevEvents( } for _, prevEventID := range request.PrevEventIDs { - if state, err := r.DB.StateAtEventIDs(ctx, []string{prevEventID}); err != nil || len(state) == 0 { + state, err := r.DB.StateAtEventIDs(ctx, []string{prevEventID}) + if err != nil || len(state) == 0 { response.MissingPrevEventIDs = append(response.MissingPrevEventIDs, prevEventID) } } diff --git a/roomserver/types/types.go b/roomserver/types/types.go index d7e03ad6..5e1eebe9 100644 --- a/roomserver/types/types.go +++ b/roomserver/types/types.go @@ -209,6 +209,12 @@ type MissingEventError string func (e MissingEventError) Error() string { return string(e) } +// A RejectedError is returned when an event is stored as rejected. The error +// contains the reason why. +type RejectedError string + +func (e RejectedError) Error() string { return string(e) } + // RoomInfo contains metadata about a room type RoomInfo struct { RoomNID RoomNID