From b5b179d91716b3309b66b772270361b130b5157f Mon Sep 17 00:00:00 2001 From: Ashwin Mangale Date: Fri, 17 May 2019 17:06:35 +0530 Subject: [PATCH 01/23] first pass at manually sending order messages and triggering offline message scan --- api/endpoints.go | 4 ++ api/jsonapi.go | 64 ++++++++++++++++++++ core/net.go | 12 ++++ net/retriever/retriever.go | 5 ++ repo/datastore.go | 12 ++++ repo/db/db.go | 6 ++ repo/db/order_messages.go | 89 ++++++++++++++++++++++++++++ repo/db/order_messages_test.go | 71 ++++++++++++++++++++++ repo/init.go | 2 +- repo/migration.go | 1 + repo/migrations/Migration022.go | 4 +- repo/migrations/Migration024.go | 75 +++++++++++++++++++++++ repo/migrations/Migration024_test.go | 1 + schema/constants.go | 1 + schema/manager.go | 1 + 15 files changed, 345 insertions(+), 3 deletions(-) create mode 100644 repo/db/order_messages.go create mode 100644 repo/db/order_messages_test.go create mode 100644 repo/migrations/Migration024.go create mode 100644 repo/migrations/Migration024_test.go diff --git a/api/endpoints.go b/api/endpoints.go index 6c0aa4073e..5f6b69d462 100644 --- a/api/endpoints.go +++ b/api/endpoints.go @@ -113,6 +113,8 @@ func post(i *jsonAPIHandler, path string, w http.ResponseWriter, r *http.Request i.POSTPost(w, r) case strings.HasPrefix(path, "/ob/bulkupdatecurrency"): i.POSTBulkUpdateCurrency(w, r) + case strings.HasPrefix(path, "/ob/sendordermessage"): + i.POSTSendOrderMessage(w, r) default: ErrorResponse(w, http.StatusNotFound, "Not Found") } @@ -201,6 +203,8 @@ func get(i *jsonAPIHandler, path string, w http.ResponseWriter, r *http.Request) i.GETPosts(w, r) case strings.HasPrefix(path, "/ob/post"): i.GETPost(w, r) + case strings.HasPrefix(path, "/ob/scanofflinemessages"): + i.GETScanOfflineMessages(w, r) default: ErrorResponse(w, http.StatusNotFound, "Not Found") } diff --git a/api/jsonapi.go b/api/jsonapi.go index 50056d13da..dfc7ac4b10 100644 --- a/api/jsonapi.go +++ b/api/jsonapi.go @@ -63,11 +63,14 @@ type jsonAPIHandler struct { node *core.OpenBazaarNode } +var lastManualScan time.Time + func newJSONAPIHandler(node *core.OpenBazaarNode, authCookie http.Cookie, config schema.APIConfig) *jsonAPIHandler { allowedIPs := make(map[string]bool) for _, ip := range config.AllowedIPs { allowedIPs[ip] = true } + lastManualScan = time.Now().Add(time.Duration(-10) * time.Minute) i := &jsonAPIHandler{ config: JSONAPIConfig{ Enabled: config.Enabled, @@ -4150,3 +4153,64 @@ func (i *jsonAPIHandler) GETPost(w http.ResponseWriter, r *http.Request) { } SanitizedResponseM(w, out, new(pb.SignedPost)) } + +// POSTSendOrderMessage - used to manually send an order message +func (i *jsonAPIHandler) POSTSendOrderMessage(w http.ResponseWriter, r *http.Request) { + type sendRequest struct { + OrderID string `json:"orderID"` + MessageType int32 `json:"messageType"` + } + + var args sendRequest + decoder := json.NewDecoder(r.Body) + err := decoder.Decode(&args) + if err != nil { + ErrorResponse(w, http.StatusBadRequest, err.Error()) + return + } + + if args.OrderID == "" { + ErrorResponse(w, http.StatusBadRequest, core.ErrOrderNotFound.Error()) + return + } + + if args.MessageType <= 0 { + ErrorResponse(w, http.StatusBadRequest, "invalid order message type") + return + } + + msg, peerID, err := i.node.Datastore.OrderMessages(). + GetByOrderIDType(args.OrderID, pb.Message_MessageType(args.MessageType)) + if err != nil { + ErrorResponse(w, http.StatusBadRequest, "order message not found") + return + } + + p, err := peer.IDB58Decode(peerID) + if err != nil { + ErrorResponse(w, http.StatusBadRequest, "invalid peer id") + return + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + err = i.node.Service.SendMessage(ctx, p, msg) + if err != nil { + ErrorResponse(w, http.StatusBadRequest, "order message not sent") + return + } + + SanitizedResponse(w, "") + return +} + +// GETScanOfflineMessages - used to manually trigger offline message scan +func (i *jsonAPIHandler) GETScanOfflineMessages(w http.ResponseWriter, r *http.Request) { + if time.Since(lastManualScan).Minutes() > 10 { + i.node.MessageRetriever.RunOnce() + lastManualScan = time.Now() + } + SanitizedResponse(w, "") + return +} diff --git a/core/net.go b/core/net.go index 72ce15784e..5896b5ae12 100644 --- a/core/net.go +++ b/core/net.go @@ -276,6 +276,9 @@ func (n *OpenBazaarNode) SendOrder(peerID string, contract *pb.RicardianContract Payload: pbAny, } + orderID0, _ := n.CalcOrderID(contract.BuyerOrder) + n.Datastore.OrderMessages().Put(orderID0, pb.Message_ORDER, peerID, m) + resp, err = n.Service.SendRequest(ctx, p, &m) if err != nil { return resp, err @@ -302,6 +305,9 @@ func (n *OpenBazaarNode) SendOrderConfirmation(peerID string, contract *pb.Ricar if err != nil { return err } + orderID0, _ := n.CalcOrderID(contract.BuyerOrder) + n.Datastore.OrderMessages().Put(orderID0, pb.Message_ORDER_CONFIRMATION, peerID, m) + return n.sendMessage(peerID, &k, m) } @@ -324,6 +330,7 @@ func (n *OpenBazaarNode) SendCancel(peerID, orderID string) error { } kp = &k } + n.Datastore.OrderMessages().Put(orderID, pb.Message_ORDER_CANCEL, peerID, m) return n.sendMessage(peerID, kp, m) } @@ -349,6 +356,7 @@ func (n *OpenBazaarNode) SendReject(peerID string, rejectMessage *pb.OrderReject } kp = &k } + n.Datastore.OrderMessages().Put(rejectMessage.OrderID, pb.Message_ORDER_REJECT, peerID, m) return n.sendMessage(peerID, kp, m) } @@ -379,6 +387,8 @@ func (n *OpenBazaarNode) SendOrderFulfillment(peerID string, k *libp2p.PubKey, f MessageType: pb.Message_ORDER_FULFILLMENT, Payload: a, } + orderID0, _ := n.CalcOrderID(fulfillmentMessage.BuyerOrder) + n.Datastore.OrderMessages().Put(orderID0, pb.Message_ORDER_FULFILLMENT, peerID, m) return n.sendMessage(peerID, k, m) } @@ -395,6 +405,8 @@ func (n *OpenBazaarNode) SendOrderCompletion(peerID string, k *libp2p.PubKey, co if err != nil { return err } + orderID0, _ := n.CalcOrderID(completionMessage.BuyerOrder) + n.Datastore.OrderMessages().Put(orderID0, pb.Message_ORDER_COMPLETION, peerID, m) return n.sendMessage(peerID, k, m) } diff --git a/net/retriever/retriever.go b/net/retriever/retriever.go index 86b556b979..25872e58b8 100644 --- a/net/retriever/retriever.go +++ b/net/retriever/retriever.go @@ -118,6 +118,11 @@ func (m *MessageRetriever) Run() { } } +func (m *MessageRetriever) RunOnce() { + go m.fetchPointers(true) + go m.fetchPointers(false) +} + func (m *MessageRetriever) fetchPointers(useDHT bool) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() diff --git a/repo/datastore.go b/repo/datastore.go index d3b787f3ab..032547035f 100644 --- a/repo/datastore.go +++ b/repo/datastore.go @@ -28,6 +28,7 @@ type Datastore interface { Coupons() CouponStore TxMetadata() TransactionMetadataStore ModeratedStores() ModeratedStore + OrderMessages() OrderMessageStore Ping() error Close() } @@ -431,3 +432,14 @@ type WatchedScriptStore interface { Queryable wallet.WatchedScripts } + +// OrderMessageStore is the order_messages table interface +type OrderMessageStore interface { + Queryable + + // Save a new order message + Put(orderID string, mType pb.Message_MessageType, peerID string, msg pb.Message) error + + // GetByOrderIDType returns the dispute payout data for a case + GetByOrderIDType(orderID string, mType pb.Message_MessageType) (*pb.Message, string, error) +} diff --git a/repo/db/db.go b/repo/db/db.go index c8da7eeac6..dffdeedbf4 100644 --- a/repo/db/db.go +++ b/repo/db/db.go @@ -36,6 +36,7 @@ type SQLiteDatastore struct { coupons repo.CouponStore txMetadata repo.TransactionMetadataStore moderatedStores repo.ModeratedStore + orderMessages repo.OrderMessageStore db *sql.DB lock *sync.Mutex } @@ -81,6 +82,7 @@ func NewSQLiteDatastore(db *sql.DB, l *sync.Mutex, coinType wallet.CoinType) *SQ coupons: NewCouponStore(db, l), txMetadata: NewTransactionMetadataStore(db, l), moderatedStores: NewModeratedStore(db, l), + orderMessages: NewOrderMessageStore(db, l), db: db, lock: l, } @@ -183,6 +185,10 @@ func (d *SQLiteDatastore) ModeratedStores() repo.ModeratedStore { return d.moderatedStores } +func (d *SQLiteDatastore) OrderMessages() repo.OrderMessageStore { + return d.orderMessages +} + func (d *SQLiteDatastore) Copy(dbPath string, password string) error { d.lock.Lock() defer d.lock.Unlock() diff --git a/repo/db/order_messages.go b/repo/db/order_messages.go new file mode 100644 index 0000000000..c4e37b9e80 --- /dev/null +++ b/repo/db/order_messages.go @@ -0,0 +1,89 @@ +package db + +import ( + "database/sql" + "encoding/json" + "fmt" + "sync" + "time" + + "github.com/OpenBazaar/openbazaar-go/pb" + "github.com/OpenBazaar/openbazaar-go/repo" +) + +type OrderMessagesDB struct { + modelStore +} + +func NewOrderMessageStore(db *sql.DB, lock *sync.Mutex) repo.OrderMessageStore { + return &OrderMessagesDB{modelStore{db, lock}} +} + +func (o *OrderMessagesDB) Put(orderID string, mType pb.Message_MessageType, peerID string, msg pb.Message) error { + o.lock.Lock() + defer o.lock.Unlock() + + tx, err := o.db.Begin() + if err != nil { + return err + } + stm := `insert or replace into order_messages(messageID, orderID, message_type, message, peerID, created_at) values(?,?,?,?,?,?)` + stmt, err := tx.Prepare(stm) + if err != nil { + return err + } + + msg0, err := json.Marshal(msg) + if err != nil { + fmt.Println("err marshaling : ", err) + } + + defer stmt.Close() + _, err = stmt.Exec( + fmt.Sprintf("%s-%d", orderID, int(mType)), + orderID, + int(mType), + msg0, + peerID, + int(time.Now().Unix()), + ) + if err != nil { + rErr := tx.Rollback() + if rErr != nil { + return fmt.Errorf("order_message put fail: %s\nand rollback failed: %s\n", err.Error(), rErr.Error()) + } + return err + } + + return tx.Commit() +} + +// GetByOrderIDType returns the dispute payout data for a case +func (o *OrderMessagesDB) GetByOrderIDType(orderID string, mType pb.Message_MessageType) (*pb.Message, string, error) { + o.lock.Lock() + defer o.lock.Unlock() + var ( + msg0 []byte + peerID string + ) + + stmt, err := o.db.Prepare("select message, peerID from order_messages where messageID=?") + if err != nil { + return nil, "", err + } + err = stmt.QueryRow(fmt.Sprintf("%s-%d", orderID, mType)).Scan(&msg0, &peerID) + if err != nil { + return nil, "", err + } + + msg := new(pb.Message) + + if len(msg0) > 0 { + err = json.Unmarshal(msg0, msg) + if err != nil { + return nil, "", err + } + } + + return msg, peerID, nil +} diff --git a/repo/db/order_messages_test.go b/repo/db/order_messages_test.go new file mode 100644 index 0000000000..5f478db1b4 --- /dev/null +++ b/repo/db/order_messages_test.go @@ -0,0 +1,71 @@ +package db_test + +import ( + "fmt" + "sync" + "testing" + + "github.com/golang/protobuf/ptypes/any" + + "github.com/OpenBazaar/openbazaar-go/pb" + "github.com/OpenBazaar/openbazaar-go/repo" + "github.com/OpenBazaar/openbazaar-go/repo/db" + "github.com/OpenBazaar/openbazaar-go/schema" +) + +func buildNewOrderMessageStore() (repo.OrderMessageStore, func(), error) { + appSchema := schema.MustNewCustomSchemaManager(schema.SchemaContext{ + DataPath: schema.GenerateTempPath(), + TestModeEnabled: true, + }) + if err := appSchema.BuildSchemaDirectories(); err != nil { + return nil, nil, err + } + if err := appSchema.InitializeDatabase(); err != nil { + return nil, nil, err + } + database, err := appSchema.OpenDatabase() + if err != nil { + return nil, nil, err + } + return db.NewOrderMessageStore(database, new(sync.Mutex)), appSchema.DestroySchemaDirectories, nil +} + +func TestOrderMessageDB_Put(t *testing.T) { + var ( + messagesdb, teardown, err = buildNewOrderMessageStore() + orderID = "orderID1" + mType = pb.Message_ORDER + payload = "sample message" + peerID = "jack" + ) + if err != nil { + t.Fatal(err) + } + defer teardown() + + msg := pb.Message{ + MessageType: mType, + Payload: &any.Any{Value: []byte(payload)}, + } + + err = messagesdb.Put(orderID, mType, peerID, msg) + if err != nil { + t.Error(err) + } + + retMsg, peer, err := messagesdb.GetByOrderIDType(orderID, mType) + if err != nil { + t.Error(err) + } + + fmt.Println(string(retMsg.Payload.Value), " ", peer) + + if !(string(retMsg.Payload.Value) == payload) { + t.Error("incorrect payload") + } + + if !(peer == peerID) { + t.Error("incorrect peerID") + } +} diff --git a/repo/init.go b/repo/init.go index d03d52ec31..be9edfca18 100644 --- a/repo/init.go +++ b/repo/init.go @@ -17,7 +17,7 @@ import ( "github.com/tyler-smith/go-bip39" ) -const RepoVersion = "24" +const RepoVersion = "25" var log = logging.MustGetLogger("repo") var ErrRepoExists = errors.New("IPFS configuration file exists. Reinitializing would overwrite your keys. Use -f to force overwrite.") diff --git a/repo/migration.go b/repo/migration.go index 9e95f859d2..5df24ae6b1 100644 --- a/repo/migration.go +++ b/repo/migration.go @@ -44,6 +44,7 @@ var ( migrations.Migration021{}, migrations.Migration022{}, migrations.Migration023{}, + migrations.Migration024{}, } ) diff --git a/repo/migrations/Migration022.go b/repo/migrations/Migration022.go index 21f6c9707d..acddd6b915 100644 --- a/repo/migrations/Migration022.go +++ b/repo/migrations/Migration022.go @@ -49,7 +49,7 @@ func (Migration022) Up(repoPath, dbPassword string, testnet bool) error { } if err := writeRepoVer(repoPath, 23); err != nil { - return fmt.Errorf("bumping repover to 18: %s", err.Error()) + return fmt.Errorf("bumping repover to 23: %s", err.Error()) } return nil } @@ -91,7 +91,7 @@ func (Migration022) Down(repoPath, dbPassword string, testnet bool) error { } if err := writeRepoVer(repoPath, 22); err != nil { - return fmt.Errorf("dropping repover to 16: %s", err.Error()) + return fmt.Errorf("dropping repover to 22: %s", err.Error()) } return nil } diff --git a/repo/migrations/Migration024.go b/repo/migrations/Migration024.go new file mode 100644 index 0000000000..34f5505a72 --- /dev/null +++ b/repo/migrations/Migration024.go @@ -0,0 +1,75 @@ +package migrations + +import ( + "database/sql" + "fmt" + "io/ioutil" + "os" + "path" +) + +const ( + AM06_messagesCreateSQL = "create table order_messages (messageID text primary key not null, orderID text, message_type integer, message blob, peerID text, url text, acknowledged bool, tries integer, created_at integer, updated_at integer);" + AM06_upVer = "25" + AM06_downVer = "24" +) + +type AM06 struct{} + +type Migration024 struct { + AM06 +} + +func createOrderMessages(repoPath, databasePassword, rVer string, testnetEnabled bool) error { + fmt.Println("in create order messages") + var ( + databaseFilePath string + repoVersionFilePath = path.Join(repoPath, "repover") + ) + if testnetEnabled { + databaseFilePath = path.Join(repoPath, "datastore", "testnet.db") + } else { + databaseFilePath = path.Join(repoPath, "datastore", "mainnet.db") + } + + db, err := sql.Open("sqlite3", databaseFilePath) + if err != nil { + return err + } + defer db.Close() + if databasePassword != "" { + p := fmt.Sprintf("pragma key = '%s';", databasePassword) + _, err := db.Exec(p) + if err != nil { + return err + } + } + + tx, err := db.Begin() + if err != nil { + return err + } + if _, err = tx.Exec(AM06_messagesCreateSQL); err != nil { + tx.Rollback() + return err + } + if err = tx.Commit(); err != nil { + return err + } + + // Bump schema version + err = ioutil.WriteFile(repoVersionFilePath, []byte(rVer), os.ModePerm) + if err != nil { + return err + } + return nil +} + +func (AM06) Up(repoPath, databasePassword string, testnetEnabled bool) error { + fmt.Println("in am06 up") + return createOrderMessages(repoPath, databasePassword, AM06_upVer, testnetEnabled) +} + +func (AM06) Down(repoPath, databasePassword string, testnetEnabled bool) error { + return createOrderMessages(repoPath, databasePassword, AM06_downVer, testnetEnabled) +} diff --git a/repo/migrations/Migration024_test.go b/repo/migrations/Migration024_test.go new file mode 100644 index 0000000000..e417c8ac61 --- /dev/null +++ b/repo/migrations/Migration024_test.go @@ -0,0 +1 @@ +package migrations_test diff --git a/schema/constants.go b/schema/constants.go index cb13cc06a7..9ebad08070 100644 --- a/schema/constants.go +++ b/schema/constants.go @@ -36,6 +36,7 @@ const ( CreateTableCouponsSQL = "create table coupons (slug text, code text, hash text);" CreateIndexCouponsSQL = "create index index_coupons on coupons (slug);" CreateTableModeratedStoresSQL = "create table moderatedstores (peerID text primary key not null);" + CreateOrderMessagesSQL = "create table order_messages (messageID text primary key not null, orderID text, message_type integer, message blob, peerID text, url text, acknowledged bool, tries integer, created_at integer, updated_at integer);" // End SQL Statements // Configuration defaults diff --git a/schema/manager.go b/schema/manager.go index b687cac7c3..7dae15c473 100644 --- a/schema/manager.go +++ b/schema/manager.go @@ -305,6 +305,7 @@ func InitializeDatabaseSQL(encryptionPassword string) string { CreateTableCouponsSQL, CreateIndexCouponsSQL, CreateTableModeratedStoresSQL, + CreateOrderMessagesSQL, } return strings.Join(initializeStatement, " ") } From 9cdef62319a859f0e2e5dfde74ad88b5f885a426 Mon Sep 17 00:00:00 2001 From: Ashwin Mangale Date: Fri, 17 May 2019 20:03:47 +0530 Subject: [PATCH 02/23] remove reduntant return statement --- api/jsonapi.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/api/jsonapi.go b/api/jsonapi.go index dfc7ac4b10..8283a903c8 100644 --- a/api/jsonapi.go +++ b/api/jsonapi.go @@ -4202,7 +4202,6 @@ func (i *jsonAPIHandler) POSTSendOrderMessage(w http.ResponseWriter, r *http.Req } SanitizedResponse(w, "") - return } // GETScanOfflineMessages - used to manually trigger offline message scan @@ -4212,5 +4211,4 @@ func (i *jsonAPIHandler) GETScanOfflineMessages(w http.ResponseWriter, r *http.R lastManualScan = time.Now() } SanitizedResponse(w, "") - return } From 02279dacce576327f54b3799fbe7458db2179989 Mon Sep 17 00:00:00 2001 From: Ashwin Mangale Date: Fri, 17 May 2019 20:54:29 +0530 Subject: [PATCH 03/23] lint fixes --- net/retriever/retriever.go | 1 + repo/db/db.go | 1 + repo/db/order_messages.go | 5 ++++- repo/migrations/Migration024.go | 20 +++++++++++++------- 4 files changed, 19 insertions(+), 8 deletions(-) diff --git a/net/retriever/retriever.go b/net/retriever/retriever.go index 25872e58b8..012210289f 100644 --- a/net/retriever/retriever.go +++ b/net/retriever/retriever.go @@ -118,6 +118,7 @@ func (m *MessageRetriever) Run() { } } +// RunOnce - used to fetch messages only once func (m *MessageRetriever) RunOnce() { go m.fetchPointers(true) go m.fetchPointers(false) diff --git a/repo/db/db.go b/repo/db/db.go index dffdeedbf4..faf571a5a7 100644 --- a/repo/db/db.go +++ b/repo/db/db.go @@ -185,6 +185,7 @@ func (d *SQLiteDatastore) ModeratedStores() repo.ModeratedStore { return d.moderatedStores } +// OrderMessages - return the orderMessages datastore func (d *SQLiteDatastore) OrderMessages() repo.OrderMessageStore { return d.orderMessages } diff --git a/repo/db/order_messages.go b/repo/db/order_messages.go index c4e37b9e80..1a6fd27ef0 100644 --- a/repo/db/order_messages.go +++ b/repo/db/order_messages.go @@ -11,14 +11,17 @@ import ( "github.com/OpenBazaar/openbazaar-go/repo" ) +// OrderMessagesDB - represents the order_messages table type OrderMessagesDB struct { modelStore } +// NewOrderMessageStore - return new OrderMessagesDB func NewOrderMessageStore(db *sql.DB, lock *sync.Mutex) repo.OrderMessageStore { return &OrderMessagesDB{modelStore{db, lock}} } +// Put - insert record into the order_messages func (o *OrderMessagesDB) Put(orderID string, mType pb.Message_MessageType, peerID string, msg pb.Message) error { o.lock.Lock() defer o.lock.Unlock() @@ -50,7 +53,7 @@ func (o *OrderMessagesDB) Put(orderID string, mType pb.Message_MessageType, peer if err != nil { rErr := tx.Rollback() if rErr != nil { - return fmt.Errorf("order_message put fail: %s\nand rollback failed: %s\n", err.Error(), rErr.Error()) + return fmt.Errorf("order_message put fail: %s and rollback failed: %s", err.Error(), rErr.Error()) } return err } diff --git a/repo/migrations/Migration024.go b/repo/migrations/Migration024.go index 34f5505a72..eb3a4a4a76 100644 --- a/repo/migrations/Migration024.go +++ b/repo/migrations/Migration024.go @@ -9,13 +9,18 @@ import ( ) const ( - AM06_messagesCreateSQL = "create table order_messages (messageID text primary key not null, orderID text, message_type integer, message blob, peerID text, url text, acknowledged bool, tries integer, created_at integer, updated_at integer);" - AM06_upVer = "25" - AM06_downVer = "24" + // AM06MessagesCreateSQL - the order_messages create sql + AM06MessagesCreateSQL = "create table order_messages (messageID text primary key not null, orderID text, message_type integer, message blob, peerID text, url text, acknowledged bool, tries integer, created_at integer, updated_at integer);" + // AM06UpVer - set the repo Up version + AM06UpVer = "25" + // AM06DownVer - set the repo Down version + AM06DownVer = "24" ) +// AM06 - local migration struct type AM06 struct{} +// Migration024 - migration struct type Migration024 struct { AM06 } @@ -49,7 +54,7 @@ func createOrderMessages(repoPath, databasePassword, rVer string, testnetEnabled if err != nil { return err } - if _, err = tx.Exec(AM06_messagesCreateSQL); err != nil { + if _, err = tx.Exec(AM06MessagesCreateSQL); err != nil { tx.Rollback() return err } @@ -65,11 +70,12 @@ func createOrderMessages(repoPath, databasePassword, rVer string, testnetEnabled return nil } +// Up - the migration Up code func (AM06) Up(repoPath, databasePassword string, testnetEnabled bool) error { - fmt.Println("in am06 up") - return createOrderMessages(repoPath, databasePassword, AM06_upVer, testnetEnabled) + return createOrderMessages(repoPath, databasePassword, AM06UpVer, testnetEnabled) } +// Down - the migration Down code func (AM06) Down(repoPath, databasePassword string, testnetEnabled bool) error { - return createOrderMessages(repoPath, databasePassword, AM06_downVer, testnetEnabled) + return createOrderMessages(repoPath, databasePassword, AM06DownVer, testnetEnabled) } From 7235b6849ff758a1a760f6d9f28d4319b9ab1972 Mon Sep 17 00:00:00 2001 From: Ashwin Mangale Date: Tue, 21 May 2019 21:29:28 +0530 Subject: [PATCH 04/23] rename order_messages to messages and in general make this pr inclusive of all messages --- api/endpoints.go | 4 +- api/jsonapi.go | 28 ++-- core/net.go | 30 +++-- repo/datastore.go | 15 ++- repo/db/db.go | 10 +- repo/db/messages.go | 122 ++++++++++++++++++ ...rder_messages_test.go => messages_test.go} | 10 +- repo/db/order_messages.go | 92 ------------- repo/migrations/Migration024.go | 25 ++-- schema/constants.go | 4 +- schema/manager.go | 4 +- 11 files changed, 202 insertions(+), 142 deletions(-) create mode 100644 repo/db/messages.go rename repo/db/{order_messages_test.go => messages_test.go} (79%) delete mode 100644 repo/db/order_messages.go diff --git a/api/endpoints.go b/api/endpoints.go index 5f6b69d462..c7d5038418 100644 --- a/api/endpoints.go +++ b/api/endpoints.go @@ -113,8 +113,8 @@ func post(i *jsonAPIHandler, path string, w http.ResponseWriter, r *http.Request i.POSTPost(w, r) case strings.HasPrefix(path, "/ob/bulkupdatecurrency"): i.POSTBulkUpdateCurrency(w, r) - case strings.HasPrefix(path, "/ob/sendordermessage"): - i.POSTSendOrderMessage(w, r) + case strings.HasPrefix(path, "/ob/resendordermessage"): + i.POSTResendOrderMessage(w, r) default: ErrorResponse(w, http.StatusNotFound, "Not Found") } diff --git a/api/jsonapi.go b/api/jsonapi.go index 8283a903c8..bf8bfbd197 100644 --- a/api/jsonapi.go +++ b/api/jsonapi.go @@ -70,7 +70,6 @@ func newJSONAPIHandler(node *core.OpenBazaarNode, authCookie http.Cookie, config for _, ip := range config.AllowedIPs { allowedIPs[ip] = true } - lastManualScan = time.Now().Add(time.Duration(-10) * time.Minute) i := &jsonAPIHandler{ config: JSONAPIConfig{ Enabled: config.Enabled, @@ -1478,7 +1477,7 @@ func (i *jsonAPIHandler) GETProfile(w http.ResponseWriter, r *http.Request) { return } if profile.PeerID != peerID { - ErrorResponse(w, http.StatusNotFound, err.Error()) + ErrorResponse(w, http.StatusNotFound, "incorrect peer id") return } w.Header().Set("Cache-Control", "public, max-age=600, immutable") @@ -1887,7 +1886,7 @@ func (i *jsonAPIHandler) GETModerators(w http.ResponseWriter, r *http.Request) { if err != nil { return } - i.node.Broadcast <- repo.PremarshalledNotifier{b} + i.node.Broadcast <- repo.PremarshalledNotifier{Payload: b} } else { type wsResp struct { ID string `json:"id"` @@ -1898,7 +1897,7 @@ func (i *jsonAPIHandler) GETModerators(w http.ResponseWriter, r *http.Request) { if err != nil { return } - i.node.Broadcast <- repo.PremarshalledNotifier{data} + i.node.Broadcast <- repo.PremarshalledNotifier{Payload: data} } }(p) } @@ -2822,7 +2821,7 @@ func (i *jsonAPIHandler) POSTFetchProfiles(w http.ResponseWriter, r *http.Reques if err != nil { return } - i.node.Broadcast <- repo.PremarshalledNotifier{ret} + i.node.Broadcast <- repo.PremarshalledNotifier{Payload: ret} } pro, err := i.node.FetchProfile(pid, useCache) @@ -2847,7 +2846,7 @@ func (i *jsonAPIHandler) POSTFetchProfiles(w http.ResponseWriter, r *http.Reques respondWithError("error Marshalling to JSON") return } - i.node.Broadcast <- repo.PremarshalledNotifier{b} + i.node.Broadcast <- repo.PremarshalledNotifier{Payload: b} }(p) } }() @@ -3597,7 +3596,7 @@ func (i *jsonAPIHandler) POSTFetchRatings(w http.ResponseWriter, r *http.Request if err != nil { return } - i.node.Broadcast <- repo.PremarshalledNotifier{ret} + i.node.Broadcast <- repo.PremarshalledNotifier{Payload: ret} } ratingBytes, err := ipfs.Cat(i.node.IpfsNode, rid, time.Minute) if err != nil { @@ -3636,7 +3635,7 @@ func (i *jsonAPIHandler) POSTFetchRatings(w http.ResponseWriter, r *http.Request respondWithError("error marshalling rating") return } - i.node.Broadcast <- repo.PremarshalledNotifier{b} + i.node.Broadcast <- repo.PremarshalledNotifier{Payload: b} }(r) } } @@ -4155,10 +4154,10 @@ func (i *jsonAPIHandler) GETPost(w http.ResponseWriter, r *http.Request) { } // POSTSendOrderMessage - used to manually send an order message -func (i *jsonAPIHandler) POSTSendOrderMessage(w http.ResponseWriter, r *http.Request) { +func (i *jsonAPIHandler) POSTResendOrderMessage(w http.ResponseWriter, r *http.Request) { type sendRequest struct { OrderID string `json:"orderID"` - MessageType int32 `json:"messageType"` + MessageType string `json:"messageType"` } var args sendRequest @@ -4174,13 +4173,16 @@ func (i *jsonAPIHandler) POSTSendOrderMessage(w http.ResponseWriter, r *http.Req return } - if args.MessageType <= 0 { + var msgType int32 + var ok bool + + if msgType, ok = pb.Message_MessageType_value[args.MessageType]; !ok { ErrorResponse(w, http.StatusBadRequest, "invalid order message type") return } - msg, peerID, err := i.node.Datastore.OrderMessages(). - GetByOrderIDType(args.OrderID, pb.Message_MessageType(args.MessageType)) + msg, peerID, err := i.node.Datastore.Messages(). + GetByOrderIDType(args.OrderID, pb.Message_MessageType(msgType)) if err != nil { ErrorResponse(w, http.StatusBadRequest, "order message not found") return diff --git a/core/net.go b/core/net.go index 5896b5ae12..a96691e25c 100644 --- a/core/net.go +++ b/core/net.go @@ -2,15 +2,15 @@ package core import ( "errors" + "fmt" + "sync" + "time" libp2p "gx/ipfs/QmTW4SdgBWq9GjsBsHeUx8WuGxzhgzAf88UMH2w62PC8yK/go-libp2p-crypto" "gx/ipfs/QmTbxNB1NwDesLmKTscr4udL2tVP7MaxvXnD1D9yX7g3PN/go-cid" "gx/ipfs/QmYVXrKrKHDC9FobgmcmshCDyWwdrfwfanNQN4oxJ9Fk3h/go-libp2p-peer" "gx/ipfs/QmerPMzPk1mJVowm8KgmoknWa4yCYvvugMPsgWmDNUvDLW/go-multihash" - "sync" - "time" - "github.com/OpenBazaar/openbazaar-go/ipfs" "github.com/OpenBazaar/openbazaar-go/pb" "github.com/golang/protobuf/proto" @@ -277,7 +277,9 @@ func (n *OpenBazaarNode) SendOrder(peerID string, contract *pb.RicardianContract } orderID0, _ := n.CalcOrderID(contract.BuyerOrder) - n.Datastore.OrderMessages().Put(orderID0, pb.Message_ORDER, peerID, m) + n.Datastore.Messages().Put( + fmt.Sprintf("%s-%d", orderID0, int(pb.Message_ORDER)), + orderID0, pb.Message_ORDER, peerID, m) resp, err = n.Service.SendRequest(ctx, p, &m) if err != nil { @@ -306,7 +308,9 @@ func (n *OpenBazaarNode) SendOrderConfirmation(peerID string, contract *pb.Ricar return err } orderID0, _ := n.CalcOrderID(contract.BuyerOrder) - n.Datastore.OrderMessages().Put(orderID0, pb.Message_ORDER_CONFIRMATION, peerID, m) + n.Datastore.Messages().Put( + fmt.Sprintf("%s-%d", orderID0, int(pb.Message_ORDER_CONFIRMATION)), + orderID0, pb.Message_ORDER_CONFIRMATION, peerID, m) return n.sendMessage(peerID, &k, m) } @@ -330,7 +334,9 @@ func (n *OpenBazaarNode) SendCancel(peerID, orderID string) error { } kp = &k } - n.Datastore.OrderMessages().Put(orderID, pb.Message_ORDER_CANCEL, peerID, m) + n.Datastore.Messages().Put( + fmt.Sprintf("%s-%d", orderID, int(pb.Message_ORDER_CANCEL)), + orderID, pb.Message_ORDER_CANCEL, peerID, m) return n.sendMessage(peerID, kp, m) } @@ -356,7 +362,9 @@ func (n *OpenBazaarNode) SendReject(peerID string, rejectMessage *pb.OrderReject } kp = &k } - n.Datastore.OrderMessages().Put(rejectMessage.OrderID, pb.Message_ORDER_REJECT, peerID, m) + n.Datastore.Messages().Put( + fmt.Sprintf("%s-%d", rejectMessage.OrderID, int(pb.Message_ORDER_REJECT)), + rejectMessage.OrderID, pb.Message_ORDER_REJECT, peerID, m) return n.sendMessage(peerID, kp, m) } @@ -388,7 +396,9 @@ func (n *OpenBazaarNode) SendOrderFulfillment(peerID string, k *libp2p.PubKey, f Payload: a, } orderID0, _ := n.CalcOrderID(fulfillmentMessage.BuyerOrder) - n.Datastore.OrderMessages().Put(orderID0, pb.Message_ORDER_FULFILLMENT, peerID, m) + n.Datastore.Messages().Put( + fmt.Sprintf("%s-%d", orderID0, int(pb.Message_ORDER_FULFILLMENT)), + orderID0, pb.Message_ORDER_FULFILLMENT, peerID, m) return n.sendMessage(peerID, k, m) } @@ -406,7 +416,9 @@ func (n *OpenBazaarNode) SendOrderCompletion(peerID string, k *libp2p.PubKey, co return err } orderID0, _ := n.CalcOrderID(completionMessage.BuyerOrder) - n.Datastore.OrderMessages().Put(orderID0, pb.Message_ORDER_COMPLETION, peerID, m) + n.Datastore.Messages().Put( + fmt.Sprintf("%s-%d", orderID0, int(pb.Message_ORDER_COMPLETION)), + orderID0, pb.Message_ORDER_COMPLETION, peerID, m) return n.sendMessage(peerID, k, m) } diff --git a/repo/datastore.go b/repo/datastore.go index 032547035f..5b48ab2c69 100644 --- a/repo/datastore.go +++ b/repo/datastore.go @@ -28,7 +28,7 @@ type Datastore interface { Coupons() CouponStore TxMetadata() TransactionMetadataStore ModeratedStores() ModeratedStore - OrderMessages() OrderMessageStore + Messages() MessageStore Ping() error Close() } @@ -433,13 +433,16 @@ type WatchedScriptStore interface { wallet.WatchedScripts } -// OrderMessageStore is the order_messages table interface -type OrderMessageStore interface { +// MessageStore is the messages table interface +type MessageStore interface { Queryable - // Save a new order message - Put(orderID string, mType pb.Message_MessageType, peerID string, msg pb.Message) error + // Save a new message + Put(messageID, orderID string, mType pb.Message_MessageType, peerID string, msg pb.Message) error - // GetByOrderIDType returns the dispute payout data for a case + // GetByOrderIDType returns the message for specified order and type GetByOrderIDType(orderID string, mType pb.Message_MessageType) (*pb.Message, string, error) + + // GetByMessageIDType returns the message for specified message id + GetByMessageIDType(messageID string) (*pb.Message, string, error) } diff --git a/repo/db/db.go b/repo/db/db.go index faf571a5a7..d54ddac0ba 100644 --- a/repo/db/db.go +++ b/repo/db/db.go @@ -36,7 +36,7 @@ type SQLiteDatastore struct { coupons repo.CouponStore txMetadata repo.TransactionMetadataStore moderatedStores repo.ModeratedStore - orderMessages repo.OrderMessageStore + messages repo.MessageStore db *sql.DB lock *sync.Mutex } @@ -82,7 +82,7 @@ func NewSQLiteDatastore(db *sql.DB, l *sync.Mutex, coinType wallet.CoinType) *SQ coupons: NewCouponStore(db, l), txMetadata: NewTransactionMetadataStore(db, l), moderatedStores: NewModeratedStore(db, l), - orderMessages: NewOrderMessageStore(db, l), + messages: NewMessageStore(db, l), db: db, lock: l, } @@ -185,9 +185,9 @@ func (d *SQLiteDatastore) ModeratedStores() repo.ModeratedStore { return d.moderatedStores } -// OrderMessages - return the orderMessages datastore -func (d *SQLiteDatastore) OrderMessages() repo.OrderMessageStore { - return d.orderMessages +// Messages - return the messages datastore +func (d *SQLiteDatastore) Messages() repo.MessageStore { + return d.messages } func (d *SQLiteDatastore) Copy(dbPath string, password string) error { diff --git a/repo/db/messages.go b/repo/db/messages.go new file mode 100644 index 0000000000..7d1c636b14 --- /dev/null +++ b/repo/db/messages.go @@ -0,0 +1,122 @@ +package db + +import ( + "database/sql" + "encoding/json" + "fmt" + "sync" + "time" + + "github.com/OpenBazaar/openbazaar-go/pb" + "github.com/OpenBazaar/openbazaar-go/repo" +) + +// MessagesDB - represents the messages table +type MessagesDB struct { + modelStore +} + +// NewMessageStore - return new MessagesDB +func NewMessageStore(db *sql.DB, lock *sync.Mutex) repo.MessageStore { + return &MessagesDB{modelStore{db, lock}} +} + +// Put - insert record into the messages +func (o *MessagesDB) Put(messageID, orderID string, mType pb.Message_MessageType, peerID string, msg pb.Message) error { + o.lock.Lock() + defer o.lock.Unlock() + + tx, err := o.db.Begin() + if err != nil { + return err + } + stm := `insert or replace into messages(messageID, orderID, message_type, message, peerID, created_at) values(?,?,?,?,?,?)` + stmt, err := tx.Prepare(stm) + if err != nil { + return err + } + + msg0, err := json.Marshal(msg) + if err != nil { + fmt.Println("err marshaling : ", err) + } + + defer stmt.Close() + _, err = stmt.Exec( + messageID, + orderID, + int(mType), + msg0, + peerID, + int(time.Now().Unix()), + ) + if err != nil { + rErr := tx.Rollback() + if rErr != nil { + return fmt.Errorf("message put fail: %s and rollback failed: %s", err.Error(), rErr.Error()) + } + return err + } + + return tx.Commit() +} + +// GetByOrderIDType returns the message for the specified order and message type +func (o *MessagesDB) GetByOrderIDType(orderID string, mType pb.Message_MessageType) (*pb.Message, string, error) { + o.lock.Lock() + defer o.lock.Unlock() + var ( + msg0 []byte + peerID string + ) + + stmt, err := o.db.Prepare("select message, peerID from messages where orderID=? and message_type=?") + if err != nil { + return nil, "", err + } + err = stmt.QueryRow(orderID, mType).Scan(&msg0, &peerID) + if err != nil { + return nil, "", err + } + + msg := new(pb.Message) + + if len(msg0) > 0 { + err = json.Unmarshal(msg0, msg) + if err != nil { + return nil, "", err + } + } + + return msg, peerID, nil +} + +// GetByMessageIDType returns the message for the specified message id +func (o *MessagesDB) GetByMessageIDType(messageID string) (*pb.Message, string, error) { + o.lock.Lock() + defer o.lock.Unlock() + var ( + msg0 []byte + peerID string + ) + + stmt, err := o.db.Prepare("select message, peerID from messages where messageID=?") + if err != nil { + return nil, "", err + } + err = stmt.QueryRow(messageID).Scan(&msg0, &peerID) + if err != nil { + return nil, "", err + } + + msg := new(pb.Message) + + if len(msg0) > 0 { + err = json.Unmarshal(msg0, msg) + if err != nil { + return nil, "", err + } + } + + return msg, peerID, nil +} diff --git a/repo/db/order_messages_test.go b/repo/db/messages_test.go similarity index 79% rename from repo/db/order_messages_test.go rename to repo/db/messages_test.go index 5f478db1b4..55bc13f055 100644 --- a/repo/db/order_messages_test.go +++ b/repo/db/messages_test.go @@ -13,7 +13,7 @@ import ( "github.com/OpenBazaar/openbazaar-go/schema" ) -func buildNewOrderMessageStore() (repo.OrderMessageStore, func(), error) { +func buildNewMessageStore() (repo.MessageStore, func(), error) { appSchema := schema.MustNewCustomSchemaManager(schema.SchemaContext{ DataPath: schema.GenerateTempPath(), TestModeEnabled: true, @@ -28,12 +28,12 @@ func buildNewOrderMessageStore() (repo.OrderMessageStore, func(), error) { if err != nil { return nil, nil, err } - return db.NewOrderMessageStore(database, new(sync.Mutex)), appSchema.DestroySchemaDirectories, nil + return db.NewMessageStore(database, new(sync.Mutex)), appSchema.DestroySchemaDirectories, nil } -func TestOrderMessageDB_Put(t *testing.T) { +func TestMessageDB_Put(t *testing.T) { var ( - messagesdb, teardown, err = buildNewOrderMessageStore() + messagesdb, teardown, err = buildNewMessageStore() orderID = "orderID1" mType = pb.Message_ORDER payload = "sample message" @@ -49,7 +49,7 @@ func TestOrderMessageDB_Put(t *testing.T) { Payload: &any.Any{Value: []byte(payload)}, } - err = messagesdb.Put(orderID, mType, peerID, msg) + err = messagesdb.Put(fmt.Sprintf("%s-%d", orderID, mType), orderID, mType, peerID, msg) if err != nil { t.Error(err) } diff --git a/repo/db/order_messages.go b/repo/db/order_messages.go deleted file mode 100644 index 1a6fd27ef0..0000000000 --- a/repo/db/order_messages.go +++ /dev/null @@ -1,92 +0,0 @@ -package db - -import ( - "database/sql" - "encoding/json" - "fmt" - "sync" - "time" - - "github.com/OpenBazaar/openbazaar-go/pb" - "github.com/OpenBazaar/openbazaar-go/repo" -) - -// OrderMessagesDB - represents the order_messages table -type OrderMessagesDB struct { - modelStore -} - -// NewOrderMessageStore - return new OrderMessagesDB -func NewOrderMessageStore(db *sql.DB, lock *sync.Mutex) repo.OrderMessageStore { - return &OrderMessagesDB{modelStore{db, lock}} -} - -// Put - insert record into the order_messages -func (o *OrderMessagesDB) Put(orderID string, mType pb.Message_MessageType, peerID string, msg pb.Message) error { - o.lock.Lock() - defer o.lock.Unlock() - - tx, err := o.db.Begin() - if err != nil { - return err - } - stm := `insert or replace into order_messages(messageID, orderID, message_type, message, peerID, created_at) values(?,?,?,?,?,?)` - stmt, err := tx.Prepare(stm) - if err != nil { - return err - } - - msg0, err := json.Marshal(msg) - if err != nil { - fmt.Println("err marshaling : ", err) - } - - defer stmt.Close() - _, err = stmt.Exec( - fmt.Sprintf("%s-%d", orderID, int(mType)), - orderID, - int(mType), - msg0, - peerID, - int(time.Now().Unix()), - ) - if err != nil { - rErr := tx.Rollback() - if rErr != nil { - return fmt.Errorf("order_message put fail: %s and rollback failed: %s", err.Error(), rErr.Error()) - } - return err - } - - return tx.Commit() -} - -// GetByOrderIDType returns the dispute payout data for a case -func (o *OrderMessagesDB) GetByOrderIDType(orderID string, mType pb.Message_MessageType) (*pb.Message, string, error) { - o.lock.Lock() - defer o.lock.Unlock() - var ( - msg0 []byte - peerID string - ) - - stmt, err := o.db.Prepare("select message, peerID from order_messages where messageID=?") - if err != nil { - return nil, "", err - } - err = stmt.QueryRow(fmt.Sprintf("%s-%d", orderID, mType)).Scan(&msg0, &peerID) - if err != nil { - return nil, "", err - } - - msg := new(pb.Message) - - if len(msg0) > 0 { - err = json.Unmarshal(msg0, msg) - if err != nil { - return nil, "", err - } - } - - return msg, peerID, nil -} diff --git a/repo/migrations/Migration024.go b/repo/migrations/Migration024.go index eb3a4a4a76..4ebd758d23 100644 --- a/repo/migrations/Migration024.go +++ b/repo/migrations/Migration024.go @@ -9,12 +9,14 @@ import ( ) const ( - // AM06MessagesCreateSQL - the order_messages create sql - AM06MessagesCreateSQL = "create table order_messages (messageID text primary key not null, orderID text, message_type integer, message blob, peerID text, url text, acknowledged bool, tries integer, created_at integer, updated_at integer);" + // AM06MessagesCreateSQL - the messages create sql + AM06MessagesCreateSQL = "create table messages (messageID text primary key not null, orderID text, message_type integer, message blob, peerID text, url text, acknowledged bool, tries integer, created_at integer, updated_at integer);" + AM06CreateIndexMessagesSQL1 = "create index index_messages1 on messages (messageID);" + AM06CreateIndexMessagesSQL2 = "create index index_messages2 on messages (orderID, message_type);" // AM06UpVer - set the repo Up version - AM06UpVer = "25" + AM06UpVer = "25" // AM06DownVer - set the repo Down version - AM06DownVer = "24" + AM06DownVer = "24" ) // AM06 - local migration struct @@ -25,8 +27,7 @@ type Migration024 struct { AM06 } -func createOrderMessages(repoPath, databasePassword, rVer string, testnetEnabled bool) error { - fmt.Println("in create order messages") +func createMessages(repoPath, databasePassword, rVer string, testnetEnabled bool) error { var ( databaseFilePath string repoVersionFilePath = path.Join(repoPath, "repover") @@ -58,6 +59,14 @@ func createOrderMessages(repoPath, databasePassword, rVer string, testnetEnabled tx.Rollback() return err } + if _, err = tx.Exec(AM06CreateIndexMessagesSQL1); err != nil { + tx.Rollback() + return err + } + if _, err = tx.Exec(AM06CreateIndexMessagesSQL2); err != nil { + tx.Rollback() + return err + } if err = tx.Commit(); err != nil { return err } @@ -72,10 +81,10 @@ func createOrderMessages(repoPath, databasePassword, rVer string, testnetEnabled // Up - the migration Up code func (AM06) Up(repoPath, databasePassword string, testnetEnabled bool) error { - return createOrderMessages(repoPath, databasePassword, AM06UpVer, testnetEnabled) + return createMessages(repoPath, databasePassword, AM06UpVer, testnetEnabled) } // Down - the migration Down code func (AM06) Down(repoPath, databasePassword string, testnetEnabled bool) error { - return createOrderMessages(repoPath, databasePassword, AM06DownVer, testnetEnabled) + return createMessages(repoPath, databasePassword, AM06DownVer, testnetEnabled) } diff --git a/schema/constants.go b/schema/constants.go index 9ebad08070..76b1d40aab 100644 --- a/schema/constants.go +++ b/schema/constants.go @@ -36,7 +36,9 @@ const ( CreateTableCouponsSQL = "create table coupons (slug text, code text, hash text);" CreateIndexCouponsSQL = "create index index_coupons on coupons (slug);" CreateTableModeratedStoresSQL = "create table moderatedstores (peerID text primary key not null);" - CreateOrderMessagesSQL = "create table order_messages (messageID text primary key not null, orderID text, message_type integer, message blob, peerID text, url text, acknowledged bool, tries integer, created_at integer, updated_at integer);" + CreateMessagesSQL = "create table messages (messageID text primary key not null, orderID text, message_type integer, message blob, peerID text, url text, acknowledged bool, tries integer, created_at integer, updated_at integer);" + CreateIndexMessagesSQL1 = "create index index_messages1 on messages (messageID);" + CreateIndexMessagesSQL2 = "create index index_messages2 on messages (orderID, message_type);" // End SQL Statements // Configuration defaults diff --git a/schema/manager.go b/schema/manager.go index 7dae15c473..1b187fe39c 100644 --- a/schema/manager.go +++ b/schema/manager.go @@ -305,7 +305,9 @@ func InitializeDatabaseSQL(encryptionPassword string) string { CreateTableCouponsSQL, CreateIndexCouponsSQL, CreateTableModeratedStoresSQL, - CreateOrderMessagesSQL, + CreateMessagesSQL, + CreateIndexMessagesSQL1, + CreateIndexMessagesSQL2, } return strings.Join(initializeStatement, " ") } From ee1b99970479083c8fbb5595ec2d6bd800c1d248 Mon Sep 17 00:00:00 2001 From: Ashwin Mangale Date: Wed, 22 May 2019 21:03:05 +0530 Subject: [PATCH 05/23] add repo.Message as a wrapper for pb.Message --- repo/message.go | 35 ++++++++++++++++++++++++++++++++++ repo/message_test.go | 45 ++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 80 insertions(+) create mode 100644 repo/message.go create mode 100644 repo/message_test.go diff --git a/repo/message.go b/repo/message.go new file mode 100644 index 0000000000..378337ccc8 --- /dev/null +++ b/repo/message.go @@ -0,0 +1,35 @@ +package repo + +import ( + "encoding/json" + "errors" + "fmt" + + "github.com/golang/protobuf/ptypes/any" + + "github.com/OpenBazaar/openbazaar-go/pb" +) + +var ErrUnknownMessage = errors.New("unknown or invalid message") + +type Message struct { + Msg pb.Message +} + +func (m *Message) MarshalJSON() ([]byte, error) { + fmt.Println("in marshal json") + return json.Marshal(m.Msg) +} + +func (m *Message) UnmarshalJSON(b []byte) error { + fmt.Println("in unmarshal json") + return json.Unmarshal(b, &m.Msg) +} + +func (m *Message) GetMessageType() pb.Message_MessageType { + return m.Msg.MessageType +} + +func (m *Message) GetPayload() *any.Any { + return m.Msg.Payload +} diff --git a/repo/message_test.go b/repo/message_test.go new file mode 100644 index 0000000000..3aa96908fd --- /dev/null +++ b/repo/message_test.go @@ -0,0 +1,45 @@ +package repo_test + +import ( + "fmt" + "testing" + + "github.com/golang/protobuf/ptypes/any" + + "github.com/OpenBazaar/openbazaar-go/pb" + "github.com/OpenBazaar/openbazaar-go/repo" +) + +func TestMessage(t *testing.T) { + var ( + mType = pb.Message_ORDER + payload = "sample message" + ) + + msg := pb.Message{ + MessageType: mType, + Payload: &any.Any{Value: []byte(payload)}, + } + + repoMsg := repo.Message{Msg: msg} + + repoMsgBytes, err := repoMsg.MarshalJSON() + if err != nil { + t.Error(err) + } + + var retRepoMsg repo.Message + + err = retRepoMsg.UnmarshalJSON(repoMsgBytes) + if err != nil { + t.Error(err) + } + + //fmt.Println(retRepoMsg.GetMessageType()) + //fmt.Println(retRepoMsg.GetPayload()) + + if retRepoMsg.GetMessageType() != pb.Message_ORDER { + t.Error("wrong msg type") + } + +} From 1ef434295c04da266a880286f574866b4312ec33 Mon Sep 17 00:00:00 2001 From: Ashwin Mangale Date: Wed, 22 May 2019 21:05:08 +0530 Subject: [PATCH 06/23] add repo.Message as a wrapper for pb.Message --- repo/message.go | 3 --- repo/message_test.go | 4 ---- 2 files changed, 7 deletions(-) diff --git a/repo/message.go b/repo/message.go index 378337ccc8..bff2fe8cc3 100644 --- a/repo/message.go +++ b/repo/message.go @@ -3,7 +3,6 @@ package repo import ( "encoding/json" "errors" - "fmt" "github.com/golang/protobuf/ptypes/any" @@ -17,12 +16,10 @@ type Message struct { } func (m *Message) MarshalJSON() ([]byte, error) { - fmt.Println("in marshal json") return json.Marshal(m.Msg) } func (m *Message) UnmarshalJSON(b []byte) error { - fmt.Println("in unmarshal json") return json.Unmarshal(b, &m.Msg) } diff --git a/repo/message_test.go b/repo/message_test.go index 3aa96908fd..04bfcf66d9 100644 --- a/repo/message_test.go +++ b/repo/message_test.go @@ -1,7 +1,6 @@ package repo_test import ( - "fmt" "testing" "github.com/golang/protobuf/ptypes/any" @@ -35,9 +34,6 @@ func TestMessage(t *testing.T) { t.Error(err) } - //fmt.Println(retRepoMsg.GetMessageType()) - //fmt.Println(retRepoMsg.GetPayload()) - if retRepoMsg.GetMessageType() != pb.Message_ORDER { t.Error("wrong msg type") } From bb5f5fe91446d7ab12764a67f97d96abaec9f34e Mon Sep 17 00:00:00 2001 From: Ashwin Mangale Date: Wed, 22 May 2019 21:22:25 +0530 Subject: [PATCH 07/23] use repo.Message in the messages datastore --- repo/datastore.go | 6 +++--- repo/db/messages.go | 14 +++++++------- repo/db/messages_test.go | 14 ++++++++------ 3 files changed, 18 insertions(+), 16 deletions(-) diff --git a/repo/datastore.go b/repo/datastore.go index 5b48ab2c69..36702671ba 100644 --- a/repo/datastore.go +++ b/repo/datastore.go @@ -438,11 +438,11 @@ type MessageStore interface { Queryable // Save a new message - Put(messageID, orderID string, mType pb.Message_MessageType, peerID string, msg pb.Message) error + Put(messageID, orderID string, mType pb.Message_MessageType, peerID string, msg Message) error // GetByOrderIDType returns the message for specified order and type - GetByOrderIDType(orderID string, mType pb.Message_MessageType) (*pb.Message, string, error) + GetByOrderIDType(orderID string, mType pb.Message_MessageType) (*Message, string, error) // GetByMessageIDType returns the message for specified message id - GetByMessageIDType(messageID string) (*pb.Message, string, error) + GetByMessageIDType(messageID string) (*Message, string, error) } diff --git a/repo/db/messages.go b/repo/db/messages.go index 7d1c636b14..cde68d7c15 100644 --- a/repo/db/messages.go +++ b/repo/db/messages.go @@ -22,7 +22,7 @@ func NewMessageStore(db *sql.DB, lock *sync.Mutex) repo.MessageStore { } // Put - insert record into the messages -func (o *MessagesDB) Put(messageID, orderID string, mType pb.Message_MessageType, peerID string, msg pb.Message) error { +func (o *MessagesDB) Put(messageID, orderID string, mType pb.Message_MessageType, peerID string, msg repo.Message) error { o.lock.Lock() defer o.lock.Unlock() @@ -36,7 +36,7 @@ func (o *MessagesDB) Put(messageID, orderID string, mType pb.Message_MessageType return err } - msg0, err := json.Marshal(msg) + msg0, err := msg.MarshalJSON() if err != nil { fmt.Println("err marshaling : ", err) } @@ -62,7 +62,7 @@ func (o *MessagesDB) Put(messageID, orderID string, mType pb.Message_MessageType } // GetByOrderIDType returns the message for the specified order and message type -func (o *MessagesDB) GetByOrderIDType(orderID string, mType pb.Message_MessageType) (*pb.Message, string, error) { +func (o *MessagesDB) GetByOrderIDType(orderID string, mType pb.Message_MessageType) (*repo.Message, string, error) { o.lock.Lock() defer o.lock.Unlock() var ( @@ -79,10 +79,10 @@ func (o *MessagesDB) GetByOrderIDType(orderID string, mType pb.Message_MessageTy return nil, "", err } - msg := new(pb.Message) + msg := new(repo.Message) if len(msg0) > 0 { - err = json.Unmarshal(msg0, msg) + err = msg.UnmarshalJSON(msg0) if err != nil { return nil, "", err } @@ -92,7 +92,7 @@ func (o *MessagesDB) GetByOrderIDType(orderID string, mType pb.Message_MessageTy } // GetByMessageIDType returns the message for the specified message id -func (o *MessagesDB) GetByMessageIDType(messageID string) (*pb.Message, string, error) { +func (o *MessagesDB) GetByMessageIDType(messageID string) (*repo.Message, string, error) { o.lock.Lock() defer o.lock.Unlock() var ( @@ -109,7 +109,7 @@ func (o *MessagesDB) GetByMessageIDType(messageID string) (*pb.Message, string, return nil, "", err } - msg := new(pb.Message) + msg := new(repo.Message) if len(msg0) > 0 { err = json.Unmarshal(msg0, msg) diff --git a/repo/db/messages_test.go b/repo/db/messages_test.go index 55bc13f055..ddac99754e 100644 --- a/repo/db/messages_test.go +++ b/repo/db/messages_test.go @@ -44,9 +44,11 @@ func TestMessageDB_Put(t *testing.T) { } defer teardown() - msg := pb.Message{ - MessageType: mType, - Payload: &any.Any{Value: []byte(payload)}, + msg := repo.Message{ + Msg: pb.Message{ + MessageType: mType, + Payload: &any.Any{Value: []byte(payload)}, + }, } err = messagesdb.Put(fmt.Sprintf("%s-%d", orderID, mType), orderID, mType, peerID, msg) @@ -55,13 +57,13 @@ func TestMessageDB_Put(t *testing.T) { } retMsg, peer, err := messagesdb.GetByOrderIDType(orderID, mType) - if err != nil { + if err != nil || retMsg == nil { t.Error(err) } - fmt.Println(string(retMsg.Payload.Value), " ", peer) + fmt.Println(string(retMsg.GetPayload().Value), " ", peer) - if !(string(retMsg.Payload.Value) == payload) { + if !(string(retMsg.GetPayload().Value) == payload) { t.Error("incorrect payload") } From d18ec20e72c24eda755c2300afcda59c2c30aab1 Mon Sep 17 00:00:00 2001 From: Ashwin Mangale Date: Wed, 22 May 2019 23:38:03 +0530 Subject: [PATCH 08/23] use repo.Message in message handler and the resend api endpoint --- api/jsonapi.go | 4 ++-- core/net.go | 13 +++++++------ 2 files changed, 9 insertions(+), 8 deletions(-) diff --git a/api/jsonapi.go b/api/jsonapi.go index bf8bfbd197..e41e4a6ede 100644 --- a/api/jsonapi.go +++ b/api/jsonapi.go @@ -4183,7 +4183,7 @@ func (i *jsonAPIHandler) POSTResendOrderMessage(w http.ResponseWriter, r *http.R msg, peerID, err := i.node.Datastore.Messages(). GetByOrderIDType(args.OrderID, pb.Message_MessageType(msgType)) - if err != nil { + if err != nil || msg == nil || msg.Msg.GetPayload() == nil { ErrorResponse(w, http.StatusBadRequest, "order message not found") return } @@ -4197,7 +4197,7 @@ func (i *jsonAPIHandler) POSTResendOrderMessage(w http.ResponseWriter, r *http.R ctx, cancel := context.WithCancel(context.Background()) defer cancel() - err = i.node.Service.SendMessage(ctx, p, msg) + err = i.node.Service.SendMessage(ctx, p, &msg.Msg) if err != nil { ErrorResponse(w, http.StatusBadRequest, "order message not sent") return diff --git a/core/net.go b/core/net.go index a96691e25c..81b97aee92 100644 --- a/core/net.go +++ b/core/net.go @@ -13,6 +13,7 @@ import ( "github.com/OpenBazaar/openbazaar-go/ipfs" "github.com/OpenBazaar/openbazaar-go/pb" + "github.com/OpenBazaar/openbazaar-go/repo" "github.com/golang/protobuf/proto" "github.com/golang/protobuf/ptypes" "github.com/golang/protobuf/ptypes/any" @@ -279,7 +280,7 @@ func (n *OpenBazaarNode) SendOrder(peerID string, contract *pb.RicardianContract orderID0, _ := n.CalcOrderID(contract.BuyerOrder) n.Datastore.Messages().Put( fmt.Sprintf("%s-%d", orderID0, int(pb.Message_ORDER)), - orderID0, pb.Message_ORDER, peerID, m) + orderID0, pb.Message_ORDER, peerID, repo.Message{Msg: m}) resp, err = n.Service.SendRequest(ctx, p, &m) if err != nil { @@ -310,7 +311,7 @@ func (n *OpenBazaarNode) SendOrderConfirmation(peerID string, contract *pb.Ricar orderID0, _ := n.CalcOrderID(contract.BuyerOrder) n.Datastore.Messages().Put( fmt.Sprintf("%s-%d", orderID0, int(pb.Message_ORDER_CONFIRMATION)), - orderID0, pb.Message_ORDER_CONFIRMATION, peerID, m) + orderID0, pb.Message_ORDER_CONFIRMATION, peerID, repo.Message{Msg: m}) return n.sendMessage(peerID, &k, m) } @@ -336,7 +337,7 @@ func (n *OpenBazaarNode) SendCancel(peerID, orderID string) error { } n.Datastore.Messages().Put( fmt.Sprintf("%s-%d", orderID, int(pb.Message_ORDER_CANCEL)), - orderID, pb.Message_ORDER_CANCEL, peerID, m) + orderID, pb.Message_ORDER_CANCEL, peerID, repo.Message{Msg: m}) return n.sendMessage(peerID, kp, m) } @@ -364,7 +365,7 @@ func (n *OpenBazaarNode) SendReject(peerID string, rejectMessage *pb.OrderReject } n.Datastore.Messages().Put( fmt.Sprintf("%s-%d", rejectMessage.OrderID, int(pb.Message_ORDER_REJECT)), - rejectMessage.OrderID, pb.Message_ORDER_REJECT, peerID, m) + rejectMessage.OrderID, pb.Message_ORDER_REJECT, peerID, repo.Message{Msg: m}) return n.sendMessage(peerID, kp, m) } @@ -398,7 +399,7 @@ func (n *OpenBazaarNode) SendOrderFulfillment(peerID string, k *libp2p.PubKey, f orderID0, _ := n.CalcOrderID(fulfillmentMessage.BuyerOrder) n.Datastore.Messages().Put( fmt.Sprintf("%s-%d", orderID0, int(pb.Message_ORDER_FULFILLMENT)), - orderID0, pb.Message_ORDER_FULFILLMENT, peerID, m) + orderID0, pb.Message_ORDER_FULFILLMENT, peerID, repo.Message{Msg: m}) return n.sendMessage(peerID, k, m) } @@ -418,7 +419,7 @@ func (n *OpenBazaarNode) SendOrderCompletion(peerID string, k *libp2p.PubKey, co orderID0, _ := n.CalcOrderID(completionMessage.BuyerOrder) n.Datastore.Messages().Put( fmt.Sprintf("%s-%d", orderID0, int(pb.Message_ORDER_COMPLETION)), - orderID0, pb.Message_ORDER_COMPLETION, peerID, m) + orderID0, pb.Message_ORDER_COMPLETION, peerID, repo.Message{Msg: m}) return n.sendMessage(peerID, k, m) } From 75bdee7f475d05a16eadfb433f545927db47f4ae Mon Sep 17 00:00:00 2001 From: Ashwin Mangale Date: Mon, 24 Jun 2019 11:59:53 +0530 Subject: [PATCH 09/23] lint fixes --- repo/message.go | 6 ++++++ repo/migrations/Migration024.go | 4 +++- 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/repo/message.go b/repo/message.go index bff2fe8cc3..4f4f0c2fb7 100644 --- a/repo/message.go +++ b/repo/message.go @@ -9,24 +9,30 @@ import ( "github.com/OpenBazaar/openbazaar-go/pb" ) +// ErrUnknownMessage - notify an invalid message var ErrUnknownMessage = errors.New("unknown or invalid message") +// Message - wrapper for pb.Message type Message struct { Msg pb.Message } +// MarshalJSON - invoke the pb.Message marshaller func (m *Message) MarshalJSON() ([]byte, error) { return json.Marshal(m.Msg) } +// UnmarshalJSON - invoke the pb.Message unmarshaller func (m *Message) UnmarshalJSON(b []byte) error { return json.Unmarshal(b, &m.Msg) } +// GetMessageType - return the pb.Message messageType func (m *Message) GetMessageType() pb.Message_MessageType { return m.Msg.MessageType } +// GetPayload - return the pb.Message payload func (m *Message) GetPayload() *any.Any { return m.Msg.Payload } diff --git a/repo/migrations/Migration024.go b/repo/migrations/Migration024.go index 4ebd758d23..e7f4a3cf4f 100644 --- a/repo/migrations/Migration024.go +++ b/repo/migrations/Migration024.go @@ -10,8 +10,10 @@ import ( const ( // AM06MessagesCreateSQL - the messages create sql - AM06MessagesCreateSQL = "create table messages (messageID text primary key not null, orderID text, message_type integer, message blob, peerID text, url text, acknowledged bool, tries integer, created_at integer, updated_at integer);" + AM06MessagesCreateSQL = "create table messages (messageID text primary key not null, orderID text, message_type integer, message blob, peerID text, url text, acknowledged bool, tries integer, created_at integer, updated_at integer);" + // AM06CreateIndexMessagesSQL1 - the messages index on messageID sql AM06CreateIndexMessagesSQL1 = "create index index_messages1 on messages (messageID);" + // AM06CreateIndexMessagesSQL2 - the messages composite index on orderID and messageType create sql AM06CreateIndexMessagesSQL2 = "create index index_messages2 on messages (orderID, message_type);" // AM06UpVer - set the repo Up version AM06UpVer = "25" From 63624ff92f9cc6ca385826a60c4bb3ab10a44201 Mon Sep 17 00:00:00 2001 From: Ashwin Mangale Date: Wed, 10 Jul 2019 09:23:09 +0530 Subject: [PATCH 10/23] add messages composite index for peerID messageType tuple --- schema/constants.go | 1 + 1 file changed, 1 insertion(+) diff --git a/schema/constants.go b/schema/constants.go index 76b1d40aab..b301145c70 100644 --- a/schema/constants.go +++ b/schema/constants.go @@ -39,6 +39,7 @@ const ( CreateMessagesSQL = "create table messages (messageID text primary key not null, orderID text, message_type integer, message blob, peerID text, url text, acknowledged bool, tries integer, created_at integer, updated_at integer);" CreateIndexMessagesSQL1 = "create index index_messages1 on messages (messageID);" CreateIndexMessagesSQL2 = "create index index_messages2 on messages (orderID, message_type);" + CreateIndexMessagesSQL3 = "create index index_messages2 on messages (peerID, message_type);" // End SQL Statements // Configuration defaults From d4d93051c36ad3f17c72d6acdad0bfc5275b7a40 Mon Sep 17 00:00:00 2001 From: Ashwin Mangale Date: Wed, 10 Jul 2019 10:54:44 +0530 Subject: [PATCH 11/23] handle errors in core/net put messages --- core/net.go | 52 +++++++++++++++++++++++++++++++++++++++++----------- 1 file changed, 41 insertions(+), 11 deletions(-) diff --git a/core/net.go b/core/net.go index 81b97aee92..15d82b5e04 100644 --- a/core/net.go +++ b/core/net.go @@ -8,7 +8,7 @@ import ( libp2p "gx/ipfs/QmTW4SdgBWq9GjsBsHeUx8WuGxzhgzAf88UMH2w62PC8yK/go-libp2p-crypto" "gx/ipfs/QmTbxNB1NwDesLmKTscr4udL2tVP7MaxvXnD1D9yX7g3PN/go-cid" - "gx/ipfs/QmYVXrKrKHDC9FobgmcmshCDyWwdrfwfanNQN4oxJ9Fk3h/go-libp2p-peer" + peer "gx/ipfs/QmYVXrKrKHDC9FobgmcmshCDyWwdrfwfanNQN4oxJ9Fk3h/go-libp2p-peer" "gx/ipfs/QmerPMzPk1mJVowm8KgmoknWa4yCYvvugMPsgWmDNUvDLW/go-multihash" "github.com/OpenBazaar/openbazaar-go/ipfs" @@ -277,10 +277,16 @@ func (n *OpenBazaarNode) SendOrder(peerID string, contract *pb.RicardianContract Payload: pbAny, } - orderID0, _ := n.CalcOrderID(contract.BuyerOrder) - n.Datastore.Messages().Put( + orderID0, err := n.CalcOrderID(contract.BuyerOrder) + if err != nil { + return resp, err + } + err = n.Datastore.Messages().Put( fmt.Sprintf("%s-%d", orderID0, int(pb.Message_ORDER)), orderID0, pb.Message_ORDER, peerID, repo.Message{Msg: m}) + if err != nil { + log.Errorf("failed putting message (%s-%d)", orderID0, int(pb.Message_ORDER)) + } resp, err = n.Service.SendRequest(ctx, p, &m) if err != nil { @@ -308,10 +314,16 @@ func (n *OpenBazaarNode) SendOrderConfirmation(peerID string, contract *pb.Ricar if err != nil { return err } - orderID0, _ := n.CalcOrderID(contract.BuyerOrder) - n.Datastore.Messages().Put( + orderID0, err := n.CalcOrderID(contract.BuyerOrder) + if err != nil { + return err + } + err = n.Datastore.Messages().Put( fmt.Sprintf("%s-%d", orderID0, int(pb.Message_ORDER_CONFIRMATION)), orderID0, pb.Message_ORDER_CONFIRMATION, peerID, repo.Message{Msg: m}) + if err != nil { + log.Errorf("failed putting message (%s-%d)", orderID0, int(pb.Message_ORDER)) + } return n.sendMessage(peerID, &k, m) } @@ -335,9 +347,12 @@ func (n *OpenBazaarNode) SendCancel(peerID, orderID string) error { } kp = &k } - n.Datastore.Messages().Put( + err = n.Datastore.Messages().Put( fmt.Sprintf("%s-%d", orderID, int(pb.Message_ORDER_CANCEL)), orderID, pb.Message_ORDER_CANCEL, peerID, repo.Message{Msg: m}) + if err != nil { + log.Errorf("failed putting message (%s-%d)", orderID, int(pb.Message_ORDER)) + } return n.sendMessage(peerID, kp, m) } @@ -363,9 +378,12 @@ func (n *OpenBazaarNode) SendReject(peerID string, rejectMessage *pb.OrderReject } kp = &k } - n.Datastore.Messages().Put( + err = n.Datastore.Messages().Put( fmt.Sprintf("%s-%d", rejectMessage.OrderID, int(pb.Message_ORDER_REJECT)), rejectMessage.OrderID, pb.Message_ORDER_REJECT, peerID, repo.Message{Msg: m}) + if err != nil { + log.Errorf("failed putting message (%s-%d)", rejectMessage.OrderID, int(pb.Message_ORDER)) + } return n.sendMessage(peerID, kp, m) } @@ -396,10 +414,16 @@ func (n *OpenBazaarNode) SendOrderFulfillment(peerID string, k *libp2p.PubKey, f MessageType: pb.Message_ORDER_FULFILLMENT, Payload: a, } - orderID0, _ := n.CalcOrderID(fulfillmentMessage.BuyerOrder) - n.Datastore.Messages().Put( + orderID0, err := n.CalcOrderID(fulfillmentMessage.BuyerOrder) + if err != nil { + return err + } + err = n.Datastore.Messages().Put( fmt.Sprintf("%s-%d", orderID0, int(pb.Message_ORDER_FULFILLMENT)), orderID0, pb.Message_ORDER_FULFILLMENT, peerID, repo.Message{Msg: m}) + if err != nil { + log.Errorf("failed putting message (%s-%d)", orderID0, int(pb.Message_ORDER)) + } return n.sendMessage(peerID, k, m) } @@ -416,10 +440,16 @@ func (n *OpenBazaarNode) SendOrderCompletion(peerID string, k *libp2p.PubKey, co if err != nil { return err } - orderID0, _ := n.CalcOrderID(completionMessage.BuyerOrder) - n.Datastore.Messages().Put( + orderID0, err := n.CalcOrderID(completionMessage.BuyerOrder) + if err != nil { + return err + } + err = n.Datastore.Messages().Put( fmt.Sprintf("%s-%d", orderID0, int(pb.Message_ORDER_COMPLETION)), orderID0, pb.Message_ORDER_COMPLETION, peerID, repo.Message{Msg: m}) + if err != nil { + log.Errorf("failed putting message (%s-%d)", orderID0, int(pb.Message_ORDER)) + } return n.sendMessage(peerID, k, m) } From cae2cdedfb5010114b6f2b23666ad92d2849f59f Mon Sep 17 00:00:00 2001 From: Ashwin Mangale Date: Wed, 10 Jul 2019 12:06:57 +0530 Subject: [PATCH 12/23] fix repo messages --- repo/datastore.go | 4 ++-- repo/db/messages.go | 12 ++++++------ repo/db/messages_test.go | 4 +--- repo/message_test.go | 3 ++- 4 files changed, 11 insertions(+), 12 deletions(-) diff --git a/repo/datastore.go b/repo/datastore.go index 36702671ba..0a5ae93342 100644 --- a/repo/datastore.go +++ b/repo/datastore.go @@ -443,6 +443,6 @@ type MessageStore interface { // GetByOrderIDType returns the message for specified order and type GetByOrderIDType(orderID string, mType pb.Message_MessageType) (*Message, string, error) - // GetByMessageIDType returns the message for specified message id - GetByMessageIDType(messageID string) (*Message, string, error) + // GetByMessageID returns the message for specified message id + GetByMessageID(messageID string) (*Message, string, error) } diff --git a/repo/db/messages.go b/repo/db/messages.go index cde68d7c15..abf57f07ae 100644 --- a/repo/db/messages.go +++ b/repo/db/messages.go @@ -11,17 +11,17 @@ import ( "github.com/OpenBazaar/openbazaar-go/repo" ) -// MessagesDB - represents the messages table +// MessagesDB represents the messages table type MessagesDB struct { modelStore } -// NewMessageStore - return new MessagesDB +// NewMessageStore return new MessagesDB func NewMessageStore(db *sql.DB, lock *sync.Mutex) repo.MessageStore { return &MessagesDB{modelStore{db, lock}} } -// Put - insert record into the messages +// Put will insert a record into the messages func (o *MessagesDB) Put(messageID, orderID string, mType pb.Message_MessageType, peerID string, msg repo.Message) error { o.lock.Lock() defer o.lock.Unlock() @@ -38,7 +38,7 @@ func (o *MessagesDB) Put(messageID, orderID string, mType pb.Message_MessageType msg0, err := msg.MarshalJSON() if err != nil { - fmt.Println("err marshaling : ", err) + log.Errorf("err marshalling json: %v", err) } defer stmt.Close() @@ -91,8 +91,8 @@ func (o *MessagesDB) GetByOrderIDType(orderID string, mType pb.Message_MessageTy return msg, peerID, nil } -// GetByMessageIDType returns the message for the specified message id -func (o *MessagesDB) GetByMessageIDType(messageID string) (*repo.Message, string, error) { +// GetByMessageID returns the message for the specified message id +func (o *MessagesDB) GetByMessageID(messageID string) (*repo.Message, string, error) { o.lock.Lock() defer o.lock.Unlock() var ( diff --git a/repo/db/messages_test.go b/repo/db/messages_test.go index ddac99754e..7b69d4d7c7 100644 --- a/repo/db/messages_test.go +++ b/repo/db/messages_test.go @@ -61,13 +61,11 @@ func TestMessageDB_Put(t *testing.T) { t.Error(err) } - fmt.Println(string(retMsg.GetPayload().Value), " ", peer) - if !(string(retMsg.GetPayload().Value) == payload) { t.Error("incorrect payload") } - if !(peer == peerID) { + if peer != peerID { t.Error("incorrect peerID") } } diff --git a/repo/message_test.go b/repo/message_test.go index 04bfcf66d9..183df04928 100644 --- a/repo/message_test.go +++ b/repo/message_test.go @@ -1,6 +1,7 @@ package repo_test import ( + "bytes" "testing" "github.com/golang/protobuf/ptypes/any" @@ -34,7 +35,7 @@ func TestMessage(t *testing.T) { t.Error(err) } - if retRepoMsg.GetMessageType() != pb.Message_ORDER { + if !bytes.Equal(retRepoMsg.GetPayload().GetValue(), []byte(payload)) { t.Error("wrong msg type") } From 1366f14d9c6ffcbd76675de766b6d28a2d617456 Mon Sep 17 00:00:00 2001 From: Ashwin Mangale Date: Wed, 10 Jul 2019 20:48:24 +0530 Subject: [PATCH 13/23] fix migration 24 and add unit test --- repo/migrations/Migration024.go | 116 +++++++++++++++++++++------ repo/migrations/Migration024_test.go | 87 ++++++++++++++++++++ 2 files changed, 180 insertions(+), 23 deletions(-) diff --git a/repo/migrations/Migration024.go b/repo/migrations/Migration024.go index e7f4a3cf4f..81024b9f73 100644 --- a/repo/migrations/Migration024.go +++ b/repo/migrations/Migration024.go @@ -9,24 +9,34 @@ import ( ) const ( - // AM06MessagesCreateSQL - the messages create sql - AM06MessagesCreateSQL = "create table messages (messageID text primary key not null, orderID text, message_type integer, message blob, peerID text, url text, acknowledged bool, tries integer, created_at integer, updated_at integer);" - // AM06CreateIndexMessagesSQL1 - the messages index on messageID sql - AM06CreateIndexMessagesSQL1 = "create index index_messages1 on messages (messageID);" - // AM06CreateIndexMessagesSQL2 - the messages composite index on orderID and messageType create sql - AM06CreateIndexMessagesSQL2 = "create index index_messages2 on messages (orderID, message_type);" - // AM06UpVer - set the repo Up version - AM06UpVer = "25" - // AM06DownVer - set the repo Down version - AM06DownVer = "24" + // MigrationCreateMessages_AM06MessagesCreateSQL the messages create sql + MigrationCreateMessages_AM06MessagesCreateSQL = "create table messages (messageID text primary key not null, orderID text, message_type integer, message blob, peerID text, url text, acknowledged bool, tries integer, created_at integer, updated_at integer);" + // MigrationCreateMessages_AM06CreateIndexMessagesSQL_MessageID the messages index on messageID sql + MigrationCreateMessages_AM06CreateIndexMessagesSQL_MessageID = "create index index_messages_messageID on messages (messageID);" + // MigrationCreateMessages_AM06CreateIndexMessagesSQL_OrderID_MType the messages composite index on orderID and messageType create sql + MigrationCreateMessages_AM06CreateIndexMessagesSQL_OrderID_MType = "create index index_messages_orderIDmType on messages (orderID, message_type);" + // MigrationCreateMessages_AM06CreateIndexMessagesSQL_PeerID_MType the messages composite index on peerID and messageType create sql + MigrationCreateMessages_AM06CreateIndexMessagesSQL_PeerID_MType = "create index index_messages_peerIDmType on messages (peerID, message_type);" + // MigrationCreateMessages_AM06MessagesDeleteSQL the messages delete sql + MigrationCreateMessages_AM06MessagesDeleteSQL = "drop table if exists messages;" + // MigrationCreateMessages_AM06CreateIndexMessagesSQL_MessageID delete the messages index on messageID sql + MigrationCreateMessages_AM06DeleteIndexMessagesSQL_MessageID = "drop index if exists index_messages_messageID;" + // MigrationCreateMessages_AM06CreateIndexMessagesSQL_OrderID_MType delete the messages composite index on orderID and messageType create sql + MigrationCreateMessages_AM06DeleteIndexMessagesSQL_OrderID_MType = "drop index if exists index_messages_orderIDmType;" + // MigrationCreateMessages_AM06CreateIndexMessagesSQL_PeerID_MType delete the messages composite index on peerID and messageType create sql + MigrationCreateMessages_AM06DeleteIndexMessagesSQL_PeerID_MType = "drop index if exists index_messages_peerIDmType;" + // MigrationCreateMessages_AM06UpVer set the repo Up version + MigrationCreateMessages_AM06UpVer = "25" + // MigrationCreateMessages_AM06DownVer set the repo Down version + MigrationCreateMessages_AM06DownVer = "24" ) -// AM06 - local migration struct -type AM06 struct{} +// MigrationCreateMessages_AM06 local migration struct +type MigrationCreateMessages_AM06 struct{} -// Migration024 - migration struct +// Migration024 migration struct type Migration024 struct { - AM06 + MigrationCreateMessages_AM06 } func createMessages(repoPath, databasePassword, rVer string, testnetEnabled bool) error { @@ -57,15 +67,75 @@ func createMessages(repoPath, databasePassword, rVer string, testnetEnabled bool if err != nil { return err } - if _, err = tx.Exec(AM06MessagesCreateSQL); err != nil { + if _, err = tx.Exec(MigrationCreateMessages_AM06MessagesCreateSQL); err != nil { tx.Rollback() return err } - if _, err = tx.Exec(AM06CreateIndexMessagesSQL1); err != nil { + if _, err = tx.Exec(MigrationCreateMessages_AM06CreateIndexMessagesSQL_MessageID); err != nil { tx.Rollback() return err } - if _, err = tx.Exec(AM06CreateIndexMessagesSQL2); err != nil { + if _, err = tx.Exec(MigrationCreateMessages_AM06CreateIndexMessagesSQL_OrderID_MType); err != nil { + tx.Rollback() + return err + } + if _, err = tx.Exec(MigrationCreateMessages_AM06CreateIndexMessagesSQL_PeerID_MType); err != nil { + tx.Rollback() + return err + } + if err = tx.Commit(); err != nil { + return err + } + + // Bump schema version + err = ioutil.WriteFile(repoVersionFilePath, []byte(rVer), os.ModePerm) + if err != nil { + return err + } + return nil +} + +func deleteMessages(repoPath, databasePassword, rVer string, testnetEnabled bool) error { + var ( + databaseFilePath string + repoVersionFilePath = path.Join(repoPath, "repover") + ) + if testnetEnabled { + databaseFilePath = path.Join(repoPath, "datastore", "testnet.db") + } else { + databaseFilePath = path.Join(repoPath, "datastore", "mainnet.db") + } + + db, err := sql.Open("sqlite3", databaseFilePath) + if err != nil { + return err + } + defer db.Close() + if databasePassword != "" { + p := fmt.Sprintf("pragma key = '%s';", databasePassword) + _, err := db.Exec(p) + if err != nil { + return err + } + } + + tx, err := db.Begin() + if err != nil { + return err + } + if _, err = tx.Exec(MigrationCreateMessages_AM06DeleteIndexMessagesSQL_MessageID); err != nil { + tx.Rollback() + return err + } + if _, err = tx.Exec(MigrationCreateMessages_AM06DeleteIndexMessagesSQL_OrderID_MType); err != nil { + tx.Rollback() + return err + } + if _, err = tx.Exec(MigrationCreateMessages_AM06DeleteIndexMessagesSQL_PeerID_MType); err != nil { + tx.Rollback() + return err + } + if _, err = tx.Exec(MigrationCreateMessages_AM06MessagesDeleteSQL); err != nil { tx.Rollback() return err } @@ -81,12 +151,12 @@ func createMessages(repoPath, databasePassword, rVer string, testnetEnabled bool return nil } -// Up - the migration Up code -func (AM06) Up(repoPath, databasePassword string, testnetEnabled bool) error { - return createMessages(repoPath, databasePassword, AM06UpVer, testnetEnabled) +// Up the migration Up code +func (MigrationCreateMessages_AM06) Up(repoPath, databasePassword string, testnetEnabled bool) error { + return createMessages(repoPath, databasePassword, MigrationCreateMessages_AM06UpVer, testnetEnabled) } -// Down - the migration Down code -func (AM06) Down(repoPath, databasePassword string, testnetEnabled bool) error { - return createMessages(repoPath, databasePassword, AM06DownVer, testnetEnabled) +// Down the migration Down code +func (MigrationCreateMessages_AM06) Down(repoPath, databasePassword string, testnetEnabled bool) error { + return deleteMessages(repoPath, databasePassword, MigrationCreateMessages_AM06DownVer, testnetEnabled) } diff --git a/repo/migrations/Migration024_test.go b/repo/migrations/Migration024_test.go index e417c8ac61..671d261077 100644 --- a/repo/migrations/Migration024_test.go +++ b/repo/migrations/Migration024_test.go @@ -1 +1,88 @@ package migrations_test + +import ( + "database/sql" + "io/ioutil" + "os" + "testing" + + "github.com/OpenBazaar/openbazaar-go/repo/migrations" + "github.com/OpenBazaar/openbazaar-go/schema" +) + +func TestMigration024(t *testing.T) { + var ( + basePath = schema.GenerateTempPath() + testRepoPath, err = schema.OpenbazaarPathTransform(basePath, true) + ) + if err != nil { + t.Fatal(err) + } + appSchema, err := schema.NewCustomSchemaManager(schema.SchemaContext{DataPath: testRepoPath, TestModeEnabled: true}) + if err != nil { + t.Fatal(err) + } + if err = appSchema.BuildSchemaDirectories(); err != nil { + t.Fatal(err) + } + defer appSchema.DestroySchemaDirectories() + + var ( + databasePath = appSchema.DatabasePath() + schemaPath = appSchema.DataPathJoin("repover") + + schemaSQL = "pragma key = 'foobarbaz';" + selectMessagesSQL = "select * from messages;" + //setupSQL = strings.Join([]string{ + // schemaSQL, + //}, " ") + ) + + // create schema version file + if err = ioutil.WriteFile(schemaPath, []byte("23"), os.ModePerm); err != nil { + t.Fatal(err) + } + + // execute migration up + m := migrations.Migration024{} + if err := m.Up(testRepoPath, "foobarbaz", true); err != nil { + t.Fatal(err) + } + + db, err := sql.Open("sqlite3", databasePath) + if err != nil { + t.Fatal(err) + } + defer db.Close() + if _, err = db.Exec(schemaSQL); err != nil { + t.Fatal(err) + } + + // assert repo version updated + if err = appSchema.VerifySchemaVersion("25"); err != nil { + t.Fatal(err) + } + + // verify change was applied properly + _, err = db.Exec(selectMessagesSQL) + if err != nil { + t.Fatal(err) + } + + // execute migration down + if err := m.Down(testRepoPath, "foobarbaz", true); err != nil { + t.Fatal(err) + } + + // assert repo version reverted + if err = appSchema.VerifySchemaVersion("24"); err != nil { + t.Fatal(err) + } + + // verify change was reverted properly + _, err = db.Exec(selectMessagesSQL) + if err == nil { + t.Fatal(err) + } + +} From bcca56da7dafd6f3d2ff3c27362372f3ae5833d0 Mon Sep 17 00:00:00 2001 From: Ashwin Mangale Date: Wed, 10 Jul 2019 21:48:45 +0530 Subject: [PATCH 14/23] lint fix --- repo/migrations/Migration024.go | 70 ++++++++++++++++----------------- 1 file changed, 35 insertions(+), 35 deletions(-) diff --git a/repo/migrations/Migration024.go b/repo/migrations/Migration024.go index 81024b9f73..883fbc0432 100644 --- a/repo/migrations/Migration024.go +++ b/repo/migrations/Migration024.go @@ -9,34 +9,34 @@ import ( ) const ( - // MigrationCreateMessages_AM06MessagesCreateSQL the messages create sql - MigrationCreateMessages_AM06MessagesCreateSQL = "create table messages (messageID text primary key not null, orderID text, message_type integer, message blob, peerID text, url text, acknowledged bool, tries integer, created_at integer, updated_at integer);" - // MigrationCreateMessages_AM06CreateIndexMessagesSQL_MessageID the messages index on messageID sql - MigrationCreateMessages_AM06CreateIndexMessagesSQL_MessageID = "create index index_messages_messageID on messages (messageID);" - // MigrationCreateMessages_AM06CreateIndexMessagesSQL_OrderID_MType the messages composite index on orderID and messageType create sql - MigrationCreateMessages_AM06CreateIndexMessagesSQL_OrderID_MType = "create index index_messages_orderIDmType on messages (orderID, message_type);" - // MigrationCreateMessages_AM06CreateIndexMessagesSQL_PeerID_MType the messages composite index on peerID and messageType create sql - MigrationCreateMessages_AM06CreateIndexMessagesSQL_PeerID_MType = "create index index_messages_peerIDmType on messages (peerID, message_type);" - // MigrationCreateMessages_AM06MessagesDeleteSQL the messages delete sql - MigrationCreateMessages_AM06MessagesDeleteSQL = "drop table if exists messages;" - // MigrationCreateMessages_AM06CreateIndexMessagesSQL_MessageID delete the messages index on messageID sql - MigrationCreateMessages_AM06DeleteIndexMessagesSQL_MessageID = "drop index if exists index_messages_messageID;" - // MigrationCreateMessages_AM06CreateIndexMessagesSQL_OrderID_MType delete the messages composite index on orderID and messageType create sql - MigrationCreateMessages_AM06DeleteIndexMessagesSQL_OrderID_MType = "drop index if exists index_messages_orderIDmType;" - // MigrationCreateMessages_AM06CreateIndexMessagesSQL_PeerID_MType delete the messages composite index on peerID and messageType create sql - MigrationCreateMessages_AM06DeleteIndexMessagesSQL_PeerID_MType = "drop index if exists index_messages_peerIDmType;" - // MigrationCreateMessages_AM06UpVer set the repo Up version - MigrationCreateMessages_AM06UpVer = "25" - // MigrationCreateMessages_AM06DownVer set the repo Down version - MigrationCreateMessages_AM06DownVer = "24" + // MigrationCreateMessagesAM06MessagesCreateSQL the messages create sql + MigrationCreateMessagesAM06MessagesCreateSQL = "create table messages (messageID text primary key not null, orderID text, message_type integer, message blob, peerID text, url text, acknowledged bool, tries integer, created_at integer, updated_at integer);" + // MigrationCreateMessagesAM06CreateIndexMessagesSQLMessageID the messages index on messageID sql + MigrationCreateMessagesAM06CreateIndexMessagesSQLMessageID = "create index index_messages_messageID on messages (messageID);" + // MigrationCreateMessagesAM06CreateIndexMessagesSQLOrderIDMType the messages composite index on orderID and messageType create sql + MigrationCreateMessagesAM06CreateIndexMessagesSQLOrderIDMType = "create index index_messages_orderIDmType on messages (orderID, message_type);" + // MigrationCreateMessagesAM06CreateIndexMessagesSQLPeerIDMType the messages composite index on peerID and messageType create sql + MigrationCreateMessagesAM06CreateIndexMessagesSQLPeerIDMType = "create index index_messages_peerIDmType on messages (peerID, message_type);" + // MigrationCreateMessagesAM06MessagesDeleteSQL the messages delete sql + MigrationCreateMessagesAM06MessagesDeleteSQL = "drop table if exists messages;" + // MigrationCreateMessagesAM06CreateIndexMessagesSQLMessageID delete the messages index on messageID sql + MigrationCreateMessagesAM06DeleteIndexMessagesSQLMessageID = "drop index if exists index_messages_messageID;" + // MigrationCreateMessagesAM06CreateIndexMessagesSQLOrderIDMType delete the messages composite index on orderID and messageType create sql + MigrationCreateMessagesAM06DeleteIndexMessagesSQLOrderIDMType = "drop index if exists index_messages_orderIDmType;" + // MigrationCreateMessagesAM06CreateIndexMessagesSQLPeerIDMType delete the messages composite index on peerID and messageType create sql + MigrationCreateMessagesAM06DeleteIndexMessagesSQLPeerIDMType = "drop index if exists index_messages_peerIDmType;" + // MigrationCreateMessagesAM06UpVer set the repo Up version + MigrationCreateMessagesAM06UpVer = "25" + // MigrationCreateMessagesAM06DownVer set the repo Down version + MigrationCreateMessagesAM06DownVer = "24" ) -// MigrationCreateMessages_AM06 local migration struct -type MigrationCreateMessages_AM06 struct{} +// MigrationCreateMessagesAM06 local migration struct +type MigrationCreateMessagesAM06 struct{} // Migration024 migration struct type Migration024 struct { - MigrationCreateMessages_AM06 + MigrationCreateMessagesAM06 } func createMessages(repoPath, databasePassword, rVer string, testnetEnabled bool) error { @@ -67,19 +67,19 @@ func createMessages(repoPath, databasePassword, rVer string, testnetEnabled bool if err != nil { return err } - if _, err = tx.Exec(MigrationCreateMessages_AM06MessagesCreateSQL); err != nil { + if _, err = tx.Exec(MigrationCreateMessagesAM06MessagesCreateSQL); err != nil { tx.Rollback() return err } - if _, err = tx.Exec(MigrationCreateMessages_AM06CreateIndexMessagesSQL_MessageID); err != nil { + if _, err = tx.Exec(MigrationCreateMessagesAM06CreateIndexMessagesSQLMessageID); err != nil { tx.Rollback() return err } - if _, err = tx.Exec(MigrationCreateMessages_AM06CreateIndexMessagesSQL_OrderID_MType); err != nil { + if _, err = tx.Exec(MigrationCreateMessagesAM06CreateIndexMessagesSQLOrderIDMType); err != nil { tx.Rollback() return err } - if _, err = tx.Exec(MigrationCreateMessages_AM06CreateIndexMessagesSQL_PeerID_MType); err != nil { + if _, err = tx.Exec(MigrationCreateMessagesAM06CreateIndexMessagesSQLPeerIDMType); err != nil { tx.Rollback() return err } @@ -123,19 +123,19 @@ func deleteMessages(repoPath, databasePassword, rVer string, testnetEnabled bool if err != nil { return err } - if _, err = tx.Exec(MigrationCreateMessages_AM06DeleteIndexMessagesSQL_MessageID); err != nil { + if _, err = tx.Exec(MigrationCreateMessagesAM06DeleteIndexMessagesSQLMessageID); err != nil { tx.Rollback() return err } - if _, err = tx.Exec(MigrationCreateMessages_AM06DeleteIndexMessagesSQL_OrderID_MType); err != nil { + if _, err = tx.Exec(MigrationCreateMessagesAM06DeleteIndexMessagesSQLOrderIDMType); err != nil { tx.Rollback() return err } - if _, err = tx.Exec(MigrationCreateMessages_AM06DeleteIndexMessagesSQL_PeerID_MType); err != nil { + if _, err = tx.Exec(MigrationCreateMessagesAM06DeleteIndexMessagesSQLPeerIDMType); err != nil { tx.Rollback() return err } - if _, err = tx.Exec(MigrationCreateMessages_AM06MessagesDeleteSQL); err != nil { + if _, err = tx.Exec(MigrationCreateMessagesAM06MessagesDeleteSQL); err != nil { tx.Rollback() return err } @@ -152,11 +152,11 @@ func deleteMessages(repoPath, databasePassword, rVer string, testnetEnabled bool } // Up the migration Up code -func (MigrationCreateMessages_AM06) Up(repoPath, databasePassword string, testnetEnabled bool) error { - return createMessages(repoPath, databasePassword, MigrationCreateMessages_AM06UpVer, testnetEnabled) +func (MigrationCreateMessagesAM06) Up(repoPath, databasePassword string, testnetEnabled bool) error { + return createMessages(repoPath, databasePassword, MigrationCreateMessagesAM06UpVer, testnetEnabled) } // Down the migration Down code -func (MigrationCreateMessages_AM06) Down(repoPath, databasePassword string, testnetEnabled bool) error { - return deleteMessages(repoPath, databasePassword, MigrationCreateMessages_AM06DownVer, testnetEnabled) +func (MigrationCreateMessagesAM06) Down(repoPath, databasePassword string, testnetEnabled bool) error { + return deleteMessages(repoPath, databasePassword, MigrationCreateMessagesAM06DownVer, testnetEnabled) } From 35fddca554a0396217018a41f0bd6107c48fd7f1 Mon Sep 17 00:00:00 2001 From: Ashwin Mangale Date: Wed, 10 Jul 2019 21:55:08 +0530 Subject: [PATCH 15/23] lint fix --- repo/migrations/Migration024.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/repo/migrations/Migration024.go b/repo/migrations/Migration024.go index 883fbc0432..c52875093d 100644 --- a/repo/migrations/Migration024.go +++ b/repo/migrations/Migration024.go @@ -19,11 +19,11 @@ const ( MigrationCreateMessagesAM06CreateIndexMessagesSQLPeerIDMType = "create index index_messages_peerIDmType on messages (peerID, message_type);" // MigrationCreateMessagesAM06MessagesDeleteSQL the messages delete sql MigrationCreateMessagesAM06MessagesDeleteSQL = "drop table if exists messages;" - // MigrationCreateMessagesAM06CreateIndexMessagesSQLMessageID delete the messages index on messageID sql + // MigrationCreateMessagesAM06DeleteIndexMessagesSQLMessageID delete the messages index on messageID sql MigrationCreateMessagesAM06DeleteIndexMessagesSQLMessageID = "drop index if exists index_messages_messageID;" - // MigrationCreateMessagesAM06CreateIndexMessagesSQLOrderIDMType delete the messages composite index on orderID and messageType create sql + // MigrationCreateMessagesAM06DeleteIndexMessagesSQLOrderIDMType delete the messages composite index on orderID and messageType create sql MigrationCreateMessagesAM06DeleteIndexMessagesSQLOrderIDMType = "drop index if exists index_messages_orderIDmType;" - // MigrationCreateMessagesAM06CreateIndexMessagesSQLPeerIDMType delete the messages composite index on peerID and messageType create sql + // MigrationCreateMessagesAM06DeleteIndexMessagesSQLPeerIDMType delete the messages composite index on peerID and messageType create sql MigrationCreateMessagesAM06DeleteIndexMessagesSQLPeerIDMType = "drop index if exists index_messages_peerIDmType;" // MigrationCreateMessagesAM06UpVer set the repo Up version MigrationCreateMessagesAM06UpVer = "25" From db438f9898193910516b7b321f93cbcc2b3b712f Mon Sep 17 00:00:00 2001 From: Ashwin Mangale Date: Thu, 11 Jul 2019 21:47:10 +0530 Subject: [PATCH 16/23] fix panic on pre initialization invoking of offline message scan --- api/jsonapi.go | 6 +++--- net/retriever/retriever.go | 2 ++ 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/api/jsonapi.go b/api/jsonapi.go index e1be569c1a..697ab0927d 100644 --- a/api/jsonapi.go +++ b/api/jsonapi.go @@ -4273,14 +4273,14 @@ func (i *jsonAPIHandler) POSTResendOrderMessage(w http.ResponseWriter, r *http.R return } - SanitizedResponse(w, "") + SanitizedResponse(w, `{}`) } // GETScanOfflineMessages - used to manually trigger offline message scan func (i *jsonAPIHandler) GETScanOfflineMessages(w http.ResponseWriter, r *http.Request) { - if time.Since(lastManualScan).Minutes() > 10 { + if t := time.Since(lastManualScan).Minutes(); t > 10 && t < 100000000 { i.node.MessageRetriever.RunOnce() lastManualScan = time.Now() } - SanitizedResponse(w, "") + SanitizedResponse(w, `{}`) } diff --git a/net/retriever/retriever.go b/net/retriever/retriever.go index 012210289f..626d6548ed 100644 --- a/net/retriever/retriever.go +++ b/net/retriever/retriever.go @@ -120,7 +120,9 @@ func (m *MessageRetriever) Run() { // RunOnce - used to fetch messages only once func (m *MessageRetriever) RunOnce() { + m.Add(1) go m.fetchPointers(true) + m.Add(1) go m.fetchPointers(false) } From a6e1da502531703b629dd5d0e86c5e9680716223 Mon Sep 17 00:00:00 2001 From: Ashwin Mangale Date: Fri, 12 Jul 2019 09:58:41 +0530 Subject: [PATCH 17/23] decrease the manual scan interval to 2 minutes --- api/jsonapi.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/api/jsonapi.go b/api/jsonapi.go index 697ab0927d..bf5e25d892 100644 --- a/api/jsonapi.go +++ b/api/jsonapi.go @@ -4278,9 +4278,9 @@ func (i *jsonAPIHandler) POSTResendOrderMessage(w http.ResponseWriter, r *http.R // GETScanOfflineMessages - used to manually trigger offline message scan func (i *jsonAPIHandler) GETScanOfflineMessages(w http.ResponseWriter, r *http.Request) { - if t := time.Since(lastManualScan).Minutes(); t > 10 && t < 100000000 { + if t := time.Since(lastManualScan).Minutes(); t > 2 && t < 100000000 { i.node.MessageRetriever.RunOnce() - lastManualScan = time.Now() } + lastManualScan = time.Now() SanitizedResponse(w, `{}`) } From 9c860babdaaac9f65f9dedf288f6708923cfa463 Mon Sep 17 00:00:00 2001 From: Ashwin Mangale Date: Tue, 16 Jul 2019 09:01:45 +0530 Subject: [PATCH 18/23] reduce retry interval to 1 minute --- api/jsonapi.go | 11 ++++++++--- mobile/node.go | 7 +++++++ 2 files changed, 15 insertions(+), 3 deletions(-) diff --git a/api/jsonapi.go b/api/jsonapi.go index bf5e25d892..8ebe53a5bf 100644 --- a/api/jsonapi.go +++ b/api/jsonapi.go @@ -4278,9 +4278,14 @@ func (i *jsonAPIHandler) POSTResendOrderMessage(w http.ResponseWriter, r *http.R // GETScanOfflineMessages - used to manually trigger offline message scan func (i *jsonAPIHandler) GETScanOfflineMessages(w http.ResponseWriter, r *http.Request) { - if t := time.Since(lastManualScan).Minutes(); t > 2 && t < 100000000 { - i.node.MessageRetriever.RunOnce() + t := time.Since(lastManualScan).Minutes() + if t < 100000000 { + if t >= 1 { + i.node.MessageRetriever.RunOnce() + lastManualScan = time.Now() + } + } else { + lastManualScan = time.Now() } - lastManualScan = time.Now() SanitizedResponse(w, `{}`) } diff --git a/mobile/node.go b/mobile/node.go index 1bf4e5c7f7..601e2291d1 100644 --- a/mobile/node.go +++ b/mobile/node.go @@ -69,6 +69,7 @@ type Node struct { var ( fileLogFormat = logging.MustStringFormatter( `%{time:15:04:05.000} [%{level}] [%{module}/%{shortfunc}] %{message}`, + publishUnlocked = false ) mainLoggingBackend logging.Backend ) @@ -455,6 +456,7 @@ func (n *Node) Start() error { MR.Wait() core.PublishLock.Unlock() + publishUnlocked = true core.Node.UpdateFollow() if !core.InitalPublishComplete { core.Node.SeedNode() @@ -476,6 +478,11 @@ func (n *Node) Stop() error { return nil } +// PublishUnlocked return true if publish is unlocked +func (n *Node) PublishUnlocked() bool { + return publishUnlocked +} + // initializeRepo create the database func initializeRepo(dataDir, password, mnemonic string, testnet bool, creationDate time.Time, coinType wi.CoinType) (*db.SQLiteDatastore, error) { // Database From 26c47e8df43ba824aa3329026daaf995f3dd3ceb Mon Sep 17 00:00:00 2001 From: Ashwin Mangale Date: Wed, 17 Jul 2019 12:53:43 +0530 Subject: [PATCH 19/23] update the utxo wallets to invoke the transaction listeners for ORDER_PAYMENT message --- .../OpenBazaar/bitcoind-wallet/wallet.go | 7 +++++++ .../OpenBazaar/multiwallet/bitcoin/wallet.go | 5 +++++ .../OpenBazaar/multiwallet/bitcoincash/wallet.go | 5 +++++ .../OpenBazaar/multiwallet/litecoin/wallet.go | 8 +++++++- .../multiwallet/service/wallet_service.go | 7 +++++++ .../OpenBazaar/multiwallet/zcash/wallet.go | 5 +++++ vendor/github.com/OpenBazaar/spvwallet/wallet.go | 13 ++++++++++--- .../github.com/OpenBazaar/zcashd-wallet/wallet.go | 7 +++++++ .../github.com/cpacia/BitcoinCash-Wallet/wallet.go | 14 +++++++++++--- 9 files changed, 64 insertions(+), 7 deletions(-) diff --git a/vendor/github.com/OpenBazaar/bitcoind-wallet/wallet.go b/vendor/github.com/OpenBazaar/bitcoind-wallet/wallet.go index 30170bf2fa..eaaecd4a0b 100644 --- a/vendor/github.com/OpenBazaar/bitcoind-wallet/wallet.go +++ b/vendor/github.com/OpenBazaar/bitcoind-wallet/wallet.go @@ -1129,3 +1129,10 @@ func DefaultSocksPort(controlPort int) int { } return socksPort } + +// AssociateTransactionWithOrder used for ORDER_PAYMENT message +func (w *BitcoindWallet) AssociateTransactionWithOrder(txnCB wallet.TransactionCallback) { + for _, l := range w.listeners { + go l(txnCB) + } +} diff --git a/vendor/github.com/OpenBazaar/multiwallet/bitcoin/wallet.go b/vendor/github.com/OpenBazaar/multiwallet/bitcoin/wallet.go index faf6092818..26a6701d5e 100644 --- a/vendor/github.com/OpenBazaar/multiwallet/bitcoin/wallet.go +++ b/vendor/github.com/OpenBazaar/multiwallet/bitcoin/wallet.go @@ -405,3 +405,8 @@ func (w *BitcoinWallet) Broadcast(tx *wire.MsgTx) error { w.ws.ProcessIncomingTransaction(cTxn) return nil } + +// AssociateTransactionWithOrder used for ORDER_PAYMENT message +func (w *BitcoinWallet) AssociateTransactionWithOrder(cb wi.TransactionCallback) { + w.ws.InvokeTransactionListeners(cb) +} diff --git a/vendor/github.com/OpenBazaar/multiwallet/bitcoincash/wallet.go b/vendor/github.com/OpenBazaar/multiwallet/bitcoincash/wallet.go index 96b03f82d7..3988874bc8 100644 --- a/vendor/github.com/OpenBazaar/multiwallet/bitcoincash/wallet.go +++ b/vendor/github.com/OpenBazaar/multiwallet/bitcoincash/wallet.go @@ -416,3 +416,8 @@ func (w *BitcoinCashWallet) Broadcast(tx *wire.MsgTx) error { w.ws.ProcessIncomingTransaction(cTxn) return nil } + +// AssociateTransactionWithOrder used for ORDER_PAYMENT message +func (w *BitcoinCashWallet) AssociateTransactionWithOrder(cb wi.TransactionCallback) { + w.ws.InvokeTransactionListeners(cb) +} diff --git a/vendor/github.com/OpenBazaar/multiwallet/litecoin/wallet.go b/vendor/github.com/OpenBazaar/multiwallet/litecoin/wallet.go index df5e297c3c..fd3ca16cf2 100644 --- a/vendor/github.com/OpenBazaar/multiwallet/litecoin/wallet.go +++ b/vendor/github.com/OpenBazaar/multiwallet/litecoin/wallet.go @@ -4,10 +4,11 @@ import ( "bytes" "encoding/hex" "fmt" - "github.com/ltcsuite/ltcutil" "io" "time" + "github.com/ltcsuite/ltcutil" + "github.com/OpenBazaar/multiwallet/cache" "github.com/OpenBazaar/multiwallet/client" "github.com/OpenBazaar/multiwallet/config" @@ -436,3 +437,8 @@ func (w *LitecoinWallet) Broadcast(tx *wire.MsgTx) error { w.ws.ProcessIncomingTransaction(cTxn) return nil } + +// AssociateTransactionWithOrder used for ORDER_PAYMENT message +func (w *LitecoinWallet) AssociateTransactionWithOrder(cb wi.TransactionCallback) { + w.ws.InvokeTransactionListeners(cb) +} diff --git a/vendor/github.com/OpenBazaar/multiwallet/service/wallet_service.go b/vendor/github.com/OpenBazaar/multiwallet/service/wallet_service.go index ff8f27be49..9d3b53caef 100644 --- a/vendor/github.com/OpenBazaar/multiwallet/service/wallet_service.go +++ b/vendor/github.com/OpenBazaar/multiwallet/service/wallet_service.go @@ -111,6 +111,13 @@ func (ws *WalletService) AddTransactionListener(callback func(callback wallet.Tr ws.listeners = append(ws.listeners, callback) } +// InvokeTransactionListeners will invoke the transaction listeners for the updation of order state +func (ws *WalletService) InvokeTransactionListeners(callback wallet.TransactionCallback) { + for _, l := range ws.listeners { + go l(callback) + } +} + func (ws *WalletService) listen() { var ( addrs = ws.getStoredAddresses() diff --git a/vendor/github.com/OpenBazaar/multiwallet/zcash/wallet.go b/vendor/github.com/OpenBazaar/multiwallet/zcash/wallet.go index f3fbdda9a2..ca840deb5c 100644 --- a/vendor/github.com/OpenBazaar/multiwallet/zcash/wallet.go +++ b/vendor/github.com/OpenBazaar/multiwallet/zcash/wallet.go @@ -419,3 +419,8 @@ func (w *ZCashWallet) Broadcast(tx *wire.MsgTx) (string, error) { w.ws.ProcessIncomingTransaction(cTxn) return cTxn.Txid, nil } + +// AssociateTransactionWithOrder used for ORDER_PAYMENT message +func (w *ZCashWallet) AssociateTransactionWithOrder(cb wi.TransactionCallback) { + w.ws.InvokeTransactionListeners(cb) +} diff --git a/vendor/github.com/OpenBazaar/spvwallet/wallet.go b/vendor/github.com/OpenBazaar/spvwallet/wallet.go index 23fb65c28e..215a957b8c 100644 --- a/vendor/github.com/OpenBazaar/spvwallet/wallet.go +++ b/vendor/github.com/OpenBazaar/spvwallet/wallet.go @@ -2,6 +2,10 @@ package spvwallet import ( "errors" + "io" + "sync" + "time" + "github.com/OpenBazaar/spvwallet/exchangerates" "github.com/OpenBazaar/wallet-interface" "github.com/btcsuite/btcd/btcec" @@ -14,9 +18,6 @@ import ( "github.com/btcsuite/btcwallet/wallet/txrules" "github.com/op/go-logging" b39 "github.com/tyler-smith/go-bip39" - "io" - "sync" - "time" ) type SPVWallet struct { @@ -445,3 +446,9 @@ func (w *SPVWallet) ReSyncBlockchain(fromDate time.Time) { func (w *SPVWallet) ExchangeRates() wallet.ExchangeRates { return w.exchangeRates } + +func (w *SPVWallet) AssociateTransactionWithOrder(cb wallet.TransactionCallback) { + for _, l := range w.txstore.listeners { + go l(cb) + } +} diff --git a/vendor/github.com/OpenBazaar/zcashd-wallet/wallet.go b/vendor/github.com/OpenBazaar/zcashd-wallet/wallet.go index 7633c179de..9eac728391 100644 --- a/vendor/github.com/OpenBazaar/zcashd-wallet/wallet.go +++ b/vendor/github.com/OpenBazaar/zcashd-wallet/wallet.go @@ -1072,3 +1072,10 @@ func (w *ZcashdWallet) Close() { func (w *ZcashdWallet) ExchangeRates() wallet.ExchangeRates { return w.exchangeRates } + +// AssociateTransactionWithOrder used for ORDER_PAYMENT message +func (w *ZcashdWallet) AssociateTransactionWithOrder(txnCB wallet.TransactionCallback) { + for _, l := range w.listeners { + go l(txnCB) + } +} diff --git a/vendor/github.com/cpacia/BitcoinCash-Wallet/wallet.go b/vendor/github.com/cpacia/BitcoinCash-Wallet/wallet.go index 0ee4ce01c6..e20705063d 100644 --- a/vendor/github.com/cpacia/BitcoinCash-Wallet/wallet.go +++ b/vendor/github.com/cpacia/BitcoinCash-Wallet/wallet.go @@ -2,6 +2,10 @@ package bitcoincash import ( "errors" + "io" + "sync" + "time" + "github.com/OpenBazaar/wallet-interface" "github.com/btcsuite/btcd/btcec" "github.com/btcsuite/btcd/chaincfg" @@ -14,9 +18,6 @@ import ( "github.com/cpacia/bchutil" "github.com/op/go-logging" b39 "github.com/tyler-smith/go-bip39" - "io" - "sync" - "time" ) func setupNetworkParams(params *chaincfg.Params) { @@ -498,3 +499,10 @@ func (w *SPVWallet) ReSyncBlockchain(fromDate time.Time) { w.txstore.PopulateAdrs() w.wireService.Resync() } + +// AssociateTransactionWithOrder used for ORDER_PAYMENT message +func (w *SPVWallet) AssociateTransactionWithOrder(cb wallet.TransactionCallback) { + for _, l := range w.txstore.listeners { + go l(cb) + } +} From 4ce3238f2d5d5957e218c0cc9d34b6b1b49df35c Mon Sep 17 00:00:00 2001 From: Ashwin Mangale Date: Thu, 18 Jul 2019 23:42:50 +0530 Subject: [PATCH 20/23] fix reviewed issues --- core/net.go | 65 +++++++++++++++++++++------------------- mobile/node.go | 2 +- repo/db/messages_test.go | 2 +- repo/message_test.go | 4 +++ schema/constants.go | 6 ++-- schema/manager.go | 5 ++-- 6 files changed, 46 insertions(+), 38 deletions(-) diff --git a/core/net.go b/core/net.go index 15d82b5e04..acca627c51 100644 --- a/core/net.go +++ b/core/net.go @@ -279,13 +279,14 @@ func (n *OpenBazaarNode) SendOrder(peerID string, contract *pb.RicardianContract orderID0, err := n.CalcOrderID(contract.BuyerOrder) if err != nil { - return resp, err - } - err = n.Datastore.Messages().Put( - fmt.Sprintf("%s-%d", orderID0, int(pb.Message_ORDER)), - orderID0, pb.Message_ORDER, peerID, repo.Message{Msg: m}) - if err != nil { - log.Errorf("failed putting message (%s-%d)", orderID0, int(pb.Message_ORDER)) + log.Errorf("failed calculating order id") + } else { + err = n.Datastore.Messages().Put( + fmt.Sprintf("%s-%d", orderID0, int(pb.Message_ORDER)), + orderID0, pb.Message_ORDER, peerID, repo.Message{Msg: m}) + if err != nil { + log.Errorf("failed putting message (%s-%d)", orderID0, int(pb.Message_ORDER)) + } } resp, err = n.Service.SendRequest(ctx, p, &m) @@ -316,15 +317,15 @@ func (n *OpenBazaarNode) SendOrderConfirmation(peerID string, contract *pb.Ricar } orderID0, err := n.CalcOrderID(contract.BuyerOrder) if err != nil { - return err - } - err = n.Datastore.Messages().Put( - fmt.Sprintf("%s-%d", orderID0, int(pb.Message_ORDER_CONFIRMATION)), - orderID0, pb.Message_ORDER_CONFIRMATION, peerID, repo.Message{Msg: m}) - if err != nil { - log.Errorf("failed putting message (%s-%d)", orderID0, int(pb.Message_ORDER)) + log.Errorf("failed calculating order id") + } else { + err = n.Datastore.Messages().Put( + fmt.Sprintf("%s-%d", orderID0, int(pb.Message_ORDER_CONFIRMATION)), + orderID0, pb.Message_ORDER_CONFIRMATION, peerID, repo.Message{Msg: m}) + if err != nil { + log.Errorf("failed putting message (%s-%d)", orderID0, int(pb.Message_ORDER_CONFIRMATION)) + } } - return n.sendMessage(peerID, &k, m) } @@ -351,7 +352,7 @@ func (n *OpenBazaarNode) SendCancel(peerID, orderID string) error { fmt.Sprintf("%s-%d", orderID, int(pb.Message_ORDER_CANCEL)), orderID, pb.Message_ORDER_CANCEL, peerID, repo.Message{Msg: m}) if err != nil { - log.Errorf("failed putting message (%s-%d)", orderID, int(pb.Message_ORDER)) + log.Errorf("failed putting message (%s-%d)", orderID, int(pb.Message_ORDER_CANCEL)) } return n.sendMessage(peerID, kp, m) } @@ -382,7 +383,7 @@ func (n *OpenBazaarNode) SendReject(peerID string, rejectMessage *pb.OrderReject fmt.Sprintf("%s-%d", rejectMessage.OrderID, int(pb.Message_ORDER_REJECT)), rejectMessage.OrderID, pb.Message_ORDER_REJECT, peerID, repo.Message{Msg: m}) if err != nil { - log.Errorf("failed putting message (%s-%d)", rejectMessage.OrderID, int(pb.Message_ORDER)) + log.Errorf("failed putting message (%s-%d)", rejectMessage.OrderID, int(pb.Message_ORDER_REJECT)) } return n.sendMessage(peerID, kp, m) } @@ -416,13 +417,14 @@ func (n *OpenBazaarNode) SendOrderFulfillment(peerID string, k *libp2p.PubKey, f } orderID0, err := n.CalcOrderID(fulfillmentMessage.BuyerOrder) if err != nil { - return err - } - err = n.Datastore.Messages().Put( - fmt.Sprintf("%s-%d", orderID0, int(pb.Message_ORDER_FULFILLMENT)), - orderID0, pb.Message_ORDER_FULFILLMENT, peerID, repo.Message{Msg: m}) - if err != nil { - log.Errorf("failed putting message (%s-%d)", orderID0, int(pb.Message_ORDER)) + log.Errorf("failed calculating order id") + } else { + err = n.Datastore.Messages().Put( + fmt.Sprintf("%s-%d", orderID0, int(pb.Message_ORDER_FULFILLMENT)), + orderID0, pb.Message_ORDER_FULFILLMENT, peerID, repo.Message{Msg: m}) + if err != nil { + log.Errorf("failed putting message (%s-%d)", orderID0, int(pb.Message_ORDER_FULFILLMENT)) + } } return n.sendMessage(peerID, k, m) } @@ -442,13 +444,14 @@ func (n *OpenBazaarNode) SendOrderCompletion(peerID string, k *libp2p.PubKey, co } orderID0, err := n.CalcOrderID(completionMessage.BuyerOrder) if err != nil { - return err - } - err = n.Datastore.Messages().Put( - fmt.Sprintf("%s-%d", orderID0, int(pb.Message_ORDER_COMPLETION)), - orderID0, pb.Message_ORDER_COMPLETION, peerID, repo.Message{Msg: m}) - if err != nil { - log.Errorf("failed putting message (%s-%d)", orderID0, int(pb.Message_ORDER)) + log.Errorf("failed calculating order id") + } else { + err = n.Datastore.Messages().Put( + fmt.Sprintf("%s-%d", orderID0, int(pb.Message_ORDER_COMPLETION)), + orderID0, pb.Message_ORDER_COMPLETION, peerID, repo.Message{Msg: m}) + if err != nil { + log.Errorf("failed putting message (%s-%d)", orderID0, int(pb.Message_ORDER_COMPLETION)) + } } return n.sendMessage(peerID, k, m) } diff --git a/mobile/node.go b/mobile/node.go index 601e2291d1..6e42dfc55e 100644 --- a/mobile/node.go +++ b/mobile/node.go @@ -69,8 +69,8 @@ type Node struct { var ( fileLogFormat = logging.MustStringFormatter( `%{time:15:04:05.000} [%{level}] [%{module}/%{shortfunc}] %{message}`, - publishUnlocked = false ) + publishUnlocked = false mainLoggingBackend logging.Backend ) diff --git a/repo/db/messages_test.go b/repo/db/messages_test.go index 7b69d4d7c7..b25919be23 100644 --- a/repo/db/messages_test.go +++ b/repo/db/messages_test.go @@ -61,7 +61,7 @@ func TestMessageDB_Put(t *testing.T) { t.Error(err) } - if !(string(retMsg.GetPayload().Value) == payload) { + if string(retMsg.GetPayload().Value) != payload { t.Error("incorrect payload") } diff --git a/repo/message_test.go b/repo/message_test.go index 183df04928..7d4196b7bf 100644 --- a/repo/message_test.go +++ b/repo/message_test.go @@ -35,6 +35,10 @@ func TestMessage(t *testing.T) { t.Error(err) } + if retRepoMsg.GetMessageType() != pb.Message_ORDER { + t.Error("wrong msg type") + } + if !bytes.Equal(retRepoMsg.GetPayload().GetValue(), []byte(payload)) { t.Error("wrong msg type") } diff --git a/schema/constants.go b/schema/constants.go index b301145c70..e4b489e6c0 100644 --- a/schema/constants.go +++ b/schema/constants.go @@ -37,9 +37,9 @@ const ( CreateIndexCouponsSQL = "create index index_coupons on coupons (slug);" CreateTableModeratedStoresSQL = "create table moderatedstores (peerID text primary key not null);" CreateMessagesSQL = "create table messages (messageID text primary key not null, orderID text, message_type integer, message blob, peerID text, url text, acknowledged bool, tries integer, created_at integer, updated_at integer);" - CreateIndexMessagesSQL1 = "create index index_messages1 on messages (messageID);" - CreateIndexMessagesSQL2 = "create index index_messages2 on messages (orderID, message_type);" - CreateIndexMessagesSQL3 = "create index index_messages2 on messages (peerID, message_type);" + CreateIndexMessagesSQLMessageID = "create index index_messages_messageID on messages (messageID);" + CreateIndexMessagesSQLOrderIDMType = "create index index_messages_orderIDmType on messages (orderID, message_type);" + CreateIndexMessagesSQLPeerIDMType = "create index index_messages_peerIDmType on messages (peerID, message_type);" // End SQL Statements // Configuration defaults diff --git a/schema/manager.go b/schema/manager.go index 1b187fe39c..2c745f663b 100644 --- a/schema/manager.go +++ b/schema/manager.go @@ -306,8 +306,9 @@ func InitializeDatabaseSQL(encryptionPassword string) string { CreateIndexCouponsSQL, CreateTableModeratedStoresSQL, CreateMessagesSQL, - CreateIndexMessagesSQL1, - CreateIndexMessagesSQL2, + CreateIndexMessagesSQLMessageID, + CreateIndexMessagesSQLOrderIDMType, + CreateIndexMessagesSQLPeerIDMType, } return strings.Join(initializeStatement, " ") } From 3b17209d37503b4e7b6f3124fa2dbdeedc16ac67 Mon Sep 17 00:00:00 2001 From: Ashwin Mangale Date: Fri, 19 Jul 2019 19:29:04 +0530 Subject: [PATCH 21/23] remove GetMessageByID --- repo/datastore.go | 3 --- repo/db/messages.go | 31 ------------------------------- 2 files changed, 34 deletions(-) diff --git a/repo/datastore.go b/repo/datastore.go index 0a5ae93342..9984c379a0 100644 --- a/repo/datastore.go +++ b/repo/datastore.go @@ -442,7 +442,4 @@ type MessageStore interface { // GetByOrderIDType returns the message for specified order and type GetByOrderIDType(orderID string, mType pb.Message_MessageType) (*Message, string, error) - - // GetByMessageID returns the message for specified message id - GetByMessageID(messageID string) (*Message, string, error) } diff --git a/repo/db/messages.go b/repo/db/messages.go index abf57f07ae..5801e61314 100644 --- a/repo/db/messages.go +++ b/repo/db/messages.go @@ -2,7 +2,6 @@ package db import ( "database/sql" - "encoding/json" "fmt" "sync" "time" @@ -90,33 +89,3 @@ func (o *MessagesDB) GetByOrderIDType(orderID string, mType pb.Message_MessageTy return msg, peerID, nil } - -// GetByMessageID returns the message for the specified message id -func (o *MessagesDB) GetByMessageID(messageID string) (*repo.Message, string, error) { - o.lock.Lock() - defer o.lock.Unlock() - var ( - msg0 []byte - peerID string - ) - - stmt, err := o.db.Prepare("select message, peerID from messages where messageID=?") - if err != nil { - return nil, "", err - } - err = stmt.QueryRow(messageID).Scan(&msg0, &peerID) - if err != nil { - return nil, "", err - } - - msg := new(repo.Message) - - if len(msg0) > 0 { - err = json.Unmarshal(msg0, msg) - if err != nil { - return nil, "", err - } - } - - return msg, peerID, nil -} From 57a6bfd08e51ff4bb5df6f691054e25e9c08ad39 Mon Sep 17 00:00:00 2001 From: Ashwin Mangale Date: Wed, 24 Jul 2019 19:41:12 +0530 Subject: [PATCH 22/23] fix error logging and add offline message send after unsuccessful message resend --- api/jsonapi.go | 10 ++++++-- core/net.go | 67 ++++++++++++++++++++++++++++++++++---------------- 2 files changed, 54 insertions(+), 23 deletions(-) diff --git a/api/jsonapi.go b/api/jsonapi.go index 8ebe53a5bf..9fc84505bc 100644 --- a/api/jsonapi.go +++ b/api/jsonapi.go @@ -4269,8 +4269,14 @@ func (i *jsonAPIHandler) POSTResendOrderMessage(w http.ResponseWriter, r *http.R err = i.node.Service.SendMessage(ctx, p, &msg.Msg) if err != nil { - ErrorResponse(w, http.StatusBadRequest, "order message not sent") - return + // If send message failed, try sending offline message + log.Errorf("resending message failed: %v", err) + err = i.node.SendOfflineMessage(p, nil, &msg.Msg) + if err != nil { + log.Errorf("resending offline message failed: %v", err) + ErrorResponse(w, http.StatusBadRequest, "order message not sent") + return + } } SanitizedResponse(w, `{}`) diff --git a/core/net.go b/core/net.go index acca627c51..1e092e32d2 100644 --- a/core/net.go +++ b/core/net.go @@ -35,6 +35,7 @@ var OfflineMessageWaitGroup sync.WaitGroup func (n *OpenBazaarNode) sendMessage(peerID string, k *libp2p.PubKey, message pb.Message) error { p, err := peer.IDB58Decode(peerID) if err != nil { + log.Errorf("failed to decode peerID: %v", err) return err } ctx, cancel := context.WithTimeout(context.Background(), n.OfflineMessageFailoverTimeout) @@ -143,6 +144,7 @@ func (n *OpenBazaarNode) SendOfflineAck(peerID string, pointerID peer.ID) error func (n *OpenBazaarNode) GetPeerStatus(peerID string) (string, error) { p, err := peer.IDB58Decode(peerID) if err != nil { + log.Errorf("failed to decode peerID: %v", err) return "", err } ctx, cancel := context.WithCancel(context.Background()) @@ -188,6 +190,7 @@ func (n *OpenBazaarNode) Follow(peerID string) error { } pbAny, err := ptypes.MarshalAny(sd) if err != nil { + log.Errorf("failed to marshal the signedData: %v", err) return err } m.Payload = pbAny @@ -240,6 +243,7 @@ func (n *OpenBazaarNode) Unfollow(peerID string) error { } pbAny, err := ptypes.MarshalAny(sd) if err != nil { + log.Errorf("failed to marshal the signedData: %v", err) return err } m.Payload = pbAny @@ -263,6 +267,7 @@ func (n *OpenBazaarNode) Unfollow(peerID string) error { func (n *OpenBazaarNode) SendOrder(peerID string, contract *pb.RicardianContract) (resp *pb.Message, err error) { p, err := peer.IDB58Decode(peerID) if err != nil { + log.Errorf("failed to decode peerID: %v", err) return resp, err } @@ -270,27 +275,27 @@ func (n *OpenBazaarNode) SendOrder(peerID string, contract *pb.RicardianContract defer cancel() pbAny, err := ptypes.MarshalAny(contract) if err != nil { + log.Errorf("failed to marshal the contract: %v", err) return resp, err } m := pb.Message{ MessageType: pb.Message_ORDER, Payload: pbAny, } - orderID0, err := n.CalcOrderID(contract.BuyerOrder) if err != nil { - log.Errorf("failed calculating order id") + log.Errorf("failed calculating order id: %v", err) } else { err = n.Datastore.Messages().Put( fmt.Sprintf("%s-%d", orderID0, int(pb.Message_ORDER)), orderID0, pb.Message_ORDER, peerID, repo.Message{Msg: m}) if err != nil { - log.Errorf("failed putting message (%s-%d)", orderID0, int(pb.Message_ORDER)) + log.Errorf("failed putting message (%s-%d): %v", orderID0, int(pb.Message_ORDER), err) } } - resp, err = n.Service.SendRequest(ctx, p, &m) if err != nil { + log.Errorf("failed to send order request: %v", err) return resp, err } return resp, nil @@ -305,6 +310,7 @@ func (n *OpenBazaarNode) SendError(peerID string, k *libp2p.PubKey, errorMessage func (n *OpenBazaarNode) SendOrderConfirmation(peerID string, contract *pb.RicardianContract) error { a, err := ptypes.MarshalAny(contract) if err != nil { + log.Errorf("failed to marshal the contract: %v", err) return err } m := pb.Message{ @@ -313,17 +319,18 @@ func (n *OpenBazaarNode) SendOrderConfirmation(peerID string, contract *pb.Ricar } k, err := libp2p.UnmarshalPublicKey(contract.GetBuyerOrder().GetBuyerID().GetPubkeys().Identity) if err != nil { + log.Errorf("failed to unmarshal the publicKey: %v", err) return err } - orderID0, err := n.CalcOrderID(contract.BuyerOrder) - if err != nil { - log.Errorf("failed calculating order id") + orderID0 := contract.VendorOrderConfirmation.OrderID + if orderID0 == "" { + log.Errorf("failed fetching orderID") } else { err = n.Datastore.Messages().Put( fmt.Sprintf("%s-%d", orderID0, int(pb.Message_ORDER_CONFIRMATION)), orderID0, pb.Message_ORDER_CONFIRMATION, peerID, repo.Message{Msg: m}) if err != nil { - log.Errorf("failed putting message (%s-%d)", orderID0, int(pb.Message_ORDER_CONFIRMATION)) + log.Errorf("failed putting message (%s-%d): %v", orderID0, int(pb.Message_ORDER_CONFIRMATION), err) } } return n.sendMessage(peerID, &k, m) @@ -352,7 +359,7 @@ func (n *OpenBazaarNode) SendCancel(peerID, orderID string) error { fmt.Sprintf("%s-%d", orderID, int(pb.Message_ORDER_CANCEL)), orderID, pb.Message_ORDER_CANCEL, peerID, repo.Message{Msg: m}) if err != nil { - log.Errorf("failed putting message (%s-%d)", orderID, int(pb.Message_ORDER_CANCEL)) + log.Errorf("failed putting message (%s-%d): %v", orderID, int(pb.Message_ORDER_CANCEL), err) } return n.sendMessage(peerID, kp, m) } @@ -361,6 +368,7 @@ func (n *OpenBazaarNode) SendCancel(peerID, orderID string) error { func (n *OpenBazaarNode) SendReject(peerID string, rejectMessage *pb.OrderReject) error { a, err := ptypes.MarshalAny(rejectMessage) if err != nil { + log.Errorf("failed to marshal the contract: %v", err) return err } m := pb.Message{ @@ -375,6 +383,7 @@ func (n *OpenBazaarNode) SendReject(peerID string, rejectMessage *pb.OrderReject } else { k, err := libp2p.UnmarshalPublicKey(order.GetBuyerOrder().GetBuyerID().GetPubkeys().Identity) if err != nil { + log.Errorf("failed to unmarshal publicKey: %v", err) return err } kp = &k @@ -383,7 +392,7 @@ func (n *OpenBazaarNode) SendReject(peerID string, rejectMessage *pb.OrderReject fmt.Sprintf("%s-%d", rejectMessage.OrderID, int(pb.Message_ORDER_REJECT)), rejectMessage.OrderID, pb.Message_ORDER_REJECT, peerID, repo.Message{Msg: m}) if err != nil { - log.Errorf("failed putting message (%s-%d)", rejectMessage.OrderID, int(pb.Message_ORDER_REJECT)) + log.Errorf("failed putting message (%s-%d): %v", rejectMessage.OrderID, int(pb.Message_ORDER_REJECT), err) } return n.sendMessage(peerID, kp, m) } @@ -392,6 +401,7 @@ func (n *OpenBazaarNode) SendReject(peerID string, rejectMessage *pb.OrderReject func (n *OpenBazaarNode) SendRefund(peerID string, refundMessage *pb.RicardianContract) error { a, err := ptypes.MarshalAny(refundMessage) if err != nil { + log.Errorf("failed to marshal the contract: %v", err) return err } m := pb.Message{ @@ -400,6 +410,7 @@ func (n *OpenBazaarNode) SendRefund(peerID string, refundMessage *pb.RicardianCo } k, err := libp2p.UnmarshalPublicKey(refundMessage.GetBuyerOrder().GetBuyerID().GetPubkeys().Identity) if err != nil { + log.Errorf("failed to unmarshal publicKey: %v", err) return err } return n.sendMessage(peerID, &k, m) @@ -409,21 +420,22 @@ func (n *OpenBazaarNode) SendRefund(peerID string, refundMessage *pb.RicardianCo func (n *OpenBazaarNode) SendOrderFulfillment(peerID string, k *libp2p.PubKey, fulfillmentMessage *pb.RicardianContract) error { a, err := ptypes.MarshalAny(fulfillmentMessage) if err != nil { + log.Errorf("failed to marshal the contract: %v", err) return err } m := pb.Message{ MessageType: pb.Message_ORDER_FULFILLMENT, Payload: a, } - orderID0, err := n.CalcOrderID(fulfillmentMessage.BuyerOrder) - if err != nil { - log.Errorf("failed calculating order id") + orderID0 := fulfillmentMessage.VendorOrderFulfillment[0].OrderId + if orderID0 != "" { + log.Errorf("failed fetching orderID") } else { err = n.Datastore.Messages().Put( fmt.Sprintf("%s-%d", orderID0, int(pb.Message_ORDER_FULFILLMENT)), orderID0, pb.Message_ORDER_FULFILLMENT, peerID, repo.Message{Msg: m}) if err != nil { - log.Errorf("failed putting message (%s-%d)", orderID0, int(pb.Message_ORDER_FULFILLMENT)) + log.Errorf("failed putting message (%s-%d): %v", orderID0, int(pb.Message_ORDER_FULFILLMENT), err) } } return n.sendMessage(peerID, k, m) @@ -433,24 +445,22 @@ func (n *OpenBazaarNode) SendOrderFulfillment(peerID string, k *libp2p.PubKey, f func (n *OpenBazaarNode) SendOrderCompletion(peerID string, k *libp2p.PubKey, completionMessage *pb.RicardianContract) error { a, err := ptypes.MarshalAny(completionMessage) if err != nil { + log.Errorf("failed to marshal the contract: %v", err) return err } m := pb.Message{ MessageType: pb.Message_ORDER_COMPLETION, Payload: a, } - if err != nil { - return err - } - orderID0, err := n.CalcOrderID(completionMessage.BuyerOrder) - if err != nil { - log.Errorf("failed calculating order id") + orderID0 := completionMessage.BuyerOrderCompletion.OrderId + if orderID0 == "" { + log.Errorf("failed fetching orderID") } else { err = n.Datastore.Messages().Put( fmt.Sprintf("%s-%d", orderID0, int(pb.Message_ORDER_COMPLETION)), orderID0, pb.Message_ORDER_COMPLETION, peerID, repo.Message{Msg: m}) if err != nil { - log.Errorf("failed putting message (%s-%d)", orderID0, int(pb.Message_ORDER_COMPLETION)) + log.Errorf("failed putting message (%s-%d): %v", orderID0, int(pb.Message_ORDER_COMPLETION), err) } } return n.sendMessage(peerID, k, m) @@ -460,6 +470,7 @@ func (n *OpenBazaarNode) SendOrderCompletion(peerID string, k *libp2p.PubKey, co func (n *OpenBazaarNode) SendDisputeOpen(peerID string, k *libp2p.PubKey, disputeMessage *pb.RicardianContract) error { a, err := ptypes.MarshalAny(disputeMessage) if err != nil { + log.Errorf("failed to marshal the contract: %v", err) return err } m := pb.Message{ @@ -473,6 +484,7 @@ func (n *OpenBazaarNode) SendDisputeOpen(peerID string, k *libp2p.PubKey, disput func (n *OpenBazaarNode) SendDisputeUpdate(peerID string, updateMessage *pb.DisputeUpdate) error { a, err := ptypes.MarshalAny(updateMessage) if err != nil { + log.Errorf("failed to marshal the contract: %v", err) return err } m := pb.Message{ @@ -486,6 +498,7 @@ func (n *OpenBazaarNode) SendDisputeUpdate(peerID string, updateMessage *pb.Disp func (n *OpenBazaarNode) SendDisputeClose(peerID string, k *libp2p.PubKey, resolutionMessage *pb.RicardianContract) error { a, err := ptypes.MarshalAny(resolutionMessage) if err != nil { + log.Errorf("failed to marshal the contract: %v", err) return err } m := pb.Message{ @@ -499,10 +512,12 @@ func (n *OpenBazaarNode) SendDisputeClose(peerID string, k *libp2p.PubKey, resol func (n *OpenBazaarNode) SendFundsReleasedByVendor(peerID string, marshalledPeerPublicKey []byte, orderID string) error { peerKey, err := libp2p.UnmarshalPublicKey(marshalledPeerPublicKey) if err != nil { + log.Errorf("failed to unmarshal the publicKey: %v", err) return err } payload, err := ptypes.MarshalAny(&pb.VendorFinalizedPayment{OrderID: orderID}) if err != nil { + log.Errorf("failed to marshal the finalized payment: %v", err) return err } message := pb.Message{ @@ -516,6 +531,7 @@ func (n *OpenBazaarNode) SendFundsReleasedByVendor(peerID string, marshalledPeer func (n *OpenBazaarNode) SendChat(peerID string, chatMessage *pb.Chat) error { a, err := ptypes.MarshalAny(chatMessage) if err != nil { + log.Errorf("failed to marshal the chat message: %v", err) return err } m := pb.Message{ @@ -525,6 +541,7 @@ func (n *OpenBazaarNode) SendChat(peerID string, chatMessage *pb.Chat) error { p, err := peer.IDB58Decode(peerID) if err != nil { + log.Errorf("failed to decode peerID: %v", err) return err } ctx, cancel := context.WithTimeout(context.Background(), n.OfflineMessageFailoverTimeout) @@ -532,6 +549,7 @@ func (n *OpenBazaarNode) SendChat(peerID string, chatMessage *pb.Chat) error { err = n.Service.SendMessage(ctx, p, &m) if err != nil && chatMessage.Flag != pb.Chat_TYPING { if err := n.SendOfflineMessage(p, nil, &m); err != nil { + log.Errorf("failed to send offline message: %v", err) return err } } @@ -571,6 +589,7 @@ func (n *OpenBazaarNode) SendModeratorAdd(peerID string) error { } pbAny, err := ptypes.MarshalAny(sd) if err != nil { + log.Errorf("failed to marshal the signed data: %v", err) return err } m.Payload = pbAny @@ -615,6 +634,7 @@ func (n *OpenBazaarNode) SendModeratorRemove(peerID string) error { } pbAny, err := ptypes.MarshalAny(sd) if err != nil { + log.Errorf("failed to marshal the signedData: %v", err) return err } m.Payload = pbAny @@ -641,6 +661,7 @@ func (n *OpenBazaarNode) SendBlock(peerID string, id cid.Cid) error { } a, err := ptypes.MarshalAny(b) if err != nil { + log.Errorf("failed to marshal the block: %v", err) return err } m := pb.Message{ @@ -650,6 +671,7 @@ func (n *OpenBazaarNode) SendBlock(peerID string, id cid.Cid) error { p, err := peer.IDB58Decode(peerID) if err != nil { + log.Errorf("failed to decode peerID: %v", err) return err } return n.Service.SendMessage(context.Background(), p, &m) @@ -666,6 +688,7 @@ func (n *OpenBazaarNode) SendStore(peerID string, ids []cid.Cid) error { a, err := ptypes.MarshalAny(cList) if err != nil { + log.Errorf("failed to marshal the cidList: %v", err) return err } @@ -676,6 +699,7 @@ func (n *OpenBazaarNode) SendStore(peerID string, ids []cid.Cid) error { p, err := peer.IDB58Decode(peerID) if err != nil { + log.Errorf("failed to decode peerID: %v", err) return err } pmes, err := n.Service.SendRequest(context.Background(), p, &m) @@ -694,6 +718,7 @@ func (n *OpenBazaarNode) SendStore(peerID string, ids []cid.Cid) error { resp := new(pb.CidList) err = ptypes.UnmarshalAny(pmes.Payload, resp) if err != nil { + log.Errorf("failed to unmarshal the cidList: %v", err) return err } if len(resp.Cids) == 0 { From 0ee517efdc07f60eca1236a06aeb96dc7a8b0832 Mon Sep 17 00:00:00 2001 From: Ashwin Mangale Date: Fri, 26 Jul 2019 09:27:20 +0530 Subject: [PATCH 23/23] fix indentation and modify the offline scan conditional --- api/jsonapi.go | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/api/jsonapi.go b/api/jsonapi.go index fe1faa5a30..80bf3f98dc 100644 --- a/api/jsonapi.go +++ b/api/jsonapi.go @@ -67,6 +67,8 @@ type jsonAPIHandler struct { var lastManualScan time.Time +const OfflineMessageScanInterval = 1 * time.Minute + func newJSONAPIHandler(node *core.OpenBazaarNode, authCookie http.Cookie, config schema.APIConfig) *jsonAPIHandler { allowedIPs := make(map[string]bool) for _, ip := range config.AllowedIPs { @@ -1482,7 +1484,7 @@ func (i *jsonAPIHandler) GETProfile(w http.ResponseWriter, r *http.Request) { return } if profile.PeerID != peerID { - ErrorResponse(w, http.StatusNotFound, "invalid profile: peer id mismatch on found profile") + ErrorResponse(w, http.StatusNotFound, "invalid profile: peer id mismatch on found profile") return } w.Header().Set("Cache-Control", "public, max-age=600, immutable") @@ -4270,7 +4272,7 @@ func (i *jsonAPIHandler) POSTResendOrderMessage(w http.ResponseWriter, r *http.R err = i.node.Service.SendMessage(ctx, p, &msg.Msg) if err != nil { // If send message failed, try sending offline message - log.Errorf("resending message failed: %v", err) + log.Warningf("resending message failed: %v", err) err = i.node.SendOfflineMessage(p, nil, &msg.Msg) if err != nil { log.Errorf("resending offline message failed: %v", err) @@ -4284,14 +4286,13 @@ func (i *jsonAPIHandler) POSTResendOrderMessage(w http.ResponseWriter, r *http.R // GETScanOfflineMessages - used to manually trigger offline message scan func (i *jsonAPIHandler) GETScanOfflineMessages(w http.ResponseWriter, r *http.Request) { - t := time.Since(lastManualScan).Minutes() - if t < 100000000 { - if t >= 1 { + if lastManualScan.IsZero() { + lastManualScan = time.Now() + } else { + if time.Since(lastManualScan) >= OfflineMessageScanInterval { i.node.MessageRetriever.RunOnce() lastManualScan = time.Now() } - } else { - lastManualScan = time.Now() } SanitizedResponse(w, `{}`) }