From 02180633399c702403bc2dea9b3057318315843a Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Fri, 22 Sep 2017 12:34:54 +0200 Subject: [PATCH] Implement room state retrieval endpoint (#254) * Move prev event structure to component-wide types * Renamed key for better understandability * Implement /state endpoint * Change rowsToEvents() to return an empty slice instead of nil in case of empty result set * Doc * Fix forgotten comma * Specify HTTP method * Update comment * Remove debug fmt.Println --- .../dendrite/syncapi/consumers/roomserver.go | 14 ++--- .../dendrite/syncapi/routing/routing.go | 6 ++ .../storage/current_room_state_table.go | 2 +- .../dendrite/syncapi/storage/syncserver.go | 13 +++++ .../dendrite/syncapi/sync/requestpool.go | 58 ++++++++++++++++++- .../dendrite/syncapi/types/types.go | 8 +++ 6 files changed, 88 insertions(+), 13 deletions(-) diff --git a/src/github.com/matrix-org/dendrite/syncapi/consumers/roomserver.go b/src/github.com/matrix-org/dendrite/syncapi/consumers/roomserver.go index 911ccd0d..7c40247b 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/consumers/roomserver.go +++ b/src/github.com/matrix-org/dendrite/syncapi/consumers/roomserver.go @@ -38,12 +38,6 @@ type OutputRoomEvent struct { query api.RoomserverQueryAPI } -type prevEventRef struct { - PrevContent json.RawMessage `json:"prev_content"` - PrevID string `json:"replaces_state"` - UserID string `json:"prev_sender"` -} - // NewOutputRoomEvent creates a new OutputRoomEvent consumer. Call Start() to begin consuming from room servers. func NewOutputRoomEvent( cfg *config.Dendrite, @@ -267,10 +261,10 @@ func (s *OutputRoomEvent) updateStateEvent(event gomatrixserverlib.Event) (gomat return event, nil } - prev := prevEventRef{ - PrevContent: prevEvent.Content(), - PrevID: prevEvent.EventID(), - UserID: prevEvent.Sender(), + prev := types.PrevEventRef{ + PrevContent: prevEvent.Content(), + ReplacesState: prevEvent.EventID(), + PrevSender: prevEvent.Sender(), } return event.SetUnsigned(prev) diff --git a/src/github.com/matrix-org/dendrite/syncapi/routing/routing.go b/src/github.com/matrix-org/dendrite/syncapi/routing/routing.go index e95a7a25..5bad22b5 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/routing/routing.go +++ b/src/github.com/matrix-org/dendrite/syncapi/routing/routing.go @@ -30,7 +30,13 @@ const pathPrefixR0 = "/_matrix/client/r0" // Setup configures the given mux with sync-server listeners func Setup(apiMux *mux.Router, srp *sync.RequestPool, deviceDB *devices.Database) { r0mux := apiMux.PathPrefix(pathPrefixR0).Subrouter() + r0mux.Handle("/sync", common.MakeAuthAPI("sync", deviceDB, func(req *http.Request, device *authtypes.Device) util.JSONResponse { return srp.OnIncomingSyncRequest(req, device) })).Methods("GET") + + r0mux.Handle("/rooms/{roomID}/state", common.MakeAuthAPI("room_state", deviceDB, func(req *http.Request, device *authtypes.Device) util.JSONResponse { + vars := mux.Vars(req) + return srp.OnIncomingStateRequest(req, vars["roomID"]) + })).Methods("GET") } diff --git a/src/github.com/matrix-org/dendrite/syncapi/storage/current_room_state_table.go b/src/github.com/matrix-org/dendrite/syncapi/storage/current_room_state_table.go index 4ea06808..c5879d53 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/storage/current_room_state_table.go +++ b/src/github.com/matrix-org/dendrite/syncapi/storage/current_room_state_table.go @@ -216,7 +216,7 @@ func (s *currentRoomStateStatements) selectEventsWithEventIDs( } func rowsToEvents(rows *sql.Rows) ([]gomatrixserverlib.Event, error) { - var result []gomatrixserverlib.Event + result := []gomatrixserverlib.Event{} for rows.Next() { var eventBytes []byte if err := rows.Scan(&eventBytes); err != nil { diff --git a/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go b/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go index 3ab7e1c7..383e86da 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go +++ b/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go @@ -162,6 +162,19 @@ func (d *SyncServerDatabase) GetStateEvent( return d.roomstate.selectStateEvent(ctx, evType, roomID, stateKey) } +// GetStateEventsForRoom fetches the state events for a given room. +// Returns an empty slice if no state events could be found for this room. +// Returns an error if there was an issue with the retrieval. +func (d *SyncServerDatabase) GetStateEventsForRoom( + ctx context.Context, roomID string, +) (stateEvents []gomatrixserverlib.Event, err error) { + err = common.WithTransaction(d.db, func(txn *sql.Tx) error { + stateEvents, err = d.roomstate.selectCurrentState(ctx, txn, roomID) + return err + }) + return +} + // SyncStreamPosition returns the latest position in the sync stream. Returns 0 if there are no events yet. func (d *SyncServerDatabase) SyncStreamPosition(ctx context.Context) (types.StreamPosition, error) { return d.syncStreamPositionTx(ctx, nil) diff --git a/src/github.com/matrix-org/dendrite/syncapi/sync/requestpool.go b/src/github.com/matrix-org/dendrite/syncapi/sync/requestpool.go index c9b86a6f..56ba3d13 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/sync/requestpool.go +++ b/src/github.com/matrix-org/dendrite/syncapi/sync/requestpool.go @@ -15,6 +15,7 @@ package sync import ( + "encoding/json" "net/http" "time" @@ -105,6 +106,56 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *authtype } } +type stateEventInStateResp struct { + gomatrixserverlib.ClientEvent + PrevContent json.RawMessage `json:"prev_content,omitempty"` + ReplacesState string `json:"replaces_state,omitempty"` +} + +// OnIncomingStateRequest is called when a client makes a /rooms/{roomID}/state +// request. It will fetch all the state events from the specified room and will +// append the necessary keys to them if applicable before returning them. +// Returns an error if something went wrong in the process. +// TODO: Check if the user is in the room. If not, check if the room's history +// is publicly visible. Current behaviour is returning an empty array if the +// user cannot see the room's history. +func (rp *RequestPool) OnIncomingStateRequest(req *http.Request, roomID string) util.JSONResponse { + stateEvents, err := rp.db.GetStateEventsForRoom(req.Context(), roomID) + if err != nil { + return httputil.LogThenError(req, err) + } + + resp := []stateEventInStateResp{} + // Fill the prev_content and replaces_state keys if necessary + for _, event := range stateEvents { + stateEvent := stateEventInStateResp{ + ClientEvent: gomatrixserverlib.ToClientEvent(event, gomatrixserverlib.FormatAll), + } + var prevEventRef types.PrevEventRef + if len(event.Unsigned()) > 0 { + if err := json.Unmarshal(event.Unsigned(), &prevEventRef); err != nil { + return httputil.LogThenError(req, err) + } + // Fills the previous state event ID if the state event replaces another + // state event + if len(prevEventRef.ReplacesState) > 0 { + stateEvent.ReplacesState = prevEventRef.ReplacesState + } + // Fill the previous event if the state event references a previous event + if prevEventRef.PrevContent != nil { + stateEvent.PrevContent = prevEventRef.PrevContent + } + } + + resp = append(resp, stateEvent) + } + + return util.JSONResponse{ + Code: 200, + JSON: resp, + } +} + func (rp *RequestPool) currentSyncForUser(req syncRequest, currentPos types.StreamPosition) (*types.Response, error) { // TODO: handle ignored users if req.since == types.StreamPosition(0) { @@ -116,8 +167,11 @@ func (rp *RequestPool) currentSyncForUser(req syncRequest, currentPos types.Stre func (rp *RequestPool) appendAccountData( data *types.Response, userID string, req syncRequest, currentPos types.StreamPosition, ) (*types.Response, error) { - // TODO: We currently send all account data on every sync response, we should instead send data - // that has changed on incremental sync responses + // TODO: Account data doesn't have a sync position of its own, meaning that + // account data might be sent multiple time to the client if multiple account + // data keys were set between two message. This isn't a huge issue since the + // duplicate data doesn't represent a huge quantity of data, but an optimisation + // here would be making sure each data is sent only once to the client. localpart, _, err := gomatrixserverlib.SplitID('@', userID) if err != nil { return nil, err diff --git a/src/github.com/matrix-org/dendrite/syncapi/types/types.go b/src/github.com/matrix-org/dendrite/syncapi/types/types.go index adc764c3..f710c6d5 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/types/types.go +++ b/src/github.com/matrix-org/dendrite/syncapi/types/types.go @@ -15,6 +15,7 @@ package types import ( + "encoding/json" "strconv" "github.com/matrix-org/gomatrixserverlib" @@ -28,6 +29,13 @@ func (sp StreamPosition) String() string { return strconv.FormatInt(int64(sp), 10) } +// PrevEventRef represents a reference to a previous event in a state event upgrade +type PrevEventRef struct { + PrevContent json.RawMessage `json:"prev_content"` + ReplacesState string `json:"replaces_state"` + PrevSender string `json:"prev_sender"` +} + // Response represents a /sync API response. See https://matrix.org/docs/spec/client_server/r0.2.0.html#get-matrix-client-r0-sync type Response struct { NextBatch string `json:"next_batch"`