Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ venv.bak/


# local dev stuff
.claude/
.devcontainer/
*.ipynb
*.rdb
Expand Down
28 changes: 25 additions & 3 deletions pychunkedgraph/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,26 @@
"ignore", message="Schema id not specified", module="python_jsonschema_objects"
)

# Custom log level between INFO (20) and WARNING (30)
# Use logger.notice() for pychunkedgraph logs that should always show
# even when third-party INFO is suppressed
NOTICE = 25
stdlib_logging.addLevelName(NOTICE, "NOTICE")


class PCGLogger(stdlib_logging.Logger):
    """Logger subclass that supports the custom NOTICE level (25).

    NOTICE sits between INFO (20) and WARNING (30) so pychunkedgraph
    messages can stay visible while third-party INFO output is suppressed.
    """

    def notice(self, message, *args, **kwargs):
        """Log *message* at NOTICE level.

        Mirrors stdlib ``Logger.info``/``Logger.warning``: *args* are
        lazy %-style format arguments.  ``stacklevel=2`` makes the record
        report the caller's file/line rather than this wrapper.
        """
        if self.isEnabledFor(NOTICE):
            self._log(NOTICE, message, args, stacklevel=2, **kwargs)

    # Backward-compatible alias: this method was previously named note(),
    # while the module comment documents logger.notice(); support both.
    note = notice


stdlib_logging.setLoggerClass(PCGLogger)


def get_logger(name: str) -> PCGLogger:
    """Return the logger registered under *name*, typed as PCGLogger.

    Because ``setLoggerClass(PCGLogger)`` was called above, the stdlib
    registry hands back PCGLogger instances; the cast comment silences
    the type checker, which only sees the stdlib ``Logger`` return type.
    """
    logger = stdlib_logging.getLogger(name)
    return logger  # type: ignore[return-value]


