/* Monolith chatroom. * - by Richard W.M. Jones * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Library General Public * License as published by the Free Software Foundation; either * version 2 of the License, or (at your option) any later version. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Library General Public License for more details. * * You should have received a copy of the GNU Library General Public * License along with this library; if not, write to the Free * Software Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. * * $Id: chatroom.c,v 1.6 2003/02/22 15:34:27 rich Exp $ */ #include "config.h" #include #include #ifdef HAVE_ASSERT_H #include #endif #ifdef HAVE_STRING_H #include #endif #include #include #include #include #include #include #include "chatroom.h" static void chatroom_init (void) __attribute__((constructor)); static void chatroom_stop (void) __attribute__((destructor)); static pool cr_pool; /* Pool for global allocations. */ static hash rooms; /* Maps resid -> room structure. */ /* Chatbots wait on multiple rooms, and so as well as the per-room * message_wq we also have this global message_wq. Chatbots wait on * this and then check each room to find out which has new messages. */ static wait_queue message_wq; /* Global message wait queue. */ struct chatroom { int resid; /* Resource ID. */ vector messages; /* Messages posted to the room, in order. */ wait_queue message_wq; /* Sleep on this when waiting for messages. */ /* Threads is a hash of pth -> struct thread. It records each thread * currently in the message display loop. */ hash threads; /* Users is a hash of userid -> number of times that user is registered * in a struct thread (above). Element 0 is special because it records * the number of anonymous users. */ hash users; wait_queue users_wq; /* Sleep on this when waiting enter/leave. */ }; struct thread { pool sp; /* Subpool of thread pool. */ int userid; /* User ID associated with this thread. */ const char *username; /* Username (NULL for anonymous). */ }; static void chatroom_init () { cr_pool = new_subpool (global_pool); rooms = new_hash (cr_pool, int, struct chatroom *); message_wq = new_wait_queue (cr_pool); } static void chatroom_stop () { delete_pool (cr_pool); cr_pool = 0; /* Force segfault. */ rooms = 0; message_wq = 0; } chatroom get_chatroom (db_handle dbh, int resid) { st_handle sth; const char *body, *username, *original_ip; struct dbi_timestamp posted_date; int authorid; chatroom room; message m; /* Return the object from the hash, if found. */ if (hash_get (rooms, resid, room)) return room; /* Not found, so we need to create the object. */ room = pmalloc (cr_pool, sizeof *room); room->resid = resid; room->messages = new_vector (cr_pool, message); room->message_wq = new_wait_queue (cr_pool); room->threads = new_hash (cr_pool, pseudothread, struct thread); room->users = new_hash (cr_pool, int, int); room->users_wq = new_wait_queue (cr_pool); /* Pull out the recent messages for this room from the log. */ sth = st_prepare_cached (dbh, "select g.body, g.posted_date, g.author, u.username, g.original_ip " " from ml_chat_log g left outer join ml_users u on g.author = u.userid " " where g.resid = ? and " " g.posted_date >= current_timestamp - interval '1 hour'" " order by g.posted_date", DBI_INT); st_execute (sth, resid); st_bind (sth, 0, body, DBI_STRING); st_bind (sth, 1, posted_date, DBI_TIMESTAMP); st_bind (sth, 2, authorid, DBI_INT); st_bind (sth, 3, username, DBI_STRING); st_bind (sth, 4, original_ip, DBI_STRING); while (st_fetch (sth)) { m = new_message_posting (cr_pool, posted_date.hour, posted_date.min, authorid, authorid ? username : 0, original_ip, body); vector_push_back (room->messages, m); } /* Save the object. */ hash_insert (rooms, resid, room); /* Return it. */ return room; } int chatroom_first_message_index (const chatroom r) { /* This naive implementation keeps all messages around forever, hence ... */ return 0; } int chatroom_last_message_index (const chatroom r) { /* Note that this returns last message index + 1. */ return vector_size (r->messages); } message chatroom_get_message (const chatroom r, int index, void (*pre_sleep) (ml_session, void *data), ml_session session, void *data) { message m; if (index < 0) return 0; again: if (index < vector_size (r->messages)) { vector_get (r->messages, index, m); return m; } /* The caller is asking for a future message, so now we go to sleep until * it appears. */ /* Call the pre_sleep function, if any. */ if (pre_sleep) { pre_sleep (session, data); pre_sleep = 0; /* Don't go to sleep again. */ } /* Go to sleep. */ ml_session_release_lock (session); wq_sleep_on (r->message_wq); ml_session_acquire_lock (session); /* Try to get the message again. */ goto again; } extern int chatroom_get_message_multiple (const vector rooms, const hash next_msgids, message *msg_rtn, chatroom *room_rtn); void chatroom_post (chatroom r, ml_session session, const char *conninfo, const char *body) { db_handle dbh; st_handle sth; int userid, msgid; const char *username, *original_ip; struct dbi_timestamp posted_date; message m; /* Get the currently logged in user. */ userid = ml_session_userid (session); dbh = get_db_handle (conninfo, DBI_THROW_ERRORS); /* Insert the message into the database. */ sth = st_prepare_cached (dbh, "insert into ml_chat_log (resid, body, author, original_ip) " "values (?, ?, ?, ?)", DBI_INT, DBI_STRING, DBI_INT_OR_NULL, DBI_STRING); st_execute (sth, r->resid, body, userid, ml_session_get_peernamestr (session)); /* Get the serial number of the message. */ msgid = st_serial (sth, "ml_chat_log_id_seq"); /* Fetch back the message, which will now have the default fields (in * particular, the posting date) prefilled. */ sth = st_prepare_cached (dbh, "select g.posted_date, u.username, g.original_ip " " from ml_chat_log g left outer join ml_users u on g.author = u.userid " " where g.id = ?", DBI_INT); st_execute (sth, msgid); st_bind (sth, 0, posted_date, DBI_TIMESTAMP); st_bind (sth, 1, username, DBI_STRING); st_bind (sth, 2, original_ip, DBI_STRING); if (!st_fetch (sth)) pth_die (psprintf (ml_session_pool (session), "missing message ID: %d", msgid)); /* Save the new message. */ m = new_message_posting (cr_pool, posted_date.hour, posted_date.min, userid, userid ? username : 0, original_ip, body); vector_push_back (r->messages, m); /* Wake up anyone waiting for the message. */ wq_wake_up (r->message_wq); wq_wake_up (message_wq); /* Commit changes to the database. */ db_commit (dbh); put_db_handle (dbh); } void chatroom_spam_blank (pool thread_pool) { pool pool = new_subpool (thread_pool); vector resids = hash_keys_in_pool (rooms, pool); int i; for (i = 0; i < vector_size (resids); ++i) { message m; chatroom r; int resid; /* Get the room structure. */ vector_get (resids, i, resid); hash_get (rooms, resid, r); /* Send a blank message to this room. */ m = new_message_blank (cr_pool); vector_push_back (r->messages, m); /* Wake up anyone waiting for the message. */ wq_wake_up (r->message_wq); wq_wake_up (message_wq); } delete_pool (pool); } static void leave_room (void *vr); void chatroom_enter (chatroom r, int userid, const char *username) { pool sp; struct thread thread; int count; message m; /* Create a subpool of the _thread_ pool, so that if the thread dies * unexpectedly we can catch it. */ sp = new_subpool (pth_get_pool (current_pth)); pool_register_cleanup_fn (sp, leave_room, r); /* Update threads hash. */ thread.userid = userid; thread.username = username ? pstrdup (sp, username) : 0; thread.sp = sp; hash_insert (r->threads, current_pth, thread); /* Update users hash. */ if (hash_get (r->users, userid, count)) count++; else count = 1; hash_insert (r->users, userid, count); /* Someone has entered the room, so wake up sleepers. */ wq_wake_up (r->users_wq); if (count == 1 || userid == 0) { /* Also post a message in the room. */ m = new_message_enter (cr_pool, username); vector_push_back (r->messages, m); /* Wake up anyone waiting for the message. */ wq_wake_up (r->message_wq); wq_wake_up (message_wq); } } void chatroom_leave (chatroom r) { struct thread thread; /* If this fails, you probably tried to call chatroom_leave twice. */ assert (hash_get (r->threads, current_pth, thread)); /* This calls leave_room. */ delete_pool (thread.sp); } static void leave_room (void *vr) { chatroom r = (chatroom) vr; struct thread thread; int count; message m; assert (hash_get (r->threads, current_pth, thread)); assert (hash_erase (r->threads, current_pth)); /* Update the users hash. */ assert (hash_get (r->users, thread.userid, count)); if (count > 1) { count--; hash_insert (r->users, thread.userid, count); } else { count--; assert (hash_erase (r->users, thread.userid)); } /* Someone has left the room, so wake up sleepers. */ wq_wake_up (r->users_wq); if (count == 0 || thread.userid == 0) { /* Also post a message in the room. */ m = new_message_leave (cr_pool, thread.username); vector_push_back (r->messages, m); /* Wake up anyone waiting for the message. */ wq_wake_up (r->message_wq); wq_wake_up (message_wq); } } const hash chatroom_users (chatroom r) { return r->users; } void chatroom_wait_enter_leave_event (chatroom r, ml_session session) { ml_session_release_lock (session); wq_sleep_on (r->users_wq); ml_session_acquire_lock (session); }