Skip to content
This repository has been archived by the owner on Mar 28, 2023. It is now read-only.

Manually send order messages and trigger offline message scan #1584

Merged
merged 27 commits into from
Jul 29, 2019
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
b5b179d
first pass at manually sending order messages and triggering offline …
amangale May 17, 2019
9cdef62
remove reduntant return statement
amangale May 17, 2019
02279da
lint fixes
amangale May 17, 2019
7235b68
rename order_messages to messages and in general make this pr inclusi…
amangale May 21, 2019
ee1b999
add repo.Message as a wrapper for pb.Message
amangale May 22, 2019
1ef4342
add repo.Message as a wrapper for pb.Message
amangale May 22, 2019
bb5f5fe
use repo.Message in the messages datastore
amangale May 22, 2019
d18ec20
use repo.Message in message handler and the resend api endpoint
amangale May 22, 2019
75bdee7
lint fixes
amangale Jun 24, 2019
26428f1
Merge remote-tracking branch 'origin/master' into manual_order_messaging
amangale Jul 10, 2019
63624ff
add messages composite index for peerID messageType tuple
amangale Jul 10, 2019
d4d9305
handle errors in core/net put messages
amangale Jul 10, 2019
cae2cde
fix repo messages
amangale Jul 10, 2019
1366f14
fix migration 24 and add unit test
amangale Jul 10, 2019
bcca56d
lint fix
amangale Jul 10, 2019
35fddca
lint fix
amangale Jul 10, 2019
db438f9
fix panic on pre initialization invoking of offline message scan
amangale Jul 11, 2019
a6e1da5
decrease the manual scan interval to 2 minutes
amangale Jul 12, 2019
9c860ba
reduce retry interval to 1 minute
amangale Jul 16, 2019
26c47e8
update the utxo wallets to invoke the transaction listeners for ORDER…
amangale Jul 17, 2019
4ce3238
fix reviewed issues
amangale Jul 18, 2019
3b17209
remove GetMessageByID
amangale Jul 19, 2019
81187cd
Merge branch 'master' into manual_order_messaging
amangale Jul 23, 2019
57a6bfd
fix error logging and add offline message send after unsuccessful mes…
amangale Jul 24, 2019
4b37451
Merge branch 'manual_order_messaging' of https://github.com/OpenBazaa…
amangale Jul 24, 2019
b948af3
merge master
amangale Jul 24, 2019
0ee517e
fix indentation and modify the offline scan conditional
amangale Jul 26, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions api/endpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,8 @@ func post(i *jsonAPIHandler, path string, w http.ResponseWriter, r *http.Request
i.POSTPost(w, r)
case strings.HasPrefix(path, "/ob/bulkupdatecurrency"):
i.POSTBulkUpdateCurrency(w, r)
case strings.HasPrefix(path, "/ob/resendordermessage"):
i.POSTResendOrderMessage(w, r)
default:
ErrorResponse(w, http.StatusNotFound, "Not Found")
}
Expand Down Expand Up @@ -201,6 +203,8 @@ func get(i *jsonAPIHandler, path string, w http.ResponseWriter, r *http.Request)
i.GETPosts(w, r)
case strings.HasPrefix(path, "/ob/post"):
i.GETPost(w, r)
case strings.HasPrefix(path, "/ob/scanofflinemessages"):
i.GETScanOfflineMessages(w, r)
default:
ErrorResponse(w, http.StatusNotFound, "Not Found")
}
Expand Down
78 changes: 71 additions & 7 deletions api/jsonapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ type jsonAPIHandler struct {
node *core.OpenBazaarNode
}

var lastManualScan time.Time

