2 * - by Richard W.M. Jones <rich@annexia.org>
4 * This library is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Library General Public
6 * License as published by the Free Software Foundation; either
7 * version 2 of the License, or (at your option) any later version.
9 * This library is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 * Library General Public License for more details.
14 * You should have received a copy of the GNU Library General Public
15 * License along with this library; if not, write to the Free
16 * Software Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
18 * $Id: chatroom.c,v 1.6 2003/02/22 15:34:27 rich Exp $
39 #include <pthr_wait_queue.h>
/* File-scope declarations.  chatroom_init and chatroom_stop carry the
 * constructor and destructor attributes, so the toolchain arranges for
 * them to run automatically rather than being called by name.
 */
45 static void chatroom_init (void) __attribute__((constructor));
46 static void chatroom_stop (void) __attribute__((destructor));
48 static pool cr_pool; /* Pool for global allocations. */
49 static hash rooms; /* Maps resid -> room structure. */
51 /* Chatbots wait on multiple rooms, and so as well as the per-room
52 * message_wq we also have this global message_wq. Chatbots wait on
53 * this and then check each room to find out which has new messages.
55 static wait_queue message_wq; /* Global message wait queue. */
/* Per-room state.  These members are the ones reached through the room
 * pointers kept in the rooms hash.
 * NOTE(review): the enclosing struct declaration is not visible here --
 * presumably struct chatroom; confirm.
 */
59 int resid; /* Resource ID. */
60 vector messages; /* Messages posted to the room, in order. */
61 wait_queue message_wq; /* Sleep on this when waiting for messages. */
63 /* Threads is a hash of pth -> struct thread. It records each thread
64 * currently in the message display loop.
68 /* Users is a hash of userid -> number of times that user is registered
69 * in a struct thread (above). Element 0 is special because it records
70 * the number of anonymous users.
74 wait_queue users_wq; /* Sleep on this when waiting enter/leave. */
/* Per-thread record stored in a room's threads hash.  chatroom_enter
 * fills in userid and username, and chatroom_leave deletes sp.
 */
79 pool sp; /* Subpool of thread pool. */
80 int userid; /* User ID associated with this thread. */
81 const char *username; /* Username (NULL for anonymous). */
/* Module set-up: create the module pool as a subpool of global_pool, then
 * allocate the rooms hash and the global message wait queue from it.
 * NOTE(review): presumably the body of chatroom_init -- confirm.
 */
87 cr_pool = new_subpool (global_pool);
88 rooms = new_hash (cr_pool, int, struct chatroom *);
89 message_wq = new_wait_queue (cr_pool);
/* Module tear-down: delete the module pool.  Everything this file
 * allocates from cr_pool (rooms, messages, wait queues) goes with it.
 * NOTE(review): presumably the body of chatroom_stop -- confirm.
 */
95 delete_pool (cr_pool);
96 cr_pool = 0; /* Force segfault. */
/* Look up the room for resource RESID, creating it on first use.
 *
 * If RESID is already present in the rooms hash, that room is returned
 * straight away.  Otherwise a new room is allocated from cr_pool, its
 * message vector is seeded from ml_chat_log with the rows for RESID
 * posted within the last hour (ordered by posting date), and the room is
 * stored in the rooms hash under RESID.
 *
 * DBH: presumably the handle the log query is prepared on -- the first
 * argument of st_prepare_cached is not visible here; confirm.
 */
