mirror of
https://github.com/1f349/dendrite.git
synced 2024-11-22 03:31:41 +00:00
Add possibilty to configure MaxMessageBytes for sarama (#1563)
* Add configuration for max_message_bytes for sarama * Log all errors when sending multiple messages Signed-off-by: Till Faelligen <tfaelligen@gmail.com> * Add missing config * - Better comments on what MaxMessageBytes is used for - Also sets the size the consumer may use
This commit is contained in:
parent
c5888bb64c
commit
d5675feb96
@ -76,6 +76,12 @@ global:
|
|||||||
# Kafka.
|
# Kafka.
|
||||||
use_naffka: true
|
use_naffka: true
|
||||||
|
|
||||||
|
# The max size a Kafka message is allowed to use.
|
||||||
|
# You only need to change this value, if you encounter issues with too large messages.
|
||||||
|
# Must be less than/equal to "max.message.bytes" configured in Kafka.
|
||||||
|
# Defaults to 8388608 bytes.
|
||||||
|
# max_message_bytes: 8388608
|
||||||
|
|
||||||
# Naffka database options. Not required when using Kafka.
|
# Naffka database options. Not required when using Kafka.
|
||||||
naffka_database:
|
naffka_database:
|
||||||
connection_string: file:naffka.db
|
connection_string: file:naffka.db
|
||||||
|
@ -24,6 +24,9 @@ type Kafka struct {
|
|||||||
UseNaffka bool `yaml:"use_naffka"`
|
UseNaffka bool `yaml:"use_naffka"`
|
||||||
// The Naffka database is used internally by the naffka library, if used.
|
// The Naffka database is used internally by the naffka library, if used.
|
||||||
Database DatabaseOptions `yaml:"naffka_database"`
|
Database DatabaseOptions `yaml:"naffka_database"`
|
||||||
|
// The max size a Kafka message passed between consumer/producer can have
|
||||||
|
// Equals roughly max.message.bytes / fetch.message.max.bytes in Kafka
|
||||||
|
MaxMessageBytes *int `yaml:"max_message_bytes"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func (k *Kafka) TopicFor(name string) string {
|
func (k *Kafka) TopicFor(name string) string {
|
||||||
@ -36,6 +39,9 @@ func (c *Kafka) Defaults() {
|
|||||||
c.Addresses = []string{"localhost:2181"}
|
c.Addresses = []string{"localhost:2181"}
|
||||||
c.Database.ConnectionString = DataSource("file:naffka.db")
|
c.Database.ConnectionString = DataSource("file:naffka.db")
|
||||||
c.TopicPrefix = "Dendrite"
|
c.TopicPrefix = "Dendrite"
|
||||||
|
|
||||||
|
maxBytes := 1024 * 1024 * 8 // about 8MB
|
||||||
|
c.MaxMessageBytes = &maxBytes
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Kafka) Verify(configErrs *ConfigErrors, isMonolith bool) {
|
func (c *Kafka) Verify(configErrs *ConfigErrors, isMonolith bool) {
|
||||||
@ -50,4 +56,5 @@ func (c *Kafka) Verify(configErrs *ConfigErrors, isMonolith bool) {
|
|||||||
checkNotZero(configErrs, "global.kafka.addresses", int64(len(c.Addresses)))
|
checkNotZero(configErrs, "global.kafka.addresses", int64(len(c.Addresses)))
|
||||||
}
|
}
|
||||||
checkNotEmpty(configErrs, "global.kafka.topic_prefix", string(c.TopicPrefix))
|
checkNotEmpty(configErrs, "global.kafka.topic_prefix", string(c.TopicPrefix))
|
||||||
|
checkPositive(configErrs, "global.kafka.max_message_bytes", int64(*c.MaxMessageBytes))
|
||||||
}
|
}
|
||||||
|
@ -17,12 +17,17 @@ func SetupConsumerProducer(cfg *config.Kafka) (sarama.Consumer, sarama.SyncProdu
|
|||||||
|
|
||||||
// setupKafka creates kafka consumer/producer pair from the config.
|
// setupKafka creates kafka consumer/producer pair from the config.
|
||||||
func setupKafka(cfg *config.Kafka) (sarama.Consumer, sarama.SyncProducer) {
|
func setupKafka(cfg *config.Kafka) (sarama.Consumer, sarama.SyncProducer) {
|
||||||
consumer, err := sarama.NewConsumer(cfg.Addresses, nil)
|
sCfg := sarama.NewConfig()
|
||||||
|
sCfg.Producer.MaxMessageBytes = *cfg.MaxMessageBytes
|
||||||
|
sCfg.Producer.Return.Successes = true
|
||||||
|
sCfg.Consumer.Fetch.Default = int32(*cfg.MaxMessageBytes)
|
||||||
|
|
||||||
|
consumer, err := sarama.NewConsumer(cfg.Addresses, sCfg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logrus.WithError(err).Panic("failed to start kafka consumer")
|
logrus.WithError(err).Panic("failed to start kafka consumer")
|
||||||
}
|
}
|
||||||
|
|
||||||
producer, err := sarama.NewSyncProducer(cfg.Addresses, nil)
|
producer, err := sarama.NewSyncProducer(cfg.Addresses, sCfg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logrus.WithError(err).Panic("failed to setup kafka producers")
|
logrus.WithError(err).Panic("failed to setup kafka producers")
|
||||||
}
|
}
|
||||||
|
@ -102,7 +102,13 @@ func (r *Inputer) WriteOutputEvents(roomID string, updates []api.OutputEvent) er
|
|||||||
Value: sarama.ByteEncoder(value),
|
Value: sarama.ByteEncoder(value),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return r.Producer.SendMessages(messages)
|
errs := r.Producer.SendMessages(messages)
|
||||||
|
if errs != nil {
|
||||||
|
for _, err := range errs.(sarama.ProducerErrors) {
|
||||||
|
log.WithError(err).WithField("message_bytes", err.Msg.Value.Length()).Error("Write to kafka failed")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return errs
|
||||||
}
|
}
|
||||||
|
|
||||||
// InputRoomEvents implements api.RoomserverInternalAPI
|
// InputRoomEvents implements api.RoomserverInternalAPI
|
||||||
|
Loading…
Reference in New Issue
Block a user