mirror of
https://github.com/1f349/dendrite.git
synced 2024-11-22 11:41:38 +00:00
Tweaks to latest events updater (#1045)
* Comment out updaters a bit, add overwrite flag to latest events * Make sure we don't send fast-forwarded state changes over federation, start with empty set when overwriting * Remove redundant check for overwrite
This commit is contained in:
parent
5c221f0655
commit
9ef30bb13b
@ -52,13 +52,15 @@ func processRoomEvent(
|
|||||||
headered := input.Event
|
headered := input.Event
|
||||||
event := headered.Unwrap()
|
event := headered.Unwrap()
|
||||||
|
|
||||||
// Check that the event passes authentication checks and work out the numeric IDs for the auth events.
|
// Check that the event passes authentication checks and work out
|
||||||
|
// the numeric IDs for the auth events.
|
||||||
authEventNIDs, err := checkAuthEvents(ctx, db, headered, input.AuthEventIDs)
|
authEventNIDs, err := checkAuthEvents(ctx, db, headered, input.AuthEventIDs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logrus.WithError(err).WithField("event_id", event.EventID()).WithField("auth_event_ids", input.AuthEventIDs).Error("processRoomEvent.checkAuthEvents failed for event")
|
logrus.WithError(err).WithField("event_id", event.EventID()).WithField("auth_event_ids", input.AuthEventIDs).Error("processRoomEvent.checkAuthEvents failed for event")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// If we don't have a transaction ID then get one.
|
||||||
if input.TransactionID != nil {
|
if input.TransactionID != nil {
|
||||||
tdID := input.TransactionID
|
tdID := input.TransactionID
|
||||||
eventID, err = db.GetTransactionEventID(
|
eventID, err = db.GetTransactionEventID(
|
||||||
@ -70,17 +72,21 @@ func processRoomEvent(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Store the event
|
// Store the event.
|
||||||
roomNID, stateAtEvent, err := db.StoreEvent(ctx, event, input.TransactionID, authEventNIDs)
|
roomNID, stateAtEvent, err := db.StoreEvent(ctx, event, input.TransactionID, authEventNIDs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// For outliers we can stop after we've stored the event itself as it
|
||||||
|
// doesn't have any associated state to store and we don't need to
|
||||||
|
// notify anyone about it.
|
||||||
if input.Kind == api.KindOutlier {
|
if input.Kind == api.KindOutlier {
|
||||||
// For outliers we can stop after we've stored the event itself as it
|
logrus.WithFields(logrus.Fields{
|
||||||
// doesn't have any associated state to store and we don't need to
|
"event_id": event.EventID(),
|
||||||
// notify anyone about it.
|
"type": event.Type(),
|
||||||
logrus.WithField("event_id", event.EventID()).WithField("type", event.Type()).WithField("room", event.RoomID()).Info("Stored outlier")
|
"room": event.RoomID(),
|
||||||
|
}).Info("Stored outlier")
|
||||||
return event.EventID(), nil
|
return event.EventID(), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -93,10 +99,21 @@ func processRoomEvent(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if err = updateLatestEvents(
|
||||||
|
ctx, // context
|
||||||
|
db, // roomserver database
|
||||||
|
ow, // output event writer
|
||||||
|
roomNID, // room NID to update
|
||||||
|
stateAtEvent, // state at event (below)
|
||||||
|
event, // event
|
||||||
|
input.SendAsServer, // send as server
|
||||||
|
input.TransactionID, // transaction ID
|
||||||
|
); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
// Update the extremities of the event graph for the room
|
// Update the extremities of the event graph for the room
|
||||||
return event.EventID(), updateLatestEvents(
|
return event.EventID(), nil
|
||||||
ctx, db, ow, roomNID, stateAtEvent, event, input.SendAsServer, input.TransactionID,
|
|
||||||
)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func calculateAndSetState(
|
func calculateAndSetState(
|
||||||
@ -111,6 +128,9 @@ func calculateAndSetState(
|
|||||||
roomState := state.NewStateResolution(db)
|
roomState := state.NewStateResolution(db)
|
||||||
|
|
||||||
if input.HasState {
|
if input.HasState {
|
||||||
|
// TODO: Check here if we think we're in the room already.
|
||||||
|
stateAtEvent.Overwrite = true
|
||||||
|
|
||||||
// We've been told what the state at the event is so we don't need to calculate it.
|
// We've been told what the state at the event is so we don't need to calculate it.
|
||||||
// Check that those state events are in the database and store the state.
|
// Check that those state events are in the database and store the state.
|
||||||
var entries []types.StateEntry
|
var entries []types.StateEntry
|
||||||
@ -122,6 +142,8 @@ func calculateAndSetState(
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
stateAtEvent.Overwrite = false
|
||||||
|
|
||||||
// We haven't been told what the state at the event is so we need to calculate it from the prev_events
|
// We haven't been told what the state at the event is so we need to calculate it from the prev_events
|
||||||
if stateAtEvent.BeforeStateSnapshotNID, err = roomState.CalculateAndStoreStateBeforeEvent(ctx, event, roomNID); err != nil {
|
if stateAtEvent.BeforeStateSnapshotNID, err = roomState.CalculateAndStoreStateBeforeEvent(ctx, event, roomNID); err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -69,10 +69,17 @@ func updateLatestEvents(
|
|||||||
}()
|
}()
|
||||||
|
|
||||||
u := latestEventsUpdater{
|
u := latestEventsUpdater{
|
||||||
ctx: ctx, db: db, updater: updater, ow: ow, roomNID: roomNID,
|
ctx: ctx,
|
||||||
stateAtEvent: stateAtEvent, event: event, sendAsServer: sendAsServer,
|
db: db,
|
||||||
|
updater: updater,
|
||||||
|
ow: ow,
|
||||||
|
roomNID: roomNID,
|
||||||
|
stateAtEvent: stateAtEvent,
|
||||||
|
event: event,
|
||||||
|
sendAsServer: sendAsServer,
|
||||||
transactionID: transactionID,
|
transactionID: transactionID,
|
||||||
}
|
}
|
||||||
|
|
||||||
if err = u.doUpdateLatestEvents(); err != nil {
|
if err = u.doUpdateLatestEvents(); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -115,38 +122,65 @@ type latestEventsUpdater struct {
|
|||||||
|
|
||||||
func (u *latestEventsUpdater) doUpdateLatestEvents() error {
|
func (u *latestEventsUpdater) doUpdateLatestEvents() error {
|
||||||
prevEvents := u.event.PrevEvents()
|
prevEvents := u.event.PrevEvents()
|
||||||
oldLatest := u.updater.LatestEvents()
|
|
||||||
u.lastEventIDSent = u.updater.LastEventIDSent()
|
u.lastEventIDSent = u.updater.LastEventIDSent()
|
||||||
u.oldStateNID = u.updater.CurrentStateSnapshotNID()
|
u.oldStateNID = u.updater.CurrentStateSnapshotNID()
|
||||||
|
|
||||||
|
// If we are doing a regular event update then we will get the
|
||||||
|
// previous latest events to use as a part of the calculation. If
|
||||||
|
// we are overwriting the latest events because we have a complete
|
||||||
|
// state snapshot from somewhere else, e.g. a federated room join,
|
||||||
|
// then start with an empty set - none of the forward extremities
|
||||||
|
// that we knew about before matter anymore.
|
||||||
|
oldLatest := []types.StateAtEventAndReference{}
|
||||||
|
if !u.stateAtEvent.Overwrite {
|
||||||
|
oldLatest = u.updater.LatestEvents()
|
||||||
|
}
|
||||||
|
|
||||||
|
// If the event has already been written to the output log then we
|
||||||
|
// don't need to do anything, as we've handled it already.
|
||||||
hasBeenSent, err := u.updater.HasEventBeenSent(u.stateAtEvent.EventNID)
|
hasBeenSent, err := u.updater.HasEventBeenSent(u.stateAtEvent.EventNID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
} else if hasBeenSent {
|
} else if hasBeenSent {
|
||||||
// Already sent this event so we can stop processing
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Update the roomserver_previous_events table with references. This
|
||||||
|
// is effectively tracking the structure of the DAG.
|
||||||
if err = u.updater.StorePreviousEvents(u.stateAtEvent.EventNID, prevEvents); err != nil {
|
if err = u.updater.StorePreviousEvents(u.stateAtEvent.EventNID, prevEvents); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Get the event reference for our new event. This will be used when
|
||||||
|
// determining if the event is referenced by an existing event.
|
||||||
eventReference := u.event.EventReference()
|
eventReference := u.event.EventReference()
|
||||||
// Check if this event is already referenced by another event in the room.
|
|
||||||
|
// Check if our new event is already referenced by an existing event
|
||||||
|
// in the room. If it is then it isn't a latest event.
|
||||||
alreadyReferenced, err := u.updater.IsReferenced(eventReference)
|
alreadyReferenced, err := u.updater.IsReferenced(eventReference)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
u.latest = calculateLatest(oldLatest, alreadyReferenced, prevEvents, types.StateAtEventAndReference{
|
// Work out what the latest events are.
|
||||||
EventReference: eventReference,
|
u.latest = calculateLatest(
|
||||||
StateAtEvent: u.stateAtEvent,
|
oldLatest,
|
||||||
})
|
alreadyReferenced,
|
||||||
|
prevEvents,
|
||||||
|
types.StateAtEventAndReference{
|
||||||
|
EventReference: eventReference,
|
||||||
|
StateAtEvent: u.stateAtEvent,
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
|
// Now that we know what the latest events are, it's time to get the
|
||||||
|
// latest state.
|
||||||
if err = u.latestState(); err != nil {
|
if err = u.latestState(); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// If we need to generate any output events then here's where we do it.
|
||||||
|
// TODO: Move this!
|
||||||
updates, err := updateMemberships(u.ctx, u.db, u.updater, u.removed, u.added)
|
updates, err := updateMemberships(u.ctx, u.db, u.updater, u.removed, u.added)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
@ -181,10 +215,15 @@ func (u *latestEventsUpdater) latestState() error {
|
|||||||
var err error
|
var err error
|
||||||
roomState := state.NewStateResolution(u.db)
|
roomState := state.NewStateResolution(u.db)
|
||||||
|
|
||||||
|
// Get a list of the current latest events.
|
||||||
latestStateAtEvents := make([]types.StateAtEvent, len(u.latest))
|
latestStateAtEvents := make([]types.StateAtEvent, len(u.latest))
|
||||||
for i := range u.latest {
|
for i := range u.latest {
|
||||||
latestStateAtEvents[i] = u.latest[i].StateAtEvent
|
latestStateAtEvents[i] = u.latest[i].StateAtEvent
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Takes the NIDs of the latest events and creates a state snapshot
|
||||||
|
// of the state after the events. The snapshot state will be resolved
|
||||||
|
// using the correct state resolution algorithm for the room.
|
||||||
u.newStateNID, err = roomState.CalculateAndStoreStateAfterEvents(
|
u.newStateNID, err = roomState.CalculateAndStoreStateAfterEvents(
|
||||||
u.ctx, u.roomNID, latestStateAtEvents,
|
u.ctx, u.roomNID, latestStateAtEvents,
|
||||||
)
|
)
|
||||||
@ -192,6 +231,18 @@ func (u *latestEventsUpdater) latestState() error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// If we are overwriting the state then we should make sure that we
|
||||||
|
// don't send anything out over federation again, it will very likely
|
||||||
|
// be a repeat.
|
||||||
|
if u.stateAtEvent.Overwrite {
|
||||||
|
u.sendAsServer = ""
|
||||||
|
}
|
||||||
|
|
||||||
|
// Now that we have a new state snapshot based on the latest events,
|
||||||
|
// we can compare that new snapshot to the previous one and see what
|
||||||
|
// has changed. This gives us one list of removed state events and
|
||||||
|
// another list of added ones. Replacing a value for a state-key tuple
|
||||||
|
// will result one removed (the old event) and one added (the new event).
|
||||||
u.removed, u.added, err = roomState.DifferenceBetweeenStateSnapshots(
|
u.removed, u.added, err = roomState.DifferenceBetweeenStateSnapshots(
|
||||||
u.ctx, u.oldStateNID, u.newStateNID,
|
u.ctx, u.oldStateNID, u.newStateNID,
|
||||||
)
|
)
|
||||||
@ -199,6 +250,8 @@ func (u *latestEventsUpdater) latestState() error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Also work out the state before the event removes and the event
|
||||||
|
// adds.
|
||||||
u.stateBeforeEventRemoves, u.stateBeforeEventAdds, err = roomState.DifferenceBetweeenStateSnapshots(
|
u.stateBeforeEventRemoves, u.stateBeforeEventAdds, err = roomState.DifferenceBetweeenStateSnapshots(
|
||||||
u.ctx, u.newStateNID, u.stateAtEvent.BeforeStateSnapshotNID,
|
u.ctx, u.newStateNID, u.stateAtEvent.BeforeStateSnapshotNID,
|
||||||
)
|
)
|
||||||
|
@ -16,7 +16,6 @@ package internal
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/roomserver/api"
|
"github.com/matrix-org/dendrite/roomserver/api"
|
||||||
@ -108,10 +107,10 @@ func updateMembership(
|
|||||||
}
|
}
|
||||||
|
|
||||||
if add == nil {
|
if add == nil {
|
||||||
// This shouldn't happen. Returning an error here is better than panicking
|
// This can happen when we have rejoined a room and suddenly we have a
|
||||||
// in the membership updater functions later on.
|
// divergence between the former state and the new one. We don't want to
|
||||||
// TODO: Why does this happen to begin with?
|
// act on removals and apparently there are no adds, so stop here.
|
||||||
return updates, errors.New("add should not be nil")
|
return updates, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
mu, err := updater.MembershipUpdater(targetUserNID)
|
mu, err := updater.MembershipUpdater(targetUserNID)
|
||||||
|
@ -75,6 +75,10 @@ func (a StateEntry) LessThan(b StateEntry) bool {
|
|||||||
|
|
||||||
// StateAtEvent is the state before and after a matrix event.
|
// StateAtEvent is the state before and after a matrix event.
|
||||||
type StateAtEvent struct {
|
type StateAtEvent struct {
|
||||||
|
// Should this state overwrite the latest events and memberships of the room?
|
||||||
|
// This might be necessary when rejoining a federated room after a period of
|
||||||
|
// absence, as our state and latest events will be out of date.
|
||||||
|
Overwrite bool
|
||||||
// The state before the event.
|
// The state before the event.
|
||||||
BeforeStateSnapshotNID StateSnapshotNID
|
BeforeStateSnapshotNID StateSnapshotNID
|
||||||
// The state entry for the event itself, allows us to calculate the state after the event.
|
// The state entry for the event itself, allows us to calculate the state after the event.
|
||||||
|
Loading…
Reference in New Issue
Block a user