mirror of
https://github.com/1f349/dendrite.git
synced 2024-11-09 22:42:58 +00:00
Set inactivity threshold on durable consumers in the roomserver input API (#2795)
This prevents us from holding onto durable consumers indefinitely for rooms that have long since turned inactive, since they do have a bit of a processing overhead in the NATS Server. If we clear up a consumer and then a room becomes active again, the consumer gets recreated as needed. The threshold is set to 24 hours for now, we can tweak it later if needs be.
This commit is contained in:
parent
eac5678449
commit
cd8f7e1251
@ -89,6 +89,13 @@ type Inputer struct {
|
|||||||
Queryer *query.Queryer
|
Queryer *query.Queryer
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// If a room consumer is inactive for a while then we will allow NATS
|
||||||
|
// to clean it up. This stops us from holding onto durable consumers
|
||||||
|
// indefinitely for rooms that might no longer be active, since they do
|
||||||
|
// have an interest overhead in the NATS Server. If the room becomes
|
||||||
|
// active again then we'll recreate the consumer anyway.
|
||||||
|
const inactiveThreshold = time.Hour * 24
|
||||||
|
|
||||||
type worker struct {
|
type worker struct {
|
||||||
phony.Inbox
|
phony.Inbox
|
||||||
sync.Mutex
|
sync.Mutex
|
||||||
@ -125,11 +132,12 @@ func (r *Inputer) startWorkerForRoom(roomID string) {
|
|||||||
if _, err := w.r.JetStream.AddConsumer(
|
if _, err := w.r.JetStream.AddConsumer(
|
||||||
r.Cfg.Matrix.JetStream.Prefixed(jetstream.InputRoomEvent),
|
r.Cfg.Matrix.JetStream.Prefixed(jetstream.InputRoomEvent),
|
||||||
&nats.ConsumerConfig{
|
&nats.ConsumerConfig{
|
||||||
Durable: consumer,
|
Durable: consumer,
|
||||||
AckPolicy: nats.AckAllPolicy,
|
AckPolicy: nats.AckAllPolicy,
|
||||||
DeliverPolicy: nats.DeliverAllPolicy,
|
DeliverPolicy: nats.DeliverAllPolicy,
|
||||||
FilterSubject: subject,
|
FilterSubject: subject,
|
||||||
AckWait: MaximumMissingProcessingTime + (time.Second * 10),
|
AckWait: MaximumMissingProcessingTime + (time.Second * 10),
|
||||||
|
InactiveThreshold: inactiveThreshold,
|
||||||
},
|
},
|
||||||
); err != nil {
|
); err != nil {
|
||||||
logrus.WithError(err).Errorf("Failed to create consumer for room %q", w.roomID)
|
logrus.WithError(err).Errorf("Failed to create consumer for room %q", w.roomID)
|
||||||
@ -145,6 +153,7 @@ func (r *Inputer) startWorkerForRoom(roomID string) {
|
|||||||
nats.DeliverAll(),
|
nats.DeliverAll(),
|
||||||
nats.AckWait(MaximumMissingProcessingTime+(time.Second*10)),
|
nats.AckWait(MaximumMissingProcessingTime+(time.Second*10)),
|
||||||
nats.Bind(r.InputRoomEventTopic, consumer),
|
nats.Bind(r.InputRoomEventTopic, consumer),
|
||||||
|
nats.InactiveThreshold(inactiveThreshold),
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logrus.WithError(err).Errorf("Failed to subscribe to stream for room %q", w.roomID)
|
logrus.WithError(err).Errorf("Failed to subscribe to stream for room %q", w.roomID)
|
||||||
@ -180,6 +189,21 @@ func (r *Inputer) Start() error {
|
|||||||
nats.ReplayInstant(),
|
nats.ReplayInstant(),
|
||||||
nats.BindStream(r.InputRoomEventTopic),
|
nats.BindStream(r.InputRoomEventTopic),
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// Make sure that the room consumers have the right config.
|
||||||
|
stream := r.Cfg.Matrix.JetStream.Prefixed(jetstream.InputRoomEvent)
|
||||||
|
for consumer := range r.JetStream.Consumers(stream) {
|
||||||
|
switch {
|
||||||
|
case consumer.Config.Durable == "":
|
||||||
|
continue // Ignore ephemeral consumers
|
||||||
|
case consumer.Config.InactiveThreshold != inactiveThreshold:
|
||||||
|
consumer.Config.InactiveThreshold = inactiveThreshold
|
||||||
|
if _, cerr := r.JetStream.UpdateConsumer(stream, &consumer.Config); cerr != nil {
|
||||||
|
logrus.WithError(cerr).Warnf("Failed to update inactive threshold on consumer %q", consumer.Name)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user