2017-06-28 16:10:17 +01:00
|
|
|
// Copyright 2017 Vector Creations Ltd
|
|
|
|
//
|
|
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
|
|
// you may not use this file except in compliance with the License.
|
|
|
|
// You may obtain a copy of the License at
|
|
|
|
//
|
|
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
//
|
|
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
|
|
// See the License for the specific language governing permissions and
|
|
|
|
// limitations under the License.
|
|
|
|
|
|
|
|
package queue
|
|
|
|
|
|
|
|
import (
|
2020-07-01 11:46:38 +01:00
|
|
|
"encoding/json"
|
2017-06-28 16:10:17 +01:00
|
|
|
"fmt"
|
|
|
|
"sync"
|
2020-09-03 21:17:55 +01:00
|
|
|
"time"
|
2017-06-28 16:10:17 +01:00
|
|
|
|
2022-10-10 16:36:26 +01:00
|
|
|
"github.com/getsentry/sentry-go"
|
2022-02-11 17:15:44 +00:00
|
|
|
"github.com/matrix-org/gomatrixserverlib"
|
2023-04-06 09:55:01 +01:00
|
|
|
"github.com/matrix-org/gomatrixserverlib/fclient"
|
2023-04-19 15:50:33 +01:00
|
|
|
"github.com/matrix-org/gomatrixserverlib/spec"
|
2022-02-11 17:15:44 +00:00
|
|
|
"github.com/prometheus/client_golang/prometheus"
|
2022-10-21 11:50:51 +01:00
|
|
|
"github.com/sirupsen/logrus"
|
2022-02-11 17:15:44 +00:00
|
|
|
log "github.com/sirupsen/logrus"
|
|
|
|
|
2021-11-24 10:45:23 +00:00
|
|
|
"github.com/matrix-org/dendrite/federationapi/statistics"
|
|
|
|
"github.com/matrix-org/dendrite/federationapi/storage"
|
2023-01-23 17:55:12 +00:00
|
|
|
"github.com/matrix-org/dendrite/federationapi/storage/shared/receipt"
|
2023-04-27 12:54:20 +01:00
|
|
|
"github.com/matrix-org/dendrite/roomserver/types"
|
2021-01-26 12:56:20 +00:00
|
|
|
"github.com/matrix-org/dendrite/setup/process"
|
2017-06-28 16:10:17 +01:00
|
|
|
)
|
|
|
|
|
|
|
|
// OutgoingQueues is a collection of queues for sending transactions to other
|
|
|
|
// matrix servers
|
|
|
|
type OutgoingQueues struct {
|
2020-07-01 11:46:38 +01:00
|
|
|
db storage.Database
|
2021-01-26 12:56:20 +00:00
|
|
|
process *process.ProcessContext
|
2020-12-02 15:10:03 +00:00
|
|
|
disabled bool
|
2023-04-19 15:50:33 +01:00
|
|
|
origin spec.ServerName
|
2023-04-24 17:23:25 +01:00
|
|
|
client fclient.FederationClient
|
2020-07-22 17:01:29 +01:00
|
|
|
statistics *statistics.Statistics
|
2023-04-19 15:50:33 +01:00
|
|
|
signing map[spec.ServerName]*fclient.SigningIdentity
|
Improve federation sender performance, implement backoff and blacklisting, fix up invites a bit (#1007)
* Improve federation sender performance and behaviour, add backoff
* Tweaks
* Tweaks
* Tweaks
* Take copies of events before passing to destination queues
* Don't accidentally drop queued messages
* Don't take copies again
* Tidy up a bit
* Break out statistics (tracked component-wide), report success and failures from Perform actions
* Fix comment, use atomic add
* Improve logic a bit, don't block on wakeup, move idle check
* Don't retry sucessful invites, don't dispatch sendEvent, sendInvite etc
* Dedupe destinations, fix other bug hopefully
* Dispatch sends again
* Federation sender to ignore invites that are destined locally
* Loopback invite events
* Remodel a bit with channels
* Linter
* Only loopback invite event if we know the room
* We should tell other resident servers about the invite if we know about the room
* Correct invite signing
* Fix invite loopback
* Check HTTP response codes, push new invites to front of queue
* Review comments
2020-05-07 12:42:06 +01:00
|
|
|
queuesMutex sync.Mutex // protects the below
|
2023-04-19 15:50:33 +01:00
|
|
|
queues map[spec.ServerName]*destinationQueue
|
2017-06-28 16:10:17 +01:00
|
|
|
}
|
|
|
|
|
2020-12-16 15:02:39 +00:00
|
|
|
func init() {
|
|
|
|
prometheus.MustRegister(
|
|
|
|
destinationQueueTotal, destinationQueueRunning,
|
|
|
|
destinationQueueBackingOff,
|
|
|
|
)
|
|
|
|
}
|
|
|
|
|
|
|
|
var destinationQueueTotal = prometheus.NewGauge(
|
|
|
|
prometheus.GaugeOpts{
|
|
|
|
Namespace: "dendrite",
|
2021-11-24 10:45:23 +00:00
|
|
|
Subsystem: "federationapi",
|
2020-12-16 15:02:39 +00:00
|
|
|
Name: "destination_queues_total",
|
|
|
|
},
|
|
|
|
)
|
|
|
|
|
|
|
|
var destinationQueueRunning = prometheus.NewGauge(
|
|
|
|
prometheus.GaugeOpts{
|
|
|
|
Namespace: "dendrite",
|
2021-11-24 10:45:23 +00:00
|
|
|
Subsystem: "federationapi",
|
2020-12-16 15:02:39 +00:00
|
|
|
Name: "destination_queues_running",
|
|
|
|
},
|
|
|
|
)
|
|
|
|
|
|
|
|
var destinationQueueBackingOff = prometheus.NewGauge(
|
|
|
|
prometheus.GaugeOpts{
|
|
|
|
Namespace: "dendrite",
|
2021-11-24 10:45:23 +00:00
|
|
|
Subsystem: "federationapi",
|
2020-12-16 15:02:39 +00:00
|
|
|
Name: "destination_queues_backing_off",
|
|
|
|
},
|
|
|
|
)
|
|
|
|
|
2017-06-28 16:10:17 +01:00
|
|
|
// NewOutgoingQueues makes a new OutgoingQueues
|
2020-04-28 10:53:07 +01:00
|
|
|
func NewOutgoingQueues(
|
2020-07-01 11:46:38 +01:00
|
|
|
db storage.Database,
|
2021-01-26 12:56:20 +00:00
|
|
|
process *process.ProcessContext,
|
2020-12-02 15:10:03 +00:00
|
|
|
disabled bool,
|
2023-04-19 15:50:33 +01:00
|
|
|
origin spec.ServerName,
|
2023-04-24 17:23:25 +01:00
|
|
|
client fclient.FederationClient,
|
2020-07-22 17:01:29 +01:00
|
|
|
statistics *statistics.Statistics,
|
2023-04-06 09:55:01 +01:00
|
|
|
signing []*fclient.SigningIdentity,
|
2020-04-28 10:53:07 +01:00
|
|
|
) *OutgoingQueues {
|
2020-07-03 11:49:49 +01:00
|
|
|
queues := &OutgoingQueues{
|
2020-12-02 15:10:03 +00:00
|
|
|
disabled: disabled,
|
2021-01-26 12:56:20 +00:00
|
|
|
process: process,
|
2020-07-01 11:46:38 +01:00
|
|
|
db: db,
|
2020-04-28 10:53:07 +01:00
|
|
|
origin: origin,
|
|
|
|
client: client,
|
Improve federation sender performance, implement backoff and blacklisting, fix up invites a bit (#1007)
* Improve federation sender performance and behaviour, add backoff
* Tweaks
* Tweaks
* Tweaks
* Take copies of events before passing to destination queues
* Don't accidentally drop queued messages
* Don't take copies again
* Tidy up a bit
* Break out statistics (tracked component-wide), report success and failures from Perform actions
* Fix comment, use atomic add
* Improve logic a bit, don't block on wakeup, move idle check
* Don't retry sucessful invites, don't dispatch sendEvent, sendInvite etc
* Dedupe destinations, fix other bug hopefully
* Dispatch sends again
* Federation sender to ignore invites that are destined locally
* Loopback invite events
* Remodel a bit with channels
* Linter
* Only loopback invite event if we know the room
* We should tell other resident servers about the invite if we know about the room
* Correct invite signing
* Fix invite loopback
* Check HTTP response codes, push new invites to front of queue
* Review comments
2020-05-07 12:42:06 +01:00
|
|
|
statistics: statistics,
|
2023-04-19 15:50:33 +01:00
|
|
|
signing: map[spec.ServerName]*fclient.SigningIdentity{},
|
|
|
|
queues: map[spec.ServerName]*destinationQueue{},
|
2017-06-28 16:10:17 +01:00
|
|
|
}
|
2022-11-15 15:05:23 +00:00
|
|
|
for _, identity := range signing {
|
|
|
|
queues.signing[identity.ServerName] = identity
|
|
|
|
}
|
2020-07-03 11:49:49 +01:00
|
|
|
// Look up which servers we have pending items for and then rehydrate those queues.
|
2020-12-02 15:10:03 +00:00
|
|
|
if !disabled {
|
2023-04-19 15:50:33 +01:00
|
|
|
serverNames := map[spec.ServerName]struct{}{}
|
2022-04-27 15:29:49 +01:00
|
|
|
if names, err := db.GetPendingPDUServerNames(process.Context()); err == nil {
|
2022-04-04 15:14:10 +01:00
|
|
|
for _, serverName := range names {
|
|
|
|
serverNames[serverName] = struct{}{}
|
2020-09-03 21:17:55 +01:00
|
|
|
}
|
2022-04-04 15:14:10 +01:00
|
|
|
} else {
|
|
|
|
log.WithError(err).Error("Failed to get PDU server names for destination queue hydration")
|
|
|
|
}
|
2022-04-27 15:29:49 +01:00
|
|
|
if names, err := db.GetPendingEDUServerNames(process.Context()); err == nil {
|
2022-04-04 15:14:10 +01:00
|
|
|
for _, serverName := range names {
|
|
|
|
serverNames[serverName] = struct{}{}
|
2020-09-03 21:17:55 +01:00
|
|
|
}
|
2022-04-04 15:14:10 +01:00
|
|
|
} else {
|
|
|
|
log.WithError(err).Error("Failed to get EDU server names for destination queue hydration")
|
|
|
|
}
|
|
|
|
offset, step := time.Second*5, time.Second
|
|
|
|
if max := len(serverNames); max > 120 {
|
|
|
|
step = (time.Second * 120) / time.Duration(max)
|
|
|
|
}
|
|
|
|
for serverName := range serverNames {
|
|
|
|
if queue := queues.getQueue(serverName); queue != nil {
|
|
|
|
time.AfterFunc(offset, queue.wakeQueueIfNeeded)
|
|
|
|
offset += step
|
2020-09-03 21:17:55 +01:00
|
|
|
}
|
2022-04-04 15:14:10 +01:00
|
|
|
}
|
2020-12-02 15:10:03 +00:00
|
|
|
}
|
2020-07-03 11:49:49 +01:00
|
|
|
return queues
|
2017-06-28 16:10:17 +01:00
|
|
|
}
|
|
|
|
|
2020-12-09 10:03:22 +00:00
|
|
|
type queuedPDU struct {
|
2023-01-23 17:55:12 +00:00
|
|
|
dbReceipt *receipt.Receipt
|
2023-04-27 12:54:20 +01:00
|
|
|
pdu *types.HeaderedEvent
|
2020-12-09 10:03:22 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
type queuedEDU struct {
|
2023-01-23 17:55:12 +00:00
|
|
|
dbReceipt *receipt.Receipt
|
|
|
|
edu *gomatrixserverlib.EDU
|
2020-12-09 10:03:22 +00:00
|
|
|
}
|
|
|
|
|
2023-04-19 15:50:33 +01:00
|
|
|
func (oqs *OutgoingQueues) getQueue(destination spec.ServerName) *destinationQueue {
|
2021-02-17 15:16:35 +00:00
|
|
|
if oqs.statistics.ForServer(destination).Blacklisted() {
|
|
|
|
return nil
|
|
|
|
}
|
Improve federation sender performance, implement backoff and blacklisting, fix up invites a bit (#1007)
* Improve federation sender performance and behaviour, add backoff
* Tweaks
* Tweaks
* Tweaks
* Take copies of events before passing to destination queues
* Don't accidentally drop queued messages
* Don't take copies again
* Tidy up a bit
* Break out statistics (tracked component-wide), report success and failures from Perform actions
* Fix comment, use atomic add
* Improve logic a bit, don't block on wakeup, move idle check
* Don't retry sucessful invites, don't dispatch sendEvent, sendInvite etc
* Dedupe destinations, fix other bug hopefully
* Dispatch sends again
* Federation sender to ignore invites that are destined locally
* Loopback invite events
* Remodel a bit with channels
* Linter
* Only loopback invite event if we know the room
* We should tell other resident servers about the invite if we know about the room
* Correct invite signing
* Fix invite loopback
* Check HTTP response codes, push new invites to front of queue
* Review comments
2020-05-07 12:42:06 +01:00
|
|
|
oqs.queuesMutex.Lock()
|
|
|
|
defer oqs.queuesMutex.Unlock()
|
2021-02-17 15:16:35 +00:00
|
|
|
oq, ok := oqs.queues[destination]
|
2022-08-05 06:20:34 +01:00
|
|
|
if !ok || oq == nil {
|
2020-12-16 15:02:39 +00:00
|
|
|
destinationQueueTotal.Inc()
|
Improve federation sender performance, implement backoff and blacklisting, fix up invites a bit (#1007)
* Improve federation sender performance and behaviour, add backoff
* Tweaks
* Tweaks
* Tweaks
* Take copies of events before passing to destination queues
* Don't accidentally drop queued messages
* Don't take copies again
* Tidy up a bit
* Break out statistics (tracked component-wide), report success and failures from Perform actions
* Fix comment, use atomic add
* Improve logic a bit, don't block on wakeup, move idle check
* Don't retry sucessful invites, don't dispatch sendEvent, sendInvite etc
* Dedupe destinations, fix other bug hopefully
* Dispatch sends again
* Federation sender to ignore invites that are destined locally
* Loopback invite events
* Remodel a bit with channels
* Linter
* Only loopback invite event if we know the room
* We should tell other resident servers about the invite if we know about the room
* Correct invite signing
* Fix invite loopback
* Check HTTP response codes, push new invites to front of queue
* Review comments
2020-05-07 12:42:06 +01:00
|
|
|
oq = &destinationQueue{
|
2022-10-19 11:03:16 +01:00
|
|
|
queues: oqs,
|
|
|
|
db: oqs.db,
|
|
|
|
process: oqs.process,
|
|
|
|
origin: oqs.origin,
|
|
|
|
destination: destination,
|
|
|
|
client: oqs.client,
|
|
|
|
statistics: oqs.statistics.ForServer(destination),
|
|
|
|
notify: make(chan struct{}, 1),
|
|
|
|
signing: oqs.signing,
|
Improve federation sender performance, implement backoff and blacklisting, fix up invites a bit (#1007)
* Improve federation sender performance and behaviour, add backoff
* Tweaks
* Tweaks
* Tweaks
* Take copies of events before passing to destination queues
* Don't accidentally drop queued messages
* Don't take copies again
* Tidy up a bit
* Break out statistics (tracked component-wide), report success and failures from Perform actions
* Fix comment, use atomic add
* Improve logic a bit, don't block on wakeup, move idle check
* Don't retry sucessful invites, don't dispatch sendEvent, sendInvite etc
* Dedupe destinations, fix other bug hopefully
* Dispatch sends again
* Federation sender to ignore invites that are destined locally
* Loopback invite events
* Remodel a bit with channels
* Linter
* Only loopback invite event if we know the room
* We should tell other resident servers about the invite if we know about the room
* Correct invite signing
* Fix invite loopback
* Check HTTP response codes, push new invites to front of queue
* Review comments
2020-05-07 12:42:06 +01:00
|
|
|
}
|
2022-10-19 11:03:16 +01:00
|
|
|
oq.statistics.AssignBackoffNotifier(oq.handleBackoffNotifier)
|
Improve federation sender performance, implement backoff and blacklisting, fix up invites a bit (#1007)
* Improve federation sender performance and behaviour, add backoff
* Tweaks
* Tweaks
* Tweaks
* Take copies of events before passing to destination queues
* Don't accidentally drop queued messages
* Don't take copies again
* Tidy up a bit
* Break out statistics (tracked component-wide), report success and failures from Perform actions
* Fix comment, use atomic add
* Improve logic a bit, don't block on wakeup, move idle check
* Don't retry sucessful invites, don't dispatch sendEvent, sendInvite etc
* Dedupe destinations, fix other bug hopefully
* Dispatch sends again
* Federation sender to ignore invites that are destined locally
* Loopback invite events
* Remodel a bit with channels
* Linter
* Only loopback invite event if we know the room
* We should tell other resident servers about the invite if we know about the room
* Correct invite signing
* Fix invite loopback
* Check HTTP response codes, push new invites to front of queue
* Review comments
2020-05-07 12:42:06 +01:00
|
|
|
oqs.queues[destination] = oq
|
|
|
|
}
|
|
|
|
return oq
|
|
|
|
}
|
|
|
|
|
2022-10-19 11:03:16 +01:00
|
|
|
// clearQueue removes the queue for the provided destination from the
|
|
|
|
// set of destination queues.
|
2021-02-17 15:16:35 +00:00
|
|
|
func (oqs *OutgoingQueues) clearQueue(oq *destinationQueue) {
|
|
|
|
oqs.queuesMutex.Lock()
|
|
|
|
defer oqs.queuesMutex.Unlock()
|
|
|
|
|
|
|
|
delete(oqs.queues, oq.destination)
|
|
|
|
destinationQueueTotal.Dec()
|
|
|
|
}
|
|
|
|
|
2017-06-28 16:10:17 +01:00
|
|
|
// SendEvent sends an event to the destinations
|
|
|
|
func (oqs *OutgoingQueues) SendEvent(
|
2023-04-27 12:54:20 +01:00
|
|
|
ev *types.HeaderedEvent, origin spec.ServerName,
|
2023-04-19 15:50:33 +01:00
|
|
|
destinations []spec.ServerName,
|
2017-06-28 16:10:17 +01:00
|
|
|
) error {
|
2020-12-02 15:10:03 +00:00
|
|
|
if oqs.disabled {
|
2022-02-11 17:15:44 +00:00
|
|
|
log.Trace("Federation is disabled, not sending event")
|
|
|
|
return nil
|
2020-12-02 15:10:03 +00:00
|
|
|
}
|
2022-11-11 16:41:37 +00:00
|
|
|
if _, ok := oqs.signing[origin]; !ok {
|
2017-06-28 16:10:17 +01:00
|
|
|
return fmt.Errorf(
|
2022-11-11 16:41:37 +00:00
|
|
|
"sendevent: unexpected server to send as %q",
|
|
|
|
origin,
|
2017-06-28 16:10:17 +01:00
|
|
|
)
|
|
|
|
}
|
|
|
|
|
2020-08-13 14:23:37 +01:00
|
|
|
// Deduplicate destinations and remove the origin from the list of
|
|
|
|
// destinations just to be sure.
|
2023-04-19 15:50:33 +01:00
|
|
|
destmap := map[spec.ServerName]struct{}{}
|
2020-08-13 14:23:37 +01:00
|
|
|
for _, d := range destinations {
|
|
|
|
destmap[d] = struct{}{}
|
|
|
|
}
|
|
|
|
delete(destmap, oqs.origin)
|
2022-11-11 16:41:37 +00:00
|
|
|
for local := range oqs.signing {
|
|
|
|
delete(destmap, local)
|
|
|
|
}
|
2020-08-13 14:23:37 +01:00
|
|
|
|
|
|
|
// If there are no remaining destinations then give up.
|
|
|
|
if len(destmap) == 0 {
|
2020-06-12 15:11:33 +01:00
|
|
|
return nil
|
|
|
|
}
|
2017-06-28 16:10:17 +01:00
|
|
|
|
|
|
|
log.WithFields(log.Fields{
|
2020-08-13 14:23:37 +01:00
|
|
|
"destinations": len(destmap), "event": ev.EventID(),
|
2020-08-07 15:00:23 +01:00
|
|
|
}).Infof("Sending event")
|
2017-06-28 16:10:17 +01:00
|
|
|
|
2020-07-01 11:46:38 +01:00
|
|
|
headeredJSON, err := json.Marshal(ev)
|
|
|
|
if err != nil {
|
|
|
|
return fmt.Errorf("json.Marshal: %w", err)
|
|
|
|
}
|
|
|
|
|
2022-04-27 15:29:49 +01:00
|
|
|
nid, err := oqs.db.StoreJSON(oqs.process.Context(), string(headeredJSON))
|
2020-07-01 11:46:38 +01:00
|
|
|
if err != nil {
|
|
|
|
return fmt.Errorf("sendevent: oqs.db.StoreJSON: %w", err)
|
|
|
|
}
|
|
|
|
|
2022-10-26 17:35:01 +01:00
|
|
|
destQueues := make([]*destinationQueue, 0, len(destmap))
|
2020-08-13 14:23:37 +01:00
|
|
|
for destination := range destmap {
|
2022-10-26 17:35:01 +01:00
|
|
|
if queue := oqs.getQueue(destination); queue != nil {
|
|
|
|
destQueues = append(destQueues, queue)
|
2022-10-21 11:50:51 +01:00
|
|
|
} else {
|
|
|
|
delete(destmap, destination)
|
2021-02-17 15:16:35 +00:00
|
|
|
}
|
2017-06-28 16:10:17 +01:00
|
|
|
}
|
2018-08-10 16:26:57 +01:00
|
|
|
|
2022-10-21 11:50:51 +01:00
|
|
|
// Create a database entry that associates the given PDU NID with
|
|
|
|
// this destinations queue. We'll then be able to retrieve the PDU
|
|
|
|
// later.
|
|
|
|
if err := oqs.db.AssociatePDUWithDestinations(
|
|
|
|
oqs.process.Context(),
|
|
|
|
destmap,
|
|
|
|
nid, // NIDs from federationapi_queue_json table
|
|
|
|
); err != nil {
|
|
|
|
logrus.WithError(err).Errorf("failed to associate PDUs %q with destinations", nid)
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2022-10-26 17:35:01 +01:00
|
|
|
// NOTE : PDUs should be associated with destinations before sending
|
|
|
|
// them, otherwise this is technically a race.
|
|
|
|
// If the send completes before they are associated then they won't
|
|
|
|
// get properly cleaned up in the database.
|
|
|
|
for _, queue := range destQueues {
|
|
|
|
queue.sendEvent(ev, nid)
|
|
|
|
}
|
|
|
|
|
2018-08-10 16:26:57 +01:00
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2020-07-14 12:33:37 +01:00
|
|
|
// SendEDU sends an EDU event to the destinations.
|
2018-08-10 16:26:57 +01:00
|
|
|
func (oqs *OutgoingQueues) SendEDU(
|
2023-04-19 15:50:33 +01:00
|
|
|
e *gomatrixserverlib.EDU, origin spec.ServerName,
|
|
|
|
destinations []spec.ServerName,
|
2018-08-10 16:26:57 +01:00
|
|
|
) error {
|
2020-12-02 15:10:03 +00:00
|
|
|
if oqs.disabled {
|
2022-02-11 17:15:44 +00:00
|
|
|
log.Trace("Federation is disabled, not sending EDU")
|
|
|
|
return nil
|
2020-12-02 15:10:03 +00:00
|
|
|
}
|
2022-11-11 16:41:37 +00:00
|
|
|
if _, ok := oqs.signing[origin]; !ok {
|
2018-08-10 16:26:57 +01:00
|
|
|
return fmt.Errorf(
|
2022-11-11 16:41:37 +00:00
|
|
|
"sendevent: unexpected server to send as %q",
|
|
|
|
origin,
|
2018-08-10 16:26:57 +01:00
|
|
|
)
|
|
|
|
}
|
|
|
|
|
2020-08-13 14:23:37 +01:00
|
|
|
// Deduplicate destinations and remove the origin from the list of
|
|
|
|
// destinations just to be sure.
|
2023-04-19 15:50:33 +01:00
|
|
|
destmap := map[spec.ServerName]struct{}{}
|
2020-08-13 14:23:37 +01:00
|
|
|
for _, d := range destinations {
|
|
|
|
destmap[d] = struct{}{}
|
|
|
|
}
|
|
|
|
delete(destmap, oqs.origin)
|
2022-11-11 16:41:37 +00:00
|
|
|
for local := range oqs.signing {
|
|
|
|
delete(destmap, local)
|
|
|
|
}
|
2018-08-10 16:26:57 +01:00
|
|
|
|
2020-08-13 14:23:37 +01:00
|
|
|
// If there are no remaining destinations then give up.
|
|
|
|
if len(destmap) == 0 {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
log.WithFields(log.Fields{
|
|
|
|
"destinations": len(destmap), "edu_type": e.Type,
|
|
|
|
}).Info("Sending EDU event")
|
|
|
|
|
2020-07-20 16:55:20 +01:00
|
|
|
ephemeralJSON, err := json.Marshal(e)
|
|
|
|
if err != nil {
|
2022-10-10 16:36:26 +01:00
|
|
|
sentry.CaptureException(err)
|
2020-07-20 16:55:20 +01:00
|
|
|
return fmt.Errorf("json.Marshal: %w", err)
|
|
|
|
}
|
|
|
|
|
2022-04-27 15:29:49 +01:00
|
|
|
nid, err := oqs.db.StoreJSON(oqs.process.Context(), string(ephemeralJSON))
|
2020-07-20 16:55:20 +01:00
|
|
|
if err != nil {
|
2022-10-10 16:36:26 +01:00
|
|
|
sentry.CaptureException(err)
|
2020-07-20 16:55:20 +01:00
|
|
|
return fmt.Errorf("sendevent: oqs.db.StoreJSON: %w", err)
|
|
|
|
}
|
|
|
|
|
2022-10-26 17:35:01 +01:00
|
|
|
destQueues := make([]*destinationQueue, 0, len(destmap))
|
2020-08-13 14:23:37 +01:00
|
|
|
for destination := range destmap {
|
2022-10-26 17:35:01 +01:00
|
|
|
if queue := oqs.getQueue(destination); queue != nil {
|
|
|
|
destQueues = append(destQueues, queue)
|
2022-10-21 11:50:51 +01:00
|
|
|
} else {
|
|
|
|
delete(destmap, destination)
|
2021-02-17 15:16:35 +00:00
|
|
|
}
|
2018-08-10 16:26:57 +01:00
|
|
|
}
|
|
|
|
|
2022-10-21 11:50:51 +01:00
|
|
|
// Create a database entry that associates the given PDU NID with
|
2022-10-26 17:35:01 +01:00
|
|
|
// these destination queues. We'll then be able to retrieve the PDU
|
2022-10-21 11:50:51 +01:00
|
|
|
// later.
|
|
|
|
if err := oqs.db.AssociateEDUWithDestinations(
|
|
|
|
oqs.process.Context(),
|
2022-10-26 17:35:01 +01:00
|
|
|
destmap, // the destination server names
|
2022-10-21 11:50:51 +01:00
|
|
|
nid, // NIDs from federationapi_queue_json table
|
|
|
|
e.Type,
|
|
|
|
nil, // this will use the default expireEDUTypes map
|
|
|
|
); err != nil {
|
|
|
|
logrus.WithError(err).Errorf("failed to associate EDU with destinations")
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2022-10-26 17:35:01 +01:00
|
|
|
// NOTE : EDUs should be associated with destinations before sending
|
|
|
|
// them, otherwise this is technically a race.
|
|
|
|
// If the send completes before they are associated then they won't
|
|
|
|
// get properly cleaned up in the database.
|
|
|
|
for _, queue := range destQueues {
|
|
|
|
queue.sendEDU(e, nid)
|
|
|
|
}
|
|
|
|
|
2017-06-28 16:10:17 +01:00
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2020-06-01 18:34:08 +01:00
|
|
|
// RetryServer attempts to resend events to the given server if we had given up.
|
2023-04-19 15:50:33 +01:00
|
|
|
func (oqs *OutgoingQueues) RetryServer(srv spec.ServerName, wasBlacklisted bool) {
|
2020-12-02 15:10:03 +00:00
|
|
|
if oqs.disabled {
|
|
|
|
return
|
|
|
|
}
|
2022-11-18 00:29:23 +00:00
|
|
|
|
2021-02-17 15:16:35 +00:00
|
|
|
if queue := oqs.getQueue(srv); queue != nil {
|
2023-01-23 17:55:12 +00:00
|
|
|
queue.wakeQueueIfEventsPending(wasBlacklisted)
|
2020-06-01 18:34:08 +01:00
|
|
|
}
|
|
|
|
}
|