func newJSONAPIHandler(node *core.OpenBazaarNode, authCookie http.Cookie, config schema.APIConfig) *jsonAPIHandler {
allowedIPs := make(map[string]bool)
for _, ip := range config.AllowedIPs {
Expand Down Expand Up @@ -1475,7 +1477,7 @@ func (i *jsonAPIHandler) GETProfile(w http.ResponseWriter, r *http.Request) {
return
}
if profile.PeerID != peerID {
ErrorResponse(w, http.StatusNotFound, err.Error())
ErrorResponse(w, http.StatusNotFound, "incorrect peer id")
return
}
w.Header().Set("Cache-Control", "public, max-age=600, immutable")
Expand Down Expand Up @@ -1884,7 +1886,7 @@ func (i *jsonAPIHandler) GETModerators(w http.ResponseWriter, r *http.Request) {
if err != nil {
return
}
i.node.Broadcast <- repo.PremarshalledNotifier{b}
i.node.Broadcast <- repo.PremarshalledNotifier{Payload: b}
} else {
type wsResp struct {
ID string `json:"id"`
Expand All @@ -1895,7 +1897,7 @@ func (i *jsonAPIHandler) GETModerators(w http.ResponseWriter, r *http.Request) {
if err != nil {
return
}
i.node.Broadcast <- repo.PremarshalledNotifier{data}
i.node.Broadcast <- repo.PremarshalledNotifier{Payload: data}
}
}(p)
}
Expand Down Expand Up @@ -2819,7 +2821,7 @@ func (i *jsonAPIHandler) POSTFetchProfiles(w http.ResponseWriter, r *http.Reques
if err != nil {
return
}
i.node.Broadcast <- repo.PremarshalledNotifier{ret}
i.node.Broadcast <- repo.PremarshalledNotifier{Payload: ret}
}

pro, err := i.node.FetchProfile(pid, useCache)
Expand All @@ -2844,7 +2846,7 @@ func (i *jsonAPIHandler) POSTFetchProfiles(w http.ResponseWriter, r *http.Reques
respondWithError("error Marshalling to JSON")
return
}
i.node.Broadcast <- repo.PremarshalledNotifier{b}
i.node.Broadcast <- repo.PremarshalledNotifier{Payload: b}
}(p)
}
}()
Expand Down Expand Up @@ -3594,7 +3596,7 @@ func (i *jsonAPIHandler) POSTFetchRatings(w http.ResponseWriter, r *http.Request
if err != nil {
return
}
i.node.Broadcast <- repo.PremarshalledNotifier{ret}
i.node.Broadcast <- repo.PremarshalledNotifier{Payload: ret}
}
ratingBytes, err := ipfs.Cat(i.node.IpfsNode, rid, time.Minute)
if err != nil {
Expand Down Expand Up @@ -3633,7 +3635,7 @@ func (i *jsonAPIHandler) POSTFetchRatings(w http.ResponseWriter, r *http.Request
respondWithError("error marshalling rating")
return
}
i.node.Broadcast <- repo.PremarshalledNotifier{b}
i.node.Broadcast <- repo.PremarshalledNotifier{Payload: b}
}(r)
}
}
Expand Down Expand Up @@ -4150,3 +4152,65 @@ func (i *jsonAPIHandler) GETPost(w http.ResponseWriter, r *http.Request) {
}
SanitizedResponseM(w, out, new(pb.SignedPost))
}

// POSTSendOrderMessage - used to manually send an order message
func (i *jsonAPIHandler) POSTResendOrderMessage(w http.ResponseWriter, r *http.Request) {
type sendRequest struct {
OrderID string `json:"orderID"`
MessageType string `json:"messageType"`
}

var args sendRequest
decoder := json.NewDecoder(r.Body)
err := decoder.Decode(&args)
if err != nil {
ErrorResponse(w, http.StatusBadRequest, err.Error())
return
}

if args.OrderID == "" {
ErrorResponse(w, http.StatusBadRequest, core.ErrOrderNotFound.Error())
return
}

var msgType int32
var ok bool

if msgType, ok = pb.Message_MessageType_value[args.MessageType]; !ok {
ErrorResponse(w, http.StatusBadRequest, "invalid order message type")
return
}

msg, peerID, err := i.node.Datastore.Messages().
GetByOrderIDType(args.OrderID, pb.Message_MessageType(msgType))
if err != nil || msg == nil || msg.Msg.GetPayload() == nil {
ErrorResponse(w, http.StatusBadRequest, "order message not found")
return
}

p, err := peer.IDB58Decode(peerID)
if err != nil {
ErrorResponse(w, http.StatusBadRequest, "invalid peer id")
return
}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

err = i.node.Service.SendMessage(ctx, p, &msg.Msg)
if err != nil {
ErrorResponse(w, http.StatusBadRequest, "order message not sent")
return
}

SanitizedResponse(w, "")
}

