mirror of
https://github.com/1f349/dendrite.git
synced 2024-11-09 22:42:58 +00:00
Try to recover from corrupted NATS streams in memory temporarily (#2301)
This commit is contained in:
parent
5e780d3ca2
commit
e6d4bdeed5
@ -1,11 +1,13 @@
|
|||||||
package jetstream
|
package jetstream
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"fmt"
|
||||||
"reflect"
|
"reflect"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/getsentry/sentry-go"
|
||||||
"github.com/matrix-org/dendrite/setup/config"
|
"github.com/matrix-org/dendrite/setup/config"
|
||||||
"github.com/matrix-org/dendrite/setup/process"
|
"github.com/matrix-org/dendrite/setup/process"
|
||||||
"github.com/sirupsen/logrus"
|
"github.com/sirupsen/logrus"
|
||||||
@ -20,7 +22,7 @@ var natsServerMutex sync.Mutex
|
|||||||
func Prepare(process *process.ProcessContext, cfg *config.JetStream) (natsclient.JetStreamContext, *natsclient.Conn) {
|
func Prepare(process *process.ProcessContext, cfg *config.JetStream) (natsclient.JetStreamContext, *natsclient.Conn) {
|
||||||
// check if we need an in-process NATS Server
|
// check if we need an in-process NATS Server
|
||||||
if len(cfg.Addresses) != 0 {
|
if len(cfg.Addresses) != 0 {
|
||||||
return setupNATS(cfg, nil)
|
return setupNATS(process, cfg, nil)
|
||||||
}
|
}
|
||||||
natsServerMutex.Lock()
|
natsServerMutex.Lock()
|
||||||
if natsServer == nil {
|
if natsServer == nil {
|
||||||
@ -56,10 +58,10 @@ func Prepare(process *process.ProcessContext, cfg *config.JetStream) (natsclient
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
logrus.Fatalln("Failed to create NATS client")
|
logrus.Fatalln("Failed to create NATS client")
|
||||||
}
|
}
|
||||||
return setupNATS(cfg, nc)
|
return setupNATS(process, cfg, nc)
|
||||||
}
|
}
|
||||||
|
|
||||||
func setupNATS(cfg *config.JetStream, nc *natsclient.Conn) (natsclient.JetStreamContext, *natsclient.Conn) {
|
func setupNATS(process *process.ProcessContext, cfg *config.JetStream, nc *natsclient.Conn) (natsclient.JetStreamContext, *natsclient.Conn) {
|
||||||
if nc == nil {
|
if nc == nil {
|
||||||
var err error
|
var err error
|
||||||
nc, err = natsclient.Connect(strings.Join(cfg.Addresses, ","))
|
nc, err = natsclient.Connect(strings.Join(cfg.Addresses, ","))
|
||||||
@ -117,7 +119,40 @@ func setupNATS(cfg *config.JetStream, nc *natsclient.Conn) (natsclient.JetStream
|
|||||||
namespaced.Name = name
|
namespaced.Name = name
|
||||||
namespaced.Subjects = subjects
|
namespaced.Subjects = subjects
|
||||||
if _, err = s.AddStream(&namespaced); err != nil {
|
if _, err = s.AddStream(&namespaced); err != nil {
|
||||||
logrus.WithError(err).WithField("stream", name).WithField("subjects", subjects).Fatal("Unable to add stream")
|
logger := logrus.WithError(err).WithFields(logrus.Fields{
|
||||||
|
"stream": namespaced.Name,
|
||||||
|
"subjects": namespaced.Subjects,
|
||||||
|
})
|
||||||
|
|
||||||
|
// If the stream was supposed to be in-memory to begin with
|
||||||
|
// then an error here is fatal so we'll give up.
|
||||||
|
if namespaced.Storage == natsclient.MemoryStorage {
|
||||||
|
logger.WithError(err).Fatal("Unable to add in-memory stream")
|
||||||
|
}
|
||||||
|
|
||||||
|
// The stream was supposed to be on disk. Let's try starting
|
||||||
|
// Dendrite with the stream in-memory instead. That'll mean that
|
||||||
|
// we can't recover anything that was queued on the disk but we
|
||||||
|
// will still be able to start and run hopefully in the meantime.
|
||||||
|
logger.WithError(err).Error("Unable to add stream")
|
||||||
|
sentry.CaptureException(fmt.Errorf("Unable to add stream %q: %w", namespaced.Name, err))
|
||||||
|
|
||||||
|
namespaced.Storage = natsclient.MemoryStorage
|
||||||
|
if _, err = s.AddStream(&namespaced); err != nil {
|
||||||
|
// We tried to add the stream in-memory instead but something
|
||||||
|
// went wrong. That's an unrecoverable situation so we will
|
||||||
|
// give up at this point.
|
||||||
|
logger.WithError(err).Fatal("Unable to add in-memory stream")
|
||||||
|
}
|
||||||
|
|
||||||
|
if stream.Storage != namespaced.Storage {
|
||||||
|
// We've managed to add the stream in memory. What's on the
|
||||||
|
// disk will be left alone, but our ability to recover from a
|
||||||
|
// future crash will be limited. Yell about it.
|
||||||
|
sentry.CaptureException(fmt.Errorf("Stream %q is running in-memory; this may be due to data corruption in the JetStream storage directory, investigate as soon as possible", namespaced.Name))
|
||||||
|
logrus.Warn("Stream is running in-memory; this may be due to data corruption in the JetStream storage directory, investigate as soon as possible")
|
||||||
|
process.Degraded()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -2,13 +2,19 @@ package process
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"fmt"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
|
"github.com/getsentry/sentry-go"
|
||||||
|
"github.com/sirupsen/logrus"
|
||||||
|
"go.uber.org/atomic"
|
||||||
)
|
)
|
||||||
|
|
||||||
type ProcessContext struct {
|
type ProcessContext struct {
|
||||||
wg *sync.WaitGroup // used to wait for components to shutdown
|
wg *sync.WaitGroup // used to wait for components to shutdown
|
||||||
ctx context.Context // cancelled when Stop is called
|
ctx context.Context // cancelled when Stop is called
|
||||||
shutdown context.CancelFunc // shut down Dendrite
|
shutdown context.CancelFunc // shut down Dendrite
|
||||||
|
degraded atomic.Bool
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewProcessContext() *ProcessContext {
|
func NewProcessContext() *ProcessContext {
|
||||||
@ -43,3 +49,14 @@ func (b *ProcessContext) WaitForShutdown() <-chan struct{} {
|
|||||||
func (b *ProcessContext) WaitForComponentsToFinish() {
|
func (b *ProcessContext) WaitForComponentsToFinish() {
|
||||||
b.wg.Wait()
|
b.wg.Wait()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (b *ProcessContext) Degraded() {
|
||||||
|
if b.degraded.CAS(false, true) {
|
||||||
|
logrus.Warn("Dendrite is running in a degraded state")
|
||||||
|
sentry.CaptureException(fmt.Errorf("Process is running in a degraded state"))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *ProcessContext) IsDegraded() bool {
|
||||||
|
return b.degraded.Load()
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user