Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion pkg/manager/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,9 @@ type Config struct {
DataLayerType string
// DataLayerDBPath is the path for the database. For "versioned_leveldb", this is a base directory.
DataLayerDBPath string
// DataLayerNumOfVersions specifies how many historical versions to keep. Only used by "versioned_leveldb".
// DataLayerNumOfVersions specifies how many historical versions to keep per application.
// Each app's version history is pruned independently when it exceeds this limit.
// Only used by "versioned_leveldb".
DataLayerNumOfVersions int

// DeanonymizationReportPath is the path to a folder where to store deanonymization reports.
Expand Down
45 changes: 25 additions & 20 deletions pkg/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -525,7 +525,7 @@ func (m *SecureProcessorManager) processRequestFromChain(ctx context.Context) er
return nil
}

localStateRoot, err := m.dataLayer.LastVersionID()
localStateRoot, err := m.dataLayer.LastVersionID(admittedAppID)
if err != nil {
if dbErr, ok := err.(*storageErrors.Error); ok && dbErr.Code == storageErrors.NoVersionInDb {
localStateRoot = make([]byte, 32) // Initialize to zero state root if no version exists
Expand Down Expand Up @@ -555,7 +555,7 @@ func (m *SecureProcessorManager) processRequestFromChain(ctx context.Context) er
return nil
}
m.log.Info("Manager: REORG not solved within timeout => Rollback the DB")
if err := m.dataLayer.Rollback(stateRoot[:]); err != nil {
if err := m.dataLayer.Rollback(admittedAppID, stateRoot[:]); err != nil {
m.log.Error("Manager: Error while rolling back the DB: %v", err)
return fmt.Errorf("fatal: rollback failed: %w", err)
}
Expand Down Expand Up @@ -598,7 +598,7 @@ func (m *SecureProcessorManager) checkIfReorg(stateRoot [32]byte) (bool, error)
return true, nil
}

oldVersions, err := m.dataLayer.ListVersions()
oldVersions, err := m.dataLayer.ListVersions(admittedAppID)
if err != nil {
m.log.Error("Manager: Failed to get db old versions: %v", err)
return false, err
Expand Down Expand Up @@ -635,7 +635,7 @@ func (m *SecureProcessorManager) submitStateOnChain(ctx context.Context, updateP
m.log.Error("Failed to submit state update for error: %v", err)
if updatePayload.ErrorCode == 0 {
m.log.Info("Rollback the application state to previous version")
if err := m.dataLayer.Rollback(updatePayload.PrevStateRoot[:]); err != nil {
if err := m.dataLayer.Rollback(admittedAppID, updatePayload.PrevStateRoot[:]); err != nil {
// If this happens, the local db and the chain are out of sync and cannot be recovered automatically.
// Log and return err to let REORG detection handle it on the next poll.
m.log.Error("Failed to rollback application state: %v. Will retry via REORG detection.", err)
Expand Down Expand Up @@ -707,13 +707,7 @@ func (m *SecureProcessorManager) processDeployApp(ctx context.Context, req *comm
}

// Store the application state and WASM bytecode
versionID := appState.StateRoot[:]
err = m.dataLayer.Store(
ctx,
versionID[:],
[]*common.ApplicationState{appState},
[]*common.WASMData{{ApplicationID: appState.ApplicationID, Bytecode: wasmModule}},
)
err = m.initAppStorage(ctx, appState, wasmModule)
if err != nil {
m.log.Error("failed to submit state update: %v", err)
return err
Expand Down Expand Up @@ -780,15 +774,7 @@ func (m *SecureProcessorManager) processProcessRequest(ctx context.Context, req
}

// Store the updated application state
versionID := updatedState.StateRoot[:]
m.log.Info("VersionID %x", versionID[:])

err = m.dataLayer.Store(
ctx,
versionID[:],
[]*common.ApplicationState{updatedState},
nil,
)
err = m.storeStateToStorage(ctx, updatedState)
if err != nil {
m.log.Error("failed to submit state update: %v", err)
return err
Expand All @@ -797,6 +783,25 @@ func (m *SecureProcessorManager) processProcessRequest(ctx context.Context, req
return m.submitStateOnChain(ctx, updatePayload)
}

// initAppStorage stores the application state and WASM bytecode for a newly deployed application.
func (m *SecureProcessorManager) initAppStorage(ctx context.Context, state *common.ApplicationState, wasmModule []byte) error {
versionID := state.StateRoot[:]
m.log.Info("Storing app state and WASM, versionID %x", versionID)
return m.dataLayer.StoreWithWasm(
ctx,
versionID,
state,
&common.WASMData{ApplicationID: state.ApplicationID, Bytecode: wasmModule},
)
}

// storeStateToStorage stores the updated application state after processing a request.
func (m *SecureProcessorManager) storeStateToStorage(ctx context.Context, state *common.ApplicationState) error {
versionID := state.StateRoot[:]
m.log.Info("Storing app state, versionID %x", versionID)
return m.dataLayer.Store(ctx, versionID, state)
}

// saveDeanonymizationReport saves a deanonymization report to the filesystem
func (m *SecureProcessorManager) saveDeanonymizationReport(report *common.DeanonymizationReport, req *common.Request) error {
if err := os.MkdirAll(m.config.DeanonymizationReportPath, 0755); err != nil {
Expand Down
32 changes: 16 additions & 16 deletions pkg/manager/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -566,7 +566,7 @@ func TestProcessDeployAppWithFailure(t *testing.T) {
require.NoError(t, err)

// Test that if it is a failure payload returned by the executor, submitStateOnChain is called but the state is not stored in the data layer
manager.dataLayer.(*mockdb.MockDataLayer).AddMockedFunc("Store", func(context.Context, []byte, []*common.ApplicationState, []*common.WASMData) error {
manager.dataLayer.(*mockdb.MockDataLayer).AddMockedFunc("Store", func(context.Context, []byte, *common.ApplicationState, *common.WASMData) error {
t.Fatal("Store should not be called if the executor returned a failure payload")
return nil
})
Expand Down Expand Up @@ -613,7 +613,7 @@ func TestProcessDeployAppWithErrors(t *testing.T) {

// Test data layer failure. In this case, it shouldn't call stateUpdate on chain and it returns the error
expectedError = "failed to store state"
manager.dataLayer.(*mockdb.MockDataLayer).AddMockedFunc("Store", func(context.Context, []byte, []*common.ApplicationState, []*common.WASMData) error {
manager.dataLayer.(*mockdb.MockDataLayer).AddMockedFunc("Store", func(context.Context, []byte, *common.ApplicationState, *common.WASMData) error {
return fmt.Errorf("%s", expectedError)
})

Expand All @@ -640,7 +640,7 @@ func TestProcessDeployAppWithErrors(t *testing.T) {
failure = manager.processDeployApp(context.Background(), request)
require.Nil(t, failure)
// Check that the local db has been reverted to the initial state
_, err = manager.dataLayer.LastVersionID()
_, err = manager.dataLayer.LastVersionID(admittedAppID)
require.Error(t, err)
dbErr, ok := err.(*storageErrors.Error)
require.True(t, ok && dbErr.Code == storageErrors.NoVersionInDb)
Expand All @@ -659,7 +659,7 @@ func TestProcessDeployAppWithErrors(t *testing.T) {
require.Error(t, err)
require.ErrorContains(t, err, expectedError)
// Check that the local db has been reverted to the initial state
_, err = manager.dataLayer.LastVersionID()
_, err = manager.dataLayer.LastVersionID(admittedAppID)
require.Error(t, err)
dbErr, ok = err.(*storageErrors.Error)
require.True(t, ok && dbErr.Code == storageErrors.NoVersionInDb)
Expand Down Expand Up @@ -881,7 +881,7 @@ func TestProcessProcessRequestWithFailure(t *testing.T) {
require.NoError(t, err)

// Test that if it is a failure payload returned by the executor, submitStateOnChain is called but the state is not stored in the data layer
manager.dataLayer.(*mockdb.MockDataLayer).AddMockedFunc("Store", func(context.Context, []byte, []*common.ApplicationState, []*common.WASMData) error {
manager.dataLayer.(*mockdb.MockDataLayer).AddMockedFunc("Store", func(context.Context, []byte, *common.ApplicationState, *common.WASMData) error {
t.Fatal("Store should not be called if the executor returned a failure payload")
return nil
})
Expand Down Expand Up @@ -920,7 +920,7 @@ func TestProcessProcessRequestWithErrors(t *testing.T) {
completedRequests := mockBCClient.GetCompletedRequests()
require.Equal(t, 1, len(completedRequests), "expected 1 completed request")

oldDbVersion, err := manager.dataLayer.LastVersionID()
oldDbVersion, err := manager.dataLayer.LastVersionID(admittedAppID)
require.NoError(t, err)

// Simulate application state not found. In this case, it should call SendProcessRequest and return a failure payload, then submitStateOnChain is called but the state is not stored in the data layer
Expand Down Expand Up @@ -992,7 +992,7 @@ func TestProcessProcessRequestWithErrors(t *testing.T) {

// Test data layer failure, stop processing and return the error
expectedError = "failed to store state"
manager.dataLayer.(*mockdb.MockDataLayer).AddMockedFunc("Store", func(context.Context, []byte, []*common.ApplicationState, []*common.WASMData) error {
manager.dataLayer.(*mockdb.MockDataLayer).AddMockedFunc("Store", func(context.Context, []byte, *common.ApplicationState, *common.WASMData) error {
return errors.New(expectedError)
})

Expand All @@ -1019,7 +1019,7 @@ func TestProcessProcessRequestWithErrors(t *testing.T) {
failure = manager.processProcessRequest(context.Background(), request)
require.Nil(t, failure)
// Check that the local db has been reverted to the initial state
newDbVersion, err := manager.dataLayer.LastVersionID()
newDbVersion, err := manager.dataLayer.LastVersionID(admittedAppID)
require.NoError(t, err)
require.Equal(t, oldDbVersion, newDbVersion)

Expand All @@ -1038,7 +1038,7 @@ func TestProcessProcessRequestWithErrors(t *testing.T) {
require.Contains(t, failure.Error(), expectedError)

// Check that the local db has been reverted to the initial state
newDbVersion, err = manager.dataLayer.LastVersionID()
newDbVersion, err = manager.dataLayer.LastVersionID(admittedAppID)
require.NoError(t, err)
require.Equal(t, oldDbVersion, newDbVersion)

Expand All @@ -1059,7 +1059,7 @@ func TestProcessProcessRequestWithErrors(t *testing.T) {
require.Contains(t, failure.Error(), expectedError)

// Check that the local db has been reverted to the initial state
newDbVersion, err = manager.dataLayer.LastVersionID()
newDbVersion, err = manager.dataLayer.LastVersionID(admittedAppID)
require.NoError(t, err)
require.Equal(t, oldDbVersion, newDbVersion)

Expand Down Expand Up @@ -1134,7 +1134,7 @@ func TestProcessRequestFromChainWithReorgs(t *testing.T) {
completedRequests = mockBCClient.GetCompletedRequests()
require.Equal(t, 1, len(completedRequests), "expected 1 completed request")

db_version, err := manager.dataLayer.LastVersionID()
db_version, err := manager.dataLayer.LastVersionID(admittedAppID)
require.NoError(t, err)

nextPendingReq, stateRootOnChain1, err := mockBCClient.GetNextPendingRequest(context.Background())
Expand All @@ -1150,7 +1150,7 @@ func TestProcessRequestFromChainWithReorgs(t *testing.T) {
completedRequests = mockBCClient.GetCompletedRequests()
require.Equal(t, 2, len(completedRequests), "expected 2 completed requests")

db_version, err = manager.dataLayer.LastVersionID()
db_version, err = manager.dataLayer.LastVersionID(admittedAppID)
require.NoError(t, err)

nextPendingReq, stateRootOnChain2, err := mockBCClient.GetNextPendingRequest(context.Background())
Expand Down Expand Up @@ -1200,7 +1200,7 @@ func TestProcessRequestFromChainWithReorgs(t *testing.T) {
completedRequests = mockBCClient.GetCompletedRequests()
require.Equal(t, 3, len(completedRequests), "expected 3 completed requests")

db_version, err = manager.dataLayer.LastVersionID()
db_version, err = manager.dataLayer.LastVersionID(admittedAppID)
require.NoError(t, err)

_, stateRootOnChain3, err := mockBCClient.GetNextPendingRequest(context.Background())
Expand Down Expand Up @@ -1246,7 +1246,7 @@ func TestProcessRequestFromChainWithReorgs(t *testing.T) {
completedRequests = mockBCClient.GetCompletedRequests()
require.Equal(t, 1, len(completedRequests), "expected 1 completed request")

db_version, err = manager.dataLayer.LastVersionID()
db_version, err = manager.dataLayer.LastVersionID(admittedAppID)
require.NoError(t, err)

_, stateRootOnChain, err := mockBCClient.GetNextPendingRequest(context.Background())
Expand Down Expand Up @@ -1293,7 +1293,7 @@ func TestProcessRequestFromChainWithErrors(t *testing.T) {
// Check that if LastVersionID returns an error, processRequestFromChain doesn't execute the request and doesn't return an error
//**********************

manager.dataLayer.(*mockdb.MockDataLayer).AddMockedFunc("LastVersionID", func() ([]byte, error) {
manager.dataLayer.(*mockdb.MockDataLayer).AddMockedFunc("LastVersionID", func(common.ApplicationIdType) ([]byte, error) {
return nil, fmt.Errorf("LastVersionID error")
})

Expand All @@ -1305,7 +1305,7 @@ func TestProcessRequestFromChainWithErrors(t *testing.T) {
//**********************
// Check that if ListVersions returns an error, processRequestFromChain doesn't execute the request and doesn't return an error
//**********************
manager.dataLayer.(*mockdb.MockDataLayer).AddMockedFunc("ListVersions", func() ([][]byte, error) {
manager.dataLayer.(*mockdb.MockDataLayer).AddMockedFunc("ListVersions", func(common.ApplicationIdType) ([][]byte, error) {
return nil, fmt.Errorf("ListVersions error")
})

Expand Down
5 changes: 4 additions & 1 deletion pkg/storage/factory/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
type DataLayerConfig struct {
Type string
DBPath string
NumVersions int
NumVersions int // Maximum number of historical versions to keep per application.
}

const (
Expand All @@ -30,6 +30,9 @@ func NewDataLayer(cfg DataLayerConfig) (storage.DataLayer, error) {
if strings.TrimSpace(cfg.DBPath) == "" {
return nil, fmt.Errorf("data layer path is empty")
}
if cfg.NumVersions <= 0 {
return nil, fmt.Errorf("NumVersions must be positive, got %d", cfg.NumVersions)
}
levelCfg := versionedDb.VersionedLevelDBConfig{
DBPath: cfg.DBPath,
VersionsToKeep: cfg.NumVersions,
Expand Down
34 changes: 24 additions & 10 deletions pkg/storage/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,24 +12,38 @@ import (
// ApplicationStateStore defines the interface for managing versioned application state,
// including WebAssembly (WASM) bytecode and application data. It supports atomic
// storage of application data, rollbacks, and retrieval of specific versions.
// Versioning is per-application: each application maintains its own independent
// version chain, enabling rollback for one app without affecting others.
type ApplicationStateStore interface {
// Store atomically saves the application state and WASM bytecode for a given version.
// This allows for reliable versioning and the ability to revert to a previous state if needed.
// Store saves the application state for a given version.
// The version is filed under the application that produced the data.
// state must not be nil.
Store(
ctx context.Context,
versionID []byte,
stateArray []*common.ApplicationState,
wasmArray []*common.WASMData,
state *common.ApplicationState,
) error

// Rollback reverts the application state to the specified versionID.
Rollback(versionID []byte) error
// StoreWithWasm atomically saves the application state and WASM bytecode for a given version.
// The version is filed under the application that produced the data.
// Both state and wasm must not be nil and must share the same ApplicationID.
StoreWithWasm(
ctx context.Context,
versionID []byte,
state *common.ApplicationState,
wasm *common.WASMData,
) error

// Rollback reverts the application state for the given app to the specified versionID.
// Only ChangeSets belonging to this app are reverted.
Rollback(appID common.ApplicationIdType, versionID []byte) error

// LastVersionID returns the most recent version ID stored in the database.
LastVersionID() ([]byte, error)
// LastVersionID returns the most recent version ID for the given application.
LastVersionID(appID common.ApplicationIdType) ([]byte, error)

// ListVersions returns a list of all stored version IDs.
ListVersions() ([][]byte, error)
// ListVersions returns a list of all stored version IDs for the given application,
// ordered from most recent to oldest (LIFO).
ListVersions(appID common.ApplicationIdType) ([][]byte, error)

// GetApplicationState retrieves the state of a specific application by its ID.
GetApplicationState(ctx context.Context, applicationID common.ApplicationIdType) (*common.ApplicationState, error)
Expand Down
Loading
Loading