diff --git a/syncapi/routing/context.go b/syncapi/routing/context.go index ef7efa2b..59113971 100644 --- a/syncapi/routing/context.go +++ b/syncapi/routing/context.go @@ -44,7 +44,7 @@ func Context( syncDB storage.Database, roomID, eventID string, ) util.JSONResponse { - filter, err := parseContextParams(req) + filter, err := parseRoomEventFilter(req) if err != nil { errMsg := "" switch err.(type) { @@ -164,7 +164,7 @@ func applyLazyLoadMembers(filter *gomatrixserverlib.RoomEventFilter, eventsAfter return newState } -func parseContextParams(req *http.Request) (*gomatrixserverlib.RoomEventFilter, error) { +func parseRoomEventFilter(req *http.Request) (*gomatrixserverlib.RoomEventFilter, error) { // Default room filter filter := &gomatrixserverlib.RoomEventFilter{Limit: 10} diff --git a/syncapi/routing/context_test.go b/syncapi/routing/context_test.go index 1b430d83..e79a5d5f 100644 --- a/syncapi/routing/context_test.go +++ b/syncapi/routing/context_test.go @@ -55,13 +55,13 @@ func Test_parseContextParams(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - gotFilter, err := parseContextParams(tt.req) + gotFilter, err := parseRoomEventFilter(tt.req) if (err != nil) != tt.wantErr { - t.Errorf("parseContextParams() error = %v, wantErr %v", err, tt.wantErr) + t.Errorf("parseRoomEventFilter() error = %v, wantErr %v", err, tt.wantErr) return } if !reflect.DeepEqual(gotFilter, tt.wantFilter) { - t.Errorf("parseContextParams() gotFilter = %v, want %v", gotFilter, tt.wantFilter) + t.Errorf("parseRoomEventFilter() gotFilter = %v, want %v", gotFilter, tt.wantFilter) } }) } diff --git a/syncapi/routing/messages.go b/syncapi/routing/messages.go index 9bb8c6d2..7cd54eef 100644 --- a/syncapi/routing/messages.go +++ b/syncapi/routing/messages.go @@ -19,7 +19,6 @@ import ( "fmt" "net/http" "sort" - "strconv" "github.com/matrix-org/dendrite/clientapi/jsonerror" "github.com/matrix-org/dendrite/roomserver/api" @@ -45,8 +44,8 @@ type messagesReq struct { fromStream *types.StreamingToken device *userapi.Device wasToProvided bool - limit int backwardOrdering bool + filter *gomatrixserverlib.RoomEventFilter } type messagesResp struct { @@ -54,10 +53,9 @@ type messagesResp struct { StartStream string `json:"start_stream,omitempty"` // NOTSPEC: so clients can hit /messages then immediately /sync with a latest sync token End string `json:"end"` Chunk []gomatrixserverlib.ClientEvent `json:"chunk"` + State []gomatrixserverlib.ClientEvent `json:"state"` } -const defaultMessagesLimit = 10 - // OnIncomingMessagesRequest implements the /messages endpoint from the // client-server API. // See: https://matrix.org/docs/spec/client_server/latest.html#get-matrix-client-r0-rooms-roomid-messages @@ -83,6 +81,14 @@ func OnIncomingMessagesRequest( } } + filter, err := parseRoomEventFilter(req) + if err != nil { + return util.JSONResponse{ + Code: http.StatusBadRequest, + JSON: jsonerror.InvalidArgumentValue("unable to parse filter"), + } + } + // Extract parameters from the request's URL. // Pagination tokens. var fromStream *types.StreamingToken @@ -143,18 +149,6 @@ func OnIncomingMessagesRequest( wasToProvided = false } - // Maximum number of events to return; defaults to 10. - limit := defaultMessagesLimit - if len(req.URL.Query().Get("limit")) > 0 { - limit, err = strconv.Atoi(req.URL.Query().Get("limit")) - - if err != nil { - return util.JSONResponse{ - Code: http.StatusBadRequest, - JSON: jsonerror.InvalidArgumentValue("limit could not be parsed into an integer: " + err.Error()), - } - } - } // TODO: Implement filtering (#587) // Check the room ID's format. @@ -176,7 +170,7 @@ func OnIncomingMessagesRequest( to: &to, fromStream: fromStream, wasToProvided: wasToProvided, - limit: limit, + filter: filter, backwardOrdering: backwardOrdering, device: device, } @@ -187,10 +181,27 @@ func OnIncomingMessagesRequest( return jsonerror.InternalServerError() } + // at least fetch the membership events for the users returned in chunk if LazyLoadMembers is set + state := []gomatrixserverlib.ClientEvent{} + if filter.LazyLoadMembers { + memberShipToUser := make(map[string]*gomatrixserverlib.HeaderedEvent) + for _, evt := range clientEvents { + memberShip, err := db.GetStateEvent(req.Context(), roomID, gomatrixserverlib.MRoomMember, evt.Sender) + if err != nil { + util.GetLogger(req.Context()).WithError(err).Error("failed to get membership event for user") + continue + } + memberShipToUser[evt.Sender] = memberShip + } + for _, evt := range memberShipToUser { + state = append(state, gomatrixserverlib.HeaderedToClientEvent(evt, gomatrixserverlib.FormatAll)) + } + } + util.GetLogger(req.Context()).WithFields(logrus.Fields{ "from": from.String(), "to": to.String(), - "limit": limit, + "limit": filter.Limit, "backwards": backwardOrdering, "return_start": start.String(), "return_end": end.String(), @@ -200,6 +211,7 @@ func OnIncomingMessagesRequest( Chunk: clientEvents, Start: start.String(), End: end.String(), + State: state, } if emptyFromSupplied { res.StartStream = fromStream.String() @@ -234,19 +246,18 @@ func (r *messagesReq) retrieveEvents() ( clientEvents []gomatrixserverlib.ClientEvent, start, end types.TopologyToken, err error, ) { - eventFilter := gomatrixserverlib.DefaultRoomEventFilter() - eventFilter.Limit = r.limit + eventFilter := r.filter // Retrieve the events from the local database. var streamEvents []types.StreamEvent if r.fromStream != nil { toStream := r.to.StreamToken() streamEvents, err = r.db.GetEventsInStreamingRange( - r.ctx, r.fromStream, &toStream, r.roomID, &eventFilter, r.backwardOrdering, + r.ctx, r.fromStream, &toStream, r.roomID, eventFilter, r.backwardOrdering, ) } else { streamEvents, err = r.db.GetEventsInTopologicalRange( - r.ctx, r.from, r.to, r.roomID, r.limit, r.backwardOrdering, + r.ctx, r.from, r.to, r.roomID, eventFilter.Limit, r.backwardOrdering, ) } if err != nil { @@ -434,7 +445,7 @@ func (r *messagesReq) handleEmptyEventsSlice() ( // Check if we have backward extremities for this room. if len(backwardExtremities) > 0 { // If so, retrieve as much events as needed through backfilling. - events, err = r.backfill(r.roomID, backwardExtremities, r.limit) + events, err = r.backfill(r.roomID, backwardExtremities, r.filter.Limit) if err != nil { return } @@ -456,7 +467,7 @@ func (r *messagesReq) handleNonEmptyEventsSlice(streamEvents []types.StreamEvent events []*gomatrixserverlib.HeaderedEvent, err error, ) { // Check if we have enough events. - isSetLargeEnough := len(streamEvents) >= r.limit + isSetLargeEnough := len(streamEvents) >= r.filter.Limit if !isSetLargeEnough { // it might be fine we don't have up to 'limit' events, let's find out if r.backwardOrdering { @@ -483,7 +494,7 @@ func (r *messagesReq) handleNonEmptyEventsSlice(streamEvents []types.StreamEvent if len(backwardExtremities) > 0 && !isSetLargeEnough && r.backwardOrdering { var pdus []*gomatrixserverlib.HeaderedEvent // Only ask the remote server for enough events to reach the limit. - pdus, err = r.backfill(r.roomID, backwardExtremities, r.limit-len(streamEvents)) + pdus, err = r.backfill(r.roomID, backwardExtremities, r.filter.Limit-len(streamEvents)) if err != nil { return } diff --git a/sytest-whitelist b/sytest-whitelist index 12522cfb..2dd56d26 100644 --- a/sytest-whitelist +++ b/sytest-whitelist @@ -597,6 +597,7 @@ Device list doesn't change if remote server is down /context/ on non world readable room does not work /context/ returns correct number of events /context/ with lazy_load_members filter works +GET /rooms/:room_id/messages lazy loads members correctly Can query remote device keys using POST after notification Device deletion propagates over federation Get left notifs in sync and /keys/changes when other user leaves