From 50e9d71351216642f4e3b089da0c560d9631e126 Mon Sep 17 00:00:00 2001 From: Luca Terracciano Date: Tue, 7 Oct 2025 18:52:34 +0200 Subject: [PATCH 1/5] feat: variable size SPSC channel take into account payloads - the payload is not split into chunks. We require the user to provide a bigger buffer and we use part of it as excess to store entire tokens. The channel logic remains unchanged and it allows you to push only MAX_TOKENS * TOKEN_SIZE - channels take into account both how many tokens are in the channel and the available space - extended the isFull function to take a size_t and compute whether the channel is full - add extensive testing --- .../mpsc/nonlocking/include/producer.hpp | 2 +- .../variableSize/spsc/include/consumer.hpp | 4 +- .../variableSize/mpsc/nonlocking/consumer.hpp | 4 +- .../channel/variableSize/spsc/consumer.hpp | 31 ++- .../channel/variableSize/spsc/producer.hpp | 143 ++++++------ tests/frontends/channel/meson.build | 1 + .../channel/variableSize/meson.build | 1 + .../spsc/include/channelFixture.hpp | 38 ++++ .../channel/variableSize/spsc/mainMPI.cpp | 31 +++ .../channel/variableSize/spsc/meson.build | 9 + .../variableSize/spsc/source/common.hpp | 68 ++++++ .../variableSize/spsc/source/consumer.hpp | 206 ++++++++++++++++++ .../variableSize/spsc/source/fillBuffers.cpp | 60 +++++ .../variableSize/spsc/source/meson.build | 21 ++ .../variableSize/spsc/source/producer.hpp | 162 ++++++++++++++ 15 files changed, 705 insertions(+), 76 deletions(-) create mode 100644 tests/frontends/channel/variableSize/meson.build create mode 100644 tests/frontends/channel/variableSize/spsc/include/channelFixture.hpp create mode 100644 tests/frontends/channel/variableSize/spsc/mainMPI.cpp create mode 100644 tests/frontends/channel/variableSize/spsc/meson.build create mode 100644 tests/frontends/channel/variableSize/spsc/source/common.hpp create mode 100644 tests/frontends/channel/variableSize/spsc/source/consumer.hpp create mode 100644 tests/frontends/channel/variableSize/spsc/source/fillBuffers.cpp create mode 100644 tests/frontends/channel/variableSize/spsc/source/meson.build create mode 100644 tests/frontends/channel/variableSize/spsc/source/producer.hpp diff --git a/examples/channels/variableSize/mpsc/nonlocking/include/producer.hpp b/examples/channels/variableSize/mpsc/nonlocking/include/producer.hpp index 74bb6830..2b4dc020 100644 --- a/examples/channels/variableSize/mpsc/nonlocking/include/producer.hpp +++ b/examples/channels/variableSize/mpsc/nonlocking/include/producer.hpp @@ -129,7 +129,7 @@ void producerFc(HiCR::MemoryManager &coordinationMemoryManager, // is the message sizes buffer full? // Note: it might be necessary sometimes to also check the payload buffers // as they might overflow independently of the message size buffers - while (producer.isFull()) { producer.updateDepth(); } + while (producer.isFull(nextElemSize)) { producer.updateDepth(); } producer.push(sendSlot); Printer::printBytes(prefix, elements[i].first, payloadCapacity, 0, nextElemSize); diff --git a/examples/channels/variableSize/spsc/include/consumer.hpp b/examples/channels/variableSize/spsc/include/consumer.hpp index 9bd079fe..8b4ee68f 100644 --- a/examples/channels/variableSize/spsc/include/consumer.hpp +++ b/examples/channels/variableSize/spsc/include/consumer.hpp @@ -36,7 +36,7 @@ void consumerFc(HiCR::MemoryManager &coordinationMemoryManager, auto sizesBufferSlot = coordinationMemoryManager.allocateLocalMemorySlot(coordinationMemorySpace, sizesBufferSize); // Allocating payload buffer as a local memory slot - auto payloadBufferSlot = payloadMemoryManager.allocateLocalMemorySlot(payloadMemorySpace, PAYLOAD_CAPACITY); + auto payloadBufferSlot = payloadMemoryManager.allocateLocalMemorySlot(payloadMemorySpace, PAYLOAD_CAPACITY * 2); // Getting required buffer size auto coordinationBufferSize = HiCR::channel::variableSize::Base::getCoordinationBufferSize(); @@ -86,7 +86,7 @@ void consumerFc(HiCR::MemoryManager &coordinationMemoryManager, channelCapacity); // Getting a single value from the channel - while (consumer.getDepth() != 1) consumer.updateDepth(); + while (consumer.getCoordinationDepth() != 1) consumer.updateDepth(); // Getting internal pointer of the token buffer slot auto payloadBufferPtr = (ELEMENT_TYPE *)payloadBufferSlot->getPointer(); diff --git a/include/hicr/frontends/channel/variableSize/mpsc/nonlocking/consumer.hpp b/include/hicr/frontends/channel/variableSize/mpsc/nonlocking/consumer.hpp index 31b2c291..4af63b9e 100644 --- a/include/hicr/frontends/channel/variableSize/mpsc/nonlocking/consumer.hpp +++ b/include/hicr/frontends/channel/variableSize/mpsc/nonlocking/consumer.hpp @@ -104,7 +104,7 @@ class Consumer * immediately upon creation of the SPSC channel. Therefore we do not reset * _depths to zero, and check for "early" received messages */ - _depths.push_back(consumerPtr->getDepth()); + _depths.push_back(consumerPtr->getCoordinationDepth()); for (size_t j = 0; j < _depths.back(); j++) { _channelPushes.push(i); } } } @@ -222,7 +222,7 @@ class Consumer for (size_t i = 0; i < _spscList.size(); i++) { _spscList[i]->updateDepth(); - newDepths[i] = _spscList[i]->getDepth(); + newDepths[i] = _spscList[i]->getCoordinationDepth(); } for (size_t i = 0; i < _spscList.size(); i++) diff --git a/include/hicr/frontends/channel/variableSize/spsc/consumer.hpp b/include/hicr/frontends/channel/variableSize/spsc/consumer.hpp index d04b20ea..715f6f5b 100644 --- a/include/hicr/frontends/channel/variableSize/spsc/consumer.hpp +++ b/include/hicr/frontends/channel/variableSize/spsc/consumer.hpp @@ -243,23 +243,28 @@ class Consumer final : public variableSize::Base * receiving the message counts (phase 2), returning this depth should guarantee * we already have received the payloads * - * \note This is not a thread-safe call * * This is a getter function that should complete in \f$ \Theta(1) \f$ time. * * @return The number of elements in variable-size consumer channel + * + * \note This is not a thread-safe call + * \note Even though there might be space for additional tokens in the coordination buffer, it is not guaranteed that + * the push() will succeed due to insufficient space in the payload buffer */ - size_t getDepth() { return getCircularBufferForCounts()->getDepth(); } + size_t getCoordinationDepth() { return getCircularBufferForCounts()->getDepth(); } /** * Returns the current depth of the channel holding the payloads * - * \note This is not a thread-safe call * * This is a getter function that should complete in \f$ \Theta(1) \f$ time. * * @returns The number of total bytes in the payloads channel * + * \note This is not a thread-safe call + * \note Even though there might be space for additional tokens in the payload buffer, it is not guaranteed that + * the push() will succeed due to insufficient space in the coordination buffer */ size_t getPayloadDepth() { return getCircularBufferForPayloads()->getDepth(); } @@ -271,7 +276,25 @@ class Consumer final : public variableSize::Base * \returns true, if both message count and payload buffers are empty * \returns false, if one of the buffers is not empty */ - bool isEmpty() { return (getDepth() == 0); } + bool isEmpty() { return getCoordinationDepth() == 0; } + + /** + * This funciton can be used to quickly check whether the channel is becoming full when trying + * to push an element of a given size + * + * \param[in] requiredBufferSize size of the token to push into the channel + * + * \return true if there is enough space to push the token, false otherwise + */ + bool isFull(size_t requiredBufferSize) + { + auto coordinationCircularBuffer = getCircularBufferForCounts(); + if (coordinationCircularBuffer->getDepth() == coordinationCircularBuffer->getCapacity()) return true; + auto payloadCircularBuffer = getCircularBufferForPayloads(); + if (payloadCircularBuffer->getDepth() + requiredBufferSize > payloadCircularBuffer->getCapacity()) return true; + + return false; + } /** * Retrieves the pointer to the channel's payload buffer diff --git a/include/hicr/frontends/channel/variableSize/spsc/producer.hpp b/include/hicr/frontends/channel/variableSize/spsc/producer.hpp index 40236950..01f157d2 100644 --- a/include/hicr/frontends/channel/variableSize/spsc/producer.hpp +++ b/include/hicr/frontends/channel/variableSize/spsc/producer.hpp @@ -48,7 +48,8 @@ class Producer : public variableSize::Base * \param[in] payloadCommunicationManager The backend's memory manager to facilitate communication between the producer and consumer payload buffers * \param[in] sizeInfoBuffer The local memory slot used to hold the information about the next message size * \param[in] payloadBuffer The global memory slot pertaining to the payload of all messages. The producer will push messages into this - * buffer, while there is enough space. This buffer should be large enough to hold at least the largest of the variable-size messages. + * buffer, while there is enough space. This buffer should be large enough to hold twice the capacity specified by @ref payloadCapacity argument. + * Half of the buffer is used as excess buffer to avoid internal fragmentation of messages * \param[in] tokenBuffer The memory slot pertaining to the token buffer, which is used to hold message size data. * The producer will push message sizes into this buffer, while there is enough space. This buffer should be large enough to * hold at least one message size. @@ -107,8 +108,11 @@ class Producer : public variableSize::Base __INLINE__ size_t getPayloadSize() { return _payloadSize; } /** - * get payload buffer depth - * @return payload buffer depth (in bytes) + * Get payload buffer depth + * + * \return payload buffer depth (in bytes). It is the occupancy of the buffers + * \note Even though there might be space for additional tokens in the payload buffer, it is not guaranteed that + * the push() will succeed due to insufficient space in the coordination buffer */ __INLINE__ size_t getPayloadDepth() { return getCircularBufferForPayloads()->getDepth(); } @@ -117,16 +121,6 @@ class Producer : public variableSize::Base * @return payload buffer capacity (in bytes) */ __INLINE__ size_t getPayloadCapacity() { return getCircularBufferForPayloads()->getCapacity(); } - - /** - * Given a proposed message, indicate whether there is enough payload space to push it - */ - __INLINE__ bool hasEnoughPayloadSpace(const size_t msgSize) - { - auto currentPayloadDepth = getCircularBufferForPayloads()->getDepth(); - if (msgSize + currentPayloadDepth > getPayloadCapacity()) return false; - return true; - } /** * Puts new variable-sized messages unto the channel. @@ -156,64 +150,59 @@ class Producer : public variableSize::Base if (n != 1) HICR_THROW_RUNTIME("HiCR currently has no implementation for n != 1 with push(sourceSlot, n) for variable size version."); // Make sure source slot is beg enough to satisfy the operation - size_t requiredBufferSize = sourceSlot->getSize(); - size_t providedBufferCapacity = getPayloadCapacity(); + size_t requiredPayloadBufferSize = sourceSlot->getSize(); + size_t providedPayloadBufferCapacity = getPayloadCapacity(); + + // size_t requiredCoordinationBufferSize = getTokenSize(); + // size_t providedCoordinationBufferCapacity = getPayloadCapacity(); // Updating depth of token (message sizes) and payload buffers updateDepth(); auto currentPayloadDepth = getCircularBufferForPayloads()->getDepth(); - auto currentDepth = getDepth(); - - /* - * Part 1: Copy the payload data - */ - if (hasEnoughPayloadSpace(requiredBufferSize) == false) + auto currentDepth = getCoordinationDepth(); + + /** + * Payload copy. + * 2 possible scenarios: + * + * 1) we still have space after the head after the push + * 2) we do not have enough space ahead after the push, but we have it at the beginning of the buffer + * and their sum allows us to push the token. In such case, push to the excess buffer so that we avoid + * breaking the token in 2 chunks. The head is always pointing to a valid position in the buffer + * + * 0 buffer capacity End of excess buffer + * TAIL HEAD1 + * |-------|--------|-------|-------------|----------| + * HEAD + * + * In this case HEAD1 indicates where the the token ends in the implementation, while HEAD its logical position + * + * If neither of those is true, we can not push + * */ + + // Check that the token to push is smaller than the buffer capacity + if (requiredPayloadBufferSize > providedPayloadBufferCapacity) + HICR_THROW_RUNTIME("Attempting to push (%lu) bytes, while the channel has a maximum capacity of (%lu)", requiredPayloadBufferSize, providedPayloadBufferCapacity); + + // Check whether there is enough space in the buffer to push the token + if (currentPayloadDepth + requiredPayloadBufferSize > providedPayloadBufferCapacity) HICR_THROW_RUNTIME("Attempting to push (%lu) bytes while the channel currently has payload depth (%lu). This would exceed capacity (%lu).\n", - requiredBufferSize, + requiredPayloadBufferSize, currentPayloadDepth, - providedBufferCapacity); + providedPayloadBufferCapacity); // Get communication managers - auto payloadCommunicationManager = getPayloadCommunicationManager(); + auto payloadCommunicationManager = getPayloadCommunicationManager(); + auto coordinationCommunicationManager = getCoordinationCommunicationManager(); - /* - * Payload copy: - * - We have checked (requiredBufferSize <= depth) - * that the payload fits into available circular buffer, - * but it is possible it spills over the end into the - * beginning. Cover this corner case below - */ - if (requiredBufferSize + getPayloadHeadPosition() > getPayloadCapacity()) - { - size_t first_chunk = getPayloadCapacity() - getPayloadHeadPosition(); - size_t second_chunk = requiredBufferSize - first_chunk; - - // copy first part to end of buffer - payloadCommunicationManager->memcpy(_payloadBuffer, /* destination */ - getPayloadHeadPosition(), /* dst_offset */ - sourceSlot, /* source */ - 0, /* src_offset */ - first_chunk); /* size */ - // copy second part to beginning of buffer - payloadCommunicationManager->memcpy(_payloadBuffer, /* destination */ - 0, /* dst_offset */ - sourceSlot, /* source */ - first_chunk, /* src_offset */ - second_chunk); /* size */ - payloadCommunicationManager->fence(sourceSlot, 2, 0); - } - else - { - payloadCommunicationManager->memcpy(_payloadBuffer, getPayloadHeadPosition(), sourceSlot, 0, requiredBufferSize); - payloadCommunicationManager->fence(sourceSlot, 1, 0); - } - - getCircularBufferForPayloads()->advanceHead(requiredBufferSize); + // Payload copy. Just push to the channel, we know there is enough space + payloadCommunicationManager->memcpy(_payloadBuffer, getPayloadHeadPosition(), sourceSlot, 0, requiredPayloadBufferSize); + payloadCommunicationManager->fence(sourceSlot, 1, 0); - auto coordinationCommunicationManager = getCoordinationCommunicationManager(); + // Advance the head in the payload buffer + getCircularBufferForPayloads()->advanceHead(requiredPayloadBufferSize); - // update the consumer coordination buffers (consumer does not update - // its own coordination head positions) + // update the consumer coordination buffers (consumer does not update its own coordination head positions) coordinationCommunicationManager->memcpy(_consumerCoordinationBufferForPayloads, _HICR_CHANNEL_HEAD_ADVANCE_COUNT_IDX * sizeof(size_t), getCoordinationBufferForPayloads(), @@ -224,15 +213,14 @@ class Producer : public variableSize::Base /* * Part 2: Copy the message size */ - auto *sizeInfoBufferPtr = static_cast(_sizeInfoBuffer->getPointer()); - sizeInfoBufferPtr[0] = requiredBufferSize; + sizeInfoBufferPtr[0] = requiredPayloadBufferSize; // If the exchange buffer does not have n free slots, reject the operation if (currentDepth + 1 > getCircularBufferForCounts()->getCapacity()) HICR_THROW_RUNTIME("Attempting to push with (%lu) tokens while the channel has (%lu) tokens and this would exceed capacity (%lu).\n", 1, - getDepth(), + getCoordinationDepth(), getCircularBufferForCounts()->getCapacity()); coordinationCommunicationManager->memcpy(_tokenBuffer, /* destination */ @@ -252,14 +240,16 @@ class Producer : public variableSize::Base } /** - * Get depth of variable-size producer. Since we have 2 buffers - one - * for counts, and one for payloads, we need to be careful. + * Get depth of the coordination buffer of variable-size producer. * Because the current implementation first receives the payload (phase 1) before // receiving the message counts (phase 2), returning this depth should guarantee // we already have received the payloads * @return The number of elements in the variable-size producer channel + * + * \note Even though there might be space for additional tokens in the coordination buffer, it is not guaranteed that + * the push() will succeed due to insufficient space in the payload buffer */ - size_t getDepth() { return getCircularBufferForCounts()->getDepth(); } + size_t getCoordinationDepth() { return getCircularBufferForCounts()->getDepth(); } /** * This function can be used to quickly check whether the channel is empty. @@ -268,8 +258,27 @@ class Producer : public variableSize::Base * * \returns true, if both message count and payload buffers are empty * \returns false, if one of the buffers is not empty + * */ - bool isEmpty() { return getDepth() == 0; } + bool isEmpty() { return getCoordinationDepth() == 0; } + + /** + * This funciton can be used to quickly check whether the channel is becoming full when trying + * to push an element of a given size + * + * \param[in] requiredBufferSize size of the token to push into the channel + * + * \return true if there is enough space to push the token, false otherwise + */ + bool isFull(size_t requiredBufferSize) + { + auto coordinationCircularBuffer = getCircularBufferForCounts(); + if (coordinationCircularBuffer->getDepth() == coordinationCircularBuffer->getCapacity()) return true; + auto payloadCircularBuffer = getCircularBufferForPayloads(); + if (payloadCircularBuffer->getDepth() + requiredBufferSize == payloadCircularBuffer->getCapacity()) return true; + + return false; + } private: diff --git a/tests/frontends/channel/meson.build b/tests/frontends/channel/meson.build index ff48683b..2b3a2aa0 100644 --- a/tests/frontends/channel/meson.build +++ b/tests/frontends/channel/meson.build @@ -1,2 +1,3 @@ subdir('circularBuffer') subdir('fixedSize') +subdir('variableSize') diff --git a/tests/frontends/channel/variableSize/meson.build b/tests/frontends/channel/variableSize/meson.build new file mode 100644 index 00000000..27e4d367 --- /dev/null +++ b/tests/frontends/channel/variableSize/meson.build @@ -0,0 +1 @@ +subdir('spsc') diff --git a/tests/frontends/channel/variableSize/spsc/include/channelFixture.hpp b/tests/frontends/channel/variableSize/spsc/include/channelFixture.hpp new file mode 100644 index 00000000..65c12109 --- /dev/null +++ b/tests/frontends/channel/variableSize/spsc/include/channelFixture.hpp @@ -0,0 +1,38 @@ +#pragma once + +#include +#include + +#include +#include +#include +#include +#include +#include + +class ChannelFixture : public ::testing::Test +{ + public: + + ChannelFixture() + : ::testing::Test() + { + _mpiInstanceManager = std::make_unique(MPI_COMM_WORLD); + _mpiCommunicationManager = std::make_unique(MPI_COMM_WORLD); + _mpiMemoryManager = std::make_unique(); + _pthreadsComputeManager = std::make_unique(); + + _hwlocTopologyManager = HiCR::backend::hwloc::TopologyManager::createDefault(); + + _topology = _hwlocTopologyManager->queryTopology(); + } + + std::unique_ptr _mpiCommunicationManager; + std::unique_ptr _mpiInstanceManager; + std::unique_ptr _mpiMemoryManager; + std::unique_ptr _hwlocTopologyManager; + + std::unique_ptr _pthreadsComputeManager; + + HiCR::Topology _topology; +}; \ No newline at end of file diff --git a/tests/frontends/channel/variableSize/spsc/mainMPI.cpp b/tests/frontends/channel/variableSize/spsc/mainMPI.cpp new file mode 100644 index 00000000..ac693531 --- /dev/null +++ b/tests/frontends/channel/variableSize/spsc/mainMPI.cpp @@ -0,0 +1,31 @@ +#include +#include + +#include +#include + +class MPITestEnvironment : public ::testing::Environment +{ + void TearDown() override + { + int result = ::testing::UnitTest::GetInstance()->failed_test_count(); + int rank; + MPI_Comm_rank(MPI_COMM_WORLD, &rank); + if (result > 0) + { + std::this_thread::sleep_for(std::chrono::seconds(1700)); + fprintf(stderr, "[Rank %d] Test failed, aborting MPI.\n", rank); + MPI_Abort(MPI_COMM_WORLD, 1); + } + } +}; + +int main(int argc, char **argv) +{ + MPI_Init(&argc, &argv); + ::testing::InitGoogleTest(&argc, argv); + ::testing::AddGlobalTestEnvironment(new MPITestEnvironment); + int result = RUN_ALL_TESTS(); + MPI_Finalize(); + return result; +} diff --git a/tests/frontends/channel/variableSize/spsc/meson.build b/tests/frontends/channel/variableSize/spsc/meson.build new file mode 100644 index 00000000..ed2b47b8 --- /dev/null +++ b/tests/frontends/channel/variableSize/spsc/meson.build @@ -0,0 +1,9 @@ +channelsTestIncludeDirs = include_directories(['include']) + +channelsTestMPIDep = declare_dependency( + sources: ['mainMPI.cpp'], + include_directories: channelsTestIncludeDirs, + dependencies: [HiCRTestBuildDependencies], +) + +subdir('source') \ No newline at end of file diff --git a/tests/frontends/channel/variableSize/spsc/source/common.hpp b/tests/frontends/channel/variableSize/spsc/source/common.hpp new file mode 100644 index 00000000..c0d8dcce --- /dev/null +++ b/tests/frontends/channel/variableSize/spsc/source/common.hpp @@ -0,0 +1,68 @@ +/* + * Copyright 2025 Huawei Technologies Co., Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include + +#define CHANNEL_TAG 0 +#define SIZES_BUFFER_KEY 0 +#define PRODUCER_COORDINATION_BUFFER_FOR_SIZES_KEY 1 +#define PRODUCER_COORDINATION_BUFFER_FOR_PAYLOADS_KEY 2 +#define CONSUMER_COORDINATION_BUFFER_FOR_SIZES_KEY 3 +#define CONSUMER_COORDINATION_BUFFER_FOR_PAYLOADS_KEY 4 +#define CONSUMER_PAYLOAD_KEY 5 +#define ELEMENT_TYPE unsigned int +#define CHANNEL_CAPACITY 5 + +using namespace std; +template +class Printer +{ + public: + + static void printBytes(const string prepend, void *buffer, size_t channelCapacity, size_t startIndex, size_t bytes) + { + if (bytes > channelCapacity) + { + std::cout << "Bytes larger than channel capacity, will not print\n"; + return; + } + + cout << "=====\n"; + cout << prepend; + if (buffer == NULL) + { + cerr << "buffer is NULL in print routine" << endl; + abort(); + } + if (startIndex + bytes <= channelCapacity) + { + vector v(static_cast(buffer) + startIndex / sizeof(T), static_cast(buffer) + (startIndex + bytes) / sizeof(ELEMENT_TYPE)); + copy(v.begin(), v.end(), ostream_iterator(cout, ",")); + } + else + { + vector v1(static_cast(buffer) + startIndex / sizeof(T), static_cast(buffer) + channelCapacity / sizeof(ELEMENT_TYPE)); + vector v2(static_cast(buffer), static_cast(buffer) + (bytes + startIndex - channelCapacity) / sizeof(ELEMENT_TYPE)); + copy(v1.begin(), v1.end(), ostream_iterator(cout, ",")); + copy(v2.begin(), v2.end(), ostream_iterator(cout, ",")); + } + cout << "\n=====\n"; + } +}; diff --git a/tests/frontends/channel/variableSize/spsc/source/consumer.hpp b/tests/frontends/channel/variableSize/spsc/source/consumer.hpp new file mode 100644 index 00000000..22bb4317 --- /dev/null +++ b/tests/frontends/channel/variableSize/spsc/source/consumer.hpp @@ -0,0 +1,206 @@ +/* + * Copyright 2025 Huawei Technologies Co., Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include +#include +#include "common.hpp" + +__INLINE__ std::shared_ptr peek(HiCR::channel::variableSize::SPSC::Consumer &consumerInterface, + HiCR::MemoryManager &memoryManager, + std::shared_ptr &memorySpace) +{ + // If the buffer is full, returning false + while (consumerInterface.isEmpty()) {} + + // Pushing buffer + auto result = consumerInterface.peek(); + + // Getting absolute pointer to the token + size_t tokenPos = result[0]; + size_t tokenSize = result[1]; + auto tokenBuffer = (uint8_t *)consumerInterface.getPayloadBufferMemorySlot()->getSourceLocalMemorySlot()->getPointer(); + void *tokenPtr = &tokenBuffer[tokenPos]; + + // Register and return the memory slot + return memoryManager.registerLocalMemorySlot(memorySpace, tokenPtr, tokenSize); +} + +void consumerFc(HiCR::MemoryManager &coordinationMemoryManager, + HiCR::MemoryManager &payloadMemoryManager, + HiCR::CommunicationManager &coordinationCommunicationManager, + HiCR::CommunicationManager &payloadCommunicationManager, + std::shared_ptr coordinationMemorySpace, + std::shared_ptr payloadMemorySpace, + const size_t channelCapacity) +{ + // Getting required buffer sizes + auto sizesBufferSize = HiCR::channel::variableSize::Base::getTokenBufferSize(sizeof(size_t), channelCapacity); + + // Allocating sizes buffer as a local memory slot + auto sizesBufferSlot = coordinationMemoryManager.allocateLocalMemorySlot(coordinationMemorySpace, sizesBufferSize); + + // Allocating payload buffer as a local memory slot + auto payloadBufferSlot = payloadMemoryManager.allocateLocalMemorySlot(payloadMemorySpace, 2 * CHANNEL_CAPACITY * sizeof(ELEMENT_TYPE)); + + // Getting required buffer size + auto coordinationBufferSize = HiCR::channel::variableSize::Base::getCoordinationBufferSize(); + + // Allocating coordination buffer for internal message size metadata + auto coordinationBufferForCounts = coordinationMemoryManager.allocateLocalMemorySlot(coordinationMemorySpace, coordinationBufferSize); + + // Allocating coordination buffer for internal payload metadata + auto coordinationBufferForPayloads = coordinationMemoryManager.allocateLocalMemorySlot(coordinationMemorySpace, coordinationBufferSize); + + // Initializing coordination buffer (sets to zero the counters) + HiCR::channel::variableSize::Base::initializeCoordinationBuffer(coordinationBufferForCounts); + + HiCR::channel::variableSize::Base::initializeCoordinationBuffer(coordinationBufferForPayloads); + + // Exchanging local memory slots to become global for them to be used by the remote end + coordinationCommunicationManager.exchangeGlobalMemorySlots(CHANNEL_TAG, + {{SIZES_BUFFER_KEY, sizesBufferSlot}, + {CONSUMER_COORDINATION_BUFFER_FOR_SIZES_KEY, coordinationBufferForCounts}, + {CONSUMER_COORDINATION_BUFFER_FOR_PAYLOADS_KEY, coordinationBufferForPayloads}}); + + payloadCommunicationManager.exchangeGlobalMemorySlots(CHANNEL_TAG, {{CONSUMER_PAYLOAD_KEY, payloadBufferSlot}}); + + // Synchronizing so that all actors have finished registering their global memory slots + coordinationCommunicationManager.fence(CHANNEL_TAG); + payloadCommunicationManager.fence(CHANNEL_TAG); + + // Obtaining the globally exchanged memory slots + std::shared_ptr globalSizesBufferSlot = coordinationCommunicationManager.getGlobalMemorySlot(CHANNEL_TAG, SIZES_BUFFER_KEY); + + auto producerCoordinationBufferForCounts = coordinationCommunicationManager.getGlobalMemorySlot(CHANNEL_TAG, PRODUCER_COORDINATION_BUFFER_FOR_SIZES_KEY); + auto producerCoordinationBufferForPayloads = coordinationCommunicationManager.getGlobalMemorySlot(CHANNEL_TAG, PRODUCER_COORDINATION_BUFFER_FOR_PAYLOADS_KEY); + auto consumerCoordinationBufferForCounts = coordinationCommunicationManager.getGlobalMemorySlot(CHANNEL_TAG, CONSUMER_COORDINATION_BUFFER_FOR_SIZES_KEY); + auto consumerCoordinationBufferForPayloads = coordinationCommunicationManager.getGlobalMemorySlot(CHANNEL_TAG, CONSUMER_COORDINATION_BUFFER_FOR_PAYLOADS_KEY); + auto payloadBuffer = payloadCommunicationManager.getGlobalMemorySlot(CHANNEL_TAG, CONSUMER_PAYLOAD_KEY); + + // Creating producer and consumer channels + auto consumer = HiCR::channel::variableSize::SPSC::Consumer(coordinationCommunicationManager, + payloadCommunicationManager, + payloadBuffer /*payload buffer */, + globalSizesBufferSlot, + coordinationBufferForCounts, + coordinationBufferForPayloads, + producerCoordinationBufferForCounts, + producerCoordinationBufferForPayloads, + CHANNEL_CAPACITY * sizeof(ELEMENT_TYPE), + channelCapacity); + + ASSERT_TRUE(consumer.isEmpty()); + ASSERT_EQ(consumer.getCoordinationDepth(), 0); + ASSERT_EQ(consumer.getPayloadDepth(), 0); + ASSERT_FALSE(consumer.isFull(CHANNEL_CAPACITY * sizeof(ELEMENT_TYPE))); + ASSERT_TRUE(consumer.isFull(CHANNEL_CAPACITY * sizeof(ELEMENT_TYPE) + 1)); + + // Wait for producer 1 + coordinationCommunicationManager.fence(CHANNEL_TAG); + payloadCommunicationManager.fence(CHANNEL_TAG); + + // Wait for the producer 2 + coordinationCommunicationManager.fence(CHANNEL_TAG); + payloadCommunicationManager.fence(CHANNEL_TAG); + + consumer.updateDepth(); + ASSERT_EQ(consumer.getCoordinationDepth(), 1); + ASSERT_EQ(consumer.getPayloadDepth(), CHANNEL_CAPACITY * sizeof(ELEMENT_TYPE)); + + // Peek + auto res = consumer.peek(); + ASSERT_EQ(res[0], 0); + ASSERT_EQ(res[1], CHANNEL_CAPACITY * sizeof(ELEMENT_TYPE)); + + auto tokenBuffer = (uint8_t *)consumer.getPayloadBufferMemorySlot()->getSourceLocalMemorySlot()->getPointer(); + void *tokenPtr = &tokenBuffer[res[0]]; + for (ELEMENT_TYPE i = 0; i < (res[1] / sizeof(ELEMENT_TYPE)); ++i) { ASSERT_EQ(i, static_cast(tokenPtr)[i]); } + + consumer.pop(); + ASSERT_TRUE(consumer.isEmpty()); + ASSERT_EQ(consumer.getCoordinationDepth(), 0); + ASSERT_EQ(consumer.getPayloadDepth(), 0); + + // Wait for the producer 3 + coordinationCommunicationManager.fence(CHANNEL_TAG); + payloadCommunicationManager.fence(CHANNEL_TAG); + + // Send token one by one + for (size_t i = 0; i < CHANNEL_CAPACITY; ++i) + { + coordinationCommunicationManager.fence(CHANNEL_TAG); + payloadCommunicationManager.fence(CHANNEL_TAG); + + ASSERT_EQ(consumer.getCoordinationDepth(), i + 1); + ASSERT_EQ(consumer.getPayloadDepth(), (i + 1) * sizeof(ELEMENT_TYPE)); + } + + ASSERT_TRUE(consumer.isFull(0)); + + // Wait for the producer 4 + coordinationCommunicationManager.fence(CHANNEL_TAG); + payloadCommunicationManager.fence(CHANNEL_TAG); + + // Pop token one by one + auto peekIndex = 0; + for (size_t i = CHANNEL_CAPACITY; i > 0; --i) + { + ASSERT_EQ(consumer.getCoordinationDepth(), i); + ASSERT_EQ(consumer.getPayloadDepth(), i * sizeof(ELEMENT_TYPE)); + auto res = consumer.peek(); + ASSERT_EQ(res[0], peekIndex++ * sizeof(ELEMENT_TYPE)); + ASSERT_EQ(res[1], sizeof(ELEMENT_TYPE)); + + auto tokenBuffer = (uint8_t *)consumer.getPayloadBufferMemorySlot()->getSourceLocalMemorySlot()->getPointer(); + void *tokenPtr = &tokenBuffer[res[0]]; + auto token = static_cast(tokenPtr)[0]; + for (ELEMENT_TYPE i = 0; i < CHANNEL_CAPACITY; ++i) { ASSERT_EQ(0, token); } + consumer.pop(); + coordinationCommunicationManager.fence(CHANNEL_TAG); + payloadCommunicationManager.fence(CHANNEL_TAG); + ASSERT_EQ(consumer.getCoordinationDepth(), i - 1); + ASSERT_EQ(consumer.getPayloadDepth(), (i - 1) * sizeof(ELEMENT_TYPE)); + } + + // Wait for the producer 5 + coordinationCommunicationManager.fence(CHANNEL_TAG); + payloadCommunicationManager.fence(CHANNEL_TAG); + + // De-registering global slots + coordinationCommunicationManager.deregisterGlobalMemorySlot(globalSizesBufferSlot); + coordinationCommunicationManager.deregisterGlobalMemorySlot(producerCoordinationBufferForCounts); + coordinationCommunicationManager.deregisterGlobalMemorySlot(producerCoordinationBufferForPayloads); + coordinationCommunicationManager.deregisterGlobalMemorySlot(consumerCoordinationBufferForCounts); + coordinationCommunicationManager.deregisterGlobalMemorySlot(consumerCoordinationBufferForPayloads); + + // Destroying global slots (collective calls) + coordinationCommunicationManager.destroyGlobalMemorySlot(consumerCoordinationBufferForCounts); + coordinationCommunicationManager.destroyGlobalMemorySlot(consumerCoordinationBufferForPayloads); + payloadCommunicationManager.destroyGlobalMemorySlot(payloadBuffer); + + coordinationCommunicationManager.fence(CHANNEL_TAG); + payloadCommunicationManager.fence(CHANNEL_TAG); + + // Freeing up local memory + payloadMemoryManager.freeLocalMemorySlot(payloadBufferSlot); + coordinationMemoryManager.freeLocalMemorySlot(sizesBufferSlot); + coordinationMemoryManager.freeLocalMemorySlot(coordinationBufferForCounts); + coordinationMemoryManager.freeLocalMemorySlot(coordinationBufferForPayloads); +} diff --git a/tests/frontends/channel/variableSize/spsc/source/fillBuffers.cpp b/tests/frontends/channel/variableSize/spsc/source/fillBuffers.cpp new file mode 100644 index 00000000..0383d336 --- /dev/null +++ b/tests/frontends/channel/variableSize/spsc/source/fillBuffers.cpp @@ -0,0 +1,60 @@ +#include +#include +#include + +#include +#include +#include +#include + +#include "channelFixture.hpp" +#include "producer.hpp" +#include "consumer.hpp" + +TEST_F(ChannelFixture, fillBufferCounter) +{ + // Getting MPI values + int rankCount = 0; + int rankId = 0; + MPI_Comm_rank(MPI_COMM_WORLD, &rankId); + MPI_Comm_size(MPI_COMM_WORLD, &rankCount); + + // Sanity Check + if (rankCount != 2) + { + if (rankId == 0) fprintf(stderr, "Launch error: MPI process count must be equal to 2\n"); + MPI_Finalize(); + } + + // Reading argument + size_t channelCapacity = CHANNEL_CAPACITY; + + // Instantiating backend + HiCR::backend::mpi::MemoryManager m; + HiCR::backend::mpi::CommunicationManager c(MPI_COMM_WORLD); + + // Creating HWloc topology object + hwloc_topology_t topology; + + // Reserving memory for hwloc + hwloc_topology_init(&topology); + + // Initializing host (CPU) topology manager + HiCR::backend::hwloc::TopologyManager dm(&topology); + + // Asking backend to check the available devices + const auto t = dm.queryTopology(); + + // Getting first device found + auto d = *t.getDevices().begin(); + + // Obtaining memory spaces + auto memSpaces = d->getMemorySpaceList(); + + // Getting a reference to the first memory space + auto firstMemorySpace = *memSpaces.begin(); + + // Rank 0 is producer, Rank 1 is consumer + if (rankId == 0) producerFc(m, m, c, c, firstMemorySpace, firstMemorySpace, channelCapacity); + if (rankId == 1) consumerFc(m, m, c, c, firstMemorySpace, firstMemorySpace, channelCapacity); +} diff --git a/tests/frontends/channel/variableSize/spsc/source/meson.build b/tests/frontends/channel/variableSize/spsc/source/meson.build new file mode 100644 index 00000000..c91ae60d --- /dev/null +++ b/tests/frontends/channel/variableSize/spsc/source/meson.build @@ -0,0 +1,21 @@ +testSuite = ['tests', 'channels', 'variableSize', 'spsc', 'distributed'] +test_timeout = 60 + +if 'mpi' in enabledBackends and 'hwloc' in enabledBackends + mpi = executable( + 'fillBuffers', + ['fillBuffers.cpp'], + dependencies: channelsTestMPIDep, + include_directories: [exampleBuildIncludes], + ) + + if get_option('buildTests') + test( + 'fillBuffers', + mpirunExecutable, + args: ['-np', '2', '--oversubscribe', mpi.full_path()], + timeout: test_timeout, + suite: testSuite, + ) + endif +endif \ No newline at end of file diff --git a/tests/frontends/channel/variableSize/spsc/source/producer.hpp b/tests/frontends/channel/variableSize/spsc/source/producer.hpp new file mode 100644 index 00000000..fbf770c4 --- /dev/null +++ b/tests/frontends/channel/variableSize/spsc/source/producer.hpp @@ -0,0 +1,162 @@ +/* + * Copyright 2025 Huawei Technologies Co., Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include +#include + +#include "common.hpp" + +void producerFc(HiCR::MemoryManager &coordinationMemoryManager, + HiCR::MemoryManager &payloadMemoryManager, + HiCR::CommunicationManager &coordinationCommunicationManager, + HiCR::CommunicationManager &payloadCommunicationManager, + std::shared_ptr coordinationMemorySpace, + std::shared_ptr payloadMemorySpace, + const size_t channelCapacity) +{ + // Getting required buffer size + auto coordinationBufferSize = HiCR::channel::variableSize::Base::getCoordinationBufferSize(); + + // Allocating sizes buffer as a local memory slot + auto coordinationBufferForCounts = coordinationMemoryManager.allocateLocalMemorySlot(coordinationMemorySpace, coordinationBufferSize); + + auto coordinationBufferForPayloads = coordinationMemoryManager.allocateLocalMemorySlot(coordinationMemorySpace, coordinationBufferSize); + + auto sizeInfoBuffer = coordinationMemoryManager.allocateLocalMemorySlot(coordinationMemorySpace, sizeof(size_t)); + + // Initializing coordination buffers for message sizes and payloads (sets to zero the counters) + HiCR::channel::variableSize::Base::initializeCoordinationBuffer(coordinationBufferForCounts); + HiCR::channel::variableSize::Base::initializeCoordinationBuffer(coordinationBufferForPayloads); + + // Exchanging local memory slots to become global for them to be used by the remote end + coordinationCommunicationManager.exchangeGlobalMemorySlots(CHANNEL_TAG, /* global tag */ + {{PRODUCER_COORDINATION_BUFFER_FOR_SIZES_KEY, coordinationBufferForCounts}, /* key-slot pairs */ + {PRODUCER_COORDINATION_BUFFER_FOR_PAYLOADS_KEY, coordinationBufferForPayloads}}); + + payloadCommunicationManager.exchangeGlobalMemorySlots(CHANNEL_TAG, {}); + + // Synchronizing so that all actors have finished registering their global memory slots + coordinationCommunicationManager.fence(CHANNEL_TAG); + payloadCommunicationManager.fence(CHANNEL_TAG); + + // Obtaining the globally exchanged memory slots + auto sizesBuffer = coordinationCommunicationManager.getGlobalMemorySlot(CHANNEL_TAG, SIZES_BUFFER_KEY); + auto producerCoordinationBufferForCounts = coordinationCommunicationManager.getGlobalMemorySlot(CHANNEL_TAG, PRODUCER_COORDINATION_BUFFER_FOR_SIZES_KEY); + auto producerCoordinationBufferForPayloads = coordinationCommunicationManager.getGlobalMemorySlot(CHANNEL_TAG, PRODUCER_COORDINATION_BUFFER_FOR_PAYLOADS_KEY); + auto consumerCoordinationBufferForCounts = coordinationCommunicationManager.getGlobalMemorySlot(CHANNEL_TAG, CONSUMER_COORDINATION_BUFFER_FOR_SIZES_KEY); + auto consumerCoordinationBufferForPayloads = coordinationCommunicationManager.getGlobalMemorySlot(CHANNEL_TAG, CONSUMER_COORDINATION_BUFFER_FOR_PAYLOADS_KEY); + auto payloadBuffer = payloadCommunicationManager.getGlobalMemorySlot(CHANNEL_TAG, CONSUMER_PAYLOAD_KEY); + + // Creating producer and consumer channels + auto producer = HiCR::channel::variableSize::SPSC::Producer(coordinationCommunicationManager, + payloadCommunicationManager, + sizeInfoBuffer, + payloadBuffer, + sizesBuffer, + coordinationBufferForCounts, + coordinationBufferForPayloads, + consumerCoordinationBufferForCounts, + consumerCoordinationBufferForPayloads, + CHANNEL_CAPACITY * sizeof(ELEMENT_TYPE), + sizeof(ELEMENT_TYPE), + channelCapacity); + + ////////////////////// Test begin + + // Send a buffer big as the buffer channel + ELEMENT_TYPE sendBuffer[CHANNEL_CAPACITY] = {0, 1, 2, 3, 4}; + ELEMENT_TYPE sendBuffer2[1] = {0}; + auto sendBufferPtr = &sendBuffer; + auto sendBuffer2Ptr = &sendBuffer; + auto sendSlot = payloadMemoryManager.registerLocalMemorySlot(payloadMemorySpace, sendBufferPtr, sizeof(sendBuffer)); + auto sendSlot2 = payloadMemoryManager.registerLocalMemorySlot(payloadMemorySpace, sendBuffer2Ptr, sizeof(sendBuffer2)); + + // Wait for the consumer 1 + coordinationCommunicationManager.fence(CHANNEL_TAG); + payloadCommunicationManager.fence(CHANNEL_TAG); + + // Pushing first batch that fills the channel + ASSERT_EQ(producer.getPayloadCapacity(), CHANNEL_CAPACITY * sizeof(ELEMENT_TYPE)); + ASSERT_TRUE(producer.isEmpty()); + ASSERT_FALSE(producer.isFull(0)); + producer.updateDepth(); + ASSERT_EQ(producer.getCoordinationDepth(), 0); + ASSERT_EQ(producer.getPayloadDepth(), 0); + EXPECT_NO_THROW(producer.push(sendSlot)); + ASSERT_TRUE(producer.isFull(0)); + producer.updateDepth(); + ASSERT_EQ(producer.getCoordinationDepth(), 1); + ASSERT_EQ(producer.getPayloadDepth(), CHANNEL_CAPACITY * sizeof(ELEMENT_TYPE)); + EXPECT_THROW(producer.push(sendSlot2), HiCR::RuntimeException); + + // Wait for the consumer 2 + coordinationCommunicationManager.fence(CHANNEL_TAG); + payloadCommunicationManager.fence(CHANNEL_TAG); + + // Wait for the consumer 3 + coordinationCommunicationManager.fence(CHANNEL_TAG); + payloadCommunicationManager.fence(CHANNEL_TAG); + + // Send token one by one + for (size_t i = 0; i < CHANNEL_CAPACITY; ++i) + { + ELEMENT_TYPE sendBuffer[1] = {0}; + auto sendBufferPtr = &sendBuffer; + auto sendSlot = payloadMemoryManager.registerLocalMemorySlot(payloadMemorySpace, sendBufferPtr, sizeof(sendBuffer)); + ASSERT_EQ(producer.getCoordinationDepth(), i); + ASSERT_EQ(producer.getPayloadDepth(), i * sizeof(ELEMENT_TYPE)); + ASSERT_NO_THROW(producer.push(sendSlot)); + coordinationCommunicationManager.fence(CHANNEL_TAG); + payloadCommunicationManager.fence(CHANNEL_TAG); + ASSERT_EQ(producer.getCoordinationDepth(), i + 1); + ASSERT_EQ(producer.getPayloadDepth(), (i + 1) * sizeof(ELEMENT_TYPE)); + } + + ASSERT_TRUE(producer.isFull(0)); + + // Wait for the consumer 4 + coordinationCommunicationManager.fence(CHANNEL_TAG); + payloadCommunicationManager.fence(CHANNEL_TAG); + + for (size_t i = CHANNEL_CAPACITY; i > 0; --i) + { + coordinationCommunicationManager.fence(CHANNEL_TAG); + payloadCommunicationManager.fence(CHANNEL_TAG); + ASSERT_EQ(producer.getCoordinationDepth(), i - 1); + ASSERT_EQ(producer.getPayloadDepth(), (i - 1) * sizeof(ELEMENT_TYPE)); + } + + // Wait for the consumer 5 + coordinationCommunicationManager.fence(CHANNEL_TAG); + payloadCommunicationManager.fence(CHANNEL_TAG); + + // Destroying global slots (collective calls) + coordinationCommunicationManager.destroyGlobalMemorySlot(sizesBuffer); + coordinationCommunicationManager.destroyGlobalMemorySlot(producerCoordinationBufferForCounts); + coordinationCommunicationManager.destroyGlobalMemorySlot(producerCoordinationBufferForPayloads); + + coordinationCommunicationManager.fence(CHANNEL_TAG); + payloadCommunicationManager.fence(CHANNEL_TAG); + + // Freeing up local memory + coordinationMemoryManager.freeLocalMemorySlot(coordinationBufferForCounts); + coordinationMemoryManager.freeLocalMemorySlot(coordinationBufferForPayloads); + coordinationMemoryManager.freeLocalMemorySlot(sizeInfoBuffer); +} From 7275e1352dae562c307a5a98b1d286a9cb90268d Mon Sep 17 00:00:00 2001 From: Luca Terracciano Date: Wed, 8 Oct 2025 10:46:06 +0200 Subject: [PATCH 2/5] tests: refactor variable size spsc test --- .../spsc/include/channelFixture.hpp | 216 ++++++++++++++++-- .../variableSize/spsc/source/consumer.hpp | 87 +------ .../variableSize/spsc/source/fillBuffers.cpp | 45 +--- .../variableSize/spsc/source/producer.hpp | 72 +----- 4 files changed, 211 insertions(+), 209 deletions(-) diff --git a/tests/frontends/channel/variableSize/spsc/include/channelFixture.hpp b/tests/frontends/channel/variableSize/spsc/include/channelFixture.hpp index 65c12109..23ddc5e0 100644 --- a/tests/frontends/channel/variableSize/spsc/include/channelFixture.hpp +++ b/tests/frontends/channel/variableSize/spsc/include/channelFixture.hpp @@ -1,38 +1,224 @@ #pragma once #include -#include #include #include #include #include #include -#include +#include +#include + +#include "common.hpp" class ChannelFixture : public ::testing::Test { public: - ChannelFixture() - : ::testing::Test() + std::unique_ptr createProducer(HiCR::MemoryManager &coordinationMemoryManager, + HiCR::MemoryManager &payloadMemoryManager, + HiCR::CommunicationManager &coordinationCommunicationManager, + HiCR::CommunicationManager &payloadCommunicationManager, + std::shared_ptr coordinationMemorySpace, + std::shared_ptr payloadMemorySpace, + const size_t channelCapacity) + { + // Getting required buffer size + auto coordinationBufferSize = HiCR::channel::variableSize::Base::getCoordinationBufferSize(); + + // Allocating sizes buffer as a local memory slot + auto coordinationBufferForCounts = coordinationMemoryManager.allocateLocalMemorySlot(coordinationMemorySpace, coordinationBufferSize); + + auto coordinationBufferForPayloads = coordinationMemoryManager.allocateLocalMemorySlot(coordinationMemorySpace, coordinationBufferSize); + + auto sizeInfoBuffer = coordinationMemoryManager.allocateLocalMemorySlot(coordinationMemorySpace, sizeof(size_t)); + + // Initializing coordination buffers for message sizes and payloads (sets to zero the counters) + HiCR::channel::variableSize::Base::initializeCoordinationBuffer(coordinationBufferForCounts); + HiCR::channel::variableSize::Base::initializeCoordinationBuffer(coordinationBufferForPayloads); + + // Exchanging local memory slots to become global for them to be used by the remote end + coordinationCommunicationManager.exchangeGlobalMemorySlots(CHANNEL_TAG, /* global tag */ + {{PRODUCER_COORDINATION_BUFFER_FOR_SIZES_KEY, coordinationBufferForCounts}, /* key-slot pairs */ + {PRODUCER_COORDINATION_BUFFER_FOR_PAYLOADS_KEY, coordinationBufferForPayloads}}); + + payloadCommunicationManager.exchangeGlobalMemorySlots(CHANNEL_TAG, {}); + + // Synchronizing so that all actors have finished registering their global memory slots + coordinationCommunicationManager.fence(CHANNEL_TAG); + payloadCommunicationManager.fence(CHANNEL_TAG); + + // Obtaining the globally exchanged memory slots + auto sizesBuffer = coordinationCommunicationManager.getGlobalMemorySlot(CHANNEL_TAG, SIZES_BUFFER_KEY); + auto producerCoordinationBufferForCounts = coordinationCommunicationManager.getGlobalMemorySlot(CHANNEL_TAG, PRODUCER_COORDINATION_BUFFER_FOR_SIZES_KEY); + auto producerCoordinationBufferForPayloads = coordinationCommunicationManager.getGlobalMemorySlot(CHANNEL_TAG, PRODUCER_COORDINATION_BUFFER_FOR_PAYLOADS_KEY); + auto consumerCoordinationBufferForCounts = coordinationCommunicationManager.getGlobalMemorySlot(CHANNEL_TAG, CONSUMER_COORDINATION_BUFFER_FOR_SIZES_KEY); + auto consumerCoordinationBufferForPayloads = coordinationCommunicationManager.getGlobalMemorySlot(CHANNEL_TAG, CONSUMER_COORDINATION_BUFFER_FOR_PAYLOADS_KEY); + auto payloadBuffer = payloadCommunicationManager.getGlobalMemorySlot(CHANNEL_TAG, CONSUMER_PAYLOAD_KEY); + + _localSlots.insert(coordinationBufferForCounts); + _localSlots.insert(coordinationBufferForPayloads); + _localSlots.insert(sizeInfoBuffer); + + _globalSlotsToDestroy.insert(sizesBuffer); + _globalSlotsToDestroy.insert(producerCoordinationBufferForCounts); + _globalSlotsToDestroy.insert(producerCoordinationBufferForPayloads); + + _globalSlots.insert(sizesBuffer); + _globalSlots.insert(producerCoordinationBufferForCounts); + _globalSlots.insert(producerCoordinationBufferForPayloads); + _globalSlots.insert(consumerCoordinationBufferForCounts); + _globalSlots.insert(consumerCoordinationBufferForPayloads); + _globalSlots.insert(payloadBuffer); + + // Creating producer and consumer channels + return std::make_unique(coordinationCommunicationManager, + payloadCommunicationManager, + sizeInfoBuffer, + payloadBuffer, + sizesBuffer, + coordinationBufferForCounts, + coordinationBufferForPayloads, + consumerCoordinationBufferForCounts, + consumerCoordinationBufferForPayloads, + CHANNEL_CAPACITY * sizeof(ELEMENT_TYPE), + sizeof(ELEMENT_TYPE), + channelCapacity); + } + + std::unique_ptr createConsumer(HiCR::MemoryManager &coordinationMemoryManager, + HiCR::MemoryManager &payloadMemoryManager, + HiCR::CommunicationManager &coordinationCommunicationManager, + HiCR::CommunicationManager &payloadCommunicationManager, + std::shared_ptr coordinationMemorySpace, + std::shared_ptr payloadMemorySpace, + const size_t channelCapacity) + { + // Getting required buffer sizes + auto sizesBufferSize = HiCR::channel::variableSize::Base::getTokenBufferSize(sizeof(size_t), channelCapacity); + + // Allocating sizes buffer as a local memory slot + auto sizesBufferSlot = coordinationMemoryManager.allocateLocalMemorySlot(coordinationMemorySpace, sizesBufferSize); + + // Allocating payload buffer as a local memory slot + auto payloadBufferSlot = payloadMemoryManager.allocateLocalMemorySlot(payloadMemorySpace, 2 * CHANNEL_CAPACITY * sizeof(ELEMENT_TYPE)); + + // Getting required buffer size + auto coordinationBufferSize = HiCR::channel::variableSize::Base::getCoordinationBufferSize(); + + // Allocating coordination buffer for internal message size metadata + auto coordinationBufferForCounts = coordinationMemoryManager.allocateLocalMemorySlot(coordinationMemorySpace, coordinationBufferSize); + + // Allocating coordination buffer for internal payload metadata + auto coordinationBufferForPayloads = coordinationMemoryManager.allocateLocalMemorySlot(coordinationMemorySpace, coordinationBufferSize); + + // Initializing coordination buffer (sets to zero the counters) + HiCR::channel::variableSize::Base::initializeCoordinationBuffer(coordinationBufferForCounts); + + HiCR::channel::variableSize::Base::initializeCoordinationBuffer(coordinationBufferForPayloads); + + // Exchanging local memory slots to become global for them to be used by the remote end + coordinationCommunicationManager.exchangeGlobalMemorySlots(CHANNEL_TAG, + {{SIZES_BUFFER_KEY, sizesBufferSlot}, + {CONSUMER_COORDINATION_BUFFER_FOR_SIZES_KEY, coordinationBufferForCounts}, + {CONSUMER_COORDINATION_BUFFER_FOR_PAYLOADS_KEY, coordinationBufferForPayloads}}); + + payloadCommunicationManager.exchangeGlobalMemorySlots(CHANNEL_TAG, {{CONSUMER_PAYLOAD_KEY, payloadBufferSlot}}); + + // Synchronizing so that all actors have finished registering their global memory slots + coordinationCommunicationManager.fence(CHANNEL_TAG); + payloadCommunicationManager.fence(CHANNEL_TAG); + + // Obtaining the globally exchanged memory slots + std::shared_ptr globalSizesBufferSlot = coordinationCommunicationManager.getGlobalMemorySlot(CHANNEL_TAG, SIZES_BUFFER_KEY); + + auto producerCoordinationBufferForCounts = coordinationCommunicationManager.getGlobalMemorySlot(CHANNEL_TAG, PRODUCER_COORDINATION_BUFFER_FOR_SIZES_KEY); + auto producerCoordinationBufferForPayloads = coordinationCommunicationManager.getGlobalMemorySlot(CHANNEL_TAG, PRODUCER_COORDINATION_BUFFER_FOR_PAYLOADS_KEY); + auto consumerCoordinationBufferForCounts = coordinationCommunicationManager.getGlobalMemorySlot(CHANNEL_TAG, CONSUMER_COORDINATION_BUFFER_FOR_SIZES_KEY); + auto consumerCoordinationBufferForPayloads = coordinationCommunicationManager.getGlobalMemorySlot(CHANNEL_TAG, CONSUMER_COORDINATION_BUFFER_FOR_PAYLOADS_KEY); + auto payloadBuffer = payloadCommunicationManager.getGlobalMemorySlot(CHANNEL_TAG, CONSUMER_PAYLOAD_KEY); + + _localSlots.insert(sizesBufferSlot); + _localSlots.insert(payloadBufferSlot); + _localSlots.insert(coordinationBufferForCounts); + _localSlots.insert(coordinationBufferForPayloads); + + _globalSlotsToDestroy.insert(consumerCoordinationBufferForCounts); + _globalSlotsToDestroy.insert(consumerCoordinationBufferForPayloads); + _globalSlotsToDestroy.insert(payloadBuffer); + + _globalSlots.insert(globalSizesBufferSlot); + _globalSlots.insert(producerCoordinationBufferForCounts); + _globalSlots.insert(producerCoordinationBufferForPayloads); + _globalSlots.insert(consumerCoordinationBufferForCounts); + _globalSlots.insert(consumerCoordinationBufferForPayloads); + + // Creating producer and consumer channels + return std::make_unique(coordinationCommunicationManager, + payloadCommunicationManager, + payloadBuffer /*payload buffer */, + globalSizesBufferSlot, + coordinationBufferForCounts, + coordinationBufferForPayloads, + producerCoordinationBufferForCounts, + producerCoordinationBufferForPayloads, + CHANNEL_CAPACITY * sizeof(ELEMENT_TYPE), + channelCapacity); + } + + std::unique_ptr _communicationManager; + std::unique_ptr _instanceManager; + std::unique_ptr _memoryManager; + std::unique_ptr _topologyManager; + std::unique_ptr _computeManager; + + std::unique_ptr _consumer; + std::unique_ptr _producer; + + std::shared_ptr _memorySpace; + + protected: + + void SetUp() override { - _mpiInstanceManager = std::make_unique(MPI_COMM_WORLD); - _mpiCommunicationManager = std::make_unique(MPI_COMM_WORLD); - _mpiMemoryManager = std::make_unique(); - _pthreadsComputeManager = std::make_unique(); + _instanceManager = std::make_unique(MPI_COMM_WORLD); - _hwlocTopologyManager = HiCR::backend::hwloc::TopologyManager::createDefault(); + // Sanity Check + if (_instanceManager->getInstances().size() != 2) + { + if (_instanceManager->getCurrentInstance()->isRootInstance()) fprintf(stderr, "Launch error: MPI process count must be equal to 2\n"); + MPI_Finalize(); + } - _topology = _hwlocTopologyManager->queryTopology(); + _communicationManager = std::make_unique(MPI_COMM_WORLD); + _memoryManager = std::make_unique(); + _computeManager = std::make_unique(); + _topologyManager = HiCR::backend::hwloc::TopologyManager::createDefault(); + + _topology = _topologyManager->queryTopology(); + _memorySpace = _topology.getDevices().begin().operator*()->getMemorySpaceList().begin().operator*(); + + if (_instanceManager->getCurrentInstance()->isRootInstance()) + { + _producer = createProducer(*_memoryManager, *_memoryManager, *_communicationManager, *_communicationManager, _memorySpace, _memorySpace, CHANNEL_CAPACITY); + } + else { _consumer = createConsumer(*_memoryManager, *_memoryManager, *_communicationManager, *_communicationManager, _memorySpace, _memorySpace, CHANNEL_CAPACITY); } } - std::unique_ptr _mpiCommunicationManager; - std::unique_ptr _mpiInstanceManager; - std::unique_ptr _mpiMemoryManager; - std::unique_ptr _hwlocTopologyManager; + void TearDown() override + { + for (auto &g : _globalSlots) { _communicationManager->deregisterGlobalMemorySlot(g); } + for (auto &g : _globalSlotsToDestroy) { _communicationManager->destroyGlobalMemorySlot(g); } + _communicationManager->fence(CHANNEL_TAG); + for (auto &l : _localSlots) { _memoryManager->freeLocalMemorySlot(l); } + } - std::unique_ptr _pthreadsComputeManager; + private: HiCR::Topology _topology; + + std::unordered_set> _globalSlots; + std::unordered_set> _globalSlotsToDestroy; + std::unordered_set> _localSlots; }; \ No newline at end of file diff --git a/tests/frontends/channel/variableSize/spsc/source/consumer.hpp b/tests/frontends/channel/variableSize/spsc/source/consumer.hpp index 22bb4317..b21544ca 100644 --- a/tests/frontends/channel/variableSize/spsc/source/consumer.hpp +++ b/tests/frontends/channel/variableSize/spsc/source/consumer.hpp @@ -42,70 +42,10 @@ __INLINE__ std::shared_ptr peek(HiCR::channel::variableSi return memoryManager.registerLocalMemorySlot(memorySpace, tokenPtr, tokenSize); } -void consumerFc(HiCR::MemoryManager &coordinationMemoryManager, - HiCR::MemoryManager &payloadMemoryManager, - HiCR::CommunicationManager &coordinationCommunicationManager, - HiCR::CommunicationManager &payloadCommunicationManager, - std::shared_ptr coordinationMemorySpace, - std::shared_ptr payloadMemorySpace, - const size_t channelCapacity) +void consumerFc(HiCR::CommunicationManager &coordinationCommunicationManager, + HiCR::CommunicationManager &payloadCommunicationManager, + HiCR::channel::variableSize::SPSC::Consumer &consumer) { - // Getting required buffer sizes - auto sizesBufferSize = HiCR::channel::variableSize::Base::getTokenBufferSize(sizeof(size_t), channelCapacity); - - // Allocating sizes buffer as a local memory slot - auto sizesBufferSlot = coordinationMemoryManager.allocateLocalMemorySlot(coordinationMemorySpace, sizesBufferSize); - - // Allocating payload buffer as a local memory slot - auto payloadBufferSlot = payloadMemoryManager.allocateLocalMemorySlot(payloadMemorySpace, 2 * CHANNEL_CAPACITY * sizeof(ELEMENT_TYPE)); - - // Getting required buffer size - auto coordinationBufferSize = HiCR::channel::variableSize::Base::getCoordinationBufferSize(); - - // Allocating coordination buffer for internal message size metadata - auto coordinationBufferForCounts = coordinationMemoryManager.allocateLocalMemorySlot(coordinationMemorySpace, coordinationBufferSize); - - // Allocating coordination buffer for internal payload metadata - auto coordinationBufferForPayloads = coordinationMemoryManager.allocateLocalMemorySlot(coordinationMemorySpace, coordinationBufferSize); - - // Initializing coordination buffer (sets to zero the counters) - HiCR::channel::variableSize::Base::initializeCoordinationBuffer(coordinationBufferForCounts); - - HiCR::channel::variableSize::Base::initializeCoordinationBuffer(coordinationBufferForPayloads); - - // Exchanging local memory slots to become global for them to be used by the remote end - coordinationCommunicationManager.exchangeGlobalMemorySlots(CHANNEL_TAG, - {{SIZES_BUFFER_KEY, sizesBufferSlot}, - {CONSUMER_COORDINATION_BUFFER_FOR_SIZES_KEY, coordinationBufferForCounts}, - {CONSUMER_COORDINATION_BUFFER_FOR_PAYLOADS_KEY, coordinationBufferForPayloads}}); - - payloadCommunicationManager.exchangeGlobalMemorySlots(CHANNEL_TAG, {{CONSUMER_PAYLOAD_KEY, payloadBufferSlot}}); - - // Synchronizing so that all actors have finished registering their global memory slots - coordinationCommunicationManager.fence(CHANNEL_TAG); - payloadCommunicationManager.fence(CHANNEL_TAG); - - // Obtaining the globally exchanged memory slots - std::shared_ptr globalSizesBufferSlot = coordinationCommunicationManager.getGlobalMemorySlot(CHANNEL_TAG, SIZES_BUFFER_KEY); - - auto producerCoordinationBufferForCounts = coordinationCommunicationManager.getGlobalMemorySlot(CHANNEL_TAG, PRODUCER_COORDINATION_BUFFER_FOR_SIZES_KEY); - auto producerCoordinationBufferForPayloads = coordinationCommunicationManager.getGlobalMemorySlot(CHANNEL_TAG, PRODUCER_COORDINATION_BUFFER_FOR_PAYLOADS_KEY); - auto consumerCoordinationBufferForCounts = coordinationCommunicationManager.getGlobalMemorySlot(CHANNEL_TAG, CONSUMER_COORDINATION_BUFFER_FOR_SIZES_KEY); - auto consumerCoordinationBufferForPayloads = coordinationCommunicationManager.getGlobalMemorySlot(CHANNEL_TAG, CONSUMER_COORDINATION_BUFFER_FOR_PAYLOADS_KEY); - auto payloadBuffer = payloadCommunicationManager.getGlobalMemorySlot(CHANNEL_TAG, CONSUMER_PAYLOAD_KEY); - - // Creating producer and consumer channels - auto consumer = HiCR::channel::variableSize::SPSC::Consumer(coordinationCommunicationManager, - payloadCommunicationManager, - payloadBuffer /*payload buffer */, - globalSizesBufferSlot, - coordinationBufferForCounts, - coordinationBufferForPayloads, - producerCoordinationBufferForCounts, - producerCoordinationBufferForPayloads, - CHANNEL_CAPACITY * sizeof(ELEMENT_TYPE), - channelCapacity); - ASSERT_TRUE(consumer.isEmpty()); ASSERT_EQ(consumer.getCoordinationDepth(), 0); ASSERT_EQ(consumer.getPayloadDepth(), 0); @@ -182,25 +122,4 @@ void consumerFc(HiCR::MemoryManager &coordinationMemoryManager, // Wait for the producer 5 coordinationCommunicationManager.fence(CHANNEL_TAG); payloadCommunicationManager.fence(CHANNEL_TAG); - - // De-registering global slots - coordinationCommunicationManager.deregisterGlobalMemorySlot(globalSizesBufferSlot); - coordinationCommunicationManager.deregisterGlobalMemorySlot(producerCoordinationBufferForCounts); - coordinationCommunicationManager.deregisterGlobalMemorySlot(producerCoordinationBufferForPayloads); - coordinationCommunicationManager.deregisterGlobalMemorySlot(consumerCoordinationBufferForCounts); - coordinationCommunicationManager.deregisterGlobalMemorySlot(consumerCoordinationBufferForPayloads); - - // Destroying global slots (collective calls) - coordinationCommunicationManager.destroyGlobalMemorySlot(consumerCoordinationBufferForCounts); - coordinationCommunicationManager.destroyGlobalMemorySlot(consumerCoordinationBufferForPayloads); - payloadCommunicationManager.destroyGlobalMemorySlot(payloadBuffer); - - coordinationCommunicationManager.fence(CHANNEL_TAG); - payloadCommunicationManager.fence(CHANNEL_TAG); - - // Freeing up local memory - payloadMemoryManager.freeLocalMemorySlot(payloadBufferSlot); - coordinationMemoryManager.freeLocalMemorySlot(sizesBufferSlot); - coordinationMemoryManager.freeLocalMemorySlot(coordinationBufferForCounts); - coordinationMemoryManager.freeLocalMemorySlot(coordinationBufferForPayloads); } diff --git a/tests/frontends/channel/variableSize/spsc/source/fillBuffers.cpp b/tests/frontends/channel/variableSize/spsc/source/fillBuffers.cpp index 0383d336..42cae0ea 100644 --- a/tests/frontends/channel/variableSize/spsc/source/fillBuffers.cpp +++ b/tests/frontends/channel/variableSize/spsc/source/fillBuffers.cpp @@ -13,48 +13,7 @@ TEST_F(ChannelFixture, fillBufferCounter) { - // Getting MPI values - int rankCount = 0; - int rankId = 0; - MPI_Comm_rank(MPI_COMM_WORLD, &rankId); - MPI_Comm_size(MPI_COMM_WORLD, &rankCount); - - // Sanity Check - if (rankCount != 2) - { - if (rankId == 0) fprintf(stderr, "Launch error: MPI process count must be equal to 2\n"); - MPI_Finalize(); - } - - // Reading argument - size_t channelCapacity = CHANNEL_CAPACITY; - - // Instantiating backend - HiCR::backend::mpi::MemoryManager m; - HiCR::backend::mpi::CommunicationManager c(MPI_COMM_WORLD); - - // Creating HWloc topology object - hwloc_topology_t topology; - - // Reserving memory for hwloc - hwloc_topology_init(&topology); - - // Initializing host (CPU) topology manager - HiCR::backend::hwloc::TopologyManager dm(&topology); - - // Asking backend to check the available devices - const auto t = dm.queryTopology(); - - // Getting first device found - auto d = *t.getDevices().begin(); - - // Obtaining memory spaces - auto memSpaces = d->getMemorySpaceList(); - - // Getting a reference to the first memory space - auto firstMemorySpace = *memSpaces.begin(); - // Rank 0 is producer, Rank 1 is consumer - if (rankId == 0) producerFc(m, m, c, c, firstMemorySpace, firstMemorySpace, channelCapacity); - if (rankId == 1) consumerFc(m, m, c, c, firstMemorySpace, firstMemorySpace, channelCapacity); + if (_instanceManager->getCurrentInstance()->isRootInstance()) { producerFc(*_memoryManager, *_communicationManager, *_communicationManager, _memorySpace, *_producer); } + else { consumerFc(*_communicationManager, *_communicationManager, *_consumer); } } diff --git a/tests/frontends/channel/variableSize/spsc/source/producer.hpp b/tests/frontends/channel/variableSize/spsc/source/producer.hpp index fbf770c4..edd70172 100644 --- a/tests/frontends/channel/variableSize/spsc/source/producer.hpp +++ b/tests/frontends/channel/variableSize/spsc/source/producer.hpp @@ -23,61 +23,12 @@ #include "common.hpp" -void producerFc(HiCR::MemoryManager &coordinationMemoryManager, - HiCR::MemoryManager &payloadMemoryManager, - HiCR::CommunicationManager &coordinationCommunicationManager, - HiCR::CommunicationManager &payloadCommunicationManager, - std::shared_ptr coordinationMemorySpace, - std::shared_ptr payloadMemorySpace, - const size_t channelCapacity) +void producerFc(HiCR::MemoryManager &payloadMemoryManager, + HiCR::CommunicationManager &coordinationCommunicationManager, + HiCR::CommunicationManager &payloadCommunicationManager, + std::shared_ptr payloadMemorySpace, + HiCR::channel::variableSize::SPSC::Producer &producer) { - // Getting required buffer size - auto coordinationBufferSize = HiCR::channel::variableSize::Base::getCoordinationBufferSize(); - - // Allocating sizes buffer as a local memory slot - auto coordinationBufferForCounts = coordinationMemoryManager.allocateLocalMemorySlot(coordinationMemorySpace, coordinationBufferSize); - - auto coordinationBufferForPayloads = coordinationMemoryManager.allocateLocalMemorySlot(coordinationMemorySpace, coordinationBufferSize); - - auto sizeInfoBuffer = coordinationMemoryManager.allocateLocalMemorySlot(coordinationMemorySpace, sizeof(size_t)); - - // Initializing coordination buffers for message sizes and payloads (sets to zero the counters) - HiCR::channel::variableSize::Base::initializeCoordinationBuffer(coordinationBufferForCounts); - HiCR::channel::variableSize::Base::initializeCoordinationBuffer(coordinationBufferForPayloads); - - // Exchanging local memory slots to become global for them to be used by the remote end - coordinationCommunicationManager.exchangeGlobalMemorySlots(CHANNEL_TAG, /* global tag */ - {{PRODUCER_COORDINATION_BUFFER_FOR_SIZES_KEY, coordinationBufferForCounts}, /* key-slot pairs */ - {PRODUCER_COORDINATION_BUFFER_FOR_PAYLOADS_KEY, coordinationBufferForPayloads}}); - - payloadCommunicationManager.exchangeGlobalMemorySlots(CHANNEL_TAG, {}); - - // Synchronizing so that all actors have finished registering their global memory slots - coordinationCommunicationManager.fence(CHANNEL_TAG); - payloadCommunicationManager.fence(CHANNEL_TAG); - - // Obtaining the globally exchanged memory slots - auto sizesBuffer = coordinationCommunicationManager.getGlobalMemorySlot(CHANNEL_TAG, SIZES_BUFFER_KEY); - auto producerCoordinationBufferForCounts = coordinationCommunicationManager.getGlobalMemorySlot(CHANNEL_TAG, PRODUCER_COORDINATION_BUFFER_FOR_SIZES_KEY); - auto producerCoordinationBufferForPayloads = coordinationCommunicationManager.getGlobalMemorySlot(CHANNEL_TAG, PRODUCER_COORDINATION_BUFFER_FOR_PAYLOADS_KEY); - auto consumerCoordinationBufferForCounts = coordinationCommunicationManager.getGlobalMemorySlot(CHANNEL_TAG, CONSUMER_COORDINATION_BUFFER_FOR_SIZES_KEY); - auto consumerCoordinationBufferForPayloads = coordinationCommunicationManager.getGlobalMemorySlot(CHANNEL_TAG, CONSUMER_COORDINATION_BUFFER_FOR_PAYLOADS_KEY); - auto payloadBuffer = payloadCommunicationManager.getGlobalMemorySlot(CHANNEL_TAG, CONSUMER_PAYLOAD_KEY); - - // Creating producer and consumer channels - auto producer = HiCR::channel::variableSize::SPSC::Producer(coordinationCommunicationManager, - payloadCommunicationManager, - sizeInfoBuffer, - payloadBuffer, - sizesBuffer, - coordinationBufferForCounts, - coordinationBufferForPayloads, - consumerCoordinationBufferForCounts, - consumerCoordinationBufferForPayloads, - CHANNEL_CAPACITY * sizeof(ELEMENT_TYPE), - sizeof(ELEMENT_TYPE), - channelCapacity); - ////////////////////// Test begin // Send a buffer big as the buffer channel @@ -146,17 +97,4 @@ void producerFc(HiCR::MemoryManager &coordinationMemoryManager, // Wait for the consumer 5 coordinationCommunicationManager.fence(CHANNEL_TAG); payloadCommunicationManager.fence(CHANNEL_TAG); - - // Destroying global slots (collective calls) - coordinationCommunicationManager.destroyGlobalMemorySlot(sizesBuffer); - coordinationCommunicationManager.destroyGlobalMemorySlot(producerCoordinationBufferForCounts); - coordinationCommunicationManager.destroyGlobalMemorySlot(producerCoordinationBufferForPayloads); - - coordinationCommunicationManager.fence(CHANNEL_TAG); - payloadCommunicationManager.fence(CHANNEL_TAG); - - // Freeing up local memory - coordinationMemoryManager.freeLocalMemorySlot(coordinationBufferForCounts); - coordinationMemoryManager.freeLocalMemorySlot(coordinationBufferForPayloads); - coordinationMemoryManager.freeLocalMemorySlot(sizeInfoBuffer); } From d784b2a30f62f48a78c9fb3eccda512407eff8cf Mon Sep 17 00:00:00 2001 From: Luca Terracciano Date: Wed, 8 Oct 2025 11:45:12 +0200 Subject: [PATCH 3/5] fix: fix isfull for variable size spsc producer and add docs --- .../channel/variableSize/spsc/consumer.hpp | 13 +- .../channel/variableSize/spsc/producer.hpp | 156 +++++++++--------- 2 files changed, 88 insertions(+), 81 deletions(-) diff --git a/include/hicr/frontends/channel/variableSize/spsc/consumer.hpp b/include/hicr/frontends/channel/variableSize/spsc/consumer.hpp index 715f6f5b..241bcb09 100644 --- a/include/hicr/frontends/channel/variableSize/spsc/consumer.hpp +++ b/include/hicr/frontends/channel/variableSize/spsc/consumer.hpp @@ -49,9 +49,9 @@ class Consumer final : public variableSize::Base * * \param[in] coordinationCommunicationManager The backend's memory manager to facilitate communication between the producer and consumer coordination buffers * \param[in] payloadCommunicationManager The backend's memory manager to facilitate communication between the producer and consumer payload buffers - * \param[in] payloadBuffer The memory slot pertaining to the payload buffer. The producer will push new tokens - * into this buffer, while there is enough space (in bytes). This buffer should be big enough to hold at least the - * largest message of the variable-sized messages to be pushed. + * \param[in] payloadBuffer The memory slot pertaining to the payload buffer. The producer will push messages into this + * buffer, while there is enough space. This buffer should be large enough to hold twice the capacity specified by \ref payloadCapacity argument. + * Half of the buffer is used as excess buffer to avoid internal fragmentation of messages * \param[in] tokenBuffer The memory slot pertaining to the token buffer. This buffer is only used to exchange internal metadata * about the sizes of the individual messages being sent. * \param[in] internalCoordinationBufferForCounts This is a small buffer to hold the internal (local) state of the @@ -62,7 +62,7 @@ class Consumer final : public variableSize::Base * buffer for message counts, used for remote updates on pop() * \param[in] producerCoordinationBufferForPayloads A global reference to the producer channel's internal coordination * buffer for payload sizes (in bytes), used for remote updates on pop() - * \param[in] payloadCapacity The capacity (in bytes) of the buffer for variable-sized messages + * \param[in] payloadCapacity The capacity (in bytes) of the buffer for variable-sized messages. * \param[in] capacity The maximum number of tokens that will be held by this channel * @note: The token size in var-size channels is used only internally, and is passed as having a type size_t (with size sizeof(size_t)) */ @@ -280,7 +280,10 @@ class Consumer final : public variableSize::Base /** * This funciton can be used to quickly check whether the channel is becoming full when trying - * to push an element of a given size + * to push an element of a given size. First thing, we are checking if we can still + * push tokens (i.e., if the coordination buffer has space). Second thing, we are checking the + * payload buffer. If the current depth of the payload and the \ref requiredBufferSize to push + * exceed the channel capacity, the channel is considered full. * * \param[in] requiredBufferSize size of the token to push into the channel * diff --git a/include/hicr/frontends/channel/variableSize/spsc/producer.hpp b/include/hicr/frontends/channel/variableSize/spsc/producer.hpp index 01f157d2..126dfac3 100644 --- a/include/hicr/frontends/channel/variableSize/spsc/producer.hpp +++ b/include/hicr/frontends/channel/variableSize/spsc/producer.hpp @@ -48,7 +48,7 @@ class Producer : public variableSize::Base * \param[in] payloadCommunicationManager The backend's memory manager to facilitate communication between the producer and consumer payload buffers * \param[in] sizeInfoBuffer The local memory slot used to hold the information about the next message size * \param[in] payloadBuffer The global memory slot pertaining to the payload of all messages. The producer will push messages into this - * buffer, while there is enough space. This buffer should be large enough to hold twice the capacity specified by @ref payloadCapacity argument. + * buffer, while there is enough space. This buffer should be large enough to hold twice the capacity specified by \ref payloadCapacity argument. * Half of the buffer is used as excess buffer to avoid internal fragmentation of messages * \param[in] tokenBuffer The memory slot pertaining to the token buffer, which is used to hold message size data. * The producer will push message sizes into this buffer, while there is enough space. This buffer should be large enough to @@ -57,8 +57,8 @@ class Producer : public variableSize::Base * \param[in] internalCoordinationBufferForPayloads This is a small buffer to hold the internal (local) state of the channel's payload sizes (in bytes) * \param[in] consumerCoordinationBufferForCounts A global reference of the consumer's own coordination buffer to check for updates on message counts * \param[in] consumerCoordinationBufferForPayloads A global reference of the consumer's own coordination buffer to check for updates on payload sizes (in bytes) - * \param[in] payloadCapacity capacity in bytes of the buffer for message payloads - * \param[in] payloadSize size in bytes of the datatype used for variable-sized messages + * \param[in] payloadCapacity capacity in bytes of the buffer for message payloads. + * \param[in] payloadSize size in bytes of the datatype used for variable-sized messages. * \param[in] capacity The maximum number of tokens that will be held by this channel */ Producer(CommunicationManager &coordinationCommunicationManager, @@ -89,39 +89,6 @@ class Producer : public variableSize::Base ~Producer() = default; - /** - * Identical to Producer::updateDepth(), but this coordination buffer - * is larger and contains payload information as well as token metadata - */ - __INLINE__ void updateDepth() {} - - /** - * get payload buffer head position - * @return payload buffer head position (in bytes) - */ - [[nodiscard]] __INLINE__ size_t getPayloadHeadPosition() const noexcept { return getCircularBufferForPayloads()->getHeadPosition(); } - - /** - * get the datatype size used for payload buffer - * @return datatype size (in bytes) for payload buffer - */ - __INLINE__ size_t getPayloadSize() { return _payloadSize; } - - /** - * Get payload buffer depth - * - * \return payload buffer depth (in bytes). It is the occupancy of the buffers - * \note Even though there might be space for additional tokens in the payload buffer, it is not guaranteed that - * the push() will succeed due to insufficient space in the coordination buffer - */ - __INLINE__ size_t getPayloadDepth() { return getCircularBufferForPayloads()->getDepth(); } - - /** - * get payload buffer capacity - * @return payload buffer capacity (in bytes) - */ - __INLINE__ size_t getPayloadCapacity() { return getCircularBufferForPayloads()->getCapacity(); } - /** * Puts new variable-sized messages unto the channel. * The implementation consists of two phases. In phase 1, we copy the @@ -149,47 +116,51 @@ class Producer : public variableSize::Base { if (n != 1) HICR_THROW_RUNTIME("HiCR currently has no implementation for n != 1 with push(sourceSlot, n) for variable size version."); - // Make sure source slot is beg enough to satisfy the operation - size_t requiredPayloadBufferSize = sourceSlot->getSize(); - size_t providedPayloadBufferCapacity = getPayloadCapacity(); - - // size_t requiredCoordinationBufferSize = getTokenSize(); - // size_t providedCoordinationBufferCapacity = getPayloadCapacity(); - // Updating depth of token (message sizes) and payload buffers updateDepth(); - auto currentPayloadDepth = getCircularBufferForPayloads()->getDepth(); - auto currentDepth = getCoordinationDepth(); /** * Payload copy. - * 2 possible scenarios: * - * 1) we still have space after the head after the push + * We partition the payload buffer in 2 parts: + * - payload buffer: it is the logical size of the buffer, and the channel will work + * as if a buffer of that size was passed as input + * - excess buffer: extra space needed for the example described below. This can not + * be directly used from the outside, but is used by the channel + * to avoid fragmentation: we currently require the token to be pushed + * and peeked as a contiguous memory region + * + * 2 possible scenarios after the push: + * + * 1) we have space between the head and the end of the buffer * 2) we do not have enough space ahead after the push, but we have it at the beginning of the buffer - * and their sum allows us to push the token. In such case, push to the excess buffer so that we avoid - * breaking the token in 2 chunks. The head is always pointing to a valid position in the buffer + * and their sum (i.e., channel capacity - channel depth) allows us to push the token. + * + * If 2), we push to the excess buffer to avoid breaking the token in 2 chunks. + * The head is always pointing to a valid position in the buffer, meaning that when a push into the + * excess buffer is made, the logical position of the head behaves as if there were no excess buffer (See below). + * An excess buffer with the same size as the payload buffer guarantees that all kind of + * token size will succeed. * * 0 buffer capacity End of excess buffer * TAIL HEAD1 - * |-------|--------|-------|-------------|----------| - * HEAD - * - * In this case HEAD1 indicates where the the token ends in the implementation, while HEAD its logical position - * - * If neither of those is true, we can not push + * |-------|--------|-------|-------------|----------| In this case HEAD1 indicates where the the token + * HEAD ends in the implementation, while HEAD is its logical position + * and observable value * */ - // Check that the token to push is smaller than the buffer capacity - if (requiredPayloadBufferSize > providedPayloadBufferCapacity) - HICR_THROW_RUNTIME("Attempting to push (%lu) bytes, while the channel has a maximum capacity of (%lu)", requiredPayloadBufferSize, providedPayloadBufferCapacity); + // Get bytes required to push the token + size_t requiredPayloadBufferSize = sourceSlot->getSize(); - // Check whether there is enough space in the buffer to push the token - if (currentPayloadDepth + requiredPayloadBufferSize > providedPayloadBufferCapacity) - HICR_THROW_RUNTIME("Attempting to push (%lu) bytes while the channel currently has payload depth (%lu). This would exceed capacity (%lu).\n", - requiredPayloadBufferSize, - currentPayloadDepth, - providedPayloadBufferCapacity); + // Throw exception if the token can not be pushed + if (isFull(requiredPayloadBufferSize) == true) + { + HICR_THROW_RUNTIME("Attempting to push a token while the channel is full.\nChannel depth: %lu capacity: %lu\nPayload depth: %lu capacity: %lu", + getCircularBufferForCounts()->getDepth(), + getCircularBufferForCounts()->getCapacity(), + getCircularBufferForPayloads()->getDepth(), + getCircularBufferForPayloads()->getCapacity()); + } // Get communication managers auto payloadCommunicationManager = getPayloadCommunicationManager(); @@ -216,13 +187,6 @@ class Producer : public variableSize::Base auto *sizeInfoBufferPtr = static_cast(_sizeInfoBuffer->getPointer()); sizeInfoBufferPtr[0] = requiredPayloadBufferSize; - // If the exchange buffer does not have n free slots, reject the operation - if (currentDepth + 1 > getCircularBufferForCounts()->getCapacity()) - HICR_THROW_RUNTIME("Attempting to push with (%lu) tokens while the channel has (%lu) tokens and this would exceed capacity (%lu).\n", - 1, - getCoordinationDepth(), - getCircularBufferForCounts()->getCapacity()); - coordinationCommunicationManager->memcpy(_tokenBuffer, /* destination */ getTokenSize() * getCircularBufferForCounts()->getHeadPosition(), /* dst_offset */ _sizeInfoBuffer, /* source */ @@ -239,11 +203,45 @@ class Producer : public variableSize::Base coordinationCommunicationManager->fence(getCoordinationBufferForCounts(), 1, 0); } + /** + * Identical to Producer::updateDepth(), but this coordination buffer + * is larger and contains payload information as well as token metadata + */ + __INLINE__ void updateDepth() {} + + /** + * get payload buffer head position + * @return payload buffer head position (in bytes) + */ + [[nodiscard]] __INLINE__ size_t getPayloadHeadPosition() const noexcept { return getCircularBufferForPayloads()->getHeadPosition(); } + + /** + * get the datatype size used for payload buffer + * @return datatype size (in bytes) for payload buffer + */ + __INLINE__ size_t getPayloadSize() { return _payloadSize; } + + /** + * Get payload buffer depth + * + * \return payload buffer depth (in bytes). It is the occupancy of the buffers + * \note Even though there might be space for additional tokens in the payload buffer, it is not guaranteed that + * the push() will succeed due to insufficient space in the coordination buffer + */ + __INLINE__ size_t getPayloadDepth() { return getCircularBufferForPayloads()->getDepth(); } + + /** + * get payload buffer capacity + * @return payload buffer capacity (in bytes) + */ + __INLINE__ size_t getPayloadCapacity() { return getCircularBufferForPayloads()->getCapacity(); } + /** * Get depth of the coordination buffer of variable-size producer. * Because the current implementation first receives the payload (phase 1) before - // receiving the message counts (phase 2), returning this depth should guarantee - // we already have received the payloads + * receiving the message counts (phase 2), returning this depth should guarantee + * we already have received the payloads + * * @return The number of elements in the variable-size producer channel * * \note Even though there might be space for additional tokens in the coordination buffer, it is not guaranteed that @@ -264,7 +262,10 @@ class Producer : public variableSize::Base /** * This funciton can be used to quickly check whether the channel is becoming full when trying - * to push an element of a given size + * to push an element of a given size. First thing, we are checking if we can still + * push tokens (i.e., if the coordination buffer has space). Second thing, we are checking the + * payload buffer. If the current depth of the payload and the \ref requiredBufferSize to push + * exceed the channel capacity, the channel is considered full. * * \param[in] requiredBufferSize size of the token to push into the channel * @@ -272,10 +273,13 @@ class Producer : public variableSize::Base */ bool isFull(size_t requiredBufferSize) { + // Check if we can push one more token auto coordinationCircularBuffer = getCircularBufferForCounts(); - if (coordinationCircularBuffer->getDepth() == coordinationCircularBuffer->getCapacity()) return true; + if (coordinationCircularBuffer->getDepth() == coordinationCircularBuffer->getCapacity()) { return true; } + + // Check if there is enough space in the payload buffer. If auto payloadCircularBuffer = getCircularBufferForPayloads(); - if (payloadCircularBuffer->getDepth() + requiredBufferSize == payloadCircularBuffer->getCapacity()) return true; + if (payloadCircularBuffer->getDepth() + requiredBufferSize > payloadCircularBuffer->getCapacity()) { return true; } return false; } From 7c09f5a3c74cb8c0006d0f4b3e8eace2dac1c320 Mon Sep 17 00:00:00 2001 From: Luca Terracciano Date: Wed, 8 Oct 2025 11:45:28 +0200 Subject: [PATCH 4/5] tests: add mpi tests for variable size spsc --- .../spsc/include/channelFixture.hpp | 48 ++--- .../spsc/{source => include}/common.hpp | 0 .../variableSize/spsc/source/consumer.hpp | 125 ------------ .../variableSize/spsc/source/excessBuffer.cpp | 186 ++++++++++++++++++ .../spsc/source/fillBufferMultipleTokens.cpp | 146 ++++++++++++++ .../spsc/source/fillBufferOneToken.cpp | 137 +++++++++++++ .../variableSize/spsc/source/fillBuffers.cpp | 19 -- .../spsc/source/fillCoordination.cpp | 158 +++++++++++++++ .../variableSize/spsc/source/meson.build | 39 ++-- .../variableSize/spsc/source/producer.hpp | 100 ---------- 10 files changed, 672 insertions(+), 286 deletions(-) rename tests/frontends/channel/variableSize/spsc/{source => include}/common.hpp (100%) delete mode 100644 tests/frontends/channel/variableSize/spsc/source/consumer.hpp create mode 100644 tests/frontends/channel/variableSize/spsc/source/excessBuffer.cpp create mode 100644 tests/frontends/channel/variableSize/spsc/source/fillBufferMultipleTokens.cpp create mode 100644 tests/frontends/channel/variableSize/spsc/source/fillBufferOneToken.cpp delete mode 100644 tests/frontends/channel/variableSize/spsc/source/fillBuffers.cpp create mode 100644 tests/frontends/channel/variableSize/spsc/source/fillCoordination.cpp delete mode 100644 tests/frontends/channel/variableSize/spsc/source/producer.hpp diff --git a/tests/frontends/channel/variableSize/spsc/include/channelFixture.hpp b/tests/frontends/channel/variableSize/spsc/include/channelFixture.hpp index 23ddc5e0..39490226 100644 --- a/tests/frontends/channel/variableSize/spsc/include/channelFixture.hpp +++ b/tests/frontends/channel/variableSize/spsc/include/channelFixture.hpp @@ -167,51 +167,45 @@ class ChannelFixture : public ::testing::Test channelCapacity); } - std::unique_ptr _communicationManager; - std::unique_ptr _instanceManager; - std::unique_ptr _memoryManager; - std::unique_ptr _topologyManager; - std::unique_ptr _computeManager; + std::unique_ptr communicationManager; + std::unique_ptr instanceManager; + std::unique_ptr memoryManager; + std::unique_ptr topologyManager; + std::unique_ptr computeManager; - std::unique_ptr _consumer; - std::unique_ptr _producer; + std::unique_ptr consumer; + std::unique_ptr producer; - std::shared_ptr _memorySpace; + std::shared_ptr memorySpace; protected: void SetUp() override { - _instanceManager = std::make_unique(MPI_COMM_WORLD); + instanceManager = std::make_unique(MPI_COMM_WORLD); // Sanity Check - if (_instanceManager->getInstances().size() != 2) + if (instanceManager->getInstances().size() != 2) { - if (_instanceManager->getCurrentInstance()->isRootInstance()) fprintf(stderr, "Launch error: MPI process count must be equal to 2\n"); + if (instanceManager->getCurrentInstance()->isRootInstance()) fprintf(stderr, "Launch error: MPI process count must be equal to 2\n"); MPI_Finalize(); } - _communicationManager = std::make_unique(MPI_COMM_WORLD); - _memoryManager = std::make_unique(); - _computeManager = std::make_unique(); - _topologyManager = HiCR::backend::hwloc::TopologyManager::createDefault(); + communicationManager = std::make_unique(MPI_COMM_WORLD); + memoryManager = std::make_unique(); + computeManager = std::make_unique(); + topologyManager = HiCR::backend::hwloc::TopologyManager::createDefault(); - _topology = _topologyManager->queryTopology(); - _memorySpace = _topology.getDevices().begin().operator*()->getMemorySpaceList().begin().operator*(); - - if (_instanceManager->getCurrentInstance()->isRootInstance()) - { - _producer = createProducer(*_memoryManager, *_memoryManager, *_communicationManager, *_communicationManager, _memorySpace, _memorySpace, CHANNEL_CAPACITY); - } - else { _consumer = createConsumer(*_memoryManager, *_memoryManager, *_communicationManager, *_communicationManager, _memorySpace, _memorySpace, CHANNEL_CAPACITY); } + _topology = topologyManager->queryTopology(); + memorySpace = _topology.getDevices().begin().operator*()->getMemorySpaceList().begin().operator*(); } void TearDown() override { - for (auto &g : _globalSlots) { _communicationManager->deregisterGlobalMemorySlot(g); } - for (auto &g : _globalSlotsToDestroy) { _communicationManager->destroyGlobalMemorySlot(g); } - _communicationManager->fence(CHANNEL_TAG); - for (auto &l : _localSlots) { _memoryManager->freeLocalMemorySlot(l); } + for (auto &g : _globalSlots) { communicationManager->deregisterGlobalMemorySlot(g); } + for (auto &g : _globalSlotsToDestroy) { communicationManager->destroyGlobalMemorySlot(g); } + communicationManager->fence(CHANNEL_TAG); + for (auto &l : _localSlots) { memoryManager->freeLocalMemorySlot(l); } } private: diff --git a/tests/frontends/channel/variableSize/spsc/source/common.hpp b/tests/frontends/channel/variableSize/spsc/include/common.hpp similarity index 100% rename from tests/frontends/channel/variableSize/spsc/source/common.hpp rename to tests/frontends/channel/variableSize/spsc/include/common.hpp diff --git a/tests/frontends/channel/variableSize/spsc/source/consumer.hpp b/tests/frontends/channel/variableSize/spsc/source/consumer.hpp deleted file mode 100644 index b21544ca..00000000 --- a/tests/frontends/channel/variableSize/spsc/source/consumer.hpp +++ /dev/null @@ -1,125 +0,0 @@ -/* - * Copyright 2025 Huawei Technologies Co., Ltd. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#pragma once - -#include -#include -#include -#include -#include "common.hpp" - -__INLINE__ std::shared_ptr peek(HiCR::channel::variableSize::SPSC::Consumer &consumerInterface, - HiCR::MemoryManager &memoryManager, - std::shared_ptr &memorySpace) -{ - // If the buffer is full, returning false - while (consumerInterface.isEmpty()) {} - - // Pushing buffer - auto result = consumerInterface.peek(); - - // Getting absolute pointer to the token - size_t tokenPos = result[0]; - size_t tokenSize = result[1]; - auto tokenBuffer = (uint8_t *)consumerInterface.getPayloadBufferMemorySlot()->getSourceLocalMemorySlot()->getPointer(); - void *tokenPtr = &tokenBuffer[tokenPos]; - - // Register and return the memory slot - return memoryManager.registerLocalMemorySlot(memorySpace, tokenPtr, tokenSize); -} - -void consumerFc(HiCR::CommunicationManager &coordinationCommunicationManager, - HiCR::CommunicationManager &payloadCommunicationManager, - HiCR::channel::variableSize::SPSC::Consumer &consumer) -{ - ASSERT_TRUE(consumer.isEmpty()); - ASSERT_EQ(consumer.getCoordinationDepth(), 0); - ASSERT_EQ(consumer.getPayloadDepth(), 0); - ASSERT_FALSE(consumer.isFull(CHANNEL_CAPACITY * sizeof(ELEMENT_TYPE))); - ASSERT_TRUE(consumer.isFull(CHANNEL_CAPACITY * sizeof(ELEMENT_TYPE) + 1)); - - // Wait for producer 1 - coordinationCommunicationManager.fence(CHANNEL_TAG); - payloadCommunicationManager.fence(CHANNEL_TAG); - - // Wait for the producer 2 - coordinationCommunicationManager.fence(CHANNEL_TAG); - payloadCommunicationManager.fence(CHANNEL_TAG); - - consumer.updateDepth(); - ASSERT_EQ(consumer.getCoordinationDepth(), 1); - ASSERT_EQ(consumer.getPayloadDepth(), CHANNEL_CAPACITY * sizeof(ELEMENT_TYPE)); - - // Peek - auto res = consumer.peek(); - ASSERT_EQ(res[0], 0); - ASSERT_EQ(res[1], CHANNEL_CAPACITY * sizeof(ELEMENT_TYPE)); - - auto tokenBuffer = (uint8_t *)consumer.getPayloadBufferMemorySlot()->getSourceLocalMemorySlot()->getPointer(); - void *tokenPtr = &tokenBuffer[res[0]]; - for (ELEMENT_TYPE i = 0; i < (res[1] / sizeof(ELEMENT_TYPE)); ++i) { ASSERT_EQ(i, static_cast(tokenPtr)[i]); } - - consumer.pop(); - ASSERT_TRUE(consumer.isEmpty()); - ASSERT_EQ(consumer.getCoordinationDepth(), 0); - ASSERT_EQ(consumer.getPayloadDepth(), 0); - - // Wait for the producer 3 - coordinationCommunicationManager.fence(CHANNEL_TAG); - payloadCommunicationManager.fence(CHANNEL_TAG); - - // Send token one by one - for (size_t i = 0; i < CHANNEL_CAPACITY; ++i) - { - coordinationCommunicationManager.fence(CHANNEL_TAG); - payloadCommunicationManager.fence(CHANNEL_TAG); - - ASSERT_EQ(consumer.getCoordinationDepth(), i + 1); - ASSERT_EQ(consumer.getPayloadDepth(), (i + 1) * sizeof(ELEMENT_TYPE)); - } - - ASSERT_TRUE(consumer.isFull(0)); - - // Wait for the producer 4 - coordinationCommunicationManager.fence(CHANNEL_TAG); - payloadCommunicationManager.fence(CHANNEL_TAG); - - // Pop token one by one - auto peekIndex = 0; - for (size_t i = CHANNEL_CAPACITY; i > 0; --i) - { - ASSERT_EQ(consumer.getCoordinationDepth(), i); - ASSERT_EQ(consumer.getPayloadDepth(), i * sizeof(ELEMENT_TYPE)); - auto res = consumer.peek(); - ASSERT_EQ(res[0], peekIndex++ * sizeof(ELEMENT_TYPE)); - ASSERT_EQ(res[1], sizeof(ELEMENT_TYPE)); - - auto tokenBuffer = (uint8_t *)consumer.getPayloadBufferMemorySlot()->getSourceLocalMemorySlot()->getPointer(); - void *tokenPtr = &tokenBuffer[res[0]]; - auto token = static_cast(tokenPtr)[0]; - for (ELEMENT_TYPE i = 0; i < CHANNEL_CAPACITY; ++i) { ASSERT_EQ(0, token); } - consumer.pop(); - coordinationCommunicationManager.fence(CHANNEL_TAG); - payloadCommunicationManager.fence(CHANNEL_TAG); - ASSERT_EQ(consumer.getCoordinationDepth(), i - 1); - ASSERT_EQ(consumer.getPayloadDepth(), (i - 1) * sizeof(ELEMENT_TYPE)); - } - - // Wait for the producer 5 - coordinationCommunicationManager.fence(CHANNEL_TAG); - payloadCommunicationManager.fence(CHANNEL_TAG); -} diff --git a/tests/frontends/channel/variableSize/spsc/source/excessBuffer.cpp b/tests/frontends/channel/variableSize/spsc/source/excessBuffer.cpp new file mode 100644 index 00000000..be717675 --- /dev/null +++ b/tests/frontends/channel/variableSize/spsc/source/excessBuffer.cpp @@ -0,0 +1,186 @@ +#include +#include +#include + +#include +#include +#include +#include + +#include "../include/channelFixture.hpp" + +void producerFc(ChannelFixture &fixture) +{ + // Create producer and pick managers from the fixture + fixture.producer = fixture.createProducer( + *fixture.memoryManager, *fixture.memoryManager, *fixture.communicationManager, *fixture.communicationManager, fixture.memorySpace, fixture.memorySpace, CHANNEL_CAPACITY); + + auto &producer = *fixture.producer; + auto &payloadMemoryManager = *fixture.memoryManager; + auto &coordinationCommunicationManager = *fixture.communicationManager; + auto &payloadCommunicationManager = *fixture.communicationManager; + auto payloadMemorySpace = fixture.memorySpace; + + ////////////////////// Test begin + + // Check payload capacity, that buffer is empty, an thus not full + ASSERT_EQ(producer.getPayloadCapacity(), CHANNEL_CAPACITY * sizeof(ELEMENT_TYPE)); + producer.updateDepth(); + ASSERT_EQ(producer.getCoordinationDepth(), 0); + ASSERT_EQ(producer.getPayloadDepth(), 0); + ASSERT_TRUE(producer.isEmpty()); + ASSERT_FALSE(producer.isFull(CHANNEL_CAPACITY * sizeof(ELEMENT_TYPE))); + ASSERT_TRUE(producer.isFull(CHANNEL_CAPACITY * sizeof(ELEMENT_TYPE) + 1)); + + // Send a buffer big as the buffer channel + ELEMENT_TYPE sendBuffer[CHANNEL_CAPACITY - 1] = {0, 1, 2, 3}; + auto sendBufferPtr = &sendBuffer; + auto sendSlot = payloadMemoryManager.registerLocalMemorySlot(payloadMemorySpace, sendBufferPtr, sizeof(sendBuffer)); + + // Wait for the consumer 1 + coordinationCommunicationManager.fence(CHANNEL_TAG); + payloadCommunicationManager.fence(CHANNEL_TAG); + + // Push the slot + EXPECT_NO_THROW(producer.push(sendSlot)); + + // Check that the channel can accept one more element + ASSERT_FALSE(producer.isFull(sizeof(ELEMENT_TYPE))); + ASSERT_TRUE(producer.isFull(2 * sizeof(ELEMENT_TYPE))); + ASSERT_FALSE(producer.isEmpty()); + + // Check there is only one token, and the payload depth is equal to the capacity of the buffer minus 1 element + producer.updateDepth(); + ASSERT_EQ(producer.getCoordinationDepth(), 1); + ASSERT_EQ(producer.getPayloadDepth(), (CHANNEL_CAPACITY - 1) * sizeof(ELEMENT_TYPE)); + + // Check that trying to push another element throws exception since the channel does not have enough space + EXPECT_THROW(producer.push(sendSlot), HiCR::RuntimeException); + + // Wait for the consumer 2 + coordinationCommunicationManager.fence(CHANNEL_TAG); + payloadCommunicationManager.fence(CHANNEL_TAG); + + // Let the consumer pop + + // Wait for the consumer 3 + coordinationCommunicationManager.fence(CHANNEL_TAG); + payloadCommunicationManager.fence(CHANNEL_TAG); + + // Now that consumer has popped it should succeed, and push to the excess buffer + EXPECT_NO_THROW(producer.push(sendSlot)); + + // Wait for the consumer 4 + coordinationCommunicationManager.fence(CHANNEL_TAG); + payloadCommunicationManager.fence(CHANNEL_TAG); + + // Let the consumer do its part of the test + + // Wait for the consumer 5 + coordinationCommunicationManager.fence(CHANNEL_TAG); + payloadCommunicationManager.fence(CHANNEL_TAG); +} + +void consumerFc(ChannelFixture &fixture) +{ + // Create producer and pick managers from the fixture + fixture.consumer = fixture.createConsumer( + *fixture.memoryManager, *fixture.memoryManager, *fixture.communicationManager, *fixture.communicationManager, fixture.memorySpace, fixture.memorySpace, CHANNEL_CAPACITY); + + auto &consumer = *fixture.consumer; + auto &coordinationCommunicationManager = *fixture.communicationManager; + auto &payloadCommunicationManager = *fixture.communicationManager; + + ////////////////////// Test begin + + // Check payload capacity, that buffer is empty, an thus not full + consumer.updateDepth(); + ASSERT_EQ(consumer.getCoordinationDepth(), 0); + ASSERT_EQ(consumer.getPayloadDepth(), 0); + ASSERT_TRUE(consumer.isEmpty()); + ASSERT_FALSE(consumer.isFull(CHANNEL_CAPACITY * sizeof(ELEMENT_TYPE))); + ASSERT_TRUE(consumer.isFull(CHANNEL_CAPACITY * sizeof(ELEMENT_TYPE) + 1)); + + // Wait for producer 1 + coordinationCommunicationManager.fence(CHANNEL_TAG); + payloadCommunicationManager.fence(CHANNEL_TAG); + + // Let the producer do its part of the test + + // Wait for the producer 2 + coordinationCommunicationManager.fence(CHANNEL_TAG); + payloadCommunicationManager.fence(CHANNEL_TAG); + + // After the push, check there is one token and payload buffer is full + consumer.updateDepth(); + ASSERT_EQ(consumer.getCoordinationDepth(), 1); + ASSERT_EQ(consumer.getPayloadDepth(), (CHANNEL_CAPACITY - 1) * sizeof(ELEMENT_TYPE)); + ASSERT_FALSE(consumer.isEmpty()); + // Check that we still have space to push 1 token + ASSERT_FALSE(consumer.isFull(sizeof(ELEMENT_TYPE))); + ASSERT_TRUE(consumer.isFull(2 * sizeof(ELEMENT_TYPE))); + + // Peek and check the token data are correct + auto res = consumer.peek(); + ASSERT_EQ(res[0], 0); + ASSERT_EQ(res[1], (CHANNEL_CAPACITY - 1) * sizeof(ELEMENT_TYPE)); + + // Check the vectory elements corresponds to the ground truth + auto tokenBuffer = (uint8_t *)consumer.getPayloadBufferMemorySlot()->getSourceLocalMemorySlot()->getPointer(); + void *tokenPtr = &tokenBuffer[res[0]]; + for (ELEMENT_TYPE i = 0; i < (res[1] / sizeof(ELEMENT_TYPE)); ++i) { ASSERT_EQ(i, static_cast(tokenPtr)[i]); } + + // Pop and check that the channel is empty, the depth are updated + consumer.pop(); + ASSERT_TRUE(consumer.isEmpty()); + ASSERT_FALSE(consumer.isFull(CHANNEL_CAPACITY * sizeof(ELEMENT_TYPE))); + ASSERT_EQ(consumer.getCoordinationDepth(), 0); + ASSERT_EQ(consumer.getPayloadDepth(), 0); + + // Wait for the producer 3 + coordinationCommunicationManager.fence(CHANNEL_TAG); + payloadCommunicationManager.fence(CHANNEL_TAG); + + // Let the producer push again + + // Wait for the producer 4 + coordinationCommunicationManager.fence(CHANNEL_TAG); + payloadCommunicationManager.fence(CHANNEL_TAG); + + // Wait for the producer 5 + coordinationCommunicationManager.fence(CHANNEL_TAG); + payloadCommunicationManager.fence(CHANNEL_TAG); + + // After the push, check there is one token + consumer.updateDepth(); + ASSERT_EQ(consumer.getCoordinationDepth(), 1); + ASSERT_EQ(consumer.getPayloadDepth(), (CHANNEL_CAPACITY - 1) * sizeof(ELEMENT_TYPE)); + ASSERT_FALSE(consumer.isEmpty()); + // Check that we still have space to push 1 token + ASSERT_FALSE(consumer.isFull(sizeof(ELEMENT_TYPE))); + ASSERT_TRUE(consumer.isFull(2 * sizeof(ELEMENT_TYPE))); + + // Peek and check the token data are correct + res = consumer.peek(); + ASSERT_EQ(res[0], (CHANNEL_CAPACITY - 1) * sizeof(ELEMENT_TYPE)); + ASSERT_EQ(res[1], (CHANNEL_CAPACITY - 1) * sizeof(ELEMENT_TYPE)); + + // Check the vectory elements corresponds to the ground truth + tokenBuffer = (uint8_t *)consumer.getPayloadBufferMemorySlot()->getSourceLocalMemorySlot()->getPointer(); + tokenPtr = &tokenBuffer[res[0]]; + for (ELEMENT_TYPE i = 0; i < (res[1] / sizeof(ELEMENT_TYPE)); ++i) { ASSERT_EQ(i, static_cast(tokenPtr)[i]); } + + // Pop and check that the channel is empty, the depth are updated + consumer.pop(); + ASSERT_TRUE(consumer.isEmpty()); + ASSERT_FALSE(consumer.isFull(CHANNEL_CAPACITY * sizeof(ELEMENT_TYPE))); + ASSERT_EQ(consumer.getCoordinationDepth(), 0); + ASSERT_EQ(consumer.getPayloadDepth(), 0); +} + +TEST_F(ChannelFixture, UseExcessBuffer) +{ + // Rank 0 is producer, Rank 1 is consumer + if (instanceManager->getCurrentInstance()->isRootInstance()) { producerFc(*this); } + else { consumerFc(*this); } +} diff --git a/tests/frontends/channel/variableSize/spsc/source/fillBufferMultipleTokens.cpp b/tests/frontends/channel/variableSize/spsc/source/fillBufferMultipleTokens.cpp new file mode 100644 index 00000000..6334b8b8 --- /dev/null +++ b/tests/frontends/channel/variableSize/spsc/source/fillBufferMultipleTokens.cpp @@ -0,0 +1,146 @@ +#include +#include +#include + +#include +#include +#include +#include + +#include "../include/channelFixture.hpp" + +void producerFc(ChannelFixture &fixture) +{ + // Create producer and pick managers from the fixture + fixture.producer = fixture.createProducer( + *fixture.memoryManager, *fixture.memoryManager, *fixture.communicationManager, *fixture.communicationManager, fixture.memorySpace, fixture.memorySpace, CHANNEL_CAPACITY); + + auto &producer = *fixture.producer; + auto &payloadMemoryManager = *fixture.memoryManager; + auto &coordinationCommunicationManager = *fixture.communicationManager; + auto &payloadCommunicationManager = *fixture.communicationManager; + auto payloadMemorySpace = fixture.memorySpace; + + ////////////////////// Test begin + + // Wait for the consumer 1 + coordinationCommunicationManager.fence(CHANNEL_TAG); + payloadCommunicationManager.fence(CHANNEL_TAG); + + // Send token one by one + for (size_t i = 0; i < CHANNEL_CAPACITY; ++i) + { + // Prepare slot to send + ELEMENT_TYPE sendBuffer[1] = {static_cast(i)}; + auto sendBufferPtr = &sendBuffer; + auto sendSlot = payloadMemoryManager.registerLocalMemorySlot(payloadMemorySpace, sendBufferPtr, sizeof(sendBuffer)); + + // Push and check depths are increased + ASSERT_EQ(producer.getCoordinationDepth(), i); + ASSERT_EQ(producer.getPayloadDepth(), i * sizeof(ELEMENT_TYPE)); + ASSERT_NO_THROW(producer.push(sendSlot)); + + // Fence to synchronize with the consumer + coordinationCommunicationManager.fence(CHANNEL_TAG); + payloadCommunicationManager.fence(CHANNEL_TAG); + + ASSERT_EQ(producer.getCoordinationDepth(), i + 1); + ASSERT_EQ(producer.getPayloadDepth(), (i + 1) * sizeof(ELEMENT_TYPE)); + } + + // Check the channel is full + ASSERT_TRUE(producer.isFull(1)); + + // Wait for the consumer 2 + coordinationCommunicationManager.fence(CHANNEL_TAG); + payloadCommunicationManager.fence(CHANNEL_TAG); + + // The consumer is popping, check the depths are updated + for (size_t i = CHANNEL_CAPACITY; i > 0; --i) + { + coordinationCommunicationManager.fence(CHANNEL_TAG); + payloadCommunicationManager.fence(CHANNEL_TAG); + ASSERT_EQ(producer.getCoordinationDepth(), i - 1); + ASSERT_EQ(producer.getPayloadDepth(), (i - 1) * sizeof(ELEMENT_TYPE)); + } + + // Wait for the consumer 3 + coordinationCommunicationManager.fence(CHANNEL_TAG); + payloadCommunicationManager.fence(CHANNEL_TAG); +} + +void consumerFc(ChannelFixture &fixture) +{ + // Create producer and pick managers from the fixture + fixture.consumer = fixture.createConsumer( + *fixture.memoryManager, *fixture.memoryManager, *fixture.communicationManager, *fixture.communicationManager, fixture.memorySpace, fixture.memorySpace, CHANNEL_CAPACITY); + + auto &consumer = *fixture.consumer; + auto &coordinationCommunicationManager = *fixture.communicationManager; + auto &payloadCommunicationManager = *fixture.communicationManager; + + ////////////////////// Test begin + + // Wait for producer 1 + coordinationCommunicationManager.fence(CHANNEL_TAG); + payloadCommunicationManager.fence(CHANNEL_TAG); + + // The producer is pushing, check depths are increasing + for (size_t i = 0; i < CHANNEL_CAPACITY; ++i) + { + coordinationCommunicationManager.fence(CHANNEL_TAG); + payloadCommunicationManager.fence(CHANNEL_TAG); + + ASSERT_EQ(consumer.getCoordinationDepth(), i + 1); + ASSERT_EQ(consumer.getPayloadDepth(), (i + 1) * sizeof(ELEMENT_TYPE)); + } + + // Check channel is full + ASSERT_TRUE(consumer.isFull(1)); + + // Wait for the producer 2 + coordinationCommunicationManager.fence(CHANNEL_TAG); + payloadCommunicationManager.fence(CHANNEL_TAG); + + // Pop token one by one + auto peekIndex = 0; + for (size_t i = CHANNEL_CAPACITY; i > 0; --i) + { + // Check depths are decreasing + ASSERT_EQ(consumer.getCoordinationDepth(), i); + ASSERT_EQ(consumer.getPayloadDepth(), i * sizeof(ELEMENT_TYPE)); + + // Peek and check data are correct + auto res = consumer.peek(); + ASSERT_EQ(res[0], peekIndex * sizeof(ELEMENT_TYPE)); + ASSERT_EQ(res[1], sizeof(ELEMENT_TYPE)); + + auto tokenBuffer = (uint8_t *)consumer.getPayloadBufferMemorySlot()->getSourceLocalMemorySlot()->getPointer(); + void *tokenPtr = &tokenBuffer[res[0]]; + auto token = static_cast(tokenPtr)[0]; + // In this specific test, the value pushed coincides with the index in the channel + ASSERT_EQ(peekIndex, token); + + // Pop + consumer.pop(); + coordinationCommunicationManager.fence(CHANNEL_TAG); + payloadCommunicationManager.fence(CHANNEL_TAG); + + ASSERT_EQ(consumer.getCoordinationDepth(), i - 1); + ASSERT_EQ(consumer.getPayloadDepth(), (i - 1) * sizeof(ELEMENT_TYPE)); + + // Update peek index + peekIndex++; + } + + // Wait for the producer 3 + coordinationCommunicationManager.fence(CHANNEL_TAG); + payloadCommunicationManager.fence(CHANNEL_TAG); +} + +TEST_F(ChannelFixture, FillBufferWithMultipleTokens) +{ + // Rank 0 is producer, Rank 1 is consumer + if (instanceManager->getCurrentInstance()->isRootInstance()) { producerFc(*this); } + else { consumerFc(*this); } +} diff --git a/tests/frontends/channel/variableSize/spsc/source/fillBufferOneToken.cpp b/tests/frontends/channel/variableSize/spsc/source/fillBufferOneToken.cpp new file mode 100644 index 00000000..e830a4f1 --- /dev/null +++ b/tests/frontends/channel/variableSize/spsc/source/fillBufferOneToken.cpp @@ -0,0 +1,137 @@ +#include +#include +#include + +#include +#include +#include +#include + +#include "../include/channelFixture.hpp" + +void producerFc(ChannelFixture &fixture) +{ + // Create producer and pick managers from the fixture + fixture.producer = fixture.createProducer( + *fixture.memoryManager, *fixture.memoryManager, *fixture.communicationManager, *fixture.communicationManager, fixture.memorySpace, fixture.memorySpace, CHANNEL_CAPACITY); + + auto &producer = *fixture.producer; + auto &payloadMemoryManager = *fixture.memoryManager; + auto &coordinationCommunicationManager = *fixture.communicationManager; + auto &payloadCommunicationManager = *fixture.communicationManager; + auto payloadMemorySpace = fixture.memorySpace; + + ////////////////////// Test begin + + // Check payload capacity, that buffer is empty, an thus not full + ASSERT_EQ(producer.getPayloadCapacity(), CHANNEL_CAPACITY * sizeof(ELEMENT_TYPE)); + producer.updateDepth(); + ASSERT_EQ(producer.getCoordinationDepth(), 0); + ASSERT_EQ(producer.getPayloadDepth(), 0); + ASSERT_TRUE(producer.isEmpty()); + ASSERT_FALSE(producer.isFull(CHANNEL_CAPACITY * sizeof(ELEMENT_TYPE))); + ASSERT_TRUE(producer.isFull(CHANNEL_CAPACITY * sizeof(ELEMENT_TYPE) + 1)); + + // Send a buffer big as the buffer channel + ELEMENT_TYPE sendBuffer[CHANNEL_CAPACITY] = {0, 1, 2, 3, 4}; + auto sendBufferPtr = &sendBuffer; + auto sendSlot = payloadMemoryManager.registerLocalMemorySlot(payloadMemorySpace, sendBufferPtr, sizeof(sendBuffer)); + + // Wait for the consumer 1 + coordinationCommunicationManager.fence(CHANNEL_TAG); + payloadCommunicationManager.fence(CHANNEL_TAG); + + // Push the slot + EXPECT_NO_THROW(producer.push(sendSlot)); + + // Check that the channel is full + ASSERT_TRUE(producer.isFull(1)); + ASSERT_FALSE(producer.isEmpty()); + + // Check there is only one token, but the payload depth is equal to the capacity of the buffer + producer.updateDepth(); + ASSERT_EQ(producer.getCoordinationDepth(), 1); + ASSERT_EQ(producer.getPayloadDepth(), producer.getPayloadCapacity()); + + // Check that trying to push another element throws exception + ELEMENT_TYPE sendBuffer2[1] = {5}; + auto sendBufferPtr2 = &sendBuffer2; + auto sendSlot2 = payloadMemoryManager.registerLocalMemorySlot(payloadMemorySpace, sendBufferPtr2, sizeof(sendBuffer2)); + EXPECT_THROW(producer.push(sendSlot2), HiCR::RuntimeException); + + // Wait for the consumer 2 + coordinationCommunicationManager.fence(CHANNEL_TAG); + payloadCommunicationManager.fence(CHANNEL_TAG); + + // Let the consumer do its part of the test + + // Wait for the consumer 3 + coordinationCommunicationManager.fence(CHANNEL_TAG); + payloadCommunicationManager.fence(CHANNEL_TAG); +} + +void consumerFc(ChannelFixture &fixture) +{ + // Create producer and pick managers from the fixture + fixture.consumer = fixture.createConsumer( + *fixture.memoryManager, *fixture.memoryManager, *fixture.communicationManager, *fixture.communicationManager, fixture.memorySpace, fixture.memorySpace, CHANNEL_CAPACITY); + + auto &consumer = *fixture.consumer; + auto &coordinationCommunicationManager = *fixture.communicationManager; + auto &payloadCommunicationManager = *fixture.communicationManager; + + ////////////////////// Test begin + + // Check payload capacity, that buffer is empty, an thus not full + consumer.updateDepth(); + ASSERT_EQ(consumer.getCoordinationDepth(), 0); + ASSERT_EQ(consumer.getPayloadDepth(), 0); + ASSERT_TRUE(consumer.isEmpty()); + ASSERT_FALSE(consumer.isFull(CHANNEL_CAPACITY * sizeof(ELEMENT_TYPE))); + ASSERT_TRUE(consumer.isFull(CHANNEL_CAPACITY * sizeof(ELEMENT_TYPE) + 1)); + + // Wait for producer 1 + coordinationCommunicationManager.fence(CHANNEL_TAG); + payloadCommunicationManager.fence(CHANNEL_TAG); + + // Let the producer do its part of the test + + // Wait for the producer 2 + coordinationCommunicationManager.fence(CHANNEL_TAG); + payloadCommunicationManager.fence(CHANNEL_TAG); + + // After the push, check there is one token and payload buffer is full + consumer.updateDepth(); + ASSERT_EQ(consumer.getCoordinationDepth(), 1); + ASSERT_EQ(consumer.getPayloadDepth(), CHANNEL_CAPACITY * sizeof(ELEMENT_TYPE)); + ASSERT_FALSE(consumer.isEmpty()); + ASSERT_TRUE(consumer.isFull(1)); + + // Peek and check the token data are correct + auto res = consumer.peek(); + ASSERT_EQ(res[0], 0); + ASSERT_EQ(res[1], CHANNEL_CAPACITY * sizeof(ELEMENT_TYPE)); + + // Check the vectory elements corresponds to the ground truth + auto tokenBuffer = (uint8_t *)consumer.getPayloadBufferMemorySlot()->getSourceLocalMemorySlot()->getPointer(); + void *tokenPtr = &tokenBuffer[res[0]]; + for (ELEMENT_TYPE i = 0; i < (res[1] / sizeof(ELEMENT_TYPE)); ++i) { ASSERT_EQ(i, static_cast(tokenPtr)[i]); } + + // Pop and check that the channel is empty, the depth are updated + consumer.pop(); + ASSERT_TRUE(consumer.isEmpty()); + ASSERT_FALSE(consumer.isFull(CHANNEL_CAPACITY * sizeof(ELEMENT_TYPE))); + ASSERT_EQ(consumer.getCoordinationDepth(), 0); + ASSERT_EQ(consumer.getPayloadDepth(), 0); + + // Wait for the producer 3 + coordinationCommunicationManager.fence(CHANNEL_TAG); + payloadCommunicationManager.fence(CHANNEL_TAG); +} + +TEST_F(ChannelFixture, FillBufferWithOneToken) +{ + // Rank 0 is producer, Rank 1 is consumer + if (instanceManager->getCurrentInstance()->isRootInstance()) { producerFc(*this); } + else { consumerFc(*this); } +} diff --git a/tests/frontends/channel/variableSize/spsc/source/fillBuffers.cpp b/tests/frontends/channel/variableSize/spsc/source/fillBuffers.cpp deleted file mode 100644 index 42cae0ea..00000000 --- a/tests/frontends/channel/variableSize/spsc/source/fillBuffers.cpp +++ /dev/null @@ -1,19 +0,0 @@ -#include -#include -#include - -#include -#include -#include -#include - -#include "channelFixture.hpp" -#include "producer.hpp" -#include "consumer.hpp" - -TEST_F(ChannelFixture, fillBufferCounter) -{ - // Rank 0 is producer, Rank 1 is consumer - if (_instanceManager->getCurrentInstance()->isRootInstance()) { producerFc(*_memoryManager, *_communicationManager, *_communicationManager, _memorySpace, *_producer); } - else { consumerFc(*_communicationManager, *_communicationManager, *_consumer); } -} diff --git a/tests/frontends/channel/variableSize/spsc/source/fillCoordination.cpp b/tests/frontends/channel/variableSize/spsc/source/fillCoordination.cpp new file mode 100644 index 00000000..1c85bf9a --- /dev/null +++ b/tests/frontends/channel/variableSize/spsc/source/fillCoordination.cpp @@ -0,0 +1,158 @@ +#include +#include +#include + +#include +#include +#include +#include + +#include "../include/channelFixture.hpp" + +void producerFc(ChannelFixture &fixture) +{ + // Create producer and pick managers from the fixture + fixture.producer = fixture.createProducer( + *fixture.memoryManager, *fixture.memoryManager, *fixture.communicationManager, *fixture.communicationManager, fixture.memorySpace, fixture.memorySpace, CHANNEL_CAPACITY - 1); + + auto &producer = *fixture.producer; + auto &payloadMemoryManager = *fixture.memoryManager; + auto &coordinationCommunicationManager = *fixture.communicationManager; + auto &payloadCommunicationManager = *fixture.communicationManager; + auto payloadMemorySpace = fixture.memorySpace; + + ////////////////////// Test begin + + // Here the channel has been created with a smaller capacity. Hence we should fail the last push, + // even though the payload buffer has enough space to hold one more token + + // Wait for the consumer 1 + coordinationCommunicationManager.fence(CHANNEL_TAG); + payloadCommunicationManager.fence(CHANNEL_TAG); + + // Send token one by one + for (size_t i = 0; i < CHANNEL_CAPACITY - 1; ++i) + { + // Prepare slot to send + ELEMENT_TYPE sendBuffer[1] = {static_cast(i)}; + auto sendBufferPtr = &sendBuffer; + auto sendSlot = payloadMemoryManager.registerLocalMemorySlot(payloadMemorySpace, sendBufferPtr, sizeof(sendBuffer)); + + // Push and check depths are increased + ASSERT_EQ(producer.getCoordinationDepth(), i); + ASSERT_EQ(producer.getPayloadDepth(), i * sizeof(ELEMENT_TYPE)); + ASSERT_NO_THROW(producer.push(sendSlot)); + + // Fence to synchronize with the consumer + coordinationCommunicationManager.fence(CHANNEL_TAG); + payloadCommunicationManager.fence(CHANNEL_TAG); + + ASSERT_EQ(producer.getCoordinationDepth(), i + 1); + ASSERT_EQ(producer.getPayloadDepth(), (i + 1) * sizeof(ELEMENT_TYPE)); + } + + // This is the last push, and should fail + // Prepare slot to send + ELEMENT_TYPE sendBuffer[1] = {static_cast(10)}; + auto sendBufferPtr = &sendBuffer; + auto sendSlot = payloadMemoryManager.registerLocalMemorySlot(payloadMemorySpace, sendBufferPtr, sizeof(sendBuffer)); + + // Push and check it fails + ASSERT_THROW(producer.push(sendSlot), HiCR::RuntimeException); + + // Check the channel is full + ASSERT_TRUE(producer.isFull(1)); + + // Wait for the consumer 2 + coordinationCommunicationManager.fence(CHANNEL_TAG); + payloadCommunicationManager.fence(CHANNEL_TAG); + + // The consumer is popping, check the depths are updated + for (size_t i = CHANNEL_CAPACITY - 1; i > 0; --i) + { + coordinationCommunicationManager.fence(CHANNEL_TAG); + payloadCommunicationManager.fence(CHANNEL_TAG); + ASSERT_EQ(producer.getCoordinationDepth(), i - 1); + ASSERT_EQ(producer.getPayloadDepth(), (i - 1) * sizeof(ELEMENT_TYPE)); + } + + // Wait for the consumer 3 + coordinationCommunicationManager.fence(CHANNEL_TAG); + payloadCommunicationManager.fence(CHANNEL_TAG); +} + +void consumerFc(ChannelFixture &fixture) +{ + // Create producer and pick managers from the fixture + fixture.consumer = fixture.createConsumer( + *fixture.memoryManager, *fixture.memoryManager, *fixture.communicationManager, *fixture.communicationManager, fixture.memorySpace, fixture.memorySpace, CHANNEL_CAPACITY - 1); + + auto &consumer = *fixture.consumer; + auto &coordinationCommunicationManager = *fixture.communicationManager; + auto &payloadCommunicationManager = *fixture.communicationManager; + + ////////////////////// Test begin + + // Wait for producer 1 + coordinationCommunicationManager.fence(CHANNEL_TAG); + payloadCommunicationManager.fence(CHANNEL_TAG); + + // The producer is pushing, check depths are increasing + for (size_t i = 0; i < CHANNEL_CAPACITY - 1; ++i) + { + coordinationCommunicationManager.fence(CHANNEL_TAG); + payloadCommunicationManager.fence(CHANNEL_TAG); + + ASSERT_EQ(consumer.getCoordinationDepth(), i + 1); + ASSERT_EQ(consumer.getPayloadDepth(), (i + 1) * sizeof(ELEMENT_TYPE)); + } + + // Check channel is full + ASSERT_TRUE(consumer.isFull(1)); + + // Wait for the producer 2 + coordinationCommunicationManager.fence(CHANNEL_TAG); + payloadCommunicationManager.fence(CHANNEL_TAG); + + // Pop token one by one + auto peekIndex = 0; + for (size_t i = CHANNEL_CAPACITY - 1; i > 0; --i) + { + // Check depths are decreasing + ASSERT_EQ(consumer.getCoordinationDepth(), i); + ASSERT_EQ(consumer.getPayloadDepth(), i * sizeof(ELEMENT_TYPE)); + + // Peek and check data are correct + auto res = consumer.peek(); + ASSERT_EQ(res[0], peekIndex * sizeof(ELEMENT_TYPE)); + ASSERT_EQ(res[1], sizeof(ELEMENT_TYPE)); + + auto tokenBuffer = (uint8_t *)consumer.getPayloadBufferMemorySlot()->getSourceLocalMemorySlot()->getPointer(); + void *tokenPtr = &tokenBuffer[res[0]]; + auto token = static_cast(tokenPtr)[0]; + // In this specific test, the value pushed coincides with the index in the channel + ASSERT_EQ(peekIndex, token); + + // Pop + consumer.pop(); + coordinationCommunicationManager.fence(CHANNEL_TAG); + payloadCommunicationManager.fence(CHANNEL_TAG); + + ASSERT_EQ(consumer.getCoordinationDepth(), i - 1); + ASSERT_EQ(consumer.getPayloadDepth(), (i - 1) * sizeof(ELEMENT_TYPE)); + + // Update peek index + peekIndex++; + } + + // Wait for the producer 3 + coordinationCommunicationManager.fence(CHANNEL_TAG); + payloadCommunicationManager.fence(CHANNEL_TAG); +} + +TEST_F(ChannelFixture, FillCoordinationBuffer) +{ + // Rank 0 is producer, Rank 1 is consumer + if (instanceManager->getCurrentInstance()->isRootInstance()) { producerFc(*this); } + else { consumerFc(*this); } +} diff --git a/tests/frontends/channel/variableSize/spsc/source/meson.build b/tests/frontends/channel/variableSize/spsc/source/meson.build index c91ae60d..e4166982 100644 --- a/tests/frontends/channel/variableSize/spsc/source/meson.build +++ b/tests/frontends/channel/variableSize/spsc/source/meson.build @@ -1,21 +1,30 @@ testSuite = ['tests', 'channels', 'variableSize', 'spsc', 'distributed'] test_timeout = 60 -if 'mpi' in enabledBackends and 'hwloc' in enabledBackends - mpi = executable( - 'fillBuffers', - ['fillBuffers.cpp'], - dependencies: channelsTestMPIDep, - include_directories: [exampleBuildIncludes], - ) +sourceList = [ + 'fillBufferOneToken', + 'fillBufferMultipleTokens', + 'fillCoordination', + 'excessBuffer', +] - if get_option('buildTests') - test( - 'fillBuffers', - mpirunExecutable, - args: ['-np', '2', '--oversubscribe', mpi.full_path()], - timeout: test_timeout, - suite: testSuite, +foreach source : sourceList + if 'mpi' in enabledBackends and 'hwloc' in enabledBackends + mpi = executable( + source, + [source + '.cpp'], + dependencies: channelsTestMPIDep, + include_directories: [exampleBuildIncludes], ) + + if get_option('buildTests') + test( + source, + mpirunExecutable, + args: ['-np', '2', '--oversubscribe', mpi.full_path()], + timeout: test_timeout, + suite: testSuite, + ) + endif endif -endif \ No newline at end of file +endforeach \ No newline at end of file diff --git a/tests/frontends/channel/variableSize/spsc/source/producer.hpp b/tests/frontends/channel/variableSize/spsc/source/producer.hpp deleted file mode 100644 index edd70172..00000000 --- a/tests/frontends/channel/variableSize/spsc/source/producer.hpp +++ /dev/null @@ -1,100 +0,0 @@ -/* - * Copyright 2025 Huawei Technologies Co., Ltd. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#pragma once - -#include -#include -#include -#include - -#include "common.hpp" - -void producerFc(HiCR::MemoryManager &payloadMemoryManager, - HiCR::CommunicationManager &coordinationCommunicationManager, - HiCR::CommunicationManager &payloadCommunicationManager, - std::shared_ptr payloadMemorySpace, - HiCR::channel::variableSize::SPSC::Producer &producer) -{ - ////////////////////// Test begin - - // Send a buffer big as the buffer channel - ELEMENT_TYPE sendBuffer[CHANNEL_CAPACITY] = {0, 1, 2, 3, 4}; - ELEMENT_TYPE sendBuffer2[1] = {0}; - auto sendBufferPtr = &sendBuffer; - auto sendBuffer2Ptr = &sendBuffer; - auto sendSlot = payloadMemoryManager.registerLocalMemorySlot(payloadMemorySpace, sendBufferPtr, sizeof(sendBuffer)); - auto sendSlot2 = payloadMemoryManager.registerLocalMemorySlot(payloadMemorySpace, sendBuffer2Ptr, sizeof(sendBuffer2)); - - // Wait for the consumer 1 - coordinationCommunicationManager.fence(CHANNEL_TAG); - payloadCommunicationManager.fence(CHANNEL_TAG); - - // Pushing first batch that fills the channel - ASSERT_EQ(producer.getPayloadCapacity(), CHANNEL_CAPACITY * sizeof(ELEMENT_TYPE)); - ASSERT_TRUE(producer.isEmpty()); - ASSERT_FALSE(producer.isFull(0)); - producer.updateDepth(); - ASSERT_EQ(producer.getCoordinationDepth(), 0); - ASSERT_EQ(producer.getPayloadDepth(), 0); - EXPECT_NO_THROW(producer.push(sendSlot)); - ASSERT_TRUE(producer.isFull(0)); - producer.updateDepth(); - ASSERT_EQ(producer.getCoordinationDepth(), 1); - ASSERT_EQ(producer.getPayloadDepth(), CHANNEL_CAPACITY * sizeof(ELEMENT_TYPE)); - EXPECT_THROW(producer.push(sendSlot2), HiCR::RuntimeException); - - // Wait for the consumer 2 - coordinationCommunicationManager.fence(CHANNEL_TAG); - payloadCommunicationManager.fence(CHANNEL_TAG); - - // Wait for the consumer 3 - coordinationCommunicationManager.fence(CHANNEL_TAG); - payloadCommunicationManager.fence(CHANNEL_TAG); - - // Send token one by one - for (size_t i = 0; i < CHANNEL_CAPACITY; ++i) - { - ELEMENT_TYPE sendBuffer[1] = {0}; - auto sendBufferPtr = &sendBuffer; - auto sendSlot = payloadMemoryManager.registerLocalMemorySlot(payloadMemorySpace, sendBufferPtr, sizeof(sendBuffer)); - ASSERT_EQ(producer.getCoordinationDepth(), i); - ASSERT_EQ(producer.getPayloadDepth(), i * sizeof(ELEMENT_TYPE)); - ASSERT_NO_THROW(producer.push(sendSlot)); - coordinationCommunicationManager.fence(CHANNEL_TAG); - payloadCommunicationManager.fence(CHANNEL_TAG); - ASSERT_EQ(producer.getCoordinationDepth(), i + 1); - ASSERT_EQ(producer.getPayloadDepth(), (i + 1) * sizeof(ELEMENT_TYPE)); - } - - ASSERT_TRUE(producer.isFull(0)); - - // Wait for the consumer 4 - coordinationCommunicationManager.fence(CHANNEL_TAG); - payloadCommunicationManager.fence(CHANNEL_TAG); - - for (size_t i = CHANNEL_CAPACITY; i > 0; --i) - { - coordinationCommunicationManager.fence(CHANNEL_TAG); - payloadCommunicationManager.fence(CHANNEL_TAG); - ASSERT_EQ(producer.getCoordinationDepth(), i - 1); - ASSERT_EQ(producer.getPayloadDepth(), (i - 1) * sizeof(ELEMENT_TYPE)); - } - - // Wait for the consumer 5 - coordinationCommunicationManager.fence(CHANNEL_TAG); - payloadCommunicationManager.fence(CHANNEL_TAG); -} From 50a4d4ea0fb0f94952d66a93665ee4f329b720c5 Mon Sep 17 00:00:00 2001 From: Luca Terracciano Date: Wed, 8 Oct 2025 15:25:50 +0200 Subject: [PATCH 5/5] style: format files --- .../hicr/frontends/channel/variableSize/spsc/consumer.hpp | 4 ++-- .../hicr/frontends/channel/variableSize/spsc/producer.hpp | 6 +++--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/include/hicr/frontends/channel/variableSize/spsc/consumer.hpp b/include/hicr/frontends/channel/variableSize/spsc/consumer.hpp index 241bcb09..457e7948 100644 --- a/include/hicr/frontends/channel/variableSize/spsc/consumer.hpp +++ b/include/hicr/frontends/channel/variableSize/spsc/consumer.hpp @@ -50,7 +50,7 @@ class Consumer final : public variableSize::Base * \param[in] coordinationCommunicationManager The backend's memory manager to facilitate communication between the producer and consumer coordination buffers * \param[in] payloadCommunicationManager The backend's memory manager to facilitate communication between the producer and consumer payload buffers * \param[in] payloadBuffer The memory slot pertaining to the payload buffer. The producer will push messages into this - * buffer, while there is enough space. This buffer should be large enough to hold twice the capacity specified by \ref payloadCapacity argument. + * buffer, while there is enough space. This buffer should be large enough to hold twice the capacity specified by payloadCapacity argument. * Half of the buffer is used as excess buffer to avoid internal fragmentation of messages * \param[in] tokenBuffer The memory slot pertaining to the token buffer. This buffer is only used to exchange internal metadata * about the sizes of the individual messages being sent. @@ -282,7 +282,7 @@ class Consumer final : public variableSize::Base * This funciton can be used to quickly check whether the channel is becoming full when trying * to push an element of a given size. First thing, we are checking if we can still * push tokens (i.e., if the coordination buffer has space). Second thing, we are checking the - * payload buffer. If the current depth of the payload and the \ref requiredBufferSize to push + * payload buffer. If the current depth of the payload and the requiredBufferSize to push * exceed the channel capacity, the channel is considered full. * * \param[in] requiredBufferSize size of the token to push into the channel diff --git a/include/hicr/frontends/channel/variableSize/spsc/producer.hpp b/include/hicr/frontends/channel/variableSize/spsc/producer.hpp index 126dfac3..ea07817f 100644 --- a/include/hicr/frontends/channel/variableSize/spsc/producer.hpp +++ b/include/hicr/frontends/channel/variableSize/spsc/producer.hpp @@ -48,7 +48,7 @@ class Producer : public variableSize::Base * \param[in] payloadCommunicationManager The backend's memory manager to facilitate communication between the producer and consumer payload buffers * \param[in] sizeInfoBuffer The local memory slot used to hold the information about the next message size * \param[in] payloadBuffer The global memory slot pertaining to the payload of all messages. The producer will push messages into this - * buffer, while there is enough space. This buffer should be large enough to hold twice the capacity specified by \ref payloadCapacity argument. + * buffer, while there is enough space. This buffer should be large enough to hold twice the capacity specified by payloadCapacity argument. * Half of the buffer is used as excess buffer to avoid internal fragmentation of messages * \param[in] tokenBuffer The memory slot pertaining to the token buffer, which is used to hold message size data. * The producer will push message sizes into this buffer, while there is enough space. This buffer should be large enough to @@ -235,7 +235,7 @@ class Producer : public variableSize::Base * @return payload buffer capacity (in bytes) */ __INLINE__ size_t getPayloadCapacity() { return getCircularBufferForPayloads()->getCapacity(); } - + /** * Get depth of the coordination buffer of variable-size producer. * Because the current implementation first receives the payload (phase 1) before @@ -264,7 +264,7 @@ class Producer : public variableSize::Base * This funciton can be used to quickly check whether the channel is becoming full when trying * to push an element of a given size. First thing, we are checking if we can still * push tokens (i.e., if the coordination buffer has space). Second thing, we are checking the - * payload buffer. If the current depth of the payload and the \ref requiredBufferSize to push + * payload buffer. If the current depth of the payload and the requiredBufferSize to push * exceed the channel capacity, the channel is considered full. * * \param[in] requiredBufferSize size of the token to push into the channel