-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtrust_portability.py
More file actions
executable file
·2048 lines (1691 loc) · 73.4 KB
/
trust_portability.py
File metadata and controls
executable file
·2048 lines (1691 loc) · 73.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
#!/usr/bin/env python3
"""
Cross-Project Trust Portability Layer for Tabula Rasa SuperInstance Fleet
==========================================================================
Enables trust earned in one SuperInstance repository to carry weight in another.
This module bridges trust across the fleet, making reputation a portable asset.
Based on research from:
- Jøsang (2001): A Logic for Uncertain Probabilities — Subjective Logic
- Jøsang, Hayward & Pope (2006): Trust Network Analysis with Subjective Logic
- Ding et al. (2009): Computing Reputation in Online Social Networks
- Gartner TRiSM (2024): Trust, Risk, Security Management for Agentic AI
- RepuNet (2025): Dynamic Dual-Level Reputation for LLM-based MAS
Design principles:
1. Trust is portable — earned in one repo, recognized in others
2. Attestations are signed — tamper-proof via HMAC-SHA256
3. Foreign trust decays — older attestations count less
4. Inconsistency is detectable — conflicting repo reports are flagged
5. Trust propagates — transitivity via Subjective Logic discount operator
6. Replay attacks prevented — each attestation has a unique fingerprint
7. Anchors of trust — designated repos whose attestations are auto-accepted
Architecture:
Repo A (local) Repo B (foreign)
┌──────────────┐ ┌──────────────┐
│ TrustEngine │──export──>│ TrustAttestation │──import──>│ FleetTrustBridge │
│ │ │ (signed proof) │ │ (blended trust) │
└──────────────┘ └──────────────┘ └──────────────────┘
│
┌─────────────────────────┘
v
TrustPropagationGraph
(cross-fleet trust paths)
"""
import json
import time
import math
import hashlib
import hmac
from typing import (
Optional, Dict, List, Tuple, Set, Any,
Callable, NamedTuple
)
from dataclasses import dataclass, field
from collections import deque
# ═══════════════════════════════════════════════════════════════
# Constants
# ═══════════════════════════════════════════════════════════════
# Shared fleet key for HMAC-SHA256 attestation signing.
# In production, this would be injected from environment or KMS.
# NOTE(review): the key is symmetric and shared, and an attestation's
# issuer_repo is just a signed field — so any key holder can sign an
# attestation naming any issuer. Confirm this is acceptable for the fleet.
FLEET_TRUST_KEY: str = "superinstance-fleet-trust-v1"
# Default weight for foreign trust when blending with local trust.
# 0.0 = ignore foreign, 1.0 = only foreign. 0.3 means 30% foreign, 70% local.
DEFAULT_IMPORT_FACTOR: float = 0.3
# How many seconds before a foreign attestation is considered stale.
# NOTE(review): not referenced in the portion of the file reviewed here;
# confirm where (or whether) it is consumed.
ATTESTATION_STALENESS_SECONDS: float = 30 * 86400  # 30 days
# Exponential decay rate for foreign attestations (per day).
# After 7 days, weight is multiplied by FOREIGN_DECAY_RATE^7 ≈ 0.75;
# after 30 days by FOREIGN_DECAY_RATE^30 ≈ 0.29.
FOREIGN_DECAY_RATE: float = 0.96
# Maximum BFS depth for trust path finding.
MAX_PATH_DEPTH: int = 3
# Threshold for flagging trust inconsistency between repos.
# If two repos differ by more than this on composite trust, flag it.
INCONSISTENCY_THRESHOLD: float = 0.4
# Minimum number of sources required for consensus computation.
MIN_CONSENSUS_SOURCES: int = 2
# Maximum age in seconds for an attestation to be considered valid.
# Older attestations are rejected on import and removed when pruning.
ATTESTATION_MAX_AGE: float = 90 * 86400  # 90 days
# Echo chamber detection threshold: if all edges point inward, it's a chamber.
# NOTE(review): the value 0.8 suggests "at least 80% inward" rather than
# "all"; the consumer is not visible in this section — confirm the semantics.
ECHO_CHAMBER_INWARD_RATIO: float = 0.8
# ═══════════════════════════════════════════════════════════════
# Trust Dimensions — mirrors trust_engine.TRUST_DIMENSIONS
# ═══════════════════════════════════════════════════════════════
# Names of the per-dimension trust scores carried by every attestation.
# TrustAttestation fills in any dimension missing from its input with
# BASE_TRUST, so the order and spelling here define the attestation schema.
TRUST_DIMENSIONS: List[str] = [
    "code_quality",
    "task_completion",
    "collaboration",
    "reliability",
    "innovation",
]
# Neutral starting score used whenever no evidence is available (missing
# dimension, no foreign attestations, failing local trust getter).
BASE_TRUST: float = 0.3
# ═══════════════════════════════════════════════════════════════
# Cross-Repo Trust Event Presets
# ═══════════════════════════════════════════════════════════════
# Preset catalogue of cross-repo trust events, keyed by event name.
# Each preset carries:
#   "dimension"   — which entry of TRUST_DIMENSIONS the event applies to
#   "value"       — a score in [0, 1] associated with the event
#   "weight"      — a relative weight for the event
#   "description" — human-readable explanation
# NOTE(review): how "value" and "weight" are folded into an agent's trust
# is not visible in this section of the file — confirm against the consumer.
CROSS_REPO_TRUST_EVENTS: Dict[str, Dict[str, Any]] = {
    "cross_repo_code_review": {
        "dimension": "code_quality",
        "value": 0.85,
        "weight": 1.2,
        "description": "Agent submitted code reviewed and approved by another repo",
    },
    "foreign_task_completed": {
        "dimension": "task_completion",
        "value": 0.8,
        "weight": 1.0,
        "description": "Agent successfully completed a task assigned by a foreign repo",
    },
    "fleet_collaboration": {
        "dimension": "collaboration",
        "value": 0.9,
        "weight": 1.3,
        "description": "Agent demonstrated effective cross-repo collaboration",
    },
    "cross_repo_bug_found": {
        "dimension": "code_quality",
        "value": 0.8,
        "weight": 0.9,
        "description": "Agent found a bug in code from another repo",
    },
    "foreign_dependency_shipped": {
        "dimension": "reliability",
        "value": 0.85,
        "weight": 1.1,
        "description": "Agent delivered a dependency that was used by another repo",
    },
    "fleet_knowledge_shared": {
        "dimension": "collaboration",
        "value": 0.8,
        "weight": 0.8,
        "description": "Agent shared useful knowledge across the fleet",
    },
    "cross_repo_innovation": {
        "dimension": "innovation",
        "value": 0.9,
        "weight": 1.2,
        "description": "Agent introduced a novel approach adopted by another repo",
    },
    "fleet_conflict_resolved": {
        "dimension": "collaboration",
        "value": 0.85,
        "weight": 1.0,
        "description": "Agent helped resolve a cross-repo conflict",
    },
    "foreign_integration_success": {
        "dimension": "reliability",
        "value": 0.8,
        "weight": 1.0,
        "description": "Agent's work integrated successfully into another repo",
    },
    "cross_repo_mentorship": {
        "dimension": "collaboration",
        "value": 0.9,
        "weight": 1.1,
        "description": "Agent mentored someone from another repo effectively",
    },
}
# ═══════════════════════════════════════════════════════════════
# 1. TrustAttestation — Signed Trust Proof
# ═══════════════════════════════════════════════════════════════
@dataclass
class TrustAttestation:
    """
    A signed trust proof that can be exported from one SuperInstance repo
    and imported into another.

    The attestation contains:
    - Agent identity (who the trust is about)
    - Trust dimensions (per-dimension scores)
    - Composite score (weighted aggregate)
    - Issuer info (which repo issued this)
    - Timestamp (when it was created)
    - HMAC-SHA256 signature (tamper detection)
    - Content fingerprint (for replay detection)

    The attestation is the unit of cross-repo trust portability.
    Treat it as immutable once signed — any modification of a signed field
    invalidates the signature (the dataclass itself is not frozen).
    """

    # Identity
    agent_name: str = ""
    issuer_repo: str = ""
    issuer_id: str = ""
    # Trust data
    dimensions: Dict[str, float] = field(default_factory=dict)
    composite: float = BASE_TRUST
    # Evidence metadata
    event_count: int = 0
    is_meaningful: bool = False
    cross_repo_events: List[str] = field(default_factory=list)
    # Timing
    issued_at: float = field(default_factory=time.time)
    expires_at: float = 0.0  # 0 = never expires
    # Cryptographic
    signature: str = ""
    fingerprint: str = ""

    def __post_init__(self):
        """Ensure the dimensions dict has every trust dimension populated."""
        for dim in TRUST_DIMENSIONS:
            if dim not in self.dimensions:
                self.dimensions[dim] = BASE_TRUST

    def compute_fingerprint(self) -> str:
        """
        Return the SHA-256 hex digest of the attestation's canonical content.

        The signature and the stored fingerprint are not part of the hashed
        content. Importers use the fingerprint for replay detection: seeing
        the same fingerprint twice means the same attestation was re-sent.
        """
        content = self._content_hash_input()
        return hashlib.sha256(content.encode("utf-8")).hexdigest()

    def _content_hash_input(self) -> str:
        """
        Build the canonical string used for signing and fingerprinting.

        Dimension keys and event names are sorted so the result does not
        depend on construction order.

        NOTE(review): fields are joined with '|' without escaping, so a
        '|' inside agent_name / issuer_repo / issuer_id can make two
        different attestations share one canonical string. The format is
        left unchanged here because altering it would invalidate every
        existing signature.
        """
        sorted_dims = json.dumps(
            {k: self.dimensions[k] for k in sorted(self.dimensions)},
            separators=(",", ":")
        )
        sorted_events = json.dumps(sorted(self.cross_repo_events), separators=(",", ":"))
        return (
            f"{self.agent_name}|{self.issuer_repo}|{self.issuer_id}|"
            f"{sorted_dims}|{self.composite}|{self.event_count}|"
            f"{self.is_meaningful}|{sorted_events}|{self.issued_at}|{self.expires_at}"
        )

    def sign(self, key: str = FLEET_TRUST_KEY):
        """
        Sign this attestation using HMAC-SHA256 with the given key.

        Sets both ``fingerprint`` and ``signature``. The signature covers
        all trust data plus metadata, so any later change to those fields
        makes :meth:`verify` fail.
        """
        self.fingerprint = self.compute_fingerprint()
        message = self._content_hash_input()
        self.signature = hmac.new(
            key.encode("utf-8"),
            message.encode("utf-8"),
            hashlib.sha256
        ).hexdigest()

    def verify(self, key: str = FLEET_TRUST_KEY) -> bool:
        """
        Verify the attestation's HMAC-SHA256 signature.

        Returns True only if:
        1. The signature is a non-empty string
        2. The stored fingerprint (when present) matches the current content
        3. The HMAC over the current content equals the signature
        """
        # Attestations arrive from other repos; a missing or non-string
        # signature must yield False rather than an exception.
        if not isinstance(self.signature, str) or not self.signature:
            return False
        # Check fingerprint integrity
        if self.fingerprint and self.fingerprint != self.compute_fingerprint():
            return False
        # Verify HMAC
        message = self._content_hash_input()
        expected = hmac.new(
            key.encode("utf-8"),
            message.encode("utf-8"),
            hashlib.sha256
        ).hexdigest()
        # Compare as bytes: compare_digest raises TypeError for str
        # arguments containing non-ASCII characters.
        return hmac.compare_digest(
            self.signature.encode("utf-8"),
            expected.encode("utf-8"),
        )

    def is_expired(self, current_time: Optional[float] = None) -> bool:
        """
        Check whether this attestation has expired.

        Args:
            current_time: Reference time in epoch seconds; defaults to now.
                An explicit 0.0 is honoured (not replaced by the wall clock).
        """
        if self.expires_at <= 0:
            return False
        now = time.time() if current_time is None else current_time
        return now > self.expires_at

    def age_seconds(self, current_time: Optional[float] = None) -> float:
        """How old this attestation is in seconds (negative if future-dated)."""
        now = time.time() if current_time is None else current_time
        return now - self.issued_at

    def age_days(self, current_time: Optional[float] = None) -> float:
        """How old this attestation is in days."""
        return self.age_seconds(current_time) / 86400.0

    def decayed_weight(self, current_time: Optional[float] = None) -> float:
        """
        Compute a decay factor based on attestation age.

        Uses exponential decay: weight = FOREIGN_DECAY_RATE^days.
        A fresh attestation (0 days old) returns 1.0.
        After 30 days, returns ~0.29.
        Future-dated attestations are treated as fresh, so the weight
        never exceeds 1.0.
        """
        days = max(0.0, self.age_days(current_time))
        return FOREIGN_DECAY_RATE ** days

    def to_dict(self) -> dict:
        """
        Serialize this attestation to a dictionary.

        Signed fields are emitted exactly as stored. In particular
        ``composite`` is not rounded: the signature covers its full
        precision, so rounding would make the deserialized copy fail
        :meth:`verify`.
        """
        return {
            "agent_name": self.agent_name,
            "issuer_repo": self.issuer_repo,
            "issuer_id": self.issuer_id,
            "dimensions": dict(self.dimensions),
            "composite": self.composite,
            "event_count": self.event_count,
            "is_meaningful": self.is_meaningful,
            "cross_repo_events": list(self.cross_repo_events),
            "issued_at": self.issued_at,
            "expires_at": self.expires_at,
            "signature": self.signature,
            "fingerprint": self.fingerprint,
        }

    @classmethod
    def from_dict(cls, data: dict) -> 'TrustAttestation':
        """
        Deserialize an attestation from a dictionary.

        Missing keys fall back to the dataclass defaults. The dimensions
        and event containers are copied so that filling in default
        dimensions does not mutate the caller's data.
        """
        att = cls(
            agent_name=data.get("agent_name", ""),
            issuer_repo=data.get("issuer_repo", ""),
            issuer_id=data.get("issuer_id", ""),
            dimensions=dict(data.get("dimensions") or {}),
            composite=data.get("composite", BASE_TRUST),
            event_count=data.get("event_count", 0),
            is_meaningful=data.get("is_meaningful", False),
            cross_repo_events=list(data.get("cross_repo_events") or []),
            issued_at=data.get("issued_at", time.time()),
            expires_at=data.get("expires_at", 0.0),
            signature=data.get("signature", ""),
            fingerprint=data.get("fingerprint", ""),
        )
        return att

    def to_json(self, indent: int = 2) -> str:
        """Serialize to a JSON string."""
        return json.dumps(self.to_dict(), indent=indent)

    @classmethod
    def from_json(cls, json_str: str) -> 'TrustAttestation':
        """Deserialize from a JSON string."""
        return cls.from_dict(json.loads(json_str))
