mirror of
https://github.com/1f349/dendrite.git
synced 2025-01-21 23:06:32 +00:00
Remove current state server (#1405)
* Remove current state server Closes #1365 #1272 #1357 * Remove current state server from scripts/docs
This commit is contained in:
parent
8589f8373e
commit
c992f4f1f4
@ -133,17 +133,6 @@ client_api:
|
||||
turn_username: ""
|
||||
turn_password: ""
|
||||
|
||||
# Configuration for the Current State Server.
|
||||
current_state_server:
|
||||
internal_api:
|
||||
listen: http://0.0.0.0:7782
|
||||
connect: http://current_state_server:7782
|
||||
database:
|
||||
connection_string: postgresql://dendrite:itsasecret@postgres/dendrite_currentstate?sslmode=disable
|
||||
max_open_conns: 100
|
||||
max_idle_conns: 2
|
||||
conn_max_lifetime: -1
|
||||
|
||||
# Configuration for the EDU server.
|
||||
edu_server:
|
||||
internal_api:
|
||||
|
@ -43,17 +43,6 @@ services:
|
||||
networks:
|
||||
- internal
|
||||
|
||||
current_state_server:
|
||||
hostname: current_state_server
|
||||
image: matrixdotorg/dendrite:currentstateserver
|
||||
command: [
|
||||
"--config=dendrite.yaml"
|
||||
]
|
||||
volumes:
|
||||
- ./config:/etc/dendrite
|
||||
networks:
|
||||
- internal
|
||||
|
||||
sync_api:
|
||||
hostname: sync_api
|
||||
image: matrixdotorg/dendrite:syncapi
|
||||
|
@ -15,7 +15,6 @@ docker build -t matrixdotorg/dendrite:federationsender --build-arg component=de
|
||||
docker build -t matrixdotorg/dendrite:federationproxy --build-arg component=federation-api-proxy -f build/docker/Dockerfile.component .
|
||||
docker build -t matrixdotorg/dendrite:keyserver --build-arg component=dendrite-key-server -f build/docker/Dockerfile.component .
|
||||
docker build -t matrixdotorg/dendrite:mediaapi --build-arg component=dendrite-media-api-server -f build/docker/Dockerfile.component .
|
||||
docker build -t matrixdotorg/dendrite:currentstateserver --build-arg component=dendrite-current-state-server -f build/docker/Dockerfile.component .
|
||||
docker build -t matrixdotorg/dendrite:roomserver --build-arg component=dendrite-room-server -f build/docker/Dockerfile.component .
|
||||
docker build -t matrixdotorg/dendrite:syncapi --build-arg component=dendrite-sync-api-server -f build/docker/Dockerfile.component .
|
||||
docker build -t matrixdotorg/dendrite:serverkeyapi --build-arg component=dendrite-server-key-api-server -f build/docker/Dockerfile.component .
|
||||
|
@ -11,7 +11,6 @@ docker pull matrixdotorg/dendrite:federationsender
|
||||
docker pull matrixdotorg/dendrite:federationproxy
|
||||
docker pull matrixdotorg/dendrite:keyserver
|
||||
docker pull matrixdotorg/dendrite:mediaapi
|
||||
docker pull matrixdotorg/dendrite:currentstateserver
|
||||
docker pull matrixdotorg/dendrite:roomserver
|
||||
docker pull matrixdotorg/dendrite:syncapi
|
||||
docker pull matrixdotorg/dendrite:userapi
|
||||
|
@ -11,7 +11,6 @@ docker push matrixdotorg/dendrite:federationsender
|
||||
docker push matrixdotorg/dendrite:federationproxy
|
||||
docker push matrixdotorg/dendrite:keyserver
|
||||
docker push matrixdotorg/dendrite:mediaapi
|
||||
docker push matrixdotorg/dendrite:currentstateserver
|
||||
docker push matrixdotorg/dendrite:roomserver
|
||||
docker push matrixdotorg/dendrite:syncapi
|
||||
docker push matrixdotorg/dendrite:serverkeyapi
|
||||
|
@ -1,5 +1,5 @@
|
||||
#!/bin/bash
|
||||
|
||||
for db in account device mediaapi syncapi roomserver serverkey keyserver federationsender currentstate appservice e2ekey naffka; do
|
||||
for db in account device mediaapi syncapi roomserver serverkey keyserver federationsender appservice e2ekey naffka; do
|
||||
createdb -U dendrite -O dendrite dendrite_$db
|
||||
done
|
||||
|
@ -13,7 +13,6 @@ import (
|
||||
"github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/signing"
|
||||
"github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/yggconn"
|
||||
"github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/yggrooms"
|
||||
"github.com/matrix-org/dendrite/currentstateserver"
|
||||
"github.com/matrix-org/dendrite/eduserver"
|
||||
"github.com/matrix-org/dendrite/eduserver/cache"
|
||||
"github.com/matrix-org/dendrite/federationsender"
|
||||
@ -99,7 +98,6 @@ func (m *DendriteMonolith) Start() {
|
||||
cfg.KeyServer.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s/dendrite-keyserver.db", m.StorageDirectory))
|
||||
cfg.FederationSender.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s/dendrite-federationsender.db", m.StorageDirectory))
|
||||
cfg.AppServiceAPI.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s/dendrite-appservice.db", m.StorageDirectory))
|
||||
cfg.CurrentStateServer.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s/dendrite-currentstate.db", m.StorageDirectory))
|
||||
cfg.MediaAPI.BasePath = config.Path(fmt.Sprintf("%s/tmp", m.StorageDirectory))
|
||||
cfg.MediaAPI.AbsBasePath = config.Path(fmt.Sprintf("%s/tmp", m.StorageDirectory))
|
||||
cfg.FederationSender.FederationMaxRetries = 8
|
||||
@ -128,7 +126,6 @@ func (m *DendriteMonolith) Start() {
|
||||
)
|
||||
|
||||
asAPI := appservice.NewInternalAPI(base, userAPI, rsAPI)
|
||||
stateAPI := currentstateserver.NewInternalAPI(&base.Cfg.CurrentStateServer, base.KafkaConsumer)
|
||||
fsAPI := federationsender.NewInternalAPI(
|
||||
base, federation, rsAPI, keyRing,
|
||||
)
|
||||
@ -163,7 +160,6 @@ func (m *DendriteMonolith) Start() {
|
||||
FederationSenderAPI: fsAPI,
|
||||
RoomserverAPI: rsAPI,
|
||||
UserAPI: userAPI,
|
||||
StateAPI: stateAPI,
|
||||
KeyAPI: keyAPI,
|
||||
ExtPublicRoomsProvider: yggrooms.NewYggdrasilRoomProvider(
|
||||
ygg, fsAPI, federation,
|
||||
|
@ -21,7 +21,6 @@ import (
|
||||
"github.com/matrix-org/dendrite/clientapi/api"
|
||||
"github.com/matrix-org/dendrite/clientapi/producers"
|
||||
"github.com/matrix-org/dendrite/clientapi/routing"
|
||||
currentstateAPI "github.com/matrix-org/dendrite/currentstateserver/api"
|
||||
eduServerAPI "github.com/matrix-org/dendrite/eduserver/api"
|
||||
federationSenderAPI "github.com/matrix-org/dendrite/federationsender/api"
|
||||
"github.com/matrix-org/dendrite/internal/config"
|
||||
@ -43,7 +42,6 @@ func AddPublicRoutes(
|
||||
rsAPI roomserverAPI.RoomserverInternalAPI,
|
||||
eduInputAPI eduServerAPI.EDUServerInputAPI,
|
||||
asAPI appserviceAPI.AppServiceQueryAPI,
|
||||
stateAPI currentstateAPI.CurrentStateInternalAPI,
|
||||
transactionsCache *transactions.Cache,
|
||||
fsAPI federationSenderAPI.FederationSenderInternalAPI,
|
||||
userAPI userapi.UserInternalAPI,
|
||||
@ -58,6 +56,6 @@ func AddPublicRoutes(
|
||||
routing.Setup(
|
||||
router, cfg, eduInputAPI, rsAPI, asAPI,
|
||||
accountsDB, userAPI, federation,
|
||||
syncProducer, transactionsCache, fsAPI, stateAPI, keyAPI, extRoomsProvider,
|
||||
syncProducer, transactionsCache, fsAPI, keyAPI, extRoomsProvider,
|
||||
)
|
||||
}
|
||||
|
@ -20,7 +20,6 @@ import (
|
||||
|
||||
"github.com/matrix-org/dendrite/clientapi/httputil"
|
||||
"github.com/matrix-org/dendrite/clientapi/jsonerror"
|
||||
currentstateAPI "github.com/matrix-org/dendrite/currentstateserver/api"
|
||||
federationSenderAPI "github.com/matrix-org/dendrite/federationsender/api"
|
||||
"github.com/matrix-org/dendrite/internal/config"
|
||||
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
|
||||
@ -270,7 +269,7 @@ func GetVisibility(
|
||||
// SetVisibility implements PUT /directory/list/room/{roomID}
|
||||
// TODO: Allow admin users to edit the room visibility
|
||||
func SetVisibility(
|
||||
req *http.Request, stateAPI currentstateAPI.CurrentStateInternalAPI, rsAPI roomserverAPI.RoomserverInternalAPI, dev *userapi.Device,
|
||||
req *http.Request, rsAPI roomserverAPI.RoomserverInternalAPI, dev *userapi.Device,
|
||||
roomID string,
|
||||
) util.JSONResponse {
|
||||
resErr := checkMemberInRoom(req.Context(), rsAPI, dev.UserID, roomID)
|
||||
|
@ -23,7 +23,6 @@ import (
|
||||
"github.com/matrix-org/dendrite/clientapi/auth/authtypes"
|
||||
"github.com/matrix-org/dendrite/clientapi/httputil"
|
||||
"github.com/matrix-org/dendrite/clientapi/jsonerror"
|
||||
currentstateAPI "github.com/matrix-org/dendrite/currentstateserver/api"
|
||||
"github.com/matrix-org/dendrite/internal/config"
|
||||
"github.com/matrix-org/dendrite/internal/eventutil"
|
||||
"github.com/matrix-org/dendrite/roomserver/api"
|
||||
@ -94,7 +93,7 @@ func GetAvatarURL(
|
||||
// SetAvatarURL implements PUT /profile/{userID}/avatar_url
|
||||
// nolint:gocyclo
|
||||
func SetAvatarURL(
|
||||
req *http.Request, accountDB accounts.Database, stateAPI currentstateAPI.CurrentStateInternalAPI,
|
||||
req *http.Request, accountDB accounts.Database,
|
||||
device *userapi.Device, userID string, cfg *config.ClientAPI, rsAPI api.RoomserverInternalAPI,
|
||||
) util.JSONResponse {
|
||||
if userID != device.UserID {
|
||||
@ -212,7 +211,7 @@ func GetDisplayName(
|
||||
// SetDisplayName implements PUT /profile/{userID}/displayname
|
||||
// nolint:gocyclo
|
||||
func SetDisplayName(
|
||||
req *http.Request, accountDB accounts.Database, stateAPI currentstateAPI.CurrentStateInternalAPI,
|
||||
req *http.Request, accountDB accounts.Database,
|
||||
device *userapi.Device, userID string, cfg *config.ClientAPI, rsAPI api.RoomserverInternalAPI,
|
||||
) util.JSONResponse {
|
||||
if userID != device.UserID {
|
||||
|
@ -25,7 +25,6 @@ import (
|
||||
"github.com/matrix-org/dendrite/clientapi/auth"
|
||||
"github.com/matrix-org/dendrite/clientapi/jsonerror"
|
||||
"github.com/matrix-org/dendrite/clientapi/producers"
|
||||
currentstateAPI "github.com/matrix-org/dendrite/currentstateserver/api"
|
||||
eduServerAPI "github.com/matrix-org/dendrite/eduserver/api"
|
||||
federationSenderAPI "github.com/matrix-org/dendrite/federationsender/api"
|
||||
"github.com/matrix-org/dendrite/internal/config"
|
||||
@ -56,7 +55,6 @@ func Setup(
|
||||
syncProducer *producers.SyncAPIProducer,
|
||||
transactionsCache *transactions.Cache,
|
||||
federationSender federationSenderAPI.FederationSenderInternalAPI,
|
||||
stateAPI currentstateAPI.CurrentStateInternalAPI,
|
||||
keyAPI keyserverAPI.KeyInternalAPI,
|
||||
extRoomsProvider api.ExtraPublicRoomsProvider,
|
||||
) {
|
||||
@ -331,7 +329,7 @@ func Setup(
|
||||
if err != nil {
|
||||
return util.ErrorResponse(err)
|
||||
}
|
||||
return SetVisibility(req, stateAPI, rsAPI, device, vars["roomID"])
|
||||
return SetVisibility(req, rsAPI, device, vars["roomID"])
|
||||
}),
|
||||
).Methods(http.MethodPut, http.MethodOptions)
|
||||
r0mux.Handle("/publicRooms",
|
||||
@ -494,7 +492,7 @@ func Setup(
|
||||
if err != nil {
|
||||
return util.ErrorResponse(err)
|
||||
}
|
||||
return SetAvatarURL(req, accountDB, stateAPI, device, vars["userID"], cfg, rsAPI)
|
||||
return SetAvatarURL(req, accountDB, device, vars["userID"], cfg, rsAPI)
|
||||
}),
|
||||
).Methods(http.MethodPut, http.MethodOptions)
|
||||
// Browsers use the OPTIONS HTTP method to check if the CORS policy allows
|
||||
@ -519,7 +517,7 @@ func Setup(
|
||||
if err != nil {
|
||||
return util.ErrorResponse(err)
|
||||
}
|
||||
return SetDisplayName(req, accountDB, stateAPI, device, vars["userID"], cfg, rsAPI)
|
||||
return SetDisplayName(req, accountDB, device, vars["userID"], cfg, rsAPI)
|
||||
}),
|
||||
).Methods(http.MethodPut, http.MethodOptions)
|
||||
// Browsers use the OPTIONS HTTP method to check if the CORS policy allows
|
||||
|
@ -34,12 +34,11 @@ func main() {
|
||||
fsAPI := base.FederationSenderHTTPClient()
|
||||
eduInputAPI := base.EDUServerClient()
|
||||
userAPI := base.UserAPIClient()
|
||||
stateAPI := base.CurrentStateAPIClient()
|
||||
keyAPI := base.KeyServerHTTPClient()
|
||||
|
||||
clientapi.AddPublicRoutes(
|
||||
base.PublicClientAPIMux, &base.Cfg.ClientAPI, base.KafkaProducer, accountDB, federation,
|
||||
rsAPI, eduInputAPI, asQuery, stateAPI, transactions.New(), fsAPI, userAPI, keyAPI, nil,
|
||||
rsAPI, eduInputAPI, asQuery, transactions.New(), fsAPI, userAPI, keyAPI, nil,
|
||||
)
|
||||
|
||||
base.SetupAndServeHTTP(
|
||||
|
@ -1,36 +0,0 @@
|
||||
// Copyright 2020 The Matrix.org Foundation C.I.C.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package main
|
||||
|
||||
import (
|
||||
"github.com/matrix-org/dendrite/currentstateserver"
|
||||
"github.com/matrix-org/dendrite/internal/setup"
|
||||
)
|
||||
|
||||
func main() {
|
||||
cfg := setup.ParseFlags(false)
|
||||
base := setup.NewBaseDendrite(cfg, "CurrentStateServer", true)
|
||||
defer base.Close() // nolint: errcheck
|
||||
|
||||
stateAPI := currentstateserver.NewInternalAPI(&cfg.CurrentStateServer, base.KafkaConsumer)
|
||||
|
||||
currentstateserver.AddInternalRoutes(base.InternalAPIMux, stateAPI)
|
||||
|
||||
base.SetupAndServeHTTP(
|
||||
base.Cfg.CurrentStateServer.InternalAPI.Listen,
|
||||
setup.NoExternalListener,
|
||||
nil, nil,
|
||||
)
|
||||
}
|
@ -29,7 +29,6 @@ import (
|
||||
p2pdisc "github.com/libp2p/go-libp2p/p2p/discovery"
|
||||
"github.com/matrix-org/dendrite/appservice"
|
||||
"github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/embed"
|
||||
"github.com/matrix-org/dendrite/currentstateserver"
|
||||
"github.com/matrix-org/dendrite/eduserver"
|
||||
"github.com/matrix-org/dendrite/federationsender"
|
||||
"github.com/matrix-org/dendrite/internal/config"
|
||||
@ -129,7 +128,6 @@ func main() {
|
||||
cfg.ServerKeyAPI.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-serverkey.db", *instanceName))
|
||||
cfg.FederationSender.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-federationsender.db", *instanceName))
|
||||
cfg.AppServiceAPI.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-appservice.db", *instanceName))
|
||||
cfg.CurrentStateServer.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-currentstate.db", *instanceName))
|
||||
cfg.Global.Kafka.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-naffka.db", *instanceName))
|
||||
cfg.KeyServer.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-e2ekey.db", *instanceName))
|
||||
if err = cfg.Derive(); err != nil {
|
||||
@ -153,7 +151,6 @@ func main() {
|
||||
base, serverKeyAPI,
|
||||
)
|
||||
|
||||
stateAPI := currentstateserver.NewInternalAPI(&base.Base.Cfg.CurrentStateServer, base.Base.KafkaConsumer)
|
||||
rsAPI := roomserver.NewInternalAPI(
|
||||
&base.Base, keyRing,
|
||||
)
|
||||
@ -165,7 +162,7 @@ func main() {
|
||||
&base.Base, federation, rsAPI, keyRing,
|
||||
)
|
||||
rsAPI.SetFederationSenderAPI(fsAPI)
|
||||
provider := newPublicRoomsProvider(base.LibP2PPubsub, rsAPI, stateAPI)
|
||||
provider := newPublicRoomsProvider(base.LibP2PPubsub, rsAPI)
|
||||
err = provider.Start()
|
||||
if err != nil {
|
||||
panic("failed to create new public rooms provider: " + err.Error())
|
||||
@ -185,7 +182,6 @@ func main() {
|
||||
FederationSenderAPI: fsAPI,
|
||||
RoomserverAPI: rsAPI,
|
||||
ServerKeyAPI: serverKeyAPI,
|
||||
StateAPI: stateAPI,
|
||||
UserAPI: userAPI,
|
||||
KeyAPI: keyAPI,
|
||||
ExtPublicRoomsProvider: provider,
|
||||
|
@ -22,7 +22,6 @@ import (
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
currentstateAPI "github.com/matrix-org/dendrite/currentstateserver/api"
|
||||
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
|
||||
|
||||
pubsub "github.com/libp2p/go-libp2p-pubsub"
|
||||
@ -46,15 +45,13 @@ type publicRoomsProvider struct {
|
||||
maintenanceTimer *time.Timer //
|
||||
roomsAdvertised atomic.Value // stores int
|
||||
rsAPI roomserverAPI.RoomserverInternalAPI
|
||||
stateAPI currentstateAPI.CurrentStateInternalAPI
|
||||
}
|
||||
|
||||
func newPublicRoomsProvider(ps *pubsub.PubSub, rsAPI roomserverAPI.RoomserverInternalAPI, stateAPI currentstateAPI.CurrentStateInternalAPI) *publicRoomsProvider {
|
||||
func newPublicRoomsProvider(ps *pubsub.PubSub, rsAPI roomserverAPI.RoomserverInternalAPI) *publicRoomsProvider {
|
||||
return &publicRoomsProvider{
|
||||
foundRooms: make(map[string]discoveredRoom),
|
||||
pubsub: ps,
|
||||
rsAPI: rsAPI,
|
||||
stateAPI: stateAPI,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -29,7 +29,6 @@ import (
|
||||
"github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/signing"
|
||||
"github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/yggconn"
|
||||
"github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/yggrooms"
|
||||
"github.com/matrix-org/dendrite/currentstateserver"
|
||||
"github.com/matrix-org/dendrite/eduserver"
|
||||
"github.com/matrix-org/dendrite/eduserver/cache"
|
||||
"github.com/matrix-org/dendrite/federationsender"
|
||||
@ -84,7 +83,6 @@ func main() {
|
||||
cfg.KeyServer.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-keyserver.db", *instanceName))
|
||||
cfg.FederationSender.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-federationsender.db", *instanceName))
|
||||
cfg.AppServiceAPI.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-appservice.db", *instanceName))
|
||||
cfg.CurrentStateServer.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-currentstate.db", *instanceName))
|
||||
cfg.Global.Kafka.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-naffka.db", *instanceName))
|
||||
if err = cfg.Derive(); err != nil {
|
||||
panic(err)
|
||||
@ -113,7 +111,6 @@ func main() {
|
||||
)
|
||||
|
||||
asAPI := appservice.NewInternalAPI(base, userAPI, rsAPI)
|
||||
stateAPI := currentstateserver.NewInternalAPI(&base.Cfg.CurrentStateServer, base.KafkaConsumer)
|
||||
fsAPI := federationsender.NewInternalAPI(
|
||||
base, federation, rsAPI, keyRing,
|
||||
)
|
||||
@ -146,7 +143,6 @@ func main() {
|
||||
FederationSenderAPI: fsAPI,
|
||||
RoomserverAPI: rsAPI,
|
||||
UserAPI: userAPI,
|
||||
StateAPI: stateAPI,
|
||||
KeyAPI: keyAPI,
|
||||
ExtPublicRoomsProvider: yggrooms.NewYggdrasilRoomProvider(
|
||||
ygg, fsAPI, federation,
|
||||
|
@ -35,7 +35,7 @@ func main() {
|
||||
federationapi.AddPublicRoutes(
|
||||
base.PublicFederationAPIMux, base.PublicKeyAPIMux,
|
||||
&base.Cfg.FederationAPI, userAPI, federation, keyRing,
|
||||
rsAPI, fsAPI, base.EDUServerClient(), base.CurrentStateAPIClient(), keyAPI,
|
||||
rsAPI, fsAPI, base.EDUServerClient(), keyAPI,
|
||||
)
|
||||
|
||||
base.SetupAndServeHTTP(
|
||||
|
@ -19,7 +19,6 @@ import (
|
||||
"os"
|
||||
|
||||
"github.com/matrix-org/dendrite/appservice"
|
||||
"github.com/matrix-org/dendrite/currentstateserver"
|
||||
"github.com/matrix-org/dendrite/eduserver"
|
||||
"github.com/matrix-org/dendrite/eduserver/cache"
|
||||
"github.com/matrix-org/dendrite/federationsender"
|
||||
@ -54,7 +53,6 @@ func main() {
|
||||
// itself.
|
||||
cfg.AppServiceAPI.InternalAPI.Connect = httpAddr
|
||||
cfg.ClientAPI.InternalAPI.Connect = httpAddr
|
||||
cfg.CurrentStateServer.InternalAPI.Connect = httpAddr
|
||||
cfg.EDUServer.InternalAPI.Connect = httpAddr
|
||||
cfg.FederationAPI.InternalAPI.Connect = httpAddr
|
||||
cfg.FederationSender.InternalAPI.Connect = httpAddr
|
||||
@ -95,8 +93,6 @@ func main() {
|
||||
}
|
||||
}
|
||||
|
||||
stateAPI := currentstateserver.NewInternalAPI(&base.Cfg.CurrentStateServer, base.KafkaConsumer)
|
||||
|
||||
fsAPI := federationsender.NewInternalAPI(
|
||||
base, federation, rsAPI, keyRing,
|
||||
)
|
||||
@ -140,7 +136,6 @@ func main() {
|
||||
FederationSenderAPI: fsAPI,
|
||||
RoomserverAPI: rsAPI,
|
||||
ServerKeyAPI: serverKeyAPI,
|
||||
StateAPI: stateAPI,
|
||||
UserAPI: userAPI,
|
||||
KeyAPI: keyAPI,
|
||||
}
|
||||
|
@ -31,7 +31,7 @@ func main() {
|
||||
|
||||
syncapi.AddPublicRoutes(
|
||||
base.PublicClientAPIMux, base.KafkaConsumer, userAPI, rsAPI,
|
||||
base.KeyServerHTTPClient(), base.CurrentStateAPIClient(),
|
||||
base.KeyServerHTTPClient(),
|
||||
federation, &cfg.SyncAPI,
|
||||
)
|
||||
|
||||
|
@ -23,7 +23,6 @@ import (
|
||||
|
||||
"github.com/gorilla/mux"
|
||||
"github.com/matrix-org/dendrite/appservice"
|
||||
"github.com/matrix-org/dendrite/currentstateserver"
|
||||
"github.com/matrix-org/dendrite/eduserver"
|
||||
"github.com/matrix-org/dendrite/eduserver/cache"
|
||||
"github.com/matrix-org/dendrite/federationsender"
|
||||
@ -171,7 +170,6 @@ func main() {
|
||||
cfg.RoomServer.Database.ConnectionString = "file:/idb/dendritejs_roomserver.db"
|
||||
cfg.ServerKeyAPI.Database.ConnectionString = "file:/idb/dendritejs_serverkey.db"
|
||||
cfg.SyncAPI.Database.ConnectionString = "file:/idb/dendritejs_syncapi.db"
|
||||
cfg.CurrentStateServer.Database.ConnectionString = "file:/idb/dendritejs_currentstate.db"
|
||||
cfg.KeyServer.Database.ConnectionString = "file:/idb/dendritejs_e2ekey.db"
|
||||
cfg.Global.Kafka.UseNaffka = true
|
||||
cfg.Global.Kafka.Database.ConnectionString = "file:/idb/dendritejs_naffka.db"
|
||||
@ -204,7 +202,6 @@ func main() {
|
||||
KeyDatabase: fetcher,
|
||||
}
|
||||
|
||||
stateAPI := currentstateserver.NewInternalAPI(&base.Cfg.CurrentStateServer, base.KafkaConsumer)
|
||||
rsAPI := roomserver.NewInternalAPI(base, keyRing)
|
||||
eduInputAPI := eduserver.NewInternalAPI(base, cache.New(), userAPI)
|
||||
asQuery := appservice.NewInternalAPI(
|
||||
@ -227,7 +224,6 @@ func main() {
|
||||
EDUInternalAPI: eduInputAPI,
|
||||
FederationSenderAPI: fedSenderAPI,
|
||||
RoomserverAPI: rsAPI,
|
||||
StateAPI: stateAPI,
|
||||
UserAPI: userAPI,
|
||||
KeyAPI: keyAPI,
|
||||
//ServerKeyAPI: serverKeyAPI,
|
||||
|
@ -1,18 +0,0 @@
|
||||
// Copyright 2020 The Matrix.org Foundation C.I.C.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package api
|
||||
|
||||
type CurrentStateInternalAPI interface {
|
||||
}
|
@ -1,147 +0,0 @@
|
||||
// Copyright 2020 The Matrix.org Foundation C.I.C.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package consumers
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
|
||||
"github.com/Shopify/sarama"
|
||||
"github.com/matrix-org/dendrite/currentstateserver/storage"
|
||||
"github.com/matrix-org/dendrite/internal"
|
||||
"github.com/matrix-org/dendrite/roomserver/api"
|
||||
"github.com/matrix-org/dendrite/syncapi/types"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
log "github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
type OutputRoomEventConsumer struct {
|
||||
rsConsumer *internal.ContinualConsumer
|
||||
db storage.Database
|
||||
}
|
||||
|
||||
func NewOutputRoomEventConsumer(topicName string, kafkaConsumer sarama.Consumer, store storage.Database) *OutputRoomEventConsumer {
|
||||
consumer := &internal.ContinualConsumer{
|
||||
ComponentName: "currentstateserver/roomserver",
|
||||
Topic: topicName,
|
||||
Consumer: kafkaConsumer,
|
||||
PartitionStore: store,
|
||||
}
|
||||
s := &OutputRoomEventConsumer{
|
||||
rsConsumer: consumer,
|
||||
db: store,
|
||||
}
|
||||
consumer.ProcessMessage = s.onMessage
|
||||
|
||||
return s
|
||||
}
|
||||
|
||||
func (c *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
|
||||
// Parse out the event JSON
|
||||
var output api.OutputEvent
|
||||
if err := json.Unmarshal(msg.Value, &output); err != nil {
|
||||
// If the message was invalid, log it and move on to the next message in the stream
|
||||
log.WithError(err).Errorf("roomserver output log: message parse failure")
|
||||
return nil
|
||||
}
|
||||
|
||||
switch output.Type {
|
||||
case api.OutputTypeNewRoomEvent:
|
||||
return c.onNewRoomEvent(context.TODO(), *output.NewRoomEvent)
|
||||
case api.OutputTypeNewInviteEvent:
|
||||
case api.OutputTypeRetireInviteEvent:
|
||||
case api.OutputTypeRedactedEvent:
|
||||
return c.onRedactEvent(context.Background(), *output.RedactedEvent)
|
||||
default:
|
||||
log.WithField("type", output.Type).Debug(
|
||||
"roomserver output log: ignoring unknown output type",
|
||||
)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *OutputRoomEventConsumer) onNewRoomEvent(
|
||||
ctx context.Context, msg api.OutputNewRoomEvent,
|
||||
) error {
|
||||
ev := msg.Event
|
||||
|
||||
addsStateEvents := msg.AddsState()
|
||||
|
||||
ev, err := c.updateStateEvent(ev)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for i := range addsStateEvents {
|
||||
addsStateEvents[i], err = c.updateStateEvent(addsStateEvents[i])
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
err = c.db.StoreStateEvents(
|
||||
ctx,
|
||||
addsStateEvents,
|
||||
msg.RemovesStateEventIDs,
|
||||
)
|
||||
if err != nil {
|
||||
// panic rather than continue with an inconsistent database
|
||||
log.WithFields(log.Fields{
|
||||
"event": string(ev.JSON()),
|
||||
log.ErrorKey: err,
|
||||
"add": msg.AddsStateEventIDs,
|
||||
"del": msg.RemovesStateEventIDs,
|
||||
}).Panicf("roomserver output log: write event failure")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *OutputRoomEventConsumer) onRedactEvent(
|
||||
ctx context.Context, msg api.OutputRedactedEvent,
|
||||
) error {
|
||||
return c.db.RedactEvent(ctx, msg.RedactedEventID, msg.RedactedBecause)
|
||||
}
|
||||
|
||||
// Start consuming from room servers
|
||||
func (c *OutputRoomEventConsumer) Start() error {
|
||||
return c.rsConsumer.Start()
|
||||
}
|
||||
|
||||
func (c *OutputRoomEventConsumer) updateStateEvent(event gomatrixserverlib.HeaderedEvent) (gomatrixserverlib.HeaderedEvent, error) {
|
||||
stateKey := ""
|
||||
if event.StateKey() != nil {
|
||||
stateKey = *event.StateKey()
|
||||
}
|
||||
|
||||
prevEvent, err := c.db.GetStateEvent(
|
||||
context.TODO(), event.RoomID(), event.Type(), stateKey,
|
||||
)
|
||||
if err != nil {
|
||||
return event, err
|
||||
}
|
||||
|
||||
if prevEvent == nil {
|
||||
return event, nil
|
||||
}
|
||||
|
||||
prev := types.PrevEventRef{
|
||||
PrevContent: prevEvent.Content(),
|
||||
ReplacesState: prevEvent.EventID(),
|
||||
PrevSender: prevEvent.Sender(),
|
||||
}
|
||||
|
||||
event.Event, err = event.SetUnsigned(prev)
|
||||
return event, err
|
||||
}
|
@ -1,51 +0,0 @@
|
||||
// Copyright 2020 The Matrix.org Foundation C.I.C.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package currentstateserver
|
||||
|
||||
import (
|
||||
"github.com/Shopify/sarama"
|
||||
"github.com/gorilla/mux"
|
||||
"github.com/matrix-org/dendrite/currentstateserver/api"
|
||||
"github.com/matrix-org/dendrite/currentstateserver/consumers"
|
||||
"github.com/matrix-org/dendrite/currentstateserver/internal"
|
||||
"github.com/matrix-org/dendrite/currentstateserver/inthttp"
|
||||
"github.com/matrix-org/dendrite/currentstateserver/storage"
|
||||
"github.com/matrix-org/dendrite/internal/config"
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
// AddInternalRoutes registers HTTP handlers for the internal API. Invokes functions
|
||||
// on the given input API.
|
||||
func AddInternalRoutes(router *mux.Router, intAPI api.CurrentStateInternalAPI) {
|
||||
inthttp.AddRoutes(router, intAPI)
|
||||
}
|
||||
|
||||
// NewInternalAPI returns a concrete implementation of the internal API. Callers
|
||||
// can call functions directly on the returned API or via an HTTP interface using AddInternalRoutes.
|
||||
func NewInternalAPI(cfg *config.CurrentStateServer, consumer sarama.Consumer) api.CurrentStateInternalAPI {
|
||||
csDB, err := storage.NewDatabase(&cfg.Database)
|
||||
if err != nil {
|
||||
logrus.WithError(err).Panicf("failed to open database")
|
||||
}
|
||||
roomConsumer := consumers.NewOutputRoomEventConsumer(
|
||||
cfg.Matrix.Kafka.TopicFor(config.TopicOutputRoomEvent), consumer, csDB,
|
||||
)
|
||||
if err = roomConsumer.Start(); err != nil {
|
||||
logrus.WithError(err).Panicf("failed to start room server consumer")
|
||||
}
|
||||
return &internal.CurrentStateInternalAPI{
|
||||
DB: csDB,
|
||||
}
|
||||
}
|
@ -1,23 +0,0 @@
|
||||
// Copyright 2020 The Matrix.org Foundation C.I.C.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package internal
|
||||
|
||||
import (
|
||||
"github.com/matrix-org/dendrite/currentstateserver/storage"
|
||||
)
|
||||
|
||||
type CurrentStateInternalAPI struct {
|
||||
DB storage.Database
|
||||
}
|
@ -1,48 +0,0 @@
|
||||
// Copyright 2020 The Matrix.org Foundation C.I.C.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package inthttp
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"net/http"
|
||||
|
||||
"github.com/matrix-org/dendrite/currentstateserver/api"
|
||||
)
|
||||
|
||||
// HTTP paths for the internal HTTP APIs
|
||||
const (
|
||||
QueryRoomsForUserPath = "/currentstateserver/queryRoomsForUser"
|
||||
QueryBulkStateContentPath = "/currentstateserver/queryBulkStateContent"
|
||||
)
|
||||
|
||||
// NewCurrentStateAPIClient creates a CurrentStateInternalAPI implemented by talking to a HTTP POST API.
|
||||
// If httpClient is nil an error is returned
|
||||
func NewCurrentStateAPIClient(
|
||||
apiURL string,
|
||||
httpClient *http.Client,
|
||||
) (api.CurrentStateInternalAPI, error) {
|
||||
if httpClient == nil {
|
||||
return nil, errors.New("NewCurrentStateAPIClient: httpClient is <nil>")
|
||||
}
|
||||
return &httpCurrentStateInternalAPI{
|
||||
apiURL: apiURL,
|
||||
httpClient: httpClient,
|
||||
}, nil
|
||||
}
|
||||
|
||||
type httpCurrentStateInternalAPI struct {
|
||||
apiURL string
|
||||
httpClient *http.Client
|
||||
}
|
@ -1,24 +0,0 @@
|
||||
// Copyright 2020 The Matrix.org Foundation C.I.C.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package inthttp
|
||||
|
||||
import (
|
||||
"github.com/gorilla/mux"
|
||||
"github.com/matrix-org/dendrite/currentstateserver/api"
|
||||
)
|
||||
|
||||
func AddRoutes(internalAPIMux *mux.Router, intAPI api.CurrentStateInternalAPI) {
|
||||
|
||||
}
|
@ -1,46 +0,0 @@
|
||||
// Copyright 2020 The Matrix.org Foundation C.I.C.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package storage
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/matrix-org/dendrite/currentstateserver/storage/tables"
|
||||
"github.com/matrix-org/dendrite/internal"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
)
|
||||
|
||||
type Database interface {
|
||||
internal.PartitionStorer
|
||||
// StoreStateEvents updates the database with new events from the roomserver.
|
||||
StoreStateEvents(ctx context.Context, addStateEvents []gomatrixserverlib.HeaderedEvent, removeStateEventIDs []string) error
|
||||
// GetStateEvent returns the state event of a given type for a given room with a given state key
|
||||
// If no event could be found, returns nil
|
||||
// If there was an issue during the retrieval, returns an error
|
||||
GetStateEvent(ctx context.Context, roomID, evType, stateKey string) (*gomatrixserverlib.HeaderedEvent, error)
|
||||
// GetRoomsByMembership returns a list of room IDs matching the provided membership and user ID (as state_key).
|
||||
GetRoomsByMembership(ctx context.Context, userID, membership string) ([]string, error)
|
||||
// GetBulkStateContent returns all state events which match a given room ID and a given state key tuple. Both must be satisfied for a match.
|
||||
// If a tuple has the StateKey of '*' and allowWildcards=true then all state events with the EventType should be returned.
|
||||
GetBulkStateContent(ctx context.Context, roomIDs []string, tuples []gomatrixserverlib.StateKeyTuple, allowWildcards bool) ([]tables.StrippedEvent, error)
|
||||
// Redact a state event
|
||||
RedactEvent(ctx context.Context, redactedEventID string, redactedBecause gomatrixserverlib.HeaderedEvent) error
|
||||
// JoinedUsersSetInRooms returns all joined users in the rooms given, along with the count of how many times they appear.
|
||||
JoinedUsersSetInRooms(ctx context.Context, roomIDs []string) (map[string]int, error)
|
||||
// GetKnownUsers searches all users that userID knows about.
|
||||
GetKnownUsers(ctx context.Context, userID, searchString string, limit int) ([]string, error)
|
||||
// GetKnownRooms returns a list of all rooms we know about.
|
||||
GetKnownRooms(ctx context.Context) ([]string, error)
|
||||
}
|
@ -1,351 +0,0 @@
|
||||
// Copyright 2020 The Matrix.org Foundation C.I.C.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package postgres
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
|
||||
"github.com/lib/pq"
|
||||
"github.com/matrix-org/dendrite/currentstateserver/storage/tables"
|
||||
"github.com/matrix-org/dendrite/internal"
|
||||
"github.com/matrix-org/dendrite/internal/sqlutil"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
)
|
||||
|
||||
var currentRoomStateSchema = `
|
||||
-- Stores the current room state for every room.
|
||||
CREATE TABLE IF NOT EXISTS currentstate_current_room_state (
|
||||
-- The 'room_id' key for the state event.
|
||||
room_id TEXT NOT NULL,
|
||||
-- The state event ID
|
||||
event_id TEXT NOT NULL,
|
||||
-- The state event type e.g 'm.room.member'
|
||||
type TEXT NOT NULL,
|
||||
-- The 'sender' property of the event.
|
||||
sender TEXT NOT NULL,
|
||||
-- The state_key value for this state event e.g ''
|
||||
state_key TEXT NOT NULL,
|
||||
-- The JSON for the event. Stored as TEXT because this should be valid UTF-8.
|
||||
headered_event_json TEXT NOT NULL,
|
||||
-- A piece of extracted content e.g membership for m.room.member events
|
||||
content_value TEXT NOT NULL DEFAULT '',
|
||||
-- Clobber based on 3-uple of room_id, type and state_key
|
||||
CONSTRAINT currentstate_current_room_state_unique UNIQUE (room_id, type, state_key)
|
||||
);
|
||||
-- for event deletion
|
||||
CREATE UNIQUE INDEX IF NOT EXISTS currentstate_event_id_idx ON currentstate_current_room_state(event_id, room_id, type, sender);
|
||||
-- for querying membership states of users
|
||||
CREATE INDEX IF NOT EXISTS currentstate_membership_idx ON currentstate_current_room_state(type, state_key, content_value)
|
||||
WHERE type='m.room.member' AND content_value IS NOT NULL AND content_value != 'leave';
|
||||
`
|
||||
|
||||
const upsertRoomStateSQL = "" +
|
||||
"INSERT INTO currentstate_current_room_state (room_id, event_id, type, sender, state_key, headered_event_json, content_value)" +
|
||||
" VALUES ($1, $2, $3, $4, $5, $6, $7)" +
|
||||
" ON CONFLICT ON CONSTRAINT currentstate_current_room_state_unique" +
|
||||
" DO UPDATE SET event_id = $2, sender=$4, headered_event_json = $6, content_value = $7"
|
||||
|
||||
const deleteRoomStateByEventIDSQL = "" +
|
||||
"DELETE FROM currentstate_current_room_state WHERE event_id = $1"
|
||||
|
||||
const selectRoomIDsWithMembershipSQL = "" +
|
||||
"SELECT room_id FROM currentstate_current_room_state WHERE type = 'm.room.member' AND state_key = $1 AND content_value = $2"
|
||||
|
||||
const selectStateEventSQL = "" +
|
||||
"SELECT headered_event_json FROM currentstate_current_room_state WHERE room_id = $1 AND type = $2 AND state_key = $3"
|
||||
|
||||
const selectEventsWithEventIDsSQL = "" +
|
||||
"SELECT headered_event_json FROM currentstate_current_room_state WHERE event_id = ANY($1)"
|
||||
|
||||
const selectBulkStateContentSQL = "" +
|
||||
"SELECT room_id, type, state_key, content_value FROM currentstate_current_room_state WHERE room_id = ANY($1) AND type = ANY($2) AND state_key = ANY($3)"
|
||||
|
||||
const selectBulkStateContentWildSQL = "" +
|
||||
"SELECT room_id, type, state_key, content_value FROM currentstate_current_room_state WHERE room_id = ANY($1) AND type = ANY($2)"
|
||||
|
||||
const selectJoinedUsersSetForRoomsSQL = "" +
|
||||
"SELECT state_key, COUNT(room_id) FROM currentstate_current_room_state WHERE room_id = ANY($1) AND" +
|
||||
" type = 'm.room.member' and content_value = 'join' GROUP BY state_key"
|
||||
|
||||
const selectKnownRoomsSQL = "" +
|
||||
"SELECT DISTINCT room_id FROM currentstate_current_room_state"
|
||||
|
||||
// selectKnownUsersSQL uses a sub-select statement here to find rooms that the user is
|
||||
// joined to. Since this information is used to populate the user directory, we will
|
||||
// only return users that the user would ordinarily be able to see anyway.
|
||||
const selectKnownUsersSQL = "" +
|
||||
"SELECT DISTINCT state_key FROM currentstate_current_room_state WHERE room_id = ANY(" +
|
||||
" SELECT DISTINCT room_id FROM currentstate_current_room_state WHERE state_key=$1 AND TYPE='m.room.member' AND content_value='join'" +
|
||||
") AND TYPE='m.room.member' AND content_value='join' AND state_key LIKE $2 LIMIT $3"
|
||||
|
||||
type currentRoomStateStatements struct {
|
||||
upsertRoomStateStmt *sql.Stmt
|
||||
deleteRoomStateByEventIDStmt *sql.Stmt
|
||||
selectRoomIDsWithMembershipStmt *sql.Stmt
|
||||
selectEventsWithEventIDsStmt *sql.Stmt
|
||||
selectStateEventStmt *sql.Stmt
|
||||
selectBulkStateContentStmt *sql.Stmt
|
||||
selectBulkStateContentWildStmt *sql.Stmt
|
||||
selectJoinedUsersSetForRoomsStmt *sql.Stmt
|
||||
selectKnownRoomsStmt *sql.Stmt
|
||||
selectKnownUsersStmt *sql.Stmt
|
||||
}
|
||||
|
||||
func NewPostgresCurrentRoomStateTable(db *sql.DB) (tables.CurrentRoomState, error) {
|
||||
s := ¤tRoomStateStatements{}
|
||||
_, err := db.Exec(currentRoomStateSchema)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if s.upsertRoomStateStmt, err = db.Prepare(upsertRoomStateSQL); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if s.deleteRoomStateByEventIDStmt, err = db.Prepare(deleteRoomStateByEventIDSQL); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if s.selectRoomIDsWithMembershipStmt, err = db.Prepare(selectRoomIDsWithMembershipSQL); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if s.selectEventsWithEventIDsStmt, err = db.Prepare(selectEventsWithEventIDsSQL); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if s.selectStateEventStmt, err = db.Prepare(selectStateEventSQL); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if s.selectBulkStateContentStmt, err = db.Prepare(selectBulkStateContentSQL); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if s.selectBulkStateContentWildStmt, err = db.Prepare(selectBulkStateContentWildSQL); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if s.selectJoinedUsersSetForRoomsStmt, err = db.Prepare(selectJoinedUsersSetForRoomsSQL); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if s.selectKnownRoomsStmt, err = db.Prepare(selectKnownRoomsSQL); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if s.selectKnownUsersStmt, err = db.Prepare(selectKnownUsersSQL); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return s, nil
|
||||
}
|
||||
|
||||
func (s *currentRoomStateStatements) SelectJoinedUsersSetForRooms(ctx context.Context, roomIDs []string) (map[string]int, error) {
|
||||
rows, err := s.selectJoinedUsersSetForRoomsStmt.QueryContext(ctx, pq.StringArray(roomIDs))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer internal.CloseAndLogIfError(ctx, rows, "selectJoinedUsersSetForRooms: rows.close() failed")
|
||||
result := make(map[string]int)
|
||||
for rows.Next() {
|
||||
var userID string
|
||||
var count int
|
||||
if err := rows.Scan(&userID, &count); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
result[userID] = count
|
||||
}
|
||||
return result, rows.Err()
|
||||
}
|
||||
|
||||
// SelectRoomIDsWithMembership returns the list of room IDs which have the given user in the given membership state.
|
||||
func (s *currentRoomStateStatements) SelectRoomIDsWithMembership(
|
||||
ctx context.Context,
|
||||
txn *sql.Tx,
|
||||
userID string,
|
||||
contentVal string,
|
||||
) ([]string, error) {
|
||||
stmt := sqlutil.TxStmt(txn, s.selectRoomIDsWithMembershipStmt)
|
||||
rows, err := stmt.QueryContext(ctx, userID, contentVal)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer internal.CloseAndLogIfError(ctx, rows, "selectRoomIDsWithMembership: rows.close() failed")
|
||||
|
||||
var result []string
|
||||
for rows.Next() {
|
||||
var roomID string
|
||||
if err := rows.Scan(&roomID); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
result = append(result, roomID)
|
||||
}
|
||||
return result, rows.Err()
|
||||
}
|
||||
|
||||
func (s *currentRoomStateStatements) DeleteRoomStateByEventID(
|
||||
ctx context.Context, txn *sql.Tx, eventID string,
|
||||
) error {
|
||||
stmt := sqlutil.TxStmt(txn, s.deleteRoomStateByEventIDStmt)
|
||||
_, err := stmt.ExecContext(ctx, eventID)
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *currentRoomStateStatements) UpsertRoomState(
|
||||
ctx context.Context, txn *sql.Tx,
|
||||
event gomatrixserverlib.HeaderedEvent, contentVal string,
|
||||
) error {
|
||||
headeredJSON, err := json.Marshal(event)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// upsert state event
|
||||
stmt := sqlutil.TxStmt(txn, s.upsertRoomStateStmt)
|
||||
_, err = stmt.ExecContext(
|
||||
ctx,
|
||||
event.RoomID(),
|
||||
event.EventID(),
|
||||
event.Type(),
|
||||
event.Sender(),
|
||||
*event.StateKey(),
|
||||
headeredJSON,
|
||||
contentVal,
|
||||
)
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *currentRoomStateStatements) SelectEventsWithEventIDs(
|
||||
ctx context.Context, txn *sql.Tx, eventIDs []string,
|
||||
) ([]gomatrixserverlib.HeaderedEvent, error) {
|
||||
stmt := sqlutil.TxStmt(txn, s.selectEventsWithEventIDsStmt)
|
||||
rows, err := stmt.QueryContext(ctx, pq.StringArray(eventIDs))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer internal.CloseAndLogIfError(ctx, rows, "selectEventsWithEventIDs: rows.close() failed")
|
||||
result := []gomatrixserverlib.HeaderedEvent{}
|
||||
for rows.Next() {
|
||||
var eventBytes []byte
|
||||
if err := rows.Scan(&eventBytes); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var ev gomatrixserverlib.HeaderedEvent
|
||||
if err := json.Unmarshal(eventBytes, &ev); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
result = append(result, ev)
|
||||
}
|
||||
return result, rows.Err()
|
||||
}
|
||||
|
||||
func (s *currentRoomStateStatements) SelectStateEvent(
|
||||
ctx context.Context, roomID, evType, stateKey string,
|
||||
) (*gomatrixserverlib.HeaderedEvent, error) {
|
||||
stmt := s.selectStateEventStmt
|
||||
var res []byte
|
||||
err := stmt.QueryRowContext(ctx, roomID, evType, stateKey).Scan(&res)
|
||||
if err == sql.ErrNoRows {
|
||||
return nil, nil
|
||||
}
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var ev gomatrixserverlib.HeaderedEvent
|
||||
if err = json.Unmarshal(res, &ev); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &ev, err
|
||||
}
|
||||
|
||||
func (s *currentRoomStateStatements) SelectBulkStateContent(
|
||||
ctx context.Context, roomIDs []string, tuples []gomatrixserverlib.StateKeyTuple, allowWildcards bool,
|
||||
) ([]tables.StrippedEvent, error) {
|
||||
hasWildcards := false
|
||||
eventTypeSet := make(map[string]bool)
|
||||
stateKeySet := make(map[string]bool)
|
||||
var eventTypes []string
|
||||
var stateKeys []string
|
||||
for _, tuple := range tuples {
|
||||
if !eventTypeSet[tuple.EventType] {
|
||||
eventTypeSet[tuple.EventType] = true
|
||||
eventTypes = append(eventTypes, tuple.EventType)
|
||||
}
|
||||
if !stateKeySet[tuple.StateKey] {
|
||||
stateKeySet[tuple.StateKey] = true
|
||||
stateKeys = append(stateKeys, tuple.StateKey)
|
||||
}
|
||||
if tuple.StateKey == "*" {
|
||||
hasWildcards = true
|
||||
}
|
||||
}
|
||||
var rows *sql.Rows
|
||||
var err error
|
||||
if hasWildcards && allowWildcards {
|
||||
rows, err = s.selectBulkStateContentWildStmt.QueryContext(ctx, pq.StringArray(roomIDs), pq.StringArray(eventTypes))
|
||||
} else {
|
||||
rows, err = s.selectBulkStateContentStmt.QueryContext(
|
||||
ctx, pq.StringArray(roomIDs), pq.StringArray(eventTypes), pq.StringArray(stateKeys),
|
||||
)
|
||||
}
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
strippedEvents := []tables.StrippedEvent{}
|
||||
defer internal.CloseAndLogIfError(ctx, rows, "SelectBulkStateContent: rows.close() failed")
|
||||
for rows.Next() {
|
||||
var roomID string
|
||||
var eventType string
|
||||
var stateKey string
|
||||
var contentVal string
|
||||
if err = rows.Scan(&roomID, &eventType, &stateKey, &contentVal); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
strippedEvents = append(strippedEvents, tables.StrippedEvent{
|
||||
RoomID: roomID,
|
||||
ContentValue: contentVal,
|
||||
EventType: eventType,
|
||||
StateKey: stateKey,
|
||||
})
|
||||
}
|
||||
return strippedEvents, rows.Err()
|
||||
}
|
||||
|
||||
func (s *currentRoomStateStatements) SelectKnownUsers(ctx context.Context, userID, searchString string, limit int) ([]string, error) {
|
||||
rows, err := s.selectKnownUsersStmt.QueryContext(ctx, userID, fmt.Sprintf("%%%s%%", searchString), limit)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
result := []string{}
|
||||
defer internal.CloseAndLogIfError(ctx, rows, "SelectKnownUsers: rows.close() failed")
|
||||
for rows.Next() {
|
||||
var userID string
|
||||
if err := rows.Scan(&userID); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
result = append(result, userID)
|
||||
}
|
||||
return result, rows.Err()
|
||||
}
|
||||
|
||||
func (s *currentRoomStateStatements) SelectKnownRooms(ctx context.Context) ([]string, error) {
|
||||
rows, err := s.selectKnownRoomsStmt.QueryContext(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
result := []string{}
|
||||
defer internal.CloseAndLogIfError(ctx, rows, "SelectKnownRooms: rows.close() failed")
|
||||
for rows.Next() {
|
||||
var roomID string
|
||||
if err := rows.Scan(&roomID); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
result = append(result, roomID)
|
||||
}
|
||||
return result, rows.Err()
|
||||
}
|
@ -1,39 +0,0 @@
|
||||
package postgres
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
|
||||
"github.com/matrix-org/dendrite/currentstateserver/storage/shared"
|
||||
"github.com/matrix-org/dendrite/internal/config"
|
||||
"github.com/matrix-org/dendrite/internal/sqlutil"
|
||||
)
|
||||
|
||||
type Database struct {
|
||||
shared.Database
|
||||
db *sql.DB
|
||||
writer sqlutil.Writer
|
||||
sqlutil.PartitionOffsetStatements
|
||||
}
|
||||
|
||||
// NewDatabase creates a new sync server database
|
||||
func NewDatabase(dbProperties *config.DatabaseOptions) (*Database, error) {
|
||||
var d Database
|
||||
var err error
|
||||
if d.db, err = sqlutil.Open(dbProperties); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
d.writer = sqlutil.NewDummyWriter()
|
||||
if err = d.PartitionOffsetStatements.Prepare(d.db, d.writer, "currentstate"); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
currRoomState, err := NewPostgresCurrentRoomStateTable(d.db)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
d.Database = shared.Database{
|
||||
DB: d.db,
|
||||
Writer: d.writer,
|
||||
CurrentRoomState: currRoomState,
|
||||
}
|
||||
return &d, nil
|
||||
}
|
@ -1,100 +0,0 @@
|
||||
// Copyright 2020 The Matrix.org Foundation C.I.C.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package shared
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"fmt"
|
||||
|
||||
"github.com/matrix-org/dendrite/currentstateserver/storage/tables"
|
||||
"github.com/matrix-org/dendrite/internal/eventutil"
|
||||
"github.com/matrix-org/dendrite/internal/sqlutil"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
)
|
||||
|
||||
type Database struct {
|
||||
DB *sql.DB
|
||||
Writer sqlutil.Writer
|
||||
CurrentRoomState tables.CurrentRoomState
|
||||
}
|
||||
|
||||
func (d *Database) GetStateEvent(ctx context.Context, roomID, evType, stateKey string) (*gomatrixserverlib.HeaderedEvent, error) {
|
||||
return d.CurrentRoomState.SelectStateEvent(ctx, roomID, evType, stateKey)
|
||||
}
|
||||
|
||||
func (d *Database) GetBulkStateContent(ctx context.Context, roomIDs []string, tuples []gomatrixserverlib.StateKeyTuple, allowWildcards bool) ([]tables.StrippedEvent, error) {
|
||||
return d.CurrentRoomState.SelectBulkStateContent(ctx, roomIDs, tuples, allowWildcards)
|
||||
}
|
||||
|
||||
func (d *Database) RedactEvent(ctx context.Context, redactedEventID string, redactedBecause gomatrixserverlib.HeaderedEvent) error {
|
||||
events, err := d.CurrentRoomState.SelectEventsWithEventIDs(ctx, nil, []string{redactedEventID})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if len(events) != 1 {
|
||||
// this will happen for all non-state events
|
||||
return nil
|
||||
}
|
||||
redactionEvent := redactedBecause.Unwrap()
|
||||
eventBeingRedacted := events[0].Unwrap()
|
||||
redactedEvent, err := eventutil.RedactEvent(&redactionEvent, &eventBeingRedacted)
|
||||
if err != nil {
|
||||
return fmt.Errorf("RedactEvent failed: %w", err)
|
||||
}
|
||||
// replace the state event with a redacted version of itself
|
||||
return d.StoreStateEvents(ctx, []gomatrixserverlib.HeaderedEvent{redactedEvent.Headered(redactedBecause.RoomVersion)}, []string{redactedEventID})
|
||||
}
|
||||
|
||||
func (d *Database) StoreStateEvents(ctx context.Context, addStateEvents []gomatrixserverlib.HeaderedEvent,
|
||||
removeStateEventIDs []string) error {
|
||||
return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
|
||||
// remove first, then add, as we do not ever delete state, but do replace state which is a remove followed by an add.
|
||||
for _, eventID := range removeStateEventIDs {
|
||||
if err := d.CurrentRoomState.DeleteRoomStateByEventID(ctx, txn, eventID); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
for _, event := range addStateEvents {
|
||||
if event.StateKey() == nil {
|
||||
// ignore non state events
|
||||
continue
|
||||
}
|
||||
contentVal := tables.ExtractContentValue(&event)
|
||||
|
||||
if err := d.CurrentRoomState.UpsertRoomState(ctx, txn, event, contentVal); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
func (d *Database) GetRoomsByMembership(ctx context.Context, userID, membership string) ([]string, error) {
|
||||
return d.CurrentRoomState.SelectRoomIDsWithMembership(ctx, nil, userID, membership)
|
||||
}
|
||||
|
||||
func (d *Database) JoinedUsersSetInRooms(ctx context.Context, roomIDs []string) (map[string]int, error) {
|
||||
return d.CurrentRoomState.SelectJoinedUsersSetForRooms(ctx, roomIDs)
|
||||
}
|
||||
|
||||
func (d *Database) GetKnownUsers(ctx context.Context, userID, searchString string, limit int) ([]string, error) {
|
||||
return d.CurrentRoomState.SelectKnownUsers(ctx, userID, searchString, limit)
|
||||
}
|
||||
|
||||
func (d *Database) GetKnownRooms(ctx context.Context) ([]string, error) {
|
||||
return d.CurrentRoomState.SelectKnownRooms(ctx)
|
||||
}
|
@ -1,365 +0,0 @@
|
||||
// Copyright 2020 The Matrix.org Foundation C.I.C.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package sqlite3
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
"github.com/matrix-org/dendrite/currentstateserver/storage/tables"
|
||||
"github.com/matrix-org/dendrite/internal"
|
||||
"github.com/matrix-org/dendrite/internal/sqlutil"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
)
|
||||
|
||||
const currentRoomStateSchema = `
|
||||
-- Stores the current room state for every room.
|
||||
CREATE TABLE IF NOT EXISTS currentstate_current_room_state (
|
||||
room_id TEXT NOT NULL,
|
||||
event_id TEXT NOT NULL,
|
||||
type TEXT NOT NULL,
|
||||
sender TEXT NOT NULL,
|
||||
state_key TEXT NOT NULL,
|
||||
headered_event_json TEXT NOT NULL,
|
||||
content_value TEXT NOT NULL DEFAULT '',
|
||||
UNIQUE (room_id, type, state_key)
|
||||
);
|
||||
-- for event deletion
|
||||
CREATE UNIQUE INDEX IF NOT EXISTS currentstate_event_id_idx ON currentstate_current_room_state(event_id, room_id, type, sender);
|
||||
`
|
||||
|
||||
const upsertRoomStateSQL = "" +
|
||||
"INSERT INTO currentstate_current_room_state (room_id, event_id, type, sender, state_key, headered_event_json, content_value)" +
|
||||
" VALUES ($1, $2, $3, $4, $5, $6, $7)" +
|
||||
" ON CONFLICT (event_id, room_id, type, sender)" +
|
||||
" DO UPDATE SET event_id = $2, sender=$4, headered_event_json = $6, content_value = $7"
|
||||
|
||||
const deleteRoomStateByEventIDSQL = "" +
|
||||
"DELETE FROM currentstate_current_room_state WHERE event_id = $1"
|
||||
|
||||
const selectRoomIDsWithMembershipSQL = "" +
|
||||
"SELECT room_id FROM currentstate_current_room_state WHERE type = 'm.room.member' AND state_key = $1 AND content_value = $2"
|
||||
|
||||
const selectStateEventSQL = "" +
|
||||
"SELECT headered_event_json FROM currentstate_current_room_state WHERE room_id = $1 AND type = $2 AND state_key = $3"
|
||||
|
||||
const selectEventsWithEventIDsSQL = "" +
|
||||
"SELECT headered_event_json FROM currentstate_current_room_state WHERE event_id IN ($1)"
|
||||
|
||||
const selectBulkStateContentSQL = "" +
|
||||
"SELECT room_id, type, state_key, content_value FROM currentstate_current_room_state WHERE room_id IN ($1) AND type IN ($2) AND state_key IN ($3)"
|
||||
|
||||
const selectBulkStateContentWildSQL = "" +
|
||||
"SELECT room_id, type, state_key, content_value FROM currentstate_current_room_state WHERE room_id IN ($1) AND type IN ($2)"
|
||||
|
||||
const selectJoinedUsersSetForRoomsSQL = "" +
|
||||
"SELECT state_key, COUNT(room_id) FROM currentstate_current_room_state WHERE room_id IN ($1) AND type = 'm.room.member' and content_value = 'join' GROUP BY state_key"
|
||||
|
||||
const selectKnownRoomsSQL = "" +
|
||||
"SELECT DISTINCT room_id FROM currentstate_current_room_state"
|
||||
|
||||
// selectKnownUsersSQL uses a sub-select statement here to find rooms that the user is
|
||||
// joined to. Since this information is used to populate the user directory, we will
|
||||
// only return users that the user would ordinarily be able to see anyway.
|
||||
const selectKnownUsersSQL = "" +
|
||||
"SELECT DISTINCT state_key FROM currentstate_current_room_state WHERE room_id IN (" +
|
||||
" SELECT DISTINCT room_id FROM currentstate_current_room_state WHERE state_key=$1 AND TYPE='m.room.member' AND content_value='join'" +
|
||||
") AND TYPE='m.room.member' AND content_value='join' AND state_key LIKE $2 LIMIT $3"
|
||||
|
||||
type currentRoomStateStatements struct {
|
||||
db *sql.DB
|
||||
upsertRoomStateStmt *sql.Stmt
|
||||
deleteRoomStateByEventIDStmt *sql.Stmt
|
||||
selectRoomIDsWithMembershipStmt *sql.Stmt
|
||||
selectStateEventStmt *sql.Stmt
|
||||
selectJoinedUsersSetForRoomsStmt *sql.Stmt
|
||||
selectKnownRoomsStmt *sql.Stmt
|
||||
selectKnownUsersStmt *sql.Stmt
|
||||
}
|
||||
|
||||
func NewSqliteCurrentRoomStateTable(db *sql.DB) (tables.CurrentRoomState, error) {
|
||||
s := ¤tRoomStateStatements{
|
||||
db: db,
|
||||
}
|
||||
_, err := db.Exec(currentRoomStateSchema)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if s.upsertRoomStateStmt, err = db.Prepare(upsertRoomStateSQL); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if s.deleteRoomStateByEventIDStmt, err = db.Prepare(deleteRoomStateByEventIDSQL); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if s.selectRoomIDsWithMembershipStmt, err = db.Prepare(selectRoomIDsWithMembershipSQL); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if s.selectStateEventStmt, err = db.Prepare(selectStateEventSQL); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if s.selectJoinedUsersSetForRoomsStmt, err = db.Prepare(selectJoinedUsersSetForRoomsSQL); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if s.selectKnownRoomsStmt, err = db.Prepare(selectKnownRoomsSQL); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if s.selectKnownUsersStmt, err = db.Prepare(selectKnownUsersSQL); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return s, nil
|
||||
}
|
||||
|
||||
func (s *currentRoomStateStatements) SelectJoinedUsersSetForRooms(ctx context.Context, roomIDs []string) (map[string]int, error) {
|
||||
iRoomIDs := make([]interface{}, len(roomIDs))
|
||||
for i, v := range roomIDs {
|
||||
iRoomIDs[i] = v
|
||||
}
|
||||
query := strings.Replace(selectJoinedUsersSetForRoomsSQL, "($1)", sqlutil.QueryVariadic(len(iRoomIDs)), 1)
|
||||
rows, err := s.db.QueryContext(ctx, query, iRoomIDs...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer internal.CloseAndLogIfError(ctx, rows, "selectJoinedUsersSetForRooms: rows.close() failed")
|
||||
result := make(map[string]int)
|
||||
for rows.Next() {
|
||||
var userID string
|
||||
var count int
|
||||
if err := rows.Scan(&userID, &count); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
result[userID] = count
|
||||
}
|
||||
return result, rows.Err()
|
||||
}
|
||||
|
||||
// SelectRoomIDsWithMembership returns the list of room IDs which have the given user in the given membership state.
|
||||
func (s *currentRoomStateStatements) SelectRoomIDsWithMembership(
|
||||
ctx context.Context,
|
||||
txn *sql.Tx,
|
||||
userID string,
|
||||
membership string,
|
||||
) ([]string, error) {
|
||||
stmt := sqlutil.TxStmt(txn, s.selectRoomIDsWithMembershipStmt)
|
||||
rows, err := stmt.QueryContext(ctx, userID, membership)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer internal.CloseAndLogIfError(ctx, rows, "selectRoomIDsWithMembership: rows.close() failed")
|
||||
|
||||
var result []string
|
||||
for rows.Next() {
|
||||
var roomID string
|
||||
if err := rows.Scan(&roomID); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
result = append(result, roomID)
|
||||
}
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (s *currentRoomStateStatements) DeleteRoomStateByEventID(
|
||||
ctx context.Context, txn *sql.Tx, eventID string,
|
||||
) error {
|
||||
stmt := sqlutil.TxStmt(txn, s.deleteRoomStateByEventIDStmt)
|
||||
_, err := stmt.ExecContext(ctx, eventID)
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *currentRoomStateStatements) UpsertRoomState(
|
||||
ctx context.Context, txn *sql.Tx,
|
||||
event gomatrixserverlib.HeaderedEvent, contentVal string,
|
||||
) error {
|
||||
headeredJSON, err := json.Marshal(event)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// upsert state event
|
||||
stmt := sqlutil.TxStmt(txn, s.upsertRoomStateStmt)
|
||||
_, err = stmt.ExecContext(
|
||||
ctx,
|
||||
event.RoomID(),
|
||||
event.EventID(),
|
||||
event.Type(),
|
||||
event.Sender(),
|
||||
*event.StateKey(),
|
||||
headeredJSON,
|
||||
contentVal,
|
||||
)
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *currentRoomStateStatements) SelectEventsWithEventIDs(
|
||||
ctx context.Context, txn *sql.Tx, eventIDs []string,
|
||||
) ([]gomatrixserverlib.HeaderedEvent, error) {
|
||||
iEventIDs := make([]interface{}, len(eventIDs))
|
||||
for k, v := range eventIDs {
|
||||
iEventIDs[k] = v
|
||||
}
|
||||
query := strings.Replace(selectEventsWithEventIDsSQL, "($1)", sqlutil.QueryVariadic(len(iEventIDs)), 1)
|
||||
var rows *sql.Rows
|
||||
var err error
|
||||
if txn != nil {
|
||||
rows, err = txn.QueryContext(ctx, query, iEventIDs...)
|
||||
} else {
|
||||
rows, err = s.db.QueryContext(ctx, query, iEventIDs...)
|
||||
}
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer internal.CloseAndLogIfError(ctx, rows, "selectEventsWithEventIDs: rows.close() failed")
|
||||
result := []gomatrixserverlib.HeaderedEvent{}
|
||||
for rows.Next() {
|
||||
var eventBytes []byte
|
||||
if err := rows.Scan(&eventBytes); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var ev gomatrixserverlib.HeaderedEvent
|
||||
if err := json.Unmarshal(eventBytes, &ev); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
result = append(result, ev)
|
||||
}
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (s *currentRoomStateStatements) SelectStateEvent(
|
||||
ctx context.Context, roomID, evType, stateKey string,
|
||||
) (*gomatrixserverlib.HeaderedEvent, error) {
|
||||
stmt := s.selectStateEventStmt
|
||||
var res []byte
|
||||
err := stmt.QueryRowContext(ctx, roomID, evType, stateKey).Scan(&res)
|
||||
if err == sql.ErrNoRows {
|
||||
return nil, nil
|
||||
}
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var ev gomatrixserverlib.HeaderedEvent
|
||||
if err = json.Unmarshal(res, &ev); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &ev, err
|
||||
}
|
||||
|
||||
func (s *currentRoomStateStatements) SelectBulkStateContent(
|
||||
ctx context.Context, roomIDs []string, tuples []gomatrixserverlib.StateKeyTuple, allowWildcards bool,
|
||||
) ([]tables.StrippedEvent, error) {
|
||||
hasWildcards := false
|
||||
eventTypeSet := make(map[string]bool)
|
||||
stateKeySet := make(map[string]bool)
|
||||
var eventTypes []string
|
||||
var stateKeys []string
|
||||
for _, tuple := range tuples {
|
||||
if !eventTypeSet[tuple.EventType] {
|
||||
eventTypeSet[tuple.EventType] = true
|
||||
eventTypes = append(eventTypes, tuple.EventType)
|
||||
}
|
||||
if !stateKeySet[tuple.StateKey] {
|
||||
stateKeySet[tuple.StateKey] = true
|
||||
stateKeys = append(stateKeys, tuple.StateKey)
|
||||
}
|
||||
if tuple.StateKey == "*" {
|
||||
hasWildcards = true
|
||||
}
|
||||
}
|
||||
|
||||
iRoomIDs := make([]interface{}, len(roomIDs))
|
||||
for i, v := range roomIDs {
|
||||
iRoomIDs[i] = v
|
||||
}
|
||||
iEventTypes := make([]interface{}, len(eventTypes))
|
||||
for i, v := range eventTypes {
|
||||
iEventTypes[i] = v
|
||||
}
|
||||
iStateKeys := make([]interface{}, len(stateKeys))
|
||||
for i, v := range stateKeys {
|
||||
iStateKeys[i] = v
|
||||
}
|
||||
|
||||
var query string
|
||||
var args []interface{}
|
||||
if hasWildcards && allowWildcards {
|
||||
query = strings.Replace(selectBulkStateContentWildSQL, "($1)", sqlutil.QueryVariadic(len(iRoomIDs)), 1)
|
||||
query = strings.Replace(query, "($2)", sqlutil.QueryVariadicOffset(len(iEventTypes), len(iRoomIDs)), 1)
|
||||
args = append(iRoomIDs, iEventTypes...)
|
||||
} else {
|
||||
query = strings.Replace(selectBulkStateContentSQL, "($1)", sqlutil.QueryVariadic(len(iRoomIDs)), 1)
|
||||
query = strings.Replace(query, "($2)", sqlutil.QueryVariadicOffset(len(iEventTypes), len(iRoomIDs)), 1)
|
||||
query = strings.Replace(query, "($3)", sqlutil.QueryVariadicOffset(len(iStateKeys), len(iEventTypes)+len(iRoomIDs)), 1)
|
||||
args = append(iRoomIDs, iEventTypes...)
|
||||
args = append(args, iStateKeys...)
|
||||
}
|
||||
rows, err := s.db.QueryContext(ctx, query, args...)
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
strippedEvents := []tables.StrippedEvent{}
|
||||
defer internal.CloseAndLogIfError(ctx, rows, "SelectBulkStateContent: rows.close() failed")
|
||||
for rows.Next() {
|
||||
var roomID string
|
||||
var eventType string
|
||||
var stateKey string
|
||||
var contentVal string
|
||||
if err = rows.Scan(&roomID, &eventType, &stateKey, &contentVal); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
strippedEvents = append(strippedEvents, tables.StrippedEvent{
|
||||
RoomID: roomID,
|
||||
ContentValue: contentVal,
|
||||
EventType: eventType,
|
||||
StateKey: stateKey,
|
||||
})
|
||||
}
|
||||
return strippedEvents, rows.Err()
|
||||
}
|
||||
|
||||
func (s *currentRoomStateStatements) SelectKnownUsers(ctx context.Context, userID, searchString string, limit int) ([]string, error) {
|
||||
rows, err := s.selectKnownUsersStmt.QueryContext(ctx, userID, fmt.Sprintf("%%%s%%", searchString), limit)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
result := []string{}
|
||||
defer internal.CloseAndLogIfError(ctx, rows, "SelectKnownUsers: rows.close() failed")
|
||||
for rows.Next() {
|
||||
var userID string
|
||||
if err := rows.Scan(&userID); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
result = append(result, userID)
|
||||
}
|
||||
return result, rows.Err()
|
||||
}
|
||||
|
||||
func (s *currentRoomStateStatements) SelectKnownRooms(ctx context.Context) ([]string, error) {
|
||||
rows, err := s.selectKnownRoomsStmt.QueryContext(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
result := []string{}
|
||||
defer internal.CloseAndLogIfError(ctx, rows, "SelectKnownRooms: rows.close() failed")
|
||||
for rows.Next() {
|
||||
var roomID string
|
||||
if err := rows.Scan(&roomID); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
result = append(result, roomID)
|
||||
}
|
||||
return result, rows.Err()
|
||||
}
|
@ -1,40 +0,0 @@
|
||||
package sqlite3
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
|
||||
"github.com/matrix-org/dendrite/currentstateserver/storage/shared"
|
||||
"github.com/matrix-org/dendrite/internal/config"
|
||||
"github.com/matrix-org/dendrite/internal/sqlutil"
|
||||
)
|
||||
|
||||
type Database struct {
|
||||
shared.Database
|
||||
db *sql.DB
|
||||
writer sqlutil.Writer
|
||||
sqlutil.PartitionOffsetStatements
|
||||
}
|
||||
|
||||
// NewDatabase creates a new sync server database
|
||||
// nolint: gocyclo
|
||||
func NewDatabase(dbProperties *config.DatabaseOptions) (*Database, error) {
|
||||
var d Database
|
||||
var err error
|
||||
if d.db, err = sqlutil.Open(dbProperties); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
d.writer = sqlutil.NewExclusiveWriter()
|
||||
if err = d.PartitionOffsetStatements.Prepare(d.db, d.writer, "currentstate"); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
currRoomState, err := NewSqliteCurrentRoomStateTable(d.db)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
d.Database = shared.Database{
|
||||
DB: d.db,
|
||||
Writer: d.writer,
|
||||
CurrentRoomState: currRoomState,
|
||||
}
|
||||
return &d, nil
|
||||
}
|
@ -1,37 +0,0 @@
|
||||
// Copyright 2020 The Matrix.org Foundation C.I.C.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
// +build !wasm
|
||||
|
||||
package storage
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/matrix-org/dendrite/currentstateserver/storage/postgres"
|
||||
"github.com/matrix-org/dendrite/currentstateserver/storage/sqlite3"
|
||||
"github.com/matrix-org/dendrite/internal/config"
|
||||
)
|
||||
|
||||
// NewDatabase opens a database connection.
|
||||
func NewDatabase(dbProperties *config.DatabaseOptions) (Database, error) {
|
||||
switch {
|
||||
case dbProperties.ConnectionString.IsSQLite():
|
||||
return sqlite3.NewDatabase(dbProperties)
|
||||
case dbProperties.ConnectionString.IsPostgres():
|
||||
return postgres.NewDatabase(dbProperties)
|
||||
default:
|
||||
return nil, fmt.Errorf("unexpected database type")
|
||||
}
|
||||
}
|
@ -1,34 +0,0 @@
|
||||
// Copyright 2020 The Matrix.org Foundation C.I.C.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package storage
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/matrix-org/dendrite/currentstateserver/storage/sqlite3"
|
||||
"github.com/matrix-org/dendrite/internal/config"
|
||||
)
|
||||
|
||||
// NewDatabase opens a database connection.
|
||||
func NewDatabase(dbProperties *config.DatabaseOptions) (Database, error) {
|
||||
switch {
|
||||
case dbProperties.ConnectionString.IsSQLite():
|
||||
return sqlite3.NewDatabase(dbProperties)
|
||||
case dbProperties.ConnectionString.IsPostgres():
|
||||
return nil, fmt.Errorf("can't use Postgres implementation")
|
||||
default:
|
||||
return nil, fmt.Errorf("unexpected database type")
|
||||
}
|
||||
}
|
@ -1,88 +0,0 @@
|
||||
// Copyright 2020 The Matrix.org Foundation C.I.C.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package tables
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
"github.com/tidwall/gjson"
|
||||
)
|
||||
|
||||
type CurrentRoomState interface {
|
||||
SelectStateEvent(ctx context.Context, roomID, evType, stateKey string) (*gomatrixserverlib.HeaderedEvent, error)
|
||||
// SelectEventsWithEventIDs returns the events for the given event IDs. If the event(s) are missing, they are not returned
|
||||
// and no error is returned.
|
||||
SelectEventsWithEventIDs(ctx context.Context, txn *sql.Tx, eventIDs []string) ([]gomatrixserverlib.HeaderedEvent, error)
|
||||
// UpsertRoomState stores the given event in the database, along with an extracted piece of content.
|
||||
// The piece of content will vary depending on the event type, and table implementations may use this information to optimise
|
||||
// lookups e.g membership lookups. The mapped value of `contentVal` is outlined in ExtractContentValue. An empty `contentVal`
|
||||
// means there is nothing to store for this field.
|
||||
UpsertRoomState(ctx context.Context, txn *sql.Tx, event gomatrixserverlib.HeaderedEvent, contentVal string) error
|
||||
DeleteRoomStateByEventID(ctx context.Context, txn *sql.Tx, eventID string) error
|
||||
// SelectRoomIDsWithMembership returns the list of room IDs which have the given user in the given membership state.
|
||||
SelectRoomIDsWithMembership(ctx context.Context, txn *sql.Tx, userID string, membership string) ([]string, error)
|
||||
SelectBulkStateContent(ctx context.Context, roomIDs []string, tuples []gomatrixserverlib.StateKeyTuple, allowWildcards bool) ([]StrippedEvent, error)
|
||||
// SelectJoinedUsersSetForRooms returns the set of all users in the rooms who are joined to any of these rooms, along with the
|
||||
// counts of how many rooms they are joined.
|
||||
SelectJoinedUsersSetForRooms(ctx context.Context, roomIDs []string) (map[string]int, error)
|
||||
// SelectKnownUsers searches all users that userID knows about.
|
||||
SelectKnownUsers(ctx context.Context, userID, searchString string, limit int) ([]string, error)
|
||||
// SelectKnownRooms returns all rooms that we know about.
|
||||
SelectKnownRooms(ctx context.Context) ([]string, error)
|
||||
}
|
||||
|
||||
// StrippedEvent represents a stripped event for returning extracted content values.
|
||||
type StrippedEvent struct {
|
||||
RoomID string
|
||||
EventType string
|
||||
StateKey string
|
||||
ContentValue string
|
||||
}
|
||||
|
||||
// ExtractContentValue from the given state event. For example, given an m.room.name event with:
|
||||
// content: { name: "Foo" }
|
||||
// this returns "Foo".
|
||||
func ExtractContentValue(ev *gomatrixserverlib.HeaderedEvent) string {
|
||||
content := ev.Content()
|
||||
key := ""
|
||||
switch ev.Type() {
|
||||
case gomatrixserverlib.MRoomCreate:
|
||||
key = "creator"
|
||||
case gomatrixserverlib.MRoomCanonicalAlias:
|
||||
key = "alias"
|
||||
case gomatrixserverlib.MRoomHistoryVisibility:
|
||||
key = "history_visibility"
|
||||
case gomatrixserverlib.MRoomJoinRules:
|
||||
key = "join_rule"
|
||||
case gomatrixserverlib.MRoomMember:
|
||||
key = "membership"
|
||||
case gomatrixserverlib.MRoomName:
|
||||
key = "name"
|
||||
case "m.room.avatar":
|
||||
key = "url"
|
||||
case "m.room.topic":
|
||||
key = "topic"
|
||||
case "m.room.guest_access":
|
||||
key = "guest_access"
|
||||
}
|
||||
result := gjson.GetBytes(content, key)
|
||||
if !result.Exists() {
|
||||
return ""
|
||||
}
|
||||
// this returns the empty string if this is not a string type
|
||||
return result.Str
|
||||
}
|
@ -141,17 +141,6 @@ client_api:
|
||||
threshold: 5
|
||||
cooloff_ms: 500
|
||||
|
||||
# Configuration for the Current State Server.
|
||||
current_state_server:
|
||||
internal_api:
|
||||
listen: http://localhost:7782
|
||||
connect: http://localhost:7782
|
||||
database:
|
||||
connection_string: file:currentstate.db
|
||||
max_open_conns: 100
|
||||
max_idle_conns: 2
|
||||
conn_max_lifetime: -1
|
||||
|
||||
# Configuration for the EDU server.
|
||||
edu_server:
|
||||
internal_api:
|
||||
|
@ -109,7 +109,7 @@ Assuming that Postgres 9.5 (or later) is installed:
|
||||
* Create the component databases:
|
||||
|
||||
```bash
|
||||
for i in account device mediaapi syncapi roomserver serverkey federationsender currentstate appservice e2ekey naffka; do
|
||||
for i in account device mediaapi syncapi roomserver serverkey federationsender appservice e2ekey naffka; do
|
||||
sudo -u postgres createdb -O dendrite dendrite_$i
|
||||
done
|
||||
```
|
||||
@ -239,16 +239,6 @@ This is what implements the room DAG. Clients do not talk to this.
|
||||
./bin/dendrite-room-server --config=dendrite.yaml
|
||||
```
|
||||
|
||||
#### Current state server
|
||||
|
||||
This tracks the current state of rooms which various components need to know. For example,
|
||||
`/publicRooms` implemented by client API asks this server for the room names, joined member
|
||||
counts, etc.
|
||||
|
||||
```bash
|
||||
./bin/dendrite-current-state-server --config=dendrite.yaml
|
||||
```
|
||||
|
||||
#### Federation sender
|
||||
|
||||
This sends events from our users to other servers. This is only required if
|
||||
|
@ -16,7 +16,6 @@ package federationapi
|
||||
|
||||
import (
|
||||
"github.com/gorilla/mux"
|
||||
currentstateAPI "github.com/matrix-org/dendrite/currentstateserver/api"
|
||||
eduserverAPI "github.com/matrix-org/dendrite/eduserver/api"
|
||||
federationSenderAPI "github.com/matrix-org/dendrite/federationsender/api"
|
||||
"github.com/matrix-org/dendrite/internal/config"
|
||||
@ -38,12 +37,11 @@ func AddPublicRoutes(
|
||||
rsAPI roomserverAPI.RoomserverInternalAPI,
|
||||
federationSenderAPI federationSenderAPI.FederationSenderInternalAPI,
|
||||
eduAPI eduserverAPI.EDUServerInputAPI,
|
||||
stateAPI currentstateAPI.CurrentStateInternalAPI,
|
||||
keyAPI keyserverAPI.KeyInternalAPI,
|
||||
) {
|
||||
routing.Setup(
|
||||
fedRouter, keyRouter, cfg, rsAPI,
|
||||
eduAPI, federationSenderAPI, keyRing,
|
||||
federation, userAPI, stateAPI, keyAPI,
|
||||
federation, userAPI, keyAPI,
|
||||
)
|
||||
}
|
||||
|
@ -31,7 +31,7 @@ func TestRoomsV3URLEscapeDoNot404(t *testing.T) {
|
||||
fsAPI := base.FederationSenderHTTPClient()
|
||||
// TODO: This is pretty fragile, as if anything calls anything on these nils this test will break.
|
||||
// Unfortunately, it makes little sense to instantiate these dependencies when we just want to test routing.
|
||||
federationapi.AddPublicRoutes(base.PublicFederationAPIMux, base.PublicKeyAPIMux, &cfg.FederationAPI, nil, nil, keyRing, nil, fsAPI, nil, nil, nil)
|
||||
federationapi.AddPublicRoutes(base.PublicFederationAPIMux, base.PublicKeyAPIMux, &cfg.FederationAPI, nil, nil, keyRing, nil, fsAPI, nil, nil)
|
||||
baseURL, cancel := test.ListenAndServe(t, base.PublicFederationAPIMux, true)
|
||||
defer cancel()
|
||||
serverName := gomatrixserverlib.ServerName(strings.TrimPrefix(baseURL, "https://"))
|
||||
|
@ -19,7 +19,6 @@ import (
|
||||
|
||||
"github.com/gorilla/mux"
|
||||
"github.com/matrix-org/dendrite/clientapi/jsonerror"
|
||||
currentstateAPI "github.com/matrix-org/dendrite/currentstateserver/api"
|
||||
eduserverAPI "github.com/matrix-org/dendrite/eduserver/api"
|
||||
federationSenderAPI "github.com/matrix-org/dendrite/federationsender/api"
|
||||
"github.com/matrix-org/dendrite/internal/config"
|
||||
@ -48,7 +47,6 @@ func Setup(
|
||||
keys gomatrixserverlib.JSONVerifier,
|
||||
federation *gomatrixserverlib.FederationClient,
|
||||
userAPI userapi.UserInternalAPI,
|
||||
stateAPI currentstateAPI.CurrentStateInternalAPI,
|
||||
keyAPI keyserverAPI.KeyInternalAPI,
|
||||
) {
|
||||
v2keysmux := keyMux.PathPrefix("/v2").Subrouter()
|
||||
|
@ -51,19 +51,18 @@ type Dendrite struct {
|
||||
// been a breaking change to the config file format.
|
||||
Version int `yaml:"version"`
|
||||
|
||||
Global Global `yaml:"global"`
|
||||
AppServiceAPI AppServiceAPI `yaml:"app_service_api"`
|
||||
ClientAPI ClientAPI `yaml:"client_api"`
|
||||
CurrentStateServer CurrentStateServer `yaml:"current_state_server"`
|
||||
EDUServer EDUServer `yaml:"edu_server"`
|
||||
FederationAPI FederationAPI `yaml:"federation_api"`
|
||||
FederationSender FederationSender `yaml:"federation_sender"`
|
||||
KeyServer KeyServer `yaml:"key_server"`
|
||||
MediaAPI MediaAPI `yaml:"media_api"`
|
||||
RoomServer RoomServer `yaml:"room_server"`
|
||||
ServerKeyAPI ServerKeyAPI `yaml:"server_key_api"`
|
||||
SyncAPI SyncAPI `yaml:"sync_api"`
|
||||
UserAPI UserAPI `yaml:"user_api"`
|
||||
Global Global `yaml:"global"`
|
||||
AppServiceAPI AppServiceAPI `yaml:"app_service_api"`
|
||||
ClientAPI ClientAPI `yaml:"client_api"`
|
||||
EDUServer EDUServer `yaml:"edu_server"`
|
||||
FederationAPI FederationAPI `yaml:"federation_api"`
|
||||
FederationSender FederationSender `yaml:"federation_sender"`
|
||||
KeyServer KeyServer `yaml:"key_server"`
|
||||
MediaAPI MediaAPI `yaml:"media_api"`
|
||||
RoomServer RoomServer `yaml:"room_server"`
|
||||
ServerKeyAPI ServerKeyAPI `yaml:"server_key_api"`
|
||||
SyncAPI SyncAPI `yaml:"sync_api"`
|
||||
UserAPI UserAPI `yaml:"user_api"`
|
||||
|
||||
// The config for tracing the dendrite servers.
|
||||
Tracing struct {
|
||||
@ -289,7 +288,6 @@ func (c *Dendrite) Defaults() {
|
||||
|
||||
c.Global.Defaults()
|
||||
c.ClientAPI.Defaults()
|
||||
c.CurrentStateServer.Defaults()
|
||||
c.EDUServer.Defaults()
|
||||
c.FederationAPI.Defaults()
|
||||
c.FederationSender.Defaults()
|
||||
@ -309,7 +307,7 @@ func (c *Dendrite) Verify(configErrs *ConfigErrors, isMonolith bool) {
|
||||
Verify(configErrs *ConfigErrors, isMonolith bool)
|
||||
}
|
||||
for _, c := range []verifiable{
|
||||
&c.Global, &c.ClientAPI, &c.CurrentStateServer,
|
||||
&c.Global, &c.ClientAPI,
|
||||
&c.EDUServer, &c.FederationAPI, &c.FederationSender,
|
||||
&c.KeyServer, &c.MediaAPI, &c.RoomServer,
|
||||
&c.ServerKeyAPI, &c.SyncAPI, &c.UserAPI,
|
||||
@ -321,7 +319,6 @@ func (c *Dendrite) Verify(configErrs *ConfigErrors, isMonolith bool) {
|
||||
|
||||
func (c *Dendrite) Wiring() {
|
||||
c.ClientAPI.Matrix = &c.Global
|
||||
c.CurrentStateServer.Matrix = &c.Global
|
||||
c.EDUServer.Matrix = &c.Global
|
||||
c.FederationAPI.Matrix = &c.Global
|
||||
c.FederationSender.Matrix = &c.Global
|
||||
@ -512,15 +509,6 @@ func (config *Dendrite) UserAPIURL() string {
|
||||
return string(config.UserAPI.InternalAPI.Connect)
|
||||
}
|
||||
|
||||
// CurrentStateAPIURL returns an HTTP URL for where the currentstateserver is listening.
|
||||
func (config *Dendrite) CurrentStateAPIURL() string {
|
||||
// Hard code the currentstateserver to talk HTTP for now.
|
||||
// If we support HTTPS we need to think of a practical way to do certificate validation.
|
||||
// People setting up servers shouldn't need to get a certificate valid for the public
|
||||
// internet for an internal API.
|
||||
return string(config.CurrentStateServer.InternalAPI.Connect)
|
||||
}
|
||||
|
||||
// EDUServerURL returns an HTTP URL for where the EDU server is listening.
|
||||
func (config *Dendrite) EDUServerURL() string {
|
||||
// Hard code the EDU server to talk HTTP for now.
|
||||
|
@ -1,24 +0,0 @@
|
||||
package config
|
||||
|
||||
type CurrentStateServer struct {
|
||||
Matrix *Global `yaml:"-"`
|
||||
|
||||
InternalAPI InternalAPIOptions `yaml:"internal_api"`
|
||||
|
||||
// The CurrentState database stores the current state of all rooms.
|
||||
// It is accessed by the CurrentStateServer.
|
||||
Database DatabaseOptions `yaml:"database"`
|
||||
}
|
||||
|
||||
func (c *CurrentStateServer) Defaults() {
|
||||
c.InternalAPI.Listen = "http://localhost:7782"
|
||||
c.InternalAPI.Connect = "http://localhost:7782"
|
||||
c.Database.Defaults()
|
||||
c.Database.ConnectionString = "file:currentstate.db"
|
||||
}
|
||||
|
||||
func (c *CurrentStateServer) Verify(configErrs *ConfigErrors, isMonolith bool) {
|
||||
checkURL(configErrs, "current_state_server.internal_api.listen", string(c.InternalAPI.Listen))
|
||||
checkURL(configErrs, "current_state_server.internal_api.connect", string(c.InternalAPI.Connect))
|
||||
checkNotEmpty(configErrs, "current_state_server.database.connection_string", string(c.Database.ConnectionString))
|
||||
}
|
@ -21,7 +21,6 @@ import (
|
||||
"net/url"
|
||||
"time"
|
||||
|
||||
currentstateAPI "github.com/matrix-org/dendrite/currentstateserver/api"
|
||||
"github.com/matrix-org/dendrite/internal/caching"
|
||||
"github.com/matrix-org/dendrite/internal/httputil"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
@ -38,7 +37,6 @@ import (
|
||||
|
||||
appserviceAPI "github.com/matrix-org/dendrite/appservice/api"
|
||||
asinthttp "github.com/matrix-org/dendrite/appservice/inthttp"
|
||||
currentstateinthttp "github.com/matrix-org/dendrite/currentstateserver/inthttp"
|
||||
eduServerAPI "github.com/matrix-org/dendrite/eduserver/api"
|
||||
eduinthttp "github.com/matrix-org/dendrite/eduserver/inthttp"
|
||||
federationSenderAPI "github.com/matrix-org/dendrite/federationsender/api"
|
||||
@ -188,15 +186,6 @@ func (b *BaseDendrite) UserAPIClient() userapi.UserInternalAPI {
|
||||
return userAPI
|
||||
}
|
||||
|
||||
// CurrentStateAPIClient returns CurrentStateInternalAPI for hitting the currentstateserver over HTTP.
|
||||
func (b *BaseDendrite) CurrentStateAPIClient() currentstateAPI.CurrentStateInternalAPI {
|
||||
stateAPI, err := currentstateinthttp.NewCurrentStateAPIClient(b.Cfg.CurrentStateAPIURL(), b.httpClient)
|
||||
if err != nil {
|
||||
logrus.WithError(err).Panic("UserAPIClient failed", b.httpClient)
|
||||
}
|
||||
return stateAPI
|
||||
}
|
||||
|
||||
// EDUServerClient returns EDUServerInputAPI for hitting the EDU server over HTTP
|
||||
func (b *BaseDendrite) EDUServerClient() eduServerAPI.EDUServerInputAPI {
|
||||
e, err := eduinthttp.NewEDUServerClient(b.Cfg.EDUServerURL(), b.httpClient)
|
||||
|
@ -20,7 +20,6 @@ import (
|
||||
appserviceAPI "github.com/matrix-org/dendrite/appservice/api"
|
||||
"github.com/matrix-org/dendrite/clientapi"
|
||||
"github.com/matrix-org/dendrite/clientapi/api"
|
||||
currentstateAPI "github.com/matrix-org/dendrite/currentstateserver/api"
|
||||
eduServerAPI "github.com/matrix-org/dendrite/eduserver/api"
|
||||
"github.com/matrix-org/dendrite/federationapi"
|
||||
federationSenderAPI "github.com/matrix-org/dendrite/federationsender/api"
|
||||
@ -53,7 +52,6 @@ type Monolith struct {
|
||||
RoomserverAPI roomserverAPI.RoomserverInternalAPI
|
||||
ServerKeyAPI serverKeyAPI.ServerKeyInternalAPI
|
||||
UserAPI userapi.UserInternalAPI
|
||||
StateAPI currentstateAPI.CurrentStateInternalAPI
|
||||
KeyAPI keyAPI.KeyInternalAPI
|
||||
|
||||
// Optional
|
||||
@ -65,17 +63,17 @@ func (m *Monolith) AddAllPublicRoutes(csMux, ssMux, keyMux, mediaMux *mux.Router
|
||||
clientapi.AddPublicRoutes(
|
||||
csMux, &m.Config.ClientAPI, m.KafkaProducer, m.AccountDB,
|
||||
m.FedClient, m.RoomserverAPI,
|
||||
m.EDUInternalAPI, m.AppserviceAPI, m.StateAPI, transactions.New(),
|
||||
m.EDUInternalAPI, m.AppserviceAPI, transactions.New(),
|
||||
m.FederationSenderAPI, m.UserAPI, m.KeyAPI, m.ExtPublicRoomsProvider,
|
||||
)
|
||||
federationapi.AddPublicRoutes(
|
||||
ssMux, keyMux, &m.Config.FederationAPI, m.UserAPI, m.FedClient,
|
||||
m.KeyRing, m.RoomserverAPI, m.FederationSenderAPI,
|
||||
m.EDUInternalAPI, m.StateAPI, m.KeyAPI,
|
||||
m.EDUInternalAPI, m.KeyAPI,
|
||||
)
|
||||
mediaapi.AddPublicRoutes(mediaMux, &m.Config.MediaAPI, m.UserAPI, m.Client)
|
||||
syncapi.AddPublicRoutes(
|
||||
csMux, m.KafkaConsumer, m.UserAPI, m.RoomserverAPI,
|
||||
m.KeyAPI, m.StateAPI, m.FedClient, &m.Config.SyncAPI,
|
||||
m.KeyAPI, m.FedClient, &m.Config.SyncAPI,
|
||||
)
|
||||
}
|
||||
|
@ -87,7 +87,6 @@ func MakeConfig(configDir, kafkaURI, database, host string, startPort int) (*con
|
||||
// the table names are globally unique. But we might not want to
|
||||
// rely on that in the future.
|
||||
cfg.AppServiceAPI.Database.ConnectionString = config.DataSource(database)
|
||||
cfg.CurrentStateServer.Database.ConnectionString = config.DataSource(database)
|
||||
cfg.FederationSender.Database.ConnectionString = config.DataSource(database)
|
||||
cfg.KeyServer.Database.ConnectionString = config.DataSource(database)
|
||||
cfg.MediaAPI.Database.ConnectionString = config.DataSource(database)
|
||||
@ -98,7 +97,6 @@ func MakeConfig(configDir, kafkaURI, database, host string, startPort int) (*con
|
||||
cfg.UserAPI.DeviceDatabase.ConnectionString = config.DataSource(database)
|
||||
|
||||
cfg.AppServiceAPI.InternalAPI.Listen = assignAddress()
|
||||
cfg.CurrentStateServer.InternalAPI.Listen = assignAddress()
|
||||
cfg.EDUServer.InternalAPI.Listen = assignAddress()
|
||||
cfg.FederationAPI.InternalAPI.Listen = assignAddress()
|
||||
cfg.FederationSender.InternalAPI.Listen = assignAddress()
|
||||
@ -110,7 +108,6 @@ func MakeConfig(configDir, kafkaURI, database, host string, startPort int) (*con
|
||||
cfg.UserAPI.InternalAPI.Listen = assignAddress()
|
||||
|
||||
cfg.AppServiceAPI.InternalAPI.Connect = cfg.AppServiceAPI.InternalAPI.Listen
|
||||
cfg.CurrentStateServer.InternalAPI.Connect = cfg.CurrentStateServer.InternalAPI.Listen
|
||||
cfg.EDUServer.InternalAPI.Connect = cfg.EDUServer.InternalAPI.Listen
|
||||
cfg.FederationAPI.InternalAPI.Connect = cfg.FederationAPI.InternalAPI.Listen
|
||||
cfg.FederationSender.InternalAPI.Connect = cfg.FederationSender.InternalAPI.Listen
|
||||
|
@ -20,7 +20,6 @@ import (
|
||||
"sync"
|
||||
|
||||
"github.com/Shopify/sarama"
|
||||
currentstateAPI "github.com/matrix-org/dendrite/currentstateserver/api"
|
||||
"github.com/matrix-org/dendrite/internal"
|
||||
"github.com/matrix-org/dendrite/keyserver/api"
|
||||
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
|
||||
@ -38,7 +37,6 @@ type OutputKeyChangeEventConsumer struct {
|
||||
db storage.Database
|
||||
serverName gomatrixserverlib.ServerName // our server name
|
||||
rsAPI roomserverAPI.RoomserverInternalAPI
|
||||
stateAPI currentstateAPI.CurrentStateInternalAPI
|
||||
keyAPI api.KeyInternalAPI
|
||||
partitionToOffset map[int32]int64
|
||||
partitionToOffsetMu sync.Mutex
|
||||
@ -54,7 +52,6 @@ func NewOutputKeyChangeEventConsumer(
|
||||
n *syncapi.Notifier,
|
||||
keyAPI api.KeyInternalAPI,
|
||||
rsAPI roomserverAPI.RoomserverInternalAPI,
|
||||
stateAPI currentstateAPI.CurrentStateInternalAPI,
|
||||
store storage.Database,
|
||||
) *OutputKeyChangeEventConsumer {
|
||||
|
||||
@ -71,7 +68,6 @@ func NewOutputKeyChangeEventConsumer(
|
||||
serverName: serverName,
|
||||
keyAPI: keyAPI,
|
||||
rsAPI: rsAPI,
|
||||
stateAPI: stateAPI,
|
||||
partitionToOffset: make(map[int32]int64),
|
||||
partitionToOffsetMu: sync.Mutex{},
|
||||
notifier: n,
|
||||
|
@ -23,7 +23,6 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/matrix-org/dendrite/clientapi/jsonerror"
|
||||
currentstateAPI "github.com/matrix-org/dendrite/currentstateserver/api"
|
||||
keyapi "github.com/matrix-org/dendrite/keyserver/api"
|
||||
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
|
||||
"github.com/matrix-org/dendrite/syncapi/internal"
|
||||
@ -42,15 +41,14 @@ type RequestPool struct {
|
||||
notifier *Notifier
|
||||
keyAPI keyapi.KeyInternalAPI
|
||||
rsAPI roomserverAPI.RoomserverInternalAPI
|
||||
stateAPI currentstateAPI.CurrentStateInternalAPI
|
||||
}
|
||||
|
||||
// NewRequestPool makes a new RequestPool
|
||||
func NewRequestPool(
|
||||
db storage.Database, n *Notifier, userAPI userapi.UserInternalAPI, keyAPI keyapi.KeyInternalAPI,
|
||||
rsAPI roomserverAPI.RoomserverInternalAPI, stateAPI currentstateAPI.CurrentStateInternalAPI,
|
||||
rsAPI roomserverAPI.RoomserverInternalAPI,
|
||||
) *RequestPool {
|
||||
return &RequestPool{db, userAPI, n, keyAPI, rsAPI, stateAPI}
|
||||
return &RequestPool{db, userAPI, n, keyAPI, rsAPI}
|
||||
}
|
||||
|
||||
// OnIncomingSyncRequest is called when a client makes a /sync request. This function MUST be
|
||||
|
@ -21,7 +21,6 @@ import (
|
||||
"github.com/gorilla/mux"
|
||||
"github.com/sirupsen/logrus"
|
||||
|
||||
currentstateapi "github.com/matrix-org/dendrite/currentstateserver/api"
|
||||
"github.com/matrix-org/dendrite/internal/config"
|
||||
keyapi "github.com/matrix-org/dendrite/keyserver/api"
|
||||
"github.com/matrix-org/dendrite/roomserver/api"
|
||||
@ -42,7 +41,6 @@ func AddPublicRoutes(
|
||||
userAPI userapi.UserInternalAPI,
|
||||
rsAPI api.RoomserverInternalAPI,
|
||||
keyAPI keyapi.KeyInternalAPI,
|
||||
currentStateAPI currentstateapi.CurrentStateInternalAPI,
|
||||
federation *gomatrixserverlib.FederationClient,
|
||||
cfg *config.SyncAPI,
|
||||
) {
|
||||
@ -62,11 +60,11 @@ func AddPublicRoutes(
|
||||
logrus.WithError(err).Panicf("failed to start notifier")
|
||||
}
|
||||
|
||||
requestPool := sync.NewRequestPool(syncDB, notifier, userAPI, keyAPI, rsAPI, currentStateAPI)
|
||||
requestPool := sync.NewRequestPool(syncDB, notifier, userAPI, keyAPI, rsAPI)
|
||||
|
||||
keyChangeConsumer := consumers.NewOutputKeyChangeEventConsumer(
|
||||
cfg.Matrix.ServerName, string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputKeyChangeEvent)),
|
||||
consumer, notifier, keyAPI, rsAPI, currentStateAPI, syncDB,
|
||||
consumer, notifier, keyAPI, rsAPI, syncDB,
|
||||
)
|
||||
if err = keyChangeConsumer.Start(); err != nil {
|
||||
logrus.WithError(err).Panicf("failed to start key change consumer")
|
||||
|
Loading…
Reference in New Issue
Block a user