2020-02-13 17:27:33 +00:00
// Copyright 2018 New Vector Ltd
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package sqlite3
import (
"context"
"database/sql"
2020-06-12 14:55:57 +01:00
"github.com/matrix-org/dendrite/internal/sqlutil"
2020-05-14 16:11:37 +01:00
"github.com/matrix-org/dendrite/syncapi/storage/tables"
2020-02-13 17:27:33 +00:00
"github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/gomatrixserverlib"
)
const outputRoomEventsTopologySchema = `
-- Stores output room events received from the roomserver .
CREATE TABLE IF NOT EXISTS syncapi_output_room_events_topology (
event_id TEXT PRIMARY KEY ,
2020-05-01 11:01:34 +01:00
topological_position BIGINT NOT NULL ,
stream_position BIGINT NOT NULL ,
2020-02-13 17:27:33 +00:00
room_id TEXT NOT NULL ,
2020-05-01 11:01:34 +01:00
UNIQUE ( topological_position , room_id , stream_position )
2020-02-13 17:27:33 +00:00
) ;
-- The topological order will be used in events selection and ordering
2020-05-01 11:01:34 +01:00
-- CREATE UNIQUE INDEX IF NOT EXISTS syncapi_event_topological_position_idx ON syncapi_output_room_events_topology ( topological_position , stream_position , room_id ) ;
2020-02-13 17:27:33 +00:00
`
const insertEventInTopologySQL = "" +
2020-05-01 11:01:34 +01:00
"INSERT INTO syncapi_output_room_events_topology (event_id, topological_position, room_id, stream_position)" +
" VALUES ($1, $2, $3, $4)" +
" ON CONFLICT DO NOTHING"
2020-02-13 17:27:33 +00:00
const selectEventIDsInRangeASCSQL = "" +
"SELECT event_id FROM syncapi_output_room_events_topology" +
2020-05-15 16:27:34 +01:00
" WHERE room_id = $1 AND (" +
2020-05-01 11:01:34 +01:00
"(topological_position > $2 AND topological_position < $3) OR" +
2022-03-18 10:40:01 +00:00
"(topological_position = $4 AND stream_position >= $5)" +
2020-05-15 16:27:34 +01:00
") ORDER BY topological_position ASC, stream_position ASC LIMIT $6"
2020-02-13 17:27:33 +00:00
const selectEventIDsInRangeDESCSQL = "" +
2020-05-01 11:01:34 +01:00
"SELECT event_id FROM syncapi_output_room_events_topology" +
2020-05-15 16:27:34 +01:00
" WHERE room_id = $1 AND (" +
2020-05-01 11:01:34 +01:00
"(topological_position > $2 AND topological_position < $3) OR" +
"(topological_position = $4 AND stream_position <= $5)" +
2020-05-15 16:27:34 +01:00
") ORDER BY topological_position DESC, stream_position DESC LIMIT $6"
2020-02-13 17:27:33 +00:00
const selectPositionInTopologySQL = "" +
2020-05-01 12:41:38 +01:00
"SELECT topological_position, stream_position FROM syncapi_output_room_events_topology" +
2020-02-13 17:27:33 +00:00
" WHERE event_id = $1"
2022-03-18 10:40:01 +00:00
const selectStreamToTopologicalPositionAscSQL = "" +
"SELECT topological_position FROM syncapi_output_room_events_topology WHERE room_id = $1 AND stream_position >= $2 ORDER BY topological_position ASC LIMIT 1;"
const selectStreamToTopologicalPositionDescSQL = "" +
"SELECT topological_position FROM syncapi_output_room_events_topology WHERE room_id = $1 AND stream_position <= $2 ORDER BY topological_position DESC LIMIT 1;"
2020-09-15 11:17:46 +01:00
2020-02-13 17:27:33 +00:00
type outputRoomEventsTopologyStatements struct {
2022-03-18 10:40:01 +00:00
db * sql . DB
insertEventInTopologyStmt * sql . Stmt
selectEventIDsInRangeASCStmt * sql . Stmt
selectEventIDsInRangeDESCStmt * sql . Stmt
selectPositionInTopologyStmt * sql . Stmt
selectStreamToTopologicalPositionAscStmt * sql . Stmt
selectStreamToTopologicalPositionDescStmt * sql . Stmt
2020-02-13 17:27:33 +00:00
}
2020-05-14 16:11:37 +01:00
func NewSqliteTopologyTable ( db * sql . DB ) ( tables . Topology , error ) {
2020-07-21 15:48:21 +01:00
s := & outputRoomEventsTopologyStatements {
2020-08-21 10:42:08 +01:00
db : db ,
2020-07-21 15:48:21 +01:00
}
2020-05-14 16:11:37 +01:00
_ , err := db . Exec ( outputRoomEventsTopologySchema )
2020-02-13 17:27:33 +00:00
if err != nil {
2020-05-14 16:11:37 +01:00
return nil , err
2020-02-13 17:27:33 +00:00
}
if s . insertEventInTopologyStmt , err = db . Prepare ( insertEventInTopologySQL ) ; err != nil {
2020-05-14 16:11:37 +01:00
return nil , err
2020-02-13 17:27:33 +00:00
}
if s . selectEventIDsInRangeASCStmt , err = db . Prepare ( selectEventIDsInRangeASCSQL ) ; err != nil {
2020-05-14 16:11:37 +01:00
return nil , err
2020-02-13 17:27:33 +00:00
}
if s . selectEventIDsInRangeDESCStmt , err = db . Prepare ( selectEventIDsInRangeDESCSQL ) ; err != nil {
2020-05-14 16:11:37 +01:00
return nil , err
2020-02-13 17:27:33 +00:00
}
if s . selectPositionInTopologyStmt , err = db . Prepare ( selectPositionInTopologySQL ) ; err != nil {
2020-05-14 16:11:37 +01:00
return nil , err
2020-02-13 17:27:33 +00:00
}
2022-03-18 10:40:01 +00:00
if s . selectStreamToTopologicalPositionAscStmt , err = db . Prepare ( selectStreamToTopologicalPositionAscSQL ) ; err != nil {
return nil , err
}
if s . selectStreamToTopologicalPositionDescStmt , err = db . Prepare ( selectStreamToTopologicalPositionDescSQL ) ; err != nil {
2020-09-15 11:17:46 +01:00
return nil , err
}
2020-05-14 16:11:37 +01:00
return s , nil
2020-02-13 17:27:33 +00:00
}
// insertEventInTopology inserts the given event in the room's topology, based
// on the event's depth.
2020-05-14 16:11:37 +01:00
func ( s * outputRoomEventsTopologyStatements ) InsertEventInTopology (
2020-05-01 11:01:34 +01:00
ctx context . Context , txn * sql . Tx , event * gomatrixserverlib . HeaderedEvent , pos types . StreamPosition ,
2021-01-20 20:43:20 +00:00
) ( types . StreamPosition , error ) {
_ , err := sqlutil . TxStmt ( txn , s . insertEventInTopologyStmt ) . ExecContext (
2020-08-21 10:42:08 +01:00
ctx , event . EventID ( ) , event . Depth ( ) , event . RoomID ( ) , pos ,
)
2021-01-20 20:43:20 +00:00
return types . StreamPosition ( event . Depth ( ) ) , err
2020-02-13 17:27:33 +00:00
}
2020-05-14 16:11:37 +01:00
func ( s * outputRoomEventsTopologyStatements ) SelectEventIDsInRange (
2020-02-13 17:27:33 +00:00
ctx context . Context , txn * sql . Tx , roomID string ,
2020-05-14 17:30:16 +01:00
minDepth , maxDepth , maxStreamPos types . StreamPosition ,
2020-02-13 17:27:33 +00:00
limit int , chronologicalOrder bool ,
) ( eventIDs [ ] string , err error ) {
// Decide on the selection's order according to whether chronological order
// is requested or not.
var stmt * sql . Stmt
if chronologicalOrder {
2020-06-12 14:55:57 +01:00
stmt = sqlutil . TxStmt ( txn , s . selectEventIDsInRangeASCStmt )
2020-02-13 17:27:33 +00:00
} else {
2020-06-12 14:55:57 +01:00
stmt = sqlutil . TxStmt ( txn , s . selectEventIDsInRangeDESCStmt )
2020-02-13 17:27:33 +00:00
}
// Query the event IDs.
2020-05-14 17:30:16 +01:00
rows , err := stmt . QueryContext ( ctx , roomID , minDepth , maxDepth , maxDepth , maxStreamPos , limit )
2020-02-13 17:27:33 +00:00
if err == sql . ErrNoRows {
// If no event matched the request, return an empty slice.
return [ ] string { } , nil
} else if err != nil {
return
}
// Return the IDs.
var eventID string
for rows . Next ( ) {
if err = rows . Scan ( & eventID ) ; err != nil {
return
}
eventIDs = append ( eventIDs , eventID )
}
return
}
// selectPositionInTopology returns the position of a given event in the
// topology of the room it belongs to.
2020-05-14 16:11:37 +01:00
func ( s * outputRoomEventsTopologyStatements ) SelectPositionInTopology (
2020-02-13 17:27:33 +00:00
ctx context . Context , txn * sql . Tx , eventID string ,
2020-05-01 12:41:38 +01:00
) ( pos types . StreamPosition , spos types . StreamPosition , err error ) {
2020-06-12 14:55:57 +01:00
stmt := sqlutil . TxStmt ( txn , s . selectPositionInTopologyStmt )
2020-05-01 12:41:38 +01:00
err = stmt . QueryRowContext ( ctx , eventID ) . Scan ( & pos , & spos )
2020-02-13 17:27:33 +00:00
return
}
2022-03-18 10:40:01 +00:00
// SelectStreamToTopologicalPosition returns the closest position of a given event
// in the topology of the room it belongs to from the given stream position.
func ( s * outputRoomEventsTopologyStatements ) SelectStreamToTopologicalPosition (
ctx context . Context , txn * sql . Tx , roomID string , streamPos types . StreamPosition , backwardOrdering bool ,
) ( topoPos types . StreamPosition , err error ) {
if backwardOrdering {
2022-09-28 10:18:03 +01:00
err = sqlutil . TxStmt ( txn , s . selectStreamToTopologicalPositionDescStmt ) . QueryRowContext ( ctx , roomID , streamPos ) . Scan ( & topoPos )
2022-03-18 10:40:01 +00:00
} else {
2022-09-28 10:18:03 +01:00
err = sqlutil . TxStmt ( txn , s . selectStreamToTopologicalPositionAscStmt ) . QueryRowContext ( ctx , roomID , streamPos ) . Scan ( & topoPos )
2022-03-18 10:40:01 +00:00
}
return
}