diff --git a/xrspatial/rasterize.py b/xrspatial/rasterize.py index 65020ae0c..cfb9f930c 100644 --- a/xrspatial/rasterize.py +++ b/xrspatial/rasterize.py @@ -1250,28 +1250,105 @@ def _check_gpu_edge_cap(row_ptr): ) -def _ensure_gpu_kernels(merge_fn, should_write): +#: Built-in merges whose GPU path uses CUDA atomics so overlapping +#: writes are deterministic. User-supplied merge callables fall back to +#: the non-atomic closure path (the public ``merge_fn`` signature has no +#: slot for atomicity, and we cannot synthesise atomics for an arbitrary +#: Python function). +_GPU_ATOMIC_MERGES = ('sum', 'count', 'min', 'max', 'first', 'last') + + +def _ensure_gpu_kernels(merge_fn, should_write, merge_name=None): """Compile CUDA kernels for the given merge + predicate pair and cache. Each unique pair produces a separate set of kernels because CUDA kernels cannot accept function arguments; both functions are captured by closure at compile time. + + When ``merge_name`` is one of the built-in merges (``sum``, ``count``, + ``min``, ``max``, ``first``, ``last``), the returned kernel set uses + CUDA atomics on ``out`` / ``order`` so that overlapping point writes, + crossing line segments, and shared polygon-boundary pixels produce + deterministic results matching the CPU backend. Otherwise (callable + or unknown merge) the kernels keep the legacy non-atomic + read-modify-write path, which is racy on overlap but preserved for + user callables whose signature has no atomic equivalent. """ - key = (id(merge_fn), id(should_write)) + key = (id(merge_fn), id(should_write), merge_name) if key in _gpu_kernel_cache: return _gpu_kernel_cache[key] from numba import cuda + atomic_mode = merge_name if merge_name in _GPU_ATOMIC_MERGES else None + + # The per-pixel write strategy depends on ``atomic_mode``. Numba + # treats free Python variables in a ``cuda.jit`` closure as compile- + # time constants, so the if/elif ladder below is folded away and + # each compiled kernel contains only the relevant atomic call. + # + # Initialisation contract (launcher fills ``out`` / ``order`` before + # the kernel runs): + # sum, count -> out = 0.0 , order = MIN_INT64 + # min -> out = +inf , order = MIN_INT64 + # max -> out = -inf , order = MIN_INT64 + # first -> out = fill , order = MAX_INT64 + # last -> out = fill , order = MIN_INT64 + # None (user) -> out = fill , order = -1 (legacy) + # + # For sum/count/min/max, ``order`` is repurposed as a touched mask: + # any non-sentinel value means the pixel was written by at least + # one geometry. A host-side post-pass blends in ``fill`` where + # the pixel is still untouched, and substitutes ``fill`` for the + # numerical sentinels used by min/max init. + # + # For first/last, the kernel is two-pass: pass 1 resolves the + # winning input index atomically into ``order``; pass 2 stamps the + # winner's ``props[0]`` value into ``out``. Pass 1 and pass 2 are + # separate compiled kernels (``_..._gpu`` and ``_..._gpu_pass2``); + # ``_run_cupy`` / ``_rasterize_tile_cupy`` skip the pass-2 launch + # for all other modes. + @cuda.jit(device=True) def _apply_merge_gpu(out, written, order, r, c, props, new_idx): - is_first = np.int64(written[r, c] == 0) - cur_idx = order[r, c] - if not should_write(is_first, new_idx, cur_idx): - return - out[r, c] = merge_fn(out[r, c], props, is_first) - written[r, c] = 1 - order[r, c] = new_idx + if atomic_mode == 'sum': + cuda.atomic.add(out, (r, c), props[0]) + cuda.atomic.max(order, (r, c), new_idx) + elif atomic_mode == 'count': + cuda.atomic.add(out, (r, c), 1.0) + cuda.atomic.max(order, (r, c), new_idx) + elif atomic_mode == 'min': + cuda.atomic.min(out, (r, c), props[0]) + cuda.atomic.max(order, (r, c), new_idx) + elif atomic_mode == 'max': + cuda.atomic.max(out, (r, c), props[0]) + cuda.atomic.max(order, (r, c), new_idx) + elif atomic_mode == 'first': + # Pass 1: resolve winning input index per pixel. + cuda.atomic.min(order, (r, c), new_idx) + elif atomic_mode == 'last': + cuda.atomic.max(order, (r, c), new_idx) + else: + # Legacy non-atomic path for user callables. + is_first = np.int64(written[r, c] == 0) + cur_idx = order[r, c] + if not should_write(is_first, new_idx, cur_idx): + return + out[r, c] = merge_fn(out[r, c], props, is_first) + written[r, c] = 1 + order[r, c] = new_idx + + # Pass-2 device function for first/last: stamp the winner's value + # into ``out``. Only the geometry whose index won pass 1 writes. + # Multiple threads from the same geometry may write the same pixel, + # but they all write ``props[0]`` so the outcome is deterministic. + @cuda.jit(device=True) + def _apply_merge_gpu_pass2(out, written, order, r, c, props, new_idx): + if atomic_mode == 'first' or atomic_mode == 'last': + if order[r, c] == new_idx: + out[r, c] = props[0] + # else: pass 2 is a no-op for all other modes; the launcher + # simply skips it. @cuda.jit def _scanline_fill_gpu(out, written, order, edge_y_min, edge_x_at_ymin, @@ -1440,8 +1517,6 @@ def _burn_lines_supercover_gpu( props = geom_props[gi] new_idx = geom_global_idx[gi] - # math.floor via int() works for the float64 range we have - # here (segments are already clipped to raster bounds). cx = int(math.floor(x0)) cy = int(math.floor(y0)) end_cx = int(math.floor(x1)) @@ -1507,11 +1582,232 @@ def _burn_lines_supercover_gpu( cx += step_x cy += step_y + # ---- Pass-2 kernels (only meaningful for first / last) ----------- + # These mirror the pass-1 kernels but call ``_apply_merge_gpu_pass2`` + # to stamp the winner's value once ``order`` holds the resolved + # input index. Compiled unconditionally so the kernels dict shape + # stays uniform; for non-first/last modes the launcher does not + # invoke them. + + @cuda.jit + def _scanline_fill_gpu_pass2(out, written, order, edge_y_min, + edge_x_at_ymin, edge_inv_slope, + edge_geom_id, geom_props, geom_global_idx, + row_ptr, col_idx, width): + row = cuda.grid(1) + if row >= out.shape[0]: + return + start = row_ptr[row] + end = row_ptr[row + 1] + if end - start == 0: + return + MAX_ISECT = 2048 + xs = cuda.local.array(2048, dtype=np.float64) + gs = cuda.local.array(2048, dtype=np.int32) + actual = 0 + for k in range(start, end): + if actual >= MAX_ISECT: + break + e = col_idx[k] + xs[actual] = (edge_x_at_ymin[e] + + (row - edge_y_min[e]) * edge_inv_slope[e]) + gs[actual] = edge_geom_id[e] + actual += 1 + gap = actual >> 1 + while gap > 0: + for i in range(gap, actual): + kx = xs[i] + kg = gs[i] + j = i - gap + while j >= 0 and (gs[j] > kg or (gs[j] == kg and xs[j] > kx)): + xs[j + gap] = xs[j] + gs[j + gap] = gs[j] + j -= gap + xs[j + gap] = kx + gs[j + gap] = kg + gap >>= 1 + i = 0 + while i < actual - 1: + gid = gs[i] + new_idx = geom_global_idx[gid] + j = i + while j < actual and gs[j] == gid: + j += 1 + k = i + while k + 1 < j: + x_start = xs[k] + x_end = xs[k + 1] + ix = int(x_start) + col_start = ix + 1 if x_start > ix else ix + if col_start < 0: + col_start = 0 + ix_end = int(x_end) + col_end = ix_end if x_end > ix_end else ix_end - 1 + if col_end >= width: + col_end = width - 1 + for c in range(col_start, col_end + 1): + _apply_merge_gpu_pass2(out, written, order, row, c, + geom_props[gid], new_idx) + k += 2 + i = j + + @cuda.jit + def _burn_points_gpu_pass2(out, written, order, rows, cols, geom_idx, + geom_props, geom_global_idx, n_points): + i = cuda.grid(1) + if i >= n_points: + return + r = rows[i] + c = cols[i] + if 0 <= r < out.shape[0] and 0 <= c < out.shape[1]: + gi = geom_idx[i] + _apply_merge_gpu_pass2(out, written, order, r, c, + geom_props[gi], geom_global_idx[gi]) + + @cuda.jit + def _burn_lines_gpu_pass2(out, written, order, r0_arr, c0_arr, r1_arr, + c1_arr, geom_idx, geom_props, + geom_global_idx, n_segs, height, width): + i = cuda.grid(1) + if i >= n_segs: + return + r0 = r0_arr[i] + c0 = c0_arr[i] + r1 = r1_arr[i] + c1 = c1_arr[i] + gi = geom_idx[i] + props = geom_props[gi] + new_idx = geom_global_idx[gi] + dr = r1 - r0 + dc = c1 - c0 + sr = 1 if dr >= 0 else -1 + sc = 1 if dc >= 0 else -1 + if dr < 0: + dr = -dr + if dc < 0: + dc = -dc + if dr >= dc: + err = dc - dr + r, c = r0, c0 + for _ in range(dr + 1): + if 0 <= r < height and 0 <= c < width: + _apply_merge_gpu_pass2(out, written, order, r, c, + props, new_idx) + if err >= 0: + c += sc + err -= dr + r += sr + err += dc + else: + err = dr - dc + r, c = r0, c0 + for _ in range(dc + 1): + if 0 <= r < height and 0 <= c < width: + _apply_merge_gpu_pass2(out, written, order, r, c, + props, new_idx) + if err >= 0: + r += sr + err -= dc + c += sc + err += dr + @cuda.jit + def _burn_lines_supercover_gpu_pass2( + out, written, order, + x0_arr, y0_arr, x1_arr, y1_arr, + geom_idx, geom_props, geom_global_idx, + n_segs, height, width): + """Pass-2 supercover boundary burn for first/last modes. + + Mirrors ``_burn_lines_supercover_gpu`` but calls + ``_apply_merge_gpu_pass2`` so the winning-index resolved in + pass 1 gets stamped exactly once per pixel. + """ + i = cuda.grid(1) + if i >= n_segs: + return + x0 = x0_arr[i] + y0 = y0_arr[i] + x1 = x1_arr[i] + y1 = y1_arr[i] + gi = geom_idx[i] + props = geom_props[gi] + new_idx = geom_global_idx[gi] + + cx = int(math.floor(x0)) + cy = int(math.floor(y0)) + end_cx = int(math.floor(x1)) + end_cy = int(math.floor(y1)) + + dx = x1 - x0 + dy = y1 - y0 + + if dx > 0.0: + step_x = 1 + elif dx < 0.0: + step_x = -1 + else: + step_x = 0 + + if dy > 0.0: + step_y = 1 + elif dy < 0.0: + step_y = -1 + else: + step_y = 0 + + if dx != 0.0: + t_delta_x = 1.0 / (dx if dx > 0.0 else -dx) + else: + t_delta_x = math.inf + + if dy != 0.0: + t_delta_y = 1.0 / (dy if dy > 0.0 else -dy) + else: + t_delta_y = math.inf + + if dx > 0.0: + t_max_x = (float(cx + 1) - x0) / dx + elif dx < 0.0: + t_max_x = (float(cx) - x0) / dx + else: + t_max_x = math.inf + + if dy > 0.0: + t_max_y = (float(cy + 1) - y0) / dy + elif dy < 0.0: + t_max_y = (float(cy) - y0) / dy + else: + t_max_y = math.inf + + max_steps = abs(end_cx - cx) + abs(end_cy - cy) + 2 + for _ in range(max_steps): + if 0 <= cy < height and 0 <= cx < width: + _apply_merge_gpu_pass2(out, written, order, cy, cx, + props, new_idx) + if cx == end_cx and cy == end_cy: + break + if t_max_x < t_max_y: + t_max_x += t_delta_x + cx += step_x + elif t_max_y < t_max_x: + t_max_y += t_delta_y + cy += step_y + else: + t_max_x += t_delta_x + t_max_y += t_delta_y + cx += step_x + cy += step_y + kernels = { 'scanline_fill': _scanline_fill_gpu, 'burn_points': _burn_points_gpu, 'burn_lines': _burn_lines_gpu, 'burn_lines_supercover': _burn_lines_supercover_gpu, + 'scanline_fill_pass2': _scanline_fill_gpu_pass2, + 'burn_points_pass2': _burn_points_gpu_pass2, + 'burn_lines_pass2': _burn_lines_gpu_pass2, + 'burn_lines_supercover_pass2': _burn_lines_supercover_gpu_pass2, + 'atomic_mode': atomic_mode, } _gpu_kernel_cache[key] = kernels return kernels @@ -1574,14 +1870,92 @@ def _build_row_csr_numba(edge_y_min, edge_y_max, height): return row_ptr, col_idx +#: Sentinels used to mark "untouched" pixels in the GPU ``order`` array. +#: Numba's ``cuda.atomic.{min,max}`` on int64 accepts standard limits. +_GPU_ORDER_SENTINEL_MIN = np.int64(np.iinfo(np.int64).min) +_GPU_ORDER_SENTINEL_MAX = np.int64(np.iinfo(np.int64).max) + + +def _gpu_init_buffers(height, width, fill, merge_name): + """Allocate ``out`` / ``written`` / ``order`` with the right initial + values for the selected atomic merge. Returns the trio plus the + sentinel value used in ``order`` so the caller can blend ``fill`` + back in afterwards. + + The atomic kernels never consult ``written`` (the legacy + user-callable path is the only consumer), so atomic modes get a + placeholder ``(1, 1)`` buffer to satisfy the kernel signature + without spending ``H*W`` bytes on dead storage. + """ + if merge_name in _GPU_ATOMIC_MERGES: + written = cupy.zeros((1, 1), dtype=cupy.int8) + else: + written = cupy.zeros((height, width), dtype=cupy.int8) + if merge_name in ('sum', 'count'): + out = cupy.zeros((height, width), dtype=cupy.float64) + order = cupy.full((height, width), _GPU_ORDER_SENTINEL_MIN, + dtype=cupy.int64) + order_sentinel = _GPU_ORDER_SENTINEL_MIN + elif merge_name == 'min': + out = cupy.full((height, width), cupy.inf, dtype=cupy.float64) + order = cupy.full((height, width), _GPU_ORDER_SENTINEL_MIN, + dtype=cupy.int64) + order_sentinel = _GPU_ORDER_SENTINEL_MIN + elif merge_name == 'max': + out = cupy.full((height, width), -cupy.inf, dtype=cupy.float64) + order = cupy.full((height, width), _GPU_ORDER_SENTINEL_MIN, + dtype=cupy.int64) + order_sentinel = _GPU_ORDER_SENTINEL_MIN + elif merge_name == 'first': + out = cupy.full((height, width), fill, dtype=cupy.float64) + order = cupy.full((height, width), _GPU_ORDER_SENTINEL_MAX, + dtype=cupy.int64) + order_sentinel = _GPU_ORDER_SENTINEL_MAX + elif merge_name == 'last': + out = cupy.full((height, width), fill, dtype=cupy.float64) + order = cupy.full((height, width), _GPU_ORDER_SENTINEL_MIN, + dtype=cupy.int64) + order_sentinel = _GPU_ORDER_SENTINEL_MIN + else: + # User callable path: legacy initialisation, non-atomic semantics. + out = cupy.full((height, width), fill, dtype=cupy.float64) + order = cupy.full((height, width), -1, dtype=cupy.int64) + order_sentinel = None + return out, written, order, order_sentinel + + +def _gpu_finalize_buffers(out, order, order_sentinel, fill, merge_name): + """Apply the post-pass blend that restores ``fill`` for untouched + pixels (sum/count/min/max/first/last) and replaces the +/- inf + initialisers used by min/max. + + Returns ``out`` (modified in-place where convenient). + """ + if merge_name is None or order_sentinel is None: + return out + touched = order != order_sentinel + if merge_name in ('sum', 'count'): + # Pixels with no contribution should show ``fill``, not 0. + out = cupy.where(touched, out, cupy.float64(fill)) + elif merge_name == 'min': + # +inf initialiser leaks into untouched pixels. + out = cupy.where(touched, out, cupy.float64(fill)) + elif merge_name == 'max': + out = cupy.where(touched, out, cupy.float64(fill)) + # first/last already initialise ``out`` to ``fill``; pass-2 only + # writes for touched pixels, so no blend is needed. + return out + + def _run_cupy(geometries, props_array, bounds, height, width, fill, dtype, - all_touched, merge_fn, should_write): + all_touched, merge_fn, should_write, merge_name=None): """CuPy backend for rasterize.""" - kernels = _ensure_gpu_kernels(merge_fn, should_write) + kernels = _ensure_gpu_kernels(merge_fn, should_write, merge_name) + atomic_mode = kernels['atomic_mode'] + two_pass = atomic_mode in ('first', 'last') - out = cupy.full((height, width), fill, dtype=cupy.float64) - written = cupy.zeros((height, width), dtype=cupy.int8) - order = cupy.full((height, width), -1, dtype=cupy.int64) + out, written, order, order_sentinel = _gpu_init_buffers( + height, width, fill, atomic_mode) (poly_geoms, poly_props, poly_ids, poly_global), \ (line_geoms, line_props, line_global), \ @@ -1595,80 +1969,109 @@ def _run_cupy(geometries, props_array, bounds, height, width, fill, dtype, edge_y_min, edge_y_max, edge_x_at_ymin, edge_inv_slope, \ edge_geom_id = edge_arrays + # Stage the per-launch inputs once; first/last reuses these tensors + # for the pass-2 stamp launches. + poly_launch = None if len(edge_y_min) > 0: - # Build CSR structure on CPU, then transfer to GPU row_ptr, col_idx = _build_row_csr_numba( edge_y_min, edge_y_max, height) - # The CUDA scanline kernel allocates a fixed-size local array; rows # exceeding _GPU_MAX_ISECT would have edges silently dropped and # produce wrong output. Fail fast instead. _check_gpu_edge_cap(row_ptr) - - d_y_min = cupy.asarray(edge_y_min) - d_x_at_ymin = cupy.asarray(edge_x_at_ymin) - d_inv_slope = cupy.asarray(edge_inv_slope) - d_geom_id = cupy.asarray(edge_geom_id) - d_geom_props = cupy.asarray(poly_props) - d_poly_global = cupy.asarray(poly_global) - d_row_ptr = cupy.asarray(row_ptr) - d_col_idx = cupy.asarray(col_idx) - - tpb = 256 - blocks = (height + tpb - 1) // tpb - kernels['scanline_fill'][blocks, tpb]( - out, written, order, d_y_min, d_x_at_ymin, d_inv_slope, - d_geom_id, d_geom_props, d_poly_global, - d_row_ptr, d_col_idx, width) - - # 1b. all_touched: draw polygon boundaries with the supercover - # grid traversal on the GPU. Float pixel endpoints so every - # cell a ring segment crosses gets burned. + poly_launch = ( + cupy.asarray(edge_y_min), cupy.asarray(edge_x_at_ymin), + cupy.asarray(edge_inv_slope), cupy.asarray(edge_geom_id), + cupy.asarray(poly_props), cupy.asarray(poly_global), + cupy.asarray(row_ptr), cupy.asarray(col_idx)) + + # all_touched boundaries. ``poly_geoms`` and ``poly_ids`` come + # from ``_classify_geometries`` above and are always defined here, + # so this block does not depend on the polygon scanline path + # having allocated ``poly_launch``. The supercover boundary kernel + # walks every cell a ring segment crosses (float pixel endpoints), + # replacing the older Bresenham boundary burn. + boundary_launch = None if all_touched and poly_geoms: bx0, by0, bx1, by1, bidx = ( _extract_polygon_boundary_segments_float( poly_geoms, poly_ids, bounds, height, width)) if len(bx0) > 0: - n_bsegs = len(bx0) - tpb = 256 - bpg = (n_bsegs + tpb - 1) // tpb - kernels['burn_lines_supercover'][bpg, tpb]( - out, written, order, + boundary_launch = ( cupy.asarray(bx0), cupy.asarray(by0), cupy.asarray(bx1), cupy.asarray(by1), cupy.asarray(bidx), cupy.asarray(poly_props), - cupy.asarray(poly_global), - n_bsegs, height, width) + cupy.asarray(poly_global), len(bx0)) - # 2. Lines r0, c0, r1, c1, line_idx = _extract_line_segments( line_geoms, bounds, height, width) + line_launch = None if len(r0) > 0: - n_segs = len(r0) - tpb = 256 - bpg = (n_segs + tpb - 1) // tpb - kernels['burn_lines'][bpg, tpb]( - out, written, order, + line_launch = ( cupy.asarray(r0), cupy.asarray(c0), cupy.asarray(r1), cupy.asarray(c1), cupy.asarray(line_idx), cupy.asarray(line_props), - cupy.asarray(line_global), - n_segs, height, width) + cupy.asarray(line_global), len(r0)) - # 3. Points prows, pcols, pt_idx = _extract_points( point_geoms, bounds, height, width) + point_launch = None if len(prows) > 0: - n_pts = len(prows) - tpb = 256 - bpg = (n_pts + tpb - 1) // tpb - kernels['burn_points'][bpg, tpb]( - out, written, order, + point_launch = ( cupy.asarray(prows), cupy.asarray(pcols), cupy.asarray(pt_idx), cupy.asarray(point_props), - cupy.asarray(point_global), n_pts) + cupy.asarray(point_global), len(prows)) - return out.astype(dtype) + def _launch_phase(k_scan, k_lines, k_boundary, k_points): + tpb = 256 + if poly_launch is not None: + blocks = (height + tpb - 1) // tpb + k_scan[blocks, tpb](out, written, order, + poly_launch[0], poly_launch[1], + poly_launch[2], poly_launch[3], + poly_launch[4], poly_launch[5], + poly_launch[6], poly_launch[7], width) + if boundary_launch is not None: + n_bsegs = boundary_launch[7] + bpg = (n_bsegs + tpb - 1) // tpb + k_boundary[bpg, tpb](out, written, order, + boundary_launch[0], boundary_launch[1], + boundary_launch[2], boundary_launch[3], + boundary_launch[4], boundary_launch[5], + boundary_launch[6], n_bsegs, height, width) + if line_launch is not None: + n_segs = line_launch[7] + bpg = (n_segs + tpb - 1) // tpb + k_lines[bpg, tpb](out, written, order, + line_launch[0], line_launch[1], + line_launch[2], line_launch[3], + line_launch[4], line_launch[5], + line_launch[6], n_segs, height, width) + if point_launch is not None: + n_pts = point_launch[5] + bpg = (n_pts + tpb - 1) // tpb + k_points[bpg, tpb](out, written, order, + point_launch[0], point_launch[1], + point_launch[2], point_launch[3], + point_launch[4], n_pts) + + # Pass 1: write values (atomics) or, for first/last, resolve the + # per-pixel winning input index into ``order``. + _launch_phase(kernels['scanline_fill'], + kernels['burn_lines'], + kernels['burn_lines_supercover'], + kernels['burn_points']) + + # Pass 2 (first / last only): stamp the winner's value into ``out``. + if two_pass: + _launch_phase(kernels['scanline_fill_pass2'], + kernels['burn_lines_pass2'], + kernels['burn_lines_supercover_pass2'], + kernels['burn_points_pass2']) + + final_out = _gpu_finalize_buffers(out, order, order_sentinel, fill, + atomic_mode) + return final_out.astype(dtype) # --------------------------------------------------------------------------- @@ -2014,89 +2417,128 @@ def _rasterize_tile_cupy(poly_wkb, poly_props_2d, poly_global_2d, tile_bounds, seg_r0, seg_c0, seg_r1, seg_c1, seg_geom_idx, line_props, line_global, pt_rows, pt_cols, pt_geom_idx, - point_props, point_global): - """GPU tile rasterization: polygons as WKB, lines/points as segments.""" - kernels = _ensure_gpu_kernels(merge_fn, should_write) + point_props, point_global, merge_name=None): + """GPU tile rasterization: polygons as WKB, lines/points as segments. - out = cupy.full((tile_h, tile_w), fill, dtype=cupy.float64) - written = cupy.zeros((tile_h, tile_w), dtype=cupy.int8) - order = cupy.full((tile_h, tile_w), -1, dtype=cupy.int64) - - # 1. Polygons (deserialize WKB, then scanline fill on GPU) + When ``merge_name`` selects an atomic built-in merge, the per-tile + kernels use CUDA atomics so overlapping writes inside the tile are + deterministic. Cross-tile pixels do not overlap (the tiling is a + partition of the output grid), so per-tile atomicity is sufficient. + """ + kernels = _ensure_gpu_kernels(merge_fn, should_write, merge_name) + atomic_mode = kernels['atomic_mode'] + two_pass = atomic_mode in ('first', 'last') + + out, written, order, order_sentinel = _gpu_init_buffers( + tile_h, tile_w, fill, atomic_mode) + + # Stage launches (same pattern as _run_cupy). Stage once so the + # first/last pass-2 sweep reuses the device buffers. ``poly_geoms`` + # and ``poly_ids`` are bound inside the ``if poly_wkb:`` block, so + # ``boundary_launch`` is staged inside the same block to keep the + # dependency local. + poly_launch = None + boundary_launch = None if poly_wkb: poly_geoms = _polys_from_wkb(poly_wkb) poly_ids = np.arange(len(poly_geoms), dtype=np.int32) edge_arrays = _extract_edges( - poly_geoms, poly_ids, tile_bounds, - tile_h, tile_w) + poly_geoms, poly_ids, tile_bounds, tile_h, tile_w) edge_arrays = _sort_edges(edge_arrays) edge_y_min = edge_arrays[0] if len(edge_y_min) > 0: - edge_y_max, edge_x_at_ymin, edge_inv_slope, \ - edge_geom_id = edge_arrays[1:] + edge_y_max, edge_x_at_ymin, edge_inv_slope, edge_geom_id = \ + edge_arrays[1:] row_ptr, col_idx = _build_row_csr_numba( edge_y_min, edge_y_max, tile_h) - # Same fail-fast as eager CuPy: per-tile truncation would be - # silent without this check. _check_gpu_edge_cap(row_ptr) - tpb = 256 - blocks = (tile_h + tpb - 1) // tpb - kernels['scanline_fill'][blocks, tpb]( - out, written, order, + poly_launch = ( cupy.asarray(edge_y_min), cupy.asarray(edge_x_at_ymin), cupy.asarray(edge_inv_slope), cupy.asarray(edge_geom_id), cupy.asarray(poly_props_2d), cupy.asarray(poly_global_2d), - cupy.asarray(row_ptr), cupy.asarray(col_idx), tile_w) + cupy.asarray(row_ptr), cupy.asarray(col_idx)) - # 1b. all_touched: draw polygon boundaries with the - # supercover grid traversal kernel so the dask+cupy path - # matches numpy/cupy pixel-for-pixel. + # all_touched: stage the supercover boundary burn through + # ``boundary_launch`` so the two-pass first/last path can fire + # both pass-1 and pass-2 kernels. if all_touched: bx0, by0, bx1, by1, bidx = ( _extract_polygon_boundary_segments_float( poly_geoms, poly_ids, tile_bounds, tile_h, tile_w)) if len(bx0) > 0: - n_bsegs = len(bx0) - tpb_b = 256 - bpg_b = (n_bsegs + tpb_b - 1) // tpb_b - kernels['burn_lines_supercover'][bpg_b, tpb_b]( - out, written, order, + boundary_launch = ( cupy.asarray(bx0), cupy.asarray(by0), cupy.asarray(bx1), cupy.asarray(by1), cupy.asarray(bidx), cupy.asarray(poly_props_2d), - cupy.asarray(poly_global_2d), - n_bsegs, tile_h, tile_w) + cupy.asarray(poly_global_2d), len(bx0)) - # 2. Lines (tile-local segments, GPU Bresenham) + line_launch = None if len(seg_r0) > 0: - n_segs = len(seg_r0) - tpb = 256 - bpg = (n_segs + tpb - 1) // tpb - kernels['burn_lines'][bpg, tpb]( - out, written, order, + line_launch = ( cupy.asarray(seg_r0), cupy.asarray(seg_c0), cupy.asarray(seg_r1), cupy.asarray(seg_c1), cupy.asarray(seg_geom_idx), _ensure_cupy(line_props), - _ensure_cupy(line_global), - n_segs, tile_h, tile_w) + _ensure_cupy(line_global), len(seg_r0)) - # 3. Points (tile-local) + point_launch = None if len(pt_rows) > 0: - n_pts = len(pt_rows) - tpb = 256 - bpg = (n_pts + tpb - 1) // tpb - kernels['burn_points'][bpg, tpb]( - out, written, order, + point_launch = ( cupy.asarray(pt_rows), cupy.asarray(pt_cols), cupy.asarray(pt_geom_idx), _ensure_cupy(point_props), - _ensure_cupy(point_global), n_pts) + _ensure_cupy(point_global), len(pt_rows)) - return out.astype(dtype) + def _launch_phase(k_scan, k_lines, k_boundary, k_points): + tpb = 256 + if poly_launch is not None: + blocks = (tile_h + tpb - 1) // tpb + k_scan[blocks, tpb](out, written, order, + poly_launch[0], poly_launch[1], + poly_launch[2], poly_launch[3], + poly_launch[4], poly_launch[5], + poly_launch[6], poly_launch[7], tile_w) + if boundary_launch is not None: + n_bsegs = boundary_launch[7] + bpg = (n_bsegs + tpb - 1) // tpb + k_boundary[bpg, tpb](out, written, order, + boundary_launch[0], boundary_launch[1], + boundary_launch[2], boundary_launch[3], + boundary_launch[4], boundary_launch[5], + boundary_launch[6], + n_bsegs, tile_h, tile_w) + if line_launch is not None: + n_segs = line_launch[7] + bpg = (n_segs + tpb - 1) // tpb + k_lines[bpg, tpb](out, written, order, + line_launch[0], line_launch[1], + line_launch[2], line_launch[3], + line_launch[4], line_launch[5], + line_launch[6], n_segs, tile_h, tile_w) + if point_launch is not None: + n_pts = point_launch[5] + bpg = (n_pts + tpb - 1) // tpb + k_points[bpg, tpb](out, written, order, + point_launch[0], point_launch[1], + point_launch[2], point_launch[3], + point_launch[4], n_pts) + + _launch_phase(kernels['scanline_fill'], + kernels['burn_lines'], + kernels['burn_lines_supercover'], + kernels['burn_points']) + if two_pass: + _launch_phase(kernels['scanline_fill_pass2'], + kernels['burn_lines_pass2'], + kernels['burn_lines_supercover_pass2'], + kernels['burn_points_pass2']) + + final_out = _gpu_finalize_buffers(out, order, order_sentinel, fill, + atomic_mode) + return final_out.astype(dtype) def _run_dask_cupy(geometries, props_array, bounds, height, width, fill, dtype, all_touched, merge_fn, should_write, - row_chunks, col_chunks): + row_chunks, col_chunks, merge_name=None): """Dask + CuPy backend: tile-based parallel GPU rasterization.""" import dask import dask.array as da @@ -2175,7 +2617,8 @@ def _run_dask_cupy(geometries, props_array, bounds, height, width, fill, tile_h, tile_w, fill, dtype, all_touched, merge_fn, should_write, *ts, tile_line_props, tile_line_global, - *tp, tile_point_props, tile_point_global) + *tp, tile_point_props, tile_point_global, + merge_name) blocks[i][j] = da.from_delayed( delayed_tile, shape=(tile_h, tile_w), dtype=dtype, meta=cupy.empty(())) @@ -2684,6 +3127,10 @@ def rasterize( # For GPU paths, resolve string merge names to GPU (merge_fn, # should_write_fn) pairs. User callables are paired with the # always-write predicate (matches the public 3-arg signature). + # ``gpu_merge_name`` selects the atomic kernel path in + # ``_ensure_gpu_kernels``; ``None`` falls back to the non-atomic + # closure path used for user callables. + gpu_merge_name = None if use_cuda: if cupy is None: raise ImportError( @@ -2691,6 +3138,7 @@ def rasterize( gpu_fns = _get_gpu_merge_fns() if isinstance(_merge_fn_gpu, str): gpu_merge_fn, should_write_gpu = gpu_fns[_merge_fn_gpu] + gpu_merge_name = _merge_fn_gpu else: gpu_merge_fn = _merge_fn_gpu # Need an always-true predicate compiled for cuda; reuse any @@ -2706,7 +3154,7 @@ def rasterize( geom_list, props_array, final_bounds, final_height, final_width, fill, final_dtype, all_touched, gpu_merge_fn, should_write_gpu, - row_chunks, col_chunks) + row_chunks, col_chunks, gpu_merge_name) else: out = _run_dask_numpy( geom_list, props_array, final_bounds, @@ -2716,7 +3164,8 @@ def rasterize( elif use_cuda: out = _run_cupy(geom_list, props_array, final_bounds, final_height, final_width, fill, final_dtype, - all_touched, gpu_merge_fn, should_write_gpu) + all_touched, gpu_merge_fn, should_write_gpu, + gpu_merge_name) else: out = _run_numpy(geom_list, props_array, final_bounds, final_height, final_width, fill, final_dtype, diff --git a/xrspatial/tests/test_rasterize_gpu_race_2167.py b/xrspatial/tests/test_rasterize_gpu_race_2167.py new file mode 100644 index 000000000..f1f6e9557 --- /dev/null +++ b/xrspatial/tests/test_rasterize_gpu_race_2167.py @@ -0,0 +1,231 @@ +"""Regression tests for issue #2167. + +GPU point and line rasterize kernels previously did non-atomic +read-modify-write on the per-pixel state. When multiple threads +landed on the same pixel (overlapping points, crossing line +segments, duplicate segments, shared polygon boundaries with +``all_touched=True``) the writes raced and the cupy backend could +return a value that disagreed with the numpy backend, and varied +from run to run. + +These tests pin down deterministic, numpy-matching behaviour for +the six built-in aggregators (``last``, ``first``, ``sum``, +``count``, ``min``, ``max``) on overlapping geometries. + +When cupy / CUDA are not available the whole module skips. +""" +import numpy as np +import pytest + +try: + from shapely.geometry import LineString, Point, box + has_shapely = True +except ImportError: + has_shapely = False + +if has_shapely: + from xrspatial.rasterize import rasterize + +try: + import cupy # noqa: F401 + has_cupy = True +except ImportError: + has_cupy = False + +try: + from numba import cuda + has_cuda = has_cupy and cuda.is_available() +except Exception: + has_cuda = False + + +pytestmark = [ + pytest.mark.skipif(not has_shapely, reason="shapely not installed"), + pytest.mark.skipif(not has_cuda, reason="CUDA / CuPy not available"), +] + + +BOUNDS = (0.0, 0.0, 10.0, 10.0) +HEIGHT = 10 +WIDTH = 10 +MERGES = ('last', 'first', 'sum', 'count', 'min', 'max') + + +def _as_numpy(arr): + """Return a plain numpy ndarray regardless of backend.""" + data = arr.data + if hasattr(data, 'get'): + return data.get() + return np.asarray(data) + + +def _run(geom_list, merge, use_cuda, all_touched=False): + return rasterize( + geom_list, width=WIDTH, height=HEIGHT, bounds=BOUNDS, + merge=merge, use_cuda=use_cuda, fill=np.nan, + all_touched=all_touched, + ) + + +# --------------------------------------------------------------------------- +# Scenario builders +# --------------------------------------------------------------------------- + +def _coincident_points(): + """Three points that land on the same pixel, plus one offset point.""" + # Pixel grid spans 0-10 in 10 steps -> 1 unit per pixel. All three + # of these land in the pixel centred at (5.5, 5.5). The fourth one + # is well away so we can confirm non-overlap is unaffected. + return [ + (Point(5.5, 5.5), 1.0), + (Point(5.7, 5.3), 2.0), + (Point(5.2, 5.8), 4.0), + (Point(1.5, 1.5), 9.0), + ] + + +def _crossing_lines(): + """Two line segments that cross at one or more pixels.""" + return [ + (LineString([(0.5, 0.5), (9.5, 9.5)]), 1.0), + (LineString([(0.5, 9.5), (9.5, 0.5)]), 2.0), + (LineString([(2.5, 5.5), (7.5, 5.5)]), 4.0), + ] + + +def _duplicate_segments(): + """Two identical line segments -- every pixel they cover overlaps.""" + coords = [(1.5, 1.5), (8.5, 8.5)] + return [ + (LineString(coords), 1.0), + (LineString(coords), 8.0), + ] + + +SCENARIOS = { + 'coincident_points': _coincident_points, + 'crossing_lines': _crossing_lines, + 'duplicate_segments': _duplicate_segments, +} + + +# --------------------------------------------------------------------------- +# Cross-backend parity +# --------------------------------------------------------------------------- + +@pytest.mark.parametrize('scenario', list(SCENARIOS)) +@pytest.mark.parametrize('merge', MERGES) +def test_cupy_matches_numpy_on_overlap(scenario, merge): + geoms = SCENARIOS[scenario]() + expected = _as_numpy(_run(geoms, merge, use_cuda=False)) + actual = _as_numpy(_run(geoms, merge, use_cuda=True)) + + # Pixels touched by no geometry should be ``fill`` (NaN here) on + # both backends. np.testing.assert_array_equal treats NaN==NaN + # when equal_nan=True is requested via assert_array_equal in + # recent NumPy; use the explicit form for portability. + np.testing.assert_allclose( + actual, expected, rtol=0, atol=0, equal_nan=True, + err_msg=( + f"cupy backend disagrees with numpy for scenario " + f"{scenario!r} and merge {merge!r}.\n" + f"cupy:\n{actual}\nnumpy:\n{expected}" + ), + ) + + +# --------------------------------------------------------------------------- +# Determinism: same input must give identical output across runs. +# --------------------------------------------------------------------------- + +@pytest.mark.parametrize('scenario', list(SCENARIOS)) +@pytest.mark.parametrize('merge', MERGES) +def test_cupy_is_deterministic_across_runs(scenario, merge): + geoms = SCENARIOS[scenario]() + first = _as_numpy(_run(geoms, merge, use_cuda=True)) + # Six repeats is enough to surface a thread-interleaving race + # without making the test slow. + for _ in range(5): + again = _as_numpy(_run(geoms, merge, use_cuda=True)) + np.testing.assert_allclose( + again, first, rtol=0, atol=0, equal_nan=True, + err_msg=( + f"cupy backend produced different results across runs " + f"for scenario {scenario!r} and merge {merge!r}.\n" + f"first:\n{first}\nagain:\n{again}" + ), + ) + + +# --------------------------------------------------------------------------- +# Spot-check that sum on coincident points actually accumulates. Without +# the atomic-add fix, this test would either return one of the input +# values or NaN, never the true sum. +# --------------------------------------------------------------------------- + +def test_sum_of_coincident_points_equals_total(): + geoms = _coincident_points() + # The first three points all land in the same pixel; sum should be 7. + result = _as_numpy(_run(geoms, 'sum', use_cuda=True)) + # Find the pixel with the largest accumulated value. + finite = result[np.isfinite(result)] + assert finite.size > 0 + # Three coincident points (1 + 2 + 4) plus an isolated point of 9. + assert 7.0 in finite, ( + f"expected the three-point pixel to sum to 7.0; got {finite}" + ) + + +def test_count_of_coincident_points_equals_three(): + geoms = _coincident_points() + result = _as_numpy(_run(geoms, 'count', use_cuda=True)) + finite = result[np.isfinite(result)] + assert 3.0 in finite, ( + f"expected the three-point pixel to count 3; got {finite}" + ) + + +# --------------------------------------------------------------------------- +# all_touched=True polygon boundaries are the polygon analogue of the +# line-overlap case: two polygons sharing a boundary write the same +# pixels twice via the Bresenham boundary pass, on top of the scanline +# fill. Confirm cupy still matches numpy across all six aggregators. +# --------------------------------------------------------------------------- + +def _shared_boundary_polygons(): + """Two rectangles that share an edge.""" + return [ + (box(1.0, 1.0, 5.0, 5.0), 1.0), + (box(5.0, 1.0, 9.0, 5.0), 2.0), + ] + + +@pytest.mark.parametrize('merge', MERGES) +def test_cupy_matches_numpy_all_touched_shared_boundary(merge): + geoms = _shared_boundary_polygons() + expected = _as_numpy(_run(geoms, merge, use_cuda=False, all_touched=True)) + actual = _as_numpy(_run(geoms, merge, use_cuda=True, all_touched=True)) + np.testing.assert_allclose( + actual, expected, rtol=0, atol=0, equal_nan=True, + err_msg=( + f"cupy backend disagrees with numpy for shared-boundary " + f"polygons with all_touched=True and merge {merge!r}.\n" + f"cupy:\n{actual}\nnumpy:\n{expected}" + ), + ) + + +@pytest.mark.parametrize('merge', MERGES) +def test_cupy_deterministic_all_touched_shared_boundary(merge): + geoms = _shared_boundary_polygons() + first = _as_numpy(_run(geoms, merge, use_cuda=True, all_touched=True)) + for _ in range(5): + again = _as_numpy(_run(geoms, merge, use_cuda=True, all_touched=True)) + np.testing.assert_allclose( + again, first, rtol=0, atol=0, equal_nan=True, + err_msg=( + f"cupy backend produced different results across runs " + f"for shared-boundary polygons with all_touched=True and " + f"merge {merge!r}." + ), + )