// GETScanOfflineMessages - used to manually trigger offline message scan
func (i *jsonAPIHandler) GETScanOfflineMessages(w http.ResponseWriter, r *http.Request) {
if time.Since(lastManualScan).Minutes() > 10 {
i.node.MessageRetriever.RunOnce()
lastManualScan = time.Now()
}
SanitizedResponse(w, "")
}
31 changes: 28 additions & 3 deletions core/net.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,18 @@ package core

import (
"errors"
"fmt"
"sync"
"time"

libp2p "gx/ipfs/QmTW4SdgBWq9GjsBsHeUx8WuGxzhgzAf88UMH2w62PC8yK/go-libp2p-crypto"
"gx/ipfs/QmTbxNB1NwDesLmKTscr4udL2tVP7MaxvXnD1D9yX7g3PN/go-cid"
"gx/ipfs/QmYVXrKrKHDC9FobgmcmshCDyWwdrfwfanNQN4oxJ9Fk3h/go-libp2p-peer"
"gx/ipfs/QmerPMzPk1mJVowm8KgmoknWa4yCYvvugMPsgWmDNUvDLW/go-multihash"

"sync"
"time"

"github.com/OpenBazaar/openbazaar-go/ipfs"
"github.com/OpenBazaar/openbazaar-go/pb"
"github.com/OpenBazaar/openbazaar-go/repo"
"github.com/golang/protobuf/proto"
"github.com/golang/protobuf/ptypes"
"github.com/golang/protobuf/ptypes/any"
Expand Down Expand Up @@ -276,6 +277,11 @@ func (n *OpenBazaarNode) SendOrder(peerID string, contract *pb.RicardianContract
Payload: pbAny,
}

orderID0, _ := n.CalcOrderID(contract.BuyerOrder)
placer14 marked this conversation as resolved.
Show resolved Hide resolved
n.Datastore.Messages().Put(
fmt.Sprintf("%s-%d", orderID0, int(pb.Message_ORDER)),
orderID0, pb.Message_ORDER, peerID, repo.Message{Msg: m})

placer14 marked this conversation as resolved.
Show resolved Hide resolved
resp, err = n.Service.SendRequest(ctx, p, &m)
if err != nil {
return resp, err
Expand All @@ -302,6 +308,11 @@ func (n *OpenBazaarNode) SendOrderConfirmation(peerID string, contract *pb.Ricar
if err != nil {
return err
}
orderID0, _ := n.CalcOrderID(contract.BuyerOrder)
placer14 marked this conversation as resolved.
Show resolved Hide resolved
n.Datastore.Messages().Put(
placer14 marked this conversation as resolved.
Show resolved Hide resolved
fmt.Sprintf("%s-%d", orderID0, int(pb.Message_ORDER_CONFIRMATION)),
orderID0, pb.Message_ORDER_CONFIRMATION, peerID, repo.Message{Msg: m})

return n.sendMessage(peerID, &k, m)
}

Expand All @@ -324,6 +335,9 @@ func (n *OpenBazaarNode) SendCancel(peerID, orderID string) error {
}
kp = &k
}
n.Datastore.Messages().Put(
placer14 marked this conversation as resolved.
Show resolved Hide resolved
fmt.Sprintf("%s-%d", orderID, int(pb.Message_ORDER_CANCEL)),
orderID, pb.Message_ORDER_CANCEL, peerID, repo.Message{Msg: m})
return n.sendMessage(peerID, kp, m)
}

