From f2cfb1a505f8279f07bb41ff9754abd12065e85a Mon Sep 17 00:00:00 2001 From: j-rafique Date: Fri, 16 May 2025 17:02:09 +0500 Subject: [PATCH] optimize file handling by streaming chunks and hashing on the fly to reduce memory usage --- pkg/codec/codec.go | 5 +- pkg/codec/raptorq.go | 49 ++++------ pkg/logtrace/fields.go | 1 + .../server/cascade/cascade_action_server.go | 98 +++++++++++++++++-- supernode/services/cascade/adaptors/rq.go | 9 +- supernode/services/cascade/helper.go | 14 ++- supernode/services/cascade/register.go | 16 ++- tests/system/e2e_cascade_test.go | 5 +- 8 files changed, 136 insertions(+), 61 deletions(-) diff --git a/pkg/codec/codec.go b/pkg/codec/codec.go index 9963c988..e644d3b5 100644 --- a/pkg/codec/codec.go +++ b/pkg/codec/codec.go @@ -28,8 +28,9 @@ type Block struct { // EncodeRequest represents the request to encode a file. type EncodeRequest struct { - TaskID string - Data []byte + TaskID string + Path string + DataSize int } // RaptorQ contains methods for request services from RaptorQ service. diff --git a/pkg/codec/raptorq.go b/pkg/codec/raptorq.go index 140e4834..198f1103 100644 --- a/pkg/codec/raptorq.go +++ b/pkg/codec/raptorq.go @@ -24,61 +24,48 @@ func NewRaptorQCodec(dir string) Codec { func (rq *raptorQ) Encode(ctx context.Context, req EncodeRequest) (EncodeResponse, error) { /* ---------- 1. initialise RaptorQ processor ---------- */ + fields := logtrace.Fields{ + logtrace.FieldMethod: "Encode", + logtrace.FieldModule: "rq", + logtrace.FieldTaskID: req.TaskID, + "path": req.Path, + "data-size": req.DataSize, + } processor, err := raptorq.NewDefaultRaptorQProcessor() if err != nil { return EncodeResponse{}, fmt.Errorf("create RaptorQ processor: %w", err) } defer processor.Free() + logtrace.Info(ctx, "RaptorQ processor created", fields) - logtrace.Info(ctx, "RaptorQ processor created", logtrace.Fields{ - "data-size": len(req.Data)}) - /* ---------- 2. 
persist req.Data to a temp file ---------- */ - - tmp, err := os.CreateTemp("", "rq-encode-*") - if err != nil { - return EncodeResponse{}, fmt.Errorf("create temp file: %w", err) - } - tmpPath := tmp.Name() - if _, err := tmp.Write(req.Data); err != nil { - tmp.Close() - os.Remove(tmpPath) - return EncodeResponse{}, fmt.Errorf("write temp file: %w", err) - } - if err := tmp.Close(); err != nil { // sync to disk - os.Remove(tmpPath) - return EncodeResponse{}, fmt.Errorf("close temp file: %w", err) - } - - /* ---------- 3. run the encoder ---------- */ - - blockSize := processor.GetRecommendedBlockSize(uint64(len(req.Data))) + /* ---------- 2. run the encoder ---------- */ + blockSize := processor.GetRecommendedBlockSize(uint64(req.DataSize)) symbolsDir := filepath.Join(rq.symbolsBaseDir, req.TaskID) if err := os.MkdirAll(symbolsDir, 0o755); err != nil { - os.Remove(tmpPath) + fields[logtrace.FieldError] = err.Error() + os.Remove(req.Path) return EncodeResponse{}, fmt.Errorf("mkdir %s: %w", symbolsDir, err) } + logtrace.Info(ctx, "RaptorQ processor encoding", fields) - logtrace.Info(ctx, "RaptorQ processor encoding", logtrace.Fields{ - "symbols-dir": symbolsDir, - "temp-file": tmpPath}) - - resp, err := processor.EncodeFile(tmpPath, symbolsDir, blockSize) + resp, err := processor.EncodeFile(req.Path, symbolsDir, blockSize) if err != nil { - os.Remove(tmpPath) + fields[logtrace.FieldError] = err.Error() + os.Remove(req.Path) return EncodeResponse{}, fmt.Errorf("raptorq encode: %w", err) } /* we no longer need the temp file */ // _ = os.Remove(tmpPath) - /* ---------- 4. read the layout JSON ---------- */ + /* ---------- 3. 
read the layout JSON ---------- */ layoutData, err := os.ReadFile(resp.LayoutFilePath) - logtrace.Info(ctx, "RaptorQ processor layout file", logtrace.Fields{ "layout-file": resp.LayoutFilePath}) if err != nil { + fields[logtrace.FieldError] = err.Error() return EncodeResponse{}, fmt.Errorf("read layout %s: %w", resp.LayoutFilePath, err) } diff --git a/pkg/logtrace/fields.go b/pkg/logtrace/fields.go index 0ef9be21..d8a38b8a 100644 --- a/pkg/logtrace/fields.go +++ b/pkg/logtrace/fields.go @@ -18,4 +18,5 @@ const ( FieldTxHash = "tx_hash" FieldTaskID = "task_id" FieldActionID = "action_id" + FieldHashHex = "hash_hex" ) diff --git a/supernode/node/action/server/cascade/cascade_action_server.go b/supernode/node/action/server/cascade/cascade_action_server.go index 4deca38b..ad77b421 100644 --- a/supernode/node/action/server/cascade/cascade_action_server.go +++ b/supernode/node/action/server/cascade/cascade_action_server.go @@ -1,14 +1,18 @@ package cascade import ( + "encoding/hex" "fmt" + "github.com/LumeraProtocol/supernode/pkg/errors" + "google.golang.org/grpc" "io" + "lukechampine.com/blake3" + "os" + "path/filepath" pb "github.com/LumeraProtocol/supernode/gen/supernode/action/cascade" "github.com/LumeraProtocol/supernode/pkg/logtrace" cascadeService "github.com/LumeraProtocol/supernode/supernode/services/cascade" - - "google.golang.org/grpc" ) type ActionServer struct { @@ -34,9 +38,24 @@ func (server *ActionServer) Register(stream pb.CascadeService_RegisterServer) er ctx := stream.Context() logtrace.Info(ctx, "client streaming request to upload cascade input data received", fields) - // Collect data chunks - var allData []byte - var metadata *pb.Metadata + var ( + metadata *pb.Metadata + totalSize int + ) + + hasher, tempFile, tempFilePath, err := initializeHasherAndTempFile() + if err != nil { + fields[logtrace.FieldError] = err.Error() + logtrace.Error(ctx, "failed to initialize hasher and temp file", fields) + return fmt.Errorf("initializing hasher and temp 
file: %w", err) + } + defer func(tempFile *os.File) { + err := tempFile.Close() + if err != nil && !errors.Is(err, os.ErrClosed) { + fields[logtrace.FieldError] = err.Error() + logtrace.Warn(ctx, "error closing temp file", fields) + } + }(tempFile) // Process incoming stream for { @@ -55,11 +74,27 @@ func (server *ActionServer) Register(stream pb.CascadeService_RegisterServer) er switch x := req.RequestType.(type) { case *pb.RegisterRequest_Chunk: if x.Chunk != nil { - // Add data chunk to our collection - allData = append(allData, x.Chunk.Data...) + + // hash the chunks + _, err := hasher.Write(x.Chunk.Data) + if err != nil { + fields[logtrace.FieldError] = err.Error() + logtrace.Error(ctx, "failed to write chunk to hasher", fields) + return fmt.Errorf("hashing error: %w", err) + } + + // write chunks to the file + _, err = tempFile.Write(x.Chunk.Data) + if err != nil { + fields[logtrace.FieldError] = err.Error() + logtrace.Error(ctx, "failed to write chunk to file", fields) + return fmt.Errorf("file write error: %w", err) + } + totalSize += len(x.Chunk.Data) + logtrace.Info(ctx, "received data chunk", logtrace.Fields{ "chunk_size": len(x.Chunk.Data), - "total_size_so_far": len(allData), + "total_size_so_far": totalSize, }) } case *pb.RegisterRequest_Metadata: @@ -81,12 +116,26 @@ func (server *ActionServer) Register(stream pb.CascadeService_RegisterServer) er fields[logtrace.FieldActionID] = metadata.GetActionId() logtrace.Info(ctx, "metadata received from action-sdk", fields) + hash := hasher.Sum(nil) + hashHex := hex.EncodeToString(hash) + fields[logtrace.FieldHashHex] = hashHex + logtrace.Info(ctx, "final BLAKE3 hash generated", fields) + + targetPath, err := replaceTempDirWithTaskDir(metadata.GetTaskId(), tempFilePath, tempFile) + if err != nil { + fields[logtrace.FieldError] = err.Error() + logtrace.Error(ctx, "failed to replace temp dir with task dir", fields) + return fmt.Errorf("failed to replace temp dir with task dir: %w", err) + } + // Process the 
complete data task := server.factory.NewCascadeRegistrationTask() - err := task.Register(ctx, &cascadeService.RegisterRequest{ + err = task.Register(ctx, &cascadeService.RegisterRequest{ TaskID: metadata.TaskId, ActionID: metadata.ActionId, - Data: allData, + DataHash: hash, + DataSize: totalSize, + FilePath: targetPath, }, func(resp *cascadeService.RegisterResponse) error { grpcResp := &pb.RegisterResponse{ EventType: pb.SupernodeEventType(resp.EventType), @@ -112,3 +161,32 @@ func (server *ActionServer) Register(stream pb.CascadeService_RegisterServer) er logtrace.Info(ctx, "cascade registration completed successfully", fields) return nil } + +func initializeHasherAndTempFile() (*blake3.Hasher, *os.File, string, error) { + hasher := blake3.New(32, nil) + + // os.CreateTemp picks a unique name per call; a PID-based name would be shared by every concurrent upload in this process. + tempFile, err := os.CreateTemp("", "cascade-upload-*.tmp") + if err != nil { + return nil, nil, "", fmt.Errorf("could not create temp file: %w", err) + } + + return hasher, tempFile, tempFile.Name(), nil +} + +func replaceTempDirWithTaskDir(taskID, tempFilePath string, tempFile *os.File) (targetPath string, err error) { + if err := tempFile.Close(); err != nil && !errors.Is(err, os.ErrClosed) { + return "", fmt.Errorf("failed to close temp file: %w", err) + } + + targetDir := filepath.Join(os.TempDir(), taskID) + if err := os.MkdirAll(targetDir, 0755); err != nil { + return "", fmt.Errorf("could not create task directory: %w", err) + } + targetPath = filepath.Join(targetDir, fmt.Sprintf("uploaded-%s.dat", taskID)) + if err := os.Rename(tempFilePath, targetPath); err != nil { + return "", fmt.Errorf("could not move file to final location: %w", err) + } + + return targetPath, nil +} diff --git a/supernode/services/cascade/adaptors/rq.go b/supernode/services/cascade/adaptors/rq.go index ed0d4f2d..2228b466 100644 --- a/supernode/services/cascade/adaptors/rq.go +++ b/supernode/services/cascade/adaptors/rq.go @@ -10,7 +10,7 @@ import ( // 
//go:generate mockgen -destination=mocks/rq_mock.go -package=cascadeadaptormocks -source=rq.go type CodecService interface { - EncodeInput(ctx context.Context, taskID string, data []byte) (EncodeResult, error) + EncodeInput(ctx context.Context, taskID string, path string, dataSize int) (EncodeResult, error) } // EncodeResult represents the outcome of encoding the input data. @@ -30,10 +30,11 @@ func NewCodecService(codec codec.Codec) CodecService { } // EncodeInput encodes the provided data and returns symbols and metadata. -func (c *codecImpl) EncodeInput(ctx context.Context, taskID string, data []byte) (EncodeResult, error) { +func (c *codecImpl) EncodeInput(ctx context.Context, taskID string, path string, dataSize int) (EncodeResult, error) { resp, err := c.codec.Encode(ctx, codec.EncodeRequest{ - TaskID: taskID, - Data: data, + TaskID: taskID, + Path: path, + DataSize: dataSize, }) if err != nil { return EncodeResult{}, err diff --git a/supernode/services/cascade/helper.go b/supernode/services/cascade/helper.go index 09e35978..28805da1 100644 --- a/supernode/services/cascade/helper.go +++ b/supernode/services/cascade/helper.go @@ -68,8 +68,7 @@ func (task *CascadeRegistrationTask) decodeCascadeMetadata(ctx context.Context, return meta, nil } -func (task *CascadeRegistrationTask) verifyDataHash(ctx context.Context, data []byte, expected string, f logtrace.Fields) error { - dh, _ := utils.Blake3Hash(data) +func (task *CascadeRegistrationTask) verifyDataHash(ctx context.Context, dh []byte, expected string, f logtrace.Fields) error { b64 := utils.B64Encode(dh) if string(b64) != expected { return task.wrapErr(ctx, "data hash doesn't match", errors.New(""), f) @@ -79,8 +78,8 @@ func (task *CascadeRegistrationTask) verifyDataHash(ctx context.Context, data [] return nil } -func (task *CascadeRegistrationTask) encodeInput(ctx context.Context, data []byte, f logtrace.Fields) (*adaptors.EncodeResult, error) { - resp, err := task.rq.EncodeInput(ctx, task.ID(), data) +func 
(task *CascadeRegistrationTask) encodeInput(ctx context.Context, path string, dataSize int, f logtrace.Fields) (*adaptors.EncodeResult, error) { + resp, err := task.rq.EncodeInput(ctx, task.ID(), path, dataSize) if err != nil { return nil, task.wrapErr(ctx, "failed to encode data", err, f) } @@ -201,7 +200,7 @@ func verifyIDs(ticketMetadata, metadata codec.Layout) error { // verifyActionFee checks if the action fee is sufficient for the given data size // It fetches action parameters, calculates the required fee, and compares it with the action price -func (task *CascadeRegistrationTask) verifyActionFee(ctx context.Context, action *actiontypes.Action, data []byte, fields logtrace.Fields) error { +func (task *CascadeRegistrationTask) verifyActionFee(ctx context.Context, action *actiontypes.Action, dataSize int, fields logtrace.Fields) error { // Fetch action parameters params, err := task.lumeraClient.GetActionParams(ctx) if err != nil { @@ -213,8 +212,7 @@ func (task *CascadeRegistrationTask) verifyActionFee(ctx context.Context, action feePerByte := params.Params.FeePerByte.Amount // Calculate per-byte fee based on data size - dataBytes := len(data) - perByteFee := sdk.NewCoin(baseFee.Denom, feePerByte.Mul(math.NewInt(int64(dataBytes)))) + perByteFee := sdk.NewCoin(baseFee.Denom, feePerByte.Mul(math.NewInt(int64(dataSize)))) // Calculate total fee requiredFee := baseFee.Add(perByteFee) @@ -222,7 +220,7 @@ func (task *CascadeRegistrationTask) verifyActionFee(ctx context.Context, action // Log the calculated fee logtrace.Info(ctx, "calculated required fee", logtrace.Fields{ "fee": requiredFee.String(), - "dataBytes": dataBytes, + "dataBytes": dataSize, }) // Check if action price is less than required fee if action.Price.IsLT(requiredFee) { diff --git a/supernode/services/cascade/register.go b/supernode/services/cascade/register.go index 705fff9e..32f69830 100644 --- a/supernode/services/cascade/register.go +++ b/supernode/services/cascade/register.go @@ -2,6 +2,7 @@ 
package cascade import ( "context" + "os" "github.com/LumeraProtocol/supernode/pkg/logtrace" ) @@ -10,7 +11,9 @@ import ( type RegisterRequest struct { TaskID string ActionID string - Data []byte + DataHash []byte + DataSize int + FilePath string } // RegisterResponse contains the result of upload @@ -57,7 +60,7 @@ func (task *CascadeRegistrationTask) Register( task.streamEvent(SupernodeEventTypeActionRetrieved, "action has been retrieved", "", send) /* 2. Verify action fee -------------------------------------------------------- */ - if err := task.verifyActionFee(ctx, action, req.Data, fields); err != nil { + if err := task.verifyActionFee(ctx, action, req.DataSize, fields); err != nil { return err } logtrace.Info(ctx, "action fee has been validated", fields) @@ -80,14 +83,14 @@ func (task *CascadeRegistrationTask) Register( task.streamEvent(SupernodeEventTypeMetadataDecoded, "cascade metadata has been decoded", "", send) /* 5. Verify data hash --------------------------------------------------------- */ - if err := task.verifyDataHash(ctx, req.Data, cascadeMeta.DataHash, fields); err != nil { + if err := task.verifyDataHash(ctx, req.DataHash, cascadeMeta.DataHash, fields); err != nil { return err } logtrace.Info(ctx, "data-hash has been verified", fields) task.streamEvent(SupernodeEventTypeDataHashVerified, "data-hash has been verified", "", send) /* 6. 
Encode the raw data ------------------------------------------------------ */ - encResp, err := task.encodeInput(ctx, req.Data, fields) + encResp, err := task.encodeInput(ctx, req.FilePath, req.DataSize, fields) if err != nil { return err } @@ -136,5 +139,10 @@ func (task *CascadeRegistrationTask) Register( logtrace.Info(ctx, "action has been finalized", fields) task.streamEvent(SupernodeEventTypeActionFinalized, "action has been finalized", resp.TxHash, send) + err = os.RemoveAll(req.FilePath) + if err != nil { + logtrace.Warn(ctx, "error removing file", fields) + } + return nil } diff --git a/tests/system/e2e_cascade_test.go b/tests/system/e2e_cascade_test.go index 01952602..ae99824b 100644 --- a/tests/system/e2e_cascade_test.go +++ b/tests/system/e2e_cascade_test.go @@ -261,8 +261,9 @@ func TestCascadeE2E(t *testing.T) { ctx := context.Background() encodeRes, err := rqCodec.Encode(ctx, codec.EncodeRequest{ - Data: data, - TaskID: "1", + Path: testFileFullpath, + DataSize: int(fileInfo.Size()), + TaskID: "1", }) require.NoError(t, err, "Failed to encode data with RaptorQ")