-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathstore.go
75 lines (58 loc) · 2.05 KB
/
store.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
package main
import (
	"context"
	"errors"
	"fmt"
	"time"

	"cloud.google.com/go/storage"
	"github.com/google/uuid"
)
// uploadTempObject writes data to the named object in bucket through a
// storage.Writer and returns once that writer has been closed.
//
// The write and the close share a single 50-second deadline. On success a
// confirmation line is printed to stdout. Write and close failures are
// returned wrapped, so the underlying error stays reachable via errors.Is/As.
func uploadTempObject(client *storage.Client, bucket, object string, data []byte) error {
	uploadCtx, stop := context.WithTimeout(context.Background(), 50*time.Second)
	defer stop()

	target := client.Bucket(bucket).Object(object)
	w := target.NewWriter(uploadCtx)
	// Chunking is disabled on purpose; note that the writer does not support
	// retries when the chunk size is 0.
	w.ChunkSize = 0

	_, writeErr := w.Write(data)
	if writeErr != nil {
		return fmt.Errorf("error writing data to gcs: %w", writeErr)
	}

	// More data could still be written up to this point; closing the writer
	// ends the upload, so its error must be checked as well.
	closeErr := w.Close()
	if closeErr != nil {
		return fmt.Errorf("error closing writer: %w", closeErr)
	}

	fmt.Println("Temp file uploaded successfully")
	return nil
}
// composeObjects appends tempObject to destObject inside bucket with a GCS
// compose request and then deletes tempObject.
//
// If destObject does not exist yet, it is created from tempObject alone;
// otherwise the new destObject is the existing content followed by
// tempObject. The compose request carries a precondition (DoesNotExist or
// GenerationMatch on the generation that was just read), so a writer that
// changes destObject between the attribute lookup and the compose makes this
// call fail instead of silently overwriting the other writer's data.
//
// The lookup, the compose and the delete share one 10-second deadline. When
// the lookup or the compose fails, tempObject is left in the bucket. A non-nil
// error is also returned when the compose succeeded but the delete failed.
func composeObjects(client *storage.Client, bucket, tempObject, destObject string) error {
	bkt := client.Bucket(bucket)
	tempObj := bkt.Object(tempObject)
	destObj := bkt.Object(destObject)

	ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
	defer cancel()

	// Decide how to build the destination. Listing a missing object as a
	// compose source is rejected by the service, so the first append has to
	// use the temp object as the only source.
	var (
		composer *storage.Composer
		created  bool
	)
	attrs, err := destObj.Attrs(ctx)
	switch {
	case err == nil:
		// Append: only replace the generation we just observed.
		guarded := destObj.If(storage.Conditions{GenerationMatch: attrs.Generation})
		composer = guarded.ComposerFrom(destObj, tempObj)
	case errors.Is(err, storage.ErrObjectNotExist):
		// Bootstrap: only create the destination if it is still absent.
		guarded := destObj.If(storage.Conditions{DoesNotExist: true})
		composer = guarded.ComposerFrom(tempObj)
		created = true
	default:
		return fmt.Errorf("error when reading destination object attributes: %w", err)
	}

	if _, err := composer.Run(ctx); err != nil {
		return fmt.Errorf("error when composing objects: %w", err)
	}

	// The temp object's content now lives in the destination; remove it.
	if err := tempObj.Delete(ctx); err != nil {
		return fmt.Errorf("error when deleting objects: %w", err)
	}

	if created {
		fmt.Printf("New object %s was created from %s\n", destObject, tempObject)
		return nil
	}
	fmt.Printf("New composite object %s was created by combining %s and %s\n", destObject, destObject, tempObject)
	return nil
}
// storeBatch persists one batch of data into the app's storage bucket.
//
// The batch is first uploaded under a unique "temp-log-<uuid>.pb" name via
// uploadTempObject, and that temp object is then passed to composeObjects
// with "log.pb" as the destination. An error from either step is returned
// unchanged; the compose step is skipped when the upload fails.
func storeBatch(app *App, data []byte) error {
	const destObject = "log.pb"

	// Named batchID rather than uuid so the imported package is not shadowed.
	batchID := uuid.New()
	bucket := app.storageBucket
	tempObject := "temp-log-" + batchID.String() + ".pb"

	if err := uploadTempObject(app.storageClient, bucket, tempObject, data); err != nil {
		return err
	}
	return composeObjects(app.storageClient, bucket, tempObject, destObject)
}