谱分析 (spectral analysis) 的 SciPy 代码解析

Spectral analysis in SciPy

An optional description of the image for screen readers. Credit by Herb

由于个人研究课题的需要,我仔细的研读了 Scipy.signal.spectral源码。此文就是关于此源码的详细解析教程,以方便我未来回溯相关谱分析 (spectral analysis) 的细节,也通过阅读成熟且优美的源代码提高自己的 Python 编程开发能力。内容涉及:stft, istft, csd, welch, coherence, periodogram, spectrogram, check_COLA, check_NOLA, lombscargle

PS: 目前主要总结了 STFT 短时傅里叶变换的源码解析,其他的待续吧~

SciPy

SciPy (pronounced “Sigh Pie”) 是 Python 中非常重要的一个开源的数学、科学和工程计算包。

SciPy (pronounced “Sigh Pie”) is open-source software for mathematics, science, and engineering. It includes modules for statistics, optimization, integration, linear algebra, Fourier transforms, signal and image processing, ODE solvers, and more.


Spectral analysis

在 Signal processing (scipy.signal) 的 API 中,本文将会针对频谱分析 (spectral analysis) 的源码进行详细解读。

APIDescription
periodogram(x[, fs, window, nfft, detrend, …])Estimate power spectral density using a periodogram.
welch(x[, fs, window, nperseg, noverlap, …])Estimate power spectral density using Welch’s method.
csd(x, y[, fs, window, nperseg, noverlap, …])Estimate the cross power spectral density, Pxy, using Welch’s method.
coherence(x, y[, fs, window, nperseg, …])Estimate the magnitude squared coherence estimate, Cxy, of discrete-time signals X and Y using Welch’s method.
spectrogram(x[, fs, window, nperseg, …])Compute a spectrogram with consecutive Fourier transforms.
lombscargle(x, y, freqs)Computes the Lomb-Scargle periodogram.
vectorstrength(events, period)Determine the vector strength of the events corresponding to the given period.
stft(x[, fs, window, nperseg, noverlap, …])Compute the Short Time Fourier Transform (STFT).
istft(Zxx[, fs, window, nperseg, noverlap, …])Perform the inverse Short Time Fourier transform (iSTFT).
check_COLA(window, nperseg, noverlap[, tol])Check whether the Constant OverLap Add (COLA) constraint is met
check_NOLA(window, nperseg, noverlap[, tol])Check whether the Nonzero Overlap Add (NOLA) constraint is met

scipy/signal/aspectral.py

首先要说明的是,我们会避轻就重的研究清楚各个算法是如何实现的,暂时忽略数据结构的识别等无关紧要的细节,直击和理解算法进行的逻辑和执行的动机。

spectrial.py 脚本 (SciPy v1.6.0) 中,共有函数定义 (def) 14 个,其中只有 10 个核心函数是 __all__ 的成员函数。他们之间的引用关系,可见下图:

`scipy/signal/spectral.py`
scipy/signal/spectral.py

其中蓝色框子里定义的是核心函数,白色框子是辅助函数,其他就是模块引用函数。

从图中的依赖关系就可以看出来,_spectral_helper 是最重要的底层辅助函数。下面我们将会以短时傅里叶变换 (stft) 为例,来梳理其是如何实现的。

stft

以一段数据为例,先看一下在 stft 一定参数下的执行效果:

import numpy as np
import matplotlib.pyplot as plt
import scipy.signal as signal

fname = './demo.dat'
data = np.loadtxt(fname)
t, hp, _ = data[:,0], data[:,1], data[:,2]

fs = 1/(t[1]-t[0])  # sampling points 采样率
print(fs, hp.size)
# output
# (16384, 32613)

hp 是我们的目标数据,采样率 fs 是 16384Hz。经过指定参数后的短时傅里叶变换 (Short Time Fourier Transform (STFT)) 后,可以得到如下结果:

