From e2232b64852249e1e7fa991039ed612df9ef2f1a Mon Sep 17 00:00:00 2001 From: David Ndungu Date: Thu, 9 Apr 2026 22:01:31 -0700 Subject: [PATCH 1/2] fix(compute): reuse dst GPU memory instead of allocating per call (#84) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit GPU ops (gpuBinaryOp, gpuUnaryOp, gpuScalarOp, Transpose, MatMul, Sum) were allocating fresh device memory via pool.Alloc on every call even when a pre-sized dst tensor was provided, then swapping dst's storage to the new allocation. The old GPUStorage was orphaned and depended on Go's GC finalizer to call pool.Free. At large training shapes with hundreds of batches and ~20 ops per batch, orphaned allocations piled up faster than the GC could reclaim, causing unbounded GPU memory growth and OOM. Fix: add tryReuseDstPtr helper that checks if dst[0] already has a GPUStorage with sufficient capacity. If so, the kernel writes directly into the existing device pointer — no pool.Alloc, no orphaned storage, no GC pressure. When dst is nil or undersized, the existing alloc path is preserved unchanged. Applied to the six hot-path op families that cover PatchTST GPU training: - gpuBinaryOp (Add, Sub, Mul same-shape) - gpuUnaryOp (Exp, Log, Sin, Cos, Tanh, Sqrt) - gpuScalarOp (MulScalar, AddScalar, DivScalar) - Transpose (gpu_engine_memory.go) - MatMul standard float32 path (gpu_engine.go) - Sum/ReduceSum (gpu_kernels.go) Other ops (broadcast, Q4/Q8/BF16 matmul, fused kernels) continue using the existing alloc path and can be converted incrementally. Full ztensor test suite passes on CPU host. Closes #84 Refs zerfoo/zerfoo#373 --- compute/gpu_engine.go | 29 +++++++--- compute/gpu_engine_memory.go | 25 +++++++-- compute/gpu_kernels.go | 101 ++++++++++++++++++++++++++++------- 3 files changed, 124 insertions(+), 31 deletions(-) diff --git a/compute/gpu_engine.go b/compute/gpu_engine.go index e57c847..b110ee0 100644 --- a/compute/gpu_engine.go +++ b/compute/gpu_engine.go @@ -978,13 +978,16 @@ func (e *GPUEngine[T]) MatMul(ctx context.Context, a, b *tensor.TensorNumeric[T] return nil, err } - // Allocate device output. - devCTotal, err := e.pool.Alloc(e.deviceID, outputBytes) - if err != nil { - e.oomFallbackCount.Add(1) - e.logger.Warn("MatMul: GPU output alloc failed, falling back to CPU", "error", err.Error()) + // Reuse dst's existing GPU memory when possible (#84). + devCTotal, reusedC := tryReuseDstPtr[T](batchSize*cMatSize, dst) + if !reusedC { + devCTotal, err = e.pool.Alloc(e.deviceID, outputBytes) + if err != nil { + e.oomFallbackCount.Add(1) + e.logger.Warn("MatMul: GPU output alloc failed, falling back to CPU", "error", err.Error()) - return e.cpu.MatMul(ctx, a, b, dst...) + return e.cpu.MatMul(ctx, a, b, dst...) + } } // Use strided batched GEMM when available for float32 with batch > 1. @@ -1013,9 +1016,14 @@ func (e *GPUEngine[T]) MatMul(ctx context.Context, a, b *tensor.TensorNumeric[T] if err := batched.SgemmStridedBatched(m, n, k, 1.0, devA, strideA, devB, strideBVal, 0.0, devCTotal, strideC, batchSize); err != nil { - e.pool.Free(e.deviceID, devCTotal, outputBytes) + if !reusedC { + e.pool.Free(e.deviceID, devCTotal, outputBytes) + } return nil, fmt.Errorf("MatMul: batched GEMM: %w", err) } + if reusedC { + return finishReusedDst[T](dst[0], outShape), nil + } return makeGPUResult[T](e, outShape, devCTotal, batchSize*cMatSize, dst...) } } @@ -1052,12 +1060,17 @@ func (e *GPUEngine[T]) MatMul(ctx context.Context, a, b *tensor.TensorNumeric[T] } if blasErr != nil { - e.pool.Free(e.deviceID, devCTotal, outputBytes) + if !reusedC { + e.pool.Free(e.deviceID, devCTotal, outputBytes) + } return nil, fmt.Errorf("MatMul: BLAS batch %d: %w", batch, blasErr) } } + if reusedC { + return finishReusedDst[T](dst[0], outShape), nil + } return makeGPUResult[T](e, outShape, devCTotal, batchSize*cMatSize, dst...) } diff --git a/compute/gpu_engine_memory.go b/compute/gpu_engine_memory.go index 1aef149..6f90f16 100644 --- a/compute/gpu_engine_memory.go +++ b/compute/gpu_engine_memory.go @@ -132,9 +132,14 @@ func (e *GPUEngine[T]) Transpose(ctx context.Context, a *tensor.TensorNumeric[T] } byteSize := total * f32Size - devOut, err := e.pool.Alloc(e.deviceID, byteSize) - if err != nil { - return e.cpu.Transpose(ctx, a, axes, dst...) + + // Reuse dst's existing GPU memory when possible (#84). + devOut, reused := tryReuseDstPtr[T](total, dst) + if !reused { + devOut, err = e.pool.Alloc(e.deviceID, byteSize) + if err != nil { + return e.cpu.Transpose(ctx, a, axes, dst...) + } } // Fast path: 2D transpose. @@ -145,9 +150,14 @@ func (e *GPUEngine[T]) Transpose(ctx context.Context, a *tensor.TensorNumeric[T] "cols", fmt.Sprintf("%d", shape[1])) } if err := e.kernels.Transpose2D(devIn, devOut, shape[0], shape[1], e.stream); err != nil { - e.pool.Free(e.deviceID, devOut, byteSize) + if !reused { + e.pool.Free(e.deviceID, devOut, byteSize) + } return nil, err } + if reused { + return finishReusedDst[T](dst[0], outShape), nil + } return makeGPUResult[T](e, outShape, devOut, total, dst...) } @@ -175,10 +185,15 @@ func (e *GPUEngine[T]) Transpose(ctx context.Context, a *tensor.TensorNumeric[T] } if err := e.kernels.TransposeND(devIn, devOut, inStrides32, outStrides32, perm32, rank, total, e.stream); err != nil { - e.pool.Free(e.deviceID, devOut, byteSize) + if !reused { + e.pool.Free(e.deviceID, devOut, byteSize) + } return nil, err } + if reused { + return finishReusedDst[T](dst[0], outShape), nil + } return makeGPUResult[T](e, outShape, devOut, total, dst...) } diff --git a/compute/gpu_kernels.go b/compute/gpu_kernels.go index 3c36743..fb06e05 100644 --- a/compute/gpu_kernels.go +++ b/compute/gpu_kernels.go @@ -115,6 +115,35 @@ func getDevicePtr[T tensor.Numeric](e *GPUEngine[T], t *tensor.TensorNumeric[T]) return devPtr, cleanup, nil } +// tryReuseDstPtr checks whether dst[0] already has a GPUStorage with at least +// neededElems capacity. If so, it returns the existing device pointer so the +// caller can write kernel output directly into it, avoiding a pool.Alloc and +// the resulting GC-pressure from orphaned GPUStorage objects. See ztensor#84. +func tryReuseDstPtr[T tensor.Numeric](neededElems int, dst []*tensor.TensorNumeric[T]) (unsafe.Pointer, bool) { + if len(dst) == 0 || dst[0] == nil { + return nil, false + } + gs, ok := dst[0].GetStorage().(*tensor.GPUStorage[T]) + if !ok || gs.Len() < neededElems { + return nil, false + } + return gs.Ptr(), true +} + +// finishReusedDst updates dst's shape and strides in place after a kernel has +// written into dst's existing device memory. No new GPUStorage is created. +func finishReusedDst[T tensor.Numeric](dst *tensor.TensorNumeric[T], shape []int) *tensor.TensorNumeric[T] { + strides := make([]int, len(shape)) + stride := 1 + for i := len(shape) - 1; i >= 0; i-- { + strides[i] = stride + stride *= shape[i] + } + dst.SetShape(shape) + dst.SetStrides(strides) + return dst +} + // makeGPUResult creates a tensor with pool-backed GPUStorage wrapping the given // device pointer. When the tensor is freed, the pointer is returned to the pool // for reuse instead of calling cudaFree. @@ -522,17 +551,26 @@ func gpuBinaryOp[T tensor.Numeric]( byteSize := n * f32Size - devC, err := e.pool.Alloc(e.deviceID, byteSize) - if err != nil { - return nil, err + // Reuse dst's existing GPU memory when possible (#84). + devC, reused := tryReuseDstPtr[T](n, dst) + if !reused { + devC, err = e.pool.Alloc(e.deviceID, byteSize) + if err != nil { + return nil, err + } } if err := kernelFn(devA, devB, devC, n, e.stream); err != nil { - e.pool.Free(e.deviceID, devC, byteSize) + if !reused { + e.pool.Free(e.deviceID, devC, byteSize) + } return nil, err } + if reused { + return finishReusedDst[T](dst[0], a.Shape()), nil + } return makeGPUResult[T](e, a.Shape(), devC, n, dst...) } @@ -559,17 +597,26 @@ func gpuUnaryOp[T tensor.Numeric]( byteSize := n * f32Size - devC, err := e.pool.Alloc(e.deviceID, byteSize) - if err != nil { - return nil, err + // Reuse dst's existing GPU memory when possible (#84). + devC, reused := tryReuseDstPtr[T](n, dst) + if !reused { + devC, err = e.pool.Alloc(e.deviceID, byteSize) + if err != nil { + return nil, err + } } if err := kernelFn(devA, devC, n, e.stream); err != nil { - e.pool.Free(e.deviceID, devC, byteSize) + if !reused { + e.pool.Free(e.deviceID, devC, byteSize) + } return nil, err } + if reused { + return finishReusedDst[T](dst[0], a.Shape()), nil + } return makeGPUResult[T](e, a.Shape(), devC, n, dst...) } @@ -597,17 +644,26 @@ func gpuScalarOp[T tensor.Numeric]( byteSize := n * f32Size - devC, err := e.pool.Alloc(e.deviceID, byteSize) - if err != nil { - return nil, err + // Reuse dst's existing GPU memory when possible (#84). + devC, reused := tryReuseDstPtr[T](n, dst) + if !reused { + devC, err = e.pool.Alloc(e.deviceID, byteSize) + if err != nil { + return nil, err + } } if err := kernelFn(devA, scalar, devC, n, e.stream); err != nil { - e.pool.Free(e.deviceID, devC, byteSize) + if !reused { + e.pool.Free(e.deviceID, devC, byteSize) + } return nil, err } + if reused { + return finishReusedDst[T](dst[0], a.Shape()), nil + } return makeGPUResult[T](e, a.Shape(), devC, n, dst...) } @@ -957,20 +1013,29 @@ func (e *GPUEngine[T]) gpuSum(ctx context.Context, a *tensor.TensorNumeric[T], a outByteSize := numStripes * f32Size - devOut, err := e.pool.Alloc(e.deviceID, outByteSize) - if err != nil { - e.oomFallbackCount.Add(1) - e.logger.Warn("Sum: GPU output alloc failed, falling back to CPU", "error", err.Error()) + // Reuse dst's existing GPU memory when possible (#84). + devOut, reused := tryReuseDstPtr[T](numStripes, dst) + if !reused { + devOut, err = e.pool.Alloc(e.deviceID, outByteSize) + if err != nil { + e.oomFallbackCount.Add(1) + e.logger.Warn("Sum: GPU output alloc failed, falling back to CPU", "error", err.Error()) - return e.cpu.Sum(ctx, a, axis, keepDims, dst...) + return e.cpu.Sum(ctx, a, axis, keepDims, dst...) + } } if err := e.kernels.SumAxis(devIn, devOut, outer, inner, axisSize, e.stream); err != nil { - e.pool.Free(e.deviceID, devOut, outByteSize) + if !reused { + e.pool.Free(e.deviceID, devOut, outByteSize) + } return nil, err } + if reused { + return finishReusedDst[T](dst[0], newShape), nil + } return makeGPUResult[T](e, newShape, devOut, numStripes, dst...) } From 88f7394c237b9aea37ce4609e1cd2861ebd99ee2 Mon Sep 17 00:00:00 2001 From: David Ndungu Date: Thu, 9 Apr 2026 22:01:41 -0700 Subject: [PATCH 2/2] style(compute): gofmt gpu_engine.go --- compute/gpu_engine.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/compute/gpu_engine.go b/compute/gpu_engine.go index b110ee0..8482fb5 100644 --- a/compute/gpu_engine.go +++ b/compute/gpu_engine.go @@ -1269,7 +1269,8 @@ func (e *GPUEngine[T]) matMulQ4(ctx context.Context, qs *tensor.Q4Storage, a, b // matMulQ4BWeight handles MatMul where B has Q4 storage (virtual-transposed weight). // B's shape after virtual transpose is [K, N], but the Q4 data is laid out as [N, K]. // We compute C[M, N] = A[M, K] * dequant(B)^T by reformulating as: -// C_temp[N, M] = gemm_q4(B_q4[N, K], A^T[K, M]) +// +// C_temp[N, M] = gemm_q4(B_q4[N, K], A^T[K, M]) // // For GEMV (M=1), A^T[K,1] is just A's data as a column, and C_temp[N,1] // can be reshaped to [1, N] without a physical transpose.