Implement backfill in the roomserver (#983)

* Initial cut for backfilling

The syncserver now asks the roomserver via QueryBackfill (which already
existed to *handle* backfill requests) which then makes federation requests
via gomatrixserverlib.RequestBackfill.

Currently, tests fail on subsequent /messages requests because we don't know
which servers are in the room, because we are unable to get state snapshots
from a backfilled event because that code doesn't exist yet.

* WIP backfill, doesn't work

* Make initial backfill pass checks

* Persist backfilled events with state snapshots

* Remove debug lines

* Linting

* Review comments
This commit is contained in:
Kegsay 2020-04-28 11:46:47 +01:00 committed by GitHub
parent 3a858afca2
commit 6d832ae544
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 367 additions and 193 deletions

View File

@ -148,7 +148,7 @@ func main() {
federation := createFederationClient(base) federation := createFederationClient(base)
keyRing := keydb.CreateKeyRing(federation.Client, keyDB, cfg.Matrix.KeyPerspectives) keyRing := keydb.CreateKeyRing(federation.Client, keyDB, cfg.Matrix.KeyPerspectives)
alias, input, query := roomserver.SetupRoomServerComponent(&base.Base) alias, input, query := roomserver.SetupRoomServerComponent(&base.Base, keyRing)
eduInputAPI := eduserver.SetupEDUServerComponent(&base.Base, cache.New()) eduInputAPI := eduserver.SetupEDUServerComponent(&base.Base, cache.New())
asQuery := appservice.SetupAppServiceAPIComponent( asQuery := appservice.SetupAppServiceAPIComponent(
&base.Base, accountDB, deviceDB, federation, alias, query, transactions.New(), &base.Base, accountDB, deviceDB, federation, alias, query, transactions.New(),

View File

@ -57,7 +57,7 @@ func main() {
federation := base.CreateFederationClient() federation := base.CreateFederationClient()
keyRing := keydb.CreateKeyRing(federation.Client, keyDB, cfg.Matrix.KeyPerspectives) keyRing := keydb.CreateKeyRing(federation.Client, keyDB, cfg.Matrix.KeyPerspectives)
alias, input, query := roomserver.SetupRoomServerComponent(base) alias, input, query := roomserver.SetupRoomServerComponent(base, keyRing)
eduInputAPI := eduserver.SetupEDUServerComponent(base, cache.New()) eduInputAPI := eduserver.SetupEDUServerComponent(base, cache.New())
asQuery := appservice.SetupAppServiceAPIComponent( asQuery := appservice.SetupAppServiceAPIComponent(
base, accountDB, deviceDB, federation, alias, query, transactions.New(), base, accountDB, deviceDB, federation, alias, query, transactions.New(),

View File

@ -18,6 +18,7 @@ import (
_ "net/http/pprof" _ "net/http/pprof"
"github.com/matrix-org/dendrite/common/basecomponent" "github.com/matrix-org/dendrite/common/basecomponent"
"github.com/matrix-org/dendrite/common/keydb"
"github.com/matrix-org/dendrite/roomserver" "github.com/matrix-org/dendrite/roomserver"
) )
@ -25,8 +26,11 @@ func main() {
cfg := basecomponent.ParseFlags() cfg := basecomponent.ParseFlags()
base := basecomponent.NewBaseDendrite(cfg, "RoomServerAPI") base := basecomponent.NewBaseDendrite(cfg, "RoomServerAPI")
defer base.Close() // nolint: errcheck defer base.Close() // nolint: errcheck
keyDB := base.CreateKeyDB()
federation := base.CreateFederationClient()
keyRing := keydb.CreateKeyRing(federation.Client, keyDB, cfg.Matrix.KeyPerspectives)
roomserver.SetupRoomServerComponent(base) roomserver.SetupRoomServerComponent(base, keyRing)
base.SetupAndServeHTTP(string(base.Cfg.Bind.RoomServer), string(base.Cfg.Listen.RoomServer)) base.SetupAndServeHTTP(string(base.Cfg.Bind.RoomServer), string(base.Cfg.Listen.RoomServer))

View File

@ -69,6 +69,7 @@ func Backfill(
// Populate the request. // Populate the request.
req := api.QueryBackfillRequest{ req := api.QueryBackfillRequest{
RoomID: roomID,
EarliestEventsIDs: eIDs, EarliestEventsIDs: eIDs,
ServerName: request.Origin(), ServerName: request.Origin(),
} }

1
go.mod
View File

@ -26,6 +26,7 @@ require (
github.com/opentracing/opentracing-go v1.1.0 github.com/opentracing/opentracing-go v1.1.0
github.com/pkg/errors v0.9.1 github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.4.1 github.com/prometheus/client_golang v1.4.1
github.com/prometheus/common v0.9.1
github.com/sirupsen/logrus v1.4.2 github.com/sirupsen/logrus v1.4.2
github.com/tidwall/gjson v1.6.0 github.com/tidwall/gjson v1.6.0
github.com/uber/jaeger-client-go v2.15.0+incompatible github.com/uber/jaeger-client-go v2.15.0+incompatible

11
go.sum
View File

@ -367,6 +367,16 @@ github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26 h1:Hr3zjRsq2bh
github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26/go.mod h1:3fxX6gUjWyI/2Bt7J1OLhpCzOfO/bB3AiX0cJtEKud0= github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26/go.mod h1:3fxX6gUjWyI/2Bt7J1OLhpCzOfO/bB3AiX0cJtEKud0=
github.com/matrix-org/gomatrixserverlib v0.0.0-20200124100636-0c2ec91d1df5 h1:kmRjpmFOenVpOaV/DRlo9p6z/IbOKlUC+hhKsAAh8Qg= github.com/matrix-org/gomatrixserverlib v0.0.0-20200124100636-0c2ec91d1df5 h1:kmRjpmFOenVpOaV/DRlo9p6z/IbOKlUC+hhKsAAh8Qg=
github.com/matrix-org/gomatrixserverlib v0.0.0-20200124100636-0c2ec91d1df5/go.mod h1:FsKa2pWE/bpQql9H7U4boOPXFoJX/QcqaZZ6ijLkaZI= github.com/matrix-org/gomatrixserverlib v0.0.0-20200124100636-0c2ec91d1df5/go.mod h1:FsKa2pWE/bpQql9H7U4boOPXFoJX/QcqaZZ6ijLkaZI=
github.com/matrix-org/gomatrixserverlib v0.0.0-20200422082552-d7b4202c47f3 h1:xis1ojN99vjygwqudzB9VQq3cM2SJ7aCAMlXj/YN+88=
github.com/matrix-org/gomatrixserverlib v0.0.0-20200422082552-d7b4202c47f3/go.mod h1:JsAzE1Ll3+gDWS9JSUHPJiiyAksvOOnGWF2nXdg4ZzU=
github.com/matrix-org/gomatrixserverlib v0.0.0-20200423090438-562549dbe799 h1:OsoUMTirIpeuZJdYkKKiYe6jm0E5viZR7aOS9K465QI=
github.com/matrix-org/gomatrixserverlib v0.0.0-20200423090438-562549dbe799/go.mod h1:JsAzE1Ll3+gDWS9JSUHPJiiyAksvOOnGWF2nXdg4ZzU=
github.com/matrix-org/gomatrixserverlib v0.0.0-20200424154222-2827b39252bd h1:243fMfK0XqTQsdUY3IIqtxPX5g9MfPTaAP92CseqOek=
github.com/matrix-org/gomatrixserverlib v0.0.0-20200424154222-2827b39252bd/go.mod h1:JsAzE1Ll3+gDWS9JSUHPJiiyAksvOOnGWF2nXdg4ZzU=
github.com/matrix-org/gomatrixserverlib v0.0.0-20200427134702-21db6d1430e3 h1:aJMAKjfXG5I8TqPxJQbQIkGSWM770oxkpgsPHE8C06E=
github.com/matrix-org/gomatrixserverlib v0.0.0-20200427134702-21db6d1430e3/go.mod h1:JsAzE1Ll3+gDWS9JSUHPJiiyAksvOOnGWF2nXdg4ZzU=
github.com/matrix-org/gomatrixserverlib v0.0.0-20200427152419-6a0535cc473a h1:tlXCVU3eab9kksGYBRA3oyrmIRwD/aPujo5KJCdlCVQ=
github.com/matrix-org/gomatrixserverlib v0.0.0-20200427152419-6a0535cc473a/go.mod h1:JsAzE1Ll3+gDWS9JSUHPJiiyAksvOOnGWF2nXdg4ZzU=
github.com/matrix-org/gomatrixserverlib v0.0.0-20200428095012-a95e289995b1 h1:TB4V69eOtvmHdFp0+BgLNrDCcCwq6QDUOTjmi8fjC/M= github.com/matrix-org/gomatrixserverlib v0.0.0-20200428095012-a95e289995b1 h1:TB4V69eOtvmHdFp0+BgLNrDCcCwq6QDUOTjmi8fjC/M=
github.com/matrix-org/gomatrixserverlib v0.0.0-20200428095012-a95e289995b1/go.mod h1:JsAzE1Ll3+gDWS9JSUHPJiiyAksvOOnGWF2nXdg4ZzU= github.com/matrix-org/gomatrixserverlib v0.0.0-20200428095012-a95e289995b1/go.mod h1:JsAzE1Ll3+gDWS9JSUHPJiiyAksvOOnGWF2nXdg4ZzU=
github.com/matrix-org/naffka v0.0.0-20200127221512-0716baaabaf1 h1:osLoFdOy+ChQqVUn2PeTDETFftVkl4w9t/OW18g3lnk= github.com/matrix-org/naffka v0.0.0-20200127221512-0716baaabaf1 h1:osLoFdOy+ChQqVUn2PeTDETFftVkl4w9t/OW18g3lnk=
@ -677,6 +687,7 @@ google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZi
google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38= google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38=
gopkg.in/Shopify/sarama.v1 v1.20.1 h1:Gi09A3fJXm0Jgt8kuKZ8YK+r60GfYn7MQuEmI3oq6hE= gopkg.in/Shopify/sarama.v1 v1.20.1 h1:Gi09A3fJXm0Jgt8kuKZ8YK+r60GfYn7MQuEmI3oq6hE=
gopkg.in/Shopify/sarama.v1 v1.20.1/go.mod h1:AxnvoaevB2nBjNK17cG61A3LleFcWFwVBHBt+cot4Oc= gopkg.in/Shopify/sarama.v1 v1.20.1/go.mod h1:AxnvoaevB2nBjNK17cG61A3LleFcWFwVBHBt+cot4Oc=
gopkg.in/alecthomas/kingpin.v2 v2.2.6 h1:jMFz6MfLP0/4fUyZle81rXUoxOBFi19VUFKVDOQfozc=
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=

View File

@ -229,6 +229,8 @@ type QueryStateAndAuthChainResponse struct {
// QueryBackfillRequest is a request to QueryBackfill. // QueryBackfillRequest is a request to QueryBackfill.
type QueryBackfillRequest struct { type QueryBackfillRequest struct {
// The room to backfill
RoomID string `json:"room_id"`
// Events to start paginating from. // Events to start paginating from.
EarliestEventsIDs []string `json:"earliest_event_ids"` EarliestEventsIDs []string `json:"earliest_event_ids"`
// The maximum number of events to retrieve. // The maximum number of events to retrieve.
@ -243,21 +245,7 @@ type QueryBackfillResponse struct {
Events []gomatrixserverlib.HeaderedEvent `json:"events"` Events []gomatrixserverlib.HeaderedEvent `json:"events"`
} }
// QueryServersInRoomAtEventRequest is a request to QueryServersInRoomAtEvent // QueryRoomVersionCapabilitiesRequest asks for the default room version
type QueryServersInRoomAtEventRequest struct {
// ID of the room to retrieve member servers for.
RoomID string `json:"room_id"`
// ID of the event for which to retrieve member servers.
EventID string `json:"event_id"`
}
// QueryServersInRoomAtEventResponse is a response to QueryServersInRoomAtEvent
type QueryServersInRoomAtEventResponse struct {
// Servers present in the room for these events.
Servers []gomatrixserverlib.ServerName `json:"servers"`
}
// QueryRoomVersionCapabilities asks for the default room version
type QueryRoomVersionCapabilitiesRequest struct{} type QueryRoomVersionCapabilitiesRequest struct{}
// QueryRoomVersionCapabilitiesResponse is a response to QueryRoomVersionCapabilitiesRequest // QueryRoomVersionCapabilitiesResponse is a response to QueryRoomVersionCapabilitiesRequest
@ -266,12 +254,12 @@ type QueryRoomVersionCapabilitiesResponse struct {
AvailableRoomVersions map[gomatrixserverlib.RoomVersion]string `json:"available"` AvailableRoomVersions map[gomatrixserverlib.RoomVersion]string `json:"available"`
} }
// QueryRoomVersionForRoom asks for the room version for a given room. // QueryRoomVersionForRoomRequest asks for the room version for a given room.
type QueryRoomVersionForRoomRequest struct { type QueryRoomVersionForRoomRequest struct {
RoomID string `json:"room_id"` RoomID string `json:"room_id"`
} }
// QueryRoomVersionCapabilitiesResponse is a response to QueryServersInRoomAtEventResponse // QueryRoomVersionForRoomResponse is a response to QueryRoomVersionForRoomRequest
type QueryRoomVersionForRoomResponse struct { type QueryRoomVersionForRoomResponse struct {
RoomVersion gomatrixserverlib.RoomVersion `json:"room_version"` RoomVersion gomatrixserverlib.RoomVersion `json:"room_version"`
} }
@ -350,12 +338,6 @@ type RoomserverQueryAPI interface {
response *QueryBackfillResponse, response *QueryBackfillResponse,
) error ) error
QueryServersInRoomAtEvent(
ctx context.Context,
request *QueryServersInRoomAtEventRequest,
response *QueryServersInRoomAtEventResponse,
) error
// Asks for the default room version as preferred by the server. // Asks for the default room version as preferred by the server.
QueryRoomVersionCapabilities( QueryRoomVersionCapabilities(
ctx context.Context, ctx context.Context,
@ -401,13 +383,10 @@ const RoomserverQueryStateAndAuthChainPath = "/api/roomserver/queryStateAndAuthC
// RoomserverQueryBackfillPath is the HTTP path for the QueryBackfillPath API // RoomserverQueryBackfillPath is the HTTP path for the QueryBackfillPath API
const RoomserverQueryBackfillPath = "/api/roomserver/queryBackfill" const RoomserverQueryBackfillPath = "/api/roomserver/queryBackfill"
// RoomserverQueryServersInRoomAtEventPath is the HTTP path for the QueryServersInRoomAtEvent API
const RoomserverQueryServersInRoomAtEventPath = "/api/roomserver/queryServersInRoomAtEvents"
// RoomserverQueryRoomVersionCapabilitiesPath is the HTTP path for the QueryRoomVersionCapabilities API // RoomserverQueryRoomVersionCapabilitiesPath is the HTTP path for the QueryRoomVersionCapabilities API
const RoomserverQueryRoomVersionCapabilitiesPath = "/api/roomserver/queryRoomVersionCapabilities" const RoomserverQueryRoomVersionCapabilitiesPath = "/api/roomserver/queryRoomVersionCapabilities"
// RoomserverQueryRoomVersionCapabilitiesPath is the HTTP path for the QueryRoomVersionCapabilities API // RoomserverQueryRoomVersionForRoomPath is the HTTP path for the QueryRoomVersionForRoom API
const RoomserverQueryRoomVersionForRoomPath = "/api/roomserver/queryRoomVersionForRoom" const RoomserverQueryRoomVersionForRoomPath = "/api/roomserver/queryRoomVersionForRoom"
// NewRoomserverQueryAPIHTTP creates a RoomserverQueryAPI implemented by talking to a HTTP POST API. // NewRoomserverQueryAPIHTTP creates a RoomserverQueryAPI implemented by talking to a HTTP POST API.
@ -555,19 +534,6 @@ func (h *httpRoomserverQueryAPI) QueryBackfill(
return commonHTTP.PostJSON(ctx, span, h.httpClient, apiURL, request, response) return commonHTTP.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
} }
// QueryServersInRoomAtEvent implements RoomServerQueryAPI
func (h *httpRoomserverQueryAPI) QueryServersInRoomAtEvent(
ctx context.Context,
request *QueryServersInRoomAtEventRequest,
response *QueryServersInRoomAtEventResponse,
) error {
span, ctx := opentracing.StartSpanFromContext(ctx, "QueryServersInRoomAtEvent")
defer span.Finish()
apiURL := h.roomserverURL + RoomserverQueryServersInRoomAtEventPath
return commonHTTP.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
}
// QueryRoomVersionCapabilities implements RoomServerQueryAPI // QueryRoomVersionCapabilities implements RoomServerQueryAPI
func (h *httpRoomserverQueryAPI) QueryRoomVersionCapabilities( func (h *httpRoomserverQueryAPI) QueryRoomVersionCapabilities(
ctx context.Context, ctx context.Context,

View File

@ -26,6 +26,7 @@ import (
"github.com/matrix-org/dendrite/roomserver/storage" "github.com/matrix-org/dendrite/roomserver/storage"
"github.com/matrix-org/dendrite/roomserver/types" "github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
"github.com/sirupsen/logrus"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
) )
@ -53,6 +54,7 @@ func processRoomEvent(
// Check that the event passes authentication checks and work out the numeric IDs for the auth events. // Check that the event passes authentication checks and work out the numeric IDs for the auth events.
authEventNIDs, err := checkAuthEvents(ctx, db, headered, input.AuthEventIDs) authEventNIDs, err := checkAuthEvents(ctx, db, headered, input.AuthEventIDs)
if err != nil { if err != nil {
logrus.WithError(err).WithField("event_id", event.EventID()).Error("processRoomEvent.checkAuthEvents failed for event")
return return
} }
@ -77,6 +79,7 @@ func processRoomEvent(
// For outliers we can stop after we've stored the event itself as it // For outliers we can stop after we've stored the event itself as it
// doesn't have any associated state to store and we don't need to // doesn't have any associated state to store and we don't need to
// notify anyone about it. // notify anyone about it.
logrus.WithField("event_id", event.EventID()).WithField("type", event.Type()).WithField("room", event.RoomID()).Info("Stored outlier")
return event.EventID(), nil return event.EventID(), nil
} }
@ -89,11 +92,6 @@ func processRoomEvent(
} }
} }
if input.Kind == api.KindBackfill {
// Backfill is not implemented.
panic("Not implemented")
}
// Update the extremities of the event graph for the room // Update the extremities of the event graph for the room
return event.EventID(), updateLatestEvents( return event.EventID(), updateLatestEvents(
ctx, db, ow, roomNID, stateAtEvent, event, input.SendAsServer, input.TransactionID, ctx, db, ow, roomNID, stateAtEvent, event, input.SendAsServer, input.TransactionID,

View File

@ -0,0 +1,217 @@
package query
import (
"context"
"github.com/matrix-org/dendrite/roomserver/storage"
"github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/gomatrixserverlib"
"github.com/sirupsen/logrus"
)
// backfillRequester implements gomatrixserverlib.BackfillRequester
type backfillRequester struct {
db storage.Database
fedClient *gomatrixserverlib.FederationClient
thisServer gomatrixserverlib.ServerName
// per-request state
servers []gomatrixserverlib.ServerName
eventIDToBeforeStateIDs map[string][]string
eventIDMap map[string]gomatrixserverlib.Event
}
func newBackfillRequester(db storage.Database, fedClient *gomatrixserverlib.FederationClient, thisServer gomatrixserverlib.ServerName) *backfillRequester {
return &backfillRequester{
db: db,
fedClient: fedClient,
thisServer: thisServer,
eventIDToBeforeStateIDs: make(map[string][]string),
eventIDMap: make(map[string]gomatrixserverlib.Event),
}
}
func (b *backfillRequester) StateIDsBeforeEvent(ctx context.Context, targetEvent gomatrixserverlib.HeaderedEvent) ([]string, error) {
b.eventIDMap[targetEvent.EventID()] = targetEvent.Unwrap()
if ids, ok := b.eventIDToBeforeStateIDs[targetEvent.EventID()]; ok {
return ids, nil
}
// if we have exactly 1 prev event and we know the state of the room at that prev event, then just roll forward the prev event.
// Else, we have to hit /state_ids because either we don't know the state at all at this event (new backwards extremity) or
// we don't know the result of state res to merge forks (2 or more prev_events)
if len(targetEvent.PrevEventIDs()) == 1 {
prevEventID := targetEvent.PrevEventIDs()[0]
prevEvent, ok := b.eventIDMap[prevEventID]
if !ok {
goto FederationHit
}
prevEventStateIDs, ok := b.eventIDToBeforeStateIDs[prevEventID]
if !ok {
goto FederationHit
}
newStateIDs := b.calculateNewStateIDs(targetEvent.Unwrap(), prevEvent, prevEventStateIDs)
if newStateIDs != nil {
b.eventIDToBeforeStateIDs[targetEvent.EventID()] = newStateIDs
return newStateIDs, nil
}
// else we failed to calculate the new state, so fallthrough
}
FederationHit:
var lastErr error
logrus.WithField("event_id", targetEvent.EventID()).Info("Requesting /state_ids at event")
for _, srv := range b.servers { // hit any valid server
c := gomatrixserverlib.FederatedStateProvider{
FedClient: b.fedClient,
AuthEventsOnly: false,
Server: srv,
}
res, err := c.StateIDsBeforeEvent(ctx, targetEvent)
if err != nil {
lastErr = err
continue
}
b.eventIDToBeforeStateIDs[targetEvent.EventID()] = res
return res, nil
}
return nil, lastErr
}
func (b *backfillRequester) calculateNewStateIDs(targetEvent, prevEvent gomatrixserverlib.Event, prevEventStateIDs []string) []string {
newStateIDs := prevEventStateIDs[:]
if prevEvent.StateKey() == nil {
// state is the same as the previous event
b.eventIDToBeforeStateIDs[targetEvent.EventID()] = newStateIDs
return newStateIDs
}
missingState := false // true if we are missing the info for a state event ID
foundEvent := false // true if we found a (type, state_key) match
// find which state ID to replace, if any
for i, id := range newStateIDs {
ev, ok := b.eventIDMap[id]
if !ok {
missingState = true
continue
}
// The state IDs BEFORE the target event are the state IDs BEFORE the prev_event PLUS the prev_event itself
if ev.Type() == prevEvent.Type() && ev.StateKey() != nil && *ev.StateKey() == *prevEvent.StateKey() {
newStateIDs[i] = prevEvent.EventID()
foundEvent = true
break
}
}
if !foundEvent && !missingState {
// we can be certain that this is new state
newStateIDs = append(newStateIDs, prevEvent.EventID())
foundEvent = true
}
if foundEvent {
b.eventIDToBeforeStateIDs[targetEvent.EventID()] = newStateIDs
return newStateIDs
}
return nil
}
func (b *backfillRequester) StateBeforeEvent(ctx context.Context, roomVer gomatrixserverlib.RoomVersion, event gomatrixserverlib.HeaderedEvent, eventIDs []string) (map[string]*gomatrixserverlib.Event, error) {
// try to fetch the events from the database first
events, err := b.ProvideEvents(roomVer, eventIDs)
if err != nil {
// non-fatal, fallthrough
logrus.WithError(err).Info("Failed to fetch events")
} else {
logrus.Infof("Fetched %d/%d events from the database", len(events), len(eventIDs))
if len(events) == len(eventIDs) {
result := make(map[string]*gomatrixserverlib.Event)
for i := range events {
result[events[i].EventID()] = &events[i]
b.eventIDMap[events[i].EventID()] = events[i]
}
return result, nil
}
}
c := gomatrixserverlib.FederatedStateProvider{
FedClient: b.fedClient,
AuthEventsOnly: false,
Server: b.servers[0],
}
result, err := c.StateBeforeEvent(ctx, roomVer, event, eventIDs)
if err != nil {
return nil, err
}
for eventID, ev := range result {
b.eventIDMap[eventID] = *ev
}
return result, nil
}
// ServersAtEvent is called when trying to determine which server to request from.
// It returns a list of servers which can be queried for backfill requests. These servers
// will be servers that are in the room already. The entries at the beginning are preferred servers
// and will be tried first. An empty list will fail the request.
func (b *backfillRequester) ServersAtEvent(ctx context.Context, roomID, eventID string) (servers []gomatrixserverlib.ServerName) {
// getMembershipsBeforeEventNID requires a NID, so retrieving the NID for
// the event is necessary.
NIDs, err := b.db.EventNIDs(ctx, []string{eventID})
if err != nil {
logrus.WithField("event_id", eventID).WithError(err).Error("ServersAtEvent: failed to get event NID for event")
return
}
// Retrieve all "m.room.member" state events of "join" membership, which
// contains the list of users in the room before the event, therefore all
// the servers in it at that moment.
events, err := getMembershipsBeforeEventNID(ctx, b.db, NIDs[eventID], true)
if err != nil {
logrus.WithField("event_id", eventID).WithError(err).Error("ServersAtEvent: failed to get memberships before event")
return
}
// Store the server names in a temporary map to avoid duplicates.
serverSet := make(map[gomatrixserverlib.ServerName]bool)
for _, event := range events {
serverSet[event.Origin()] = true
}
for server := range serverSet {
if server == b.thisServer {
continue
}
servers = append(servers, server)
}
b.servers = servers
return
}
// Backfill performs a backfill request to the given server.
// https://matrix.org/docs/spec/server_server/latest#get-matrix-federation-v1-backfill-roomid
func (b *backfillRequester) Backfill(ctx context.Context, server gomatrixserverlib.ServerName, roomID string, fromEventIDs []string, limit int) (*gomatrixserverlib.Transaction, error) {
tx, err := b.fedClient.Backfill(ctx, server, roomID, limit, fromEventIDs)
return &tx, err
}
func (b *backfillRequester) ProvideEvents(roomVer gomatrixserverlib.RoomVersion, eventIDs []string) ([]gomatrixserverlib.Event, error) {
ctx := context.Background()
nidMap, err := b.db.EventNIDs(ctx, eventIDs)
if err != nil {
logrus.WithError(err).WithField("event_ids", eventIDs).Error("Failed to find events")
return nil, err
}
eventNIDs := make([]types.EventNID, len(nidMap))
i := 0
for _, nid := range nidMap {
eventNIDs[i] = nid
i++
}
eventsWithNids, err := b.db.Events(ctx, eventNIDs)
if err != nil {
logrus.WithError(err).WithField("event_nids", eventNIDs).Error("Failed to load events")
return nil, err
}
events := make([]gomatrixserverlib.Event, len(eventsWithNids))
for i := range eventsWithNids {
events[i] = eventsWithNids[i].Event
}
return events, nil
}

View File

@ -19,6 +19,7 @@ package query
import ( import (
"context" "context"
"encoding/json" "encoding/json"
"fmt"
"net/http" "net/http"
"github.com/matrix-org/dendrite/common" "github.com/matrix-org/dendrite/common"
@ -31,12 +32,16 @@ import (
"github.com/matrix-org/dendrite/roomserver/version" "github.com/matrix-org/dendrite/roomserver/version"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util" "github.com/matrix-org/util"
"github.com/sirupsen/logrus"
) )
// RoomserverQueryAPI is an implementation of api.RoomserverQueryAPI // RoomserverQueryAPI is an implementation of api.RoomserverQueryAPI
type RoomserverQueryAPI struct { type RoomserverQueryAPI struct {
DB storage.Database DB storage.Database
ImmutableCache caching.ImmutableCache ImmutableCache caching.ImmutableCache
ServerName gomatrixserverlib.ServerName
KeyRing gomatrixserverlib.JSONVerifier
FedClient *gomatrixserverlib.FederationClient
} }
// QueryLatestEventsAndState implements api.RoomserverQueryAPI // QueryLatestEventsAndState implements api.RoomserverQueryAPI
@ -281,7 +286,7 @@ func (r *RoomserverQueryAPI) QueryMembershipsForRoom(
events, err = r.DB.Events(ctx, eventNIDs) events, err = r.DB.Events(ctx, eventNIDs)
} else { } else {
events, err = r.getMembershipsBeforeEventNID(ctx, membershipEventNID, request.JoinedOnly) events, err = getMembershipsBeforeEventNID(ctx, r.DB, membershipEventNID, request.JoinedOnly)
} }
if err != nil { if err != nil {
@ -300,19 +305,19 @@ func (r *RoomserverQueryAPI) QueryMembershipsForRoom(
// of the event's room as it was when this event was fired, then filters the state events to // of the event's room as it was when this event was fired, then filters the state events to
// only keep the "m.room.member" events with a "join" membership. These events are returned. // only keep the "m.room.member" events with a "join" membership. These events are returned.
// Returns an error if there was an issue fetching the events. // Returns an error if there was an issue fetching the events.
func (r *RoomserverQueryAPI) getMembershipsBeforeEventNID( func getMembershipsBeforeEventNID(
ctx context.Context, eventNID types.EventNID, joinedOnly bool, ctx context.Context, db storage.Database, eventNID types.EventNID, joinedOnly bool,
) ([]types.Event, error) { ) ([]types.Event, error) {
roomState := state.NewStateResolution(r.DB) roomState := state.NewStateResolution(db)
events := []types.Event{} events := []types.Event{}
// Lookup the event NID // Lookup the event NID
eIDs, err := r.DB.EventIDs(ctx, []types.EventNID{eventNID}) eIDs, err := db.EventIDs(ctx, []types.EventNID{eventNID})
if err != nil { if err != nil {
return nil, err return nil, err
} }
eventIDs := []string{eIDs[eventNID]} eventIDs := []string{eIDs[eventNID]}
prevState, err := r.DB.StateAtEventIDs(ctx, eventIDs) prevState, err := db.StateAtEventIDs(ctx, eventIDs)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -332,7 +337,7 @@ func (r *RoomserverQueryAPI) getMembershipsBeforeEventNID(
} }
// Get all of the events in this state // Get all of the events in this state
stateEvents, err := r.DB.Events(ctx, eventNIDs) stateEvents, err := db.Events(ctx, eventNIDs)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -484,6 +489,13 @@ func (r *RoomserverQueryAPI) QueryBackfill(
request *api.QueryBackfillRequest, request *api.QueryBackfillRequest,
response *api.QueryBackfillResponse, response *api.QueryBackfillResponse,
) error { ) error {
// if we are requesting the backfill then we need to do a federation hit
// TODO: we could be more sensible and fetch as many events we already have then request the rest
// which is what the syncapi does already.
if request.ServerName == r.ServerName {
return r.backfillViaFederation(ctx, request, response)
}
// someone else is requesting the backfill, try to service their request.
var err error var err error
var front []string var front []string
@ -525,6 +537,55 @@ func (r *RoomserverQueryAPI) QueryBackfill(
return err return err
} }
func (r *RoomserverQueryAPI) backfillViaFederation(ctx context.Context, req *api.QueryBackfillRequest, res *api.QueryBackfillResponse) error {
roomVer, err := r.DB.GetRoomVersionForRoom(ctx, req.RoomID)
if err != nil {
return fmt.Errorf("backfillViaFederation: unknown room version for room %s : %w", req.RoomID, err)
}
requester := newBackfillRequester(r.DB, r.FedClient, r.ServerName)
events, err := gomatrixserverlib.RequestBackfill(
ctx, requester,
r.KeyRing, req.RoomID, roomVer, req.EarliestEventsIDs, req.Limit)
if err != nil {
return err
}
logrus.WithField("room_id", req.RoomID).Infof("backfilled %d events", len(events))
// persist these new events - auth checks have already been done
roomNID, backfilledEventMap := persistEvents(ctx, r.DB, events)
if err != nil {
return err
}
for _, ev := range backfilledEventMap {
// now add state for these events
stateIDs, ok := requester.eventIDToBeforeStateIDs[ev.EventID()]
if !ok {
// this should be impossible as all events returned must have pass Step 5 of the PDU checks
// which requires a list of state IDs.
logrus.WithError(err).WithField("event_id", ev.EventID()).Error("Failed to find state IDs for event which passed auth checks")
continue
}
var entries []types.StateEntry
if entries, err = r.DB.StateEntriesForEventIDs(ctx, stateIDs); err != nil {
return err
}
var beforeStateSnapshotNID types.StateSnapshotNID
if beforeStateSnapshotNID, err = r.DB.AddState(ctx, roomNID, nil, entries); err != nil {
return err
}
if err = r.DB.SetState(ctx, ev.EventNID, beforeStateSnapshotNID); err != nil {
logrus.WithError(err).WithField("event_id", ev.EventID()).Error("Failed to set state before event")
}
}
// TODO: update backwards extremities, as that should be moved from syncapi to roomserver at some point.
res.Events = events
return nil
}
func (r *RoomserverQueryAPI) isServerCurrentlyInRoom(ctx context.Context, serverName gomatrixserverlib.ServerName, roomID string) (bool, error) { func (r *RoomserverQueryAPI) isServerCurrentlyInRoom(ctx context.Context, serverName gomatrixserverlib.ServerName, roomID string) (bool, error) {
roomNID, err := r.DB.RoomNID(ctx, roomID) roomNID, err := r.DB.RoomNID(ctx, roomID)
if err != nil { if err != nil {
@ -778,39 +839,33 @@ func getAuthChain(
return authEvents, nil return authEvents, nil
} }
// QueryServersInRoomAtEvent implements api.RoomserverQueryAPI func persistEvents(ctx context.Context, db storage.Database, events []gomatrixserverlib.HeaderedEvent) (types.RoomNID, map[string]types.Event) {
func (r *RoomserverQueryAPI) QueryServersInRoomAtEvent( var roomNID types.RoomNID
ctx context.Context, backfilledEventMap := make(map[string]types.Event)
request *api.QueryServersInRoomAtEventRequest, for _, ev := range events {
response *api.QueryServersInRoomAtEventResponse, nidMap, err := db.EventNIDs(ctx, ev.AuthEventIDs())
) error { if err != nil { // this shouldn't happen as RequestBackfill already found them
// getMembershipsBeforeEventNID requires a NID, so retrieving the NID for logrus.WithError(err).WithField("auth_events", ev.AuthEventIDs()).Error("Failed to find one or more auth events")
// the event is necessary. continue
NIDs, err := r.DB.EventNIDs(ctx, []string{request.EventID}) }
authNids := make([]types.EventNID, len(nidMap))
i := 0
for _, nid := range nidMap {
authNids[i] = nid
i++
}
var stateAtEvent types.StateAtEvent
roomNID, stateAtEvent, err = db.StoreEvent(ctx, ev.Unwrap(), nil, authNids)
if err != nil { if err != nil {
return err logrus.WithError(err).WithField("event_id", ev.EventID()).Error("Failed to store backfilled event")
continue
} }
backfilledEventMap[ev.EventID()] = types.Event{
// Retrieve all "m.room.member" state events of "join" membership, which EventNID: stateAtEvent.StateEntry.EventNID,
// contains the list of users in the room before the event, therefore all Event: ev.Unwrap(),
// the servers in it at that moment.
events, err := r.getMembershipsBeforeEventNID(ctx, NIDs[request.EventID], true)
if err != nil {
return err
} }
// Store the server names in a temporary map to avoid duplicates.
servers := make(map[gomatrixserverlib.ServerName]bool)
for _, event := range events {
servers[event.Origin()] = true
} }
return roomNID, backfilledEventMap
// Populate the response.
for server := range servers {
response.Servers = append(response.Servers, server)
}
return nil
} }
// QueryRoomVersionCapabilities implements api.RoomserverQueryAPI // QueryRoomVersionCapabilities implements api.RoomserverQueryAPI
@ -994,20 +1049,6 @@ func (r *RoomserverQueryAPI) SetupHTTP(servMux *http.ServeMux) {
return util.JSONResponse{Code: http.StatusOK, JSON: &response} return util.JSONResponse{Code: http.StatusOK, JSON: &response}
}), }),
) )
servMux.Handle(
api.RoomserverQueryServersInRoomAtEventPath,
common.MakeInternalAPI("QueryServersInRoomAtEvent", func(req *http.Request) util.JSONResponse {
var request api.QueryServersInRoomAtEventRequest
var response api.QueryServersInRoomAtEventResponse
if err := json.NewDecoder(req.Body).Decode(&request); err != nil {
return util.ErrorResponse(err)
}
if err := r.QueryServersInRoomAtEvent(req.Context(), &request, &response); err != nil {
return util.ErrorResponse(err)
}
return util.JSONResponse{Code: http.StatusOK, JSON: &response}
}),
)
servMux.Handle( servMux.Handle(
api.RoomserverQueryRoomVersionCapabilitiesPath, api.RoomserverQueryRoomVersionCapabilitiesPath,
common.MakeInternalAPI("QueryRoomVersionCapabilities", func(req *http.Request) util.JSONResponse { common.MakeInternalAPI("QueryRoomVersionCapabilities", func(req *http.Request) util.JSONResponse {

View File

@ -18,6 +18,7 @@ import (
"net/http" "net/http"
"github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/gomatrixserverlib"
asQuery "github.com/matrix-org/dendrite/appservice/query" asQuery "github.com/matrix-org/dendrite/appservice/query"
"github.com/matrix-org/dendrite/common/basecomponent" "github.com/matrix-org/dendrite/common/basecomponent"
@ -33,7 +34,7 @@ import (
// allowing other components running in the same process to hit the query the // allowing other components running in the same process to hit the query the
// APIs directly instead of having to use HTTP. // APIs directly instead of having to use HTTP.
func SetupRoomServerComponent( func SetupRoomServerComponent(
base *basecomponent.BaseDendrite, base *basecomponent.BaseDendrite, keyRing gomatrixserverlib.JSONVerifier,
) (api.RoomserverAliasAPI, api.RoomserverInputAPI, api.RoomserverQueryAPI) { ) (api.RoomserverAliasAPI, api.RoomserverInputAPI, api.RoomserverQueryAPI) {
roomserverDB, err := storage.Open(string(base.Cfg.Database.RoomServer)) roomserverDB, err := storage.Open(string(base.Cfg.Database.RoomServer))
if err != nil { if err != nil {
@ -51,6 +52,11 @@ func SetupRoomServerComponent(
queryAPI := query.RoomserverQueryAPI{ queryAPI := query.RoomserverQueryAPI{
DB: roomserverDB, DB: roomserverDB,
ImmutableCache: base.ImmutableCache, ImmutableCache: base.ImmutableCache,
ServerName: base.Cfg.Matrix.ServerName,
FedClient: base.CreateFederationClient(),
// TODO: We should have a key server so we don't keep adding components
// which talk to the same DB.
KeyRing: keyRing,
} }
queryAPI.SetupHTTP(http.DefaultServeMux) queryAPI.SetupHTTP(http.DefaultServeMux)

View File

@ -31,7 +31,8 @@ type Database interface {
state []types.StateEntry, state []types.StateEntry,
) (types.StateSnapshotNID, error) ) (types.StateSnapshotNID, error)
// Look up the state of a room at each event for a list of string event IDs. // Look up the state of a room at each event for a list of string event IDs.
// Returns an error if there is an error talking to the database // Returns an error if there is an error talking to the database.
// The length of []types.StateAtEvent is guaranteed to equal the length of eventIDs if no error is returned.
// Returns a types.MissingEventError if the room state for the event IDs aren't in the database // Returns a types.MissingEventError if the room state for the event IDs aren't in the database
StateAtEventIDs(ctx context.Context, eventIDs []string) ([]types.StateAtEvent, error) StateAtEventIDs(ctx context.Context, eventIDs []string) ([]types.StateAtEvent, error)
// Look up the numeric IDs for a list of string event types. // Look up the numeric IDs for a list of string event types.

View File

@ -285,7 +285,7 @@ func (r *messagesReq) handleEmptyEventsSlice() (
// Check if we have backward extremities for this room. // Check if we have backward extremities for this room.
if len(backwardExtremities) > 0 { if len(backwardExtremities) > 0 {
// If so, retrieve as much events as needed through backfilling. // If so, retrieve as much events as needed through backfilling.
events, err = r.backfill(backwardExtremities, r.limit) events, err = r.backfill(r.roomID, backwardExtremities, r.limit)
if err != nil { if err != nil {
return return
} }
@ -334,7 +334,7 @@ func (r *messagesReq) handleNonEmptyEventsSlice(streamEvents []types.StreamEvent
if len(backwardExtremities) > 0 && !isSetLargeEnough && r.backwardOrdering { if len(backwardExtremities) > 0 && !isSetLargeEnough && r.backwardOrdering {
var pdus []gomatrixserverlib.HeaderedEvent var pdus []gomatrixserverlib.HeaderedEvent
// Only ask the remote server for enough events to reach the limit. // Only ask the remote server for enough events to reach the limit.
pdus, err = r.backfill(backwardExtremities, r.limit-len(streamEvents)) pdus, err = r.backfill(r.roomID, backwardExtremities, r.limit-len(streamEvents))
if err != nil { if err != nil {
return return
} }
@ -358,45 +358,29 @@ func (r *messagesReq) handleNonEmptyEventsSlice(streamEvents []types.StreamEvent
// event, or if there is no remote homeserver to contact. // event, or if there is no remote homeserver to contact.
// Returns an error if there was an issue with retrieving the list of servers in // Returns an error if there was an issue with retrieving the list of servers in
// the room or sending the request. // the room or sending the request.
func (r *messagesReq) backfill(fromEventIDs []string, limit int) ([]gomatrixserverlib.HeaderedEvent, error) { func (r *messagesReq) backfill(roomID string, fromEventIDs []string, limit int) ([]gomatrixserverlib.HeaderedEvent, error) {
verReq := api.QueryRoomVersionForRoomRequest{RoomID: r.roomID} var res api.QueryBackfillResponse
verRes := api.QueryRoomVersionForRoomResponse{} err := r.queryAPI.QueryBackfill(context.Background(), &api.QueryBackfillRequest{
if err := r.queryAPI.QueryRoomVersionForRoom(r.ctx, &verReq, &verRes); err != nil { RoomID: roomID,
return nil, err EarliestEventsIDs: fromEventIDs,
} Limit: limit,
ServerName: r.cfg.Matrix.ServerName,
srvToBackfillFrom, err := r.serverToBackfillFrom(fromEventIDs) }, &res)
if err != nil { if err != nil {
return nil, fmt.Errorf("Cannot find server to backfill from: %w", err) return nil, fmt.Errorf("QueryBackfill failed: %w", err)
} }
util.GetLogger(r.ctx).WithField("new_events", len(res.Events)).Info("Storing new events from backfill")
headered := make([]gomatrixserverlib.HeaderedEvent, 0) // TODO: we should only be inserting events into the database from the roomserver's kafka output stream.
// Currently, this can race with live events for the room and cause problems. It's also just a bit unclear
// If the roomserver responded with at least one server that isn't us, // when you have multiple entry points to write events.
// send it a request for backfill.
util.GetLogger(r.ctx).WithField("server", srvToBackfillFrom).WithField("limit", limit).Info("Backfilling from server")
txn, err := r.federation.Backfill(
r.ctx, srvToBackfillFrom, r.roomID, limit, fromEventIDs,
)
if err != nil {
return nil, err
}
for _, p := range txn.PDUs {
event, e := gomatrixserverlib.NewEventFromUntrustedJSON(p, verRes.RoomVersion)
if e != nil {
continue
}
headered = append(headered, event.Headered(verRes.RoomVersion))
}
util.GetLogger(r.ctx).WithField("server", srvToBackfillFrom).WithField("new_events", len(headered)).Info("Storing new events from backfill")
// Store the events in the database, while marking them as unfit to show // Store the events in the database, while marking them as unfit to show
// up in responses to sync requests. // up in responses to sync requests.
for i := range headered { for i := range res.Events {
if _, err = r.db.WriteEvent( if _, err = r.db.WriteEvent(
r.ctx, r.ctx,
&headered[i], &res.Events[i],
[]gomatrixserverlib.HeaderedEvent{}, []gomatrixserverlib.HeaderedEvent{},
[]string{}, []string{},
[]string{}, []string{},
@ -406,63 +390,7 @@ func (r *messagesReq) backfill(fromEventIDs []string, limit int) ([]gomatrixserv
} }
} }
return headered, nil return res.Events, nil
}
func (r *messagesReq) serverToBackfillFrom(fromEventIDs []string) (gomatrixserverlib.ServerName, error) {
// Query the list of servers in the room when one of the backward extremities
// was sent.
var serversResponse api.QueryServersInRoomAtEventResponse
serversRequest := api.QueryServersInRoomAtEventRequest{
RoomID: r.roomID,
EventID: fromEventIDs[0],
}
if err := r.queryAPI.QueryServersInRoomAtEvent(r.ctx, &serversRequest, &serversResponse); err != nil {
util.GetLogger(r.ctx).WithError(err).Warn("Failed to query servers in room at event, falling back to event sender")
// FIXME: We shouldn't be doing this but in situations where we have already backfilled once
// the query API doesn't work as backfilled events do not make it to the room server.
// This means QueryServersInRoomAtEvent returns an error as it doesn't have the event ID in question.
// We need to inject backfilled events into the room server and store them appropriately.
events, err := r.db.Events(r.ctx, fromEventIDs)
if err != nil {
return "", err
}
if len(events) == 0 {
// should be impossible as these event IDs are backwards extremities
return "", fmt.Errorf("backfill: missing backwards extremities, event IDs: %s", fromEventIDs)
}
// The rationale here is that the last event was unlikely to be sent by us, so poke the server who sent it.
// We shouldn't be doing this really, but as a heuristic it should work pretty well for now.
for _, e := range events {
_, srv, srverr := gomatrixserverlib.SplitID('@', e.Sender())
if srverr != nil {
util.GetLogger(r.ctx).WithError(srverr).Warn("Failed to extract domain from event sender")
continue
}
if srv != r.cfg.Matrix.ServerName {
return srv, nil
}
}
// no valid events which have a remote server, fail.
return "", err
}
// Use the first server from the response, except if that server is us.
// In that case, use the second one if the roomserver responded with
// enough servers. If not, use an empty string to prevent the backfill
// from happening as there's no server to direct the request towards.
// TODO: Be smarter at selecting the server to direct the request
// towards.
srvToBackfillFrom := serversResponse.Servers[0]
if srvToBackfillFrom == r.cfg.Matrix.ServerName {
if len(serversResponse.Servers) > 1 {
srvToBackfillFrom = serversResponse.Servers[1]
} else {
util.GetLogger(r.ctx).Info("Not enough servers to backfill from")
return "", nil
}
}
return srvToBackfillFrom, nil
} }
// setToDefault returns the default value for the "to" query parameter of a // setToDefault returns the default value for the "to" query parameter of a