11"""NEXUS Agent Runtime - Agent-Annotated Bytecode (AAB) codec.
22
33AAB format: 8-byte core instruction + variable-length TLV metadata trailer.
4- TLV tags: INTENT, CAPABILITY, SAFETY, TRUST, NARRATIVE.
4+ Each AAB instruction = [Core: 8 bytes] [TLV Block: variable]
5+ TLV format: [Tag:1][Length:2][Value:N] ... [Tag:0x00 = end]
56ESP32 receives only the stripped 8-byte core. Zero execution overhead.
67"""
78
89from __future__ import annotations
910
10- from dataclasses import dataclass
11+ import struct
12+ from dataclasses import dataclass , field
1113
14+ # ===================================================================
15+ # TLV Tag Constants (13 tags)
16+ # ===================================================================
17+
18+ TLV_TYPE_DESC = 0x01
19+ TLV_CAP_REQ = 0x02
20+ TLV_SAFETY_CONSTRAINT = 0x03
21+ TLV_TRUST_REQ = 0x04
22+ TLV_NARRATIVE = 0x05
23+ TLV_AGENT_SOURCE = 0x06
24+ TLV_INTENT_TAG = 0x07
25+ TLV_PRECONDITION = 0x08
26+ TLV_POSTCONDITION = 0x09
27+ TLV_PERFORMANCE_HINT = 0x0A
28+ TLV_DOMAIN_TAG = 0x0B
29+ TLV_VESSEL_CAP = 0x0C
30+ TLV_TRUST_MULTIPLIER = 0x0D
31+ TLV_END = 0x00
32+
33+ TLV_TAG_NAMES : dict [int , str ] = {
34+ 0x00 : "END" ,
35+ 0x01 : "TYPE_DESC" ,
36+ 0x02 : "CAP_REQ" ,
37+ 0x03 : "SAFETY_CONSTRAINT" ,
38+ 0x04 : "TRUST_REQ" ,
39+ 0x05 : "NARRATIVE" ,
40+ 0x06 : "AGENT_SOURCE" ,
41+ 0x07 : "INTENT_TAG" ,
42+ 0x08 : "PRECONDITION" ,
43+ 0x09 : "POSTCONDITION" ,
44+ 0x0A : "PERFORMANCE_HINT" ,
45+ 0x0B : "DOMAIN_TAG" ,
46+ 0x0C : "VESSEL_CAP" ,
47+ 0x0D : "TRUST_MULTIPLIER" ,
48+ }
49+
50+
51+ # ===================================================================
52+ # TLV Entry
53+ # ===================================================================
1254
1355@dataclass
1456class TLVEntry :
@@ -18,7 +60,7 @@ class TLVEntry:
1860 value : bytes
1961
2062 def encode (self ) -> bytes :
21- """Encode TLV entry to bytes."""
63+ """Encode TLV entry to bytes: [Tag:1][Length:2][Value:N] ."""
2264 return bytes ([self .tag ]) + len (self .value ).to_bytes (2 , "little" ) + self .value
2365
2466 @classmethod
@@ -29,55 +71,188 @@ def decode(cls, data: bytes, offset: int) -> tuple[TLVEntry, int]:
2971 Tuple of (TLVEntry, next_offset).
3072 """
3173 tag = data [offset ]
74+ if tag == TLV_END :
75+ return cls (tag = TLV_END , value = b"" ), offset + 1
3276 length = int .from_bytes (data [offset + 1 :offset + 3 ], "little" )
3377 value = data [offset + 3 :offset + 3 + length ]
3478 return cls (tag = tag , value = value ), offset + 3 + length
3579
80+ def __str__ (self ) -> str :
81+ name = TLV_TAG_NAMES .get (self .tag , f"UNKNOWN(0x{ self .tag :02X} )" )
82+ try :
83+ val_str = self .value .decode ("utf-8" )
84+ except UnicodeDecodeError :
85+ val_str = self .value .hex ()
86+ return f"TLV({ name } , { val_str !r} )"
3687
37- # TLV tag constants
38- TLV_INTENT = 0x01
39- TLV_CAPABILITY = 0x02
40- TLV_SAFETY = 0x03
41- TLV_TRUST = 0x04
42- TLV_NARRATIVE = 0x05
4388
89+ # ===================================================================
90+ # AAB Instruction
91+ # ===================================================================
4492
45- class AABCodec :
46- """Agent-Annotated Bytecode encoder/decoder (stub)."""
93+ @dataclass
94+ class AABInstruction :
95+ """A single AAB instruction: 8-byte core + variable TLV metadata."""
4796
48- def encode ( self , bytecode : bytes , metadata : list [ TLVEntry ]) -> bytes :
49- """Encode bytecode with AAB metadata.
97+ core : bytes # Exactly 8 bytes
98+ metadata : list [ TLVEntry ] = field ( default_factory = list )
5099
51- Args:
52- bytecode: Raw 8-byte-aligned bytecode.
53- metadata: List of TLV metadata entries.
54100
55- Returns:
56- AAB-encoded bytes.
57- """
58- # TODO: Implement AAB encoding
59- return bytecode
101+ # ===================================================================
102+ # AAB File Format Constants
103+ # ===================================================================
60104
61- def decode (self , aab_data : bytes ) -> tuple [bytes , list [TLVEntry ]]:
62- """Decode AAB data into core bytecode and metadata.
105+ AAB_MAGIC = b"NXAB"
106+ AAB_VERSION = 0x0001
107+ AAB_HEADER_SIZE = 8 # 4 magic + 2 version + 2 instruction count
63108
64- Args:
65- aab_data: AAB-encoded bytes.
66109
67- Returns:
68- Tuple of (core_bytecode, metadata_entries).
69- """
70- # TODO: Implement AAB decoding
71- return aab_data , []
110+ # ===================================================================
111+ # AAB Codec Functions
112+ # ===================================================================
72113
73- def strip ( self , aab_data : bytes ) -> bytes :
74- """Strip all TLV metadata, returning only core bytecode .
114+ def aab_encode ( instructions : list [ AABInstruction ] ) -> bytes :
115+ """Encode a list of AAB instructions into bytes .
75116
76- Args:
77- aab_data: AAB-encoded bytes.
117+ File format:
118+ Header: NXAB (4) + version (2) + count (2) = 8 bytes
119+ Body: N instructions, each = 8-byte core + variable TLV block
120+ """
121+ # Build header
122+ header = AAB_MAGIC + AAB_VERSION .to_bytes (2 , "little" ) + len (instructions ).to_bytes (2 , "little" )
78123
79- Returns:
80- Core 8-byte bytecode only.
81- """
82- core , _ = self .decode (aab_data )
83- return core
124+ # Build body
125+ body = b""
126+ for instr in instructions :
127+ if len (instr .core ) != 8 :
128+ raise ValueError (f"Core instruction must be exactly 8 bytes, got { len (instr .core )} " )
129+ body += instr .core
130+ for tlv in instr .metadata :
131+ body += tlv .encode ()
132+ body += bytes ([TLV_END ]) # End of TLV block for this instruction
133+
134+ return header + body
135+
136+
137+ def aab_decode (data : bytes ) -> list [AABInstruction ]:
138+ """Decode AAB bytes into a list of AAB instructions.
139+
140+ Returns:
141+ List of AABInstruction objects.
142+ """
143+ if len (data ) < AAB_HEADER_SIZE :
144+ raise ValueError (f"Data too short for AAB header: { len (data )} bytes" )
145+
146+ # Parse header
147+ magic = data [0 :4 ]
148+ if magic != AAB_MAGIC :
149+ raise ValueError (f"Invalid AAB magic: { magic !r} , expected { AAB_MAGIC !r} " )
150+
151+ version = int .from_bytes (data [4 :6 ], "little" )
152+ if version != AAB_VERSION :
153+ raise ValueError (f"Unsupported AAB version: { version } , expected { AAB_VERSION } " )
154+
155+ count = int .from_bytes (data [6 :8 ], "little" )
156+
157+ instructions : list [AABInstruction ] = []
158+ offset = AAB_HEADER_SIZE
159+
160+ for _ in range (count ):
161+ if offset + 8 > len (data ):
162+ raise ValueError (f"Unexpected end of data at offset { offset } , expected 8 bytes for core" )
163+
164+ core = data [offset :offset + 8 ]
165+ offset += 8
166+
167+ metadata : list [TLVEntry ] = []
168+ while offset < len (data ):
169+ tlv , offset = TLVEntry .decode (data , offset )
170+ if tlv .tag == TLV_END :
171+ break
172+ metadata .append (tlv )
173+
174+ instructions .append (AABInstruction (core = core , metadata = metadata ))
175+
176+ return instructions
177+
178+
179+ def aab_strip (data : bytes ) -> bytes :
180+ """Strip all TLV metadata, returning only raw 8-byte core instructions.
181+
182+ Args:
183+ data: AAB-encoded bytes (with header).
184+
185+ Returns:
186+ Raw 8-byte core instructions concatenated.
187+ """
188+ instructions = aab_decode (data )
189+ return b"" .join (instr .core for instr in instructions )
190+
191+
192+ # ===================================================================
193+ # Convenience helpers
194+ # ===================================================================
195+
196+ def make_tlv (tag : int , value : str | bytes | float | int ) -> TLVEntry :
197+ """Create a TLVEntry with automatic encoding."""
198+ if isinstance (value , str ):
199+ return TLVEntry (tag = tag , value = value .encode ("utf-8" ))
200+ elif isinstance (value , float ):
201+ return TLVEntry (tag = tag , value = struct .pack ("<f" , value ))
202+ elif isinstance (value , int ):
203+ return TLVEntry (tag = tag , value = value .to_bytes (4 , "little" , signed = True ))
204+ elif isinstance (value , bytes ):
205+ return TLVEntry (tag = tag , value = value )
206+ else :
207+ raise TypeError (f"Unsupported value type: { type (value )} " )
208+
209+
210+ def struct_pack_f32 (value : float ) -> bytes :
211+ """Pack a float32 value to bytes (little-endian)."""
212+ return struct .pack ("<f" , value )
213+
214+
215+ def struct_unpack_f32 (data : bytes ) -> float :
216+ """Unpack a float32 value from bytes (little-endian)."""
217+ return struct .unpack ("<f" , data )[0 ]
218+
219+
220+ def get_tlv_string (metadata : list [TLVEntry ], tag : int ) -> str | None :
221+ """Get a string value from metadata by tag."""
222+ for entry in metadata :
223+ if entry .tag == tag :
224+ try :
225+ return entry .value .decode ("utf-8" )
226+ except UnicodeDecodeError :
227+ return None
228+ return None
229+
230+
231+ def get_tlv_float (metadata : list [TLVEntry ], tag : int ) -> float | None :
232+ """Get a float value from metadata by tag."""
233+ for entry in metadata :
234+ if entry .tag == tag and len (entry .value ) == 4 :
235+ return struct_unpack_f32 (entry .value )
236+ return None
237+
238+
239+ # Keep legacy AABCodec class for backward compatibility
240+ class AABCodec :
241+ """Agent-Annotated Bytecode encoder/decoder (wraps module functions)."""
242+
243+ def encode (self , bytecode : bytes , metadata : list [TLVEntry ]) -> bytes :
244+ """Encode bytecode with AAB metadata (legacy single-instruction API)."""
245+ instructions = [AABInstruction (core = bytecode [:8 ], metadata = metadata )]
246+ return aab_encode (instructions )
247+
248+ def decode (self , aab_data : bytes ) -> tuple [bytes , list [TLVEntry ]]:
249+ """Decode AAB data into core bytecode and metadata (legacy API)."""
250+ instructions = aab_decode (aab_data )
251+ if len (instructions ) == 0 :
252+ return b"" , []
253+ instr = instructions [0 ]
254+ return instr .core , instr .metadata
255+
256+ def strip (self , aab_data : bytes ) -> bytes :
257+ """Strip all TLV metadata, returning only core bytecode."""
258+ return aab_strip (aab_data )
0 commit comments