diff --git a/.github/workflows/cpp_integration.yml b/.github/workflows/cpp_integration.yml
index 804f98c8208..791fb1e0900 100644
--- a/.github/workflows/cpp_integration.yml
+++ b/.github/workflows/cpp_integration.yml
@@ -126,4 +126,25 @@ jobs:
           test-compile exec:java \
           -Dexec.classpathScope="test" \
           -Dexec.mainClass="org.apache.celeborn.service.deploy.cluster.CppWriteJavaReadTestWithZSTD" \
+          -Dexec.args="-XX:MaxDirectMemorySize=2G"
+      - name: Run Cpp-MergeWrite Java-Read Hybrid Integration Test (NONE Compression)
+        run: |
+          build/mvn -pl worker \
+          test-compile exec:java \
+          -Dexec.classpathScope="test" \
+          -Dexec.mainClass="org.apache.celeborn.service.deploy.cluster.CppMergeWriteJavaReadTestWithNONE" \
+          -Dexec.args="-XX:MaxDirectMemorySize=2G"
+      - name: Run Cpp-MergeWrite Java-Read Hybrid Integration Test (LZ4 Compression)
+        run: |
+          build/mvn -pl worker \
+          test-compile exec:java \
+          -Dexec.classpathScope="test" \
+          -Dexec.mainClass="org.apache.celeborn.service.deploy.cluster.CppMergeWriteJavaReadTestWithLZ4" \
+          -Dexec.args="-XX:MaxDirectMemorySize=2G"
+      - name: Run Cpp-MergeWrite Java-Read Hybrid Integration Test (ZSTD Compression)
+        run: |
+          build/mvn -pl worker \
+          test-compile exec:java \
+          -Dexec.classpathScope="test" \
+          -Dexec.mainClass="org.apache.celeborn.service.deploy.cluster.CppMergeWriteJavaReadTestWithZSTD" \
           -Dexec.args="-XX:MaxDirectMemorySize=2G"
