導(dǎo)語(yǔ)
在上一篇關(guān)于 CWT 的文章里,我們已經(jīng)展示了連續(xù)小波變換(CWT)如何“放大”心電圖(ECG)里那一瞬間的 R 波,并獲取心率。這一次,我們把平移不變的小波(SWT)和離散小波(DWT)也請(qǐng)來(lái)比較一下,三種小波變換,同組數(shù)據(jù),最后我們將比較檢測(cè)結(jié)果的差異。
為了避免陳述一些枯燥的公式,先用幾句話來(lái)概括這三種小波變換各自的特點(diǎn)。
CWT:像拿著一把可變倍數(shù)的放大鏡,對(duì)著信號(hào)一寸寸地掃——你可以把放大率(尺度)調(diào)得很細(xì)很密,也可以把放大鏡慢慢移動(dòng)到任何時(shí)刻去比對(duì)。但因?yàn)槟銓?duì)每個(gè)放大率和每個(gè)位置都拍張‘快照’,最后會(huì)積攢一堆照片(系數(shù)很多、很冗余),所以既能看得細(xì)致入微,也要付出計(jì)算和存儲(chǔ)的代價(jià)。


如上面2個(gè)圖所示CWT示意在2個(gè)尺度的小波函數(shù)下,每個(gè)尺度對(duì)應(yīng)的2個(gè)時(shí)間點(diǎn)(T1,T2)的平移,小波函數(shù)和原信號(hào)進(jìn)行逐點(diǎn)乘積得到曲線,以及對(duì)應(yīng)局部區(qū)域乘積和的數(shù)值(紅點(diǎn))。
SWT:像把同一幅畫(huà)用不同倍率的鏡頭都拍成同樣像素大小的照片,這樣每張照片的像素都能一一對(duì)應(yīng)地比較——你不會(huì)因?yàn)榉糯蠡蚩s小而丟掉某個(gè)像素的位置,但照片會(huì)成堆(冗余),好處是無(wú)論把原畫(huà)輕輕平移多少,照片的亮點(diǎn)都會(huì)一起平移,不會(huì)亂跳(平移穩(wěn)定)。每個(gè)倍率的鏡頭包含兩個(gè)濾鏡,一個(gè)抓細(xì)節(jié)(高頻),一個(gè)抓輪廓(低頻)。如下圖所示,不過(guò)只對(duì)應(yīng)一種濾鏡。

信號(hào)和小波最初形態(tài)(無(wú)插零膨脹)


SWT在分解信號(hào)第一層的T1和T2處局部小波系數(shù)與信號(hào)的逐點(diǎn)相乘并求和(紅點(diǎn))

信號(hào)和連續(xù)插零膨脹4次的小波

和膨脹后小波系數(shù)卷積的不是原信號(hào)

SWT在分解信號(hào)第四層(紅點(diǎn)是對(duì)應(yīng)時(shí)間的乘積和)
DWT:SWT通過(guò)上采樣濾波器(或等價(jià)地不下采樣)來(lái)實(shí)現(xiàn)尺度變化,從而獲得平移不變性和冗余系數(shù);DWT通過(guò)下采樣實(shí)現(xiàn)尺度變化,但是對(duì)輸入信號(hào)的平移敏感;你可以把 SWT 看作是“去下采樣化并對(duì)濾波器進(jìn)行按層擴(kuò)展”的 DWT 變體。想象你拿著一臺(tái)固定變焦的小相機(jī)(其實(shí)是一對(duì)濾波器:一個(gè)是寫(xiě)意讓畫(huà)面模糊的 low?pass 和一個(gè)抓細(xì)節(jié)的 high?pass)。第一輪你把整張畫(huà)“拍”成兩張:一張是模糊的大塊(近似),一張是細(xì)碎的紋理(細(xì)節(jié))。接著,把那張“模糊的照片”按比例縮小(下采樣,只保留每隔一行/列的像素)作為下一輪的新畫(huà)面,再用同樣的相機(jī)繼續(xù)拍——層層下去就成了一個(gè)金字塔。注意:細(xì)節(jié)那一份在每層也會(huì)被下采樣并保存為該層的細(xì)節(jié)系數(shù),所以整個(gè)過(guò)程既緊湊又可重構(gòu)。但正因?yàn)槊繉佣荚凇皝G掉一半像素”,分解結(jié)果對(duì)原始信號(hào)的微小平移會(huì)很敏感;要想避免這一點(diǎn)可以考慮不下采樣的 SWT(平移不變小波)或冗余變換。
好了,不多說(shuō)了,這次代碼較多。我們就轉(zhuǎn)到心率檢測(cè)這個(gè)話題上吧。
SWT檢測(cè)心率

