diff --git a/aconfigure b/aconfigure
index d6f0e8809..c068ff944 100755
--- a/aconfigure
+++ b/aconfigure
@@ -822,6 +822,7 @@ ac_subst_files=''
 ac_user_opts='
 enable_option_checking
 enable_floating_point
+enable_kqueue
 enable_epoll
 enable_shared
 enable_pjsua2
@@ -1518,6 +1519,7 @@ Optional Features:
   --enable-FEATURE[=ARG]  include FEATURE [ARG=yes]
   --disable-floating-point Disable floating point where possible
+  --enable-kqueue         Use kqueue ioqueue on macOS/BSD (experimental)
   --enable-epoll          Use /dev/epoll ioqueue on Linux (experimental)
   --enable-shared         Build shared libraries
   --disable-pjsua2        Exclude pjsua2 library and application from the
@@ -6448,26 +6450,49 @@ rm -f core conftest.err conftest.$ac_objext conftest.beam conftest.$ac_ext
 { printf "%s\n" "$as_me:${as_lineno-$LINENO}: checking ioqueue backend" >&5
 printf %s "checking ioqueue backend... " >&6; }
-# Check whether --enable-epoll was given.
-if test ${enable_epoll+y}
-then :
-  enableval=$enable_epoll;
-  ac_os_objs=ioqueue_epoll.o
-  { printf "%s\n" "$as_me:${as_lineno-$LINENO}: result: /dev/epoll" >&5
-printf "%s\n" "/dev/epoll" >&6; }
-  printf "%s\n" "#define PJ_HAS_LINUX_EPOLL 1" >>confdefs.h
-  ac_linux_poll=epoll
+case $target in
+  *darwin* | *bsd*)
+    # Check whether --enable-kqueue was given.
+if test ${enable_kqueue+y}
+then :
+  enableval=$enable_kqueue;
+  ac_os_objs=ioqueue_kqueue.o
+  { printf "%s\n" "$as_me:${as_lineno-$LINENO}: result: kqueue()" >&5
+printf "%s\n" "kqueue()" >&6; }
 else $as_nop
-  ac_os_objs=ioqueue_select.o
-  { printf "%s\n" "$as_me:${as_lineno-$LINENO}: result: select()" >&5
+    ac_os_objs=ioqueue_select.o
+    { printf "%s\n" "$as_me:${as_lineno-$LINENO}: result: select()" >&5
 printf "%s\n" "select()" >&6; }
-  ac_linux_poll=select
 fi
+    ;;
+  *)
+    # Check whether --enable-epoll was given.
+if test ${enable_epoll+y}
+then :
+  enableval=$enable_epoll;
+  ac_os_objs=ioqueue_epoll.o
+  { printf "%s\n" "$as_me:${as_lineno-$LINENO}: result: /dev/epoll" >&5
+printf "%s\n" "/dev/epoll" >&6; }
+  printf "%s\n" "#define PJ_HAS_LINUX_EPOLL 1" >>confdefs.h
+
+  ac_linux_poll=epoll
+
+else $as_nop
+
+  ac_os_objs=ioqueue_select.o
+  { printf "%s\n" "$as_me:${as_lineno-$LINENO}: result: select()" >&5
+printf "%s\n" "select()" >&6; }
+  ac_linux_poll=select
+
+fi
+
+  ;;
+esac
 
 # Check whether --enable-shared was given.
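Reviewer note: to confirm which backend a finished build actually selected, the existing pjlib API `pj_ioqueue_name()` (which this patch implements to return "kqueue") can be printed from a throwaway program. A minimal sketch; the build line is an assumption about a pkg-config aware install:

```c
/* backend_check.c -- print the compiled-in ioqueue backend.
 * Illustrative build line:
 *   cc backend_check.c -o backend_check $(pkg-config --cflags --libs libpjproject)
 */
#include <stdio.h>
#include <pjlib.h>

int main(void)
{
    pj_init(); /* pjlib must be initialized before use */
    printf("ioqueue backend: %s\n", pj_ioqueue_name());
    pj_shutdown();
    return 0;
}
```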
diff --git a/aconfigure.ac b/aconfigure.ac
index 16b311045..ab405a83f 100644
--- a/aconfigure.ac
+++ b/aconfigure.ac
@@ -441,20 +441,38 @@ dnl #
 AC_SUBST(ac_os_objs)
 AC_SUBST(ac_linux_poll)
 AC_MSG_CHECKING([ioqueue backend])
-AC_ARG_ENABLE(epoll,
-    AS_HELP_STRING([--enable-epoll],
-                   [Use /dev/epoll ioqueue on Linux (experimental)]),
-    [
-     ac_os_objs=ioqueue_epoll.o
-     AC_MSG_RESULT([/dev/epoll])
-     AC_DEFINE(PJ_HAS_LINUX_EPOLL,1)
-     ac_linux_poll=epoll
-    ],
-    [
-     ac_os_objs=ioqueue_select.o
-     AC_MSG_RESULT([select()])
-     ac_linux_poll=select
-    ])
+
+case $target in
+  *darwin* | *bsd*)
+    AC_ARG_ENABLE(kqueue,
+        AS_HELP_STRING([--enable-kqueue],
+                       [Use kqueue ioqueue on macOS/BSD (experimental)]),
+        [
+         ac_os_objs=ioqueue_kqueue.o
+         AC_MSG_RESULT([kqueue()])
+        ],
+        [
+         ac_os_objs=ioqueue_select.o
+         AC_MSG_RESULT([select()])
+        ])
+    ;;
+  *)
+    AC_ARG_ENABLE(epoll,
+        AS_HELP_STRING([--enable-epoll],
+                       [Use /dev/epoll ioqueue on Linux (experimental)]),
+        [
+         ac_os_objs=ioqueue_epoll.o
+         AC_MSG_RESULT([/dev/epoll])
+         AC_DEFINE(PJ_HAS_LINUX_EPOLL,1)
+         ac_linux_poll=epoll
+        ],
+        [
+         ac_os_objs=ioqueue_select.o
+         AC_MSG_RESULT([select()])
+         ac_linux_poll=select
+        ])
+    ;;
+esac
 
 AC_SUBST(ac_shared_libraries)
 AC_ARG_ENABLE(shared,
diff --git a/pjlib/src/pj/ioqueue_kqueue.c b/pjlib/src/pj/ioqueue_kqueue.c
new file mode 100644
index 000000000..e579685ba
--- /dev/null
+++ b/pjlib/src/pj/ioqueue_kqueue.c
@@ -0,0 +1,680 @@
+/*
+ * Copyright (C) 2022 Teluu Inc. (http://www.teluu.com)
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+ */
+/*
+ * ioqueue_kqueue.c
+ *
+ * This is the implementation of IOQueue framework using kqueue on macOS/BSD
+ */
+
+#include <pj/ioqueue.h>
+#include <pj/os.h>
+#include <pj/lock.h>
+#include <pj/log.h>
+#include <pj/list.h>
+#include <pj/pool.h>
+#include <pj/string.h>
+#include <pj/assert.h>
+#include <pj/errno.h>
+#include <pj/sock.h>
+
+#include <sys/event.h>
+
+#define os_kqueue_open kqueue
+#define os_kqueue_close close
+#define os_kqueue_ctl kevent
+#define os_kqueue_wait kevent
+
+#define THIS_FILE "ioq_kqueue"
+
+//#define TRACE_(expr) PJ_LOG(3,expr)
+#define TRACE_(expr)
+
+/*
+ * Include common ioqueue abstraction.
+ */
+#include "ioqueue_common_abs.h"
+
+/*
+ * This describes each key.
+ */
+struct pj_ioqueue_key_t
+{
+    DECLARE_COMMON_KEY
+};
+
+struct queue
+{
+    pj_ioqueue_key_t *key;
+    enum ioqueue_event_type event_type;
+};
+
+/*
+ * This describes the I/O queue.
+ */
+struct pj_ioqueue_t
+{
+    DECLARE_COMMON_IOQUEUE
+
+    unsigned max, count;
+    pj_ioqueue_key_t active_list;
+    int kfd; // kqueue fd
+
+#if PJ_IOQUEUE_HAS_SAFE_UNREG
+    pj_mutex_t *ref_cnt_mutex;
+    pj_ioqueue_key_t closing_list;
+    pj_ioqueue_key_t free_list;
+#endif
+};
+
+/* Include implementation for common abstraction after we declare
+ * pj_ioqueue_key_t and pj_ioqueue_t.
+ */
+#include "ioqueue_common_abs.c"
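The `os_kqueue_*` macros above map one-to-one onto the kernel API: `kqueue(2)` creates the polling descriptor, and the single `kevent(2)` entry point both submits a changelist and collects pending events. For readers new to kqueue, here is a standalone sketch of that round trip (plain fd, no pjlib; the helper name is illustrative):

```c
#include <sys/event.h>
#include <sys/types.h>
#include <sys/time.h>
#include <unistd.h>

/* Minimal kqueue round trip: register fd for read, wait once.
 * Returns the number of events (0 on timeout) or -1 on error.
 */
static int wait_readable(int fd, int timeout_ms)
{
    int kfd = kqueue();
    struct kevent change, event;
    struct timespec ts = { timeout_ms / 1000,
                           (timeout_ms % 1000) * 1000000L };
    int n;

    if (kfd < 0)
        return -1;

    /* One kevent() call submits the changelist... */
    EV_SET(&change, fd, EVFILT_READ, EV_ADD, 0, 0, NULL);
    if (kevent(kfd, &change, 1, NULL, 0, NULL) < 0) {
        close(kfd);
        return -1;
    }

    /* ...and another collects events, exactly as os_kqueue_ctl and
     * os_kqueue_wait do in this backend. */
    n = kevent(kfd, NULL, 0, &event, 1, &ts);
    close(kfd);
    return n;
}
```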
+
+#if PJ_IOQUEUE_HAS_SAFE_UNREG
+/* Scan closing keys to be put to free list again */
+static void scan_closing_keys(pj_ioqueue_t *ioqueue);
+#endif
+
+/*
+ * pj_ioqueue_name()
+ */
+PJ_DEF(const char *) pj_ioqueue_name(void)
+{
+    return "kqueue";
+}
+
+/*
+ * pj_ioqueue_create()
+ *
+ * Create kqueue ioqueue.
+ */
+PJ_DEF(pj_status_t)
+pj_ioqueue_create(pj_pool_t *pool, pj_size_t max_fd, pj_ioqueue_t **p_ioqueue)
+{
+    pj_ioqueue_t *ioqueue;
+    pj_status_t rc;
+    pj_lock_t *lock;
+    int i;
+
+    /* Check that arguments are valid. */
+    PJ_ASSERT_RETURN(pool != NULL && p_ioqueue != NULL && max_fd > 0,
+                     PJ_EINVAL);
+
+    /* Check that size of pj_ioqueue_op_key_t is sufficient */
+    PJ_ASSERT_RETURN(sizeof(pj_ioqueue_op_key_t) - sizeof(void *) >=
+                         sizeof(union operation_key),
+                     PJ_EBUG);
+
+    ioqueue = pj_pool_alloc(pool, sizeof(pj_ioqueue_t));
+
+    ioqueue_init(ioqueue);
+
+    ioqueue->max = max_fd;
+    ioqueue->count = 0;
+    pj_list_init(&ioqueue->active_list);
+
+#if PJ_IOQUEUE_HAS_SAFE_UNREG
+    /* When safe unregistration is used (the default), we pre-create
+     * all keys and put them in the free list.
+     */
+
+    /* Mutex to protect key's reference counter.
+     * We don't want to use key's mutex or ioqueue's mutex because
+     * that would create deadlock situation in some cases.
+     */
+    rc = pj_mutex_create_simple(pool, NULL, &ioqueue->ref_cnt_mutex);
+    if (rc != PJ_SUCCESS)
+        return rc;
+
+    /* Init key list */
+    pj_list_init(&ioqueue->free_list);
+    pj_list_init(&ioqueue->closing_list);
+
+    /* Pre-create all keys according to max_fd */
+    for (i = 0; i < max_fd; ++i) {
+        pj_ioqueue_key_t *key;
+
+        key = PJ_POOL_ALLOC_T(pool, pj_ioqueue_key_t);
+        key->ref_count = 0;
+        rc = pj_lock_create_recursive_mutex(pool, NULL, &key->lock);
+        if (rc != PJ_SUCCESS) {
+            key = ioqueue->free_list.next;
+            while (key != &ioqueue->free_list) {
+                pj_lock_destroy(key->lock);
+                key = key->next;
+            }
+            pj_mutex_destroy(ioqueue->ref_cnt_mutex);
+            return rc;
+        }
+
+        pj_list_push_back(&ioqueue->free_list, key);
+    }
+#endif
+
+    rc = pj_lock_create_simple_mutex(pool, "ioq%p", &lock);
+    if (rc != PJ_SUCCESS)
+        return rc;
+
+    rc = pj_ioqueue_set_lock(ioqueue, lock, PJ_TRUE);
+    if (rc != PJ_SUCCESS)
+        return rc;
+
+    /* create kqueue */
+    ioqueue->kfd = os_kqueue_open();
+    if (ioqueue->kfd == -1) {
+        pj_lock_acquire(ioqueue->lock);
+        ioqueue_destroy(ioqueue);
+        return PJ_RETURN_OS_ERROR(pj_get_native_os_error());
+    }
+
+    PJ_LOG(4,
+           ("pjlib", "%s I/O Queue created (%p)", pj_ioqueue_name(), ioqueue));
+
+    *p_ioqueue = ioqueue;
+    return PJ_SUCCESS;
+}
+
+/*
+ * pj_ioqueue_destroy()
+ *
+ * Destroy ioqueue.
+ */
+PJ_DEF(pj_status_t) pj_ioqueue_destroy(pj_ioqueue_t *ioqueue)
+{
+    pj_ioqueue_key_t *key;
+
+    PJ_ASSERT_RETURN(ioqueue, PJ_EINVAL);
+    PJ_ASSERT_RETURN(ioqueue->kfd > 0, PJ_EINVALIDOP);
+
+    pj_lock_acquire(ioqueue->lock);
+    os_kqueue_close(ioqueue->kfd);
+    ioqueue->kfd = -1;
+
+#if PJ_IOQUEUE_HAS_SAFE_UNREG
+    /* Destroy reference counters */
+    key = ioqueue->active_list.next;
+    while (key != &ioqueue->active_list) {
+        pj_lock_destroy(key->lock);
+        key = key->next;
+    }
+
+    key = ioqueue->closing_list.next;
+    while (key != &ioqueue->closing_list) {
+        pj_lock_destroy(key->lock);
+        key = key->next;
+    }
+
+    key = ioqueue->free_list.next;
+    while (key != &ioqueue->free_list) {
+        pj_lock_destroy(key->lock);
+        key = key->next;
+    }
+
+    pj_mutex_destroy(ioqueue->ref_cnt_mutex);
+#endif
+    return ioqueue_destroy(ioqueue);
+}
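For context, a hedged sketch of how a caller would drive `pj_ioqueue_create()`/`pj_ioqueue_destroy()` through the public pjlib API (pool sizes and names are illustrative):

```c
#include <pjlib.h>

/* Sketch: bring up and tear down a kqueue-backed ioqueue.
 * Error handling is trimmed to asserts for brevity. */
static void ioqueue_lifecycle(void)
{
    pj_caching_pool cp;
    pj_pool_t *pool;
    pj_ioqueue_t *ioq;
    pj_status_t rc;

    rc = pj_init();
    pj_assert(rc == PJ_SUCCESS);

    pj_caching_pool_init(&cp, NULL, 0);
    pool = pj_pool_create(&cp.factory, "ioq", 4000, 4000, NULL);

    /* max_fd sizes the pre-created key free list when
     * PJ_IOQUEUE_HAS_SAFE_UNREG is enabled (the default). */
    rc = pj_ioqueue_create(pool, 64, &ioq);
    pj_assert(rc == PJ_SUCCESS);

    /* ... register sockets, poll ... */

    pj_ioqueue_destroy(ioq);
    pj_pool_release(pool);
    pj_caching_pool_destroy(&cp);
    pj_shutdown();
}
```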
+
+/*
+ * pj_ioqueue_register_sock()
+ *
+ * Register a socket to ioqueue.
+ */
+PJ_DEF(pj_status_t)
+pj_ioqueue_register_sock2(pj_pool_t *pool, pj_ioqueue_t *ioqueue,
+                          pj_sock_t sock, pj_grp_lock_t *grp_lock,
+                          void *user_data, const pj_ioqueue_callback *cb,
+                          pj_ioqueue_key_t **p_key)
+{
+    pj_ioqueue_key_t *key = NULL;
+    pj_uint32_t value;
+    int status;
+    pj_status_t rc = PJ_SUCCESS;
+    struct kevent events[2];
+
+    PJ_ASSERT_RETURN(
+        pool && ioqueue && sock != PJ_INVALID_SOCKET && cb && p_key, PJ_EINVAL);
+
+    pj_lock_acquire(ioqueue->lock);
+
+    if (ioqueue->count >= ioqueue->max) {
+        rc = PJ_ETOOMANY;
+        TRACE_((THIS_FILE, "pj_ioqueue_register_sock error: too many files"));
+        goto on_return;
+    }
+
+    /* If safe unregistration (PJ_IOQUEUE_HAS_SAFE_UNREG) is used, get
+     * the key from the free list. Otherwise allocate a new one.
+     */
+#if PJ_IOQUEUE_HAS_SAFE_UNREG
+
+    /* Scan closing_keys first to let them come back to free_list */
+    scan_closing_keys(ioqueue);
+
+    pj_assert(!pj_list_empty(&ioqueue->free_list));
+    if (pj_list_empty(&ioqueue->free_list)) {
+        rc = PJ_ETOOMANY;
+        goto on_return;
+    }
+
+    key = ioqueue->free_list.next;
+    pj_list_erase(key);
+#else
+    /* Create key. */
+    key = (pj_ioqueue_key_t *)pj_pool_zalloc(pool, sizeof(pj_ioqueue_key_t));
+#endif
+
+    rc = ioqueue_init_key(pool, ioqueue, key, sock, grp_lock, user_data, cb);
+    if (rc != PJ_SUCCESS) {
+        key = NULL;
+        goto on_return;
+    }
+
+    /* Initialize kevent structure, ADD read/write, default disable write */
+    EV_SET(&events[0], key->fd, EVFILT_READ, EV_ADD, 0, 0, key);
+    EV_SET(&events[1], key->fd, EVFILT_WRITE, EV_ADD | EV_DISABLE, 0, 0, key);
+
+    /* add event to kqueue */
+    status = os_kqueue_ctl(ioqueue->kfd, events, 2, NULL, 0, NULL);
+    if (status == -1) {
+        rc = pj_get_os_error();
+        pj_lock_destroy(key->lock);
+        key = NULL;
+        TRACE_((THIS_FILE, "pj_ioqueue_register_sock error: kevent rc=%d",
+                status));
+        goto on_return;
+    }
+
+    /* Set socket to nonblocking. */
+    value = 1;
+    if (ioctl(sock, FIONBIO, &value)) {
+        rc = pj_get_netos_error();
+        goto on_return;
+    }
+
+    /* Put in active list. */
+    pj_list_insert_before(&ioqueue->active_list, key);
+    ++ioqueue->count;
+
+on_return:
+    if (rc != PJ_SUCCESS) {
+        if (key && key->grp_lock)
+            pj_grp_lock_dec_ref_dbg(key->grp_lock, "ioqueue", 0);
+    }
+    *p_key = key;
+    pj_lock_release(ioqueue->lock);
+
+    return rc;
+}
+
+PJ_DEF(pj_status_t)
+pj_ioqueue_register_sock(pj_pool_t *pool, pj_ioqueue_t *ioqueue, pj_sock_t sock,
+                         void *user_data, const pj_ioqueue_callback *cb,
+                         pj_ioqueue_key_t **p_key)
+{
+    return pj_ioqueue_register_sock2(pool, ioqueue, sock, NULL, user_data, cb,
+                                     p_key);
+}
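Callers reach the EV_SET/kevent registration above through `pj_ioqueue_register_sock()`. A hedged sketch of a typical call site, a UDP socket with only a read callback (`register_udp` and `on_read_complete` are illustrative names):

```c
#include <pjlib.h>

static void on_read_complete(pj_ioqueue_key_t *key,
                             pj_ioqueue_op_key_t *op_key,
                             pj_ssize_t bytes_read)
{
    /* Consume the data associated with op_key here. */
    PJ_UNUSED_ARG(key);
    PJ_UNUSED_ARG(op_key);
    PJ_UNUSED_ARG(bytes_read);
}

/* Register a UDP socket; on success the fd is switched to non-blocking
 * and EVFILT_READ is armed (EVFILT_WRITE stays EV_DISABLE'd until a
 * send is pending). */
static pj_status_t register_udp(pj_pool_t *pool, pj_ioqueue_t *ioq,
                                pj_ioqueue_key_t **p_key)
{
    pj_sock_t sock;
    pj_ioqueue_callback cb;
    pj_status_t rc;

    rc = pj_sock_socket(pj_AF_INET(), pj_SOCK_DGRAM(), 0, &sock);
    if (rc != PJ_SUCCESS)
        return rc;

    pj_bzero(&cb, sizeof(cb));
    cb.on_read_complete = &on_read_complete;

    return pj_ioqueue_register_sock(pool, ioq, sock, NULL, &cb, p_key);
}
```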
+
+#if PJ_IOQUEUE_HAS_SAFE_UNREG
+/* Increment key's reference counter */
+static void increment_counter(pj_ioqueue_key_t *key)
+{
+    pj_mutex_lock(key->ioqueue->ref_cnt_mutex);
+    ++key->ref_count;
+    pj_mutex_unlock(key->ioqueue->ref_cnt_mutex);
+}
+
+/* Decrement the key's reference counter, and when the counter reaches zero,
+ * destroy the key.
+ *
+ * Note: MUST NOT CALL THIS FUNCTION WHILE HOLDING ioqueue's LOCK.
+ */
+static void decrement_counter(pj_ioqueue_key_t *key)
+{
+    pj_lock_acquire(key->ioqueue->lock);
+    pj_mutex_lock(key->ioqueue->ref_cnt_mutex);
+    --key->ref_count;
+    if (key->ref_count == 0) {
+
+        pj_assert(key->closing == 1);
+        pj_gettickcount(&key->free_time);
+        key->free_time.msec += PJ_IOQUEUE_KEY_FREE_DELAY;
+        pj_time_val_normalize(&key->free_time);
+
+        pj_list_erase(key);
+        pj_list_push_back(&key->ioqueue->closing_list, key);
+    }
+    pj_mutex_unlock(key->ioqueue->ref_cnt_mutex);
+    pj_lock_release(key->ioqueue->lock);
+}
+#endif
+
+/*
+ * pj_ioqueue_unregister()
+ *
+ * Unregister handle from ioqueue.
+ */
+PJ_DEF(pj_status_t) pj_ioqueue_unregister(pj_ioqueue_key_t *key)
+{
+    pj_ioqueue_t *ioqueue;
+    int status;
+    struct kevent events[2];
+
+    PJ_ASSERT_RETURN(key != NULL, PJ_EINVAL);
+
+    ioqueue = key->ioqueue;
+
+    /* Lock the key to make sure no callback is simultaneously modifying
+     * the key. We need to lock the key before ioqueue here to prevent
+     * deadlock.
+     */
+    pj_ioqueue_lock_key(key);
+
+    /* Best effort to avoid double key-unregistration */
+    if (IS_CLOSING(key)) {
+        pj_ioqueue_unlock_key(key);
+        return PJ_SUCCESS;
+    }
+
+    /* Also lock ioqueue */
+    pj_lock_acquire(ioqueue->lock);
+
+    /* Avoid "negative" ioqueue count */
+    if (ioqueue->count > 0) {
+        --ioqueue->count;
+    } else {
+        /* If this happens, very likely there is double unregistration
+         * of a key.
+         */
+        pj_assert(!"Bad ioqueue count in key unregistration!");
+        PJ_LOG(1, (THIS_FILE, "Bad ioqueue count in key unregistration!"));
+    }
+
+#if !PJ_IOQUEUE_HAS_SAFE_UNREG
+    pj_list_erase(key);
+#endif
+
+    /* Initialize kevent structure, DELETE read/write */
+    EV_SET(&events[0], key->fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);
+    EV_SET(&events[1], key->fd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL);
+
+    /* delete event from kqueue */
+    status = os_kqueue_ctl(ioqueue->kfd, events, 2, NULL, 0, NULL);
+    if (status == -1) {
+        pj_status_t rc = pj_get_os_error();
+        pj_lock_release(ioqueue->lock);
+        pj_ioqueue_unlock_key(key);
+        return rc;
+    }
+
+    /* Destroy the key. */
+    pj_sock_close(key->fd);
+
+    pj_lock_release(ioqueue->lock);
+
+#if PJ_IOQUEUE_HAS_SAFE_UNREG
+    /* Mark key as closing. */
+    key->closing = 1;
+
+    /* Decrement counter. */
+    decrement_counter(key);
+
+    /* Done. */
+    if (key->grp_lock) {
+        /* just dec_ref and unlock. we will set grp_lock to NULL
+         * elsewhere */
+        pj_grp_lock_t *grp_lock = key->grp_lock;
+        // Don't set grp_lock to NULL otherwise the other thread
+        // will crash. Just leave it as dangling pointer, but this
+        // should be safe
+        // key->grp_lock = NULL;
+        pj_grp_lock_dec_ref_dbg(grp_lock, "ioqueue", 0);
+        pj_grp_lock_release(grp_lock);
+    } else {
+        pj_ioqueue_unlock_key(key);
+    }
+#else
+    if (key->grp_lock) {
+        /* set grp_lock to NULL and unlock */
+        pj_grp_lock_t *grp_lock = key->grp_lock;
+        // Don't set grp_lock to NULL otherwise the other thread
+        // will crash. Just leave it as dangling pointer, but this
+        // should be safe
+        // key->grp_lock = NULL;
+        pj_grp_lock_dec_ref_dbg(grp_lock, "ioqueue", 0);
+        pj_grp_lock_release(grp_lock);
+    } else {
+        pj_ioqueue_unlock_key(key);
+    }
+
+    pj_lock_destroy(key->lock);
+#endif
+
+    return PJ_SUCCESS;
+}
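Note the deferred-free discipline: `pj_ioqueue_unregister()` never frees a key outright. `decrement_counter()` stamps the key with now plus `PJ_IOQUEUE_KEY_FREE_DELAY` and parks it on `closing_list`, and `scan_closing_keys()` (further below) recycles it once the deadline passes. The timestamp arithmetic in isolation, as a sketch (helper names are illustrative):

```c
#include <pjlib.h>

/* Compute the earliest time at which a closing key may be recycled,
 * the same arithmetic decrement_counter() performs above. */
static pj_time_val key_free_deadline(void)
{
    pj_time_val t;

    pj_gettickcount(&t);                 /* monotonic tick, not wall clock */
    t.msec += PJ_IOQUEUE_KEY_FREE_DELAY; /* grace period before reuse */
    pj_time_val_normalize(&t);           /* carry msec overflow into sec */
    return t;
}

/* scan_closing_keys() then recycles a key once the deadline passes: */
static pj_bool_t key_expired(const pj_time_val *deadline)
{
    pj_time_val now;
    pj_gettickcount(&now);
    return PJ_TIME_VAL_GTE(now, *deadline);
}
```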
+
+/* ioqueue_remove_from_set()
+ * This function is called from ioqueue_dispatch_event() to instruct
+ * the ioqueue to remove the specified descriptor from ioqueue's descriptor
+ * set for the specified event.
+ */
+static void ioqueue_remove_from_set(pj_ioqueue_t *ioqueue,
+                                    pj_ioqueue_key_t *key,
+                                    enum ioqueue_event_type event_type)
+{
+    struct kevent event;
+    switch (event_type) {
+    case READABLE_EVENT:
+        /*
+        if (!key_has_pending_read(key) && !key_has_pending_accept(key)) {
+            EV_SET(&event, key->fd, EVFILT_READ, EV_DISABLE, 0, 0, key);
+            os_kqueue_ctl(ioqueue->kfd, &event, 1, NULL, 0, NULL);
+        }
+        */
+        break;
+    case WRITEABLE_EVENT:
+        if (!key_has_pending_write(key) && !key_has_pending_connect(key)) {
+            EV_SET(&event, key->fd, EVFILT_WRITE, EV_DISABLE, 0, 0, key);
+            os_kqueue_ctl(ioqueue->kfd, &event, 1, NULL, 0, NULL);
+        }
+        break;
+    default:
+        break;
+    }
+}
+
+/*
+ * ioqueue_add_to_set()
+ * This function is called from pj_ioqueue_recv(), pj_ioqueue_send() etc
+ * to instruct the ioqueue to add the specified handle to ioqueue's descriptor
+ * set for the specified event.
+ */
+static void ioqueue_add_to_set(pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *key,
+                               enum ioqueue_event_type event_type)
+{
+    struct kevent event;
+    switch (event_type) {
+    case READABLE_EVENT:
+        /*
+        EV_SET(&event, key->fd, EVFILT_READ, EV_ENABLE, 0, 0, key);
+        os_kqueue_ctl(ioqueue->kfd, &event, 1, NULL, 0, NULL);
+        */
+        break;
+    case WRITEABLE_EVENT:
+        EV_SET(&event, key->fd, EVFILT_WRITE, EV_ENABLE, 0, 0, key);
+        os_kqueue_ctl(ioqueue->kfd, &event, 1, NULL, 0, NULL);
+        break;
+    default:
+        break;
+    }
+}
+
+#if PJ_IOQUEUE_HAS_SAFE_UNREG
+/* Scan closing keys to be put to free list again */
+static void scan_closing_keys(pj_ioqueue_t *ioqueue)
+{
+    pj_time_val now;
+    pj_ioqueue_key_t *h;
+
+    pj_gettickcount(&now);
+    h = ioqueue->closing_list.next;
+    while (h != &ioqueue->closing_list) {
+        pj_ioqueue_key_t *next = h->next;
+
+        pj_assert(h->closing != 0);
+
+        if (PJ_TIME_VAL_GTE(now, h->free_time)) {
+            pj_list_erase(h);
+            // Don't set grp_lock to NULL otherwise the other thread
+            // will crash. Just leave it as dangling pointer, but this
+            // should be safe
+            // h->grp_lock = NULL;
+            pj_list_push_back(&ioqueue->free_list, h);
+        }
+        h = next;
+    }
+}
+#endif
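`pj_ioqueue_poll()` below converts pjlib's `pj_time_val` (seconds plus milliseconds) into the `struct timespec` (seconds plus nanoseconds) that `kevent()` expects, so the msec field scales by 1,000,000. The conversion by itself, with an illustrative helper name:

```c
#include <time.h>
#include <pj/types.h>

/* Convert pjlib's sec+msec timeout into kevent()'s sec+nsec form. */
static struct timespec timeval_to_timespec(const pj_time_val *tv)
{
    struct timespec ts;
    ts.tv_sec = tv->sec;
    ts.tv_nsec = tv->msec * 1000000L; /* 1 msec = 1,000,000 nsec */
    return ts;
}
```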
+
+/*
+ * pj_ioqueue_poll()
+ *
+ */
+PJ_DEF(int) pj_ioqueue_poll(pj_ioqueue_t *ioqueue, const pj_time_val *timeout)
+{
+    int i, count, event_cnt, processed_cnt;
+    struct timespec xtimeout = {1, 0};
+    enum
+    {
+        MAX_EVENTS = PJ_IOQUEUE_MAX_CAND_EVENTS
+    };
+    struct kevent events[MAX_EVENTS];
+    struct queue queue[MAX_EVENTS];
+
+    PJ_CHECK_STACK();
+
+    if (timeout) {
+        xtimeout.tv_sec = timeout->sec;
+        /* pj_time_val carries milliseconds; timespec wants nanoseconds. */
+        xtimeout.tv_nsec = timeout->msec * 1000000;
+    }
+
+    TRACE_((THIS_FILE, "start kqueue wait, msec=%d",
+            xtimeout.tv_sec * 1000 + xtimeout.tv_nsec / 1000000));
+    count =
+        os_kqueue_wait(ioqueue->kfd, NULL, 0, events, MAX_EVENTS, &xtimeout);
+    if (count == 0) {
+#if PJ_IOQUEUE_HAS_SAFE_UNREG
+        /* Check the closing keys only when there's no activity and when there
+         * are pending closing keys.
+         */
+        if (!pj_list_empty(&ioqueue->closing_list)) {
+            pj_lock_acquire(ioqueue->lock);
+            scan_closing_keys(ioqueue);
+            pj_lock_release(ioqueue->lock);
+        }
+#endif
+        TRACE_((THIS_FILE, "kqueue wait timed out"));
+        return count;
+    } else if (count < 0) {
+        TRACE_((THIS_FILE, "kqueue wait error"));
+        return -pj_get_netos_error();
+    }
+
+    /* Lock ioqueue. */
+    pj_lock_acquire(ioqueue->lock);
+
+    for (event_cnt = 0, i = 0; i < count; ++i) {
+        pj_ioqueue_key_t *h = (pj_ioqueue_key_t *)events[i].udata;
+
+        TRACE_((THIS_FILE, "event %d: events=%d", i, events[i].filter));
+
+        /*
+         * Check readability. Note that kevent filters are identifiers
+         * (EVFILT_READ/EVFILT_WRITE are negative constants), not bit
+         * flags, so they are compared with equality rather than tested
+         * with a bitwise AND.
+         */
+        if ((events[i].filter == EVFILT_READ) &&
+            (key_has_pending_read(h) || key_has_pending_accept(h)) &&
+            !IS_CLOSING(h)) {
+
+#if PJ_IOQUEUE_HAS_SAFE_UNREG
+            increment_counter(h);
+#endif
+            queue[event_cnt].key = h;
+            queue[event_cnt].event_type = READABLE_EVENT;
+            ++event_cnt;
+            continue;
+        }
+
+        /*
+         * Check for writeability.
+         */
+        if ((events[i].filter == EVFILT_WRITE) &&
+            (key_has_pending_write(h) || key_has_pending_connect(h)) &&
+            !IS_CLOSING(h)) {
+
+#if PJ_IOQUEUE_HAS_SAFE_UNREG
+            increment_counter(h);
+#endif
+            queue[event_cnt].key = h;
+            queue[event_cnt].event_type = WRITEABLE_EVENT;
+            ++event_cnt;
+            continue;
+        }
+    }
+
+    for (i = 0; i < event_cnt; ++i) {
+        if (queue[i].key->grp_lock)
+            pj_grp_lock_add_ref_dbg(queue[i].key->grp_lock, "ioqueue", 0);
+    }
+
+    pj_lock_release(ioqueue->lock);
+
+    processed_cnt = 0;
+
+    /* Now process the events. */
+    for (i = 0; i < event_cnt; ++i) {
+
+        /* Just do not exceed PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL */
+        if (processed_cnt < PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL) {
+            switch (queue[i].event_type) {
+            case READABLE_EVENT:
+                if (ioqueue_dispatch_read_event(ioqueue, queue[i].key))
+                    ++processed_cnt;
+                break;
+            case WRITEABLE_EVENT:
+                if (ioqueue_dispatch_write_event(ioqueue, queue[i].key))
+                    ++processed_cnt;
+                break;
+            case EXCEPTION_EVENT:
+                if (ioqueue_dispatch_exception_event(ioqueue, queue[i].key))
+                    ++processed_cnt;
+                break;
+            case NO_EVENT:
+                pj_assert(!"Invalid event!");
+                break;
+            }
+        }
+#if PJ_IOQUEUE_HAS_SAFE_UNREG
+        decrement_counter(queue[i].key);
+#endif
+
+        if (queue[i].key->grp_lock)
+            pj_grp_lock_dec_ref_dbg(queue[i].key->grp_lock, "ioqueue", 0);
+    }
+
+    TRACE_((THIS_FILE, "  poll: count=%d events=%d processed=%d", count,
+            event_cnt, processed_cnt));
+
+    return processed_cnt;
+}
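Finally, a hedged sketch of how an application typically drives this backend: a worker thread calling `pj_ioqueue_poll()` with a short timeout in a loop (the quit flag and thread function are illustrative; a negative return value is a negated `pj_status_t`):

```c
#include <pjlib.h>

static volatile pj_bool_t g_quit; /* illustrative shutdown flag */

/* Worker loop: each pj_ioqueue_poll() waits on kevent() with a short
 * timeout, then dispatches completion callbacks outside the ioqueue lock. */
static int worker_thread(void *arg)
{
    pj_ioqueue_t *ioq = (pj_ioqueue_t *)arg;

    while (!g_quit) {
        pj_time_val timeout = { 0, 10 }; /* 10 msec */
        int n = pj_ioqueue_poll(ioq, &timeout);
        if (n < 0) {
            char errmsg[PJ_ERR_MSG_SIZE];
            pj_strerror((pj_status_t)-n, errmsg, sizeof(errmsg));
            PJ_LOG(1, ("worker", "ioqueue poll error: %s", errmsg));
            break;
        }
    }
    return 0;
}
```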