\ No newline at end of file
diff --git a/cpp/celeborn/tests/CMakeLists.txt b/cpp/celeborn/tests/CMakeLists.txt
--- a/cpp/celeborn/tests/CMakeLists.txt
+++ b/cpp/celeborn/tests/CMakeLists.txt
@@ -61,3 +61,28 @@ target_link_libraries(
 add_executable(cppDataSumWithWriterClient DataSumWithWriterClient.cpp)
 
 target_link_libraries(cppDataSumWithWriterClient dataSumWithWriterClient)
+
+add_library(
+        dataSumWithMergeWriterClient
+        DataSumWithMergeWriterClient.cpp)
+
+target_link_libraries(
+        dataSumWithMergeWriterClient
+        memory
+        utils
+        conf
+        proto
+        network
+        protocol
+        client
+        ${WANGLE}
+        ${FIZZ}
+        ${LIBSODIUM_LIBRARY}
+        ${FOLLY_WITH_DEPENDENCIES}
+        ${GLOG}
+        ${GFLAGS_LIBRARIES}
+)
+
+add_executable(cppDataSumWithMergeWriterClient DataSumWithMergeWriterClient.cpp)
+
+target_link_libraries(cppDataSumWithMergeWriterClient dataSumWithMergeWriterClient)
diff --git a/cpp/celeborn/tests/DataSumWithMergeWriterClient.cpp b/cpp/celeborn/tests/DataSumWithMergeWriterClient.cpp
new file mode 100644
--- /dev/null
+++ b/cpp/celeborn/tests/DataSumWithMergeWriterClient.cpp
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// NOTE(review): the header names and explicit template arguments in this file
+// were not legible in the reviewed copy of the patch. They are reconstructed
+// from how the names are used below; confirm them against the tree.
+
+#include <folly/init/Init.h>
+#include <cstdint>
+#include <cstdio>
+#include <cstdlib>
+#include <fstream>
+#include <iostream>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include <celeborn/client/ShuffleClient.h>
+
+// Integration-test driver for the C++ merge-write path. For each mapper it
+// merges numData random values into every partition, encoding each value as
+// '-' followed by its decimal digits, then pushes the merged data and ends the
+// mapper. The per-partition sums are written to resultFile, one per line.
+int main(int argc, char** argv) {
+  folly::init(&argc, &argv, false);
+  // Checked at runtime rather than with assert(): assert() is compiled out
+  // under NDEBUG, and a short command line would then read past argv.
+  if (argc != 10) {
+    std::cerr << "expected 9 arguments: lifecycleManagerHost "
+                 "lifecycleManagerPort appUniqueId shuffleId attemptId "
+                 "numMappers numPartitions resultFile compressCodec, got "
+              << (argc - 1) << std::endl;
+    return 1;
+  }
+  std::string lifecycleManagerHost = argv[1];
+  int lifecycleManagerPort = std::atoi(argv[2]);
+  std::string appUniqueId = argv[3];
+  int shuffleId = std::atoi(argv[4]);
+  int attemptId = std::atoi(argv[5]);
+  int numMappers = std::atoi(argv[6]);
+  int numPartitions = std::atoi(argv[7]);
+  std::string resultFile = argv[8];
+  std::string compressCodec = argv[9];
+  std::cout << "lifecycleManagerHost = " << lifecycleManagerHost
+            << ", lifecycleManagerPort = " << lifecycleManagerPort
+            << ", appUniqueId = " << appUniqueId
+            << ", shuffleId = " << shuffleId << ", attemptId = " << attemptId
+            << ", numMappers = " << numMappers
+            << ", numPartitions = " << numPartitions
+            << ", resultFile = " << resultFile
+            << ", compressCodec = " << compressCodec << std::endl;
+
+  auto conf = std::make_shared<celeborn::conf::CelebornConf>();
+  conf->registerProperty(
+      celeborn::conf::CelebornConf::kShuffleCompressionCodec, compressCodec);
+  auto clientEndpoint =
+      std::make_shared<celeborn::client::ShuffleClientEndpoint>(conf);
+  auto shuffleClient = celeborn::client::ShuffleClientImpl::create(
+      appUniqueId, conf, *clientEndpoint);
+  shuffleClient->setupLifecycleManagerRef(
+      lifecycleManagerHost, lifecycleManagerPort);
+
+  long maxData = 1000000;
+  size_t numData = 1000;
+  std::vector<long> result(numPartitions, 0);
+  std::vector<size_t> dataCnt(numPartitions, 0);
+  for (int mapId = 0; mapId < numMappers; mapId++) {
+    for (int partitionId = 0; partitionId < numPartitions; partitionId++) {
+      for (size_t i = 0; i < numData; i++) {
+        int data = std::rand() % maxData;
+        result[partitionId] += data;
+        dataCnt[partitionId]++;
+        // The leading '-' separates records; the Java reader splits on it.
+        std::string dataStr = "-" + std::to_string(data);
+        shuffleClient->mergeData(
+            shuffleId,
+            mapId,
+            attemptId,
+            partitionId,
+            reinterpret_cast<const uint8_t*>(dataStr.c_str()),
+            0,
+            dataStr.size(),
+            numMappers,
+            numPartitions);
+      }
+    }
+    shuffleClient->pushMergedData(shuffleId, mapId, attemptId);
+    shuffleClient->mapperEnd(shuffleId, mapId, attemptId, numMappers);
+  }
+  for (int partitionId = 0; partitionId < numPartitions; partitionId++) {
+    std::cout << "partition " << partitionId
+              << " sum result = " << result[partitionId]
+              << ", dataCnt = " << dataCnt[partitionId] << std::endl;
+  }
+
+  std::remove(resultFile.c_str());
+  std::ofstream of(resultFile);
+  if (!of) {
+    std::cerr << "failed to open result file " << resultFile << std::endl;
+    return 1;
+  }
+  for (int partitionId = 0; partitionId < numPartitions; partitionId++) {
+    of << result[partitionId] << std::endl;
+  }
+  of.close();
+  if (!of) {
+    std::cerr << "failed to write result file " << resultFile << std::endl;
+    return 1;
+  }
+
+  return 0;
+}
diff --git a/worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/CppMergeWriteJavaReadTestWithLZ4.scala b/worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/CppMergeWriteJavaReadTestWithLZ4.scala
new file mode 100644
--- /dev/null
+++ b/worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/CppMergeWriteJavaReadTestWithLZ4.scala
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.service.deploy.cluster
+
+import org.apache.celeborn.common.protocol.CompressionCodec
+
+/**
+ * Standalone entry point for the C++ merge-write / Java read integration test
+ * using LZ4 compression.
+ */
+object CppMergeWriteJavaReadTestWithLZ4 extends JavaCppHybridReadWriteTestBase {
+
+  def main(args: Array[String]): Unit = {
+    testCppMergeWriteJavaRead(CompressionCodec.LZ4)
+  }
+}
diff --git a/worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/CppMergeWriteJavaReadTestWithNONE.scala b/worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/CppMergeWriteJavaReadTestWithNONE.scala
new file mode 100644
--- /dev/null
+++ b/worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/CppMergeWriteJavaReadTestWithNONE.scala
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.service.deploy.cluster
+
+import org.apache.celeborn.common.protocol.CompressionCodec
+
+/**
+ * Standalone entry point for the C++ merge-write / Java read integration test
+ * with compression disabled (NONE).
+ */
+object CppMergeWriteJavaReadTestWithNONE extends JavaCppHybridReadWriteTestBase {
+
+  def main(args: Array[String]): Unit = {
+    testCppMergeWriteJavaRead(CompressionCodec.NONE)
+  }
+}
diff --git a/worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/CppMergeWriteJavaReadTestWithZSTD.scala b/worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/CppMergeWriteJavaReadTestWithZSTD.scala
new file mode 100644
--- /dev/null
+++ b/worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/CppMergeWriteJavaReadTestWithZSTD.scala
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.service.deploy.cluster
+
+import org.apache.celeborn.common.protocol.CompressionCodec
+
+/**
+ * Standalone entry point for the C++ merge-write / Java read integration test
+ * using ZSTD compression.
+ */
+object CppMergeWriteJavaReadTestWithZSTD extends JavaCppHybridReadWriteTestBase {
+
+  def main(args: Array[String]): Unit = {
+    testCppMergeWriteJavaRead(CompressionCodec.ZSTD)
+  }
+}
diff --git a/worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/JavaCppHybridReadWriteTestBase.scala b/worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/JavaCppHybridReadWriteTestBase.scala
--- a/worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/JavaCppHybridReadWriteTestBase.scala
+++ b/worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/JavaCppHybridReadWriteTestBase.scala
@@ -244,4 +244,125 @@ trait JavaCppHybridReadWriteTestBase extends AnyFunSuite
     shuffleClient.shutdown()
   }
 
