Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions deploy/local/docker-compose/vector-kafka-clickhouse-libp2p.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -420,6 +420,7 @@ transforms:
log(., level: "error", rate_limit_secs: 60)
}
.transient = .data.transient
.remote_peer_id = .data.remote_peer
.updated_date_time = to_unix_timestamp(now())
del(.event)
del(.meta)
Expand Down Expand Up @@ -520,6 +521,7 @@ transforms:
log(., level: "error", rate_limit_secs: 60)
}
.transient = .data.transient
.remote_peer_id = .data.remote_peer
.updated_date_time = to_unix_timestamp(now())
del(.event)
del(.meta)
Expand All @@ -544,6 +546,7 @@ transforms:
log(., level: "error", rate_limit_secs: 60)
}
.peer_id_unique_key = seahash(peer_id_key)
.peer_id = .data.peer_id
.protocol = .data.protocol
.updated_date_time = to_unix_timestamp(now())
del(.event)
Expand All @@ -570,6 +573,7 @@ transforms:
log(., level: "error", rate_limit_secs: 60)
}
.peer_id_unique_key = seahash(peer_id_key)
.peer_id = .data.peer_id
.updated_date_time = to_unix_timestamp(now())
del(.event)
del(.meta)
Expand Down Expand Up @@ -611,6 +615,7 @@ transforms:
log(., level: "error", rate_limit_secs: 60)
}
.peer_id_unique_key = seahash(peer_id_key)
.peer_id = .meta.client.additional_data.metadata.peer_id
.updated_date_time = to_unix_timestamp(now())
del(.event)
del(.meta)
Expand Down Expand Up @@ -653,6 +658,7 @@ transforms:
log(., level: "error", rate_limit_secs: 60)
}
.peer_id_unique_key = seahash(peer_id_key)
.peer_id = .meta.client.additional_data.metadata.peer_id
.updated_date_time = to_unix_timestamp(now())
del(.event)
del(.meta)
Expand Down Expand Up @@ -695,6 +701,7 @@ transforms:
log(., level: "error", rate_limit_secs: 60)
}
.peer_id_unique_key = seahash(peer_id_key)
.peer_id = .data.peer_id
.updated_date_time = to_unix_timestamp(now())
del(.event)
del(.meta)
Expand Down Expand Up @@ -737,6 +744,7 @@ transforms:
log(., level: "error", rate_limit_secs: 60)
}
.peer_id_unique_key = seahash(peer_id_key)
.peer_id = .data.peer_id
.updated_date_time = to_unix_timestamp(now())
del(.event)
del(.meta)
Expand Down Expand Up @@ -817,6 +825,7 @@ transforms:
log(., level: "error", rate_limit_secs: 60)
}
.peer_id_unique_key = seahash(peer_id_key)
.peer_id = .data.peer_id
.message_id = .data.msg_id
.message_size = .data.msg_size
.reason = .data.reason
Expand Down Expand Up @@ -864,6 +873,7 @@ transforms:
log(., level: "error", rate_limit_secs: 60)
}
.peer_id_unique_key = seahash(peer_id_key)
.peer_id = .data.peer_id
.message_id = .data.msg_id
.message_size = .data.msg_size
.updated_date_time = to_unix_timestamp(now())
Expand Down Expand Up @@ -910,6 +920,7 @@ transforms:
log(., level: "error", rate_limit_secs: 60)
}
.peer_id_unique_key = seahash(peer_id_key)
.peer_id = .data.peer_id
.message_id = .data.msg_id
.message_size = .data.msg_size
.updated_date_time = to_unix_timestamp(now())
Expand Down Expand Up @@ -939,6 +950,7 @@ transforms:
log(., level: "error", rate_limit_secs: 60)
}
.peer_id_unique_key = seahash(peer_id_key)
.peer_id = .meta.client.additional_data.metadata.peer_id

.proposer_index = .data.proposer_index

Expand Down Expand Up @@ -1031,6 +1043,7 @@ transforms:
log(., level: "error", rate_limit_secs: 60)
}
.peer_id_unique_key = seahash(peer_id_key)
.peer_id = .meta.client.additional_data.metadata.peer_id

