Merge pull request #624 from catalystneuro/expose_jobs_to_dandi_upload
Expose DANDI upload job parameters
CodyCBakerPhD authored Oct 31, 2023
2 parents 200d800 + d5d7167 commit bad291f
Showing 3 changed files with 84 additions and 7 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -17,6 +17,7 @@
* Adding radius as a conversion factor in `FicTracDataInterface`. [PR #619](https://github.com/catalystneuro/neuroconv/pull/619)
* Coerce `FicTracDataInterface` original timestamps to start from 0. [PR #619](https://github.com/catalystneuro/neuroconv/pull/619)
* Added configuration metadata to `FicTracDataInterface`. [PR #618](https://github.com/catalystneuro/neuroconv/pull/618)
* Expose number of jobs to `automatic_dandi_upload`. [PR #624](https://github.com/catalystneuro/neuroconv/pull/624)

### Fixes
* Remove `starting_time` reset to default value (0.0) when adding the rate and updating the `photon_series_kwargs` or `roi_response_series_kwargs`, in `add_photon_series` or `add_fluorescence_traces`. [PR #595](https://github.com/catalystneuro/neuroconv/pull/595)
32 changes: 25 additions & 7 deletions src/neuroconv/tools/data_transfers.py
@@ -6,7 +6,7 @@
from shutil import rmtree
from tempfile import mkdtemp
from time import sleep, time
from typing import Dict, List, Tuple, Union
from typing import Dict, List, Optional, Tuple, Union
from warnings import warn

from dandi.download import download as dandi_download
@@ -273,6 +273,8 @@ def automatic_dandi_upload(
version: str = "draft",
staging: bool = False,
cleanup: bool = False,
number_of_jobs: Optional[int] = None,
number_of_threads: Optional[int] = None,
):
"""
Fully automated upload of NWBFiles to a DANDISet.
Expand Down Expand Up @@ -304,22 +306,33 @@ def automatic_dandi_upload(
cleanup : bool, default: False
Whether to remove the dandiset folder path and nwb_folder_path.
Defaults to False.
number_of_jobs : int, optional
The number of jobs to use in the DANDI upload process.
number_of_threads : int, optional
The number of threads to use in the DANDI upload process.
"""
assert os.getenv("DANDI_API_KEY"), (
"Unable to find environment variable 'DANDI_API_KEY'. "
"Please retrieve your token from DANDI and set this environment variable."
)

dandiset_folder_path = (
Path(mkdtemp(dir=nwb_folder_path.parent)) if dandiset_folder_path is None else dandiset_folder_path
)
dandiset_path = dandiset_folder_path / dandiset_id
# Odd bit of logic upstream: https://github.com/dandi/dandi-cli/blob/master/dandi/cli/cmd_upload.py#L92-L96
# (coerce the unset job count to -1 when multiple threads are requested, mirroring the upstream handling linked above)
if number_of_threads is not None and number_of_threads > 1 and number_of_jobs is None:
number_of_jobs = -1

url_base = "https://gui-staging.dandiarchive.org" if staging else "https://dandiarchive.org"
dandiset_url = f"{url_base}/dandiset/{dandiset_id}/{version}"
dandi_download(urls=dandiset_url, output_dir=str(dandiset_folder_path), get_metadata=True, get_assets=False)
assert dandiset_path.exists(), "DANDI download failed!"

dandi_organize(paths=str(nwb_folder_path), dandiset_path=str(dandiset_path))
# TODO: need PR on DANDI to expose number of jobs
dandi_organize(
paths=str(nwb_folder_path), dandiset_path=str(dandiset_path), devel_debug=True if number_of_jobs == 1 else False
)
organized_nwbfiles = dandiset_path.rglob("*.nwb")

# DANDI has yet to implement forcing of session_id inclusion in organize step
@@ -340,7 +353,12 @@
assert len(list(dandiset_path.iterdir())) > 1, "DANDI organize failed!"

dandi_instance = "dandi-staging" if staging else "dandi" # Test
dandi_upload(paths=[str(x) for x in organized_nwbfiles], dandi_instance=dandi_instance)
dandi_upload(
paths=[str(x) for x in organized_nwbfiles],
dandi_instance=dandi_instance,
jobs=number_of_jobs,
jobs_per_file=number_of_threads,
)

# Cleanup should be confirmed manually; Windows especially can complain
if cleanup:
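For reference, a minimal usage sketch of the newly exposed parameters (the dandiset ID and folder path are placeholders; assumes a valid DANDI_API_KEY is set and the folder contains NWB files):

from pathlib import Path

from neuroconv.tools.data_transfers import automatic_dandi_upload

# number_of_jobs is forwarded to the upload call as `jobs` and number_of_threads
# as `jobs_per_file`; setting both to 1 requests a fully serial upload.
automatic_dandi_upload(
    dandiset_id="000123",  # hypothetical dandiset ID
    nwb_folder_path=Path("path/to/nwb_files"),  # placeholder folder of NWB files
    staging=True,
    number_of_jobs=1,
    number_of_threads=1,
)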
58 changes: 58 additions & 0 deletions tests/test_minimal/test_tools/dandi_transfer_tools.py
@@ -43,3 +43,61 @@ def tearDown(self):

def test_automatic_dandi_upload(self):
automatic_dandi_upload(dandiset_id="200560", nwb_folder_path=self.nwb_folder_path, staging=True)


@pytest.mark.skipif(
not HAVE_DANDI_KEY,
reason="You must set your DANDI_API_KEY to run this test!",
)
class TestAutomaticDANDIUploadNonParallel(TestCase):
def setUp(self):
self.tmpdir = Path(mkdtemp())
self.nwb_folder_path = self.tmpdir / "test_nwb"
self.nwb_folder_path.mkdir()
metadata = get_default_nwbfile_metadata()
metadata["NWBFile"].update(
session_start_time=datetime.now().astimezone(),
session_id=f"test-automatic-upload-{sys.platform}-{get_python_version().replace('.', '-')}-non-parallel",
)
metadata.update(Subject=dict(subject_id="foo", species="Mus musculus", age="P1D", sex="U"))
with NWBHDF5IO(path=self.nwb_folder_path / "test_nwb_2.nwb", mode="w") as io:
io.write(make_nwbfile_from_metadata(metadata=metadata))

def tearDown(self):
rmtree(self.tmpdir)

def test_automatic_dandi_upload_non_parallel(self):
automatic_dandi_upload(
dandiset_id="200560", nwb_folder_path=self.nwb_folder_path, staging=True, number_of_jobs=1
)


@pytest.mark.skipif(
not HAVE_DANDI_KEY,
reason="You must set your DANDI_API_KEY to run this test!",
)
class TestAutomaticDANDIUploadNonParallelNonThreaded(TestCase):
def setUp(self):
self.tmpdir = Path(mkdtemp())
self.nwb_folder_path = self.tmpdir / "test_nwb"
self.nwb_folder_path.mkdir()
metadata = get_default_nwbfile_metadata()
metadata["NWBFile"].update(
session_start_time=datetime.now().astimezone(),
session_id=f"test-automatic-upload-{sys.platform}-{get_python_version().replace('.', '-')}-non-parallel-non-threaded",
)
metadata.update(Subject=dict(subject_id="foo", species="Mus musculus", age="P1D", sex="U"))
with NWBHDF5IO(path=self.nwb_folder_path / "test_nwb_3.nwb", mode="w") as io:
io.write(make_nwbfile_from_metadata(metadata=metadata))

def tearDown(self):
rmtree(self.tmpdir)

def test_automatic_dandi_upload_non_parallel_non_threaded(self):
automatic_dandi_upload(
dandiset_id="200560",
nwb_folder_path=self.nwb_folder_path,
staging=True,
number_of_jobs=1,
number_of_threads=1,
)
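
As a usage note, a hypothetical local runner for the two new tests (assumes the repository root as the working directory and a staging DANDI_API_KEY exported in the environment):

import sys

import pytest

# "-k non_parallel" selects both serial-upload tests added in this PR.
sys.exit(pytest.main(["tests/test_minimal/test_tools/dandi_transfer_tools.py", "-k", "non_parallel", "-v"]))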