SWT通過(guò)db4小波檢測(cè)到的前20s峰值和心率
大家可以比較一下CWT的心率檢測(cè)輸出(小波變換(3):連續(xù)小波變換(CWT)方式從心電圖數(shù)據(jù)獲取心率)。SWT檢測(cè)心率的程序中,除了要找到峰值,還要避免峰值的變化,峰值處的多個(gè)相同采樣值,以及大峰后的小偽峰。程序的基本步驟如下:
讀取/截取 ECG
預(yù)處理(bandpass_filter)
SWT 分解(pywt.swt)
level(分解層數(shù)):越高能覆蓋越低頻成分,但計(jì)算和存儲(chǔ)開(kāi)銷(xiāo)增加
wavelet(母小波):db4、sym4、coif1 常用于 ECG;db4 與 QRS 形狀相似通常表現(xiàn)好
經(jīng)驗(yàn)(針對(duì) fs=360 Hz):細(xì)節(jié)層 j 對(duì)應(yīng)的近似頻帶(D_j 約為 fs/(2^(j+1)) 到 fs/(2^j)):
D1: 90–180 Hz, D2: 45–90 Hz, D3: 22.5–45 Hz, D4: 11.25–22.5 Hz, D5: 5.625–11.25 Hz
QRS 能量通常集中在 ~5–40 Hz → 可選 detail_levels = [3,4,5](與你代碼一致)
聚合感興趣尺度(agg)
歸一化與平滑(agg -> agg_smooth)
歸一化:agg = (agg - min) / ptp 便于閾值基于百分位或絕對(duì)值一致性
平滑:moving_average(agg, win_samples),win_samples = smooth_ms/1000 * fs(示例 smooth_ms=30 ms)
初始候選峰檢測(cè)(較寬松):找出可能的 R 峰位置(候選),但保持一定寬松以便后續(xù)二次篩選
候選峰特征計(jì)算(height, prominence)
二次篩選(基于幅值 / 突出度 / 相對(duì)高度)
計(jì)算 BPM(瞬時(shí) + 平滑)
Plot輸出
SWT小波變換檢測(cè)心率程序源碼:
""" 基于 SWT 的 ECG R 峰檢測(cè)與心率 (BPM) 計(jì)算示例 適合:離線分析或?qū)z測(cè)穩(wěn)定性有較高要求的場(chǎng)景(例如設(shè)備后臺(tái)或驗(yàn)證) 依賴: pip install numpy scipy matplotlib pywavelets 說(shuō)明要點(diǎn): - 使用 pywt.swt 做平移不變小波分解(每級(jí)返回 cA, cD, 長(zhǎng)度與輸入相同) - 選取與 QRS 頻帶對(duì)應(yīng)的 detail 級(jí)別(例如 fs=360Hz 時(shí), detail level 3-5 涵蓋 ~5-45Hz) - 聚合所選 detail 的絕對(duì)值作為能量包絡(luò), 平滑后做峰檢測(cè) - 峰在原始帶通信號(hào)附近做局部最大值校準(zhǔn)以確定 R 峰準(zhǔn)確位置 """ importnumpyasnp importscipy.signalassignal importmatplotlib.pyplotasplt importpywt fromscipy.signal.windowsimportgaussian importpandasaspd # ---------- 參數(shù)(根據(jù)采樣率調(diào)整) ---------- fs =360.0 # 采樣率(Hz), 題目中為 360Hz wavelet ='db4' # 常用小波(Daubechies 4 對(duì) ECG 效果不錯(cuò)) swt_level =5 # SWT 分解層數(shù)(越高捕獲越低頻成分, 但計(jì)算增大) # 推薦 detail levels(基于經(jīng)驗(yàn)/頻帶映射), 對(duì) 360Hz 可用 3-5 覆蓋 QRS 頻段 5-40Hz swt_detail_levels = [3,4,5] pre_band = (0.5,45.0) # 可選預(yù)帶通濾波, 去基線與高頻噪聲 min_rr_sec =0.20 # 最小 RR(秒), 防止偽峰;200 ms 常見(jiàn) smooth_ms =30 # 聚合能量的平滑窗口 (毫秒) threshold_percentile =70# 自適應(yīng)閾值:聚合能量的百分位 debug_plot =True # ---------- 輔助函數(shù) ---------- defbandpass_filter(x, fs, lowcut, highcut, order=3): nyq =0.5* fs b, a = signal.butter(order, [lowcut/nyq, highcut/nyq], btype='band') returnsignal.filtfilt(b, a, x) defmoving_average(x, win_samples): ifwin_samples <=?1: ? ? ? ??return?x ? ? kernel = np.ones(win_samples) / win_samples ? ??return?np.convolve(x, kernel, mode='same') def?_compute_bpm_from_peaks(peaks_idx, fs, smooth_N=3): ? ??""" ? ? 從峰索引計(jì)算 BPM 序列。 ? ? 返回 (bpm_times, bpm_values): ? ? ? bpm_times: 對(duì)應(yīng)于每次計(jì)算出的 BPM 的時(shí)間點(diǎn)(秒), 使用當(dāng)前峰時(shí)間 ? ? ? bpm_values: 平滑后的 BPM(以最近 smooth_N 個(gè) RR 平均) ? ? """ ? ??if?len(peaks_idx) 2: ? ? ? ??return?np.array([]), np.array([]) ? ? bpm_times = [] ? ? bpm_values = [] ? ??for?i?in?range(1,?len(peaks_idx)): ? ? ? ? rr = (peaks_idx[i] - peaks_idx[i-1]) / fs ? ? ? ??if?rr <=?0: ? ? ? ? ? ??continue ? ? ? ? inst_bpm =?60.0?/ rr ? ? ? ??# 平滑:最多用最近 smooth_N 個(gè) RR ? ? ? ? j0 =?max(1, i - smooth_N +?1) ? ? ? ? rr_list = [(peaks_idx[k] - peaks_idx[k-1]) / fs?for?k?in?range(j0, i+1)] ? ? ? ??if?len(rr_list) >0: smooth_bpm =60.0/ (np.mean(rr_list)) else: smooth_bpm = inst_bpm bpm_times.append(peaks_idx[i] / fs) bpm_values.append(smooth_bpm) returnnp.array(bpm_times), np.array(bpm_values) defdetect_beats_swt(ecg, fs, wavelet='db4', level=6, detail_levels=(3,4,5), pre_band=(0.5,45.0), smooth_ms=30, initial_percentile=50, amp_factor=0.6, prom_factor=0.5, prom_min=0.05, rel_factor=0.45, min_rr_sec=0.20, debug_plot=False): """ SWT-based R peak detection. 返回: peaks_idx, bpm_times, bpm_values 主要改進(jìn): - 先用初始閾值找候選峰, 再基于候選峰的中位高度/中位 prominence 做二次篩選 - 加入相對(duì)前峰高度檢查以減少大峰后的小偽峰 """ # 1) 預(yù)處理 ifpre_bandisnotNone: ecg_f = bandpass_filter(ecg, fs, pre_band[0], pre_band[1]) else: ecg_f = ecg.copy() # 2) SWT 分解 swt_coeffs = pywt.swt(ecg_f, wavelet, level=level, start_level=0) # 3) 聚合 select detail levels 的絕對(duì)值 L =len(ecg_f) agg = np.zeros(L) forlvlindetail_levels: if1<= lvl <= level: ? ? ? ? ? ? cA, cD = swt_coeffs[lvl-1] ? ? ? ? ? ? agg += np.abs(cD) ? ??# 歸一化與平滑 ? ? agg = (agg - np.min(agg)) / (np.ptp(agg) +?1e-12) ? ? win_samples =?max(1,?int((smooth_ms/1000.0) * fs)) ? ? agg_smooth = moving_average(agg, win_samples) ? ?? ? ??# 4) 初始候選峰(較寬松) ? ? initial_threshold = np.percentile(agg_smooth, initial_percentile) ? ? min_distance =?int(min_rr_sec * fs) ? ??# 設(shè)置一個(gè)絕對(duì)高度閾值。所有檢測(cè)到的峰值, 其實(shí)際幅值(在 agg_smooth 數(shù)組中的值)都必須大于或等于這個(gè) initial_threshold ? ? peaks0, props0 = signal.find_peaks(agg_smooth, height=initial_threshold, distance=min_distance) ? ??if?peaks0.size ==?0: ? ? ? ??# 無(wú)候選峰 -> 返回空 returnnp.array([], dtype=int), np.array([]), np.array([]) heights = agg_smooth[peaks0] prominences = signal.peak_prominences(agg_smooth, peaks0)[0] median_h = np.median(heights) median_prom = np.median(prominences) # 5) 二次篩選:基于 height / prominence / relative-to-prev accepted = [] prev_height =None fori, pinenumerate(peaks0): h = heights[i] prom = prominences[i]ifi =max(initial_threshold, median_h * amp_factor)) cond_prom = (prom >=max(median_prom * prom_factor, prom_min)) ifnot(cond_amporcond_prom): continue ifprev_heightisnotNone: ifh < prev_height * rel_factor: ? ? ? ? ? ? ? ??# 允許 prominence 很大時(shí)仍保留 ? ? ? ? ? ? ? ??if?prom = initial_threshold] # 6) 在帶通信號(hào)上校正到局部最大(提高定位精度) corrected = [] search_radius =int(0.05* fs) forpinaccepted: lo =max(0, p - search_radius) hi =min(L, p + search_radius +1) seg = ecg_f[lo:hi] ifseg.size ==0: continue local_idx = np.argmax(np.abs(seg)) corr = lo + local_idx iflen(corrected) ==0or(corr - corrected[-1]) > (min_distance //2): corrected.append(int(corr)) peaks_idx = np.array(corrected, dtype=int) # 7) 計(jì)算 BPM bpm_times, bpm_values = _compute_bpm_from_peaks(peaks_idx, fs, smooth_N=3) # 8) debug 繪圖 ifdebug_plot: t = np.arange(L) / fs plt.figure(figsize=(12,9)) ax1 = plt.subplot(4,1,1) plt.plot(t, ecg, label='raw ECG', alpha=0.5) plt.plot(t, ecg_f, label='filtered ECG', linewidth=1.0) plt.scatter(peaks_idx/fs, ecg_f[peaks_idx], color='r', label='final R peaks') plt.legend(loc='upper right'); plt.title('ECG and detected peaks') ax2 = plt.subplot(4,1,2, sharex=ax1) # 展示被選 detail 的系數(shù)(堆疊) selected_coefs = np.vstack([np.abs(swt_coeffs[l-1][1])forlindetail_levelsif1<=l<=level]) ? ? ? ??if?selected_coefs.size >0: extent = [0, L/fs,min(detail_levels)-0.5,max(detail_levels)+0.5] plt.imshow(selected_coefs, aspect='auto', origin='lower', extent=extent) plt.colorbar(label='|SWT detail|') plt.ylabel('detail level'); plt.title('Selected SWT detail coefficients') ax3 = plt.subplot(4,1,3, sharex=ax1) plt.plot(t, agg, label='agg (norm)', alpha=0.4) plt.plot(t, agg_smooth, label='agg smoothed', linewidth=1.2) plt.hlines(initial_threshold, t[0], t[-1], colors='k', linestyles='--', label=f'init thr={initial_threshold:.3f}') plt.scatter(peaks0/fs, agg_smooth[peaks0], color='g', s=20, label='candidates') plt.scatter(np.array(accepted)/fs, agg_smooth[np.array(accepted)], facecolors='none', edgecolors='r', s=60, label='accepted (pre-correct)') plt.legend(loc='upper right'); plt.title('Aggregated SWT energy and candidates') ax4 = plt.subplot(4,1,4, sharex=ax1) iflen(bpm_times) >0: plt.plot(bpm_times, bpm_values,'-o') plt.xlabel('Time (s)'); plt.ylabel('BPM'); plt.title('Estimated BPM (smoothed)') plt.grid(True) else: plt.text(0.1,0.5,'No BPM computed (need >=2 peaks)', transform=ax4.transAxes) plt.tight_layout() plt.show() returnpeaks_idx, bpm_times, bpm_values defread_ecg_from_csv(csv_path, fs=360, duration_sec=20): """ 從 CSV 讀取 ECG 數(shù)據(jù), 并截取前 duration_sec 秒 參數(shù): csv_path: CSV文件路徑 fs: 采樣率 (Hz), 默認(rèn)360 duration_sec: 需要讀取的時(shí)長(zhǎng)(秒), 默認(rèn)20 返回: t: 時(shí)間軸 (秒) ecg_segment: 截取后的 ECG 信號(hào) """ # ---------- 讀取 CSV 并提取 ECG 列 ---------- df = pd.read_csv(csv_path) if'MLII'indf.columns: ecg_full = df['MLII'].values.astype(float) elifdf.shape[1] >=2: ecg_full = df.iloc[:,1].values.astype(float) else: raiseValueError("未找到 ECG 列, 請(qǐng)確認(rèn) CSV 列名或格式。") # 計(jì)算需要截取的樣本數(shù) n_samples_needed =int(duration_sec * fs) # 截取前 n_samples_needed 個(gè)樣本 iflen(ecg_full) >= n_samples_needed: ecg_segment = ecg_full[:n_samples_needed] else: print(f"警告: 原始數(shù)據(jù)只有{len(ecg_full)}個(gè)樣本 ({n_samples_needed}), 將使用全部數(shù)據(jù)") ? ? ? ? ecg_segment = ecg_full ? ? ? ? n_samples_needed =?len(ecg_full) ? ?? ? ??# 生成時(shí)間軸 ? ??if?'time_s'?in?df.columns: ? ? ? ??# 如果CSV有時(shí)間列, 也相應(yīng)截取 ? ? ? ? times_full = df['time_s'].values ? ? ? ? t = times_full[:n_samples_needed] ? ??else: ? ? ? ? t = np.arange(n_samples_needed) / fs ? ?? ? ??return?t, ecg_segment if?__name__ ==?"__main__": ? ? csv_path =?r"Your_path_to_ECG_data122_60s.csv" ? ? t, ecg = read_ecg_from_csv(csv_path=csv_path, fs=360, duration_sec=?20) ? ?? ? ? peaks_swt, bpm_times_swt, bpm_values_swt = detect_beats_swt( ? ? ? ? ecg, fs, ? ? ? ? wavelet='db4', ? ? ? ? level=5, ? ? ? ? detail_levels=(3,4,5), ? ? ? ? pre_band=(0.5,45.0), ? ? ? ? smooth_ms=30, ? ? ? ? initial_percentile=85, ? ? ? ? amp_factor=0.6, ? ? ? ? prom_factor=0.5, ? ? ? ? prom_min=0.05, ? ? ? ? rel_factor=0.45, ? ? ? ? min_rr_sec=0.20, ? ? ? ? debug_plot=True ? ? ) ? ??print("SWT 檢測(cè):",?len(peaks_swt),?"peaks; 最后 BPM:", (bpm_values_swt[-1]?if?len(bpm_values_swt)>0elseNone))
DWT檢測(cè)心率
DWT檢測(cè)新的程序流程和SWT類(lèi)似,也是把原信號(hào)通過(guò)小波變換(這里采用的仍然是db4)分解信號(hào),然后在信號(hào)中把需要的頻率范圍內(nèi)的系數(shù)相加,重建信號(hào)(相對(duì)于抓住信號(hào)中需要的特征的濾波后的信號(hào)),然后在基于這個(gè)重建信號(hào)對(duì)峰值進(jìn)行檢測(cè),平滑處理,再校正,然后計(jì)算BPM后,輸出處理后的數(shù)據(jù)到圖。


