dendrite/common/basecomponent/base.go
Kegsay a97b8eafd4
Add peer-to-peer support into Dendrite via libp2p and fetch (#880)
* Use a fork of pq which supports userCurrent on wasm

* Use sqlite3_js driver when running in JS

* Add cmd/dendritejs to pull in sqlite3_js driver for wasm only

* Update to latest go-sqlite-js version

* Replace prometheus with a stub. sigh

* Hard-code a config and don't use opentracing

* Latest go-sqlite3-js version

* Generate a key for now

* Listen for fetch traffic rather than HTTP

* Latest hacks for js

* libp2p support

* More libp2p

* Fork gjson to allow us to enforce auth checks as before

Previously, all events would come down redacted because the hash
checks would fail. They would fail because sjson.DeleteBytes didn't
remove keys not used for hashing. This didn't work because of a build
tag which included a file which no-oped the index returned.

See https://github.com/tidwall/gjson/issues/157

When it's resolved, let's go back to mainline.

* Use gjson@1.6.0 as it fixes https://github.com/tidwall/gjson/issues/157

* Use latest gomatrixserverlib for sig checks

* Fix a bug which could cause exclude_from_sync to not be set

Caused when sending events over federation.

* Use query variadic to make lookups actually work!

* Latest gomatrixserverlib

* Add notes on getting p2p up and running

Partly so I don't forget myself!

* refactor: Move p2p specific stuff to cmd/dendritejs

This is important or else the normal build of dendrite will fail
because the p2p libraries depend on syscall/js which doesn't work
on normal builds.

Also, clean up main.go to read a bit better.

* Update ho-http-js-libp2p to return errors from RoundTrip

* Add an LRU cache around the key DB

We actually need this for P2P because otherwise we can *segfault*
with things like: "runtime: unexpected return pc for runtime.handleEvent"
where the event is a `syscall/js` event, caused by spamming sql.js
caused by "Checking event signatures for 14 events of room state" which
hammers the key DB repeatedly in quick succession.

Using a cache fixes this, though the underlying cause is probably a bug
in the version of Go I'm on (1.13.7)

* breaking: Add Tracing.Enabled to toggle whether we do opentracing

Defaults to false, which is why this is a breaking change. We need
this flag because WASM builds cannot do opentracing.

* Start adding conditional builds for wasm to handle lib/pq

The general idea here is to have the wasm build have a `NewXXXDatabase`
that doesn't import any postgres package and hence we never import
`lib/pq`, which doesn't work under WASM (undefined `userCurrent`).

* Remove lib/pq for wasm for syncapi

* Add conditional building to remaining storage APIs

* Update build script to set env vars correctly for dendritejs

* sqlite bug fixes

* Docs

* Add a no-op main for dendritejs when not building under wasm

* Use the real prometheus, even for WASM

Instead, the dendrite-sw.js must mock out `process.pid` and
`fs.stat` - which must invoke the callback with an error (e.g `EINVAL`)
in order for it to work:

```
    global.process = {
        pid: 1,
    };
    global.fs.stat = function(path, cb) {
        cb({
            code: "EINVAL",
        });
    }
```

* Linting
2020-03-06 10:23:55 +00:00

251 lines
8.0 KiB
Go

// Copyright 2017 New Vector Ltd
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package basecomponent
import (
"database/sql"
"io"
"net/http"
"net/url"
"golang.org/x/crypto/ed25519"
"github.com/matrix-org/dendrite/common/keydb"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/naffka"
"github.com/matrix-org/dendrite/clientapi/auth/storage/accounts"
"github.com/matrix-org/dendrite/clientapi/auth/storage/devices"
"github.com/matrix-org/dendrite/common"
"github.com/gorilla/mux"
sarama "gopkg.in/Shopify/sarama.v1"
appserviceAPI "github.com/matrix-org/dendrite/appservice/api"
"github.com/matrix-org/dendrite/common/config"
federationSenderAPI "github.com/matrix-org/dendrite/federationsender/api"
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
typingServerAPI "github.com/matrix-org/dendrite/typingserver/api"
"github.com/sirupsen/logrus"
)
// BaseDendrite is a base for creating new instances of dendrite. It parses
// command line flags and config, and exposes methods for creating various
// resources. All errors are handled by logging then exiting, so all methods
// should only be used during start up.
// Must be closed when shutting down.
type BaseDendrite struct {
componentName string
tracerCloser io.Closer
// APIMux should be used to register new public matrix api endpoints
APIMux *mux.Router
Cfg *config.Dendrite
KafkaConsumer sarama.Consumer
KafkaProducer sarama.SyncProducer
}
// NewBaseDendrite creates a new instance to be used by a component.
// The componentName is used for logging purposes, and should be a friendly name
// of the compontent running, e.g. "SyncAPI"
func NewBaseDendrite(cfg *config.Dendrite, componentName string) *BaseDendrite {
common.SetupStdLogging()
common.SetupHookLogging(cfg.Logging, componentName)
closer, err := cfg.SetupTracing("Dendrite" + componentName)
if err != nil {
logrus.WithError(err).Panicf("failed to start opentracing")
}
var kafkaConsumer sarama.Consumer
var kafkaProducer sarama.SyncProducer
if cfg.Kafka.UseNaffka {
kafkaConsumer, kafkaProducer = setupNaffka(cfg)
} else {
kafkaConsumer, kafkaProducer = setupKafka(cfg)
}
return &BaseDendrite{
componentName: componentName,
tracerCloser: closer,
Cfg: cfg,
APIMux: mux.NewRouter().UseEncodedPath(),
KafkaConsumer: kafkaConsumer,
KafkaProducer: kafkaProducer,
}
}
// Close implements io.Closer
func (b *BaseDendrite) Close() error {
return b.tracerCloser.Close()
}
// CreateHTTPAppServiceAPIs returns the QueryAPI for hitting the appservice
// component over HTTP.
func (b *BaseDendrite) CreateHTTPAppServiceAPIs() appserviceAPI.AppServiceQueryAPI {
return appserviceAPI.NewAppServiceQueryAPIHTTP(b.Cfg.AppServiceURL(), nil)
}
// CreateHTTPRoomserverAPIs returns the AliasAPI, InputAPI and QueryAPI for hitting
// the roomserver over HTTP.
func (b *BaseDendrite) CreateHTTPRoomserverAPIs() (
roomserverAPI.RoomserverAliasAPI,
roomserverAPI.RoomserverInputAPI,
roomserverAPI.RoomserverQueryAPI,
) {
alias := roomserverAPI.NewRoomserverAliasAPIHTTP(b.Cfg.RoomServerURL(), nil)
input := roomserverAPI.NewRoomserverInputAPIHTTP(b.Cfg.RoomServerURL(), nil)
query := roomserverAPI.NewRoomserverQueryAPIHTTP(b.Cfg.RoomServerURL(), nil)
return alias, input, query
}
// CreateHTTPTypingServerAPIs returns typingInputAPI for hitting the typing
// server over HTTP
func (b *BaseDendrite) CreateHTTPTypingServerAPIs() typingServerAPI.TypingServerInputAPI {
return typingServerAPI.NewTypingServerInputAPIHTTP(b.Cfg.TypingServerURL(), nil)
}
// CreateHTTPFederationSenderAPIs returns FederationSenderQueryAPI for hitting
// the federation sender over HTTP
func (b *BaseDendrite) CreateHTTPFederationSenderAPIs() federationSenderAPI.FederationSenderQueryAPI {
return federationSenderAPI.NewFederationSenderQueryAPIHTTP(b.Cfg.FederationSenderURL(), nil)
}
// CreateDeviceDB creates a new instance of the device database. Should only be
// called once per component.
func (b *BaseDendrite) CreateDeviceDB() devices.Database {
db, err := devices.NewDatabase(string(b.Cfg.Database.Device), b.Cfg.Matrix.ServerName)
if err != nil {
logrus.WithError(err).Panicf("failed to connect to devices db")
}
return db
}
// CreateAccountsDB creates a new instance of the accounts database. Should only
// be called once per component.
func (b *BaseDendrite) CreateAccountsDB() accounts.Database {
db, err := accounts.NewDatabase(string(b.Cfg.Database.Account), b.Cfg.Matrix.ServerName)
if err != nil {
logrus.WithError(err).Panicf("failed to connect to accounts db")
}
return db
}
// CreateKeyDB creates a new instance of the key database. Should only be called
// once per component.
func (b *BaseDendrite) CreateKeyDB() keydb.Database {
db, err := keydb.NewDatabase(
string(b.Cfg.Database.ServerKey),
b.Cfg.Matrix.ServerName,
b.Cfg.Matrix.PrivateKey.Public().(ed25519.PublicKey),
b.Cfg.Matrix.KeyID,
)
if err != nil {
logrus.WithError(err).Panicf("failed to connect to keys db")
}
return db
}
// CreateFederationClient creates a new federation client. Should only be called
// once per component.
func (b *BaseDendrite) CreateFederationClient() *gomatrixserverlib.FederationClient {
return gomatrixserverlib.NewFederationClient(
b.Cfg.Matrix.ServerName, b.Cfg.Matrix.KeyID, b.Cfg.Matrix.PrivateKey,
)
}
// SetupAndServeHTTP sets up the HTTP server to serve endpoints registered on
// ApiMux under /api/ and adds a prometheus handler under /metrics.
func (b *BaseDendrite) SetupAndServeHTTP(bindaddr string, listenaddr string) {
// If a separate bind address is defined, listen on that. Otherwise use
// the listen address
var addr string
if bindaddr != "" {
addr = bindaddr
} else {
addr = listenaddr
}
common.SetupHTTPAPI(http.DefaultServeMux, common.WrapHandlerInCORS(b.APIMux))
logrus.Infof("Starting %s server on %s", b.componentName, addr)
err := http.ListenAndServe(addr, nil)
if err != nil {
logrus.WithError(err).Fatal("failed to serve http")
}
logrus.Infof("Stopped %s server on %s", b.componentName, addr)
}
// setupKafka creates kafka consumer/producer pair from the config.
func setupKafka(cfg *config.Dendrite) (sarama.Consumer, sarama.SyncProducer) {
consumer, err := sarama.NewConsumer(cfg.Kafka.Addresses, nil)
if err != nil {
logrus.WithError(err).Panic("failed to start kafka consumer")
}
producer, err := sarama.NewSyncProducer(cfg.Kafka.Addresses, nil)
if err != nil {
logrus.WithError(err).Panic("failed to setup kafka producers")
}
return consumer, producer
}
// setupNaffka creates kafka consumer/producer pair from the config.
func setupNaffka(cfg *config.Dendrite) (sarama.Consumer, sarama.SyncProducer) {
var err error
var db *sql.DB
var naffkaDB *naffka.DatabaseImpl
uri, err := url.Parse(string(cfg.Database.Naffka))
if err != nil || uri.Scheme == "file" {
db, err = sql.Open(common.SQLiteDriverName(), string(cfg.Database.Naffka))
if err != nil {
logrus.WithError(err).Panic("Failed to open naffka database")
}
naffkaDB, err = naffka.NewSqliteDatabase(db)
if err != nil {
logrus.WithError(err).Panic("Failed to setup naffka database")
}
} else {
db, err = sql.Open("postgres", string(cfg.Database.Naffka))
if err != nil {
logrus.WithError(err).Panic("Failed to open naffka database")
}
naffkaDB, err = naffka.NewPostgresqlDatabase(db)
if err != nil {
logrus.WithError(err).Panic("Failed to setup naffka database")
}
}
if naffkaDB == nil {
panic("naffka connection string not understood")
}
naff, err := naffka.New(naffkaDB)
if err != nil {
logrus.WithError(err).Panic("Failed to setup naffka")
}
return naff, naff
}