.attesting_validator_index = .meta.client.additional_data.attesting_validator.index
.attesting_validator_committee_index = .meta.client.additional_data.attesting_validator.committee_index
Expand Down Expand Up @@ -1146,6 +1159,7 @@ transforms:
log(., level: "error", rate_limit_secs: 60)
}
.peer_id_unique_key = seahash(peer_id_key)
.peer_id = .meta.client.additional_data.metadata.peer_id

# Aggregator specific fields
.aggregator_index = .meta.client.additional_data.aggregator_index
Expand Down Expand Up @@ -1296,6 +1310,7 @@ transforms:
log(., level: "error", rate_limit_secs: 60)
}
.peer_id_unique_key = seahash(peer_id_key)
.peer_id = .meta.client.additional_data.metadata.peer_id
.message_id = .meta.client.additional_data.message_id
.message_size = .meta.client.additional_data.message_size

Expand Down Expand Up @@ -1392,6 +1407,7 @@ transforms:
log(., level: "error", rate_limit_secs: 60)
}
.peer_id_unique_key = seahash(peer_id_key)
.peer_id = .meta.client.additional_data.metadata.peer_id
.message_id = .meta.client.additional_data.message_id
.message_size = .meta.client.additional_data.message_size

Expand Down Expand Up @@ -1525,6 +1541,7 @@ transforms:
log(., level: "error", rate_limit_secs: 60)
}
.peer_id_unique_key = seahash(peer_id_key)
.peer_id = .meta.client.additional_data.metadata.peer_id
}

del(.event)
Expand Down Expand Up @@ -1553,6 +1570,7 @@ transforms:
log(., level: "error", rate_limit_secs: 60)
}
.peer_id_unique_key = seahash(peer_id_key)
.peer_id = .data.peer_id

if .data.error != null {
.error = .data.error
Expand Down Expand Up @@ -1612,6 +1630,7 @@ transforms:
log(., level: "error", rate_limit_secs: 60)
}
.peer_id_unique_key = seahash(peer_id_key)
.peer_id = .data.peer_id
.unique_key = seahash(.event.id)

if .data.error != null {
Expand Down Expand Up @@ -1674,6 +1693,7 @@ transforms:
log(., level: "error", rate_limit_secs: 60)
}
.peer_id_unique_key = seahash(peer_id_key)
.peer_id = .data.peer_id

topicParts, err = split(.data.topic, "/")
if err != null {
Expand Down Expand Up @@ -1749,6 +1769,7 @@ transforms:
log(., level: "error", rate_limit_secs: 60)
}
.peer_id_unique_key = seahash(peer_id_key)
.peer_id = .data.peer_id

key, err = to_string(rootEventKey) + "_rpc_meta_control_iwant_" + to_string(.data.control_index) + "_" + to_string(.data.message_index)
if err != null {
Expand Down Expand Up @@ -1807,6 +1828,7 @@ transforms:
log(., level: "error", rate_limit_secs: 60)
}
.peer_id_unique_key = seahash(peer_id_key)
.peer_id = .data.peer_id

key, err = to_string(rootEventKey) + "_rpc_meta_control_idontwant_" + to_string(.data.control_index) + "_" + to_string(.data.message_index)
if err != null {
Expand Down Expand Up @@ -1865,6 +1887,7 @@ transforms:
log(., level: "error", rate_limit_secs: 60)
}
.peer_id_unique_key = seahash(peer_id_key)
.peer_id = .data.peer_id

topicParts, err = split(.data.topic, "/")
if err != null {
Expand Down Expand Up @@ -1938,6 +1961,7 @@ transforms:
log(., level: "error", rate_limit_secs: 60)
}
.peer_id_unique_key = seahash(peer_id_key)
.peer_id = .data.peer_id

topicParts, err = split(.data.topic, "/")
if err != null {
Expand All @@ -1963,6 +1987,7 @@ transforms:
log(., level: "error", rate_limit_secs: 60)
}
.graft_peer_id_unique_key = seahash(graft_peer_id_key)
.graft_peer_id = .data.graft_peer_id