# Export logging levels for convenience
DEBUG = stdlib_logging.DEBUG
INFO = stdlib_logging.INFO
Expand Down Expand Up @@ -36,7 +56,7 @@ def configure_logging(level=stdlib_logging.INFO, format_str=None, stream=None):
pychunkedgraph.configure_logging(pychunkedgraph.DEBUG) # Enable DEBUG level
"""
if format_str is None:
format_str = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
format_str = "%(asctime)s %(module)s:%(funcName)s:%(lineno)d %(message)s"
if stream is None:
stream = sys.stdout

Expand All @@ -54,10 +74,12 @@ def configure_logging(level=stdlib_logging.INFO, format_str=None, stream=None):

handler = stdlib_logging.StreamHandler(stream)
handler.setLevel(level)
handler.setFormatter(stdlib_logging.Formatter(format_str))
formatter = stdlib_logging.Formatter(format_str)
formatter.default_msec_format = "%s.%03d"
handler.setFormatter(formatter)
logger.addHandler(handler)

return logger


configure_logging()
configure_logging(level=NOTICE)
5 changes: 5 additions & 0 deletions pychunkedgraph/app/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from flask_cors import CORS
from rq import Queue

from pychunkedgraph import NOTICE, configure_logging
from pychunkedgraph.logging import jsonformatter

from . import config
Expand Down Expand Up @@ -99,6 +100,10 @@ def configure_app(app):
app.logger.setLevel(app.config["LOGGING_LEVEL"])
app.logger.propagate = False

# Ensure pychunkedgraph logger always works at NOTICE level
# regardless of app config or environment log level
configure_logging(level=NOTICE)

if app.config["USE_REDIS_JOBS"]:
app.redis = redis.Redis.from_url(app.config["REDIS_URL"])
app.test_q = Queue("test", connection=app.redis)
Expand Down
5 changes: 5 additions & 0 deletions pychunkedgraph/app/app_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from pychunkedgraph.graph import ChunkedGraph
from pychunkedgraph.graph import get_default_client_info
from pychunkedgraph.graph import exceptions as cg_exceptions
from pychunkedgraph.graph.utils.generic import lookup_svs_from_seg

PCG_CACHE = {}

Expand Down Expand Up @@ -238,6 +239,10 @@ def ccs(coordinates_nm_):
f"{coordinates} - Validation stage."
)

# Fast path: all node_ids are L1 and OCDBT — single seg read for all coords
if cg.meta.ocdbt_seg and np.all(cg.get_chunk_layers(np.unique(node_ids)) == 1):
return lookup_svs_from_seg(cg.meta, coordinates)

atomic_ids = np.zeros(len(coordinates), dtype=np.uint64)
for node_id in np.unique(node_ids):
node_id_m = node_ids == node_id
Expand Down
103 changes: 70 additions & 33 deletions pychunkedgraph/app/segmentation/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,23 +9,22 @@

import numpy as np
import pandas as pd
import fastremap
from flask import current_app, g, jsonify, make_response, request
from pytz import UTC

from pychunkedgraph import __version__
from pychunkedgraph.app import app_utils
from pychunkedgraph.graph import (
attributes,
cutting,
segmenthistory,
)
from pychunkedgraph.graph import attributes, cutting, segmenthistory, ChunkedGraph
from pychunkedgraph.graph import (
edges as cg_edges,
)
from pychunkedgraph.graph import (
exceptions as cg_exceptions,
)
from pychunkedgraph.graph.analysis import pathing
from pychunkedgraph.graph.attributes import OperationLogs
from pychunkedgraph.graph.edits_sv import split_supervoxel
from pychunkedgraph.graph.misc import get_contact_sites
from pychunkedgraph.graph.operation import GraphEditOperation
from pychunkedgraph.graph import basetypes
Expand Down Expand Up @@ -385,7 +384,6 @@ def handle_merge(table_id, allow_same_segment_merge=False):
source_coords=coords[:1],
sink_coords=coords[1:],
allow_same_segment_merge=allow_same_segment_merge,
do_sanity_check=True,
)

except cg_exceptions.LockingError as e:
Expand All @@ -396,7 +394,7 @@ def handle_merge(table_id, allow_same_segment_merge=False):
current_app.operation_id = ret.operation_id
if ret.new_root_ids is None:
raise cg_exceptions.InternalServerError(
"Could not merge selected " "supervoxel."
f"{ret.operation_id}: Could not merge selected supervoxels."
)

current_app.logger.debug(("lvl2_nodes:", ret.new_lvl2_ids))
Expand All @@ -410,24 +408,9 @@ def handle_merge(table_id, allow_same_segment_merge=False):
### SPLIT ----------------------------------------------------------------------


def handle_split(table_id):
current_app.table_id = table_id
user_id = str(g.auth_user.get("id", current_app.user_id))

data = json.loads(request.data)
is_priority = request.args.get("priority", True, type=str2bool)
remesh = request.args.get("remesh", True, type=str2bool)
mincut = request.args.get("mincut", True, type=str2bool)

current_app.logger.debug(data)

# Call ChunkedGraph
cg = app_utils.get_cg(table_id, skip_cache=True)
def _get_sources_and_sinks(cg: ChunkedGraph, data):
node_idents = []
node_ident_map = {
"sources": 0,
"sinks": 1,
}
node_ident_map = {"sources": 0, "sinks": 1}
coords = []
node_ids = []

Expand All @@ -440,20 +423,74 @@ def handle_split(table_id):
node_ids = np.array(node_ids, dtype=np.uint64)
coords = np.array(coords)
node_idents = np.array(node_idents)

sv_ids = app_utils.handle_supervoxel_id_lookup(cg, coords, node_ids)
current_app.logger.debug(
{"node_id": node_ids, "sv_id": sv_ids, "node_ident": node_idents}
)
source_ids = sv_ids[node_idents == 0]
sink_ids = sv_ids[node_idents == 1]
source_coords = coords[node_idents == 0]
sink_coords = coords[node_idents == 1]
return (source_ids, sink_ids, source_coords, sink_coords)


def handle_split(table_id):
current_app.table_id = table_id
user_id = str(g.auth_user.get("id", current_app.user_id))

data = json.loads(request.data)
is_priority = request.args.get("priority", True, type=str2bool)
remesh = request.args.get("remesh", True, type=str2bool)
mincut = request.args.get("mincut", True, type=str2bool)

cg = app_utils.get_cg(table_id, skip_cache=True)
current_app.logger.debug(data)
sources, sinks, source_coords, sink_coords = _get_sources_and_sinks(cg, data)
current_app.logger.info(f"sv_lookup pre-split: sources={sources}, sinks={sinks}")
try:
ret = cg.remove_edges(
user_id=user_id,
source_ids=sv_ids[node_idents == 0],
sink_ids=sv_ids[node_idents == 1],
source_coords=coords[node_idents == 0],
sink_coords=coords[node_idents == 1],
source_ids=sources,
sink_ids=sinks,
source_coords=source_coords,
sink_coords=sink_coords,
mincut=mincut,
)
except cg_exceptions.SupervoxelSplitRequiredError as e:
current_app.logger.info(e)
sources_remapped = fastremap.remap(
sources,
e.sv_remapping,
preserve_missing_labels=True,
in_place=False,
)
sinks_remapped = fastremap.remap(
sinks,
e.sv_remapping,
preserve_missing_labels=True,
in_place=False,
)
overlap_mask = np.isin(sources_remapped, sinks_remapped)
for sv_to_split in np.unique(sources_remapped[overlap_mask]):
_mask0 = sources_remapped == sv_to_split
_mask1 = sinks_remapped == sv_to_split
split_supervoxel(
cg,
sources[_mask0][0],
source_coords[_mask0],
sink_coords[_mask1],
e.operation_id,
)

sources, sinks, source_coords, sink_coords = _get_sources_and_sinks(cg, data)
current_app.logger.info(
f"sv_lookup post-split: sources={sources}, sinks={sinks}"
)
ret = cg.remove_edges(
user_id=user_id,
source_ids=sources,
sink_ids=sinks,
source_coords=source_coords,
sink_coords=sink_coords,
mincut=mincut,
do_sanity_check=True,
)
except cg_exceptions.LockingError as e:
raise cg_exceptions.InternalServerError(e)
Expand All @@ -463,7 +500,7 @@ def handle_split(table_id):
current_app.operation_id = ret.operation_id
if ret.new_root_ids is None:
raise cg_exceptions.InternalServerError(
"Could not split selected segment groups."
f"{ret.operation_id}: Could not split selected segment groups."
)

current_app.logger.debug(("after split:", ret.new_root_ids))
Expand Down
55 changes: 41 additions & 14 deletions pychunkedgraph/graph/chunkedgraph.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,16 +183,21 @@ def get_atomic_ids_from_coords(
:param max_dist_nm: max distance explored
:return: supervoxel ids; returns None if no solution was found
"""
if self.get_chunk_layer(parent_id) == 1:
if self.get_chunk_layer(parent_id) == 1 and not self.meta.ocdbt_seg:
return np.array([parent_id] * len(coordinates), dtype=np.uint64)

