diff --git a/src/github.com/matrix-org/dendrite/syncapi/sync/notifier.go b/src/github.com/matrix-org/dendrite/syncapi/sync/notifier.go index 1b541991..e0e5891d 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/sync/notifier.go +++ b/src/github.com/matrix-org/dendrite/syncapi/sync/notifier.go @@ -100,8 +100,10 @@ func (n *Notifier) OnNewEvent(ev *gomatrixserverlib.Event, userID string, pos ty } } -// WaitForEvents blocks until there are new events for this request. -func (n *Notifier) WaitForEvents(req syncRequest) types.StreamPosition { +// WaitForEvents blocks until there are events for this request after sincePos. +// In particular, it will return immediately if there are already events after +// sincePos for the request, but otherwise blocks waiting for new events. +func (n *Notifier) WaitForEvents(req syncRequest, sincePos types.StreamPosition) types.StreamPosition { // Do what synapse does: https://github.com/matrix-org/synapse/blob/v0.20.0/synapse/notifier.py#L298 // - Bucket request into a lookup map keyed off a list of joined room IDs and separately a user ID // - Incoming events wake requests for a matching room ID @@ -117,7 +119,7 @@ func (n *Notifier) WaitForEvents(req syncRequest) types.StreamPosition { // TODO: We increment the stream position for any event, so it's possible that we return immediately // with a pos which contains no new events for this user. We should probably re-wait for events // automatically in this case. - if req.since != currentPos { + if sincePos != currentPos { n.streamLock.Unlock() return currentPos } @@ -141,6 +143,11 @@ func (n *Notifier) Load(ctx context.Context, db *storage.SyncServerDatabase) err return nil } +// CurrentPosition returns the current stream position +func (n *Notifier) CurrentPosition() types.StreamPosition { + return n.currPos +} + // setUsersJoinedToRooms marks the given users as 'joined' to the given rooms, such that new events from // these rooms will wake the given users /sync requests. This should be called prior to ANY calls to // OnNewEvent (eg on startup) to prevent racing. diff --git a/src/github.com/matrix-org/dendrite/syncapi/sync/notifier_test.go b/src/github.com/matrix-org/dendrite/syncapi/sync/notifier_test.go index 358243bc..7aab417b 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/sync/notifier_test.go +++ b/src/github.com/matrix-org/dendrite/syncapi/sync/notifier_test.go @@ -258,7 +258,7 @@ func TestNewEventAndWasPreviouslyJoinedToRoom(t *testing.T) { func waitForEvents(n *Notifier, req syncRequest) (types.StreamPosition, error) { done := make(chan types.StreamPosition, 1) go func() { - newPos := n.WaitForEvents(req) + newPos := n.WaitForEvents(req, req.since) done <- newPos close(done) }() diff --git a/src/github.com/matrix-org/dendrite/syncapi/sync/requestpool.go b/src/github.com/matrix-org/dendrite/syncapi/sync/requestpool.go index bd5909ef..81469087 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/sync/requestpool.go +++ b/src/github.com/matrix-org/dendrite/syncapi/sync/requestpool.go @@ -62,48 +62,76 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *authtype "timeout": syncReq.timeout, }).Info("Incoming /sync request") - // Fork off 2 goroutines: one to do the work, and one to serve as a timeout. - // Whichever returns first is the one we will serve back to the client. - timeoutChan := make(chan struct{}) - timer := time.AfterFunc(syncReq.timeout, func() { - close(timeoutChan) // signal that the timeout has expired - }) + currPos := rp.notifier.CurrentPosition() - done := make(chan util.JSONResponse) - go func() { - currentPos := rp.notifier.WaitForEvents(*syncReq) - // We stop the timer BEFORE calculating the response so the cpu work - // done to calculate the response is not timed. This stops us from - // doing lots of work then timing out and sending back an empty response. - timer.Stop() - syncData, err := rp.currentSyncForUser(*syncReq, currentPos) - var res util.JSONResponse + // If this is an initial sync or timeout=0 we return immediately + if syncReq.since == types.StreamPosition(0) || syncReq.timeout == 0 { + syncData, err := rp.currentSyncForUser(*syncReq, currPos) if err != nil { - res = httputil.LogThenError(req, err) - } else { - syncData, err = rp.appendAccountData(syncData, device.UserID, *syncReq, currentPos) - if err != nil { - res = httputil.LogThenError(req, err) - } else { - res = util.JSONResponse{ - Code: 200, - JSON: syncData, - } - } + return httputil.LogThenError(req, err) } - done <- res - close(done) - }() - - select { - case <-timeoutChan: // timeout fired return util.JSONResponse{ Code: 200, - JSON: types.NewResponse(syncReq.since), + JSON: syncData, } - case res := <-done: // received a response - return res } + + // Otherwise, we wait for the notifier to tell us if something *may* have + // happened. We loop in case it turns out that nothing did happen. + + timer := time.NewTimer(syncReq.timeout) // case of timeout=0 is handled above + defer timer.Stop() + + for { + select { + // Wait for notifier to wake us up + case currPos = <-rp.makeNotifyChannel(*syncReq, currPos): + // Or for timeout to expire + case <-timer.C: + return util.JSONResponse{ + Code: 200, + JSON: types.NewResponse(syncReq.since), + } + // Or for the request to be cancelled + case <-req.Context().Done(): + return httputil.LogThenError(req, req.Context().Err()) + } + + // Note that we don't time out during calculation of sync + // response. This ensures that we don't waste the hard work + // of calculating the sync only to get timed out before we + // can respond + + syncData, err := rp.currentSyncForUser(*syncReq, currPos) + if err != nil { + return httputil.LogThenError(req, err) + } + if !syncData.IsEmpty() { + return util.JSONResponse{ + Code: 200, + JSON: syncData, + } + } + + } +} + +// makeNotifyChannel returns a channel that produces the current stream position +// when there *may* be something to return to the client. Only produces a single +// value and then closes the channel. +func (rp *RequestPool) makeNotifyChannel(syncReq syncRequest, sincePos types.StreamPosition) chan types.StreamPosition { + notified := make(chan types.StreamPosition) + + // TODO(#303): We need to ensure that WaitForEvents gets properly cancelled + // when the request is finished, or use some other mechanism to ensure we + // don't leak goroutines here + go (func() { + currentPos := rp.notifier.WaitForEvents(syncReq, sincePos) + notified <- currentPos + close(notified) + })() + + return notified } type stateEventInStateResp struct { @@ -196,12 +224,20 @@ func (rp *RequestPool) OnIncomingStateTypeRequest(req *http.Request, roomID stri } } -func (rp *RequestPool) currentSyncForUser(req syncRequest, currentPos types.StreamPosition) (*types.Response, error) { +func (rp *RequestPool) currentSyncForUser(req syncRequest, currentPos types.StreamPosition) (res *types.Response, err error) { // TODO: handle ignored users if req.since == types.StreamPosition(0) { - return rp.db.CompleteSync(req.ctx, req.userID, req.limit) + res, err = rp.db.CompleteSync(req.ctx, req.userID, req.limit) + } else { + res, err = rp.db.IncrementalSync(req.ctx, req.userID, req.since, currentPos, req.limit) } - return rp.db.IncrementalSync(req.ctx, req.userID, req.since, currentPos, req.limit) + + if err != nil { + return + } + + res, err = rp.appendAccountData(res, req.userID, req, currentPos) + return } func (rp *RequestPool) appendAccountData( diff --git a/src/github.com/matrix-org/dendrite/syncapi/types/types.go b/src/github.com/matrix-org/dendrite/syncapi/types/types.go index f710c6d5..d0b1c38a 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/types/types.go +++ b/src/github.com/matrix-org/dendrite/syncapi/types/types.go @@ -74,6 +74,16 @@ func NewResponse(pos StreamPosition) *Response { return &res } +// IsEmpty returns true if the response is empty, i.e. used to decided whether +// to return the response immediately to the client or to wait for more data. +func (r *Response) IsEmpty() bool { + return len(r.Rooms.Join) == 0 && + len(r.Rooms.Invite) == 0 && + len(r.Rooms.Leave) == 0 && + len(r.AccountData.Events) == 0 && + len(r.Presence.Events) == 0 +} + // JoinResponse represents a /sync response for a room which is under the 'join' key. type JoinResponse struct { State struct {