ROSS
io-mpi.c
Go to the documentation of this file.
1 //Elsa Gonsiorowski
2 //Rensselaer Polytechnic Institute
3 //December 13, 2013
4 
5 #include <assert.h>
6 #include "ross.h"
7 
8 //#define RIO_DEBUG
9 
10 // Command Line Options
// Option table for the "RIO" option group. It declares a single unsigned-int
// option, "io-files", bound to g_io_number_of_files, and is terminated by
// TWOPT_END().
// NOTE(review): g_io_number_of_files appears to be declared as a plain int while
// the option is registered with TWOPT_UINT — confirm the types agree.
12 const tw_optdef io_opts[] = {
13  TWOPT_GROUP("RIO"),
14  TWOPT_UINT("io-files", g_io_number_of_files, "io files"),
15  TWOPT_END()
16 };
17 
18 // User-Set Variable Initializations
26 
27 // Local Variables
// Values below are filled in by io_init() using MPI collectives over MPI_COMM_WORLD.
28 static unsigned long l_io_kp_offset = 0; // exclusive prefix sum of g_tw_nkp (MPI_Exscan)
29 static unsigned long l_io_lp_offset = 0; // exclusive prefix sum of g_tw_nlp (MPI_Exscan)
30 static unsigned long l0_io_total_kp = 0; // sum of g_tw_nkp, MPI_Reduce'd onto rank 0 only
31 static unsigned long l0_io_total_lp = 0; // sum of g_tw_nlp, MPI_Reduce'd onto rank 0 only
32 static unsigned long l_io_min_parts = 0; // minimum g_tw_nkp over all ranks (MPI_Allreduce)
33 static int l_io_init_flag = 0; // set to 1 once io_init() has run
34 static int l_io_append_flag = 0; // set to 1 when this run appends to an existing set of checkpoint files
35 
// Version string of the model, written into the checkpoint README.
// Sized for a 40-character SHA-1 hex digest plus the terminating NUL.
char model_version[41];

/**
 * Record the model's version identifier for later inclusion in the
 * checkpoint README.
 *
 * @param sha1 NUL-terminated version string (normally a 40-character SHA-1
 *             hex digest). Longer strings are truncated to fit
 *             model_version; a NULL pointer records an empty string.
 */
