Attempted to fix epoll for Linux

git-svn-id: https://svn.pjsip.org/repos/pjproject/trunk@592 74dad513-b988-da41-8d7b-12977e46ad98
This commit is contained in:
Benny Prijono 2006-07-08 19:46:43 +00:00
parent bdf202a5b3
commit 63ab356dbb
7 changed files with 106 additions and 60 deletions

View File

@ -196,8 +196,8 @@ void ioqueue_dispatch_write_event(pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h)
/* Clear operation. */
h->connecting = 0;
ioqueue_remove_from_set(ioqueue, h->fd, WRITEABLE_EVENT);
ioqueue_remove_from_set(ioqueue, h->fd, EXCEPTION_EVENT);
ioqueue_remove_from_set(ioqueue, h, WRITEABLE_EVENT);
ioqueue_remove_from_set(ioqueue, h, EXCEPTION_EVENT);
#if (defined(PJ_HAS_SO_ERROR) && PJ_HAS_SO_ERROR!=0)
@ -272,7 +272,7 @@ void ioqueue_dispatch_write_event(pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h)
pj_list_erase(write_op);
if (pj_list_empty(&h->write_list))
ioqueue_remove_from_set(ioqueue, h->fd, WRITEABLE_EVENT);
ioqueue_remove_from_set(ioqueue, h, WRITEABLE_EVENT);
}
@ -325,7 +325,7 @@ void ioqueue_dispatch_write_event(pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h)
/* Clear operation if there's no more data to send. */
if (pj_list_empty(&h->write_list))
ioqueue_remove_from_set(ioqueue, h->fd, WRITEABLE_EVENT);
ioqueue_remove_from_set(ioqueue, h, WRITEABLE_EVENT);
}
@ -378,7 +378,7 @@ void ioqueue_dispatch_read_event( pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h )
/* Clear bit in fdset if there is no more pending accept */
if (pj_list_empty(&h->accept_list))
ioqueue_remove_from_set(ioqueue, h->fd, READABLE_EVENT);
ioqueue_remove_from_set(ioqueue, h, READABLE_EVENT);
rc=pj_sock_accept(h->fd, accept_op->accept_fd,
accept_op->rmt_addr, accept_op->addrlen);
@ -411,7 +411,7 @@ void ioqueue_dispatch_read_event( pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h )
/* Clear fdset if there is no pending read. */
if (pj_list_empty(&h->read_list))
ioqueue_remove_from_set(ioqueue, h->fd, READABLE_EVENT);
ioqueue_remove_from_set(ioqueue, h, READABLE_EVENT);
bytes_read = read_op->size;
@ -518,8 +518,8 @@ void ioqueue_dispatch_exception_event( pj_ioqueue_t *ioqueue,
/* Clear operation. */
h->connecting = 0;
ioqueue_remove_from_set(ioqueue, h->fd, WRITEABLE_EVENT);
ioqueue_remove_from_set(ioqueue, h->fd, EXCEPTION_EVENT);
ioqueue_remove_from_set(ioqueue, h, WRITEABLE_EVENT);
ioqueue_remove_from_set(ioqueue, h, EXCEPTION_EVENT);
pj_mutex_unlock(h->mutex);
@ -585,7 +585,7 @@ PJ_DEF(pj_status_t) pj_ioqueue_recv( pj_ioqueue_key_t *key,
pj_mutex_lock(key->mutex);
pj_list_insert_before(&key->read_list, read_op);
ioqueue_add_to_set(key->ioqueue, key->fd, READABLE_EVENT);
ioqueue_add_to_set(key->ioqueue, key, READABLE_EVENT);
pj_mutex_unlock(key->mutex);
return PJ_EPENDING;
@ -653,7 +653,7 @@ PJ_DEF(pj_status_t) pj_ioqueue_recvfrom( pj_ioqueue_key_t *key,
pj_mutex_lock(key->mutex);
pj_list_insert_before(&key->read_list, read_op);
ioqueue_add_to_set(key->ioqueue, key->fd, READABLE_EVENT);
ioqueue_add_to_set(key->ioqueue, key, READABLE_EVENT);
pj_mutex_unlock(key->mutex);
return PJ_EPENDING;
@ -759,7 +759,7 @@ PJ_DEF(pj_status_t) pj_ioqueue_send( pj_ioqueue_key_t *key,
pj_mutex_lock(key->mutex);
pj_list_insert_before(&key->write_list, write_op);
ioqueue_add_to_set(key->ioqueue, key->fd, WRITEABLE_EVENT);
ioqueue_add_to_set(key->ioqueue, key, WRITEABLE_EVENT);
pj_mutex_unlock(key->mutex);
return PJ_EPENDING;
@ -827,6 +827,7 @@ PJ_DEF(pj_status_t) pj_ioqueue_sendto( pj_ioqueue_key_t *key,
}
status = status;
}
PJ_LOG(3,(THIS_FILE, "pending write operation!!"));
}
/*
@ -876,7 +877,7 @@ PJ_DEF(pj_status_t) pj_ioqueue_sendto( pj_ioqueue_key_t *key,
pj_mutex_lock(key->mutex);
pj_list_insert_before(&key->write_list, write_op);
ioqueue_add_to_set(key->ioqueue, key->fd, WRITEABLE_EVENT);
ioqueue_add_to_set(key->ioqueue, key, WRITEABLE_EVENT);
pj_mutex_unlock(key->mutex);
return PJ_EPENDING;
@ -945,7 +946,7 @@ PJ_DEF(pj_status_t) pj_ioqueue_accept( pj_ioqueue_key_t *key,
pj_mutex_lock(key->mutex);
pj_list_insert_before(&key->accept_list, accept_op);
ioqueue_add_to_set(key->ioqueue, key->fd, READABLE_EVENT);
ioqueue_add_to_set(key->ioqueue, key, READABLE_EVENT);
pj_mutex_unlock(key->mutex);
return PJ_EPENDING;
@ -981,8 +982,8 @@ PJ_DEF(pj_status_t) pj_ioqueue_connect( pj_ioqueue_key_t *key,
/* Pending! */
pj_mutex_lock(key->mutex);
key->connecting = PJ_TRUE;
ioqueue_add_to_set(key->ioqueue, key->fd, WRITEABLE_EVENT);
ioqueue_add_to_set(key->ioqueue, key->fd, EXCEPTION_EVENT);
ioqueue_add_to_set(key->ioqueue, key, WRITEABLE_EVENT);
ioqueue_add_to_set(key->ioqueue, key, EXCEPTION_EVENT);
pj_mutex_unlock(key->mutex);
return PJ_EPENDING;
} else {

View File

@ -128,9 +128,9 @@ enum ioqueue_event_type
};
static void ioqueue_add_to_set( pj_ioqueue_t *ioqueue,
pj_sock_t fd,
pj_ioqueue_key_t *key,
enum ioqueue_event_type event_type );
static void ioqueue_remove_from_set( pj_ioqueue_t *ioqueue,
pj_sock_t fd,
pj_ioqueue_key_t *key,
enum ioqueue_event_type event_type);

View File

@ -141,8 +141,8 @@
#define THIS_FILE "ioq_epoll"
#define TRACE_(expr) PJ_LOG(3,expr)
//#define TRACE_(expr)
//#define TRACE_(expr) PJ_LOG(3,expr)
#define TRACE_(expr)
/*
* Include common ioqueue abstraction.
@ -157,6 +157,12 @@ struct pj_ioqueue_key_t
DECLARE_COMMON_KEY
};
struct queue
{
pj_ioqueue_key_t *key;
enum ioqueue_event_type event_type;
};
/*
* This describes the I/O queue.
*/
@ -167,6 +173,8 @@ struct pj_ioqueue_t
unsigned max, count;
pj_ioqueue_key_t hlist;
int epfd;
struct epoll_event *events;
struct queue *queue;
};
/* Include implementation for common abstraction after we declare
@ -229,6 +237,12 @@ PJ_DEF(pj_status_t) pj_ioqueue_create( pj_pool_t *pool,
return PJ_RETURN_OS_ERROR(pj_get_native_os_error());
}
ioqueue->events = pj_pool_calloc(pool, max_fd, sizeof(struct epoll_event));
PJ_ASSERT_RETURN(ioqueue->events != NULL, PJ_ENOMEM);
ioqueue->queue = pj_pool_calloc(pool, max_fd, sizeof(struct queue));
PJ_ASSERT_RETURN(ioqueue->queue != NULL, PJ_ENOMEM);
PJ_LOG(4, ("pjlib", "epoll I/O Queue created (%p)", ioqueue));
*p_ioqueue = ioqueue;
@ -305,7 +319,7 @@ PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool,
}
/* os_epoll_ctl. */
ev.events = EPOLLIN | EPOLLOUT | EPOLLERR;
ev.events = EPOLLIN | EPOLLERR;
ev.epoll_data = (epoll_data_type)key;
status = os_epoll_ctl(ioqueue->epfd, EPOLL_CTL_ADD, sock, &ev);
if (status < 0) {
@ -322,6 +336,8 @@ PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool,
pj_list_insert_before(&ioqueue->hlist, key);
++ioqueue->count;
//TRACE_((THIS_FILE, "socket registered, count=%d", ioqueue->count));
on_return:
*p_key = key;
pj_lock_release(ioqueue->lock);
@ -373,9 +389,16 @@ PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key)
* set for the specified event.
*/
static void ioqueue_remove_from_set( pj_ioqueue_t *ioqueue,
pj_sock_t fd,
pj_ioqueue_key_t *key,
enum ioqueue_event_type event_type)
{
if (event_type == WRITEABLE_EVENT) {
struct epoll_event ev;
ev.events = EPOLLIN | EPOLLERR;
ev.epoll_data = (epoll_data_type)key;
os_epoll_ctl( ioqueue->epfd, EPOLL_CTL_MOD, key->fd, &ev);
}
}
/*
@ -385,9 +408,16 @@ static void ioqueue_remove_from_set( pj_ioqueue_t *ioqueue,
* set for the specified event.
*/
static void ioqueue_add_to_set( pj_ioqueue_t *ioqueue,
pj_sock_t fd,
pj_ioqueue_key_t *key,
enum ioqueue_event_type event_type )
{
if (event_type == WRITEABLE_EVENT) {
struct epoll_event ev;
ev.events = EPOLLIN | EPOLLOUT | EPOLLERR;
ev.epoll_data = (epoll_data_type)key;
os_epoll_ctl( ioqueue->epfd, EPOLL_CTL_MOD, key->fd, &ev);
}
}
/*
@ -397,22 +427,31 @@ static void ioqueue_add_to_set( pj_ioqueue_t *ioqueue,
PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout)
{
int i, count, processed;
struct epoll_event events[PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL];
int msec;
struct queue {
pj_ioqueue_key_t *key;
enum ioqueue_event_type event_type;
} queue[PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL];
struct epoll_event *events = ioqueue->events;
struct queue *queue = ioqueue->queue;
pj_timestamp t1, t2;
PJ_CHECK_STACK();
msec = timeout ? PJ_TIME_VAL_MSEC(*timeout) : 9000;
count = os_epoll_wait( ioqueue->epfd, events, PJ_ARRAY_SIZE(events), msec);
if (count == 0)
TRACE_((THIS_FILE, "start os_epoll_wait, msec=%d", msec));
pj_get_timestamp(&t1);
count = os_epoll_wait( ioqueue->epfd, events, ioqueue->max, msec);
if (count == 0) {
TRACE_((THIS_FILE, "os_epoll_wait timed out"));
return count;
else if (count < 0)
}
else if (count < 0) {
TRACE_((THIS_FILE, "os_epoll_wait error"));
return -pj_get_netos_error();
}
pj_get_timestamp(&t2);
TRACE_((THIS_FILE, "os_epoll_wait returns %d, time=%d usec",
count, pj_elapsed_usec(&t1, &t2)));
/* Lock ioqueue. */
pj_lock_acquire(ioqueue->lock);
@ -421,6 +460,8 @@ PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout)
pj_ioqueue_key_t *h = (pj_ioqueue_key_t*)(epoll_data_type)
events[i].epoll_data;
TRACE_((THIS_FILE, "event %d: events=%d", i, events[i].events));
/*
* Check readability.
*/
@ -486,6 +527,11 @@ PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout)
if (count > 0 && !processed && msec > 0) {
pj_thread_sleep(msec);
}
pj_get_timestamp(&t1);
TRACE_((THIS_FILE, "ioqueue_poll() returns %d, time=%d usec",
processed, pj_elapsed_usec(&t2, &t1)));
return processed;
}

