ROSS
tw-eventq.h
Go to the documentation of this file.
1#ifndef INC_tw_eventq_h
2#define INC_tw_eventq_h
3
4#include "buddy.h"
5#include "hash-quadratic.h"
7#include "ross-extern.h"
9#include "ross-types.h"
10
11#define ROSS_DEBUG 0
12
13/**
14 * debug assistant function
 *
 * Signature, as given by the symbol index at the end of this page (listing
 * line 17, which carries it, is missing here):
 *     static void tw_eventq_debug(tw_eventq *q)
 *
 * With ROSS_DEBUG non-zero, walks q from head to tail through the next
 * pointers and calls tw_error() when a node's prev pointer is not the node
 * visited just before it, when q->tail is not the last node reached, or when
 * the node count differs from q->size. With ROSS_DEBUG set to 0 (the value
 * defined in this header) the function does nothing.
15 */
16static inline void
18{
19#if ROSS_DEBUG
20 tw_event *next;
21 tw_event *last;
22
23 unsigned int cnt;
24
25 cnt = 0;
26 next = q->head;
27 last = NULL;
28
 /* last trails next by one node, so it is NULL while next is the head. */
29 while(next)
30 {
31 cnt++;
32
33 if(next->prev != last)
34 tw_error(TW_LOC, "Prev pointer not correct!");
35
36 last = next;
37 next = next->next;
38 }
39
 /* After the walk, last is the final node reached (NULL for an empty q). */
40 if(q->tail != last)
41 tw_error(TW_LOC, "Tail pointer not correct!");
42
43 if(cnt != q->size)
44 tw_error(TW_LOC, "Size not correct!");
45#else
46 (void)q; // avoid "unused parameter" warning
47#endif
48}
49
50/**
51 * Push the chain of events h..t onto the head of q, then walk that chain
 * from t back to h and run the per-event commit/cleanup steps below.
 *
 * Signature, as given by the symbol index at the end of this page (listing
 * line 54, which carries it, is missing here):
 *     static void tw_eventq_push_list(tw_eventq *q, tw_event *h, tw_event *t, long cnt)
 *
 * q   - queue that receives the chain.
 * h   - first event of the chain; becomes q->head.
 * t   - last event of the chain; linked in front of the old q->head.
 * cnt - number of events in the chain; added to q->size as given.
 *
 * For every event in the chain the visible code:
 *  - calls the destination LP's commit callback when one is set, adding the
 *    elapsed tw_clock_read() ticks to clp->lp_stats->s_process_event and
 *    g_tw_pe->stats.s_event_process;
 *  - when delta_buddy is set, adds elapsed ticks to g_tw_pe->stats.s_buddy
 *    and clears the field;
 *  - removes the event from the destination PE's hash table when it has a
 *    non-zero event_id and state.remote is set;
 *  - detaches every event on its caused_by_me chain, calling
 *    tw_event_free() for those whose owner is TW_pe_sevent_q.
 *
 * NOTE(review): listing lines 54, 61, 83, 92, 96, 122 and 140 are absent from
 * this page. The symbol index names tw_eventq_debug, st_collect_event_data,
 * tw_free_output_messages and buddy_free, which are presumably the calls on
 * those lines -- confirm against the real header before relying on this text.
52 */
53static inline void
55{
56 tw_pe *pe;
57 tw_event *e;
58 tw_event *cev;
59 tw_event *next;
60
62
 /* Link the incoming chain in front of the current head. */
63 t->next = q->head;
64
65 if(q->head)
66 q->head->prev = t;
67
68 q->head = h;
69 q->head->prev = NULL;
70
 /* q was empty: the chain's last event is also the queue's tail. */
71 if(!q->tail)
72 q->tail = t;
73
74 q->size += cnt;
75
76 // iterate through list to collect sent events
77 // go in reverse to ensure correct commit order
78 e = t;
 /* NOTE(review): t is not read again in the lines visible here. */
79 t = t->next;
80 while(1)
81 {
 /* NOTE(review): the statement guarded by this if (listing line 83) is
  * missing from this page. */
82 if (g_st_ev_trace == COMMIT_TRACE) // doesn't rely on commit function existing, so should be outside of commit function check
84 tw_lp * clp = e->dest_lp;
85 tw_clock const event_start = tw_clock_read();
 /* Only invoke the LP's commit callback when one is set. */
86 if (*clp->type->commit) {
87 (*clp->type->commit)(clp->cur_state, &e->cv, tw_event_data(e), clp);
88 }
 /* The time is accumulated whether or not a callback ran. */
89 tw_clock const process_time = tw_clock_read() - event_start;
90 clp->lp_stats->s_process_event += process_time;
91 g_tw_pe->stats.s_event_process += process_time;
93
94 if (e->delta_buddy) {
95 tw_clock start = tw_clock_read();
97 g_tw_pe->stats.s_buddy += (tw_clock_read() - start);
98 e->delta_buddy = 0;
99 }
100
101 pe = e->dest_lp->pe;
102 // have to reclaim non-cancelled remote events from hash table
103 if(e->event_id && e->state.remote)
104 tw_hash_remove(pe->hash_t, e, e->send_pe);
105
106 if(e->caused_by_me)
107 {
108 cev = next = e->caused_by_me;
109
 /* Unlink each caused event; next is saved first because cause_next
  * is cleared before the event may be freed. */
110 while(cev)
111 {
112 next = cev->cause_next;
113 cev->cause_next = NULL;
114
 /* Only events whose owner is TW_pe_sevent_q are freed here. */
115 if(cev->state.owner == TW_pe_sevent_q)
116 tw_event_free(cev->src_lp->pe, cev);
117
118 cev = next;
119 }
120
121 e->caused_by_me = NULL;
123 }
 /* h is the last event to process; the walk ends once it is done. */
124 if (e == h) {
125 break;
126 }
127
128 // // check for event tie with previous event here (no need to check if prev == NULL as we break from this loop if e is the head of the list)
129 // // event ties should be when timestamp AND destination LP are the same
130 // // because this queue is ordered based on TS, this will find any pairwise event ties
131 // // if three events are tied, then this will result in counting two ties (because there are n-1 pairwise ties in an n-way tie)
132 // if (e->recv_ts == e->prev->recv_ts) {
133 // if (e->dest_lp->gid == e->prev->dest_lp->gid)
134 // pe->stats.s_pe_event_ties++;
135 // }
136
137 e = e->prev;
138 }
139
141}
142
143// To be used in `tw_eventq_fossil_collect`
// Compares event `e` against the GVT of `pe`; the caller below treats a
// negative result as "older than GVT". With USE_RAND_TIEBREAKER defined the
// comparison is on signatures (e->sig against pe->GVT_sig); otherwise it is
// on time stamps (e->recv_ts against pe->GVT).
// NOTE(review): `e` and `pe` are expanded without parentheses, so pass plain
// identifiers only, not expressions.
144#ifdef USE_RAND_TIEBREAKER
145#define CMP_EVENT_TO_GVT(e, pe) tw_event_sig_compare_ptr(&e->sig, &pe->GVT_sig)
146#else
147#define CMP_EVENT_TO_GVT(e, pe) TW_STIME_CMP(e->recv_ts, pe->GVT)
148#endif
149
150/**
151 * Given a list, move the portion of its contents that is older than GVT to
152 * the free list.
153 *
 * Signature, as given by the symbol index at the end of this page (listing
 * line 160, which carries it, is missing here):
 *     static void tw_eventq_fossil_collect(tw_eventq *q, tw_pe *pe)
 *
 * q  - queue to trim; events are taken from its tail end.
 * pe - supplies the GVT used for the comparison and the free_q that
 *      receives the collected events (through tw_eventq_push_list()).
 *
154 * Assumptions:
155 * - The provided q is not the free_q
156 * - The head of the list has the maximum time stamp in the list. Therefore,
157 * if the head is older than GVT, everything in the list is as well.
158 */
159static inline void
161{
162 tw_event *h = q->head;
163 tw_event *t = q->tail;
164
165 int cnt;
166
167 /* Nothing to collect from this event list? */
 /* Either q is empty or even its tail is not older than GVT. */
168 if (!t || (CMP_EVENT_TO_GVT(t, pe) >= 0))
169 return;
170
171 if (CMP_EVENT_TO_GVT(h, pe) < 0)
172 {
173 /* Everything in the queue can be collected */
174 tw_eventq_push_list(&pe->free_q, h, t, q->size);
175 q->head = q->tail = NULL;
176 q->size = 0;
177 } else {
178 /* Only some of the list can be collected. We'll wind up
179 * with at least one event being collected and at least
180 * another event staying behind in the eventq structure so
181 * we can really optimize this list splicing operation for
182 * these conditions.
183 */
184 tw_event *n;
185
186 /* Search the leading part of the list... */
 /* Scan from the tail towards the head. cnt starts at 1 for t itself and
  * grows by one for each further event that is older than GVT. The head
  * is known not to be older than GVT in this branch, so the scan stops
  * on a non-NULL h provided the prev links lead back to q->head. */
187 for (h = t->prev, cnt = 1; h && (CMP_EVENT_TO_GVT(h, pe) < 0); cnt++)
188 h = h->prev;
189
190 /* h isn't eligible for collection; it's the new tail */
191 n = h;
192
193 /* Back up one cell, we overshot where to cut the list */
194 h = h->next;
195
196 /* Cut h..t out of the event queue */
197 q->tail = n;
198 n->next = NULL;
199 q->size -= cnt;
200
201 /* Free h..t (inclusive) */
202 tw_eventq_push_list(&pe->free_q, h, t, cnt);
203 }
204}
205
206/**
207 * Allocate events into a given tw_eventq.
 *
 * q   - queue to fill; its head, tail and size are overwritten.
 * cnt - requested number of events; g_tw_gvt_threshold is added to it
 *       before allocating.
 *
 * Each event slot is sizeof(tw_event) + g_tw_msg_sz bytes, rounded up to a
 * multiple of max(sizeof(double), sizeof(void*)); the rounded size is stored
 * in g_tw_event_msg_sz. Every event is marked TW_pe_free_q and linked into a
 * doubly linked list whose ends are NULL-terminated.
 *
 * NOTE(review): listing lines 243-244 are absent from this page; the symbol
 * index names g_tw_net_device_size and g_tw_events_per_pe, so the missing
 * lines presumably update the threshold / per-PE count -- confirm against
 * the real header.
 * NOTE(review): both loops use while (--cnt) on an unsigned value, so the
 * adjusted cnt must be at least 1.
208 */
209static inline void
210tw_eventq_alloc(tw_eventq * q, unsigned int cnt)
211{
212 tw_event *event;
213 size_t event_len;
214 size_t align;
215#ifdef ROSS_ALLOC_DEBUG
216 tw_event *event_prev = NULL;
217#endif
218
219 /* Construct a linked list of free events. We allocate
220 * the events such that they look like this in memory:
221 *
222 * ------------------
223 * | tw_event |
224 * | user_data |
225 * ------------------
226 * | tw_event |
227 * | user_data |
228 * ------------------
229 * ......
230 * ------------------
231 */
232
233 align = ROSS_MAX(sizeof(double), sizeof(void*));
234 event_len = sizeof(tw_event) + g_tw_msg_sz;
 /* Round event_len up to the next multiple of align; the bit-mask test
  * assumes align is a power of two. */
235 if (event_len & (align - 1))
236 {
237 event_len += align - (event_len & (align - 1));
238 //tw_error(TW_LOC, "REALIGNING EVENT MEMORY!\n");
239 }
240 g_tw_event_msg_sz = event_len;
241
242 // compute number of events needed for the network.
245 cnt += g_tw_gvt_threshold;
246
247 q->size = cnt;
248 /* allocate one at a time so tools like valgrind can detect buffer
249 * overflows */
250#ifdef ROSS_ALLOC_DEBUG
251 q->head = event = (tw_event *)tw_calloc(TW_LOC, "events", event_len, 1);
 /* The loop body runs cnt - 1 times; the last event is finished below. */
252 while (--cnt) {
253 event->state.owner = TW_pe_free_q;
254 event->prev = event_prev;
255 event_prev = event;
256 event->next = (tw_event *) tw_calloc(TW_LOC, "events", event_len, 1);
257 event = event->next;
258 }
259 event->prev = event_prev;
260#else
261 /* alloc in one large block for performance/locality */
262 q->head = event = (tw_event *)tw_calloc(TW_LOC, "events", event_len, cnt);
 /* Neighbours sit exactly event_len bytes apart inside the block. For the
  * first event the computed prev points before the block; it is replaced
  * with NULL after the #endif.
  * NOTE(review): forming that out-of-block pointer is not strictly
  * conforming C pointer arithmetic. */
263 while (--cnt) {
264 event->state.owner = TW_pe_free_q;
265 event->prev = (tw_event *) (((char *)event) - event_len);
266 event->next = (tw_event *) (((char *)event) + event_len);
267 event = event->next;
268 }
269 event->prev = (tw_event *) (((char *)event) - event_len);
270#endif
271
 /* event is now the last element: mark it and terminate both list ends. */
272 event->state.owner = TW_pe_free_q;
273 q->head->prev = event->next = NULL;
274 q->tail = event;
275}
276
277/**
278 * push to tail of list
 *
 * Signature, as given by the symbol index at the end of this page (listing
 * line 281, which carries it, is missing here):
 *     static void tw_eventq_push(tw_eventq *q, tw_event *e)
 *
 * Links e after the current tail (or makes it the head when q is empty),
 * sets q->tail to e and increments q->size.
 *
 * NOTE(review): listing lines 285 and 297 are also absent; presumably they
 * are tw_eventq_debug(q) calls -- confirm against the real header.
279 */
280static inline void
282{
283 tw_event *t = q->tail;
284
286
287 e->next = NULL;
288 e->prev = t;
 /* Empty queue: e becomes the head as well as the tail. */
289 if (t)
290 t->next = e;
291 else
292 q->head = e;
293
294 q->tail = e;
295 q->size++;
296
298}
299
300/**
301 * peek at tail of list
 *
 * Signature, as given by the symbol index at the end of this page (listing
 * line 304, which carries it, is missing here):
 *     static tw_event * tw_eventq_peek(tw_eventq *q)
 *
 * Returns q->tail without modifying the queue; NULL when no tail is set.
302 */
303static inline tw_event *
305{
306 return q->tail;
307}
308
309/**
310 * pop from tail of list
 *
 * Signature, as given by the symbol index at the end of this page (listing
 * line 313, which carries it, is missing here):
 *     static tw_event * tw_eventq_pop(tw_eventq *q)
 *
 * Unlinks and returns the tail event with its next/prev pointers cleared,
 * decrementing q->size. Returns NULL, leaving q untouched, when q has no
 * tail.
 *
 * NOTE(review): listing lines 318 and 335 are also absent; presumably they
 * are tw_eventq_debug(q) calls -- confirm against the real header.
311 */
312static inline tw_event *
314{
315 tw_event *t = q->tail;
316 tw_event *p;
317
319
320 if(!t)
321 return NULL;
322
323 p = t->prev;
324
 /* t was the only event: the queue becomes empty. */
325 if (p)
326 p->next = NULL;
327 else
328 q->head = NULL;
329
330 q->tail = p;
331 q->size--;
332
333 t->next = t->prev = NULL;
334
336
337 return t;
338}
339
340/**
341 * push to head of list
 *
 * Signature, as given by the symbol index at the end of this page (listing
 * line 344, which carries it, is missing here):
 *     static void tw_eventq_unshift(tw_eventq *q, tw_event *e)
 *
 * Links e in front of the current head (or makes it the tail when q is
 * empty), sets q->head to e and increments q->size.
 *
 * NOTE(review): listing lines 348 and 361 are also absent; presumably they
 * are tw_eventq_debug(q) calls -- confirm against the real header.
342 */
343static inline void
345{
346 tw_event *h = q->head;
347
349
350 e->prev = NULL;
351 e->next = h;
352
 /* Empty queue: e becomes the tail as well as the head. */
353 if (h)
354 h->prev = e;
355 else
356 q->tail = e;
357
358 q->head = e;
359 q->size++;
360
362}
363
364/**
365 * peek at head of list
 *
 * Signature, as given by the symbol index at the end of this page (listing
 * line 368, which carries it, is missing here):
 *     static tw_event * tw_eventq_peek_head(tw_eventq *q)
 *
 * Returns q->head without modifying the queue; NULL when no head is set.
366 */
367static inline tw_event *
369{
370 return q->head;
371}
372
373/**
374 * pop from head of list
 *
 * Signature, as given by the symbol index at the end of this page (listing
 * line 377, which carries it, is missing here):
 *     static tw_event * tw_eventq_shift(tw_eventq *q)
 *
 * Unlinks and returns the head event with its next/prev pointers cleared,
 * decrementing q->size. Returns NULL, leaving q untouched, when q has no
 * head.
 *
 * NOTE(review): listing lines 382 and 399 are also absent; presumably they
 * are tw_eventq_debug(q) calls -- confirm against the real header.
375 */
376static inline tw_event *
378{
379 tw_event *h = q->head;
380 tw_event *n;
381
383
384 if(!h)
385 return NULL;
386
387 n = h->next;
388
 /* h was the only event: the queue becomes empty. */
389 if (n)
390 n->prev = NULL;
391 else
392 q->tail = NULL;
393
394 q->head = n;
395 q->size--;
396
397 h->next = h->prev = NULL;
398
400
401 return h;
402}
403
404/**
405 * delete an event from anywhere in the list
 *
 * Signature, as given by the symbol index at the end of this page (listing
 * line 408, which carries it, is missing here):
 *     static void tw_eventq_delete_any(tw_eventq *q, tw_event *e)
 *
 * Unlinks e from q by joining its neighbours, updating q->head and/or
 * q->tail when e sits at an end, then clears e's next/prev pointers and
 * decrements q->size. The visible code does not check that e actually
 * belongs to q.
 *
 * NOTE(review): listing lines 413 and 428 are also absent; presumably they
 * are tw_eventq_debug(q) calls -- confirm against the real header.
406 */
407static inline void
409{
410 tw_event *p = e->prev;
411 tw_event *n = e->next;
412
414
 /* No predecessor: e was the head. */
415 if (p)
416 p->next = n;
417 else
418 q->head = n;
419
 /* No successor: e was the tail. */
420 if (n)
421 n->prev = p;
422 else
423 q->tail = p;
424
425 e->next = e->prev = NULL;
426 q->size--;
427
429}
430
431/**
432 * pop the entire list.
433 * After this operation, the size of the provided q is 0.
 *
 * Signature, as given by the symbol index at the end of this page (listing
 * line 436, which carries it, is missing here):
 *     static tw_event * tw_eventq_pop_list(tw_eventq *q)
 *
 * Returns the former head (NULL when q was empty) and resets q's head, tail
 * and size. The events' own next/prev pointers are not modified, so the
 * returned chain stays linked.
434 */
435static inline tw_event *
437{
438 tw_event *h = q->head;
439
440 q->size = 0;
441 q->head = q->tail = NULL;
442
443 return h;
444}
445
446/**
447 * The purpose of this function is to be able to remove some
448 * part of a list.. could be all of list, from head to some inner
449 * buffer, or from some inner buffer to tail. I only care about the
450 * last case..
 *
 * Signature, as given by the symbol index at the end of this page (listing
 * line 453, which carries it, is missing here):
 *     static void tw_eventq_splice(tw_eventq *q, tw_event *h, tw_event *t, int cnt)
 *
 * q   - queue the range is removed from.
 * h   - first event of the range to remove.
 * t   - last event of the range to remove.
 * cnt - number of events in h..t; subtracted from q->size as given, except
 *       when the range is the whole queue, where size is simply set to 0.
 *
 * The removed events keep their own next/prev pointers; only the queue and
 * the neighbouring events are updated.
 *
 * NOTE(review): listing lines 455 and 476 are also absent; presumably they
 * are tw_eventq_debug(q) calls -- confirm against the real header.
451 */
452static inline void
454{
456
 /* Removing everything: just reset the queue. */
457 if(h == q->head && t == q->tail)
458 {
459 q->size = 0;
460 q->head = q->tail = NULL;
461 return;
462 }
463
 /* Re-point whatever precedes h (the queue head or h's predecessor). */
464 if(h == q->head)
465 q->head = t->next;
466 else
467 h->prev->next = t->next;
468
 /* Re-point whatever follows t (the queue tail or t's successor). */
469 if(t == q->tail)
470 q->tail = h->prev;
471 else
472 t->next->prev = h->prev;
473
474 q->size -= cnt;
475
477}
478
479#endif
tw_pe * pe
Definition avl_tree.c:10
void buddy_free(void *ptr)
Definition buddy.c:137
static tw_clock tw_clock_read(void)
Definition aarch64.h:8
uint64_t tw_clock
Definition aarch64.h:6
tw_event * tw_hash_remove(void *h, tw_event *event, long pe)
void st_collect_event_data(tw_event *cev, double recv_rt)
int g_st_ev_trace
@ COMMIT_TRACE
void * tw_calloc(const char *file, int line, const char *for_who, size_t e_sz, size_t n)
Definition tw-util.c:206
unsigned int g_tw_gvt_threshold
Definition ross-global.c:84
static void * tw_event_data(tw_event *event)
tw_pe * g_tw_pe
Definition ross-global.c:79
static void tw_free_output_messages(tw_event *e, int print_message)
unsigned long long g_tw_clock_rate
static void tw_event_free(tw_pe *, tw_event *)
unsigned int g_tw_events_per_pe
Definition ross-global.c:80
void tw_error(const char *file, int line, const char *fmt,...)
Definition tw-util.c:77
unsigned int g_tw_net_device_size
Definition ross-global.c:91
size_t g_tw_event_msg_sz
Definition ross-global.c:47
size_t g_tw_msg_sz
Definition ross-global.c:37
#define TW_LOC
#define ROSS_MAX(a, b)
@ TW_pe_free_q
In tw_pe.free_q.
Definition ross-types.h:232
@ TW_pe_sevent_q
In tw_pe.sevent_q.
Definition ross-types.h:231
static tw_event * tw_eventq_peek(tw_eventq *q)
Definition tw-eventq.h:304
static void tw_eventq_debug(tw_eventq *q)
Definition tw-eventq.h:17
static tw_event * tw_eventq_pop_list(tw_eventq *q)
Definition tw-eventq.h:436
static void tw_eventq_delete_any(tw_eventq *q, tw_event *e)
Definition tw-eventq.h:408
static void tw_eventq_push(tw_eventq *q, tw_event *e)
Definition tw-eventq.h:281
static tw_event * tw_eventq_peek_head(tw_eventq *q)
Definition tw-eventq.h:368
static void tw_eventq_unshift(tw_eventq *q, tw_event *e)
Definition tw-eventq.h:344
static void tw_eventq_splice(tw_eventq *q, tw_event *h, tw_event *t, int cnt)
Definition tw-eventq.h:453
static void tw_eventq_fossil_collect(tw_eventq *q, tw_pe *pe)
Definition tw-eventq.h:160
#define CMP_EVENT_TO_GVT(e, pe)
Definition tw-eventq.h:145
static void tw_eventq_push_list(tw_eventq *q, tw_event *h, tw_event *t, long cnt)
Definition tw-eventq.h:54
static void tw_eventq_alloc(tw_eventq *q, unsigned int cnt)
Definition tw-eventq.h:210
static tw_event * tw_eventq_shift(tw_eventq *q)
Definition tw-eventq.h:377
static tw_event * tw_eventq_pop(tw_eventq *q)
Definition tw-eventq.h:313
Buddy-system memory allocator.
Definition of ROSS basic types.
tw_clock s_process_event
Event Structure.
Definition ross-types.h:277
struct tw_event::@130070134144252114152124341363102114315067064025 state
tw_event * cause_next
Next in parent's caused_by_me chain.
Definition ross-types.h:289
tw_lp * src_lp
Sending LP ID.
Definition ross-types.h:313
unsigned char remote
Indicates union addr is in 'remote' storage.
Definition ross-types.h:303
unsigned char owner
Owner of the next/prev pointers; see tw_event_owner.
Definition ross-types.h:300
void * delta_buddy
Delta memory from buddy allocator.
Definition ross-types.h:307
tw_event * caused_by_me
Start of event list caused by this event.
Definition ross-types.h:288
tw_bf cv
Used by app during reverse computation.
Definition ross-types.h:306
tw_event * next
Definition ross-types.h:278
tw_lp * dest_lp
Destination LP ID.
Definition ross-types.h:312
tw_event * prev
Definition ross-types.h:279
tw_eventid event_id
Unique id assigned by src_lp->pe if remote.
Definition ross-types.h:291
tw_peid send_pe
Definition ross-types.h:316
size_t size
Definition ross-types.h:176
tw_event * head
Definition ross-types.h:177
tw_event * tail
Definition ross-types.h:178
LP State Structure.
Definition ross-types.h:336
tw_pe * pe
Definition ross-types.h:340
tw_lptype * type
Type of this LP, including service callbacks.
Definition ross-types.h:348
struct st_lp_stats * lp_stats
Definition ross-types.h:356
void * cur_state
Current application LP data.
Definition ross-types.h:347
commit_f commit
LP Commit event routine.
Definition ross-types.h:102
Holds the entire PE state.
Definition ross-types.h:416