102 get_chatroom (db_handle dbh, int resid)
105 const char *body, *username, *original_ip;
106 struct dbi_timestamp posted_date;
111 /* Return the object from the hash, if found. */
112 if (hash_get (rooms, resid, room)) return room;
114 /* Not found, so we need to create the object. */
115 room = pmalloc (cr_pool, sizeof *room);
/* All per-room containers live in the module pool, not a caller's pool. */
117 room->messages = new_vector (cr_pool, message);
118 room->message_wq = new_wait_queue (cr_pool);
119 room->threads = new_hash (cr_pool, pseudothread, struct thread);
120 room->users = new_hash (cr_pool, int, int);
121 room->users_wq = new_wait_queue (cr_pool);
123 /* Pull out the recent messages for this room from the log. */
124 sth = st_prepare_cached
126 "select g.body, g.posted_date, g.author, u.username, g.original_ip "
127 " from ml_chat_log g left outer join ml_users u on g.author = u.userid "
128 " where g.resid = ? and "
129 " g.posted_date >= current_timestamp - interval '1 hour'"
130 " order by g.posted_date",
132 st_execute (sth, resid);
/* Bind one variable per selected column, in select-list order. */
134 st_bind (sth, 0, body, DBI_STRING);
135 st_bind (sth, 1, posted_date, DBI_TIMESTAMP);
136 st_bind (sth, 2, authorid, DBI_INT);
137 st_bind (sth, 3, username, DBI_STRING);
138 st_bind (sth, 4, original_ip, DBI_STRING);
140 while (st_fetch (sth))
/* A zero authorid is passed on with no username. */
142 m = new_message_posting (cr_pool, posted_date.hour, posted_date.min,
143 authorid, authorid ? username : 0, original_ip,
146 vector_push_back (room->messages, m);
149 /* Save the object. */
150 hash_insert (rooms, resid, room);
/* Index of the oldest message still held for room R.
 * NOTE(review): the return statement is not visible here.  The comment
 * below says no message is ever discarded, so presumably this is 0 --
 * confirm.
 */
157 chatroom_first_message_index (const chatroom r)
159 /* This naive implementation keeps all messages around forever, hence ... */
/* Return the number of messages currently held for room R, i.e. one past
 * the index of the newest message.
 */
164 chatroom_last_message_index (const chatroom r)
166 /* Note that this returns last message index + 1. */
167 return vector_size (r->messages);
/* Fetch message number INDEX from room R, sleeping until it exists.
 *
 * When INDEX is below the current message count the message is read
 * directly from the room's vector.  Otherwise PRE_SLEEP is invoked with
 * SESSION and DATA and then reset to 0 (so presumably it runs at most
 * once per call -- confirm), and the thread sleeps on the room's
 * message_wq before trying again.
 */
171 chatroom_get_message (const chatroom r, int index,
172 void (*pre_sleep) (ml_session, void *data),
173 ml_session session, void *data)
/* Already posted: no need to sleep. */
181 if (index < vector_size (r->messages))
183 vector_get (r->messages, index, m);
187 /* The caller is asking for a future message, so now we go to sleep until
191 /* Call the pre_sleep function, if any. */
194 pre_sleep (session, data);
195 pre_sleep = 0; /* Don't go to sleep again. */
/* The session lock is released for the duration of the sleep and taken
 * again before the message vector is re-examined.
 */
199 ml_session_release_lock (session);
200 wq_sleep_on (r->message_wq);
201 ml_session_acquire_lock (session);
203 /* Try to get the message again. */
/* Declaration only; no body is visible here.
 * NOTE(review): presumably the multi-room counterpart of
 * chatroom_get_message used by chatbots, reporting the message and its
 * room through MSG_RTN and ROOM_RTN -- confirm against the definition.
 */
207 extern int chatroom_get_message_multiple (const vector rooms, const hash next_msgids, message *msg_rtn, chatroom *room_rtn);
/* Post a message to room R on behalf of the user logged in on SESSION.
 *
 * A database handle is obtained from CONNINFO with DBI_THROW_ERRORS.  The
 * message is inserted into ml_chat_log together with the room's resid,
 * the author's userid and the session's peer name string.  The row's
 * serial number is then taken from ml_chat_log_id_seq and the row is read
 * back to pick up its posted_date, username and original_ip.  The
 * resulting message is allocated from cr_pool, appended to the room's
 * message vector, and both the room's wait queue and the global wait
 * queue are woken.
 *
 * pth_die is called with a "missing message ID" text -- presumably when
 * the read-back fetch finds no row; the guarding condition is not
 * visible here, confirm.
 */
210 chatroom_post (chatroom r, ml_session session, const char *conninfo,
216 const char *username, *original_ip;
217 struct dbi_timestamp posted_date;
220 /* Get the currently logged in user. */
221 userid = ml_session_userid (session);
223 dbh = get_db_handle (conninfo, DBI_THROW_ERRORS);
225 /* Insert the message into the database. */
226 sth = st_prepare_cached
228 "insert into ml_chat_log (resid, body, author, original_ip) "
229 "values (?, ?, ?, ?)",
230 DBI_INT, DBI_STRING, DBI_INT_OR_NULL, DBI_STRING);
231 st_execute (sth, r->resid, body, userid,
232 ml_session_get_peernamestr (session));
234 /* Get the serial number of the message. */
235 msgid = st_serial (sth, "ml_chat_log_id_seq");
237 /* Fetch back the message, which will now have the default fields (in
238 * particular, the posting date) prefilled.
240 sth = st_prepare_cached
242 "select g.posted_date, u.username, g.original_ip "
243 " from ml_chat_log g left outer join ml_users u on g.author = u.userid "
246 st_execute (sth, msgid);
248 st_bind (sth, 0, posted_date, DBI_TIMESTAMP);
249 st_bind (sth, 1, username, DBI_STRING);
250 st_bind (sth, 2, original_ip, DBI_STRING);
253 pth_die (psprintf (ml_session_pool (session),
254 "missing message ID: %d", msgid));
256 /* Save the new message. */
257 m = new_message_posting (cr_pool, posted_date.hour, posted_date.min,
258 userid, userid ? username : 0, original_ip,
261 vector_push_back (r->messages, m);
263 /* Wake up anyone waiting for the message. */
264 wq_wake_up (r->message_wq);
265 wq_wake_up (message_wq);
267 /* Commit changes to the database. */
/* Append a blank message to every room in the rooms hash and wake the
 * threads waiting on each of them.
 *
 * The list of room ids is copied into a subpool of THREAD_POOL, while the
 * blank messages themselves are allocated from cr_pool.
 * NOTE(review): the result of hash_get is not checked below; this is
 * presumably safe because the keys were just read from the same hash --
 * confirm.
 */
273 chatroom_spam_blank (pool thread_pool)
275 pool pool = new_subpool (thread_pool);
276 vector resids = hash_keys_in_pool (rooms, pool);
279 for (i = 0; i < vector_size (resids); ++i)
285 /* Get the room structure. */
286 vector_get (resids, i, resid);
287 hash_get (rooms, resid, r);
289 /* Send a blank message to this room. */
290 m = new_message_blank (cr_pool);
291 vector_push_back (r->messages, m);
293 /* Wake up anyone waiting for the message. */
294 wq_wake_up (r->message_wq);
295 wq_wake_up (message_wq);
/* Forward declaration: chatroom_enter registers leave_room as a pool
 * cleanup before leave_room is defined.
 */
301 static void leave_room (void *vr);
/* Record the current thread (current_pth) as present in room R under
 * USERID and USERNAME.
 *
 * A subpool of the thread's own pool is created with leave_room
 * registered on it as a cleanup that receives R, so deleting that subpool
 * is what takes the thread out of the room again.  USERNAME, when
 * non-null, is copied into that subpool.  The thread is added to the
 * room's threads hash, the per-user count is written back to the users
 * hash, and users_wq is woken.
 */
304 chatroom_enter (chatroom r, int userid, const char *username)
307 struct thread thread;
311 /* Create a subpool of the _thread_ pool, so that if the thread dies
312 * unexpectedly we can catch it.
314 sp = new_subpool (pth_get_pool (current_pth));
315 pool_register_cleanup_fn (sp, leave_room, r);
317 /* Update threads hash. */
318 thread.userid = userid;
319 thread.username = username ? pstrdup (sp, username) : 0;
321 hash_insert (r->threads, current_pth, thread);
323 /* Update users hash. */
324 if (hash_get (r->users, userid, count))
328 hash_insert (r->users, userid, count);
330 /* Someone has entered the room, so wake up sleepers. */
331 wq_wake_up (r->users_wq);
/* An enter message is posted when the count is 1 or the userid is 0. */
333 if (count == 1 || userid == 0)
335 /* Also post a message in the room. */
336 m = new_message_enter (cr_pool, username);
337 vector_push_back (r->messages, m);
339 /* Wake up anyone waiting for the message. */
340 wq_wake_up (r->message_wq);
341 wq_wake_up (message_wq);
/* Take the current thread out of room R.
 *
 * The thread's record is looked up in the room's threads hash and its
 * subpool is deleted.  That runs the leave_room cleanup registered by
 * chatroom_enter, which does the actual bookkeeping.
 *
 * NOTE(review): the hash_get lookup is the argument of assert().  In a
 * build with NDEBUG defined the lookup is compiled out, and thread.sp
 * would then be read uninitialised.
 */
346 chatroom_leave (chatroom r)
348 struct thread thread;
350 /* If this fails, you probably tried to call chatroom_leave twice. */
351 assert (hash_get (r->threads, current_pth, thread));
353 /* This calls leave_room. */
354 delete_pool (thread.sp);
/* Pool cleanup callback that undoes chatroom_enter for the current
 * thread.  VR is the chatroom that was handed to
 * pool_register_cleanup_fn.
 *
 * The thread's record is fetched and removed from the room's threads
 * hash, the user's entry in the users hash is either rewritten or erased,
 * and users_wq is woken.  A leave message is posted when the count is 0
 * or the userid is 0.
 *
 * NOTE(review): every hash_get and hash_erase call here is the argument
 * of assert().  In a build with NDEBUG defined none of them would run, so
 * the thread and user entries would never be removed.
 */
358 leave_room (void *vr)
360 chatroom r = (chatroom) vr;
361 struct thread thread;
365 assert (hash_get (r->threads, current_pth, thread));
366 assert (hash_erase (r->threads, current_pth));
368 /* Update the users hash. */
369 assert (hash_get (r->users, thread.userid, count));
373 hash_insert (r->users, thread.userid, count);
378 assert (hash_erase (r->users, thread.userid));
381 /* Someone has left the room, so wake up sleepers. */
382 wq_wake_up (r->users_wq);
384 if (count == 0 || thread.userid == 0)
386 /* Also post a message in the room. */
387 m = new_message_leave (cr_pool, thread.username);
388 vector_push_back (r->messages, m);
390 /* Wake up anyone waiting for the message. */
391 wq_wake_up (r->message_wq);
392 wq_wake_up (message_wq);
397 chatroom_users (chatroom r)
/* Sleep until the users_wq of room R is woken, which chatroom_enter and
 * leave_room both do.  The lock on SESSION is released while sleeping
 * and taken again before returning.
 */
403 chatroom_wait_enter_leave_event (chatroom r, ml_session session)
405 ml_session_release_lock (session);
406 wq_sleep_on (r->users_wq);
407 ml_session_acquire_lock (session);