ROSS
network-mpi.c
Go to the documentation of this file.
1#include <ross.h>
2#include <mpi.h>
3
/* Communicator used by every MPI call in this file. It defaults to
 * MPI_COMM_WORLD and is replaced by tw_comm_set().
 * NOTE(review): original line 5 is missing from this listing; the symbol
 * index lists `int custom_communicator` at network-mpi.c:5. */
4MPI_Comm MPI_COMM_ROSS = MPI_COMM_WORLD;
6
7/**
8 * @struct act_q
9 * @brief Keeps track of posted send or recv operations.
10 *
11 * This list structure is used *only* by the network mpi layer (this
12 * file). Within this file, two lists are used, for MPI Irecv and
13 * Isend requests. The MPI requests and statusus are linked with an
14 * event buffer through this struct.
15 */
16struct act_q
17{
18 const char *name; /**< name of the list, used in error printouts */
19
20 tw_event **event_list; /**< list of event pointers in this queue */
21 MPI_Request *req_list; /**< list of MPI request handles */
22 int *idx_list; /**< indices in this queue of finished operations */
23 MPI_Status *status_list; /**< list of MPI_Status handles */
24 unsigned int cur; /**< index of first open spot in the queue */
25};
26
/* MPI tag attached to every event message sent or received by this layer. */
27#define EVENT_TAG 1
28
/* Number of bytes transferred per event message. Every event uses the same
 * size, g_tw_event_msg_sz; the macro argument is ignored. */
29#define EVENT_SIZE(e) g_tw_event_msg_sz
30
/* Bookkeeping for in-flight MPI_Isend (posted_sends) and MPI_Irecv
 * (posted_recvs) operations; allocated with init_q() in tw_net_start(). */
31static struct act_q posted_sends;
32static struct act_q posted_recvs;
/* NOTE(review): original line 33 is missing from this listing. The symbol
 * index lists `static tw_eventq outq` (network-mpi.c:33), presumably the
 * queue of events waiting to be handed to MPI_Isend. */
34
35static unsigned int read_buffer = 16; /**< Number of Irecv's to buffer, length of posted_recvs queue */
36static unsigned int send_buffer = 1024; /**< Number of Isend's to buffer, length of posted_sends queue */
/* Number of ranks in MPI_COMM_ROSS. It starts at 1, is overwritten by
 * MPI_Comm_size in tw_net_start(), and is returned by tw_nnodes(). */
37static int world_size = 1;
38
/* Command-line options of this network layer; tw_net_init() returns this table.
 * NOTE(review): original lines 41, 43, 45 and 47 are missing from this
 * listing. They presumably hold the TWOPT_UINT(...) openings and the
 * variables (read_buffer, send_buffer) these two options set. Confirm
 * against the real source. */
39static const tw_optdef mpi_opts[] = {
40 TWOPT_GROUP("ROSS MPI Kernel"),
42 "read-buffer",
44 "network read buffer size in # of events"),
46 "send-buffer",
48 "network send buffer size in # of events"),
49 TWOPT_END()
50};
51
52// Forward declarations of functions used in MPI network message processing
53static int recv_begin(tw_pe *me);
54static void recv_finish(tw_pe *me, tw_event *e, char * buffer);
55static int send_begin(tw_pe *me);
56static void send_finish(tw_pe *me, tw_event *e, char * buffer);
57
58// Start of implementation of network processing routines/functions
/**
 * @brief Make ROSS use @p comm instead of MPI_COMM_WORLD for its MPI traffic.
 *
 * @param[in] comm communicator stored in MPI_COMM_ROSS and used by every
 *                 subsequent MPI call in this file
 *
 * NOTE(review): original line 62 is missing from this listing. It presumably
 * sets custom_communicator, which tw_net_stop() checks before calling
 * MPI_Finalize. Confirm against the real source.
 */
59void tw_comm_set(MPI_Comm comm)
60{
61 MPI_COMM_ROSS = comm;
63}
64
/**
 * @brief Initialize MPI if needed and record this process's rank.
 *
 * MPI_Init is called only when MPI_Initialized reports that MPI is not yet
 * initialized, so a program that already called MPI_Init itself is not
 * re-initialized. The rank within MPI_COMM_ROSS is stored in g_tw_mynode.
 *
 * @param[in,out] argc forwarded to MPI_Init
 * @param[in,out] argv forwarded to MPI_Init
 * @return the option table (mpi_opts) of this network layer
 *
 * Failures of MPI_Init or MPI_Comm_rank are reported through tw_error().
 * NOTE(review): original line 80 is missing from this listing.
 */
65const tw_optdef *
66tw_net_init(int *argc, char ***argv)
67{
68 int my_rank;
69 int initialized;
70 MPI_Initialized(&initialized);
71
72 if (!initialized) {
73 if (MPI_Init(argc, argv) != MPI_SUCCESS)
74 tw_error(TW_LOC, "MPI_Init failed.");
75 }
76
77 if (MPI_Comm_rank(MPI_COMM_ROSS, &my_rank) != MPI_SUCCESS)
78 tw_error(TW_LOC, "Cannot get MPI_Comm_rank(MPI_COMM_ROSS)");
79
81 g_tw_mynode = my_rank;
82
83 return mpi_opts;
84}
85
86/**
87 * @brief Initializes queues used for posted sends and receives
88 *
89 * @param[in] q pointer to the queue to be initialized
90 * @param[in] name name of the queue
91 */
92static void
93init_q(struct act_q *q, const char *name, unsigned int size)
94{
95 q->name = name;
96 q->event_list = (tw_event **) tw_calloc(TW_LOC, name, sizeof(tw_event *), size);
97 q->req_list = (MPI_Request *) tw_calloc(TW_LOC, name, sizeof(MPI_Request), size);
98 q->idx_list = (int *) tw_calloc(TW_LOC, name, sizeof(int), size);
99 q->status_list = (MPI_Status *) tw_calloc(TW_LOC, name, sizeof(MPI_Status), size);
100}
101
/**
 * @brief Number of ranks in MPI_COMM_ROSS.
 *
 * @return world_size, which is 1 until tw_net_start() stores the
 *         MPI_Comm_size result
 *
 * NOTE(review): original line 103 (the function name) is missing from this
 * listing; the symbol index gives `unsigned int tw_nnodes(void)`.
 */
