mirror of
https://github.com/1f349/dendrite.git
synced 2025-01-21 23:06:32 +00:00
Implement lazy loading on /sync
(#2346)
* Initial work on lazyloading * Partially implement lazy loading on /sync * Rename methods * Make missing tests pass * Preallocate slice, even if it will end up with fewer values * Let the cache handle the user mapping * Linter * Cap cache growth
This commit is contained in:
parent
3ddbffd59e
commit
57e3622b85
86
internal/caching/cache_lazy_load_members.go
Normal file
86
internal/caching/cache_lazy_load_members.go
Normal file
@ -0,0 +1,86 @@
|
||||
package caching
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
userapi "github.com/matrix-org/dendrite/userapi/api"
|
||||
)
|
||||
|
||||
const (
|
||||
LazyLoadCacheName = "lazy_load_members"
|
||||
LazyLoadCacheMaxEntries = 128
|
||||
LazyLoadCacheMaxUserEntries = 128
|
||||
LazyLoadCacheMutable = true
|
||||
LazyLoadCacheMaxAge = time.Minute * 30
|
||||
)
|
||||
|
||||
type LazyLoadCache struct {
|
||||
// InMemoryLRUCachePartition containing other InMemoryLRUCachePartitions
|
||||
// with the actual cached members
|
||||
userCaches *InMemoryLRUCachePartition
|
||||
}
|
||||
|
||||
// NewLazyLoadCache creates a new LazyLoadCache.
|
||||
func NewLazyLoadCache() (*LazyLoadCache, error) {
|
||||
cache, err := NewInMemoryLRUCachePartition(
|
||||
LazyLoadCacheName,
|
||||
LazyLoadCacheMutable,
|
||||
LazyLoadCacheMaxEntries,
|
||||
LazyLoadCacheMaxAge,
|
||||
true,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
go cacheCleaner(cache)
|
||||
return &LazyLoadCache{
|
||||
userCaches: cache,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (c *LazyLoadCache) lazyLoadCacheForUser(device *userapi.Device) (*InMemoryLRUCachePartition, error) {
|
||||
cacheName := fmt.Sprintf("%s/%s", device.UserID, device.ID)
|
||||
userCache, ok := c.userCaches.Get(cacheName)
|
||||
if ok && userCache != nil {
|
||||
if cache, ok := userCache.(*InMemoryLRUCachePartition); ok {
|
||||
return cache, nil
|
||||
}
|
||||
}
|
||||
cache, err := NewInMemoryLRUCachePartition(
|
||||
LazyLoadCacheName,
|
||||
LazyLoadCacheMutable,
|
||||
LazyLoadCacheMaxUserEntries,
|
||||
LazyLoadCacheMaxAge,
|
||||
false,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
c.userCaches.Set(cacheName, cache)
|
||||
go cacheCleaner(cache)
|
||||
return cache, nil
|
||||
}
|
||||
|
||||
func (c *LazyLoadCache) StoreLazyLoadedUser(device *userapi.Device, roomID, userID, eventID string) {
|
||||
cache, err := c.lazyLoadCacheForUser(device)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
cacheKey := fmt.Sprintf("%s/%s/%s/%s", device.UserID, device.ID, roomID, userID)
|
||||
cache.Set(cacheKey, eventID)
|
||||
}
|
||||
|
||||
func (c *LazyLoadCache) IsLazyLoadedUserCached(device *userapi.Device, roomID, userID string) (string, bool) {
|
||||
cache, err := c.lazyLoadCacheForUser(device)
|
||||
if err != nil {
|
||||
return "", false
|
||||
}
|
||||
|
||||
cacheKey := fmt.Sprintf("%s/%s/%s/%s", device.UserID, device.ID, roomID, userID)
|
||||
val, ok := cache.Get(cacheKey)
|
||||
if !ok {
|
||||
return "", ok
|
||||
}
|
||||
return val.(string), ok
|
||||
}
|
@ -6,6 +6,7 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/matrix-org/dendrite/internal/caching"
|
||||
"github.com/matrix-org/dendrite/syncapi/types"
|
||||
userapi "github.com/matrix-org/dendrite/userapi/api"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
@ -26,7 +27,8 @@ type PDUStreamProvider struct {
|
||||
|
||||
tasks chan func()
|
||||
workers atomic.Int32
|
||||
userAPI userapi.UserInternalAPI
|
||||
// userID+deviceID -> lazy loading cache
|
||||
lazyLoadCache *caching.LazyLoadCache
|
||||
}
|
||||
|
||||
func (p *PDUStreamProvider) worker() {
|
||||
@ -188,7 +190,7 @@ func (p *PDUStreamProvider) IncrementalSync(
|
||||
newPos = from
|
||||
for _, delta := range stateDeltas {
|
||||
var pos types.StreamPosition
|
||||
if pos, err = p.addRoomDeltaToResponse(ctx, req.Device, r, delta, &eventFilter, req.Response); err != nil {
|
||||
if pos, err = p.addRoomDeltaToResponse(ctx, req.Device, r, delta, &eventFilter, &stateFilter, req.Response); err != nil {
|
||||
req.Log.WithError(err).Error("d.addRoomDeltaToResponse failed")
|
||||
return to
|
||||
}
|
||||
@ -209,6 +211,7 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse(
|
||||
r types.Range,
|
||||
delta types.StateDelta,
|
||||
eventFilter *gomatrixserverlib.RoomEventFilter,
|
||||
stateFilter *gomatrixserverlib.StateFilter,
|
||||
res *types.Response,
|
||||
) (types.StreamPosition, error) {
|
||||
if delta.MembershipPos > 0 && delta.Membership == gomatrixserverlib.Leave {
|
||||
@ -247,7 +250,8 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse(
|
||||
// room that were returned.
|
||||
latestPosition := r.To
|
||||
updateLatestPosition := func(mostRecentEventID string) {
|
||||
if _, pos, err := p.DB.PositionInTopology(ctx, mostRecentEventID); err == nil {
|
||||
var pos types.StreamPosition
|
||||
if _, pos, err = p.DB.PositionInTopology(ctx, mostRecentEventID); err == nil {
|
||||
switch {
|
||||
case r.Backwards && pos > latestPosition:
|
||||
fallthrough
|
||||
@ -263,6 +267,19 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse(
|
||||
updateLatestPosition(delta.StateEvents[len(delta.StateEvents)-1].EventID())
|
||||
}
|
||||
|
||||
if stateFilter.LazyLoadMembers {
|
||||
if err != nil {
|
||||
return r.From, err
|
||||
}
|
||||
delta.StateEvents, err = p.lazyLoadMembers(
|
||||
ctx, delta.RoomID, true, limited, stateFilter.IncludeRedundantMembers,
|
||||
device, recentEvents, delta.StateEvents,
|
||||
)
|
||||
if err != nil {
|
||||
return r.From, err
|
||||
}
|
||||
}
|
||||
|
||||
hasMembershipChange := false
|
||||
for _, recentEvent := range recentStreamEvents {
|
||||
if recentEvent.Type() == gomatrixserverlib.MRoomMember && recentEvent.StateKey() != nil {
|
||||
@ -402,6 +419,20 @@ func (p *PDUStreamProvider) getJoinResponseForCompleteSync(
|
||||
// "Can sync a room with a message with a transaction id" - which does a complete sync to check.
|
||||
recentEvents := p.DB.StreamEventsToEvents(device, recentStreamEvents)
|
||||
stateEvents = removeDuplicates(stateEvents, recentEvents)
|
||||
|
||||
if stateFilter.LazyLoadMembers {
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
stateEvents, err = p.lazyLoadMembers(ctx, roomID,
|
||||
false, limited, stateFilter.IncludeRedundantMembers,
|
||||
device, recentEvents, stateEvents,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
jr = types.NewJoinResponse()
|
||||
jr.Summary.JoinedMemberCount = &joinedCount
|
||||
jr.Summary.InvitedMemberCount = &invitedCount
|
||||
@ -412,6 +443,69 @@ func (p *PDUStreamProvider) getJoinResponseForCompleteSync(
|
||||
return jr, nil
|
||||
}
|
||||
|
||||
func (p *PDUStreamProvider) lazyLoadMembers(
|
||||
ctx context.Context, roomID string,
|
||||
incremental, limited, includeRedundant bool,
|
||||
device *userapi.Device,
|
||||
timelineEvents, stateEvents []*gomatrixserverlib.HeaderedEvent,
|
||||
) ([]*gomatrixserverlib.HeaderedEvent, error) {
|
||||
if len(timelineEvents) == 0 {
|
||||
return stateEvents, nil
|
||||
}
|
||||
// Work out which memberships to include
|
||||
timelineUsers := make(map[string]struct{})
|
||||
if !incremental {
|
||||
timelineUsers[device.UserID] = struct{}{}
|
||||
}
|
||||
// Add all users the client doesn't know about yet to a list
|
||||
for _, event := range timelineEvents {
|
||||
// Membership is not yet cached, add it to the list
|
||||
if _, ok := p.lazyLoadCache.IsLazyLoadedUserCached(device, roomID, event.Sender()); !ok {
|
||||
timelineUsers[event.Sender()] = struct{}{}
|
||||
}
|
||||
}
|
||||
// Preallocate with the same amount, even if it will end up with fewer values
|
||||
newStateEvents := make([]*gomatrixserverlib.HeaderedEvent, 0, len(stateEvents))
|
||||
// Remove existing membership events we don't care about, e.g. users not in the timeline.events
|
||||
for _, event := range stateEvents {
|
||||
if event.Type() == gomatrixserverlib.MRoomMember && event.StateKey() != nil {
|
||||
// If this is a gapped incremental sync, we still want this membership
|
||||
isGappedIncremental := limited && incremental
|
||||
// We want this users membership event, keep it in the list
|
||||
_, ok := timelineUsers[event.Sender()]
|
||||
wantMembership := ok || isGappedIncremental
|
||||
if wantMembership {
|
||||
newStateEvents = append(newStateEvents, event)
|
||||
if !includeRedundant {
|
||||
p.lazyLoadCache.StoreLazyLoadedUser(device, roomID, event.Sender(), event.EventID())
|
||||
}
|
||||
delete(timelineUsers, event.Sender())
|
||||
}
|
||||
} else {
|
||||
newStateEvents = append(newStateEvents, event)
|
||||
}
|
||||
}
|
||||
wantUsers := make([]string, 0, len(timelineUsers))
|
||||
for userID := range timelineUsers {
|
||||
wantUsers = append(wantUsers, userID)
|
||||
}
|
||||
// Query missing membership events
|
||||
memberships, err := p.DB.GetStateEventsForRoom(ctx, roomID, &gomatrixserverlib.StateFilter{
|
||||
Limit: 100,
|
||||
Senders: &wantUsers,
|
||||
Types: &[]string{gomatrixserverlib.MRoomMember},
|
||||
})
|
||||
if err != nil {
|
||||
return stateEvents, err
|
||||
}
|
||||
// cache the membership events
|
||||
for _, membership := range memberships {
|
||||
p.lazyLoadCache.StoreLazyLoadedUser(device, roomID, membership.Sender(), membership.EventID())
|
||||
}
|
||||
stateEvents = append(newStateEvents, memberships...)
|
||||
return stateEvents, nil
|
||||
}
|
||||
|
||||
// addIgnoredUsersToFilter adds ignored users to the eventfilter and
|
||||
// the syncreq itself for further use in streams.
|
||||
func (p *PDUStreamProvider) addIgnoredUsersToFilter(ctx context.Context, req *types.SyncRequest, eventFilter *gomatrixserverlib.RoomEventFilter) error {
|
||||
|
@ -27,12 +27,12 @@ type Streams struct {
|
||||
func NewSyncStreamProviders(
|
||||
d storage.Database, userAPI userapi.UserInternalAPI,
|
||||
rsAPI rsapi.RoomserverInternalAPI, keyAPI keyapi.KeyInternalAPI,
|
||||
eduCache *caching.EDUCache, notifier *notifier.Notifier,
|
||||
eduCache *caching.EDUCache, lazyLoadCache *caching.LazyLoadCache, notifier *notifier.Notifier,
|
||||
) *Streams {
|
||||
streams := &Streams{
|
||||
PDUStreamProvider: &PDUStreamProvider{
|
||||
StreamProvider: StreamProvider{DB: d},
|
||||
userAPI: userAPI,
|
||||
lazyLoadCache: lazyLoadCache,
|
||||
},
|
||||
TypingStreamProvider: &TypingStreamProvider{
|
||||
StreamProvider: StreamProvider{DB: d},
|
||||
|
@ -57,8 +57,12 @@ func AddPublicRoutes(
|
||||
}
|
||||
|
||||
eduCache := caching.NewTypingCache()
|
||||
lazyLoadCache, err := caching.NewLazyLoadCache()
|
||||
if err != nil {
|
||||
logrus.WithError(err).Panicf("failed to create lazy loading cache")
|
||||
}
|
||||
notifier := notifier.NewNotifier()
|
||||
streams := streams.NewSyncStreamProviders(syncDB, userAPI, rsAPI, keyAPI, eduCache, notifier)
|
||||
streams := streams.NewSyncStreamProviders(syncDB, userAPI, rsAPI, keyAPI, eduCache, lazyLoadCache, notifier)
|
||||
notifier.SetCurrentPosition(streams.Latest(context.Background()))
|
||||
if err = notifier.Load(context.Background(), syncDB); err != nil {
|
||||
logrus.WithError(err).Panicf("failed to load notifier ")
|
||||
|
@ -699,4 +699,13 @@ Ignore invite in full sync
|
||||
Ignore invite in incremental sync
|
||||
A filtered timeline reaches its limit
|
||||
A change to displayname should not result in a full state sync
|
||||
Can fetch images in room
|
||||
Can fetch images in room
|
||||
The only membership state included in an initial sync is for all the senders in the timeline
|
||||
The only membership state included in an incremental sync is for senders in the timeline
|
||||
Old members are included in gappy incr LL sync if they start speaking
|
||||
We do send redundant membership state across incremental syncs if asked
|
||||
Rejecting invite over federation doesn't break incremental /sync
|
||||
Gapped incremental syncs include all state changes
|
||||
Old leaves are present in gapped incremental syncs
|
||||
Leaves are present in non-gapped incremental syncs
|
||||
Members from the gap are included in gappy incr LL sync
|
Loading…
Reference in New Issue
Block a user