diff --git a/src/github.com/matrix-org/dendrite/cmd/syncserver-integration-tests/main.go b/src/github.com/matrix-org/dendrite/cmd/syncserver-integration-tests/main.go index 5a1c94e1..121f8f3c 100644 --- a/src/github.com/matrix-org/dendrite/cmd/syncserver-integration-tests/main.go +++ b/src/github.com/matrix-org/dendrite/cmd/syncserver-integration-tests/main.go @@ -296,6 +296,16 @@ func testSyncServer(syncServerCmdChan chan error, userID, since, want string) { } } +func writeToRoomServerLog(indexes ...int) { + var roomEvents []string + for _, i := range indexes { + roomEvents = append(roomEvents, outputRoomEventTestData[i]) + } + if err := exe.WriteToTopic(inputTopic, canonicalJSONInput(roomEvents)); err != nil { + panic(err) + } +} + // Runs a battery of sync server tests against test data in testdata.go // testdata.go has a list of OutputRoomEvents which will be fed into the kafka log which the sync server will consume. // The tests will pause at various points in this list to conduct tests on the /sync responses before continuing. @@ -311,9 +321,11 @@ func main() { // $ curl -XPUT -d '{"msgtype":"m.text","body":"hello world 2"}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/send/m.room.message/2?access_token=@alice:localhost" // $ curl -XPUT -d '{"msgtype":"m.text","body":"hello world 3"}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/send/m.room.message/3?access_token=@alice:localhost" // $ curl -XPUT -d '{"name":"Custom Room Name"}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/state/m.room.name?access_token=@alice:localhost" - if err := exe.WriteToTopic(inputTopic, canonicalJSONInput(outputRoomEventTestData[0:9])); err != nil { - panic(err) - } + writeToRoomServerLog( + i0StateRoomCreate, i1StateAliceJoin, i2StatePowerLevels, i3StateJoinRules, i4StateHistoryVisibility, + i5AliceMsg, i6AliceMsg, i7AliceMsg, i8StateAliceRoomName, + ) + // Make sure initial sync works TODO: prev_batch testSyncServer(syncServerCmdChan, "@alice:localhost", "", `{ "account_data": { @@ -338,15 +350,15 @@ func main() { }, "timeline": { "events": [`+ - clientEventTestData[0]+","+ - clientEventTestData[1]+","+ - clientEventTestData[2]+","+ - clientEventTestData[3]+","+ - clientEventTestData[4]+","+ - clientEventTestData[5]+","+ - clientEventTestData[6]+","+ - clientEventTestData[7]+","+ - clientEventTestData[8]+`], + clientEventTestData[i0StateRoomCreate]+","+ + clientEventTestData[i1StateAliceJoin]+","+ + clientEventTestData[i2StatePowerLevels]+","+ + clientEventTestData[i3StateJoinRules]+","+ + clientEventTestData[i4StateHistoryVisibility]+","+ + clientEventTestData[i5AliceMsg]+","+ + clientEventTestData[i6AliceMsg]+","+ + clientEventTestData[i7AliceMsg]+","+ + clientEventTestData[i8StateAliceRoomName]+`], "limited": true, "prev_batch": "" } @@ -387,9 +399,7 @@ func main() { }`) // $ curl -XPUT -d '{"membership":"join"}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/state/m.room.member/@bob:localhost?access_token=@bob:localhost" - if err := exe.WriteToTopic(inputTopic, canonicalJSONInput([]string{outputRoomEventTestData[9]})); err != nil { - panic(err) - } + writeToRoomServerLog(i9StateBobJoin) // Make sure alice sees it TODO: prev_batch testSyncServer(syncServerCmdChan, "@alice:localhost", "9", `{ @@ -416,7 +426,7 @@ func main() { "timeline": { "limited": false, "prev_batch": "", - "events": [`+clientEventTestData[9]+`] + "events": [`+clientEventTestData[i9StateBobJoin]+`] } } }, @@ -445,18 +455,18 @@ func main() { }, "state": { "events": [`+ - clientEventTestData[0]+","+ - clientEventTestData[1]+","+ - clientEventTestData[2]+","+ - clientEventTestData[3]+","+ - clientEventTestData[4]+","+ - clientEventTestData[8]+`] + clientEventTestData[i0StateRoomCreate]+","+ + clientEventTestData[i1StateAliceJoin]+","+ + clientEventTestData[i2StatePowerLevels]+","+ + clientEventTestData[i3StateJoinRules]+","+ + clientEventTestData[i4StateHistoryVisibility]+","+ + clientEventTestData[i8StateAliceRoomName]+`] }, "timeline": { "limited": false, "prev_batch": "", "events": [`+ - clientEventTestData[9]+`] + clientEventTestData[i9StateBobJoin]+`] } } }, @@ -465,9 +475,8 @@ func main() { }`) // $ curl -XPUT -d '{"msgtype":"m.text","body":"hello alice"}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/send/m.room.message/1?access_token=@bob:localhost" - if err := exe.WriteToTopic(inputTopic, canonicalJSONInput([]string{outputRoomEventTestData[10]})); err != nil { - panic(err) - } + writeToRoomServerLog(i10BobMsg) + // Make sure alice can see everything around the join point for bob TODO: prev_batch testSyncServer(syncServerCmdChan, "@alice:localhost", "7", `{ "account_data": { @@ -494,10 +503,10 @@ func main() { "limited": false, "prev_batch": "", "events": [`+ - clientEventTestData[7]+","+ - clientEventTestData[8]+","+ - clientEventTestData[9]+","+ - clientEventTestData[10]+`] + clientEventTestData[i7AliceMsg]+","+ + clientEventTestData[i8StateAliceRoomName]+","+ + clientEventTestData[i9StateBobJoin]+","+ + clientEventTestData[i10BobMsg]+`] } } }, @@ -508,9 +517,7 @@ func main() { // $ curl -XPUT -d '{"name":"A Different Custom Room Name"}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/state/m.room.name?access_token=@alice:localhost" // $ curl -XPUT -d '{"msgtype":"m.text","body":"hello bob"}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/send/m.room.message/2?access_token=@alice:localhost" // $ curl -XPUT -d '{"membership":"invite"}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/state/m.room.member/@charlie:localhost?access_token=@bob:localhost" - if err := exe.WriteToTopic(inputTopic, canonicalJSONInput(outputRoomEventTestData[11:14])); err != nil { - panic(err) - } + writeToRoomServerLog(i11StateAliceRoomName, i12AliceMsg, i13StateBobInviteCharlie) // Make sure charlie sees the invite both with and without a ?since= token // TODO: Invite state should include the invite event and the room name. @@ -540,8 +547,90 @@ func main() { // $ curl -XPUT -d '{"membership":"join"}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/state/m.room.member/@charlie:localhost?access_token=@charlie:localhost" // $ curl -XPUT -d '{"msgtype":"m.text","body":"not charlie..."}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/send/m.room.message/3?access_token=@alice:localhost" // $ curl -XPUT -d '{"membership":"leave"}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/state/m.room.member/@charlie:localhost?access_token=@alice:localhost" + writeToRoomServerLog(i14StateCharlieJoin, i15AliceMsg, i16StateAliceKickCharlie) + + // Check transitions to leave work + testSyncServer(syncServerCmdChan, "@charlie:localhost", "15", `{ + "account_data": { + "events": [] + }, + "next_batch": "17", + "presence": { + "events": [] + }, + "rooms": { + "invite": {}, + "join": {}, + "leave": { + "!PjrbIMW2cIiaYF4t:localhost": { + "state": { + "events": [] + }, + "timeline": { + "limited": false, + "prev_batch": "", + "events": [`+ + clientEventTestData[i15AliceMsg]+","+ + clientEventTestData[i16StateAliceKickCharlie]+`] + } + } + } + } + }`) + + // Test joining and leaving the same room in a single /sync request puts the room in the 'leave' section. + // TODO: Use an earlier since value to assert that the /sync response doesn't leak messages + // from before charlie was joined to the room. Currently it does leak because RecentEvents doesn't + // take membership into account. + testSyncServer(syncServerCmdChan, "@charlie:localhost", "14", `{ + "account_data": { + "events": [] + }, + "next_batch": "17", + "presence": { + "events": [] + }, + "rooms": { + "invite": {}, + "join": {}, + "leave": { + "!PjrbIMW2cIiaYF4t:localhost": { + "state": { + "events": [] + }, + "timeline": { + "limited": false, + "prev_batch": "", + "events": [`+ + clientEventTestData[i14StateCharlieJoin]+","+ + clientEventTestData[i15AliceMsg]+","+ + clientEventTestData[i16StateAliceKickCharlie]+`] + } + } + } + } + }`) + // $ curl -XPUT -d '{"msgtype":"m.text","body":"why did you kick charlie"}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/send/m.room.message/3?access_token=@bob:localhost" // $ curl -XPUT -d '{"name":"No Charlies"}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/state/m.room.name?access_token=@alice:localhost" + writeToRoomServerLog(i17BobMsg, i18StateAliceRoomName) + + // Check that users don't see state changes in rooms after they have left + testSyncServer(syncServerCmdChan, "@charlie:localhost", "17", `{ + "account_data": { + "events": [] + }, + "next_batch": "19", + "presence": { + "events": [] + }, + "rooms": { + "invite": {}, + "join": {}, + "leave": {} + } + }`) + // $ curl -XPUT -d '{"msgtype":"m.text","body":"whatever"}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/send/m.room.message/3?access_token=@bob:localhost" // $ curl -XPUT -d '{"membership":"leave"}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/state/m.room.member/@bob:localhost?access_token=@bob:localhost" // $ curl -XPUT -d '{"msgtype":"m.text","body":"im alone now"}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/send/m.room.message/3?access_token=@alice:localhost" diff --git a/src/github.com/matrix-org/dendrite/cmd/syncserver-integration-tests/testdata.go b/src/github.com/matrix-org/dendrite/cmd/syncserver-integration-tests/testdata.go index 49a55eda..7f241e42 100644 --- a/src/github.com/matrix-org/dendrite/cmd/syncserver-integration-tests/testdata.go +++ b/src/github.com/matrix-org/dendrite/cmd/syncserver-integration-tests/testdata.go @@ -14,6 +14,37 @@ package main +const ( + i0StateRoomCreate = iota + i1StateAliceJoin + i2StatePowerLevels + i3StateJoinRules + i4StateHistoryVisibility + i5AliceMsg + i6AliceMsg + i7AliceMsg + i8StateAliceRoomName + i9StateBobJoin + i10BobMsg + i11StateAliceRoomName + i12AliceMsg + i13StateBobInviteCharlie + i14StateCharlieJoin + i15AliceMsg + i16StateAliceKickCharlie + i17BobMsg + i18StateAliceRoomName + i19BobMsg + i20StateBobLeave + i21AliceMsg + i22StateAliceInviteBob + i23StateBobRejectInvite + i24AliceMsg + i25StateAliceRoomName + i26StateCharlieJoin + i27CharlieMsg +) + var outputRoomEventTestData = []string{ // $ curl -XPOST -d '{}' "http://localhost:8009/_matrix/client/r0/createRoom?access_token=@alice:localhost" `{"Event":{"auth_events":[],"content":{"creator":"@alice:localhost"},"depth":1,"event_id":"$xz0fUB8zNMTGFh1W:localhost","hashes":{"sha256":"KKkpxS8NoH0igBbL3J+nJ39MRlmA7QgW4BGL7Fv4ASI"},"origin":"localhost","origin_server_ts":1494411218382,"prev_events":[],"room_id":"!PjrbIMW2cIiaYF4t:localhost","sender":"@alice:localhost","signatures":{"localhost":{"ed25519:something":"uZG5Q/Hs2Z611gFlZPdwomomRJKf70xV2FQV+gLWM1XgzkLDRlRF3cBZc9y3CnHKnV/upTcXs7Op2/GmgD3UBw"}},"state_key":"","type":"m.room.create"},"VisibilityEventIDs":null,"LatestEventIDs":["$xz0fUB8zNMTGFh1W:localhost"],"AddsStateEventIDs":["$xz0fUB8zNMTGFh1W:localhost"],"RemovesStateEventIDs":null,"LastSentEventID":""}`, diff --git a/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go b/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go index 83c43167..46e2b9f6 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go +++ b/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go @@ -25,6 +25,12 @@ import ( "github.com/matrix-org/gomatrixserverlib" ) +type stateDelta struct { + roomID string + stateEvents []gomatrixserverlib.Event + membership string +} + // SyncServerDatabase represents a sync server database type SyncServerDatabase struct { db *sql.DB @@ -115,67 +121,44 @@ func (d *SyncServerDatabase) SyncStreamPosition() (types.StreamPosition, error) // IncrementalSync returns all the data needed in order to create an incremental sync response. func (d *SyncServerDatabase) IncrementalSync(userID string, fromPos, toPos types.StreamPosition, numRecentEventsPerRoom int) (res *types.Response, returnErr error) { returnErr = runTransaction(d.db, func(txn *sql.Tx) error { - roomIDs, err := d.roomstate.SelectRoomIDsWithMembership(txn, userID, "join") - if err != nil { - return err - } - - state, err := d.events.StateBetween(txn, fromPos, toPos) + // Work out which rooms to return in the response. This is done by getting not only the currently + // joined rooms, but also which rooms have membership transitions for this user between the 2 stream positions. + // This works out what the 'state' key should be for each room as well as which membership block + // to put the room into. + deltas, err := d.getStateDeltas(txn, fromPos, toPos, userID) if err != nil { return err } res = types.NewResponse(toPos) - - // Implement membership change algorithm: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L821 - // - Get membership list changes for this user in this sync response - // - For each room which has membership list changes: - // * Check if the room is 'newly joined' (insufficient to just check for a join event because we allow dupe joins TODO). - // If it is, then we need to send the full room state down (and 'limited' is always true). - // * Check if user is still CURRENTLY invited to the room. If so, add room to 'invited' block. - // * TODO Check if the user is CURRENTLY left/banned. If so, add room to 'archived' block. - - // work out which rooms transitioned to 'joined' between the 2 stream positions and add full state where needed. - for roomID, stateEvents := range state { - for _, ev := range stateEvents { - // TODO: Currently this will incorrectly add rooms which were ALREADY joined but they sent another no-op join event. - // We should be checking if the user was already joined at fromPos and not proceed if so. As a result of this, - // dupe join events will result in the entire room state coming down to the client again. This is added in - // the 'state' part of the response though, so is transparent modulo bandwidth concerns as it is not added to - // the timeline. - if ev.Type() == "m.room.member" && ev.StateKeyEquals(userID) { - var memberContent events.MemberContent - if err := json.Unmarshal(ev.Content(), &memberContent); err != nil { - return err - } - if memberContent.Membership != "join" { - continue - } - - allState, err := d.roomstate.CurrentState(txn, roomID) - if err != nil { - return err - } - state[roomID] = allState - } - - } - } - - for _, roomID := range roomIDs { - recentEvents, err := d.events.RecentEventsInRoom(txn, roomID, fromPos, toPos, numRecentEventsPerRoom) + for _, delta := range deltas { + recentEvents, err := d.events.RecentEventsInRoom(txn, delta.roomID, fromPos, toPos, numRecentEventsPerRoom) if err != nil { return err } - state[roomID] = removeDuplicates(state[roomID], recentEvents) + delta.stateEvents = removeDuplicates(delta.stateEvents, recentEvents) // roll back - jr := types.NewJoinResponse() - jr.Timeline.Events = gomatrixserverlib.ToClientEvents(recentEvents, gomatrixserverlib.FormatSync) - jr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true - jr.State.Events = gomatrixserverlib.ToClientEvents(state[roomID], gomatrixserverlib.FormatSync) - res.Rooms.Join[roomID] = *jr + switch delta.membership { + case "join": + jr := types.NewJoinResponse() + jr.Timeline.Events = gomatrixserverlib.ToClientEvents(recentEvents, gomatrixserverlib.FormatSync) + jr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true + jr.State.Events = gomatrixserverlib.ToClientEvents(delta.stateEvents, gomatrixserverlib.FormatSync) + res.Rooms.Join[delta.roomID] = *jr + case "leave": + fallthrough // transitions to leave are the same as ban + case "ban": + // TODO: recentEvents may contain events that this user is not allowed to see because they are + // no longer in the room. + lr := types.NewLeaveResponse() + lr.Timeline.Events = gomatrixserverlib.ToClientEvents(recentEvents, gomatrixserverlib.FormatSync) + lr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true + lr.State.Events = gomatrixserverlib.ToClientEvents(delta.stateEvents, gomatrixserverlib.FormatSync) + res.Rooms.Leave[delta.roomID] = *lr + } } + // TODO: This should be done in getStateDeltas return d.addInvitesToResponse(txn, userID, res) }) return @@ -242,6 +225,67 @@ func (d *SyncServerDatabase) addInvitesToResponse(txn *sql.Tx, userID string, re return nil } +func (d *SyncServerDatabase) getStateDeltas(txn *sql.Tx, fromPos, toPos types.StreamPosition, userID string) ([]stateDelta, error) { + // Implement membership change algorithm: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L821 + // - Get membership list changes for this user in this sync response + // - For each room which has membership list changes: + // * Check if the room is 'newly joined' (insufficient to just check for a join event because we allow dupe joins TODO). + // If it is, then we need to send the full room state down (and 'limited' is always true). + // * Check if user is still CURRENTLY invited to the room. If so, add room to 'invited' block. + // * Check if the user is CURRENTLY (TODO) left/banned. If so, add room to 'archived' block. + // - Get all CURRENTLY joined rooms, and add them to 'joined' block. + var deltas []stateDelta + + // get all the state events ever between these two positions + state, err := d.events.StateBetween(txn, fromPos, toPos) + if err != nil { + return nil, err + } + for roomID, stateEvents := range state { + for _, ev := range stateEvents { + // TODO: Currently this will incorrectly add rooms which were ALREADY joined but they sent another no-op join event. + // We should be checking if the user was already joined at fromPos and not proceed if so. As a result of this, + // dupe join events will result in the entire room state coming down to the client again. This is added in + // the 'state' part of the response though, so is transparent modulo bandwidth concerns as it is not added to + // the timeline. + if membership := getMembershipFromEvent(&ev, userID); membership != "" { + if membership == "join" { + // send full room state down instead of a delta + var allState []gomatrixserverlib.Event + allState, err = d.roomstate.CurrentState(txn, roomID) + if err != nil { + return nil, err + } + state[roomID] = allState + continue // we'll add this room in when we do joined rooms + } + + deltas = append(deltas, stateDelta{ + membership: membership, + stateEvents: stateEvents, + roomID: roomID, + }) + break + } + } + } + + // Add in currently joined rooms + joinedRoomIDs, err := d.roomstate.SelectRoomIDsWithMembership(txn, userID, "join") + if err != nil { + return nil, err + } + for _, joinedRoomID := range joinedRoomIDs { + deltas = append(deltas, stateDelta{ + membership: "join", + stateEvents: state[joinedRoomID], + roomID: joinedRoomID, + }) + } + + return deltas, nil +} + // There may be some overlap where events in stateEvents are already in recentEvents, so filter // them out so we don't include them twice in the /sync response. They should be in recentEvents // only, so clients get to the correct state once they have rolled forward. @@ -267,6 +311,19 @@ func removeDuplicates(stateEvents, recentEvents []gomatrixserverlib.Event) []gom return stateEvents } +// getMembershipFromEvent returns the value of content.membership iff the event is a state event +// with type 'm.room.member' and state_key of userID. Otherwise, an empty string is returned. +func getMembershipFromEvent(ev *gomatrixserverlib.Event, userID string) string { + if ev.Type() == "m.room.member" && ev.StateKeyEquals(userID) { + var memberContent events.MemberContent + if err := json.Unmarshal(ev.Content(), &memberContent); err != nil { + return "" + } + return memberContent.Membership + } + return "" +} + func runTransaction(db *sql.DB, fn func(txn *sql.Tx) error) (err error) { txn, err := db.Begin() if err != nil {