From 04bc09f591ec9562bf7070fb45a8c1c5979e686f Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Thu, 3 Sep 2020 21:17:55 +0100 Subject: [PATCH] Defer keyserver and federationsender wakeups to give HTTP listeners time to start (#1389) --- federationsender/queue/queue.go | 37 ++++++++++++++++++--------------- keyserver/keyserver.go | 9 ++++---- 2 files changed, 25 insertions(+), 21 deletions(-) diff --git a/federationsender/queue/queue.go b/federationsender/queue/queue.go index 6561251d..b13df612 100644 --- a/federationsender/queue/queue.go +++ b/federationsender/queue/queue.go @@ -20,6 +20,7 @@ import ( "encoding/json" "fmt" "sync" + "time" stateapi "github.com/matrix-org/dendrite/currentstateserver/api" "github.com/matrix-org/dendrite/federationsender/statistics" @@ -65,26 +66,28 @@ func NewOutgoingQueues( queues: map[gomatrixserverlib.ServerName]*destinationQueue{}, } // Look up which servers we have pending items for and then rehydrate those queues. - serverNames := map[gomatrixserverlib.ServerName]struct{}{} - if names, err := db.GetPendingPDUServerNames(context.Background()); err == nil { - for _, serverName := range names { - serverNames[serverName] = struct{}{} + time.AfterFunc(time.Second*5, func() { + serverNames := map[gomatrixserverlib.ServerName]struct{}{} + if names, err := db.GetPendingPDUServerNames(context.Background()); err == nil { + for _, serverName := range names { + serverNames[serverName] = struct{}{} + } + } else { + log.WithError(err).Error("Failed to get PDU server names for destination queue hydration") } - } else { - log.WithError(err).Error("Failed to get PDU server names for destination queue hydration") - } - if names, err := db.GetPendingEDUServerNames(context.Background()); err == nil { - for _, serverName := range names { - serverNames[serverName] = struct{}{} + if names, err := db.GetPendingEDUServerNames(context.Background()); err == nil { + for _, serverName := range names { + serverNames[serverName] = struct{}{} + } + } else { + log.WithError(err).Error("Failed to get EDU server names for destination queue hydration") } - } else { - log.WithError(err).Error("Failed to get EDU server names for destination queue hydration") - } - for serverName := range serverNames { - if !queues.getQueue(serverName).statistics.Blacklisted() { - queues.getQueue(serverName).wakeQueueIfNeeded() + for serverName := range serverNames { + if !queues.getQueue(serverName).statistics.Blacklisted() { + queues.getQueue(serverName).wakeQueueIfNeeded() + } } - } + }) return queues } diff --git a/keyserver/keyserver.go b/keyserver/keyserver.go index 2e561363..78420db1 100644 --- a/keyserver/keyserver.go +++ b/keyserver/keyserver.go @@ -48,10 +48,11 @@ func NewInternalAPI( DB: db, } updater := internal.NewDeviceListUpdater(db, keyChangeProducer, fedClient, 8) // 8 workers TODO: configurable - err = updater.Start() - if err != nil { - logrus.WithError(err).Panicf("failed to start device list updater") - } + go func() { + if err := updater.Start(); err != nil { + logrus.WithError(err).Panicf("failed to start device list updater") + } + }() return &internal.KeyInternalAPI{ DB: db, ThisServer: cfg.Matrix.ServerName,