Skip to content

Commit

Permalink
Merge pull request #90 from HDFGroup/new-codecs
Browse files Browse the repository at this point in the history
New de-shuffle test (was "imagecodecs and hdf5plugin packages")
  • Loading branch information
jreadey authored Aug 27, 2021
2 parents 26c5ce6 + 205f5e6 commit f177253
Show file tree
Hide file tree
Showing 10 changed files with 652 additions and 380 deletions.
17 changes: 8 additions & 9 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ RUN conda update conda -y && \
conda install conda-pack && \
conda create --name hsds --yes python=3.8
RUN conda install --name hsds --yes \
pip \
wheel \
curl \
git \
compilers \
psutil \
numpy \
pytz \
Expand All @@ -20,15 +20,14 @@ RUN conda install --name hsds --yes \
aiohttp-cors \
pyjwt \
pyyaml \
imagecodecs \
hdf5plugin \
numcodecs \
pip \
simplejson \
wheel
# Install numcodecs from the specific commit since we need the brand new shuffle codec...
RUN DISABLE_NUMCODECS_AVX2=1 CFLAGS=-DHAVE_UNISTD_H \
conda run -n hsds --no-capture-output pip install --no-cache-dir \
git+https://github.com/zarr-developers/numcodecs.git@d16d1eac5198166a24726ffe808e3dcfcab9700d#egg=numcodecs \
&& conda remove --name hsds --yes git compilers \
&& conda run -n hsds --no-capture-output pip install --no-cache-dir kubernetes
wheel \
&& \
conda run -n hsds --no-capture-output pip install --no-cache-dir kubernetes
RUN conda-pack -n hsds -o /tmp/hsds-env.tar \
&& mkdir -p /opt/env/hsds \
&& cd /opt/env/hsds \
Expand Down
2 changes: 1 addition & 1 deletion hsds/datanode_lib.py
Original file line number Diff line number Diff line change
Expand Up @@ -803,7 +803,7 @@ def callback(future):
if obj_id not in pending_s3_write_tasks:
msg = f"s3sync - no pending task for {obj_id} in "
msg += "pending_s3_write_tasks"
log.info()
log.info(msg)
create_task = False

elif time_since_dirty < 1.0:
Expand Down
54 changes: 29 additions & 25 deletions hsds/util/dsetUtil.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,12 +117,14 @@ def getShuffleFilter(dset_json):
""" Return shuffle filter, or None """
filters = getFilters(dset_json)
for filter in filters:
if 'class' not in filter:
try:
if filter["class"] == "H5Z_FILTER_SHUFFLE":
log.debug(f"Shuffle filter is used: {filter}")
return filter
except KeyError:
log.warn(f"filter option: {filter} with no class key")
continue
filter_class = filter["class"]
if filter_class == 'H5Z_FILTER_SHUFFLE':
return filter
log.debug("Shuffle filter not used")
return None


Expand All @@ -137,38 +139,40 @@ def getFilterOps(app, dset_json, item_size):
compressionFilter = getCompressionFilter(dset_json)
log.debug(f"got compressionFilter: {compressionFilter}")

if not compressionFilter:
return None

filter_ops = {}
shuffleFilter = getShuffleFilter(dset_json)

if compressionFilter["class"] == 'H5Z_FILTER_DEFLATE':
filter_ops["compressor"] = 'zlib' # blosc compressor
if shuffleFilter:
filter_ops["use_shuffle"] = True
shuffleFilter = getShuffleFilter(dset_json)
if shuffleFilter:
filter_ops["use_shuffle"] = True

if compressionFilter:
if compressionFilter["class"] == 'H5Z_FILTER_DEFLATE':
filter_ops["compressor"] = 'zlib' # blosc compressor
if shuffleFilter:
filter_ops["use_shuffle"] = True
else:
# for HDF5-style compression, use shuffle only if it turned on
filter_ops['use_shuffle'] = False
else:
# for HDF5-style compression, use shuffle only if it turned on
filter_ops['use_shuffle'] = False
else:
if "name" in compressionFilter and \
compressionFilter["name"] in blosc.list_compressors():
filter_ops["compressor"] = compressionFilter["name"]
if "name" in compressionFilter and \
compressionFilter["name"] in blosc.list_compressors():
filter_ops["compressor"] = compressionFilter["name"]
else:
filter_ops["compressor"] = 'lz4' # default to lz4
if "level" not in compressionFilter:
filter_ops['level'] = 5 # medium level
else:
filter_ops["compressor"] = 'lz4' # default to lz4
if "level" not in compressionFilter:
filter_ops['level'] = 5 # medium level
else:
filter_ops['level'] = int(compressionFilter["level"])
filter_ops['level'] = int(compressionFilter["level"])

