-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathcore.go
More file actions
176 lines (155 loc) · 5.9 KB
/
core.go
File metadata and controls
176 lines (155 loc) · 5.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
package mongo
import (
"context"
"errors"
"fmt"
"net"
"sync"
"time"
"github.com/fireflycore/go-mongo/internal"
"github.com/fireflycore/go-utils/network"
"github.com/fireflycore/go-utils/tlsx"
"go.mongodb.org/mongo-driver/v2/event"
"go.mongodb.org/mongo-driver/v2/mongo"
"go.mongodb.org/mongo-driver/v2/mongo/options"
"go.mongodb.org/mongo-driver/v2/mongo/readpref"
"go.opentelemetry.io/contrib/instrumentation/go.mongodb.org/mongo-driver/v2/mongo/otelmongo"
)
// New 根据配置创建 MongoDB 连接并返回数据库句柄。
func New(c *Conf) (*mongo.Database, error) {
if c == nil {
return nil, errors.New("mongo: conf is nil")
}
host, port, err := network.SplitHostPort(c.Address, "27017")
if err != nil {
return nil, err
}
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
clientOptions := options.Client()
// 启用 otelmongo 插件(Tracing),自动记录 Mongo 命令 Span
clientOptions.Monitor = otelmongo.NewMonitor(otelmongo.WithCommandAttributeDisabled(false))
if c.Username != "" {
credential := options.Credential{
Username: c.Username,
}
if c.Password != "" {
credential.Password = c.Password
}
clientOptions.SetAuth(credential)
}
// 从配置生成 TLSConfig;tlsEnabled 表示是否启用 TLS。
tlsConfig, tlsEnabled, err := tlsx.NewTLSConfig(c.Tls)
// TLS 配置构造失败时直接返回错误。
if err != nil {
return nil, err
}
// 启用 TLS 时,将 TLSConfig 写入 clientOptions。
if tlsEnabled {
// 由 driver 使用该 TLS 配置建立安全连接。
clientOptions.TLSConfig = tlsConfig
clientOptions.TLSConfig.ServerName = host
}
// 将 address 组装为 MongoDB 标准 URI。
uri := fmt.Sprintf("mongodb://%s", net.JoinHostPort(host, port))
// 把 URI 应用到 clientOptions。
clientOptions.ApplyURI(uri)
// 设置 BSON 编解码行为。
clientOptions.SetBSONOptions(&options.BSONOptions{
UseLocalTimeZone: false, // 关闭本地时区,减少环境差异带来的时间解析偏差。
})
if c.MaxOpenConnects > 0 {
// 设置连接池最大连接数。
clientOptions.SetMaxPoolSize(uint64(c.MaxOpenConnects))
}
if c.ConnMaxLifeTime > 0 {
// 设置最大空闲时间。
clientOptions.SetMaxConnIdleTime(time.Second * time.Duration(c.ConnMaxLifeTime))
}
// 启用日志时,安装命令监控器以采集 Mongo 命令执行信息。
// 注意:go-mongo 原有 Logger 是通过 Monitor 实现的。
// 由于 clientOptions.Monitor 只能设置一个,而 otelmongo 也是一个 Monitor。
// 为了同时支持 Logs 和 Traces,我们需要把 otelmongo 和原有 Logger Monitor 串联起来。
// 但 mongo-driver 目前没有 ChainMonitor 的公开方法。
// 幸好 otelmongo.NewMonitor 返回的是 *event.CommandMonitor。
// 我们可以在原有 Monitor 的基础上,把 otelmongo 的回调函数合并进去。
otelMonitor := clientOptions.Monitor // 这是上面刚设置的 otelmongo monitor
if c.Logger {
logger := internal.NewLogger(&internal.Conf{ // 构造内部 logger 配置并返回 logger 实例。
SlowThreshold: 200 * time.Millisecond, // 慢查询阈值,超过则按 warn 输出。
Colorful: true, // 是否开启彩色控制台输出。
Database: c.Database, // 写入日志字段,用于区分数据库实例。
Console: c.loggerConsole, // 是否输出到控制台。
})
// stmts 用于缓存 RequestID 对应的命令文本,供结束事件读取。
var stmts sync.Map
// 保存 otelmongo 的原始回调
otelStarted := otelMonitor.Started
otelSucceeded := otelMonitor.Succeeded
otelFailed := otelMonitor.Failed
// 重新绑定命令监控回调(串联 otelmongo + internal logger)。
clientOptions.Monitor = &event.CommandMonitor{
// Started 在命令开始时触发。
Started: func(ctx context.Context, e *event.CommandStartedEvent) {
// 先执行 otelmongo 的逻辑 (Tracing)
if otelStarted != nil {
otelStarted(ctx, e)
}
// 再执行 internal logger 的逻辑 (Logging)
stmts.Store(e.RequestID, e.Command.String())
},
// Succeeded 在命令成功时触发。
Succeeded: func(ctx context.Context, e *event.CommandSucceededEvent) {
// 先执行 otelmongo 的逻辑
if otelSucceeded != nil {
otelSucceeded(ctx, e)
}
// 再执行 internal logger 的逻辑
// smt 用于保存命令字符串(若能从 map 中取到)。
var smt string
// 通过 RequestID 找到对应的命令文本。
if v, ok := stmts.Load(e.RequestID); ok {
// 做类型断言并赋值(失败则保持空字符串)。
smt, _ = v.(string)
// 取出后删除,避免 map 增长。
stmts.Delete(e.RequestID)
}
// 记录成功 Trace,err 字符串为空。
logger.Trace(ctx, e.RequestID, e.Duration, smt, "")
},
// Failed 在命令失败时触发。
Failed: func(ctx context.Context, e *event.CommandFailedEvent) {
// 先执行 otelmongo 的逻辑
if otelFailed != nil {
otelFailed(ctx, e)
}
// 再执行 internal logger 的逻辑
// smt 用于保存命令字符串(若能从 map 中取到)。
var smt string
// 通过 RequestID 找到对应的命令文本。
if v, ok := stmts.Load(e.RequestID); ok {
smt, _ = v.(string)
stmts.Delete(e.RequestID)
}
// 记录失败 Trace,err 为 driver 提供的失败信息。
if e.Failure != nil {
logger.Trace(ctx, e.RequestID, e.Duration, smt, e.Failure.Error())
} else {
logger.Trace(ctx, e.RequestID, e.Duration, smt, "")
}
},
}
}
// 用构造好的 options 建立客户端连接。
client, err := mongo.Connect(clientOptions)
if err != nil {
return nil, err
}
// Ping 用于验证连接可用与认证正确。
if err := client.Ping(ctx, readpref.Primary()); err != nil {
return nil, err
}
// 选择默认数据库并返回对应句柄。
db := client.Database(c.Database)
return db, nil
}