From 49dc49b232432d52c082646cc6f778593f4cb8b4 Mon Sep 17 00:00:00 2001 From: S7evinK <2353100+S7evinK@users.noreply.github.com> Date: Tue, 29 Mar 2022 14:14:35 +0200 Subject: [PATCH] Remove eduserver (#2306) * Move receipt sending to own JetStream producer * Move SendToDevice to producer * Remove most parts of the EDU server * Fix SendToDevice & copyrights * Move structs, cleanup EDU Server traces * Use HeadersOnly subscription * Missing file * Fix linter issues * Move consumers to own files * Rename durable consumer; Consumer cleanup * Docs/config cleanup --- build/docker/config/dendrite.yaml | 6 - build/docker/docker-compose.polylith.yml | 12 - build/gobind-pinecone/monolith.go | 7 - build/gobind-yggdrasil/monolith.go | 17 +- clientapi/clientapi.go | 13 +- clientapi/producers/syncapi.go | 124 ++++++++- clientapi/routing/account_data.go | 5 +- clientapi/routing/receipt.go | 11 +- clientapi/routing/routing.go | 12 +- clientapi/routing/sendtodevice.go | 10 +- clientapi/routing/sendtyping.go | 11 +- cmd/dendrite-demo-libp2p/main.go | 7 - cmd/dendrite-demo-pinecone/main.go | 7 - cmd/dendrite-demo-yggdrasil/main.go | 17 +- cmd/dendrite-monolith-server/main.go | 21 +- cmd/dendrite-polylith-multi/main.go | 1 - .../personalities/clientapi.go | 3 +- .../personalities/eduserver.go | 33 --- .../personalities/federationapi.go | 4 +- cmd/dendritejs-pinecone/main.go | 14 +- cmd/dendritejs/main.go | 4 - dendrite-config.yaml | 6 - docs/INSTALL.md | 8 - eduserver/api/input.go | 103 ------- eduserver/api/output.go | 57 ---- eduserver/api/types.go | 59 ---- eduserver/api/wrapper.go | 88 ------ eduserver/eduserver.go | 56 ---- eduserver/input/input.go | 198 -------------- eduserver/inthttp/client.go | 70 ----- eduserver/inthttp/server.go | 54 ---- federationapi/consumers/eduserver.go | 257 ------------------ federationapi/consumers/keychange.go | 4 +- federationapi/consumers/receipts.go | 141 ++++++++++ federationapi/consumers/sendtodevice.go | 125 +++++++++ federationapi/consumers/typing.go | 119 ++++++++ federationapi/federationapi.go | 41 ++- federationapi/federationapi_test.go | 2 +- federationapi/producers/syncapi.go | 144 ++++++++++ federationapi/routing/routing.go | 6 +- federationapi/routing/send.go | 39 +-- federationapi/routing/send_test.go | 40 --- federationapi/types/types.go | 15 + .../caching/cache_typing.go | 6 +- .../caching/cache_typing_test.go | 6 +- internal/test/config.go | 2 - keyserver/api/api.go | 18 +- keyserver/internal/cross_signing.go | 5 +- keyserver/producers/keychange.go | 5 +- setup/base/base.go | 11 - setup/config/config.go | 15 +- setup/config/config_eduserver.go | 17 -- setup/config/config_test.go | 4 - setup/jetstream/nats.go | 21 ++ setup/jetstream/streams.go | 5 +- setup/monolith.go | 18 +- syncapi/consumers/clientapi.go | 2 +- .../{eduserver_receipts.go => receipts.go} | 24 +- ...server_sendtodevice.go => sendtodevice.go} | 26 +- .../{eduserver_typing.go => typing.go} | 51 ++-- syncapi/storage/interface.go | 5 +- syncapi/storage/postgres/receipt_table.go | 7 +- syncapi/storage/shared/syncserver.go | 5 +- syncapi/storage/sqlite3/receipt_table.go | 7 +- syncapi/storage/tables/interface.go | 3 +- syncapi/streams/stream_receipt.go | 19 +- syncapi/streams/stream_typing.go | 4 +- syncapi/streams/streams.go | 4 +- syncapi/syncapi.go | 6 +- syncapi/types/types.go | 18 ++ 70 files changed, 931 insertions(+), 1354 deletions(-) delete mode 100644 cmd/dendrite-polylith-multi/personalities/eduserver.go delete mode 100644 eduserver/api/input.go delete mode 100644 eduserver/api/output.go delete mode 100644 eduserver/api/types.go delete mode 100644 eduserver/api/wrapper.go delete mode 100644 eduserver/eduserver.go delete mode 100644 eduserver/input/input.go delete mode 100644 eduserver/inthttp/client.go delete mode 100644 eduserver/inthttp/server.go delete mode 100644 federationapi/consumers/eduserver.go create mode 100644 federationapi/consumers/receipts.go create mode 100644 federationapi/consumers/sendtodevice.go create mode 100644 federationapi/consumers/typing.go create mode 100644 federationapi/producers/syncapi.go rename eduserver/cache/cache.go => internal/caching/cache_typing.go (97%) rename eduserver/cache/cache_test.go => internal/caching/cache_typing_test.go (97%) delete mode 100644 setup/config/config_eduserver.go rename syncapi/consumers/{eduserver_receipts.go => receipts.go} (87%) rename syncapi/consumers/{eduserver_sendtodevice.go => sendtodevice.go} (87%) rename syncapi/consumers/{eduserver_typing.go => typing.go} (67%) diff --git a/build/docker/config/dendrite.yaml b/build/docker/config/dendrite.yaml index c01ab62e..f3d37303 100644 --- a/build/docker/config/dendrite.yaml +++ b/build/docker/config/dendrite.yaml @@ -160,12 +160,6 @@ client_api: threshold: 5 cooloff_ms: 500 -# Configuration for the EDU server. -edu_server: - internal_api: - listen: http://0.0.0.0:7778 - connect: http://edu_server:7778 - # Configuration for the Federation API. federation_api: internal_api: diff --git a/build/docker/docker-compose.polylith.yml b/build/docker/docker-compose.polylith.yml index 207d0451..de0ab0aa 100644 --- a/build/docker/docker-compose.polylith.yml +++ b/build/docker/docker-compose.polylith.yml @@ -84,18 +84,6 @@ services: - internal restart: unless-stopped - edu_server: - hostname: edu_server - image: matrixdotorg/dendrite-polylith:latest - command: eduserver - volumes: - - ./config:/etc/dendrite - depends_on: - - jetstream - networks: - - internal - restart: unless-stopped - federation_api: hostname: federation_api image: matrixdotorg/dendrite-polylith:latest diff --git a/build/gobind-pinecone/monolith.go b/build/gobind-pinecone/monolith.go index 346c5d1e..efc21f59 100644 --- a/build/gobind-pinecone/monolith.go +++ b/build/gobind-pinecone/monolith.go @@ -24,8 +24,6 @@ import ( "github.com/matrix-org/dendrite/cmd/dendrite-demo-pinecone/rooms" "github.com/matrix-org/dendrite/cmd/dendrite-demo-pinecone/users" "github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/signing" - "github.com/matrix-org/dendrite/eduserver" - "github.com/matrix-org/dendrite/eduserver/cache" "github.com/matrix-org/dendrite/federationapi" "github.com/matrix-org/dendrite/federationapi/api" "github.com/matrix-org/dendrite/internal/httputil" @@ -317,10 +315,6 @@ func (m *DendriteMonolith) Start() { m.userAPI = userapi.NewInternalAPI(base, accountDB, &cfg.UserAPI, cfg.Derived.ApplicationServices, keyAPI, rsAPI, base.PushGatewayHTTPClient()) keyAPI.SetUserAPI(m.userAPI) - eduInputAPI := eduserver.NewInternalAPI( - base, cache.New(), m.userAPI, - ) - asAPI := appservice.NewInternalAPI(base, m.userAPI, rsAPI) // The underlying roomserver implementation needs to be able to call the fedsender. @@ -338,7 +332,6 @@ func (m *DendriteMonolith) Start() { KeyRing: keyRing, AppserviceAPI: asAPI, - EDUInternalAPI: eduInputAPI, FederationAPI: fsAPI, RoomserverAPI: rsAPI, UserAPI: m.userAPI, diff --git a/build/gobind-yggdrasil/monolith.go b/build/gobind-yggdrasil/monolith.go index a2c9bcff..87dcad2e 100644 --- a/build/gobind-yggdrasil/monolith.go +++ b/build/gobind-yggdrasil/monolith.go @@ -13,8 +13,6 @@ import ( "github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/signing" "github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/yggconn" "github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/yggrooms" - "github.com/matrix-org/dendrite/eduserver" - "github.com/matrix-org/dendrite/eduserver/cache" "github.com/matrix-org/dendrite/federationapi" "github.com/matrix-org/dendrite/federationapi/api" "github.com/matrix-org/dendrite/internal/httputil" @@ -123,10 +121,6 @@ func (m *DendriteMonolith) Start() { userAPI := userapi.NewInternalAPI(base, accountDB, &cfg.UserAPI, cfg.Derived.ApplicationServices, keyAPI, rsAPI, base.PushGatewayHTTPClient()) keyAPI.SetUserAPI(userAPI) - eduInputAPI := eduserver.NewInternalAPI( - base, cache.New(), userAPI, - ) - asAPI := appservice.NewInternalAPI(base, userAPI, rsAPI) rsAPI.SetAppserviceAPI(asAPI) @@ -141,12 +135,11 @@ func (m *DendriteMonolith) Start() { FedClient: federation, KeyRing: keyRing, - AppserviceAPI: asAPI, - EDUInternalAPI: eduInputAPI, - FederationAPI: fsAPI, - RoomserverAPI: rsAPI, - UserAPI: userAPI, - KeyAPI: keyAPI, + AppserviceAPI: asAPI, + FederationAPI: fsAPI, + RoomserverAPI: rsAPI, + UserAPI: userAPI, + KeyAPI: keyAPI, ExtPublicRoomsProvider: yggrooms.NewYggdrasilRoomProvider( ygg, fsAPI, federation, ), diff --git a/clientapi/clientapi.go b/clientapi/clientapi.go index d0ef368d..d4b417a3 100644 --- a/clientapi/clientapi.go +++ b/clientapi/clientapi.go @@ -20,7 +20,6 @@ import ( "github.com/matrix-org/dendrite/clientapi/api" "github.com/matrix-org/dendrite/clientapi/producers" "github.com/matrix-org/dendrite/clientapi/routing" - eduServerAPI "github.com/matrix-org/dendrite/eduserver/api" federationAPI "github.com/matrix-org/dendrite/federationapi/api" "github.com/matrix-org/dendrite/internal/transactions" keyserverAPI "github.com/matrix-org/dendrite/keyserver/api" @@ -40,7 +39,6 @@ func AddPublicRoutes( cfg *config.ClientAPI, federation *gomatrixserverlib.FederationClient, rsAPI roomserverAPI.RoomserverInternalAPI, - eduInputAPI eduServerAPI.EDUServerInputAPI, asAPI appserviceAPI.AppServiceQueryAPI, transactionsCache *transactions.Cache, fsAPI federationAPI.FederationInternalAPI, @@ -53,12 +51,17 @@ func AddPublicRoutes( js, _ := jetstream.Prepare(process, &cfg.Matrix.JetStream) syncProducer := &producers.SyncAPIProducer{ - JetStream: js, - Topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputClientData), + JetStream: js, + TopicClientData: cfg.Matrix.JetStream.Prefixed(jetstream.OutputClientData), + TopicReceiptEvent: cfg.Matrix.JetStream.Prefixed(jetstream.OutputReceiptEvent), + TopicSendToDeviceEvent: cfg.Matrix.JetStream.Prefixed(jetstream.OutputSendToDeviceEvent), + TopicTypingEvent: cfg.Matrix.JetStream.Prefixed(jetstream.OutputTypingEvent), + UserAPI: userAPI, + ServerName: cfg.Matrix.ServerName, } routing.Setup( - router, synapseAdminRouter, cfg, eduInputAPI, rsAPI, asAPI, + router, synapseAdminRouter, cfg, rsAPI, asAPI, userAPI, userDirectoryProvider, federation, syncProducer, transactionsCache, fsAPI, keyAPI, extRoomsProvider, mscCfg, diff --git a/clientapi/producers/syncapi.go b/clientapi/producers/syncapi.go index 9ab90391..2dee04e3 100644 --- a/clientapi/producers/syncapi.go +++ b/clientapi/producers/syncapi.go @@ -15,24 +15,34 @@ package producers import ( + "context" "encoding/json" + "strconv" "github.com/matrix-org/dendrite/internal/eventutil" "github.com/matrix-org/dendrite/setup/jetstream" + "github.com/matrix-org/dendrite/syncapi/types" + userapi "github.com/matrix-org/dendrite/userapi/api" + "github.com/matrix-org/gomatrixserverlib" "github.com/nats-io/nats.go" log "github.com/sirupsen/logrus" ) // SyncAPIProducer produces events for the sync API server to consume type SyncAPIProducer struct { - Topic string - JetStream nats.JetStreamContext + TopicClientData string + TopicReceiptEvent string + TopicSendToDeviceEvent string + TopicTypingEvent string + JetStream nats.JetStreamContext + ServerName gomatrixserverlib.ServerName + UserAPI userapi.UserInternalAPI } // SendData sends account data to the sync API server func (p *SyncAPIProducer) SendData(userID string, roomID string, dataType string, readMarker *eventutil.ReadMarkerJSON) error { m := &nats.Msg{ - Subject: p.Topic, + Subject: p.TopicClientData, Header: nats.Header{}, } m.Header.Set(jetstream.UserID, userID) @@ -52,8 +62,114 @@ func (p *SyncAPIProducer) SendData(userID string, roomID string, dataType string "user_id": userID, "room_id": roomID, "data_type": dataType, - }).Tracef("Producing to topic '%s'", p.Topic) + }).Tracef("Producing to topic '%s'", p.TopicClientData) _, err = p.JetStream.PublishMsg(m) return err } + +func (p *SyncAPIProducer) SendReceipt( + ctx context.Context, + userID, roomID, eventID, receiptType string, timestamp gomatrixserverlib.Timestamp, +) error { + m := &nats.Msg{ + Subject: p.TopicReceiptEvent, + Header: nats.Header{}, + } + m.Header.Set(jetstream.UserID, userID) + m.Header.Set(jetstream.RoomID, roomID) + m.Header.Set(jetstream.EventID, eventID) + m.Header.Set("type", receiptType) + m.Header.Set("timestamp", strconv.Itoa(int(timestamp))) + + log.WithFields(log.Fields{}).Tracef("Producing to topic '%s'", p.TopicReceiptEvent) + _, err := p.JetStream.PublishMsg(m, nats.Context(ctx)) + return err +} + +func (p *SyncAPIProducer) SendToDevice( + ctx context.Context, sender, userID, deviceID, eventType string, + message interface{}, +) error { + devices := []string{} + _, domain, err := gomatrixserverlib.SplitID('@', userID) + if err != nil { + return err + } + + // If the event is targeted locally then we want to expand the wildcard + // out into individual device IDs so that we can send them to each respective + // device. If the event isn't targeted locally then we can't expand the + // wildcard as we don't know about the remote devices, so instead we leave it + // as-is, so that the federation sender can send it on with the wildcard intact. + if domain == p.ServerName && deviceID == "*" { + var res userapi.QueryDevicesResponse + err = p.UserAPI.QueryDevices(context.TODO(), &userapi.QueryDevicesRequest{ + UserID: userID, + }, &res) + if err != nil { + return err + } + for _, dev := range res.Devices { + devices = append(devices, dev.ID) + } + } else { + devices = append(devices, deviceID) + } + + js, err := json.Marshal(message) + if err != nil { + return err + } + + log.WithFields(log.Fields{ + "user_id": userID, + "num_devices": len(devices), + "type": eventType, + }).Tracef("Producing to topic '%s'", p.TopicSendToDeviceEvent) + for _, device := range devices { + ote := &types.OutputSendToDeviceEvent{ + UserID: userID, + DeviceID: device, + SendToDeviceEvent: gomatrixserverlib.SendToDeviceEvent{ + Sender: sender, + Type: eventType, + Content: js, + }, + } + + eventJSON, err := json.Marshal(ote) + if err != nil { + log.WithError(err).Error("sendToDevice failed json.Marshal") + return err + } + m := &nats.Msg{ + Subject: p.TopicSendToDeviceEvent, + Data: eventJSON, + Header: nats.Header{}, + } + m.Header.Set("sender", sender) + m.Header.Set(jetstream.UserID, userID) + if _, err = p.JetStream.PublishMsg(m, nats.Context(ctx)); err != nil { + log.WithError(err).Error("sendToDevice failed t.Producer.SendMessage") + return err + } + } + return nil +} + +func (p *SyncAPIProducer) SendTyping( + ctx context.Context, userID, roomID string, typing bool, timeoutMS int64, +) error { + m := &nats.Msg{ + Subject: p.TopicTypingEvent, + Header: nats.Header{}, + } + m.Header.Set(jetstream.UserID, userID) + m.Header.Set(jetstream.RoomID, roomID) + m.Header.Set("typing", strconv.FormatBool(typing)) + m.Header.Set("timeout_ms", strconv.Itoa(int(timeoutMS))) + + _, err := p.JetStream.PublishMsg(m, nats.Context(ctx)) + return err +} diff --git a/clientapi/routing/account_data.go b/clientapi/routing/account_data.go index d8e98269..873ffaf5 100644 --- a/clientapi/routing/account_data.go +++ b/clientapi/routing/account_data.go @@ -23,7 +23,6 @@ import ( "github.com/matrix-org/dendrite/clientapi/httputil" "github.com/matrix-org/dendrite/clientapi/jsonerror" "github.com/matrix-org/dendrite/clientapi/producers" - eduserverAPI "github.com/matrix-org/dendrite/eduserver/api" "github.com/matrix-org/dendrite/internal/eventutil" roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/userapi/api" @@ -146,7 +145,7 @@ type fullyReadEvent struct { // SaveReadMarker implements POST /rooms/{roomId}/read_markers func SaveReadMarker( req *http.Request, - userAPI api.UserInternalAPI, rsAPI roomserverAPI.RoomserverInternalAPI, eduAPI eduserverAPI.EDUServerInputAPI, + userAPI api.UserInternalAPI, rsAPI roomserverAPI.RoomserverInternalAPI, syncProducer *producers.SyncAPIProducer, device *api.Device, roomID string, ) util.JSONResponse { // Verify that the user is a member of this room @@ -192,7 +191,7 @@ func SaveReadMarker( // Handle the read receipt that may be included in the read marker if r.Read != "" { - return SetReceipt(req, eduAPI, device, roomID, "m.read", r.Read) + return SetReceipt(req, syncProducer, device, roomID, "m.read", r.Read) } return util.JSONResponse{ diff --git a/clientapi/routing/receipt.go b/clientapi/routing/receipt.go index fe8fe765..0f9b1b4f 100644 --- a/clientapi/routing/receipt.go +++ b/clientapi/routing/receipt.go @@ -19,21 +19,20 @@ import ( "net/http" "time" + "github.com/matrix-org/dendrite/clientapi/producers" "github.com/matrix-org/gomatrixserverlib" - "github.com/matrix-org/dendrite/eduserver/api" - userapi "github.com/matrix-org/dendrite/userapi/api" "github.com/matrix-org/util" "github.com/sirupsen/logrus" ) -func SetReceipt(req *http.Request, eduAPI api.EDUServerInputAPI, device *userapi.Device, roomId, receiptType, eventId string) util.JSONResponse { +func SetReceipt(req *http.Request, syncProducer *producers.SyncAPIProducer, device *userapi.Device, roomID, receiptType, eventID string) util.JSONResponse { timestamp := gomatrixserverlib.AsTimestamp(time.Now()) logrus.WithFields(logrus.Fields{ - "roomId": roomId, + "roomID": roomID, "receiptType": receiptType, - "eventId": eventId, + "eventID": eventID, "userId": device.UserID, "timestamp": timestamp, }).Debug("Setting receipt") @@ -43,7 +42,7 @@ func SetReceipt(req *http.Request, eduAPI api.EDUServerInputAPI, device *userapi return util.MessageResponse(400, fmt.Sprintf("receipt type must be m.read not '%s'", receiptType)) } - if err := api.SendReceipt(req.Context(), eduAPI, device.UserID, roomId, eventId, receiptType, timestamp); err != nil { + if err := syncProducer.SendReceipt(req.Context(), device.UserID, roomID, eventID, receiptType, timestamp); err != nil { return util.ErrorResponse(err) } diff --git a/clientapi/routing/routing.go b/clientapi/routing/routing.go index 218087bb..8afaba56 100644 --- a/clientapi/routing/routing.go +++ b/clientapi/routing/routing.go @@ -26,7 +26,6 @@ import ( clientutil "github.com/matrix-org/dendrite/clientapi/httputil" "github.com/matrix-org/dendrite/clientapi/jsonerror" "github.com/matrix-org/dendrite/clientapi/producers" - eduServerAPI "github.com/matrix-org/dendrite/eduserver/api" federationAPI "github.com/matrix-org/dendrite/federationapi/api" "github.com/matrix-org/dendrite/internal/httputil" "github.com/matrix-org/dendrite/internal/transactions" @@ -47,7 +46,6 @@ import ( // nolint: gocyclo func Setup( publicAPIMux, synapseAdminRouter *mux.Router, cfg *config.ClientAPI, - eduAPI eduServerAPI.EDUServerInputAPI, rsAPI roomserverAPI.RoomserverInternalAPI, asAPI appserviceAPI.AppServiceQueryAPI, userAPI userapi.UserInternalAPI, @@ -467,7 +465,7 @@ func Setup( if err != nil { return util.ErrorResponse(err) } - return SendTyping(req, device, vars["roomID"], vars["userID"], eduAPI, rsAPI) + return SendTyping(req, device, vars["roomID"], vars["userID"], rsAPI, syncProducer) }), ).Methods(http.MethodPut, http.MethodOptions) v3mux.Handle("/rooms/{roomID}/redact/{eventID}", @@ -496,7 +494,7 @@ func Setup( return util.ErrorResponse(err) } txnID := vars["txnID"] - return SendToDevice(req, device, eduAPI, transactionsCache, vars["eventType"], &txnID) + return SendToDevice(req, device, syncProducer, transactionsCache, vars["eventType"], &txnID) }), ).Methods(http.MethodPut, http.MethodOptions) @@ -510,7 +508,7 @@ func Setup( return util.ErrorResponse(err) } txnID := vars["txnID"] - return SendToDevice(req, device, eduAPI, transactionsCache, vars["eventType"], &txnID) + return SendToDevice(req, device, syncProducer, transactionsCache, vars["eventType"], &txnID) }), ).Methods(http.MethodPut, http.MethodOptions) @@ -942,7 +940,7 @@ func Setup( if err != nil { return util.ErrorResponse(err) } - return SaveReadMarker(req, userAPI, rsAPI, eduAPI, syncProducer, device, vars["roomID"]) + return SaveReadMarker(req, userAPI, rsAPI, syncProducer, device, vars["roomID"]) }), ).Methods(http.MethodPost, http.MethodOptions) @@ -1297,7 +1295,7 @@ func Setup( return util.ErrorResponse(err) } - return SetReceipt(req, eduAPI, device, vars["roomId"], vars["receiptType"], vars["eventId"]) + return SetReceipt(req, syncProducer, device, vars["roomId"], vars["receiptType"], vars["eventId"]) }), ).Methods(http.MethodPost, http.MethodOptions) } diff --git a/clientapi/routing/sendtodevice.go b/clientapi/routing/sendtodevice.go index 768e8e0e..4a5f0888 100644 --- a/clientapi/routing/sendtodevice.go +++ b/clientapi/routing/sendtodevice.go @@ -18,17 +18,17 @@ import ( "github.com/matrix-org/dendrite/clientapi/httputil" "github.com/matrix-org/dendrite/clientapi/jsonerror" - "github.com/matrix-org/dendrite/eduserver/api" + "github.com/matrix-org/dendrite/clientapi/producers" "github.com/matrix-org/dendrite/internal/transactions" userapi "github.com/matrix-org/dendrite/userapi/api" "github.com/matrix-org/util" ) // SendToDevice handles PUT /_matrix/client/r0/sendToDevice/{eventType}/{txnId} -// sends the device events to the EDU Server +// sends the device events to the syncapi & federationsender func SendToDevice( req *http.Request, device *userapi.Device, - eduAPI api.EDUServerInputAPI, + syncProducer *producers.SyncAPIProducer, txnCache *transactions.Cache, eventType string, txnID *string, ) util.JSONResponse { @@ -48,8 +48,8 @@ func SendToDevice( for userID, byUser := range httpReq.Messages { for deviceID, message := range byUser { - if err := api.SendToDevice( - req.Context(), eduAPI, device.UserID, userID, deviceID, eventType, message, + if err := syncProducer.SendToDevice( + req.Context(), device.UserID, userID, deviceID, eventType, message, ); err != nil { util.GetLogger(req.Context()).WithError(err).Error("eduProducer.SendToDevice failed") return jsonerror.InternalServerError() diff --git a/clientapi/routing/sendtyping.go b/clientapi/routing/sendtyping.go index abd2061a..6a27ee61 100644 --- a/clientapi/routing/sendtyping.go +++ b/clientapi/routing/sendtyping.go @@ -17,7 +17,7 @@ import ( "github.com/matrix-org/dendrite/clientapi/httputil" "github.com/matrix-org/dendrite/clientapi/jsonerror" - "github.com/matrix-org/dendrite/eduserver/api" + "github.com/matrix-org/dendrite/clientapi/producers" roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" userapi "github.com/matrix-org/dendrite/userapi/api" "github.com/matrix-org/util" @@ -32,9 +32,8 @@ type typingContentJSON struct { // sends the typing events to client API typingProducer func SendTyping( req *http.Request, device *userapi.Device, roomID string, - userID string, - eduAPI api.EDUServerInputAPI, - rsAPI roomserverAPI.RoomserverInternalAPI, + userID string, rsAPI roomserverAPI.RoomserverInternalAPI, + syncProducer *producers.SyncAPIProducer, ) util.JSONResponse { if device.UserID != userID { return util.JSONResponse{ @@ -56,9 +55,7 @@ func SendTyping( return *resErr } - if err := api.SendTyping( - req.Context(), eduAPI, userID, roomID, r.Typing, r.Timeout, - ); err != nil { + if err := syncProducer.SendTyping(req.Context(), userID, roomID, r.Typing, r.Timeout); err != nil { util.GetLogger(req.Context()).WithError(err).Error("eduProducer.Send failed") return jsonerror.InternalServerError() } diff --git a/cmd/dendrite-demo-libp2p/main.go b/cmd/dendrite-demo-libp2p/main.go index 8ce64191..26c8eb85 100644 --- a/cmd/dendrite-demo-libp2p/main.go +++ b/cmd/dendrite-demo-libp2p/main.go @@ -29,7 +29,6 @@ import ( p2pdisc "github.com/libp2p/go-libp2p/p2p/discovery" "github.com/matrix-org/dendrite/appservice" "github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/embed" - "github.com/matrix-org/dendrite/eduserver" "github.com/matrix-org/dendrite/federationapi" "github.com/matrix-org/dendrite/internal/httputil" "github.com/matrix-org/dendrite/keyserver" @@ -40,8 +39,6 @@ import ( "github.com/matrix-org/dendrite/userapi" "github.com/matrix-org/gomatrixserverlib" - "github.com/matrix-org/dendrite/eduserver/cache" - "github.com/sirupsen/logrus" _ "github.com/mattn/go-sqlite3" @@ -152,9 +149,6 @@ func main() { userAPI := userapi.NewInternalAPI(&base.Base, accountDB, &cfg.UserAPI, nil, keyAPI, rsAPI, base.Base.PushGatewayHTTPClient()) keyAPI.SetUserAPI(userAPI) - eduInputAPI := eduserver.NewInternalAPI( - &base.Base, cache.New(), userAPI, - ) asAPI := appservice.NewInternalAPI(&base.Base, userAPI, rsAPI) rsAPI.SetAppserviceAPI(asAPI) fsAPI := federationapi.NewInternalAPI( @@ -180,7 +174,6 @@ func main() { KeyRing: keyRing, AppserviceAPI: asAPI, - EDUInternalAPI: eduInputAPI, FederationAPI: fsAPI, RoomserverAPI: rsAPI, UserAPI: userAPI, diff --git a/cmd/dendrite-demo-pinecone/main.go b/cmd/dendrite-demo-pinecone/main.go index 87054dc8..a6abf06e 100644 --- a/cmd/dendrite-demo-pinecone/main.go +++ b/cmd/dendrite-demo-pinecone/main.go @@ -37,8 +37,6 @@ import ( "github.com/matrix-org/dendrite/cmd/dendrite-demo-pinecone/rooms" "github.com/matrix-org/dendrite/cmd/dendrite-demo-pinecone/users" "github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/signing" - "github.com/matrix-org/dendrite/eduserver" - "github.com/matrix-org/dendrite/eduserver/cache" "github.com/matrix-org/dendrite/federationapi" "github.com/matrix-org/dendrite/federationapi/api" "github.com/matrix-org/dendrite/internal" @@ -191,10 +189,6 @@ func main() { userAPI := userapi.NewInternalAPI(base, accountDB, &cfg.UserAPI, nil, keyAPI, rsAPI, base.PushGatewayHTTPClient()) keyAPI.SetUserAPI(userAPI) - eduInputAPI := eduserver.NewInternalAPI( - base, cache.New(), userAPI, - ) - asAPI := appservice.NewInternalAPI(base, userAPI, rsAPI) rsComponent.SetFederationAPI(fsAPI, keyRing) @@ -210,7 +204,6 @@ func main() { KeyRing: keyRing, AppserviceAPI: asAPI, - EDUInternalAPI: eduInputAPI, FederationAPI: fsAPI, RoomserverAPI: rsAPI, UserAPI: userAPI, diff --git a/cmd/dendrite-demo-yggdrasil/main.go b/cmd/dendrite-demo-yggdrasil/main.go index b7e30ba2..b840eb2b 100644 --- a/cmd/dendrite-demo-yggdrasil/main.go +++ b/cmd/dendrite-demo-yggdrasil/main.go @@ -32,8 +32,6 @@ import ( "github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/signing" "github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/yggconn" "github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/yggrooms" - "github.com/matrix-org/dendrite/eduserver" - "github.com/matrix-org/dendrite/eduserver/cache" "github.com/matrix-org/dendrite/federationapi" "github.com/matrix-org/dendrite/federationapi/api" "github.com/matrix-org/dendrite/internal" @@ -120,10 +118,6 @@ func main() { userAPI := userapi.NewInternalAPI(base, accountDB, &cfg.UserAPI, nil, keyAPI, rsAPI, base.PushGatewayHTTPClient()) keyAPI.SetUserAPI(userAPI) - eduInputAPI := eduserver.NewInternalAPI( - base, cache.New(), userAPI, - ) - asAPI := appservice.NewInternalAPI(base, userAPI, rsAPI) rsAPI.SetAppserviceAPI(asAPI) fsAPI := federationapi.NewInternalAPI( @@ -139,12 +133,11 @@ func main() { FedClient: federation, KeyRing: keyRing, - AppserviceAPI: asAPI, - EDUInternalAPI: eduInputAPI, - FederationAPI: fsAPI, - RoomserverAPI: rsAPI, - UserAPI: userAPI, - KeyAPI: keyAPI, + AppserviceAPI: asAPI, + FederationAPI: fsAPI, + RoomserverAPI: rsAPI, + UserAPI: userAPI, + KeyAPI: keyAPI, ExtPublicRoomsProvider: yggrooms.NewYggdrasilRoomProvider( ygg, fsAPI, federation, ), diff --git a/cmd/dendrite-monolith-server/main.go b/cmd/dendrite-monolith-server/main.go index 3b952504..1443ab5b 100644 --- a/cmd/dendrite-monolith-server/main.go +++ b/cmd/dendrite-monolith-server/main.go @@ -19,8 +19,6 @@ import ( "os" "github.com/matrix-org/dendrite/appservice" - "github.com/matrix-org/dendrite/eduserver" - "github.com/matrix-org/dendrite/eduserver/cache" "github.com/matrix-org/dendrite/federationapi" "github.com/matrix-org/dendrite/keyserver" "github.com/matrix-org/dendrite/roomserver" @@ -61,7 +59,6 @@ func main() { // itself. cfg.AppServiceAPI.InternalAPI.Connect = httpAPIAddr cfg.ClientAPI.InternalAPI.Connect = httpAPIAddr - cfg.EDUServer.InternalAPI.Connect = httpAPIAddr cfg.FederationAPI.InternalAPI.Connect = httpAPIAddr cfg.KeyServer.InternalAPI.Connect = httpAPIAddr cfg.MediaAPI.InternalAPI.Connect = httpAPIAddr @@ -136,14 +133,6 @@ func main() { rsImpl.SetUserAPI(userAPI) keyImpl.SetUserAPI(userAPI) - eduInputAPI := eduserver.NewInternalAPI( - base, cache.New(), userAPI, - ) - if base.UseHTTPAPIs { - eduserver.AddInternalRoutes(base.InternalAPIMux, eduInputAPI) - eduInputAPI = base.EDUServerClient() - } - monolith := setup.Monolith{ Config: base.Cfg, AccountDB: accountDB, @@ -151,12 +140,10 @@ func main() { FedClient: federation, KeyRing: keyRing, - AppserviceAPI: asAPI, - EDUInternalAPI: eduInputAPI, - FederationAPI: fsAPI, - RoomserverAPI: rsAPI, - UserAPI: userAPI, - KeyAPI: keyAPI, + AppserviceAPI: asAPI, FederationAPI: fsAPI, + RoomserverAPI: rsAPI, + UserAPI: userAPI, + KeyAPI: keyAPI, } monolith.AddAllPublicRoutes( base.ProcessContext, diff --git a/cmd/dendrite-polylith-multi/main.go b/cmd/dendrite-polylith-multi/main.go index edfe6cdb..6226cc32 100644 --- a/cmd/dendrite-polylith-multi/main.go +++ b/cmd/dendrite-polylith-multi/main.go @@ -43,7 +43,6 @@ func main() { components := map[string]entrypoint{ "appservice": personalities.Appservice, "clientapi": personalities.ClientAPI, - "eduserver": personalities.EDUServer, "federationapi": personalities.FederationAPI, "keyserver": personalities.KeyServer, "mediaapi": personalities.MediaAPI, diff --git a/cmd/dendrite-polylith-multi/personalities/clientapi.go b/cmd/dendrite-polylith-multi/personalities/clientapi.go index 978d8b0a..1e509f88 100644 --- a/cmd/dendrite-polylith-multi/personalities/clientapi.go +++ b/cmd/dendrite-polylith-multi/personalities/clientapi.go @@ -27,13 +27,12 @@ func ClientAPI(base *basepkg.BaseDendrite, cfg *config.Dendrite) { asQuery := base.AppserviceHTTPClient() rsAPI := base.RoomserverHTTPClient() fsAPI := base.FederationAPIHTTPClient() - eduInputAPI := base.EDUServerClient() userAPI := base.UserAPIClient() keyAPI := base.KeyServerHTTPClient() clientapi.AddPublicRoutes( base.ProcessContext, base.PublicClientAPIMux, base.SynapseAdminMux, &base.Cfg.ClientAPI, - federation, rsAPI, eduInputAPI, asQuery, transactions.New(), fsAPI, userAPI, userAPI, + federation, rsAPI, asQuery, transactions.New(), fsAPI, userAPI, userAPI, keyAPI, nil, &cfg.MSCs, ) diff --git a/cmd/dendrite-polylith-multi/personalities/eduserver.go b/cmd/dendrite-polylith-multi/personalities/eduserver.go deleted file mode 100644 index 8719facb..00000000 --- a/cmd/dendrite-polylith-multi/personalities/eduserver.go +++ /dev/null @@ -1,33 +0,0 @@ -// Copyright 2020 The Matrix.org Foundation C.I.C. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package personalities - -import ( - "github.com/matrix-org/dendrite/eduserver" - "github.com/matrix-org/dendrite/eduserver/cache" - basepkg "github.com/matrix-org/dendrite/setup/base" - "github.com/matrix-org/dendrite/setup/config" -) - -func EDUServer(base *basepkg.BaseDendrite, cfg *config.Dendrite) { - intAPI := eduserver.NewInternalAPI(base, cache.New(), base.UserAPIClient()) - eduserver.AddInternalRoutes(base.InternalAPIMux, intAPI) - - base.SetupAndServeHTTP( - base.Cfg.EDUServer.InternalAPI.Listen, // internal listener - basepkg.NoListener, // external listener - nil, nil, - ) -} diff --git a/cmd/dendrite-polylith-multi/personalities/federationapi.go b/cmd/dendrite-polylith-multi/personalities/federationapi.go index 44357d66..b82577ce 100644 --- a/cmd/dendrite-polylith-multi/personalities/federationapi.go +++ b/cmd/dendrite-polylith-multi/personalities/federationapi.go @@ -29,9 +29,9 @@ func FederationAPI(base *basepkg.BaseDendrite, cfg *config.Dendrite) { keyRing := fsAPI.KeyRing() federationapi.AddPublicRoutes( - base.PublicFederationAPIMux, base.PublicKeyAPIMux, base.PublicWellKnownAPIMux, + base.ProcessContext, base.PublicFederationAPIMux, base.PublicKeyAPIMux, base.PublicWellKnownAPIMux, &base.Cfg.FederationAPI, userAPI, federation, keyRing, - rsAPI, fsAPI, base.EDUServerClient(), keyAPI, + rsAPI, fsAPI, keyAPI, &base.Cfg.MSCs, nil, ) diff --git a/cmd/dendritejs-pinecone/main.go b/cmd/dendritejs-pinecone/main.go index 407081f5..05cdf29e 100644 --- a/cmd/dendritejs-pinecone/main.go +++ b/cmd/dendritejs-pinecone/main.go @@ -31,8 +31,6 @@ import ( "github.com/matrix-org/dendrite/cmd/dendrite-demo-pinecone/conn" "github.com/matrix-org/dendrite/cmd/dendrite-demo-pinecone/rooms" "github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/signing" - "github.com/matrix-org/dendrite/eduserver" - "github.com/matrix-org/dendrite/eduserver/cache" "github.com/matrix-org/dendrite/federationapi" "github.com/matrix-org/dendrite/internal/httputil" "github.com/matrix-org/dendrite/keyserver" @@ -193,7 +191,6 @@ func startup() { userAPI := userapi.NewInternalAPI(base, accountDB, &cfg.UserAPI, nil, keyAPI, rsAPI, base.PushGatewayHTTPClient()) keyAPI.SetUserAPI(userAPI) - eduInputAPI := eduserver.NewInternalAPI(base, cache.New(), userAPI) asQuery := appservice.NewInternalAPI( base, userAPI, rsAPI, ) @@ -208,12 +205,11 @@ func startup() { FedClient: federation, KeyRing: keyRing, - AppserviceAPI: asQuery, - EDUInternalAPI: eduInputAPI, - FederationAPI: fedSenderAPI, - RoomserverAPI: rsAPI, - UserAPI: userAPI, - KeyAPI: keyAPI, + AppserviceAPI: asQuery, + FederationAPI: fedSenderAPI, + RoomserverAPI: rsAPI, + UserAPI: userAPI, + KeyAPI: keyAPI, //ServerKeyAPI: serverKeyAPI, ExtPublicRoomsProvider: rooms.NewPineconeRoomProvider(pRouter, pSessions, fedSenderAPI, federation), } diff --git a/cmd/dendritejs/main.go b/cmd/dendritejs/main.go index 37cbb12d..05e0f0ad 100644 --- a/cmd/dendritejs/main.go +++ b/cmd/dendritejs/main.go @@ -24,8 +24,6 @@ import ( "github.com/gorilla/mux" "github.com/matrix-org/dendrite/appservice" - "github.com/matrix-org/dendrite/eduserver" - "github.com/matrix-org/dendrite/eduserver/cache" "github.com/matrix-org/dendrite/federationapi" "github.com/matrix-org/dendrite/internal/httputil" "github.com/matrix-org/dendrite/keyserver" @@ -203,7 +201,6 @@ func main() { } rsAPI := roomserver.NewInternalAPI(base) - eduInputAPI := eduserver.NewInternalAPI(base, cache.New(), userAPI) asQuery := appservice.NewInternalAPI( base, userAPI, rsAPI, ) @@ -222,7 +219,6 @@ func main() { KeyRing: &keyRing, AppserviceAPI: asQuery, - EDUInternalAPI: eduInputAPI, FederationSenderAPI: fedSenderAPI, RoomserverAPI: rsAPI, UserAPI: userAPI, diff --git a/dendrite-config.yaml b/dendrite-config.yaml index 0236851c..6e2bc7be 100644 --- a/dendrite-config.yaml +++ b/dendrite-config.yaml @@ -187,12 +187,6 @@ client_api: threshold: 5 cooloff_ms: 500 -# Configuration for the EDU server. -edu_server: - internal_api: - listen: http://localhost:7778 # Only used in polylith deployments - connect: http://localhost:7778 # Only used in polylith deployments - # Configuration for the Federation API. federation_api: internal_api: diff --git a/docs/INSTALL.md b/docs/INSTALL.md index a7b2e67f..523c5c7d 100644 --- a/docs/INSTALL.md +++ b/docs/INSTALL.md @@ -263,14 +263,6 @@ This manages end-to-end encryption keys for users. ./bin/dendrite-polylith-multi --config=dendrite.yaml keyserver ``` -#### EDU server - -This manages processing EDUs such as typing, send-to-device events and presence. Clients do not talk to - -```bash -./bin/dendrite-polylith-multi --config=dendrite.yaml eduserver -``` - #### User server This manages user accounts, device access tokens and user account data, diff --git a/eduserver/api/input.go b/eduserver/api/input.go deleted file mode 100644 index 2aab107b..00000000 --- a/eduserver/api/input.go +++ /dev/null @@ -1,103 +0,0 @@ -// Copyright 2017 Vector Creations Ltd -// Copyright 2017-2018 New Vector Ltd -// Copyright 2019-2020 The Matrix.org Foundation C.I.C. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -// Package api provides the types that are used to communicate with the typing server. -package api - -import ( - "context" - - "github.com/matrix-org/gomatrixserverlib" -) - -// InputTypingEvent is an event for notifying the typing server about typing updates. -type InputTypingEvent struct { - // UserID of the user to update typing status. - UserID string `json:"user_id"` - // RoomID of the room the user is typing (or has stopped). - RoomID string `json:"room_id"` - // Typing is true if the user is typing, false if they have stopped. - Typing bool `json:"typing"` - // Timeout is the interval in milliseconds for which the user should be marked as typing. - TimeoutMS int64 `json:"timeout"` - // OriginServerTS when the server received the update. - OriginServerTS gomatrixserverlib.Timestamp `json:"origin_server_ts"` -} - -type InputSendToDeviceEvent struct { - UserID string `json:"user_id"` - DeviceID string `json:"device_id"` - gomatrixserverlib.SendToDeviceEvent -} - -// InputTypingEventRequest is a request to EDUServerInputAPI -type InputTypingEventRequest struct { - InputTypingEvent InputTypingEvent `json:"input_typing_event"` -} - -// InputTypingEventResponse is a response to InputTypingEvents -type InputTypingEventResponse struct{} - -// InputSendToDeviceEventRequest is a request to EDUServerInputAPI -type InputSendToDeviceEventRequest struct { - InputSendToDeviceEvent InputSendToDeviceEvent `json:"input_send_to_device_event"` -} - -// InputSendToDeviceEventResponse is a response to InputSendToDeviceEventRequest -type InputSendToDeviceEventResponse struct{} - -type InputReceiptEvent struct { - UserID string `json:"user_id"` - RoomID string `json:"room_id"` - EventID string `json:"event_id"` - Type string `json:"type"` - Timestamp gomatrixserverlib.Timestamp `json:"timestamp"` -} - -// InputReceiptEventRequest is a request to EDUServerInputAPI -type InputReceiptEventRequest struct { - InputReceiptEvent InputReceiptEvent `json:"input_receipt_event"` -} - -// InputReceiptEventResponse is a response to InputReceiptEventRequest -type InputReceiptEventResponse struct{} - -type InputCrossSigningKeyUpdateRequest struct { - CrossSigningKeyUpdate `json:"signing_keys"` -} - -type InputCrossSigningKeyUpdateResponse struct{} - -// EDUServerInputAPI is used to write events to the typing server. -type EDUServerInputAPI interface { - InputTypingEvent( - ctx context.Context, - request *InputTypingEventRequest, - response *InputTypingEventResponse, - ) error - - InputSendToDeviceEvent( - ctx context.Context, - request *InputSendToDeviceEventRequest, - response *InputSendToDeviceEventResponse, - ) error - - InputReceiptEvent( - ctx context.Context, - request *InputReceiptEventRequest, - response *InputReceiptEventResponse, - ) error -} diff --git a/eduserver/api/output.go b/eduserver/api/output.go deleted file mode 100644 index c6de4e01..00000000 --- a/eduserver/api/output.go +++ /dev/null @@ -1,57 +0,0 @@ -// Copyright 2017 Vector Creations Ltd -// Copyright 2017-2018 New Vector Ltd -// Copyright 2019-2020 The Matrix.org Foundation C.I.C. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package api - -import ( - "time" - - "github.com/matrix-org/gomatrixserverlib" -) - -// OutputTypingEvent is an entry in typing server output kafka log. -// This contains the event with extra fields used to create 'm.typing' event -// in clientapi & federation. -type OutputTypingEvent struct { - // The Event for the typing edu event. - Event TypingEvent `json:"event"` - // ExpireTime is the interval after which the user should no longer be - // considered typing. Only available if Event.Typing is true. - ExpireTime *time.Time -} - -// OutputSendToDeviceEvent is an entry in the send-to-device output kafka log. -// This contains the full event content, along with the user ID and device ID -// to which it is destined. -type OutputSendToDeviceEvent struct { - UserID string `json:"user_id"` - DeviceID string `json:"device_id"` - gomatrixserverlib.SendToDeviceEvent -} - -// OutputReceiptEvent is an entry in the receipt output kafka log -type OutputReceiptEvent struct { - UserID string `json:"user_id"` - RoomID string `json:"room_id"` - EventID string `json:"event_id"` - Type string `json:"type"` - Timestamp gomatrixserverlib.Timestamp `json:"timestamp"` -} - -// OutputCrossSigningKeyUpdate is an entry in the signing key update output kafka log -type OutputCrossSigningKeyUpdate struct { - CrossSigningKeyUpdate `json:"signing_keys"` -} diff --git a/eduserver/api/types.go b/eduserver/api/types.go deleted file mode 100644 index a207580f..00000000 --- a/eduserver/api/types.go +++ /dev/null @@ -1,59 +0,0 @@ -// Copyright 2021 The Matrix.org Foundation C.I.C. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package api - -import "github.com/matrix-org/gomatrixserverlib" - -const ( - MSigningKeyUpdate = "m.signing_key_update" -) - -type TypingEvent struct { - Type string `json:"type"` - RoomID string `json:"room_id"` - UserID string `json:"user_id"` - Typing bool `json:"typing"` -} - -type ReceiptEvent struct { - UserID string `json:"user_id"` - RoomID string `json:"room_id"` - EventID string `json:"event_id"` - Type string `json:"type"` - Timestamp gomatrixserverlib.Timestamp `json:"timestamp"` -} - -type FederationReceiptMRead struct { - User map[string]FederationReceiptData `json:"m.read"` -} - -type FederationReceiptData struct { - Data ReceiptTS `json:"data"` - EventIDs []string `json:"event_ids"` -} - -type ReceiptMRead struct { - User map[string]ReceiptTS `json:"m.read"` -} - -type ReceiptTS struct { - TS gomatrixserverlib.Timestamp `json:"ts"` -} - -type CrossSigningKeyUpdate struct { - MasterKey *gomatrixserverlib.CrossSigningKey `json:"master_key,omitempty"` - SelfSigningKey *gomatrixserverlib.CrossSigningKey `json:"self_signing_key,omitempty"` - UserID string `json:"user_id"` -} diff --git a/eduserver/api/wrapper.go b/eduserver/api/wrapper.go deleted file mode 100644 index 7907f4d3..00000000 --- a/eduserver/api/wrapper.go +++ /dev/null @@ -1,88 +0,0 @@ -// Copyright 2020 The Matrix.org Foundation C.I.C. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package api - -import ( - "context" - "encoding/json" - "time" - - "github.com/matrix-org/gomatrixserverlib" -) - -// SendTyping sends a typing event to EDU server -func SendTyping( - ctx context.Context, eduAPI EDUServerInputAPI, userID, roomID string, - typing bool, timeoutMS int64, -) error { - requestData := InputTypingEvent{ - UserID: userID, - RoomID: roomID, - Typing: typing, - TimeoutMS: timeoutMS, - OriginServerTS: gomatrixserverlib.AsTimestamp(time.Now()), - } - - var response InputTypingEventResponse - err := eduAPI.InputTypingEvent( - ctx, &InputTypingEventRequest{InputTypingEvent: requestData}, &response, - ) - - return err -} - -// SendToDevice sends a typing event to EDU server -func SendToDevice( - ctx context.Context, eduAPI EDUServerInputAPI, sender, userID, deviceID, eventType string, - message interface{}, -) error { - js, err := json.Marshal(message) - if err != nil { - return err - } - requestData := InputSendToDeviceEvent{ - UserID: userID, - DeviceID: deviceID, - SendToDeviceEvent: gomatrixserverlib.SendToDeviceEvent{ - Sender: sender, - Type: eventType, - Content: js, - }, - } - request := InputSendToDeviceEventRequest{ - InputSendToDeviceEvent: requestData, - } - response := InputSendToDeviceEventResponse{} - return eduAPI.InputSendToDeviceEvent(ctx, &request, &response) -} - -// SendReceipt sends a receipt event to EDU Server -func SendReceipt( - ctx context.Context, - eduAPI EDUServerInputAPI, userID, roomID, eventID, receiptType string, - timestamp gomatrixserverlib.Timestamp, -) error { - request := InputReceiptEventRequest{ - InputReceiptEvent: InputReceiptEvent{ - UserID: userID, - RoomID: roomID, - EventID: eventID, - Type: receiptType, - Timestamp: timestamp, - }, - } - response := InputReceiptEventResponse{} - return eduAPI.InputReceiptEvent(ctx, &request, &response) -} diff --git a/eduserver/eduserver.go b/eduserver/eduserver.go deleted file mode 100644 index 91208a40..00000000 --- a/eduserver/eduserver.go +++ /dev/null @@ -1,56 +0,0 @@ -// Copyright 2017 Vector Creations Ltd -// Copyright 2017-2018 New Vector Ltd -// Copyright 2019-2020 The Matrix.org Foundation C.I.C. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package eduserver - -import ( - "github.com/gorilla/mux" - "github.com/matrix-org/dendrite/eduserver/api" - "github.com/matrix-org/dendrite/eduserver/cache" - "github.com/matrix-org/dendrite/eduserver/input" - "github.com/matrix-org/dendrite/eduserver/inthttp" - "github.com/matrix-org/dendrite/setup/base" - "github.com/matrix-org/dendrite/setup/jetstream" - userapi "github.com/matrix-org/dendrite/userapi/api" -) - -// AddInternalRoutes registers HTTP handlers for the internal API. Invokes functions -// on the given input API. -func AddInternalRoutes(internalMux *mux.Router, inputAPI api.EDUServerInputAPI) { - inthttp.AddRoutes(inputAPI, internalMux) -} - -// NewInternalAPI returns a concerete implementation of the internal API. Callers -// can call functions directly on the returned API or via an HTTP interface using AddInternalRoutes. -func NewInternalAPI( - base *base.BaseDendrite, - eduCache *cache.EDUCache, - userAPI userapi.UserInternalAPI, -) api.EDUServerInputAPI { - cfg := &base.Cfg.EDUServer - - js, _ := jetstream.Prepare(base.ProcessContext, &cfg.Matrix.JetStream) - - return &input.EDUServerInputAPI{ - Cache: eduCache, - UserAPI: userAPI, - JetStream: js, - OutputTypingEventTopic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputTypingEvent), - OutputSendToDeviceEventTopic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputSendToDeviceEvent), - OutputReceiptEventTopic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputReceiptEvent), - ServerName: cfg.Matrix.ServerName, - } -} diff --git a/eduserver/input/input.go b/eduserver/input/input.go deleted file mode 100644 index e58f0dd3..00000000 --- a/eduserver/input/input.go +++ /dev/null @@ -1,198 +0,0 @@ -// Copyright 2017 Vector Creations Ltd -// Copyright 2017-2018 New Vector Ltd -// Copyright 2019-2020 The Matrix.org Foundation C.I.C. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package input - -import ( - "context" - "encoding/json" - "time" - - "github.com/matrix-org/dendrite/eduserver/api" - "github.com/matrix-org/dendrite/eduserver/cache" - userapi "github.com/matrix-org/dendrite/userapi/api" - "github.com/matrix-org/gomatrixserverlib" - "github.com/nats-io/nats.go" - "github.com/sirupsen/logrus" -) - -// EDUServerInputAPI implements api.EDUServerInputAPI -type EDUServerInputAPI struct { - // Cache to store the current typing members in each room. - Cache *cache.EDUCache - // The kafka topic to output new typing events to. - OutputTypingEventTopic string - // The kafka topic to output new send to device events to. - OutputSendToDeviceEventTopic string - // The kafka topic to output new receipt events to - OutputReceiptEventTopic string - // kafka producer - JetStream nats.JetStreamContext - // Internal user query API - UserAPI userapi.UserInternalAPI - // our server name - ServerName gomatrixserverlib.ServerName -} - -// InputTypingEvent implements api.EDUServerInputAPI -func (t *EDUServerInputAPI) InputTypingEvent( - ctx context.Context, - request *api.InputTypingEventRequest, - response *api.InputTypingEventResponse, -) error { - ite := &request.InputTypingEvent - if ite.Typing { - // user is typing, update our current state of users typing. - expireTime := ite.OriginServerTS.Time().Add( - time.Duration(ite.TimeoutMS) * time.Millisecond, - ) - t.Cache.AddTypingUser(ite.UserID, ite.RoomID, &expireTime) - } else { - t.Cache.RemoveUser(ite.UserID, ite.RoomID) - } - - return t.sendTypingEvent(ite) -} - -// InputTypingEvent implements api.EDUServerInputAPI -func (t *EDUServerInputAPI) InputSendToDeviceEvent( - ctx context.Context, - request *api.InputSendToDeviceEventRequest, - response *api.InputSendToDeviceEventResponse, -) error { - ise := &request.InputSendToDeviceEvent - return t.sendToDeviceEvent(ise) -} - -func (t *EDUServerInputAPI) sendTypingEvent(ite *api.InputTypingEvent) error { - ev := &api.TypingEvent{ - Type: gomatrixserverlib.MTyping, - RoomID: ite.RoomID, - UserID: ite.UserID, - Typing: ite.Typing, - } - ote := &api.OutputTypingEvent{ - Event: *ev, - } - - if ev.Typing { - expireTime := ite.OriginServerTS.Time().Add( - time.Duration(ite.TimeoutMS) * time.Millisecond, - ) - ote.ExpireTime = &expireTime - } - - eventJSON, err := json.Marshal(ote) - if err != nil { - return err - } - logrus.WithFields(logrus.Fields{ - "room_id": ite.RoomID, - "user_id": ite.UserID, - "typing": ite.Typing, - }).Tracef("Producing to topic '%s'", t.OutputTypingEventTopic) - - _, err = t.JetStream.PublishMsg(&nats.Msg{ - Subject: t.OutputTypingEventTopic, - Header: nats.Header{}, - Data: eventJSON, - }) - return err -} - -func (t *EDUServerInputAPI) sendToDeviceEvent(ise *api.InputSendToDeviceEvent) error { - devices := []string{} - _, domain, err := gomatrixserverlib.SplitID('@', ise.UserID) - if err != nil { - return err - } - - // If the event is targeted locally then we want to expand the wildcard - // out into individual device IDs so that we can send them to each respective - // device. If the event isn't targeted locally then we can't expand the - // wildcard as we don't know about the remote devices, so instead we leave it - // as-is, so that the federation sender can send it on with the wildcard intact. - if domain == t.ServerName && ise.DeviceID == "*" { - var res userapi.QueryDevicesResponse - err = t.UserAPI.QueryDevices(context.TODO(), &userapi.QueryDevicesRequest{ - UserID: ise.UserID, - }, &res) - if err != nil { - return err - } - for _, dev := range res.Devices { - devices = append(devices, dev.ID) - } - } else { - devices = append(devices, ise.DeviceID) - } - - logrus.WithFields(logrus.Fields{ - "user_id": ise.UserID, - "num_devices": len(devices), - "type": ise.Type, - }).Tracef("Producing to topic '%s'", t.OutputSendToDeviceEventTopic) - for _, device := range devices { - ote := &api.OutputSendToDeviceEvent{ - UserID: ise.UserID, - DeviceID: device, - SendToDeviceEvent: ise.SendToDeviceEvent, - } - - eventJSON, err := json.Marshal(ote) - if err != nil { - logrus.WithError(err).Error("sendToDevice failed json.Marshal") - return err - } - - if _, err = t.JetStream.PublishMsg(&nats.Msg{ - Subject: t.OutputSendToDeviceEventTopic, - Data: eventJSON, - }); err != nil { - logrus.WithError(err).Error("sendToDevice failed t.Producer.SendMessage") - return err - } - } - - return nil -} - -// InputReceiptEvent implements api.EDUServerInputAPI -// TODO: Intelligently batch requests sent by the same user (e.g wait a few milliseconds before emitting output events) -func (t *EDUServerInputAPI) InputReceiptEvent( - ctx context.Context, - request *api.InputReceiptEventRequest, - response *api.InputReceiptEventResponse, -) error { - logrus.WithFields(logrus.Fields{}).Tracef("Producing to topic '%s'", t.OutputReceiptEventTopic) - output := &api.OutputReceiptEvent{ - UserID: request.InputReceiptEvent.UserID, - RoomID: request.InputReceiptEvent.RoomID, - EventID: request.InputReceiptEvent.EventID, - Type: request.InputReceiptEvent.Type, - Timestamp: request.InputReceiptEvent.Timestamp, - } - js, err := json.Marshal(output) - if err != nil { - return err - } - - _, err = t.JetStream.PublishMsg(&nats.Msg{ - Subject: t.OutputReceiptEventTopic, - Data: js, - }) - return err -} diff --git a/eduserver/inthttp/client.go b/eduserver/inthttp/client.go deleted file mode 100644 index 0690ed82..00000000 --- a/eduserver/inthttp/client.go +++ /dev/null @@ -1,70 +0,0 @@ -package inthttp - -import ( - "context" - "errors" - "net/http" - - "github.com/matrix-org/dendrite/eduserver/api" - "github.com/matrix-org/dendrite/internal/httputil" - "github.com/opentracing/opentracing-go" -) - -// HTTP paths for the internal HTTP APIs -const ( - EDUServerInputTypingEventPath = "/eduserver/input" - EDUServerInputSendToDeviceEventPath = "/eduserver/sendToDevice" - EDUServerInputReceiptEventPath = "/eduserver/receipt" -) - -// NewEDUServerClient creates a EDUServerInputAPI implemented by talking to a HTTP POST API. -func NewEDUServerClient(eduServerURL string, httpClient *http.Client) (api.EDUServerInputAPI, error) { - if httpClient == nil { - return nil, errors.New("NewEDUServerClient: httpClient is ") - } - return &httpEDUServerInputAPI{eduServerURL, httpClient}, nil -} - -type httpEDUServerInputAPI struct { - eduServerURL string - httpClient *http.Client -} - -// InputTypingEvent implements EDUServerInputAPI -func (h *httpEDUServerInputAPI) InputTypingEvent( - ctx context.Context, - request *api.InputTypingEventRequest, - response *api.InputTypingEventResponse, -) error { - span, ctx := opentracing.StartSpanFromContext(ctx, "InputTypingEvent") - defer span.Finish() - - apiURL := h.eduServerURL + EDUServerInputTypingEventPath - return httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response) -} - -// InputSendToDeviceEvent implements EDUServerInputAPI -func (h *httpEDUServerInputAPI) InputSendToDeviceEvent( - ctx context.Context, - request *api.InputSendToDeviceEventRequest, - response *api.InputSendToDeviceEventResponse, -) error { - span, ctx := opentracing.StartSpanFromContext(ctx, "InputSendToDeviceEvent") - defer span.Finish() - - apiURL := h.eduServerURL + EDUServerInputSendToDeviceEventPath - return httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response) -} - -// InputSendToDeviceEvent implements EDUServerInputAPI -func (h *httpEDUServerInputAPI) InputReceiptEvent( - ctx context.Context, - request *api.InputReceiptEventRequest, - response *api.InputReceiptEventResponse, -) error { - span, ctx := opentracing.StartSpanFromContext(ctx, "InputReceiptEventPath") - defer span.Finish() - - apiURL := h.eduServerURL + EDUServerInputReceiptEventPath - return httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response) -} diff --git a/eduserver/inthttp/server.go b/eduserver/inthttp/server.go deleted file mode 100644 index a3494375..00000000 --- a/eduserver/inthttp/server.go +++ /dev/null @@ -1,54 +0,0 @@ -package inthttp - -import ( - "encoding/json" - "net/http" - - "github.com/gorilla/mux" - "github.com/matrix-org/dendrite/eduserver/api" - "github.com/matrix-org/dendrite/internal/httputil" - "github.com/matrix-org/util" -) - -// AddRoutes adds the EDUServerInputAPI handlers to the http.ServeMux. -func AddRoutes(t api.EDUServerInputAPI, internalAPIMux *mux.Router) { - internalAPIMux.Handle(EDUServerInputTypingEventPath, - httputil.MakeInternalAPI("inputTypingEvents", func(req *http.Request) util.JSONResponse { - var request api.InputTypingEventRequest - var response api.InputTypingEventResponse - if err := json.NewDecoder(req.Body).Decode(&request); err != nil { - return util.MessageResponse(http.StatusBadRequest, err.Error()) - } - if err := t.InputTypingEvent(req.Context(), &request, &response); err != nil { - return util.ErrorResponse(err) - } - return util.JSONResponse{Code: http.StatusOK, JSON: &response} - }), - ) - internalAPIMux.Handle(EDUServerInputSendToDeviceEventPath, - httputil.MakeInternalAPI("inputSendToDeviceEvents", func(req *http.Request) util.JSONResponse { - var request api.InputSendToDeviceEventRequest - var response api.InputSendToDeviceEventResponse - if err := json.NewDecoder(req.Body).Decode(&request); err != nil { - return util.MessageResponse(http.StatusBadRequest, err.Error()) - } - if err := t.InputSendToDeviceEvent(req.Context(), &request, &response); err != nil { - return util.ErrorResponse(err) - } - return util.JSONResponse{Code: http.StatusOK, JSON: &response} - }), - ) - internalAPIMux.Handle(EDUServerInputReceiptEventPath, - httputil.MakeInternalAPI("inputReceiptEvent", func(req *http.Request) util.JSONResponse { - var request api.InputReceiptEventRequest - var response api.InputReceiptEventResponse - if err := json.NewDecoder(req.Body).Decode(&request); err != nil { - return util.MessageResponse(http.StatusBadRequest, err.Error()) - } - if err := t.InputReceiptEvent(req.Context(), &request, &response); err != nil { - return util.ErrorResponse(err) - } - return util.JSONResponse{Code: http.StatusOK, JSON: &response} - }), - ) -} diff --git a/federationapi/consumers/eduserver.go b/federationapi/consumers/eduserver.go deleted file mode 100644 index e14e60f4..00000000 --- a/federationapi/consumers/eduserver.go +++ /dev/null @@ -1,257 +0,0 @@ -// Copyright 2020 The Matrix.org Foundation C.I.C. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package consumers - -import ( - "context" - "encoding/json" - - "github.com/matrix-org/dendrite/eduserver/api" - "github.com/matrix-org/dendrite/federationapi/queue" - "github.com/matrix-org/dendrite/federationapi/storage" - "github.com/matrix-org/dendrite/setup/config" - "github.com/matrix-org/dendrite/setup/jetstream" - "github.com/matrix-org/dendrite/setup/process" - "github.com/matrix-org/gomatrixserverlib" - "github.com/matrix-org/util" - "github.com/nats-io/nats.go" - log "github.com/sirupsen/logrus" -) - -// OutputEDUConsumer consumes events that originate in EDU server. -type OutputEDUConsumer struct { - ctx context.Context - jetstream nats.JetStreamContext - durable string - db storage.Database - queues *queue.OutgoingQueues - ServerName gomatrixserverlib.ServerName - typingTopic string - sendToDeviceTopic string - receiptTopic string -} - -// NewOutputEDUConsumer creates a new OutputEDUConsumer. Call Start() to begin consuming from EDU servers. -func NewOutputEDUConsumer( - process *process.ProcessContext, - cfg *config.FederationAPI, - js nats.JetStreamContext, - queues *queue.OutgoingQueues, - store storage.Database, -) *OutputEDUConsumer { - return &OutputEDUConsumer{ - ctx: process.Context(), - jetstream: js, - queues: queues, - db: store, - ServerName: cfg.Matrix.ServerName, - durable: cfg.Matrix.JetStream.Durable("FederationAPIEDUServerConsumer"), - typingTopic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputTypingEvent), - sendToDeviceTopic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputSendToDeviceEvent), - receiptTopic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputReceiptEvent), - } -} - -// Start consuming from EDU servers -func (t *OutputEDUConsumer) Start() error { - if err := jetstream.JetStreamConsumer( - t.ctx, t.jetstream, t.typingTopic, t.durable, t.onTypingEvent, - nats.DeliverAll(), nats.ManualAck(), - ); err != nil { - return err - } - if err := jetstream.JetStreamConsumer( - t.ctx, t.jetstream, t.sendToDeviceTopic, t.durable, t.onSendToDeviceEvent, - nats.DeliverAll(), nats.ManualAck(), - ); err != nil { - return err - } - if err := jetstream.JetStreamConsumer( - t.ctx, t.jetstream, t.receiptTopic, t.durable, t.onReceiptEvent, - nats.DeliverAll(), nats.ManualAck(), - ); err != nil { - return err - } - return nil -} - -// onSendToDeviceEvent is called in response to a message received on the -// send-to-device events topic from the EDU server. -func (t *OutputEDUConsumer) onSendToDeviceEvent(ctx context.Context, msg *nats.Msg) bool { - // Extract the send-to-device event from msg. - var ote api.OutputSendToDeviceEvent - if err := json.Unmarshal(msg.Data, &ote); err != nil { - log.WithError(err).Errorf("eduserver output log: message parse failed (expected send-to-device)") - return true - } - - // only send send-to-device events which originated from us - _, originServerName, err := gomatrixserverlib.SplitID('@', ote.Sender) - if err != nil { - log.WithError(err).WithField("user_id", ote.Sender).Error("Failed to extract domain from send-to-device sender") - return true - } - if originServerName != t.ServerName { - log.WithField("other_server", originServerName).Info("Suppressing send-to-device: originated elsewhere") - return true - } - - _, destServerName, err := gomatrixserverlib.SplitID('@', ote.UserID) - if err != nil { - log.WithError(err).WithField("user_id", ote.UserID).Error("Failed to extract domain from send-to-device destination") - return true - } - - // Pack the EDU and marshal it - edu := &gomatrixserverlib.EDU{ - Type: gomatrixserverlib.MDirectToDevice, - Origin: string(t.ServerName), - } - tdm := gomatrixserverlib.ToDeviceMessage{ - Sender: ote.Sender, - Type: ote.Type, - MessageID: util.RandomString(32), - Messages: map[string]map[string]json.RawMessage{ - ote.UserID: { - ote.DeviceID: ote.Content, - }, - }, - } - if edu.Content, err = json.Marshal(tdm); err != nil { - log.WithError(err).Error("failed to marshal EDU JSON") - return true - } - - log.Debugf("Sending send-to-device message into %q destination queue", destServerName) - if err := t.queues.SendEDU(edu, t.ServerName, []gomatrixserverlib.ServerName{destServerName}); err != nil { - log.WithError(err).Error("failed to send EDU") - return false - } - - return true -} - -// onTypingEvent is called in response to a message received on the typing -// events topic from the EDU server. -func (t *OutputEDUConsumer) onTypingEvent(ctx context.Context, msg *nats.Msg) bool { - // Extract the typing event from msg. - var ote api.OutputTypingEvent - if err := json.Unmarshal(msg.Data, &ote); err != nil { - // Skip this msg but continue processing messages. - log.WithError(err).Errorf("eduserver output log: message parse failed (expected typing)") - _ = msg.Ack() - return true - } - - // only send typing events which originated from us - _, typingServerName, err := gomatrixserverlib.SplitID('@', ote.Event.UserID) - if err != nil { - log.WithError(err).WithField("user_id", ote.Event.UserID).Error("Failed to extract domain from typing sender") - _ = msg.Ack() - return true - } - if typingServerName != t.ServerName { - return true - } - - joined, err := t.db.GetJoinedHosts(ctx, ote.Event.RoomID) - if err != nil { - log.WithError(err).WithField("room_id", ote.Event.RoomID).Error("failed to get joined hosts for room") - return false - } - - names := make([]gomatrixserverlib.ServerName, len(joined)) - for i := range joined { - names[i] = joined[i].ServerName - } - - edu := &gomatrixserverlib.EDU{Type: ote.Event.Type} - if edu.Content, err = json.Marshal(map[string]interface{}{ - "room_id": ote.Event.RoomID, - "user_id": ote.Event.UserID, - "typing": ote.Event.Typing, - }); err != nil { - log.WithError(err).Error("failed to marshal EDU JSON") - return true - } - - if err := t.queues.SendEDU(edu, t.ServerName, names); err != nil { - log.WithError(err).Error("failed to send EDU") - return false - } - - return true -} - -// onReceiptEvent is called in response to a message received on the receipt -// events topic from the EDU server. -func (t *OutputEDUConsumer) onReceiptEvent(ctx context.Context, msg *nats.Msg) bool { - // Extract the typing event from msg. - var receipt api.OutputReceiptEvent - if err := json.Unmarshal(msg.Data, &receipt); err != nil { - // Skip this msg but continue processing messages. - log.WithError(err).Errorf("eduserver output log: message parse failed (expected receipt)") - return true - } - - // only send receipt events which originated from us - _, receiptServerName, err := gomatrixserverlib.SplitID('@', receipt.UserID) - if err != nil { - log.WithError(err).WithField("user_id", receipt.UserID).Error("failed to extract domain from receipt sender") - return true - } - if receiptServerName != t.ServerName { - return true - } - - joined, err := t.db.GetJoinedHosts(ctx, receipt.RoomID) - if err != nil { - log.WithError(err).WithField("room_id", receipt.RoomID).Error("failed to get joined hosts for room") - return false - } - - names := make([]gomatrixserverlib.ServerName, len(joined)) - for i := range joined { - names[i] = joined[i].ServerName - } - - content := map[string]api.FederationReceiptMRead{} - content[receipt.RoomID] = api.FederationReceiptMRead{ - User: map[string]api.FederationReceiptData{ - receipt.UserID: { - Data: api.ReceiptTS{ - TS: receipt.Timestamp, - }, - EventIDs: []string{receipt.EventID}, - }, - }, - } - - edu := &gomatrixserverlib.EDU{ - Type: gomatrixserverlib.MReceipt, - Origin: string(t.ServerName), - } - if edu.Content, err = json.Marshal(content); err != nil { - log.WithError(err).Error("failed to marshal EDU JSON") - return true - } - - if err := t.queues.SendEDU(edu, t.ServerName, names); err != nil { - log.WithError(err).Error("failed to send EDU") - return false - } - - return true -} diff --git a/federationapi/consumers/keychange.go b/federationapi/consumers/keychange.go index 94e45435..0ece18e9 100644 --- a/federationapi/consumers/keychange.go +++ b/federationapi/consumers/keychange.go @@ -18,9 +18,9 @@ import ( "context" "encoding/json" - eduserverAPI "github.com/matrix-org/dendrite/eduserver/api" "github.com/matrix-org/dendrite/federationapi/queue" "github.com/matrix-org/dendrite/federationapi/storage" + "github.com/matrix-org/dendrite/federationapi/types" "github.com/matrix-org/dendrite/keyserver/api" roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/setup/config" @@ -190,7 +190,7 @@ func (t *KeyChangeConsumer) onCrossSigningMessage(m api.DeviceMessage) bool { // Pack the EDU and marshal it edu := &gomatrixserverlib.EDU{ - Type: eduserverAPI.MSigningKeyUpdate, + Type: types.MSigningKeyUpdate, Origin: string(t.serverName), } if edu.Content, err = json.Marshal(output); err != nil { diff --git a/federationapi/consumers/receipts.go b/federationapi/consumers/receipts.go new file mode 100644 index 00000000..9300451e --- /dev/null +++ b/federationapi/consumers/receipts.go @@ -0,0 +1,141 @@ +// Copyright 2022 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package consumers + +import ( + "context" + "encoding/json" + "strconv" + + "github.com/getsentry/sentry-go" + "github.com/matrix-org/dendrite/federationapi/queue" + "github.com/matrix-org/dendrite/federationapi/storage" + fedTypes "github.com/matrix-org/dendrite/federationapi/types" + "github.com/matrix-org/dendrite/setup/config" + "github.com/matrix-org/dendrite/setup/jetstream" + "github.com/matrix-org/dendrite/setup/process" + syncTypes "github.com/matrix-org/dendrite/syncapi/types" + "github.com/matrix-org/gomatrixserverlib" + "github.com/nats-io/nats.go" + log "github.com/sirupsen/logrus" +) + +// OutputReceiptConsumer consumes events that originate in the clientapi. +type OutputReceiptConsumer struct { + ctx context.Context + jetstream nats.JetStreamContext + durable string + db storage.Database + queues *queue.OutgoingQueues + ServerName gomatrixserverlib.ServerName + topic string +} + +// NewOutputReceiptConsumer creates a new OutputReceiptConsumer. Call Start() to begin consuming typing events. +func NewOutputReceiptConsumer( + process *process.ProcessContext, + cfg *config.FederationAPI, + js nats.JetStreamContext, + queues *queue.OutgoingQueues, + store storage.Database, +) *OutputReceiptConsumer { + return &OutputReceiptConsumer{ + ctx: process.Context(), + jetstream: js, + queues: queues, + db: store, + ServerName: cfg.Matrix.ServerName, + durable: cfg.Matrix.JetStream.Durable("FederationAPIReceiptConsumer"), + topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputReceiptEvent), + } +} + +// Start consuming from the clientapi +func (t *OutputReceiptConsumer) Start() error { + return jetstream.JetStreamConsumer( + t.ctx, t.jetstream, t.topic, t.durable, t.onMessage, + nats.DeliverAll(), nats.ManualAck(), nats.HeadersOnly(), + ) +} + +// onMessage is called in response to a message received on the receipt +// events topic from the client api. +func (t *OutputReceiptConsumer) onMessage(ctx context.Context, msg *nats.Msg) bool { + receipt := syncTypes.OutputReceiptEvent{ + UserID: msg.Header.Get(jetstream.UserID), + RoomID: msg.Header.Get(jetstream.RoomID), + EventID: msg.Header.Get(jetstream.EventID), + Type: msg.Header.Get("type"), + } + + // only send receipt events which originated from us + _, receiptServerName, err := gomatrixserverlib.SplitID('@', receipt.UserID) + if err != nil { + log.WithError(err).WithField("user_id", receipt.UserID).Error("failed to extract domain from receipt sender") + return true + } + if receiptServerName != t.ServerName { + return true + } + + timestamp, err := strconv.Atoi(msg.Header.Get("timestamp")) + if err != nil { + // If the message was invalid, log it and move on to the next message in the stream + log.WithError(err).Errorf("EDU output log: message parse failure") + sentry.CaptureException(err) + return true + } + + receipt.Timestamp = gomatrixserverlib.Timestamp(timestamp) + + joined, err := t.db.GetJoinedHosts(ctx, receipt.RoomID) + if err != nil { + log.WithError(err).WithField("room_id", receipt.RoomID).Error("failed to get joined hosts for room") + return false + } + + names := make([]gomatrixserverlib.ServerName, len(joined)) + for i := range joined { + names[i] = joined[i].ServerName + } + + content := map[string]fedTypes.FederationReceiptMRead{} + content[receipt.RoomID] = fedTypes.FederationReceiptMRead{ + User: map[string]fedTypes.FederationReceiptData{ + receipt.UserID: { + Data: fedTypes.ReceiptTS{ + TS: receipt.Timestamp, + }, + EventIDs: []string{receipt.EventID}, + }, + }, + } + + edu := &gomatrixserverlib.EDU{ + Type: gomatrixserverlib.MReceipt, + Origin: string(t.ServerName), + } + if edu.Content, err = json.Marshal(content); err != nil { + log.WithError(err).Error("failed to marshal EDU JSON") + return true + } + + if err := t.queues.SendEDU(edu, t.ServerName, names); err != nil { + log.WithError(err).Error("failed to send EDU") + return false + } + + return true +} diff --git a/federationapi/consumers/sendtodevice.go b/federationapi/consumers/sendtodevice.go new file mode 100644 index 00000000..84c9f620 --- /dev/null +++ b/federationapi/consumers/sendtodevice.go @@ -0,0 +1,125 @@ +// Copyright 2022 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package consumers + +import ( + "context" + "encoding/json" + + "github.com/matrix-org/dendrite/federationapi/queue" + "github.com/matrix-org/dendrite/federationapi/storage" + "github.com/matrix-org/dendrite/setup/config" + "github.com/matrix-org/dendrite/setup/jetstream" + "github.com/matrix-org/dendrite/setup/process" + syncTypes "github.com/matrix-org/dendrite/syncapi/types" + "github.com/matrix-org/gomatrixserverlib" + "github.com/matrix-org/util" + "github.com/nats-io/nats.go" + log "github.com/sirupsen/logrus" +) + +// OutputSendToDeviceConsumer consumes events that originate in the clientapi. +type OutputSendToDeviceConsumer struct { + ctx context.Context + jetstream nats.JetStreamContext + durable string + db storage.Database + queues *queue.OutgoingQueues + ServerName gomatrixserverlib.ServerName + topic string +} + +// NewOutputSendToDeviceConsumer creates a new OutputSendToDeviceConsumer. Call Start() to begin consuming send-to-device events. +func NewOutputSendToDeviceConsumer( + process *process.ProcessContext, + cfg *config.FederationAPI, + js nats.JetStreamContext, + queues *queue.OutgoingQueues, + store storage.Database, +) *OutputSendToDeviceConsumer { + return &OutputSendToDeviceConsumer{ + ctx: process.Context(), + jetstream: js, + queues: queues, + db: store, + ServerName: cfg.Matrix.ServerName, + durable: cfg.Matrix.JetStream.Durable("FederationAPIESendToDeviceConsumer"), + topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputSendToDeviceEvent), + } +} + +// Start consuming from the client api +func (t *OutputSendToDeviceConsumer) Start() error { + return jetstream.JetStreamConsumer( + t.ctx, t.jetstream, t.topic, t.durable, t.onMessage, + nats.DeliverAll(), nats.ManualAck(), + ) +} + +// onMessage is called in response to a message received on the +// send-to-device events topic from the client api. +func (t *OutputSendToDeviceConsumer) onMessage(ctx context.Context, msg *nats.Msg) bool { + // only send send-to-device events which originated from us + sender := msg.Header.Get("sender") + _, originServerName, err := gomatrixserverlib.SplitID('@', sender) + if err != nil { + log.WithError(err).WithField("user_id", sender).Error("Failed to extract domain from send-to-device sender") + return true + } + if originServerName != t.ServerName { + log.WithField("other_server", originServerName).Info("Suppressing send-to-device: originated elsewhere") + return true + } + // Extract the send-to-device event from msg. + var ote syncTypes.OutputSendToDeviceEvent + if err = json.Unmarshal(msg.Data, &ote); err != nil { + log.WithError(err).Errorf("output log: message parse failed (expected send-to-device)") + return true + } + + _, destServerName, err := gomatrixserverlib.SplitID('@', ote.UserID) + if err != nil { + log.WithError(err).WithField("user_id", ote.UserID).Error("Failed to extract domain from send-to-device destination") + return true + } + + // Pack the EDU and marshal it + edu := &gomatrixserverlib.EDU{ + Type: gomatrixserverlib.MDirectToDevice, + Origin: string(t.ServerName), + } + tdm := gomatrixserverlib.ToDeviceMessage{ + Sender: ote.Sender, + Type: ote.Type, + MessageID: util.RandomString(32), + Messages: map[string]map[string]json.RawMessage{ + ote.UserID: { + ote.DeviceID: ote.Content, + }, + }, + } + if edu.Content, err = json.Marshal(tdm); err != nil { + log.WithError(err).Error("failed to marshal EDU JSON") + return true + } + + log.Debugf("Sending send-to-device message into %q destination queue", destServerName) + if err := t.queues.SendEDU(edu, t.ServerName, []gomatrixserverlib.ServerName{destServerName}); err != nil { + log.WithError(err).Error("failed to send EDU") + return false + } + + return true +} diff --git a/federationapi/consumers/typing.go b/federationapi/consumers/typing.go new file mode 100644 index 00000000..428e1a86 --- /dev/null +++ b/federationapi/consumers/typing.go @@ -0,0 +1,119 @@ +// Copyright 2022 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package consumers + +import ( + "context" + "encoding/json" + "strconv" + + "github.com/matrix-org/dendrite/federationapi/queue" + "github.com/matrix-org/dendrite/federationapi/storage" + "github.com/matrix-org/dendrite/setup/config" + "github.com/matrix-org/dendrite/setup/jetstream" + "github.com/matrix-org/dendrite/setup/process" + "github.com/matrix-org/gomatrixserverlib" + "github.com/nats-io/nats.go" + log "github.com/sirupsen/logrus" +) + +// OutputTypingConsumer consumes events that originate in the clientapi. +type OutputTypingConsumer struct { + ctx context.Context + jetstream nats.JetStreamContext + durable string + db storage.Database + queues *queue.OutgoingQueues + ServerName gomatrixserverlib.ServerName + topic string +} + +// NewOutputTypingConsumer creates a new OutputTypingConsumer. Call Start() to begin consuming typing events. +func NewOutputTypingConsumer( + process *process.ProcessContext, + cfg *config.FederationAPI, + js nats.JetStreamContext, + queues *queue.OutgoingQueues, + store storage.Database, +) *OutputTypingConsumer { + return &OutputTypingConsumer{ + ctx: process.Context(), + jetstream: js, + queues: queues, + db: store, + ServerName: cfg.Matrix.ServerName, + durable: cfg.Matrix.JetStream.Durable("FederationAPITypingConsumer"), + topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputTypingEvent), + } +} + +// Start consuming from the clientapi +func (t *OutputTypingConsumer) Start() error { + return jetstream.JetStreamConsumer( + t.ctx, t.jetstream, t.topic, t.durable, t.onMessage, + nats.DeliverAll(), nats.ManualAck(), nats.HeadersOnly(), + ) +} + +// onMessage is called in response to a message received on the typing +// events topic from the client api. +func (t *OutputTypingConsumer) onMessage(ctx context.Context, msg *nats.Msg) bool { + // Extract the typing event from msg. + roomID := msg.Header.Get(jetstream.RoomID) + userID := msg.Header.Get(jetstream.UserID) + typing, err := strconv.ParseBool(msg.Header.Get("typing")) + if err != nil { + log.WithError(err).Errorf("EDU output log: typing parse failure") + return true + } + + // only send typing events which originated from us + _, typingServerName, err := gomatrixserverlib.SplitID('@', userID) + if err != nil { + log.WithError(err).WithField("user_id", userID).Error("Failed to extract domain from typing sender") + _ = msg.Ack() + return true + } + if typingServerName != t.ServerName { + return true + } + + joined, err := t.db.GetJoinedHosts(ctx, roomID) + if err != nil { + log.WithError(err).WithField("room_id", roomID).Error("failed to get joined hosts for room") + return false + } + + names := make([]gomatrixserverlib.ServerName, len(joined)) + for i := range joined { + names[i] = joined[i].ServerName + } + + edu := &gomatrixserverlib.EDU{Type: "m.typing"} + if edu.Content, err = json.Marshal(map[string]interface{}{ + "room_id": roomID, + "user_id": userID, + "typing": typing, + }); err != nil { + log.WithError(err).Error("failed to marshal EDU JSON") + return true + } + if err := t.queues.SendEDU(edu, t.ServerName, names); err != nil { + log.WithError(err).Error("failed to send EDU") + return false + } + + return true +} diff --git a/federationapi/federationapi.go b/federationapi/federationapi.go index b7f93ecb..8a0ce8e3 100644 --- a/federationapi/federationapi.go +++ b/federationapi/federationapi.go @@ -16,12 +16,12 @@ package federationapi import ( "github.com/gorilla/mux" - eduserverAPI "github.com/matrix-org/dendrite/eduserver/api" "github.com/matrix-org/dendrite/federationapi/api" federationAPI "github.com/matrix-org/dendrite/federationapi/api" "github.com/matrix-org/dendrite/federationapi/consumers" "github.com/matrix-org/dendrite/federationapi/internal" "github.com/matrix-org/dendrite/federationapi/inthttp" + "github.com/matrix-org/dendrite/federationapi/producers" "github.com/matrix-org/dendrite/federationapi/queue" "github.com/matrix-org/dendrite/federationapi/statistics" "github.com/matrix-org/dendrite/federationapi/storage" @@ -31,6 +31,7 @@ import ( "github.com/matrix-org/dendrite/setup/base" "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/setup/jetstream" + "github.com/matrix-org/dendrite/setup/process" userapi "github.com/matrix-org/dendrite/userapi/api" "github.com/sirupsen/logrus" @@ -46,6 +47,7 @@ func AddInternalRoutes(router *mux.Router, intAPI api.FederationInternalAPI) { // AddPublicRoutes sets up and registers HTTP handlers on the base API muxes for the FederationAPI component. func AddPublicRoutes( + process *process.ProcessContext, fedRouter, keyRouter, wellKnownRouter *mux.Router, cfg *config.FederationAPI, userAPI userapi.UserInternalAPI, @@ -53,16 +55,26 @@ func AddPublicRoutes( keyRing gomatrixserverlib.JSONVerifier, rsAPI roomserverAPI.RoomserverInternalAPI, federationAPI federationAPI.FederationInternalAPI, - eduAPI eduserverAPI.EDUServerInputAPI, keyAPI keyserverAPI.KeyInternalAPI, mscCfg *config.MSCs, servers federationAPI.ServersInRoomProvider, ) { + + js, _ := jetstream.Prepare(process, &cfg.Matrix.JetStream) + producer := &producers.SyncAPIProducer{ + JetStream: js, + TopicReceiptEvent: cfg.Matrix.JetStream.Prefixed(jetstream.OutputReceiptEvent), + TopicSendToDeviceEvent: cfg.Matrix.JetStream.Prefixed(jetstream.OutputSendToDeviceEvent), + TopicTypingEvent: cfg.Matrix.JetStream.Prefixed(jetstream.OutputTypingEvent), + ServerName: cfg.Matrix.ServerName, + UserAPI: userAPI, + } + routing.Setup( fedRouter, keyRouter, wellKnownRouter, cfg, rsAPI, - eduAPI, federationAPI, keyRing, + federationAPI, keyRing, federation, userAPI, keyAPI, mscCfg, - servers, + servers, producer, ) } @@ -112,17 +124,28 @@ func NewInternalAPI( if err = rsConsumer.Start(); err != nil { logrus.WithError(err).Panic("failed to start room server consumer") } - - tsConsumer := consumers.NewOutputEDUConsumer( + tsConsumer := consumers.NewOutputSendToDeviceConsumer( base.ProcessContext, cfg, js, queues, federationDB, ) - if err := tsConsumer.Start(); err != nil { - logrus.WithError(err).Panic("failed to start typing server consumer") + if err = tsConsumer.Start(); err != nil { + logrus.WithError(err).Panic("failed to start send-to-device consumer") + } + receiptConsumer := consumers.NewOutputReceiptConsumer( + base.ProcessContext, cfg, js, queues, federationDB, + ) + if err = receiptConsumer.Start(); err != nil { + logrus.WithError(err).Panic("failed to start receipt consumer") + } + typingConsumer := consumers.NewOutputTypingConsumer( + base.ProcessContext, cfg, js, queues, federationDB, + ) + if err = typingConsumer.Start(); err != nil { + logrus.WithError(err).Panic("failed to start typing consumer") } keyConsumer := consumers.NewKeyChangeConsumer( base.ProcessContext, &base.Cfg.KeyServer, js, queues, federationDB, rsAPI, ) - if err := keyConsumer.Start(); err != nil { + if err = keyConsumer.Start(); err != nil { logrus.WithError(err).Panic("failed to start key server consumer") } diff --git a/federationapi/federationapi_test.go b/federationapi/federationapi_test.go index c660f12e..833359c1 100644 --- a/federationapi/federationapi_test.go +++ b/federationapi/federationapi_test.go @@ -30,7 +30,7 @@ func TestRoomsV3URLEscapeDoNot404(t *testing.T) { fsAPI := base.FederationAPIHTTPClient() // TODO: This is pretty fragile, as if anything calls anything on these nils this test will break. // Unfortunately, it makes little sense to instantiate these dependencies when we just want to test routing. - federationapi.AddPublicRoutes(base.PublicFederationAPIMux, base.PublicKeyAPIMux, base.PublicWellKnownAPIMux, &cfg.FederationAPI, nil, nil, keyRing, nil, fsAPI, nil, nil, &cfg.MSCs, nil) + federationapi.AddPublicRoutes(base.ProcessContext, base.PublicFederationAPIMux, base.PublicKeyAPIMux, base.PublicWellKnownAPIMux, &cfg.FederationAPI, nil, nil, keyRing, nil, fsAPI, nil, &cfg.MSCs, nil) baseURL, cancel := test.ListenAndServe(t, base.PublicFederationAPIMux, true) defer cancel() serverName := gomatrixserverlib.ServerName(strings.TrimPrefix(baseURL, "https://")) diff --git a/federationapi/producers/syncapi.go b/federationapi/producers/syncapi.go new file mode 100644 index 00000000..24acb126 --- /dev/null +++ b/federationapi/producers/syncapi.go @@ -0,0 +1,144 @@ +// Copyright 2022 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package producers + +import ( + "context" + "encoding/json" + "strconv" + + "github.com/matrix-org/dendrite/setup/jetstream" + "github.com/matrix-org/dendrite/syncapi/types" + userapi "github.com/matrix-org/dendrite/userapi/api" + "github.com/matrix-org/gomatrixserverlib" + "github.com/nats-io/nats.go" + log "github.com/sirupsen/logrus" +) + +// SyncAPIProducer produces events for the sync API server to consume +type SyncAPIProducer struct { + TopicReceiptEvent string + TopicSendToDeviceEvent string + TopicTypingEvent string + JetStream nats.JetStreamContext + ServerName gomatrixserverlib.ServerName + UserAPI userapi.UserInternalAPI +} + +func (p *SyncAPIProducer) SendReceipt( + ctx context.Context, + userID, roomID, eventID, receiptType string, timestamp gomatrixserverlib.Timestamp, +) error { + m := &nats.Msg{ + Subject: p.TopicReceiptEvent, + Header: nats.Header{}, + } + m.Header.Set(jetstream.UserID, userID) + m.Header.Set(jetstream.RoomID, roomID) + m.Header.Set(jetstream.EventID, eventID) + m.Header.Set("type", receiptType) + m.Header.Set("timestamp", strconv.Itoa(int(timestamp))) + + log.WithFields(log.Fields{}).Tracef("Producing to topic '%s'", p.TopicReceiptEvent) + _, err := p.JetStream.PublishMsg(m, nats.Context(ctx)) + return err +} + +func (p *SyncAPIProducer) SendToDevice( + ctx context.Context, sender, userID, deviceID, eventType string, + message interface{}, +) error { + devices := []string{} + _, domain, err := gomatrixserverlib.SplitID('@', userID) + if err != nil { + return err + } + + // If the event is targeted locally then we want to expand the wildcard + // out into individual device IDs so that we can send them to each respective + // device. If the event isn't targeted locally then we can't expand the + // wildcard as we don't know about the remote devices, so instead we leave it + // as-is, so that the federation sender can send it on with the wildcard intact. + if domain == p.ServerName && deviceID == "*" { + var res userapi.QueryDevicesResponse + err = p.UserAPI.QueryDevices(context.TODO(), &userapi.QueryDevicesRequest{ + UserID: userID, + }, &res) + if err != nil { + return err + } + for _, dev := range res.Devices { + devices = append(devices, dev.ID) + } + } else { + devices = append(devices, deviceID) + } + + js, err := json.Marshal(message) + if err != nil { + return err + } + + log.WithFields(log.Fields{ + "user_id": userID, + "num_devices": len(devices), + "type": eventType, + }).Tracef("Producing to topic '%s'", p.TopicSendToDeviceEvent) + for _, device := range devices { + ote := &types.OutputSendToDeviceEvent{ + UserID: userID, + DeviceID: device, + SendToDeviceEvent: gomatrixserverlib.SendToDeviceEvent{ + Sender: sender, + Type: eventType, + Content: js, + }, + } + + eventJSON, err := json.Marshal(ote) + if err != nil { + log.WithError(err).Error("sendToDevice failed json.Marshal") + return err + } + m := &nats.Msg{ + Subject: p.TopicSendToDeviceEvent, + Data: eventJSON, + Header: nats.Header{}, + } + m.Header.Set("sender", sender) + m.Header.Set(jetstream.UserID, userID) + + if _, err = p.JetStream.PublishMsg(m, nats.Context(ctx)); err != nil { + log.WithError(err).Error("sendToDevice failed t.Producer.SendMessage") + return err + } + } + return nil +} + +func (p *SyncAPIProducer) SendTyping( + ctx context.Context, userID, roomID string, typing bool, timeoutMS int64, +) error { + m := &nats.Msg{ + Subject: p.TopicTypingEvent, + Header: nats.Header{}, + } + m.Header.Set(jetstream.UserID, userID) + m.Header.Set(jetstream.RoomID, roomID) + m.Header.Set("typing", strconv.FormatBool(typing)) + m.Header.Set("timeout_ms", strconv.Itoa(int(timeoutMS))) + _, err := p.JetStream.PublishMsg(m, nats.Context(ctx)) + return err +} diff --git a/federationapi/routing/routing.go b/federationapi/routing/routing.go index 04c88d95..9e5cdb28 100644 --- a/federationapi/routing/routing.go +++ b/federationapi/routing/routing.go @@ -19,8 +19,8 @@ import ( "github.com/gorilla/mux" "github.com/matrix-org/dendrite/clientapi/jsonerror" - eduserverAPI "github.com/matrix-org/dendrite/eduserver/api" federationAPI "github.com/matrix-org/dendrite/federationapi/api" + "github.com/matrix-org/dendrite/federationapi/producers" "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal/httputil" keyserverAPI "github.com/matrix-org/dendrite/keyserver/api" @@ -44,7 +44,6 @@ func Setup( fedMux, keyMux, wkMux *mux.Router, cfg *config.FederationAPI, rsAPI roomserverAPI.RoomserverInternalAPI, - eduAPI eduserverAPI.EDUServerInputAPI, fsAPI federationAPI.FederationInternalAPI, keys gomatrixserverlib.JSONVerifier, federation *gomatrixserverlib.FederationClient, @@ -52,6 +51,7 @@ func Setup( keyAPI keyserverAPI.KeyInternalAPI, mscCfg *config.MSCs, servers federationAPI.ServersInRoomProvider, + producer *producers.SyncAPIProducer, ) { v2keysmux := keyMux.PathPrefix("/v2").Subrouter() v1fedmux := fedMux.PathPrefix("/v1").Subrouter() @@ -116,7 +116,7 @@ func Setup( func(httpReq *http.Request, request *gomatrixserverlib.FederationRequest, vars map[string]string) util.JSONResponse { return Send( httpReq, request, gomatrixserverlib.TransactionID(vars["txnID"]), - cfg, rsAPI, eduAPI, keyAPI, keys, federation, mu, servers, + cfg, rsAPI, keyAPI, keys, federation, mu, servers, producer, ) }, )).Methods(http.MethodPut, http.MethodOptions) diff --git a/federationapi/routing/send.go b/federationapi/routing/send.go index 745e36de..eacc76db 100644 --- a/federationapi/routing/send.go +++ b/federationapi/routing/send.go @@ -23,8 +23,9 @@ import ( "time" "github.com/matrix-org/dendrite/clientapi/jsonerror" - eduserverAPI "github.com/matrix-org/dendrite/eduserver/api" federationAPI "github.com/matrix-org/dendrite/federationapi/api" + "github.com/matrix-org/dendrite/federationapi/producers" + "github.com/matrix-org/dendrite/federationapi/types" "github.com/matrix-org/dendrite/internal" keyapi "github.com/matrix-org/dendrite/keyserver/api" "github.com/matrix-org/dendrite/roomserver/api" @@ -87,12 +88,12 @@ func Send( txnID gomatrixserverlib.TransactionID, cfg *config.FederationAPI, rsAPI api.RoomserverInternalAPI, - eduAPI eduserverAPI.EDUServerInputAPI, keyAPI keyapi.KeyInternalAPI, keys gomatrixserverlib.JSONVerifier, federation *gomatrixserverlib.FederationClient, mu *internal.MutexByRoom, servers federationAPI.ServersInRoomProvider, + producer *producers.SyncAPIProducer, ) util.JSONResponse { // First we should check if this origin has already submitted this // txn ID to us. If they have and the txnIDs map contains an entry, @@ -127,12 +128,12 @@ func Send( t := txnReq{ rsAPI: rsAPI, - eduAPI: eduAPI, keys: keys, federation: federation, servers: servers, keyAPI: keyAPI, roomsMu: mu, + producer: producer, } var txnEvents struct { @@ -185,12 +186,12 @@ func Send( type txnReq struct { gomatrixserverlib.Transaction rsAPI api.RoomserverInternalAPI - eduAPI eduserverAPI.EDUServerInputAPI keyAPI keyapi.KeyInternalAPI keys gomatrixserverlib.JSONVerifier federation txnFederationClient roomsMu *internal.MutexByRoom servers federationAPI.ServersInRoomProvider + producer *producers.SyncAPIProducer } // A subset of FederationClient functionality that txn requires. Useful for testing. @@ -329,8 +330,8 @@ func (t *txnReq) processEDUs(ctx context.Context) { util.GetLogger(ctx).Debugf("Dropping typing event where sender domain (%q) doesn't match origin (%q)", domain, t.Origin) continue } - if err := eduserverAPI.SendTyping(ctx, t.eduAPI, typingPayload.UserID, typingPayload.RoomID, typingPayload.Typing, 30*1000); err != nil { - util.GetLogger(ctx).WithError(err).Error("Failed to send typing event to edu server") + if err := t.producer.SendTyping(ctx, typingPayload.UserID, typingPayload.RoomID, typingPayload.Typing, 30*1000); err != nil { + util.GetLogger(ctx).WithError(err).Error("Failed to send typing event to JetStream") } case gomatrixserverlib.MDirectToDevice: // https://matrix.org/docs/spec/server_server/r0.1.3#m-direct-to-device-schema @@ -342,12 +343,12 @@ func (t *txnReq) processEDUs(ctx context.Context) { for userID, byUser := range directPayload.Messages { for deviceID, message := range byUser { // TODO: check that the user and the device actually exist here - if err := eduserverAPI.SendToDevice(ctx, t.eduAPI, directPayload.Sender, userID, deviceID, directPayload.Type, message); err != nil { + if err := t.producer.SendToDevice(ctx, directPayload.Sender, userID, deviceID, directPayload.Type, message); err != nil { util.GetLogger(ctx).WithError(err).WithFields(logrus.Fields{ "sender": directPayload.Sender, "user_id": userID, "device_id": deviceID, - }).Error("Failed to send send-to-device event to edu server") + }).Error("Failed to send send-to-device event to JetStream") } } } @@ -355,7 +356,7 @@ func (t *txnReq) processEDUs(ctx context.Context) { t.processDeviceListUpdate(ctx, e) case gomatrixserverlib.MReceipt: // https://matrix.org/docs/spec/server_server/r0.1.4#receipts - payload := map[string]eduserverAPI.FederationReceiptMRead{} + payload := map[string]types.FederationReceiptMRead{} if err := json.Unmarshal(e.Content, &payload); err != nil { util.GetLogger(ctx).WithError(err).Debug("Failed to unmarshal receipt event") @@ -379,12 +380,12 @@ func (t *txnReq) processEDUs(ctx context.Context) { "user_id": userID, "room_id": roomID, "events": mread.EventIDs, - }).Error("Failed to send receipt event to edu server") + }).Error("Failed to send receipt event to JetStream") continue } } } - case eduserverAPI.MSigningKeyUpdate: + case types.MSigningKeyUpdate: if err := t.processSigningKeyUpdate(ctx, e); err != nil { logrus.WithError(err).Errorf("Failed to process signing key update") } @@ -395,7 +396,7 @@ func (t *txnReq) processEDUs(ctx context.Context) { } func (t *txnReq) processSigningKeyUpdate(ctx context.Context, e gomatrixserverlib.EDU) error { - var updatePayload eduserverAPI.CrossSigningKeyUpdate + var updatePayload keyapi.CrossSigningKeyUpdate if err := json.Unmarshal(e.Content, &updatePayload); err != nil { util.GetLogger(ctx).WithError(err).WithFields(logrus.Fields{ "user_id": updatePayload.UserID, @@ -422,7 +423,7 @@ func (t *txnReq) processSigningKeyUpdate(ctx context.Context, e gomatrixserverli return nil } -// processReceiptEvent sends receipt events to the edu server +// processReceiptEvent sends receipt events to JetStream func (t *txnReq) processReceiptEvent(ctx context.Context, userID, roomID, receiptType string, timestamp gomatrixserverlib.Timestamp, @@ -430,17 +431,7 @@ func (t *txnReq) processReceiptEvent(ctx context.Context, ) error { // store every event for _, eventID := range eventIDs { - req := eduserverAPI.InputReceiptEventRequest{ - InputReceiptEvent: eduserverAPI.InputReceiptEvent{ - UserID: userID, - RoomID: roomID, - EventID: eventID, - Type: receiptType, - Timestamp: timestamp, - }, - } - resp := eduserverAPI.InputReceiptEventResponse{} - if err := t.eduAPI.InputReceiptEvent(ctx, &req, &resp); err != nil { + if err := t.producer.SendReceipt(ctx, userID, roomID, eventID, receiptType, timestamp); err != nil { return fmt.Errorf("unable to set receipt event: %w", err) } } diff --git a/federationapi/routing/send_test.go b/federationapi/routing/send_test.go index 4280643e..8d2d8504 100644 --- a/federationapi/routing/send_test.go +++ b/federationapi/routing/send_test.go @@ -7,7 +7,6 @@ import ( "testing" "time" - eduAPI "github.com/matrix-org/dendrite/eduserver/api" "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal/test" "github.com/matrix-org/dendrite/roomserver/api" @@ -53,44 +52,6 @@ func init() { } } -type testEDUProducer struct { - // this producer keeps track of calls to InputTypingEvent - invocations []eduAPI.InputTypingEventRequest -} - -func (p *testEDUProducer) InputTypingEvent( - ctx context.Context, - request *eduAPI.InputTypingEventRequest, - response *eduAPI.InputTypingEventResponse, -) error { - p.invocations = append(p.invocations, *request) - return nil -} - -func (p *testEDUProducer) InputSendToDeviceEvent( - ctx context.Context, - request *eduAPI.InputSendToDeviceEventRequest, - response *eduAPI.InputSendToDeviceEventResponse, -) error { - return nil -} - -func (o *testEDUProducer) InputReceiptEvent( - ctx context.Context, - request *eduAPI.InputReceiptEventRequest, - response *eduAPI.InputReceiptEventResponse, -) error { - return nil -} - -func (o *testEDUProducer) InputCrossSigningKeyUpdate( - ctx context.Context, - request *eduAPI.InputCrossSigningKeyUpdateRequest, - response *eduAPI.InputCrossSigningKeyUpdateResponse, -) error { - return nil -} - type testRoomserverAPI struct { api.RoomserverInternalAPITrace inputRoomEvents []api.InputRoomEvent @@ -225,7 +186,6 @@ func (c *txnFedClient) LookupMissingEvents(ctx context.Context, s gomatrixserver func mustCreateTransaction(rsAPI api.RoomserverInternalAPI, fedClient txnFederationClient, pdus []json.RawMessage) *txnReq { t := &txnReq{ rsAPI: rsAPI, - eduAPI: &testEDUProducer{}, keys: &test.NopJSONVerifier{}, federation: fedClient, roomsMu: internal.NewMutexByRoom(), diff --git a/federationapi/types/types.go b/federationapi/types/types.go index c486c05c..a28a80b2 100644 --- a/federationapi/types/types.go +++ b/federationapi/types/types.go @@ -18,6 +18,8 @@ import ( "github.com/matrix-org/gomatrixserverlib" ) +const MSigningKeyUpdate = "m.signing_key_update" // TODO: move to gomatrixserverlib + // A JoinedHost is a server that is joined to a matrix room. type JoinedHost struct { // The MemberEventID of a m.room.member join event. @@ -51,3 +53,16 @@ type InboundPeek struct { RenewedTimestamp int64 RenewalInterval int64 } + +type FederationReceiptMRead struct { + User map[string]FederationReceiptData `json:"m.read"` +} + +type FederationReceiptData struct { + Data ReceiptTS `json:"data"` + EventIDs []string `json:"event_ids"` +} + +type ReceiptTS struct { + TS gomatrixserverlib.Timestamp `json:"ts"` +} diff --git a/eduserver/cache/cache.go b/internal/caching/cache_typing.go similarity index 97% rename from eduserver/cache/cache.go rename to internal/caching/cache_typing.go index f637d7c9..bd6a5fc1 100644 --- a/eduserver/cache/cache.go +++ b/internal/caching/cache_typing.go @@ -14,7 +14,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package cache +package caching import ( "sync" @@ -53,8 +53,8 @@ func (t *EDUCache) newRoomData() *roomData { } } -// New returns a new EDUCache initialised for use. -func New() *EDUCache { +// NewTypingCache returns a new EDUCache initialised for use. +func NewTypingCache() *EDUCache { return &EDUCache{data: make(map[string]*roomData)} } diff --git a/eduserver/cache/cache_test.go b/internal/caching/cache_typing_test.go similarity index 97% rename from eduserver/cache/cache_test.go rename to internal/caching/cache_typing_test.go index c7d01879..c03d89bc 100644 --- a/eduserver/cache/cache_test.go +++ b/internal/caching/cache_typing_test.go @@ -14,7 +14,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package cache +package caching import ( "testing" @@ -24,9 +24,9 @@ import ( ) func TestEDUCache(t *testing.T) { - tCache := New() + tCache := NewTypingCache() if tCache == nil { - t.Fatal("New failed") + t.Fatal("NewTypingCache failed") } t.Run("AddTypingUser", func(t *testing.T) { diff --git a/internal/test/config.go b/internal/test/config.go index 0372fb9c..0b0e897b 100644 --- a/internal/test/config.go +++ b/internal/test/config.go @@ -97,7 +97,6 @@ func MakeConfig(configDir, kafkaURI, database, host string, startPort int) (*con cfg.UserAPI.AccountDatabase.ConnectionString = config.DataSource(database) cfg.AppServiceAPI.InternalAPI.Listen = assignAddress() - cfg.EDUServer.InternalAPI.Listen = assignAddress() cfg.FederationAPI.InternalAPI.Listen = assignAddress() cfg.KeyServer.InternalAPI.Listen = assignAddress() cfg.MediaAPI.InternalAPI.Listen = assignAddress() @@ -106,7 +105,6 @@ func MakeConfig(configDir, kafkaURI, database, host string, startPort int) (*con cfg.UserAPI.InternalAPI.Listen = assignAddress() cfg.AppServiceAPI.InternalAPI.Connect = cfg.AppServiceAPI.InternalAPI.Listen - cfg.EDUServer.InternalAPI.Connect = cfg.EDUServer.InternalAPI.Listen cfg.FederationAPI.InternalAPI.Connect = cfg.FederationAPI.InternalAPI.Listen cfg.KeyServer.InternalAPI.Connect = cfg.KeyServer.InternalAPI.Listen cfg.MediaAPI.InternalAPI.Connect = cfg.MediaAPI.InternalAPI.Listen diff --git a/keyserver/api/api.go b/keyserver/api/api.go index d361c622..429617b1 100644 --- a/keyserver/api/api.go +++ b/keyserver/api/api.go @@ -21,7 +21,6 @@ import ( "strings" "time" - eduapi "github.com/matrix-org/dendrite/eduserver/api" "github.com/matrix-org/dendrite/keyserver/types" userapi "github.com/matrix-org/dendrite/userapi/api" "github.com/matrix-org/gomatrixserverlib" @@ -66,14 +65,25 @@ const ( // DeviceMessage represents the message produced into Kafka by the key server. type DeviceMessage struct { - Type DeviceMessageType `json:"Type,omitempty"` - *DeviceKeys `json:"DeviceKeys,omitempty"` - *eduapi.OutputCrossSigningKeyUpdate `json:"CrossSigningKeyUpdate,omitempty"` + Type DeviceMessageType `json:"Type,omitempty"` + *DeviceKeys `json:"DeviceKeys,omitempty"` + *OutputCrossSigningKeyUpdate `json:"CrossSigningKeyUpdate,omitempty"` // A monotonically increasing number which represents device changes for this user. StreamID int64 DeviceChangeID int64 } +// OutputCrossSigningKeyUpdate is an entry in the signing key update output kafka log +type OutputCrossSigningKeyUpdate struct { + CrossSigningKeyUpdate `json:"signing_keys"` +} + +type CrossSigningKeyUpdate struct { + MasterKey *gomatrixserverlib.CrossSigningKey `json:"master_key,omitempty"` + SelfSigningKey *gomatrixserverlib.CrossSigningKey `json:"self_signing_key,omitempty"` + UserID string `json:"user_id"` +} + // DeviceKeysEqual returns true if the device keys updates contain the // same display name and key JSON. This will return false if either of // the updates is not a device keys update, or if the user ID/device ID diff --git a/keyserver/internal/cross_signing.go b/keyserver/internal/cross_signing.go index 5124f37e..0d083b4b 100644 --- a/keyserver/internal/cross_signing.go +++ b/keyserver/internal/cross_signing.go @@ -22,7 +22,6 @@ import ( "fmt" "strings" - eduserverAPI "github.com/matrix-org/dendrite/eduserver/api" "github.com/matrix-org/dendrite/keyserver/api" "github.com/matrix-org/dendrite/keyserver/types" "github.com/matrix-org/gomatrixserverlib" @@ -246,7 +245,7 @@ func (a *KeyInternalAPI) PerformUploadDeviceKeys(ctx context.Context, req *api.P } // Finally, generate a notification that we updated the keys. - update := eduserverAPI.CrossSigningKeyUpdate{ + update := api.CrossSigningKeyUpdate{ UserID: req.UserID, } if mk, ok := byPurpose[gomatrixserverlib.CrossSigningKeyPurposeMaster]; ok { @@ -337,7 +336,7 @@ func (a *KeyInternalAPI) PerformUploadDeviceSignatures(ctx context.Context, req for userID := range req.Signatures { masterKey := queryRes.MasterKeys[userID] selfSigningKey := queryRes.SelfSigningKeys[userID] - update := eduserverAPI.CrossSigningKeyUpdate{ + update := api.CrossSigningKeyUpdate{ UserID: userID, MasterKey: &masterKey, SelfSigningKey: &selfSigningKey, diff --git a/keyserver/producers/keychange.go b/keyserver/producers/keychange.go index 9e1c4c64..f86c3417 100644 --- a/keyserver/producers/keychange.go +++ b/keyserver/producers/keychange.go @@ -18,7 +18,6 @@ import ( "context" "encoding/json" - eduapi "github.com/matrix-org/dendrite/eduserver/api" "github.com/matrix-org/dendrite/keyserver/api" "github.com/matrix-org/dendrite/keyserver/storage" "github.com/matrix-org/dendrite/setup/jetstream" @@ -70,10 +69,10 @@ func (p *KeyChange) ProduceKeyChanges(keys []api.DeviceMessage) error { return nil } -func (p *KeyChange) ProduceSigningKeyUpdate(key eduapi.CrossSigningKeyUpdate) error { +func (p *KeyChange) ProduceSigningKeyUpdate(key api.CrossSigningKeyUpdate) error { output := &api.DeviceMessage{ Type: api.TypeCrossSigningUpdate, - OutputCrossSigningKeyUpdate: &eduapi.OutputCrossSigningKeyUpdate{ + OutputCrossSigningKeyUpdate: &api.OutputCrossSigningKeyUpdate{ CrossSigningKeyUpdate: key, }, } diff --git a/setup/base/base.go b/setup/base/base.go index 6135e080..43d613b0 100644 --- a/setup/base/base.go +++ b/setup/base/base.go @@ -45,8 +45,6 @@ import ( appserviceAPI "github.com/matrix-org/dendrite/appservice/api" asinthttp "github.com/matrix-org/dendrite/appservice/inthttp" - eduServerAPI "github.com/matrix-org/dendrite/eduserver/api" - eduinthttp "github.com/matrix-org/dendrite/eduserver/inthttp" federationAPI "github.com/matrix-org/dendrite/federationapi/api" federationIntHTTP "github.com/matrix-org/dendrite/federationapi/inthttp" keyserverAPI "github.com/matrix-org/dendrite/keyserver/api" @@ -247,15 +245,6 @@ func (b *BaseDendrite) UserAPIClient() userapi.UserInternalAPI { return userAPI } -// EDUServerClient returns EDUServerInputAPI for hitting the EDU server over HTTP -func (b *BaseDendrite) EDUServerClient() eduServerAPI.EDUServerInputAPI { - e, err := eduinthttp.NewEDUServerClient(b.Cfg.EDUServerURL(), b.apiHttpClient) - if err != nil { - logrus.WithError(err).Panic("EDUServerClient failed", b.apiHttpClient) - } - return e -} - // FederationAPIHTTPClient returns FederationInternalAPI for hitting // the federation API server over HTTP func (b *BaseDendrite) FederationAPIHTTPClient() federationAPI.FederationInternalAPI { diff --git a/setup/config/config.go b/setup/config/config.go index eb371a54..e03518e2 100644 --- a/setup/config/config.go +++ b/setup/config/config.go @@ -56,7 +56,6 @@ type Dendrite struct { Global Global `yaml:"global"` AppServiceAPI AppServiceAPI `yaml:"app_service_api"` ClientAPI ClientAPI `yaml:"client_api"` - EDUServer EDUServer `yaml:"edu_server"` FederationAPI FederationAPI `yaml:"federation_api"` KeyServer KeyServer `yaml:"key_server"` MediaAPI MediaAPI `yaml:"media_api"` @@ -296,7 +295,6 @@ func (c *Dendrite) Defaults(generate bool) { c.Global.Defaults(generate) c.ClientAPI.Defaults(generate) - c.EDUServer.Defaults(generate) c.FederationAPI.Defaults(generate) c.KeyServer.Defaults(generate) c.MediaAPI.Defaults(generate) @@ -314,8 +312,7 @@ func (c *Dendrite) Verify(configErrs *ConfigErrors, isMonolith bool) { Verify(configErrs *ConfigErrors, isMonolith bool) } for _, c := range []verifiable{ - &c.Global, &c.ClientAPI, - &c.EDUServer, &c.FederationAPI, + &c.Global, &c.ClientAPI, &c.FederationAPI, &c.KeyServer, &c.MediaAPI, &c.RoomServer, &c.SyncAPI, &c.UserAPI, &c.AppServiceAPI, &c.MSCs, @@ -327,7 +324,6 @@ func (c *Dendrite) Verify(configErrs *ConfigErrors, isMonolith bool) { func (c *Dendrite) Wiring() { c.Global.JetStream.Matrix = &c.Global c.ClientAPI.Matrix = &c.Global - c.EDUServer.Matrix = &c.Global c.FederationAPI.Matrix = &c.Global c.KeyServer.Matrix = &c.Global c.MediaAPI.Matrix = &c.Global @@ -519,15 +515,6 @@ func (config *Dendrite) UserAPIURL() string { return string(config.UserAPI.InternalAPI.Connect) } -// EDUServerURL returns an HTTP URL for where the EDU server is listening. -func (config *Dendrite) EDUServerURL() string { - // Hard code the EDU server to talk HTTP for now. - // If we support HTTPS we need to think of a practical way to do certificate validation. - // People setting up servers shouldn't need to get a certificate valid for the public - // internet for an internal API. - return string(config.EDUServer.InternalAPI.Connect) -} - // KeyServerURL returns an HTTP URL for where the key server is listening. func (config *Dendrite) KeyServerURL() string { // Hard code the key server to talk HTTP for now. diff --git a/setup/config/config_eduserver.go b/setup/config/config_eduserver.go deleted file mode 100644 index e7ed36aa..00000000 --- a/setup/config/config_eduserver.go +++ /dev/null @@ -1,17 +0,0 @@ -package config - -type EDUServer struct { - Matrix *Global `yaml:"-"` - - InternalAPI InternalAPIOptions `yaml:"internal_api"` -} - -func (c *EDUServer) Defaults(generate bool) { - c.InternalAPI.Listen = "http://localhost:7778" - c.InternalAPI.Connect = "http://localhost:7778" -} - -func (c *EDUServer) Verify(configErrs *ConfigErrors, isMonolith bool) { - checkURL(configErrs, "edu_server.internal_api.listen", string(c.InternalAPI.Listen)) - checkURL(configErrs, "edu_server.internal_api.connect", string(c.InternalAPI.Connect)) -} diff --git a/setup/config/config_test.go b/setup/config/config_test.go index e6f0a493..46e973fa 100644 --- a/setup/config/config_test.go +++ b/setup/config/config_test.go @@ -101,10 +101,6 @@ current_state_server: max_open_conns: 100 max_idle_conns: 2 conn_max_lifetime: -1 -edu_server: - internal_api: - listen: http://localhost:7778 - connect: http://localhost:7778 federation_api: internal_api: listen: http://localhost:7772 diff --git a/setup/jetstream/nats.go b/setup/jetstream/nats.go index 328cf915..4e4fe7a2 100644 --- a/setup/jetstream/nats.go +++ b/setup/jetstream/nats.go @@ -157,5 +157,26 @@ func setupNATS(process *process.ProcessContext, cfg *config.JetStream, nc *natsc } } + // Clean up old consumers so that interest-based consumers do the + // right thing. + for stream, consumers := range map[string][]string{ + OutputClientData: {"SyncAPIClientAPIConsumer"}, + OutputReceiptEvent: {"SyncAPIEDUServerReceiptConsumer", "FederationAPIEDUServerConsumer"}, + OutputSendToDeviceEvent: {"SyncAPIEDUServerSendToDeviceConsumer", "FederationAPIEDUServerConsumer"}, + OutputTypingEvent: {"SyncAPIEDUServerTypingConsumer", "FederationAPIEDUServerConsumer"}, + } { + streamName := cfg.Matrix.JetStream.Prefixed(stream) + for _, consumer := range consumers { + consumerName := cfg.Matrix.JetStream.Prefixed(consumer) + "Pull" + consumerInfo, err := s.ConsumerInfo(streamName, consumerName) + if err != nil || consumerInfo == nil { + continue + } + if err = s.DeleteConsumer(streamName, consumerName); err != nil { + logrus.WithError(err).Errorf("Unable to clean up old consumer %q for stream %q", consumer, stream) + } + } + } + return s, nc } diff --git a/setup/jetstream/streams.go b/setup/jetstream/streams.go index aa979924..5f0d37fd 100644 --- a/setup/jetstream/streams.go +++ b/setup/jetstream/streams.go @@ -9,8 +9,9 @@ import ( ) const ( - UserID = "user_id" - RoomID = "room_id" + UserID = "user_id" + RoomID = "room_id" + EventID = "event_id" ) var ( diff --git a/setup/monolith.go b/setup/monolith.go index cf6872f9..32f1a649 100644 --- a/setup/monolith.go +++ b/setup/monolith.go @@ -19,7 +19,6 @@ import ( appserviceAPI "github.com/matrix-org/dendrite/appservice/api" "github.com/matrix-org/dendrite/clientapi" "github.com/matrix-org/dendrite/clientapi/api" - eduServerAPI "github.com/matrix-org/dendrite/eduserver/api" "github.com/matrix-org/dendrite/federationapi" federationAPI "github.com/matrix-org/dendrite/federationapi/api" "github.com/matrix-org/dendrite/internal/transactions" @@ -43,12 +42,11 @@ type Monolith struct { Client *gomatrixserverlib.Client FedClient *gomatrixserverlib.FederationClient - AppserviceAPI appserviceAPI.AppServiceQueryAPI - EDUInternalAPI eduServerAPI.EDUServerInputAPI - FederationAPI federationAPI.FederationInternalAPI - RoomserverAPI roomserverAPI.RoomserverInternalAPI - UserAPI userapi.UserInternalAPI - KeyAPI keyAPI.KeyInternalAPI + AppserviceAPI appserviceAPI.AppServiceQueryAPI + FederationAPI federationAPI.FederationInternalAPI + RoomserverAPI roomserverAPI.RoomserverInternalAPI + UserAPI userapi.UserInternalAPI + KeyAPI keyAPI.KeyInternalAPI // Optional ExtPublicRoomsProvider api.ExtraPublicRoomsProvider @@ -64,14 +62,14 @@ func (m *Monolith) AddAllPublicRoutes(process *process.ProcessContext, csMux, ss clientapi.AddPublicRoutes( process, csMux, synapseMux, &m.Config.ClientAPI, m.FedClient, m.RoomserverAPI, - m.EDUInternalAPI, m.AppserviceAPI, transactions.New(), + m.AppserviceAPI, transactions.New(), m.FederationAPI, m.UserAPI, userDirectoryProvider, m.KeyAPI, m.ExtPublicRoomsProvider, &m.Config.MSCs, ) federationapi.AddPublicRoutes( - ssMux, keyMux, wkMux, &m.Config.FederationAPI, m.UserAPI, m.FedClient, + process, ssMux, keyMux, wkMux, &m.Config.FederationAPI, m.UserAPI, m.FedClient, m.KeyRing, m.RoomserverAPI, m.FederationAPI, - m.EDUInternalAPI, m.KeyAPI, &m.Config.MSCs, nil, + m.KeyAPI, &m.Config.MSCs, nil, ) mediaapi.AddPublicRoutes(mediaMux, &m.Config.MediaAPI, &m.Config.ClientAPI.RateLimiting, m.UserAPI, m.Client) syncapi.AddPublicRoutes( diff --git a/syncapi/consumers/clientapi.go b/syncapi/consumers/clientapi.go index 40c1cd3d..c28da460 100644 --- a/syncapi/consumers/clientapi.go +++ b/syncapi/consumers/clientapi.go @@ -62,7 +62,7 @@ func NewOutputClientDataConsumer( ctx: process.Context(), jetstream: js, topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputClientData), - durable: cfg.Matrix.JetStream.Durable("SyncAPIClientAPIConsumer"), + durable: cfg.Matrix.JetStream.Durable("SyncAPIAccountDataConsumer"), db: store, notifier: notifier, stream: stream, diff --git a/syncapi/consumers/eduserver_receipts.go b/syncapi/consumers/receipts.go similarity index 87% rename from syncapi/consumers/eduserver_receipts.go rename to syncapi/consumers/receipts.go index ab79998e..6bb0747f 100644 --- a/syncapi/consumers/eduserver_receipts.go +++ b/syncapi/consumers/receipts.go @@ -17,11 +17,10 @@ package consumers import ( "context" "database/sql" - "encoding/json" "fmt" + "strconv" "github.com/getsentry/sentry-go" - "github.com/matrix-org/dendrite/eduserver/api" "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/setup/jetstream" "github.com/matrix-org/dendrite/setup/process" @@ -63,7 +62,7 @@ func NewOutputReceiptEventConsumer( ctx: process.Context(), jetstream: js, topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputReceiptEvent), - durable: cfg.Matrix.JetStream.Durable("SyncAPIEDUServerReceiptConsumer"), + durable: cfg.Matrix.JetStream.Durable("SyncAPIReceiptConsumer"), db: store, notifier: notifier, stream: stream, @@ -72,7 +71,7 @@ func NewOutputReceiptEventConsumer( } } -// Start consuming from EDU api +// Start consuming receipts events. func (s *OutputReceiptEventConsumer) Start() error { return jetstream.JetStreamConsumer( s.ctx, s.jetstream, s.topic, s.durable, s.onMessage, @@ -81,14 +80,23 @@ func (s *OutputReceiptEventConsumer) Start() error { } func (s *OutputReceiptEventConsumer) onMessage(ctx context.Context, msg *nats.Msg) bool { - var output api.OutputReceiptEvent - if err := json.Unmarshal(msg.Data, &output); err != nil { + output := types.OutputReceiptEvent{ + UserID: msg.Header.Get(jetstream.UserID), + RoomID: msg.Header.Get(jetstream.RoomID), + EventID: msg.Header.Get(jetstream.EventID), + Type: msg.Header.Get("type"), + } + + timestamp, err := strconv.Atoi(msg.Header.Get("timestamp")) + if err != nil { // If the message was invalid, log it and move on to the next message in the stream - log.WithError(err).Errorf("EDU server output log: message parse failure") + log.WithError(err).Errorf("output log: message parse failure") sentry.CaptureException(err) return true } + output.Timestamp = gomatrixserverlib.Timestamp(timestamp) + streamPos, err := s.db.StoreReceipt( s.ctx, output.RoomID, @@ -117,7 +125,7 @@ func (s *OutputReceiptEventConsumer) onMessage(ctx context.Context, msg *nats.Ms return true } -func (s *OutputReceiptEventConsumer) sendReadUpdate(ctx context.Context, output api.OutputReceiptEvent) error { +func (s *OutputReceiptEventConsumer) sendReadUpdate(ctx context.Context, output types.OutputReceiptEvent) error { if output.Type != "m.read" { return nil } diff --git a/syncapi/consumers/eduserver_sendtodevice.go b/syncapi/consumers/sendtodevice.go similarity index 87% rename from syncapi/consumers/eduserver_sendtodevice.go rename to syncapi/consumers/sendtodevice.go index bdbe7735..0b9153fc 100644 --- a/syncapi/consumers/eduserver_sendtodevice.go +++ b/syncapi/consumers/sendtodevice.go @@ -19,7 +19,6 @@ import ( "encoding/json" "github.com/getsentry/sentry-go" - "github.com/matrix-org/dendrite/eduserver/api" "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/setup/jetstream" "github.com/matrix-org/dendrite/setup/process" @@ -58,7 +57,7 @@ func NewOutputSendToDeviceEventConsumer( ctx: process.Context(), jetstream: js, topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputSendToDeviceEvent), - durable: cfg.Matrix.JetStream.Durable("SyncAPIEDUServerSendToDeviceConsumer"), + durable: cfg.Matrix.JetStream.Durable("SyncAPISendToDeviceConsumer"), db: store, serverName: cfg.Matrix.ServerName, notifier: notifier, @@ -66,7 +65,7 @@ func NewOutputSendToDeviceEventConsumer( } } -// Start consuming from EDU api +// Start consuming send-to-device events. func (s *OutputSendToDeviceEventConsumer) Start() error { return jetstream.JetStreamConsumer( s.ctx, s.jetstream, s.topic, s.durable, s.onMessage, @@ -75,15 +74,8 @@ func (s *OutputSendToDeviceEventConsumer) Start() error { } func (s *OutputSendToDeviceEventConsumer) onMessage(ctx context.Context, msg *nats.Msg) bool { - var output api.OutputSendToDeviceEvent - if err := json.Unmarshal(msg.Data, &output); err != nil { - // If the message was invalid, log it and move on to the next message in the stream - log.WithError(err).Errorf("EDU server output log: message parse failure") - sentry.CaptureException(err) - return true - } - - _, domain, err := gomatrixserverlib.SplitID('@', output.UserID) + userID := msg.Header.Get(jetstream.UserID) + _, domain, err := gomatrixserverlib.SplitID('@', userID) if err != nil { sentry.CaptureException(err) return true @@ -92,12 +84,20 @@ func (s *OutputSendToDeviceEventConsumer) onMessage(ctx context.Context, msg *na return true } + var output types.OutputSendToDeviceEvent + if err = json.Unmarshal(msg.Data, &output); err != nil { + // If the message was invalid, log it and move on to the next message in the stream + log.WithError(err).Errorf("output log: message parse failure") + sentry.CaptureException(err) + return true + } + util.GetLogger(context.TODO()).WithFields(log.Fields{ "sender": output.Sender, "user_id": output.UserID, "device_id": output.DeviceID, "event_type": output.Type, - }).Info("sync API received send-to-device event from EDU server") + }).Debugf("sync API received send-to-device event from the clientapi/federationsender") streamPos, err := s.db.StoreNewSendForDeviceMessage( s.ctx, output.UserID, output.DeviceID, output.SendToDeviceEvent, diff --git a/syncapi/consumers/eduserver_typing.go b/syncapi/consumers/typing.go similarity index 67% rename from syncapi/consumers/eduserver_typing.go rename to syncapi/consumers/typing.go index c2828c7f..48e484ec 100644 --- a/syncapi/consumers/eduserver_typing.go +++ b/syncapi/consumers/typing.go @@ -16,16 +16,14 @@ package consumers import ( "context" - "encoding/json" + "strconv" + "time" - "github.com/getsentry/sentry-go" - "github.com/matrix-org/dendrite/eduserver/api" - "github.com/matrix-org/dendrite/eduserver/cache" + "github.com/matrix-org/dendrite/internal/caching" "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/setup/jetstream" "github.com/matrix-org/dendrite/setup/process" "github.com/matrix-org/dendrite/syncapi/notifier" - "github.com/matrix-org/dendrite/syncapi/storage" "github.com/matrix-org/dendrite/syncapi/types" "github.com/nats-io/nats.go" log "github.com/sirupsen/logrus" @@ -37,7 +35,7 @@ type OutputTypingEventConsumer struct { jetstream nats.JetStreamContext durable string topic string - eduCache *cache.EDUCache + eduCache *caching.EDUCache stream types.StreamProvider notifier *notifier.Notifier } @@ -48,8 +46,7 @@ func NewOutputTypingEventConsumer( process *process.ProcessContext, cfg *config.SyncAPI, js nats.JetStreamContext, - store storage.Database, - eduCache *cache.EDUCache, + eduCache *caching.EDUCache, notifier *notifier.Notifier, stream types.StreamProvider, ) *OutputTypingEventConsumer { @@ -57,14 +54,14 @@ func NewOutputTypingEventConsumer( ctx: process.Context(), jetstream: js, topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputTypingEvent), - durable: cfg.Matrix.JetStream.Durable("SyncAPIEDUServerTypingConsumer"), + durable: cfg.Matrix.JetStream.Durable("SyncAPITypingConsumer"), eduCache: eduCache, notifier: notifier, stream: stream, } } -// Start consuming from EDU api +// Start consuming typing events. func (s *OutputTypingEventConsumer) Start() error { return jetstream.JetStreamConsumer( s.ctx, s.jetstream, s.topic, s.durable, s.onMessage, @@ -73,34 +70,40 @@ func (s *OutputTypingEventConsumer) Start() error { } func (s *OutputTypingEventConsumer) onMessage(ctx context.Context, msg *nats.Msg) bool { - var output api.OutputTypingEvent - if err := json.Unmarshal(msg.Data, &output); err != nil { - // If the message was invalid, log it and move on to the next message in the stream - log.WithError(err).Errorf("EDU server output log: message parse failure") - sentry.CaptureException(err) + roomID := msg.Header.Get(jetstream.RoomID) + userID := msg.Header.Get(jetstream.UserID) + typing, err := strconv.ParseBool(msg.Header.Get("typing")) + if err != nil { + log.WithError(err).Errorf("output log: typing parse failure") + return true + } + timeout, err := strconv.Atoi(msg.Header.Get("timeout_ms")) + if err != nil { + log.WithError(err).Errorf("output log: timeout_ms parse failure") return true } log.WithFields(log.Fields{ - "room_id": output.Event.RoomID, - "user_id": output.Event.UserID, - "typing": output.Event.Typing, - }).Debug("received data from EDU server") + "room_id": roomID, + "user_id": userID, + "typing": typing, + "timeout": timeout, + }).Debug("syncapi received EDU data from client api") var typingPos types.StreamPosition - typingEvent := output.Event - if typingEvent.Typing { + if typing { + expiry := time.Now().Add(time.Duration(timeout) * time.Millisecond) typingPos = types.StreamPosition( - s.eduCache.AddTypingUser(typingEvent.UserID, typingEvent.RoomID, output.ExpireTime), + s.eduCache.AddTypingUser(userID, roomID, &expiry), ) } else { typingPos = types.StreamPosition( - s.eduCache.RemoveUser(typingEvent.UserID, typingEvent.RoomID), + s.eduCache.RemoveUser(userID, roomID), ) } s.stream.Advance(typingPos) - s.notifier.OnNewTyping(output.Event.RoomID, types.StreamingToken{TypingPosition: typingPos}) + s.notifier.OnNewTyping(roomID, types.StreamingToken{TypingPosition: typingPos}) return true } diff --git a/syncapi/storage/interface.go b/syncapi/storage/interface.go index b6ac5be1..647fffad 100644 --- a/syncapi/storage/interface.go +++ b/syncapi/storage/interface.go @@ -17,7 +17,6 @@ package storage import ( "context" - eduAPI "github.com/matrix-org/dendrite/eduserver/api" "github.com/matrix-org/dendrite/internal/eventutil" "github.com/matrix-org/dendrite/roomserver/api" @@ -46,7 +45,7 @@ type Database interface { InviteEventsInRange(ctx context.Context, targetUserID string, r types.Range) (map[string]*gomatrixserverlib.HeaderedEvent, map[string]*gomatrixserverlib.HeaderedEvent, error) PeeksInRange(ctx context.Context, userID, deviceID string, r types.Range) (peeks []types.Peek, err error) - RoomReceiptsAfter(ctx context.Context, roomIDs []string, streamPos types.StreamPosition) (types.StreamPosition, []eduAPI.OutputReceiptEvent, error) + RoomReceiptsAfter(ctx context.Context, roomIDs []string, streamPos types.StreamPosition) (types.StreamPosition, []types.OutputReceiptEvent, error) // AllJoinedUsersInRooms returns a map of room ID to a list of all joined user IDs. AllJoinedUsersInRooms(ctx context.Context) (map[string][]string, error) @@ -136,7 +135,7 @@ type Database interface { // StoreReceipt stores new receipt events StoreReceipt(ctx context.Context, roomId, receiptType, userId, eventId string, timestamp gomatrixserverlib.Timestamp) (pos types.StreamPosition, err error) // GetRoomReceipts gets all receipts for a given roomID - GetRoomReceipts(ctx context.Context, roomIDs []string, streamPos types.StreamPosition) ([]eduAPI.OutputReceiptEvent, error) + GetRoomReceipts(ctx context.Context, roomIDs []string, streamPos types.StreamPosition) ([]types.OutputReceiptEvent, error) // UpsertRoomUnreadNotificationCounts updates the notification statistics about a (user, room) key. UpsertRoomUnreadNotificationCounts(ctx context.Context, userID, roomID string, notificationCount, highlightCount int) (types.StreamPosition, error) diff --git a/syncapi/storage/postgres/receipt_table.go b/syncapi/storage/postgres/receipt_table.go index 474d0c02..2a42ffd7 100644 --- a/syncapi/storage/postgres/receipt_table.go +++ b/syncapi/storage/postgres/receipt_table.go @@ -21,7 +21,6 @@ import ( "github.com/lib/pq" - "github.com/matrix-org/dendrite/eduserver/api" "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/dendrite/syncapi/storage/tables" @@ -95,16 +94,16 @@ func (r *receiptStatements) UpsertReceipt(ctx context.Context, txn *sql.Tx, room return } -func (r *receiptStatements) SelectRoomReceiptsAfter(ctx context.Context, roomIDs []string, streamPos types.StreamPosition) (types.StreamPosition, []api.OutputReceiptEvent, error) { +func (r *receiptStatements) SelectRoomReceiptsAfter(ctx context.Context, roomIDs []string, streamPos types.StreamPosition) (types.StreamPosition, []types.OutputReceiptEvent, error) { var lastPos types.StreamPosition rows, err := r.selectRoomReceipts.QueryContext(ctx, pq.Array(roomIDs), streamPos) if err != nil { return 0, nil, fmt.Errorf("unable to query room receipts: %w", err) } defer internal.CloseAndLogIfError(ctx, rows, "SelectRoomReceiptsAfter: rows.close() failed") - var res []api.OutputReceiptEvent + var res []types.OutputReceiptEvent for rows.Next() { - r := api.OutputReceiptEvent{} + r := types.OutputReceiptEvent{} var id types.StreamPosition err = rows.Scan(&id, &r.RoomID, &r.Type, &r.UserID, &r.EventID, &r.Timestamp) if err != nil { diff --git a/syncapi/storage/shared/syncserver.go b/syncapi/storage/shared/syncserver.go index 9a2dc0d4..349e4452 100644 --- a/syncapi/storage/shared/syncserver.go +++ b/syncapi/storage/shared/syncserver.go @@ -20,7 +20,6 @@ import ( "encoding/json" "fmt" - eduAPI "github.com/matrix-org/dendrite/eduserver/api" userapi "github.com/matrix-org/dendrite/userapi/api" "github.com/matrix-org/dendrite/internal/eventutil" @@ -135,7 +134,7 @@ func (d *Database) PeeksInRange(ctx context.Context, userID, deviceID string, r return d.Peeks.SelectPeeksInRange(ctx, nil, userID, deviceID, r) } -func (d *Database) RoomReceiptsAfter(ctx context.Context, roomIDs []string, streamPos types.StreamPosition) (types.StreamPosition, []eduAPI.OutputReceiptEvent, error) { +func (d *Database) RoomReceiptsAfter(ctx context.Context, roomIDs []string, streamPos types.StreamPosition) (types.StreamPosition, []types.OutputReceiptEvent, error) { return d.Receipts.SelectRoomReceiptsAfter(ctx, roomIDs, streamPos) } @@ -972,7 +971,7 @@ func (d *Database) StoreReceipt(ctx context.Context, roomId, receiptType, userId return } -func (d *Database) GetRoomReceipts(ctx context.Context, roomIDs []string, streamPos types.StreamPosition) ([]eduAPI.OutputReceiptEvent, error) { +func (d *Database) GetRoomReceipts(ctx context.Context, roomIDs []string, streamPos types.StreamPosition) ([]types.OutputReceiptEvent, error) { _, receipts, err := d.Receipts.SelectRoomReceiptsAfter(ctx, roomIDs, streamPos) return receipts, err } diff --git a/syncapi/storage/sqlite3/receipt_table.go b/syncapi/storage/sqlite3/receipt_table.go index 9111a39f..dea05771 100644 --- a/syncapi/storage/sqlite3/receipt_table.go +++ b/syncapi/storage/sqlite3/receipt_table.go @@ -20,7 +20,6 @@ import ( "fmt" "strings" - "github.com/matrix-org/dendrite/eduserver/api" "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/dendrite/syncapi/storage/tables" @@ -99,7 +98,7 @@ func (r *receiptStatements) UpsertReceipt(ctx context.Context, txn *sql.Tx, room } // SelectRoomReceiptsAfter select all receipts for a given room after a specific timestamp -func (r *receiptStatements) SelectRoomReceiptsAfter(ctx context.Context, roomIDs []string, streamPos types.StreamPosition) (types.StreamPosition, []api.OutputReceiptEvent, error) { +func (r *receiptStatements) SelectRoomReceiptsAfter(ctx context.Context, roomIDs []string, streamPos types.StreamPosition) (types.StreamPosition, []types.OutputReceiptEvent, error) { selectSQL := strings.Replace(selectRoomReceipts, "($2)", sqlutil.QueryVariadicOffset(len(roomIDs), 1), 1) var lastPos types.StreamPosition params := make([]interface{}, len(roomIDs)+1) @@ -112,9 +111,9 @@ func (r *receiptStatements) SelectRoomReceiptsAfter(ctx context.Context, roomIDs return 0, nil, fmt.Errorf("unable to query room receipts: %w", err) } defer internal.CloseAndLogIfError(ctx, rows, "SelectRoomReceiptsAfter: rows.close() failed") - var res []api.OutputReceiptEvent + var res []types.OutputReceiptEvent for rows.Next() { - r := api.OutputReceiptEvent{} + r := types.OutputReceiptEvent{} var id types.StreamPosition err = rows.Scan(&id, &r.RoomID, &r.Type, &r.UserID, &r.EventID, &r.Timestamp) if err != nil { diff --git a/syncapi/storage/tables/interface.go b/syncapi/storage/tables/interface.go index 640b7dc3..ba0076e2 100644 --- a/syncapi/storage/tables/interface.go +++ b/syncapi/storage/tables/interface.go @@ -18,7 +18,6 @@ import ( "context" "database/sql" - eduAPI "github.com/matrix-org/dendrite/eduserver/api" "github.com/matrix-org/dendrite/internal/eventutil" "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/syncapi/types" @@ -168,7 +167,7 @@ type Filter interface { type Receipts interface { UpsertReceipt(ctx context.Context, txn *sql.Tx, roomId, receiptType, userId, eventId string, timestamp gomatrixserverlib.Timestamp) (pos types.StreamPosition, err error) - SelectRoomReceiptsAfter(ctx context.Context, roomIDs []string, streamPos types.StreamPosition) (types.StreamPosition, []eduAPI.OutputReceiptEvent, error) + SelectRoomReceiptsAfter(ctx context.Context, roomIDs []string, streamPos types.StreamPosition) (types.StreamPosition, []types.OutputReceiptEvent, error) SelectMaxReceiptID(ctx context.Context, txn *sql.Tx) (id int64, err error) } diff --git a/syncapi/streams/stream_receipt.go b/syncapi/streams/stream_receipt.go index 35ffd3a1..680f8cd8 100644 --- a/syncapi/streams/stream_receipt.go +++ b/syncapi/streams/stream_receipt.go @@ -4,7 +4,6 @@ import ( "context" "encoding/json" - eduAPI "github.com/matrix-org/dendrite/eduserver/api" "github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/gomatrixserverlib" ) @@ -53,7 +52,7 @@ func (p *ReceiptStreamProvider) IncrementalSync( } // Group receipts by room, so we can create one ClientEvent for every room - receiptsByRoom := make(map[string][]eduAPI.OutputReceiptEvent) + receiptsByRoom := make(map[string][]types.OutputReceiptEvent) for _, receipt := range receipts { receiptsByRoom[receipt.RoomID] = append(receiptsByRoom[receipt.RoomID], receipt) } @@ -68,15 +67,15 @@ func (p *ReceiptStreamProvider) IncrementalSync( Type: gomatrixserverlib.MReceipt, RoomID: roomID, } - content := make(map[string]eduAPI.ReceiptMRead) + content := make(map[string]ReceiptMRead) for _, receipt := range receipts { read, ok := content[receipt.EventID] if !ok { - read = eduAPI.ReceiptMRead{ - User: make(map[string]eduAPI.ReceiptTS), + read = ReceiptMRead{ + User: make(map[string]ReceiptTS), } } - read.User[receipt.UserID] = eduAPI.ReceiptTS{TS: receipt.Timestamp} + read.User[receipt.UserID] = ReceiptTS{TS: receipt.Timestamp} content[receipt.EventID] = read } ev.Content, err = json.Marshal(content) @@ -91,3 +90,11 @@ func (p *ReceiptStreamProvider) IncrementalSync( return lastPos } + +type ReceiptMRead struct { + User map[string]ReceiptTS `json:"m.read"` +} + +type ReceiptTS struct { + TS gomatrixserverlib.Timestamp `json:"ts"` +} diff --git a/syncapi/streams/stream_typing.go b/syncapi/streams/stream_typing.go index 1e7a46bd..e46cd447 100644 --- a/syncapi/streams/stream_typing.go +++ b/syncapi/streams/stream_typing.go @@ -4,14 +4,14 @@ import ( "context" "encoding/json" - "github.com/matrix-org/dendrite/eduserver/cache" + "github.com/matrix-org/dendrite/internal/caching" "github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/gomatrixserverlib" ) type TypingStreamProvider struct { StreamProvider - EDUCache *cache.EDUCache + EDUCache *caching.EDUCache } func (p *TypingStreamProvider) CompleteSync( diff --git a/syncapi/streams/streams.go b/syncapi/streams/streams.go index 17951acb..b2273aad 100644 --- a/syncapi/streams/streams.go +++ b/syncapi/streams/streams.go @@ -3,7 +3,7 @@ package streams import ( "context" - "github.com/matrix-org/dendrite/eduserver/cache" + "github.com/matrix-org/dendrite/internal/caching" keyapi "github.com/matrix-org/dendrite/keyserver/api" rsapi "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/syncapi/storage" @@ -25,7 +25,7 @@ type Streams struct { func NewSyncStreamProviders( d storage.Database, userAPI userapi.UserInternalAPI, rsAPI rsapi.RoomserverInternalAPI, keyAPI keyapi.KeyInternalAPI, - eduCache *cache.EDUCache, + eduCache *caching.EDUCache, ) *Streams { streams := &Streams{ PDUStreamProvider: &PDUStreamProvider{ diff --git a/syncapi/syncapi.go b/syncapi/syncapi.go index ed8118bf..b579467a 100644 --- a/syncapi/syncapi.go +++ b/syncapi/syncapi.go @@ -18,9 +18,9 @@ import ( "context" "github.com/gorilla/mux" + "github.com/matrix-org/dendrite/internal/caching" "github.com/sirupsen/logrus" - "github.com/matrix-org/dendrite/eduserver/cache" keyapi "github.com/matrix-org/dendrite/keyserver/api" "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/setup/config" @@ -56,7 +56,7 @@ func AddPublicRoutes( logrus.WithError(err).Panicf("failed to connect to sync db") } - eduCache := cache.New() + eduCache := caching.NewTypingCache() streams := streams.NewSyncStreamProviders(syncDB, userAPI, rsAPI, keyAPI, eduCache) notifier := notifier.NewNotifier(streams.Latest(context.Background())) if err = notifier.Load(context.Background(), syncDB); err != nil { @@ -110,7 +110,7 @@ func AddPublicRoutes( } typingConsumer := consumers.NewOutputTypingEventConsumer( - process, cfg, js, syncDB, eduCache, notifier, streams.TypingStreamProvider, + process, cfg, js, eduCache, notifier, streams.TypingStreamProvider, ) if err = typingConsumer.Start(); err != nil { logrus.WithError(err).Panicf("failed to start typing consumer") diff --git a/syncapi/types/types.go b/syncapi/types/types.go index 4150e6c9..f964b80b 100644 --- a/syncapi/types/types.go +++ b/syncapi/types/types.go @@ -487,3 +487,21 @@ type StreamedEvent struct { Event *gomatrixserverlib.HeaderedEvent `json:"event"` StreamPosition StreamPosition `json:"stream_position"` } + +// OutputReceiptEvent is an entry in the receipt output kafka log +type OutputReceiptEvent struct { + UserID string `json:"user_id"` + RoomID string `json:"room_id"` + EventID string `json:"event_id"` + Type string `json:"type"` + Timestamp gomatrixserverlib.Timestamp `json:"timestamp"` +} + +// OutputSendToDeviceEvent is an entry in the send-to-device output kafka log. +// This contains the full event content, along with the user ID and device ID +// to which it is destined. +type OutputSendToDeviceEvent struct { + UserID string `json:"user_id"` + DeviceID string `json:"device_id"` + gomatrixserverlib.SendToDeviceEvent +}