From 989795973103c463a33f053663c6a8616177186c Mon Sep 17 00:00:00 2001 From: Alex Date: Sat, 27 Jul 2024 22:30:17 +0200 Subject: [PATCH] Fix: Edited messages appear twice in fulltext search (#3363) As stated in https://github.com/matrix-org/dendrite/issues/3358 the search response contains both original and edited message. This PR fixes it by removing of the original message from the fulltext index after indexing the edit message event. I also made some cosmetic changes/fixes i found in the code Signed-off-by: `Alexander Dubovikov ` --- clientapi/routing/admin.go | 4 +- clientapi/routing/pushrules.go | 2 +- clientapi/routing/sendevent_test.go | 2 +- syncapi/consumers/roomserver.go | 18 ++++++ syncapi/syncapi_test.go | 92 +++++++++++++++++++++++++++++ test/testrig/base.go | 1 + 6 files changed, 115 insertions(+), 4 deletions(-) diff --git a/clientapi/routing/admin.go b/clientapi/routing/admin.go index 68e62b08..73a0afc3 100644 --- a/clientapi/routing/admin.go +++ b/clientapi/routing/admin.go @@ -328,7 +328,7 @@ func AdminPurgeRoom(req *http.Request, rsAPI roomserverAPI.ClientRoomserverAPI) } } -func AdminResetPassword(req *http.Request, cfg *config.ClientAPI, device *api.Device, userAPI api.ClientUserAPI) util.JSONResponse { +func AdminResetPassword(req *http.Request, cfg *config.ClientAPI, device *api.Device, userAPI userapi.ClientUserAPI) util.JSONResponse { if req.Body == nil { return util.JSONResponse{ Code: http.StatusBadRequest, @@ -423,7 +423,7 @@ func AdminReindex(req *http.Request, cfg *config.ClientAPI, device *api.Device, } } -func AdminMarkAsStale(req *http.Request, cfg *config.ClientAPI, keyAPI api.ClientKeyAPI) util.JSONResponse { +func AdminMarkAsStale(req *http.Request, cfg *config.ClientAPI, keyAPI userapi.ClientKeyAPI) util.JSONResponse { vars, err := httputil.URLDecodeMapValues(mux.Vars(req)) if err != nil { return util.ErrorResponse(err) diff --git a/clientapi/routing/pushrules.go b/clientapi/routing/pushrules.go index 74873d5c..43c034f9 100644 --- a/clientapi/routing/pushrules.go +++ b/clientapi/routing/pushrules.go @@ -70,7 +70,7 @@ func GetPushRulesByKind(ctx context.Context, scope, kind string, device *userapi } rulesPtr := pushRuleSetKindPointer(ruleSet, pushrules.Kind(kind)) // Even if rulesPtr is not nil, there may not be any rules for this kind - if rulesPtr == nil || (rulesPtr != nil && len(*rulesPtr) == 0) { + if rulesPtr == nil || len(*rulesPtr) == 0 { return errorResponse(ctx, spec.InvalidParam("invalid push rules kind"), "pushRuleSetKindPointer failed") } return util.JSONResponse{ diff --git a/clientapi/routing/sendevent_test.go b/clientapi/routing/sendevent_test.go index 9cdd7535..00d19154 100644 --- a/clientapi/routing/sendevent_test.go +++ b/clientapi/routing/sendevent_test.go @@ -265,7 +265,7 @@ func createEvents(eventsJSON []string, roomVer gomatrixserverlib.RoomVersion) ([ for i, eventJSON := range eventsJSON { pdu, evErr := roomVerImpl.NewEventFromTrustedJSON([]byte(eventJSON), false) if evErr != nil { - return nil, fmt.Errorf("failed to make event: %s", err.Error()) + return nil, fmt.Errorf("failed to make event: %s", evErr.Error()) } ev := types.HeaderedEvent{PDU: pdu} events[i] = &ev diff --git a/syncapi/consumers/roomserver.go b/syncapi/consumers/roomserver.go index 81c532f1..abf88882 100644 --- a/syncapi/consumers/roomserver.go +++ b/syncapi/consumers/roomserver.go @@ -601,9 +601,11 @@ func (s *OutputRoomEventConsumer) writeFTS(ev *rstypes.HeaderedEvent, pduPositio } e.SetContentType(ev.Type()) + var relatesTo gjson.Result switch ev.Type() { case "m.room.message": e.Content = gjson.GetBytes(ev.Content(), "body").String() + relatesTo = gjson.GetBytes(ev.Content(), "m\\.relates_to") case spec.MRoomName: e.Content = gjson.GetBytes(ev.Content(), "name").String() case spec.MRoomTopic: @@ -622,6 +624,22 @@ func (s *OutputRoomEventConsumer) writeFTS(ev *rstypes.HeaderedEvent, pduPositio if err := s.fts.Index(e); err != nil { return err } + // If the event is an edited message we remove the original event from the index + // to avoid duplicates in the search results. + if relatesTo.Exists() { + relatedData := relatesTo.Map() + if _, ok := relatedData["rel_type"]; ok && relatedData["rel_type"].Str == "m.replace" { + // We remove the original event from the index + if srcEventID, ok := relatedData["event_id"]; ok { + if err := s.fts.Delete(srcEventID.Str); err != nil { + log.WithFields(log.Fields{ + "event_id": ev.EventID(), + "src_id": srcEventID.Str, + }).WithError(err).Error("Failed to delete edited message from the fulltext index") + } + } + } + } } return nil } diff --git a/syncapi/syncapi_test.go b/syncapi/syncapi_test.go index 0392f209..d360e10d 100644 --- a/syncapi/syncapi_test.go +++ b/syncapi/syncapi_test.go @@ -4,12 +4,14 @@ import ( "context" "encoding/json" "fmt" + "io" "net/http" "net/http/httptest" "reflect" "testing" "time" + "github.com/gorilla/mux" "github.com/matrix-org/dendrite/internal/caching" "github.com/matrix-org/dendrite/internal/httputil" "github.com/matrix-org/dendrite/internal/sqlutil" @@ -17,6 +19,7 @@ import ( "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib/spec" "github.com/nats-io/nats.go" + "github.com/stretchr/testify/assert" "github.com/tidwall/gjson" rstypes "github.com/matrix-org/dendrite/roomserver/types" @@ -1324,6 +1327,95 @@ func TestUpdateRelations(t *testing.T) { }) } +func TestRemoveEditedEventFromSearchIndex(t *testing.T) { + user := test.NewUser(t) + alice := userapi.Device{ + ID: "ALICEID", + UserID: user.ID, + AccessToken: "ALICE_BEARER_TOKEN", + DisplayName: "Alice", + AccountType: userapi.AccountTypeUser, + } + + routers := httputil.NewRouters() + + cfg, processCtx, close := testrig.CreateConfig(t, test.DBTypeSQLite) + cm := sqlutil.NewConnectionManager(processCtx, cfg.Global.DatabaseOptions) + caches := caching.NewRistrettoCache(128*1024*1024, time.Hour, caching.DisableMetrics) + defer close() + + // Use an actual roomserver for this + natsInstance := jetstream.NATSInstance{} + jsctx, _ := natsInstance.Prepare(processCtx, &cfg.Global.JetStream) + defer jetstream.DeleteAllStreams(jsctx, &cfg.Global.JetStream) + + rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, &natsInstance, caches, caching.DisableMetrics) + rsAPI.SetFederationAPI(nil, nil) + + room := test.NewRoom(t, user) + AddPublicRoutes(processCtx, routers, cfg, cm, &natsInstance, &syncUserAPI{accounts: []userapi.Device{alice}}, &syncRoomserverAPI{rooms: []*test.Room{room}}, caches, caching.DisableMetrics) + + if err := api.SendEvents(processCtx.Context(), rsAPI, api.KindNew, room.Events(), "test", "test", "test", nil, false); err != nil { + t.Fatalf("failed to send events: %v", err) + } + + ev1 := room.CreateAndInsert(t, user, "m.room.message", map[string]interface{}{"body": "first"}) + ev2 := room.CreateAndInsert(t, user, "m.room.message", map[string]interface{}{ + "body": " * first", + "m.new_content": map[string]interface{}{ + "body": "first", + "msgtype": "m.text", + }, + "m.relates_to": map[string]interface{}{ + "event_id": ev1.EventID(), + "rel_type": "m.replace", + }, + }) + events := []*rstypes.HeaderedEvent{ev1, ev2} + + for _, e := range events { + roomEvents := append([]*rstypes.HeaderedEvent{}, e) + if err := api.SendEvents(processCtx.Context(), rsAPI, api.KindNew, roomEvents, "test", "test", "test", nil, false); err != nil { + t.Fatalf("failed to send events: %v", err) + } + + syncUntil(t, routers, alice.AccessToken, false, func(syncBody string) bool { + // wait for the last sent eventID to come down sync + path := fmt.Sprintf(`rooms.join.%s.timeline.events.#(event_id=="%s")`, room.ID, e.EventID()) + + return gjson.Get(syncBody, path).Exists() + }) + + // We search that event is the only one nad is the exact event we sent + searchResult := searchRequest(t, routers.Client, alice.AccessToken, "first", []string{room.ID}) + results := gjson.GetBytes(searchResult, fmt.Sprintf(`search_categories.room_events.groups.room_id.%s.results`, room.ID)) + assert.True(t, results.Exists(), "Should be a search response") + assert.Equal(t, 1, len(results.Array()), "Should be exactly one result") + assert.Equal(t, e.EventID(), results.Array()[0].String(), "Should be only found exact event") + } +} + +func searchRequest(t *testing.T, router *mux.Router, accessToken, searchTerm string, roomList []string) []byte { + t.Helper() + w := httptest.NewRecorder() + rq := test.NewRequest(t, "POST", "/_matrix/client/v3/search", test.WithQueryParams(map[string]string{ + "access_token": accessToken, + }), test.WithJSONBody(t, map[string]interface{}{ + "search_categories": map[string]interface{}{ + "room_events": map[string]interface{}{ + "filters": roomList, + "search_term": searchTerm, + }, + }, + })) + + router.ServeHTTP(w, rq) + assert.Equal(t, 200, w.Code) + defer w.Result().Body.Close() + body, err := io.ReadAll(w.Result().Body) + assert.NoError(t, err) + return body +} func syncUntil(t *testing.T, routers httputil.Routers, accessToken string, skip bool, diff --git a/test/testrig/base.go b/test/testrig/base.go index 95370459..a21cfe80 100644 --- a/test/testrig/base.go +++ b/test/testrig/base.go @@ -71,6 +71,7 @@ func CreateConfig(t *testing.T, dbType test.DBType) (*config.Dendrite, *process. SingleDatabase: false, }) cfg.Global.ServerName = "test" + cfg.SyncAPI.Fulltext.Enabled = true cfg.SyncAPI.Fulltext.InMemory = true // use a distinct prefix else concurrent postgres/sqlite runs will clash since NATS will use // the file system event with InMemory=true :(