dendrite/syncapi/streams/streams.go
Neil Alexander b5a8935042
Sync refactor — Part 1 (#1688)
* It's half-alive

* Wakeups largely working

* Other tweaks, typing works

* Fix bugs, add receipt stream

* Delete notifier, other tweaks

* Dedupe a bit, add a template for the invite stream

* Clean up, add templates for other streams

* Don't leak channels

* Bring forward some more PDU logic, clean up other places

* Add some more wakeups

* Use addRoomDeltaToResponse

* Log tweaks, typing fixed?

* Fix timed out syncs

* Don't reset next batch position on timeout

* Add account data stream/position

* End of day

* Fix complete sync for receipt, typing

* Streams package

* Clean up a bit

* Complete sync send-to-device

* Don't drop errors

* More lightweight notifications

* Fix typing positions

* Don't advance position on remove again unless needed

* Device list updates

* Advance account data position

* Use limit for incremental sync

* Limit fixes, amongst other things

* Remove some fmt.Println

* Tweaks

* Re-add notifier

* Fix invite position

* Fixes

* Notify account data without advancing PDU position in notifier

* Apply account data position

* Get initial position for account data

* Fix position update

* Fix complete sync positions

* Review comments @Kegsay

* Room consumer parameters
2021-01-08 16:59:06 +00:00

79 lines
2.6 KiB
Go

package streams
import (
"context"
"github.com/matrix-org/dendrite/eduserver/cache"
keyapi "github.com/matrix-org/dendrite/keyserver/api"
rsapi "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/types"
userapi "github.com/matrix-org/dendrite/userapi/api"
)
type Streams struct {
PDUStreamProvider types.StreamProvider
TypingStreamProvider types.StreamProvider
ReceiptStreamProvider types.StreamProvider
InviteStreamProvider types.StreamProvider
SendToDeviceStreamProvider types.StreamProvider
AccountDataStreamProvider types.StreamProvider
DeviceListStreamProvider types.PartitionedStreamProvider
}
func NewSyncStreamProviders(
d storage.Database, userAPI userapi.UserInternalAPI,
rsAPI rsapi.RoomserverInternalAPI, keyAPI keyapi.KeyInternalAPI,
eduCache *cache.EDUCache,
) *Streams {
streams := &Streams{
PDUStreamProvider: &PDUStreamProvider{
StreamProvider: StreamProvider{DB: d},
},
TypingStreamProvider: &TypingStreamProvider{
StreamProvider: StreamProvider{DB: d},
EDUCache: eduCache,
},
ReceiptStreamProvider: &ReceiptStreamProvider{
StreamProvider: StreamProvider{DB: d},
},
InviteStreamProvider: &InviteStreamProvider{
StreamProvider: StreamProvider{DB: d},
},
SendToDeviceStreamProvider: &SendToDeviceStreamProvider{
StreamProvider: StreamProvider{DB: d},
},
AccountDataStreamProvider: &AccountDataStreamProvider{
StreamProvider: StreamProvider{DB: d},
userAPI: userAPI,
},
DeviceListStreamProvider: &DeviceListStreamProvider{
PartitionedStreamProvider: PartitionedStreamProvider{DB: d},
rsAPI: rsAPI,
keyAPI: keyAPI,
},
}
streams.PDUStreamProvider.Setup()
streams.TypingStreamProvider.Setup()
streams.ReceiptStreamProvider.Setup()
streams.InviteStreamProvider.Setup()
streams.SendToDeviceStreamProvider.Setup()
streams.AccountDataStreamProvider.Setup()
streams.DeviceListStreamProvider.Setup()
return streams
}
func (s *Streams) Latest(ctx context.Context) types.StreamingToken {
return types.StreamingToken{
PDUPosition: s.PDUStreamProvider.LatestPosition(ctx),
TypingPosition: s.TypingStreamProvider.LatestPosition(ctx),
ReceiptPosition: s.PDUStreamProvider.LatestPosition(ctx),
InvitePosition: s.InviteStreamProvider.LatestPosition(ctx),
SendToDevicePosition: s.SendToDeviceStreamProvider.LatestPosition(ctx),
AccountDataPosition: s.AccountDataStreamProvider.LatestPosition(ctx),
DeviceListPosition: s.DeviceListStreamProvider.LatestPosition(ctx),
}
}