From 0990a7ddde6955ab5c9c099ef720795c875e4e52 Mon Sep 17 00:00:00 2001
From: Wes Bonelli
Date: Sun, 5 Jun 2022 08:51:10 -0400
Subject: [PATCH] debug #301

---
 plantit/plantit/celery_tasks.py | 122 +++++++++++++++++---------------
 1 file changed, 65 insertions(+), 57 deletions(-)

diff --git a/plantit/plantit/celery_tasks.py b/plantit/plantit/celery_tasks.py
index 9cfadc4d..dcea4b0d 100644
--- a/plantit/plantit/celery_tasks.py
+++ b/plantit/plantit/celery_tasks.py
@@ -1,6 +1,7 @@
 import json
 import os
 import traceback
+from copy import deepcopy
 from pathlib import Path
 from typing import List, TypedDict, Optional
 from os import environ
@@ -966,7 +967,7 @@ async def push_migration_event(user: User, migration: Migration):
 SELECT_MANAGED_FILE_BY_FID = """SELECT fid, filename, uri FROM file_managed WHERE fid = %s"""
 SELECT_ROOT_IMAGE = """SELECT entity_id FROM field_data_field_root_image WHERE field_root_image_fid = %s"""
 SELECT_ROOT_COLLECTION = """SELECT entity_id FROM field_data_field_marked_coll_root_img_ref WHERE field_marked_coll_root_img_ref_target_id = %s"""
-SELECT_ROOT_COLLECTION_TITLE = """SELECT title, created, changed FROM node WHERE nid = %s"""
+SELECT_ROOT_COLLECTION_TITLE = """SELECT title, created, changed FROM node WHERE nid = %s AND"""
 SELECT_ROOT_COLLECTION_METADATA = """SELECT field_collection_metadata_first, field_collection_metadata_second FROM field_data_field_collection_metadata WHERE entity_id = %s"""
 SELECT_ROOT_COLLECTION_LOCATION = """SELECT field_collection_location_lat, field_collection_location_lng FROM field_data_field_collection_location WHERE entity_id = %s"""
 SELECT_ROOT_COLLECTION_PLANTING = """SELECT field_collection_plantation_value FROM field_data_field_collection_plantation WHERE entity_id = %s"""
@@ -981,6 +982,7 @@ async def push_migration_event(user: User, migration: Migration):
 
 @app.task(bind=True)
 def migrate_dirt_datasets(self, username: str):
+    # retrieve user, profile and migration
     try:
         user = User.objects.get(username=username)
         profile = Profile.objects.get(user=user)
@@ -1002,10 +1004,6 @@ def migrate_dirt_datasets(self, username: str):
         return
 
     client.mkdir(root_collection_path)
-    # folder for orphans (files not associated with any collection)
-    orphan_subcollection_path = f"/iplant/home/{user.username}/dirt_migration/orphans"
-    orphan_subcollection_created = False
-
     # create a database client for the DIRT DB and open a connection
     # db = Database(settings.DIRT_MIGRATION_DB_CONN_STR)
     # async_to_sync(db.connect)()
@@ -1032,10 +1030,7 @@ def migrate_dirt_datasets(self, username: str):
         uploaded=False) for row in rows]
 
     # associate each root image with the collection it's a member of too
-    collections = dict()
-
-    # for UI updating
-    uploads = []
+    uploads = {'orphans': dict()}
 
     ssh = SSH(
         host=settings.DIRT_MIGRATION_HOST,
@@ -1049,54 +1044,81 @@ def migrate_dirt_datasets(self, username: str):
             folders = [folder for folder in sftp.listdir(userdir)]
             logger.info(f"User {username} has {len(folders)} DIRT folders: {', '.join(folders)}")
 
-            # persist number of folders and fifles
+            # persist number of folders and files
             migration.num_folders = len(folders)
             migration.num_files = len(managed_files)
             migration.save()
 
             for file in managed_files:
-                # get Drupal entity ID given root image file ID
+                # get file entity ID given root image file ID
                 cursor.execute(SELECT_ROOT_IMAGE, (file.id,))
                 row = cursor.fetchone()
+
+                # if we didn't find a corresponding file node for this managed file, skip it
                 if row is None:
                     logger.warning(f"DIRT root image with file ID {file.id} not found")
found") continue + file_entity_id = row[0] - # get Drupal entity ID for the collection this root image file is in + # get collection entity ID for the collection this image is in cursor.execute(SELECT_ROOT_COLLECTION, (file_entity_id,)) row = cursor.fetchone() + # if we didn't find a corresponding marked collection for this root image file, # use an orphan folder named by date (as stored on the DIRT server NFS) if row is None: logger.warning(f"DIRT root image collection with entity ID {file_entity_id} not found") - if not orphan_subcollection_created: - client.mkdir(orphan_subcollection_path) - orphan_subcollection_created = True + + # create the folder if we need to + if file.folder not in uploads.keys(): + uploads[file.folder] = dict() + client.mkdir(join(root_collection_path, file.folder)) # download the file sftp.get(join(file.folder, file.name), join(staging_dir, file.name)) # upload the file to the corresponding collection - client.upload(from_path=join(staging_dir, file.name), to_prefix=collections[coll_entity_id]) + client.upload(from_path=join(staging_dir, file.name), to_prefix=join(root_collection_path, file.folder)) # push a progress update to client - uploads.append(ManagedFile(fid=file.id, name=file.name, path=file.path, folder=file.folder, orphan=True, uploaded=True)) + uploads[file.folder][file.name] = ManagedFile( + id=file.id, + name=file.name, + path=file.path, + folder=file.folder, + orphan=True, + uploaded=True) migration.uploads = json.dumps(uploads) + migration.save() async_to_sync(push_migration_event)(user, migration) # remove file from staging dir os.remove(join(staging_dir, file.name)) + # go on to the next file continue + # otherwise we have a corresponding marked collection, get its title coll_entity_id = row[0] + cursor.execute(SELECT_ROOT_COLLECTION_TITLE, (coll_entity_id,)) + coll_title_row = cursor.fetchone() + coll_title = coll_title_row[0] + coll_created = datetime.fromtimestamp(int(coll_title_row[1])) + coll_changed = datetime.fromtimestamp(int(coll_title_row[2])) - # if we haven't encountered this collection yet.. 
-                if coll_entity_id not in collections:
-                    # get its title, creation/modification timestamps, metadata and environmental data
-                    cursor.execute(SELECT_ROOT_COLLECTION_TITLE, (coll_entity_id,))
-                    title_row = cursor.fetchone()
+                if coll_title not in uploads.keys():
+                    uploads[coll_title] = dict()
+
+                    # create the collection in the data store
+                    coll_path = join(root_collection_path, coll_title)
+                    client.mkdir(coll_path)
+
+                    # get ID of newly created collection
+                    stat = client.stat(coll_path)
+                    id = stat['id']
+
+                    # get its creation/modification timestamps, metadata and environmental data
                     cursor.execute(SELECT_ROOT_COLLECTION_METADATA, (coll_entity_id,))
                     metadata_rows = cursor.fetchall()
                     cursor.execute(SELECT_ROOT_COLLECTION_LOCATION, (coll_entity_id,))
@@ -1118,9 +1140,6 @@ def migrate_dirt_datasets(self, username: str):
                     cursor.execute(SELECT_ROOT_COLLECTION_PESTICIDES, (coll_entity_id,))
                     pesticides_row = cursor.fetchone()
 
-                    title = title_row[0]
-                    created = datetime.fromtimestamp(int(title_row[1]))
-                    changed = datetime.fromtimestamp(int(title_row[2]))
                     metadata = {row[0]: row[1] for row in metadata_rows}
                     latitude = None if location_row is None else location_row[0]
                     longitude = None if location_row is None else location_row[0]
@@ -1133,25 +1152,11 @@ def migrate_dirt_datasets(self, username: str):
                     soil_k = None if soil_k_row is None else soil_k_row[0]
                     pesticides = None if pesticides_row is None else pesticides_row[0]
 
-                    # mark the collection as seen
-                    collections[coll_entity_id] = join(root_collection_path, title)
-
-                    # create collection in CyVerse data store for this marked collection
-                    collection_path = join(root_collection_path, title)
-                    if client.dir_exists(collection_path):
-                        logger.warning(f"Collection {collection_path} already exists, skipping")
-                        continue
-                    client.mkdir(collection_path)
-
-                    # get ID of newly created collection
-                    stat = client.stat(collection_path)
-                    id = stat['id']
-
                     # attach metadata to collection
                     props = [
                         f"migrated={timezone.now().isoformat()}",
-                        f"created={created.isoformat()}",
-                        f"changed={changed.isoformat()}",
+                        f"created={coll_created.isoformat()}",
+                        f"changed={coll_changed.isoformat()}",
                     ]
                     if latitude is not None: props.append(f"latitude={latitude}")
                     if longitude is not None: props.append(f"longitude={longitude}")
@@ -1170,13 +1175,19 @@ def migrate_dirt_datasets(self, username: str):
                 sftp.get(join(file.folder, file.name), join(staging_dir, file.name))
 
                 # upload the file to the corresponding collection
-                client.upload(from_path=join(staging_dir, file.name), to_prefix=collections[coll_entity_id])
+                client.upload(from_path=join(staging_dir, file.name), to_prefix=coll_title)
 
                 # push a progress update to client
-                uploads.append(ManagedFile(fid=file.id, name=file.name, path=file.path, folder=file.folder, orphan=False, uploaded=True))
+                uploads[file.folder][file.name] = ManagedFile(
+                    id=file.id,
+                    name=file.name,
+                    path=file.path,
+                    folder=file.folder,
+                    orphan=False,
+                    uploaded=True)
                 migration.uploads = json.dumps(uploads)
-                async_to_sync(push_migration_event)(user, migration)
                 migration.save()
+                async_to_sync(push_migration_event)(user, migration)
 
                 # remove file from staging dir
                 os.remove(join(staging_dir, file.name))
@@ -1185,33 +1196,30 @@ def migrate_dirt_datasets(self, username: str):
     # close the DB connection
     db.close()
-    # async_to_sync(db.disconnect)()
 
-    # get ID of newly created collection
+    # get ID of newly created migration collection and add collection timestamp as metadata
     root_collection_id = client.stat(root_collection_path)['id']
-
-    # add collection timestamp as metadata
     end = timezone.now()
     client.set_metadata(root_collection_id, [
         f"dirt_migration_timestamp={end.isoformat()}",
         # TODO: anything else we need to add here?
     ], [])
 
-    # send notification to user via email
-    # SnsClient.get().publish_message(
-    #     profile.push_notification_topic_arn,
-    #     f"DIRT => PlantIT migration completed",
-    #     f"Duration: {str(end - start)}",
-    #     {})
-
     # persist completion
-    end = timezone.now()
-    migration.completed = end
+    completed = timezone.now()
+    migration.completed = completed
     migration.save()
 
     # push completion update to the UI
     async_to_sync(push_migration_event)(user, migration)
 
+    # send notification to user via email
+    SnsClient.get().publish_message(
+        profile.push_notification_topic_arn,
+        f"DIRT => PlantIT migration completed",
+        f"Duration: {str(end - migration.started)}",
+        {})
+
 # see https://stackoverflow.com/a/41119054/6514033
 # `@app.on_after_finalize.connect` is necessary for some reason instead of `@app.on_after_configure.connect`
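
The per-file routing above hinges on a two-step lookup through the Drupal field
tables: managed file ID to root-image entity ID (SELECT_ROOT_IMAGE), then
root-image entity ID to marked-collection entity ID (SELECT_ROOT_COLLECTION);
files that fail the second lookup are treated as orphans. A minimal sketch of
that chain, assuming only a DB-API cursor on the DIRT database (the helper name
is hypothetical):

    def find_collection_entity(cursor, fid):
        # fid -> root image (file) entity
        cursor.execute(SELECT_ROOT_IMAGE, (fid,))
        row = cursor.fetchone()
        if row is None:
            return None  # no root-image node for this managed file: skip it
        # root image entity -> marked collection entity
        cursor.execute(SELECT_ROOT_COLLECTION, (row[0],))
        row = cursor.fetchone()
        return None if row is None else row[0]  # None here means orphan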
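
The flat uploads list becomes a dict keyed by collection title (or source
folder name, for orphans) and then by filename, so repeated progress pushes
overwrite per-file entries instead of appending duplicates. Because ManagedFile
appears to be a TypedDict (the module imports TypedDict), instances are plain
dicts and pass straight through json.dumps. A sketch of the shape
migration.uploads stores, with hypothetical field types and values:

    import json
    from typing import Dict, TypedDict

    class ManagedFile(TypedDict):
        id: str
        name: str
        path: str
        folder: str
        orphan: bool
        uploaded: bool

    # {collection title or orphan folder: {filename: file record}}
    uploads: Dict[str, Dict[str, ManagedFile]] = {'orphans': dict()}
    uploads['2016-04-21'] = {
        'root1.jpg': ManagedFile(
            id='42', name='root1.jpg', path='2016-04-21/root1.jpg',
            folder='2016-04-21', orphan=True, uploaded=True)}
    print(json.dumps(uploads))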
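
Collection titles and timestamps come from Drupal's node table, where created
and changed are Unix epoch integers, and are attached to the new collection as
key=value attribute strings via client.set_metadata(id, add, remove) (the
signature as called in this module). Note that the new
SELECT_ROOT_COLLECTION_TITLE constant ends in a dangling AND, so the title
query will raise a SQL syntax error unless a condition follows. A sketch of the
timestamp conversion and property assembly, with hypothetical values:

    from datetime import datetime

    # node.created / node.changed are epoch seconds
    created = datetime.fromtimestamp(1461211200)
    changed = datetime.fromtimestamp(1461297600)

    props = [f"created={created.isoformat()}", f"changed={changed.isoformat()}"]
    latitude = 33.95  # environmental fields are appended only when present
    if latitude is not None: props.append(f"latitude={latitude}")
    # client.set_metadata(collection_id, props, [])  # add list, empty remove list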
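
Each file makes a two-hop trip: SFTP download from the DIRT NFS into a local
staging directory, upload from staging into the data store, then removal of the
staging copy, so local disk usage stays bounded at roughly one file at a time.
The cycle, using the same calls as the loop above (paths hypothetical):

    import os
    from os.path import join

    local = join(staging_dir, file.name)
    sftp.get(join(file.folder, file.name), local)        # DIRT NFS -> staging
    client.upload(from_path=local, to_prefix=coll_path)  # staging -> data store
    os.remove(local)                                     # reclaim staging space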