Skip to content

Commit

Permalink
add pycyapi as a dependency (#299), DIRT data transfer task full impl…
Browse files Browse the repository at this point in the history
…ementation (#301)
  • Loading branch information
wpbonelli committed Apr 28, 2022
1 parent a557adf commit 721bf79
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 11 deletions.
3 changes: 2 additions & 1 deletion dockerfiles/plantit/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -47,4 +47,5 @@ flower==1.0.0
scipy==1.8.0
pandas==1.4.0
statsmodels==0.13.1
furo
furo
pycyapi
62 changes: 52 additions & 10 deletions plantit/plantit/celery_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from django.contrib.auth.models import User
from django.core.cache import cache
from django.utils import timezone
from pycyapi.clients import TerrainClient

import plantit.healthchecks
import plantit.mapbox
Expand Down Expand Up @@ -894,7 +895,7 @@ def agents_healthchecks():
# DIRT migration task

@app.task(bind=True)
def check_dirt_datasets(self, username: str):
def migrate_dirt_datasets(self, username: str):
try:
user = User.objects.get(username=username)
profile = Profile.objects.get(user=user)
Expand All @@ -903,12 +904,15 @@ def check_dirt_datasets(self, username: str):
self.request.callbacks = None
return

# record starting time
start = timezone.now()

# create SSH/SFTP client
ssh = SSH(
host='tucco.cyverse.org',
port=1657,
username=settings.CYVERSE_USERNAME,
pkey=str(get_user_private_key_path(settings.CYVERSE_USERNAME)))

with ssh.client.open_sftp() as sftp:

# list the user's datasets on the DIRT server
Expand All @@ -917,6 +921,15 @@ def check_dirt_datasets(self, username: str):
new_line = '\n'
logger.info(f"User {username} has {len(datasets)} datasets:{new_line}{new_line.join(datasets)}")

# create a client for the CyVerse APIs and create a collection for the migrated DIRT data
client = TerrainClient(access_token=profile.cyverse_access_token)
root_collection_path = f"/iplant/home/{user.username}/dirt_migration"
if client.dir_exists(root_collection_path):
logger.warning(f"Collection {root_collection_path} already exists, aborting DIRT migration for {user.username}")
return
else:
client.create_directory(root_collection_path)

# transfer all the user's datasets to the temporary staging directory on this server
for folder in datasets:
files = [f for f in sftp.listdir(join(user_dir, folder))]
Expand All @@ -926,20 +939,49 @@ def check_dirt_datasets(self, username: str):
for file in files:
sftp.get(file.filename, join(settings.DIRT_STAGING_DIR, folder, file.filename))

# create collection in user's home data store directory
# create subcollection for this folder
collection_path = join(root_collection_path, folder.rpartition('/')[2])
if client.dir_exists(collection_path):
logger.warning(f"Collection {collection_path} already exists, aborting DIRT migration for {user.username}")
return
else:
client.create_directory(collection_path)

# upload all files to collection
client.upload_directory(
from_path=join(settings.DIRT_STAGING_DIR, folder),
to_prefix=collection_path)

# get ID of newly created collection
id = client.stat(collection_path)['id']

# mark collection as originating from DIRT

# send notification to user via email

# mark user's profile that DIRT transfer has been completed
profile.dirt_migrated = True
profile.save()
user.save()
client.set_metadata(id, [
f"dirt_migration_timestamp={timezone.now().isoformat()}",
# TODO: anything else we need to add here?
])

# get ID of newly created collection
root_collection_id = client.stat(root_collection_path)['id']

# mark root collection timestamp as metadata
end = timezone.now()
client.set_metadata(root_collection_id, [
f"dirt_migration_timestamp={end.isoformat()}",
# TODO: anything else we need to add here?
])

# send notification to user via email
SnsClient.get().publish_message(
profile.push_notification_topic_arn,
f"DIRT => PlantIT migration completed",
f"Duration: {str(end - start)}",
{})

# mark user's profile that DIRT transfer has been completed
profile.dirt_migrated = True
profile.save()
user.save()


# see https://stackoverflow.com/a/41119054/6514033
Expand Down

0 comments on commit 721bf79

Please sign in to comment.