From 6e63df1d9a3eadf924d518a1a02f04dfd03ad6b1 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Mon, 19 Oct 2020 14:59:13 +0100 Subject: [PATCH] KindOld (#1531) * Add KindOld * Don't process latest events/memberships for old events * Allow federationsender to ignore duplicate key entries when LatestEventIDs is duplicated by RS output events * Signal to downstream components if an event has become a forward extremity * Don't exclude from sync * Soft-fail checks on KindNew * Don't run the latest events updater at all for KindOld * Don't make federation sender change after all * Kind in federation sender join * Don't send isForwardExtremity * Fix syncapi * Update comments * Fix SendEventWithState * Update sytest-whitelist * Generate old output events * Sync API consumes old room events * Update comments --- clientapi/routing/createroom.go | 1 + clientapi/routing/membership.go | 1 + clientapi/routing/profile.go | 4 +- clientapi/routing/redaction.go | 2 +- clientapi/routing/sendevent.go | 1 + clientapi/threepid/invites.go | 1 + federationapi/routing/join.go | 1 + federationapi/routing/leave.go | 1 + federationapi/routing/send.go | 3 ++ federationapi/routing/threepid.go | 3 +- federationsender/internal/perform.go | 1 + roomserver/api/input.go | 16 ++++++-- roomserver/api/output.go | 18 +++++++++ roomserver/api/wrapper.go | 14 ++++--- roomserver/internal/input/input_events.go | 39 +++++++++++++------ .../internal/input/input_latest_events.go | 11 +++--- roomserver/roomserver_test.go | 4 +- syncapi/consumers/roomserver.go | 37 +++++++++++++++++- sytest-whitelist | 1 + 19 files changed, 125 insertions(+), 34 deletions(-) diff --git a/clientapi/routing/createroom.go b/clientapi/routing/createroom.go index 9655339c..cff3c981 100644 --- a/clientapi/routing/createroom.go +++ b/clientapi/routing/createroom.go @@ -344,6 +344,7 @@ func createRoom( if err = roomserverAPI.SendEventWithState( req.Context(), rsAPI, + roomserverAPI.KindNew, &gomatrixserverlib.RespState{ StateEvents: accumulated, AuthEvents: accumulated, diff --git a/clientapi/routing/membership.go b/clientapi/routing/membership.go index 88cb2364..fe079557 100644 --- a/clientapi/routing/membership.go +++ b/clientapi/routing/membership.go @@ -76,6 +76,7 @@ func sendMembership(ctx context.Context, accountDB accounts.Database, device *us if err = roomserverAPI.SendEvents( ctx, rsAPI, + api.KindNew, []gomatrixserverlib.HeaderedEvent{event.Event.Headered(roomVer)}, cfg.Matrix.ServerName, nil, diff --git a/clientapi/routing/profile.go b/clientapi/routing/profile.go index 60669a0c..bbe35fac 100644 --- a/clientapi/routing/profile.go +++ b/clientapi/routing/profile.go @@ -170,7 +170,7 @@ func SetAvatarURL( return jsonerror.InternalServerError() } - if err := api.SendEvents(req.Context(), rsAPI, events, cfg.Matrix.ServerName, nil); err != nil { + if err := api.SendEvents(req.Context(), rsAPI, api.KindNew, events, cfg.Matrix.ServerName, nil); err != nil { util.GetLogger(req.Context()).WithError(err).Error("SendEvents failed") return jsonerror.InternalServerError() } @@ -288,7 +288,7 @@ func SetDisplayName( return jsonerror.InternalServerError() } - if err := api.SendEvents(req.Context(), rsAPI, events, cfg.Matrix.ServerName, nil); err != nil { + if err := api.SendEvents(req.Context(), rsAPI, api.KindNew, events, cfg.Matrix.ServerName, nil); err != nil { util.GetLogger(req.Context()).WithError(err).Error("SendEvents failed") return jsonerror.InternalServerError() } diff --git a/clientapi/routing/redaction.go b/clientapi/routing/redaction.go index 9701685e..266c0aff 100644 --- a/clientapi/routing/redaction.go +++ b/clientapi/routing/redaction.go @@ -121,7 +121,7 @@ func SendRedaction( JSON: jsonerror.NotFound("Room does not exist"), } } - if err = roomserverAPI.SendEvents(context.Background(), rsAPI, []gomatrixserverlib.HeaderedEvent{*e}, cfg.Matrix.ServerName, nil); err != nil { + if err = roomserverAPI.SendEvents(context.Background(), rsAPI, api.KindNew, []gomatrixserverlib.HeaderedEvent{*e}, cfg.Matrix.ServerName, nil); err != nil { util.GetLogger(req.Context()).WithError(err).Errorf("failed to SendEvents") return jsonerror.InternalServerError() } diff --git a/clientapi/routing/sendevent.go b/clientapi/routing/sendevent.go index 9744a564..1303663f 100644 --- a/clientapi/routing/sendevent.go +++ b/clientapi/routing/sendevent.go @@ -92,6 +92,7 @@ func SendEvent( // event ID in case of duplicate transaction is discarded if err := api.SendEvents( req.Context(), rsAPI, + api.KindNew, []gomatrixserverlib.HeaderedEvent{ e.Headered(verRes.RoomVersion), }, diff --git a/clientapi/threepid/invites.go b/clientapi/threepid/invites.go index b9575a28..272d3407 100644 --- a/clientapi/threepid/invites.go +++ b/clientapi/threepid/invites.go @@ -361,6 +361,7 @@ func emit3PIDInviteEvent( return api.SendEvents( ctx, rsAPI, + api.KindNew, []gomatrixserverlib.HeaderedEvent{ (*event).Headered(queryRes.RoomVersion), }, diff --git a/federationapi/routing/join.go b/federationapi/routing/join.go index c637116f..12f20536 100644 --- a/federationapi/routing/join.go +++ b/federationapi/routing/join.go @@ -290,6 +290,7 @@ func SendJoin( if !alreadyJoined { if err = api.SendEvents( httpReq.Context(), rsAPI, + api.KindNew, []gomatrixserverlib.HeaderedEvent{ event.Headered(stateAndAuthChainResponse.RoomVersion), }, diff --git a/federationapi/routing/leave.go b/federationapi/routing/leave.go index e16dfcc2..fb81d931 100644 --- a/federationapi/routing/leave.go +++ b/federationapi/routing/leave.go @@ -256,6 +256,7 @@ func SendLeave( // the room, so set SendAsServer to cfg.Matrix.ServerName if err = api.SendEvents( httpReq.Context(), rsAPI, + api.KindNew, []gomatrixserverlib.HeaderedEvent{ event.Headered(verRes.RoomVersion), }, diff --git a/federationapi/routing/send.go b/federationapi/routing/send.go index 783fdc3b..76dc3a2e 100644 --- a/federationapi/routing/send.go +++ b/federationapi/routing/send.go @@ -403,6 +403,7 @@ func (t *txnReq) processEvent(ctx context.Context, e gomatrixserverlib.Event) er return api.SendEvents( context.Background(), t.rsAPI, + api.KindNew, []gomatrixserverlib.HeaderedEvent{ e.Headered(stateResp.RoomVersion), }, @@ -586,6 +587,7 @@ func (t *txnReq) processEventWithMissingState(ctx context.Context, e gomatrixser err = api.SendEventWithState( context.Background(), t.rsAPI, + api.KindOld, resolvedState, backwardsExtremity.Headered(roomVersion), t.haveEventIDs(), @@ -605,6 +607,7 @@ func (t *txnReq) processEventWithMissingState(ctx context.Context, e gomatrixser if err = api.SendEvents( context.Background(), t.rsAPI, + api.KindOld, append(headeredNewEvents, e.Headered(roomVersion)), api.DoNotSendToOtherServers, nil, diff --git a/federationapi/routing/threepid.go b/federationapi/routing/threepid.go index ec6cc148..4db5273a 100644 --- a/federationapi/routing/threepid.go +++ b/federationapi/routing/threepid.go @@ -89,7 +89,7 @@ func CreateInvitesFrom3PIDInvites( } // Send all the events - if err := api.SendEvents(req.Context(), rsAPI, evs, cfg.Matrix.ServerName, nil); err != nil { + if err := api.SendEvents(req.Context(), rsAPI, api.KindNew, evs, cfg.Matrix.ServerName, nil); err != nil { util.GetLogger(req.Context()).WithError(err).Error("SendEvents failed") return jsonerror.InternalServerError() } @@ -174,6 +174,7 @@ func ExchangeThirdPartyInvite( // Send the event to the roomserver if err = api.SendEvents( httpReq.Context(), rsAPI, + api.KindNew, []gomatrixserverlib.HeaderedEvent{ signedEvent.Event.Headered(verRes.RoomVersion), }, diff --git a/federationsender/internal/perform.go b/federationsender/internal/perform.go index 254883e6..3904ab85 100644 --- a/federationsender/internal/perform.go +++ b/federationsender/internal/perform.go @@ -248,6 +248,7 @@ func (r *FederationSenderInternalAPI) performJoinUsingServer( // returned state to the roomserver to update our local view. if err = roomserverAPI.SendEventWithState( ctx, r.rsAPI, + roomserverAPI.KindNew, respState, event.Headered(respMakeJoin.RoomVersion), nil, diff --git a/roomserver/api/input.go b/roomserver/api/input.go index dd693203..e1a8afa0 100644 --- a/roomserver/api/input.go +++ b/roomserver/api/input.go @@ -21,17 +21,25 @@ import ( "github.com/matrix-org/gomatrixserverlib" ) +type Kind int + const ( // KindOutlier event fall outside the contiguous event graph. // We do not have the state for these events. // These events are state events used to authenticate other events. // They can become part of the contiguous event graph via backfill. - KindOutlier = 1 + KindOutlier Kind = iota + 1 // KindNew event extend the contiguous graph going forwards. // They usually don't need state, but may include state if the // there was a new event that references an event that we don't - // have a copy of. - KindNew = 2 + // have a copy of. New events will influence the fwd extremities + // of the room and output events will be generated as a result. + KindNew + // KindOld event extend the graph backwards, or fill gaps in + // history. They may or may not include state. They will not be + // considered for forward extremities, and output events will NOT + // be generated for them. + KindOld ) // DoNotSendToOtherServers tells us not to send the event to other matrix @@ -43,7 +51,7 @@ const DoNotSendToOtherServers = "" type InputRoomEvent struct { // Whether this event is new, backfilled or an outlier. // This controls how the event is processed. - Kind int `json:"kind"` + Kind Kind `json:"kind"` // The event JSON for the event to add. Event gomatrixserverlib.HeaderedEvent `json:"event"` // List of state event IDs that authenticate this event. diff --git a/roomserver/api/output.go b/roomserver/api/output.go index d57f3b04..9cb814a4 100644 --- a/roomserver/api/output.go +++ b/roomserver/api/output.go @@ -24,6 +24,8 @@ type OutputType string const ( // OutputTypeNewRoomEvent indicates that the event is an OutputNewRoomEvent OutputTypeNewRoomEvent OutputType = "new_room_event" + // OutputTypeOldRoomEvent indicates that the event is an OutputOldRoomEvent + OutputTypeOldRoomEvent OutputType = "old_room_event" // OutputTypeNewInviteEvent indicates that the event is an OutputNewInviteEvent OutputTypeNewInviteEvent OutputType = "new_invite_event" // OutputTypeRetireInviteEvent indicates that the event is an OutputRetireInviteEvent @@ -58,6 +60,8 @@ type OutputEvent struct { Type OutputType `json:"type"` // The content of event with type OutputTypeNewRoomEvent NewRoomEvent *OutputNewRoomEvent `json:"new_room_event,omitempty"` + // The content of event with type OutputTypeOldRoomEvent + OldRoomEvent *OutputOldRoomEvent `json:"old_room_event,omitempty"` // The content of event with type OutputTypeNewInviteEvent NewInviteEvent *OutputNewInviteEvent `json:"new_invite_event,omitempty"` // The content of event with type OutputTypeRetireInviteEvent @@ -178,6 +182,20 @@ func (ore *OutputNewRoomEvent) AddsState() []gomatrixserverlib.HeaderedEvent { return append(ore.AddStateEvents, ore.Event) } +// An OutputOldRoomEvent is written when the roomserver receives an old event. +// This will typically happen as a result of getting either missing events +// or backfilling. Downstream components may wish to send these events to +// clients when it is advantageous to do so, but with the consideration that +// the event is likely a historic event. +// +// Old events do not update forward extremities or the current room state, +// therefore they must not be treated as if they do. Downstream components +// should build their current room state up from OutputNewRoomEvents only. +type OutputOldRoomEvent struct { + // The Event. + Event gomatrixserverlib.HeaderedEvent `json:"event"` +} + // An OutputNewInviteEvent is written whenever an invite becomes active. // Invite events can be received outside of an existing room so have to be // tracked separately from the room events themselves. diff --git a/roomserver/api/wrapper.go b/roomserver/api/wrapper.go index a38c00df..9e821910 100644 --- a/roomserver/api/wrapper.go +++ b/roomserver/api/wrapper.go @@ -24,13 +24,14 @@ import ( // SendEvents to the roomserver The events are written with KindNew. func SendEvents( - ctx context.Context, rsAPI RoomserverInternalAPI, events []gomatrixserverlib.HeaderedEvent, + ctx context.Context, rsAPI RoomserverInternalAPI, + kind Kind, events []gomatrixserverlib.HeaderedEvent, sendAsServer gomatrixserverlib.ServerName, txnID *TransactionID, ) error { ires := make([]InputRoomEvent, len(events)) for i, event := range events { ires[i] = InputRoomEvent{ - Kind: KindNew, + Kind: kind, Event: event, AuthEventIDs: event.AuthEventIDs(), SendAsServer: string(sendAsServer), @@ -40,12 +41,13 @@ func SendEvents( return SendInputRoomEvents(ctx, rsAPI, ires) } -// SendEventWithState writes an event with KindNew to the roomserver +// SendEventWithState writes an event with the specified kind to the roomserver // with the state at the event as KindOutlier before it. Will not send any event that is // marked as `true` in haveEventIDs func SendEventWithState( - ctx context.Context, rsAPI RoomserverInternalAPI, state *gomatrixserverlib.RespState, - event gomatrixserverlib.HeaderedEvent, haveEventIDs map[string]bool, + ctx context.Context, rsAPI RoomserverInternalAPI, kind Kind, + state *gomatrixserverlib.RespState, event gomatrixserverlib.HeaderedEvent, + haveEventIDs map[string]bool, ) error { outliers, err := state.Events() if err != nil { @@ -70,7 +72,7 @@ func SendEventWithState( } ires = append(ires, InputRoomEvent{ - Kind: KindNew, + Kind: kind, Event: event, AuthEventIDs: event.AuthEventIDs(), HasState: true, diff --git a/roomserver/internal/input/input_events.go b/roomserver/internal/input/input_events.go index 11334159..67031609 100644 --- a/roomserver/internal/input/input_events.go +++ b/roomserver/internal/input/input_events.go @@ -119,7 +119,7 @@ func (r *Inputer) processRoomEvent( // We haven't calculated a state for this event yet. // Lets calculate one. err = r.calculateAndSetState(ctx, input, *roomInfo, &stateAtEvent, event, isRejected) - if err != nil { + if err != nil && input.Kind != api.KindOld { return "", fmt.Errorf("r.calculateAndSetState: %w", err) } } @@ -136,16 +136,31 @@ func (r *Inputer) processRoomEvent( return event.EventID(), rejectionErr } - if err = r.updateLatestEvents( - ctx, // context - roomInfo, // room info for the room being updated - stateAtEvent, // state at event (below) - event, // event - input.SendAsServer, // send as server - input.TransactionID, // transaction ID - input.HasState, // rewrites state? - ); err != nil { - return "", fmt.Errorf("r.updateLatestEvents: %w", err) + switch input.Kind { + case api.KindNew: + if err = r.updateLatestEvents( + ctx, // context + roomInfo, // room info for the room being updated + stateAtEvent, // state at event (below) + event, // event + input.SendAsServer, // send as server + input.TransactionID, // transaction ID + input.HasState, // rewrites state? + ); err != nil { + return "", fmt.Errorf("r.updateLatestEvents: %w", err) + } + case api.KindOld: + err = r.WriteOutputEvents(event.RoomID(), []api.OutputEvent{ + { + Type: api.OutputTypeOldRoomEvent, + OldRoomEvent: &api.OutputOldRoomEvent{ + Event: headered, + }, + }, + }) + if err != nil { + return "", fmt.Errorf("r.WriteOutputEvents (old): %w", err) + } } // processing this event resulted in an event (which may not be the one we're processing) @@ -163,7 +178,7 @@ func (r *Inputer) processRoomEvent( }, }) if err != nil { - return "", fmt.Errorf("r.WriteOutputEvents: %w", err) + return "", fmt.Errorf("r.WriteOutputEvents (redactions): %w", err) } } diff --git a/roomserver/internal/input/input_latest_events.go b/roomserver/internal/input/input_latest_events.go index ca5d214d..5adcd087 100644 --- a/roomserver/internal/input/input_latest_events.go +++ b/roomserver/internal/input/input_latest_events.go @@ -164,8 +164,7 @@ func (u *latestEventsUpdater) doUpdateLatestEvents() error { return fmt.Errorf("u.api.updateMemberships: %w", err) } - var update *api.OutputEvent - update, err = u.makeOutputNewRoomEvent() + update, err := u.makeOutputNewRoomEvent() if err != nil { return fmt.Errorf("u.makeOutputNewRoomEvent: %w", err) } @@ -259,6 +258,8 @@ func (u *latestEventsUpdater) latestState() error { return nil } +// calculateLatest works out the new set of forward extremities. Returns +// true if the new event is included in those extremites, false otherwise. func (u *latestEventsUpdater) calculateLatest( oldLatest []types.StateAtEventAndReference, newEvent types.StateAtEventAndReference, @@ -326,7 +327,6 @@ func (u *latestEventsUpdater) makeOutputNewRoomEvent() (*api.OutputEvent, error) if err != nil { return nil, err } - for _, entry := range u.added { ore.AddsStateEventIDs = append(ore.AddsStateEventIDs, eventIDMap[entry.EventNID]) } @@ -339,13 +339,14 @@ func (u *latestEventsUpdater) makeOutputNewRoomEvent() (*api.OutputEvent, error) for _, entry := range u.stateBeforeEventAdds { ore.StateBeforeAddsEventIDs = append(ore.StateBeforeAddsEventIDs, eventIDMap[entry.EventNID]) } + ore.SendAsServer = u.sendAsServer // include extra state events if they were added as nearly every downstream component will care about it // and we'd rather not have them all hit QueryEventsByID at the same time! if len(ore.AddsStateEventIDs) > 0 { - ore.AddStateEvents, err = u.extraEventsForIDs(u.roomInfo.RoomVersion, ore.AddsStateEventIDs) - if err != nil { + var err error + if ore.AddStateEvents, err = u.extraEventsForIDs(u.roomInfo.RoomVersion, ore.AddsStateEventIDs); err != nil { return nil, fmt.Errorf("failed to load add_state_events from db: %w", err) } } diff --git a/roomserver/roomserver_test.go b/roomserver/roomserver_test.go index 1b692a09..c8e60efa 100644 --- a/roomserver/roomserver_test.go +++ b/roomserver/roomserver_test.go @@ -191,7 +191,7 @@ func mustSendEvents(t *testing.T, ver gomatrixserverlib.RoomVersion, events []js t.Helper() rsAPI, dp := mustCreateRoomserverAPI(t) hevents := mustLoadRawEvents(t, ver, events) - if err := api.SendEvents(ctx, rsAPI, hevents, testOrigin, nil); err != nil { + if err := api.SendEvents(ctx, rsAPI, api.KindNew, hevents, testOrigin, nil); err != nil { t.Errorf("failed to SendEvents: %s", err) } return rsAPI, dp, hevents @@ -337,7 +337,7 @@ func TestOutputRewritesState(t *testing.T) { deleteDatabase() rsAPI, producer := mustCreateRoomserverAPI(t) defer deleteDatabase() - err := api.SendEvents(context.Background(), rsAPI, originalEvents, testOrigin, nil) + err := api.SendEvents(context.Background(), rsAPI, api.KindNew, originalEvents, testOrigin, nil) if err != nil { t.Fatalf("failed to send original events: %s", err) } diff --git a/syncapi/consumers/roomserver.go b/syncapi/consumers/roomserver.go index ca48c830..373baea5 100644 --- a/syncapi/consumers/roomserver.go +++ b/syncapi/consumers/roomserver.go @@ -97,6 +97,8 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error { } } return s.onNewRoomEvent(context.TODO(), *output.NewRoomEvent) + case api.OutputTypeOldRoomEvent: + return s.onOldRoomEvent(context.TODO(), *output.OldRoomEvent) case api.OutputTypeNewInviteEvent: return s.onNewInviteEvent(context.TODO(), *output.NewInviteEvent) case api.OutputTypeRetireInviteEvent: @@ -168,7 +170,40 @@ func (s *OutputRoomEventConsumer) onNewRoomEvent( log.ErrorKey: err, "add": msg.AddsStateEventIDs, "del": msg.RemovesStateEventIDs, - }).Panicf("roomserver output log: write event failure") + }).Panicf("roomserver output log: write new event failure") + return nil + } + + if pduPos, err = s.notifyJoinedPeeks(ctx, &ev, pduPos); err != nil { + logrus.WithError(err).Errorf("Failed to notifyJoinedPeeks for PDU pos %d", pduPos) + return err + } + + s.notifier.OnNewEvent(&ev, "", nil, types.NewStreamToken(pduPos, 0, nil)) + + return nil +} + +func (s *OutputRoomEventConsumer) onOldRoomEvent( + ctx context.Context, msg api.OutputOldRoomEvent, +) error { + ev := msg.Event + + pduPos, err := s.db.WriteEvent( + ctx, + &ev, + []gomatrixserverlib.HeaderedEvent{}, + []string{}, // adds no state + []string{}, // removes no state + nil, // no transaction + false, // not excluded from sync + ) + if err != nil { + // panic rather than continue with an inconsistent database + log.WithFields(log.Fields{ + "event": string(ev.JSON()), + log.ErrorKey: err, + }).Panicf("roomserver output log: write old event failure") return nil } diff --git a/sytest-whitelist b/sytest-whitelist index 2ba0a88b..2d032331 100644 --- a/sytest-whitelist +++ b/sytest-whitelist @@ -484,3 +484,4 @@ Users cannot kick users who have already left a room A prev_batch token from incremental sync can be used in the v1 messages API Event with an invalid signature in the send_join response should not cause room join to fail Inbound federation rejects typing notifications from wrong remote +Should not be able to take over the room by pretending there is no PL event