Update getting pushrules, add tests, tweak pushrules (#2705)

This PR
- adds tests for `evaluatePushrules`
- removes the need for the UserAPI on the `OutputStreamEventConsumer`
(for easier testing)
- adds a method to get the pushrules from the database
- adds a new default pushrule for `m.reaction` events (and some other
tweaks)
This commit is contained in:
Till 2022-09-09 13:56:33 +02:00 committed by GitHub
parent 42a82091a8
commit 64472d9aab
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 255 additions and 82 deletions

View File

@ -16,6 +16,12 @@ func mRuleContainsUserNameDefinition(localpart string) *Rule {
Default: true, Default: true,
Enabled: true, Enabled: true,
Pattern: localpart, Pattern: localpart,
Conditions: []*Condition{
{
Kind: EventMatchCondition,
Key: "content.body",
},
},
Actions: []*Action{ Actions: []*Action{
{Kind: NotifyAction}, {Kind: NotifyAction},
{ {

View File

@ -7,8 +7,9 @@ func defaultOverrideRules(userID string) []*Rule {
mRuleInviteForMeDefinition(userID), mRuleInviteForMeDefinition(userID),
&mRuleMemberEventDefinition, &mRuleMemberEventDefinition,
&mRuleContainsDisplayNameDefinition, &mRuleContainsDisplayNameDefinition,
&mRuleTombstoneDefinition,
&mRuleRoomNotifDefinition, &mRuleRoomNotifDefinition,
&mRuleTombstoneDefinition,
&mRuleReactionDefinition,
} }
} }
@ -20,6 +21,7 @@ const (
MRuleContainsDisplayName = ".m.rule.contains_display_name" MRuleContainsDisplayName = ".m.rule.contains_display_name"
MRuleTombstone = ".m.rule.tombstone" MRuleTombstone = ".m.rule.tombstone"
MRuleRoomNotif = ".m.rule.roomnotif" MRuleRoomNotif = ".m.rule.roomnotif"
MRuleReaction = ".m.rule.reaction"
) )
var ( var (
@ -96,7 +98,7 @@ var (
{ {
Kind: SetTweakAction, Kind: SetTweakAction,
Tweak: HighlightTweak, Tweak: HighlightTweak,
Value: false, Value: true,
}, },
}, },
} }
@ -120,10 +122,25 @@ var (
{ {
Kind: SetTweakAction, Kind: SetTweakAction,
Tweak: HighlightTweak, Tweak: HighlightTweak,
Value: false, Value: true,
}, },
}, },
} }
mRuleReactionDefinition = Rule{
RuleID: MRuleReaction,
Default: true,
Enabled: true,
Conditions: []*Condition{
{
Kind: EventMatchCondition,
Key: "type",
Pattern: "m.reaction",
},
},
Actions: []*Action{
{Kind: DontNotifyAction},
},
}
) )
func mRuleInviteForMeDefinition(userID string) *Rule { func mRuleInviteForMeDefinition(userID string) *Rule {

View File

@ -10,8 +10,8 @@ const (
var defaultUnderrideRules = []*Rule{ var defaultUnderrideRules = []*Rule{
&mRuleCallDefinition, &mRuleCallDefinition,
&mRuleEncryptedRoomOneToOneDefinition,
&mRuleRoomOneToOneDefinition, &mRuleRoomOneToOneDefinition,
&mRuleEncryptedRoomOneToOneDefinition,
&mRuleMessageDefinition, &mRuleMessageDefinition,
&mRuleEncryptedDefinition, &mRuleEncryptedDefinition,
} }
@ -59,6 +59,11 @@ var (
}, },
Actions: []*Action{ Actions: []*Action{
{Kind: NotifyAction}, {Kind: NotifyAction},
{
Kind: SetTweakAction,
Tweak: SoundTweak,
Value: "default",
},
{ {
Kind: SetTweakAction, Kind: SetTweakAction,
Tweak: HighlightTweak, Tweak: HighlightTweak,
@ -88,6 +93,11 @@ var (
Tweak: HighlightTweak, Tweak: HighlightTweak,
Value: false, Value: false,
}, },
{
Kind: SetTweakAction,
Tweak: HighlightTweak,
Value: false,
},
}, },
} }
mRuleMessageDefinition = Rule{ mRuleMessageDefinition = Rule{
@ -101,7 +111,14 @@ var (
Pattern: "m.room.message", Pattern: "m.room.message",
}, },
}, },
Actions: []*Action{{Kind: NotifyAction}}, Actions: []*Action{
{Kind: NotifyAction},
{
Kind: SetTweakAction,
Tweak: HighlightTweak,
Value: false,
},
},
} }
mRuleEncryptedDefinition = Rule{ mRuleEncryptedDefinition = Rule{
RuleID: MRuleEncrypted, RuleID: MRuleEncrypted,
@ -114,6 +131,13 @@ var (
Pattern: "m.room.encrypted", Pattern: "m.room.encrypted",
}, },
}, },
Actions: []*Action{{Kind: NotifyAction}}, Actions: []*Action{
{Kind: NotifyAction},
{
Kind: SetTweakAction,
Tweak: HighlightTweak,
Value: false,
},
},
} }
) )

