Send presence to newly added servers (#2869)

This should make `New federated private chats get full presence
information (SYN-115)` happy.
This commit is contained in:
Till 2022-11-11 10:35:17 +01:00 committed by GitHub
parent efa50253f6
commit 0193549201
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 107 additions and 18 deletions

View File

@ -18,6 +18,10 @@ import (
"context"
"encoding/json"
"fmt"
"strconv"
"time"
syncAPITypes "github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/gomatrixserverlib"
"github.com/nats-io/nats.go"
@ -34,14 +38,16 @@ import (
// OutputRoomEventConsumer consumes events that originated in the room server.
type OutputRoomEventConsumer struct {
ctx context.Context
cfg *config.FederationAPI
rsAPI api.FederationRoomserverAPI
jetstream nats.JetStreamContext
durable string
db storage.Database
queues *queue.OutgoingQueues
topic string
ctx context.Context
cfg *config.FederationAPI
rsAPI api.FederationRoomserverAPI
jetstream nats.JetStreamContext
natsClient *nats.Conn
durable string
db storage.Database
queues *queue.OutgoingQueues
topic string
topicPresence string
}
// NewOutputRoomEventConsumer creates a new OutputRoomEventConsumer. Call Start() to begin consuming from room servers.
@ -49,19 +55,22 @@ func NewOutputRoomEventConsumer(
process *process.ProcessContext,
cfg *config.FederationAPI,
js nats.JetStreamContext,
natsClient *nats.Conn,
queues *queue.OutgoingQueues,
store storage.Database,
rsAPI api.FederationRoomserverAPI,
) *OutputRoomEventConsumer {
return &OutputRoomEventConsumer{
ctx: process.Context(),
cfg: cfg,
jetstream: js,
db: store,
queues: queues,
rsAPI: rsAPI,
durable: cfg.Matrix.JetStream.Durable("FederationAPIRoomServerConsumer"),
topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputRoomEvent),
ctx: process.Context(),
cfg: cfg,
jetstream: js,
natsClient: natsClient,
db: store,
queues: queues,
rsAPI: rsAPI,
durable: cfg.Matrix.JetStream.Durable("FederationAPIRoomServerConsumer"),
topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputRoomEvent),
topicPresence: cfg.Matrix.JetStream.Prefixed(jetstream.RequestPresence),
}
}
@ -146,6 +155,7 @@ func (s *OutputRoomEventConsumer) processInboundPeek(orp api.OutputNewInboundPee
// processMessage updates the list of currently joined hosts in the room
// and then sends the event to the hosts that were joined before the event.
func (s *OutputRoomEventConsumer) processMessage(ore api.OutputNewRoomEvent, rewritesState bool) error {
addsStateEvents, missingEventIDs := ore.NeededStateEventIDs()
// Ask the roomserver and add in the rest of the results into the set.
@ -184,6 +194,14 @@ func (s *OutputRoomEventConsumer) processMessage(ore api.OutputNewRoomEvent, rew
return err
}
// If we added new hosts, inform them about our known presence events for this room
if len(addsJoinedHosts) > 0 && ore.Event.Type() == gomatrixserverlib.MRoomMember && ore.Event.StateKey() != nil {
membership, _ := ore.Event.Membership()
if membership == gomatrixserverlib.Join {
s.sendPresence(ore.Event.RoomID(), addsJoinedHosts)
}
}
if oldJoinedHosts == nil {
// This means that there is nothing to update as this is a duplicate
// message.
@ -213,6 +231,76 @@ func (s *OutputRoomEventConsumer) processMessage(ore api.OutputNewRoomEvent, rew
)
}
func (s *OutputRoomEventConsumer) sendPresence(roomID string, addedJoined []types.JoinedHost) {
joined := make([]gomatrixserverlib.ServerName, len(addedJoined))
for _, added := range addedJoined {
joined = append(joined, added.ServerName)
}
// get our locally joined users
var queryRes api.QueryMembershipsForRoomResponse
err := s.rsAPI.QueryMembershipsForRoom(s.ctx, &api.QueryMembershipsForRoomRequest{
JoinedOnly: true,
LocalOnly: true,
RoomID: roomID,
}, &queryRes)
if err != nil {
log.WithError(err).Error("failed to calculate joined rooms for user")
return
}
// send every presence we know about to the remote server
content := types.Presence{}
for _, ev := range queryRes.JoinEvents {
msg := nats.NewMsg(s.topicPresence)
msg.Header.Set(jetstream.UserID, ev.Sender)
var presence *nats.Msg
presence, err = s.natsClient.RequestMsg(msg, time.Second*10)
if err != nil {
log.WithError(err).Errorf("unable to get presence")
continue
}
statusMsg := presence.Header.Get("status_msg")
e := presence.Header.Get("error")
if e != "" {
continue
}
var lastActive int
lastActive, err = strconv.Atoi(presence.Header.Get("last_active_ts"))
if err != nil {
continue
}
p := syncAPITypes.PresenceInternal{LastActiveTS: gomatrixserverlib.Timestamp(lastActive)}
content.Push = append(content.Push, types.PresenceContent{
CurrentlyActive: p.CurrentlyActive(),
LastActiveAgo: p.LastActiveAgo(),
Presence: presence.Header.Get("presence"),
StatusMsg: &statusMsg,
UserID: ev.Sender,
})
}
if len(content.Push) == 0 {
return
}
edu := &gomatrixserverlib.EDU{
Type: gomatrixserverlib.MPresence,
Origin: string(s.cfg.Matrix.ServerName),
}
if edu.Content, err = json.Marshal(content); err != nil {
log.WithError(err).Error("failed to marshal EDU JSON")
return
}
if err := s.queues.SendEDU(edu, s.cfg.Matrix.ServerName, joined); err != nil {
log.WithError(err).Error("failed to send EDU")
}
}
// joinedHostsAtEvent works out a list of matrix servers that were joined to
// the room at the event (including peeking ones)
// It is important to use the state at the event for sending messages because:

View File

@ -118,7 +118,7 @@ func NewInternalAPI(
stats := statistics.NewStatistics(federationDB, cfg.FederationMaxRetries+1)
js, _ := base.NATS.Prepare(base.ProcessContext, &cfg.Matrix.JetStream)
js, nats := base.NATS.Prepare(base.ProcessContext, &cfg.Matrix.JetStream)
queues := queue.NewOutgoingQueues(
federationDB, base.ProcessContext,
@ -132,7 +132,7 @@ func NewInternalAPI(
)
rsConsumer := consumers.NewOutputRoomEventConsumer(
base.ProcessContext, cfg, js, queues,
base.ProcessContext, cfg, js, nats, queues,
federationDB, rsAPI,
)
if err = rsConsumer.Start(); err != nil {

View File

@ -177,6 +177,7 @@ type FederationRoomserverAPI interface {
QueryBulkStateContentAPI
// QueryServerBannedFromRoom returns whether a server is banned from a room by server ACLs.
QueryServerBannedFromRoom(ctx context.Context, req *QueryServerBannedFromRoomRequest, res *QueryServerBannedFromRoomResponse) error
QueryMembershipsForRoom(ctx context.Context, req *QueryMembershipsForRoomRequest, res *QueryMembershipsForRoomResponse) error
QueryRoomVersionForRoom(ctx context.Context, req *QueryRoomVersionForRoomRequest, res *QueryRoomVersionForRoomResponse) error
GetRoomIDForAlias(ctx context.Context, req *GetRoomIDForAliasRequest, res *GetRoomIDForAliasResponse) error
QueryEventsByID(ctx context.Context, req *QueryEventsByIDRequest, res *QueryEventsByIDResponse) error