Handle m.room.tombstone events in the UserAPI (#2864)

Fixes #2863 and makes 
```
/upgrade preserves direct room state
local user has tags copied to the new room
remote user has tags copied to the new room
```
pass.
This commit is contained in:
Till 2022-11-07 09:47:18 +01:00 committed by GitHub
parent a7b74176e3
commit c125203eb6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 120 additions and 2 deletions

View File

@ -759,3 +759,6 @@ Can filter rooms/{roomId}/members
Current state appears in timeline in private history with many messages after
AS can publish rooms in their own list
AS and main public room lists are separate
/upgrade preserves direct room state
local user has tags copied to the new room
remote user has tags copied to the new room

View File

@ -2,12 +2,16 @@ package consumers
import (
"context"
"database/sql"
"encoding/json"
"errors"
"fmt"
"strings"
"sync"
"time"
"github.com/tidwall/gjson"
"github.com/matrix-org/gomatrixserverlib"
"github.com/nats-io/nats.go"
log "github.com/sirupsen/logrus"
@ -185,13 +189,115 @@ func (s *OutputRoomEventConsumer) storeMessageStats(ctx context.Context, eventTy
}
}
func (s *OutputRoomEventConsumer) handleRoomUpgrade(ctx context.Context, oldRoomID, newRoomID string, localMembers []*localMembership, roomSize int) error {
for _, membership := range localMembers {
// Copy any existing push rules from old -> new room
if err := s.copyPushrules(ctx, oldRoomID, newRoomID, membership.Localpart); err != nil {
return err
}
// preserve m.direct room state
if err := s.updateMDirect(ctx, oldRoomID, newRoomID, membership.Localpart, roomSize); err != nil {
return err
}
// copy existing m.tag entries, if any
if err := s.copyTags(ctx, oldRoomID, newRoomID, membership.Localpart); err != nil {
return err
}
}
return nil
}
func (s *OutputRoomEventConsumer) copyPushrules(ctx context.Context, oldRoomID, newRoomID string, localpart string) error {
pushRules, err := s.db.QueryPushRules(ctx, localpart)
if err != nil {
return fmt.Errorf("failed to query pushrules for user: %w", err)
}
if pushRules == nil {
return nil
}
for _, roomRule := range pushRules.Global.Room {
if roomRule.RuleID != oldRoomID {
continue
}
cpRool := *roomRule
cpRool.RuleID = newRoomID
pushRules.Global.Room = append(pushRules.Global.Room, &cpRool)
rules, err := json.Marshal(pushRules)
if err != nil {
return err
}
if err = s.db.SaveAccountData(ctx, localpart, "", "m.push_rules", rules); err != nil {
return fmt.Errorf("failed to update pushrules: %w", err)
}
}
return nil
}
// updateMDirect copies the "is_direct" flag from oldRoomID to newROomID
func (s *OutputRoomEventConsumer) updateMDirect(ctx context.Context, oldRoomID, newRoomID, localpart string, roomSize int) error {
// this is most likely not a DM, so skip updating m.direct state
if roomSize > 2 {
return nil
}
// Get direct message state
directChatsRaw, err := s.db.GetAccountDataByType(ctx, localpart, "", "m.direct")
if err != nil {
return fmt.Errorf("failed to get m.direct from database: %w", err)
}
directChats := gjson.ParseBytes(directChatsRaw)
newDirectChats := make(map[string][]string)
// iterate over all userID -> roomIDs
directChats.ForEach(func(userID, roomIDs gjson.Result) bool {
var found bool
for _, roomID := range roomIDs.Array() {
newDirectChats[userID.Str] = append(newDirectChats[userID.Str], roomID.Str)
// add the new roomID to m.direct
if roomID.Str == oldRoomID {
found = true
newDirectChats[userID.Str] = append(newDirectChats[userID.Str], newRoomID)
}
}
// Only hit the database if we found the old room as a DM for this user
if found {
var data []byte
data, err = json.Marshal(newDirectChats)
if err != nil {
return true
}
if err = s.db.SaveAccountData(ctx, localpart, "", "m.direct", data); err != nil {
return true
}
}
return true
})
if err != nil {
return fmt.Errorf("failed to update m.direct state")
}
return nil
}
func (s *OutputRoomEventConsumer) copyTags(ctx context.Context, oldRoomID, newRoomID, localpart string) error {
tag, err := s.db.GetAccountDataByType(ctx, localpart, oldRoomID, "m.tag")
if err != nil && !errors.Is(err, sql.ErrNoRows) {
return err
}
if tag == nil {
return nil
}
return s.db.SaveAccountData(ctx, localpart, newRoomID, "m.tag", tag)
}
func (s *OutputRoomEventConsumer) processMessage(ctx context.Context, event *gomatrixserverlib.HeaderedEvent, streamPos uint64) error {
members, roomSize, err := s.localRoomMembers(ctx, event.RoomID())
if err != nil {
return fmt.Errorf("s.localRoomMembers: %w", err)
}
if event.Type() == gomatrixserverlib.MRoomMember {
switch {
case event.Type() == gomatrixserverlib.MRoomMember:
cevent := gomatrixserverlib.HeaderedToClientEvent(event, gomatrixserverlib.FormatAll)
var member *localMembership
member, err = newLocalMembership(&cevent)
@ -203,6 +309,15 @@ func (s *OutputRoomEventConsumer) processMessage(ctx context.Context, event *gom
// should also be pushed to the target user.
members = append(members, member)
}
case event.Type() == "m.room.tombstone" && event.StateKeyEquals(""):
// Handle room upgrades
oldRoomID := event.RoomID()
newRoomID := gjson.GetBytes(event.Content(), "replacement_room").Str
if err = s.handleRoomUpgrade(ctx, oldRoomID, newRoomID, members, roomSize); err != nil {
// while inconvenient, this shouldn't stop us from sending push notifications
log.WithError(err).Errorf("UserAPI: failed to handle room upgrade for users")
}
}
// TODO: run in parallel with localRoomMembers.