Skip to content

Commit 8840576

Browse files
committed
Add new fields to block and transaction that are introduced w/ pos
1 parent fdf5042 commit 8840576

14 files changed

+389
-27
lines changed

entries/block.go

Lines changed: 52 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
//go:build relic
2+
13
package entries
24

35
import (
@@ -12,14 +14,23 @@ import (
1214
)
1315

1416
type BlockEntry struct {
15-
BlockHash string `pg:",pk,use_zero"`
16-
PrevBlockHash string
17-
TxnMerkleRoot string
18-
Timestamp time.Time
19-
Height uint64
20-
Nonce uint64
21-
ExtraNonce uint64
22-
BadgerKey []byte `pg:",use_zero"`
17+
BlockHash string `pg:",pk,use_zero"`
18+
PrevBlockHash string
19+
TxnMerkleRoot string
20+
Timestamp time.Time
21+
Height uint64
22+
Nonce uint64
23+
ExtraNonce uint64
24+
BlockVersion uint32
25+
TxnConnectStatusByIndexHash string `pg:",use_zero"`
26+
ProposerPublicKey string `pg:",use_zero"`
27+
ProposerVotingPublicKey string `pg:",use_zero"`
28+
ProposerRandomSeedSignature string `pg:",use_zero"`
29+
ProposedInView uint64
30+
ProposerVotePartialSignature string `pg:",use_zero"`
31+
// TODO: Quorum Certificates. Separate entry.
32+
33+
BadgerKey []byte `pg:",use_zero"`
2334
}
2435

2536
type PGBlockEntry struct {
@@ -28,18 +39,34 @@ type PGBlockEntry struct {
2839
}
2940

3041
// Convert the UserAssociation DeSo encoder to the PG struct used by bun.
31-
func BlockEncoderToPGStruct(block *lib.MsgDeSoBlock, keyBytes []byte) *PGBlockEntry {
42+
func BlockEncoderToPGStruct(block *lib.MsgDeSoBlock, keyBytes []byte, params *lib.DeSoParams) *PGBlockEntry {
3243
blockHash, _ := block.Hash()
44+
var txnConnectStatusByIndexHash string
45+
if block.Header.TxnConnectStatusByIndexHash != nil {
46+
txnConnectStatusByIndexHash = hex.EncodeToString(block.Header.TxnConnectStatusByIndexHash.ToBytes())
47+
}
48+
var proposerPublicKey string
49+
if block.Header.ProposerPublicKey != nil {
50+
proposerPublicKey = consumer.PublicKeyBytesToBase58Check(
51+
block.Header.ProposerPublicKey.ToBytes(), params)
52+
}
3353
return &PGBlockEntry{
3454
BlockEntry: BlockEntry{
35-
BlockHash: hex.EncodeToString(blockHash[:]),
36-
PrevBlockHash: hex.EncodeToString(block.Header.PrevBlockHash[:]),
37-
TxnMerkleRoot: hex.EncodeToString(block.Header.TransactionMerkleRoot[:]),
38-
Timestamp: consumer.UnixNanoToTime(uint64(block.Header.TstampNanoSecs)),
39-
Height: block.Header.Height,
40-
Nonce: block.Header.Nonce,
41-
ExtraNonce: block.Header.ExtraNonce,
42-
BadgerKey: keyBytes,
55+
BlockHash: hex.EncodeToString(blockHash[:]),
56+
PrevBlockHash: hex.EncodeToString(block.Header.PrevBlockHash[:]),
57+
TxnMerkleRoot: hex.EncodeToString(block.Header.TransactionMerkleRoot[:]),
58+
Timestamp: consumer.UnixNanoToTime(uint64(block.Header.TstampNanoSecs)),
59+
Height: block.Header.Height,
60+
Nonce: block.Header.Nonce,
61+
ExtraNonce: block.Header.ExtraNonce,
62+
BlockVersion: block.Header.Version,
63+
TxnConnectStatusByIndexHash: txnConnectStatusByIndexHash,
64+
ProposerPublicKey: proposerPublicKey,
65+
ProposerVotingPublicKey: block.Header.ProposerVotingPublicKey.ToString(),
66+
ProposerRandomSeedSignature: block.Header.ProposerRandomSeedSignature.ToString(),
67+
ProposedInView: block.Header.ProposedInView,
68+
ProposerVotePartialSignature: block.Header.ProposerVotePartialSignature.ToString(),
69+
BadgerKey: keyBytes,
4370
},
4471
}
4572
}
@@ -77,10 +104,16 @@ func bulkInsertBlockEntry(entries []*lib.StateChangeEntry, db *bun.DB, operation
77104

78105
for _, entry := range uniqueBlocks {
79106
block := entry.Encoder.(*lib.MsgDeSoBlock)
80-
blockEntry := BlockEncoderToPGStruct(block, entry.KeyBytes)
107+
blockEntry := BlockEncoderToPGStruct(block, entry.KeyBytes, params)
81108
pgBlockEntrySlice = append(pgBlockEntrySlice, blockEntry)
82109
for jj, transaction := range block.Txns {
83-
pgTransactionEntry, err := TransactionEncoderToPGStruct(transaction, uint64(jj), blockEntry.BlockHash, blockEntry.Height, blockEntry.Timestamp, params)
110+
// Check if the transaction connects or not.
111+
txnConnects := blockEntry.Height < uint64(params.ForkHeights.ProofOfStake2ConsensusCutoverBlockHeight) ||
112+
jj == 0 || block.TxnConnectStatusByIndex.Get(jj-1)
113+
pgTransactionEntry, err := TransactionEncoderToPGStruct(
114+
transaction, uint64(jj), blockEntry.BlockHash, blockEntry.Height, blockEntry.Timestamp, txnConnects,
115+
params,
116+
)
84117
if err != nil {
85118
return errors.Wrapf(err, "entries.bulkInsertBlockEntry: Problem converting transaction to PG struct")
86119
}

entries/stake_reward.go

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
package entries
2+
3+
import (
4+
"github.com/deso-protocol/core/lib"
5+
"github.com/deso-protocol/state-consumer/consumer"
6+
"github.com/uptrace/bun"
7+
)
8+
9+
type StakeReward struct {
10+
StakerPKID string `bun:",nullzero"`
11+
ValidatorPKID string `bun:",nullzero"`
12+
RewardMethod lib.StakingRewardMethod // TODO: we probably want this to be human readable?
13+
RewardNanos uint64 `pg:",use_zero"`
14+
IsValidatorCommission bool
15+
BlockHash string
16+
17+
UtxoOpIndex uint64 `pg:",use_zero"`
18+
}
19+
20+
type PGStakeReward struct {
21+
bun.BaseModel `bun:"table:stake_reward"`
22+
StakeReward
23+
}
24+
25+
// Convert the StakeRewardStateChangeMetadata DeSo encoder to the PGStakeReward struct used by bun.
26+
func StakeRewardEncoderToPGStruct(
27+
stakeReward *lib.StakeRewardStateChangeMetadata,
28+
params *lib.DeSoParams,
29+
blockHash string,
30+
utxoOpIndex uint64,
31+
) StakeReward {
32+
pgStakeReward := StakeReward{}
33+
34+
if stakeReward.StakerPKID != nil {
35+
pgStakeReward.StakerPKID = consumer.PublicKeyBytesToBase58Check((*stakeReward.StakerPKID)[:], params)
36+
}
37+
38+
if stakeReward.ValidatorPKID != nil {
39+
pgStakeReward.ValidatorPKID = consumer.PublicKeyBytesToBase58Check((*stakeReward.ValidatorPKID)[:], params)
40+
}
41+
42+
pgStakeReward.RewardMethod = stakeReward.StakingRewardMethod
43+
pgStakeReward.RewardNanos = stakeReward.RewardNanos
44+
pgStakeReward.IsValidatorCommission = stakeReward.IsValidatorCommission
45+
pgStakeReward.BlockHash = blockHash
46+
pgStakeReward.UtxoOpIndex = utxoOpIndex
47+
return pgStakeReward
48+
}

entries/transaction.go

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,15 +33,24 @@ type TransactionEntry struct {
3333
IndexInBlock uint64
3434
BlockHeight uint64
3535
Timestamp time.Time `pg:",use_zero"`
36-
BadgerKey []byte `pg:",use_zero"`
36+
Connects bool
37+
BadgerKey []byte `pg:",use_zero"`
3738
}
3839

3940
type PGTransactionEntry struct {
4041
bun.BaseModel `bun:"table:transaction_partitioned"`
4142
TransactionEntry
4243
}
4344

44-
func TransactionEncoderToPGStruct(transaction *lib.MsgDeSoTxn, blockIndex uint64, blockHash string, blockHeight uint64, timestamp time.Time, params *lib.DeSoParams) (*PGTransactionEntry, error) {
45+
func TransactionEncoderToPGStruct(
46+
transaction *lib.MsgDeSoTxn,
47+
blockIndex uint64,
48+
blockHash string,
49+
blockHeight uint64,
50+
timestamp time.Time,
51+
connects bool,
52+
params *lib.DeSoParams,
53+
) (*PGTransactionEntry, error) {
4554

4655
var txInputs []map[string]string
4756
for _, input := range transaction.TxInputs {
@@ -86,6 +95,7 @@ func TransactionEncoderToPGStruct(transaction *lib.MsgDeSoTxn, blockIndex uint64
8695
IndexInBlock: blockIndex,
8796
BlockHeight: blockHeight,
8897
Timestamp: timestamp,
98+
Connects: connects,
8999
BadgerKey: transaction.Hash()[:],
90100
},
91101
}
@@ -127,7 +137,9 @@ func transformTransactionEntry(entries []*lib.StateChangeEntry, params *lib.DeSo
127137

128138
for _, entry := range uniqueTransactions {
129139
transaction := entry.Encoder.(*lib.MsgDeSoTxn)
130-
transactionEntry, err := TransactionEncoderToPGStruct(transaction, 0, "", 0, time.Now(), params)
140+
// Assuming transactions connect when using this function. We can only
141+
// tell if a transaction connects or not if we have the block.
142+
transactionEntry, err := TransactionEncoderToPGStruct(transaction, 0, "", 0, time.Now(), true, params)
131143
if err != nil {
132144
return nil, errors.Wrapf(err, "entries.transformAndBulkInsertTransactionEntry: Problem converting transaction to PG struct")
133145
}

entries/utxo_operation.go

Lines changed: 44 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"fmt"
88
"github.com/deso-protocol/core/lib"
99
"github.com/deso-protocol/state-consumer/consumer"
10+
"github.com/golang/glog"
1011
"github.com/pkg/errors"
1112
"github.com/uptrace/bun"
1213
"time"
@@ -64,7 +65,7 @@ func ConvertUtxoOperationKeyToBlockHashHex(keyBytes []byte) string {
6465
return hex.EncodeToString(keyBytes[1:])
6566
}
6667

67-
// PostBatchOperation is the entry point for processing a batch of post entries. It determines the appropriate handler
68+
// UtxoOperationBatchOperation is the entry point for processing a batch of utxo operations. It determines the appropriate handler
6869
// based on the operation type and executes it.
6970
func UtxoOperationBatchOperation(entries []*lib.StateChangeEntry, db *bun.DB, params *lib.DeSoParams) error {
7071
// We check before we call this function that there is at least one operation type.
@@ -92,6 +93,7 @@ func bulkInsertUtxoOperationsEntry(entries []*lib.StateChangeEntry, db *bun.DB,
9293
transactionUpdates := make([]*PGTransactionEntry, 0)
9394
affectedPublicKeys := make([]*PGAffectedPublicKeyEntry, 0)
9495
blockEntries := make([]*PGBlockEntry, 0)
96+
stakeRewardEntries := make([]*PGStakeReward, 0)
9597

9698
// Start timer to track how long it takes to insert the entries.
9799
start := time.Now()
@@ -113,14 +115,18 @@ func bulkInsertUtxoOperationsEntry(entries []*lib.StateChangeEntry, db *bun.DB,
113115
blockHash := ConvertUtxoOperationKeyToBlockHashHex(entry.KeyBytes)
114116

115117
// Check to see if the state change entry has an attached block.
116-
// Note that this only happens during the iniltial sync, in order to speed up the sync process.
118+
// Note that this only happens during the initial sync, in order to speed up the sync process.
117119
if entry.Block != nil {
118120
insertTransactions = true
119121
block := entry.Block
120-
blockEntry := BlockEncoderToPGStruct(block, entry.KeyBytes)
122+
blockEntry := BlockEncoderToPGStruct(block, entry.KeyBytes, params)
121123
blockEntries = append(blockEntries, blockEntry)
122124
for ii, txn := range block.Txns {
123-
pgTxn, err := TransactionEncoderToPGStruct(txn, uint64(ii), blockEntry.BlockHash, blockEntry.Height, blockEntry.Timestamp, params)
125+
// Check if the transaction connects or not.
126+
txnConnects := blockEntry.Height < uint64(params.ForkHeights.ProofOfStake2ConsensusCutoverBlockHeight) ||
127+
ii == 0 || block.TxnConnectStatusByIndex.Get(ii-1)
128+
pgTxn, err := TransactionEncoderToPGStruct(
129+
txn, uint64(ii), blockEntry.BlockHash, blockEntry.Height, blockEntry.Timestamp, txnConnects, params)
124130
if err != nil {
125131
return errors.Wrapf(err, "entries.bulkInsertUtxoOperationsEntry: Problem converting transaction to PG struct")
126132
}
@@ -169,7 +175,7 @@ func bulkInsertUtxoOperationsEntry(entries []*lib.StateChangeEntry, db *bun.DB,
169175
if err != nil {
170176
return fmt.Errorf("entries.bulkInsertUtxoOperationsEntry: Problem decoding transaction for entry %+v at block height %v", entry, entry.BlockHeight)
171177
}
172-
txIndexMetadata, err := consumer.ComputeTransactionMetadata(transaction, blockHash, &lib.DeSoMainnetParams, transaction.TxnFeeNanos, uint64(jj), utxoOps)
178+
txIndexMetadata, err := consumer.ComputeTransactionMetadata(transaction, blockHash, params, transaction.TxnFeeNanos, uint64(jj), utxoOps)
173179
if err != nil {
174180
return fmt.Errorf("entries.bulkInsertUtxoOperationsEntry: Problem computing transaction metadata for entry %+v at block height %v", entry, entry.BlockHeight)
175181
}
@@ -216,6 +222,27 @@ func bulkInsertUtxoOperationsEntry(entries []*lib.StateChangeEntry, db *bun.DB,
216222
affectedPublicKeys = append(affectedPublicKeys, affectedPublicKeyEntry)
217223
}
218224
transactionUpdates = append(transactionUpdates, transactions[jj])
225+
} else if jj == len(transactions) {
226+
// TODO: parse utxo operations for the block level index.
227+
// Examples: deletion of expired nonces, staking rewards (restaked
228+
// + payed to balance), validator jailing, updating validator's
229+
// last active at epoch.
230+
for ii, utxoOp := range utxoOps {
231+
switch utxoOp.Type {
232+
case lib.OperationTypeStakeDistributionRestake, lib.OperationTypeStakeDistributionPayToBalance:
233+
stateChangeMetadata, ok := utxoOp.StateChangeMetadata.(*lib.StakeRewardStateChangeMetadata)
234+
if !ok {
235+
glog.Error("bulkInsertUtxoOperationsEntry: Problem with state change metadata for " +
236+
"stake rewards")
237+
continue
238+
}
239+
stakeReward := PGStakeReward{
240+
StakeReward: StakeRewardEncoderToPGStruct(stateChangeMetadata, params, blockHash, uint64(ii)),
241+
}
242+
stakeRewardEntries = append(stakeRewardEntries, &stakeReward)
243+
}
244+
}
245+
219246
}
220247
}
221248
// Print how long it took to insert the entries.
@@ -273,6 +300,18 @@ func bulkInsertUtxoOperationsEntry(entries []*lib.StateChangeEntry, db *bun.DB,
273300
}
274301

275302
fmt.Printf("entries.bulkInsertUtxoOperationsEntry: Inserted %v affected public keys in %v s\n", len(affectedPublicKeys), time.Since(start))
303+
304+
start = time.Now()
305+
306+
// Insert stake rewards into db
307+
if len(stakeRewardEntries) > 0 {
308+
_, err := db.NewInsert().Model(&stakeRewardEntries).On("CONFLICT (block_hash, utxo_op_index) DO UPDATE").Exec(context.Background())
309+
if err != nil {
310+
return errors.Wrapf(err, "InsertStakeRewards: Problem inserting stake rewards")
311+
}
312+
}
313+
fmt.Printf("entries.bulkInsertUtxoOperationsEntry: Inserted %v stake rewards in %v s\n", len(stakeRewardEntries), time.Since(start))
314+
276315
return nil
277316
}
278317

main.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,8 @@ func getConfigValues() (pgURI string, stateChangeDir string, consumerProgressDir
108108
if stateChangeDir == "" {
109109
stateChangeDir = "/tmp/state-changes"
110110
}
111+
// Set the state change dir flag that core uses, so DeSoEncoders properly encode and decode state change metadata.
112+
viper.Set("state-change-dir", stateChangeDir)
111113

112114
consumerProgressDir = viper.GetString("CONSUMER_PROGRESS_DIR")
113115
if consumerProgressDir == "" {

migrations/initial_migrations/20231213000000_create_stake_entry_table.go renamed to migrations/initial_migrations/20231213000001_create_stake_entry_table.go

File renamed without changes.

migrations/initial_migrations/20231213000000_create_validator_entry_table.go renamed to migrations/initial_migrations/20231213000002_create_validator_entry_table.go

File renamed without changes.

migrations/initial_migrations/20231213000000_create_locked_stake_entry_table.go renamed to migrations/initial_migrations/20231213000003_create_locked_stake_entry_table.go

File renamed without changes.

migrations/initial_migrations/20240129000000_create_epoch_entry_table.go renamed to migrations/initial_migrations/20240129000003_create_epoch_entry_table.go

File renamed without changes.
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
package initial_migrations
2+
3+
import (
4+
"context"
5+
"github.com/uptrace/bun"
6+
)
7+
8+
// TODO: Not nullable fields
9+
func updateBlockTableWithPoSFields(db *bun.DB, tableName string) error {
10+
_, err := db.Exec(`
11+
ALTER TABLE block
12+
ADD COLUMN block_version BIGINT,
13+
ADD COLUMN txn_connect_status_by_index_hash VARCHAR,
14+
ADD COLUMN proposer_public_key VARCHAR,
15+
ADD COLUMN proposer_voting_public_key VARCHAR,
16+
ADD COLUMN proposer_random_seed_signature VARCHAR,
17+
ADD COLUMN proposed_in_view BIGINT,
18+
ADD COLUMN proposer_vote_partial_signature VARCHAR;
19+
`)
20+
// TODO: What other fields do we need indexed?
21+
return err
22+
}
23+
24+
func init() {
25+
Migrations.MustRegister(func(ctx context.Context, db *bun.DB) error {
26+
return updateBlockTableWithPoSFields(db, "block")
27+
}, func(ctx context.Context, db *bun.DB) error {
28+
_, err := db.Exec(`
29+
ALTER TABLE block
30+
DROP COLUMN block_version,
31+
DROP COLUMN txn_connect_status_by_index_hash,
32+
DROP COLUMN proposer_public_key,
33+
DROP COLUMN proposer_voting_public_key,
34+
DROP COLUMN proposer_random_seed_signature,
35+
DROP COLUMN proposed_in_view,
36+
DROP COLUMN proposer_vote_partial_signature;
37+
`)
38+
if err != nil {
39+
return err
40+
}
41+
return nil
42+
})
43+
}

0 commit comments

Comments
 (0)