key, err = to_string(rootEventKey) + "_rpc_meta_control_prune_" + to_string(.data.control_index) + "_" + to_string(.data.peer_index)
if err != null {
Expand Down Expand Up @@ -2020,6 +2045,7 @@ transforms:
log(., level: "error", rate_limit_secs: 60)
}
.peer_id_unique_key = seahash(peer_id_key)
.peer_id = .data.peer_id

topicParts, err = split(.data.topic_id, "/")
if err != null {
Expand Down Expand Up @@ -2094,6 +2120,7 @@ transforms:
log(., level: "error", rate_limit_secs: 60)
}
.peer_id_unique_key = seahash(peer_id_key)
.peer_id = .data.peer_id

topicParts, err = split(.data.topic_id, "/")
if err != null {
Expand Down Expand Up @@ -2153,6 +2180,7 @@ transforms:
log(., level: "error", rate_limit_secs: 60)
}
.peer_id_unique_key = seahash(peer_id_key)
.peer_id = .data.meta.peer_id

.unique_key = seahash(.event.id)
.updated_date_time = to_unix_timestamp(now())
Expand Down Expand Up @@ -2182,6 +2210,7 @@ transforms:
log(., level: "error", rate_limit_secs: 60)
}
.peer_id_unique_key = seahash(peer_id_key)
.peer_id = .data.meta.peer_id

.unique_key = seahash(.event.id)
.updated_date_time = to_unix_timestamp(now())
Expand Down Expand Up @@ -2211,6 +2240,7 @@ transforms:
log(., level: "error", rate_limit_secs: 60)
}
.peer_id_unique_key = seahash(peer_id_key)
.peer_id = .data.meta.peer_id

