Remove federationsender producer, which in fact was not a producer (#1115)

* Remove federationsender producer, which in fact was not a producer

* Set the signing struct
This commit is contained in:
Kegsay 2020-06-10 16:54:43 +01:00 committed by GitHub
parent 3b4be90000
commit 399b6ae334
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 40 additions and 137 deletions

View File

@ -20,7 +20,6 @@ import (
"github.com/matrix-org/dendrite/federationsender/consumers"
"github.com/matrix-org/dendrite/federationsender/internal"
"github.com/matrix-org/dendrite/federationsender/inthttp"
"github.com/matrix-org/dendrite/federationsender/producers"
"github.com/matrix-org/dendrite/federationsender/queue"
"github.com/matrix-org/dendrite/federationsender/storage"
"github.com/matrix-org/dendrite/federationsender/types"
@ -49,13 +48,13 @@ func NewInternalAPI(
logrus.WithError(err).Panic("failed to connect to federation sender db")
}
roomserverProducer := producers.NewRoomserverProducer(
rsAPI, base.Cfg.Matrix.ServerName, base.Cfg.Matrix.KeyID, base.Cfg.Matrix.PrivateKey,
)
statistics := &types.Statistics{}
queues := queue.NewOutgoingQueues(
base.Cfg.Matrix.ServerName, federation, roomserverProducer, statistics,
base.Cfg.Matrix.ServerName, federation, rsAPI, statistics, &queue.SigningInfo{
KeyID: base.Cfg.Matrix.KeyID,
PrivateKey: base.Cfg.Matrix.PrivateKey,
ServerName: base.Cfg.Matrix.ServerName,
},
)
rsConsumer := consumers.NewOutputRoomEventConsumer(
@ -73,5 +72,5 @@ func NewInternalAPI(
logrus.WithError(err).Panic("failed to start typing server consumer")
}
return internal.NewFederationSenderInternalAPI(federationSenderDB, base.Cfg, roomserverProducer, federation, keyRing, statistics, queues)
return internal.NewFederationSenderInternalAPI(federationSenderDB, base.Cfg, rsAPI, federation, keyRing, statistics, queues)
}

View File

@ -1,22 +1,20 @@
package internal
import (
"github.com/matrix-org/dendrite/federationsender/api"
"github.com/matrix-org/dendrite/federationsender/producers"
"github.com/matrix-org/dendrite/federationsender/queue"
"github.com/matrix-org/dendrite/federationsender/storage"
"github.com/matrix-org/dendrite/federationsender/types"
"github.com/matrix-org/dendrite/internal/config"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/gomatrixserverlib"
)
// FederationSenderInternalAPI is an implementation of api.FederationSenderInternalAPI
type FederationSenderInternalAPI struct {
api.FederationSenderInternalAPI
db storage.Database
cfg *config.Dendrite
statistics *types.Statistics
producer *producers.RoomserverProducer
rsAPI api.RoomserverInternalAPI
federation *gomatrixserverlib.FederationClient
keyRing *gomatrixserverlib.KeyRing
queues *queue.OutgoingQueues
@ -24,7 +22,7 @@ type FederationSenderInternalAPI struct {
func NewFederationSenderInternalAPI(
db storage.Database, cfg *config.Dendrite,
producer *producers.RoomserverProducer,
rsAPI api.RoomserverInternalAPI,
federation *gomatrixserverlib.FederationClient,
keyRing *gomatrixserverlib.KeyRing,
statistics *types.Statistics,
@ -33,7 +31,7 @@ func NewFederationSenderInternalAPI(
return &FederationSenderInternalAPI{
db: db,
cfg: cfg,
producer: producer,
rsAPI: rsAPI,
federation: federation,
keyRing: keyRing,
statistics: statistics,

View File

@ -7,6 +7,7 @@ import (
"github.com/matrix-org/dendrite/federationsender/api"
"github.com/matrix-org/dendrite/federationsender/internal/perform"
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/roomserver/version"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
@ -175,10 +176,11 @@ func (r *FederationSenderInternalAPI) performJoinUsingServer(
// If we successfully performed a send_join above then the other
// server now thinks we're a part of the room. Send the newly
// returned state to the roomserver to update our local view.
if err = r.producer.SendEventWithState(
ctx,
respSendJoin.ToRespState(),
event.Headered(respMakeJoin.RoomVersion),
respState := respSendJoin.ToRespState()
if err = roomserverAPI.SendEventWithState(
ctx, r.rsAPI,
&respState,
event.Headered(respMakeJoin.RoomVersion), nil,
); err != nil {
return fmt.Errorf("r.producer.SendEventWithState: %w", err)
}

View File

@ -1,108 +0,0 @@
// Copyright 2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package producers
import (
"context"
"crypto/ed25519"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/gomatrixserverlib"
)
// RoomserverProducer produces events for the roomserver to consume.
type RoomserverProducer struct {
InputAPI api.RoomserverInternalAPI
serverName gomatrixserverlib.ServerName
keyID gomatrixserverlib.KeyID
privateKey ed25519.PrivateKey
}
// NewRoomserverProducer creates a new RoomserverProducer
func NewRoomserverProducer(
rsAPI api.RoomserverInternalAPI, serverName gomatrixserverlib.ServerName,
keyID gomatrixserverlib.KeyID, privateKey ed25519.PrivateKey,
) *RoomserverProducer {
return &RoomserverProducer{
InputAPI: rsAPI,
serverName: serverName,
keyID: keyID,
privateKey: privateKey,
}
}
// SendInviteResponse drops an invite response back into the roomserver so that users
// already in the room will be notified of the new invite. The invite response is signed
// by the remote side.
func (c *RoomserverProducer) SendInviteResponse(
ctx context.Context, res gomatrixserverlib.RespInviteV2, roomVersion gomatrixserverlib.RoomVersion,
) (string, error) {
ev := res.Event.Sign(string(c.serverName), c.keyID, c.privateKey).Headered(roomVersion)
ire := api.InputRoomEvent{
Kind: api.KindNew,
Event: ev,
AuthEventIDs: ev.AuthEventIDs(),
SendAsServer: string(c.serverName),
TransactionID: nil,
}
return c.SendInputRoomEvents(ctx, []api.InputRoomEvent{ire})
}
// SendEventWithState writes an event with KindNew to the roomserver input log
// with the state at the event as KindOutlier before it.
func (c *RoomserverProducer) SendEventWithState(
ctx context.Context, state gomatrixserverlib.RespState, event gomatrixserverlib.HeaderedEvent,
) error {
outliers, err := state.Events()
if err != nil {
return err
}
var ires []api.InputRoomEvent
for _, outlier := range outliers {
ires = append(ires, api.InputRoomEvent{
Kind: api.KindOutlier,
Event: outlier.Headered(event.RoomVersion),
AuthEventIDs: outlier.AuthEventIDs(),
})
}
stateEventIDs := make([]string, len(state.StateEvents))
for i := range state.StateEvents {
stateEventIDs[i] = state.StateEvents[i].EventID()
}
ires = append(ires, api.InputRoomEvent{
Kind: api.KindNew,
Event: event,
AuthEventIDs: event.AuthEventIDs(),
HasState: true,
StateEventIDs: stateEventIDs,
})
_, err = c.SendInputRoomEvents(ctx, ires)
return err
}
// SendInputRoomEvents writes the given input room events to the roomserver input API.
func (c *RoomserverProducer) SendInputRoomEvents(
ctx context.Context, ires []api.InputRoomEvent,
) (eventID string, err error) {
request := api.InputRoomEventsRequest{InputRoomEvents: ires}
var response api.InputRoomEventsResponse
err = c.InputAPI.InputRoomEvents(ctx, &request, &response)
eventID = response.EventID
return
}

View File

@ -20,8 +20,8 @@ import (
"fmt"
"time"
"github.com/matrix-org/dendrite/federationsender/producers"
"github.com/matrix-org/dendrite/federationsender/types"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/gomatrix"
"github.com/matrix-org/gomatrixserverlib"
"github.com/sirupsen/logrus"
@ -34,7 +34,8 @@ import (
// ensures that only one request is in flight to a given destination
// at a time.
type destinationQueue struct {
rsProducer *producers.RoomserverProducer // roomserver producer
signing *SigningInfo
rsAPI api.RoomserverInternalAPI
client *gomatrixserverlib.FederationClient // federation client
origin gomatrixserverlib.ServerName // origin of requests
destination gomatrixserverlib.ServerName // destination of requests
@ -370,11 +371,9 @@ func (oq *destinationQueue) nextInvites(
return done, err
}
if _, err = oq.rsProducer.SendInviteResponse(
context.TODO(),
inviteRes,
roomVersion,
); err != nil {
invEv := inviteRes.Event.Sign(string(oq.signing.ServerName), oq.signing.KeyID, oq.signing.PrivateKey).Headered(roomVersion)
_, err = api.SendEvents(context.TODO(), oq.rsAPI, []gomatrixserverlib.HeaderedEvent{invEv}, oq.signing.ServerName, nil)
if err != nil {
log.WithFields(log.Fields{
"event_id": ev.EventID(),
"state_key": ev.StateKey(),

View File

@ -15,11 +15,12 @@
package queue
import (
"crypto/ed25519"
"fmt"
"sync"
"github.com/matrix-org/dendrite/federationsender/producers"
"github.com/matrix-org/dendrite/federationsender/types"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
log "github.com/sirupsen/logrus"
@ -28,10 +29,11 @@ import (
// OutgoingQueues is a collection of queues for sending transactions to other
// matrix servers
type OutgoingQueues struct {
rsProducer *producers.RoomserverProducer
rsAPI api.RoomserverInternalAPI
origin gomatrixserverlib.ServerName
client *gomatrixserverlib.FederationClient
statistics *types.Statistics
signing *SigningInfo
queuesMutex sync.Mutex // protects the below
queues map[gomatrixserverlib.ServerName]*destinationQueue
}
@ -40,18 +42,28 @@ type OutgoingQueues struct {
func NewOutgoingQueues(
origin gomatrixserverlib.ServerName,
client *gomatrixserverlib.FederationClient,
rsProducer *producers.RoomserverProducer,
rsAPI api.RoomserverInternalAPI,
statistics *types.Statistics,
signing *SigningInfo,
) *OutgoingQueues {
return &OutgoingQueues{
rsProducer: rsProducer,
rsAPI: rsAPI,
origin: origin,
client: client,
statistics: statistics,
signing: signing,
queues: map[gomatrixserverlib.ServerName]*destinationQueue{},
}
}
// TODO: Move this somewhere useful for other components as we often need to ferry these 3 variables
// around together
type SigningInfo struct {
ServerName gomatrixserverlib.ServerName
KeyID gomatrixserverlib.KeyID
PrivateKey ed25519.PrivateKey
}
func (oqs *OutgoingQueues) getQueueIfExists(destination gomatrixserverlib.ServerName) *destinationQueue {
oqs.queuesMutex.Lock()
defer oqs.queuesMutex.Unlock()
@ -64,7 +76,7 @@ func (oqs *OutgoingQueues) getQueue(destination gomatrixserverlib.ServerName) *d
oq := oqs.queues[destination]
if oq == nil {
oq = &destinationQueue{
rsProducer: oqs.rsProducer,
rsAPI: oqs.rsAPI,
origin: oqs.origin,
destination: destination,
client: oqs.client,
@ -73,6 +85,7 @@ func (oqs *OutgoingQueues) getQueue(destination gomatrixserverlib.ServerName) *d
incomingEDUs: make(chan *gomatrixserverlib.EDU, 128),
incomingInvites: make(chan *gomatrixserverlib.InviteV2Request, 128),
retryServerCh: make(chan bool),
signing: oqs.signing,
}
oqs.queues[destination] = oq
}