102unsigned int
104{
105 return world_size;
106}
107
/**
 * @brief Start the network layer after command-line options are parsed.
 *
 * This function:
 * - queries the size of MPI_COMM_ROSS into world_size (rank 0 prints it);
 * - handles the single-rank case;
 * - calls tw_pe_init();
 * - validates the read-buffer/send-buffer options;
 * - allocates the posted send and receive queues.
 *
 * NOTE(review): this listing omits original lines 109, 123-126, 140 and 143.
 * - Line 109 is the signature; the symbol index gives
 *   `void tw_net_start(void)`.
 * - Lines 123-126 presumably set g_tw_synchronization_protocol to
 *   SEQUENTIAL (per the comment on line 122) and open the inner block closed
 *   on line 128.
 * - Line 143 presumably calls recv_begin() (per the comment on line 142).
 *
 * Confirm against the real source.
 */
108void
110{
111 // sets value of tw_nnodes
112 if (MPI_Comm_size(MPI_COMM_ROSS, &world_size) != MPI_SUCCESS)
113 tw_error(TW_LOC, "Cannot get MPI_Comm_size(MPI_COMM_ROSS)");
114
115 if( g_tw_mynode == 0)
116 {
117 printf("tw_net_start: Found world size to be %d \n", world_size );
118 }
119
120 // Check after tw_nnodes is defined
121 if(tw_nnodes() == 1) {
122 // force the setting of SEQUENTIAL protocol
127 fprintf(stderr, "Warning: Defaulting to Sequential Simulation, not enough PEs defined.\n");
128 }
129 }
130
131 tw_pe_init();
132
133 // these values are command line options
 // Both buffers are unsigned, so these checks only reject a size of 0.
134 if (send_buffer < 1) tw_error(TW_LOC, "network send buffer must be >= 1");
135 if (read_buffer < 1) tw_error(TW_LOC, "network read buffer must be >= 1");
136
137 init_q(&posted_sends, "MPI send queue", send_buffer);
138 init_q(&posted_recvs, "MPI recv queue", read_buffer);
139
141
142 // pre-post all the Irecv operations
144}
145
/**
 * @brief Abort all ranks in MPI_COMM_ROSS with error code 1.
 *
 * NOTE(review): original line 147 (the function name) is missing from this
 * listing; the symbol index gives `void tw_net_abort(void)`.
 */
146void
148{
149 MPI_Abort(MPI_COMM_ROSS, 1);
 // MPI_Abort is not expected to return; exit(1) is a fallback in case it does.
150 exit(1);
151}
152
/**
 * @brief Shut down the network layer at the end of the simulation.
 *
 * When built with USE_DAMARIS and Damaris is enabled, shutdown is delegated
 * to st_damaris_ross_finalize(). Otherwise MPI_Finalize is called only when
 * custom_communicator is zero. Presumably a non-zero value means the caller
 * supplied its own communicator via tw_comm_set() and is responsible for
 * finalizing MPI itself. A failing MPI_Finalize is reported via tw_error().
 *
 * NOTE(review): original line 154 (the function name) is missing from this
 * listing; the symbol index gives `void tw_net_stop(void)`.
 */
153void
155{
156#ifdef USE_DAMARIS
157 if (g_st_damaris_enabled)
158 st_damaris_ross_finalize();
159 else
160 {
161 if (!custom_communicator) {
162 if (MPI_Finalize() != MPI_SUCCESS)
163 tw_error(TW_LOC, "Failed to finalize MPI");
164 }
165 }
166#else
167 if (!custom_communicator) {
168 if (MPI_Finalize() != MPI_SUCCESS)
169 tw_error(TW_LOC, "Failed to finalize MPI");
170 }
171#endif
172}
173
/**
 * @brief Block until every rank in MPI_COMM_ROSS has reached this barrier.
 *
 * NOTE(review): original line 175 (the function name) is missing from this
 * listing; the symbol index gives `void tw_net_barrier(void)`.
 */
174void
176{
177 if (MPI_Barrier(MPI_COMM_ROSS) != MPI_SUCCESS)
178 tw_error(TW_LOC, "Failed to wait for MPI_Barrier");
179}
180
/*
 * tw_net_minimum: return the smallest recv_ts among the events this rank
 * still holds in the network layer. These are the events in outq and the
 * events whose MPI_Isend is still in flight (posted_sends).
 *
 * NOTE(review): original lines 181-182 (signature) and 184 are missing from
 * this listing. The symbol index gives `tw_stime tw_net_minimum(void)`.
 * Line 184 presumably declares `m`, likely initialized to TW_STIME_MAX so
 * that an empty network layer yields TW_STIME_MAX. Confirm against the
 * real source.
 */
183{
185 tw_event *e;
186 unsigned int i;
187
 // Events still waiting in outq (presumably not yet handed to MPI).
188 e = outq.head;
189 while (e) {
190 if (TW_STIME_CMP(m, e->recv_ts) > 0)
191 m = e->recv_ts;
192 e = e->next;
193 }
194
 // Events whose MPI_Isend has been posted but not yet completed.
195 for (i = 0; i < posted_sends.cur; i++) {
196 e = posted_sends.event_list[i];
197 if (TW_STIME_CMP(m, e->recv_ts) > 0)
198 m = e->recv_ts;
199 }
200
201 return m;
202}
203
204#ifdef USE_RAND_TIEBREAKER
/**
 * @brief Signature of the lowest-ordered event still held by the network layer.
 *
 * Scans the events in outq and in posted_sends, ordering them with
 * tw_event_sig_compare_ptr().
 *
 * @return pointer to the smallest signature found, or &g_tw_max_sig when
 *         both are empty. Otherwise the pointer refers into an event owned
 *         by the network layer; presumably it is only valid until that event
 *         is sent or freed.
 *
 * NOTE(review): original line 206 (the function name) is missing from this
 * listing; the symbol index gives
 * `tw_event_sig const * tw_net_minimum_sig_ptr(void)`.
 */
