From 7466e6b7186610ee8696c2d4db7aa1138c24adbe Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Thu, 27 Aug 2020 11:05:41 +0100 Subject: [PATCH] Fix lock errors in federation sender (#1347) * Fix lock errors in federation sender * Additional fix to writers --- federationsender/storage/shared/storage_edus.go | 7 +++---- federationsender/storage/shared/storage_pdus.go | 4 ++-- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/federationsender/storage/shared/storage_edus.go b/federationsender/storage/shared/storage_edus.go index 75a6dd51..529b46aa 100644 --- a/federationsender/storage/shared/storage_edus.go +++ b/federationsender/storage/shared/storage_edus.go @@ -21,7 +21,6 @@ import ( "errors" "fmt" - "github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/gomatrixserverlib" ) @@ -33,7 +32,7 @@ func (d *Database) AssociateEDUWithDestination( serverName gomatrixserverlib.ServerName, receipt *Receipt, ) error { - return sqlutil.WithTransaction(d.DB, func(txn *sql.Tx) error { + return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error { for _, nid := range receipt.nids { if err := d.FederationSenderQueueEDUs.InsertQueueEDU( ctx, // context @@ -60,7 +59,7 @@ func (d *Database) GetNextTransactionEDUs( receipt *Receipt, err error, ) { - err = sqlutil.WithTransaction(d.DB, func(txn *sql.Tx) error { + err = d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error { nids, err := d.FederationSenderQueueEDUs.SelectQueueEDUs(ctx, txn, serverName, limit) if err != nil { return fmt.Errorf("SelectQueueEDUs: %w", err) @@ -99,7 +98,7 @@ func (d *Database) CleanEDUs( return errors.New("expected receipt") } - return sqlutil.WithTransaction(d.DB, func(txn *sql.Tx) error { + return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error { if err := d.FederationSenderQueueEDUs.DeleteQueueEDUs(ctx, txn, serverName, receipt.nids); err != nil { return err } diff --git a/federationsender/storage/shared/storage_pdus.go b/federationsender/storage/shared/storage_pdus.go index 00588956..4b51146d 100644 --- a/federationsender/storage/shared/storage_pdus.go +++ b/federationsender/storage/shared/storage_pdus.go @@ -34,7 +34,7 @@ func (d *Database) AssociatePDUWithDestination( serverName gomatrixserverlib.ServerName, receipt *Receipt, ) error { - return sqlutil.WithTransaction(d.DB, func(txn *sql.Tx) error { + return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error { for _, nid := range receipt.nids { if err := d.FederationSenderQueuePDUs.InsertQueuePDU( ctx, // context @@ -111,7 +111,7 @@ func (d *Database) CleanPDUs( return errors.New("expected receipt") } - return sqlutil.WithTransaction(d.DB, func(txn *sql.Tx) error { + return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error { if err := d.FederationSenderQueuePDUs.DeleteQueuePDUs(ctx, txn, serverName, receipt.nids); err != nil { return err }