From 5298dd1133948606172944b7e5ec3805ccc72644 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Wed, 26 Oct 2022 14:52:33 +0100 Subject: [PATCH] Update federation API consumers --- federationapi/consumers/keychange.go | 44 ++++++++++++------------- federationapi/consumers/presence.go | 10 +++--- federationapi/consumers/receipts.go | 34 +++++++++---------- federationapi/consumers/sendtodevice.go | 36 ++++++++++---------- federationapi/consumers/typing.go | 32 +++++++++--------- 5 files changed, 78 insertions(+), 78 deletions(-) diff --git a/federationapi/consumers/keychange.go b/federationapi/consumers/keychange.go index 67dfdc1d..7d1ae0f8 100644 --- a/federationapi/consumers/keychange.go +++ b/federationapi/consumers/keychange.go @@ -35,14 +35,14 @@ import ( // KeyChangeConsumer consumes events that originate in key server. type KeyChangeConsumer struct { - ctx context.Context - jetstream nats.JetStreamContext - durable string - db storage.Database - queues *queue.OutgoingQueues - serverName gomatrixserverlib.ServerName - rsAPI roomserverAPI.FederationRoomserverAPI - topic string + ctx context.Context + jetstream nats.JetStreamContext + durable string + db storage.Database + queues *queue.OutgoingQueues + isLocalServerName func(gomatrixserverlib.ServerName) bool + rsAPI roomserverAPI.FederationRoomserverAPI + topic string } // NewKeyChangeConsumer creates a new KeyChangeConsumer. Call Start() to begin consuming from key servers. @@ -55,14 +55,14 @@ func NewKeyChangeConsumer( rsAPI roomserverAPI.FederationRoomserverAPI, ) *KeyChangeConsumer { return &KeyChangeConsumer{ - ctx: process.Context(), - jetstream: js, - durable: cfg.Matrix.JetStream.Prefixed("FederationAPIKeyChangeConsumer"), - topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputKeyChangeEvent), - queues: queues, - db: store, - serverName: cfg.Matrix.ServerName, - rsAPI: rsAPI, + ctx: process.Context(), + jetstream: js, + durable: cfg.Matrix.JetStream.Prefixed("FederationAPIKeyChangeConsumer"), + topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputKeyChangeEvent), + queues: queues, + db: store, + isLocalServerName: cfg.Matrix.IsLocalServerName, + rsAPI: rsAPI, } } @@ -112,7 +112,7 @@ func (t *KeyChangeConsumer) onDeviceKeyMessage(m api.DeviceMessage) bool { logger.WithError(err).Error("Failed to extract domain from key change event") return true } - if originServerName != t.serverName { + if !t.isLocalServerName(originServerName) { return true } @@ -141,7 +141,7 @@ func (t *KeyChangeConsumer) onDeviceKeyMessage(m api.DeviceMessage) bool { // Pack the EDU and marshal it edu := &gomatrixserverlib.EDU{ Type: gomatrixserverlib.MDeviceListUpdate, - Origin: string(t.serverName), + Origin: string(originServerName), } event := gomatrixserverlib.DeviceListUpdateEvent{ UserID: m.UserID, @@ -159,7 +159,7 @@ func (t *KeyChangeConsumer) onDeviceKeyMessage(m api.DeviceMessage) bool { } logger.Debugf("Sending device list update message to %q", destinations) - err = t.queues.SendEDU(edu, t.serverName, destinations) + err = t.queues.SendEDU(edu, originServerName, destinations) return err == nil } @@ -171,7 +171,7 @@ func (t *KeyChangeConsumer) onCrossSigningMessage(m api.DeviceMessage) bool { logrus.WithError(err).Errorf("fedsender key change consumer: user ID parse failure") return true } - if host != gomatrixserverlib.ServerName(t.serverName) { + if !t.isLocalServerName(host) { // Ignore any messages that didn't originate locally, otherwise we'll // end up parroting information we received from other servers. return true @@ -203,7 +203,7 @@ func (t *KeyChangeConsumer) onCrossSigningMessage(m api.DeviceMessage) bool { // Pack the EDU and marshal it edu := &gomatrixserverlib.EDU{ Type: types.MSigningKeyUpdate, - Origin: string(t.serverName), + Origin: string(host), } if edu.Content, err = json.Marshal(output); err != nil { sentry.CaptureException(err) @@ -212,7 +212,7 @@ func (t *KeyChangeConsumer) onCrossSigningMessage(m api.DeviceMessage) bool { } logger.Debugf("Sending cross-signing update message to %q", destinations) - err = t.queues.SendEDU(edu, t.serverName, destinations) + err = t.queues.SendEDU(edu, host, destinations) return err == nil } diff --git a/federationapi/consumers/presence.go b/federationapi/consumers/presence.go index e76103cd..3445d34a 100644 --- a/federationapi/consumers/presence.go +++ b/federationapi/consumers/presence.go @@ -38,7 +38,7 @@ type OutputPresenceConsumer struct { durable string db storage.Database queues *queue.OutgoingQueues - ServerName gomatrixserverlib.ServerName + isLocalServerName func(gomatrixserverlib.ServerName) bool topic string outboundPresenceEnabled bool } @@ -56,7 +56,7 @@ func NewOutputPresenceConsumer( jetstream: js, queues: queues, db: store, - ServerName: cfg.Matrix.ServerName, + isLocalServerName: cfg.Matrix.IsLocalServerName, durable: cfg.Matrix.JetStream.Durable("FederationAPIPresenceConsumer"), topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputPresenceEvent), outboundPresenceEnabled: cfg.Matrix.Presence.EnableOutbound, @@ -85,7 +85,7 @@ func (t *OutputPresenceConsumer) onMessage(ctx context.Context, msgs []*nats.Msg log.WithError(err).WithField("user_id", userID).Error("failed to extract domain from receipt sender") return true } - if serverName != t.ServerName { + if !t.isLocalServerName(serverName) { return true } @@ -127,7 +127,7 @@ func (t *OutputPresenceConsumer) onMessage(ctx context.Context, msgs []*nats.Msg edu := &gomatrixserverlib.EDU{ Type: gomatrixserverlib.MPresence, - Origin: string(t.ServerName), + Origin: string(serverName), } if edu.Content, err = json.Marshal(content); err != nil { log.WithError(err).Error("failed to marshal EDU JSON") @@ -135,7 +135,7 @@ func (t *OutputPresenceConsumer) onMessage(ctx context.Context, msgs []*nats.Msg } log.Tracef("sending presence EDU to %d servers", len(joined)) - if err = t.queues.SendEDU(edu, t.ServerName, joined); err != nil { + if err = t.queues.SendEDU(edu, serverName, joined); err != nil { log.WithError(err).Error("failed to send EDU") return false } diff --git a/federationapi/consumers/receipts.go b/federationapi/consumers/receipts.go index 75827cb6..200c06e6 100644 --- a/federationapi/consumers/receipts.go +++ b/federationapi/consumers/receipts.go @@ -34,13 +34,13 @@ import ( // OutputReceiptConsumer consumes events that originate in the clientapi. type OutputReceiptConsumer struct { - ctx context.Context - jetstream nats.JetStreamContext - durable string - db storage.Database - queues *queue.OutgoingQueues - ServerName gomatrixserverlib.ServerName - topic string + ctx context.Context + jetstream nats.JetStreamContext + durable string + db storage.Database + queues *queue.OutgoingQueues + isLocalServerName func(gomatrixserverlib.ServerName) bool + topic string } // NewOutputReceiptConsumer creates a new OutputReceiptConsumer. Call Start() to begin consuming typing events. @@ -52,13 +52,13 @@ func NewOutputReceiptConsumer( store storage.Database, ) *OutputReceiptConsumer { return &OutputReceiptConsumer{ - ctx: process.Context(), - jetstream: js, - queues: queues, - db: store, - ServerName: cfg.Matrix.ServerName, - durable: cfg.Matrix.JetStream.Durable("FederationAPIReceiptConsumer"), - topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputReceiptEvent), + ctx: process.Context(), + jetstream: js, + queues: queues, + db: store, + isLocalServerName: cfg.Matrix.IsLocalServerName, + durable: cfg.Matrix.JetStream.Durable("FederationAPIReceiptConsumer"), + topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputReceiptEvent), } } @@ -95,7 +95,7 @@ func (t *OutputReceiptConsumer) onMessage(ctx context.Context, msgs []*nats.Msg) log.WithError(err).WithField("user_id", receipt.UserID).Error("failed to extract domain from receipt sender") return true } - if receiptServerName != t.ServerName { + if !t.isLocalServerName(receiptServerName) { return true } @@ -134,14 +134,14 @@ func (t *OutputReceiptConsumer) onMessage(ctx context.Context, msgs []*nats.Msg) edu := &gomatrixserverlib.EDU{ Type: gomatrixserverlib.MReceipt, - Origin: string(t.ServerName), + Origin: string(receiptServerName), } if edu.Content, err = json.Marshal(content); err != nil { log.WithError(err).Error("failed to marshal EDU JSON") return true } - if err := t.queues.SendEDU(edu, t.ServerName, names); err != nil { + if err := t.queues.SendEDU(edu, receiptServerName, names); err != nil { log.WithError(err).Error("failed to send EDU") return false } diff --git a/federationapi/consumers/sendtodevice.go b/federationapi/consumers/sendtodevice.go index 9aec22a3..9620d161 100644 --- a/federationapi/consumers/sendtodevice.go +++ b/federationapi/consumers/sendtodevice.go @@ -34,13 +34,13 @@ import ( // OutputSendToDeviceConsumer consumes events that originate in the clientapi. type OutputSendToDeviceConsumer struct { - ctx context.Context - jetstream nats.JetStreamContext - durable string - db storage.Database - queues *queue.OutgoingQueues - ServerName gomatrixserverlib.ServerName - topic string + ctx context.Context + jetstream nats.JetStreamContext + durable string + db storage.Database + queues *queue.OutgoingQueues + isLocalServerName func(gomatrixserverlib.ServerName) bool + topic string } // NewOutputSendToDeviceConsumer creates a new OutputSendToDeviceConsumer. Call Start() to begin consuming send-to-device events. @@ -52,13 +52,13 @@ func NewOutputSendToDeviceConsumer( store storage.Database, ) *OutputSendToDeviceConsumer { return &OutputSendToDeviceConsumer{ - ctx: process.Context(), - jetstream: js, - queues: queues, - db: store, - ServerName: cfg.Matrix.ServerName, - durable: cfg.Matrix.JetStream.Durable("FederationAPIESendToDeviceConsumer"), - topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputSendToDeviceEvent), + ctx: process.Context(), + jetstream: js, + queues: queues, + db: store, + isLocalServerName: cfg.Matrix.IsLocalServerName, + durable: cfg.Matrix.JetStream.Durable("FederationAPIESendToDeviceConsumer"), + topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputSendToDeviceEvent), } } @@ -82,7 +82,7 @@ func (t *OutputSendToDeviceConsumer) onMessage(ctx context.Context, msgs []*nats log.WithError(err).WithField("user_id", sender).Error("Failed to extract domain from send-to-device sender") return true } - if originServerName != t.ServerName { + if !t.isLocalServerName(originServerName) { return true } // Extract the send-to-device event from msg. @@ -101,14 +101,14 @@ func (t *OutputSendToDeviceConsumer) onMessage(ctx context.Context, msgs []*nats } // The SyncAPI is already handling sendToDevice for the local server - if destServerName == t.ServerName { + if t.isLocalServerName(destServerName) { return true } // Pack the EDU and marshal it edu := &gomatrixserverlib.EDU{ Type: gomatrixserverlib.MDirectToDevice, - Origin: string(t.ServerName), + Origin: string(originServerName), } tdm := gomatrixserverlib.ToDeviceMessage{ Sender: ote.Sender, @@ -127,7 +127,7 @@ func (t *OutputSendToDeviceConsumer) onMessage(ctx context.Context, msgs []*nats } log.Debugf("Sending send-to-device message into %q destination queue", destServerName) - if err := t.queues.SendEDU(edu, t.ServerName, []gomatrixserverlib.ServerName{destServerName}); err != nil { + if err := t.queues.SendEDU(edu, originServerName, []gomatrixserverlib.ServerName{destServerName}); err != nil { log.WithError(err).Error("failed to send EDU") return false } diff --git a/federationapi/consumers/typing.go b/federationapi/consumers/typing.go index 9c737913..c66f9751 100644 --- a/federationapi/consumers/typing.go +++ b/federationapi/consumers/typing.go @@ -31,13 +31,13 @@ import ( // OutputTypingConsumer consumes events that originate in the clientapi. type OutputTypingConsumer struct { - ctx context.Context - jetstream nats.JetStreamContext - durable string - db storage.Database - queues *queue.OutgoingQueues - ServerName gomatrixserverlib.ServerName - topic string + ctx context.Context + jetstream nats.JetStreamContext + durable string + db storage.Database + queues *queue.OutgoingQueues + isLocalServerName func(gomatrixserverlib.ServerName) bool + topic string } // NewOutputTypingConsumer creates a new OutputTypingConsumer. Call Start() to begin consuming typing events. @@ -49,13 +49,13 @@ func NewOutputTypingConsumer( store storage.Database, ) *OutputTypingConsumer { return &OutputTypingConsumer{ - ctx: process.Context(), - jetstream: js, - queues: queues, - db: store, - ServerName: cfg.Matrix.ServerName, - durable: cfg.Matrix.JetStream.Durable("FederationAPITypingConsumer"), - topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputTypingEvent), + ctx: process.Context(), + jetstream: js, + queues: queues, + db: store, + isLocalServerName: cfg.Matrix.IsLocalServerName, + durable: cfg.Matrix.JetStream.Durable("FederationAPITypingConsumer"), + topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputTypingEvent), } } @@ -87,7 +87,7 @@ func (t *OutputTypingConsumer) onMessage(ctx context.Context, msgs []*nats.Msg) _ = msg.Ack() return true } - if typingServerName != t.ServerName { + if !t.isLocalServerName(typingServerName) { return true } @@ -111,7 +111,7 @@ func (t *OutputTypingConsumer) onMessage(ctx context.Context, msgs []*nats.Msg) log.WithError(err).Error("failed to marshal EDU JSON") return true } - if err := t.queues.SendEDU(edu, t.ServerName, names); err != nil { + if err := t.queues.SendEDU(edu, typingServerName, names); err != nil { log.WithError(err).Error("failed to send EDU") return false }