Split out notifying /sync requests and calculating sync responses (#96)

* Split out notifying /sync requests and calculating sync responses

The logic for notifying /sync requests is about to get really
complicated as we optimise when to wake up requests, so split
out that code into a separate struct to isolate it and make
it easier to unit test.
This commit is contained in:
Kegsay 2017-05-10 10:42:00 +01:00 committed by GitHub
parent e226d564ec
commit 04f3c154b8
4 changed files with 82 additions and 47 deletions

View File

@ -26,6 +26,7 @@ import (
"github.com/matrix-org/dendrite/syncapi/routing" "github.com/matrix-org/dendrite/syncapi/routing"
"github.com/matrix-org/dendrite/syncapi/storage" "github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/sync" "github.com/matrix-org/dendrite/syncapi/sync"
"github.com/matrix-org/dendrite/syncapi/types"
log "github.com/Sirupsen/logrus" log "github.com/Sirupsen/logrus"
yaml "gopkg.in/yaml.v2" yaml "gopkg.in/yaml.v2"
@ -71,12 +72,13 @@ func main() {
log.Panicf("startup: failed to create sync server database with data source %s : %s", cfg.DataSource, err) log.Panicf("startup: failed to create sync server database with data source %s : %s", cfg.DataSource, err)
} }
rp, err := sync.NewRequestPool(db) pos, err := db.SyncStreamPosition()
if err != nil { if err != nil {
log.Panicf("startup: Failed to create request pool : %s", err) log.Panicf("startup: failed to get latest sync stream position : %s", err)
} }
server, err := consumers.NewServer(cfg, rp, db) n := sync.NewNotifier(types.StreamPosition(pos))
server, err := consumers.NewServer(cfg, n, db)
if err != nil { if err != nil {
log.Panicf("startup: failed to create sync server: %s", err) log.Panicf("startup: failed to create sync server: %s", err)
} }
@ -85,6 +87,6 @@ func main() {
} }
log.Info("Starting sync server on ", *bindAddr) log.Info("Starting sync server on ", *bindAddr)
routing.SetupSyncServerListeners(http.DefaultServeMux, http.DefaultClient, *cfg, rp) routing.SetupSyncServerListeners(http.DefaultServeMux, http.DefaultClient, *cfg, sync.NewRequestPool(db, n))
log.Fatal(http.ListenAndServe(*bindAddr, nil)) log.Fatal(http.ListenAndServe(*bindAddr, nil))
} }

View File

@ -32,11 +32,11 @@ import (
type Server struct { type Server struct {
roomServerConsumer *common.ContinualConsumer roomServerConsumer *common.ContinualConsumer
db *storage.SyncServerDatabase db *storage.SyncServerDatabase
rp *sync.RequestPool notifier *sync.Notifier
} }
// NewServer creates a new sync server. Call Start() to begin consuming from room servers. // NewServer creates a new sync server. Call Start() to begin consuming from room servers.
func NewServer(cfg *config.Sync, rp *sync.RequestPool, store *storage.SyncServerDatabase) (*Server, error) { func NewServer(cfg *config.Sync, n *sync.Notifier, store *storage.SyncServerDatabase) (*Server, error) {
kafkaConsumer, err := sarama.NewConsumer(cfg.KafkaConsumerURIs, nil) kafkaConsumer, err := sarama.NewConsumer(cfg.KafkaConsumerURIs, nil)
if err != nil { if err != nil {
return nil, err return nil, err
@ -50,7 +50,7 @@ func NewServer(cfg *config.Sync, rp *sync.RequestPool, store *storage.SyncServer
s := &Server{ s := &Server{
roomServerConsumer: &consumer, roomServerConsumer: &consumer,
db: store, db: store,
rp: rp, notifier: n,
} }
consumer.ProcessMessage = s.onMessage consumer.ProcessMessage = s.onMessage
@ -96,7 +96,7 @@ func (s *Server) onMessage(msg *sarama.ConsumerMessage) error {
}).Panicf("roomserver output log: write event failure") }).Panicf("roomserver output log: write event failure")
return nil return nil
} }
s.rp.OnNewEvent(&ev, types.StreamPosition(syncStreamPos)) s.notifier.OnNewEvent(&ev, types.StreamPosition(syncStreamPos))
return nil return nil
} }

View File

