mirror of
https://github.com/1f349/dendrite.git
synced 2024-11-25 21:21:35 +00:00
Join room support in federation sender (#989)
* Implement PerformJoinRequest * Rename perform functions * Check send join response * Temporary wiring to test federation sender room joins * Actually pass through the config * Make sure membership content shows join
This commit is contained in:
parent
a308e61331
commit
64e94e9a6f
@ -27,12 +27,11 @@ import (
|
||||
"github.com/matrix-org/dendrite/clientapi/producers"
|
||||
"github.com/matrix-org/dendrite/common"
|
||||
"github.com/matrix-org/dendrite/common/config"
|
||||
"github.com/matrix-org/dendrite/roomserver/api"
|
||||
federationSenderAPI "github.com/matrix-org/dendrite/federationsender/api"
|
||||
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
|
||||
"github.com/matrix-org/gomatrix"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
"github.com/matrix-org/util"
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
// JoinRoomByIDOrAlias implements the "/join/{roomIDOrAlias}" API.
|
||||
@ -46,6 +45,7 @@ func JoinRoomByIDOrAlias(
|
||||
producer *producers.RoomserverProducer,
|
||||
queryAPI roomserverAPI.RoomserverQueryAPI,
|
||||
aliasAPI roomserverAPI.RoomserverAliasAPI,
|
||||
fsAPI federationSenderAPI.FederationSenderInternalAPI,
|
||||
keyRing gomatrixserverlib.KeyRing,
|
||||
accountDB accounts.Database,
|
||||
) util.JSONResponse {
|
||||
@ -79,7 +79,8 @@ func JoinRoomByIDOrAlias(
|
||||
content["avatar_url"] = profile.AvatarURL
|
||||
|
||||
r := joinRoomReq{
|
||||
req, evTime, content, device.UserID, cfg, federation, producer, queryAPI, aliasAPI, keyRing,
|
||||
req, evTime, content, device.UserID, cfg, federation, producer,
|
||||
queryAPI, aliasAPI, fsAPI, keyRing,
|
||||
}
|
||||
|
||||
if strings.HasPrefix(roomIDOrAlias, "!") {
|
||||
@ -107,6 +108,7 @@ type joinRoomReq struct {
|
||||
producer *producers.RoomserverProducer
|
||||
queryAPI roomserverAPI.RoomserverQueryAPI
|
||||
aliasAPI roomserverAPI.RoomserverAliasAPI
|
||||
fsAPI federationSenderAPI.FederationSenderInternalAPI
|
||||
keyRing gomatrixserverlib.KeyRing
|
||||
}
|
||||
|
||||
@ -326,71 +328,15 @@ func (r joinRoomReq) joinRoomUsingServers(
|
||||
// server was invalid this returns an error.
|
||||
// Otherwise this returns a JSONResponse.
|
||||
func (r joinRoomReq) joinRoomUsingServer(roomID string, server gomatrixserverlib.ServerName) (*util.JSONResponse, error) {
|
||||
// Ask the room server for information about room versions.
|
||||
var request api.QueryRoomVersionCapabilitiesRequest
|
||||
var response api.QueryRoomVersionCapabilitiesResponse
|
||||
if err := r.queryAPI.QueryRoomVersionCapabilities(r.req.Context(), &request, &response); err != nil {
|
||||
fedJoinReq := federationSenderAPI.PerformJoinRequest{
|
||||
RoomID: roomID,
|
||||
UserID: r.userID,
|
||||
ServerName: server,
|
||||
}
|
||||
fedJoinRes := federationSenderAPI.PerformJoinResponse{}
|
||||
if err := r.fsAPI.PerformJoin(r.req.Context(), &fedJoinReq, &fedJoinRes); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var supportedVersions []gomatrixserverlib.RoomVersion
|
||||
for version := range response.AvailableRoomVersions {
|
||||
supportedVersions = append(supportedVersions, version)
|
||||
}
|
||||
respMakeJoin, err := r.federation.MakeJoin(r.req.Context(), server, roomID, r.userID, supportedVersions)
|
||||
if err != nil {
|
||||
// TODO: Check if the user was not allowed to join the room.
|
||||
return nil, fmt.Errorf("r.federation.MakeJoin: %w", err)
|
||||
}
|
||||
|
||||
// Set all the fields to be what they should be, this should be a no-op
|
||||
// but it's possible that the remote server returned us something "odd"
|
||||
err = r.writeToBuilder(&respMakeJoin.JoinEvent, roomID)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("r.writeToBuilder: %w", err)
|
||||
}
|
||||
|
||||
if respMakeJoin.RoomVersion == "" {
|
||||
respMakeJoin.RoomVersion = gomatrixserverlib.RoomVersionV1
|
||||
}
|
||||
if _, err = respMakeJoin.RoomVersion.EventFormat(); err != nil {
|
||||
return &util.JSONResponse{
|
||||
Code: http.StatusBadRequest,
|
||||
JSON: jsonerror.UnsupportedRoomVersion(
|
||||
fmt.Sprintf("Room version '%s' is not supported", respMakeJoin.RoomVersion),
|
||||
),
|
||||
}, nil
|
||||
}
|
||||
|
||||
event, err := respMakeJoin.JoinEvent.Build(
|
||||
r.evTime, r.cfg.Matrix.ServerName, r.cfg.Matrix.KeyID,
|
||||
r.cfg.Matrix.PrivateKey, respMakeJoin.RoomVersion,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("respMakeJoin.JoinEvent.Build: %w", err)
|
||||
}
|
||||
|
||||
respSendJoin, err := r.federation.SendJoin(r.req.Context(), server, event, respMakeJoin.RoomVersion)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("r.federation.SendJoin: %w", err)
|
||||
}
|
||||
|
||||
if err = r.checkSendJoinResponse(event, server, respMakeJoin, respSendJoin); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
util.GetLogger(r.req.Context()).WithFields(logrus.Fields{
|
||||
"room_id": roomID,
|
||||
"num_auth_events": len(respSendJoin.AuthEvents),
|
||||
"num_state_events": len(respSendJoin.StateEvents),
|
||||
}).Info("Room join signature and auth verification passed")
|
||||
|
||||
if err = r.producer.SendEventWithState(
|
||||
r.req.Context(),
|
||||
respSendJoin.ToRespState(),
|
||||
event.Headered(respMakeJoin.RoomVersion),
|
||||
); err != nil {
|
||||
util.GetLogger(r.req.Context()).WithError(err).Error("r.producer.SendEventWithState")
|
||||
}
|
||||
|
||||
return &util.JSONResponse{
|
||||
Code: http.StatusOK,
|
||||
@ -400,49 +346,3 @@ func (r joinRoomReq) joinRoomUsingServer(roomID string, server gomatrixserverlib
|
||||
}{roomID},
|
||||
}, nil
|
||||
}
|
||||
|
||||
// checkSendJoinResponse checks that all of the signatures are correct
|
||||
// and that the join is allowed by the supplied state.
|
||||
func (r joinRoomReq) checkSendJoinResponse(
|
||||
event gomatrixserverlib.Event,
|
||||
server gomatrixserverlib.ServerName,
|
||||
respMakeJoin gomatrixserverlib.RespMakeJoin,
|
||||
respSendJoin gomatrixserverlib.RespSendJoin,
|
||||
) error {
|
||||
// A list of events that we have retried, if they were not included in
|
||||
// the auth events supplied in the send_join.
|
||||
retries := map[string]bool{}
|
||||
|
||||
retryCheck:
|
||||
// TODO: Can we expand Check here to return a list of missing auth
|
||||
// events rather than failing one at a time?
|
||||
if err := respSendJoin.Check(r.req.Context(), r.keyRing, event); err != nil {
|
||||
switch e := err.(type) {
|
||||
case gomatrixserverlib.MissingAuthEventError:
|
||||
// Check that we haven't already retried for this event, prevents
|
||||
// us from ending up in endless loops
|
||||
if !retries[e.AuthEventID] {
|
||||
// Ask the server that we're talking to right now for the event
|
||||
tx, txerr := r.federation.GetEvent(r.req.Context(), server, e.AuthEventID)
|
||||
if txerr != nil {
|
||||
return fmt.Errorf("r.federation.GetEvent: %w", txerr)
|
||||
}
|
||||
// For each event returned, add it to the auth events.
|
||||
for _, pdu := range tx.PDUs {
|
||||
ev, everr := gomatrixserverlib.NewEventFromUntrustedJSON(pdu, respMakeJoin.RoomVersion)
|
||||
if everr != nil {
|
||||
return fmt.Errorf("gomatrixserverlib.NewEventFromUntrustedJSON: %w", everr)
|
||||
}
|
||||
respSendJoin.AuthEvents = append(respSendJoin.AuthEvents, ev)
|
||||
}
|
||||
// Mark the event as retried and then give the check another go.
|
||||
retries[e.AuthEventID] = true
|
||||
goto retryCheck
|
||||
}
|
||||
return fmt.Errorf("respSendJoin (after retries): %w", e)
|
||||
default:
|
||||
return fmt.Errorf("respSendJoin: %w", err)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
@ -101,7 +101,8 @@ func Setup(
|
||||
return util.ErrorResponse(err)
|
||||
}
|
||||
return JoinRoomByIDOrAlias(
|
||||
req, device, vars["roomIDOrAlias"], cfg, federation, producer, queryAPI, aliasAPI, keyRing, accountDB,
|
||||
req, device, vars["roomIDOrAlias"], cfg, federation, producer,
|
||||
queryAPI, aliasAPI, federationSender, keyRing, accountDB,
|
||||
)
|
||||
}),
|
||||
).Methods(http.MethodPost, http.MethodOptions)
|
||||
|
@ -153,7 +153,7 @@ func main() {
|
||||
asQuery := appservice.SetupAppServiceAPIComponent(
|
||||
&base.Base, accountDB, deviceDB, federation, alias, query, transactions.New(),
|
||||
)
|
||||
fsAPI := federationsender.SetupFederationSenderComponent(&base.Base, federation, query, input)
|
||||
fsAPI := federationsender.SetupFederationSenderComponent(&base.Base, federation, query, input, &keyRing)
|
||||
|
||||
clientapi.SetupClientAPIComponent(
|
||||
&base.Base, deviceDB, accountDB,
|
||||
|
@ -16,6 +16,7 @@ package main
|
||||
|
||||
import (
|
||||
"github.com/matrix-org/dendrite/common/basecomponent"
|
||||
"github.com/matrix-org/dendrite/common/keydb"
|
||||
"github.com/matrix-org/dendrite/federationsender"
|
||||
)
|
||||
|
||||
@ -25,11 +26,13 @@ func main() {
|
||||
defer base.Close() // nolint: errcheck
|
||||
|
||||
federation := base.CreateFederationClient()
|
||||
keyDB := base.CreateKeyDB()
|
||||
keyRing := keydb.CreateKeyRing(federation.Client, keyDB, cfg.Matrix.KeyPerspectives)
|
||||
|
||||
_, input, query := base.CreateHTTPRoomserverAPIs()
|
||||
|
||||
federationsender.SetupFederationSenderComponent(
|
||||
base, federation, query, input,
|
||||
base, federation, query, input, &keyRing,
|
||||
)
|
||||
|
||||
base.SetupAndServeHTTP(string(base.Cfg.Bind.FederationSender), string(base.Cfg.Listen.FederationSender))
|
||||
|
@ -62,7 +62,7 @@ func main() {
|
||||
asQuery := appservice.SetupAppServiceAPIComponent(
|
||||
base, accountDB, deviceDB, federation, alias, query, transactions.New(),
|
||||
)
|
||||
fsAPI := federationsender.SetupFederationSenderComponent(base, federation, query, input)
|
||||
fsAPI := federationsender.SetupFederationSenderComponent(base, federation, query, input, &keyRing)
|
||||
input.SetFederationSenderAPI(fsAPI)
|
||||
|
||||
clientapi.SetupClientAPIComponent(
|
||||
|
@ -128,7 +128,7 @@ func main() {
|
||||
asQuery := appservice.SetupAppServiceAPIComponent(
|
||||
base, accountDB, deviceDB, federation, alias, query, transactions.New(),
|
||||
)
|
||||
fedSenderAPI := federationsender.SetupFederationSenderComponent(base, federation, query, input)
|
||||
fedSenderAPI := federationsender.SetupFederationSenderComponent(base, federation, query, input, &keyRing)
|
||||
input.SetFederationSenderAPI(fedSenderAPI)
|
||||
|
||||
clientapi.SetupClientAPIComponent(
|
||||
|
@ -25,13 +25,13 @@ type FederationSenderInternalAPI interface {
|
||||
response *QueryJoinedHostServerNamesInRoomResponse,
|
||||
) error
|
||||
// Handle an instruction to make_join & send_join with a remote server.
|
||||
PerformJoinRequest(
|
||||
PerformJoin(
|
||||
ctx context.Context,
|
||||
request *PerformJoinRequest,
|
||||
response *PerformJoinResponse,
|
||||
) error
|
||||
// Handle an instruction to make_leave & send_leave with a remote server.
|
||||
PerformLeaveRequest(
|
||||
PerformLeave(
|
||||
ctx context.Context,
|
||||
request *PerformLeaveRequest,
|
||||
response *PerformLeaveResponse,
|
||||
|
@ -4,6 +4,7 @@ import (
|
||||
"context"
|
||||
|
||||
commonHTTP "github.com/matrix-org/dendrite/common/http"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
"github.com/opentracing/opentracing-go"
|
||||
)
|
||||
|
||||
@ -16,14 +17,17 @@ const (
|
||||
)
|
||||
|
||||
type PerformJoinRequest struct {
|
||||
RoomID string `json:"room_id"`
|
||||
RoomID string `json:"room_id"`
|
||||
UserID string `json:"user_id"`
|
||||
ServerName gomatrixserverlib.ServerName `json:"server_name"`
|
||||
Content map[string]interface{} `json:"content"`
|
||||
}
|
||||
|
||||
type PerformJoinResponse struct {
|
||||
}
|
||||
|
||||
// Handle an instruction to make_join & send_join with a remote server.
|
||||
func (h *httpFederationSenderInternalAPI) PerformJoinRequest(
|
||||
func (h *httpFederationSenderInternalAPI) PerformJoin(
|
||||
ctx context.Context,
|
||||
request *PerformJoinRequest,
|
||||
response *PerformJoinResponse,
|
||||
@ -43,7 +47,7 @@ type PerformLeaveResponse struct {
|
||||
}
|
||||
|
||||
// Handle an instruction to make_leave & send_leave with a remote server.
|
||||
func (h *httpFederationSenderInternalAPI) PerformLeaveRequest(
|
||||
func (h *httpFederationSenderInternalAPI) PerformLeave(
|
||||
ctx context.Context,
|
||||
request *PerformLeaveRequest,
|
||||
response *PerformLeaveResponse,
|
||||
|
@ -36,6 +36,7 @@ func SetupFederationSenderComponent(
|
||||
federation *gomatrixserverlib.FederationClient,
|
||||
rsQueryAPI roomserverAPI.RoomserverQueryAPI,
|
||||
rsInputAPI roomserverAPI.RoomserverInputAPI,
|
||||
keyRing *gomatrixserverlib.KeyRing,
|
||||
) api.FederationSenderInternalAPI {
|
||||
federationSenderDB, err := storage.NewDatabase(string(base.Cfg.Database.FederationSender))
|
||||
if err != nil {
|
||||
@ -61,10 +62,10 @@ func SetupFederationSenderComponent(
|
||||
logrus.WithError(err).Panic("failed to start typing server consumer")
|
||||
}
|
||||
|
||||
queryAPI := query.FederationSenderInternalAPI{
|
||||
DB: federationSenderDB,
|
||||
}
|
||||
queryAPI := query.NewFederationSenderInternalAPI(
|
||||
federationSenderDB, base.Cfg, roomserverProducer, federation, keyRing,
|
||||
)
|
||||
queryAPI.SetupHTTP(http.DefaultServeMux)
|
||||
|
||||
return &queryAPI
|
||||
return queryAPI
|
||||
}
|
||||
|
@ -54,6 +54,42 @@ func (c *RoomserverProducer) SendInviteResponse(
|
||||
return c.SendInputRoomEvents(ctx, []api.InputRoomEvent{ire})
|
||||
}
|
||||
|
||||
// SendEventWithState writes an event with KindNew to the roomserver input log
|
||||
// with the state at the event as KindOutlier before it.
|
||||
func (c *RoomserverProducer) SendEventWithState(
|
||||
ctx context.Context, state gomatrixserverlib.RespState, event gomatrixserverlib.HeaderedEvent,
|
||||
) error {
|
||||
outliers, err := state.Events()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var ires []api.InputRoomEvent
|
||||
for _, outlier := range outliers {
|
||||
ires = append(ires, api.InputRoomEvent{
|
||||
Kind: api.KindOutlier,
|
||||
Event: outlier.Headered(event.RoomVersion),
|
||||
AuthEventIDs: outlier.AuthEventIDs(),
|
||||
})
|
||||
}
|
||||
|
||||
stateEventIDs := make([]string, len(state.StateEvents))
|
||||
for i := range state.StateEvents {
|
||||
stateEventIDs[i] = state.StateEvents[i].EventID()
|
||||
}
|
||||
|
||||
ires = append(ires, api.InputRoomEvent{
|
||||
Kind: api.KindNew,
|
||||
Event: event,
|
||||
AuthEventIDs: event.AuthEventIDs(),
|
||||
HasState: true,
|
||||
StateEventIDs: stateEventIDs,
|
||||
})
|
||||
|
||||
_, err = c.SendInputRoomEvents(ctx, ires)
|
||||
return err
|
||||
}
|
||||
|
||||
// SendInputRoomEvents writes the given input room events to the roomserver input API.
|
||||
func (c *RoomserverProducer) SendInputRoomEvents(
|
||||
ctx context.Context, ires []api.InputRoomEvent,
|
||||
|
@ -5,17 +5,37 @@ import (
|
||||
"net/http"
|
||||
|
||||
"github.com/matrix-org/dendrite/common"
|
||||
"github.com/matrix-org/dendrite/common/config"
|
||||
"github.com/matrix-org/dendrite/federationsender/api"
|
||||
"github.com/matrix-org/dendrite/federationsender/producers"
|
||||
"github.com/matrix-org/dendrite/federationsender/storage"
|
||||
rsAPI "github.com/matrix-org/dendrite/roomserver/api"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
"github.com/matrix-org/util"
|
||||
)
|
||||
|
||||
// FederationSenderInternalAPI is an implementation of api.FederationSenderInternalAPI
|
||||
type FederationSenderInternalAPI struct {
|
||||
api.FederationSenderInternalAPI
|
||||
DB storage.Database
|
||||
RoomserverInputAPI rsAPI.RoomserverInputAPI
|
||||
db storage.Database
|
||||
cfg *config.Dendrite
|
||||
producer *producers.RoomserverProducer
|
||||
federation *gomatrixserverlib.FederationClient
|
||||
keyRing *gomatrixserverlib.KeyRing
|
||||
}
|
||||
|
||||
func NewFederationSenderInternalAPI(
|
||||
db storage.Database, cfg *config.Dendrite,
|
||||
producer *producers.RoomserverProducer,
|
||||
federation *gomatrixserverlib.FederationClient,
|
||||
keyRing *gomatrixserverlib.KeyRing,
|
||||
) *FederationSenderInternalAPI {
|
||||
return &FederationSenderInternalAPI{
|
||||
db: db,
|
||||
cfg: cfg,
|
||||
producer: producer,
|
||||
federation: federation,
|
||||
keyRing: keyRing,
|
||||
}
|
||||
}
|
||||
|
||||
// SetupHTTP adds the FederationSenderInternalAPI handlers to the http.ServeMux.
|
||||
@ -55,7 +75,7 @@ func (f *FederationSenderInternalAPI) SetupHTTP(servMux *http.ServeMux) {
|
||||
if err := json.NewDecoder(req.Body).Decode(&request); err != nil {
|
||||
return util.MessageResponse(http.StatusBadRequest, err.Error())
|
||||
}
|
||||
if err := f.PerformJoinRequest(req.Context(), &request, &response); err != nil {
|
||||
if err := f.PerformJoin(req.Context(), &request, &response); err != nil {
|
||||
return util.ErrorResponse(err)
|
||||
}
|
||||
return util.JSONResponse{Code: http.StatusOK, JSON: &response}
|
||||
@ -68,7 +88,7 @@ func (f *FederationSenderInternalAPI) SetupHTTP(servMux *http.ServeMux) {
|
||||
if err := json.NewDecoder(req.Body).Decode(&request); err != nil {
|
||||
return util.MessageResponse(http.StatusBadRequest, err.Error())
|
||||
}
|
||||
if err := f.PerformLeaveRequest(req.Context(), &request, &response); err != nil {
|
||||
if err := f.PerformLeave(req.Context(), &request, &response); err != nil {
|
||||
return util.ErrorResponse(err)
|
||||
}
|
||||
return util.JSONResponse{Code: http.StatusOK, JSON: &response}
|
||||
|
@ -2,21 +2,116 @@ package query
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/matrix-org/dendrite/federationsender/api"
|
||||
"github.com/matrix-org/dendrite/federationsender/query/perform"
|
||||
"github.com/matrix-org/dendrite/roomserver/version"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
)
|
||||
|
||||
// PerformJoinRequest implements api.FederationSenderInternalAPI
|
||||
func (r *FederationSenderInternalAPI) PerformJoinRequest(
|
||||
func (r *FederationSenderInternalAPI) PerformJoin(
|
||||
ctx context.Context,
|
||||
request *api.PerformJoinRequest,
|
||||
response *api.PerformJoinResponse,
|
||||
) (err error) {
|
||||
// Look up the supported room versions.
|
||||
var supportedVersions []gomatrixserverlib.RoomVersion
|
||||
for version := range version.SupportedRoomVersions() {
|
||||
supportedVersions = append(supportedVersions, version)
|
||||
}
|
||||
|
||||
// Try to perform a make_join using the information supplied in the
|
||||
// request.
|
||||
respMakeJoin, err := r.federation.MakeJoin(
|
||||
ctx,
|
||||
request.ServerName,
|
||||
request.RoomID,
|
||||
request.UserID,
|
||||
supportedVersions,
|
||||
)
|
||||
if err != nil {
|
||||
// TODO: Check if the user was not allowed to join the room.
|
||||
return fmt.Errorf("r.federation.MakeJoin: %w", err)
|
||||
}
|
||||
|
||||
// Set all the fields to be what they should be, this should be a no-op
|
||||
// but it's possible that the remote server returned us something "odd"
|
||||
respMakeJoin.JoinEvent.Type = "m.room.member"
|
||||
respMakeJoin.JoinEvent.Sender = request.UserID
|
||||
respMakeJoin.JoinEvent.StateKey = &request.UserID
|
||||
respMakeJoin.JoinEvent.RoomID = request.RoomID
|
||||
respMakeJoin.JoinEvent.Redacts = ""
|
||||
if request.Content == nil {
|
||||
request.Content = map[string]interface{}{}
|
||||
}
|
||||
request.Content["membership"] = "join"
|
||||
if err = respMakeJoin.JoinEvent.SetContent(request.Content); err != nil {
|
||||
return fmt.Errorf("respMakeJoin.JoinEvent.SetContent: %w", err)
|
||||
}
|
||||
if err = respMakeJoin.JoinEvent.SetUnsigned(struct{}{}); err != nil {
|
||||
return fmt.Errorf("respMakeJoin.JoinEvent.SetUnsigned: %w", err)
|
||||
}
|
||||
|
||||
// Work out if we support the room version that has been supplied in
|
||||
// the make_join response.
|
||||
if respMakeJoin.RoomVersion == "" {
|
||||
respMakeJoin.RoomVersion = gomatrixserverlib.RoomVersionV1
|
||||
}
|
||||
if _, err = respMakeJoin.RoomVersion.EventFormat(); err != nil {
|
||||
return fmt.Errorf("respMakeJoin.RoomVersion.EventFormat: %w", err)
|
||||
}
|
||||
|
||||
// Build the join event.
|
||||
event, err := respMakeJoin.JoinEvent.Build(
|
||||
time.Now(),
|
||||
r.cfg.Matrix.ServerName,
|
||||
r.cfg.Matrix.KeyID,
|
||||
r.cfg.Matrix.PrivateKey,
|
||||
respMakeJoin.RoomVersion,
|
||||
)
|
||||
if err != nil {
|
||||
return fmt.Errorf("respMakeJoin.JoinEvent.Build: %w", err)
|
||||
}
|
||||
|
||||
// Try to perform a send_join using the newly built event.
|
||||
respSendJoin, err := r.federation.SendJoin(
|
||||
ctx,
|
||||
request.ServerName,
|
||||
event,
|
||||
respMakeJoin.RoomVersion,
|
||||
)
|
||||
if err != nil {
|
||||
return fmt.Errorf("r.federation.SendJoin: %w", err)
|
||||
}
|
||||
|
||||
// Check that the send_join response was valid.
|
||||
joinCtx := perform.JoinContext(r.federation, r.keyRing)
|
||||
if err = joinCtx.CheckSendJoinResponse(
|
||||
ctx, event, request.ServerName, respMakeJoin, respSendJoin,
|
||||
); err != nil {
|
||||
return fmt.Errorf("perform.JoinRequest.CheckSendJoinResponse: %w", err)
|
||||
}
|
||||
|
||||
// If we successfully performed a send_join above then the other
|
||||
// server now thinks we're a part of the room. Send the newly
|
||||
// returned state to the roomserver to update our local view.
|
||||
if err = r.producer.SendEventWithState(
|
||||
ctx,
|
||||
respSendJoin.ToRespState(),
|
||||
event.Headered(respMakeJoin.RoomVersion),
|
||||
); err != nil {
|
||||
return fmt.Errorf("r.producer.SendEventWithState: %w", err)
|
||||
}
|
||||
|
||||
// Everything went to plan.
|
||||
return nil
|
||||
}
|
||||
|
||||
// PerformLeaveRequest implements api.FederationSenderInternalAPI
|
||||
func (r *FederationSenderInternalAPI) PerformLeaveRequest(
|
||||
func (r *FederationSenderInternalAPI) PerformLeave(
|
||||
ctx context.Context,
|
||||
request *api.PerformLeaveRequest,
|
||||
response *api.PerformLeaveResponse,
|
||||
|
70
federationsender/query/perform/join.go
Normal file
70
federationsender/query/perform/join.go
Normal file
@ -0,0 +1,70 @@
|
||||
package perform
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
)
|
||||
|
||||
// This file contains helpers for the PerformJoin function.
|
||||
|
||||
type joinContext struct {
|
||||
federation *gomatrixserverlib.FederationClient
|
||||
keyRing *gomatrixserverlib.KeyRing
|
||||
}
|
||||
|
||||
// Returns a new join context.
|
||||
func JoinContext(f *gomatrixserverlib.FederationClient, k *gomatrixserverlib.KeyRing) *joinContext {
|
||||
return &joinContext{
|
||||
federation: f,
|
||||
keyRing: k,
|
||||
}
|
||||
}
|
||||
|
||||
// checkSendJoinResponse checks that all of the signatures are correct
|
||||
// and that the join is allowed by the supplied state.
|
||||
func (r joinContext) CheckSendJoinResponse(
|
||||
ctx context.Context,
|
||||
event gomatrixserverlib.Event,
|
||||
server gomatrixserverlib.ServerName,
|
||||
respMakeJoin gomatrixserverlib.RespMakeJoin,
|
||||
respSendJoin gomatrixserverlib.RespSendJoin,
|
||||
) error {
|
||||
// A list of events that we have retried, if they were not included in
|
||||
// the auth events supplied in the send_join.
|
||||
retries := map[string]bool{}
|
||||
|
||||
retryCheck:
|
||||
// TODO: Can we expand Check here to return a list of missing auth
|
||||
// events rather than failing one at a time?
|
||||
if err := respSendJoin.Check(ctx, r.keyRing, event); err != nil {
|
||||
switch e := err.(type) {
|
||||
case gomatrixserverlib.MissingAuthEventError:
|
||||
// Check that we haven't already retried for this event, prevents
|
||||
// us from ending up in endless loops
|
||||
if !retries[e.AuthEventID] {
|
||||
// Ask the server that we're talking to right now for the event
|
||||
tx, txerr := r.federation.GetEvent(ctx, server, e.AuthEventID)
|
||||
if txerr != nil {
|
||||
return fmt.Errorf("r.federation.GetEvent: %w", txerr)
|
||||
}
|
||||
// For each event returned, add it to the auth events.
|
||||
for _, pdu := range tx.PDUs {
|
||||
ev, everr := gomatrixserverlib.NewEventFromUntrustedJSON(pdu, respMakeJoin.RoomVersion)
|
||||
if everr != nil {
|
||||
return fmt.Errorf("gomatrixserverlib.NewEventFromUntrustedJSON: %w", everr)
|
||||
}
|
||||
respSendJoin.AuthEvents = append(respSendJoin.AuthEvents, ev)
|
||||
}
|
||||
// Mark the event as retried and then give the check another go.
|
||||
retries[e.AuthEventID] = true
|
||||
goto retryCheck
|
||||
}
|
||||
return fmt.Errorf("respSendJoin (after retries): %w", e)
|
||||
default:
|
||||
return fmt.Errorf("respSendJoin: %w", err)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
@ -13,7 +13,7 @@ func (f *FederationSenderInternalAPI) QueryJoinedHostsInRoom(
|
||||
request *api.QueryJoinedHostsInRoomRequest,
|
||||
response *api.QueryJoinedHostsInRoomResponse,
|
||||
) (err error) {
|
||||
response.JoinedHosts, err = f.DB.GetJoinedHosts(ctx, request.RoomID)
|
||||
response.JoinedHosts, err = f.db.GetJoinedHosts(ctx, request.RoomID)
|
||||
return
|
||||
}
|
||||
|
||||
@ -23,7 +23,7 @@ func (f *FederationSenderInternalAPI) QueryJoinedHostServerNamesInRoom(
|
||||
request *api.QueryJoinedHostServerNamesInRoomRequest,
|
||||
response *api.QueryJoinedHostServerNamesInRoomResponse,
|
||||
) (err error) {
|
||||
joinedHosts, err := f.DB.GetJoinedHosts(ctx, request.RoomID)
|
||||
joinedHosts, err := f.db.GetJoinedHosts(ctx, request.RoomID)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user