freqs, time, Zxx = signal.spectral.stft(hp, fs=fs, 
                               nfft=fs//4, nperseg=fs//4, noverlap=fs//8,)
print(freqs.size, time.size, Zxx.shape)
# output
# 2049 17 (2049, 17)

根据函数 signal.spectral.stft 输出的频率分辨率 (freqs)、时间分辨率 (time) 和能谱矩阵 (Zxx) 结果,可以绘制该段数据的时间-频率图像。

plt.pcolormesh(time, freqs, np.abs(Zxx), #shading='gouraud' #开启这个参数就会图像插值,显示会更光滑
              )
plt.title('STFT Magnitude')
plt.ylabel('Frequency [Hz]')
plt.xlabel('Time [sec]')
plt.ylim(fmin, fmax)
plt.colorbar()
plt.tight_layout()
plt.show()

那么这个图像究竟是如何实现的呢?

我们将函数底层调用会用的到默认关键词参数,以及会引用到工具函数总结如下:

### stft ####################################################################
def stft(x, fs=1.0, window='hann', nperseg=256, noverlap=None, nfft=None,
         detrend=False, return_onesided=True, boundary='zeros', padded=True,
         axis=-1):
    freqs, time, Zxx = _spectral_helper(x, x, fs, window, nperseg, noverlap,
                                        nfft, detrend, return_onesided,
                                        scaling='spectrum', axis=axis,
                                        mode='stft', boundary=boundary,
                                        padded=padded)
    return freqs, time, Zxx
#############################################################################
def _spectral_helper(x, y, fs=1.0, window='hann', nperseg=None, noverlap=None,
                     nfft=None, detrend='constant', return_onesided=True,
                     scaling='density', axis=-1, mode='psd', boundary=None,
                     padded=False):
    #... (暂略)
    pass
  	return freqs, time, result

由此可见,stft 对数据 x 的处理,等价于对底层辅助函数 _spectral_helper 的输入参数 xy 取一致即可。下面,我们就开始对 _spectral_helper 函数探究其实如何给出输出结果 freqstimeZxx的。

_spectral_helper

x = hp.copy()
print('x.size = {}, fs = {}'.format(x.size, fs))
# output
# x.size = 32613, fs = 16384
  • Preparing

    _spectral_helper 的内部,前期是很多关于配置和调用的准备工作。在形式上是:

    # Preparing
    mode = 'stft' # or psd
    same_data = True
    outdtype = np.result_type(x, np.complex64) # 明确数据精度
    axis = -1
    window='hann'
    boundary='zeros'
    padded = True  # not used
    detrend = False  # not used
    scaling='spectrum'  # or density
    return_onesided, sides=True, 'onesided'
    
    from scipy.signal._arraytools import const_ext, even_ext, odd_ext, zero_ext
    boundary_funcs = {'even': even_ext,
                      'odd': odd_ext,
                      'constant': const_ext,
                      'zeros': zero_ext,
                       None: None}
    

    boundary_funcs 是考虑对数据 x 将会以什么样的形式来延拓其边界。后面很快会再讨论到这一点。

  • Finetune

    下面的参数是比较重要的可调参数:

    ## Finetune ##  
    
    # parse window; nperseg = win.shape
    nperseg = nperseg=fs//4
    nperseg = int(nperseg)
    print('nperseg = {}'.format(nperseg))
    
    # parse window; if array like, then set nperseg = win.shape
    win, nperseg = signal.spectral._triage_segments(window, nperseg, input_length=x.shape[-1])
    print('nperseg = {}, win.size = {}'.format(nperseg, win.size))
    # win have the same dtype with outdtype
    if np.result_type(win, np.complex64) != outdtype:
        win = win.astype(outdtype)
    
    # nfft must be greater than or equal to nperseg.
    nfft = int(fs//4)
    print('nfft = {}'.format(nfft))
    
    # noverlap must be less than nperseg.
    noverlap = int(fs//8)
    print('noverlap = {}'.format(noverlap))
    
    nstep = nperseg - noverlap
    print('nstep = {}'.format(nstep))
    
    ## output ##
    # nperseg = 4096
    # nperseg = 4096, win.size = 4096
    # nfft = 4096
    # noverlap = 2048
    # nstep = 2048
    
    • nperseg 表示的是要解析的短时窗口序列的长度 (number of per segment)。
    • nfft 表示的是对每个短时窗口序列进行傅里叶变换 fft 时,所需要的长度 (num of fft)。也就是 nfft 点的离散傅里叶变换 (nfft-point discrete Fourier Transform (DFT))。所以,要求 nfft $\ge$ nperseg
    • noverlap 表示的是每个相邻短时窗口序列之间的重叠长度 (num of overlap)。一般来说通常取定为 nperseg//2,即 50% 的窗口重叠。
    • nstep 是间接计算出来的参数 (num of step),为了之后计算窗口滑动数目带来便利。显然有 nperseg = nstep + noverlap
    • 至于 win 就是每个短时窗口序列所对应的窗函数序列啦。根据指定的窗函数 windownperseg 和输入数据 x 的长度来判断和给出与 nperseg 相对应长度一直窗函数序列。其是通过辅助函数 _triage_segments 来实现的,比较简单,详情可以瞅一眼附录的 _triage_segments 即可。
  • Boundary extension + padding

    正式计算之前,还需要对我们的完整数据 x 进行边界延拓和补零的操作。下面的注释中也给出了这样操作偶的理由。

    # Padding occurs after boundary extension, so that the extended signal ends
    # in zeros, instead of introducing an impulse at the end.
    # I.e. if x = [..., 3, 2]
    # extend then pad -> [..., 3, 2, 2, 3, 0, 0, 0]
    # pad then extend -> [..., 3, 2, 0, 0, 0, 2, 3]
    
    print('x.size = {}'.format(x.size))
    
    # boundary extension
    x = boundary_funcs[boundary](x, nperseg//2, axis=-1)
    print('x.size = {} | {}'.format(x.size, hp.size + nperseg ))
    
    # Pad to integer number of windowed segments
    # I.e make x.shape[-1] = nperseg + (nseg-1)*nstep, with integer nseg
    nadd = (-(x.shape[-1]-nperseg) % nstep) % nperseg
    zeros_shape = list(x.shape[:-1]) + [nadd]
    x = np.concatenate((x, np.zeros(zeros_shape)), axis=-1)
    print('x.size = {} | {}'.format(x.size, x.size % nperseg ))
    
    ## output ##
    # x.size = 32613
    # x.size = 36709 | 36709
    # x.size = 36864 | 0
    
    • 先进行边界延拓,然后再补零。
    • 在默认参数boundary='zeros'下,边界延拓事实上是调用了外部函数 zero_ext 来实现的。延拓效果是在数据 x 的基础上,向各两侧分别补零 nperseg//2 长度。其他的延拓方法详情可以查看附录部分 (const_ext, even_ext, odd_ext, zero_ext)
    • 补零的目标是为了让最终的数据 x 长度可以被 nstep 整除,详情和细节可以看下面我自己绘制的示意图。其中,核心是利用Python 的负整除来计算出变量 add,另外,给予我的理解 % nperseg 这一步其实没有作用,因为被整除的部分总是一个小于 nperseg 的正整数。
`spectral_helper`: boundary extension + padding
spectral_helper: boundary extension + padding

到此就算是完成了所有关于数据和参数的准备工作,下一步就是关键了。

接下来就是对每一个 segment 进行傅里叶变换!

_fft_helper

首先要说的是,在短时傅里叶变换 stfd 中,默认是不考虑趋势去除 detrending 的。去掉趋势是振动信号的一种常见预处理方法,其可以有效降低环境干扰所带来的的不稳定性。对于我们这个"很短"数据段的时频图来说,一般不考虑。

Scipy 中定义了一个 _fft_helper 函数来一步到位计算给定数据下,各个加窗的短时数据序列的频谱信息。

# Perform the windowed FFTs
result = _fft_helper(x, win, detrend_func, nperseg, noverlap, nfft, sides)

def _fft_helper(x, win, detrend_func, nperseg, noverlap, nfft, sides):
  	pass # ... (暂略)
		return result

Scipy 中原代码的实现其实非常 pythonic 风格的方法,那就是将数据 x 映射为一个数据矩阵,每行为每一个待处理的数据 segment,每列都是相同的 nperseg

  • Way 1

    # Created strided array of data segments
    if nperseg == 1 and noverlap == 0:
        result = x[..., np.newaxis]
    else:
        # https://stackoverflow.com/a/5568169
        step = nperseg - noverlap
        shape = x.shape[:-1]+((x.shape[-1]-noverlap)//step, nperseg)
        strides = x.strides[:-1]+(step*x.strides[-1], x.strides[-1])
        result = np.lib.stride_tricks.as_strided(x, shape=shape,
                                                 strides=strides)
    

    这个方法看不太懂,也不太好理解,似乎是某种记录数值位的方式实现的。可以肯定的是效率很高。下面给个例子,感受下一般:

    ## Example
    >>> y = np.arange(1, 30+1, 1)
    >>> print(y)
    >>> shape = y.shape[:-1]+((y.shape[-1]-2)//5, 6)
    >>> print('shape = {} | y.shape[:-1] = {}'.format(shape, x.shape[:-1]))
    >>> strides = y.strides[:-1]+(5*y.strides[-1], y.strides[-1])
    >>> print('strides = {} | y.strides[:-1] = {}'.format(strides, y.strides[:-1], ))
    >>> np.lib.stride_tricks.as_strided(y, shape=shape,
                                             strides=strides)
    
    [ 1  2  3  4  5  6  7  8  9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
     25 26 27 28 29 30]
    shape = (5, 6) | y.shape[:-1] = ()
    strides = (40, 8) | y.strides[:-1] = ()
    
    array([[ 1,  2,  3,  4,  5,  6],
           [ 6,  7,  8,  9, 10, 11],
           [11, 12, 13, 14, 15, 16],
           [16, 17, 18, 19, 20, 21],
           [21, 22, 23, 24, 25, 26]])
    
  • Way 2

    用我自己熟悉且容易理解的方式重新写这一部分:

    Slices = [ (i*nstep , i*nstep+nperseg) for i in range(x.size) if i*nstep+nperseg <= x.size]
    result = np.asarray([x[s[0]:s[1]] for s in Slices])
    

    可以看到,我无非是记录下每一个 segment 的前后 index,然后切片罢了。

  • 加窗+傅里叶变换

    当你的时频矩阵 result 已经准备好了以后,接下来就是充分利用 python 中的 “广播机制”,对每一行加窗,并且做 “nfft"-点实数域的离散傅里叶变换即可。

    # Detrend each data segment individually
    result = detrend_func(result) # 对 stft 来说,这就是个恒等映射。
    
    # Apply window by multiplication
    result = win * result
    
    # Perform the fft.
    # 1-D *nfft*-point discrete Fourier Transform (DFT) of a real-valued array 
    result = scipy.fft.rfft(result.real, n=nfft) 
    print('result.shape = {} | nfft.shape = {}'.format(result.shape, nfft))
    ## outut ##
    # result.shape = (17, 2049) | nfft.shape = 4096
    
    • 得到的时频矩阵 result 显然行数是时间分辨率,纵轴对应的是频域分辨率,它取决于参数 nfft,间接的也却取决于 nperseg

    • 上述是基于单边 “one sided” 的情况,若对应于 “two sided”,则要改用 scipy.fft.fft 函数。

    • _fft_helper 到此开始输出 result

在最后的输出之前,_spectral_helper 还要做三件事:

  1. 调整时频矩阵 result 的数值单位。可以通过参数 scaling 来调节:

    if scaling == 'density':
        scale = 1.0 / (fs * (win*win).sum())
    elif scaling == 'spectrum':  # stft 默认用频谱 spectrum
        scale = 1.0 / win.sum()**2
    else:
        raise ValueError('Unknown scaling: %r' % scaling)
    
    if mode == 'stft':
        scale = np.sqrt(scale)
    
    result *= scale 
    
    • density: Select Power the loss of power is compensated. The ratio of the sum of the squared data before and after applying the window is used as a normalization factor. The total power within the spectrum therefore always corresponds to the power of the data before applying the window.

    • spectrum: Selecting Amplitude normalizes to the gain of the used window function, i.e. the sum of all values is divided by their number. This compensates the damping of the amplitudes caused by applying the window. This is especially useful to measure peaks within the spectrum.

    • 这里有一个非常棒的材料讲各类 STFT 的频谱单位:

    • 其他情况 (onsesided, psd, same_data)下,对时频矩阵的处理:

      if sides == 'onesided' and mode == 'psd': # not used for stft
      		if nfft % 2:
          		result[..., 1:] *= 2
          else:
            	# Last point is unpaired Nyquist freq point, don't double
            	result[..., 1:-1] *= 2
      
      # All imaginary parts are zero anyways
      if same_data and mode != 'stft':
      		result = result.real
      
  2. 输出时间分辨率序列。

    time = np.arange(nperseg/2, x.shape[-1] - nperseg/2 + 1, 
                     nperseg - noverlap)/float(fs)
    if boundary is not None:
        time -= (nperseg/2) / fs                    
    

    仔细观察就可以知道,这对应的是每一个 segment 的中心点处的时刻。

  3. 输出频率分辨率序列。

    if sides == 'twosided':
        freqs = scipy.fft.fftfreq(nfft, 1/fs)
    elif sides == 'onesided':
        freqs = scipy.fft.rfftfreq(nfft, 1/fs) # stft
    

    简而言之,就是给定离散傅里叶变换究竟是几点的,并且要说明采样率信息(可知 segment 对应的时长信息),即可给出在每个 segment 上傅里叶变换后有着怎样的频率分量。

    我也可以手撸一把,给出对应于 scipy.fft.rfftfreq 更加直接的含义:

    freqs = np.arange(0, 1/2+1/nfft, 1/nfft)*fs # when `nfft` is even
    
    • np.arange 部分是目标给出从0 到 1/2 之间,以 1/nfft 作为分辨率间隔的等差序列。
    • *fs 表示的是要在 0 到 Nyquist frequency 范围内的频率分辨率。

stft 已讲解完)

csd

Estimate the cross power spectral density (csd), Pxy, using Welch’s method.

在上述介绍的 _spectral_helper 的基础上,就可以实现互功率谱密度(cross power spectral density),两个频域函数之间的功率谱密度。其实部为共谱密度(简称“共谱”),虚部为正交谱密度。

由于随机信号是时域无限信号,不具备可积分条件,因此不能直接进行傅里叶变换。又因为随机信号的频率、幅值、相位都是随机的, 因此从理论上讲,一般不作幅值谱和相位谱分析,而是用具有统计特性的功率谱密度 (power spectral density) 来作谱分析。

  • 自功率谱密度函数 (Auto-power spectral density function)

    • 平稳随机过程的功率谱密度与自相关函数是一傅里叶变换偶对 (fourier transform dual pair) $$ \begin{array}{l} S_{x}(\omega)=\int_{-\infty}^{\infty} R_{x}(\tau) e^{-j \omega \tau} d \tau \ R_{x}(\tau)=\int_{-\infty}^{\infty} S_{x}(\omega) e^{j \omega \tau} d \omega \end{array} $$ $x(t)$ 是零均值的随机信号,且 $x(t)$ 无周期性分量,其自相关函数 $R_x(\tau\rightarrow\infty)=0$,自相关函数满足傅里叶变换条件 $\int_{-\infty}^{\infty}\left|R_{x}(\tau)\right| d \tau<0$。

    • 性质:实偶函数+双边谱;单边谱 $G_x(\omega)$ (非负频率上的谱) $$ G_{x}(\omega) =2 S_{x}(\omega) =2 \int_{-\infty}^{\infty} R_{x}(\tau) e^{-j \omega \tau} d \tau \quad(\omega>0) $$

    • 物理意义:信号的能量在不同频率成分上的分布。

    • 自功率谱密度函数与幅值谱 (amplitude spectrum) 的关系: $$ S_{x}(f)=\lim _{T \rightarrow \infty} \frac{1}{2 T}|X(f)|^{2} $$

  • 互功率谱密度函数 (cross-power spectral density function)

    • 定义:(互相关函数满足傅里叶变换条件 $\int_{-\infty}^{\infty}\left|R_{x y}(\tau)\right| d \tau<\infty$)

    $$ \begin{array}{l} S_{x y}(\omega)=\int_{-\infty}^{\infty} R_{x y}(\tau) e^{-j \omega \tau} d \tau \ R_{x y}(\tau)=\frac{1}{2 \pi} \int_{-\infty}^{\infty} S_{x y}(\omega) e^{j \omega \tau} d \omega \end{array} $$

    • 单边互谱密度函数 (one-sided cross-power spectrum) $$ G_{x y}(\omega)=2 \int_{-\infty}^{\infty} R_{x y}(\tau) e^{-j \omega \tau} d \tau \quad(0<\omega<\infty) $$
  • 谱相干函数 (spectral coherence function):

    • 测评输入、输出信号间的因果性,即输出信号的功率谱中有多少是所测试输入量引起的响应。

    $$ \gamma_{x y}^{2}(\omega)=\frac{\left|G_{x y}(\omega)\right|^{2}}{G_{x}(\omega) G_{y}(\omega)} $$

    • $\gamma_{x y}^{2}(\omega)=1$ $y(t)$ 和 $x(t)$ 完全相关
    • $\gamma_{x y}^{2}(\omega)=0$ $y(t)$ 和 $x(t)$ 完全无关
    • $1>\gamma_{x y}^{2}(\omega)>0$ $y(t)$ 和 $x(t)$ 部分相关
      • 测试中有外界干扰
      • 输出 $y(t)$ 是输入 $x(t)$ 和其他输入的综合输出
      • 联系 $x(t)$ 和 $y(t)$ 的系统是非线性的
  • 频率响应函数 (frequenct response function) $$ H(\omega)=\frac{G_{x y}(\omega)}{G_{x}(\omega)} $$

来个例子看看 csd 的实际效果是怎样的:

>>> from scipy import signal
>>> import matplotlib.pyplot as plt
# Generate two test signals with some common features.
>>> fs = 10e3
>>> N = 1e5
>>> amp = 20
>>> freq = 1234.0
>>> noise_power = 0.001 * fs / 2
>>> time = np.arange(N) / fs
>>> b, a = signal.butter(2, 0.25, 'low')
>>> x = np.random.normal(scale=np.sqrt(noise_power), size=time.shape)
>>> y = signal.lfilter(b, a, x)
>>> x += amp*np.sin(2*np.pi*freq*time)
>>> y += np.random.normal(scale=0.1*np.sqrt(noise_power), size=time.shape)
# Compute and plot the magnitude of the cross spectral density.
>>> f, Pxy = signal.csd(x, y, fs, nperseg=1024)
>>> f, Pxx = signal.csd(x, x, fs, nperseg=1024)
>>> f, Pyy = signal.csd(y, y, fs, nperseg=1024)
>>> plt.semilogy(f, np.abs(Pxy), label = 'Pxy')
>>> plt.semilogy(f, np.abs(Pxx), label = 'Pxx')
>>> plt.semilogy(f, np.abs(Pyy), label = 'Pyy')
>>> plt.xlabel('frequency [Hz]')
>>> plt.ylabel('CSD [V**2/Hz]')
>>> plt.legend()
>>> plt.show()

其源码的实现:

def csd(x, y, fs=1.0, window='hann', nperseg=None, noverlap=None, nfft=None,
        detrend='constant', return_onesided=True, scaling='density',
        axis=-1, average='mean'):
    """Estimate the cross power spectral density, Pxy, using Welch's method."""
    freqs, _, Pxy = _spectral_helper(x, y, fs, window, nperseg, noverlap, nfft,
                                     detrend, return_onesided, scaling, axis,
                                     mode='psd')

    # Average over windows.
    if len(Pxy.shape) >= 2 and Pxy.size > 0:
        if Pxy.shape[-1] > 1:
            if average == 'median':
                Pxy = np.median(Pxy, axis=-1) / _median_bias(Pxy.shape[-1])
            elif average == 'mean':
                Pxy = Pxy.mean(axis=-1)
            else:
                raise ValueError('average must be "median" or "mean", got %s'
                                 % (average,))
        else:
            Pxy = np.reshape(Pxy, Pxy.shape[:-1])

    return freqs, Pxy

_median_bias

def _median_bias(n):
    """
    Returns the bias of the median of a set of periodograms relative to
    the mean.
    """
    ii_2 = 2 * np.arange(1., (n-1) // 2 + 1)
    return 1 + np.sum(1. / (ii_2 + 1) - 1. / ii_2)

welch (PSD)

Estimate power spectral density using Welch’s method.

Welch’s method 1 computes an estimate of the power spectral density (PSD) by dividing the data into overlapping segments, computing a modified periodogram for each segment and averaging the periodograms.

源码是很简单直接的:

def welch(x, fs=1.0, window='hann', nperseg=None, noverlap=None, nfft=None,
          detrend='constant', return_onesided=True, scaling='density',
          axis=-1, average='mean'):
    """Estimate power spectral density using Welch's method."""
    freqs, Pxx = csd(x, x, fs=fs, window=window, nperseg=nperseg,
                     noverlap=noverlap, nfft=nfft, detrend=detrend,
                     return_onesided=return_onesided, scaling=scaling,
                     axis=axis, average=average)

    return freqs, Pxx.real

这就是所谓的 Welch 方法下的 PSD。看个例子,体会一般:

>>> from scipy import signal
>>> import matplotlib.pyplot as plt
>>> np.random.seed(1234)
# Generate a test signal, a 2 Vrms sine wave at 1234 Hz, corrupted by
# 0.001 V**2/Hz of white noise sampled at 10 kHz.
>>> fs = 10e3
>>> N = 1e5
>>> amp = 2*np.sqrt(2)
>>> freq = 1234.0
>>> noise_power = 0.001 * fs / 2
>>> time = np.arange(N) / fs
>>> x = amp*np.sin(2*np.pi*freq*time)
>>> x += np.random.normal(scale=np.sqrt(noise_power), size=time.shape)

现在来计算并看看这个数据的 PSD,以及噪声的功率密度:

# Compute and plot the power spectral density.
>>> f, Pxx_den = signal.welch(x, fs, nperseg=1024)
>>> plt.semilogy(f, Pxx_den)
>>> plt.ylim([0.5e-3, 1])
>>> plt.xlabel('frequency [Hz]')
>>> plt.ylabel('PSD [V**2/Hz]')
>>> plt.show()
# If we average the last half of the spectral density, to exclude the
# peak, we can recover the noise power on the signal.
>>> np.mean(Pxx_den[256:])
# 0.0009924865443739191

上面的例子中是默认参数 scaling='density',下面试一下 scaling='spectrum' 我们可以观察到 amplitude

# Now compute and plot the power spectrum.
>>> f, Pxx_spec = signal.welch(x, fs, 'flattop', 1024, scaling='spectrum')
>>> plt.figure()
>>> plt.semilogy(f, np.sqrt(Pxx_spec))
>>> plt.xlabel('frequency [Hz]')
>>> plt.ylabel('Linear spectrum [V RMS]')
>>> plt.show()
# The peak height in the power spectrum is an estimate of the RMS
# amplitude.
>>> np.sqrt(Pxx_spec.max())
# 2.0077340678640727

# If we now introduce a discontinuity in the signal, by increasing the
# amplitude of a small portion of the signal by 50, we can see the
# corruption of the mean average power spectral density, but using a
# median average better estimates the normal behaviour.
>>> x[int(N//2):int(N//2)+10] *= 50.
>>> f, Pxx_den = signal.welch(x, fs, nperseg=1024)
>>> f_med, Pxx_den_med = signal.welch(x, fs, nperseg=1024, average='median')
>>> plt.semilogy(f, Pxx_den, label='mean')
>>> plt.semilogy(f_med, Pxx_den_med, label='median')
>>> plt.ylim([0.5e-3, 1])
>>> plt.xlabel('frequency [Hz]')
>>> plt.ylabel('PSD [V**2/Hz]')
>>> plt.legend()
>>> plt.show()

coherence

Estimate the magnitude squared coherence estimate, Cxy, of discrete-time signals X and Y using Welch’s method.

Cxy = abs(Pxy)**2/(Pxx*Pyy), where Pxx and Pyy are power spectral density estimates of X and Y, and Pxy is the cross spectral density estimate of X and Y.

源码:

def coherence(x, y, fs=1.0, window='hann', nperseg=None, noverlap=None,
              nfft=None, detrend='constant', axis=-1):
    """
    Estimate the magnitude squared coherence estimate, Cxy, of
    discrete-time signals X and Y using Welch's method.
		"""
    freqs, Pxx = welch(x, fs=fs, window=window, nperseg=nperseg,
                       noverlap=noverlap, nfft=nfft, detrend=detrend,
                       axis=axis)
    _, Pyy = welch(y, fs=fs, window=window, nperseg=nperseg, noverlap=noverlap,
                   nfft=nfft, detrend=detrend, axis=axis)
    _, Pxy = csd(x, y, fs=fs, window=window, nperseg=nperseg,
                 noverlap=noverlap, nfft=nfft, detrend=detrend, axis=axis)

    Cxy = np.abs(Pxy)**2 / Pxx / Pyy

    return freqs, Cxy

例子:

>>> from scipy import signal
>>> import matplotlib.pyplot as plt
# Generate two test signals with some common features.
>>> fs = 10e3
>>> N = 1e5
>>> amp = 20
>>> freq = 1234.0
>>> noise_power = 0.001 * fs / 2
>>> time = np.arange(N) / fs
>>> b, a = signal.butter(2, 0.25, 'low')
>>> x = np.random.normal(scale=np.sqrt(noise_power), size=time.shape)
>>> y = signal.lfilter(b, a, x)
>>> x += amp*np.sin(2*np.pi*freq*time)
>>> y += np.random.normal(scale=0.1*np.sqrt(noise_power), size=time.shape)
# Compute and plot the coherence.
>>> f, Cxy = signal.coherence(x, y, fs, nperseg=1024)
>>> f, Pxx = signal.welch(x, fs, nperseg=1024)
>>> f, Pyy = signal.welch(y, fs, nperseg=1024)
>>> lnxy = plt.semilogy(f, Cxy, color = 'r', label = 'Cxy')
>>> plt.ylabel('Coherence')
>>> plt.twinx()
>>> lnx = plt.semilogy(f, Pxx, color = 'g', label = 'Pxx')
>>> lny = plt.semilogy(f, Pyy, color = 'b', label = 'Pyy')
>>> plt.ylabel('PSD [V**2/Hz]')
>>> plt.xlabel('frequency [Hz]')
>>> lns = lnxy+lnx+lny
>>> labs = [l.get_label() for l in lns]
>>> plt.legend(lns, labs)
>>> plt.show()

periodogram (PSD)

Estimate power spectral density using a periodogram.

下面是用 periodogram 计算 PSD 的源码:

def periodogram(x, fs=1.0, window='boxcar', nfft=None, detrend='constant',
                return_onesided=True, scaling='density', axis=-1):
    """Estimate power spectral density using a periodogram."""
    x = np.asarray(x)

    if x.size == 0:
        return np.empty(x.shape), np.empty(x.shape)

    if window is None:
        window = 'boxcar'

    if nfft is None:
        nperseg = x.shape[axis]
    elif nfft == x.shape[axis]:
        nperseg = nfft
    elif nfft > x.shape[axis]:
        nperseg = x.shape[axis]
    elif nfft < x.shape[axis]:
        s = [np.s_[:]]*len(x.shape)
        s[axis] = np.s_[:nfft]
        x = x[tuple(s)]
        nperseg = nfft
        nfft = None

    return welch(x, fs=fs, window=window, nperseg=nperseg, noverlap=0,
                 nfft=nfft, detrend=detrend, return_onesided=return_onesided,
                 scaling=scaling, axis=axis)

例子:

>>> from scipy import signal
>>> import matplotlib.pyplot as plt
>>> np.random.seed(1234)
# Generate a test signal, a 2 Vrms sine wave at 1234 Hz, corrupted by
# 0.001 V**2/Hz of white noise sampled at 10 kHz.
>>> fs = 10e3
>>> N = 1e5
>>> amp = 2*np.sqrt(2)
>>> freq = 1234.0
>>> noise_power = 0.001 * fs / 2
>>> time = np.arange(N) / fs
>>> x = amp*np.sin(2*np.pi*freq*time)
>>> x += np.random.normal(scale=np.sqrt(noise_power), size=time.shape)
# Compute and plot the power spectral density.
>>> f, Pxx_den = signal.periodogram(x, fs)
>>> plt.semilogy(f, Pxx_den)
>>> plt.ylim([1e-7, 1e2])
>>> plt.xlabel('frequency [Hz]')
>>> plt.ylabel('PSD [V**2/Hz]')
>>> plt.show()
# If we average the last half of the spectral density, to exclude the
# peak, we can recover the noise power on the signal.
>>> np.mean(Pxx_den[25000:])
# 0.00099728892368242854

# Now compute and plot the power spectrum.
>>> f, Pxx_spec = signal.periodogram(x, fs, 'flattop', scaling='spectrum')
>>> plt.figure()
>>> plt.semilogy(f, np.sqrt(Pxx_spec))
>>> plt.ylim([1e-4, 1e1])
>>> plt.xlabel('frequency [Hz]')
>>> plt.ylabel('Linear spectrum [V RMS]')
>>> plt.show()
# The peak height in the power spectrum is an estimate of the RMS
# amplitude.
>>> np.sqrt(Pxx_spec.max())
# 2.0077340678640727

spectrogram

Compute a spectrogram with consecutive Fourier transforms.

Spectrograms can be used as a way of visualizing the change of a nonstationary signal’s frequency content over time.

源码:

def spectrogram(x, fs=1.0, window=('tukey', .25), nperseg=None, noverlap=None,
                nfft=None, detrend='constant', return_onesided=True,
                scaling='density', axis=-1, mode='psd'):
	  """Compute a spectrogram with consecutive Fourier transforms."""
    
    modelist = ['psd', 'complex', 'magnitude', 'angle', 'phase']
    if mode not in modelist:
        raise ValueError('unknown value for mode {}, must be one of {}'
                         .format(mode, modelist))

    # need to set default for nperseg before setting default for noverlap below
    window, nperseg = _triage_segments(window, nperseg,
                                       input_length=x.shape[axis])

    # Less overlap than welch, so samples are more statisically independent
    if noverlap is None:
        noverlap = nperseg // 8

    if mode == 'psd':
        freqs, time, Sxx = _spectral_helper(x, x, fs, window, nperseg,
                                            noverlap, nfft, detrend,
                                            return_onesided, scaling, axis,
                                            mode='psd')

    else:
        freqs, time, Sxx = _spectral_helper(x, x, fs, window, nperseg,
                                            noverlap, nfft, detrend,
                                            return_onesided, scaling, axis,
                                            mode='stft')

        if mode == 'magnitude':
            Sxx = np.abs(Sxx)
        elif mode in ['angle', 'phase']:
            Sxx = np.angle(Sxx)
            if mode == 'phase':
                # Sxx has one additional dimension for time strides
                if axis < 0:
                    axis -= 1
                Sxx = np.unwrap(Sxx, axis=axis)

        # mode =='complex' is same as `stft`, doesn't need modification

    return freqs, time, Sxx

例子:

>>> from scipy import signal
>>> from scipy.fft import fftshift
>>> import matplotlib.pyplot as plt
# Generate a test signal, a 2 Vrms sine wave whose frequency is slowly
# modulated around 3kHz, corrupted by white noise of exponentially
# decreasing magnitude sampled at 10 kHz.
>>> fs = 10e3
>>> N = 1e5
>>> amp = 2 * np.sqrt(2)
>>> noise_power = 0.01 * fs / 2
>>> time = np.arange(N) / float(fs)
>>> mod = 500*np.cos(2*np.pi*0.25*time)
>>> carrier = amp * np.sin(2*np.pi*3e3*time + mod)
>>> noise = np.random.normal(scale=np.sqrt(noise_power), size=time.shape)
>>> noise *= np.exp(-time/5)
>>> x = carrier + noise
# Compute and plot the spectrogram.
>>> f, t, Sxx = signal.spectrogram(x, fs)
>>> plt.pcolormesh(t, f, Sxx, shading='gouraud')
>>> plt.ylabel('Frequency [Hz]')
>>> plt.xlabel('Time [sec]')
>>> plt.show()

# Note, if using output that is not one sided, then use the following:
>>> f, t, Sxx = signal.spectrogram(x, fs, return_onesided=False)
>>> plt.pcolormesh(t, fftshift(f), fftshift(Sxx, axes=0), shading='gouraud')
>>> plt.ylabel('Frequency [Hz]')
>>> plt.xlabel('Time [sec]')
>>> plt.show()

istft

Perform the inverse Short Time Fourier transform (iSTFT).

这部分源码比较长,先直接看 stft 的例子,然后是 istft 的例子。

>>> from scipy import signal
>>> import matplotlib.pyplot as plt
# Generate a test signal, a 2 Vrms sine wave at 50Hz corrupted by
# 0.001 V**2/Hz of white noise sampled at 1024 Hz.
>>> fs = 1024
>>> N = 10*fs
>>> nperseg = 512
>>> amp = 2 * np.sqrt(2)
>>> noise_power = 0.001 * fs / 2
>>> time = np.arange(N) / float(fs)
>>> carrier = amp * np.sin(2*np.pi*50*time)
>>> noise = np.random.normal(scale=np.sqrt(noise_power),
...                          size=time.shape)
>>> x = carrier + noise
# Compute the STFT, and plot its magnitude
>>> f, t, Zxx = signal.stft(x, fs=fs, nperseg=nperseg)
>>> plt.figure()
>>> plt.pcolormesh(t, f, np.abs(Zxx), vmin=0, vmax=amp, shading='gouraud')
>>> plt.ylim([f[1], f[-1]])
>>> plt.title('STFT Magnitude')
>>> plt.ylabel('Frequency [Hz]')
>>> plt.xlabel('Time [sec]')
>>> plt.yscale('log')
>>> plt.show()

# Zero the components that are 10% or less of the carrier magnitude,
# then convert back to a time series via inverse STFT
>>> Zxx = np.where(np.abs(Zxx) >= amp/10, Zxx, 0)
>>> _, xrec = signal.istft(Zxx, fs)
# Compare the cleaned signal with the original and true carrier signals.
>>> plt.figure()
>>> plt.plot(time, x, time, xrec, time, carrier)
>>> plt.xlim([2, 2.1])
>>> plt.xlabel('Time [sec]')
>>> plt.ylabel('Signal')
>>> plt.legend(['Carrier + Noise', 'Filtered via STFT', 'True Carrier'])
>>> plt.show()

# Note that the cleaned signal does not start as abruptly as the original,
# since some of the coefficients of the transient were also removed:
>>> plt.figure()
>>> plt.plot(time, x, time, xrec, time, carrier)
>>> plt.xlim([0, 0.1])
>>> plt.xlabel('Time [sec]')
>>> plt.ylabel('Signal')
>>> plt.legend(['Carrier + Noise', 'Filtered via STFT', 'True Carrier'])
>>> plt.show()

check_COLA

Check whether the Constant OverLap Add (COLA) constraint is met

check_NOLA

Check whether the Nonzero Overlap Add (NOLA) constraint is met

lombscargle

Computes the Lomb-Scargle periodogram.

The Lomb-Scargle periodogram was developed by Lomb 2 and further extended by Scargle 3 to find, and test the significance of weak periodic signals with uneven temporal sampling.

When normalize is False (default) the computed periodogram is unnormalized, it takes the value (A**2) * N/4 for a harmonic signal with amplitude A for sufficiently large N.

When normalize is True the computed periodogram is normalized by the residuals of the data around a constant reference model (at zero).

Input arrays should be 1-D and will be cast to float64.

>>> import matplotlib.pyplot as plt
# First define some input parameters for the signal:
>>> A = 2.
>>> w = 1.
>>> phi = 0.5 * np.pi
>>> nin = 1000
>>> nout = 100000
>>> frac_points = 0.9 # Fraction of points to select
# Randomly select a fraction of an array with timesteps:
>>> r = np.random.rand(nin)
>>> x = np.linspace(0.01, 10*np.pi, nin)
>>> x = x[r >= frac_points]
# Plot a sine wave for the selected times:
>>> y = A * np.sin(w*x+phi)
# Define the array of frequencies for which to compute the periodogram:
>>> f = np.linspace(0.01, 10, nout)
# Calculate Lomb-Scargle periodogram:
>>> import scipy.signal as signal
>>> pgram = signal.lombscargle(x, y, f, normalize=True)
# Now make a plot of the input data:
>>> plt.subplot(2, 1, 1)
>>> plt.plot(x, y, 'b+')
# Then plot the normalized periodogram:
>>> plt.subplot(2, 1, 2)
>>> plt.plot(f, pgram)
>>> plt.show()


Appendix

_triage_segments

经过简化的后,可以通过如下的代码来了解频谱分析中是如何获取合适的窗函数的。在本文中,此函数会被引用自:

from scipy.signal.windows import get_window
def _triage_segments(window, nperseg, input_length):
    """
    Parses window and nperseg arguments for spectrogram and _spectral_helper.
    This is a helper function, not meant to be called externally.
		"""
    # parse window; if array like, then set nperseg = win.shape
    if isinstance(window, str) or isinstance(window, tuple):
        # if nperseg not specified
        if nperseg is None:
            nperseg = 256  # then change to default
        if nperseg > input_length:
            warnings.warn('nperseg = {0:d} is greater than input length '
                          ' = {1:d}, using nperseg = {1:d}'
                          .format(nperseg, input_length))
            nperseg = input_length
        win = get_window(window, nperseg)
    else:
        win = np.asarray(window)
        if len(win.shape) != 1:
            raise ValueError('window must be 1-D')
        if input_length < win.shape[-1]:
            raise ValueError('window is longer than input signal')
        if nperseg is None:
            nperseg = win.shape[0]
        elif nperseg is not None:
            if nperseg != win.shape[0]:
                raise ValueError("value specified for nperseg is different"
                                 " from length of window")
    return win, nperseg    

const_ext, even_ext, odd_ext, zero_ext

这四个边界延拓的辅助函数的源码,我贴在最后了。最重要的是,通过一下几个例子理解清楚,它们分别是什么意思即可。

  • const_ext: Constant extension at the boundaries of an array

    • Generate a new ndarray that is a constant extension of x along an axis. The extension repeats the values at the first and last element of the axis.

      >>> from scipy.signal._arraytools import const_ext
      >>> a = np.array([[1, 2, 3, 4, 5], [0, 1, 4, 9, 16]])
      >>> const_ext(a, 2)
      array([[ 1,  1,  1,  2,  3,  4,  5,  5,  5],
      			 [ 0,  0,  0,  1,  4,  9, 16, 16, 16]])
      # Constant extension continues with the same values as the endpoints of the
      # array:
      >>> t = np.linspace(0, 1.5, 100)
      >>> a = 0.9 * np.sin(2 * np.pi * t**2)
      >>> b = const_ext(a, 40)
      >>> import matplotlib.pyplot as plt
      >>> plt.plot(np.arange(-40, 140), b, 'b', lw=1, label='constant extension')
      >>> plt.plot(np.arange(100), a, 'r', lw=2, label='original')
      >>> plt.legend(loc='best')
      

  • even_ext: Even extension at the boundaries of an array

    • Generate a new ndarray by making an even extension of x along an axis.

      >>> from scipy.signal._arraytools import even_ext
      >>> a = np.array([[1, 2, 3, 4, 5], [0, 1, 4, 9, 16]])
      >>> even_ext(a, 2)
      array([[ 3,  2,  1,  2,  3,  4,  5,  4,  3],
      			 [ 4,  1,  0,  1,  4,  9, 16,  9,  4]])
      # Even extension is a "mirror image" at the boundaries of the original array:
      >>> t = np.linspace(0, 1.5, 100)
      >>> a = 0.9 * np.sin(2 * np.pi * t**2)
      >>> b = even_ext(a, 40)
      >>> import matplotlib.pyplot as plt
      >>> plt.plot(np.arange(-40, 140), b, 'b', lw=1, label='even extension')
      >>> plt.plot(np.arange(100), a, 'r', lw=2, label='original')
      >>> plt.legend(loc='best')
      >>> plt.show()
      

  • odd_ext: Odd extension at the boundaries of an array

    • Generate a new ndarray by making an odd extension of x along an axis.

      >>> from scipy.signal._arraytools import odd_ext
      >>> a = np.array([[1, 2, 3, 4, 5], [0, 1, 4, 9, 16]])
      >>> odd_ext(a, 2)
      array([[-1,  0,  1,  2,  3,  4,  5,  6,  7],
      		   [-4, -1,  0,  1,  4,  9, 16, 23, 28]])
      # Odd extension is a "180 degree rotation" at the endpoints of the original array:
      >>> t = np.linspace(0, 1.5, 100)
      >>> a = 0.9 * np.sin(2 * np.pi * t**2)
      >>> b = odd_ext(a, 40)
      >>> import matplotlib.pyplot as plt
      >>> plt.plot(np.arange(-40, 140), b, 'b', lw=1, label='odd extension')
      >>> plt.plot(np.arange(100), a, 'r', lw=2, label='original')
      >>> plt.legend(loc='best')
      >>> plt.show()
      

  • zero_ext: Zero padding at the boundaries of an array

    • Generate a new ndarray that is a zero-padded extension of x along an axis.

      >>> from scipy.signal._arraytools import zero_ext
      >>> a = np.array([[1, 2, 3, 4, 5], [0, 1, 4, 9, 16]])
      >>> zero_ext(a, 2)
      array([[ 0,  0,  1,  2,  3,  4,  5,  0,  0],
      	  	 [ 0,  0,  0,  1,  4,  9, 16,  0,  0]])
      
  • 简化后的操作源码:

    • From https://github.com/scipy/scipy/blob/master/scipy/signal/_arraytools.py

      from scipy.signal._arraytools import axis_slice
      def const_ext(x, n, axis=-1):
          """
          Constant extension at the boundaries of an array
          """
          if n < 1:
              return x
          left_end = axis_slice(x, start=0, stop=1, axis=axis)
          ones_shape = [1] * x.ndim
          ones_shape[axis] = n
          ones = np.ones(ones_shape, dtype=x.dtype)
          left_ext = ones * left_end
          right_end = axis_slice(x, start=-1, axis=axis)
          right_ext = ones * right_end
          ext = np.concatenate((left_ext,
                              x,
                              right_ext),
                              axis=axis)
          return ext  
      ##########################################################################
      def even_ext(x, n, axis=-1):
          """
          Even extension at the boundaries of an array
              """
          if n < 1:
              return x
          if n > x.shape[axis] - 1:
              raise ValueError(("The extension length n (%d) is too big. " +
                              "It must not exceed x.shape[axis]-1, which is %d.")
                              % (n, x.shape[axis] - 1))
          left_ext = axis_slice(x, start=n, stop=0, step=-1, axis=axis)
          right_ext = axis_slice(x, start=-2, stop=-(n + 2), step=-1, axis=axis)
          ext = np.concatenate((left_ext,
                              x,
                              right_ext),
                              axis=axis)
          return ext
      ##########################################################################
      def odd_ext(x, n, axis=-1):
          """
          Odd extension at the boundaries of an array
              """
          if n < 1:
              return x
          if n > x.shape[axis] - 1:
              raise ValueError(("The extension length n (%d) is too big. " +
                              "It must not exceed x.shape[axis]-1, which is %d.")
                              % (n, x.shape[axis] - 1))
          left_end = axis_slice(x, start=0, stop=1, axis=axis)
          left_ext = axis_slice(x, start=n, stop=0, step=-1, axis=axis)
          right_end = axis_slice(x, start=-1, axis=axis)
          right_ext = axis_slice(x, start=-2, stop=-(n + 2), step=-1, axis=axis)
          ext = np.concatenate((2 * left_end - left_ext,
                              x,
                              2 * right_end - right_ext),
                              axis=axis)
          return ext
      ##########################################################################
      def zero_ext(x, n, axis=-1):
          """
          Zero padding at the boundaries of an array
          """
          if n < 1:
              return x
          zeros_shape = list(x.shape)
          zeros_shape[axis] = n
          zeros = np.zeros(zeros_shape, dtype=x.dtype)
          ext = np.concatenate((zeros, x, zeros), axis=axis)
          return ext
      

  1. P. Welch, “The use of the fast Fourier transform for the estimation of power spectra: A method based on time averaging over short, modified periodograms”, IEEE Trans. Audio Electroacoust. vol. 15, pp. 70-73, 1967. ↩︎

  2. N.R. Lomb “Least-squares frequency analysis of unequally spaced data”, Astrophysics and Space Science, vol 39, pp. 447-462, 1976 ↩︎

  3. J.D. Scargle “Studies in astronomical time series analysis. II - Statistical aspects of spectral analysis of unevenly spaced data”, The Astrophysical Journal, vol 263, pp. 835-853, 1982 例子: ↩︎

Next
Previous

Related