-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathutils.py
More file actions
135 lines (105 loc) · 3.44 KB
/
utils.py
File metadata and controls
135 lines (105 loc) · 3.44 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
import os
import time
import typing as t
from inspect import isawaitable, iscoroutinefunction
from pathlib import Path

from fastapi import Request
from pydantic import BaseModel
from user_agents import parse as parse_ua
class InitOnceChecker:
    """
    Singleton that tells the caller whether this construction is the first one.

    Every call to ``InitOnceChecker()`` returns the same object. After
    construction, ``new_init`` is True only for the very first call and
    False for every later one; ``initialized`` stays True once set.
    """

    # The single shared instance, created lazily on first construction.
    _instance = None

    def __new__(cls):
        """Return the shared instance, creating it on first use."""
        existing = cls._instance
        if existing is not None:
            return existing
        cls._instance = super().__new__(cls)
        return cls._instance

    def __init__(self):
        """Record whether this is the first initialisation of the shared instance."""
        # __init__ runs on every InitOnceChecker() call, even when __new__
        # handed back the existing object, so the flag must be re-evaluated here.
        first_time = not getattr(self, 'initialized', False)
        if first_time:
            self.initialized = True
        self.new_init = first_time
def perf_counter():
    '''
    Start a stopwatch and return a function that reads it.

    :return: a zero-argument callable; each call returns the time elapsed
        since ``perf_counter()`` was invoked, in milliseconds rounded to
        two decimal places
    '''
    started_at = time.perf_counter()

    def elapsed_ms():
        delta_seconds = time.perf_counter() - started_at
        return round(delta_seconds * 1000, 2)

    return elapsed_ms
def get_path(path: str, create_dirs: bool = True, is_dir: bool = False) -> str:
    '''
    Join a relative path onto the directory that contains this file.

    :param path: path relative to this file's directory
    :param create_dirs: when True, create the missing directories
    :param is_dir: when True, ``path`` itself is a directory; otherwise it is
        a file and only its parent directory is created
    :return: the joined path as a string
    '''
    base_dir = Path(__file__).parent
    full_path = str(base_dir.joinpath(path))

    # For a file target only the containing directory must exist.
    target_dir = full_path if is_dir else os.path.dirname(full_path)
    if create_dirs:
        os.makedirs(target_dir, exist_ok=True)

    return full_path
def replace_code_tags(text: str) -> str:
    '''
    Convert markdown-style backtick spans into HTML ``<code>`` elements.

    Backticks are paired left to right: the text between the 1st and 2nd
    backtick becomes one ``<code>...</code>`` element, the text between the
    3rd and 4th the next, and so on. A trailing backtick with no partner is
    kept as a literal backtick, so the output never contains an unclosed
    ``<code>`` tag. No HTML escaping is performed.

    :param text: input text that may contain backtick-delimited spans
    :return: text with each backtick pair replaced by ``<code>``/``</code>``
    '''
    # Splitting on the backtick yields alternating segments:
    # even indices are outside a code span, odd indices are inside one.
    segments = text.split("`")

    # An even number of segments means an odd number of backticks: the last
    # backtick never closes. Glue it back onto the preceding plain segment.
    if len(segments) % 2 == 0:
        tail = segments.pop()
        segments[-1] = f"{segments[-1]}`{tail}"

    return "".join(
        segment if index % 2 == 0 else f"<code>{segment}</code>"
        for index, segment in enumerate(segments)
    )
def cnen(cn: str, en: str):
    '''
    Build a two-line HTML snippet from a pair of texts.

    Both texts have their backtick spans converted by ``replace_code_tags``.
    The result is ``cn``, a ``<br/>``, then ``en`` wrapped in ``<i>``.

    :param cn: first line of the snippet
    :param en: second line of the snippet, rendered in italics
    :return: the combined HTML string
    '''
    cn_html = replace_code_tags(cn)
    en_html = replace_code_tags(en)
    return f'{cn_html}<br/><i>{en_html}</i>'
def ua(ua_str: str) -> t.Literal['vertical', 'horizontal', 'unknown']:
    '''
    Map a User-Agent string to a screen orientation.

    :param ua_str: raw User-Agent header value
    :return: ``'vertical'`` for mobile devices, ``'horizontal'`` for PCs and
        tablets, ``'unknown'`` for anything else
    '''
    parsed = parse_ua(ua_str)

    # Mobile is checked first, so it wins over the PC / tablet flags.
    if parsed.is_mobile:
        return 'vertical'
    if parsed.is_pc or parsed.is_tablet:
        return 'horizontal'
    return 'unknown'
class _UAResult_Browser(BaseModel):
    # Browser section of UAResult. Every field is optional and defaults to None.
    # NOTE(review): the field names look like they mirror the `browser`
    # attribute of a parsed user_agents result — confirm against the code
    # that builds this model.
    family: str | None = None  # browser family name
    version: list[int] | None = None  # version as a list of integers
    version_string: str | None = None  # version as a single string
class _UAResult_OS(BaseModel):
    # Operating-system section of UAResult. Every field is optional and
    # defaults to None.
    # NOTE(review): the field names look like they mirror the `os` attribute
    # of a parsed user_agents result — confirm against the code that builds
    # this model.
    family: str | None = None  # operating-system family name
    version: list[int] | None = None  # version as a list of integers
    version_string: str | None = None  # version as a single string
class _UAResult_Device(BaseModel):
    # Device section of UAResult. Every field is optional and defaults to None.
    # NOTE(review): the field names look like they mirror the `device`
    # attribute of a parsed user_agents result — confirm against the code
    # that builds this model.
    family: str | None = None  # device family name
    brand: str | None = None  # device brand
    model: str | None = None  # device model
class UAResult(BaseModel):
    # Structured description of a User-Agent: three nested sections plus a
    # set of boolean classification flags. No field has a default, so all of
    # them must be supplied when the model is built.
    # NOTE(review): presumably populated from a user_agents parse result —
    # the construction site is not visible here; confirm against the caller.
    browser: _UAResult_Browser  # browser family / version
    os: _UAResult_OS  # operating-system family / version
    device: _UAResult_Device  # device family / brand / model
    is_bot: bool
    is_email_client: bool
    is_mobile: bool
    is_pc: bool
    is_tablet: bool
    is_touch_capable: bool
async def call_image_func(func: t.Callable[[Request], t.Any] | str | None, req: Request) -> t.Any | None:
    """Resolve an image source given as a URL string, a sync/async callable, or None.

    :param func: a string (returned unchanged), a callable that takes the
        request, or None
    :param req: the current request, passed to the callable as its only argument
    :return: the string itself, None when ``func`` is None, otherwise the
        callable's result (awaited first if it is awaitable)
    """
    if func is None or isinstance(func, str):
        return func

    outcome = func(req)
    # Inspect the returned value rather than the callable: lambdas, non-async
    # wrappers and objects with an async __call__ all hand back an awaitable
    # without being coroutine functions, and must still be awaited.
    if isawaitable(outcome):
        return await outcome
    return outcome
async def call_init_func(func: t.Callable[[], t.Any] | None) -> None:
    """Run an optional init hook, whether it is sync or async.

    :param func: a zero-argument callable, or None to do nothing
    :return: None; the hook's own return value is discarded
    """
    if func is None:
        return

    outcome = func()
    # Inspect the returned value rather than the callable: a lambda or
    # non-async wrapper around an async hook returns a coroutine that would
    # otherwise be dropped without ever running.
    if isawaitable(outcome):
        await outcome