基於 IEEE PHM 2012 數據挑戰的軸承剩餘使用壽命(RUL)預測與振動信號分析平台,採用 Vue 3 + FastAPI 全端架構。
┌─────────────────────────────────────┐
│ 前端層 (Vue 3 + Element Plus) │
└─────────────────────────────────────┘
↕ REST API
┌─────────────────────────────────────┐
│ API 網關層 (FastAPI) │
└─────────────────────────────────────┘
↕
┌─────────────────────────────────────┐
│ 業務邏輯層 (演算法模組) │
└─────────────────────────────────────┘
↕
┌─────────────────────────────────────┐
│ 資料存取層 (SQLite) │
└─────────────────────────────────────┘
貢獻:
- 清晰的職責分離
- RESTful API 設計
- 易於擴展和維護
貢獻:
- 開發與生產環境一致性
- 水平擴展能力
- 輕鬆的 CI/CD 整合
| 模組 | 職責 |
|---|---|
timedomain.py |
時域特徵(Peak, RMS, Kurtosis, CF, EO) |
frequencydomain.py |
頻域特徵(FFT, FM0, TSA-FFT) |
filterprocess.py |
高階統計(NA4, FM4, M6A, M8A, ER) |
timefrequency.py |
時頻分析(STFT, CWT, Spectrogram) |
hilberttransform.py |
希爾伯特轉換與包絡分析 |
貢獻:
- 單一職責原則(SRP)
- 開閉原則(OCP)
- 依賴倒置原則(DIP)
- 工廠模式:統一的演算法實例化
- 單例模式:資料庫連線池管理
- 策略模式:演算法可互換
提供 30+ 個 API 端點:
- 數據查詢:7 個端點
- 時域分析:2 個端點
- 頻域分析:4 個端點
- 包絡分析:2 個端點
- 時頻分析:3 個端點
- 高階統計:2 個端點
- 溫度數據:6 個端點
try:
# 業務邏輯
except FileNotFoundError as e:
raise HTTPException(status_code=404, detail="友善錯誤訊息")
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))bearings (一對多)
↓
measurement_files (一對多)
↓
measurements
貢獻:
- 第三正規化(3NF)
- 外鍵約束確保數據完整性
- 索引優化
- 本地開發環境 vs Docker 容器環境
- 導入路徑不同導致 ModuleNotFoundError
# 所有演算法模組都採用這個模式
try:
from backend.initialization import InitParameter as ip
from backend.timedomain import TimeDomain as td
except ModuleNotFoundError:
from initialization import InitParameter as ip
from timedomain import TimeDomain as td# main.py 中的動態路徑處理
_current_dir = os.path.dirname(os.path.abspath(__file__))
if _current_dir not in sys.path:
sys.path.insert(0, _current_dir)| 優點 | 缺點 |
|---|---|
| ✅ 簡單直觀 | ❌ 重複程式碼,違反 DRY |
| ✅ 運行時動態適應 | ❌ 導入錯誤只有運行時發現 |
- 除以零錯誤
- 空資料集
- 數值溢出
- NaN/Inf 傳播
除以零保護:
# FilterProcess.NA4
na4 = total_sum_all / division_total_sum_segment if division_total_sum_segment != 0 else np.nan
# FilterProcess.FM4
denominator = np.sum(difference ** 2) ** 2
fm4 = (n * np.sum(difference ** 4)) / denominator if denominator != 0 else np.nan
# FilterProcess.M6A
denominator = np.sum(difference ** 2) ** 3
m6a = ((n ** 2) * np.sum(difference ** 6)) / denominator if denominator != 0 else np.nan空資料集保護:
# frequencydomain.py
max_mortor_gear = fftoutput[mask1 & mask2]
if max_mortor_gear.empty:
max_mortor_gear = fftoutput.iloc[0:1] # Use first row as fallback
else:
max_mortor_gear = fftoutput[fftoutput['abs_fft']==np.max(max_mortor_gear['abs_fft'])]頻譜和諧波保護:
# frequencydomain.py
low_filter_sum,_ = hs.Harmonic(fftoutput)
if low_filter_sum == 0:
low_filter_sum = 1.0 # Avoid division by zero
# TSA FFT
high_filter_sum,_ = hs.Sildband(tsa_fftoutput)
if high_filter_sum == 0:
high_filter_sum = 1.0RMS 歸一化保護:
# frequencydomain.py
rms_val = td.rms(amp)
if rms_val == 0:
rms_val = 1.0 # Avoid division by zero| 優點 | 缺點 |
|---|---|
| ✅ 防止程式崩潰 | ❌ 硬編碼預設值可能不符合業務邏輯 |
| ✅ 使用 np.nan 標記無效計算 | ❌ 每個計算點都要檢查 |
- 資料庫查詢結果龐大(單檔案數十萬筆)
- 頻譜計算產生大量數據
- 網路傳輸負擔
- 記憶體消耗
配置顯示限制:
# config.py
SIGNAL_DISPLAY_LIMIT = 1000
SPECTRUM_DISPLAY_LIMIT = 1000
ENVELOPE_SPECTRUM_DISPLAY_LIMIT = 500
# main.py 中使用
"spectrum_data": {
"frequency": freq[:SPECTRUM_DISPLAY_LIMIT].tolist(),
"horizontal_magnitude": horiz_magnitude[:SPECTRUM_DISPLAY_LIMIT].tolist()
}分頁查詢:
# phm_query.py
def get_file_list(self, bearing_name: str, offset: int = 0, limit: int = 100):
cursor.execute("... LIMIT ? OFFSET ?", (bearing_name, limit, offset))頻譜圖資料降採樣:
# main.py
freq_limit = min(100, len(horiz_stft['frequencies']))
time_limit = min(100, len(horiz_stft['time']))
features = {
"spectrogram_data": {
"frequencies": horiz_stft['frequencies'][:freq_limit].tolist(),
"time": horiz_stft['time'][:time_limit].tolist()
}
}| 優點 | 缺點 |
|---|---|
| ✅ 顯著減少網路傳輸量 | ❌ 前端無法查看完整資料 |
| ✅ 降低前端渲染負擔 | ❌ 損失資料細節 |
| ✅ 配置集中管理 | ❌ 硬編碼限制值可能不適用所有場景 |
- FM0(Frequency Magnitude 0):識別馬達齒輪和皮帶主要頻率
- TSA(Time Synchronous Averaging):時域同步平均
- 邊帶檢測:識別主要頻率周圍的邊帶成分
- 諧波分析:識別整數倍頻率
多重遮罩篩選:
# frequencydomain.py - 計算 motor gear 主要頻率
mask1 = fftoutput['freqs']>=ip.mortor_gear-ip.side_band_range
mask2 = fftoutput['freqs']<=ip.mortor_gear+ip.side_band_range
max_mortor_gear=fftoutput[mask1 & mask2]
# 找出周圍頻率
mask7 = fftoutput['freqs']>=float(max_mortor_gear1['freqs'].values[0]) - ip.harmonic_gmf_range
mask8 = fftoutput['freqs']<float(max_mortor_gear1['freqs'].values[0])
mask9 = fftoutput['freqs']>float(max_mortor_gear1['freqs'].values[0])
mask10 = fftoutput['freqs']<=float(max_mortor_gear1['freqs'].values[0]) + ip.harmonic_gmf_range
# 篩選數值
fft_mgs1=fftoutput[mask7 & mask8]
fft_mgs2=fftoutput[mask9 & mask10]諧波與邊帶表格計算:
# harmonic_sildband_table.py 獨立處理
class HarmonicSildband():
def Harmonic(fft):
"""計算諧波和"""
pass
def Sildband(tsa_fft):
"""計算邊帶和"""
passTSA 頻譜倍率調整:
# 計算 TSA FFT 和原始 FFT 頻率的倍率
max4_freq = float(max4['tsa_freqs1'].values[0])
if max4_freq == 0:
max_freqs = 1.0
else:
max_freqs = float(max3['freqs1'].values[0]) / max4_freq
# 調整 TSA 頻譜頻率刻度
tsa_fftoutput=pd.DataFrame({
'multiply_freqs':np.round(tsa_freqs*max_freqs,5),
'tsa_abs_fft':tsa_abs_fft,
'tsa_abs_fft_n': tsa_abs_fft_n
})| 優點 | 缺點 |
|---|---|
| ✅ 完整實現 FM0 和 TSA 算法 | ❌ 遮罩邏輯複雜,難以維護 |
| ✅ 諧波/邊帶計算獨立 | ❌ 頻率範圍硬編碼 |
| ✅ 處理 TSA 頻譜頻率刻度調整 | ❌ 計算步驟多,容易出錯 |
- 某些檔案資料缺失
- 計算過程異常
- 資料格式不一致
- 需要繼續處理其他檔案
# frequencydomain.py - calculate_frequency_domain_trend
for idx, (file_num, file_id) in enumerate(files):
try:
# 更新進度
if progress_callback:
progress_callback(idx + 1, total_files, file_num)
# 查詢資料
df = pd.read_sql_query(query, conn)
if df.empty:
print(f"Warning: File {file_num} has no data, skipping")
# 插入 NaN 值
for key in feature_keys:
trend_data["horizontal"][key].append(float('nan'))
trend_data["vertical"][key].append(float('nan'))
trend_data["file_numbers"].append(file_num)
continue
# ... 計算邏輯 ...
except Exception as e:
print(f"Error processing file {file_num}: {str(e)}")
# 插入 NaN 值
for key in feature_keys:
trend_data["horizontal"][key].append(float('nan'))
trend_data["vertical"][key].append(float('nan'))
trend_data["file_numbers"].append(file_num)
continue| 優點 | 缺點 |
|---|---|
| ✅ 單個檔案錯誤不中斷整體 | ❌ 使用 print,不適合生產環境 |
| ✅ 插入 NaN 保持資料結構完整 | ❌ 錯誤資訊丟失,難追蹤 |
| ✅ 進度回調機制 | ❌ 異常捕獲太寬泛 |
- FastAPI 是異步框架,但 SQLite 是同步
- 多請求同時訪問導致競爭條件
- 頻繁創建/關閉連接影響性能
- 需要線程安全
# main.py - 連線管理器
import threading
_db_local = threading.local()
@contextlib.contextmanager
def get_db_connection(db_path: str):
"""資料庫連接上下文管理器"""
conn = getattr(_db_local, 'conn', None)
if conn is None:
conn = sqlite3.connect(db_path)
_db_local.conn = conn
try:
yield conn
finally:
# 不在此處關閉連接,讓連接在線程結束時關閉
pass
def close_db_connection():
"""關閉當前線程的資料庫連接"""
conn = getattr(_db_local, 'conn', None)
if conn is not None:
conn.close()
_db_local.conn = None
# 應用關閉時清理
@app.on_event("shutdown")
def shutdown_event():
close_db_connection()| 優點 | 缺點 |
|---|---|
| ✅ 線程安全 | ❌ SQLite 不支援高併發寫入 |
| ✅ 上下文管理器確保清理 | ❌ 連接不立即關閉可能洩漏 |
| ✅ 應用關閉時清理 | ❌ 不適合真正異步場景 |
- Windows 使用
\,Linux/Mac 使用/ - Docker 容器內路徑與本地不同
- 相對路徑與絕對路徑混用
# config.py
from pathlib import Path
BACKEND_DIR = Path(__file__).parent.absolute()
# 使用 os.path.join 連接路徑
PHM_DATABASE_PATH = os.path.join(BACKEND_DIR, "phm_data.db")
PHM_TEMPERATURE_DATABASE_PATH = os.path.join(BACKEND_DIR, "phm_temperature_data.db")
# main.py - 動態路徑處理
current_dir = os.path.dirname(os.path.abspath(__file__))
if os.path.basename(current_dir) == 'backend':
project_root = os.path.dirname(current_dir)
else:
project_root = current_dir
# 讀取分析結果
summary_path = os.path.join(project_root, "phm_analysis_results", "summary.json")| 優點 | 缺點 |
|---|---|
| ✅ 使用 Path 和 os.path 處理路徑 | ❌ 混用 Path 和 os.path |
| ✅ 動態判斷目錄結構 | ❌ 邏輯複雜,難理解 |
| ✅ 絕對路徑避免相對路徑問題 | ❌ 硬編碼目錄名稱 'backend' |
1. 引入 logging 模組
import logging
logger = logging.getLogger(__name__)
# 替換所有 print
logger.info("Processing file %d", file_num)
logger.error("Error in calculation: %s", str(e))2. 統一使用 pathlib
# config.py
BACKEND_DIR = Path(__file__).parent.resolve()
PROJECT_ROOT = BACKEND_DIR.parent if BACKEND_DIR.name == 'backend' else BACKEND_DIR
# 使用 / 操作符連接路徑
PHM_DATABASE_PATH = BACKEND_DIR / "phm_data.db"3. 建立安全除法裝飾器
# utils/safety.py
import functools
import numpy as np
def safe_division(default_value=np.nan):
"""安全除法裝飾器"""
def decorator(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
result = func(*args, **kwargs)
if isinstance(result, (float, np.floating)):
if np.isnan(result) or np.isinf(result):
return default_value
return result
return wrapper
return decorator
# 使用
@safe_division(default_value=1.0)
def calculate_ratio(numerator, denominator):
return numerator / denominator4. 改進錯誤收集與報告
class ProcessingResult:
def __init__(self):
self.success_files = []
self.failed_files = []
self.errors = {}
def add_failure(self, file_num, error):
self.failed_files.append(file_num)
self.errors[file_num] = str(error)
def get_summary(self):
return {
'total': len(self.success_files) + len(self.failed_files),
'success': len(self.success_files),
'failed': len(self.failed_files),
'error_rate': len(self.failed_files) / (len(self.success_files) + len(self.failed_files)) if len(self.failed_files) > 0 else 0
}1. 使用 aiosqlite 實現異步資料庫
import aiosqlite
async def get_db_connection():
conn = await aiosqlite.connect(PHM_DATABASE_PATH)
return conn
@app.get("/api/example")
async def example_endpoint():
async with get_db_connection() as conn:
cursor = await conn.execute("SELECT * FROM bearings")
rows = await cursor.fetchall()
return rows2. 重構頻域計算
class SpectrumAnalyzer:
"""頻譜分析封裝類"""
def find_peaks_around(self, center_freq, bandwidth):
"""找出指定頻率周圍的峰值"""
mask = (self.fft_result['freqs'] >= center_freq - bandwidth) & \
(self.fft_result['freqs'] <= center_freq + bandwidth)
return self.fft_result[mask]
def calculate_sidebands(self, center_freq, num_sidebands=2):
"""計算邊帶"""
pass3. 實現智能降採樣
from scipy import signal
def downsample_data(data, max_points=1000):
"""智能降採樣,保留重要特徵"""
if len(data) <= max_points:
return data
return signal.resample(data, max_points)
def peak_preserving_downsample(data, max_points=1000):
"""保留峰值的降採樣"""
from scipy.signal import find_peaks
peaks, _ = find_peaks(np.abs(data))
indices = sorted(set(peaks[:max_points]))
return data[indices]4. 添加單元測試
# tests/test_timedomain.py
import pytest
from backend.timedomain import TimeDomain
def test_rms():
td = TimeDomain()
signal = np.array([1.0, 2.0, 3.0, 4.0, 5.0])
result = td.rms(signal)
assert isinstance(result, float)
assert result > 0
def test_kurtosis():
td = TimeDomain()
signal = np.random.normal(0, 1, 1000)
result = td.kurt(signal)
assert isinstance(result, float)1. 遷移至 PostgreSQL
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker
engine = create_async_engine(
"postgresql+asyncpg://user:password@localhost/dbname",
echo=False
)
async_session = sessionmaker(
engine, class_=AsyncSession, expire_on_commit=False
)2. 實現微服務架構
┌─────────────┐
│ API Gateway │
└──────┬──────┘
│
┌───┴───┬─────────┬─────────┐
│ │ │ │
┌──▼──┐ ┌──▼──┐ ┌──▼──┐ ┌──▼──┐
│ Time │ │Freq │ │Env │ │Temp │
│Domain│ │Domain│ │Analy │ │Data │
│Service│ │Service│ │Service│ │Service│
└──┬──┘ └──┬──┘ └──┬──┘ └──┬──┘
│ │ │ │
└───────┴─────────┴─────────┘
↓
┌──────────┐
│PostgreSQL│
└──────────┘
3. 添加快取機制(Redis)
import redis
from fastapi import FastAPI
app = FastAPI()
redis_client = redis.Redis(host='localhost', port=6379, db=0)
@app.get("/api/cache-example")
async def cache_example(bearing_name: str):
# 檢查快取
cache_key = f"analysis:{bearing_name}"
cached = redis_client.get(cache_key)
if cached:
return {"data": json.loads(cached), "from_cache": True}
# 計算結果
result = calculate_analysis(bearing_name)
# 存入快取(過期時間 1 小時)
redis_client.setex(cache_key, 3600, json.dumps(result))
return {"data": result, "from_cache": False}4. 實現異步任務隊列(Celery)
from celery import Celery
celery_app = Celery(
'vibration_analysis',
broker='redis://localhost:6379/0',
backend='redis://localhost:6379/1'
)
@celery_app.task
def process_bearing_trend(bearing_name):
"""異步處理軸承趨勢分析"""
# 耗時的計算邏輯
result = calculate_frequency_domain_trend(bearing_name)
return result
# 在 API 中使用
@app.post("/api/analysis/async")
async def start_async_analysis(bearing_name: str):
task = process_bearing_trend.delay(bearing_name)
return {"task_id": task.id, "status": "pending"}
@app.get("/api/analysis/status/{task_id}")
async def get_task_status(task_id: str):
task = celery_app.AsyncResult(task_id)
return {"status": task.status, "result": task.result}| 維度 | 評分 | 說明 |
|---|---|---|
| 架構設計 | ⭐⭐⭐⭐⭐ | 清晰的分層架構,模組化設計 |
| 程式碼質量 | ⭐⭐⭐⭐ | 良好的命名規範,適當的抽象 |
| 可維護性 | ⭐⭐⭐⭐⭐ | 配置集中化,文檔完備 |
| 可擴展性 | ⭐⭐⭐⭐⭐ | 開閉原則,易於新增功能 |
| 性能優化 | ⭐⭐⭐⭐ | 資料庫連線池,分頁查詢 |
| 用戶體驗 | ⭐⭐⭐⭐⭐ | 友善的錯誤處理,一致的 API |
| 容器化 | ⭐⭐⭐⭐⭐ | Docker 支援,易於部署 |
| 困難點 | 複雜度 | 現有解決效果 |
|---|---|---|
| 模組導入兼容性 | ⭐⭐ | ⭐⭐⭐ 可用但重複 |
| 數值計算穩定性 | ⭐⭐⭐⭐ | ⭐⭐⭐⭐ 防護完善 |
| 大量資料處理 | ⭐⭐⭐⭐⭐ | ⭐⭐⭐ 有效但損失細節 |
| 複雜頻域計算 | ⭐⭐⭐⭐⭐ | ⭐⭐⭐ 功能完整但維護難 |
| 批量處理錯誤 | ⭐⭐⭐ | ⭐⭐ 容錯但日誌不足 |
| 資料庫連接管理 | ⭐⭐⭐⭐ | ⭐⭐⭐ 線程安全但不適合異步 |
| 跨平台路徑處理 | ⭐⭐ | ⭐⭐⭐ 處理完善 |
軟體工程貢獻:
- 🔬 科學計算與 Web 技術的完美結合
- 🏗️ 清晰架構與模組化設計
- 🚀 容器化部署與現代 DevOps 實踐
- 📚 文檔完備與教育價值
- 🔧 易於擴展與長期維護
解決困難點的成果:
- ✅ 完善的數值計算安全機制
- ✅ 有效的大量資料處理策略
- ✅ 完整的複雜頻域計算實現
- ✅ 容錯的批量處理機制
- ✅ 線程安全的資料庫連接管理
當前架構:
┌─────────────┐
│ FastAPI │ (同步處理)
└──────┬──────┘
│
↓
┌─────────────┐
│ SQLite │ (不支援高併發)
└─────────────┘
建議架構:
┌─────────────┐
│ FastAPI │ (異步處理)
└──────┬──────┘
│
↓
┌─────────────┐
│ PostgreSQL │ (支援高併發)
└─────────────┘
↑
│
┌──────┴──────┐
│ Redis │ (快取層)
└─────────────┘
本專案從軟體工程的角度來看,是一個設計優良、架構清晰、可維護性高的全端應用系統。它不僅成功整合了複雜的科學計算演算法,還採用了現代化的技術棧和最佳實踐。
雖然在處理某些技術困難時採用的解決方案可以進一步優化,但整體上已經建立起一個功能完整、穩定運行的系統。通過持續改進和架構升級,專案將能更好地支援大規模資料處理和高併發訪問。
這是一個可以作為學術研究轉化為工程應用的優秀範例,值得相關領域的開發者學習和參考。