-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.go
88 lines (79 loc) · 2.33 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
package main
import (
"context"
"flag"
"fmt"
"gcs-redis-operator/config"
"gcs-redis-operator/constants"
"gcs-redis-operator/dto"
"gcs-redis-operator/redis"
"gcs-redis-operator/utils"
"github.com/bytedance/sonic"
"log"
"os"
"reflect"
"strconv"
"strings"
)
// splitGcsPath splits a GCS URI of the form "<scheme>//<bucket>/<object>"
// (for example "gs://my-bucket/dir/file.json") into its bucket name and
// object path.
//
// ok is false when the "//" separator is missing, when there is no "/"
// between bucket and object, or when either part is empty.
func splitGcsPath(gcsPath string) (bucket, object string, ok bool) {
	// Split only on the first "//": an object path may itself contain "//",
	// and an unbounded split would silently drop everything after it.
	schemeAndRest := strings.SplitN(gcsPath, "//", 2)
	if len(schemeAndRest) < 2 {
		return "", "", false
	}

	bucketAndObject := strings.SplitN(schemeAndRest[1], "/", 2)
	if len(bucketAndObject) < 2 || bucketAndObject[0] == "" || bucketAndObject[1] == "" {
		return "", "", false
	}
	return bucketAndObject[0], bucketAndObject[1], true
}

// main is the entry point of the GCS-to-Redis job.
//
// It reads the -gcsPath, -redisInstance, -operation, -expiry and -batchSize
// flags, validates them, starts a goroutine that reads the GCS object into a
// channel, and runs the requested Redis operation over that channel. Only the
// "mset" operation is handled here.
//
// The process exits with status 1 when the flags are missing or invalid, and
// with status 0 once the requested operation returns.
func main() {
	var (
		gcsPath        string
		redisInstance  string
		redisOperation string
		expiry         string
		batchSize      string
	)

	config.InitConfig()

	flag.StringVar(&gcsPath, "gcsPath", "", "Gcs Path")
	flag.StringVar(&redisInstance, "redisInstance", "", "Redis Instance ID")
	flag.StringVar(&redisOperation, "operation", "", "Redis operation to be performed")
	flag.StringVar(&expiry, "expiry", "", "Redis keys expiry in seconds")
	flag.StringVar(&batchSize, "batchSize", constants.DEFAULT_BATCH_SIZE, "Batch size to be pushed in redis from gcs")
	flag.Parse()

	if gcsPath == "" || redisInstance == "" || redisOperation == "" || expiry == "" {
		fmt.Println("Error: Required flags are missing or invalid.")
		flag.PrintDefaults()
		// Exit non-zero so schedulers do not record a bad invocation as success.
		os.Exit(1)
	}

	// Reject unknown operations before any client is created or any GCS read
	// is started; otherwise the job would exit successfully having done nothing.
	if redisOperation != "mset" {
		fmt.Printf("Error: Unsupported redis operation %q.\n", redisOperation)
		os.Exit(1)
	}

	redisBatchSize, err := strconv.Atoi(batchSize)
	if err != nil {
		log.Fatal(err)
	}

	gcsBucketName, gcsObjectPath, ok := splitGcsPath(gcsPath)
	if !ok {
		fmt.Println("Invalid gcs path")
		os.Exit(1)
	}

	fmt.Println("Starting GCS to Redis job....")
	fmt.Println("Gcs Bucket Name:", gcsBucketName)
	fmt.Println("Gcs Object Path:", gcsObjectPath)
	fmt.Println("Redis Instance Id:", redisInstance)
	fmt.Println("Redis Operation to be performed:", redisOperation)
	fmt.Println("Batch Size:", redisBatchSize)

	redisConfig := config.GetRedisConfig(redisInstance)
	ctx := context.Background()
	redisClient := redis.NewRedisClusterClient(redisConfig, expiry)

	// dataChannel carries messages from the GCS reader to the Redis operation.
	// done is shared by both sides; presumably it signals that reading has
	// finished — confirm against utils.ReadGcsUncompressedFile and ProcessMset.
	dataChannel := make(chan dto.PostStaticRedisMsetMsg, constants.DATA_CHANNEL_SIZE)
	done := make(chan bool, 1)

	// Pretouch the feature type with sonic before any data flows.
	// NOTE(review): per sonic's docs this pre-compiles the codec to avoid
	// first-use latency — confirm that is the intent here.
	var v dto.PostStaticFeatures
	if err = sonic.Pretouch(reflect.TypeOf(v)); err != nil {
		log.Fatal(err)
	}

	go utils.ReadGcsUncompressedFile(ctx, gcsBucketName, gcsObjectPath, dataChannel, done)

	switch redisOperation {
	case "mset":
		redisClient.ProcessMset(dataChannel, done, redisBatchSize)
	}

	os.Exit(0)
}