62 printf(
"WARNING: did not allocate enough events to RIO buffer\n");
78 assert(
l_io_init_flag == 0 &&
"ERROR: RIO system already initialized");
93 g_io_free_events.
size = 0;
94 g_io_free_events.
head = g_io_free_events.
tail = NULL;
95 g_io_buffered_events.
size = 0;
96 g_io_buffered_events.
head = g_io_buffered_events.
tail = NULL;
129 MPI_Datatype MPI_IO_PART;
131 MPI_Type_commit(&MPI_IO_PART);
132 int partition_md_size;
133 MPI_Type_size(MPI_IO_PART, &partition_md_size);
134 MPI_Offset offset = (
long long) partition_md_size *
l_io_kp_offset;
139 rc = MPI_File_open(MPI_COMM_WORLD, filename, MPI_MODE_RDONLY, MPI_INFO_NULL, &fh);
141 printf(
"ERROR: could not MPI_File_open %s\n", filename);
143 MPI_File_read_at_all(fh, offset, &my_partitions,
g_tw_nkp, MPI_IO_PART, &status);
148 printf(
"Rank %d read metadata\n\tpart %d\n\tfile %d\n\toffset %d\n\tsize %d\n\tlp count %d\n\tevents %d\n\n", mpi_rank,
149 my_partitions[i].part, my_partitions[i].file, my_partitions[i].offset,
150 my_partitions[i].size, my_partitions[i].lp_count, my_partitions[i].ev_count);
157 count_sum += my_partitions[i].
lp_count;
159 assert(count_sum ==
g_tw_nlp &&
"ERROR: wrong number of LPs in partitions");
163 size_t * model_sizes = (
size_t *) calloc(
g_tw_nlp,
sizeof(
size_t));
167 rc = MPI_File_open(MPI_COMM_WORLD, filename, MPI_MODE_RDONLY, MPI_INFO_NULL, &fh);
169 printf(
"ERROR: could not MPI_File_open %s\n", filename);
171 for (cur_part = 0; cur_part <
g_tw_nkp; cur_part++){
172 int data_count = my_partitions[cur_part].
lp_count;
174 MPI_File_read_at_all(fh, offset, &model_sizes[index], data_count, MPI_UNSIGNED_LONG, &status);
176 MPI_File_read_at(fh, offset, &model_sizes[index], data_count, MPI_UNSIGNED_LONG, &status);
179 offset += (
long long) data_count *
sizeof(
size_t);
185 for (cur_part = 0; cur_part <
g_tw_nkp; cur_part++) {
187 char buffer[my_partitions[cur_part].
size];
192 rc = MPI_File_open(MPI_COMM_SELF, filename, MPI_MODE_RDONLY, MPI_INFO_NULL, &fh);
194 printf(
"ERROR: could not MPI_File_open %s\n", filename);
197 MPI_File_read_at_all(fh, (
long long) my_partitions[cur_part].offset, buffer, my_partitions[cur_part].size, MPI_BYTE, &status);
199 MPI_File_read_at(fh, (
long long) my_partitions[cur_part].offset, buffer, my_partitions[cur_part].size, MPI_BYTE, &status);
204 for (i = 0; i < my_partitions[cur_part].lp_count; i++, all_lp_i++) {
207 b += model_sizes[all_lp_i];
209 assert(my_partitions[cur_part].ev_count <= g_io_free_events.
size);
210 for (i = 0; i < my_partitions[cur_part].ev_count; i++) {
229 int event_count = g_io_buffered_events.
size;
234 for (i = 0; i < event_count; i++) {
243 memcpy(&(n->
cv), &(e->
cv),
sizeof(
tw_bf));
268 int file_number = data_file_number;
271 MPI_Comm_split(MPI_COMM_WORLD, file_number,
g_tw_mynode, &file_comm);
272 MPI_Comm_size(file_comm, &file_comm_count);
275 long long contribute = 0;
278 sprintf(filename,
"%s.rio-data-%d", master_filename, file_number);
287 for (cur_kp = 0; cur_kp <
g_tw_nkp; cur_kp++) {
292 int sum_model_size = 0;
300 sum_model_size += all_lp_sizes[all_lp_i];
306 int sum_event_size = 0;
309 event_count = g_io_buffered_events.
size;
313 int sum_lp_size = lps_on_kp * lp_size;
314 int sum_size = sum_lp_size + sum_model_size + sum_event_size;
317 my_partitions[cur_kp].
file = file_number;
318 my_partitions[cur_kp].
size = sum_size;
319 my_partitions[cur_kp].
lp_count = lps_on_kp;
320 my_partitions[cur_kp].
ev_count = event_count;
322 contribute += sum_size;
323 rank_events += event_count;
324 rank_lps += lps_on_kp;
328 offset = (
long long) 0;
329 if (file_comm_count > 1) {
330 MPI_Exscan(&contribute, &offset, 1, MPI_LONG_LONG, MPI_SUM, file_comm);
334 for (cur_kp = 0; cur_kp <
g_tw_nkp; cur_kp++) {
338 int sum_size = my_partitions[cur_kp].
size;
339 int event_count = my_partitions[cur_kp].
ev_count;
340 int lps_on_kp = my_partitions[cur_kp].
lp_count;
342 char buffer[sum_size];
346 for (c = 0, b = buffer; c <
g_tw_nlp; c++) {
351 b += all_lp_sizes[global_lp_i];
357 for (i = 0; i < event_count; i++) {
367 MPI_File_open(file_comm, filename, MPI_MODE_CREATE | MPI_MODE_WRONLY | MPI_MODE_APPEND, MPI_INFO_NULL, &fh);
369 MPI_File_write_at_all(fh, offset, &buffer, sum_size, MPI_BYTE, &status);
372 MPI_File_write_at(fh, offset, &buffer, sum_size, MPI_BYTE, &status);
376 my_partitions[cur_kp].
offset = offset;
377 offset += (
long long) sum_size;
380 MPI_Comm_free(&file_comm);
384 amode = MPI_MODE_CREATE | MPI_MODE_RDWR | MPI_MODE_APPEND;
386 amode = MPI_MODE_CREATE | MPI_MODE_RDWR;
390 MPI_Datatype MPI_IO_PART;
392 MPI_Type_commit(&MPI_IO_PART);
395 MPI_Type_size(MPI_IO_PART, &psize);
398 sprintf(filename,
"%s.rio-md", master_filename);
399 MPI_File_open(MPI_COMM_WORLD, filename, amode, MPI_INFO_NULL, &fh);
400 MPI_File_write_at_all(fh, offset, &my_partitions, g_tw_nkp, MPI_IO_PART, &status);
404 for (cur_kp = 0; cur_kp <
g_tw_nkp; cur_kp++) {
405 printf(
"Rank %d storing metadata\n\tpart %d\n\tfile %d\n\toffset:\t%lu\n\tsize %lu\n\tlp count %d\n\tevents %d\n\n", mpi_rank,
406 my_partitions[cur_kp].part, my_partitions[cur_kp].file, my_partitions[cur_kp].offset,
407 my_partitions[cur_kp].size, my_partitions[cur_kp].lp_count, my_partitions[cur_kp].ev_count);
413 sprintf(filename,
"%s.rio-lp", master_filename);
414 MPI_File_open(MPI_COMM_WORLD, filename, amode, MPI_INFO_NULL, &fh);
415 MPI_File_write_at_all(fh, offset, all_lp_sizes,
g_tw_nlp, MPI_UNSIGNED_LONG, &status);
419 printf(
"%lu parts written\n", g_tw_nkp);
422 int global_events = 0;
424 MPI_Reduce(&rank_events, &global_events, 1, MPI_INT, MPI_SUM, 0, MPI_COMM_WORLD);
431 sprintf(filename,
"%s.txt", master_filename);
432 file = fopen(filename,
"w");
433 fprintf(file,
"This file was auto-generated by RIO.\n\n");
437 fprintf(file,
"Date Created:\t%s", ctime(&raw_time));
440 fprintf(file,
"\n## Version Information\n\n");
445 fprintf(file,
"RIO Version:\t%s\n", RIO_VERSION);
449 fprintf(file,
"\n## CHECKPOINT INFORMATION\n\n");
450 fprintf(file,
"Name:\t\t%s\n", master_filename);
454 fprintf(file,
"Total Events:\t%d\n", global_events);
457 fprintf(file,
"Append Flag:\tON\n");
460 fprintf(file,
"Total Events:\t%d+?\n", global_events);
465 fprintf(file,
"\n## BUILD SETTINGS\n\n");
467 fprintf(file,
"RAND_NORMAL\tON\n");
469 fprintf(file,
"RAND_NORMAL\tOFF\n");
471 #ifdef ROSS_CLOCK_i386
472 fprintf(file,
"ARCH:\t\ti386\n");
474 #ifdef ROSS_CLOCK_amd64
475 fprintf(file,
"ARCH:\t\tx86_64\n");
477 #ifdef ROSS_CLOCK_ia64
478 fprintf(file,
"ARCH:\t\tia64\n");
480 #ifdef ROSS_CLOCK_ppc
481 fprintf(file,
"ARCH:\t\tPPC 64\n");
483 #ifdef ROSS_CLOCK_bgl
484 fprintf(file,
"ARCH:\t\tBG/L\n");
486 #ifdef ROSS_CLOCK_bgq
487 fprintf(file,
"ARCH:\t\tBG/Q\n");
490 fprintf(file,
"\n## RUN TIME SETTINGS\n\n");
tw_lp * dest_lp
Destination LP ID.
io_partition * g_io_partitions
void(* serialize_f)(void *state, void *buffer, tw_lp *lp)
static int l_io_init_flag
tw_eventid event_id
Unique id assigned by src_lp->pe if remote.
void tw_error(const char *file, int line, const char *fmt,...) NORETURN
tw_statistics stats
per PE counters
size_t io_event_serialize(tw_event *e, void *buffer)
tw_stime recv_ts
Actual time to be received.
tw_eventq g_io_free_events
static void tw_eventq_delete_any(tw_eventq *q, tw_event *e)
static unsigned long l_io_kp_offset
tw_event * cancel_next
Next event in the cancel queue for the dest_pe.
static tw_clock tw_clock_read(void)
char g_io_checkpoint_name[1024]
Holds the entire PE state.
void io_store_checkpoint(char *master_filename, int data_file_number)
static int io_partition_field_count
io_load_type g_io_load_at
tw_bf cv
Used by app during reverse computation.
static unsigned long l_io_min_parts
unsigned int tw_nnodes(void)
size_t io_event_deserialize(tw_event *e, void *buffer)
size_t(* model_size_f)(void *state, tw_lp *lp)
tw_event * abort_event
Placeholder event for when free_q is empty.
tw_event * cur_event
Current event being processed.
void io_read_checkpoint()
void io_load_checkpoint(char *master_filename, io_load_type load_at)
static void * tw_event_data(tw_event *event)
size_t io_lp_serialize(tw_lp *lp, void *buffer)
tw_event * cause_next
Next in parent's caused_by_me chain.
tw_eventq g_io_buffered_events
tw_event * caused_by_me
Start of event list caused by this event.
Reverse Computation Bitfield.
#define TWOPT_UINT(n, v, h)
void(* deserialize_f)(void *state, void *buffer, tw_lp *lp)
static unsigned long l_io_lp_offset
static int l_io_append_flag
static tw_event * tw_eventq_pop(tw_eventq *q)
tw_typemap_f g_tw_lp_typemap
static void tw_eventq_push(tw_eventq *q, tw_event *e)
void io_register_model_version(char *sha1)
io_lptype * g_io_lp_types
struct tw_event::@0 state
const tw_optdef io_opts[]
tw_lp * src_lp
Sending LP ID.
void * cur_state
Current application LP data.
size_t io_lp_deserialize(tw_lp *lp, void *buffer)
void tw_opt_settings(FILE *outfile)
void tw_event_send(tw_event *event)
static unsigned long l0_io_total_lp
void io_event_cancel(tw_event *e)
enum io_load_e io_load_type
tw_event * io_event_grab(tw_pe *pe)
static unsigned long l0_io_total_kp
void io_load_events(tw_pe *me)
unsigned char cev_abort
Current event being processed must be aborted.
static tw_event * tw_event_new(tw_lpid dest_gid, tw_stime offset_ts, tw_lp *sender)
int g_io_events_buffered_per_rank
unsigned char owner
Owner of the next/prev pointers; see tw_event_owner.