-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdeduplicate.go
142 lines (116 loc) · 3.04 KB
/
deduplicate.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
package main
import (
"database/sql"
"fmt"
"github.com/lib/pq"
)
// withTransaction runs fn inside a single database transaction opened on db.
//
// The transaction is committed only when fn returns nil. If fn returns an
// error, or panics, the deferred Rollback undoes its work. After a successful
// Commit, that deferred Rollback is a no-op that returns sql.ErrTxDone, which
// is why its error is discarded.
//
// An error returned by fn is passed through unchanged. Failures from Begin
// and Commit are wrapped with %w, so callers can still match the underlying
// driver error with errors.Is / errors.As.
func withTransaction(db *sql.DB, fn func(tx *sql.Tx) error) error {
	tx, err := db.Begin()
	if err != nil {
		return fmt.Errorf("error beginning transaction: %w", err)
	}
	// Roll back unless the Commit below succeeds; after a commit this only
	// returns sql.ErrTxDone, so the error is intentionally ignored.
	defer func() { _ = tx.Rollback() }()

	if err := fn(tx); err != nil {
		return err
	}
	if err := tx.Commit(); err != nil {
		return fmt.Errorf("error committing transaction: %w", err)
	}
	return nil
}
// fillTempTable creates the temp table temp_logs (with a hash index on
// log_id) and bulk-loads every key of batch into it with a COPY, then prints
// how many rows were loaded.
//
// temp_logs is declared ON COMMIT DROP, so it only lives until tx commits.
// NOTE(review): the CREATE has no IF NOT EXISTS, so presumably a second call
// within the same transaction would fail on the existing table — confirm
// callers only invoke this once per tx.
func fillTempTable(tx *sql.Tx, batch Batch) error {
	_, err := tx.Exec(`
CREATE TEMP TABLE temp_logs (log_id UUID) ON COMMIT DROP;
CREATE INDEX temp_logs_log_id_idx ON temp_logs USING HASH (log_id);
`)
	if err != nil {
		return fmt.Errorf("error creating temp table: %w", err)
	}

	// Previously this error was discarded, leaving stmt nil and making the
	// first stmt.Exec below panic.
	stmt, err := tx.Prepare(pq.CopyIn("temp_logs", "log_id"))
	if err != nil {
		return fmt.Errorf("error creating COPY handler: %w", err)
	}
	// Deferred immediately after Prepare so the statement is also released
	// when queuing a row fails inside the loop.
	defer stmt.Close()

	for key := range batch {
		if _, err := stmt.Exec(key); err != nil {
			return fmt.Errorf("error queuing row for COPY: %w", err)
		}
	}

	// Exec with no arguments flushes the buffered rows and completes the COPY.
	result, err := stmt.Exec()
	if err != nil {
		return fmt.Errorf("error running COPY command: %w", err)
	}
	rows, err := result.RowsAffected()
	if err != nil {
		return fmt.Errorf("error checking affected COPY rows: %w", err)
	}
	fmt.Println("Rows inserted into temp table:", rows)
	return nil
}
// fillLogs bulk-loads every key of batch into processed_logs.log_id with a
// COPY and prints how many rows were written.
//
// Nothing is committed here: the rows are written through tx, and committing
// or rolling back tx is left to the caller.
func fillLogs(tx *sql.Tx, batch Batch) error {
	stmt, err := tx.Prepare(pq.CopyIn("processed_logs", "log_id"))
	if err != nil {
		return fmt.Errorf("error creating COPY handler: %w", err)
	}
	// Deferred immediately after Prepare (not after the loop) so the
	// statement is also released when queuing a row fails.
	defer stmt.Close()

	for key := range batch {
		if _, err := stmt.Exec(key); err != nil {
			return fmt.Errorf("error queuing row for COPY: %w", err)
		}
	}

	// Exec with no arguments flushes the buffered rows and completes the COPY.
	result, err := stmt.Exec()
	if err != nil {
		return fmt.Errorf("error running COPY command: %w", err)
	}
	rows, err := result.RowsAffected()
	if err != nil {
		return fmt.Errorf("error checking affected COPY rows: %w", err)
	}
	fmt.Println("Rows inserted into final table:", rows)
	return nil
}
// deduplicateBatch drops from batch every log ID already present in
// processed_logs, records the remaining IDs in processed_logs, and returns
// the remaining batch.
//
// All steps run inside tx:
//  1. acquireLock locks processed_logs, so no other transaction can write it
//     between the duplicate check and the insert;
//  2. fillTempTable copies the batch keys into temp_logs;
//  3. removeProcessedLogs deletes from batch every key already in
//     processed_logs;
//  4. fillLogs copies the surviving keys into processed_logs.
//
// batch is modified in place: duplicates are deleted from the caller's map,
// and the returned Batch is that same map. On error the returned Batch is nil
// and batch may already be partially filtered. Nothing is committed here;
// committing or rolling back tx is the caller's job.
func deduplicateBatch(tx *sql.Tx, batch Batch) (Batch, error) {
	if err := acquireLock(tx); err != nil {
		return nil, err
	}
	if err := fillTempTable(tx, batch); err != nil {
		return nil, err
	}
	// The helper closes its result set before returning, so the COPY issued
	// by fillLogs does not share the connection with open rows.
	if err := removeProcessedLogs(tx, batch); err != nil {
		return nil, err
	}
	if err := fillLogs(tx, batch); err != nil {
		return nil, err
	}
	return batch, nil
}

// removeProcessedLogs deletes from batch each log ID in temp_logs that also
// exists in processed_logs.
func removeProcessedLogs(tx *sql.Tx, batch Batch) error {
	rows, err := tx.Query(`
SELECT t.log_id
FROM temp_logs t
INNER JOIN processed_logs p ON t.log_id = p.log_id
`)
	if err != nil {
		return fmt.Errorf("error selecting duplicated logs: %w", err)
	}
	defer rows.Close()

	for rows.Next() {
		var logID string
		if err := rows.Scan(&logID); err != nil {
			return fmt.Errorf("error scanning row: %w", err)
		}
		delete(batch, logID)
	}
	// Next returns false both when rows are exhausted and when iteration
	// failed; without this check a failure would silently keep duplicates.
	if err := rows.Err(); err != nil {
		return fmt.Errorf("error iterating duplicated logs: %w", err)
	}
	return nil
}
// acquireLock runs `LOCK TABLE processed_logs IN ACCESS EXCLUSIVE MODE` on
// tx. In PostgreSQL, the lock is held until tx commits or rolls back, and
// this mode conflicts with every other lock mode, so other sessions can
// neither read nor write processed_logs meanwhile. The statement waits for
// the lock (no NOWAIT).
//
// The driver error is wrapped with %w so callers can inspect it with
// errors.Is / errors.As.
//
// NOTE(review): ACCESS EXCLUSIVE also blocks plain SELECTs from other
// sessions. If only concurrent writers need to be excluded, a weaker
// self-conflicting mode such as SHARE ROW EXCLUSIVE may suffice — confirm
// before changing.
func acquireLock(tx *sql.Tx) error {
	if _, err := tx.Exec("LOCK TABLE processed_logs IN ACCESS EXCLUSIVE MODE"); err != nil {
		return fmt.Errorf("error acquiring table lock: %w", err)
	}
	return nil
}