1 /* A specialized Reactor.
2 * - by Richard W.M. Jones <rich@annexia.org>
4 * This library is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Library General Public
6 * License as published by the Free Software Foundation; either
7 * version 2 of the License, or (at your option) any later version.
9 * This library is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 * Library General Public License for more details.
14 * You should have received a copy of the GNU Library General Public
15 * License along with this library; if not, write to the Free
16 * Software Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
18 * $Id: pthr_reactor.c,v 1.6 2002/08/21 10:42:19 rich Exp $
26 #ifdef HAVE_SYS_POLL_H
30 #ifdef HAVE_SYS_TIME_H
50 #include "pthr_reactor.h"
52 #define REACTOR_DEBUG 0
56 int offset; /* Points into internal poll fds array. */
57 void (*fn) (int, int, void *);
64 struct reactor_timer *prev, *next;
65 unsigned long long delta;
70 struct reactor_prepoll
73 struct reactor_prepoll *next;
79 /* This is how HANDLES and POLL_ARRAY work:
81 * HANDLES is a straightforward list of reactor handle objects. When
82 * a user registers an event handler in the reactor, they get back
83 * an integer which is in fact an offset into the HANDLES array.
85 * POLL_ARRAY is the array which actually gets passed to the poll(2)
88 * HANDLES and POLL_ARRAY are related through the OFFSET field in
89 * a reactor handle. Thus:
91 * HANDLES: +------+------+------+------+------+
92 * offset: | 2 | 1 | 3 | 3 | 0 |
94 * | | | | | \ | | | | |
95 * | | | | | \ | | | | |
96 * +-|----+-|----+----\-+-|----+-|----+
98 * POLL_ARRAY: +------+-|----+\-----\-|----+ |
101 * | _______________________/
103 * +------+------+------+------+
105 * Note that you may have two handles pointing to the same offset
106 * in POLL_ARRAY: this happens when we share pollfds because they
107 * both contain the same fd and event information.
109 * If the OFFSET field of a handle contains -1, then the handle
110 * is unused. Notice that we never decrease the size of the HANDLES
113 * Adding (registering) a handle is easy enough: firstly we look for
114 * an unused (offset == -1) handle, and if not found we reallocate
115 * the HANDLES array to make it larger. Then we search through the
116 * POLL_ARRAY to see if it's possible to share. If not, we extend
117 * the POLL_ARRAY by 1 and set the offset accordingly.
119 * Deleting (unregistering) a handle is not so easy: deleting the
120 * handle is OK -- just set the offset to -1. But we must also
121 * check the POLL_ARRAY entry, and if it is not shared, delete it,
122 * shrink the POLL_ARRAY and update all handles which point to
123 * an offset larger than the deleted element in POLL_ARRAY.
127 /* The list of reactor handles registered. */
128 static struct reactor_handle *handles = 0;
129 static struct pollfd *poll_array = 0;
130 static int nr_handles_allocated = 0;
131 static int nr_array_allocated = 0;
132 static int nr_array_used = 0;
134 /* The list of timers, stored in time order (in a delta queue). */
135 static struct reactor_timer *head_timer = 0;
137 /* The list of prepoll handlers in no particular order. */
138 static struct reactor_prepoll *head_prepoll = 0;
140 /* The current time, or near as dammit, in milliseconds from Unix epoch. */
141 unsigned long long reactor_time;
143 /* Function prototypes. */
144 static void remove_timer (void *timerp);
145 static void remove_prepoll (void *timerp);
147 /* Cause reactor_init / reactor_stop to be called automatically. */
148 static void reactor_init (void) __attribute__ ((constructor));
149 static void reactor_stop (void) __attribute__ ((destructor));
157 /* Catch EPIPE errors rather than sending a signal. */
158 memset (&sa, 0, sizeof sa);
159 sa.sa_handler = SIG_IGN;
160 sa.sa_flags = SA_RESTART;
161 sigaction (SIGPIPE, &sa, 0);
163 /* Update the reactor time. */
164 gettimeofday (&tv, 0);
165 reactor_time = tv.tv_sec * 1000LL + tv.tv_usec / 1000;
172 reactor_timer p, p_next;
173 reactor_prepoll prepoll, prepoll_next;
175 /* There should be no prepoll handlers registered. Check it and free them.*/
176 for (prepoll = head_prepoll; prepoll; prepoll = prepoll_next)
178 syslog (LOG_WARNING, "prepoll handler left registered in reactor: fn=%p, data=%p",
179 prepoll->fn, prepoll->data);
180 prepoll_next = prepoll;
181 delete_pool (prepoll->pool);
184 /* There should be no timers registered. Check it and free them up. */
185 for (p = head_timer; p; p = p_next)
187 syslog (LOG_WARNING, "timer left registered in reactor: fn=%p, data=%p",
190 delete_pool (p->pool);
193 /* There should be no handles registered. Check for this. */
194 for (i = 0; i < nr_handles_allocated; ++i)
195 if (handles[i].offset >= 0)
196 syslog (LOG_WARNING, "handle left registered in reactor: fn=%p, data=%p",
197 handles[i].fn, handles[i].data);
199 /* Free up memory used by handles. */
200 if (handles) free (handles);
201 if (poll_array) free (poll_array);
205 reactor_register (int socket, int operations,
206 void (*fn) (int socket, int events, void *), void *data)
210 /* Find an unused handle. */
211 for (i = 0; i < nr_handles_allocated; ++i)
212 if (handles[i].offset == -1)
218 /* No free handles: allocate a new handle. */
219 h = nr_handles_allocated;
220 nr_handles_allocated += 8;
221 handles = realloc (handles,
222 nr_handles_allocated * sizeof (struct reactor_handle));
223 for (i = h; i < nr_handles_allocated; ++i)
224 handles[i].offset = -1;
227 /* Examine the poll descriptors to see if we can share with an
228 * existing descriptor.
230 for (a = 0; a < nr_array_used; ++a)
231 if (poll_array[a].fd == socket &&
232 poll_array[a].events == operations)
233 goto found_poll_descriptor;
235 /* Allocate space in the array of poll descriptors. */
236 if (nr_array_used >= nr_array_allocated)
238 nr_array_allocated += 8;
239 poll_array = realloc (poll_array,
240 nr_array_allocated * sizeof (struct pollfd));
244 /* Create the poll descriptor. */
245 poll_array[a].fd = socket;
246 poll_array[a].events = operations;
248 found_poll_descriptor:
249 /* Create the handle. */
250 handles[h].offset = a;
252 handles[h].data = data;
256 "reactor_register (fd=%d, ops=0x%x, fn=%p, data=%p) = %p\n",
257 socket, operations, fn, data, &handles[h]);
260 /* Return the handle. */
265 reactor_unregister (reactor_handle handle)
267 int i, a = handles[handle].offset;
271 "reactor_unregister (handle=%d [fd=%d, ops=0x%x])\n",
272 handle, poll_array[a].fd, poll_array[a].events);
275 handles[handle].offset = -1;
277 /* Does any other handle share this element? If so, leave POLL_ARRAY alone.
279 for (i = 0; i < nr_handles_allocated; ++i)
280 if (handles[i].offset == a)
283 /* Not shared. Remove this element from poll_array. */
284 memcpy (&poll_array[a], &poll_array[a+1],
285 (nr_array_used - a - 1) * sizeof (struct pollfd));
288 /* Any handles in used which use offset > a should be updated. */
289 for (i = 0; i < nr_handles_allocated; ++i)
290 if (handles[i].offset > a)
291 handles[i].offset --;
295 reactor_set_timer (pool pp,
297 void (*fn) (void *data),
301 reactor_timer timer, p, last_p;
302 unsigned long long trigger_time, this_time;
304 sp = new_subpool (pp);
306 timer = pmalloc (sp, sizeof *timer);
312 /* Register a function to clean up this timer when the subpool is deleted.*/
313 pool_register_cleanup_fn (sp, remove_timer, timer);
315 /* Calculate the trigger time. */
316 trigger_time = reactor_time + timeout;
318 if (head_timer == 0) /* List is empty. */
320 timer->prev = timer->next = 0;
321 timer->delta = trigger_time;
322 return head_timer = timer;
325 /* Find out where to insert this handle in the delta queue. */
328 for (p = head_timer; p; last_p = p, p = p->next)
330 this_time += p->delta;
332 if (this_time >= trigger_time) /* Insert before element p. */
334 timer->prev = p->prev;
336 if (p->prev) /* Not the first element. */
337 p->prev->next = timer;
338 else /* First element in list. */
341 timer->delta = trigger_time - (this_time - p->delta);
342 p->delta = this_time - trigger_time;
347 /* Insert at the end of the list. */
348 last_p->next = timer;
349 timer->prev = last_p;
351 timer->delta = trigger_time - this_time;
356 remove_timer (void *timerp)
358 struct reactor_timer *timer = (struct reactor_timer *) timerp;
360 /* Remove this timer from the list. */
361 if (timer->prev != 0)
362 timer->prev->next = timer->next;
364 head_timer = timer->next;
366 if (timer->next != 0)
368 timer->next->prev = timer->prev;
369 timer->next->delta += timer->delta;
374 reactor_unset_timer_early (reactor_timer timer)
376 delete_pool (timer->pool);
380 reactor_register_prepoll (pool pp,
381 void (*fn) (void *data),
387 sp = new_subpool (pp);
389 p = pmalloc (sp, sizeof *p);
395 pool_register_cleanup_fn (sp, remove_prepoll, p);
397 p->next = head_prepoll;
404 remove_prepoll (void *handlep)
406 reactor_prepoll handle = (reactor_prepoll) handlep, prev = 0, this;
408 /* Find this handle in the list. */
409 for (this = head_prepoll;
410 this && this != handle;
411 prev = this, this = this->next)
414 assert (this == handle);
416 if (prev == 0) { /* Remove head handler. */
417 head_prepoll = head_prepoll->next;
418 } else { /* Remove inner handler. */
419 prev->next = this->next;
424 reactor_unregister_prepoll (reactor_prepoll handle)
426 delete_pool (handle->pool);
433 reactor_prepoll prepoll;
437 /* Update the reactor time. */
438 gettimeofday (&tv, 0);
439 reactor_time = tv.tv_sec * 1000LL + tv.tv_usec / 1000;
442 /* Fire any timers which are ready. */
443 while (head_timer != 0 && head_timer->delta <= reactor_time)
449 /* Calling fn might change head_timer. */
454 /* Remove the timer from the queue now (this avoids a rare race
455 * condition exposed if code calls pth_sleep followed immediately
458 reactor_unset_timer_early (timer);
463 /* Run the prepoll handlers. This is tricky -- we have to check
464 * (a) that we run every prepoll handler, even if new ones are
465 * added while we are running them, and (b) that we don't accidentally
466 * hit a prepoll handler which has been removed. The only thing we
467 * can be sure of is that HEAD_PREPOLL is always valid. Anything it
468 * points to can change any time we call a handler. Therefore this
469 * is how we do it: (1) go through the list, marked all of the handlers
470 * as not fired (ie. clearing the FIRED flag); (2) go through the list
471 * looking for the first non-fired handle, mark it as fired and run
472 * it; (3) repeat step (2) until there are no more non-fired handles.
474 for (prepoll = head_prepoll; prepoll; prepoll = prepoll->next)
478 for (prepoll = head_prepoll;
479 prepoll && prepoll->fired;
480 prepoll = prepoll->next)
486 prepoll->fn (prepoll->data);
490 /* Poll file descriptors. */
491 if (nr_array_used >= 0)
494 fprintf (stderr, "reactor_invoke: poll [");
495 for (i = 0; i < nr_array_used; ++i)
496 fprintf (stderr, "(fd=%d, ops=0x%x)",
497 poll_array[i].fd, poll_array[i].events);
498 fprintf (stderr, "]\n");
501 r = poll (poll_array, nr_array_used,
502 head_timer ? head_timer->delta - reactor_time : -1);
504 /* Update the reactor time. */
505 gettimeofday (&tv, 0);
506 reactor_time = tv.tv_sec * 1000LL + tv.tv_usec / 1000;
509 fprintf (stderr, "reactor_invoke: poll returned %d [", r);
510 for (i = 0; i < nr_array_used; ++i)
511 if (poll_array[i].revents != 0)
512 fprintf (stderr, "(fd=%d, ops=0x%x)",
513 poll_array[i].fd, poll_array[i].revents);
514 fprintf (stderr, "]\n");
517 if (r > 0) /* Some descriptors are ready. */
519 /* Check for events happening in the array, but go via the
520 * handles because there is no back pointer back to the
521 * handles from the array. Surprisingly enough, this code
522 * appears to be free of race conditions (note that calling
523 * fn can register/unregister handles).
525 for (i = 0; i < nr_handles_allocated; ++i)
527 a = handles[i].offset;
529 if (a >= 0 && poll_array[a].revents != 0)
531 handles[i].fn (poll_array[a].fd, poll_array[a].revents,
536 else if (r == 0) /* The head timer has fired. */
542 /* Calling fn might change head_timer. */
547 /* Remove the timer from the queue now (this avoids a rare race
548 * condition exposed if code calls pth_sleep followed immediately
551 reactor_unset_timer_early (timer);