205tw_event_sig const *
207{
208 tw_event_sig const * m = &g_tw_max_sig;
209 tw_event *e;
210 unsigned int i;
211
 // Events still waiting in outq.
212 e = outq.head;
213 while (e) {
214 if (tw_event_sig_compare_ptr(m, &e->sig) > 0) {
215 m = &e->sig;
216 }
217 e = e->next;
218 }
219
 // Events whose MPI_Isend has been posted but not yet completed.
220 for (i = 0; i < posted_sends.cur; i++) {
221 e = posted_sends.event_list[i];
222 if (tw_event_sig_compare_ptr(m, &e->sig) > 0) {
223 m = &e->sig;
224 }
225 }
226
227 return m;
228}
229#endif
230
231/**
232 * @brief Calls MPI_Testsome on the provided queue, to check for finished operations.
 *
 * Each completed operation's event is passed to @p finish (with a NULL
 * buffer) and its slot is removed. The surviving entries of event_list and
 * req_list are then compacted so that slots [0, q->cur) stay contiguous.
233 *
234 * @param[in,out] q queue to check; completed entries are removed
235 * @param[in] me pointer to the PE
236 * @param[in] finish pointer to function that will perform the appropriate send/recv
237 * finish functionality
238 *
239 * @return 0 if MPI_Testsome did not return any finished operations, 1 otherwise.
 *
 * NOTE(review): original line 242 (the function name, `test_q(`) is missing
 * from this listing.
240 */
241static int
243 struct act_q *q,
244 tw_pe *me,
245 void (*finish)(tw_pe *, tw_event *, char *))
246{
247 int ready, i, n;
248
249 if (!q->cur)
250 return 0;
251
252 if (MPI_Testsome(
253 q->cur,
254 q->req_list,
255 &ready,
256 q->idx_list,
257 q->status_list) != MPI_SUCCESS) {
258 tw_error(
259 TW_LOC,
260 "MPI_testsome failed with %u items in %s",
261 q->cur,
262 q->name);
263 }
264
 // MPI_Testsome reports MPI_UNDEFINED when no request is active. That value
 // is negative in common implementations (MPICH, Open MPI), so this check
 // treats it the same as "nothing finished".
265 if (1 > ready)
266 return 0;
267
268 for (i = 0; i < ready; i++)
269 {
270 tw_event *e;
271
 // idx_list[i] is the slot of the i-th completed request. Clearing the
 // event pointer marks that slot as a hole for the compaction below.
272 n = q->idx_list[i];
273 e = q->event_list[n];
274 q->event_list[n] = NULL;
275
276 finish(me, e, NULL);
277 }
278
279 /* Collapse the lists to remove any holes we left. */
 // Only event_list and req_list need moving. idx_list and status_list are
 // scratch arrays that MPI_Testsome rewrites on every call.
280 for (i = 0, n = 0; (unsigned int)i < q->cur; i++)
281 {
282 if (q->event_list[i])
283 {
284 if (i != n)
285 {
286 // move the event pointer down into the first hole (a copy, not a swap)
287 q->event_list[n] = q->event_list[i];
288
289 // copy the request handles
290 memcpy(
291 &q->req_list[n],
292 &q->req_list[i],
293 sizeof(q->req_list[0]));
294
295 } // endif (i != n)
296 n++;
297 } // endif (q->event_list[i])
298 }
 // Exactly `ready` slots were cleared above.
299 q->cur -= ready;
300
301 return 1;
302}
303
304/**
305 * @brief If there are any openings in the posted_recvs queue, post more Irecvs.
 *
 * Each Irecv receives directly into an event buffer taken from the PE with
 * tw_event_grab(). Posting stops early in two cases:
 * - no event buffer is available (this is a tw_error if GVT is in progress);
 * - MPI_Irecv fails, in which case the grabbed event is returned with
 *   tw_event_free().
306 *
307 * @param[in] me pointer to the PE
308 * @return 0 if no changes are made to the queue, 1 otherwise.
 *
 * NOTE(review): original lines 311 (presumably `recv_begin(tw_pe *me)`, per
 * the forward declaration) and 333 (presumably the communicator argument,
 * MPI_COMM_ROSS) are missing from this listing.
309 */
310static int
312{
313 tw_event *e = NULL;
314
315 int changed = 0;
316
317 while (posted_recvs.cur < read_buffer)
318 {
319 unsigned id = posted_recvs.cur;
320
321 if(!(e = tw_event_grab(me)))
322 {
323 if(tw_gvt_inprogress(me))
324 tw_error(TW_LOC, "Out of events in GVT! Consider increasing --extramem");
325 return changed;
326 }
327
 // The tw_event itself is the receive buffer. The sender transmits the raw
 // event bytes (EVENT_SIZE bytes of MPI_BYTE) in send_begin().
328 if( MPI_Irecv(e,
329 (int)EVENT_SIZE(e),
330 MPI_BYTE,
331 MPI_ANY_SOURCE,
332 EVENT_TAG,
334 &posted_recvs.req_list[id]) != MPI_SUCCESS)
335 {
336 tw_event_free(me, e);
337 return changed;
338 }
339
340 posted_recvs.event_list[id] = e;
341 posted_recvs.cur++;
342 changed = 1;
343 }
344
345 return changed;
346}
347
348/**
349 * @brief Determines how to handle the newly received event.
 *
 * The event is handled in one of four ways:
 * - An anti-message (state.cancel_q set) queues the matching original event
 *   on the destination PE's cancel_q and frees the anti-message buffer.
 * - Other events are recorded in me->hash_t (see the NOTE at line 425).
 * - A local event that causes no rollback while cancel_q is empty goes
 *   straight into the priority queue.
 * - Any other local event is pushed onto dest_pe->event_q.
 *
 * An event addressed to another PE is reported with tw_error().
350 *
351 * @param[in] me pointer to PE
352 * @param[in] e pointer to event that we just received
353 * @param[in] buffer not currently used
 *
 * NOTE(review): original lines 362, 368, 371-372, 388, 422-424 and 455 are
 * missing from this listing.
354 */
355static void
356recv_finish(tw_pe *me, tw_event *e, char * buffer)
357{
358 (void) buffer;
359 tw_pe *dest_pe;
360 tw_clock start;
361
 // Count received messages, presumably for GVT white-message accounting
 // (send_begin increments s_nwhite_sent).
363 me->s_nwhite_recv++;
364
365 // printf("recv_finish: remote event [cancel %u] FROM: LP %lu, PE %lu, TO: LP %lu, PE %lu at TS %lf \n",
366 // e->state.cancel_q, (tw_lpid)e->src_lp, e->send_pe, (tw_lpid)e->dest_lp, me->id, e->recv_ts);
367
 // NOTE(review): line 368 is missing. send_begin() treats e->dest_lp as a
 // global LP id (it casts it to tw_lpid), so line 368 presumably converts it
 // back to a local tw_lp pointer (e.g. via tw_getlocal_lp) before the
 // dereference below.
369 dest_pe = e->dest_lp->pe;
370 // instrumentation
373
 // NOTE(review): tw_peid is unsigned long (ross-base.h), so the %d used for
 // e->send_pe here and for me->id / e->send_pe in the straggler messages
 // below does not match its argument. Assuming tw_error formats like printf,
 // this is undefined behavior; %lu would match.
374 if(e->send_pe > tw_nnodes()-1)
375 tw_error(TW_LOC, "bad sendpe_id: %d", e->send_pe);
376
 // Pointers that arrived in the raw event bytes refer to the sender's memory
 // and are reset here.
377 e->cancel_next = NULL;
378 e->caused_by_me = NULL;
379 e->cause_next = NULL;
380
381#ifdef USE_RAND_TIEBREAKER
382 if(tw_event_sig_compare_ptr(&e->sig, &me->GVT_sig) < 0)
383 tw_error(TW_LOC, "%d: Received straggler from %d: %lf < GVT%lf (%d)",
384 me->id, e->send_pe, e->sig.recv_ts, me->GVT_sig.recv_ts, e->state.cancel_q);
385
 // NOTE(review): line 388 is missing. It presumably records e->sig as the new
 // minimum transient-message signature (the symbol index lists
 // tw_copy_event_sig).
386 if(tw_gvt_inprogress(me)) {
387 if (tw_event_sig_compare_ptr(&me->trans_msg_sig, &e->sig) >= 0) {
389 }
390 }
391#else
392 if(TW_STIME_CMP(e->recv_ts, me->GVT) < 0)
393 tw_error(TW_LOC, "%d: Received straggler from %d: %lf (%d)",
394 me->id, e->send_pe, e->recv_ts, e->state.cancel_q);
395
 // While GVT is being computed, keep the smallest timestamp of the messages
 // received (transient messages).
396 if(tw_gvt_inprogress(me))
397 me->trans_msg_ts = (TW_STIME_CMP(me->trans_msg_ts, e->recv_ts) < 0) ? me->trans_msg_ts : e->recv_ts;
398#endif
399
400 // if cancel event, retrieve and flush
401 // else, store in hash table
402 if(e->state.cancel_q)
403 {
 // Look up the original event this anti-message refers to, keyed by the
 // received event and its sending PE.
404 tw_event *cancel = tw_hash_remove(me->hash_t, e, e->send_pe);
405
406 // NOTE: it is possible to cancel the event we
407 // are currently processing at this PE since this
408 // MPI module lets me read cancel events during
409 // event sends over the network.
410
411 cancel->state.cancel_q = 1;
412 cancel->state.remote = 0;
413
414 cancel->cancel_next = dest_pe->cancel_q;
415 dest_pe->cancel_q = cancel;
416
 // The anti-message buffer itself is no longer needed.
417 tw_event_free(me, e);
418
419 return;
420 }
421
 // NOTE(review): lines 422-424 are missing. They open the block closed on
 // line 427, which records the event in me->hash_t so that a later
 // anti-message can find it; the guarding condition is not visible here.
425 tw_hash_insert(me->hash_t, e, e->send_pe);
426 e->state.remote = 1;
427 }
428
429 /* NOTE: the final check in the if conditional below was added to make sure
430 * that we do not execute the fast case unless the cancellation queue is
431 * empty on the destination PE. Otherwise we need to invoke the normal
432 * scheduling routines to make sure that a forward event doesn't bypass a
433 * cancellation event with an earlier timestamp. This is helpful for
434 * stateful models that produce incorrect results when presented with
435 * duplicate messages with no rollback between them.
436 */
437#ifdef USE_RAND_TIEBREAKER
438 if(me == dest_pe && tw_event_sig_compare_ptr(&e->dest_lp->kp->last_sig, &e->sig) <= 0 && !dest_pe->cancel_q) {
439#else
440 if(me == dest_pe && TW_STIME_CMP(e->dest_lp->kp->last_time, e->recv_ts) <= 0 && !dest_pe->cancel_q) {
441#endif
442 /* Fast case, we are sending to our own PE and
443 * there is no rollback caused by this send.
444 */
445 start = tw_clock_read();
446 tw_pq_enqueue(dest_pe->pq, e);
447 dest_pe->stats.s_pq += tw_clock_read() - start;
448 return;
449 }
450
451 if (me->id == dest_pe->id) {
452 /* Slower, but still local send, so put into top
453 * of dest_pe->event_q.
454 */
456 tw_eventq_push(&dest_pe->event_q, e);
457 return;
458 }
459
460 /* Never should happen; MPI should have gotten the
461 * message to the correct node without needing us
462 * to redirect the message there for it. This is
463 * probably a serious bug with the event headers
464 * not being formatted right.
465 */
 // NOTE(review): %u does not match tw_peid (unsigned long); %lu would.
466 tw_error(
467 TW_LOC,
468 "Event recived by PE %u but meant for PE %u",
469 me->id,
470 dest_pe->id);
471}
472
473/**
474 * @brief If there are any openings in the posted_sends queue, start sends
475 * for events in the outgoing queue.
 *
 * Each event is sent as raw bytes (EVENT_SIZE bytes of MPI_BYTE) to the rank
 * returned by the source LP type's map function. The receiver uses those
 * bytes directly as its event (see recv_begin()).
476 *
477 * @param[in] me pointer to the PE
478 * @return 0 if no changes are made to the posted_sends queue, 1 otherwise.
 *
 * NOTE(review): this listing omits original lines 481, 487, 500, 508, 513
 * and 515.
 * - Line 481 is presumably `send_begin(tw_pe *me)` (per the forward
 *   declaration).
 * - Line 487 presumably declares `e` as the next event taken from outq.
 * - Line 508 is presumably the communicator argument (MPI_COMM_ROSS).
 * - Line 513 presumably removes e from outq once MPI_Isend succeeds.
 * - Line 515 presumably supplies `? TW_net_acancel`, the owner that
 *   send_finish() checks for cancellation messages.
 *
 * Confirm against the real source.
479 */
480static int
482{
483 int changed = 0;
484
485 while (posted_sends.cur < send_buffer)
486 {
488 tw_peid dest_pe;
489
490 unsigned id = posted_sends.cur;
491
492 if (!e)
493 break;
494
495 if(e == me->abort_event)
496 tw_error(TW_LOC, "Sending abort event!");
497
 // e->dest_lp is cast to tw_lpid: here it holds the destination LP's global
 // id, which the LP type's map function turns into the destination rank.
498 dest_pe = (*e->src_lp->type->map) ((tw_lpid) e->dest_lp);
499
501 e->send_lp = e->src_lp->gid;
502
503 if (MPI_Isend(e,
504 (int)EVENT_SIZE(e),
505 MPI_BYTE,
506 (int)dest_pe,
507 EVENT_TAG,
509 &posted_sends.req_list[id]) != MPI_SUCCESS) {
510 return changed;
511 }
512
 // Record what kind of transmission is in flight so that send_finish() knows
 // how to dispose of the buffer afterwards.
514 e->state.owner = e->state.cancel_q
516 : TW_net_asend;
517
518 posted_sends.event_list[id] = e;
519 posted_sends.cur++;
 // Count sent messages, presumably for GVT white-message accounting.
520 me->s_nwhite_sent++;
521
522 changed = 1;
523 }
524 return changed;
525}
526
527/**
528 * @brief Determines how to handle the buffer of event whose send operation
529 * just finished.
530 *
531 * @param[in] me pointer to PE
532 * @param[in] e pointer to the event whose MPI_Isend just completed
533 * @param[in] buffer not currently used
 *
 * NOTE(review): original lines 539, 541-542 and 560-561 are missing from
 * this listing.
534 */
535static void
536send_finish(tw_pe *me, tw_event *e, char * buffer)
537{
538 (void) buffer;
540 // instrumentation
543
544 if (e->state.owner == TW_net_asend) {
545 if (e->state.cancel_asend) {
546 /* Event was cancelled during transmission. We must
547 * send another message to pass the cancel flag to
548 * the other node.
549 */
550 e->state.cancel_asend = 0;
551 e->state.cancel_q = 1;
552 tw_eventq_push(&outq, e);
553 } else {
554 /* Event finished transmission and was not cancelled.
555 * Add to our sent event queue so we can retain the
556 * event in case we need to cancel it later. Note it
557 * is currently in remote format and must be converted
558 * back to local format for fossil collection.
559 */
 // NOTE(review): lines 560-561 are missing. As shown, tw_event_free below
 // looks unconditional, which would contradict the comment above. Line 561
 // is presumably a condition guarding the free (the symbol index lists
 // TW_pe_sevent_q and CONSERVATIVE). Confirm against the real source.
562 tw_event_free(me, e);
563 }
564
565 return;
566 }
567
568 if (e->state.owner == TW_net_acancel) {
569 /* We just finished sending the cancellation message
570 * for this event. We need to free the buffer and
571 * make it available for reuse.
572 */
573 tw_event_free(me, e);
574 return;
575 }
576
577 /* Never should happen, not unless we somehow broke this
578 * module's other functions related to sending an event.
579 */
580
581 tw_error(
582 TW_LOC,
583 "Don't know how to finish send of owner=%u, cancel_q=%d",
584 e->state.owner,
585 e->state.cancel_q);
586
587}
588
589/**
590 * @brief Start checks for finished operations in send/recv queues,
591 * and post new sends/recvs if possible.
 *
 * The loop repeats until one full pass makes no progress. Completing
 * operations frees queue slots, and send_finish() may put events back on
 * outq; either can let the *_begin calls post more work.
592 * @param[in] me pointer to PE
 *
 * NOTE(review): original line 595 (the function name) is missing from this
 * listing; the symbol index gives `static void service_queues(tw_pe *me)`.
593 */
594static void
596{
597 int changed;
598 do {
599 changed = test_q(&posted_recvs, me, recv_finish);
600 changed |= test_q(&posted_sends, me, send_finish);
601 changed |= recv_begin(me);
602 changed |= send_begin(me);
603 } while (changed);
604}
605
606/*
607 * NOTE: Chris believes that this network layer is too aggressive at
608 * reading events out of the network, so we are modifying the algorithm
609 * to only send events when tw_net_send is called, and only read events
610 * when tw_net_read is called.
 *
 * NOTE(review): tw_net_cancel() also calls service_queues(), which services
 * receives as well, so reads are not limited to tw_net_read().
611 */
/**
 * @brief Poll the network for PE @p me by servicing both the send and the
 * receive queues.
 *
 * NOTE(review): original line 613 (the function name) is missing from this
 * listing; the symbol index gives `void tw_net_read(tw_pe *me)`.
 */
