Skip to content

Commit 68bbdb3

Browse files
committed
redis loader: Fix reorg handling when using string data structure
When data_structure='string', batch IDs are stored inside JSON values rather than as hash fields. The reorg handler now checks the data structure and uses GET+JSON parse for strings, HGET for hashes.
1 parent d58af45 commit 68bbdb3

File tree

1 file changed

+17
-2
lines changed

1 file changed

+17
-2
lines changed

src/amp/loaders/implementations/redis_loader.py

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -796,8 +796,23 @@ def _handle_reorg(self, invalidation_ranges: List[BlockRange], table_name: str,
796796
if key_str.startswith('block_index:'):
797797
continue
798798

799-
# Get batch_id from the hash
800-
batch_id_value = self.redis_client.hget(key, '_amp_batch_id')
799+
# Get batch_id - handle both hash and string data structures
800+
batch_id_value = None
801+
if self.config.data_structure == 'string':
802+
# For string data structure, parse JSON to get _amp_batch_id
803+
value = self.redis_client.get(key)
804+
if value:
805+
try:
806+
import json
807+
808+
data = json.loads(value.decode('utf-8') if isinstance(value, bytes) else value)
809+
batch_id_value = data.get('_amp_batch_id')
810+
except (json.JSONDecodeError, KeyError):
811+
pass
812+
else:
813+
# For hash data structure, use HGET
814+
batch_id_value = self.redis_client.hget(key, '_amp_batch_id')
815+
801816
if batch_id_value:
802817
batch_id_str = (
803818
batch_id_value.decode('utf-8') if isinstance(batch_id_value, bytes) else str(batch_id_value)

0 commit comments

Comments
 (0)