diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index daa3bdde8..0e7b9c0ee 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -33,6 +33,8 @@ jobs: run: | sudo apt-get update && \ sudo apt-get install -y \ + autoconf \ + automake \ build-essential \ ca-certificates \ curl \ @@ -41,6 +43,7 @@ jobs: git \ libhwloc-dev \ libopenblas-dev \ + libtool \ pkg-config \ software-properties-common - name: "Software Install - MacOS" @@ -84,6 +87,16 @@ jobs: jupyter_client \ nbformat \ nbconvert + - name: "Software Install - PSRDADA" + if: ${{ matrix.os == 'ubuntu-20.04' || matrix.os == 'self-hosted' }} + run: | + git clone git://git.code.sf.net/p/psrdada/code psrdada + cd psrdada + ./bootstrap + ./configure + make -j all + sudo make install + cd .. - uses: actions/checkout@v3 - name: "Build and Install" run: | diff --git a/python/bifrost/blocks/dada_file.py b/python/bifrost/blocks/dada_file.py new file mode 100755 index 000000000..979211b73 --- /dev/null +++ b/python/bifrost/blocks/dada_file.py @@ -0,0 +1,207 @@ +# Copyright (c) 2016-2023, The Bifrost Authors. All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions +# are met: +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above copyright +# notice, this list of conditions and the following disclaimer in the +# documentation and/or other materials provided with the distribution. +# * Neither the name of The Bifrost Authors nor the names of its +# contributors may be used to endorse or promote products derived +# from this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS ``AS IS'' AND ANY +# EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR +# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR +# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, +# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR +# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY +# OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + + +""" +# binary_file_io.py + +Basic file I/O blocks for reading and writing data. +""" +import numpy as np +import time +import bifrost as bf +import bifrost.pipeline as bfp +from bifrost.dtype import string2numpy +from astropy.time import Time +import glob +import os + +from bifrost import telemetry +telemetry.track_module() + +def _angle_str_to_sigproc(ang): + aparts = ang.split(':') + if len(aparts) == 2: + sp_ang = int(aparts[0])*10000 + int(aparts[1])*100 + 0.0 + elif len(aparts) == 3: + sp_ang = int(aparts[0])*10000 + int(aparts[1])*100 + float(aparts[2]) + else: + raise RuntimeError("Cannot parse: {ang} does not match XX:YY:ZZ.zz".format(ang=ang)) + +class DadaFileRead(object): + """ Simple file-like reading object for pipeline testing + + Args: + filename (str): Name of file to open + dtype (np.dtype or str): datatype of data, e.g. float32. This should be a *numpy* dtype, + not a bifrost.ndarray dtype (eg. float32, not f32) + gulp_size (int): How much data to read per gulp, (i.e. sub-array size) + """ + def __init__(self, filename, header_callback): + super(DadaFileRead, self).__init__() + if isinstance(filename, str): + self.file_obj = open(filename, 'rb') + self.nfiles = 1 + else: + self.file_obj = open(filename[0], 'rb') + self.nfiles = len(filename) + + self.filenames = filename + self.fcount = 0 + + print("%i/%i: opening %s" % (self.fcount+1, self.nfiles, self.file_obj.name)) + self._header_callback = header_callback + self.header = self._read_header(header_callback) + itensor = self.header['_tensor'] + self.dtype = np.dtype(string2numpy(itensor['dtype'])) + self.block_shape = np.copy(itensor['shape']) + self.block_shape[0] = 1 + self.block_size = np.prod(self.block_shape) + self.gulp_size = self.block_size * self.dtype.itemsize + + def _read_header(self, header_callback): + """ Read DADA header, and apply header_callback + + Applies header_callback to convert DADA header to bifrost header. Specifically, + need to generate the '_tensor' from the DADA keywords which have no formal spec. + """ + hdr_buf = self.file_obj.read(4096).decode('ascii') + hdr = {} + for line in hdr_buf.split('\n'): + try: + key, val = line.split() + hdr[key] = val + except ValueError: + pass + hdr = header_callback(hdr) + return hdr + + def _open_next_file(self): + self.file_obj.close() + self.fcount += 1 + print("%i/%i: opening %s" % (self.fcount+1, self.nfiles, self.filenames[self.fcount])) + self.file_obj = open(self.filenames[self.fcount], 'rb') + _hdr = self._read_header(self._header_callback) + + def _read_data(self): + d = np.fromfile(self.file_obj, dtype=self.dtype, count=self.block_size) + return d + + def read_block(self): + #print("Reading...") + d = self._read_data() + if d.size == 0 and self.fcount < self.nfiles - 1: + self._open_next_file() + d = self._read_data() + d = d.reshape(self.block_shape) + return d + elif d.size == 0 and self.fcount == self.nfiles - 1: + #print("EOF") + return np.array([0]) # EOF + elif self.block_size != np.prod(d.shape): + print(self.block_size, np.prod(d.shape), d.shape, d.size) + print("Warning: File truncated or gulp size does not divide n_blocks") + try: + bs_truncated = list(self.block_shape) + bs_truncated[0] = -1 # Attempt to load truncated data anyway + d = d.reshape(bs_truncated) + return d + except ValueError: + return np.array([0]) # EOF + else: + d = d.reshape(self.block_shape) + return d + + def read(self): + gulp_break = False + d = self.read_block() + return d + + + def __enter__(self): + return self + + def close(self): + pass + + def __exit__(self, type, value, tb): + self.close() + + +def generate_dada_filelist(filename): + """ Generate a list of DADA files from start filename + + Args: + filename (str): Path to file. e.g. + /data/dprice/2020-07-23-02:33:07.587_0000000000000000.000000.dada + + Returns: + flist (list): A list of all associated files + """ + bn = os.path.basename(filename) + dn = os.path.dirname(filename) + bn_root = '_'.join(bn.split('_')[:-1]) # Strips off _000.000.dada bit + flist = sorted(glob.glob(os.path.join(dn, bn_root + '_*.dada'))) + return flist + + +class DadaFileReadBlock(bfp.SourceBlock): + def __init__(self, filename, header_callback, gulp_nframe, *args, **kwargs): + super(DadaFileReadBlock, self).__init__(filename, gulp_nframe, *args, **kwargs) + self.header_callback = header_callback + + def create_reader(self, filename): + flist = generate_dada_filelist(filename) + return DadaFileRead(flist, self.header_callback) + + def on_sequence(self, ireader, filename): + ohdr = ireader.header + return [ohdr] + + def on_data(self, reader, ospans): + indata = reader.read() + odata = ospans[0].data + #print indata.shape, odata.shape + if np.prod(indata.shape) == np.prod(odata.shape[1:]): + ospans[0].data[0] = indata + return [1] + else: + # EOF or truncated block + return [0] + + +def read_dada_file(filename, header_callback, gulp_nframe, *args, **kwargs): + """ Block for reading binary data from file and streaming it into a bifrost pipeline + + Args: + filenames (list): A list of filenames to open + header_callback (method): A function that converts from PSRDADA header into a + bifrost header. + gulp_size (int): Number of elements in a gulp (i.e. sub-array size) + gulp_nframe (int): Number of frames in a gulp. (Ask Ben / Miles for good explanation) + dtype (bifrost dtype string): dtype, e.g. f32, cf32 + """ + return DadaFileReadBlock(filename, header_callback, gulp_nframe, *args, **kwargs) diff --git a/python/bifrost/blocks/psrdada.py b/python/bifrost/blocks/psrdada.py index 6965e7f3f..d9bf8320d 100644 --- a/python/bifrost/blocks/psrdada.py +++ b/python/bifrost/blocks/psrdada.py @@ -25,12 +25,17 @@ # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -from bifrost.pipeline import SourceBlock +from bifrost.pipeline import SourceBlock, SinkBlock from bifrost.Space import Space from bifrost.psrdada import Hdu from bifrost.libbifrost import _bf, _check import bifrost.ndarray +import numpy as np +from datetime import datetime +from copy import deepcopy +import os + from bifrost import telemetry telemetry.track_module() @@ -45,16 +50,19 @@ class PsrDadaBufferReader(object): def __init__(self, hdu, hdr): self.hdu = hdu self.header = hdr - self._open_next_block() + self.block = None def _open_next_block(self): self.block = next(self.hdu.data_block) self.nbyte = self.block.size_bytes() self.byte0 = 0 def readinto(self, buf): + if self.block is None: + self._open_next_block() dst_space = Space(_get_space(buf)).as_BFspace() byte0 = 0 nbyte = buf.nbytes nbyte_copy = min(nbyte - byte0, self.nbyte - self.byte0) + while nbyte_copy: _check(_bf.bfMemcpy(buf.ctypes.data + byte0, dst_space, self.block.ptr + self.byte0, _bf.BF_SPACE_SYSTEM, @@ -70,19 +78,23 @@ def readinto(self, buf): break return byte0 def close(self): - self.block.close() + if self.block: + self.block.close() def __enter__(self): return self def __exit__(self, type, value, tb): self.close() -def psrdada_read_sequence_iterator(buffer_key): +def psrdada_read_sequence_iterator(buffer_key, single=False): hdu = Hdu() hdu.connect_read(buffer_key) for hdr in hdu.header_block: with hdr: hdr_string = hdr.data.tostring() yield hdu, hdr_string + if single: + hdu.disconnect() + break hdu.disconnect() def _cast_to_type(string): @@ -91,7 +103,15 @@ def _cast_to_type(string): try: return float(string) except ValueError: pass return string + +def _cast_to_string(unknown): + if type(unknown) is bytes: + return unknown.decode('utf-8') + elif type(unknown) is str: + return unknown + def parse_dada_header(headerstr, cast_types=True): + headerstr = _cast_to_string(headerstr) headerstr = headerstr[:headerstr.find('\0')] header = {} for line in headerstr.split('\n'): @@ -107,9 +127,9 @@ def parse_dada_header(headerstr, cast_types=True): return header class PsrDadaSourceBlock(SourceBlock): - def __init__(self, buffer_key, header_callback, gulp_nframe, space=None, - *args, **kwargs): - buffer_iterator = psrdada_read_sequence_iterator(buffer_key) + def __init__(self, buffer_key, header_callback, gulp_nframe, space=None, + single=False, *args, **kwargs): + buffer_iterator = psrdada_read_sequence_iterator(buffer_key, single) super(PsrDadaSourceBlock, self).__init__(buffer_iterator, gulp_nframe, space, *args, **kwargs) self.header_callback = header_callback @@ -117,6 +137,7 @@ def create_reader(self, hdu_hdr): hdu, hdr = hdu_hdr return PsrDadaBufferReader(hdu, hdr) def on_sequence(self, reader, hdu_hdr): + print("PsrDadaSourceBlock::on_sequence") ihdr_dict = parse_dada_header(reader.header) return [self.header_callback(ihdr_dict)] def on_data(self, reader, ospans): @@ -127,7 +148,178 @@ def on_data(self, reader, ospans): nframe = nbyte // ospan.frame_nbyte return [nframe] -def read_psrdada_buffer(buffer_key, header_callback, gulp_nframe, space=None, +def _keyval_to_dadastr(key, val): + """ Convert key: value pair into a DADA string """ + return "{key:20s} {val}\n".format(key=key.upper(), val=val) + + +def _extract_tensor_scale(key, tensor): + try: + idx = tensor['labels'].index(key) + return tensor['scales'][idx] + except ValueError as e: + return [0, 0] + + +def _extract_tensor_shape(key, tensor): + try: + idx = tensor['labels'].index(key) + return tensor['shape'][idx] + except ValueError as e: + return 1 + + +def generate_dada_header(hdr_dict, hdrlen=4096): + """ Generate DADA header from header dict + + Args: + hdr_dict (dict): Header dictionary of key, value pairs + hdrlen (int): Size of header, default 4096 + + Returns: + s (str): DADA header string with padding to hdrlen + """ + s = "HDR_VERSION 1.0\n" + s+= "HDR_SIZE %i\n" % hdrlen + keys_to_skip = ('HDR_VERSION', 'HDR_SIZE') + + # update parameters from bifrost tensor + if '_tensor' in hdr_dict.keys(): + print(hdr_dict['_tensor']) + dtype = hdr_dict['_tensor']['dtype'] + dtype_vals = { + 'cf64': { 'NBIT': '64', 'NDIM': '2' }, + 'f64': { 'NBIT': '64', 'NDIM': '1' }, + 'cf32': { 'NBIT': '32', 'NDIM': '2' }, + 'f32': { 'NBIT': '32', 'NDIM': '1' }, + 'ci16': { 'NBIT': '16', 'NDIM': '2' }, + 'i16': { 'NBIT': '16', 'NDIM': '1' }, + 'ci8': { 'NBIT': '8', 'NDIM': '2' }, + 'i8': { 'NBIT': '8', 'NDIM': '1' } } + if dtype in dtype_vals.keys(): + hdr_dict['NBIT'] = dtype_vals[dtype]['NBIT'] + hdr_dict['NDIM'] = dtype_vals[dtype]['NDIM'] + + hdr_dict['NBEAM'] = _extract_tensor_shape("beam", hdr_dict['_tensor']) + hdr_dict['NANT'] = _extract_tensor_shape("station", hdr_dict['_tensor']) + hdr_dict['NPOL'] = _extract_tensor_shape("pol", hdr_dict['_tensor']) + hdr_dict['NCHAN'] = _extract_tensor_shape("freq", hdr_dict['_tensor']) + + f0 = _extract_tensor_scale("freq", hdr_dict['_tensor'])[0] + chan_bw = _extract_tensor_scale("freq", hdr_dict['_tensor'])[1] + bw = chan_bw * int(hdr_dict['NCHAN']) + hdr_dict['BW'] = bw + hdr_dict['FREQ'] = f0 + bw / 2 + + # print(hdr_dict['_tensor']) + ts = _extract_tensor_scale("time", hdr_dict['_tensor'])[0] + + tsamp = _extract_tensor_scale("fine_time", hdr_dict['_tensor'])[1] + if tsamp == 0: + tsamp = _extract_tensor_scale("time", hdr_dict['_tensor'])[1] + if tsamp == 0: + print("TSAMP was 0, changing to 10.24 us") + tsamp = 0.00001024 + + hdr_dict['TSAMP'] = tsamp * 1e6 + ts_integer = int(ts) + hdr_dict['UTC_START'] = datetime.utcfromtimestamp(ts_integer).strftime("%Y-%m-%d-%H:%M:%S") + + fine_time = _extract_tensor_shape("fine_time", hdr_dict['_tensor']) + + bits_per_sample = int(hdr_dict['NBEAM']) * \ + int(hdr_dict['NANT']) * \ + int(hdr_dict['NPOL']) * \ + int(hdr_dict['NBIT']) * \ + int(hdr_dict['NDIM']) * \ + int(hdr_dict['NCHAN']) + + resolution = (bits_per_sample * fine_time) / 8 + hdr_dict['RESOLUTION'] = resolution + + bytes_per_second = int((bits_per_sample // 8) / tsamp) + hdr_dict['BYTES_PER_SECOND'] = bytes_per_second + + hdr_dict['FILE_SIZE'] = bytes_per_second * 8 + + if hdr_dict['_tensor']['labels'] == ['time', 'beam', 'freq', 'fine_time']: + hdr_dict['ORDER'] = 'SFT' + + for key, val in hdr_dict.items(): + if key not in keys_to_skip: + if isinstance(val, (str, float, int)): + s += _keyval_to_dadastr(key, val) + s_padding = "\x00" + if len(s) > hdrlen: + raise RuntimeError("Header is too large for HDR_SIZE! Increase hdrlen") + n_pad = hdrlen - len(s) + return s + s_padding * n_pad + +class PsrDadaSinkBlock(SinkBlock): + def __init__(self, iring, buffer_key, gulp_nframe, space=None, *args, **kwargs): + super(PsrDadaSinkBlock, self).__init__(iring, gulp_nframe, *args, **kwargs) + self.hdu = Hdu() + self.hdu.connect_write(buffer_key) + self.keywords_to_add = {} + self.keywords_to_sub = [] + self.keywords_to_change = {} + + def add_header_keywords(self, hdr_dict): + """Add specified keywords to outgoing header dict""" + for key, value in hdr_dict.items(): + self.keywords_to_add[key] = value + + def sub_header_keywords(self, hdr_dict): + """Remove specified keywords from outgoing header dict""" + for key in hdr_dict: + self.keywords_to_sub.append(key) + + def remap_prefixed_keywords(self, prefix, suffixes): + """Remap the keywords in suffixes, removing the prefix""" + for suffix in suffixes: + self.keywords_to_change[prefix + suffix] = suffix + + def on_sequence(self, iseq): + print("PsrDadaSinkBlock::on_sequence") + updated_header = iseq.header.copy() + # rename some header keywords + for key, value in self.keywords_to_change.items(): + try: + self.keywords_to_add[value] = updated_header[key] + self.keywords_to_sub.append(key) + except KeyError: + pass + # insert the additional keywords + updated_header.update(self.keywords_to_add) + # remove the keywords + for key in self.keywords_to_sub: + try: + del updated_header[key] + except KeyError: + pass + dada_header_str = generate_dada_header(updated_header) + dada_header_buf = next(self.hdu.header_block) + + dada_header_buf.data[:] = np.fromstring(dada_header_str.encode('ascii'), dtype='uint8') + dada_header_buf.close() + + def on_sequence_end(self, iseq): + self.hdu.disconnect() + + def on_data(self, ispan): + + # TODO: Make this work in CUDA space + dada_blk = next(self.hdu.data_block) + + nbyte = ispan.data.nbytes + _check(_bf.bfMemcpy(dada_blk.ptr, _bf.BF_SPACE_SYSTEM, + ispan.data.ctypes.data, _bf.BF_SPACE_SYSTEM, + nbyte)) + + #dada_blk.data[:] = ispan.data.view('u8') + dada_blk.close() + +def read_psrdada_buffer(buffer_key, header_callback, gulp_nframe, space=None, single=False, *args, **kwargs): """Read data from a PSRDADA ring buffer. @@ -137,6 +329,7 @@ def read_psrdada_buffer(buffer_key, header_callback, gulp_nframe, space=None, header_callback (func): A function f(psrdata_header_dict) -> bifrost_header_dict. gulp_nframe (int): No. frames to process at a time. space (string): The output memory space (all Bifrost spaces are supported). + single (bool): Only process a single data stream with the block *args: Arguments to ``bifrost.pipeline.SourceBlock``. **kwargs: Keyword Arguments to ``bifrost.pipeline.SourceBlock``. @@ -160,5 +353,13 @@ def read_psrdada_buffer(buffer_key, header_callback, gulp_nframe, space=None, References: http://psrdada.sourceforge.net/ """ - return PsrDadaSourceBlock(buffer_key, header_callback, gulp_nframe, space, + return PsrDadaSourceBlock(buffer_key, header_callback, gulp_nframe, space, single, *args, **kwargs) + +def write_psrdada_buffer(iring, buffer_key, gulp_nframe, *args, **kwargs): + """ Write into a PSRDADA ring buffer + + Note: + Initial version, currently only supports system space (not CUDA) + """ + return PsrDadaSinkBlock(iring, buffer_key, gulp_nframe) diff --git a/python/bifrost/psrdada.py b/python/bifrost/psrdada.py index 1e6eb6d50..1f022cb98 100644 --- a/python/bifrost/psrdada.py +++ b/python/bifrost/psrdada.py @@ -42,7 +42,6 @@ import bifrost.libpsrdada_generated as _dada import numpy as np from bifrost.ndarray import _address_as_buffer -from bifrost.libbifrost import EndOfDataStop import ctypes @@ -52,15 +51,23 @@ def get_pointer_value(ptr): return ctypes.c_void_p.from_buffer(ptr).value +def _cast_to_bytes(unknown): + """ handle the difference between python 2 and 3 """ + if type(unknown) is str: + return unknown.encode("utf-8") + elif type(unknown) is bytes: + return unknown + class MultiLog(object): count = 0 def __init__(self, name=None): if name is None: name = f"MultiLog{MultiLog.count}" MultiLog.count += 1 - self.obj = _dada.multilog_open(name, '\0') + self.obj = _dada.multilog_open(_cast_to_bytes(name), _cast_to_bytes('\0')) def __del__(self): - _dada.multilog_close(self.obj) + if hasattr(self, 'obj'): + _dada.multilog_close(self.obj) class IpcBufBlock(object): def __init__(self, buf, mutable=False): @@ -122,7 +129,7 @@ def next(self): return self.__next__() def open(self): raise NotImplementedError() - def close(self, nbyte): + def close(self): raise NotImplementedError() class IpcBaseIO(IpcBaseBuf): @@ -200,13 +207,15 @@ def close(self, nbyte): class Hdu(object): def __init__(self): + self.connected = False + self.registered = False self._dada = _dada self.log = MultiLog() self.hdu = _dada.dada_hdu_create(self.log.obj) - self.connected = False def __del__(self): self.disconnect() - _dada.dada_hdu_destroy(self.hdu) + if hasattr(self, "hdu"): + _dada.dada_hdu_destroy(self.hdu) def _connect(self, buffer_key=0xDADA): self.buffer_key = buffer_key _dada.dada_hdu_set_key(self.hdu, self.buffer_key) @@ -233,6 +242,16 @@ def _unlock(self): def relock(self): self._unlock() self._lock(self.mode) + def _register(self): + if not self.registered: + if _dada.dada_cuda_dbregister(self.hdu) < 0: + raise IOError("Failed to register memory with CUDA driver") + self.registered = True + def _unregister(self): + if self.registered: + if _dada.dada_cuda_dbunregister(self.hdu) < 0: + raise IOError("Failed to unregister memory with CUDA driver") + self.registered = False def open_HACK(self): if _dada.ipcio_open(self.data_block.io, 'w') < 0: raise IOError("ipcio_open failed") @@ -242,6 +261,7 @@ def connect_read(self, buffer_key=0xDADA): self.header_block = IpcReadHeaderBuf(self.hdu.contents.header_block) self.data_block = IpcReadDataBuf(self.hdu.contents.data_block) self.connected = True + self._register() def connect_write(self, buffer_key=0xDADA): self._connect(buffer_key) self._lock('write') @@ -250,6 +270,7 @@ def connect_write(self, buffer_key=0xDADA): self.connected = True def disconnect(self): if self.connected: + self._unregister() self._unlock() self._disconnect() self.connected = False