Skip to content

Commit b2fdea7

Browse files
committed
feat(handler): use middleware to handle requests that cannot be routed
1 parent 582c943 commit b2fdea7

4 files changed

Lines changed: 58 additions & 37 deletions

File tree

cluster/cluster_worker.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ func (w workerService) HandleRequest(_ context.Context, req *clusterpb.RequestMe
3535
Route: req.Route,
3636
Data: req.Data,
3737
}
38-
w.node.handler.localProcess(handlerNode, req.Id, s, msg)
38+
w.node.handler.localProcess(s, msg, handlerNode)
3939
return &clusterpb.MemberHandleResponse{}, nil
4040
}
4141

@@ -53,7 +53,7 @@ func (w workerService) HandleNotify(_ context.Context, req *clusterpb.NotifyMess
5353
Route: req.Route,
5454
Data: req.Data,
5555
}
56-
w.node.handler.localProcess(handler, 0, s, msg)
56+
w.node.handler.localProcess(s, msg, handler)
5757
return &clusterpb.MemberHandleResponse{}, nil
5858
}
5959

cluster/handler.go

Lines changed: 25 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ import (
77
"math/rand"
88
"net"
99
"sort"
10-
"strings"
1110
"sync"
1211
"time"
1312

@@ -303,36 +302,20 @@ func (h *LocalHandler) processPacket(agent *agent, p *packet.Packet) error {
303302

304303
// processMessage 处理消息, 分发到本地或远程
305304
func (h *LocalHandler) processMessage(agent *agent, msg *message.Message) {
306-
var lastMid uint64
307-
switch msg.Type {
308-
case message.Request:
309-
lastMid = msg.ID
310-
case message.Notify:
311-
lastMid = 0
312-
default:
313-
log.Info("Invalid message type: " + msg.Type.String())
305+
if handlerNode, found := h.localHandlers[msg.Route]; found {
306+
h.localProcess(agent.session, msg, handlerNode)
314307
return
315308
}
316-
handlerNode, found := h.localHandlers[msg.Route]
317-
if !found {
318-
h.remoteProcess(agent.session, msg, false)
319-
} else {
320-
h.localProcess(handlerNode, lastMid, agent.session, msg)
321-
}
309+
h.remoteProcess(agent.session, msg, false)
322310
}
323311

324312
// remoteProcess 处理远程消息
325313
func (h *LocalHandler) remoteProcess(session *session.Session, msg *message.Message, noCopy bool) {
326-
index := strings.LastIndex(msg.Route, ".")
327-
if index < 0 {
328-
log.Info("nano/handler: invalid route %s", msg.Route)
329-
return
330-
}
331-
332-
service := msg.Route[:index]
314+
service := msg.Service()
333315
members := h.findMembers(service)
334316
if len(members) == 0 {
335-
log.Info("nano/handler: %s not found(forgot registered?)", msg.Route)
317+
// 没有服务节点, 转本地处理 404
318+
h.localProcess(session, msg, nil)
336319
return
337320
}
338321

@@ -347,7 +330,11 @@ func (h *LocalHandler) remoteProcess(session *session.Session, msg *message.Mess
347330
} else {
348331
member := h.node.opts.RemoteServiceRoute(service, session, members)
349332
if member == nil {
350-
log.Info("customize remoteServiceRoute handler: %s is not found", msg.Route)
333+
if env.Debug {
334+
log.Info("customize remoteServiceRoute handler: %s is not found", msg.Route)
335+
}
336+
// 有服务节点, 但自定义路由没匹配到, 转本地处理 404
337+
h.localProcess(session, msg, nil)
351338
return
352339
}
353340
remoteAddr = member.ServiceAddr
@@ -407,21 +394,27 @@ func (h *LocalHandler) remoteProcess(session *session.Session, msg *message.Mess
407394
}
408395

409396
// localProcess 本地处理
410-
func (h *LocalHandler) localProcess(handlerNode *npi.HandlerNode, lastMid uint64, session *session.Session, msg *message.Message) {
397+
func (h *LocalHandler) localProcess(session *session.Session, msg *message.Message, handlerNode *npi.HandlerNode) {
398+
// 计算响应 ID
399+
lastMid, err := msg.ResponseID()
400+
if err != nil {
401+
log.Info(err.Error() + ":" + msg.Type.String())
402+
return
403+
}
404+
405+
// 计算服务名
406+
service := msg.Service()
407+
408+
// 执行管道任务
411409
if pipe := h.pipeline; pipe != nil {
412-
err := pipe.Inbound().Process(session, msg)
410+
err = pipe.Inbound().Process(session, msg)
413411
if err != nil {
414412
log.Error("Pipeline process failed.", err)
415413
return
416414
}
417415
}
418-
index := strings.LastIndex(msg.Route, ".")
419-
if index < 0 {
420-
log.Info("nano/handler: invalid route %s", msg.Route)
421-
return
422-
}
423-
service := msg.Route[:index]
424416

417+
// 记录日志
425418
if env.Debug {
426419
log.Info("UID=%d, Message={%s}, Data=%v", session.UID(), msg.String(), msg.Data)
427420
}

cluster/handler_task.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package cluster
22

33
import (
4+
"github.com/lonng/nano/internal/env"
5+
"github.com/lonng/nano/internal/log"
46
"github.com/lonng/nano/npi"
57
"github.com/lonng/nano/protocal/message"
68
"github.com/lonng/nano/session"
@@ -9,7 +11,7 @@ import (
911
// 任务对象
1012
type localHandlerTask struct {
1113
localHandler *LocalHandler
12-
handlerNode *npi.HandlerNode
14+
handlerNode *npi.HandlerNode // 只有该字段允许为 nil
1315
session *session.Session
1416
msg *message.Message
1517
service string
@@ -18,6 +20,7 @@ type localHandlerTask struct {
1820

1921
// run 运行任务
2022
func (task *localHandlerTask) run() {
23+
// 定义变量
2124
h := task.localHandler
2225
handlerNode := task.handlerNode
2326
s := task.session
@@ -47,12 +50,15 @@ func (task *localHandlerTask) run() {
4750
c.Msg = msg
4851
c.Session = s
4952
// 有路由
50-
if handlerNode.Len() > 0 {
53+
if handlerNode != nil && handlerNode.Len() > 0 {
5154
c.HandlerNode = handlerNode
5255
c.Next()
5356
return
5457
}
5558
// 无路由
59+
if env.Debug {
60+
log.Info("nano/handler: %s not found(forgot registered?)", msg.Route)
61+
}
5662
c.HandlerNode = h.allNoRoutes
5763
c.Next()
5864
}

protocal/message/message.go

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,10 @@ package message
22

33
import (
44
"fmt"
5+
"strings"
56
)
67

7-
// Message represents a unmarshalled message or a message which to be marshaled
8+
// Message represents an unmarshalled message or a message which to be marshaled
89
type Message struct {
910
Type Type // message type
1011
ID uint64 // unique id, zero while notify mode
@@ -18,6 +19,27 @@ func New() *Message {
1819
return &Message{}
1920
}
2021

22+
// ResponseID 获取响应 ID
23+
func (m *Message) ResponseID() (uint64, error) {
24+
switch m.Type {
25+
case Request:
26+
return m.ID, nil
27+
case Notify:
28+
return 0, nil
29+
default:
30+
return 0, ErrWrongMessageType
31+
}
32+
}
33+
34+
// Service 获取服务名
35+
func (m *Message) Service() string {
36+
route := m.Route
37+
if index := strings.LastIndex(route, "."); index != -1 {
38+
return route[:index]
39+
}
40+
return ""
41+
}
42+
2143
// String, implementation of fmt.Stringer interface
2244
func (m *Message) String() string {
2345
return fmt.Sprintf("%s %s (%dbytes)", types[m.Type], m.Route, len(m.Data))

0 commit comments

Comments
 (0)