void io_register_model_version (char *sha1) {
    if (sha1 == NULL) {
        model_version[0] = '\0';
        return;
    }
    // Bounded copy: snprintf always NUL-terminates and never writes past the buffer
    snprintf(model_version, sizeof(model_version), "%s", sha1);
}
41 
44  // the RIO system has not been initialized
45  // or we are not buffering events
46  return pe->abort_event;
47  }
48 
49  tw_clock start = tw_clock_read();
50  tw_event *e = tw_eventq_pop(&g_io_free_events);
51 
52  if (e) {
53  e->cancel_next = NULL;
54  e->caused_by_me = NULL;
55  e->cause_next = NULL;
56  e->prev = e->next = NULL;
57 
58  memset(&e->state, 0, sizeof(e->state));
59  memset(&e->event_id, 0, sizeof(e->event_id));
60  tw_eventq_push(&g_io_buffered_events, e);
61  } else {
62  printf("WARNING: did not allocate enough events to RIO buffer\n");
63  e = pe->abort_event;
64  }
65  pe->stats.s_rio_load += (tw_clock_read() - start);
66  e->state.owner = IO_buffer;
67  return e;
68 }
69 
71  tw_eventq_delete_any(&g_io_buffered_events, e);
72  tw_eventq_push(&g_io_free_events, e);
73 }
74 
75 void io_init() {
76  int i;
77 
78  assert(l_io_init_flag == 0 && "ERROR: RIO system already initialized");
79  l_io_init_flag = 1;
80 
81  MPI_Exscan(&g_tw_nkp, &l_io_kp_offset, 1, MPI_UNSIGNED_LONG, MPI_SUM, MPI_COMM_WORLD);
82  MPI_Exscan(&g_tw_nlp, &l_io_lp_offset, 1, MPI_UNSIGNED_LONG, MPI_SUM, MPI_COMM_WORLD);
83  MPI_Reduce(&g_tw_nkp, &l0_io_total_kp, 1, MPI_UNSIGNED_LONG, MPI_SUM, 0, MPI_COMM_WORLD);
84  MPI_Reduce(&g_tw_nlp, &l0_io_total_lp, 1, MPI_UNSIGNED_LONG, MPI_SUM, 0, MPI_COMM_WORLD);
85 
86  // Use collectives where ever possible
87  MPI_Allreduce(&g_tw_nkp, &l_io_min_parts, 1, MPI_UNSIGNED_LONG, MPI_MIN, MPI_COMM_WORLD);
88 
89  if (g_tw_mynode == 0) {
90  printf("*** IO SYSTEM INIT ***\n\tFiles: %d\n\tParts: %lu\n\n", g_io_number_of_files, l0_io_total_kp);
91  }
92 
93  g_io_free_events.size = 0;
94  g_io_free_events.head = g_io_free_events.tail = NULL;
95  g_io_buffered_events.size = 0;
96  g_io_buffered_events.head = g_io_buffered_events.tail = NULL;
97 }
98 
99 // This run is part of a larger set of DISPARATE runs
100 // append the .md and .lp files
102  if (l_io_init_flag == 1) {
103  l_io_append_flag = 1;
104  }
105 }
106 
107 void io_load_checkpoint(char * master_filename, io_load_type load_at) {
108  strcpy(g_io_checkpoint_name, master_filename);
109  g_io_load_at = load_at;
110 }
111 
113  int i, cur_part, rc;
114  int mpi_rank = g_tw_mynode;
115  int number_of_mpitasks = tw_nnodes();
116 
117  // assert that IO system has been init
118  assert(g_io_number_of_files != 0 && "ERROR: IO variables not set: # of files\n");
119 
120  // TODO: check to make sure io system is init'd?
121 
122  MPI_File fh;
123  MPI_Status status;
124  char filename[257];
125 
126  // Read MH
127 
128  // Metadata datatype
129  MPI_Datatype MPI_IO_PART;
130  MPI_Type_contiguous(io_partition_field_count, MPI_INT, &MPI_IO_PART);
131  MPI_Type_commit(&MPI_IO_PART);
132  int partition_md_size;
133  MPI_Type_size(MPI_IO_PART, &partition_md_size);
134  MPI_Offset offset = (long long) partition_md_size * l_io_kp_offset;
135 
136  io_partition my_partitions[g_tw_nkp];
137 
138  sprintf(filename, "%s.rio-md", g_io_checkpoint_name);
139  rc = MPI_File_open(MPI_COMM_WORLD, filename, MPI_MODE_RDONLY, MPI_INFO_NULL, &fh);
140  if (rc != 0) {
141  printf("ERROR: could not MPI_File_open %s\n", filename);
142  }
143  MPI_File_read_at_all(fh, offset, &my_partitions, g_tw_nkp, MPI_IO_PART, &status);
144  MPI_File_close(&fh);
145 
146 #ifdef RIO_DEBUG
147  for (i = 0; i < g_tw_nkp; i++) {
148  printf("Rank %d read metadata\n\tpart %d\n\tfile %d\n\toffset %d\n\tsize %d\n\tlp count %d\n\tevents %d\n\n", mpi_rank,
149  my_partitions[i].part, my_partitions[i].file, my_partitions[i].offset,
150  my_partitions[i].size, my_partitions[i].lp_count, my_partitions[i].ev_count);
151  }
152 #endif
153 
154  // error check
155  int count_sum = 0;
156  for (i = 0; i < g_tw_nkp; i++) {
157  count_sum += my_partitions[i].lp_count;
158  }
159  assert(count_sum == g_tw_nlp && "ERROR: wrong number of LPs in partitions");
160 
161  // read size array
162  offset = sizeof(size_t) * l_io_lp_offset;
163  size_t * model_sizes = (size_t *) calloc(g_tw_nlp, sizeof(size_t));
164  int index = 0;
165 
166  sprintf(filename, "%s.rio-lp", g_io_checkpoint_name);
167  rc = MPI_File_open(MPI_COMM_WORLD, filename, MPI_MODE_RDONLY, MPI_INFO_NULL, &fh);
168  if (rc != 0) {
169  printf("ERROR: could not MPI_File_open %s\n", filename);
170  }
171  for (cur_part = 0; cur_part < g_tw_nkp; cur_part++){
172  int data_count = my_partitions[cur_part].lp_count;
173  if (cur_part < l_io_min_parts) {
174  MPI_File_read_at_all(fh, offset, &model_sizes[index], data_count, MPI_UNSIGNED_LONG, &status);
175  } else {
176  MPI_File_read_at(fh, offset, &model_sizes[index], data_count, MPI_UNSIGNED_LONG, &status);
177  }
178  index += data_count;
179  offset += (long long) data_count * sizeof(size_t);
180  }
181  MPI_File_close(&fh);
182 
183  // DATA FILES
184  int all_lp_i = 0;
185  for (cur_part = 0; cur_part < g_tw_nkp; cur_part++) {
186  // Read file
187  char buffer[my_partitions[cur_part].size];
188  void * b = buffer;
189  sprintf(filename, "%s.rio-data-%d", g_io_checkpoint_name, my_partitions[cur_part].file);
190 
191  // Must use non-collectives, can't know status of other MPI-ranks
192  rc = MPI_File_open(MPI_COMM_SELF, filename, MPI_MODE_RDONLY, MPI_INFO_NULL, &fh);
193  if (rc != 0) {
194  printf("ERROR: could not MPI_File_open %s\n", filename);
195  }
196  if (cur_part < l_io_min_parts) {
197  MPI_File_read_at_all(fh, (long long) my_partitions[cur_part].offset, buffer, my_partitions[cur_part].size, MPI_BYTE, &status);
198  } else {
199  MPI_File_read_at(fh, (long long) my_partitions[cur_part].offset, buffer, my_partitions[cur_part].size, MPI_BYTE, &status);
200  }
201  MPI_File_close(&fh);
202 
203  // Load Data
204  for (i = 0; i < my_partitions[cur_part].lp_count; i++, all_lp_i++) {
205  b += io_lp_deserialize(g_tw_lp[all_lp_i], b);
206  ((deserialize_f)g_io_lp_types[0].deserialize)(g_tw_lp[all_lp_i]->cur_state, b, g_tw_lp[all_lp_i]);
207  b += model_sizes[all_lp_i];
208  }
209  assert(my_partitions[cur_part].ev_count <= g_io_free_events.size);
210  for (i = 0; i < my_partitions[cur_part].ev_count; i++) {
211  // SEND THESE EVENTS
212  tw_event *ev = tw_eventq_pop(&g_io_free_events);
213  b += io_event_deserialize(ev, b);
214  void * msg = tw_event_data(ev);
215  memcpy(msg, b, g_tw_msg_sz);
216  b += g_tw_msg_sz;
217  // buffer event to send after initialization
218  tw_eventq_push(&g_io_buffered_events, ev);
219  }
220  }
221 
222  free(model_sizes);
223 
224  return;
225 }
226 
// Re-send every event currently held in g_io_buffered_events.
// For each buffered event `e`, the reverse-computation bitfield (cv) and the
// message payload (g_tw_msg_sz bytes) are copied into event `n`, `e` is pushed
// onto g_io_free_events, and `n` is passed to tw_event_send().
// g_tw_lookahead is forced to 0 while sending and restored before returning.
// me: the PE whose cur_event / abort_event / cev_abort fields are used here.
227 void io_load_events(tw_pe * me) {
228  int i;
// snapshot the queue length: the loop pops exactly this many events
229  int event_count = g_io_buffered_events.size;
230  tw_stime original_lookahead = g_tw_lookahead;
231  //These messages arrive before the first conservative window
232  //checking for valid lookahead is unnecessary
233  g_tw_lookahead = 0;
234  for (i = 0; i < event_count; i++) {
// presumably gives the send path a valid "current event" with an empty
// caused_by_me chain, since no event is being processed — TODO confirm
235  me->cur_event = me->abort_event;
236  me->cur_event->caused_by_me = NULL;
237 
238  tw_event *e = tw_eventq_pop(&g_io_buffered_events);
239  // e->dest_lp will be a GID after being loaded from checkpoint
// NOTE(review): `n` is used below but its declaration is not visible in this
// listing (listing line 240 is absent); presumably it is obtained from
// tw_event_new() — confirm against the original file.
241  void *emsg = tw_event_data(e);
242  void *nmsg = tw_event_data(n);
243  memcpy(&(n->cv), &(e->cv), sizeof(tw_bf));
244  memcpy(nmsg, emsg, g_tw_msg_sz);
// the buffered copy is no longer needed; hand it back to the free queue
245  tw_eventq_push(&g_io_free_events, e);
246  tw_event_send(n);
247 
// cev_abort being set after the send is treated as a fatal error
248  if (me->cev_abort) {
249  tw_error(TW_LOC, "ran out of events during io_load_events");
250  }
251  }
252  g_tw_lookahead = original_lookahead;
253 }
254 
255 void io_store_checkpoint(char * master_filename, int data_file_number) {
256  int i, c, cur_kp;
257  int mpi_rank = g_tw_mynode;
258  int number_of_mpitasks = tw_nnodes();
259  int rank_events = 0;
260  int rank_lps = 0;
261 
262  assert(g_io_number_of_files != 0 && "Error: IO variables not set: # of file or # of parts\n");
263 
264  // Set up Comms
265  MPI_File fh;
266  MPI_Status status;
267  MPI_Comm file_comm;
268  int file_number = data_file_number;
269  int file_comm_count;
270 
271  MPI_Comm_split(MPI_COMM_WORLD, file_number, g_tw_mynode, &file_comm);
272  MPI_Comm_size(file_comm, &file_comm_count);
273 
274  MPI_Offset offset;
275  long long contribute = 0;
276 
277  char filename[256];
278  sprintf(filename, "%s.rio-data-%d", master_filename, file_number);
279 
280  // ASSUMPTION FOR MULTIPLE PARTS-PER-RANK
281  // Each MPI-Rank gets its own file
282  io_partition my_partitions[g_tw_nkp];
283 
284  size_t all_lp_sizes[g_tw_nlp];
285  int all_lp_i = 0;
286 
287  for (cur_kp = 0; cur_kp < g_tw_nkp; cur_kp++) {
288  int lps_on_kp = g_tw_kp[cur_kp]->lp_count;
289 
290  // Gather LP size data
291  int lp_size = sizeof(io_lp_store);
292  int sum_model_size = 0;
293 
294  // always do this loop to allow for interleaved LP types in g_tw_lp
295  // TODO: add short cut for one-type, non-dynamic models?
296  for (c = 0; c < g_tw_nlp; c++) {
297  if (g_tw_lp[c]->kp->id == cur_kp) {
298  int lp_type_index = g_tw_lp_typemap(g_tw_lp[c]->gid);
299  all_lp_sizes[all_lp_i] = ((model_size_f)g_io_lp_types[lp_type_index].model_size)(g_tw_lp[c]->cur_state, g_tw_lp[c]);
300  sum_model_size += all_lp_sizes[all_lp_i];
301  all_lp_i++;
302  }
303  }
304 
305  int event_count = 0;
306  int sum_event_size = 0;
307  if (cur_kp == 0) {
308  // Event Metadata
309  event_count = g_io_buffered_events.size;
310  sum_event_size = event_count * (g_tw_msg_sz + sizeof(io_event_store));
311  }
312 
313  int sum_lp_size = lps_on_kp * lp_size;
314  int sum_size = sum_lp_size + sum_model_size + sum_event_size;
315 
316  my_partitions[cur_kp].part = cur_kp + l_io_kp_offset;
317  my_partitions[cur_kp].file = file_number;
318  my_partitions[cur_kp].size = sum_size;
319  my_partitions[cur_kp].lp_count = lps_on_kp;
320  my_partitions[cur_kp].ev_count = event_count;
321 
322  contribute += sum_size;
323  rank_events += event_count;
324  rank_lps += lps_on_kp;
325  }
326 
327  // MPI EXSCAN FOR OFFSET
328  offset = (long long) 0;
329  if (file_comm_count > 1) {
330  MPI_Exscan(&contribute, &offset, 1, MPI_LONG_LONG, MPI_SUM, file_comm);
331  }
332 
333  int global_lp_i = 0;
334  for (cur_kp = 0; cur_kp < g_tw_nkp; cur_kp++) {
335 
336  // ** START Serialize **
337 
338  int sum_size = my_partitions[cur_kp].size;
339  int event_count = my_partitions[cur_kp].ev_count;
340  int lps_on_kp = my_partitions[cur_kp].lp_count;
341 
342  char buffer[sum_size];
343  void * b;
344 
345  // LPs
346  for (c = 0, b = buffer; c < g_tw_nlp; c++) {
347  if (g_tw_lp[c]->kp->id == cur_kp) {
348  b += io_lp_serialize(g_tw_lp[c], b);
349  int lp_type_index = g_tw_lp_typemap(g_tw_lp[c]->gid);
350  ((serialize_f)g_io_lp_types[lp_type_index].serialize)(g_tw_lp[c]->cur_state, b, g_tw_lp[c]);
351  b += all_lp_sizes[global_lp_i];
352  global_lp_i++;
353  }
354  }
355 
356  // Events
357  for (i = 0; i < event_count; i++) {
358  tw_event *ev = tw_eventq_pop(&g_io_buffered_events);
359  b += io_event_serialize(ev, b);
360  void * msg = tw_event_data(ev);
361  memcpy(b, msg, g_tw_msg_sz);
362  tw_eventq_push(&g_io_free_events, ev);
363  b += g_tw_msg_sz;
364  }
365 
366  // Write
367  MPI_File_open(file_comm, filename, MPI_MODE_CREATE | MPI_MODE_WRONLY | MPI_MODE_APPEND, MPI_INFO_NULL, &fh);
368  if (cur_kp < l_io_min_parts) {
369  MPI_File_write_at_all(fh, offset, &buffer, sum_size, MPI_BYTE, &status);
370  // possible optimization here: re-calc l_io_min_parts for file_comm
371  } else {
372  MPI_File_write_at(fh, offset, &buffer, sum_size, MPI_BYTE, &status);
373  }
374  MPI_File_close(&fh);
375 
376  my_partitions[cur_kp].offset = offset;
377  offset += (long long) sum_size;
378  }
379 
380  MPI_Comm_free(&file_comm);
381 
382  int amode;
383  if (l_io_append_flag) {
384  amode = MPI_MODE_CREATE | MPI_MODE_RDWR | MPI_MODE_APPEND;
385  } else {
386  amode = MPI_MODE_CREATE | MPI_MODE_RDWR;
387  }
388 
389  // Write Metadata
390  MPI_Datatype MPI_IO_PART;
391  MPI_Type_contiguous(io_partition_field_count, MPI_INT, &MPI_IO_PART);
392  MPI_Type_commit(&MPI_IO_PART);
393 
394  int psize;
395  MPI_Type_size(MPI_IO_PART, &psize);
396 
397  offset = (long long) sizeof(io_partition) * l_io_kp_offset;
398  sprintf(filename, "%s.rio-md", master_filename);
399  MPI_File_open(MPI_COMM_WORLD, filename, amode, MPI_INFO_NULL, &fh);
400  MPI_File_write_at_all(fh, offset, &my_partitions, g_tw_nkp, MPI_IO_PART, &status);
401  MPI_File_close(&fh);
402 
403 #ifdef RIO_DEBUG
404  for (cur_kp = 0; cur_kp < g_tw_nkp; cur_kp++) {
405  printf("Rank %d storing metadata\n\tpart %d\n\tfile %d\n\toffset:\t%lu\n\tsize %lu\n\tlp count %d\n\tevents %d\n\n", mpi_rank,
406  my_partitions[cur_kp].part, my_partitions[cur_kp].file, my_partitions[cur_kp].offset,
407  my_partitions[cur_kp].size, my_partitions[cur_kp].lp_count, my_partitions[cur_kp].ev_count);
408  }
409 #endif
410 
411  // Write model size array
412  offset = sizeof(size_t) * l_io_lp_offset;
413  sprintf(filename, "%s.rio-lp", master_filename);
414  MPI_File_open(MPI_COMM_WORLD, filename, amode, MPI_INFO_NULL, &fh);
415  MPI_File_write_at_all(fh, offset, all_lp_sizes, g_tw_nlp, MPI_UNSIGNED_LONG, &status);
416  MPI_File_close(&fh);
417 
418  if (l_io_append_flag == 1) {
419  printf("%lu parts written\n", g_tw_nkp);
420  }
421 
422  int global_events = 0;
423  int global_lps = 0;
424  MPI_Reduce(&rank_events, &global_events, 1, MPI_INT, MPI_SUM, 0, MPI_COMM_WORLD);
425 
426  // WRITE READ ME
427  if (mpi_rank == 0 && (l_io_append_flag == 0 || data_file_number == 0) ) {
428  FILE *file;
429  char filename[256];
430 
431  sprintf(filename, "%s.txt", master_filename);
432  file = fopen(filename, "w");
433  fprintf(file, "This file was auto-generated by RIO.\n\n");
434 #if HAVE_CTIME
435  time_t raw_time;
436  time(&raw_time);
437  fprintf(file, "Date Created:\t%s", ctime(&raw_time));
438 #endif
439 
440  fprintf(file, "\n## Version Information\n\n");
441 #ifdef ROSS_VERSION
442  fprintf(file, "ROSS Version:\t%s\n", ROSS_VERSION);
443 #endif
444 #ifdef RIO_VERSION
445  fprintf(file, "RIO Version:\t%s\n", RIO_VERSION);
446 #endif
447  fprintf(file, "MODEL Version:\t%s\n", model_version);
448 
449  fprintf(file, "\n## CHECKPOINT INFORMATION\n\n");
450  fprintf(file, "Name:\t\t%s\n", master_filename);
451  if (l_io_append_flag == 0) {
452  fprintf(file, "Data Files:\t%d\n", g_io_number_of_files);
453  fprintf(file, "Partitions:\t%lu\n", l0_io_total_kp);
454  fprintf(file, "Total Events:\t%d\n", global_events);
455  fprintf(file, "Total LPs:\t%lu\n", l0_io_total_lp);
456  } else {
457  fprintf(file, "Append Flag:\tON\n");
458  fprintf(file, "Data Files:\t%d+?\n", g_io_number_of_files);
459  fprintf(file, "Partitions:\t%lu+?\n", l0_io_total_kp);
460  fprintf(file, "Total Events:\t%d+?\n", global_events);
461  fprintf(file, "Total LPs:\t%lu+?\n", l0_io_total_lp);
462  }
463 
464 
465  fprintf(file, "\n## BUILD SETTINGS\n\n");
466 #ifdef RAND_NORMAL
467  fprintf(file, "RAND_NORMAL\tON\n");
468 #else
469  fprintf(file, "RAND_NORMAL\tOFF\n");
470 #endif
471 #ifdef ROSS_CLOCK_i386
472  fprintf(file, "ARCH:\t\ti386\n");
473 #endif
474 #ifdef ROSS_CLOCK_amd64
475  fprintf(file, "ARCH:\t\tx86_64\n");
476 #endif
477 #ifdef ROSS_CLOCK_ia64
478  fprintf(file, "ARCH:\t\tia64\n");
479 #endif
480 #ifdef ROSS_CLOCK_ppc
481  fprintf(file, "ARCH:\t\tPPC 64\n");
482 #endif
483 #ifdef ROSS_CLOCK_bgl
484  fprintf(file, "ARCH:\t\tBG/L\n");
485 #endif
486 #ifdef ROSS_CLOCK_bgq
487  fprintf(file, "ARCH:\t\tBG/Q\n");
488 #endif
489 
490  fprintf(file, "\n## RUN TIME SETTINGS\n\n");
491  tw_opt_settings(file);
492  }
493 }
#define ROSS_VERSION
Definition: config.h:11
#define TW_LOC
Definition: ross-extern.h:164
void io_init()
Definition: io-mpi.c:75
tw_lp * dest_lp
Destination LP ID.
Definition: ross-types.h:280
io_partition * g_io_partitions
Definition: io-mpi.c:19
void(* serialize_f)(void *state, void *buffer, tw_lp *lp)
Definition: io.h:41
tw_lp ** g_tw_lp
Definition: ross-global.c:26
void io_appending_job()
Definition: io-mpi.c:101
double tw_stime
Definition: ross.h:150
static int l_io_init_flag
Definition: io-mpi.c:33
tw_eventid event_id
Unique id assigned by src_lp->pe if remote.
Definition: ross-types.h:264
tw_event * head
Definition: ross-types.h:167
void tw_error(const char *file, int line, const char *fmt,...) NORETURN
Definition: tw-util.c:74
size_t g_tw_msg_sz
Definition: ross-global.c:33
tw_statistics stats
per PE counters
Definition: ross-types.h:415
int lp_count
Definition: io.h:60
size_t io_event_serialize(tw_event *e, void *buffer)
Definition: io-serialize.c:52
tw_stime recv_ts
Actual time to be received.
Definition: ross-types.h:282
tw_eventq g_io_free_events
Definition: io-mpi.c:25
static void tw_eventq_delete_any(tw_eventq *q, tw_event *e)
Definition: tw-eventq.h:384
static unsigned long l_io_kp_offset
Definition: io-mpi.c:28
tw_event * cancel_next
Next event in the cancel queue for the dest_pe.
Definition: ross-types.h:260
static tw_clock tw_clock_read(void)
Definition: aarch64.h:6
char g_io_checkpoint_name[1024]
Definition: io-mpi.c:22
Holds the entire PE state.
Definition: ross-types.h:375
void io_store_checkpoint(char *master_filename, int data_file_number)
Definition: io-mpi.c:255
int size
Definition: io.h:59
static int io_partition_field_count
Definition: io.h:63
tw_lpid g_tw_nlp
Definition: ross-global.c:23
io_load_type g_io_load_at
Definition: io-mpi.c:21
tw_bf cv
Used by app during reverse computation.
Definition: ross-types.h:274
static unsigned long l_io_min_parts
Definition: io-mpi.c:32
unsigned int tw_nnodes(void)
Definition: network-mpi.c:103
size_t io_event_deserialize(tw_event *e, void *buffer)
Definition: io-serialize.c:68
int lp_count
Definition: ross-types.h:345
tw_event * prev
Definition: ross-types.h:252
uint64_t tw_lpid
Definition: ross.h:160
Event Structure.
Definition: ross-types.h:250
size_t(* model_size_f)(void *state, tw_lp *lp)
Definition: io.h:43
tw_event * abort_event
Placeholder event for when free_q is empty.
Definition: ross-types.h:384
tw_event * next
Definition: ross-types.h:251
tw_event * cur_event
Current event being processed.
Definition: ross-types.h:385
void io_read_checkpoint()
Definition: io-mpi.c:112
void io_load_checkpoint(char *master_filename, io_load_type load_at)
Definition: io-mpi.c:107
static void * tw_event_data(tw_event *event)
size_t io_lp_serialize(tw_lp *lp, void *buffer)
Definition: io-serialize.c:3
int g_io_number_of_files
Definition: io-mpi.c:11
tw_event * cause_next
Next in parent's caused_by_me chain.
Definition: ross-types.h:262
tw_eventq g_io_buffered_events
Definition: io-mpi.c:24
tw_event * caused_by_me
Start of event list caused by this event.
Definition: ross-types.h:261
Definition: io.h:18
tw_peid g_tw_mynode
Definition: ross-global.c:88
Reverse Computation Bitfield.
Definition: ross-types.h:178
#define TWOPT_UINT(n, v, h)
Definition: tw-opts.h:30
int ev_count
Definition: io.h:61
#define TWOPT_END()
Definition: tw-opts.h:35
void(* deserialize_f)(void *state, void *buffer, tw_lp *lp)
Definition: io.h:42
static unsigned long l_io_lp_offset
Definition: io-mpi.c:29
static int l_io_append_flag
Definition: io-mpi.c:34
static tw_event * tw_eventq_pop(tw_eventq *q)
Definition: tw-eventq.h:289
tw_typemap_f g_tw_lp_typemap
Definition: ross-global.c:102
static void tw_eventq_push(tw_eventq *q, tw_event *e)
Definition: tw-eventq.h:257
void io_register_model_version(char *sha1)
Definition: io-mpi.c:38
int part
Definition: io.h:56
tw_event * tail
Definition: ross-types.h:168
io_lptype * g_io_lp_types
Definition: io-mpi.c:20
struct tw_event::@0 state
tw_lpid id
local LP id
Definition: ross-types.h:305
tw_pe * pe
Definition: avl_tree.c:11
const tw_optdef io_opts[]
Definition: io-mpi.c:12
uint64_t tw_clock
Definition: aarch64.h:4
#define TWOPT_GROUP(h)
Definition: tw-opts.h:27
tw_lp * src_lp
Sending LP ID.
Definition: ross-types.h:281
void * cur_state
Current application LP data.
Definition: ross-types.h:315
size_t io_lp_deserialize(tw_lp *lp, void *buffer)
Definition: io-serialize.c:27
size_t size
Definition: ross-types.h:166
void tw_opt_settings(FILE *outfile)
Definition: tw-opts.c:138
int offset
Definition: io.h:58
void tw_event_send(tw_event *event)
Definition: tw-event.c:9
Definition: io.h:65
tw_kpid g_tw_nkp
Definition: ross-global.c:25
static unsigned long l0_io_total_lp
Definition: io-mpi.c:31
void io_event_cancel(tw_event *e)
Definition: io-mpi.c:70
enum io_load_e io_load_type
Definition: io.h:23
char model_version[41]
Definition: io-mpi.c:36
double g_tw_lookahead
Definition: ross-global.c:49
int file
Definition: io.h:57
tw_event * io_event_grab(tw_pe *pe)
Definition: io-mpi.c:42
static unsigned long l0_io_total_kp
Definition: io-mpi.c:30
Definition: io.h:45
void io_load_events(tw_pe *me)
Definition: io-mpi.c:227
unsigned char cev_abort
Current event being processed must be aborted.
Definition: ross-types.h:399
static tw_event * tw_event_new(tw_lpid dest_gid, tw_stime offset_ts, tw_lp *sender)
Definition: ross-inline.h:40
int g_io_events_buffered_per_rank
Definition: io-mpi.c:23
tw_kp ** g_tw_kp
Definition: ross-global.c:27
unsigned char owner
Owner of the next/prev pointers; see tw_event_owner.
Definition: ross-types.h:268