ROSS
tw-kp.c
Go to the documentation of this file.
1 #include <ross.h>
2 
3 void
5 {
6  if(id >= g_tw_nkp)
7  tw_error(TW_LOC, "ID %d exceeded MAX KPs", id);
8 
9  if(g_tw_kp[id])
10  tw_error(TW_LOC, "KP already allocated: %lld\n", id);
11 
12  g_tw_kp[id] = (tw_kp *) tw_calloc(TW_LOC, "Local KP", sizeof(tw_kp), 1);
13 
14  g_tw_kp[id]->id = id;
15  g_tw_kp[id]->pe = pe;
16 
17 #ifdef ROSS_QUEUE_kp_splay
18  g_tw_kp[id]->pq = tw_eventpq_create();
19 #endif
20 }
21 
22 void
24 {
25  tw_event *e;
26  tw_clock pq_start;
27 
28  kp->s_rb_total++;
29  // instrumentation
30  kp->kp_stats->s_rb_total++;
31 
32 #if VERIFY_ROLLBACK
33  printf("%d %d: rb_to %f, now = %f \n",
34  kp->pe->id, kp->id, TW_STIME_DBL(to), TW_STIME_DBL(kp->last_time));
35 #endif
36 
37  while(kp->pevent_q.size && TW_STIME_CMP(kp->pevent_q.head->recv_ts, to) >= 0)
38  {
39  e = tw_eventq_shift(&kp->pevent_q);
40 
41  /*
42  * rollback first
43  */
45 
46  /*
47  * reset kp pointers
48  */
49  if (kp->pevent_q.size == 0)
50  {
51  kp->last_time = kp->pe->GVT;
52  } else
53  {
54  kp->last_time = kp->pevent_q.head->recv_ts;
55  }
56 
57  /*
58  * place event back into priority queue
59  */
60  pq_start = tw_clock_read();
61  tw_pq_enqueue(kp->pe->pq, e);
62  kp->pe->stats.s_pq += tw_clock_read() - pq_start;
63  }
64 }
65 
66 void
68 {
69  tw_event *e = NULL;
70  tw_kp *kp;
71  tw_pe *pe;
72  tw_clock pq_start;
73 
74  kp = event->dest_lp->kp;
75  pe = kp->pe;
76 
77  kp->s_rb_total++;
78  kp->s_rb_secondary++;
79  // instrumentation
80  kp->kp_stats->s_rb_total++;
81  kp->kp_stats->s_rb_secondary++;
82 
83 #if VERIFY_ROLLBACK
84  printf("%d %d: rb_event: %f \n", pe->id, kp->id, event->recv_ts);
85 
86  if(!kp->pevent_q.size)
87  tw_error(TW_LOC, "Attempting to rollback empty pevent_q!");
88 #endif
89 
90  e = tw_eventq_shift(&kp->pevent_q);
91  while(e != event)
92  {
93  kp->last_time = kp->pevent_q.head->recv_ts;
95  pq_start = tw_clock_read();
96  tw_pq_enqueue(pe->pq, e);
97  pe->stats.s_pq += tw_clock_read() - pq_start;
98 
99  e = tw_eventq_shift(&kp->pevent_q);
100  }
101 
103 
104  if (0 == kp->pevent_q.size)
105  kp->last_time = kp->pe->GVT;
106  else
107  kp->last_time = kp->pevent_q.head->recv_ts;
108 }
109 
110 #ifndef NUM_OUT_MESG
111 #define NUM_OUT_MESG 2000
112 #endif
113 static tw_out*
115 {
116  int i;
117 
118  tw_out *ret = (tw_out *) tw_calloc(TW_LOC, "tw_out", sizeof(struct tw_out), NUM_OUT_MESG);
119 
120  for (i = 0; i < NUM_OUT_MESG - 1; i++) {
121  ret[i].next = &ret[i + 1];
122  ret[i].owner = kp;
123  }
124  ret[i].next = NULL;
125  ret[i].owner = kp;
126 
127  return ret;
128 }
129 
130 void
132 {
133  tw_kpid i;
134  int j;
135 
136  for (i = 0; i < g_tw_nkp; i++)
137  {
138  tw_kp *kp = tw_getkp(i);
139 
140  if (kp->pe != me)
141  continue;
142 
143  kp->id = i;
144  kp->s_nevent_processed = 0;
145  kp->s_e_rbs = 0;
146  kp->s_rb_total = 0;
147  kp->s_rb_secondary = 0;
151  kp->output = init_output_messages(kp);
152  }
153 
154  // instrumentation setup
155  kp->kp_stats = (st_kp_stats*) tw_calloc(TW_LOC, "KP instrumentation", sizeof(st_kp_stats), 1);
156  for (j = 0; j < 3; j++)
157  kp->last_stats[j] = (st_kp_stats*) tw_calloc(TW_LOC, "KP instrumentation", sizeof(st_kp_stats), 1);
158  }
159 }
160 
161 tw_out *
163 {
164  if (kp->output) {
165  tw_out *ret = kp->output;
166  kp->output = kp->output->next;
167  ret->next = 0;
168  return ret;
169  }
170 
171  return NULL;
172 }
173 
174 void
176 {
177  tw_kp *kp = out->owner;
178 
179  if (kp->output) {
180  out->next = kp->output;
181  kp->output = out;
182  }
183  else {
184  kp->output = out;
185  kp->output->next = NULL;
186  }
187 }
tw_synch g_tw_synchronization_protocol
Definition: ross-global.c:18
tw_out * tw_kp_grab_output_buffer(tw_kp *kp)
Definition: tw-kp.c:162
#define TW_LOC
Definition: ross-extern.h:164
tw_kp * owner
Definition: ross-types.h:238
double tw_stime
Definition: ross.h:150
tw_clock s_pq
Definition: ross-types.h:143
tw_event * head
Definition: ross-types.h:167
void tw_error(const char *file, int line, const char *fmt,...) NORETURN
Definition: tw-util.c:74
struct st_kp_stats * kp_stats
Definition: ross-types.h:366
tw_statistics stats
per PE counters
Definition: ross-types.h:415
struct st_kp_stats * last_stats[3]
Definition: ross-types.h:367
tw_stime recv_ts
Actual time to be received.
Definition: ross-types.h:282
tw_pe * pe
PE that services this KP.
Definition: ross-types.h:342
long s_rb_secondary
Number of secondary rollbacks by this LP.
Definition: ross-types.h:365
static tw_clock tw_clock_read(void)
Definition: aarch64.h:6
Holds the entire PE state.
Definition: ross-types.h:375
static tw_event * tw_eventq_shift(tw_eventq *q)
Definition: tw-eventq.h:353
tw_eventq pevent_q
Events processed by LPs bound to this KP.
Definition: ross-types.h:359
static tw_kp * tw_getkp(tw_kpid id)
Event Structure.
Definition: ross-types.h:250
#define TW_STIME_CMP(x, y)
Definition: ross.h:154
tw_out * output
Output messages.
Definition: ross-types.h:344
void tw_init_kps(tw_pe *me)
Definition: tw-kp.c:131
unsigned int s_rb_secondary
void tw_pq_enqueue(splay_tree *st, tw_event *e)
Definition: splay.c:195
void tw_kp_rollback_to(tw_kp *kp, tw_stime to)
Definition: tw-kp.c:23
struct tw_out * next
Definition: ross-types.h:237
long s_e_rbs
Number of events rolled back by this LP.
Definition: ross-types.h:363
#define NUM_OUT_MESG
Definition: tw-kp.c:111
tw_pq * pq
Priority queue used to sort events.
Definition: ross-types.h:381
long s_rb_total
Number of total rollbacks by this LP.
Definition: ross-types.h:364
void tw_kp_onpe(tw_kpid id, tw_pe *pe)
Definition: tw-kp.c:4
tw_stime GVT
Global Virtual Time.
Definition: ross-types.h:403
tw_peid tw_kpid
Definition: ross-types.h:45
tw_pe * pe
Definition: avl_tree.c:11
unsigned int s_rb_total
uint64_t tw_clock
Definition: aarch64.h:4
void tw_kp_rollback_event(tw_event *event)
Definition: tw-kp.c:67
tw_kpid id
ID number; otherwise it's not available to the app.
Definition: ross-types.h:341
Rollback-aware output mechanism.
Definition: ross-types.h:236
size_t size
Definition: ross-types.h:166
static tw_out * init_output_messages(tw_kp *kp)
Definition: tw-kp.c:114
void tw_event_rollback(tw_event *event)
Definition: tw-event.c:181
tw_kpid g_tw_nkp
Definition: ross-global.c:25
tw_peid id
Definition: ross-types.h:376
void * tw_calloc(const char *file, int line, const char *for_who, size_t e_sz, size_t n)
Definition: tw-util.c:203
void tw_kp_put_back_output_buffer(tw_out *out)
Definition: tw-kp.c:175
#define TW_STIME_DBL(x)
Definition: ross.h:153
tw_stime last_time
Time of the current event being processed.
Definition: ross-types.h:360
tw_stat s_nevent_processed
Number of events processed.
Definition: ross-types.h:361
tw_kp ** g_tw_kp
Definition: ross-global.c:27