Commit: debug #301
wpbonelli committed Jun 5, 2022
1 parent 06110c1 commit 0990a7d
Showing 1 changed file with 65 additions and 57 deletions.
plantit/plantit/celery_tasks.py: 65 additions & 57 deletions
@@ -1,6 +1,7 @@
import json
import os
import traceback
+from copy import deepcopy
from pathlib import Path
from typing import List, TypedDict, Optional
from os import environ
@@ -966,7 +967,7 @@ async def push_migration_event(user: User, migration: Migration):
SELECT_MANAGED_FILE_BY_FID = """SELECT fid, filename, uri FROM file_managed WHERE fid = %s"""
SELECT_ROOT_IMAGE = """SELECT entity_id FROM field_data_field_root_image WHERE field_root_image_fid = %s"""
SELECT_ROOT_COLLECTION = """SELECT entity_id FROM field_data_field_marked_coll_root_img_ref WHERE field_marked_coll_root_img_ref_target_id = %s"""
SELECT_ROOT_COLLECTION_TITLE = """SELECT title, created, changed FROM node WHERE nid = %s"""
SELECT_ROOT_COLLECTION_TITLE = """SELECT title, created, changed FROM node WHERE nid = %s AND"""
SELECT_ROOT_COLLECTION_METADATA = """SELECT field_collection_metadata_first, field_collection_metadata_second FROM field_data_field_collection_metadata WHERE entity_id = %s"""
SELECT_ROOT_COLLECTION_LOCATION = """SELECT field_collection_location_lat, field_collection_location_lng FROM field_data_field_collection_location WHERE entity_id = %s"""
SELECT_ROOT_COLLECTION_PLANTING = """SELECT field_collection_plantation_value FROM field_data_field_collection_plantation WHERE entity_id = %s"""
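These constants are ordinary parametrized (PEP 249 style) queries against the DIRT Drupal database, executed later in the task through a cursor. For readers unfamiliar with the pattern, a minimal self-contained sketch follows; the pymysql driver and the connection details are assumptions for illustration, not necessarily what plantit uses:

    import pymysql  # illustrative driver choice, not confirmed by this commit

    SELECT_ROOT_IMAGE = """SELECT entity_id FROM field_data_field_root_image WHERE field_root_image_fid = %s"""

    conn = pymysql.connect(host="localhost", user="dirt", password="secret", database="dirt")
    cursor = conn.cursor()
    cursor.execute(SELECT_ROOT_IMAGE, (42,))  # the driver binds %s safely, no string interpolation
    row = cursor.fetchone()
    file_entity_id = None if row is None else row[0]
    conn.close()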
@@ -981,6 +982,7 @@ async def push_migration_event(user: User, migration: Migration):

@app.task(bind=True)
def migrate_dirt_datasets(self, username: str):
+# retrieve user, profile and migration
try:
user = User.objects.get(username=username)
profile = Profile.objects.get(user=user)
@@ -1002,10 +1004,6 @@ def migrate_dirt_datasets(self, username: str):
return
client.mkdir(root_collection_path)

-# folder for orphans (files not associated with any collection)
-orphan_subcollection_path = f"/iplant/home/{user.username}/dirt_migration/orphans"
-orphan_subcollection_created = False
-
# create a database client for the DIRT DB and open a connection
# db = Database(settings.DIRT_MIGRATION_DB_CONN_STR)
# async_to_sync(db.connect)()
@@ -1032,10 +1030,7 @@ def migrate_dirt_datasets(self, username: str):
uploaded=False) for row in rows]

# associate each root image with the collection it's a member of too
-collections = dict()
-
-# for UI updating
-uploads = []
+uploads = {'orphans': dict()}

ssh = SSH(
host=settings.DIRT_MIGRATION_HOST,
@@ -1049,54 +1044,81 @@ def migrate_dirt_datasets(self, username: str):
folders = [folder for folder in sftp.listdir(userdir)]
logger.info(f"User {username} has {len(folders)} DIRT folders: {', '.join(folders)}")

-# persist number of folders and fifles
+# persist number of folders and files
migration.num_folders = len(folders)
migration.num_files = len(managed_files)
migration.save()

for file in managed_files:
-# get Drupal entity ID given root image file ID
+# get file entity ID given root image file ID
cursor.execute(SELECT_ROOT_IMAGE, (file.id,))
row = cursor.fetchone()

# if we didn't find a corresponding file node for this managed file, skip it
if row is None:
logger.warning(f"DIRT root image with file ID {file.id} not found")
continue

file_entity_id = row[0]

-# get Drupal entity ID for the collection this root image file is in
+# get collection entity ID for the collection this image is in
cursor.execute(SELECT_ROOT_COLLECTION, (file_entity_id,))
row = cursor.fetchone()

# if we didn't find a corresponding marked collection for this root image file,
+# use an orphan folder named by date (as stored on the DIRT server NFS)
if row is None:
logger.warning(f"DIRT root image collection with entity ID {file_entity_id} not found")
-if not orphan_subcollection_created:
-    client.mkdir(orphan_subcollection_path)
-    orphan_subcollection_created = True

+# create the folder if we need to
+if file.folder not in uploads.keys():
+    uploads[file.folder] = dict()
+    client.mkdir(join(root_collection_path, file.folder))

# download the file
sftp.get(join(file.folder, file.name), join(staging_dir, file.name))

# upload the file to the corresponding collection
-client.upload(from_path=join(staging_dir, file.name), to_prefix=collections[coll_entity_id])
+client.upload(from_path=join(staging_dir, file.name), to_prefix=join(root_collection_path, file.folder))

# push a progress update to client
-uploads.append(ManagedFile(fid=file.id, name=file.name, path=file.path, folder=file.folder, orphan=True, uploaded=True))
+uploads[file.folder][file.name] = ManagedFile(
+    id=file.id,
+    name=file.name,
+    path=file.path,
+    folder=file.folder,
+    orphan=True,
+    uploaded=True)
migration.uploads = json.dumps(uploads)
migration.save()
async_to_sync(push_migration_event)(user, migration)

# remove file from staging dir
os.remove(join(staging_dir, file.name))

# go on to the next file
continue

# otherwise we have a corresponding marked collection, get its title
coll_entity_id = row[0]
+cursor.execute(SELECT_ROOT_COLLECTION_TITLE, (coll_entity_id,))
+coll_title_row = cursor.fetchone()
+coll_title = coll_title_row[0]
+coll_created = datetime.fromtimestamp(int(coll_title_row[1]))
+coll_changed = datetime.fromtimestamp(int(coll_title_row[2]))

-# if we haven't encountered this collection yet..
-if coll_entity_id not in collections:
-    # get its title, creation/modification timestamps, metadata and environmental data
-    cursor.execute(SELECT_ROOT_COLLECTION_TITLE, (coll_entity_id,))
-    title_row = cursor.fetchone()
+if coll_title not in uploads.keys():
+    uploads[coll_title] = dict()

+# create the collection in the data store
+coll_path = join(root_collection_path, coll_title)
+client.mkdir(coll_path)
+
+# get ID of newly created collection
+stat = client.stat(coll_path)
+id = stat['id']
+
+# get its creation/modification timestamps, metadata and environmental data
cursor.execute(SELECT_ROOT_COLLECTION_METADATA, (coll_entity_id,))
metadata_rows = cursor.fetchall()
cursor.execute(SELECT_ROOT_COLLECTION_LOCATION, (coll_entity_id,))
@@ -1118,9 +1140,6 @@ def migrate_dirt_datasets(self, username: str):
cursor.execute(SELECT_ROOT_COLLECTION_PESTICIDES, (coll_entity_id,))
pesticides_row = cursor.fetchone()

-title = title_row[0]
-created = datetime.fromtimestamp(int(title_row[1]))
-changed = datetime.fromtimestamp(int(title_row[2]))
metadata = {row[0]: row[1] for row in metadata_rows}
latitude = None if location_row is None else location_row[0]
longitude = None if location_row is None else location_row[1]
@@ -1133,25 +1152,11 @@ def migrate_dirt_datasets(self, username: str):
soil_k = None if soil_k_row is None else soil_k_row[0]
pesticides = None if pesticides_row is None else pesticides_row[0]

-# mark the collection as seen
-collections[coll_entity_id] = join(root_collection_path, title)
-
-# create collection in CyVerse data store for this marked collection
-collection_path = join(root_collection_path, title)
-if client.dir_exists(collection_path):
-    logger.warning(f"Collection {collection_path} already exists, skipping")
-    continue
-client.mkdir(collection_path)
-
-# get ID of newly created collection
-stat = client.stat(collection_path)
-id = stat['id']
-
# attach metadata to collection
props = [
f"migrated={timezone.now().isoformat()}",
f"created={created.isoformat()}",
f"changed={changed.isoformat()}",
f"created={coll_created.isoformat()}",
f"changed={coll_changed.isoformat()}",
]
if latitude is not None: props.append(f"latitude={latitude}")
if longitude is not None: props.append(f"longitude={longitude}")
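The `props` assembled here are plain `key=value` strings later attached to the new collection via `client.set_metadata` (visible near the end of this diff). A small sketch of the same pattern in isolation, with hypothetical inputs:

    from datetime import datetime, timezone
    from typing import List, Optional

    def build_props(created: datetime, changed: datetime,
                    latitude: Optional[float] = None,
                    longitude: Optional[float] = None) -> List[str]:
        # key=value attribute strings in the style this task attaches to collections
        props = [
            f"migrated={datetime.now(timezone.utc).isoformat()}",
            f"created={created.isoformat()}",
            f"changed={changed.isoformat()}",
        ]
        if latitude is not None: props.append(f"latitude={latitude}")
        if longitude is not None: props.append(f"longitude={longitude}")
        return props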
@@ -1170,13 +1175,19 @@ def migrate_dirt_datasets(self, username: str):
sftp.get(join(file.folder, file.name), join(staging_dir, file.name))

# upload the file to the corresponding collection
-client.upload(from_path=join(staging_dir, file.name), to_prefix=collections[coll_entity_id])
+client.upload(from_path=join(staging_dir, file.name), to_prefix=coll_title)

# push a progress update to client
-uploads.append(ManagedFile(fid=file.id, name=file.name, path=file.path, folder=file.folder, orphan=False, uploaded=True))
+uploads[file.folder][file.name] = ManagedFile(
+    id=file.id,
+    name=file.name,
+    path=file.path,
+    folder=file.folder,
+    orphan=False,
+    uploaded=True)
+migration.uploads = json.dumps(uploads)
-async_to_sync(push_migration_event)(user, migration)
migration.save()
async_to_sync(push_migration_event)(user, migration)

# remove file from staging dir
os.remove(join(staging_dir, file.name))
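Both the orphan and collection branches follow the same staging sequence: SFTP download to a local staging directory, upload to the data store, then deletion of the staged copy. A defensive variant worth noting wraps the upload in try/finally so the staged file is removed even when the upload raises; a sketch with hypothetical `sftp` and `client` objects matching the calls above:

    import os
    from os.path import join

    def transfer_file(sftp, client, folder: str, name: str,
                      staging_dir: str, to_prefix: str):
        # stage the file locally, push it to the data store, always clean up
        staged = join(staging_dir, name)
        sftp.get(join(folder, name), staged)
        try:
            client.upload(from_path=staged, to_prefix=to_prefix)
        finally:
            os.remove(staged)  # don't let failed uploads accumulate in staging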
@@ -1185,33 +1196,30 @@ def migrate_dirt_datasets(self, username: str):

# close the DB connection
db.close()
# async_to_sync(db.disconnect)()

-# get ID of newly created collection
+# get ID of newly created migration collection and add collection timestamp as metadata
root_collection_id = client.stat(root_collection_path)['id']

-# add collection timestamp as metadata
end = timezone.now()
client.set_metadata(root_collection_id, [
f"dirt_migration_timestamp={end.isoformat()}",
# TODO: anything else we need to add here?
], [])

-# send notification to user via email
-# SnsClient.get().publish_message(
-#     profile.push_notification_topic_arn,
-#     f"DIRT => PlantIT migration completed",
-#     f"Duration: {str(end - start)}",
-#     {})

# persist completion
-end = timezone.now()
-migration.completed = end
+completed = timezone.now()
+migration.completed = completed
migration.save()

# push completion update to the UI
async_to_sync(push_migration_event)(user, migration)

+# send notification to user via email
+SnsClient.get().publish_message(
+    profile.push_notification_topic_arn,
+    f"DIRT => PlantIT migration completed",
+    f"Duration: {str(end - migration.started)}",
+    {})


# see https://stackoverflow.com/a/41119054/6514033
# `@app.on_after_finalize.connect` is necessary for some reason instead of `@app.on_after_configure.connect`
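For context on the trailing comment: Celery's `on_after_configure` signal can fire before all tasks are registered, which is why the linked answer recommends `on_after_finalize` for registering periodic tasks. A minimal registration sketch (the task and schedule are illustrative, not plantit's own):

    from celery import Celery

    app = Celery('plantit')

    @app.task
    def refresh_user_stats():  # hypothetical periodic task
        pass

    @app.on_after_finalize.connect
    def setup_periodic_tasks(sender, **kwargs):
        # register after finalize so the task is guaranteed to exist
        sender.add_periodic_task(3600.0, refresh_user_stats.s(), name='hourly refresh')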
