From 5592322e13d0bf741130425079e37979f7637564 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Thu, 3 Mar 2022 16:45:06 +0000 Subject: [PATCH] Clean old notifications regularly (#2244) * Clean old notifications regularly We'll keep highlights for a month and non-highlights for a day, to stop the `userapi_notifications` table from growing indefinitely. We'll also allow storing events even if no pushers are present, because apparently Element Web expects to work that way. * Fix the milliseconds * Use process context * Update sytest lists * Fix build issue --- sytest-blacklist | 1 + sytest-whitelist | 1 - userapi/consumers/syncapi_streamevent.go | 3 -- userapi/storage/interface.go | 1 + .../storage/postgres/notifications_table.go | 28 +++++++++++++++---- userapi/storage/shared/storage.go | 4 +++ .../storage/sqlite3/notifications_table.go | 28 +++++++++++++++---- userapi/storage/tables/interface.go | 1 + userapi/userapi.go | 12 ++++++++ 9 files changed, 63 insertions(+), 16 deletions(-) diff --git a/sytest-blacklist b/sytest-blacklist index 978dcde8..becc500e 100644 --- a/sytest-blacklist +++ b/sytest-blacklist @@ -35,3 +35,4 @@ AS-ghosted users can use rooms themselves # Flakey, need additional investigation Messages that notify from another user increment notification_count Messages that highlight from another user increment unread highlight count +Notifications can be viewed with GET /notifications \ No newline at end of file diff --git a/sytest-whitelist b/sytest-whitelist index 602f8646..63d779bf 100644 --- a/sytest-whitelist +++ b/sytest-whitelist @@ -638,7 +638,6 @@ Rooms with many users are correctly pushed Don't get pushed for rooms you've muted Rejected events are not pushed Test that rejected pushers are removed. -Notifications can be viewed with GET /notifications Trying to add push rule with no scope fails with 400 Trying to add push rule with invalid scope fails with 400 Forward extremities remain so even after the next events are populated as outliers diff --git a/userapi/consumers/syncapi_streamevent.go b/userapi/consumers/syncapi_streamevent.go index d86078cb..11081327 100644 --- a/userapi/consumers/syncapi_streamevent.go +++ b/userapi/consumers/syncapi_streamevent.go @@ -139,9 +139,6 @@ func (s *OutputStreamEventConsumer) processMessage(ctx context.Context, event *g // removing it means we can send all notifications to // e.g. Element's Push gateway in one go. for _, mem := range members { - if p, err := s.db.GetPushers(ctx, mem.Localpart); err != nil || len(p) == 0 { - continue - } if err := s.notifyLocal(ctx, event, pos, mem, roomSize, roomName); err != nil { log.WithFields(log.Fields{ "localpart": mem.Localpart, diff --git a/userapi/storage/interface.go b/userapi/storage/interface.go index 6d22fea9..77706710 100644 --- a/userapi/storage/interface.go +++ b/userapi/storage/interface.go @@ -97,6 +97,7 @@ type Database interface { GetNotifications(ctx context.Context, localpart string, fromID int64, limit int, filter tables.NotificationFilter) ([]*api.Notification, int64, error) GetNotificationCount(ctx context.Context, localpart string, filter tables.NotificationFilter) (int64, error) GetRoomNotificationCounts(ctx context.Context, localpart, roomID string) (total int64, highlight int64, _ error) + DeleteOldNotifications(ctx context.Context) error UpsertPusher(ctx context.Context, p api.Pusher, localpart string) error GetPushers(ctx context.Context, localpart string) ([]api.Pusher, error) diff --git a/userapi/storage/postgres/notifications_table.go b/userapi/storage/postgres/notifications_table.go index 7bcc0f9c..a27c1125 100644 --- a/userapi/storage/postgres/notifications_table.go +++ b/userapi/storage/postgres/notifications_table.go @@ -18,6 +18,7 @@ import ( "context" "database/sql" "encoding/json" + "time" "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal/sqlutil" @@ -28,12 +29,13 @@ import ( ) type notificationsStatements struct { - insertStmt *sql.Stmt - deleteUpToStmt *sql.Stmt - updateReadStmt *sql.Stmt - selectStmt *sql.Stmt - selectCountStmt *sql.Stmt - selectRoomCountsStmt *sql.Stmt + insertStmt *sql.Stmt + deleteUpToStmt *sql.Stmt + updateReadStmt *sql.Stmt + selectStmt *sql.Stmt + selectCountStmt *sql.Stmt + selectRoomCountsStmt *sql.Stmt + cleanNotificationsStmt *sql.Stmt } const notificationSchema = ` @@ -77,6 +79,10 @@ const selectRoomNotificationCountsSQL = "" + "SELECT COUNT(*), COUNT(*) FILTER (WHERE highlight) FROM userapi_notifications " + "WHERE localpart = $1 AND room_id = $2 AND NOT read" +const cleanNotificationsSQL = "" + + "DELETE FROM userapi_notifications WHERE" + + " (highlight = FALSE AND ts_ms < $1) OR (highlight = TRUE AND ts_ms < $2)" + func NewPostgresNotificationTable(db *sql.DB) (tables.NotificationTable, error) { s := ¬ificationsStatements{} _, err := db.Exec(notificationSchema) @@ -90,9 +96,19 @@ func NewPostgresNotificationTable(db *sql.DB) (tables.NotificationTable, error) {&s.selectStmt, selectNotificationSQL}, {&s.selectCountStmt, selectNotificationCountSQL}, {&s.selectRoomCountsStmt, selectRoomNotificationCountsSQL}, + {&s.cleanNotificationsStmt, cleanNotificationsSQL}, }.Prepare(db) } +func (s *notificationsStatements) Clean(ctx context.Context, txn *sql.Tx) error { + _, err := sqlutil.TxStmt(txn, s.cleanNotificationsStmt).ExecContext( + ctx, + time.Now().AddDate(0, 0, -1).UnixNano()/int64(time.Millisecond), // keep non-highlights for a day + time.Now().AddDate(0, -1, 0).UnixNano()/int64(time.Millisecond), // keep highlights for a month + ) + return err +} + // Insert inserts a notification into the database. func (s *notificationsStatements) Insert(ctx context.Context, txn *sql.Tx, localpart, eventID string, pos int64, highlight bool, n *api.Notification) error { roomID, tsMS := n.RoomID, n.TS diff --git a/userapi/storage/shared/storage.go b/userapi/storage/shared/storage.go index a58974b4..febf0322 100644 --- a/userapi/storage/shared/storage.go +++ b/userapi/storage/shared/storage.go @@ -705,6 +705,10 @@ func (d *Database) GetRoomNotificationCounts(ctx context.Context, localpart, roo return d.Notifications.SelectRoomCounts(ctx, nil, localpart, roomID) } +func (d *Database) DeleteOldNotifications(ctx context.Context) error { + return d.Notifications.Clean(ctx, nil) +} + func (d *Database) UpsertPusher( ctx context.Context, p api.Pusher, localpart string, ) error { diff --git a/userapi/storage/sqlite3/notifications_table.go b/userapi/storage/sqlite3/notifications_table.go index fcfb1aad..df826025 100644 --- a/userapi/storage/sqlite3/notifications_table.go +++ b/userapi/storage/sqlite3/notifications_table.go @@ -18,6 +18,7 @@ import ( "context" "database/sql" "encoding/json" + "time" "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal/sqlutil" @@ -28,12 +29,13 @@ import ( ) type notificationsStatements struct { - insertStmt *sql.Stmt - deleteUpToStmt *sql.Stmt - updateReadStmt *sql.Stmt - selectStmt *sql.Stmt - selectCountStmt *sql.Stmt - selectRoomCountsStmt *sql.Stmt + insertStmt *sql.Stmt + deleteUpToStmt *sql.Stmt + updateReadStmt *sql.Stmt + selectStmt *sql.Stmt + selectCountStmt *sql.Stmt + selectRoomCountsStmt *sql.Stmt + cleanNotificationsStmt *sql.Stmt } const notificationSchema = ` @@ -77,6 +79,10 @@ const selectRoomNotificationCountsSQL = "" + "SELECT COUNT(*), COUNT(*) FILTER (WHERE highlight) FROM userapi_notifications " + "WHERE localpart = $1 AND room_id = $2 AND NOT read" +const cleanNotificationsSQL = "" + + "DELETE FROM userapi_notifications WHERE" + + " (highlight = FALSE AND ts_ms < $1) OR (highlight = TRUE AND ts_ms < $2)" + func NewSQLiteNotificationTable(db *sql.DB) (tables.NotificationTable, error) { s := ¬ificationsStatements{} _, err := db.Exec(notificationSchema) @@ -90,9 +96,19 @@ func NewSQLiteNotificationTable(db *sql.DB) (tables.NotificationTable, error) { {&s.selectStmt, selectNotificationSQL}, {&s.selectCountStmt, selectNotificationCountSQL}, {&s.selectRoomCountsStmt, selectRoomNotificationCountsSQL}, + {&s.cleanNotificationsStmt, cleanNotificationsSQL}, }.Prepare(db) } +func (s *notificationsStatements) Clean(ctx context.Context, txn *sql.Tx) error { + _, err := sqlutil.TxStmt(txn, s.cleanNotificationsStmt).ExecContext( + ctx, + time.Now().AddDate(0, 0, -1).UnixNano()/int64(time.Millisecond), // keep non-highlights for a day + time.Now().AddDate(0, -1, 0).UnixNano()/int64(time.Millisecond), // keep highlights for a month + ) + return err +} + // Insert inserts a notification into the database. func (s *notificationsStatements) Insert(ctx context.Context, txn *sql.Tx, localpart, eventID string, pos int64, highlight bool, n *api.Notification) error { roomID, tsMS := n.RoomID, n.TS diff --git a/userapi/storage/tables/interface.go b/userapi/storage/tables/interface.go index 815e5119..99c907b8 100644 --- a/userapi/storage/tables/interface.go +++ b/userapi/storage/tables/interface.go @@ -103,6 +103,7 @@ type PusherTable interface { } type NotificationTable interface { + Clean(ctx context.Context, txn *sql.Tx) error Insert(ctx context.Context, txn *sql.Tx, localpart, eventID string, pos int64, highlight bool, n *api.Notification) error DeleteUpTo(ctx context.Context, txn *sql.Tx, localpart, roomID string, pos int64) (affected bool, _ error) UpdateRead(ctx context.Context, txn *sql.Tx, localpart, roomID string, pos int64, v bool) (affected bool, _ error) diff --git a/userapi/userapi.go b/userapi/userapi.go index 8dbc095f..251a4eda 100644 --- a/userapi/userapi.go +++ b/userapi/userapi.go @@ -15,6 +15,8 @@ package userapi import ( + "time" + "github.com/gorilla/mux" "github.com/matrix-org/dendrite/internal/pushgateway" keyapi "github.com/matrix-org/dendrite/keyserver/api" @@ -79,5 +81,15 @@ func NewInternalAPI( logrus.WithError(err).Panic("failed to start user API streamed event consumer") } + var cleanOldNotifs func() + cleanOldNotifs = func() { + logrus.Infof("Cleaning old notifications") + if err := db.DeleteOldNotifications(base.Context()); err != nil { + logrus.WithError(err).Error("Failed to clean old notifications") + } + time.AfterFunc(time.Hour, cleanOldNotifs) + } + time.AfterFunc(time.Minute, cleanOldNotifs) + return userAPI }