This repository was archived by the owner on Sep 5, 2025. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 4
Expand file tree
/
Copy pathbuf_server.go
More file actions
229 lines (192 loc) · 6.82 KB
/
buf_server.go
File metadata and controls
229 lines (192 loc) · 6.82 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
/*
* SPDX-FileCopyrightText: Hypermode Inc. <hello@hypermode.com>
* SPDX-License-Identifier: Apache-2.0
*/
package modusgraph
import (
"context"
"errors"
"fmt"
"log"
"net"
"strings"
"github.com/dgraph-io/dgo/v250"
"github.com/dgraph-io/dgo/v250/protos/api"
"github.com/hypermodeinc/dgraph/v25/x"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/test/bufconn"
)
// bufSize is the buffer size handed to bufconn.Listen for the in-memory
// connection: 10 MiB (10 * 1024 * 1024 bytes).
const bufSize = 10 << 20
// serverWrapper is the Dgraph gRPC service implementation registered by
// setupBufconnServer. It answers requests by delegating to an Engine.
//
// The methods declared on *serverWrapper in this file (Query, CommitOrAbort,
// Login, Alter and CheckVersion) are the ones backed by the Engine.
//
// NOTE(review): setupBufconnServer constructs the wrapper with only the
// engine field set, so the embedded api.DgraphServer is left at its zero
// value there. Any service method not overridden in this file would
// presumably fail if a client invoked it; confirm before relying on one.
type serverWrapper struct {
// Embedded so the wrapper carries the full api.DgraphServer method set.
api.DgraphServer
// engine is the Engine whose namespaces serve the requests.
engine *Engine
}
// Query implements the Dgraph Query method by delegating to the Engine.
//
// The target namespace is read from ctx; when extraction fails or yields
// namespace 0, the engine's default namespace is used. If the request carries
// mutations, only those mutations are applied and the response contains just
// the assigned UIDs (hex formatted, keyed by name with any leading "_:"
// removed); req.Query is not executed in that case. Otherwise the query is
// run with req.Vars and its response is returned as is.
func (s *serverWrapper) Query(ctx context.Context, req *api.Request) (*api.Response, error) {
	nsID, extractErr := x.ExtractNamespace(ctx)

	var ns *Namespace
	switch {
	case extractErr != nil || nsID == 0:
		ns = s.engine.GetDefaultNamespace()
	default:
		resolved, err := s.engine.GetNamespace(nsID)
		if err != nil {
			return nil, fmt.Errorf("error getting namespace %d: %w", nsID, err)
		}
		ns = resolved
	}
	s.engine.logger.V(2).Info("Query using namespace", "namespaceID", ns.ID())

	// Read-only request: hand the query straight to the namespace.
	if len(req.Mutations) == 0 {
		return ns.QueryWithVars(ctx, req.Query, req.Vars)
	}

	s.engine.logger.V(3).Info("Mutating", "mutations", req.Mutations)
	assigned, err := ns.Mutate(ctx, req.Mutations)
	if err != nil {
		return nil, fmt.Errorf("engine mutation error: %w", err)
	}

	// Report each assigned UID as a hex string under its name, with the
	// "_:" prefix dropped when present.
	uidMap := make(map[string]string, len(assigned))
	for name, uid := range assigned {
		uidMap[strings.TrimPrefix(name, "_:")] = fmt.Sprintf("0x%x", uid)
	}
	return &api.Response{Uids: uidMap}, nil
}
// CommitOrAbort implements the Dgraph CommitOrAbort method.
//
// The target namespace is read from ctx; when extraction fails or yields
// namespace 0, the engine's default namespace is used. A transaction context
// already marked Aborted is returned unchanged. Otherwise a commit is
// triggered through the namespace and a new TxnContext is returned with the
// caller's StartTs and a synthetic CommitTs of StartTs+1.
func (s *serverWrapper) CommitOrAbort(ctx context.Context, tc *api.TxnContext) (*api.TxnContext, error) {
	nsID, extractErr := x.ExtractNamespace(ctx)

	var ns *Namespace
	switch {
	case extractErr != nil || nsID == 0:
		ns = s.engine.GetDefaultNamespace()
	default:
		resolved, err := s.engine.GetNamespace(nsID)
		if err != nil {
			return nil, fmt.Errorf("error getting namespace %d: %w", nsID, err)
		}
		ns = resolved
	}
	s.engine.logger.V(2).Info("CommitOrAbort called with transaction", "transaction", tc, "namespaceID", ns.ID())

	// Nothing to do for an abort: echo the context back.
	if tc.Aborted {
		return tc, nil
	}

	// A commit is expressed as a single, otherwise empty mutation with
	// CommitNow set, sent through the namespace's normal mutation path.
	// NOTE(review): the mutation carries no StartTs, so this block does not
	// show how the engine ties it to the transaction in tc; confirm.
	commitOnly := []*api.Mutation{{CommitNow: true}}
	if _, err := ns.Mutate(ctx, commitOnly); err != nil {
		return nil, fmt.Errorf("error committing transaction: %w", err)
	}
	s.engine.logger.V(2).Info("Transaction committed successfully")

	// The real commit timestamp is not available here; StartTs+1 is a
	// placeholder that is sufficient for testing.
	return &api.TxnContext{
		StartTs:  tc.StartTs,
		CommitTs: tc.StartTs + 1,
	}, nil
}
// Login implements the Dgraph Login method. Authentication is intentionally
// not provided by this wrapper, so every call returns a nil response and an
// error.
func (s *serverWrapper) Login(ctx context.Context, req *api.LoginRequest) (*api.Response, error) {
	errNoAuth := errors.New("authentication not implemented")
	return nil, errNoAuth
}
// Alter implements the Dgraph Alter method by delegating to the Engine.
//
// The target namespace is read from ctx; when extraction fails or yields
// namespace 0, the engine's default namespace is used. Exactly one action is
// taken, chosen in this order of precedence:
//
//   - op.Schema non-empty: the schema is applied via AlterSchema.
//   - op.DropAll set, or op.DropOp == api.Operation_ALL: everything is
//     dropped via DropAll. Both spellings of "drop all" are accepted.
//   - op.DropOp == api.Operation_DATA: data is dropped via DropData.
//   - any other non-zero op.DropOp: rejected as unsupported.
//   - op.DropAttr non-empty: rejected as not implemented yet.
//   - otherwise: rejected as an unsupported alter operation.
//
// On success an empty payload is returned. Failures from the engine are
// logged and returned wrapped.
func (s *serverWrapper) Alter(ctx context.Context, op *api.Operation) (*api.Payload, error) {
	var ns *Namespace
	nsID, err := x.ExtractNamespace(ctx)
	if err != nil || nsID == 0 {
		ns = s.engine.GetDefaultNamespace()
	} else {
		ns, err = s.engine.GetNamespace(nsID)
		if err != nil {
			return nil, fmt.Errorf("error getting namespace %d: %w", nsID, err)
		}
	}
	s.engine.logger.V(2).Info("Alter called with operation", "operation", op, "namespaceID", ns.ID())

	switch {
	case op.Schema != "":
		if err := ns.AlterSchema(ctx, op.Schema); err != nil {
			s.engine.logger.Error(err, "Error altering schema")
			return nil, fmt.Errorf("error altering schema: %w", err)
		}
	case op.DropAll, op.DropOp == api.Operation_ALL:
		// DropOp ALL is the enum form of the DropAll flag; treat them alike
		// instead of rejecting the enum form as unsupported.
		if err := ns.DropAll(ctx); err != nil {
			s.engine.logger.Error(err, "Error dropping all")
			return nil, fmt.Errorf("error dropping all: %w", err)
		}
	case op.DropOp == api.Operation_DATA:
		if err := ns.DropData(ctx); err != nil {
			s.engine.logger.Error(err, "Error dropping data")
			return nil, fmt.Errorf("error dropping data: %w", err)
		}
	case op.DropOp != 0:
		// Remaining drop operations have no counterpart used by this wrapper.
		s.engine.logger.Error(nil, "Unsupported drop operation")
		return nil, fmt.Errorf("unsupported drop operation: %d", op.DropOp)
	case op.DropAttr != "":
		s.engine.logger.Error(nil, "Drop attribute not implemented yet")
		return nil, errors.New("drop attribute not implemented yet")
	default:
		return nil, errors.New("unsupported alter operation")
	}
	return &api.Payload{}, nil
}
// CheckVersion implements the Dgraph CheckVersion method. It always reports
// the fixed tag "v25.0.0", regardless of the incoming check.
func (s *serverWrapper) CheckVersion(ctx context.Context, check *api.Check) (*api.Version, error) {
	// TODO: return a version that matches what the client expects. The tag
	// must match the major version expected by the client.
	version := &api.Version{Tag: "v25.0.0"}
	return version, nil
}
// setupBufconnServer creates a bufconn listener and a gRPC server with the
// Dgraph service registered against engine, then starts serving on the
// listener in a background goroutine. It also sets the package-level
// x.Config mutation and query-edge limits. Both the listener and the server
// are returned to the caller.
func setupBufconnServer(engine *Engine) (*bufconn.Listener, *grpc.Server) {
	x.Config.LimitMutationsNquad = 1_000_000
	x.Config.LimitQueryEdge = 10_000_000

	listener := bufconn.Listen(bufSize)
	grpcServer := grpc.NewServer()

	// The wrapper routes each service call to the engine.
	api.RegisterDgraphServer(grpcServer, &serverWrapper{engine: engine})

	// Serve blocks, so it runs on its own goroutine; a non-nil result is
	// only logged.
	go func() {
		err := grpcServer.Serve(listener)
		if err == nil {
			return
		}
		log.Printf("Server exited with error: %v", err)
	}()

	return listener, grpcServer
}
// bufDialer returns a dial function for listener, in the shape expected by
// grpc.WithContextDialer. The address argument is ignored because a bufconn
// listener has a single in-memory endpoint. The dial context is passed on to
// the listener so that a cancelled or expired context aborts the dial.
func bufDialer(listener *bufconn.Listener) func(context.Context, string) (net.Conn, error) {
	return func(ctx context.Context, _ string) (net.Conn, error) {
		return listener.DialContext(ctx)
	}
}
// createDgraphClient builds a dgo client whose gRPC connection is dialed
// through listener via bufDialer, using insecure transport credentials. It
// returns the dial error, if any.
func createDgraphClient(ctx context.Context, listener *bufconn.Listener) (*dgo.Dgraph, error) {
	dialOpts := []grpc.DialOption{
		grpc.WithContextDialer(bufDialer(listener)),
		grpc.WithTransportCredentials(insecure.NewCredentials()),
	}

	// "bufnet" is only a placeholder target; the context dialer above
	// decides where the connection actually goes.
	// nolint:staticcheck // SA1019: grpc.DialContext is deprecated
	conn, err := grpc.DialContext(ctx, "bufnet", dialOpts...)
	if err != nil {
		return nil, err
	}

	// Wrap the raw gRPC stub in the higher-level dgo client.
	// nolint:staticcheck // SA1019: dgo.NewDgraphClient is deprecated but works with our current setup
	return dgo.NewDgraphClient(api.NewDgraphClient(conn)), nil
}