@@ -243,9 +243,119 @@ allDrained()
243243 }
244244 return true ;
245245}
246-
246+
247+ int existsMatchingRequests (MPI_Status status, MPI_Comm comm) {
248+ int isMatchingRequest = 0 ;
249+ MPI_Request matched_request;
250+ std::map<MPI_Request, mpi_nonblocking_call_t *>::iterator it;
251+ // Check if there are pending MPI_Irecv's that matches the envelope of the
252+ // probed message.
253+ for (it = g_nonblocking_calls.begin (); it != g_nonblocking_calls.end (); it++) {
254+ MPI_Request req = it->first ;
255+ mpi_nonblocking_call_t *call = it->second ;
256+ if (call->type == IRECV_REQUEST &&
257+ call->comm == comm &&
258+ (call->tag == status.MPI_TAG || call->tag == MPI_ANY_TAG) &&
259+ (call->remote_node == status.MPI_SOURCE ||
260+ call->remote_node == MPI_ANY_SOURCE)) {
261+ matched_request = req;
262+ isMatchingRequest = 1 ;
263+ break ;
264+ }
265+ }
266+ return isMatchingRequest;
267+ }
268+
269+ int
270+ drainRemainingP2pMsgsONLY ()
271+ {
272+ int bytesReceived = 0 ;
273+ std::unordered_set<MPI_Comm>::iterator comm;
274+ for (comm = active_comms.begin (); comm != active_comms.end (); comm++) {
275+ // If the communicator is MPI_COMM_NULL, skip it.
276+ // MPI_COMM_NULL can be returned from functions like MPI_Comm_split
277+ // if the color is specified on only one side of the inter-communicator, or
278+ // specified as MPI_UNDEFINED by the program. In this case, the MPI function
279+ // still returns MPI_SUCCESS, so the MPI_COMM_NULL can be added to the
280+ // active communicator set `active_comms'.
281+ if (*comm == MPI_COMM_NULL) {
282+ continue ;
283+ }
284+ int flag = 1 ;
285+ while (flag) {
286+ MPI_Status status;
287+ int retval = MPI_Iprobe (MPI_ANY_SOURCE, MPI_ANY_TAG, *comm, &flag, &status);
288+ JASSERT (retval == MPI_SUCCESS);
289+ if (flag) {
290+ JASSERT (! existsMatchingRequests (status, *comm));
291+ bytesReceived += recvMsgIntoInternalBuffer (status, *comm);
292+ }
293+ }
294+ }
295+ return bytesReceived;
296+ }
297+
247298void
248299drainSendRecv ()
300+ {
301+ int i;
302+ int numReceived = -1 ;
303+ int totalRecvBytes = 0 ;
304+ int lastTotalRecvBytes = 0 ;
305+ int bytesReceived = 0 ;
306+ // QUESTION: Are the requests saved as a FIFO queue (so that
307+ // non-overtaking means that we see the oldest request first
308+ // for a source/tag?
309+ // STAGE 1: Test requests, until all requests that can be completed
310+ // have been completed.
311+ int done = 0 ;
312+ while (bytesReceived > 0 || !done) {
313+ while (bytesReceived > 0 ) {
314+ done = 0 ;
315+ // If pending MPI_Irecv or MPI_Isend, use MPI_Test to try to complete it.
316+ bytesReceived += completePendingP2pRequests ();
317+ }
318+ // Sleep for 5 seconds to allow more remote send requests to be completed,
319+ // and sent to us. Eventually, all send requests for which we
320+ // have a pending recv request should be sent, if we continue
321+ // to test all send and recv requests.
322+ //
323+ // Now try twice more. This gives the remote process time to
324+ // complete a pending send, if needed.
325+ done = 0 ;
326+ bytesReceived = 0 ;
327+ sleep (5 );
328+ // If pending MPI_Irecv or MPI_Isend, use MPI_Test to try to complete it.
329+ bytesReceived += completePendingP2pRequests ();
330+ sleep (5 );
331+ bytesReceived += completePendingP2pRequests ();
332+ if (bytesReceived == 0 ) {
333+ done = 1 ; // Okay. We waited for slow senders. Now we're done.
334+ }
335+ }
336+
337+ // This is where we can add tests of correctness.
338+ // An example is: sleep(5*60); and see if bit-for-bit bug goes away.
339+ // If 'sleep(5*60)' says that there was a bug earlier in the code,
340+ // now we can debug the implicit assertion that there should be
341+ // no matching remote send request with a local recv request.
342+ // As an example of this, we can do:
343+ // sleep(5*60); Re-execute STAGE 1 to see what new requests were satisfied.
344+ // Another example is to use MPI_Alltoall or MPI_Allreduce to try
345+ // to discover pending matches of send and recv request that
346+ // were not caught earlier.
347+
348+ // STAGE 2: Assume that all tests that could be completed have been
349+ // completed. So, any remaining messages must be drained
350+ // to the MANA internal buffer.
351+ // There are now no matching remote send requests with local recv requests.
352+ // So we can drain all remaining messages into internal buffers.
353+ // If MPI_Irecv not posted, but msg was sent, use MPI_Iprobe to drain msg
354+ drainRemainingP2pMsgsONLY ();
355+ }
356+
357+ void
358+ drainSendRecvOLD ()
249359{
250360 int numReceived = -1 ;
251361 while (numReceived != 0 ) {
0 commit comments