mirror of
https://github.com/1f349/dendrite.git
synced 2025-01-22 07:16:38 +00:00
sql/backwards_extremities: Shift to table format and share code (#985)
* sql/backwards_extremities: Shift to table format and share code This is an initial cut to reduce boilerplate at the storage layer. It removes the need for 2x `_table.go` files, one for each DB engine, replacing it with a single struct which has an interface which implements the raw SQL statements. The actual impl sits alongside the interface declaration which is generally regarded as best practice (though no canonical sources). Especially in this case where the impl is tiny (functions returning strings) and relies heavily on the function signatures of the table struct (for parameters), having the context in the same file is useful. * Remove _table redundancy
This commit is contained in:
parent
5071ecb8b3
commit
35b7cbd5d8
@ -1,124 +0,0 @@
|
||||
// Copyright 2018 New Vector Ltd
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package postgres
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
|
||||
"github.com/matrix-org/dendrite/common"
|
||||
)
|
||||
|
||||
// The purpose of this table is to keep track of backwards extremities for a room.
|
||||
// Backwards extremities are the earliest (DAG-wise) known events which we have
|
||||
// the entire event JSON. These event IDs are used in federation requests to fetch
|
||||
// even earlier events.
|
||||
//
|
||||
// We persist the previous event IDs as well, one per row, so when we do fetch even
|
||||
// earlier events we can simply delete rows which referenced it. Consider the graph:
|
||||
// A
|
||||
// | Event C has 1 prev_event ID: A.
|
||||
// B C
|
||||
// |___| Event D has 2 prev_event IDs: B and C.
|
||||
// |
|
||||
// D
|
||||
// The earliest known event we have is D, so this table has 2 rows.
|
||||
// A backfill request gives us C but not B. We delete rows where prev_event=C. This
|
||||
// still means that D is a backwards extremity as we do not have event B. However, event
|
||||
// C is *also* a backwards extremity at this point as we do not have event A. Later,
|
||||
// when we fetch event B, we delete rows where prev_event=B. This then removes D as
|
||||
// a backwards extremity because there are no more rows with event_id=B.
|
||||
const backwardExtremitiesSchema = `
|
||||
-- Stores output room events received from the roomserver.
|
||||
CREATE TABLE IF NOT EXISTS syncapi_backward_extremities (
|
||||
-- The 'room_id' key for the event.
|
||||
room_id TEXT NOT NULL,
|
||||
-- The event ID for the last known event. This is the backwards extremity.
|
||||
event_id TEXT NOT NULL,
|
||||
-- The prev_events for the last known event. This is used to update extremities.
|
||||
prev_event_id TEXT NOT NULL,
|
||||
|
||||
PRIMARY KEY(room_id, event_id, prev_event_id)
|
||||
);
|
||||
`
|
||||
|
||||
const insertBackwardExtremitySQL = "" +
|
||||
"INSERT INTO syncapi_backward_extremities (room_id, event_id, prev_event_id)" +
|
||||
" VALUES ($1, $2, $3)" +
|
||||
" ON CONFLICT DO NOTHING"
|
||||
|
||||
const selectBackwardExtremitiesForRoomSQL = "" +
|
||||
"SELECT DISTINCT event_id FROM syncapi_backward_extremities WHERE room_id = $1"
|
||||
|
||||
const deleteBackwardExtremitySQL = "" +
|
||||
"DELETE FROM syncapi_backward_extremities WHERE room_id = $1 AND prev_event_id = $2"
|
||||
|
||||
type backwardExtremitiesStatements struct {
|
||||
insertBackwardExtremityStmt *sql.Stmt
|
||||
selectBackwardExtremitiesForRoomStmt *sql.Stmt
|
||||
deleteBackwardExtremityStmt *sql.Stmt
|
||||
}
|
||||
|
||||
func (s *backwardExtremitiesStatements) prepare(db *sql.DB) (err error) {
|
||||
_, err = db.Exec(backwardExtremitiesSchema)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
if s.insertBackwardExtremityStmt, err = db.Prepare(insertBackwardExtremitySQL); err != nil {
|
||||
return
|
||||
}
|
||||
if s.selectBackwardExtremitiesForRoomStmt, err = db.Prepare(selectBackwardExtremitiesForRoomSQL); err != nil {
|
||||
return
|
||||
}
|
||||
if s.deleteBackwardExtremityStmt, err = db.Prepare(deleteBackwardExtremitySQL); err != nil {
|
||||
return
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (s *backwardExtremitiesStatements) insertsBackwardExtremity(
|
||||
ctx context.Context, txn *sql.Tx, roomID, eventID string, prevEventID string,
|
||||
) (err error) {
|
||||
_, err = txn.Stmt(s.insertBackwardExtremityStmt).ExecContext(ctx, roomID, eventID, prevEventID)
|
||||
return
|
||||
}
|
||||
|
||||
func (s *backwardExtremitiesStatements) selectBackwardExtremitiesForRoom(
|
||||
ctx context.Context, roomID string,
|
||||
) (eventIDs []string, err error) {
|
||||
rows, err := s.selectBackwardExtremitiesForRoomStmt.QueryContext(ctx, roomID)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
defer common.CloseAndLogIfError(ctx, rows, "selectBackwardExtremitiesForRoom: rows.close() failed")
|
||||
|
||||
for rows.Next() {
|
||||
var eID string
|
||||
if err = rows.Scan(&eID); err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
eventIDs = append(eventIDs, eID)
|
||||
}
|
||||
|
||||
return eventIDs, rows.Err()
|
||||
}
|
||||
|
||||
func (s *backwardExtremitiesStatements) deleteBackwardExtremity(
|
||||
ctx context.Context, txn *sql.Tx, roomID, knownEventID string,
|
||||
) (err error) {
|
||||
_, err = txn.Stmt(s.deleteBackwardExtremityStmt).ExecContext(ctx, roomID, knownEventID)
|
||||
return
|
||||
}
|
@ -32,6 +32,7 @@ import (
|
||||
_ "github.com/lib/pq"
|
||||
"github.com/matrix-org/dendrite/common"
|
||||
"github.com/matrix-org/dendrite/eduserver/cache"
|
||||
"github.com/matrix-org/dendrite/syncapi/storage/tables"
|
||||
"github.com/matrix-org/dendrite/syncapi/types"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
)
|
||||
@ -56,7 +57,7 @@ type SyncServerDatasource struct {
|
||||
invites inviteEventsStatements
|
||||
eduCache *cache.EDUCache
|
||||
topology outputRoomEventsTopologyStatements
|
||||
backwardExtremities backwardExtremitiesStatements
|
||||
backwardExtremities tables.BackwardsExtremities
|
||||
}
|
||||
|
||||
// NewSyncServerDatasource creates a new sync server database
|
||||
@ -75,16 +76,17 @@ func NewSyncServerDatasource(dbDataSourceName string) (*SyncServerDatasource, er
|
||||
if err = d.events.prepare(d.db); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := d.roomstate.prepare(d.db); err != nil {
|
||||
if err = d.roomstate.prepare(d.db); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := d.invites.prepare(d.db); err != nil {
|
||||
if err = d.invites.prepare(d.db); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := d.topology.prepare(d.db); err != nil {
|
||||
if err = d.topology.prepare(d.db); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := d.backwardExtremities.prepare(d.db); err != nil {
|
||||
d.backwardExtremities, err = tables.NewBackwardsExtremities(d.db, &tables.PostgresBackwardsExtremitiesStatements{})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
d.eduCache = cache.New()
|
||||
@ -116,7 +118,7 @@ func (d *SyncServerDatasource) Events(ctx context.Context, eventIDs []string) ([
|
||||
// the events listed in the event's 'prev_events'. This function also updates the backwards extremities table
|
||||
// to account for the fact that the given event is no longer a backwards extremity, but may be marked as such.
|
||||
func (d *SyncServerDatasource) handleBackwardExtremities(ctx context.Context, txn *sql.Tx, ev *gomatrixserverlib.HeaderedEvent) error {
|
||||
if err := d.backwardExtremities.deleteBackwardExtremity(ctx, txn, ev.RoomID(), ev.EventID()); err != nil {
|
||||
if err := d.backwardExtremities.DeleteBackwardExtremity(ctx, txn, ev.RoomID(), ev.EventID()); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@ -137,7 +139,7 @@ func (d *SyncServerDatasource) handleBackwardExtremities(ctx context.Context, tx
|
||||
|
||||
// If the event is missing, consider it a backward extremity.
|
||||
if !found {
|
||||
if err = d.backwardExtremities.insertsBackwardExtremity(ctx, txn, ev.RoomID(), ev.EventID(), eID); err != nil {
|
||||
if err = d.backwardExtremities.InsertsBackwardExtremity(ctx, txn, ev.RoomID(), ev.EventID(), eID); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
@ -314,7 +316,7 @@ func (d *SyncServerDatasource) SyncPosition(ctx context.Context) (types.Paginati
|
||||
func (d *SyncServerDatasource) BackwardExtremitiesForRoom(
|
||||
ctx context.Context, roomID string,
|
||||
) (backwardExtremities []string, err error) {
|
||||
return d.backwardExtremities.selectBackwardExtremitiesForRoom(ctx, roomID)
|
||||
return d.backwardExtremities.SelectBackwardExtremitiesForRoom(ctx, roomID)
|
||||
}
|
||||
|
||||
// MaxTopologicalPosition returns the highest topological position for a given
|
||||
|
@ -1,124 +0,0 @@
|
||||
// Copyright 2018 New Vector Ltd
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package sqlite3
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
|
||||
"github.com/matrix-org/dendrite/common"
|
||||
)
|
||||
|
||||
// The purpose of this table is to keep track of backwards extremities for a room.
|
||||
// Backwards extremities are the earliest (DAG-wise) known events which we have
|
||||
// the entire event JSON. These event IDs are used in federation requests to fetch
|
||||
// even earlier events.
|
||||
//
|
||||
// We persist the previous event IDs as well, one per row, so when we do fetch even
|
||||
// earlier events we can simply delete rows which referenced it. Consider the graph:
|
||||
// A
|
||||
// | Event C has 1 prev_event ID: A.
|
||||
// B C
|
||||
// |___| Event D has 2 prev_event IDs: B and C.
|
||||
// |
|
||||
// D
|
||||
// The earliest known event we have is D, so this table has 2 rows.
|
||||
// A backfill request gives us C but not B. We delete rows where prev_event=C. This
|
||||
// still means that D is a backwards extremity as we do not have event B. However, event
|
||||
// C is *also* a backwards extremity at this point as we do not have event A. Later,
|
||||
// when we fetch event B, we delete rows where prev_event=B. This then removes D as
|
||||
// a backwards extremity because there are no more rows with event_id=B.
|
||||
const backwardExtremitiesSchema = `
|
||||
-- Stores output room events received from the roomserver.
|
||||
CREATE TABLE IF NOT EXISTS syncapi_backward_extremities (
|
||||
-- The 'room_id' key for the event.
|
||||
room_id TEXT NOT NULL,
|
||||
-- The event ID for the last known event. This is the backwards extremity.
|
||||
event_id TEXT NOT NULL,
|
||||
-- The prev_events for the last known event. This is used to update extremities.
|
||||
prev_event_id TEXT NOT NULL,
|
||||
|
||||
PRIMARY KEY(room_id, event_id, prev_event_id)
|
||||
);
|
||||
`
|
||||
|
||||
const insertBackwardExtremitySQL = "" +
|
||||
"INSERT INTO syncapi_backward_extremities (room_id, event_id, prev_event_id)" +
|
||||
" VALUES ($1, $2, $3)" +
|
||||
" ON CONFLICT (room_id, event_id, prev_event_id) DO NOTHING"
|
||||
|
||||
const selectBackwardExtremitiesForRoomSQL = "" +
|
||||
"SELECT DISTINCT event_id FROM syncapi_backward_extremities WHERE room_id = $1"
|
||||
|
||||
const deleteBackwardExtremitySQL = "" +
|
||||
"DELETE FROM syncapi_backward_extremities WHERE room_id = $1 AND prev_event_id = $2"
|
||||
|
||||
type backwardExtremitiesStatements struct {
|
||||
insertBackwardExtremityStmt *sql.Stmt
|
||||
selectBackwardExtremitiesForRoomStmt *sql.Stmt
|
||||
deleteBackwardExtremityStmt *sql.Stmt
|
||||
}
|
||||
|
||||
func (s *backwardExtremitiesStatements) prepare(db *sql.DB) (err error) {
|
||||
_, err = db.Exec(backwardExtremitiesSchema)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
if s.insertBackwardExtremityStmt, err = db.Prepare(insertBackwardExtremitySQL); err != nil {
|
||||
return
|
||||
}
|
||||
if s.selectBackwardExtremitiesForRoomStmt, err = db.Prepare(selectBackwardExtremitiesForRoomSQL); err != nil {
|
||||
return
|
||||
}
|
||||
if s.deleteBackwardExtremityStmt, err = db.Prepare(deleteBackwardExtremitySQL); err != nil {
|
||||
return
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (s *backwardExtremitiesStatements) insertsBackwardExtremity(
|
||||
ctx context.Context, txn *sql.Tx, roomID, eventID string, prevEventID string,
|
||||
) (err error) {
|
||||
_, err = txn.Stmt(s.insertBackwardExtremityStmt).ExecContext(ctx, roomID, eventID, prevEventID)
|
||||
return
|
||||
}
|
||||
|
||||
func (s *backwardExtremitiesStatements) selectBackwardExtremitiesForRoom(
|
||||
ctx context.Context, roomID string,
|
||||
) (eventIDs []string, err error) {
|
||||
rows, err := s.selectBackwardExtremitiesForRoomStmt.QueryContext(ctx, roomID)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
defer common.CloseAndLogIfError(ctx, rows, "selectBackwardExtremitiesForRoom: rows.close() failed")
|
||||
|
||||
for rows.Next() {
|
||||
var eID string
|
||||
if err = rows.Scan(&eID); err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
eventIDs = append(eventIDs, eID)
|
||||
}
|
||||
|
||||
return eventIDs, rows.Err()
|
||||
}
|
||||
|
||||
func (s *backwardExtremitiesStatements) deleteBackwardExtremity(
|
||||
ctx context.Context, txn *sql.Tx, roomID, knownEventID string,
|
||||
) (err error) {
|
||||
_, err = txn.Stmt(s.deleteBackwardExtremityStmt).ExecContext(ctx, roomID, knownEventID)
|
||||
return
|
||||
}
|
@ -35,6 +35,7 @@ import (
|
||||
|
||||
"github.com/matrix-org/dendrite/common"
|
||||
"github.com/matrix-org/dendrite/eduserver/cache"
|
||||
"github.com/matrix-org/dendrite/syncapi/storage/tables"
|
||||
"github.com/matrix-org/dendrite/syncapi/types"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
)
|
||||
@ -60,7 +61,7 @@ type SyncServerDatasource struct {
|
||||
invites inviteEventsStatements
|
||||
eduCache *cache.EDUCache
|
||||
topology outputRoomEventsTopologyStatements
|
||||
backwardExtremities backwardExtremitiesStatements
|
||||
backwardExtremities tables.BackwardsExtremities
|
||||
}
|
||||
|
||||
// NewSyncServerDatasource creates a new sync server database
|
||||
@ -102,16 +103,17 @@ func (d *SyncServerDatasource) prepare() (err error) {
|
||||
if err = d.events.prepare(d.db, &d.streamID); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := d.roomstate.prepare(d.db, &d.streamID); err != nil {
|
||||
if err = d.roomstate.prepare(d.db, &d.streamID); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := d.invites.prepare(d.db, &d.streamID); err != nil {
|
||||
if err = d.invites.prepare(d.db, &d.streamID); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := d.topology.prepare(d.db); err != nil {
|
||||
if err = d.topology.prepare(d.db); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := d.backwardExtremities.prepare(d.db); err != nil {
|
||||
d.backwardExtremities, err = tables.NewBackwardsExtremities(d.db, &tables.SqliteBackwardsExtremitiesStatements{})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
@ -142,7 +144,7 @@ func (d *SyncServerDatasource) Events(ctx context.Context, eventIDs []string) ([
|
||||
// the events listed in the event's 'prev_events'. This function also updates the backwards extremities table
|
||||
// to account for the fact that the given event is no longer a backwards extremity, but may be marked as such.
|
||||
func (d *SyncServerDatasource) handleBackwardExtremities(ctx context.Context, txn *sql.Tx, ev *gomatrixserverlib.HeaderedEvent) error {
|
||||
if err := d.backwardExtremities.deleteBackwardExtremity(ctx, txn, ev.RoomID(), ev.EventID()); err != nil {
|
||||
if err := d.backwardExtremities.DeleteBackwardExtremity(ctx, txn, ev.RoomID(), ev.EventID()); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@ -163,7 +165,7 @@ func (d *SyncServerDatasource) handleBackwardExtremities(ctx context.Context, tx
|
||||
|
||||
// If the event is missing, consider it a backward extremity.
|
||||
if !found {
|
||||
if err = d.backwardExtremities.insertsBackwardExtremity(ctx, txn, ev.RoomID(), ev.EventID(), eID); err != nil {
|
||||
if err = d.backwardExtremities.InsertsBackwardExtremity(ctx, txn, ev.RoomID(), ev.EventID(), eID); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
@ -344,7 +346,7 @@ func (d *SyncServerDatasource) SyncPosition(ctx context.Context) (tok types.Pagi
|
||||
func (d *SyncServerDatasource) BackwardExtremitiesForRoom(
|
||||
ctx context.Context, roomID string,
|
||||
) (backwardExtremities []string, err error) {
|
||||
return d.backwardExtremities.selectBackwardExtremitiesForRoom(ctx, roomID)
|
||||
return d.backwardExtremities.SelectBackwardExtremitiesForRoom(ctx, roomID)
|
||||
}
|
||||
|
||||
// MaxTopologicalPosition returns the highest topological position for a given
|
||||
|
175
syncapi/storage/tables/backward_extremities.go
Normal file
175
syncapi/storage/tables/backward_extremities.go
Normal file
@ -0,0 +1,175 @@
|
||||
// Copyright 2020 The Matrix.org Foundation C.I.C.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package tables
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
|
||||
"github.com/matrix-org/dendrite/common"
|
||||
)
|
||||
|
||||
// BackwardsExtremitiesStatements contains the SQL statements to implement.
|
||||
// See BackwardsExtremities to see the parameter and response types.
|
||||
type BackwardsExtremitiesStatements interface {
|
||||
Schema() string
|
||||
InsertBackwardExtremity() string
|
||||
SelectBackwardExtremitiesForRoom() string
|
||||
DeleteBackwardExtremity() string
|
||||
}
|
||||
|
||||
type PostgresBackwardsExtremitiesStatements struct{}
|
||||
|
||||
func (s *PostgresBackwardsExtremitiesStatements) Schema() string {
|
||||
return `-- Stores output room events received from the roomserver.
|
||||
CREATE TABLE IF NOT EXISTS syncapi_backward_extremities (
|
||||
-- The 'room_id' key for the event.
|
||||
room_id TEXT NOT NULL,
|
||||
-- The event ID for the last known event. This is the backwards extremity.
|
||||
event_id TEXT NOT NULL,
|
||||
-- The prev_events for the last known event. This is used to update extremities.
|
||||
prev_event_id TEXT NOT NULL,
|
||||
|
||||
PRIMARY KEY(room_id, event_id, prev_event_id)
|
||||
);
|
||||
`
|
||||
}
|
||||
func (s *PostgresBackwardsExtremitiesStatements) InsertBackwardExtremity() string {
|
||||
return "" +
|
||||
"INSERT INTO syncapi_backward_extremities (room_id, event_id, prev_event_id)" +
|
||||
" VALUES ($1, $2, $3)" +
|
||||
" ON CONFLICT DO NOTHING"
|
||||
}
|
||||
func (s *PostgresBackwardsExtremitiesStatements) SelectBackwardExtremitiesForRoom() string {
|
||||
return "SELECT DISTINCT event_id FROM syncapi_backward_extremities WHERE room_id = $1"
|
||||
}
|
||||
func (s *PostgresBackwardsExtremitiesStatements) DeleteBackwardExtremity() string {
|
||||
return "DELETE FROM syncapi_backward_extremities WHERE room_id = $1 AND prev_event_id = $2"
|
||||
}
|
||||
|
||||
type SqliteBackwardsExtremitiesStatements struct{}
|
||||
|
||||
func (s *SqliteBackwardsExtremitiesStatements) Schema() string {
|
||||
return `-- Stores output room events received from the roomserver.
|
||||
CREATE TABLE IF NOT EXISTS syncapi_backward_extremities (
|
||||
-- The 'room_id' key for the event.
|
||||
room_id TEXT NOT NULL,
|
||||
-- The event ID for the last known event. This is the backwards extremity.
|
||||
event_id TEXT NOT NULL,
|
||||
-- The prev_events for the last known event. This is used to update extremities.
|
||||
prev_event_id TEXT NOT NULL,
|
||||
|
||||
PRIMARY KEY(room_id, event_id, prev_event_id)
|
||||
);
|
||||
`
|
||||
}
|
||||
|
||||
func (s *SqliteBackwardsExtremitiesStatements) InsertBackwardExtremity() string {
|
||||
return "" +
|
||||
"INSERT INTO syncapi_backward_extremities (room_id, event_id, prev_event_id)" +
|
||||
" VALUES ($1, $2, $3)" +
|
||||
" ON CONFLICT (room_id, event_id, prev_event_id) DO NOTHING"
|
||||
}
|
||||
|
||||
func (s *SqliteBackwardsExtremitiesStatements) SelectBackwardExtremitiesForRoom() string {
|
||||
return "" +
|
||||
"SELECT DISTINCT event_id FROM syncapi_backward_extremities WHERE room_id = $1"
|
||||
}
|
||||
|
||||
func (s *SqliteBackwardsExtremitiesStatements) DeleteBackwardExtremity() string {
|
||||
return "" +
|
||||
"DELETE FROM syncapi_backward_extremities WHERE room_id = $1 AND prev_event_id = $2"
|
||||
}
|
||||
|
||||
// BackwardsExtremities keeps track of backwards extremities for a room.
|
||||
// Backwards extremities are the earliest (DAG-wise) known events which we have
|
||||
// the entire event JSON. These event IDs are used in federation requests to fetch
|
||||
// even earlier events.
|
||||
//
|
||||
// We persist the previous event IDs as well, one per row, so when we do fetch even
|
||||
// earlier events we can simply delete rows which referenced it. Consider the graph:
|
||||
// A
|
||||
// | Event C has 1 prev_event ID: A.
|
||||
// B C
|
||||
// |___| Event D has 2 prev_event IDs: B and C.
|
||||
// |
|
||||
// D
|
||||
// The earliest known event we have is D, so this table has 2 rows.
|
||||
// A backfill request gives us C but not B. We delete rows where prev_event=C. This
|
||||
// still means that D is a backwards extremity as we do not have event B. However, event
|
||||
// C is *also* a backwards extremity at this point as we do not have event A. Later,
|
||||
// when we fetch event B, we delete rows where prev_event=B. This then removes D as
|
||||
// a backwards extremity because there are no more rows with event_id=B.
|
||||
type BackwardsExtremities struct {
|
||||
insertBackwardExtremityStmt *sql.Stmt
|
||||
selectBackwardExtremitiesForRoomStmt *sql.Stmt
|
||||
deleteBackwardExtremityStmt *sql.Stmt
|
||||
}
|
||||
|
||||
// NewBackwardsExtremities prepares the table
|
||||
func NewBackwardsExtremities(db *sql.DB, stmts BackwardsExtremitiesStatements) (table BackwardsExtremities, err error) {
|
||||
_, err = db.Exec(stmts.Schema())
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
if table.insertBackwardExtremityStmt, err = db.Prepare(stmts.InsertBackwardExtremity()); err != nil {
|
||||
return
|
||||
}
|
||||
if table.selectBackwardExtremitiesForRoomStmt, err = db.Prepare(stmts.SelectBackwardExtremitiesForRoom()); err != nil {
|
||||
return
|
||||
}
|
||||
if table.deleteBackwardExtremityStmt, err = db.Prepare(stmts.DeleteBackwardExtremity()); err != nil {
|
||||
return
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// InsertsBackwardExtremity inserts a new backwards extremity.
|
||||
func (s *BackwardsExtremities) InsertsBackwardExtremity(
|
||||
ctx context.Context, txn *sql.Tx, roomID, eventID string, prevEventID string,
|
||||
) (err error) {
|
||||
_, err = txn.Stmt(s.insertBackwardExtremityStmt).ExecContext(ctx, roomID, eventID, prevEventID)
|
||||
return
|
||||
}
|
||||
|
||||
// SelectBackwardExtremitiesForRoom retrieves all backwards extremities for the room.
|
||||
func (s *BackwardsExtremities) SelectBackwardExtremitiesForRoom(
|
||||
ctx context.Context, roomID string,
|
||||
) (eventIDs []string, err error) {
|
||||
rows, err := s.selectBackwardExtremitiesForRoomStmt.QueryContext(ctx, roomID)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
defer common.CloseAndLogIfError(ctx, rows, "selectBackwardExtremitiesForRoom: rows.close() failed")
|
||||
|
||||
for rows.Next() {
|
||||
var eID string
|
||||
if err = rows.Scan(&eID); err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
eventIDs = append(eventIDs, eID)
|
||||
}
|
||||
|
||||
return eventIDs, rows.Err()
|
||||
}
|
||||
|
||||
// DeleteBackwardExtremity removes a backwards extremity for a room, if one existed.
|
||||
func (s *BackwardsExtremities) DeleteBackwardExtremity(
|
||||
ctx context.Context, txn *sql.Tx, roomID, knownEventID string,
|
||||
) (err error) {
|
||||
_, err = txn.Stmt(s.deleteBackwardExtremityStmt).ExecContext(ctx, roomID, knownEventID)
|
||||
return
|
||||
}
|
Loading…
Reference in New Issue
Block a user