Skip to content

Commit 223f57e

Browse files
committed
move exchange to a single place
1 parent 4930acf commit 223f57e

5 files changed

Lines changed: 494 additions & 32 deletions

File tree

.mockery.yaml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,13 @@ packages:
5151
dir: ./test/mocks
5252
pkgname: mocks
5353
filename: external/hstore.go
54+
github.com/evstack/ev-node/pkg/sync:
55+
interfaces:
56+
P2PExchange:
57+
config:
58+
dir: ./test/mocks
59+
pkgname: mocks
60+
filename: external/p2pexchange.go
5461
github.com/evstack/ev-node/block/internal/syncing:
5562
interfaces:
5663
DARetriever:

pkg/sync/exchange_wrapper.go

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,15 @@ import (
1010
type storeGetter[H header.Header[H]] func(context.Context, store.Store, header.Hash) (H, error)
1111
type storeGetterByHeight[H header.Header[H]] func(context.Context, store.Store, uint64) (H, error)
1212

13-
type exchangeWrapper[H header.Header[H]] struct {
13+
// P2PExchange defines the interface for the underlying P2P exchange.
14+
type P2PExchange[H header.Header[H]] interface {
1415
header.Exchange[H]
16+
Start(context.Context) error
17+
Stop(context.Context) error
18+
}
19+
20+
type exchangeWrapper[H header.Header[H]] struct {
21+
p2pExchange P2PExchange[H]
1522
daStore store.Store
1623
getter storeGetter[H]
1724
getterByHeight storeGetterByHeight[H]
@@ -26,7 +33,7 @@ func (ew *exchangeWrapper[H]) Get(ctx context.Context, hash header.Hash) (H, err
2633
}
2734

2835
// Fallback to network exchange
29-
return ew.Exchange.Get(ctx, hash)
36+
return ew.p2pExchange.Get(ctx, hash)
3037
}
3138

3239
func (ew *exchangeWrapper[H]) GetByHeight(ctx context.Context, height uint64) (H, error) {
@@ -38,5 +45,21 @@ func (ew *exchangeWrapper[H]) GetByHeight(ctx context.Context, height uint64) (H
3845
}
3946

4047
// Fallback to network exchange
41-
return ew.Exchange.GetByHeight(ctx, height)
48+
return ew.p2pExchange.GetByHeight(ctx, height)
49+
}
50+
51+
func (ew *exchangeWrapper[H]) Head(ctx context.Context, opts ...header.HeadOption[H]) (H, error) {
52+
return ew.p2pExchange.Head(ctx, opts...)
53+
}
54+
55+
func (ew *exchangeWrapper[H]) GetRangeByHeight(ctx context.Context, from H, to uint64) ([]H, error) {
56+
return ew.p2pExchange.GetRangeByHeight(ctx, from, to)
57+
}
58+
59+
func (ew *exchangeWrapper[H]) Start(ctx context.Context) error {
60+
return ew.p2pExchange.Start(ctx)
61+
}
62+
63+
func (ew *exchangeWrapper[H]) Stop(ctx context.Context) error {
64+
return ew.p2pExchange.Stop(ctx)
4265
}

pkg/sync/exchange_wrapper_test.go

Lines changed: 19 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -19,17 +19,17 @@ func TestExchangeWrapper_Get(t *testing.T) {
1919
expectedHeader := &types.SignedHeader{} // Just a dummy
2020

2121
t.Run("Hit in Store", func(t *testing.T) {
22-
mockEx := extmocks.NewMockExchange[*types.SignedHeader](t)
22+
mockEx := extmocks.NewMockP2PExchange[*types.SignedHeader](t)
2323
// Exchange should NOT be called
2424

2525
getter := func(ctx context.Context, s store.Store, h header.Hash) (*types.SignedHeader, error) {
2626
return expectedHeader, nil
2727
}
2828

2929
ew := &exchangeWrapper[*types.SignedHeader]{
30-
Exchange: mockEx,
31-
daStore: mocks.NewMockStore(t),
32-
getter: getter,
30+
p2pExchange: mockEx,
31+
daStore: mocks.NewMockStore(t),
32+
getter: getter,
3333
}
3434

3535
h, err := ew.Get(ctx, hash)
@@ -38,17 +38,17 @@ func TestExchangeWrapper_Get(t *testing.T) {
3838
})
3939

4040
t.Run("Miss in Store", func(t *testing.T) {
41-
mockEx := extmocks.NewMockExchange[*types.SignedHeader](t)
42-
mockEx.On("Get", ctx, hash).Return(expectedHeader, nil)
41+
mockEx := extmocks.NewMockP2PExchange[*types.SignedHeader](t)
42+
mockEx.EXPECT().Get(ctx, hash).Return(expectedHeader, nil)
4343

4444
getter := func(ctx context.Context, s store.Store, h header.Hash) (*types.SignedHeader, error) {
4545
return nil, errors.New("not found")
4646
}
4747

4848
ew := &exchangeWrapper[*types.SignedHeader]{
49-
Exchange: mockEx,
50-
daStore: mocks.NewMockStore(t),
51-
getter: getter,
49+
p2pExchange: mockEx,
50+
daStore: mocks.NewMockStore(t),
51+
getter: getter,
5252
}
5353

5454
h, err := ew.Get(ctx, hash)
@@ -57,13 +57,13 @@ func TestExchangeWrapper_Get(t *testing.T) {
5757
})
5858

5959
t.Run("Store Not Configured", func(t *testing.T) {
60-
mockEx := extmocks.NewMockExchange[*types.SignedHeader](t)
61-
mockEx.On("Get", ctx, hash).Return(expectedHeader, nil)
60+
mockEx := extmocks.NewMockP2PExchange[*types.SignedHeader](t)
61+
mockEx.EXPECT().Get(ctx, hash).Return(expectedHeader, nil)
6262

6363
ew := &exchangeWrapper[*types.SignedHeader]{
64-
Exchange: mockEx,
65-
daStore: nil, // No store
66-
getter: nil,
64+
p2pExchange: mockEx,
65+
daStore: nil, // No store
66+
getter: nil,
6767
}
6868

6969
h, err := ew.Get(ctx, hash)
@@ -78,14 +78,14 @@ func TestExchangeWrapper_GetByHeight(t *testing.T) {
7878
expectedHeader := &types.SignedHeader{}
7979

8080
t.Run("Hit in Store", func(t *testing.T) {
81-
mockEx := extmocks.NewMockExchange[*types.SignedHeader](t)
81+
mockEx := extmocks.NewMockP2PExchange[*types.SignedHeader](t)
8282

8383
getterByHeight := func(ctx context.Context, s store.Store, h uint64) (*types.SignedHeader, error) {
8484
return expectedHeader, nil
8585
}
8686

8787
ew := &exchangeWrapper[*types.SignedHeader]{
88-
Exchange: mockEx,
88+
p2pExchange: mockEx,
8989
daStore: mocks.NewMockStore(t),
9090
getterByHeight: getterByHeight,
9191
}
@@ -96,15 +96,15 @@ func TestExchangeWrapper_GetByHeight(t *testing.T) {
9696
})
9797

9898
t.Run("Miss in Store", func(t *testing.T) {
99-
mockEx := extmocks.NewMockExchange[*types.SignedHeader](t)
100-
mockEx.On("GetByHeight", ctx, height).Return(expectedHeader, nil)
99+
mockEx := extmocks.NewMockP2PExchange[*types.SignedHeader](t)
100+
mockEx.EXPECT().GetByHeight(ctx, height).Return(expectedHeader, nil)
101101

102102
getterByHeight := func(ctx context.Context, s store.Store, h uint64) (*types.SignedHeader, error) {
103103
return nil, errors.New("not found")
104104
}
105105

106106
ew := &exchangeWrapper[*types.SignedHeader]{
107-
Exchange: mockEx,
107+
p2pExchange: mockEx,
108108
daStore: mocks.NewMockStore(t),
109109
getterByHeight: getterByHeight,
110110
}

pkg/sync/sync_service.go

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -49,9 +49,8 @@ type SyncService[H header.Header[H]] struct {
4949

5050
p2p *p2p.Client
5151

52-
ex header.Exchange[H]
53-
p2pExchange *goheaderp2p.Exchange[H]
54-
sub *goheaderp2p.Subscriber[H]
52+
ex *exchangeWrapper[H]
53+
sub *goheaderp2p.Subscriber[H]
5554
p2pServer *goheaderp2p.ExchangeServer[H]
5655
store *goheaderstore.Store[H]
5756
daStore store.Store
@@ -312,20 +311,21 @@ func (syncService *SyncService[H]) setupP2PInfrastructure(ctx context.Context) (
312311

313312
peerIDs := syncService.getPeerIDs()
314313

315-
if syncService.p2pExchange, err = newP2PExchange[H](syncService.p2p.Host(), peerIDs, networkID, syncService.genesis.ChainID, syncService.p2p.ConnectionGater()); err != nil {
314+
p2pExchange, err := newP2PExchange[H](syncService.p2p.Host(), peerIDs, networkID, syncService.genesis.ChainID, syncService.p2p.ConnectionGater())
315+
if err != nil {
316316
return nil, fmt.Errorf("error while creating exchange: %w", err)
317317
}
318-
if err := syncService.p2pExchange.Start(ctx); err != nil {
319-
return nil, fmt.Errorf("error while starting exchange: %w", err)
320-
}
321318

322-
// Wrap the exchange with the DA store check
319+
// Create exchange wrapper with DA store check
323320
syncService.ex = &exchangeWrapper[H]{
324-
Exchange: syncService.p2pExchange,
321+
p2pExchange: p2pExchange,
325322
daStore: syncService.daStore,
326323
getter: syncService.getter,
327324
getterByHeight: syncService.getterByHeight,
328325
}
326+
if err := syncService.ex.Start(ctx); err != nil {
327+
return nil, fmt.Errorf("error while starting exchange: %w", err)
328+
}
329329

330330
return peerIDs, nil
331331
}
@@ -425,7 +425,7 @@ func (syncService *SyncService[H]) Stop(ctx context.Context) error {
425425
syncService.topicSubscription.Cancel()
426426
err := errors.Join(
427427
syncService.p2pServer.Stop(ctx),
428-
syncService.p2pExchange.Stop(ctx),
428+
syncService.ex.Stop(ctx),
429429
syncService.sub.Stop(ctx),
430430
)
431431
if syncService.syncerStatus.isStarted() {

0 commit comments

Comments
 (0)