From cefbb4ee7450d6bbabd4fe95211c00805f0db380 Mon Sep 17 00:00:00 2001 From: Manoranjith Date: Thu, 17 Aug 2023 18:45:24 +0200 Subject: [PATCH 01/14] Add test for session config generation function Signed-off-by: Manoranjith --- cmd/perunnode/generate_test.go | 63 ++++++++++++++++++++++++++++++++++ 1 file changed, 63 insertions(+) create mode 100644 cmd/perunnode/generate_test.go diff --git a/cmd/perunnode/generate_test.go b/cmd/perunnode/generate_test.go new file mode 100644 index 00000000..7b1de11e --- /dev/null +++ b/cmd/perunnode/generate_test.go @@ -0,0 +1,63 @@ +// Copyright (c) 2023 - for information on the respective copyright owner +// see the NOTICE file and/or the repository at +// https://github.com/hyperledger-labs/perun-node +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "os" + "path/filepath" + "strings" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestGenerateSessionConfig(t *testing.T) { + // Create a temporary directory + tempDir, err := os.MkdirTemp("", "session_test") + require.NoError(t, err) + defer os.RemoveAll(tempDir) //nolint:errcheck + + // Change to the temporary directory + err = os.Chdir(tempDir) + require.NoError(t, err) + + // Now you can test the function with a clean temporary directory + err = generateSessionConfig() + require.NoError(t, err) + + // List of aliases for testing + aliasesList := []string{"alice", "bob", "api"} + + // Verify the directories and files were created as expected + for _, alias := range aliasesList { + dirPath := filepath.Join(tempDir, alias) + require.DirExists(t, dirPath) + + require.DirExists(t, filepath.Join(dirPath, "database")) + require.FileExists(t, filepath.Join(dirPath, "idprovider.yaml")) + require.DirExists(t, filepath.Join(dirPath, "keystore")) + + keystoreFiles, err := os.ReadDir(filepath.Join(dirPath, "keystore")) + require.NoError(t, err) + require.Len(t, keystoreFiles, 2) + for _, file := range keystoreFiles { + require.True(t, strings.HasPrefix(file.Name(), "UTC")) + } + + require.FileExists(t, filepath.Join(dirPath, "session.yaml")) + } +} From 5ba61933f742608f07580aa73c32971f10410729 Mon Sep 17 00:00:00 2001 From: Manoranjith Date: Thu, 17 Aug 2023 18:49:15 +0200 Subject: [PATCH 02/14] Fix bug in generateSessionConfig - The filesToMove array was not populated. Hence, the directories were empty. To fix, populate this array properly. - The idProvider.yml include only the own alias. To fix, modify the logic to include everything except own alias. Signed-off-by: Manoranjith --- cmd/perunnode/generate.go | 20 +++++++++++++++----- 1 file changed, 15 insertions(+), 5 deletions(-) diff --git a/cmd/perunnode/generate.go b/cmd/perunnode/generate.go index c463d3aa..98cae917 100644 --- a/cmd/perunnode/generate.go +++ b/cmd/perunnode/generate.go @@ -159,6 +159,7 @@ func generateSessionConfig() error { const count = 3 aliases := [count]string{aliceAlias, bobAlias, apiAlias} cfgs := [count]session.Config{} + peers := [count][]perun.PeerID{} providersFile := [count]string{} cfgFile := [count]string{} @@ -173,8 +174,17 @@ func generateSessionConfig() error { cfgs[i].User.Alias = aliases[i] } + for i := 0; i < count; i++ { // Everyone except the self is a peer. + peers[i] = make([]perun.PeerID, 0, count-1) + for j := 0; j < count; j++ { + if i != j { + peers[i] = append(peers[i], peerID(cfgs[j].User)) + } + } + } + for i := 0; i < count; i++ { - providersFile[i], err = idprovidertest.NewIDProvider(peerID(cfgs[i].User)) + providersFile[i], err = idprovidertest.NewIDProvider(peers[i]...) if err != nil { return err } @@ -190,10 +200,10 @@ func generateSessionConfig() error { // Move the artifacts to currenct directory. filesToMove := make(map[string]string) for i := 0; i < count; i++ { - cfgFile[i] = filepath.Join(aliases[i], sessionConfigFile) - providersFile[i] = filepath.Join(aliases[i], idProviderFile) - cfgs[i].DatabaseDir = filepath.Join(aliases[i], databaseDir) - cfgs[i].User.OnChainWallet.KeystorePath = filepath.Join(aliases[i], keystoreDir) + filesToMove[cfgFile[i]] = filepath.Join(aliases[i], sessionConfigFile) + filesToMove[providersFile[i]] = filepath.Join(aliases[i], idProviderFile) + filesToMove[cfgs[i].DatabaseDir] = filepath.Join(aliases[i], databaseDir) + filesToMove[cfgs[i].User.OnChainWallet.KeystorePath] = filepath.Join(aliases[i], keystoreDir) } return moveFiles(filesToMove) } From ee18207a05e36842eaa49218d135146ab5f5f073 Mon Sep 17 00:00:00 2001 From: Manoranjith Date: Thu, 17 Aug 2023 18:52:00 +0200 Subject: [PATCH 03/14] In Makefile: rm build; add install, generate cmds - Change build command to install. This installs the binaries in $GOBIN directory. - Add generate command that generates the artefacts for demo. Signed-off-by: Manoranjith --- Makefile | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/Makefile b/Makefile index 4e649e38..febedc53 100644 --- a/Makefile +++ b/Makefile @@ -11,12 +11,19 @@ CLI_BIN := perunnodecli TUI_PKG := ./cmd/perunnodetui TUI_BIN := perunnodetui +DEMO_DIR := demo + LDFLAGS=-ldflags "-X 'main.version=$(VERSION)' -X 'main.gitCommitID=$(GIT_COMMIT_ID)' -X 'main.goperunVersion=$(GOPERUN_VERSION)'" -build: - go build $(LDFLAGS) $(NODE_PKG) - go build $(CLI_PKG) - go build $(TUI_PKG) +install: + go install $(LDFLAGS) $(NODE_PKG) + go install $(CLI_PKG) + go install $(TUI_PKG) + +generate: install + @mkdir $(DEMO_DIR) + @cd $(DEMO_DIR) && $(NODE_BIN) generate + @echo "Configuration files for demo generated in ./$(DEMO_DIR)" clean: - rm -rf $(NODE_BIN) $(CLI_BIN) $(TUI_BIN) node.yaml alice bob + rm -rf demo From f57b6c8d3a60e0cd52c4111f6b5288f5c1b397ad Mon Sep 17 00:00:00 2001 From: Manoranjith Date: Thu, 17 Aug 2023 18:54:20 +0200 Subject: [PATCH 04/14] Add demo directory to .gitignore Signed-off-by: Manoranjith --- .gitignore | 1 + 1 file changed, 1 insertion(+) diff --git a/.gitignore b/.gitignore index 3ec2acd6..2bcf7eea 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ *~ app/payment/perun.log +demo From 118ecdb8000712c7418b2ea677280a70daf6c5ba Mon Sep 17 00:00:00 2001 From: Manoranjith Date: Thu, 14 Sep 2023 04:45:53 +0200 Subject: [PATCH 05/14] Fix parsing of blocktimeout Signed-off-by: Manoranjith --- api/grpc/pb/sdktypes.go | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/api/grpc/pb/sdktypes.go b/api/grpc/pb/sdktypes.go index 51f9bc02..98c8c107 100644 --- a/api/grpc/pb/sdktypes.go +++ b/api/grpc/pb/sdktypes.go @@ -374,9 +374,13 @@ func fromAdjudicatorEventBase(event *pchannel.AdjudicatorEventBase) (protoEvent // license and cannot be used in the perun-node project, // outside of ethereum adapter. // TODO: Validate if number is less than int64max before type casting. - val := reflect.ValueOf(event.TimeoutV).FieldByName("Time") - protoEvent.Timeout.Sec = int64(val.Uint()) - protoEvent.Timeout.Type = AdjudicatorEventBase_ethBlock + + timeoutValue := reflect.ValueOf(event.TimeoutV).Elem() + time := timeoutValue.FieldByName("Time") + if time.IsValid() { + protoEvent.Timeout.Sec = int64(time.Uint()) + protoEvent.Timeout.Type = AdjudicatorEventBase_ethBlock + } } return protoEvent } From bd75dbe182a8d0f0a7056c0ea2cf0dc12a6ccdda Mon Sep 17 00:00:00 2001 From: Manoranjith Date: Thu, 14 Sep 2023 05:06:35 +0200 Subject: [PATCH 06/14] Initialize and check for entries in subscribes map - Previously, the nested map was not initialized. This caused a panic when Register was invoked. - Initializing the map fixes this it. Signed-off-by: Manoranjith --- api/grpc/funding.go | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/api/grpc/funding.go b/api/grpc/funding.go index f10deac5..446d46f2 100644 --- a/api/grpc/funding.go +++ b/api/grpc/funding.go @@ -234,6 +234,9 @@ func (a *fundingServer) Subscribe(req *pb.SubscribeReq, stream pb.Funding_API_Su } a.Lock() + if a.subscribes[req.SessionID] == nil { + a.subscribes[req.SessionID] = make(map[pchannel.ID]pchannel.AdjudicatorSubscription) + } a.subscribes[req.SessionID][chID] = adjSub a.Unlock() @@ -279,12 +282,18 @@ func (a *fundingServer) Unsubscribe(_ context.Context, req *pb.UnsubscribeReq) ( copy(chID[:], req.ChID) a.Lock() - adjSub := a.subscribes[req.SessionID][chID] + if _, ok := a.subscribes[req.SessionID]; !ok { + return errResponse(perun.NewAPIErrUnknownInternal(errors.New("unknown session id"))), nil + } + adjSub, ok := a.subscribes[req.SessionID][chID] + if !ok { + return errResponse(perun.NewAPIErrUnknownInternal(errors.New("unknown channel id"))), nil + } delete(a.subscribes[req.SessionID], chID) a.Unlock() if err := adjSub.Close(); err != nil { - return errResponse(perun.NewAPIErrUnknownInternal(errors.WithMessage(err, "retrieving session"))), nil + return errResponse(perun.NewAPIErrUnknownInternal(errors.WithMessage(err, "closing sub"))), nil } return &pb.UnsubscribeResp{ From 6da6dfb1acefb7cd11aec8a38af5f9a9c0b2ba26 Mon Sep 17 00:00:00 2001 From: manoranjith Date: Sun, 10 Dec 2023 14:37:52 +0100 Subject: [PATCH 07/14] Register asset only if not ETH - ETH asset is registered by default already during the initialization fo the funder. Signed-off-by: manoranjith --- session/session.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/session/session.go b/session/session.go index b21bbd88..65f15185 100644 --- a/session/session.go +++ b/session/session.go @@ -40,6 +40,7 @@ import ( "github.com/hyperledger-labs/perun-node/blockchain/ethereum" "github.com/hyperledger-labs/perun-node/comm/tcp" "github.com/hyperledger-labs/perun-node/comm/tcp/tcptest" + "github.com/hyperledger-labs/perun-node/currency" "github.com/hyperledger-labs/perun-node/idprovider" "github.com/hyperledger-labs/perun-node/idprovider/local" "github.com/hyperledger-labs/perun-node/log" @@ -619,7 +620,7 @@ func updateAssetsInFunder(currs []perun.Currency, contractRegistry perun.ROContr // So, all assets will be present in the registry, for i := range currs { asset, _ := contractRegistry.Asset(currs[i].Symbol()) - if !f.IsAssetRegistered(asset) { + if !f.IsAssetRegistered(asset) && currs[i].Symbol() != currency.ETHSymbol { token, _ := contractRegistry.Token(currs[i].Symbol()) f.RegisterAssetERC20(asset, token, onChainAcc) } From 7a3b3b3cc08eedcd4ef24099ec0cbaca098d0d4a Mon Sep 17 00:00:00 2001 From: manoranjith Date: Sun, 10 Dec 2023 14:41:11 +0100 Subject: [PATCH 08/14] Add a test for printing test private keys & addrs - The key can be used for initializing pre-funded accounts in ganache-cli or for initializing the same keys for tests in alternate clients (like rust implementation). Signed-off-by: manoranjith --- session/sessiontest/config_test.go | 57 ++++++++++++++++++++++++++++++ 1 file changed, 57 insertions(+) create mode 100644 session/sessiontest/config_test.go diff --git a/session/sessiontest/config_test.go b/session/sessiontest/config_test.go new file mode 100644 index 00000000..7db98a7b --- /dev/null +++ b/session/sessiontest/config_test.go @@ -0,0 +1,57 @@ +// Copyright (c) 2020 - for information on the respective copyright owner +// see the NOTICE file and/or the repository at +// https://github.com/hyperledger-labs/perun-node +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package sessiontest_test + +import ( + "fmt" + "math/rand" + "testing" + + "github.com/hyperledger-labs/perun-node/blockchain/ethereum/ethereumtest" + + "github.com/ethereum/go-ethereum/accounts" + "github.com/ethereum/go-ethereum/accounts/keystore" + "github.com/ethereum/go-ethereum/common" + "github.com/stretchr/testify/require" + pethwallet "perun.network/go-perun/backend/ethereum/wallet" +) + +// TestPrintPrivateKeys function is used to the print the private key of +// on-chain accounts for n users used in tests. +// +// These private keys are used for initializing ganache-cli node with pre-funded accounts. +func TestPrintPrivateKeys(t *testing.T) { + prng := rand.New(rand.NewSource(ethereumtest.RandSeedForTestAccs)) + + count := uint(3) + count *= 2 + ws, err := ethereumtest.NewWalletSetup(prng, count) + require.NoError(t, err) + password := "" + fmt.Printf("\nNo Address Private Key\n\n") + for i := uint(0); i < count; i = i + 2 { + acc := ws.Accs[i] + + addr := common.Address(*(acc.Address()).(*pethwallet.Address)) + keyJSON, err := ws.Keystore.Export(accounts.Account{Address: addr}, password, password) + require.NoError(t, err) + + key, err := keystore.DecryptKey(keyJSON, password) + require.NoError(t, err) + fmt.Printf("%d:\t 0x%s: 0x%X\n", (i+2)/2, acc.Address(), key.PrivateKey.D.Bytes()) + } +} From 427889b9708a4b4be98e1dd7192989828b3a19b8 Mon Sep 17 00:00:00 2001 From: Manoranjith Date: Thu, 14 Sep 2023 05:56:15 +0200 Subject: [PATCH 09/14] Option for choosing payment or fundwatch mode - Add a flag to run command to specify the service to run. - Split the serve function in gprc package. Signed-off-by: Manoranjith --- api/grpc/payment_integ_test.go | 4 ++-- api/grpc/server.go | 18 +++++++++++++++--- cmd/perunnode/run.go | 28 ++++++++++++++++++++++++---- 3 files changed, 41 insertions(+), 9 deletions(-) diff --git a/api/grpc/payment_integ_test.go b/api/grpc/payment_integ_test.go index 2a14649a..53d3620f 100644 --- a/api/grpc/payment_integ_test.go +++ b/api/grpc/payment_integ_test.go @@ -60,9 +60,9 @@ func StartServer(t *testing.T, nodeCfg perun.NodeConfig, grpcPort string) { nodeAPI, err := node.New(nodeCfg) require.NoErrorf(t, err, "initializing nodeAPI") - t.Log("Started ListenAndServePayChAPI") + t.Log("Started grpc service") go func() { - if err := grpc.ListenAndServePayChAPI(nodeAPI, grpcPort); err != nil { + if err := grpc.ServePaymentAPI(nodeAPI, grpcPort); err != nil { t.Logf("server returned with error: %v", err) } }() diff --git a/api/grpc/server.go b/api/grpc/server.go index 641b0af1..d4f400c7 100644 --- a/api/grpc/server.go +++ b/api/grpc/server.go @@ -27,15 +27,28 @@ import ( "github.com/hyperledger-labs/perun-node/api/grpc/pb" ) -// ListenAndServePayChAPI starts a payment channel API server that listens for incoming grpc +// ServePaymentAPI starts a payment channel API server that listens for incoming grpc // requests at the specified address and serves those requests using the node API instance. -func ListenAndServePayChAPI(n perun.NodeAPI, grpcPort string) error { +func ServePaymentAPI(n perun.NodeAPI, grpcPort string) error { paymentChServer := &payChAPIServer{ n: n, chProposalsNotif: make(map[string]chan bool), chUpdatesNotif: make(map[string]map[string]chan bool), } + listener, err := net.Listen("tcp", grpcPort) + if err != nil { + return errors.Wrap(err, "starting listener") + } + grpcServer := grpclib.NewServer() + pb.RegisterPayment_APIServer(grpcServer, paymentChServer) + + return grpcServer.Serve(listener) +} + +// ServeFundingWatchingAPI starts a payment channel API server that listens for incoming grpc +// requests at the specified address and serves those requests using the node API instance. +func ServeFundingWatchingAPI(n perun.NodeAPI, grpcPort string) error { fundingServer := &fundingServer{ n: n, subscribes: make(map[string]map[pchannel.ID]pchannel.AdjudicatorSubscription), @@ -50,7 +63,6 @@ func ListenAndServePayChAPI(n perun.NodeAPI, grpcPort string) error { return errors.Wrap(err, "starting listener") } grpcServer := grpclib.NewServer() - pb.RegisterPayment_APIServer(grpcServer, paymentChServer) pb.RegisterFunding_APIServer(grpcServer, fundingServer) pb.RegisterWatching_APIServer(grpcServer, watchingServer) diff --git a/cmd/perunnode/run.go b/cmd/perunnode/run.go index e8359e1d..ce18836a 100644 --- a/cmd/perunnode/run.go +++ b/cmd/perunnode/run.go @@ -43,10 +43,12 @@ const ( responsetimeoutF = "responsetimeout" configfileF = "configfile" // can only be specified in flag, not via config file. grpcPortF = "grpcport" // can only be specified in flag, not via config file. + serviceF = "service" // can only be specified in flag, not via config file. // default values for flags in run command. defaultConfigFile = "node.yaml" defaultGrpcPort = 50001 + defaultService = "payment" ) var ( @@ -98,6 +100,7 @@ func init() { func defineFlags() { runCmd.Flags().String(configfileF, defaultConfigFile, "node config file") runCmd.Flags().Uint64(grpcPortF, defaultGrpcPort, "port for grpc payment channel API server to listen") + runCmd.Flags().String(serviceF, defaultService, "service to be enabled (payment or fundwatch)") // Default values of all these flags should be zero, as their only purpose is to allow the user to // explicitly specify the configuration. @@ -141,10 +144,27 @@ func run(cmd *cobra.Command, _ []string) { return } - fmt.Printf("Running perun node with the below config:\n%s.\n\nServing payment channel API via grpc at port %s\n\n", - prettify(nodeCfg), grpcAddr) - if err := grpc.ListenAndServePayChAPI(nodeAPI, grpcAddr); err != nil { - fmt.Printf("Server returned with error: %v\n", err) + service, err := cmd.Flags().GetString(serviceF) + if err != nil { + panic("unknown flag service\n") + } + switch service { + case "payment": + fmt.Printf("Running perun node with the below config:\n%s.\n\nServing payment channel API via grpc at port %s\n\n", + prettify(nodeCfg), grpcAddr) + if err := grpc.ServePaymentAPI(nodeAPI, grpcAddr); err != nil { + fmt.Printf("Server returned with error: %v\n", err) + } + case "fundwatch": + fmt.Printf( + "Running perun node with the below config:\n%s.\n\nServing funding and watching API via grpc at port %s\n\n", + prettify(nodeCfg), grpcAddr) + if err := grpc.ServeFundingWatchingAPI(nodeAPI, grpcAddr); err != nil { + fmt.Printf("Server returned with error: %v\n", err) + } + default: + panic("invalid value for service flag\n") + } } From 23d16c5827afbdc9587cc8a81c2ad72a1a6c3ffc Mon Sep 17 00:00:00 2001 From: Manoranjith Date: Fri, 22 Sep 2023 23:07:11 +0200 Subject: [PATCH 10/14] Serve payment API even in fundwatch only mode - This is a temporary workaround to enable the existing version of perunnode-cli to connect with the API in fundwatch only mode to generate a session-id by opening a session. - Once, the perunnode-cli is updated, this can be undone. Signed-off-by: Manoranjith --- api/grpc/server.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/api/grpc/server.go b/api/grpc/server.go index d4f400c7..e442046d 100644 --- a/api/grpc/server.go +++ b/api/grpc/server.go @@ -49,6 +49,11 @@ func ServePaymentAPI(n perun.NodeAPI, grpcPort string) error { // ServeFundingWatchingAPI starts a payment channel API server that listens for incoming grpc // requests at the specified address and serves those requests using the node API instance. func ServeFundingWatchingAPI(n perun.NodeAPI, grpcPort string) error { + paymentChServer := &payChAPIServer{ + n: n, + chProposalsNotif: make(map[string]chan bool), + chUpdatesNotif: make(map[string]map[string]chan bool), + } fundingServer := &fundingServer{ n: n, subscribes: make(map[string]map[pchannel.ID]pchannel.AdjudicatorSubscription), @@ -65,6 +70,7 @@ func ServeFundingWatchingAPI(n perun.NodeAPI, grpcPort string) error { grpcServer := grpclib.NewServer() pb.RegisterFunding_APIServer(grpcServer, fundingServer) pb.RegisterWatching_APIServer(grpcServer, watchingServer) + pb.RegisterPayment_APIServer(grpcServer, paymentChServer) return grpcServer.Serve(listener) } From e1cd68249a3254b487648aac3430d952836ca706 Mon Sep 17 00:00:00 2001 From: Manoranjith Date: Sat, 23 Sep 2023 00:58:32 +0200 Subject: [PATCH 11/14] Extract handlers for funding server to a pkg - The extracted handlers use in golang standard context, protobuf request/response messages. - Hence, these can be shared between grpc and other upcoming implementations (like peruniotcp server). Signed-off-by: Manoranjith --- api/grpc/funding.go | 252 +++------------------------------ api/grpc/server.go | 7 +- api/handlers/doc.go | 19 +++ api/handlers/funding.go | 302 ++++++++++++++++++++++++++++++++++++++++ 4 files changed, 342 insertions(+), 238 deletions(-) create mode 100644 api/handlers/doc.go create mode 100644 api/handlers/funding.go diff --git a/api/grpc/funding.go b/api/grpc/funding.go index 446d46f2..04796f73 100644 --- a/api/grpc/funding.go +++ b/api/grpc/funding.go @@ -18,285 +18,65 @@ package grpc import ( "context" - "fmt" - "github.com/pkg/errors" - pchannel "perun.network/go-perun/channel" - psync "polycry.pt/poly-go/sync" - - "github.com/hyperledger-labs/perun-node" "github.com/hyperledger-labs/perun-node/api/grpc/pb" + "github.com/hyperledger-labs/perun-node/api/handlers" ) // fundingServer represents a grpc server that can serve funding API. type fundingServer struct { pb.UnimplementedFunding_APIServer - n perun.NodeAPI - - // The mutex should be used when accessing the map data structures. - psync.Mutex - subscribes map[string]map[pchannel.ID]pchannel.AdjudicatorSubscription + *handlers.FundingHandler } // Fund wraps session.Fund. -func (a *fundingServer) Fund(ctx context.Context, grpcReq *pb.FundReq) (*pb.FundResp, error) { - errResponse := func(err perun.APIError) *pb.FundResp { - return &pb.FundResp{ - Error: pb.FromError(err), - } - } - - sess, apiErr := a.n.GetSession(grpcReq.SessionID) - if apiErr != nil { - return errResponse(apiErr), nil - } - req, err := pb.ToFundingReq(grpcReq) - if err != nil { - return errResponse(perun.NewAPIErrUnknownInternal(err)), nil - } - - err = sess.Fund(ctx, req) - if err != nil { - return errResponse(perun.NewAPIErrUnknownInternal(err)), nil - } - - return &pb.FundResp{ - Error: nil, - }, nil +func (a *fundingServer) Fund(ctx context.Context, req *pb.FundReq) (*pb.FundResp, error) { + return a.FundingHandler.Fund(ctx, req) } // RegisterAssetERC20 is a stub that always returns false. Because, the remote // funder does not support use of assets other than the default ERC20 asset. // // TODO: Make actual implementation. -func (a *payChAPIServer) RegisterAssetERC20(_ context.Context, _ *pb.RegisterAssetERC20Req) ( +func (a *fundingServer) RegisterAssetERC20(ctx context.Context, req *pb.RegisterAssetERC20Req) ( *pb.RegisterAssetERC20Resp, error, ) { - return &pb.RegisterAssetERC20Resp{ - MsgSuccess: false, - }, nil + return a.FundingHandler.RegisterAssetERC20(ctx, req) } // IsAssetRegistered wraps session.IsAssetRegistered. -func (a *payChAPIServer) IsAssetRegistered(_ context.Context, req *pb.IsAssetRegisteredReq) ( +func (a *fundingServer) IsAssetRegistered(ctx context.Context, req *pb.IsAssetRegisteredReq) ( *pb.IsAssetRegisteredResp, error, ) { - errResponse := func(err perun.APIError) *pb.IsAssetRegisteredResp { - return &pb.IsAssetRegisteredResp{ - Response: &pb.IsAssetRegisteredResp_Error{ - Error: pb.FromError(err), - }, - } - } - - sess, err := a.n.GetSession(req.SessionID) - if err != nil { - return errResponse(err), nil - } - asset := pchannel.NewAsset() - err2 := asset.UnmarshalBinary(req.Asset) - if err2 != nil { - err = perun.NewAPIErrInvalidArgument(err2, "asset", fmt.Sprintf("%x", req.Asset)) - return errResponse(err), nil - } - - isRegistered := sess.IsAssetRegistered(asset) - - return &pb.IsAssetRegisteredResp{ - Response: &pb.IsAssetRegisteredResp_MsgSuccess_{ - MsgSuccess: &pb.IsAssetRegisteredResp_MsgSuccess{ - IsRegistered: isRegistered, - }, - }, - }, nil + return a.FundingHandler.IsAssetRegistered(ctx, req) } // Register wraps session.Register. func (a *fundingServer) Register(ctx context.Context, req *pb.RegisterReq) (*pb.RegisterResp, error) { - errResponse := func(err perun.APIError) *pb.RegisterResp { - return &pb.RegisterResp{ - Error: pb.FromError(err), - } - } - - sess, err := a.n.GetSession(req.SessionID) - if err != nil { - return errResponse(err), nil - } - adjReq, err2 := pb.ToAdjReq(req.AdjReq) - if err2 != nil { - return errResponse(perun.NewAPIErrUnknownInternal(err2)), nil - } - signedStates := make([]pchannel.SignedState, len(req.SignedStates)) - for i := range signedStates { - signedStates[i], err2 = pb.ToSignedState(req.SignedStates[i]) - if err2 != nil { - return errResponse(perun.NewAPIErrUnknownInternal(err2)), nil - } - } - - err2 = sess.Register(ctx, adjReq, signedStates) - if err2 != nil { - return errResponse(perun.NewAPIErrUnknownInternal(err2)), nil - } - - return &pb.RegisterResp{ - Error: nil, - }, nil + return a.FundingHandler.Register(ctx, req) } // Withdraw wraps session.Withdraw. func (a *fundingServer) Withdraw(ctx context.Context, req *pb.WithdrawReq) (*pb.WithdrawResp, error) { - errResponse := func(err perun.APIError) *pb.WithdrawResp { - return &pb.WithdrawResp{ - Error: pb.FromError(err), - } - } - - sess, err := a.n.GetSession(req.SessionID) - if err != nil { - return errResponse(err), nil - } - adjReq, err2 := pb.ToAdjReq(req.AdjReq) - if err2 != nil { - return errResponse(perun.NewAPIErrUnknownInternal(err2)), nil - } - stateMap := pchannel.StateMap(make(map[pchannel.ID]*pchannel.State)) - - for i := range req.StateMap { - var id pchannel.ID - copy(id[:], req.StateMap[i].Id) - stateMap[id], err2 = pb.ToState(req.StateMap[i].State) - if err2 != nil { - return errResponse(err), nil - } - } - - err2 = sess.Withdraw(ctx, adjReq, stateMap) - if err2 != nil { - return errResponse(perun.NewAPIErrUnknownInternal(err2)), nil - } - - return &pb.WithdrawResp{ - Error: nil, - }, nil + return a.FundingHandler.Withdraw(ctx, req) } // Progress wraps session.Progress. func (a *fundingServer) Progress(ctx context.Context, req *pb.ProgressReq) (*pb.ProgressResp, error) { - errResponse := func(err perun.APIError) *pb.ProgressResp { - return &pb.ProgressResp{ - Error: pb.FromError(err), - } - } - - sess, err := a.n.GetSession(req.SessionID) - if err != nil { - return errResponse(err), nil - } - var progReq perun.ProgressReq - var err2 error - progReq.AdjudicatorReq, err2 = pb.ToAdjReq(req.AdjReq) - if err2 != nil { - return errResponse(perun.NewAPIErrUnknownInternal(err2)), nil - } - progReq.NewState, err2 = pb.ToState(req.NewState) - if err2 != nil { - return errResponse(err), nil - } - copy(progReq.Sig, req.Sig) - - err2 = sess.Progress(ctx, progReq) - if err2 != nil { - return errResponse(perun.NewAPIErrUnknownInternal(err2)), nil - } - - return &pb.ProgressResp{ - Error: nil, - }, nil + return a.FundingHandler.Progress(ctx, req) } // Subscribe wraps session.Subscribe. func (a *fundingServer) Subscribe(req *pb.SubscribeReq, stream pb.Funding_API_SubscribeServer) error { - sess, err := a.n.GetSession(req.SessionID) - if err != nil { - return errors.WithMessage(err, "retrieving session") + notify := func(notif *pb.SubscribeResp) error { + return stream.Send(notif) } - var chID pchannel.ID - copy(chID[:], req.ChID) - - adjSub, err := sess.Subscribe(context.Background(), chID) - if err != nil { - return errors.WithMessage(err, "setting up subscription") - } - - a.Lock() - if a.subscribes[req.SessionID] == nil { - a.subscribes[req.SessionID] = make(map[pchannel.ID]pchannel.AdjudicatorSubscription) - } - a.subscribes[req.SessionID][chID] = adjSub - a.Unlock() - - // This stream is anyways closed when StopWatching is called for. - // Hence, that will act as the exit condition for the loop. - go func() { - // will return nil, when the sub is closed. - // so, we need a mechanism to call close on the server side. - // so, add a call Unsubscribe, which simply calls close. - for { - adjEvent := adjSub.Next() - if adjEvent == nil { - err := errors.WithMessage(adjSub.Err(), "sub closed with error") - notif := &pb.SubscribeResp_Error{ - Error: pb.FromError(perun.NewAPIErrUnknownInternal(err)), - } - // TODO: Proper error handling. For now, ignore this error. - _ = stream.Send(&pb.SubscribeResp{Response: notif}) //nolint: errcheck - return - } - notif, err := pb.SubscribeResponseFromAdjEvent(adjEvent) - if err != nil { - return - } - err = stream.Send(notif) - if err != nil { - return - } - } - }() - - return nil + return a.FundingHandler.Subscribe(req, notify) } -func (a *fundingServer) Unsubscribe(_ context.Context, req *pb.UnsubscribeReq) (*pb.UnsubscribeResp, error) { - errResponse := func(err perun.APIError) *pb.UnsubscribeResp { - return &pb.UnsubscribeResp{ - Error: pb.FromError(err), - } - } - - var chID pchannel.ID - copy(chID[:], req.ChID) - - a.Lock() - if _, ok := a.subscribes[req.SessionID]; !ok { - return errResponse(perun.NewAPIErrUnknownInternal(errors.New("unknown session id"))), nil - } - adjSub, ok := a.subscribes[req.SessionID][chID] - if !ok { - return errResponse(perun.NewAPIErrUnknownInternal(errors.New("unknown channel id"))), nil - } - delete(a.subscribes[req.SessionID], chID) - a.Unlock() - - if err := adjSub.Close(); err != nil { - return errResponse(perun.NewAPIErrUnknownInternal(errors.WithMessage(err, "closing sub"))), nil - } - - return &pb.UnsubscribeResp{ - Error: nil, - }, nil +func (a *fundingServer) Unsubscribe(ctx context.Context, req *pb.UnsubscribeReq) (*pb.UnsubscribeResp, error) { + return a.FundingHandler.Unsubscribe(ctx, req) } diff --git a/api/grpc/server.go b/api/grpc/server.go index e442046d..d535e924 100644 --- a/api/grpc/server.go +++ b/api/grpc/server.go @@ -25,6 +25,7 @@ import ( "github.com/hyperledger-labs/perun-node" "github.com/hyperledger-labs/perun-node/api/grpc/pb" + "github.com/hyperledger-labs/perun-node/api/handlers" ) // ServePaymentAPI starts a payment channel API server that listens for incoming grpc @@ -55,8 +56,10 @@ func ServeFundingWatchingAPI(n perun.NodeAPI, grpcPort string) error { chUpdatesNotif: make(map[string]map[string]chan bool), } fundingServer := &fundingServer{ - n: n, - subscribes: make(map[string]map[pchannel.ID]pchannel.AdjudicatorSubscription), + FundingHandler: &handlers.FundingHandler{ + N: n, + Subscribes: make(map[string]map[pchannel.ID]pchannel.AdjudicatorSubscription), + }, } watchingServer := &watchingServer{ n: n, diff --git a/api/handlers/doc.go b/api/handlers/doc.go new file mode 100644 index 00000000..31acb8cf --- /dev/null +++ b/api/handlers/doc.go @@ -0,0 +1,19 @@ +// Copyright (c) 2023 - for information on the respective copyright owner +// see the NOTICE file and/or the repository at +// https://github.com/hyperledger-labs/perun-node +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package handlers implements a the handlers for payment, funding and watching API. +// This is used by different network adapters such as grpc and tcp. +package handlers diff --git a/api/handlers/funding.go b/api/handlers/funding.go new file mode 100644 index 00000000..d9e17eeb --- /dev/null +++ b/api/handlers/funding.go @@ -0,0 +1,302 @@ +// Copyright (c) 2023 - for information on the respective copyright owner +// see the NOTICE file and/or the repository at +// https://github.com/hyperledger-labs/perun-node +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package handlers + +import ( + "context" + "fmt" + + pchannel "perun.network/go-perun/channel" + psync "polycry.pt/poly-go/sync" + + "github.com/pkg/errors" + + "github.com/hyperledger-labs/perun-node" + "github.com/hyperledger-labs/perun-node/api/grpc/pb" +) + +// FundingHandler represents a grpc server that can serve funding API. +type FundingHandler struct { + N perun.NodeAPI + + // The mutex should be used when accessing the map data structures. + psync.Mutex + Subscribes map[string]map[pchannel.ID]pchannel.AdjudicatorSubscription +} + +// Fund wraps session.Fund. +func (a *FundingHandler) Fund(ctx context.Context, req *pb.FundReq) (*pb.FundResp, error) { + errResponse := func(err perun.APIError) *pb.FundResp { + return &pb.FundResp{ + Error: pb.FromError(err), + } + } + + sess, apiErr := a.N.GetSession(req.SessionID) + if apiErr != nil { + return errResponse(apiErr), nil + } + fundingReq, err := pb.ToFundingReq(req) + if err != nil { + return errResponse(perun.NewAPIErrUnknownInternal(err)), nil + } + + err = sess.Fund(ctx, fundingReq) + if err != nil { + return errResponse(perun.NewAPIErrUnknownInternal(err)), nil + } + + return &pb.FundResp{ + Error: nil, + }, nil +} + +// RegisterAssetERC20 is a stub that always returns false. Because, the remote +// funder does not support use of assets other than the default ERC20 asset. +// +// TODO: Make actual implementation. +func (a *FundingHandler) RegisterAssetERC20(_ context.Context, _ *pb.RegisterAssetERC20Req) ( + *pb.RegisterAssetERC20Resp, error, +) { + return &pb.RegisterAssetERC20Resp{ + MsgSuccess: false, + }, nil +} + +// IsAssetRegistered wraps session.IsAssetRegistered. +func (a *FundingHandler) IsAssetRegistered(_ context.Context, req *pb.IsAssetRegisteredReq) ( + *pb.IsAssetRegisteredResp, + error, +) { + errResponse := func(err perun.APIError) *pb.IsAssetRegisteredResp { + return &pb.IsAssetRegisteredResp{ + Response: &pb.IsAssetRegisteredResp_Error{ + Error: pb.FromError(err), + }, + } + } + + sess, err := a.N.GetSession(req.SessionID) + if err != nil { + return errResponse(err), nil + } + asset := pchannel.NewAsset() + err2 := asset.UnmarshalBinary(req.Asset) + if err2 != nil { + err = perun.NewAPIErrInvalidArgument(err2, "asset", fmt.Sprintf("%x", req.Asset)) + return errResponse(err), nil + } + + isRegistered := sess.IsAssetRegistered(asset) + + return &pb.IsAssetRegisteredResp{ + Response: &pb.IsAssetRegisteredResp_MsgSuccess_{ + MsgSuccess: &pb.IsAssetRegisteredResp_MsgSuccess{ + IsRegistered: isRegistered, + }, + }, + }, nil +} + +// Register wraps session.Register. +func (a *FundingHandler) Register(ctx context.Context, req *pb.RegisterReq) (*pb.RegisterResp, error) { + errResponse := func(err perun.APIError) *pb.RegisterResp { + return &pb.RegisterResp{ + Error: pb.FromError(err), + } + } + + sess, err := a.N.GetSession(req.SessionID) + if err != nil { + return errResponse(err), nil + } + adjReq, err2 := pb.ToAdjReq(req.AdjReq) + if err2 != nil { + return errResponse(perun.NewAPIErrUnknownInternal(err2)), nil + } + signedStates := make([]pchannel.SignedState, len(req.SignedStates)) + for i := range signedStates { + signedStates[i], err2 = pb.ToSignedState(req.SignedStates[i]) + if err2 != nil { + return errResponse(perun.NewAPIErrUnknownInternal(err2)), nil + } + } + + err2 = sess.Register(ctx, adjReq, signedStates) + if err2 != nil { + return errResponse(perun.NewAPIErrUnknownInternal(err2)), nil + } + + return &pb.RegisterResp{ + Error: nil, + }, nil +} + +// Withdraw wraps session.Withdraw. +func (a *FundingHandler) Withdraw(ctx context.Context, req *pb.WithdrawReq) (*pb.WithdrawResp, error) { + errResponse := func(err perun.APIError) *pb.WithdrawResp { + return &pb.WithdrawResp{ + Error: pb.FromError(err), + } + } + + sess, err := a.N.GetSession(req.SessionID) + if err != nil { + return errResponse(err), nil + } + adjReq, err2 := pb.ToAdjReq(req.AdjReq) + if err2 != nil { + return errResponse(perun.NewAPIErrUnknownInternal(err2)), nil + } + stateMap := pchannel.StateMap(make(map[pchannel.ID]*pchannel.State)) + + for i := range req.StateMap { + var id pchannel.ID + copy(id[:], req.StateMap[i].Id) + stateMap[id], err2 = pb.ToState(req.StateMap[i].State) + if err2 != nil { + return errResponse(err), nil + } + } + + err2 = sess.Withdraw(ctx, adjReq, stateMap) + if err2 != nil { + return errResponse(perun.NewAPIErrUnknownInternal(err2)), nil + } + + return &pb.WithdrawResp{ + Error: nil, + }, nil +} + +// Progress wraps session.Progress. +func (a *FundingHandler) Progress(ctx context.Context, req *pb.ProgressReq) (*pb.ProgressResp, error) { + errResponse := func(err perun.APIError) *pb.ProgressResp { + return &pb.ProgressResp{ + Error: pb.FromError(err), + } + } + + sess, err := a.N.GetSession(req.SessionID) + if err != nil { + return errResponse(err), nil + } + var progReq perun.ProgressReq + var err2 error + progReq.AdjudicatorReq, err2 = pb.ToAdjReq(req.AdjReq) + if err2 != nil { + return errResponse(perun.NewAPIErrUnknownInternal(err2)), nil + } + progReq.NewState, err2 = pb.ToState(req.NewState) + if err2 != nil { + return errResponse(err), nil + } + copy(progReq.Sig, req.Sig) + + err2 = sess.Progress(ctx, progReq) + if err2 != nil { + return errResponse(perun.NewAPIErrUnknownInternal(err2)), nil + } + + return &pb.ProgressResp{ + Error: nil, + }, nil +} + +// Subscribe wraps session.Subscribe. +func (a *FundingHandler) Subscribe(req *pb.SubscribeReq, notify func(notif *pb.SubscribeResp) error) error { + sess, err := a.N.GetSession(req.SessionID) + if err != nil { + return errors.WithMessage(err, "retrieving session") + } + + var chID pchannel.ID + copy(chID[:], req.ChID) + + adjSub, err := sess.Subscribe(context.Background(), chID) + if err != nil { + return errors.WithMessage(err, "setting up subscription") + } + + a.Lock() + if a.Subscribes[req.SessionID] == nil { + a.Subscribes[req.SessionID] = make(map[pchannel.ID]pchannel.AdjudicatorSubscription) + } + a.Subscribes[req.SessionID][chID] = adjSub + a.Unlock() + + // This stream is anyways closed when StopWatching is called for. + // Hence, that will act as the exit condition for the loop. + go func() { + // will return nil, when the sub is closed. + // so, we need a mechanism to call close on the server side. + // so, add a call Unsubscribe, which simply calls close. + for { + adjEvent := adjSub.Next() + if adjEvent == nil { + err := errors.WithMessage(adjSub.Err(), "sub closed with error") + notif := &pb.SubscribeResp_Error{ + Error: pb.FromError(perun.NewAPIErrUnknownInternal(err)), + } + // TODO: Proper error handling. For now, ignore this error. + _ = notify(&pb.SubscribeResp{Response: notif}) //nolint: errcheck + return + } + notif, err := pb.SubscribeResponseFromAdjEvent(adjEvent) + if err != nil { + return + } + err = notify(notif) + if err != nil { + return + } + } + }() + + return nil +} + +// Unsubscribe wraps session.Unsubscribe. +func (a *FundingHandler) Unsubscribe(_ context.Context, req *pb.UnsubscribeReq) (*pb.UnsubscribeResp, error) { + errResponse := func(err perun.APIError) *pb.UnsubscribeResp { + return &pb.UnsubscribeResp{ + Error: pb.FromError(err), + } + } + + var chID pchannel.ID + copy(chID[:], req.ChID) + + a.Lock() + if _, ok := a.Subscribes[req.SessionID]; !ok { + return errResponse(perun.NewAPIErrUnknownInternal(errors.New("unknown session id"))), nil + } + adjSub, ok := a.Subscribes[req.SessionID][chID] + if !ok { + return errResponse(perun.NewAPIErrUnknownInternal(errors.New("unknown channel id"))), nil + } + delete(a.Subscribes[req.SessionID], chID) + a.Unlock() + + if err := adjSub.Close(); err != nil { + return errResponse(perun.NewAPIErrUnknownInternal(errors.WithMessage(err, "closing sub"))), nil + } + + return &pb.UnsubscribeResp{ + Error: nil, + }, nil +} From 1a608c74b01403655d35fda97eb302bb991feba8 Mon Sep 17 00:00:00 2001 From: Manoranjith Date: Sun, 24 Sep 2023 23:04:49 +0200 Subject: [PATCH 12/14] Extract handlers for watching server to a pkg - The extracted handlers use in golang standard context, protobuf request/response messages. - Hence, these can be shared between grpc and other upcoming implementations (like peruniotcp server). Signed-off-by: Manoranjith --- api/grpc/server.go | 6 +- api/grpc/watching.go | 161 ++----------------------------- api/handlers/watching.go | 199 +++++++++++++++++++++++++++++++++++++++ 3 files changed, 212 insertions(+), 154 deletions(-) create mode 100644 api/handlers/watching.go diff --git a/api/grpc/server.go b/api/grpc/server.go index d535e924..8c1bcaa0 100644 --- a/api/grpc/server.go +++ b/api/grpc/server.go @@ -62,8 +62,10 @@ func ServeFundingWatchingAPI(n perun.NodeAPI, grpcPort string) error { }, } watchingServer := &watchingServer{ - n: n, - subscribes: make(map[string]map[pchannel.ID]pchannel.AdjudicatorSubscription), + WatchingHandler: &handlers.WatchingHandler{ + N: n, + Subscribes: make(map[string]map[pchannel.ID]pchannel.AdjudicatorSubscription), + }, } listener, err := net.Listen("tcp", grpcPort) diff --git a/api/grpc/watching.go b/api/grpc/watching.go index ab549340..7ec0fe95 100644 --- a/api/grpc/watching.go +++ b/api/grpc/watching.go @@ -20,181 +20,38 @@ import ( "context" "github.com/pkg/errors" - pchannel "perun.network/go-perun/channel" - pwallet "perun.network/go-perun/wallet" - psync "polycry.pt/poly-go/sync" - "github.com/hyperledger-labs/perun-node" "github.com/hyperledger-labs/perun-node/api/grpc/pb" + "github.com/hyperledger-labs/perun-node/api/handlers" ) // watchingServer represents a grpc server that can serve watching API. type watchingServer struct { pb.UnimplementedWatching_APIServer - n perun.NodeAPI - - // The mutex should be used when accessing the map data structures. - psync.Mutex - subscribes map[string]map[pchannel.ID]pchannel.AdjudicatorSubscription + *handlers.WatchingHandler } // StartWatchingLedgerChannel wraps session.StartWatchingLedgerChannel. -func (a *watchingServer) StartWatchingLedgerChannel( //nolint: funlen, gocognit +func (a *watchingServer) StartWatchingLedgerChannel( srv pb.Watching_API_StartWatchingLedgerChannelServer, ) error { req, err := srv.Recv() if err != nil { return errors.WithMessage(err, "reading request data") } - sess, err := a.n.GetSession(req.SessionID) - if err != nil { - return errors.WithMessage(err, "retrieving session") - } - - signedState, err := signedStateFromProtoLedgerChReq(req) - if err != nil { - return errors.WithMessage(err, "parsing signed state") - } - statesPub, adjSub, err := sess.StartWatchingLedgerChannel(context.TODO(), *signedState) - if err != nil { - return errors.WithMessage(err, "start watching") + sendAdjEvent := func(resp *pb.StartWatchingLedgerChannelResp) error { + return srv.Send(resp) } - // This stream is anyways closed when StopWatching is called for. - // Hence, that will act as the exit condition for the loop. - go func() { - adjEventStream := adjSub.EventStream() - var protoResponse *pb.StartWatchingLedgerChannelResp - for { - adjEvent, isOpen := <-adjEventStream - if !isOpen { - return - } - protoResponse, err = adjEventToProtoLedgerChResp(adjEvent) - if err != nil { - return - } - - // Handle error while sending notification. - err = srv.Send(protoResponse) - if err != nil { - return - } - } - }() - - // It should be the responsibility of the streamer to close things. - // Hence, the client should be closing this stream, which will cause srv.Recv to return an error. - // The error will act as the exit condition for this for{} loop. - var tx *pchannel.Transaction -StatesPubLoop: - for { - req, err = srv.Recv() - if err != nil { - err = errors.WithMessage(err, "reading published states pub data") - break StatesPubLoop - } - tx, err = transactionFromProtoLedgerChReq(req) - if err != nil { - err = errors.WithMessage(err, "parsing published states pub data") - break StatesPubLoop - } - - err = statesPub.Publish(context.TODO(), *tx) - if err != nil { - err = errors.WithMessage(err, "locally relaying published states pub data") - break StatesPubLoop - } + receiveState := func() (req *pb.StartWatchingLedgerChannelReq, err error) { + return srv.Recv() } - // TODO: Ensure adjEventSteam go-routine is killed. It should not be leaking. - // It is to allow for that, label breaks are used instead of return statements in the above for-select. - return err + return a.WatchingHandler.StartWatchingLedgerChannel(req, sendAdjEvent, receiveState) } // StopWatching wraps session.StopWatching. func (a *watchingServer) StopWatching(ctx context.Context, req *pb.StopWatchingReq) (*pb.StopWatchingResp, error) { - errResponse := func(err perun.APIError) *pb.StopWatchingResp { - return &pb.StopWatchingResp{ - Error: pb.FromError(err), - } - } - - sess, err := a.n.GetSession(req.SessionID) - if err != nil { - return errResponse(err), nil - } - var chID pchannel.ID - copy(chID[:], req.ChID) - err2 := sess.StopWatching(ctx, chID) - if err2 != nil { - return errResponse(err), nil - } - - return &pb.StopWatchingResp{Error: nil}, nil -} - -func adjEventToProtoLedgerChResp(adjEvent pchannel.AdjudicatorEvent) (*pb.StartWatchingLedgerChannelResp, error) { - protoResponse := &pb.StartWatchingLedgerChannelResp{} - switch e := adjEvent.(type) { - case *pchannel.RegisteredEvent: - registeredEvent, err := pb.FromRegisteredEvent(e) - protoResponse.Response = &pb.StartWatchingLedgerChannelResp_RegisteredEvent{ - RegisteredEvent: registeredEvent, - } - return protoResponse, err - case *pchannel.ProgressedEvent: - progressedEvent, err := pb.FromProgressedEvent(e) - protoResponse.Response = &pb.StartWatchingLedgerChannelResp_ProgressedEvent{ - ProgressedEvent: progressedEvent, - } - return protoResponse, err - case *pchannel.ConcludedEvent: - concludedEvent, err := pb.FromConcludedEvent(e) - protoResponse.Response = &pb.StartWatchingLedgerChannelResp_ConcludedEvent{ - ConcludedEvent: concludedEvent, - } - return protoResponse, err - default: - apiErr := perun.NewAPIErrUnknownInternal(errors.New("unknown even type")) - protoResponse.Response = &pb.StartWatchingLedgerChannelResp_Error{ - Error: pb.FromError(apiErr), - } - return protoResponse, nil - } -} - -func signedStateFromProtoLedgerChReq(req *pb.StartWatchingLedgerChannelReq) ( - signedState *pchannel.SignedState, err error, -) { - signedState = &pchannel.SignedState{} - signedState.Params, err = pb.ToParams(req.Params) - if err != nil { - return nil, err - } - signedState.State, err = pb.ToState(req.State) - if err != nil { - return nil, err - } - sigs := make([]pwallet.Sig, len(req.Sigs)) - for i := range sigs { - copy(sigs[i], req.Sigs[i]) - } - return signedState, nil -} - -func transactionFromProtoLedgerChReq(req *pb.StartWatchingLedgerChannelReq) ( - transaction *pchannel.Transaction, err error, -) { - transaction = &pchannel.Transaction{} - transaction.State, err = pb.ToState(req.State) - if err != nil { - return nil, err - } - transaction.Sigs = make([]pwallet.Sig, len(req.Sigs)) - for i := range transaction.Sigs { - copy(transaction.Sigs[i], req.Sigs[i]) - } - return transaction, nil + return a.WatchingHandler.StopWatching(ctx, req) } diff --git a/api/handlers/watching.go b/api/handlers/watching.go new file mode 100644 index 00000000..013cbe17 --- /dev/null +++ b/api/handlers/watching.go @@ -0,0 +1,199 @@ +// Copyright (c) 2023 - for information on the respective copyright owner +// see the NOTICE file and/or the repository at +// https://github.com/hyperledger-labs/perun-node +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package handlers + +import ( + "context" + + "github.com/pkg/errors" + pchannel "perun.network/go-perun/channel" + pwallet "perun.network/go-perun/wallet" + psync "polycry.pt/poly-go/sync" + + "github.com/hyperledger-labs/perun-node" + "github.com/hyperledger-labs/perun-node/api/grpc/pb" +) + +// WatchingHandler represents a grpc server that can serve watching API. +type WatchingHandler struct { + N perun.NodeAPI + + // The mutex should be used when accessing the map data structures. + psync.Mutex + Subscribes map[string]map[pchannel.ID]pchannel.AdjudicatorSubscription +} + +// StartWatchingLedgerChannel wraps session.StartWatchingLedgerChannel. +func (a *WatchingHandler) StartWatchingLedgerChannel( //nolint: funlen, gocognit + req *pb.StartWatchingLedgerChannelReq, + sendAdjEvent func(resp *pb.StartWatchingLedgerChannelResp) error, + receiveState func() (req *pb.StartWatchingLedgerChannelReq, err error), +) error { + var err error + sess, err := a.N.GetSession(req.SessionID) + if err != nil { + return errors.WithMessage(err, "retrieving session") + } + + signedState, err := signedStateFromProtoLedgerChReq(req) + if err != nil { + return errors.WithMessage(err, "parsing signed state") + } + + statesPub, adjSub, err := sess.StartWatchingLedgerChannel(context.TODO(), *signedState) + if err != nil { + return errors.WithMessage(err, "start watching") + } + + // This stream is anyways closed when StopWatching is called for. + // Hence, that will act as the exit condition for the loop. + go func() { + adjEventStream := adjSub.EventStream() + var protoResponse *pb.StartWatchingLedgerChannelResp + for { + adjEvent, isOpen := <-adjEventStream + if !isOpen { + return + } + protoResponse, err = adjEventToProtoLedgerChResp(adjEvent) + if err != nil { + return + } + + // Handle error while sending notification. + err = sendAdjEvent(protoResponse) + if err != nil { + return + } + } + }() + + // It should be the responsibility of the streamer to close things. + // Hence, the client should be closing this stream, which will cause srv.Recv to return an error. + // The error will act as the exit condition for this for{} loop. + var tx *pchannel.Transaction +StatesPubLoop: + for { + + req, err = receiveState() + if err != nil { + err = errors.WithMessage(err, "reading published states pub data") + break StatesPubLoop + } + tx, err = transactionFromProtoLedgerChReq(req) + if err != nil { + err = errors.WithMessage(err, "parsing published states pub data") + break StatesPubLoop + } + + err = statesPub.Publish(context.TODO(), *tx) + if err != nil { + err = errors.WithMessage(err, "locally relaying published states pub data") + break StatesPubLoop + } + } + + // TODO: Ensure adjEventSteam go-routine is killed. It should not be leaking. + // It is to allow for that, label breaks are used instead of return statements in the above for-select. + return err +} + +// StopWatching wraps session.StopWatching. +func (a *WatchingHandler) StopWatching(ctx context.Context, req *pb.StopWatchingReq) (*pb.StopWatchingResp, error) { + errResponse := func(err perun.APIError) *pb.StopWatchingResp { + return &pb.StopWatchingResp{ + Error: pb.FromError(err), + } + } + + sess, err := a.N.GetSession(req.SessionID) + if err != nil { + return errResponse(err), nil + } + var chID pchannel.ID + copy(chID[:], req.ChID) + err2 := sess.StopWatching(ctx, chID) + if err2 != nil { + return errResponse(err), nil + } + + return &pb.StopWatchingResp{Error: nil}, nil +} + +func adjEventToProtoLedgerChResp(adjEvent pchannel.AdjudicatorEvent) (*pb.StartWatchingLedgerChannelResp, error) { + protoResponse := &pb.StartWatchingLedgerChannelResp{} + switch e := adjEvent.(type) { + case *pchannel.RegisteredEvent: + registeredEvent, err := pb.FromRegisteredEvent(e) + protoResponse.Response = &pb.StartWatchingLedgerChannelResp_RegisteredEvent{ + RegisteredEvent: registeredEvent, + } + return protoResponse, err + case *pchannel.ProgressedEvent: + progressedEvent, err := pb.FromProgressedEvent(e) + protoResponse.Response = &pb.StartWatchingLedgerChannelResp_ProgressedEvent{ + ProgressedEvent: progressedEvent, + } + return protoResponse, err + case *pchannel.ConcludedEvent: + concludedEvent, err := pb.FromConcludedEvent(e) + protoResponse.Response = &pb.StartWatchingLedgerChannelResp_ConcludedEvent{ + ConcludedEvent: concludedEvent, + } + return protoResponse, err + default: + apiErr := perun.NewAPIErrUnknownInternal(errors.New("unknown even type")) + protoResponse.Response = &pb.StartWatchingLedgerChannelResp_Error{ + Error: pb.FromError(apiErr), + } + return protoResponse, nil + } +} + +func signedStateFromProtoLedgerChReq(req *pb.StartWatchingLedgerChannelReq) ( + signedState *pchannel.SignedState, err error, +) { + signedState = &pchannel.SignedState{} + signedState.Params, err = pb.ToParams(req.Params) + if err != nil { + return nil, err + } + signedState.State, err = pb.ToState(req.State) + if err != nil { + return nil, err + } + sigs := make([]pwallet.Sig, len(req.Sigs)) + for i := range sigs { + copy(sigs[i], req.Sigs[i]) + } + return signedState, nil +} + +func transactionFromProtoLedgerChReq(req *pb.StartWatchingLedgerChannelReq) ( + transaction *pchannel.Transaction, err error, +) { + transaction = &pchannel.Transaction{} + transaction.State, err = pb.ToState(req.State) + if err != nil { + return nil, err + } + transaction.Sigs = make([]pwallet.Sig, len(req.Sigs)) + for i := range transaction.Sigs { + copy(transaction.Sigs[i], req.Sigs[i]) + } + return transaction, nil +} From aa7a078d1f6eff9bd97ff736a9bdfad24356d677 Mon Sep 17 00:00:00 2001 From: manoranjith Date: Sun, 10 Dec 2023 15:09:00 +0100 Subject: [PATCH 13/14] Add perunio-tcp api server - Define a api_messages.proto as a enum of all the funding and watching services. - This is needed, for parsing of the request, because unlike grpc, tcp adapter using perunio's custom messaging scheme does not have an in-built routing mechanism. Signed-off-by: manoranjith --- api/grpc/pb/api_messages.pb.go | 342 +++++++++++++++++++++++++++++++++ api/tcp/doc.go | 19 ++ api/tcp/server.go | 252 ++++++++++++++++++++++++ proto/api_messages.proto | 43 +++++ 4 files changed, 656 insertions(+) create mode 100644 api/grpc/pb/api_messages.pb.go create mode 100644 api/tcp/doc.go create mode 100644 api/tcp/server.go create mode 100644 proto/api_messages.proto diff --git a/api/grpc/pb/api_messages.pb.go b/api/grpc/pb/api_messages.pb.go new file mode 100644 index 00000000..54eadf49 --- /dev/null +++ b/api/grpc/pb/api_messages.pb.go @@ -0,0 +1,342 @@ +// Copyright (c) 2023 - for information on the respective copyright owner +// see the NOTICE file and/or the repository at +// https://github.com/hyperledger-labs/perun-node +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.30.0 +// protoc v4.23.3 +// source: api_messages.proto + +// Package pb contains proto3 definitions for user API and the corresponding +// generated code for grpc server and client. + +package pb + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +// This type is defined as the enumeration of all messages in funding and +// watching service, in order to be able to parse the messages in api/tcp +// package. +type APIMessage struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // Types that are assignable to Msg: + // + // *APIMessage_FundReq + // *APIMessage_FundResp + // *APIMessage_RegisterReq + // *APIMessage_RegisterResp + // *APIMessage_WithdrawReq + // *APIMessage_WithdrawResp + // *APIMessage_StartWatchingLedgerChannelReq + // *APIMessage_StopWatchingReq + Msg isAPIMessage_Msg `protobuf_oneof:"msg"` +} + +func (x *APIMessage) Reset() { + *x = APIMessage{} + if protoimpl.UnsafeEnabled { + mi := &file_api_messages_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *APIMessage) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*APIMessage) ProtoMessage() {} + +func (x *APIMessage) ProtoReflect() protoreflect.Message { + mi := &file_api_messages_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use APIMessage.ProtoReflect.Descriptor instead. +func (*APIMessage) Descriptor() ([]byte, []int) { + return file_api_messages_proto_rawDescGZIP(), []int{0} +} + +func (m *APIMessage) GetMsg() isAPIMessage_Msg { + if m != nil { + return m.Msg + } + return nil +} + +func (x *APIMessage) GetFundReq() *FundReq { + if x, ok := x.GetMsg().(*APIMessage_FundReq); ok { + return x.FundReq + } + return nil +} + +func (x *APIMessage) GetFundResp() *FundResp { + if x, ok := x.GetMsg().(*APIMessage_FundResp); ok { + return x.FundResp + } + return nil +} + +func (x *APIMessage) GetRegisterReq() *RegisterReq { + if x, ok := x.GetMsg().(*APIMessage_RegisterReq); ok { + return x.RegisterReq + } + return nil +} + +func (x *APIMessage) GetRegisterResp() *RegisterResp { + if x, ok := x.GetMsg().(*APIMessage_RegisterResp); ok { + return x.RegisterResp + } + return nil +} + +func (x *APIMessage) GetWithdrawReq() *WithdrawReq { + if x, ok := x.GetMsg().(*APIMessage_WithdrawReq); ok { + return x.WithdrawReq + } + return nil +} + +func (x *APIMessage) GetWithdrawResp() *WithdrawResp { + if x, ok := x.GetMsg().(*APIMessage_WithdrawResp); ok { + return x.WithdrawResp + } + return nil +} + +func (x *APIMessage) GetStartWatchingLedgerChannelReq() *StartWatchingLedgerChannelReq { + if x, ok := x.GetMsg().(*APIMessage_StartWatchingLedgerChannelReq); ok { + return x.StartWatchingLedgerChannelReq + } + return nil +} + +func (x *APIMessage) GetStopWatchingReq() *StopWatchingReq { + if x, ok := x.GetMsg().(*APIMessage_StopWatchingReq); ok { + return x.StopWatchingReq + } + return nil +} + +type isAPIMessage_Msg interface { + isAPIMessage_Msg() +} + +type APIMessage_FundReq struct { + FundReq *FundReq `protobuf:"bytes,1,opt,name=fund_req,json=fundReq,proto3,oneof"` +} + +type APIMessage_FundResp struct { + FundResp *FundResp `protobuf:"bytes,2,opt,name=fund_resp,json=fundResp,proto3,oneof"` +} + +type APIMessage_RegisterReq struct { + RegisterReq *RegisterReq `protobuf:"bytes,3,opt,name=register_req,json=registerReq,proto3,oneof"` +} + +type APIMessage_RegisterResp struct { + RegisterResp *RegisterResp `protobuf:"bytes,4,opt,name=register_resp,json=registerResp,proto3,oneof"` +} + +type APIMessage_WithdrawReq struct { + WithdrawReq *WithdrawReq `protobuf:"bytes,5,opt,name=withdraw_req,json=withdrawReq,proto3,oneof"` +} + +type APIMessage_WithdrawResp struct { + WithdrawResp *WithdrawResp `protobuf:"bytes,6,opt,name=withdraw_resp,json=withdrawResp,proto3,oneof"` +} + +type APIMessage_StartWatchingLedgerChannelReq struct { + StartWatchingLedgerChannelReq *StartWatchingLedgerChannelReq `protobuf:"bytes,7,opt,name=start_watching_ledger_channel_req,json=startWatchingLedgerChannelReq,proto3,oneof"` +} + +type APIMessage_StopWatchingReq struct { + StopWatchingReq *StopWatchingReq `protobuf:"bytes,8,opt,name=stop_watching_req,json=stopWatchingReq,proto3,oneof"` +} + +func (*APIMessage_FundReq) isAPIMessage_Msg() {} + +func (*APIMessage_FundResp) isAPIMessage_Msg() {} + +func (*APIMessage_RegisterReq) isAPIMessage_Msg() {} + +func (*APIMessage_RegisterResp) isAPIMessage_Msg() {} + +func (*APIMessage_WithdrawReq) isAPIMessage_Msg() {} + +func (*APIMessage_WithdrawResp) isAPIMessage_Msg() {} + +func (*APIMessage_StartWatchingLedgerChannelReq) isAPIMessage_Msg() {} + +func (*APIMessage_StopWatchingReq) isAPIMessage_Msg() {} + +var File_api_messages_proto protoreflect.FileDescriptor + +var file_api_messages_proto_rawDesc = []byte{ + 0x0a, 0x12, 0x61, 0x70, 0x69, 0x5f, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x2e, 0x70, + 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x02, 0x70, 0x62, 0x1a, 0x15, 0x66, 0x75, 0x6e, 0x64, 0x69, 0x6e, + 0x67, 0x5f, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x1a, + 0x16, 0x77, 0x61, 0x74, 0x63, 0x68, 0x69, 0x6e, 0x67, 0x5f, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, + 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0xfa, 0x03, 0x0a, 0x0a, 0x41, 0x50, 0x49, 0x4d, + 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x28, 0x0a, 0x08, 0x66, 0x75, 0x6e, 0x64, 0x5f, 0x72, + 0x65, 0x71, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0b, 0x2e, 0x70, 0x62, 0x2e, 0x46, 0x75, + 0x6e, 0x64, 0x52, 0x65, 0x71, 0x48, 0x00, 0x52, 0x07, 0x66, 0x75, 0x6e, 0x64, 0x52, 0x65, 0x71, + 0x12, 0x2b, 0x0a, 0x09, 0x66, 0x75, 0x6e, 0x64, 0x5f, 0x72, 0x65, 0x73, 0x70, 0x18, 0x02, 0x20, + 0x01, 0x28, 0x0b, 0x32, 0x0c, 0x2e, 0x70, 0x62, 0x2e, 0x46, 0x75, 0x6e, 0x64, 0x52, 0x65, 0x73, + 0x70, 0x48, 0x00, 0x52, 0x08, 0x66, 0x75, 0x6e, 0x64, 0x52, 0x65, 0x73, 0x70, 0x12, 0x34, 0x0a, + 0x0c, 0x72, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x5f, 0x72, 0x65, 0x71, 0x18, 0x03, 0x20, + 0x01, 0x28, 0x0b, 0x32, 0x0f, 0x2e, 0x70, 0x62, 0x2e, 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, + 0x72, 0x52, 0x65, 0x71, 0x48, 0x00, 0x52, 0x0b, 0x72, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, + 0x52, 0x65, 0x71, 0x12, 0x37, 0x0a, 0x0d, 0x72, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x5f, + 0x72, 0x65, 0x73, 0x70, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x10, 0x2e, 0x70, 0x62, 0x2e, + 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x52, 0x65, 0x73, 0x70, 0x48, 0x00, 0x52, 0x0c, + 0x72, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x52, 0x65, 0x73, 0x70, 0x12, 0x34, 0x0a, 0x0c, + 0x77, 0x69, 0x74, 0x68, 0x64, 0x72, 0x61, 0x77, 0x5f, 0x72, 0x65, 0x71, 0x18, 0x05, 0x20, 0x01, + 0x28, 0x0b, 0x32, 0x0f, 0x2e, 0x70, 0x62, 0x2e, 0x57, 0x69, 0x74, 0x68, 0x64, 0x72, 0x61, 0x77, + 0x52, 0x65, 0x71, 0x48, 0x00, 0x52, 0x0b, 0x77, 0x69, 0x74, 0x68, 0x64, 0x72, 0x61, 0x77, 0x52, + 0x65, 0x71, 0x12, 0x37, 0x0a, 0x0d, 0x77, 0x69, 0x74, 0x68, 0x64, 0x72, 0x61, 0x77, 0x5f, 0x72, + 0x65, 0x73, 0x70, 0x18, 0x06, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x10, 0x2e, 0x70, 0x62, 0x2e, 0x57, + 0x69, 0x74, 0x68, 0x64, 0x72, 0x61, 0x77, 0x52, 0x65, 0x73, 0x70, 0x48, 0x00, 0x52, 0x0c, 0x77, + 0x69, 0x74, 0x68, 0x64, 0x72, 0x61, 0x77, 0x52, 0x65, 0x73, 0x70, 0x12, 0x6d, 0x0a, 0x21, 0x73, + 0x74, 0x61, 0x72, 0x74, 0x5f, 0x77, 0x61, 0x74, 0x63, 0x68, 0x69, 0x6e, 0x67, 0x5f, 0x6c, 0x65, + 0x64, 0x67, 0x65, 0x72, 0x5f, 0x63, 0x68, 0x61, 0x6e, 0x6e, 0x65, 0x6c, 0x5f, 0x72, 0x65, 0x71, + 0x18, 0x07, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x21, 0x2e, 0x70, 0x62, 0x2e, 0x53, 0x74, 0x61, 0x72, + 0x74, 0x57, 0x61, 0x74, 0x63, 0x68, 0x69, 0x6e, 0x67, 0x4c, 0x65, 0x64, 0x67, 0x65, 0x72, 0x43, + 0x68, 0x61, 0x6e, 0x6e, 0x65, 0x6c, 0x52, 0x65, 0x71, 0x48, 0x00, 0x52, 0x1d, 0x73, 0x74, 0x61, + 0x72, 0x74, 0x57, 0x61, 0x74, 0x63, 0x68, 0x69, 0x6e, 0x67, 0x4c, 0x65, 0x64, 0x67, 0x65, 0x72, + 0x43, 0x68, 0x61, 0x6e, 0x6e, 0x65, 0x6c, 0x52, 0x65, 0x71, 0x12, 0x41, 0x0a, 0x11, 0x73, 0x74, + 0x6f, 0x70, 0x5f, 0x77, 0x61, 0x74, 0x63, 0x68, 0x69, 0x6e, 0x67, 0x5f, 0x72, 0x65, 0x71, 0x18, + 0x08, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x13, 0x2e, 0x70, 0x62, 0x2e, 0x53, 0x74, 0x6f, 0x70, 0x57, + 0x61, 0x74, 0x63, 0x68, 0x69, 0x6e, 0x67, 0x52, 0x65, 0x71, 0x48, 0x00, 0x52, 0x0f, 0x73, 0x74, + 0x6f, 0x70, 0x57, 0x61, 0x74, 0x63, 0x68, 0x69, 0x6e, 0x67, 0x52, 0x65, 0x71, 0x42, 0x05, 0x0a, + 0x03, 0x6d, 0x73, 0x67, 0x42, 0x06, 0x5a, 0x04, 0x2e, 0x3b, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_api_messages_proto_rawDescOnce sync.Once + file_api_messages_proto_rawDescData = file_api_messages_proto_rawDesc +) + +func file_api_messages_proto_rawDescGZIP() []byte { + file_api_messages_proto_rawDescOnce.Do(func() { + file_api_messages_proto_rawDescData = protoimpl.X.CompressGZIP(file_api_messages_proto_rawDescData) + }) + return file_api_messages_proto_rawDescData +} + +var file_api_messages_proto_msgTypes = make([]protoimpl.MessageInfo, 1) +var file_api_messages_proto_goTypes = []interface{}{ + (*APIMessage)(nil), // 0: pb.APIMessage + (*FundReq)(nil), // 1: pb.FundReq + (*FundResp)(nil), // 2: pb.FundResp + (*RegisterReq)(nil), // 3: pb.RegisterReq + (*RegisterResp)(nil), // 4: pb.RegisterResp + (*WithdrawReq)(nil), // 5: pb.WithdrawReq + (*WithdrawResp)(nil), // 6: pb.WithdrawResp + (*StartWatchingLedgerChannelReq)(nil), // 7: pb.StartWatchingLedgerChannelReq + (*StopWatchingReq)(nil), // 8: pb.StopWatchingReq +} +var file_api_messages_proto_depIdxs = []int32{ + 1, // 0: pb.APIMessage.fund_req:type_name -> pb.FundReq + 2, // 1: pb.APIMessage.fund_resp:type_name -> pb.FundResp + 3, // 2: pb.APIMessage.register_req:type_name -> pb.RegisterReq + 4, // 3: pb.APIMessage.register_resp:type_name -> pb.RegisterResp + 5, // 4: pb.APIMessage.withdraw_req:type_name -> pb.WithdrawReq + 6, // 5: pb.APIMessage.withdraw_resp:type_name -> pb.WithdrawResp + 7, // 6: pb.APIMessage.start_watching_ledger_channel_req:type_name -> pb.StartWatchingLedgerChannelReq + 8, // 7: pb.APIMessage.stop_watching_req:type_name -> pb.StopWatchingReq + 8, // [8:8] is the sub-list for method output_type + 8, // [8:8] is the sub-list for method input_type + 8, // [8:8] is the sub-list for extension type_name + 8, // [8:8] is the sub-list for extension extendee + 0, // [0:8] is the sub-list for field type_name +} + +func init() { file_api_messages_proto_init() } +func file_api_messages_proto_init() { + if File_api_messages_proto != nil { + return + } + file_funding_service_proto_init() + file_watching_service_proto_init() + if !protoimpl.UnsafeEnabled { + file_api_messages_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*APIMessage); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + file_api_messages_proto_msgTypes[0].OneofWrappers = []interface{}{ + (*APIMessage_FundReq)(nil), + (*APIMessage_FundResp)(nil), + (*APIMessage_RegisterReq)(nil), + (*APIMessage_RegisterResp)(nil), + (*APIMessage_WithdrawReq)(nil), + (*APIMessage_WithdrawResp)(nil), + (*APIMessage_StartWatchingLedgerChannelReq)(nil), + (*APIMessage_StopWatchingReq)(nil), + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_api_messages_proto_rawDesc, + NumEnums: 0, + NumMessages: 1, + NumExtensions: 0, + NumServices: 0, + }, + GoTypes: file_api_messages_proto_goTypes, + DependencyIndexes: file_api_messages_proto_depIdxs, + MessageInfos: file_api_messages_proto_msgTypes, + }.Build() + File_api_messages_proto = out.File + file_api_messages_proto_rawDesc = nil + file_api_messages_proto_goTypes = nil + file_api_messages_proto_depIdxs = nil +} diff --git a/api/tcp/doc.go b/api/tcp/doc.go new file mode 100644 index 00000000..9f058dc8 --- /dev/null +++ b/api/tcp/doc.go @@ -0,0 +1,19 @@ +// Copyright (c) 2020 - for information on the respective copyright owner +// see the NOTICE file and/or the repository at +// https://github.com/hyperledger-labs/perun-node +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package peruntcp implements a funding and watching API server over tcp with +// perun protobuf encoding. +package peruntcp diff --git a/api/tcp/server.go b/api/tcp/server.go new file mode 100644 index 00000000..ba7e60c4 --- /dev/null +++ b/api/tcp/server.go @@ -0,0 +1,252 @@ +// Copyright (c) 2020 - for information on the respective copyright owner +// see the NOTICE file and/or the repository at +// https://github.com/hyperledger-labs/perun-node +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package peruntcp + +import ( + "context" + "encoding/binary" + "errors" + "fmt" + "io" + "net" + + "github.com/hyperledger-labs/perun-node" + "github.com/hyperledger-labs/perun-node/api/grpc/pb" + "github.com/hyperledger-labs/perun-node/api/handlers" + "github.com/hyperledger-labs/perun-node/app/payment" + + "google.golang.org/protobuf/proto" + pchannel "perun.network/go-perun/channel" + "perun.network/go-perun/log" + psync "polycry.pt/poly-go/sync" +) + +type server struct { + psync.Closer + + server net.Listener + + fundingHandler *handlers.FundingHandler + watchingHandler *handlers.WatchingHandler + + sessionID string // For timebeing use hard-coded session-id + + channels map[string](chan *pb.StartWatchingLedgerChannelReq) + channelsMtx psync.Mutex +} + +// ServeFundingWatchingAPI starts a payment channel API server that listens for incoming grpc +// requests at the specified address and serves those requests using the node API instance. +func ServeFundingWatchingAPI(n perun.NodeAPI, port string) error { + var err error + sessionID, _, err := payment.OpenSession(n, "api/session.yaml") + if err != nil { + return err + } + + fundingServer := &handlers.FundingHandler{ + N: n, + Subscribes: make(map[string]map[pchannel.ID]pchannel.AdjudicatorSubscription), + } + watchingServer := &handlers.WatchingHandler{ + N: n, + Subscribes: make(map[string]map[pchannel.ID]pchannel.AdjudicatorSubscription), + } + + tcpServer, err := net.Listen("tcp", port) + if err != nil { + return fmt.Errorf("listener: %w", err) + } + + s := &server{ + server: tcpServer, + fundingHandler: fundingServer, + watchingHandler: watchingServer, + sessionID: sessionID, + + channels: make(map[string](chan *pb.StartWatchingLedgerChannelReq), 10), + } + s.OnCloseAlways(func() { tcpServer.Close() }) //nolint: errcheck, gosec + + for { + conn, err := s.server.Accept() + if err != nil { + return err + } + + go s.handle(conn) + } +} + +func (s *server) handle(conn io.ReadWriteCloser) { + defer conn.Close() //nolint: errcheck + s.OnCloseAlways(func() { conn.Close() }) //nolint: errcheck, gosec + + var m psync.Mutex + + for { + msg, err := recvMsg(conn) + // log.Info("received message", msg, err) + if err != nil { + log.Errorf("%+v", msg) + log.Errorf("here decoding message failed: %v", err) + return + } + + go func() { + switch msg := msg.GetMsg().(type) { + case *pb.APIMessage_FundReq: + s.handleFundReq(msg, &m, conn) + + case *pb.APIMessage_RegisterReq: + s.handleRegisterReq(msg, &m, conn) + case *pb.APIMessage_WithdrawReq: + s.handleWithdrawReq(msg, &m, conn) + case *pb.APIMessage_StartWatchingLedgerChannelReq: + s.handleStartWatchingLedgerChannelReq(msg) + case *pb.APIMessage_StopWatchingReq: + s.handleStopWatching(msg) + + } + }() + } +} + +func (s *server) handleFundReq(msg *pb.APIMessage_FundReq, m *psync.Mutex, conn io.ReadWriteCloser) { //nolint: dupl + log.Warnf("Server: Got Funding request") + msg.FundReq.SessionID = s.sessionID + fundResp, err := s.fundingHandler.Fund(context.Background(), msg.FundReq) + if err != nil { + log.Errorf("fund response error +%v", err) + } + err = sendMsg(m, conn, &pb.APIMessage{Msg: &pb.APIMessage_FundResp{ + FundResp: fundResp, + }}) + if err != nil { + log.Errorf("sending response error +%v", err) + } +} + +//nolint:dupl +func (s *server) handleRegisterReq(msg *pb.APIMessage_RegisterReq, m *psync.Mutex, conn io.ReadWriteCloser) { + log.Warnf("Server: Got Registering request") + msg.RegisterReq.SessionID = s.sessionID + registerResp, err := s.fundingHandler.Register(context.Background(), msg.RegisterReq) + if err != nil { + log.Errorf("register response error +%v", err) + } + err = sendMsg(m, conn, &pb.APIMessage{Msg: &pb.APIMessage_RegisterResp{ + RegisterResp: registerResp, + }}) + if err != nil { + log.Errorf("sending response error +%v", err) + } +} + +//nolint:dupl +func (s *server) handleWithdrawReq(msg *pb.APIMessage_WithdrawReq, m *psync.Mutex, conn io.ReadWriteCloser) { + log.Warnf("Server: Got Withdrawing request") + msg.WithdrawReq.SessionID = s.sessionID + withdrawResp, err := s.fundingHandler.Withdraw(context.Background(), msg.WithdrawReq) + if err != nil { + log.Errorf("withdraw response error +%v", err) + } + err = sendMsg(m, conn, &pb.APIMessage{Msg: &pb.APIMessage_WithdrawResp{ + WithdrawResp: withdrawResp, + }}) + if err != nil { + log.Errorf("sending response error +%v", err) + } +} + +func (s *server) handleStartWatchingLedgerChannelReq(msg *pb.APIMessage_StartWatchingLedgerChannelReq) { + log.Warnf("Server: Got Watching request") + msg.StartWatchingLedgerChannelReq.SessionID = s.sessionID + + s.channelsMtx.Lock() + ch, ok := s.channels[string(msg.StartWatchingLedgerChannelReq.State.Id)] + s.channelsMtx.Unlock() + if ok { + ch <- msg.StartWatchingLedgerChannelReq + return + } + + ch = make(chan *pb.StartWatchingLedgerChannelReq, 10) + s.channelsMtx.Lock() + s.channels[string(msg.StartWatchingLedgerChannelReq.State.Id)] = ch + s.channelsMtx.Unlock() + + receiveState := func() (*pb.StartWatchingLedgerChannelReq, error) { + update, ok := <-ch + if !ok { + return nil, errors.New("subscription closed") + } + return update, nil + } + + sendAdjEvent := func(resp *pb.StartWatchingLedgerChannelResp) error { + return nil + } + + err := s.watchingHandler.StartWatchingLedgerChannel( + msg.StartWatchingLedgerChannelReq, + sendAdjEvent, + receiveState) + if err != nil { + log.Errorf("start watching returned with error +%v", err) + } +} + +func (s *server) handleStopWatching(msg *pb.APIMessage_StopWatchingReq) { + msg.StopWatchingReq.SessionID = s.sessionID + _, err := s.watchingHandler.StopWatching(context.Background(), msg.StopWatchingReq) + if err != nil { + log.Errorf("start watching returned with error +%v", err) + } +} + +func recvMsg(conn io.Reader) (*pb.APIMessage, error) { + var size uint16 + if err := binary.Read(conn, binary.BigEndian, &size); err != nil { + return nil, fmt.Errorf("reading size of data from wire: %w", err) + } + data := make([]byte, size) + if _, err := io.ReadFull(conn, data); err != nil { + return nil, fmt.Errorf("reading data from wire: %w", err) + } + var msg pb.APIMessage + if err := proto.Unmarshal(data, &msg); err != nil { + return nil, fmt.Errorf("unmarshaling message: %w", err) + } + return &msg, nil +} + +func sendMsg(m *psync.Mutex, conn io.Writer, msg *pb.APIMessage) error { + m.Lock() + defer m.Unlock() + data, err := proto.Marshal(msg) + if err != nil { + return fmt.Errorf("marshaling message: %w", err) + } + if err = binary.Write(conn, binary.BigEndian, uint16(len(data))); err != nil { + return fmt.Errorf("writing length to wire: %w", err) + } + if _, err = conn.Write(data); err != nil { + return fmt.Errorf("writing data to wire: %w", err) + } + return nil +} diff --git a/proto/api_messages.proto b/proto/api_messages.proto new file mode 100644 index 00000000..29e0faf1 --- /dev/null +++ b/proto/api_messages.proto @@ -0,0 +1,43 @@ +// Copyright (c) 2023 - for information on the respective copyright owner +// see the NOTICE file and/or the repository at +// https://github.com/hyperledger-labs/perun-node +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +syntax = "proto3"; + +// Package pb contains proto3 definitions for user API and the corresponding +// generated code for grpc server and client. +package pb; + +import "funding_service.proto"; +import "watching_service.proto"; + +// Option go_package is to specify the exact path where the generated go code should reside. +option go_package = ".;pb"; + +// This type is defined as the enumeration of all messages in funding and +// watching service, in order to be able to parse the messages in api/tcp +// package. +message APIMessage{ + oneof msg { + FundReq fund_req = 1; + FundResp fund_resp = 2; + RegisterReq register_req = 3; + RegisterResp register_resp = 4; + WithdrawReq withdraw_req = 5; + WithdrawResp withdraw_resp = 6; + StartWatchingLedgerChannelReq start_watching_ledger_channel_req = 7; + StopWatchingReq stop_watching_req = 8; + } +} From 20833870f0791b276a1c5cf60d6a29b8aaa70a05 Mon Sep 17 00:00:00 2001 From: manoranjith Date: Sun, 10 Dec 2023 15:10:42 +0100 Subject: [PATCH 14/14] Add api_messages.proto to go generate command - Also combine the commands for watching and funding service, because the api_messages.proto depends on both of these definitions. Signed-off-by: manoranjith --- generateproto.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/generateproto.go b/generateproto.go index 97ef36b1..525f01e2 100644 --- a/generateproto.go +++ b/generateproto.go @@ -17,5 +17,4 @@ package perun //go:generate protoc --proto_path=proto --go_out=api/grpc/pb --go-grpc_out=api/grpc/pb proto/nodetypes.proto proto/errors.proto proto/payment_service.proto -//go:generate protoc --proto_path=proto --go_out=api/grpc/pb --go-grpc_out=api/grpc/pb proto/sdktypes.proto proto/funding_service.proto -//go:generate protoc --proto_path=proto --go_out=api/grpc/pb --go-grpc_out=api/grpc/pb proto/sdktypes.proto proto/watching_service.proto +//go:generate protoc --proto_path=proto --go_out=api/grpc/pb --go-grpc_out=api/grpc/pb proto/sdktypes.proto proto/watching_service.proto proto/funding_service.proto proto/api_messages.proto