Skip to content

Commit 82ebd6b

Browse files
committed
redis loader: Fix reorg handling when using string data structure
When data_structure='string', batch IDs are stored inside JSON values rather than as hash fields. The reorg handler now checks the data structure and uses GET+JSON parse for strings, HGET for hashes.
1 parent f63cc7c commit 82ebd6b

File tree

1 file changed

+16
-2
lines changed

1 file changed

+16
-2
lines changed

src/amp/loaders/implementations/redis_loader.py

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -796,8 +796,22 @@ def _handle_reorg(self, invalidation_ranges: List[BlockRange], table_name: str,
796796
if key_str.startswith('block_index:'):
797797
continue
798798

799-
# Get batch_id from the hash
800-
batch_id_value = self.redis_client.hget(key, '_amp_batch_id')
799+
# Get batch_id - handle both hash and string data structures
800+
batch_id_value = None
801+
if self.config.data_structure == 'string':
802+
# For string data structure, parse JSON to get _amp_batch_id
803+
value = self.redis_client.get(key)
804+
if value:
805+
try:
806+
import json
807+
data = json.loads(value.decode('utf-8') if isinstance(value, bytes) else value)
808+
batch_id_value = data.get('_amp_batch_id')
809+
except (json.JSONDecodeError, KeyError):
810+
pass
811+
else:
812+
# For hash data structure, use HGET
813+
batch_id_value = self.redis_client.hget(key, '_amp_batch_id')
814+
801815
if batch_id_value:
802816
batch_id_str = (
803817
batch_id_value.decode('utf-8') if isinstance(batch_id_value, bytes) else str(batch_id_value)

0 commit comments

Comments
 (0)