@@ -6,23 +6,23 @@ import (
66 "flag"
77 "io"
88 "log"
9- "math/rand"
109 "net/http"
1110 "net/url"
1211 "os"
1312 "path"
1413 "strings"
1514 "sync"
16- "time"
1715)
1816
1917// Config 结构体用于存储命令行参数配置
2018type Config struct {
21- KeyFile string // API 密钥文件路径
22- TargetURL string // 目标 API 基础 URL
23- Port string // 代理服务器监听端口
24- Address string // 代理服务器监听地址
25- Password string // 客户端身份验证密码
19+ KeyFile string // API 密钥文件路径
20+ TargetURL string // 目标 API 基础 URL
21+ Port string // 代理服务器监听端口
22+ Address string // 代理服务器监听地址
23+ Password string // 客户端身份验证密码
24+ MaxWorkers int // 最大工作协程数
25+ MaxQueue int // 最大请求队列长度
2626}
2727
2828// parseFlags 解析命令行参数并返回 Config 实例
@@ -33,14 +33,25 @@ func parseFlags() *Config {
3333 flag .StringVar (& cfg .Port , "port" , "8080" , "Port to listen on" )
3434 flag .StringVar (& cfg .Address , "address" , "localhost" , "Address to listen on" )
3535 flag .StringVar (& cfg .Password , "password" , "" , "Password for client authentication" )
36+
37+ // 添加WorkerPool相关配置
38+ maxWorkers := flag .Int ("max-workers" , 50 , "Maximum number of worker goroutines" )
39+ maxQueue := flag .Int ("max-queue" , 500 , "Maximum size of request queue" )
40+
3641 flag .Parse ()
42+
43+ // 将WorkerPool配置添加到Config结构体
44+ cfg .MaxWorkers = * maxWorkers
45+ cfg .MaxQueue = * maxQueue
46+
3747 return cfg
3848}
3949
4050// KeyPool 管理 API 密钥池
4151type KeyPool struct {
42- keys []string // 密钥列表
43- mu sync.Mutex // 互斥锁,确保线程安全
52+ keys []string // 密钥列表
53+ mu sync.Mutex // 互斥锁,确保线程安全
54+ currentIndex int // 当前密钥索引,用于循环抽取
4455}
4556
4657// NewKeyPool 从文件中加载密钥并创建 KeyPool 实例
@@ -65,41 +76,182 @@ func NewKeyPool(filePath string) (*KeyPool, error) {
6576 return nil , err
6677 }
6778 log .Printf ("[INFO] Loaded %d keys from file %s" , len (keys ), filePath )
68- return & KeyPool {keys : keys }, nil
79+ return & KeyPool {keys : keys , currentIndex : 0 }, nil
6980}
7081
71- // GetRandomKey 随机返回一个密钥
82+ // GetRandomKey 按顺序循环返回一个密钥
7283func (kp * KeyPool ) GetRandomKey () string {
7384 kp .mu .Lock ()
7485 defer kp .mu .Unlock ()
7586 if len (kp .keys ) == 0 {
7687 return ""
7788 }
78- rand .Seed (time .Now ().UnixNano ())
79- return kp .keys [rand .Intn (len (kp .keys ))]
89+ key := kp .keys [kp .currentIndex ]
90+ kp .currentIndex = (kp .currentIndex + 1 ) % len (kp .keys ) // 循环到下一个索引
91+ return key
92+ }
93+
94+ // 定义请求结构体
95+ type ProxyRequest struct {
96+ Request * http.Request
97+ Response http.ResponseWriter
98+ Done chan bool // 用于通知请求处理完成
99+ }
100+
101+ // Worker结构体,表示一个工作协程
102+ type Worker struct {
103+ ID int
104+ TaskQueue chan * ProxyRequest // 任务队列
105+ Quit chan bool // 退出信号
106+ WorkerPool * WorkerPool // 所属工作池
107+ }
108+
109+ // 创建新的Worker
110+ func NewWorker (id int , workerPool * WorkerPool ) * Worker {
111+ return & Worker {
112+ ID : id ,
113+ TaskQueue : make (chan * ProxyRequest ),
114+ Quit : make (chan bool ),
115+ WorkerPool : workerPool ,
116+ }
117+ }
118+
119+ // Worker开始工作
120+ func (w * Worker ) Start () {
121+ go func () {
122+ for {
123+ // 将worker注册到工作池的空闲队列
124+ w .WorkerPool .WorkerQueue <- w .TaskQueue
125+
126+ select {
127+ case task := <- w .TaskQueue :
128+ // 处理请求
129+ w .WorkerPool .HandleFunc (task .Response , task .Request )
130+ task .Done <- true
131+ case <- w .Quit :
132+ // 收到退出信号
133+ return
134+ }
135+ }
136+ }()
137+ }
138+
139+ // Worker停止工作
140+ func (w * Worker ) Stop () {
141+ go func () {
142+ w .Quit <- true
143+ }()
144+ }
145+
146+ // WorkerPool结构体,管理工作协程池
147+ type WorkerPool struct {
148+ WorkerQueue chan chan * ProxyRequest // 空闲Worker队列
149+ TaskQueue chan * ProxyRequest // 任务队列
150+ MaxWorkers int // 最大Worker数量
151+ MaxQueue int // 最大队列长度
152+ HandleFunc func (http.ResponseWriter , * http.Request ) // 请求处理函数
153+ }
154+
155+ // 创建新的WorkerPool
156+ func NewWorkerPool (maxWorkers int , maxQueue int , handleFunc func (http.ResponseWriter , * http.Request )) * WorkerPool {
157+ pool := & WorkerPool {
158+ WorkerQueue : make (chan chan * ProxyRequest , maxWorkers ),
159+ TaskQueue : make (chan * ProxyRequest , maxQueue ),
160+ MaxWorkers : maxWorkers ,
161+ MaxQueue : maxQueue ,
162+ HandleFunc : handleFunc ,
163+ }
164+ return pool
165+ }
166+
167+ // 启动WorkerPool
168+ func (wp * WorkerPool ) Start () {
169+ // 创建并启动workers
170+ for i := 0 ; i < wp .MaxWorkers ; i ++ {
171+ worker := NewWorker (i , wp )
172+ worker .Start ()
173+ log .Printf ("[INFO] Started worker %d" , i )
174+ }
175+
176+ // 启动任务分发协程
177+ go wp .dispatch ()
178+ }
179+
180+ // 停止WorkerPool
181+ func (wp * WorkerPool ) Stop () {
182+ // TODO: 实现停止逻辑
183+ }
184+
185+ // 将任务分发给空闲worker
186+ func (wp * WorkerPool ) dispatch () {
187+ for {
188+ select {
189+ case task := <- wp .TaskQueue :
190+ // 等待空闲worker
191+ workerTaskQueue := <- wp .WorkerQueue
192+ // 将任务发送给worker
193+ workerTaskQueue <- task
194+ }
195+ }
196+ }
197+
198+ // 将请求提交到WorkerPool
199+ func (wp * WorkerPool ) Submit (response http.ResponseWriter , request * http.Request ) bool {
200+ task := & ProxyRequest {
201+ Request : request ,
202+ Response : response ,
203+ Done : make (chan bool , 1 ),
204+ }
205+
206+ select {
207+ case wp .TaskQueue <- task :
208+ // 请求成功加入队列
209+ <- task .Done // 等待任务完成
210+ return true
211+ default :
212+ // 队列已满,实现背压
213+ log .Println ("[WARN] Task queue is full, rejecting request" )
214+ http .Error (response , "Server is busy, please try again later" , http .StatusServiceUnavailable )
215+ return false
216+ }
80217}
81218
82219// ProxyHandler 处理 HTTP 代理请求
83220type ProxyHandler struct {
84- cfg * Config // 配置信息
85- keyPool * KeyPool // 密钥池
86- client * http.Client // HTTP 客户端
221+ cfg * Config // 配置信息
222+ keyPool * KeyPool // 密钥池
223+ client * http.Client // HTTP 客户端
224+ workerPool * WorkerPool // 工作协程池
87225}
88226
89227// NewProxyHandler 创建 ProxyHandler 实例
90228func NewProxyHandler (cfg * Config , keyPool * KeyPool ) * ProxyHandler {
91- return & ProxyHandler {
229+ handler := & ProxyHandler {
92230 cfg : cfg ,
93231 keyPool : keyPool ,
94232 client : & http.Client {},
95233 }
234+ return handler
235+ }
236+
237+ // InitWorkerPool 初始化工作协程池
238+ func (ph * ProxyHandler ) InitWorkerPool (maxWorkers int , maxQueue int ) {
239+ ph .workerPool = NewWorkerPool (maxWorkers , maxQueue , ph .HandleRequest )
240+ ph .workerPool .Start ()
241+ log .Printf ("[INFO] Started worker pool with %d workers and queue size %d" , maxWorkers , maxQueue )
96242}
97243
98244// ServeHTTP 实现 HTTP 处理逻辑
99245func (ph * ProxyHandler ) ServeHTTP (w http.ResponseWriter , r * http.Request ) {
100246 // 记录接收到的请求
101247 log .Printf ("[INFO] Received request: %s %s" , r .Method , r .URL .String ())
102248
249+ // 将请求提交到工作池处理
250+ ph .workerPool .Submit (w , r )
251+ }
252+
253+ // HandleRequest 处理请求的方法,由Worker调用
254+ func (ph * ProxyHandler ) HandleRequest (w http.ResponseWriter , r * http.Request ) {
103255 // 验证客户端身份
104256 if ! ph .authenticate (r ) {
105257 log .Println ("[WARN] Unauthorized access attempt" )
@@ -182,14 +334,16 @@ func (ph *ProxyHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
182334
183335// getUnusedKey 获取一个未使用过的密钥
184336func (ph * ProxyHandler ) getUnusedKey (attempted map [string ]bool ) string {
185- ph .keyPool .mu .Lock ()
186- defer ph .keyPool .mu .Unlock ()
187- for _ , key := range ph .keyPool .keys {
188- if ! attempted [key ] {
189- return key
190- }
337+ key := ph .keyPool .GetRandomKey ()
338+ // 如果获取到的密钥已使用过,则尝试其他密钥
339+ for attempted [key ] && len (attempted ) < len (ph .keyPool .keys ) {
340+ key = ph .keyPool .GetRandomKey ()
191341 }
192- return ""
342+ // 如果所有密钥都已尝试过,返回空字符串
343+ if attempted [key ] {
344+ return ""
345+ }
346+ return key
193347}
194348
195349// authenticate 验证客户端身份
@@ -295,12 +449,9 @@ func (ph *ProxyHandler) extractModelFromRequest(r *http.Request) (string, error)
295449 return "" , nil
296450}
297451
298- // maskKey 用于掩码密钥,保护敏感信息
452+ // maskKey 直接返回原始密钥,不再进行掩码处理
299453func maskKey (key string ) string {
300- if len (key ) > 4 {
301- return key [:4 ] + "****"
302- }
303- return "****"
454+ return key
304455}
305456
306457// main 函数,启动代理服务器
@@ -311,8 +462,8 @@ func main() {
311462 log .Println ("[ERROR] Missing required flags: --key-file, --target-url, --password" )
312463 os .Exit (1 )
313464 }
314- log .Printf ("[INFO] Configuration loaded: KeyFile=%s, TargetURL=%s, Address=%s, Port=%s" ,
315- cfg .KeyFile , cfg .TargetURL , cfg .Address , cfg .Port )
465+ log .Printf ("[INFO] Configuration loaded: KeyFile=%s, TargetURL=%s, Address=%s, Port=%s, MaxWorkers=%d, MaxQueue=%d " ,
466+ cfg .KeyFile , cfg .TargetURL , cfg .Address , cfg .Port , cfg . MaxWorkers , cfg . MaxQueue )
316467
317468 // 初始化密钥池
318469 keyPool , err := NewKeyPool (cfg .KeyFile )
@@ -323,6 +474,9 @@ func main() {
323474
324475 // 创建代理处理器
325476 proxyHandler := NewProxyHandler (cfg , keyPool )
477+
478+ // 初始化并启动工作池
479+ proxyHandler .InitWorkerPool (cfg .MaxWorkers , cfg .MaxQueue )
326480
327481 // 启动服务器
328482 addr := cfg .Address + ":" + cfg .Port
0 commit comments