diff --git a/.travis.yml b/.travis.yml index 37d0d52a..1bd717f7 100644 --- a/.travis.yml +++ b/.travis.yml @@ -17,9 +17,9 @@ install: - go get github.com/constabulary/gb/... - go get github.com/golang/lint/golint - go get github.com/fzipp/gocyclo - - ./travis-install-kafka.sh script: + - ./travis-install-kafka.sh - ./travis-test.sh notifications: diff --git a/src/github.com/matrix-org/dendrite/roomserver/api/query.go b/src/github.com/matrix-org/dendrite/roomserver/api/query.go index 6f38da39..14afb3e8 100644 --- a/src/github.com/matrix-org/dendrite/roomserver/api/query.go +++ b/src/github.com/matrix-org/dendrite/roomserver/api/query.go @@ -24,7 +24,7 @@ import ( // QueryLatestEventsAndStateRequest is a request to QueryLatestEventsAndState type QueryLatestEventsAndStateRequest struct { - // The roomID to query the latest events for. + // The room ID to query the latest events for. RoomID string // The state key tuples to fetch from the room current state. // If this list is empty or nil then no state events are returned. @@ -44,6 +44,30 @@ type QueryLatestEventsAndStateResponse struct { StateEvents []gomatrixserverlib.Event } +// QueryStateAfterEventsRequest is a request to QueryStateAfterEvents +type QueryStateAfterEventsRequest struct { + // The room ID to query the state in. + RoomID string + // The list of previous events to return the events after. + PrevEventIDs []string + // The state key tuples to fetch from the state + StateToFetch []gomatrixserverlib.StateKeyTuple +} + +// QueryStateAfterEventsResponse is a response to QueryStateAfterEvents +type QueryStateAfterEventsResponse struct { + // Copy of the request for debugging. + QueryStateAfterEventsRequest + // Does the room exist on this roomserver? + // If the room doesn't exist this will be false and StateEvents will be empty. + RoomExists bool + // Do all the previous events exist on this roomserver? + // If some of previous events do not exist this will be false and StateEvents will be empty. + PrevEventsExist bool + // The state events requested. + StateEvents []gomatrixserverlib.Event +} + // RoomserverQueryAPI is used to query information from the room server. type RoomserverQueryAPI interface { // Query the latest events and state for a room from the room server. @@ -51,11 +75,20 @@ type RoomserverQueryAPI interface { request *QueryLatestEventsAndStateRequest, response *QueryLatestEventsAndStateResponse, ) error + + // Query the state after a list of events in a room from the room server. + QueryStateAfterEvents( + request *QueryStateAfterEventsRequest, + response *QueryStateAfterEventsResponse, + ) error } // RoomserverQueryLatestEventsAndStatePath is the HTTP path for the QueryLatestEventsAndState API. const RoomserverQueryLatestEventsAndStatePath = "/api/roomserver/QueryLatestEventsAndState" +// RoomserverQueryStateAfterEventsPath is the HTTP path for the QueryStateAfterEvents API. +const RoomserverQueryStateAfterEventsPath = "/api/roomserver/QueryStateAfterEvents" + // NewRoomserverQueryAPIHTTP creates a RoomserverQueryAPI implemented by talking to a HTTP POST API. // If httpClient is nil then it uses the http.DefaultClient func NewRoomserverQueryAPIHTTP(roomserverURL string, httpClient *http.Client) RoomserverQueryAPI { @@ -79,6 +112,15 @@ func (h *httpRoomserverQueryAPI) QueryLatestEventsAndState( return postJSON(h.httpClient, apiURL, request, response) } +// QueryStateAfterEvents implements RoomserverQueryAPI +func (h *httpRoomserverQueryAPI) QueryStateAfterEvents( + request *QueryStateAfterEventsRequest, + response *QueryStateAfterEventsResponse, +) error { + apiURL := h.roomserverURL + RoomserverQueryStateAfterEventsPath + return postJSON(h.httpClient, apiURL, request, response) +} + func postJSON(httpClient http.Client, apiURL string, request, response interface{}) error { jsonBytes, err := json.Marshal(request) if err != nil { diff --git a/src/github.com/matrix-org/dendrite/roomserver/query/query.go b/src/github.com/matrix-org/dendrite/roomserver/query/query.go index 35db3bcb..fedc9870 100644 --- a/src/github.com/matrix-org/dendrite/roomserver/query/query.go +++ b/src/github.com/matrix-org/dendrite/roomserver/query/query.go @@ -16,12 +16,12 @@ package query import ( "encoding/json" + "github.com/matrix-org/dendrite/common" "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/state" "github.com/matrix-org/dendrite/roomserver/types" "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/util" - "github.com/prometheus/client_golang/prometheus" "net/http" ) @@ -68,6 +68,54 @@ func (r *RoomserverQueryAPI) QueryLatestEventsAndState( return err } + stateEvents, err := r.loadStateEvents(stateEntries) + if err != nil { + return err + } + + response.StateEvents = stateEvents + return nil +} + +// QueryStateAfterEvents implements api.RoomserverQueryAPI +func (r *RoomserverQueryAPI) QueryStateAfterEvents( + request *api.QueryStateAfterEventsRequest, + response *api.QueryStateAfterEventsResponse, +) (err error) { + response.QueryStateAfterEventsRequest = *request + roomNID, err := r.DB.RoomNID(request.RoomID) + if err != nil { + return err + } + if roomNID == 0 { + return nil + } + response.RoomExists = true + + prevStates, err := r.DB.StateAtEventIDs(request.PrevEventIDs) + if err != nil { + // TODO: Check if the error was because we are missing events from the + // database or are missing state at events from the database. + return err + } + response.PrevEventsExist = true + + // Lookup the currrent state for the requested tuples. + stateEntries, err := state.LoadStateAfterEventsForStringTuples(r.DB, prevStates, request.StateToFetch) + if err != nil { + return err + } + + stateEvents, err := r.loadStateEvents(stateEntries) + if err != nil { + return err + } + + response.StateEvents = stateEvents + return nil +} + +func (r *RoomserverQueryAPI) loadStateEvents(stateEntries []types.StateEntry) ([]gomatrixserverlib.Event, error) { eventNIDs := make([]types.EventNID, len(stateEntries)) for i := range stateEntries { eventNIDs[i] = stateEntries[i].EventNID @@ -75,21 +123,21 @@ func (r *RoomserverQueryAPI) QueryLatestEventsAndState( stateEvents, err := r.DB.Events(eventNIDs) if err != nil { - return err + return nil, err } - response.StateEvents = make([]gomatrixserverlib.Event, len(stateEvents)) + result := make([]gomatrixserverlib.Event, len(stateEvents)) for i := range stateEvents { - response.StateEvents[i] = stateEvents[i].Event + result[i] = stateEvents[i].Event } - return nil + return result, nil } // SetupHTTP adds the RoomserverQueryAPI handlers to the http.ServeMux. func (r *RoomserverQueryAPI) SetupHTTP(servMux *http.ServeMux) { servMux.Handle( api.RoomserverQueryLatestEventsAndStatePath, - makeAPI("query_latest_events_and_state", func(req *http.Request) util.JSONResponse { + common.MakeAPI("query_latest_events_and_state", func(req *http.Request) util.JSONResponse { var request api.QueryLatestEventsAndStateRequest var response api.QueryLatestEventsAndStateResponse if err := json.NewDecoder(req.Body).Decode(&request); err != nil { @@ -101,8 +149,18 @@ func (r *RoomserverQueryAPI) SetupHTTP(servMux *http.ServeMux) { return util.JSONResponse{Code: 200, JSON: &response} }), ) -} - -func makeAPI(metric string, apiFunc func(req *http.Request) util.JSONResponse) http.Handler { - return prometheus.InstrumentHandler(metric, util.MakeJSONAPI(util.NewJSONRequestHandler(apiFunc))) + servMux.Handle( + api.RoomserverQueryStateAfterEventsPath, + common.MakeAPI("query_state_after_events", func(req *http.Request) util.JSONResponse { + var request api.QueryStateAfterEventsRequest + var response api.QueryStateAfterEventsResponse + if err := json.NewDecoder(req.Body).Decode(&request); err != nil { + return util.ErrorResponse(err) + } + if err := r.QueryStateAfterEvents(&request, &response); err != nil { + return util.ErrorResponse(err) + } + return util.JSONResponse{Code: 200, JSON: &response} + }), + ) } diff --git a/src/github.com/matrix-org/dendrite/roomserver/state/state.go b/src/github.com/matrix-org/dendrite/roomserver/state/state.go index e9657fd9..6b49aba6 100644 --- a/src/github.com/matrix-org/dendrite/roomserver/state/state.go +++ b/src/github.com/matrix-org/dendrite/roomserver/state/state.go @@ -316,6 +316,82 @@ func loadStateAtSnapshotForNumericTuples( return fullState, nil } +// LoadStateAfterEventsForStringTuples loads the state for a list of event type +// and state key pairs after list of events. +// This is used when we only want to load a subset of the room state after a list of events. +// If there is no entry for a given event type and state key pair then it will be discarded. +// This is typically the state before an event. +// Returns a sorted list of state entries or an error if there was a problem talking to the database. +func LoadStateAfterEventsForStringTuples( + db RoomStateDatabase, prevStates []types.StateAtEvent, stateKeyTuples []gomatrixserverlib.StateKeyTuple, +) ([]types.StateEntry, error) { + numericTuples, err := stringTuplesToNumericTuples(db, stateKeyTuples) + if err != nil { + return nil, err + } + return loadStateAfterEventsForNumericTuples(db, prevStates, numericTuples) +} + +func loadStateAfterEventsForNumericTuples( + db RoomStateDatabase, prevStates []types.StateAtEvent, stateKeyTuples []types.StateKeyTuple, +) ([]types.StateEntry, error) { + if len(prevStates) == 1 { + // Fast path for a single event. + prevState := prevStates[0] + result, err := loadStateAtSnapshotForNumericTuples( + db, prevState.BeforeStateSnapshotNID, stateKeyTuples, + ) + if err != nil { + return nil, err + } + if prevState.IsStateEvent() { + // The result is current the state before the requested event. + // We want the state after the requested event. + // If the requested event was a state event then we need to + // update that key in the result. + // If the requested event wasn't a state event then the state after + // it is the same as the state before it. + for i := range result { + if result[i].StateKeyTuple == prevState.StateKeyTuple { + result[i] = prevState.StateEntry + } + } + } + return result, nil + } + + // Slow path for more that one event. + // Load the entire state so that we can do conflict resolution if we need to. + // TODO: The are some optimistations we could do here: + // 1) We only need to do conflict resolution if there is a conflict in the + // requested tuples so we might try loading just those tuples and then + // checking for conflicts. + // 2) When there is a conflict we still only need to load the state + // needed to do conflict resolution which would save us having to load + // the full state. + + // TODO: Add metrics for this as it could take a long time for big rooms + // with large conflicts. + fullState, _, _, err := calculateStateAfterManyEvents(db, prevStates) + if err != nil { + return nil, err + } + + // Sort the full state so we can use it as a map. + sort.Sort(stateEntrySorter(fullState)) + + // Filter the full state down to the required tuples. + var result []types.StateEntry + for _, tuple := range stateKeyTuples { + eventNID, ok := stateEntryMap(fullState).lookup(tuple) + if ok { + result = append(result, types.StateEntry{tuple, eventNID}) + } + } + sort.Sort(stateEntrySorter(result)) + return result, nil +} + var calculateStateDurations = prometheus.NewSummaryVec( prometheus.SummaryOpts{ Namespace: "dendrite", @@ -491,12 +567,30 @@ const maxStateBlockNIDs = 64 func calculateAndStoreStateAfterManyEvents( db RoomStateDatabase, roomNID types.RoomNID, prevStates []types.StateAtEvent, metrics calculateStateMetrics, ) (types.StateSnapshotNID, error) { + + state, algorithm, conflictLength, err := calculateStateAfterManyEvents(db, prevStates) + metrics.algorithm = algorithm + if err != nil { + return metrics.stop(0, err) + } + + // TODO: Check if we can encode the new state as a delta against the + // previous state. + metrics.conflictLength = conflictLength + metrics.fullStateLength = len(state) + return metrics.stop(db.AddState(roomNID, nil, state)) +} + +func calculateStateAfterManyEvents( + db RoomStateDatabase, prevStates []types.StateAtEvent, +) (state []types.StateEntry, algorithm string, conflictLength int, err error) { + var combined []types.StateEntry // Conflict resolution. // First stage: load the state after each of the prev events. - combined, err := LoadCombinedStateAfterEvents(db, prevStates) + combined, err = LoadCombinedStateAfterEvents(db, prevStates) if err != nil { - metrics.algorithm = "_load_combined_state" - return metrics.stop(0, err) + algorithm = "_load_combined_state" + return } // Collect all the entries with the same type and key together. @@ -508,9 +602,8 @@ func calculateAndStoreStateAfterManyEvents( // Find the conflicts conflicts := findDuplicateStateKeys(combined) - var state []types.StateEntry if len(conflicts) > 0 { - metrics.conflictLength = len(conflicts) + conflictLength = len(conflicts) // 5) There are conflicting state events, for each conflict workout // what the appropriate state event is. @@ -523,23 +616,20 @@ func calculateAndStoreStateAfterManyEvents( } } - resolved, err := resolveConflicts(db, notConflicted, conflicts) + var resolved []types.StateEntry + resolved, err = resolveConflicts(db, notConflicted, conflicts) if err != nil { - metrics.algorithm = "_resolve_conflicts" - return metrics.stop(0, err) + algorithm = "_resolve_conflicts" + return } - metrics.algorithm = "full_state_with_conflicts" + algorithm = "full_state_with_conflicts" state = resolved } else { - metrics.algorithm = "full_state_no_conflicts" + algorithm = "full_state_no_conflicts" // 6) There weren't any conflicts state = combined } - metrics.fullStateLength = len(state) - - // TODO: Check if we can encode the new state as a delta against the - // previous state. - return metrics.stop(db.AddState(roomNID, nil, state)) + return } // resolveConflicts resolves a list of conflicted state entries. It takes two lists.