Expand All @@ -349,6 +363,9 @@ func (n *OpenBazaarNode) SendReject(peerID string, rejectMessage *pb.OrderReject
}
kp = &k
}
n.Datastore.Messages().Put(
placer14 marked this conversation as resolved.
Show resolved Hide resolved
fmt.Sprintf("%s-%d", rejectMessage.OrderID, int(pb.Message_ORDER_REJECT)),
rejectMessage.OrderID, pb.Message_ORDER_REJECT, peerID, repo.Message{Msg: m})
return n.sendMessage(peerID, kp, m)
}

Expand Down Expand Up @@ -379,6 +396,10 @@ func (n *OpenBazaarNode) SendOrderFulfillment(peerID string, k *libp2p.PubKey, f
MessageType: pb.Message_ORDER_FULFILLMENT,
Payload: a,
}
orderID0, _ := n.CalcOrderID(fulfillmentMessage.BuyerOrder)
n.Datastore.Messages().Put(
placer14 marked this conversation as resolved.
Show resolved Hide resolved
fmt.Sprintf("%s-%d", orderID0, int(pb.Message_ORDER_FULFILLMENT)),
orderID0, pb.Message_ORDER_FULFILLMENT, peerID, repo.Message{Msg: m})
return n.sendMessage(peerID, k, m)
}

Expand All @@ -395,6 +416,10 @@ func (n *OpenBazaarNode) SendOrderCompletion(peerID string, k *libp2p.PubKey, co
if err != nil {
return err
}
orderID0, _ := n.CalcOrderID(completionMessage.BuyerOrder)
n.Datastore.Messages().Put(
placer14 marked this conversation as resolved.
Show resolved Hide resolved
fmt.Sprintf("%s-%d", orderID0, int(pb.Message_ORDER_COMPLETION)),
orderID0, pb.Message_ORDER_COMPLETION, peerID, repo.Message{Msg: m})
return n.sendMessage(peerID, k, m)
}

Expand Down
6 changes: 6 additions & 0 deletions net/retriever/retriever.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,12 @@ func (m *MessageRetriever) Run() {
}
}

// RunOnce - used to fetch messages only once
func (m *MessageRetriever) RunOnce() {
go m.fetchPointers(true)
go m.fetchPointers(false)
}

func (m *MessageRetriever) fetchPointers(useDHT bool) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand Down
15 changes: 15 additions & 0 deletions repo/datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ type Datastore interface {
Coupons() CouponStore
TxMetadata() TransactionMetadataStore
ModeratedStores() ModeratedStore
Messages() MessageStore
Ping() error
Close()
}
Expand Down Expand Up @@ -431,3 +432,17 @@ type WatchedScriptStore interface {
Queryable
wallet.WatchedScripts
}

// MessageStore is the messages table interface
type MessageStore interface {
Queryable

// Save a new message
Put(messageID, orderID string, mType pb.Message_MessageType, peerID string, msg Message) error

// GetByOrderIDType returns the message for specified order and type
GetByOrderIDType(orderID string, mType pb.Message_MessageType) (*Message, string, error)

// GetByMessageIDType returns the message for specified message id
GetByMessageIDType(messageID string) (*Message, string, error)
}
7 changes: 7 additions & 0 deletions repo/db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ type SQLiteDatastore struct {
coupons repo.CouponStore
txMetadata repo.TransactionMetadataStore
moderatedStores repo.ModeratedStore
messages repo.MessageStore
db *sql.DB
lock *sync.Mutex
}
Expand Down Expand Up @@ -81,6 +82,7 @@ func NewSQLiteDatastore(db *sql.DB, l *sync.Mutex, coinType wallet.CoinType) *SQ
coupons: NewCouponStore(db, l),
txMetadata: NewTransactionMetadataStore(db, l),
moderatedStores: NewModeratedStore(db, l),
messages: NewMessageStore(db, l),
db: db,
lock: l,
}
Expand Down Expand Up @@ -183,6 +185,11 @@ func (d *SQLiteDatastore) ModeratedStores() repo.ModeratedStore {
return d.moderatedStores
}

// Messages - return the messages datastore
func (d *SQLiteDatastore) Messages() repo.MessageStore {
return d.messages
}

func (d *SQLiteDatastore) Copy(dbPath string, password string) error {
d.lock.Lock()
defer d.lock.Unlock()
Expand Down
Loading