diff --git a/roomserver/internal/api.go b/roomserver/internal/api.go index fd963ad8..e58f11c1 100644 --- a/roomserver/internal/api.go +++ b/roomserver/internal/api.go @@ -14,6 +14,7 @@ import ( "github.com/matrix-org/dendrite/roomserver/internal/query" "github.com/matrix-org/dendrite/roomserver/storage" "github.com/matrix-org/dendrite/setup/config" + "github.com/matrix-org/dendrite/setup/process" "github.com/matrix-org/gomatrixserverlib" "github.com/nats-io/nats.go" "github.com/sirupsen/logrus" @@ -32,6 +33,7 @@ type RoomserverInternalAPI struct { *perform.Publisher *perform.Backfiller *perform.Forgetter + ProcessContext *process.ProcessContext DB storage.Database Cfg *config.RoomServer Cache caching.RoomServerCaches @@ -48,12 +50,13 @@ type RoomserverInternalAPI struct { } func NewRoomserverAPI( - cfg *config.RoomServer, roomserverDB storage.Database, consumer nats.JetStreamContext, - inputRoomEventTopic, outputRoomEventTopic string, caches caching.RoomServerCaches, - perspectiveServerNames []gomatrixserverlib.ServerName, + processCtx *process.ProcessContext, cfg *config.RoomServer, roomserverDB storage.Database, + consumer nats.JetStreamContext, inputRoomEventTopic, outputRoomEventTopic string, + caches caching.RoomServerCaches, perspectiveServerNames []gomatrixserverlib.ServerName, ) *RoomserverInternalAPI { serverACLs := acls.NewServerACLs(roomserverDB) a := &RoomserverInternalAPI{ + ProcessContext: processCtx, DB: roomserverDB, Cfg: cfg, Cache: caches, @@ -83,6 +86,7 @@ func (r *RoomserverInternalAPI) SetFederationAPI(fsAPI fsAPI.FederationInternalA r.KeyRing = keyRing r.Inputer = &input.Inputer{ + ProcessContext: r.ProcessContext, DB: r.DB, InputRoomEventTopic: r.InputRoomEventTopic, OutputRoomEventTopic: r.OutputRoomEventTopic, diff --git a/roomserver/internal/input/input.go b/roomserver/internal/input/input.go index 5bdec0a2..22e4b67a 100644 --- a/roomserver/internal/input/input.go +++ b/roomserver/internal/input/input.go @@ -31,6 +31,7 @@ import ( "github.com/matrix-org/dendrite/roomserver/internal/query" "github.com/matrix-org/dendrite/roomserver/storage" "github.com/matrix-org/dendrite/setup/jetstream" + "github.com/matrix-org/dendrite/setup/process" "github.com/matrix-org/gomatrixserverlib" "github.com/nats-io/nats.go" "github.com/prometheus/client_golang/prometheus" @@ -59,6 +60,7 @@ var keyContentFields = map[string]string{ } type Inputer struct { + ProcessContext *process.ProcessContext DB storage.Database JetStream nats.JetStreamContext Durable nats.SubOpt @@ -115,7 +117,7 @@ func (r *Inputer) Start() error { _ = msg.InProgress() // resets the acknowledgement wait timer defer eventsInProgress.Delete(index) defer roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Dec() - action, err := r.processRoomEventUsingUpdater(context.Background(), roomID, &inputRoomEvent) + action, err := r.processRoomEventUsingUpdater(r.ProcessContext.Context(), roomID, &inputRoomEvent) if err != nil { if !errors.Is(err, context.DeadlineExceeded) && !errors.Is(err, context.Canceled) { sentry.CaptureException(err) diff --git a/roomserver/roomserver.go b/roomserver/roomserver.go index e1b84b80..950c6b4e 100644 --- a/roomserver/roomserver.go +++ b/roomserver/roomserver.go @@ -53,7 +53,7 @@ func NewInternalAPI( js := jetstream.Prepare(&cfg.Matrix.JetStream) return internal.NewRoomserverAPI( - cfg, roomserverDB, js, + base.ProcessContext, cfg, roomserverDB, js, cfg.Matrix.JetStream.TopicFor(jetstream.InputRoomEvent), cfg.Matrix.JetStream.TopicFor(jetstream.OutputRoomEvent), base.Caches, perspectiveServerNames,