DWT小波變換檢測(cè)心率
DWT小波變換檢測(cè)心率程序源碼:
""" 基于 DWT 的 ECG R 峰檢測(cè)(通過(guò)重構(gòu)選定 detail 級(jí)別) 適合: 計(jì)算資源受限或?qū)崟r(shí)嵌入式場(chǎng)景(優(yōu)點(diǎn)是計(jì)算量低) 缺點(diǎn): DWT 非平移不變,檢測(cè)位置對(duì)相位/起點(diǎn)敏感;可以通過(guò) cycle spinning(多次平移重構(gòu)求平均)緩解。 依賴: pip install numpy scipy matplotlib pywavelets """ importnumpyasnp importscipy.signalassignal importmatplotlib.pyplotasplt importpywt fromscipy.signal.windowsimportgaussian importpandasaspd # ---------- 參數(shù) ---------- fs =360.0 wavelet ='db4' dwt_level =4 # 對(duì)于 360Hz,QRS 主要在 D3-D5 范圍(經(jīng)驗(yàn)),可選 detail_levels_dwt = [3,4,5] detail_levels_dwt = [3,4,5] pre_band = (0.5,45.0) min_rr_sec =0.20 smooth_ms =30 threshold_percentile =75 debug_plot =True defbandpass_filter(x, fs, lowcut, highcut, order=3): nyq =0.5* fs b, a = signal.butter(order, [lowcut/nyq, highcut/nyq], btype='band') returnsignal.filtfilt(b, a, x) defmoving_average(x, win_samples): ifwin_samples <=?1: ? ? ? ??return?x ? ? kernel = np.ones(win_samples)/win_samples ? ??return?np.convolve(x, kernel, mode='same') def?detect_beats_dwt(ecg, fs, ? ? ? ? ? ? ? ? ? ? ?wavelet='db4', ? ? ? ? ? ? ? ? ? ? ?level=6, ? ? ? ? ? ? ? ? ? ? ?detail_levels=(3,4,5), ? ? ? ? ? ? ? ? ? ? ?pre_band=(0.5,45.0), ? ? ? ? ? ? ? ? ? ? ?smooth_ms=30, ? ? ? ? ? ? ? ? ? ? ?threshold_percentile=70, ? ? ? ? ? ? ? ? ? ? ?min_rr_sec=0.20, ? ? ? ? ? ? ? ? ? ? ?debug_plot=False): ? ??""" ? ? DWT 方法: 對(duì) signal 做 wavedec 分解,然后把不需要的系數(shù)置零,再用 waverec 重構(gòu)出只含 QRS 頻帶的信號(hào), ? ? 在重構(gòu)信號(hào)上做能量聚合和平滑檢測(cè)。 ? ? 返回: peaks_idx, bpm_times, bpm_values ? ? """ ? ??# 1) 預(yù)處理 ? ??if?pre_band?is?not?None: ? ? ? ? ecg_f = bandpass_filter(ecg, fs, pre_band[0], pre_band[1]) ? ??else: ? ? ? ? ecg_f = ecg.copy() ? ?? ? ??# 2) DWT 分解 ? ? coeffs = pywt.wavedec(ecg_f, wavelet, level=level) ? ??# coeffs: [cA_n, cD_n, cD_{n-1}, ..., cD1] 長(zhǎng)度 level+1 ? ?? ? ??# 3) 將不選的 detail 置零,只保留目標(biāo) detail 級(jí)別 ? ? coeffs_sel = [None] *?len(coeffs) ? ??# cA_n 保留為零(若想也保留可調(diào)整),這里置零以強(qiáng)調(diào)細(xì)節(jié) ? ? coeffs_sel[0] = np.zeros_like(coeffs[0]) ? ??# detail indices in coeffs: coeffs[1] is cD_n, coeffs[-1] is cD1 ? ??# mapping: detail level j (1..n) corresponds to coeffs index = len(coeffs)-j ? ??for?j?in?range(1, level+1): ? ? ? ? idx =?len(coeffs) - j ? ? ? ??if?j?in?detail_levels: ? ? ? ? ? ? coeffs_sel[idx] = coeffs[idx] ?# 保留 ? ? ? ??else: ? ? ? ? ? ? coeffs_sel[idx] = np.zeros_like(coeffs[idx]) ? ?? ? ??# 4) 重構(gòu)信號(hào)(只含所選 detail) ? ? recon = pywt.waverec(coeffs_sel, wavelet) ? ??# waverec 可能導(dǎo)致重構(gòu)長(zhǎng)度與原始略有差異,截?cái)嗷蛱畛?? ? recon = recon[:len(ecg_f)] ? ?? ? ??# 5) 能量聚合、平滑 ? ? agg = np.abs(recon) ? ? agg = (agg - np.min(agg)) / (np.ptp(agg) +?1e-12) ? ? win_samples =?max(1,?int((smooth_ms/1000.0) * fs)) ? ? agg_smooth = moving_average(agg, win_samples) ? ?? ? ??# 6) 峰檢測(cè) ? ? threshold = np.percentile(agg_smooth, threshold_percentile) ? ? min_distance =?int(min_rr_sec * fs) ? ? peaks0, props = signal.find_peaks(agg_smooth, height=threshold, distance=min_distance) ? ?? ? ??# 7) 在濾波后的原始信號(hào)上校正峰位置 ? ? corrected = [] ? ? L =?len(ecg_f) ? ? search_radius =?int(0.05?* fs) ? ??for?p?in?peaks0: ? ? ? ? lo =?max(0, p - search_radius) ? ? ? ? hi =?min(L, p + search_radius +?1) ? ? ? ? seg = ecg_f[lo:hi] ? ? ? ??if?seg.size ==?0:?continue ? ? ? ? local_idx = np.argmax(np.abs(seg)) ? ? ? ? corr = lo + local_idx ? ? ? ??if?len(corrected) ==?0?or?(corr - corrected[-1]) > (min_distance //2): corrected.append(int(corr)) peaks_idx = np.array(corrected, dtype=int) # 8) 計(jì)算 BPM bpm_times = [] bpm_values = [] foriinrange(1,len(peaks_idx)): rr = (peaks_idx[i] - peaks_idx[i-1]) / fs ifrr <=?0:?continue ? ? ? ? inst_bpm =?60.0?/ rr ? ? ? ? lookback =?3 ? ? ? ? j0 =?max(1, i - lookback +?1) ? ? ? ? rr_list = [(peaks_idx[k] - peaks_idx[k-1]) / fs?for?k?in?range(j0, i+1)] ? ? ? ? smooth_bpm =?60.0?/ (np.mean(rr_list))?if?len(rr_list)>0elseinst_bpm bpm_times.append(peaks_idx[i] / fs) bpm_values.append(smooth_bpm) bpm_times = np.array(bpm_times) bpm_values = np.array(bpm_values) # 9) debug 繪圖 ifdebug_plot: t = np.arange(len(ecg)) / fs plt.figure(figsize=(12,8)) ax1 = plt.subplot(3,1,1) plt.plot(t, ecg, label='raw ECG', alpha=0.6) plt.plot(t, ecg_f, label='filtered', alpha=0.9) plt.scatter(peaks_idx/fs, ecg_f[peaks_idx], color='r', label='detected R peaks') plt.legend(); plt.title('ECG and peaks') ax2 = plt.subplot(3,1,2, sharex=ax1) plt.plot(t, recon, label='reconstructed (selected D levels)') plt.legend(); plt.title('Reconstructed detail-band signal') ax3 = plt.subplot(3,1,3, sharex=ax1) plt.plot(t, agg_smooth, label='agg_smooth') plt.hlines(threshold, t[0], t[-1], linestyles='--', label=f'thr={threshold:.2f}') plt.scatter(peaks0/fs, agg_smooth[peaks0], color='g', label='peaks before correction') plt.legend(); plt.title('Aggregated envelope') plt.tight_layout(); plt.show() # 新增: 單獨(dú)的心率隨時(shí)間圖(獨(dú)立 figure) plt.figure(figsize=(10,4)) ifbpm_times.size >0: plt.plot(bpm_times, bpm_values, marker='o', linestyle='-', color='tab:orange') plt.xlabel('Time (s)') plt.ylabel('Heart Rate (BPM)') plt.title('Heart Rate over Time (smoothed)') plt.grid(True) else: # 若未檢測(cè)到心搏,給出提示性圖形 plt.text(0.5,0.5,'No heartbeats detected to plot', ha='center', va='center', fontsize=12) plt.title('Heart Rate over Time (no detections)') plt.axis('off') plt.show() returnpeaks_idx, bpm_times, bpm_values defread_ecg_from_csv(csv_path, fs=360, duration_sec=20): """ 從 CSV 讀取 ECG 數(shù)據(jù),并截取前 duration_sec 秒 參數(shù): csv_path: CSV文件路徑 fs: 采樣率 (Hz),默認(rèn)360 duration_sec: 需要讀取的時(shí)長(zhǎng)(秒),默認(rèn)20 返回: t: 時(shí)間軸 (秒) ecg_segment: 截取后的 ECG 信號(hào) """ # ---------- 讀取 CSV 并提取 ECG 列 ---------- df = pd.read_csv(csv_path) if'MLII'indf.columns: ecg_full = df['MLII'].values.astype(float) elifdf.shape[1] >=2: ecg_full = df.iloc[:,1].values.astype(float) else: raiseValueError("未找到 ECG 列,請(qǐng)確認(rèn) CSV 列名或格式。") # 計(jì)算需要截取的樣本數(shù) n_samples_needed =int(duration_sec * fs) # 截取前 n_samples_needed 個(gè)樣本 iflen(ecg_full) >= n_samples_needed: ecg_segment = ecg_full[:n_samples_needed] else: print(f"警告: 原始數(shù)據(jù)只有{len(ecg_full)}個(gè)樣本 ({n_samples_needed}),將使用全部數(shù)據(jù)") ? ? ? ? ecg_segment = ecg_full ? ? ? ? n_samples_needed =?len(ecg_full) ? ?? ? ??# 生成時(shí)間軸 ? ??if?'time_s'?in?df.columns: ? ? ? ??# 如果CSV有時(shí)間列,也相應(yīng)截取 ? ? ? ? times_full = df['time_s'].values ? ? ? ? t = times_full[:n_samples_needed] ? ??else: ? ? ? ? t = np.arange(n_samples_needed) / fs ? ?? ? ??return?t, ecg_segment if?__name__ ==?"__main__": ? ? csv_path =?r"C:py_TestCodePy_codeswaveletsignal_change_detect122_60s.csv" ? ? t, ecg = read_ecg_from_csv(csv_path=csv_path, fs=360, duration_sec=?20) ? ?? ? ? peaks_idx, bpm_times, bpm_values = detect_beats_dwt( ? ? ? ? ecg, fs, ? ? ? ? wavelet=wavelet, ? ? ? ? level=dwt_level, ? ? ? ? detail_levels=detail_levels_dwt, ? ? ? ? pre_band=pre_band, ? ? ? ? smooth_ms=smooth_ms, ? ? ? ? threshold_percentile=threshold_percentile, ? ? ? ? min_rr_sec=min_rr_sec, ? ? ? ? debug_plot=debug_plot ? ? ) ? ??print(f"檢測(cè)到?{len(peaks_idx)}?個(gè) R 峰") ? ??if?len(bpm_values)>0: print(f"最后 BPM ={bpm_values[-1]:.1f}bpm (time{bpm_times[-1]:.1f}s)")
比較CWT,SWT和DWT三種小波變換檢測(cè)心率的結(jié)果
小編把三個(gè)演示程序的最后輸出額外添加了心率BMP的輸出,可見(jiàn)檢測(cè)結(jié)果在心電圖的前20個(gè)心率值完全相同:
| CWT (bpm) | SWT (bpm) | DWT (bpm) |
|---|---|---|
| 92.7 | 92.7 | 92.7 |
| 92.1 | 92.1 | 92.1 |
| 91.8 | 91.8 | 91.8 |
| 93.6 | 93.6 | 93.6 |
| 93.6 | 93.6 | 93.6 |
| 94.0 | 94.0 | 94.0 |
| 91.0 | 91.0 | 91.0 |
| 90.9 | 90.9 | 90.9 |
| 89.9 | 89.9 | 89.9 |
| 90.3 | 90.3 | 90.3 |
| 89.4 | 89.4 | 89.4 |
| 89.1 | 89.1 | 89.1 |
| 87.9 | 87.9 | 87.9 |
| 87.1 | 87.1 | 87.1 |
| 86.4 | 86.4 | 86.4 |
| 86.3 | 86.3 | 86.3 |
| 86.7 | 86.7 | 86.7 |
| 87.3 | 87.3 | 87.3 |
| 86.9 | 86.9 | 86.9 |
| 86.3 | 86.3 | 86.3 |
結(jié)語(yǔ)
代碼有點(diǎn)冗長(zhǎng),但是也只觸及到小波變換的冰山一角。所謂拋磚引玉,如果可以有助于各位理解小波變換和信號(hào)處理,那確實(shí)是心滿意足了。
這里仍然要引用一下ECG的數(shù)據(jù)來(lái)源。[1][2]
[參考]
[1] G. B. Moody and R. G. Mark, “The impact of the MIT-BIH Arrhythmia Database,”IEEE Eng. Med. Biol. Mag., vol. 20, no. 3, pp. 45–50, May 2001, doi: 10.1109/51.932724.
[2] A. L. Goldberger et al., “PhysioBank, PhysioToolkit, and PhysioNet: Components of a new research resource for complex physiologic signals,”Circulation, vol. 101, no. 23, pp. e215–e220, Jun. 2000, doi: 10.1161/01.CIR.101.23.e215.
-
小波變換
+關(guān)注
關(guān)注
2文章
184瀏覽量
30545 -
心電圖
+關(guān)注
關(guān)注
1文章
82瀏覽量
26031
原文標(biāo)題:小波變換(4):平穩(wěn)小波變換變換(SWT)和離散小波變換(DWT)方式從心電圖數(shù)據(jù)獲取心率
文章出處:【微信號(hào):安費(fèi)諾傳感器學(xué)堂,微信公眾號(hào):安費(fèi)諾傳感器學(xué)堂】歡迎添加關(guān)注!文章轉(zhuǎn)載請(qǐng)注明出處。
發(fā)布評(píng)論請(qǐng)先 登錄
#參考設(shè)計(jì)#可穿戴心電圖設(shè)計(jì)方案
利用深度循環(huán)神經(jīng)網(wǎng)絡(luò)對(duì)心電圖降噪
可通過(guò)iPhone監(jiān)控心率的心電圖儀參考設(shè)計(jì)
心電圖的數(shù)據(jù)處理和波形顯示包括心率,RR值等的參數(shù)顯示..
AMEYA360設(shè)計(jì)方案丨基于 NXP 的心電圖儀
如何對(duì)心電圖(ECG)信號(hào)進(jìn)行簡(jiǎn)單的分析和心率計(jì)算
【快速上手手冊(cè)】瘋殼·四合一健康監(jiān)測(cè)模組(心率血壓血氧心電)
【快速上手手冊(cè)】瘋殼·四合一健康監(jiān)測(cè)模組(心率血壓血氧心電)
心電圖,什么是心電圖
怎樣用Arduino微控制器和AD8232制作心電圖并測(cè)量心率
蘋(píng)果設(shè)計(jì)新的 Apple Watch 測(cè)量心電圖(ECG)波的算法
在OLED屏幕上獲取實(shí)時(shí)心電圖
心電圖儀簡(jiǎn)介
使用MSP430FG439的心率和心電圖監(jiān)測(cè)器
利用平穩(wěn)和離散小波變換方式從心電圖數(shù)據(jù)獲取心率
評(píng)論