Skip to content
This repository has been archived by the owner on Nov 25, 2024. It is now read-only.

Implement transactions cache to ensure idempotency of sendEvents #419

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/github.com/matrix-org/dendrite/clientapi/clientapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/matrix-org/dendrite/clientapi/consumers"
"github.com/matrix-org/dendrite/clientapi/producers"
"github.com/matrix-org/dendrite/clientapi/routing"
"github.com/matrix-org/dendrite/clientapi/transactions"
"github.com/matrix-org/dendrite/common/basecomponent"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/gomatrixserverlib"
Expand All @@ -37,6 +38,7 @@ func SetupClientAPIComponent(
aliasAPI api.RoomserverAliasAPI,
inputAPI api.RoomserverInputAPI,
queryAPI api.RoomserverQueryAPI,
transactionsCache *transactions.Cache,
) {
roomserverProducer := producers.NewRoomserverProducer(inputAPI)

Expand All @@ -62,5 +64,6 @@ func SetupClientAPIComponent(
queryAPI, aliasAPI, accountsDB, deviceDB,
federation, *keyRing,
userUpdateProducer, syncProducer,
transactionsCache,
)
}
10 changes: 6 additions & 4 deletions src/github.com/matrix-org/dendrite/clientapi/routing/routing.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/matrix-org/dendrite/clientapi/auth/storage/devices"
"github.com/matrix-org/dendrite/clientapi/jsonerror"
"github.com/matrix-org/dendrite/clientapi/producers"
"github.com/matrix-org/dendrite/clientapi/transactions"
"github.com/matrix-org/dendrite/common"
"github.com/matrix-org/dendrite/common/config"
"github.com/matrix-org/dendrite/roomserver/api"
Expand All @@ -48,6 +49,7 @@ func Setup(
keyRing gomatrixserverlib.KeyRing,
userUpdateProducer *producers.UserUpdateProducer,
syncProducer *producers.SyncAPIProducer,
transactionsCache *transactions.Cache,
) {

apiMux.Handle("/_matrix/client/versions",
Expand Down Expand Up @@ -91,14 +93,14 @@ func Setup(
r0mux.Handle("/rooms/{roomID}/send/{eventType}",
common.MakeAuthAPI("send_message", deviceDB, func(req *http.Request, device *authtypes.Device) util.JSONResponse {
vars := mux.Vars(req)
return SendEvent(req, device, vars["roomID"], vars["eventType"], nil, nil, cfg, queryAPI, producer)
return SendEvent(req, device, vars["roomID"], vars["eventType"], nil, nil, cfg, queryAPI, producer, nil)
}),
).Methods(http.MethodPost, http.MethodOptions)
r0mux.Handle("/rooms/{roomID}/send/{eventType}/{txnID}",
common.MakeAuthAPI("send_message", deviceDB, func(req *http.Request, device *authtypes.Device) util.JSONResponse {
vars := mux.Vars(req)
txnID := vars["txnID"]
return SendEvent(req, device, vars["roomID"], vars["eventType"], &txnID, nil, cfg, queryAPI, producer)
return SendEvent(req, device, vars["roomID"], vars["eventType"], &txnID, nil, cfg, queryAPI, producer, transactionsCache)
}),
).Methods(http.MethodPut, http.MethodOptions)
r0mux.Handle("/rooms/{roomID}/state/{eventType:[^/]+/?}",
Expand All @@ -110,14 +112,14 @@ func Setup(
if strings.HasSuffix(eventType, "/") {
eventType = eventType[:len(eventType)-1]
}
return SendEvent(req, device, vars["roomID"], eventType, nil, &emptyString, cfg, queryAPI, producer)
return SendEvent(req, device, vars["roomID"], eventType, nil, &emptyString, cfg, queryAPI, producer, nil)
}),
).Methods(http.MethodPut, http.MethodOptions)
r0mux.Handle("/rooms/{roomID}/state/{eventType}/{stateKey}",
common.MakeAuthAPI("send_message", deviceDB, func(req *http.Request, device *authtypes.Device) util.JSONResponse {
vars := mux.Vars(req)
stateKey := vars["stateKey"]
return SendEvent(req, device, vars["roomID"], vars["eventType"], nil, &stateKey, cfg, queryAPI, producer)
return SendEvent(req, device, vars["roomID"], vars["eventType"], nil, &stateKey, cfg, queryAPI, producer, nil)
}),
).Methods(http.MethodPut, http.MethodOptions)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/matrix-org/dendrite/clientapi/httputil"
"github.com/matrix-org/dendrite/clientapi/jsonerror"
"github.com/matrix-org/dendrite/clientapi/producers"
"github.com/matrix-org/dendrite/clientapi/transactions"
"github.com/matrix-org/dendrite/common"
"github.com/matrix-org/dendrite/common/config"
"github.com/matrix-org/dendrite/roomserver/api"
Expand All @@ -45,7 +46,18 @@ func SendEvent(
cfg config.Dendrite,
queryAPI api.RoomserverQueryAPI,
producer *producers.RoomserverProducer,
transactionsCache *transactions.Cache,
) util.JSONResponse {

if txnID != nil {
// Try to fetch response from transactionsCache
res, err := transactionsCache.FetchTransaction(*txnID)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You made sure that there is a correlation to txnID so this should work but as you get a pointer of transactionsCache so I would add a null guard before accessing it.


if err != nil {
return *res
}
}

// parse the incoming http request
userID := device.UserID
var r map[string]interface{} // must be a JSON object
Expand Down Expand Up @@ -105,8 +117,15 @@ func SendEvent(
return httputil.LogThenError(req, err)
}

return util.JSONResponse{
res := util.JSONResponse{
Code: http.StatusOK,
JSON: sendEventResponse{e.EventID()},
}

// Add response to transactionsCache for later access
if txnID != nil {
transactionsCache.AddTransaction(*txnID, &res)
}

return res
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
// Copyright 2018 New Vector Ltd
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package transactions

import (
"errors"
"sync"
"time"

"github.com/matrix-org/util"
)

// CleanupPeriod represents time in nanoseconds after which cacheCleanService runs.
const CleanupPeriod time.Duration = 30 * time.Minute

type response struct {
cache *util.JSONResponse
cacheTime time.Time
}

// Cache represents a temporary store for responses.
// This is used to ensure idempotency of requests.
type Cache struct {
sync.RWMutex
txns map[string]response
}

// CreateCache creates and returns a initialized Cache object.
// This cache is automatically cleaned every `CleanupPeriod`.
func CreateCache() *Cache {
t := Cache{txns: make(map[string]response)}

// Start cleanup service as the Cache is created
go cacheCleanService(&t)

return &t
}

// FetchTransaction looks up response for txnID in Cache.
// Returns a JSON response if txnID is found, which can be sent to client,
// else returns error.
func (t *Cache) FetchTransaction(txnID string) (*util.JSONResponse, error) {
t.RLock()
res, ok := t.txns[txnID]
t.RUnlock()

if ok {
return res.cache, nil
}

return nil, errors.New("TxnID not present")
}

// AddTransaction adds a response for txnID in Cache for later access.
func (t *Cache) AddTransaction(txnID string, res *util.JSONResponse) {
t.Lock()
defer t.Unlock()
t.txns[txnID] = response{cache: res, cacheTime: time.Now()}
}

// cacheCleanService is responsible for cleaning up transactions older than 30 min.
// It guarantees that a transaction will be present in cache for at least 30 min & at most 60 min.
func cacheCleanService(t *Cache) {
for {
time.Sleep(CleanupPeriod)
go clean(t)
}
}

func clean(t *Cache) {
expire := time.Now().Add(-CleanupPeriod)
for key := range t.txns {
t.Lock()
if t.txns[key].cacheTime.Before(expire) {
delete(t.txns, key)
}
t.Unlock()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package main

import (
"github.com/matrix-org/dendrite/clientapi"
"github.com/matrix-org/dendrite/clientapi/transactions"
"github.com/matrix-org/dendrite/common/basecomponent"
"github.com/matrix-org/dendrite/common/keydb"
)
Expand All @@ -33,10 +34,11 @@ func main() {
keyRing := keydb.CreateKeyRing(federation.Client, keyDB)

alias, input, query := base.CreateHTTPRoomserverAPIs()
txnCache := transactions.CreateCache()

clientapi.SetupClientAPIComponent(
base, deviceDB, accountDB, federation, &keyRing,
alias, input, query,
alias, input, query, txnCache,
)

base.SetupAndServeHTTP(string(base.Cfg.Listen.ClientAPI))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/matrix-org/dendrite/common/keydb"

"github.com/matrix-org/dendrite/clientapi"
"github.com/matrix-org/dendrite/clientapi/transactions"
"github.com/matrix-org/dendrite/common"
"github.com/matrix-org/dendrite/common/basecomponent"
"github.com/matrix-org/dendrite/federationapi"
Expand Down Expand Up @@ -51,8 +52,9 @@ func main() {
keyRing := keydb.CreateKeyRing(federation.Client, keyDB)

alias, input, query := roomserver.SetupRoomServerComponent(base)
txnCache := transactions.CreateCache()

clientapi.SetupClientAPIComponent(base, deviceDB, accountDB, federation, &keyRing, alias, input, query)
clientapi.SetupClientAPIComponent(base, deviceDB, accountDB, federation, &keyRing, alias, input, query, txnCache)
federationapi.SetupFederationAPIComponent(base, accountDB, federation, &keyRing, alias, input, query)
federationsender.SetupFederationSenderComponent(base, federation, query)
mediaapi.SetupMediaAPIComponent(base, deviceDB)
Expand Down