From 98a5e410d7ecae49f525ddabc55a86d8a6731f22 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Wed, 23 Mar 2022 10:20:18 +0000 Subject: [PATCH] Per-room consumers (#2293) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Roomserver input refactoring — again! * Ensure the actor runs again * Preserve consumer after unsubscribe * Another sprinkling of magic * Rename `TopicFor` to `Prefixed` * Recreate the stream if the config is bad * Check streams too * Prefix subjects, preserve inboxes * Recreate if subjects wrong * Remove stream subject * Reconstruct properly * Fix mutex unlock * Comments * Fix tests * Don't drop events * Review comments * Separate `queueInputRoomEvents` function * Re-jig control flow a bit --- appservice/consumers/roomserver.go | 2 +- clientapi/clientapi.go | 2 +- eduserver/eduserver.go | 6 +- federationapi/consumers/eduserver.go | 6 +- federationapi/consumers/keychange.go | 4 +- federationapi/consumers/roomserver.go | 2 +- keyserver/keyserver.go | 2 +- roomserver/internal/api.go | 1 + roomserver/internal/input/input.go | 393 ++++++++++++++------ roomserver/roomserver.go | 4 +- setup/config/config_jetstream.go | 4 +- setup/jetstream/nats.go | 31 +- setup/jetstream/streams.go | 14 +- syncapi/consumers/clientapi.go | 2 +- syncapi/consumers/eduserver_receipts.go | 2 +- syncapi/consumers/eduserver_sendtodevice.go | 2 +- syncapi/consumers/eduserver_typing.go | 2 +- syncapi/consumers/roomserver.go | 2 +- syncapi/consumers/userapi.go | 2 +- syncapi/syncapi.go | 6 +- userapi/consumers/syncapi_readupdate.go | 2 +- userapi/consumers/syncapi_streamevent.go | 2 +- userapi/userapi.go | 4 +- 23 files changed, 345 insertions(+), 152 deletions(-) diff --git a/appservice/consumers/roomserver.go b/appservice/consumers/roomserver.go index 9d723bed..01790722 100644 --- a/appservice/consumers/roomserver.go +++ b/appservice/consumers/roomserver.go @@ -56,7 +56,7 @@ func NewOutputRoomEventConsumer( ctx: process.Context(), jetstream: js, durable: cfg.Global.JetStream.Durable("AppserviceRoomserverConsumer"), - topic: cfg.Global.JetStream.TopicFor(jetstream.OutputRoomEvent), + topic: cfg.Global.JetStream.Prefixed(jetstream.OutputRoomEvent), asDB: appserviceDB, rsAPI: rsAPI, serverName: string(cfg.Global.ServerName), diff --git a/clientapi/clientapi.go b/clientapi/clientapi.go index 75184d3b..4550343c 100644 --- a/clientapi/clientapi.go +++ b/clientapi/clientapi.go @@ -55,7 +55,7 @@ func AddPublicRoutes( syncProducer := &producers.SyncAPIProducer{ JetStream: js, - Topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputClientData), + Topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputClientData), } routing.Setup( diff --git a/eduserver/eduserver.go b/eduserver/eduserver.go index 9fe8704c..91208a40 100644 --- a/eduserver/eduserver.go +++ b/eduserver/eduserver.go @@ -48,9 +48,9 @@ func NewInternalAPI( Cache: eduCache, UserAPI: userAPI, JetStream: js, - OutputTypingEventTopic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputTypingEvent), - OutputSendToDeviceEventTopic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputSendToDeviceEvent), - OutputReceiptEventTopic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputReceiptEvent), + OutputTypingEventTopic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputTypingEvent), + OutputSendToDeviceEventTopic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputSendToDeviceEvent), + OutputReceiptEventTopic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputReceiptEvent), ServerName: cfg.Matrix.ServerName, } } diff --git a/federationapi/consumers/eduserver.go b/federationapi/consumers/eduserver.go index 1f81fa25..e14e60f4 100644 --- a/federationapi/consumers/eduserver.go +++ b/federationapi/consumers/eduserver.go @@ -58,9 +58,9 @@ func NewOutputEDUConsumer( db: store, ServerName: cfg.Matrix.ServerName, durable: cfg.Matrix.JetStream.Durable("FederationAPIEDUServerConsumer"), - typingTopic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputTypingEvent), - sendToDeviceTopic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputSendToDeviceEvent), - receiptTopic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputReceiptEvent), + typingTopic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputTypingEvent), + sendToDeviceTopic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputSendToDeviceEvent), + receiptTopic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputReceiptEvent), } } diff --git a/federationapi/consumers/keychange.go b/federationapi/consumers/keychange.go index 33d716d2..94e45435 100644 --- a/federationapi/consumers/keychange.go +++ b/federationapi/consumers/keychange.go @@ -55,8 +55,8 @@ func NewKeyChangeConsumer( return &KeyChangeConsumer{ ctx: process.Context(), jetstream: js, - durable: cfg.Matrix.JetStream.TopicFor("FederationAPIKeyChangeConsumer"), - topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputKeyChangeEvent), + durable: cfg.Matrix.JetStream.Prefixed("FederationAPIKeyChangeConsumer"), + topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputKeyChangeEvent), queues: queues, db: store, serverName: cfg.Matrix.ServerName, diff --git a/federationapi/consumers/roomserver.go b/federationapi/consumers/roomserver.go index 989f7cf4..ff2c8e5d 100644 --- a/federationapi/consumers/roomserver.go +++ b/federationapi/consumers/roomserver.go @@ -61,7 +61,7 @@ func NewOutputRoomEventConsumer( queues: queues, rsAPI: rsAPI, durable: cfg.Matrix.JetStream.Durable("FederationAPIRoomServerConsumer"), - topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputRoomEvent), + topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputRoomEvent), } } diff --git a/keyserver/keyserver.go b/keyserver/keyserver.go index cf66bd38..c557dfba 100644 --- a/keyserver/keyserver.go +++ b/keyserver/keyserver.go @@ -46,7 +46,7 @@ func NewInternalAPI( logrus.WithError(err).Panicf("failed to connect to key server database") } keyChangeProducer := &producers.KeyChange{ - Topic: string(cfg.Matrix.JetStream.TopicFor(jetstream.OutputKeyChangeEvent)), + Topic: string(cfg.Matrix.JetStream.Prefixed(jetstream.OutputKeyChangeEvent)), JetStream: js, DB: db, } diff --git a/roomserver/internal/api.go b/roomserver/internal/api.go index 91001e41..f96cefcb 100644 --- a/roomserver/internal/api.go +++ b/roomserver/internal/api.go @@ -90,6 +90,7 @@ func (r *RoomserverInternalAPI) SetFederationAPI(fsAPI fsAPI.FederationInternalA r.KeyRing = keyRing r.Inputer = &input.Inputer{ + Cfg: r.Cfg, ProcessContext: r.ProcessContext, DB: r.DB, InputRoomEventTopic: r.InputRoomEventTopic, diff --git a/roomserver/internal/input/input.go b/roomserver/internal/input/input.go index c6e35461..6a8ae6d0 100644 --- a/roomserver/internal/input/input.go +++ b/roomserver/internal/input/input.go @@ -19,6 +19,7 @@ import ( "context" "encoding/json" "errors" + "fmt" "sync" "time" @@ -29,6 +30,7 @@ import ( "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/internal/query" "github.com/matrix-org/dendrite/roomserver/storage" + "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/setup/jetstream" "github.com/matrix-org/dendrite/setup/process" "github.com/matrix-org/gomatrixserverlib" @@ -45,7 +47,35 @@ var keyContentFields = map[string]string{ "m.room.member": "membership", } +// Inputer is responsible for consuming from the roomserver input +// streams and processing the events. All input events are queued +// into a single NATS stream and the order is preserved strictly. +// The `room_id` message header will contain the room ID which will +// be used to assign the pending event to a per-room worker. +// +// The input API maintains an ephemeral headers-only consumer. It +// will speed through the stream working out which room IDs are +// pending and create durable consumers for them. The durable +// consumer will then be used for each room worker goroutine to +// fetch events one by one and process them. Each room having a +// durable consumer of its own means there is no head-of-line +// blocking between rooms. Filtering ensures that each durable +// consumer only receives events for the room it is interested in. +// +// The ephemeral consumer closely tracks the newest events. The +// per-room durable consumers will only progress through the stream +// as events are processed. +// +// A BC * -> positions of each consumer (* = ephemeral) +// ⌄ ⌄⌄ ⌄ +// ABAABCAABCAA -> newest (letter = subject for each message) +// +// In this example, A is still processing an event but has two +// pending events to process afterwards. Both B and C are caught +// up, so they will do nothing until a new event comes in for B +// or C. type Inputer struct { + Cfg *config.RoomServer ProcessContext *process.ProcessContext DB storage.Database NATSClient *nats.Conn @@ -57,147 +87,275 @@ type Inputer struct { ACLs *acls.ServerACLs InputRoomEventTopic string OutputRoomEventTopic string - workers sync.Map // room ID -> *phony.Inbox + workers sync.Map // room ID -> *worker Queryer *query.Queryer } -func (r *Inputer) workerForRoom(roomID string) *phony.Inbox { - inbox, _ := r.workers.LoadOrStore(roomID, &phony.Inbox{}) - return inbox.(*phony.Inbox) +type worker struct { + phony.Inbox + sync.Mutex + r *Inputer + roomID string + subscription *nats.Subscription } -// eventsInProgress is an in-memory map to keep a track of which events we have -// queued up for processing. If we get a redelivery from NATS and we still have -// the queued up item then we won't do anything with the redelivered message. If -// we've restarted Dendrite and now this map is empty then it means that we will -// reload pending work from NATS. -var eventsInProgress sync.Map +func (r *Inputer) startWorkerForRoom(roomID string) { + v, loaded := r.workers.LoadOrStore(roomID, &worker{ + r: r, + roomID: roomID, + }) + w := v.(*worker) + w.Lock() + defer w.Unlock() + if !loaded || w.subscription == nil { + consumer := r.Cfg.Matrix.JetStream.Prefixed("RoomInput" + jetstream.Tokenise(w.roomID)) + subject := r.Cfg.Matrix.JetStream.Prefixed(jetstream.InputRoomEventSubj(w.roomID)) -// onMessage is called when a new event arrives in the roomserver input stream. + // Create the consumer. We do this as a specific step rather than + // letting PullSubscribe create it for us because we need the consumer + // to outlive the subscription. If we do it this way, we can Bind in the + // next step, and when we Unsubscribe, the consumer continues to live. If + // we leave PullSubscribe to create the durable consumer, Unsubscribe will + // delete it because it thinks it "owns" it, which in turn breaks the + // interest-based retention storage policy. + // If the durable consumer already exists, this is effectively a no-op. + // Another interesting tid-bit here: the ACK policy is set to "all" so that + // if we acknowledge a message, we also acknowledge everything that comes + // before it. This is necessary because otherwise our consumer will never + // acknowledge things we filtered out for other subjects and therefore they + // will linger around forever. + if _, err := w.r.JetStream.AddConsumer( + r.Cfg.Matrix.JetStream.Prefixed(jetstream.InputRoomEvent), + &nats.ConsumerConfig{ + Durable: consumer, + AckPolicy: nats.AckAllPolicy, + DeliverPolicy: nats.DeliverAllPolicy, + FilterSubject: subject, + AckWait: MaximumMissingProcessingTime + (time.Second * 10), + }, + ); err != nil { + logrus.WithError(err).Errorf("Failed to create consumer for room %q", w.roomID) + return + } + + // Bind to our durable consumer. We want to receive all messages waiting + // for this subject and we want to manually acknowledge them, so that we + // can ensure they are only cleaned up when we are done processing them. + sub, err := w.r.JetStream.PullSubscribe( + subject, consumer, + nats.ManualAck(), + nats.DeliverAll(), + nats.AckWait(MaximumMissingProcessingTime+(time.Second*10)), + nats.Bind(r.InputRoomEventTopic, consumer), + ) + if err != nil { + logrus.WithError(err).Errorf("Failed to subscribe to stream for room %q", w.roomID) + return + } + + // Go and start pulling messages off the queue. + w.subscription = sub + w.Act(nil, w._next) + } +} + +// Start creates an ephemeral non-durable consumer on the roomserver +// input topic. It is configured to deliver us headers only because we +// don't actually care about the contents of the message at this point, +// we only care about the `room_id` field. Once a message arrives, we +// will look to see if we have a worker for that room which has its +// own consumer. If we don't, we'll start one. func (r *Inputer) Start() error { _, err := r.JetStream.Subscribe( - r.InputRoomEventTopic, - // We specifically don't use jetstream.WithJetStreamMessage here because we - // queue the task off to a room-specific queue and the ACK needs to be sent - // later, possibly with an error response to the inputter if synchronous. - func(msg *nats.Msg) { - roomID := msg.Header.Get("room_id") - var inputRoomEvent api.InputRoomEvent - if err := json.Unmarshal(msg.Data, &inputRoomEvent); err != nil { - _ = msg.Term() - return - } - - _ = msg.InProgress() - index := roomID + "\000" + inputRoomEvent.Event.EventID() - if _, ok := eventsInProgress.LoadOrStore(index, struct{}{}); ok { - // We're already waiting to deal with this event, so there's no - // point in queuing it up again. We've notified NATS that we're - // working on the message still, so that will have deferred the - // redelivery by a bit. - return - } - - roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Inc() - r.workerForRoom(roomID).Act(nil, func() { - _ = msg.InProgress() // resets the acknowledgement wait timer - defer eventsInProgress.Delete(index) - defer roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Dec() - var errString string - if err := r.processRoomEvent(r.ProcessContext.Context(), &inputRoomEvent); err != nil { - if !errors.Is(err, context.DeadlineExceeded) && !errors.Is(err, context.Canceled) { - sentry.CaptureException(err) - } - logrus.WithError(err).WithFields(logrus.Fields{ - "room_id": roomID, - "event_id": inputRoomEvent.Event.EventID(), - "type": inputRoomEvent.Event.Type(), - }).Warn("Roomserver failed to process async event") - _ = msg.Term() - errString = err.Error() - } else { - _ = msg.Ack() - } - if replyTo := msg.Header.Get("sync"); replyTo != "" { - if err := r.NATSClient.Publish(replyTo, []byte(errString)); err != nil { - logrus.WithError(err).WithFields(logrus.Fields{ - "room_id": roomID, - "event_id": inputRoomEvent.Event.EventID(), - "type": inputRoomEvent.Event.Type(), - }).Warn("Roomserver failed to respond for sync event") - } - } - }) + "", // This is blank because we specified it in BindStream. + func(m *nats.Msg) { + roomID := m.Header.Get(jetstream.RoomID) + r.startWorkerForRoom(roomID) + _ = m.Ack() }, - // NATS wants to acknowledge automatically by default when the message is - // read from the stream, but we want to override that behaviour by making - // sure that we only acknowledge when we're happy we've done everything we - // can. This ensures we retry things when it makes sense to do so. - nats.ManualAck(), - // Use a durable named consumer. - r.Durable, - // If we've missed things in the stream, e.g. we restarted, then replay - // all of the queued messages that were waiting for us. + nats.HeadersOnly(), nats.DeliverAll(), - // Ensure that NATS doesn't try to resend us something that wasn't done - // within the period of time that we might still be processing it. - nats.AckWait(MaximumMissingProcessingTime+(time.Second*10)), - // It is recommended to disable this for pull consumers as per the docs: - // https://docs.nats.io/nats-concepts/jetstream/consumers#note-about-push-and-pull-consumers - nats.MaxAckPending(-1), + nats.AckAll(), + nats.BindStream(r.InputRoomEventTopic), ) return err } +// _next is called by the worker for the room. It must only be called +// by the actor embedded into the worker. +func (w *worker) _next() { + // Look up what the next event is that's waiting to be processed. + ctx, cancel := context.WithTimeout(w.r.ProcessContext.Context(), time.Minute) + defer cancel() + msgs, err := w.subscription.Fetch(1, nats.Context(ctx)) + switch err { + case nil: + // Make sure that once we're done here, we queue up another call + // to _next in the inbox. + defer w.Act(nil, w._next) + + // If no error was reported, but we didn't get exactly one message, + // then skip over this and try again on the next iteration. + if len(msgs) != 1 { + return + } + + case context.DeadlineExceeded: + // The context exceeded, so we've been waiting for more than a + // minute for activity in this room. At this point we will shut + // down the subscriber to free up resources. It'll get started + // again if new activity happens. + if err = w.subscription.Unsubscribe(); err != nil { + logrus.WithError(err).Errorf("Failed to unsubscribe to stream for room %q", w.roomID) + } + w.Lock() + w.subscription = nil + w.Unlock() + return + + default: + // Something went wrong while trying to fetch the next event + // from the queue. In which case, we'll shut down the subscriber + // and wait to be notified about new room activity again. Maybe + // the problem will be corrected by then. + logrus.WithError(err).Errorf("Failed to get next stream message for room %q", w.roomID) + if err = w.subscription.Unsubscribe(); err != nil { + logrus.WithError(err).Errorf("Failed to unsubscribe to stream for room %q", w.roomID) + } + w.Lock() + w.subscription = nil + w.Unlock() + return + } + + // Try to unmarshal the input room event. If the JSON unmarshalling + // fails then we'll terminate the message — this notifies NATS that + // we are done with the message and never want to see it again. + msg := msgs[0] + var inputRoomEvent api.InputRoomEvent + if err = json.Unmarshal(msg.Data, &inputRoomEvent); err != nil { + _ = msg.Term() + return + } + + roomserverInputBackpressure.With(prometheus.Labels{"room_id": w.roomID}).Inc() + defer roomserverInputBackpressure.With(prometheus.Labels{"room_id": w.roomID}).Dec() + + // Process the room event. If something goes wrong then we'll tell + // NATS to terminate the message. We'll store the error result as + // a string, because we might want to return that to the caller if + // it was a synchronous request. + var errString string + if err = w.r.processRoomEvent(w.r.ProcessContext.Context(), &inputRoomEvent); err != nil { + if !errors.Is(err, context.DeadlineExceeded) && !errors.Is(err, context.Canceled) { + sentry.CaptureException(err) + } + logrus.WithError(err).WithFields(logrus.Fields{ + "room_id": w.roomID, + "event_id": inputRoomEvent.Event.EventID(), + "type": inputRoomEvent.Event.Type(), + }).Warn("Roomserver failed to process async event") + _ = msg.Term() + errString = err.Error() + } else { + _ = msg.Ack() + } + + // If it was a synchronous input request then the "sync" field + // will be present in the message. That means that someone is + // waiting for a response. The temporary inbox name is present in + // that field, so send back the error string (if any). If there + // was no error then we'll return a blank message, which means + // that everything was OK. + if replyTo := msg.Header.Get("sync"); replyTo != "" { + if err = w.r.NATSClient.Publish(replyTo, []byte(errString)); err != nil { + logrus.WithError(err).WithFields(logrus.Fields{ + "room_id": w.roomID, + "event_id": inputRoomEvent.Event.EventID(), + "type": inputRoomEvent.Event.Type(), + }).Warn("Roomserver failed to respond for sync event") + } + } +} + +// queueInputRoomEvents queues events into the roomserver input +// stream in NATS. +func (r *Inputer) queueInputRoomEvents( + ctx context.Context, + request *api.InputRoomEventsRequest, +) (replySub *nats.Subscription, err error) { + // If the request is synchronous then we need to create a + // temporary inbox to wait for responses on, and then create + // a subscription to it. If it's asynchronous then we won't + // bother, so these values will remain empty. + var replyTo string + if !request.Asynchronous { + replyTo = nats.NewInbox() + replySub, err = r.NATSClient.SubscribeSync(replyTo) + if err != nil { + return nil, fmt.Errorf("r.NATSClient.SubscribeSync: %w", err) + } + if replySub == nil { + // This shouldn't ever happen, but it doesn't hurt to check + // because we can potentially avoid a nil pointer panic later + // if it did for some reason. + return nil, fmt.Errorf("expected a subscription to the temporary inbox") + } + } + + // For each event, marshal the input room event and then + // send it into the input queue. + for _, e := range request.InputRoomEvents { + roomID := e.Event.RoomID() + subj := r.Cfg.Matrix.JetStream.Prefixed(jetstream.InputRoomEventSubj(roomID)) + msg := &nats.Msg{ + Subject: subj, + Header: nats.Header{}, + } + msg.Header.Set("room_id", roomID) + if replyTo != "" { + msg.Header.Set("sync", replyTo) + } + msg.Data, err = json.Marshal(e) + if err != nil { + return nil, fmt.Errorf("json.Marshal: %w", err) + } + if _, err = r.JetStream.PublishMsg(msg, nats.Context(ctx)); err != nil { + logrus.WithError(err).WithFields(logrus.Fields{ + "room_id": roomID, + "event_id": e.Event.EventID(), + "subj": subj, + }).Error("Roomserver failed to queue async event") + return nil, fmt.Errorf("r.JetStream.PublishMsg: %w", err) + } + } + return +} + // InputRoomEvents implements api.RoomserverInternalAPI func (r *Inputer) InputRoomEvents( ctx context.Context, request *api.InputRoomEventsRequest, response *api.InputRoomEventsResponse, ) { - var replyTo string - var replySub *nats.Subscription - if !request.Asynchronous { - var err error - replyTo = nats.NewInbox() - replySub, err = r.NATSClient.SubscribeSync(replyTo) - if err != nil { - response.ErrMsg = err.Error() - return - } - } - - var err error - for _, e := range request.InputRoomEvents { - msg := &nats.Msg{ - Subject: r.InputRoomEventTopic, - Header: nats.Header{}, - Reply: replyTo, - } - roomID := e.Event.RoomID() - msg.Header.Set("room_id", roomID) - if replyTo != "" { - msg.Header.Set("sync", replyTo) - } - msg.Data, err = json.Marshal(e) - if err != nil { - response.ErrMsg = err.Error() - return - } - if _, err = r.JetStream.PublishMsg(msg); err != nil { - logrus.WithError(err).WithFields(logrus.Fields{ - "room_id": roomID, - "event_id": e.Event.EventID(), - }).Error("Roomserver failed to queue async event") - return - } - } - - if request.Asynchronous || replySub == nil { + // Queue up the event into the roomserver. + replySub, err := r.queueInputRoomEvents(ctx, request) + if err != nil { + response.ErrMsg = err.Error() return } + // If we aren't waiting for synchronous responses then we can + // give up here, there is nothing further to do. + if replySub == nil { + return + } + + // Otherwise, we'll want to sit and wait for the responses + // from the roomserver. There will be one response for every + // input we submitted. The last error value we receive will + // be the one returned as the error string. defer replySub.Drain() // nolint:errcheck for i := 0; i < len(request.InputRoomEvents); i++ { msg, err := replySub.NextMsgWithContext(ctx) @@ -207,7 +365,6 @@ func (r *Inputer) InputRoomEvents( } if len(msg.Data) > 0 { response.ErrMsg = string(msg.Data) - return } } } diff --git a/roomserver/roomserver.go b/roomserver/roomserver.go index 896773ba..36e3c526 100644 --- a/roomserver/roomserver.go +++ b/roomserver/roomserver.go @@ -54,8 +54,8 @@ func NewInternalAPI( return internal.NewRoomserverAPI( base.ProcessContext, cfg, roomserverDB, js, nc, - cfg.Matrix.JetStream.TopicFor(jetstream.InputRoomEvent), - cfg.Matrix.JetStream.TopicFor(jetstream.OutputRoomEvent), + cfg.Matrix.JetStream.Prefixed(jetstream.InputRoomEvent), + cfg.Matrix.JetStream.Prefixed(jetstream.OutputRoomEvent), base.Caches, perspectiveServerNames, ) } diff --git a/setup/config/config_jetstream.go b/setup/config/config_jetstream.go index 9271cd8b..b6a93d39 100644 --- a/setup/config/config_jetstream.go +++ b/setup/config/config_jetstream.go @@ -19,12 +19,12 @@ type JetStream struct { InMemory bool `yaml:"in_memory"` } -func (c *JetStream) TopicFor(name string) string { +func (c *JetStream) Prefixed(name string) string { return fmt.Sprintf("%s%s", c.TopicPrefix, name) } func (c *JetStream) Durable(name string) string { - return c.TopicFor(name) + return c.Prefixed(name) } func (c *JetStream) Defaults(generate bool) { diff --git a/setup/jetstream/nats.go b/setup/jetstream/nats.go index 43cc0331..748c191b 100644 --- a/setup/jetstream/nats.go +++ b/setup/jetstream/nats.go @@ -1,6 +1,7 @@ package jetstream import ( + "reflect" "strings" "sync" "time" @@ -75,14 +76,35 @@ func setupNATS(cfg *config.JetStream, nc *natsclient.Conn) (natsclient.JetStream } for _, stream := range streams { // streams are defined in streams.go - name := cfg.TopicFor(stream.Name) + name := cfg.Prefixed(stream.Name) info, err := s.StreamInfo(name) if err != nil && err != natsclient.ErrStreamNotFound { logrus.WithError(err).Fatal("Unable to get stream info") } + subjects := stream.Subjects + if len(subjects) == 0 { + // By default we want each stream to listen for the subjects + // that are either an exact match for the stream name, or where + // the first part of the subject is the stream name. ">" is a + // wildcard in NATS for one or more subject tokens. In the case + // that the stream is called "Foo", this will match any message + // with the subject "Foo", "Foo.Bar" or "Foo.Bar.Baz" etc. + subjects = []string{name, name + ".>"} + } + if info != nil { + switch { + case !reflect.DeepEqual(info.Config.Subjects, subjects): + fallthrough + case info.Config.Retention != stream.Retention: + fallthrough + case info.Config.Storage != stream.Storage: + if err = s.DeleteStream(name); err != nil { + logrus.WithError(err).Fatal("Unable to delete stream") + } + info = nil + } + } if info == nil { - stream.Subjects = []string{name} - // If we're trying to keep everything in memory (e.g. unit tests) // then overwrite the storage policy. if cfg.InMemory { @@ -93,8 +115,9 @@ func setupNATS(cfg *config.JetStream, nc *natsclient.Conn) (natsclient.JetStream // array, otherwise we end up with namespaces on namespaces. namespaced := *stream namespaced.Name = name + namespaced.Subjects = subjects if _, err = s.AddStream(&namespaced); err != nil { - logrus.WithError(err).WithField("stream", name).Fatal("Unable to add stream") + logrus.WithError(err).WithField("stream", name).WithField("subjects", subjects).Fatal("Unable to add stream") } } } diff --git a/setup/jetstream/streams.go b/setup/jetstream/streams.go index aa3e95cb..aa979924 100644 --- a/setup/jetstream/streams.go +++ b/setup/jetstream/streams.go @@ -1,6 +1,8 @@ package jetstream import ( + "fmt" + "regexp" "time" "github.com/nats-io/nats.go" @@ -24,10 +26,20 @@ var ( OutputReadUpdate = "OutputReadUpdate" ) +var safeCharacters = regexp.MustCompile("[^A-Za-z0-9$]+") + +func Tokenise(str string) string { + return safeCharacters.ReplaceAllString(str, "_") +} + +func InputRoomEventSubj(roomID string) string { + return fmt.Sprintf("%s.%s", InputRoomEvent, Tokenise(roomID)) +} + var streams = []*nats.StreamConfig{ { Name: InputRoomEvent, - Retention: nats.WorkQueuePolicy, + Retention: nats.InterestPolicy, Storage: nats.FileStorage, }, { diff --git a/syncapi/consumers/clientapi.go b/syncapi/consumers/clientapi.go index fcb7b5b1..40c1cd3d 100644 --- a/syncapi/consumers/clientapi.go +++ b/syncapi/consumers/clientapi.go @@ -61,7 +61,7 @@ func NewOutputClientDataConsumer( return &OutputClientDataConsumer{ ctx: process.Context(), jetstream: js, - topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputClientData), + topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputClientData), durable: cfg.Matrix.JetStream.Durable("SyncAPIClientAPIConsumer"), db: store, notifier: notifier, diff --git a/syncapi/consumers/eduserver_receipts.go b/syncapi/consumers/eduserver_receipts.go index 4e4c61c6..ab79998e 100644 --- a/syncapi/consumers/eduserver_receipts.go +++ b/syncapi/consumers/eduserver_receipts.go @@ -62,7 +62,7 @@ func NewOutputReceiptEventConsumer( return &OutputReceiptEventConsumer{ ctx: process.Context(), jetstream: js, - topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputReceiptEvent), + topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputReceiptEvent), durable: cfg.Matrix.JetStream.Durable("SyncAPIEDUServerReceiptConsumer"), db: store, notifier: notifier, diff --git a/syncapi/consumers/eduserver_sendtodevice.go b/syncapi/consumers/eduserver_sendtodevice.go index b0beef06..bdbe7735 100644 --- a/syncapi/consumers/eduserver_sendtodevice.go +++ b/syncapi/consumers/eduserver_sendtodevice.go @@ -57,7 +57,7 @@ func NewOutputSendToDeviceEventConsumer( return &OutputSendToDeviceEventConsumer{ ctx: process.Context(), jetstream: js, - topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputSendToDeviceEvent), + topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputSendToDeviceEvent), durable: cfg.Matrix.JetStream.Durable("SyncAPIEDUServerSendToDeviceConsumer"), db: store, serverName: cfg.Matrix.ServerName, diff --git a/syncapi/consumers/eduserver_typing.go b/syncapi/consumers/eduserver_typing.go index cae5df8a..c2828c7f 100644 --- a/syncapi/consumers/eduserver_typing.go +++ b/syncapi/consumers/eduserver_typing.go @@ -56,7 +56,7 @@ func NewOutputTypingEventConsumer( return &OutputTypingEventConsumer{ ctx: process.Context(), jetstream: js, - topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputTypingEvent), + topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputTypingEvent), durable: cfg.Matrix.JetStream.Durable("SyncAPIEDUServerTypingConsumer"), eduCache: eduCache, notifier: notifier, diff --git a/syncapi/consumers/roomserver.go b/syncapi/consumers/roomserver.go index 640c505c..5bdc0fad 100644 --- a/syncapi/consumers/roomserver.go +++ b/syncapi/consumers/roomserver.go @@ -65,7 +65,7 @@ func NewOutputRoomEventConsumer( ctx: process.Context(), cfg: cfg, jetstream: js, - topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputRoomEvent), + topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputRoomEvent), durable: cfg.Matrix.JetStream.Durable("SyncAPIRoomServerConsumer"), db: store, notifier: notifier, diff --git a/syncapi/consumers/userapi.go b/syncapi/consumers/userapi.go index a3b2dd53..010fa7c8 100644 --- a/syncapi/consumers/userapi.go +++ b/syncapi/consumers/userapi.go @@ -56,7 +56,7 @@ func NewOutputNotificationDataConsumer( ctx: process.Context(), jetstream: js, durable: cfg.Matrix.JetStream.Durable("SyncAPINotificationDataConsumer"), - topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputNotificationData), + topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputNotificationData), db: store, notifier: notifier, stream: stream, diff --git a/syncapi/syncapi.go b/syncapi/syncapi.go index f1f82722..ed8118bf 100644 --- a/syncapi/syncapi.go +++ b/syncapi/syncapi.go @@ -67,18 +67,18 @@ func AddPublicRoutes( userAPIStreamEventProducer := &producers.UserAPIStreamEventProducer{ JetStream: js, - Topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputStreamEvent), + Topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputStreamEvent), } userAPIReadUpdateProducer := &producers.UserAPIReadProducer{ JetStream: js, - Topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputReadUpdate), + Topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputReadUpdate), } _ = userAPIReadUpdateProducer keyChangeConsumer := consumers.NewOutputKeyChangeEventConsumer( - process, cfg, cfg.Matrix.JetStream.TopicFor(jetstream.OutputKeyChangeEvent), + process, cfg, cfg.Matrix.JetStream.Prefixed(jetstream.OutputKeyChangeEvent), js, keyAPI, rsAPI, syncDB, notifier, streams.DeviceListStreamProvider, ) diff --git a/userapi/consumers/syncapi_readupdate.go b/userapi/consumers/syncapi_readupdate.go index 2e58020b..067f9333 100644 --- a/userapi/consumers/syncapi_readupdate.go +++ b/userapi/consumers/syncapi_readupdate.go @@ -47,7 +47,7 @@ func NewOutputReadUpdateConsumer( db: store, ServerName: cfg.Matrix.ServerName, durable: cfg.Matrix.JetStream.Durable("UserAPISyncAPIReadUpdateConsumer"), - topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputReadUpdate), + topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputReadUpdate), pgClient: pgClient, userAPI: userAPI, syncProducer: syncProducer, diff --git a/userapi/consumers/syncapi_streamevent.go b/userapi/consumers/syncapi_streamevent.go index 11081327..da3cd393 100644 --- a/userapi/consumers/syncapi_streamevent.go +++ b/userapi/consumers/syncapi_streamevent.go @@ -54,7 +54,7 @@ func NewOutputStreamEventConsumer( jetstream: js, db: store, durable: cfg.Matrix.JetStream.Durable("UserAPISyncAPIStreamEventConsumer"), - topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputStreamEvent), + topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputStreamEvent), pgClient: pgClient, userAPI: userAPI, rsAPI: rsAPI, diff --git a/userapi/userapi.go b/userapi/userapi.go index 97bdf7b2..e91ce3a7 100644 --- a/userapi/userapi.go +++ b/userapi/userapi.go @@ -54,8 +54,8 @@ func NewInternalAPI( // it's handled by clientapi, and hence uses its topic. When user // API handles it for all account data, we can remove it from // here. - cfg.Matrix.JetStream.TopicFor(jetstream.OutputClientData), - cfg.Matrix.JetStream.TopicFor(jetstream.OutputNotificationData), + cfg.Matrix.JetStream.Prefixed(jetstream.OutputClientData), + cfg.Matrix.JetStream.Prefixed(jetstream.OutputNotificationData), ) userAPI := &internal.UserInternalAPI{