Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

New de-shuffle test (was "imagecodecs and hdf5plugin packages") #90

Merged
merged 9 commits into from
Aug 27, 2021
17 changes: 8 additions & 9 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ RUN conda update conda -y && \
conda install conda-pack && \
conda create --name hsds --yes python=3.8
RUN conda install --name hsds --yes \
pip \
wheel \
curl \
git \
compilers \
psutil \
numpy \
pytz \
Expand All @@ -20,15 +20,14 @@ RUN conda install --name hsds --yes \
aiohttp-cors \
pyjwt \
pyyaml \
imagecodecs \
hdf5plugin \
numcodecs \
pip \
simplejson \
wheel
# Install numcodecs from the specific commit since we need the brand new shuffle codec...
RUN DISABLE_NUMCODECS_AVX2=1 CFLAGS=-DHAVE_UNISTD_H \
conda run -n hsds --no-capture-output pip install --no-cache-dir \
git+https://github.com/zarr-developers/numcodecs.git@d16d1eac5198166a24726ffe808e3dcfcab9700d#egg=numcodecs \
&& conda remove --name hsds --yes git compilers \
&& conda run -n hsds --no-capture-output pip install --no-cache-dir kubernetes
wheel \
&& \
conda run -n hsds --no-capture-output pip install --no-cache-dir kubernetes
RUN conda-pack -n hsds -o /tmp/hsds-env.tar \
&& mkdir -p /opt/env/hsds \
&& cd /opt/env/hsds \
Expand Down
2 changes: 1 addition & 1 deletion hsds/datanode_lib.py
Original file line number Diff line number Diff line change
Expand Up @@ -803,7 +803,7 @@ def callback(future):
if obj_id not in pending_s3_write_tasks:
msg = f"s3sync - no pending task for {obj_id} in "
msg += "pending_s3_write_tasks"
log.info()
log.info(msg)
create_task = False

elif time_since_dirty < 1.0:
Expand Down
54 changes: 29 additions & 25 deletions hsds/util/dsetUtil.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,12 +117,14 @@ def getShuffleFilter(dset_json):
""" Return shuffle filter, or None """
filters = getFilters(dset_json)
for filter in filters:
if 'class' not in filter:
try:
if filter["class"] == "H5Z_FILTER_SHUFFLE":
log.debug(f"Shuffle filter is used: {filter}")
return filter
except KeyError:
log.warn(f"filter option: {filter} with no class key")
continue
filter_class = filter["class"]
if filter_class == 'H5Z_FILTER_SHUFFLE':
return filter
log.debug("Shuffle filter not used")
return None


Expand All @@ -137,38 +139,40 @@ def getFilterOps(app, dset_json, item_size):
compressionFilter = getCompressionFilter(dset_json)
log.debug(f"got compressionFilter: {compressionFilter}")

if not compressionFilter:
return None

filter_ops = {}
shuffleFilter = getShuffleFilter(dset_json)

if compressionFilter["class"] == 'H5Z_FILTER_DEFLATE':
filter_ops["compressor"] = 'zlib' # blosc compressor
if shuffleFilter:
filter_ops["use_shuffle"] = True
shuffleFilter = getShuffleFilter(dset_json)
if shuffleFilter:
filter_ops["use_shuffle"] = True

if compressionFilter:
if compressionFilter["class"] == 'H5Z_FILTER_DEFLATE':
filter_ops["compressor"] = 'zlib' # blosc compressor
if shuffleFilter:
filter_ops["use_shuffle"] = True
else:
# for HDF5-style compression, use shuffle only if it is turned on
filter_ops['use_shuffle'] = False
else:
# for HDF5-style compression, use shuffle only if it is turned on
filter_ops['use_shuffle'] = False
else:
if "name" in compressionFilter and \
compressionFilter["name"] in blosc.list_compressors():
filter_ops["compressor"] = compressionFilter["name"]
if "name" in compressionFilter and \
compressionFilter["name"] in blosc.list_compressors():
filter_ops["compressor"] = compressionFilter["name"]
else:
filter_ops["compressor"] = 'lz4' # default to lz4
if "level" not in compressionFilter:
filter_ops['level'] = 5 # medium level
else:
filter_ops["compressor"] = 'lz4' # default to lz4
if "level" not in compressionFilter:
filter_ops['level'] = 5 # medium level
else:
filter_ops['level'] = int(compressionFilter["level"])
filter_ops['level'] = int(compressionFilter["level"])

