-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathspectra.py
338 lines (287 loc) · 12.5 KB
/
spectra.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
import copy
import numpy as np
import scipy.signal
import psr_utils
class Spectra(object):
"""A class to store spectra. This is mainly to provide
reusable functionality.
"""
def __init__(self, freqs, dt, data, starttime=0, dm=0):
"""Spectra constructor.
Inputs:
freqs: Observing frequencies for each channel.
dt: Sample time (in seconds).
data: A 2D numpy array containing pulsar data.
Axis 0 should contain channels. (e.g. data[0,:])
Axis 1 should contain spectra. (e.g. data[:,0])
starttime: Start time (in seconds) of the spectra
with respect to the start of the observation.
(Default: 0).
dm: Dispersion measure (in pc/cm^3). (Default: 0)
Output:
spectra_obj: Spectrum object.
"""
self.numchans, self.numspectra = data.shape
assert len(freqs)==self.numchans
self.freqs = freqs
self.data = data.astype('float')
self.dt = dt
self.starttime = starttime
self.dm = 0
def __str__(self):
return str(self.data)
def __getitem__(self, key):
return self.data[key]
def __setitem__(self, key, value):
self.data[key] = value
def get_chan(self, channum):
return self.data[channum,:]
def get_spectrum(self, specnum):
return self.data[:,specnum]
def shift_channels(self, bins, padval=0):
"""Shift each channel to the left by the corresponding
value in bins, an array.
Inputs:
bins: An array containing the number of bins
to shift each channel by.
padval: Value to use when shifting near the edge
of a channel. This can be a numeric value,
'median', 'mean', or 'rotate'.
The values 'median' and 'mean' refer to the
median and mean of the channel. The value
'rotate' takes values from one end of the
channel and shifts them to the other.
Outputs:
None
*** Shifting happens in-place ***
"""
assert self.numchans == len(bins)
for ii in range(self.numchans):
chan = self.get_chan(ii)
# Use 'chan[:]' so update happens in-place
# this way the change effects self.data
chan[:] = psr_utils.rotate(chan, bins[ii])
if padval!='rotate':
# Get padding value
if padval=='mean':
pad = np.mean(chan)
elif padval=='median':
pad = np.median(chan)
else:
pad = padval
# Replace rotated values with padval
if bins[ii]>0:
chan[-bins[ii]:] = pad
elif bins[ii]<0:
chan[:-bins[ii]] = pad
def subband(self, nsub, subdm=None, padval=0):
"""Reduce the number of channels to 'nsub' by subbanding.
The channels within a subband are combined using the
DM 'subdm'. 'padval' is passed to the call to
'Spectra.shift_channels'.
Inputs:
nsub: Number of subbands. Must be a factor of
the number of channels.
subdm: The DM with which to combine channels within
each subband (Default: don't shift channels
within each subband)
padval: The padding value to use when shifting
channels during dedispersion. See documentation
of Spectra.shift_channels. (Default: 0)
Outputs:
None
*** Subbanding happens in-place ***
"""
assert (self.numchans % nsub) == 0
assert (subdm is None) or (subdm >= 0)
nchan_per_sub = self.numchans/nsub
sub_hifreqs = self.freqs[np.arange(nsub)*nchan_per_sub]
sub_lofreqs = self.freqs[(1+np.arange(nsub))*nchan_per_sub-1]
sub_ctrfreqs = 0.5*(sub_hifreqs+sub_lofreqs)
if subdm is not None:
# Compute delays
ref_delays = psr_utils.delay_from_DM(subdm-self.dm, sub_ctrfreqs)
delays = psr_utils.delay_from_DM(subdm-self.dm, self.freqs)
rel_delays = delays-ref_delays.repeat(nchan_per_sub) # Relative delay
rel_bindelays = np.round(rel_delays/self.dt).astype('int')
# Shift channels
self.shift_channels(rel_bindelays, padval)
# Subband
self.data = np.array([np.sum(sub, axis=0) for sub in \
np.vsplit(self.data, nsub)])
self.freqs = sub_ctrfreqs
self.numchans = nsub
def scaled(self, indep=False):
"""Return a scaled version of the Spectra object.
When scaling subtract the median from each channel,
and divide by global std deviation (if indep==False), or
divide by std deviation of each row (if indep==True).
Input:
indep: Boolean. If True, scale each row
independantly (Default: False).
Output:
scaled_spectra: A scaled version of the
Spectra object.
"""
other = copy.deepcopy(self)
if not indep:
std = other.data.std()
for ii in range(other.numchans):
chan = other.get_chan(ii)
median = np.median(chan)
if indep:
std = chan.std()
chan[:] = (chan-median)/std
return other
def scaled2(self, indep=False):
"""Return a scaled version of the Spectra object.
When scaling subtract the min from each channel,
and divide by global max (if indep==False), or
divide by max of each row (if indep==True).
Input:
indep: Boolean. If True, scale each row
independantly (Default: False).
Output:
scaled_spectra: A scaled version of the
Spectra object.
"""
other = copy.deepcopy(self)
if not indep:
max = other.data.max()
for ii in range(other.numchans):
chan = other.get_chan(ii)
min = chan.min()
if indep:
max = chan.max()
chan[:] = (chan-min)/max
return other
def masked(self, mask, maskval='median-mid80'):
"""Replace masked data with 'maskval'. Returns
a masked copy of the Spectra object.
Inputs:
mask: An array of boolean values of the same size and shape
as self.data. True represents an entry to be masked.
maskval: Value to use when masking. This can be a numeric
value, 'median', 'mean', or 'median-mid80'.
The values 'median' and 'mean' refer to the median and
mean of the channel, respectively. The value 'median-mid80'
refers to the median of the channel after the top and bottom
10% of the sorted channel is removed.
(Default: 'median-mid80')
Output:
maskedspec: A masked version of the Spectra object.
"""
assert self.data.shape == mask.shape
maskvals = np.ones(self.numchans)
for ii in range(self.numchans):
chan = self.get_chan(ii)
# Use 'chan[:]' so update happens in-place
if maskval=='mean':
maskvals[ii]=np.mean(chan)
elif maskval=='median':
maskvals[ii]=np.median(chan)
elif maskval=='median-mid80':
n = int(np.round(0.1*self.numspectra))
maskvals[ii]=np.median(sorted(chan)[n:-n])
else:
maskvals[ii]=maskval
if np.all(mask[ii]):
self.data[ii] = np.ones_like(self.data[ii])*(maskvals[:,np.newaxis][ii])
return self
def dedisperse(self, dm=0, padval=0):
"""Shift channels according to the delays predicted by
the given DM.
Inputs:
dm: The DM (in pc/cm^3) to use.
padval: The padding value to use when shifting
channels during dedispersion. See documentation
of Spectra.shift_channels. (Default: 0)
Outputs:
None
*** Dedispersion happens in place ***
"""
assert dm >= 0
ref_delay = psr_utils.delay_from_DM(dm-self.dm, np.max(self.freqs))
delays = psr_utils.delay_from_DM(dm-self.dm, self.freqs)
rel_delays = delays-ref_delay # Relative delay
rel_bindelays = np.round(rel_delays/self.dt).astype('int')
# Shift channels
self.shift_channels(rel_bindelays, padval)
self.dm=dm
def smooth(self, width=1, padval=0):
"""Smooth each channel by convolving with a top hat
of given width. The height of the top had is
chosen shuch that RMS=1 after smoothing.
Overlap values are determined by 'padval'.
Inputs:
width: Number of bins to smooth by (Default: no smoothing)
padval: Padding value to use. Possible values are
float-value, 'mean', 'median', 'wrap'.
(Default: 0).
Ouputs:
None
This bit of code is taken from Scott Ransom's
PRESTO's single_pulse_search.py (line ~ 423).
*** Smoothing is done in place. ***
"""
if width > 1:
kernel = np.ones(width, dtype='float32')/np.sqrt(width)
for ii in range(self.numchans):
chan = self.get_chan(ii)
if padval=='wrap':
tosmooth = np.concatenate([chan[-width:], \
chan, chan[:width]])
elif padval=='mean':
tosmooth = np.ones(self.numspectra+width*2) * \
np.mean(chan)
tosmooth[width:-width] = chan
elif padval=='median':
tosmooth = np.ones(self.numspectra+width*2) * \
np.median(chan)
tosmooth[width:-width] = chan
else: # padval is a float
tosmooth = np.ones(self.numspectra+width*2) * \
padval
tosmooth[width:-width] = chan
smoothed = scipy.signal.convolve(tosmooth, kernel, 'same')
chan[:] = smoothed[width:-width]
def trim(self, bins=0):
"""Trim the end of the data by 'bins' spectra.
Input:
bins: Number of spectra to trim off the end of the observation.
If bins is negative trim spectra off the beginning of the
observation.
Outputs:
None
*** Trimming is irreversible ***
"""
assert bins < self.numspectra
if bins == 0:
return
elif bins > 0:
self.data = self.data[:,:-bins]
self.numspectra = self.numspectra-bins
elif bins < 0:
self.data = self.data[:,bins:]
self.numspectra = self.numspectra-bins
self.starttime = self.starttime+bins*self.dt
def downsample(self, factor=1, trim=True):
"""Downsample (in-place) the spectra by co-adding
'factor' adjacent bins.
Inputs:
factor: Reduce the number of spectra by this
factor. Must be a factor of the number of
spectra if 'trim' is False.
trim: Trim off excess bins.
Ouputs:
None
*** Downsampling is done in place ***
"""
assert trim or not (self.numspectra % factor)
new_num_spectra = self.numspectra/factor
num_to_trim = self.numspectra%factor
self.trim(num_to_trim)
self.data = np.array(np.column_stack([np.sum(subint, axis=1) for \
subint in np.hsplit(self.data,new_num_spectra)]))
self.numspectra = new_num_spectra
self.dt = self.dt*factor