Add presence module V2 (#2312)

* Syncapi presence

* Clientapi http presence handler

* Why is this here?

* Missing files

* FederationAPI presence implementation

* Add new presence stream

* Pinecone update

* Pinecone update

* Add passing tests

* Make linter happy

* Add presence producer

* Add presence config option

* Set user to unavailable after x minutes

* Only set currently_active if online
Avoid unneeded presence updates when syncing

* Tweaks

* Query devices for last_active_ts
Fixes & tweaks

* Export SharedUsers/SharedUsers

* Presence stream in MemoryStorage

* Remove status_msg_nil

* Fix sytest crashes

* Make presence types const and use stringer for it

* Change options to allow inbound/outbound presence

* Fix option & typo

* Update configs

Co-authored-by: Neil Alexander <neilalexander@users.noreply.github.com>
This commit is contained in:
Till 2022-04-06 13:11:19 +02:00 committed by GitHub
parent 16e2d243fc
commit e5e3350ce1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
38 changed files with 1706 additions and 66 deletions

View File

@ -62,6 +62,17 @@ global:
- matrix.org
- vector.im
# Disables federation. Dendrite will not be able to make any outbound HTTP requests
# to other servers and the federation API will not be exposed.
disable_federation: false
# Configures the handling of presence events.
presence:
# Whether inbound presence events are allowed, e.g. receiving presence events from other servers
enable_inbound: false
# Whether outbound presence events are allowed, e.g. sending presence events to other servers
enable_outbound: false
# Configuration for NATS JetStream
jetstream:
# A list of NATS Server addresses to connect to. If none are specified, an

View File

@ -48,7 +48,7 @@ func AddPublicRoutes(
extRoomsProvider api.ExtraPublicRoomsProvider,
mscCfg *config.MSCs,
) {
js, _ := jetstream.Prepare(process, &cfg.Matrix.JetStream)
js, natsClient := jetstream.Prepare(process, &cfg.Matrix.JetStream)
syncProducer := &producers.SyncAPIProducer{
JetStream: js,
@ -56,6 +56,7 @@ func AddPublicRoutes(
TopicReceiptEvent: cfg.Matrix.JetStream.Prefixed(jetstream.OutputReceiptEvent),
TopicSendToDeviceEvent: cfg.Matrix.JetStream.Prefixed(jetstream.OutputSendToDeviceEvent),
TopicTypingEvent: cfg.Matrix.JetStream.Prefixed(jetstream.OutputTypingEvent),
TopicPresenceEvent: cfg.Matrix.JetStream.Prefixed(jetstream.OutputPresenceEvent),
UserAPI: userAPI,
ServerName: cfg.Matrix.ServerName,
}
@ -64,6 +65,6 @@ func AddPublicRoutes(
router, synapseAdminRouter, cfg, rsAPI, asAPI,
userAPI, userDirectoryProvider, federation,
syncProducer, transactionsCache, fsAPI, keyAPI,
extRoomsProvider, mscCfg,
extRoomsProvider, mscCfg, natsClient,
)
}

View File

@ -18,6 +18,7 @@ import (
"context"
"encoding/json"
"strconv"
"time"
"github.com/matrix-org/dendrite/internal/eventutil"
"github.com/matrix-org/dendrite/setup/jetstream"
@ -34,6 +35,7 @@ type SyncAPIProducer struct {
TopicReceiptEvent string
TopicSendToDeviceEvent string
TopicTypingEvent string
TopicPresenceEvent string
JetStream nats.JetStreamContext
ServerName gomatrixserverlib.ServerName
UserAPI userapi.UserInternalAPI
@ -173,3 +175,19 @@ func (p *SyncAPIProducer) SendTyping(
_, err := p.JetStream.PublishMsg(m, nats.Context(ctx))
return err
}
func (p *SyncAPIProducer) SendPresence(
ctx context.Context, userID string, presence types.Presence, statusMsg *string,
) error {
m := nats.NewMsg(p.TopicPresenceEvent)
m.Header.Set(jetstream.UserID, userID)
m.Header.Set("presence", presence.String())
if statusMsg != nil {
m.Header.Set("status_msg", *statusMsg)
}
m.Header.Set("last_active_ts", strconv.Itoa(int(gomatrixserverlib.AsTimestamp(time.Now()))))
_, err := p.JetStream.PublishMsg(m, nats.Context(ctx))
return err
}

View File

@ -0,0 +1,138 @@
// Copyright 2022 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package routing
import (
"fmt"
"net/http"
"strconv"
"time"
"github.com/matrix-org/dendrite/clientapi/httputil"
"github.com/matrix-org/dendrite/clientapi/jsonerror"
"github.com/matrix-org/dendrite/clientapi/producers"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/setup/jetstream"
"github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
"github.com/nats-io/nats.go"
log "github.com/sirupsen/logrus"
)
type presenceReq struct {
Presence string `json:"presence"`
StatusMsg *string `json:"status_msg,omitempty"`
}
func SetPresence(
req *http.Request,
cfg *config.ClientAPI,
device *api.Device,
producer *producers.SyncAPIProducer,
userID string,
) util.JSONResponse {
if !cfg.Matrix.Presence.EnableOutbound {
return util.JSONResponse{
Code: http.StatusOK,
JSON: struct{}{},
}
}
if device.UserID != userID {
return util.JSONResponse{
Code: http.StatusForbidden,
JSON: jsonerror.Forbidden("Unable to set presence for other user."),
}
}
var presence presenceReq
parseErr := httputil.UnmarshalJSONRequest(req, &presence)
if parseErr != nil {
return *parseErr
}
presenceStatus, ok := types.PresenceFromString(presence.Presence)
if !ok {
return util.JSONResponse{
Code: http.StatusBadRequest,
JSON: jsonerror.Unknown(fmt.Sprintf("Unknown presence '%s'.", presence.Presence)),
}
}
err := producer.SendPresence(req.Context(), userID, presenceStatus, presence.StatusMsg)
if err != nil {
log.WithError(err).Errorf("failed to update presence")
return util.JSONResponse{
Code: http.StatusInternalServerError,
JSON: jsonerror.InternalServerError(),
}
}
return util.JSONResponse{
Code: http.StatusOK,
JSON: struct{}{},
}
}
func GetPresence(
req *http.Request,
device *api.Device,
natsClient *nats.Conn,
presenceTopic string,
userID string,
) util.JSONResponse {
msg := nats.NewMsg(presenceTopic)
msg.Header.Set(jetstream.UserID, userID)
presence, err := natsClient.RequestMsg(msg, time.Second*10)
if err != nil {
log.WithError(err).Errorf("unable to get presence")
return util.JSONResponse{
Code: http.StatusInternalServerError,
JSON: jsonerror.InternalServerError(),
}
}
statusMsg := presence.Header.Get("status_msg")
e := presence.Header.Get("error")
if e != "" {
log.Errorf("received error msg from nats: %s", e)
return util.JSONResponse{
Code: http.StatusOK,
JSON: types.PresenceClientResponse{
Presence: types.PresenceUnavailable.String(),
},
}
}
lastActive, err := strconv.Atoi(presence.Header.Get("last_active_ts"))
if err != nil {
return util.JSONResponse{
Code: http.StatusInternalServerError,
JSON: jsonerror.InternalServerError(),
}
}
p := types.PresenceInternal{LastActiveTS: gomatrixserverlib.Timestamp(lastActive)}
currentlyActive := p.CurrentlyActive()
return util.JSONResponse{
Code: http.StatusOK,
JSON: types.PresenceClientResponse{
CurrentlyActive: &currentlyActive,
LastActiveAgo: p.LastActiveAgo(),
Presence: presence.Header.Get("presence"),
StatusMsg: &statusMsg,
},
}
}

View File

@ -32,9 +32,11 @@ import (
keyserverAPI "github.com/matrix-org/dendrite/keyserver/api"
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/setup/jetstream"
userapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
"github.com/nats-io/nats.go"
"github.com/sirupsen/logrus"
)
@ -56,7 +58,7 @@ func Setup(
federationSender federationAPI.FederationInternalAPI,
keyAPI keyserverAPI.KeyInternalAPI,
extRoomsProvider api.ExtraPublicRoomsProvider,
mscCfg *config.MSCs,
mscCfg *config.MSCs, natsClient *nats.Conn,
) {
rateLimits := httputil.NewRateLimits(&cfg.RateLimiting)
userInteractiveAuth := auth.NewUserInteractive(userAPI, cfg)
@ -779,20 +781,6 @@ func Setup(
}),
).Methods(http.MethodPost, http.MethodOptions)
// Element logs get flooded unless this is handled
v3mux.Handle("/presence/{userID}/status",
httputil.MakeExternalAPI("presence", func(req *http.Request) util.JSONResponse {
if r := rateLimits.Limit(req); r != nil {
return *r
}
// TODO: Set presence (probably the responsibility of a presence server not clientapi)
return util.JSONResponse{
Code: http.StatusOK,
JSON: struct{}{},
}
}),
).Methods(http.MethodPut, http.MethodOptions)
v3mux.Handle("/voip/turnServer",
httputil.MakeAuthAPI("turn_server", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
if r := rateLimits.Limit(req); r != nil {
@ -1308,4 +1296,22 @@ func Setup(
return SetReceipt(req, syncProducer, device, vars["roomId"], vars["receiptType"], vars["eventId"])
}),
).Methods(http.MethodPost, http.MethodOptions)
v3mux.Handle("/presence/{userId}/status",
httputil.MakeAuthAPI("set_presence", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
vars, err := httputil.URLDecodeMapValues(mux.Vars(req))
if err != nil {
return util.ErrorResponse(err)
}
return SetPresence(req, cfg, device, syncProducer, vars["userId"])
}),
).Methods(http.MethodPut, http.MethodOptions)
v3mux.Handle("/presence/{userId}/status",
httputil.MakeAuthAPI("get_presence", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
vars, err := httputil.URLDecodeMapValues(mux.Vars(req))
if err != nil {
return util.ErrorResponse(err)
}
return GetPresence(req, device, natsClient, cfg.Matrix.JetStream.Prefixed(jetstream.RequestPresence), vars["userId"])
}),
).Methods(http.MethodGet, http.MethodOptions)
}

View File

@ -91,6 +91,10 @@ func main() {
cfg.UserAPI.BCryptCost = bcrypt.MinCost
cfg.Global.JetStream.InMemory = true
cfg.ClientAPI.RegistrationSharedSecret = "complement"
cfg.Global.Presence = config.PresenceOptions{
EnableInbound: true,
EnableOutbound: true,
}
}
j, err := yaml.Marshal(cfg)

View File

@ -68,6 +68,13 @@ global:
# to other servers and the federation API will not be exposed.
disable_federation: false
# Configures the handling of presence events.
presence:
# Whether inbound presence events are allowed, e.g. receiving presence events from other servers
enable_inbound: false
# Whether outbound presence events are allowed, e.g. sending presence events to other servers
enable_outbound: false
# Server notices allows server admins to send messages to all users.
server_notices:
enabled: false

View File

@ -0,0 +1,143 @@
// Copyright 2022 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package consumers
import (
"context"
"encoding/json"
"strconv"
"github.com/matrix-org/dendrite/federationapi/queue"
"github.com/matrix-org/dendrite/federationapi/storage"
fedTypes "github.com/matrix-org/dendrite/federationapi/types"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/setup/jetstream"
"github.com/matrix-org/dendrite/setup/process"
"github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/gomatrixserverlib"
"github.com/nats-io/nats.go"
log "github.com/sirupsen/logrus"
)
// OutputReceiptConsumer consumes events that originate in the clientapi.
type OutputPresenceConsumer struct {
ctx context.Context
jetstream nats.JetStreamContext
durable string
db storage.Database
queues *queue.OutgoingQueues
ServerName gomatrixserverlib.ServerName
topic string
outboundPresenceEnabled bool
}
// NewOutputPresenceConsumer creates a new OutputPresenceConsumer. Call Start() to begin consuming events.
func NewOutputPresenceConsumer(
process *process.ProcessContext,
cfg *config.FederationAPI,
js nats.JetStreamContext,
queues *queue.OutgoingQueues,
store storage.Database,
) *OutputPresenceConsumer {
return &OutputPresenceConsumer{
ctx: process.Context(),
jetstream: js,
queues: queues,
db: store,
ServerName: cfg.Matrix.ServerName,
durable: cfg.Matrix.JetStream.Durable("FederationAPIPresenceConsumer"),
topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputPresenceEvent),
outboundPresenceEnabled: cfg.Matrix.Presence.EnableOutbound,
}
}
// Start consuming from the clientapi
func (t *OutputPresenceConsumer) Start() error {
if !t.outboundPresenceEnabled {
return nil
}
return jetstream.JetStreamConsumer(
t.ctx, t.jetstream, t.topic, t.durable, t.onMessage,
nats.DeliverAll(), nats.ManualAck(), nats.HeadersOnly(),
)
}
// onMessage is called in response to a message received on the presence
// events topic from the client api.
func (t *OutputPresenceConsumer) onMessage(ctx context.Context, msg *nats.Msg) bool {
// only send presence events which originated from us
userID := msg.Header.Get(jetstream.UserID)
_, serverName, err := gomatrixserverlib.SplitID('@', userID)
if err != nil {
log.WithError(err).WithField("user_id", userID).Error("failed to extract domain from receipt sender")
return true
}
if serverName != t.ServerName {
return true
}
presence := msg.Header.Get("presence")
ts, err := strconv.Atoi(msg.Header.Get("last_active_ts"))
if err != nil {
return true
}
joined, err := t.db.GetAllJoinedHosts(ctx)
if err != nil {
log.WithError(err).Error("failed to get joined hosts")
return true
}
if len(joined) == 0 {
return true
}
var statusMsg *string = nil
if data, ok := msg.Header["status_msg"]; ok && len(data) > 0 {
status := msg.Header.Get("status_msg")
statusMsg = &status
}
p := types.PresenceInternal{LastActiveTS: gomatrixserverlib.Timestamp(ts)}
content := fedTypes.Presence{
Push: []fedTypes.PresenceContent{
{
CurrentlyActive: p.CurrentlyActive(),
LastActiveAgo: p.LastActiveAgo(),
Presence: presence,
StatusMsg: statusMsg,
UserID: userID,
},
},
}
edu := &gomatrixserverlib.EDU{
Type: gomatrixserverlib.MPresence,
Origin: string(t.ServerName),
}
if edu.Content, err = json.Marshal(content); err != nil {
log.WithError(err).Error("failed to marshal EDU JSON")
return true
}
log.Debugf("sending presence EDU to %d servers", len(joined))
if err = t.queues.SendEDU(edu, t.ServerName, joined); err != nil {
log.WithError(err).Error("failed to send EDU")
return false
}
return true
}

