tw-sched.c
#include <ross.h>

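/*
 * tw-sched.c implements the ROSS schedulers: the helper routines
 * tw_sched_event_q(), tw_sched_cancel_q(), tw_sched_batch(), and
 * tw_sched_batch_realtime(), the common tw_sched_init() start-up path,
 * and the top-level drivers tw_scheduler_sequential(),
 * tw_scheduler_conservative(), tw_scheduler_optimistic(),
 * tw_scheduler_optimistic_realtime(), and tw_scheduler_optimistic_debug().
 */
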
/**
 * \brief Reset the event bitfield prior to entering the event handler
 * post-reverse, so that a potential re-execution of the event sees a
 * consistent bitfield state.
 * NOTE: the size checks are to better support the experimental reverse
 * computation compiler, which can use a larger bitfield.
 * Courtesy of John P Jenkins
 */
static inline void reset_bitfields(tw_event *revent)
{
    memset(&revent->cv, 0, sizeof(revent->cv));
}

/**
 * Get all events out of my event queue and spin them out into
 * the priority queue so they can be processed in timestamp
 * order.
 */
static void tw_sched_event_q(tw_pe * me) {
    tw_clock start;
    tw_kp *dest_kp;
    tw_event *cev;
    tw_event *nev;

    while (me->event_q.size) {
        cev = tw_eventq_pop_list(&me->event_q);

        for (; cev; cev = nev) {
            nev = cev->next;

            if (!cev->state.owner || cev->state.owner == TW_pe_free_q) {
                tw_error(TW_LOC, "no owner!");
            }
            if (cev->state.cancel_q) {
                cev->state.owner = TW_pe_anti_msg;
                cev->next = cev->prev = NULL;
                continue;
            }

            switch (cev->state.owner) {
                case TW_pe_event_q:
                    dest_kp = cev->dest_lp->kp;

                    if (TW_STIME_CMP(dest_kp->last_time, cev->recv_ts) > 0) {
                        /* cev is a straggler message which has arrived
                         * after we processed events occurring after it.
                         * We need to jump back to before cev's timestamp.
                         */
                        start = tw_clock_read();
                        tw_kp_rollback_to(dest_kp, cev->recv_ts);
                        me->stats.s_rollback += tw_clock_read() - start;
                        if (g_st_ev_trace == RB_TRACE)
                            st_collect_event_data(cev, (double)start / g_tw_clock_rate);
                    }
                    start = tw_clock_read();
                    tw_pq_enqueue(me->pq, cev);
                    me->stats.s_pq += tw_clock_read() - start;
                    break;

                default:
                    tw_error(TW_LOC, "Event in event_q, but owner %d not recognized", cev->state.owner);
            }
        }
    }
}

/**
 * Drain this PE's cancel queue, disposing of each cancelled event according
 * to who currently owns it.
 * OPT: need to link events into the cancel_q in reverse order so
 * that when we roll back the 1st event, we should not
 * need to do any further rollbacks.
 */
static void tw_sched_cancel_q(tw_pe * me) {
    tw_clock start = 0, pq_start;
    tw_event *cev;
    tw_event *nev;

    start = tw_clock_read();
    while (me->cancel_q) {
        cev = me->cancel_q;
        me->cancel_q = NULL;

        for (; cev; cev = nev) {
            nev = cev->cancel_next;

            if (!cev->state.cancel_q) {
                tw_error(TW_LOC, "No cancel_q bit on event in cancel_q");
            }

            if (!cev->state.owner || cev->state.owner == TW_pe_free_q) {
                tw_error(TW_LOC, "Cancelled event, no owner!");
            }

            switch (cev->state.owner) {
                case TW_pe_event_q:
                    /* This event hasn't been added to our pq yet and we
                     * have not officially received it yet either. We'll
                     * do the actual free of this event when we receive it
                     * as we spin out the event_q chain.
                     */
                    tw_eventq_delete_any(&me->event_q, cev);
                    tw_event_free(me, cev);
                    break;

                case TW_pe_anti_msg:
                    tw_event_free(me, cev);
                    break;

                case TW_pe_pq:
                    /* Event was not cancelled directly from the event_q
                     * because the cancel message came after we popped it
                     * out of that queue but before we could process it.
                     */
                    pq_start = tw_clock_read();
                    tw_pq_delete_any(me->pq, cev);
                    me->stats.s_pq += tw_clock_read() - pq_start;
                    tw_event_free(me, cev);
                    break;

                case TW_kp_pevent_q:
                    /* The event was already processed.
                     * SECONDARY ROLLBACK
                     */
                    tw_kp_rollback_event(cev);
                    tw_event_free(me, cev);
                    break;

                default:
                    tw_error(TW_LOC, "Event in cancel_q, but owner %d not recognized", cev->state.owner);
            }
        }
    }

    me->stats.s_cancel_q += tw_clock_read() - start;
}

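/**
 * Process up to g_tw_mblock events from this PE's priority queue.
 * Each event is executed forward through its LP's event handler and then
 * threaded onto the owning KP's processed-event queue so it can be rolled
 * back later if needed.  The loop bails out early when free event buffers
 * run low or when an event handler signals an abort (me->cev_abort).
 */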
static void tw_sched_batch(tw_pe * me) {
    /* Number of consecutive times we gave up because there were no free event buffers. */
    static int no_free_event_buffers = 0;
    static int warned_no_free_event_buffers = 0;
    const int max_alloc_fail_count = 20;

    tw_clock start, pq_start;
    unsigned int msg_i;

    /* Process g_tw_mblock events, or until the PQ is empty
     * (whichever comes first).
     */
    for (msg_i = g_tw_mblock; msg_i; msg_i--) {
        tw_event *cev;
        tw_lp *clp;
        tw_kp *ckp;

        /* OUT OF FREE EVENT BUFFERS. BAD.
         * Go do fossil collect immediately.
         */
        if (me->free_q.size <= g_tw_gvt_threshold) {
            /* Suggested by Adam Crume */
            if (++no_free_event_buffers > 10) {
                if (!warned_no_free_event_buffers) {
                    fprintf(stderr, "WARNING: No free event buffers. Try increasing memory via the --extramem option.\n");
                    warned_no_free_event_buffers = 1;
                }
                if (no_free_event_buffers >= max_alloc_fail_count) {
                    tw_error(TW_LOC, "Event allocation failed %d consecutive times. Exiting.", max_alloc_fail_count);
                }
            }
            tw_gvt_force_update();
            break;
        }
        no_free_event_buffers = 0;

        start = tw_clock_read();
        if (!(cev = tw_pq_dequeue(me->pq))) {
            break;
        }
        me->stats.s_pq += tw_clock_read() - start;
        if (TW_STIME_CMP(cev->recv_ts, tw_pq_minimum(me->pq)) == 0) {
            me->stats.s_pe_event_ties++;
        }

        clp = cev->dest_lp;

        ckp = clp->kp;
        me->cur_event = cev;
        ckp->last_time = cev->recv_ts;

        /* Optimistic execution requires a reverse computation handler */
        if (!clp->type->revent) {
            tw_error(TW_LOC, "Reverse Computation must be implemented!");
        }

        start = tw_clock_read();
        reset_bitfields(cev);

        // if NOT A SUSPENDED LP THEN FORWARD PROC EVENTS
        if (!(clp->suspend_flag))
        {
            // state-save and update the LP's critical path
            unsigned int prev_cp = clp->critical_path;
            clp->critical_path = ROSS_MAX(clp->critical_path, cev->critical_path) + 1;
            (*clp->type->event)(clp->cur_state, &cev->cv,
                tw_event_data(cev), clp);
            if (g_st_ev_trace == FULL_TRACE)
                st_collect_event_data(cev, (double)start / g_tw_clock_rate);
            cev->critical_path = prev_cp;
        }
        ckp->s_nevent_processed++;
        // instrumentation
        ckp->kp_stats->s_nevent_processed++;
        clp->lp_stats->s_nevent_processed++;
        me->stats.s_event_process += tw_clock_read() - start;

        /* We ran out of events while processing this event. We
         * cannot continue without doing GVT and fossil collect.
         */

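        /* Abort recovery: roll the half-processed event back, put it back in
         * the priority queue, rewind the KP's last_time to the newest event
         * still in its processed queue (or to GVT), and break out so a
         * GVT/fossil collection round can free up event buffers. */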
        if (me->cev_abort)
        {
            start = tw_clock_read();
            me->stats.s_nevent_abort++;
            // instrumentation
            ckp->kp_stats->s_nevent_abort++;
            clp->lp_stats->s_nevent_abort++;
            me->cev_abort = 0;

            tw_event_rollback(cev);
            pq_start = tw_clock_read();
            tw_pq_enqueue(me->pq, cev);
            me->stats.s_pq += tw_clock_read() - pq_start;

            cev = tw_eventq_peek(&ckp->pevent_q);
            ckp->last_time = cev ? cev->recv_ts : me->GVT;

            tw_gvt_force_update();

            me->stats.s_event_abort += tw_clock_read() - start;

            break;
        } // END ABORT CHECK

        /* Thread current event into processed queue of kp */
        cev->state.owner = TW_kp_pevent_q;
        tw_eventq_unshift(&ckp->pevent_q, cev);

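        /* Periodic real-time instrumentation: once g_st_rt_interval clock
         * cycles have elapsed since the last sample, collect engine and/or
         * model data and restart the sampling interval. */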
        if (g_st_rt_sampling &&
            tw_clock_read() - g_st_rt_samp_start_cycles > g_st_rt_interval)
        {
            tw_clock current_rt = tw_clock_read();
#ifdef USE_DAMARIS
            if (g_st_engine_stats == RT_STATS || g_st_engine_stats == ALL_STATS)
            {
                if (g_st_damaris_enabled)
                    st_damaris_expose_data(me, me->GVT, RT_COL);
                else
                    st_collect_engine_data(me, RT_COL);
            }
#else
            if (g_st_engine_stats == RT_STATS || g_st_engine_stats == ALL_STATS)
                st_collect_engine_data(me, RT_COL);
            if (g_st_model_stats == RT_STATS || g_st_model_stats == ALL_STATS)
                st_collect_model_data(me, ((double)current_rt) / g_tw_clock_rate, RT_STATS);
#endif
            g_st_rt_samp_start_cycles = tw_clock_read();
        }

    }
}

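/**
 * Batch loop used by the realtime-GVT scheduler.  It is identical to
 * tw_sched_batch() except that it also breaks out of the batch once the
 * wall-clock GVT interval has expired.
 */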
static void tw_sched_batch_realtime(tw_pe * me) {
    /* Number of consecutive times we gave up because there were no free event buffers. */
    static int no_free_event_buffers = 0;
    static int warned_no_free_event_buffers = 0;
    const int max_alloc_fail_count = 20;

    tw_clock start, pq_start;
    unsigned int msg_i;

    /* Process g_tw_mblock events, or until the PQ is empty
     * (whichever comes first).
     */
    for (msg_i = g_tw_mblock; msg_i; msg_i--) {
        tw_event *cev;
        tw_lp *clp;
        tw_kp *ckp;

        /* OUT OF FREE EVENT BUFFERS. BAD.
         * Go do fossil collect immediately.
         */
        if (me->free_q.size <= g_tw_gvt_threshold) {
            /* Suggested by Adam Crume */
            if (++no_free_event_buffers > 10) {
                if (!warned_no_free_event_buffers) {
                    fprintf(stderr, "WARNING: No free event buffers. Try increasing memory via the --extramem option.\n");
                    warned_no_free_event_buffers = 1;
                }
                if (no_free_event_buffers >= max_alloc_fail_count) {
                    tw_error(TW_LOC, "Event allocation failed %d consecutive times. Exiting.", max_alloc_fail_count);
                }
            }
            tw_gvt_force_update_realtime();
            break;
        }
        no_free_event_buffers = 0;

        start = tw_clock_read();
        if (!(cev = tw_pq_dequeue(me->pq))) {
            break; // leave the batch function
        }
        me->stats.s_pq += tw_clock_read() - start;
        if (TW_STIME_CMP(cev->recv_ts, tw_pq_minimum(me->pq)) == 0) {
            me->stats.s_pe_event_ties++;
        }

        clp = cev->dest_lp;

        ckp = clp->kp;
        me->cur_event = cev;
        ckp->last_time = cev->recv_ts;

        /* Optimistic execution requires a reverse computation handler */
        if (!clp->type->revent) {
            tw_error(TW_LOC, "Reverse Computation must be implemented!");
        }

        start = tw_clock_read();

        reset_bitfields(cev);

        // if NOT A SUSPENDED LP THEN FORWARD PROC EVENTS
        if (!(clp->suspend_flag))
        {
            // state-save and update the LP's critical path
            unsigned int prev_cp = clp->critical_path;
            clp->critical_path = ROSS_MAX(clp->critical_path, cev->critical_path) + 1;
            (*clp->type->event)(clp->cur_state, &cev->cv,
                tw_event_data(cev), clp);
            if (g_st_ev_trace == FULL_TRACE)
                st_collect_event_data(cev, (double)start / g_tw_clock_rate);
            cev->critical_path = prev_cp;
        }
        ckp->s_nevent_processed++;
        // instrumentation
        ckp->kp_stats->s_nevent_processed++;
        clp->lp_stats->s_nevent_processed++;
        me->stats.s_event_process += tw_clock_read() - start;

        /* We ran out of events while processing this event. We
         * cannot continue without doing GVT and fossil collect.
         */

        if (me->cev_abort)
        {
            start = tw_clock_read();
            me->stats.s_nevent_abort++;
            // instrumentation
            ckp->kp_stats->s_nevent_abort++;
            clp->lp_stats->s_nevent_abort++;
            me->cev_abort = 0;

            tw_event_rollback(cev);
            pq_start = tw_clock_read();
            tw_pq_enqueue(me->pq, cev);
            me->stats.s_pq += tw_clock_read() - pq_start;

            cev = tw_eventq_peek(&ckp->pevent_q);
            ckp->last_time = cev ? cev->recv_ts : me->GVT;

            tw_gvt_force_update_realtime();

            me->stats.s_event_abort += tw_clock_read() - start;

            break; // leave the batch function
        } // END ABORT CHECK

        /* Thread current event into processed queue of kp */
        cev->state.owner = TW_kp_pevent_q;
        tw_eventq_unshift(&ckp->pevent_q, cev);

        /* Check if realtime GVT time interval has expired */
        if ((tw_clock_read() - g_tw_gvt_interval_start_cycles) > g_tw_gvt_realtime_interval)
        {
            tw_gvt_force_update_realtime();
            break; // leave the batch function
        }

        if (g_st_rt_sampling &&
            tw_clock_read() - g_st_rt_samp_start_cycles > g_st_rt_interval)
        {
            tw_clock current_rt = tw_clock_read();
            if (g_st_engine_stats == RT_STATS || g_st_engine_stats == ALL_STATS)
                st_collect_engine_data(me, RT_COL);
            if (g_st_model_stats == RT_STATS || g_st_model_stats == ALL_STATS)
                st_collect_model_data(me, ((double)current_rt) / g_tw_clock_rate, RT_STATS);

            g_st_rt_samp_start_cycles = tw_clock_read();
        }
    }
}

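/**
 * Two-stage simulation start-up: run the model's pre-/post-LP-init hooks
 * while initializing KPs and LPs, then (after a barrier) run the LP pre-run
 * handlers, optionally load RIO checkpoint events, drain any start-up events
 * from the network, and finally mark the simulation as started so the signal
 * handler knows it may print statistics on CTRL-C.
 */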
void tw_sched_init(tw_pe * me) {
    /* First Stage Init */
    (*me->type.pre_lp_init)(me);
    tw_init_kps(me);
    tw_init_lps(me);
    (*me->type.post_lp_init)(me);

    tw_net_barrier();

    /* Second Stage Init -- all LPs are created and have proper mappings */
    tw_pre_run_lps(me);
    tw_net_barrier();

#ifdef USE_RIO
    tw_clock start = tw_clock_read();
    io_load_events(me);
    me->stats.s_rio_load += (tw_clock_read() - start);
    tw_net_barrier();
#endif

    /*
     * Recv all of the startup events out of the network before
     * starting simulation. At this point, all LPs are done with init.
     */
    if (tw_nnodes() > 1) {
        tw_net_read(me);
        tw_net_barrier();
        tw_clock_init(me);
    }

    /* This lets the signal handler know that we have started
     * the scheduler loop, and to print out the stats before
     * finishing if someone should type CTRL-C.
     */
    g_tw_sim_started = 1;
}

/*************************************************************************/
/* Primary Schedulers -- In order: Sequential, Conservative, Optimistic  */
/*************************************************************************/

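/**
 * Sequential scheduler: single-PE run that simply dequeues events in
 * timestamp order, executes the forward handler (and commit handler, if one
 * is defined), and immediately frees each event.  No GVT or rollback
 * machinery is involved.
 */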
void tw_scheduler_sequential(tw_pe * me) {
    tw_stime gvt = TW_STIME_CRT(0.0);

    if (tw_nnodes() > 1) {
        tw_error(TW_LOC, "Sequential Scheduler used for world size greater than 1.");
    }

    tw_event *cev;

    printf("*** START SEQUENTIAL SIMULATION ***\n\n");

    tw_wall_now(&me->start_time);
    me->stats.s_total = tw_clock_read();

    while ((cev = tw_pq_dequeue(me->pq))) {
        tw_lp *clp = cev->dest_lp;
        tw_kp *ckp = clp->kp;

        me->cur_event = cev;
        ckp->last_time = cev->recv_ts;

        if (TW_STIME_CMP(cev->recv_ts, tw_pq_minimum(me->pq)) == 0) {
            me->stats.s_pe_event_ties++;
        }

        gvt = cev->recv_ts;
        if (TW_STIME_DBL(gvt) / g_tw_ts_end > percent_complete) {
            gvt_print(gvt);
        }

        reset_bitfields(cev);
        clp->critical_path = ROSS_MAX(clp->critical_path, cev->critical_path) + 1;
        (*clp->type->event)(clp->cur_state, &cev->cv, tw_event_data(cev), clp);
        if (g_st_ev_trace == FULL_TRACE)
            st_collect_event_data(cev, (double)tw_clock_read() / g_tw_clock_rate);
        if (*clp->type->commit) {
            (*clp->type->commit)(clp->cur_state, &cev->cv, tw_event_data(cev), clp);
        }

        if (me->cev_abort) {
            tw_error(TW_LOC, "insufficient event memory");
        }

        ckp->s_nevent_processed++;
        // instrumentation
        ckp->kp_stats->s_nevent_processed++;
        clp->lp_stats->s_nevent_processed++;
        tw_event_free(me, cev);

        if (g_st_rt_sampling &&
            tw_clock_read() - g_st_rt_samp_start_cycles > g_st_rt_interval)
        {
            tw_clock current_rt = tw_clock_read();
            if (g_st_model_stats == RT_STATS || g_st_model_stats == ALL_STATS)
                st_collect_model_data(me, ((double)current_rt) / g_tw_clock_rate, RT_STATS);

            g_st_rt_samp_start_cycles = tw_clock_read();
        }
    }
    tw_wall_now(&me->end_time);
    me->stats.s_total = tw_clock_read() - me->stats.s_total;

    printf("*** END SIMULATION ***\n\n");

    tw_stats(me);

    (*me->type.final)(me);
}

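/**
 * Conservative scheduler: parallel run in which each PE only processes
 * events that are provably safe, i.e. events no further ahead than the
 * current GVT plus the model's lookahead.  Events are committed and freed
 * as soon as they are processed, so no rollback support is needed.
 */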
void tw_scheduler_conservative(tw_pe * me) {
    tw_clock start;
    unsigned int msg_i;

    if (g_tw_mynode == g_tw_masternode) {
        printf("*** START PARALLEL CONSERVATIVE SIMULATION ***\n\n");
    }

    tw_wall_now(&me->start_time);
    me->stats.s_total = tw_clock_read();

    for (;;) {
        if (tw_nnodes() > 1) {
            start = tw_clock_read();
            tw_net_read(me);
            me->stats.s_net_read += tw_clock_read() - start;
        }

        tw_gvt_step1(me);
        tw_sched_event_q(me);
        tw_gvt_step2(me);

        if (TW_STIME_DBL(me->GVT) > g_tw_ts_end) {
            break;
        }

        // put "batch" loop directly here
        /* Process g_tw_mblock events, or until the PQ is empty
         * (whichever comes first).
         */
        for (msg_i = g_tw_mblock; msg_i; msg_i--) {
            tw_event *cev;
            tw_lp *clp;
            tw_kp *ckp;

            /* OUT OF FREE EVENT BUFFERS. BAD.
             * Go do fossil collect immediately.
             */
            if (me->free_q.size <= g_tw_gvt_threshold) {
                tw_gvt_force_update();
                break;
            }

            /* Stop once the next event lies beyond the conservatively safe
             * window of GVT + lookahead. */
            if (TW_STIME_DBL(tw_pq_minimum(me->pq)) >= TW_STIME_DBL(me->GVT) + g_tw_lookahead) {
                break;
            }

            start = tw_clock_read();
            if (!(cev = tw_pq_dequeue(me->pq))) {
                break;
            }
            me->stats.s_pq += tw_clock_read() - start;
            if (TW_STIME_CMP(cev->recv_ts, tw_pq_minimum(me->pq)) == 0) {
                me->stats.s_pe_event_ties++;
            }

            clp = cev->dest_lp;
            ckp = clp->kp;
            me->cur_event = cev;
            if (TW_STIME_CMP(ckp->last_time, cev->recv_ts) > 0) {
                tw_error(TW_LOC, "Found KP last time %lf > current event time %lf for LP %d, PE %lu, "
                    "src LP %lu, src PE %lu",
                    ckp->last_time, cev->recv_ts, clp->gid, clp->pe->id,
                    cev->send_lp, cev->send_pe);
            }
            ckp->last_time = cev->recv_ts;

            start = tw_clock_read();
            reset_bitfields(cev);
            clp->critical_path = ROSS_MAX(clp->critical_path, cev->critical_path) + 1;
            (*clp->type->event)(clp->cur_state, &cev->cv, tw_event_data(cev), clp);
            if (g_st_ev_trace == FULL_TRACE)
                st_collect_event_data(cev, (double)start / g_tw_clock_rate);
            if (*clp->type->commit) {
                (*clp->type->commit)(clp->cur_state, &cev->cv, tw_event_data(cev), clp);
            }

            ckp->s_nevent_processed++;
            // instrumentation
            ckp->kp_stats->s_nevent_processed++;
            clp->lp_stats->s_nevent_processed++;
            me->stats.s_event_process += tw_clock_read() - start;

            if (me->cev_abort) {
                tw_error(TW_LOC, "insufficient event memory");
            }

            tw_event_free(me, cev);

            if (g_st_rt_sampling &&
                tw_clock_read() - g_st_rt_samp_start_cycles > g_st_rt_interval)
            {
                tw_clock current_rt = tw_clock_read();
                if (g_st_engine_stats == RT_STATS || g_st_engine_stats == ALL_STATS)
                    st_collect_engine_data(me, RT_COL);
                if (g_st_model_stats == RT_STATS || g_st_model_stats == ALL_STATS)
                    st_collect_model_data(me, ((double)current_rt) / g_tw_clock_rate, RT_STATS);

                g_st_rt_samp_start_cycles = tw_clock_read();
            }
        }
    }

    tw_wall_now(&me->end_time);
    me->stats.s_total = tw_clock_read() - me->stats.s_total;

    if (g_tw_mynode == g_tw_masternode) {
        printf("*** END SIMULATION ***\n\n");
    }

    tw_net_barrier();

    // call the model PE finalize function
    (*me->type.final)(me);

    st_inst_finalize(me);

    tw_stats(me);
}

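/**
 * Optimistic (Time Warp) scheduler: each iteration reads remote events off
 * the network, participates in GVT (steps 1 and 2), moves newly arrived
 * events into the priority queue (rolling back stragglers), processes the
 * cancel queue, and then forward-executes a batch of events.
 */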
void tw_scheduler_optimistic(tw_pe * me) {
    tw_clock start;

    if (g_tw_mynode == g_tw_masternode) {
        printf("*** START PARALLEL OPTIMISTIC SIMULATION WITH SUSPEND LP FEATURE ***\n\n");
    }

    tw_wall_now(&me->start_time);
    me->stats.s_total = tw_clock_read();

    for (;;) {
        if (tw_nnodes() > 1) {
            start = tw_clock_read();
            tw_net_read(me);
            me->stats.s_net_read += tw_clock_read() - start;
        }

        tw_gvt_step1(me);
        tw_sched_event_q(me);
        tw_sched_cancel_q(me);
        tw_gvt_step2(me);

        if (TW_STIME_DBL(me->GVT) > g_tw_ts_end) {
            break;
        }

        tw_sched_batch(me);
    }

    tw_wall_now(&me->end_time);
    me->stats.s_total = tw_clock_read() - me->stats.s_total;

    tw_net_barrier();

    if (g_tw_mynode == g_tw_masternode) {
        printf("*** END SIMULATION ***\n\n");
    }

    // call the model PE finalize function
    (*me->type.final)(me);

    st_inst_finalize(me);

    tw_stats(me);
}

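/**
 * Optimistic scheduler variant that drives GVT off wall-clock time rather
 * than an event-count interval; it otherwise mirrors tw_scheduler_optimistic()
 * but uses the realtime GVT step and batch routines.
 */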
void tw_scheduler_optimistic_realtime(tw_pe * me) {
    tw_clock start;

    if (g_tw_mynode == g_tw_masternode) {
        printf("*** START PARALLEL OPTIMISTIC SIMULATION WITH SUSPEND LP FEATURE AND REAL TIME GVT ***\n\n");
    }

    tw_wall_now(&me->start_time);
    me->stats.s_total = tw_clock_read();

    // init the realtime GVT
    g_tw_gvt_interval_start_cycles = tw_clock_read();

    for (;;) {
        if (tw_nnodes() > 1) {
            start = tw_clock_read();
            tw_net_read(me);
            me->stats.s_net_read += tw_clock_read() - start;
        }

        tw_gvt_step1_realtime(me);
        tw_sched_event_q(me);
        tw_sched_cancel_q(me);
        tw_gvt_step2(me); // use regular step2 at this point

        if (TW_STIME_DBL(me->GVT) > g_tw_ts_end) {
            break;
        }

        tw_sched_batch_realtime(me);
    }

    tw_wall_now(&me->end_time);
    me->stats.s_total = tw_clock_read() - me->stats.s_total;

    tw_net_barrier();

    if (g_tw_mynode == g_tw_masternode) {
        printf("*** END SIMULATION ***\n\n");
    }

    // call the model PE finalize function
    (*me->type.final)(me);

    st_inst_finalize(me);

    tw_stats(me);
}

double g_tw_rollback_time = 0.000000001;

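/**
 * Debugging scheduler for reverse event handlers: runs on a single PE/KP,
 * forward-processes events without GVT or fossil collection until only 1024
 * free event buffers remain, then rolls the lone KP all the way back so that
 * every reverse handler gets exercised.
 */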
void tw_scheduler_optimistic_debug(tw_pe * me) {
    tw_event *cev = NULL;

    if (tw_nnodes() > 1) {
        tw_error(TW_LOC, "Optimistic Debug Scheduler used for world size greater than 1.");
    }

    printf("/***************************************************************************/\n");
    printf("/***** WARNING: Starting Optimistic Debug Scheduler!! **********************/\n");
    printf("This scheduler assumes the following: \n");
    printf("    1) Only 1 Processor/Core is used.\n");
    printf("    2) Only 1 KP is used.\n");
    printf("       NOTE: use the --nkp=1 argument to the simulation to ensure that\n");
    printf("       it only uses 1 KP.\n");
    printf("    3) Events ARE NEVER RECLAIMED (LP Commit Functions are not called).\n");
    printf("    4) Executes until out of memory (1024 events left) and\n       injects a rollback to just before the primordial init event.\n");
    printf("    5) g_tw_rollback_time = %13.12lf \n", g_tw_rollback_time);
    printf("/***************************************************************************/\n");

    if (g_tw_nkp > 1) {
        tw_error(TW_LOC, "Number of KPs is greater than 1.");
    }

    tw_wall_now(&me->start_time);

    while ((cev = tw_pq_dequeue(me->pq))) {
        tw_lp *clp = cev->dest_lp;
        tw_kp *ckp = clp->kp;

        me->cur_event = cev;
        ckp->last_time = cev->recv_ts;

        /* don't update GVT */
        reset_bitfields(cev);

        // state-save and update the LP's critical path
        unsigned int prev_cp = clp->critical_path;
        clp->critical_path = ROSS_MAX(clp->critical_path, cev->critical_path) + 1;
        (*clp->type->event)(clp->cur_state, &cev->cv, tw_event_data(cev), clp);
        cev->critical_path = prev_cp;

        ckp->s_nevent_processed++;

        /* Thread current event into processed queue of kp */
        cev->state.owner = TW_kp_pevent_q;
        tw_eventq_unshift(&ckp->pevent_q, cev);

        /* stop when we have 1024 events left */
        if (me->free_q.size <= 1024) {
            break;
        }
    }

    // If we've run out of free events or events to process (maybe we're past end time?)
    // Perform all the rollbacks!
    printf("/******************* Starting Rollback Phase ******************************/\n");
    tw_kp_rollback_to(g_tw_kp[0], TW_STIME_CRT(g_tw_rollback_time));
    printf("/******************* Completed Rollback Phase ******************************/\n");

    tw_wall_now(&me->end_time);

    printf("*** END SIMULATION ***\n\n");

    tw_stats(me);

    (*me->type.final)(me);
}