Add to git.
[monolith.git] / chat / chatroom.c
1 /* Monolith chatroom.
2  * - by Richard W.M. Jones <rich@annexia.org>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the Free
16  * Software Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
17  *
18  * $Id: chatroom.c,v 1.6 2003/02/22 15:34:27 rich Exp $
19  */
20
21 #include "config.h"
22
23 #include <stdio.h>
24 #include <stdlib.h>
25
26 #ifdef HAVE_ASSERT_H
27 #include <assert.h>
28 #endif
29
30 #ifdef HAVE_STRING_H
31 #include <string.h>
32 #endif
33
34 #include <pool.h>
35 #include <hash.h>
36 #include <pstring.h>
37
38 #include <pthr_dbi.h>
39 #include <pthr_wait_queue.h>
40
41 #include <monolith.h>
42
43 #include "chatroom.h"
44
45 static void chatroom_init (void) __attribute__((constructor));
46 static void chatroom_stop (void) __attribute__((destructor));
47
48 static pool cr_pool;            /* Pool for global allocations. */
49 static hash rooms;              /* Maps resid -> room structure. */
50
51 /* Chatbots wait on multiple rooms, and so as well as the per-room
52  * message_wq we also have this global message_wq. Chatbots wait on
53  * this and then check each room to find out which has new messages.
54  */
55 static wait_queue message_wq;   /* Global message wait queue. */
56
57 struct chatroom
58 {
59   int resid;                    /* Resource ID. */
60   vector messages;              /* Messages posted to the room, in order. */
61   wait_queue message_wq;        /* Sleep on this when waiting for messages. */
62
63   /* Threads is a hash of pth -> struct thread. It records each thread
64    * currently in the message display loop.
65    */
66   hash threads;
67
68   /* Users is a hash of userid -> number of times that user is registered
69    * in a struct thread (above). Element 0 is special because it records
70    * the number of anonymous users.
71    */
72   hash users;
73
74   wait_queue users_wq;          /* Sleep on this when waiting enter/leave. */
75 };
76
77 struct thread
78 {
79   pool sp;                      /* Subpool of thread pool. */
80   int userid;                   /* User ID associated with this thread. */
81   const char *username;         /* Username (NULL for anonymous). */
82 };
83
84 static void
85 chatroom_init ()
86 {
87   cr_pool = new_subpool (global_pool);
88   rooms = new_hash (cr_pool, int, struct chatroom *);
89   message_wq = new_wait_queue (cr_pool);
90 }
91
92 static void
93 chatroom_stop ()
94 {
95   delete_pool (cr_pool);
96   cr_pool = 0;                  /* Force segfault. */
97   rooms = 0;
98   message_wq = 0;
99 }
100
101 chatroom
102 get_chatroom (db_handle dbh, int resid)
103 {
104   st_handle sth;
105   const char *body, *username, *original_ip;
106   struct dbi_timestamp posted_date;
107   int authorid;
108   chatroom room;
109   message m;
110
111   /* Return the object from the hash, if found. */
112   if (hash_get (rooms, resid, room)) return room;
113
114   /* Not found, so we need to create the object. */
115   room = pmalloc (cr_pool, sizeof *room);
116   room->resid = resid;
117   room->messages = new_vector (cr_pool, message);
118   room->message_wq = new_wait_queue (cr_pool);
119   room->threads = new_hash (cr_pool, pseudothread, struct thread);
120   room->users = new_hash (cr_pool, int, int);
121   room->users_wq = new_wait_queue (cr_pool);
122
123   /* Pull out the recent messages for this room from the log. */
124   sth = st_prepare_cached
125     (dbh,
126      "select g.body, g.posted_date, g.author, u.username, g.original_ip "
127      "  from ml_chat_log g left outer join ml_users u on g.author = u.userid "
128      " where g.resid = ? and "
129      "       g.posted_date >= current_timestamp - interval '1 hour'"
130      " order by g.posted_date",
131      DBI_INT);
132   st_execute (sth, resid);
133
134   st_bind (sth, 0, body, DBI_STRING);
135   st_bind (sth, 1, posted_date, DBI_TIMESTAMP);
136   st_bind (sth, 2, authorid, DBI_INT);
137   st_bind (sth, 3, username, DBI_STRING);
138   st_bind (sth, 4, original_ip, DBI_STRING);
139
140   while (st_fetch (sth))
141     {
142       m = new_message_posting (cr_pool, posted_date.hour, posted_date.min,
143                                authorid, authorid ? username : 0, original_ip,
144                                body);
145
146       vector_push_back (room->messages, m);
147     }
148
149   /* Save the object. */
150   hash_insert (rooms, resid, room);
151
152   /* Return it. */
153   return room;
154 }
155
156 int
157 chatroom_first_message_index (const chatroom r)
158 {
159   /* This naive implementation keeps all messages around forever, hence ... */
160   return 0;
161 }
162
163 int
164 chatroom_last_message_index (const chatroom r)
165 {
166   /* Note that this returns last message index + 1. */
167   return vector_size (r->messages);
168 }
169
170 message
171 chatroom_get_message (const chatroom r, int index,
172                       void (*pre_sleep) (ml_session, void *data),
173                       ml_session session, void *data)
174 {
175   message m;
176
177   if (index < 0)
178     return 0;
179
180  again:
181   if (index < vector_size (r->messages))
182     {
183       vector_get (r->messages, index, m);
184       return m;
185     }
186
187   /* The caller is asking for a future message, so now we go to sleep until
188    * it appears.
189    */
190
191   /* Call the pre_sleep function, if any. */
192   if (pre_sleep)
193     {
194       pre_sleep (session, data);
195       pre_sleep = 0;            /* Don't go to sleep again. */
196     }
197
198   /* Go to sleep. */
199   ml_session_release_lock (session);
200   wq_sleep_on (r->message_wq);
201   ml_session_acquire_lock (session);
202
203   /* Try to get the message again. */
204   goto again;
205 }
206
207 extern int chatroom_get_message_multiple (const vector rooms, const hash next_msgids, message *msg_rtn, chatroom *room_rtn);
208
209 void
210 chatroom_post (chatroom r, ml_session session, const char *conninfo,
211                const char *body)
212 {
213   db_handle dbh;
214   st_handle sth;
215   int userid, msgid;
216   const char *username, *original_ip;
217   struct dbi_timestamp posted_date;
218   message m;
219
220   /* Get the currently logged in user. */
221   userid = ml_session_userid (session);
222
223   dbh = get_db_handle (conninfo, DBI_THROW_ERRORS);
224
225   /* Insert the message into the database. */
226   sth = st_prepare_cached
227     (dbh,
228      "insert into ml_chat_log (resid, body, author, original_ip) "
229      "values (?, ?, ?, ?)",
230      DBI_INT, DBI_STRING, DBI_INT_OR_NULL, DBI_STRING);
231   st_execute (sth, r->resid, body, userid,
232               ml_session_get_peernamestr (session));
233
234   /* Get the serial number of the message. */
235   msgid = st_serial (sth, "ml_chat_log_id_seq");
236
237   /* Fetch back the message, which will now have the default fields (in
238    * particular, the posting date) prefilled.
239    */
240   sth = st_prepare_cached
241     (dbh,
242      "select g.posted_date, u.username, g.original_ip "
243      "  from ml_chat_log g left outer join ml_users u on g.author = u.userid "
244      " where g.id = ?",
245      DBI_INT);
246   st_execute (sth, msgid);
247
248   st_bind (sth, 0, posted_date, DBI_TIMESTAMP);
249   st_bind (sth, 1, username, DBI_STRING);
250   st_bind (sth, 2, original_ip, DBI_STRING);
251
252   if (!st_fetch (sth))
253     pth_die (psprintf (ml_session_pool (session),
254                        "missing message ID: %d", msgid));
255
256   /* Save the new message. */
257   m = new_message_posting (cr_pool, posted_date.hour, posted_date.min,
258                            userid, userid ? username : 0, original_ip,
259                            body);
260
261   vector_push_back (r->messages, m);
262
263   /* Wake up anyone waiting for the message. */
264   wq_wake_up (r->message_wq);
265   wq_wake_up (message_wq);
266
267   /* Commit changes to the database. */
268   db_commit (dbh);
269   put_db_handle (dbh);
270 }
271
272 void
273 chatroom_spam_blank (pool thread_pool)
274 {
275   pool pool = new_subpool (thread_pool);
276   vector resids = hash_keys_in_pool (rooms, pool);
277   int i;
278
279   for (i = 0; i < vector_size (resids); ++i)
280     {
281       message m;
282       chatroom r;
283       int resid;
284
285       /* Get the room structure. */
286       vector_get (resids, i, resid);
287       hash_get (rooms, resid, r);
288
289       /* Send a blank message to this room. */
290       m = new_message_blank (cr_pool);
291       vector_push_back (r->messages, m);
292
293       /* Wake up anyone waiting for the message. */
294       wq_wake_up (r->message_wq);
295       wq_wake_up (message_wq);
296     }
297
298   delete_pool (pool);
299 }
300
301 static void leave_room (void *vr);
302
303 void
304 chatroom_enter (chatroom r, int userid, const char *username)
305 {
306   pool sp;
307   struct thread thread;
308   int count;
309   message m;
310
311   /* Create a subpool of the _thread_ pool, so that if the thread dies
312    * unexpectedly we can catch it.
313    */
314   sp = new_subpool (pth_get_pool (current_pth));
315   pool_register_cleanup_fn (sp, leave_room, r);
316
317   /* Update threads hash. */
318   thread.userid = userid;
319   thread.username = username ? pstrdup (sp, username) : 0;
320   thread.sp = sp;
321   hash_insert (r->threads, current_pth, thread);
322
323   /* Update users hash. */
324   if (hash_get (r->users, userid, count))
325     count++;
326   else
327     count = 1;
328   hash_insert (r->users, userid, count);
329
330   /* Someone has entered the room, so wake up sleepers. */
331   wq_wake_up (r->users_wq);
332
333   if (count == 1 || userid == 0)
334     {
335       /* Also post a message in the room. */
336       m = new_message_enter (cr_pool, username);
337       vector_push_back (r->messages, m);
338
339       /* Wake up anyone waiting for the message. */
340       wq_wake_up (r->message_wq);
341       wq_wake_up (message_wq);
342     }
343 }
344
345 void
346 chatroom_leave (chatroom r)
347 {
348   struct thread thread;
349
350   /* If this fails, you probably tried to call chatroom_leave twice. */
351   assert (hash_get (r->threads, current_pth, thread));
352
353   /* This calls leave_room. */
354   delete_pool (thread.sp);
355 }
356
357 static void
358 leave_room (void *vr)
359 {
360   chatroom r = (chatroom) vr;
361   struct thread thread;
362   int count;
363   message m;
364
365   assert (hash_get (r->threads, current_pth, thread));
366   assert (hash_erase (r->threads, current_pth));
367
368   /* Update the users hash. */
369   assert (hash_get (r->users, thread.userid, count));
370   if (count > 1)
371     {
372       count--;
373       hash_insert (r->users, thread.userid, count);
374     }
375   else
376     {
377       count--;
378       assert (hash_erase (r->users, thread.userid));
379     }
380
381   /* Someone has left the room, so wake up sleepers. */
382   wq_wake_up (r->users_wq);
383
384   if (count == 0 || thread.userid == 0)
385     {
386       /* Also post a message in the room. */
387       m = new_message_leave (cr_pool, thread.username);
388       vector_push_back (r->messages, m);
389
390       /* Wake up anyone waiting for the message. */
391       wq_wake_up (r->message_wq);
392       wq_wake_up (message_wq);
393     }
394 }
395
396 const hash
397 chatroom_users (chatroom r)
398 {
399   return r->users;
400 }
401
402 void
403 chatroom_wait_enter_leave_event (chatroom r, ml_session session)
404 {
405   ml_session_release_lock (session);
406   wq_sleep_on (r->users_wq);
407   ml_session_acquire_lock (session);
408 }