+  /**
+   * Standalone-runner wrapper: brackets [[runCppMergeWriteJavaRead]] with
+   * beforeAll() and afterAll(), calling afterAll() even when the run throws.
+   */
+  def testCppMergeWriteJavaRead(codec: CompressionCodec): Unit = {
+    beforeAll()
+    try {
+      runCppMergeWriteJavaRead(codec)
+    } finally {
+      afterAll()
+    }
+  }
+
+  /**
+   * Runs the cppDataSumWithMergeWriterClient binary to merge-write data with the
+   * given codec, reads every partition back through the Java ShuffleClient and
+   * checks that each partition's sum equals the sum the C++ side recorded.
+   *
+   * Records are encoded as '-' followed by decimal digits, so '-' acts as the
+   * record separator while reading.
+   */
+  def runCppMergeWriteJavaRead(codec: CompressionCodec): Unit = {
+    val appUniqueId = "test-app"
+    val shuffleId = 0
+    val attemptId = 0
+
+    val clientConf = new CelebornConf()
+      .set(CelebornConf.MASTER_ENDPOINTS.key, s"localhost:$masterPort")
+      .set(CelebornConf.SHUFFLE_COMPRESSION_CODEC.key, codec.name)
+      .set(CelebornConf.CLIENT_PUSH_REPLICATE_ENABLED.key, "true")
+      .set(CelebornConf.CLIENT_PUSH_BUFFER_MAX_SIZE.key, "256K")
+      .set(CelebornConf.READ_LOCAL_SHUFFLE_FILE, false)
+      .set("celeborn.data.io.numConnectionsPerPeer", "1")
+    val lifecycleManager = new LifecycleManager(appUniqueId, clientConf)
+
+    val shuffleClient =
+      new ShuffleClientImpl(appUniqueId, clientConf, UserIdentifier("mock", "mock"))
+    shuffleClient.setupLifecycleManagerRef(lifecycleManager.self)
+
+    // Release the client and lifecycle manager even when an assertion fails.
+    try {
+      val numMappers = 2
+      val numPartitions = 2
+
+      val cppResultFile = "/tmp/celeborn-cpp-merge-writer-result.txt"
+      // Drop any result left by an earlier run so that only output produced by
+      // this invocation of the C++ binary can satisfy the comparison below.
+      new File(cppResultFile).delete()
+      val lifecycleManagerHost = lifecycleManager.getHost
+      val lifecycleManagerPort = lifecycleManager.getPort
+      val projectDirectory = new File(new File(".").getAbsolutePath)
+      val cppBinRelativeDirectory = "cpp/build/celeborn/tests/"
+      val cppBinFileName = "cppDataSumWithMergeWriterClient"
+      val cppBinFilePath = s"$projectDirectory/$cppBinRelativeDirectory/$cppBinFileName"
+      val cppCodec = codec.name()
+      val command = {
+        s"$cppBinFilePath $lifecycleManagerHost $lifecycleManagerPort $appUniqueId $shuffleId $attemptId $numMappers $numPartitions $cppResultFile $cppCodec"
+      }
+      println(s"run command: $command")
+      val commandOutput = runCommand(command)
+      println(s"command output: $commandOutput")
+
+      val metricsCallback = new MetricsCallback {
+        override def incBytesRead(bytesWritten: Long): Unit = {}
+        override def incReadTime(time: Long): Unit = {}
+      }
+
+      val sums = new util.ArrayList[Long](numPartitions)
+      for (partitionId <- 0 until numPartitions) {
+        sums.add(0)
+        val inputStream = shuffleClient.readPartition(
+          shuffleId,
+          partitionId,
+          attemptId,
+          0,
+          0,
+          Integer.MAX_VALUE,
+          metricsCallback)
+        var dataCnt = 0
+        try {
+          var c = inputStream.read()
+          var data: Long = 0
+          while (c != -1) {
+            if (c == '-') {
+              sums.set(partitionId, sums.get(partitionId) + data)
+              data = 0
+              dataCnt += 1
+            } else {
+              assert(c >= '0' && c <= '9')
+              data *= 10
+              data += c - '0'
+            }
+            c = inputStream.read()
+          }
+          // The last record has no trailing separator; add it here.
+          sums.set(partitionId, sums.get(partitionId) + data)
+        } finally {
+          inputStream.close()
+        }
+        println(
+          s"partition $partitionId sum result = ${sums.get(partitionId)}, dataCnt = $dataCnt")
+      }
+
+      // Read the file eagerly and close it; Source holds an open file handle.
+      val resultSource = Source.fromFile(cppResultFile, "utf-8")
+      val resultLines =
+        try resultSource.getLines.toList
+        finally resultSource.close()
+      var lineCount = 0
+      for (line <- resultLines) {
+        val data = line.toLong
+        Assert.assertEquals(data, sums.get(lineCount))
+        lineCount += 1
+      }
+      Assert.assertEquals(numPartitions, lineCount)
+    } finally {
+      lifecycleManager.stop()
+      shuffleClient.shutdown()
+    }
+  }
+
 }