From d6910652358200c40bb1b9e92088dfd58b0e07e2 Mon Sep 17 00:00:00 2001 From: Danny Price Date: Thu, 15 Dec 2022 12:25:20 +0800 Subject: [PATCH 01/11] PSRDADA block updates --- python/bifrost/blocks/dada_file.py | 205 +++++++++++++++++++++++++++ python/bifrost/blocks/psrdada.py | 217 +++++++++++++++++++++++++++-- python/bifrost/psrdada.py | 38 +++-- 3 files changed, 439 insertions(+), 21 deletions(-) create mode 100755 python/bifrost/blocks/dada_file.py diff --git a/python/bifrost/blocks/dada_file.py b/python/bifrost/blocks/dada_file.py new file mode 100755 index 000000000..9841a2cd2 --- /dev/null +++ b/python/bifrost/blocks/dada_file.py @@ -0,0 +1,205 @@ +# Copyright (c) 2016, The Bifrost Authors. All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions +# are met: +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above copyright +# notice, this list of conditions and the following disclaimer in the +# documentation and/or other materials provided with the distribution. +# * Neither the name of The Bifrost Authors nor the names of its +# contributors may be used to endorse or promote products derived +# from this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS ``AS IS'' AND ANY +# EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR +# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR +# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, +# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR +# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY +# OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + + +""" +# binary_file_io.py + +Basic file I/O blocks for reading and writing data. +""" +import numpy as np +import time +import bifrost as bf +import bifrost.pipeline as bfp +from bifrost.dtype import string2numpy +from astropy.time import Time +import glob +import os + +def _angle_str_to_sigproc(ang): + aparts = ang.split(':') + if len(aparts) == 2: + sp_ang = int(aparts[0])*10000 + int(aparts[1])*100 + 0.0 + elif len(aparts) == 3: + sp_ang = int(aparts[0])*10000 + int(aparts[1])*100 + float(aparts[2]) + else: + raise RuntimeError("Cannot parse: {ang} does not match XX:YY:ZZ.zz".format(ang=ang)) + +class DadaFileRead(object): + """ Simple file-like reading object for pipeline testing + + Args: + filename (str): Name of file to open + dtype (np.dtype or str): datatype of data, e.g. float32. This should be a *numpy* dtype, + not a bifrost.ndarray dtype (eg. float32, not f32) + gulp_size (int): How much data to read per gulp, (i.e. 
sub-array size) + """ + def __init__(self, filename, header_callback): + super(DadaFileRead, self).__init__() + if isinstance(filename, str): + self.file_obj = open(filename, 'rb') + self.nfiles = 1 + else: + self.file_obj = open(filename[0], 'rb') + self.nfiles = len(filename) + + self.filenames = filename + self.fcount = 0 + + print("%i/%i: opening %s" % (self.fcount+1, self.nfiles, self.file_obj.name)) + self._header_callback = header_callback + self.header = self._read_header(header_callback) + itensor = self.header['_tensor'] + self.dtype = np.dtype(string2numpy(itensor['dtype'])) + self.block_shape = np.copy(itensor['shape']) + self.block_shape[0] = 1 + self.block_size = np.prod(self.block_shape) + self.gulp_size = self.block_size * self.dtype.itemsize + + def _read_header(self, header_callback): + """ Read DADA header, and apply header_callback + + Applies header_callback to convert DADA header to bifrost header. Specifically, + need to generate the '_tensor' from the DADA keywords which have no formal spec. + """ + hdr_buf = self.file_obj.read(4096).decode('ascii') + hdr = {} + for line in hdr_buf.split('\n'): + try: + key, val = line.split() + hdr[key] = val + except ValueError: + pass + hdr = header_callback(hdr) + return hdr + + def _open_next_file(self): + self.file_obj.close() + self.fcount += 1 + print("%i/%i: opening %s" % (self.fcount+1, self.nfiles, self.filenames[self.fcount])) + self.file_obj = open(self.filenames[self.fcount], 'rb') + _hdr = self._read_header(self._header_callback) + + def _read_data(self): + d = np.fromfile(self.file_obj, dtype=self.dtype, count=self.block_size) + return d + + def read_block(self): + #print("Reading...") + d = self._read_data() + if d.size == 0 and self.fcount < self.nfiles - 1: + self._open_next_file() + d = self._read_data() + d = d.reshape(self.block_shape) + return d + elif d.size == 0 and self.fcount == self.nfiles - 1: + #print("EOF") + return np.array([0]) # EOF + elif self.block_size != np.prod(d.shape): + print(self.block_size, np.prod(d.shape), d.shape, d.size) + print("Warning: File truncated or gulp size does not divide n_blocks") + try: + bs_truncated = list(self.block_shape) + bs_truncated[0] = -1 # Attempt to load truncated data anyway + d = d.reshape(bs_truncated) + return d + except ValueError: + return np.array([0]) # EOF + else: + d = d.reshape(self.block_shape) + return d + + def read(self): + gulp_break = False + d = self.read_block() + return d + + + def __enter__(self): + return self + + def close(self): + pass + + def __exit__(self, type, value, tb): + self.close() + + +def generate_dada_filelist(filename): + """ Generate a list of DADA files from start filename + + Args: + filename (str): Path to file. e.g. 
+ /data/dprice/2020-07-23-02:33:07.587_0000000000000000.000000.dada + + Returns: + flist (list): A list of all associated files + """ + bn = os.path.basename(filename) + dn = os.path.dirname(filename) + bn_root = '_'.join(bn.split('_')[:-1]) # Strips off _000.000.dada bit + flist = sorted(glob.glob(os.path.join(dn, bn_root + '_*.dada'))) + return flist + + +class DadaFileReadBlock(bfp.SourceBlock): + def __init__(self, filename, header_callback, gulp_nframe, *args, **kwargs): + super(DadaFileReadBlock, self).__init__(filename, gulp_nframe, *args, **kwargs) + self.header_callback = header_callback + + def create_reader(self, filename): + flist = generate_dada_filelist(filename) + return DadaFileRead(flist, self.header_callback) + + def on_sequence(self, ireader, filename): + ohdr = ireader.header + return [ohdr] + + def on_data(self, reader, ospans): + indata = reader.read() + odata = ospans[0].data + #print indata.shape, odata.shape + if np.prod(indata.shape) == np.prod(odata.shape[1:]): + ospans[0].data[0] = indata + return [1] + else: + # EOF or truncated block + return [0] + + +def read_dada_file(filename, header_callback, gulp_nframe, *args, **kwargs): + """ Block for reading binary data from file and streaming it into a bifrost pipeline + + Args: + filenames (list): A list of filenames to open + header_callback (method): A function that converts from PSRDADA header into a + bifrost header. + gulp_size (int): Number of elements in a gulp (i.e. sub-array size) + gulp_nframe (int): Number of frames in a gulp. (Ask Ben / Miles for good explanation) + dtype (bifrost dtype string): dtype, e.g. f32, cf32 + """ + return DadaFileReadBlock(filename, header_callback, gulp_nframe, *args, **kwargs) + diff --git a/python/bifrost/blocks/psrdada.py b/python/bifrost/blocks/psrdada.py index ca159d29e..8a2f9fde0 100644 --- a/python/bifrost/blocks/psrdada.py +++ b/python/bifrost/blocks/psrdada.py @@ -27,14 +27,16 @@ from __future__ import absolute_import -from bifrost.pipeline import SourceBlock +from bifrost.pipeline import SourceBlock, SinkBlock from bifrost.Space import Space from bifrost.psrdada import Hdu from bifrost.libbifrost import _bf, _check import bifrost.ndarray -from bifrost import telemetry -telemetry.track_module() +import numpy as np +from datetime import datetime +from copy import deepcopy +import os # TODO: Move to memory.py? 
def _get_space(arr): @@ -47,16 +49,19 @@ class PsrDadaBufferReader(object): def __init__(self, hdu, hdr): self.hdu = hdu self.header = hdr - self._open_next_block() + self.block = None def _open_next_block(self): self.block = next(self.hdu.data_block) self.nbyte = self.block.size_bytes() self.byte0 = 0 def readinto(self, buf): + if self.block is None: + self._open_next_block() dst_space = Space(_get_space(buf)).as_BFspace() byte0 = 0 nbyte = buf.nbytes nbyte_copy = min(nbyte - byte0, self.nbyte - self.byte0) + while nbyte_copy: _check(_bf.bfMemcpy(buf.ctypes.data + byte0, dst_space, self.block.ptr + self.byte0, _bf.BF_SPACE_SYSTEM, @@ -72,19 +77,23 @@ def readinto(self, buf): break return byte0 def close(self): - self.block.close() + if self.block: + self.block.close() def __enter__(self): return self def __exit__(self, type, value, tb): self.close() -def psrdada_read_sequence_iterator(buffer_key): +def psrdada_read_sequence_iterator(buffer_key, single=False): hdu = Hdu() hdu.connect_read(buffer_key) for hdr in hdu.header_block: with hdr: hdr_string = hdr.data.tostring() yield hdu, hdr_string + if single: + hdu.disconnect() + break hdu.disconnect() def _cast_to_type(string): @@ -93,7 +102,15 @@ def _cast_to_type(string): try: return float(string) except ValueError: pass return string + +def _cast_to_string(unknown): + if type(unknown) is bytes: + return unknown.decode('utf-8') + elif type(unknown) is str: + return unknown + def parse_dada_header(headerstr, cast_types=True): + headerstr = _cast_to_string(headerstr) headerstr = headerstr[:headerstr.find('\0')] header = {} for line in headerstr.split('\n'): @@ -109,9 +126,9 @@ def parse_dada_header(headerstr, cast_types=True): return header class PsrDadaSourceBlock(SourceBlock): - def __init__(self, buffer_key, header_callback, gulp_nframe, space=None, - *args, **kwargs): - buffer_iterator = psrdada_read_sequence_iterator(buffer_key) + def __init__(self, buffer_key, header_callback, gulp_nframe, space=None, + single=False, *args, **kwargs): + buffer_iterator = psrdada_read_sequence_iterator(buffer_key, single) super(PsrDadaSourceBlock, self).__init__(buffer_iterator, gulp_nframe, space, *args, **kwargs) self.header_callback = header_callback @@ -119,6 +136,7 @@ def create_reader(self, hdu_hdr): hdu, hdr = hdu_hdr return PsrDadaBufferReader(hdu, hdr) def on_sequence(self, reader, hdu_hdr): + print("PsrDadaSourceBlock::on_sequence") ihdr_dict = parse_dada_header(reader.header) return [self.header_callback(ihdr_dict)] def on_data(self, reader, ospans): @@ -129,7 +147,174 @@ def on_data(self, reader, ospans): nframe = nbyte // ospan.frame_nbyte return [nframe] -def read_psrdada_buffer(buffer_key, header_callback, gulp_nframe, space=None, +def _keyval_to_dadastr(key, val): + """ Convert key: value pair into a DADA string """ + return "{key:20s} {val}\n".format(key=key.upper(), val=val) + + +def _extract_tensor_scale(key, tensor): + try: + idx = tensor['labels'].index(key) + return tensor['scales'][idx] + except ValueError as e: + return [0, 0] + + +def _extract_tensor_shape(key, tensor): + try: + idx = tensor['labels'].index(key) + return tensor['shape'][idx] + except ValueError as e: + return 1 + + +def generate_dada_header(hdr_dict, hdrlen=4096): + """ Generate DADA header from header dict + + Args: + hdr_dict (dict): Header dictionary of key, value pairs + hdrlen (int): Size of header, default 4096 + + Returns: + s (str): DADA header string with padding to hdrlen + """ + s = "HDR_VERSION 1.0\n" + s+= "HDR_SIZE %i\n" % hdrlen + keys_to_skip = 
('HDR_VERSION', 'HDR_SIZE') + + # update parameters from bifrost tensor + if '_tensor' in hdr_dict.keys(): + print(hdr_dict['_tensor']) + dtype = hdr_dict['_tensor']['dtype'] + dtype_vals = { + 'cf32': { 'NBIT': '32', 'NDIM': '2' }, + 'f32': { 'NBIT': '32', 'NDIM': '1' }, + 'ci8': { 'NBIT': '8', 'NDIM': '2' }, + 'i8': { 'NBIT': '8', 'NDIM': '1' } } + if dtype in dtype_vals.keys(): + hdr_dict['NBIT'] = dtype_vals[dtype]['NBIT'] + hdr_dict['NDIM'] = dtype_vals[dtype]['NDIM'] + + hdr_dict['NBEAM'] = _extract_tensor_shape("beam", hdr_dict['_tensor']) + hdr_dict['NANT'] = _extract_tensor_shape("station", hdr_dict['_tensor']) + hdr_dict['NPOL'] = _extract_tensor_shape("pol", hdr_dict['_tensor']) + hdr_dict['NCHAN'] = _extract_tensor_shape("freq", hdr_dict['_tensor']) + + f0 = _extract_tensor_scale("freq", hdr_dict['_tensor'])[0] + chan_bw = _extract_tensor_scale("freq", hdr_dict['_tensor'])[1] + bw = chan_bw * int(hdr_dict['NCHAN']) + hdr_dict['BW'] = bw + hdr_dict['FREQ'] = f0 + bw / 2 + + # print(hdr_dict['_tensor']) + ts = _extract_tensor_scale("time", hdr_dict['_tensor'])[0] + + tsamp = _extract_tensor_scale("fine_time", hdr_dict['_tensor'])[1] + if tsamp == 0: + tsamp = _extract_tensor_scale("time", hdr_dict['_tensor'])[1] + if tsamp == 0: + print("TSAMP was 0, changing to 10.24 us") + tsamp = 0.00001024 + + hdr_dict['TSAMP'] = tsamp * 1e6 + ts_integer = int(ts) + hdr_dict['UTC_START'] = datetime.utcfromtimestamp(ts_integer).strftime("%Y-%m-%d-%H:%M:%S") + + fine_time = _extract_tensor_shape("fine_time", hdr_dict['_tensor']) + + bits_per_sample = int(hdr_dict['NBEAM']) * \ + int(hdr_dict['NANT']) * \ + int(hdr_dict['NPOL']) * \ + int(hdr_dict['NBIT']) * \ + int(hdr_dict['NDIM']) * \ + int(hdr_dict['NCHAN']) + + resolution = (bits_per_sample * fine_time) / 8 + hdr_dict['RESOLUTION'] = resolution + + bytes_per_second = int((bits_per_sample / 8) / tsamp) + hdr_dict['BYTES_PER_SECOND'] = bytes_per_second + + hdr_dict['FILE_SIZE'] = bytes_per_second * 8 + + if hdr_dict['_tensor']['labels'] == ['time', 'beam', 'freq', 'fine_time']: + hdr_dict['ORDER'] = 'SFT' + + for key, val in hdr_dict.items(): + if key not in keys_to_skip: + if isinstance(val, (str, float, int)): + s += _keyval_to_dadastr(key, val) + s_padding = "\x00" + if len(s) > hdrlen: + raise RuntimeError("Header is too large for HDR_SIZE! 
Increase hdrlen") + n_pad = hdrlen - len(s) + return s + s_padding * n_pad + +class PsrDadaSinkBlock(SinkBlock): + def __init__(self, iring, buffer_key, gulp_nframe, space=None, *args, **kwargs): + super(PsrDadaSinkBlock, self).__init__(iring, gulp_nframe, *args, **kwargs) + self.hdu = Hdu() + self.hdu.connect_write(buffer_key) + self.keywords_to_add = {} + self.keywords_to_sub = [] + self.keywords_to_change = {} + + def add_header_keywords(self, hdr_dict): + """Add specified keywords to outgoing header dict""" + for key, value in hdr_dict.items(): + self.keywords_to_add[key] = value + + def sub_header_keywords(self, hdr_dict): + """Remove specified keywords from outgoing header dict""" + for key in hdr_dict: + self.keywords_to_sub.append(key) + + def remap_prefixed_keywords(self, prefix, suffixes): + """Remap the keywords in suffixes, removing the prefix""" + for suffix in suffixes: + self.keywords_to_change[prefix + suffix] = suffix + + def on_sequence(self, iseq): + print("PsrDadaSinkBlock::on_sequence") + updated_header = iseq.header.copy() + # rename some header keywords + for key, value in self.keywords_to_change.items(): + try: + self.keywords_to_add[value] = updated_header[key] + self.keywords_to_sub.append(key) + except KeyError: + pass + # insert the additional keywords + updated_header.update(self.keywords_to_add) + # remove the keywords + for key in self.keywords_to_sub: + try: + del updated_header[key] + except KeyError: + pass + dada_header_str = generate_dada_header(updated_header) + dada_header_buf = next(self.hdu.header_block) + + dada_header_buf.data[:] = np.fromstring(dada_header_str.encode('ascii'), dtype='uint8') + dada_header_buf.close() + + def on_sequence_end(self, iseq): + self.hdu.disconnect() + + def on_data(self, ispan): + + # TODO: Make this work in CUDA space + dada_blk = next(self.hdu.data_block) + + nbyte = ispan.data.nbytes + _check(_bf.bfMemcpy(dada_blk.ptr, _bf.BF_SPACE_SYSTEM, + ispan.data.ctypes.data, _bf.BF_SPACE_SYSTEM, + nbyte)) + + #dada_blk.data[:] = ispan.data.view('u8') + dada_blk.close() + +def read_psrdada_buffer(buffer_key, header_callback, gulp_nframe, space=None, single=False, *args, **kwargs): """Read data from a PSRDADA ring buffer. @@ -139,6 +324,7 @@ def read_psrdada_buffer(buffer_key, header_callback, gulp_nframe, space=None, header_callback (func): A function f(psrdata_header_dict) -> bifrost_header_dict. gulp_nframe (int): No. frames to process at a time. space (string): The output memory space (all Bifrost spaces are supported). + single (bool): Only process a single data stream with the block *args: Arguments to ``bifrost.pipeline.SourceBlock``. **kwargs: Keyword Arguments to ``bifrost.pipeline.SourceBlock``. 
@@ -162,5 +348,14 @@ def read_psrdada_buffer(buffer_key, header_callback, gulp_nframe, space=None, References: http://psrdada.sourceforge.net/ """ - return PsrDadaSourceBlock(buffer_key, header_callback, gulp_nframe, space, + return PsrDadaSourceBlock(buffer_key, header_callback, gulp_nframe, space, single, *args, **kwargs) + +def write_psrdada_buffer(iring, buffer_key, gulp_nframe, *args, **kwargs): + """ Write into a PSRDADA ring buffer + + Note: + Initial version, currently only supports system space (not CUDA) + """ + return PsrDadaSinkBlock(iring, buffer_key, gulp_nframe) + diff --git a/python/bifrost/psrdada.py b/python/bifrost/psrdada.py index bca88e19c..bb8124fc2 100644 --- a/python/bifrost/psrdada.py +++ b/python/bifrost/psrdada.py @@ -44,25 +44,29 @@ import bifrost.libpsrdada_generated as _dada import numpy as np from bifrost.ndarray import _address_as_buffer -from bifrost.libbifrost import EndOfDataStop import ctypes -from bifrost import telemetry -telemetry.track_module() - def get_pointer_value(ptr): return ctypes.c_void_p.from_buffer(ptr).value +def _cast_to_bytes(unknown): + """ handle the difference between python 2 and 3 """ + if type(unknown) is str: + return unknown.encode("utf-8") + elif type(unknown) is bytes: + return unknown + class MultiLog(object): count = 0 def __init__(self, name=None): if name is None: name = "MultiLog%i" % MultiLog.count MultiLog.count += 1 - self.obj = _dada.multilog_open(name, '\0') + self.obj = _dada.multilog_open(_cast_to_bytes(name), _cast_to_bytes('\0')) def __del__(self): - _dada.multilog_close(self.obj) + if hasattr(self, 'obj'): + _dada.multilog_close(self.obj) class IpcBufBlock(object): def __init__(self, buf, mutable=False): @@ -119,12 +123,12 @@ def __next__(self): else: del block self.reset() - raise EndOfDataStop('IpcBufBlock empty') + raise StopIteration() def next(self): return self.__next__() def open(self): raise NotImplementedError() - def close(self, nbyte): + def close(self): raise NotImplementedError() class IpcBaseIO(IpcBaseBuf): @@ -202,13 +206,15 @@ def close(self, nbyte): class Hdu(object): def __init__(self): + self.connected = False + self.registered = False self._dada = _dada self.log = MultiLog() self.hdu = _dada.dada_hdu_create(self.log.obj) - self.connected = False def __del__(self): self.disconnect() - _dada.dada_hdu_destroy(self.hdu) + if hasattr(self, "hdu"): + _dada.dada_hdu_destroy(self.hdu) def _connect(self, buffer_key=0xDADA): self.buffer_key = buffer_key _dada.dada_hdu_set_key(self.hdu, self.buffer_key) @@ -235,6 +241,16 @@ def _unlock(self): def relock(self): self._unlock() self._lock(self.mode) + def _register(self): + if not self.registered: + if _dada.dada_cuda_dbregister(self.hdu) < 0: + raise IOError("Failed to register memory with CUDA driver") + self.registered = True + def _unregister(self): + if self.registered: + if _dada.dada_cuda_dbunregister(self.hdu) < 0: + raise IOError("Failed to unregister memory with CUDA driver") + self.registered = False def open_HACK(self): if _dada.ipcio_open(self.data_block.io, 'w') < 0: raise IOError("ipcio_open failed") @@ -244,6 +260,7 @@ def connect_read(self, buffer_key=0xDADA): self.header_block = IpcReadHeaderBuf(self.hdu.contents.header_block) self.data_block = IpcReadDataBuf(self.hdu.contents.data_block) self.connected = True + self._register() def connect_write(self, buffer_key=0xDADA): self._connect(buffer_key) self._lock('write') @@ -252,6 +269,7 @@ def connect_write(self, buffer_key=0xDADA): self.connected = True def disconnect(self): if 
self.connected: + self._unregister() self._unlock() self._disconnect() self.connected = False From 04c490a3e11aa5cfa53750da58253a2d7c1e87ac Mon Sep 17 00:00:00 2001 From: jaycedowell Date: Fri, 26 May 2023 10:00:42 -0600 Subject: [PATCH 02/11] Add telemetry. --- python/bifrost/blocks/dada_file.py | 6 ++++-- python/bifrost/blocks/psrdada.py | 6 ++++-- python/bifrost/psrdada.py | 5 ++++- 3 files changed, 12 insertions(+), 5 deletions(-) diff --git a/python/bifrost/blocks/dada_file.py b/python/bifrost/blocks/dada_file.py index 9841a2cd2..979211b73 100755 --- a/python/bifrost/blocks/dada_file.py +++ b/python/bifrost/blocks/dada_file.py @@ -1,4 +1,4 @@ -# Copyright (c) 2016, The Bifrost Authors. All rights reserved. +# Copyright (c) 2016-2023, The Bifrost Authors. All rights reserved. # # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions @@ -39,6 +39,9 @@ import glob import os +from bifrost import telemetry +telemetry.track_module() + def _angle_str_to_sigproc(ang): aparts = ang.split(':') if len(aparts) == 2: @@ -202,4 +205,3 @@ def read_dada_file(filename, header_callback, gulp_nframe, *args, **kwargs): dtype (bifrost dtype string): dtype, e.g. f32, cf32 """ return DadaFileReadBlock(filename, header_callback, gulp_nframe, *args, **kwargs) - diff --git a/python/bifrost/blocks/psrdada.py b/python/bifrost/blocks/psrdada.py index 8a2f9fde0..ac1c990cf 100644 --- a/python/bifrost/blocks/psrdada.py +++ b/python/bifrost/blocks/psrdada.py @@ -1,5 +1,5 @@ -# Copyright (c) 2016-2021, The Bifrost Authors. All rights reserved. +# Copyright (c) 2016-2023, The Bifrost Authors. All rights reserved. # # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions @@ -38,6 +38,9 @@ from copy import deepcopy import os +from bifrost import telemetry +telemetry.track_module() + # TODO: Move to memory.py? def _get_space(arr): if isinstance(arr, bifrost.ndarray): @@ -358,4 +361,3 @@ def write_psrdada_buffer(iring, buffer_key, gulp_nframe, *args, **kwargs): Initial version, currently only supports system space (not CUDA) """ return PsrDadaSinkBlock(iring, buffer_key, gulp_nframe) - diff --git a/python/bifrost/psrdada.py b/python/bifrost/psrdada.py index bb8124fc2..9d4842a89 100644 --- a/python/bifrost/psrdada.py +++ b/python/bifrost/psrdada.py @@ -1,5 +1,5 @@ -# Copyright (c) 2016-2021, The Bifrost Authors. All rights reserved. +# Copyright (c) 2016-2023, The Bifrost Authors. All rights reserved. # # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions @@ -47,6 +47,9 @@ import ctypes +from bifrost import telemetry +telemetry.track_module() + def get_pointer_value(ptr): return ctypes.c_void_p.from_buffer(ptr).value From 22df34fa8c3368e1f1d642337263a6b5ebc5b34c Mon Sep 17 00:00:00 2001 From: jaycedowell Date: Fri, 26 May 2023 10:02:57 -0600 Subject: [PATCH 03/11] Switch back to EndOfDataStop. 
--- python/bifrost/psrdada.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/bifrost/psrdada.py b/python/bifrost/psrdada.py index 9d4842a89..e91ece10b 100644 --- a/python/bifrost/psrdada.py +++ b/python/bifrost/psrdada.py @@ -126,7 +126,7 @@ def __next__(self): else: del block self.reset() - raise StopIteration() + raise EndOfDataStop('IpcBufBlock empty') def next(self): return self.__next__() def open(self): From ff62420f29f7d45e312783cf1b98bbe9a9a11ada Mon Sep 17 00:00:00 2001 From: jaycedowell Date: Fri, 26 May 2023 10:12:10 -0600 Subject: [PATCH 04/11] Add in cf64/f64 and ci16/i16 data types. --- python/bifrost/blocks/psrdada.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/python/bifrost/blocks/psrdada.py b/python/bifrost/blocks/psrdada.py index ac1c990cf..18f1c1777 100644 --- a/python/bifrost/blocks/psrdada.py +++ b/python/bifrost/blocks/psrdada.py @@ -190,8 +190,12 @@ def generate_dada_header(hdr_dict, hdrlen=4096): print(hdr_dict['_tensor']) dtype = hdr_dict['_tensor']['dtype'] dtype_vals = { + 'cf64': { 'NBIT': '64', 'NDIM': '2' }, + 'f64': { 'NBIT': '64', 'NDIM': '1' }, 'cf32': { 'NBIT': '32', 'NDIM': '2' }, 'f32': { 'NBIT': '32', 'NDIM': '1' }, + 'ci16': { 'NBIT': '16', 'NDIM': '2' }, + 'i16': { 'NBIT': '16', 'NDIM': '1' } 'ci8': { 'NBIT': '8', 'NDIM': '2' }, 'i8': { 'NBIT': '8', 'NDIM': '1' } } if dtype in dtype_vals.keys(): From 104394ccb2d45ad568d5f1fe8c5179aedab0b7a4 Mon Sep 17 00:00:00 2001 From: jaycedowell Date: Fri, 26 May 2023 10:13:49 -0600 Subject: [PATCH 05/11] Division change. --- python/bifrost/blocks/psrdada.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/bifrost/blocks/psrdada.py b/python/bifrost/blocks/psrdada.py index 18f1c1777..703b6d2f6 100644 --- a/python/bifrost/blocks/psrdada.py +++ b/python/bifrost/blocks/psrdada.py @@ -25,7 +25,7 @@ # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -from __future__ import absolute_import +from __future__ import absolute_import, division from bifrost.pipeline import SourceBlock, SinkBlock from bifrost.Space import Space @@ -239,7 +239,7 @@ def generate_dada_header(hdr_dict, hdrlen=4096): resolution = (bits_per_sample * fine_time) / 8 hdr_dict['RESOLUTION'] = resolution - bytes_per_second = int((bits_per_sample / 8) / tsamp) + bytes_per_second = int((bits_per_sample // 8) / tsamp) hdr_dict['BYTES_PER_SECOND'] = bytes_per_second hdr_dict['FILE_SIZE'] = bytes_per_second * 8 From a3a2ce3878904014ee242b2c19cc52bc3d234e18 Mon Sep 17 00:00:00 2001 From: jaycedowell Date: Fri, 26 May 2023 10:16:48 -0600 Subject: [PATCH 06/11] Missing comma. --- python/bifrost/blocks/psrdada.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/bifrost/blocks/psrdada.py b/python/bifrost/blocks/psrdada.py index 703b6d2f6..10fa9d735 100644 --- a/python/bifrost/blocks/psrdada.py +++ b/python/bifrost/blocks/psrdada.py @@ -195,7 +195,7 @@ def generate_dada_header(hdr_dict, hdrlen=4096): 'cf32': { 'NBIT': '32', 'NDIM': '2' }, 'f32': { 'NBIT': '32', 'NDIM': '1' }, 'ci16': { 'NBIT': '16', 'NDIM': '2' }, - 'i16': { 'NBIT': '16', 'NDIM': '1' } + 'i16': { 'NBIT': '16', 'NDIM': '1' }, 'ci8': { 'NBIT': '8', 'NDIM': '2' }, 'i8': { 'NBIT': '8', 'NDIM': '1' } } if dtype in dtype_vals.keys(): From 090f7f28fcec957daa496127991d87cd552844e3 Mon Sep 17 00:00:00 2001 From: jaycedowell Date: Fri, 26 May 2023 10:20:31 -0600 Subject: [PATCH 07/11] Try to install PSRDADA for testing. 
--- .github/workflows/main.yml | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 59d67ddb1..6fd636983 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -49,6 +49,16 @@ jobs: gawk \ gnu-sed \ pkg-config + -name: "Software Install - PSRDADA" + if: ${{ matrix.os == 'ubuntu-20.04' || matrix.os == 'self-hosted' }} + run: | + git clone git://git.code.sf.net/p/psrdada/code psrdada + cd psrdada + ./bootstrap + ./configure + make -j all + sudo make install + cd .. - uses: actions/setup-python@v4.3.0 with: python-version: ${{ matrix.python-version }} From 3b6d0988bfa6fb4179c90f7c4aa960451a642e0a Mon Sep 17 00:00:00 2001 From: jaycedowell Date: Fri, 26 May 2023 10:26:11 -0600 Subject: [PATCH 08/11] Typos. --- .github/workflows/main.yml | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 6fd636983..c89d125b1 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -49,16 +49,16 @@ jobs: gawk \ gnu-sed \ pkg-config - -name: "Software Install - PSRDADA" - if: ${{ matrix.os == 'ubuntu-20.04' || matrix.os == 'self-hosted' }} - run: | - git clone git://git.code.sf.net/p/psrdada/code psrdada - cd psrdada - ./bootstrap - ./configure - make -j all - sudo make install - cd .. + - name: "Software Install - PSRDADA" + if: ${{ matrix.os == 'ubuntu-20.04' || matrix.os == 'self-hosted' }} + run: | + git clone git://git.code.sf.net/p/psrdada/code psrdada + cd psrdada + ./bootstrap + ./configure + make -j all + sudo make install + cd .. - uses: actions/setup-python@v4.3.0 with: python-version: ${{ matrix.python-version }} From 806fc6448baf4e9c8a4f5aa42e01790a5c9e3775 Mon Sep 17 00:00:00 2001 From: jaycedowell Date: Fri, 26 May 2023 11:11:47 -0600 Subject: [PATCH 09/11] Add in autoconf and automake. --- .github/workflows/main.yml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index c89d125b1..8bf9e4bd8 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -31,6 +31,8 @@ jobs: run: | sudo apt-get update && \ sudo apt-get install -y \ + autoconf \ + automake \ build-essential \ ca-certificates \ curl \ From f0c62fed0f33bfc21355e11497fcbb6a903f2004 Mon Sep 17 00:00:00 2001 From: jaycedowell Date: Fri, 26 May 2023 11:19:16 -0600 Subject: [PATCH 10/11] Add in libtool. --- .github/workflows/main.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 8bf9e4bd8..39d9061dc 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -40,6 +40,7 @@ jobs: gfortran \ git \ libopenblas-dev \ + libtool \ pkg-config \ software-properties-common - name: "Software Install - MacOS" From 78de6d44c99f55beaa8f95e4d143ee8c8d480cd4 Mon Sep 17 00:00:00 2001 From: jaycedowell Date: Fri, 26 May 2023 11:23:33 -0600 Subject: [PATCH 11/11] PSRDADA dies on a missing python for self-hosted. 
--- .github/workflows/main.yml | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 39d9061dc..30baeefaa 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -52,16 +52,6 @@ jobs: gawk \ gnu-sed \ pkg-config - - name: "Software Install - PSRDADA" - if: ${{ matrix.os == 'ubuntu-20.04' || matrix.os == 'self-hosted' }} - run: | - git clone git://git.code.sf.net/p/psrdada/code psrdada - cd psrdada - ./bootstrap - ./configure - make -j all - sudo make install - cd .. - uses: actions/setup-python@v4.3.0 with: python-version: ${{ matrix.python-version }} @@ -92,6 +82,16 @@ jobs: jupyter_client \ nbformat \ nbconvert + - name: "Software Install - PSRDADA" + if: ${{ matrix.os == 'ubuntu-20.04' || matrix.os == 'self-hosted' }} + run: | + git clone git://git.code.sf.net/p/psrdada/code psrdada + cd psrdada + ./bootstrap + ./configure + make -j all + sudo make install + cd .. - uses: actions/checkout@v3 - name: "Build and Install" run: |
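Usage notes for the blocks introduced above. Both read_dada_file and read_psrdada_buffer expect a header_callback that turns the parsed PSRDADA header (a flat dict of keyword/value pairs) into a bifrost header carrying a '_tensor' entry, since the DADA keywords have no formal spec for this. A minimal sketch, assuming complex-float data ordered as (time, freq, pol); the axis layout, the 'cf32' dtype and the use of the SOURCE keyword are assumptions for illustration, not something this patch series defines:

from datetime import datetime, timezone

def example_header_callback(dada_hdr):
    """Hypothetical callback: parsed PSRDADA header dict -> bifrost header dict."""
    nchan = int(dada_hdr['NCHAN'])
    npol  = int(dada_hdr['NPOL'])
    tsamp = float(dada_hdr['TSAMP']) * 1e-6      # DADA TSAMP is in microseconds
    bw    = float(dada_hdr['BW'])
    f0    = float(dada_hdr['FREQ']) - bw / 2.0   # centre frequency -> bottom of band
    # UTC_START format assumed to match what generate_dada_header() writes
    t0 = datetime.strptime(dada_hdr['UTC_START'], "%Y-%m-%d-%H:%M:%S")
    t0 = t0.replace(tzinfo=timezone.utc).timestamp()
    ohdr = dict(dada_hdr)
    ohdr['name'] = str(dada_hdr.get('SOURCE', 'unknown'))
    ohdr['_tensor'] = {
        'dtype':  'cf32',                        # assumed; derive from NBIT/NDIM in practice
        'shape':  [-1, nchan, npol],             # leading axis is the frame (ring) axis
        'labels': ['time', 'freq', 'pol'],
        'scales': [(t0, tsamp), (f0, bw / nchan), None],
        'units':  ['s', 'MHz', None],
    }
    return ohdr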
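With such a callback in hand, streaming a scan's worth of .dada files through a pipeline looks roughly like the following. generate_dada_filelist() is applied inside the block, so handing it the first file of a scan picks up the companion '<root>_*.dada' files automatically; the path and the print_header sink here are placeholders:

import bifrost.pipeline as bfp
import bifrost.blocks as blocks
from bifrost.blocks.dada_file import read_dada_file

b_dada = read_dada_file('/data/scan_0000000000000000.000000.dada',
                        example_header_callback, gulp_nframe=1)
b_sink = blocks.print_header(b_dada)   # any downstream block or sink will do

pipeline = bfp.get_default_pipeline()
pipeline.run()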
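The new write_psrdada_buffer sink likewise lets a pipeline sit between two PSRDADA ring buffers. A sketch, assuming both buffers already exist (e.g. created beforehand with PSRDADA's dada_db utility) and that the data stays in system memory, as the sink's docstring notes; the buffer keys are placeholders:

import bifrost.pipeline as bfp
from bifrost.blocks.psrdada import read_psrdada_buffer, write_psrdada_buffer

b_in  = read_psrdada_buffer(0xDADA, example_header_callback, gulp_nframe=1,
                            single=True)        # detach after a single data stream
b_out = write_psrdada_buffer(b_in, 0xBABA, gulp_nframe=1)

pipeline = bfp.get_default_pipeline()
pipeline.run()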
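Continuing that sketch, the sink's add_header_keywords / sub_header_keywords / remap_prefixed_keywords hooks adjust the outgoing DADA header before it is written; the keyword names below are purely illustrative:

b_out.add_header_keywords({'INSTRUMENT': 'bifrost', 'PID': 'P000'})
b_out.sub_header_keywords(['SCRATCH_KEY'])                       # drop keywords that should not propagate
b_out.remap_prefixed_keywords('ORIG_', ['SOURCE', 'RA', 'DEC'])  # e.g. ORIG_SOURCE -> SOURCE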
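For reference, the header that parse_dada_header() consumes and generate_dada_header() emits is plain ASCII 'KEYWORD value' lines, null-padded to HDR_SIZE bytes (4096 by default). A rough round-trip, with the keyword set trimmed for brevity:

from bifrost.blocks.psrdada import parse_dada_header, generate_dada_header

raw = ("HDR_VERSION 1.0\n"
       "HDR_SIZE    4096\n"
       "NCHAN       256\n"
       "NPOL        2\n"
       "NBIT        8\n"
       "NDIM        2\n"
       "TSAMP       10.24\n"
       "UTC_START   2020-07-23-02:33:07\n").ljust(4096, '\x00')

hdr = parse_dada_header(raw)          # values cast to int/float where possible
padded = generate_dada_header(hdr)    # 4096-byte string, ready for a DADA header block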