mirror of
https://github.com/1f349/dendrite.git
synced 2024-11-26 05:31:32 +00:00
Make clientapi:sendevents idempotent (#444)
* Add transactions.Cache to clientapi setup * Add idempotency to clientapi/SendEvent
This commit is contained in:
parent
8861437c26
commit
29532e7bc3
@ -21,6 +21,7 @@ import (
|
|||||||
"github.com/matrix-org/dendrite/clientapi/producers"
|
"github.com/matrix-org/dendrite/clientapi/producers"
|
||||||
"github.com/matrix-org/dendrite/clientapi/routing"
|
"github.com/matrix-org/dendrite/clientapi/routing"
|
||||||
"github.com/matrix-org/dendrite/common/basecomponent"
|
"github.com/matrix-org/dendrite/common/basecomponent"
|
||||||
|
"github.com/matrix-org/dendrite/common/transactions"
|
||||||
"github.com/matrix-org/dendrite/roomserver/api"
|
"github.com/matrix-org/dendrite/roomserver/api"
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
"github.com/sirupsen/logrus"
|
"github.com/sirupsen/logrus"
|
||||||
@ -37,6 +38,7 @@ func SetupClientAPIComponent(
|
|||||||
aliasAPI api.RoomserverAliasAPI,
|
aliasAPI api.RoomserverAliasAPI,
|
||||||
inputAPI api.RoomserverInputAPI,
|
inputAPI api.RoomserverInputAPI,
|
||||||
queryAPI api.RoomserverQueryAPI,
|
queryAPI api.RoomserverQueryAPI,
|
||||||
|
transactionsCache *transactions.Cache,
|
||||||
) {
|
) {
|
||||||
roomserverProducer := producers.NewRoomserverProducer(inputAPI)
|
roomserverProducer := producers.NewRoomserverProducer(inputAPI)
|
||||||
|
|
||||||
@ -62,5 +64,6 @@ func SetupClientAPIComponent(
|
|||||||
queryAPI, aliasAPI, accountsDB, deviceDB,
|
queryAPI, aliasAPI, accountsDB, deviceDB,
|
||||||
federation, *keyRing,
|
federation, *keyRing,
|
||||||
userUpdateProducer, syncProducer,
|
userUpdateProducer, syncProducer,
|
||||||
|
transactionsCache,
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
@ -27,6 +27,7 @@ import (
|
|||||||
"github.com/matrix-org/dendrite/clientapi/producers"
|
"github.com/matrix-org/dendrite/clientapi/producers"
|
||||||
"github.com/matrix-org/dendrite/common"
|
"github.com/matrix-org/dendrite/common"
|
||||||
"github.com/matrix-org/dendrite/common/config"
|
"github.com/matrix-org/dendrite/common/config"
|
||||||
|
"github.com/matrix-org/dendrite/common/transactions"
|
||||||
"github.com/matrix-org/dendrite/roomserver/api"
|
"github.com/matrix-org/dendrite/roomserver/api"
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
"github.com/matrix-org/util"
|
"github.com/matrix-org/util"
|
||||||
@ -48,6 +49,7 @@ func Setup(
|
|||||||
keyRing gomatrixserverlib.KeyRing,
|
keyRing gomatrixserverlib.KeyRing,
|
||||||
userUpdateProducer *producers.UserUpdateProducer,
|
userUpdateProducer *producers.UserUpdateProducer,
|
||||||
syncProducer *producers.SyncAPIProducer,
|
syncProducer *producers.SyncAPIProducer,
|
||||||
|
transactionsCache *transactions.Cache,
|
||||||
) {
|
) {
|
||||||
|
|
||||||
apiMux.Handle("/_matrix/client/versions",
|
apiMux.Handle("/_matrix/client/versions",
|
||||||
@ -92,14 +94,15 @@ func Setup(
|
|||||||
r0mux.Handle("/rooms/{roomID}/send/{eventType}",
|
r0mux.Handle("/rooms/{roomID}/send/{eventType}",
|
||||||
common.MakeAuthAPI("send_message", deviceDB, func(req *http.Request, device *authtypes.Device) util.JSONResponse {
|
common.MakeAuthAPI("send_message", deviceDB, func(req *http.Request, device *authtypes.Device) util.JSONResponse {
|
||||||
vars := mux.Vars(req)
|
vars := mux.Vars(req)
|
||||||
return SendEvent(req, device, vars["roomID"], vars["eventType"], nil, nil, cfg, queryAPI, producer)
|
return SendEvent(req, device, vars["roomID"], vars["eventType"], nil, nil, cfg, queryAPI, producer, nil)
|
||||||
}),
|
}),
|
||||||
).Methods(http.MethodPost, http.MethodOptions)
|
).Methods(http.MethodPost, http.MethodOptions)
|
||||||
r0mux.Handle("/rooms/{roomID}/send/{eventType}/{txnID}",
|
r0mux.Handle("/rooms/{roomID}/send/{eventType}/{txnID}",
|
||||||
common.MakeAuthAPI("send_message", deviceDB, func(req *http.Request, device *authtypes.Device) util.JSONResponse {
|
common.MakeAuthAPI("send_message", deviceDB, func(req *http.Request, device *authtypes.Device) util.JSONResponse {
|
||||||
vars := mux.Vars(req)
|
vars := mux.Vars(req)
|
||||||
txnID := vars["txnID"]
|
txnID := vars["txnID"]
|
||||||
return SendEvent(req, device, vars["roomID"], vars["eventType"], &txnID, nil, cfg, queryAPI, producer)
|
return SendEvent(req, device, vars["roomID"], vars["eventType"], &txnID,
|
||||||
|
nil, cfg, queryAPI, producer, transactionsCache)
|
||||||
}),
|
}),
|
||||||
).Methods(http.MethodPut, http.MethodOptions)
|
).Methods(http.MethodPut, http.MethodOptions)
|
||||||
r0mux.Handle("/rooms/{roomID}/state/{eventType:[^/]+/?}",
|
r0mux.Handle("/rooms/{roomID}/state/{eventType:[^/]+/?}",
|
||||||
@ -111,14 +114,14 @@ func Setup(
|
|||||||
if strings.HasSuffix(eventType, "/") {
|
if strings.HasSuffix(eventType, "/") {
|
||||||
eventType = eventType[:len(eventType)-1]
|
eventType = eventType[:len(eventType)-1]
|
||||||
}
|
}
|
||||||
return SendEvent(req, device, vars["roomID"], eventType, nil, &emptyString, cfg, queryAPI, producer)
|
return SendEvent(req, device, vars["roomID"], eventType, nil, &emptyString, cfg, queryAPI, producer, nil)
|
||||||
}),
|
}),
|
||||||
).Methods(http.MethodPut, http.MethodOptions)
|
).Methods(http.MethodPut, http.MethodOptions)
|
||||||
r0mux.Handle("/rooms/{roomID}/state/{eventType}/{stateKey}",
|
r0mux.Handle("/rooms/{roomID}/state/{eventType}/{stateKey}",
|
||||||
common.MakeAuthAPI("send_message", deviceDB, func(req *http.Request, device *authtypes.Device) util.JSONResponse {
|
common.MakeAuthAPI("send_message", deviceDB, func(req *http.Request, device *authtypes.Device) util.JSONResponse {
|
||||||
vars := mux.Vars(req)
|
vars := mux.Vars(req)
|
||||||
stateKey := vars["stateKey"]
|
stateKey := vars["stateKey"]
|
||||||
return SendEvent(req, device, vars["roomID"], vars["eventType"], nil, &stateKey, cfg, queryAPI, producer)
|
return SendEvent(req, device, vars["roomID"], vars["eventType"], nil, &stateKey, cfg, queryAPI, producer, nil)
|
||||||
}),
|
}),
|
||||||
).Methods(http.MethodPut, http.MethodOptions)
|
).Methods(http.MethodPut, http.MethodOptions)
|
||||||
|
|
||||||
|
@ -23,6 +23,7 @@ import (
|
|||||||
"github.com/matrix-org/dendrite/clientapi/producers"
|
"github.com/matrix-org/dendrite/clientapi/producers"
|
||||||
"github.com/matrix-org/dendrite/common"
|
"github.com/matrix-org/dendrite/common"
|
||||||
"github.com/matrix-org/dendrite/common/config"
|
"github.com/matrix-org/dendrite/common/config"
|
||||||
|
"github.com/matrix-org/dendrite/common/transactions"
|
||||||
"github.com/matrix-org/dendrite/roomserver/api"
|
"github.com/matrix-org/dendrite/roomserver/api"
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
"github.com/matrix-org/util"
|
"github.com/matrix-org/util"
|
||||||
@ -45,7 +46,15 @@ func SendEvent(
|
|||||||
cfg config.Dendrite,
|
cfg config.Dendrite,
|
||||||
queryAPI api.RoomserverQueryAPI,
|
queryAPI api.RoomserverQueryAPI,
|
||||||
producer *producers.RoomserverProducer,
|
producer *producers.RoomserverProducer,
|
||||||
|
txnCache *transactions.Cache,
|
||||||
) util.JSONResponse {
|
) util.JSONResponse {
|
||||||
|
if txnID != nil {
|
||||||
|
// Try to fetch response from transactionsCache
|
||||||
|
if res, ok := txnCache.FetchTransaction(*txnID); ok {
|
||||||
|
return *res
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// parse the incoming http request
|
// parse the incoming http request
|
||||||
userID := device.UserID
|
userID := device.UserID
|
||||||
var r map[string]interface{} // must be a JSON object
|
var r map[string]interface{} // must be a JSON object
|
||||||
@ -105,8 +114,14 @@ func SendEvent(
|
|||||||
return httputil.LogThenError(req, err)
|
return httputil.LogThenError(req, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return util.JSONResponse{
|
res := util.JSONResponse{
|
||||||
Code: http.StatusOK,
|
Code: http.StatusOK,
|
||||||
JSON: sendEventResponse{e.EventID()},
|
JSON: sendEventResponse{e.EventID()},
|
||||||
}
|
}
|
||||||
|
// Add response to transactionsCache
|
||||||
|
if txnID != nil {
|
||||||
|
txnCache.AddTransaction(*txnID, &res)
|
||||||
|
}
|
||||||
|
|
||||||
|
return res
|
||||||
}
|
}
|
||||||
|
@ -18,6 +18,7 @@ import (
|
|||||||
"github.com/matrix-org/dendrite/clientapi"
|
"github.com/matrix-org/dendrite/clientapi"
|
||||||
"github.com/matrix-org/dendrite/common/basecomponent"
|
"github.com/matrix-org/dendrite/common/basecomponent"
|
||||||
"github.com/matrix-org/dendrite/common/keydb"
|
"github.com/matrix-org/dendrite/common/keydb"
|
||||||
|
"github.com/matrix-org/dendrite/common/transactions"
|
||||||
)
|
)
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
@ -33,10 +34,11 @@ func main() {
|
|||||||
keyRing := keydb.CreateKeyRing(federation.Client, keyDB)
|
keyRing := keydb.CreateKeyRing(federation.Client, keyDB)
|
||||||
|
|
||||||
alias, input, query := base.CreateHTTPRoomserverAPIs()
|
alias, input, query := base.CreateHTTPRoomserverAPIs()
|
||||||
|
cache := transactions.New()
|
||||||
|
|
||||||
clientapi.SetupClientAPIComponent(
|
clientapi.SetupClientAPIComponent(
|
||||||
base, deviceDB, accountDB, federation, &keyRing,
|
base, deviceDB, accountDB, federation, &keyRing,
|
||||||
alias, input, query,
|
alias, input, query, cache,
|
||||||
)
|
)
|
||||||
|
|
||||||
base.SetupAndServeHTTP(string(base.Cfg.Listen.ClientAPI))
|
base.SetupAndServeHTTP(string(base.Cfg.Listen.ClientAPI))
|
||||||
|
@ -19,6 +19,7 @@ import (
|
|||||||
"net/http"
|
"net/http"
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/common/keydb"
|
"github.com/matrix-org/dendrite/common/keydb"
|
||||||
|
"github.com/matrix-org/dendrite/common/transactions"
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/clientapi"
|
"github.com/matrix-org/dendrite/clientapi"
|
||||||
"github.com/matrix-org/dendrite/common"
|
"github.com/matrix-org/dendrite/common"
|
||||||
@ -52,7 +53,11 @@ func main() {
|
|||||||
|
|
||||||
alias, input, query := roomserver.SetupRoomServerComponent(base)
|
alias, input, query := roomserver.SetupRoomServerComponent(base)
|
||||||
|
|
||||||
clientapi.SetupClientAPIComponent(base, deviceDB, accountDB, federation, &keyRing, alias, input, query)
|
clientapi.SetupClientAPIComponent(
|
||||||
|
base, deviceDB, accountDB,
|
||||||
|
federation, &keyRing, alias, input, query,
|
||||||
|
transactions.New(),
|
||||||
|
)
|
||||||
federationapi.SetupFederationAPIComponent(base, accountDB, federation, &keyRing, alias, input, query)
|
federationapi.SetupFederationAPIComponent(base, accountDB, federation, &keyRing, alias, input, query)
|
||||||
federationsender.SetupFederationSenderComponent(base, federation, query)
|
federationsender.SetupFederationSenderComponent(base, federation, query)
|
||||||
mediaapi.SetupMediaAPIComponent(base, deviceDB)
|
mediaapi.SetupMediaAPIComponent(base, deviceDB)
|
||||||
|
Loading…
Reference in New Issue
Block a user