View File

@ -66,6 +66,7 @@ func AddPublicRoutes(
TopicReceiptEvent: cfg.Matrix.JetStream.Prefixed(jetstream.OutputReceiptEvent),
TopicSendToDeviceEvent: cfg.Matrix.JetStream.Prefixed(jetstream.OutputSendToDeviceEvent),
TopicTypingEvent: cfg.Matrix.JetStream.Prefixed(jetstream.OutputTypingEvent),
TopicPresenceEvent: cfg.Matrix.JetStream.Prefixed(jetstream.OutputPresenceEvent),
ServerName: cfg.Matrix.ServerName,
UserAPI: userAPI,
}
@ -149,5 +150,11 @@ func NewInternalAPI(
logrus.WithError(err).Panic("failed to start key server consumer")
}
presenceConsumer := consumers.NewOutputPresenceConsumer(
base.ProcessContext, cfg, js, queues, federationDB,
)
if err = presenceConsumer.Start(); err != nil {
logrus.WithError(err).Panic("failed to start presence consumer")
}
return internal.NewFederationInternalAPI(federationDB, cfg, rsAPI, federation, stats, caches, queues, keyRing)
}

View File

@ -18,6 +18,7 @@ import (
"context"
"encoding/json"
"strconv"
"time"
"github.com/matrix-org/dendrite/setup/jetstream"
"github.com/matrix-org/dendrite/syncapi/types"
@ -32,6 +33,7 @@ type SyncAPIProducer struct {
TopicReceiptEvent string
TopicSendToDeviceEvent string
TopicTypingEvent string
TopicPresenceEvent string
JetStream nats.JetStreamContext
ServerName gomatrixserverlib.ServerName
UserAPI userapi.UserInternalAPI
@ -142,3 +144,20 @@ func (p *SyncAPIProducer) SendTyping(
_, err := p.JetStream.PublishMsg(m, nats.Context(ctx))
return err
}
func (p *SyncAPIProducer) SendPresence(
ctx context.Context, userID string, presence types.Presence, statusMsg *string, lastActiveAgo int64,
) error {
m := nats.NewMsg(p.TopicPresenceEvent)
m.Header.Set(jetstream.UserID, userID)
m.Header.Set("presence", presence.String())
if statusMsg != nil {
m.Header.Set("status_msg", *statusMsg)
}
lastActiveTS := gomatrixserverlib.AsTimestamp(time.Now().Add(-(time.Duration(lastActiveAgo) * time.Millisecond)))
m.Header.Set("last_active_ts", strconv.Itoa(int(lastActiveTS)))
log.Debugf("Sending presence to syncAPI: %+v", m.Header)
_, err := p.JetStream.PublishMsg(m, nats.Context(ctx))
return err
}

View File

@ -30,6 +30,7 @@ import (
keyapi "github.com/matrix-org/dendrite/keyserver/api"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/setup/config"
syncTypes "github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
"github.com/prometheus/client_golang/prometheus"
@ -134,6 +135,7 @@ func Send(
keyAPI: keyAPI,
roomsMu: mu,
producer: producer,
inboundPresenceEnabled: cfg.Matrix.Presence.EnableInbound,
}
var txnEvents struct {
@ -192,6 +194,7 @@ type txnReq struct {
roomsMu *internal.MutexByRoom
servers federationAPI.ServersInRoomProvider
producer *producers.SyncAPIProducer
inboundPresenceEnabled bool
}
// A subset of FederationClient functionality that txn requires. Useful for testing.
@ -389,12 +392,37 @@ func (t *txnReq) processEDUs(ctx context.Context) {
if err := t.processSigningKeyUpdate(ctx, e); err != nil {
logrus.WithError(err).Errorf("Failed to process signing key update")
}
case gomatrixserverlib.MPresence:
if t.inboundPresenceEnabled {
if err := t.processPresence(ctx, e); err != nil {
logrus.WithError(err).Errorf("Failed to process presence update")
}
}
default:
util.GetLogger(ctx).WithField("type", e.Type).Debug("Unhandled EDU")
}
}
}
// processPresence handles m.receipt events
func (t *txnReq) processPresence(ctx context.Context, e gomatrixserverlib.EDU) error {
payload := types.Presence{}
if err := json.Unmarshal(e.Content, &payload); err != nil {
return err
}
for _, content := range payload.Push {
presence, ok := syncTypes.PresenceFromString(content.Presence)
if !ok {
logrus.Warnf("invalid presence '%s', skipping.", content.Presence)
continue
}
if err := t.producer.SendPresence(ctx, content.UserID, presence, content.StatusMsg, content.LastActiveAgo); err != nil {
return err
}
}
return nil
}
func (t *txnReq) processSigningKeyUpdate(ctx context.Context, e gomatrixserverlib.EDU) error {
var updatePayload keyapi.CrossSigningKeyUpdate
if err := json.Unmarshal(e.Content, &updatePayload); err != nil {

View File

@ -66,3 +66,15 @@ type FederationReceiptData struct {
type ReceiptTS struct {
TS gomatrixserverlib.Timestamp `json:"ts"`
}
type Presence struct {
Push []PresenceContent `json:"push"`
}
type PresenceContent struct {
CurrentlyActive bool `json:"currently_active,omitempty"`
LastActiveAgo int64 `json:"last_active_ago"`
Presence string `json:"presence"`
StatusMsg *string `json:"status_msg,omitempty"`
UserID string `json:"user_id"`
}

View File

@ -41,6 +41,9 @@ type Global struct {
// to other servers and the federation API will not be exposed.
DisableFederation bool `yaml:"disable_federation"`
// Configures the handling of presence events.
Presence PresenceOptions `yaml:"presence"`
// List of domains that the server will trust as identity servers to
// verify third-party identifiers.
// Defaults to an empty array.
@ -225,3 +228,11 @@ func (c *DNSCacheOptions) Verify(configErrs *ConfigErrors, isMonolith bool) {
checkPositive(configErrs, "cache_size", int64(c.CacheSize))
checkPositive(configErrs, "cache_lifetime", int64(c.CacheLifetime))
}
// PresenceOptions defines possible configurations for presence events.
type PresenceOptions struct {
// Whether inbound presence events are allowed
EnableInbound bool `yaml:"enable_inbound"`
// Whether outbound presence events are allowed
EnableOutbound bool `yaml:"enable_outbound"`
}

View File

@ -25,6 +25,8 @@ var (
OutputReceiptEvent = "OutputReceiptEvent"
OutputStreamEvent = "OutputStreamEvent"
OutputReadUpdate = "OutputReadUpdate"
RequestPresence = "GetPresence"
OutputPresenceEvent = "OutputPresenceEvent"
)
var safeCharacters = regexp.MustCompile("[^A-Za-z0-9$]+")
@ -89,4 +91,10 @@ var streams = []*nats.StreamConfig{
Retention: nats.InterestPolicy,
Storage: nats.FileStorage,
},
{
Name: OutputPresenceEvent,
Retention: nats.InterestPolicy,
Storage: nats.MemoryStorage,
MaxAge: time.Minute * 5,
},
}

View File

@ -0,0 +1,158 @@
// Copyright 2022 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package consumers
import (
"context"
"strconv"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/setup/jetstream"
"github.com/matrix-org/dendrite/setup/process"
"github.com/matrix-org/dendrite/syncapi/notifier"
"github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/gomatrixserverlib"
"github.com/nats-io/nats.go"
"github.com/sirupsen/logrus"
)
// OutputTypingEventConsumer consumes events that originated in the EDU server.
type PresenceConsumer struct {
ctx context.Context
jetstream nats.JetStreamContext
nats *nats.Conn
durable string
requestTopic string
presenceTopic string
db storage.Database
stream types.StreamProvider
notifier *notifier.Notifier
deviceAPI api.UserDeviceAPI
cfg *config.SyncAPI
}
// NewPresenceConsumer creates a new PresenceConsumer.
// Call Start() to begin consuming events.
func NewPresenceConsumer(
process *process.ProcessContext,
cfg *config.SyncAPI,
js nats.JetStreamContext,
nats *nats.Conn,
db storage.Database,
notifier *notifier.Notifier,
stream types.StreamProvider,
deviceAPI api.UserDeviceAPI,
) *PresenceConsumer {
return &PresenceConsumer{
ctx: process.Context(),
nats: nats,
jetstream: js,
durable: cfg.Matrix.JetStream.Durable("SyncAPIPresenceConsumer"),
presenceTopic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputPresenceEvent),
requestTopic: cfg.Matrix.JetStream.Prefixed(jetstream.RequestPresence),
db: db,
notifier: notifier,
stream: stream,
deviceAPI: deviceAPI,
cfg: cfg,
}
}
// Start consuming typing events.
func (s *PresenceConsumer) Start() error {
// Normal NATS subscription, used by Request/Reply
_, err := s.nats.Subscribe(s.requestTopic, func(msg *nats.Msg) {
userID := msg.Header.Get(jetstream.UserID)
presence, err := s.db.GetPresence(context.Background(), userID)
m := &nats.Msg{
Header: nats.Header{},
}
if err != nil {
m.Header.Set("error", err.Error())
if err = msg.RespondMsg(m); err != nil {
logrus.WithError(err).Error("Unable to respond to messages")
}
return
}
deviceRes := api.QueryDevicesResponse{}
if err = s.deviceAPI.QueryDevices(s.ctx, &api.QueryDevicesRequest{UserID: userID}, &deviceRes); err != nil {
m.Header.Set("error", err.Error())
if err = msg.RespondMsg(m); err != nil {
logrus.WithError(err).Error("Unable to respond to messages")
}
return
}
for i := range deviceRes.Devices {
if int64(presence.LastActiveTS) < deviceRes.Devices[i].LastSeenTS {
presence.LastActiveTS = gomatrixserverlib.Timestamp(deviceRes.Devices[i].LastSeenTS)
}
}
m.Header.Set(jetstream.UserID, presence.UserID)
m.Header.Set("presence", presence.ClientFields.Presence)
m.Header.Set("status_msg", *presence.ClientFields.StatusMsg)
m.Header.Set("last_active_ts", strconv.Itoa(int(presence.LastActiveTS)))
if err = msg.RespondMsg(m); err != nil {
logrus.WithError(err).Error("Unable to respond to messages")
return
}
})
if err != nil {
return err
}
if !s.cfg.Matrix.Presence.EnableInbound && !s.cfg.Matrix.Presence.EnableOutbound {
return nil
}
return jetstream.JetStreamConsumer(
s.ctx, s.jetstream, s.presenceTopic, s.durable, s.onMessage,
nats.DeliverAll(), nats.ManualAck(), nats.HeadersOnly(),
)
}
func (s *PresenceConsumer) onMessage(ctx context.Context, msg *nats.Msg) bool {
userID := msg.Header.Get(jetstream.UserID)
presence := msg.Header.Get("presence")
timestamp := msg.Header.Get("last_active_ts")
fromSync, _ := strconv.ParseBool(msg.Header.Get("from_sync"))
logrus.Debugf("syncAPI received presence event: %+v", msg.Header)
ts, err := strconv.Atoi(timestamp)
if err != nil {
return true
}
var statusMsg *string = nil
if data, ok := msg.Header["status_msg"]; ok && len(data) > 0 {
newMsg := msg.Header.Get("status_msg")
statusMsg = &newMsg
}
// OK is already checked, so no need to do it again
p, _ := types.PresenceFromString(presence)
pos, err := s.db.UpdatePresence(ctx, userID, p, statusMsg, gomatrixserverlib.Timestamp(ts), fromSync)
if err != nil {
return true
}
s.stream.Advance(pos)
s.notifier.OnNewPresence(types.StreamingToken{PresencePosition: pos}, userID)
return true
}

View File

@ -43,22 +43,30 @@ type Notifier struct {
userDeviceStreams map[string]map[string]*UserDeviceStream
// The last time we cleaned out stale entries from the userStreams map
lastCleanUpTime time.Time
// Protects roomIDToJoinedUsers and roomIDToPeekingDevices
mapLock *sync.RWMutex
}
// NewNotifier creates a new notifier set to the given sync position.
// In order for this to be of any use, the Notifier needs to be told all rooms and
// the joined users within each of them by calling Notifier.Load(*storage.SyncServerDatabase).
func NewNotifier(currPos types.StreamingToken) *Notifier {
func NewNotifier() *Notifier {
return &Notifier{
currPos: currPos,
roomIDToJoinedUsers: make(map[string]userIDSet),
roomIDToPeekingDevices: make(map[string]peekingDeviceSet),
userDeviceStreams: make(map[string]map[string]*UserDeviceStream),
streamLock: &sync.Mutex{},
mapLock: &sync.RWMutex{},
lastCleanUpTime: time.Now(),
}
}
// SetCurrentPosition sets the current streaming positions.
// This must be called directly after NewNotifier and initialising the streams.
func (n *Notifier) SetCurrentPosition(currPos types.StreamingToken) {
n.currPos = currPos
}
// OnNewEvent is called when a new event is received from the room server. Must only be
// called from a single goroutine, to avoid races between updates which could set the
// current sync position incorrectly.
@ -83,7 +91,7 @@ func (n *Notifier) OnNewEvent(
if ev != nil {
// Map this event's room_id to a list of joined users, and wake them up.
usersToNotify := n.joinedUsers(ev.RoomID())
usersToNotify := n.JoinedUsers(ev.RoomID())
// Map this event's room_id to a list of peeking devices, and wake them up.
peekingDevicesToNotify := n.PeekingDevices(ev.RoomID())
// If this is an invite, also add in the invitee to this list.
@ -114,7 +122,7 @@ func (n *Notifier) OnNewEvent(
n.wakeupUsers(usersToNotify, peekingDevicesToNotify, n.currPos)
} else if roomID != "" {
n.wakeupUsers(n.joinedUsers(roomID), n.PeekingDevices(roomID), n.currPos)
n.wakeupUsers(n.JoinedUsers(roomID), n.PeekingDevices(roomID), n.currPos)
} else if len(userIDs) > 0 {
n.wakeupUsers(userIDs, nil, n.currPos)
} else {
@ -182,7 +190,7 @@ func (n *Notifier) OnNewTyping(
defer n.streamLock.Unlock()
n.currPos.ApplyUpdates(posUpdate)
n.wakeupUsers(n.joinedUsers(roomID), nil, n.currPos)
n.wakeupUsers(n.JoinedUsers(roomID), nil, n.currPos)
}
// OnNewReceipt updates the current position
@ -194,7 +202,7 @@ func (n *Notifier) OnNewReceipt(
defer n.streamLock.Unlock()
n.currPos.ApplyUpdates(posUpdate)
n.wakeupUsers(n.joinedUsers(roomID), nil, n.currPos)
n.wakeupUsers(n.JoinedUsers(roomID), nil, n.currPos)
}
func (n *Notifier) OnNewKeyChange(
@ -228,6 +236,28 @@ func (n *Notifier) OnNewNotificationData(
n.wakeupUsers([]string{userID}, nil, n.currPos)
}
func (n *Notifier) OnNewPresence(
posUpdate types.StreamingToken, userID string,
) {
n.streamLock.Lock()
defer n.streamLock.Unlock()
n.currPos.ApplyUpdates(posUpdate)
sharedUsers := n.SharedUsers(userID)
sharedUsers = append(sharedUsers, userID)
n.wakeupUsers(sharedUsers, nil, n.currPos)
}
func (n *Notifier) SharedUsers(userID string) (sharedUsers []string) {
for roomID, users := range n.roomIDToJoinedUsers {
if _, ok := users[userID]; ok {
sharedUsers = append(sharedUsers, n.JoinedUsers(roomID)...)
}
}
return sharedUsers
}
// GetListener returns a UserStreamListener that can be used to wait for
// updates for a user. Must be closed.
// notify for anything before sincePos
@ -250,6 +280,8 @@ func (n *Notifier) GetListener(req types.SyncRequest) UserDeviceStreamListener {
// Load the membership states required to notify users correctly.
func (n *Notifier) Load(ctx context.Context, db storage.Database) error {
n.mapLock.Lock()
defer n.mapLock.Unlock()
roomToUsers, err := db.AllJoinedUsersInRooms(ctx)
if err != nil {
return err
@ -377,6 +409,8 @@ func (n *Notifier) fetchUserStreams(userID string) []*UserDeviceStream {
// Not thread-safe: must be called on the OnNewEvent goroutine only
func (n *Notifier) addJoinedUser(roomID, userID string) {
n.mapLock.Lock()
defer n.mapLock.Unlock()
if _, ok := n.roomIDToJoinedUsers[roomID]; !ok {
n.roomIDToJoinedUsers[roomID] = make(userIDSet)
}
@ -385,6 +419,8 @@ func (n *Notifier) addJoinedUser(roomID, userID string) {
// Not thread-safe: must be called on the OnNewEvent goroutine only
func (n *Notifier) removeJoinedUser(roomID, userID string) {
n.mapLock.Lock()
defer n.mapLock.Unlock()
if _, ok := n.roomIDToJoinedUsers[roomID]; !ok {
n.roomIDToJoinedUsers[roomID] = make(userIDSet)
}
@ -392,7 +428,9 @@ func (n *Notifier) removeJoinedUser(roomID, userID string) {
}
// Not thread-safe: must be called on the OnNewEvent goroutine only
func (n *Notifier) joinedUsers(roomID string) (userIDs []string) {
func (n *Notifier) JoinedUsers(roomID string) (userIDs []string) {
n.mapLock.RLock()
defer n.mapLock.RUnlock()
if _, ok := n.roomIDToJoinedUsers[roomID]; !ok {
return
}
@ -401,6 +439,8 @@ func (n *Notifier) joinedUsers(roomID string) (userIDs []string) {
// Not thread-safe: must be called on the OnNewEvent goroutine only
func (n *Notifier) addPeekingDevice(roomID, userID, deviceID string) {
n.mapLock.Lock()
defer n.mapLock.Unlock()
if _, ok := n.roomIDToPeekingDevices[roomID]; !ok {
n.roomIDToPeekingDevices[roomID] = make(peekingDeviceSet)
}
@ -410,6 +450,8 @@ func (n *Notifier) addPeekingDevice(roomID, userID, deviceID string) {
// Not thread-safe: must be called on the OnNewEvent goroutine only
// nolint:unused
func (n *Notifier) removePeekingDevice(roomID, userID, deviceID string) {
n.mapLock.Lock()
defer n.mapLock.Unlock()
if _, ok := n.roomIDToPeekingDevices[roomID]; !ok {
n.roomIDToPeekingDevices[roomID] = make(peekingDeviceSet)
}
@ -419,6 +461,8 @@ func (n *Notifier) removePeekingDevice(roomID, userID, deviceID string) {
// Not thread-safe: must be called on the OnNewEvent goroutine only
func (n *Notifier) PeekingDevices(roomID string) (peekingDevices []types.PeekingDevice) {
n.mapLock.RLock()
defer n.mapLock.RUnlock()
if _, ok := n.roomIDToPeekingDevices[roomID]; !ok {
return
}

View File

@ -107,7 +107,8 @@ func mustEqualPositions(t *testing.T, got, want types.StreamingToken) {
// Test that the current position is returned if a request is already behind.
func TestImmediateNotification(t *testing.T) {
n := NewNotifier(syncPositionBefore)
n := NewNotifier()
n.SetCurrentPosition(syncPositionBefore)
pos, err := waitForEvents(n, newTestSyncRequest(alice, aliceDev, syncPositionVeryOld))
if err != nil {
t.Fatalf("TestImmediateNotification error: %s", err)
@ -117,7 +118,8 @@ func TestImmediateNotification(t *testing.T) {
// Test that new events to a joined room unblocks the request.
func TestNewEventAndJoinedToRoom(t *testing.T) {
n := NewNotifier(syncPositionBefore)
n := NewNotifier()
n.SetCurrentPosition(syncPositionBefore)
n.setUsersJoinedToRooms(map[string][]string{
roomID: {alice, bob},
})
@ -142,7 +144,8 @@ func TestNewEventAndJoinedToRoom(t *testing.T) {
}
func TestCorrectStream(t *testing.T) {
n := NewNotifier(syncPositionBefore)
n := NewNotifier()
n.SetCurrentPosition(syncPositionBefore)
stream := lockedFetchUserStream(n, bob, bobDev)
if stream.UserID != bob {
t.Fatalf("expected user %q, got %q", bob, stream.UserID)
@ -153,7 +156,8 @@ func TestCorrectStream(t *testing.T) {
}
func TestCorrectStreamWakeup(t *testing.T) {
n := NewNotifier(syncPositionBefore)
n := NewNotifier()
n.SetCurrentPosition(syncPositionBefore)
awoken := make(chan string)
streamone := lockedFetchUserStream(n, alice, "one")
@ -180,7 +184,8 @@ func TestCorrectStreamWakeup(t *testing.T) {
// Test that an invite unblocks the request
func TestNewInviteEventForUser(t *testing.T) {
n := NewNotifier(syncPositionBefore)
n := NewNotifier()
n.SetCurrentPosition(syncPositionBefore)
n.setUsersJoinedToRooms(map[string][]string{
roomID: {alice, bob},
})
@ -236,7 +241,8 @@ func TestEDUWakeup(t *testing.T) {
// Test that all blocked requests get woken up on a new event.
func TestMultipleRequestWakeup(t *testing.T) {
n := NewNotifier(syncPositionBefore)
n := NewNotifier()
n.SetCurrentPosition(syncPositionBefore)
n.setUsersJoinedToRooms(map[string][]string{
roomID: {alice, bob},
})
@ -272,7 +278,8 @@ func TestMultipleRequestWakeup(t *testing.T) {
func TestNewEventAndWasPreviouslyJoinedToRoom(t *testing.T) {
// listen as bob. Make bob leave room. Make alice send event to room.
// Make sure alice gets woken up only and not bob as well.
n := NewNotifier(syncPositionBefore)
n := NewNotifier()
n.SetCurrentPosition(syncPositionBefore)
n.setUsersJoinedToRooms(map[string][]string{
roomID: {alice, bob},
})

View File

@ -0,0 +1,48 @@
// Copyright 2022 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package producers
import (
"strconv"
"time"
"github.com/matrix-org/dendrite/setup/jetstream"
"github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/gomatrixserverlib"
"github.com/nats-io/nats.go"
)
// FederationAPIPresenceProducer produces events for the federation API server to consume
type FederationAPIPresenceProducer struct {
Topic string
JetStream nats.JetStreamContext
}
func (f *FederationAPIPresenceProducer) SendPresence(
userID string, presence types.Presence, statusMsg *string,
) error {
msg := nats.NewMsg(f.Topic)
msg.Header.Set(jetstream.UserID, userID)
msg.Header.Set("presence", presence.String())
msg.Header.Set("from_sync", "true") // only update last_active_ts and presence
msg.Header.Set("last_active_ts", strconv.Itoa(int(gomatrixserverlib.AsTimestamp(time.Now()))))
if statusMsg != nil {
msg.Header.Set("status_msg", *statusMsg)
}
_, err := f.JetStream.PublishMsg(msg)
return err
}

View File

@ -26,6 +26,7 @@ import (
)
type Database interface {
Presence
MaxStreamPositionForPDUs(ctx context.Context) (types.StreamPosition, error)
MaxStreamPositionForReceipts(ctx context.Context) (types.StreamPosition, error)
MaxStreamPositionForInvites(ctx context.Context) (types.StreamPosition, error)
@ -150,3 +151,10 @@ type Database interface {
StreamToTopologicalPosition(ctx context.Context, roomID string, streamPos types.StreamPosition, backwardOrdering bool) (types.TopologyToken, error)
}
type Presence interface {
UpdatePresence(ctx context.Context, userID string, presence types.Presence, statusMsg *string, lastActiveTS gomatrixserverlib.Timestamp, fromSync bool) (types.StreamPosition, error)
GetPresence(ctx context.Context, userID string) (*types.PresenceInternal, error)
PresenceAfter(ctx context.Context, after types.StreamPosition) (map[string]*types.PresenceInternal, error)
MaxStreamPositionForPresence(ctx context.Context) (types.StreamPosition, error)
}

View File

@ -0,0 +1,162 @@
// Copyright 2022 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package postgres
import (
"context"
"database/sql"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/gomatrixserverlib"
)
const presenceSchema = `
CREATE SEQUENCE IF NOT EXISTS syncapi_presence_id;
-- Stores data about presence
CREATE TABLE IF NOT EXISTS syncapi_presence (
-- The ID
id BIGINT PRIMARY KEY DEFAULT nextval('syncapi_presence_id'),
-- The Matrix user ID
user_id TEXT NOT NULL,
-- The actual presence
presence INT NOT NULL,
-- The status message
status_msg TEXT,
-- The last time an action was received by this user
last_active_ts BIGINT NOT NULL,
CONSTRAINT presence_presences_unique UNIQUE (user_id)
);
CREATE INDEX IF NOT EXISTS syncapi_presence_user_id ON syncapi_presence(user_id);
`
const upsertPresenceSQL = "" +
"INSERT INTO syncapi_presence AS p" +
" (user_id, presence, status_msg, last_active_ts)" +
" VALUES ($1, $2, $3, $4)" +
" ON CONFLICT (user_id)" +
" DO UPDATE SET id = nextval('syncapi_presence_id')," +
" presence = $2, status_msg = COALESCE($3, p.status_msg), last_active_ts = $4" +
" RETURNING id"
const upsertPresenceFromSyncSQL = "" +
"INSERT INTO syncapi_presence AS p" +
" (user_id, presence, last_active_ts)" +
" VALUES ($1, $2, $3)" +
" ON CONFLICT (user_id)" +
" DO UPDATE SET id = nextval('syncapi_presence_id')," +
" presence = $2, last_active_ts = $3" +
" RETURNING id"
const selectPresenceForUserSQL = "" +
"SELECT presence, status_msg, last_active_ts" +
" FROM syncapi_presence" +
" WHERE user_id = $1 LIMIT 1"
const selectMaxPresenceSQL = "" +
"SELECT COALESCE(MAX(id), 0) FROM syncapi_presence"
const selectPresenceAfter = "" +
" SELECT id, user_id, presence, status_msg, last_active_ts" +
" FROM syncapi_presence" +
" WHERE id > $1"
type presenceStatements struct {
upsertPresenceStmt *sql.Stmt
upsertPresenceFromSyncStmt *sql.Stmt
selectPresenceForUsersStmt *sql.Stmt
selectMaxPresenceStmt *sql.Stmt
selectPresenceAfterStmt *sql.Stmt
}
func NewPostgresPresenceTable(db *sql.DB) (*presenceStatements, error) {
_, err := db.Exec(presenceSchema)
if err != nil {
return nil, err
}
s := &presenceStatements{}
return s, sqlutil.StatementList{
{&s.upsertPresenceStmt, upsertPresenceSQL},
{&s.upsertPresenceFromSyncStmt, upsertPresenceFromSyncSQL},
{&s.selectPresenceForUsersStmt, selectPresenceForUserSQL},
{&s.selectMaxPresenceStmt, selectMaxPresenceSQL},
{&s.selectPresenceAfterStmt, selectPresenceAfter},
}.Prepare(db)
}
// UpsertPresence creates/updates a presence status.
func (p *presenceStatements) UpsertPresence(
ctx context.Context,
txn *sql.Tx,
userID string,
statusMsg *string,
presence types.Presence,
lastActiveTS gomatrixserverlib.Timestamp,
fromSync bool,
) (pos types.StreamPosition, err error) {
if fromSync {
stmt := sqlutil.TxStmt(txn, p.upsertPresenceFromSyncStmt)
err = stmt.QueryRowContext(ctx, userID, presence, lastActiveTS).Scan(&pos)
} else {
stmt := sqlutil.TxStmt(txn, p.upsertPresenceStmt)
err = stmt.QueryRowContext(ctx, userID, presence, statusMsg, lastActiveTS).Scan(&pos)
}
return
}
// GetPresenceForUser returns the current presence of a user.
func (p *presenceStatements) GetPresenceForUser(
ctx context.Context, txn *sql.Tx,
userID string,
) (*types.PresenceInternal, error) {
result := &types.PresenceInternal{
UserID: userID,
}
stmt := sqlutil.TxStmt(txn, p.selectPresenceForUsersStmt)
err := stmt.QueryRowContext(ctx, userID).Scan(&result.Presence, &result.ClientFields.StatusMsg, &result.LastActiveTS)
result.ClientFields.Presence = result.Presence.String()
return result, err
}
func (p *presenceStatements) GetMaxPresenceID(ctx context.Context, txn *sql.Tx) (pos types.StreamPosition, err error) {
stmt := sqlutil.TxStmt(txn, p.selectMaxPresenceStmt)
err = stmt.QueryRowContext(ctx).Scan(&pos)
return
}
// GetPresenceAfter returns the changes presences after a given stream id
func (p *presenceStatements) GetPresenceAfter(
ctx context.Context, txn *sql.Tx,
after types.StreamPosition,
) (presences map[string]*types.PresenceInternal, err error) {
presences = make(map[string]*types.PresenceInternal)
stmt := sqlutil.TxStmt(txn, p.selectPresenceAfterStmt)
rows, err := stmt.QueryContext(ctx, after)
if err != nil {
return nil, err
}
defer internal.CloseAndLogIfError(ctx, rows, "GetPresenceAfter: failed to close rows")
for rows.Next() {
qryRes := &types.PresenceInternal{}
if err := rows.Scan(&qryRes.StreamPos, &qryRes.UserID, &qryRes.Presence, &qryRes.ClientFields.StatusMsg, &qryRes.LastActiveTS); err != nil {
return nil, err
}
qryRes.ClientFields.Presence = qryRes.Presence.String()
presences[qryRes.UserID] = qryRes
}
return presences, rows.Err()
}

View File

@ -90,6 +90,10 @@ func NewDatabase(dbProperties *config.DatabaseOptions) (*SyncServerDatasource, e
if err != nil {
return nil, err
}
presence, err := NewPostgresPresenceTable(d.db)
if err != nil {
return nil, err
}
m := sqlutil.NewMigrations()
deltas.LoadFixSequences(m)
deltas.LoadRemoveSendToDeviceSentColumn(m)
@ -111,6 +115,7 @@ func NewDatabase(dbProperties *config.DatabaseOptions) (*SyncServerDatasource, e
Receipts: receipts,
Memberships: memberships,
NotificationData: notificationData,
Presence: presence,
}
return &d, nil
}

View File

@ -48,6 +48,7 @@ type Database struct {
Receipts tables.Receipts
Memberships tables.Memberships
NotificationData tables.NotificationData
Presence tables.Presence
}
func (d *Database) readOnlySnapshot(ctx context.Context) (*sql.Tx, error) {
@ -1002,3 +1003,19 @@ func (s *Database) SelectContextBeforeEvent(ctx context.Context, id int, roomID
func (s *Database) SelectContextAfterEvent(ctx context.Context, id int, roomID string, filter *gomatrixserverlib.RoomEventFilter) (int, []*gomatrixserverlib.HeaderedEvent, error) {
return s.OutputEvents.SelectContextAfterEvent(ctx, nil, id, roomID, filter)
}
func (s *Database) UpdatePresence(ctx context.Context, userID string, presence types.Presence, statusMsg *string, lastActiveTS gomatrixserverlib.Timestamp, fromSync bool) (types.StreamPosition, error) {
return s.Presence.UpsertPresence(ctx, nil, userID, statusMsg, presence, lastActiveTS, fromSync)
}
func (s *Database) GetPresence(ctx context.Context, userID string) (*types.PresenceInternal, error) {
return s.Presence.GetPresenceForUser(ctx, nil, userID)
}
func (s *Database) PresenceAfter(ctx context.Context, after types.StreamPosition) (map[string]*types.PresenceInternal, error) {
return s.Presence.GetPresenceAfter(ctx, nil, after)
}
func (s *Database) MaxStreamPositionForPresence(ctx context.Context) (types.StreamPosition, error) {
return s.Presence.GetMaxPresenceID(ctx, nil)
}

View File

@ -0,0 +1,177 @@
// Copyright 2022 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package sqlite3
import (
"context"
"database/sql"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/gomatrixserverlib"
)
const presenceSchema = `
-- Stores data about presence
CREATE TABLE IF NOT EXISTS syncapi_presence (
-- The ID
id BIGINT NOT NULL,
-- The Matrix user ID
user_id TEXT NOT NULL,
-- The actual presence
presence INT NOT NULL,
-- The status message
status_msg TEXT,
-- The last time an action was received by this user
last_active_ts BIGINT NOT NULL,
CONSTRAINT presence_presences_unique UNIQUE (user_id)
);
CREATE INDEX IF NOT EXISTS syncapi_presence_user_id ON syncapi_presence(user_id);
`
const upsertPresenceSQL = "" +
"INSERT INTO syncapi_presence AS p" +
" (id, user_id, presence, status_msg, last_active_ts)" +
" VALUES ($1, $2, $3, $4, $5)" +
" ON CONFLICT (user_id)" +
" DO UPDATE SET id = $6, " +
" presence = $7, status_msg = COALESCE($8, p.status_msg), last_active_ts = $9" +
" RETURNING id"
const upsertPresenceFromSyncSQL = "" +
"INSERT INTO syncapi_presence AS p" +
" (id, user_id, presence, last_active_ts)" +
" VALUES ($1, $2, $3, $4)" +
" ON CONFLICT (user_id)" +
" DO UPDATE SET id = $5, " +
" presence = $6, last_active_ts = $7" +
" RETURNING id"
const selectPresenceForUserSQL = "" +
"SELECT presence, status_msg, last_active_ts" +
" FROM syncapi_presence" +
" WHERE user_id = $1 LIMIT 1"
const selectMaxPresenceSQL = "" +
"SELECT COALESCE(MAX(id), 0) FROM syncapi_presence"
const selectPresenceAfter = "" +
" SELECT id, user_id, presence, status_msg, last_active_ts" +
" FROM syncapi_presence" +
" WHERE id > $1"
type presenceStatements struct {
db *sql.DB
streamIDStatements *streamIDStatements
upsertPresenceStmt *sql.Stmt
upsertPresenceFromSyncStmt *sql.Stmt
selectPresenceForUsersStmt *sql.Stmt
selectMaxPresenceStmt *sql.Stmt
selectPresenceAfterStmt *sql.Stmt
}
func NewSqlitePresenceTable(db *sql.DB, streamID *streamIDStatements) (*presenceStatements, error) {
_, err := db.Exec(presenceSchema)
if err != nil {
return nil, err
}
s := &presenceStatements{
db: db,
streamIDStatements: streamID,
}
return s, sqlutil.StatementList{
{&s.upsertPresenceStmt, upsertPresenceSQL},
{&s.upsertPresenceFromSyncStmt, upsertPresenceFromSyncSQL},
{&s.selectPresenceForUsersStmt, selectPresenceForUserSQL},
{&s.selectMaxPresenceStmt, selectMaxPresenceSQL},
{&s.selectPresenceAfterStmt, selectPresenceAfter},
}.Prepare(db)
}
// UpsertPresence creates/updates a presence status.
func (p *presenceStatements) UpsertPresence(
ctx context.Context,
txn *sql.Tx,
userID string,
statusMsg *string,
presence types.Presence,
lastActiveTS gomatrixserverlib.Timestamp,
fromSync bool,
) (pos types.StreamPosition, err error) {
pos, err = p.streamIDStatements.nextPresenceID(ctx, txn)
if err != nil {
return pos, err
}
if fromSync {
stmt := sqlutil.TxStmt(txn, p.upsertPresenceFromSyncStmt)
err = stmt.QueryRowContext(ctx,
pos, userID, presence,
lastActiveTS, pos,
presence, lastActiveTS).Scan(&pos)
} else {
stmt := sqlutil.TxStmt(txn, p.upsertPresenceStmt)
err = stmt.QueryRowContext(ctx,
pos, userID, presence,
statusMsg, lastActiveTS, pos,
presence, statusMsg, lastActiveTS).Scan(&pos)
}
return
}
// GetPresenceForUser returns the current presence of a user.
func (p *presenceStatements) GetPresenceForUser(
ctx context.Context, txn *sql.Tx,
userID string,
) (*types.PresenceInternal, error) {
result := &types.PresenceInternal{
UserID: userID,
}
stmt := sqlutil.TxStmt(txn, p.selectPresenceForUsersStmt)
err := stmt.QueryRowContext(ctx, userID).Scan(&result.Presence, &result.ClientFields.StatusMsg, &result.LastActiveTS)
result.ClientFields.Presence = result.Presence.String()
return result, err
}
func (p *presenceStatements) GetMaxPresenceID(ctx context.Context, txn *sql.Tx) (pos types.StreamPosition, err error) {
stmt := sqlutil.TxStmt(txn, p.selectMaxPresenceStmt)
err = stmt.QueryRowContext(ctx).Scan(&pos)
return
}
// GetPresenceAfter returns the changes presences after a given stream id
func (p *presenceStatements) GetPresenceAfter(
ctx context.Context, txn *sql.Tx,
after types.StreamPosition,
) (presences map[string]*types.PresenceInternal, err error) {
presences = make(map[string]*types.PresenceInternal)
stmt := sqlutil.TxStmt(txn, p.selectPresenceAfterStmt)
rows, err := stmt.QueryContext(ctx, after)
if err != nil {
return nil, err
}
defer internal.CloseAndLogIfError(ctx, rows, "GetPresenceAfter: failed to close rows")
for rows.Next() {
qryRes := &types.PresenceInternal{}
if err := rows.Scan(&qryRes.StreamPos, &qryRes.UserID, &qryRes.Presence, &qryRes.ClientFields.StatusMsg, &qryRes.LastActiveTS); err != nil {
return nil, err
}
qryRes.ClientFields.Presence = qryRes.Presence.String()
presences[qryRes.UserID] = qryRes
}
return presences, rows.Err()
}

View File

@ -24,6 +24,8 @@ INSERT INTO syncapi_stream_id (stream_name, stream_id) VALUES ("accountdata", 0)
ON CONFLICT DO NOTHING;
INSERT INTO syncapi_stream_id (stream_name, stream_id) VALUES ("invite", 0)
ON CONFLICT DO NOTHING;
INSERT INTO syncapi_stream_id (stream_name, stream_id) VALUES ("presence", 0)
ON CONFLICT DO NOTHING;
`
const increaseStreamIDStmt = "" +
@ -70,3 +72,9 @@ func (s *streamIDStatements) nextAccountDataID(ctx context.Context, txn *sql.Tx)
err = increaseStmt.QueryRowContext(ctx, "accountdata").Scan(&pos)
return
}
func (s *streamIDStatements) nextPresenceID(ctx context.Context, txn *sql.Tx) (pos types.StreamPosition, err error) {
increaseStmt := sqlutil.TxStmt(txn, s.increaseStreamIDStmt)
err = increaseStmt.QueryRowContext(ctx, "presence").Scan(&pos)
return
}

View File

@ -100,6 +100,10 @@ func (d *SyncServerDatasource) prepare(dbProperties *config.DatabaseOptions) (er
if err != nil {
return err
}
presence, err := NewSqlitePresenceTable(d.db, &d.streamID)
if err != nil {
return err
}
m := sqlutil.NewMigrations()
deltas.LoadFixSequences(m)
deltas.LoadRemoveSendToDeviceSentColumn(m)
@ -121,6 +125,7 @@ func (d *SyncServerDatasource) prepare(dbProperties *config.DatabaseOptions) (er
Receipts: receipts,
Memberships: memberships,
NotificationData: notificationData,
Presence: presence,
}
return nil
}

View File

@ -182,3 +182,10 @@ type NotificationData interface {
SelectUserUnreadCounts(ctx context.Context, userID string, fromExcl, toIncl types.StreamPosition) (map[string]*eventutil.NotificationData, error)
SelectMaxID(ctx context.Context) (int64, error)
}
type Presence interface {
UpsertPresence(ctx context.Context, txn *sql.Tx, userID string, statusMsg *string, presence types.Presence, lastActiveTS gomatrixserverlib.Timestamp, fromSync bool) (pos types.StreamPosition, err error)
GetPresenceForUser(ctx context.Context, txn *sql.Tx, userID string) (presence *types.PresenceInternal, err error)
GetMaxPresenceID(ctx context.Context, txn *sql.Tx) (pos types.StreamPosition, err error)
GetPresenceAfter(ctx context.Context, txn *sql.Tx, after types.StreamPosition) (presences map[string]*types.PresenceInternal, err error)
}

View File

@ -0,0 +1,179 @@
// Copyright 2022 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package streams
import (
"context"
"database/sql"
"encoding/json"
"sync"
"github.com/matrix-org/dendrite/syncapi/notifier"
"github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/gomatrixserverlib"
)
type PresenceStreamProvider struct {
StreamProvider
// cache contains previously sent presence updates to avoid unneeded updates
cache sync.Map
notifier *notifier.Notifier
}
func (p *PresenceStreamProvider) Setup() {
p.StreamProvider.Setup()
id, err := p.DB.MaxStreamPositionForPresence(context.Background())
if err != nil {
panic(err)
}
p.latest = id
}
func (p *PresenceStreamProvider) CompleteSync(
ctx context.Context,
req *types.SyncRequest,
) types.StreamPosition {
return p.IncrementalSync(ctx, req, 0, p.LatestPosition(ctx))
}
func (p *PresenceStreamProvider) IncrementalSync(
ctx context.Context,
req *types.SyncRequest,
from, to types.StreamPosition,
) types.StreamPosition {
presences, err := p.DB.PresenceAfter(ctx, from)
if err != nil {
req.Log.WithError(err).Error("p.DB.PresenceAfter failed")
return from
}
if len(presences) == 0 {
return to
}
// get all joined users
// TODO: SharedUsers might get out of sync
sharedUsers := p.notifier.SharedUsers(req.Device.UserID)
sharedUsersMap := map[string]bool{
req.Device.UserID: true,
}
// convert array to a map for easier checking if a user exists
for i := range sharedUsers {
sharedUsersMap[sharedUsers[i]] = true
}
// add newly joined rooms user presences
newlyJoined := joinedRooms(req.Response, req.Device.UserID)
if len(newlyJoined) > 0 {
// TODO: This refreshes all lists and is quite expensive
// The notifier should update the lists itself
if err = p.notifier.Load(ctx, p.DB); err != nil {
req.Log.WithError(err).Error("unable to refresh notifier lists")
return from
}
for _, roomID := range newlyJoined {
roomUsers := p.notifier.JoinedUsers(roomID)
for i := range roomUsers {
sharedUsersMap[roomUsers[i]] = true
// we already got a presence from this user
if _, ok := presences[roomUsers[i]]; ok {
continue
}
presences[roomUsers[i]], err = p.DB.GetPresence(ctx, roomUsers[i])
if err != nil {
if err == sql.ErrNoRows {
continue
}
req.Log.WithError(err).Error("unable to query presence for user")
return from
}
}
}
}
lastPos := to
for i := range presences {
presence := presences[i]
// Ignore users we don't share a room with
if !sharedUsersMap[presence.UserID] {
continue
}
cacheKey := req.Device.UserID + req.Device.ID + presence.UserID
pres, ok := p.cache.Load(cacheKey)
if ok {
// skip already sent presence
prevPresence := pres.(*types.PresenceInternal)
currentlyActive := prevPresence.CurrentlyActive()
skip := prevPresence.Equals(presence) && currentlyActive && req.Device.UserID != presence.UserID
if skip {
req.Log.Debugf("Skipping presence, no change (%s)", presence.UserID)
continue
}
}
presence.ClientFields.LastActiveAgo = presence.LastActiveAgo()
if presence.ClientFields.Presence == "online" {
currentlyActive := presence.CurrentlyActive()
presence.ClientFields.CurrentlyActive = &currentlyActive
}
content, err := json.Marshal(presence.ClientFields)
if err != nil {
return from
}
req.Response.Presence.Events = append(req.Response.Presence.Events, gomatrixserverlib.ClientEvent{
Content: content,
Sender: presence.UserID,
Type: gomatrixserverlib.MPresence,
})
if presence.StreamPos > lastPos {
lastPos = presence.StreamPos
}
p.cache.Store(cacheKey, presence)
}
return lastPos
}
func joinedRooms(res *types.Response, userID string) []string {
var roomIDs []string
for roomID, join := range res.Rooms.Join {
// we would expect to see our join event somewhere if we newly joined the room.
// Normal events get put in the join section so it's not enough to know the room ID is present in 'join'.
newlyJoined := membershipEventPresent(join.State.Events, userID)
if newlyJoined {
roomIDs = append(roomIDs, roomID)
continue
}
newlyJoined = membershipEventPresent(join.Timeline.Events, userID)
if newlyJoined {
roomIDs = append(roomIDs, roomID)
}
}
return roomIDs
}
func membershipEventPresent(events []gomatrixserverlib.ClientEvent, userID string) bool {
for _, ev := range events {
// it's enough to know that we have our member event here, don't need to check membership content
// as it's implied by being in the respective section of the sync response.
if ev.Type == gomatrixserverlib.MRoomMember && ev.StateKey != nil && *ev.StateKey == userID {
return true
}
}
return false
}

View File

@ -6,6 +6,7 @@ import (
"github.com/matrix-org/dendrite/internal/caching"
keyapi "github.com/matrix-org/dendrite/keyserver/api"
rsapi "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/syncapi/notifier"
"github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/types"
userapi "github.com/matrix-org/dendrite/userapi/api"
@ -20,12 +21,13 @@ type Streams struct {
AccountDataStreamProvider types.StreamProvider
DeviceListStreamProvider types.StreamProvider
NotificationDataStreamProvider types.StreamProvider
PresenceStreamProvider types.StreamProvider
}
func NewSyncStreamProviders(
d storage.Database, userAPI userapi.UserInternalAPI,
rsAPI rsapi.RoomserverInternalAPI, keyAPI keyapi.KeyInternalAPI,
eduCache *caching.EDUCache,
eduCache *caching.EDUCache, notifier *notifier.Notifier,
) *Streams {
streams := &Streams{
PDUStreamProvider: &PDUStreamProvider{
@ -56,6 +58,10 @@ func NewSyncStreamProviders(
rsAPI: rsAPI,
keyAPI: keyAPI,
},
PresenceStreamProvider: &PresenceStreamProvider{
StreamProvider: StreamProvider{DB: d},
notifier: notifier,
},
}
streams.PDUStreamProvider.Setup()
@ -66,6 +72,7 @@ func NewSyncStreamProviders(
streams.AccountDataStreamProvider.Setup()
streams.NotificationDataStreamProvider.Setup()
streams.DeviceListStreamProvider.Setup()
streams.PresenceStreamProvider.Setup()
return streams
}
@ -80,5 +87,6 @@ func (s *Streams) Latest(ctx context.Context) types.StreamingToken {
AccountDataPosition: s.AccountDataStreamProvider.LatestPosition(ctx),
NotificationDataPosition: s.NotificationDataStreamProvider.LatestPosition(ctx),
DeviceListPosition: s.DeviceListStreamProvider.LatestPosition(ctx),
PresencePosition: s.PresenceStreamProvider.LatestPosition(ctx),
}
}

View File

@ -17,6 +17,8 @@
package sync
import (
"context"
"database/sql"
"net"
"net/http"
"strings"
@ -33,8 +35,10 @@ import (
"github.com/matrix-org/dendrite/syncapi/streams"
"github.com/matrix-org/dendrite/syncapi/types"
userapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
)
// RequestPool manages HTTP long-poll connections for /sync
@ -44,9 +48,15 @@ type RequestPool struct {
userAPI userapi.UserInternalAPI
keyAPI keyapi.KeyInternalAPI
rsAPI roomserverAPI.RoomserverInternalAPI
lastseen sync.Map
lastseen *sync.Map
presence *sync.Map
streams *streams.Streams
Notifier *notifier.Notifier
producer PresencePublisher
}
type PresencePublisher interface {
SendPresence(userID string, presence types.Presence, statusMsg *string) error
}
// NewRequestPool makes a new RequestPool
@ -55,6 +65,7 @@ func NewRequestPool(
userAPI userapi.UserInternalAPI, keyAPI keyapi.KeyInternalAPI,
rsAPI roomserverAPI.RoomserverInternalAPI,
streams *streams.Streams, notifier *notifier.Notifier,
producer PresencePublisher,
) *RequestPool {
rp := &RequestPool{
db: db,
@ -62,11 +73,14 @@ func NewRequestPool(
userAPI: userAPI,
keyAPI: keyAPI,
rsAPI: rsAPI,
lastseen: sync.Map{},
lastseen: &sync.Map{},
presence: &sync.Map{},
streams: streams,
Notifier: notifier,
producer: producer,
}
go rp.cleanLastSeen()
go rp.cleanPresence(db, time.Minute*5)
return rp
}
@ -80,6 +94,68 @@ func (rp *RequestPool) cleanLastSeen() {
}
}
func (rp *RequestPool) cleanPresence(db storage.Presence, cleanupTime time.Duration) {
if !rp.cfg.Matrix.Presence.EnableOutbound {
return
}
for {
rp.presence.Range(func(key interface{}, v interface{}) bool {
p := v.(types.PresenceInternal)
if time.Since(p.LastActiveTS.Time()) > cleanupTime {
rp.updatePresence(db, types.PresenceUnavailable.String(), p.UserID)
rp.presence.Delete(key)
}
return true
})
time.Sleep(cleanupTime)
}
}
// updatePresence sends presence updates to the SyncAPI and FederationAPI
func (rp *RequestPool) updatePresence(db storage.Presence, presence string, userID string) {
if !rp.cfg.Matrix.Presence.EnableOutbound {
return
}
if presence == "" {
presence = types.PresenceOnline.String()
}
presenceID, ok := types.PresenceFromString(presence)
if !ok { // this should almost never happen
logrus.Errorf("unknown presence '%s'", presence)
return
}
newPresence := types.PresenceInternal{
ClientFields: types.PresenceClientResponse{
Presence: presenceID.String(),
},
Presence: presenceID,
UserID: userID,
LastActiveTS: gomatrixserverlib.AsTimestamp(time.Now()),
}
defer rp.presence.Store(userID, newPresence)
// avoid spamming presence updates when syncing
existingPresence, ok := rp.presence.LoadOrStore(userID, newPresence)
if ok {
p := existingPresence.(types.PresenceInternal)
if p.ClientFields.Presence == newPresence.ClientFields.Presence {
return
}
}
// ensure we also send the current status_msg to federated servers and not nil
dbPresence, err := db.GetPresence(context.Background(), userID)
if err != nil && err != sql.ErrNoRows {
return
}
if err := rp.producer.SendPresence(userID, presenceID, dbPresence.ClientFields.StatusMsg); err != nil {
logrus.WithError(err).Error("Unable to publish presence message from sync")
return
}
}
func (rp *RequestPool) updateLastSeen(req *http.Request, device *userapi.Device) {
if _, ok := rp.lastseen.LoadOrStore(device.UserID+device.ID, struct{}{}); ok {
return
@ -156,6 +232,7 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi.
defer activeSyncRequests.Dec()
rp.updateLastSeen(req, device)
rp.updatePresence(rp.db, req.FormValue("set_presence"), device.UserID)
waitingSyncRequests.Inc()
defer waitingSyncRequests.Dec()
@ -219,6 +296,9 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi.
DeviceListPosition: rp.streams.DeviceListStreamProvider.CompleteSync(
syncReq.Context, syncReq,
),
PresencePosition: rp.streams.PresenceStreamProvider.CompleteSync(
syncReq.Context, syncReq,
),
}
} else {
// Incremental sync
@ -255,6 +335,10 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi.
syncReq.Context, syncReq,
syncReq.Since.DeviceListPosition, currentPos.DeviceListPosition,
),
PresencePosition: rp.streams.PresenceStreamProvider.IncrementalSync(
syncReq.Context, syncReq,
syncReq.Since.PresencePosition, currentPos.PresencePosition,
),
}
}

View File

@ -0,0 +1,128 @@
package sync
import (
"context"
"sync"
"testing"
"time"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/gomatrixserverlib"
)
type dummyPublisher struct {
count int
}
func (d *dummyPublisher) SendPresence(userID string, presence types.Presence, statusMsg *string) error {
d.count++
return nil
}
type dummyDB struct{}
func (d dummyDB) UpdatePresence(ctx context.Context, userID string, presence types.Presence, statusMsg *string, lastActiveTS gomatrixserverlib.Timestamp, fromSync bool) (types.StreamPosition, error) {
return 0, nil
}
func (d dummyDB) GetPresence(ctx context.Context, userID string) (*types.PresenceInternal, error) {
return &types.PresenceInternal{}, nil
}
func (d dummyDB) PresenceAfter(ctx context.Context, after types.StreamPosition) (map[string]*types.PresenceInternal, error) {
return map[string]*types.PresenceInternal{}, nil
}
func (d dummyDB) MaxStreamPositionForPresence(ctx context.Context) (types.StreamPosition, error) {
return 0, nil
}
func TestRequestPool_updatePresence(t *testing.T) {
type args struct {
presence string
userID string
sleep time.Duration
}
publisher := &dummyPublisher{}
syncMap := sync.Map{}
tests := []struct {
name string
args args
wantIncrease bool
}{
{
name: "new presence is published",
wantIncrease: true,
args: args{
userID: "dummy",
},
},
{
name: "presence not published, no change",
args: args{
userID: "dummy",
},
},
{
name: "new presence is published dummy2",
wantIncrease: true,
args: args{
userID: "dummy2",
presence: "online",
},
},
{
name: "different presence is published dummy2",
wantIncrease: true,
args: args{
userID: "dummy2",
presence: "unavailable",
},
},
{
name: "same presence is not published dummy2",
args: args{
userID: "dummy2",
presence: "unavailable",
sleep: time.Millisecond * 150,
},
},
{
name: "same presence is published after being deleted",
wantIncrease: true,
args: args{
userID: "dummy2",
presence: "unavailable",
},
},
}
rp := &RequestPool{
presence: &syncMap,
producer: publisher,
cfg: &config.SyncAPI{
Matrix: &config.Global{
JetStream: config.JetStream{
TopicPrefix: "Dendrite",
},
Presence: config.PresenceOptions{
EnableInbound: true,
EnableOutbound: true,
},
},
},
}
db := dummyDB{}
go rp.cleanPresence(db, time.Millisecond*50)
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
beforeCount := publisher.count
rp.updatePresence(db, tt.args.presence, tt.args.userID)
if tt.wantIncrease && publisher.count <= beforeCount {
t.Fatalf("expected count to increase: %d <= %d", publisher.count, beforeCount)
}
time.Sleep(tt.args.sleep)
})
}
}

View File

@ -49,7 +49,7 @@ func AddPublicRoutes(
federation *gomatrixserverlib.FederationClient,
cfg *config.SyncAPI,
) {
js, _ := jetstream.Prepare(process, &cfg.Matrix.JetStream)
js, natsClient := jetstream.Prepare(process, &cfg.Matrix.JetStream)
syncDB, err := storage.NewSyncServerDatasource(&cfg.Database)
if err != nil {
@ -57,13 +57,19 @@ func AddPublicRoutes(
}
eduCache := caching.NewTypingCache()
streams := streams.NewSyncStreamProviders(syncDB, userAPI, rsAPI, keyAPI, eduCache)
notifier := notifier.NewNotifier(streams.Latest(context.Background()))
notifier := notifier.NewNotifier()
streams := streams.NewSyncStreamProviders(syncDB, userAPI, rsAPI, keyAPI, eduCache, notifier)
notifier.SetCurrentPosition(streams.Latest(context.Background()))
if err = notifier.Load(context.Background(), syncDB); err != nil {
logrus.WithError(err).Panicf("failed to load notifier ")
}
requestPool := sync.NewRequestPool(syncDB, cfg, userAPI, keyAPI, rsAPI, streams, notifier)
federationPresenceProducer := &producers.FederationAPIPresenceProducer{
Topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputPresenceEvent),
JetStream: js,
}
requestPool := sync.NewRequestPool(syncDB, cfg, userAPI, keyAPI, rsAPI, streams, notifier, federationPresenceProducer)
userAPIStreamEventProducer := &producers.UserAPIStreamEventProducer{
JetStream: js,
@ -75,8 +81,6 @@ func AddPublicRoutes(
Topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputReadUpdate),
}
_ = userAPIReadUpdateProducer
keyChangeConsumer := consumers.NewOutputKeyChangeEventConsumer(
process, cfg, cfg.Matrix.JetStream.Prefixed(jetstream.OutputKeyChangeEvent),
js, keyAPI, rsAPI, syncDB, notifier,
@ -131,5 +135,14 @@ func AddPublicRoutes(
logrus.WithError(err).Panicf("failed to start receipts consumer")
}
presenceConsumer := consumers.NewPresenceConsumer(
process, cfg, js, natsClient, syncDB,
notifier, streams.PresenceStreamProvider,
userAPI,
)
if err = presenceConsumer.Start(); err != nil {
logrus.WithError(err).Panicf("failed to start presence consumer")
}
routing.Setup(router, requestPool, syncDB, userAPI, federation, rsAPI, cfg)
}

75
syncapi/types/presence.go Normal file
View File

@ -0,0 +1,75 @@
// Copyright 2022 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package types
import (
"strings"
"time"
"github.com/matrix-org/gomatrixserverlib"
)
//go:generate stringer -type=Presence -linecomment
type Presence uint8
const (
PresenceUnavailable Presence = iota + 1 // unavailable
PresenceOnline // online
PresenceOffline // offline
)
// PresenceFromString returns the integer representation of the given input presence.
// Returns false for ok, if input is not a valid presence value.
func PresenceFromString(input string) (p Presence, ok bool) {
for i := 0; i < len(_Presence_index)-1; i++ {
l, r := _Presence_index[i], _Presence_index[i+1]
if strings.EqualFold(input, _Presence_name[l:r]) {
return Presence(i + 1), true
}
}
return 0, false
}
type PresenceInternal struct {
ClientFields PresenceClientResponse
StreamPos StreamPosition `json:"-"`
UserID string `json:"-"`
LastActiveTS gomatrixserverlib.Timestamp `json:"-"`
Presence Presence `json:"-"`
}
// Equals compares p1 with p2.
func (p1 *PresenceInternal) Equals(p2 *PresenceInternal) bool {
return p1.ClientFields.Presence == p2.ClientFields.Presence &&
p1.ClientFields.StatusMsg == p2.ClientFields.StatusMsg &&
p1.UserID == p2.UserID
}
// CurrentlyActive returns the current active state.
func (p *PresenceInternal) CurrentlyActive() bool {
return time.Since(p.LastActiveTS.Time()).Minutes() < 5
}
// LastActiveAgo returns the time since the LastActiveTS in milliseconds.
func (p *PresenceInternal) LastActiveAgo() int64 {
return time.Since(p.LastActiveTS.Time()).Milliseconds()
}
type PresenceClientResponse struct {
CurrentlyActive *bool `json:"currently_active,omitempty"`
LastActiveAgo int64 `json:"last_active_ago,omitempty"`
Presence string `json:"presence"`
StatusMsg *string `json:"status_msg,omitempty"`
}

View File

@ -0,0 +1,26 @@
// Code generated by "stringer -type=Presence -linecomment"; DO NOT EDIT.
package types
import "strconv"
func _() {
// An "invalid array index" compiler error signifies that the constant values have changed.
// Re-run the stringer command to generate them again.
var x [1]struct{}
_ = x[PresenceUnavailable-1]
_ = x[PresenceOnline-2]
_ = x[PresenceOffline-3]
}
const _Presence_name = "unavailableonlineoffline"
var _Presence_index = [...]uint8{0, 11, 17, 24}
func (i Presence) String() string {
i -= 1
if i >= Presence(len(_Presence_index)-1) {
return "Presence(" + strconv.FormatInt(int64(i+1), 10) + ")"
}
return _Presence_name[_Presence_index[i]:_Presence_index[i+1]]
}

View File

@ -0,0 +1,42 @@
package types
import "testing"
func TestPresenceFromString(t *testing.T) {
tests := []struct {
name string
input string
wantStatus Presence
wantOk bool
}{
{
name: "presence unavailable",
input: "unavailable",
wantStatus: PresenceUnavailable,
wantOk: true,
},
{
name: "presence online",
input: "OnLINE",
wantStatus: PresenceOnline,
wantOk: true,
},
{
name: "unknown presence",
input: "unknown",
wantStatus: 0,
wantOk: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, got1 := PresenceFromString(tt.input)
if got != tt.wantStatus {
t.Errorf("PresenceFromString() got = %v, want %v", got, tt.wantStatus)
}
if got1 != tt.wantOk {
t.Errorf("PresenceFromString() got1 = %v, want %v", got1, tt.wantOk)
}
})
}
}

View File

@ -103,6 +103,7 @@ type StreamingToken struct {
AccountDataPosition StreamPosition
DeviceListPosition StreamPosition
NotificationDataPosition StreamPosition
PresencePosition StreamPosition
}
// This will be used as a fallback by json.Marshal.
@ -118,11 +119,12 @@ func (s *StreamingToken) UnmarshalText(text []byte) (err error) {
func (t StreamingToken) String() string {
posStr := fmt.Sprintf(
"s%d_%d_%d_%d_%d_%d_%d_%d",
"s%d_%d_%d_%d_%d_%d_%d_%d_%d",
t.PDUPosition, t.TypingPosition,
t.ReceiptPosition, t.SendToDevicePosition,
t.InvitePosition, t.AccountDataPosition,
t.DeviceListPosition, t.NotificationDataPosition,
t.PresencePosition,
)
return posStr
}
@ -146,12 +148,14 @@ func (t *StreamingToken) IsAfter(other StreamingToken) bool {
return true
case t.NotificationDataPosition > other.NotificationDataPosition:
return true
case t.PresencePosition > other.PresencePosition:
return true
}
return false
}
func (t *StreamingToken) IsEmpty() bool {
return t == nil || t.PDUPosition+t.TypingPosition+t.ReceiptPosition+t.SendToDevicePosition+t.InvitePosition+t.AccountDataPosition+t.DeviceListPosition+t.NotificationDataPosition == 0
return t == nil || t.PDUPosition+t.TypingPosition+t.ReceiptPosition+t.SendToDevicePosition+t.InvitePosition+t.AccountDataPosition+t.DeviceListPosition+t.NotificationDataPosition+t.PresencePosition == 0
}
// WithUpdates returns a copy of the StreamingToken with updates applied from another StreamingToken.
@ -192,6 +196,9 @@ func (t *StreamingToken) ApplyUpdates(other StreamingToken) {
if other.NotificationDataPosition > t.NotificationDataPosition {
t.NotificationDataPosition = other.NotificationDataPosition
}
if other.PresencePosition > t.PresencePosition {
t.PresencePosition = other.PresencePosition
}
}
type TopologyToken struct {
@ -284,7 +291,7 @@ func NewStreamTokenFromString(tok string) (token StreamingToken, err error) {
// s478_0_0_0_0_13.dl-0-2 but we have now removed partitioned stream positions
tok = strings.Split(tok, ".")[0]
parts := strings.Split(tok[1:], "_")
var positions [8]StreamPosition
var positions [9]StreamPosition
for i, p := range parts {
if i >= len(positions) {
break
@ -306,6 +313,7 @@ func NewStreamTokenFromString(tok string) (token StreamingToken, err error) {
AccountDataPosition: positions[5],
DeviceListPosition: positions[6],
NotificationDataPosition: positions[7],
PresencePosition: positions[8],
}
return token, nil
}

View File

@ -9,9 +9,9 @@ import (
func TestSyncTokens(t *testing.T) {
shouldPass := map[string]string{
"s4_0_0_0_0_0_0_0": StreamingToken{4, 0, 0, 0, 0, 0, 0, 0}.String(),
"s3_1_0_0_0_0_2_0": StreamingToken{3, 1, 0, 0, 0, 0, 2, 0}.String(),
"s3_1_2_3_5_0_0_0": StreamingToken{3, 1, 2, 3, 5, 0, 0, 0}.String(),
"s4_0_0_0_0_0_0_0_3": StreamingToken{4, 0, 0, 0, 0, 0, 0, 0, 3}.String(),
"s3_1_0_0_0_0_2_0_5": StreamingToken{3, 1, 0, 0, 0, 0, 2, 0, 5}.String(),
"s3_1_2_3_5_0_0_0_6": StreamingToken{3, 1, 2, 3, 5, 0, 0, 0, 6}.String(),
"t3_1": TopologyToken{3, 1}.String(),
}

View File

@ -679,6 +679,20 @@ remote user has push rules copied to upgraded room
/upgrade of a bogus room fails gracefully
Cannot send tombstone event that points to the same room
Room summary counts change when membership changes
GET /presence/:user_id/status fetches initial status
PUT /presence/:user_id/status updates my presence
Presence change reports an event to myself
Existing members see new members' presence
#Existing members see new member's presence
Newly joined room includes presence in incremental sync
Get presence for newly joined members in incremental sync
User sees their own presence in a sync
User sees updates to presence from other users in the incremental sync.
Presence changes are reported to local room members
Presence changes are also reported to remote room members
Presence changes to UNAVAILABLE are reported to local room members
Presence changes to UNAVAILABLE are reported to remote room members
New federated private chats get full presence information (SYN-115)
/upgrade copies >100 power levels to the new room
Room state after a rejected message event is the same as before
Room state after a rejected state event is the same as before

View File

@ -31,12 +31,10 @@ type UserInternalAPI interface {
UserRegisterAPI
UserAccountAPI
UserThreePIDAPI
UserDeviceAPI
InputAccountData(ctx context.Context, req *InputAccountDataRequest, res *InputAccountDataResponse) error
PerformDeviceDeletion(ctx context.Context, req *PerformDeviceDeletionRequest, res *PerformDeviceDeletionResponse) error
PerformLastSeenUpdate(ctx context.Context, req *PerformLastSeenUpdateRequest, res *PerformLastSeenUpdateResponse) error
PerformDeviceUpdate(ctx context.Context, req *PerformDeviceUpdateRequest, res *PerformDeviceUpdateResponse) error
PerformOpenIDTokenCreation(ctx context.Context, req *PerformOpenIDTokenCreationRequest, res *PerformOpenIDTokenCreationResponse) error
PerformKeyBackup(ctx context.Context, req *PerformKeyBackupRequest, res *PerformKeyBackupResponse) error
PerformPusherSet(ctx context.Context, req *PerformPusherSetRequest, res *struct{}) error
@ -45,15 +43,21 @@ type UserInternalAPI interface {
QueryKeyBackup(ctx context.Context, req *QueryKeyBackupRequest, res *QueryKeyBackupResponse)
QueryAccessToken(ctx context.Context, req *QueryAccessTokenRequest, res *QueryAccessTokenResponse) error
QueryDevices(ctx context.Context, req *QueryDevicesRequest, res *QueryDevicesResponse) error
QueryAccountData(ctx context.Context, req *QueryAccountDataRequest, res *QueryAccountDataResponse) error
QueryDeviceInfos(ctx context.Context, req *QueryDeviceInfosRequest, res *QueryDeviceInfosResponse) error
QueryOpenIDToken(ctx context.Context, req *QueryOpenIDTokenRequest, res *QueryOpenIDTokenResponse) error
QueryPushers(ctx context.Context, req *QueryPushersRequest, res *QueryPushersResponse) error
QueryPushRules(ctx context.Context, req *QueryPushRulesRequest, res *QueryPushRulesResponse) error
QueryNotifications(ctx context.Context, req *QueryNotificationsRequest, res *QueryNotificationsResponse) error
}
type UserDeviceAPI interface {
PerformDeviceDeletion(ctx context.Context, req *PerformDeviceDeletionRequest, res *PerformDeviceDeletionResponse) error
PerformLastSeenUpdate(ctx context.Context, req *PerformLastSeenUpdateRequest, res *PerformLastSeenUpdateResponse) error
PerformDeviceUpdate(ctx context.Context, req *PerformDeviceUpdateRequest, res *PerformDeviceUpdateResponse) error
QueryDevices(ctx context.Context, req *QueryDevicesRequest, res *QueryDevicesResponse) error
QueryDeviceInfos(ctx context.Context, req *QueryDeviceInfosRequest, res *QueryDeviceInfosResponse) error
}
type UserDirectoryProvider interface {
QuerySearchProfiles(ctx context.Context, req *QuerySearchProfilesRequest, res *QuerySearchProfilesResponse) error
}