Skip to content

Commit 34aedd2

Browse files
committed
feat: refactor Celestia blob client to use new datypes package for result types and error handling
1 parent 389804c commit 34aedd2

File tree

10 files changed

+671
-217
lines changed

10 files changed

+671
-217
lines changed

block/internal/da/celestia_client.go

Lines changed: 63 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -13,17 +13,7 @@ import (
1313

1414
celestia "github.com/evstack/ev-node/da/celestia"
1515
"github.com/evstack/ev-node/pkg/blob"
16-
)
17-
18-
var (
19-
ErrBlobNotFound = errors.New("blob: not found")
20-
ErrBlobSizeOverLimit = errors.New("blob: over size limit")
21-
ErrTxTimedOut = errors.New("timed out waiting for tx to be included in a block")
22-
ErrTxAlreadyInMempool = errors.New("tx already in mempool")
23-
ErrTxIncorrectAccountSequence = errors.New("incorrect account sequence")
24-
ErrContextDeadline = errors.New("context deadline")
25-
ErrHeightFromFuture = errors.New("given height is from the future")
26-
ErrContextCanceled = errors.New("context canceled")
16+
datypes "github.com/evstack/ev-node/pkg/da/types"
2717
)
2818

2919
// CelestiaBlobConfig contains configuration for the Celestia blob client.
@@ -69,7 +59,7 @@ func NewCelestiaBlob(cfg CelestiaBlobConfig) *CelestiaBlobClient {
6959
}
7060