if filter_ops:
filter_ops['item_size'] = item_size
if item_size == 'H5T_VARIABLE':
filter_ops['use_shuffle'] = False
log.debug(f"save filter ops: {filter_ops} for {dset_id}")
filter_map[dset_id] = filter_ops # save

return filter_ops
return filter_ops
else:
return None


def getHyperslabSelection(dsetshape, start=None, stop=None, step=None):
Expand Down
9 changes: 6 additions & 3 deletions testall.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,13 @@
PYTHON_CMD = "python3"

unit_tests = ('arrayUtilTest', 'chunkUtilTest', 'domainUtilTest',
'dsetUtilTest', 'hdf5dtypeTest', 'idUtilTest', 'lruCacheTest', 'shuffleTest')
'dsetUtilTest', 'hdf5dtypeTest', 'idUtilTest', 'lruCacheTest',
'shuffleTest')

integ_tests = ('uptest', 'setup_test', 'domain_test', 'group_test', 'link_test',
'attr_test', 'datatype_test', 'dataset_test', 'acl_test', 'value_test', 'pointsel_test', 'query_test', 'vlen_test')
integ_tests = ('uptest', 'setup_test', 'domain_test', 'group_test',
'link_test', 'attr_test', 'datatype_test', 'dataset_test',
'acl_test', 'value_test', 'pointsel_test', 'query_test',
'vlen_test')

skip_unit = False
if len(sys.argv) > 1:
Expand Down
1 change: 1 addition & 0 deletions tests/integ/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
'max_chunks_per_folder': 0 # match with this config setting on server
}


def get(x):
# see if there is an environment variable override
if x.upper() in os.environ:
Expand Down
96 changes: 42 additions & 54 deletions tests/integ/helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,24 +18,23 @@
import base64
try:
import pytz
USE_UTC=True
USE_UTC = True
except ModuleNotFoundError:
USE_UTC=False
USE_UTC = False

import config
"""
Helper function - get endpoint we'll send http requests to
"""


def getEndpoint():
    """Return the base HTTP endpoint HSDS requests are sent to.

    The value comes from the test configuration key ``hsds_endpoint``.
    """
    return config.get("hsds_endpoint")

"""
Helper function - get session object
"""

def getSession():
"""Get session object"""
endpoint = getEndpoint()

if endpoint.endswith(".sock"):
# use requests_unixsocket to get a socket session
# Expect endpoint in the form:
Expand All @@ -47,19 +46,14 @@ def getSession():
return session


"""
Helper function - get endpoint we'll send http requests to
"""
def getRangeGetEndpoint():
    """Return the endpoint used for HTTP range GET requests.

    The value comes from the test configuration key ``rangeget_endpoint``.
    """
    return config.get("rangeget_endpoint")


"""
Helper function - return true if the parameter looks like a UUID
"""
def validateId(id):
"""Return true if the parameter looks like a UUID"""
if type(id) != str:
# should be a string
return False
Expand All @@ -68,21 +62,19 @@ def validateId(id):
return False
return True

"""
Helper - return number of active sn/dn nodes
"""

def getActiveNodeCount(session=None):
    """Return number of active sn/dn nodes.

    Queries the head node's /info endpoint and returns a
    (sn_count, dn_count) tuple taken from the JSON response.

    NOTE(review): ``getEndpoint()`` as shown in this module takes no
    arguments — confirm the ``"head"`` argument here is accepted by the
    real implementation or is a latent TypeError.
    NOTE(review): ``session`` defaults to None but is used
    unconditionally below; callers must always pass a live session.
    """
    req = getEndpoint("head") + "/info"
    rsp = session.get(req)
    # parse the JSON body rather than using rsp.json() (matches file style)
    rsp_json = json.loads(rsp.text)
    # active service-node and data-node counts reported by the head node
    sn_count = rsp_json["active_sn_count"]
    dn_count = rsp_json["active_dn_count"]
    return sn_count, dn_count

"""
Helper - get base domain to use for test_cases
"""

def getTestDomainName(name):
"""Get base domain to use for test_cases"""
now = time.time()
if USE_UTC:
dt = datetime.fromtimestamp(now, pytz.utc)
Expand All @@ -95,23 +87,23 @@ def getTestDomainName(name):
domain += '/'
domain += name.lower()
domain += '/'
domain += "{:04d}{:02d}{:02d}T{:02d}{:02d}{:02d}_{:06d}Z".format(dt.year, dt.month, dt.day, dt.hour, dt.minute, dt.second, dt.microsecond)
domain += "{:04d}{:02d}{:02d}T{:02d}{:02d}{:02d}_{:06d}Z".format(
dt.year, dt.month, dt.day, dt.hour, dt.minute, dt.second, dt.microsecond)
return domain

