-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathbatch.go
88 lines (75 loc) · 1.89 KB
/
batch.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
package mongods
import (
"context"
"errors"
"fmt"
"sync"
"time"
"github.com/ipfs/go-datastore"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
)
// ErrBatchAlreadyCommited is returned by Put, Delete and Commit when they are
// called on a batch that has already been committed.
var ErrBatchAlreadyCommited = errors.New("batch already commited")
// mongoBatch accumulates pending writes and deletes in memory and sends them
// to MongoDB in one bulk write when Commit is called.
type mongoBatch struct {
// lock is held by Put, Delete and Commit for their whole duration and
// guards every field below.
lock sync.Mutex
// commited is set once Commit has succeeded (or found nothing to send);
// afterwards Put, Delete and Commit return ErrBatchAlreadyCommited.
commited bool
// deletes is the set of keys to remove on Commit. Put removes a key from
// this set, so a key is never in both deletes and upserts.
deletes map[datastore.Key]struct{}
// upserts maps each key to the value to write on Commit. Delete removes a
// key from this map.
upserts map[datastore.Key][]byte
// ds is the datastore whose collection (col) and per-operation timeout
// (opTimeout) Commit uses.
ds *MongoDS
}
// Put schedules val to be written under key when the batch is committed.
//
// A deletion previously scheduled for the same key is cancelled, so the most
// recent call for a key wins. The slice is stored as given, not copied.
// It returns ErrBatchAlreadyCommited if the batch was already committed.
func (mb *mongoBatch) Put(key datastore.Key, val []byte) error {
	mb.lock.Lock()
	defer mb.lock.Unlock()

	if mb.commited {
		return ErrBatchAlreadyCommited
	}

	// Cancel any pending delete for this key, then record the write.
	delete(mb.deletes, key)
	mb.upserts[key] = val

	return nil
}
// Delete schedules key to be removed when the batch is committed.
//
// A write previously scheduled for the same key is discarded, so the most
// recent call for a key wins.
// It returns ErrBatchAlreadyCommited if the batch was already committed.
func (mb *mongoBatch) Delete(key datastore.Key) error {
	mb.lock.Lock()
	defer mb.lock.Unlock()

	if mb.commited {
		return ErrBatchAlreadyCommited
	}

	// Discard any pending write for this key, then record the deletion.
	delete(mb.upserts, key)
	mb.deletes[key] = struct{}{}

	return nil
}
// Commit sends every pending delete and upsert to MongoDB in a single bulk
// write and, on success, marks the batch as committed.
//
// It returns ErrBatchAlreadyCommited if the batch was committed before. An
// empty batch is marked committed without contacting the database. If the bulk
// write fails, the batch is left uncommitted and the driver error is returned
// wrapped with %w, so callers can inspect it with errors.Is / errors.As.
func (mb *mongoBatch) Commit() error {
	mb.lock.Lock()
	defer mb.lock.Unlock()

	if mb.commited {
		return ErrBatchAlreadyCommited
	}

	total := len(mb.deletes) + len(mb.upserts)
	if total == 0 {
		// Nothing to send; just close the batch.
		mb.commited = true
		return nil
	}

	operations := make([]mongo.WriteModel, 0, total)
	for k := range mb.deletes {
		operations = append(operations,
			mongo.NewDeleteOneModel().
				SetFilter(bson.M{"_id": k.String()}))
	}
	for k, v := range mb.upserts {
		operations = append(operations,
			mongo.NewUpdateOneModel().
				SetUpsert(true).
				SetFilter(bson.M{"_id": k.String()}).
				SetUpdate(bson.M{"$set": bson.M{"v": v}}))
	}

	// Unordered: the server may apply the operations in any order (and
	// possibly in parallel). That is safe here because Put and Delete keep a
	// key out of both maps at once, so no two operations touch the same _id.
	bulkOption := options.BulkWrite().SetOrdered(false)

	// The deadline scales with the batch size: opTimeout per operation.
	ctx, cls := context.WithTimeout(context.Background(), mb.ds.opTimeout*time.Duration(len(operations)))
	defer cls()

	if _, err := mb.ds.col.BulkWrite(ctx, operations, bulkOption); err != nil {
		return fmt.Errorf("committing batch: %w", err)
	}

	mb.commited = true
	return nil
}