Skip to content

Commit ed38e64

Browse files
Add Scheduler class to manage retransmission timers and track ACKs
1 parent 1821abd commit ed38e64

File tree

2 files changed

+166
-0
lines changed

2 files changed

+166
-0
lines changed

communication/include/scheduler.h

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
#ifndef __SCHEDULER_H__
2+
#define __SCHEDULER_H__
3+
4+
#include <chrono>
5+
#include <condition_variable>
6+
#include <functional>
7+
#include <future>
8+
#include <mutex>
9+
#include <unordered_map>
10+
#include <vector>
11+
#include "global_clock.h"
12+
13+
// must be set
14+
#define MAX_ACK_TIMEOUT \
15+
(20 * TICK_DURATION) ///< Maximum time to wait for ACK
16+
17+
/**
18+
* @class Scheduler
19+
* @brief Manages retransmission timers and tracks acknowledgments for packets.
20+
*
21+
* This class provides functionality to manage retransmission timers for
22+
* packets, handle acknowledgments, and clean up packet data once
23+
* retransmissions are complete.
24+
*/
25+
class Scheduler
26+
{
27+
public:
28+
using Callback = std::function<void(int)>;
29+
30+
Scheduler();
31+
~Scheduler();
32+
33+
/**
34+
* @brief Stops all active timers and waits for their completion.
35+
*
36+
* This method ensures that all active timers (threads) complete their
37+
* execution before the program exits.
38+
*/
39+
void stopAllTimers();
40+
41+
/**
42+
* @brief Starts a retransmission timer for a given packet ID.
43+
*
44+
* The function initiates a timer to wait for an ACK (Acknowledgment) for the specified packet.
45+
* If the ACK is received within the MAX_ACK_TIMEOUT, it clears the packet data and sets the result
46+
* of the ackPromise to `true`, indicating success. If the timeout occurs and no ACK is received,
47+
* it triggers the provided callback function to retransmit the packet and increments the retry count.
48+
*
49+
* @param packetID The unique ID of the packet being transmitted.
50+
* @param callback The callback function to call when retransmitting the packet after a timeout.
51+
* @param ackPromise A shared promise that communicates whether the packet transmission was successful
52+
* (i.e., ACK was received).
53+
*/
54+
void startRetransmissionTimer(int packetID, Callback callback, std::shared_ptr<std::promise<bool>> ackPromise);
55+
56+
/**
57+
* @brief Receives an acknowledgment for a packet.
58+
*
59+
* @param packetID The ID of the packet that has been acknowledged.
60+
*/
61+
void receiveACK(int packetID);
62+
63+
/**
64+
* @brief Clears the data associated with a packet.
65+
*
66+
* @param packetID The ID of the packet whose data is to be cleared.
67+
*/
68+
void clearPacketData(int packetID);
69+
70+
private:
71+
std::unordered_map<int, bool>
72+
ackReceived; ///< Map to track received acknowledgments
73+
std::unordered_map<int, int>
74+
retryCounts; ///< Map to track retry counts for packets
75+
std::mutex mutex; ///< Mutex for thread safety
76+
std::condition_variable cv; ///< Condition variable for synchronization
77+
std::vector<std::future<void>>
78+
futures; ///< Vector to store futures of active threads
79+
};
80+
81+
#endif // __SCHEDULER_H__

communication/src/scheduler.cpp

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
#include "scheduler.h"
2+
3+
Scheduler::Scheduler()
4+
{
5+
}
6+
7+
Scheduler::~Scheduler()
8+
{
9+
stopAllTimers();
10+
}
11+
12+
void Scheduler::stopAllTimers()
13+
{
14+
for (auto &future : futures)
15+
{
16+
if (future.valid())
17+
{
18+
future.wait(); // Wait for all threads to finish
19+
}
20+
}
21+
}
22+
23+
void Scheduler::startRetransmissionTimer(int packetID, Callback callback, std::shared_ptr<std::promise<bool>> ackPromise)
24+
{
25+
// Promise to manage the lifecycle of the thread itself
26+
std::promise<void> threadCompletionPromise;
27+
28+
// Future object to track the thread's completion
29+
std::future<void> future = threadCompletionPromise.get_future();
30+
31+
// Store the future to ensure we can wait for the thread to finish later
32+
futures.push_back(std::move(future));
33+
34+
// Start a new thread for handling retransmission and ACK wait
35+
std::thread([this, packetID, callback, threadCompletionPromise = std::move(threadCompletionPromise), ackPromise]() mutable
36+
{
37+
int retryCount = 0;
38+
39+
{
40+
// Lock the mutex to synchronize access to shared data (ackReceived)
41+
std::unique_lock<std::mutex> lock(mutex);
42+
43+
// Wait for an ACK or timeout
44+
if (cv.wait_for(lock, MAX_ACK_TIMEOUT, [this, packetID]()
45+
{ return ackReceived[packetID]; }))
46+
{
47+
// ACK received within the timeout period
48+
std::cout << "ACK received for packet ID: " << packetID << std::endl;
49+
clearPacketData(packetID); // Clear packet data
50+
51+
// Set both promises to indicate success and thread completion
52+
threadCompletionPromise.set_value();
53+
ackPromise->set_value(true); // ACK was received, set to true
54+
return; // Exit the thread
55+
}
56+
else
57+
{
58+
// Timeout occurred, retransmit the packet
59+
retryCounts[packetID]++;
60+
retryCount = retryCounts[packetID];
61+
std::cout << "Timeout! Retransmitting packet ID: " << packetID << std::endl;
62+
}
63+
}
64+
65+
// Call the callback function with the updated retry count
66+
callback(retryCount);
67+
68+
// Set the promise to indicate the thread has finished
69+
threadCompletionPromise.set_value();
70+
})
71+
.detach(); // Detach the thread to allow it to run independently
72+
}
73+
74+
void Scheduler::receiveACK(int packetID)
75+
{
76+
std::unique_lock<std::mutex> lock(mutex);
77+
ackReceived[packetID] = true;
78+
cv.notify_all(); // Notify all waiting threads
79+
}
80+
81+
void Scheduler::clearPacketData(int packetID)
82+
{
83+
ackReceived.erase(packetID);
84+
retryCounts.erase(packetID);
85+
}

0 commit comments

Comments
 (0)