-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy path1.py
More file actions
142 lines (122 loc) · 5.35 KB
/
1.py
File metadata and controls
142 lines (122 loc) · 5.35 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
import tkinter as tk
from tkinter import ttk, scrolledtext
import serial
import serial.tools.list_ports
import threading
import time
class FPGA_CPU_Monitor:
    """Tkinter serial console that monitors an FPGA RISC-V soft core.

    The window offers a port/baud selector, a scrolling receive log, a
    one-line send box and a status label.  A background thread reads the
    serial port; every occurrence of ``HEARTBEAT_MARKER`` in the received
    text counts as a heartbeat.  A periodic Tk timer marks the core offline
    when no heartbeat has been seen for ``HEARTBEAT_TIMEOUT_S`` seconds.

    All widget updates happen on the Tk main thread: the reader thread only
    schedules callbacks through ``root.after``.
    """

    # Text that counts as one heartbeat when found in the received stream.
    HEARTBEAT_MARKER = "#ONLINE#"
    # Seconds without a heartbeat before the core is shown as offline.
    HEARTBEAT_TIMEOUT_S = 1.5
    # Interval of the watchdog timer, in milliseconds.
    HEARTBEAT_POLL_MS = 200
    # Serial read timeout in seconds.  A non-zero value lets the reader
    # thread block while the line is idle instead of spinning on read().
    READ_TIMEOUT_S = 0.1

    def __init__(self, root):
        """Build the widgets, list the serial ports and start the watchdog.

        Args:
            root: The ``tk.Tk`` (or toplevel) window that hosts the UI.
        """
        self.root = root
        self.root.title("FPGA RISC-V 软核在线监控上位机")
        self.root.geometry("750x550")

        self.ser = None
        self.is_running = False
        self.online_flag = False
        # Monotonic clock: immune to wall-clock adjustments.
        self.last_heart_time = time.monotonic()

        # Top control bar
        frame_top = ttk.Frame(root, padding=5)
        frame_top.pack(fill=tk.X)

        ttk.Label(frame_top, text="串口:").grid(row=0, column=0)
        self.cb_port = ttk.Combobox(frame_top, width=12)
        self.cb_port.grid(row=0, column=1, padx=5)

        ttk.Label(frame_top, text="波特率:").grid(row=0, column=2)
        self.cb_baud = ttk.Combobox(frame_top, width=12)
        self.cb_baud["values"] = ["9600", "115200"]
        self.cb_baud.set("115200")
        self.cb_baud.grid(row=0, column=3, padx=5)

        self.btn_refresh = ttk.Button(frame_top, text="刷新端口", command=self.refresh_port)
        self.btn_refresh.grid(row=0, column=4, padx=5)
        self.btn_open = ttk.Button(frame_top, text="打开串口", command=self.toggle_serial)
        self.btn_open.grid(row=0, column=5, padx=5)
        self.btn_clear = ttk.Button(frame_top, text="清空接收区", command=self.clear_recv)
        self.btn_clear.grid(row=0, column=6, padx=5)

        # Soft-core online status
        frame_state = ttk.LabelFrame(root, text="软核在线状态", padding=5)
        frame_state.pack(fill=tk.X, padx=5, pady=5)
        self.state_label = ttk.Label(
            frame_state, text="未连接 —— 离线", foreground="red", font=("SimHei", 11)
        )
        self.state_label.pack(anchor=tk.W)

        # Receive log
        frame_recv = ttk.LabelFrame(root, text="串口接收日志", padding=5)
        frame_recv.pack(fill=tk.BOTH, expand=True, padx=5, pady=5)
        self.recv_text = scrolledtext.ScrolledText(frame_recv, wrap=tk.WORD)
        self.recv_text.pack(fill=tk.BOTH, expand=True)

        # Send box
        frame_send = ttk.LabelFrame(root, text="发送算式/指令到软核", padding=5)
        frame_send.pack(fill=tk.X, padx=5, pady=5)
        self.send_entry = ttk.Entry(frame_send, width=50)
        self.send_entry.pack(side=tk.LEFT, fill=tk.X, expand=True, padx=5)
        self.send_entry.bind("<Return>", self.send_data)
        self.btn_send = ttk.Button(frame_send, text="发送", command=self.send_data)
        self.btn_send.pack(side=tk.LEFT)

        self.refresh_port()
        self.check_heartbeat()

    def refresh_port(self):
        """Reload the port combobox and select the first port, if any."""
        plist = [p.device for p in serial.tools.list_ports.comports()]
        self.cb_port["values"] = plist
        if plist:
            self.cb_port.set(plist[0])

    def toggle_serial(self):
        """Open the selected port if none is open, otherwise close it."""
        if self.ser is None or not self.ser.is_open:
            self._open_serial()
        else:
            self._close_serial()

    def _open_serial(self):
        """Open the selected port and start the reader thread."""
        port = self.cb_port.get()
        try:
            baud = int(self.cb_baud.get())
            self.ser = serial.Serial(port, baud, timeout=self.READ_TIMEOUT_S)
        except (ValueError, OSError, serial.SerialException) as e:
            self.append_recv(f"打开失败:{e}\n")
            return

        self.is_running = True
        # Restart the heartbeat window; otherwise the watchdog would compare
        # against a timestamp from before the port was opened and flag the
        # core offline almost immediately.
        self.last_heart_time = time.monotonic()
        self.btn_open.config(text="关闭串口")
        self.state_label.config(text="已连接 —— 等待心跳", foreground="orange")
        threading.Thread(target=self.recv_loop, daemon=True).start()

    def _close_serial(self):
        """Stop the reader thread, close the port and show the offline state."""
        self.is_running = False
        self.ser.close()
        self._show_disconnected()

    def _show_disconnected(self):
        """Put the open button and the status label into the closed state."""
        self.btn_open.config(text="打开串口")
        self.state_label.config(text="已断开 —— 离线", foreground="red")

    def recv_loop(self):
        """Read from the serial port until it is closed (reader thread body).

        Received bytes are decoded as ASCII (undecodable bytes are dropped),
        scanned for the heartbeat marker and forwarded to the receive log.
        Widgets are never touched here; updates are scheduled on the Tk
        thread with ``root.after``.
        """
        # Bind to the port this thread was started for, so a quick
        # close/reopen cannot leave two threads reading the new port.
        ser = self.ser
        marker = self.HEARTBEAT_MARKER
        keep = len(marker) - 1
        tail = ""  # last few characters, for markers split across reads
        try:
            while self.is_running and ser.is_open:
                # Take everything already buffered, or block (up to the
                # read timeout) for a single byte when the line is idle.
                data = ser.read(max(1, ser.in_waiting))
                if not data:
                    continue
                text = data.decode("ascii", errors="ignore")
                if not text:
                    continue
                window = tail + text
                if marker in window:
                    self.last_heart_time = time.monotonic()
                    self.root.after(0, self.set_online)
                    window = window[window.rfind(marker) + len(marker):]
                # Keep only what could still be the start of a marker, so
                # the scan buffer stays bounded without heartbeats.
                tail = window[-keep:] if keep else ""
                self.root.after(0, self.append_recv, text)
        except Exception as exc:
            # Thread boundary: a failing read (device unplugged, port closed
            # mid-read, GUI shut down) must not die with a bare traceback.
            # Report it only when this thread still owns the active port and
            # the user did not close it deliberately.
            if self.is_running and self.ser is ser:
                self.is_running = False
                try:
                    self.root.after(0, self._on_link_lost, ser, exc)
                except (RuntimeError, tk.TclError):
                    # The Tk main loop is gone; nothing is left to update.
                    pass

    def _on_link_lost(self, ser, exc):
        """Handle a reader-thread failure on the Tk thread.

        Args:
            ser: The serial object the failed reader thread was using.
            exc: The exception that ended the reader loop.
        """
        if self.ser is not ser:
            # Another port was opened in the meantime; leave its UI alone.
            return
        try:
            ser.close()
        except (OSError, serial.SerialException):
            # The port is already unusable; closing is best effort.
            pass
        self._show_disconnected()
        self.append_recv(f"串口异常:{exc}\n")

    def check_heartbeat(self):
        """Mark the core offline on heartbeat timeout, then re-arm the timer."""
        if self.is_running:
            if time.monotonic() - self.last_heart_time > self.HEARTBEAT_TIMEOUT_S:
                self.set_offline()
        self.root.after(self.HEARTBEAT_POLL_MS, self.check_heartbeat)

    def set_online(self):
        """Show the online state, unless the port was closed in the meantime."""
        if not self.is_running:
            # A callback queued by the reader thread before the port was
            # closed must not overwrite the disconnected label.
            return
        self.state_label.config(text="软核在线 ✅", foreground="green")

    def set_offline(self):
        """Show the offline state in the status label."""
        self.state_label.config(text="软核离线 ❌", foreground="red")

    def append_recv(self, txt):
        """Append ``txt`` to the receive log and scroll to the end."""
        self.recv_text.insert(tk.END, txt)
        self.recv_text.see(tk.END)

    def send_data(self, event=None):
        """Send the entry text, terminated by CR LF, to the serial port.

        Args:
            event: Tk event when triggered by the <Return> binding; unused.
        """
        if self.ser is None or not self.ser.is_open:
            self.append_recv("请先打开串口\n")
            return
        s = self.send_entry.get().strip()
        if not s:
            return
        try:
            payload = (s + "\r\n").encode("ascii")
        except UnicodeEncodeError:
            # The link is ASCII-only; reject the input instead of raising
            # inside the Tk callback.
            self.append_recv("发送失败:仅支持 ASCII 字符\n")
            return
        try:
            self.ser.write(payload)
        except (OSError, serial.SerialException) as e:
            self.append_recv(f"发送失败:{e}\n")
            return
        self.append_recv(f"【发送】{s}\n")
        self.send_entry.delete(0, tk.END)

    def clear_recv(self):
        """Erase the whole receive log."""
        self.recv_text.delete("1.0", tk.END)
# Script entry point: create the Tk window, attach the monitor UI to it and
# hand control to the Tk event loop until the window is closed.
if __name__ == "__main__":
    main_window = tk.Tk()
    monitor = FPGA_CPU_Monitor(main_window)
    main_window.mainloop()