From 52a7034f50bc23351ebf8a96fd098acb325947cf Mon Sep 17 00:00:00 2001 From: david dali susanibar arce Date: Mon, 24 Jul 2023 16:39:55 -0500 Subject: [PATCH 1/4] feat: Document how to convert JDBC Adapter result into a Parquet file --- java/source/dataset.rst | 8 +- java/source/flight.rst | 4 +- java/source/jdbc.rst | 164 ++++++++++++++++++++++++++++++++++++++++ 3 files changed, 173 insertions(+), 3 deletions(-) diff --git a/java/source/dataset.rst b/java/source/dataset.rst index f7ee556a..05f063df 100644 --- a/java/source/dataset.rst +++ b/java/source/dataset.rst @@ -533,4 +533,10 @@ Let's read a CSV file. Salesforce Slack 27.7 01/12/2020 Total batch size: 3 -.. _Arrow Java Dataset: https://arrow.apache.org/docs/dev/java/dataset.html \ No newline at end of file +.. _Arrow Java Dataset: https://arrow.apache.org/docs/dev/java/dataset.html + + +Write Parquet Files +=================== + +Go to :doc:`JDBC Adapter - Write ResultSet to Parquet File ` for an example. diff --git a/java/source/flight.rst b/java/source/flight.rst index 53017b24..fc2f76e0 100644 --- a/java/source/flight.rst +++ b/java/source/flight.rst @@ -287,7 +287,7 @@ Flight Client and Server S1: Server (Location): Listening on port 33333 C1: Client (Location): Connected to grpc+tcp://0.0.0.0:33333 C2: Client (Populate Data): Wrote 2 batches with 3 rows each - C3: Client (Get Metadata): FlightInfo{schema=Schema, descriptor=profiles, endpoints=[FlightEndpoint{locations=[Location{uri=grpc+tcp://0.0.0.0:33333}], ticket=org.apache.arrow.flight.Ticket@58871b0a}], bytes=-1, records=6, ordered=false} + C3: Client (Get Metadata): FlightInfo{schema=Schema, descriptor=profiles, endpoints=[FlightEndpoint{locations=[Location{uri=grpc+tcp://0.0.0.0:33333}], ticket=org.apache.arrow.flight.Ticket@58871b0a, expirationTime=(none)}], bytes=-1, records=6, ordered=false} C4: Client (Get Stream): Client Received batch #1, Data: name @@ -299,7 +299,7 @@ Flight Client and Server Manuel Felipe JJ - C5: Client (List Flights 
Info): FlightInfo{schema=Schema, descriptor=profiles, endpoints=[FlightEndpoint{locations=[Location{uri=grpc+tcp://0.0.0.0:33333}], ticket=org.apache.arrow.flight.Ticket@58871b0a}], bytes=-1, records=6, ordered=false} + C5: Client (List Flights Info): FlightInfo{schema=Schema, descriptor=profiles, endpoints=[FlightEndpoint{locations=[Location{uri=grpc+tcp://0.0.0.0:33333}], ticket=org.apache.arrow.flight.Ticket@58871b0a, expirationTime=(none)}], bytes=-1, records=6, ordered=false} C6: Client (Do Delete Action): Delete completed C7: Client (List Flights Info): After delete - No records C8: Server shut down successfully diff --git a/java/source/jdbc.rst b/java/source/jdbc.rst index 78f78f27..2242861b 100644 --- a/java/source/jdbc.rst +++ b/java/source/jdbc.rst @@ -307,3 +307,167 @@ values to the given scale. 102 true 100000000030.0000000 some char text [1,2] INT_FIELD1 BOOL_FIELD2 BIGINT_FIELD5 CHAR_FIELD16 LIST_FIELD19 103 true 10000000003.0000000 some char text [1] + +Write ResultSet to Parquet File +=============================== + +In this example, we have the JDBC adapter result and trying to write them +into a parquet file. + +.. 
testcode:: + + import java.io.BufferedReader; + import java.io.FileReader; + import java.io.IOException; + import java.nio.file.DirectoryStream; + import java.nio.file.Files; + import java.nio.file.Path; + import java.sql.Connection; + import java.sql.DriverManager; + import java.sql.ResultSet; + import java.sql.SQLException; + import java.sql.Types; + import java.util.HashMap; + + import org.apache.arrow.adapter.jdbc.ArrowVectorIterator; + import org.apache.arrow.adapter.jdbc.JdbcFieldInfo; + import org.apache.arrow.adapter.jdbc.JdbcToArrow; + import org.apache.arrow.adapter.jdbc.JdbcToArrowConfig; + import org.apache.arrow.adapter.jdbc.JdbcToArrowConfigBuilder; + import org.apache.arrow.adapter.jdbc.JdbcToArrowUtils; + import org.apache.arrow.dataset.file.DatasetFileWriter; + import org.apache.arrow.dataset.file.FileFormat; + import org.apache.arrow.dataset.file.FileSystemDatasetFactory; + import org.apache.arrow.dataset.jni.NativeMemoryPool; + import org.apache.arrow.dataset.scanner.ScanOptions; + import org.apache.arrow.dataset.scanner.Scanner; + import org.apache.arrow.dataset.source.Dataset; + import org.apache.arrow.dataset.source.DatasetFactory; + import org.apache.arrow.memory.BufferAllocator; + import org.apache.arrow.memory.RootAllocator; + import org.apache.arrow.vector.VectorLoader; + import org.apache.arrow.vector.VectorSchemaRoot; + import org.apache.arrow.vector.VectorUnloader; + import org.apache.arrow.vector.ipc.ArrowReader; + import org.apache.arrow.vector.ipc.message.ArrowRecordBatch; + import org.apache.arrow.vector.types.pojo.Schema; + import org.apache.ibatis.jdbc.ScriptRunner; + + class JDBCReader extends ArrowReader { + private final ArrowVectorIterator iter; + private final Schema schema; + + public JDBCReader(BufferAllocator allocator, ArrowVectorIterator iter, Schema schema) { + super(allocator); + this.iter = iter; + this.schema = schema; + } + + @Override + public boolean loadNextBatch() throws IOException { + while (iter.hasNext()) { 
+ try (VectorSchemaRoot rootTmp = iter.next()) { + if (rootTmp.getRowCount() > 0) { + VectorUnloader unloader = new VectorUnloader(rootTmp); + VectorLoader loader = new VectorLoader(super.getVectorSchemaRoot()); + try (ArrowRecordBatch recordBatch = unloader.getRecordBatch()) { + loader.load(recordBatch); + } + return true; + } + else { + return false; + } + } + } + return false; + } + + @Override + public long bytesRead() { + return 0; + } + + @Override + protected void closeReadSource() throws IOException { + } + + @Override + protected Schema readSchema() { + return schema; + } + } + final BufferAllocator allocator = new RootAllocator(); + try ( + final Connection connection = DriverManager.getConnection( + "jdbc:h2:mem:h2-jdbc-adapter") + ) { + ScriptRunner runnerDDLDML = new ScriptRunner(connection); + runnerDDLDML.setLogWriter(null); + runnerDDLDML.runScript(new BufferedReader( + new FileReader("./thirdpartydeps/jdbc/h2-ddl.sql"))); + runnerDDLDML.runScript(new BufferedReader( + new FileReader("./thirdpartydeps/jdbc/h2-dml.sql"))); + JdbcToArrowConfig config = new JdbcToArrowConfigBuilder(allocator, + JdbcToArrowUtils.getUtcCalendar()) + .setTargetBatchSize(2) + .setArraySubTypeByColumnNameMap( + new HashMap() {{ + put("LIST_FIELD19", + new JdbcFieldInfo(Types.INTEGER)); + }} + ) + .build(); + String query = "SELECT int_field1, bool_field2, bigint_field5, char_field16, list_field19 FROM TABLE1"; + try ( + final ResultSet resultSetConvertToParquet = connection.createStatement().executeQuery(query); + final ResultSet resultSetForSchema = connection.createStatement().executeQuery(query); + final ArrowVectorIterator arrowVectorIterator = JdbcToArrow.sqlToArrowVectorIterator( + resultSetConvertToParquet, config) + ) { + Schema schema = JdbcToArrow.sqlToArrowVectorIterator(resultSetForSchema, config).next().getSchema(); + Path uri = Files.createTempDirectory("parquet_"); + try ( + // get jdbc row data as a arrow reader + final JDBCReader arrowReader = new 
JDBCReader(allocator, arrowVectorIterator, schema) + ) { + // write arrow reader to parqueet file + DatasetFileWriter.write(allocator, arrowReader, FileFormat.PARQUET, uri.toUri().toString()); + } + // validate data of parquet file created + ScanOptions options = new ScanOptions(/*batchSize*/ 32768); + try ( + DatasetFactory datasetFactory = new FileSystemDatasetFactory(allocator, + NativeMemoryPool.getDefault(), FileFormat.PARQUET, uri.toUri().toString()); + Dataset dataset = datasetFactory.finish(); + Scanner scanner = dataset.newScan(options); + ArrowReader reader = scanner.scanBatches() + ) { + while (reader.loadNextBatch()) { + System.out.print(reader.getVectorSchemaRoot().contentToTSVString()); + } + } catch (Exception e) { + e.printStackTrace(); + throw new RuntimeException(e); + } + // delete temporary parquet file created + try (DirectoryStream dir = Files.newDirectoryStream(uri)) { + uri.toFile().deleteOnExit(); + for (Path path : dir) { + path.toFile().deleteOnExit(); + } + } + } + runnerDDLDML.closeConnection(); + } catch (SQLException | IOException e) { + e.printStackTrace(); + throw new RuntimeException(e); + } + +.. 
testoutput:: + + INT_FIELD1 BOOL_FIELD2 BIGINT_FIELD5 CHAR_FIELD16 LIST_FIELD19 + 101 true 1000000000300 some char text [1,2,3] + 102 true 100000000030 some char text [1,2] + INT_FIELD1 BOOL_FIELD2 BIGINT_FIELD5 CHAR_FIELD16 LIST_FIELD19 + 103 true 10000000003 some char text [1] From 7d448c009f839dcb9e21d40ba8a194ebf701fabf Mon Sep 17 00:00:00 2001 From: david dali susanibar arce Date: Fri, 11 Aug 2023 13:30:53 -0500 Subject: [PATCH 2/4] clean code + jdbc/dataset/reader recipes --- java/source/dataset.rst | 84 ++++++++++++++++++++++++++++++++++- java/source/demo/pom.xml | 5 +++ java/source/io.rst | 92 +++++++++++++++++++++++++++++++++++++++ java/source/jdbc.rst | 80 ++++++++++++++++++++++------------ java/source/substrait.rst | 8 ++-- 5 files changed, 236 insertions(+), 33 deletions(-) diff --git a/java/source/dataset.rst b/java/source/dataset.rst index 05f063df..857956ce 100644 --- a/java/source/dataset.rst +++ b/java/source/dataset.rst @@ -539,4 +539,86 @@ Let's read a CSV file. Write Parquet Files =================== -Go to :doc:`JDBC Adapter - Write ResultSet to Parquet File ` for an example. +Let's read an Arrow file and populate that data into a Parquet file. + +.. 
testcode:: + + import java.io.IOException; + import java.nio.file.DirectoryStream; + import java.nio.file.Files; + import java.nio.file.Path; + import java.nio.file.Paths; + + import org.apache.arrow.dataset.file.DatasetFileWriter; + import org.apache.arrow.dataset.file.FileFormat; + import org.apache.arrow.dataset.file.FileSystemDatasetFactory; + import org.apache.arrow.dataset.jni.NativeMemoryPool; + import org.apache.arrow.dataset.scanner.ScanOptions; + import org.apache.arrow.dataset.scanner.Scanner; + import org.apache.arrow.dataset.source.Dataset; + import org.apache.arrow.dataset.source.DatasetFactory; + import org.apache.arrow.memory.BufferAllocator; + import org.apache.arrow.memory.RootAllocator; + import org.apache.arrow.vector.ipc.ArrowFileReader; + import org.apache.arrow.vector.ipc.ArrowReader; + import org.apache.arrow.vector.ipc.SeekableReadChannel; + import org.apache.arrow.vector.util.ByteArrayReadableSeekableByteChannel; + + // read arrow demo data + Path uriRead = Paths.get("./thirdpartydeps/arrowfiles/random_access.arrow"); + try ( + BufferAllocator allocator = new RootAllocator(); + ArrowFileReader readerForDemoData = new ArrowFileReader( + new SeekableReadChannel(new ByteArrayReadableSeekableByteChannel( + Files.readAllBytes(uriRead))), allocator) + ) { + Path uriWrite = Files.createTempDirectory("parquet_"); + // write data for new parquet file + DatasetFileWriter.write(allocator, readerForDemoData, FileFormat.PARQUET, uriWrite.toUri().toString()); + // validate data of parquet file just created + ScanOptions options = new ScanOptions(/*batchSize*/ 32768); + try ( + DatasetFactory datasetFactory = new FileSystemDatasetFactory(allocator, + NativeMemoryPool.getDefault(), FileFormat.PARQUET, uriWrite.toUri().toString()); + Dataset dataset = datasetFactory.finish(); + Scanner scanner = dataset.newScan(options); + ArrowReader readerForFileCreated = scanner.scanBatches() + ) { + while (readerForFileCreated.loadNextBatch()) { + 
System.out.print(readerForFileCreated.getVectorSchemaRoot().contentToTSVString()); + System.out.println("RowCount: " + readerForFileCreated.getVectorSchemaRoot().getRowCount()); + } + } catch (Exception e) { + e.printStackTrace(); + throw new RuntimeException(e); + } + // delete temporary parquet file created + try (DirectoryStream dir = Files.newDirectoryStream(uriWrite)) { + uriWrite.toFile().deleteOnExit(); + for (Path path : dir) { + path.toFile().deleteOnExit(); + } + } + } catch (IOException e) { + e.printStackTrace(); + throw new RuntimeException(e); + } + } + +.. testoutput:: + + name age + David 10 + Gladis 20 + Juan 30 + RowCount: 3 + name age + Nidia 15 + Alexa 20 + Mara 15 + RowCount: 3 + name age + Raul 34 + Jhon 29 + Thomy 33 + RowCount: 3 \ No newline at end of file diff --git a/java/source/demo/pom.xml b/java/source/demo/pom.xml index 4f844ea8..38b1fa50 100644 --- a/java/source/demo/pom.xml +++ b/java/source/demo/pom.xml @@ -107,5 +107,10 @@ core 0.11.0 + + ch.qos.logback + logback-classic + 1.2.11 + diff --git a/java/source/io.rst b/java/source/io.rst index 74f74d15..7b66fd67 100644 --- a/java/source/io.rst +++ b/java/source/io.rst @@ -579,3 +579,95 @@ Reading and writing dictionary-encoded data requires separately tracking the dic Dictionary-encoded data recovered: [0, 3, 4, 5, 7] Dictionary recovered: Dictionary DictionaryEncoding[id=666,ordered=false,indexType=Int(8, true)] [Andorra, Cuba, Grecia, Guinea, Islandia, Malta, Tailandia, Uganda, Yemen, Zambia] Decoded data: [Andorra, Guinea, Islandia, Malta, Uganda] + +Customize Logic to Read Dataset +=============================== + +If you need to implement a custom dataset reader, consider extending `ArrowReader`_ class. + +The ArrowReader class can be extended as follows: + +1. Write the logic to read schema on ``readSchema()``. +2. If you do not want to define a logic for reading the schema, then you will also need to override ``getVectorSchemaRoot()``. +3. 
Once (1) or (2) have been completed, you can proceed to ``loadNextBatch()``. +4. At the end don’t forget to define the logic to ``closeReadSource()``. +5. Make sure you define the logic for closing the ``closeReadSource()`` at the end. + +For example, let's create a custom JDBCReader reader. + +.. code-block:: java + + import java.io.IOException; + + import org.apache.arrow.adapter.jdbc.ArrowVectorIterator; + import org.apache.arrow.adapter.jdbc.JdbcToArrowConfig; + import org.apache.arrow.memory.BufferAllocator; + import org.apache.arrow.vector.VectorSchemaRoot; + import org.apache.arrow.vector.ipc.ArrowReader; + import org.apache.arrow.vector.types.pojo.Schema; + + class JDBCReader extends ArrowReader { + private final ArrowVectorIterator iter; + private final JdbcToArrowConfig config; + private VectorSchemaRoot root; + private boolean firstRoot = true; + + public JDBCReader(BufferAllocator allocator, ArrowVectorIterator iter, JdbcToArrowConfig config) { + super(allocator); + this.iter = iter; + this.config = config; + } + + @Override + public boolean loadNextBatch() throws IOException { + if (firstRoot) { + firstRoot = false; + return true; + } + else { + if (iter.hasNext()) { + if (root != null && !config.isReuseVectorSchemaRoot()) { + root.close(); + } + else { + root.allocateNew(); + } + root = iter.next(); + return root.getRowCount() != 0; + } + else { + return false; + } + } + } + + @Override + public long bytesRead() { + return 0; + } + + @Override + protected void closeReadSource() throws IOException { + if (root != null && !config.isReuseVectorSchemaRoot()) { + root.close(); + } + } + + @Override + protected Schema readSchema() throws IOException { + return null; + } + + @Override + public VectorSchemaRoot getVectorSchemaRoot() throws IOException { + if (root == null) { + root = iter.next(); + } + return root; + } + } + + + + +.. 
_`ArrowReader`: https://arrow.apache.org/docs/java/reference/org/apache/arrow/vector/ipc/ArrowReader.html diff --git a/java/source/jdbc.rst b/java/source/jdbc.rst index 2242861b..81ffeef8 100644 --- a/java/source/jdbc.rst +++ b/java/source/jdbc.rst @@ -311,8 +311,7 @@ values to the given scale. Write ResultSet to Parquet File =============================== -In this example, we have the JDBC adapter result and trying to write them -into a parquet file. +As an example, we are trying to write a parquet file from the JDBC adapter results. .. testcode:: @@ -345,42 +344,48 @@ into a parquet file. import org.apache.arrow.dataset.source.DatasetFactory; import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.memory.RootAllocator; - import org.apache.arrow.vector.VectorLoader; import org.apache.arrow.vector.VectorSchemaRoot; - import org.apache.arrow.vector.VectorUnloader; import org.apache.arrow.vector.ipc.ArrowReader; - import org.apache.arrow.vector.ipc.message.ArrowRecordBatch; import org.apache.arrow.vector.types.pojo.Schema; import org.apache.ibatis.jdbc.ScriptRunner; + import org.slf4j.LoggerFactory; + + import ch.qos.logback.classic.Level; + import ch.qos.logback.classic.Logger; class JDBCReader extends ArrowReader { private final ArrowVectorIterator iter; - private final Schema schema; + private final JdbcToArrowConfig config; + private VectorSchemaRoot root; + private boolean firstRoot = true; - public JDBCReader(BufferAllocator allocator, ArrowVectorIterator iter, Schema schema) { + public JDBCReader(BufferAllocator allocator, ArrowVectorIterator iter, JdbcToArrowConfig config) { super(allocator); this.iter = iter; - this.schema = schema; + this.config = config; } @Override public boolean loadNextBatch() throws IOException { - while (iter.hasNext()) { - try (VectorSchemaRoot rootTmp = iter.next()) { - if (rootTmp.getRowCount() > 0) { - VectorUnloader unloader = new VectorUnloader(rootTmp); - VectorLoader loader = new 
VectorLoader(super.getVectorSchemaRoot()); - try (ArrowRecordBatch recordBatch = unloader.getRecordBatch()) { - loader.load(recordBatch); - } - return true; + if (firstRoot) { + firstRoot = false; + return true; + } + else { + if (iter.hasNext()) { + if (root != null && !config.isReuseVectorSchemaRoot()) { + root.close(); } else { - return false; + root.allocateNew(); } + root = iter.next(); + return root.getRowCount() != 0; + } + else { + return false; } } - return false; } @Override @@ -390,15 +395,32 @@ into a parquet file. @Override protected void closeReadSource() throws IOException { + if (root != null && !config.isReuseVectorSchemaRoot()) { + root.close(); + } } @Override - protected Schema readSchema() { - return schema; + protected Schema readSchema() throws IOException { + return null; + } + + @Override + public VectorSchemaRoot getVectorSchemaRoot() throws IOException { + if (root == null) { + root = iter.next(); + } + return root; } } - final BufferAllocator allocator = new RootAllocator(); + + ((Logger) LoggerFactory.getLogger("org.apache.arrow")).setLevel(Level.TRACE); try ( + final BufferAllocator allocator = new RootAllocator(); + final BufferAllocator allocatorJDBC = allocator.newChildAllocator("allocatorJDBC", 0, Long.MAX_VALUE); + final BufferAllocator allocatorReader = allocator.newChildAllocator("allocatorReader", 0, Long.MAX_VALUE); + final BufferAllocator allocatorParquetWrite = allocator.newChildAllocator("allocatorParquetWrite", 0, + Long.MAX_VALUE); final Connection connection = DriverManager.getConnection( "jdbc:h2:mem:h2-jdbc-adapter") ) { @@ -408,11 +430,12 @@ into a parquet file. 
new FileReader("./thirdpartydeps/jdbc/h2-ddl.sql"))); runnerDDLDML.runScript(new BufferedReader( new FileReader("./thirdpartydeps/jdbc/h2-dml.sql"))); - JdbcToArrowConfig config = new JdbcToArrowConfigBuilder(allocator, + JdbcToArrowConfig config = new JdbcToArrowConfigBuilder(allocatorJDBC, JdbcToArrowUtils.getUtcCalendar()) .setTargetBatchSize(2) + .setReuseVectorSchemaRoot(true) .setArraySubTypeByColumnNameMap( - new HashMap() {{ + new HashMap<>() {{ put("LIST_FIELD19", new JdbcFieldInfo(Types.INTEGER)); }} @@ -421,18 +444,16 @@ into a parquet file. String query = "SELECT int_field1, bool_field2, bigint_field5, char_field16, list_field19 FROM TABLE1"; try ( final ResultSet resultSetConvertToParquet = connection.createStatement().executeQuery(query); - final ResultSet resultSetForSchema = connection.createStatement().executeQuery(query); final ArrowVectorIterator arrowVectorIterator = JdbcToArrow.sqlToArrowVectorIterator( resultSetConvertToParquet, config) ) { - Schema schema = JdbcToArrow.sqlToArrowVectorIterator(resultSetForSchema, config).next().getSchema(); Path uri = Files.createTempDirectory("parquet_"); try ( // get jdbc row data as a arrow reader - final JDBCReader arrowReader = new JDBCReader(allocator, arrowVectorIterator, schema) + final JDBCReader arrowReader = new JDBCReader(allocatorReader, arrowVectorIterator, config) ) { // write arrow reader to parqueet file - DatasetFileWriter.write(allocator, arrowReader, FileFormat.PARQUET, uri.toUri().toString()); + DatasetFileWriter.write(allocatorParquetWrite, arrowReader, FileFormat.PARQUET, uri.toUri().toString()); } // validate data of parquet file created ScanOptions options = new ScanOptions(/*batchSize*/ 32768); @@ -445,6 +466,7 @@ into a parquet file. 
) { while (reader.loadNextBatch()) { System.out.print(reader.getVectorSchemaRoot().contentToTSVString()); + System.out.println("RowCount: " + reader.getVectorSchemaRoot().getRowCount()); } } catch (Exception e) { e.printStackTrace(); @@ -469,5 +491,7 @@ into a parquet file. INT_FIELD1 BOOL_FIELD2 BIGINT_FIELD5 CHAR_FIELD16 LIST_FIELD19 101 true 1000000000300 some char text [1,2,3] 102 true 100000000030 some char text [1,2] + RowCount: 2 INT_FIELD1 BOOL_FIELD2 BIGINT_FIELD5 CHAR_FIELD16 LIST_FIELD19 103 true 10000000003 some char text [1] + RowCount: 1 diff --git a/java/source/substrait.rst b/java/source/substrait.rst index ee87371f..750955ca 100644 --- a/java/source/substrait.rst +++ b/java/source/substrait.rst @@ -63,7 +63,7 @@ Here is an example of a Java program that queries a Parquet file: import java.util.HashMap; import java.util.Map; - static Plan queryTableNation() throws SqlParseException { + Plan queryTableNation() throws SqlParseException { String sql = "SELECT * FROM NATION WHERE N_NATIONKEY = 17"; String nation = "CREATE TABLE NATION (N_NATIONKEY BIGINT NOT NULL, N_NAME CHAR(25), " + "N_REGIONKEY BIGINT NOT NULL, N_COMMENT VARCHAR(152))"; @@ -72,7 +72,7 @@ Here is an example of a Java program that queries a Parquet file: return plan; } - static void queryDatasetThruSubstraitPlanDefinition() { + void queryDatasetThruSubstraitPlanDefinition() { String uri = "file:" + System.getProperty("user.dir") + "/thirdpartydeps/tpch/nation.parquet"; ScanOptions options = new ScanOptions(/*batchSize*/ 32768); try ( @@ -135,7 +135,7 @@ For example, we can join the nation and customer tables from the TPC-H benchmark import java.util.HashMap; import java.util.Map; - static Plan queryTableNationJoinCustomer() throws SqlParseException { + Plan queryTableNationJoinCustomer() throws SqlParseException { String sql = "SELECT n.n_name, COUNT(*) AS NUMBER_CUSTOMER FROM NATION n JOIN CUSTOMER c " + "ON n.n_nationkey = c.c_nationkey WHERE n.n_nationkey = 17 " + "GROUP BY 
n.n_name"; @@ -151,7 +151,7 @@ For example, we can join the nation and customer tables from the TPC-H benchmark return plan; } - static void queryTwoDatasetsThruSubstraitPlanDefinition() { + void queryTwoDatasetsThruSubstraitPlanDefinition() { String uriNation = "file:" + System.getProperty("user.dir") + "/thirdpartydeps/tpch/nation.parquet"; String uriCustomer = "file:" + System.getProperty("user.dir") + "/thirdpartydeps/tpch/customer.parquet"; ScanOptions options = new ScanOptions(/*batchSize*/ 32768); From 95edea45415503f32f0428c520846c89ab920443 Mon Sep 17 00:00:00 2001 From: david dali susanibar arce Date: Mon, 28 Aug 2023 18:11:58 -0500 Subject: [PATCH 3/4] fix: code review --- java/source/dataset.rst | 88 -------------------- java/source/io.rst | 172 +++++++++++++++++++++------------------- 2 files changed, 92 insertions(+), 168 deletions(-) diff --git a/java/source/dataset.rst b/java/source/dataset.rst index 857956ce..2ac3fa77 100644 --- a/java/source/dataset.rst +++ b/java/source/dataset.rst @@ -534,91 +534,3 @@ Let's read a CSV file. Total batch size: 3 .. _Arrow Java Dataset: https://arrow.apache.org/docs/dev/java/dataset.html - - -Write Parquet Files -=================== - -Let's read an Arrow file and populate that data into a Parquet file. - -.. 
testcode:: - - import java.io.IOException; - import java.nio.file.DirectoryStream; - import java.nio.file.Files; - import java.nio.file.Path; - import java.nio.file.Paths; - - import org.apache.arrow.dataset.file.DatasetFileWriter; - import org.apache.arrow.dataset.file.FileFormat; - import org.apache.arrow.dataset.file.FileSystemDatasetFactory; - import org.apache.arrow.dataset.jni.NativeMemoryPool; - import org.apache.arrow.dataset.scanner.ScanOptions; - import org.apache.arrow.dataset.scanner.Scanner; - import org.apache.arrow.dataset.source.Dataset; - import org.apache.arrow.dataset.source.DatasetFactory; - import org.apache.arrow.memory.BufferAllocator; - import org.apache.arrow.memory.RootAllocator; - import org.apache.arrow.vector.ipc.ArrowFileReader; - import org.apache.arrow.vector.ipc.ArrowReader; - import org.apache.arrow.vector.ipc.SeekableReadChannel; - import org.apache.arrow.vector.util.ByteArrayReadableSeekableByteChannel; - - // read arrow demo data - Path uriRead = Paths.get("./thirdpartydeps/arrowfiles/random_access.arrow"); - try ( - BufferAllocator allocator = new RootAllocator(); - ArrowFileReader readerForDemoData = new ArrowFileReader( - new SeekableReadChannel(new ByteArrayReadableSeekableByteChannel( - Files.readAllBytes(uriRead))), allocator) - ) { - Path uriWrite = Files.createTempDirectory("parquet_"); - // write data for new parquet file - DatasetFileWriter.write(allocator, readerForDemoData, FileFormat.PARQUET, uriWrite.toUri().toString()); - // validate data of parquet file just created - ScanOptions options = new ScanOptions(/*batchSize*/ 32768); - try ( - DatasetFactory datasetFactory = new FileSystemDatasetFactory(allocator, - NativeMemoryPool.getDefault(), FileFormat.PARQUET, uriWrite.toUri().toString()); - Dataset dataset = datasetFactory.finish(); - Scanner scanner = dataset.newScan(options); - ArrowReader readerForFileCreated = scanner.scanBatches() - ) { - while (readerForFileCreated.loadNextBatch()) { - 
System.out.print(readerForFileCreated.getVectorSchemaRoot().contentToTSVString()); - System.out.println("RowCount: " + readerForFileCreated.getVectorSchemaRoot().getRowCount()); - } - } catch (Exception e) { - e.printStackTrace(); - throw new RuntimeException(e); - } - // delete temporary parquet file created - try (DirectoryStream dir = Files.newDirectoryStream(uriWrite)) { - uriWrite.toFile().deleteOnExit(); - for (Path path : dir) { - path.toFile().deleteOnExit(); - } - } - } catch (IOException e) { - e.printStackTrace(); - throw new RuntimeException(e); - } - } - -.. testoutput:: - - name age - David 10 - Gladis 20 - Juan 30 - RowCount: 3 - name age - Nidia 15 - Alexa 20 - Mara 15 - RowCount: 3 - name age - Raul 34 - Jhon 29 - Thomy 33 - RowCount: 3 \ No newline at end of file diff --git a/java/source/io.rst b/java/source/io.rst index 7b66fd67..35a9502c 100644 --- a/java/source/io.rst +++ b/java/source/io.rst @@ -263,6 +263,93 @@ Write - Out to Buffer Number of rows written: 3 +Write Parquet Files +******************* + +Let's read an Arrow file and populate that data into a Parquet file. + +.. 
testcode:: + + import java.io.IOException; + import java.nio.file.DirectoryStream; + import java.nio.file.Files; + import java.nio.file.Path; + import java.nio.file.Paths; + + import org.apache.arrow.dataset.file.DatasetFileWriter; + import org.apache.arrow.dataset.file.FileFormat; + import org.apache.arrow.dataset.file.FileSystemDatasetFactory; + import org.apache.arrow.dataset.jni.NativeMemoryPool; + import org.apache.arrow.dataset.scanner.ScanOptions; + import org.apache.arrow.dataset.scanner.Scanner; + import org.apache.arrow.dataset.source.Dataset; + import org.apache.arrow.dataset.source.DatasetFactory; + import org.apache.arrow.memory.BufferAllocator; + import org.apache.arrow.memory.RootAllocator; + import org.apache.arrow.vector.ipc.ArrowFileReader; + import org.apache.arrow.vector.ipc.ArrowReader; + import org.apache.arrow.vector.ipc.SeekableReadChannel; + import org.apache.arrow.vector.util.ByteArrayReadableSeekableByteChannel; + + // read arrow demo data + Path uriRead = Paths.get("./thirdpartydeps/arrowfiles/random_access.arrow"); + try ( + BufferAllocator allocator = new RootAllocator(); + ArrowFileReader readerForDemoData = new ArrowFileReader( + new SeekableReadChannel(new ByteArrayReadableSeekableByteChannel( + Files.readAllBytes(uriRead))), allocator) + ) { + Path uriWrite = Files.createTempDirectory("parquet_"); + // write data for new parquet file + DatasetFileWriter.write(allocator, readerForDemoData, FileFormat.PARQUET, uriWrite.toUri().toString()); + // validate data of parquet file just created + ScanOptions options = new ScanOptions(/*batchSize*/ 32768); + try ( + DatasetFactory datasetFactory = new FileSystemDatasetFactory(allocator, + NativeMemoryPool.getDefault(), FileFormat.PARQUET, uriWrite.toUri().toString()); + Dataset dataset = datasetFactory.finish(); + Scanner scanner = dataset.newScan(options); + ArrowReader readerForFileCreated = scanner.scanBatches() + ) { + while (readerForFileCreated.loadNextBatch()) { + 
System.out.print(readerForFileCreated.getVectorSchemaRoot().contentToTSVString()); + System.out.println("RowCount: " + readerForFileCreated.getVectorSchemaRoot().getRowCount()); + } + } catch (Exception e) { + e.printStackTrace(); + throw new RuntimeException(e); + } + // delete temporary parquet file created + try (DirectoryStream dir = Files.newDirectoryStream(uriWrite)) { + uriWrite.toFile().deleteOnExit(); + for (Path path : dir) { + path.toFile().deleteOnExit(); + } + } + } catch (IOException e) { + e.printStackTrace(); + throw new RuntimeException(e); + } + } + +.. testoutput:: + + name age + David 10 + Gladis 20 + Juan 30 + RowCount: 3 + name age + Nidia 15 + Alexa 20 + Mara 15 + RowCount: 3 + name age + Raul 34 + Jhon 29 + Thomy 33 + RowCount: 3 + Reading ======= @@ -461,8 +548,8 @@ Reading Parquet File Please check :doc:`Dataset <./dataset>` -Handling Data with Dictionaries -******************************* +Reading Data with Dictionaries +****************************** Reading and writing dictionary-encoded data requires separately tracking the dictionaries. @@ -580,8 +667,8 @@ Reading and writing dictionary-encoded data requires separately tracking the dic Dictionary recovered: Dictionary DictionaryEncoding[id=666,ordered=false,indexType=Int(8, true)] [Andorra, Cuba, Grecia, Guinea, Islandia, Malta, Tailandia, Uganda, Yemen, Zambia] Decoded data: [Andorra, Guinea, Islandia, Malta, Uganda] -Customize Logic to Read Dataset -=============================== +Reading Custom Dataset +********************** If you need to implement a custom dataset reader, consider extending `ArrowReader`_ class. @@ -593,81 +680,6 @@ The ArrowReader class can be extended as follows: 4. At the end don’t forget to define the logic to ``closeReadSource()``. 5. Make sure you define the logic for closing the ``closeReadSource()`` at the end. -For example, let's create a custom JDBCReader reader. - -.. 
code-block:: java - - import java.io.IOException; - - import org.apache.arrow.adapter.jdbc.ArrowVectorIterator; - import org.apache.arrow.adapter.jdbc.JdbcToArrowConfig; - import org.apache.arrow.memory.BufferAllocator; - import org.apache.arrow.vector.VectorSchemaRoot; - import org.apache.arrow.vector.ipc.ArrowReader; - import org.apache.arrow.vector.types.pojo.Schema; - - class JDBCReader extends ArrowReader { - private final ArrowVectorIterator iter; - private final JdbcToArrowConfig config; - private VectorSchemaRoot root; - private boolean firstRoot = true; - - public JDBCReader(BufferAllocator allocator, ArrowVectorIterator iter, JdbcToArrowConfig config) { - super(allocator); - this.iter = iter; - this.config = config; - } - - @Override - public boolean loadNextBatch() throws IOException { - if (firstRoot) { - firstRoot = false; - return true; - } - else { - if (iter.hasNext()) { - if (root != null && !config.isReuseVectorSchemaRoot()) { - root.close(); - } - else { - root.allocateNew(); - } - root = iter.next(); - return root.getRowCount() != 0; - } - else { - return false; - } - } - } - - @Override - public long bytesRead() { - return 0; - } - - @Override - protected void closeReadSource() throws IOException { - if (root != null && !config.isReuseVectorSchemaRoot()) { - root.close(); - } - } - - @Override - protected Schema readSchema() throws IOException { - return null; - } - - @Override - public VectorSchemaRoot getVectorSchemaRoot() throws IOException { - if (root == null) { - root = iter.next(); - } - return root; - } - } - - - - +You can see an example of a custom JDBC reader at :doc:`Write ResultSet to Parquet File <./jdbc>` .. 
_`ArrowReader`: https://arrow.apache.org/docs/java/reference/org/apache/arrow/vector/ipc/ArrowReader.html From 991f40b14c0d4186a5a91266934cfa3e21864848 Mon Sep 17 00:00:00 2001 From: david dali susanibar arce Date: Thu, 14 Sep 2023 10:28:46 -0500 Subject: [PATCH 4/4] fix: code review --- java/source/io.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/java/source/io.rst b/java/source/io.rst index 35a9502c..d9253107 100644 --- a/java/source/io.rst +++ b/java/source/io.rst @@ -291,7 +291,7 @@ Let's read an Arrow file and populate that data into a Parquet file. import org.apache.arrow.vector.ipc.SeekableReadChannel; import org.apache.arrow.vector.util.ByteArrayReadableSeekableByteChannel; - // read arrow demo data + // read arrow demo data: Three row groups each consisting of three rows Path uriRead = Paths.get("./thirdpartydeps/arrowfiles/random_access.arrow"); try ( BufferAllocator allocator = new RootAllocator();