Optionally use naffka in the monolithic server (#183)

* dependency injection for the kafka consumers/producers

* Optionally use naffka in the monolithic server

* remember to call setupKafka()

* tweak imports

* fix integration tests

* Add use_naffka to the example config

* Update comment on the listen APIs
This commit is contained in:
Mark Haines 2017-08-16 13:36:41 +01:00 committed by GitHub
parent 0d894e3da5
commit c27d1fdfb4
14 changed files with 195 additions and 131 deletions

View File

@ -52,9 +52,13 @@ media:
kafka: kafka:
# Where the kafka servers are running. # Where the kafka servers are running.
addresses: ["localhost:9092"] addresses: ["localhost:9092"]
# Whether to use naffka instead of kafka.
# Naffka can only be used when running dendrite as a single monolithic server.
# Kafka can be used both with a monolithic server and when running the
# components as separate servers.
use_naffka: false
# The names of the kafka topics to use. # The names of the kafka topics to use.
topics: topics:
input_room_event: roomserverInput
output_room_event: roomserverOutput output_room_event: roomserverOutput
output_client_data: clientapiOutput output_client_data: clientapiOutput
user_updates: userUpdates user_updates: userUpdates
@ -71,6 +75,7 @@ database:
# The TCP host:port pairs to bind the internal HTTP APIs to. # The TCP host:port pairs to bind the internal HTTP APIs to.
# These shouldn't be exposed to the public internet. # These shouldn't be exposed to the public internet.
# These aren't needed when running dendrite as a monolithic server.
listen: listen:
room_server: "localhost:7770" room_server: "localhost:7770"
client_api: "localhost:7771" client_api: "localhost:7771"

View File

@ -17,12 +17,13 @@ package consumers
import ( import (
"encoding/json" "encoding/json"
log "github.com/Sirupsen/logrus"
"github.com/matrix-org/dendrite/clientapi/auth/storage/accounts" "github.com/matrix-org/dendrite/clientapi/auth/storage/accounts"
"github.com/matrix-org/dendrite/common" "github.com/matrix-org/dendrite/common"
"github.com/matrix-org/dendrite/common/config" "github.com/matrix-org/dendrite/common/config"
"github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
log "github.com/Sirupsen/logrus"
sarama "gopkg.in/Shopify/sarama.v1" sarama "gopkg.in/Shopify/sarama.v1"
) )
@ -35,12 +36,12 @@ type OutputRoomEvent struct {
} }
// NewOutputRoomEvent creates a new OutputRoomEvent consumer. Call Start() to begin consuming from room servers. // NewOutputRoomEvent creates a new OutputRoomEvent consumer. Call Start() to begin consuming from room servers.
func NewOutputRoomEvent(cfg *config.Dendrite, store *accounts.Database) (*OutputRoomEvent, error) { func NewOutputRoomEvent(
kafkaConsumer, err := sarama.NewConsumer(cfg.Kafka.Addresses, nil) cfg *config.Dendrite,
if err != nil { kafkaConsumer sarama.Consumer,
return nil, err store *accounts.Database,
} queryAPI api.RoomserverQueryAPI,
roomServerURL := cfg.RoomServerURL() ) *OutputRoomEvent {
consumer := common.ContinualConsumer{ consumer := common.ContinualConsumer{
Topic: string(cfg.Kafka.Topics.OutputRoomEvent), Topic: string(cfg.Kafka.Topics.OutputRoomEvent),
@ -50,12 +51,12 @@ func NewOutputRoomEvent(cfg *config.Dendrite, store *accounts.Database) (*Output
s := &OutputRoomEvent{ s := &OutputRoomEvent{
roomServerConsumer: &consumer, roomServerConsumer: &consumer,
db: store, db: store,
query: api.NewRoomserverQueryAPIHTTP(roomServerURL, nil), query: queryAPI,
serverName: string(cfg.Matrix.ServerName), serverName: string(cfg.Matrix.ServerName),
} }
consumer.ProcessMessage = s.onMessage consumer.ProcessMessage = s.onMessage
return s, nil return s
} }
// Start consuming from room servers // Start consuming from room servers

View File

@ -28,18 +28,6 @@ type SyncAPIProducer struct {
Producer sarama.SyncProducer Producer sarama.SyncProducer
} }
// NewSyncAPIProducer creates a new SyncAPIProducer
func NewSyncAPIProducer(kafkaURIs []string, topic string) (*SyncAPIProducer, error) {
producer, err := sarama.NewSyncProducer(kafkaURIs, nil)
if err != nil {
return nil, err
}
return &SyncAPIProducer{
Topic: topic,
Producer: producer,
}, nil
}
// SendData sends account data to the sync API server // SendData sends account data to the sync API server
func (p *SyncAPIProducer) SendData(userID string, roomID string, dataType string) error { func (p *SyncAPIProducer) SendData(userID string, roomID string, dataType string) error {
var m sarama.ProducerMessage var m sarama.ProducerMessage

View File

@ -34,18 +34,6 @@ type profileUpdate struct {
NewValue string `json:"new_value"` // The attribute's value after the update NewValue string `json:"new_value"` // The attribute's value after the update
} }
// NewUserUpdateProducer creates a new UserUpdateProducer
func NewUserUpdateProducer(kafkaURIs []string, topic string) (*UserUpdateProducer, error) {
producer, err := sarama.NewSyncProducer(kafkaURIs, nil)
if err != nil {
return nil, err
}
return &UserUpdateProducer{
Topic: topic,
Producer: producer,
}, nil
}
// SendUpdate sends an update using kafka to notify the roomserver of the // SendUpdate sends an update using kafka to notify the roomserver of the
// profile update. Returns an error if the update failed to send. // profile update. Returns an error if the update failed to send.
func (p *UserUpdateProducer) SendUpdate( func (p *UserUpdateProducer) SendUpdate(

View File

@ -33,6 +33,7 @@ import (
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
log "github.com/Sirupsen/logrus" log "github.com/Sirupsen/logrus"
sarama "gopkg.in/Shopify/sarama.v1"
) )
var ( var (
@ -50,24 +51,28 @@ func main() {
log.Fatalf("Invalid config file: %s", err) log.Fatalf("Invalid config file: %s", err)
} }
log.Info("config: ", cfg)
queryAPI := api.NewRoomserverQueryAPIHTTP(cfg.RoomServerURL(), nil) queryAPI := api.NewRoomserverQueryAPIHTTP(cfg.RoomServerURL(), nil)
aliasAPI := api.NewRoomserverAliasAPIHTTP(cfg.RoomServerURL(), nil) aliasAPI := api.NewRoomserverAliasAPIHTTP(cfg.RoomServerURL(), nil)
inputAPI := api.NewRoomserverInputAPIHTTP(cfg.RoomServerURL(), nil) inputAPI := api.NewRoomserverInputAPIHTTP(cfg.RoomServerURL(), nil)
roomserverProducer := producers.NewRoomserverProducer(inputAPI) roomserverProducer := producers.NewRoomserverProducer(inputAPI)
userUpdateProducer, err := producers.NewUserUpdateProducer(
cfg.Kafka.Addresses, string(cfg.Kafka.Topics.UserUpdates), kafkaProducer, err := sarama.NewSyncProducer(cfg.Kafka.Addresses, nil)
)
if err != nil { if err != nil {
log.Panicf("Failed to setup kafka producers(%q): %s", cfg.Kafka.Addresses, err) log.WithFields(log.Fields{
log.ErrorKey: err,
"addresses": cfg.Kafka.Addresses,
}).Panic("Failed to setup kafka producers")
} }
syncProducer, err := producers.NewSyncAPIProducer(
cfg.Kafka.Addresses, string(cfg.Kafka.Topics.OutputClientData), userUpdateProducer := &producers.UserUpdateProducer{
) Producer: kafkaProducer,
if err != nil { Topic: string(cfg.Kafka.Topics.UserUpdates),
log.Panicf("Failed to setup kafka producers(%q): %s", cfg.Kafka.Addresses, err) }
syncProducer := &producers.SyncAPIProducer{
Producer: kafkaProducer,
Topic: string(cfg.Kafka.Topics.OutputClientData),
} }
federation := gomatrixserverlib.NewFederationClient( federation := gomatrixserverlib.NewFederationClient(
@ -90,15 +95,20 @@ func main() {
keyRing := gomatrixserverlib.KeyRing{ keyRing := gomatrixserverlib.KeyRing{
KeyFetchers: []gomatrixserverlib.KeyFetcher{ KeyFetchers: []gomatrixserverlib.KeyFetcher{
// TODO: Use perspective key fetchers for production. // TODO: Use perspective key fetchers for production.
&gomatrixserverlib.DirectKeyFetcher{federation.Client}, &gomatrixserverlib.DirectKeyFetcher{Client: federation.Client},
}, },
KeyDatabase: keyDB, KeyDatabase: keyDB,
} }
consumer, err := consumers.NewOutputRoomEvent(cfg, accountDB) kafkaConsumer, err := sarama.NewConsumer(cfg.Kafka.Addresses, nil)
if err != nil { if err != nil {
log.Panicf("startup: failed to create room server consumer: %s", err) log.WithFields(log.Fields{
log.ErrorKey: err,
"addresses": cfg.Kafka.Addresses,
}).Panic("Failed to setup kafka consumers")
} }
consumer := consumers.NewOutputRoomEvent(cfg, kafkaConsumer, accountDB, queryAPI)
if err = consumer.Start(); err != nil { if err = consumer.Start(); err != nil {
log.Panicf("startup: failed to start room server consumer") log.Panicf("startup: failed to start room server consumer")
} }

View File

@ -25,9 +25,11 @@ import (
"github.com/matrix-org/dendrite/federationsender/consumers" "github.com/matrix-org/dendrite/federationsender/consumers"
"github.com/matrix-org/dendrite/federationsender/queue" "github.com/matrix-org/dendrite/federationsender/queue"
"github.com/matrix-org/dendrite/federationsender/storage" "github.com/matrix-org/dendrite/federationsender/storage"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
log "github.com/Sirupsen/logrus" log "github.com/Sirupsen/logrus"
sarama "gopkg.in/Shopify/sarama.v1"
) )
var configPath = flag.String("config", "dendrite.yaml", "The path to the config file. For more information, see the config file in this repository.") var configPath = flag.String("config", "dendrite.yaml", "The path to the config file. For more information, see the config file in this repository.")
@ -45,7 +47,15 @@ func main() {
log.Fatalf("Invalid config file: %s", err) log.Fatalf("Invalid config file: %s", err)
} }
log.Info("config: ", cfg) kafkaConsumer, err := sarama.NewConsumer(cfg.Kafka.Addresses, nil)
if err != nil {
log.WithFields(log.Fields{
log.ErrorKey: err,
"addresses": cfg.Kafka.Addresses,
}).Panic("Failed to setup kafka consumers")
}
queryAPI := api.NewRoomserverQueryAPIHTTP(cfg.RoomServerURL(), nil)
db, err := storage.NewDatabase(string(cfg.Database.FederationSender)) db, err := storage.NewDatabase(string(cfg.Database.FederationSender))
if err != nil { if err != nil {
@ -58,10 +68,7 @@ func main() {
queues := queue.NewOutgoingQueues(cfg.Matrix.ServerName, federation) queues := queue.NewOutgoingQueues(cfg.Matrix.ServerName, federation)
consumer, err := consumers.NewOutputRoomEvent(cfg, queues, db) consumer := consumers.NewOutputRoomEvent(cfg, kafkaConsumer, queues, db, queryAPI)
if err != nil {
log.WithError(err).Panicf("startup: failed to create room server consumer")
}
if err = consumer.Start(); err != nil { if err = consumer.Start(); err != nil {
log.WithError(err).Panicf("startup: failed to start room server consumer") log.WithError(err).Panicf("startup: failed to start room server consumer")
} }

View File

@ -27,6 +27,7 @@ import (
"github.com/matrix-org/dendrite/common/config" "github.com/matrix-org/dendrite/common/config"
"github.com/matrix-org/dendrite/common/keydb" "github.com/matrix-org/dendrite/common/keydb"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/naffka"
mediaapi_routing "github.com/matrix-org/dendrite/mediaapi/routing" mediaapi_routing "github.com/matrix-org/dendrite/mediaapi/routing"
mediaapi_storage "github.com/matrix-org/dendrite/mediaapi/storage" mediaapi_storage "github.com/matrix-org/dendrite/mediaapi/storage"
@ -72,7 +73,7 @@ func main() {
if *configPath == "" { if *configPath == "" {
log.Fatal("--config must be supplied") log.Fatal("--config must be supplied")
} }
cfg, err := config.Load(*configPath) cfg, err := config.LoadMonolithic(*configPath)
if err != nil { if err != nil {
log.Fatalf("Invalid config file: %s", err) log.Fatalf("Invalid config file: %s", err)
} }
@ -80,6 +81,7 @@ func main() {
m := newMonolith(cfg) m := newMonolith(cfg)
m.setupDatabases() m.setupDatabases()
m.setupFederation() m.setupFederation()
m.setupKafka()
m.setupRoomServer() m.setupRoomServer()
m.setupProducers() m.setupProducers()
m.setupNotifiers() m.setupNotifiers()
@ -125,6 +127,9 @@ type monolith struct {
queryAPI *roomserver_query.RoomserverQueryAPI queryAPI *roomserver_query.RoomserverQueryAPI
aliasAPI *roomserver_alias.RoomserverAliasAPI aliasAPI *roomserver_alias.RoomserverAliasAPI
kafkaConsumer sarama.Consumer
kafkaProducer sarama.SyncProducer
roomServerProducer *producers.RoomserverProducer roomServerProducer *producers.RoomserverProducer
userUpdateProducer *producers.UserUpdateProducer userUpdateProducer *producers.UserUpdateProducer
syncProducer *producers.SyncAPIProducer syncProducer *producers.SyncAPIProducer
@ -182,15 +187,39 @@ func (m *monolith) setupFederation() {
} }
} }
func (m *monolith) setupRoomServer() { func (m *monolith) setupKafka() {
kafkaProducer, err := sarama.NewSyncProducer(m.cfg.Kafka.Addresses, nil) var err error
if err != nil { if m.cfg.Kafka.UseNaffka {
panic(err) naff, err := naffka.New(&naffka.MemoryDatabase{})
if err != nil {
log.WithFields(log.Fields{
log.ErrorKey: err,
}).Panic("Failed to setup naffka")
}
m.kafkaConsumer = naff
m.kafkaProducer = naff
} else {
m.kafkaConsumer, err = sarama.NewConsumer(m.cfg.Kafka.Addresses, nil)
if err != nil {
log.WithFields(log.Fields{
log.ErrorKey: err,
"addresses": m.cfg.Kafka.Addresses,
}).Panic("Failed to setup kafka consumers")
}
m.kafkaProducer, err = sarama.NewSyncProducer(m.cfg.Kafka.Addresses, nil)
if err != nil {
log.WithFields(log.Fields{
log.ErrorKey: err,
"addresses": m.cfg.Kafka.Addresses,
}).Panic("Failed to setup kafka producers")
}
} }
}
func (m *monolith) setupRoomServer() {
m.inputAPI = &roomserver_input.RoomserverInputAPI{ m.inputAPI = &roomserver_input.RoomserverInputAPI{
DB: m.roomServerDB, DB: m.roomServerDB,
Producer: kafkaProducer, Producer: m.kafkaProducer,
OutputRoomEventTopic: string(m.cfg.Kafka.Topics.OutputRoomEvent), OutputRoomEventTopic: string(m.cfg.Kafka.Topics.OutputRoomEvent),
} }
@ -207,19 +236,14 @@ func (m *monolith) setupRoomServer() {
} }
func (m *monolith) setupProducers() { func (m *monolith) setupProducers() {
var err error
m.roomServerProducer = producers.NewRoomserverProducer(m.inputAPI) m.roomServerProducer = producers.NewRoomserverProducer(m.inputAPI)
m.userUpdateProducer, err = producers.NewUserUpdateProducer( m.userUpdateProducer = &producers.UserUpdateProducer{
m.cfg.Kafka.Addresses, string(m.cfg.Kafka.Topics.UserUpdates), Producer: m.kafkaProducer,
) Topic: string(m.cfg.Kafka.Topics.UserUpdates),
if err != nil {
log.Panicf("Failed to setup kafka producers(%q): %s", m.cfg.Kafka.Addresses, err)
} }
m.syncProducer, err = producers.NewSyncAPIProducer( m.syncProducer = &producers.SyncAPIProducer{
m.cfg.Kafka.Addresses, string(m.cfg.Kafka.Topics.OutputClientData), Producer: m.kafkaProducer,
) Topic: string(m.cfg.Kafka.Topics.OutputClientData),
if err != nil {
log.Panicf("Failed to setup kafka producers(%q): %s", m.cfg.Kafka.Addresses, err)
} }
} }
@ -236,42 +260,34 @@ func (m *monolith) setupNotifiers() {
} }
func (m *monolith) setupConsumers() { func (m *monolith) setupConsumers() {
clientAPIConsumer, err := clientapi_consumers.NewOutputRoomEvent(m.cfg, m.accountDB) var err error
if err != nil {
log.Panicf("startup: failed to create room server consumer: %s", err) clientAPIConsumer := clientapi_consumers.NewOutputRoomEvent(
} m.cfg, m.kafkaConsumer, m.accountDB, m.queryAPI,
)
if err = clientAPIConsumer.Start(); err != nil { if err = clientAPIConsumer.Start(); err != nil {
log.Panicf("startup: failed to start room server consumer") log.Panicf("startup: failed to start room server consumer")
} }
syncAPIRoomConsumer, err := syncapi_consumers.NewOutputRoomEvent( syncAPIRoomConsumer := syncapi_consumers.NewOutputRoomEvent(
m.cfg, m.syncAPINotifier, m.syncAPIDB, m.cfg, m.kafkaConsumer, m.syncAPINotifier, m.syncAPIDB, m.queryAPI,
) )
if err != nil {
log.Panicf("startup: failed to create room server consumer: %s", err)
}
if err = syncAPIRoomConsumer.Start(); err != nil { if err = syncAPIRoomConsumer.Start(); err != nil {
log.Panicf("startup: failed to start room server consumer: %s", err) log.Panicf("startup: failed to start room server consumer: %s", err)
} }
syncAPIClientConsumer, err := syncapi_consumers.NewOutputClientData( syncAPIClientConsumer := syncapi_consumers.NewOutputClientData(
m.cfg, m.syncAPINotifier, m.syncAPIDB, m.cfg, m.kafkaConsumer, m.syncAPINotifier, m.syncAPIDB,
) )
if err != nil {
log.Panicf("startup: failed to create client API server consumer: %s", err)
}
if err = syncAPIClientConsumer.Start(); err != nil { if err = syncAPIClientConsumer.Start(); err != nil {
log.Panicf("startup: failed to start client API server consumer: %s", err) log.Panicf("startup: failed to start client API server consumer: %s", err)
} }
federationSenderQueues := queue.NewOutgoingQueues(m.cfg.Matrix.ServerName, m.federation) federationSenderQueues := queue.NewOutgoingQueues(m.cfg.Matrix.ServerName, m.federation)
federationSenderRoomConsumer, err := federationsender_consumers.NewOutputRoomEvent( federationSenderRoomConsumer := federationsender_consumers.NewOutputRoomEvent(
m.cfg, federationSenderQueues, m.federationSenderDB, m.cfg, m.kafkaConsumer, federationSenderQueues, m.federationSenderDB, m.queryAPI,
) )
if err != nil {
log.WithError(err).Panicf("startup: failed to create room server consumer")
}
if err = federationSenderRoomConsumer.Start(); err != nil { if err = federationSenderRoomConsumer.Start(); err != nil {
log.WithError(err).Panicf("startup: failed to start room server consumer") log.WithError(err).Panicf("startup: failed to start room server consumer")
} }

View File

@ -24,6 +24,7 @@ import (
"github.com/matrix-org/dendrite/clientapi/auth/storage/devices" "github.com/matrix-org/dendrite/clientapi/auth/storage/devices"
"github.com/matrix-org/dendrite/common" "github.com/matrix-org/dendrite/common"
"github.com/matrix-org/dendrite/common/config" "github.com/matrix-org/dendrite/common/config"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/syncapi/consumers" "github.com/matrix-org/dendrite/syncapi/consumers"
"github.com/matrix-org/dendrite/syncapi/routing" "github.com/matrix-org/dendrite/syncapi/routing"
"github.com/matrix-org/dendrite/syncapi/storage" "github.com/matrix-org/dendrite/syncapi/storage"
@ -31,6 +32,7 @@ import (
"github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/dendrite/syncapi/types"
log "github.com/Sirupsen/logrus" log "github.com/Sirupsen/logrus"
sarama "gopkg.in/Shopify/sarama.v1"
) )
var configPath = flag.String("config", "dendrite.yaml", "The path to the config file. For more information, see the config file in this repository.") var configPath = flag.String("config", "dendrite.yaml", "The path to the config file. For more information, see the config file in this repository.")
@ -48,7 +50,7 @@ func main() {
log.Fatalf("Invalid config file: %s", err) log.Fatalf("Invalid config file: %s", err)
} }
log.Info("config: ", cfg) queryAPI := api.NewRoomserverQueryAPIHTTP(cfg.RoomServerURL(), nil)
db, err := storage.NewSyncServerDatabase(string(cfg.Database.SyncAPI)) db, err := storage.NewSyncServerDatabase(string(cfg.Database.SyncAPI))
if err != nil { if err != nil {
@ -74,17 +76,20 @@ func main() {
if err = n.Load(db); err != nil { if err = n.Load(db); err != nil {
log.Panicf("startup: failed to set up notifier: %s", err) log.Panicf("startup: failed to set up notifier: %s", err)
} }
roomConsumer, err := consumers.NewOutputRoomEvent(cfg, n, db)
kafkaConsumer, err := sarama.NewConsumer(cfg.Kafka.Addresses, nil)
if err != nil { if err != nil {
log.Panicf("startup: failed to create room server consumer: %s", err) log.WithFields(log.Fields{
log.ErrorKey: err,
"addresses": cfg.Kafka.Addresses,
}).Panic("Failed to setup kafka consumers")
} }
roomConsumer := consumers.NewOutputRoomEvent(cfg, kafkaConsumer, n, db, queryAPI)
if err = roomConsumer.Start(); err != nil { if err = roomConsumer.Start(); err != nil {
log.Panicf("startup: failed to start room server consumer: %s", err) log.Panicf("startup: failed to start room server consumer: %s", err)
} }
clientConsumer, err := consumers.NewOutputClientData(cfg, n, db) clientConsumer := consumers.NewOutputClientData(cfg, kafkaConsumer, n, db)
if err != nil {
log.Panicf("startup: failed to create client API server consumer: %s", err)
}
if err = clientConsumer.Start(); err != nil { if err = clientConsumer.Start(); err != nil {
log.Panicf("startup: failed to start client API server consumer: %s", err) log.Panicf("startup: failed to start client API server consumer: %s", err)
} }

View File

@ -94,6 +94,11 @@ type Dendrite struct {
Kafka struct { Kafka struct {
// A list of kafka addresses to connect to. // A list of kafka addresses to connect to.
Addresses []string `yaml:"addresses"` Addresses []string `yaml:"addresses"`
// Whether to use naffka instead of kafka.
// Naffka can only be used when running dendrite as a single monolithic server.
// Kafka can be used both with a monolithic server and when running the
// components as separate servers.
UseNaffka bool `yaml:"use_naffka,omitempty"`
// The names of the topics to use when reading and writing from kafka. // The names of the topics to use when reading and writing from kafka.
Topics struct { Topics struct {
// Topic for roomserver/api.OutputRoomEvent events. // Topic for roomserver/api.OutputRoomEvent events.
@ -169,7 +174,10 @@ type ThumbnailSize struct {
ResizeMethod string `yaml:"method,omitempty"` ResizeMethod string `yaml:"method,omitempty"`
} }
// Load a yaml config file // Load a yaml config file for a server run as multiple processes.
// Checks the config to ensure that it is valid.
// The checks are different if the server is run as a monolithic process instead
// of being split into multiple components
func Load(configPath string) (*Dendrite, error) { func Load(configPath string) (*Dendrite, error) {
configData, err := ioutil.ReadFile(configPath) configData, err := ioutil.ReadFile(configPath)
if err != nil { if err != nil {
@ -181,7 +189,27 @@ func Load(configPath string) (*Dendrite, error) {
} }
// Pass the current working directory and ioutil.ReadFile so that they can // Pass the current working directory and ioutil.ReadFile so that they can
// be mocked in the tests // be mocked in the tests
return loadConfig(basePath, configData, ioutil.ReadFile) monolithic := false
return loadConfig(basePath, configData, ioutil.ReadFile, monolithic)
}
// LoadMonolithic loads a yaml config file for a server run as a single monolith.
// Checks the config to ensure that it is valid.
// The checks are different if the server is run as a monolithic process instead
// of being split into multiple components
func LoadMonolithic(configPath string) (*Dendrite, error) {
configData, err := ioutil.ReadFile(configPath)
if err != nil {
return nil, err
}
basePath, err := filepath.Abs(".")
if err != nil {
return nil, err
}
// Pass the current working directory and ioutil.ReadFile so that they can
// be mocked in the tests
monolithic := true
return loadConfig(basePath, configData, ioutil.ReadFile, monolithic)
} }
// An Error indicates a problem parsing the config. // An Error indicates a problem parsing the config.
@ -194,6 +222,7 @@ func loadConfig(
basePath string, basePath string,
configData []byte, configData []byte,
readFile func(string) ([]byte, error), readFile func(string) ([]byte, error),
monolithic bool,
) (*Dendrite, error) { ) (*Dendrite, error) {
var config Dendrite var config Dendrite
var err error var err error
@ -203,7 +232,7 @@ func loadConfig(
config.setDefaults() config.setDefaults()
if err = config.check(); err != nil { if err = config.check(monolithic); err != nil {
return nil, err return nil, err
} }
@ -259,7 +288,7 @@ func (e Error) Error() string {
) )
} }
func (config *Dendrite) check() error { func (config *Dendrite) check(monolithic bool) error {
var problems []string var problems []string
if config.Version != Version { if config.Version != Version {
@ -297,21 +326,32 @@ func (config *Dendrite) check() error {
checkPositive(fmt.Sprintf("media.thumbnail_sizes[%d].width", i), int64(size.Width)) checkPositive(fmt.Sprintf("media.thumbnail_sizes[%d].width", i), int64(size.Width))
checkPositive(fmt.Sprintf("media.thumbnail_sizes[%d].height", i), int64(size.Height)) checkPositive(fmt.Sprintf("media.thumbnail_sizes[%d].height", i), int64(size.Height))
} }
if config.Kafka.UseNaffka {
checkNotZero("kafka.addresses", int64(len(config.Kafka.Addresses))) if !monolithic {
problems = append(problems, fmt.Sprintf("naffka can only be used in a monolithic server"))
}
} else {
// If we aren't using naffka then we need to have at least one kafka
// server to talk to.
checkNotZero("kafka.addresses", int64(len(config.Kafka.Addresses)))
}
checkNotEmpty("kafka.topics.output_room_event", string(config.Kafka.Topics.OutputRoomEvent)) checkNotEmpty("kafka.topics.output_room_event", string(config.Kafka.Topics.OutputRoomEvent))
checkNotEmpty("kafka.topics.output_client_data", string(config.Kafka.Topics.OutputClientData)) checkNotEmpty("kafka.topics.output_client_data", string(config.Kafka.Topics.OutputClientData))
checkNotEmpty("kafka.topics.user_updates", string(config.Kafka.Topics.UserUpdates))
checkNotEmpty("database.account", string(config.Database.Account)) checkNotEmpty("database.account", string(config.Database.Account))
checkNotEmpty("database.device", string(config.Database.Device)) checkNotEmpty("database.device", string(config.Database.Device))
checkNotEmpty("database.server_key", string(config.Database.ServerKey)) checkNotEmpty("database.server_key", string(config.Database.ServerKey))
checkNotEmpty("database.media_api", string(config.Database.MediaAPI)) checkNotEmpty("database.media_api", string(config.Database.MediaAPI))
checkNotEmpty("database.sync_api", string(config.Database.SyncAPI)) checkNotEmpty("database.sync_api", string(config.Database.SyncAPI))
checkNotEmpty("database.room_server", string(config.Database.RoomServer)) checkNotEmpty("database.room_server", string(config.Database.RoomServer))
checkNotEmpty("listen.media_api", string(config.Listen.MediaAPI))
checkNotEmpty("listen.client_api", string(config.Listen.ClientAPI)) if !monolithic {
checkNotEmpty("listen.federation_api", string(config.Listen.FederationAPI)) checkNotEmpty("listen.media_api", string(config.Listen.MediaAPI))
checkNotEmpty("listen.sync_api", string(config.Listen.SyncAPI)) checkNotEmpty("listen.client_api", string(config.Listen.ClientAPI))
checkNotEmpty("listen.room_server", string(config.Listen.RoomServer)) checkNotEmpty("listen.federation_api", string(config.Listen.FederationAPI))
checkNotEmpty("listen.sync_api", string(config.Listen.SyncAPI))
checkNotEmpty("listen.room_server", string(config.Listen.RoomServer))
}
if problems != nil { if problems != nil {
return Error{problems} return Error{problems}

View File

@ -25,6 +25,7 @@ func TestLoadConfigRelative(t *testing.T) {
"/my/config/dir/matrix_key.pem": testKey, "/my/config/dir/matrix_key.pem": testKey,
"/my/config/dir/tls_cert.pem": testCert, "/my/config/dir/tls_cert.pem": testCert,
}.readFile, }.readFile,
false,
) )
if err != nil { if err != nil {
t.Error("failed to load config:", err) t.Error("failed to load config:", err)
@ -42,9 +43,9 @@ media:
kafka: kafka:
addresses: ["localhost:9092"] addresses: ["localhost:9092"]
topics: topics:
input_room_event: input.room
output_room_event: output.room output_room_event: output.room
output_client_data: output.client output_client_data: output.client
user_updates: output.user
database: database:
media_api: "postgresql:///media_api" media_api: "postgresql:///media_api"
account: "postgresql:///account" account: "postgresql:///account"

View File

@ -83,6 +83,7 @@ func MakeConfig(configDir, kafkaURI, database, host string, startPort int) (*con
// Make this configurable somehow? // Make this configurable somehow?
cfg.Kafka.Topics.OutputRoomEvent = "test.room.output" cfg.Kafka.Topics.OutputRoomEvent = "test.room.output"
cfg.Kafka.Topics.OutputClientData = "test.clientapi.output" cfg.Kafka.Topics.OutputClientData = "test.clientapi.output"
cfg.Kafka.Topics.UserUpdates = "test.user.output"
// TODO: Use different databases for the different schemas. // TODO: Use different databases for the different schemas.
// Using the same database for every schema currently works because // Using the same database for every schema currently works because

View File

@ -38,13 +38,13 @@ type OutputRoomEvent struct {
} }
// NewOutputRoomEvent creates a new OutputRoomEvent consumer. Call Start() to begin consuming from room servers. // NewOutputRoomEvent creates a new OutputRoomEvent consumer. Call Start() to begin consuming from room servers.
func NewOutputRoomEvent(cfg *config.Dendrite, queues *queue.OutgoingQueues, store *storage.Database) (*OutputRoomEvent, error) { func NewOutputRoomEvent(
kafkaConsumer, err := sarama.NewConsumer(cfg.Kafka.Addresses, nil) cfg *config.Dendrite,
if err != nil { kafkaConsumer sarama.Consumer,
return nil, err queues *queue.OutgoingQueues,
} store *storage.Database,
roomServerURL := cfg.RoomServerURL() queryAPI api.RoomserverQueryAPI,
) *OutputRoomEvent {
consumer := common.ContinualConsumer{ consumer := common.ContinualConsumer{
Topic: string(cfg.Kafka.Topics.OutputRoomEvent), Topic: string(cfg.Kafka.Topics.OutputRoomEvent),
Consumer: kafkaConsumer, Consumer: kafkaConsumer,
@ -54,11 +54,11 @@ func NewOutputRoomEvent(cfg *config.Dendrite, queues *queue.OutgoingQueues, stor
roomServerConsumer: &consumer, roomServerConsumer: &consumer,
db: store, db: store,
queues: queues, queues: queues,
query: api.NewRoomserverQueryAPIHTTP(roomServerURL, nil), query: queryAPI,
} }
consumer.ProcessMessage = s.onMessage consumer.ProcessMessage = s.onMessage
return s, nil return s
} }
// Start consuming from room servers // Start consuming from room servers

View File

@ -33,11 +33,12 @@ type OutputClientData struct {
} }
// NewOutputClientData creates a new OutputClientData consumer. Call Start() to begin consuming from room servers. // NewOutputClientData creates a new OutputClientData consumer. Call Start() to begin consuming from room servers.
func NewOutputClientData(cfg *config.Dendrite, n *sync.Notifier, store *storage.SyncServerDatabase) (*OutputClientData, error) { func NewOutputClientData(
kafkaConsumer, err := sarama.NewConsumer(cfg.Kafka.Addresses, nil) cfg *config.Dendrite,
if err != nil { kafkaConsumer sarama.Consumer,
return nil, err n *sync.Notifier,
} store *storage.SyncServerDatabase,
) *OutputClientData {
consumer := common.ContinualConsumer{ consumer := common.ContinualConsumer{
Topic: string(cfg.Kafka.Topics.OutputClientData), Topic: string(cfg.Kafka.Topics.OutputClientData),
@ -51,7 +52,7 @@ func NewOutputClientData(cfg *config.Dendrite, n *sync.Notifier, store *storage.
} }
consumer.ProcessMessage = s.onMessage consumer.ProcessMessage = s.onMessage
return s, nil return s
} }
// Start consuming from room servers // Start consuming from room servers

View File

@ -44,12 +44,13 @@ type prevEventRef struct {
} }
// NewOutputRoomEvent creates a new OutputRoomEvent consumer. Call Start() to begin consuming from room servers. // NewOutputRoomEvent creates a new OutputRoomEvent consumer. Call Start() to begin consuming from room servers.
func NewOutputRoomEvent(cfg *config.Dendrite, n *sync.Notifier, store *storage.SyncServerDatabase) (*OutputRoomEvent, error) { func NewOutputRoomEvent(
kafkaConsumer, err := sarama.NewConsumer(cfg.Kafka.Addresses, nil) cfg *config.Dendrite,
if err != nil { kafkaConsumer sarama.Consumer,
return nil, err n *sync.Notifier,
} store *storage.SyncServerDatabase,
roomServerURL := cfg.RoomServerURL() queryAPI api.RoomserverQueryAPI,
) *OutputRoomEvent {
consumer := common.ContinualConsumer{ consumer := common.ContinualConsumer{
Topic: string(cfg.Kafka.Topics.OutputRoomEvent), Topic: string(cfg.Kafka.Topics.OutputRoomEvent),
@ -60,11 +61,11 @@ func NewOutputRoomEvent(cfg *config.Dendrite, n *sync.Notifier, store *storage.S
roomServerConsumer: &consumer, roomServerConsumer: &consumer,
db: store, db: store,
notifier: n, notifier: n,
query: api.NewRoomserverQueryAPIHTTP(roomServerURL, nil), query: queryAPI,
} }
consumer.ProcessMessage = s.onMessage consumer.ProcessMessage = s.onMessage
return s, nil return s
} }
// Start consuming from room servers // Start consuming from room servers