Skip to content

Commit ff021e5

Browse files
committed
fix(upgrade): avoid reading the entire chunk for split tasks
1 parent 327a360 commit ff021e5

3 files changed

Lines changed: 36 additions & 27 deletions

File tree

pychunkedgraph/ingest/cluster.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ def _post_task_completion(
4545
chunk_str += f"_{split}"
4646
# mark chunk as completed - "c"
4747
imanager.redis.sadd(f"{layer}c", chunk_str)
48+
logging.info(f"{chunk_str} marked as complete")
4849

4950

5051
def create_parent_chunk(
@@ -197,6 +198,8 @@ def _queue_tasks(imanager: IngestionManager, chunk_fn: Callable, coords: Iterabl
197198
q = imanager.get_task_queue(queue_name)
198199
batch_size = int(environ.get("JOB_BATCH_SIZE", 10000))
199200
batches = chunked(coords, batch_size)
201+
retry = int(environ.get("RETRY_COUNT", 0))
202+
failure_ttl = int(environ.get("FAILURE_TTL", 300))
200203
for batch in batches:
201204
_coords = get_chunks_not_done(imanager, 2, batch)
202205
# buffer for optimal use of redis memory
@@ -214,7 +217,9 @@ def _queue_tasks(imanager: IngestionManager, chunk_fn: Callable, coords: Iterabl
214217
timeout=environ.get("L2JOB_TIMEOUT", "3m"),
215218
result_ttl=0,
216219
job_id=chunk_id_str(2, chunk_coord),
217-
retry=Retry(int(environ.get("RETRY_COUNT", 1))),
220+
retry=Retry(retry) if retry > 1 else None,
221+
description="",
222+
failure_ttl=failure_ttl
218223
)
219224
)
220225
q.enqueue_many(job_datas)

pychunkedgraph/ingest/upgrade/parent_layer.py

Lines changed: 20 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313

1414
from pychunkedgraph.graph import ChunkedGraph, edges
1515
from pychunkedgraph.graph.attributes import Connectivity, Hierarchy
16-
from pychunkedgraph.graph.utils import serializers
16+
from pychunkedgraph.graph.utils import serializers, basetypes
1717
from pychunkedgraph.graph.types import empty_2d
1818
from pychunkedgraph.utils.general import chunked
1919

@@ -31,7 +31,10 @@ def _populate_nodes_and_children(
3131
) -> dict:
3232
global CHILDREN
3333
if nodes:
34-
CHILDREN = cg.get_children(nodes)
34+
children_map = cg.get_children(nodes)
35+
for k, v in children_map.items():
36+
if len(v):
37+
CHILDREN[k] = v
3538
return
3639
response = cg.range_read_chunk(chunk_id, properties=Hierarchy.Child)
3740
for k, v in response.items():
@@ -188,6 +191,17 @@ def _update_cross_edges_helper(args):
188191
gc.collect()
189192

190193

194+
def _get_split_nodes(
195+
cg: ChunkedGraph, chunk_id: basetypes.CHUNK_ID, split: int, splits: int
196+
):
197+
max_id = cg.client.get_max_node_id(chunk_id)
198+
total = max_id - chunk_id
199+
split_size = int(ceil(total / splits))
200+
start = int(chunk_id + np.uint64(split * split_size))
201+
end = int(start + split_size)
202+
return range(int(start), int(end))
203+
204+
191205
def update_chunk(
192206
cg: ChunkedGraph,
193207
chunk_coords: list[int],
@@ -204,23 +218,12 @@ def update_chunk(
204218
x, y, z = chunk_coords
205219
chunk_id = cg.get_chunk_id(layer=layer, x=x, y=y, z=z)
206220

207-
_populate_nodes_and_children(cg, chunk_id, nodes=nodes)
208-
logging.info(f"_populate_nodes_and_children: {time.time() - start}")
209-
if not CHILDREN:
210-
return
211-
212-
allnodes = list(CHILDREN.keys())
213221
if splits is not None:
214-
nodes = []
215-
split_size = int(ceil(len(allnodes) / splits))
216-
split_nodes = chunked(allnodes, split_size)
217-
for i, _nodes in enumerate(split_nodes):
218-
if i == split:
219-
nodes = list(_nodes)
220-
break
221-
else:
222-
nodes = allnodes
222+
nodes = _get_split_nodes(cg, chunk_id, split, splits)
223223

224+
_populate_nodes_and_children(cg, chunk_id, nodes=nodes)
225+
logging.info(f"_populate_nodes_and_children: {time.time() - start}")
226+
nodes = list(CHILDREN.keys())
224227
if len(nodes) == 0:
225228
return
226229

@@ -267,4 +270,3 @@ def update_chunk(
267270
)
268271
)
269272
logging.info(f"total elaspsed time: {time.time() - start}")
270-
gc.collect()

pychunkedgraph/ingest/utils.py

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
import logging
44
import functools
5-
import math, sys
5+
import math, random, sys
66
from os import environ
77
from time import sleep
88
from typing import Any, Generator, Tuple
@@ -154,16 +154,12 @@ def print_status(imanager: IngestionManager, redis, upgrade: bool = False):
154154
def _refresh_status():
155155
pipeline = redis.pipeline()
156156
pipeline.get(r_keys.JOB_TYPE)
157-
worker_busy = []
157+
worker_busy = ["-"] * len(layers)
158158
for layer in layers:
159159
pipeline.scard(f"{layer}c")
160160
queue = Queue(f"l{layer}", connection=redis)
161161
pipeline.llen(queue.key)
162162
pipeline.zcard(queue.failed_job_registry.key)
163-
workers = Worker.all(queue=queue)
164-
worker_busy.append(
165-
sum([w.get_state() == WorkerStatus.BUSY for w in workers])
166-
)
167163

168164
results = pipeline.execute()
169165
job_type = "not_available"
@@ -218,6 +214,7 @@ def queue_layer_helper(
218214
batch_size = int(environ.get("JOB_BATCH_SIZE", 10000))
219215
timeout_scale = int(environ.get("TIMEOUT_SCALE_FACTOR", 1))
220216
batches = chunked(chunk_coords, batch_size)
217+
failure_ttl = int(environ.get("FAILURE_TTL", 300))
221218
for batch in batches:
222219
_coords = get_chunks_not_done(imanager, parent_layer, batch, splits=splits)
223220
# buffer for optimal use of redis memory
@@ -227,6 +224,7 @@ def queue_layer_helper(
227224
sleep(interval)
228225

229226
job_datas = []
227+
retry = int(environ.get("RETRY_COUNT", 0))
230228
for chunk_coord in _coords:
231229
if splits > 0:
232230
coord, split = chunk_coord
@@ -238,7 +236,9 @@ def queue_layer_helper(
238236
result_ttl=0,
239237
job_id=jid,
240238
timeout=f"{timeout_scale * int(parent_layer * parent_layer)}m",
241-
retry=Retry(int(environ.get("RETRY_COUNT", 1))),
239+
retry=Retry(retry) if retry > 1 else None,
240+
description="",
241+
failure_ttl=failure_ttl,
242242
)
243243
)
244244
else:
@@ -249,7 +249,9 @@ def queue_layer_helper(
249249
result_ttl=0,
250250
job_id=chunk_id_str(parent_layer, chunk_coord),
251251
timeout=f"{timeout_scale * int(parent_layer * parent_layer)}m",
252-
retry=Retry(int(environ.get("RETRY_COUNT", 1))),
252+
retry=Retry(retry) if retry > 1 else None,
253+
description="",
254+
failure_ttl=failure_ttl,
253255
)
254256
)
255257
q.enqueue_many(job_datas)

0 commit comments

Comments (0)