if filter_ops:
filter_ops['item_size'] = item_size
if item_size == 'H5T_VARIABLE':
filter_ops['use_shuffle'] = False
log.debug(f"save filter ops: {filter_ops} for {dset_id}")
filter_map[dset_id] = filter_ops # save

return filter_ops
return filter_ops
else:
return None


def getHyperslabSelection(dsetshape, start=None, stop=None, step=None):
Expand Down
9 changes: 6 additions & 3 deletions testall.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,13 @@
PYTHON_CMD = "python3"

unit_tests = ('arrayUtilTest', 'chunkUtilTest', 'domainUtilTest',
'dsetUtilTest', 'hdf5dtypeTest', 'idUtilTest', 'lruCacheTest', 'shuffleTest')
'dsetUtilTest', 'hdf5dtypeTest', 'idUtilTest', 'lruCacheTest',
'shuffleTest')

integ_tests = ('uptest', 'setup_test', 'domain_test', 'group_test', 'link_test',
'attr_test', 'datatype_test', 'dataset_test', 'acl_test', 'value_test', 'pointsel_test', 'query_test', 'vlen_test')
integ_tests = ('uptest', 'setup_test', 'domain_test', 'group_test',
'link_test', 'attr_test', 'datatype_test', 'dataset_test',
'acl_test', 'value_test', 'pointsel_test', 'query_test',
'vlen_test')

skip_unit = False
if len(sys.argv) > 1:
Expand Down
1 change: 1 addition & 0 deletions tests/integ/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
'max_chunks_per_folder': 0 # match with this config setting on server
}


def get(x):
# see if there are an environment variable override
if x.upper() in os.environ:
Expand Down
96 changes: 42 additions & 54 deletions tests/integ/helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,24 +18,23 @@
import base64
try:
import pytz
USE_UTC=True
USE_UTC = True
except ModuleNotFoundError:
USE_UTC=False
USE_UTC = False

import config
"""
Helper function - get endpoint we'll send http requests to
"""


def getEndpoint():
    """Return the HTTP endpoint (from config) that test requests are sent to."""
    return config.get("hsds_endpoint")

"""
Helper function - get session object
"""

def getSession():
"""Get session object"""
endpoint = getEndpoint()

if endpoint.endswith(".sock"):
# use requests_unixsocket to get a socket session
# Expect endpoint in the form:
Expand All @@ -47,19 +46,14 @@ def getSession():
return session


"""
Helper function - get endpoint we'll send http requests to
"""
def getRangeGetEndpoint():
    """Return the endpoint (from config) used for HTTP range GET requests."""
    return config.get("rangeget_endpoint")


"""
Helper function - return true if the parameter looks like a UUID
"""
def validateId(id):
"""Return true if the parameter looks like a UUID"""
if type(id) != str:
# should be a string
return False
Expand All @@ -68,21 +62,19 @@ def validateId(id):
return False
return True

"""
Helper - return number of active sn/dn nodes
"""

def getActiveNodeCount(session=None):
    """Return number of active sn/dn nodes.

    Queries the service "/info" endpoint through the given session and
    returns a ``(sn_count, dn_count)`` tuple taken from the JSON response.
    """
    # Fix: getEndpoint() is defined with no parameters, so the previous
    # call getEndpoint("head") would raise TypeError if this helper ran.
    req = getEndpoint() + "/info"
    rsp = session.get(req)
    rsp_json = json.loads(rsp.text)
    sn_count = rsp_json["active_sn_count"]
    dn_count = rsp_json["active_dn_count"]
    return sn_count, dn_count

"""
Helper - get base domain to use for test_cases
"""

def getTestDomainName(name):
"""Get base domain to use for test_cases"""
now = time.time()
if USE_UTC:
dt = datetime.fromtimestamp(now, pytz.utc)
Expand All @@ -95,23 +87,23 @@ def getTestDomainName(name):
domain += '/'
domain += name.lower()
domain += '/'
domain += "{:04d}{:02d}{:02d}T{:02d}{:02d}{:02d}_{:06d}Z".format(dt.year, dt.month, dt.day, dt.hour, dt.minute, dt.second, dt.microsecond)
domain += "{:04d}{:02d}{:02d}T{:02d}{:02d}{:02d}_{:06d}Z".format(
dt.year, dt.month, dt.day, dt.hour, dt.minute, dt.second, dt.microsecond)
return domain