View File

@ -542,17 +542,17 @@ static void validate_sets(const pj_ioqueue_t *ioqueue,
* set for the specified event.
*/
static void ioqueue_remove_from_set( pj_ioqueue_t *ioqueue,
pj_sock_t fd,
pj_ioqueue_key_t *key,
enum ioqueue_event_type event_type)
{
pj_lock_acquire(ioqueue->lock);
if (event_type == READABLE_EVENT)
PJ_FD_CLR((pj_sock_t)fd, &ioqueue->rfdset);
PJ_FD_CLR((pj_sock_t)key->fd, &ioqueue->rfdset);
else if (event_type == WRITEABLE_EVENT)
PJ_FD_CLR((pj_sock_t)fd, &ioqueue->wfdset);
PJ_FD_CLR((pj_sock_t)key->fd, &ioqueue->wfdset);
else if (event_type == EXCEPTION_EVENT)
PJ_FD_CLR((pj_sock_t)fd, &ioqueue->xfdset);
PJ_FD_CLR((pj_sock_t)key->fd, &ioqueue->xfdset);
else
pj_assert(0);
@ -566,17 +566,17 @@ static void ioqueue_remove_from_set( pj_ioqueue_t *ioqueue,
* set for the specified event.
*/
static void ioqueue_add_to_set( pj_ioqueue_t *ioqueue,
pj_sock_t fd,
pj_ioqueue_key_t *key,
enum ioqueue_event_type event_type )
{
pj_lock_acquire(ioqueue->lock);
if (event_type == READABLE_EVENT)
PJ_FD_SET((pj_sock_t)fd, &ioqueue->rfdset);
PJ_FD_SET((pj_sock_t)key->fd, &ioqueue->rfdset);
else if (event_type == WRITEABLE_EVENT)
PJ_FD_SET((pj_sock_t)fd, &ioqueue->wfdset);
PJ_FD_SET((pj_sock_t)key->fd, &ioqueue->wfdset);
else if (event_type == EXCEPTION_EVENT)
PJ_FD_SET((pj_sock_t)fd, &ioqueue->xfdset);
PJ_FD_SET((pj_sock_t)key->fd, &ioqueue->xfdset);
else
pj_assert(0);

View File

@ -38,7 +38,6 @@
#if PJ_HAS_TCP
#define THIS_FILE "test_tcp"
#define PORT 50000
#define NON_EXISTANT_PORT 50123
#define LOOP 100
#define BUF_MIN_SIZE 32
@ -257,14 +256,21 @@ static int compliance_test_0(void)
}
// Bind server socket.
memset(&addr, 0, sizeof(addr));
addr.sin_family = PJ_AF_INET;
addr.sin_port = pj_htons(PORT);
if (pj_sock_bind(ssock, &addr, sizeof(addr))) {
pj_sockaddr_in_init(&addr, 0, 0);
if ((rc=pj_sock_bind(ssock, &addr, sizeof(addr))) != 0 ) {
app_perror("...bind error", rc);
status=-10; goto on_error;
}
// Get server address.
client_addr_len = sizeof(addr);
rc = pj_sock_getsockname(ssock, &addr, &client_addr_len);
if (rc != PJ_SUCCESS) {
app_perror("...ERROR in pj_sock_getsockname()", rc);
status=-15; goto on_error;
}
addr.sin_addr = pj_inet_addr(pj_cstr(&s, "127.0.0.1"));
// Create I/O Queue.
rc = pj_ioqueue_create(pool, PJ_IOQUEUE_MAX_HANDLES, &ioque);
if (rc != PJ_SUCCESS) {
@ -302,12 +308,6 @@ static int compliance_test_0(void)
++pending_op;
}
// Initialize remote address.
memset(&addr, 0, sizeof(addr));
addr.sin_family = PJ_AF_INET;
addr.sin_port = pj_htons(PORT);
addr.sin_addr = pj_inet_addr(pj_cstr(&s, "127.0.0.1"));
// Client socket connect()
status = pj_ioqueue_connect(ckey1, &addr, sizeof(addr));
if (status!=PJ_SUCCESS && status != PJ_EPENDING) {
@ -461,10 +461,7 @@ static int compliance_test_1(void)
}
// Initialize remote address.
memset(&addr, 0, sizeof(addr));
addr.sin_family = PJ_AF_INET;
addr.sin_port = pj_htons(NON_EXISTANT_PORT);
addr.sin_addr = pj_inet_addr(pj_cstr(&s, "127.0.0.1"));
pj_sockaddr_in_init(&addr, pj_cstr(&s, "127.0.0.1"), NON_EXISTANT_PORT);
// Client socket connect()
status = pj_ioqueue_connect(ckey1, &addr, sizeof(addr));

View File

@ -40,7 +40,7 @@
#define THIS_FILE "test_udp"
#define PORT 51233
#define LOOP 100
#define LOOP 2
///#define LOOP 2
#define BUF_MIN_SIZE 32
#define BUF_MAX_SIZE 2048
@ -817,7 +817,7 @@ int udp_ioqueue_test()
int status;
int bufsize, sock_count;
goto pass1;
//goto pass1;
PJ_LOG(3, (THIS_FILE, "...compliance test (%s)", pj_ioqueue_name()));
if ((status=compliance_test()) != 0) {
@ -836,6 +836,8 @@ int udp_ioqueue_test()
return status;
}
//return 0;
PJ_LOG(4, (THIS_FILE, "...benchmarking different buffer size:"));
PJ_LOG(4, (THIS_FILE, "... note: buf=bytes sent, fds=# of fds, "
"elapsed=in timer ticks"));
@ -847,7 +849,7 @@ pass1:
PJ_LOG(3, (THIS_FILE, "... (bytes) (nanosec)"));
PJ_LOG(3, (THIS_FILE, "...====================================="));
goto pass2;
//goto pass2;
for (bufsize=BUF_MIN_SIZE; bufsize <= BUF_MAX_SIZE; bufsize *= 2) {
if ((status=bench_test(bufsize, SOCK_INACTIVE_MIN)) != 0)
@ -859,7 +861,7 @@ pass2:
sock_count<=SOCK_INACTIVE_MAX+2;
sock_count *= 2)
{
PJ_LOG(3,(THIS_FILE, "...testing with %d fds", sock_count));
//PJ_LOG(3,(THIS_FILE, "...testing with %d fds", sock_count));
if ((status=bench_test(bufsize, sock_count-2)) != 0)
return status;
}

View File

@ -24,7 +24,7 @@
#define GROUP_LIBC 0
#define GROUP_OS 0
#define GROUP_DATA_STRUCTURE 0
#define GROUP_NETWORK 0
#define GROUP_NETWORK 1
#define GROUP_FILE 0
#define INCLUDE_ERRNO_TEST GROUP_LIBC
@ -45,7 +45,7 @@
#define INCLUDE_SOCK_TEST GROUP_NETWORK
#define INCLUDE_SOCK_PERF_TEST GROUP_NETWORK
#define INCLUDE_SELECT_TEST GROUP_NETWORK
#define INCLUDE_UDP_IOQUEUE_TEST 1 //GROUP_NETWORK
#define INCLUDE_UDP_IOQUEUE_TEST GROUP_NETWORK
#define INCLUDE_TCP_IOQUEUE_TEST GROUP_NETWORK
#define INCLUDE_IOQUEUE_PERF_TEST GROUP_NETWORK
#define INCLUDE_IOQUEUE_UNREG_TEST GROUP_NETWORK