From 66b397b3c60c51bba36e4bce858733b2fda26f6a Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Wed, 27 Apr 2022 11:25:07 +0100 Subject: [PATCH] Don't create fictitious presence entries (#2381) * Don't create fictitious presence entries for users that don't have any * Update whitelist, since that test probably shouldn't be passing * Fix panics --- syncapi/consumers/presence.go | 9 ++++++++- syncapi/storage/postgres/presence_table.go | 3 +++ syncapi/storage/sqlite3/presence_table.go | 3 +++ syncapi/streams/stream_presence.go | 12 +++++------ syncapi/sync/requestpool.go | 23 ++++++++++++---------- sytest-whitelist | 2 -- 6 files changed, 33 insertions(+), 19 deletions(-) diff --git a/syncapi/consumers/presence.go b/syncapi/consumers/presence.go index b198b229..6bcca48f 100644 --- a/syncapi/consumers/presence.go +++ b/syncapi/consumers/presence.go @@ -88,6 +88,11 @@ func (s *PresenceConsumer) Start() error { } return } + if presence == nil { + presence = &types.PresenceInternal{ + UserID: userID, + } + } deviceRes := api.QueryDevicesResponse{} if err = s.deviceAPI.QueryDevices(s.ctx, &api.QueryDevicesRequest{UserID: userID}, &deviceRes); err != nil { @@ -106,7 +111,9 @@ func (s *PresenceConsumer) Start() error { m.Header.Set(jetstream.UserID, presence.UserID) m.Header.Set("presence", presence.ClientFields.Presence) - m.Header.Set("status_msg", *presence.ClientFields.StatusMsg) + if presence.ClientFields.StatusMsg != nil { + m.Header.Set("status_msg", *presence.ClientFields.StatusMsg) + } m.Header.Set("last_active_ts", strconv.Itoa(int(presence.LastActiveTS))) if err = msg.RespondMsg(m); err != nil { diff --git a/syncapi/storage/postgres/presence_table.go b/syncapi/storage/postgres/presence_table.go index 49336c4e..9f1e37f7 100644 --- a/syncapi/storage/postgres/presence_table.go +++ b/syncapi/storage/postgres/presence_table.go @@ -127,6 +127,9 @@ func (p *presenceStatements) GetPresenceForUser( } stmt := sqlutil.TxStmt(txn, p.selectPresenceForUsersStmt) err := stmt.QueryRowContext(ctx, userID).Scan(&result.Presence, &result.ClientFields.StatusMsg, &result.LastActiveTS) + if err == sql.ErrNoRows { + return nil, nil + } result.ClientFields.Presence = result.Presence.String() return result, err } diff --git a/syncapi/storage/sqlite3/presence_table.go b/syncapi/storage/sqlite3/presence_table.go index 00b16458..177a01bf 100644 --- a/syncapi/storage/sqlite3/presence_table.go +++ b/syncapi/storage/sqlite3/presence_table.go @@ -142,6 +142,9 @@ func (p *presenceStatements) GetPresenceForUser( } stmt := sqlutil.TxStmt(txn, p.selectPresenceForUsersStmt) err := stmt.QueryRowContext(ctx, userID).Scan(&result.Presence, &result.ClientFields.StatusMsg, &result.LastActiveTS) + if err == sql.ErrNoRows { + return nil, nil + } result.ClientFields.Presence = result.Presence.String() return result, err } diff --git a/syncapi/streams/stream_presence.go b/syncapi/streams/stream_presence.go index 9a6c5c13..614b88d4 100644 --- a/syncapi/streams/stream_presence.go +++ b/syncapi/streams/stream_presence.go @@ -16,7 +16,6 @@ package streams import ( "context" - "database/sql" "encoding/json" "sync" @@ -80,11 +79,10 @@ func (p *PresenceStreamProvider) IncrementalSync( if _, ok := presences[roomUsers[i]]; ok { continue } + // Bear in mind that this might return nil, but at least populating + // a nil means that there's a map entry so we won't repeat this call. presences[roomUsers[i]], err = p.DB.GetPresence(ctx, roomUsers[i]) if err != nil { - if err == sql.ErrNoRows { - continue - } req.Log.WithError(err).Error("unable to query presence for user") return from } @@ -93,8 +91,10 @@ func (p *PresenceStreamProvider) IncrementalSync( } lastPos := to - for i := range presences { - presence := presences[i] + for _, presence := range presences { + if presence == nil { + continue + } // Ignore users we don't share a room with if req.Device.UserID != presence.UserID && !p.notifier.IsSharedUser(req.Device.UserID, presence.UserID) { continue diff --git a/syncapi/sync/requestpool.go b/syncapi/sync/requestpool.go index 70334099..76d550a6 100644 --- a/syncapi/sync/requestpool.go +++ b/syncapi/sync/requestpool.go @@ -127,14 +127,23 @@ func (rp *RequestPool) updatePresence(db storage.Presence, presence string, user if !ok { // this should almost never happen return } + newPresence := types.PresenceInternal{ - ClientFields: types.PresenceClientResponse{ - Presence: presenceID.String(), - }, Presence: presenceID, UserID: userID, LastActiveTS: gomatrixserverlib.AsTimestamp(time.Now()), } + + // ensure we also send the current status_msg to federated servers and not nil + dbPresence, err := db.GetPresence(context.Background(), userID) + if err != nil && err != sql.ErrNoRows { + return + } + if dbPresence != nil { + newPresence.ClientFields = dbPresence.ClientFields + } + newPresence.ClientFields.Presence = presenceID.String() + defer rp.presence.Store(userID, newPresence) // avoid spamming presence updates when syncing existingPresence, ok := rp.presence.LoadOrStore(userID, newPresence) @@ -145,13 +154,7 @@ func (rp *RequestPool) updatePresence(db storage.Presence, presence string, user } } - // ensure we also send the current status_msg to federated servers and not nil - dbPresence, err := db.GetPresence(context.Background(), userID) - if err != nil && err != sql.ErrNoRows { - return - } - - if err := rp.producer.SendPresence(userID, presenceID, dbPresence.ClientFields.StatusMsg); err != nil { + if err := rp.producer.SendPresence(userID, presenceID, newPresence.ClientFields.StatusMsg); err != nil { logrus.WithError(err).Error("Unable to publish presence message from sync") return } diff --git a/sytest-whitelist b/sytest-whitelist index c9829606..6af8d89f 100644 --- a/sytest-whitelist +++ b/sytest-whitelist @@ -681,8 +681,6 @@ GET /presence/:user_id/status fetches initial status PUT /presence/:user_id/status updates my presence Presence change reports an event to myself Existing members see new members' presence -#Existing members see new member's presence -Newly joined room includes presence in incremental sync Get presence for newly joined members in incremental sync User sees their own presence in a sync User sees updates to presence from other users in the incremental sync.