diff --git a/syncapi/notifier/notifier_test.go b/syncapi/notifier/notifier_test.go index 2273ad76..b0631371 100644 --- a/syncapi/notifier/notifier_test.go +++ b/syncapi/notifier/notifier_test.go @@ -165,9 +165,9 @@ func TestCorrectStreamWakeup(t *testing.T) { go func() { select { - case <-streamone.signalChannel: + case <-streamone.ch(): awoken <- "one" - case <-streamtwo.signalChannel: + case <-streamtwo.ch(): awoken <- "two" } }() diff --git a/syncapi/notifier/userstream.go b/syncapi/notifier/userstream.go index 720185d5..bcd69fad 100644 --- a/syncapi/notifier/userstream.go +++ b/syncapi/notifier/userstream.go @@ -118,6 +118,12 @@ func (s *UserDeviceStream) TimeOfLastNonEmpty() time.Time { return s.timeOfLastChannel } +func (s *UserDeviceStream) ch() <-chan struct{} { + s.lock.Lock() + defer s.lock.Unlock() + return s.signalChannel +} + // GetSyncPosition returns last sync position which the UserStream was // notified about func (s *UserDeviceStreamListener) GetSyncPosition() types.StreamingToken {