Add to git.
[pthrlib.git] / src / pthr_reactor.c
1 /* A specialized Reactor.
2  * - by Richard W.M. Jones <rich@annexia.org>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the Free
16  * Software Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
17  *
18  * $Id: pthr_reactor.c,v 1.6 2002/08/21 10:42:19 rich Exp $
19  */
20
21 #include "config.h"
22
23 #include <stdio.h>
24 #include <stdlib.h>
25
26 #ifdef HAVE_SYS_POLL_H
27 #include <sys/poll.h>
28 #endif
29
30 #ifdef HAVE_SYS_TIME_H
31 #include <sys/time.h>
32 #endif
33
34 #ifdef HAVE_SIGNAL_H
35 #include <signal.h>
36 #endif
37
38 #ifdef HAVE_SYSLOG_H
39 #include <syslog.h>
40 #endif
41
42 #ifdef HAVE_ASSERT_H
43 #include <assert.h>
44 #endif
45
46 #ifdef HAVE_STRING_H
47 #include <string.h>
48 #endif
49
50 #include "pthr_reactor.h"
51
52 #define REACTOR_DEBUG 0
53
54 struct reactor_handle
55 {
56   int offset;                   /* Points into internal poll fds array. */
57   void (*fn) (int, int, void *);
58   void *data;
59 };
60
61 struct reactor_timer
62 {
63   pool pool;
64   struct reactor_timer *prev, *next;
65   unsigned long long delta;
66   void (*fn) (void *);
67   void *data;
68 };
69
70 struct reactor_prepoll
71 {
72   pool pool;
73   struct reactor_prepoll *next;
74   void (*fn) (void *);
75   void *data;
76   int fired;
77 };
78
79 /* This is how HANDLES and POLL_ARRAY work:
80  *
81  * HANDLES is a straightforward list of reactor handle objects. When
82  * a user registers an event handler in the reactor, they get back
83  * an integer which is in fact an offset into the HANDLES array.
84  *
85  * POLL_ARRAY is the array which actually gets passed to the poll(2)
86  * system call.
87  *
88  * HANDLES and POLL_ARRAY are related through the OFFSET field in
89  * a reactor handle. Thus:
90  *
91  * HANDLES:     +------+------+------+------+------+
92  *      offset: | 2    | 1    | 3    | 3    | 0    |
93  *              |      |      |      |      |      |
94  *              | |    | |    |  \   | |    | |    |
95  *              | |    | |    |   \  | |    | |    |
96  *              +-|----+-|----+----\-+-|----+-|----+
97  *                 \_____|_____     \  |      |
98  * POLL_ARRAY:  +------+-|----+\-----\-|----+ |
99  *              |      | |    | \    |\     | |
100  *              |      |      |      |      | |
101  *              |     _______________________/
102  *              |      |      |      |      |
103  *              +------+------+------+------+
104  *
105  * Note that you may have two handles pointing to the same offset
106  * in POLL_ARRAY: this happens when we share pollfds because they
107  * both contain the same fd and event information.
108  *
109  * If the OFFSET field of a handle contains -1, then the handle
110  * is unused. Notice that we never decrease the size of the HANDLES
111  * array.
112  *
113  * Adding (registering) a handle is easy enough: firstly we look for
114  * an unused (offset == -1) handle, and if not found we reallocate
115  * the HANDLES array to make it larger. Then we search through the
116  * POLL_ARRAY to see if it's possible to share. If not, we extend
117  * the POLL_ARRAY by 1 and set the offset accordingly.
118  *
119  * Deleting (unregistering) a handle is not so easy: deleting the
120  * handle is OK -- just set the offset to -1. But we must also
121  * check the POLL_ARRAY entry, and if it is not shared, delete it,
122  * shrink the POLL_ARRAY and update all handles which point to
123  * an offset larger than the deleted element in POLL_ARRAY.
124  */
125
126
127 /* The list of reactor handles registered. */
128 static struct reactor_handle *handles = 0;
129 static struct pollfd *poll_array = 0;
130 static int nr_handles_allocated = 0;
131 static int nr_array_allocated = 0;
132 static int nr_array_used = 0;
133
134 /* The list of timers, stored in time order (in a delta queue). */
135 static struct reactor_timer *head_timer = 0;
136
137 /* The list of prepoll handlers in no particular order. */
138 static struct reactor_prepoll *head_prepoll = 0;
139
140 /* The current time, or near as dammit, in milliseconds from Unix epoch. */
141 unsigned long long reactor_time;
142
143 /* Function prototypes. */
144 static void remove_timer (void *timerp);
145 static void remove_prepoll (void *timerp);
146
147 /* Cause reactor_init / reactor_stop to be called automatically. */
148 static void reactor_init (void) __attribute__ ((constructor));
149 static void reactor_stop (void) __attribute__ ((destructor));
150
151 static void
152 reactor_init ()
153 {
154   struct timeval tv;
155   struct sigaction sa;
156
157   /* Catch EPIPE errors rather than sending a signal. */
158   memset (&sa, 0, sizeof sa);
159   sa.sa_handler = SIG_IGN;
160   sa.sa_flags = SA_RESTART;
161   sigaction (SIGPIPE, &sa, 0);
162
163   /* Update the reactor time. */
164   gettimeofday (&tv, 0);
165   reactor_time = tv.tv_sec * 1000LL + tv.tv_usec / 1000;
166 }
167
168 static void
169 reactor_stop ()
170 {
171   int i;
172   reactor_timer p, p_next;
173   reactor_prepoll prepoll, prepoll_next;
174
175   /* There should be no prepoll handlers registered. Check it and free them.*/
176   for (prepoll = head_prepoll; prepoll; prepoll = prepoll_next)
177     {
178       syslog (LOG_WARNING, "prepoll handler left registered in reactor: fn=%p, data=%p",
179               prepoll->fn, prepoll->data);
180       prepoll_next = prepoll;
181       delete_pool (prepoll->pool);
182     }
183
184   /* There should be no timers registered. Check it and free them up. */
185   for (p = head_timer; p; p = p_next)
186     {
187       syslog (LOG_WARNING, "timer left registered in reactor: fn=%p, data=%p",
188               p->fn, p->data);
189       p_next = p->next;
190       delete_pool (p->pool);
191     }
192
193   /* There should be no handles registered. Check for this. */
194   for (i = 0; i < nr_handles_allocated; ++i)
195     if (handles[i].offset >= 0)
196       syslog (LOG_WARNING, "handle left registered in reactor: fn=%p, data=%p",
197               handles[i].fn, handles[i].data);
198
199   /* Free up memory used by handles. */
200   if (handles) free (handles);
201   if (poll_array) free (poll_array);
202 }
203
204 reactor_handle
205 reactor_register (int socket, int operations,
206                   void (*fn) (int socket, int events, void *), void *data)
207 {
208   int i, h, a;
209
210   /* Find an unused handle. */
211   for (i = 0; i < nr_handles_allocated; ++i)
212     if (handles[i].offset == -1)
213       {
214         h = i;
215         goto found_handle;
216       }
217
218   /* No free handles: allocate a new handle. */
219   h = nr_handles_allocated;
220   nr_handles_allocated += 8;
221   handles = realloc (handles,
222                      nr_handles_allocated * sizeof (struct reactor_handle));
223   for (i = h; i < nr_handles_allocated; ++i)
224     handles[i].offset = -1;
225
226  found_handle:
227   /* Examine the poll descriptors to see if we can share with an
228    * existing descriptor.
229    */
230   for (a = 0; a < nr_array_used; ++a)
231     if (poll_array[a].fd == socket &&
232         poll_array[a].events == operations)
233       goto found_poll_descriptor;
234
235   /* Allocate space in the array of poll descriptors. */
236   if (nr_array_used >= nr_array_allocated)
237     {
238       nr_array_allocated += 8;
239       poll_array = realloc (poll_array,
240                             nr_array_allocated * sizeof (struct pollfd));
241     }
242   a = nr_array_used++;
243
244   /* Create the poll descriptor. */
245   poll_array[a].fd = socket;
246   poll_array[a].events = operations;
247
248  found_poll_descriptor:
249   /* Create the handle. */
250   handles[h].offset = a;
251   handles[h].fn = fn;
252   handles[h].data = data;
253
254 #if REACTOR_DEBUG
255   fprintf (stderr,
256            "reactor_register (fd=%d, ops=0x%x, fn=%p, data=%p) = %p\n",
257            socket, operations, fn, data, &handles[h]);
258 #endif
259
260   /* Return the handle. */
261   return h;
262 }
263
264 void
265 reactor_unregister (reactor_handle handle)
266 {
267   int i, a = handles[handle].offset;
268
269 #if REACTOR_DEBUG
270   fprintf (stderr,
271            "reactor_unregister (handle=%d [fd=%d, ops=0x%x])\n",
272            handle, poll_array[a].fd, poll_array[a].events);
273 #endif
274
275   handles[handle].offset = -1;
276
277   /* Does any other handle share this element? If so, leave POLL_ARRAY alone.
278    */
279   for (i = 0; i < nr_handles_allocated; ++i)
280     if (handles[i].offset == a)
281       return;
282
283   /* Not shared. Remove this element from poll_array. */
284   memcpy (&poll_array[a], &poll_array[a+1],
285           (nr_array_used - a - 1) * sizeof (struct pollfd));
286   nr_array_used --;
287
288   /* Any handles in used which use offset > a should be updated. */
289   for (i = 0; i < nr_handles_allocated; ++i)
290     if (handles[i].offset > a)
291       handles[i].offset --;
292 }
293
294 reactor_timer
295 reactor_set_timer (pool pp,
296                    int timeout,
297                    void (*fn) (void *data),
298                    void *data)
299 {
300   pool sp;
301   reactor_timer timer, p, last_p;
302   unsigned long long trigger_time, this_time;
303
304   sp = new_subpool (pp);
305
306   timer = pmalloc (sp, sizeof *timer);
307
308   timer->pool = sp;
309   timer->fn = fn;
310   timer->data = data;
311
312   /* Register a function to clean up this timer when the subpool is deleted.*/
313   pool_register_cleanup_fn (sp, remove_timer, timer);
314
315   /* Calculate the trigger time. */
316   trigger_time = reactor_time + timeout;
317
318   if (head_timer == 0)          /* List is empty. */
319     {
320       timer->prev = timer->next = 0;
321       timer->delta = trigger_time;
322       return head_timer = timer;
323     }
324
325   /* Find out where to insert this handle in the delta queue. */
326   this_time = 0;
327   last_p = 0;
328   for (p = head_timer; p; last_p = p, p = p->next)
329     {
330       this_time += p->delta;
331
332       if (this_time >= trigger_time) /* Insert before element p. */
333         {
334           timer->prev = p->prev;
335           timer->next = p;
336           if (p->prev)          /* Not the first element. */
337             p->prev->next = timer;
338           else                  /* First element in list. */
339             head_timer = timer;
340           p->prev = timer;
341           timer->delta = trigger_time - (this_time - p->delta);
342           p->delta = this_time - trigger_time;
343           return timer;
344         }
345     }
346
347   /* Insert at the end of the list. */
348   last_p->next = timer;
349   timer->prev = last_p;
350   timer->next = 0;
351   timer->delta = trigger_time - this_time;
352   return timer;
353 }
354
355 static void
356 remove_timer (void *timerp)
357 {
358   struct reactor_timer *timer = (struct reactor_timer *) timerp;
359
360   /* Remove this timer from the list. */
361   if (timer->prev != 0)
362     timer->prev->next = timer->next;
363   else
364     head_timer = timer->next;
365
366   if (timer->next != 0)
367     {
368       timer->next->prev = timer->prev;
369       timer->next->delta += timer->delta;
370     }
371 }
372
373 void
374 reactor_unset_timer_early (reactor_timer timer)
375 {
376   delete_pool (timer->pool);
377 }
378
379 reactor_prepoll
380 reactor_register_prepoll (pool pp,
381                           void (*fn) (void *data),
382                           void *data)
383 {
384   pool sp;
385   reactor_prepoll p;
386
387   sp = new_subpool (pp);
388
389   p = pmalloc (sp, sizeof *p);
390
391   p->pool = sp;
392   p->fn = fn;
393   p->data = data;
394
395   pool_register_cleanup_fn (sp, remove_prepoll, p);
396
397   p->next = head_prepoll;
398   head_prepoll = p;
399
400   return p;
401 }
402
403 static void
404 remove_prepoll (void *handlep)
405 {
406   reactor_prepoll handle = (reactor_prepoll) handlep, prev = 0, this;
407
408   /* Find this handle in the list. */
409   for (this = head_prepoll;
410        this && this != handle;
411        prev = this, this = this->next)
412     ;
413
414   assert (this == handle);
415
416   if (prev == 0) {              /* Remove head handler. */
417     head_prepoll = head_prepoll->next;
418   } else {                      /* Remove inner handler. */
419     prev->next = this->next;
420   }
421 }
422
423 void
424 reactor_unregister_prepoll (reactor_prepoll handle)
425 {
426   delete_pool (handle->pool);
427 }
428
429 void
430 reactor_invoke ()
431 {
432   int i, r, a;
433   reactor_prepoll prepoll;
434   struct timeval tv;
435
436 #if 0
437   /* Update the reactor time. */
438   gettimeofday (&tv, 0);
439   reactor_time = tv.tv_sec * 1000LL + tv.tv_usec / 1000;
440 #endif
441
442   /* Fire any timers which are ready. */
443   while (head_timer != 0 && head_timer->delta <= reactor_time)
444     {
445       reactor_timer timer;
446       void (*fn) (void *);
447       void *data;
448
449       /* Calling fn might change head_timer. */
450       timer = head_timer;
451       fn = timer->fn;
452       data = timer->data;
453
454       /* Remove the timer from the queue now (this avoids a rare race
455        * condition exposed if code calls pth_sleep followed immediately
456        * by pth_exit).
457        */
458       reactor_unset_timer_early (timer);
459
460       fn (data);
461     }
462
463   /* Run the prepoll handlers. This is tricky -- we have to check
464    * (a) that we run every prepoll handler, even if new ones are
465    * added while we are running them, and (b) that we don't accidentally
466    * hit a prepoll handler which has been removed. The only thing we
467    * can be sure of is that HEAD_PREPOLL is always valid. Anything it
468    * points to can change any time we call a handler. Therefore this
469    * is how we do it: (1) go through the list, marked all of the handlers
470    * as not fired (ie. clearing the FIRED flag); (2) go through the list
471    * looking for the first non-fired handle, mark it as fired and run
472    * it; (3) repeat step (2) until there are no more non-fired handles.
473    */
474   for (prepoll = head_prepoll; prepoll; prepoll = prepoll->next)
475     prepoll->fired = 0;
476
477  prepoll_again:
478   for (prepoll = head_prepoll;
479        prepoll && prepoll->fired;
480        prepoll = prepoll->next)
481     ;
482
483   if (prepoll)
484     {
485       prepoll->fired = 1;
486       prepoll->fn (prepoll->data);
487       goto prepoll_again;
488     }
489
490   /* Poll file descriptors. */
491   if (nr_array_used >= 0)
492     {
493 #if REACTOR_DEBUG
494       fprintf (stderr, "reactor_invoke: poll [");
495       for (i = 0; i < nr_array_used; ++i)
496         fprintf (stderr, "(fd=%d, ops=0x%x)",
497                  poll_array[i].fd, poll_array[i].events);
498       fprintf (stderr, "]\n");
499 #endif
500
501       r = poll (poll_array, nr_array_used,
502                 head_timer ? head_timer->delta - reactor_time : -1);
503
504       /* Update the reactor time. */
505       gettimeofday (&tv, 0);
506       reactor_time = tv.tv_sec * 1000LL + tv.tv_usec / 1000;
507
508 #if REACTOR_DEBUG
509       fprintf (stderr, "reactor_invoke: poll returned %d [", r);
510       for (i = 0; i < nr_array_used; ++i)
511         if (poll_array[i].revents != 0)
512           fprintf (stderr, "(fd=%d, ops=0x%x)",
513                    poll_array[i].fd, poll_array[i].revents);
514       fprintf (stderr, "]\n");
515 #endif
516
517       if (r > 0)                /* Some descriptors are ready. */
518         {
519           /* Check for events happening in the array, but go via the
520            * handles because there is no back pointer back to the
521            * handles from the array. Surprisingly enough, this code
522            * appears to be free of race conditions (note that calling
523            * fn can register/unregister handles).
524            */
525           for (i = 0; i < nr_handles_allocated; ++i)
526             {
527               a = handles[i].offset;
528
529               if (a >= 0 && poll_array[a].revents != 0)
530                 {
531                   handles[i].fn (poll_array[a].fd, poll_array[a].revents,
532                                  handles[i].data);
533                 }
534             }
535         }
536       else if (r == 0)          /* The head timer has fired. */
537         {
538           reactor_timer timer;
539           void (*fn) (void *);
540           void *data;
541
542           /* Calling fn might change head_timer. */
543           timer = head_timer;
544           fn = timer->fn;
545           data = timer->data;
546
547           /* Remove the timer from the queue now (this avoids a rare race
548            * condition exposed if code calls pth_sleep followed immediately
549            * by pth_exit).
550            */
551           reactor_unset_timer_early (timer);
552
553           fn (data);
554         }
555     }
556 }