# ═══════════════════════════════════════════════════════════════
# 2. FleetTrustBridge — Main Bridge Class
# ═══════════════════════════════════════════════════════════════
@dataclass
class InconsistencyReport:
    """Summary of how far apart the reported trust scores for one agent are."""

    agent_name: str
    repo_scores: Dict[str, float]  # repo -> composite score
    max_difference: float
    flagged: bool
    description: str

    def to_dict(self) -> dict:
        """Return a plain-dict view with scores rounded to 4 decimals."""
        rounded_scores = {}
        for repo, score in self.repo_scores.items():
            rounded_scores[repo] = round(score, 4)

        payload = {"agent_name": self.agent_name}
        payload["repo_scores"] = rounded_scores
        payload["max_difference"] = round(self.max_difference, 4)
        payload["flagged"] = self.flagged
        payload["description"] = self.description
        return payload
class FleetTrustBridge:
    """
    The main bridge between local and foreign trust.

    Maintains a cache of foreign trust attestations and blends them with
    local trust scores. Provides unified fleet-wide trust computation.

    Key responsibilities:
    - Import and validate foreign attestations
    - Blend local trust with foreign attestations using configurable weights
    - Compute trust consensus when multiple repos report on the same agent
    - Detect trust inconsistency across repos
    - Down-weight foreign attestations based on age
    """

    def __init__(
        self,
        local_repo: str = "local",
        import_factor: float = DEFAULT_IMPORT_FACTOR,
        fleet_key: str = FLEET_TRUST_KEY,
        trust_getter: Optional[Callable[[str], float]] = None,
    ):
        """
        Initialize the FleetTrustBridge.

        Args:
            local_repo: Name/identifier of this local repository
            import_factor: Weight given to foreign trust, clamped to [0.0, 1.0]
            fleet_key: HMAC key for attestation signing and verification
            trust_getter: Callback to get local trust: fn(agent_name) -> float
        """
        self.local_repo = local_repo
        self.import_factor = max(0.0, min(1.0, import_factor))
        self.fleet_key = fleet_key
        self.trust_getter = trust_getter
        # Foreign attestation cache: agent_name -> [TrustAttestation, ...]
        self._foreign_attestations: Dict[str, List[TrustAttestation]] = {}
        # Fingerprint registry for replay detection: fingerprint -> import timestamp
        self._seen_fingerprints: Dict[str, float] = {}
        # Inconsistency cache: agent_name -> InconsistencyReport
        self._inconsistencies: Dict[str, InconsistencyReport] = {}
        # Import statistics
        self._import_count: int = 0
        self._replay_count: int = 0
        self._invalid_count: int = 0

    @staticmethod
    def _resolve_time(current_time: Optional[float]) -> float:
        """Return current_time, or the wall clock when it is None.

        An explicit 0.0 is a valid reference time and is honoured.
        """
        return time.time() if current_time is None else current_time

    @staticmethod
    def _recency_weight(attestation: TrustAttestation, now: float) -> float:
        """Age-based weight of an attestation, capped at 1.0.

        The cap keeps a future-dated attestation (negative age) from
        outweighing a fresh one.
        """
        return min(1.0, attestation.decayed_weight(now))

    def _get_local_trust(self, agent_name: str) -> float:
        """
        Get the local trust score for an agent, clamped to [0, 1].

        Falls back to BASE_TRUST when no getter is configured or when the
        getter fails; the fallback is deliberate best-effort behaviour.
        """
        if self.trust_getter:
            try:
                return max(0.0, min(1.0, self.trust_getter(agent_name)))
            except Exception:
                pass
        return BASE_TRUST

    # ─── Import / Export ────────────────────────────────────

    def import_attestation(
        self,
        attestation: TrustAttestation,
        current_time: Optional[float] = None
    ) -> dict:
        """
        Import and validate a foreign trust attestation.

        Validation checks, in order:
        1. Signature verification (HMAC-SHA256)
        2. Replay detection (fingerprint uniqueness)
        3. Expiration check
        4. Maximum age check

        Args:
            attestation: The attestation to import
            current_time: Override current time (for testing)

        Returns:
            dict with 'accepted', 'reason', and 'agent_name'; rejected
            results may add 'fingerprint' or 'age_days', accepted results
            add 'issuer_repo' and 'composite'.
        """
        now = self._resolve_time(current_time)
        # 1. Verify signature
        if not attestation.verify(self.fleet_key):
            self._invalid_count += 1
            return {
                "accepted": False,
                "reason": "invalid_signature",
                "agent_name": attestation.agent_name,
            }
        # 2. Replay detection
        fp = attestation.fingerprint or attestation.compute_fingerprint()
        if fp in self._seen_fingerprints:
            self._replay_count += 1
            return {
                "accepted": False,
                "reason": "replay_detected",
                "agent_name": attestation.agent_name,
                "fingerprint": fp,
            }
        # 3. Expiration check
        if attestation.is_expired(now):
            self._invalid_count += 1
            return {
                "accepted": False,
                "reason": "expired",
                "agent_name": attestation.agent_name,
            }
        # 4. Maximum age check
        if attestation.age_seconds(now) > ATTESTATION_MAX_AGE:
            self._invalid_count += 1
            return {
                "accepted": False,
                "reason": "too_old",
                "agent_name": attestation.agent_name,
                "age_days": round(attestation.age_days(now), 1),
            }
        # All checks passed — register the attestation
        self._seen_fingerprints[fp] = now
        agent = attestation.agent_name
        # Keep one attestation per issuer: the most recently IMPORTED one
        # replaces whatever that issuer previously reported.
        # NOTE(review): issued_at is not compared, so an older attestation
        # that this bridge has not seen before can displace a newer one.
        # Confirm whether last-import-wins is intended.
        existing = [
            a for a in self._foreign_attestations.get(agent, [])
            if a.issuer_repo != attestation.issuer_repo
        ]
        existing.append(attestation)
        self._foreign_attestations[agent] = existing
        self._import_count += 1
        return {
            "accepted": True,
            "reason": "valid",
            "agent_name": agent,
            "issuer_repo": attestation.issuer_repo,
            "composite": attestation.composite,
        }

    def import_attestations(
        self,
        attestations: List[TrustAttestation],
        current_time: Optional[float] = None
    ) -> dict:
        """
        Import a batch of attestations, one by one, in the given order.

        Returns:
            dict with 'accepted_count', 'rejected_count', and 'results'
            (the per-attestation result dicts, in input order)
        """
        results = [self.import_attestation(att, current_time) for att in attestations]
        accepted = sum(1 for result in results if result["accepted"])
        return {
            "accepted_count": accepted,
            "rejected_count": len(results) - accepted,
            "results": results,
        }

    def export_attestation(
        self,
        agent_name: str,
        trust_getter: Callable[[str], float],
        composite_getter: Callable[[], float],
        event_count_getter: Optional[Callable[[], int]] = None,
        meaningful_getter: Optional[Callable[[], bool]] = None,
        cross_repo_events: Optional[List[str]] = None,
    ) -> TrustAttestation:
        """
        Create a signed attestation for a local agent's trust.

        This is used when exporting trust from this repo to others.

        Args:
            agent_name: Name of the agent
            trust_getter: fn(dimension) -> float for per-dimension trust;
                a failing call falls back to BASE_TRUST for that dimension
            composite_getter: fn() -> float for composite trust; exceptions
                from this getter propagate to the caller
            event_count_getter: optional fn() -> int for total events
            meaningful_getter: optional fn() -> bool for meaningful check
            cross_repo_events: list of cross-repo event names

        Returns:
            Signed TrustAttestation ready for export
        """
        dimensions = {}
        for dim in TRUST_DIMENSIONS:
            try:
                dimensions[dim] = max(0.0, min(1.0, trust_getter(dim)))
            except Exception:
                dimensions[dim] = BASE_TRUST
        composite = max(0.0, min(1.0, composite_getter()))
        event_count = 0
        if event_count_getter:
            try:
                event_count = event_count_getter()
            except Exception:
                pass  # optional evidence metadata; keep the default
        is_meaningful = False
        if meaningful_getter:
            try:
                is_meaningful = meaningful_getter()
            except Exception:
                pass  # optional evidence metadata; keep the default
        att = TrustAttestation(
            agent_name=agent_name,
            issuer_repo=self.local_repo,
            issuer_id=self.local_repo,
            dimensions=dimensions,
            composite=composite,
            event_count=event_count,
            is_meaningful=is_meaningful,
            cross_repo_events=cross_repo_events or [],
        )
        att.sign(self.fleet_key)
        return att

    # ─── Trust Blending ─────────────────────────────────────

    def foreign_trust(self, agent_name: str, current_time: Optional[float] = None) -> float:
        """
        Compute the weighted consensus of foreign trust for an agent.

        Each non-expired attestation contributes its composite score with
        weight = recency decay × meaningful bonus × event-count bonus.
        The result is the weighted average, clamped to [0, 1].

        Returns BASE_TRUST if no usable foreign attestations exist.
        """
        attestations = self._foreign_attestations.get(agent_name, [])
        if not attestations:
            return BASE_TRUST
        now = self._resolve_time(current_time)
        weighted_sum = 0.0
        weight_total = 0.0
        for att in attestations:
            # Skip expired attestations
            if att.is_expired(now):
                continue
            # Decay weight based on age
            decay = self._recency_weight(att, now)
            # Meaningful attestations get 1.5x weight
            meaningful_bonus = 1.5 if att.is_meaningful else 1.0
            # Event count bonus (logarithmic, capped at 2x, reached at 20
            # events). A negative count is treated as 0 so log1p cannot raise.
            event_bonus = min(
                2.0,
                1.0 + math.log1p(max(0, att.event_count)) / math.log1p(20),
            )
            total_weight = decay * meaningful_bonus * event_bonus
            weighted_sum += att.composite * total_weight
            weight_total += total_weight
        if weight_total <= 0:
            return BASE_TRUST
        return max(0.0, min(1.0, weighted_sum / weight_total))

    def fleet_composite_trust(self, agent_name: str, current_time: Optional[float] = None) -> float:
        """
        Compute a unified trust score blending local and foreign trust.

        Formula: fleet_trust = (1 - α) * local_trust + α * foreign_trust
        where α is the import_factor.

        If no local trust getter is configured, local trust is BASE_TRUST.
        If no non-expired foreign attestations exist, returns local trust
        unchanged (the BASE_TRUST placeholder must not dilute it).
        """
        local = self._get_local_trust(agent_name)
        now = self._resolve_time(current_time)
        attestations = self._foreign_attestations.get(agent_name, [])
        # If no foreign data, return local only
        if not any(not a.is_expired(now) for a in attestations):
            return local
        foreign = self.foreign_trust(agent_name, current_time)
        blended = (1.0 - self.import_factor) * local + self.import_factor * foreign
        return max(0.0, min(1.0, blended))

    def fleet_dimension_trust(
        self, agent_name: str, dimension: str, current_time: Optional[float] = None
    ) -> float:
        """
        Get blended trust for a specific dimension.

        The foreign part is the decay-weighted average of the dimension's
        score over non-expired attestations. The local part is the local
        composite trust, used as a proxy because the local getter is not
        dimension-aware. The two are blended by import_factor.
        """
        attestations = self._foreign_attestations.get(agent_name, [])
        now = self._resolve_time(current_time)
        # Get foreign dimension scores (weighted by decay)
        foreign_weighted = 0.0
        foreign_total_weight = 0.0
        for att in attestations:
            if att.is_expired(now):
                continue
            decay = self._recency_weight(att, now)
            dim_score = att.dimensions.get(dimension, BASE_TRUST)
            foreign_weighted += dim_score * decay
            foreign_total_weight += decay
        if foreign_total_weight <= 0:
            return self._get_local_trust(agent_name)
        foreign_dim = foreign_weighted / foreign_total_weight
        # Use composite-based local trust as proxy for dimension-specific
        local = self._get_local_trust(agent_name)
        blended = (1.0 - self.import_factor) * local + self.import_factor * foreign_dim
        return max(0.0, min(1.0, blended))

    # ─── Inconsistency Detection ────────────────────────────

    def detect_inconsistencies(
        self, current_time: Optional[float] = None
    ) -> List[InconsistencyReport]:
        """
        Detect trust inconsistency across repos for all agents.

        If repo A reports trust 0.8 and repo B reports 0.2 for the same agent,
        this signals a potential issue (sybil attack, context mismatch, etc.).

        Only agents with at least MIN_CONSENSUS_SOURCES non-expired foreign
        attestations are examined; the local score is then added as one
        more source. Every examined agent gets a report (flagged or not),
        which is also stored in the per-agent cache.

        Returns:
            Reports sorted by max_difference, largest first.
        """
        now = self._resolve_time(current_time)
        reports = []
        for agent_name, attestations in self._foreign_attestations.items():
            active = [a for a in attestations if not a.is_expired(now)]
            if len(active) < MIN_CONSENSUS_SOURCES:
                continue
            repo_scores = {att.issuer_repo: att.composite for att in active}
            # Add local score
            repo_scores[self.local_repo] = self._get_local_trust(agent_name)
            scores = list(repo_scores.values())
            lowest, highest = min(scores), max(scores)
            max_diff = highest - lowest
            flagged = max_diff > INCONSISTENCY_THRESHOLD
            report = InconsistencyReport(
                agent_name=agent_name,
                repo_scores=repo_scores,
                max_difference=max_diff,
                flagged=flagged,
                description=(
                    f"Trust for '{agent_name}' varies from "
                    f"{lowest:.2f} to {highest:.2f} across "
                    f"{len(repo_scores)} sources"
                    + (" — INCONSISTENT" if flagged else "")
                ),
            )
            reports.append(report)
            self._inconsistencies[agent_name] = report
        return sorted(reports, key=lambda r: r.max_difference, reverse=True)

    def get_inconsistency(self, agent_name: str) -> Optional[InconsistencyReport]:
        """Get the cached inconsistency report for a specific agent.

        The cache is filled by detect_inconsistencies(); entries are not
        refreshed or removed by any other method of this class.
        """
        return self._inconsistencies.get(agent_name)

    # ─── Consensus ──────────────────────────────────────────

    def trust_consensus(self, agent_name: str, current_time: Optional[float] = None) -> dict:
        """
        Compute trust consensus across all repos for an agent.

        Returns:
            dict with 'agent_name', 'local_trust', 'foreign_trust',
            'fleet_trust', 'source_count', 'sources', 'consensus_score'.
            consensus_score is 1.0 for perfect agreement and falls to 0.0
            as the standard deviation of the source scores approaches 0.5.
        """
        local = self._get_local_trust(agent_name)
        foreign = self.foreign_trust(agent_name, current_time)
        fleet = self.fleet_composite_trust(agent_name, current_time)
        attestations = self._foreign_attestations.get(agent_name, [])
        now = self._resolve_time(current_time)
        sources = {
            att.issuer_repo: {
                "composite": round(att.composite, 4),
                "age_days": round(att.age_days(now), 1),
                "decayed_weight": round(self._recency_weight(att, now), 4),
                "meaningful": att.is_meaningful,
            }
            for att in attestations
            if not att.is_expired(now)
        }
        # The local entry is written last, so it wins if a foreign issuer
        # uses the same name as the local repo.
        sources[self.local_repo] = {
            "composite": round(local, 4),
            "age_days": 0.0,
            "decayed_weight": 1.0,
            "meaningful": True,
        }
        # Consensus score: how much agreement is there between sources?
        all_scores = [s["composite"] for s in sources.values()]
        if len(all_scores) >= 2:
            mean = sum(all_scores) / len(all_scores)
            variance = sum((s - mean) ** 2 for s in all_scores) / len(all_scores)
            std_dev = math.sqrt(variance)
            # Consensus is inverse of normalized std dev (1 = perfect agreement)
            consensus_score = max(0.0, 1.0 - std_dev / 0.5)  # 0.5 = moderate disagreement
        else:
            consensus_score = 1.0  # No disagreement possible with one source
        return {
            "agent_name": agent_name,
            "local_trust": round(local, 4),
            "foreign_trust": round(foreign, 4),
            "fleet_trust": round(fleet, 4),
            "source_count": len(sources),
            "sources": sources,
            "consensus_score": round(consensus_score, 4),
        }

    # ─── Maintenance ───────────────────────────────────────

    def prune_stale_attestations(self, current_time: Optional[float] = None) -> int:
        """
        Remove expired or over-age attestations from the cache.

        Also forgets fingerprints imported more than ATTESTATION_MAX_AGE
        plus a 7-day buffer ago; attestations that old are rejected by the
        age check anyway, so they no longer need replay protection.

        Returns:
            Number of attestations removed.
        """
        now = self._resolve_time(current_time)
        removed = 0
        for agent_name in list(self._foreign_attestations.keys()):
            current = self._foreign_attestations[agent_name]
            kept = [
                a for a in current
                if not a.is_expired(now) and a.age_seconds(now) <= ATTESTATION_MAX_AGE
            ]
            removed += len(current) - len(kept)
            if kept:
                self._foreign_attestations[agent_name] = kept
            else:
                # Clean up empty entries
                del self._foreign_attestations[agent_name]
        # Prune old fingerprints (keep for ATTESTATION_MAX_AGE + buffer)
        fp_cutoff = now - ATTESTATION_MAX_AGE - (7 * 86400)
        stale_fps = [fp for fp, ts in self._seen_fingerprints.items() if ts < fp_cutoff]
        for fp in stale_fps:
            del self._seen_fingerprints[fp]
        return removed

    def agents_with_foreign_trust(self) -> List[str]:
        """Get list of agents that have foreign attestations."""
        return list(self._foreign_attestations.keys())

    def stats(self) -> dict:
        """Return counters describing the bridge's cache and import history."""
        total_attestations = sum(
            len(atts) for atts in self._foreign_attestations.values()
        )
        return {
            "local_repo": self.local_repo,
            "import_factor": self.import_factor,
            "agents_with_foreign_trust": len(self._foreign_attestations),
            "total_foreign_attestations": total_attestations,
            "total_imports": self._import_count,
            "replays_detected": self._replay_count,
            "invalid_attestations": self._invalid_count,
            "unique_fingerprints_tracked": len(self._seen_fingerprints),
            "inconsistencies_detected": len(self._inconsistencies),
            "inconsistencies_flagged": sum(
                1 for r in self._inconsistencies.values() if r.flagged
            ),
        }

    # ─── Serialization ─────────────────────────────────────

    def to_dict(self) -> dict:
        """
        Serialize the bridge state to a dictionary.

        The fleet key, the trust getter, the fingerprint registry and the
        inconsistency cache are not included.
        """
        return {
            "local_repo": self.local_repo,
            "import_factor": self.import_factor,
            "foreign_attestations": {
                agent: [att.to_dict() for att in atts]
                for agent, atts in self._foreign_attestations.items()
            },
            "stats": self.stats(),
        }

    @classmethod
    def from_dict(
        cls,
        data: dict,
        trust_getter: Optional[Callable[[str], float]] = None,
        fleet_key: str = FLEET_TRUST_KEY,
    ) -> 'FleetTrustBridge':
        """
        Deserialize a bridge from a dictionary produced by to_dict().

        Args:
            data: Serialized bridge state
            trust_getter: Local trust callback for the restored bridge
            fleet_key: HMAC key for the restored bridge. The key is never
                serialized, so a bridge that used a custom key must pass
                it again here.

        Stored attestations are loaded as-is (signatures are not
        re-verified). The fingerprint registry is rebuilt from them using
        issued_at as the import timestamp; fingerprints of attestations
        that had already been replaced before serialization are lost.
        """
        bridge = cls(
            local_repo=data.get("local_repo", "local"),
            import_factor=data.get("import_factor", DEFAULT_IMPORT_FACTOR),
            fleet_key=fleet_key,
            trust_getter=trust_getter,
        )
        for agent, att_list in data.get("foreign_attestations", {}).items():
            bridge._foreign_attestations[agent] = [
                TrustAttestation.from_dict(att_data) for att_data in att_list
            ]
        # Rebuild fingerprint registry, using the same fallback as
        # import_attestation for attestations stored without a fingerprint.
        for att_list in bridge._foreign_attestations.values():
            for att in att_list:
                fp = att.fingerprint or att.compute_fingerprint()
                bridge._seen_fingerprints[fp] = att.issued_at
        return bridge
