diff --git a/syncapi/storage/postgres/output_room_events_table.go b/syncapi/storage/postgres/output_room_events_table.go index bd7aa018..44de02c9 100644 --- a/syncapi/storage/postgres/output_room_events_table.go +++ b/syncapi/storage/postgres/output_room_events_table.go @@ -116,7 +116,7 @@ const updateEventJSONSQL = "" + // In order for us to apply the state updates correctly, rows need to be ordered in the order they were received (id). const selectStateInRangeSQL = "" + - "SELECT id, headered_event_json, exclude_from_sync, add_state_ids, remove_state_ids" + + "SELECT event_id, id, headered_event_json, exclude_from_sync, add_state_ids, remove_state_ids" + " FROM syncapi_output_room_events" + " WHERE (id > $1 AND id <= $2) AND (add_state_ids IS NOT NULL OR remove_state_ids IS NOT NULL)" + " AND ( $3::text[] IS NULL OR sender = ANY($3) )" + @@ -221,13 +221,14 @@ func (s *outputRoomEventsStatements) SelectStateInRange( for rows.Next() { var ( + eventID string streamPos types.StreamPosition eventBytes []byte excludeFromSync bool addIDs pq.StringArray delIDs pq.StringArray ) - if err := rows.Scan(&streamPos, &eventBytes, &excludeFromSync, &addIDs, &delIDs); err != nil { + if err := rows.Scan(&eventID, &streamPos, &eventBytes, &excludeFromSync, &addIDs, &delIDs); err != nil { return nil, nil, err } // Sanity check for deleted state and whine if we see it. We don't need to do anything @@ -243,7 +244,7 @@ func (s *outputRoomEventsStatements) SelectStateInRange( // TODO: Handle redacted events var ev gomatrixserverlib.HeaderedEvent - if err := json.Unmarshal(eventBytes, &ev); err != nil { + if err := ev.UnmarshalJSONWithEventID(eventBytes, eventID); err != nil { return nil, nil, err } needSet := stateNeeded[ev.RoomID()] @@ -258,7 +259,7 @@ func (s *outputRoomEventsStatements) SelectStateInRange( } stateNeeded[ev.RoomID()] = needSet - eventIDToEvent[ev.EventID()] = types.StreamEvent{ + eventIDToEvent[eventID] = types.StreamEvent{ HeaderedEvent: &ev, StreamPosition: streamPos, ExcludeFromSync: excludeFromSync, diff --git a/syncapi/storage/sqlite3/output_room_events_table.go b/syncapi/storage/sqlite3/output_room_events_table.go index 37f7ea00..afdbe55c 100644 --- a/syncapi/storage/sqlite3/output_room_events_table.go +++ b/syncapi/storage/sqlite3/output_room_events_table.go @@ -81,7 +81,7 @@ const updateEventJSONSQL = "" + "UPDATE syncapi_output_room_events SET headered_event_json=$1 WHERE event_id=$2" const selectStateInRangeSQL = "" + - "SELECT id, headered_event_json, exclude_from_sync, add_state_ids, remove_state_ids" + + "SELECT event_id, id, headered_event_json, exclude_from_sync, add_state_ids, remove_state_ids" + " FROM syncapi_output_room_events" + " WHERE (id > $1 AND id <= $2)" + " AND ((add_state_ids IS NOT NULL AND add_state_ids != '') OR (remove_state_ids IS NOT NULL AND remove_state_ids != ''))" @@ -173,13 +173,14 @@ func (s *outputRoomEventsStatements) SelectStateInRange( for rows.Next() { var ( + eventID string streamPos types.StreamPosition eventBytes []byte excludeFromSync bool addIDsJSON string delIDsJSON string ) - if err := rows.Scan(&streamPos, &eventBytes, &excludeFromSync, &addIDsJSON, &delIDsJSON); err != nil { + if err := rows.Scan(&eventID, &streamPos, &eventBytes, &excludeFromSync, &addIDsJSON, &delIDsJSON); err != nil { return nil, nil, err } @@ -201,7 +202,7 @@ func (s *outputRoomEventsStatements) SelectStateInRange( // TODO: Handle redacted events var ev gomatrixserverlib.HeaderedEvent - if err := json.Unmarshal(eventBytes, &ev); err != nil { + if err := ev.UnmarshalJSONWithEventID(eventBytes, eventID); err != nil { return nil, nil, err } needSet := stateNeeded[ev.RoomID()] @@ -216,7 +217,7 @@ func (s *outputRoomEventsStatements) SelectStateInRange( } stateNeeded[ev.RoomID()] = needSet - eventIDToEvent[ev.EventID()] = types.StreamEvent{ + eventIDToEvent[eventID] = types.StreamEvent{ HeaderedEvent: &ev, StreamPosition: streamPos, ExcludeFromSync: excludeFromSync,