diff --git a/syncapi/notifier/notifier.go b/syncapi/notifier/notifier.go index 82834239..87f0d86d 100644 --- a/syncapi/notifier/notifier.go +++ b/syncapi/notifier/notifier.go @@ -333,6 +333,20 @@ func (n *Notifier) Load(ctx context.Context, db storage.Database) error { return nil } +// LoadRooms loads the membership states required to notify users correctly. +func (n *Notifier) LoadRooms(ctx context.Context, db storage.Database, roomIDs []string) error { + n.lock.Lock() + defer n.lock.Unlock() + + roomToUsers, err := db.AllJoinedUsersInRoom(ctx, roomIDs) + if err != nil { + return err + } + n.setUsersJoinedToRooms(roomToUsers) + + return nil +} + // CurrentPosition returns the current sync position func (n *Notifier) CurrentPosition() types.StreamingToken { n.lock.RLock() diff --git a/syncapi/storage/interface.go b/syncapi/storage/interface.go index 48697859..5a036d88 100644 --- a/syncapi/storage/interface.go +++ b/syncapi/storage/interface.go @@ -52,6 +52,9 @@ type Database interface { // AllJoinedUsersInRooms returns a map of room ID to a list of all joined user IDs. AllJoinedUsersInRooms(ctx context.Context) (map[string][]string, error) + // AllJoinedUsersInRoom returns a map of room ID to a list of all joined user IDs for a given room. + AllJoinedUsersInRoom(ctx context.Context, roomIDs []string) (map[string][]string, error) + // AllPeekingDevicesInRooms returns a map of room ID to a list of all peeking devices. AllPeekingDevicesInRooms(ctx context.Context) (map[string][]types.PeekingDevice, error) // Events lookups a list of event by their event ID. diff --git a/syncapi/storage/postgres/current_room_state_table.go b/syncapi/storage/postgres/current_room_state_table.go index fe68788d..8ee387b3 100644 --- a/syncapi/storage/postgres/current_room_state_table.go +++ b/syncapi/storage/postgres/current_room_state_table.go @@ -93,6 +93,9 @@ const selectCurrentStateSQL = "" + const selectJoinedUsersSQL = "" + "SELECT room_id, state_key FROM syncapi_current_room_state WHERE type = 'm.room.member' AND membership = 'join'" +const selectJoinedUsersInRoomSQL = "" + + "SELECT room_id, state_key FROM syncapi_current_room_state WHERE type = 'm.room.member' AND membership = 'join' AND room_id = ANY($1)" + const selectStateEventSQL = "" + "SELECT headered_event_json FROM syncapi_current_room_state WHERE room_id = $1 AND type = $2 AND state_key = $3" @@ -112,6 +115,7 @@ type currentRoomStateStatements struct { selectRoomIDsWithAnyMembershipStmt *sql.Stmt selectCurrentStateStmt *sql.Stmt selectJoinedUsersStmt *sql.Stmt + selectJoinedUsersInRoomStmt *sql.Stmt selectEventsWithEventIDsStmt *sql.Stmt selectStateEventStmt *sql.Stmt } @@ -143,6 +147,9 @@ func NewPostgresCurrentRoomStateTable(db *sql.DB) (tables.CurrentRoomState, erro if s.selectJoinedUsersStmt, err = db.Prepare(selectJoinedUsersSQL); err != nil { return nil, err } + if s.selectJoinedUsersInRoomStmt, err = db.Prepare(selectJoinedUsersInRoomSQL); err != nil { + return nil, err + } if s.selectEventsWithEventIDsStmt, err = db.Prepare(selectEventsWithEventIDsSQL); err != nil { return nil, err } @@ -163,9 +170,32 @@ func (s *currentRoomStateStatements) SelectJoinedUsers( defer internal.CloseAndLogIfError(ctx, rows, "selectJoinedUsers: rows.close() failed") result := make(map[string][]string) + var roomID string + var userID string + for rows.Next() { + if err := rows.Scan(&roomID, &userID); err != nil { + return nil, err + } + users := result[roomID] + users = append(users, userID) + result[roomID] = users + } + return result, rows.Err() +} + +// SelectJoinedUsersInRoom returns a map of room ID to a list of joined user IDs for a given room. +func (s *currentRoomStateStatements) SelectJoinedUsersInRoom( + ctx context.Context, roomIDs []string, +) (map[string][]string, error) { + rows, err := s.selectJoinedUsersInRoomStmt.QueryContext(ctx, pq.StringArray(roomIDs)) + if err != nil { + return nil, err + } + defer internal.CloseAndLogIfError(ctx, rows, "selectJoinedUsers: rows.close() failed") + + result := make(map[string][]string) + var userID, roomID string for rows.Next() { - var roomID string - var userID string if err := rows.Scan(&roomID, &userID); err != nil { return nil, err } diff --git a/syncapi/storage/shared/syncserver.go b/syncapi/storage/shared/syncserver.go index b7d2d3a2..ec5edd35 100644 --- a/syncapi/storage/shared/syncserver.go +++ b/syncapi/storage/shared/syncserver.go @@ -168,6 +168,10 @@ func (d *Database) AllJoinedUsersInRooms(ctx context.Context) (map[string][]stri return d.CurrentRoomState.SelectJoinedUsers(ctx) } +func (d *Database) AllJoinedUsersInRoom(ctx context.Context, roomIDs []string) (map[string][]string, error) { + return d.CurrentRoomState.SelectJoinedUsersInRoom(ctx, roomIDs) +} + func (d *Database) AllPeekingDevicesInRooms(ctx context.Context) (map[string][]types.PeekingDevice, error) { return d.Peeks.SelectPeekingDevices(ctx) } diff --git a/syncapi/storage/sqlite3/current_room_state_table.go b/syncapi/storage/sqlite3/current_room_state_table.go index ccda005c..f0a1c7bb 100644 --- a/syncapi/storage/sqlite3/current_room_state_table.go +++ b/syncapi/storage/sqlite3/current_room_state_table.go @@ -77,6 +77,9 @@ const selectCurrentStateSQL = "" + const selectJoinedUsersSQL = "" + "SELECT room_id, state_key FROM syncapi_current_room_state WHERE type = 'm.room.member' AND membership = 'join'" +const selectJoinedUsersInRoomSQL = "" + + "SELECT room_id, state_key FROM syncapi_current_room_state WHERE type = 'm.room.member' AND membership = 'join' AND room_id IN ($1)" + const selectStateEventSQL = "" + "SELECT headered_event_json FROM syncapi_current_room_state WHERE room_id = $1 AND type = $2 AND state_key = $3" @@ -97,7 +100,8 @@ type currentRoomStateStatements struct { selectRoomIDsWithMembershipStmt *sql.Stmt selectRoomIDsWithAnyMembershipStmt *sql.Stmt selectJoinedUsersStmt *sql.Stmt - selectStateEventStmt *sql.Stmt + //selectJoinedUsersInRoomStmt *sql.Stmt - prepared at runtime due to variadic + selectStateEventStmt *sql.Stmt } func NewSqliteCurrentRoomStateTable(db *sql.DB, streamID *StreamIDStatements) (tables.CurrentRoomState, error) { @@ -127,13 +131,16 @@ func NewSqliteCurrentRoomStateTable(db *sql.DB, streamID *StreamIDStatements) (t if s.selectJoinedUsersStmt, err = db.Prepare(selectJoinedUsersSQL); err != nil { return nil, err } + //if s.selectJoinedUsersInRoomStmt, err = db.Prepare(selectJoinedUsersInRoomSQL); err != nil { + // return nil, err + //} if s.selectStateEventStmt, err = db.Prepare(selectStateEventSQL); err != nil { return nil, err } return s, nil } -// JoinedMemberLists returns a map of room ID to a list of joined user IDs. +// SelectJoinedUsers returns a map of room ID to a list of joined user IDs. func (s *currentRoomStateStatements) SelectJoinedUsers( ctx context.Context, ) (map[string][]string, error) { @@ -144,9 +151,9 @@ func (s *currentRoomStateStatements) SelectJoinedUsers( defer internal.CloseAndLogIfError(ctx, rows, "selectJoinedUsers: rows.close() failed") result := make(map[string][]string) + var roomID string + var userID string for rows.Next() { - var roomID string - var userID string if err := rows.Scan(&roomID, &userID); err != nil { return nil, err } @@ -157,6 +164,40 @@ func (s *currentRoomStateStatements) SelectJoinedUsers( return result, nil } +// SelectJoinedUsersInRoom returns a map of room ID to a list of joined user IDs for a given room. +func (s *currentRoomStateStatements) SelectJoinedUsersInRoom( + ctx context.Context, roomIDs []string, +) (map[string][]string, error) { + query := strings.Replace(selectJoinedUsersInRoomSQL, "($1)", sqlutil.QueryVariadic(len(roomIDs)), 1) + params := make([]interface{}, 0, len(roomIDs)) + for _, roomID := range roomIDs { + params = append(params, roomID) + } + stmt, err := s.db.Prepare(query) + if err != nil { + return nil, fmt.Errorf("SelectJoinedUsersInRoom s.db.Prepare: %w", err) + } + defer internal.CloseAndLogIfError(ctx, stmt, "SelectJoinedUsersInRoom: stmt.close() failed") + + rows, err := stmt.QueryContext(ctx, params...) + if err != nil { + return nil, err + } + defer internal.CloseAndLogIfError(ctx, rows, "SelectJoinedUsersInRoom: rows.close() failed") + + result := make(map[string][]string) + var userID, roomID string + for rows.Next() { + if err := rows.Scan(&roomID, &userID); err != nil { + return nil, err + } + users := result[roomID] + users = append(users, userID) + result[roomID] = users + } + return result, rows.Err() +} + // SelectRoomIDsWithMembership returns the list of room IDs which have the given user in the given membership state. func (s *currentRoomStateStatements) SelectRoomIDsWithMembership( ctx context.Context, diff --git a/syncapi/storage/tables/interface.go b/syncapi/storage/tables/interface.go index 6fdd9483..ccdebfdb 100644 --- a/syncapi/storage/tables/interface.go +++ b/syncapi/storage/tables/interface.go @@ -102,6 +102,8 @@ type CurrentRoomState interface { SelectRoomIDsWithAnyMembership(ctx context.Context, txn *sql.Tx, userID string) (map[string]string, error) // SelectJoinedUsers returns a map of room ID to a list of joined user IDs. SelectJoinedUsers(ctx context.Context) (map[string][]string, error) + // SelectJoinedUsersInRoom returns a map of room ID to a list of joined user IDs for a given room. + SelectJoinedUsersInRoom(ctx context.Context, roomIDs []string) (map[string][]string, error) } // BackwardsExtremities keeps track of backwards extremities for a room. diff --git a/syncapi/streams/stream_presence.go b/syncapi/streams/stream_presence.go index 675a7a17..a84d1987 100644 --- a/syncapi/streams/stream_presence.go +++ b/syncapi/streams/stream_presence.go @@ -67,9 +67,8 @@ func (p *PresenceStreamProvider) IncrementalSync( // add newly joined rooms user presences newlyJoined := joinedRooms(req.Response, req.Device.UserID) if len(newlyJoined) > 0 { - // TODO: This refreshes all lists and is quite expensive - // The notifier should update the lists itself - if err = p.notifier.Load(ctx, p.DB); err != nil { + // TODO: Check if this is working better than before. + if err = p.notifier.LoadRooms(ctx, p.DB, newlyJoined); err != nil { req.Log.WithError(err).Error("unable to refresh notifier lists") return from }