diff --git a/build/docker/config/dendrite.yaml b/build/docker/config/dendrite.yaml index 25cbd6d8..e3a0316d 100644 --- a/build/docker/config/dendrite.yaml +++ b/build/docker/config/dendrite.yaml @@ -62,6 +62,17 @@ global: - matrix.org - vector.im + # Disables federation. Dendrite will not be able to make any outbound HTTP requests + # to other servers and the federation API will not be exposed. + disable_federation: false + + # Configures the handling of presence events. + presence: + # Whether inbound presence events are allowed, e.g. receiving presence events from other servers + enable_inbound: false + # Whether outbound presence events are allowed, e.g. sending presence events to other servers + enable_outbound: false + # Configuration for NATS JetStream jetstream: # A list of NATS Server addresses to connect to. If none are specified, an diff --git a/clientapi/clientapi.go b/clientapi/clientapi.go index d4b417a3..e2f8d3f3 100644 --- a/clientapi/clientapi.go +++ b/clientapi/clientapi.go @@ -48,7 +48,7 @@ func AddPublicRoutes( extRoomsProvider api.ExtraPublicRoomsProvider, mscCfg *config.MSCs, ) { - js, _ := jetstream.Prepare(process, &cfg.Matrix.JetStream) + js, natsClient := jetstream.Prepare(process, &cfg.Matrix.JetStream) syncProducer := &producers.SyncAPIProducer{ JetStream: js, @@ -56,6 +56,7 @@ func AddPublicRoutes( TopicReceiptEvent: cfg.Matrix.JetStream.Prefixed(jetstream.OutputReceiptEvent), TopicSendToDeviceEvent: cfg.Matrix.JetStream.Prefixed(jetstream.OutputSendToDeviceEvent), TopicTypingEvent: cfg.Matrix.JetStream.Prefixed(jetstream.OutputTypingEvent), + TopicPresenceEvent: cfg.Matrix.JetStream.Prefixed(jetstream.OutputPresenceEvent), UserAPI: userAPI, ServerName: cfg.Matrix.ServerName, } @@ -64,6 +65,6 @@ func AddPublicRoutes( router, synapseAdminRouter, cfg, rsAPI, asAPI, userAPI, userDirectoryProvider, federation, syncProducer, transactionsCache, fsAPI, keyAPI, - extRoomsProvider, mscCfg, + extRoomsProvider, mscCfg, natsClient, ) } diff --git a/clientapi/producers/syncapi.go b/clientapi/producers/syncapi.go index 2dee04e3..6e869c31 100644 --- a/clientapi/producers/syncapi.go +++ b/clientapi/producers/syncapi.go @@ -18,6 +18,7 @@ import ( "context" "encoding/json" "strconv" + "time" "github.com/matrix-org/dendrite/internal/eventutil" "github.com/matrix-org/dendrite/setup/jetstream" @@ -34,6 +35,7 @@ type SyncAPIProducer struct { TopicReceiptEvent string TopicSendToDeviceEvent string TopicTypingEvent string + TopicPresenceEvent string JetStream nats.JetStreamContext ServerName gomatrixserverlib.ServerName UserAPI userapi.UserInternalAPI @@ -173,3 +175,19 @@ func (p *SyncAPIProducer) SendTyping( _, err := p.JetStream.PublishMsg(m, nats.Context(ctx)) return err } + +func (p *SyncAPIProducer) SendPresence( + ctx context.Context, userID string, presence types.Presence, statusMsg *string, +) error { + m := nats.NewMsg(p.TopicPresenceEvent) + m.Header.Set(jetstream.UserID, userID) + m.Header.Set("presence", presence.String()) + if statusMsg != nil { + m.Header.Set("status_msg", *statusMsg) + } + + m.Header.Set("last_active_ts", strconv.Itoa(int(gomatrixserverlib.AsTimestamp(time.Now())))) + + _, err := p.JetStream.PublishMsg(m, nats.Context(ctx)) + return err +} diff --git a/clientapi/routing/presence.go b/clientapi/routing/presence.go new file mode 100644 index 00000000..63fbb75e --- /dev/null +++ b/clientapi/routing/presence.go @@ -0,0 +1,138 @@ +// Copyright 2022 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package routing + +import ( + "fmt" + "net/http" + "strconv" + "time" + + "github.com/matrix-org/dendrite/clientapi/httputil" + "github.com/matrix-org/dendrite/clientapi/jsonerror" + "github.com/matrix-org/dendrite/clientapi/producers" + "github.com/matrix-org/dendrite/setup/config" + "github.com/matrix-org/dendrite/setup/jetstream" + "github.com/matrix-org/dendrite/syncapi/types" + "github.com/matrix-org/dendrite/userapi/api" + "github.com/matrix-org/gomatrixserverlib" + "github.com/matrix-org/util" + "github.com/nats-io/nats.go" + log "github.com/sirupsen/logrus" +) + +type presenceReq struct { + Presence string `json:"presence"` + StatusMsg *string `json:"status_msg,omitempty"` +} + +func SetPresence( + req *http.Request, + cfg *config.ClientAPI, + device *api.Device, + producer *producers.SyncAPIProducer, + userID string, +) util.JSONResponse { + if !cfg.Matrix.Presence.EnableOutbound { + return util.JSONResponse{ + Code: http.StatusOK, + JSON: struct{}{}, + } + } + if device.UserID != userID { + return util.JSONResponse{ + Code: http.StatusForbidden, + JSON: jsonerror.Forbidden("Unable to set presence for other user."), + } + } + var presence presenceReq + parseErr := httputil.UnmarshalJSONRequest(req, &presence) + if parseErr != nil { + return *parseErr + } + + presenceStatus, ok := types.PresenceFromString(presence.Presence) + if !ok { + return util.JSONResponse{ + Code: http.StatusBadRequest, + JSON: jsonerror.Unknown(fmt.Sprintf("Unknown presence '%s'.", presence.Presence)), + } + } + + err := producer.SendPresence(req.Context(), userID, presenceStatus, presence.StatusMsg) + if err != nil { + log.WithError(err).Errorf("failed to update presence") + return util.JSONResponse{ + Code: http.StatusInternalServerError, + JSON: jsonerror.InternalServerError(), + } + } + + return util.JSONResponse{ + Code: http.StatusOK, + JSON: struct{}{}, + } +} + +func GetPresence( + req *http.Request, + device *api.Device, + natsClient *nats.Conn, + presenceTopic string, + userID string, +) util.JSONResponse { + msg := nats.NewMsg(presenceTopic) + msg.Header.Set(jetstream.UserID, userID) + + presence, err := natsClient.RequestMsg(msg, time.Second*10) + if err != nil { + log.WithError(err).Errorf("unable to get presence") + return util.JSONResponse{ + Code: http.StatusInternalServerError, + JSON: jsonerror.InternalServerError(), + } + } + + statusMsg := presence.Header.Get("status_msg") + e := presence.Header.Get("error") + if e != "" { + log.Errorf("received error msg from nats: %s", e) + return util.JSONResponse{ + Code: http.StatusOK, + JSON: types.PresenceClientResponse{ + Presence: types.PresenceUnavailable.String(), + }, + } + } + lastActive, err := strconv.Atoi(presence.Header.Get("last_active_ts")) + if err != nil { + return util.JSONResponse{ + Code: http.StatusInternalServerError, + JSON: jsonerror.InternalServerError(), + } + } + + p := types.PresenceInternal{LastActiveTS: gomatrixserverlib.Timestamp(lastActive)} + currentlyActive := p.CurrentlyActive() + return util.JSONResponse{ + Code: http.StatusOK, + JSON: types.PresenceClientResponse{ + CurrentlyActive: ¤tlyActive, + LastActiveAgo: p.LastActiveAgo(), + Presence: presence.Header.Get("presence"), + StatusMsg: &statusMsg, + }, + } +} diff --git a/clientapi/routing/routing.go b/clientapi/routing/routing.go index db860dcd..32e83187 100644 --- a/clientapi/routing/routing.go +++ b/clientapi/routing/routing.go @@ -32,9 +32,11 @@ import ( keyserverAPI "github.com/matrix-org/dendrite/keyserver/api" roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/setup/config" + "github.com/matrix-org/dendrite/setup/jetstream" userapi "github.com/matrix-org/dendrite/userapi/api" "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/util" + "github.com/nats-io/nats.go" "github.com/sirupsen/logrus" ) @@ -56,7 +58,7 @@ func Setup( federationSender federationAPI.FederationInternalAPI, keyAPI keyserverAPI.KeyInternalAPI, extRoomsProvider api.ExtraPublicRoomsProvider, - mscCfg *config.MSCs, + mscCfg *config.MSCs, natsClient *nats.Conn, ) { rateLimits := httputil.NewRateLimits(&cfg.RateLimiting) userInteractiveAuth := auth.NewUserInteractive(userAPI, cfg) @@ -779,20 +781,6 @@ func Setup( }), ).Methods(http.MethodPost, http.MethodOptions) - // Element logs get flooded unless this is handled - v3mux.Handle("/presence/{userID}/status", - httputil.MakeExternalAPI("presence", func(req *http.Request) util.JSONResponse { - if r := rateLimits.Limit(req); r != nil { - return *r - } - // TODO: Set presence (probably the responsibility of a presence server not clientapi) - return util.JSONResponse{ - Code: http.StatusOK, - JSON: struct{}{}, - } - }), - ).Methods(http.MethodPut, http.MethodOptions) - v3mux.Handle("/voip/turnServer", httputil.MakeAuthAPI("turn_server", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse { if r := rateLimits.Limit(req); r != nil { @@ -1308,4 +1296,22 @@ func Setup( return SetReceipt(req, syncProducer, device, vars["roomId"], vars["receiptType"], vars["eventId"]) }), ).Methods(http.MethodPost, http.MethodOptions) + v3mux.Handle("/presence/{userId}/status", + httputil.MakeAuthAPI("set_presence", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse { + vars, err := httputil.URLDecodeMapValues(mux.Vars(req)) + if err != nil { + return util.ErrorResponse(err) + } + return SetPresence(req, cfg, device, syncProducer, vars["userId"]) + }), + ).Methods(http.MethodPut, http.MethodOptions) + v3mux.Handle("/presence/{userId}/status", + httputil.MakeAuthAPI("get_presence", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse { + vars, err := httputil.URLDecodeMapValues(mux.Vars(req)) + if err != nil { + return util.ErrorResponse(err) + } + return GetPresence(req, device, natsClient, cfg.Matrix.JetStream.Prefixed(jetstream.RequestPresence), vars["userId"]) + }), + ).Methods(http.MethodGet, http.MethodOptions) } diff --git a/cmd/generate-config/main.go b/cmd/generate-config/main.go index ba5a87a7..24085afa 100644 --- a/cmd/generate-config/main.go +++ b/cmd/generate-config/main.go @@ -91,6 +91,10 @@ func main() { cfg.UserAPI.BCryptCost = bcrypt.MinCost cfg.Global.JetStream.InMemory = true cfg.ClientAPI.RegistrationSharedSecret = "complement" + cfg.Global.Presence = config.PresenceOptions{ + EnableInbound: true, + EnableOutbound: true, + } } j, err := yaml.Marshal(cfg) diff --git a/dendrite-config.yaml b/dendrite-config.yaml index 8b4c820a..47f08c4f 100644 --- a/dendrite-config.yaml +++ b/dendrite-config.yaml @@ -68,6 +68,13 @@ global: # to other servers and the federation API will not be exposed. disable_federation: false + # Configures the handling of presence events. + presence: + # Whether inbound presence events are allowed, e.g. receiving presence events from other servers + enable_inbound: false + # Whether outbound presence events are allowed, e.g. sending presence events to other servers + enable_outbound: false + # Server notices allows server admins to send messages to all users. server_notices: enabled: false diff --git a/federationapi/consumers/presence.go b/federationapi/consumers/presence.go new file mode 100644 index 00000000..bfce1b28 --- /dev/null +++ b/federationapi/consumers/presence.go @@ -0,0 +1,143 @@ +// Copyright 2022 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package consumers + +import ( + "context" + "encoding/json" + "strconv" + + "github.com/matrix-org/dendrite/federationapi/queue" + "github.com/matrix-org/dendrite/federationapi/storage" + fedTypes "github.com/matrix-org/dendrite/federationapi/types" + "github.com/matrix-org/dendrite/setup/config" + "github.com/matrix-org/dendrite/setup/jetstream" + "github.com/matrix-org/dendrite/setup/process" + "github.com/matrix-org/dendrite/syncapi/types" + "github.com/matrix-org/gomatrixserverlib" + "github.com/nats-io/nats.go" + log "github.com/sirupsen/logrus" +) + +// OutputReceiptConsumer consumes events that originate in the clientapi. +type OutputPresenceConsumer struct { + ctx context.Context + jetstream nats.JetStreamContext + durable string + db storage.Database + queues *queue.OutgoingQueues + ServerName gomatrixserverlib.ServerName + topic string + outboundPresenceEnabled bool +} + +// NewOutputPresenceConsumer creates a new OutputPresenceConsumer. Call Start() to begin consuming events. +func NewOutputPresenceConsumer( + process *process.ProcessContext, + cfg *config.FederationAPI, + js nats.JetStreamContext, + queues *queue.OutgoingQueues, + store storage.Database, +) *OutputPresenceConsumer { + return &OutputPresenceConsumer{ + ctx: process.Context(), + jetstream: js, + queues: queues, + db: store, + ServerName: cfg.Matrix.ServerName, + durable: cfg.Matrix.JetStream.Durable("FederationAPIPresenceConsumer"), + topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputPresenceEvent), + outboundPresenceEnabled: cfg.Matrix.Presence.EnableOutbound, + } +} + +// Start consuming from the clientapi +func (t *OutputPresenceConsumer) Start() error { + if !t.outboundPresenceEnabled { + return nil + } + return jetstream.JetStreamConsumer( + t.ctx, t.jetstream, t.topic, t.durable, t.onMessage, + nats.DeliverAll(), nats.ManualAck(), nats.HeadersOnly(), + ) +} + +// onMessage is called in response to a message received on the presence +// events topic from the client api. +func (t *OutputPresenceConsumer) onMessage(ctx context.Context, msg *nats.Msg) bool { + // only send presence events which originated from us + userID := msg.Header.Get(jetstream.UserID) + _, serverName, err := gomatrixserverlib.SplitID('@', userID) + if err != nil { + log.WithError(err).WithField("user_id", userID).Error("failed to extract domain from receipt sender") + return true + } + if serverName != t.ServerName { + return true + } + + presence := msg.Header.Get("presence") + + ts, err := strconv.Atoi(msg.Header.Get("last_active_ts")) + if err != nil { + return true + } + + joined, err := t.db.GetAllJoinedHosts(ctx) + if err != nil { + log.WithError(err).Error("failed to get joined hosts") + return true + } + if len(joined) == 0 { + return true + } + + var statusMsg *string = nil + if data, ok := msg.Header["status_msg"]; ok && len(data) > 0 { + status := msg.Header.Get("status_msg") + statusMsg = &status + } + + p := types.PresenceInternal{LastActiveTS: gomatrixserverlib.Timestamp(ts)} + + content := fedTypes.Presence{ + Push: []fedTypes.PresenceContent{ + { + CurrentlyActive: p.CurrentlyActive(), + LastActiveAgo: p.LastActiveAgo(), + Presence: presence, + StatusMsg: statusMsg, + UserID: userID, + }, + }, + } + + edu := &gomatrixserverlib.EDU{ + Type: gomatrixserverlib.MPresence, + Origin: string(t.ServerName), + } + if edu.Content, err = json.Marshal(content); err != nil { + log.WithError(err).Error("failed to marshal EDU JSON") + return true + } + + log.Debugf("sending presence EDU to %d servers", len(joined)) + if err = t.queues.SendEDU(edu, t.ServerName, joined); err != nil { + log.WithError(err).Error("failed to send EDU") + return false + } + + return true +} diff --git a/federationapi/federationapi.go b/federationapi/federationapi.go index 8a0ce8e3..5bfe237a 100644 --- a/federationapi/federationapi.go +++ b/federationapi/federationapi.go @@ -66,6 +66,7 @@ func AddPublicRoutes( TopicReceiptEvent: cfg.Matrix.JetStream.Prefixed(jetstream.OutputReceiptEvent), TopicSendToDeviceEvent: cfg.Matrix.JetStream.Prefixed(jetstream.OutputSendToDeviceEvent), TopicTypingEvent: cfg.Matrix.JetStream.Prefixed(jetstream.OutputTypingEvent), + TopicPresenceEvent: cfg.Matrix.JetStream.Prefixed(jetstream.OutputPresenceEvent), ServerName: cfg.Matrix.ServerName, UserAPI: userAPI, } @@ -149,5 +150,11 @@ func NewInternalAPI( logrus.WithError(err).Panic("failed to start key server consumer") } + presenceConsumer := consumers.NewOutputPresenceConsumer( + base.ProcessContext, cfg, js, queues, federationDB, + ) + if err = presenceConsumer.Start(); err != nil { + logrus.WithError(err).Panic("failed to start presence consumer") + } return internal.NewFederationInternalAPI(federationDB, cfg, rsAPI, federation, stats, caches, queues, keyRing) } diff --git a/federationapi/producers/syncapi.go b/federationapi/producers/syncapi.go index 24acb126..49415003 100644 --- a/federationapi/producers/syncapi.go +++ b/federationapi/producers/syncapi.go @@ -18,6 +18,7 @@ import ( "context" "encoding/json" "strconv" + "time" "github.com/matrix-org/dendrite/setup/jetstream" "github.com/matrix-org/dendrite/syncapi/types" @@ -32,6 +33,7 @@ type SyncAPIProducer struct { TopicReceiptEvent string TopicSendToDeviceEvent string TopicTypingEvent string + TopicPresenceEvent string JetStream nats.JetStreamContext ServerName gomatrixserverlib.ServerName UserAPI userapi.UserInternalAPI @@ -142,3 +144,20 @@ func (p *SyncAPIProducer) SendTyping( _, err := p.JetStream.PublishMsg(m, nats.Context(ctx)) return err } + +func (p *SyncAPIProducer) SendPresence( + ctx context.Context, userID string, presence types.Presence, statusMsg *string, lastActiveAgo int64, +) error { + m := nats.NewMsg(p.TopicPresenceEvent) + m.Header.Set(jetstream.UserID, userID) + m.Header.Set("presence", presence.String()) + if statusMsg != nil { + m.Header.Set("status_msg", *statusMsg) + } + lastActiveTS := gomatrixserverlib.AsTimestamp(time.Now().Add(-(time.Duration(lastActiveAgo) * time.Millisecond))) + + m.Header.Set("last_active_ts", strconv.Itoa(int(lastActiveTS))) + log.Debugf("Sending presence to syncAPI: %+v", m.Header) + _, err := p.JetStream.PublishMsg(m, nats.Context(ctx)) + return err +} diff --git a/federationapi/routing/send.go b/federationapi/routing/send.go index eacc76db..1bba632b 100644 --- a/federationapi/routing/send.go +++ b/federationapi/routing/send.go @@ -30,6 +30,7 @@ import ( keyapi "github.com/matrix-org/dendrite/keyserver/api" "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/setup/config" + syncTypes "github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/util" "github.com/prometheus/client_golang/prometheus" @@ -127,13 +128,14 @@ func Send( defer inFlightTxnsPerOrigin.Delete(index) t := txnReq{ - rsAPI: rsAPI, - keys: keys, - federation: federation, - servers: servers, - keyAPI: keyAPI, - roomsMu: mu, - producer: producer, + rsAPI: rsAPI, + keys: keys, + federation: federation, + servers: servers, + keyAPI: keyAPI, + roomsMu: mu, + producer: producer, + inboundPresenceEnabled: cfg.Matrix.Presence.EnableInbound, } var txnEvents struct { @@ -185,13 +187,14 @@ func Send( type txnReq struct { gomatrixserverlib.Transaction - rsAPI api.RoomserverInternalAPI - keyAPI keyapi.KeyInternalAPI - keys gomatrixserverlib.JSONVerifier - federation txnFederationClient - roomsMu *internal.MutexByRoom - servers federationAPI.ServersInRoomProvider - producer *producers.SyncAPIProducer + rsAPI api.RoomserverInternalAPI + keyAPI keyapi.KeyInternalAPI + keys gomatrixserverlib.JSONVerifier + federation txnFederationClient + roomsMu *internal.MutexByRoom + servers federationAPI.ServersInRoomProvider + producer *producers.SyncAPIProducer + inboundPresenceEnabled bool } // A subset of FederationClient functionality that txn requires. Useful for testing. @@ -389,12 +392,37 @@ func (t *txnReq) processEDUs(ctx context.Context) { if err := t.processSigningKeyUpdate(ctx, e); err != nil { logrus.WithError(err).Errorf("Failed to process signing key update") } + case gomatrixserverlib.MPresence: + if t.inboundPresenceEnabled { + if err := t.processPresence(ctx, e); err != nil { + logrus.WithError(err).Errorf("Failed to process presence update") + } + } default: util.GetLogger(ctx).WithField("type", e.Type).Debug("Unhandled EDU") } } } +// processPresence handles m.receipt events +func (t *txnReq) processPresence(ctx context.Context, e gomatrixserverlib.EDU) error { + payload := types.Presence{} + if err := json.Unmarshal(e.Content, &payload); err != nil { + return err + } + for _, content := range payload.Push { + presence, ok := syncTypes.PresenceFromString(content.Presence) + if !ok { + logrus.Warnf("invalid presence '%s', skipping.", content.Presence) + continue + } + if err := t.producer.SendPresence(ctx, content.UserID, presence, content.StatusMsg, content.LastActiveAgo); err != nil { + return err + } + } + return nil +} + func (t *txnReq) processSigningKeyUpdate(ctx context.Context, e gomatrixserverlib.EDU) error { var updatePayload keyapi.CrossSigningKeyUpdate if err := json.Unmarshal(e.Content, &updatePayload); err != nil { diff --git a/federationapi/types/types.go b/federationapi/types/types.go index a28a80b2..5821000c 100644 --- a/federationapi/types/types.go +++ b/federationapi/types/types.go @@ -66,3 +66,15 @@ type FederationReceiptData struct { type ReceiptTS struct { TS gomatrixserverlib.Timestamp `json:"ts"` } + +type Presence struct { + Push []PresenceContent `json:"push"` +} + +type PresenceContent struct { + CurrentlyActive bool `json:"currently_active,omitempty"` + LastActiveAgo int64 `json:"last_active_ago"` + Presence string `json:"presence"` + StatusMsg *string `json:"status_msg,omitempty"` + UserID string `json:"user_id"` +} diff --git a/setup/config/config_global.go b/setup/config/config_global.go index b947f207..c1650f07 100644 --- a/setup/config/config_global.go +++ b/setup/config/config_global.go @@ -41,6 +41,9 @@ type Global struct { // to other servers and the federation API will not be exposed. DisableFederation bool `yaml:"disable_federation"` + // Configures the handling of presence events. + Presence PresenceOptions `yaml:"presence"` + // List of domains that the server will trust as identity servers to // verify third-party identifiers. // Defaults to an empty array. @@ -225,3 +228,11 @@ func (c *DNSCacheOptions) Verify(configErrs *ConfigErrors, isMonolith bool) { checkPositive(configErrs, "cache_size", int64(c.CacheSize)) checkPositive(configErrs, "cache_lifetime", int64(c.CacheLifetime)) } + +// PresenceOptions defines possible configurations for presence events. +type PresenceOptions struct { + // Whether inbound presence events are allowed + EnableInbound bool `yaml:"enable_inbound"` + // Whether outbound presence events are allowed + EnableOutbound bool `yaml:"enable_outbound"` +} diff --git a/setup/jetstream/streams.go b/setup/jetstream/streams.go index 5f0d37fd..6594e694 100644 --- a/setup/jetstream/streams.go +++ b/setup/jetstream/streams.go @@ -25,6 +25,8 @@ var ( OutputReceiptEvent = "OutputReceiptEvent" OutputStreamEvent = "OutputStreamEvent" OutputReadUpdate = "OutputReadUpdate" + RequestPresence = "GetPresence" + OutputPresenceEvent = "OutputPresenceEvent" ) var safeCharacters = regexp.MustCompile("[^A-Za-z0-9$]+") @@ -89,4 +91,10 @@ var streams = []*nats.StreamConfig{ Retention: nats.InterestPolicy, Storage: nats.FileStorage, }, + { + Name: OutputPresenceEvent, + Retention: nats.InterestPolicy, + Storage: nats.MemoryStorage, + MaxAge: time.Minute * 5, + }, } diff --git a/syncapi/consumers/presence.go b/syncapi/consumers/presence.go new file mode 100644 index 00000000..b198b229 --- /dev/null +++ b/syncapi/consumers/presence.go @@ -0,0 +1,158 @@ +// Copyright 2022 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package consumers + +import ( + "context" + "strconv" + + "github.com/matrix-org/dendrite/setup/config" + "github.com/matrix-org/dendrite/setup/jetstream" + "github.com/matrix-org/dendrite/setup/process" + "github.com/matrix-org/dendrite/syncapi/notifier" + "github.com/matrix-org/dendrite/syncapi/storage" + "github.com/matrix-org/dendrite/syncapi/types" + "github.com/matrix-org/dendrite/userapi/api" + "github.com/matrix-org/gomatrixserverlib" + "github.com/nats-io/nats.go" + "github.com/sirupsen/logrus" +) + +// OutputTypingEventConsumer consumes events that originated in the EDU server. +type PresenceConsumer struct { + ctx context.Context + jetstream nats.JetStreamContext + nats *nats.Conn + durable string + requestTopic string + presenceTopic string + db storage.Database + stream types.StreamProvider + notifier *notifier.Notifier + deviceAPI api.UserDeviceAPI + cfg *config.SyncAPI +} + +// NewPresenceConsumer creates a new PresenceConsumer. +// Call Start() to begin consuming events. +func NewPresenceConsumer( + process *process.ProcessContext, + cfg *config.SyncAPI, + js nats.JetStreamContext, + nats *nats.Conn, + db storage.Database, + notifier *notifier.Notifier, + stream types.StreamProvider, + deviceAPI api.UserDeviceAPI, +) *PresenceConsumer { + return &PresenceConsumer{ + ctx: process.Context(), + nats: nats, + jetstream: js, + durable: cfg.Matrix.JetStream.Durable("SyncAPIPresenceConsumer"), + presenceTopic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputPresenceEvent), + requestTopic: cfg.Matrix.JetStream.Prefixed(jetstream.RequestPresence), + db: db, + notifier: notifier, + stream: stream, + deviceAPI: deviceAPI, + cfg: cfg, + } +} + +// Start consuming typing events. +func (s *PresenceConsumer) Start() error { + // Normal NATS subscription, used by Request/Reply + _, err := s.nats.Subscribe(s.requestTopic, func(msg *nats.Msg) { + userID := msg.Header.Get(jetstream.UserID) + presence, err := s.db.GetPresence(context.Background(), userID) + m := &nats.Msg{ + Header: nats.Header{}, + } + if err != nil { + m.Header.Set("error", err.Error()) + if err = msg.RespondMsg(m); err != nil { + logrus.WithError(err).Error("Unable to respond to messages") + } + return + } + + deviceRes := api.QueryDevicesResponse{} + if err = s.deviceAPI.QueryDevices(s.ctx, &api.QueryDevicesRequest{UserID: userID}, &deviceRes); err != nil { + m.Header.Set("error", err.Error()) + if err = msg.RespondMsg(m); err != nil { + logrus.WithError(err).Error("Unable to respond to messages") + } + return + } + + for i := range deviceRes.Devices { + if int64(presence.LastActiveTS) < deviceRes.Devices[i].LastSeenTS { + presence.LastActiveTS = gomatrixserverlib.Timestamp(deviceRes.Devices[i].LastSeenTS) + } + } + + m.Header.Set(jetstream.UserID, presence.UserID) + m.Header.Set("presence", presence.ClientFields.Presence) + m.Header.Set("status_msg", *presence.ClientFields.StatusMsg) + m.Header.Set("last_active_ts", strconv.Itoa(int(presence.LastActiveTS))) + + if err = msg.RespondMsg(m); err != nil { + logrus.WithError(err).Error("Unable to respond to messages") + return + } + }) + if err != nil { + return err + } + if !s.cfg.Matrix.Presence.EnableInbound && !s.cfg.Matrix.Presence.EnableOutbound { + return nil + } + return jetstream.JetStreamConsumer( + s.ctx, s.jetstream, s.presenceTopic, s.durable, s.onMessage, + nats.DeliverAll(), nats.ManualAck(), nats.HeadersOnly(), + ) +} + +func (s *PresenceConsumer) onMessage(ctx context.Context, msg *nats.Msg) bool { + userID := msg.Header.Get(jetstream.UserID) + presence := msg.Header.Get("presence") + timestamp := msg.Header.Get("last_active_ts") + fromSync, _ := strconv.ParseBool(msg.Header.Get("from_sync")) + + logrus.Debugf("syncAPI received presence event: %+v", msg.Header) + + ts, err := strconv.Atoi(timestamp) + if err != nil { + return true + } + + var statusMsg *string = nil + if data, ok := msg.Header["status_msg"]; ok && len(data) > 0 { + newMsg := msg.Header.Get("status_msg") + statusMsg = &newMsg + } + // OK is already checked, so no need to do it again + p, _ := types.PresenceFromString(presence) + pos, err := s.db.UpdatePresence(ctx, userID, p, statusMsg, gomatrixserverlib.Timestamp(ts), fromSync) + if err != nil { + return true + } + + s.stream.Advance(pos) + s.notifier.OnNewPresence(types.StreamingToken{PresencePosition: pos}, userID) + + return true +} diff --git a/syncapi/notifier/notifier.go b/syncapi/notifier/notifier.go index 6a641e6f..d2b79b63 100644 --- a/syncapi/notifier/notifier.go +++ b/syncapi/notifier/notifier.go @@ -43,22 +43,30 @@ type Notifier struct { userDeviceStreams map[string]map[string]*UserDeviceStream // The last time we cleaned out stale entries from the userStreams map lastCleanUpTime time.Time + // Protects roomIDToJoinedUsers and roomIDToPeekingDevices + mapLock *sync.RWMutex } // NewNotifier creates a new notifier set to the given sync position. // In order for this to be of any use, the Notifier needs to be told all rooms and // the joined users within each of them by calling Notifier.Load(*storage.SyncServerDatabase). -func NewNotifier(currPos types.StreamingToken) *Notifier { +func NewNotifier() *Notifier { return &Notifier{ - currPos: currPos, roomIDToJoinedUsers: make(map[string]userIDSet), roomIDToPeekingDevices: make(map[string]peekingDeviceSet), userDeviceStreams: make(map[string]map[string]*UserDeviceStream), streamLock: &sync.Mutex{}, + mapLock: &sync.RWMutex{}, lastCleanUpTime: time.Now(), } } +// SetCurrentPosition sets the current streaming positions. +// This must be called directly after NewNotifier and initialising the streams. +func (n *Notifier) SetCurrentPosition(currPos types.StreamingToken) { + n.currPos = currPos +} + // OnNewEvent is called when a new event is received from the room server. Must only be // called from a single goroutine, to avoid races between updates which could set the // current sync position incorrectly. @@ -83,7 +91,7 @@ func (n *Notifier) OnNewEvent( if ev != nil { // Map this event's room_id to a list of joined users, and wake them up. - usersToNotify := n.joinedUsers(ev.RoomID()) + usersToNotify := n.JoinedUsers(ev.RoomID()) // Map this event's room_id to a list of peeking devices, and wake them up. peekingDevicesToNotify := n.PeekingDevices(ev.RoomID()) // If this is an invite, also add in the invitee to this list. @@ -114,7 +122,7 @@ func (n *Notifier) OnNewEvent( n.wakeupUsers(usersToNotify, peekingDevicesToNotify, n.currPos) } else if roomID != "" { - n.wakeupUsers(n.joinedUsers(roomID), n.PeekingDevices(roomID), n.currPos) + n.wakeupUsers(n.JoinedUsers(roomID), n.PeekingDevices(roomID), n.currPos) } else if len(userIDs) > 0 { n.wakeupUsers(userIDs, nil, n.currPos) } else { @@ -182,7 +190,7 @@ func (n *Notifier) OnNewTyping( defer n.streamLock.Unlock() n.currPos.ApplyUpdates(posUpdate) - n.wakeupUsers(n.joinedUsers(roomID), nil, n.currPos) + n.wakeupUsers(n.JoinedUsers(roomID), nil, n.currPos) } // OnNewReceipt updates the current position @@ -194,7 +202,7 @@ func (n *Notifier) OnNewReceipt( defer n.streamLock.Unlock() n.currPos.ApplyUpdates(posUpdate) - n.wakeupUsers(n.joinedUsers(roomID), nil, n.currPos) + n.wakeupUsers(n.JoinedUsers(roomID), nil, n.currPos) } func (n *Notifier) OnNewKeyChange( @@ -228,6 +236,28 @@ func (n *Notifier) OnNewNotificationData( n.wakeupUsers([]string{userID}, nil, n.currPos) } +func (n *Notifier) OnNewPresence( + posUpdate types.StreamingToken, userID string, +) { + n.streamLock.Lock() + defer n.streamLock.Unlock() + + n.currPos.ApplyUpdates(posUpdate) + sharedUsers := n.SharedUsers(userID) + sharedUsers = append(sharedUsers, userID) + + n.wakeupUsers(sharedUsers, nil, n.currPos) +} + +func (n *Notifier) SharedUsers(userID string) (sharedUsers []string) { + for roomID, users := range n.roomIDToJoinedUsers { + if _, ok := users[userID]; ok { + sharedUsers = append(sharedUsers, n.JoinedUsers(roomID)...) + } + } + return sharedUsers +} + // GetListener returns a UserStreamListener that can be used to wait for // updates for a user. Must be closed. // notify for anything before sincePos @@ -250,6 +280,8 @@ func (n *Notifier) GetListener(req types.SyncRequest) UserDeviceStreamListener { // Load the membership states required to notify users correctly. func (n *Notifier) Load(ctx context.Context, db storage.Database) error { + n.mapLock.Lock() + defer n.mapLock.Unlock() roomToUsers, err := db.AllJoinedUsersInRooms(ctx) if err != nil { return err @@ -377,6 +409,8 @@ func (n *Notifier) fetchUserStreams(userID string) []*UserDeviceStream { // Not thread-safe: must be called on the OnNewEvent goroutine only func (n *Notifier) addJoinedUser(roomID, userID string) { + n.mapLock.Lock() + defer n.mapLock.Unlock() if _, ok := n.roomIDToJoinedUsers[roomID]; !ok { n.roomIDToJoinedUsers[roomID] = make(userIDSet) } @@ -385,6 +419,8 @@ func (n *Notifier) addJoinedUser(roomID, userID string) { // Not thread-safe: must be called on the OnNewEvent goroutine only func (n *Notifier) removeJoinedUser(roomID, userID string) { + n.mapLock.Lock() + defer n.mapLock.Unlock() if _, ok := n.roomIDToJoinedUsers[roomID]; !ok { n.roomIDToJoinedUsers[roomID] = make(userIDSet) } @@ -392,7 +428,9 @@ func (n *Notifier) removeJoinedUser(roomID, userID string) { } // Not thread-safe: must be called on the OnNewEvent goroutine only -func (n *Notifier) joinedUsers(roomID string) (userIDs []string) { +func (n *Notifier) JoinedUsers(roomID string) (userIDs []string) { + n.mapLock.RLock() + defer n.mapLock.RUnlock() if _, ok := n.roomIDToJoinedUsers[roomID]; !ok { return } @@ -401,6 +439,8 @@ func (n *Notifier) joinedUsers(roomID string) (userIDs []string) { // Not thread-safe: must be called on the OnNewEvent goroutine only func (n *Notifier) addPeekingDevice(roomID, userID, deviceID string) { + n.mapLock.Lock() + defer n.mapLock.Unlock() if _, ok := n.roomIDToPeekingDevices[roomID]; !ok { n.roomIDToPeekingDevices[roomID] = make(peekingDeviceSet) } @@ -410,6 +450,8 @@ func (n *Notifier) addPeekingDevice(roomID, userID, deviceID string) { // Not thread-safe: must be called on the OnNewEvent goroutine only // nolint:unused func (n *Notifier) removePeekingDevice(roomID, userID, deviceID string) { + n.mapLock.Lock() + defer n.mapLock.Unlock() if _, ok := n.roomIDToPeekingDevices[roomID]; !ok { n.roomIDToPeekingDevices[roomID] = make(peekingDeviceSet) } @@ -419,6 +461,8 @@ func (n *Notifier) removePeekingDevice(roomID, userID, deviceID string) { // Not thread-safe: must be called on the OnNewEvent goroutine only func (n *Notifier) PeekingDevices(roomID string) (peekingDevices []types.PeekingDevice) { + n.mapLock.RLock() + defer n.mapLock.RUnlock() if _, ok := n.roomIDToPeekingDevices[roomID]; !ok { return } diff --git a/syncapi/notifier/notifier_test.go b/syncapi/notifier/notifier_test.go index 60403d5d..8b305586 100644 --- a/syncapi/notifier/notifier_test.go +++ b/syncapi/notifier/notifier_test.go @@ -107,7 +107,8 @@ func mustEqualPositions(t *testing.T, got, want types.StreamingToken) { // Test that the current position is returned if a request is already behind. func TestImmediateNotification(t *testing.T) { - n := NewNotifier(syncPositionBefore) + n := NewNotifier() + n.SetCurrentPosition(syncPositionBefore) pos, err := waitForEvents(n, newTestSyncRequest(alice, aliceDev, syncPositionVeryOld)) if err != nil { t.Fatalf("TestImmediateNotification error: %s", err) @@ -117,7 +118,8 @@ func TestImmediateNotification(t *testing.T) { // Test that new events to a joined room unblocks the request. func TestNewEventAndJoinedToRoom(t *testing.T) { - n := NewNotifier(syncPositionBefore) + n := NewNotifier() + n.SetCurrentPosition(syncPositionBefore) n.setUsersJoinedToRooms(map[string][]string{ roomID: {alice, bob}, }) @@ -142,7 +144,8 @@ func TestNewEventAndJoinedToRoom(t *testing.T) { } func TestCorrectStream(t *testing.T) { - n := NewNotifier(syncPositionBefore) + n := NewNotifier() + n.SetCurrentPosition(syncPositionBefore) stream := lockedFetchUserStream(n, bob, bobDev) if stream.UserID != bob { t.Fatalf("expected user %q, got %q", bob, stream.UserID) @@ -153,7 +156,8 @@ func TestCorrectStream(t *testing.T) { } func TestCorrectStreamWakeup(t *testing.T) { - n := NewNotifier(syncPositionBefore) + n := NewNotifier() + n.SetCurrentPosition(syncPositionBefore) awoken := make(chan string) streamone := lockedFetchUserStream(n, alice, "one") @@ -180,7 +184,8 @@ func TestCorrectStreamWakeup(t *testing.T) { // Test that an invite unblocks the request func TestNewInviteEventForUser(t *testing.T) { - n := NewNotifier(syncPositionBefore) + n := NewNotifier() + n.SetCurrentPosition(syncPositionBefore) n.setUsersJoinedToRooms(map[string][]string{ roomID: {alice, bob}, }) @@ -236,7 +241,8 @@ func TestEDUWakeup(t *testing.T) { // Test that all blocked requests get woken up on a new event. func TestMultipleRequestWakeup(t *testing.T) { - n := NewNotifier(syncPositionBefore) + n := NewNotifier() + n.SetCurrentPosition(syncPositionBefore) n.setUsersJoinedToRooms(map[string][]string{ roomID: {alice, bob}, }) @@ -272,7 +278,8 @@ func TestMultipleRequestWakeup(t *testing.T) { func TestNewEventAndWasPreviouslyJoinedToRoom(t *testing.T) { // listen as bob. Make bob leave room. Make alice send event to room. // Make sure alice gets woken up only and not bob as well. - n := NewNotifier(syncPositionBefore) + n := NewNotifier() + n.SetCurrentPosition(syncPositionBefore) n.setUsersJoinedToRooms(map[string][]string{ roomID: {alice, bob}, }) diff --git a/syncapi/producers/federationapi_presence.go b/syncapi/producers/federationapi_presence.go new file mode 100644 index 00000000..dc03457e --- /dev/null +++ b/syncapi/producers/federationapi_presence.go @@ -0,0 +1,48 @@ +// Copyright 2022 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package producers + +import ( + "strconv" + "time" + + "github.com/matrix-org/dendrite/setup/jetstream" + "github.com/matrix-org/dendrite/syncapi/types" + "github.com/matrix-org/gomatrixserverlib" + "github.com/nats-io/nats.go" +) + +// FederationAPIPresenceProducer produces events for the federation API server to consume +type FederationAPIPresenceProducer struct { + Topic string + JetStream nats.JetStreamContext +} + +func (f *FederationAPIPresenceProducer) SendPresence( + userID string, presence types.Presence, statusMsg *string, +) error { + msg := nats.NewMsg(f.Topic) + msg.Header.Set(jetstream.UserID, userID) + msg.Header.Set("presence", presence.String()) + msg.Header.Set("from_sync", "true") // only update last_active_ts and presence + msg.Header.Set("last_active_ts", strconv.Itoa(int(gomatrixserverlib.AsTimestamp(time.Now())))) + + if statusMsg != nil { + msg.Header.Set("status_msg", *statusMsg) + } + + _, err := f.JetStream.PublishMsg(msg) + return err +} diff --git a/syncapi/storage/interface.go b/syncapi/storage/interface.go index 03313ec6..0b3ab235 100644 --- a/syncapi/storage/interface.go +++ b/syncapi/storage/interface.go @@ -26,6 +26,7 @@ import ( ) type Database interface { + Presence MaxStreamPositionForPDUs(ctx context.Context) (types.StreamPosition, error) MaxStreamPositionForReceipts(ctx context.Context) (types.StreamPosition, error) MaxStreamPositionForInvites(ctx context.Context) (types.StreamPosition, error) @@ -150,3 +151,10 @@ type Database interface { StreamToTopologicalPosition(ctx context.Context, roomID string, streamPos types.StreamPosition, backwardOrdering bool) (types.TopologyToken, error) } + +type Presence interface { + UpdatePresence(ctx context.Context, userID string, presence types.Presence, statusMsg *string, lastActiveTS gomatrixserverlib.Timestamp, fromSync bool) (types.StreamPosition, error) + GetPresence(ctx context.Context, userID string) (*types.PresenceInternal, error) + PresenceAfter(ctx context.Context, after types.StreamPosition) (map[string]*types.PresenceInternal, error) + MaxStreamPositionForPresence(ctx context.Context) (types.StreamPosition, error) +} diff --git a/syncapi/storage/postgres/presence_table.go b/syncapi/storage/postgres/presence_table.go new file mode 100644 index 00000000..49336c4e --- /dev/null +++ b/syncapi/storage/postgres/presence_table.go @@ -0,0 +1,162 @@ +// Copyright 2022 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package postgres + +import ( + "context" + "database/sql" + + "github.com/matrix-org/dendrite/internal" + "github.com/matrix-org/dendrite/internal/sqlutil" + "github.com/matrix-org/dendrite/syncapi/types" + "github.com/matrix-org/gomatrixserverlib" +) + +const presenceSchema = ` +CREATE SEQUENCE IF NOT EXISTS syncapi_presence_id; +-- Stores data about presence +CREATE TABLE IF NOT EXISTS syncapi_presence ( + -- The ID + id BIGINT PRIMARY KEY DEFAULT nextval('syncapi_presence_id'), + -- The Matrix user ID + user_id TEXT NOT NULL, + -- The actual presence + presence INT NOT NULL, + -- The status message + status_msg TEXT, + -- The last time an action was received by this user + last_active_ts BIGINT NOT NULL, + CONSTRAINT presence_presences_unique UNIQUE (user_id) +); +CREATE INDEX IF NOT EXISTS syncapi_presence_user_id ON syncapi_presence(user_id); +` + +const upsertPresenceSQL = "" + + "INSERT INTO syncapi_presence AS p" + + " (user_id, presence, status_msg, last_active_ts)" + + " VALUES ($1, $2, $3, $4)" + + " ON CONFLICT (user_id)" + + " DO UPDATE SET id = nextval('syncapi_presence_id')," + + " presence = $2, status_msg = COALESCE($3, p.status_msg), last_active_ts = $4" + + " RETURNING id" + +const upsertPresenceFromSyncSQL = "" + + "INSERT INTO syncapi_presence AS p" + + " (user_id, presence, last_active_ts)" + + " VALUES ($1, $2, $3)" + + " ON CONFLICT (user_id)" + + " DO UPDATE SET id = nextval('syncapi_presence_id')," + + " presence = $2, last_active_ts = $3" + + " RETURNING id" + +const selectPresenceForUserSQL = "" + + "SELECT presence, status_msg, last_active_ts" + + " FROM syncapi_presence" + + " WHERE user_id = $1 LIMIT 1" + +const selectMaxPresenceSQL = "" + + "SELECT COALESCE(MAX(id), 0) FROM syncapi_presence" + +const selectPresenceAfter = "" + + " SELECT id, user_id, presence, status_msg, last_active_ts" + + " FROM syncapi_presence" + + " WHERE id > $1" + +type presenceStatements struct { + upsertPresenceStmt *sql.Stmt + upsertPresenceFromSyncStmt *sql.Stmt + selectPresenceForUsersStmt *sql.Stmt + selectMaxPresenceStmt *sql.Stmt + selectPresenceAfterStmt *sql.Stmt +} + +func NewPostgresPresenceTable(db *sql.DB) (*presenceStatements, error) { + _, err := db.Exec(presenceSchema) + if err != nil { + return nil, err + } + s := &presenceStatements{} + return s, sqlutil.StatementList{ + {&s.upsertPresenceStmt, upsertPresenceSQL}, + {&s.upsertPresenceFromSyncStmt, upsertPresenceFromSyncSQL}, + {&s.selectPresenceForUsersStmt, selectPresenceForUserSQL}, + {&s.selectMaxPresenceStmt, selectMaxPresenceSQL}, + {&s.selectPresenceAfterStmt, selectPresenceAfter}, + }.Prepare(db) +} + +// UpsertPresence creates/updates a presence status. +func (p *presenceStatements) UpsertPresence( + ctx context.Context, + txn *sql.Tx, + userID string, + statusMsg *string, + presence types.Presence, + lastActiveTS gomatrixserverlib.Timestamp, + fromSync bool, +) (pos types.StreamPosition, err error) { + if fromSync { + stmt := sqlutil.TxStmt(txn, p.upsertPresenceFromSyncStmt) + err = stmt.QueryRowContext(ctx, userID, presence, lastActiveTS).Scan(&pos) + } else { + stmt := sqlutil.TxStmt(txn, p.upsertPresenceStmt) + err = stmt.QueryRowContext(ctx, userID, presence, statusMsg, lastActiveTS).Scan(&pos) + } + return +} + +// GetPresenceForUser returns the current presence of a user. +func (p *presenceStatements) GetPresenceForUser( + ctx context.Context, txn *sql.Tx, + userID string, +) (*types.PresenceInternal, error) { + result := &types.PresenceInternal{ + UserID: userID, + } + stmt := sqlutil.TxStmt(txn, p.selectPresenceForUsersStmt) + err := stmt.QueryRowContext(ctx, userID).Scan(&result.Presence, &result.ClientFields.StatusMsg, &result.LastActiveTS) + result.ClientFields.Presence = result.Presence.String() + return result, err +} + +func (p *presenceStatements) GetMaxPresenceID(ctx context.Context, txn *sql.Tx) (pos types.StreamPosition, err error) { + stmt := sqlutil.TxStmt(txn, p.selectMaxPresenceStmt) + err = stmt.QueryRowContext(ctx).Scan(&pos) + return +} + +// GetPresenceAfter returns the changes presences after a given stream id +func (p *presenceStatements) GetPresenceAfter( + ctx context.Context, txn *sql.Tx, + after types.StreamPosition, +) (presences map[string]*types.PresenceInternal, err error) { + presences = make(map[string]*types.PresenceInternal) + stmt := sqlutil.TxStmt(txn, p.selectPresenceAfterStmt) + + rows, err := stmt.QueryContext(ctx, after) + if err != nil { + return nil, err + } + defer internal.CloseAndLogIfError(ctx, rows, "GetPresenceAfter: failed to close rows") + for rows.Next() { + qryRes := &types.PresenceInternal{} + if err := rows.Scan(&qryRes.StreamPos, &qryRes.UserID, &qryRes.Presence, &qryRes.ClientFields.StatusMsg, &qryRes.LastActiveTS); err != nil { + return nil, err + } + qryRes.ClientFields.Presence = qryRes.Presence.String() + presences[qryRes.UserID] = qryRes + } + return presences, rows.Err() +} diff --git a/syncapi/storage/postgres/syncserver.go b/syncapi/storage/postgres/syncserver.go index 4e4b5c0b..54445c7e 100644 --- a/syncapi/storage/postgres/syncserver.go +++ b/syncapi/storage/postgres/syncserver.go @@ -90,6 +90,10 @@ func NewDatabase(dbProperties *config.DatabaseOptions) (*SyncServerDatasource, e if err != nil { return nil, err } + presence, err := NewPostgresPresenceTable(d.db) + if err != nil { + return nil, err + } m := sqlutil.NewMigrations() deltas.LoadFixSequences(m) deltas.LoadRemoveSendToDeviceSentColumn(m) @@ -111,6 +115,7 @@ func NewDatabase(dbProperties *config.DatabaseOptions) (*SyncServerDatasource, e Receipts: receipts, Memberships: memberships, NotificationData: notificationData, + Presence: presence, } return &d, nil } diff --git a/syncapi/storage/shared/syncserver.go b/syncapi/storage/shared/syncserver.go index de43678d..7c4786fc 100644 --- a/syncapi/storage/shared/syncserver.go +++ b/syncapi/storage/shared/syncserver.go @@ -48,6 +48,7 @@ type Database struct { Receipts tables.Receipts Memberships tables.Memberships NotificationData tables.NotificationData + Presence tables.Presence } func (d *Database) readOnlySnapshot(ctx context.Context) (*sql.Tx, error) { @@ -1002,3 +1003,19 @@ func (s *Database) SelectContextBeforeEvent(ctx context.Context, id int, roomID func (s *Database) SelectContextAfterEvent(ctx context.Context, id int, roomID string, filter *gomatrixserverlib.RoomEventFilter) (int, []*gomatrixserverlib.HeaderedEvent, error) { return s.OutputEvents.SelectContextAfterEvent(ctx, nil, id, roomID, filter) } + +func (s *Database) UpdatePresence(ctx context.Context, userID string, presence types.Presence, statusMsg *string, lastActiveTS gomatrixserverlib.Timestamp, fromSync bool) (types.StreamPosition, error) { + return s.Presence.UpsertPresence(ctx, nil, userID, statusMsg, presence, lastActiveTS, fromSync) +} + +func (s *Database) GetPresence(ctx context.Context, userID string) (*types.PresenceInternal, error) { + return s.Presence.GetPresenceForUser(ctx, nil, userID) +} + +func (s *Database) PresenceAfter(ctx context.Context, after types.StreamPosition) (map[string]*types.PresenceInternal, error) { + return s.Presence.GetPresenceAfter(ctx, nil, after) +} + +func (s *Database) MaxStreamPositionForPresence(ctx context.Context) (types.StreamPosition, error) { + return s.Presence.GetMaxPresenceID(ctx, nil) +} diff --git a/syncapi/storage/sqlite3/presence_table.go b/syncapi/storage/sqlite3/presence_table.go new file mode 100644 index 00000000..e7b78a70 --- /dev/null +++ b/syncapi/storage/sqlite3/presence_table.go @@ -0,0 +1,177 @@ +// Copyright 2022 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package sqlite3 + +import ( + "context" + "database/sql" + + "github.com/matrix-org/dendrite/internal" + "github.com/matrix-org/dendrite/internal/sqlutil" + "github.com/matrix-org/dendrite/syncapi/types" + "github.com/matrix-org/gomatrixserverlib" +) + +const presenceSchema = ` +-- Stores data about presence +CREATE TABLE IF NOT EXISTS syncapi_presence ( + -- The ID + id BIGINT NOT NULL, + -- The Matrix user ID + user_id TEXT NOT NULL, + -- The actual presence + presence INT NOT NULL, + -- The status message + status_msg TEXT, + -- The last time an action was received by this user + last_active_ts BIGINT NOT NULL, + CONSTRAINT presence_presences_unique UNIQUE (user_id) +); +CREATE INDEX IF NOT EXISTS syncapi_presence_user_id ON syncapi_presence(user_id); +` + +const upsertPresenceSQL = "" + + "INSERT INTO syncapi_presence AS p" + + " (id, user_id, presence, status_msg, last_active_ts)" + + " VALUES ($1, $2, $3, $4, $5)" + + " ON CONFLICT (user_id)" + + " DO UPDATE SET id = $6, " + + " presence = $7, status_msg = COALESCE($8, p.status_msg), last_active_ts = $9" + + " RETURNING id" + +const upsertPresenceFromSyncSQL = "" + + "INSERT INTO syncapi_presence AS p" + + " (id, user_id, presence, last_active_ts)" + + " VALUES ($1, $2, $3, $4)" + + " ON CONFLICT (user_id)" + + " DO UPDATE SET id = $5, " + + " presence = $6, last_active_ts = $7" + + " RETURNING id" + +const selectPresenceForUserSQL = "" + + "SELECT presence, status_msg, last_active_ts" + + " FROM syncapi_presence" + + " WHERE user_id = $1 LIMIT 1" + +const selectMaxPresenceSQL = "" + + "SELECT COALESCE(MAX(id), 0) FROM syncapi_presence" + +const selectPresenceAfter = "" + + " SELECT id, user_id, presence, status_msg, last_active_ts" + + " FROM syncapi_presence" + + " WHERE id > $1" + +type presenceStatements struct { + db *sql.DB + streamIDStatements *streamIDStatements + upsertPresenceStmt *sql.Stmt + upsertPresenceFromSyncStmt *sql.Stmt + selectPresenceForUsersStmt *sql.Stmt + selectMaxPresenceStmt *sql.Stmt + selectPresenceAfterStmt *sql.Stmt +} + +func NewSqlitePresenceTable(db *sql.DB, streamID *streamIDStatements) (*presenceStatements, error) { + _, err := db.Exec(presenceSchema) + if err != nil { + return nil, err + } + s := &presenceStatements{ + db: db, + streamIDStatements: streamID, + } + return s, sqlutil.StatementList{ + {&s.upsertPresenceStmt, upsertPresenceSQL}, + {&s.upsertPresenceFromSyncStmt, upsertPresenceFromSyncSQL}, + {&s.selectPresenceForUsersStmt, selectPresenceForUserSQL}, + {&s.selectMaxPresenceStmt, selectMaxPresenceSQL}, + {&s.selectPresenceAfterStmt, selectPresenceAfter}, + }.Prepare(db) +} + +// UpsertPresence creates/updates a presence status. +func (p *presenceStatements) UpsertPresence( + ctx context.Context, + txn *sql.Tx, + userID string, + statusMsg *string, + presence types.Presence, + lastActiveTS gomatrixserverlib.Timestamp, + fromSync bool, +) (pos types.StreamPosition, err error) { + pos, err = p.streamIDStatements.nextPresenceID(ctx, txn) + if err != nil { + return pos, err + } + + if fromSync { + stmt := sqlutil.TxStmt(txn, p.upsertPresenceFromSyncStmt) + err = stmt.QueryRowContext(ctx, + pos, userID, presence, + lastActiveTS, pos, + presence, lastActiveTS).Scan(&pos) + } else { + stmt := sqlutil.TxStmt(txn, p.upsertPresenceStmt) + err = stmt.QueryRowContext(ctx, + pos, userID, presence, + statusMsg, lastActiveTS, pos, + presence, statusMsg, lastActiveTS).Scan(&pos) + } + return +} + +// GetPresenceForUser returns the current presence of a user. +func (p *presenceStatements) GetPresenceForUser( + ctx context.Context, txn *sql.Tx, + userID string, +) (*types.PresenceInternal, error) { + result := &types.PresenceInternal{ + UserID: userID, + } + stmt := sqlutil.TxStmt(txn, p.selectPresenceForUsersStmt) + err := stmt.QueryRowContext(ctx, userID).Scan(&result.Presence, &result.ClientFields.StatusMsg, &result.LastActiveTS) + result.ClientFields.Presence = result.Presence.String() + return result, err +} + +func (p *presenceStatements) GetMaxPresenceID(ctx context.Context, txn *sql.Tx) (pos types.StreamPosition, err error) { + stmt := sqlutil.TxStmt(txn, p.selectMaxPresenceStmt) + err = stmt.QueryRowContext(ctx).Scan(&pos) + return +} + +// GetPresenceAfter returns the changes presences after a given stream id +func (p *presenceStatements) GetPresenceAfter( + ctx context.Context, txn *sql.Tx, + after types.StreamPosition, +) (presences map[string]*types.PresenceInternal, err error) { + presences = make(map[string]*types.PresenceInternal) + stmt := sqlutil.TxStmt(txn, p.selectPresenceAfterStmt) + + rows, err := stmt.QueryContext(ctx, after) + if err != nil { + return nil, err + } + defer internal.CloseAndLogIfError(ctx, rows, "GetPresenceAfter: failed to close rows") + for rows.Next() { + qryRes := &types.PresenceInternal{} + if err := rows.Scan(&qryRes.StreamPos, &qryRes.UserID, &qryRes.Presence, &qryRes.ClientFields.StatusMsg, &qryRes.LastActiveTS); err != nil { + return nil, err + } + qryRes.ClientFields.Presence = qryRes.Presence.String() + presences[qryRes.UserID] = qryRes + } + return presences, rows.Err() +} diff --git a/syncapi/storage/sqlite3/stream_id_table.go b/syncapi/storage/sqlite3/stream_id_table.go index 2be3ae93..faa2c41f 100644 --- a/syncapi/storage/sqlite3/stream_id_table.go +++ b/syncapi/storage/sqlite3/stream_id_table.go @@ -24,6 +24,8 @@ INSERT INTO syncapi_stream_id (stream_name, stream_id) VALUES ("accountdata", 0) ON CONFLICT DO NOTHING; INSERT INTO syncapi_stream_id (stream_name, stream_id) VALUES ("invite", 0) ON CONFLICT DO NOTHING; +INSERT INTO syncapi_stream_id (stream_name, stream_id) VALUES ("presence", 0) + ON CONFLICT DO NOTHING; ` const increaseStreamIDStmt = "" + @@ -70,3 +72,9 @@ func (s *streamIDStatements) nextAccountDataID(ctx context.Context, txn *sql.Tx) err = increaseStmt.QueryRowContext(ctx, "accountdata").Scan(&pos) return } + +func (s *streamIDStatements) nextPresenceID(ctx context.Context, txn *sql.Tx) (pos types.StreamPosition, err error) { + increaseStmt := sqlutil.TxStmt(txn, s.increaseStreamIDStmt) + err = increaseStmt.QueryRowContext(ctx, "presence").Scan(&pos) + return +} diff --git a/syncapi/storage/sqlite3/syncserver.go b/syncapi/storage/sqlite3/syncserver.go index cb7e3b46..cb2c3169 100644 --- a/syncapi/storage/sqlite3/syncserver.go +++ b/syncapi/storage/sqlite3/syncserver.go @@ -100,6 +100,10 @@ func (d *SyncServerDatasource) prepare(dbProperties *config.DatabaseOptions) (er if err != nil { return err } + presence, err := NewSqlitePresenceTable(d.db, &d.streamID) + if err != nil { + return err + } m := sqlutil.NewMigrations() deltas.LoadFixSequences(m) deltas.LoadRemoveSendToDeviceSentColumn(m) @@ -121,6 +125,7 @@ func (d *SyncServerDatasource) prepare(dbProperties *config.DatabaseOptions) (er Receipts: receipts, Memberships: memberships, NotificationData: notificationData, + Presence: presence, } return nil } diff --git a/syncapi/storage/tables/interface.go b/syncapi/storage/tables/interface.go index 2c29888d..ef0587bb 100644 --- a/syncapi/storage/tables/interface.go +++ b/syncapi/storage/tables/interface.go @@ -182,3 +182,10 @@ type NotificationData interface { SelectUserUnreadCounts(ctx context.Context, userID string, fromExcl, toIncl types.StreamPosition) (map[string]*eventutil.NotificationData, error) SelectMaxID(ctx context.Context) (int64, error) } + +type Presence interface { + UpsertPresence(ctx context.Context, txn *sql.Tx, userID string, statusMsg *string, presence types.Presence, lastActiveTS gomatrixserverlib.Timestamp, fromSync bool) (pos types.StreamPosition, err error) + GetPresenceForUser(ctx context.Context, txn *sql.Tx, userID string) (presence *types.PresenceInternal, err error) + GetMaxPresenceID(ctx context.Context, txn *sql.Tx) (pos types.StreamPosition, err error) + GetPresenceAfter(ctx context.Context, txn *sql.Tx, after types.StreamPosition) (presences map[string]*types.PresenceInternal, err error) +} diff --git a/syncapi/streams/stream_presence.go b/syncapi/streams/stream_presence.go new file mode 100644 index 00000000..a24edad5 --- /dev/null +++ b/syncapi/streams/stream_presence.go @@ -0,0 +1,179 @@ +// Copyright 2022 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package streams + +import ( + "context" + "database/sql" + "encoding/json" + "sync" + + "github.com/matrix-org/dendrite/syncapi/notifier" + "github.com/matrix-org/dendrite/syncapi/types" + "github.com/matrix-org/gomatrixserverlib" +) + +type PresenceStreamProvider struct { + StreamProvider + // cache contains previously sent presence updates to avoid unneeded updates + cache sync.Map + notifier *notifier.Notifier +} + +func (p *PresenceStreamProvider) Setup() { + p.StreamProvider.Setup() + + id, err := p.DB.MaxStreamPositionForPresence(context.Background()) + if err != nil { + panic(err) + } + p.latest = id +} + +func (p *PresenceStreamProvider) CompleteSync( + ctx context.Context, + req *types.SyncRequest, +) types.StreamPosition { + return p.IncrementalSync(ctx, req, 0, p.LatestPosition(ctx)) +} + +func (p *PresenceStreamProvider) IncrementalSync( + ctx context.Context, + req *types.SyncRequest, + from, to types.StreamPosition, +) types.StreamPosition { + presences, err := p.DB.PresenceAfter(ctx, from) + if err != nil { + req.Log.WithError(err).Error("p.DB.PresenceAfter failed") + return from + } + + if len(presences) == 0 { + return to + } + + // get all joined users + // TODO: SharedUsers might get out of sync + sharedUsers := p.notifier.SharedUsers(req.Device.UserID) + + sharedUsersMap := map[string]bool{ + req.Device.UserID: true, + } + // convert array to a map for easier checking if a user exists + for i := range sharedUsers { + sharedUsersMap[sharedUsers[i]] = true + } + + // add newly joined rooms user presences + newlyJoined := joinedRooms(req.Response, req.Device.UserID) + if len(newlyJoined) > 0 { + // TODO: This refreshes all lists and is quite expensive + // The notifier should update the lists itself + if err = p.notifier.Load(ctx, p.DB); err != nil { + req.Log.WithError(err).Error("unable to refresh notifier lists") + return from + } + for _, roomID := range newlyJoined { + roomUsers := p.notifier.JoinedUsers(roomID) + for i := range roomUsers { + sharedUsersMap[roomUsers[i]] = true + // we already got a presence from this user + if _, ok := presences[roomUsers[i]]; ok { + continue + } + presences[roomUsers[i]], err = p.DB.GetPresence(ctx, roomUsers[i]) + if err != nil { + if err == sql.ErrNoRows { + continue + } + req.Log.WithError(err).Error("unable to query presence for user") + return from + } + } + } + } + + lastPos := to + for i := range presences { + presence := presences[i] + // Ignore users we don't share a room with + if !sharedUsersMap[presence.UserID] { + continue + } + cacheKey := req.Device.UserID + req.Device.ID + presence.UserID + pres, ok := p.cache.Load(cacheKey) + if ok { + // skip already sent presence + prevPresence := pres.(*types.PresenceInternal) + currentlyActive := prevPresence.CurrentlyActive() + skip := prevPresence.Equals(presence) && currentlyActive && req.Device.UserID != presence.UserID + if skip { + req.Log.Debugf("Skipping presence, no change (%s)", presence.UserID) + continue + } + } + presence.ClientFields.LastActiveAgo = presence.LastActiveAgo() + if presence.ClientFields.Presence == "online" { + currentlyActive := presence.CurrentlyActive() + presence.ClientFields.CurrentlyActive = ¤tlyActive + } + + content, err := json.Marshal(presence.ClientFields) + if err != nil { + return from + } + + req.Response.Presence.Events = append(req.Response.Presence.Events, gomatrixserverlib.ClientEvent{ + Content: content, + Sender: presence.UserID, + Type: gomatrixserverlib.MPresence, + }) + if presence.StreamPos > lastPos { + lastPos = presence.StreamPos + } + p.cache.Store(cacheKey, presence) + } + + return lastPos +} + +func joinedRooms(res *types.Response, userID string) []string { + var roomIDs []string + for roomID, join := range res.Rooms.Join { + // we would expect to see our join event somewhere if we newly joined the room. + // Normal events get put in the join section so it's not enough to know the room ID is present in 'join'. + newlyJoined := membershipEventPresent(join.State.Events, userID) + if newlyJoined { + roomIDs = append(roomIDs, roomID) + continue + } + newlyJoined = membershipEventPresent(join.Timeline.Events, userID) + if newlyJoined { + roomIDs = append(roomIDs, roomID) + } + } + return roomIDs +} + +func membershipEventPresent(events []gomatrixserverlib.ClientEvent, userID string) bool { + for _, ev := range events { + // it's enough to know that we have our member event here, don't need to check membership content + // as it's implied by being in the respective section of the sync response. + if ev.Type == gomatrixserverlib.MRoomMember && ev.StateKey != nil && *ev.StateKey == userID { + return true + } + } + return false +} diff --git a/syncapi/streams/streams.go b/syncapi/streams/streams.go index b2273aad..07322388 100644 --- a/syncapi/streams/streams.go +++ b/syncapi/streams/streams.go @@ -6,6 +6,7 @@ import ( "github.com/matrix-org/dendrite/internal/caching" keyapi "github.com/matrix-org/dendrite/keyserver/api" rsapi "github.com/matrix-org/dendrite/roomserver/api" + "github.com/matrix-org/dendrite/syncapi/notifier" "github.com/matrix-org/dendrite/syncapi/storage" "github.com/matrix-org/dendrite/syncapi/types" userapi "github.com/matrix-org/dendrite/userapi/api" @@ -20,12 +21,13 @@ type Streams struct { AccountDataStreamProvider types.StreamProvider DeviceListStreamProvider types.StreamProvider NotificationDataStreamProvider types.StreamProvider + PresenceStreamProvider types.StreamProvider } func NewSyncStreamProviders( d storage.Database, userAPI userapi.UserInternalAPI, rsAPI rsapi.RoomserverInternalAPI, keyAPI keyapi.KeyInternalAPI, - eduCache *caching.EDUCache, + eduCache *caching.EDUCache, notifier *notifier.Notifier, ) *Streams { streams := &Streams{ PDUStreamProvider: &PDUStreamProvider{ @@ -56,6 +58,10 @@ func NewSyncStreamProviders( rsAPI: rsAPI, keyAPI: keyAPI, }, + PresenceStreamProvider: &PresenceStreamProvider{ + StreamProvider: StreamProvider{DB: d}, + notifier: notifier, + }, } streams.PDUStreamProvider.Setup() @@ -66,6 +72,7 @@ func NewSyncStreamProviders( streams.AccountDataStreamProvider.Setup() streams.NotificationDataStreamProvider.Setup() streams.DeviceListStreamProvider.Setup() + streams.PresenceStreamProvider.Setup() return streams } @@ -80,5 +87,6 @@ func (s *Streams) Latest(ctx context.Context) types.StreamingToken { AccountDataPosition: s.AccountDataStreamProvider.LatestPosition(ctx), NotificationDataPosition: s.NotificationDataStreamProvider.LatestPosition(ctx), DeviceListPosition: s.DeviceListStreamProvider.LatestPosition(ctx), + PresencePosition: s.PresenceStreamProvider.LatestPosition(ctx), } } diff --git a/syncapi/sync/requestpool.go b/syncapi/sync/requestpool.go index 2c9920d1..cf667337 100644 --- a/syncapi/sync/requestpool.go +++ b/syncapi/sync/requestpool.go @@ -17,6 +17,8 @@ package sync import ( + "context" + "database/sql" "net" "net/http" "strings" @@ -33,8 +35,10 @@ import ( "github.com/matrix-org/dendrite/syncapi/streams" "github.com/matrix-org/dendrite/syncapi/types" userapi "github.com/matrix-org/dendrite/userapi/api" + "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/util" "github.com/prometheus/client_golang/prometheus" + "github.com/sirupsen/logrus" ) // RequestPool manages HTTP long-poll connections for /sync @@ -44,9 +48,15 @@ type RequestPool struct { userAPI userapi.UserInternalAPI keyAPI keyapi.KeyInternalAPI rsAPI roomserverAPI.RoomserverInternalAPI - lastseen sync.Map + lastseen *sync.Map + presence *sync.Map streams *streams.Streams Notifier *notifier.Notifier + producer PresencePublisher +} + +type PresencePublisher interface { + SendPresence(userID string, presence types.Presence, statusMsg *string) error } // NewRequestPool makes a new RequestPool @@ -55,6 +65,7 @@ func NewRequestPool( userAPI userapi.UserInternalAPI, keyAPI keyapi.KeyInternalAPI, rsAPI roomserverAPI.RoomserverInternalAPI, streams *streams.Streams, notifier *notifier.Notifier, + producer PresencePublisher, ) *RequestPool { rp := &RequestPool{ db: db, @@ -62,11 +73,14 @@ func NewRequestPool( userAPI: userAPI, keyAPI: keyAPI, rsAPI: rsAPI, - lastseen: sync.Map{}, + lastseen: &sync.Map{}, + presence: &sync.Map{}, streams: streams, Notifier: notifier, + producer: producer, } go rp.cleanLastSeen() + go rp.cleanPresence(db, time.Minute*5) return rp } @@ -80,6 +94,68 @@ func (rp *RequestPool) cleanLastSeen() { } } +func (rp *RequestPool) cleanPresence(db storage.Presence, cleanupTime time.Duration) { + if !rp.cfg.Matrix.Presence.EnableOutbound { + return + } + for { + rp.presence.Range(func(key interface{}, v interface{}) bool { + p := v.(types.PresenceInternal) + if time.Since(p.LastActiveTS.Time()) > cleanupTime { + rp.updatePresence(db, types.PresenceUnavailable.String(), p.UserID) + rp.presence.Delete(key) + } + return true + }) + time.Sleep(cleanupTime) + } +} + +// updatePresence sends presence updates to the SyncAPI and FederationAPI +func (rp *RequestPool) updatePresence(db storage.Presence, presence string, userID string) { + if !rp.cfg.Matrix.Presence.EnableOutbound { + return + } + if presence == "" { + presence = types.PresenceOnline.String() + } + + presenceID, ok := types.PresenceFromString(presence) + if !ok { // this should almost never happen + logrus.Errorf("unknown presence '%s'", presence) + return + } + + newPresence := types.PresenceInternal{ + ClientFields: types.PresenceClientResponse{ + Presence: presenceID.String(), + }, + Presence: presenceID, + UserID: userID, + LastActiveTS: gomatrixserverlib.AsTimestamp(time.Now()), + } + defer rp.presence.Store(userID, newPresence) + // avoid spamming presence updates when syncing + existingPresence, ok := rp.presence.LoadOrStore(userID, newPresence) + if ok { + p := existingPresence.(types.PresenceInternal) + if p.ClientFields.Presence == newPresence.ClientFields.Presence { + return + } + } + + // ensure we also send the current status_msg to federated servers and not nil + dbPresence, err := db.GetPresence(context.Background(), userID) + if err != nil && err != sql.ErrNoRows { + return + } + + if err := rp.producer.SendPresence(userID, presenceID, dbPresence.ClientFields.StatusMsg); err != nil { + logrus.WithError(err).Error("Unable to publish presence message from sync") + return + } +} + func (rp *RequestPool) updateLastSeen(req *http.Request, device *userapi.Device) { if _, ok := rp.lastseen.LoadOrStore(device.UserID+device.ID, struct{}{}); ok { return @@ -156,6 +232,7 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi. defer activeSyncRequests.Dec() rp.updateLastSeen(req, device) + rp.updatePresence(rp.db, req.FormValue("set_presence"), device.UserID) waitingSyncRequests.Inc() defer waitingSyncRequests.Dec() @@ -219,6 +296,9 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi. DeviceListPosition: rp.streams.DeviceListStreamProvider.CompleteSync( syncReq.Context, syncReq, ), + PresencePosition: rp.streams.PresenceStreamProvider.CompleteSync( + syncReq.Context, syncReq, + ), } } else { // Incremental sync @@ -255,6 +335,10 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi. syncReq.Context, syncReq, syncReq.Since.DeviceListPosition, currentPos.DeviceListPosition, ), + PresencePosition: rp.streams.PresenceStreamProvider.IncrementalSync( + syncReq.Context, syncReq, + syncReq.Since.PresencePosition, currentPos.PresencePosition, + ), } } diff --git a/syncapi/sync/requestpool_test.go b/syncapi/sync/requestpool_test.go new file mode 100644 index 00000000..a8008994 --- /dev/null +++ b/syncapi/sync/requestpool_test.go @@ -0,0 +1,128 @@ +package sync + +import ( + "context" + "sync" + "testing" + "time" + + "github.com/matrix-org/dendrite/setup/config" + "github.com/matrix-org/dendrite/syncapi/types" + "github.com/matrix-org/gomatrixserverlib" +) + +type dummyPublisher struct { + count int +} + +func (d *dummyPublisher) SendPresence(userID string, presence types.Presence, statusMsg *string) error { + d.count++ + return nil +} + +type dummyDB struct{} + +func (d dummyDB) UpdatePresence(ctx context.Context, userID string, presence types.Presence, statusMsg *string, lastActiveTS gomatrixserverlib.Timestamp, fromSync bool) (types.StreamPosition, error) { + return 0, nil +} + +func (d dummyDB) GetPresence(ctx context.Context, userID string) (*types.PresenceInternal, error) { + return &types.PresenceInternal{}, nil +} + +func (d dummyDB) PresenceAfter(ctx context.Context, after types.StreamPosition) (map[string]*types.PresenceInternal, error) { + return map[string]*types.PresenceInternal{}, nil +} + +func (d dummyDB) MaxStreamPositionForPresence(ctx context.Context) (types.StreamPosition, error) { + return 0, nil +} + +func TestRequestPool_updatePresence(t *testing.T) { + type args struct { + presence string + userID string + sleep time.Duration + } + publisher := &dummyPublisher{} + syncMap := sync.Map{} + + tests := []struct { + name string + args args + wantIncrease bool + }{ + { + name: "new presence is published", + wantIncrease: true, + args: args{ + userID: "dummy", + }, + }, + { + name: "presence not published, no change", + args: args{ + userID: "dummy", + }, + }, + { + name: "new presence is published dummy2", + wantIncrease: true, + args: args{ + userID: "dummy2", + presence: "online", + }, + }, + { + name: "different presence is published dummy2", + wantIncrease: true, + args: args{ + userID: "dummy2", + presence: "unavailable", + }, + }, + { + name: "same presence is not published dummy2", + args: args{ + userID: "dummy2", + presence: "unavailable", + sleep: time.Millisecond * 150, + }, + }, + { + name: "same presence is published after being deleted", + wantIncrease: true, + args: args{ + userID: "dummy2", + presence: "unavailable", + }, + }, + } + rp := &RequestPool{ + presence: &syncMap, + producer: publisher, + cfg: &config.SyncAPI{ + Matrix: &config.Global{ + JetStream: config.JetStream{ + TopicPrefix: "Dendrite", + }, + Presence: config.PresenceOptions{ + EnableInbound: true, + EnableOutbound: true, + }, + }, + }, + } + db := dummyDB{} + go rp.cleanPresence(db, time.Millisecond*50) + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + beforeCount := publisher.count + rp.updatePresence(db, tt.args.presence, tt.args.userID) + if tt.wantIncrease && publisher.count <= beforeCount { + t.Fatalf("expected count to increase: %d <= %d", publisher.count, beforeCount) + } + time.Sleep(tt.args.sleep) + }) + } +} diff --git a/syncapi/syncapi.go b/syncapi/syncapi.go index b579467a..384121a8 100644 --- a/syncapi/syncapi.go +++ b/syncapi/syncapi.go @@ -49,7 +49,7 @@ func AddPublicRoutes( federation *gomatrixserverlib.FederationClient, cfg *config.SyncAPI, ) { - js, _ := jetstream.Prepare(process, &cfg.Matrix.JetStream) + js, natsClient := jetstream.Prepare(process, &cfg.Matrix.JetStream) syncDB, err := storage.NewSyncServerDatasource(&cfg.Database) if err != nil { @@ -57,13 +57,19 @@ func AddPublicRoutes( } eduCache := caching.NewTypingCache() - streams := streams.NewSyncStreamProviders(syncDB, userAPI, rsAPI, keyAPI, eduCache) - notifier := notifier.NewNotifier(streams.Latest(context.Background())) + notifier := notifier.NewNotifier() + streams := streams.NewSyncStreamProviders(syncDB, userAPI, rsAPI, keyAPI, eduCache, notifier) + notifier.SetCurrentPosition(streams.Latest(context.Background())) if err = notifier.Load(context.Background(), syncDB); err != nil { logrus.WithError(err).Panicf("failed to load notifier ") } - requestPool := sync.NewRequestPool(syncDB, cfg, userAPI, keyAPI, rsAPI, streams, notifier) + federationPresenceProducer := &producers.FederationAPIPresenceProducer{ + Topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputPresenceEvent), + JetStream: js, + } + + requestPool := sync.NewRequestPool(syncDB, cfg, userAPI, keyAPI, rsAPI, streams, notifier, federationPresenceProducer) userAPIStreamEventProducer := &producers.UserAPIStreamEventProducer{ JetStream: js, @@ -75,8 +81,6 @@ func AddPublicRoutes( Topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputReadUpdate), } - _ = userAPIReadUpdateProducer - keyChangeConsumer := consumers.NewOutputKeyChangeEventConsumer( process, cfg, cfg.Matrix.JetStream.Prefixed(jetstream.OutputKeyChangeEvent), js, keyAPI, rsAPI, syncDB, notifier, @@ -131,5 +135,14 @@ func AddPublicRoutes( logrus.WithError(err).Panicf("failed to start receipts consumer") } + presenceConsumer := consumers.NewPresenceConsumer( + process, cfg, js, natsClient, syncDB, + notifier, streams.PresenceStreamProvider, + userAPI, + ) + if err = presenceConsumer.Start(); err != nil { + logrus.WithError(err).Panicf("failed to start presence consumer") + } + routing.Setup(router, requestPool, syncDB, userAPI, federation, rsAPI, cfg) } diff --git a/syncapi/types/presence.go b/syncapi/types/presence.go new file mode 100644 index 00000000..40aa29cf --- /dev/null +++ b/syncapi/types/presence.go @@ -0,0 +1,75 @@ +// Copyright 2022 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package types + +import ( + "strings" + "time" + + "github.com/matrix-org/gomatrixserverlib" +) + +//go:generate stringer -type=Presence -linecomment +type Presence uint8 + +const ( + PresenceUnavailable Presence = iota + 1 // unavailable + PresenceOnline // online + PresenceOffline // offline +) + +// PresenceFromString returns the integer representation of the given input presence. +// Returns false for ok, if input is not a valid presence value. +func PresenceFromString(input string) (p Presence, ok bool) { + for i := 0; i < len(_Presence_index)-1; i++ { + l, r := _Presence_index[i], _Presence_index[i+1] + if strings.EqualFold(input, _Presence_name[l:r]) { + return Presence(i + 1), true + } + } + return 0, false +} + +type PresenceInternal struct { + ClientFields PresenceClientResponse + StreamPos StreamPosition `json:"-"` + UserID string `json:"-"` + LastActiveTS gomatrixserverlib.Timestamp `json:"-"` + Presence Presence `json:"-"` +} + +// Equals compares p1 with p2. +func (p1 *PresenceInternal) Equals(p2 *PresenceInternal) bool { + return p1.ClientFields.Presence == p2.ClientFields.Presence && + p1.ClientFields.StatusMsg == p2.ClientFields.StatusMsg && + p1.UserID == p2.UserID +} + +// CurrentlyActive returns the current active state. +func (p *PresenceInternal) CurrentlyActive() bool { + return time.Since(p.LastActiveTS.Time()).Minutes() < 5 +} + +// LastActiveAgo returns the time since the LastActiveTS in milliseconds. +func (p *PresenceInternal) LastActiveAgo() int64 { + return time.Since(p.LastActiveTS.Time()).Milliseconds() +} + +type PresenceClientResponse struct { + CurrentlyActive *bool `json:"currently_active,omitempty"` + LastActiveAgo int64 `json:"last_active_ago,omitempty"` + Presence string `json:"presence"` + StatusMsg *string `json:"status_msg,omitempty"` +} diff --git a/syncapi/types/presence_string.go b/syncapi/types/presence_string.go new file mode 100644 index 00000000..467b463b --- /dev/null +++ b/syncapi/types/presence_string.go @@ -0,0 +1,26 @@ +// Code generated by "stringer -type=Presence -linecomment"; DO NOT EDIT. + +package types + +import "strconv" + +func _() { + // An "invalid array index" compiler error signifies that the constant values have changed. + // Re-run the stringer command to generate them again. + var x [1]struct{} + _ = x[PresenceUnavailable-1] + _ = x[PresenceOnline-2] + _ = x[PresenceOffline-3] +} + +const _Presence_name = "unavailableonlineoffline" + +var _Presence_index = [...]uint8{0, 11, 17, 24} + +func (i Presence) String() string { + i -= 1 + if i >= Presence(len(_Presence_index)-1) { + return "Presence(" + strconv.FormatInt(int64(i+1), 10) + ")" + } + return _Presence_name[_Presence_index[i]:_Presence_index[i+1]] +} diff --git a/syncapi/types/presence_test.go b/syncapi/types/presence_test.go new file mode 100644 index 00000000..dbc201c5 --- /dev/null +++ b/syncapi/types/presence_test.go @@ -0,0 +1,42 @@ +package types + +import "testing" + +func TestPresenceFromString(t *testing.T) { + tests := []struct { + name string + input string + wantStatus Presence + wantOk bool + }{ + { + name: "presence unavailable", + input: "unavailable", + wantStatus: PresenceUnavailable, + wantOk: true, + }, + { + name: "presence online", + input: "OnLINE", + wantStatus: PresenceOnline, + wantOk: true, + }, + { + name: "unknown presence", + input: "unknown", + wantStatus: 0, + wantOk: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, got1 := PresenceFromString(tt.input) + if got != tt.wantStatus { + t.Errorf("PresenceFromString() got = %v, want %v", got, tt.wantStatus) + } + if got1 != tt.wantOk { + t.Errorf("PresenceFromString() got1 = %v, want %v", got1, tt.wantOk) + } + }) + } +} diff --git a/syncapi/types/types.go b/syncapi/types/types.go index d0efa1bb..d21203b5 100644 --- a/syncapi/types/types.go +++ b/syncapi/types/types.go @@ -103,6 +103,7 @@ type StreamingToken struct { AccountDataPosition StreamPosition DeviceListPosition StreamPosition NotificationDataPosition StreamPosition + PresencePosition StreamPosition } // This will be used as a fallback by json.Marshal. @@ -118,11 +119,12 @@ func (s *StreamingToken) UnmarshalText(text []byte) (err error) { func (t StreamingToken) String() string { posStr := fmt.Sprintf( - "s%d_%d_%d_%d_%d_%d_%d_%d", + "s%d_%d_%d_%d_%d_%d_%d_%d_%d", t.PDUPosition, t.TypingPosition, t.ReceiptPosition, t.SendToDevicePosition, t.InvitePosition, t.AccountDataPosition, t.DeviceListPosition, t.NotificationDataPosition, + t.PresencePosition, ) return posStr } @@ -146,12 +148,14 @@ func (t *StreamingToken) IsAfter(other StreamingToken) bool { return true case t.NotificationDataPosition > other.NotificationDataPosition: return true + case t.PresencePosition > other.PresencePosition: + return true } return false } func (t *StreamingToken) IsEmpty() bool { - return t == nil || t.PDUPosition+t.TypingPosition+t.ReceiptPosition+t.SendToDevicePosition+t.InvitePosition+t.AccountDataPosition+t.DeviceListPosition+t.NotificationDataPosition == 0 + return t == nil || t.PDUPosition+t.TypingPosition+t.ReceiptPosition+t.SendToDevicePosition+t.InvitePosition+t.AccountDataPosition+t.DeviceListPosition+t.NotificationDataPosition+t.PresencePosition == 0 } // WithUpdates returns a copy of the StreamingToken with updates applied from another StreamingToken. @@ -192,6 +196,9 @@ func (t *StreamingToken) ApplyUpdates(other StreamingToken) { if other.NotificationDataPosition > t.NotificationDataPosition { t.NotificationDataPosition = other.NotificationDataPosition } + if other.PresencePosition > t.PresencePosition { + t.PresencePosition = other.PresencePosition + } } type TopologyToken struct { @@ -284,7 +291,7 @@ func NewStreamTokenFromString(tok string) (token StreamingToken, err error) { // s478_0_0_0_0_13.dl-0-2 but we have now removed partitioned stream positions tok = strings.Split(tok, ".")[0] parts := strings.Split(tok[1:], "_") - var positions [8]StreamPosition + var positions [9]StreamPosition for i, p := range parts { if i >= len(positions) { break @@ -306,6 +313,7 @@ func NewStreamTokenFromString(tok string) (token StreamingToken, err error) { AccountDataPosition: positions[5], DeviceListPosition: positions[6], NotificationDataPosition: positions[7], + PresencePosition: positions[8], } return token, nil } diff --git a/syncapi/types/types_test.go b/syncapi/types/types_test.go index ff78bfb9..19fcfc15 100644 --- a/syncapi/types/types_test.go +++ b/syncapi/types/types_test.go @@ -9,10 +9,10 @@ import ( func TestSyncTokens(t *testing.T) { shouldPass := map[string]string{ - "s4_0_0_0_0_0_0_0": StreamingToken{4, 0, 0, 0, 0, 0, 0, 0}.String(), - "s3_1_0_0_0_0_2_0": StreamingToken{3, 1, 0, 0, 0, 0, 2, 0}.String(), - "s3_1_2_3_5_0_0_0": StreamingToken{3, 1, 2, 3, 5, 0, 0, 0}.String(), - "t3_1": TopologyToken{3, 1}.String(), + "s4_0_0_0_0_0_0_0_3": StreamingToken{4, 0, 0, 0, 0, 0, 0, 0, 3}.String(), + "s3_1_0_0_0_0_2_0_5": StreamingToken{3, 1, 0, 0, 0, 0, 2, 0, 5}.String(), + "s3_1_2_3_5_0_0_0_6": StreamingToken{3, 1, 2, 3, 5, 0, 0, 0, 6}.String(), + "t3_1": TopologyToken{3, 1}.String(), } for a, b := range shouldPass { diff --git a/sytest-whitelist b/sytest-whitelist index 38a057da..69fa19c6 100644 --- a/sytest-whitelist +++ b/sytest-whitelist @@ -679,6 +679,20 @@ remote user has push rules copied to upgraded room /upgrade of a bogus room fails gracefully Cannot send tombstone event that points to the same room Room summary counts change when membership changes +GET /presence/:user_id/status fetches initial status +PUT /presence/:user_id/status updates my presence +Presence change reports an event to myself +Existing members see new members' presence +#Existing members see new member's presence +Newly joined room includes presence in incremental sync +Get presence for newly joined members in incremental sync +User sees their own presence in a sync +User sees updates to presence from other users in the incremental sync. +Presence changes are reported to local room members +Presence changes are also reported to remote room members +Presence changes to UNAVAILABLE are reported to local room members +Presence changes to UNAVAILABLE are reported to remote room members +New federated private chats get full presence information (SYN-115) /upgrade copies >100 power levels to the new room Room state after a rejected message event is the same as before Room state after a rejected state event is the same as before \ No newline at end of file diff --git a/userapi/api/api.go b/userapi/api/api.go index a9544f00..b86774d1 100644 --- a/userapi/api/api.go +++ b/userapi/api/api.go @@ -31,12 +31,10 @@ type UserInternalAPI interface { UserRegisterAPI UserAccountAPI UserThreePIDAPI + UserDeviceAPI InputAccountData(ctx context.Context, req *InputAccountDataRequest, res *InputAccountDataResponse) error - PerformDeviceDeletion(ctx context.Context, req *PerformDeviceDeletionRequest, res *PerformDeviceDeletionResponse) error - PerformLastSeenUpdate(ctx context.Context, req *PerformLastSeenUpdateRequest, res *PerformLastSeenUpdateResponse) error - PerformDeviceUpdate(ctx context.Context, req *PerformDeviceUpdateRequest, res *PerformDeviceUpdateResponse) error PerformOpenIDTokenCreation(ctx context.Context, req *PerformOpenIDTokenCreationRequest, res *PerformOpenIDTokenCreationResponse) error PerformKeyBackup(ctx context.Context, req *PerformKeyBackupRequest, res *PerformKeyBackupResponse) error PerformPusherSet(ctx context.Context, req *PerformPusherSetRequest, res *struct{}) error @@ -45,15 +43,21 @@ type UserInternalAPI interface { QueryKeyBackup(ctx context.Context, req *QueryKeyBackupRequest, res *QueryKeyBackupResponse) QueryAccessToken(ctx context.Context, req *QueryAccessTokenRequest, res *QueryAccessTokenResponse) error - QueryDevices(ctx context.Context, req *QueryDevicesRequest, res *QueryDevicesResponse) error QueryAccountData(ctx context.Context, req *QueryAccountDataRequest, res *QueryAccountDataResponse) error - QueryDeviceInfos(ctx context.Context, req *QueryDeviceInfosRequest, res *QueryDeviceInfosResponse) error QueryOpenIDToken(ctx context.Context, req *QueryOpenIDTokenRequest, res *QueryOpenIDTokenResponse) error QueryPushers(ctx context.Context, req *QueryPushersRequest, res *QueryPushersResponse) error QueryPushRules(ctx context.Context, req *QueryPushRulesRequest, res *QueryPushRulesResponse) error QueryNotifications(ctx context.Context, req *QueryNotificationsRequest, res *QueryNotificationsResponse) error } +type UserDeviceAPI interface { + PerformDeviceDeletion(ctx context.Context, req *PerformDeviceDeletionRequest, res *PerformDeviceDeletionResponse) error + PerformLastSeenUpdate(ctx context.Context, req *PerformLastSeenUpdateRequest, res *PerformLastSeenUpdateResponse) error + PerformDeviceUpdate(ctx context.Context, req *PerformDeviceUpdateRequest, res *PerformDeviceUpdateResponse) error + QueryDevices(ctx context.Context, req *QueryDevicesRequest, res *QueryDevicesResponse) error + QueryDeviceInfos(ctx context.Context, req *QueryDeviceInfosRequest, res *QueryDeviceInfosResponse) error +} + type UserDirectoryProvider interface { QuerySearchProfiles(ctx context.Context, req *QuerySearchProfilesRequest, res *QuerySearchProfilesResponse) error }