
Dev jojomale #83

Merged
merged 14 commits on Dec 4, 2024
47 changes: 40 additions & 7 deletions docs/source/modules/correlate/correlator.rst
@@ -17,10 +17,14 @@ The Correlator Object
---------------------

The Object governing the actual computation is :py:class:`~seismic.correlate.correlate.Correlator`.
It takes only two input arguments and the most important is the dictionary / yml file that we created in `the preceding step <./get_started.html#download-data>`_.
The second input argument is the :py:class:`~seismic.trace_data.waveform.Store_Client` we used to `download <../trace_data/waveform.html#download-data>`_ our data.
It takes only two input arguments and the most important is the dictionary / yml file that
we created in `the preceding step <./get_started.html#download-data>`_.
The second input argument is the :py:class:`~seismic.trace_data.waveform.Store_Client`
we used to `download <../trace_data/waveform.html#download-data>`_ our data.

As Computing Ambient Noise Correlations is computationally very expensive, **SeisMIC** can be used in conjuction with MPI (e.g., openMPI) to enable processing with several cores or on HPCs.
As computing ambient noise correlations is computationally very expensive,
**SeisMIC** can be used in conjunction with MPI (e.g., OpenMPI) to enable processing
with several cores or on HPCs.

A script to start your correlation could look like this:

@@ -43,18 +47,47 @@ A script to start your correlation could look like this:
params = '/path/to/my/params.yaml'
# You don't have to set this (could be None)
client = Client('MYFDSN')
client.set_eida_token('/I/want/embargoed/data.txt)
client.set_eida_token('/I/want/embargoed/data.txt')
# root is the same as proj_dir in params.yaml
root = '/path/to/project/'
sc = Store_Client(client, root)

c = Correlator(sc, options=params)
c = Correlator(options=params, store_client=sc)
print('Correlator initiated')
x = time()
st = c.pxcorr()
print('Correlation finished after', time()-x, 'seconds')

This script can be iniated in bash using:

If you did not download any data using **SeisMIC**, you can use the Correlator object
without setting a :py:class:`~seismic.trace_data.waveform.Store_Client` object.
In this case, the Correlator creates a :py:class:`~seismic.trace_data.waveform.Local_Store_Client`
object internally from `options`. The above script would then look like this:

.. code-block:: python
:caption: mycorrelation_noclient.py
:linenos:

from time import time
import os
# This tells numpy to only use one thread
# As we use MPI, this is necessary to avoid oversubscribing threads
os.environ['OPENBLAS_NUM_THREADS'] = '1'

from seismic.correlate.correlate import Correlator

# Path to the parameter file we created in the step before
params = '/path/to/my/params.yaml'

c = Correlator(options=params)
print('Correlator initiated')
x = time()
st = c.pxcorr()
print('Correlation finished after', time()-x, 'seconds')



This script can be initiated in bash using:

.. code-block:: bash

@@ -65,4 +98,4 @@that you will want to use is :py:meth:`~seismic.correlate.correlate.Correlator.p

.. note::
On some MPI versions, the parameters are named differently. For example (`-n` could correspond to `-c`). When in doubt, refer to the help
or man page of your `mpirun` `mpiexec` command.
or man page of your `mpirun` or `mpiexec` command.
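As a hedged illustration only (the script name and process count are placeholders, and the exact flags depend on your MPI distribution), a launch command could look like:

```shell
# Run the correlation script on 4 MPI processes.
# Replace "mycorrelation.py" and the process count with your own values;
# some MPI versions use -c or -np instead of -n.
mpirun -n 4 python mycorrelation.py
```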
33 changes: 33 additions & 0 deletions docs/source/modules/correlate/get_started.rst
@@ -42,6 +42,14 @@ Setting the parameters
log_level: 'WARNING'
# folder for figures
fig_subdir : 'figures'
# Path to stationxml files, default is "inventory/*.xml"
stationxml_file : 'inventory/*.xml'
# sds root directory, default is "mseed"
sds_dir : 'mseed'
# sds format string of waveform file names,
# change if your filenames deviate from standard pattern
sds_fmtstr : '{year}/{network}/{station}/{channel}.{sds_type}/{network}.{station}.{location}.{channel}.{sds_type}.{year}.{doy:03d}'



#### parameters that are network specific
@@ -284,11 +292,36 @@ Project Wide Parameters
log_level: 'WARNING'
# folder for figures
fig_subdir : 'figures'
# Path to stationxml files, default is "inventory/*.xml"
stationxml_file : 'inventory/*.xml'
# sds root directory, default is "mseed"
sds_dir : 'mseed'
# sds format string of waveform file names,
# change if your filenames deviate from standard SDS pattern
sds_fmtstr : '{year}/{network}/{station}/{channel}.{sds_type}/{network}.{station}.{location}.{channel}.{sds_type}.{year}.{doy:03d}'


These parameters govern the logging and the file structure. ``proj_dir`` is the root directory we chose when initialising our
:py:class:`~seismic.trace_data.waveform.Store_Client` as shown `here <../trace_data/waveform.html#download-data>`_.
``fig_dir`` and ``log_dir`` are subdirectories for figures and logs, respectively, and the log level determines how much is actually logged.
In most cases, *WARNING* is the appropriate choice - anything below that will produce a large amount of log output.
``stationxml_file`` is the path to the stationxml files, and ``sds_dir`` is the directory where the mseed files are stored.
``sds_fmtstr`` is the format string for the mseed file names. The default follows the standard
`SeisComP Data Structure (SDS) <https://www.seiscomp.de/seiscomp3/doc/applications/slarchive/SDS.html>`_.
Leave these parameters at their default values if you used **SeisMIC** to download the data.
However, if you want to use your own local database, you will have to adjust these parameters
accordingly. Wildcards are permitted.

``sds_fmtstr`` defines the naming pattern of the mseed files. The following placeholders (everything
in curly braces) can be used:
``year``, ``network``, ``station``, ``channel``, ``location``, ``sds_type``, ``doy``.
The standard pattern for SeisComP3 is:

.. code-block:: yaml

sds_fmtstr : '{year}/{network}/{station}/{channel}.{sds_type}/{network}.{station}.{location}.{channel}.{sds_type}.{year}.{doy:03d}'
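As an illustration, the pattern expands with Python's ``str.format``; the station values below are hypothetical:

```python
# Expand the default SDS format string for one (hypothetical) day of data
sds_fmtstr = ('{year}/{network}/{station}/{channel}.{sds_type}/'
              '{network}.{station}.{location}.{channel}.{sds_type}.{year}.{doy:03d}')

# sds_type 'D' denotes daily waveform data; doy is zero-padded to 3 digits
path = sds_fmtstr.format(
    year=2023, network='GR', station='FUR', location='',
    channel='HHZ', sds_type='D', doy=42)
print(path)
# 2023/GR/FUR/HHZ.D/GR.FUR..HHZ.D.2023.042
```

An empty ``location`` code simply collapses to two consecutive dots in the file name, as in the standard SDS layout.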



Network Specific Parameters
===========================
5 changes: 3 additions & 2 deletions environment.yml
@@ -1,4 +1,4 @@
name: seismic
name: seismic-dev
channels:
- conda-forge
dependencies:
@@ -12,10 +12,11 @@ dependencies:
- prov
- pytest
- pyyaml
- python>=3.10
- python>=3.10, <3.12
- scipy<=1.11.2
- flake8
- tqdm
- ipykernel
- pip:
- sphinx < 6.0
- pydata-sphinx-theme
2 changes: 1 addition & 1 deletion examples/correlate.py
@@ -18,7 +18,7 @@
root = 'data'
sc = Store_Client(client, root)

c = Correlator(sc, options=params)
c = Correlator(options=params, store_client=sc)
print('Correlator initiated')
x = time()
st = c.pxcorr()
4 changes: 4 additions & 0 deletions examples/params.yaml
@@ -8,6 +8,10 @@ log_subdir : 'log'
log_level: 'WARNING'
# folder for figures
fig_subdir : 'figures'
# Path to stationxml files if not downloaded
stationxml_file : 'inventory/*.xml'
# sds root directory, default is "mseed"
sds_dir : 'mseed'


#### parameters that are network specific
106 changes: 14 additions & 92 deletions examples/spatial.ipynb

Large diffs are not rendered by default.

963 changes: 25 additions & 938 deletions examples/tutorial.ipynb

Large diffs are not rendered by default.

7 changes: 7 additions & 0 deletions params_example.yaml
@@ -8,6 +8,13 @@ log_subdir : 'log'
log_level: 'WARNING'
# folder for figures
fig_subdir : 'figures'
# Path to stationxml files if not downloaded
stationxml_file : '/path/to/stations/*.xml'
# sds root directory, default is "mseed"
sds_dir : '/path/to/sds_root'
# sds format string of waveform file names,
# change if your filenames deviate from standard pattern
sds_fmtstr : '{year}/{network}/{station}/{channel}.{sds_type}/{network}.{station}.{location}.{channel}.{sds_type}.{year}.{doy:03d}'


#### parameters that are network specific
5 changes: 3 additions & 2 deletions requirements.txt
@@ -3,9 +3,10 @@
# Broader requirements are specified in the environment.yml

geographiclib
h5py ==3.9.0
h5py <=3.12.1
matplotlib <=3.7.2
mpi4py<=3.1.4
mpi4py<=4.0.1
openmpi <=5.0.6
numpy<=1.25.2
obspy<=1.4.0, >=1.3.1
pip
63 changes: 40 additions & 23 deletions src/seismic/correlate/correlate.py
@@ -8,9 +8,8 @@
Peter Makus (makus@gfz-potsdam.de)

Created: Monday, 29th March 2021 07:58:18 am
Last Modified: Tuesday, 19th November 2024 01:57:48 pm
Last Modified: Monday, 04th December 2024 03:07:26 pm (J. Lehr)
'''
from copy import deepcopy
from typing import Iterator, List, Tuple, Optional
from warnings import warn
import os
@@ -30,7 +29,7 @@
from seismic.correlate import preprocessing_td as pptd
from seismic.correlate import preprocessing_stream as ppst
from seismic.db.corr_hdf5 import CorrelationDataBase, h5_FMTSTR
from seismic.trace_data.waveform import Store_Client
from seismic.trace_data.waveform import Store_Client, Local_Store_Client
from seismic.utils.fetch_func_from_str import func_from_str
from seismic.utils import miic_utils as mu

@@ -40,22 +39,21 @@ class Correlator(object):
Object to manage the actual Correlation (i.e., Green's function retrieval)
for the database.
"""
def __init__(self, store_client: Store_Client, options: dict or str):
def __init__(self, options: dict | str, store_client: Store_Client = None):
"""
Initiates the Correlator object. When executing
:func:`~seismic.correlate.correlate.Correlator.pxcorr()`, it will
actually compute the correlations and save them in an hdf5 file that
can be handled using
:class:`~seismic.db.corr_hdf5.CorrelationDataBase`.
Data has to be preprocessed before calling this (i.e., the data already
has to be given in an ASDF format). Consult
:class:`~seismic.trace_data.preprocess.Preprocessor` for information on
how to proceed with this.

:param options: Dictionary containing all options for the correlation.
Can also be a path to a yaml file containing all the keys required
in options.
:type options: dict or str
:param store_client: Object that handles the data retrieval. If None,
a :class:`~seismic.trace_data.waveform.Local_Store_Client` will be
initiated. In this case all data has to be available locally.
"""
if isinstance(options, str):
with open(options) as file:
@@ -102,19 +100,23 @@ def __init__(self, store_client: Store_Client, options: dict or str):
consoleHandler.setFormatter(fmt)
self.logger.addHandler(consoleHandler)

self.logger.debug("ID of core {:01d} is {:d}".format(
self.rank, id(self.comm)))

# Write the options dictionary to the log file
if self.rank == 0:
opt_dump = deepcopy(options)
# json cannot write the UTCDateTime objects that might be in here
for step in opt_dump['co']['preProcessing']:
if 'stream_mask_at_utc' in step['function']:
startsstr = [
t.format_fissures() for t in step['args']['starts']]
step['args']['starts'] = startsstr
if 'ends' in step['args']:
endsstr = [
t.format_fissures() for t in step['args']['ends']]
step['args']['ends'] = endsstr
opt_dump = mu.utcdatetime2str(options)
# opt_dump = deepcopy(options)
# # json cannot write the UTCDateTime objects that might be in here
# for step in opt_dump['co']['preProcessing']:
# if 'stream_mask_at_utc' in step['function']:
# startsstr = [
# t.format_fissures() for t in step['args']['starts']]
# step['args']['starts'] = startsstr
# if 'ends' in step['args']:
# endsstr = [t.format_fissures()
# for t in step['args']['ends']]
# step['args']['ends'] = endsstr
with open(os.path.join(
logdir, 'params%s.txt' % tstr), 'w') as file:
file.write(json.dumps(opt_dump, indent=1))
@@ -137,6 +139,8 @@ def __init__(self, store_client: Store_Client, options: dict or str):
# location = options['net']['location']

# Store_Client
if store_client is None:
store_client = Local_Store_Client(options)
self.store_client = store_client

if isinstance(station, list) and len(station) == 1:
@@ -491,7 +495,9 @@ def _generate_data(self) -> Iterator[Tuple[Stream, bool]]:
pmap = pmap.astype(np.int32)
ind = pmap == self.rank
ind = np.arange(len(self.avail_raw_data))[ind]

self.logger.debug("Core %d reading %s" % (
self.rank, str(np.array(self.avail_raw_data)[ind])
))
# Loop over read increments
for t in tqdm(loop_window):
write_flag = True # Write length is same as read length
@@ -501,6 +507,9 @@

# loop over queried stations
for net, stat, loc, cha in np.array(self.avail_raw_data)[ind]:
self.logger.debug("Core %d reading %s, %s, %s" % (
self.rank, net, stat, cha
))
# Load data
stext = self.store_client._load_local(
net, stat, loc, cha, startt, endt, True, False)
@@ -510,6 +519,9 @@
continue
st = st.extend(stext)

self.logger.info("Core %d processes %d traces. Ids are %s" % (
self.rank, len(st), str([tr.id for tr in st])
))
# Stream based preprocessing
# Downsampling
# 04/04/2023 Downsample before preprocessing for performance
@@ -634,8 +646,11 @@ def _generate_data(self) -> Iterator[Tuple[Stream, bool]]:
f'No new data for times {winstart}-{winend}')
continue

self.logger.debug('Working on correlation times %s-%s' % (
str(win[0].stats.starttime), str(win[0].stats.endtime)))
self.logger.debug(('Core %d working on correlation'
+ ' times %s-%s of %d traces with ids %s') % (
self.rank,
str(win[0].stats.starttime), str(win[0].stats.endtime),
len(win), str([tr.id for tr in win])))
win = win.merge()
win = win.trim(winstart, winend, pad=True)
yield win, write_flag
@@ -652,7 +667,9 @@ def _pxcorr_matrix(self, A: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:

# indices for traces to be worked on by each process
ind = pmap == self.rank

self.logger.debug("Core %d working on indices %d-%d" % (
self.rank, ind[0], ind[-1]
))
######################################
corr_args = self.options['corr_args']
# time domain pre-processing
2 changes: 1 addition & 1 deletion src/seismic/correlate/stream.py
@@ -334,7 +334,7 @@ def extract_trace(
return outdata

def extract_multi_trace(
self, win_inc: int or List[int], method: str = 'mean',
self, win_inc: int | List[int], method: str = 'mean',
percentile: float = 50.) -> List[np.ndarray]:
"""
Extract several representative traces from a correlation matrix.