-
Notifications
You must be signed in to change notification settings - Fork 4
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
(PXP-6815): update MDS object after file upload. #29
Changes from 11 commits
462fd0b
b57f40c
c75cd39
8ed7bfb
ede6e48
e1acffa
0b83fb1
4b9944e
88f34be
1836be6
e27a4b0
7c87976
4f03844
7c38369
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2,47 +2,68 @@ package handlers | |
|
||
import ( | ||
"encoding/json" | ||
"errors" | ||
"fmt" | ||
"log" | ||
"net/http" | ||
"net/url" | ||
"os" | ||
"strings" | ||
"time" | ||
) | ||
|
||
// MaxRetries maximum number of retries | ||
const MaxRetries = 5 | ||
|
||
type ConfigInfo struct { | ||
Indexd IndexdInfo `indexd` | ||
MetadataService MetadataServiceInfo `metadata_service` | ||
} | ||
|
||
type IndexdInfo struct { | ||
URL string `url` | ||
Username string `username` | ||
Password string `password` | ||
} | ||
|
||
func minOf(vars ...int64) int64 { | ||
min := vars[0] | ||
type MetadataServiceInfo struct { | ||
URL string `url` | ||
Username string `username` | ||
Password string `password` | ||
} | ||
|
||
for _, i := range vars { | ||
if min > i { | ||
min = i | ||
// Read Indexd and Metadata Service config info from CONFIG_FILE into | ||
// ConfigInfo struct. Panic if Indexd creds could not be found | ||
func getConfigInfo() *ConfigInfo { | ||
configInfo := new(ConfigInfo) | ||
configBytes := []byte(os.Getenv("CONFIG_FILE")) | ||
log.Printf("Attempting to unmarshal both Indexd and Metadata Service configs from JSON in CONFIG_FILE env variable") | ||
if err := json.Unmarshal(configBytes, configInfo); err != nil { | ||
log.Panicf("Could not unmarshal JSON in CONFIG_FILE env variable: %s", err) | ||
} | ||
if configInfo.Indexd == (IndexdInfo{}) { | ||
log.Printf("Could not find required Indexd config when unmarshalling both Indexd and Metadata Service configs. Trying again to only unmarshal Indexd config") | ||
if err := json.Unmarshal(configBytes, &configInfo.Indexd); err != nil { | ||
log.Panicf("Could not unmarshal JSON in CONFIG_FILE env variable: %s", err) | ||
} | ||
if configInfo.Indexd == (IndexdInfo{}) { | ||
log.Panicf("Could not find required Indexd config in JSON in CONFIG_FILE env variable") | ||
} | ||
} | ||
|
||
return min | ||
} | ||
func getIndexServiceInfo() (*IndexdInfo, error) { | ||
indexdInfo := new(IndexdInfo) | ||
if err := json.Unmarshal([]byte(os.Getenv("CONFIG_FILE")), indexdInfo); err != nil { | ||
return nil, errors.New("Enviroiment variable CONFIG_FILE is not set correctly") | ||
log.Printf("Indexd config was unmarshalled") | ||
if configInfo.MetadataService != (MetadataServiceInfo{}) { | ||
log.Printf("Metadata Service config was unmarshalled") | ||
} else { | ||
log.Printf("Metadata Service config was not unmarshalled") | ||
} | ||
return indexdInfo, nil | ||
|
||
return configInfo | ||
} | ||
|
||
// IndexS3Object indexes s3 object | ||
// The fuction does several things. It first downloads the object from | ||
// S3, computes size and hashes, and update indexd | ||
// IndexS3Object indexes s3 object The fuction does several things. It | ||
// downloads the object from S3, computes size and hashes, and updates Indexd | ||
// and potentially Metadata Service | ||
func IndexS3Object(s3objectURL string) { | ||
configInfo := getConfigInfo() | ||
|
||
s3objectURL, _ = url.QueryUnescape(s3objectURL) | ||
u, err := url.Parse(s3objectURL) | ||
if err != nil { | ||
|
@@ -63,6 +84,21 @@ func IndexS3Object(s3objectURL string) { | |
} else { | ||
uuid = strings.Join(split_key[:len(split_key)-1], "/") | ||
} | ||
filename := split_key[len(split_key)-1] | ||
|
||
log.Printf("Attempting to get rev for record %s in Indexd", uuid) | ||
rev, err := GetIndexdRecordRev(uuid, configInfo.Indexd.URL) | ||
mdsUploadedBody := fmt.Sprintf(`{"_upload_status": "uploaded", "_filename": "%s"}`, filename) | ||
if err != nil { | ||
log.Panicf("Can not get record %s from Indexd. Error message %s", uuid, err) | ||
} else if rev == "" { | ||
log.Printf("Indexd record with guid %s already has size and hashes", uuid) | ||
updateMetadataObjectWrapper(uuid, configInfo, mdsUploadedBody) | ||
return | ||
} | ||
log.Printf("Got rev %s from Indexd for record %s", rev, uuid) | ||
|
||
updateMetadataObjectWrapper(uuid, configInfo, `{"_upload_status": "indexs3client job calculating hashes and size"}`) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. all the others statuses in the design doc are a single word (or underscore-separated words) you might want to check with @Avantol13 if this is an acceptable status. if not, maybe There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah just one thing to keep in mind is what There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yeah, that's not detailed in the design doc, i'll let you and Alex make that call |
||
|
||
client, err := CreateNewAwsClient() | ||
if err != nil { | ||
|
@@ -77,54 +113,16 @@ func IndexS3Object(s3objectURL string) { | |
} | ||
log.Printf("Finish to compute hashes for %s", key) | ||
|
||
indexdInfo, _ := getIndexServiceInfo() | ||
|
||
var retries = 0 | ||
var rev = "" | ||
|
||
for { | ||
rev, err = GetIndexdRecordRev(uuid, indexdInfo.URL) | ||
if err != nil { | ||
retries++ | ||
log.Printf("Error: %s. Retry: %d", err, retries) | ||
time.Sleep(5 * time.Second) | ||
} else if rev == "" { | ||
log.Println("The file already has size and hashes") | ||
return | ||
} else { | ||
break | ||
} | ||
if retries == MaxRetries { | ||
log.Panicf("Can not get record %s from indexd. Error message %s", uuid, err) | ||
} | ||
} | ||
|
||
body := fmt.Sprintf(`{"size": %d, "urls": ["%s"], "hashes": {"md5": "%s", "sha1":"%s", "sha256": "%s", "sha512": "%s", "crc": "%s"}}`, | ||
indexdHashesBody := fmt.Sprintf(`{"size": %d, "urls": ["%s"], "hashes": {"md5": "%s", "sha1":"%s", "sha256": "%s", "sha512": "%s", "crc": "%s"}}`, | ||
objectSize, s3objectURL, hashes.Md5, hashes.Sha1, hashes.Sha256, hashes.Sha512, hashes.Crc32c) | ||
|
||
retries = 0 | ||
for { | ||
resp, err := UpdateIndexdRecord(uuid, rev, indexdInfo, []byte(body)) | ||
if err != nil { | ||
retries++ | ||
log.Printf("Error: %s. Retry: %d", err, retries) | ||
time.Sleep(5 * time.Second) | ||
} else if resp.StatusCode != 200 { | ||
log.Printf("StatusCode: %d. Retry: %d", resp.StatusCode, retries) | ||
retries++ | ||
time.Sleep(5 * time.Second) | ||
} else { | ||
log.Printf("Finish updating the record %s. Response Status: %d. Body %s", uuid, resp.StatusCode, body) | ||
break | ||
} | ||
|
||
if retries == MaxRetries { | ||
if err == nil { | ||
log.Panicf("Can not update %s with hash info. Body %s. Status code %d. Detail %s", uuid, body, resp.StatusCode, err) | ||
} else { | ||
log.Panicf("Can not update %s with hash info. Body %s. Detail %s", uuid, body, err) | ||
} | ||
break | ||
} | ||
log.Printf("Attempting to update Indexd record %s. Request Body: %s", uuid, indexdHashesBody) | ||
resp, err := UpdateIndexdRecord(uuid, rev, &configInfo.Indexd, []byte(indexdHashesBody)) | ||
if err != nil { | ||
log.Panicf("Could not update Indexd record %s. Error: %s", uuid, err) | ||
} else if resp.StatusCode != http.StatusOK { | ||
log.Panicf("Could not update Indexd record %s. Response Status Code: %d", uuid, resp.StatusCode) | ||
} | ||
log.Printf("Updated Indexd record %s with hash info. Response Status Code: %d", uuid, resp.StatusCode) | ||
|
||
updateMetadataObjectWrapper(uuid, configInfo, mdsUploadedBody) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the ticket and design doc mention
_file_type
but not_filename
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I talked with @Avantol13 about this. We thought
_filename
could be preferable. I just updated the design doc to reflect this.