ROSS
tw-kp.c
Go to the documentation of this file.
1#include <ross.h>
2
3void
5{
6 if(id >= g_tw_nkp)
7 tw_error(TW_LOC, "ID %d exceeded MAX KPs", id);
8
9 if(g_tw_kp[id])
10 tw_error(TW_LOC, "KP already allocated: %lld\n", id);
11
12 g_tw_kp[id] = (tw_kp *) tw_calloc(TW_LOC, "Local KP", sizeof(tw_kp), 1);
13
14 g_tw_kp[id]->id = id;
15 g_tw_kp[id]->pe = pe;
16
17#ifdef ROSS_QUEUE_kp_splay
18 g_tw_kp[id]->pq = tw_eventpq_create();
19#endif
20}
21
22
23#ifdef USE_RAND_TIEBREAKER
24void
25tw_kp_rollback_to_sig(tw_kp * kp, tw_event_sig const * to_sig)
26{
27 tw_event *e;
28 tw_clock pq_start;
29
30 kp->s_rb_total++;
31 kp->kp_stats->s_rb_total++;
32
33 while (kp->pevent_q.size && tw_event_sig_compare_ptr(&kp->pevent_q.head->sig, to_sig) >= 0)
34 {
35 e = tw_eventq_shift(&kp->pevent_q);
36
37 // rollback first
39
40 // reset kp pointers
41 if (kp->pevent_q.size == 0)
42 {
43 // kp->last_time = kp->pe->GVT;
45 } else
46 {
47 // kp->last_time = kp->pevent_q.head->recv_ts;
49 }
50
51 // place event back into priority queue
52 pq_start = tw_clock_read();
53 tw_pq_enqueue(kp->pe->pq, e);
54 kp->pe->stats.s_pq += tw_clock_read() - pq_start;
55 }
56}
57#else
58void
60{
61 tw_event *e;
62 tw_clock pq_start;
63
64 kp->s_rb_total++;
65 // instrumentation
66 kp->kp_stats->s_rb_total++;
67
68#if VERIFY_ROLLBACK
69 printf("%d %d: rb_to %f, now = %f \n",
70 kp->pe->id, kp->id, TW_STIME_DBL(to), TW_STIME_DBL(kp->last_time));
71#endif
72
73 while(kp->pevent_q.size && TW_STIME_CMP(kp->pevent_q.head->recv_ts, to) >= 0)
74 {
75 e = tw_eventq_shift(&kp->pevent_q);
76
77 /*
78 * rollback first
79 */
81
82 /*
83 * reset kp pointers
84 */
85 if (kp->pevent_q.size == 0)
86 {
87 kp->last_time = kp->pe->GVT;
88 } else
89 {
90 kp->last_time = kp->pevent_q.head->recv_ts;
91 }
92
93 /*
94 * place event back into priority queue
95 */
96 pq_start = tw_clock_read();
97 tw_pq_enqueue(kp->pe->pq, e);
98 kp->pe->stats.s_pq += tw_clock_read() - pq_start;
99 }
100}
101#endif
102
103void
105{
106 tw_event *e = NULL;
107 tw_kp *kp;
108 tw_pe *pe;
109 tw_clock pq_start;
110
111 kp = event->dest_lp->kp;
112 pe = kp->pe;
113
114 kp->s_rb_total++;
115 kp->s_rb_secondary++;
116 // instrumentation
117 kp->kp_stats->s_rb_total++;
119
120#if VERIFY_ROLLBACK
121 printf("%d %d: rb_event: %f \n", pe->id, kp->id, event->recv_ts);
122
123 if(!kp->pevent_q.size)
124 tw_error(TW_LOC, "Attempting to rollback empty pevent_q!");
125#endif
126
127 e = tw_eventq_shift(&kp->pevent_q);
128 while(e != event)
129 {
130#ifdef USE_RAND_TIEBREAKER
131 kp->last_sig = kp->pevent_q.head->sig;
132#else
133 kp->last_time = kp->pevent_q.head->recv_ts;
134#endif
136 pq_start = tw_clock_read();
137 tw_pq_enqueue(pe->pq, e);
138 pe->stats.s_pq += tw_clock_read() - pq_start;
139
140 e = tw_eventq_shift(&kp->pevent_q);
141 }
142
144
145#ifdef USE_RAND_TIEBREAKER
146 if (0 == kp->pevent_q.size)
147 kp->last_sig = kp->pe->GVT_sig;
148 else
149 kp->last_sig = kp->pevent_q.head->sig;
150#else
151 if (0 == kp->pevent_q.size)
152 kp->last_time = kp->pe->GVT;
153 else
154 kp->last_time = kp->pevent_q.head->recv_ts;
155#endif
156}
157
158#ifndef NUM_OUT_MESG
159#define NUM_OUT_MESG 2000
160#endif
161static tw_out*
163{
164 int i;
165
166 tw_out *ret = (tw_out *) tw_calloc(TW_LOC, "tw_out", sizeof(struct tw_out), NUM_OUT_MESG);
167
168 for (i = 0; i < NUM_OUT_MESG - 1; i++) {
169 ret[i].next = &ret[i + 1];
170 ret[i].owner = kp;
171 }
172 ret[i].next = NULL;
173 ret[i].owner = kp;
174
175 return ret;
176}
177
178void
180{
181 tw_kpid i;
182 int j;
183
184 for (i = 0; i < g_tw_nkp; i++)
185 {
186 tw_kp *kp = tw_getkp(i);
187
188 if (kp->pe != me)
189 continue;
190
191 kp->id = i;
192 kp->s_nevent_processed = 0;
193 kp->s_e_rbs = 0;
194 kp->s_rb_total = 0;
195 kp->s_rb_secondary = 0;
200 }
201
202 // instrumentation setup
203 kp->kp_stats = (st_kp_stats*) tw_calloc(TW_LOC, "KP instrumentation", sizeof(st_kp_stats), 1);
204 for (j = 0; j < 3; j++)
205 kp->last_stats[j] = (st_kp_stats*) tw_calloc(TW_LOC, "KP instrumentation", sizeof(st_kp_stats), 1);
206 }
207}
208
209tw_out *
211{
212 if (kp->output) {
213 tw_out *ret = kp->output;
214 kp->output = kp->output->next;
215 ret->next = 0;
216 return ret;
217 }
218
219 return NULL;
220}
221
222void
224{
225 tw_kp *kp = out->owner;
226
227 if (kp->output) {
228 out->next = kp->output;
229 kp->output = out;
230 }
231 else {
232 kp->output = out;
233 kp->output->next = NULL;
234 }
235}
tw_pe * pe
Definition avl_tree.c:10
static tw_clock tw_clock_read(void)
Definition aarch64.h:8
uint64_t tw_clock
Definition aarch64.h:6
void tw_pq_enqueue(tw_pq *, tw_event *)
Definition splay.c:245
#define TW_STIME_DBL(x)
Definition ross-base.h:42
#define TW_STIME_CMP(x, y)
Definition ross-base.h:43
double tw_stime
Definition ross-base.h:39
void * tw_calloc(const char *file, int line, const char *for_who, size_t e_sz, size_t n)
Definition tw-util.c:206
void tw_kp_rollback_to_sig(tw_kp *kp, tw_event_sig const *to_sig)
tw_kpid g_tw_nkp
Definition ross-global.c:27
tw_kp ** g_tw_kp
Definition ross-global.c:29
void tw_error(const char *file, int line, const char *fmt,...)
Definition tw-util.c:77
void tw_event_rollback(tw_event *event)
Definition tw-event.c:221
tw_synch g_tw_synchronization_protocol
Definition ross-global.c:19
#define TW_LOC
static tw_kp * tw_getkp(tw_kpid id)
static int tw_event_sig_compare_ptr(tw_event_sig const *e_sig, tw_event_sig const *n_sig)
Definition ross-types.h:512
tw_peid tw_kpid
Definition ross-types.h:55
static void tw_copy_event_sig(tw_event_sig *e, tw_event_sig const *sig)
Definition ross-types.h:493
@ OPTIMISTIC_REALTIME
Definition ross-types.h:41
@ OPTIMISTIC
Definition ross-types.h:39
@ OPTIMISTIC_DEBUG
Definition ross-types.h:40
static tw_event * tw_eventq_shift(tw_eventq *q)
Definition tw-eventq.h:377
unsigned int s_rb_secondary
unsigned int s_rb_total
Event Stucture.
Definition ross-types.h:277
tw_stime recv_ts
Actual time to be received.
Definition ross-types.h:314
tw_event_sig sig
Event signature, to be used by tiebreaker.
Definition ross-types.h:294
size_t size
Definition ross-types.h:176
tw_event * head
Definition ross-types.h:177
long s_e_rbs
Number of events rolled back by this LP.
Definition ross-types.h:404
tw_eventq pevent_q
Events processed by LPs bound to this KP.
Definition ross-types.h:396
tw_out * output
Output messages.
Definition ross-types.h:381
struct st_kp_stats * kp_stats
Definition ross-types.h:407
struct st_kp_stats * last_stats[3]
Definition ross-types.h:408
tw_pe * pe
PE that services this KP.
Definition ross-types.h:379
tw_event_sig last_sig
Event signature of the current event being processed.
Definition ross-types.h:398
tw_stime last_time
Time of the current event being processed.
Definition ross-types.h:401
long s_rb_secondary
Number of secondary rollbacks by this LP.
Definition ross-types.h:406
tw_stat s_nevent_processed
Number of events processed.
Definition ross-types.h:402
long s_rb_total
Number of total rollbacks by this LP.
Definition ross-types.h:405
tw_kpid id
ID number, otherwise its not available to the app.
Definition ross-types.h:378
Rollback-aware output mechanism.
Definition ross-types.h:246
tw_kp * owner
Definition ross-types.h:248
struct tw_out * next
Definition ross-types.h:247
Holds the entire PE state.
Definition ross-types.h:416
tw_pq * pq
Priority queue used to sort events.
Definition ross-types.h:422
tw_stime GVT
Global Virtual Time.
Definition ross-types.h:451
tw_peid id
Definition ross-types.h:417
tw_statistics stats
per PE counters
Definition ross-types.h:463
tw_event_sig GVT_sig
Global Virtual Time Signature.
Definition ross-types.h:445
tw_clock s_pq
Definition ross-types.h:153
static tw_out * init_output_messages(tw_kp *kp)
Definition tw-kp.c:162
void tw_init_kps(tw_pe *me)
Definition tw-kp.c:179
void tw_kp_rollback_to(tw_kp *kp, tw_stime to)
Definition tw-kp.c:59
#define NUM_OUT_MESG
Definition tw-kp.c:159
void tw_kp_rollback_event(tw_event *event)
Definition tw-kp.c:104
void tw_kp_onpe(tw_kpid id, tw_pe *pe)
Definition tw-kp.c:4
void tw_kp_put_back_output_buffer(tw_out *out)
Definition tw-kp.c:223
tw_out * tw_kp_grab_output_buffer(tw_kp *kp)
Definition tw-kp.c:210