From a5a85c6a11f7ab6f1ab4c1d83e2ca1cf781b03b8 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Mon, 17 Aug 2020 17:33:19 +0100 Subject: [PATCH] Make PerformJoin responsible for sending invite to RS input (#1277) * Make PerformJoin send input membership event * Invite input room events in separate goroutine * Don't limit roomserver input events using request context * Synchronous input room events * Nope, that didn't work * oops send state key to GetMembership * Don't generate stripped state in client API more times than necessary, generate output events on receiving end of federated invite * Commit membership updater changes * Tweaks --- clientapi/routing/createroom.go | 75 +++++++++++++----------- roomserver/api/wrapper.go | 17 ------ roomserver/internal/perform_invite.go | 83 ++++++++++++++++++--------- sytest-whitelist | 1 + 4 files changed, 98 insertions(+), 78 deletions(-) diff --git a/clientapi/routing/createroom.go b/clientapi/routing/createroom.go index 36837d40..57fc3f33 100644 --- a/clientapi/routing/createroom.go +++ b/clientapi/routing/createroom.go @@ -371,20 +371,10 @@ func createRoom( } // If this is a direct message then we should invite the participants. - for _, invitee := range r.Invite { - // Build the invite event. - inviteEvent, err := buildMembershipEvent( - req.Context(), invitee, "", accountDB, device, gomatrixserverlib.Invite, - roomID, true, cfg, evTime, rsAPI, asAPI, - ) - if err != nil { - util.GetLogger(req.Context()).WithError(err).Error("buildMembershipEvent failed") - continue - } + if len(r.Invite) > 0 { // Build some stripped state for the invite. - candidates := append(gomatrixserverlib.UnwrapEventHeaders(builtEvents), inviteEvent.Event) - var strippedState []gomatrixserverlib.InviteV2StrippedState - for _, event := range candidates { + var globalStrippedState []gomatrixserverlib.InviteV2StrippedState + for _, event := range builtEvents { switch event.Type() { case gomatrixserverlib.MRoomName: fallthrough @@ -395,29 +385,48 @@ func createRoom( case gomatrixserverlib.MRoomMember: fallthrough case gomatrixserverlib.MRoomJoinRules: - strippedState = append( - strippedState, - gomatrixserverlib.NewInviteV2StrippedState(&event), + ev := event.Event + globalStrippedState = append( + globalStrippedState, + gomatrixserverlib.NewInviteV2StrippedState(&ev), ) } } - // Send the invite event to the roomserver. - err = roomserverAPI.SendInvite( - req.Context(), rsAPI, - inviteEvent.Headered(roomVersion), - strippedState, // invite room state - cfg.Matrix.ServerName, // send as server - nil, // transaction ID - ) - switch e := err.(type) { - case *roomserverAPI.PerformError: - return e.JSONResponse() - case nil: - default: - util.GetLogger(req.Context()).WithError(err).Error("roomserverAPI.SendInvite failed") - return util.JSONResponse{ - Code: http.StatusInternalServerError, - JSON: jsonerror.InternalServerError(), + + // Process the invites. + for _, invitee := range r.Invite { + // Build the invite event. + inviteEvent, err := buildMembershipEvent( + req.Context(), invitee, "", accountDB, device, gomatrixserverlib.Invite, + roomID, true, cfg, evTime, rsAPI, asAPI, + ) + if err != nil { + util.GetLogger(req.Context()).WithError(err).Error("buildMembershipEvent failed") + continue + } + inviteStrippedState := append( + globalStrippedState, + gomatrixserverlib.NewInviteV2StrippedState(&inviteEvent.Event), + ) + // Send the invite event to the roomserver. + err = roomserverAPI.SendInvite( + req.Context(), + rsAPI, + inviteEvent.Headered(roomVersion), + inviteStrippedState, // invite room state + cfg.Matrix.ServerName, // send as server + nil, // transaction ID + ) + switch e := err.(type) { + case *roomserverAPI.PerformError: + return e.JSONResponse() + case nil: + default: + util.GetLogger(req.Context()).WithError(err).Error("roomserverAPI.SendInvite failed") + return util.JSONResponse{ + Code: http.StatusInternalServerError, + JSON: jsonerror.InternalServerError(), + } } } } diff --git a/roomserver/api/wrapper.go b/roomserver/api/wrapper.go index ed3daf67..207c12c8 100644 --- a/roomserver/api/wrapper.go +++ b/roomserver/api/wrapper.go @@ -118,23 +118,6 @@ func SendInvite( return response.Error } - // Now send the invite event into the roomserver. If the room is known - // locally then this will succeed, notifying existing users in the room - // about the new invite. If the room isn't known locally then this will - // fail - and that's also OK. - inputReq := &InputRoomEventsRequest{ - InputRoomEvents: []InputRoomEvent{ - { - Kind: KindNew, - Event: inviteEvent, - AuthEventIDs: inviteEvent.AuthEventIDs(), - SendAsServer: string(sendAsServer), - }, - }, - } - inputRes := &InputRoomEventsResponse{} - _ = rsAPI.InputRoomEvents(ctx, inputReq, inputRes) - return nil } diff --git a/roomserver/internal/perform_invite.go b/roomserver/internal/perform_invite.go index 2e6bcc8e..aab3e8a8 100644 --- a/roomserver/internal/perform_invite.go +++ b/roomserver/internal/perform_invite.go @@ -5,7 +5,6 @@ import ( "fmt" federationSenderAPI "github.com/matrix-org/dendrite/federationsender/api" - "github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/state" "github.com/matrix-org/dendrite/roomserver/storage" @@ -55,19 +54,15 @@ func (r *RoomserverInternalAPI) PerformInvite( } } - updater, err := r.DB.MembershipUpdater(ctx, roomID, targetUserID, isTargetLocal, req.RoomVersion) - if err != nil { - return fmt.Errorf("r.DB.MembershipUpdater: %w", err) - } - succeeded := false - defer func() { - txerr := sqlutil.EndTransaction(updater, &succeeded) - if err == nil && txerr != nil { - err = txerr + var isAlreadyJoined bool + roomNID, err := r.DB.RoomNID(ctx, roomID) + if err == nil { + _, isAlreadyJoined, err = r.DB.GetMembership(ctx, roomNID, *event.StateKey()) + if err != nil { + return fmt.Errorf("r.DB.GetMembership: %w", err) } - }() - - if updater.IsJoin() { + } + if isAlreadyJoined { // If the user is joined to the room then that takes precedence over this // invite event. It makes little sense to move a user that is already // joined to the room into the invite state. @@ -103,9 +98,11 @@ func (r *RoomserverInternalAPI) PerformInvite( } if isOriginLocal { - // check that the user is allowed to do this. We can only do this check if it is - // a local invite as we have the auth events, else we have to take it on trust. - _, err = checkAuthEvents(ctx, r.DB, req.Event, req.Event.AuthEventIDs()) + // The invite originated locally. Therefore we have a responsibility to + // try and see if the user is allowed to make this invite. We can't do + // this for invites coming in over federation - we have to take those on + // trust. + _, err = checkAuthEvents(ctx, r.DB, event, event.AuthEventIDs()) if err != nil { log.WithError(err).WithField("event_id", event.EventID()).WithField("auth_event_ids", event.AuthEventIDs()).Error( "processInviteEvent.checkAuthEvents failed for event", @@ -127,7 +124,7 @@ func (r *RoomserverInternalAPI) PerformInvite( if req.SendAsServer != api.DoNotSendToOtherServers && !isTargetLocal { fsReq := &federationSenderAPI.PerformInviteRequest{ RoomVersion: req.RoomVersion, - Event: req.Event, + Event: event, InviteRoomState: inviteState, } fsRes := &federationSenderAPI.PerformInviteResponse{} @@ -141,19 +138,49 @@ func (r *RoomserverInternalAPI) PerformInvite( } event = fsRes.Event } + + // Send the invite event to the roomserver input stream. This will + // notify existing users in the room about the invite, update the + // membership table and ensure that the event is ready and available + // to use as an auth event when accepting the invite. + inputReq := &api.InputRoomEventsRequest{ + InputRoomEvents: []api.InputRoomEvent{ + { + Kind: api.KindNew, + Event: event, + AuthEventIDs: event.AuthEventIDs(), + SendAsServer: req.SendAsServer, + }, + }, + } + inputRes := &api.InputRoomEventsResponse{} + if err = r.InputRoomEvents(context.Background(), inputReq, inputRes); err != nil { + return fmt.Errorf("r.InputRoomEvents: %w", err) + } + } else { + // The invite originated over federation. Process the membership + // update, which will notify the sync API etc about the incoming + // invite. + updater, err := r.DB.MembershipUpdater(ctx, roomID, targetUserID, isTargetLocal, req.RoomVersion) + if err != nil { + return fmt.Errorf("r.DB.MembershipUpdater: %w", err) + } + + unwrapped := event.Unwrap() + outputUpdates, err := updateToInviteMembership(updater, &unwrapped, nil, req.Event.RoomVersion) + if err != nil { + return fmt.Errorf("updateToInviteMembership: %w", err) + } + + if err = updater.Commit(); err != nil { + return fmt.Errorf("updater.Commit: %w", err) + } + + if err = r.WriteOutputEvents(roomID, outputUpdates); err != nil { + return fmt.Errorf("r.WriteOutputEvents: %w", err) + } } - unwrapped := event.Unwrap() - outputUpdates, err := updateToInviteMembership(updater, &unwrapped, nil, req.Event.RoomVersion) - if err != nil { - return fmt.Errorf("updateToInviteMembership: %w", err) - } - - if err = r.WriteOutputEvents(roomID, outputUpdates); err != nil { - return fmt.Errorf("r.WriteOutputEvents: %w", err) - } - - succeeded = true return nil } diff --git a/sytest-whitelist b/sytest-whitelist index 840d58a7..8084bbe3 100644 --- a/sytest-whitelist +++ b/sytest-whitelist @@ -455,3 +455,4 @@ Banned servers cannot backfill Inbound /v1/send_leave rejects leaves from other servers Guest users can accept invites to private rooms over federation AS user (not ghost) can join room without registering +If user leaves room, remote user changes device and rejoins we see update in /sync and /keys/changes