2020-07-28 17:38:30 +01:00
// Copyright 2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package postgres
import (
"context"
"database/sql"
2022-09-09 13:14:52 +01:00
"errors"
2020-07-28 17:38:30 +01:00
2022-09-09 13:14:52 +01:00
"github.com/sirupsen/logrus"
2022-08-08 09:18:57 +01:00
2020-07-28 17:38:30 +01:00
"github.com/matrix-org/dendrite/internal"
2022-07-25 10:39:22 +01:00
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/keyserver/storage/postgres/deltas"
2020-07-28 17:38:30 +01:00
"github.com/matrix-org/dendrite/keyserver/storage/tables"
)
// keyChangesSchema creates the sequence and the table that hold key change
// information. change_id defaults to the next value of
// keyserver_key_changes_seq, and the UNIQUE constraint on user_id limits the
// table to a single row per user.
var keyChangesSchema = `
-- Stores key change information about users. Used to determine when to send updated device lists to clients.
CREATE SEQUENCE IF NOT EXISTS keyserver_key_changes_seq;
CREATE TABLE IF NOT EXISTS keyserver_key_changes (
	change_id BIGINT PRIMARY KEY DEFAULT nextval('keyserver_key_changes_seq'),
	user_id TEXT NOT NULL,
	CONSTRAINT keyserver_key_changes_unique_per_user UNIQUE (user_id)
);
`
2022-01-21 09:56:06 +00:00
// upsertKeyChangeSQL inserts a row for the given user or, if the user already
// has one, replaces its change ID with the next sequence value and returns it.
//
// Replace based on user ID. We don't care how many times the user's keys have
// changed, only that they have changed, hence we can just keep bumping the
// change ID for this user.
const upsertKeyChangeSQL = "" +
	"INSERT INTO keyserver_key_changes (user_id)" +
	" VALUES ($1)" +
	" ON CONFLICT ON CONSTRAINT keyserver_key_changes_unique_per_user" +
	" DO UPDATE SET change_id = nextval('keyserver_key_changes_seq')" +
	" RETURNING change_id"
2020-07-28 17:38:30 +01:00
// selectKeyChangesSQL returns the user ID and change ID of every row whose
// change ID is greater than $1 and no greater than $2.
const selectKeyChangesSQL = "" +
	"SELECT user_id, change_id FROM keyserver_key_changes WHERE change_id > $1 AND change_id <= $2"
2020-07-28 17:38:30 +01:00
// keyChangesStatements groups the database handle with the statements that
// are prepared against it for the keyserver_key_changes table.
type keyChangesStatements struct {
	// db is the handle the statements below are prepared on.
	db *sql.DB

	// upsertKeyChangeStmt is the prepared form of upsertKeyChangeSQL.
	upsertKeyChangeStmt *sql.Stmt

	// selectKeyChangesStmt is the prepared form of selectKeyChangesSQL.
	selectKeyChangesStmt *sql.Stmt
}
func NewPostgresKeyChangesTable ( db * sql . DB ) ( tables . KeyChanges , error ) {
s := & keyChangesStatements {
db : db ,
}
_ , err := db . Exec ( keyChangesSchema )
2022-07-25 10:39:22 +01:00
if err != nil {
return s , err
}
2022-09-09 13:14:52 +01:00
if err = executeMigration ( context . Background ( ) , db ) ; err != nil {
return nil , err
}
return s , nil
}
func executeMigration ( ctx context . Context , db * sql . DB ) error {
2022-07-25 10:39:22 +01:00
// TODO: Remove when we are sure we are not having goose artefacts in the db
// This forces an error, which indicates the migration is already applied, since the
// column partition was removed from the table
2022-09-09 13:14:52 +01:00
migrationName := "keyserver: refactor key changes"
var cName string
err := db . QueryRowContext ( ctx , "select column_name from information_schema.columns where table_name = 'keyserver_key_changes' AND column_name = 'partition'" ) . Scan ( & cName )
if err != nil {
if errors . Is ( err , sql . ErrNoRows ) { // migration was already executed, as the column was removed
if err = sqlutil . InsertMigration ( ctx , db , migrationName ) ; err != nil {
// not a fatal error, log and continue
logrus . WithError ( err ) . Warnf ( "unable to manually insert migration '%s'" , migrationName )
2022-08-08 09:18:57 +01:00
}
2022-09-09 13:14:52 +01:00
return nil
2022-08-08 09:18:57 +01:00
}
2022-09-09 13:14:52 +01:00
return err
2022-07-25 10:39:22 +01:00
}
2022-09-09 13:14:52 +01:00
m := sqlutil . NewMigrator ( db )
m . AddMigrations ( sqlutil . Migration {
Version : migrationName ,
Up : deltas . UpRefactorKeyChanges ,
} )
return m . Up ( ctx )
2022-01-21 09:56:06 +00:00
}
func ( s * keyChangesStatements ) Prepare ( ) ( err error ) {
if s . upsertKeyChangeStmt , err = s . db . Prepare ( upsertKeyChangeSQL ) ; err != nil {
return err
2020-07-28 17:38:30 +01:00
}
2022-01-21 09:56:06 +00:00
if s . selectKeyChangesStmt , err = s . db . Prepare ( selectKeyChangesSQL ) ; err != nil {
return err
2020-07-28 17:38:30 +01:00
}
2022-01-21 09:56:06 +00:00
return nil
2020-07-28 17:38:30 +01:00
}
2022-01-21 09:56:06 +00:00
func ( s * keyChangesStatements ) InsertKeyChange ( ctx context . Context , userID string ) ( changeID int64 , err error ) {
err = s . upsertKeyChangeStmt . QueryRowContext ( ctx , userID ) . Scan ( & changeID )
return
2020-07-28 17:38:30 +01:00
}
func ( s * keyChangesStatements ) SelectKeyChanges (
2022-01-21 09:56:06 +00:00
ctx context . Context , fromOffset , toOffset int64 ,
2020-07-28 17:38:30 +01:00
) ( userIDs [ ] string , latestOffset int64 , err error ) {
2020-12-18 11:11:21 +00:00
latestOffset = fromOffset
2022-01-21 09:56:06 +00:00
rows , err := s . selectKeyChangesStmt . QueryContext ( ctx , fromOffset , toOffset )
2020-07-28 17:38:30 +01:00
if err != nil {
return nil , 0 , err
}
defer internal . CloseAndLogIfError ( ctx , rows , "selectKeyChangesStmt: rows.close() failed" )
for rows . Next ( ) {
var userID string
var offset int64
if err := rows . Scan ( & userID , & offset ) ; err != nil {
return nil , 0 , err
}
if offset > latestOffset {
latestOffset = offset
}
userIDs = append ( userIDs , userID )
}
return
}