diff --git a/src/github.com/matrix-org/dendrite/clientapi/storage/syncserver.go b/src/github.com/matrix-org/dendrite/clientapi/storage/syncserver.go index e63680ae..4b92ce67 100644 --- a/src/github.com/matrix-org/dendrite/clientapi/storage/syncserver.go +++ b/src/github.com/matrix-org/dendrite/clientapi/storage/syncserver.go @@ -4,6 +4,7 @@ import ( "database/sql" // Import the postgres database driver. _ "github.com/lib/pq" + "github.com/matrix-org/dendrite/clientapi/sync/syncapi" "github.com/matrix-org/dendrite/common" "github.com/matrix-org/gomatrixserverlib" ) @@ -41,13 +42,14 @@ func NewSyncServerDatabase(dataSourceName string) (*SyncServerDatabase, error) { // WriteEvent into the database. It is not safe to call this function from multiple goroutines, as it would create races // when generating the stream position for this event. Returns the sync stream position for the inserted event. // Returns an error if there was a problem inserting this event. -func (d *SyncServerDatabase) WriteEvent(ev *gomatrixserverlib.Event, addStateEventIDs, removeStateEventIDs []string) (streamPos int64, returnErr error) { +func (d *SyncServerDatabase) WriteEvent(ev *gomatrixserverlib.Event, addStateEventIDs, removeStateEventIDs []string) (streamPos syncapi.StreamPosition, returnErr error) { returnErr = runTransaction(d.db, func(txn *sql.Tx) error { var err error - streamPos, err = d.events.InsertEvent(txn, ev, addStateEventIDs, removeStateEventIDs) + pos, err := d.events.InsertEvent(txn, ev, addStateEventIDs, removeStateEventIDs) if err != nil { return err } + streamPos = syncapi.StreamPosition(pos) if len(addStateEventIDs) == 0 && len(removeStateEventIDs) == 0 { // Nothing to do, the event may have just been a message event. @@ -86,13 +88,17 @@ func (d *SyncServerDatabase) SetPartitionOffset(topic string, partition int32, o } // SyncStreamPosition returns the latest position in the sync stream. Returns 0 if there are no events yet. -func (d *SyncServerDatabase) SyncStreamPosition() (int64, error) { - return d.events.MaxID() +func (d *SyncServerDatabase) SyncStreamPosition() (syncapi.StreamPosition, error) { + id, err := d.events.MaxID() + if err != nil { + return syncapi.StreamPosition(0), err + } + return syncapi.StreamPosition(id), nil } // EventsInRange returns all events in the given range, exclusive of oldPos, inclusive of newPos. -func (d *SyncServerDatabase) EventsInRange(oldPos, newPos int64) ([]gomatrixserverlib.Event, error) { - return d.events.InRange(oldPos, newPos) +func (d *SyncServerDatabase) EventsInRange(oldPos, newPos syncapi.StreamPosition) ([]gomatrixserverlib.Event, error) { + return d.events.InRange(int64(oldPos), int64(newPos)) } func runTransaction(db *sql.DB, fn func(txn *sql.Tx) error) (err error) { diff --git a/src/github.com/matrix-org/dendrite/clientapi/sync/requestpool.go b/src/github.com/matrix-org/dendrite/clientapi/sync/requestpool.go index 051e8663..e68606c2 100644 --- a/src/github.com/matrix-org/dendrite/clientapi/sync/requestpool.go +++ b/src/github.com/matrix-org/dendrite/clientapi/sync/requestpool.go @@ -11,6 +11,7 @@ import ( "github.com/matrix-org/dendrite/clientapi/httputil" "github.com/matrix-org/dendrite/clientapi/jsonerror" "github.com/matrix-org/dendrite/clientapi/storage" + "github.com/matrix-org/dendrite/clientapi/sync/syncapi" "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/util" ) @@ -20,7 +21,7 @@ const defaultSyncTimeout = time.Duration(30) * time.Second type syncRequest struct { userID string timeout time.Duration - since syncStreamPosition + since syncapi.StreamPosition wantFullState bool } @@ -28,7 +29,7 @@ type syncRequest struct { type RequestPool struct { db *storage.SyncServerDatabase // The latest sync stream position: guarded by 'cond'. - currPos syncStreamPosition + currPos syncapi.StreamPosition // A condition variable to notify all waiting goroutines of a new sync stream position cond *sync.Cond } @@ -39,7 +40,7 @@ func NewRequestPool(db *storage.SyncServerDatabase) (*RequestPool, error) { if err != nil { return nil, err } - return &RequestPool{db, syncStreamPosition(pos), sync.NewCond(&sync.Mutex{})}, nil + return &RequestPool{db, syncapi.StreamPosition(pos), sync.NewCond(&sync.Mutex{})}, nil } // OnIncomingSyncRequest is called when a client makes a /sync request. This function MUST be @@ -114,7 +115,7 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request) util.JSONRespons // OnNewEvent is called when a new event is received from the room server. Must only be // called from a single goroutine, to avoid races between updates which could set the // current position in the stream incorrectly. -func (rp *RequestPool) OnNewEvent(ev *gomatrixserverlib.Event, pos syncStreamPosition) { +func (rp *RequestPool) OnNewEvent(ev *gomatrixserverlib.Event, pos syncapi.StreamPosition) { // update the current position in a guard and then notify all /sync streams rp.cond.L.Lock() rp.currPos = pos @@ -123,7 +124,7 @@ func (rp *RequestPool) OnNewEvent(ev *gomatrixserverlib.Event, pos syncStreamPos rp.cond.Broadcast() // notify ALL waiting goroutines } -func (rp *RequestPool) waitForEvents(req syncRequest) syncStreamPosition { +func (rp *RequestPool) waitForEvents(req syncRequest) syncapi.StreamPosition { // In a guard, check if the /sync request should block, and block it until we get a new position rp.cond.L.Lock() currentPos := rp.currPos @@ -137,9 +138,38 @@ func (rp *RequestPool) waitForEvents(req syncRequest) syncStreamPosition { return currentPos } -func (rp *RequestPool) currentSyncForUser(req syncRequest) ([]gomatrixserverlib.Event, error) { +func (rp *RequestPool) currentSyncForUser(req syncRequest) (*syncapi.Response, error) { currentPos := rp.waitForEvents(req) - return rp.db.EventsInRange(int64(req.since), int64(currentPos)) + + // TODO: handle ignored users + + // TODO: Implement https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L821 + // 1) Get the CURRENT joined room list for this user + // 2) Get membership list changes for this user between the provided stream position and now. + // 3) For each room which has membership list changes: + // a) Check if the room is 'newly joined' (insufficient to just check for a join event because we allow dupe joins). + // If it is, then we need to send the full room state down (and 'limited' is always true). + // b) Check if user is still CURRENTLY invited to the room. If so, add room to 'invited' block. + // c) Check if the user is CURRENTLY left/banned. If so, add room to 'archived' block. // Synapse has a TODO: How do we handle ban -> leave in same batch? + // 4) Add joined rooms (joined room list) + + events, err := rp.db.EventsInRange(req.since, currentPos) + if err != nil { + return nil, err + } + + res := syncapi.NewResponse() + // for now, dump everything as join timeline events + for _, ev := range events { + roomData := res.Rooms.Join[ev.RoomID()] + roomData.Timeline.Events = append(roomData.Timeline.Events, ev) + res.Rooms.Join[ev.RoomID()] = roomData + } + + // Make sure we send the next_batch as a string. We don't want to confuse clients by sending this + // as an integer even though (at the moment) it is. + res.NextBatch = currentPos.String() + return res, nil } func getTimeout(timeoutMS string) time.Duration { @@ -153,13 +183,13 @@ func getTimeout(timeoutMS string) time.Duration { return time.Duration(i) * time.Millisecond } -func getSyncStreamPosition(since string) (syncStreamPosition, error) { +func getSyncStreamPosition(since string) (syncapi.StreamPosition, error) { if since == "" { - return syncStreamPosition(0), nil + return syncapi.StreamPosition(0), nil } i, err := strconv.Atoi(since) if err != nil { - return syncStreamPosition(0), err + return syncapi.StreamPosition(0), err } - return syncStreamPosition(i), nil + return syncapi.StreamPosition(i), nil } diff --git a/src/github.com/matrix-org/dendrite/clientapi/sync/syncapi/syncapi.go b/src/github.com/matrix-org/dendrite/clientapi/sync/syncapi/syncapi.go new file mode 100644 index 00000000..24603c03 --- /dev/null +++ b/src/github.com/matrix-org/dendrite/clientapi/sync/syncapi/syncapi.go @@ -0,0 +1,112 @@ +package syncapi + +import ( + "strconv" + + "github.com/matrix-org/gomatrixserverlib" +) + +// StreamPosition represents the offset in the sync stream a client is at. +type StreamPosition int64 + +// String implements the Stringer interface. +func (sp StreamPosition) String() string { + return strconv.FormatInt(int64(sp), 10) +} + +// Response represents a /sync API response. See https://matrix.org/docs/spec/client_server/r0.2.0.html#get-matrix-client-r0-sync +type Response struct { + NextBatch string `json:"next_batch"` + AccountData struct { + Events []gomatrixserverlib.Event `json:"events"` + } `json:"account_data"` + Presence struct { + Events []gomatrixserverlib.Event `json:"events"` + } `json:"presence"` + Rooms struct { + Join map[string]JoinResponse `json:"join"` + Invite map[string]InviteResponse `json:"invite"` + Leave map[string]LeaveResponse `json:"leave"` + } `json:"rooms"` +} + +// NewResponse creates an empty response with initialised maps. +func NewResponse() *Response { + res := Response{} + // Pre-initalise the maps. Synapse will return {} even if there are no rooms under a specific section, + // so let's do the same thing. Bonus: this means we can't get dreaded 'assignment to entry in nil map' errors. + res.Rooms.Join = make(map[string]JoinResponse) + res.Rooms.Invite = make(map[string]InviteResponse) + res.Rooms.Leave = make(map[string]LeaveResponse) + + // Also pre-intialise empty slices or else we'll insert 'null' instead of '[]' for the value. + // TODO: We really shouldn't have to do all this to coerce encoding/json to Do The Right Thing. We should + // really be using our own Marshal/Unmarshal implementations otherwise this may prove to be a CPU bottleneck. + // This also applies to NewJoinResponse, NewInviteResponse and NewLeaveResponse. + res.AccountData.Events = make([]gomatrixserverlib.Event, 0) + res.Presence.Events = make([]gomatrixserverlib.Event, 0) + + return &res +} + +// JoinResponse represents a /sync response for a room which is under the 'join' key. +type JoinResponse struct { + State struct { + Events []gomatrixserverlib.Event `json:"events"` + } `json:"state"` + Timeline struct { + Events []gomatrixserverlib.Event `json:"events"` + Limited bool `json:"limited"` + PrevBatch string `json:"prev_batch"` + } `json:"timeline"` + Ephemeral struct { + Events []gomatrixserverlib.Event `json:"events"` + } `json:"ephemeral"` + AccountData struct { + Events []gomatrixserverlib.Event `json:"events"` + } `json:"account_data"` +} + +// NewJoinResponse creates an empty response with initialised arrays. +func NewJoinResponse() *JoinResponse { + res := JoinResponse{} + res.State.Events = make([]gomatrixserverlib.Event, 0) + res.Timeline.Events = make([]gomatrixserverlib.Event, 0) + res.Ephemeral.Events = make([]gomatrixserverlib.Event, 0) + res.AccountData.Events = make([]gomatrixserverlib.Event, 0) + return &res +} + +// InviteResponse represents a /sync response for a room which is under the 'invite' key. +type InviteResponse struct { + InviteState struct { + Events []gomatrixserverlib.Event + } `json:"invite_state"` +} + +// NewInviteResponse creates an empty response with initialised arrays. +func NewInviteResponse() *InviteResponse { + res := InviteResponse{} + res.InviteState.Events = make([]gomatrixserverlib.Event, 0) + return &res +} + +// LeaveResponse represents a /sync response for a room which is under the 'leave' key. +type LeaveResponse struct { + State struct { + Events []gomatrixserverlib.Event `json:"events"` + } `json:"state"` + Timeline struct { + Events []gomatrixserverlib.Event `json:"events"` + Limited bool `json:"limited"` + PrevBatch string `json:"prev_batch"` + } `json:"timeline"` +} + +// NewLeaveResponse creates an empty response with initialised arrays. +func NewLeaveResponse() *LeaveResponse { + res := LeaveResponse{} + res.State.Events = make([]gomatrixserverlib.Event, 0) + res.Timeline.Events = make([]gomatrixserverlib.Event, 0) + return &res +} diff --git a/src/github.com/matrix-org/dendrite/clientapi/sync/syncserver.go b/src/github.com/matrix-org/dendrite/clientapi/sync/syncserver.go index 892c163d..142b6f11 100644 --- a/src/github.com/matrix-org/dendrite/clientapi/sync/syncserver.go +++ b/src/github.com/matrix-org/dendrite/clientapi/sync/syncserver.go @@ -6,15 +6,13 @@ import ( log "github.com/Sirupsen/logrus" "github.com/matrix-org/dendrite/clientapi/config" "github.com/matrix-org/dendrite/clientapi/storage" + "github.com/matrix-org/dendrite/clientapi/sync/syncapi" "github.com/matrix-org/dendrite/common" "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/gomatrixserverlib" sarama "gopkg.in/Shopify/sarama.v1" ) -// syncStreamPosition represents the offset in the sync stream a client is at. -type syncStreamPosition int64 - // Server contains all the logic for running a sync server type Server struct { roomServerConsumer *common.ContinualConsumer @@ -83,7 +81,7 @@ func (s *Server) onMessage(msg *sarama.ConsumerMessage) error { }).Panicf("roomserver output log: write event failure") return nil } - s.rp.OnNewEvent(&ev, syncStreamPosition(syncStreamPos)) + s.rp.OnNewEvent(&ev, syncapi.StreamPosition(syncStreamPos)) return nil }