"""
Helper - get default request headers for domain
"""

def getRequestHeaders(domain=None, username=None, bucket=None, password=None, **kwargs):
"""Get default request headers for domain"""
if username is None:
username = config.get("user_name")
if password is None:
password = config.get("user_password")
elif username == config.get("user2_name"):
if password is None:
password = config.get("user2_password")
headers = { }
headers = dict()
if domain is not None:
#if config.get("bucket_name"):
# if config.get("bucket_name"):
# domain = config.get("bucket_name") + domain
headers['X-Hdf-domain'] = domain.encode('utf-8')
if username and password:
Expand All @@ -132,19 +124,17 @@ def getRequestHeaders(domain=None, username=None, bucket=None, password=None, **
headers[k] = kwargs[k]
return headers

"""
Helper - Get parent domain of given domain.
"""

def getParentDomain(domain):
    """Get parent domain of given domain.

    Raises ValueError when the domain has no parent component
    (i.e. it ends with a dot / has no directory part).
    """
    parent = op.dirname(domain)
    if parent:
        return parent
    raise ValueError("Invalid domain")  # can't end with dot

"""
Helper - Get DNS-style domain name given a filepath domain
"""

def getDNSDomain(domain):
"""Get DNS-style domain name given a filepath domain"""
names = domain.split('/')
names.reverse()
dns_domain = ''
Expand All @@ -155,10 +145,9 @@ def getDNSDomain(domain):
dns_domain = dns_domain[:-1] # str trailing dot
return dns_domain

"""
Helper - Create domain (and parent domin if needed)
"""

def setupDomain(domain, folder=False):
"""Create domain (and parent domain if needed)"""
endpoint = config.get("hsds_endpoint")
headers = getRequestHeaders(domain=domain)
req = endpoint + "/"
Expand All @@ -176,7 +165,7 @@ def setupDomain(domain, folder=False):
setupDomain(parent_domain, folder=True)

headers = getRequestHeaders(domain=domain)
body=None
body = None
if folder:
body = {"folder": True}
rsp = session.put(req, data=json.dumps(body), headers=headers)
Expand All @@ -185,35 +174,31 @@ def setupDomain(domain, folder=False):
if rsp.status_code != 201:
raise ValueError(f"Unexpected put domain error: {rsp.status_code}")

"""
Helper function - get root uuid for domain
"""

def getRootUUID(domain, username=None, password=None, session=None):
    """Get root uuid for domain, or None if the GET on the domain fails."""
    headers = getRequestHeaders(domain=domain, username=username,
                                password=password)
    rsp = session.get(getEndpoint() + "/", headers=headers)
    if rsp.status_code != 200:
        return None
    return json.loads(rsp.text)["root"]


"""
Helper function - get a domain for one of the test files
"""
def getTestDomain(name):
    """Get a domain path for one of the test files, under the
    configured user's /home/<user>/test/ folder."""
    username = config.get("user_name")
    return "/home/{}/test/{}".format(username, name)

"""
Helper function - get uuid for a given path
"""

def getUUIDByPath(domain, path, username=None, password=None, session=None):
"""Get uuid for a given path"""
if path[0] != '/':
raise KeyError("only abs paths") # only abs paths
raise KeyError("only abs paths") # only abs paths

parent_uuid = getRootUUID(domain, username=username, password=password, session=session)

Expand All @@ -223,7 +208,11 @@ def getUUIDByPath(domain, path, username=None, password=None, session=None):
headers = getRequestHeaders(domain=domain)

# make a fake tgt_json to represent 'link' to root group
tgt_json = {'collection': "groups", 'class': "H5L_TYPE_HARD", 'id': parent_uuid }
tgt_json = {
'collection': "groups",
'class': "H5L_TYPE_HARD",
'id': parent_uuid
}
tgt_uuid = None

names = path.split('/')
Expand Down Expand Up @@ -251,10 +240,9 @@ def getUUIDByPath(domain, path, username=None, password=None, session=None):
raise KeyError("non-hard link")
return tgt_uuid

"""
Helper function - get HDF5 JSON dump for chunk locations
"""

def getHDF5JSON(filename):
"""Get HDF5 JSON dump for chunk locations"""
if not op.isfile(filename):
return None
hdf5_json = None
Expand Down
Loading

0 comments on commit f177253

Please sign in to comment.