dendrite/syncapi/consumers/roomserver.go
S7evinK 161f145176
Add NATS JetStream support (#1866)
* Add NATS JetStream support
Update shopify/sarama

* Fix addresses

* Don't change Addresses in Defaults

* Update saramajetstream

* Add missing error check

Keep typing events for at least one minute

* Use all configured NATS addresses

* Update saramajetstream

* Try setting up with NATS

* Make sure NATS uses own persistent directory (TODO: make this configurable)

* Update go.mod/go.sum

* Jetstream package

* Various other refactoring

* Build fixes

* Config tweaks, make random jetstream storage path for CI

* Disable interest policies

* Try to sane default on jetstream base path

* Try to use in-memory for CI

* Restore storage/retention

* Update nats.go dependency

* Adapt changes to config

* Remove unneeded TopicFor

* Dep update

* Revert "Remove unneeded TopicFor"

This reverts commit f5a4e4a339b6f94ec215778dca22204adaa893d1.

* Revert changes made to streams

* Fix build problems

* Update nats-server

* Update go.mod/go.sum

* Roomserver input API queuing using NATS

* Fix topic naming

* Prometheus metrics

* More refactoring to remove saramajetstream

* Add missing topic

* Don't try to populate map that doesn't exist

* Roomserver output topic

* Update go.mod/go.sum

* Message acknowledgements

* Ack tweaks

* Try to resume transaction re-sends

* Try to resume transaction re-sends

* Update to matrix-org/gomatrixserverlib@91dadfb

* Remove internal.PartitionStorer from components that don't consume keychanges

* Try to reduce re-allocations a bit in resolveConflictsV2

* Tweak delivery options on RS input

* Publish send-to-device messages into correct JetStream subject

* Async and sync roomserver input

* Update dendrite-config.yaml

* Remove roomserver tests for now (they need rewriting)

* Remove roomserver test again (was merged back in)

* Update documentation

* Docker updates

* More Docker updates

* Update Docker readme again

* Fix lint issues

* Send final event in `processEvent` synchronously (since this might stop Sytest from being so upset)

* Don't report event rejection errors via `/send`, since apparently this is upsetting tests that don't expect that

* Go 1.16 instead of Go 1.13 for upgrade tests and Complement

* Revert "Don't report event rejection errors via `/send`, since apparently this is upsetting tests that don't expect that"

This reverts commit 368675283fc44501f227639811bdb16dd5deef8c.

* Don't report any errors on `/send` to see what fun that creates

* Fix panics on closed channel sends

* Enforce state key matches sender

* Do the same for leave

* Various tweaks to make tests happier

Squashed commit of the following:

commit 13f9028e7a63662759ce7c55504a9d2423058668
Author: Neil Alexander <neilalexander@users.noreply.github.com>
Date:   Tue Jan 4 15:47:14 2022 +0000

    Do the same for leave

commit e6be7f05c349fafbdddfe818337a17a60c867be1
Author: Neil Alexander <neilalexander@users.noreply.github.com>
Date:   Tue Jan 4 15:33:42 2022 +0000

    Enforce state key matches sender

commit 85ede6d64bf10ce9b91cdd6d80f87350ee55242f
Author: Neil Alexander <neilalexander@users.noreply.github.com>
Date:   Tue Jan 4 14:07:04 2022 +0000

    Fix panics on closed channel sends

commit 9755494a98bed62450f8001d8128e40481d27e15
Author: Neil Alexander <neilalexander@users.noreply.github.com>
Date:   Tue Jan 4 13:38:22 2022 +0000

    Don't report any errors on `/send` to see what fun that creates

commit 3bb4f87b5dd56882febb4db5621db484c8789b7c
Author: Neil Alexander <neilalexander@users.noreply.github.com>
Date:   Tue Jan 4 13:00:26 2022 +0000

    Revert "Don't report event rejection errors via `/send`, since apparently this is upsetting tests that don't expect that"

    This reverts commit 368675283fc44501f227639811bdb16dd5deef8c.

commit fe2673ed7be9559eaca134424e403a4faca100b0
Author: Neil Alexander <neilalexander@users.noreply.github.com>
Date:   Tue Jan 4 12:09:34 2022 +0000

    Go 1.16 instead of Go 1.13 for upgrade tests and Complement

commit 368675283fc44501f227639811bdb16dd5deef8c
Author: Neil Alexander <neilalexander@users.noreply.github.com>
Date:   Tue Jan 4 11:51:45 2022 +0000

    Don't report event rejection errors via `/send`, since apparently this is upsetting tests that don't expect that

commit b028dfc08577bcf52e6cb498026e15fa5d46d07c
Author: Neil Alexander <neilalexander@users.noreply.github.com>
Date:   Tue Jan 4 10:29:08 2022 +0000

    Send final event in `processEvent` synchronously (since this might stop Sytest from being so upset)

* Merge in NATS Server v2.6.6 and nats.go v1.13 into the in-process connection fork

* Add `jetstream.WithJetStreamMessage` to make ack/nak-ing less messy, use process context in consumers

* Fix consumer component name in  federation API

* Add comment explaining where streams are defined

* Tweaks to roomserver input with comments

* Finish that sentence that I apparently forgot to finish in INSTALL.md

* Bump version number of config to 2

* Add comments around asynchronous sends to roomserver in processEventWithMissingState

* More useful error message when the config version does not match

* Set version in generate-config

* Fix version in config.Defaults

Co-authored-by: Neil Alexander <neilalexander@users.noreply.github.com>
2022-01-05 17:44:49 +00:00

388 lines
12 KiB
Go

// Copyright 2017 Vector Creations Ltd
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package consumers
import (
"context"
"encoding/json"
"fmt"
"github.com/getsentry/sentry-go"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/setup/jetstream"
"github.com/matrix-org/dendrite/setup/process"
"github.com/matrix-org/dendrite/syncapi/notifier"
"github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/gomatrixserverlib"
"github.com/nats-io/nats.go"
log "github.com/sirupsen/logrus"
)
// OutputRoomEventConsumer consumes events that originated in the room server.
type OutputRoomEventConsumer struct {
ctx context.Context
cfg *config.SyncAPI
rsAPI api.RoomserverInternalAPI
jetstream nats.JetStreamContext
topic string
db storage.Database
pduStream types.StreamProvider
inviteStream types.StreamProvider
notifier *notifier.Notifier
}
// NewOutputRoomEventConsumer creates a new OutputRoomEventConsumer. Call Start() to begin consuming from room servers.
func NewOutputRoomEventConsumer(
process *process.ProcessContext,
cfg *config.SyncAPI,
js nats.JetStreamContext,
store storage.Database,
notifier *notifier.Notifier,
pduStream types.StreamProvider,
inviteStream types.StreamProvider,
rsAPI api.RoomserverInternalAPI,
) *OutputRoomEventConsumer {
return &OutputRoomEventConsumer{
ctx: process.Context(),
cfg: cfg,
jetstream: js,
topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputRoomEvent),
db: store,
notifier: notifier,
pduStream: pduStream,
inviteStream: inviteStream,
rsAPI: rsAPI,
}
}
// Start consuming from room servers
func (s *OutputRoomEventConsumer) Start() error {
_, err := s.jetstream.Subscribe(s.topic, s.onMessage)
return err
}
// onMessage is called when the sync server receives a new event from the room server output log.
// It is not safe for this function to be called from multiple goroutines, or else the
// sync stream position may race and be incorrectly calculated.
func (s *OutputRoomEventConsumer) onMessage(msg *nats.Msg) {
jetstream.WithJetStreamMessage(msg, func(msg *nats.Msg) bool {
// Parse out the event JSON
var err error
var output api.OutputEvent
if err = json.Unmarshal(msg.Data, &output); err != nil {
// If the message was invalid, log it and move on to the next message in the stream
log.WithError(err).Errorf("roomserver output log: message parse failure")
return true
}
switch output.Type {
case api.OutputTypeNewRoomEvent:
// Ignore redaction events. We will add them to the database when they are
// validated (when we receive OutputTypeRedactedEvent)
event := output.NewRoomEvent.Event
if event.Type() == gomatrixserverlib.MRoomRedaction && event.StateKey() == nil {
// in the special case where the event redacts itself, just pass the message through because
// we will never see the other part of the pair
if event.Redacts() != event.EventID() {
return true
}
}
err = s.onNewRoomEvent(s.ctx, *output.NewRoomEvent)
case api.OutputTypeOldRoomEvent:
err = s.onOldRoomEvent(s.ctx, *output.OldRoomEvent)
case api.OutputTypeNewInviteEvent:
s.onNewInviteEvent(s.ctx, *output.NewInviteEvent)
case api.OutputTypeRetireInviteEvent:
s.onRetireInviteEvent(s.ctx, *output.RetireInviteEvent)
case api.OutputTypeNewPeek:
s.onNewPeek(s.ctx, *output.NewPeek)
case api.OutputTypeRetirePeek:
s.onRetirePeek(s.ctx, *output.RetirePeek)
case api.OutputTypeRedactedEvent:
err = s.onRedactEvent(s.ctx, *output.RedactedEvent)
default:
log.WithField("type", output.Type).Debug(
"roomserver output log: ignoring unknown output type",
)
}
if err != nil {
log.WithError(err).Error("roomserver output log: failed to process event")
return false
}
return true
})
}
func (s *OutputRoomEventConsumer) onRedactEvent(
ctx context.Context, msg api.OutputRedactedEvent,
) error {
err := s.db.RedactEvent(ctx, msg.RedactedEventID, msg.RedactedBecause)
if err != nil {
log.WithError(err).Error("RedactEvent error'd")
return err
}
// fake a room event so we notify clients about the redaction, as if it were
// a normal event.
return s.onNewRoomEvent(ctx, api.OutputNewRoomEvent{
Event: msg.RedactedBecause,
})
}
func (s *OutputRoomEventConsumer) onNewRoomEvent(
ctx context.Context, msg api.OutputNewRoomEvent,
) error {
ev := msg.Event
addsStateEvents := msg.AddsState()
ev, err := s.updateStateEvent(ev)
if err != nil {
sentry.CaptureException(err)
return err
}
for i := range addsStateEvents {
addsStateEvents[i], err = s.updateStateEvent(addsStateEvents[i])
if err != nil {
sentry.CaptureException(err)
return err
}
}
if msg.RewritesState {
if err = s.db.PurgeRoomState(ctx, ev.RoomID()); err != nil {
sentry.CaptureException(err)
return fmt.Errorf("s.db.PurgeRoom: %w", err)
}
}
pduPos, err := s.db.WriteEvent(
ctx,
ev,
addsStateEvents,
msg.AddsStateEventIDs,
msg.RemovesStateEventIDs,
msg.TransactionID,
false,
)
if err != nil {
// panic rather than continue with an inconsistent database
log.WithFields(log.Fields{
"event_id": ev.EventID(),
"event": string(ev.JSON()),
log.ErrorKey: err,
"add": msg.AddsStateEventIDs,
"del": msg.RemovesStateEventIDs,
}).Panicf("roomserver output log: write new event failure")
return nil
}
if pduPos, err = s.notifyJoinedPeeks(ctx, ev, pduPos); err != nil {
log.WithError(err).Errorf("Failed to notifyJoinedPeeks for PDU pos %d", pduPos)
sentry.CaptureException(err)
return err
}
s.pduStream.Advance(pduPos)
s.notifier.OnNewEvent(ev, ev.RoomID(), nil, types.StreamingToken{PDUPosition: pduPos})
return nil
}
func (s *OutputRoomEventConsumer) onOldRoomEvent(
ctx context.Context, msg api.OutputOldRoomEvent,
) error {
ev := msg.Event
// TODO: The state key check when excluding from sync is designed
// to stop us from lying to clients with old state, whilst still
// allowing normal timeline events through. This is an absolute
// hack but until we have some better strategy for dealing with
// old events in the sync API, this should at least prevent us
// from confusing clients into thinking they've joined/left rooms.
pduPos, err := s.db.WriteEvent(
ctx,
ev,
[]*gomatrixserverlib.HeaderedEvent{},
[]string{}, // adds no state
[]string{}, // removes no state
nil, // no transaction
ev.StateKey() != nil, // exclude from sync?
)
if err != nil {
// panic rather than continue with an inconsistent database
log.WithFields(log.Fields{
"event_id": ev.EventID(),
"event": string(ev.JSON()),
log.ErrorKey: err,
}).Panicf("roomserver output log: write old event failure")
return nil
}
if pduPos, err = s.notifyJoinedPeeks(ctx, ev, pduPos); err != nil {
log.WithError(err).Errorf("Failed to notifyJoinedPeeks for PDU pos %d", pduPos)
return err
}
s.pduStream.Advance(pduPos)
s.notifier.OnNewEvent(ev, ev.RoomID(), nil, types.StreamingToken{PDUPosition: pduPos})
return nil
}
func (s *OutputRoomEventConsumer) notifyJoinedPeeks(ctx context.Context, ev *gomatrixserverlib.HeaderedEvent, sp types.StreamPosition) (types.StreamPosition, error) {
if ev.Type() != gomatrixserverlib.MRoomMember {
return sp, nil
}
membership, err := ev.Membership()
if err != nil {
return sp, fmt.Errorf("ev.Membership: %w", err)
}
// TODO: check that it's a join and not a profile change (means unmarshalling prev_content)
if membership == gomatrixserverlib.Join {
// check it's a local join
_, domain, err := gomatrixserverlib.SplitID('@', *ev.StateKey())
if err != nil {
return sp, fmt.Errorf("gomatrixserverlib.SplitID: %w", err)
}
if domain != s.cfg.Matrix.ServerName {
return sp, nil
}
// cancel any peeks for it
peekSP, peekErr := s.db.DeletePeeks(ctx, ev.RoomID(), *ev.StateKey())
if peekErr != nil {
return sp, fmt.Errorf("s.db.DeletePeeks: %w", peekErr)
}
if peekSP > 0 {
sp = peekSP
}
}
return sp, nil
}
func (s *OutputRoomEventConsumer) onNewInviteEvent(
ctx context.Context, msg api.OutputNewInviteEvent,
) {
if msg.Event.StateKey() == nil {
log.WithFields(log.Fields{
"event": string(msg.Event.JSON()),
}).Panicf("roomserver output log: invite has no state key")
return
}
pduPos, err := s.db.AddInviteEvent(ctx, msg.Event)
if err != nil {
sentry.CaptureException(err)
// panic rather than continue with an inconsistent database
log.WithFields(log.Fields{
"event_id": msg.Event.EventID(),
"event": string(msg.Event.JSON()),
"pdupos": pduPos,
log.ErrorKey: err,
}).Panicf("roomserver output log: write invite failure")
return
}
s.inviteStream.Advance(pduPos)
s.notifier.OnNewInvite(types.StreamingToken{InvitePosition: pduPos}, *msg.Event.StateKey())
}
func (s *OutputRoomEventConsumer) onRetireInviteEvent(
ctx context.Context, msg api.OutputRetireInviteEvent,
) {
pduPos, err := s.db.RetireInviteEvent(ctx, msg.EventID)
if err != nil {
sentry.CaptureException(err)
// panic rather than continue with an inconsistent database
log.WithFields(log.Fields{
"event_id": msg.EventID,
log.ErrorKey: err,
}).Panicf("roomserver output log: remove invite failure")
return
}
// Notify any active sync requests that the invite has been retired.
s.inviteStream.Advance(pduPos)
s.notifier.OnNewInvite(types.StreamingToken{InvitePosition: pduPos}, msg.TargetUserID)
}
func (s *OutputRoomEventConsumer) onNewPeek(
ctx context.Context, msg api.OutputNewPeek,
) {
sp, err := s.db.AddPeek(ctx, msg.RoomID, msg.UserID, msg.DeviceID)
if err != nil {
sentry.CaptureException(err)
// panic rather than continue with an inconsistent database
log.WithFields(log.Fields{
log.ErrorKey: err,
}).Panicf("roomserver output log: write peek failure")
return
}
// tell the notifier about the new peek so it knows to wake up new devices
// TODO: This only works because the peeks table is reusing the same
// index as PDUs, but we should fix this
s.pduStream.Advance(sp)
s.notifier.OnNewPeek(msg.RoomID, msg.UserID, msg.DeviceID, types.StreamingToken{PDUPosition: sp})
}
func (s *OutputRoomEventConsumer) onRetirePeek(
ctx context.Context, msg api.OutputRetirePeek,
) {
sp, err := s.db.DeletePeek(ctx, msg.RoomID, msg.UserID, msg.DeviceID)
if err != nil {
// panic rather than continue with an inconsistent database
log.WithFields(log.Fields{
log.ErrorKey: err,
}).Panicf("roomserver output log: write peek failure")
return
}
// tell the notifier about the new peek so it knows to wake up new devices
// TODO: This only works because the peeks table is reusing the same
// index as PDUs, but we should fix this
s.pduStream.Advance(sp)
s.notifier.OnRetirePeek(msg.RoomID, msg.UserID, msg.DeviceID, types.StreamingToken{PDUPosition: sp})
}
func (s *OutputRoomEventConsumer) updateStateEvent(event *gomatrixserverlib.HeaderedEvent) (*gomatrixserverlib.HeaderedEvent, error) {
if event.StateKey() == nil {
return event, nil
}
stateKey := *event.StateKey()
prevEvent, err := s.db.GetStateEvent(
context.TODO(), event.RoomID(), event.Type(), stateKey,
)
if err != nil {
return event, err
}
if prevEvent == nil || prevEvent.EventID() == event.EventID() {
return event, nil
}
prev := types.PrevEventRef{
PrevContent: prevEvent.Content(),
ReplacesState: prevEvent.EventID(),
PrevSender: prevEvent.Sender(),
}
event.Event, err = event.SetUnsigned(prev)
return event, err
}