@ -0,0 +1,67 @@
// Copyright 2017 Vector Creations Ltd
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package sync
import (
"sync"
"github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/gomatrixserverlib"
)
// Notifier will wake up sleeping requests in the request pool when there
// is some new data. It does not tell requests what that data is, only the
// stream position which they can use to get at it.
type Notifier struct {
// The latest sync stream position: guarded by 'cond'.
currPos types.StreamPosition
// A condition variable to notify all waiting goroutines of a new sync stream position
cond *sync.Cond
}
// NewNotifier creates a new notifier set to the given stream position.
func NewNotifier(pos types.StreamPosition) *Notifier {
return &Notifier{
pos,
sync.NewCond(&sync.Mutex{}),
}
}
// OnNewEvent is called when a new event is received from the room server. Must only be
// called from a single goroutine, to avoid races between updates which could set the
// current position in the stream incorrectly.
func (n *Notifier) OnNewEvent(ev *gomatrixserverlib.Event, pos types.StreamPosition) {
// update the current position in a guard and then notify all /sync streams
n.cond.L.Lock()
n.currPos = pos
n.cond.L.Unlock()
n.cond.Broadcast() // notify ALL waiting goroutines
}
// WaitForEvents blocks until there are new events for this request.
func (n *Notifier) WaitForEvents(req syncRequest) types.StreamPosition {
// In a guard, check if the /sync request should block, and block it until we get a new position
n.cond.L.Lock()
currentPos := n.currPos
for req.since == currentPos {
// we need to wait for a new event.
// TODO: This waits for ANY new event, we need to only wait for events which we care about.
n.cond.Wait() // atomically unlocks and blocks goroutine, then re-acquires lock on unblock
currentPos = n.currPos
}
n.cond.L.Unlock()
return currentPos
}

View File

@ -16,7 +16,6 @@ package sync
import ( import (
"net/http" "net/http"
"sync"
"time" "time"
log "github.com/Sirupsen/logrus" log "github.com/Sirupsen/logrus"
@ -32,19 +31,12 @@ import (
// RequestPool manages HTTP long-poll connections for /sync // RequestPool manages HTTP long-poll connections for /sync
type RequestPool struct { type RequestPool struct {
db *storage.SyncServerDatabase db *storage.SyncServerDatabase
// The latest sync stream position: guarded by 'cond'. notifier *Notifier
currPos types.StreamPosition
// A condition variable to notify all waiting goroutines of a new sync stream position
cond *sync.Cond
} }
// NewRequestPool makes a new RequestPool // NewRequestPool makes a new RequestPool
func NewRequestPool(db *storage.SyncServerDatabase) (*RequestPool, error) { func NewRequestPool(db *storage.SyncServerDatabase, n *Notifier) *RequestPool {
pos, err := db.SyncStreamPosition() return &RequestPool{db, n}
if err != nil {
return nil, err
}
return &RequestPool{db, types.StreamPosition(pos), sync.NewCond(&sync.Mutex{})}, nil
} }
// OnIncomingSyncRequest is called when a client makes a /sync request. This function MUST be // OnIncomingSyncRequest is called when a client makes a /sync request. This function MUST be
@ -106,34 +98,8 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request) util.JSONRespons
} }
} }
// OnNewEvent is called when a new event is received from the room server. Must only be
// called from a single goroutine, to avoid races between updates which could set the
// current position in the stream incorrectly.
func (rp *RequestPool) OnNewEvent(ev *gomatrixserverlib.Event, pos types.StreamPosition) {
// update the current position in a guard and then notify all /sync streams
rp.cond.L.Lock()
rp.currPos = pos
rp.cond.L.Unlock()
rp.cond.Broadcast() // notify ALL waiting goroutines
}
func (rp *RequestPool) waitForEvents(req syncRequest) types.StreamPosition {
// In a guard, check if the /sync request should block, and block it until we get a new position
rp.cond.L.Lock()
currentPos := rp.currPos
for req.since == currentPos {
// we need to wait for a new event.
// TODO: This waits for ANY new event, we need to only wait for events which we care about.
rp.cond.Wait() // atomically unlocks and blocks goroutine, then re-acquires lock on unblock
currentPos = rp.currPos
}
rp.cond.L.Unlock()
return currentPos
}
func (rp *RequestPool) currentSyncForUser(req syncRequest) (*types.Response, error) { func (rp *RequestPool) currentSyncForUser(req syncRequest) (*types.Response, error) {
currentPos := rp.waitForEvents(req) currentPos := rp.notifier.WaitForEvents(req)
if req.since == types.StreamPosition(0) { if req.since == types.StreamPosition(0) {
pos, data, err := rp.db.CompleteSync(req.userID, req.limit) pos, data, err := rp.db.CompleteSync(req.userID, req.limit)