@@ -15,6 +15,7 @@ using namespace dmq;
1515// ----------------------------------------------------------------------------
1616Thread::Thread (const std::string& threadName, size_t maxQueueSize)
1717 : THREAD_NAME(threadName)
18+ , m_exit(false )
1819{
1920 // If 0 is passed, use the default size
2021 m_queueSize = (maxQueueSize == 0 ) ? DEFAULT_QUEUE_SIZE : maxQueueSize;
@@ -93,12 +94,14 @@ void Thread::ExitThread()
9394{
9495 if (m_msgq != NULL )
9596 {
97+ m_exit.store (true );
98+
9699 // Send exit message
97100 ThreadMsg* msg = new (std::nothrow) ThreadMsg (MSG_EXIT_THREAD);
98101 if (msg)
99102 {
100- // Send pointer, wait 100 ticks max
101- if (osMessageQueuePut (m_msgq, &msg, 0 , 100 ) != osOK)
103+ // Send pointer, wait forever to ensure it gets in.
104+ if (osMessageQueuePut (m_msgq, &msg, 0 , osWaitForever ) != osOK)
102105 {
103106 delete msg; // Failed to send
104107 }
@@ -112,12 +115,7 @@ void Thread::ExitThread()
112115 }
113116
114117 // Thread has finished Run(). Now we can safely clean up resources.
115-
116- // Terminate ensures the thread is in INACTIVE state and resources are reclaimed.
117- if (m_thread) {
118- osThreadTerminate (m_thread);
119- m_thread = NULL ;
120- }
118+ m_thread = NULL ;
121119
122120 if (m_msgq) {
123121 osMessageQueueDelete (m_msgq);
@@ -150,6 +148,17 @@ bool Thread::IsCurrentThread()
150148 return GetThreadId () == GetCurrentThreadId ();
151149}
152150
151+ // ----------------------------------------------------------------------------
152+ // GetQueueSize
153+ // ----------------------------------------------------------------------------
154+ size_t Thread::GetQueueSize ()
155+ {
156+ if (m_msgq != NULL ) {
157+ return (size_t )osMessageQueueGetCount (m_msgq);
158+ }
159+ return 0 ;
160+ }
161+
153162// ----------------------------------------------------------------------------
154163// DispatchDelegate
155164// ----------------------------------------------------------------------------
@@ -192,17 +201,16 @@ void Thread::Run()
192201{
193202 ThreadMsg* msg = nullptr ;
194203
195- while (true )
204+ while (!m_exit. load () )
196205 {
197206 // Block forever waiting for a message
198207 // msg is a pointer to ThreadMsg*. The queue holds the pointer.
199208 if (osMessageQueueGet (m_msgq, &msg, NULL , osWaitForever) == osOK)
200209 {
201210 if (!msg) continue ;
202211
203- switch (msg->GetId ())
204- {
205- case MSG_DISPATCH_DELEGATE:
212+ int msgId = msg->GetId ();
213+ if (msgId == MSG_DISPATCH_DELEGATE)
206214 {
207215 auto delegateMsg = msg->GetData ();
208216 if (delegateMsg) {
@@ -211,24 +219,18 @@ void Thread::Run()
211219 invoker->Invoke (delegateMsg);
212220 }
213221 }
214- break ;
215- }
216-
217- case MSG_EXIT_THREAD:
218- {
219- delete msg;
220- // Signal ExitThread() that we are done
221- if (m_exitSem) {
222- osSemaphoreRelease (m_exitSem);
223- }
224- return ;
225222 }
223+
224+ delete msg;
226225
227- default :
226+ if (msgId == MSG_EXIT_THREAD) {
228227 break ;
229228 }
230-
231- delete msg;
232229 }
233230 }
231+
232+ // Signal ExitThread() that we are done
233+ if (m_exitSem) {
234+ osSemaphoreRelease (m_exitSem);
235+ }
234236}
0 commit comments