ROSS
analysis-lp.c
Go to the documentation of this file.
1 #include "ross.h"
2 #include "analysis-lp.h"
3 #include <math.h>
4 #include <limits.h>
5 
6 /*
7  * The code for virtual time sampling is based off of the model-net sampling for
8  * CODES network models
9  */
10 
11 static void st_create_sample_event(tw_lp *lp);
12 
14 {
15  int i, j, idx = 0, sim_idx = 0;
16  tw_lp *cur_lp;
17 
18  // set our id relative to all analysis LPs
20  s->num_lps = ceil((double)g_tw_nlp / g_tw_nkp);
21 
22  // create list of LPs this is responsible for
23  s->lp_list = (tw_lpid*)tw_calloc(TW_LOC, "analysis LPs", sizeof(tw_lpid), s->num_lps);
24  s->lp_list_sim = (tw_lpid*)tw_calloc(TW_LOC, "analysis LPs", sizeof(tw_lpid), s->num_lps);
25  // size of lp_list is max number of LPs this analysis LP is responsible for
26  for (i = 0; i < s->num_lps; i++)
27  {
28  s->lp_list[i] = ULLONG_MAX;
29  s->lp_list_sim[i] = ULLONG_MAX;
30  }
31 
32  for (i = 0; (unsigned int)i < g_tw_nlp; i++)
33  {
34  cur_lp = g_tw_lp[i];
35 
36  if (cur_lp->kp->id == s->analysis_id % g_tw_nkp)
37  {
38  s->lp_list_sim[sim_idx] = cur_lp->gid;
39  sim_idx++;
40 
41  // check if this LP even needs sampling performed
42  if (cur_lp->model_types == NULL || cur_lp->model_types->sample_struct_sz == 0)
43  continue;
44 
45  s->lp_list[idx] = cur_lp->gid;
46  idx++;
47  }
48  }
49 
50  // update num_lps
51  s->num_lps = idx;
52  s->num_lps_sim = sim_idx;
53 
54  // setup memory to use for model samples
56  {
60  for (i = 0; i < g_st_sample_count; i++)
61  {
62  if (i == 0)
63  {
64  sample->prev = NULL;
65  sample->next = sample + 1;
66  sample->next->prev = sample;
67  }
68  else if (i == g_st_sample_count - 1)
69  {
70  sample->next = NULL;
71  s->model_samples_tail = sample;
72  }
73  else
74  {
75  sample->next = sample + 1;
76  sample->next->prev = sample;
77  }
78  if (s->num_lps <= 0)
79  tw_error(TW_LOC, "s->num_lps <= 0!");
80  sample->lp_data = (void**) tw_calloc(TW_LOC, "analysis LPs", sizeof(void*), s->num_lps);
81  for (j = 0; j < s->num_lps; j++)
82  {
83  cur_lp = tw_getlocal_lp(s->lp_list[j]);
84  sample->lp_data[j] = (void *) tw_calloc(TW_LOC, "analysis LPs", cur_lp->model_types->sample_struct_sz, 1);
85  }
86  sample = sample->next;
87  }
88  }
89 
90  // schedule 1st sampling event
92 }
93 
95 {
96  int i;
97  tw_lp *model_lp;
98 
99  lp->pe->stats.s_alp_nevent_processed++; //don't undo in RC
100 
102  {
104  // TODO handle this situation better
105  if (sample == s->model_samples_tail)
106  printf("WARNING: last available sample space for analysis lp!\n");
107 
108  sample->timestamp = tw_now(lp);
109  m->timestamp = tw_now(lp);
110 
111  // call the model sampling function for each LP on this KP
112  for (i = 0; i < s->num_lps; i++)
113  {
114  if (s->lp_list[i] == ULLONG_MAX)
115  break;
116 
117  model_lp = tw_getlocal_lp(s->lp_list[i]);
118  if (model_lp->model_types == NULL || model_lp->model_types->sample_struct_sz == 0)
119  continue;
120 
121  model_lp->model_types->sample_event_fn(model_lp->cur_state, bf, lp, sample->lp_data[i]);
122  }
123 
125  }
126 
127  // sim engine sampling
130  {
131 #ifdef USE_DAMARIS
132  if (g_st_damaris_enabled)
133  st_damaris_expose_data(lp->pe, tw_now(lp), ANALYSIS_LP);
134  else
136 #else
138 #endif
139  }
140  //collect_sim_engine_data(lp->pe, lp, s, (tw_stime) tw_clock_read() / g_tw_clock_rate);
141 
142  // create next sampling event
144 }
145 
147 {
148  tw_lp *model_lp;
149  int j;
150 
151  lp->pe->stats.s_alp_e_rbs++;
152 
154  {
155  // need to remove sample associated with this event from the list
156  model_sample_data *sample;
157  // start at end, because it's most likely closer to the timestamp we're looking for
158  for (sample = s->model_samples_current->prev; sample != NULL; sample = sample->prev)
159  {
160  //sample = &s->model_samples[i];
161  if (TW_STIME_CMP(sample->timestamp, m->timestamp) == 0)
162  {
163  for (j = 0; j < s->num_lps; j++)
164  {
165  if (s->lp_list[j] == ULLONG_MAX)
166  break;
167 
168  // first call the appropriate RC fn, to allow it to undo any state changes
169  model_lp = tw_getlocal_lp(s->lp_list[j]);
170  if (model_lp->model_types == NULL || model_lp->model_types->sample_struct_sz == 0)
171  continue;
172  model_lp->model_types->sample_revent_fn(model_lp->cur_state, bf, lp, sample->lp_data[j]);
173  }
174 
175  sample->timestamp = TW_STIME_CRT(0);
176 
177  if (sample->prev)
178  sample->prev->next = sample->next;
179  if (sample->next)
180  sample->next->prev = sample->prev;
181  if (s->model_samples_head == sample)
182  s->model_samples_head = sample->next;
183  if (s->model_samples_tail != sample)
184  { // move this freed sample to the end of the list, so we don't have a memory leak
185  sample->prev = s->model_samples_tail;
186  sample->prev->next = sample;
187  sample->next = NULL;
188  s->model_samples_tail = sample;
189  }
190 
191 
192  break;
193  }
194  }
195  }
196 
197 }
198 
200 {
201  (void) bf;
202  (void) lp;
204  {
205  // write committed data to buffer
206  model_sample_data *sample;
207  int j;
208  tw_lp *model_lp;
209  lp_metadata metadata;
210  // start at beginning
211  for (sample = s->model_samples_head; sample != NULL; sample = sample->next)
212  {
213  if (TW_STIME_CMP(sample->timestamp, m->timestamp) == 0)
214  {
215  for (j = 0; j < s->num_lps; j++)
216  {
217  model_lp = tw_getlocal_lp(s->lp_list[j]);
218  if (model_lp->model_types == NULL || model_lp->model_types->sample_struct_sz == 0)
219  continue;
220  metadata.lpid = model_lp->gid;
221  metadata.kpid = model_lp->kp->id;
222  metadata.peid = model_lp->pe->id;
223  metadata.ts = m->timestamp;
224  metadata.sample_sz = model_lp->model_types->sample_struct_sz;
225  metadata.flag = MODEL_TYPE;
226 
227  char buffer[sizeof(lp_metadata) + model_lp->model_types->sample_struct_sz];
228  memcpy(&buffer[0], (char*)&metadata, sizeof(lp_metadata));
229  memcpy(&buffer[sizeof(lp_metadata)], (char*)sample->lp_data[j], model_lp->model_types->sample_struct_sz);
231  st_buffer_push(ANALYSIS_LP, &buffer[0], sizeof(lp_metadata) + model_lp->model_types->sample_struct_sz);
233  fwrite(buffer, sizeof(lp_metadata) + model_lp->model_types->sample_struct_sz, 1, seq_analysis);
234  }
235 
236  sample->timestamp = TW_STIME_CRT(0);
237 
238  if (sample->prev)
239  sample->prev->next = sample->next;
240  if (sample->next)
241  sample->next->prev = sample->prev;
242  if (s->model_samples_head == sample)
243  s->model_samples_head = sample->next;
244  if (s->model_samples_tail != sample)
245  { // move this freed sample to the end of the list, so we don't have a memory leak
246  sample->prev = s->model_samples_tail;
247  sample->prev->next = sample;
248  sample->next = NULL;
249  s->model_samples_tail = sample;
250  }
251 
252  break;
253  }
254  }
255  }
256 }
257 
259 {
260  (void) lp;
261  // TODO all samples should be written by the commit function, right?
262  // Does anything actually need to be done here?
264  printf("WARNING: there is model sample data that has not been committed!\n");
265 }
266 
268 {
270  {
273  m->src = lp->gid;
274  tw_event_send(e);
275  }
276 }
277 
278 /*
279  * any specialized ROSS LP should be hidden from the model
280  * need to add in these LP after all model LPs
281  */
283 {
284  tw_lpid local_id = gid - analysis_start_gid;
285  return local_id / g_tw_nkp; // because there is 1 LP for each KP
286 }
287 
290  (pre_run_f) NULL,
296  sizeof(analysis_state)},
297  {0},
298 };
299 
301 {
302  tw_lp_settype(lpid, &analysis_lp[0]);
303 }
tw_peid peid
Definition: analysis-lp.h:30
tw_synch g_tw_synchronization_protocol
Definition: ross-global.c:18
#define TW_LOC
Definition: ross-extern.h:164
tw_lpid analysis_start_gid
void analysis_init(analysis_state *s, tw_lp *lp)
Definition: analysis-lp.c:13
tw_lp ** g_tw_lp
Definition: ross-global.c:26
#define TW_STIME_CRT(x)
Definition: ross.h:152
tw_peid analysis_map(tw_lpid gid)
Definition: analysis-lp.c:282
void tw_error(const char *file, int line, const char *fmt,...) NORETURN
Definition: tw-util.c:74
model_sample_data * model_samples_current
Definition: analysis-lp.h:53
tw_statistics stats
per PE counters
Definition: ross-types.h:415
struct lp_metadata lp_metadata
Definition: analysis-lp.h:9
void st_buffer_push(int type, char *data, int size)
tw_lpid src
Definition: analysis-lp.h:21
int g_st_engine_stats
void analysis_event(analysis_state *s, tw_bf *bf, analysis_msg *m, tw_lp *lp)
Definition: analysis-lp.c:94
void(* event_f)(void *sv, tw_bf *cv, void *msg, tw_lp *me)
Definition: ross-types.h:77
void analysis_finish(analysis_state *s, tw_lp *lp)
Definition: analysis-lp.c:258
tw_lpid g_tw_nlp
Definition: ross-global.c:23
tw_stime timestamp
Definition: analysis-lp.h:41
void tw_lp_settype(tw_lpid lp, tw_lptype *type)
Definition: tw-lp.c:38
uint64_t tw_lpid
Definition: ross.h:160
Function Pointers for ROSS Event Handlers.
Definition: ross-types.h:87
tw_kp * kp
kp – Kernel process that we belong to (must match pe).
Definition: ross-types.h:313
Event Stucture.
Definition: ross-types.h:250
tw_peid(* map_f)(tw_lpid)
Definition: ross-types.h:73
tw_lpid gid
global LP id
Definition: ross-types.h:306
tw_lptype analysis_lp[]
Definition: analysis-lp.c:288
#define TW_STIME_CMP(x, y)
Definition: ross.h:154
static tw_stime tw_now(tw_lp const *lp)
tw_lpid * lp_list_sim
Definition: analysis-lp.h:51
void analysis_commit(analysis_state *s, tw_bf *bf, analysis_msg *m, tw_lp *lp)
Definition: analysis-lp.c:199
static void * tw_event_data(tw_event *event)
void st_analysis_lp_settype(tw_lpid lpid)
Definition: analysis-lp.c:300
tw_stat s_alp_e_rbs
Definition: ross-types.h:157
Reverse Computation Bitfield.
Definition: ross-types.h:178
double g_st_sampling_end
tw_kpid kpid
Definition: analysis-lp.h:29
void(* init_f)(void *sv, tw_lp *me)
Definition: ross-types.h:72
void analysis_event_rc(analysis_state *s, tw_bf *bf, analysis_msg *m, tw_lp *lp)
Definition: analysis-lp.c:146
static void st_create_sample_event(tw_lp *lp)
Definition: analysis-lp.c:267
void(* revent_f)(void *sv, tw_bf *cv, void *msg, tw_lp *me)
Definition: ross-types.h:78
void st_collect_engine_data(tw_pe *me, int col_type)
Definition: st-sim-engine.c:10
void(* pre_run_f)(void *sv, tw_lp *me)
Definition: ross-types.h:76
model_sample_data * prev
Definition: analysis-lp.h:39
int g_st_model_stats
sample_revent_f sample_revent_fn
FILE * seq_analysis
sample_event_f sample_event_fn
struct st_model_types * model_types
Definition: ross-types.h:322
double g_st_vt_interval
tw_pe * pe
Definition: ross-types.h:308
tw_lpid * lp_list
Definition: analysis-lp.h:50
unsigned long tw_peid
Definition: ross.h:147
tw_kpid id
ID number, otherwise its not available to the app.
Definition: ross-types.h:341
void * cur_state
Current application LP data.
Definition: ross-types.h:315
tw_lpid analysis_id
Definition: analysis-lp.h:47
model_sample_data * next
Definition: analysis-lp.h:40
void tw_event_send(tw_event *event)
Definition: tw-event.c:9
int g_st_sample_count
tw_stat s_alp_nevent_processed
Definition: ross-types.h:156
tw_kpid g_tw_nkp
Definition: ross-global.c:25
tw_peid id
Definition: ross-types.h:376
static tw_lp * tw_getlocal_lp(tw_lpid gid)
model_sample_data * model_samples_tail
Definition: analysis-lp.h:54
int g_st_disable_out
tw_stime ts
Definition: analysis-lp.h:31
model_sample_data * model_samples_head
Definition: analysis-lp.h:52
void(* final_f)(void *sv, tw_lp *me)
Definition: ross-types.h:80
void(* commit_f)(void *sv, tw_bf *cv, void *msg, tw_lp *me)
Definition: ross-types.h:79
void * tw_calloc(const char *file, int line, const char *for_who, size_t e_sz, size_t n)
Definition: tw-util.c:203
#define TW_STIME_DBL(x)
Definition: ross.h:153
static tw_event * tw_event_new(tw_lpid dest_gid, tw_stime offset_ts, tw_lp *sender)
Definition: ross-inline.h:40
tw_lpid lpid
Definition: analysis-lp.h:28
LP State Structure.
Definition: ross-types.h:304
tw_stime timestamp
Definition: analysis-lp.h:22