diff --git a/clientapi/clientapi.go b/clientapi/clientapi.go index 55735560..080d4d9f 100644 --- a/clientapi/clientapi.go +++ b/clientapi/clientapi.go @@ -48,7 +48,6 @@ func AddPublicRoutes( syncProducer := &producers.SyncAPIProducer{ JetStream: js, - TopicClientData: cfg.Matrix.JetStream.Prefixed(jetstream.OutputClientData), TopicReceiptEvent: cfg.Matrix.JetStream.Prefixed(jetstream.OutputReceiptEvent), TopicSendToDeviceEvent: cfg.Matrix.JetStream.Prefixed(jetstream.OutputSendToDeviceEvent), TopicTypingEvent: cfg.Matrix.JetStream.Prefixed(jetstream.OutputTypingEvent), diff --git a/clientapi/producers/syncapi.go b/clientapi/producers/syncapi.go index 0ac63779..5933ce1a 100644 --- a/clientapi/producers/syncapi.go +++ b/clientapi/producers/syncapi.go @@ -21,7 +21,6 @@ import ( "strconv" "time" - "github.com/matrix-org/dendrite/internal/eventutil" "github.com/matrix-org/dendrite/setup/jetstream" "github.com/matrix-org/dendrite/syncapi/types" userapi "github.com/matrix-org/dendrite/userapi/api" @@ -32,7 +31,6 @@ import ( // SyncAPIProducer produces events for the sync API server to consume type SyncAPIProducer struct { - TopicClientData string TopicReceiptEvent string TopicSendToDeviceEvent string TopicTypingEvent string @@ -42,36 +40,6 @@ type SyncAPIProducer struct { UserAPI userapi.ClientUserAPI } -// SendData sends account data to the sync API server -func (p *SyncAPIProducer) SendData(userID string, roomID string, dataType string, readMarker *eventutil.ReadMarkerJSON, ignoredUsers *types.IgnoredUsers) error { - m := &nats.Msg{ - Subject: p.TopicClientData, - Header: nats.Header{}, - } - m.Header.Set(jetstream.UserID, userID) - - data := eventutil.AccountData{ - RoomID: roomID, - Type: dataType, - ReadMarker: readMarker, - IgnoredUsers: ignoredUsers, - } - var err error - m.Data, err = json.Marshal(data) - if err != nil { - return err - } - - log.WithFields(log.Fields{ - "user_id": userID, - "room_id": roomID, - "data_type": dataType, - }).Tracef("Producing to topic '%s'", p.TopicClientData) - - _, err = p.JetStream.PublishMsg(m) - return err -} - func (p *SyncAPIProducer) SendReceipt( ctx context.Context, userID, roomID, eventID, receiptType string, timestamp gomatrixserverlib.Timestamp, diff --git a/clientapi/routing/account_data.go b/clientapi/routing/account_data.go index a5a3014a..0d3a4949 100644 --- a/clientapi/routing/account_data.go +++ b/clientapi/routing/account_data.go @@ -25,7 +25,6 @@ import ( "github.com/matrix-org/dendrite/clientapi/producers" "github.com/matrix-org/dendrite/internal/eventutil" roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" - "github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/dendrite/userapi/api" "github.com/matrix-org/util" @@ -127,18 +126,6 @@ func SaveAccountData( return util.ErrorResponse(err) } - var ignoredUsers *types.IgnoredUsers - if dataType == "m.ignored_user_list" { - ignoredUsers = &types.IgnoredUsers{} - _ = json.Unmarshal(body, ignoredUsers) - } - - // TODO: user API should do this since it's account data - if err := syncProducer.SendData(userID, roomID, dataType, nil, ignoredUsers); err != nil { - util.GetLogger(req.Context()).WithError(err).Error("syncProducer.SendData failed") - return jsonerror.InternalServerError() - } - return util.JSONResponse{ Code: http.StatusOK, JSON: struct{}{}, @@ -191,11 +178,6 @@ func SaveReadMarker( return util.ErrorResponse(err) } - if err := syncProducer.SendData(device.UserID, roomID, "m.fully_read", &r, nil); err != nil { - util.GetLogger(req.Context()).WithError(err).Error("syncProducer.SendData failed") - return jsonerror.InternalServerError() - } - // Handle the read receipt that may be included in the read marker if r.Read != "" { return SetReceipt(req, syncProducer, device, roomID, "m.read", r.Read) diff --git a/clientapi/routing/room_tagging.go b/clientapi/routing/room_tagging.go index 03928956..92b9e665 100644 --- a/clientapi/routing/room_tagging.go +++ b/clientapi/routing/room_tagging.go @@ -18,8 +18,6 @@ import ( "encoding/json" "net/http" - "github.com/sirupsen/logrus" - "github.com/matrix-org/dendrite/clientapi/httputil" "github.com/matrix-org/dendrite/clientapi/jsonerror" "github.com/matrix-org/dendrite/clientapi/producers" @@ -98,10 +96,6 @@ func PutTag( return jsonerror.InternalServerError() } - if err = syncProducer.SendData(userID, roomID, "m.tag", nil, nil); err != nil { - logrus.WithError(err).Error("Failed to send m.tag account data update to syncapi") - } - return util.JSONResponse{ Code: http.StatusOK, JSON: struct{}{}, @@ -150,11 +144,6 @@ func DeleteTag( return jsonerror.InternalServerError() } - // TODO: user API should do this since it's account data - if err := syncProducer.SendData(userID, roomID, "m.tag", nil, nil); err != nil { - logrus.WithError(err).Error("Failed to send m.tag account data update to syncapi") - } - return util.JSONResponse{ Code: http.StatusOK, JSON: struct{}{}, diff --git a/userapi/internal/api.go b/userapi/internal/api.go index 27ed15a0..422eb076 100644 --- a/userapi/internal/api.go +++ b/userapi/internal/api.go @@ -30,11 +30,13 @@ import ( "github.com/matrix-org/dendrite/appservice/types" "github.com/matrix-org/dendrite/clientapi/userutil" + "github.com/matrix-org/dendrite/internal/eventutil" "github.com/matrix-org/dendrite/internal/pushrules" "github.com/matrix-org/dendrite/internal/sqlutil" keyapi "github.com/matrix-org/dendrite/keyserver/api" rsapi "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/setup/config" + synctypes "github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/dendrite/userapi/api" "github.com/matrix-org/dendrite/userapi/producers" "github.com/matrix-org/dendrite/userapi/storage" @@ -64,7 +66,24 @@ func (a *UserInternalAPI) InputAccountData(ctx context.Context, req *api.InputAc if req.DataType == "" { return fmt.Errorf("data type must not be empty") } - return a.DB.SaveAccountData(ctx, local, req.RoomID, req.DataType, req.AccountData) + if err := a.DB.SaveAccountData(ctx, local, req.RoomID, req.DataType, req.AccountData); err != nil { + util.GetLogger(ctx).WithError(err).Error("a.DB.SaveAccountData failed") + return fmt.Errorf("failed to save account data: %w", err) + } + var ignoredUsers *synctypes.IgnoredUsers + if req.DataType == "m.ignored_user_list" { + ignoredUsers = &synctypes.IgnoredUsers{} + _ = json.Unmarshal(req.AccountData, ignoredUsers) + } + if err := a.SyncProducer.SendAccountData(req.UserID, eventutil.AccountData{ + RoomID: req.RoomID, + Type: req.DataType, + IgnoredUsers: ignoredUsers, + }); err != nil { + util.GetLogger(ctx).WithError(err).Error("a.SyncProducer.SendAccountData failed") + return fmt.Errorf("failed to send account data to output: %w", err) + } + return nil } func (a *UserInternalAPI) PerformAccountCreation(ctx context.Context, req *api.PerformAccountCreationRequest, res *api.PerformAccountCreationResponse) error { @@ -93,7 +112,9 @@ func (a *UserInternalAPI) PerformAccountCreation(ctx context.Context, req *api.P } // Inform the SyncAPI about the newly created push_rules - if err = a.SyncProducer.SendAccountData(acc.UserID, "", "m.push_rules"); err != nil { + if err = a.SyncProducer.SendAccountData(acc.UserID, eventutil.AccountData{ + Type: "m.push_rules", + }); err != nil { util.GetLogger(ctx).WithFields(logrus.Fields{ "user_id": acc.UserID, }).WithError(err).Warn("failed to send account data to the SyncAPI") @@ -732,11 +753,11 @@ func (a *UserInternalAPI) PerformPushRulesPut( if err := a.InputAccountData(ctx, &userReq, &userRes); err != nil { return err } - - if err := a.SyncProducer.SendAccountData(req.UserID, "" /* roomID */, pushRulesAccountDataType); err != nil { + if err := a.SyncProducer.SendAccountData(req.UserID, eventutil.AccountData{ + Type: pushRulesAccountDataType, + }); err != nil { util.GetLogger(ctx).WithError(err).Errorf("syncProducer.SendData failed") } - return nil } diff --git a/userapi/producers/syncapi.go b/userapi/producers/syncapi.go index 4a206f33..27cfc284 100644 --- a/userapi/producers/syncapi.go +++ b/userapi/producers/syncapi.go @@ -34,7 +34,7 @@ func NewSyncAPI(db storage.Database, js JetStreamPublisher, clientDataTopic stri } // SendAccountData sends account data to the Sync API server. -func (p *SyncAPI) SendAccountData(userID string, roomID string, dataType string) error { +func (p *SyncAPI) SendAccountData(userID string, data eventutil.AccountData) error { m := &nats.Msg{ Subject: p.clientDataTopic, Header: nats.Header{}, @@ -42,18 +42,15 @@ func (p *SyncAPI) SendAccountData(userID string, roomID string, dataType string) m.Header.Set(jetstream.UserID, userID) var err error - m.Data, err = json.Marshal(eventutil.AccountData{ - RoomID: roomID, - Type: dataType, - }) + m.Data, err = json.Marshal(data) if err != nil { return err } log.WithFields(log.Fields{ "user_id": userID, - "room_id": roomID, - "data_type": dataType, + "room_id": data.RoomID, + "data_type": data.Type, }).Tracef("Producing to topic '%s'", p.clientDataTopic) _, err = p.producer.PublishMsg(m)