Skip to content

Commit 8c41fdb

Browse files
author
john
committed
临时保存:triple泛化调用相关文件
1 parent 5aba962 commit 8c41fdb

30 files changed

Lines changed: 9990 additions & 0 deletions
Lines changed: 167 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,167 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package connection
19+
20+
import (
21+
"context"
22+
"time"
23+
24+
"dubbo.apache.org/dubbo-go/v3/common"
25+
)
26+
27+
// ConnectionState represents the state of a connection
28+
type ConnectionState int32
29+
30+
const (
31+
// StateIdle indicates the connection is idle
32+
StateIdle ConnectionState = iota
33+
// StateConnecting indicates the connection is being established
34+
StateConnecting
35+
// StateReady indicates the connection is ready for use
36+
StateReady
37+
// StateTransientFailure indicates the connection has failed temporarily
38+
StateTransientFailure
39+
// StateShutdown indicates the connection has been shutdown
40+
StateShutdown
41+
)
42+
43+
// String returns the string representation of ConnectionState
44+
func (cs ConnectionState) String() string {
45+
switch cs {
46+
case StateIdle:
47+
return "IDLE"
48+
case StateConnecting:
49+
return "CONNECTING"
50+
case StateReady:
51+
return "READY"
52+
case StateTransientFailure:
53+
return "TRANSIENT_FAILURE"
54+
case StateShutdown:
55+
return "SHUTDOWN"
56+
default:
57+
return "UNKNOWN"
58+
}
59+
}
60+
61+
// Connection represents a unified connection interface across all protocols
62+
type Connection interface {
63+
// GetState returns the current state of the connection
64+
GetState() ConnectionState
65+
66+
// GetURL returns the URL associated with this connection
67+
GetURL() *common.URL
68+
69+
// IsAvailable checks if the connection is available for use
70+
IsAvailable() bool
71+
72+
// Close closes the connection
73+
Close() error
74+
75+
// GetLastActive returns the last active time of the connection
76+
GetLastActive() time.Time
77+
78+
// GetProtocol returns the protocol name (dubbo, triple, grpc, etc.)
79+
GetProtocol() string
80+
}
81+
82+
// ConnectionStats provides statistics about connections
83+
type ConnectionStats struct {
84+
TotalConnections int
85+
ActiveConnections int
86+
IdleConnections int
87+
FailedConnections int
88+
ReconnectAttempts int
89+
LastReconnectTime time.Time
90+
}
91+
92+
// HealthCheckResult represents the result of a health check
93+
type HealthCheckResult struct {
94+
Healthy bool
95+
Reason string
96+
CheckTime time.Time
97+
Duration time.Duration
98+
}
99+
100+
// HealthChecker defines the interface for connection health checking
101+
type HealthChecker interface {
102+
// CheckConnection performs health check on a specific connection
103+
CheckConnection(ctx context.Context, conn Connection) *HealthCheckResult
104+
105+
// GetProtocol returns the protocol this health checker supports
106+
GetProtocol() string
107+
108+
// GetCheckInterval returns the recommended check interval
109+
GetCheckInterval() time.Duration
110+
}
111+
112+
// ConnectionPool manages connections for a specific protocol
113+
type ConnectionPool interface {
114+
// GetConnection retrieves a healthy connection for the given URL
115+
GetConnection(url *common.URL) (Connection, error)
116+
117+
// RemoveConnection removes a connection from the pool
118+
RemoveConnection(conn Connection) error
119+
120+
// RemoveStaleConnections removes all stale connections for the given URL
121+
RemoveStaleConnections(url *common.URL) int
122+
123+
// GetStats returns connection pool statistics
124+
GetStats() *ConnectionStats
125+
126+
// Close closes all connections in the pool
127+
Close() error
128+
}
129+
130+
// ConnectionManager provides unified connection management across all protocols
131+
type ConnectionManager interface {
132+
// RegisterProtocol registers a connection pool and health checker for a protocol
133+
RegisterProtocol(protocol string, pool ConnectionPool, checker HealthChecker) error
134+
135+
// GetConnection gets a healthy connection for the specified URL
136+
GetConnection(url *common.URL) (Connection, error)
137+
138+
// IsConnectionHealthy checks if the connection is healthy
139+
IsConnectionHealthy(conn Connection) bool
140+
141+
// RemoveStaleConnections removes stale connections for the given URL
142+
RemoveStaleConnections(url *common.URL) int
143+
144+
// GetGlobalStats returns global connection statistics across all protocols
145+
GetGlobalStats() map[string]*ConnectionStats
146+
147+
// StartHealthCheckLoop starts the background health check loop
148+
StartHealthCheckLoop(ctx context.Context)
149+
150+
// Close closes all connections across all protocols
151+
Close() error
152+
}
153+
154+
// StateChangeCallback is called when connection state changes
155+
type StateChangeCallback func(conn Connection, oldState, newState ConnectionState)
156+
157+
// ConnectionEventListener listens to connection events
158+
type ConnectionEventListener interface {
159+
// OnStateChange is called when connection state changes
160+
OnStateChange(conn Connection, oldState, newState ConnectionState)
161+
162+
// OnHealthCheckFailed is called when health check fails
163+
OnHealthCheckFailed(conn Connection, result *HealthCheckResult)
164+
165+
// OnConnectionRemoved is called when a connection is removed
166+
OnConnectionRemoved(conn Connection, reason string)
167+
}
Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package connection
19+
20+
import (
21+
"time"
22+
23+
"dubbo.apache.org/dubbo-go/v3/common"
24+
"dubbo.apache.org/dubbo-go/v3/remoting"
25+
)
26+
27+
// DubboConnection adapts Dubbo's ExchangeClient to the unified Connection interface
28+
// This provides a unified interface for Issue #1868 health checking
29+
type DubboConnection struct {
30+
client *remoting.ExchangeClient
31+
url *common.URL
32+
lastActive time.Time
33+
}
34+
35+
// NewDubboConnection creates a new DubboConnection wrapper
36+
func NewDubboConnection(client *remoting.ExchangeClient, url *common.URL) *DubboConnection {
37+
return &DubboConnection{
38+
client: client,
39+
url: url,
40+
lastActive: time.Now(),
41+
}
42+
}
43+
44+
// GetState returns the current state of the connection
45+
func (dc *DubboConnection) GetState() ConnectionState {
46+
if dc.client == nil {
47+
return StateShutdown
48+
}
49+
50+
// Map ExchangeClient availability to our state model
51+
if dc.client.IsAvailable() {
52+
return StateReady
53+
}
54+
55+
// If not available, we need to determine if it's a transient failure
56+
// or if the connection is being established
57+
return StateTransientFailure
58+
}
59+
60+
// GetURL returns the URL associated with this connection
61+
func (dc *DubboConnection) GetURL() *common.URL {
62+
return dc.url
63+
}
64+
65+
// IsAvailable checks if the connection is available for use
66+
func (dc *DubboConnection) IsAvailable() bool {
67+
if dc.client == nil {
68+
return false
69+
}
70+
71+
// Delegate to the ExchangeClient's IsAvailable method
72+
// This uses our improved health checking from the ExchangeClient implementation
73+
return dc.client.IsAvailable()
74+
}
75+
76+
// Close closes the connection
77+
func (dc *DubboConnection) Close() error {
78+
if dc.client != nil {
79+
dc.client.Close()
80+
}
81+
return nil
82+
}
83+
84+
// GetLastActive returns the last active time of the connection
85+
func (dc *DubboConnection) GetLastActive() time.Time {
86+
return dc.lastActive
87+
}
88+
89+
// UpdateLastActive updates the last active time
90+
func (dc *DubboConnection) UpdateLastActive() {
91+
dc.lastActive = time.Now()
92+
}
93+
94+
// GetProtocol returns the protocol name
95+
func (dc *DubboConnection) GetProtocol() string {
96+
return "dubbo"
97+
}
98+
99+
// GetExchangeClient returns the underlying ExchangeClient
100+
// This allows protocol-specific operations while maintaining the unified interface
101+
func (dc *DubboConnection) GetExchangeClient() *remoting.ExchangeClient {
102+
return dc.client
103+
}
104+
Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package connection
19+
20+
import (
21+
"context"
22+
"time"
23+
24+
"github.com/dubbogo/gost/log/logger"
25+
)
26+
27+
// DubboHealthChecker implements HealthChecker for Dubbo protocol
28+
// This addresses Issue #1868 by providing proper connection health detection
29+
// for Dubbo protocol without the circular dependency issues
30+
type DubboHealthChecker struct {
31+
checkInterval time.Duration
32+
maxIdleTime time.Duration
33+
}
34+
35+
// NewDubboHealthChecker creates a new Dubbo protocol health checker
36+
func NewDubboHealthChecker() *DubboHealthChecker {
37+
return &DubboHealthChecker{
38+
checkInterval: 30 * time.Second,
39+
maxIdleTime: 5 * time.Minute,
40+
}
41+
}
42+
43+
// CheckConnection performs health check on a Dubbo connection
44+
// This method solves the core Issue #1868 problem by checking connection health
45+
// without using the potentially stale connection itself (avoiding circular dependency)
46+
func (dhc *DubboHealthChecker) CheckConnection(ctx context.Context, conn Connection) *HealthCheckResult {
47+
startTime := time.Now()
48+
49+
result := &HealthCheckResult{
50+
Healthy: true,
51+
CheckTime: startTime,
52+
}
53+
54+
// 1. Basic state check
55+
state := conn.GetState()
56+
if state == StateShutdown || state == StateTransientFailure {
57+
result.Healthy = false
58+
result.Reason = "connection state is " + state.String()
59+
result.Duration = time.Since(startTime)
60+
return result
61+
}
62+
63+
// 2. Availability check using the connection's own IsAvailable method
64+
// This delegates to the underlying implementation (Getty) which knows
65+
// how to check session state without network I/O
66+
if !conn.IsAvailable() {
67+
result.Healthy = false
68+
result.Reason = "connection reports not available"
69+
result.Duration = time.Since(startTime)
70+
return result
71+
}
72+
73+
// 3. Idle time check
74+
lastActive := conn.GetLastActive()
75+
if !lastActive.IsZero() {
76+
idleTime := time.Since(lastActive)
77+
if idleTime > dhc.maxIdleTime {
78+
result.Healthy = false
79+
result.Reason = "connection idle too long: " + idleTime.String()
80+
result.Duration = time.Since(startTime)
81+
return result
82+
}
83+
}
84+
85+
// 4. For Dubbo protocol, we trust the Getty session state
86+
// Getty's internal session management is more reliable than
87+
// sending network requests which could fail due to the very
88+
// connection issues we're trying to detect
89+
90+
result.Duration = time.Since(startTime)
91+
92+
if result.Healthy {
93+
logger.Debugf("Dubbo connection health check passed for %s (took %v)",
94+
conn.GetURL().Location, result.Duration)
95+
} else {
96+
logger.Debugf("Dubbo connection health check failed for %s: %s (took %v)",
97+
conn.GetURL().Location, result.Reason, result.Duration)
98+
}
99+
100+
return result
101+
}
102+
103+
// GetProtocol returns the protocol this health checker supports
104+
func (dhc *DubboHealthChecker) GetProtocol() string {
105+
return "dubbo"
106+
}
107+
108+
// GetCheckInterval returns the recommended check interval
109+
func (dhc *DubboHealthChecker) GetCheckInterval() time.Duration {
110+
return dhc.checkInterval
111+
}
112+
113+
// SetCheckInterval sets the health check interval
114+
func (dhc *DubboHealthChecker) SetCheckInterval(interval time.Duration) {
115+
dhc.checkInterval = interval
116+
}
117+
118+
// SetMaxIdleTime sets the maximum idle time before considering connection stale
119+
func (dhc *DubboHealthChecker) SetMaxIdleTime(maxIdleTime time.Duration) {
120+
dhc.maxIdleTime = maxIdleTime
121+
}
122+

0 commit comments

Comments
 (0)