2020-01-23 17:51:10 +00:00
// Copyright 2018 New Vector Ltd
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package postgres
import (
"context"
"database/sql"
2020-05-21 14:40:13 +01:00
"github.com/matrix-org/dendrite/internal"
2020-09-15 11:17:46 +01:00
"github.com/matrix-org/dendrite/internal/sqlutil"
2023-04-27 12:54:20 +01:00
rstypes "github.com/matrix-org/dendrite/roomserver/types"
2020-05-14 16:11:37 +01:00
"github.com/matrix-org/dendrite/syncapi/storage/tables"
2020-01-23 17:51:10 +00:00
"github.com/matrix-org/dendrite/syncapi/types"
)
// outputRoomEventsTopologySchema is the DDL executed by
// NewPostgresTopologyTable. It creates the
// syncapi_output_room_events_topology table, keyed by event_id, together with
// a unique index over (topological_position, stream_position, room_id). Both
// statements use IF NOT EXISTS.
//
// The literal must contain SQL only: everything between the backticks is sent
// to the database verbatim.
const outputRoomEventsTopologySchema = `
-- Stores output room events received from the roomserver .
CREATE TABLE IF NOT EXISTS syncapi_output_room_events_topology (
-- The event ID for the event .
event_id TEXT PRIMARY KEY ,
-- The place of the event in the room ' s topology . This can usually be determined
-- from the event ' s depth .
topological_position BIGINT NOT NULL ,
stream_position BIGINT NOT NULL ,
-- The ' room_id ' key for the event .
room_id TEXT NOT NULL
) ;
-- The topological order will be used in events selection and ordering
CREATE UNIQUE INDEX IF NOT EXISTS syncapi_event_topological_position_idx ON syncapi_output_room_events_topology ( topological_position , stream_position , room_id ) ;
`
// insertEventInTopologySQL inserts one row into the topology table.
// Parameters: $1 event ID, $2 topological position, $3 room ID, $4 stream
// position. If a row with the same (topological_position, stream_position,
// room_id) already exists, its event_id is overwritten with $1. The statement
// returns the row's topological_position.
const insertEventInTopologySQL = "INSERT INTO syncapi_output_room_events_topology" +
	" (event_id, topological_position, room_id, stream_position)" +
	" VALUES ($1, $2, $3, $4)" +
	" ON CONFLICT (topological_position, stream_position, room_id)" +
	" DO UPDATE SET event_id = $1" +
	" RETURNING topological_position"
2020-01-23 17:51:10 +00:00
// selectEventIDsInRangeASCSQL lists events of room $1 in ascending
// (topological_position, stream_position) order, capped at $6 rows.
// A row matches when its topological position lies strictly between $2 and
// $3, or when it equals $4 and its stream position is >= $5.
const selectEventIDsInRangeASCSQL = "SELECT event_id, topological_position, stream_position" +
	" FROM syncapi_output_room_events_topology" +
	" WHERE room_id = $1 AND ((topological_position > $2 AND topological_position < $3)" +
	" OR(topological_position = $4 AND stream_position >= $5))" +
	" ORDER BY topological_position ASC, stream_position ASC LIMIT $6"
2020-01-23 17:51:10 +00:00
// selectEventIDsInRangeDESCSQL lists events of room $1 in descending
// (topological_position, stream_position) order, capped at $6 rows.
// A row matches when its topological position lies strictly between $2 and
// $3, or when it equals $4 and its stream position is <= $5.
const selectEventIDsInRangeDESCSQL = "SELECT event_id, topological_position, stream_position" +
	" FROM syncapi_output_room_events_topology" +
	" WHERE room_id = $1 AND ((topological_position > $2 AND topological_position < $3)" +
	" OR(topological_position = $4 AND stream_position <= $5))" +
	" ORDER BY topological_position DESC, stream_position DESC LIMIT $6"
2020-01-23 17:51:10 +00:00
// selectPositionInTopologySQL looks up the topological and stream positions
// stored for the event whose ID is $1.
const selectPositionInTopologySQL = "SELECT topological_position, stream_position" +
	" FROM syncapi_output_room_events_topology" +
	" WHERE event_id = $1"
