Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 71 additions & 29 deletions berachain-beacon-data/beacon-ingestion.py
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,7 @@ def setup_tables(self):
CREATE TABLE cosmos_blocks (
height BIGINT PRIMARY KEY,
hash VARCHAR(64),
time DATETIME,
time DATETIME(3),
proposer_address VARCHAR(40),
proposer_name VARCHAR(100),
chain_id VARCHAR(64),
Expand Down Expand Up @@ -458,7 +458,7 @@ def setup_tables(self):
raw_log TEXT,
status VARCHAR(20),
codespace VARCHAR(100),
timestamp DATETIME,
timestamp DATETIME(3),
INDEX idx_height (height),
INDEX idx_hash (hash)
) ENGINE=InnoDB
Expand All @@ -470,7 +470,7 @@ def setup_tables(self):
evidence_type VARCHAR(100),
validator_address VARCHAR(40),
total_voting_power BIGINT,
timestamp DATETIME,
timestamp DATETIME(3),
raw_data JSON,
INDEX idx_height (height),
INDEX idx_validator (validator_address)
Expand All @@ -492,7 +492,7 @@ def setup_tables(self):
status VARCHAR(20),
tokens DECIMAL(65,0),
delegator_shares DECIMAL(65,0),
update_time DATETIME,
update_time DATETIME(3),
INDEX idx_status (status)
) ENGINE=InnoDB
''',
Expand All @@ -501,7 +501,7 @@ def setup_tables(self):
id BIGINT AUTO_INCREMENT PRIMARY KEY,
height BIGINT,
validator_address VARCHAR(40),
timestamp DATETIME,
timestamp DATETIME(3),
signature TEXT,
block_id_flag INT,
voting_power BIGINT,
Expand Down Expand Up @@ -529,7 +529,22 @@ def setup_tables(self):
)
'''
}

alter_statements = {
'cosmos_blocks': 'ALTER TABLE cosmos_blocks MODIFY COLUMN time DATETIME(3);',
'transactions': 'ALTER TABLE transactions MODIFY COLUMN timestamp DATETIME(3);',
'evidence': 'ALTER TABLE evidence MODIFY COLUMN timestamp DATETIME(3);',
'signatures': 'ALTER TABLE signatures MODIFY COLUMN timestamp DATETIME(3);'
}
# Execute ALTER statements for existing tables
for table, alter_sql in alter_statements.items():
try:
print(f"Updating timestamp precision for table {table}...")
cursor.execute(alter_sql)
print(f"Successfully updated {table}")
except Exception as e:
print(f"Error updating {table}: {e}")
# Continue with other tables even if one fails
continue
# Create tables that don't exist
for table_name, create_sql in tables.items():
if table_name.lower() not in existing_tables:
Expand Down Expand Up @@ -852,22 +867,34 @@ def ingest_blocks(self, start_height, end_height):
return processed_blocks

def _parse_timestamp(self, timestamp_str: str) -> str:
"""Convert Cosmos timestamp to MySQL compatible format"""
"""Convert Cosmos timestamp to MySQL compatible format with millisecond precision"""
try:
# Remove the 'Z' and handle fractional seconds
timestamp_str = timestamp_str.replace('Z', '')
# Parse the timestamp string
dt = datetime.strptime(timestamp_str.split('.')[0], '%Y-%m-%dT%H:%M:%S')
# Format for MySQL
return dt.strftime('%Y-%m-%d %H:%M:%S')
# Remove the 'Z' from the end if present
timestamp_str = timestamp_str.rstrip('Z')

# Parse the timestamp
if '.' in timestamp_str:
# Split into main part and fractional seconds
main_part, fractional = timestamp_str.split('.')
# Truncate fractional seconds to 6 digits (microseconds) before parsing
fractional = fractional[:6]
# Reconstruct timestamp string with truncated precision
timestamp_str = f"{main_part}.{fractional}"
dt = datetime.strptime(timestamp_str, '%Y-%m-%dT%H:%M:%S.%f')
else:
dt = datetime.strptime(timestamp_str, '%Y-%m-%dT%H:%M:%S')

# Format for MySQL with millisecond precision
return dt.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3] # Keep only milliseconds, not microseconds
except Exception as e:
print(f"Error parsing timestamp {timestamp_str}: {e}")
return datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S')
# Return current time with millisecond precision if parsing fails
return datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]

def _store_block(self, cursor, block_data: Dict):
"""Store block data in the cosmos_blocks table"""
"""Store block data with millisecond precision timestamp"""
if self.db_pool.db_type == 'mysql':
# Convert timestamp to MySQL format
# Convert timestamp to MySQL format with milliseconds
mysql_timestamp = self._parse_timestamp(block_data['time'])

# Get validator name, defaulting to empty string if not found
Expand Down Expand Up @@ -918,19 +945,34 @@ def _store_block(self, cursor, block_data: Dict):
total_voting_power = new_values.total_voting_power,
proposer_priority = new_values.proposer_priority
''', (
block_data['height'], block_data['hash'], mysql_timestamp,
proposer_address, proposer_name, block_data['chain_id'],
block_data['num_txs'], block_data['num_evidence'],
block_data['total_gas_wanted'], block_data['total_gas_used'],
block_data['total_fee'], block_data['last_commit_round'],
block_data['last_block_id'], block_data['validators_hash'],
block_data['next_validators_hash'], block_data['consensus_hash'],
block_data['app_hash'], block_data['last_results_hash'],
block_data['evidence_hash'], block_data['last_commit_hash'],
block_data['data_hash'], block_data['valid_signatures'],
block_data['total_signatures'], block_data['version'],
block_data['parts_total'], block_data['parts_hash'],
block_data['total_voting_power'], block_data['proposer_priority']
block_data['height'],
block_data['hash'],
mysql_timestamp,
proposer_address,
proposer_name,
block_data['chain_id'],
block_data['num_txs'],
block_data['num_evidence'],
block_data['total_gas_wanted'],
block_data['total_gas_used'],
block_data['total_fee'],
block_data['last_commit_round'],
block_data['last_block_id'],
block_data['validators_hash'],
block_data['next_validators_hash'],
block_data['consensus_hash'],
block_data['app_hash'],
block_data['last_results_hash'],
block_data['evidence_hash'],
block_data['last_commit_hash'],
block_data['data_hash'],
block_data['valid_signatures'],
block_data['total_signatures'],
block_data['version'],
block_data['parts_total'],
block_data['parts_hash'],
block_data['total_voting_power'],
block_data['proposer_priority']
))

def _store_validator_set(self, cursor, height: int, validators: List[Dict]):
Expand Down