From 0aa55e861e6822151fcbfa34a727b4c4fdfa23ce Mon Sep 17 00:00:00 2001 From: Mohammad Date: Tue, 6 Jan 2026 10:00:15 +0330 Subject: [PATCH 1/2] feat: implement chunked user synchronization with gRPC and REST APIs --- common/service.pb.go | 142 ++++++++++++++++++++++++--------- common/service.proto | 7 ++ common/service_grpc.pb.go | 33 +++++++- controller/rest/rest_test.go | 83 +++++++++++++++++++ controller/rest/service.go | 1 + controller/rest/user.go | 63 +++++++++++++++ controller/rpc/middleware.go | 1 + controller/rpc/rpc_test.go | 65 ++++++++++++++- controller/rpc/user.go | 39 +++++++++ controller/users_chunk.go | 29 +++++++ controller/users_chunk_test.go | 48 +++++++++++ 11 files changed, 473 insertions(+), 38 deletions(-) create mode 100644 controller/users_chunk.go create mode 100644 controller/users_chunk_test.go diff --git a/common/service.pb.go b/common/service.pb.go index dd49325..ec396e1 100644 --- a/common/service.pb.go +++ b/common/service.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.36.6 -// protoc v6.31.1 +// protoc-gen-go v1.36.10 +// protoc v3.21.12 // source: common/service.proto package common @@ -1182,6 +1182,66 @@ func (x *Users) GetUsers() []*User { return nil } +type UsersChunk struct { + state protoimpl.MessageState `protogen:"open.v1"` + Users []*User `protobuf:"bytes,1,rep,name=users,proto3" json:"users,omitempty"` + Index uint64 `protobuf:"varint,2,opt,name=index,proto3" json:"index,omitempty"` + Last bool `protobuf:"varint,3,opt,name=last,proto3" json:"last,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *UsersChunk) Reset() { + *x = UsersChunk{} + mi := &file_common_service_proto_msgTypes[18] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *UsersChunk) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*UsersChunk) ProtoMessage() {} + +func (x *UsersChunk) ProtoReflect() protoreflect.Message { + mi := &file_common_service_proto_msgTypes[18] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use UsersChunk.ProtoReflect.Descriptor instead. +func (*UsersChunk) Descriptor() ([]byte, []int) { + return file_common_service_proto_rawDescGZIP(), []int{18} +} + +func (x *UsersChunk) GetUsers() []*User { + if x != nil { + return x.Users + } + return nil +} + +func (x *UsersChunk) GetIndex() uint64 { + if x != nil { + return x.Index + } + return 0 +} + +func (x *UsersChunk) GetLast() bool { + if x != nil { + return x.Last + } + return false +} + var File_common_service_proto protoreflect.FileDescriptor const file_common_service_proto_rawDesc = "" + @@ -1261,7 +1321,12 @@ const file_common_service_proto_rawDesc = "" + "\aproxies\x18\x02 \x01(\v2\x0e.service.ProxyR\aproxies\x12\x1a\n" + "\binbounds\x18\x03 \x03(\tR\binbounds\",\n" + "\x05Users\x12#\n" + - "\x05users\x18\x01 \x03(\v2\r.service.UserR\x05users*\x17\n" + + "\x05users\x18\x01 \x03(\v2\r.service.UserR\x05users\"[\n" + + "\n" + + "UsersChunk\x12#\n" + + "\x05users\x18\x01 \x03(\v2\r.service.UserR\x05users\x12\x14\n" + + "\x05index\x18\x02 \x01(\x04R\x05index\x12\x12\n" + + "\x04last\x18\x03 \x01(\bR\x04last*\x17\n" + "\vBackendType\x12\b\n" + "\x04XRAY\x10\x00*_\n" + "\bStatType\x12\r\n" + @@ -1270,7 +1335,7 @@ const file_common_service_proto_rawDesc = "" + "\bInbounds\x10\x02\x12\v\n" + "\aInbound\x10\x03\x12\r\n" + "\tUsersStat\x10\x04\x12\f\n" + - "\bUserStat\x10\x052\x9a\x05\n" + + "\bUserStat\x10\x052\xd7\x05\n" + "\vNodeService\x126\n" + "\x05Start\x12\x10.service.Backend\x1a\x19.service.BaseInfoResponse\"\x00\x12(\n" + "\x04Stop\x12\x0e.service.Empty\x1a\x0e.service.Empty\"\x00\x12:\n" + @@ -1282,7 +1347,8 @@ const file_common_service_proto_rawDesc = "" + "\x12GetUserOnlineStats\x12\x14.service.StatRequest\x1a\x1b.service.OnlineStatResponse\"\x00\x12V\n" + "\x18GetUserOnlineIpListStats\x12\x14.service.StatRequest\x1a\".service.StatsOnlineIpListResponse\"\x00\x12-\n" + "\bSyncUser\x12\r.service.User\x1a\x0e.service.Empty\"\x00(\x01\x12-\n" + - "\tSyncUsers\x12\x0e.service.Users\x1a\x0e.service.Empty\"\x00B#Z!github.com/pasarguard/node/commonb\x06proto3" + "\tSyncUsers\x12\x0e.service.Users\x1a\x0e.service.Empty\"\x00\x12;\n" + + "\x10SyncUsersChunked\x12\x13.service.UsersChunk\x1a\x0e.service.Empty\"\x00(\x01B#Z!github.com/pasarguard/node/commonb\x06proto3" var ( file_common_service_proto_rawDescOnce sync.Once @@ -1297,7 +1363,7 @@ func file_common_service_proto_rawDescGZIP() []byte { } var file_common_service_proto_enumTypes = make([]protoimpl.EnumInfo, 2) -var file_common_service_proto_msgTypes = make([]protoimpl.MessageInfo, 19) +var file_common_service_proto_msgTypes = make([]protoimpl.MessageInfo, 20) var file_common_service_proto_goTypes = []any{ (BackendType)(0), // 0: service.BackendType (StatType)(0), // 1: service.StatType @@ -1319,47 +1385,51 @@ var file_common_service_proto_goTypes = []any{ (*Proxy)(nil), // 17: service.Proxy (*User)(nil), // 18: service.User (*Users)(nil), // 19: service.Users - nil, // 20: service.StatsOnlineIpListResponse.IpsEntry + (*UsersChunk)(nil), // 20: service.UsersChunk + nil, // 21: service.StatsOnlineIpListResponse.IpsEntry } var file_common_service_proto_depIdxs = []int32{ 0, // 0: service.Backend.type:type_name -> service.BackendType 18, // 1: service.Backend.users:type_name -> service.User 6, // 2: service.StatResponse.stats:type_name -> service.Stat 1, // 3: service.StatRequest.type:type_name -> service.StatType - 20, // 4: service.StatsOnlineIpListResponse.ips:type_name -> service.StatsOnlineIpListResponse.IpsEntry + 21, // 4: service.StatsOnlineIpListResponse.ips:type_name -> service.StatsOnlineIpListResponse.IpsEntry 13, // 5: service.Proxy.vmess:type_name -> service.Vmess 14, // 6: service.Proxy.vless:type_name -> service.Vless 15, // 7: service.Proxy.trojan:type_name -> service.Trojan 16, // 8: service.Proxy.shadowsocks:type_name -> service.Shadowsocks 17, // 9: service.User.proxies:type_name -> service.Proxy 18, // 10: service.Users.users:type_name -> service.User - 4, // 11: service.NodeService.Start:input_type -> service.Backend - 2, // 12: service.NodeService.Stop:input_type -> service.Empty - 2, // 13: service.NodeService.GetBaseInfo:input_type -> service.Empty - 2, // 14: service.NodeService.GetLogs:input_type -> service.Empty - 2, // 15: service.NodeService.GetSystemStats:input_type -> service.Empty - 2, // 16: service.NodeService.GetBackendStats:input_type -> service.Empty - 8, // 17: service.NodeService.GetStats:input_type -> service.StatRequest - 8, // 18: service.NodeService.GetUserOnlineStats:input_type -> service.StatRequest - 8, // 19: service.NodeService.GetUserOnlineIpListStats:input_type -> service.StatRequest - 18, // 20: service.NodeService.SyncUser:input_type -> service.User - 19, // 21: service.NodeService.SyncUsers:input_type -> service.Users - 3, // 22: service.NodeService.Start:output_type -> service.BaseInfoResponse - 2, // 23: service.NodeService.Stop:output_type -> service.Empty - 3, // 24: service.NodeService.GetBaseInfo:output_type -> service.BaseInfoResponse - 5, // 25: service.NodeService.GetLogs:output_type -> service.Log - 12, // 26: service.NodeService.GetSystemStats:output_type -> service.SystemStatsResponse - 11, // 27: service.NodeService.GetBackendStats:output_type -> service.BackendStatsResponse - 7, // 28: service.NodeService.GetStats:output_type -> service.StatResponse - 9, // 29: service.NodeService.GetUserOnlineStats:output_type -> service.OnlineStatResponse - 10, // 30: service.NodeService.GetUserOnlineIpListStats:output_type -> service.StatsOnlineIpListResponse - 2, // 31: service.NodeService.SyncUser:output_type -> service.Empty - 2, // 32: service.NodeService.SyncUsers:output_type -> service.Empty - 22, // [22:33] is the sub-list for method output_type - 11, // [11:22] is the sub-list for method input_type - 11, // [11:11] is the sub-list for extension type_name - 11, // [11:11] is the sub-list for extension extendee - 0, // [0:11] is the sub-list for field type_name + 18, // 11: service.UsersChunk.users:type_name -> service.User + 4, // 12: service.NodeService.Start:input_type -> service.Backend + 2, // 13: service.NodeService.Stop:input_type -> service.Empty + 2, // 14: service.NodeService.GetBaseInfo:input_type -> service.Empty + 2, // 15: service.NodeService.GetLogs:input_type -> service.Empty + 2, // 16: service.NodeService.GetSystemStats:input_type -> service.Empty + 2, // 17: service.NodeService.GetBackendStats:input_type -> service.Empty + 8, // 18: service.NodeService.GetStats:input_type -> service.StatRequest + 8, // 19: service.NodeService.GetUserOnlineStats:input_type -> service.StatRequest + 8, // 20: service.NodeService.GetUserOnlineIpListStats:input_type -> service.StatRequest + 18, // 21: service.NodeService.SyncUser:input_type -> service.User + 19, // 22: service.NodeService.SyncUsers:input_type -> service.Users + 20, // 23: service.NodeService.SyncUsersChunked:input_type -> service.UsersChunk + 3, // 24: service.NodeService.Start:output_type -> service.BaseInfoResponse + 2, // 25: service.NodeService.Stop:output_type -> service.Empty + 3, // 26: service.NodeService.GetBaseInfo:output_type -> service.BaseInfoResponse + 5, // 27: service.NodeService.GetLogs:output_type -> service.Log + 12, // 28: service.NodeService.GetSystemStats:output_type -> service.SystemStatsResponse + 11, // 29: service.NodeService.GetBackendStats:output_type -> service.BackendStatsResponse + 7, // 30: service.NodeService.GetStats:output_type -> service.StatResponse + 9, // 31: service.NodeService.GetUserOnlineStats:output_type -> service.OnlineStatResponse + 10, // 32: service.NodeService.GetUserOnlineIpListStats:output_type -> service.StatsOnlineIpListResponse + 2, // 33: service.NodeService.SyncUser:output_type -> service.Empty + 2, // 34: service.NodeService.SyncUsers:output_type -> service.Empty + 2, // 35: service.NodeService.SyncUsersChunked:output_type -> service.Empty + 24, // [24:36] is the sub-list for method output_type + 12, // [12:24] is the sub-list for method input_type + 12, // [12:12] is the sub-list for extension type_name + 12, // [12:12] is the sub-list for extension extendee + 0, // [0:12] is the sub-list for field type_name } func init() { file_common_service_proto_init() } @@ -1373,7 +1443,7 @@ func file_common_service_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: unsafe.Slice(unsafe.StringData(file_common_service_proto_rawDesc), len(file_common_service_proto_rawDesc)), NumEnums: 2, - NumMessages: 19, + NumMessages: 20, NumExtensions: 0, NumServices: 1, }, diff --git a/common/service.proto b/common/service.proto index f21a9da..05ef71a 100644 --- a/common/service.proto +++ b/common/service.proto @@ -125,6 +125,12 @@ message Users { repeated User users = 1; } +message UsersChunk { + repeated User users = 1; + uint64 index = 2; + bool last = 3; +} + // Service for node management and connection service NodeService { rpc Start (Backend) returns (BaseInfoResponse) {} @@ -143,4 +149,5 @@ service NodeService { rpc SyncUser (stream User) returns (Empty) {} rpc SyncUsers (Users) returns (Empty) {} + rpc SyncUsersChunked (stream UsersChunk) returns (Empty) {} } diff --git a/common/service_grpc.pb.go b/common/service_grpc.pb.go index 634d957..e4525e5 100644 --- a/common/service_grpc.pb.go +++ b/common/service_grpc.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go-grpc. DO NOT EDIT. // versions: // - protoc-gen-go-grpc v1.5.1 -// - protoc v6.31.1 +// - protoc v3.21.12 // source: common/service.proto package common @@ -30,6 +30,7 @@ const ( NodeService_GetUserOnlineIpListStats_FullMethodName = "/service.NodeService/GetUserOnlineIpListStats" NodeService_SyncUser_FullMethodName = "/service.NodeService/SyncUser" NodeService_SyncUsers_FullMethodName = "/service.NodeService/SyncUsers" + NodeService_SyncUsersChunked_FullMethodName = "/service.NodeService/SyncUsersChunked" ) // NodeServiceClient is the client API for NodeService service. @@ -49,6 +50,7 @@ type NodeServiceClient interface { GetUserOnlineIpListStats(ctx context.Context, in *StatRequest, opts ...grpc.CallOption) (*StatsOnlineIpListResponse, error) SyncUser(ctx context.Context, opts ...grpc.CallOption) (grpc.ClientStreamingClient[User, Empty], error) SyncUsers(ctx context.Context, in *Users, opts ...grpc.CallOption) (*Empty, error) + SyncUsersChunked(ctx context.Context, opts ...grpc.CallOption) (grpc.ClientStreamingClient[UsersChunk, Empty], error) } type nodeServiceClient struct { @@ -181,6 +183,19 @@ func (c *nodeServiceClient) SyncUsers(ctx context.Context, in *Users, opts ...gr return out, nil } +func (c *nodeServiceClient) SyncUsersChunked(ctx context.Context, opts ...grpc.CallOption) (grpc.ClientStreamingClient[UsersChunk, Empty], error) { + cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) + stream, err := c.cc.NewStream(ctx, &NodeService_ServiceDesc.Streams[2], NodeService_SyncUsersChunked_FullMethodName, cOpts...) + if err != nil { + return nil, err + } + x := &grpc.GenericClientStream[UsersChunk, Empty]{ClientStream: stream} + return x, nil +} + +// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name. +type NodeService_SyncUsersChunkedClient = grpc.ClientStreamingClient[UsersChunk, Empty] + // NodeServiceServer is the server API for NodeService service. // All implementations must embed UnimplementedNodeServiceServer // for forward compatibility. @@ -198,6 +213,7 @@ type NodeServiceServer interface { GetUserOnlineIpListStats(context.Context, *StatRequest) (*StatsOnlineIpListResponse, error) SyncUser(grpc.ClientStreamingServer[User, Empty]) error SyncUsers(context.Context, *Users) (*Empty, error) + SyncUsersChunked(grpc.ClientStreamingServer[UsersChunk, Empty]) error mustEmbedUnimplementedNodeServiceServer() } @@ -241,6 +257,9 @@ func (UnimplementedNodeServiceServer) SyncUser(grpc.ClientStreamingServer[User, func (UnimplementedNodeServiceServer) SyncUsers(context.Context, *Users) (*Empty, error) { return nil, status.Errorf(codes.Unimplemented, "method SyncUsers not implemented") } +func (UnimplementedNodeServiceServer) SyncUsersChunked(grpc.ClientStreamingServer[UsersChunk, Empty]) error { + return status.Errorf(codes.Unimplemented, "method SyncUsersChunked not implemented") +} func (UnimplementedNodeServiceServer) mustEmbedUnimplementedNodeServiceServer() {} func (UnimplementedNodeServiceServer) testEmbeddedByValue() {} @@ -442,6 +461,13 @@ func _NodeService_SyncUsers_Handler(srv interface{}, ctx context.Context, dec fu return interceptor(ctx, in, info, handler) } +func _NodeService_SyncUsersChunked_Handler(srv interface{}, stream grpc.ServerStream) error { + return srv.(NodeServiceServer).SyncUsersChunked(&grpc.GenericServerStream[UsersChunk, Empty]{ServerStream: stream}) +} + +// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name. +type NodeService_SyncUsersChunkedServer = grpc.ClientStreamingServer[UsersChunk, Empty] + // NodeService_ServiceDesc is the grpc.ServiceDesc for NodeService service. // It's only intended for direct use with grpc.RegisterService, // and not to be introspected or modified (even as a copy) @@ -497,6 +523,11 @@ var NodeService_ServiceDesc = grpc.ServiceDesc{ Handler: _NodeService_SyncUser_Handler, ClientStreams: true, }, + { + StreamName: "SyncUsersChunked", + Handler: _NodeService_SyncUsersChunked_Handler, + ClientStreams: true, + }, }, Metadata: "common/service.proto", } diff --git a/controller/rest/rest_test.go b/controller/rest/rest_test.go index a0daf19..0d31454 100644 --- a/controller/rest/rest_test.go +++ b/controller/rest/rest_test.go @@ -4,6 +4,7 @@ import ( "bufio" "bytes" "context" + "encoding/binary" "errors" "fmt" "io" @@ -271,6 +272,88 @@ func TestREST_SyncUser(t *testing.T) { } } +func TestREST_SyncUsersChunked(t *testing.T) { + firstChunk := &common.UsersChunk{ + Index: 0, + Users: []*common.User{ + { + Email: "chunk_rest_user1@example.com", + Inbounds: []string{ + "VMESS TCP NOTLS", + "TROJAN TCP NOTLS", + }, + Proxies: &common.Proxy{ + Vmess: &common.Vmess{ + Id: uuid.New().String(), + }, + Trojan: &common.Trojan{ + Password: "try a random string", + }, + }, + }, + }, + } + + secondChunk := &common.UsersChunk{ + Index: 1, + Users: []*common.User{ + { + Email: "chunk_rest_user2@example.com", + Inbounds: []string{ + "Shadowsocks TCP", + "Shadowsocks UDP", + }, + Proxies: &common.Proxy{ + Shadowsocks: &common.Shadowsocks{ + Password: "try a random string", + Method: "aes-256-gcm", + }, + }, + }, + }, + Last: true, + } + + var body bytes.Buffer + appendChunk := func(chunk *common.UsersChunk) { + data, err := proto.Marshal(chunk) + if err != nil { + t.Fatalf("failed to marshal chunk: %v", err) + } + + var lenBuf [binary.MaxVarintLen64]byte + n := binary.PutUvarint(lenBuf[:], uint64(len(data))) + body.Write(lenBuf[:n]) + body.Write(data) + } + + appendChunk(firstChunk) + appendChunk(secondChunk) + + req, err := http.NewRequest("PUT", sharedTestCtx.url+"/users/sync/chunked", bytes.NewReader(body.Bytes())) + if err != nil { + t.Fatalf("failed to create request: %v", err) + } + req.Header.Set("x-api-key", apiKey.String()) + req.Header.Set("Content-Type", "application/x-protobuf") + + resp, err := sharedTestCtx.client.Do(req) + if err != nil { + t.Fatalf("failed to send chunked request: %v", err) + } + defer resp.Body.Close() + + respBody, _ := io.ReadAll(resp.Body) + if resp.StatusCode != http.StatusOK { + t.Fatalf("unexpected status %d: %s", resp.StatusCode, string(respBody)) + } + + var empty common.Empty + if err = proto.Unmarshal(respBody, &empty); err != nil { + t.Fatalf("failed to decode response: %v", err) + } +} + func TestREST_GetLogsStream(t *testing.T) { reader, err := sharedTestCtx.createAuthenticatedStreamingRequest("GET", "/logs") if err != nil { diff --git a/controller/rest/service.go b/controller/rest/service.go index ff9cbfd..e3e68d7 100644 --- a/controller/rest/service.go +++ b/controller/rest/service.go @@ -49,6 +49,7 @@ func (s *Service) setRouter() { }) private.Put("/user/sync", s.SyncUser) private.Put("/users/sync", s.SyncUsers) + private.Put("/users/sync/chunked", s.SyncUsersChunked) }) s.Router = router diff --git a/controller/rest/user.go b/controller/rest/user.go index d836d19..e908c84 100644 --- a/controller/rest/user.go +++ b/controller/rest/user.go @@ -1,6 +1,10 @@ package rest import ( + "bufio" + "encoding/binary" + "errors" + "fmt" "io" "log" "net/http" @@ -8,6 +12,7 @@ import ( "google.golang.org/protobuf/proto" "github.com/pasarguard/node/common" + "github.com/pasarguard/node/controller" ) func (s *Service) SyncUser(w http.ResponseWriter, r *http.Request) { @@ -73,3 +78,61 @@ func (s *Service) SyncUsers(w http.ResponseWriter, r *http.Request) { return } } + +func (s *Service) SyncUsersChunked(w http.ResponseWriter, r *http.Request) { + reader := bufio.NewReader(r.Body) + defer r.Body.Close() + + chunks := make(map[uint64][]*common.User) + var ( + lastIndex uint64 + sawLast bool + ) + + for { + size, err := binary.ReadUvarint(reader) + if errors.Is(err, io.EOF) { + break + } + if err != nil { + http.Error(w, fmt.Sprintf("failed to read chunk length: %v", err), http.StatusBadRequest) + return + } + if size == 0 { + continue + } + + payload := make([]byte, size) + if _, err = io.ReadFull(reader, payload); err != nil { + http.Error(w, fmt.Sprintf("failed to read chunk payload: %v", err), http.StatusBadRequest) + return + } + + chunk := &common.UsersChunk{} + if err = proto.Unmarshal(payload, chunk); err != nil { + http.Error(w, fmt.Sprintf("failed to decode chunk: %v", err), http.StatusBadRequest) + return + } + + chunks[chunk.GetIndex()] = append(chunks[chunk.GetIndex()], chunk.GetUsers()...) + + if chunk.GetLast() { + sawLast = true + lastIndex = chunk.GetIndex() + break + } + } + + users, err := controller.BuildUsersFromChunks(chunks, lastIndex, sawLast) + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + if err := s.Backend().SyncUsers(r.Context(), users); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + common.SendProtoResponse(w, &common.Empty{}) +} diff --git a/controller/rpc/middleware.go b/controller/rpc/middleware.go index 9fefee0..bbcbfea 100644 --- a/controller/rpc/middleware.go +++ b/controller/rpc/middleware.go @@ -193,6 +193,7 @@ var backendMethods = map[string]bool{ "/service.NodeService/Stop": true, "/service.NodeService/SyncUser": true, "/service.NodeService/SyncUsers": true, + "/service.NodeService/SyncUsersChunked": true, "/service.NodeService/GetLogs": true, } diff --git a/controller/rpc/rpc_test.go b/controller/rpc/rpc_test.go index 877066e..ec9f084 100644 --- a/controller/rpc/rpc_test.go +++ b/controller/rpc/rpc_test.go @@ -260,6 +260,69 @@ func TestGRPC_SyncUsers(t *testing.T) { } } +func TestGRPC_SyncUsersChunked(t *testing.T) { + ctx, cancel := context.WithTimeout(sharedTestCtx.ctxWithSession, 10*time.Second) + defer cancel() + + stream, err := sharedTestCtx.client.SyncUsersChunked(ctx) + if err != nil { + t.Fatalf("Failed to open chunked sync stream: %v", err) + } + + firstChunk := &common.UsersChunk{ + Index: 0, + Users: []*common.User{ + { + Email: "chunk_user1@example.com", + Inbounds: []string{ + "VMESS TCP NOTLS", + "VLESS TCP REALITY", + }, + Proxies: &common.Proxy{ + Vmess: &common.Vmess{ + Id: uuid.New().String(), + }, + Vless: &common.Vless{ + Id: uuid.New().String(), + }, + }, + }, + }, + } + + secondChunk := &common.UsersChunk{ + Index: 1, + Users: []*common.User{ + { + Email: "chunk_user2@example.com", + Inbounds: []string{ + "Shadowsocks TCP", + "Shadowsocks UDP", + }, + Proxies: &common.Proxy{ + Shadowsocks: &common.Shadowsocks{ + Password: "try a random string", + Method: "aes-256-gcm", + }, + }, + }, + }, + Last: true, + } + + if err = stream.Send(firstChunk); err != nil { + t.Fatalf("Failed to send first chunk: %v", err) + } + + if err = stream.Send(secondChunk); err != nil { + t.Fatalf("Failed to send final chunk: %v", err) + } + + if _, err = stream.CloseAndRecv(); err != nil { + t.Fatalf("Failed to complete chunked sync: %v", err) + } +} + func TestGRPC_GetSpecificUserStats(t *testing.T) { ctx, cancel := context.WithTimeout(sharedTestCtx.ctxWithSession, 5*time.Second) defer cancel() @@ -361,4 +424,4 @@ func TestGRPC_KeepAliveTimeout(t *testing.T) { } else { t.Fatal("expected session ID error") } -} \ No newline at end of file +} diff --git a/controller/rpc/user.go b/controller/rpc/user.go index 0a28766..41063c8 100644 --- a/controller/rpc/user.go +++ b/controller/rpc/user.go @@ -3,6 +3,7 @@ package rpc import ( "context" "errors" + "io" "log" "google.golang.org/grpc" @@ -10,6 +11,7 @@ import ( "google.golang.org/grpc/status" "github.com/pasarguard/node/common" + "github.com/pasarguard/node/controller" ) func (s *Service) SyncUser(stream grpc.ClientStreamingServer[common.User, common.Empty]) error { @@ -39,3 +41,40 @@ func (s *Service) SyncUsers(ctx context.Context, users *common.Users) (*common.E return nil, nil } + +func (s *Service) SyncUsersChunked(stream grpc.ClientStreamingServer[common.UsersChunk, common.Empty]) error { + chunks := make(map[uint64][]*common.User) + var ( + lastIndex uint64 + sawLast bool + ) + + for { + chunk, err := stream.Recv() + if errors.Is(err, io.EOF) { + break + } + if err != nil { + return status.Errorf(codes.Internal, "failed to receive chunk: %v", err) + } + + chunks[chunk.GetIndex()] = append(chunks[chunk.GetIndex()], chunk.GetUsers()...) + + if chunk.GetLast() { + sawLast = true + lastIndex = chunk.GetIndex() + break + } + } + + users, err := controller.BuildUsersFromChunks(chunks, lastIndex, sawLast) + if err != nil { + return status.Error(codes.InvalidArgument, err.Error()) + } + + if err := s.Backend().SyncUsers(stream.Context(), users); err != nil { + return status.Errorf(codes.Internal, "failed to sync users: %v", err) + } + + return stream.SendAndClose(&common.Empty{}) +} diff --git a/controller/users_chunk.go b/controller/users_chunk.go new file mode 100644 index 0000000..1daa228 --- /dev/null +++ b/controller/users_chunk.go @@ -0,0 +1,29 @@ +package controller + +import ( + "fmt" + + "github.com/pasarguard/node/common" +) + +// BuildUsersFromChunks orders chunked user payloads by their index and returns a single slice. +func BuildUsersFromChunks(chunks map[uint64][]*common.User, lastIndex uint64, sawLast bool) ([]*common.User, error) { + if !sawLast { + return nil, fmt.Errorf("missing final chunk indicator") + } + + users := make([]*common.User, 0) + for i := uint64(0); i <= lastIndex; i++ { + chunkUsers, ok := chunks[i] + if !ok { + return nil, fmt.Errorf("missing chunk index %d", i) + } + users = append(users, chunkUsers...) + } + + if len(users) == 0 { + return nil, fmt.Errorf("no users received") + } + + return users, nil +} diff --git a/controller/users_chunk_test.go b/controller/users_chunk_test.go new file mode 100644 index 0000000..f9e3c33 --- /dev/null +++ b/controller/users_chunk_test.go @@ -0,0 +1,48 @@ +package controller + +import ( + "strings" + "testing" + + "github.com/pasarguard/node/common" +) + +func TestBuildUsersFromChunksOrdersByIndex(t *testing.T) { + chunks := map[uint64][]*common.User{ + 1: { + {Email: "second"}, + }, + 0: { + {Email: "first"}, + }, + } + + users, err := BuildUsersFromChunks(chunks, 1, true) + if err != nil { + t.Fatalf("expected users to build successfully, got error: %v", err) + } + + if len(users) != 2 || users[0].GetEmail() != "first" || users[1].GetEmail() != "second" { + t.Fatalf("users not ordered by index: %#v", users) + } +} + +func TestBuildUsersFromChunksMissingLast(t *testing.T) { + _, err := BuildUsersFromChunks(map[uint64][]*common.User{}, 0, false) + if err == nil || !strings.Contains(err.Error(), "missing final chunk indicator") { + t.Fatalf("expected missing final chunk indicator error, got: %v", err) + } +} + +func TestBuildUsersFromChunksMissingChunk(t *testing.T) { + chunks := map[uint64][]*common.User{ + 1: { + {Email: "only"}, + }, + } + + _, err := BuildUsersFromChunks(chunks, 1, true) + if err == nil || !strings.Contains(err.Error(), "missing chunk index 0") { + t.Fatalf("expected missing chunk index error, got: %v", err) + } +} From 0fa31280ad14e35de720861d865485be56c3e777 Mon Sep 17 00:00:00 2001 From: Mohammad Date: Tue, 6 Jan 2026 11:11:17 +0330 Subject: [PATCH 2/2] try to fix tests --- backend/xray/user.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/backend/xray/user.go b/backend/xray/user.go index a4400ff..9acde30 100644 --- a/backend/xray/user.go +++ b/backend/xray/user.go @@ -168,5 +168,8 @@ func (x *Xray) SyncUsers(_ context.Context, users []*common.User) error { if err := x.Restart(); err != nil { return err } + if err := x.checkXrayStatus(); err != nil { + return err + } return nil }