-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathexperiment.py
More file actions
115 lines (90 loc) · 3.83 KB
/
experiment.py
File metadata and controls
115 lines (90 loc) · 3.83 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
# experiment.py
import random
import time
from models import Transaction
from blockchain import Blockchain
from anomaly import AnomalyDetector, RuleBasedSecurityChecker
def generate_random_transaction(normal: bool = True) -> Transaction:
    """Create a transaction between randomly chosen parties.

    The sender and the receiver are each drawn from a fixed pool of four
    names. The amount is sampled uniformly from a range that depends on
    ``normal``.

    Args:
        normal: When true the amount is drawn from 1 to 100; otherwise it
            is drawn from 1000 to 5000 to simulate an anomalously large
            transfer.

    Returns:
        A ``Transaction`` built from the sampled sender, receiver and amount.
    """
    # Draw order (sender, receiver, amount) is kept stable so that seeded
    # runs consume the random stream in the same sequence.
    sender = random.choice(("Alice", "Bob", "Charlie", "Dave"))
    receiver = random.choice(("Eve", "Frank", "Grace", "Heidi"))

    # Normal traffic stays small; anomalous traffic is far larger.
    low, high = (1, 100) if normal else (1000, 5000)

    return Transaction(
        sender=sender,
        receiver=receiver,
        amount=random.uniform(low, high),
    )
def run_experiment(
    num_blocks: int = 50,
    anomaly_probability: float = 0.2,
    baseline_size: int = 10
):
    """Simulate blockchain activity and report detector results per block.

    Each iteration labels the upcoming block as normal or anomalous, builds
    a matching batch of random transactions, waits a label-dependent delay,
    appends the block to the chain and then runs two checks on the features
    extracted from it: a statistical (z-score based) anomaly detector and a
    rule-based security checker with fixed thresholds. The results are
    printed to stdout, followed by a final chain validity check.

    Args:
        num_blocks: Number of blocks to append to the chain.
        anomaly_probability: Chance, per block, that the block is
            synthetically labelled anomalous.
        baseline_size: Forwarded to ``AnomalyDetector`` as its
            ``baseline_size`` argument.
    """
    chain = Blockchain()

    # A lower z-threshold makes the statistical detector more sensitive.
    stat_detector = AnomalyDetector(baseline_size=baseline_size, z_threshold=1.5)

    # The rule-based checker works with fixed thresholds.
    rules = RuleBasedSecurityChecker(
        max_single_tx_amount=2000.0,
        max_block_total_amount=5000.0,
        min_time_delta=0.02,
    )

    separator = "=" * 60
    prior_block = chain.last_block

    for _ in range(num_blocks):
        # Synthetic ground-truth label for this block.
        inject_anomaly = random.random() < anomaly_probability

        # Anomalous blocks carry more transactions (one possible signal).
        tx_low, tx_high = (6, 10) if inject_anomaly else (1, 5)
        tx_count = random.randint(tx_low, tx_high)
        block_txs = [
            generate_random_transaction(normal=not inject_anomaly)
            for _ in range(tx_count)
        ]

        # Anomalous blocks follow their predecessor after a very short
        # interval; normal blocks after a slightly longer one.
        time.sleep(0.01 if inject_anomaly else 0.1)

        block = chain.add_block(block_txs)

        # The same feature set feeds both checks.
        feats = stat_detector.extract_features(block, prior_block)
        stat_result = stat_detector.process_block(block.index, feats)
        rule_result = rules.check_rules(feats)

        # Log output for later analysis in the report.
        print(separator)
        print(f"Block {block.index}")
        print(f" Synthetic label (is_anomalous_block)? {inject_anomaly}")
        print(
            f" num_txs={feats['num_txs']}, "
            f"total_amount={feats['total_amount']:.2f}, "
            f"max_amount={feats['max_amount']:.2f}, "
            f"time_delta={feats['time_delta']:.3f}"
        )

        # Statistical detector output.
        print(f" [Statistical] baseline_ready={stat_detector.baseline_ready}")
        print(f" [Statistical] is_anomaly={stat_result['is_anomaly']}")
        print(f" [Statistical] Reason: {stat_result['reason']}")
        z_scores = stat_result["feature_z_scores"]
        if z_scores:
            rendered = ", ".join(
                f"{name}={score:.2f}" for name, score in z_scores.items()
            )
            print(" [Statistical] z-scores: " + rendered)

        # Rule-based checker output.
        print(f" [Rules] rule_alert={rule_result['rule_alert']}")
        if rule_result["rule_alert"]:
            print(" [Rules] Violations:")
            for violation in rule_result["violations"]:
                print(f" - {violation}")

        prior_block = block

    # Final integrity check over the whole chain.
    print(separator)
    print("Final chain validity:", chain.is_chain_valid())