612void
614{
615 service_queues(me);
616}
617
/**
 * @brief Queue event @p e for transmission and hand as many queued events to
 * MPI as the send queue allows.
 *
 * Only the send side is serviced here; received messages are not processed.
 *
 * NOTE(review): original lines 619 and 625-626 are missing from this listing.
 * The symbol index gives `void tw_net_send(tw_event *e)`. Lines 625-626
 * presumably mark the event as owned by TW_net_outq and append it to outq
 * (compare the TW_net_outq case in tw_net_cancel()). Confirm against the
 * real source.
 */
618void
620{
621 tw_pe * me = e->src_lp->pe;
622 int changed = 0;
623
624 e->state.remote = 0;
627
628 do
629 {
630 changed = test_q(&posted_sends, me, send_finish);
631 changed |= send_begin(me);
632 } while (changed);
633}
634
/**
 * @brief Cancel remote event @p e. What happens depends on how far the
 * original send has progressed (state.owner):
 * - still in outq: drop it and free it;
 * - MPI_Isend in flight: flag it so send_finish() sends an anti-message;
 * - already sent: mark it as an anti-message for re-sending.
 *
 * The PE's network queues are then serviced.
 *
 * NOTE(review): this listing omits original lines 636, 646 and 672.
 * - Line 636 is the signature; the symbol index gives
 *   `void tw_net_cancel(tw_event *e)`.
 * - Line 646 presumably removes e from outq (e.g. tw_eventq_delete_any).
 * - Line 672 presumably places e at the front of outq (e.g.
 *   tw_eventq_unshift), per the comment above it.
 *
 * Confirm against the real source.
 */
