diff --git a/keyserver/internal/device_list_update.go b/keyserver/internal/device_list_update.go index 573285e8..c27e291f 100644 --- a/keyserver/internal/device_list_update.go +++ b/keyserver/internal/device_list_update.go @@ -342,10 +342,12 @@ func (u *DeviceListUpdater) processServer(serverName gomatrixserverlib.ServerNam if err != nil { logger.WithError(err).WithField("user_id", userID).Error("fetched device list but failed to store/emit it") hasFailures = true - } else { - u.clearChannel(userID) } } + for _, userID := range userIDs { + // always clear the channel to unblock Update calls regardless of success/failure + u.clearChannel(userID) + } return hasFailures } diff --git a/keyserver/internal/internal.go b/keyserver/internal/internal.go index ef52d014..8904d463 100644 --- a/keyserver/internal/internal.go +++ b/keyserver/internal/internal.go @@ -318,65 +318,12 @@ func (a *KeyInternalAPI) queryRemoteKeys( // allows us to wait until all federation servers have been poked var wg sync.WaitGroup wg.Add(len(domainToDeviceKeys)) - // mutex for failures - var failMu sync.Mutex + // mutex for writing directly to res (e.g failures) + var respMu sync.Mutex // fan out for domain, deviceKeys := range domainToDeviceKeys { - go func(serverName string, devKeys map[string][]string) { - defer wg.Done() - fedCtx, cancel := context.WithTimeout(ctx, timeout) - defer cancel() - // for users who we do not have any knowledge about, try to start doing device list updates for them - // by hitting /users/devices - otherwise fallback to /keys/query which has nicer bulk properties but - // lack a stream ID. - var userIDsForAllDevices []string - for userID, deviceIDs := range devKeys { - if len(deviceIDs) == 0 { - userIDsForAllDevices = append(userIDsForAllDevices, userID) - delete(devKeys, userID) - } - } - for _, userID := range userIDsForAllDevices { - err := a.Updater.ManualUpdate(context.Background(), gomatrixserverlib.ServerName(serverName), userID) - if err != nil { - logrus.WithFields(logrus.Fields{ - logrus.ErrorKey: err, - "user_id": userID, - "server": serverName, - }).Error("Failed to manually update device lists for user") - // try to do it via /keys/query - devKeys[userID] = []string{} - continue - } - // refresh entries from DB: unlike remoteKeysFromDatabase we know we previously had no device info for this - // user so the fact that we're populating all devices here isn't a problem so long as we have devices. - err = a.populateResponseWithDeviceKeysFromDatabase(ctx, res, userID, nil) - if err != nil { - logrus.WithFields(logrus.Fields{ - logrus.ErrorKey: err, - "user_id": userID, - "server": serverName, - }).Error("Failed to manually update device lists for user") - // try to do it via /keys/query - devKeys[userID] = []string{} - continue - } - } - if len(devKeys) == 0 { - return - } - queryKeysResp, err := a.FedClient.QueryKeys(fedCtx, gomatrixserverlib.ServerName(serverName), devKeys) - if err != nil { - failMu.Lock() - res.Failures[serverName] = map[string]interface{}{ - "message": err.Error(), - } - failMu.Unlock() - return - } - resultCh <- &queryKeysResp - }(domain, deviceKeys) + go a.queryRemoteKeysOnServer(ctx, domain, deviceKeys, &wg, &respMu, timeout, resultCh, res) } // Close the result channel when the goroutines have quit so the for .. range exits @@ -399,6 +346,76 @@ func (a *KeyInternalAPI) queryRemoteKeys( } } +func (a *KeyInternalAPI) queryRemoteKeysOnServer( + ctx context.Context, serverName string, devKeys map[string][]string, wg *sync.WaitGroup, + respMu *sync.Mutex, timeout time.Duration, resultCh chan<- *gomatrixserverlib.RespQueryKeys, + res *api.QueryKeysResponse, +) { + defer wg.Done() + fedCtx, cancel := context.WithTimeout(ctx, timeout) + defer cancel() + // for users who we do not have any knowledge about, try to start doing device list updates for them + // by hitting /users/devices - otherwise fallback to /keys/query which has nicer bulk properties but + // lack a stream ID. + var userIDsForAllDevices []string + for userID, deviceIDs := range devKeys { + if len(deviceIDs) == 0 { + userIDsForAllDevices = append(userIDsForAllDevices, userID) + delete(devKeys, userID) + } + } + for _, userID := range userIDsForAllDevices { + err := a.Updater.ManualUpdate(context.Background(), gomatrixserverlib.ServerName(serverName), userID) + if err != nil { + logrus.WithFields(logrus.Fields{ + logrus.ErrorKey: err, + "user_id": userID, + "server": serverName, + }).Error("Failed to manually update device lists for user") + // try to do it via /keys/query + devKeys[userID] = []string{} + continue + } + // refresh entries from DB: unlike remoteKeysFromDatabase we know we previously had no device info for this + // user so the fact that we're populating all devices here isn't a problem so long as we have devices. + respMu.Lock() + err = a.populateResponseWithDeviceKeysFromDatabase(ctx, res, userID, nil) + respMu.Unlock() + if err != nil { + logrus.WithFields(logrus.Fields{ + logrus.ErrorKey: err, + "user_id": userID, + "server": serverName, + }).Error("Failed to manually update device lists for user") + // try to do it via /keys/query + devKeys[userID] = []string{} + continue + } + } + if len(devKeys) == 0 { + return + } + queryKeysResp, err := a.FedClient.QueryKeys(fedCtx, gomatrixserverlib.ServerName(serverName), devKeys) + if err == nil { + resultCh <- &queryKeysResp + return + } + respMu.Lock() + res.Failures[serverName] = map[string]interface{}{ + "message": err.Error(), + } + + // last ditch, use the cache only. This is good for when clients hit /keys/query and the remote server + // is down, better to return something than nothing at all. Clients can know about the failure by + // inspecting the failures map though so they can know it's a cached response. + for userID, dkeys := range devKeys { + // drop the error as it's already a failure at this point + _ = a.populateResponseWithDeviceKeysFromDatabase(ctx, res, userID, dkeys) + } + respMu.Unlock() + +} + func (a *KeyInternalAPI) populateResponseWithDeviceKeysFromDatabase( ctx context.Context, res *api.QueryKeysResponse, userID string, deviceIDs []string, ) error { diff --git a/sytest-whitelist b/sytest-whitelist index d22b408a..c49ef94b 100644 --- a/sytest-whitelist +++ b/sytest-whitelist @@ -148,6 +148,7 @@ Get left notifs in sync and /keys/changes when other user leaves Can query remote device keys using POST after notification Server correctly resyncs when client query keys and there is no remote cache Server correctly resyncs when server leaves and rejoins a room +Device list doesn't change if remote server is down Can add account data Can add account data to room Can get account data without syncing