"""
Helper - get default request headers for domain
"""

def getRequestHeaders(domain=None, username=None, bucket=None, password=None, **kwargs):
"""Get default request headers for domain"""
if username is None:
username = config.get("user_name")
if password is None:
password = config.get("user_password")
elif username == config.get("user2_name"):
if password is None:
password = config.get("user2_password")
headers = { }
headers = dict()
if domain is not None:
#if config.get("bucket_name"):
# if config.get("bucket_name"):
# domain = config.get("bucket_name") + domain
headers['X-Hdf-domain'] = domain.encode('utf-8')
if username and password:
Expand All @@ -132,19 +124,17 @@ def getRequestHeaders(domain=None, username=None, bucket=None, password=None, **
headers[k] = kwargs[k]
return headers

"""
Helper - Get parent domain of given domain.
"""

def getParentDomain(domain):
"""Get parent domain of given domain."""
parent = op.dirname(domain)
if not parent:
raise ValueError("Invalid domain") # can't end with dot
raise ValueError("Invalid domain") # can't end with dot
return parent

"""
Helper - Get DNS-style domain name given a filepath domain
"""

def getDNSDomain(domain):
"""Get DNS-style domain name given a filepath domain"""
names = domain.split('/')
names.reverse()
dns_domain = ''
Expand All @@ -155,10 +145,9 @@ def getDNSDomain(domain):
dns_domain = dns_domain[:-1] # str trailing dot
return dns_domain

"""
Helper - Create domain (and parent domain if needed)
"""

def setupDomain(domain, folder=False):
"""Create domain (and parent domain if needed)"""
endpoint = config.get("hsds_endpoint")
headers = getRequestHeaders(domain=domain)
req = endpoint + "/"
Expand All @@ -176,7 +165,7 @@ def setupDomain(domain, folder=False):
setupDomain(parent_domain, folder=True)

headers = getRequestHeaders(domain=domain)
body=None
body = None
if folder:
body = {"folder": True}
rsp = session.put(req, data=json.dumps(body), headers=headers)
Expand All @@ -185,35 +174,31 @@ def setupDomain(domain, folder=False):
if rsp.status_code != 201:
raise ValueError(f"Unexpected put domain error: {rsp.status_code}")

"""
Helper function - get root uuid for domain
"""

def getRootUUID(domain, username=None, password=None, session=None):
"""Get root uuid for domain"""
req = getEndpoint() + "/"
headers = getRequestHeaders(domain=domain, username=username, password=password)

rsp = session.get(req, headers=headers)
root_uuid= None
root_uuid = None
if rsp.status_code == 200:
rspJson = json.loads(rsp.text)
root_uuid = rspJson["root"]
return root_uuid


"""
Helper function - get a domain for one of the test files
"""
def getTestDomain(name):
    """Return the domain path for one of the shared test files.

    Test files live under ``/home/<user>/test/`` for the configured
    test account.
    """
    user = config.get("user_name")
    return f'/home/{user}/test/{name}'

"""
Helper function - get uuid for a given path
"""

def getUUIDByPath(domain, path, username=None, password=None, session=None):
"""Get uuid for a given path"""
if path[0] != '/':
raise KeyError("only abs paths") # only abs paths
raise KeyError("only abs paths") # only abs paths

parent_uuid = getRootUUID(domain, username=username, password=password, session=session)

Expand All @@ -223,7 +208,11 @@ def getUUIDByPath(domain, path, username=None, password=None, session=None):
headers = getRequestHeaders(domain=domain)

# make a fake tgt_json to represent 'link' to root group
tgt_json = {'collection': "groups", 'class': "H5L_TYPE_HARD", 'id': parent_uuid }
tgt_json = {
'collection': "groups",
'class': "H5L_TYPE_HARD",
'id': parent_uuid
}
tgt_uuid = None

names = path.split('/')
Expand Down Expand Up @@ -251,10 +240,9 @@ def getUUIDByPath(domain, path, username=None, password=None, session=None):
raise KeyError("non-hard link")
return tgt_uuid

"""
Helper function - get HDF5 JSON dump for chunk locations
"""

def getHDF5JSON(filename):
"""Get HDF5 JSON dump for chunk locations"""
if not op.isfile(filename):
return None
hdf5_json = None
Expand Down
Loading