635void
637{
638 tw_pe *src_pe = e->src_lp->pe;
639
640 switch (e->state.owner) {
641 case TW_net_outq:
642 /* Cancelled before we could transmit it. Do not
643 * transmit the event and instead just release the
644 * buffer back into our own free list.
645 */
647 tw_event_free(src_pe, e);
648
649 return;
650
 // NOTE(review): unreachable; the return above already leaves the function.
651 break;
652
653 case TW_net_asend:
654 /* Too late. We've already let MPI start to send
655 * this event over the network. We can't pull it
656 * back now without sending another message to do
657 * the cancel.
658 *
659 * Setting the cancel_asend flag will signal us to do
660 * another message send once the current send of
661 * this message is completed.
662 */
663 e->state.cancel_asend = 1;
664 break;
665
666 case TW_pe_sevent_q:
667 /* Way late; the event was already sent and is in
668 * our sent event queue. Mark it as a cancel and
669 * place it at the front of the outq.
670 */
671 e->state.cancel_q = 1;
673 break;
674
675 default:
676 /* Huh? Where did you come from? Why are we being
677 * told about you? We did not send you so we cannot
678 * cancel you!
679 */
680 tw_error(
681 TW_LOC,
682 "Don't know how to cancel event owned by %u",
683 e->state.owner);
684 }
685
686 service_queues(src_pe);
687}
688
689/**
690 * tw_net_statistics
691 * @brief Reduce the per-rank statistics @p s onto rank g_tw_masternode, storing the results in me->stats.
692 * @attention Notice that the MPI_Reduce "count" parameter is greater than one.
693 * We are reducing on multiple variables *simultaneously* so if you change
694 * this function or the struct tw_statistics, you must update the other.
 * In detail, the counts assume contiguous runs of unsigned 64-bit fields:
 * - 17 fields starting at s_net_events (summed);
 * - 16 fields starting at s_total (max);
 * - 3 fields starting at s_events_past_end (summed).
 *
 * @param[in,out] me PE whose stats member receives the reduced values
 * @param[in]     s  this rank's local statistics
 * @return &me->stats. MPI_Reduce's receive buffer is only significant at the
 *         root, so the reduced values are only meaningful on g_tw_masternode.
 *
 * NOTE(review): MPI does not allow the send and receive buffers to alias. On
 * the root, @p s must therefore not be &me->stats (that case would need
 * MPI_IN_PLACE).
 * NOTE(review): original lines 696-697, 700, 718 and 736 are missing from
 * this listing. The symbol index gives
 * `tw_statistics * tw_net_statistics(tw_pe *me, tw_statistics *s)`. The
 * other lines are presumably the matching &me->stats.* receive-buffer
 * arguments.
695 **/
698{
699 if(MPI_Reduce(&(s->s_max_run_time),
701 1,
702 MPI_DOUBLE,
703 MPI_MAX,
704 (int)g_tw_masternode,
705 MPI_COMM_ROSS) != MPI_SUCCESS)
706 tw_error(TW_LOC, "Unable to reduce statistics!");
707
708 if(MPI_Reduce(&(s->s_net_events),
709 &me->stats.s_net_events,
710 17,
711 MPI_UNSIGNED_LONG_LONG,
712 MPI_SUM,
713 (int)g_tw_masternode,
714 MPI_COMM_ROSS) != MPI_SUCCESS)
715 tw_error(TW_LOC, "Unable to reduce statistics!");
716
717 if(MPI_Reduce(&s->s_min_detected_offset,
719 1,
720 MPI_DOUBLE,
721 MPI_MIN,
722 (int)g_tw_masternode,
723 MPI_COMM_ROSS) != MPI_SUCCESS)
724 tw_error(TW_LOC, "Unable to reduce statistics!");
725
726 if(MPI_Reduce(&(s->s_total),
727 &me->stats.s_total,
728 16,
729 MPI_UNSIGNED_LONG_LONG,
730 MPI_MAX,
731 (int)g_tw_masternode,
732 MPI_COMM_ROSS) != MPI_SUCCESS)
733 tw_error(TW_LOC, "Unable to reduce statistics!");
734
735 if (MPI_Reduce(&s->s_events_past_end,
737 3,
738 MPI_UNSIGNED_LONG_LONG,
739 MPI_SUM,
740 (int)g_tw_masternode,
741 MPI_COMM_ROSS) != MPI_SUCCESS)
742 tw_error(TW_LOC, "Unable to reduce statistics!");
743
744#ifdef USE_RIO
745 if (MPI_Reduce(&s->s_rio_load,
746 &me->stats.s_rio_load,
747 1,
748 MPI_UNSIGNED_LONG_LONG,
749 MPI_MAX,
750 (int)g_tw_masternode,
751 MPI_COMM_ROSS) != MPI_SUCCESS)
752 tw_error(TW_LOC, "Unable to reduce statistics!");
753 if (MPI_Reduce(&s->s_rio_lp_init,
754 &me->stats.s_rio_lp_init,
755 1,
756 MPI_UNSIGNED_LONG_LONG,
757 MPI_MAX,
758 (int)g_tw_masternode,
759 MPI_COMM_ROSS) != MPI_SUCCESS)
760 tw_error(TW_LOC, "Unable to reduce statistics!");
761#endif
762
763 return &me->stats;
764}
static tw_clock tw_clock_read(void)
Definition aarch64.h:8
uint64_t tw_clock
Definition aarch64.h:6
static int tw_gvt_inprogress(tw_pe *pe)
void tw_hash_insert(void *h, tw_event *event, long pe)
tw_event * tw_hash_remove(void *h, tw_event *event, long pe)
tw_event_sig const * tw_net_minimum_sig_ptr(void)
Obtain the event signature for the lowest ordered event inside the network buffers.
unsigned tw_nnodes(void)
MPI_Comm MPI_COMM_ROSS
Definition network-mpi.c:4
void tw_pq_enqueue(tw_pq *, tw_event *)
Definition splay.c:245
unsigned long tw_peid
Definition ross-base.h:36
#define TW_STIME_MAX
Definition ross-base.h:45
#define TW_STIME_CMP(x, y)
Definition ross-base.h:43
double tw_stime
Definition ross-base.h:39
uint64_t tw_lpid
Definition ross-base.h:49
void * tw_calloc(const char *file, int line, const char *for_who, size_t e_sz, size_t n)
Definition tw-util.c:206
void tw_pe_init(void)
Definition tw-pe.c:32
tw_pe * g_tw_pe
Definition ross-global.c:79
static void tw_event_free(tw_pe *, tw_event *)
void tw_error(const char *file, int line, const char *fmt,...)
Definition tw-util.c:77
unsigned int g_tw_net_device_size
Definition ross-global.c:91
tw_peid g_tw_mynode
Definition ross-global.c:92
tw_peid g_tw_masternode
Definition ross-global.c:93
tw_synch g_tw_synchronization_protocol
Definition ross-global.c:19
#define TW_LOC
static tw_event * tw_event_grab(tw_pe *pe)
Definition ross-inline.h:11
static tw_lp * tw_getlocal_lp(tw_lpid gid)
static int tw_event_sig_compare_ptr(tw_event_sig const *e_sig, tw_event_sig const *n_sig)
Definition ross-types.h:512
tw_event_sig const g_tw_max_sig
static void tw_copy_event_sig(tw_event_sig *e, tw_event_sig const *sig)
Definition ross-types.h:493
@ OPTIMISTIC_REALTIME
Definition ross-types.h:41
@ SEQUENTIAL
Definition ross-types.h:37
@ CONSERVATIVE
Definition ross-types.h:38
@ OPTIMISTIC
Definition ross-types.h:39
@ OPTIMISTIC_DEBUG
Definition ross-types.h:40
@ NO_SYNCH
Definition ross-types.h:36
@ TW_pe_event_q
In a tw_pe.event_q list.
Definition ross-types.h:224
@ TW_pe_sevent_q
In tw_pe.sevent_q.
Definition ross-types.h:231
@ TW_net_asend
Network transmission in progress.
Definition ross-types.h:229
@ TW_net_acancel
Network transmission of a cancellation message in progress.
Definition ross-types.h:230
@ TW_net_outq
Pending network transmission.
Definition ross-types.h:228
static tw_event * tw_eventq_peek(tw_eventq *q)
Definition tw-eventq.h:304
static void tw_eventq_delete_any(tw_eventq *q, tw_event *e)
Definition tw-eventq.h:408
static void tw_eventq_push(tw_eventq *q, tw_event *e)
Definition tw-eventq.h:281
static void tw_eventq_unshift(tw_eventq *q, tw_event *e)
Definition tw-eventq.h:344
static tw_event * tw_eventq_pop(tw_eventq *q)
Definition tw-eventq.h:313
#define TWOPT_UINT(n, v, h)
Definition tw-opts.h:33
#define TWOPT_GROUP(h)
Definition tw-opts.h:30
#define TWOPT_END()
Definition tw-opts.h:39
static void send_finish(tw_pe *me, tw_event *e, char *buffer)
Determines how to handle the buffer of event whose send operation just finished.
static void recv_finish(tw_pe *me, tw_event *e, char *buffer)
Determines how to handle the newly received event.
#define EVENT_TAG
Definition network-mpi.c:27
static struct act_q posted_recvs
Definition network-mpi.c:32
void tw_net_read(tw_pe *me)
starts service_queues() to poll network
unsigned int tw_nnodes(void)
int custom_communicator
Definition network-mpi.c:5
void tw_net_abort(void)
static int test_q(struct act_q *q, tw_pe *me, void(*finish)(tw_pe *, tw_event *, char *))
Calls MPI_Testsome on the provided queue, to check for finished operations.
static int send_begin(tw_pe *me)
If there are any openings in the posted_sends queue, start sends for events in the outgoing queue.
#define EVENT_SIZE(e)
Definition network-mpi.c:29
void tw_net_start(void)
Starts the network library after option parsing.
tw_statistics * tw_net_statistics(tw_pe *me, tw_statistics *s)
Function to output the statistics.
void tw_net_cancel(tw_event *e)
Cancel the given remote event by either removing from the outq or sending an antimessage,...
void tw_net_stop(void)
Stops the network library after simulation end.
static unsigned int read_buffer
Definition network-mpi.c:35
static void service_queues(tw_pe *me)
Start checks for finished operations in send/recv queues, and post new sends/recvs if possible.
static struct act_q posted_sends
Definition network-mpi.c:31
static int recv_begin(tw_pe *me)
If there are any openings in the posted_recvs queue, post more Irecvs.
static void init_q(struct act_q *q, const char *name, unsigned int size)
Initializes queues used for posted sends and receives.
Definition network-mpi.c:93
static tw_eventq outq
Definition network-mpi.c:33
const tw_optdef * tw_net_init(int *argc, char ***argv)
Initialize the network library and parse options.
Definition network-mpi.c:66
tw_stime tw_net_minimum(void)
Obtain the lowest timestamp inside the network buffers.
static unsigned int send_buffer
Definition network-mpi.c:36
static int world_size
Definition network-mpi.c:37
void tw_net_send(tw_event *e)
Adds the event to the outgoing queue of events to be sent, polls for finished sends,...
static const tw_optdef mpi_opts[]
Definition network-mpi.c:39
void tw_net_barrier(void)
void tw_comm_set(MPI_Comm comm)
Setup the MPI_COMM_ROSS communicator to use instead of MPI_COMM_WORLD.
Definition network-mpi.c:59
Keeps track of posted send or recv operations.
Definition network-mpi.c:17
MPI_Status * status_list
Definition network-mpi.c:23
tw_event ** event_list
Definition network-mpi.c:20
const char * name
Definition network-mpi.c:18
int * idx_list
Definition network-mpi.c:22
MPI_Request * req_list
Definition network-mpi.c:21
unsigned int cur
Definition network-mpi.c:24
unsigned int s_nsend_network
unsigned int s_nread_network
unsigned int s_nread_network
unsigned int s_nsend_network
tw_stime recv_ts
Definition ross-types.h:260
Event Structure.
Definition ross-types.h:277
struct tw_event::@130070134144252114152124341363102114315067064025 state
tw_event * cause_next
Next in parent's caused_by_me chain.
Definition ross-types.h:289
tw_lp * src_lp
Sending LP ID.
Definition ross-types.h:313
tw_stime recv_ts
Actual time to be received.
Definition ross-types.h:314
unsigned char remote
Indicates union addr is in 'remote' storage.
Definition ross-types.h:303
unsigned char owner
Owner of the next/prev pointers; see tw_event_owner.
Definition ross-types.h:300
tw_lpid send_lp
sending LP ID for data collection uses
Definition ross-types.h:317
tw_event * caused_by_me
Start of event list caused by this event.
Definition ross-types.h:288
tw_event * next
Definition ross-types.h:278
unsigned char cancel_asend
Definition ross-types.h:302
tw_lp * dest_lp
Destination LP ID.
Definition ross-types.h:312
tw_peid send_pe
Definition ross-types.h:316
tw_event * cancel_next
Next event in the cancel queue for the dest_pe.
Definition ross-types.h:287
tw_event_sig sig
Event signature, to be used by tiebreaker.
Definition ross-types.h:294
unsigned char cancel_q
Actively on a dest_lp->pe's cancel_q.
Definition ross-types.h:301
struct st_kp_stats * kp_stats
Definition ross-types.h:407
tw_event_sig last_sig
Event signature of the current event being processed.
Definition ross-types.h:398
tw_stime last_time
Time of the current event being processed.
Definition ross-types.h:401
tw_pe * pe
Definition ross-types.h:340
tw_kp * kp
kp – Kernel process that we belong to (must match pe).
Definition ross-types.h:345
tw_lpid gid
global LP id
Definition ross-types.h:338
tw_lptype * type
Type of this LP, including service callbacks.
Definition ross-types.h:348
struct st_lp_stats * lp_stats
Definition ross-types.h:356
map_f map
LP Mapping of LP gid -> remote PE routine.
Definition ross-types.h:104
Holds the entire PE state.
Definition ross-types.h:416
void * hash_t
Array of incoming events from remote pes, Note: only necessary for distributed DSR.
Definition ross-types.h:466
tw_eventq event_q
Linked list of events sent to this PE.
Definition ross-types.h:420
tw_pq * pq
Priority queue used to sort events.
Definition ross-types.h:422
tw_event_sig trans_msg_sig
Last transient messages' time signature.
Definition ross-types.h:444
tw_event * cancel_q
List of canceled events.
Definition ross-types.h:421
tw_stime GVT
Global Virtual Time.
Definition ross-types.h:451
tw_stime trans_msg_ts
Last transient messages' time stamp.
Definition ross-types.h:450
tw_peid id
Definition ross-types.h:417
long long s_nwhite_recv
Definition ross-types.h:457
tw_event * abort_event
Placeholder event for when free_q is empty.
Definition ross-types.h:425
tw_statistics stats
per PE counters
Definition ross-types.h:463
tw_event_sig GVT_sig
Global Virtual Time Signature.
Definition ross-types.h:445
long long s_nwhite_sent
Definition ross-types.h:456
Statistics tallied over the duration of the simulation.
Definition ross-types.h:117
double s_min_detected_offset
Definition ross-types.h:142
tw_clock s_pq
Definition ross-types.h:153
tw_stat s_nread_network
Definition ross-types.h:132
tw_clock s_total
Definition ross-types.h:144
tw_stat s_net_events
Definition ross-types.h:120
tw_stat s_nsend_network
Definition ross-types.h:131
double s_max_run_time
Definition ross-types.h:118
tw_stat s_events_past_end
Definition ross-types.h:164