diff --git a/appservice/appservice.go b/appservice/appservice.go index 924a609e..7e7c67f5 100644 --- a/appservice/appservice.go +++ b/appservice/appservice.go @@ -58,7 +58,7 @@ func NewInternalAPI( }, }, } - js, _, _ := jetstream.Prepare(&base.Cfg.Global.JetStream) + js := jetstream.Prepare(&base.Cfg.Global.JetStream) // Create a connection to the appservice postgres DB appserviceDB, err := storage.NewDatabase(&base.Cfg.AppServiceAPI.Database) diff --git a/clientapi/clientapi.go b/clientapi/clientapi.go index 7c772125..d678ada9 100644 --- a/clientapi/clientapi.go +++ b/clientapi/clientapi.go @@ -49,7 +49,7 @@ func AddPublicRoutes( extRoomsProvider api.ExtraPublicRoomsProvider, mscCfg *config.MSCs, ) { - js, _, _ := jetstream.Prepare(&cfg.Matrix.JetStream) + js := jetstream.Prepare(&cfg.Matrix.JetStream) syncProducer := &producers.SyncAPIProducer{ JetStream: js, diff --git a/eduserver/eduserver.go b/eduserver/eduserver.go index db03001b..febcf286 100644 --- a/eduserver/eduserver.go +++ b/eduserver/eduserver.go @@ -42,7 +42,7 @@ func NewInternalAPI( ) api.EDUServerInputAPI { cfg := &base.Cfg.EDUServer - js, _, _ := jetstream.Prepare(&cfg.Matrix.JetStream) + js := jetstream.Prepare(&cfg.Matrix.JetStream) return &input.EDUServerInputAPI{ Cache: eduCache, diff --git a/federationapi/consumers/keychange.go b/federationapi/consumers/keychange.go index 6a737d0a..1ec9f4c1 100644 --- a/federationapi/consumers/keychange.go +++ b/federationapi/consumers/keychange.go @@ -17,80 +17,73 @@ package consumers import ( "context" "encoding/json" - "fmt" - "github.com/Shopify/sarama" eduserverAPI "github.com/matrix-org/dendrite/eduserver/api" "github.com/matrix-org/dendrite/federationapi/queue" "github.com/matrix-org/dendrite/federationapi/storage" - "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/keyserver/api" roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/setup/jetstream" "github.com/matrix-org/dendrite/setup/process" "github.com/matrix-org/gomatrixserverlib" + "github.com/nats-io/nats.go" "github.com/sirupsen/logrus" ) // KeyChangeConsumer consumes events that originate in key server. type KeyChangeConsumer struct { ctx context.Context - consumer *internal.ContinualConsumer + jetstream nats.JetStreamContext + durable string db storage.Database queues *queue.OutgoingQueues serverName gomatrixserverlib.ServerName rsAPI roomserverAPI.RoomserverInternalAPI + topic string } // NewKeyChangeConsumer creates a new KeyChangeConsumer. Call Start() to begin consuming from key servers. func NewKeyChangeConsumer( process *process.ProcessContext, cfg *config.KeyServer, - kafkaConsumer sarama.Consumer, + js nats.JetStreamContext, queues *queue.OutgoingQueues, store storage.Database, rsAPI roomserverAPI.RoomserverInternalAPI, ) *KeyChangeConsumer { - c := &KeyChangeConsumer{ - ctx: process.Context(), - consumer: &internal.ContinualConsumer{ - Process: process, - ComponentName: "federationapi/keychange", - Topic: string(cfg.Matrix.JetStream.TopicFor(jetstream.OutputKeyChangeEvent)), - Consumer: kafkaConsumer, - PartitionStore: store, - }, + return &KeyChangeConsumer{ + ctx: process.Context(), + jetstream: js, + durable: cfg.Matrix.JetStream.TopicFor("FederationAPIKeyChangeConsumer"), + topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputKeyChangeEvent), queues: queues, db: store, serverName: cfg.Matrix.ServerName, rsAPI: rsAPI, } - c.consumer.ProcessMessage = c.onMessage - - return c } // Start consuming from key servers func (t *KeyChangeConsumer) Start() error { - if err := t.consumer.Start(); err != nil { - return fmt.Errorf("t.consumer.Start: %w", err) - } - return nil + return jetstream.JetStreamConsumer( + t.ctx, t.jetstream, t.topic, t.durable, t.onMessage, + nats.DeliverAll(), nats.ManualAck(), + ) } // onMessage is called in response to a message received on the // key change events topic from the key server. -func (t *KeyChangeConsumer) onMessage(msg *sarama.ConsumerMessage) error { +func (t *KeyChangeConsumer) onMessage(ctx context.Context, msg *nats.Msg) bool { var m api.DeviceMessage - if err := json.Unmarshal(msg.Value, &m); err != nil { + if err := json.Unmarshal(msg.Data, &m); err != nil { logrus.WithError(err).Errorf("failed to read device message from key change topic") - return nil + return true } if m.DeviceKeys == nil && m.OutputCrossSigningKeyUpdate == nil { // This probably shouldn't happen but stops us from panicking if we come // across an update that doesn't satisfy either types. - return nil + return true } switch m.Type { case api.TypeCrossSigningUpdate: @@ -102,9 +95,9 @@ func (t *KeyChangeConsumer) onMessage(msg *sarama.ConsumerMessage) error { } } -func (t *KeyChangeConsumer) onDeviceKeyMessage(m api.DeviceMessage) error { +func (t *KeyChangeConsumer) onDeviceKeyMessage(m api.DeviceMessage) bool { if m.DeviceKeys == nil { - return nil + return true } logger := logrus.WithField("user_id", m.UserID) @@ -112,10 +105,10 @@ func (t *KeyChangeConsumer) onDeviceKeyMessage(m api.DeviceMessage) error { _, originServerName, err := gomatrixserverlib.SplitID('@', m.UserID) if err != nil { logger.WithError(err).Error("Failed to extract domain from key change event") - return nil + return true } if originServerName != t.serverName { - return nil + return true } var queryRes roomserverAPI.QueryRoomsForUserResponse @@ -125,13 +118,13 @@ func (t *KeyChangeConsumer) onDeviceKeyMessage(m api.DeviceMessage) error { }, &queryRes) if err != nil { logger.WithError(err).Error("failed to calculate joined rooms for user") - return nil + return true } // send this key change to all servers who share rooms with this user. destinations, err := t.db.GetJoinedHostsForRooms(t.ctx, queryRes.RoomIDs, true) if err != nil { logger.WithError(err).Error("failed to calculate joined hosts for rooms user is in") - return nil + return true } // Pack the EDU and marshal it @@ -149,24 +142,26 @@ func (t *KeyChangeConsumer) onDeviceKeyMessage(m api.DeviceMessage) error { Keys: m.KeyJSON, } if edu.Content, err = json.Marshal(event); err != nil { - return err + logger.WithError(err).Error("failed to marshal EDU JSON") + return true } - logrus.Infof("Sending device list update message to %q", destinations) - return t.queues.SendEDU(edu, t.serverName, destinations) + logger.Infof("Sending device list update message to %q", destinations) + err = t.queues.SendEDU(edu, t.serverName, destinations) + return err == nil } -func (t *KeyChangeConsumer) onCrossSigningMessage(m api.DeviceMessage) error { +func (t *KeyChangeConsumer) onCrossSigningMessage(m api.DeviceMessage) bool { output := m.CrossSigningKeyUpdate _, host, err := gomatrixserverlib.SplitID('@', output.UserID) if err != nil { logrus.WithError(err).Errorf("fedsender key change consumer: user ID parse failure") - return nil + return true } if host != gomatrixserverlib.ServerName(t.serverName) { // Ignore any messages that didn't originate locally, otherwise we'll // end up parroting information we received from other servers. - return nil + return true } logger := logrus.WithField("user_id", output.UserID) @@ -177,13 +172,13 @@ func (t *KeyChangeConsumer) onCrossSigningMessage(m api.DeviceMessage) error { }, &queryRes) if err != nil { logger.WithError(err).Error("fedsender key change consumer: failed to calculate joined rooms for user") - return nil + return true } // send this key change to all servers who share rooms with this user. destinations, err := t.db.GetJoinedHostsForRooms(t.ctx, queryRes.RoomIDs, true) if err != nil { logger.WithError(err).Error("fedsender key change consumer: failed to calculate joined hosts for rooms user is in") - return nil + return true } // Pack the EDU and marshal it @@ -193,11 +188,12 @@ func (t *KeyChangeConsumer) onCrossSigningMessage(m api.DeviceMessage) error { } if edu.Content, err = json.Marshal(output); err != nil { logger.WithError(err).Error("fedsender key change consumer: failed to marshal output, dropping") - return nil + return true } logger.Infof("Sending cross-signing update message to %q", destinations) - return t.queues.SendEDU(edu, t.serverName, destinations) + err = t.queues.SendEDU(edu, t.serverName, destinations) + return err == nil } func prevID(streamID int) []int { diff --git a/federationapi/consumers/roomserver.go b/federationapi/consumers/roomserver.go index ac29f930..e9862000 100644 --- a/federationapi/consumers/roomserver.go +++ b/federationapi/consumers/roomserver.go @@ -114,6 +114,11 @@ func (s *OutputRoomEventConsumer) onMessage(ctx context.Context, msg *nats.Msg) } } + case api.OutputTypeNewInviteEvent: + log.WithField("type", output.Type).Debug( + "received new invite, send device keys", + ) + case api.OutputTypeNewInboundPeek: if err := s.processInboundPeek(*output.NewInboundPeek); err != nil { log.WithFields(log.Fields{ diff --git a/federationapi/federationapi.go b/federationapi/federationapi.go index 63387b9d..a982d800 100644 --- a/federationapi/federationapi.go +++ b/federationapi/federationapi.go @@ -92,7 +92,7 @@ func NewInternalAPI( FailuresUntilBlacklist: cfg.FederationMaxRetries, } - js, consumer, _ := jetstream.Prepare(&cfg.Matrix.JetStream) + js := jetstream.Prepare(&cfg.Matrix.JetStream) queues := queue.NewOutgoingQueues( federationDB, base.ProcessContext, @@ -120,7 +120,7 @@ func NewInternalAPI( logrus.WithError(err).Panic("failed to start typing server consumer") } keyConsumer := consumers.NewKeyChangeConsumer( - base.ProcessContext, &base.Cfg.KeyServer, consumer, queues, federationDB, rsAPI, + base.ProcessContext, &base.Cfg.KeyServer, js, queues, federationDB, rsAPI, ) if err := keyConsumer.Start(); err != nil { logrus.WithError(err).Panic("failed to start key server consumer") diff --git a/federationapi/storage/interface.go b/federationapi/storage/interface.go index 21a919f6..3fa8d1f7 100644 --- a/federationapi/storage/interface.go +++ b/federationapi/storage/interface.go @@ -19,12 +19,10 @@ import ( "github.com/matrix-org/dendrite/federationapi/storage/shared" "github.com/matrix-org/dendrite/federationapi/types" - "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/gomatrixserverlib" ) type Database interface { - internal.PartitionStorer gomatrixserverlib.KeyDatabase UpdateRoom(ctx context.Context, roomID, oldEventID, newEventID string, addHosts []types.JoinedHost, removeHosts []string) (joinedHosts []types.JoinedHost, err error) diff --git a/go.mod b/go.mod index bd94713a..fc18ce07 100644 --- a/go.mod +++ b/go.mod @@ -11,13 +11,12 @@ require ( github.com/HdrHistogram/hdrhistogram-go v1.1.2 // indirect github.com/MFAshby/stdemuxerhook v1.0.0 github.com/Masterminds/semver/v3 v3.1.1 - github.com/S7evinK/saramajetstream v0.0.0-20210709110708-de6efc8c4a32 - github.com/Shopify/sarama v1.31.0 github.com/cespare/xxhash/v2 v2.1.2 // indirect github.com/codeclysm/extract v2.2.0+incompatible github.com/containerd/containerd v1.5.9 // indirect github.com/docker/docker v20.10.12+incompatible github.com/docker/go-connections v0.4.0 + github.com/frankban/quicktest v1.14.0 // indirect github.com/getsentry/sentry-go v0.12.0 github.com/gologme/log v1.3.0 github.com/gorilla/mux v1.8.0 @@ -74,6 +73,7 @@ require ( golang.org/x/term v0.0.0-20210927222741-03fcf44c2211 gopkg.in/h2non/bimg.v1 v1.1.5 gopkg.in/yaml.v2 v2.4.0 + gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect nhooyr.io/websocket v1.8.7 ) diff --git a/go.sum b/go.sum index 038ccef5..3f8a99f4 100644 --- a/go.sum +++ b/go.sum @@ -100,17 +100,8 @@ github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578/go.mod h1:uGdko github.com/RoaringBitmap/roaring v0.4.7/go.mod h1:8khRDP4HmeXns4xIj9oGrKSz7XTQiJx2zgh7AcNke4w= github.com/RyanCarrier/dijkstra v1.0.0/go.mod h1:5agGUBNEtUAGIANmbw09fuO3a2htPEkc1jNH01qxCWA= github.com/RyanCarrier/dijkstra-1 v0.0.0-20170512020943-0e5801a26345/go.mod h1:OK4EvWJ441LQqGzed5NGB6vKBAE34n3z7iayPcEwr30= -github.com/S7evinK/saramajetstream v0.0.0-20210709110708-de6efc8c4a32 h1:i3fOph9Hjleo6LbuqN9ODFxnwt7mOtYMpCGeC8qJN50= -github.com/S7evinK/saramajetstream v0.0.0-20210709110708-de6efc8c4a32/go.mod h1:ne+jkLlzafIzaE4Q0Ze81T27dNgXe1wxovVEoAtSHTc= github.com/Shopify/goreferrer v0.0.0-20181106222321-ec9c9a553398/go.mod h1:a1uqRtAwp2Xwc6WNPJEufxJ7fx3npB4UV/JOLmbu5I0= github.com/Shopify/logrus-bugsnag v0.0.0-20171204204709-577dee27f20d/go.mod h1:HI8ITrYtUY+O+ZhtlqUnD8+KwNPOyugEhfP9fdUIaEQ= -github.com/Shopify/sarama v1.29.0/go.mod h1:2QpgD79wpdAESqNQMxNc0KYMkycd4slxGdV3TWSVqrU= -github.com/Shopify/sarama v1.31.0 h1:gObk7jCPutDxf+E6GA5G21noAZsi1SvP9ftCQYqpzus= -github.com/Shopify/sarama v1.31.0/go.mod h1:BeW3gXRc/CxgAsrSly2RE9nIXUfC9ezb7QHBPVhvzjI= -github.com/Shopify/toxiproxy v2.1.4+incompatible h1:TKdv8HiTLgE5wdJuEML90aBgNWsokNbMijUGhmcoBJc= -github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI= -github.com/Shopify/toxiproxy/v2 v2.3.0 h1:62YkpiP4bzdhKMH+6uC5E95y608k3zDwdzuBMsnn3uQ= -github.com/Shopify/toxiproxy/v2 v2.3.0/go.mod h1:KvQTtB6RjCJY4zqNJn7C7JDFgsG5uoHYDirfUfpIm0c= github.com/VividCortex/ewma v1.1.1/go.mod h1:2Tkkvm3sRDVXaiyucHiACn4cqf7DpdyLvmxzcbUokwA= github.com/VividCortex/ewma v1.2.0/go.mod h1:nz4BbCtbLyFDeC9SUHbtcT5644juEuWfUAUnGx7j5l4= github.com/aead/siphash v1.0.1/go.mod h1:Nywa3cDsYNNK3gaciGTWPwHt0wlpNV15vwmswBAUSII= @@ -353,12 +344,6 @@ github.com/docopt/docopt-go v0.0.0-20180111231733-ee0de3bc6815/go.mod h1:WwZ+bS3 github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= github.com/dustin/go-humanize v0.0.0-20180421182945-02af3965c54e/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= -github.com/eapache/go-resiliency v1.2.0 h1:v7g92e/KSN71Rq7vSThKaWIq68fL4YHvWyiUKorFR1Q= -github.com/eapache/go-resiliency v1.2.0/go.mod h1:kFI+JgMyC7bLPUVY133qvEBtVayf5mFgVsvEsIPBvNs= -github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 h1:YEetp8/yCZMuEPMUDHG0CW/brkkEp8mzqk2+ODEitlw= -github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU= -github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc= -github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I= github.com/eknkc/amber v0.0.0-20171010120322-cdade1c07385/go.mod h1:0vRUJqYpeSZifjYj7uP3BG/gKcuzL9xWVV/Y+cK33KM= github.com/elazarl/goproxy v0.0.0-20180725130230-947c36da3153/go.mod h1:/Zj4wYkgs4iZTTu3o/KG3Itv/qCCa8VVMlb3i9OVuzc= github.com/emicklei/go-restful v0.0.0-20170410110728-ff4f55a20633/go.mod h1:otzb+WCGbkyDHkqmQmT5YD2WR4BBwUdeQoFo8l/7tVs= @@ -379,8 +364,6 @@ github.com/flynn/noise v0.0.0-20180327030543-2492fe189ae6 h1:u/UEqS66A5ckRmS4yNp github.com/flynn/noise v0.0.0-20180327030543-2492fe189ae6/go.mod h1:1i71OnUq3iUe1ma7Lr6yG6/rjvM3emb6yoL7xLFzcVQ= github.com/fogleman/gg v1.2.1-0.20190220221249-0403632d5b90/go.mod h1:R/bRT+9gY/C5z7JzPU0zXsXHKM4/ayA+zqcVNZzPa1k= github.com/form3tech-oss/jwt-go v3.2.2+incompatible/go.mod h1:pbq4aXjuKjdthFRnoDwaVPLA+WlJuPGy+QneDUgJi2k= -github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw= -github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g= github.com/francoispqt/gojay v1.2.13/go.mod h1:ehT5mTG4ua4581f1++1WLG0vPdaA9HaiDsoyrBGkyDY= github.com/frankban/quicktest v1.0.0/go.mod h1:R98jIehRai+d1/3Hv2//jOVCTJhW1VBavT6B6CuGq2k= github.com/frankban/quicktest v1.7.2/go.mod h1:jaStnuzAqU1AJdCO0l53JDCJrVDKcS03DbaAcR7Ks/o= @@ -498,8 +481,6 @@ github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= -github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= -github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/gologme/log v1.2.0/go.mod h1:gq31gQ8wEHkR+WekdWsqDuf8pXTUZA9BnnzTuPz1Y9U= github.com/gologme/log v1.3.0 h1:l781G4dE+pbigClDSDzSaaYKtiueHCILUa/qSDsmHAo= github.com/gologme/log v1.3.0/go.mod h1:yKT+DvIPdDdDoPtqFrFxheooyVmoqi0BAsw+erN3wA4= @@ -552,8 +533,6 @@ github.com/gorilla/handlers v0.0.0-20150720190736-60c7bfde3e33/go.mod h1:Qkdc/uu github.com/gorilla/mux v1.7.2/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs= github.com/gorilla/mux v1.8.0 h1:i40aqfkR1h2SlN9hojwV5ZA91wcXFOvkdNIeFDP5koI= github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So= -github.com/gorilla/securecookie v1.1.1/go.mod h1:ra0sb63/xPlUeL+yeDciTfxMRAA+MP+HVt/4epWDjd4= -github.com/gorilla/sessions v1.2.1/go.mod h1:dk2InVEVJ0sfLlnXv9EAgkf6ecYs/i80K/zI+bUmuGM= github.com/gorilla/websocket v0.0.0-20170926233335-4201258b820c/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ= github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ= github.com/gorilla/websocket v1.4.1/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= @@ -580,8 +559,6 @@ github.com/hashicorp/go-multierror v1.0.0/go.mod h1:dHtQlpGsu+cZNNAkkCN/P3hoUDHh github.com/hashicorp/go-multierror v1.1.0 h1:B9UzwGQJehnUY1yNrnwREHc3fGbC2xefo8g4TbElacI= github.com/hashicorp/go-multierror v1.1.0/go.mod h1:spPvp8C1qA32ftKqdAHm4hHTbPw+vmowP0z+KUhOZdA= github.com/hashicorp/go-syslog v1.0.0/go.mod h1:qPfqrKkXGihmCqbJM2mZgkZGvKG1dFdvsLplgctolz4= -github.com/hashicorp/go-uuid v1.0.2 h1:cfejS+Tpcp13yd5nYHWDI6qVCny6wyX2Mt5SGur2IGE= -github.com/hashicorp/go-uuid v1.0.2/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= github.com/hashicorp/go-version v1.2.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA= github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= @@ -667,18 +644,6 @@ github.com/jbenet/goprocess v0.0.0-20160826012719-b497e2f366b8/go.mod h1:Ly/wlsj github.com/jbenet/goprocess v0.1.3/go.mod h1:5yspPrukOVuOLORacaBi858NqyClJPQxYZlqdZVfqY4= github.com/jbenet/goprocess v0.1.4 h1:DRGOFReOMqqDNXwW70QkacFW0YN9QnwLV0Vqk+3oU0o= github.com/jbenet/goprocess v0.1.4/go.mod h1:5yspPrukOVuOLORacaBi858NqyClJPQxYZlqdZVfqY4= -github.com/jcmturner/aescts/v2 v2.0.0 h1:9YKLH6ey7H4eDBXW8khjYslgyqG2xZikXP0EQFKrle8= -github.com/jcmturner/aescts/v2 v2.0.0/go.mod h1:AiaICIRyfYg35RUkr8yESTqvSy7csK90qZ5xfvvsoNs= -github.com/jcmturner/dnsutils/v2 v2.0.0 h1:lltnkeZGL0wILNvrNiVCR6Ro5PGU/SeBvVO/8c/iPbo= -github.com/jcmturner/dnsutils/v2 v2.0.0/go.mod h1:b0TnjGOvI/n42bZa+hmXL+kFJZsFT7G4t3HTlQ184QM= -github.com/jcmturner/gofork v1.0.0 h1:J7uCkflzTEhUZ64xqKnkDxq3kzc96ajM1Gli5ktUem8= -github.com/jcmturner/gofork v1.0.0/go.mod h1:MK8+TM0La+2rjBD4jE12Kj1pCCxK7d2LK/UM3ncEo0o= -github.com/jcmturner/goidentity/v6 v6.0.1 h1:VKnZd2oEIMorCTsFBnJWbExfNN7yZr3EhJAxwOkZg6o= -github.com/jcmturner/goidentity/v6 v6.0.1/go.mod h1:X1YW3bgtvwAXju7V3LCIMpY0Gbxyjn/mY9zx4tFonSg= -github.com/jcmturner/gokrb5/v8 v8.4.2 h1:6ZIM6b/JJN0X8UM43ZOM6Z4SJzla+a/u7scXFJzodkA= -github.com/jcmturner/gokrb5/v8 v8.4.2/go.mod h1:sb+Xq/fTY5yktf/VxLsE3wlfPqQjp0aWNYyvBVK62bc= -github.com/jcmturner/rpc/v2 v2.0.3 h1:7FXXj8Ti1IaVFpSAziCZWNzbNuZmnvw/i6CqLNdWfZY= -github.com/jcmturner/rpc/v2 v2.0.3/go.mod h1:VUJYCIDm3PVOEHw8sgt091/20OJjskO/YJki3ELg/Hc= github.com/jellevandenhooff/dkim v0.0.0-20150330215556-f50fe3d243e1/go.mod h1:E0B/fFc00Y+Rasa88328GlI/XbtyysCtTHZS8h7IrBU= github.com/jessevdk/go-flags v0.0.0-20141203071132-1679536dcc89/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI= github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI= @@ -747,10 +712,7 @@ github.com/klauspost/compress v1.9.7/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0 github.com/klauspost/compress v1.10.3/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs= github.com/klauspost/compress v1.11.3/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs= github.com/klauspost/compress v1.11.13/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs= -github.com/klauspost/compress v1.12.2/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg= -github.com/klauspost/compress v1.12.3/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg= github.com/klauspost/compress v1.13.4/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg= -github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= github.com/klauspost/compress v1.14.2 h1:S0OHlFk/Gbon/yauFJ4FfJJF5V0fc5HbBTJazi28pRw= github.com/klauspost/compress v1.14.2/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= github.com/klauspost/cpuid v1.2.1/go.mod h1:Pj4uuM528wm8OyEC2QMXAi2YiTZ96dNQPGgoMS4s3ek= @@ -1250,9 +1212,6 @@ github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/9 github.com/pelletier/go-toml v1.8.1/go.mod h1:T2/BmBdy8dvIRq1a/8aqjN41wvWlN4lrapLU/GW4pbc= github.com/peterbourgon/diskv v2.0.1+incompatible/go.mod h1:uqqh8zWWbv1HBMNONnaR/tNboyR3/BZd58JJSHlUSCU= github.com/philhofer/fwd v1.0.0/go.mod h1:gk3iGcWd9+svBvR0sR+KPcfE+RNWozjowpeBVG3ZVNU= -github.com/pierrec/lz4 v2.6.0+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= -github.com/pierrec/lz4 v2.6.1+incompatible h1:9UY3+iC23yxF0UfGaYrGplQ+79Rg+h/q9FV9ix19jjM= -github.com/pierrec/lz4 v2.6.1+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= github.com/pingcap/errors v0.11.4 h1:lFuQV/oaUMGcD2tqt+01ROSmJs75VG1ToEOkZIZ4nE4= github.com/pingcap/errors v0.11.4/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= @@ -1305,8 +1264,6 @@ github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1 github.com/prometheus/procfs v0.7.3 h1:4jVXhlkAyzOScmCkXBTOLRLTz8EeU+eyjrwB/EPq0VU= github.com/prometheus/procfs v0.7.3/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA= github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU= -github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 h1:N/ElC8H3+5XpJzTSTfLsJV/mx9Q9g7kxmchpfZyxgzM= -github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= github.com/rivo/uniseg v0.1.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg= @@ -1432,7 +1389,6 @@ github.com/urfave/cli v0.0.0-20171014202726-7bc6a0acffa5/go.mod h1:70zkFmudgCuE/ github.com/urfave/cli v1.20.0/go.mod h1:70zkFmudgCuE/ngEzBv17Jvp/497gISqfk5gWijbERA= github.com/urfave/cli v1.22.1/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0= github.com/urfave/cli v1.22.2/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0= -github.com/urfave/cli/v2 v2.3.0/go.mod h1:LJmUH05zAU44vOAcrfzZQKsZbVcdbOG8rtL3/XcUArI= github.com/urfave/negroni v1.0.0/go.mod h1:Meg73S6kFm/4PpbYdq35yYWoCZ9mS/YSx+lKnmiohz4= github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= github.com/valyala/fasthttp v1.6.0/go.mod h1:FstJa9V+Pj9vQ7OJie2qMHdwemEDaDiSdBnvPM1Su9w= @@ -1463,11 +1419,6 @@ github.com/willf/bitset v1.1.9/go.mod h1:RjeCKbqT1RxIR/KWY6phxZiaY1IyutSBfGjNPyS github.com/willf/bitset v1.1.11-0.20200630133818-d5bec3311243/go.mod h1:RjeCKbqT1RxIR/KWY6phxZiaY1IyutSBfGjNPySAYV4= github.com/willf/bitset v1.1.11/go.mod h1:83CECat5yLh5zVOf4P1ErAgKA5UDvKtgyUABdr3+MjI= github.com/x-cray/logrus-prefixed-formatter v0.5.2/go.mod h1:2duySbKsL6M18s5GU7VPsoEPHyzalCE06qoARUCeBBE= -github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI= -github.com/xdg-go/scram v1.0.2/go.mod h1:1WAq6h33pAW+iRreB34OORO2Nf7qel3VV3fjBj+hCSs= -github.com/xdg-go/stringprep v1.0.2/go.mod h1:8F9zXuvzgwmyT5DUm4GUfZGDdT3W+LCvS6+da4O5kxM= -github.com/xdg/scram v1.0.3/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I= -github.com/xdg/stringprep v1.0.3/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y= github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f/go.mod h1:N2zxlSyiKSe5eX1tZViRH5QA0qijqEDrYZiPEAiq3wU= github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415/go.mod h1:GwrjFmJcFw6At/Gs6z4yjiIwzuJ1/+UwLxMQDVQXShQ= github.com/xeipuuv/gojsonschema v0.0.0-20180618132009-1d523034197f/go.mod h1:5yf86TLmAcydyeJq5YvxkGPE2fm/u4myDekKRoLuqhs= @@ -1550,7 +1501,6 @@ golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPh golang.org/x/crypto v0.0.0-20200728195943-123391ffb6de/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20200820211705-5c72a883971a/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= -golang.org/x/crypto v0.0.0-20201112155050-0c6587e931a9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20210220033148-5ea612d1eb83/go.mod h1:jdWPYTVW3xRLrWPugEBEK3UY2ZEsg3UU495nc5E+M+I= golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4= golang.org/x/crypto v0.0.0-20210322153248-0c34fe9e7dc2/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4= @@ -1655,7 +1605,6 @@ golang.org/x/net v0.0.0-20201110031124-69a78807bb2b/go.mod h1:sp8m0HH+o8qH0wwXwY golang.org/x/net v0.0.0-20201224014010-6772e930b67b/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM= -golang.org/x/net v0.0.0-20210427231257-85d9c07bbe3a/go.mod h1:OJAsFXCWl8Ukc7SiCT/9KSuxbyM7479/AVlXFRxuMCk= golang.org/x/net v0.0.0-20210428140749-89ef3d95e781/go.mod h1:OJAsFXCWl8Ukc7SiCT/9KSuxbyM7479/AVlXFRxuMCk= golang.org/x/net v0.0.0-20210510120150-4163338589ed/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20210525063256-abc453219eb5/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= @@ -1664,7 +1613,6 @@ golang.org/x/net v0.0.0-20210805182204-aaa1db679c0d/go.mod h1:9nx3DQGgdP8bBQD5qx golang.org/x/net v0.0.0-20210927181540-4e4d966f7476/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20211008194852-3b03d305991f/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= -golang.org/x/net v0.0.0-20220105145211-5b0dc2dfae98/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd h1:O7DYs+zxREGLKzKoMQrtrEacpb0ZVXA5rIwylE2Xchk= golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= @@ -1798,7 +1746,6 @@ golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3 golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.4/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= -golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7-0.20210503195748-5c7c50ebbd4f/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= golang.org/x/text v0.3.7 h1:olpwvP2KacW1ZWvsR7uQhoyTYvKAupfQrRGBFM352Gk= @@ -2034,7 +1981,6 @@ gopkg.in/yaml.v2 v2.0.0-20170712054546-1be3d31502d6/go.mod h1:JAlM8MvJe8wmxCU4Bl gopkg.in/yaml.v2 v2.0.0-20170812160011-eb3733d160e7/go.mod h1:JAlM8MvJe8wmxCU4Bli9HhUf9+ttbYbLASfIpnQbh74= gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= -gopkg.in/yaml.v2 v2.2.3/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.5/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= diff --git a/internal/consumers.go b/internal/consumers.go deleted file mode 100644 index 3a4e0b7f..00000000 --- a/internal/consumers.go +++ /dev/null @@ -1,139 +0,0 @@ -// Copyright 2017 Vector Creations Ltd -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package internal - -import ( - "context" - "fmt" - - "github.com/Shopify/sarama" - "github.com/matrix-org/dendrite/internal/sqlutil" - "github.com/matrix-org/dendrite/setup/process" - "github.com/sirupsen/logrus" -) - -// A PartitionStorer has the storage APIs needed by the consumer. -type PartitionStorer interface { - // PartitionOffsets returns the offsets the consumer has reached for each partition. - PartitionOffsets(ctx context.Context, topic string) ([]sqlutil.PartitionOffset, error) - // SetPartitionOffset records where the consumer has reached for a partition. - SetPartitionOffset(ctx context.Context, topic string, partition int32, offset int64) error -} - -// A ContinualConsumer continually consumes logs even across restarts. It requires a PartitionStorer to -// remember the offset it reached. -type ContinualConsumer struct { - // The parent context for the listener, stop consuming when this context is done - Process *process.ProcessContext - // The component name - ComponentName string - // The kafkaesque topic to consume events from. - // This is the name used in kafka to identify the stream to consume events from. - Topic string - // A kafkaesque stream consumer providing the APIs for talking to the event source. - // The interface is taken from a client library for Apache Kafka. - // But any equivalent event streaming protocol could be made to implement the same interface. - Consumer sarama.Consumer - // A thing which can load and save partition offsets for a topic. - PartitionStore PartitionStorer - // ProcessMessage is a function which will be called for each message in the log. Return an error to - // stop processing messages. See ErrShutdown for specific control signals. - ProcessMessage func(msg *sarama.ConsumerMessage) error - // ShutdownCallback is called when ProcessMessage returns ErrShutdown, after the partition has been saved. - // It is optional. - ShutdownCallback func() -} - -// ErrShutdown can be returned from ContinualConsumer.ProcessMessage to stop the ContinualConsumer. -var ErrShutdown = fmt.Errorf("shutdown") - -// Start starts the consumer consuming. -// Starts up a goroutine for each partition in the kafka stream. -// Returns nil once all the goroutines are started. -// Returns an error if it can't start consuming for any of the partitions. -func (c *ContinualConsumer) Start() error { - _, err := c.StartOffsets() - return err -} - -// StartOffsets is the same as Start but returns the loaded offsets as well. -func (c *ContinualConsumer) StartOffsets() ([]sqlutil.PartitionOffset, error) { - offsets := map[int32]int64{} - - partitions, err := c.Consumer.Partitions(c.Topic) - if err != nil { - return nil, err - } - for _, partition := range partitions { - // Default all the offsets to the beginning of the stream. - offsets[partition] = sarama.OffsetOldest - } - - storedOffsets, err := c.PartitionStore.PartitionOffsets(context.TODO(), c.Topic) - if err != nil { - return nil, err - } - for _, offset := range storedOffsets { - // We've already processed events from this partition so advance the offset to where we got to. - // ConsumePartition will start streaming from the message with the given offset (inclusive), - // so increment 1 to avoid getting the same message a second time. - offsets[offset.Partition] = 1 + offset.Offset - } - - var partitionConsumers []sarama.PartitionConsumer - for partition, offset := range offsets { - pc, err := c.Consumer.ConsumePartition(c.Topic, partition, offset) - if err != nil { - for _, p := range partitionConsumers { - p.Close() // nolint: errcheck - } - return nil, err - } - partitionConsumers = append(partitionConsumers, pc) - } - for _, pc := range partitionConsumers { - go c.consumePartition(pc) - if c.Process != nil { - c.Process.ComponentStarted() - go func(pc sarama.PartitionConsumer) { - <-c.Process.WaitForShutdown() - _ = pc.Close() - c.Process.ComponentFinished() - logrus.Infof("Stopped consumer for %q topic %q", c.ComponentName, c.Topic) - }(pc) - } - } - - return storedOffsets, nil -} - -// consumePartition consumes the room events for a single partition of the kafkaesque stream. -func (c *ContinualConsumer) consumePartition(pc sarama.PartitionConsumer) { - defer pc.Close() // nolint: errcheck - for message := range pc.Messages() { - msgErr := c.ProcessMessage(message) - // Advance our position in the stream so that we will start at the right position after a restart. - if err := c.PartitionStore.SetPartitionOffset(context.TODO(), c.Topic, message.Partition, message.Offset); err != nil { - panic(fmt.Errorf("the ContinualConsumer in %q failed to SetPartitionOffset: %w", c.ComponentName, err)) - } - // Shutdown if we were told to do so. - if msgErr == ErrShutdown { - if c.ShutdownCallback != nil { - c.ShutdownCallback() - } - return - } - } -} diff --git a/keyserver/api/api.go b/keyserver/api/api.go index 0eea2f0f..3933961c 100644 --- a/keyserver/api/api.go +++ b/keyserver/api/api.go @@ -228,7 +228,7 @@ type QueryKeyChangesRequest struct { // The offset of the last received key event, or sarama.OffsetOldest if this is from the beginning Offset int64 // The inclusive offset where to track key changes up to. Messages with this offset are included in the response. - // Use sarama.OffsetNewest if the offset is unknown (then check the response Offset to avoid racing). + // Use types.OffsetNewest if the offset is unknown (then check the response Offset to avoid racing). ToOffset int64 } diff --git a/keyserver/consumers/cross_signing.go b/keyserver/consumers/cross_signing.go index 4b2bd4a9..a533006f 100644 --- a/keyserver/consumers/cross_signing.go +++ b/keyserver/consumers/cross_signing.go @@ -18,29 +18,30 @@ import ( "context" "encoding/json" - "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/keyserver/api" "github.com/matrix-org/dendrite/keyserver/storage" "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/setup/jetstream" "github.com/matrix-org/dendrite/setup/process" "github.com/matrix-org/gomatrixserverlib" + "github.com/nats-io/nats.go" "github.com/sirupsen/logrus" - - "github.com/Shopify/sarama" ) type OutputCrossSigningKeyUpdateConsumer struct { - eduServerConsumer *internal.ContinualConsumer - keyDB storage.Database - keyAPI api.KeyInternalAPI - serverName string + ctx context.Context + keyDB storage.Database + keyAPI api.KeyInternalAPI + serverName string + jetstream nats.JetStreamContext + durable string + topic string } func NewOutputCrossSigningKeyUpdateConsumer( process *process.ProcessContext, cfg *config.Dendrite, - kafkaConsumer sarama.Consumer, + js nats.JetStreamContext, keyDB storage.Database, keyAPI api.KeyInternalAPI, ) *OutputCrossSigningKeyUpdateConsumer { @@ -48,60 +49,58 @@ func NewOutputCrossSigningKeyUpdateConsumer( // topic. We will only produce events where the UserID matches our server name, // and we will only consume events where the UserID does NOT match our server // name (because the update came from a remote server). - consumer := internal.ContinualConsumer{ - Process: process, - ComponentName: "keyserver/keyserver", - Topic: cfg.Global.JetStream.TopicFor(jetstream.OutputKeyChangeEvent), - Consumer: kafkaConsumer, - PartitionStore: keyDB, - } s := &OutputCrossSigningKeyUpdateConsumer{ - eduServerConsumer: &consumer, - keyDB: keyDB, - keyAPI: keyAPI, - serverName: string(cfg.Global.ServerName), + ctx: process.Context(), + keyDB: keyDB, + jetstream: js, + durable: cfg.Global.JetStream.Durable("KeyServerCrossSigningConsumer"), + topic: cfg.Global.JetStream.TopicFor(jetstream.OutputKeyChangeEvent), + keyAPI: keyAPI, + serverName: string(cfg.Global.ServerName), } - consumer.ProcessMessage = s.onMessage return s } func (s *OutputCrossSigningKeyUpdateConsumer) Start() error { - return s.eduServerConsumer.Start() + return jetstream.JetStreamConsumer( + s.ctx, s.jetstream, s.topic, s.durable, s.onMessage, + nats.DeliverAll(), nats.ManualAck(), + ) } // onMessage is called in response to a message received on the // key change events topic from the key server. -func (t *OutputCrossSigningKeyUpdateConsumer) onMessage(msg *sarama.ConsumerMessage) error { +func (t *OutputCrossSigningKeyUpdateConsumer) onMessage(ctx context.Context, msg *nats.Msg) bool { var m api.DeviceMessage - if err := json.Unmarshal(msg.Value, &m); err != nil { + if err := json.Unmarshal(msg.Data, &m); err != nil { logrus.WithError(err).Errorf("failed to read device message from key change topic") - return nil + return true } if m.OutputCrossSigningKeyUpdate == nil { // This probably shouldn't happen but stops us from panicking if we come // across an update that doesn't satisfy either types. - return nil + return true } switch m.Type { case api.TypeCrossSigningUpdate: return t.onCrossSigningMessage(m) default: - return nil + return true } } -func (s *OutputCrossSigningKeyUpdateConsumer) onCrossSigningMessage(m api.DeviceMessage) error { +func (s *OutputCrossSigningKeyUpdateConsumer) onCrossSigningMessage(m api.DeviceMessage) bool { output := m.CrossSigningKeyUpdate _, host, err := gomatrixserverlib.SplitID('@', output.UserID) if err != nil { logrus.WithError(err).Errorf("eduserver output log: user ID parse failure") - return nil + return true } if host == gomatrixserverlib.ServerName(s.serverName) { // Ignore any messages that contain information about our own users, as // they already originated from this server. - return nil + return true } uploadReq := &api.PerformUploadDeviceKeysRequest{ UserID: output.UserID, @@ -114,5 +113,8 @@ func (s *OutputCrossSigningKeyUpdateConsumer) onCrossSigningMessage(m api.Device } uploadRes := &api.PerformUploadDeviceKeysResponse{} s.keyAPI.PerformUploadDeviceKeys(context.TODO(), uploadReq, uploadRes) - return uploadRes.Error + if uploadRes.Error != nil { + return false + } + return true } diff --git a/keyserver/keyserver.go b/keyserver/keyserver.go index 8cc50ea0..61ccc030 100644 --- a/keyserver/keyserver.go +++ b/keyserver/keyserver.go @@ -40,7 +40,7 @@ func AddInternalRoutes(router *mux.Router, intAPI api.KeyInternalAPI) { func NewInternalAPI( base *base.BaseDendrite, cfg *config.KeyServer, fedClient fedsenderapi.FederationClient, ) api.KeyInternalAPI { - js, consumer, _ := jetstream.Prepare(&cfg.Matrix.JetStream) + js := jetstream.Prepare(&cfg.Matrix.JetStream) db, err := storage.NewDatabase(&cfg.Database) if err != nil { @@ -66,7 +66,7 @@ func NewInternalAPI( }() keyconsumer := consumers.NewOutputCrossSigningKeyUpdateConsumer( - base.ProcessContext, base.Cfg, consumer, db, ap, + base.ProcessContext, base.Cfg, js, db, ap, ) if err := keyconsumer.Start(); err != nil { logrus.WithError(err).Panicf("failed to start keyserver EDU server consumer") diff --git a/keyserver/storage/interface.go b/keyserver/storage/interface.go index 87feae47..0110860e 100644 --- a/keyserver/storage/interface.go +++ b/keyserver/storage/interface.go @@ -18,15 +18,12 @@ import ( "context" "encoding/json" - "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/keyserver/api" "github.com/matrix-org/dendrite/keyserver/types" "github.com/matrix-org/gomatrixserverlib" ) type Database interface { - internal.PartitionStorer - // ExistingOneTimeKeys returns a map of keyIDWithAlgorithm to key JSON for the given parameters. If no keys exist with this combination // of user/device/key/algorithm 4-uple then it is omitted from the map. Returns an error when failing to communicate with the database. ExistingOneTimeKeys(ctx context.Context, userID, deviceID string, keyIDsWithAlgorithms []string) (map[string]json.RawMessage, error) @@ -71,7 +68,7 @@ type Database interface { StoreKeyChange(ctx context.Context, userID string) (int64, error) // KeyChanges returns a list of user IDs who have modified their keys from the offset given (exclusive) to the offset given (inclusive). - // A to offset of sarama.OffsetNewest means no upper limit. + // A to offset of types.OffsetNewest means no upper limit. // Returns the offset of the latest key change. KeyChanges(ctx context.Context, fromOffset, toOffset int64) (userIDs []string, latestOffset int64, err error) diff --git a/keyserver/storage/postgres/key_changes_table.go b/keyserver/storage/postgres/key_changes_table.go index 20d227c2..f93a94bd 100644 --- a/keyserver/storage/postgres/key_changes_table.go +++ b/keyserver/storage/postgres/key_changes_table.go @@ -17,9 +17,7 @@ package postgres import ( "context" "database/sql" - "math" - "github.com/Shopify/sarama" "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/keyserver/storage/tables" ) @@ -78,9 +76,6 @@ func (s *keyChangesStatements) InsertKeyChange(ctx context.Context, userID strin func (s *keyChangesStatements) SelectKeyChanges( ctx context.Context, fromOffset, toOffset int64, ) (userIDs []string, latestOffset int64, err error) { - if toOffset == sarama.OffsetNewest { - toOffset = math.MaxInt64 - } latestOffset = fromOffset rows, err := s.selectKeyChangesStmt.QueryContext(ctx, fromOffset, toOffset) if err != nil { diff --git a/keyserver/storage/sqlite3/key_changes_table.go b/keyserver/storage/sqlite3/key_changes_table.go index d43c15ca..e035e8c9 100644 --- a/keyserver/storage/sqlite3/key_changes_table.go +++ b/keyserver/storage/sqlite3/key_changes_table.go @@ -17,9 +17,7 @@ package sqlite3 import ( "context" "database/sql" - "math" - "github.com/Shopify/sarama" "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/keyserver/storage/tables" ) @@ -76,9 +74,6 @@ func (s *keyChangesStatements) InsertKeyChange(ctx context.Context, userID strin func (s *keyChangesStatements) SelectKeyChanges( ctx context.Context, fromOffset, toOffset int64, ) (userIDs []string, latestOffset int64, err error) { - if toOffset == sarama.OffsetNewest { - toOffset = math.MaxInt64 - } latestOffset = fromOffset rows, err := s.selectKeyChangesStmt.QueryContext(ctx, fromOffset, toOffset) if err != nil { diff --git a/keyserver/storage/storage_test.go b/keyserver/storage/storage_test.go index 2f8cf809..c4c99d8c 100644 --- a/keyserver/storage/storage_test.go +++ b/keyserver/storage/storage_test.go @@ -9,8 +9,8 @@ import ( "reflect" "testing" - "github.com/Shopify/sarama" "github.com/matrix-org/dendrite/keyserver/api" + "github.com/matrix-org/dendrite/keyserver/types" "github.com/matrix-org/dendrite/setup/config" ) @@ -50,7 +50,7 @@ func TestKeyChanges(t *testing.T) { MustNotError(t, err) deviceChangeIDC, err := db.StoreKeyChange(ctx, "@charlie:localhost") MustNotError(t, err) - userIDs, latest, err := db.KeyChanges(ctx, deviceChangeIDB, sarama.OffsetNewest) + userIDs, latest, err := db.KeyChanges(ctx, deviceChangeIDB, types.OffsetNewest) if err != nil { t.Fatalf("Failed to KeyChanges: %s", err) } @@ -74,7 +74,7 @@ func TestKeyChangesNoDupes(t *testing.T) { } deviceChangeID, err := db.StoreKeyChange(ctx, "@alice:localhost") MustNotError(t, err) - userIDs, latest, err := db.KeyChanges(ctx, 0, sarama.OffsetNewest) + userIDs, latest, err := db.KeyChanges(ctx, 0, types.OffsetNewest) if err != nil { t.Fatalf("Failed to KeyChanges: %s", err) } diff --git a/keyserver/storage/tables/interface.go b/keyserver/storage/tables/interface.go index 0d94c94c..e44757e1 100644 --- a/keyserver/storage/tables/interface.go +++ b/keyserver/storage/tables/interface.go @@ -46,7 +46,7 @@ type DeviceKeys interface { type KeyChanges interface { InsertKeyChange(ctx context.Context, userID string) (int64, error) // SelectKeyChanges returns the set (de-duplicated) of users who have changed their keys between the two offsets. - // Results are exclusive of fromOffset and inclusive of toOffset. A toOffset of sarama.OffsetNewest means no upper offset. + // Results are exclusive of fromOffset and inclusive of toOffset. A toOffset of types.OffsetNewest means no upper offset. SelectKeyChanges(ctx context.Context, fromOffset, toOffset int64) (userIDs []string, latestOffset int64, err error) Prepare() error diff --git a/keyserver/types/storage.go b/keyserver/types/storage.go index 3480ec65..7fb90454 100644 --- a/keyserver/types/storage.go +++ b/keyserver/types/storage.go @@ -14,7 +14,18 @@ package types -import "github.com/matrix-org/gomatrixserverlib" +import ( + "math" + + "github.com/matrix-org/gomatrixserverlib" +) + +const ( + // OffsetNewest tells e.g. the database to get the most current data + OffsetNewest int64 = math.MaxInt64 + // OffsetOldest tells e.g. the database to get the oldest data + OffsetOldest int64 = 0 +) // KeyTypePurposeToInt maps a purpose to an integer, which is used in the // database to reduce the amount of space taken up by this column. diff --git a/roomserver/roomserver.go b/roomserver/roomserver.go index 669957be..e1b84b80 100644 --- a/roomserver/roomserver.go +++ b/roomserver/roomserver.go @@ -50,7 +50,7 @@ func NewInternalAPI( logrus.WithError(err).Panicf("failed to connect to room server db") } - js, _, _ := jetstream.Prepare(&cfg.Matrix.JetStream) + js := jetstream.Prepare(&cfg.Matrix.JetStream) return internal.NewRoomserverAPI( cfg, roomserverDB, js, diff --git a/setup/jetstream/nats.go b/setup/jetstream/nats.go index 5d7937b5..77ad2b72 100644 --- a/setup/jetstream/nats.go +++ b/setup/jetstream/nats.go @@ -5,20 +5,17 @@ import ( "sync" "time" - "github.com/Shopify/sarama" "github.com/matrix-org/dendrite/setup/config" "github.com/sirupsen/logrus" - saramajs "github.com/S7evinK/saramajetstream" natsserver "github.com/nats-io/nats-server/v2/server" - "github.com/nats-io/nats.go" natsclient "github.com/nats-io/nats.go" ) var natsServer *natsserver.Server var natsServerMutex sync.Mutex -func Prepare(cfg *config.JetStream) (nats.JetStreamContext, sarama.Consumer, sarama.SyncProducer) { +func Prepare(cfg *config.JetStream) natsclient.JetStreamContext { // check if we need an in-process NATS Server if len(cfg.Addresses) != 0 { return setupNATS(cfg, nil) @@ -52,20 +49,20 @@ func Prepare(cfg *config.JetStream) (nats.JetStreamContext, sarama.Consumer, sar return setupNATS(cfg, nc) } -func setupNATS(cfg *config.JetStream, nc *natsclient.Conn) (nats.JetStreamContext, sarama.Consumer, sarama.SyncProducer) { +func setupNATS(cfg *config.JetStream, nc *natsclient.Conn) natsclient.JetStreamContext { if nc == nil { var err error - nc, err = nats.Connect(strings.Join(cfg.Addresses, ",")) + nc, err = natsclient.Connect(strings.Join(cfg.Addresses, ",")) if err != nil { logrus.WithError(err).Panic("Unable to connect to NATS") - return nil, nil, nil + return nil } } s, err := nc.JetStream() if err != nil { logrus.WithError(err).Panic("Unable to get JetStream context") - return nil, nil, nil + return nil } for _, stream := range streams { // streams are defined in streams.go @@ -80,7 +77,7 @@ func setupNATS(cfg *config.JetStream, nc *natsclient.Conn) (nats.JetStreamContex // If we're trying to keep everything in memory (e.g. unit tests) // then overwrite the storage policy. if cfg.InMemory { - stream.Storage = nats.MemoryStorage + stream.Storage = natsclient.MemoryStorage } // Namespace the streams without modifying the original streams @@ -93,7 +90,5 @@ func setupNATS(cfg *config.JetStream, nc *natsclient.Conn) (nats.JetStreamContex } } - consumer := saramajs.NewJetStreamConsumer(nc, s, "") - producer := saramajs.NewJetStreamProducer(nc, s, "") - return s, consumer, producer + return s } diff --git a/syncapi/consumers/keychange.go b/syncapi/consumers/keychange.go index 97685cc0..e806f76e 100644 --- a/syncapi/consumers/keychange.go +++ b/syncapi/consumers/keychange.go @@ -18,84 +18,81 @@ import ( "context" "encoding/json" - "github.com/Shopify/sarama" "github.com/getsentry/sentry-go" - "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/keyserver/api" roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" + "github.com/matrix-org/dendrite/setup/config" + "github.com/matrix-org/dendrite/setup/jetstream" "github.com/matrix-org/dendrite/setup/process" "github.com/matrix-org/dendrite/syncapi/notifier" "github.com/matrix-org/dendrite/syncapi/storage" "github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/gomatrixserverlib" + "github.com/nats-io/nats.go" "github.com/sirupsen/logrus" ) // OutputKeyChangeEventConsumer consumes events that originated in the key server. type OutputKeyChangeEventConsumer struct { - ctx context.Context - keyChangeConsumer *internal.ContinualConsumer - db storage.Database - notifier *notifier.Notifier - stream types.StreamProvider - serverName gomatrixserverlib.ServerName // our server name - rsAPI roomserverAPI.RoomserverInternalAPI - keyAPI api.KeyInternalAPI + ctx context.Context + jetstream nats.JetStreamContext + durable string + topic string + db storage.Database + notifier *notifier.Notifier + stream types.StreamProvider + serverName gomatrixserverlib.ServerName // our server name + rsAPI roomserverAPI.RoomserverInternalAPI + keyAPI api.KeyInternalAPI } // NewOutputKeyChangeEventConsumer creates a new OutputKeyChangeEventConsumer. // Call Start() to begin consuming from the key server. func NewOutputKeyChangeEventConsumer( process *process.ProcessContext, - serverName gomatrixserverlib.ServerName, + cfg *config.SyncAPI, topic string, - kafkaConsumer sarama.Consumer, + js nats.JetStreamContext, keyAPI api.KeyInternalAPI, rsAPI roomserverAPI.RoomserverInternalAPI, store storage.Database, notifier *notifier.Notifier, stream types.StreamProvider, ) *OutputKeyChangeEventConsumer { - - consumer := internal.ContinualConsumer{ - Process: process, - ComponentName: "syncapi/keychange", - Topic: topic, - Consumer: kafkaConsumer, - PartitionStore: store, - } - s := &OutputKeyChangeEventConsumer{ - ctx: process.Context(), - keyChangeConsumer: &consumer, - db: store, - serverName: serverName, - keyAPI: keyAPI, - rsAPI: rsAPI, - notifier: notifier, - stream: stream, + ctx: process.Context(), + jetstream: js, + durable: cfg.Matrix.JetStream.Durable("SyncAPIKeyChangeConsumer"), + topic: topic, + db: store, + serverName: cfg.Matrix.ServerName, + keyAPI: keyAPI, + rsAPI: rsAPI, + notifier: notifier, + stream: stream, } - consumer.ProcessMessage = s.onMessage - return s } // Start consuming from the key server func (s *OutputKeyChangeEventConsumer) Start() error { - return s.keyChangeConsumer.Start() + return jetstream.JetStreamConsumer( + s.ctx, s.jetstream, s.topic, s.durable, s.onMessage, + nats.DeliverAll(), nats.ManualAck(), + ) } -func (s *OutputKeyChangeEventConsumer) onMessage(msg *sarama.ConsumerMessage) error { +func (s *OutputKeyChangeEventConsumer) onMessage(ctx context.Context, msg *nats.Msg) bool { var m api.DeviceMessage - if err := json.Unmarshal(msg.Value, &m); err != nil { + if err := json.Unmarshal(msg.Data, &m); err != nil { logrus.WithError(err).Errorf("failed to read device message from key change topic") - return nil + return true } if m.DeviceKeys == nil && m.OutputCrossSigningKeyUpdate == nil { // This probably shouldn't happen but stops us from panicking if we come // across an update that doesn't satisfy either types. - return nil + return true } switch m.Type { case api.TypeCrossSigningUpdate: @@ -107,9 +104,9 @@ func (s *OutputKeyChangeEventConsumer) onMessage(msg *sarama.ConsumerMessage) er } } -func (s *OutputKeyChangeEventConsumer) onDeviceKeyMessage(m api.DeviceMessage, deviceChangeID int64) error { +func (s *OutputKeyChangeEventConsumer) onDeviceKeyMessage(m api.DeviceMessage, deviceChangeID int64) bool { if m.DeviceKeys == nil { - return nil + return true } output := m.DeviceKeys // work out who we need to notify about the new key @@ -120,7 +117,7 @@ func (s *OutputKeyChangeEventConsumer) onDeviceKeyMessage(m api.DeviceMessage, d if err != nil { logrus.WithError(err).Error("syncapi: failed to QuerySharedUsers for key change event from key server") sentry.CaptureException(err) - return err + return true } // make sure we get our own key updates too! queryRes.UserIDsToCount[output.UserID] = 1 @@ -131,10 +128,10 @@ func (s *OutputKeyChangeEventConsumer) onDeviceKeyMessage(m api.DeviceMessage, d s.notifier.OnNewKeyChange(types.StreamingToken{DeviceListPosition: posUpdate}, userID, output.UserID) } - return nil + return true } -func (s *OutputKeyChangeEventConsumer) onCrossSigningMessage(m api.DeviceMessage, deviceChangeID int64) error { +func (s *OutputKeyChangeEventConsumer) onCrossSigningMessage(m api.DeviceMessage, deviceChangeID int64) bool { output := m.CrossSigningKeyUpdate // work out who we need to notify about the new key var queryRes roomserverAPI.QuerySharedUsersResponse @@ -144,7 +141,7 @@ func (s *OutputKeyChangeEventConsumer) onCrossSigningMessage(m api.DeviceMessage if err != nil { logrus.WithError(err).Error("syncapi: failed to QuerySharedUsers for key change event from key server") sentry.CaptureException(err) - return err + return true } // make sure we get our own key updates too! queryRes.UserIDsToCount[output.UserID] = 1 @@ -155,5 +152,5 @@ func (s *OutputKeyChangeEventConsumer) onCrossSigningMessage(m api.DeviceMessage s.notifier.OnNewKeyChange(types.StreamingToken{DeviceListPosition: posUpdate}, userID, output.UserID) } - return nil + return true } diff --git a/syncapi/internal/keychange.go b/syncapi/internal/keychange.go index 41efd4a0..fa1064b7 100644 --- a/syncapi/internal/keychange.go +++ b/syncapi/internal/keychange.go @@ -18,8 +18,8 @@ import ( "context" "strings" - "github.com/Shopify/sarama" keyapi "github.com/matrix-org/dendrite/keyserver/api" + keytypes "github.com/matrix-org/dendrite/keyserver/types" roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/gomatrixserverlib" @@ -64,8 +64,8 @@ func DeviceListCatchup( } // now also track users who we already share rooms with but who have updated their devices between the two tokens - offset := sarama.OffsetOldest - toOffset := sarama.OffsetNewest + offset := keytypes.OffsetOldest + toOffset := keytypes.OffsetNewest if to > 0 && to > from { toOffset = int64(to) } diff --git a/syncapi/storage/interface.go b/syncapi/storage/interface.go index 9cff4cad..b464ad9c 100644 --- a/syncapi/storage/interface.go +++ b/syncapi/storage/interface.go @@ -19,7 +19,6 @@ import ( eduAPI "github.com/matrix-org/dendrite/eduserver/api" - "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/syncapi/types" userapi "github.com/matrix-org/dendrite/userapi/api" @@ -27,8 +26,6 @@ import ( ) type Database interface { - internal.PartitionStorer - MaxStreamPositionForPDUs(ctx context.Context) (types.StreamPosition, error) MaxStreamPositionForReceipts(ctx context.Context) (types.StreamPosition, error) MaxStreamPositionForInvites(ctx context.Context) (types.StreamPosition, error) diff --git a/syncapi/syncapi.go b/syncapi/syncapi.go index 39bc233a..72462459 100644 --- a/syncapi/syncapi.go +++ b/syncapi/syncapi.go @@ -48,7 +48,7 @@ func AddPublicRoutes( federation *gomatrixserverlib.FederationClient, cfg *config.SyncAPI, ) { - js, consumer, _ := jetstream.Prepare(&cfg.Matrix.JetStream) + js := jetstream.Prepare(&cfg.Matrix.JetStream) syncDB, err := storage.NewSyncServerDatasource(&cfg.Database) if err != nil { @@ -65,8 +65,8 @@ func AddPublicRoutes( requestPool := sync.NewRequestPool(syncDB, cfg, userAPI, keyAPI, rsAPI, streams, notifier) keyChangeConsumer := consumers.NewOutputKeyChangeEventConsumer( - process, cfg.Matrix.ServerName, cfg.Matrix.JetStream.TopicFor(jetstream.OutputKeyChangeEvent), - consumer, keyAPI, rsAPI, syncDB, notifier, + process, cfg, cfg.Matrix.JetStream.TopicFor(jetstream.OutputKeyChangeEvent), + js, keyAPI, rsAPI, syncDB, notifier, streams.DeviceListStreamProvider, ) if err = keyChangeConsumer.Start(); err != nil {