7161
// Submit submits blobs to the DA layer with the specified options.
72-
func (c *CelestiaBlobClient) Submit(ctx context.Context, data [][]byte, namespace []byte, options []byte) ResultSubmit {
62+
func (c *CelestiaBlobClient) Submit(ctx context.Context, data [][]byte, namespace []byte, options []byte) datypes.ResultSubmit {
7363
// calculate blob size
7464
var blobSize uint64
7565
for _, b := range data {
@@ -78,9 +68,9 @@ func (c *CelestiaBlobClient) Submit(ctx context.Context, data [][]byte, namespac
7868

7969
ns, err := share.NewNamespaceFromBytes(namespace)
8070
if err != nil {
81-
return ResultSubmit{
82-
BaseResult: BaseResult{
83-
Code: StatusError,
71+
return datypes.ResultSubmit{
72+
BaseResult: datypes.BaseResult{
73+
Code: datypes.StatusError,
8474
Message: fmt.Sprintf("invalid namespace: %v", err),
8575
},
8676
}
@@ -89,18 +79,18 @@ func (c *CelestiaBlobClient) Submit(ctx context.Context, data [][]byte, namespac
8979
blobs := make([]*blob.Blob, len(data))
9080
for i, raw := range data {
9181
if uint64(len(raw)) > c.maxBlobSize {
92-
return ResultSubmit{
93-
BaseResult: BaseResult{
94-
Code: StatusTooBig,
95-
Message: ErrBlobSizeOverLimit.Error(),
82+
return datypes.ResultSubmit{
83+
BaseResult: datypes.BaseResult{
84+
Code: datypes.StatusTooBig,
85+
Message: datypes.ErrBlobSizeOverLimit.Error(),
9686
},
9787
}
9888
}
9989
blobs[i], err = blob.NewBlobV0(ns, raw)
10090
if err != nil {
101-
return ResultSubmit{
102-
BaseResult: BaseResult{
103-
Code: StatusError,
91+
return datypes.ResultSubmit{
92+
BaseResult: datypes.BaseResult{
93+
Code: datypes.StatusError,
10494
Message: fmt.Sprintf("failed to build blob %d: %v", i, err),
10595
},
10696
}
@@ -110,9 +100,9 @@ func (c *CelestiaBlobClient) Submit(ctx context.Context, data [][]byte, namespac
110100
var submitOpts blob.SubmitOptions
111101
if len(options) > 0 {
112102
if err := json.Unmarshal(options, &submitOpts); err != nil {
113-
return ResultSubmit{
114-
BaseResult: BaseResult{
115-
Code: StatusError,
103+
return datypes.ResultSubmit{
104+
BaseResult: datypes.BaseResult{
105+
Code: datypes.StatusError,
116106
Message: fmt.Sprintf("failed to parse submit options: %v", err),
117107
},
118108
}
@@ -121,28 +111,28 @@ func (c *CelestiaBlobClient) Submit(ctx context.Context, data [][]byte, namespac
121111

122112
height, err := c.blobAPI.Submit(ctx, blobs, &submitOpts)
123113
if err != nil {
124-
code := StatusError
114+
code := datypes.StatusError
125115
switch {
126116
case errors.Is(err, context.Canceled):
127-
code = StatusContextCanceled
128-
case strings.Contains(err.Error(), ErrTxTimedOut.Error()):
129-
code = StatusNotIncludedInBlock
130-
case strings.Contains(err.Error(), ErrTxAlreadyInMempool.Error()):
131-
code = StatusAlreadyInMempool
132-
case strings.Contains(err.Error(), ErrTxIncorrectAccountSequence.Error()):
133-
code = StatusIncorrectAccountSequence
134-
case strings.Contains(err.Error(), ErrBlobSizeOverLimit.Error()):
135-
code = StatusTooBig
136-
case strings.Contains(err.Error(), ErrContextDeadline.Error()):
137-
code = StatusContextDeadline
117+
code = datypes.StatusContextCanceled
118+
case strings.Contains(err.Error(), datypes.ErrTxTimedOut.Error()):
119+
code = datypes.StatusNotIncludedInBlock
120+
case strings.Contains(err.Error(), datypes.ErrTxAlreadyInMempool.Error()):
121+
code = datypes.StatusAlreadyInMempool
122+
case strings.Contains(err.Error(), datypes.ErrTxIncorrectAccountSequence.Error()):
123+
code = datypes.StatusIncorrectAccountSequence
124+
case strings.Contains(err.Error(), datypes.ErrBlobSizeOverLimit.Error()):
125+
code = datypes.StatusTooBig
126+
case strings.Contains(err.Error(), datypes.ErrContextDeadline.Error()):
127+
code = datypes.StatusContextDeadline
138128
}
139-
if code == StatusTooBig {
129+
if code == datypes.StatusTooBig {
140130
c.logger.Debug().Err(err).Uint64("status", uint64(code)).Msg("DA submission failed")
141131
} else {
142132
c.logger.Error().Err(err).Uint64("status", uint64(code)).Msg("DA submission failed")
143133
}
144-
return ResultSubmit{
145-
BaseResult: BaseResult{
134+
return datypes.ResultSubmit{
135+
BaseResult: datypes.BaseResult{
146136
Code: code,
147137
Message: "failed to submit blobs: " + err.Error(),
148138
SubmittedCount: 0,
@@ -154,23 +144,23 @@ func (c *CelestiaBlobClient) Submit(ctx context.Context, data [][]byte, namespac
154144
}
155145

156146
if len(blobs) == 0 {
157-
return ResultSubmit{
158-
BaseResult: BaseResult{
159-
Code: StatusSuccess,
147+
return datypes.ResultSubmit{
148+
BaseResult: datypes.BaseResult{
149+
Code: datypes.StatusSuccess,
160150
BlobSize: blobSize,
161151
Height: height,
162152
},
163153
}
164154
}
165155

166-
ids := make([]ID, len(blobs))
156+
ids := make([]datypes.ID, len(blobs))
167157
for i, b := range blobs {
168158
ids[i] = blob.MakeID(height, b.Commitment)
169159
}
170160

171-
return ResultSubmit{
172-
BaseResult: BaseResult{
173-
Code: StatusSuccess,
161+
return datypes.ResultSubmit{
162+
BaseResult: datypes.BaseResult{
163+
Code: datypes.StatusSuccess,
174164
IDs: ids,
175165
SubmittedCount: uint64(len(ids)),
176166
Height: height,
@@ -181,12 +171,12 @@ func (c *CelestiaBlobClient) Submit(ctx context.Context, data [][]byte, namespac
181171
}
182172

183173
// Retrieve retrieves blobs from the DA layer at the specified height and namespace.
184-
func (c *CelestiaBlobClient) Retrieve(ctx context.Context, height uint64, namespace []byte) ResultRetrieve {
174+
func (c *CelestiaBlobClient) Retrieve(ctx context.Context, height uint64, namespace []byte) datypes.ResultRetrieve {
185175
ns, err := share.NewNamespaceFromBytes(namespace)
186176
if err != nil {
187-
return ResultRetrieve{
188-
BaseResult: BaseResult{
189-
Code: StatusError,
177+
return datypes.ResultRetrieve{
178+
BaseResult: datypes.BaseResult{
179+
Code: datypes.StatusError,
190180
Message: fmt.Sprintf("invalid namespace: %v", err),
191181
Height: height,
192182
},
@@ -200,29 +190,29 @@ func (c *CelestiaBlobClient) Retrieve(ctx context.Context, height uint64, namesp
200190
if err != nil {
201191
// Handle known errors by substring because RPC may wrap them.
202192
switch {
203-
case strings.Contains(err.Error(), ErrBlobNotFound.Error()):
204-
return ResultRetrieve{
205-
BaseResult: BaseResult{
206-
Code: StatusNotFound,
207-
Message: ErrBlobNotFound.Error(),
193+
case strings.Contains(err.Error(), datypes.ErrBlobNotFound.Error()):
194+
return datypes.ResultRetrieve{
195+
BaseResult: datypes.BaseResult{
196+
Code: datypes.StatusNotFound,
197+
Message: datypes.ErrBlobNotFound.Error(),
208198
Height: height,
209199
Timestamp: time.Now(),
210200
},
211201
}
212-
case strings.Contains(err.Error(), ErrHeightFromFuture.Error()):
213-
return ResultRetrieve{
214-
BaseResult: BaseResult{
215-
Code: StatusHeightFromFuture,
216-
Message: ErrHeightFromFuture.Error(),
202+
case strings.Contains(err.Error(), datypes.ErrHeightFromFuture.Error()):
203+
return datypes.ResultRetrieve{
204+
BaseResult: datypes.BaseResult{
205+
Code: datypes.StatusHeightFromFuture,
206+
Message: datypes.ErrHeightFromFuture.Error(),
217207
Height: height,
218208
Timestamp: time.Now(),
219209
},
220210
}
221211
default:
222212
c.logger.Error().Uint64("height", height).Err(err).Msg("failed to get blobs")
223-
return ResultRetrieve{
224-
BaseResult: BaseResult{
225-
Code: StatusError,
213+
return datypes.ResultRetrieve{
214+
BaseResult: datypes.BaseResult{
215+
Code: datypes.StatusError,
226216
Message: fmt.Sprintf("failed to get blobs: %s", err.Error()),
227217
Height: height,
228218
Timestamp: time.Now(),
@@ -233,10 +223,10 @@ func (c *CelestiaBlobClient) Retrieve(ctx context.Context, height uint64, namesp
233223

234224
if len(blobs) == 0 {
235225
c.logger.Debug().Uint64("height", height).Msg("No blobs found at height")
236-
return ResultRetrieve{
237-
BaseResult: BaseResult{
238-
Code: StatusNotFound,
239-
Message: ErrBlobNotFound.Error(),
226+
return datypes.ResultRetrieve{
227+
BaseResult: datypes.BaseResult{
228+
Code: datypes.StatusNotFound,
229+
Message: datypes.ErrBlobNotFound.Error(),
240230
Height: height,
241231
Timestamp: time.Now(),
242232
},
@@ -250,9 +240,9 @@ func (c *CelestiaBlobClient) Retrieve(ctx context.Context, height uint64, namesp
250240
ids[i] = blob.MakeID(height, b.Commitment)
251241
}
252242

253-
return ResultRetrieve{
254-
BaseResult: BaseResult{
255-
Code: StatusSuccess,
243+
return datypes.ResultRetrieve{
244+
BaseResult: datypes.BaseResult{
245+
Code: datypes.StatusSuccess,
256246
Height: height,
257247
IDs: ids,
258248
Timestamp: time.Now(),
@@ -262,12 +252,12 @@ func (c *CelestiaBlobClient) Retrieve(ctx context.Context, height uint64, namesp
262252
}
263253

264254
// RetrieveHeaders retrieves blobs from the header namespace at the specified height.
265-
func (c *CelestiaBlobClient) RetrieveHeaders(ctx context.Context, height uint64) ResultRetrieve {
255+
func (c *CelestiaBlobClient) RetrieveHeaders(ctx context.Context, height uint64) datypes.ResultRetrieve {
266256
return c.Retrieve(ctx, height, c.namespaceBz)
267257
}
268258

269259
// RetrieveData retrieves blobs from the data namespace at the specified height.
270-
func (c *CelestiaBlobClient) RetrieveData(ctx context.Context, height uint64) ResultRetrieve {
260+
func (c *CelestiaBlobClient) RetrieveData(ctx context.Context, height uint64) datypes.ResultRetrieve {
271261
return c.Retrieve(ctx, height, c.dataNamespaceBz)
272262
}
273263

block/internal/da/celestia_client_test.go

Lines changed: 15 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313

1414
celestia "github.com/evstack/ev-node/da/celestia"
1515
"github.com/evstack/ev-node/pkg/blob"
16+
datypes "github.com/evstack/ev-node/pkg/da/types"
1617
)
1718

1819
type mockCelestiaBlobAPI struct {
@@ -66,15 +67,15 @@ func TestCelestiaClient_Submit_ErrorMapping(t *testing.T) {
6667
testCases := []struct {
6768
name string
6869
err error
69-
wantStatus StatusCode
70+
wantStatus datypes.StatusCode
7071
}{
71-
{"timeout", ErrTxTimedOut, StatusNotIncludedInBlock},
72-
{"alreadyInMempool", ErrTxAlreadyInMempool, StatusAlreadyInMempool},
73-
{"seq", ErrTxIncorrectAccountSequence, StatusIncorrectAccountSequence},
74-
{"tooBig", ErrBlobSizeOverLimit, StatusTooBig},
75-
{"deadline", ErrContextDeadline, StatusContextDeadline},
76-
{"canceled", context.Canceled, StatusContextCanceled},
77-
{"other", errors.New("boom"), StatusError},
72+
{"timeout", datypes.ErrTxTimedOut, datypes.StatusNotIncludedInBlock},
73+
{"alreadyInMempool", datypes.ErrTxAlreadyInMempool, datypes.StatusAlreadyInMempool},
74+
{"seq", datypes.ErrTxIncorrectAccountSequence, datypes.StatusIncorrectAccountSequence},
75+
{"tooBig", datypes.ErrBlobSizeOverLimit, datypes.StatusTooBig},
76+
{"deadline", datypes.ErrContextDeadline, datypes.StatusContextDeadline},
77+
{"canceled", context.Canceled, datypes.StatusContextCanceled},
78+
{"other", errors.New("boom"), datypes.StatusError},
7879
}
7980

8081
for _, tc := range testCases {
@@ -101,7 +102,7 @@ func TestCelestiaClient_Submit_Success(t *testing.T) {
101102
DataNamespace: "ns",
102103
})
103104
res := cl.Submit(context.Background(), [][]byte{[]byte("data")}, ns, nil)
104-
require.Equal(t, StatusSuccess, res.Code)
105+
require.Equal(t, datypes.StatusSuccess, res.Code)
105106
require.Equal(t, uint64(10), res.Height)
106107
require.Len(t, res.IDs, 1)
107108
}
@@ -115,20 +116,20 @@ func TestCelestiaClient_Submit_InvalidNamespace(t *testing.T) {
115116
DataNamespace: "ns",
116117
})
117118
res := cl.Submit(context.Background(), [][]byte{[]byte("data")}, []byte{0x01, 0x02}, nil)
118-
require.Equal(t, StatusError, res.Code)
119+
require.Equal(t, datypes.StatusError, res.Code)
119120
}
120121

121122
func TestCelestiaClient_Retrieve_NotFound(t *testing.T) {
122123
ns := share.MustNewV0Namespace([]byte("ns")).Bytes()
123-
mockAPI := &mockCelestiaBlobAPI{submitErr: ErrBlobNotFound}
124+
mockAPI := &mockCelestiaBlobAPI{submitErr: datypes.ErrBlobNotFound}
124125
cl := NewCelestiaBlob(CelestiaBlobConfig{
125126
Celestia: makeCelestiaClient(mockAPI),
126127
Logger: zerolog.Nop(),
127128
Namespace: "ns",
128129
DataNamespace: "ns",
129130
})
130131
res := cl.Retrieve(context.Background(), 5, ns)
131-
require.Equal(t, StatusNotFound, res.Code)
132+
require.Equal(t, datypes.StatusNotFound, res.Code)
132133
}
133134

134135
func TestCelestiaClient_Retrieve_Success(t *testing.T) {
@@ -143,7 +144,7 @@ func TestCelestiaClient_Retrieve_Success(t *testing.T) {
143144
DataNamespace: "ns",
144145
})
145146
res := cl.Retrieve(context.Background(), 7, ns)
146-
require.Equal(t, StatusSuccess, res.Code)
147+
require.Equal(t, datypes.StatusSuccess, res.Code)
147148
require.Len(t, res.Data, 1)
148149
require.Len(t, res.IDs, 1)
149150
}
@@ -163,5 +164,5 @@ func TestCelestiaClient_SubmitOptionsMerge(t *testing.T) {
163164
require.NoError(t, err)
164165

165166
res := cl.Submit(context.Background(), [][]byte{[]byte("data")}, ns, raw)
166-
require.Equal(t, StatusSuccess, res.Code)
167+
require.Equal(t, datypes.StatusSuccess, res.Code)
167168
}

0 commit comments

Comments
 (0)