diff --git a/syncapi/streams/stream_sendtodevice.go b/syncapi/streams/stream_sendtodevice.go index 6a18df50..31c6187c 100644 --- a/syncapi/streams/stream_sendtodevice.go +++ b/syncapi/streams/stream_sendtodevice.go @@ -39,21 +39,13 @@ func (p *SendToDeviceStreamProvider) IncrementalSync( return from } - if len(events) > 0 { - // Clean up old send-to-device messages from before this stream position. - if err := p.DB.CleanSendToDeviceUpdates(req.Context, req.Device.UserID, req.Device.ID, from); err != nil { - req.Log.WithError(err).Error("p.DB.CleanSendToDeviceUpdates failed") - return from - } - - // Add the updates into the sync response. - for _, event := range events { - // skip ignored user events - if _, ok := req.IgnoredUsers.List[event.Sender]; ok { - continue - } - req.Response.ToDevice.Events = append(req.Response.ToDevice.Events, event.SendToDeviceEvent) + // Add the updates into the sync response. + for _, event := range events { + // skip ignored user events + if _, ok := req.IgnoredUsers.List[event.Sender]; ok { + continue } + req.Response.ToDevice.Events = append(req.Response.ToDevice.Events, event.SendToDeviceEvent) } return lastPos diff --git a/syncapi/sync/requestpool.go b/syncapi/sync/requestpool.go index b6b4779a..d908a962 100644 --- a/syncapi/sync/requestpool.go +++ b/syncapi/sync/requestpool.go @@ -25,6 +25,11 @@ import ( "sync" "time" + "github.com/matrix-org/gomatrixserverlib" + "github.com/matrix-org/util" + "github.com/prometheus/client_golang/prometheus" + "github.com/sirupsen/logrus" + "github.com/matrix-org/dendrite/clientapi/jsonerror" keyapi "github.com/matrix-org/dendrite/keyserver/api" roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" @@ -35,10 +40,6 @@ import ( "github.com/matrix-org/dendrite/syncapi/streams" "github.com/matrix-org/dendrite/syncapi/types" userapi "github.com/matrix-org/dendrite/userapi/api" - "github.com/matrix-org/gomatrixserverlib" - "github.com/matrix-org/util" - "github.com/prometheus/client_golang/prometheus" - "github.com/sirupsen/logrus" ) // RequestPool manages HTTP long-poll connections for /sync @@ -251,6 +252,12 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi. waitingSyncRequests.Inc() defer waitingSyncRequests.Dec() + // Clean up old send-to-device messages from before this stream position. + // This is needed to avoid sending the same message multiple times + if err = rp.db.CleanSendToDeviceUpdates(syncReq.Context, syncReq.Device.UserID, syncReq.Device.ID, syncReq.Since.SendToDevicePosition); err != nil { + syncReq.Log.WithError(err).Error("p.DB.CleanSendToDeviceUpdates failed") + } + // loop until we get some data for { startTime := time.Now() diff --git a/syncapi/types/types.go b/syncapi/types/types.go index 159fa08b..39b085d9 100644 --- a/syncapi/types/types.go +++ b/syncapi/types/types.go @@ -21,9 +21,10 @@ import ( "strconv" "strings" - "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/gomatrixserverlib" "github.com/tidwall/gjson" + + "github.com/matrix-org/dendrite/roomserver/api" ) var ( @@ -330,23 +331,23 @@ type Response struct { NextBatch StreamingToken `json:"next_batch"` AccountData struct { Events []gomatrixserverlib.ClientEvent `json:"events,omitempty"` - } `json:"account_data"` + } `json:"account_data,omitempty"` Presence struct { Events []gomatrixserverlib.ClientEvent `json:"events,omitempty"` - } `json:"presence"` + } `json:"presence,omitempty"` Rooms struct { - Join map[string]JoinResponse `json:"join"` - Peek map[string]JoinResponse `json:"peek"` - Invite map[string]InviteResponse `json:"invite"` - Leave map[string]LeaveResponse `json:"leave"` - } `json:"rooms"` + Join map[string]JoinResponse `json:"join,omitempty"` + Peek map[string]JoinResponse `json:"peek,omitempty"` + Invite map[string]InviteResponse `json:"invite,omitempty"` + Leave map[string]LeaveResponse `json:"leave,omitempty"` + } `json:"rooms,omitempty"` ToDevice struct { - Events []gomatrixserverlib.SendToDeviceEvent `json:"events"` - } `json:"to_device"` + Events []gomatrixserverlib.SendToDeviceEvent `json:"events,omitempty"` + } `json:"to_device,omitempty"` DeviceLists struct { Changed []string `json:"changed,omitempty"` Left []string `json:"left,omitempty"` - } `json:"device_lists"` + } `json:"device_lists,omitempty"` DeviceListsOTKCount map[string]int `json:"device_one_time_keys_count,omitempty"` }