Use pull consumers (#2140)

* Pull consumers

* Pull consumers

* Only nuke consumers if they are push consumers

* Clean up old consumers

* Better error handling

* Update comments
This commit is contained in:
Neil Alexander 2022-02-02 13:32:48 +00:00 committed by GitHub
parent 2dee706f9e
commit c773b038bb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 521 additions and 459 deletions

View File

@ -34,7 +34,7 @@ import (
type OutputRoomEventConsumer struct {
ctx context.Context
jetstream nats.JetStreamContext
durable nats.SubOpt
durable string
topic string
asDB storage.Database
rsAPI api.RoomserverInternalAPI
@ -66,14 +66,15 @@ func NewOutputRoomEventConsumer(
// Start consuming from room servers
func (s *OutputRoomEventConsumer) Start() error {
_, err := s.jetstream.Subscribe(s.topic, s.onMessage, s.durable)
return err
return jetstream.JetStreamConsumer(
s.ctx, s.jetstream, s.topic, s.durable, s.onMessage,
nats.DeliverAll(), nats.ManualAck(),
)
}
// onMessage is called when the appservice component receives a new event from
// the room server output log.
func (s *OutputRoomEventConsumer) onMessage(msg *nats.Msg) {
jetstream.WithJetStreamMessage(msg, func(msg *nats.Msg) bool {
func (s *OutputRoomEventConsumer) onMessage(ctx context.Context, msg *nats.Msg) bool {
// Parse out the event JSON
var output api.OutputEvent
if err := json.Unmarshal(msg.Data, &output); err != nil {
@ -96,7 +97,6 @@ func (s *OutputRoomEventConsumer) onMessage(msg *nats.Msg) {
}
return true
})
}
// filterRoomserverEvents takes in events and decides whether any of them need

View File

@ -34,7 +34,7 @@ import (
type OutputEDUConsumer struct {
ctx context.Context
jetstream nats.JetStreamContext
durable nats.SubOpt
durable string
db storage.Database
queues *queue.OutgoingQueues
ServerName gomatrixserverlib.ServerName
@ -66,13 +66,22 @@ func NewOutputEDUConsumer(
// Start consuming from EDU servers
func (t *OutputEDUConsumer) Start() error {
if _, err := t.jetstream.Subscribe(t.typingTopic, t.onTypingEvent, t.durable); err != nil {
if err := jetstream.JetStreamConsumer(
t.ctx, t.jetstream, t.typingTopic, t.durable, t.onTypingEvent,
nats.DeliverAll(), nats.ManualAck(),
); err != nil {
return err
}
if _, err := t.jetstream.Subscribe(t.sendToDeviceTopic, t.onSendToDeviceEvent, t.durable); err != nil {
if err := jetstream.JetStreamConsumer(
t.ctx, t.jetstream, t.sendToDeviceTopic, t.durable, t.onSendToDeviceEvent,
nats.DeliverAll(), nats.ManualAck(),
); err != nil {
return err
}
if _, err := t.jetstream.Subscribe(t.receiptTopic, t.onReceiptEvent, t.durable); err != nil {
if err := jetstream.JetStreamConsumer(
t.ctx, t.jetstream, t.receiptTopic, t.durable, t.onReceiptEvent,
nats.DeliverAll(), nats.ManualAck(),
); err != nil {
return err
}
return nil
@ -80,9 +89,8 @@ func (t *OutputEDUConsumer) Start() error {
// onSendToDeviceEvent is called in response to a message received on the
// send-to-device events topic from the EDU server.
func (t *OutputEDUConsumer) onSendToDeviceEvent(msg *nats.Msg) {
func (t *OutputEDUConsumer) onSendToDeviceEvent(ctx context.Context, msg *nats.Msg) bool {
// Extract the send-to-device event from msg.
jetstream.WithJetStreamMessage(msg, func(msg *nats.Msg) bool {
var ote api.OutputSendToDeviceEvent
if err := json.Unmarshal(msg.Data, &ote); err != nil {
log.WithError(err).Errorf("eduserver output log: message parse failed (expected send-to-device)")
@ -133,13 +141,11 @@ func (t *OutputEDUConsumer) onSendToDeviceEvent(msg *nats.Msg) {
}
return true
})
}
// onTypingEvent is called in response to a message received on the typing
// events topic from the EDU server.
func (t *OutputEDUConsumer) onTypingEvent(msg *nats.Msg) {
jetstream.WithJetStreamMessage(msg, func(msg *nats.Msg) bool {
func (t *OutputEDUConsumer) onTypingEvent(ctx context.Context, msg *nats.Msg) bool {
// Extract the typing event from msg.
var ote api.OutputTypingEvent
if err := json.Unmarshal(msg.Data, &ote); err != nil {
@ -160,7 +166,7 @@ func (t *OutputEDUConsumer) onTypingEvent(msg *nats.Msg) {
return true
}
joined, err := t.db.GetJoinedHosts(t.ctx, ote.Event.RoomID)
joined, err := t.db.GetJoinedHosts(ctx, ote.Event.RoomID)
if err != nil {
log.WithError(err).WithField("room_id", ote.Event.RoomID).Error("failed to get joined hosts for room")
return false
@ -187,13 +193,11 @@ func (t *OutputEDUConsumer) onTypingEvent(msg *nats.Msg) {
}
return true
})
}
// onReceiptEvent is called in response to a message received on the receipt
// events topic from the EDU server.
func (t *OutputEDUConsumer) onReceiptEvent(msg *nats.Msg) {
jetstream.WithJetStreamMessage(msg, func(msg *nats.Msg) bool {
func (t *OutputEDUConsumer) onReceiptEvent(ctx context.Context, msg *nats.Msg) bool {
// Extract the typing event from msg.
var receipt api.OutputReceiptEvent
if err := json.Unmarshal(msg.Data, &receipt); err != nil {
@ -212,7 +216,7 @@ func (t *OutputEDUConsumer) onReceiptEvent(msg *nats.Msg) {
return true
}
joined, err := t.db.GetJoinedHosts(t.ctx, receipt.RoomID)
joined, err := t.db.GetJoinedHosts(ctx, receipt.RoomID)
if err != nil {
log.WithError(err).WithField("room_id", receipt.RoomID).Error("failed to get joined hosts for room")
return false
@ -250,5 +254,4 @@ func (t *OutputEDUConsumer) onReceiptEvent(msg *nats.Msg) {
}
return true
})
}

View File

@ -37,7 +37,7 @@ type OutputRoomEventConsumer struct {
cfg *config.FederationAPI
rsAPI api.RoomserverInternalAPI
jetstream nats.JetStreamContext
durable nats.SubOpt
durable string
db storage.Database
queues *queue.OutgoingQueues
topic string
@ -66,20 +66,17 @@ func NewOutputRoomEventConsumer(
// Start consuming from room servers
func (s *OutputRoomEventConsumer) Start() error {
_, err := s.jetstream.Subscribe(
s.topic, s.onMessage, s.durable,
nats.DeliverAll(),
nats.ManualAck(),
return jetstream.JetStreamConsumer(
s.ctx, s.jetstream, s.topic, s.durable, s.onMessage,
nats.DeliverAll(), nats.ManualAck(),
)
return err
}
// onMessage is called when the federation server receives a new event from the room server output log.
// It is unsafe to call this with messages for the same room in multiple gorountines
// because updates it will likely fail with a types.EventIDMismatchError when it
// realises that it cannot update the room state using the deltas.
func (s *OutputRoomEventConsumer) onMessage(msg *nats.Msg) {
jetstream.WithJetStreamMessage(msg, func(msg *nats.Msg) bool {
func (s *OutputRoomEventConsumer) onMessage(ctx context.Context, msg *nats.Msg) bool {
// Parse out the event JSON
var output api.OutputEvent
if err := json.Unmarshal(msg.Data, &output); err != nil {
@ -133,7 +130,6 @@ func (s *OutputRoomEventConsumer) onMessage(msg *nats.Msg) {
}
return true
})
}
// processInboundPeek starts tracking a new federated inbound peek (replacing the existing one if any)

View File

@ -41,7 +41,7 @@ type RoomserverInternalAPI struct {
fsAPI fsAPI.FederationInternalAPI
asAPI asAPI.AppServiceQueryAPI
JetStream nats.JetStreamContext
Durable nats.SubOpt
Durable string
InputRoomEventTopic string // JetStream topic for new input room events
OutputRoomEventTopic string // JetStream topic for new output room events
PerspectiveServerNames []gomatrixserverlib.ServerName
@ -87,7 +87,7 @@ func (r *RoomserverInternalAPI) SetFederationAPI(fsAPI fsAPI.FederationInternalA
InputRoomEventTopic: r.InputRoomEventTopic,
OutputRoomEventTopic: r.OutputRoomEventTopic,
JetStream: r.JetStream,
Durable: r.Durable,
Durable: nats.Durable(r.Durable),
ServerName: r.Cfg.Matrix.ServerName,
FSAPI: fsAPI,
KeyRing: keyRing,

View File

@ -2,8 +2,6 @@ package config
import (
"fmt"
"github.com/nats-io/nats.go"
)
type JetStream struct {
@ -25,8 +23,8 @@ func (c *JetStream) TopicFor(name string) string {
return fmt.Sprintf("%s%s", c.TopicPrefix, name)
}
func (c *JetStream) Durable(name string) nats.SubOpt {
return nats.Durable(c.TopicFor(name))
func (c *JetStream) Durable(name string) string {
return c.TopicFor(name)
}
func (c *JetStream) Defaults(generate bool) {

View File

@ -1,12 +1,81 @@
package jetstream
import "github.com/nats-io/nats.go"
import (
"context"
"fmt"
func WithJetStreamMessage(msg *nats.Msg, f func(msg *nats.Msg) bool) {
_ = msg.InProgress()
if f(msg) {
_ = msg.Ack()
} else {
_ = msg.Nak()
"github.com/nats-io/nats.go"
"github.com/sirupsen/logrus"
)
func JetStreamConsumer(
ctx context.Context, js nats.JetStreamContext, subj, durable string,
f func(ctx context.Context, msg *nats.Msg) bool,
opts ...nats.SubOpt,
) error {
defer func() {
// If there are existing consumers from before they were pull
// consumers, we need to clean up the old push consumers. However,
// in order to not affect the interest-based policies, we need to
// do this *after* creating the new pull consumers, which have
// "Pull" suffixed to their name.
if _, err := js.ConsumerInfo(subj, durable); err == nil {
if err := js.DeleteConsumer(subj, durable); err != nil {
logrus.WithContext(ctx).Warnf("Failed to clean up old consumer %q", durable)
}
}
}()
name := durable + "Pull"
sub, err := js.PullSubscribe(subj, name, opts...)
if err != nil {
return fmt.Errorf("nats.SubscribeSync: %w", err)
}
go func() {
for {
// The context behaviour here is surprising — we supply a context
// so that we can interrupt the fetch if we want, but NATS will still
// enforce its own deadline (roughly 5 seconds by default). Therefore
// it is our responsibility to check whether our context expired or
// not when a context error is returned. Footguns. Footguns everywhere.
msgs, err := sub.Fetch(1, nats.Context(ctx))
if err != nil {
if err == context.Canceled || err == context.DeadlineExceeded {
// Work out whether it was the JetStream context that expired
// or whether it was our supplied context.
select {
case <-ctx.Done():
// The supplied context expired, so we want to stop the
// consumer altogether.
return
default:
// The JetStream context expired, so the fetch probably
// just timed out and we should try again.
continue
}
} else {
// Something else went wrong, so we'll panic.
logrus.WithContext(ctx).WithField("subject", subj).Fatal(err)
}
}
if len(msgs) < 1 {
continue
}
msg := msgs[0]
if err = msg.InProgress(); err != nil {
logrus.WithContext(ctx).WithField("subject", subj).Warn(fmt.Errorf("msg.InProgress: %w", err))
continue
}
if f(ctx, msg) {
if err = msg.Ack(); err != nil {
logrus.WithContext(ctx).WithField("subject", subj).Warn(fmt.Errorf("msg.Ack: %w", err))
}
} else {
if err = msg.Nak(); err != nil {
logrus.WithContext(ctx).WithField("subject", subj).Warn(fmt.Errorf("msg.Nak: %w", err))
}
}
}
}()
return nil
}

View File

@ -34,7 +34,7 @@ import (
type OutputClientDataConsumer struct {
ctx context.Context
jetstream nats.JetStreamContext
durable nats.SubOpt
durable string
topic string
db storage.Database
stream types.StreamProvider
@ -63,15 +63,16 @@ func NewOutputClientDataConsumer(
// Start consuming from room servers
func (s *OutputClientDataConsumer) Start() error {
_, err := s.jetstream.Subscribe(s.topic, s.onMessage, s.durable)
return err
return jetstream.JetStreamConsumer(
s.ctx, s.jetstream, s.topic, s.durable, s.onMessage,
nats.DeliverAll(), nats.ManualAck(),
)
}
// onMessage is called when the sync server receives a new event from the client API server output log.
// It is not safe for this function to be called from multiple goroutines, or else the
// sync stream position may race and be incorrectly calculated.
func (s *OutputClientDataConsumer) onMessage(msg *nats.Msg) {
jetstream.WithJetStreamMessage(msg, func(msg *nats.Msg) bool {
func (s *OutputClientDataConsumer) onMessage(ctx context.Context, msg *nats.Msg) bool {
// Parse out the event JSON
userID := msg.Header.Get(jetstream.UserID)
var output eventutil.AccountData
@ -103,5 +104,4 @@ func (s *OutputClientDataConsumer) onMessage(msg *nats.Msg) {
s.notifier.OnNewAccountData(userID, types.StreamingToken{AccountDataPosition: streamPos})
return true
})
}

View File

@ -34,7 +34,7 @@ import (
type OutputReceiptEventConsumer struct {
ctx context.Context
jetstream nats.JetStreamContext
durable nats.SubOpt
durable string
topic string
db storage.Database
stream types.StreamProvider
@ -64,12 +64,13 @@ func NewOutputReceiptEventConsumer(
// Start consuming from EDU api
func (s *OutputReceiptEventConsumer) Start() error {
_, err := s.jetstream.Subscribe(s.topic, s.onMessage, s.durable)
return err
return jetstream.JetStreamConsumer(
s.ctx, s.jetstream, s.topic, s.durable, s.onMessage,
nats.DeliverAll(), nats.ManualAck(),
)
}
func (s *OutputReceiptEventConsumer) onMessage(msg *nats.Msg) {
jetstream.WithJetStreamMessage(msg, func(msg *nats.Msg) bool {
func (s *OutputReceiptEventConsumer) onMessage(ctx context.Context, msg *nats.Msg) bool {
var output api.OutputReceiptEvent
if err := json.Unmarshal(msg.Data, &output); err != nil {
// If the message was invalid, log it and move on to the next message in the stream
@ -95,5 +96,4 @@ func (s *OutputReceiptEventConsumer) onMessage(msg *nats.Msg) {
s.notifier.OnNewReceipt(output.RoomID, types.StreamingToken{ReceiptPosition: streamPos})
return true
})
}

View File

@ -36,7 +36,7 @@ import (
type OutputSendToDeviceEventConsumer struct {
ctx context.Context
jetstream nats.JetStreamContext
durable nats.SubOpt
durable string
topic string
db storage.Database
serverName gomatrixserverlib.ServerName // our server name
@ -68,12 +68,13 @@ func NewOutputSendToDeviceEventConsumer(
// Start consuming from EDU api
func (s *OutputSendToDeviceEventConsumer) Start() error {
_, err := s.jetstream.Subscribe(s.topic, s.onMessage, s.durable)
return err
return jetstream.JetStreamConsumer(
s.ctx, s.jetstream, s.topic, s.durable, s.onMessage,
nats.DeliverAll(), nats.ManualAck(),
)
}
func (s *OutputSendToDeviceEventConsumer) onMessage(msg *nats.Msg) {
jetstream.WithJetStreamMessage(msg, func(msg *nats.Msg) bool {
func (s *OutputSendToDeviceEventConsumer) onMessage(ctx context.Context, msg *nats.Msg) bool {
var output api.OutputSendToDeviceEvent
if err := json.Unmarshal(msg.Data, &output); err != nil {
// If the message was invalid, log it and move on to the next message in the stream
@ -115,5 +116,4 @@ func (s *OutputSendToDeviceEventConsumer) onMessage(msg *nats.Msg) {
)
return true
})
}

View File

@ -35,7 +35,7 @@ import (
type OutputTypingEventConsumer struct {
ctx context.Context
jetstream nats.JetStreamContext
durable nats.SubOpt
durable string
topic string
eduCache *cache.EDUCache
stream types.StreamProvider
@ -66,12 +66,13 @@ func NewOutputTypingEventConsumer(
// Start consuming from EDU api
func (s *OutputTypingEventConsumer) Start() error {
_, err := s.jetstream.Subscribe(s.topic, s.onMessage, s.durable)
return err
return jetstream.JetStreamConsumer(
s.ctx, s.jetstream, s.topic, s.durable, s.onMessage,
nats.DeliverAll(), nats.ManualAck(),
)
}
func (s *OutputTypingEventConsumer) onMessage(msg *nats.Msg) {
jetstream.WithJetStreamMessage(msg, func(msg *nats.Msg) bool {
func (s *OutputTypingEventConsumer) onMessage(ctx context.Context, msg *nats.Msg) bool {
var output api.OutputTypingEvent
if err := json.Unmarshal(msg.Data, &output); err != nil {
// If the message was invalid, log it and move on to the next message in the stream
@ -102,5 +103,4 @@ func (s *OutputTypingEventConsumer) onMessage(msg *nats.Msg) {
s.notifier.OnNewTyping(output.Event.RoomID, types.StreamingToken{TypingPosition: typingPos})
return true
})
}

View File

@ -38,7 +38,7 @@ type OutputRoomEventConsumer struct {
cfg *config.SyncAPI
rsAPI api.RoomserverInternalAPI
jetstream nats.JetStreamContext
durable nats.SubOpt
durable string
topic string
db storage.Database
pduStream types.StreamProvider
@ -73,19 +73,16 @@ func NewOutputRoomEventConsumer(
// Start consuming from room servers
func (s *OutputRoomEventConsumer) Start() error {
_, err := s.jetstream.Subscribe(
s.topic, s.onMessage, s.durable,
nats.DeliverAll(),
nats.ManualAck(),
return jetstream.JetStreamConsumer(
s.ctx, s.jetstream, s.topic, s.durable, s.onMessage,
nats.DeliverAll(), nats.ManualAck(),
)
return err
}
// onMessage is called when the sync server receives a new event from the room server output log.
// It is not safe for this function to be called from multiple goroutines, or else the
// sync stream position may race and be incorrectly calculated.
func (s *OutputRoomEventConsumer) onMessage(msg *nats.Msg) {
jetstream.WithJetStreamMessage(msg, func(msg *nats.Msg) bool {
func (s *OutputRoomEventConsumer) onMessage(ctx context.Context, msg *nats.Msg) bool {
// Parse out the event JSON
var err error
var output api.OutputEvent
@ -131,7 +128,6 @@ func (s *OutputRoomEventConsumer) onMessage(msg *nats.Msg) {
}
return true
})
}
func (s *OutputRoomEventConsumer) onRedactEvent(