-
Notifications
You must be signed in to change notification settings - Fork 42
Updated and merged WaitSet approach into polled approach. Updated rt… #38
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -11,6 +11,7 @@ damages arising out of the use or inability to use the software. | |
| #include "../CommonInfrastructure/DDSTypeWrapper.h" | ||
| #include "../CommonInfrastructure/OSAPI.h" | ||
|
|
||
| #define VTE_USE_WAITSET // comment-out to use polling for received track data | ||
|
|
||
| using namespace DDS; | ||
| using namespace com::rti::atc::generated; | ||
|
|
@@ -420,8 +421,8 @@ TrackReader::~TrackReader() | |
| } | ||
|
|
||
| // ---------------------------------------------------------------------------- | ||
| // This example is using an application thread to be notified when tracks | ||
| // arrive. | ||
| // This example is using an application thread to either poll or be notified | ||
| // when tracks arrive, depending on VTE_USE_WAITSET defined or not. | ||
| // | ||
| // In this example, we leave the data from the middleware's queue by calling | ||
| // read(). We do this to illustrate a case where an object that represents | ||
|
|
@@ -431,7 +432,7 @@ TrackReader::~TrackReader() | |
| // queue does not grow forever. | ||
| // | ||
| // There are three options for getting data from RTI Connext DDS: | ||
| // 1. Being notified in the application's thread of data arriving (as here). | ||
| // 1. Being notified in the application's thread of data arriving (WaitSet). | ||
| // This mechanism has slightly higher latency than option #2, but low | ||
| // latency is not important for this use case. In addition, this is safer | ||
| // than using option #2, because you do not have to worry about the effect | ||
|
|
@@ -442,25 +443,37 @@ TrackReader::~TrackReader() | |
| // 2. Being notified in a listener callback of data arriving. | ||
| // This has lower latency than using a WaitSet, but is more dangerous | ||
| // because you have to worry about not blocking the middleware's thread. | ||
| // 3. Polling for data. | ||
| // (Not shown in this example). | ||
| // 3. Polling for data. | ||
| // You can call read() or take() at any time to view or remove the data that | ||
| // is currently in the queue. | ||
| // A simple example of this can be found at: | ||
| // http://community.rti.com/examples/polling-read | ||
|
|
||
| void TrackReader::WaitForTracks( | ||
| // ---------------------------------------------------------------------------- | ||
| // When used in the polling configuration (VTE_USE_WAITSET not defined), this | ||
| // example uses an application thread to poll for all the existing | ||
| // track data inside the middleware's queue. | ||
| // | ||
| // This goes through two steps: | ||
| // 1) read() all ALIVE data from the DataReader. These are the updates of all | ||
| // the flights that have not landed. By calling read() we leave the data in | ||
| // the queue, and can access it again later if it is not updated. After | ||
| // reading the data, we return the loan to the DataReader() | ||
| // 2) take() all the NOT_ALIVE data from the DataReader. These are the updates | ||
| // of flights that have landed. This removes the deletion notices from the | ||
| // queue. After taking the data, we return the loan to the DataReader() | ||
| // | ||
| void TrackReader::GetCurrentTracks( | ||
| std::vector< DdsAutoType<Track> > *tracksUpdated, | ||
| std::vector< DdsAutoType<Track> > *tracksDeleted) | ||
| std::vector< DdsAutoType<Track> > *tracksDeleted) | ||
| { | ||
|
|
||
| _mutex->Lock(); | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why do you need a mutex to wait for data to become available? I wonder if that mutex is really neede. @rosemwahlin, do you know if we really need that mutex? Also, if I were to implement this lock, I would apply RAII and create a lock that locks in the constructor and unlocks in the destructor, so that you don't have to release the lock every time you throw an exception. |
||
| DDS_ReturnCode_t retcode; | ||
| #ifdef VTE_USE_WAITSET // define to use a WaitSet instead of polling to receive track data | ||
| ConditionSeq activeConditions; | ||
| DDS_Duration_t timeout = {0,300000000}; | ||
|
|
||
| _mutex->Lock(); | ||
|
|
||
| // Block this thread until track data becomes available. | ||
| DDS_ReturnCode_t retcode = _waitSet->wait(activeConditions, timeout); | ||
|
|
||
| retcode = _waitSet->wait(activeConditions, timeout); | ||
| // May be normal to time out | ||
| if (retcode == DDS_RETCODE_TIMEOUT) | ||
| { | ||
|
|
@@ -474,6 +487,7 @@ void TrackReader::WaitForTracks( | |
| _mutex->Unlock(); | ||
| throw errss.str(); | ||
| } | ||
| #endif // VTE_USE_WAITSET | ||
|
|
||
| // Note: These two sequences are being created with a length = 0. | ||
| // this means that the middleware is loaning memory to them, which | ||
|
|
@@ -482,96 +496,10 @@ void TrackReader::WaitForTracks( | |
| TrackSeq trackSeq; | ||
| SampleInfoSeq sampleInfos; | ||
|
|
||
| // Call read in a loop until there is no data left to read. Note that | ||
| // retcode must be okay, or an exception would have been thrown above | ||
| while (retcode != DDS_RETCODE_NO_DATA) | ||
| { | ||
| // This leaves the data in the DataReader's queue. Alternately, can | ||
| // call take() which will remove it from the queue. Leaving data in | ||
| // the makes sense in this application for two reasons: | ||
| // 1) the QoS allows the overwriting of data in the queue | ||
| // 2) the application wants to always see the latest update of each | ||
| // instance | ||
| retcode = _reader->read(trackSeq, sampleInfos); | ||
|
|
||
| if (retcode != DDS_RETCODE_NO_DATA && | ||
| retcode != DDS_RETCODE_OK) | ||
| { | ||
| std::stringstream errss; | ||
| errss << "WaitForTracks(): error when retrieving flight plans."; | ||
| _mutex->Unlock(); | ||
| throw errss.str(); | ||
| } | ||
|
|
||
| // Note, based on the QoS profile (history = keep last, depth = 1) and | ||
| // the fact that we modeled flights as separate instances, we can | ||
| // assume there is only one entry per flight. So if a flight plan for | ||
| // a particular flight has been changed 10 times, we will only be | ||
| // maintaining the most recent update to that flight plan in the | ||
| // middleware queue. | ||
| for (int i = 0; i < trackSeq.length(); i++) | ||
| { | ||
| if (sampleInfos[i].valid_data) | ||
| { | ||
| SampleInfo info = sampleInfos[i]; | ||
|
|
||
| // Making copies of this type for clean API because we do not | ||
| // need lowest latency for flight plan data | ||
| DdsAutoType<Track> trackType = trackSeq[i]; | ||
| tracksUpdated->push_back(trackType); | ||
| } | ||
| else if (!sampleInfos[i].valid_data) | ||
| { | ||
| if (sampleInfos[i].instance_state != ALIVE_INSTANCE_STATE) | ||
| { | ||
| DdsAutoType<Track> trackType = trackSeq[i]; | ||
| trackType.trackId = | ||
| _reader->get_key_value(trackType, | ||
| sampleInfos[i].instance_handle); | ||
| tracksDeleted->push_back(trackType); | ||
| } | ||
| } | ||
|
|
||
| } | ||
|
|
||
| // The original track sequence was loaned from the middleware to the | ||
| // application. We have copied the data out of it, so we can now | ||
| // return the loan to the middleware. | ||
| _reader->return_loan(trackSeq, sampleInfos); | ||
| } | ||
|
|
||
| _mutex->Unlock(); | ||
|
|
||
| } | ||
|
|
||
| // ---------------------------------------------------------------------------- | ||
| // This example is using an application thread to poll for all the existing | ||
| // track data inside the middleware's queue. | ||
| // | ||
| // This goes through two steps: | ||
| // 1) read() all ALIVE data from the DataReader. These are the updates of all | ||
| // the flights that have not landed. By calling read() we leave the data in | ||
| // the queue, and can access it again later if it is not updated. After | ||
| // reading the data, we return the loan to the DataReader() | ||
| // 2) take() all the NOT_ALIVE data from the DataReader. These are the updates | ||
| // of flights that have landed. This removes the deletion notices from the | ||
| // queue. After taking the data, we return the loan to the DataReader() | ||
| // | ||
| void TrackReader::GetCurrentTracks( | ||
| std::vector< DdsAutoType<Track> > *tracksUpdated, | ||
| std::vector< DdsAutoType<Track> > *tracksDeleted) | ||
| { | ||
| _mutex->Lock(); | ||
|
|
||
| // The sequences that will be filled in with the data. These are empty, | ||
| // so the middleware will loan the data to the sequences. | ||
| TrackSeq trackSeq; | ||
| SampleInfoSeq sampleInfos; | ||
|
|
||
| // This reads all ALIVE track data from the queue, and loans it to the | ||
| // application in the trackSeq sequence. See below where you must return | ||
| // the loan. | ||
| DDS_ReturnCode_t retcode = _reader->read( | ||
| retcode = _reader->read( | ||
| trackSeq, sampleInfos, | ||
| DDS_LENGTH_UNLIMITED, DDS_ANY_SAMPLE_STATE, | ||
| DDS_ANY_VIEW_STATE, DDS_ALIVE_INSTANCE_STATE); | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Instead of commenting it out there. I would add this option to the makefiles and Visual Studio solutions that build the product. That is, you would compile with
-DVTE_USE_WAITSETif I wanted to have it available and I would skip it otherwise.That decouples the code from the required compilation.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Anyway, do you think it's interesting to implement the polling case? Maybe we should just use the waitset always, in that case this define wouldn't be needed.