View File

@ -24,24 +24,28 @@ func TestRuleSetEvaluatorMatchEvent(t *testing.T) {
Default: false, Default: false,
Enabled: true, Enabled: true,
} }
defaultRuleset := DefaultGlobalRuleSet("test", "test")
tsts := []struct { tsts := []struct {
Name string Name string
RuleSet RuleSet RuleSet RuleSet
Want *Rule Want *Rule
Event *gomatrixserverlib.Event
}{ }{
{"empty", RuleSet{}, nil}, {"empty", RuleSet{}, nil, ev},
{"defaultCanWin", RuleSet{Override: []*Rule{defaultEnabled}}, defaultEnabled}, {"defaultCanWin", RuleSet{Override: []*Rule{defaultEnabled}}, defaultEnabled, ev},
{"userWins", RuleSet{Override: []*Rule{defaultEnabled, userEnabled}}, userEnabled}, {"userWins", RuleSet{Override: []*Rule{defaultEnabled, userEnabled}}, userEnabled, ev},
{"defaultOverrideWins", RuleSet{Override: []*Rule{defaultEnabled}, Underride: []*Rule{userEnabled}}, defaultEnabled}, {"defaultOverrideWins", RuleSet{Override: []*Rule{defaultEnabled}, Underride: []*Rule{userEnabled}}, defaultEnabled, ev},
{"overrideContent", RuleSet{Override: []*Rule{userEnabled}, Content: []*Rule{userEnabled2}}, userEnabled}, {"overrideContent", RuleSet{Override: []*Rule{userEnabled}, Content: []*Rule{userEnabled2}}, userEnabled, ev},
{"overrideRoom", RuleSet{Override: []*Rule{userEnabled}, Room: []*Rule{userEnabled2}}, userEnabled}, {"overrideRoom", RuleSet{Override: []*Rule{userEnabled}, Room: []*Rule{userEnabled2}}, userEnabled, ev},
{"overrideSender", RuleSet{Override: []*Rule{userEnabled}, Sender: []*Rule{userEnabled2}}, userEnabled}, {"overrideSender", RuleSet{Override: []*Rule{userEnabled}, Sender: []*Rule{userEnabled2}}, userEnabled, ev},
{"overrideUnderride", RuleSet{Override: []*Rule{userEnabled}, Underride: []*Rule{userEnabled2}}, userEnabled}, {"overrideUnderride", RuleSet{Override: []*Rule{userEnabled}, Underride: []*Rule{userEnabled2}}, userEnabled, ev},
{"reactions don't notify", *defaultRuleset, &mRuleReactionDefinition, mustEventFromJSON(t, `{"type":"m.reaction"}`)},
{"receipts don't notify", *defaultRuleset, nil, mustEventFromJSON(t, `{"type":"m.receipt"}`)},
} }
for _, tst := range tsts { for _, tst := range tsts {
t.Run(tst.Name, func(t *testing.T) { t.Run(tst.Name, func(t *testing.T) {
rse := NewRuleSetEvaluator(nil, &tst.RuleSet) rse := NewRuleSetEvaluator(fakeEvaluationContext{3}, &tst.RuleSet)
got, err := rse.MatchEvent(ev) got, err := rse.MatchEvent(tst.Event)
if err != nil { if err != nil {
t.Fatalf("MatchEvent failed: %v", err) t.Fatalf("MatchEvent failed: %v", err)
} }
@ -128,7 +132,7 @@ func TestConditionMatches(t *testing.T) {
} }
for _, tst := range tsts { for _, tst := range tsts {
t.Run(tst.Name, func(t *testing.T) { t.Run(tst.Name, func(t *testing.T) {
got, err := conditionMatches(&tst.Cond, mustEventFromJSON(t, tst.EventJSON), &fakeEvaluationContext{}) got, err := conditionMatches(&tst.Cond, mustEventFromJSON(t, tst.EventJSON), &fakeEvaluationContext{2})
if err != nil { if err != nil {
t.Fatalf("conditionMatches failed: %v", err) t.Fatalf("conditionMatches failed: %v", err)
} }
@ -139,10 +143,10 @@ func TestConditionMatches(t *testing.T) {
} }
} }
type fakeEvaluationContext struct{} type fakeEvaluationContext struct{ memberCount int }
func (fakeEvaluationContext) UserDisplayName() string { return "Dear User" } func (fakeEvaluationContext) UserDisplayName() string { return "Dear User" }
func (fakeEvaluationContext) RoomMemberCount() (int, error) { return 2, nil } func (f fakeEvaluationContext) RoomMemberCount() (int, error) { return f.memberCount, nil }
func (fakeEvaluationContext) HasPowerLevel(userID, levelKey string) (bool, error) { func (fakeEvaluationContext) HasPowerLevel(userID, levelKey string) (bool, error) {
return userID == "@poweruser:example.com" && levelKey == "powerlevel", nil return userID == "@poweruser:example.com" && levelKey == "powerlevel", nil
} }

View File

@ -11,7 +11,7 @@ import (
// kind and a tweaks map. Returns a nil map if it would have been // kind and a tweaks map. Returns a nil map if it would have been
// empty. // empty.
func ActionsToTweaks(as []*Action) (ActionKind, map[string]interface{}, error) { func ActionsToTweaks(as []*Action) (ActionKind, map[string]interface{}, error) {
var kind ActionKind kind := UnknownAction
tweaks := map[string]interface{}{} tweaks := map[string]interface{}{}
for _, a := range as { for _, a := range as {

View File

@ -29,7 +29,6 @@ import (
type OutputStreamEventConsumer struct { type OutputStreamEventConsumer struct {
ctx context.Context ctx context.Context
cfg *config.UserAPI cfg *config.UserAPI
userAPI api.UserInternalAPI
rsAPI rsapi.UserRoomserverAPI rsAPI rsapi.UserRoomserverAPI
jetstream nats.JetStreamContext jetstream nats.JetStreamContext
durable string durable string
@ -45,7 +44,6 @@ func NewOutputStreamEventConsumer(
js nats.JetStreamContext, js nats.JetStreamContext,
store storage.Database, store storage.Database,
pgClient pushgateway.Client, pgClient pushgateway.Client,
userAPI api.UserInternalAPI,
rsAPI rsapi.UserRoomserverAPI, rsAPI rsapi.UserRoomserverAPI,
syncProducer *producers.SyncAPI, syncProducer *producers.SyncAPI,
) *OutputStreamEventConsumer { ) *OutputStreamEventConsumer {
@ -57,7 +55,6 @@ func NewOutputStreamEventConsumer(
durable: cfg.Matrix.JetStream.Durable("UserAPISyncAPIStreamEventConsumer"), durable: cfg.Matrix.JetStream.Durable("UserAPISyncAPIStreamEventConsumer"),
topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputStreamEvent), topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputStreamEvent),
pgClient: pgClient, pgClient: pgClient,
userAPI: userAPI,
rsAPI: rsAPI, rsAPI: rsAPI,
syncProducer: syncProducer, syncProducer: syncProducer,
} }
@ -305,7 +302,7 @@ func (s *OutputStreamEventConsumer) notifyLocal(ctx context.Context, event *goma
"event_id": event.EventID(), "event_id": event.EventID(),
"room_id": event.RoomID(), "room_id": event.RoomID(),
"localpart": mem.Localpart, "localpart": mem.Localpart,
}).Tracef("Push rule evaluation rejected the event") }).Debugf("Push rule evaluation rejected the event")
return nil return nil
} }
@ -348,7 +345,7 @@ func (s *OutputStreamEventConsumer) notifyLocal(ctx context.Context, event *goma
"localpart": mem.Localpart, "localpart": mem.Localpart,
"num_urls": len(devicesByURLAndFormat), "num_urls": len(devicesByURLAndFormat),
"num_unread": userNumUnreadNotifs, "num_unread": userNumUnreadNotifs,
}).Tracef("Notifying single member") }).Debugf("Notifying single member")
// Push gateways are out of our control, and we cannot risk // Push gateways are out of our control, and we cannot risk
// looking up the server on a misbehaving push gateway. Each user // looking up the server on a misbehaving push gateway. Each user
@ -422,8 +419,8 @@ func (s *OutputStreamEventConsumer) evaluatePushRules(ctx context.Context, event
return nil, fmt.Errorf("user %s is ignored", sender) return nil, fmt.Errorf("user %s is ignored", sender)
} }
} }
var res api.QueryPushRulesResponse ruleSets, err := s.db.QueryPushRules(ctx, mem.Localpart)
if err = s.userAPI.QueryPushRules(ctx, &api.QueryPushRulesRequest{UserID: mem.UserID}, &res); err != nil { if err != nil {
return nil, err return nil, err
} }
@ -434,7 +431,7 @@ func (s *OutputStreamEventConsumer) evaluatePushRules(ctx context.Context, event
roomID: event.RoomID(), roomID: event.RoomID(),
roomSize: roomSize, roomSize: roomSize,
} }
eval := pushrules.NewRuleSetEvaluator(ec, &res.RuleSets.Global) eval := pushrules.NewRuleSetEvaluator(ec, &ruleSets.Global)
rule, err := eval.MatchEvent(event.Event) rule, err := eval.MatchEvent(event.Event)
if err != nil { if err != nil {
return nil, err return nil, err

View File

@ -0,0 +1,129 @@
package consumers
import (
"context"
"testing"
"github.com/matrix-org/gomatrixserverlib"
"github.com/stretchr/testify/assert"
"github.com/matrix-org/dendrite/internal/pushrules"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/test"
"github.com/matrix-org/dendrite/userapi/storage"
)
func mustCreateDatabase(t *testing.T, dbType test.DBType) (storage.Database, func()) {
t.Helper()
connStr, close := test.PrepareDBConnectionString(t, dbType)
db, err := storage.NewUserAPIDatabase(nil, &config.DatabaseOptions{
ConnectionString: config.DataSource(connStr),
}, "", 4, 0, 0, "")
if err != nil {
t.Fatalf("failed to create new user db: %v", err)
}
return db, close
}
func mustCreateEvent(t *testing.T, content string) *gomatrixserverlib.HeaderedEvent {
t.Helper()
ev, err := gomatrixserverlib.NewEventFromTrustedJSON([]byte(content), false, gomatrixserverlib.RoomVersionV10)
if err != nil {
t.Fatalf("failed to create event: %v", err)
}
return ev.Headered(gomatrixserverlib.RoomVersionV10)
}
func Test_evaluatePushRules(t *testing.T) {
ctx := context.Background()
test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) {
db, close := mustCreateDatabase(t, dbType)
defer close()
consumer := OutputStreamEventConsumer{db: db}
testCases := []struct {
name string
eventContent string
wantAction pushrules.ActionKind
wantActions []*pushrules.Action
wantNotify bool
}{
{
name: "m.receipt doesn't notify",
eventContent: `{"type":"m.receipt"}`,
wantAction: pushrules.UnknownAction,
wantActions: nil,
},
{
name: "m.reaction doesn't notify",
eventContent: `{"type":"m.reaction"}`,
wantAction: pushrules.DontNotifyAction,
wantActions: []*pushrules.Action{
{
Kind: pushrules.DontNotifyAction,
},
},
},
{
name: "m.room.message notifies",
eventContent: `{"type":"m.room.message"}`,
wantNotify: true,
wantAction: pushrules.NotifyAction,
wantActions: []*pushrules.Action{
{Kind: pushrules.NotifyAction},
{
Kind: pushrules.SetTweakAction,
Tweak: pushrules.HighlightTweak,
Value: false,
},
},
},
{
name: "m.room.message highlights",
eventContent: `{"type":"m.room.message", "content": {"body": "test"} }`,
wantNotify: true,
wantAction: pushrules.NotifyAction,
wantActions: []*pushrules.Action{
{Kind: pushrules.NotifyAction},
{
Kind: pushrules.SetTweakAction,
Tweak: pushrules.SoundTweak,
Value: "default",
},
{
Kind: pushrules.SetTweakAction,
Tweak: pushrules.HighlightTweak,
Value: true,
},
},
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
actions, err := consumer.evaluatePushRules(ctx, mustCreateEvent(t, tc.eventContent), &localMembership{
UserID: "@test:localhost",
Localpart: "test",
Domain: "localhost",
}, 10)
if err != nil {
t.Fatalf("failed to evaluate push rules: %v", err)
}
assert.Equal(t, tc.wantActions, actions)
gotAction, _, err := pushrules.ActionsToTweaks(actions)
if err != nil {
t.Fatalf("failed to get actions: %v", err)
}
if gotAction != tc.wantAction {
t.Fatalf("expected action to be '%s', got '%s'", tc.wantAction, gotAction)
}
// this is taken from `notifyLocal`
if tc.wantNotify && gotAction != pushrules.NotifyAction && gotAction != pushrules.CoalesceAction {
t.Fatalf("expected to notify but didn't")
}
})
}
})
}

View File

@ -30,7 +30,6 @@ import (
"github.com/matrix-org/dendrite/clientapi/userutil" "github.com/matrix-org/dendrite/clientapi/userutil"
"github.com/matrix-org/dendrite/internal/eventutil" "github.com/matrix-org/dendrite/internal/eventutil"
"github.com/matrix-org/dendrite/internal/pushrules"
"github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/dendrite/internal/sqlutil"
keyapi "github.com/matrix-org/dendrite/keyserver/api" keyapi "github.com/matrix-org/dendrite/keyserver/api"
rsapi "github.com/matrix-org/dendrite/roomserver/api" rsapi "github.com/matrix-org/dendrite/roomserver/api"
@ -760,57 +759,15 @@ func (a *UserInternalAPI) PerformPushRulesPut(
} }
func (a *UserInternalAPI) QueryPushRules(ctx context.Context, req *api.QueryPushRulesRequest, res *api.QueryPushRulesResponse) error { func (a *UserInternalAPI) QueryPushRules(ctx context.Context, req *api.QueryPushRulesRequest, res *api.QueryPushRulesResponse) error {
userReq := api.QueryAccountDataRequest{ localpart, _, err := gomatrixserverlib.SplitID('@', req.UserID)
UserID: req.UserID, if err != nil {
DataType: pushRulesAccountDataType, return fmt.Errorf("failed to split user ID %q for push rules", req.UserID)
} }
var userRes api.QueryAccountDataResponse pushRules, err := a.DB.QueryPushRules(ctx, localpart)
if err := a.QueryAccountData(ctx, &userReq, &userRes); err != nil { if err != nil {
return err return fmt.Errorf("failed to query push rules: %w", err)
} }
bs, ok := userRes.GlobalAccountData[pushRulesAccountDataType] res.RuleSets = pushRules
if ok {
// Legacy Dendrite users will have completely empty push rules, so we should
// detect that situation and set some defaults.
var rules struct {
G struct {
Content []json.RawMessage `json:"content"`
Override []json.RawMessage `json:"override"`
Room []json.RawMessage `json:"room"`
Sender []json.RawMessage `json:"sender"`
Underride []json.RawMessage `json:"underride"`
} `json:"global"`
}
if err := json.Unmarshal([]byte(bs), &rules); err == nil {
count := len(rules.G.Content) + len(rules.G.Override) +
len(rules.G.Room) + len(rules.G.Sender) + len(rules.G.Underride)
ok = count > 0
}
}
if !ok {
// If we didn't find any default push rules then we should just generate some
// fresh ones.
localpart, _, err := gomatrixserverlib.SplitID('@', req.UserID)
if err != nil {
return fmt.Errorf("failed to split user ID %q for push rules", req.UserID)
}
pushRuleSets := pushrules.DefaultAccountRuleSets(localpart, a.ServerName)
prbs, err := json.Marshal(pushRuleSets)
if err != nil {
return fmt.Errorf("failed to marshal default push rules: %w", err)
}
if err := a.DB.SaveAccountData(ctx, localpart, "", pushRulesAccountDataType, json.RawMessage(prbs)); err != nil {
return fmt.Errorf("failed to save default push rules: %w", err)
}
res.RuleSets = pushRuleSets
return nil
}
var data pushrules.AccountRuleSets
if err := json.Unmarshal([]byte(bs), &data); err != nil {
util.GetLogger(ctx).WithError(err).Error("json.Unmarshal of push rules failed")
return err
}
res.RuleSets = &data
return nil return nil
} }

View File

@ -20,6 +20,7 @@ import (
"errors" "errors"
"github.com/matrix-org/dendrite/clientapi/auth/authtypes" "github.com/matrix-org/dendrite/clientapi/auth/authtypes"
"github.com/matrix-org/dendrite/internal/pushrules"
"github.com/matrix-org/dendrite/userapi/api" "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/dendrite/userapi/storage/tables" "github.com/matrix-org/dendrite/userapi/storage/tables"
"github.com/matrix-org/dendrite/userapi/types" "github.com/matrix-org/dendrite/userapi/types"
@ -53,6 +54,7 @@ type AccountData interface {
// If no account data could be found, returns nil // If no account data could be found, returns nil
// Returns an error if there was an issue with the retrieval // Returns an error if there was an issue with the retrieval
GetAccountDataByType(ctx context.Context, localpart, roomID, dataType string) (data json.RawMessage, err error) GetAccountDataByType(ctx context.Context, localpart, roomID, dataType string) (data json.RawMessage, err error)
QueryPushRules(ctx context.Context, localpart string) (*pushrules.AccountRuleSets, error)
} }
type Device interface { type Device interface {

View File

@ -26,10 +26,11 @@ import (
"strings" "strings"
"time" "time"
"github.com/matrix-org/dendrite/userapi/types"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
"golang.org/x/crypto/bcrypt" "golang.org/x/crypto/bcrypt"
"github.com/matrix-org/dendrite/userapi/types"
"github.com/matrix-org/dendrite/clientapi/auth/authtypes" "github.com/matrix-org/dendrite/clientapi/auth/authtypes"
"github.com/matrix-org/dendrite/internal/pushrules" "github.com/matrix-org/dendrite/internal/pushrules"
"github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/dendrite/internal/sqlutil"
@ -177,6 +178,41 @@ func (d *Database) createAccount(
return account, nil return account, nil
} }
func (d *Database) QueryPushRules(
ctx context.Context,
localpart string,
) (*pushrules.AccountRuleSets, error) {
data, err := d.AccountDatas.SelectAccountDataByType(ctx, localpart, "", "m.push_rules")
if err != nil {
return nil, err
}
// If we didn't find any default push rules then we should just generate some
// fresh ones.
if len(data) == 0 {
pushRuleSets := pushrules.DefaultAccountRuleSets(localpart, d.ServerName)
prbs, err := json.Marshal(pushRuleSets)
if err != nil {
return nil, fmt.Errorf("failed to marshal default push rules: %w", err)
}
err = d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
if dbErr := d.AccountDatas.InsertAccountData(ctx, txn, localpart, "", "m.push_rules", prbs); dbErr != nil {
return fmt.Errorf("failed to save default push rules: %w", dbErr)
}
return nil
})
return pushRuleSets, err
}
var pushRules pushrules.AccountRuleSets
if err := json.Unmarshal(data, &pushRules); err != nil {
return nil, err
}
return &pushRules, nil
}
// SaveAccountData saves new account data for a given user and a given room. // SaveAccountData saves new account data for a given user and a given room.
// If the account data is not specific to a room, the room ID should be an empty string // If the account data is not specific to a room, the room ID should be an empty string
// If an account data already exists for a given set (user, room, data type), it will // If an account data already exists for a given set (user, room, data type), it will

View File

@ -18,6 +18,8 @@ import (
"time" "time"
"github.com/gorilla/mux" "github.com/gorilla/mux"
"github.com/sirupsen/logrus"
"github.com/matrix-org/dendrite/internal/pushgateway" "github.com/matrix-org/dendrite/internal/pushgateway"
keyapi "github.com/matrix-org/dendrite/keyserver/api" keyapi "github.com/matrix-org/dendrite/keyserver/api"
rsapi "github.com/matrix-org/dendrite/roomserver/api" rsapi "github.com/matrix-org/dendrite/roomserver/api"
@ -31,7 +33,6 @@ import (
"github.com/matrix-org/dendrite/userapi/producers" "github.com/matrix-org/dendrite/userapi/producers"
"github.com/matrix-org/dendrite/userapi/storage" "github.com/matrix-org/dendrite/userapi/storage"
"github.com/matrix-org/dendrite/userapi/util" "github.com/matrix-org/dendrite/userapi/util"
"github.com/sirupsen/logrus"
) )
// AddInternalRoutes registers HTTP handlers for the internal API. Invokes functions // AddInternalRoutes registers HTTP handlers for the internal API. Invokes functions
@ -90,7 +91,7 @@ func NewInternalAPI(
} }
eventConsumer := consumers.NewOutputStreamEventConsumer( eventConsumer := consumers.NewOutputStreamEventConsumer(
base.ProcessContext, cfg, js, db, pgClient, userAPI, rsAPI, syncProducer, base.ProcessContext, cfg, js, db, pgClient, rsAPI, syncProducer,
) )
if err := eventConsumer.Start(); err != nil { if err := eventConsumer.Start(); err != nil {
logrus.WithError(err).Panic("failed to start user API streamed event consumer") logrus.WithError(err).Panic("failed to start user API streamed event consumer")