diff --git a/.gitignore b/.gitignore index 879a9ef..71fbd6a 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,4 @@ __pycache__ config.json -log.txt TouchFishFiles -log.ndjson \ No newline at end of file +log.ndjson diff --git a/LTS.py b/LTS.py index f0fe29d..972b0f9 100644 --- a/LTS.py +++ b/LTS.py @@ -153,22 +153,22 @@ ## 2.2 Receive -`{ type: "CHAT.RECEIVE", from: number, order: number, filename: string, content: string, to: number | -1 | -2 }` +`{ type: "CHAT.RECEIVE", from: number, order: signed number, filename: string, content: string, to: number | -1 | -2 }` 服务端将消息转发给目标客户端。 - `type`: `"CHAT.RECEIVE"` - `from`: 发送者的用户 ID。(下同) -- `order`: 文件编号(用户区分不同的文件发送请求),可能取值包括:(下同) - - `0`:普通文本消息; - - 正整数:文件编号。 +- `order`: 消息编号,可能取值包括:(下同) + - 正整数:普通文本消息; + - 负整数:文件编号。 - `filename`: 同上。 - `content`: 同上。 - `to`: 同上。 ## 2.3 Log -`{ type: "CHAT.LOG", time: time, from: number, order: number, filename: string, content: string, to: number | -1 | -2 }` +`{ type: "CHAT.LOG", time: time, from: number, order: signed number, filename: string, content: string, to: number | -1 | -2 }` 服务端将收到的聊天记录写入日志。 @@ -212,6 +212,7 @@ - `status`: 同上。 - `chat_history`: 历史聊天记录,每条记录包含:(不包含私聊记录和文件发送记录) - `time`: 同上; + - `order`:同上; - `from`: 同上; - `content`: 同上; - `to`: 同上。 @@ -306,7 +307,7 @@ import time # 程序版本 -VERSION = "v4.3.3-rc1" +VERSION = "v4.5.2" # 用于客户端解析协议 1.2 RESULTS = \ @@ -347,7 +348,9 @@ - help 指令显示的帮助消息中的其余段落 - 程序关闭时的「再见!」文本 -(注:洋红色 (magenta) 目前没有使用过) +特别说明: +- shell 指令的输出文本颜色为系统默认颜色 +- 洋红色 (magenta) 目前没有使用过 """ COLORS = \ { @@ -368,7 +371,10 @@ port 服务端端口 username 连接时使用的用户名 """ -DEFAULT_CLIENT_CONFIG = {"side": "Client", "ip": "127.0.0.1", "port": 8080, "username": "user"} +# 需要指出的是,第五部分中会给 username 字段 +# 的默认值后面加上一个随机六位数作为后缀, +# 形成形如 "user123456" 的用户名 +DEFAULT_CLIENT_CONFIG = {"side": "Client", "ip": "touchfish.xin", "port": 7001, "username": "user"} # 默认服务端配置(side 和 general.* 必须在启动时指定): """ @@ -425,7 +431,7 @@ # 客户端配置中的期望数据类型如下,此处没有单独编写代码: """ side 必须为 "Client" -ip 必须为合法 IPv4 +ip 不能为空串 port 必须在 [1, 65535] 中取值 username 不能为空串 """ @@ -458,7 +464,35 @@ """[1:-1] # 指令列表 -COMMAND_LIST = ['admin', 'ban', 'broadcast', 'config', 'dashboard', 'distribute', 'doorman', 'exit', 'help', 'kick', 'save', 'send', 'transfer', 'whisper'] +COMMAND_LIST = ['admin', 'ban', 'broadcast', 'config', 'dashboard', 'distribute', 'doorman', 'evaluate', 'exit', 'flood', 'help', 'kick', 'save', 'send', 'shell', 'transfer', 'whisper'] + +# 缩写表 +ABBREVIATION_TABLE = \ +{ + "D": "dashboard", "F": "distribute", "Q": "evaluate", "E": "exit", "L": "flood", + "H": "help", "S": "send", "J": "shell", "T": "transfer", "P": "whisper", + "I+": "ban ip add", "I-": "ban ip remove", "W+": "ban words add", "W-": "ban words remove", + "B": "broadcast", "C": "config", "G+": "doorman accept", "G-": "doorman reject", "K": "kick", + "A+": "admin add", "A-": "admin remove", "V": "save", + "d": "dashboard", "f": "distribute", "q": "evaluate", "e": "exit", "l": "flood", + "h": "help", "s": "send", "j": "shell", "t": "transfer", "p": "whisper", + "i+": "ban ip add", "i-": "ban ip remove", "w+": "ban words add", "w-": "ban words remove", + "b": "broadcast", "c": "config", "g+": "doorman accept", "g-": "doorman reject", "k": "kick", + "a+": "admin add", "a-": "admin remove", "v": "save" +} + +# flood 指令开启的简易命令行模式的进入提示 +SIMPLE_COMMAND_LINE_HINT_CONTENT = \ +""" +您已经进入简易命令行模式。 +在简易命令行模式中,您只需要执行以下三个步骤,即可完成单行公开消息的发送: + 1. 按下 Enter 进入输入模式 + 2. 直接输入想要发送的单行消息(不需要显式执行 send 指令) + 3. 再按下 Enter 返回输出模式 +本模式下发送结果不会进行显式反馈,而是根据下面的特性间接反馈: +发送成功的消息能够在输出模式中看到(带有响铃),发送失败的消息则不会。 +在任何模式下按下 Ctrl + {} 以退出简易命令行模式。 +"""[1:-1] # help 指令显示的帮助消息(分为 8 段) HELP_HINT_CONTENT = \ @@ -492,38 +526,49 @@ """[1:-1], """ -聊天室内可用的指令分为以下 14 条 22 项: +聊天室内可用的指令分为以下 17 条 25 项: """[1:-1], """ - dashboard 展示聊天室各项数据 - distribute 发送文件 - exit 退出或关闭聊天室 - help 显示本帮助文本 - send 发送多行消息 - send 发送单行消息 - transfer 向某个用户发送私有文件 - whisper 向某个用户发送多行私聊消息 - whisper 向某个用户发送单行私聊消息 - * ban ip add 封禁 IP 或 IP 段 - * ban ip remove 解除封禁 IP 或 IP 段 - * ban words add 屏蔽某个词语 - * ban words remove 解除屏蔽某个词语 - * broadcast 向全体用户广播多行消息 - * broadcast 向全体用户广播单行消息 - * config 修改聊天室配置项 - * doorman accept 通过某个用户的加入申请 - * doorman reject 拒绝某个用户的加入申请 - * kick 踢出某个用户 - ** admin add 添加管理员 - ** admin remove 移除管理员 - ** save 保存聊天室配置项信息 + [D] dashboard 展示聊天室各项数据 + [F] distribute 发送文件 + [Q] evaluate 像 Python IDLE 那样计算输入数据 + [E] exit 退出或关闭聊天室 + [L] flood 开启简易命令行模式 + [H] help 显示本帮助文本 + [S] send 发送多行消息 + [S] send 发送单行消息 + [J] shell 执行 Shell 指令 + [T] transfer 向某个用户发送私有文件 + [P] whisper 向某个用户发送多行私聊消息 + [P] whisper 向某个用户发送单行私聊消息 + [I+] * ban ip add 封禁 IP 或 IP 段 + [I-] * ban ip remove 解除封禁 IP 或 IP 段 + [W+] * ban words add 屏蔽某个词语 + [W-] * ban words remove 解除屏蔽某个词语 + [B] * broadcast 向全体用户广播多行消息 + [B] * broadcast 向全体用户广播单行消息 + [C] * config 修改聊天室配置项 + [G+] * doorman accept 通过某个用户的加入申请 + [G-] * doorman reject 拒绝某个用户的加入申请 + [K] * kick 踢出某个用户 + [A+] ** admin add 添加管理员 + [A-] ** admin remove 移除管理员 + [V] ** save 保存聊天室配置项信息 """, """ +缩略表示形式不区分大小写,其他字段区分大小写。 +支持用左边方括号内的内容缩略表示右边所有没有用尖括号括起来的字段。 +所有 字段可以输入 UID 或用户名均可,优先解析为 UID。 +解析用户名遇到冲突时采纳 UID 最小的合法解析结果。 +简易命令行模式允许您直接输入并发送单行消息而省略 send,但会禁用其他指令。 标注 * 的指令只有状态为 Admin 或 Root 的用户可以使用。 标注 ** 的指令只有状态为 Root 的用户可以使用。 对于 dashboard 指令,状态为 Root 的用户可以看到所有用户的 IP 地址,其他用户不能。 +对于 evaluate 指令,该指令直接使用 eval() 函数实现,其中二进制发行版的 Python 版本为 3.6。 +对于 evaluate 指令,请不要注入恶意代码(典型的有 globals(), locals() 等),否则后果自负。 +对于 shell 指令,请不要试图执行危害本程序(或您的设备)的指令(此处从略),否则后果自负。 对于 ban ip 指令,支持输入形如 a.b.c.d/e 的 IP 段,但前缀长度 (e 值) 不得小于 24。 对于 config 指令, 的格式以 dashboard 指令输出的参数名称为准。 对于 config 指令, 的格式以 dashboard 指令输出的修改示例为准。 @@ -533,7 +578,7 @@ """[1:-1], """ -你可以在 TouchFish 的官方 Github 仓库页面获取更多联机帮助: +您可以在 TouchFish 的官方 Github 仓库页面获取更多联机帮助: https://github.com/2044-space-elevator/TouchFish """[1:-1] ] @@ -591,13 +636,15 @@ 参见 HELP_HINT 第 3 段,下同) buffer my_socket 读取时模拟的缓冲区 (发送的数据都是 NDJSON,因此遇到换行符则清空) -EXIT_FLAG 默认为 False,程序终止改为 True,通知所有线程终止 +exit_flag 默认为 False,程序终止改为 True,通知所有线程终止 print_queue 用于输入模式下记录被阻塞的输出内容(每行一条), 切换到输出模式后一并输出 以下是服务端启用而客户端不启用的变量: file_order 目前服务端已经传送的文件个数, - 用于从 1 开始分配文件 ID,区分文件 + 用于从 -1 开始分配文件 ID (-1, -2, -3, ...),区分文件 +message_order 目前服务端已经传送的消息个数, + 用于从 1 开始分配消息 ID (1, 2, 3, ...),区分文件 server_socket 服务端向客户端暴露用于连接的 TCP socket history 用于记录聊天上下文,在新客户端建立连接时 通过协议 3.2 发送给客户端 @@ -628,6 +675,7 @@ my_username = "user" my_uid = 0 file_order = 0 +message_order = 0 my_socket = None users = [] server_socket = socket.socket() @@ -636,7 +684,7 @@ history = [] online_count = 1 buffer = "" -EXIT_FLAG = False +exit_flag = False log_queue = queue.Queue() receive_queue = queue.Queue() send_queue = queue.Queue() @@ -704,8 +752,9 @@ def prints(text, color_code=None): else: print_queue.put(dye(text, color_code)) -# 不受 blocked 变量控制的强制文本输出(只用于 -# dashboard 指令和 help 命令输出信息) +# 不受 blocked 变量控制的强制文本输出: +# 只用于 dashboard 指令、flood 指令(部分) +# 和 help 指令输出信息 def printf(text, color_code=None): print(dye(text, color_code)) @@ -718,6 +767,20 @@ def printc(verbose, text): if verbose: print(dye(text, "black")) +# 解析用户名 +def parse_username(arg, expected_status): + try: + uid = int(arg.split()[0]) + if uid >= 0 and uid < len(users) and users[uid]['status'] in expected_status: + return arg + raise + except: + for i in range(len(users)): + if users[i]['status'] in expected_status: + if arg.startswith(users[i]['username'] + " ") or arg == users[i]['username']: + return str(i) + arg[len(users[i]['username']):] + return "" + # 检查 element 是不是合法 IP def check_ip(element): pattern = r'^(\d+)\.(\d+)\.(\d+)\.(\d+)$' # int.int.int.int @@ -825,7 +888,7 @@ def print_message(message): def process(message): global users global online_count - global EXIT_FLAG + global exit_flag ring() # 响铃 if message['type'] == "CHAT.RECEIVE": # 收到消息 (协议 2.2) message['time'] = time_str() @@ -859,7 +922,7 @@ def process(message): # 来清除 ANSI 文本序列带来的显示效果, # 防止干扰用户后续的终端使用 prints("\033[0m\033[1;36m再见!\033[0m") - EXIT_FLAG = True + exit_flag = True return if message['type'] == "SERVER.CONFIG.CHANGE": # 服务端参数变更 (协议 3.4.2) announce(message['operator']) @@ -870,14 +933,14 @@ def process(message): deletions = [item for item in config[message['key'].split('.')[0]][message['key'].split('.')[1]] if not item in message['value']] prints("该配置项相比修改前增加了:{}".format(str(additions)), "cyan") prints("该配置项相比修改前移除了:{}".format(str(deletions)), "cyan") - config[message['key'].split('.')[0]][message['key'].split('.')[1]] = message['value'] + config[message['key'].split('.')[0]][message['key'].split('.')[1]] = message['value'] return if message['type'] == "SERVER.STOP.ANNOUNCE": # 服务端关闭 (协议 3.3.1) if side == "Client": # 同上 announce(0) prints("聊天室服务端已经关闭。", "cyan") prints("\033[0m\033[1;36m再见!\033[0m") - EXIT_FLAG = True + exit_flag = True return # 从 my_socket 读取数据,每次 128 KiB,读完为止 @@ -921,7 +984,7 @@ def get_message(): # 对于用户直接调用的指令,参数传递规则如下(某些指令只出现部分参数): """ -arg 指令参数:紧跟命令后的全部文本, +arg 指令参数:紧跟指令后的全部文本, 如输入 "admin add 1" 则传入 "add 1" message 消息:固定为 None(缺省值) verbose 是否为直接调用的指令:固定为 True(缺省值) @@ -947,7 +1010,7 @@ def get_message(): # 而不是在客户端判定指令执行成功并向服务端发送请求时就修改; # 因此服务端广播任何消息时都不应该将请求发送者排除在广播对象之外 -# 对于完全不需要参数的命令 (dashboard, exit, help), +# 对于完全不需要参数的指令 (dashboard, exit, help), # 服务端不会重新调用函数(因为根本没有请求), # 参数中只有一个 arg (缺省为 None,函数中不会调用), # 用于在第四部分的 thread_input 线程中统一调用接口 @@ -966,26 +1029,20 @@ def do_doorman(arg, verbose=True, by=-1): if len(arg) != 2: printc(verbose, "参数错误:应当给出恰好 2 个参数。") return + arg[1] = parse_username(arg[1], ["Pending"]) try: arg[1] = int(arg[1]) except: - printc(verbose, "参数错误:UID 必须是整数。") + printc(verbose, "参数错误:用户解析失败,只能对状态为 Pending 的用户操作。") return if not arg[0] in ['accept', 'reject']: printc(verbose, "参数错误:第一个参数必须是 accept 和 reject 中的某一项。") return - if arg[1] <= -1 or arg[1] >= len(users): - printc(verbose, "UID 输入错误。") - return - if users[arg[1]]['status'] != "Pending": - printc(verbose, "只能对状态为 Pending 的用户操作。") - if users[arg[1]]['status'] in ["Online", "Admin", "Root"] and arg[0] == "reject": - printc(verbose, "您似乎想要踢出该用户,请使用以下指令:kick {}".format(arg)) - return if arg[0] == "accept": if side == "Server": - send_queue.put(json.dumps({'to': arg[1], 'content': {'type': 'GATE.REVIEW_RESULT', 'accepted': True, 'operator': {'username': users[by]['username'], 'uid': by}}})) # 协议 1.3 + # 重要:最后再给该用户发送信息,防止出现 + # 该用户已经断开连接而状态没有更新的情况 log_queue.put(json.dumps({'type': 'GATE.STATUS_CHANGE.LOG', 'time': time_str(), 'status': 'Online', 'uid': arg[1], 'operator': by})) # 协议 1.6.3 for i in range(len(users)): if users[i]['status'] in ["Online", "Admin", "Root"]: @@ -995,6 +1052,7 @@ def do_doorman(arg, verbose=True, by=-1): users_abstract = [] for i in range(len(users)): users_abstract.append({"username": users[i]['username'], "status": users[i]['status']}) + send_queue.put(json.dumps({'to': arg[1], 'content': {'type': 'GATE.REVIEW_RESULT', 'accepted': True, 'operator': {'username': users[by]['username'], 'uid': by}}})) # 协议 1.3 send_queue.put(json.dumps({'to': arg[1], 'content': {'type': 'SERVER.DATA', 'server_version': VERSION, 'uid': arg[1], 'config': config, 'users': users_abstract, 'chat_history': history}})) # 协议 3.2 if side == "Client": my_socket.send(bytes(json.dumps({'type': 'GATE.STATUS_CHANGE.REQUEST', 'status': 'Online', 'uid': arg[1]}) + "\n", encoding="utf-8")) # 协议 1.6.1 @@ -1028,21 +1086,14 @@ def do_kick(arg, verbose=True, by=-1): if not arg: printc(verbose, "参数错误:应当给出恰好 1 个参数。") return + arg = parse_username(arg, ["Online", "Admin"]) try: arg = int(arg) except: - printc(verbose, "参数错误:UID 必须是整数。") - return - if arg <= -1 or arg >= len(users): - printc(verbose, "UID 输入错误。") - return - if not users[arg]['status'] in ["Online", "Admin"]: - printc(verbose, "只能对状态为 Online 或 Admin 的用户操作。") - if users[arg]['status'] == "Pending": - printc(verbose, "您似乎想要拒绝该用户的加入申请,请使用以下指令:doorman reject {}".format(arg)) + printc(verbose, "参数错误:用户解析失败,只能对状态为 Online 或 Admin 的用户操作。") return if users[by]['status'] == "Admin" and users[arg]['status'] == "Admin": - printc(verbose, "状态为 Admin 的用户只能对状态为 Online 的用户操作。") + printc(verbose, "参数错误:用户解析失败,状态为 Admin 的用户只能对状态为 Online 的用户操作。") return if side == "Server": @@ -1079,18 +1130,19 @@ def do_admin(arg, verbose=True, by=-1): if not arg[0] in ['add', 'remove']: printc(verbose, "参数错误:第一个参数必须是 add 或 remove。") return + arg[1] = parse_username(arg[1], ["Online", "Admin"]) try: arg[1] = int(arg[1]) except: - printc(verbose, "参数错误:UID 必须是整数。") - return - if arg[1] <= 0 or arg[1] >= len(users): - printc(verbose, "UID 输入错误。") + if arg[0] == 'add': + printc(verbose, "参数错误:用户解析失败,只能对状态为 Online 的用户操作。") + if arg[0] == 'remove': + printc(verbose, "参数错误:用户解析失败,只能对状态为 Admin 的用户操作。") return if arg[0] == 'add': if users[arg[1]]['status'] != "Online": - printc(verbose, "只能对状态为 Online 的用户操作。") + printc(verbose, "参数错误:用户解析失败,只能对状态为 Online 的用户操作。") return users[arg[1]]['status'] = "Admin" log_queue.put(json.dumps({'type': 'GATE.STATUS_CHANGE.LOG', 'time': time_str(), 'status': 'Admin', 'uid': arg[1], 'operator': by})) # 协议 1.6.3 @@ -1100,7 +1152,7 @@ def do_admin(arg, verbose=True, by=-1): if arg[0] == 'remove': if users[arg[1]]['status'] != "Admin": - printc(verbose, "只能对状态为 Admin 的用户操作。") + printc(verbose, "参数错误:用户解析失败,只能对状态为 Admin 的用户操作。") return users[arg[1]]['status'] = "Online" log_queue.put(json.dumps({'type': 'GATE.STATUS_CHANGE.LOG', 'time': time_str(), 'status': 'Online', 'uid': arg[1], 'operator': by})) # 协议 1.6.3 @@ -1137,7 +1189,7 @@ def do_config(arg, verbose=True, by=-1): printc(verbose, r' config gate.enter_hint "Hi there!\n"') if not input("\033[0m\033[1;30m确定要继续吗?[y/N] ") in ['y', 'Y']: return - print("\033[8;30m", end="") + print("\033[8;30m", end="", flush=True) if arg[0] == "ban.ip" or arg[0] == "ban.words": printc(verbose, "请注意,本参数修改时 需要带引号并转义。") printc(verbose, "例如,将 fuck 和 shit 设置为屏蔽词:") @@ -1145,7 +1197,7 @@ def do_config(arg, verbose=True, by=-1): printc(verbose, "该操作将【清空】原有的屏蔽词列表(或 IP 黑名单),请谨慎操作!") if not input("\033[0m\033[1;30m确定要继续吗?[y/N] ") in ['y', 'Y']: return - print("\033[8;30m", end="") + print("\033[8;30m", end="", flush=True) try: if not eval("isinstance({}, {})".format(arg[1], CONFIG_TYPE_CHECK_TABLE[arg[0]])): @@ -1263,7 +1315,7 @@ def do_ban(arg, verbose=True, by=-1): printc(verbose, "^", arg[2], "$", sep="") if not input("\033[0m\033[1;30m确定要继续吗?[y/N] ") in ['y', 'Y']: return - print("\033[8;30m", end="") + print("\033[8;30m", end="", flush=True) if arg[1] == 'add': if arg[2] in config['ban']['words']: @@ -1276,7 +1328,7 @@ def do_ban(arg, verbose=True, by=-1): if users[i]['status'] in ["Online", "Admin", "Root"]: send_queue.put(json.dumps({'to': i, 'content': {'type': 'SERVER.CONFIG.CHANGE', 'key': 'ban.words', 'value': config['ban']['words'], 'operator': by}})) # 协议 3.4.2 if side == "Client": - new_value = config['ban']['words'] + new_value = config['ban']['words'][:] new_value.append(arg[2]) my_socket.send(bytes(json.dumps({'type': 'SERVER.CONFIG.POST', 'key': 'ban.words', 'value': new_value}) + "\n", encoding="utf-8")) # 协议 3.4.1 printc(verbose, "操作成功。") @@ -1292,7 +1344,7 @@ def do_ban(arg, verbose=True, by=-1): if users[i]['status'] in ["Online", "Admin", "Root"]: send_queue.put(json.dumps({'to': i, 'content': {'type': 'SERVER.CONFIG.CHANGE', 'key': 'ban.words', 'value': config['ban']['words'], 'operator': by}})) # 协议 3.4.2 if side == "Client": - new_value = config['ban']['words'] + new_value = config['ban']['words'][:] new_value.remove(arg[2]) my_socket.send(bytes(json.dumps({'type': 'SERVER.CONFIG.POST', 'key': 'ban.words', 'value': new_value}) + "\n", encoding="utf-8")) # 协议 3.4.1 printc(verbose, "操作成功。") @@ -1302,6 +1354,7 @@ def do_broadcast(arg, message=None, verbose=True, by=-1): global log_queue global send_queue global my_socket + global message_order if by == -1: by = my_uid if not users[by]['status'] in ["Admin", "Root"]: @@ -1316,11 +1369,12 @@ def do_broadcast(arg, message=None, verbose=True, by=-1): message = enter() if side == "Server": - log_queue.put(json.dumps({'type': 'CHAT.LOG', 'time': time_str(), 'from': by, 'order': 0, 'filename': "", 'content': message, 'to': -2})) # 协议 2.3 - history.append({'time': time_str(), 'from': by, 'content': message, 'to': -2}) # 公开消息,记入 history 列表 + message_order += 1 # 给该消息分配一个新的编号,从 1 开始递增(下同) + log_queue.put(json.dumps({'type': 'CHAT.LOG', 'time': time_str(), 'from': by, 'order': message_order, 'filename': "", 'content': message, 'to': -2})) # 协议 2.3 + history.append({'time': time_str(), 'from': by, 'content': message, 'to': -2, 'order': message_order}) # 公开消息,记入 history 列表 for i in range(len(users)): if users[i]['status'] in ["Online", "Admin", "Root"]: - send_queue.put(json.dumps({'to': i, 'content': {'type': 'CHAT.RECEIVE', 'from': by, 'order': 0, 'filename': "", 'content': message, 'to': -2}})) # 协议 2.2 + send_queue.put(json.dumps({'to': i, 'content': {'type': 'CHAT.RECEIVE', 'from': by, 'order': message_order, 'filename': "", 'content': message, 'to': -2}})) # 协议 2.2 if side == "Client": my_socket.send(bytes(json.dumps({'type': 'CHAT.SEND', 'filename': "", 'content': message, 'to': -2}) + "\n", encoding="utf-8")) # 协议 2.1 @@ -1331,6 +1385,7 @@ def do_send(arg, message=None, verbose=True, by=-1): global log_queue global send_queue global my_socket + global message_order if by == -1: by = my_uid if message == None: # 同上,识别调用方法 @@ -1350,11 +1405,12 @@ def do_send(arg, message=None, verbose=True, by=-1): return if side == "Server": - log_queue.put(json.dumps({'type': 'CHAT.LOG', 'time': time_str(), 'from': by, 'order': 0, 'filename': "", 'content': message, 'to': -1})) # 协议 2.3 - history.append({'time': time_str(), 'from': by, 'content': message, 'to': -1}) # 公开消息,记入 history 列表 + message_order += 1 # 同上,给该消息分配一个新的编号 + log_queue.put(json.dumps({'type': 'CHAT.LOG', 'time': time_str(), 'from': by, 'order': message_order, 'filename': "", 'content': message, 'to': -1})) # 协议 2.3 + history.append({'time': time_str(), 'from': by, 'content': message, 'to': -1, 'order': message_order}) # 公开消息,记入 history 列表 for i in range(len(users)): if users[i]['status'] in ["Online", "Admin", "Root"]: - send_queue.put(json.dumps({'to': i, 'content': {'type': 'CHAT.RECEIVE', 'from': by, 'order': 0, 'filename': "", 'content': message, 'to': -1}})) # 协议 2.2 + send_queue.put(json.dumps({'to': i, 'content': {'type': 'CHAT.RECEIVE', 'from': by, 'order': message_order, 'filename': "", 'content': message, 'to': -1}})) # 协议 2.2 if side == "Client": my_socket.send(bytes(json.dumps({'type': 'CHAT.SEND', 'filename': "", 'content': message, 'to': -1}) + "\n", encoding="utf-8")) # 协议 2.1 @@ -1364,11 +1420,13 @@ def do_whisper(arg, message=None, verbose=True, by=-1): global log_queue global send_queue global my_socket + global message_order if by == -1: by = my_uid if not config['message']['allow_private']: printc(verbose, "此聊天室目前不允许发送私聊消息。") return + arg = parse_username(arg, ["Online", "Admin", "Root"]) # 分离接收方 UID 和(可能不存在的)单行消息 try: arg, message = arg.split(' ', 1) @@ -1376,13 +1434,8 @@ def do_whisper(arg, message=None, verbose=True, by=-1): pass try: arg = int(arg) - if arg <= -1 or arg >= len(users): - raise except: - printc(verbose, "UID 输入错误。") - return - if not users[arg]['status'] in ["Online", "Admin", "Root"]: - printc(verbose, "只能向状态处于 Online、Admin、Root 中的某一项的用户发送私聊消息。") + printc(verbose, "参数错误:用户解析失败,只能对状态处于 Online、Admin、Root 中的某一项的用户操作。") return if arg == by: printc(verbose, "不能向自己发送私聊消息。") @@ -1403,12 +1456,13 @@ def do_whisper(arg, message=None, verbose=True, by=-1): if side == "Server": # 非公开消息,不记入 history 列表, # 于是这里没有了 history.append 语句 - log_queue.put(json.dumps({'type': 'CHAT.LOG', 'time': time_str(), 'from': by, 'order': 0, 'filename': "", 'content': message, 'to': arg})) # 协议 2.3 + message_order += 1 # 同上,给该消息分配一个新的编号 + log_queue.put(json.dumps({'type': 'CHAT.LOG', 'time': time_str(), 'from': by, 'order': message_order, 'filename': "", 'content': message, 'to': arg})) # 协议 2.3 for i in range(len(users)): # 私聊消息只对收发方,状态为 Admin 的用户和 # 状态为 Root 的用户可见 if users[i]['status'] in ["Admin", "Root"] or i == by or i == arg: - send_queue.put(json.dumps({'to': i, 'content': {'type': 'CHAT.RECEIVE', 'from': by, 'order': 0, 'filename': "", 'content': message, 'to': arg}})) # 协议 2.2 + send_queue.put(json.dumps({'to': i, 'content': {'type': 'CHAT.RECEIVE', 'from': by, 'order': message_order, 'filename': "", 'content': message, 'to': arg}})) # 协议 2.2 if side == "Client": my_socket.send(bytes(json.dumps({'type': 'CHAT.SEND', 'filename': "", 'content': message, 'to': arg}) + "\n", encoding="utf-8")) # 协议 2.1 @@ -1448,7 +1502,7 @@ def do_distribute(arg, message=None, verbose=True, by=-1): return if side == "Server": - file_order += 1 # 给该文件分配一个新的编号,从 1 开始(下同) + file_order -= 1 # 给该文件分配一个新的编号,从 -1 开始递减(下同) # 服务端在此处接收文件(下同); # 先写入到磁盘(相当于写入日志,尽快释放内存,且减小意外断电的情况下的损失), # 然后在第四部分的 thread_send 线程中重新读取并发送 @@ -1501,37 +1555,37 @@ def do_transfer(arg, message=None, verbose=True, by=-1): global file_order if by == -1: by = my_uid - arg = arg.split(' ', 1) - if len(arg) != 2: - printc(verbose, "参数错误:应当给出恰好 2 个参数。") - return if not config['file']['allow_any']: printc(verbose, "此聊天室目前不允许发送文件。") return if not config['file']['allow_private']: printc(verbose, "此聊天室目前不允许发送私有文件。") return + arg = parse_username(arg, ["Online", "Admin", "Root"]) + # 分离接收方 UID 和(可能不存在的)文件名 try: - arg[0] = int(arg[0]) - if arg[0] <= -1 or arg[0] >= len(users): - raise + arg, filename = arg.split(' ', 1) except: - printc(verbose, "UID 输入错误。") + pass + try: + arg = int(arg) + except: + printc(verbose, "参数错误:用户解析失败,只能对状态处于 Online、Admin、Root 中的某一项的用户操作。") return - if not users[arg[0]]['status'] in ["Online", "Admin", "Root"]: + if not users[arg]['status'] in ["Online", "Admin", "Root"]: printc(verbose, "只能向状态处于 Online、Admin、Root 中的某一项的用户发送私有文件。") return - if arg[0] == by: + if arg == by: printc(verbose, "不能向自己发送私有文件。") return for word in config['ban']['words']: - if word in arg[1]: + if word in filename: printc(verbose, "发送失败:文件名中包含屏蔽词:" + word) return if not message: # 同上,跳过重复的加密操作 try: # 同上,读取文件并转换 - with open(arg[1], 'rb') as f: + with open(filename, 'rb') as f: file_data = f.read() message = base64.b64encode(file_data).decode('utf-8') except: @@ -1542,7 +1596,7 @@ def do_transfer(arg, message=None, verbose=True, by=-1): return if side == "Server": - file_order += 1 # 同上,给该文件分配一个新的编号 + file_order -= 1 # 同上,给该文件分配一个新的编号 # 同上,服务端在此处接收文件 try: # 同上,不同系统的目录格式不同 @@ -1559,16 +1613,16 @@ def do_transfer(arg, message=None, verbose=True, by=-1): tmp_filename = "TouchFishFiles\\{}.file".format(file_order) else: tmp_filename = "TouchFishFiles/{}.file".format(file_order) - log_queue.put(json.dumps({'type': 'CHAT.LOG', 'time': time_str(), 'from': by, 'order': file_order, 'filename': arg[1], 'content': "", 'to': arg[0]})) # 协议 2.3 + log_queue.put(json.dumps({'type': 'CHAT.LOG', 'time': time_str(), 'from': by, 'order': file_order, 'filename': filename, 'content': "", 'to': arg})) # 协议 2.3 for i in range(len(users)): # 同上,先以保存文件时使用的文件名填充 content 字段; # 私有文件只对收发方,状态为 Admin 的用户和 # 状态为 Root 的用户可见 - if users[i]['status'] in ["Admin", "Root"] or i == by or i == arg[0]: - send_queue.put(json.dumps({'to': i, 'content': {'type': 'CHAT.RECEIVE', 'from': by, 'order': file_order, 'filename': arg[1], 'content': tmp_filename, 'to': arg[0]}})) # 协议 2.2 + if users[i]['status'] in ["Admin", "Root"] or i == by or i == arg: + send_queue.put(json.dumps({'to': i, 'content': {'type': 'CHAT.RECEIVE', 'from': by, 'order': file_order, 'filename': filename, 'content': tmp_filename, 'to': arg}})) # 协议 2.2 if side == "Client": # 协议 2.1 - token = json.dumps({'type': 'CHAT.SEND', 'filename': arg[1], 'content': message, 'to': arg[0]}) + "\n" + token = json.dumps({'type': 'CHAT.SEND', 'filename': filename, 'content': message, 'to': arg}) + "\n" chunks = [token[i:i+32768] for i in range(0, len(token), 32768)] # 同上,分段发送数据 for chunk in chunks: @@ -1612,7 +1666,6 @@ def do_dashboard(arg=None): def do_save(arg=None): global log_queue if users[my_uid]['status'] != "Root": - print(users[my_uid]['status']) print("只有处于 Root 状态的用户有权执行该操作。") return try: @@ -1623,27 +1676,87 @@ def do_save(arg=None): except: print("无法将参数保存到配置文件 config.json,请稍后重试。") +def do_evaluate(arg=None): + try: + print(eval(arg)) + except Exception as e: + print("计算时遇到错误:" + str(e)) + def do_exit(arg=None): global log_queue global send_queue - global EXIT_FLAG - print("\033[0m\033[1;36m再见!\033[0m") # 此处不能调用 dye 函数,原因参见第二部分 process 函数中的注释 + global exit_flag + # 此处不能调用 dye 函数,因为需要使用 \033[0m + # 来清除 ANSI 文本序列带来的显示效果, + # 防止干扰用户后续的终端使用 + print("\033[0m\033[1;36m再见!\033[0m") if side == "Server": log_queue.put(json.dumps({'type': 'SERVER.STOP.LOG', 'time': time_str()})) # 协议 3.2.2 for i in range(len(users)): if users[i]['status'] in ["Pending", "Online", "Admin", "Root"]: send_queue.put(json.dumps({'to': i, 'content': {'type': 'SERVER.STOP.ANNOUNCE'}})) # 协议 3.2.1 server_socket.close() - EXIT_FLAG = True + exit_flag = True my_socket.close() return +def do_flood(arg=None): + global blocked + global exit_flag + if platform.system() == "Windows": + shortcut = 'C' + else: + shortcut = 'D' + print(SIMPLE_COMMAND_LINE_HINT_CONTENT.format(shortcut)) + print("\033[8;30m", end="", flush=True) + while True: + time.sleep(0.1) + if exit_flag: + print("\033[0m", end="", flush=True) + return + + # 输出模式 + try: + input() + except EOFError: + printf("您已经退出简易命令行模式。", "black") + return + except: + pass + + # 变更为输入模式 + blocked = True + try: + message = input("\033[0m\033[1;30m> ") + except EOFError: + print() + printf("您已经退出简易命令行模式。", "black") + return + except: + pass + if not message: + print("\033[8;30m", end="", flush=True) + blocked = False + continue + + # 发送消息 + do_send(message, None, False, -1) + print("\033[8;30m", end="", flush=True) + + # 变更为输出模式 + blocked = False + def do_help(arg=None): print() for hint in HELP_HINT: printf(hint['content'], hint['color']) print() +def do_shell(arg=None): + print("\033[0m", end="", flush=True) # 执行前清除现有文本效果 + os.system(arg) + print("\033[8;30m", end="", flush=True) # 执行后恢复现有文本效果 + @@ -1669,7 +1782,7 @@ def do_help(arg=None): # 所有线程均使用 while True 的无限循环, # 每轮开始前暂停 0.1 秒防止 CPU 占用过高, -# 且均受程序终止信号 EXIT_FLAG 的调控。 +# 且均受程序终止信号 exit_flag 的调控。 def thread_gate(): global online_count @@ -1678,9 +1791,8 @@ def thread_gate(): global users while True: time.sleep(0.1) - if EXIT_FLAG: + if exit_flag: return - break # 尝试开启新连接 conntmp, addresstmp = None, None @@ -1730,13 +1842,21 @@ def thread_gate(): if online_count == config['general']['max_connections']: result = "Room is full" for user in users[:-1]: - if user['status'] in ["Online", "Admin", "Root"] and users[uid]['username'] == user['username']: + if user['status'] in ["Online", "Admin", "Root", "Pending"] and users[uid]['username'] == user['username']: result = "Duplicate usernames" for word in config['ban']['words']: if word in users[uid]['username']: result = "Username consists of banned words" - users[uid]['body'].send(bytes(json.dumps({'type': 'GATE.RESPONSE', 'result': result}) + "\n", encoding="utf-8")) # 协议 1.2 + while True: + try: + users[uid]['body'].send(bytes(json.dumps({'type': 'GATE.RESPONSE', 'result': result}) + "\n", encoding="utf-8")) # 协议 1.2 + break + except BlockingIOError: + continue + except: + break + log_queue.put(json.dumps({'type': 'GATE.CLIENT_REQUEST.LOG', 'time': time_str(), 'ip': users[uid]['ip'], 'username': users[uid]['username'], 'uid': uid, 'result': result})) # 协议 1.5.2 for i in range(len(users)): if users[i]['status'] in ["Online", "Admin", "Root"]: @@ -1761,7 +1881,7 @@ def thread_gate(): users_abstract = [] for i in range(len(users)): users_abstract.append({"username": users[i]['username'], "status": users[i]['status']}) - users[uid]['body'].send(bytes(json.dumps({'type': 'SERVER.DATA', 'server_version': VERSION, 'uid': uid, 'config': config, 'users': users_abstract, 'chat_history': history}) + "\n", encoding="utf-8")) # 协议 3.2 + send_queue.put(json.dumps({'to': uid, 'content': {'type': 'SERVER.DATA', 'server_version': VERSION, 'uid': uid, 'config': config, 'users': users_abstract, 'chat_history': history}})) # 协议 3.2 def thread_process(): global online_count @@ -1771,9 +1891,9 @@ def thread_process(): global users while True: time.sleep(0.1) - if EXIT_FLAG: + if exit_flag: return - break + while not receive_queue.empty(): message = json.loads(receive_queue.get()) sender, content = message['from'], message['content'] @@ -1805,9 +1925,9 @@ def thread_receive(): global users while True: time.sleep(0.1) - if EXIT_FLAG: + if exit_flag: return - break + for i in range(len(users)): if users[i]['status'] in ["Online", "Admin", "Root"]: data = "" @@ -1842,12 +1962,12 @@ def thread_send(): global users while True: time.sleep(0.1) - if EXIT_FLAG: + if exit_flag: return - break + while not send_queue.empty(): message = json.loads(send_queue.get()) - if not users[message['to']]['status'] in ["Online", "Admin", "Root"]: + if not users[message['to']]['status'] in ["Online", "Admin", "Root", "Pending"]: continue # 先发送心跳数据(单个换行符)检查客户端是否下线 try: @@ -1902,9 +2022,8 @@ def thread_log(): file.write(log_queue.get() + "\n") # 与其他线程不同,先写入日志再读取程序终止信号, # 确保程序终止时没有日志残留在 log_queue 中 - if EXIT_FLAG: + if exit_flag: return - break def thread_check(): global online_count @@ -1913,9 +2032,9 @@ def thread_check(): global users while True: time.sleep(1) # 该部分对整体性能影响较大,因此执行频率下调至 1 秒一次 - if EXIT_FLAG: + if exit_flag: return - break + down = [] # 先完成全部下线用户检测工作再一并广播, # 避免将状态变更通知(不必要地)发送给 @@ -1937,45 +2056,59 @@ def thread_check(): def thread_input(): global blocked - global EXIT_FLAG + global exit_flag while True: time.sleep(0.1) - if EXIT_FLAG: + if exit_flag: + print("\033[0m", end="", flush=True) return - break + # 输出模式 try: input() except: pass + # 变更为输入模式 blocked = True - command = input("\033[0m\033[1;30m> ") + try: + command = input("\033[0m\033[1;30m> ") + except: + pass if not command: - print("\033[8;30m", end="") + print("\033[8;30m", end="", flush=True) blocked = False continue + + # 将缩写形式替换为完整形式 + for i in list(ABBREVIATION_TABLE.keys()): + if command.startswith(i + " ") or command == i: + command = ABBREVIATION_TABLE[i] + command[len(i):] + break command = command.split(' ', 1) if len(command) == 1: command = [command[0], ""] if not command[0] in COMMAND_LIST: - print("指令输入错误。\n\033[8;30m", end="") + print("指令输入错误。\n\033[8;30m", end="", flush=True) blocked = False continue + # 将对应指令函数加载到 now,然后执行 now 函数 now = eval("do_{}".format(command[0])) now(command[1]) - print("\033[8;30m", end="") + print("\033[8;30m", end="", flush=True) + # 变更为输出模式 blocked = False def thread_output(): - global EXIT_FLAG + global exit_flag while True: time.sleep(0.1) - if EXIT_FLAG: + if exit_flag: + print("\033[0m", end="", flush=True) return - break + read() message = get_message() flush() @@ -1999,6 +2132,7 @@ def main(): global my_username global my_uid global file_order + global message_order global my_socket global users global server_socket @@ -2007,17 +2141,15 @@ def main(): global history global online_count global buffer - global EXIT_FLAG + global exit_flag global log_queue global receive_queue global send_queue global print_queue - - can_read_config = True # 尝试读取配置文件 (config.json), # 检查规则详见第一部分的相关注释; - # 检查不通过则加载默认服务端配置 + # 检查不通过则加载默认客户端配置 try: with open("config.json", "r", encoding="utf-8") as f: tmp_config = json.load(f) @@ -2069,22 +2201,36 @@ def main(): raise if not tmp_config['username']: raise - if not check_ip(tmp_config['ip']): + if not tmp_config['ip']: raise config = tmp_config - prints("配置文件 config.json 读取成功!", "yellow") + config_read_result = "OK" except FileNotFoundError: - prints("未找到配置文件 config.json。如果该文件存在,请尝试以管理员权限重新运行。", "yellow") - prints("下面将使用默认服务端配置启动程序。", "yellow") - config = DEFAULT_SERVER_CONFIG + config = DEFAULT_CLIENT_CONFIG + config_read_result = "Not found" except: - prints("配置文件 config.json 中的配置项存在错误。", "yellow") - prints("下面将使用默认服务端配置启动程序。", "yellow") - config = DEFAULT_SERVER_CONFIG - can_read_config = False + config = DEFAULT_CLIENT_CONFIG + config_read_result = "Broken" os.system('') # 对 Windows 尝试开启 ANSI 转义字符(带颜色文本)支持 clear_screen() + + prints("祝大家 2026 年新年快乐!", "magenta") + prints("我们准备了一些新年彩蛋,详情请见:", "magenta") + prints("https://github.com/ILoveScratch2/TouchFish-Astra/releases/tag/v2.1.0", "magenta") + prints("这段文本只在此版本 (v4.5.2) 中出现。", "magenta") + print() + + if config_read_result == "OK": + prints("配置文件 config.json 读取成功!", "yellow") + if config_read_result == "Not found": + prints("未找到配置文件 config.json。如果该文件存在,请尝试以管理员权限重新运行。", "yellow") + prints("下面将使用默认客户端配置启动程序。", "yellow") + if config_read_result == "Broken": + prints("配置文件 config.json 中的配置项存在错误。", "yellow") + prints("下面将使用默认客户端配置启动程序。", "yellow") + print() + if platform.system() == "Windows": shortcut = 'C' else: @@ -2093,338 +2239,388 @@ def main(): prints("当前程序版本:{}".format(VERSION), "yellow") prints("按下 Ctrl + {} 以按照配置文件中的配置自动启动。".format(shortcut), "yellow") prints("按下 Enter 以指定启动配置。", "yellow") - auto_start = False - try: - input() - except BaseException as e: - auto_start = True - except: - pass - tmp_side = None - if not auto_start: - tmp_side = input("\033[0m\033[1;37m启动类型 (Server = 服务端, Client = 客户端) [{}]:".format(config['side'])) - if not tmp_side: - tmp_side = config['side'] - if not tmp_side in ["Server", "Client"]: - prints("参数错误。", "red") - input("\033[0m") - sys.exit(1) - if tmp_side == "Server": - # 当程序以服务端启动时, - # 若 config.json 中加载到的 side 参数为 "Client", - # 则覆写为默认服务端配置 - if config['side'] == "Client": - config = DEFAULT_SERVER_CONFIG - tmp_ip = None - if not auto_start: - tmp_ip = input("\033[0m\033[1;37m服务端 IP [{}]:".format(config['general']['server_ip'])) - if not tmp_ip: - tmp_ip = config['general']['server_ip'] - config['general']['server_ip'] = tmp_ip - if not check_ip(tmp_ip): - prints("参数错误:输入的服务端 IP 不是有效的点分十进制格式 IPv4 地址。", "red") - input("\033[0m") - sys.exit(1) - tmp_port = None - if not auto_start: - tmp_port = input("\033[0m\033[1;37m端口 [{}]:".format(config['general']['server_port'])) - if not tmp_port: - tmp_port = config['general']['server_port'] + try: + auto_start = False try: - tmp_port = int(tmp_port) - if tmp_port < 1 or tmp_port > 65535: - raise + input() + except BaseException as e: + auto_start = True except: - prints("参数错误:端口号应为不大于 65535 的正整数。", "red") - input("\033[0m") - sys.exit(1) - config['general']['server_port'] = tmp_port - tmp_server_username = None - if not auto_start: - tmp_server_username = input("\033[0m\033[1;37m服务端管理员的用户名 [{}]:".format(config['general']['server_username'])) - if not tmp_server_username: - tmp_server_username = config['general']['server_username'] - config['general']['server_username'] = tmp_server_username - my_username = config['general']['server_username'] - tmp_max_connections = None + pass + tmp_side = None if not auto_start: - tmp_max_connections = input("\033[0m\033[1;37m最大在线连接数 [{}]:".format(config['general']['max_connections'])) - if not tmp_max_connections: - tmp_max_connections = config['general']['max_connections'] - try: - tmp_max_connections = int(tmp_max_connections) - if tmp_max_connections < 1 or tmp_max_connections > 128: - raise - except: - prints("参数错误:最大在线连接数应为不大于 128 的正整数。", "red") - input("\033[0m") - sys.exit(1) - config['general']['max_connections'] = tmp_max_connections - - # 创建保存文件时使用的目录(下同) - if platform.system() == "Windows": - os.system('mkdir TouchFishFiles 1>nul 2>&1') - else: - os.system('mkdir TouchFishFiles 1>/dev/null 2>&1') - try: - with open("config.json", "w", encoding="utf-8") as f: - json.dump(config, f) - prints("本次连接中输入的参数已经保存到配置文件 config.json,下次连接时将自动加载。", "yellow") - except: - prints("启动时遇到错误:配置文件 config.json 写入失败。", "red") - input("\033[0m") - sys.exit(1) - try: - with open("log.ndjson", "a", encoding="utf-8") as f: - pass - except: - prints("启动时遇到错误:无法向日志文件 log.ndjson 写入内容。", "red") + tmp_side = input("\033[0m\033[1;37m启动类型 (Server = 服务端, Client = 客户端) [{}]:".format(config['side'])) + if not tmp_side: + tmp_side = config['side'] + if not tmp_side in ["Server", "Client"]: + prints("参数错误。", "red") input("\033[0m") sys.exit(1) - - try: - # 启动服务端 socket - server_socket = socket.socket() - server_socket.bind((config['general']['server_ip'], config['general']['server_port'])) - server_socket.listen(config['general']['max_connections']) - server_socket.setblocking(False) - users = [{"body": None, "buffer": "", "ip": None, "username": config['general']['server_username'], "status": "Root", "busy": False}] # 初始化用户列表 - root_socket = socket.socket() # 为服务端创建一个连接用于接收信息(不用于发送请求) - root_socket.connect((config['general']['server_ip'], config['general']['server_port'])) # 连接到服务端 socket - # 同上,调整为非阻塞模式,缓冲区大小设置为 1 MiB,改善性能 - root_socket.setblocking(False) - root_socket.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 1048576) - root_socket.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 1048576) - users[0]['body'], users[0]['ip'] = server_socket.accept() # 完成连接 - # 同上,设置 TCP 保活参数:启用功能,5 分钟后开始探测,间隔 30 秒 - if platform.system() != "Windows": - users[0]['body'].setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, True) - users[0]['body'].setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 300) - users[0]['body'].setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 30) + + if tmp_side == "Server": + # 当程序以服务端启动时, + # 若 config.json 中加载到的 side 参数为 "Client", + # 则覆写为默认服务端配置 + if config['side'] == "Client": + config = DEFAULT_SERVER_CONFIG + tmp_ip = None + if not auto_start: + tmp_ip = input("\033[0m\033[1;37m服务端 IP [{}]:".format(config['general']['server_ip'])) + if not tmp_ip: + tmp_ip = config['general']['server_ip'] + config['general']['server_ip'] = tmp_ip + if not check_ip(tmp_ip): + prints("参数错误:输入的服务端 IP 不是有效的点分十进制格式 IPv4 地址。", "red") + input("\033[0m") + sys.exit(1) + tmp_port = None + if not auto_start: + tmp_port = input("\033[0m\033[1;37m端口 [{}]:".format(config['general']['server_port'])) + if not tmp_port: + tmp_port = config['general']['server_port'] + try: + tmp_port = int(tmp_port) + if tmp_port < 1 or tmp_port > 65535: + raise + except: + prints("参数错误:端口号应为不大于 65535 的正整数。", "red") + input("\033[0m") + sys.exit(1) + config['general']['server_port'] = tmp_port + tmp_server_username = None + if not auto_start: + tmp_server_username = input("\033[0m\033[1;37m服务端管理员的用户名 [{}]:".format(config['general']['server_username'])) + if not tmp_server_username: + tmp_server_username = config['general']['server_username'] + config['general']['server_username'] = tmp_server_username + my_username = config['general']['server_username'] + tmp_max_connections = None + if not auto_start: + tmp_max_connections = input("\033[0m\033[1;37m最大在线连接数 [{}]:".format(config['general']['max_connections'])) + if not tmp_max_connections: + tmp_max_connections = config['general']['max_connections'] + try: + tmp_max_connections = int(tmp_max_connections) + if tmp_max_connections < 1 or tmp_max_connections > 128: + raise + except: + prints("参数错误:最大在线连接数应为不大于 128 的正整数。", "red") + input("\033[0m") + sys.exit(1) + config['general']['max_connections'] = tmp_max_connections + + # 创建保存文件时使用的目录(下同) + if platform.system() == "Windows": + os.system('mkdir TouchFishFiles 1>nul 2>&1') else: - users[0]['body'].setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, True) - users[0]['body'].ioctl(socket.SIO_KEEPALIVE_VALS, (1, 300000, 30000)) - users[0]['body'].setblocking(False) - users[0]['body'].setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 1048576) - users[0]['body'].setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 1048576) - my_uid = 0 - my_socket = root_socket - except Exception as e: - prints("启动时遇到错误:无法在给定的地址上启动 socket,请检查 IP 地址或更换端口。\n详细信息:" + str(e), "red") - input("\033[0m") - sys.exit(1) - - with open("./log.ndjson", "a", encoding="utf-8") as file: - file.write(json.dumps({'type': 'SERVER.START', 'time': time_str(), 'server_version': VERSION, 'config': config}) + "\n") # 协议 3.1 - - side = "Server" - prints("启动成功!", "green") - # 响铃,显示帮助文本,显示聊天室各项信息,显示加入提示 - ring() - do_help() - do_dashboard() - if config['gate']['enter_hint']: - first_line = dye("[" + time_str()[11:19] + "]", "black") - first_line += dye(" [您发送的]", "blue") - first_line += " " - first_line += dye(" [加入提示]", "red") - first_line += " " - first_line += dye("@", "black") - first_line += dye(config['general']['server_username'], "yellow") - first_line += dye(":", "black") - prints(first_line) - prints(config['gate']['enter_hint'], "white") - - THREAD_GATE = threading.Thread(target=thread_gate) - THREAD_PROCESS = threading.Thread(target=thread_process) - THREAD_RECEIVE = threading.Thread(target=thread_receive) - THREAD_SEND = threading.Thread(target=thread_send) - THREAD_LOG = threading.Thread(target=thread_log) - THREAD_CHECK = threading.Thread(target=thread_check) - THREAD_INPUT = threading.Thread(target=thread_input) - THREAD_OUTPUT = threading.Thread(target=thread_output) - - THREAD_GATE.start() - THREAD_PROCESS.start() - THREAD_RECEIVE.start() - THREAD_SEND.start() - THREAD_LOG.start() - THREAD_CHECK.start() - THREAD_INPUT.start() - THREAD_OUTPUT.start() - - if tmp_side == "Client": - # 当程序以客户端启动时, - # 若 config.json 中加载到的 side 参数为 "Client", - # 则覆写为默认客户端配置 - if config['side'] == "Server": - config = DEFAULT_CLIENT_CONFIG - tmp_ip = None - if not auto_start: - tmp_ip = input("\033[0m\033[1;37m服务端 IP [{}]:".format(config['ip'])) - if not tmp_ip: - tmp_ip = config['ip'] - config['ip'] = tmp_ip - if not check_ip(tmp_ip): - prints("参数错误:输入的服务端 IP 不是有效的点分十进制格式 IPv4 地址。", "red") - input("\033[0m") - sys.exit(1) - tmp_port = None - if not auto_start: - tmp_port = input("\033[0m\033[1;37m端口 [{}]:".format(config['port'])) - if not tmp_port: - tmp_port = config['port'] - try: - tmp_port = int(tmp_port) - if tmp_port < 1 or tmp_port > 65535: - raise - except: - prints("参数错误:端口号应为不大于 65535 的正整数。", "red") - input("\033[0m") - sys.exit(1) - config['port'] = tmp_port - tmp_username = None - if not auto_start: - tmp_username = input("\033[0m\033[1;37m用户名 [{}]:".format(config['username'])) - if not tmp_username: - tmp_username = config['username'] - config['username'] = tmp_username - my_username = config['username'] - # 同上,创建保存文件时使用的目录 - if platform.system() == "Windows": - os.system('mkdir TouchFishFiles 1>nul 2>&1') - else: - os.system('mkdir TouchFishFiles 1>/dev/null 2>&1') - try: - with open("config.json", "w", encoding="utf-8") as f: - json.dump(config, f) - prints("本次连接中输入的参数已经保存到配置文件 config.json,下次连接时将自动加载。", "yellow") - except: - prints("启动时遇到错误:配置文件 config.json 写入失败。", "red") - input("\033[0m") - sys.exit(1) - - prints("正在连接聊天室...", "yellow") - my_socket = socket.socket() - try: - my_socket.connect((config['ip'], config['port'])) # 连接到服务端 socket - # 同上,调整为非阻塞模式,缓冲区大小设置为 1 MiB,改善性能 - my_socket.setblocking(False) - my_socket.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 1048576) - my_socket.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 1048576) - my_socket.send(bytes(json.dumps({'type': 'GATE.REQUEST', 'username': my_username}), encoding="utf-8")) # 协议 1.1 - except Exception as e: - prints("连接失败:{}".format(e), "red") - input("\033[0m") - sys.exit(1) - - # 同上,设置 TCP 保活参数:启用功能,5 分钟后开始探测,间隔 30 秒 - if platform.system() == "Windows": - my_socket.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, True) - my_socket.ioctl(socket.SIO_KEEPALIVE_VALS, (1, 300000, 30000)) - else: - my_socket.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, True) - my_socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 300) - my_socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 30) - - # 核验协议 1.2,获取加入请求结果 - try: - message = None - time.sleep(0.5) # 与服务端「错峰」0.5 秒,期望第一次验证就成功(总用时 1.5 秒) - for i in range(10): # 设置 10 秒的「窗口期」,每秒验证一次 - time.sleep(1) - try: - read() - message = get_message() - if not message: - raise - break - except: + os.system('mkdir TouchFishFiles 1>/dev/null 2>&1') + try: + with open("config.json", "w", encoding="utf-8") as f: + json.dump(config, f) + prints("本次连接中输入的参数已经保存到配置文件 config.json,下次连接时将自动加载。", "yellow") + except: + prints("启动时遇到错误:配置文件 config.json 写入失败。", "red") + input("\033[0m") + sys.exit(1) + try: + with open("log.ndjson", "a", encoding="utf-8") as f: pass - if not message: - raise - if not message['result'] in ["Accepted", "Pending review"] + list(RESULTS.keys()): - raise - except: - prints("连接失败:对方似乎不是 v4 及以上的 TouchFish 服务端。", "red") - prints("注:也有可能是对方服务器端口被防火墙拦截,请联系服务器所有者确认,或检查本地网络及防火墙设置。", "black") - input("\033[0m") - sys.exit(1) - - if not message['result'] in ["Accepted", "Pending review"]: - prints("连接失败:{}".format(RESULTS[message['result']]), "red") - input("\033[0m") - sys.exit(1) - - if message['result'] == "Accepted": - prints("连接成功!", "green") + except: + prints("启动时遇到错误:无法向日志文件 log.ndjson 写入内容。", "red") + input("\033[0m") + sys.exit(1) + + try: + # 启动服务端 socket: + # 每两步操作之间间隔 0.01 秒, + # 防止爆出 BlockingIOError + server_socket = socket.socket() + time.sleep(0.01) + server_socket.bind((config['general']['server_ip'], config['general']['server_port'])) + time.sleep(0.01) + server_socket.listen(config['general']['max_connections']) + time.sleep(0.01) + server_socket.setblocking(False) + time.sleep(0.01) + users = [{"body": None, "buffer": "", "ip": None, "username": config['general']['server_username'], "status": "Root", "busy": False}] # 初始化用户列表 + time.sleep(0.01) + root_socket = socket.socket() # 为服务端创建一个连接用于接收信息(不用于发送请求) + time.sleep(0.01) + root_socket.connect((config['general']['server_ip'], config['general']['server_port'])) # 连接到服务端 socket + time.sleep(0.01) + # 同上,调整为非阻塞模式,缓冲区大小设置为 1 MiB,改善性能 + root_socket.setblocking(False) + time.sleep(0.01) + root_socket.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 1048576) + time.sleep(0.01) + root_socket.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 1048576) + time.sleep(0.01) + users[0]['body'], users[0]['ip'] = server_socket.accept() # 完成连接 + time.sleep(0.01) + # 同上,设置 TCP 保活参数:启用功能,5 分钟后开始探测,间隔 30 秒 + if platform.system() != "Windows": + users[0]['body'].setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, True) + time.sleep(0.01) + users[0]['body'].setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 300) + time.sleep(0.01) + users[0]['body'].setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 30) + time.sleep(0.01) + else: + users[0]['body'].setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, True) + time.sleep(0.01) + users[0]['body'].ioctl(socket.SIO_KEEPALIVE_VALS, (1, 300000, 30000)) + time.sleep(0.01) + users[0]['body'].setblocking(False) + time.sleep(0.01) + users[0]['body'].setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 1048576) + time.sleep(0.01) + users[0]['body'].setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 1048576) + time.sleep(0.01) + my_uid = 0 + time.sleep(0.01) + my_socket = root_socket + time.sleep(0.01) + except Exception as e: + prints("启动时遇到错误:" + str(e), "red") + prints("请检查 IP 地址或更换端口。", "red") + input("\033[0m") + sys.exit(1) + + with open("./log.ndjson", "a", encoding="utf-8") as file: + file.write(json.dumps({'type': 'SERVER.START', 'time': time_str(), 'server_version': VERSION, 'config': config}) + "\n") # 协议 3.1 + + side = "Server" + prints("启动成功!", "green") + # 响铃,显示帮助文本,显示聊天室各项信息,显示加入提示 ring() + do_help() + do_dashboard() + if config['gate']['enter_hint']: + first_line = dye("[" + time_str()[11:19] + "]", "black") + first_line += dye(" [您发送的]", "blue") + first_line += " " + first_line += dye(" [加入提示]", "red") + first_line += " " + first_line += dye("@", "black") + first_line += dye(config['general']['server_username'], "yellow") + first_line += dye(":", "black") + prints(first_line) + prints(config['gate']['enter_hint'], "white") + + THREAD_GATE = threading.Thread(target=thread_gate) + THREAD_PROCESS = threading.Thread(target=thread_process) + THREAD_RECEIVE = threading.Thread(target=thread_receive) + THREAD_SEND = threading.Thread(target=thread_send) + THREAD_LOG = threading.Thread(target=thread_log) + THREAD_CHECK = threading.Thread(target=thread_check) + THREAD_INPUT = threading.Thread(target=thread_input) + THREAD_OUTPUT = threading.Thread(target=thread_output) + + THREAD_GATE.start() + THREAD_PROCESS.start() + THREAD_RECEIVE.start() + THREAD_SEND.start() + THREAD_LOG.start() + THREAD_CHECK.start() + THREAD_INPUT.start() + THREAD_OUTPUT.start() - if message['result'] == "Pending review": - prints("服务端需要对连接请求进行人工审核,请等待...", "white") - while True: - try: - read() - message = get_message() - if not message: - continue - # 特殊情况:聊天室服务端已经关闭 (协议 3.3.1) - if message['type'] == "SERVER.STOP.ANNOUNCE": - prints("聊天室服务端已经关闭。", "red") - prints("连接失败。", "red") - input("\033[0m") - sys.exit(1) - # 一般情况:人工审核完成 (协议 1.3) - if not message['accepted']: - prints("服务端管理员 {} (UID = {}) 拒绝了您的连接请求。".format(message['operator']['username'], message['operator']['uid']), "red") - prints("连接失败。", "red") - input("\033[0m") - sys.exit(1) - if message['accepted']: - time.sleep(1) # 等待 1 秒,确认协议 3.2 提供的完整上下文传输完成 - prints("服务端管理员 {} (UID = {}) 通过了您的连接请求。".format(message['operator']['username'], message['operator']['uid']), "green") - prints("连接成功!", "green") - ring() + if tmp_side == "Client": + # 当程序以客户端启动时, + # 若 config.json 中加载到的 side 参数为 "Client", + # 则覆写为默认客户端配置 + if config['side'] == "Server" or config_read_result != "OK": + config = DEFAULT_CLIENT_CONFIG + config['username'] += time_str()[20:26] + # 截取 "xxxx-xx-xx xx:xx:xx.xxxxxx" 中最后的 "xxxxxx" + # 当作随机的用户名后缀,形成形如 "user123456" 的用户名 + tmp_ip = None + if not auto_start: + tmp_ip = input("\033[0m\033[1;37m服务端 IP [{}]:".format(config['ip'])) + if not tmp_ip: + tmp_ip = config['ip'] + config['ip'] = tmp_ip + tmp_port = None + if not auto_start: + tmp_port = input("\033[0m\033[1;37m端口 [{}]:".format(config['port'])) + if not tmp_port: + tmp_port = config['port'] + try: + tmp_port = int(tmp_port) + if tmp_port < 1 or tmp_port > 65535: + raise + except: + prints("参数错误:端口号应为不大于 65535 的正整数。", "red") + input("\033[0m") + sys.exit(1) + config['port'] = tmp_port + tmp_username = None + if not auto_start: + tmp_username = input("\033[0m\033[1;37m用户名 [{}]:".format(config['username'])) + if not tmp_username: + tmp_username = config['username'] + config['username'] = tmp_username + my_username = config['username'] + # 同上,创建保存文件时使用的目录 + if platform.system() == "Windows": + os.system('mkdir TouchFishFiles 1>nul 2>&1') + else: + os.system('mkdir TouchFishFiles 1>/dev/null 2>&1') + try: + with open("config.json", "w", encoding="utf-8") as f: + json.dump(config, f) + prints("本次连接中输入的参数已经保存到配置文件 config.json,下次连接时将自动加载。", "yellow") + except: + prints("启动时遇到错误:配置文件 config.json 写入失败。", "red") + input("\033[0m") + sys.exit(1) + + my_socket = socket.socket() + try: + my_socket.connect((config['ip'], config['port'])) # 连接到服务端 socket + # 同上,调整为非阻塞模式,缓冲区大小设置为 1 MiB,改善性能 + my_socket.setblocking(False) + my_socket.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 1048576) + my_socket.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 1048576) + my_socket.send(bytes(json.dumps({'type': 'GATE.REQUEST', 'username': my_username}), encoding="utf-8")) # 协议 1.1 + except Exception as e: + prints("启动时遇到错误:{}".format(e), "red") + input("\033[0m") + sys.exit(1) + + # 同上,设置 TCP 保活参数:启用功能,5 分钟后开始探测,间隔 30 秒 + if platform.system() == "Windows": + my_socket.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, True) + my_socket.ioctl(socket.SIO_KEEPALIVE_VALS, (1, 300000, 30000)) + else: + my_socket.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, True) + my_socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 300) + my_socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 30) + + # 核验协议 1.2,获取加入请求结果 + try: + message = None + seconds_consumed = 0 + print(dye("正在连接聊天室... (已经等待了 {} / 10 秒)\r", "yellow").format(seconds_consumed), end="", flush=True) + for i in range(10): + # 设置 10 秒的「窗口期」,每秒验证一次 + # 与服务端「错峰」0.5 秒,期望第一次验证就成功(总用时 1 秒) + time.sleep(1) + seconds_consumed += 1 + print(dye("正在连接聊天室... (已经等待了 {} / 10 秒)\r", "yellow").format(seconds_consumed), end="", flush=True) + try: + read() + message = get_message() + if not message: + raise break - except: - pass - - side = "Client" - # 获取服务端通过协议 3.2 提供的完整上下文; - # 此时自己应当处于 Online 状态 - read() - first_data = get_message() - server_version = first_data['server_version'] - my_uid = first_data['uid'] - config = first_data['config'] - users = first_data['users'] - # 自行计算在线人数(包括自己) - online_count = 0 - for user in users: - if user['status'] in ["Pending", "Online", "Admin", "Root"]: - online_count += 1 - - # 显示帮助文本,显示聊天室各项信息,显示加入提示 - do_help() - do_dashboard() - for i in first_data['chat_history']: - print_message(i) - if config['gate']['enter_hint']: - first_line = dye("[" + time_str()[11:19] + "]", "black") - first_line += dye(" [加入提示]", "red") - first_line += " " - first_line += dye("@", "black") - first_line += dye(config['general']['server_username'], "yellow") - first_line += dye(":", "black") - prints(first_line) - prints(config['gate']['enter_hint'], "white") - - THREAD_INPUT = threading.Thread(target=thread_input) - THREAD_OUTPUT = threading.Thread(target=thread_output) - - THREAD_INPUT.start() - THREAD_OUTPUT.start() + except: + pass + if not message: + seconds_consumed += 1 + raise + if not message['result'] in ["Accepted", "Pending review"] + list(RESULTS.keys()): + raise + except: + print() + if seconds_consumed == 11: + prints("连接失败:连接超时。", "red") + else: + prints("连接失败:对方返回的内容不符合 TouchFish v4 协议。", "red") + prints("对方似乎不是 v4 及以上的 TouchFish 服务端。", "red") + if seconds_consumed == 11: + prints("(也有可能是对方服务器端口被防火墙拦截,请联系服务器所有者确认,或检查本地网络及防火墙设置。)", "red") + input("\033[0m") + sys.exit(1) + + if not message['result'] in ["Accepted", "Pending review"]: + print() + prints("连接失败:{}".format(RESULTS[message['result']]), "red") + input("\033[0m") + sys.exit(1) + + if message['result'] == "Accepted": + print() + prints("连接成功!", "green") + ring() + + if message['result'] == "Pending review": + print() + seconds_consumed = 0 + while True: + clock_start = datetime.datetime.now().timestamp() + print(dye("服务端需要对连接请求进行人工审核,请等待... (已经等待了 {} 秒)\r", "white").format(seconds_consumed), end="", flush=True) + try: + read() + message = get_message() + if not message: + raise + # 特殊情况:聊天室服务端已经关闭 (协议 3.3.1) + if message['type'] == "SERVER.STOP.ANNOUNCE": + prints("聊天室服务端已经关闭。", "red") + prints("连接失败。", "red") + input("\033[0m") + sys.exit(1) + # 一般情况:人工审核完成 (协议 1.3) + if not message['accepted']: + print() + prints("服务端管理员 {} (UID = {}) 拒绝了您的连接请求。".format(message['operator']['username'], message['operator']['uid']), "red") + prints("连接失败。", "red") + input("\033[0m") + sys.exit(1) + if message['accepted']: + time.sleep(1) # 等待 1 秒,确认协议 3.2 提供的完整上下文传输完成 + print() + prints("服务端管理员 {} (UID = {}) 通过了您的连接请求。".format(message['operator']['username'], message['operator']['uid']), "green") + prints("连接成功!", "green") + ring() + break + except: + pass + clock_end = datetime.datetime.now().timestamp() + seconds_consumed += 1 + time.sleep(1 - (clock_end - clock_start)) + + side = "Client" + # 获取服务端通过协议 3.2 提供的完整上下文; + # 此时自己应当处于 Online 状态 + read() + first_data = get_message() + server_version = first_data['server_version'] + my_uid = first_data['uid'] + config = first_data['config'] + users = first_data['users'] + # 自行计算在线人数(包括自己) + online_count = 0 + for user in users: + if user['status'] in ["Pending", "Online", "Admin", "Root"]: + online_count += 1 + + # 显示帮助文本,显示聊天室各项信息,显示加入提示 + do_help() + do_dashboard() + for i in first_data['chat_history']: + print_message(i) + if config['gate']['enter_hint']: + first_line = dye("[" + time_str()[11:19] + "]", "black") + first_line += dye(" [加入提示]", "red") + first_line += " " + first_line += dye("@", "black") + first_line += dye(config['general']['server_username'], "yellow") + first_line += dye(":", "black") + prints(first_line) + prints(config['gate']['enter_hint'], "white") + + THREAD_INPUT = threading.Thread(target=thread_input) + THREAD_OUTPUT = threading.Thread(target=thread_output) + + THREAD_INPUT.start() + THREAD_OUTPUT.start() + except BaseException as e: + print() + prints("程序运行时遇到错误:" + str(e), "red") + print("\033[0m") if __name__ == "__main__": main()