.unique_key = seahash(.event.id)
.updated_date_time = to_unix_timestamp(now())
Expand All @@ -2236,6 +2266,7 @@ transforms:
log("failed to generate peer id unique key: " + string!(err), level: "error", rate_limit_secs: 60)
}
.remote_peer_id_unique_key = seahash(peer_id_key)
.remote_peer_id = .data.remote_peer
.remote_maddrs = .data.remote_maddrs
.latency_ms = to_int!(.data.latency_ms)
.direction = if .data.direction == 1 {
Expand Down
93 changes: 93 additions & 0 deletions deploy/migrations/clickhouse/087_libp2p_add_peer_id.down.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
-- Migration 087 down: Remove peer_id columns from all libp2p tables

-- ============================================
-- SECTION 1: Remove peer_id columns from local tables (27 tables)
-- ============================================

ALTER TABLE libp2p_add_peer_local ON CLUSTER '{cluster}' DROP COLUMN IF EXISTS peer_id;
ALTER TABLE libp2p_remove_peer_local ON CLUSTER '{cluster}' DROP COLUMN IF EXISTS peer_id;
ALTER TABLE libp2p_recv_rpc_local ON CLUSTER '{cluster}' DROP COLUMN IF EXISTS peer_id;
ALTER TABLE libp2p_send_rpc_local ON CLUSTER '{cluster}' DROP COLUMN IF EXISTS peer_id;
ALTER TABLE libp2p_drop_rpc_local ON CLUSTER '{cluster}' DROP COLUMN IF EXISTS peer_id;
ALTER TABLE libp2p_join_local ON CLUSTER '{cluster}' DROP COLUMN IF EXISTS peer_id;
ALTER TABLE libp2p_leave_local ON CLUSTER '{cluster}' DROP COLUMN IF EXISTS peer_id;
ALTER TABLE libp2p_graft_local ON CLUSTER '{cluster}' DROP COLUMN IF EXISTS peer_id;
ALTER TABLE libp2p_prune_local ON CLUSTER '{cluster}' DROP COLUMN IF EXISTS peer_id;
ALTER TABLE libp2p_deliver_message_local ON CLUSTER '{cluster}' DROP COLUMN IF EXISTS peer_id;
ALTER TABLE libp2p_reject_message_local ON CLUSTER '{cluster}' DROP COLUMN IF EXISTS peer_id;
ALTER TABLE libp2p_duplicate_message_local ON CLUSTER '{cluster}' DROP COLUMN IF EXISTS peer_id;
ALTER TABLE libp2p_handle_status_local ON CLUSTER '{cluster}' DROP COLUMN IF EXISTS peer_id;
ALTER TABLE libp2p_handle_metadata_local ON CLUSTER '{cluster}' DROP COLUMN IF EXISTS peer_id;
ALTER TABLE libp2p_gossipsub_beacon_block_local ON CLUSTER '{cluster}' DROP COLUMN IF EXISTS peer_id;
ALTER TABLE libp2p_gossipsub_beacon_attestation_local ON CLUSTER '{cluster}' DROP COLUMN IF EXISTS peer_id;
ALTER TABLE libp2p_gossipsub_blob_sidecar_local ON CLUSTER '{cluster}' DROP COLUMN IF EXISTS peer_id;
ALTER TABLE libp2p_gossipsub_aggregate_and_proof_local ON CLUSTER '{cluster}' DROP COLUMN IF EXISTS peer_id;
ALTER TABLE libp2p_gossipsub_data_column_sidecar_local ON CLUSTER '{cluster}' DROP COLUMN IF EXISTS peer_id;
ALTER TABLE libp2p_rpc_meta_message_local ON CLUSTER '{cluster}' DROP COLUMN IF EXISTS peer_id;
ALTER TABLE libp2p_rpc_meta_subscription_local ON CLUSTER '{cluster}' DROP COLUMN IF EXISTS peer_id;
ALTER TABLE libp2p_rpc_meta_control_ihave_local ON CLUSTER '{cluster}' DROP COLUMN IF EXISTS peer_id;
ALTER TABLE libp2p_rpc_meta_control_iwant_local ON CLUSTER '{cluster}' DROP COLUMN IF EXISTS peer_id;
ALTER TABLE libp2p_rpc_meta_control_graft_local ON CLUSTER '{cluster}' DROP COLUMN IF EXISTS peer_id;
ALTER TABLE libp2p_rpc_meta_control_prune_local ON CLUSTER '{cluster}' DROP COLUMN IF EXISTS peer_id;
ALTER TABLE libp2p_rpc_meta_control_idontwant_local ON CLUSTER '{cluster}' DROP COLUMN IF EXISTS peer_id;
ALTER TABLE libp2p_rpc_data_column_custody_probe_local ON CLUSTER '{cluster}' DROP COLUMN IF EXISTS peer_id;

-- ============================================
-- SECTION 2: Remove peer_id columns from distributed tables (27 tables)
-- ============================================

ALTER TABLE libp2p_add_peer ON CLUSTER '{cluster}' DROP COLUMN IF EXISTS peer_id;
ALTER TABLE libp2p_remove_peer ON CLUSTER '{cluster}' DROP COLUMN IF EXISTS peer_id;
ALTER TABLE libp2p_recv_rpc ON CLUSTER '{cluster}' DROP COLUMN IF EXISTS peer_id;
ALTER TABLE libp2p_send_rpc ON CLUSTER '{cluster}' DROP COLUMN IF EXISTS peer_id;
ALTER TABLE libp2p_drop_rpc ON CLUSTER '{cluster}' DROP COLUMN IF EXISTS peer_id;
ALTER TABLE libp2p_join ON CLUSTER '{cluster}' DROP COLUMN IF EXISTS peer_id;
ALTER TABLE libp2p_leave ON CLUSTER '{cluster}' DROP COLUMN IF EXISTS peer_id;
ALTER TABLE libp2p_graft ON CLUSTER '{cluster}' DROP COLUMN IF EXISTS peer_id;
ALTER TABLE libp2p_prune ON CLUSTER '{cluster}' DROP COLUMN IF EXISTS peer_id;
ALTER TABLE libp2p_deliver_message ON CLUSTER '{cluster}' DROP COLUMN IF EXISTS peer_id;
ALTER TABLE libp2p_reject_message ON CLUSTER '{cluster}' DROP COLUMN IF EXISTS peer_id;
ALTER TABLE libp2p_duplicate_message ON CLUSTER '{cluster}' DROP COLUMN IF EXISTS peer_id;
ALTER TABLE libp2p_handle_status ON CLUSTER '{cluster}' DROP COLUMN IF EXISTS peer_id;
ALTER TABLE libp2p_handle_metadata ON CLUSTER '{cluster}' DROP COLUMN IF EXISTS peer_id;
ALTER TABLE libp2p_gossipsub_beacon_block ON CLUSTER '{cluster}' DROP COLUMN IF EXISTS peer_id;
ALTER TABLE libp2p_gossipsub_beacon_attestation ON CLUSTER '{cluster}' DROP COLUMN IF EXISTS peer_id;
ALTER TABLE libp2p_gossipsub_blob_sidecar ON CLUSTER '{cluster}' DROP COLUMN IF EXISTS peer_id;
ALTER TABLE libp2p_gossipsub_aggregate_and_proof ON CLUSTER '{cluster}' DROP COLUMN IF EXISTS peer_id;
ALTER TABLE libp2p_gossipsub_data_column_sidecar ON CLUSTER '{cluster}' DROP COLUMN IF EXISTS peer_id;
ALTER TABLE libp2p_rpc_meta_message ON CLUSTER '{cluster}' DROP COLUMN IF EXISTS peer_id;
ALTER TABLE libp2p_rpc_meta_subscription ON CLUSTER '{cluster}' DROP COLUMN IF EXISTS peer_id;
ALTER TABLE libp2p_rpc_meta_control_ihave ON CLUSTER '{cluster}' DROP COLUMN IF EXISTS peer_id;
ALTER TABLE libp2p_rpc_meta_control_iwant ON CLUSTER '{cluster}' DROP COLUMN IF EXISTS peer_id;
ALTER TABLE libp2p_rpc_meta_control_graft ON CLUSTER '{cluster}' DROP COLUMN IF EXISTS peer_id;
ALTER TABLE libp2p_rpc_meta_control_prune ON CLUSTER '{cluster}' DROP COLUMN IF EXISTS peer_id;
ALTER TABLE libp2p_rpc_meta_control_idontwant ON CLUSTER '{cluster}' DROP COLUMN IF EXISTS peer_id;
ALTER TABLE libp2p_rpc_data_column_custody_probe ON CLUSTER '{cluster}' DROP COLUMN IF EXISTS peer_id;

-- ============================================
-- SECTION 3: Remove remote_peer_id columns from local tables (3 tables)
-- ============================================

ALTER TABLE libp2p_connected_local ON CLUSTER '{cluster}' DROP COLUMN IF EXISTS remote_peer_id;
ALTER TABLE libp2p_disconnected_local ON CLUSTER '{cluster}' DROP COLUMN IF EXISTS remote_peer_id;
ALTER TABLE libp2p_synthetic_heartbeat_local ON CLUSTER '{cluster}' DROP COLUMN IF EXISTS remote_peer_id;

-- ============================================
-- SECTION 4: Remove remote_peer_id columns from distributed tables (3 tables)
-- ============================================

ALTER TABLE libp2p_connected ON CLUSTER '{cluster}' DROP COLUMN IF EXISTS remote_peer_id;
ALTER TABLE libp2p_disconnected ON CLUSTER '{cluster}' DROP COLUMN IF EXISTS remote_peer_id;
ALTER TABLE libp2p_synthetic_heartbeat ON CLUSTER '{cluster}' DROP COLUMN IF EXISTS remote_peer_id;

-- ============================================
-- SECTION 5: Remove graft_peer_id column from local table (1 table)
-- ============================================

ALTER TABLE libp2p_rpc_meta_control_prune_local ON CLUSTER '{cluster}' DROP COLUMN IF EXISTS graft_peer_id;

-- ============================================
-- SECTION 6: Remove graft_peer_id column from distributed table (1 table)
-- ============================================

ALTER TABLE libp2p_rpc_meta_control_prune ON CLUSTER '{cluster}' DROP COLUMN IF EXISTS graft_peer_id;
Loading
Loading