# ═══════════════════════════════════════════════════════════════
# 3. TrustPropagationGraph — Cross-Fleet Trust Network
# ═══════════════════════════════════════════════════════════════
@dataclass
class TrustEdge:
    """
    One directed "source trusts target" link in the trust graph.

    ``trust_value`` is how much the source agent trusts the target agent.
    It is interpreted as the belief component b of a Subjective Logic
    opinion (b, d, u) with b + d + u = 1. Only b is stored; the remainder
    is split evenly between the other two components:
        d = (1 - b) * 0.5
        u = (1 - b) * 0.5
    """

    source: str
    target: str
    trust_value: float
    weight: float = 1.0  # edge weight for path finding
    repo: str = ""  # which repo this edge originates from
    created_at: float = field(default_factory=time.time)

    @property
    def belief(self) -> float:
        """Belief: the stored trust value limited to the range [0, 1]."""
        capped = min(1.0, self.trust_value)
        return max(0.0, capped)

    @property
    def disbelief(self) -> float:
        """Disbelief: half of whatever belief leaves over."""
        remainder = 1.0 - self.belief
        return remainder * 0.5

    @property
    def uncertainty(self) -> float:
        """Uncertainty: the other half of whatever belief leaves over."""
        remainder = 1.0 - self.belief
        return remainder * 0.5

    @property
    def opinion(self) -> Tuple[float, float, float]:
        """The full (belief, disbelief, uncertainty) triplet."""
        b = self.belief
        d = self.disbelief
        u = self.uncertainty
        return (b, d, u)

    def to_dict(self) -> dict:
        """Return a plain-dict view; numeric values are rounded to 4 decimals."""
        opinion_view = {
            name: round(component, 4)
            for name, component in zip(
                ("belief", "disbelief", "uncertainty"), self.opinion
            )
        }
        edge_view = {
            "source": self.source,
            "target": self.target,
        }
        edge_view["trust_value"] = round(self.trust_value, 4)
        edge_view["weight"] = round(self.weight, 4)
        edge_view["repo"] = self.repo
        edge_view["created_at"] = self.created_at
        edge_view["opinion"] = opinion_view
        return edge_view

    @classmethod
    def from_dict(cls, data: dict) -> 'TrustEdge':
        """Build an edge from a dict; 'source' and 'target' are required keys."""
        kwargs = {"source": data["source"], "target": data["target"]}
        kwargs["trust_value"] = data.get("trust_value", BASE_TRUST)
        kwargs["weight"] = data.get("weight", 1.0)
        kwargs["repo"] = data.get("repo", "")
        kwargs["created_at"] = data.get("created_at", time.time())
        return cls(**kwargs)
class TrustPropagationGraph:
"""