diff --git a/berachain-beacon-data/beacon-ingestion.py b/berachain-beacon-data/beacon-ingestion.py index 9231824..b840ed8 100644 --- a/berachain-beacon-data/beacon-ingestion.py +++ b/berachain-beacon-data/beacon-ingestion.py @@ -413,7 +413,7 @@ def setup_tables(self): CREATE TABLE cosmos_blocks ( height BIGINT PRIMARY KEY, hash VARCHAR(64), - time DATETIME, + time DATETIME(3), proposer_address VARCHAR(40), proposer_name VARCHAR(100), chain_id VARCHAR(64), @@ -458,7 +458,7 @@ def setup_tables(self): raw_log TEXT, status VARCHAR(20), codespace VARCHAR(100), - timestamp DATETIME, + timestamp DATETIME(3), INDEX idx_height (height), INDEX idx_hash (hash) ) ENGINE=InnoDB @@ -470,7 +470,7 @@ def setup_tables(self): evidence_type VARCHAR(100), validator_address VARCHAR(40), total_voting_power BIGINT, - timestamp DATETIME, + timestamp DATETIME(3), raw_data JSON, INDEX idx_height (height), INDEX idx_validator (validator_address) @@ -492,7 +492,7 @@ def setup_tables(self): status VARCHAR(20), tokens DECIMAL(65,0), delegator_shares DECIMAL(65,0), - update_time DATETIME, + update_time DATETIME(3), INDEX idx_status (status) ) ENGINE=InnoDB ''', @@ -501,7 +501,7 @@ def setup_tables(self): id BIGINT AUTO_INCREMENT PRIMARY KEY, height BIGINT, validator_address VARCHAR(40), - timestamp DATETIME, + timestamp DATETIME(3), signature TEXT, block_id_flag INT, voting_power BIGINT, @@ -529,7 +529,22 @@ def setup_tables(self): ) ''' } - + alter_statements = { + 'cosmos_blocks': 'ALTER TABLE cosmos_blocks MODIFY COLUMN time DATETIME(3);', + 'transactions': 'ALTER TABLE transactions MODIFY COLUMN timestamp DATETIME(3);', + 'evidence': 'ALTER TABLE evidence MODIFY COLUMN timestamp DATETIME(3);', + 'signatures': 'ALTER TABLE signatures MODIFY COLUMN timestamp DATETIME(3);' + } + # Execute ALTER statements for existing tables + for table, alter_sql in alter_statements.items(): + try: + print(f"Updating timestamp precision for table {table}...") + cursor.execute(alter_sql) + print(f"Successfully updated {table}") + except Exception as e: + print(f"Error updating {table}: {e}") + # Continue with other tables even if one fails + continue # Create tables that don't exist for table_name, create_sql in tables.items(): if table_name.lower() not in existing_tables: @@ -852,22 +867,34 @@ def ingest_blocks(self, start_height, end_height): return processed_blocks def _parse_timestamp(self, timestamp_str: str) -> str: - """Convert Cosmos timestamp to MySQL compatible format""" + """Convert Cosmos timestamp to MySQL compatible format with millisecond precision""" try: - # Remove the 'Z' and handle fractional seconds - timestamp_str = timestamp_str.replace('Z', '') - # Parse the timestamp string - dt = datetime.strptime(timestamp_str.split('.')[0], '%Y-%m-%dT%H:%M:%S') - # Format for MySQL - return dt.strftime('%Y-%m-%d %H:%M:%S') + # Remove the 'Z' from the end if present + timestamp_str = timestamp_str.rstrip('Z') + + # Parse the timestamp + if '.' in timestamp_str: + # Split into main part and fractional seconds + main_part, fractional = timestamp_str.split('.') + # Truncate fractional seconds to 6 digits (microseconds) before parsing + fractional = fractional[:6] + # Reconstruct timestamp string with truncated precision + timestamp_str = f"{main_part}.{fractional}" + dt = datetime.strptime(timestamp_str, '%Y-%m-%dT%H:%M:%S.%f') + else: + dt = datetime.strptime(timestamp_str, '%Y-%m-%dT%H:%M:%S') + + # Format for MySQL with millisecond precision + return dt.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3] # Keep only milliseconds, not microseconds except Exception as e: print(f"Error parsing timestamp {timestamp_str}: {e}") - return datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S') + # Return current time with millisecond precision if parsing fails + return datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3] def _store_block(self, cursor, block_data: Dict): - """Store block data in the cosmos_blocks table""" + """Store block data with millisecond precision timestamp""" if self.db_pool.db_type == 'mysql': - # Convert timestamp to MySQL format + # Convert timestamp to MySQL format with milliseconds mysql_timestamp = self._parse_timestamp(block_data['time']) # Get validator name, defaulting to empty string if not found @@ -918,19 +945,34 @@ def _store_block(self, cursor, block_data: Dict): total_voting_power = new_values.total_voting_power, proposer_priority = new_values.proposer_priority ''', ( - block_data['height'], block_data['hash'], mysql_timestamp, - proposer_address, proposer_name, block_data['chain_id'], - block_data['num_txs'], block_data['num_evidence'], - block_data['total_gas_wanted'], block_data['total_gas_used'], - block_data['total_fee'], block_data['last_commit_round'], - block_data['last_block_id'], block_data['validators_hash'], - block_data['next_validators_hash'], block_data['consensus_hash'], - block_data['app_hash'], block_data['last_results_hash'], - block_data['evidence_hash'], block_data['last_commit_hash'], - block_data['data_hash'], block_data['valid_signatures'], - block_data['total_signatures'], block_data['version'], - block_data['parts_total'], block_data['parts_hash'], - block_data['total_voting_power'], block_data['proposer_priority'] + block_data['height'], + block_data['hash'], + mysql_timestamp, + proposer_address, + proposer_name, + block_data['chain_id'], + block_data['num_txs'], + block_data['num_evidence'], + block_data['total_gas_wanted'], + block_data['total_gas_used'], + block_data['total_fee'], + block_data['last_commit_round'], + block_data['last_block_id'], + block_data['validators_hash'], + block_data['next_validators_hash'], + block_data['consensus_hash'], + block_data['app_hash'], + block_data['last_results_hash'], + block_data['evidence_hash'], + block_data['last_commit_hash'], + block_data['data_hash'], + block_data['valid_signatures'], + block_data['total_signatures'], + block_data['version'], + block_data['parts_total'], + block_data['parts_hash'], + block_data['total_voting_power'], + block_data['proposer_priority'] )) def _store_validator_set(self, cursor, height: int, validators: List[Dict]):