diff --git a/clientapi/routing/membership.go b/clientapi/routing/membership.go index c597dd27..dff194dd 100644 --- a/clientapi/routing/membership.go +++ b/clientapi/routing/membership.go @@ -127,18 +127,18 @@ func SendMembership( returnData = struct { RoomID string `json:"room_id"` }{roomID} + fallthrough default: - } - - _, err = producer.SendEvents( - req.Context(), - []gomatrixserverlib.HeaderedEvent{event.Headered(verRes.RoomVersion)}, - cfg.Matrix.ServerName, - nil, - ) - if err != nil { - util.GetLogger(req.Context()).WithError(err).Error("producer.SendEvents failed") - return jsonerror.InternalServerError() + _, err = producer.SendEvents( + req.Context(), + []gomatrixserverlib.HeaderedEvent{event.Headered(verRes.RoomVersion)}, + cfg.Matrix.ServerName, + nil, + ) + if err != nil { + util.GetLogger(req.Context()).WithError(err).Error("producer.SendEvents failed") + return jsonerror.InternalServerError() + } } return util.JSONResponse{ diff --git a/cmd/dendrite-demo-libp2p/main.go b/cmd/dendrite-demo-libp2p/main.go index f280c748..0365a6f2 100644 --- a/cmd/dendrite-demo-libp2p/main.go +++ b/cmd/dendrite-demo-libp2p/main.go @@ -153,7 +153,7 @@ func main() { asQuery := appservice.SetupAppServiceAPIComponent( &base.Base, accountDB, deviceDB, federation, alias, query, transactions.New(), ) - fedSenderAPI := federationsender.SetupFederationSenderComponent(&base.Base, federation, query) + fedSenderAPI := federationsender.SetupFederationSenderComponent(&base.Base, federation, query, input) clientapi.SetupClientAPIComponent( &base.Base, deviceDB, accountDB, diff --git a/cmd/dendrite-federation-sender-server/main.go b/cmd/dendrite-federation-sender-server/main.go index 71fc0b01..1593afaa 100644 --- a/cmd/dendrite-federation-sender-server/main.go +++ b/cmd/dendrite-federation-sender-server/main.go @@ -26,10 +26,10 @@ func main() { federation := base.CreateFederationClient() - _, _, query := base.CreateHTTPRoomserverAPIs() + _, input, query := base.CreateHTTPRoomserverAPIs() federationsender.SetupFederationSenderComponent( - base, federation, query, + base, federation, query, input, ) base.SetupAndServeHTTP(string(base.Cfg.Bind.FederationSender), string(base.Cfg.Listen.FederationSender)) diff --git a/cmd/dendrite-monolith-server/main.go b/cmd/dendrite-monolith-server/main.go index 6b0d83ae..70a59ed6 100644 --- a/cmd/dendrite-monolith-server/main.go +++ b/cmd/dendrite-monolith-server/main.go @@ -62,7 +62,7 @@ func main() { asQuery := appservice.SetupAppServiceAPIComponent( base, accountDB, deviceDB, federation, alias, query, transactions.New(), ) - fedSenderAPI := federationsender.SetupFederationSenderComponent(base, federation, query) + fedSenderAPI := federationsender.SetupFederationSenderComponent(base, federation, query, input) clientapi.SetupClientAPIComponent( base, deviceDB, accountDB, diff --git a/federationsender/federationsender.go b/federationsender/federationsender.go index a318d209..a06caf40 100644 --- a/federationsender/federationsender.go +++ b/federationsender/federationsender.go @@ -20,6 +20,7 @@ import ( "github.com/matrix-org/dendrite/common/basecomponent" "github.com/matrix-org/dendrite/federationsender/api" "github.com/matrix-org/dendrite/federationsender/consumers" + "github.com/matrix-org/dendrite/federationsender/producers" "github.com/matrix-org/dendrite/federationsender/query" "github.com/matrix-org/dendrite/federationsender/queue" "github.com/matrix-org/dendrite/federationsender/storage" @@ -34,13 +35,16 @@ func SetupFederationSenderComponent( base *basecomponent.BaseDendrite, federation *gomatrixserverlib.FederationClient, rsQueryAPI roomserverAPI.RoomserverQueryAPI, + rsInputAPI roomserverAPI.RoomserverInputAPI, ) api.FederationSenderQueryAPI { federationSenderDB, err := storage.NewDatabase(string(base.Cfg.Database.FederationSender)) if err != nil { logrus.WithError(err).Panic("failed to connect to federation sender db") } - queues := queue.NewOutgoingQueues(base.Cfg.Matrix.ServerName, federation) + roomserverProducer := producers.NewRoomserverProducer(rsInputAPI, base.Cfg.Matrix.ServerName) + + queues := queue.NewOutgoingQueues(base.Cfg.Matrix.ServerName, federation, roomserverProducer) rsConsumer := consumers.NewOutputRoomEventConsumer( base.Cfg, base.KafkaConsumer, queues, diff --git a/federationsender/producers/roomserver.go b/federationsender/producers/roomserver.go new file mode 100644 index 00000000..0395f962 --- /dev/null +++ b/federationsender/producers/roomserver.go @@ -0,0 +1,66 @@ +// Copyright 2020 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package producers + +import ( + "context" + + "github.com/matrix-org/dendrite/roomserver/api" + "github.com/matrix-org/gomatrixserverlib" +) + +// RoomserverProducer produces events for the roomserver to consume. +type RoomserverProducer struct { + InputAPI api.RoomserverInputAPI + serverName gomatrixserverlib.ServerName +} + +// NewRoomserverProducer creates a new RoomserverProducer +func NewRoomserverProducer( + inputAPI api.RoomserverInputAPI, serverName gomatrixserverlib.ServerName, +) *RoomserverProducer { + return &RoomserverProducer{ + InputAPI: inputAPI, + serverName: serverName, + } +} + +// SendInviteResponse drops an invite response back into the roomserver so that users +// already in the room will be notified of the new invite. The invite response is signed +// by the remote side. +func (c *RoomserverProducer) SendInviteResponse( + ctx context.Context, res gomatrixserverlib.RespInviteV2, roomVersion gomatrixserverlib.RoomVersion, +) (string, error) { + ev := res.Event.Headered(roomVersion) + ire := api.InputRoomEvent{ + Kind: api.KindNew, + Event: ev, + AuthEventIDs: ev.AuthEventIDs(), + SendAsServer: string(c.serverName), + TransactionID: nil, + } + return c.SendInputRoomEvents(ctx, []api.InputRoomEvent{ire}) +} + +// SendInputRoomEvents writes the given input room events to the roomserver input API. +func (c *RoomserverProducer) SendInputRoomEvents( + ctx context.Context, ires []api.InputRoomEvent, +) (eventID string, err error) { + request := api.InputRoomEventsRequest{InputRoomEvents: ires} + var response api.InputRoomEventsResponse + err = c.InputAPI.InputRoomEvents(ctx, &request, &response) + eventID = response.EventID + return +} diff --git a/federationsender/queue/destinationqueue.go b/federationsender/queue/destinationqueue.go index 7d4dc850..89526fcf 100644 --- a/federationsender/queue/destinationqueue.go +++ b/federationsender/queue/destinationqueue.go @@ -21,6 +21,7 @@ import ( "sync" "time" + "github.com/matrix-org/dendrite/federationsender/producers" "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/util" log "github.com/sirupsen/logrus" @@ -32,6 +33,7 @@ import ( // ensures that only one request is in flight to a given destination // at a time. type destinationQueue struct { + rsProducer *producers.RoomserverProducer client *gomatrixserverlib.FederationClient origin gomatrixserverlib.ServerName destination gomatrixserverlib.ServerName @@ -165,18 +167,38 @@ func (oq *destinationQueue) nextInvites() bool { } for _, inviteReq := range oq.pendingInvites { - ev := inviteReq.Event() + ev, roomVersion := inviteReq.Event(), inviteReq.RoomVersion() - if _, err := oq.client.SendInviteV2( + log.WithFields(log.Fields{ + "event_id": ev.EventID(), + "room_version": roomVersion, + "destination": oq.destination, + }).Info("sending invite") + + inviteRes, err := oq.client.SendInviteV2( context.TODO(), oq.destination, *inviteReq, - ); err != nil { + ) + if err != nil { log.WithFields(log.Fields{ "event_id": ev.EventID(), "state_key": ev.StateKey(), "destination": oq.destination, }).WithError(err).Error("failed to send invite") + continue + } + + if _, err = oq.rsProducer.SendInviteResponse( + context.TODO(), + inviteRes, + roomVersion, + ); err != nil { + log.WithFields(log.Fields{ + "event_id": ev.EventID(), + "state_key": ev.StateKey(), + "destination": oq.destination, + }).WithError(err).Error("failed to return signed invite to roomserver") } } diff --git a/federationsender/queue/queue.go b/federationsender/queue/queue.go index 88d47f12..33abc8fd 100644 --- a/federationsender/queue/queue.go +++ b/federationsender/queue/queue.go @@ -18,6 +18,7 @@ import ( "fmt" "sync" + "github.com/matrix-org/dendrite/federationsender/producers" "github.com/matrix-org/gomatrixserverlib" log "github.com/sirupsen/logrus" ) @@ -25,19 +26,25 @@ import ( // OutgoingQueues is a collection of queues for sending transactions to other // matrix servers type OutgoingQueues struct { - origin gomatrixserverlib.ServerName - client *gomatrixserverlib.FederationClient + rsProducer *producers.RoomserverProducer + origin gomatrixserverlib.ServerName + client *gomatrixserverlib.FederationClient // The queuesMutex protects queues queuesMutex sync.Mutex queues map[gomatrixserverlib.ServerName]*destinationQueue } // NewOutgoingQueues makes a new OutgoingQueues -func NewOutgoingQueues(origin gomatrixserverlib.ServerName, client *gomatrixserverlib.FederationClient) *OutgoingQueues { +func NewOutgoingQueues( + origin gomatrixserverlib.ServerName, + client *gomatrixserverlib.FederationClient, + rsProducer *producers.RoomserverProducer, +) *OutgoingQueues { return &OutgoingQueues{ - origin: origin, - client: client, - queues: map[gomatrixserverlib.ServerName]*destinationQueue{}, + rsProducer: rsProducer, + origin: origin, + client: client, + queues: map[gomatrixserverlib.ServerName]*destinationQueue{}, } } @@ -67,6 +74,7 @@ func (oqs *OutgoingQueues) SendEvent( oq := oqs.queues[destination] if oq == nil { oq = &destinationQueue{ + rsProducer: oqs.rsProducer, origin: oqs.origin, destination: destination, client: oqs.client, @@ -111,6 +119,7 @@ func (oqs *OutgoingQueues) SendInvite( oq := oqs.queues[destination] if oq == nil { oq = &destinationQueue{ + rsProducer: oqs.rsProducer, origin: oqs.origin, destination: destination, client: oqs.client, @@ -151,6 +160,7 @@ func (oqs *OutgoingQueues) SendEDU( oq := oqs.queues[destination] if oq == nil { oq = &destinationQueue{ + rsProducer: oqs.rsProducer, origin: oqs.origin, destination: destination, client: oqs.client, diff --git a/go.mod b/go.mod index fd1c2de8..f566e6d9 100644 --- a/go.mod +++ b/go.mod @@ -17,7 +17,7 @@ require ( github.com/matrix-org/go-http-js-libp2p v0.0.0-20200318135427-31631a9ef51f github.com/matrix-org/go-sqlite3-js v0.0.0-20200325174927-327088cdef10 github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26 - github.com/matrix-org/gomatrixserverlib v0.0.0-20200427134702-21db6d1430e3 + github.com/matrix-org/gomatrixserverlib v0.0.0-20200428095012-a95e289995b1 github.com/matrix-org/naffka v0.0.0-20200422140631-181f1ee7401f github.com/matrix-org/util v0.0.0-20190711121626-527ce5ddefc7 github.com/mattn/go-sqlite3 v2.0.2+incompatible diff --git a/go.sum b/go.sum index 156f6725..535a999d 100644 --- a/go.sum +++ b/go.sum @@ -367,8 +367,8 @@ github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26 h1:Hr3zjRsq2bh github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26/go.mod h1:3fxX6gUjWyI/2Bt7J1OLhpCzOfO/bB3AiX0cJtEKud0= github.com/matrix-org/gomatrixserverlib v0.0.0-20200124100636-0c2ec91d1df5 h1:kmRjpmFOenVpOaV/DRlo9p6z/IbOKlUC+hhKsAAh8Qg= github.com/matrix-org/gomatrixserverlib v0.0.0-20200124100636-0c2ec91d1df5/go.mod h1:FsKa2pWE/bpQql9H7U4boOPXFoJX/QcqaZZ6ijLkaZI= -github.com/matrix-org/gomatrixserverlib v0.0.0-20200427134702-21db6d1430e3 h1:aJMAKjfXG5I8TqPxJQbQIkGSWM770oxkpgsPHE8C06E= -github.com/matrix-org/gomatrixserverlib v0.0.0-20200427134702-21db6d1430e3/go.mod h1:JsAzE1Ll3+gDWS9JSUHPJiiyAksvOOnGWF2nXdg4ZzU= +github.com/matrix-org/gomatrixserverlib v0.0.0-20200428095012-a95e289995b1 h1:TB4V69eOtvmHdFp0+BgLNrDCcCwq6QDUOTjmi8fjC/M= +github.com/matrix-org/gomatrixserverlib v0.0.0-20200428095012-a95e289995b1/go.mod h1:JsAzE1Ll3+gDWS9JSUHPJiiyAksvOOnGWF2nXdg4ZzU= github.com/matrix-org/naffka v0.0.0-20200127221512-0716baaabaf1 h1:osLoFdOy+ChQqVUn2PeTDETFftVkl4w9t/OW18g3lnk= github.com/matrix-org/naffka v0.0.0-20200127221512-0716baaabaf1/go.mod h1:cXoYQIENbdWIQHt1SyCo6Bl3C3raHwJ0wgVrXHSqf+A= github.com/matrix-org/naffka v0.0.0-20200422140631-181f1ee7401f h1:pRz4VTiRCO4zPlEMc3ESdUOcW4PXHH4Kj+YDz1XyE+Y= diff --git a/sytest-whitelist b/sytest-whitelist index 7bd2a63c..439c306c 100644 --- a/sytest-whitelist +++ b/sytest-whitelist @@ -253,3 +253,8 @@ User can invite local user to room with version 3 User can invite local user to room with version 4 A pair of servers can establish a join in a v2 room Can logout all devices +State from remote users is included in the timeline in an incremental sync +User can invite remote user to room with version 1 +User can invite remote user to room with version 2 +User can invite remote user to room with version 3 +User can invite remote user to room with version 4