diff --git a/federationsender/queue/destinationqueue.go b/federationsender/queue/destinationqueue.go index 45faa287..09dac464 100644 --- a/federationsender/queue/destinationqueue.go +++ b/federationsender/queue/destinationqueue.go @@ -110,12 +110,26 @@ func (oq *destinationQueue) backgroundSend() { // of the queue and they will all be added to transactions // in order. oq.pendingPDUs = append(oq.pendingPDUs, pdu) + // If there are any more things waiting in the channel queue + // then read them. This is safe because we guarantee only + // having one goroutine per destination queue, so the channel + // isn't being consumed anywhere else. + for len(oq.incomingPDUs) > 0 { + oq.pendingPDUs = append(oq.pendingPDUs, <-oq.incomingPDUs) + } case edu := <-oq.incomingEDUs: // Likewise for EDUs, although we should probably not try // too hard with some EDUs (like typing notifications) after // a certain amount of time has passed. // TODO: think about EDU expiry some more oq.pendingEDUs = append(oq.pendingEDUs, edu) + // If there are any more things waiting in the channel queue + // then read them. This is safe because we guarantee only + // having one goroutine per destination queue, so the channel + // isn't being consumed anywhere else. + for len(oq.incomingEDUs) > 0 { + oq.pendingEDUs = append(oq.pendingEDUs, <-oq.incomingEDUs) + } case invite := <-oq.incomingInvites: // There's no strict ordering requirement for invites like // there is for transactions, so we put the invite onto the @@ -126,6 +140,13 @@ func (oq *destinationQueue) backgroundSend() { []*gomatrixserverlib.InviteV2Request{invite}, oq.pendingInvites..., ) + // If there are any more things waiting in the channel queue + // then read them. This is safe because we guarantee only + // having one goroutine per destination queue, so the channel + // isn't being consumed anywhere else. + for len(oq.incomingInvites) > 0 { + oq.pendingInvites = append(oq.pendingInvites, <-oq.incomingInvites) + } case <-time.After(time.Second * 30): // The worker is idle so stop the goroutine. It'll // get restarted automatically the next time we