diff --git a/federationapi/federationapi.go b/federationapi/federationapi.go index ff159bee..97bcc12a 100644 --- a/federationapi/federationapi.go +++ b/federationapi/federationapi.go @@ -63,6 +63,7 @@ func AddPublicRoutes( TopicSendToDeviceEvent: cfg.Matrix.JetStream.Prefixed(jetstream.OutputSendToDeviceEvent), TopicTypingEvent: cfg.Matrix.JetStream.Prefixed(jetstream.OutputTypingEvent), TopicPresenceEvent: cfg.Matrix.JetStream.Prefixed(jetstream.OutputPresenceEvent), + TopicDeviceListUpdate: cfg.Matrix.JetStream.Prefixed(jetstream.InputDeviceListUpdate), ServerName: cfg.Matrix.ServerName, UserAPI: userAPI, } diff --git a/federationapi/producers/syncapi.go b/federationapi/producers/syncapi.go index 49415003..6453d026 100644 --- a/federationapi/producers/syncapi.go +++ b/federationapi/producers/syncapi.go @@ -17,6 +17,7 @@ package producers import ( "context" "encoding/json" + "fmt" "strconv" "time" @@ -34,6 +35,7 @@ type SyncAPIProducer struct { TopicSendToDeviceEvent string TopicTypingEvent string TopicPresenceEvent string + TopicDeviceListUpdate string JetStream nats.JetStreamContext ServerName gomatrixserverlib.ServerName UserAPI userapi.UserInternalAPI @@ -161,3 +163,18 @@ func (p *SyncAPIProducer) SendPresence( _, err := p.JetStream.PublishMsg(m, nats.Context(ctx)) return err } + +func (p *SyncAPIProducer) SendDeviceListUpdate( + ctx context.Context, deviceListUpdate *gomatrixserverlib.DeviceListUpdateEvent, +) (err error) { + m := nats.NewMsg(p.TopicDeviceListUpdate) + m.Header.Set(jetstream.UserID, deviceListUpdate.UserID) + m.Data, err = json.Marshal(deviceListUpdate) + if err != nil { + return fmt.Errorf("json.Marshal: %w", err) + } + + log.Debugf("Sending device list update: %+v", m.Header) + _, err = p.JetStream.PublishMsg(m, nats.Context(ctx)) + return err +} diff --git a/federationapi/routing/send.go b/federationapi/routing/send.go index c25dabce..43003be3 100644 --- a/federationapi/routing/send.go +++ b/federationapi/routing/send.go @@ -501,11 +501,7 @@ func (t *txnReq) processDeviceListUpdate(ctx context.Context, e gomatrixserverli } else if serverName != t.Origin { return } - var inputRes keyapi.InputDeviceListUpdateResponse - t.keyAPI.InputDeviceListUpdate(context.Background(), &keyapi.InputDeviceListUpdateRequest{ - Event: payload, - }, &inputRes) - if inputRes.Error != nil { - util.GetLogger(ctx).WithError(inputRes.Error).WithField("user_id", payload.UserID).Error("failed to InputDeviceListUpdate") + if err := t.producer.SendDeviceListUpdate(ctx, &payload); err != nil { + util.GetLogger(ctx).WithError(err).WithField("user_id", payload.UserID).Error("failed to InputDeviceListUpdate") } } diff --git a/keyserver/api/api.go b/keyserver/api/api.go index 140f0356..c0a1eedb 100644 --- a/keyserver/api/api.go +++ b/keyserver/api/api.go @@ -62,8 +62,6 @@ type FederationKeyAPI interface { QueryKeys(ctx context.Context, req *QueryKeysRequest, res *QueryKeysResponse) QuerySignatures(ctx context.Context, req *QuerySignaturesRequest, res *QuerySignaturesResponse) QueryDeviceMessages(ctx context.Context, req *QueryDeviceMessagesRequest, res *QueryDeviceMessagesResponse) - // InputDeviceListUpdate from a federated server EDU - InputDeviceListUpdate(ctx context.Context, req *InputDeviceListUpdateRequest, res *InputDeviceListUpdateResponse) PerformUploadDeviceKeys(ctx context.Context, req *PerformUploadDeviceKeysRequest, res *PerformUploadDeviceKeysResponse) PerformClaimKeys(ctx context.Context, req *PerformClaimKeysRequest, res *PerformClaimKeysResponse) } @@ -337,11 +335,3 @@ type QuerySignaturesResponse struct { // The request error, if any Error *KeyError } - -type InputDeviceListUpdateRequest struct { - Event gomatrixserverlib.DeviceListUpdateEvent -} - -type InputDeviceListUpdateResponse struct { - Error *KeyError -} diff --git a/keyserver/consumers/devicelistupdate.go b/keyserver/consumers/devicelistupdate.go new file mode 100644 index 00000000..f4f24628 --- /dev/null +++ b/keyserver/consumers/devicelistupdate.go @@ -0,0 +1,82 @@ +// Copyright 2022 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package consumers + +import ( + "context" + "encoding/json" + + "github.com/matrix-org/dendrite/keyserver/internal" + "github.com/matrix-org/dendrite/setup/config" + "github.com/matrix-org/dendrite/setup/jetstream" + "github.com/matrix-org/dendrite/setup/process" + "github.com/matrix-org/gomatrixserverlib" + "github.com/nats-io/nats.go" + "github.com/sirupsen/logrus" +) + +// DeviceListUpdateConsumer consumes device list updates that came in over federation. +type DeviceListUpdateConsumer struct { + ctx context.Context + jetstream nats.JetStreamContext + durable string + topic string + updater *internal.DeviceListUpdater +} + +// NewDeviceListUpdateConsumer creates a new DeviceListConsumer. Call Start() to begin consuming from key servers. +func NewDeviceListUpdateConsumer( + process *process.ProcessContext, + cfg *config.KeyServer, + js nats.JetStreamContext, + updater *internal.DeviceListUpdater, +) *DeviceListUpdateConsumer { + return &DeviceListUpdateConsumer{ + ctx: process.Context(), + jetstream: js, + durable: cfg.Matrix.JetStream.Prefixed("KeyServerInputDeviceListConsumer"), + topic: cfg.Matrix.JetStream.Prefixed(jetstream.InputDeviceListUpdate), + updater: updater, + } +} + +// Start consuming from key servers +func (t *DeviceListUpdateConsumer) Start() error { + return jetstream.JetStreamConsumer( + t.ctx, t.jetstream, t.topic, t.durable, t.onMessage, + nats.DeliverAll(), nats.ManualAck(), + ) +} + +// onMessage is called in response to a message received on the +// key change events topic from the key server. +func (t *DeviceListUpdateConsumer) onMessage(ctx context.Context, msg *nats.Msg) bool { + var m gomatrixserverlib.DeviceListUpdateEvent + if err := json.Unmarshal(msg.Data, &m); err != nil { + logrus.WithError(err).Errorf("Failed to read from device list update input topic") + return true + } + err := t.updater.Update(ctx, m) + if err != nil { + logrus.WithFields(logrus.Fields{ + "user_id": m.UserID, + "device_id": m.DeviceID, + "stream_id": m.StreamID, + "prev_id": m.PrevID, + }).WithError(err).Errorf("Failed to update device list") + return false + } + return true +} diff --git a/keyserver/internal/internal.go b/keyserver/internal/internal.go index f8d0d69c..c146b2aa 100644 --- a/keyserver/internal/internal.go +++ b/keyserver/internal/internal.go @@ -47,17 +47,6 @@ func (a *KeyInternalAPI) SetUserAPI(i userapi.KeyserverUserAPI) { a.UserAPI = i } -func (a *KeyInternalAPI) InputDeviceListUpdate( - ctx context.Context, req *api.InputDeviceListUpdateRequest, res *api.InputDeviceListUpdateResponse, -) { - err := a.Updater.Update(ctx, req.Event) - if err != nil { - res.Error = &api.KeyError{ - Err: fmt.Sprintf("failed to update device list: %s", err), - } - } -} - func (a *KeyInternalAPI) QueryKeyChanges(ctx context.Context, req *api.QueryKeyChangesRequest, res *api.QueryKeyChangesResponse) { userIDs, latest, err := a.DB.KeyChanges(ctx, req.Offset, req.ToOffset) if err != nil { diff --git a/keyserver/inthttp/client.go b/keyserver/inthttp/client.go index abce8158..dac61d1e 100644 --- a/keyserver/inthttp/client.go +++ b/keyserver/inthttp/client.go @@ -63,20 +63,6 @@ type httpKeyInternalAPI struct { func (h *httpKeyInternalAPI) SetUserAPI(i userapi.KeyserverUserAPI) { // no-op: doesn't need it } -func (h *httpKeyInternalAPI) InputDeviceListUpdate( - ctx context.Context, req *api.InputDeviceListUpdateRequest, res *api.InputDeviceListUpdateResponse, -) { - span, ctx := opentracing.StartSpanFromContext(ctx, "InputDeviceListUpdate") - defer span.Finish() - - apiURL := h.apiURL + InputDeviceListUpdatePath - err := httputil.PostJSON(ctx, span, h.httpClient, apiURL, req, res) - if err != nil { - res.Error = &api.KeyError{ - Err: err.Error(), - } - } -} func (h *httpKeyInternalAPI) PerformClaimKeys( ctx context.Context, diff --git a/keyserver/inthttp/server.go b/keyserver/inthttp/server.go index 8d557a76..5bf5976a 100644 --- a/keyserver/inthttp/server.go +++ b/keyserver/inthttp/server.go @@ -25,17 +25,6 @@ import ( ) func AddRoutes(internalAPIMux *mux.Router, s api.KeyInternalAPI) { - internalAPIMux.Handle(InputDeviceListUpdatePath, - httputil.MakeInternalAPI("inputDeviceListUpdate", func(req *http.Request) util.JSONResponse { - request := api.InputDeviceListUpdateRequest{} - response := api.InputDeviceListUpdateResponse{} - if err := json.NewDecoder(req.Body).Decode(&request); err != nil { - return util.MessageResponse(http.StatusBadRequest, err.Error()) - } - s.InputDeviceListUpdate(req.Context(), &request, &response) - return util.JSONResponse{Code: http.StatusOK, JSON: &response} - }), - ) internalAPIMux.Handle(PerformClaimKeysPath, httputil.MakeInternalAPI("performClaimKeys", func(req *http.Request) util.JSONResponse { request := api.PerformClaimKeysRequest{} diff --git a/keyserver/keyserver.go b/keyserver/keyserver.go index 3ffd3ba1..cd506f98 100644 --- a/keyserver/keyserver.go +++ b/keyserver/keyserver.go @@ -18,6 +18,7 @@ import ( "github.com/gorilla/mux" fedsenderapi "github.com/matrix-org/dendrite/federationapi/api" "github.com/matrix-org/dendrite/keyserver/api" + "github.com/matrix-org/dendrite/keyserver/consumers" "github.com/matrix-org/dendrite/keyserver/internal" "github.com/matrix-org/dendrite/keyserver/inthttp" "github.com/matrix-org/dendrite/keyserver/producers" @@ -59,10 +60,17 @@ func NewInternalAPI( updater := internal.NewDeviceListUpdater(db, ap, keyChangeProducer, fedClient, 8) // 8 workers TODO: configurable ap.Updater = updater go func() { - if err := updater.Start(); err != nil { + if err = updater.Start(); err != nil { logrus.WithError(err).Panicf("failed to start device list updater") } }() + dlConsumer := consumers.NewDeviceListUpdateConsumer( + base.ProcessContext, cfg, js, updater, + ) + if err = dlConsumer.Start(); err != nil { + logrus.WithError(err).Panic("failed to start device list consumer") + } + return ap } diff --git a/setup/jetstream/streams.go b/setup/jetstream/streams.go index 6594e694..110808b1 100644 --- a/setup/jetstream/streams.go +++ b/setup/jetstream/streams.go @@ -16,6 +16,7 @@ const ( var ( InputRoomEvent = "InputRoomEvent" + InputDeviceListUpdate = "InputDeviceListUpdate" OutputRoomEvent = "OutputRoomEvent" OutputSendToDeviceEvent = "OutputSendToDeviceEvent" OutputKeyChangeEvent = "OutputKeyChangeEvent" @@ -45,6 +46,11 @@ var streams = []*nats.StreamConfig{ Retention: nats.InterestPolicy, Storage: nats.FileStorage, }, + { + Name: InputDeviceListUpdate, + Retention: nats.InterestPolicy, + Storage: nats.FileStorage, + }, { Name: OutputRoomEvent, Retention: nats.InterestPolicy, diff --git a/syncapi/internal/keychange_test.go b/syncapi/internal/keychange_test.go index d9fb9cf8..219b35e2 100644 --- a/syncapi/internal/keychange_test.go +++ b/syncapi/internal/keychange_test.go @@ -43,9 +43,6 @@ func (k *mockKeyAPI) QueryOneTimeKeys(ctx context.Context, req *keyapi.QueryOneT } func (k *mockKeyAPI) QueryDeviceMessages(ctx context.Context, req *keyapi.QueryDeviceMessagesRequest, res *keyapi.QueryDeviceMessagesResponse) { -} -func (k *mockKeyAPI) InputDeviceListUpdate(ctx context.Context, req *keyapi.InputDeviceListUpdateRequest, res *keyapi.InputDeviceListUpdateResponse) { - } func (k *mockKeyAPI) QuerySignatures(ctx context.Context, req *keyapi.QuerySignaturesRequest, res *keyapi.QuerySignaturesResponse) { }