mirror of
https://github.com/1f349/dendrite.git
synced 2025-01-22 07:16:38 +00:00
Don't persist transaction IDs in the roomserver (#2048)
This commit is contained in:
parent
403498a85b
commit
6e93531e94
@ -121,20 +121,8 @@ func (r *Inputer) processRoomEvent(
|
||||
}
|
||||
}
|
||||
|
||||
// If we don't have a transaction ID then get one.
|
||||
if input.TransactionID != nil {
|
||||
tdID := input.TransactionID
|
||||
eventID, err = r.DB.GetTransactionEventID(
|
||||
ctx, tdID.TransactionID, tdID.SessionID, event.Sender(),
|
||||
)
|
||||
// On error OR event with the transaction already processed/processesing
|
||||
if err != nil || eventID != "" {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// Store the event.
|
||||
_, stateAtEvent, redactionEvent, redactedEventID, err := r.DB.StoreEvent(ctx, event, input.TransactionID, authEventNIDs, isRejected)
|
||||
_, stateAtEvent, redactionEvent, redactedEventID, err := r.DB.StoreEvent(ctx, event, authEventNIDs, isRejected)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("r.DB.StoreEvent: %w", err)
|
||||
}
|
||||
|
@ -562,7 +562,7 @@ func persistEvents(ctx context.Context, db storage.Database, events []*gomatrixs
|
||||
var stateAtEvent types.StateAtEvent
|
||||
var redactedEventID string
|
||||
var redactionEvent *gomatrixserverlib.Event
|
||||
roomNID, stateAtEvent, redactionEvent, redactedEventID, err = db.StoreEvent(ctx, ev.Unwrap(), nil, authNids, false)
|
||||
roomNID, stateAtEvent, redactionEvent, redactedEventID, err = db.StoreEvent(ctx, ev.Unwrap(), authNids, false)
|
||||
if err != nil {
|
||||
logrus.WithError(err).WithField("event_id", ev.EventID()).Error("Failed to persist event")
|
||||
continue
|
||||
|
@ -17,7 +17,6 @@ package storage
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/matrix-org/dendrite/roomserver/api"
|
||||
"github.com/matrix-org/dendrite/roomserver/storage/shared"
|
||||
"github.com/matrix-org/dendrite/roomserver/storage/tables"
|
||||
"github.com/matrix-org/dendrite/roomserver/types"
|
||||
@ -69,7 +68,7 @@ type Database interface {
|
||||
SnapshotNIDFromEventID(ctx context.Context, eventID string) (types.StateSnapshotNID, error)
|
||||
// Stores a matrix room event in the database. Returns the room NID, the state snapshot and the redacted event ID if any, or an error.
|
||||
StoreEvent(
|
||||
ctx context.Context, event *gomatrixserverlib.Event, txnAndSessionID *api.TransactionID, authEventNIDs []types.EventNID,
|
||||
ctx context.Context, event *gomatrixserverlib.Event, authEventNIDs []types.EventNID,
|
||||
isRejected bool,
|
||||
) (types.RoomNID, types.StateAtEvent, *gomatrixserverlib.Event, string, error)
|
||||
// Look up the state entries for a list of string event IDs
|
||||
@ -92,10 +91,6 @@ type Database interface {
|
||||
// Returns the latest events in the room and the last eventID sent to the log along with an updater.
|
||||
// If this returns an error then no further action is required.
|
||||
GetLatestEventsForUpdate(ctx context.Context, roomInfo types.RoomInfo) (*shared.LatestEventsUpdater, error)
|
||||
// Look up event ID by transaction's info.
|
||||
// This is used to determine if the room event is processed/processing already.
|
||||
// Returns an empty string if no such event exists.
|
||||
GetTransactionEventID(ctx context.Context, transactionID string, sessionID int64, userID string) (string, error)
|
||||
// Look up event references for the latest events in the room and the current state snapshot.
|
||||
// Returns the latest events, the current state and the maximum depth of the latest events plus 1.
|
||||
// Returns an error if there was a problem talking to the database.
|
||||
|
@ -82,9 +82,6 @@ func (d *Database) create(db *sql.DB) error {
|
||||
if err := createRoomsTable(db); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := createTransactionsTable(db); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := createStateBlockTable(db); err != nil {
|
||||
return err
|
||||
}
|
||||
@ -134,10 +131,6 @@ func (d *Database) prepare(db *sql.DB, cache caching.RoomServerCaches) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
transactions, err := prepareTransactionsTable(db)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
stateBlock, err := prepareStateBlockTable(db)
|
||||
if err != nil {
|
||||
return err
|
||||
@ -179,7 +172,6 @@ func (d *Database) prepare(db *sql.DB, cache caching.RoomServerCaches) error {
|
||||
EventJSONTable: eventJSON,
|
||||
EventsTable: events,
|
||||
RoomsTable: rooms,
|
||||
TransactionsTable: transactions,
|
||||
StateBlockTable: stateBlock,
|
||||
StateSnapshotTable: stateSnapshot,
|
||||
PrevEventsTable: prevEvents,
|
||||
|
@ -1,94 +0,0 @@
|
||||
// Copyright 2017-2018 New Vector Ltd
|
||||
// Copyright 2019-2020 The Matrix.org Foundation C.I.C.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package postgres
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
|
||||
"github.com/matrix-org/dendrite/internal/sqlutil"
|
||||
"github.com/matrix-org/dendrite/roomserver/storage/tables"
|
||||
)
|
||||
|
||||
const transactionsSchema = `
|
||||
-- The transactions table holds transaction IDs with sender's info and event ID it belongs to.
|
||||
-- This table is used by roomserver to prevent reprocessing of events.
|
||||
CREATE TABLE IF NOT EXISTS roomserver_transactions (
|
||||
-- The transaction ID of the event.
|
||||
transaction_id TEXT NOT NULL,
|
||||
-- The session ID of the originating transaction.
|
||||
session_id BIGINT NOT NULL,
|
||||
-- User ID of the sender who authored the event
|
||||
user_id TEXT NOT NULL,
|
||||
-- Event ID corresponding to the transaction
|
||||
-- Required to return event ID to client on a duplicate request.
|
||||
event_id TEXT NOT NULL,
|
||||
-- A transaction ID is unique for a user and device
|
||||
-- This automatically creates an index.
|
||||
PRIMARY KEY (transaction_id, session_id, user_id)
|
||||
);
|
||||
`
|
||||
const insertTransactionSQL = "" +
|
||||
"INSERT INTO roomserver_transactions (transaction_id, session_id, user_id, event_id)" +
|
||||
" VALUES ($1, $2, $3, $4)"
|
||||
|
||||
const selectTransactionEventIDSQL = "" +
|
||||
"SELECT event_id FROM roomserver_transactions" +
|
||||
" WHERE transaction_id = $1 AND session_id = $2 AND user_id = $3"
|
||||
|
||||
type transactionStatements struct {
|
||||
insertTransactionStmt *sql.Stmt
|
||||
selectTransactionEventIDStmt *sql.Stmt
|
||||
}
|
||||
|
||||
func createTransactionsTable(db *sql.DB) error {
|
||||
_, err := db.Exec(transactionsSchema)
|
||||
return err
|
||||
}
|
||||
|
||||
func prepareTransactionsTable(db *sql.DB) (tables.Transactions, error) {
|
||||
s := &transactionStatements{}
|
||||
|
||||
return s, sqlutil.StatementList{
|
||||
{&s.insertTransactionStmt, insertTransactionSQL},
|
||||
{&s.selectTransactionEventIDStmt, selectTransactionEventIDSQL},
|
||||
}.Prepare(db)
|
||||
}
|
||||
|
||||
func (s *transactionStatements) InsertTransaction(
|
||||
ctx context.Context, txn *sql.Tx,
|
||||
transactionID string,
|
||||
sessionID int64,
|
||||
userID string,
|
||||
eventID string,
|
||||
) (err error) {
|
||||
_, err = s.insertTransactionStmt.ExecContext(
|
||||
ctx, transactionID, sessionID, userID, eventID,
|
||||
)
|
||||
return
|
||||
}
|
||||
|
||||
func (s *transactionStatements) SelectTransactionEventID(
|
||||
ctx context.Context,
|
||||
transactionID string,
|
||||
sessionID int64,
|
||||
userID string,
|
||||
) (eventID string, err error) {
|
||||
err = s.selectTransactionEventIDStmt.QueryRowContext(
|
||||
ctx, transactionID, sessionID, userID,
|
||||
).Scan(&eventID)
|
||||
return
|
||||
}
|
@ -9,7 +9,6 @@ import (
|
||||
|
||||
"github.com/matrix-org/dendrite/internal/caching"
|
||||
"github.com/matrix-org/dendrite/internal/sqlutil"
|
||||
"github.com/matrix-org/dendrite/roomserver/api"
|
||||
"github.com/matrix-org/dendrite/roomserver/storage/tables"
|
||||
"github.com/matrix-org/dendrite/roomserver/types"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
@ -35,7 +34,6 @@ type Database struct {
|
||||
EventTypesTable tables.EventTypes
|
||||
EventStateKeysTable tables.EventStateKeys
|
||||
RoomsTable tables.Rooms
|
||||
TransactionsTable tables.Transactions
|
||||
StateSnapshotTable tables.StateSnapshot
|
||||
StateBlockTable tables.StateBlock
|
||||
RoomAliasesTable tables.RoomAliases
|
||||
@ -426,17 +424,6 @@ func (d *Database) Events(
|
||||
return results, nil
|
||||
}
|
||||
|
||||
func (d *Database) GetTransactionEventID(
|
||||
ctx context.Context, transactionID string,
|
||||
sessionID int64, userID string,
|
||||
) (string, error) {
|
||||
eventID, err := d.TransactionsTable.SelectTransactionEventID(ctx, transactionID, sessionID, userID)
|
||||
if err == sql.ErrNoRows {
|
||||
return "", nil
|
||||
}
|
||||
return eventID, err
|
||||
}
|
||||
|
||||
func (d *Database) MembershipUpdater(
|
||||
ctx context.Context, roomID, targetUserID string,
|
||||
targetLocal bool, roomVersion gomatrixserverlib.RoomVersion,
|
||||
@ -473,7 +460,7 @@ func (d *Database) GetLatestEventsForUpdate(
|
||||
|
||||
func (d *Database) StoreEvent(
|
||||
ctx context.Context, event *gomatrixserverlib.Event,
|
||||
txnAndSessionID *api.TransactionID, authEventNIDs []types.EventNID, isRejected bool,
|
||||
authEventNIDs []types.EventNID, isRejected bool,
|
||||
) (types.RoomNID, types.StateAtEvent, *gomatrixserverlib.Event, string, error) {
|
||||
var (
|
||||
roomNID types.RoomNID
|
||||
@ -487,15 +474,6 @@ func (d *Database) StoreEvent(
|
||||
)
|
||||
|
||||
err = d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
|
||||
if txnAndSessionID != nil {
|
||||
if err = d.TransactionsTable.InsertTransaction(
|
||||
ctx, txn, txnAndSessionID.TransactionID,
|
||||
txnAndSessionID.SessionID, event.Sender(), event.EventID(),
|
||||
); err != nil {
|
||||
return fmt.Errorf("d.TransactionsTable.InsertTransaction: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: Here we should aim to have two different code paths for new rooms
|
||||
// vs existing ones.
|
||||
|
||||
|
@ -90,9 +90,6 @@ func (d *Database) create(db *sql.DB) error {
|
||||
if err := createRoomsTable(db); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := createTransactionsTable(db); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := createStateBlockTable(db); err != nil {
|
||||
return err
|
||||
}
|
||||
@ -142,10 +139,6 @@ func (d *Database) prepare(db *sql.DB, cache caching.RoomServerCaches) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
transactions, err := prepareTransactionsTable(db)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
stateBlock, err := prepareStateBlockTable(db)
|
||||
if err != nil {
|
||||
return err
|
||||
@ -187,7 +180,6 @@ func (d *Database) prepare(db *sql.DB, cache caching.RoomServerCaches) error {
|
||||
EventStateKeysTable: eventStateKeys,
|
||||
EventJSONTable: eventJSON,
|
||||
RoomsTable: rooms,
|
||||
TransactionsTable: transactions,
|
||||
StateBlockTable: stateBlock,
|
||||
StateSnapshotTable: stateSnapshot,
|
||||
PrevEventsTable: prevEvents,
|
||||
|
@ -1,91 +0,0 @@
|
||||
// Copyright 2017-2018 New Vector Ltd
|
||||
// Copyright 2019-2020 The Matrix.org Foundation C.I.C.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package sqlite3
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
|
||||
"github.com/matrix-org/dendrite/internal/sqlutil"
|
||||
"github.com/matrix-org/dendrite/roomserver/storage/tables"
|
||||
)
|
||||
|
||||
const transactionsSchema = `
|
||||
CREATE TABLE IF NOT EXISTS roomserver_transactions (
|
||||
transaction_id TEXT NOT NULL,
|
||||
session_id INTEGER NOT NULL,
|
||||
user_id TEXT NOT NULL,
|
||||
event_id TEXT NOT NULL,
|
||||
PRIMARY KEY (transaction_id, session_id, user_id)
|
||||
);
|
||||
`
|
||||
const insertTransactionSQL = `
|
||||
INSERT INTO roomserver_transactions (transaction_id, session_id, user_id, event_id)
|
||||
VALUES ($1, $2, $3, $4)
|
||||
`
|
||||
|
||||
const selectTransactionEventIDSQL = `
|
||||
SELECT event_id FROM roomserver_transactions
|
||||
WHERE transaction_id = $1 AND session_id = $2 AND user_id = $3
|
||||
`
|
||||
|
||||
type transactionStatements struct {
|
||||
db *sql.DB
|
||||
insertTransactionStmt *sql.Stmt
|
||||
selectTransactionEventIDStmt *sql.Stmt
|
||||
}
|
||||
|
||||
func createTransactionsTable(db *sql.DB) error {
|
||||
_, err := db.Exec(transactionsSchema)
|
||||
return err
|
||||
}
|
||||
|
||||
func prepareTransactionsTable(db *sql.DB) (tables.Transactions, error) {
|
||||
s := &transactionStatements{
|
||||
db: db,
|
||||
}
|
||||
|
||||
return s, sqlutil.StatementList{
|
||||
{&s.insertTransactionStmt, insertTransactionSQL},
|
||||
{&s.selectTransactionEventIDStmt, selectTransactionEventIDSQL},
|
||||
}.Prepare(db)
|
||||
}
|
||||
|
||||
func (s *transactionStatements) InsertTransaction(
|
||||
ctx context.Context, txn *sql.Tx,
|
||||
transactionID string,
|
||||
sessionID int64,
|
||||
userID string,
|
||||
eventID string,
|
||||
) error {
|
||||
stmt := sqlutil.TxStmt(txn, s.insertTransactionStmt)
|
||||
_, err := stmt.ExecContext(
|
||||
ctx, transactionID, sessionID, userID, eventID,
|
||||
)
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *transactionStatements) SelectTransactionEventID(
|
||||
ctx context.Context,
|
||||
transactionID string,
|
||||
sessionID int64,
|
||||
userID string,
|
||||
) (eventID string, err error) {
|
||||
err = s.selectTransactionEventIDStmt.QueryRowContext(
|
||||
ctx, transactionID, sessionID, userID,
|
||||
).Scan(&eventID)
|
||||
return
|
||||
}
|
@ -76,11 +76,6 @@ type Rooms interface {
|
||||
BulkSelectRoomNIDs(ctx context.Context, roomIDs []string) ([]types.RoomNID, error)
|
||||
}
|
||||
|
||||
type Transactions interface {
|
||||
InsertTransaction(ctx context.Context, txn *sql.Tx, transactionID string, sessionID int64, userID string, eventID string) error
|
||||
SelectTransactionEventID(ctx context.Context, transactionID string, sessionID int64, userID string) (eventID string, err error)
|
||||
}
|
||||
|
||||
type StateSnapshot interface {
|
||||
InsertState(ctx context.Context, txn *sql.Tx, roomNID types.RoomNID, stateBlockNIDs types.StateBlockNIDs) (stateNID types.StateSnapshotNID, err error)
|
||||
BulkSelectStateBlockNIDs(ctx context.Context, stateNIDs []types.StateSnapshotNID) ([]types.StateBlockNIDList, error)
|
||||
|
Loading…
Reference in New Issue
Block a user