mirror of
https://github.com/1f349/dendrite.git
synced 2025-01-14 19:36:34 +00:00
b507312d4c
* Update GMSL * Add MSC2836EventRelationships to fedsender * Call MSC2836EventRelationships in reqCtx * auth remote servers * Extract room ID and servers from previous events; refactor a bit * initial cut of federated threading * Use the right client/fed struct in the response * Add QueryAuthChain for use with MSC2836 * Add auth chain to federated response * Fix pointers * under CI: more logging and enable mscs, nil fix * Handle direction: up * Actually send message events to the roomserver.. * Add children and children_hash to unsigned, with tests * Add logic for exploring threads and tracking children; missing storage functions * Implement storage functions for children * Add fetchUnknownEvent * Do federated hits for include_children if we have unexplored children * Use /ev_rel rather than /event as the former includes child metadata * Remove cross-room threading impl * Enable MSC2836 in the p2p demo * Namespace mscs db * Enable msc2836 for ygg Co-authored-by: Neil Alexander <neilalexander@users.noreply.github.com>
853 lines
29 KiB
Go
853 lines
29 KiB
Go
// Copyright 2020 The Matrix.org Foundation C.I.C.
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
// Package msc2836 'Threading' implements https://github.com/matrix-org/matrix-doc/pull/2836
|
|
package msc2836
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"crypto/sha256"
|
|
"encoding/json"
|
|
"fmt"
|
|
"io"
|
|
"net/http"
|
|
"sort"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/matrix-org/dendrite/clientapi/jsonerror"
|
|
fs "github.com/matrix-org/dendrite/federationsender/api"
|
|
"github.com/matrix-org/dendrite/internal/hooks"
|
|
"github.com/matrix-org/dendrite/internal/httputil"
|
|
roomserver "github.com/matrix-org/dendrite/roomserver/api"
|
|
"github.com/matrix-org/dendrite/setup"
|
|
userapi "github.com/matrix-org/dendrite/userapi/api"
|
|
"github.com/matrix-org/gomatrixserverlib"
|
|
"github.com/matrix-org/util"
|
|
)
|
|
|
|
// constRelType is the relation type handed to the database whenever parent/child
// relationships are looked up, and the key used inside the `children` unsigned
// field that is attached to returned events.
const constRelType = "m.reference"
|
|
|
|
// EventRelationshipRequest is the JSON body accepted by the
// /event_relationships endpoints (both client and federation).
type EventRelationshipRequest struct {
	// EventID is the event the request starts from.
	EventID string `json:"event_id"`
	// RoomID is the room the event lives in. When empty it is filled in from
	// the event once the event has been found.
	RoomID string `json:"room_id"`

	// MaxDepth bounds how many levels away from EventID the walk may go.
	MaxDepth int `json:"max_depth"`
	// MaxBreadth bounds how many children of a single event are walked.
	MaxBreadth int `json:"max_breadth"`
	// Limit bounds the number of events collected for the response.
	Limit int `json:"limit"`

	// DepthFirst selects a depth-first walk; otherwise the walk is breadth-first.
	DepthFirst bool `json:"depth_first"`
	// RecentFirst is forwarded to the database when listing children.
	RecentFirst bool `json:"recent_first"`

	// IncludeParent asks for the parent of EventID to be returned as well.
	IncludeParent bool `json:"include_parent"`
	// IncludeChildren asks for the direct children of EventID to be returned as well.
	IncludeChildren bool `json:"include_children"`

	// Direction is "down" to walk towards children; any other value walks
	// towards parents.
	Direction string `json:"direction"`
	// Batch is decoded from the request but not otherwise used in this file.
	Batch string `json:"batch"`
}
|
|
|
|
func NewEventRelationshipRequest(body io.Reader) (*EventRelationshipRequest, error) {
|
|
var relation EventRelationshipRequest
|
|
relation.Defaults()
|
|
if err := json.NewDecoder(body).Decode(&relation); err != nil {
|
|
return nil, err
|
|
}
|
|
return &relation, nil
|
|
}
|
|
|
|
func (r *EventRelationshipRequest) Defaults() {
|
|
r.Limit = 100
|
|
r.MaxBreadth = 10
|
|
r.MaxDepth = 3
|
|
r.DepthFirst = false
|
|
r.RecentFirst = true
|
|
r.IncludeParent = false
|
|
r.IncludeChildren = false
|
|
r.Direction = "down"
|
|
}
|
|
|
|
type EventRelationshipResponse struct {
|
|
Events []gomatrixserverlib.ClientEvent `json:"events"`
|
|
NextBatch string `json:"next_batch"`
|
|
Limited bool `json:"limited"`
|
|
}
|
|
|
|
func toClientResponse(res *gomatrixserverlib.MSC2836EventRelationshipsResponse) *EventRelationshipResponse {
|
|
out := &EventRelationshipResponse{
|
|
Events: gomatrixserverlib.ToClientEvents(res.Events, gomatrixserverlib.FormatAll),
|
|
Limited: res.Limited,
|
|
NextBatch: res.NextBatch,
|
|
}
|
|
return out
|
|
}
|
|
|
|
// Enable this MSC
|
|
func Enable(
|
|
base *setup.BaseDendrite, rsAPI roomserver.RoomserverInternalAPI, fsAPI fs.FederationSenderInternalAPI,
|
|
userAPI userapi.UserInternalAPI, keyRing gomatrixserverlib.JSONVerifier,
|
|
) error {
|
|
db, err := NewDatabase(&base.Cfg.MSCs.Database)
|
|
if err != nil {
|
|
return fmt.Errorf("Cannot enable MSC2836: %w", err)
|
|
}
|
|
hooks.Enable()
|
|
hooks.Attach(hooks.KindNewEventPersisted, func(headeredEvent interface{}) {
|
|
he := headeredEvent.(*gomatrixserverlib.HeaderedEvent)
|
|
hookErr := db.StoreRelation(context.Background(), he)
|
|
if hookErr != nil {
|
|
util.GetLogger(context.Background()).WithError(hookErr).WithField("event_id", he.EventID()).Error(
|
|
"failed to StoreRelation",
|
|
)
|
|
}
|
|
// we need to update child metadata here as well as after doing remote /event_relationships requests
|
|
// so we catch child metadata originating from /send transactions
|
|
hookErr = db.UpdateChildMetadata(context.Background(), he)
|
|
if hookErr != nil {
|
|
util.GetLogger(context.Background()).WithError(err).WithField("event_id", he.EventID()).Warn(
|
|
"failed to update child metadata for event",
|
|
)
|
|
}
|
|
})
|
|
|
|
base.PublicClientAPIMux.Handle("/unstable/event_relationships",
|
|
httputil.MakeAuthAPI("eventRelationships", userAPI, eventRelationshipHandler(db, rsAPI, fsAPI)),
|
|
).Methods(http.MethodPost, http.MethodOptions)
|
|
|
|
base.PublicFederationAPIMux.Handle("/unstable/event_relationships", httputil.MakeExternalAPI(
|
|
"msc2836_event_relationships", func(req *http.Request) util.JSONResponse {
|
|
fedReq, errResp := gomatrixserverlib.VerifyHTTPRequest(
|
|
req, time.Now(), base.Cfg.Global.ServerName, keyRing,
|
|
)
|
|
if fedReq == nil {
|
|
return errResp
|
|
}
|
|
return federatedEventRelationship(req.Context(), fedReq, db, rsAPI, fsAPI)
|
|
},
|
|
)).Methods(http.MethodPost, http.MethodOptions)
|
|
return nil
|
|
}
|
|
|
|
type reqCtx struct {
|
|
ctx context.Context
|
|
rsAPI roomserver.RoomserverInternalAPI
|
|
db Database
|
|
req *EventRelationshipRequest
|
|
userID string
|
|
roomVersion gomatrixserverlib.RoomVersion
|
|
|
|
// federated request args
|
|
isFederatedRequest bool
|
|
serverName gomatrixserverlib.ServerName
|
|
fsAPI fs.FederationSenderInternalAPI
|
|
}
|
|
|
|
func eventRelationshipHandler(db Database, rsAPI roomserver.RoomserverInternalAPI, fsAPI fs.FederationSenderInternalAPI) func(*http.Request, *userapi.Device) util.JSONResponse {
|
|
return func(req *http.Request, device *userapi.Device) util.JSONResponse {
|
|
relation, err := NewEventRelationshipRequest(req.Body)
|
|
if err != nil {
|
|
util.GetLogger(req.Context()).WithError(err).Error("failed to decode HTTP request as JSON")
|
|
return util.JSONResponse{
|
|
Code: 400,
|
|
JSON: jsonerror.BadJSON(fmt.Sprintf("invalid json: %s", err)),
|
|
}
|
|
}
|
|
rc := reqCtx{
|
|
ctx: req.Context(),
|
|
req: relation,
|
|
userID: device.UserID,
|
|
rsAPI: rsAPI,
|
|
fsAPI: fsAPI,
|
|
isFederatedRequest: false,
|
|
db: db,
|
|
}
|
|
res, resErr := rc.process()
|
|
if resErr != nil {
|
|
return *resErr
|
|
}
|
|
|
|
return util.JSONResponse{
|
|
Code: 200,
|
|
JSON: toClientResponse(res),
|
|
}
|
|
}
|
|
}
|
|
|
|
func federatedEventRelationship(
|
|
ctx context.Context, fedReq *gomatrixserverlib.FederationRequest, db Database, rsAPI roomserver.RoomserverInternalAPI, fsAPI fs.FederationSenderInternalAPI,
|
|
) util.JSONResponse {
|
|
relation, err := NewEventRelationshipRequest(bytes.NewBuffer(fedReq.Content()))
|
|
if err != nil {
|
|
util.GetLogger(ctx).WithError(err).Error("failed to decode HTTP request as JSON")
|
|
return util.JSONResponse{
|
|
Code: 400,
|
|
JSON: jsonerror.BadJSON(fmt.Sprintf("invalid json: %s", err)),
|
|
}
|
|
}
|
|
rc := reqCtx{
|
|
ctx: ctx,
|
|
req: relation,
|
|
rsAPI: rsAPI,
|
|
db: db,
|
|
// federation args
|
|
isFederatedRequest: true,
|
|
fsAPI: fsAPI,
|
|
serverName: fedReq.Origin(),
|
|
}
|
|
res, resErr := rc.process()
|
|
if resErr != nil {
|
|
return *resErr
|
|
}
|
|
// add auth chain information
|
|
requiredAuthEventsSet := make(map[string]bool)
|
|
var requiredAuthEvents []string
|
|
for _, ev := range res.Events {
|
|
for _, a := range ev.AuthEventIDs() {
|
|
if requiredAuthEventsSet[a] {
|
|
continue
|
|
}
|
|
requiredAuthEvents = append(requiredAuthEvents, a)
|
|
requiredAuthEventsSet[a] = true
|
|
}
|
|
}
|
|
var queryRes roomserver.QueryAuthChainResponse
|
|
err = rsAPI.QueryAuthChain(ctx, &roomserver.QueryAuthChainRequest{
|
|
EventIDs: requiredAuthEvents,
|
|
}, &queryRes)
|
|
if err != nil {
|
|
// they may already have the auth events so don't fail this request
|
|
util.GetLogger(ctx).WithError(err).Error("Failed to QueryAuthChain")
|
|
}
|
|
res.AuthChain = make([]*gomatrixserverlib.Event, len(queryRes.AuthChain))
|
|
for i := range queryRes.AuthChain {
|
|
res.AuthChain[i] = queryRes.AuthChain[i].Unwrap()
|
|
}
|
|
|
|
return util.JSONResponse{
|
|
Code: 200,
|
|
JSON: res,
|
|
}
|
|
}
|
|
|
|
// nolint:gocyclo
|
|
func (rc *reqCtx) process() (*gomatrixserverlib.MSC2836EventRelationshipsResponse, *util.JSONResponse) {
|
|
var res gomatrixserverlib.MSC2836EventRelationshipsResponse
|
|
var returnEvents []*gomatrixserverlib.HeaderedEvent
|
|
// Can the user see (according to history visibility) event_id? If no, reject the request, else continue.
|
|
event := rc.getLocalEvent(rc.req.EventID)
|
|
if event == nil {
|
|
event = rc.fetchUnknownEvent(rc.req.EventID, rc.req.RoomID)
|
|
}
|
|
if rc.req.RoomID == "" && event != nil {
|
|
rc.req.RoomID = event.RoomID()
|
|
}
|
|
if event == nil || !rc.authorisedToSeeEvent(event) {
|
|
return nil, &util.JSONResponse{
|
|
Code: 403,
|
|
JSON: jsonerror.Forbidden("Event does not exist or you are not authorised to see it"),
|
|
}
|
|
}
|
|
rc.roomVersion = event.Version()
|
|
|
|
// Retrieve the event. Add it to response array.
|
|
returnEvents = append(returnEvents, event)
|
|
|
|
if rc.req.IncludeParent {
|
|
if parentEvent := rc.includeParent(event); parentEvent != nil {
|
|
returnEvents = append(returnEvents, parentEvent)
|
|
}
|
|
}
|
|
|
|
if rc.req.IncludeChildren {
|
|
remaining := rc.req.Limit - len(returnEvents)
|
|
if remaining > 0 {
|
|
children, resErr := rc.includeChildren(rc.db, event.EventID(), remaining, rc.req.RecentFirst)
|
|
if resErr != nil {
|
|
return nil, resErr
|
|
}
|
|
returnEvents = append(returnEvents, children...)
|
|
}
|
|
}
|
|
|
|
remaining := rc.req.Limit - len(returnEvents)
|
|
var walkLimited bool
|
|
if remaining > 0 {
|
|
included := make(map[string]bool, len(returnEvents))
|
|
for _, ev := range returnEvents {
|
|
included[ev.EventID()] = true
|
|
}
|
|
var events []*gomatrixserverlib.HeaderedEvent
|
|
events, walkLimited = walkThread(
|
|
rc.ctx, rc.db, rc, included, remaining,
|
|
)
|
|
returnEvents = append(returnEvents, events...)
|
|
}
|
|
res.Events = make([]*gomatrixserverlib.Event, len(returnEvents))
|
|
for i, ev := range returnEvents {
|
|
// for each event, extract the children_count | hash and add it as unsigned data.
|
|
rc.addChildMetadata(ev)
|
|
res.Events[i] = ev.Unwrap()
|
|
}
|
|
res.Limited = remaining == 0 || walkLimited
|
|
return &res, nil
|
|
}
|
|
|
|
// fetchUnknownEvent retrieves an unknown event from the room specified. This server must
|
|
// be joined to the room in question. This has the side effect of injecting surround threaded
|
|
// events into the roomserver.
|
|
func (rc *reqCtx) fetchUnknownEvent(eventID, roomID string) *gomatrixserverlib.HeaderedEvent {
|
|
if rc.isFederatedRequest || roomID == "" {
|
|
// we don't do fed hits for fed requests, and we can't ask servers without a room ID!
|
|
return nil
|
|
}
|
|
logger := util.GetLogger(rc.ctx).WithField("room_id", roomID)
|
|
// if they supplied a room_id, check the room exists.
|
|
var queryVerRes roomserver.QueryRoomVersionForRoomResponse
|
|
err := rc.rsAPI.QueryRoomVersionForRoom(rc.ctx, &roomserver.QueryRoomVersionForRoomRequest{
|
|
RoomID: roomID,
|
|
}, &queryVerRes)
|
|
if err != nil {
|
|
logger.WithError(err).Warn("failed to query room version for room, does this room exist?")
|
|
return nil
|
|
}
|
|
|
|
// check the user is joined to that room
|
|
var queryMemRes roomserver.QueryMembershipForUserResponse
|
|
err = rc.rsAPI.QueryMembershipForUser(rc.ctx, &roomserver.QueryMembershipForUserRequest{
|
|
RoomID: roomID,
|
|
UserID: rc.userID,
|
|
}, &queryMemRes)
|
|
if err != nil {
|
|
logger.WithError(err).Warn("failed to query membership for user in room")
|
|
return nil
|
|
}
|
|
if !queryMemRes.IsInRoom {
|
|
return nil
|
|
}
|
|
|
|
// ask one of the servers in the room for the event
|
|
var queryRes fs.QueryJoinedHostServerNamesInRoomResponse
|
|
err = rc.fsAPI.QueryJoinedHostServerNamesInRoom(rc.ctx, &fs.QueryJoinedHostServerNamesInRoomRequest{
|
|
RoomID: roomID,
|
|
}, &queryRes)
|
|
if err != nil {
|
|
logger.WithError(err).Error("failed to QueryJoinedHostServerNamesInRoom")
|
|
return nil
|
|
}
|
|
// query up to 5 servers
|
|
serversToQuery := queryRes.ServerNames
|
|
if len(serversToQuery) > 5 {
|
|
serversToQuery = serversToQuery[:5]
|
|
}
|
|
|
|
// fetch the event, along with some of the surrounding thread (if it's threaded) and the auth chain.
|
|
// Inject the response into the roomserver to remember the event across multiple calls and to set
|
|
// unexplored flags correctly.
|
|
for _, srv := range serversToQuery {
|
|
res, err := rc.MSC2836EventRelationships(eventID, srv, queryVerRes.RoomVersion)
|
|
if err != nil {
|
|
continue
|
|
}
|
|
rc.injectResponseToRoomserver(res)
|
|
for _, ev := range res.Events {
|
|
if ev.EventID() == eventID {
|
|
return ev.Headered(ev.Version())
|
|
}
|
|
}
|
|
}
|
|
logger.WithField("servers", serversToQuery).Warn("failed to query event relationships")
|
|
return nil
|
|
}
|
|
|
|
// If include_parent: true and there is a valid m.relationship field in the event,
|
|
// retrieve the referenced event. Apply history visibility check to that event and if it passes, add it to the response array.
|
|
func (rc *reqCtx) includeParent(childEvent *gomatrixserverlib.HeaderedEvent) (parent *gomatrixserverlib.HeaderedEvent) {
|
|
parentID, _, _ := parentChildEventIDs(childEvent)
|
|
if parentID == "" {
|
|
return nil
|
|
}
|
|
return rc.lookForEvent(parentID)
|
|
}
|
|
|
|
// If include_children: true, lookup all events which have event_id as an m.relationship
|
|
// Apply history visibility checks to all these events and add the ones which pass into the response array,
|
|
// honouring the recent_first flag and the limit.
|
|
func (rc *reqCtx) includeChildren(db Database, parentID string, limit int, recentFirst bool) ([]*gomatrixserverlib.HeaderedEvent, *util.JSONResponse) {
|
|
if rc.hasUnexploredChildren(parentID) {
|
|
// we need to do a remote request to pull in the children as we are missing them locally.
|
|
serversToQuery := rc.getServersForEventID(parentID)
|
|
var result *gomatrixserverlib.MSC2836EventRelationshipsResponse
|
|
for _, srv := range serversToQuery {
|
|
res, err := rc.fsAPI.MSC2836EventRelationships(rc.ctx, srv, gomatrixserverlib.MSC2836EventRelationshipsRequest{
|
|
EventID: parentID,
|
|
Direction: "down",
|
|
Limit: 100,
|
|
MaxBreadth: -1,
|
|
MaxDepth: 1, // we just want the children from this parent
|
|
RecentFirst: true,
|
|
}, rc.roomVersion)
|
|
if err != nil {
|
|
util.GetLogger(rc.ctx).WithError(err).WithField("server", srv).Error("includeChildren: failed to call MSC2836EventRelationships")
|
|
} else {
|
|
result = &res
|
|
break
|
|
}
|
|
}
|
|
if result != nil {
|
|
rc.injectResponseToRoomserver(result)
|
|
}
|
|
// fallthrough to pull these new events from the DB
|
|
}
|
|
children, err := db.ChildrenForParent(rc.ctx, parentID, constRelType, recentFirst)
|
|
if err != nil {
|
|
util.GetLogger(rc.ctx).WithError(err).Error("failed to get ChildrenForParent")
|
|
resErr := jsonerror.InternalServerError()
|
|
return nil, &resErr
|
|
}
|
|
var childEvents []*gomatrixserverlib.HeaderedEvent
|
|
for _, child := range children {
|
|
childEvent := rc.lookForEvent(child.EventID)
|
|
if childEvent != nil {
|
|
childEvents = append(childEvents, childEvent)
|
|
}
|
|
}
|
|
if len(childEvents) > limit {
|
|
return childEvents[:limit], nil
|
|
}
|
|
return childEvents, nil
|
|
}
|
|
|
|
// Begin to walk the thread DAG in the direction specified, either depth or breadth first according to the depth_first flag,
|
|
// honouring the limit, max_depth and max_breadth values according to the following rules
|
|
func walkThread(
|
|
ctx context.Context, db Database, rc *reqCtx, included map[string]bool, limit int,
|
|
) ([]*gomatrixserverlib.HeaderedEvent, bool) {
|
|
var result []*gomatrixserverlib.HeaderedEvent
|
|
eventWalker := walker{
|
|
ctx: ctx,
|
|
req: rc.req,
|
|
db: db,
|
|
fn: func(wi *walkInfo) bool {
|
|
// If already processed event, skip.
|
|
if included[wi.EventID] {
|
|
return false
|
|
}
|
|
|
|
// If the response array is >= limit, stop.
|
|
if len(result) >= limit {
|
|
return true
|
|
}
|
|
|
|
// Process the event.
|
|
// if event is not found, use remoteEventRelationships to explore that part of the thread remotely.
|
|
// This will probably be easiest if the event relationships response is directly pumped into the database
|
|
// so the next walk will do the right thing. This requires those events to be authed and likely injected as
|
|
// outliers into the roomserver DB, which will de-dupe appropriately.
|
|
event := rc.lookForEvent(wi.EventID)
|
|
if event != nil {
|
|
result = append(result, event)
|
|
}
|
|
included[wi.EventID] = true
|
|
return false
|
|
},
|
|
}
|
|
limited, err := eventWalker.WalkFrom(rc.req.EventID)
|
|
if err != nil {
|
|
util.GetLogger(ctx).WithError(err).Errorf("Failed to WalkFrom %s", rc.req.EventID)
|
|
}
|
|
return result, limited
|
|
}
|
|
|
|
// MSC2836EventRelationships performs an /event_relationships request to a remote server
|
|
func (rc *reqCtx) MSC2836EventRelationships(eventID string, srv gomatrixserverlib.ServerName, ver gomatrixserverlib.RoomVersion) (*gomatrixserverlib.MSC2836EventRelationshipsResponse, error) {
|
|
res, err := rc.fsAPI.MSC2836EventRelationships(rc.ctx, srv, gomatrixserverlib.MSC2836EventRelationshipsRequest{
|
|
EventID: eventID,
|
|
DepthFirst: rc.req.DepthFirst,
|
|
Direction: rc.req.Direction,
|
|
Limit: rc.req.Limit,
|
|
MaxBreadth: rc.req.MaxBreadth,
|
|
MaxDepth: rc.req.MaxDepth,
|
|
RecentFirst: rc.req.RecentFirst,
|
|
}, ver)
|
|
if err != nil {
|
|
util.GetLogger(rc.ctx).WithError(err).Error("Failed to call MSC2836EventRelationships")
|
|
return nil, err
|
|
}
|
|
return &res, nil
|
|
|
|
}
|
|
|
|
// authorisedToSeeEvent checks that the user or server is allowed to see this event. Returns true if allowed to
|
|
// see this request. This only needs to be done once per room at present as we just check for joined status.
|
|
func (rc *reqCtx) authorisedToSeeEvent(event *gomatrixserverlib.HeaderedEvent) bool {
|
|
if rc.isFederatedRequest {
|
|
// make sure the server is in this room
|
|
var res fs.QueryJoinedHostServerNamesInRoomResponse
|
|
err := rc.fsAPI.QueryJoinedHostServerNamesInRoom(rc.ctx, &fs.QueryJoinedHostServerNamesInRoomRequest{
|
|
RoomID: event.RoomID(),
|
|
}, &res)
|
|
if err != nil {
|
|
util.GetLogger(rc.ctx).WithError(err).Error("authorisedToSeeEvent: failed to QueryJoinedHostServerNamesInRoom")
|
|
return false
|
|
}
|
|
for _, srv := range res.ServerNames {
|
|
if srv == rc.serverName {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
// make sure the user is in this room
|
|
// Allow events if the member is in the room
|
|
// TODO: This does not honour history_visibility
|
|
// TODO: This does not honour m.room.create content
|
|
var queryMembershipRes roomserver.QueryMembershipForUserResponse
|
|
err := rc.rsAPI.QueryMembershipForUser(rc.ctx, &roomserver.QueryMembershipForUserRequest{
|
|
RoomID: event.RoomID(),
|
|
UserID: rc.userID,
|
|
}, &queryMembershipRes)
|
|
if err != nil {
|
|
util.GetLogger(rc.ctx).WithError(err).Error("authorisedToSeeEvent: failed to QueryMembershipForUser")
|
|
return false
|
|
}
|
|
return queryMembershipRes.IsInRoom
|
|
}
|
|
|
|
func (rc *reqCtx) getServersForEventID(eventID string) []gomatrixserverlib.ServerName {
|
|
if rc.req.RoomID == "" {
|
|
util.GetLogger(rc.ctx).WithField("event_id", eventID).Error(
|
|
"getServersForEventID: event exists in unknown room",
|
|
)
|
|
return nil
|
|
}
|
|
if rc.roomVersion == "" {
|
|
util.GetLogger(rc.ctx).WithField("event_id", eventID).Errorf(
|
|
"getServersForEventID: event exists in %s with unknown room version", rc.req.RoomID,
|
|
)
|
|
return nil
|
|
}
|
|
var queryRes fs.QueryJoinedHostServerNamesInRoomResponse
|
|
err := rc.fsAPI.QueryJoinedHostServerNamesInRoom(rc.ctx, &fs.QueryJoinedHostServerNamesInRoomRequest{
|
|
RoomID: rc.req.RoomID,
|
|
}, &queryRes)
|
|
if err != nil {
|
|
util.GetLogger(rc.ctx).WithError(err).Error("getServersForEventID: failed to QueryJoinedHostServerNamesInRoom")
|
|
return nil
|
|
}
|
|
// query up to 5 servers
|
|
serversToQuery := queryRes.ServerNames
|
|
if len(serversToQuery) > 5 {
|
|
serversToQuery = serversToQuery[:5]
|
|
}
|
|
return serversToQuery
|
|
}
|
|
|
|
func (rc *reqCtx) remoteEventRelationships(eventID string) *gomatrixserverlib.MSC2836EventRelationshipsResponse {
|
|
if rc.isFederatedRequest {
|
|
return nil // we don't query remote servers for remote requests
|
|
}
|
|
serversToQuery := rc.getServersForEventID(eventID)
|
|
var res *gomatrixserverlib.MSC2836EventRelationshipsResponse
|
|
var err error
|
|
for _, srv := range serversToQuery {
|
|
res, err = rc.MSC2836EventRelationships(eventID, srv, rc.roomVersion)
|
|
if err != nil {
|
|
util.GetLogger(rc.ctx).WithError(err).WithField("server", srv).Error("remoteEventRelationships: failed to call MSC2836EventRelationships")
|
|
} else {
|
|
break
|
|
}
|
|
}
|
|
return res
|
|
}
|
|
|
|
// lookForEvent returns the event for the event ID given, by trying to query remote servers
|
|
// if the event ID is unknown via /event_relationships.
|
|
func (rc *reqCtx) lookForEvent(eventID string) *gomatrixserverlib.HeaderedEvent {
|
|
event := rc.getLocalEvent(eventID)
|
|
if event == nil {
|
|
queryRes := rc.remoteEventRelationships(eventID)
|
|
if queryRes != nil {
|
|
// inject all the events into the roomserver then return the event in question
|
|
rc.injectResponseToRoomserver(queryRes)
|
|
for _, ev := range queryRes.Events {
|
|
if ev.EventID() == eventID && rc.req.RoomID == ev.RoomID() {
|
|
return ev.Headered(ev.Version())
|
|
}
|
|
}
|
|
}
|
|
} else if rc.hasUnexploredChildren(eventID) {
|
|
// we have the local event but we may need to do a remote hit anyway if we are exploring the thread and have unknown children.
|
|
// If we don't do this then we risk never fetching the children.
|
|
queryRes := rc.remoteEventRelationships(eventID)
|
|
if queryRes != nil {
|
|
rc.injectResponseToRoomserver(queryRes)
|
|
err := rc.db.MarkChildrenExplored(context.Background(), eventID)
|
|
if err != nil {
|
|
util.GetLogger(rc.ctx).WithError(err).Warnf("failed to mark children of %s as explored", eventID)
|
|
}
|
|
}
|
|
}
|
|
if rc.req.RoomID == event.RoomID() {
|
|
return event
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (rc *reqCtx) getLocalEvent(eventID string) *gomatrixserverlib.HeaderedEvent {
|
|
var queryEventsRes roomserver.QueryEventsByIDResponse
|
|
err := rc.rsAPI.QueryEventsByID(rc.ctx, &roomserver.QueryEventsByIDRequest{
|
|
EventIDs: []string{eventID},
|
|
}, &queryEventsRes)
|
|
if err != nil {
|
|
util.GetLogger(rc.ctx).WithError(err).Error("getLocalEvent: failed to QueryEventsByID")
|
|
return nil
|
|
}
|
|
if len(queryEventsRes.Events) == 0 {
|
|
util.GetLogger(rc.ctx).WithField("event_id", eventID).Infof("getLocalEvent: event does not exist")
|
|
return nil // event does not exist
|
|
}
|
|
return queryEventsRes.Events[0]
|
|
}
|
|
|
|
// injectResponseToRoomserver injects the events
|
|
// into the roomserver as KindOutlier, with auth chains.
|
|
func (rc *reqCtx) injectResponseToRoomserver(res *gomatrixserverlib.MSC2836EventRelationshipsResponse) {
|
|
var stateEvents []*gomatrixserverlib.Event
|
|
var messageEvents []*gomatrixserverlib.Event
|
|
for _, ev := range res.Events {
|
|
if ev.StateKey() != nil {
|
|
stateEvents = append(stateEvents, ev)
|
|
} else {
|
|
messageEvents = append(messageEvents, ev)
|
|
}
|
|
}
|
|
respState := gomatrixserverlib.RespState{
|
|
AuthEvents: res.AuthChain,
|
|
StateEvents: stateEvents,
|
|
}
|
|
eventsInOrder, err := respState.Events()
|
|
if err != nil {
|
|
util.GetLogger(rc.ctx).WithError(err).Error("failed to calculate order to send events in MSC2836EventRelationshipsResponse")
|
|
return
|
|
}
|
|
// everything gets sent as an outlier because auth chain events may be disjoint from the DAG
|
|
// as may the threaded events.
|
|
var ires []roomserver.InputRoomEvent
|
|
for _, outlier := range append(eventsInOrder, messageEvents...) {
|
|
ires = append(ires, roomserver.InputRoomEvent{
|
|
Kind: roomserver.KindOutlier,
|
|
Event: outlier.Headered(outlier.Version()),
|
|
AuthEventIDs: outlier.AuthEventIDs(),
|
|
})
|
|
}
|
|
// we've got the data by this point so use a background context
|
|
err = roomserver.SendInputRoomEvents(context.Background(), rc.rsAPI, ires)
|
|
if err != nil {
|
|
util.GetLogger(rc.ctx).WithError(err).Error("failed to inject MSC2836EventRelationshipsResponse into the roomserver")
|
|
}
|
|
// update the child count / hash columns for these nodes. We need to do this here because not all events will make it
|
|
// through to the KindNewEventPersisted hook because the roomserver will ignore duplicates. Duplicates have meaning though
|
|
// as the `unsigned` field may differ (if the number of children changes).
|
|
for _, ev := range ires {
|
|
err = rc.db.UpdateChildMetadata(context.Background(), ev.Event)
|
|
if err != nil {
|
|
util.GetLogger(rc.ctx).WithError(err).WithField("event_id", ev.Event.EventID()).Warn("failed to update child metadata for event")
|
|
}
|
|
}
|
|
}
|
|
|
|
func (rc *reqCtx) addChildMetadata(ev *gomatrixserverlib.HeaderedEvent) {
|
|
count, hash := rc.getChildMetadata(ev.EventID())
|
|
if count == 0 {
|
|
return
|
|
}
|
|
err := ev.SetUnsignedField("children_hash", gomatrixserverlib.Base64Bytes(hash))
|
|
if err != nil {
|
|
util.GetLogger(rc.ctx).WithError(err).Warn("Failed to set children_hash")
|
|
}
|
|
err = ev.SetUnsignedField("children", map[string]int{
|
|
constRelType: count,
|
|
})
|
|
if err != nil {
|
|
util.GetLogger(rc.ctx).WithError(err).Warn("Failed to set children count")
|
|
}
|
|
}
|
|
|
|
func (rc *reqCtx) getChildMetadata(eventID string) (count int, hash []byte) {
|
|
children, err := rc.db.ChildrenForParent(rc.ctx, eventID, constRelType, false)
|
|
if err != nil {
|
|
util.GetLogger(rc.ctx).WithError(err).Warn("Failed to get ChildrenForParent for getting child metadata")
|
|
return
|
|
}
|
|
if len(children) == 0 {
|
|
return
|
|
}
|
|
// sort it lexiographically
|
|
sort.Slice(children, func(i, j int) bool {
|
|
return children[i].EventID < children[j].EventID
|
|
})
|
|
// hash it
|
|
var eventIDs strings.Builder
|
|
for _, c := range children {
|
|
_, _ = eventIDs.WriteString(c.EventID)
|
|
}
|
|
hashValBytes := sha256.Sum256([]byte(eventIDs.String()))
|
|
|
|
count = len(children)
|
|
hash = hashValBytes[:]
|
|
return
|
|
}
|
|
|
|
// hasUnexploredChildren returns true if this event has unexplored children.
|
|
// "An event has unexplored children if the `unsigned` child count on the parent does not match
|
|
// how many children the server believes the parent to have. In addition, if the counts match but
|
|
// the hashes do not match, then the event is unexplored."
|
|
func (rc *reqCtx) hasUnexploredChildren(eventID string) bool {
|
|
if rc.isFederatedRequest {
|
|
return false // we only explore children for clients, not servers.
|
|
}
|
|
// extract largest child count from event
|
|
eventCount, eventHash, explored, err := rc.db.ChildMetadata(rc.ctx, eventID)
|
|
if err != nil {
|
|
util.GetLogger(rc.ctx).WithError(err).WithField("event_id", eventID).Warn(
|
|
"failed to get ChildMetadata from db",
|
|
)
|
|
return false
|
|
}
|
|
// if there are no recorded children then we know we have >= children.
|
|
// if the event has already been explored (read: we hit /event_relationships successfully)
|
|
// then don't do it again. We'll only re-do this if we get an even bigger children count,
|
|
// see Database.UpdateChildMetadata
|
|
if eventCount == 0 || explored {
|
|
return false // short-circuit
|
|
}
|
|
|
|
// calculate child count for event
|
|
calcCount, calcHash := rc.getChildMetadata(eventID)
|
|
|
|
if eventCount < calcCount {
|
|
return false // we have more children
|
|
} else if eventCount > calcCount {
|
|
return true // the event has more children than we know about
|
|
}
|
|
// we have the same count, so a mismatched hash means some children are different
|
|
return !bytes.Equal(eventHash, calcHash)
|
|
}
|
|
|
|
type walkInfo struct {
|
|
eventInfo
|
|
SiblingNumber int
|
|
Depth int
|
|
}
|
|
|
|
type walker struct {
|
|
ctx context.Context
|
|
req *EventRelationshipRequest
|
|
db Database
|
|
fn func(wi *walkInfo) bool // callback invoked for each event walked, return true to terminate the walk
|
|
}
|
|
|
|
// WalkFrom the event ID given
|
|
func (w *walker) WalkFrom(eventID string) (limited bool, err error) {
|
|
children, err := w.childrenForParent(eventID)
|
|
if err != nil {
|
|
util.GetLogger(w.ctx).WithError(err).Error("WalkFrom() childrenForParent failed, cannot walk")
|
|
return false, err
|
|
}
|
|
var next *walkInfo
|
|
toWalk := w.addChildren(nil, children, 1)
|
|
next, toWalk = w.nextChild(toWalk)
|
|
for next != nil {
|
|
stop := w.fn(next)
|
|
if stop {
|
|
return true, nil
|
|
}
|
|
// find the children's children
|
|
children, err = w.childrenForParent(next.EventID)
|
|
if err != nil {
|
|
util.GetLogger(w.ctx).WithError(err).Error("WalkFrom() childrenForParent failed, cannot walk")
|
|
return false, err
|
|
}
|
|
toWalk = w.addChildren(toWalk, children, next.Depth+1)
|
|
next, toWalk = w.nextChild(toWalk)
|
|
}
|
|
|
|
return false, nil
|
|
}
|
|
|
|
// addChildren adds an event's children to the to walk data structure
|
|
func (w *walker) addChildren(toWalk []walkInfo, children []eventInfo, depthOfChildren int) []walkInfo {
|
|
// Check what number child this event is (ordered by recent_first) compared to its parent, does it exceed (greater than) max_breadth? If yes, skip.
|
|
if len(children) > w.req.MaxBreadth {
|
|
children = children[:w.req.MaxBreadth]
|
|
}
|
|
// Check how deep the event is compared to event_id, does it exceed (greater than) max_depth? If yes, skip.
|
|
if depthOfChildren > w.req.MaxDepth {
|
|
return toWalk
|
|
}
|
|
|
|
if w.req.DepthFirst {
|
|
// the slice is a stack so push them in reverse order so we pop them in the correct order
|
|
// e.g [3,2,1] => [3,2] , 1 => [3] , 2 => [] , 3
|
|
for i := len(children) - 1; i >= 0; i-- {
|
|
toWalk = append(toWalk, walkInfo{
|
|
eventInfo: children[i],
|
|
SiblingNumber: i + 1, // index from 1
|
|
Depth: depthOfChildren,
|
|
})
|
|
}
|
|
} else {
|
|
// the slice is a queue so push them in normal order to we dequeue them in the correct order
|
|
// e.g [1,2,3] => 1, [2, 3] => 2 , [3] => 3, []
|
|
for i := range children {
|
|
toWalk = append(toWalk, walkInfo{
|
|
eventInfo: children[i],
|
|
SiblingNumber: i + 1, // index from 1
|
|
Depth: depthOfChildren,
|
|
})
|
|
}
|
|
}
|
|
return toWalk
|
|
}
|
|
|
|
func (w *walker) nextChild(toWalk []walkInfo) (*walkInfo, []walkInfo) {
|
|
if len(toWalk) == 0 {
|
|
return nil, nil
|
|
}
|
|
var child walkInfo
|
|
if w.req.DepthFirst {
|
|
// toWalk is a stack so pop the child off
|
|
child, toWalk = toWalk[len(toWalk)-1], toWalk[:len(toWalk)-1]
|
|
return &child, toWalk
|
|
}
|
|
// toWalk is a queue so shift the child off
|
|
child, toWalk = toWalk[0], toWalk[1:]
|
|
return &child, toWalk
|
|
}
|
|
|
|
// childrenForParent returns the children events for this event ID, honouring the direction: up|down flags
|
|
// meaning this can actually be returning the parent for the event instead of the children.
|
|
func (w *walker) childrenForParent(eventID string) ([]eventInfo, error) {
|
|
if w.req.Direction == "down" {
|
|
return w.db.ChildrenForParent(w.ctx, eventID, constRelType, w.req.RecentFirst)
|
|
}
|
|
// find the event to pull out the parent
|
|
ei, err := w.db.ParentForChild(w.ctx, eventID, constRelType)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if ei != nil {
|
|
return []eventInfo{*ei}, nil
|
|
}
|
|
return nil, nil
|
|
}
|