From 312e65bc39a0b4b8708e0ad0ded40f6ce1a8cc88 Mon Sep 17 00:00:00 2001 From: Calvin1989 <1400235935@qq.com> Date: Thu, 30 Apr 2026 15:36:56 +0800 Subject: [PATCH 1/2] =?UTF-8?q?feat(edge=5Fintelligence=5FV2X):=20?= =?UTF-8?q?=E6=96=B0=E5=A2=9E=E8=99=9A=E6=8B=9F=E5=9B=B4=E6=A0=8F=E5=85=A5?= =?UTF-8?q?=E4=BE=B5=E6=A3=80=E6=B5=8B=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 在画面 58% 高度处绘制金黄色虚线围栏(VIRTUAL FENCE) - 检测目标越线时框变橙红色并显示 !INTRUSION! 标签 - 上升沿计数入侵事件(同一批目标持续越线只计1次) - 围栏线在入侵时闪烁变色,屏幕四周出现橙色边框警报 - V2X 面板新增 Intrusions 入侵事件计数行 - V2X 广播最高优先级推送 FENCE BREACH 消息 --- src/edge_intelligence_V2X/main.py | 102 ++++++++++++++++++++++++++++-- 1 file changed, 96 insertions(+), 6 deletions(-) diff --git a/src/edge_intelligence_V2X/main.py b/src/edge_intelligence_V2X/main.py index e0c78d4ce8..f44c4ba9f2 100644 --- a/src/edge_intelligence_V2X/main.py +++ b/src/edge_intelligence_V2X/main.py @@ -74,6 +74,10 @@ COLOR_OTHER = (230, 165, 0) # 橙色 — 其他 COLOR_PANEL_BG = (30, 30, 30) # 面板背景 +# 虚拟围栏配置 +FENCE_Y_RATIO = 0.58 # 围栏线纵坐标占画面高度的比例(0~1) +FENCE_COLOR = (0, 215, 255) # 围栏正常颜色(金黄色,BGR) + # 截图保存目录 SCREENSHOT_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'results') @@ -109,6 +113,12 @@ def __init__(self): self.last_detections = None # 行人闯入报警 self.ped_alert_timer = 0.0 # 报警持续到此时间戳 + # 虚拟围栏入侵检测 + self.fence_y = int(CAMERA_HEIGHT * FENCE_Y_RATIO) + self.intrusion_event_count = 0 # 累计入侵事件次数 + self.prev_intrusion = False # 上一帧是否存在入侵 + self.intrusion_alert_timer = 0.0 # 入侵警报持续到此时间戳 + self.intrusion_objects = 0 # 当前帧检测到的入侵目标数 # 检测历史记录(用于底部统计条) self.history_vehicles = [] self.history_pedestrians = [] @@ -306,9 +316,10 @@ def _detect(self, frame): return self.last_detections def _draw_detections(self, frame, detections): - """在画面上绘制检测框、类别标签、置信度""" + """在画面上绘制检测框、类别标签、置信度,并检测虚拟围栏入侵""" v_count = 0 p_count = 0 + intrusion_count = 0 for box in detections.boxes: cls_id = int(box.cls[0]) @@ -316,18 +327,31 @@ def _draw_detections(self, frame, detections): conf = float(box.conf[0]) x1, y1, x2, y2 = map(int, box.xyxy[0]) - # 按类别选择颜色 + # 按类别选择基础颜色并计数 if cls_name in VEHICLE_CLASSES: - color = COLOR_VEHICLE + base_color = COLOR_VEHICLE v_count += 1 elif cls_name in PERSON_CLASSES: - color = COLOR_PERSON + base_color = COLOR_PERSON p_count += 1 else: - color = COLOR_OTHER + base_color = COLOR_OTHER + + # 判断是否越过虚拟围栏(目标底部进入受限区域) + intruding = (y2 > self.fence_y and + cls_name in (VEHICLE_CLASSES | PERSON_CLASSES)) + if intruding: + intrusion_count += 1 + color = (0, 60, 255) # 橙红色标记入侵目标 + thickness = 3 + cv2.putText(frame, "!INTRUSION!", (x1, max(y1 - 24, 36)), + cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 60, 255), 2) + else: + color = base_color + thickness = 2 # 检测框 - cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2) + cv2.rectangle(frame, (x1, y1), (x2, y2), color, thickness) # 标签(背景 + 文字) label = f"{cls_name} {conf:.2f}" @@ -353,6 +377,46 @@ def _draw_detections(self, frame, detections): if p_count > 0: self.ped_alert_timer = time.time() + 1.5 # 报警持续 1.5 秒 + # 虚拟围栏:上升沿计数(新一波入侵才累加,避免持续存在的目标重复计数) + self.intrusion_objects = intrusion_count + if intrusion_count > 0 and not self.prev_intrusion: + self.intrusion_event_count += 1 + if intrusion_count > 0: + self.intrusion_alert_timer = time.time() + 2.0 + self.prev_intrusion = intrusion_count > 0 + + return frame + + def _draw_virtual_fence(self, frame): + """绘制虚拟围栏线:虚线效果 + 区域标注,入侵时变橙红色闪烁""" + fy = self.fence_y + # 入侵时围栏线变橙红色并闪烁,否则显示金黄色 + if time.time() < self.intrusion_alert_timer: + color = (0, 60, 255) if int(time.time() * 4) % 2 == 0 else (0, 140, 255) + else: + color = FENCE_COLOR + + # 虚线效果:18px 实线 + 10px 间隔 + dash, gap = 18, 10 + x = 0 + while x < CAMERA_WIDTH: + cv2.line(frame, (x, fy), (min(x + dash, CAMERA_WIDTH), fy), color, 2) + x += dash + gap + + # 围栏标签(左侧带深色背景) + label = "[ VIRTUAL FENCE ]" + (tw, th), _ = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.45, 1) + cv2.rectangle(frame, (8, fy - th - 6), (8 + tw + 6, fy + 4), (20, 20, 20), -1) + cv2.putText(frame, label, (11, fy - 2), + cv2.FONT_HERSHEY_SIMPLEX, 0.45, color, 1) + + # 右侧区域标注 + cv2.putText(frame, "SAFE ZONE", + (CAMERA_WIDTH - 120, fy - 10), + cv2.FONT_HERSHEY_SIMPLEX, 0.4, (80, 220, 80), 1) + cv2.putText(frame, "RESTRICTED ZONE", + (CAMERA_WIDTH - 178, fy + 20), + cv2.FONT_HERSHEY_SIMPLEX, 0.4, (80, 100, 255), 1) return frame def _draw_v2x_panel(self, frame): @@ -382,6 +446,10 @@ def _draw_v2x_panel(self, frame): y += 26 cv2.putText(frame, f"Total: {self.stats['total']}", (x0 + 15, y), cv2.FONT_HERSHEY_SIMPLEX, 0.55, (255, 255, 255), 1) + y += 26 + i_color = (0, 100, 255) if self.intrusion_objects > 0 else (160, 160, 160) + cv2.putText(frame, f"Intrusions: {self.intrusion_event_count} events", + (x0 + 15, y), cv2.FONT_HERSHEY_SIMPLEX, 0.55, i_color, 1) # 摄像头机位 y += 30 @@ -410,6 +478,8 @@ def _draw_v2x_panel(self, frame): def _get_v2x_messages(self): """根据检测结果和天气生成 V2X 预警广播""" msgs = [] + if self.intrusion_objects > 0: + msgs.append(f"> FENCE BREACH: {self.intrusion_objects} target(s)") if self.stats['pedestrians'] > 0: msgs.append(f"> {self.stats['pedestrians']} pedestrian(s) ahead") if self.stats['vehicles'] > 5: @@ -476,6 +546,24 @@ def _draw_pedestrian_alert(self, frame): cv2.FONT_HERSHEY_SIMPLEX, 1.0, (255, 255, 255), 2) return frame + def _draw_intrusion_alert(self, frame): + """虚拟围栏入侵时绘制橙红色边框与文字警报(与行人红色警报视觉区分)""" + if time.time() < self.intrusion_alert_timer: + thick = 6 if int(time.time() * 4) % 2 == 0 else 3 + cv2.rectangle(frame, (2, 2), (CAMERA_WIDTH - 3, CAMERA_HEIGHT - 3), + (0, 80, 255), thick) + text = f"! FENCE INTRUSION Total: {self.intrusion_event_count} !" + (tw, th), _ = cv2.getTextSize(text, cv2.FONT_HERSHEY_SIMPLEX, 0.8, 2) + tx = (CAMERA_WIDTH - tw) // 2 + ty = min(self.fence_y + 45, CAMERA_HEIGHT - 40) + overlay = frame.copy() + cv2.rectangle(overlay, (tx - 12, ty - th - 6), + (tx + tw + 12, ty + 8), (0, 40, 160), -1) + cv2.addWeighted(overlay, 0.65, frame, 0.35, 0, frame) + cv2.putText(frame, text, (tx, ty), + cv2.FONT_HERSHEY_SIMPLEX, 0.8, (255, 200, 100), 2) + return frame + # ==================== 控制操作 ==================== def switch_weather(self): @@ -528,7 +616,9 @@ def run(self): # 检测 + 绘制 detections = self._detect(frame) frame = self._draw_detections(frame, detections) + frame = self._draw_virtual_fence(frame) frame = self._draw_v2x_panel(frame) + frame = self._draw_intrusion_alert(frame) frame = self._draw_pedestrian_alert(frame) frame = self._draw_header(frame) frame = self._draw_footer(frame) From e47db8b203d1e2aebc3db738286024b46af83a9c Mon Sep 17 00:00:00 2001 From: Calvin1989 <1400235935@qq.com> Date: Tue, 5 May 2026 22:14:16 +0800 Subject: [PATCH 2/2] feat(chap04): add FNN ablation study on MNIST Compare 5 configurations (Baseline / +Dropout / +BN / +BN+Dropout / Enhanced) to quantify the contribution of each improvement component. Key results: - Baseline (784->128->10): 98.04% test accuracy - Enhanced (deeper+BN+Dropout+LR schedule): 98.46% test accuracy (+0.43%) - Generates 4 comparison plots: train/test accuracy curves, loss curve, bar chart --- .gitignore | 1 + .../fnn_ablation_study.py | 237 ++++++++++++++++++ 2 files changed, 238 insertions(+) create mode 100644 src/chap04_simple_neural_network/fnn_ablation_study.py diff --git a/.gitignore b/.gitignore index 44433ce788..3808f4d6f3 100644 --- a/.gitignore +++ b/.gitignore @@ -78,6 +78,7 @@ data/ dataset/ datasets/ +mnist_data/ # ======================================== # 日志与缓存 diff --git a/src/chap04_simple_neural_network/fnn_ablation_study.py b/src/chap04_simple_neural_network/fnn_ablation_study.py new file mode 100644 index 0000000000..b74da77583 --- /dev/null +++ b/src/chap04_simple_neural_network/fnn_ablation_study.py @@ -0,0 +1,237 @@ +#!/usr/bin/env python +# coding: utf-8 +""" +FNN 消融实验 —— MNIST 手写数字识别(PyTorch 实现) +==================================================== +对比以下 5 种模型配置,验证每个改进组件的贡献: + + Config-A Baseline : 784 -> 128 -> 10 (复现原版逻辑,改为 mini-batch) + Config-B +Dropout : Baseline + Dropout(0.3) + Config-C +BN : Baseline + BatchNormalization + Config-D +BN+Dropout : Baseline + BN + Dropout(0.3) + Config-E Enhanced : 784 -> 256 -> 128 -> 64 -> 10 + BN + Dropout + LR调度 + +输出(保存在脚本同目录): + fnn_ablation_train_acc.png 训练集准确率曲线 + fnn_ablation_test_acc.png 测试集准确率曲线 + fnn_ablation_loss.png 训练 Loss 曲线 + fnn_ablation_bar.png 最终测试准确率柱状图 +""" + +import os +import numpy as np +import matplotlib +matplotlib.use("Agg") +import matplotlib.pyplot as plt +import torch +import torch.nn as nn +import torch.optim as optim +from torchvision import datasets, transforms +from torch.utils.data import DataLoader + +EPOCHS = 20 +BATCH_SIZE = 256 +LR = 1e-3 +SEED = 42 +SAVE_DIR = os.path.dirname(os.path.abspath(__file__)) + +torch.manual_seed(SEED) +np.random.seed(SEED) + +DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu") +print(f"使用设备: {DEVICE}") + + +def load_mnist(): + transform = transforms.Compose([ + transforms.ToTensor(), + transforms.Normalize((0.1307,), (0.3081,)) + ]) + data_root = os.path.join(SAVE_DIR, "mnist_data") + train_set = datasets.MNIST(data_root, train=True, download=True, transform=transform) + test_set = datasets.MNIST(data_root, train=False, download=True, transform=transform) + train_loader = DataLoader(train_set, batch_size=BATCH_SIZE, shuffle=True, num_workers=0) + test_loader = DataLoader(test_set, batch_size=BATCH_SIZE, shuffle=False, num_workers=0) + return train_loader, test_loader + + +def build_model(config): + if config == "A": + model = nn.Sequential( + nn.Flatten(), + nn.Linear(784, 128), nn.ReLU(), + nn.Linear(128, 10), + ) + elif config == "B": + model = nn.Sequential( + nn.Flatten(), + nn.Linear(784, 128), nn.ReLU(), nn.Dropout(0.3), + nn.Linear(128, 10), + ) + elif config == "C": + model = nn.Sequential( + nn.Flatten(), + nn.Linear(784, 128, bias=False), nn.BatchNorm1d(128), nn.ReLU(), + nn.Linear(128, 10), + ) + elif config == "D": + model = nn.Sequential( + nn.Flatten(), + nn.Linear(784, 128, bias=False), nn.BatchNorm1d(128), nn.ReLU(), nn.Dropout(0.3), + nn.Linear(128, 10), + ) + elif config == "E": + layers_list = [nn.Flatten()] + dims = [784, 256, 128, 64] + for i in range(len(dims) - 1): + lin = nn.Linear(dims[i], dims[i+1], bias=False) + nn.init.kaiming_normal_(lin.weight, nonlinearity="relu") + layers_list += [lin, nn.BatchNorm1d(dims[i+1]), nn.ReLU(), nn.Dropout(0.3)] + layers_list.append(nn.Linear(64, 10)) + model = nn.Sequential(*layers_list) + else: + raise ValueError(f"Unknown config: {config}") + return model.to(DEVICE) + + +def train_model(config, train_loader, test_loader): + model = build_model(config) + optimizer = optim.Adam(model.parameters(), lr=LR) + criterion = nn.CrossEntropyLoss() + scheduler = None + if config == "E": + scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode="max", factor=0.5, patience=3) + + total_params = sum(p.numel() for p in model.parameters() if p.requires_grad) + print(f"\n{'='*55}") + print(f" 训练 Config-{config} | 参数量: {total_params:,}") + print(f"{'='*55}") + + history = {"train_loss": [], "train_acc": [], "test_loss": [], "test_acc": []} + + for epoch in range(EPOCHS): + model.train() + total_loss, correct, total = 0.0, 0, 0 + for x_batch, y_batch in train_loader: + x_batch, y_batch = x_batch.to(DEVICE), y_batch.to(DEVICE) + optimizer.zero_grad() + logits = model(x_batch) + loss = criterion(logits, y_batch) + loss.backward() + optimizer.step() + total_loss += loss.item() * y_batch.size(0) + correct += (logits.argmax(1) == y_batch).sum().item() + total += y_batch.size(0) + tr_loss = total_loss / total + tr_acc = correct / total + + model.eval() + t_loss, t_correct, t_total = 0.0, 0, 0 + with torch.no_grad(): + for x_batch, y_batch in test_loader: + x_batch, y_batch = x_batch.to(DEVICE), y_batch.to(DEVICE) + logits = model(x_batch) + t_loss += criterion(logits, y_batch).item() * y_batch.size(0) + t_correct += (logits.argmax(1) == y_batch).sum().item() + t_total += y_batch.size(0) + te_loss = t_loss / t_total + te_acc = t_correct / t_total + + history["train_loss"].append(tr_loss) + history["train_acc"].append(tr_acc) + history["test_loss"].append(te_loss) + history["test_acc"].append(te_acc) + print(f" Epoch {epoch+1:>2}/{EPOCHS} | train_loss={tr_loss:.4f} train_acc={tr_acc:.4f} | test_loss={te_loss:.4f} test_acc={te_acc:.4f}") + if scheduler is not None: + scheduler.step(te_acc) + + return history + + +CONFIGS = ["A", "B", "C", "D", "E"] +LABELS = { + "A": "A: Baseline (784->128->10)", + "B": "B: +Dropout(0.3)", + "C": "C: +BatchNorm", + "D": "D: +BN+Dropout", + "E": "E: Enhanced (deeper+BN+Dropout+LR)", +} +COLORS = ["#e74c3c", "#f39c12", "#2ecc71", "#3498db", "#9b59b6"] +MARKERS = ["o", "s", "^", "D", "P"] + + +def plot_curves(all_history): + epochs = list(range(1, EPOCHS + 1)) + + for metric, title, ylabel, ylim, fname in [ + ("train_acc", "FNN Ablation Study -- Training Accuracy", "Accuracy", (0.85, 1.01), "fnn_ablation_train_acc.png"), + ("test_acc", "FNN Ablation Study -- Test Accuracy", "Accuracy", (0.93, 1.005),"fnn_ablation_test_acc.png"), + ("train_loss","FNN Ablation Study -- Training Loss", "Cross-Entropy Loss", None, "fnn_ablation_loss.png"), + ]: + fig, ax = plt.subplots(figsize=(9, 5)) + for i, cfg in enumerate(CONFIGS): + ax.plot(epochs, all_history[cfg][metric], + label=LABELS[cfg], color=COLORS[i], + marker=MARKERS[i], markevery=4, linewidth=1.8) + ax.set_xlabel("Epoch", fontsize=12) + ax.set_ylabel(ylabel, fontsize=12) + ax.set_title(title, fontsize=14) + loc = "lower right" if "acc" in metric else "upper right" + ax.legend(fontsize=8, loc=loc) + if ylim: + ax.set_ylim(*ylim) + ax.grid(True, alpha=0.3) + fig.tight_layout() + p = os.path.join(SAVE_DIR, fname) + fig.savefig(p, dpi=150) + plt.close(fig) + print(f"[保存] {p}") + + best_accs = [max(all_history[cfg]["test_acc"]) for cfg in CONFIGS] + short_labels = ["A\nBaseline", "B\n+Dropout", "C\n+BN", "D\n+BN\n+Dropout", "E\nEnhanced"] + fig, ax = plt.subplots(figsize=(9, 5)) + bars = ax.bar(short_labels, best_accs, color=COLORS, width=0.5, edgecolor="white") + for bar, acc in zip(bars, best_accs): + ax.text(bar.get_x() + bar.get_width()/2, bar.get_height()+0.0003, + f"{acc:.4f}", ha="center", va="bottom", fontsize=10, fontweight="bold") + ax.set_ylabel("Best Test Accuracy", fontsize=12) + ax.set_title("FNN Ablation Study -- Best Test Accuracy per Config", fontsize=14) + ax.set_ylim(min(best_accs)-0.005, 1.005) + ax.grid(True, axis="y", alpha=0.3) + fig.tight_layout() + p = os.path.join(SAVE_DIR, "fnn_ablation_bar.png") + fig.savefig(p, dpi=150) + plt.close(fig) + print(f"[保存] {p}") + + +def print_summary(all_history): + baseline = max(all_history["A"]["test_acc"]) + descs = { + "A": "Baseline (原版逻辑 + mini-batch)", + "B": "Baseline + Dropout(0.3)", + "C": "Baseline + BatchNorm", + "D": "Baseline + BN + Dropout", + "E": "Enhanced (深层+BN+Dropout+LR调度)", + } + print("\n" + "="*68) + print(f" {'配置':<6} {'描述':<30} {'最佳测试准确率':>12} {'相对提升':>8}") + print("-"*68) + for cfg in CONFIGS: + best = max(all_history[cfg]["test_acc"]) + delta = (best - baseline) * 100 + sign = "+" if delta >= 0 else "" + print(f" {cfg:<6} {descs[cfg]:<30} {best:>12.4f} {sign+f'{delta:.2f}%':>8}") + print("="*68) + + +if __name__ == "__main__": + print("加载 MNIST 数据集 ...") + train_loader, test_loader = load_mnist() + all_history = {} + for cfg in CONFIGS: + all_history[cfg] = train_model(cfg, train_loader, test_loader) + print("\n绘制对比图 ...") + plot_curves(all_history) + print_summary(all_history) + print("\n完成!4 张图片已保存在脚本同目录下。")