2022-03-18 10:40:01 +00:00
// Queries mapping a stream position ($2) in room $1 to a topological position.
const (
	// selectStreamToTopologicalPositionAscSQL returns the smallest
	// topological position among rows whose stream position is >= $2.
	selectStreamToTopologicalPositionAscSQL = "SELECT topological_position FROM syncapi_output_room_events_topology" +
		" WHERE room_id = $1 AND stream_position >= $2" +
		" ORDER BY topological_position ASC LIMIT 1;"

	// selectStreamToTopologicalPositionDescSQL returns the largest
	// topological position among rows whose stream position is <= $2.
	selectStreamToTopologicalPositionDescSQL = "SELECT topological_position FROM syncapi_output_room_events_topology" +
		" WHERE room_id = $1 AND stream_position <= $2" +
		" ORDER BY topological_position DESC LIMIT 1;"
)
2023-01-19 20:02:32 +00:00
// purgeEventsTopologySQL deletes every topology row belonging to room $1.
const purgeEventsTopologySQL = "DELETE FROM syncapi_output_room_events_topology" +
	" WHERE room_id = $1"
2020-01-23 17:51:10 +00:00
// outputRoomEventsTopologyStatements holds the prepared statements for the
// syncapi_output_room_events_topology table. Each field is populated by
// NewPostgresTopologyTable from the SQL constant of the matching name.
type outputRoomEventsTopologyStatements struct {
	// Write path.
	insertEventInTopologyStmt *sql.Stmt

	// Range scans over a room's topology, one statement per sort direction.
	selectEventIDsInRangeASCStmt  *sql.Stmt
	selectEventIDsInRangeDESCStmt *sql.Stmt

	// Position lookups.
	selectPositionInTopologyStmt              *sql.Stmt
	selectStreamToTopologicalPositionAscStmt  *sql.Stmt
	selectStreamToTopologicalPositionDescStmt *sql.Stmt

	// Room purge.
	purgeEventsTopologyStmt *sql.Stmt
}
2020-05-14 16:11:37 +01:00
func NewPostgresTopologyTable ( db * sql . DB ) ( tables . Topology , error ) {
s := & outputRoomEventsTopologyStatements { }
_ , err := db . Exec ( outputRoomEventsTopologySchema )
2020-01-23 17:51:10 +00:00
if err != nil {
2020-05-14 16:11:37 +01:00
return nil , err
2020-01-23 17:51:10 +00:00
}
2023-01-19 20:02:32 +00:00
return s , sqlutil . StatementList {
{ & s . insertEventInTopologyStmt , insertEventInTopologySQL } ,
{ & s . selectEventIDsInRangeASCStmt , selectEventIDsInRangeASCSQL } ,
{ & s . selectEventIDsInRangeDESCStmt , selectEventIDsInRangeDESCSQL } ,
{ & s . selectPositionInTopologyStmt , selectPositionInTopologySQL } ,
{ & s . selectStreamToTopologicalPositionAscStmt , selectStreamToTopologicalPositionAscSQL } ,
{ & s . selectStreamToTopologicalPositionDescStmt , selectStreamToTopologicalPositionDescSQL } ,
{ & s . purgeEventsTopologyStmt , purgeEventsTopologySQL } ,
} . Prepare ( db )
2020-01-23 17:51:10 +00:00
}
2020-05-14 16:11:37 +01:00
// InsertEventInTopology inserts the given event in the room's topology, based
2020-01-23 17:51:10 +00:00
// on the event's depth.
2020-05-14 16:11:37 +01:00
func ( s * outputRoomEventsTopologyStatements ) InsertEventInTopology (
2023-04-27 12:54:20 +01:00
ctx context . Context , txn * sql . Tx , event * rstypes . HeaderedEvent , pos types . StreamPosition ,
2021-01-20 20:43:20 +00:00
) ( topoPos types . StreamPosition , err error ) {
err = sqlutil . TxStmt ( txn , s . insertEventInTopologyStmt ) . QueryRowContext (
2020-05-01 11:01:34 +01:00
ctx , event . EventID ( ) , event . Depth ( ) , event . RoomID ( ) , pos ,
2021-01-20 20:43:20 +00:00
) . Scan ( & topoPos )
2020-01-23 17:51:10 +00:00
return
}
2020-05-14 16:11:37 +01:00
// SelectEventIDsInRange selects the IDs of events which positions are within a
2023-07-13 13:18:37 +01:00
// given range in a given room's topological order. Returns the start/end topological tokens for
// the returned eventIDs.
2020-01-23 17:51:10 +00:00
// Returns an empty slice if no events match the given range.
2020-05-14 16:11:37 +01:00
func ( s * outputRoomEventsTopologyStatements ) SelectEventIDsInRange (
2020-05-14 17:30:16 +01:00
ctx context . Context , txn * sql . Tx , roomID string , minDepth , maxDepth , maxStreamPos types . StreamPosition ,
2020-01-23 17:51:10 +00:00
limit int , chronologicalOrder bool ,
2023-07-13 13:18:37 +01:00
) ( eventIDs [ ] string , start , end types . TopologyToken , err error ) {
2020-01-23 17:51:10 +00:00
// Decide on the selection's order according to whether chronological order
// is requested or not.
var stmt * sql . Stmt
if chronologicalOrder {
2022-04-08 17:53:24 +01:00
stmt = sqlutil . TxStmt ( txn , s . selectEventIDsInRangeASCStmt )
2020-01-23 17:51:10 +00:00
} else {
2022-04-08 17:53:24 +01:00
stmt = sqlutil . TxStmt ( txn , s . selectEventIDsInRangeDESCStmt )
2020-01-23 17:51:10 +00:00
}
// Query the event IDs.
2020-05-14 17:30:16 +01:00
rows , err := stmt . QueryContext ( ctx , roomID , minDepth , maxDepth , maxDepth , maxStreamPos , limit )
2020-01-23 17:51:10 +00:00
if err == sql . ErrNoRows {
// If no event matched the request, return an empty slice.
2023-07-13 13:18:37 +01:00
return [ ] string { } , start , end , nil
2020-01-23 17:51:10 +00:00
} else if err != nil {
return
}
2020-05-21 14:40:13 +01:00
defer internal . CloseAndLogIfError ( ctx , rows , "selectEventIDsInRange: rows.close() failed" )
2020-01-23 17:51:10 +00:00
// Return the IDs.
var eventID string
2023-07-13 13:18:37 +01:00
var token types . TopologyToken
var tokens [ ] types . TopologyToken
2020-01-23 17:51:10 +00:00
for rows . Next ( ) {
2023-07-13 13:18:37 +01:00
if err = rows . Scan ( & eventID , & token . Depth , & token . PDUPosition ) ; err != nil {
2020-01-23 17:51:10 +00:00
return
}
eventIDs = append ( eventIDs , eventID )
2023-07-13 13:18:37 +01:00
tokens = append ( tokens , token )
2020-01-23 17:51:10 +00:00
}
2023-07-13 13:18:37 +01:00
// The values are already ordered by SQL, so we can use them as is.
if len ( tokens ) > 0 {
start = tokens [ 0 ]
end = tokens [ len ( tokens ) - 1 ]
}
return eventIDs , start , end , rows . Err ( )
2020-01-23 17:51:10 +00:00
}
2020-05-14 16:11:37 +01:00
// SelectPositionInTopology returns the position of a given event in the
2020-01-23 17:51:10 +00:00
// topology of the room it belongs to.
2020-05-14 16:11:37 +01:00
func ( s * outputRoomEventsTopologyStatements ) SelectPositionInTopology (
ctx context . Context , txn * sql . Tx , eventID string ,
2020-05-01 12:41:38 +01:00
) ( pos , spos types . StreamPosition , err error ) {
2022-09-28 10:18:03 +01:00
err = sqlutil . TxStmt ( txn , s . selectPositionInTopologyStmt ) . QueryRowContext ( ctx , eventID ) . Scan ( & pos , & spos )
2020-01-23 17:51:10 +00:00
return
}
2022-03-18 10:40:01 +00:00
// SelectStreamToTopologicalPosition returns the closest position of a given event
// in the topology of the room it belongs to from the given stream position.
func ( s * outputRoomEventsTopologyStatements ) SelectStreamToTopologicalPosition (
ctx context . Context , txn * sql . Tx , roomID string , streamPos types . StreamPosition , backwardOrdering bool ,
) ( topoPos types . StreamPosition , err error ) {
if backwardOrdering {
2022-09-28 10:18:03 +01:00
err = sqlutil . TxStmt ( txn , s . selectStreamToTopologicalPositionDescStmt ) . QueryRowContext ( ctx , roomID , streamPos ) . Scan ( & topoPos )
2022-03-18 10:40:01 +00:00
} else {
2022-09-28 10:18:03 +01:00
err = sqlutil . TxStmt ( txn , s . selectStreamToTopologicalPositionAscStmt ) . QueryRowContext ( ctx , roomID , streamPos ) . Scan ( & topoPos )
2022-03-18 10:40:01 +00:00
}
return
}
2023-01-19 20:02:32 +00:00
// PurgeEventsTopology deletes every topology row stored for roomID, running
// the statement inside txn via sqlutil.TxStmt. It returns the error from
// executing the DELETE, if any.
func (s *outputRoomEventsTopologyStatements) PurgeEventsTopology(
	ctx context.Context, txn *sql.Tx, roomID string,
) error {
	stmt := sqlutil.TxStmt(txn, s.purgeEventsTopologyStmt)
	if _, err := stmt.ExecContext(ctx, roomID); err != nil {
		return err
	}
	return nil
}