From 31c9c139fdfbd22d10cd7839b8575853532a4ded Mon Sep 17 00:00:00 2001 From: Nanang Izzuddin Date: Fri, 11 Mar 2016 04:17:32 +0000 Subject: [PATCH] Re #1900: - Works on UWP socket & ioqueue. - Media transport UDP: cancel any pending send on detach, otherwise there is possibility that send buffer is already freed by application (stream) when the send op starts. - Ioqueue common abs: rename 'generic' as it seems to be a keyword in C++/CX, fixed #if/#endif possition in ioqueue_init_key(). - pjsua GUI app: fixed thread registration status check. git-svn-id: https://svn.pjsip.org/repos/pjproject/branches/projects/uwp@5256 74dad513-b988-da41-8d7b-12977e46ad98 --- pjlib/src/pj/ioqueue_common_abs.c | 2 +- pjlib/src/pj/ioqueue_common_abs.h | 2 +- pjlib/src/pj/ioqueue_uwp.cpp | 524 ++++---- pjlib/src/pj/sock_uwp.cpp | 1054 +++++++++-------- pjlib/src/pj/sock_uwp.h | 99 +- pjmedia/src/pjmedia/transport_udp.c | 9 + .../pjsua/winrt/gui/uwp/VoipBackEnd/MyApp.cpp | 5 +- 7 files changed, 839 insertions(+), 856 deletions(-) diff --git a/pjlib/src/pj/ioqueue_common_abs.c b/pjlib/src/pj/ioqueue_common_abs.c index 55e48e35a..edb3c0272 100644 --- a/pjlib/src/pj/ioqueue_common_abs.c +++ b/pjlib/src/pj/ioqueue_common_abs.c @@ -116,9 +116,9 @@ static pj_status_t ioqueue_init_key( pj_pool_t *pool, /* Create mutex for the key. */ #if !PJ_IOQUEUE_HAS_SAFE_UNREG rc = pj_lock_create_simple_mutex(pool, NULL, &key->lock); -#endif if (rc != PJ_SUCCESS) return rc; +#endif /* Group lock */ key->grp_lock = grp_lock; diff --git a/pjlib/src/pj/ioqueue_common_abs.h b/pjlib/src/pj/ioqueue_common_abs.h index 3bdbb5248..d5e36b4d6 100644 --- a/pjlib/src/pj/ioqueue_common_abs.h +++ b/pjlib/src/pj/ioqueue_common_abs.h @@ -80,7 +80,7 @@ struct accept_operation union operation_key { - struct generic_operation generic; + struct generic_operation generic_op; struct read_operation read; struct write_operation write; #if PJ_HAS_TCP diff --git a/pjlib/src/pj/ioqueue_uwp.cpp b/pjlib/src/pj/ioqueue_uwp.cpp index 4b22b004d..8dee694a4 100644 --- a/pjlib/src/pj/ioqueue_uwp.cpp +++ b/pjlib/src/pj/ioqueue_uwp.cpp @@ -16,111 +16,151 @@ * along with this program; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ -/* #include -#include #include -#include -#include -#include -#include - -#include -#include #include #include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -*/ - -#include -#include -#include -#include -#include -#include -#include #include #include -#include -#include -#include #include #include +#define THIS_FILE "ioq_uwp" + #include "sock_uwp.h" +#include "ioqueue_common_abs.h" - -/* - * IO Queue structure. - */ -struct pj_ioqueue_t -{ - int dummy; -}; - - -/* - * IO Queue key structure. + /* + * This describes each key. */ struct pj_ioqueue_key_t { - pj_sock_t sock; - void *user_data; - pj_ioqueue_callback cb; + DECLARE_COMMON_KEY +}; + +/* +* This describes the I/O queue itself. +*/ +struct pj_ioqueue_t +{ + DECLARE_COMMON_IOQUEUE + pj_thread_desc thread_desc[16]; + unsigned thread_cnt; }; +#include "ioqueue_common_abs.c" + +static void ioqueue_remove_from_set( pj_ioqueue_t *ioqueue, + pj_ioqueue_key_t *key, + enum ioqueue_event_type event_type) +{ + PJ_UNUSED_ARG(ioqueue); + PJ_UNUSED_ARG(key); + PJ_UNUSED_ARG(event_type); +} + + +static void start_next_read(pj_ioqueue_key_t *key) +{ + if (key_has_pending_read(key)) { + PjUwpSocket *s = (PjUwpSocket*)key->fd; + struct read_operation *op; + op = (struct read_operation*)key->read_list.next; + + if (op->op == PJ_IOQUEUE_OP_RECV) + s->Recv(NULL, (pj_ssize_t*)&op->size); + else + s->RecvFrom(NULL, (pj_ssize_t*)&op->size, NULL); + } +} + + +static void start_next_write(pj_ioqueue_key_t *key) +{ + if (key_has_pending_write(key)) { + PjUwpSocket *s = (PjUwpSocket*)key->fd; + struct write_operation *op; + op = (struct write_operation*)key->write_list.next; + + if (op->op == PJ_IOQUEUE_OP_SEND) + s->Send(op->buf, (pj_ssize_t*)&op->size); + else + s->SendTo(op->buf, (pj_ssize_t*)&op->size, &op->rmt_addr); + } +} + + +static void ioqueue_add_to_set( pj_ioqueue_t *ioqueue, + pj_ioqueue_key_t *key, + enum ioqueue_event_type event_type ) +{ + PJ_UNUSED_ARG(ioqueue); + + if (event_type == READABLE_EVENT) { + /* This is either recv, recvfrom, or accept, do nothing on accept */ + start_next_read(key); + } else if (event_type == WRITEABLE_EVENT) { + /* This is either send, sendto, or connect, do nothing on connect */ + //start_next_write(key); + } +} + + +static void check_thread(pj_ioqueue_t *ioq) { + if (pj_thread_is_registered()) + return; + + pj_thread_t *t; + char tmp[16]; + pj_ansi_snprintf(tmp, sizeof(tmp), "UwpThread%02d", ioq->thread_cnt); + pj_thread_register(tmp, ioq->thread_desc[ioq->thread_cnt++], &t); + pj_assert(ioq->thread_cnt < PJ_ARRAY_SIZE(ioq->thread_desc)); + ioq->thread_cnt %= PJ_ARRAY_SIZE(ioq->thread_desc); +} + static void on_read(PjUwpSocket *s, int bytes_read) { - pj_ioqueue_key_t *key = (pj_ioqueue_key_t*)s->user_data; - if (key->cb.on_read_complete) { - (*key->cb.on_read_complete)(key, (pj_ioqueue_op_key_t*)s->read_userdata, bytes_read); - } - s->read_userdata = NULL; + pj_ioqueue_key_t *key = (pj_ioqueue_key_t*)s->GetUserData(); + pj_ioqueue_t *ioq = key->ioqueue; + check_thread(ioq); + + ioqueue_dispatch_read_event(key->ioqueue, key); + + if (bytes_read > 0) + start_next_read(key); } static void on_write(PjUwpSocket *s, int bytes_sent) { - pj_ioqueue_key_t *key = (pj_ioqueue_key_t*)s->user_data; - if (key->cb.on_write_complete) { - (*key->cb.on_write_complete)(key, (pj_ioqueue_op_key_t*)s->write_userdata, bytes_sent); - } - s->write_userdata = NULL; + PJ_UNUSED_ARG(bytes_sent); + pj_ioqueue_key_t *key = (pj_ioqueue_key_t*)s->GetUserData(); + pj_ioqueue_t *ioq = key->ioqueue; + check_thread(ioq); + + ioqueue_dispatch_write_event(key->ioqueue, key); + + //start_next_write(key); } -static void on_accept(PjUwpSocket *s, pj_status_t status) +static void on_accept(PjUwpSocket *s) { - pj_ioqueue_key_t *key = (pj_ioqueue_key_t*)s->user_data; - if (key->cb.on_accept_complete) { - if (status == PJ_SUCCESS) { - pj_sock_t new_sock; - pj_sockaddr addr; - int addrlen; - status = pj_sock_accept(key->sock, &new_sock, &addr, &addrlen); - (*key->cb.on_accept_complete)(key, (pj_ioqueue_op_key_t*)s->accept_userdata, new_sock, status); - } else { - (*key->cb.on_accept_complete)(key, (pj_ioqueue_op_key_t*)s->accept_userdata, NULL, status); - } - } - s->accept_userdata = NULL; + pj_ioqueue_key_t *key = (pj_ioqueue_key_t*)s->GetUserData(); + pj_ioqueue_t *ioq = key->ioqueue; + check_thread(ioq); + + ioqueue_dispatch_read_event(key->ioqueue, key); } static void on_connect(PjUwpSocket *s, pj_status_t status) { - pj_ioqueue_key_t *key = (pj_ioqueue_key_t*)s->user_data; - if (key->cb.on_connect_complete) { - (*key->cb.on_connect_complete)(key, status); - } + PJ_UNUSED_ARG(status); + pj_ioqueue_key_t *key = (pj_ioqueue_key_t*)s->GetUserData(); + pj_ioqueue_t *ioq = key->ioqueue; + check_thread(ioq); + + ioqueue_dispatch_write_event(key->ioqueue, key); } @@ -141,10 +181,24 @@ PJ_DEF(pj_status_t) pj_ioqueue_create( pj_pool_t *pool, pj_ioqueue_t **p_ioqueue) { pj_ioqueue_t *ioq; + pj_lock_t *lock; + pj_status_t rc; PJ_UNUSED_ARG(max_fd); ioq = PJ_POOL_ZALLOC_T(pool, pj_ioqueue_t); + + /* Create and init ioqueue mutex */ + rc = pj_lock_create_null_mutex(pool, "ioq%p", &lock); + if (rc != PJ_SUCCESS) + return rc; + + rc = pj_ioqueue_set_lock(ioq, lock, PJ_TRUE); + if (rc != PJ_SUCCESS) + return rc; + + PJ_LOG(4, ("pjlib", "select() I/O Queue created (%p)", ioq)); + *p_ioqueue = ioq; return PJ_SUCCESS; } @@ -155,66 +209,22 @@ PJ_DEF(pj_status_t) pj_ioqueue_create( pj_pool_t *pool, */ PJ_DEF(pj_status_t) pj_ioqueue_destroy( pj_ioqueue_t *ioq ) { - PJ_UNUSED_ARG(ioq); - return PJ_SUCCESS; + return ioqueue_destroy(ioq); } -/* - * Set the lock object to be used by the I/O Queue. - */ -PJ_DEF(pj_status_t) pj_ioqueue_set_lock( pj_ioqueue_t *ioq, - pj_lock_t *lock, - pj_bool_t auto_delete ) -{ - /* Don't really need lock for now */ - PJ_UNUSED_ARG(ioq); - - if (auto_delete) { - pj_lock_destroy(lock); - } - - return PJ_SUCCESS; -} - -PJ_DEF(pj_status_t) pj_ioqueue_set_default_concurrency(pj_ioqueue_t *ioqueue, - pj_bool_t allow) -{ - /* Not supported, just return PJ_SUCCESS silently */ - PJ_UNUSED_ARG(ioqueue); - PJ_UNUSED_ARG(allow); - return PJ_SUCCESS; -} - /* * Register a socket to the I/O queue framework. */ PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool, - pj_ioqueue_t *ioq, + pj_ioqueue_t *ioqueue, pj_sock_t sock, void *user_data, const pj_ioqueue_callback *cb, pj_ioqueue_key_t **p_key ) { - PJ_UNUSED_ARG(ioq); - - pj_ioqueue_key_t *key; - - key = PJ_POOL_ZALLOC_T(pool, pj_ioqueue_key_t); - key->sock = sock; - key->user_data = user_data; - pj_memcpy(&key->cb, cb, sizeof(pj_ioqueue_callback)); - - PjUwpSocket *s = (PjUwpSocket*)sock; - s->is_blocking = PJ_FALSE; - s->user_data = key; - s->on_read = &on_read; - s->on_write = &on_write; - s->on_accept = &on_accept; - s->on_connect = &on_connect; - - *p_key = key; - return PJ_SUCCESS; + return pj_ioqueue_register_sock2(pool, ioqueue, sock, NULL, user_data, + cb, p_key); } PJ_DEF(pj_status_t) pj_ioqueue_register_sock2(pj_pool_t *pool, @@ -225,9 +235,42 @@ PJ_DEF(pj_status_t) pj_ioqueue_register_sock2(pj_pool_t *pool, const pj_ioqueue_callback *cb, pj_ioqueue_key_t **p_key) { - PJ_UNUSED_ARG(grp_lock); + PjUwpSocketCallback uwp_cb = + { &on_read, &on_write, &on_accept, &on_connect }; + pj_ioqueue_key_t *key; + pj_status_t rc; + + pj_lock_acquire(ioqueue->lock); + + key = PJ_POOL_ZALLOC_T(pool, pj_ioqueue_key_t); + rc = ioqueue_init_key(pool, ioqueue, key, sock, grp_lock, user_data, cb); + if (rc != PJ_SUCCESS) { + key = NULL; + goto on_return; + } + + /* Create ioqueue key lock, if not yet */ + if (!key->lock) { + rc = pj_lock_create_simple_mutex(pool, NULL, &key->lock); + if (rc != PJ_SUCCESS) { + key = NULL; + goto on_return; + } + } + + PjUwpSocket *s = (PjUwpSocket*)sock; + s->SetNonBlocking(&uwp_cb, key); + +on_return: + if (rc != PJ_SUCCESS) { + if (key && key->grp_lock) + pj_grp_lock_dec_ref_dbg(key->grp_lock, "ioqueue", 0); + } + *p_key = key; + pj_lock_release(ioqueue->lock); + + return rc; - return pj_ioqueue_register_sock(pool, ioqueue, sock, user_data, cb, p_key); } /* @@ -235,214 +278,59 @@ PJ_DEF(pj_status_t) pj_ioqueue_register_sock2(pj_pool_t *pool, */ PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key ) { - if (key == NULL || key->sock == NULL) - return PJ_SUCCESS; + pj_ioqueue_t *ioqueue; - if (key->sock) - pj_sock_close(key->sock); - key->sock = NULL; + PJ_ASSERT_RETURN(key, PJ_EINVAL); + + ioqueue = key->ioqueue; + + /* Lock the key to make sure no callback is simultaneously modifying + * the key. We need to lock the key before ioqueue here to prevent + * deadlock. + */ + pj_ioqueue_lock_key(key); + + /* Also lock ioqueue */ + pj_lock_acquire(ioqueue->lock); + + /* Close socket. */ + pj_sock_close(key->fd); + + /* Clear callback */ + key->cb.on_accept_complete = NULL; + key->cb.on_connect_complete = NULL; + key->cb.on_read_complete = NULL; + key->cb.on_write_complete = NULL; + + pj_lock_release(ioqueue->lock); + + if (key->grp_lock) { + pj_grp_lock_t *grp_lock = key->grp_lock; + pj_grp_lock_dec_ref_dbg(grp_lock, "ioqueue", 0); + pj_grp_lock_release(grp_lock); + } else { + pj_ioqueue_unlock_key(key); + } + + pj_lock_destroy(key->lock); return PJ_SUCCESS; } -/* - * Get user data associated with an ioqueue key. - */ -PJ_DEF(void*) pj_ioqueue_get_user_data( pj_ioqueue_key_t *key ) -{ - return key->user_data; -} - - -/* - * Set or change the user data to be associated with the file descriptor or - * handle or socket descriptor. - */ -PJ_DEF(pj_status_t) pj_ioqueue_set_user_data( pj_ioqueue_key_t *key, - void *user_data, - void **old_data) -{ - if (old_data) - *old_data = key->user_data; - key->user_data= user_data; - - return PJ_SUCCESS; -} - - -/* - * Initialize operation key. - */ -PJ_DEF(void) pj_ioqueue_op_key_init( pj_ioqueue_op_key_t *op_key, - pj_size_t size ) -{ - pj_bzero(op_key, size); -} - - -/* - * Check if operation is pending on the specified operation key. - */ -PJ_DEF(pj_bool_t) pj_ioqueue_is_pending( pj_ioqueue_key_t *key, - pj_ioqueue_op_key_t *op_key ) -{ - PJ_UNUSED_ARG(key); - PJ_UNUSED_ARG(op_key); - return PJ_FALSE; -} - - -/* - * Post completion status to the specified operation key and call the - * appropriate callback. - */ -PJ_DEF(pj_status_t) pj_ioqueue_post_completion( pj_ioqueue_key_t *key, - pj_ioqueue_op_key_t *op_key, - pj_ssize_t bytes_status ) -{ - PJ_UNUSED_ARG(key); - PJ_UNUSED_ARG(op_key); - PJ_UNUSED_ARG(bytes_status); - return PJ_ENOTSUP; -} - - -#if defined(PJ_HAS_TCP) && PJ_HAS_TCP != 0 -/** - * Instruct I/O Queue to accept incoming connection on the specified - * listening socket. - */ -PJ_DEF(pj_status_t) pj_ioqueue_accept( pj_ioqueue_key_t *key, - pj_ioqueue_op_key_t *op_key, - pj_sock_t *new_sock, - pj_sockaddr_t *local, - pj_sockaddr_t *remote, - int *addrlen ) -{ - PJ_UNUSED_ARG(new_sock); - PJ_UNUSED_ARG(local); - PJ_UNUSED_ARG(remote); - PJ_UNUSED_ARG(addrlen); - - PjUwpSocket *s = (PjUwpSocket*)key->sock; - s->accept_userdata = op_key; - return pj_sock_listen(key->sock, 0); -} - - -/* - * Initiate non-blocking socket connect. - */ -PJ_DEF(pj_status_t) pj_ioqueue_connect( pj_ioqueue_key_t *key, - const pj_sockaddr_t *addr, - int addrlen ) -{ - return pj_sock_connect(key->sock, addr, addrlen); -} - - -#endif /* PJ_HAS_TCP */ - /* * Poll the I/O Queue for completed events. */ PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioq, const pj_time_val *timeout) { - /* Polling is not necessary on uwp, since all async activities - * are registered to active scheduler. + /* Polling is not necessary on UWP, since each socket handles + * its events already. */ PJ_UNUSED_ARG(ioq); - PJ_UNUSED_ARG(timeout); + + pj_thread_sleep(PJ_TIME_VAL_MSEC(*timeout)); + return 0; } - -/* - * Instruct the I/O Queue to read from the specified handle. - */ -PJ_DEF(pj_status_t) pj_ioqueue_recv( pj_ioqueue_key_t *key, - pj_ioqueue_op_key_t *op_key, - void *buffer, - pj_ssize_t *length, - pj_uint32_t flags ) -{ - PjUwpSocket *s = (PjUwpSocket*)key->sock; - s->read_userdata = op_key; - return pj_sock_recv(key->sock, buffer, length, flags); -} - - -/* - * This function behaves similarly as #pj_ioqueue_recv(), except that it is - * normally called for socket, and the remote address will also be returned - * along with the data. - */ -PJ_DEF(pj_status_t) pj_ioqueue_recvfrom( pj_ioqueue_key_t *key, - pj_ioqueue_op_key_t *op_key, - void *buffer, - pj_ssize_t *length, - pj_uint32_t flags, - pj_sockaddr_t *addr, - int *addrlen) -{ - PjUwpSocket *s = (PjUwpSocket*)key->sock; - s->read_userdata = op_key; - return pj_sock_recvfrom(key->sock, buffer, length, flags, addr, addrlen); -} - - -/* - * Instruct the I/O Queue to write to the handle. - */ -PJ_DEF(pj_status_t) pj_ioqueue_send( pj_ioqueue_key_t *key, - pj_ioqueue_op_key_t *op_key, - const void *data, - pj_ssize_t *length, - pj_uint32_t flags ) -{ - PjUwpSocket *s = (PjUwpSocket*)key->sock; - s->write_userdata = op_key; - return pj_sock_send(key->sock, data, length, flags); -} - - -/* - * Instruct the I/O Queue to write to the handle. - */ -PJ_DEF(pj_status_t) pj_ioqueue_sendto( pj_ioqueue_key_t *key, - pj_ioqueue_op_key_t *op_key, - const void *data, - pj_ssize_t *length, - pj_uint32_t flags, - const pj_sockaddr_t *addr, - int addrlen) -{ - PjUwpSocket *s = (PjUwpSocket*)key->sock; - s->write_userdata = op_key; - return pj_sock_sendto(key->sock, data, length, flags, addr, addrlen); -} - -PJ_DEF(pj_status_t) pj_ioqueue_set_concurrency(pj_ioqueue_key_t *key, - pj_bool_t allow) -{ - /* Not supported, just return PJ_SUCCESS silently */ - PJ_UNUSED_ARG(key); - PJ_UNUSED_ARG(allow); - return PJ_SUCCESS; -} - -PJ_DEF(pj_status_t) pj_ioqueue_lock_key(pj_ioqueue_key_t *key) -{ - /* Not supported, just return PJ_SUCCESS silently */ - PJ_UNUSED_ARG(key); - return PJ_SUCCESS; -} - -PJ_DEF(pj_status_t) pj_ioqueue_unlock_key(pj_ioqueue_key_t *key) -{ - /* Not supported, just return PJ_SUCCESS silently */ - PJ_UNUSED_ARG(key); - return PJ_SUCCESS; -} diff --git a/pjlib/src/pj/sock_uwp.cpp b/pjlib/src/pj/sock_uwp.cpp index e2b8e12e6..b7bbd077e 100644 --- a/pjlib/src/pj/sock_uwp.cpp +++ b/pjlib/src/pj/sock_uwp.cpp @@ -16,23 +16,19 @@ * along with this program; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ -#include -#include #include #include #include #include -#include -#include #include #include #include -#include "sock_uwp.h" - #define THIS_FILE "sock_uwp.cpp" +#include "sock_uwp.h" + /* * Address families conversion. * The values here are indexed based on pj_addr_family. @@ -200,24 +196,18 @@ internal: DatagramSocketMessageReceivedEventArgs ^args) { try { + if (uwp_sock->sock_state >= SOCKSTATE_DISCONNECTED) + return; + recv_args = args; avail_data_len = args->GetDataReader()->UnconsumedBufferLength; - // Notify application asynchronously - concurrency::create_task([this]() - { - if (uwp_sock->on_read) { - if (!pj_thread_is_registered()) - pj_thread_register("MsgReceive", thread_desc, - &rec_thread); - - (tp)(*uwp_sock->read_userdata) - (*uwp_sock->on_read)(uwp_sock, avail_data_len); - } - }); + if (uwp_sock->cb.on_read) { + (*uwp_sock->cb.on_read)(uwp_sock, avail_data_len); + } WaitForSingleObjectEx(recv_wait, INFINITE, false); - } catch (Exception^ e) {} + } catch (...) {} } pj_status_t ReadDataIfAvailable(void *buf, pj_ssize_t *len, @@ -264,8 +254,6 @@ private: EventRegistrationToken event_token; HANDLE recv_wait; int avail_data_len; - pj_thread_desc thread_desc; - pj_thread_t *rec_thread; }; @@ -287,23 +275,15 @@ internal: try { conn_args = args; - // Notify application asynchronously - concurrency::create_task([this]() - { - if (uwp_sock->on_accept) { - if (!pj_thread_is_registered()) - pj_thread_register("ConnReceive", thread_desc, - &listener_thread); - - (*uwp_sock->on_accept)(uwp_sock, PJ_SUCCESS); - } - }); + if (uwp_sock->cb.on_accept) { + (*uwp_sock->cb.on_accept)(uwp_sock); + } WaitForSingleObjectEx(conn_wait, INFINITE, false); } catch (Exception^ e) {} } - pj_status_t GetAcceptedSocket(StreamSocket^ stream_sock) + pj_status_t GetAcceptedSocket(StreamSocket^& stream_sock) { if (conn_args == nullptr) return PJ_ENOTFOUND; @@ -332,23 +312,26 @@ private: StreamSocketListenerConnectionReceivedEventArgs^ conn_args; EventRegistrationToken event_token; HANDLE conn_wait; - - pj_thread_desc thread_desc; - pj_thread_t *listener_thread; }; PjUwpSocket::PjUwpSocket(int af_, int type_, int proto_) : af(af_), type(type_), proto(proto_), - sock_type(SOCKTYPE_UNKNOWN), sock_state(SOCKSTATE_NULL), - is_blocking(PJ_TRUE), is_busy_sending(PJ_FALSE) + sock_type(SOCKTYPE_UNKNOWN), + sock_state(SOCKSTATE_NULL), + is_blocking(PJ_TRUE), + has_pending_bind(PJ_FALSE), + has_pending_send(PJ_FALSE), + has_pending_recv(PJ_FALSE) { pj_sockaddr_init(pj_AF_INET(), &local_addr, NULL, 0); pj_sockaddr_init(pj_AF_INET(), &remote_addr, NULL, 0); } PjUwpSocket::~PjUwpSocket() -{} +{ + DeinitSocket(); +} PjUwpSocket* PjUwpSocket::CreateAcceptSocket(Windows::Networking::Sockets::StreamSocket^ stream_sock_) { @@ -358,6 +341,7 @@ PjUwpSocket* PjUwpSocket::CreateAcceptSocket(Windows::Networking::Sockets::Strea new_sock->sock_state = SOCKSTATE_CONNECTED; new_sock->socket_reader = ref new DataReader(new_sock->stream_sock->InputStream); new_sock->socket_writer = ref new DataWriter(new_sock->stream_sock->OutputStream); + new_sock->socket_reader->InputStreamOptions = InputStreamOptions::Partial; new_sock->send_buffer = ref new Buffer(SEND_BUFFER_SIZE); new_sock->is_blocking = is_blocking; @@ -399,6 +383,492 @@ pj_status_t PjUwpSocket::InitSocket(enum PjUwpSocketType sock_type_) } +void PjUwpSocket::DeinitSocket() +{ + if (stream_sock) { + concurrency::create_task(stream_sock->CancelIOAsync()).wait(); + } + if (datagram_sock) { + concurrency::create_task(datagram_sock->CancelIOAsync()).wait(); + } + if (listener_sock) { + concurrency::create_task(listener_sock->CancelIOAsync()).wait(); + } + stream_sock = nullptr; + datagram_sock = nullptr; + dgram_recv_helper = nullptr; + listener_sock = nullptr; + listener_helper = nullptr; + socket_writer = nullptr; + socket_reader = nullptr; + sock_state = SOCKSTATE_NULL; +} + +pj_status_t PjUwpSocket::Bind(const pj_sockaddr_t *addr) +{ + /* Not initialized yet, socket type is perhaps TCP, just not decided yet + * whether it is a stream or a listener. + */ + if (sock_state < SOCKSTATE_INITIALIZED) { + pj_sockaddr_cp(&local_addr, addr); + has_pending_bind = PJ_TRUE; + return PJ_SUCCESS; + } + + PJ_ASSERT_RETURN(sock_state == SOCKSTATE_INITIALIZED, PJ_EINVALIDOP); + if (sock_type != SOCKTYPE_DATAGRAM && sock_type != SOCKTYPE_LISTENER) + return PJ_EINVALIDOP; + + if (has_pending_bind) { + has_pending_bind = PJ_FALSE; + if (!addr) + addr = &local_addr; + } + + /* If no bound address is set, just return */ + if (!pj_sockaddr_has_addr(addr) && !pj_sockaddr_get_port(addr)) + return PJ_SUCCESS; + + if (addr != &local_addr) + pj_sockaddr_cp(&local_addr, addr); + + HRESULT err = 0; + try { + concurrency::create_task([this, addr]() { + HostName ^host; + int port; + sockaddr_to_hostname_port(addr, host, &port); + if (pj_sockaddr_has_addr(addr)) { + if (sock_type == SOCKTYPE_DATAGRAM) + return datagram_sock->BindEndpointAsync(host, port.ToString()); + else + return listener_sock->BindEndpointAsync(host, port.ToString()); + } else /* if (pj_sockaddr_get_port(addr) != 0) */ { + if (sock_type == SOCKTYPE_DATAGRAM) + return datagram_sock->BindServiceNameAsync(port.ToString()); + else + return listener_sock->BindServiceNameAsync(port.ToString()); + } + }).then([this, &err](concurrency::task t) + { + try { + t.get(); + } catch (Exception^ e) { + err = e->HResult; + } + }).get(); + } catch (Exception^ e) { + err = e->HResult; + } + + return (err? PJ_RETURN_OS_ERROR(err) : PJ_SUCCESS); +} + + +pj_status_t PjUwpSocket::SendImp(const void *buf, pj_ssize_t *len) +{ + if (has_pending_send) + return PJ_RETURN_OS_ERROR(PJ_BLOCKING_ERROR_VAL); + + if (*len > (pj_ssize_t)send_buffer->Capacity) + return PJ_ETOOBIG; + + CopyToIBuffer((unsigned char*)buf, *len, send_buffer); + send_buffer->Length = *len; + socket_writer->WriteBuffer(send_buffer); + + /* Blocking version */ + if (is_blocking) { + pj_status_t status = PJ_SUCCESS; + concurrency::cancellation_token_source cts; + auto cts_token = cts.get_token(); + auto t = concurrency::create_task(socket_writer->StoreAsync(), + cts_token); + *len = cancel_after_timeout(t, cts, (unsigned int)WRITE_TIMEOUT). + then([cts_token, &status](concurrency::task t_) + { + int sent = 0; + try { + if (cts_token.is_canceled()) + status = PJ_ETIMEDOUT; + else + sent = t_.get(); + } catch (Exception^ e) { + status = PJ_RETURN_OS_ERROR(e->HResult); + } + return sent; + }).get(); + + return status; + } + + /* Non-blocking version */ + has_pending_send = PJ_TRUE; + concurrency::create_task(socket_writer->StoreAsync()). + then([this](concurrency::task t_) + { + try { + unsigned int l = t_.get(); + has_pending_send = PJ_FALSE; + + // invoke callback + if (cb.on_write) { + (*cb.on_write)(this, l); + } + } catch (...) { + has_pending_send = PJ_FALSE; + sock_state = SOCKSTATE_ERROR; + DeinitSocket(); + + // invoke callback + if (cb.on_write) { + (*cb.on_write)(this, -PJ_EUNKNOWN); + } + } + }); + + return PJ_SUCCESS; +} + + +pj_status_t PjUwpSocket::Send(const void *buf, pj_ssize_t *len) +{ + if ((sock_type!=SOCKTYPE_STREAM && sock_type!=SOCKTYPE_DATAGRAM) || + (sock_state!=SOCKSTATE_CONNECTED)) + { + return PJ_EINVALIDOP; + } + + /* Sending for SOCKTYPE_DATAGRAM is implemented in pj_sock_sendto() */ + if (sock_type == SOCKTYPE_DATAGRAM) { + return SendTo(buf, len, &remote_addr); + } + + return SendImp(buf, len); +} + + +pj_status_t PjUwpSocket::SendTo(const void *buf, pj_ssize_t *len, + const pj_sockaddr_t *to) +{ + if (sock_type != SOCKTYPE_DATAGRAM || sock_state < SOCKSTATE_INITIALIZED + || sock_state >= SOCKSTATE_DISCONNECTED) + { + return PJ_EINVALIDOP; + } + + if (has_pending_send) + return PJ_RETURN_OS_ERROR(PJ_BLOCKING_ERROR_VAL); + + if (*len > (pj_ssize_t)send_buffer->Capacity) + return PJ_ETOOBIG; + + HostName ^hostname; + int port; + sockaddr_to_hostname_port(to, hostname, &port); + + concurrency::cancellation_token_source cts; + auto cts_token = cts.get_token(); + auto t = concurrency::create_task(datagram_sock->GetOutputStreamAsync( + hostname, port.ToString()), cts_token); + pj_status_t status = PJ_SUCCESS; + + cancel_after_timeout(t, cts, (unsigned int)WRITE_TIMEOUT). + then([this, cts_token, &status](concurrency::task t_) + { + try { + if (cts_token.is_canceled()) { + status = PJ_ETIMEDOUT; + } else { + IOutputStream^ outstream = t_.get(); + socket_writer = ref new DataWriter(outstream); + } + } catch (Exception^ e) { + status = PJ_RETURN_OS_ERROR(e->HResult); + } + }).get(); + + if (status != PJ_SUCCESS) + return status; + + status = SendImp(buf, len); + if ((status == PJ_SUCCESS || status == PJ_EPENDING) && + sock_state < SOCKSTATE_CONNECTED) + { + sock_state = SOCKSTATE_CONNECTED; + } + + return status; +} + + +int PjUwpSocket::ConsumeReadBuffer(void *buf, int max_len) +{ + if (socket_reader->UnconsumedBufferLength == 0) + return 0; + + int read_len = PJ_MIN((int)socket_reader->UnconsumedBufferLength,max_len); + IBuffer^ buffer = socket_reader->ReadBuffer(read_len); + read_len = buffer->Length; + CopyFromIBuffer((unsigned char*)buf, read_len, buffer); + return read_len; +} + + +pj_status_t PjUwpSocket::Recv(void *buf, pj_ssize_t *len) +{ + /* Only for TCP, at least for now! */ + if (sock_type == SOCKTYPE_DATAGRAM) + return PJ_ENOTSUP; + + if (sock_type != SOCKTYPE_STREAM || sock_state != SOCKSTATE_CONNECTED) + return PJ_EINVALIDOP; + + if (has_pending_recv) + return PJ_RETURN_OS_ERROR(PJ_BLOCKING_ERROR_VAL); + + /* First check if there is already some data in the read buffer */ + if (buf) { + int avail_len = ConsumeReadBuffer(buf, *len); + if (avail_len > 0) { + *len = avail_len; + return PJ_SUCCESS; + } + } + + /* Blocking version */ + if (is_blocking) { + pj_status_t status = PJ_SUCCESS; + concurrency::cancellation_token_source cts; + auto cts_token = cts.get_token(); + auto t = concurrency::create_task(socket_reader->LoadAsync(*len), + cts_token); + *len = cancel_after_timeout(t, cts, READ_TIMEOUT) + .then([this, len, buf, cts_token, &status] + (concurrency::task t_) + { + try { + if (cts_token.is_canceled()) { + status = PJ_ETIMEDOUT; + return 0; + } + t_.get(); + } catch (Exception^) { + status = PJ_ETIMEDOUT; + return 0; + } + + *len = ConsumeReadBuffer(buf, *len); + return (int)*len; + }).get(); + + return status; + } + + /* Non-blocking version */ + + has_pending_recv = PJ_TRUE; + concurrency::create_task(socket_reader->LoadAsync(*len)) + .then([this](concurrency::task t_) + { + try { + // catch any exception + t_.get(); + has_pending_recv = PJ_FALSE; + + // invoke callback + int read_len = socket_reader->UnconsumedBufferLength; + if (read_len > 0 && cb.on_read) { + (*cb.on_read)(this, read_len); + } + } catch (Exception^ e) { + has_pending_recv = PJ_FALSE; + + // invoke callback + if (cb.on_read) { + (*cb.on_read)(this, -PJ_EUNKNOWN); + } + } + }); + + return PJ_RETURN_OS_ERROR(PJ_BLOCKING_ERROR_VAL); +} + + +pj_status_t PjUwpSocket::RecvFrom(void *buf, pj_ssize_t *len, + pj_sockaddr_t *from) +{ + if (sock_type != SOCKTYPE_DATAGRAM || sock_state < SOCKSTATE_INITIALIZED + || sock_state >= SOCKSTATE_DISCONNECTED) + { + return PJ_EINVALIDOP; + } + + /* Start receive, if not yet */ + if (dgram_recv_helper == nullptr) { + dgram_recv_helper = ref new PjUwpSocketDatagramRecvHelper(this); + } + + /* Try to read any available data first */ + if (buf || is_blocking) { + pj_status_t status; + status = dgram_recv_helper->ReadDataIfAvailable(buf, len, from); + if (status != PJ_ENOTFOUND) + return status; + } + + /* Blocking version */ + if (is_blocking) { + int max_loop = 0; + pj_status_t status = PJ_ENOTFOUND; + while (status == PJ_ENOTFOUND && sock_state <= SOCKSTATE_CONNECTED) + { + status = dgram_recv_helper->ReadDataIfAvailable(buf, len, from); + if (status != PJ_SUCCESS) + pj_thread_sleep(100); + + if (++max_loop > 10) + return PJ_ETIMEDOUT; + } + return status; + } + + /* For non-blocking version, just return PJ_EPENDING */ + return PJ_RETURN_OS_ERROR(PJ_BLOCKING_ERROR_VAL); +} + + +pj_status_t PjUwpSocket::Connect(const pj_sockaddr_t *addr) +{ + pj_status_t status; + + PJ_ASSERT_RETURN((sock_type == SOCKTYPE_UNKNOWN && sock_state == SOCKSTATE_NULL) || + (sock_type == SOCKTYPE_DATAGRAM && sock_state == SOCKSTATE_INITIALIZED), + PJ_EINVALIDOP); + + if (sock_type == SOCKTYPE_UNKNOWN) { + InitSocket(SOCKTYPE_STREAM); + // No need to check pending bind, no bind for TCP client socket + } + + pj_sockaddr_cp(&remote_addr, addr); + + auto t = concurrency::create_task([this, addr]() + { + HostName ^hostname; + int port; + sockaddr_to_hostname_port(&remote_addr, hostname, &port); + if (sock_type == SOCKTYPE_STREAM) + return stream_sock->ConnectAsync(hostname, port.ToString(), + SocketProtectionLevel::PlainSocket); + else + return datagram_sock->ConnectAsync(hostname, port.ToString()); + }).then([=](concurrency::task t_) + { + try { + t_.get(); + + sock_state = SOCKSTATE_CONNECTED; + + // Update local & remote address + HostName^ local_address; + String^ local_port; + + if (sock_type == SOCKTYPE_STREAM) { + local_address = stream_sock->Information->LocalAddress; + local_port = stream_sock->Information->LocalPort; + + socket_reader = ref new DataReader(stream_sock->InputStream); + socket_writer = ref new DataWriter(stream_sock->OutputStream); + socket_reader->InputStreamOptions = InputStreamOptions::Partial; + } else { + local_address = datagram_sock->Information->LocalAddress; + local_port = datagram_sock->Information->LocalPort; + } + if (local_address && local_port) { + wstr_addr_to_sockaddr(local_address->CanonicalName->Data(), + local_port->Data(), + &local_addr); + } + + if (!is_blocking && cb.on_connect) { + (*cb.on_connect)(this, PJ_SUCCESS); + } + return (pj_status_t)PJ_SUCCESS; + + } catch (Exception^ ex) { + + SocketErrorStatus status = SocketError::GetStatus(ex->HResult); + + switch (status) + { + case SocketErrorStatus::UnreachableHost: + break; + case SocketErrorStatus::ConnectionTimedOut: + break; + case SocketErrorStatus::ConnectionRefused: + break; + default: + break; + } + + if (!is_blocking && cb.on_connect) { + (*cb.on_connect)(this, PJ_EUNKNOWN); + } + + return (pj_status_t)PJ_EUNKNOWN; + } + }); + + if (!is_blocking) + return PJ_RETURN_OS_ERROR(PJ_BLOCKING_CONNECT_ERROR_VAL); + + try { + status = t.get(); + } catch (Exception^) { + return PJ_EUNKNOWN; + } + return status; +} + +pj_status_t PjUwpSocket::Listen() +{ + PJ_ASSERT_RETURN((sock_type == SOCKTYPE_UNKNOWN) || + (sock_type == SOCKTYPE_LISTENER && + sock_state == SOCKSTATE_INITIALIZED), + PJ_EINVALIDOP); + + if (sock_type == SOCKTYPE_UNKNOWN) + InitSocket(SOCKTYPE_LISTENER); + + if (has_pending_bind) + Bind(); + + /* Start listen */ + if (listener_helper == nullptr) { + listener_helper = ref new PjUwpSocketListenerHelper(this); + } + + return PJ_SUCCESS; +} + +pj_status_t PjUwpSocket::Accept(PjUwpSocket **new_sock) +{ + if (sock_type != SOCKTYPE_LISTENER || sock_state != SOCKSTATE_INITIALIZED) + return PJ_EINVALIDOP; + + StreamSocket^ accepted_sock; + pj_status_t status = listener_helper->GetAcceptedSocket(accepted_sock); + if (status == PJ_ENOTFOUND) + return PJ_RETURN_OS_ERROR(PJ_BLOCKING_ERROR_VAL); + + if (status != PJ_SUCCESS) + return status; + + *new_sock = CreateAcceptSocket(accepted_sock); + return PJ_SUCCESS; +} + ///////////////////////////////////////////////////////////////////////////// // @@ -717,48 +1187,8 @@ PJ_DEF(pj_status_t) pj_sock_bind( pj_sock_t sock, PJ_CHECK_STACK(); PJ_ASSERT_RETURN(sock, PJ_EINVAL); PJ_ASSERT_RETURN(addr && len>=(int)sizeof(pj_sockaddr_in), PJ_EINVAL); - PjUwpSocket *s = (PjUwpSocket*)sock; - - if (s->sock_state > SOCKSTATE_INITIALIZED) - return PJ_EINVALIDOP; - - pj_sockaddr_cp(&s->local_addr, addr); - - /* Bind now if this is UDP. But if it is TCP, unfortunately we don't - * know yet whether it is SocketStream or Listener! - */ - if (s->type == pj_SOCK_DGRAM()) { - HRESULT err = 0; - - try { - concurrency::create_task([s, addr]() { - HostName ^hostname; - int port; - sockaddr_to_hostname_port(addr, hostname, &port); - if (pj_sockaddr_has_addr(addr)) { - s->datagram_sock->BindEndpointAsync(hostname, - port.ToString()); - } else if (pj_sockaddr_get_port(addr) != 0) { - s->datagram_sock->BindServiceNameAsync(port.ToString()); - } - }).then([s, &err](concurrency::task t) - { - try { - t.get(); - s->sock_state = SOCKSTATE_CONNECTED; - } catch (Exception^ e) { - err = e->HResult; - } - }).get(); - } catch (Exception^ e) { - err = e->HResult; - } - - return (err? PJ_RETURN_OS_ERROR(err) : PJ_SUCCESS); - } - - return PJ_SUCCESS; + return s->Bind(addr); } @@ -811,9 +1241,8 @@ PJ_DEF(pj_status_t) pj_sock_getpeername( pj_sock_t sock, *namelen>=(int)sizeof(pj_sockaddr_in), PJ_EINVAL); PjUwpSocket *s = (PjUwpSocket*)sock; - - pj_sockaddr_cp(addr, &s->remote_addr); - *namelen = pj_sockaddr_get_len(&s->remote_addr); + pj_sockaddr_cp(addr, s->GetRemoteAddr()); + *namelen = pj_sockaddr_get_len(addr); return PJ_SUCCESS; } @@ -830,78 +1259,13 @@ PJ_DEF(pj_status_t) pj_sock_getsockname( pj_sock_t sock, *namelen>=(int)sizeof(pj_sockaddr_in), PJ_EINVAL); PjUwpSocket *s = (PjUwpSocket*)sock; - - pj_sockaddr_cp(addr, &s->local_addr); - *namelen = pj_sockaddr_get_len(&s->local_addr); + pj_sockaddr_cp(addr, s->GetLocalAddr()); + *namelen = pj_sockaddr_get_len(addr); return PJ_SUCCESS; } -static pj_status_t sock_send_imp(PjUwpSocket *s, const void *buf, - pj_ssize_t *len) -{ - if (s->is_busy_sending) - return PJ_STATUS_FROM_OS(OSERR_EWOULDBLOCK); - - if (*len > (pj_ssize_t)s->send_buffer->Capacity) - return PJ_ETOOBIG; - - CopyToIBuffer((unsigned char*)buf, *len, s->send_buffer); - s->send_buffer->Length = *len; - s->socket_writer->WriteBuffer(s->send_buffer); - - if (s->is_blocking) { - pj_status_t status = PJ_SUCCESS; - concurrency::cancellation_token_source cts; - auto cts_token = cts.get_token(); - auto t = concurrency::create_task(s->socket_writer->StoreAsync(), - cts_token); - *len = cancel_after_timeout(t, cts, (unsigned int)WRITE_TIMEOUT). - then([cts_token, &status](concurrency::task t_) - { - int sent = 0; - try { - if (cts_token.is_canceled()) - status = PJ_ETIMEDOUT; - else - sent = t_.get(); - } catch (Exception^ e) { - status = PJ_RETURN_OS_ERROR(e->HResult); - } - return sent; - }).get(); - - return status; - } - - s->is_busy_sending = true; - concurrency::create_task(s->socket_writer->StoreAsync()). - then([s](concurrency::task t_) - { - try { - unsigned int l = t_.get(); - s->is_busy_sending = false; - - // invoke callback - if (s->on_write) { - (*s->on_write)(s, l); - } - } catch (Exception^ e) { - s->is_busy_sending = false; - if (s->sock_type == SOCKTYPE_STREAM) - s->sock_state = SOCKSTATE_DISCONNECTED; - - // invoke callback - if (s->on_write) { - (*s->on_write)(s, -PJ_RETURN_OS_ERROR(e->HResult)); - } - } - }); - - return PJ_EPENDING; -} - /* * Send data */ @@ -915,20 +1279,7 @@ PJ_DEF(pj_status_t) pj_sock_send(pj_sock_t sock, PJ_UNUSED_ARG(flags); PjUwpSocket *s = (PjUwpSocket*)sock; - - if ((s->sock_type!=SOCKTYPE_STREAM && s->sock_type!=SOCKTYPE_DATAGRAM) || - (s->sock_state!=SOCKSTATE_CONNECTED)) - { - return PJ_EINVALIDOP; - } - - /* Sending for SOCKTYPE_DATAGRAM is implemented in pj_sock_sendto() */ - if (s->sock_type == SOCKTYPE_DATAGRAM) { - return pj_sock_sendto(sock, buf, len, flags, &s->remote_addr, - pj_sockaddr_get_len(&s->remote_addr)); - } - - return sock_send_imp(s, buf, len); + return s->Send(buf, len); } @@ -948,71 +1299,7 @@ PJ_DEF(pj_status_t) pj_sock_sendto(pj_sock_t sock, PJ_UNUSED_ARG(tolen); PjUwpSocket *s = (PjUwpSocket*)sock; - - if (s->sock_type != SOCKTYPE_DATAGRAM || - s->sock_state < SOCKSTATE_INITIALIZED) - { - return PJ_EINVALIDOP; - } - - if (s->is_busy_sending) - return PJ_STATUS_FROM_OS(OSERR_EWOULDBLOCK); - - if (*len > (pj_ssize_t)s->send_buffer->Capacity) - return PJ_ETOOBIG; - - HostName ^hostname; - int port; - sockaddr_to_hostname_port(to, hostname, &port); - - concurrency::cancellation_token_source cts; - auto cts_token = cts.get_token(); - auto t = concurrency::create_task( - s->datagram_sock->GetOutputStreamAsync( - hostname, port.ToString()), cts_token); - pj_status_t status = PJ_SUCCESS; - - cancel_after_timeout(t, cts, (unsigned int)WRITE_TIMEOUT). - then([s, cts_token, &status](concurrency::task t_) - { - try { - if (cts_token.is_canceled()) { - status = PJ_ETIMEDOUT; - } else { - IOutputStream^ outstream = t_.get(); - s->socket_writer = ref new DataWriter(outstream); - } - } catch (Exception^ e) { - status = PJ_RETURN_OS_ERROR(e->HResult); - } - }).get(); - - if (status != PJ_SUCCESS) - return status; - - status = sock_send_imp(s, buf, len); - - if ((status == PJ_SUCCESS || status == PJ_EPENDING) && - s->sock_state < SOCKSTATE_CONNECTED) - { - s->sock_state = SOCKSTATE_CONNECTED; - } - - return status; -} - - -static int consume_read_buffer(PjUwpSocket *s, void *buf, int max_len) -{ - if (s->socket_reader->UnconsumedBufferLength == 0) - return 0; - - int read_len = PJ_MIN((int)s->socket_reader->UnconsumedBufferLength, - max_len); - IBuffer^ buffer = s->socket_reader->ReadBuffer(read_len); - read_len = buffer->Length; - CopyFromIBuffer((unsigned char*)buf, read_len, buffer); - return read_len; + return s->SendTo(buf, len, to); } @@ -1030,78 +1317,7 @@ PJ_DEF(pj_status_t) pj_sock_recv(pj_sock_t sock, PJ_UNUSED_ARG(flags); PjUwpSocket *s = (PjUwpSocket*)sock; - - /* Only for TCP, at least for now! */ - if (s->sock_type == SOCKTYPE_DATAGRAM) - return PJ_ENOTSUP; - - if (s->sock_type != SOCKTYPE_STREAM || - s->sock_state != SOCKSTATE_CONNECTED) - { - return PJ_EINVALIDOP; - } - - /* First check if there is already some data in the read buffer */ - int avail_len = consume_read_buffer(s, buf, *len); - if (avail_len > 0) { - *len = avail_len; - return PJ_SUCCESS; - } - - /* Start sync read */ - if (s->is_blocking) { - pj_status_t status = PJ_SUCCESS; - concurrency::cancellation_token_source cts; - auto cts_token = cts.get_token(); - auto t = concurrency::create_task(s->socket_reader->LoadAsync(*len), cts_token); - *len = cancel_after_timeout(t, cts, READ_TIMEOUT) - .then([s, len, buf, cts_token, &status](concurrency::task t_) - { - try { - if (cts_token.is_canceled()) { - status = PJ_ETIMEDOUT; - return 0; - } - t_.get(); - } catch (Exception^) { - status = PJ_ETIMEDOUT; - return 0; - } - - *len = consume_read_buffer(s, buf, *len); - return (int)*len; - }).get(); - - return status; - } - - /* Start async read */ - int read_len = *len; - concurrency::create_task(s->socket_reader->LoadAsync(read_len)) - .then([s, &read_len](concurrency::task t_) - { - try { - // catch any exception - t_.get(); - - // invoke callback - read_len = PJ_MIN((int)s->socket_reader->UnconsumedBufferLength, - read_len); - if (read_len > 0 && s->on_read) { - (*s->on_read)(s, read_len); - } - } catch (Exception^ e) { - // invoke callback - if (s->on_read) { - (*s->on_read)(s, -PJ_RETURN_OS_ERROR(e->HResult)); - } - return 0; - } - - return (int)read_len; - }); - - return PJ_EPENDING; + return s->Recv(buf, len); } /* @@ -1122,37 +1338,10 @@ PJ_DEF(pj_status_t) pj_sock_recvfrom(pj_sock_t sock, PJ_UNUSED_ARG(flags); PjUwpSocket *s = (PjUwpSocket*)sock; - - if (s->sock_type != SOCKTYPE_DATAGRAM || - s->sock_state < SOCKSTATE_INITIALIZED) - { - return PJ_EINVALIDOP; - } - - /* Start receive, if not yet */ - if (s->datagram_recv_helper == nullptr) { - s->datagram_recv_helper = ref new PjUwpSocketDatagramRecvHelper(s); - } - - /* Try to read any available data first */ - pj_status_t status = s->datagram_recv_helper-> - ReadDataIfAvailable(buf, len, from); - if (status != PJ_ENOTFOUND) - return status; - - /* Start sync read */ - if (s->is_blocking) { - while (status == PJ_ENOTFOUND && s->sock_state <= SOCKSTATE_CONNECTED) - { - status = s->datagram_recv_helper-> - ReadDataIfAvailable(buf, len, from); - pj_thread_sleep(100); - } - return PJ_SUCCESS; - } - - /* For async read, just return PJ_EPENDING */ - return PJ_EPENDING; + pj_status_t status = s->RecvFrom(buf, len, from); + if (status == PJ_SUCCESS) + *fromlen = pj_sockaddr_get_len(from); + return status; } /* @@ -1221,41 +1410,6 @@ PJ_DEF(pj_status_t) pj_sock_setsockopt_params( pj_sock_t sockfd, return retval; } -static pj_status_t tcp_bind(PjUwpSocket *s) -{ - /* If no bound address is set, just return */ - if (!pj_sockaddr_has_addr(&s->local_addr) && - pj_sockaddr_get_port(&s->local_addr)==0) - { - return PJ_SUCCESS; - } - - HRESULT err = 0; - - try { - concurrency::create_task([s]() { - HostName ^hostname; - int port; - sockaddr_to_hostname_port(&s->local_addr, hostname, &port); - - s->listener_sock->BindEndpointAsync(hostname, - port.ToString()); - }).then([s, &err](concurrency::task t) - { - try { - t.get(); - s->sock_state = SOCKSTATE_CONNECTED; - } catch (Exception^ e) { - err = e->HResult; - } - }).get(); - } catch (Exception^ e) { - err = e->HResult; - } - - return (err? PJ_RETURN_OS_ERROR(err) : PJ_SUCCESS); -} - /* * Connect socket. @@ -1266,131 +1420,10 @@ PJ_DEF(pj_status_t) pj_sock_connect( pj_sock_t sock, { PJ_CHECK_STACK(); PJ_ASSERT_RETURN(sock && addr, PJ_EINVAL); - PJ_UNUSED_ARG(namelen); PjUwpSocket *s = (PjUwpSocket*)sock; - pj_status_t status; - - pj_sockaddr_cp(&s->remote_addr, addr); - - /* UDP */ - - if (s->sock_type == SOCKTYPE_DATAGRAM) { - if (s->sock_state != SOCKSTATE_INITIALIZED) - return PJ_EINVALIDOP; - - HostName ^hostname; - int port; - sockaddr_to_hostname_port(addr, hostname, &port); - - try { - concurrency::create_task(s->datagram_sock->ConnectAsync - (hostname, port.ToString())) - .then([s](concurrency::task t_) - { - try { - t_.get(); - } catch (Exception^ ex) - { - - } - }).get(); - } catch (Exception^) { - return PJ_EUNKNOWN; - } - - // Update local & remote address - wstr_addr_to_sockaddr(s->datagram_sock->Information->RemoteAddress->CanonicalName->Data(), - s->datagram_sock->Information->RemotePort->Data(), - &s->remote_addr); - wstr_addr_to_sockaddr(s->datagram_sock->Information->LocalAddress->CanonicalName->Data(), - s->datagram_sock->Information->LocalPort->Data(), - &s->local_addr); - - s->sock_state = SOCKSTATE_CONNECTED; - - return PJ_SUCCESS; - } - - /* TCP */ - - /* Init stream socket now */ - s->InitSocket(SOCKTYPE_STREAM); - - pj_sockaddr_cp(&s->remote_addr, addr); - wstr_addr_to_sockaddr(s->stream_sock->Information->LocalAddress->CanonicalName->Data(), - s->stream_sock->Information->LocalPort->Data(), - &s->local_addr); - - /* Perform any pending bind */ - status = tcp_bind(s); - if (status != PJ_SUCCESS) - return status; - - char tmp[PJ_INET6_ADDRSTRLEN]; - wchar_t wtmp[PJ_INET6_ADDRSTRLEN]; - pj_sockaddr_print(addr, tmp, PJ_INET6_ADDRSTRLEN, 0); - pj_ansi_to_unicode(tmp, pj_ansi_strlen(tmp), wtmp, - PJ_INET6_ADDRSTRLEN); - auto host = ref new HostName(ref new String(wtmp)); - int port = pj_sockaddr_get_port(addr); - - auto t = concurrency::create_task(s->stream_sock->ConnectAsync - (host, port.ToString(), SocketProtectionLevel::PlainSocket)) - .then([=](concurrency::task t_) - { - try { - t_.get(); - s->socket_reader = ref new DataReader(s->stream_sock->InputStream); - s->socket_writer = ref new DataWriter(s->stream_sock->OutputStream); - - // Update local & remote address - wstr_addr_to_sockaddr(s->stream_sock->Information->RemoteAddress->CanonicalName->Data(), - s->stream_sock->Information->RemotePort->Data(), - &s->remote_addr); - wstr_addr_to_sockaddr(s->stream_sock->Information->LocalAddress->CanonicalName->Data(), - s->stream_sock->Information->LocalPort->Data(), - &s->local_addr); - - s->sock_state = SOCKSTATE_CONNECTED; - - if (!s->is_blocking && s->on_connect) { - (*s->on_connect)(s, PJ_SUCCESS); - } - return (pj_status_t)PJ_SUCCESS; - } catch (Exception^ ex) { - SocketErrorStatus status = SocketError::GetStatus(ex->HResult); - - switch (status) - { - case SocketErrorStatus::UnreachableHost: - break; - case SocketErrorStatus::ConnectionTimedOut: - break; - case SocketErrorStatus::ConnectionRefused: - break; - default: - break; - } - - if (!s->is_blocking && s->on_connect) { - (*s->on_connect)(s, PJ_EUNKNOWN); - } - - return (pj_status_t)PJ_EUNKNOWN; - } - }); - - if (!s->is_blocking) - return PJ_EPENDING; - - try { - status = t.get(); - } catch (Exception^) { - return PJ_EUNKNOWN; - } - return status; + return s->Connect(addr); } @@ -1419,22 +1452,7 @@ PJ_DEF(pj_status_t) pj_sock_listen( pj_sock_t sock, PJ_ASSERT_RETURN(sock, PJ_EINVAL); PjUwpSocket *s = (PjUwpSocket*)sock; - pj_status_t status; - - /* Init listener socket now */ - s->InitSocket(SOCKTYPE_LISTENER); - - /* Perform any pending bind */ - status = tcp_bind(s); - if (status != PJ_SUCCESS) - return status; - - /* Start listen */ - if (s->listener_helper == nullptr) { - s->listener_helper = ref new PjUwpSocketListenerHelper(s); - } - - return PJ_SUCCESS; + return s->Listen(); } /* @@ -1445,31 +1463,23 @@ PJ_DEF(pj_status_t) pj_sock_accept( pj_sock_t serverfd, pj_sockaddr_t *addr, int *addrlen) { - PJ_CHECK_STACK(); + pj_status_t status; + PJ_CHECK_STACK(); PJ_ASSERT_RETURN(serverfd && newsock, PJ_EINVAL); PjUwpSocket *s = (PjUwpSocket*)serverfd; + PjUwpSocket *new_uwp_sock; - if (s->sock_type != SOCKTYPE_LISTENER || - s->sock_state != SOCKSTATE_INITIALIZED) - { - return PJ_EINVALIDOP; - } - - StreamSocket^ accepted_sock; - pj_status_t status = s->listener_helper->GetAcceptedSocket(accepted_sock); - if (status == PJ_ENOTFOUND) - return PJ_EPENDING; - + status = s->Accept(&new_uwp_sock); if (status != PJ_SUCCESS) return status; + if (newsock == NULL) + return PJ_ENOTFOUND; - PjUwpSocket *new_sock = s->CreateAcceptSocket(accepted_sock); - - pj_sockaddr_cp(addr, &new_sock->remote_addr); - *addrlen = pj_sockaddr_get_len(&new_sock->remote_addr); - *newsock = (pj_sock_t)new_sock; + *newsock = (pj_sock_t)new_uwp_sock; + pj_sockaddr_cp(addr, new_uwp_sock->GetRemoteAddr()); + *addrlen = pj_sockaddr_get_len(addr); return PJ_SUCCESS; } diff --git a/pjlib/src/pj/sock_uwp.h b/pjlib/src/pj/sock_uwp.h index 846002a0f..6647f3acb 100644 --- a/pjlib/src/pj/sock_uwp.h +++ b/pjlib/src/pj/sock_uwp.h @@ -18,6 +18,13 @@ */ #pragma once + +#include +#include +#include +#include + + enum { READ_TIMEOUT = 60 * 1000, WRITE_TIMEOUT = 60 * 1000, @@ -36,6 +43,17 @@ enum PjUwpSocketState { ref class PjUwpSocketDatagramRecvHelper; ref class PjUwpSocketListenerHelper; +class PjUwpSocket; + + +typedef struct PjUwpSocketCallback +{ + void (*on_read)(PjUwpSocket *s, int bytes_read); + void (*on_write)(PjUwpSocket *s, int bytes_sent); + void (*on_accept)(PjUwpSocket *s); + void (*on_connect)(PjUwpSocket *s, pj_status_t status); +} PjUwpSocketCallback; + /* * UWP Socket Wrapper. @@ -44,19 +62,55 @@ class PjUwpSocket { public: PjUwpSocket(int af_, int type_, int proto_); - PjUwpSocket* CreateAcceptSocket(Windows::Networking::Sockets::StreamSocket^ stream_sock_); virtual ~PjUwpSocket(); - pj_status_t InitSocket(enum PjUwpSocketType sock_type_); + void DeinitSocket(); + + void* GetUserData() { return user_data; } + void SetNonBlocking(const PjUwpSocketCallback *cb_, void *user_data_) + { + is_blocking = PJ_FALSE; + cb=*cb_; + user_data = user_data_; + } + + enum PjUwpSocketType GetType() { return sock_type; } + enum PjUwpSocketState GetState() { return sock_state; } + + pj_sockaddr* GetLocalAddr() { return &local_addr; } + pj_sockaddr* GetRemoteAddr() { return &remote_addr; } + + + pj_status_t Bind(const pj_sockaddr_t *addr = NULL); + pj_status_t Send(const void *buf, pj_ssize_t *len); + pj_status_t SendTo(const void *buf, pj_ssize_t *len, const pj_sockaddr_t *to); + pj_status_t Recv(void *buf, pj_ssize_t *len); + pj_status_t RecvFrom(void *buf, pj_ssize_t *len, pj_sockaddr_t *from); + pj_status_t Connect(const pj_sockaddr_t *addr); + pj_status_t Listen(); + pj_status_t Accept(PjUwpSocket **new_sock); + + void (*on_read)(PjUwpSocket *s, int bytes_read); + void (*on_write)(PjUwpSocket *s, int bytes_sent); + void (*on_accept)(PjUwpSocket *s, pj_status_t status); + void (*on_connect)(PjUwpSocket *s, pj_status_t status); + +private: + PjUwpSocket* CreateAcceptSocket(Windows::Networking::Sockets::StreamSocket^ stream_sock_); + pj_status_t SendImp(const void *buf, pj_ssize_t *len); + int ConsumeReadBuffer(void *buf, int max_len); -public: int af; int type; int proto; pj_sockaddr local_addr; pj_sockaddr remote_addr; pj_bool_t is_blocking; + pj_bool_t has_pending_bind; + pj_bool_t has_pending_send; + pj_bool_t has_pending_recv; void *user_data; + PjUwpSocketCallback cb; enum PjUwpSocketType sock_type; enum PjUwpSocketState sock_state; @@ -65,22 +119,15 @@ public: Windows::Networking::Sockets::StreamSocketListener^ listener_sock; /* Helper objects */ - PjUwpSocketDatagramRecvHelper^ datagram_recv_helper; + PjUwpSocketDatagramRecvHelper^ dgram_recv_helper; PjUwpSocketListenerHelper^ listener_helper; Windows::Storage::Streams::DataReader^ socket_reader; Windows::Storage::Streams::DataWriter^ socket_writer; Windows::Storage::Streams::IBuffer^ send_buffer; - pj_bool_t is_busy_sending; - void *read_userdata; - void *write_userdata; - void *accept_userdata; - - void (*on_read)(PjUwpSocket *s, int bytes_read); - void (*on_write)(PjUwpSocket *s, int bytes_sent); - void (*on_accept)(PjUwpSocket *s, pj_status_t status); - void (*on_connect)(PjUwpSocket *s, pj_status_t status); + friend PjUwpSocketDatagramRecvHelper; + friend PjUwpSocketListenerHelper; }; @@ -92,6 +139,7 @@ inline pj_status_t wstr_addr_to_sockaddr(const wchar_t *waddr, const wchar_t *wport, pj_sockaddr_t *sockaddr) { +#if 0 char tmp_str_buf[PJ_INET6_ADDRSTRLEN+1]; pj_assert(wcslen(waddr) < sizeof(tmp_str_buf)); pj_unicode_to_ansi(waddr, wcslen(waddr), tmp_str_buf, sizeof(tmp_str_buf)); @@ -100,6 +148,31 @@ inline pj_status_t wstr_addr_to_sockaddr(const wchar_t *waddr, pj_sockaddr_parse(pj_AF_UNSPEC(), 0, &remote_host, (pj_sockaddr*)sockaddr); pj_sockaddr_set_port((pj_sockaddr*)sockaddr, (pj_uint16_t)_wtoi(wport)); + return PJ_SUCCESS; +#endif + char tmp_str_buf[PJ_INET6_ADDRSTRLEN+1]; + pj_assert(wcslen(waddr) < sizeof(tmp_str_buf)); + pj_unicode_to_ansi(waddr, wcslen(waddr), tmp_str_buf, sizeof(tmp_str_buf)); + pj_str_t remote_host; + pj_strset(&remote_host, tmp_str_buf, pj_ansi_strlen(tmp_str_buf)); + pj_sockaddr *addr = (pj_sockaddr*)sockaddr; + pj_bool_t got_addr = PJ_FALSE; + + if (pj_inet_pton(PJ_AF_INET, &remote_host, &addr->ipv4.sin_addr) + == PJ_SUCCESS) + { + addr->addr.sa_family = PJ_AF_INET; + got_addr = PJ_TRUE; + } else if (pj_inet_pton(PJ_AF_INET6, &remote_host, &addr->ipv6.sin6_addr) + == PJ_SUCCESS) + { + addr->addr.sa_family = PJ_AF_INET6; + got_addr = PJ_TRUE; + } + if (!got_addr) + return PJ_EINVAL; + + pj_sockaddr_set_port(addr, (pj_uint16_t)_wtoi(wport)); return PJ_SUCCESS; } diff --git a/pjmedia/src/pjmedia/transport_udp.c b/pjmedia/src/pjmedia/transport_udp.c index 70f16d6d7..688ef2402 100644 --- a/pjmedia/src/pjmedia/transport_udp.c +++ b/pjmedia/src/pjmedia/transport_udp.c @@ -752,6 +752,8 @@ static void transport_detach( pjmedia_transport *tp, pj_assert(tp); if (udp->attached) { + int i; + /* Lock the ioqueue keys to make sure that callbacks are * not executed. See ticket #460 for details. */ @@ -772,6 +774,13 @@ static void transport_detach( pjmedia_transport *tp, udp->rtcp_cb = NULL; udp->user_data = NULL; + /* Cancel any outstanding send */ + for (i=0; irtp_pending_write); ++i) { + pj_ioqueue_post_completion(udp->rtp_key, + &udp->rtp_pending_write[i].op_key, 0); + } + pj_ioqueue_post_completion(udp->rtcp_key, &udp->rtcp_write_op, 0); + /* Unlock keys */ pj_ioqueue_unlock_key(udp->rtcp_key); pj_ioqueue_unlock_key(udp->rtp_key); diff --git a/pjsip-apps/src/pjsua/winrt/gui/uwp/VoipBackEnd/MyApp.cpp b/pjsip-apps/src/pjsua/winrt/gui/uwp/VoipBackEnd/MyApp.cpp index 89658a4a4..f547cf036 100644 --- a/pjsip-apps/src/pjsua/winrt/gui/uwp/VoipBackEnd/MyApp.cpp +++ b/pjsip-apps/src/pjsua/winrt/gui/uwp/VoipBackEnd/MyApp.cpp @@ -470,6 +470,7 @@ void MyAppRT::init(IntAccount^ iAcc, IntCall^ iCall) /* Create transports. */ try { + sipTpConfig->port = 5060; ep.transportCreate(::pjsip_transport_type_e::PJSIP_TRANSPORT_TCP, *sipTpConfig); } catch (pj::Error& e) { @@ -582,7 +583,9 @@ void MyAppRT::registerThread(Platform::String^ name) bool MyAppRT::isThreadRegistered() { - return ep.libIsThreadRegistered(); + // Some threads are registered using PJLIB API, ep.libIsThreadRegistered() will return false on those threads. + //return ep.libIsThreadRegistered(); + return pj_thread_is_registered() != PJ_FALSE; } AccountInfo^ MyAppRT::getAccountInfo()