Change how servers are selected for missing auth/prev events (#1892)

* Change how servers are selected for missing auth/prev events

* Shuffle order

* Move ServersInRoomProvider into api package
This commit is contained in:
Neil Alexander 2021-06-30 12:05:58 +01:00 committed by GitHub
parent 0e69212206
commit b7a2d369c0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 50 additions and 29 deletions

View File

@ -33,7 +33,7 @@ func FederationAPI(base *setup.BaseDendrite, cfg *config.Dendrite) {
base.PublicFederationAPIMux, base.PublicKeyAPIMux, base.PublicFederationAPIMux, base.PublicKeyAPIMux,
&base.Cfg.FederationAPI, userAPI, federation, keyRing, &base.Cfg.FederationAPI, userAPI, federation, keyRing,
rsAPI, fsAPI, base.EDUServerClient(), keyAPI, rsAPI, fsAPI, base.EDUServerClient(), keyAPI,
&base.Cfg.MSCs, &base.Cfg.MSCs, nil,
) )
base.SetupAndServeHTTP( base.SetupAndServeHTTP(

View File

@ -0,0 +1,11 @@
package api
import (
"context"
"github.com/matrix-org/gomatrixserverlib"
)
type ServersInRoomProvider interface {
GetServersForRoom(ctx context.Context, roomID string, event *gomatrixserverlib.Event) []gomatrixserverlib.ServerName
}

View File

@ -17,6 +17,7 @@ package federationapi
import ( import (
"github.com/gorilla/mux" "github.com/gorilla/mux"
eduserverAPI "github.com/matrix-org/dendrite/eduserver/api" eduserverAPI "github.com/matrix-org/dendrite/eduserver/api"
federationAPI "github.com/matrix-org/dendrite/federationapi/api"
federationSenderAPI "github.com/matrix-org/dendrite/federationsender/api" federationSenderAPI "github.com/matrix-org/dendrite/federationsender/api"
keyserverAPI "github.com/matrix-org/dendrite/keyserver/api" keyserverAPI "github.com/matrix-org/dendrite/keyserver/api"
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
@ -39,10 +40,12 @@ func AddPublicRoutes(
eduAPI eduserverAPI.EDUServerInputAPI, eduAPI eduserverAPI.EDUServerInputAPI,
keyAPI keyserverAPI.KeyInternalAPI, keyAPI keyserverAPI.KeyInternalAPI,
mscCfg *config.MSCs, mscCfg *config.MSCs,
servers federationAPI.ServersInRoomProvider,
) { ) {
routing.Setup( routing.Setup(
fedRouter, keyRouter, cfg, rsAPI, fedRouter, keyRouter, cfg, rsAPI,
eduAPI, federationSenderAPI, keyRing, eduAPI, federationSenderAPI, keyRing,
federation, userAPI, keyAPI, mscCfg, federation, userAPI, keyAPI, mscCfg,
servers,
) )
} }

View File

@ -31,7 +31,7 @@ func TestRoomsV3URLEscapeDoNot404(t *testing.T) {
fsAPI := base.FederationSenderHTTPClient() fsAPI := base.FederationSenderHTTPClient()
// TODO: This is pretty fragile, as if anything calls anything on these nils this test will break. // TODO: This is pretty fragile, as if anything calls anything on these nils this test will break.
// Unfortunately, it makes little sense to instantiate these dependencies when we just want to test routing. // Unfortunately, it makes little sense to instantiate these dependencies when we just want to test routing.
federationapi.AddPublicRoutes(base.PublicFederationAPIMux, base.PublicKeyAPIMux, &cfg.FederationAPI, nil, nil, keyRing, nil, fsAPI, nil, nil, &cfg.MSCs) federationapi.AddPublicRoutes(base.PublicFederationAPIMux, base.PublicKeyAPIMux, &cfg.FederationAPI, nil, nil, keyRing, nil, fsAPI, nil, nil, &cfg.MSCs, nil)
baseURL, cancel := test.ListenAndServe(t, base.PublicFederationAPIMux, true) baseURL, cancel := test.ListenAndServe(t, base.PublicFederationAPIMux, true)
defer cancel() defer cancel()
serverName := gomatrixserverlib.ServerName(strings.TrimPrefix(baseURL, "https://")) serverName := gomatrixserverlib.ServerName(strings.TrimPrefix(baseURL, "https://"))

View File

@ -20,6 +20,7 @@ import (
"github.com/gorilla/mux" "github.com/gorilla/mux"
"github.com/matrix-org/dendrite/clientapi/jsonerror" "github.com/matrix-org/dendrite/clientapi/jsonerror"
eduserverAPI "github.com/matrix-org/dendrite/eduserver/api" eduserverAPI "github.com/matrix-org/dendrite/eduserver/api"
federationAPI "github.com/matrix-org/dendrite/federationapi/api"
federationSenderAPI "github.com/matrix-org/dendrite/federationsender/api" federationSenderAPI "github.com/matrix-org/dendrite/federationsender/api"
"github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/httputil" "github.com/matrix-org/dendrite/internal/httputil"
@ -50,6 +51,7 @@ func Setup(
userAPI userapi.UserInternalAPI, userAPI userapi.UserInternalAPI,
keyAPI keyserverAPI.KeyInternalAPI, keyAPI keyserverAPI.KeyInternalAPI,
mscCfg *config.MSCs, mscCfg *config.MSCs,
servers federationAPI.ServersInRoomProvider,
) { ) {
v2keysmux := keyMux.PathPrefix("/v2").Subrouter() v2keysmux := keyMux.PathPrefix("/v2").Subrouter()
v1fedmux := fedMux.PathPrefix("/v1").Subrouter() v1fedmux := fedMux.PathPrefix("/v1").Subrouter()
@ -99,7 +101,7 @@ func Setup(
func(httpReq *http.Request, request *gomatrixserverlib.FederationRequest, vars map[string]string) util.JSONResponse { func(httpReq *http.Request, request *gomatrixserverlib.FederationRequest, vars map[string]string) util.JSONResponse {
return Send( return Send(
httpReq, request, gomatrixserverlib.TransactionID(vars["txnID"]), httpReq, request, gomatrixserverlib.TransactionID(vars["txnID"]),
cfg, rsAPI, eduAPI, keyAPI, keys, federation, mu, cfg, rsAPI, eduAPI, keyAPI, keys, federation, mu, servers,
) )
}, },
)).Methods(http.MethodPut, http.MethodOptions) )).Methods(http.MethodPut, http.MethodOptions)

View File

@ -27,6 +27,7 @@ import (
"github.com/getsentry/sentry-go" "github.com/getsentry/sentry-go"
"github.com/matrix-org/dendrite/clientapi/jsonerror" "github.com/matrix-org/dendrite/clientapi/jsonerror"
eduserverAPI "github.com/matrix-org/dendrite/eduserver/api" eduserverAPI "github.com/matrix-org/dendrite/eduserver/api"
federationAPI "github.com/matrix-org/dendrite/federationapi/api"
"github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal"
keyapi "github.com/matrix-org/dendrite/keyserver/api" keyapi "github.com/matrix-org/dendrite/keyserver/api"
"github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/api"
@ -101,6 +102,7 @@ func Send(
keys gomatrixserverlib.JSONVerifier, keys gomatrixserverlib.JSONVerifier,
federation *gomatrixserverlib.FederationClient, federation *gomatrixserverlib.FederationClient,
mu *internal.MutexByRoom, mu *internal.MutexByRoom,
servers federationAPI.ServersInRoomProvider,
) util.JSONResponse { ) util.JSONResponse {
t := txnReq{ t := txnReq{
rsAPI: rsAPI, rsAPI: rsAPI,
@ -109,6 +111,7 @@ func Send(
federation: federation, federation: federation,
hadEvents: make(map[string]bool), hadEvents: make(map[string]bool),
haveEvents: make(map[string]*gomatrixserverlib.HeaderedEvent), haveEvents: make(map[string]*gomatrixserverlib.HeaderedEvent),
servers: servers,
keyAPI: keyAPI, keyAPI: keyAPI,
roomsMu: mu, roomsMu: mu,
} }
@ -165,9 +168,9 @@ type txnReq struct {
keyAPI keyapi.KeyInternalAPI keyAPI keyapi.KeyInternalAPI
keys gomatrixserverlib.JSONVerifier keys gomatrixserverlib.JSONVerifier
federation txnFederationClient federation txnFederationClient
servers []gomatrixserverlib.ServerName
serversMutex sync.RWMutex
roomsMu *internal.MutexByRoom roomsMu *internal.MutexByRoom
// something that can tell us about which servers are in a room right now
servers federationAPI.ServersInRoomProvider
// a list of events from the auth and prev events which we already had // a list of events from the auth and prev events which we already had
hadEvents map[string]bool hadEvents map[string]bool
// local cache of events for auth checks, etc - this may include events // local cache of events for auth checks, etc - this may include events
@ -466,22 +469,24 @@ func (t *txnReq) processDeviceListUpdate(ctx context.Context, e gomatrixserverli
} }
} }
func (t *txnReq) getServers(ctx context.Context, roomID string) []gomatrixserverlib.ServerName { func (t *txnReq) getServers(ctx context.Context, roomID string, event *gomatrixserverlib.Event) []gomatrixserverlib.ServerName {
t.serversMutex.Lock() // The server that sent us the event should be sufficient to tell us about missing
defer t.serversMutex.Unlock() // prev and auth events.
servers := []gomatrixserverlib.ServerName{t.Origin}
// If the event origin is different to the transaction origin then we can use
// this as a last resort. The origin server that created the event would have
// had to know the auth and prev events.
if event != nil {
if origin := event.Origin(); origin != t.Origin {
servers = append(servers, origin)
}
}
// If a specific room-to-server provider exists then use that. This will primarily
// be used for the P2P demos.
if t.servers != nil { if t.servers != nil {
return t.servers servers = append(servers, t.servers.GetServersForRoom(ctx, roomID, event)...)
} }
t.servers = []gomatrixserverlib.ServerName{t.Origin} return servers
serverReq := &api.QueryServerJoinedToRoomRequest{
RoomID: roomID,
}
serverRes := &api.QueryServerJoinedToRoomResponse{}
if err := t.rsAPI.QueryServerJoinedToRoom(ctx, serverReq, serverRes); err == nil {
t.servers = append(t.servers, serverRes.ServerNames...)
util.GetLogger(ctx).Infof("Found %d server(s) to query for missing events in %q", len(t.servers), roomID)
}
return t.servers
} }
func (t *txnReq) processEvent(ctx context.Context, e *gomatrixserverlib.Event) error { func (t *txnReq) processEvent(ctx context.Context, e *gomatrixserverlib.Event) error {
@ -566,7 +571,7 @@ func (t *txnReq) retrieveMissingAuthEvents(
withNextEvent: withNextEvent:
for missingAuthEventID := range missingAuthEvents { for missingAuthEventID := range missingAuthEvents {
withNextServer: withNextServer:
for _, server := range t.getServers(ctx, e.RoomID()) { for _, server := range t.getServers(ctx, e.RoomID(), e) {
logger.Infof("Retrieving missing auth event %q from %q", missingAuthEventID, server) logger.Infof("Retrieving missing auth event %q from %q", missingAuthEventID, server)
tx, err := t.federation.GetEvent(ctx, server, missingAuthEventID) tx, err := t.federation.GetEvent(ctx, server, missingAuthEventID)
if err != nil { if err != nil {
@ -948,7 +953,7 @@ func (t *txnReq) getMissingEvents(ctx context.Context, e *gomatrixserverlib.Even
} }
var missingResp *gomatrixserverlib.RespMissingEvents var missingResp *gomatrixserverlib.RespMissingEvents
servers := t.getServers(ctx, e.RoomID()) servers := t.getServers(ctx, e.RoomID(), e)
for _, server := range servers { for _, server := range servers {
var m gomatrixserverlib.RespMissingEvents var m gomatrixserverlib.RespMissingEvents
if m, err = t.federation.LookupMissingEvents(ctx, server, e.RoomID(), gomatrixserverlib.MissingEvents{ if m, err = t.federation.LookupMissingEvents(ctx, server, e.RoomID(), gomatrixserverlib.MissingEvents{
@ -1220,7 +1225,7 @@ func (t *txnReq) lookupEvent(ctx context.Context, roomVersion gomatrixserverlib.
} }
var event *gomatrixserverlib.Event var event *gomatrixserverlib.Event
found := false found := false
servers := t.getServers(ctx, roomID) servers := t.getServers(ctx, roomID, nil)
for _, serverName := range servers { for _, serverName := range servers {
txn, err := t.federation.GetEvent(ctx, serverName, missingEventID) txn, err := t.federation.GetEvent(ctx, serverName, missingEventID)
if err != nil || len(txn.PDUs) == 0 { if err != nil || len(txn.PDUs) == 0 {

View File

@ -68,7 +68,7 @@ func (m *Monolith) AddAllPublicRoutes(process *process.ProcessContext, csMux, ss
federationapi.AddPublicRoutes( federationapi.AddPublicRoutes(
ssMux, keyMux, &m.Config.FederationAPI, m.UserAPI, m.FedClient, ssMux, keyMux, &m.Config.FederationAPI, m.UserAPI, m.FedClient,
m.KeyRing, m.RoomserverAPI, m.FederationSenderAPI, m.KeyRing, m.RoomserverAPI, m.FederationSenderAPI,
m.EDUInternalAPI, m.KeyAPI, &m.Config.MSCs, m.EDUInternalAPI, m.KeyAPI, &m.Config.MSCs, nil,
) )
mediaapi.AddPublicRoutes(mediaMux, &m.Config.MediaAPI, m.UserAPI, m.Client) mediaapi.AddPublicRoutes(mediaMux, &m.Config.MediaAPI, m.UserAPI, m.Client)
syncapi.AddPublicRoutes( syncapi.AddPublicRoutes(