From badc1564ee614f40b42ce5e84015ec7f9614acde Mon Sep 17 00:00:00 2001 From: e-strauss Date: Wed, 10 Dec 2025 15:27:19 +0100 Subject: [PATCH] scipy matrix transfer to java runtime --- .../runtime/util/Py4jConverterUtils.java | 38 ++ src/main/python/make_jar_and_package.sh | 2 + .../systemds/context/systemds_context.py | 12 +- src/main/python/systemds/utils/converters.py | 316 ++++++----- .../tests/matrix/test_block_converter.py | 31 +- .../frame/array/Py4jConverterUtilsTest.java | 240 --------- .../utils/Py4jConverterUtilsTest.java | 510 ++++++++++++++++++ 7 files changed, 768 insertions(+), 381 deletions(-) create mode 100755 src/main/python/make_jar_and_package.sh delete mode 100644 src/test/java/org/apache/sysds/test/component/frame/array/Py4jConverterUtilsTest.java create mode 100644 src/test/java/org/apache/sysds/test/component/utils/Py4jConverterUtilsTest.java diff --git a/src/main/java/org/apache/sysds/runtime/util/Py4jConverterUtils.java b/src/main/java/org/apache/sysds/runtime/util/Py4jConverterUtils.java index 7faee722d04..ea3e90333aa 100644 --- a/src/main/java/org/apache/sysds/runtime/util/Py4jConverterUtils.java +++ b/src/main/java/org/apache/sysds/runtime/util/Py4jConverterUtils.java @@ -63,6 +63,44 @@ public static MatrixBlock convertSciPyCOOToMB(byte[] data, byte[] row, byte[] co return mb; } + public static MatrixBlock convertSciPyCSRToMB(byte[] data, byte[] indices, byte[] indptr, int rlen, int clen, int nnz) { + MatrixBlock mb = new MatrixBlock(rlen, clen, true); + mb.allocateSparseRowsBlock(false); + ByteBuffer dataBuf = ByteBuffer.wrap(data); + dataBuf.order(ByteOrder.nativeOrder()); + ByteBuffer indicesBuf = ByteBuffer.wrap(indices); + indicesBuf.order(ByteOrder.nativeOrder()); + ByteBuffer indptrBuf = ByteBuffer.wrap(indptr); + indptrBuf.order(ByteOrder.nativeOrder()); + + // Read indptr array to get row boundaries + int[] rowPtrs = new int[rlen + 1]; + for(int i = 0; i <= rlen; i++) { + rowPtrs[i] = indptrBuf.getInt(); + } + + // Iterate through each row + for(int row = 0; row < rlen; row++) { + int startIdx = rowPtrs[row]; + int endIdx = rowPtrs[row + 1]; + + // Set buffer positions to the start of this row + dataBuf.position(startIdx * Double.BYTES); + indicesBuf.position(startIdx * Integer.BYTES); + + // Process all non-zeros in this row sequentially + for(int idx = startIdx; idx < endIdx; idx++) { + double val = dataBuf.getDouble(); + int colIndex = indicesBuf.getInt(); + mb.set(row, colIndex, val); + } + } + + mb.recomputeNonZeros(); + mb.examSparsity(); + return mb; + } + public static MatrixBlock allocateDenseOrSparse(int rlen, int clen, boolean isSparse) { MatrixBlock ret = new MatrixBlock(rlen, clen, isSparse); ret.allocateBlock(); diff --git a/src/main/python/make_jar_and_package.sh b/src/main/python/make_jar_and_package.sh new file mode 100755 index 00000000000..85d46943cb1 --- /dev/null +++ b/src/main/python/make_jar_and_package.sh @@ -0,0 +1,2 @@ +mvn package -P distribution +python create_python_dist.py \ No newline at end of file diff --git a/src/main/python/systemds/context/systemds_context.py b/src/main/python/systemds/context/systemds_context.py index 99a6cba57b8..6d8578722a5 100644 --- a/src/main/python/systemds/context/systemds_context.py +++ b/src/main/python/systemds/context/systemds_context.py @@ -29,6 +29,7 @@ import sys import struct import traceback +import warnings from contextlib import contextmanager from glob import glob from queue import Queue @@ -103,9 +104,18 @@ def __init__( The logging levels are as follows: 10 DEBUG, 20 INFO, 30 WARNING, 40 ERROR, 50 CRITICAL. :param py4j_logging_level: The logging level for Py4j to use, since all communication to the JVM is done through this, it can be verbose if not set high. - :param data_transfer_mode: default 0, + :param data_transfer_mode: default 0, 0 for py4j, 1 for using pipes (on unix systems) + :param multi_pipe_enabled: default False, if True, use multiple pipes for data transfer + only used if data_transfer_mode is 1. + .. experimental:: This parameter is experimental and may be removed in a future version. """ + if multi_pipe_enabled: + warnings.warn( + "The 'multi_pipe_enabled' parameter is experimental and may be removed in a future version.", + DeprecationWarning, + stacklevel=2, + ) self.__setup_logging(logging_level, py4j_logging_level) self.__start(port, capture_stdout) self.capture_stats(capture_statistics) diff --git a/src/main/python/systemds/utils/converters.py b/src/main/python/systemds/utils/converters.py index ab7b7ffd8d5..e0a2e543d31 100644 --- a/src/main/python/systemds/utils/converters.py +++ b/src/main/python/systemds/utils/converters.py @@ -26,6 +26,7 @@ import concurrent.futures from py4j.java_gateway import JavaClass, JavaGateway, JavaObject, JVMView import os +import scipy.sparse as sp # Constants _HANDSHAKE_OFFSET = 1000 @@ -86,129 +87,6 @@ def _pipe_receive_bytes(pipe, view, offset, end, batch_size_bytes, logger): offset += actual_size -def _pipe_receive_strings( - pipe, num_strings, batch_size=_DEFAULT_BATCH_SIZE_BYTES, pipe_id=0, logger=None -): - """ - Reads UTF-8 encoded strings from the pipe in batches. - Format: 0: - buf[:buf_remaining] = buf[buf_pos : buf_pos + buf_remaining] - - # Read more data - t0 = time() - chunk = os.read(fd, batch_size) - t_io += time() - t0 - if not chunk: - raise IOError("Pipe read returned empty data unexpectedly") - - # Append new data to buffer - chunk_len = len(chunk) - if buf_remaining + chunk_len > len(buf): - # Grow buffer if needed - new_buf = bytearray(len(buf) * 2) - new_buf[:buf_remaining] = buf[:buf_remaining] - buf = new_buf - - buf[buf_remaining : buf_remaining + chunk_len] = chunk - buf_remaining += chunk_len - buf_pos = 0 - - # Read length prefix (little-endian int32) - # Note: length can be -1 (0xFFFFFFFF) to indicate null value - length = struct.unpack( - " 0: - buf[:buf_remaining] = buf[buf_pos : buf_pos + buf_remaining] - buf_pos = 0 - - # Read more data until we have enough - bytes_needed = length - buf_remaining - while bytes_needed > 0: - t0 = time() - chunk = os.read(fd, min(batch_size, bytes_needed)) - t_io += time() - t0 - if not chunk: - raise IOError("Pipe read returned empty data unexpectedly") - - chunk_len = len(chunk) - if buf_remaining + chunk_len > len(buf): - # Grow buffer if needed - new_buf = bytearray(len(buf) * 2) - new_buf[:buf_remaining] = buf[:buf_remaining] - buf = new_buf - - buf[buf_remaining : buf_remaining + chunk_len] = chunk - buf_remaining += chunk_len - bytes_needed -= chunk_len - - # Decode the string - t0 = time() - if length == 0: - decoded_str = "" - else: - decoded_str = buf[buf_pos : buf_pos + length].decode("utf-8") - t_decode += time() - t0 - - strings.append(decoded_str) - buf_pos += length - buf_remaining -= length - i += 1 - header_received = False - if buf_remaining == _STRING_LENGTH_PREFIX_SIZE: - # There is still data in the buffer, probably the handshake header - received = struct.unpack( - " _STRING_LENGTH_PREFIX_SIZE: - raise ValueError( - "Unexpected number of bytes in buffer: {}".format(buf_remaining) - ) - - t_total = time() - t_total_start - return (strings, t_total, t_decode, t_io, num_strings, header_received) - - def _get_numpy_value_type(jvm, dtype): """Maps numpy dtype to SystemDS ValueType.""" if dtype is np.dtype(np.uint8): @@ -280,37 +158,39 @@ def _pipe_write_task(_pipe_id, _pipe, memview, start, end): return fut_java.result() # Java returns MatrixBlock -def numpy_to_matrix_block(sds, np_arr: np.array): +def numpy_to_matrix_block(sds, arr: np.ndarray | sp.spmatrix): """Converts a given numpy array, to internal matrix block representation. :param sds: The current systemds context. - :param np_arr: the numpy array to convert to matrixblock. + :param np_arr: the numpy array or scipy sparse matrix to convert to matrixblock. """ - assert np_arr.ndim <= 2, "np_arr invalid, because it has more than 2 dimensions" - rows = np_arr.shape[0] - cols = np_arr.shape[1] if np_arr.ndim == 2 else 1 + assert arr.ndim <= 2, "np_arr invalid, because it has more than 2 dimensions" + rows = arr.shape[0] + cols = arr.shape[1] if arr.ndim == 2 else 1 if rows > 2147483647: raise ValueError("Matrix rows exceed maximum value (2147483647)") # If not numpy array then convert to numpy array - if not isinstance(np_arr, np.ndarray): - np_arr = np.asarray(np_arr, dtype=np.float64) + if isinstance(arr, sp.spmatrix): + return scipy_sparse_matrix_to_matrix_block(sds, arr) + if not isinstance(arr, np.ndarray): + arr = np.asarray(arr, dtype=np.float64) jvm: JVMView = sds.java_gateway.jvm ep = sds.java_gateway.entry_point # Flatten and set value type - if np_arr.dtype is np.dtype(np.uint8): - arr = np_arr.ravel() - elif np_arr.dtype is np.dtype(np.int32): - arr = np_arr.ravel() - elif np_arr.dtype is np.dtype(np.float32): - arr = np_arr.ravel() + if arr.dtype is np.dtype(np.uint8): + arr = arr.ravel() + elif arr.dtype is np.dtype(np.int32): + arr = arr.ravel() + elif arr.dtype is np.dtype(np.float32): + arr = arr.ravel() else: - arr = np_arr.ravel().astype(np.float64) + arr = arr.ravel().astype(np.float64) - value_type = _get_numpy_value_type(jvm, np_arr.dtype) + value_type = _get_numpy_value_type(jvm, arr.dtype) if sds._data_transfer_mode == 1: mv = memoryview(arr).cast("B") @@ -334,7 +214,7 @@ def numpy_to_matrix_block(sds, np_arr: np.array): ) else: return _transfer_matrix_block_multi_pipe( - sds, mv, arr, np_arr, total_bytes, rows, cols, value_type, ep, jvm + sds, mv, arr, arr, total_bytes, rows, cols, value_type, ep, jvm ) else: # Prepare byte buffer and send data to java via Py4J @@ -343,6 +223,41 @@ def numpy_to_matrix_block(sds, np_arr: np.array): return j_class.convertPy4JArrayToMB(buf, rows, cols, value_type) +def scipy_sparse_matrix_to_matrix_block(sds, arr: sp.spmatrix): + """Converts a given scipy sparse matrix to an internal matrix block representation. + + :param sds: The current systemds context. + :param arr: The scipy sparse matrix to convert to matrixblock. + """ + jvm: JVMView = sds.java_gateway.jvm + ep = sds.java_gateway.entry_point + + if sds._data_transfer_mode == 1: + # single pipe implementation + pass + else: + # py4j implementation + j_class: JavaClass = jvm.org.apache.sysds.runtime.util.Py4jConverterUtils + if isinstance(arr, sp.csr_matrix): + data = arr.data.tobytes() + indices = arr.indices.tobytes() + indptr = arr.indptr.tobytes() + nnz = arr.nnz + rows = arr.shape[0] + cols = arr.shape[1] + # convertSciPyCSRToMB(byte[] data, byte[] indices, byte[] indptr, int rlen, int clen, int nnz) + return j_class.convertSciPyCSRToMB(data, indices, indptr, rows, cols, nnz) + elif isinstance(arr, sp.coo_matrix): + data = arr.data.tobytes() + row = arr.row.tobytes() + col = arr.col.tobytes() + nnz = arr.nnz + rows = arr.shape[0] + cols = arr.shape[1] + # convertSciPyCOOToMB(byte[] data, byte[] row, byte[] col, int rlen, int clen, int nnz) + return j_class.convertSciPyCOOToMB(data, row, col, rows, cols, nnz) + + def matrix_block_to_numpy(sds, mb: JavaObject): """Converts a MatrixBlock object in the JVM to a numpy array. @@ -763,6 +678,129 @@ def _pipe_transfer_strings(pipe, pd_series, batch_size=_DEFAULT_BATCH_SIZE_BYTES return (t_total, t_encoding, t_packing, t_io, num_strings) +def _pipe_receive_strings( + pipe, num_strings, batch_size=_DEFAULT_BATCH_SIZE_BYTES, pipe_id=0, logger=None +): + """ + Reads UTF-8 encoded strings from the pipe in batches. + Format: 0: + buf[:buf_remaining] = buf[buf_pos : buf_pos + buf_remaining] + + # Read more data + t0 = time() + chunk = os.read(fd, batch_size) + t_io += time() - t0 + if not chunk: + raise IOError("Pipe read returned empty data unexpectedly") + + # Append new data to buffer + chunk_len = len(chunk) + if buf_remaining + chunk_len > len(buf): + # Grow buffer if needed + new_buf = bytearray(len(buf) * 2) + new_buf[:buf_remaining] = buf[:buf_remaining] + buf = new_buf + + buf[buf_remaining : buf_remaining + chunk_len] = chunk + buf_remaining += chunk_len + buf_pos = 0 + + # Read length prefix (little-endian int32) + # Note: length can be -1 (0xFFFFFFFF) to indicate null value + length = struct.unpack( + " 0: + buf[:buf_remaining] = buf[buf_pos : buf_pos + buf_remaining] + buf_pos = 0 + + # Read more data until we have enough + bytes_needed = length - buf_remaining + while bytes_needed > 0: + t0 = time() + chunk = os.read(fd, min(batch_size, bytes_needed)) + t_io += time() - t0 + if not chunk: + raise IOError("Pipe read returned empty data unexpectedly") + + chunk_len = len(chunk) + if buf_remaining + chunk_len > len(buf): + # Grow buffer if needed + new_buf = bytearray(len(buf) * 2) + new_buf[:buf_remaining] = buf[:buf_remaining] + buf = new_buf + + buf[buf_remaining : buf_remaining + chunk_len] = chunk + buf_remaining += chunk_len + bytes_needed -= chunk_len + + # Decode the string + t0 = time() + if length == 0: + decoded_str = "" + else: + decoded_str = buf[buf_pos : buf_pos + length].decode("utf-8") + t_decode += time() - t0 + + strings.append(decoded_str) + buf_pos += length + buf_remaining -= length + i += 1 + header_received = False + if buf_remaining == _STRING_LENGTH_PREFIX_SIZE: + # There is still data in the buffer, probably the handshake header + received = struct.unpack( + " _STRING_LENGTH_PREFIX_SIZE: + raise ValueError( + "Unexpected number of bytes in buffer: {}".format(buf_remaining) + ) + + t_total = time() - t_total_start + return (strings, t_total, t_decode, t_io, num_strings, header_received) + + def _get_elem_size_for_type(d_type): """Returns the element size in bytes for a given SystemDS type.""" return { diff --git a/src/main/python/tests/matrix/test_block_converter.py b/src/main/python/tests/matrix/test_block_converter.py index ef4b28b1bc4..168b48b6ebd 100644 --- a/src/main/python/tests/matrix/test_block_converter.py +++ b/src/main/python/tests/matrix/test_block_converter.py @@ -26,6 +26,7 @@ from py4j.java_gateway import JVMView from systemds.context import SystemDSContext from systemds.utils.converters import matrix_block_to_numpy, numpy_to_matrix_block +import scipy.sparse as sp class Test_MatrixBlockConverter(unittest.TestCase): @@ -35,7 +36,9 @@ class Test_MatrixBlockConverter(unittest.TestCase): @classmethod def setUpClass(cls): - cls.sds = SystemDSContext(capture_stdout=True, logging_level=50) + cls.sds = SystemDSContext( + capture_stdout=True, logging_level=50, data_transfer_mode=0 + ) @classmethod def tearDownClass(cls): @@ -70,10 +73,36 @@ def test_random_nxk(self): array = np.array([rng.standard_normal(n) for x in range(k)]) self.convert_back_and_forth(array) + def test_random_sparse_csr_nxn(self): + n = 10 + array = sp.rand(n, n, density=0.1, format="csr") + self.convert_back_and_forth(array) + + def test_sparse_csr_rectangular(self): + """Test CSR conversion with rectangular matrices""" + array = sp.rand(5, 10, density=0.2, format="csr") + self.convert_back_and_forth(array) + + def test_sparse_csr_known_values(self): + """Test CSR conversion with a known sparse matrix""" + # Create a known CSR matrix + data = np.array([1.0, 2.0, 3.0, 4.0]) + row = np.array([0, 0, 1, 2]) + col = np.array([0, 2, 1, 2]) + array = sp.csr_matrix((data, (row, col)), shape=(3, 3)) + self.convert_back_and_forth(array) + + def test_random_sparse_coo_nxn(self): + n = 10 + array = sp.rand(n, n, density=0.1, format="coo") + self.convert_back_and_forth(array) + def convert_back_and_forth(self, array): matrix_block = numpy_to_matrix_block(self.sds, array) # use the ability to call functions on matrix_block. returned = matrix_block_to_numpy(self.sds, matrix_block) + if isinstance(array, sp.spmatrix): + array = array.toarray() self.assertTrue(np.allclose(array, returned)) diff --git a/src/test/java/org/apache/sysds/test/component/frame/array/Py4jConverterUtilsTest.java b/src/test/java/org/apache/sysds/test/component/frame/array/Py4jConverterUtilsTest.java deleted file mode 100644 index 466c3337d83..00000000000 --- a/src/test/java/org/apache/sysds/test/component/frame/array/Py4jConverterUtilsTest.java +++ /dev/null @@ -1,240 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.sysds.test.component.frame.array; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; - -import java.nio.ByteBuffer; -import java.nio.ByteOrder; -import java.nio.charset.StandardCharsets; - -import org.apache.sysds.common.Types; -import org.apache.sysds.common.Types.ValueType; -import org.apache.sysds.runtime.util.Py4jConverterUtils; -import org.apache.sysds.runtime.frame.data.columns.Array; -import org.junit.Test; - -public class Py4jConverterUtilsTest { - - @Test - public void testConvertUINT8() { - int numElements = 4; - byte[] data = {1, 2, 3, 4}; - Array result = Py4jConverterUtils.convert(data, numElements, Types.ValueType.UINT8); - assertNotNull(result); - assertEquals(4, result.size()); - assertEquals(1, result.get(0)); - assertEquals(2, result.get(1)); - assertEquals(3, result.get(2)); - assertEquals(4, result.get(3)); - } - - @Test - public void testConvertINT32() { - int numElements = 4; - ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES * numElements); - buffer.order(ByteOrder.nativeOrder()); - for(int i = 1; i <= numElements; i++) { - buffer.putInt(i); - } - Array result = Py4jConverterUtils.convert(buffer.array(), numElements, Types.ValueType.INT32); - assertNotNull(result); - assertEquals(4, result.size()); - assertEquals(1, result.get(0)); - assertEquals(2, result.get(1)); - assertEquals(3, result.get(2)); - assertEquals(4, result.get(3)); - } - - @Test - public void testConvertINT64() { - int numElements = 4; - ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES * numElements); - buffer.order(ByteOrder.nativeOrder()); - for(int i = 1; i <= numElements; i++) { - buffer.putLong((long) i); - } - Array result = Py4jConverterUtils.convert(buffer.array(), numElements, Types.ValueType.INT64); - assertNotNull(result); - assertEquals(4, result.size()); - assertEquals(1L, result.get(0)); - assertEquals(2L, result.get(1)); - assertEquals(3L, result.get(2)); - assertEquals(4L, result.get(3)); - } - - - @Test - public void testConvertHASH32() { - int numElements = 4; - ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES * numElements); - buffer.order(ByteOrder.nativeOrder()); - for(int i = 1; i <= numElements; i++) { - buffer.putInt(i); - } - Array result = Py4jConverterUtils.convert(buffer.array(), numElements, Types.ValueType.HASH32); - assertNotNull(result); - assertEquals(4, result.size()); - assertEquals("1", result.get(0)); - assertEquals("2", result.get(1)); - assertEquals("3", result.get(2)); - assertEquals("4", result.get(3)); - } - - @Test - public void testConvertHASH64() { - int numElements = 4; - ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES * numElements); - buffer.order(ByteOrder.nativeOrder()); - for(int i = 1; i <= numElements; i++) { - buffer.putLong((long) i); - } - Array result = Py4jConverterUtils.convert(buffer.array(), numElements, Types.ValueType.HASH64); - assertNotNull(result); - assertEquals(4, result.size()); - assertEquals("1", result.get(0)); - assertEquals("2", result.get(1)); - assertEquals("3", result.get(2)); - assertEquals("4", result.get(3)); - } - - @Test - public void testConvertFP32() { - int numElements = 4; - ByteBuffer buffer = ByteBuffer.allocate(Float.BYTES * numElements); - buffer.order(ByteOrder.nativeOrder()); - for(float i = 1.1f; i <= numElements + 1; i += 1.0) { - buffer.putFloat(i); - } - Array result = Py4jConverterUtils.convert(buffer.array(), numElements, Types.ValueType.FP32); - assertNotNull(result); - assertEquals(4, result.size()); - assertEquals(1.1f, result.get(0)); - assertEquals(2.1f, result.get(1)); - assertEquals(3.1f, result.get(2)); - assertEquals(4.1f, result.get(3)); - } - - @Test - public void testConvertFP64() { - int numElements = 4; - ByteBuffer buffer = ByteBuffer.allocate(Double.BYTES * numElements); - buffer.order(ByteOrder.nativeOrder()); - for(double i = 1.1; i <= numElements + 1; i += 1.0) { - buffer.putDouble(i); - } - Array result = Py4jConverterUtils.convert(buffer.array(), numElements, Types.ValueType.FP64); - assertNotNull(result); - assertEquals(4, result.size()); - assertEquals(1.1, result.get(0)); - assertEquals(2.1, result.get(1)); - assertEquals(3.1, result.get(2)); - assertEquals(4.1, result.get(3)); - } - - @Test - public void testConvertBoolean() { - int numElements = 4; - byte[] data = {1, 0, 1, 0}; - Array result = Py4jConverterUtils.convert(data, numElements, Types.ValueType.BOOLEAN); - assertNotNull(result); - assertEquals(4, result.size()); - assertEquals(true, result.get(0)); - assertEquals(false, result.get(1)); - assertEquals(true, result.get(2)); - assertEquals(false, result.get(3)); - } - - @Test - public void testConvertString() { - int numElements = 2; - String[] strings = {"hello", "world"}; - ByteBuffer buffer = ByteBuffer.allocate(4 + strings[0].length() + 4 + strings[1].length()); - buffer.order(ByteOrder.LITTLE_ENDIAN); - for(String s : strings) { - buffer.putInt(s.length()); - buffer.put(s.getBytes(StandardCharsets.UTF_8)); - } - Array result = Py4jConverterUtils.convert(buffer.array(), numElements, Types.ValueType.STRING); - assertNotNull(result); - assertEquals(2, result.size()); - assertEquals("hello", result.get(0)); - assertEquals("world", result.get(1)); - } - - @Test - public void testConvertChar() { - char[] c = {'h', 'e', 'l', 'l', 'o', ' ', 'w', 'o', 'r', 'l', 'd'}; - ByteBuffer buffer = ByteBuffer.allocate(Character.BYTES * c.length); - buffer.order(ByteOrder.LITTLE_ENDIAN); - for(char s : c) { - buffer.putChar(s); - } - Array result = Py4jConverterUtils.convert(buffer.array(), c.length, Types.ValueType.CHARACTER); - assertNotNull(result); - assertEquals(c.length, result.size()); - - for(int i = 0; i < c.length; i++) { - assertEquals(c[i], result.get(i)); - } - } - - @Test - public void testConvertRow() { - int numElements = 4; - byte[] data = {1, 2, 3, 4}; - Object[] row = Py4jConverterUtils.convertRow(data, numElements, Types.ValueType.UINT8); - assertNotNull(row); - assertEquals(4, row.length); - assertEquals(1, row[0]); - assertEquals(2, row[1]); - assertEquals(3, row[2]); - assertEquals(4, row[3]); - } - - @Test - public void testConvertFused() { - int numElements = 1; - byte[] data = {1, 2, 3, 4}; - Types.ValueType[] valueTypes = {ValueType.UINT8, ValueType.UINT8, ValueType.UINT8, ValueType.UINT8}; - Array[] arrays = Py4jConverterUtils.convertFused(data, numElements, valueTypes); - assertNotNull(arrays); - assertEquals(4, arrays.length); - for(int i = 0; i < 4; i++) { - assertEquals(1 + i, arrays[i].get(0)); - } - } - - @Test(expected = Exception.class) - public void nullData() { - Py4jConverterUtils.convert(null, 14, ValueType.BOOLEAN); - } - - @Test(expected = Exception.class) - public void nullValueType() { - Py4jConverterUtils.convert(new byte[] {1, 2, 3}, 14, null); - } - - @Test(expected = Exception.class) - public void unknownValueType() { - Py4jConverterUtils.convert(new byte[] {1, 2, 3}, 14, ValueType.UNKNOWN); - } -} diff --git a/src/test/java/org/apache/sysds/test/component/utils/Py4jConverterUtilsTest.java b/src/test/java/org/apache/sysds/test/component/utils/Py4jConverterUtilsTest.java new file mode 100644 index 00000000000..e35e547971d --- /dev/null +++ b/src/test/java/org/apache/sysds/test/component/utils/Py4jConverterUtilsTest.java @@ -0,0 +1,510 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.sysds.test.component.utils; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.nio.charset.StandardCharsets; + +import org.apache.sysds.common.Types; +import org.apache.sysds.common.Types.ValueType; +import org.apache.sysds.runtime.DMLRuntimeException; +import org.apache.sysds.runtime.frame.data.columns.Array; +import org.apache.sysds.runtime.matrix.data.MatrixBlock; +import org.apache.sysds.runtime.util.Py4jConverterUtils; +import org.junit.Test; + +public class Py4jConverterUtilsTest { + + @Test + public void testConvertUINT8() { + int numElements = 4; + byte[] data = {1, 2, 3, 4}; + Array result = Py4jConverterUtils.convert(data, numElements, Types.ValueType.UINT8); + assertNotNull(result); + assertEquals(4, result.size()); + assertEquals(1, result.get(0)); + assertEquals(2, result.get(1)); + assertEquals(3, result.get(2)); + assertEquals(4, result.get(3)); + } + + @Test + public void testConvertINT32() { + int numElements = 4; + ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES * numElements); + buffer.order(ByteOrder.nativeOrder()); + for(int i = 1; i <= numElements; i++) { + buffer.putInt(i); + } + Array result = Py4jConverterUtils.convert(buffer.array(), numElements, Types.ValueType.INT32); + assertNotNull(result); + assertEquals(4, result.size()); + assertEquals(1, result.get(0)); + assertEquals(2, result.get(1)); + assertEquals(3, result.get(2)); + assertEquals(4, result.get(3)); + } + + @Test + public void testConvertINT64() { + int numElements = 4; + ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES * numElements); + buffer.order(ByteOrder.nativeOrder()); + for(int i = 1; i <= numElements; i++) { + buffer.putLong((long) i); + } + Array result = Py4jConverterUtils.convert(buffer.array(), numElements, Types.ValueType.INT64); + assertNotNull(result); + assertEquals(4, result.size()); + assertEquals(1L, result.get(0)); + assertEquals(2L, result.get(1)); + assertEquals(3L, result.get(2)); + assertEquals(4L, result.get(3)); + } + + + @Test + public void testConvertHASH32() { + int numElements = 4; + ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES * numElements); + buffer.order(ByteOrder.nativeOrder()); + for(int i = 1; i <= numElements; i++) { + buffer.putInt(i); + } + Array result = Py4jConverterUtils.convert(buffer.array(), numElements, Types.ValueType.HASH32); + assertNotNull(result); + assertEquals(4, result.size()); + assertEquals("1", result.get(0)); + assertEquals("2", result.get(1)); + assertEquals("3", result.get(2)); + assertEquals("4", result.get(3)); + } + + @Test + public void testConvertHASH64() { + int numElements = 4; + ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES * numElements); + buffer.order(ByteOrder.nativeOrder()); + for(int i = 1; i <= numElements; i++) { + buffer.putLong((long) i); + } + Array result = Py4jConverterUtils.convert(buffer.array(), numElements, Types.ValueType.HASH64); + assertNotNull(result); + assertEquals(4, result.size()); + assertEquals("1", result.get(0)); + assertEquals("2", result.get(1)); + assertEquals("3", result.get(2)); + assertEquals("4", result.get(3)); + } + + @Test + public void testConvertFP32() { + int numElements = 4; + ByteBuffer buffer = ByteBuffer.allocate(Float.BYTES * numElements); + buffer.order(ByteOrder.nativeOrder()); + for(float i = 1.1f; i <= numElements + 1; i += 1.0) { + buffer.putFloat(i); + } + Array result = Py4jConverterUtils.convert(buffer.array(), numElements, Types.ValueType.FP32); + assertNotNull(result); + assertEquals(4, result.size()); + assertEquals(1.1f, result.get(0)); + assertEquals(2.1f, result.get(1)); + assertEquals(3.1f, result.get(2)); + assertEquals(4.1f, result.get(3)); + } + + @Test + public void testConvertFP64() { + int numElements = 4; + ByteBuffer buffer = ByteBuffer.allocate(Double.BYTES * numElements); + buffer.order(ByteOrder.nativeOrder()); + for(double i = 1.1; i <= numElements + 1; i += 1.0) { + buffer.putDouble(i); + } + Array result = Py4jConverterUtils.convert(buffer.array(), numElements, Types.ValueType.FP64); + assertNotNull(result); + assertEquals(4, result.size()); + assertEquals(1.1, result.get(0)); + assertEquals(2.1, result.get(1)); + assertEquals(3.1, result.get(2)); + assertEquals(4.1, result.get(3)); + } + + @Test + public void testConvertBoolean() { + int numElements = 4; + byte[] data = {1, 0, 1, 0}; + Array result = Py4jConverterUtils.convert(data, numElements, Types.ValueType.BOOLEAN); + assertNotNull(result); + assertEquals(4, result.size()); + assertEquals(true, result.get(0)); + assertEquals(false, result.get(1)); + assertEquals(true, result.get(2)); + assertEquals(false, result.get(3)); + } + + @Test + public void testConvertString() { + int numElements = 2; + String[] strings = {"hello", "world"}; + ByteBuffer buffer = ByteBuffer.allocate(4 + strings[0].length() + 4 + strings[1].length()); + buffer.order(ByteOrder.LITTLE_ENDIAN); + for(String s : strings) { + buffer.putInt(s.length()); + buffer.put(s.getBytes(StandardCharsets.UTF_8)); + } + Array result = Py4jConverterUtils.convert(buffer.array(), numElements, Types.ValueType.STRING); + assertNotNull(result); + assertEquals(2, result.size()); + assertEquals("hello", result.get(0)); + assertEquals("world", result.get(1)); + } + + @Test + public void testConvertChar() { + char[] c = {'h', 'e', 'l', 'l', 'o', ' ', 'w', 'o', 'r', 'l', 'd'}; + ByteBuffer buffer = ByteBuffer.allocate(Character.BYTES * c.length); + buffer.order(ByteOrder.LITTLE_ENDIAN); + for(char s : c) { + buffer.putChar(s); + } + Array result = Py4jConverterUtils.convert(buffer.array(), c.length, Types.ValueType.CHARACTER); + assertNotNull(result); + assertEquals(c.length, result.size()); + + for(int i = 0; i < c.length; i++) { + assertEquals(c[i], result.get(i)); + } + } + + @Test + public void testConvertRow() { + int numElements = 4; + byte[] data = {1, 2, 3, 4}; + Object[] row = Py4jConverterUtils.convertRow(data, numElements, Types.ValueType.UINT8); + assertNotNull(row); + assertEquals(4, row.length); + assertEquals(1, row[0]); + assertEquals(2, row[1]); + assertEquals(3, row[2]); + assertEquals(4, row[3]); + } + + @Test + public void testConvertFused() { + int numElements = 1; + byte[] data = {1, 2, 3, 4}; + Types.ValueType[] valueTypes = {ValueType.UINT8, ValueType.UINT8, ValueType.UINT8, ValueType.UINT8}; + Array[] arrays = Py4jConverterUtils.convertFused(data, numElements, valueTypes); + assertNotNull(arrays); + assertEquals(4, arrays.length); + for(int i = 0; i < 4; i++) { + assertEquals(1 + i, arrays[i].get(0)); + } + } + + @Test(expected = Exception.class) + public void nullData() { + Py4jConverterUtils.convert(null, 14, ValueType.BOOLEAN); + } + + @Test(expected = Exception.class) + public void nullValueType() { + Py4jConverterUtils.convert(new byte[] {1, 2, 3}, 14, null); + } + + @Test(expected = Exception.class) + public void unknownValueType() { + Py4jConverterUtils.convert(new byte[] {1, 2, 3}, 14, ValueType.UNKNOWN); + } + + @Test + public void testConvertPy4JArrayToMB_FP64() { + int rlen = 2; + int clen = 3; + ByteBuffer buffer = ByteBuffer.allocate(Double.BYTES * rlen * clen); + buffer.order(ByteOrder.nativeOrder()); + double[] values = {1.1, 2.2, 3.3, 4.4, 5.5, 6.6}; + for(double val : values) { + buffer.putDouble(val); + } + MatrixBlock mb = Py4jConverterUtils.convertPy4JArrayToMB(buffer.array(), rlen, clen); + assertNotNull(mb); + assertEquals(rlen, mb.getNumRows()); + assertEquals(clen, mb.getNumColumns()); + assertEquals(1.1, mb.get(0, 0), 0.0001); + assertEquals(2.2, mb.get(0, 1), 0.0001); + assertEquals(3.3, mb.get(0, 2), 0.0001); + assertEquals(4.4, mb.get(1, 0), 0.0001); + assertEquals(5.5, mb.get(1, 1), 0.0001); + assertEquals(6.6, mb.get(1, 2), 0.0001); + } + + @Test + public void testConvertPy4JArrayToMB_UINT8() { + int rlen = 2; + int clen = 2; + byte[] data = {(byte) 1, (byte) 2, (byte) 3, (byte) 4}; + MatrixBlock mb = Py4jConverterUtils.convertPy4JArrayToMB(data, rlen, clen, ValueType.UINT8); + assertNotNull(mb); + assertEquals(rlen, mb.getNumRows()); + assertEquals(clen, mb.getNumColumns()); + assertEquals(1.0, mb.get(0, 0), 0.0001); + assertEquals(2.0, mb.get(0, 1), 0.0001); + assertEquals(3.0, mb.get(1, 0), 0.0001); + assertEquals(4.0, mb.get(1, 1), 0.0001); + } + + @Test + public void testConvertPy4JArrayToMB_INT32() { + int rlen = 2; + int clen = 2; + ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES * rlen * clen); + buffer.order(ByteOrder.nativeOrder()); + int[] values = {10, 20, 30, 40}; + for(int val : values) { + buffer.putInt(val); + } + MatrixBlock mb = Py4jConverterUtils.convertPy4JArrayToMB(buffer.array(), rlen, clen, ValueType.INT32); + assertNotNull(mb); + assertEquals(rlen, mb.getNumRows()); + assertEquals(clen, mb.getNumColumns()); + assertEquals(10.0, mb.get(0, 0), 0.0001); + assertEquals(20.0, mb.get(0, 1), 0.0001); + assertEquals(30.0, mb.get(1, 0), 0.0001); + assertEquals(40.0, mb.get(1, 1), 0.0001); + } + + @Test + public void testConvertPy4JArrayToMB_FP32() { + int rlen = 2; + int clen = 2; + ByteBuffer buffer = ByteBuffer.allocate(Float.BYTES * rlen * clen); + buffer.order(ByteOrder.nativeOrder()); + float[] values = {1.5f, 2.5f, 3.5f, 4.5f}; + for(float val : values) { + buffer.putFloat(val); + } + MatrixBlock mb = Py4jConverterUtils.convertPy4JArrayToMB(buffer.array(), rlen, clen, ValueType.FP32); + assertNotNull(mb); + assertEquals(rlen, mb.getNumRows()); + assertEquals(clen, mb.getNumColumns()); + assertEquals(1.5f, mb.get(0, 0), 0.0001); + assertEquals(2.5f, mb.get(0, 1), 0.0001); + assertEquals(3.5f, mb.get(1, 0), 0.0001); + assertEquals(4.5f, mb.get(1, 1), 0.0001); + } + + @Test(expected = DMLRuntimeException.class) + public void testConvertPy4JArrayToMB_SparseNotSupported() { + int rlen = 2; + int clen = 2; + byte[] data = {1, 2, 3, 4}; + Py4jConverterUtils.convertPy4JArrayToMB(data, rlen, clen, true, ValueType.UINT8); + } + + @Test + public void testConvertSciPyCOOToMB() { + int rlen = 10; + int clen = 10; + int nnz = 3; + // Create COO format: values at (0,0)=1.0, (1,2)=2.0, (2,1)=3.0 + ByteBuffer dataBuf = ByteBuffer.allocate(Double.BYTES * nnz); + dataBuf.order(ByteOrder.nativeOrder()); + dataBuf.putDouble(1.0); + dataBuf.putDouble(2.0); + dataBuf.putDouble(3.0); + + ByteBuffer rowBuf = ByteBuffer.allocate(Integer.BYTES * nnz); + rowBuf.order(ByteOrder.nativeOrder()); + rowBuf.putInt(0); + rowBuf.putInt(1); + rowBuf.putInt(2); + + ByteBuffer colBuf = ByteBuffer.allocate(Integer.BYTES * nnz); + colBuf.order(ByteOrder.nativeOrder()); + colBuf.putInt(0); + colBuf.putInt(2); + colBuf.putInt(1); + + MatrixBlock mb = Py4jConverterUtils.convertSciPyCOOToMB( + dataBuf.array(), rowBuf.array(), colBuf.array(), rlen, clen, nnz); + assertNotNull(mb); + assertEquals(rlen, mb.getNumRows()); + assertEquals(clen, mb.getNumColumns()); + assertTrue(mb.isInSparseFormat()); + assertEquals(1.0, mb.get(0, 0), 0.0001); + assertEquals(2.0, mb.get(1, 2), 0.0001); + assertEquals(3.0, mb.get(2, 1), 0.0001); + assertEquals(0.0, mb.get(0, 1), 0.0001); + assertEquals(0.0, mb.get(1, 0), 0.0001); + } + + @Test + public void testConvertSciPyCSRToMB() { + int rlen = 10; + int clen = 10; + int nnz = 3; + // Create CSR format: values at (0,0)=1.0, (1,2)=2.0, (2,1)=3.0 + ByteBuffer dataBuf = ByteBuffer.allocate(Double.BYTES * nnz); + dataBuf.order(ByteOrder.nativeOrder()); + dataBuf.putDouble(1.0); + dataBuf.putDouble(2.0); + dataBuf.putDouble(3.0); + + ByteBuffer indicesBuf = ByteBuffer.allocate(Integer.BYTES * nnz); + indicesBuf.order(ByteOrder.nativeOrder()); + indicesBuf.putInt(0); // column for row 0 + indicesBuf.putInt(2); // column for row 1 + indicesBuf.putInt(1); // column for row 2 + + ByteBuffer indptrBuf = ByteBuffer.allocate(Integer.BYTES * (rlen + 1)); + indptrBuf.order(ByteOrder.nativeOrder()); + indptrBuf.putInt(0); // row 0 starts at index 0 + indptrBuf.putInt(1); // row 1 starts at index 1 + indptrBuf.putInt(2); // row 2 starts at index 2 + indptrBuf.putInt(3); // end marker + + MatrixBlock mb = Py4jConverterUtils.convertSciPyCSRToMB( + dataBuf.array(), indicesBuf.array(), indptrBuf.array(), rlen, clen, nnz); + assertNotNull(mb); + assertEquals(rlen, mb.getNumRows()); + assertEquals(clen, mb.getNumColumns()); + assertTrue(mb.isInSparseFormat()); + assertEquals(1.0, mb.get(0, 0), 0.0001); + assertEquals(2.0, mb.get(1, 2), 0.0001); + assertEquals(3.0, mb.get(2, 1), 0.0001); + assertEquals(0.0, mb.get(0, 1), 0.0001); + assertEquals(0.0, mb.get(1, 0), 0.0001); + } + + @Test + public void testAllocateDenseOrSparse_Dense() { + int rlen = 5; + int clen = 5; + MatrixBlock mb = Py4jConverterUtils.allocateDenseOrSparse(rlen, clen, false); + assertNotNull(mb); + assertEquals(rlen, mb.getNumRows()); + assertEquals(clen, mb.getNumColumns()); + assertTrue(!mb.isInSparseFormat()); + } + + @Test + public void testAllocateDenseOrSparse_Sparse() { + int rlen = 5; + int clen = 5; + MatrixBlock mb = Py4jConverterUtils.allocateDenseOrSparse(rlen, clen, true); + assertNotNull(mb); + assertEquals(rlen, mb.getNumRows()); + assertEquals(clen, mb.getNumColumns()); + assertTrue(mb.isInSparseFormat()); + } + + @Test + public void testAllocateDenseOrSparse_Long() { + long rlen = 10L; + long clen = 10L; + MatrixBlock mb = Py4jConverterUtils.allocateDenseOrSparse(rlen, clen, false); + assertNotNull(mb); + assertEquals((int) rlen, mb.getNumRows()); + assertEquals((int) clen, mb.getNumColumns()); + } + + @Test(expected = DMLRuntimeException.class) + public void testAllocateDenseOrSparse_LongTooLarge() { + long rlen = Integer.MAX_VALUE + 1L; + long clen = 10L; + Py4jConverterUtils.allocateDenseOrSparse(rlen, clen, false); + } + + @Test + public void testConvertMBtoPy4JDenseArr() { + int rlen = 2; + int clen = 2; + MatrixBlock mb = new MatrixBlock(rlen, clen, false); + mb.allocateBlock(); + mb.set(0, 0, 1.0); + mb.set(0, 1, 2.0); + mb.set(1, 0, 3.0); + mb.set(1, 1, 4.0); + + byte[] result = Py4jConverterUtils.convertMBtoPy4JDenseArr(mb); + assertNotNull(result); + assertEquals(Double.BYTES * rlen * clen, result.length); + + ByteBuffer buffer = ByteBuffer.wrap(result); + buffer.order(ByteOrder.nativeOrder()); + assertEquals(1.0, buffer.getDouble(), 0.0001); + assertEquals(2.0, buffer.getDouble(), 0.0001); + assertEquals(3.0, buffer.getDouble(), 0.0001); + assertEquals(4.0, buffer.getDouble(), 0.0001); + } + + @Test + public void testConvertMBtoPy4JDenseArr_RoundTrip() { + int rlen = 2; + int clen = 3; + ByteBuffer buffer = ByteBuffer.allocate(Double.BYTES * rlen * clen); + buffer.order(ByteOrder.nativeOrder()); + double[] values = {1.1, 2.2, 3.3, 4.4, 5.5, 6.6}; + for(double val : values) { + buffer.putDouble(val); + } + + MatrixBlock mb = Py4jConverterUtils.convertPy4JArrayToMB(buffer.array(), rlen, clen); + byte[] result = Py4jConverterUtils.convertMBtoPy4JDenseArr(mb); + + ByteBuffer resultBuffer = ByteBuffer.wrap(result); + resultBuffer.order(ByteOrder.nativeOrder()); + for(double expected : values) { + assertEquals(expected, resultBuffer.getDouble(), 0.0001); + } + } + + @Test + public void testConvertMBtoPy4JDenseArr_SparseToDense() { + new Py4jConverterUtils(); + int rlen = 3; + int clen = 3; + MatrixBlock mb = new MatrixBlock(rlen, clen, true); + mb.allocateSparseRowsBlock(false); + mb.set(0, 0, 1.0); + mb.set(2, 2, 2.0); + + byte[] result = Py4jConverterUtils.convertMBtoPy4JDenseArr(mb); + assertNotNull(result); + assertEquals(Double.BYTES * rlen * clen, result.length); + + ByteBuffer buffer = ByteBuffer.wrap(result); + buffer.order(ByteOrder.nativeOrder()); + assertEquals(1.0, buffer.getDouble(), 0.0001); + // Skip to position (2,2) = index 8 + for(int i = 1; i < 8; i++) { + assertEquals(0.0, buffer.getDouble(), 0.0001); + } + assertEquals(2.0, buffer.getDouble(), 0.0001); + } +}