diff --git a/deploy/local/docker-compose/vector-kafka-clickhouse-libp2p.yaml b/deploy/local/docker-compose/vector-kafka-clickhouse-libp2p.yaml index 442654fc..f8bf0635 100644 --- a/deploy/local/docker-compose/vector-kafka-clickhouse-libp2p.yaml +++ b/deploy/local/docker-compose/vector-kafka-clickhouse-libp2p.yaml @@ -420,6 +420,7 @@ transforms: log(., level: "error", rate_limit_secs: 60) } .transient = .data.transient + .remote_peer_id = .data.remote_peer .updated_date_time = to_unix_timestamp(now()) del(.event) del(.meta) @@ -520,6 +521,7 @@ transforms: log(., level: "error", rate_limit_secs: 60) } .transient = .data.transient + .remote_peer_id = .data.remote_peer .updated_date_time = to_unix_timestamp(now()) del(.event) del(.meta) @@ -544,6 +546,7 @@ transforms: log(., level: "error", rate_limit_secs: 60) } .peer_id_unique_key = seahash(peer_id_key) + .peer_id = .data.peer_id .protocol = .data.protocol .updated_date_time = to_unix_timestamp(now()) del(.event) @@ -570,6 +573,7 @@ transforms: log(., level: "error", rate_limit_secs: 60) } .peer_id_unique_key = seahash(peer_id_key) + .peer_id = .data.peer_id .updated_date_time = to_unix_timestamp(now()) del(.event) del(.meta) @@ -611,6 +615,7 @@ transforms: log(., level: "error", rate_limit_secs: 60) } .peer_id_unique_key = seahash(peer_id_key) + .peer_id = .meta.client.additional_data.metadata.peer_id .updated_date_time = to_unix_timestamp(now()) del(.event) del(.meta) @@ -653,6 +658,7 @@ transforms: log(., level: "error", rate_limit_secs: 60) } .peer_id_unique_key = seahash(peer_id_key) + .peer_id = .meta.client.additional_data.metadata.peer_id .updated_date_time = to_unix_timestamp(now()) del(.event) del(.meta) @@ -695,6 +701,7 @@ transforms: log(., level: "error", rate_limit_secs: 60) } .peer_id_unique_key = seahash(peer_id_key) + .peer_id = .data.peer_id .updated_date_time = to_unix_timestamp(now()) del(.event) del(.meta) @@ -737,6 +744,7 @@ transforms: log(., level: "error", rate_limit_secs: 60) } .peer_id_unique_key = seahash(peer_id_key) + .peer_id = .data.peer_id .updated_date_time = to_unix_timestamp(now()) del(.event) del(.meta) @@ -817,6 +825,7 @@ transforms: log(., level: "error", rate_limit_secs: 60) } .peer_id_unique_key = seahash(peer_id_key) + .peer_id = .data.peer_id .message_id = .data.msg_id .message_size = .data.msg_size .reason = .data.reason @@ -864,6 +873,7 @@ transforms: log(., level: "error", rate_limit_secs: 60) } .peer_id_unique_key = seahash(peer_id_key) + .peer_id = .data.peer_id .message_id = .data.msg_id .message_size = .data.msg_size .updated_date_time = to_unix_timestamp(now()) @@ -910,6 +920,7 @@ transforms: log(., level: "error", rate_limit_secs: 60) } .peer_id_unique_key = seahash(peer_id_key) + .peer_id = .data.peer_id .message_id = .data.msg_id .message_size = .data.msg_size .updated_date_time = to_unix_timestamp(now()) @@ -939,6 +950,7 @@ transforms: log(., level: "error", rate_limit_secs: 60) } .peer_id_unique_key = seahash(peer_id_key) + .peer_id = .meta.client.additional_data.metadata.peer_id .proposer_index = .data.proposer_index @@ -1031,6 +1043,7 @@ transforms: log(., level: "error", rate_limit_secs: 60) } .peer_id_unique_key = seahash(peer_id_key) + .peer_id = .meta.client.additional_data.metadata.peer_id .attesting_validator_index = .meta.client.additional_data.attesting_validator.index .attesting_validator_committee_index = .meta.client.additional_data.attesting_validator.committee_index @@ -1146,6 +1159,7 @@ transforms: log(., level: "error", rate_limit_secs: 60) } .peer_id_unique_key = seahash(peer_id_key) + .peer_id = .meta.client.additional_data.metadata.peer_id # Aggregator specific fields .aggregator_index = .meta.client.additional_data.aggregator_index @@ -1296,6 +1310,7 @@ transforms: log(., level: "error", rate_limit_secs: 60) } .peer_id_unique_key = seahash(peer_id_key) + .peer_id = .meta.client.additional_data.metadata.peer_id .message_id = .meta.client.additional_data.message_id .message_size = .meta.client.additional_data.message_size @@ -1392,6 +1407,7 @@ transforms: log(., level: "error", rate_limit_secs: 60) } .peer_id_unique_key = seahash(peer_id_key) + .peer_id = .meta.client.additional_data.metadata.peer_id .message_id = .meta.client.additional_data.message_id .message_size = .meta.client.additional_data.message_size @@ -1525,6 +1541,7 @@ transforms: log(., level: "error", rate_limit_secs: 60) } .peer_id_unique_key = seahash(peer_id_key) + .peer_id = .meta.client.additional_data.metadata.peer_id } del(.event) @@ -1553,6 +1570,7 @@ transforms: log(., level: "error", rate_limit_secs: 60) } .peer_id_unique_key = seahash(peer_id_key) + .peer_id = .data.peer_id if .data.error != null { .error = .data.error @@ -1612,6 +1630,7 @@ transforms: log(., level: "error", rate_limit_secs: 60) } .peer_id_unique_key = seahash(peer_id_key) + .peer_id = .data.peer_id .unique_key = seahash(.event.id) if .data.error != null { @@ -1674,6 +1693,7 @@ transforms: log(., level: "error", rate_limit_secs: 60) } .peer_id_unique_key = seahash(peer_id_key) + .peer_id = .data.peer_id topicParts, err = split(.data.topic, "/") if err != null { @@ -1749,6 +1769,7 @@ transforms: log(., level: "error", rate_limit_secs: 60) } .peer_id_unique_key = seahash(peer_id_key) + .peer_id = .data.peer_id key, err = to_string(rootEventKey) + "_rpc_meta_control_iwant_" + to_string(.data.control_index) + "_" + to_string(.data.message_index) if err != null { @@ -1807,6 +1828,7 @@ transforms: log(., level: "error", rate_limit_secs: 60) } .peer_id_unique_key = seahash(peer_id_key) + .peer_id = .data.peer_id key, err = to_string(rootEventKey) + "_rpc_meta_control_idontwant_" + to_string(.data.control_index) + "_" + to_string(.data.message_index) if err != null { @@ -1865,6 +1887,7 @@ transforms: log(., level: "error", rate_limit_secs: 60) } .peer_id_unique_key = seahash(peer_id_key) + .peer_id = .data.peer_id topicParts, err = split(.data.topic, "/") if err != null { @@ -1938,6 +1961,7 @@ transforms: log(., level: "error", rate_limit_secs: 60) } .peer_id_unique_key = seahash(peer_id_key) + .peer_id = .data.peer_id topicParts, err = split(.data.topic, "/") if err != null { @@ -1963,6 +1987,7 @@ transforms: log(., level: "error", rate_limit_secs: 60) } .graft_peer_id_unique_key = seahash(graft_peer_id_key) + .graft_peer_id = .data.graft_peer_id key, err = to_string(rootEventKey) + "_rpc_meta_control_prune_" + to_string(.data.control_index) + "_" + to_string(.data.peer_index) if err != null { @@ -2020,6 +2045,7 @@ transforms: log(., level: "error", rate_limit_secs: 60) } .peer_id_unique_key = seahash(peer_id_key) + .peer_id = .data.peer_id topicParts, err = split(.data.topic_id, "/") if err != null { @@ -2094,6 +2120,7 @@ transforms: log(., level: "error", rate_limit_secs: 60) } .peer_id_unique_key = seahash(peer_id_key) + .peer_id = .data.peer_id topicParts, err = split(.data.topic_id, "/") if err != null { @@ -2153,6 +2180,7 @@ transforms: log(., level: "error", rate_limit_secs: 60) } .peer_id_unique_key = seahash(peer_id_key) + .peer_id = .data.meta.peer_id .unique_key = seahash(.event.id) .updated_date_time = to_unix_timestamp(now()) @@ -2182,6 +2210,7 @@ transforms: log(., level: "error", rate_limit_secs: 60) } .peer_id_unique_key = seahash(peer_id_key) + .peer_id = .data.meta.peer_id .unique_key = seahash(.event.id) .updated_date_time = to_unix_timestamp(now()) @@ -2211,6 +2240,7 @@ transforms: log(., level: "error", rate_limit_secs: 60) } .peer_id_unique_key = seahash(peer_id_key) + .peer_id = .data.meta.peer_id .unique_key = seahash(.event.id) .updated_date_time = to_unix_timestamp(now()) @@ -2236,6 +2266,7 @@ transforms: log("failed to generate peer id unique key: " + string!(err), level: "error", rate_limit_secs: 60) } .remote_peer_id_unique_key = seahash(peer_id_key) + .remote_peer_id = .data.remote_peer .remote_maddrs = .data.remote_maddrs .latency_ms = to_int!(.data.latency_ms) .direction = if .data.direction == 1 { diff --git a/deploy/migrations/clickhouse/087_libp2p_add_peer_id.down.sql b/deploy/migrations/clickhouse/087_libp2p_add_peer_id.down.sql new file mode 100644 index 00000000..d2d7e4d6 --- /dev/null +++ b/deploy/migrations/clickhouse/087_libp2p_add_peer_id.down.sql @@ -0,0 +1,93 @@ +-- Migration 087 down: Remove peer_id columns from all libp2p tables + +-- ============================================ +-- SECTION 1: Remove peer_id columns from local tables (27 tables) +-- ============================================ + +ALTER TABLE libp2p_add_peer_local ON CLUSTER '{cluster}' DROP COLUMN IF EXISTS peer_id; +ALTER TABLE libp2p_remove_peer_local ON CLUSTER '{cluster}' DROP COLUMN IF EXISTS peer_id; +ALTER TABLE libp2p_recv_rpc_local ON CLUSTER '{cluster}' DROP COLUMN IF EXISTS peer_id; +ALTER TABLE libp2p_send_rpc_local ON CLUSTER '{cluster}' DROP COLUMN IF EXISTS peer_id; +ALTER TABLE libp2p_drop_rpc_local ON CLUSTER '{cluster}' DROP COLUMN IF EXISTS peer_id; +ALTER TABLE libp2p_join_local ON CLUSTER '{cluster}' DROP COLUMN IF EXISTS peer_id; +ALTER TABLE libp2p_leave_local ON CLUSTER '{cluster}' DROP COLUMN IF EXISTS peer_id; +ALTER TABLE libp2p_graft_local ON CLUSTER '{cluster}' DROP COLUMN IF EXISTS peer_id; +ALTER TABLE libp2p_prune_local ON CLUSTER '{cluster}' DROP COLUMN IF EXISTS peer_id; +ALTER TABLE libp2p_deliver_message_local ON CLUSTER '{cluster}' DROP COLUMN IF EXISTS peer_id; +ALTER TABLE libp2p_reject_message_local ON CLUSTER '{cluster}' DROP COLUMN IF EXISTS peer_id; +ALTER TABLE libp2p_duplicate_message_local ON CLUSTER '{cluster}' DROP COLUMN IF EXISTS peer_id; +ALTER TABLE libp2p_handle_status_local ON CLUSTER '{cluster}' DROP COLUMN IF EXISTS peer_id; +ALTER TABLE libp2p_handle_metadata_local ON CLUSTER '{cluster}' DROP COLUMN IF EXISTS peer_id; +ALTER TABLE libp2p_gossipsub_beacon_block_local ON CLUSTER '{cluster}' DROP COLUMN IF EXISTS peer_id; +ALTER TABLE libp2p_gossipsub_beacon_attestation_local ON CLUSTER '{cluster}' DROP COLUMN IF EXISTS peer_id; +ALTER TABLE libp2p_gossipsub_blob_sidecar_local ON CLUSTER '{cluster}' DROP COLUMN IF EXISTS peer_id; +ALTER TABLE libp2p_gossipsub_aggregate_and_proof_local ON CLUSTER '{cluster}' DROP COLUMN IF EXISTS peer_id; +ALTER TABLE libp2p_gossipsub_data_column_sidecar_local ON CLUSTER '{cluster}' DROP COLUMN IF EXISTS peer_id; +ALTER TABLE libp2p_rpc_meta_message_local ON CLUSTER '{cluster}' DROP COLUMN IF EXISTS peer_id; +ALTER TABLE libp2p_rpc_meta_subscription_local ON CLUSTER '{cluster}' DROP COLUMN IF EXISTS peer_id; +ALTER TABLE libp2p_rpc_meta_control_ihave_local ON CLUSTER '{cluster}' DROP COLUMN IF EXISTS peer_id; +ALTER TABLE libp2p_rpc_meta_control_iwant_local ON CLUSTER '{cluster}' DROP COLUMN IF EXISTS peer_id; +ALTER TABLE libp2p_rpc_meta_control_graft_local ON CLUSTER '{cluster}' DROP COLUMN IF EXISTS peer_id; +ALTER TABLE libp2p_rpc_meta_control_prune_local ON CLUSTER '{cluster}' DROP COLUMN IF EXISTS peer_id; +ALTER TABLE libp2p_rpc_meta_control_idontwant_local ON CLUSTER '{cluster}' DROP COLUMN IF EXISTS peer_id; +ALTER TABLE libp2p_rpc_data_column_custody_probe_local ON CLUSTER '{cluster}' DROP COLUMN IF EXISTS peer_id; + +-- ============================================ +-- SECTION 2: Remove peer_id columns from distributed tables (27 tables) +-- ============================================ + +ALTER TABLE libp2p_add_peer ON CLUSTER '{cluster}' DROP COLUMN IF EXISTS peer_id; +ALTER TABLE libp2p_remove_peer ON CLUSTER '{cluster}' DROP COLUMN IF EXISTS peer_id; +ALTER TABLE libp2p_recv_rpc ON CLUSTER '{cluster}' DROP COLUMN IF EXISTS peer_id; +ALTER TABLE libp2p_send_rpc ON CLUSTER '{cluster}' DROP COLUMN IF EXISTS peer_id; +ALTER TABLE libp2p_drop_rpc ON CLUSTER '{cluster}' DROP COLUMN IF EXISTS peer_id; +ALTER TABLE libp2p_join ON CLUSTER '{cluster}' DROP COLUMN IF EXISTS peer_id; +ALTER TABLE libp2p_leave ON CLUSTER '{cluster}' DROP COLUMN IF EXISTS peer_id; +ALTER TABLE libp2p_graft ON CLUSTER '{cluster}' DROP COLUMN IF EXISTS peer_id; +ALTER TABLE libp2p_prune ON CLUSTER '{cluster}' DROP COLUMN IF EXISTS peer_id; +ALTER TABLE libp2p_deliver_message ON CLUSTER '{cluster}' DROP COLUMN IF EXISTS peer_id; +ALTER TABLE libp2p_reject_message ON CLUSTER '{cluster}' DROP COLUMN IF EXISTS peer_id; +ALTER TABLE libp2p_duplicate_message ON CLUSTER '{cluster}' DROP COLUMN IF EXISTS peer_id; +ALTER TABLE libp2p_handle_status ON CLUSTER '{cluster}' DROP COLUMN IF EXISTS peer_id; +ALTER TABLE libp2p_handle_metadata ON CLUSTER '{cluster}' DROP COLUMN IF EXISTS peer_id; +ALTER TABLE libp2p_gossipsub_beacon_block ON CLUSTER '{cluster}' DROP COLUMN IF EXISTS peer_id; +ALTER TABLE libp2p_gossipsub_beacon_attestation ON CLUSTER '{cluster}' DROP COLUMN IF EXISTS peer_id; +ALTER TABLE libp2p_gossipsub_blob_sidecar ON CLUSTER '{cluster}' DROP COLUMN IF EXISTS peer_id; +ALTER TABLE libp2p_gossipsub_aggregate_and_proof ON CLUSTER '{cluster}' DROP COLUMN IF EXISTS peer_id; +ALTER TABLE libp2p_gossipsub_data_column_sidecar ON CLUSTER '{cluster}' DROP COLUMN IF EXISTS peer_id; +ALTER TABLE libp2p_rpc_meta_message ON CLUSTER '{cluster}' DROP COLUMN IF EXISTS peer_id; +ALTER TABLE libp2p_rpc_meta_subscription ON CLUSTER '{cluster}' DROP COLUMN IF EXISTS peer_id; +ALTER TABLE libp2p_rpc_meta_control_ihave ON CLUSTER '{cluster}' DROP COLUMN IF EXISTS peer_id; +ALTER TABLE libp2p_rpc_meta_control_iwant ON CLUSTER '{cluster}' DROP COLUMN IF EXISTS peer_id; +ALTER TABLE libp2p_rpc_meta_control_graft ON CLUSTER '{cluster}' DROP COLUMN IF EXISTS peer_id; +ALTER TABLE libp2p_rpc_meta_control_prune ON CLUSTER '{cluster}' DROP COLUMN IF EXISTS peer_id; +ALTER TABLE libp2p_rpc_meta_control_idontwant ON CLUSTER '{cluster}' DROP COLUMN IF EXISTS peer_id; +ALTER TABLE libp2p_rpc_data_column_custody_probe ON CLUSTER '{cluster}' DROP COLUMN IF EXISTS peer_id; + +-- ============================================ +-- SECTION 3: Remove remote_peer_id columns from local tables (3 tables) +-- ============================================ + +ALTER TABLE libp2p_connected_local ON CLUSTER '{cluster}' DROP COLUMN IF EXISTS remote_peer_id; +ALTER TABLE libp2p_disconnected_local ON CLUSTER '{cluster}' DROP COLUMN IF EXISTS remote_peer_id; +ALTER TABLE libp2p_synthetic_heartbeat_local ON CLUSTER '{cluster}' DROP COLUMN IF EXISTS remote_peer_id; + +-- ============================================ +-- SECTION 4: Remove remote_peer_id columns from distributed tables (3 tables) +-- ============================================ + +ALTER TABLE libp2p_connected ON CLUSTER '{cluster}' DROP COLUMN IF EXISTS remote_peer_id; +ALTER TABLE libp2p_disconnected ON CLUSTER '{cluster}' DROP COLUMN IF EXISTS remote_peer_id; +ALTER TABLE libp2p_synthetic_heartbeat ON CLUSTER '{cluster}' DROP COLUMN IF EXISTS remote_peer_id; + +-- ============================================ +-- SECTION 5: Remove graft_peer_id column from local table (1 table) +-- ============================================ + +ALTER TABLE libp2p_rpc_meta_control_prune_local ON CLUSTER '{cluster}' DROP COLUMN IF EXISTS graft_peer_id; + +-- ============================================ +-- SECTION 6: Remove graft_peer_id column from distributed table (1 table) +-- ============================================ + +ALTER TABLE libp2p_rpc_meta_control_prune ON CLUSTER '{cluster}' DROP COLUMN IF EXISTS graft_peer_id; diff --git a/deploy/migrations/clickhouse/087_libp2p_add_peer_id.up.sql b/deploy/migrations/clickhouse/087_libp2p_add_peer_id.up.sql new file mode 100644 index 00000000..b387ddb5 --- /dev/null +++ b/deploy/migrations/clickhouse/087_libp2p_add_peer_id.up.sql @@ -0,0 +1,213 @@ +-- Migration 087: Add peer_id columns to all libp2p tables +-- Adds raw peer_id string alongside existing peer_id_unique_key (seahash) +-- Historical rows will have NULL; only new data will be populated + +-- ============================================ +-- SECTION 1: Add peer_id columns to local tables (27 tables) +-- ============================================ + +ALTER TABLE libp2p_add_peer_local ON CLUSTER '{cluster}' +ADD COLUMN IF NOT EXISTS peer_id Nullable(String) COMMENT 'The libp2p peer ID' CODEC(ZSTD(1)) AFTER peer_id_unique_key; + +ALTER TABLE libp2p_remove_peer_local ON CLUSTER '{cluster}' +ADD COLUMN IF NOT EXISTS peer_id Nullable(String) COMMENT 'The libp2p peer ID' CODEC(ZSTD(1)) AFTER peer_id_unique_key; + +ALTER TABLE libp2p_recv_rpc_local ON CLUSTER '{cluster}' +ADD COLUMN IF NOT EXISTS peer_id Nullable(String) COMMENT 'The libp2p peer ID of the sender' CODEC(ZSTD(1)) AFTER peer_id_unique_key; + +ALTER TABLE libp2p_send_rpc_local ON CLUSTER '{cluster}' +ADD COLUMN IF NOT EXISTS peer_id Nullable(String) COMMENT 'The libp2p peer ID of the receiver' CODEC(ZSTD(1)) AFTER peer_id_unique_key; + +ALTER TABLE libp2p_drop_rpc_local ON CLUSTER '{cluster}' +ADD COLUMN IF NOT EXISTS peer_id Nullable(String) COMMENT 'The libp2p peer ID' CODEC(ZSTD(1)) AFTER peer_id_unique_key; + +ALTER TABLE libp2p_join_local ON CLUSTER '{cluster}' +ADD COLUMN IF NOT EXISTS peer_id Nullable(String) COMMENT 'The libp2p peer ID that joined the topic' CODEC(ZSTD(1)) AFTER peer_id_unique_key; + +ALTER TABLE libp2p_leave_local ON CLUSTER '{cluster}' +ADD COLUMN IF NOT EXISTS peer_id Nullable(String) COMMENT 'The libp2p peer ID that left the topic' CODEC(ZSTD(1)) AFTER peer_id_unique_key; + +ALTER TABLE libp2p_graft_local ON CLUSTER '{cluster}' +ADD COLUMN IF NOT EXISTS peer_id Nullable(String) COMMENT 'The libp2p peer ID' CODEC(ZSTD(1)) AFTER peer_id_unique_key; + +ALTER TABLE libp2p_prune_local ON CLUSTER '{cluster}' +ADD COLUMN IF NOT EXISTS peer_id Nullable(String) COMMENT 'The libp2p peer ID' CODEC(ZSTD(1)) AFTER peer_id_unique_key; + +ALTER TABLE libp2p_deliver_message_local ON CLUSTER '{cluster}' +ADD COLUMN IF NOT EXISTS peer_id Nullable(String) COMMENT 'The libp2p peer ID that delivered the message' CODEC(ZSTD(1)) AFTER peer_id_unique_key; + +ALTER TABLE libp2p_reject_message_local ON CLUSTER '{cluster}' +ADD COLUMN IF NOT EXISTS peer_id Nullable(String) COMMENT 'The libp2p peer ID' CODEC(ZSTD(1)) AFTER peer_id_unique_key; + +ALTER TABLE libp2p_duplicate_message_local ON CLUSTER '{cluster}' +ADD COLUMN IF NOT EXISTS peer_id Nullable(String) COMMENT 'The libp2p peer ID' CODEC(ZSTD(1)) AFTER peer_id_unique_key; + +ALTER TABLE libp2p_handle_status_local ON CLUSTER '{cluster}' +ADD COLUMN IF NOT EXISTS peer_id Nullable(String) COMMENT 'The libp2p peer ID' CODEC(ZSTD(1)) AFTER peer_id_unique_key; + +ALTER TABLE libp2p_handle_metadata_local ON CLUSTER '{cluster}' +ADD COLUMN IF NOT EXISTS peer_id Nullable(String) COMMENT 'The libp2p peer ID' CODEC(ZSTD(1)) AFTER peer_id_unique_key; + +ALTER TABLE libp2p_gossipsub_beacon_block_local ON CLUSTER '{cluster}' +ADD COLUMN IF NOT EXISTS peer_id Nullable(String) COMMENT 'The libp2p peer ID' CODEC(ZSTD(1)) AFTER peer_id_unique_key; + +ALTER TABLE libp2p_gossipsub_beacon_attestation_local ON CLUSTER '{cluster}' +ADD COLUMN IF NOT EXISTS peer_id Nullable(String) COMMENT 'The libp2p peer ID' CODEC(ZSTD(1)) AFTER peer_id_unique_key; + +ALTER TABLE libp2p_gossipsub_blob_sidecar_local ON CLUSTER '{cluster}' +ADD COLUMN IF NOT EXISTS peer_id Nullable(String) COMMENT 'The libp2p peer ID' CODEC(ZSTD(1)) AFTER peer_id_unique_key; + +ALTER TABLE libp2p_gossipsub_aggregate_and_proof_local ON CLUSTER '{cluster}' +ADD COLUMN IF NOT EXISTS peer_id Nullable(String) COMMENT 'The libp2p peer ID' CODEC(ZSTD(1)) AFTER peer_id_unique_key; + +ALTER TABLE libp2p_gossipsub_data_column_sidecar_local ON CLUSTER '{cluster}' +ADD COLUMN IF NOT EXISTS peer_id Nullable(String) COMMENT 'The libp2p peer ID' CODEC(ZSTD(1)) AFTER peer_id_unique_key; + +ALTER TABLE libp2p_rpc_meta_message_local ON CLUSTER '{cluster}' +ADD COLUMN IF NOT EXISTS peer_id Nullable(String) COMMENT 'The libp2p peer ID' CODEC(ZSTD(1)) AFTER peer_id_unique_key; + +ALTER TABLE libp2p_rpc_meta_subscription_local ON CLUSTER '{cluster}' +ADD COLUMN IF NOT EXISTS peer_id Nullable(String) COMMENT 'The libp2p peer ID' CODEC(ZSTD(1)) AFTER peer_id_unique_key; + +ALTER TABLE libp2p_rpc_meta_control_ihave_local ON CLUSTER '{cluster}' +ADD COLUMN IF NOT EXISTS peer_id Nullable(String) COMMENT 'The libp2p peer ID' CODEC(ZSTD(1)) AFTER peer_id_unique_key; + +ALTER TABLE libp2p_rpc_meta_control_iwant_local ON CLUSTER '{cluster}' +ADD COLUMN IF NOT EXISTS peer_id Nullable(String) COMMENT 'The libp2p peer ID' CODEC(ZSTD(1)) AFTER peer_id_unique_key; + +ALTER TABLE libp2p_rpc_meta_control_graft_local ON CLUSTER '{cluster}' +ADD COLUMN IF NOT EXISTS peer_id Nullable(String) COMMENT 'The libp2p peer ID' CODEC(ZSTD(1)) AFTER peer_id_unique_key; + +ALTER TABLE libp2p_rpc_meta_control_prune_local ON CLUSTER '{cluster}' +ADD COLUMN IF NOT EXISTS peer_id Nullable(String) COMMENT 'The libp2p peer ID' CODEC(ZSTD(1)) AFTER peer_id_unique_key; + +ALTER TABLE libp2p_rpc_meta_control_idontwant_local ON CLUSTER '{cluster}' +ADD COLUMN IF NOT EXISTS peer_id Nullable(String) COMMENT 'The libp2p peer ID' CODEC(ZSTD(1)) AFTER peer_id_unique_key; + +ALTER TABLE libp2p_rpc_data_column_custody_probe_local ON CLUSTER '{cluster}' +ADD COLUMN IF NOT EXISTS peer_id Nullable(String) COMMENT 'The libp2p peer ID' CODEC(ZSTD(1)) AFTER peer_id_unique_key; + +-- ============================================ +-- SECTION 2: Add peer_id columns to distributed tables (27 tables) +-- ============================================ + +ALTER TABLE libp2p_add_peer ON CLUSTER '{cluster}' +ADD COLUMN IF NOT EXISTS peer_id Nullable(String) COMMENT 'The libp2p peer ID' CODEC(ZSTD(1)) AFTER peer_id_unique_key; + +ALTER TABLE libp2p_remove_peer ON CLUSTER '{cluster}' +ADD COLUMN IF NOT EXISTS peer_id Nullable(String) COMMENT 'The libp2p peer ID' CODEC(ZSTD(1)) AFTER peer_id_unique_key; + +ALTER TABLE libp2p_recv_rpc ON CLUSTER '{cluster}' +ADD COLUMN IF NOT EXISTS peer_id Nullable(String) COMMENT 'The libp2p peer ID of the sender' CODEC(ZSTD(1)) AFTER peer_id_unique_key; + +ALTER TABLE libp2p_send_rpc ON CLUSTER '{cluster}' +ADD COLUMN IF NOT EXISTS peer_id Nullable(String) COMMENT 'The libp2p peer ID of the receiver' CODEC(ZSTD(1)) AFTER peer_id_unique_key; + +ALTER TABLE libp2p_drop_rpc ON CLUSTER '{cluster}' +ADD COLUMN IF NOT EXISTS peer_id Nullable(String) COMMENT 'The libp2p peer ID' CODEC(ZSTD(1)) AFTER peer_id_unique_key; + +ALTER TABLE libp2p_join ON CLUSTER '{cluster}' +ADD COLUMN IF NOT EXISTS peer_id Nullable(String) COMMENT 'The libp2p peer ID that joined the topic' CODEC(ZSTD(1)) AFTER peer_id_unique_key; + +ALTER TABLE libp2p_leave ON CLUSTER '{cluster}' +ADD COLUMN IF NOT EXISTS peer_id Nullable(String) COMMENT 'The libp2p peer ID that left the topic' CODEC(ZSTD(1)) AFTER peer_id_unique_key; + +ALTER TABLE libp2p_graft ON CLUSTER '{cluster}' +ADD COLUMN IF NOT EXISTS peer_id Nullable(String) COMMENT 'The libp2p peer ID' CODEC(ZSTD(1)) AFTER peer_id_unique_key; + +ALTER TABLE libp2p_prune ON CLUSTER '{cluster}' +ADD COLUMN IF NOT EXISTS peer_id Nullable(String) COMMENT 'The libp2p peer ID' CODEC(ZSTD(1)) AFTER peer_id_unique_key; + +ALTER TABLE libp2p_deliver_message ON CLUSTER '{cluster}' +ADD COLUMN IF NOT EXISTS peer_id Nullable(String) COMMENT 'The libp2p peer ID that delivered the message' CODEC(ZSTD(1)) AFTER peer_id_unique_key; + +ALTER TABLE libp2p_reject_message ON CLUSTER '{cluster}' +ADD COLUMN IF NOT EXISTS peer_id Nullable(String) COMMENT 'The libp2p peer ID' CODEC(ZSTD(1)) AFTER peer_id_unique_key; + +ALTER TABLE libp2p_duplicate_message ON CLUSTER '{cluster}' +ADD COLUMN IF NOT EXISTS peer_id Nullable(String) COMMENT 'The libp2p peer ID' CODEC(ZSTD(1)) AFTER peer_id_unique_key; + +ALTER TABLE libp2p_handle_status ON CLUSTER '{cluster}' +ADD COLUMN IF NOT EXISTS peer_id Nullable(String) COMMENT 'The libp2p peer ID' CODEC(ZSTD(1)) AFTER peer_id_unique_key; + +ALTER TABLE libp2p_handle_metadata ON CLUSTER '{cluster}' +ADD COLUMN IF NOT EXISTS peer_id Nullable(String) COMMENT 'The libp2p peer ID' CODEC(ZSTD(1)) AFTER peer_id_unique_key; + +ALTER TABLE libp2p_gossipsub_beacon_block ON CLUSTER '{cluster}' +ADD COLUMN IF NOT EXISTS peer_id Nullable(String) COMMENT 'The libp2p peer ID' CODEC(ZSTD(1)) AFTER peer_id_unique_key; + +ALTER TABLE libp2p_gossipsub_beacon_attestation ON CLUSTER '{cluster}' +ADD COLUMN IF NOT EXISTS peer_id Nullable(String) COMMENT 'The libp2p peer ID' CODEC(ZSTD(1)) AFTER peer_id_unique_key; + +ALTER TABLE libp2p_gossipsub_blob_sidecar ON CLUSTER '{cluster}' +ADD COLUMN IF NOT EXISTS peer_id Nullable(String) COMMENT 'The libp2p peer ID' CODEC(ZSTD(1)) AFTER peer_id_unique_key; + +ALTER TABLE libp2p_gossipsub_aggregate_and_proof ON CLUSTER '{cluster}' +ADD COLUMN IF NOT EXISTS peer_id Nullable(String) COMMENT 'The libp2p peer ID' CODEC(ZSTD(1)) AFTER peer_id_unique_key; + +ALTER TABLE libp2p_gossipsub_data_column_sidecar ON CLUSTER '{cluster}' +ADD COLUMN IF NOT EXISTS peer_id Nullable(String) COMMENT 'The libp2p peer ID' CODEC(ZSTD(1)) AFTER peer_id_unique_key; + +ALTER TABLE libp2p_rpc_meta_message ON CLUSTER '{cluster}' +ADD COLUMN IF NOT EXISTS peer_id Nullable(String) COMMENT 'The libp2p peer ID' CODEC(ZSTD(1)) AFTER peer_id_unique_key; + +ALTER TABLE libp2p_rpc_meta_subscription ON CLUSTER '{cluster}' +ADD COLUMN IF NOT EXISTS peer_id Nullable(String) COMMENT 'The libp2p peer ID' CODEC(ZSTD(1)) AFTER peer_id_unique_key; + +ALTER TABLE libp2p_rpc_meta_control_ihave ON CLUSTER '{cluster}' +ADD COLUMN IF NOT EXISTS peer_id Nullable(String) COMMENT 'The libp2p peer ID' CODEC(ZSTD(1)) AFTER peer_id_unique_key; + +ALTER TABLE libp2p_rpc_meta_control_iwant ON CLUSTER '{cluster}' +ADD COLUMN IF NOT EXISTS peer_id Nullable(String) COMMENT 'The libp2p peer ID' CODEC(ZSTD(1)) AFTER peer_id_unique_key; + +ALTER TABLE libp2p_rpc_meta_control_graft ON CLUSTER '{cluster}' +ADD COLUMN IF NOT EXISTS peer_id Nullable(String) COMMENT 'The libp2p peer ID' CODEC(ZSTD(1)) AFTER peer_id_unique_key; + +ALTER TABLE libp2p_rpc_meta_control_prune ON CLUSTER '{cluster}' +ADD COLUMN IF NOT EXISTS peer_id Nullable(String) COMMENT 'The libp2p peer ID' CODEC(ZSTD(1)) AFTER peer_id_unique_key; + +ALTER TABLE libp2p_rpc_meta_control_idontwant ON CLUSTER '{cluster}' +ADD COLUMN IF NOT EXISTS peer_id Nullable(String) COMMENT 'The libp2p peer ID' CODEC(ZSTD(1)) AFTER peer_id_unique_key; + +ALTER TABLE libp2p_rpc_data_column_custody_probe ON CLUSTER '{cluster}' +ADD COLUMN IF NOT EXISTS peer_id Nullable(String) COMMENT 'The libp2p peer ID' CODEC(ZSTD(1)) AFTER peer_id_unique_key; + +-- ============================================ +-- SECTION 3: Add remote_peer_id columns to local tables (3 tables) +-- ============================================ + +ALTER TABLE libp2p_connected_local ON CLUSTER '{cluster}' +ADD COLUMN IF NOT EXISTS remote_peer_id Nullable(String) COMMENT 'The remote libp2p peer ID' CODEC(ZSTD(1)) AFTER remote_peer_id_unique_key; + +ALTER TABLE libp2p_disconnected_local ON CLUSTER '{cluster}' +ADD COLUMN IF NOT EXISTS remote_peer_id Nullable(String) COMMENT 'The remote libp2p peer ID' CODEC(ZSTD(1)) AFTER remote_peer_id_unique_key; + +ALTER TABLE libp2p_synthetic_heartbeat_local ON CLUSTER '{cluster}' +ADD COLUMN IF NOT EXISTS remote_peer_id Nullable(String) COMMENT 'The remote libp2p peer ID' CODEC(ZSTD(1)) AFTER remote_peer_id_unique_key; + +-- ============================================ +-- SECTION 4: Add remote_peer_id columns to distributed tables (3 tables) +-- ============================================ + +ALTER TABLE libp2p_connected ON CLUSTER '{cluster}' +ADD COLUMN IF NOT EXISTS remote_peer_id Nullable(String) COMMENT 'The remote libp2p peer ID' CODEC(ZSTD(1)) AFTER remote_peer_id_unique_key; + +ALTER TABLE libp2p_disconnected ON CLUSTER '{cluster}' +ADD COLUMN IF NOT EXISTS remote_peer_id Nullable(String) COMMENT 'The remote libp2p peer ID' CODEC(ZSTD(1)) AFTER remote_peer_id_unique_key; + +ALTER TABLE libp2p_synthetic_heartbeat ON CLUSTER '{cluster}' +ADD COLUMN IF NOT EXISTS remote_peer_id Nullable(String) COMMENT 'The remote libp2p peer ID' CODEC(ZSTD(1)) AFTER remote_peer_id_unique_key; + +-- ============================================ +-- SECTION 5: Add graft_peer_id column to local table (1 table) +-- ============================================ + +ALTER TABLE libp2p_rpc_meta_control_prune_local ON CLUSTER '{cluster}' +ADD COLUMN IF NOT EXISTS graft_peer_id Nullable(String) COMMENT 'The graft libp2p peer ID' CODEC(ZSTD(1)) AFTER graft_peer_id_unique_key; + +-- ============================================ +-- SECTION 6: Add graft_peer_id column to distributed table (1 table) +-- ============================================ + +ALTER TABLE libp2p_rpc_meta_control_prune ON CLUSTER '{cluster}' +ADD COLUMN IF NOT EXISTS graft_peer_id Nullable(String) COMMENT 'The graft libp2p peer ID' CODEC(ZSTD(1)) AFTER graft_peer_id_unique_key;