Try to retrieve missing auth events from multiple servers (#1516)

* Recursively fetch auth events if needed

* Fix processEvent call

* Ask more servers in lookupEvent

* Don't panic!

* Panic at the Disco

* Find servers more aggressively

* Add getServers

* Fix number of servers to 5, don't bail making RespState if auth events missing

* Fix panic

* Ignore missing state events too

* Report number of servers correctly

* Don't reuse request context for /send_join

* Update federation API tests

* Don't recurse processEvents

* Implement getEvents differently
This commit is contained in:
Neil Alexander 2020-10-13 11:53:20 +01:00 committed by GitHub
parent d7ea814fa8
commit 9d6b77c58a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 130 additions and 75 deletions

View File

@ -246,9 +246,6 @@ func isProcessingErrorFatal(err error) bool {
type roomNotFoundError struct {
roomID string
}
type unmarshalError struct {
err error
}
type verifySigError struct {
eventID string
err error
@ -259,7 +256,6 @@ type missingPrevEventsError struct {
}
func (e roomNotFoundError) Error() string { return fmt.Sprintf("room %q not found", e.roomID) }
func (e unmarshalError) Error() string { return fmt.Sprintf("unable to parse event: %s", e.err) }
func (e verifySigError) Error() string {
return fmt.Sprintf("unable to verify signature of event %q: %s", e.eventID, e.err)
}
@ -338,6 +334,19 @@ func (t *txnReq) processDeviceListUpdate(ctx context.Context, e gomatrixserverli
}
}
func (t *txnReq) getServers(ctx context.Context, roomID string) []gomatrixserverlib.ServerName {
servers := []gomatrixserverlib.ServerName{t.Origin}
serverReq := &api.QueryServerJoinedToRoomRequest{
RoomID: roomID,
}
serverRes := &api.QueryServerJoinedToRoomResponse{}
if err := t.rsAPI.QueryServerJoinedToRoom(ctx, serverReq, serverRes); err == nil {
servers = append(servers, serverRes.ServerNames...)
util.GetLogger(ctx).Infof("Found %d server(s) to query for missing events in %q", len(servers), roomID)
}
return servers
}
func (t *txnReq) processEvent(ctx context.Context, e gomatrixserverlib.Event) error {
logger := util.GetLogger(ctx).WithField("event_id", e.EventID()).WithField("room_id", e.RoomID())
@ -354,7 +363,7 @@ func (t *txnReq) processEvent(ctx context.Context, e gomatrixserverlib.Event) er
}
var stateResp api.QueryMissingAuthPrevEventsResponse
if err := t.rsAPI.QueryMissingAuthPrevEvents(ctx, &stateReq, &stateResp); err != nil {
return err
return fmt.Errorf("t.rsAPI.QueryMissingAuthPrevEvents: %w", err)
}
if !stateResp.RoomExists {
@ -369,46 +378,8 @@ func (t *txnReq) processEvent(ctx context.Context, e gomatrixserverlib.Event) er
if len(stateResp.MissingAuthEventIDs) > 0 {
logger.Infof("Event refers to %d unknown auth_events", len(stateResp.MissingAuthEventIDs))
servers := []gomatrixserverlib.ServerName{t.Origin}
serverReq := &api.QueryServerJoinedToRoomRequest{
RoomID: e.RoomID(),
}
serverRes := &api.QueryServerJoinedToRoomResponse{}
if err := t.rsAPI.QueryServerJoinedToRoom(ctx, serverReq, serverRes); err == nil {
servers = append(servers, serverRes.ServerNames...)
logger.Infof("Found %d server(s) to query for missing events", len(servers))
}
getAuthEvent:
for _, missingAuthEventID := range stateResp.MissingAuthEventIDs {
for _, server := range servers {
logger.Infof("Retrieving missing auth event %q from %q", missingAuthEventID, server)
tx, err := t.federation.GetEvent(ctx, server, missingAuthEventID)
if err != nil {
continue // try the next server
}
ev, err := gomatrixserverlib.NewEventFromUntrustedJSON(tx.PDUs[0], stateResp.RoomVersion)
if err != nil {
logger.WithError(err).Errorf("Failed to unmarshal auth event %q", missingAuthEventID)
continue // try the next server
}
if err = api.SendInputRoomEvents(
context.Background(),
t.rsAPI,
[]api.InputRoomEvent{
{
Kind: api.KindOutlier,
Event: ev.Headered(stateResp.RoomVersion),
AuthEventIDs: ev.AuthEventIDs(),
SendAsServer: api.DoNotSendToOtherServers,
},
},
); err != nil {
logger.WithError(err).Errorf("Failed to send auth event %q to roomserver", missingAuthEventID)
continue getAuthEvent // move onto the next event
}
}
if err := t.retrieveMissingAuthEvents(ctx, e, &stateResp); err != nil {
return fmt.Errorf("t.retrieveMissingAuthEvents: %w", err)
}
}
@ -431,6 +402,60 @@ func (t *txnReq) processEvent(ctx context.Context, e gomatrixserverlib.Event) er
)
}
func (t *txnReq) retrieveMissingAuthEvents(
ctx context.Context, e gomatrixserverlib.Event, stateResp *api.QueryMissingAuthPrevEventsResponse,
) error {
logger := util.GetLogger(ctx).WithField("event_id", e.EventID()).WithField("room_id", e.RoomID())
missingAuthEvents := make(map[string]struct{})
for _, missingAuthEventID := range stateResp.MissingAuthEventIDs {
missingAuthEvents[missingAuthEventID] = struct{}{}
}
servers := t.getServers(ctx, e.RoomID())
if len(servers) > 5 {
servers = servers[:5]
}
withNextEvent:
for missingAuthEventID := range missingAuthEvents {
withNextServer:
for _, server := range servers {
logger.Infof("Retrieving missing auth event %q from %q", missingAuthEventID, server)
tx, err := t.federation.GetEvent(ctx, server, missingAuthEventID)
if err != nil {
logger.WithError(err).Warnf("Failed to retrieve auth event %q", missingAuthEventID)
continue withNextServer
}
ev, err := gomatrixserverlib.NewEventFromUntrustedJSON(tx.PDUs[0], stateResp.RoomVersion)
if err != nil {
logger.WithError(err).Warnf("Failed to unmarshal auth event %q", missingAuthEventID)
continue withNextServer
}
if err = api.SendInputRoomEvents(
context.Background(),
t.rsAPI,
[]api.InputRoomEvent{
{
Kind: api.KindOutlier,
Event: ev.Headered(stateResp.RoomVersion),
AuthEventIDs: ev.AuthEventIDs(),
SendAsServer: api.DoNotSendToOtherServers,
},
},
); err != nil {
return fmt.Errorf("api.SendEvents: %w", err)
}
delete(missingAuthEvents, missingAuthEventID)
continue withNextEvent
}
}
if missing := len(missingAuthEvents); missing > 0 {
return fmt.Errorf("Event refers to %d auth_events which we failed to fetch", missing)
}
return nil
}
func checkAllowedByState(e gomatrixserverlib.Event, stateEvents []gomatrixserverlib.Event) error {
authUsingState := gomatrixserverlib.NewAuthEvents(nil)
for i := range stateEvents {
@ -557,18 +582,23 @@ func (t *txnReq) lookupStateAfterEvent(ctx context.Context, roomVersion gomatrix
respState, err := t.lookupStateBeforeEvent(ctx, roomVersion, roomID, eventID)
if err != nil {
return nil, err
return nil, fmt.Errorf("t.lookupStateBeforeEvent: %w", err)
}
servers := t.getServers(ctx, roomID)
if len(servers) > 5 {
servers = servers[:5]
}
// fetch the event we're missing and add it to the pile
h, err := t.lookupEvent(ctx, roomVersion, eventID, false)
h, err := t.lookupEvent(ctx, roomVersion, eventID, false, servers)
switch err.(type) {
case verifySigError:
return respState, nil
case nil:
// do nothing
default:
return nil, err
return nil, fmt.Errorf("t.lookupEvent: %w", err)
}
t.haveEvents[h.EventID()] = h
if h.StateKey() != nil {
@ -669,7 +699,11 @@ retryAllowedState:
if err = checkAllowedByState(*backwardsExtremity, resolvedStateEvents); err != nil {
switch missing := err.(type) {
case gomatrixserverlib.MissingAuthEventError:
h, err2 := t.lookupEvent(ctx, roomVersion, missing.AuthEventID, true)
servers := t.getServers(ctx, backwardsExtremity.RoomID())
if len(servers) > 5 {
servers = servers[:5]
}
h, err2 := t.lookupEvent(ctx, roomVersion, missing.AuthEventID, true, servers)
switch err2.(type) {
case verifySigError:
return &gomatrixserverlib.RespState{
@ -874,6 +908,12 @@ func (t *txnReq) lookupMissingStateViaStateIDs(ctx context.Context, roomID, even
"concurrent_requests": concurrentRequests,
}).Info("Fetching missing state at event")
// Get a list of servers to fetch from.
servers := t.getServers(ctx, roomID)
if len(servers) > 5 {
servers = servers[:5]
}
// Create a queue containing all of the missing event IDs that we want
// to retrieve.
pending := make(chan string, missingCount)
@ -899,7 +939,7 @@ func (t *txnReq) lookupMissingStateViaStateIDs(ctx context.Context, roomID, even
// Define what we'll do in order to fetch the missing event ID.
fetch := func(missingEventID string) {
var h *gomatrixserverlib.HeaderedEvent
h, err = t.lookupEvent(ctx, roomVersion, missingEventID, false)
h, err = t.lookupEvent(ctx, roomVersion, missingEventID, false, servers)
switch err.(type) {
case verifySigError:
return
@ -937,26 +977,25 @@ func (t *txnReq) lookupMissingStateViaStateIDs(ctx context.Context, roomID, even
}
func (t *txnReq) createRespStateFromStateIDs(stateIDs gomatrixserverlib.RespStateIDs) (
*gomatrixserverlib.RespState, error) {
*gomatrixserverlib.RespState, error) { // nolint:unparam
// create a RespState response using the response to /state_ids as a guide
respState := gomatrixserverlib.RespState{
AuthEvents: make([]gomatrixserverlib.Event, len(stateIDs.AuthEventIDs)),
StateEvents: make([]gomatrixserverlib.Event, len(stateIDs.StateEventIDs)),
}
respState := gomatrixserverlib.RespState{}
for i := range stateIDs.StateEventIDs {
ev, ok := t.haveEvents[stateIDs.StateEventIDs[i]]
if !ok {
return nil, fmt.Errorf("missing state event %s", stateIDs.StateEventIDs[i])
logrus.Warnf("Missing state event in createRespStateFromStateIDs: %s", stateIDs.StateEventIDs[i])
continue
}
respState.StateEvents[i] = ev.Unwrap()
respState.StateEvents = append(respState.StateEvents, ev.Unwrap())
}
for i := range stateIDs.AuthEventIDs {
ev, ok := t.haveEvents[stateIDs.AuthEventIDs[i]]
if !ok {
return nil, fmt.Errorf("missing auth event %s", stateIDs.AuthEventIDs[i])
logrus.Warnf("Missing auth event in createRespStateFromStateIDs: %s", stateIDs.AuthEventIDs[i])
continue
}
respState.AuthEvents[i] = ev.Unwrap()
respState.AuthEvents = append(respState.AuthEvents, ev.Unwrap())
}
// We purposefully do not do auth checks on the returned events, as they will still
// be processed in the exact same way, just as a 'rejected' event
@ -964,7 +1003,7 @@ func (t *txnReq) createRespStateFromStateIDs(stateIDs gomatrixserverlib.RespStat
return &respState, nil
}
func (t *txnReq) lookupEvent(ctx context.Context, roomVersion gomatrixserverlib.RoomVersion, missingEventID string, localFirst bool) (*gomatrixserverlib.HeaderedEvent, error) {
func (t *txnReq) lookupEvent(ctx context.Context, roomVersion gomatrixserverlib.RoomVersion, missingEventID string, localFirst bool, servers []gomatrixserverlib.ServerName) (*gomatrixserverlib.HeaderedEvent, error) {
if localFirst {
// fetch from the roomserver
queryReq := api.QueryEventsByIDRequest{
@ -977,19 +1016,27 @@ func (t *txnReq) lookupEvent(ctx context.Context, roomVersion gomatrixserverlib.
return &queryRes.Events[0], nil
}
}
txn, err := t.federation.GetEvent(ctx, t.Origin, missingEventID)
if err != nil || len(txn.PDUs) == 0 {
util.GetLogger(ctx).WithError(err).WithField("event_id", missingEventID).Warn("failed to get missing /event for event ID")
return nil, err
}
pdu := txn.PDUs[0]
var event gomatrixserverlib.Event
event, err = gomatrixserverlib.NewEventFromUntrustedJSON(pdu, roomVersion)
if err != nil {
util.GetLogger(ctx).WithError(err).Warnf("Transaction: Failed to parse event JSON of event %q", event.EventID())
return nil, unmarshalError{err}
found := false
for _, serverName := range servers {
txn, err := t.federation.GetEvent(ctx, serverName, missingEventID)
if err != nil || len(txn.PDUs) == 0 {
util.GetLogger(ctx).WithError(err).WithField("event_id", missingEventID).Warn("Failed to get missing /event for event ID")
continue
}
event, err = gomatrixserverlib.NewEventFromUntrustedJSON(txn.PDUs[0], roomVersion)
if err != nil {
util.GetLogger(ctx).WithError(err).WithField("event_id", missingEventID).Warnf("Transaction: Failed to parse event JSON of event")
continue
}
found = true
break
}
if err = gomatrixserverlib.VerifyAllEventSignatures(ctx, []gomatrixserverlib.Event{event}, t.keys); err != nil {
if !found {
util.GetLogger(ctx).WithField("event_id", missingEventID).Warnf("Failed to get missing /event for event ID from %d server(s)", len(servers))
return nil, fmt.Errorf("wasn't able to find event via %d server(s)", len(servers))
}
if err := gomatrixserverlib.VerifyAllEventSignatures(ctx, []gomatrixserverlib.Event{event}, t.keys); err != nil {
util.GetLogger(ctx).WithError(err).Warnf("Transaction: Couldn't validate signature of event %q", event.EventID())
return nil, verifySigError{event.EventID(), err}
}

View File

@ -491,7 +491,7 @@ func TestTransactionFailAuthChecks(t *testing.T) {
queryMissingAuthPrevEvents: func(req *api.QueryMissingAuthPrevEventsRequest) api.QueryMissingAuthPrevEventsResponse {
return api.QueryMissingAuthPrevEventsResponse{
RoomExists: true,
MissingAuthEventIDs: []string{"create_event"},
MissingAuthEventIDs: []string{},
MissingPrevEventIDs: []string{},
}
},

View File

@ -196,6 +196,11 @@ func (r *FederationSenderInternalAPI) performJoinUsingServer(
return fmt.Errorf("respMakeJoin.JoinEvent.Build: %w", err)
}
// No longer reuse the request context from this point forward.
// We don't want the client timing out to interrupt the join.
var cancel context.CancelFunc
ctx, cancel = context.WithCancel(context.Background())
// Try to perform a send_join using the newly built event.
respSendJoin, err := r.federation.SendJoin(
ctx,
@ -205,11 +210,16 @@ func (r *FederationSenderInternalAPI) performJoinUsingServer(
)
if err != nil {
r.statistics.ForServer(serverName).Failure()
cancel()
return fmt.Errorf("r.federation.SendJoin: %w", err)
}
r.statistics.ForServer(serverName).Success()
// Sanity-check the join response to ensure that it has a create
// event, that the room version is known, etc.
if err := sanityCheckSendJoinResponse(respSendJoin); err != nil {
return err
cancel()
return fmt.Errorf("sanityCheckSendJoinResponse: %w", err)
}
// Process the join response in a goroutine. The idea here is
@ -217,8 +227,6 @@ func (r *FederationSenderInternalAPI) performJoinUsingServer(
// to complete, but if the client does give up waiting, we'll
// still continue to process the join anyway so that we don't
// waste the effort.
var cancel context.CancelFunc
ctx, cancel = context.WithCancel(context.Background())
go func() {
defer cancel()