Implement key uploads (#1202)

* Add storage layer for postgres/sqlite

* Return OTK counts when inserting new keys

* Hook up the key DB and make a test pass

* Convert postgres queries to be sqlite queries

* Blacklist test due to requiring rejected events

* Unbreak tests

* Update blacklist
This commit is contained in:
Kegsay 2020-07-15 12:02:34 +01:00 committed by GitHub
parent b4c07995d6
commit 9dd2ed7f65
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
30 changed files with 868 additions and 27 deletions

View File

@ -26,6 +26,7 @@ import (
federationSenderAPI "github.com/matrix-org/dendrite/federationsender/api"
"github.com/matrix-org/dendrite/internal/config"
"github.com/matrix-org/dendrite/internal/transactions"
keyserverAPI "github.com/matrix-org/dendrite/keyserver/api"
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
userapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/dendrite/userapi/storage/accounts"
@ -48,6 +49,7 @@ func AddPublicRoutes(
transactionsCache *transactions.Cache,
fsAPI federationSenderAPI.FederationSenderInternalAPI,
userAPI userapi.UserInternalAPI,
keyAPI keyserverAPI.KeyInternalAPI,
extRoomsProvider api.ExtraPublicRoomsProvider,
) {
syncProducer := &producers.SyncAPIProducer{
@ -58,6 +60,6 @@ func AddPublicRoutes(
routing.Setup(
router, cfg, eduInputAPI, rsAPI, asAPI,
accountsDB, deviceDB, userAPI, federation,
syncProducer, transactionsCache, fsAPI, stateAPI, extRoomsProvider,
syncProducer, transactionsCache, fsAPI, stateAPI, keyAPI, extRoomsProvider,
)
}

View File

@ -15,8 +15,13 @@
package routing
import (
"encoding/json"
"net/http"
"github.com/matrix-org/dendrite/clientapi/httputil"
"github.com/matrix-org/dendrite/clientapi/jsonerror"
"github.com/matrix-org/dendrite/keyserver/api"
userapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/util"
)
@ -32,9 +37,55 @@ func QueryKeys(
}
}
func UploadKeys(req *http.Request) util.JSONResponse {
type uploadKeysRequest struct {
DeviceKeys json.RawMessage `json:"device_keys"`
OneTimeKeys map[string]json.RawMessage `json:"one_time_keys"`
}
func UploadKeys(req *http.Request, keyAPI api.KeyInternalAPI, device *userapi.Device) util.JSONResponse {
var r uploadKeysRequest
resErr := httputil.UnmarshalJSONRequest(req, &r)
if resErr != nil {
return *resErr
}
uploadReq := &api.PerformUploadKeysRequest{}
if r.DeviceKeys != nil {
uploadReq.DeviceKeys = []api.DeviceKeys{
{
DeviceID: device.ID,
UserID: device.UserID,
KeyJSON: r.DeviceKeys,
},
}
}
if r.OneTimeKeys != nil {
uploadReq.OneTimeKeys = []api.OneTimeKeys{
{
DeviceID: device.ID,
UserID: device.UserID,
KeyJSON: r.OneTimeKeys,
},
}
}
var uploadRes api.PerformUploadKeysResponse
keyAPI.PerformUploadKeys(req.Context(), uploadReq, &uploadRes)
if uploadRes.Error != nil {
util.GetLogger(req.Context()).WithError(uploadRes.Error).Error("Failed to PerformUploadKeys")
return jsonerror.InternalServerError()
}
if len(uploadRes.KeyErrors) > 0 {
util.GetLogger(req.Context()).WithField("key_errors", uploadRes.KeyErrors).Error("Failed to upload one or more keys")
return util.JSONResponse{
Code: 400,
JSON: uploadRes.KeyErrors,
}
}
return util.JSONResponse{
Code: 200,
JSON: struct{}{},
JSON: struct {
OTKCounts interface{} `json:"one_time_key_counts"`
}{uploadRes.OneTimeKeyCounts[0].KeyCount},
}
}

View File

@ -31,6 +31,7 @@ import (
"github.com/matrix-org/dendrite/internal/config"
"github.com/matrix-org/dendrite/internal/httputil"
"github.com/matrix-org/dendrite/internal/transactions"
keyserverAPI "github.com/matrix-org/dendrite/keyserver/api"
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
userapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/dendrite/userapi/storage/accounts"
@ -62,6 +63,7 @@ func Setup(
transactionsCache *transactions.Cache,
federationSender federationSenderAPI.FederationSenderInternalAPI,
stateAPI currentstateAPI.CurrentStateInternalAPI,
keyAPI keyserverAPI.KeyInternalAPI,
extRoomsProvider api.ExtraPublicRoomsProvider,
) {
userInteractiveAuth := auth.NewUserInteractive(accountDB.GetAccountByPassword, cfg)
@ -705,7 +707,12 @@ func Setup(
// Supplying a device ID is deprecated.
r0mux.Handle("/keys/upload/{deviceID}",
httputil.MakeAuthAPI("keys_upload", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
return UploadKeys(req)
return UploadKeys(req, keyAPI, device)
}),
).Methods(http.MethodPost, http.MethodOptions)
r0mux.Handle("/keys/upload",
httputil.MakeAuthAPI("keys_upload", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
return UploadKeys(req, keyAPI, device)
}),
).Methods(http.MethodPost, http.MethodOptions)
}

View File

@ -36,10 +36,11 @@ func main() {
eduInputAPI := base.EDUServerClient()
userAPI := base.UserAPIClient()
stateAPI := base.CurrentStateAPIClient()
keyAPI := base.KeyServerHTTPClient()
clientapi.AddPublicRoutes(
base.PublicAPIMux, base.Cfg, base.KafkaProducer, deviceDB, accountDB, federation,
rsAPI, eduInputAPI, asQuery, stateAPI, transactions.New(), fsAPI, userAPI, nil,
rsAPI, eduInputAPI, asQuery, stateAPI, transactions.New(), fsAPI, userAPI, keyAPI, nil,
)
base.SetupAndServeHTTP(string(base.Cfg.Bind.ClientAPI), string(base.Cfg.Listen.ClientAPI))

View File

@ -33,6 +33,7 @@ import (
"github.com/matrix-org/dendrite/internal/config"
"github.com/matrix-org/dendrite/internal/httputil"
"github.com/matrix-org/dendrite/internal/setup"
"github.com/matrix-org/dendrite/keyserver"
"github.com/matrix-org/dendrite/roomserver"
"github.com/matrix-org/dendrite/serverkeyapi"
"github.com/matrix-org/dendrite/userapi"
@ -129,6 +130,7 @@ func main() {
cfg.Database.AppService = config.DataSource(fmt.Sprintf("file:%s-appservice.db", *instanceName))
cfg.Database.Naffka = config.DataSource(fmt.Sprintf("file:%s-naffka.db", *instanceName))
cfg.Database.CurrentState = config.DataSource(fmt.Sprintf("file:%s-currentstate.db", *instanceName))
cfg.Database.E2EKey = config.DataSource(fmt.Sprintf("file:%s-e2ekey.db", *instanceName))
if err = cfg.Derive(); err != nil {
panic(err)
}
@ -184,6 +186,7 @@ func main() {
ServerKeyAPI: serverKeyAPI,
StateAPI: stateAPI,
UserAPI: userAPI,
KeyAPI: keyserver.NewInternalAPI(base.Base.Cfg),
ExtPublicRoomsProvider: provider,
}
monolith.AddAllPublicRoutes(base.Base.PublicAPIMux)

View File

@ -38,6 +38,7 @@ import (
"github.com/matrix-org/dendrite/internal/config"
"github.com/matrix-org/dendrite/internal/httputil"
"github.com/matrix-org/dendrite/internal/setup"
"github.com/matrix-org/dendrite/keyserver"
"github.com/matrix-org/dendrite/roomserver"
"github.com/matrix-org/dendrite/userapi"
"github.com/matrix-org/gomatrixserverlib"
@ -87,6 +88,7 @@ func main() {
cfg.Database.AppService = config.DataSource(fmt.Sprintf("file:%s-appservice.db", *instanceName))
cfg.Database.CurrentState = config.DataSource(fmt.Sprintf("file:%s-currentstate.db", *instanceName))
cfg.Database.Naffka = config.DataSource(fmt.Sprintf("file:%s-naffka.db", *instanceName))
cfg.Database.E2EKey = config.DataSource(fmt.Sprintf("file:%s-e2ekey.db", *instanceName))
if err = cfg.Derive(); err != nil {
panic(err)
}
@ -140,6 +142,7 @@ func main() {
RoomserverAPI: rsAPI,
UserAPI: userAPI,
StateAPI: stateAPI,
KeyAPI: keyserver.NewInternalAPI(base.Cfg),
//ServerKeyAPI: serverKeyAPI,
ExtPublicRoomsProvider: yggrooms.NewYggdrasilRoomProvider(
ygg, fsAPI, federation,

View File

@ -24,7 +24,7 @@ func main() {
base := setup.NewBaseDendrite(cfg, "KeyServer", true)
defer base.Close() // nolint: errcheck
intAPI := keyserver.NewInternalAPI()
intAPI := keyserver.NewInternalAPI(base.Cfg)
keyserver.AddInternalRoutes(base.InternalAPIMux, intAPI)

View File

@ -119,7 +119,7 @@ func main() {
rsImpl.SetFederationSenderAPI(fsAPI)
stateAPI := currentstateserver.NewInternalAPI(base.Cfg, base.KafkaConsumer)
keyAPI := keyserver.NewInternalAPI()
keyAPI := keyserver.NewInternalAPI(base.Cfg)
monolith := setup.Monolith{
Config: base.Cfg,

View File

@ -29,6 +29,7 @@ import (
"github.com/matrix-org/dendrite/internal/config"
"github.com/matrix-org/dendrite/internal/httputil"
"github.com/matrix-org/dendrite/internal/setup"
"github.com/matrix-org/dendrite/keyserver"
"github.com/matrix-org/dendrite/roomserver"
"github.com/matrix-org/dendrite/userapi"
go_http_js_libp2p "github.com/matrix-org/go-http-js-libp2p"
@ -172,6 +173,7 @@ func main() {
cfg.Database.ServerKey = "file:/idb/dendritejs_serverkey.db"
cfg.Database.SyncAPI = "file:/idb/dendritejs_syncapi.db"
cfg.Database.CurrentState = "file:/idb/dendritejs_currentstate.db"
cfg.Database.E2EKey = "file:/idb/dendritejs_e2ekey.db"
cfg.Kafka.Topics.OutputTypingEvent = "output_typing_event"
cfg.Kafka.Topics.OutputSendToDeviceEvent = "output_send_to_device_event"
cfg.Kafka.Topics.OutputClientData = "output_client_data"
@ -231,6 +233,7 @@ func main() {
RoomserverAPI: rsAPI,
StateAPI: stateAPI,
UserAPI: userAPI,
KeyAPI: keyserver.NewInternalAPI(base.Cfg),
//ServerKeyAPI: serverKeyAPI,
ExtPublicRoomsProvider: p2pPublicRoomProvider,
}

View File

@ -121,6 +121,7 @@ database:
federation_sender: "postgres://dendrite:itsasecret@localhost/dendrite_federationsender?sslmode=disable"
appservice: "postgres://dendrite:itsasecret@localhost/dendrite_appservice?sslmode=disable"
current_state: "postgres://dendrite:itsasecret@localhost/dendrite_currentstate?sslmode=disable"
e2e_key: "postgres://dendrite:itsasecret@localhost/dendrite_e2ekey?sslmode=disable"
max_open_conns: 100
max_idle_conns: 2
conn_max_lifetime: -1

View File

@ -174,6 +174,9 @@ type Dendrite struct {
// The ServerKey database caches the public keys of remote servers.
// It may be accessed by the FederationAPI, the ClientAPI, and the MediaAPI.
ServerKey DataSource `yaml:"server_key"`
// The E2EKey database stores one-time public keys for devices in addition to
// signed device keys. Used for E2E.
E2EKey DataSource `yaml:"e2e_key"`
// The SyncAPI stores information used by the SyncAPI server.
// It is only accessed by the SyncAPI server.
SyncAPI DataSource `yaml:"sync_api"`
@ -602,6 +605,7 @@ func (config *Dendrite) checkDatabase(configErrs *configErrors) {
checkNotEmpty(configErrs, "database.sync_api", string(config.Database.SyncAPI))
checkNotEmpty(configErrs, "database.room_server", string(config.Database.RoomServer))
checkNotEmpty(configErrs, "database.current_state", string(config.Database.CurrentState))
checkNotEmpty(configErrs, "database.e2e_key", string(config.Database.E2EKey))
}
// checkListen verifies the parameters listen.* are valid.
@ -615,6 +619,7 @@ func (config *Dendrite) checkListen(configErrs *configErrors) {
checkNotEmpty(configErrs, "listen.server_key_api", string(config.Listen.EDUServer))
checkNotEmpty(configErrs, "listen.user_api", string(config.Listen.UserAPI))
checkNotEmpty(configErrs, "listen.current_state_server", string(config.Listen.CurrentState))
checkNotEmpty(configErrs, "listen.key_server", string(config.Listen.KeyServer))
}
// checkLogging verifies the parameters logging.* are valid.

View File

@ -56,6 +56,7 @@ database:
room_server: "postgresql:///room_server"
appservice: "postgresql:///appservice"
current_state: "postgresql:///current_state"
e2e_key: "postgresql:///e2e_key"
listen:
room_server: "localhost:7770"
client_api: "localhost:7771"
@ -66,6 +67,7 @@ listen:
edu_server: "localhost:7778"
user_api: "localhost:7779"
current_state_server: "localhost:7775"
key_server: "localhost:7776"
logging:
- type: "file"
level: "info"

View File

@ -68,7 +68,7 @@ func (m *Monolith) AddAllPublicRoutes(publicMux *mux.Router) {
publicMux, m.Config, m.KafkaProducer, m.DeviceDB, m.AccountDB,
m.FedClient, m.RoomserverAPI,
m.EDUInternalAPI, m.AppserviceAPI, m.StateAPI, transactions.New(),
m.FederationSenderAPI, m.UserAPI, m.ExtPublicRoomsProvider,
m.FederationSenderAPI, m.UserAPI, m.KeyAPI, m.ExtPublicRoomsProvider,
)
federationapi.AddPublicRoutes(
publicMux, m.Config, m.UserAPI, m.FedClient,

View File

@ -17,6 +17,7 @@ package api
import (
"context"
"encoding/json"
"strings"
)
type KeyInternalAPI interface {
@ -27,7 +28,11 @@ type KeyInternalAPI interface {
// KeyError is returned if there was a problem performing/querying the server
type KeyError struct {
Error string
Err string
}
func (k *KeyError) Error() string {
return k.Err
}
// DeviceKeys represents a set of device keys for a single device
@ -52,6 +57,12 @@ type OneTimeKeys struct {
KeyJSON map[string]json.RawMessage
}
// Split a key in KeyJSON into algorithm and key ID
func (k *OneTimeKeys) Split(keyIDWithAlgo string) (algo string, keyID string) {
segments := strings.Split(keyIDWithAlgo, ":")
return segments[0], segments[1]
}
// OneTimeKeysCount represents the counts of one-time keys for a single device
type OneTimeKeysCount struct {
// The user who owns this device
@ -74,6 +85,7 @@ type PerformUploadKeysRequest struct {
// PerformUploadKeysResponse is the response to PerformUploadKeys
type PerformUploadKeysResponse struct {
// A fatal error when processing e.g database failures
Error *KeyError
// A map of user_id -> device_id -> Error for tracking failures.
KeyErrors map[string]map[string]*KeyError

View File

@ -25,7 +25,7 @@ import (
)
type KeyInternalAPI struct {
db storage.Database
DB storage.Database
}
func (a *KeyInternalAPI) PerformUploadKeys(ctx context.Context, req *api.PerformUploadKeysRequest, res *api.PerformUploadKeysResponse) {
@ -52,7 +52,7 @@ func (a *KeyInternalAPI) uploadDeviceKeys(ctx context.Context, req *api.PerformU
}
res.KeyError(key.UserID, key.DeviceID, &api.KeyError{
Error: fmt.Sprintf(
Err: fmt.Sprintf(
"user_id or device_id mismatch: users: %s - %s, devices: %s - %s",
gotUserID, key.UserID, gotDeviceID, key.DeviceID,
),
@ -66,16 +66,16 @@ func (a *KeyInternalAPI) uploadDeviceKeys(ctx context.Context, req *api.PerformU
DeviceID: keysToStore[i].DeviceID,
}
}
if err := a.db.DeviceKeysJSON(ctx, existingKeys); err != nil {
if err := a.DB.DeviceKeysJSON(ctx, existingKeys); err != nil {
res.Error = &api.KeyError{
Error: fmt.Sprintf("failed to query existing device keys: %s", err.Error()),
Err: fmt.Sprintf("failed to query existing device keys: %s", err.Error()),
}
return
}
// store the device keys and emit changes
if err := a.db.StoreDeviceKeys(ctx, keysToStore); err != nil {
if err := a.DB.StoreDeviceKeys(ctx, keysToStore); err != nil {
res.Error = &api.KeyError{
Error: fmt.Sprintf("failed to store device keys: %s", err.Error()),
Err: fmt.Sprintf("failed to store device keys: %s", err.Error()),
}
return
}
@ -91,10 +91,10 @@ func (a *KeyInternalAPI) uploadOneTimeKeys(ctx context.Context, req *api.Perform
keyIDsWithAlgorithms[i] = keyIDWithAlgo
i++
}
existingKeys, err := a.db.ExistingOneTimeKeys(ctx, key.UserID, key.DeviceID, keyIDsWithAlgorithms)
existingKeys, err := a.DB.ExistingOneTimeKeys(ctx, key.UserID, key.DeviceID, keyIDsWithAlgorithms)
if err != nil {
res.KeyError(key.UserID, key.DeviceID, &api.KeyError{
Error: "failed to query existing one-time keys: " + err.Error(),
Err: "failed to query existing one-time keys: " + err.Error(),
})
continue
}
@ -102,17 +102,21 @@ func (a *KeyInternalAPI) uploadOneTimeKeys(ctx context.Context, req *api.Perform
// if keys exist and the JSON doesn't match, error out as the key already exists
if !bytes.Equal(existingKeys[keyIDWithAlgo], key.KeyJSON[keyIDWithAlgo]) {
res.KeyError(key.UserID, key.DeviceID, &api.KeyError{
Error: fmt.Sprintf("%s device %s: algorithm / key ID %s one-time key already exists", key.UserID, key.DeviceID, keyIDWithAlgo),
Err: fmt.Sprintf("%s device %s: algorithm / key ID %s one-time key already exists", key.UserID, key.DeviceID, keyIDWithAlgo),
})
continue
}
}
// store one-time keys
if err := a.db.StoreOneTimeKeys(ctx, key); err != nil {
counts, err := a.DB.StoreOneTimeKeys(ctx, key)
if err != nil {
res.KeyError(key.UserID, key.DeviceID, &api.KeyError{
Error: fmt.Sprintf("%s device %s : failed to store one-time keys: %s", key.UserID, key.DeviceID, err.Error()),
Err: fmt.Sprintf("%s device %s : failed to store one-time keys: %s", key.UserID, key.DeviceID, err.Error()),
})
continue
}
// collect counts
res.OneTimeKeyCounts = append(res.OneTimeKeyCounts, *counts)
}
}

View File

@ -63,7 +63,7 @@ func (h *httpKeyInternalAPI) PerformClaimKeys(
err := httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
if err != nil {
response.Error = &api.KeyError{
Error: err.Error(),
Err: err.Error(),
}
}
}
@ -80,7 +80,7 @@ func (h *httpKeyInternalAPI) PerformUploadKeys(
err := httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
if err != nil {
response.Error = &api.KeyError{
Error: err.Error(),
Err: err.Error(),
}
}
}
@ -97,7 +97,7 @@ func (h *httpKeyInternalAPI) QueryKeys(
err := httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
if err != nil {
response.Error = &api.KeyError{
Error: err.Error(),
Err: err.Error(),
}
}
}

View File

@ -16,9 +16,12 @@ package keyserver
import (
"github.com/gorilla/mux"
"github.com/matrix-org/dendrite/internal/config"
"github.com/matrix-org/dendrite/keyserver/api"
"github.com/matrix-org/dendrite/keyserver/internal"
"github.com/matrix-org/dendrite/keyserver/inthttp"
"github.com/matrix-org/dendrite/keyserver/storage"
"github.com/sirupsen/logrus"
)
// AddInternalRoutes registers HTTP handlers for the internal API. Invokes functions
@ -29,6 +32,15 @@ func AddInternalRoutes(router *mux.Router, intAPI api.KeyInternalAPI) {
// NewInternalAPI returns a concerete implementation of the internal API. Callers
// can call functions directly on the returned API or via an HTTP interface using AddInternalRoutes.
func NewInternalAPI() api.KeyInternalAPI {
return &internal.KeyInternalAPI{}
func NewInternalAPI(cfg *config.Dendrite) api.KeyInternalAPI {
db, err := storage.NewDatabase(
string(cfg.Database.E2EKey),
cfg.DbProperties(),
)
if err != nil {
logrus.WithError(err).Panicf("failed to connect to key server database")
}
return &internal.KeyInternalAPI{
DB: db,
}
}

View File

@ -27,7 +27,7 @@ type Database interface {
ExistingOneTimeKeys(ctx context.Context, userID, deviceID string, keyIDsWithAlgorithms []string) (map[string]json.RawMessage, error)
// StoreOneTimeKeys persists the given one-time keys.
StoreOneTimeKeys(ctx context.Context, keys api.OneTimeKeys) error
StoreOneTimeKeys(ctx context.Context, keys api.OneTimeKeys) (*api.OneTimeKeysCount, error)
// DeviceKeysJSON populates the KeyJSON for the given keys. If any proided `keys` have a `KeyJSON` already then it will be replaced.
DeviceKeysJSON(ctx context.Context, keys []api.DeviceKeys) error

View File

@ -0,0 +1,97 @@
// Copyright 2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package postgres
import (
"context"
"database/sql"
"time"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/keyserver/api"
"github.com/matrix-org/dendrite/keyserver/storage/tables"
)
var deviceKeysSchema = `
-- Stores device keys for users
CREATE TABLE IF NOT EXISTS keyserver_device_keys (
user_id TEXT NOT NULL,
device_id TEXT NOT NULL,
ts_added_secs BIGINT NOT NULL,
key_json TEXT NOT NULL,
-- Clobber based on tuple of user/device.
CONSTRAINT keyserver_device_keys_unique UNIQUE (user_id, device_id)
);
`
const upsertDeviceKeysSQL = "" +
"INSERT INTO keyserver_device_keys (user_id, device_id, ts_added_secs, key_json)" +
" VALUES ($1, $2, $3, $4)" +
" ON CONFLICT ON CONSTRAINT keyserver_device_keys_unique" +
" DO UPDATE SET key_json = $4"
const selectDeviceKeysSQL = "" +
"SELECT key_json FROM keyserver_device_keys WHERE user_id=$1 AND device_id=$2"
type deviceKeysStatements struct {
db *sql.DB
upsertDeviceKeysStmt *sql.Stmt
selectDeviceKeysStmt *sql.Stmt
}
func NewPostgresDeviceKeysTable(db *sql.DB) (tables.DeviceKeys, error) {
s := &deviceKeysStatements{
db: db,
}
_, err := db.Exec(deviceKeysSchema)
if err != nil {
return nil, err
}
if s.upsertDeviceKeysStmt, err = db.Prepare(upsertDeviceKeysSQL); err != nil {
return nil, err
}
if s.selectDeviceKeysStmt, err = db.Prepare(selectDeviceKeysSQL); err != nil {
return nil, err
}
return s, nil
}
func (s *deviceKeysStatements) SelectDeviceKeysJSON(ctx context.Context, keys []api.DeviceKeys) error {
for i, key := range keys {
var keyJSONStr string
err := s.selectDeviceKeysStmt.QueryRowContext(ctx, key.UserID, key.DeviceID).Scan(&keyJSONStr)
if err != nil && err != sql.ErrNoRows {
return err
}
// this will be '' when there is no device
keys[i].KeyJSON = []byte(keyJSONStr)
}
return nil
}
func (s *deviceKeysStatements) InsertDeviceKeys(ctx context.Context, keys []api.DeviceKeys) error {
now := time.Now().Unix()
return sqlutil.WithTransaction(s.db, func(txn *sql.Tx) error {
for _, key := range keys {
_, err := txn.Stmt(s.upsertDeviceKeysStmt).ExecContext(
ctx, key.UserID, key.DeviceID, now, string(key.KeyJSON),
)
if err != nil {
return err
}
}
return nil
})
}

View File

@ -0,0 +1,143 @@
// Copyright 2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package postgres
import (
"context"
"database/sql"
"encoding/json"
"time"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/keyserver/api"
"github.com/matrix-org/dendrite/keyserver/storage/tables"
)
var oneTimeKeysSchema = `
-- Stores one-time public keys for users
CREATE TABLE IF NOT EXISTS keyserver_one_time_keys (
user_id TEXT NOT NULL,
device_id TEXT NOT NULL,
key_id TEXT NOT NULL,
algorithm TEXT NOT NULL,
ts_added_secs BIGINT NOT NULL,
key_json TEXT NOT NULL,
-- Clobber based on 4-uple of user/device/key/algorithm.
CONSTRAINT keyserver_one_time_keys_unique UNIQUE (user_id, device_id, key_id, algorithm)
);
`
const upsertKeysSQL = "" +
"INSERT INTO keyserver_one_time_keys (user_id, device_id, key_id, algorithm, ts_added_secs, key_json)" +
" VALUES ($1, $2, $3, $4, $5, $6)" +
" ON CONFLICT ON CONSTRAINT keyserver_one_time_keys_unique" +
" DO UPDATE SET key_json = $6"
const selectKeysSQL = "" +
"SELECT key_id, algorithm, key_json FROM keyserver_one_time_keys WHERE user_id=$1 AND device_id=$2"
const selectKeysCountSQL = "" +
"SELECT algorithm, COUNT(key_id) FROM keyserver_one_time_keys WHERE user_id=$1 AND device_id=$2 GROUP BY algorithm"
type oneTimeKeysStatements struct {
db *sql.DB
upsertKeysStmt *sql.Stmt
selectKeysStmt *sql.Stmt
selectKeysCountStmt *sql.Stmt
}
func NewPostgresOneTimeKeysTable(db *sql.DB) (tables.OneTimeKeys, error) {
s := &oneTimeKeysStatements{
db: db,
}
_, err := db.Exec(oneTimeKeysSchema)
if err != nil {
return nil, err
}
if s.upsertKeysStmt, err = db.Prepare(upsertKeysSQL); err != nil {
return nil, err
}
if s.selectKeysStmt, err = db.Prepare(selectKeysSQL); err != nil {
return nil, err
}
if s.selectKeysCountStmt, err = db.Prepare(selectKeysCountSQL); err != nil {
return nil, err
}
return s, nil
}
func (s *oneTimeKeysStatements) SelectOneTimeKeys(ctx context.Context, userID, deviceID string, keyIDsWithAlgorithms []string) (map[string]json.RawMessage, error) {
rows, err := s.selectKeysStmt.QueryContext(ctx, userID, deviceID)
if err != nil {
return nil, err
}
defer internal.CloseAndLogIfError(ctx, rows, "selectKeysStmt: rows.close() failed")
wantSet := make(map[string]bool, len(keyIDsWithAlgorithms))
for _, ka := range keyIDsWithAlgorithms {
wantSet[ka] = true
}
result := make(map[string]json.RawMessage)
for rows.Next() {
var keyID string
var algorithm string
var keyJSONStr string
if err := rows.Scan(&keyID, &algorithm, &keyJSONStr); err != nil {
return nil, err
}
keyIDWithAlgo := algorithm + ":" + keyID
if wantSet[keyIDWithAlgo] {
result[keyIDWithAlgo] = json.RawMessage(keyJSONStr)
}
}
return result, rows.Err()
}
func (s *oneTimeKeysStatements) InsertOneTimeKeys(ctx context.Context, keys api.OneTimeKeys) (*api.OneTimeKeysCount, error) {
now := time.Now().Unix()
counts := &api.OneTimeKeysCount{
DeviceID: keys.DeviceID,
UserID: keys.UserID,
KeyCount: make(map[string]int),
}
return counts, sqlutil.WithTransaction(s.db, func(txn *sql.Tx) error {
for keyIDWithAlgo, keyJSON := range keys.KeyJSON {
algo, keyID := keys.Split(keyIDWithAlgo)
_, err := txn.Stmt(s.upsertKeysStmt).ExecContext(
ctx, keys.UserID, keys.DeviceID, keyID, algo, now, string(keyJSON),
)
if err != nil {
return err
}
}
rows, err := txn.Stmt(s.selectKeysCountStmt).QueryContext(ctx, keys.UserID, keys.DeviceID)
if err != nil {
return err
}
defer internal.CloseAndLogIfError(ctx, rows, "selectKeysCountStmt: rows.close() failed")
for rows.Next() {
var algorithm string
var count int
if err = rows.Scan(&algorithm, &count); err != nil {
return err
}
counts.KeyCount[algorithm] = count
}
return rows.Err()
})
}

View File

@ -0,0 +1,42 @@
// Copyright 2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package postgres
import (
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/keyserver/storage/shared"
)
// NewDatabase creates a new sync server database
func NewDatabase(dbDataSourceName string, dbProperties sqlutil.DbProperties) (*shared.Database, error) {
var err error
db, err := sqlutil.Open("postgres", dbDataSourceName, dbProperties)
if err != nil {
return nil, err
}
otk, err := NewPostgresOneTimeKeysTable(db)
if err != nil {
return nil, err
}
dk, err := NewPostgresDeviceKeysTable(db)
if err != nil {
return nil, err
}
return &shared.Database{
DB: db,
OneTimeKeysTable: otk,
DeviceKeysTable: dk,
}, nil
}

View File

@ -0,0 +1,46 @@
// Copyright 2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package shared
import (
"context"
"database/sql"
"encoding/json"
"github.com/matrix-org/dendrite/keyserver/api"
"github.com/matrix-org/dendrite/keyserver/storage/tables"
)
type Database struct {
DB *sql.DB
OneTimeKeysTable tables.OneTimeKeys
DeviceKeysTable tables.DeviceKeys
}
func (d *Database) ExistingOneTimeKeys(ctx context.Context, userID, deviceID string, keyIDsWithAlgorithms []string) (map[string]json.RawMessage, error) {
return d.OneTimeKeysTable.SelectOneTimeKeys(ctx, userID, deviceID, keyIDsWithAlgorithms)
}
func (d *Database) StoreOneTimeKeys(ctx context.Context, keys api.OneTimeKeys) (*api.OneTimeKeysCount, error) {
return d.OneTimeKeysTable.InsertOneTimeKeys(ctx, keys)
}
func (d *Database) DeviceKeysJSON(ctx context.Context, keys []api.DeviceKeys) error {
return d.DeviceKeysTable.SelectDeviceKeysJSON(ctx, keys)
}
func (d *Database) StoreDeviceKeys(ctx context.Context, keys []api.DeviceKeys) error {
return d.DeviceKeysTable.InsertDeviceKeys(ctx, keys)
}

View File

@ -0,0 +1,97 @@
// Copyright 2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package sqlite3
import (
"context"
"database/sql"
"time"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/keyserver/api"
"github.com/matrix-org/dendrite/keyserver/storage/tables"
)
var deviceKeysSchema = `
-- Stores device keys for users
CREATE TABLE IF NOT EXISTS keyserver_device_keys (
user_id TEXT NOT NULL,
device_id TEXT NOT NULL,
ts_added_secs BIGINT NOT NULL,
key_json TEXT NOT NULL,
-- Clobber based on tuple of user/device.
UNIQUE (user_id, device_id)
);
`
const upsertDeviceKeysSQL = "" +
"INSERT INTO keyserver_device_keys (user_id, device_id, ts_added_secs, key_json)" +
" VALUES ($1, $2, $3, $4)" +
" ON CONFLICT (user_id, device_id)" +
" DO UPDATE SET key_json = $4"
const selectDeviceKeysSQL = "" +
"SELECT key_json FROM keyserver_device_keys WHERE user_id=$1 AND device_id=$2"
type deviceKeysStatements struct {
db *sql.DB
upsertDeviceKeysStmt *sql.Stmt
selectDeviceKeysStmt *sql.Stmt
}
func NewSqliteDeviceKeysTable(db *sql.DB) (tables.DeviceKeys, error) {
s := &deviceKeysStatements{
db: db,
}
_, err := db.Exec(deviceKeysSchema)
if err != nil {
return nil, err
}
if s.upsertDeviceKeysStmt, err = db.Prepare(upsertDeviceKeysSQL); err != nil {
return nil, err
}
if s.selectDeviceKeysStmt, err = db.Prepare(selectDeviceKeysSQL); err != nil {
return nil, err
}
return s, nil
}
func (s *deviceKeysStatements) SelectDeviceKeysJSON(ctx context.Context, keys []api.DeviceKeys) error {
for i, key := range keys {
var keyJSONStr string
err := s.selectDeviceKeysStmt.QueryRowContext(ctx, key.UserID, key.DeviceID).Scan(&keyJSONStr)
if err != nil && err != sql.ErrNoRows {
return err
}
// this will be '' when there is no device
keys[i].KeyJSON = []byte(keyJSONStr)
}
return nil
}
func (s *deviceKeysStatements) InsertDeviceKeys(ctx context.Context, keys []api.DeviceKeys) error {
now := time.Now().Unix()
return sqlutil.WithTransaction(s.db, func(txn *sql.Tx) error {
for _, key := range keys {
_, err := txn.Stmt(s.upsertDeviceKeysStmt).ExecContext(
ctx, key.UserID, key.DeviceID, now, string(key.KeyJSON),
)
if err != nil {
return err
}
}
return nil
})
}

View File

@ -0,0 +1,143 @@
// Copyright 2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package sqlite3
import (
"context"
"database/sql"
"encoding/json"
"time"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/keyserver/api"
"github.com/matrix-org/dendrite/keyserver/storage/tables"
)
var oneTimeKeysSchema = `
-- Stores one-time public keys for users
CREATE TABLE IF NOT EXISTS keyserver_one_time_keys (
user_id TEXT NOT NULL,
device_id TEXT NOT NULL,
key_id TEXT NOT NULL,
algorithm TEXT NOT NULL,
ts_added_secs BIGINT NOT NULL,
key_json TEXT NOT NULL,
-- Clobber based on 4-uple of user/device/key/algorithm.
UNIQUE (user_id, device_id, key_id, algorithm)
);
`
const upsertKeysSQL = "" +
"INSERT INTO keyserver_one_time_keys (user_id, device_id, key_id, algorithm, ts_added_secs, key_json)" +
" VALUES ($1, $2, $3, $4, $5, $6)" +
" ON CONFLICT (user_id, device_id, key_id, algorithm)" +
" DO UPDATE SET key_json = $6"
const selectKeysSQL = "" +
"SELECT key_id, algorithm, key_json FROM keyserver_one_time_keys WHERE user_id=$1 AND device_id=$2"
const selectKeysCountSQL = "" +
"SELECT algorithm, COUNT(key_id) FROM keyserver_one_time_keys WHERE user_id=$1 AND device_id=$2 GROUP BY algorithm"
type oneTimeKeysStatements struct {
db *sql.DB
upsertKeysStmt *sql.Stmt
selectKeysStmt *sql.Stmt
selectKeysCountStmt *sql.Stmt
}
func NewSqliteOneTimeKeysTable(db *sql.DB) (tables.OneTimeKeys, error) {
s := &oneTimeKeysStatements{
db: db,
}
_, err := db.Exec(oneTimeKeysSchema)
if err != nil {
return nil, err
}
if s.upsertKeysStmt, err = db.Prepare(upsertKeysSQL); err != nil {
return nil, err
}
if s.selectKeysStmt, err = db.Prepare(selectKeysSQL); err != nil {
return nil, err
}
if s.selectKeysCountStmt, err = db.Prepare(selectKeysCountSQL); err != nil {
return nil, err
}
return s, nil
}
func (s *oneTimeKeysStatements) SelectOneTimeKeys(ctx context.Context, userID, deviceID string, keyIDsWithAlgorithms []string) (map[string]json.RawMessage, error) {
rows, err := s.selectKeysStmt.QueryContext(ctx, userID, deviceID)
if err != nil {
return nil, err
}
defer internal.CloseAndLogIfError(ctx, rows, "selectKeysStmt: rows.close() failed")
wantSet := make(map[string]bool, len(keyIDsWithAlgorithms))
for _, ka := range keyIDsWithAlgorithms {
wantSet[ka] = true
}
result := make(map[string]json.RawMessage)
for rows.Next() {
var keyID string
var algorithm string
var keyJSONStr string
if err := rows.Scan(&keyID, &algorithm, &keyJSONStr); err != nil {
return nil, err
}
keyIDWithAlgo := algorithm + ":" + keyID
if wantSet[keyIDWithAlgo] {
result[keyIDWithAlgo] = json.RawMessage(keyJSONStr)
}
}
return result, rows.Err()
}
func (s *oneTimeKeysStatements) InsertOneTimeKeys(ctx context.Context, keys api.OneTimeKeys) (*api.OneTimeKeysCount, error) {
now := time.Now().Unix()
counts := &api.OneTimeKeysCount{
DeviceID: keys.DeviceID,
UserID: keys.UserID,
KeyCount: make(map[string]int),
}
return counts, sqlutil.WithTransaction(s.db, func(txn *sql.Tx) error {
for keyIDWithAlgo, keyJSON := range keys.KeyJSON {
algo, keyID := keys.Split(keyIDWithAlgo)
_, err := txn.Stmt(s.upsertKeysStmt).ExecContext(
ctx, keys.UserID, keys.DeviceID, keyID, algo, now, string(keyJSON),
)
if err != nil {
return err
}
}
rows, err := txn.Stmt(s.selectKeysCountStmt).QueryContext(ctx, keys.UserID, keys.DeviceID)
if err != nil {
return err
}
defer internal.CloseAndLogIfError(ctx, rows, "selectKeysCountStmt: rows.close() failed")
for rows.Next() {
var algorithm string
var count int
if err = rows.Scan(&algorithm, &count); err != nil {
return err
}
counts.KeyCount[algorithm] = count
}
return rows.Err()
})
}

View File

@ -0,0 +1,45 @@
// Copyright 2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package sqlite3
import (
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/keyserver/storage/shared"
)
func NewDatabase(dataSourceName string) (*shared.Database, error) {
var err error
cs, err := sqlutil.ParseFileURI(dataSourceName)
if err != nil {
return nil, err
}
db, err := sqlutil.Open(sqlutil.SQLiteDriverName(), cs, nil)
if err != nil {
return nil, err
}
otk, err := NewSqliteOneTimeKeysTable(db)
if err != nil {
return nil, err
}
dk, err := NewSqliteDeviceKeysTable(db)
if err != nil {
return nil, err
}
return &shared.Database{
DB: db,
OneTimeKeysTable: otk,
DeviceKeysTable: dk,
}, nil
}

View File

@ -0,0 +1,42 @@
// Copyright 2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// +build !wasm
package storage
import (
"net/url"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/keyserver/storage/postgres"
"github.com/matrix-org/dendrite/keyserver/storage/sqlite3"
)
// NewDatabase opens a new Postgres or Sqlite database (based on dataSourceName scheme)
// and sets postgres connection parameters
func NewDatabase(dataSourceName string, dbProperties sqlutil.DbProperties) (Database, error) {
uri, err := url.Parse(dataSourceName)
if err != nil {
return postgres.NewDatabase(dataSourceName, dbProperties)
}
switch uri.Scheme {
case "postgres":
return postgres.NewDatabase(dataSourceName, dbProperties)
case "file":
return sqlite3.NewDatabase(dataSourceName)
default:
return postgres.NewDatabase(dataSourceName, dbProperties)
}
}

View File

@ -0,0 +1,41 @@
// Copyright 2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package storage
import (
"fmt"
"net/url"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/userapi/storage/accounts/sqlite3"
)
func NewDatabase(
dataSourceName string,
dbProperties sqlutil.DbProperties, // nolint:unparam
) (Database, error) {
uri, err := url.Parse(dataSourceName)
if err != nil {
return nil, fmt.Errorf("Cannot use postgres implementation")
}
switch uri.Scheme {
case "postgres":
return nil, fmt.Errorf("Cannot use postgres implementation")
case "file":
return sqlite3.NewDatabase(dataSourceName)
default:
return nil, fmt.Errorf("Cannot use postgres implementation")
}
}

View File

@ -0,0 +1,32 @@
// Copyright 2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package tables
import (
"context"
"encoding/json"
"github.com/matrix-org/dendrite/keyserver/api"
)
type OneTimeKeys interface {
SelectOneTimeKeys(ctx context.Context, userID, deviceID string, keyIDsWithAlgorithms []string) (map[string]json.RawMessage, error)
InsertOneTimeKeys(ctx context.Context, keys api.OneTimeKeys) (*api.OneTimeKeysCount, error)
}
type DeviceKeys interface {
SelectDeviceKeysJSON(ctx context.Context, keys []api.DeviceKeys) error
InsertDeviceKeys(ctx context.Context, keys []api.DeviceKeys) error
}

View File

@ -52,5 +52,12 @@ Remote invited user can see room metadata
# this test thinks it's all fine...
Inbound federation accepts a second soft-failed event
# Relies on a rejected PL event which will never be accepted into the DAG
# Caused by https://github.com/matrix-org/sytest/pull/911
Outbound federation requests missing prev_events and then asks for /state_ids and resolves the state
# We don't implement device lists yet
Device list doesn't change if remote server is down
# We don't implement lazy membership loading yet.
The only membership state included in a gapped incremental sync is for senders in the timeline

View File

@ -119,6 +119,7 @@ Newly banned rooms appear in the leave section of incremental sync
Newly banned rooms appear in the leave section of incremental sync
local user can join room with version 1
User can invite local user to room with version 1
Can upload device keys
Should reject keys claiming to belong to a different user
Can add account data
Can add account data to room
@ -280,7 +281,6 @@ Inbound federation of state requires event_id as a mandatory paramater
Inbound federation can get state_ids for a room
Inbound federation of state_ids requires event_id as a mandatory paramater
Federation rejects inbound events where the prev_events cannot be found
Outbound federation requests missing prev_events and then asks for /state_ids and resolves the state
Alternative server names do not cause a routing loop
Events whose auth_events are in the wrong room do not mess up the room state
Inbound federation can return events