# Enable search with old parent by using its timestamp and map to parents
parent_ts = self.get_node_timestamps([parent_id], return_numpy=False)[0]
layer = self.get_chunk_layer(parent_id)
# L1 nodes don't have children, skip timestamp lookup
parent_ts = (
None
if layer == 1
else self.get_node_timestamps([parent_id], return_numpy=False)[0]
)
return id_helpers.get_atomic_ids_from_coords(
self.meta,
coordinates,
parent_id,
self.get_chunk_layer(parent_id),
layer,
parent_ts,
self.get_roots,
max_dist_nm,
Expand Down Expand Up @@ -672,22 +677,44 @@ def get_subgraph_leaves(
self, node_id_or_ids, bbox, bbox_is_coordinate, False, True
)

def get_fake_edges(
def get_edges_from_edits(
self, chunk_ids: np.ndarray, time_stamp: datetime.datetime = None
) -> typing.Dict:
"""
Edges stored within a pcg that were created as a result of edits.
Either 'fake' edges that were adding for a merge edit;
Or 'split' edges resulting from a supervoxel split.
"""
result = {}
fake_edges_d = self.client.read_nodes(
properties = [
attributes.Connectivity.FakeEdges,
attributes.Connectivity.SplitEdges,
attributes.Connectivity.Affinity,
attributes.Connectivity.Area,
]
_edges_d = self.client.read_nodes(
node_ids=chunk_ids,
properties=attributes.Connectivity.FakeEdges,
properties=properties,
end_time=time_stamp,
end_time_inclusive=True,
fake_edges=True,
)
for id_, val in fake_edges_d.items():
edges = np.concatenate(
[np.asarray(e.value, dtype=basetypes.NODE_ID) for e in val]
)
result[id_] = Edges(edges[:, 0], edges[:, 1])
for id_, val in _edges_d.items():
edges = val.get(attributes.Connectivity.FakeEdges, [])
edges = np.concatenate([types.empty_2d, *[e.value for e in edges]])
fake_edges_ = Edges(edges[:, 0], edges[:, 1])

edges = val.get(attributes.Connectivity.SplitEdges, [])
edges = np.concatenate([types.empty_2d, *[e.value for e in edges]])

aff = val.get(attributes.Connectivity.Affinity, [])
aff = np.concatenate([types.empty_affinities, *[e.value for e in aff]])

areas = val.get(attributes.Connectivity.Area, [])
areas = np.concatenate([types.empty_areas, *[e.value for e in areas]])
split_edges_ = Edges(edges[:, 0], edges[:, 1], affinities=aff, areas=areas)

result[id_] = fake_edges_ + split_edges_
return result

def copy_fake_edges(self, chunk_id: np.uint64) -> None:
Expand Down Expand Up @@ -726,10 +753,10 @@ def get_l2_agglomerations(
if self.mock_edges is None:
edges_d = self.read_chunk_edges(chunk_ids)

fake_edges = self.get_fake_edges(chunk_ids)
edited_edges = self.get_edges_from_edits(chunk_ids)
all_chunk_edges = reduce(
lambda x, y: x + y,
chain(edges_d.values(), fake_edges.values()),
chain(edges_d.values(), edited_edges.values()),
Edges([], []),
)
if self.mock_edges is not None:
Expand Down
Loading
Loading