2020-01-23 17:51:10 +00:00
// Copyright 2018 New Vector Ltd
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package routing
import (
"context"
2020-03-06 14:31:12 +00:00
"fmt"
2020-01-23 17:51:10 +00:00
"net/http"
"sort"
"github.com/matrix-org/dendrite/clientapi/jsonerror"
2022-04-22 10:38:29 +01:00
"github.com/matrix-org/dendrite/internal/caching"
2020-01-23 17:51:10 +00:00
"github.com/matrix-org/dendrite/roomserver/api"
2020-12-02 17:41:00 +00:00
"github.com/matrix-org/dendrite/setup/config"
2020-01-23 17:51:10 +00:00
"github.com/matrix-org/dendrite/syncapi/storage"
2020-12-16 17:31:03 +00:00
"github.com/matrix-org/dendrite/syncapi/sync"
2020-01-23 17:51:10 +00:00
"github.com/matrix-org/dendrite/syncapi/types"
2020-10-02 17:08:13 +01:00
userapi "github.com/matrix-org/dendrite/userapi/api"
2020-01-23 17:51:10 +00:00
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
2020-03-24 12:20:10 +00:00
"github.com/sirupsen/logrus"
2020-01-23 17:51:10 +00:00
)
type messagesReq struct {
ctx context . Context
db storage . Database
2020-05-01 10:48:17 +01:00
rsAPI api . RoomserverInternalAPI
2020-01-23 17:51:10 +00:00
federation * gomatrixserverlib . FederationClient
2020-08-10 14:18:04 +01:00
cfg * config . SyncAPI
2020-01-23 17:51:10 +00:00
roomID string
2020-05-13 12:14:50 +01:00
from * types . TopologyToken
to * types . TopologyToken
2020-10-02 17:08:13 +01:00
device * userapi . Device
2020-01-23 17:51:10 +00:00
wasToProvided bool
backwardOrdering bool
2022-03-01 14:39:56 +00:00
filter * gomatrixserverlib . RoomEventFilter
2020-01-23 17:51:10 +00:00
}
type messagesResp struct {
2020-12-16 18:10:39 +00:00
Start string ` json:"start" `
2022-03-18 10:40:01 +00:00
StartStream string ` json:"start_stream,omitempty" ` // NOTSPEC: used by Cerulean, so clients can hit /messages then immediately /sync with a latest sync token
2020-12-16 18:10:39 +00:00
End string ` json:"end" `
Chunk [ ] gomatrixserverlib . ClientEvent ` json:"chunk" `
2022-03-01 14:39:56 +00:00
State [ ] gomatrixserverlib . ClientEvent ` json:"state" `
2020-01-23 17:51:10 +00:00
}
// OnIncomingMessagesRequest implements the /messages endpoint from the
// client-server API.
// See: https://matrix.org/docs/spec/client_server/latest.html#get-matrix-client-r0-rooms-roomid-messages
func OnIncomingMessagesRequest (
2020-10-02 17:08:13 +01:00
req * http . Request , db storage . Database , roomID string , device * userapi . Device ,
2020-01-23 17:51:10 +00:00
federation * gomatrixserverlib . FederationClient ,
2020-05-01 10:48:17 +01:00
rsAPI api . RoomserverInternalAPI ,
2020-08-10 14:18:04 +01:00
cfg * config . SyncAPI ,
2020-12-16 17:31:03 +00:00
srp * sync . RequestPool ,
2022-04-22 10:38:29 +01:00
lazyLoadCache * caching . LazyLoadCache ,
2020-01-23 17:51:10 +00:00
) util . JSONResponse {
var err error
2020-11-05 10:19:23 +00:00
// check if the user has already forgotten about this room
isForgotten , err := checkIsRoomForgotten ( req . Context ( ) , roomID , device . UserID , rsAPI )
if err != nil {
return jsonerror . InternalServerError ( )
}
if isForgotten {
return util . JSONResponse {
Code : http . StatusForbidden ,
JSON : jsonerror . Forbidden ( "user already forgot about this room" ) ,
}
}
2022-03-01 14:39:56 +00:00
filter , err := parseRoomEventFilter ( req )
if err != nil {
return util . JSONResponse {
Code : http . StatusBadRequest ,
JSON : jsonerror . InvalidArgumentValue ( "unable to parse filter" ) ,
}
}
2020-01-23 17:51:10 +00:00
// Extract parameters from the request's URL.
// Pagination tokens.
2020-05-13 12:14:50 +01:00
var fromStream * types . StreamingToken
2020-12-16 17:31:03 +00:00
fromQuery := req . URL . Query ( ) . Get ( "from" )
2022-03-18 10:40:01 +00:00
toQuery := req . URL . Query ( ) . Get ( "to" )
2020-12-16 18:10:39 +00:00
emptyFromSupplied := fromQuery == ""
if emptyFromSupplied {
2020-12-16 17:31:03 +00:00
// NOTSPEC: We will pretend they used the latest sync token if no ?from= was provided.
// We do this to allow clients to get messages without having to call `/sync` e.g Cerulean
currPos := srp . Notifier . CurrentPosition ( )
fromQuery = currPos . String ( )
}
2020-01-23 17:51:10 +00:00
// Direction to return events from.
dir := req . URL . Query ( ) . Get ( "dir" )
if dir != "b" && dir != "f" {
return util . JSONResponse {
Code : http . StatusBadRequest ,
JSON : jsonerror . MissingArgument ( "Bad or missing dir query parameter (should be either 'b' or 'f')" ) ,
}
}
// A boolean is easier to handle in this case, especially since dir is sure
// to have one of the two accepted values (so dir == "f" <=> !backwardOrdering).
backwardOrdering := ( dir == "b" )
2022-03-18 10:40:01 +00:00
from , err := types . NewTopologyTokenFromString ( fromQuery )
if err != nil {
var streamToken types . StreamingToken
if streamToken , err = types . NewStreamTokenFromString ( fromQuery ) ; err != nil {
return util . JSONResponse {
Code : http . StatusBadRequest ,
JSON : jsonerror . InvalidArgumentValue ( "Invalid from parameter: " + err . Error ( ) ) ,
}
} else {
fromStream = & streamToken
from , err = db . StreamToTopologicalPosition ( req . Context ( ) , roomID , streamToken . PDUPosition , backwardOrdering )
if err != nil {
logrus . WithError ( err ) . Errorf ( "Failed to get topological position for streaming token %v" , streamToken )
return jsonerror . InternalServerError ( )
}
}
}
2020-01-23 17:51:10 +00:00
// Pagination tokens. To is optional, and its default value depends on the
// direction ("b" or "f").
2020-05-13 12:14:50 +01:00
var to types . TopologyToken
2020-01-23 17:51:10 +00:00
wasToProvided := true
2022-03-18 10:40:01 +00:00
if len ( toQuery ) > 0 {
to , err = types . NewTopologyTokenFromString ( toQuery )
2020-01-23 17:51:10 +00:00
if err != nil {
2022-03-18 10:40:01 +00:00
var streamToken types . StreamingToken
if streamToken , err = types . NewStreamTokenFromString ( toQuery ) ; err != nil {
return util . JSONResponse {
Code : http . StatusBadRequest ,
JSON : jsonerror . InvalidArgumentValue ( "Invalid to parameter: " + err . Error ( ) ) ,
}
} else {
to , err = db . StreamToTopologicalPosition ( req . Context ( ) , roomID , streamToken . PDUPosition , ! backwardOrdering )
if err != nil {
logrus . WithError ( err ) . Errorf ( "Failed to get topological position for streaming token %v" , streamToken )
return jsonerror . InternalServerError ( )
}
2020-01-23 17:51:10 +00:00
}
}
} else {
// If "to" isn't provided, it defaults to either the earliest stream
// position (if we're going backward) or to the latest one (if we're
// going forward).
to , err = setToDefault ( req . Context ( ) , db , backwardOrdering , roomID )
if err != nil {
2020-03-02 16:20:44 +00:00
util . GetLogger ( req . Context ( ) ) . WithError ( err ) . Error ( "setToDefault failed" )
return jsonerror . InternalServerError ( )
2020-01-23 17:51:10 +00:00
}
wasToProvided = false
}
// TODO: Implement filtering (#587)
// Check the room ID's format.
if _ , _ , err = gomatrixserverlib . SplitID ( '!' , roomID ) ; err != nil {
return util . JSONResponse {
Code : http . StatusBadRequest ,
JSON : jsonerror . MissingArgument ( "Bad room ID: " + err . Error ( ) ) ,
}
}
mReq := messagesReq {
ctx : req . Context ( ) ,
db : db ,
2020-05-01 10:48:17 +01:00
rsAPI : rsAPI ,
2020-01-23 17:51:10 +00:00
federation : federation ,
cfg : cfg ,
roomID : roomID ,
2020-05-13 12:14:50 +01:00
from : & from ,
to : & to ,
2020-01-23 17:51:10 +00:00
wasToProvided : wasToProvided ,
2022-03-01 14:39:56 +00:00
filter : filter ,
2020-01-23 17:51:10 +00:00
backwardOrdering : backwardOrdering ,
2020-10-02 17:08:13 +01:00
device : device ,
2020-01-23 17:51:10 +00:00
}
clientEvents , start , end , err := mReq . retrieveEvents ( )
if err != nil {
2020-03-02 16:20:44 +00:00
util . GetLogger ( req . Context ( ) ) . WithError ( err ) . Error ( "mreq.retrieveEvents failed" )
return jsonerror . InternalServerError ( )
2020-01-23 17:51:10 +00:00
}
2020-06-17 17:41:45 +01:00
2022-03-01 14:39:56 +00:00
// at least fetch the membership events for the users returned in chunk if LazyLoadMembers is set
state := [ ] gomatrixserverlib . ClientEvent { }
if filter . LazyLoadMembers {
2022-03-04 10:24:26 +00:00
membershipToUser := make ( map [ string ] * gomatrixserverlib . HeaderedEvent )
2022-03-01 14:39:56 +00:00
for _ , evt := range clientEvents {
2022-04-22 10:38:29 +01:00
// Don't add membership events the client should already know about
if _ , cached := lazyLoadCache . IsLazyLoadedUserCached ( device , roomID , evt . Sender ) ; cached {
continue
}
2022-03-04 10:24:26 +00:00
membership , err := db . GetStateEvent ( req . Context ( ) , roomID , gomatrixserverlib . MRoomMember , evt . Sender )
2022-03-01 14:39:56 +00:00
if err != nil {
util . GetLogger ( req . Context ( ) ) . WithError ( err ) . Error ( "failed to get membership event for user" )
continue
}
2022-03-04 10:24:26 +00:00
if membership != nil {
membershipToUser [ evt . Sender ] = membership
2022-04-22 10:38:29 +01:00
lazyLoadCache . StoreLazyLoadedUser ( device , roomID , evt . Sender , membership . EventID ( ) )
2022-03-04 10:24:26 +00:00
}
2022-03-01 14:39:56 +00:00
}
2022-03-04 10:24:26 +00:00
for _ , evt := range membershipToUser {
2022-04-22 10:38:29 +01:00
state = append ( state , gomatrixserverlib . HeaderedToClientEvent ( evt , gomatrixserverlib . FormatSync ) )
2022-03-01 14:39:56 +00:00
}
}
2020-03-24 12:20:10 +00:00
util . GetLogger ( req . Context ( ) ) . WithFields ( logrus . Fields {
"from" : from . String ( ) ,
"to" : to . String ( ) ,
2022-03-01 14:39:56 +00:00
"limit" : filter . Limit ,
2020-03-24 12:20:10 +00:00
"backwards" : backwardOrdering ,
"return_start" : start . String ( ) ,
"return_end" : end . String ( ) ,
} ) . Info ( "Responding" )
2020-01-23 17:51:10 +00:00
2020-12-16 18:10:39 +00:00
res := messagesResp {
Chunk : clientEvents ,
Start : start . String ( ) ,
End : end . String ( ) ,
2022-03-01 14:39:56 +00:00
State : state ,
2020-12-16 18:10:39 +00:00
}
2022-03-18 10:40:01 +00:00
if fromStream != nil {
2020-12-16 18:10:39 +00:00
res . StartStream = fromStream . String ( )
}
2020-01-23 17:51:10 +00:00
// Respond with the events.
return util . JSONResponse {
Code : http . StatusOK ,
2020-12-16 18:10:39 +00:00
JSON : res ,
2020-01-23 17:51:10 +00:00
}
}
2020-11-05 10:19:23 +00:00
func checkIsRoomForgotten ( ctx context . Context , roomID , userID string , rsAPI api . RoomserverInternalAPI ) ( bool , error ) {
req := api . QueryMembershipForUserRequest {
RoomID : roomID ,
UserID : userID ,
}
resp := api . QueryMembershipForUserResponse { }
if err := rsAPI . QueryMembershipForUser ( ctx , & req , & resp ) ; err != nil {
return false , err
}
return resp . IsRoomForgotten , nil
}
2020-09-27 22:23:42 +01:00
// retrieveEvents retrieves events from the local database for a request on
2020-01-23 17:51:10 +00:00
// /messages. If there's not enough events to retrieve, it asks another
// homeserver in the room for older events.
// Returns an error if there was an issue talking to the database or with the
// remote homeserver.
func ( r * messagesReq ) retrieveEvents ( ) (
clientEvents [ ] gomatrixserverlib . ClientEvent , start ,
2020-05-13 12:14:50 +01:00
end types . TopologyToken , err error ,
2020-01-23 17:51:10 +00:00
) {
// Retrieve the events from the local database.
2022-04-13 12:16:02 +01:00
streamEvents , err := r . db . GetEventsInTopologicalRange ( r . ctx , r . from , r . to , r . roomID , r . filter , r . backwardOrdering )
2020-01-23 17:51:10 +00:00
if err != nil {
2020-03-18 12:48:51 +00:00
err = fmt . Errorf ( "GetEventsInRange: %w" , err )
2020-01-23 17:51:10 +00:00
return
}
2020-11-16 15:44:53 +00:00
var events [ ] * gomatrixserverlib . HeaderedEvent
2020-05-20 16:04:31 +01:00
util . GetLogger ( r . ctx ) . WithField ( "start" , start ) . WithField ( "end" , end ) . Infof ( "Fetched %d events locally" , len ( streamEvents ) )
2020-01-23 17:51:10 +00:00
// There can be two reasons for streamEvents to be empty: either we've
// reached the oldest event in the room (or the most recent one, depending
// on the ordering), or we've reached a backward extremity.
if len ( streamEvents ) == 0 {
if events , err = r . handleEmptyEventsSlice ( ) ; err != nil {
return
}
} else {
if events , err = r . handleNonEmptyEventsSlice ( streamEvents ) ; err != nil {
return
}
}
// If we didn't get any event, we don't need to proceed any further.
if len ( events ) == 0 {
2020-05-13 12:14:50 +01:00
return [ ] gomatrixserverlib . ClientEvent { } , * r . from , * r . to , nil
2020-01-23 17:51:10 +00:00
}
2021-01-13 12:59:29 +00:00
// Get the position of the first and the last event in the room's topology.
// This position is currently determined by the event's depth, so we could
// also use it instead of retrieving from the database. However, if we ever
// change the way topological positions are defined (as depth isn't the most
// reliable way to define it), it would be easier and less troublesome to
// only have to change it in one place, i.e. the database.
start , end , err = r . getStartEnd ( events )
2020-04-15 16:10:18 +01:00
// Sort the events to ensure we send them in the right order.
2020-01-23 17:51:10 +00:00
if r . backwardOrdering {
2020-04-15 16:10:18 +01:00
// This reverses the array from old->new to new->old
2020-11-16 15:44:53 +00:00
reversed := func ( in [ ] * gomatrixserverlib . HeaderedEvent ) [ ] * gomatrixserverlib . HeaderedEvent {
out := make ( [ ] * gomatrixserverlib . HeaderedEvent , len ( in ) )
2020-05-01 16:41:13 +01:00
for i := 0 ; i < len ( in ) ; i ++ {
out [ i ] = in [ len ( in ) - i - 1 ]
}
return out
}
events = reversed ( events )
2020-01-23 17:51:10 +00:00
}
2020-10-02 17:08:13 +01:00
events = r . filterHistoryVisible ( events )
if len ( events ) == 0 {
return [ ] gomatrixserverlib . ClientEvent { } , * r . from , * r . to , nil
}
2020-01-23 17:51:10 +00:00
// Convert all of the events into client events.
2020-03-19 12:07:01 +00:00
clientEvents = gomatrixserverlib . HeaderedToClientEvents ( events , gomatrixserverlib . FormatAll )
2020-06-17 17:41:45 +01:00
return clientEvents , start , end , err
}
2020-11-16 15:44:53 +00:00
func ( r * messagesReq ) filterHistoryVisible ( events [ ] * gomatrixserverlib . HeaderedEvent ) [ ] * gomatrixserverlib . HeaderedEvent {
2020-10-02 17:08:13 +01:00
// TODO FIXME: We don't fully implement history visibility yet. To avoid leaking events which the
// user shouldn't see, we check the recent events and remove any prior to the join event of the user
// which is equiv to history_visibility: joined
joinEventIndex := - 1
for i , ev := range events {
if ev . Type ( ) == gomatrixserverlib . MRoomMember && ev . StateKeyEquals ( r . device . UserID ) {
membership , _ := ev . Membership ( )
if membership == "join" {
joinEventIndex = i
break
}
}
}
2020-11-16 15:44:53 +00:00
var result [ ] * gomatrixserverlib . HeaderedEvent
var eventsToCheck [ ] * gomatrixserverlib . HeaderedEvent
2020-10-02 17:08:13 +01:00
if joinEventIndex != - 1 {
if r . backwardOrdering {
result = events [ : joinEventIndex + 1 ]
eventsToCheck = append ( eventsToCheck , result [ 0 ] )
} else {
result = events [ joinEventIndex : ]
eventsToCheck = append ( eventsToCheck , result [ len ( result ) - 1 ] )
}
} else {
2020-11-16 15:44:53 +00:00
eventsToCheck = [ ] * gomatrixserverlib . HeaderedEvent { events [ 0 ] , events [ len ( events ) - 1 ] }
2020-10-02 17:08:13 +01:00
result = events
}
// make sure the user was in the room for both the earliest and latest events, we need this because
// some backpagination results will not have the join event (e.g if they hit /messages at the join event itself)
wasJoined := true
for _ , ev := range eventsToCheck {
var queryRes api . QueryStateAfterEventsResponse
err := r . rsAPI . QueryStateAfterEvents ( r . ctx , & api . QueryStateAfterEventsRequest {
RoomID : ev . RoomID ( ) ,
PrevEventIDs : ev . PrevEventIDs ( ) ,
StateToFetch : [ ] gomatrixserverlib . StateKeyTuple {
{ EventType : gomatrixserverlib . MRoomMember , StateKey : r . device . UserID } ,
{ EventType : gomatrixserverlib . MRoomHistoryVisibility , StateKey : "" } ,
} ,
} , & queryRes )
if err != nil {
wasJoined = false
break
}
var hisVisEvent , membershipEvent * gomatrixserverlib . HeaderedEvent
for i := range queryRes . StateEvents {
switch queryRes . StateEvents [ i ] . Type ( ) {
case gomatrixserverlib . MRoomMember :
2020-11-16 15:44:53 +00:00
membershipEvent = queryRes . StateEvents [ i ]
2020-10-02 17:08:13 +01:00
case gomatrixserverlib . MRoomHistoryVisibility :
2020-11-16 15:44:53 +00:00
hisVisEvent = queryRes . StateEvents [ i ]
2020-10-02 17:08:13 +01:00
}
}
if hisVisEvent == nil {
return events // apply no filtering as it defaults to Shared.
}
hisVis , _ := hisVisEvent . HistoryVisibility ( )
2021-01-13 12:59:29 +00:00
if hisVis == "shared" || hisVis == "world_readable" {
2020-10-02 17:08:13 +01:00
return events // apply no filtering
}
if membershipEvent == nil {
wasJoined = false
break
}
membership , err := membershipEvent . Membership ( )
if err != nil {
wasJoined = false
break
}
if membership != "join" {
wasJoined = false
break
}
}
if ! wasJoined {
util . GetLogger ( r . ctx ) . WithField ( "num_events" , len ( events ) ) . Warnf ( "%s was not joined to room during these events, omitting them" , r . device . UserID )
2020-11-16 15:44:53 +00:00
return [ ] * gomatrixserverlib . HeaderedEvent { }
2020-10-02 17:08:13 +01:00
}
return result
}
2020-11-16 15:44:53 +00:00
func ( r * messagesReq ) getStartEnd ( events [ ] * gomatrixserverlib . HeaderedEvent ) ( start , end types . TopologyToken , err error ) {
2021-01-13 12:59:29 +00:00
if r . backwardOrdering {
start = * r . from
if events [ len ( events ) - 1 ] . Type ( ) == gomatrixserverlib . MRoomCreate {
// NOTSPEC: We've hit the beginning of the room so there's really nowhere
2021-07-20 09:45:40 +01:00
// else to go. This seems to fix Element iOS from looping on /messages endlessly.
2021-01-13 12:59:29 +00:00
end = types . TopologyToken { }
} else {
end , err = r . db . EventPositionInTopology (
r . ctx , events [ 0 ] . EventID ( ) ,
)
2020-06-17 17:41:45 +01:00
// A stream/topological position is a cursor located between two events.
// While they are identified in the code by the event on their right (if
// we consider a left to right chronological order), tokens need to refer
// to them by the event on their left, therefore we need to decrement the
// end position we send in the response if we're going backward.
end . Decrement ( )
}
2021-01-13 12:59:29 +00:00
} else {
start = * r . from
end , err = r . db . EventPositionInTopology (
r . ctx , events [ len ( events ) - 1 ] . EventID ( ) ,
)
}
if err != nil {
err = fmt . Errorf ( "EventPositionInTopology: for end event %s: %w" , events [ len ( events ) - 1 ] . EventID ( ) , err )
return
2020-01-23 17:51:10 +00:00
}
2020-06-17 17:41:45 +01:00
return
2020-01-23 17:51:10 +00:00
}
// handleEmptyEventsSlice handles the case where the initial request to the
// database returned an empty slice of events. It does so by checking whether
// the set is empty because we've reached a backward extremity, and if that is
// the case, by retrieving as much events as requested by backfilling from
// another homeserver.
// Returns an error if there was an issue talking with the database or
// backfilling.
func ( r * messagesReq ) handleEmptyEventsSlice ( ) (
2020-11-16 15:44:53 +00:00
events [ ] * gomatrixserverlib . HeaderedEvent , err error ,
2020-01-23 17:51:10 +00:00
) {
backwardExtremities , err := r . db . BackwardExtremitiesForRoom ( r . ctx , r . roomID )
// Check if we have backward extremities for this room.
if len ( backwardExtremities ) > 0 {
// If so, retrieve as much events as needed through backfilling.
2022-03-01 14:39:56 +00:00
events , err = r . backfill ( r . roomID , backwardExtremities , r . filter . Limit )
2020-01-23 17:51:10 +00:00
if err != nil {
return
}
} else {
// If not, it means the slice was empty because we reached the room's
// creation, so return an empty slice.
2020-11-16 15:44:53 +00:00
events = [ ] * gomatrixserverlib . HeaderedEvent { }
2020-01-23 17:51:10 +00:00
}
return
}
// handleNonEmptyEventsSlice handles the case where the initial request to the
// database returned a non-empty slice of events. It does so by checking whether
// events are missing from the expected result, and retrieve missing events
// through backfilling if needed.
// Returns an error if there was an issue while backfilling.
func ( r * messagesReq ) handleNonEmptyEventsSlice ( streamEvents [ ] types . StreamEvent ) (
2020-11-16 15:44:53 +00:00
events [ ] * gomatrixserverlib . HeaderedEvent , err error ,
2020-01-23 17:51:10 +00:00
) {
// Check if we have enough events.
2022-03-01 14:39:56 +00:00
isSetLargeEnough := len ( streamEvents ) >= r . filter . Limit
2020-03-24 12:20:10 +00:00
if ! isSetLargeEnough {
// it might be fine we don't have up to 'limit' events, let's find out
2020-01-23 17:51:10 +00:00
if r . backwardOrdering {
if r . wasToProvided {
// The condition in the SQL query is a strict "greater than" so
// we need to check against to-1.
streamPos := types . StreamPosition ( streamEvents [ len ( streamEvents ) - 1 ] . StreamPosition )
2020-12-10 18:57:10 +00:00
isSetLargeEnough = ( r . to . PDUPosition - 1 == streamPos )
2020-01-23 17:51:10 +00:00
}
} else {
streamPos := types . StreamPosition ( streamEvents [ 0 ] . StreamPosition )
2020-12-10 18:57:10 +00:00
isSetLargeEnough = ( r . from . PDUPosition - 1 == streamPos )
2020-01-23 17:51:10 +00:00
}
}
// Check if the slice contains a backward extremity.
backwardExtremities , err := r . db . BackwardExtremitiesForRoom ( r . ctx , r . roomID )
if err != nil {
return
}
// Backfill is needed if we've reached a backward extremity and need more
// events. It's only needed if the direction is backward.
if len ( backwardExtremities ) > 0 && ! isSetLargeEnough && r . backwardOrdering {
2020-11-16 15:44:53 +00:00
var pdus [ ] * gomatrixserverlib . HeaderedEvent
2020-01-23 17:51:10 +00:00
// Only ask the remote server for enough events to reach the limit.
2022-03-01 14:39:56 +00:00
pdus , err = r . backfill ( r . roomID , backwardExtremities , r . filter . Limit - len ( streamEvents ) )
2020-01-23 17:51:10 +00:00
if err != nil {
return
}
// Append the PDUs to the list to send back to the client.
events = append ( events , pdus ... )
}
// Append the events ve previously retrieved locally.
events = append ( events , r . db . StreamEventsToEvents ( nil , streamEvents ) ... )
2020-05-01 16:41:13 +01:00
sort . Sort ( eventsByDepth ( events ) )
2020-01-23 17:51:10 +00:00
return
}
2020-11-16 15:44:53 +00:00
type eventsByDepth [ ] * gomatrixserverlib . HeaderedEvent
2020-05-01 16:41:13 +01:00
func ( e eventsByDepth ) Len ( ) int {
return len ( e )
}
func ( e eventsByDepth ) Swap ( i , j int ) {
e [ i ] , e [ j ] = e [ j ] , e [ i ]
}
func ( e eventsByDepth ) Less ( i , j int ) bool {
return e [ i ] . Depth ( ) < e [ j ] . Depth ( )
}
2020-01-23 17:51:10 +00:00
// backfill performs a backfill request over the federation on another
// homeserver in the room.
// See: https://matrix.org/docs/spec/server_server/latest#get-matrix-federation-v1-backfill-roomid
// It also stores the PDUs retrieved from the remote homeserver's response to
// the database.
// Returns with an empty string if the remote homeserver didn't return with any
// event, or if there is no remote homeserver to contact.
// Returns an error if there was an issue with retrieving the list of servers in
/ / the room or sending the request .
2020-11-16 15:44:53 +00:00
func ( r * messagesReq ) backfill ( roomID string , backwardsExtremities map [ string ] [ ] string , limit int ) ( [ ] * gomatrixserverlib . HeaderedEvent , error ) {
2020-06-11 19:50:40 +01:00
var res api . PerformBackfillResponse
err := r . rsAPI . PerformBackfill ( context . Background ( ) , & api . PerformBackfillRequest {
2020-05-20 16:04:31 +01:00
RoomID : roomID ,
BackwardsExtremities : backwardsExtremities ,
Limit : limit ,
ServerName : r . cfg . Matrix . ServerName ,
2020-04-28 11:46:47 +01:00
} , & res )
2020-03-24 12:20:10 +00:00
if err != nil {
2020-06-11 19:50:40 +01:00
return nil , fmt . Errorf ( "PerformBackfill failed: %w" , err )
2020-03-24 12:20:10 +00:00
}
2020-04-28 11:46:47 +01:00
util . GetLogger ( r . ctx ) . WithField ( "new_events" , len ( res . Events ) ) . Info ( "Storing new events from backfill" )
2020-03-24 12:20:10 +00:00
2020-04-28 11:46:47 +01:00
// TODO: we should only be inserting events into the database from the roomserver's kafka output stream.
// Currently, this can race with live events for the room and cause problems. It's also just a bit unclear
// when you have multiple entry points to write events.
2020-03-24 12:20:10 +00:00
2020-05-01 16:41:13 +01:00
// we have to order these by depth, starting with the lowest because otherwise the topology tokens
// will skip over events that have the same depth but different stream positions due to the query which is:
// - anything less than the depth OR
// - anything with the same depth and a lower stream position.
sort . Sort ( eventsByDepth ( res . Events ) )
2020-03-24 12:20:10 +00:00
// Store the events in the database, while marking them as unfit to show
// up in responses to sync requests.
2020-04-28 11:46:47 +01:00
for i := range res . Events {
2020-05-01 16:41:13 +01:00
_ , err = r . db . WriteEvent (
2020-10-05 11:06:31 +01:00
context . Background ( ) ,
2020-11-16 15:44:53 +00:00
res . Events [ i ] ,
[ ] * gomatrixserverlib . HeaderedEvent { } ,
2020-03-24 12:20:10 +00:00
[ ] string { } ,
[ ] string { } ,
nil , true ,
2020-05-01 16:41:13 +01:00
)
if err != nil {
2020-03-24 12:20:10 +00:00
return nil , err
}
}
2020-05-19 18:42:55 +01:00
// we may have got more than the requested limit so resize now
events := res . Events
if len ( events ) > limit {
// last `limit` events
events = events [ len ( events ) - limit : ]
}
return events , nil
2020-01-23 17:51:10 +00:00
}
// setToDefault returns the default value for the "to" query parameter of a
// request to /messages if not provided. It defaults to either the earliest
// topological position (if we're going backward) or to the latest one (if we're
// going forward).
// Returns an error if there was an issue with retrieving the latest position
// from the database
func setToDefault (
ctx context . Context , db storage . Database , backwardOrdering bool ,
roomID string ,
2020-05-13 12:14:50 +01:00
) ( to types . TopologyToken , err error ) {
2020-01-23 17:51:10 +00:00
if backwardOrdering {
2020-03-24 12:20:10 +00:00
// go 1 earlier than the first event so we correctly fetch the earliest event
2020-05-14 17:30:16 +01:00
// this is because Database.GetEventsInTopologicalRange is exclusive of the lower-bound.
2020-12-10 18:57:10 +00:00
to = types . TopologyToken { }
2020-01-23 17:51:10 +00:00
} else {
2020-05-14 17:30:16 +01:00
to , err = db . MaxTopologicalPosition ( ctx , roomID )
2020-01-23 17:51:10 +00:00
}
return
}