From 534f9a9eb69ad35c7492c18c65cdbd55892bf930 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Wed, 21 Oct 2020 15:37:07 +0100 Subject: [PATCH] Refactor forward extremities (#1556) * Add resolve-state helper * Tweaks * Refactor forward extremities, again * Tweaks * Minor optimisation * Make path a bit clearer * Only process state/membership if forward extremities have changed * Usage comments in resolve-state --- cmd/resolve-state/main.go | 132 ++++++++++++++++++ .../internal/input/input_latest_events.go | 127 ++++++++++------- roomserver/state/state.go | 8 +- 3 files changed, 209 insertions(+), 58 deletions(-) create mode 100644 cmd/resolve-state/main.go diff --git a/cmd/resolve-state/main.go b/cmd/resolve-state/main.go new file mode 100644 index 00000000..9fb14f05 --- /dev/null +++ b/cmd/resolve-state/main.go @@ -0,0 +1,132 @@ +package main + +import ( + "context" + "flag" + "fmt" + "os" + "strconv" + + "github.com/matrix-org/dendrite/internal/caching" + "github.com/matrix-org/dendrite/internal/setup" + "github.com/matrix-org/dendrite/roomserver/state" + "github.com/matrix-org/dendrite/roomserver/storage" + "github.com/matrix-org/dendrite/roomserver/types" + "github.com/matrix-org/gomatrixserverlib" +) + +// This is a utility for inspecting state snapshots and running state resolution +// against real snapshots in an actual database. +// It takes one or more state snapshot NIDs as arguments, along with a room version +// to use for unmarshalling events, and will produce resolved output. +// +// Usage: ./resolve-state --roomversion=version snapshot [snapshot ...] +// e.g. ./resolve-state --roomversion=5 1254 1235 1282 + +var roomVersion = flag.String("roomversion", "5", "the room version to parse events as") + +// nolint:gocyclo +func main() { + ctx := context.Background() + cfg := setup.ParseFlags(true) + args := os.Args[1:] + + fmt.Println("Room version", *roomVersion) + + snapshotNIDs := []types.StateSnapshotNID{} + for _, arg := range args { + if i, err := strconv.Atoi(arg); err == nil { + snapshotNIDs = append(snapshotNIDs, types.StateSnapshotNID(i)) + } + } + + fmt.Println("Fetching", len(snapshotNIDs), "snapshot NIDs") + + cache, err := caching.NewInMemoryLRUCache(true) + if err != nil { + panic(err) + } + + roomserverDB, err := storage.Open(&cfg.RoomServer.Database, cache) + if err != nil { + panic(err) + } + + blockNIDs, err := roomserverDB.StateBlockNIDs(ctx, snapshotNIDs) + if err != nil { + panic(err) + } + + var stateEntries []types.StateEntryList + for _, list := range blockNIDs { + entries, err2 := roomserverDB.StateEntries(ctx, list.StateBlockNIDs) + if err2 != nil { + panic(err2) + } + stateEntries = append(stateEntries, entries...) + } + + var eventNIDs []types.EventNID + for _, entry := range stateEntries { + for _, e := range entry.StateEntries { + eventNIDs = append(eventNIDs, e.EventNID) + } + } + + fmt.Println("Fetching", len(eventNIDs), "state events") + eventEntries, err := roomserverDB.Events(ctx, eventNIDs) + if err != nil { + panic(err) + } + + authEventIDMap := make(map[string]struct{}) + eventPtrs := make([]*gomatrixserverlib.Event, len(eventEntries)) + for i := range eventEntries { + eventPtrs[i] = &eventEntries[i].Event + for _, authEventID := range eventEntries[i].AuthEventIDs() { + authEventIDMap[authEventID] = struct{}{} + } + } + + authEventIDs := make([]string, 0, len(authEventIDMap)) + for authEventID := range authEventIDMap { + authEventIDs = append(authEventIDs, authEventID) + } + + fmt.Println("Fetching", len(authEventIDs), "auth events") + authEventEntries, err := roomserverDB.EventsFromIDs(ctx, authEventIDs) + if err != nil { + panic(err) + } + + authEventPtrs := make([]*gomatrixserverlib.Event, len(authEventEntries)) + for i := range authEventEntries { + authEventPtrs[i] = &authEventEntries[i].Event + } + + events := make([]gomatrixserverlib.Event, len(eventEntries)) + authEvents := make([]gomatrixserverlib.Event, len(authEventEntries)) + for i, ptr := range eventPtrs { + events[i] = *ptr + } + for i, ptr := range authEventPtrs { + authEvents[i] = *ptr + } + + fmt.Println("Resolving state") + resolved, err := state.ResolveConflictsAdhoc( + gomatrixserverlib.RoomVersion(*roomVersion), + events, + authEvents, + ) + if err != nil { + panic(err) + } + + fmt.Println("Resolved state contains", len(resolved), "events") + for _, event := range resolved { + fmt.Println() + fmt.Printf("* %s %s %q\n", event.EventID(), event.Type(), *event.StateKey()) + fmt.Printf(" %s\n", string(event.Content())) + } +} diff --git a/roomserver/internal/input/input_latest_events.go b/roomserver/internal/input/input_latest_events.go index 5631959b..a86e901d 100644 --- a/roomserver/internal/input/input_latest_events.go +++ b/roomserver/internal/input/input_latest_events.go @@ -17,7 +17,6 @@ package input import ( - "bytes" "context" "fmt" @@ -28,7 +27,6 @@ import ( "github.com/matrix-org/dendrite/roomserver/types" "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/util" - "github.com/sirupsen/logrus" ) // updateLatestEvents updates the list of latest events for this room in the database and writes the @@ -141,27 +139,30 @@ func (u *latestEventsUpdater) doUpdateLatestEvents() error { // Work out what the latest events are. This will include the new // event if it is not already referenced. - if err := u.calculateLatest( - oldLatest, + extremitiesChanged, err := u.calculateLatest( + oldLatest, &u.event, types.StateAtEventAndReference{ EventReference: u.event.EventReference(), StateAtEvent: u.stateAtEvent, }, - ); err != nil { + ) + if err != nil { return fmt.Errorf("u.calculateLatest: %w", err) } // Now that we know what the latest events are, it's time to get the // latest state. - if err := u.latestState(); err != nil { - return fmt.Errorf("u.latestState: %w", err) - } + var updates []api.OutputEvent + if extremitiesChanged { + if err = u.latestState(); err != nil { + return fmt.Errorf("u.latestState: %w", err) + } - // If we need to generate any output events then here's where we do it. - // TODO: Move this! - updates, err := u.api.updateMemberships(u.ctx, u.updater, u.removed, u.added) - if err != nil { - return fmt.Errorf("u.api.updateMemberships: %w", err) + // If we need to generate any output events then here's where we do it. + // TODO: Move this! + if updates, err = u.api.updateMemberships(u.ctx, u.updater, u.removed, u.added); err != nil { + return fmt.Errorf("u.api.updateMemberships: %w", err) + } } update, err := u.makeOutputNewRoomEvent() @@ -250,50 +251,74 @@ func (u *latestEventsUpdater) latestState() error { // true if the new event is included in those extremites, false otherwise. func (u *latestEventsUpdater) calculateLatest( oldLatest []types.StateAtEventAndReference, - newEvent types.StateAtEventAndReference, -) error { - var newLatest []types.StateAtEventAndReference - - // First of all, let's see if any of the existing forward extremities - // now have entries in the previous events table. If they do then we - // will no longer include them as forward extremities. - for _, l := range oldLatest { - referenced, err := u.updater.IsReferenced(l.EventReference) - if err != nil { - logrus.WithError(err).Errorf("Failed to retrieve event reference for %q", l.EventID) - return fmt.Errorf("u.updater.IsReferenced (old): %w", err) - } else if !referenced { - newLatest = append(newLatest, l) - } + newEvent *gomatrixserverlib.Event, + newStateAndRef types.StateAtEventAndReference, +) (bool, error) { + // First of all, get a list of all of the events in our current + // set of forward extremities. + existingRefs := make(map[string]*types.StateAtEventAndReference) + existingNIDs := make([]types.EventNID, len(oldLatest)) + for i, old := range oldLatest { + existingRefs[old.EventID] = &oldLatest[i] + existingNIDs[i] = old.EventNID } - // Then check and see if our new event is already included in that set. - // This ordinarily won't happen but it covers the edge-case that we've - // already seen this event before and it's a forward extremity, so rather - // than adding a duplicate, we'll just return the set as complete. - for _, l := range newLatest { - if l.EventReference.EventID == newEvent.EventReference.EventID && bytes.Equal(l.EventReference.EventSHA256, newEvent.EventReference.EventSHA256) { - // We've already referenced this new event so we can just return - // the newly completed extremities at this point. - u.latest = newLatest - return nil - } - } - - // At this point we've processed the old extremities, and we've checked - // that our new event isn't already in that set. Therefore now we can - // check if our *new* event is a forward extremity, and if it is, add - // it in. - referenced, err := u.updater.IsReferenced(newEvent.EventReference) + // Look up the old extremity events. This allows us to find their + // prev events. + events, err := u.api.DB.Events(u.ctx, existingNIDs) if err != nil { - logrus.WithError(err).Errorf("Failed to retrieve event reference for %q", newEvent.EventReference.EventID) - return fmt.Errorf("u.updater.IsReferenced (new): %w", err) - } else if !referenced || len(newLatest) == 0 { - newLatest = append(newLatest, newEvent) + return false, fmt.Errorf("u.api.DB.Events: %w", err) + } + + // Make a list of all of the prev events as referenced by all of + // the current forward extremities. + existingPrevs := make(map[string]struct{}) + for _, old := range events { + for _, prevEventID := range old.PrevEventIDs() { + existingPrevs[prevEventID] = struct{}{} + } + } + + // If the "new" event is already referenced by a forward extremity + // then do nothing - it's not a candidate to be a new extremity if + // it has been referenced. + if _, ok := existingPrevs[newEvent.EventID()]; ok { + return false, nil + } + + // If the "new" event is already a forward extremity then stop, as + // nothing changes. + for _, event := range events { + if event.EventID() == newEvent.EventID() { + return false, nil + } + } + + // Include our new event in the extremities. + newLatest := []types.StateAtEventAndReference{newStateAndRef} + + // Then run through and see if the other extremities are still valid. + // If our new event references them then they are no longer good + // candidates. + for _, prevEventID := range newEvent.PrevEventIDs() { + delete(existingRefs, prevEventID) + } + + // Ensure that we don't add any candidate forward extremities from + // the old set that are, themselves, referenced by the old set of + // forward extremities. This shouldn't happen but guards against + // the possibility anyway. + for prevEventID := range existingPrevs { + delete(existingRefs, prevEventID) + } + + // Then re-add any old extremities that are still valid after all. + for _, old := range existingRefs { + newLatest = append(newLatest, *old) } u.latest = newLatest - return nil + return true, nil } func (u *latestEventsUpdater) makeOutputNewRoomEvent() (*api.OutputEvent, error) { diff --git a/roomserver/state/state.go b/roomserver/state/state.go index 2944f71c..d23f14c8 100644 --- a/roomserver/state/state.go +++ b/roomserver/state/state.go @@ -526,13 +526,7 @@ func (v StateResolution) CalculateAndStoreStateBeforeEvent( isRejected bool, ) (types.StateSnapshotNID, error) { // Load the state at the prev events. - prevEventRefs := event.PrevEvents() - prevEventIDs := make([]string, len(prevEventRefs)) - for i := range prevEventRefs { - prevEventIDs[i] = prevEventRefs[i].EventID - } - - prevStates, err := v.db.StateAtEventIDs(ctx, prevEventIDs) + prevStates, err := v.db.StateAtEventIDs(ctx, event.PrevEventIDs()) if err != nil { return 0, err }