Skip to content
This repository was archived by the owner on Aug 19, 2025. It is now read-only.
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions cmd/run_xlayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package main
import (
"crypto/ecdsa"
"fmt"
"github.com/0xPolygonHermez/zkevm-node/dataavailability/celestia"
"os"

dataCommitteeClient "github.com/0xPolygon/cdk-data-availability/client"
"github.com/0xPolygonHermez/zkevm-node/config"
Expand All @@ -18,6 +20,7 @@ import (
"github.com/0xPolygonHermez/zkevm-node/pool"
"github.com/0xPolygonHermez/zkevm-node/sequencesender"
"github.com/0xPolygonHermez/zkevm-node/state"
openrpc "github.com/celestiaorg/celestia-openrpc"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
)
Expand Down Expand Up @@ -95,6 +98,7 @@ func newDataAvailability(c config.Config, st *state.State, etherman *etherman.Cl
if err != nil {
return nil, fmt.Errorf("error getting data availability protocol name: %v", err)
}

var daBackend dataavailability.DABackender
switch daProtocolName {
case string(dataavailability.DataAvailabilityCommittee):
Expand Down Expand Up @@ -122,6 +126,18 @@ func newDataAvailability(c config.Config, st *state.State, etherman *etherman.Cl
if err != nil {
return nil, err
}
case string(dataavailability.Celestia):
// TODO load Celestia config
conf := celestia.Config{
GasPrice: float64(openrpc.DefaultGasPrice()),
Rpc: os.Getenv("ZKEVM_NODE_CELESTIA_URI"),
NamespaceId: os.Getenv("ZKEVM_NODE_CELESTIA_NAMESPACEID"),
AuthToken: "",
}
daBackend, err = celestia.New(conf)
if err != nil {
return nil, err
}
default:
return nil, fmt.Errorf("unexpected / unsupported DA protocol: %s", daProtocolName)
}
Expand Down
56 changes: 56 additions & 0 deletions dataavailability/celestia/blob.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package celestia

import (
"bytes"
"encoding/binary"
)

// BlobPointer contains the reference to the data blob on Celestia
type BlobPointer struct {
BlockHeight uint64
TxCommitment [32]byte
}

// MarshalBinary encodes the BlobPointer to binary
// serialization format: height + start + end + commitment + data root
func (b *BlobPointer) MarshalBinary() ([]byte, error) {
buf := new(bytes.Buffer)

// Writing fixed-size values
if err := binary.Write(buf, binary.BigEndian, b.BlockHeight); err != nil {
return nil, err
}

// Writing fixed-size byte arrays directly
if _, err := buf.Write(b.TxCommitment[:]); err != nil {
return nil, err
}

return buf.Bytes(), nil
}

// UnmarshalBinary decodes the binary to BlobPointer
// serialization format: height + start + end + commitment + data root
func (b *BlobPointer) UnmarshalBinary(data []byte) error {
buf := bytes.NewReader(data)

// Reading fixed-size values
if err := binary.Read(buf, binary.BigEndian, &b.BlockHeight); err != nil {
return err
}

// Reading fixed-size byte arrays directly
if err := readFixedBytes(buf, b.TxCommitment[:]); err != nil {
return err
}

return nil
}

// readFixedBytes reads a fixed number of bytes into a byte slice
func readFixedBytes(buf *bytes.Reader, data []byte) error {
if _, err := buf.Read(data); err != nil {
return err
}
return nil
}
251 changes: 251 additions & 0 deletions dataavailability/celestia/celestia.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,251 @@
package celestia

import (
"bytes"
"context"
"encoding/binary"
"encoding/hex"
"errors"
"fmt"

"github.com/0xPolygonHermez/zkevm-node/log"
openrpc "github.com/celestiaorg/celestia-openrpc"
"github.com/celestiaorg/celestia-openrpc/types/blob"
"github.com/celestiaorg/celestia-openrpc/types/share"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
)

const unexpectedHashTemplate = "mismatch on transaction data. Expected hash %s, actual hash: %s"

// Config Configuration for the CelestiaDA node
type Config struct {
// L1RpcUrl string
GasPrice float64
Rpc string
// TendermintRPC string
NamespaceId string
AuthToken string
// BlobstreamXAddress string
// EventChannelSize uint64
}

// Batch Types to help with Batch Data Aggregation
//type Batch struct {
// ChangeL2BlockMarker uint8
// DeltaTimestamp uint32
// IndexL1InfoTree uint32
// Transactions []Transaction
//}
//
//type Transaction struct {
// RLP []byte
// R, S, V []byte
// EfficiencyPercent byte
//}

// CelestiaBackend implements the Celestia integration
type CelestiaBackend struct {
Cfg Config
Client *openrpc.Client
// Trpc *http.HTTP
Namespace share.Namespace
// BlobstreamX *blobstreamx.BlobstreamX
}

func New(cfg Config) (*CelestiaBackend, error) {
if cfg.NamespaceId == "" {
return nil, errors.New("namespace id cannot be blank")
}
nsBytes, err := hex.DecodeString(cfg.NamespaceId)
if err != nil {
return nil, err
}

namespace, err := share.NewBlobNamespaceV0(nsBytes)
if err != nil {
return nil, err
}

// var trpc *http.HTTP
// trpc, err = http.New(cfg.TendermintRPC, "/websocket")

// if cfg.EventChannelSize == 0 {
// cfg.EventChannelSize = 100
// }

return &CelestiaBackend{
Cfg: cfg,
Client: nil,
Namespace: namespace,
}, nil
}

func (c *CelestiaBackend) Init() error {
daClient, err := openrpc.NewClient(context.Background(), c.Cfg.Rpc, c.Cfg.AuthToken)
if err != nil {
return err
}
c.Client = daClient
return nil
}

func (c *CelestiaBackend) PostSequence(ctx context.Context, batchesData [][]byte) ([]byte, error) {
var aggregatedBatch []byte
for _, batchData := range batchesData {
aggregatedBatch = append(aggregatedBatch, batchData...)
}

dataBlob, err := blob.NewBlobV0(c.Namespace, aggregatedBatch)
if err != nil {
log.Warnf("Error creating blob, error: %s", err)
return nil, err
}

commitment, err := blob.CreateCommitment(dataBlob)
if err != nil {
log.Warnf("Error creating commitment, error: %s", err)
return nil, err
}

height, err := c.Client.Blob.Submit(ctx, []*blob.Blob{dataBlob}, openrpc.GasPrice(c.Cfg.GasPrice))
if err != nil {
log.Warnf("Blob Submission error, error: %s", err)
return nil, err
}

if height == 0 {
log.Warnf("Unexpected height from blob response, height: %d", height)
return nil, errors.New("unexpected response code")
}

proofs, err := c.Client.Blob.GetProof(ctx, height, c.Namespace, commitment)
if err != nil {
log.Warnf("Error retrieving proof, error: %s", err)
return nil, err
}

included, err := c.Client.Blob.Included(ctx, height, c.Namespace, proofs, commitment)
if err != nil || !included {
log.Warnf("Error checking for inclusion, error: %s, proof: %v", err, proofs)
return nil, err
}
log.Infof("Successfully posted blob, height: %d, commitment: %s", height, hex.EncodeToString(commitment))

txCommitment := [32]byte{}
copy(txCommitment[:], commitment)

blobPointer := BlobPointer{
BlockHeight: height,
TxCommitment: txCommitment,
}

blobPointerData, err := blobPointer.MarshalBinary()
if err != nil {
log.Warnf("BlobPointer Marshal Binary error: %s", err)
return nil, err
}

buf := new(bytes.Buffer)
err = binary.Write(buf, binary.BigEndian, blobPointerData)
if err != nil {
log.Warnf("blob pointer data serialization failed, error: %s", err)
return nil, err
}

// will need to wait for Blobstream or Equivalency service
return buf.Bytes(), nil
}

func (c *CelestiaBackend) GetSequence(ctx context.Context, batchHashes []common.Hash, dataAvailabilityMessage []byte) ([][]byte, error) {
blobPointer := BlobPointer{}
err := blobPointer.UnmarshalBinary(dataAvailabilityMessage)
if err != nil {
return nil, err
}
dataBlob, err := c.Client.Blob.Get(ctx, blobPointer.BlockHeight, c.Namespace, blobPointer.TxCommitment[:])
if err != nil {
return nil, err
}

aggregatedBatchData := dataBlob.Data

// need to read data using the dataAvailabilityMessage
decodedBatches, err := decodeBatches(aggregatedBatchData)
if err != nil {
fmt.Printf("Error decoding batches: %v\n", err)
return nil, err
}

if len(decodedBatches) != len(batchHashes) {
return nil, fmt.Errorf("error decoded batches length and batch hashes length mismatch")
}

for i, batchData := range decodedBatches {
actualTransactionsHash := crypto.Keccak256Hash(batchData)
if actualTransactionsHash != batchHashes[i] {
unexpectedHash := fmt.Errorf(
unexpectedHashTemplate, batchHashes[i], actualTransactionsHash,
)
log.Warnf(
"error getting data from Celestia node at height %s with commitment %s: %s",
blobPointer.BlockHeight, blobPointer.TxCommitment, unexpectedHash,
)
}
}
return decodedBatches, nil
}

func decodeBatches(data []byte) ([][]byte, error) {
var batches [][]byte
offset := 0

for offset < len(data) {
// Skip the changeL2BlockMarker
offset++

// Read the DeltaTimestamp and IndexL1InfoTree
_ = binary.BigEndian.Uint32(data[offset : offset+4]) // DeltaTimestamp
offset += 4
_ = binary.BigEndian.Uint32(data[offset : offset+4]) // IndexL1InfoTree
offset += 4

var batchBytes []byte
for offset < len(data) {
rlpLen := binary.BigEndian.Uint32(data[offset : offset+4])
offset += 4
rlpEnd := offset + int(rlpLen)
batchBytes = append(batchBytes, data[offset:rlpEnd]...)
offset = rlpEnd

rLen := binary.BigEndian.Uint32(data[offset : offset+4])
offset += 4
rEnd := offset + int(rLen)
batchBytes = append(batchBytes, data[offset:rEnd]...)
offset = rEnd

sLen := binary.BigEndian.Uint32(data[offset : offset+4])
offset += 4
sEnd := offset + int(sLen)
batchBytes = append(batchBytes, data[offset:sEnd]...)
offset = sEnd

vLen := binary.BigEndian.Uint32(data[offset : offset+4])
offset += 4
vEnd := offset + int(vLen)
batchBytes = append(batchBytes, data[offset:vEnd]...)
offset = vEnd

batchBytes = append(batchBytes, data[offset]) // EfficiencyPercent
offset++

if offset >= len(data) {
break
}
}

batches = append(batches, batchBytes)
}

return batches, nil
}
2 changes: 2 additions & 0 deletions dataavailability/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,6 @@ type DABackendType string
const (
// DataAvailabilityCommittee is the DAC protocol backend
DataAvailabilityCommittee DABackendType = "DataAvailabilityCommittee"
// Celestia is the DA protocol backend for Celestia
Celestia DABackendType = "Celestia"
)
Loading