Skip to content

Commit 4411e84

Browse files
committed
refactoring
1 parent 1f4b918 commit 4411e84

30 files changed

Lines changed: 928 additions & 1098 deletions

api/client/client.go

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,10 @@ import (
99
logging "github.com/ipfs/go-log/v2"
1010
"google.golang.org/grpc"
1111

12+
appfibre "github.com/celestiaorg/celestia-app/v8/fibre"
13+
1214
"github.com/celestiaorg/celestia-node/blob"
15+
"github.com/celestiaorg/celestia-node/fibre"
1316
"github.com/celestiaorg/celestia-node/nodebuilder/p2p"
1417
stateapi "github.com/celestiaorg/celestia-node/nodebuilder/state"
1518
"github.com/celestiaorg/celestia-node/state"
@@ -139,8 +142,24 @@ func (c *Client) initTxClient(
139142
}
140143
c.State = core
141144

145+
// setup fibre client if the fibre key is available in the keyring
146+
var fibreSvc *fibre.Service
147+
fibreCfg := appfibre.DefaultClientConfig()
148+
fibreCfg.StateAddress = conn.Target()
149+
fibreClient, err := appfibre.NewClient(kr, fibreCfg)
150+
if err != nil {
151+
log.Warnw("fibre client not available, fibre blob submission disabled", "err", err)
152+
} else {
153+
if err := fibreClient.Start(ctx); err != nil {
154+
log.Warnw("failed to start fibre client, fibre blob submission disabled", "err", err)
155+
} else {
156+
acc := fibre.NewAccountClient(tc, conn)
157+
fibreSvc = fibre.NewService(fibreClient, tc, acc)
158+
}
159+
}
160+
142161
// setup blob submission service using core
143-
blobSvc := blob.NewService(core, nil, nil, nil, nil)
162+
blobSvc := blob.NewService(core, fibreSvc, nil, nil, nil)
144163
err = blobSvc.Start(ctx)
145164
if err != nil {
146165
_ = core.Stop(ctx)
@@ -161,6 +180,11 @@ func (c *Client) initTxClient(
161180
if err != nil {
162181
return fmt.Errorf("failed to stop blob service: %w", err)
163182
}
183+
if fibreClient != nil {
184+
if stopErr := fibreClient.Stop(ctx); stopErr != nil {
185+
log.Warnw("failed to stop fibre client", "err", stopErr)
186+
}
187+
}
164188
err = core.Stop(ctx)
165189
if err != nil {
166190
return fmt.Errorf("failed to stop core accessor: %w", err)

api/client/client_test.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,14 @@ import (
1212
"time"
1313

1414
coregrpc "github.com/cometbft/cometbft/rpc/grpc"
15+
"github.com/cosmos/cosmos-sdk/crypto/hd"
1516
"github.com/cosmos/cosmos-sdk/crypto/keyring"
1617
"github.com/cristalhq/jwt/v5"
1718
gomock "github.com/golang/mock/gomock"
1819
"github.com/stretchr/testify/require"
1920
"go.uber.org/fx"
2021

22+
appfibre "github.com/celestiaorg/celestia-app/v8/fibre"
2123
"github.com/celestiaorg/celestia-app/v8/test/util/testnode"
2224
libshare "github.com/celestiaorg/go-square/v4/share"
2325

@@ -396,6 +398,9 @@ func bridgeNode(t *testing.T, ctx context.Context, cctx testnode.Context) (*node
396398
}
397399
kr, err := KeyringWithNewKey(keysCfg, tempDir)
398400
require.NoError(t, err)
401+
// add the fibre key required by the fibre module
402+
_, _, err = kr.NewMnemonic(appfibre.DefaultKeyName, keyring.English, "", "", hd.Secp256k1)
403+
require.NoError(t, err)
399404

400405
bn, err := nodebuilder.New(node.Bridge, p2p.Private, store,
401406
addAuth,

api/docgen/examples.go

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ import (
3131
"github.com/celestiaorg/celestia-node/das"
3232
"github.com/celestiaorg/celestia-node/fibre"
3333
"github.com/celestiaorg/celestia-node/header"
34+
fibre2 "github.com/celestiaorg/celestia-node/nodebuilder/fibre"
3435
"github.com/celestiaorg/celestia-node/nodebuilder/node"
3536
"github.com/celestiaorg/celestia-node/share"
3637
"github.com/celestiaorg/celestia-node/share/eds/byzantine"
@@ -196,10 +197,10 @@ func init() {
196197
copy(fibreCommitment[:], commitment)
197198
add(fibreCommitment)
198199

199-
exampleUploadResult := fibre.UploadResult{
200+
exampleUploadResult := fibre2.UploadResult{
200201
Commitment: fibreCommitment,
201-
ValidatorSignatures: []fibre.ValidatorSignature{[]byte("validator_signature_bytes")},
202-
PaymentPromise: &fibre.PaymentPromise{
202+
ValidatorSignatures: []fibre2.ValidatorSignature{[]byte("validator_signature_bytes")},
203+
PaymentPromise: &fibre2.PaymentPromise{
203204
ChainID: "celestia",
204205
Namespace: namespace,
205206
BlobSize: 1024,
@@ -212,16 +213,16 @@ func init() {
212213
}
213214
add(&exampleUploadResult)
214215

215-
exampleSubmitResult := fibre.SubmitResult{
216+
exampleSubmitResult := fibre2.SubmitResult{
216217
Commitment: fibreCommitment,
217-
ValidatorSignatures: []fibre.ValidatorSignature{[]byte("validator_signature_bytes")},
218+
ValidatorSignatures: []fibre2.ValidatorSignature{[]byte("validator_signature_bytes")},
218219
Height: 42,
219220
TxHash: "A5CF62609391B17E0340A6E07BD15860AFA4BE7F5DAF28F2E22A1C3B0CE85E64",
220221
PaymentPromise: exampleUploadResult.PaymentPromise,
221222
}
222223
add(&exampleSubmitResult)
223224

224-
add(&fibre.GetBlobResponse{
225+
add(&fibre2.GetBlobResult{
225226
Data: []byte("fibre blob data"),
226227
})
227228

@@ -238,7 +239,7 @@ func init() {
238239
AvailableTimestamp: time.Date(2025, 1, 8, 0, 0, 0, 0, time.UTC),
239240
})
240241

241-
add(fibre.PaymentPromise{
242+
add(fibre2.PaymentPromise{
242243
ChainID: "celestia",
243244
Namespace: namespace,
244245
BlobSize: 1024,

blob/service.go

Lines changed: 40 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,14 @@ import (
1717
"go.opentelemetry.io/otel/codes"
1818
"go.opentelemetry.io/otel/trace"
1919

20+
appfibre "github.com/celestiaorg/celestia-app/v8/fibre"
2021
"github.com/celestiaorg/celestia-app/v8/pkg/appconsts"
2122
pkgproof "github.com/celestiaorg/celestia-app/v8/pkg/proof"
2223
"github.com/celestiaorg/go-square/v4/inclusion"
2324
libshare "github.com/celestiaorg/go-square/v4/share"
2425
"github.com/celestiaorg/nmt"
2526
"github.com/celestiaorg/rsmt2d"
2627

27-
"github.com/celestiaorg/celestia-node/fibre"
2828
"github.com/celestiaorg/celestia-node/header"
2929
"github.com/celestiaorg/celestia-node/libs/utils"
3030
"github.com/celestiaorg/celestia-node/share"
@@ -34,8 +34,9 @@ import (
3434
)
3535

3636
var (
37-
ErrBlobNotFound = errors.New("blob: not found")
38-
ErrInvalidProof = errors.New("blob: invalid proof")
37+
ErrBlobNotFound = errors.New("blob: not found")
38+
ErrInvalidProof = errors.New("blob: invalid proof")
39+
ErrMixedBlobTypes = errors.New("blob: cannot mix fibre (v2) and regular blobs in a single submit")
3940

4041
log = logging.Logger("blob")
4142
tracer = otel.Tracer("blob/service")
@@ -54,7 +55,16 @@ type Submitter interface {
5455

5556
// FibreSubmitter is an interface for submitting blobs to the Fibre network.
5657
type FibreSubmitter interface {
57-
Submit(ctx context.Context, ns libshare.Namespace, data []byte, config *txclient.TxConfig) (*fibre.SubmitResult, error)
58+
Submit(
59+
ctx context.Context,
60+
ns libshare.Namespace,
61+
data []byte,
62+
config *txclient.TxConfig,
63+
) (
64+
_ *appfibre.PutResult,
65+
_ *appfibre.PaymentPromise,
66+
err error,
67+
)
5868
}
5969

6070
type Service struct {
@@ -187,10 +197,14 @@ func (s *Service) Subscribe(ctx context.Context, ns libshare.Namespace) (<-chan
187197
return blobCh, nil
188198
}
189199

190-
// Submit sends PFB transaction and reports the height at which it was included.
200+
// Submit sends PFB/PFF transaction and reports the height at which it was included.
191201
// Allows sending multiple Blobs atomically synchronously.
192202
// Uses default wallet registered on the Node.
193203
// Handles gas estimation and fee calculation.
204+
//
205+
// If all blobs are fibre blobs (share version 2), they are submitted via the fibre network.
206+
// If all blobs are regular blobs, they are submitted via the standard PFB path.
207+
// Mixing fibre and regular blobs in a single submit is not allowed.
194208
func (s *Service) Submit(ctx context.Context, blobs []*Blob, txConfig *SubmitOptions) (_ uint64, err error) {
195209
ctx, span := tracer.Start(ctx, "blob/submit")
196210
defer func() {
@@ -202,6 +216,17 @@ func (s *Service) Submit(ctx context.Context, blobs []*Blob, txConfig *SubmitOpt
202216
}
203217
}()
204218

219+
allFibre := slices.IndexFunc(blobs, func(b *Blob) bool { return !b.IsFibreBlob() }) == -1
220+
anyFibre := slices.IndexFunc(blobs, func(b *Blob) bool { return b.IsFibreBlob() }) != -1
221+
222+
if anyFibre && !allFibre {
223+
return 0, ErrMixedBlobTypes
224+
}
225+
226+
if allFibre {
227+
return s.submitFibreBlobs(ctx, blobs, txConfig)
228+
}
229+
205230
libBlobs := make([]*libshare.Blob, len(blobs))
206231
for i := range blobs {
207232
libBlobs[i] = blobs[i].Blob
@@ -213,39 +238,20 @@ func (s *Service) Submit(ctx context.Context, blobs []*Blob, txConfig *SubmitOpt
213238
return uint64(resp.Height), nil
214239
}
215240

216-
// SubmitFibreBlob submits a blob via the Fibre network.
217-
// It performs the full Fibre flow: upload to FSPs, aggregate validator signatures,
218-
// and submit MsgPayForFibre on-chain.
219-
func (s *Service) SubmitFibreBlob(
220-
ctx context.Context,
221-
ns libshare.Namespace,
222-
data []byte,
223-
txConfig *SubmitOptions,
224-
) (_ *fibre.SubmitResult, err error) {
225-
ctx, span := tracer.Start(ctx, "blob/submit-fibre")
226-
defer func() {
227-
utils.SetStatusAndEnd(span, err)
228-
if err != nil {
229-
log.Errorw("submitting fibre blob failed", "err", err,
230-
"namespace", ns.String(),
231-
)
232-
}
233-
}()
234-
241+
func (s *Service) submitFibreBlobs(ctx context.Context, blobs []*Blob, txConfig *SubmitOptions) (uint64, error) {
235242
if s.fibreSubmitter == nil {
236-
err = fmt.Errorf("fibre submitter is not configured: node is not connected to a core endpoint")
237-
return nil, err
243+
return 0, fmt.Errorf("fibre submitter is not available")
238244
}
239245

240-
log.Infow("submitting fibre blob", "namespace", ns.String(), "data-size", len(data))
241-
result, err := s.fibreSubmitter.Submit(ctx, ns, data, txConfig)
242-
if err != nil {
243-
err = fmt.Errorf("fibre submit: %w", err)
244-
return nil, err
246+
var height uint64
247+
for _, blob := range blobs {
248+
res, _, err := s.fibreSubmitter.Submit(ctx, blob.Namespace(), blob.Data(), txConfig)
249+
if err != nil {
250+
return 0, err
251+
}
252+
height = res.Height
245253
}
246-
247-
log.Debugw("fibre blob submitted", "namespace", ns.String(), "height", result.Height, "tx-hash", result.TxHash)
248-
return result, nil
254+
return height, nil
249255
}
250256

251257
// Get retrieves a blob in a given namespace at the given height by commitment.

0 commit comments

Comments
 (0)