From 16035b97373849d74961e15616f3f1449f0a5abd Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Fri, 7 Jan 2022 17:31:57 +0000 Subject: [PATCH] NATS JetStream tweaks (#2086) * Use named NATS durable consumers * Build fixes * Remove dupe call to SetFederationAPI * Use namespaced consumer name * Fix namespacing * Fix unit tests hopefully --- appservice/consumers/roomserver.go | 4 +++- cmd/dendrite-monolith-server/main.go | 4 ---- federationapi/consumers/eduserver.go | 8 +++++--- federationapi/consumers/roomserver.go | 4 +++- federationapi/federationapi_keys_test.go | 9 +++++++++ roomserver/internal/api.go | 1 + roomserver/internal/input/input.go | 3 +++ setup/config/config_jetstream.go | 6 ++++++ setup/jetstream/nats.go | 7 ++++++- syncapi/consumers/clientapi.go | 4 +++- syncapi/consumers/eduserver_receipts.go | 4 +++- syncapi/consumers/eduserver_sendtodevice.go | 4 +++- syncapi/consumers/eduserver_typing.go | 4 +++- syncapi/consumers/roomserver.go | 4 +++- 14 files changed, 51 insertions(+), 15 deletions(-) diff --git a/appservice/consumers/roomserver.go b/appservice/consumers/roomserver.go index 139b5724..8aea5c34 100644 --- a/appservice/consumers/roomserver.go +++ b/appservice/consumers/roomserver.go @@ -34,6 +34,7 @@ import ( type OutputRoomEventConsumer struct { ctx context.Context jetstream nats.JetStreamContext + durable nats.SubOpt topic string asDB storage.Database rsAPI api.RoomserverInternalAPI @@ -54,6 +55,7 @@ func NewOutputRoomEventConsumer( return &OutputRoomEventConsumer{ ctx: process.Context(), jetstream: js, + durable: cfg.Global.JetStream.Durable("AppserviceRoomserverConsumer"), topic: cfg.Global.JetStream.TopicFor(jetstream.OutputRoomEvent), asDB: appserviceDB, rsAPI: rsAPI, @@ -64,7 +66,7 @@ func NewOutputRoomEventConsumer( // Start consuming from room servers func (s *OutputRoomEventConsumer) Start() error { - _, err := s.jetstream.Subscribe(s.topic, s.onMessage) + _, err := s.jetstream.Subscribe(s.topic, s.onMessage, s.durable) return err } diff --git a/cmd/dendrite-monolith-server/main.go b/cmd/dendrite-monolith-server/main.go index 08851734..4d0598f3 100644 --- a/cmd/dendrite-monolith-server/main.go +++ b/cmd/dendrite-monolith-server/main.go @@ -99,10 +99,6 @@ func main() { } keyRing := fsAPI.KeyRing() - // The underlying roomserver implementation needs to be able to call the fedsender. - // This is different to rsAPI which can be the http client which doesn't need this dependency - rsImpl.SetFederationAPI(fsAPI, keyRing) - keyImpl := keyserver.NewInternalAPI(base, &base.Cfg.KeyServer, fsAPI) keyAPI := keyImpl if base.UseHTTPAPIs { diff --git a/federationapi/consumers/eduserver.go b/federationapi/consumers/eduserver.go index 9e52acef..c3e5b4d4 100644 --- a/federationapi/consumers/eduserver.go +++ b/federationapi/consumers/eduserver.go @@ -34,6 +34,7 @@ import ( type OutputEDUConsumer struct { ctx context.Context jetstream nats.JetStreamContext + durable nats.SubOpt db storage.Database queues *queue.OutgoingQueues ServerName gomatrixserverlib.ServerName @@ -56,6 +57,7 @@ func NewOutputEDUConsumer( queues: queues, db: store, ServerName: cfg.Matrix.ServerName, + durable: cfg.Matrix.JetStream.Durable("FederationAPIEDUServerConsumer"), typingTopic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputTypingEvent), sendToDeviceTopic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputSendToDeviceEvent), receiptTopic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputReceiptEvent), @@ -64,13 +66,13 @@ func NewOutputEDUConsumer( // Start consuming from EDU servers func (t *OutputEDUConsumer) Start() error { - if _, err := t.jetstream.Subscribe(t.typingTopic, t.onTypingEvent); err != nil { + if _, err := t.jetstream.Subscribe(t.typingTopic, t.onTypingEvent, t.durable); err != nil { return err } - if _, err := t.jetstream.Subscribe(t.sendToDeviceTopic, t.onSendToDeviceEvent); err != nil { + if _, err := t.jetstream.Subscribe(t.sendToDeviceTopic, t.onSendToDeviceEvent, t.durable); err != nil { return err } - if _, err := t.jetstream.Subscribe(t.receiptTopic, t.onReceiptEvent); err != nil { + if _, err := t.jetstream.Subscribe(t.receiptTopic, t.onReceiptEvent, t.durable); err != nil { return err } return nil diff --git a/federationapi/consumers/roomserver.go b/federationapi/consumers/roomserver.go index 12410bb7..632adae3 100644 --- a/federationapi/consumers/roomserver.go +++ b/federationapi/consumers/roomserver.go @@ -37,6 +37,7 @@ type OutputRoomEventConsumer struct { cfg *config.FederationAPI rsAPI api.RoomserverInternalAPI jetstream nats.JetStreamContext + durable nats.SubOpt db storage.Database queues *queue.OutgoingQueues topic string @@ -58,13 +59,14 @@ func NewOutputRoomEventConsumer( db: store, queues: queues, rsAPI: rsAPI, + durable: cfg.Matrix.JetStream.Durable("FederationAPIRoomServerConsumer"), topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputRoomEvent), } } // Start consuming from room servers func (s *OutputRoomEventConsumer) Start() error { - _, err := s.jetstream.Subscribe(s.topic, s.onMessage) + _, err := s.jetstream.Subscribe(s.topic, s.onMessage, s.durable) return err } diff --git a/federationapi/federationapi_keys_test.go b/federationapi/federationapi_keys_test.go index b9503963..4774c882 100644 --- a/federationapi/federationapi_keys_test.go +++ b/federationapi/federationapi_keys_test.go @@ -68,6 +68,13 @@ func TestMain(m *testing.M) { panic("can't create cache: " + err.Error()) } + // Create a temporary directory for JetStream. + d, err := ioutil.TempDir("./", "jetstream*") + if err != nil { + panic(err) + } + defer os.RemoveAll(d) + // Draw up just enough Dendrite config for the server key // API to work. cfg := &config.Dendrite{} @@ -75,6 +82,8 @@ func TestMain(m *testing.M) { cfg.Global.ServerName = gomatrixserverlib.ServerName(s.name) cfg.Global.PrivateKey = testPriv cfg.Global.JetStream.InMemory = true + cfg.Global.JetStream.TopicPrefix = string(s.name[:1]) + cfg.Global.JetStream.StoragePath = config.Path(d) cfg.Global.KeyID = serverKeyID cfg.Global.KeyValidityPeriod = s.validity cfg.FederationAPI.Database.ConnectionString = config.DataSource("file::memory:") diff --git a/roomserver/internal/api.go b/roomserver/internal/api.go index e370f7e4..cf2e59c6 100644 --- a/roomserver/internal/api.go +++ b/roomserver/internal/api.go @@ -67,6 +67,7 @@ func NewRoomserverAPI( InputRoomEventTopic: inputRoomEventTopic, OutputRoomEventTopic: outputRoomEventTopic, JetStream: consumer, + Durable: cfg.Matrix.JetStream.Durable("RoomserverInputConsumer"), ServerName: cfg.Matrix.ServerName, ACLs: serverACLs, }, diff --git a/roomserver/internal/input/input.go b/roomserver/internal/input/input.go index dbff5fdd..57e51055 100644 --- a/roomserver/internal/input/input.go +++ b/roomserver/internal/input/input.go @@ -43,6 +43,7 @@ var keyContentFields = map[string]string{ type Inputer struct { DB storage.Database JetStream nats.JetStreamContext + Durable nats.SubOpt ServerName gomatrixserverlib.ServerName ACLs *acls.ServerACLs InputRoomEventTopic string @@ -85,6 +86,8 @@ func (r *Inputer) Start() error { // or nak them within a certain amount of time. This stops that from // happening, so we don't end up doing a lot of unnecessary duplicate work. nats.MaxDeliver(0), + // Use a durable named consumer. + r.Durable, ) return err } diff --git a/setup/config/config_jetstream.go b/setup/config/config_jetstream.go index 0bd84899..94e2d88b 100644 --- a/setup/config/config_jetstream.go +++ b/setup/config/config_jetstream.go @@ -2,6 +2,8 @@ package config import ( "fmt" + + "github.com/nats-io/nats.go" ) type JetStream struct { @@ -23,6 +25,10 @@ func (c *JetStream) TopicFor(name string) string { return fmt.Sprintf("%s%s", c.TopicPrefix, name) } +func (c *JetStream) Durable(name string) nats.SubOpt { + return nats.Durable(c.TopicFor(name)) +} + func (c *JetStream) Defaults(generate bool) { c.Addresses = []string{} c.TopicPrefix = "Dendrite" diff --git a/setup/jetstream/nats.go b/setup/jetstream/nats.go index eec68d82..6dbbd1f4 100644 --- a/setup/jetstream/nats.go +++ b/setup/jetstream/nats.go @@ -75,13 +75,18 @@ func setupNATS(cfg *config.JetStream, nc *natsclient.Conn) (nats.JetStreamContex } if info == nil { stream.Subjects = []string{name} + // If we're trying to keep everything in memory (e.g. unit tests) // then overwrite the storage policy. if cfg.InMemory { stream.Storage = nats.MemoryStorage } - if _, err = s.AddStream(stream); err != nil { + // Namespace the streams without modifying the original streams + // array, otherwise we end up with namespaces on namespaces. + namespaced := *stream + namespaced.Name = name + if _, err = s.AddStream(&namespaced); err != nil { logrus.WithError(err).WithField("stream", name).Fatal("Unable to add stream") } } diff --git a/syncapi/consumers/clientapi.go b/syncapi/consumers/clientapi.go index 85710cdd..1ec9beb0 100644 --- a/syncapi/consumers/clientapi.go +++ b/syncapi/consumers/clientapi.go @@ -34,6 +34,7 @@ import ( type OutputClientDataConsumer struct { ctx context.Context jetstream nats.JetStreamContext + durable nats.SubOpt topic string db storage.Database stream types.StreamProvider @@ -53,6 +54,7 @@ func NewOutputClientDataConsumer( ctx: process.Context(), jetstream: js, topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputClientData), + durable: cfg.Matrix.JetStream.Durable("SyncAPIClientAPIConsumer"), db: store, notifier: notifier, stream: stream, @@ -61,7 +63,7 @@ func NewOutputClientDataConsumer( // Start consuming from room servers func (s *OutputClientDataConsumer) Start() error { - _, err := s.jetstream.Subscribe(s.topic, s.onMessage) + _, err := s.jetstream.Subscribe(s.topic, s.onMessage, s.durable) return err } diff --git a/syncapi/consumers/eduserver_receipts.go b/syncapi/consumers/eduserver_receipts.go index 582e1d64..57d69d6f 100644 --- a/syncapi/consumers/eduserver_receipts.go +++ b/syncapi/consumers/eduserver_receipts.go @@ -34,6 +34,7 @@ import ( type OutputReceiptEventConsumer struct { ctx context.Context jetstream nats.JetStreamContext + durable nats.SubOpt topic string db storage.Database stream types.StreamProvider @@ -54,6 +55,7 @@ func NewOutputReceiptEventConsumer( ctx: process.Context(), jetstream: js, topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputReceiptEvent), + durable: cfg.Matrix.JetStream.Durable("SyncAPIEDUServerReceiptConsumer"), db: store, notifier: notifier, stream: stream, @@ -62,7 +64,7 @@ func NewOutputReceiptEventConsumer( // Start consuming from EDU api func (s *OutputReceiptEventConsumer) Start() error { - _, err := s.jetstream.Subscribe(s.topic, s.onMessage) + _, err := s.jetstream.Subscribe(s.topic, s.onMessage, s.durable) return err } diff --git a/syncapi/consumers/eduserver_sendtodevice.go b/syncapi/consumers/eduserver_sendtodevice.go index 6579c303..54e689fa 100644 --- a/syncapi/consumers/eduserver_sendtodevice.go +++ b/syncapi/consumers/eduserver_sendtodevice.go @@ -36,6 +36,7 @@ import ( type OutputSendToDeviceEventConsumer struct { ctx context.Context jetstream nats.JetStreamContext + durable nats.SubOpt topic string db storage.Database serverName gomatrixserverlib.ServerName // our server name @@ -57,6 +58,7 @@ func NewOutputSendToDeviceEventConsumer( ctx: process.Context(), jetstream: js, topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputSendToDeviceEvent), + durable: cfg.Matrix.JetStream.Durable("SyncAPIEDUServerSendToDeviceConsumer"), db: store, serverName: cfg.Matrix.ServerName, notifier: notifier, @@ -66,7 +68,7 @@ func NewOutputSendToDeviceEventConsumer( // Start consuming from EDU api func (s *OutputSendToDeviceEventConsumer) Start() error { - _, err := s.jetstream.Subscribe(s.topic, s.onMessage) + _, err := s.jetstream.Subscribe(s.topic, s.onMessage, s.durable) return err } diff --git a/syncapi/consumers/eduserver_typing.go b/syncapi/consumers/eduserver_typing.go index 487befe8..de2f6f95 100644 --- a/syncapi/consumers/eduserver_typing.go +++ b/syncapi/consumers/eduserver_typing.go @@ -35,6 +35,7 @@ import ( type OutputTypingEventConsumer struct { ctx context.Context jetstream nats.JetStreamContext + durable nats.SubOpt topic string eduCache *cache.EDUCache stream types.StreamProvider @@ -56,6 +57,7 @@ func NewOutputTypingEventConsumer( ctx: process.Context(), jetstream: js, topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputTypingEvent), + durable: cfg.Matrix.JetStream.Durable("SyncAPIEDUServerTypingConsumer"), eduCache: eduCache, notifier: notifier, stream: stream, @@ -64,7 +66,7 @@ func NewOutputTypingEventConsumer( // Start consuming from EDU api func (s *OutputTypingEventConsumer) Start() error { - _, err := s.jetstream.Subscribe(s.topic, s.onMessage) + _, err := s.jetstream.Subscribe(s.topic, s.onMessage, s.durable) return err } diff --git a/syncapi/consumers/roomserver.go b/syncapi/consumers/roomserver.go index 5b008e3d..6b3ebe53 100644 --- a/syncapi/consumers/roomserver.go +++ b/syncapi/consumers/roomserver.go @@ -38,6 +38,7 @@ type OutputRoomEventConsumer struct { cfg *config.SyncAPI rsAPI api.RoomserverInternalAPI jetstream nats.JetStreamContext + durable nats.SubOpt topic string db storage.Database pduStream types.StreamProvider @@ -61,6 +62,7 @@ func NewOutputRoomEventConsumer( cfg: cfg, jetstream: js, topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputRoomEvent), + durable: cfg.Matrix.JetStream.Durable("SyncAPIRoomServerConsumer"), db: store, notifier: notifier, pduStream: pduStream, @@ -71,7 +73,7 @@ func NewOutputRoomEventConsumer( // Start consuming from room servers func (s *OutputRoomEventConsumer) Start() error { - _, err := s.jetstream.Subscribe(s.topic, s.onMessage) + _, err := s.jetstream.Subscribe(s.topic, s.onMessage, s.durable) return err }