From db7d9cba8ad28c300dd66c01b9b0ce2414e8cbe0 Mon Sep 17 00:00:00 2001 From: kegsay Date: Thu, 20 Jan 2022 15:26:45 +0000 Subject: [PATCH] BREAKING: Remove Partitioned Stream Positions (#2096) * go mod tidy * Break complement to check it fails CI * Remove partitioned stream positions This was used by the device list stream position. The device list position now corresponds to the `Offset`, and the partition is always 0, in prep for removing reliance on Kafka topics for device list changes. * Linting * Migrate old style tokens to new style because element-web doesn't soft-logoout on 4xx errors on /sync --- go.mod | 2 +- keyserver/api/api.go | 4 -- keyserver/internal/internal.go | 7 +-- keyserver/producers/keychange.go | 9 --- syncapi/consumers/keychange.go | 22 +++---- syncapi/internal/keychange.go | 40 +++++-------- syncapi/internal/keychange_test.go | 21 +++---- syncapi/streams/stream_devicelist.go | 8 +-- syncapi/streams/streams.go | 8 +-- syncapi/streams/template_pstream.go | 38 ------------ syncapi/sync/requestpool.go | 6 ++ syncapi/types/provider.go | 8 --- syncapi/types/types.go | 87 +++++++--------------------- syncapi/types/types_test.go | 41 ++----------- 14 files changed, 70 insertions(+), 231 deletions(-) delete mode 100644 syncapi/streams/template_pstream.go diff --git a/go.mod b/go.mod index eb421ce1..e6202f66 100644 --- a/go.mod +++ b/go.mod @@ -9,7 +9,7 @@ require ( github.com/Arceliar/phony v0.0.0-20210209235338-dde1a8dca979 github.com/DATA-DOG/go-sqlmock v1.5.0 github.com/HdrHistogram/hdrhistogram-go v1.0.1 // indirect - github.com/MFAshby/stdemuxerhook v1.0.0 // indirect + github.com/MFAshby/stdemuxerhook v1.0.0 github.com/Masterminds/semver/v3 v3.1.1 github.com/S7evinK/saramajetstream v0.0.0-20210709110708-de6efc8c4a32 github.com/Shopify/sarama v1.29.0 diff --git a/keyserver/api/api.go b/keyserver/api/api.go index 5a109cc6..eae14ae1 100644 --- a/keyserver/api/api.go +++ b/keyserver/api/api.go @@ -224,8 +224,6 @@ type QueryKeysResponse struct { } type QueryKeyChangesRequest struct { - // The partition which had key events sent to - Partition int32 // The offset of the last received key event, or sarama.OffsetOldest if this is from the beginning Offset int64 // The inclusive offset where to track key changes up to. Messages with this offset are included in the response. @@ -236,8 +234,6 @@ type QueryKeyChangesRequest struct { type QueryKeyChangesResponse struct { // The set of users who have had their keys change. UserIDs []string - // The partition being served - useful if the partition is unknown at request time - Partition int32 // The latest offset represented in this response. Offset int64 // Set if there was a problem handling the request. diff --git a/keyserver/internal/internal.go b/keyserver/internal/internal.go index 3e91962e..0140a8f4 100644 --- a/keyserver/internal/internal.go +++ b/keyserver/internal/internal.go @@ -59,17 +59,14 @@ func (a *KeyInternalAPI) InputDeviceListUpdate( } func (a *KeyInternalAPI) QueryKeyChanges(ctx context.Context, req *api.QueryKeyChangesRequest, res *api.QueryKeyChangesResponse) { - if req.Partition < 0 { - req.Partition = a.Producer.DefaultPartition() - } - userIDs, latest, err := a.DB.KeyChanges(ctx, req.Partition, req.Offset, req.ToOffset) + partition := 0 + userIDs, latest, err := a.DB.KeyChanges(ctx, int32(partition), req.Offset, req.ToOffset) if err != nil { res.Error = &api.KeyError{ Err: err.Error(), } } res.Offset = latest - res.Partition = req.Partition res.UserIDs = userIDs } diff --git a/keyserver/producers/keychange.go b/keyserver/producers/keychange.go index 782675c2..c5ddf2c1 100644 --- a/keyserver/producers/keychange.go +++ b/keyserver/producers/keychange.go @@ -32,15 +32,6 @@ type KeyChange struct { DB storage.Database } -// DefaultPartition returns the default partition this process is sending key changes to. -// NB: A keyserver MUST send key changes to only 1 partition or else query operations will -// become inconsistent. Partitions can be sharded (e.g by hash of user ID of key change) but -// then all keyservers must be queried to calculate the entire set of key changes between -// two sync tokens. -func (p *KeyChange) DefaultPartition() int32 { - return 0 -} - // ProduceKeyChanges creates new change events for each key func (p *KeyChange) ProduceKeyChanges(keys []api.DeviceMessage) error { userToDeviceCount := make(map[string]int) diff --git a/syncapi/consumers/keychange.go b/syncapi/consumers/keychange.go index 76b143d8..d63e4832 100644 --- a/syncapi/consumers/keychange.go +++ b/syncapi/consumers/keychange.go @@ -38,7 +38,7 @@ type OutputKeyChangeEventConsumer struct { keyChangeConsumer *internal.ContinualConsumer db storage.Database notifier *notifier.Notifier - stream types.PartitionedStreamProvider + stream types.StreamProvider serverName gomatrixserverlib.ServerName // our server name rsAPI roomserverAPI.RoomserverInternalAPI keyAPI api.KeyInternalAPI @@ -57,7 +57,7 @@ func NewOutputKeyChangeEventConsumer( rsAPI roomserverAPI.RoomserverInternalAPI, store storage.Database, notifier *notifier.Notifier, - stream types.PartitionedStreamProvider, + stream types.StreamProvider, ) *OutputKeyChangeEventConsumer { consumer := internal.ContinualConsumer{ @@ -118,15 +118,15 @@ func (s *OutputKeyChangeEventConsumer) onMessage(msg *sarama.ConsumerMessage) er } switch m.Type { case api.TypeCrossSigningUpdate: - return s.onCrossSigningMessage(m, msg.Offset, msg.Partition) + return s.onCrossSigningMessage(m, msg.Offset) case api.TypeDeviceKeyUpdate: fallthrough default: - return s.onDeviceKeyMessage(m, msg.Offset, msg.Partition) + return s.onDeviceKeyMessage(m, msg.Offset) } } -func (s *OutputKeyChangeEventConsumer) onDeviceKeyMessage(m api.DeviceMessage, offset int64, partition int32) error { +func (s *OutputKeyChangeEventConsumer) onDeviceKeyMessage(m api.DeviceMessage, offset int64) error { if m.DeviceKeys == nil { return nil } @@ -143,10 +143,7 @@ func (s *OutputKeyChangeEventConsumer) onDeviceKeyMessage(m api.DeviceMessage, o } // make sure we get our own key updates too! queryRes.UserIDsToCount[output.UserID] = 1 - posUpdate := types.LogPosition{ - Offset: offset, - Partition: partition, - } + posUpdate := types.StreamPosition(offset) s.stream.Advance(posUpdate) for userID := range queryRes.UserIDsToCount { @@ -156,7 +153,7 @@ func (s *OutputKeyChangeEventConsumer) onDeviceKeyMessage(m api.DeviceMessage, o return nil } -func (s *OutputKeyChangeEventConsumer) onCrossSigningMessage(m api.DeviceMessage, offset int64, partition int32) error { +func (s *OutputKeyChangeEventConsumer) onCrossSigningMessage(m api.DeviceMessage, offset int64) error { output := m.CrossSigningKeyUpdate // work out who we need to notify about the new key var queryRes roomserverAPI.QuerySharedUsersResponse @@ -170,10 +167,7 @@ func (s *OutputKeyChangeEventConsumer) onCrossSigningMessage(m api.DeviceMessage } // make sure we get our own key updates too! queryRes.UserIDsToCount[output.UserID] = 1 - posUpdate := types.LogPosition{ - Offset: offset, - Partition: partition, - } + posUpdate := types.StreamPosition(offset) s.stream.Advance(posUpdate) for userID := range queryRes.UserIDsToCount { diff --git a/syncapi/internal/keychange.go b/syncapi/internal/keychange.go index 56a438fb..41efd4a0 100644 --- a/syncapi/internal/keychange.go +++ b/syncapi/internal/keychange.go @@ -47,8 +47,8 @@ func DeviceOTKCounts(ctx context.Context, keyAPI keyapi.KeyInternalAPI, userID, // be already filled in with join/leave information. func DeviceListCatchup( ctx context.Context, keyAPI keyapi.KeyInternalAPI, rsAPI roomserverAPI.RoomserverInternalAPI, - userID string, res *types.Response, from, to types.LogPosition, -) (newPos types.LogPosition, hasNew bool, err error) { + userID string, res *types.Response, from, to types.StreamPosition, +) (newPos types.StreamPosition, hasNew bool, err error) { // Track users who we didn't track before but now do by virtue of sharing a room with them, or not. newlyJoinedRooms := joinedRooms(res, userID) @@ -64,27 +64,18 @@ func DeviceListCatchup( } // now also track users who we already share rooms with but who have updated their devices between the two tokens - - var partition int32 - var offset int64 - partition = -1 - offset = sarama.OffsetOldest - // Extract partition/offset from sync token - // TODO: In a world where keyserver is sharded there will be multiple partitions and hence multiple QueryKeyChanges to make. - if !from.IsEmpty() { - partition = from.Partition - offset = from.Offset + offset := sarama.OffsetOldest + toOffset := sarama.OffsetNewest + if to > 0 && to > from { + toOffset = int64(to) } - var toOffset int64 - toOffset = sarama.OffsetNewest - if toLog := to; toLog.Partition == partition && toLog.Offset > 0 { - toOffset = toLog.Offset + if from > 0 { + offset = int64(from) } var queryRes keyapi.QueryKeyChangesResponse keyAPI.QueryKeyChanges(ctx, &keyapi.QueryKeyChangesRequest{ - Partition: partition, - Offset: offset, - ToOffset: toOffset, + Offset: offset, + ToOffset: toOffset, }, &queryRes) if queryRes.Error != nil { // don't fail the catchup because we may have got useful information by tracking membership @@ -95,8 +86,8 @@ func DeviceListCatchup( var sharedUsersMap map[string]int sharedUsersMap, queryRes.UserIDs = filterSharedUsers(ctx, rsAPI, userID, queryRes.UserIDs) util.GetLogger(ctx).Debugf( - "QueryKeyChanges request p=%d,off=%d,to=%d response p=%d off=%d uids=%v", - partition, offset, toOffset, queryRes.Partition, queryRes.Offset, queryRes.UserIDs, + "QueryKeyChanges request off=%d,to=%d response off=%d uids=%v", + offset, toOffset, queryRes.Offset, queryRes.UserIDs, ) userSet := make(map[string]bool) for _, userID := range res.DeviceLists.Changed { @@ -125,13 +116,8 @@ func DeviceListCatchup( res.DeviceLists.Left = append(res.DeviceLists.Left, userID) } } - // set the new token - to = types.LogPosition{ - Partition: queryRes.Partition, - Offset: queryRes.Offset, - } - return to, hasNew, nil + return types.StreamPosition(queryRes.Offset), hasNew, nil } // TrackChangedUsers calculates the values of device_lists.changed|left in the /sync response. diff --git a/syncapi/internal/keychange_test.go b/syncapi/internal/keychange_test.go index e52e5556..d9fb9cf8 100644 --- a/syncapi/internal/keychange_test.go +++ b/syncapi/internal/keychange_test.go @@ -6,7 +6,6 @@ import ( "sort" "testing" - "github.com/Shopify/sarama" keyapi "github.com/matrix-org/dendrite/keyserver/api" "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/syncapi/types" @@ -16,11 +15,7 @@ import ( var ( syncingUser = "@alice:localhost" - emptyToken = types.LogPosition{} - newestToken = types.LogPosition{ - Offset: sarama.OffsetNewest, - Partition: 0, - } + emptyToken = types.StreamPosition(0) ) type mockKeyAPI struct{} @@ -186,7 +181,7 @@ func TestKeyChangeCatchupOnJoinShareNewUser(t *testing.T) { "!another:room": {syncingUser}, }, } - _, hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, syncingUser, syncResponse, emptyToken, newestToken) + _, hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, syncingUser, syncResponse, emptyToken, emptyToken) if err != nil { t.Fatalf("DeviceListCatchup returned an error: %s", err) } @@ -209,7 +204,7 @@ func TestKeyChangeCatchupOnLeaveShareLeftUser(t *testing.T) { "!another:room": {syncingUser}, }, } - _, hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, syncingUser, syncResponse, emptyToken, newestToken) + _, hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, syncingUser, syncResponse, emptyToken, emptyToken) if err != nil { t.Fatalf("DeviceListCatchup returned an error: %s", err) } @@ -232,7 +227,7 @@ func TestKeyChangeCatchupOnJoinShareNoNewUsers(t *testing.T) { "!another:room": {syncingUser, existingUser}, }, } - _, hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, syncingUser, syncResponse, emptyToken, newestToken) + _, hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, syncingUser, syncResponse, emptyToken, emptyToken) if err != nil { t.Fatalf("Catchup returned an error: %s", err) } @@ -254,7 +249,7 @@ func TestKeyChangeCatchupOnLeaveShareNoUsers(t *testing.T) { "!another:room": {syncingUser, existingUser}, }, } - _, hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, syncingUser, syncResponse, emptyToken, newestToken) + _, hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, syncingUser, syncResponse, emptyToken, emptyToken) if err != nil { t.Fatalf("DeviceListCatchup returned an error: %s", err) } @@ -313,7 +308,7 @@ func TestKeyChangeCatchupNoNewJoinsButMessages(t *testing.T) { roomID: {syncingUser, existingUser}, }, } - _, hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, syncingUser, syncResponse, emptyToken, newestToken) + _, hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, syncingUser, syncResponse, emptyToken, emptyToken) if err != nil { t.Fatalf("DeviceListCatchup returned an error: %s", err) } @@ -341,7 +336,7 @@ func TestKeyChangeCatchupChangeAndLeft(t *testing.T) { "!another:room": {syncingUser}, }, } - _, hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, syncingUser, syncResponse, emptyToken, newestToken) + _, hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, syncingUser, syncResponse, emptyToken, emptyToken) if err != nil { t.Fatalf("Catchup returned an error: %s", err) } @@ -427,7 +422,7 @@ func TestKeyChangeCatchupChangeAndLeftSameRoom(t *testing.T) { }, } _, hasNew, err := DeviceListCatchup( - context.Background(), &mockKeyAPI{}, rsAPI, syncingUser, syncResponse, emptyToken, newestToken, + context.Background(), &mockKeyAPI{}, rsAPI, syncingUser, syncResponse, emptyToken, emptyToken, ) if err != nil { t.Fatalf("DeviceListCatchup returned an error: %s", err) diff --git a/syncapi/streams/stream_devicelist.go b/syncapi/streams/stream_devicelist.go index 9ea9d088..6ff8a7fd 100644 --- a/syncapi/streams/stream_devicelist.go +++ b/syncapi/streams/stream_devicelist.go @@ -10,7 +10,7 @@ import ( ) type DeviceListStreamProvider struct { - PartitionedStreamProvider + StreamProvider rsAPI api.RoomserverInternalAPI keyAPI keyapi.KeyInternalAPI } @@ -18,15 +18,15 @@ type DeviceListStreamProvider struct { func (p *DeviceListStreamProvider) CompleteSync( ctx context.Context, req *types.SyncRequest, -) types.LogPosition { +) types.StreamPosition { return p.LatestPosition(ctx) } func (p *DeviceListStreamProvider) IncrementalSync( ctx context.Context, req *types.SyncRequest, - from, to types.LogPosition, -) types.LogPosition { + from, to types.StreamPosition, +) types.StreamPosition { var err error to, _, err = internal.DeviceListCatchup(context.Background(), p.keyAPI, p.rsAPI, req.Device.UserID, req.Response, from, to) if err != nil { diff --git a/syncapi/streams/streams.go b/syncapi/streams/streams.go index ba4118df..6b02c75e 100644 --- a/syncapi/streams/streams.go +++ b/syncapi/streams/streams.go @@ -18,7 +18,7 @@ type Streams struct { InviteStreamProvider types.StreamProvider SendToDeviceStreamProvider types.StreamProvider AccountDataStreamProvider types.StreamProvider - DeviceListStreamProvider types.PartitionedStreamProvider + DeviceListStreamProvider types.StreamProvider } func NewSyncStreamProviders( @@ -48,9 +48,9 @@ func NewSyncStreamProviders( userAPI: userAPI, }, DeviceListStreamProvider: &DeviceListStreamProvider{ - PartitionedStreamProvider: PartitionedStreamProvider{DB: d}, - rsAPI: rsAPI, - keyAPI: keyAPI, + StreamProvider: StreamProvider{DB: d}, + rsAPI: rsAPI, + keyAPI: keyAPI, }, } diff --git a/syncapi/streams/template_pstream.go b/syncapi/streams/template_pstream.go deleted file mode 100644 index 265e22a2..00000000 --- a/syncapi/streams/template_pstream.go +++ /dev/null @@ -1,38 +0,0 @@ -package streams - -import ( - "context" - "sync" - - "github.com/matrix-org/dendrite/syncapi/storage" - "github.com/matrix-org/dendrite/syncapi/types" -) - -type PartitionedStreamProvider struct { - DB storage.Database - latest types.LogPosition - latestMutex sync.RWMutex -} - -func (p *PartitionedStreamProvider) Setup() { -} - -func (p *PartitionedStreamProvider) Advance( - latest types.LogPosition, -) { - p.latestMutex.Lock() - defer p.latestMutex.Unlock() - - if latest.IsAfter(&p.latest) { - p.latest = latest - } -} - -func (p *PartitionedStreamProvider) LatestPosition( - ctx context.Context, -) types.LogPosition { - p.latestMutex.RLock() - defer p.latestMutex.RUnlock() - - return p.latest -} diff --git a/syncapi/sync/requestpool.go b/syncapi/sync/requestpool.go index a4573610..ca35951a 100644 --- a/syncapi/sync/requestpool.go +++ b/syncapi/sync/requestpool.go @@ -140,6 +140,12 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi. // Extract values from request syncReq, err := newSyncRequest(req, *device, rp.db) if err != nil { + if err == types.ErrMalformedSyncToken { + return util.JSONResponse{ + Code: http.StatusBadRequest, + JSON: jsonerror.InvalidArgumentValue(err.Error()), + } + } return util.JSONResponse{ Code: http.StatusBadRequest, JSON: jsonerror.Unknown(err.Error()), diff --git a/syncapi/types/provider.go b/syncapi/types/provider.go index 93ed1266..f6185fcb 100644 --- a/syncapi/types/provider.go +++ b/syncapi/types/provider.go @@ -42,11 +42,3 @@ type StreamProvider interface { // LatestPosition returns the latest stream position for this stream. LatestPosition(ctx context.Context) StreamPosition } - -type PartitionedStreamProvider interface { - Setup() - Advance(latest LogPosition) - CompleteSync(ctx context.Context, req *SyncRequest) LogPosition - IncrementalSync(ctx context.Context, req *SyncRequest, from, to LogPosition) LogPosition - LatestPosition(ctx context.Context) LogPosition -} diff --git a/syncapi/types/types.go b/syncapi/types/types.go index 44e718b3..68c308d8 100644 --- a/syncapi/types/types.go +++ b/syncapi/types/types.go @@ -16,6 +16,7 @@ package types import ( "encoding/json" + "errors" "fmt" "strconv" "strings" @@ -26,13 +27,10 @@ import ( ) var ( - // ErrInvalidSyncTokenType is returned when an attempt at creating a - // new instance of SyncToken with an invalid type (i.e. neither "s" - // nor "t"). - ErrInvalidSyncTokenType = fmt.Errorf("sync token has an unknown prefix (should be either s or t)") - // ErrInvalidSyncTokenLen is returned when the pagination token is an - // invalid length - ErrInvalidSyncTokenLen = fmt.Errorf("sync token has an invalid length") + // This error is returned when parsing sync tokens if the token is invalid. Callers can use this + // error to detect whether to 400 or 401 the client. It is recommended to 401 them to force a + // logout. + ErrMalformedSyncToken = errors.New("malformed sync token") ) type StateDelta struct { @@ -47,27 +45,6 @@ type StateDelta struct { // StreamPosition represents the offset in the sync stream a client is at. type StreamPosition int64 -// LogPosition represents the offset in a Kafka log a client is at. -type LogPosition struct { - Partition int32 - Offset int64 -} - -func (p *LogPosition) IsEmpty() bool { - return p.Offset == 0 -} - -// IsAfter returns true if this position is after `lp`. -func (p *LogPosition) IsAfter(lp *LogPosition) bool { - if lp == nil { - return false - } - if p.Partition != lp.Partition { - return false - } - return p.Offset > lp.Offset -} - // StreamEvent is the same as gomatrixserverlib.Event but also has the PDU stream position for this event. type StreamEvent struct { *gomatrixserverlib.HeaderedEvent @@ -124,7 +101,7 @@ type StreamingToken struct { SendToDevicePosition StreamPosition InvitePosition StreamPosition AccountDataPosition StreamPosition - DeviceListPosition LogPosition + DeviceListPosition StreamPosition } // This will be used as a fallback by json.Marshal. @@ -140,14 +117,11 @@ func (s *StreamingToken) UnmarshalText(text []byte) (err error) { func (t StreamingToken) String() string { posStr := fmt.Sprintf( - "s%d_%d_%d_%d_%d_%d", + "s%d_%d_%d_%d_%d_%d_%d", t.PDUPosition, t.TypingPosition, t.ReceiptPosition, t.SendToDevicePosition, - t.InvitePosition, t.AccountDataPosition, + t.InvitePosition, t.AccountDataPosition, t.DeviceListPosition, ) - if dl := t.DeviceListPosition; !dl.IsEmpty() { - posStr += fmt.Sprintf(".dl-%d-%d", dl.Partition, dl.Offset) - } return posStr } @@ -166,14 +140,14 @@ func (t *StreamingToken) IsAfter(other StreamingToken) bool { return true case t.AccountDataPosition > other.AccountDataPosition: return true - case t.DeviceListPosition.IsAfter(&other.DeviceListPosition): + case t.DeviceListPosition > other.DeviceListPosition: return true } return false } func (t *StreamingToken) IsEmpty() bool { - return t == nil || t.PDUPosition+t.TypingPosition+t.ReceiptPosition+t.SendToDevicePosition+t.InvitePosition+t.AccountDataPosition == 0 && t.DeviceListPosition.IsEmpty() + return t == nil || t.PDUPosition+t.TypingPosition+t.ReceiptPosition+t.SendToDevicePosition+t.InvitePosition+t.AccountDataPosition+t.DeviceListPosition == 0 } // WithUpdates returns a copy of the StreamingToken with updates applied from another StreamingToken. @@ -208,7 +182,7 @@ func (t *StreamingToken) ApplyUpdates(other StreamingToken) { if other.AccountDataPosition > t.AccountDataPosition { t.AccountDataPosition = other.AccountDataPosition } - if other.DeviceListPosition.IsAfter(&t.DeviceListPosition) { + if other.DeviceListPosition > t.DeviceListPosition { t.DeviceListPosition = other.DeviceListPosition } } @@ -292,16 +266,18 @@ func NewTopologyTokenFromString(tok string) (token TopologyToken, err error) { func NewStreamTokenFromString(tok string) (token StreamingToken, err error) { if len(tok) < 1 { - err = fmt.Errorf("empty stream token") + err = ErrMalformedSyncToken return } if tok[0] != SyncTokenTypeStream[0] { - err = fmt.Errorf("stream token must start with 's'") + err = ErrMalformedSyncToken return } - categories := strings.Split(tok[1:], ".") - parts := strings.Split(categories[0], "_") - var positions [6]StreamPosition + // Migration: Remove everything after and including '.' - we previously had tokens like: + // s478_0_0_0_0_13.dl-0-2 but we have now removed partitioned stream positions + tok = strings.Split(tok, ".")[0] + parts := strings.Split(tok[1:], "_") + var positions [7]StreamPosition for i, p := range parts { if i > len(positions) { break @@ -309,6 +285,7 @@ func NewStreamTokenFromString(tok string) (token StreamingToken, err error) { var pos int pos, err = strconv.Atoi(p) if err != nil { + err = ErrMalformedSyncToken return } positions[i] = StreamPosition(pos) @@ -320,31 +297,7 @@ func NewStreamTokenFromString(tok string) (token StreamingToken, err error) { SendToDevicePosition: positions[3], InvitePosition: positions[4], AccountDataPosition: positions[5], - } - // dl-0-1234 - // $log_name-$partition-$offset - for _, logStr := range categories[1:] { - segments := strings.Split(logStr, "-") - if len(segments) != 3 { - err = fmt.Errorf("invalid log position %q", logStr) - return - } - switch segments[0] { - case "dl": - // Device list syncing - var partition, offset int - if partition, err = strconv.Atoi(segments[1]); err != nil { - return - } - if offset, err = strconv.Atoi(segments[2]); err != nil { - return - } - token.DeviceListPosition.Partition = int32(partition) - token.DeviceListPosition.Offset = int64(offset) - default: - err = fmt.Errorf("unrecognised token type %q", segments[0]) - return - } + DeviceListPosition: positions[6], } return token, nil } diff --git a/syncapi/types/types_test.go b/syncapi/types/types_test.go index 3e577788..cda178b3 100644 --- a/syncapi/types/types_test.go +++ b/syncapi/types/types_test.go @@ -2,50 +2,17 @@ package types import ( "encoding/json" - "reflect" "testing" "github.com/matrix-org/gomatrixserverlib" ) -func TestNewSyncTokenWithLogs(t *testing.T) { - tests := map[string]*StreamingToken{ - "s4_0_0_0_0_0": { - PDUPosition: 4, - }, - "s4_0_0_0_0_0.dl-0-123": { - PDUPosition: 4, - DeviceListPosition: LogPosition{ - Partition: 0, - Offset: 123, - }, - }, - } - for tok, want := range tests { - got, err := NewStreamTokenFromString(tok) - if err != nil { - if want == nil { - continue // error expected - } - t.Errorf("%s errored: %s", tok, err) - continue - } - if !reflect.DeepEqual(got, *want) { - t.Errorf("%s mismatch: got %v want %v", tok, got, want) - } - gotStr := got.String() - if gotStr != tok { - t.Errorf("%s reserialisation mismatch: got %s want %s", tok, gotStr, tok) - } - } -} - func TestSyncTokens(t *testing.T) { shouldPass := map[string]string{ - "s4_0_0_0_0_0": StreamingToken{4, 0, 0, 0, 0, 0, LogPosition{}}.String(), - "s3_1_0_0_0_0.dl-1-2": StreamingToken{3, 1, 0, 0, 0, 0, LogPosition{1, 2}}.String(), - "s3_1_2_3_5_0": StreamingToken{3, 1, 2, 3, 5, 0, LogPosition{}}.String(), - "t3_1": TopologyToken{3, 1}.String(), + "s4_0_0_0_0_0_0": StreamingToken{4, 0, 0, 0, 0, 0, 0}.String(), + "s3_1_0_0_0_0_2": StreamingToken{3, 1, 0, 0, 0, 0, 2}.String(), + "s3_1_2_3_5_0_0": StreamingToken{3, 1, 2, 3, 5, 0, 0}.String(), + "t3_1": TopologyToken{3, 1}.String(), } for a, b := range shouldPass {