Skip to content

Commit

Permalink
Add the object it is looking for with McRead
Browse files Browse the repository at this point in the history
  • Loading branch information
asteel-gsa committed May 21, 2024
1 parent 7370514 commit 3f790f9
Showing 1 changed file with 40 additions and 35 deletions.
75 changes: 40 additions & 35 deletions cmd/s3_to_db.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,45 +21,50 @@ func bucket_to_local_tables(
s3path *structs.S3Path,
) {

mc_pipe := pipes.McRead(
bucket_creds,
fmt.Sprintf("%s%s", s3path.Bucket, s3path.Key),
).FilterLine(func(s string) string {
if strings.Contains(s, "CREATE") {
fmt.Printf("REPLACING IN %s\n", s)
}
if strings.Contains(s, "CREATE TABLE") {
return strings.Replace(s, "CREATE TABLE", "CREATE TABLE IF NOT EXISTS", -1)
} else if strings.Contains(s, "CREATE INDEX") {
return strings.Replace(s, "CREATE INDEX", "CREATE INDEX IF NOT EXISTS", -1)
} else {
return s
table_to_schema := get_table_and_schema_names(db_creds)
//fmt.Sprintf("%s%s/%s-%s.dump", s3path.Bucket, s3path.Key, schema, table)
for table, schema := range table_to_schema {
truncate_tables(db_creds, []string{table})
mc_pipe := pipes.McRead(
bucket_creds,
fmt.Sprintf("%s%s/%s-%s.dump", s3path.Bucket, s3path.Key, schema, table),
).FilterLine(func(s string) string {
if strings.Contains(s, "CREATE") {
fmt.Printf("REPLACING IN %s\n", s)
}
if strings.Contains(s, "CREATE TABLE") {
return strings.Replace(s, "CREATE TABLE", "CREATE TABLE IF NOT EXISTS", -1)
} else if strings.Contains(s, "CREATE INDEX") {
return strings.Replace(s, "CREATE INDEX", "CREATE INDEX IF NOT EXISTS", -1)
} else {
return s
}
})
psql_pipe := pipes.Psql(mc_pipe, db_creds)

exit_code := 0
stdout, _ := mc_pipe.String()
if strings.Contains(stdout, "ERR") {
logging.Logger.Printf("S3TODB `mc` reported an error\n")
logging.Logger.Println(stdout)
exit_code = logging.PIPE_FAILURE
}
})
psql_pipe := pipes.Psql(mc_pipe, db_creds)

exit_code := 0
stdout, _ := mc_pipe.String()
if strings.Contains(stdout, "ERR") {
logging.Logger.Printf("S3TODB `mc` reported an error\n")
logging.Logger.Println(stdout)
exit_code = logging.PIPE_FAILURE
}

if mc_pipe.Error() != nil {
logging.Logger.Println("S3TODB `dump | mc` pipe failed")
exit_code = logging.PIPE_FAILURE
}
if mc_pipe.Error() != nil {
logging.Logger.Println("S3TODB `dump | mc` pipe failed")
exit_code = logging.PIPE_FAILURE
}

stdout, _ = psql_pipe.String()
if strings.Contains(stdout, "ERR") {
logging.Logger.Printf("S3TODB database reported an error\n")
logging.Logger.Println(stdout)
exit_code = logging.PIPE_FAILURE
}
stdout, _ = psql_pipe.String()
if strings.Contains(stdout, "ERR") {
logging.Logger.Printf("S3TODB database reported an error\n")
logging.Logger.Println(stdout)
exit_code = logging.PIPE_FAILURE
}

if exit_code != 0 {
os.Exit(exit_code)
if exit_code != 0 {
os.Exit(exit_code)
}
}

}
Expand Down

0 comments on commit 3f790f9

Please sign in to comment.