forked from tracek/Ornithokrites
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathnoise_subtraction.py
96 lines (80 loc) · 3.29 KB
/
noise_subtraction.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
# -*- coding: utf-8 -*-
import scipy as sp
import scipy.special as spc
import scipy.signal as sig
import numpy as np
class SpectralSubtraction():
def __init__(self,winsize,window,coefficient=5.0,ratio=1.0):
self._window=window
self._coefficient=coefficient
self._ratio=ratio
def compute(self,signal,noise):
n_spec = sp.fft(noise*self._window)
n_pow = sp.absolute(n_spec)**2.0
return self.compute_by_noise_pow(signal,n_pow)
def compute_by_noise_pow(self,signal,n_pow):
s_spec = sp.fft(signal*self._window)
s_amp = sp.absolute(s_spec)
s_phase = sp.angle(s_spec)
s_amp2 = s_amp**2.0
amp = s_amp2 - n_pow*self._coefficient
# amp = s_amp**2.0 - (1 + np.std(n_pow) / n_pow) * n_pow * 2
amp = sp.maximum(amp, 0.01 * s_amp2)
amp = sp.sqrt(amp)
amp = self._ratio*amp + (1.0-self._ratio)*s_amp
spec = amp * sp.exp(s_phase*1j)
return sp.real(sp.ifft(spec))
class SpectrumReconstruction(object):
def __init__(self,winsize,window,constant=0.001,ratio=1.0,alpha=0.99):
self._window=window
self._G = sp.zeros(winsize,sp.float32)
self._prevGamma = sp.zeros(winsize,sp.float32)
self._alpha = alpha
self._prevAmp = sp.zeros(winsize,sp.float32)
self._ratio=ratio
self._constant=constant
def compute(self,signal,noise):
n_spec = sp.fft(noise*self._window)
n_pow = sp.absolute(n_spec)**2.0
return self.compute_by_noise_pow(signal,n_pow)
def _calc_aposteriori_snr(self,s_amp,n_pow):
return s_amp**2.0/n_pow
def _calc_apriori_snr(self,gamma):
return self._alpha*self._G**2.0 * self._prevGamma + (1.0-self._alpha)*sp.maximum(gamma-1.0, 0.0)#a priori s/n ratio
def _calc_apriori_snr2(self,gamma,n_pow):
return self._alpha*(self._prevAmp**2.0/n_pow) + (1.0-self._alpha)*sp.maximum(gamma-1.0, 0.0)#a priori s/n ratio
def get_frame(signal, winsize, no):
shift=winsize/2
start=no*shift
end = start+winsize
return signal[start:end]
def add_signal(signal, frame, winsize, no ):
shift=winsize/2
start=no*shift
end=start+winsize
signal[start:end] = signal[start:end] + frame
def reduce_noise(signal, noisy_signal, method, winsize=2**10, window=sp.hanning(2**10)):
""" Reduce noise """
if method == 'SS' or method == 0:
method = SpectralSubtraction(winsize, window)
elif method == 'MMSE_STSA' or method == 1:
method = MMSE_STSA(winsize, window)
elif method == 'MMSE_LogSTSA' or method == 2:
method = MMSE_LogSTSA(winsize, window)
elif method == 'JointMap' or method == 3:
method = JointMap(winsize, window)
out = sp.zeros(len(signal),sp.float32)
power = sig.welch(noisy_signal, window=window, return_onesided=False, scaling='spectrum')[1] * window.sum()**2
nf = len(signal)/(winsize/2) - 1
for no in xrange(nf):
s = get_frame(signal, winsize, no)
add_signal(out, method.compute_by_noise_pow(s, power), winsize, no)
return out
def get_noise(signal, rate, intervals):
interval = intervals.popitem()
if interval[1][0] == 0:
start = 0
else:
start = interval[1][0] + 3*rate
end = interval[1][1] - rate
return signal[start:end]