- Works on UWP socket & ioqueue.
 - Media transport UDP: cancel any pending send on detach, otherwise there is possibility that send buffer is already freed by application (stream) when the send op starts.
 - Ioqueue common abs: rename 'generic' as it seems to be a keyword in C++/CX, fixed #if/#endif possition in ioqueue_init_key().
 - pjsua GUI app: fixed thread registration status check.




git-svn-id: https://svn.pjsip.org/repos/pjproject/branches/projects/uwp@5256 74dad513-b988-da41-8d7b-12977e46ad98
This commit is contained in:
Nanang Izzuddin 2016-03-11 04:17:32 +00:00
parent 4660ce3230
commit 31c9c139fd
7 changed files with 839 additions and 856 deletions

View File

@ -116,9 +116,9 @@ static pj_status_t ioqueue_init_key( pj_pool_t *pool,
/* Create mutex for the key. */
#if !PJ_IOQUEUE_HAS_SAFE_UNREG
rc = pj_lock_create_simple_mutex(pool, NULL, &key->lock);
#endif
if (rc != PJ_SUCCESS)
return rc;
#endif
/* Group lock */
key->grp_lock = grp_lock;

View File

@ -80,7 +80,7 @@ struct accept_operation
union operation_key
{
struct generic_operation generic;
struct generic_operation generic_op;
struct read_operation read;
struct write_operation write;
#if PJ_HAS_TCP

View File

@ -16,111 +16,151 @@
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
/*
#include <pj/ioqueue.h>
#include <pj/assert.h>
#include <pj/errno.h>
#include <pj/list.h>
#include <pj/lock.h>
#include <pj/pool.h>
#include <pj/string.h>
#include <pj/ioqueue.h>
#include <pj/os.h>
#include <pj/lock.h>
#include <pj/log.h>
#include <pj/list.h>
#include <pj/pool.h>
#include <pj/string.h>
#include <pj/assert.h>
#include <pj/sock.h>
#include <pj/compat/socket.h>
#include <pj/sock_select.h>
#include <pj/sock_qos.h>
#include <pj/errno.h>
#include <pj/rand.h>
*/
#include <pj/ioqueue.h>
#include <pj/sock.h>
#include <pj/addr_resolv.h>
#include <pj/assert.h>
#include <pj/errno.h>
#include <pj/lock.h>
#include <pj/math.h>
#include <pj/os.h>
#include <pj/pool.h>
#include <pj/string.h>
#include <pj/unicode.h>
#include <pj/compat/socket.h>
#include <ppltasks.h>
#include <string>
#define THIS_FILE "ioq_uwp"
#include "sock_uwp.h"
#include "ioqueue_common_abs.h"
/*
* IO Queue structure.
*/
struct pj_ioqueue_t
{
int dummy;
};
/*
* IO Queue key structure.
/*
* This describes each key.
*/
struct pj_ioqueue_key_t
{
pj_sock_t sock;
void *user_data;
pj_ioqueue_callback cb;
DECLARE_COMMON_KEY
};
/*
* This describes the I/O queue itself.
*/
struct pj_ioqueue_t
{
DECLARE_COMMON_IOQUEUE
pj_thread_desc thread_desc[16];
unsigned thread_cnt;
};
#include "ioqueue_common_abs.c"
static void ioqueue_remove_from_set( pj_ioqueue_t *ioqueue,
pj_ioqueue_key_t *key,
enum ioqueue_event_type event_type)
{
PJ_UNUSED_ARG(ioqueue);
PJ_UNUSED_ARG(key);
PJ_UNUSED_ARG(event_type);
}
static void start_next_read(pj_ioqueue_key_t *key)
{
if (key_has_pending_read(key)) {
PjUwpSocket *s = (PjUwpSocket*)key->fd;
struct read_operation *op;
op = (struct read_operation*)key->read_list.next;
if (op->op == PJ_IOQUEUE_OP_RECV)
s->Recv(NULL, (pj_ssize_t*)&op->size);
else
s->RecvFrom(NULL, (pj_ssize_t*)&op->size, NULL);
}
}
static void start_next_write(pj_ioqueue_key_t *key)
{
if (key_has_pending_write(key)) {
PjUwpSocket *s = (PjUwpSocket*)key->fd;
struct write_operation *op;
op = (struct write_operation*)key->write_list.next;
if (op->op == PJ_IOQUEUE_OP_SEND)
s->Send(op->buf, (pj_ssize_t*)&op->size);
else
s->SendTo(op->buf, (pj_ssize_t*)&op->size, &op->rmt_addr);
}
}
static void ioqueue_add_to_set( pj_ioqueue_t *ioqueue,
pj_ioqueue_key_t *key,
enum ioqueue_event_type event_type )
{
PJ_UNUSED_ARG(ioqueue);
if (event_type == READABLE_EVENT) {
/* This is either recv, recvfrom, or accept, do nothing on accept */
start_next_read(key);
} else if (event_type == WRITEABLE_EVENT) {
/* This is either send, sendto, or connect, do nothing on connect */
//start_next_write(key);
}
}
static void check_thread(pj_ioqueue_t *ioq) {
if (pj_thread_is_registered())
return;
pj_thread_t *t;
char tmp[16];
pj_ansi_snprintf(tmp, sizeof(tmp), "UwpThread%02d", ioq->thread_cnt);
pj_thread_register(tmp, ioq->thread_desc[ioq->thread_cnt++], &t);
pj_assert(ioq->thread_cnt < PJ_ARRAY_SIZE(ioq->thread_desc));
ioq->thread_cnt %= PJ_ARRAY_SIZE(ioq->thread_desc);
}
static void on_read(PjUwpSocket *s, int bytes_read)
{
pj_ioqueue_key_t *key = (pj_ioqueue_key_t*)s->user_data;
if (key->cb.on_read_complete) {
(*key->cb.on_read_complete)(key, (pj_ioqueue_op_key_t*)s->read_userdata, bytes_read);
}
s->read_userdata = NULL;
pj_ioqueue_key_t *key = (pj_ioqueue_key_t*)s->GetUserData();
pj_ioqueue_t *ioq = key->ioqueue;
check_thread(ioq);
ioqueue_dispatch_read_event(key->ioqueue, key);
if (bytes_read > 0)
start_next_read(key);
}
static void on_write(PjUwpSocket *s, int bytes_sent)
{
pj_ioqueue_key_t *key = (pj_ioqueue_key_t*)s->user_data;
if (key->cb.on_write_complete) {
(*key->cb.on_write_complete)(key, (pj_ioqueue_op_key_t*)s->write_userdata, bytes_sent);
}
s->write_userdata = NULL;
PJ_UNUSED_ARG(bytes_sent);
pj_ioqueue_key_t *key = (pj_ioqueue_key_t*)s->GetUserData();
pj_ioqueue_t *ioq = key->ioqueue;
check_thread(ioq);
ioqueue_dispatch_write_event(key->ioqueue, key);
//start_next_write(key);
}
static void on_accept(PjUwpSocket *s, pj_status_t status)
static void on_accept(PjUwpSocket *s)
{
pj_ioqueue_key_t *key = (pj_ioqueue_key_t*)s->user_data;
if (key->cb.on_accept_complete) {
if (status == PJ_SUCCESS) {
pj_sock_t new_sock;
pj_sockaddr addr;
int addrlen;
status = pj_sock_accept(key->sock, &new_sock, &addr, &addrlen);
(*key->cb.on_accept_complete)(key, (pj_ioqueue_op_key_t*)s->accept_userdata, new_sock, status);
} else {
(*key->cb.on_accept_complete)(key, (pj_ioqueue_op_key_t*)s->accept_userdata, NULL, status);
}
}
s->accept_userdata = NULL;
pj_ioqueue_key_t *key = (pj_ioqueue_key_t*)s->GetUserData();
pj_ioqueue_t *ioq = key->ioqueue;
check_thread(ioq);
ioqueue_dispatch_read_event(key->ioqueue, key);
}
static void on_connect(PjUwpSocket *s, pj_status_t status)
{
pj_ioqueue_key_t *key = (pj_ioqueue_key_t*)s->user_data;
if (key->cb.on_connect_complete) {
(*key->cb.on_connect_complete)(key, status);
}
PJ_UNUSED_ARG(status);
pj_ioqueue_key_t *key = (pj_ioqueue_key_t*)s->GetUserData();
pj_ioqueue_t *ioq = key->ioqueue;
check_thread(ioq);
ioqueue_dispatch_write_event(key->ioqueue, key);
}
@ -141,10 +181,24 @@ PJ_DEF(pj_status_t) pj_ioqueue_create( pj_pool_t *pool,
pj_ioqueue_t **p_ioqueue)
{
pj_ioqueue_t *ioq;
pj_lock_t *lock;
pj_status_t rc;
PJ_UNUSED_ARG(max_fd);
ioq = PJ_POOL_ZALLOC_T(pool, pj_ioqueue_t);
/* Create and init ioqueue mutex */
rc = pj_lock_create_null_mutex(pool, "ioq%p", &lock);
if (rc != PJ_SUCCESS)
return rc;
rc = pj_ioqueue_set_lock(ioq, lock, PJ_TRUE);
if (rc != PJ_SUCCESS)
return rc;
PJ_LOG(4, ("pjlib", "select() I/O Queue created (%p)", ioq));
*p_ioqueue = ioq;
return PJ_SUCCESS;
}
@ -155,66 +209,22 @@ PJ_DEF(pj_status_t) pj_ioqueue_create( pj_pool_t *pool,
*/
PJ_DEF(pj_status_t) pj_ioqueue_destroy( pj_ioqueue_t *ioq )
{
PJ_UNUSED_ARG(ioq);
return PJ_SUCCESS;
return ioqueue_destroy(ioq);
}
/*
* Set the lock object to be used by the I/O Queue.
*/
PJ_DEF(pj_status_t) pj_ioqueue_set_lock( pj_ioqueue_t *ioq,
pj_lock_t *lock,
pj_bool_t auto_delete )
{
/* Don't really need lock for now */
PJ_UNUSED_ARG(ioq);
if (auto_delete) {
pj_lock_destroy(lock);
}
return PJ_SUCCESS;
}
PJ_DEF(pj_status_t) pj_ioqueue_set_default_concurrency(pj_ioqueue_t *ioqueue,
pj_bool_t allow)
{
/* Not supported, just return PJ_SUCCESS silently */
PJ_UNUSED_ARG(ioqueue);
PJ_UNUSED_ARG(allow);
return PJ_SUCCESS;
}
/*
* Register a socket to the I/O queue framework.
*/
PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool,
pj_ioqueue_t *ioq,
pj_ioqueue_t *ioqueue,
pj_sock_t sock,
void *user_data,
const pj_ioqueue_callback *cb,
pj_ioqueue_key_t **p_key )
{
PJ_UNUSED_ARG(ioq);
pj_ioqueue_key_t *key;
key = PJ_POOL_ZALLOC_T(pool, pj_ioqueue_key_t);
key->sock = sock;
key->user_data = user_data;
pj_memcpy(&key->cb, cb, sizeof(pj_ioqueue_callback));
PjUwpSocket *s = (PjUwpSocket*)sock;
s->is_blocking = PJ_FALSE;
s->user_data = key;
s->on_read = &on_read;
s->on_write = &on_write;
s->on_accept = &on_accept;
s->on_connect = &on_connect;
*p_key = key;
return PJ_SUCCESS;
return pj_ioqueue_register_sock2(pool, ioqueue, sock, NULL, user_data,
cb, p_key);
}
PJ_DEF(pj_status_t) pj_ioqueue_register_sock2(pj_pool_t *pool,
@ -225,9 +235,42 @@ PJ_DEF(pj_status_t) pj_ioqueue_register_sock2(pj_pool_t *pool,
const pj_ioqueue_callback *cb,
pj_ioqueue_key_t **p_key)
{
PJ_UNUSED_ARG(grp_lock);
PjUwpSocketCallback uwp_cb =
{ &on_read, &on_write, &on_accept, &on_connect };
pj_ioqueue_key_t *key;
pj_status_t rc;
pj_lock_acquire(ioqueue->lock);
key = PJ_POOL_ZALLOC_T(pool, pj_ioqueue_key_t);
rc = ioqueue_init_key(pool, ioqueue, key, sock, grp_lock, user_data, cb);
if (rc != PJ_SUCCESS) {
key = NULL;
goto on_return;
}
/* Create ioqueue key lock, if not yet */
if (!key->lock) {
rc = pj_lock_create_simple_mutex(pool, NULL, &key->lock);
if (rc != PJ_SUCCESS) {
key = NULL;
goto on_return;
}
}
PjUwpSocket *s = (PjUwpSocket*)sock;
s->SetNonBlocking(&uwp_cb, key);
on_return:
if (rc != PJ_SUCCESS) {
if (key && key->grp_lock)
pj_grp_lock_dec_ref_dbg(key->grp_lock, "ioqueue", 0);
}
*p_key = key;
pj_lock_release(ioqueue->lock);
return rc;
return pj_ioqueue_register_sock(pool, ioqueue, sock, user_data, cb, p_key);
}
/*
@ -235,214 +278,59 @@ PJ_DEF(pj_status_t) pj_ioqueue_register_sock2(pj_pool_t *pool,
*/
PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key )
{
if (key == NULL || key->sock == NULL)
return PJ_SUCCESS;
pj_ioqueue_t *ioqueue;
if (key->sock)
pj_sock_close(key->sock);
key->sock = NULL;
PJ_ASSERT_RETURN(key, PJ_EINVAL);
ioqueue = key->ioqueue;
/* Lock the key to make sure no callback is simultaneously modifying
* the key. We need to lock the key before ioqueue here to prevent
* deadlock.
*/
pj_ioqueue_lock_key(key);
/* Also lock ioqueue */
pj_lock_acquire(ioqueue->lock);
/* Close socket. */
pj_sock_close(key->fd);
/* Clear callback */
key->cb.on_accept_complete = NULL;
key->cb.on_connect_complete = NULL;
key->cb.on_read_complete = NULL;
key->cb.on_write_complete = NULL;
pj_lock_release(ioqueue->lock);
if (key->grp_lock) {
pj_grp_lock_t *grp_lock = key->grp_lock;
pj_grp_lock_dec_ref_dbg(grp_lock, "ioqueue", 0);
pj_grp_lock_release(grp_lock);
} else {
pj_ioqueue_unlock_key(key);
}
pj_lock_destroy(key->lock);
return PJ_SUCCESS;
}
/*
* Get user data associated with an ioqueue key.
*/
PJ_DEF(void*) pj_ioqueue_get_user_data( pj_ioqueue_key_t *key )
{
return key->user_data;
}
/*
* Set or change the user data to be associated with the file descriptor or
* handle or socket descriptor.
*/
PJ_DEF(pj_status_t) pj_ioqueue_set_user_data( pj_ioqueue_key_t *key,
void *user_data,
void **old_data)
{
if (old_data)
*old_data = key->user_data;
key->user_data= user_data;
return PJ_SUCCESS;
}
/*
* Initialize operation key.
*/
PJ_DEF(void) pj_ioqueue_op_key_init( pj_ioqueue_op_key_t *op_key,
pj_size_t size )
{
pj_bzero(op_key, size);
}
/*
* Check if operation is pending on the specified operation key.
*/
PJ_DEF(pj_bool_t) pj_ioqueue_is_pending( pj_ioqueue_key_t *key,
pj_ioqueue_op_key_t *op_key )
{
PJ_UNUSED_ARG(key);
PJ_UNUSED_ARG(op_key);
return PJ_FALSE;
}
/*
* Post completion status to the specified operation key and call the
* appropriate callback.
*/
PJ_DEF(pj_status_t) pj_ioqueue_post_completion( pj_ioqueue_key_t *key,
pj_ioqueue_op_key_t *op_key,
pj_ssize_t bytes_status )
{
PJ_UNUSED_ARG(key);
PJ_UNUSED_ARG(op_key);
PJ_UNUSED_ARG(bytes_status);
return PJ_ENOTSUP;
}
#if defined(PJ_HAS_TCP) && PJ_HAS_TCP != 0
/**
* Instruct I/O Queue to accept incoming connection on the specified
* listening socket.
*/
PJ_DEF(pj_status_t) pj_ioqueue_accept( pj_ioqueue_key_t *key,
pj_ioqueue_op_key_t *op_key,
pj_sock_t *new_sock,
pj_sockaddr_t *local,
pj_sockaddr_t *remote,
int *addrlen )
{
PJ_UNUSED_ARG(new_sock);
PJ_UNUSED_ARG(local);
PJ_UNUSED_ARG(remote);
PJ_UNUSED_ARG(addrlen);
PjUwpSocket *s = (PjUwpSocket*)key->sock;
s->accept_userdata = op_key;
return pj_sock_listen(key->sock, 0);
}
/*
* Initiate non-blocking socket connect.
*/
PJ_DEF(pj_status_t) pj_ioqueue_connect( pj_ioqueue_key_t *key,
const pj_sockaddr_t *addr,
int addrlen )
{
return pj_sock_connect(key->sock, addr, addrlen);
}
#endif /* PJ_HAS_TCP */
/*
* Poll the I/O Queue for completed events.
*/
PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioq,
const pj_time_val *timeout)
{
/* Polling is not necessary on uwp, since all async activities
* are registered to active scheduler.
/* Polling is not necessary on UWP, since each socket handles
* its events already.
*/
PJ_UNUSED_ARG(ioq);
PJ_UNUSED_ARG(timeout);
pj_thread_sleep(PJ_TIME_VAL_MSEC(*timeout));
return 0;
}
/*
* Instruct the I/O Queue to read from the specified handle.
*/
PJ_DEF(pj_status_t) pj_ioqueue_recv( pj_ioqueue_key_t *key,
pj_ioqueue_op_key_t *op_key,
void *buffer,
pj_ssize_t *length,
pj_uint32_t flags )
{
PjUwpSocket *s = (PjUwpSocket*)key->sock;
s->read_userdata = op_key;
return pj_sock_recv(key->sock, buffer, length, flags);
}
/*
* This function behaves similarly as #pj_ioqueue_recv(), except that it is
* normally called for socket, and the remote address will also be returned
* along with the data.
*/
PJ_DEF(pj_status_t) pj_ioqueue_recvfrom( pj_ioqueue_key_t *key,
pj_ioqueue_op_key_t *op_key,
void *buffer,
pj_ssize_t *length,
pj_uint32_t flags,
pj_sockaddr_t *addr,
int *addrlen)
{
PjUwpSocket *s = (PjUwpSocket*)key->sock;
s->read_userdata = op_key;
return pj_sock_recvfrom(key->sock, buffer, length, flags, addr, addrlen);
}
/*
* Instruct the I/O Queue to write to the handle.
*/
PJ_DEF(pj_status_t) pj_ioqueue_send( pj_ioqueue_key_t *key,
pj_ioqueue_op_key_t *op_key,
const void *data,
pj_ssize_t *length,
pj_uint32_t flags )
{
PjUwpSocket *s = (PjUwpSocket*)key->sock;
s->write_userdata = op_key;
return pj_sock_send(key->sock, data, length, flags);
}
/*
* Instruct the I/O Queue to write to the handle.
*/
PJ_DEF(pj_status_t) pj_ioqueue_sendto( pj_ioqueue_key_t *key,
pj_ioqueue_op_key_t *op_key,
const void *data,
pj_ssize_t *length,
pj_uint32_t flags,
const pj_sockaddr_t *addr,
int addrlen)
{
PjUwpSocket *s = (PjUwpSocket*)key->sock;
s->write_userdata = op_key;
return pj_sock_sendto(key->sock, data, length, flags, addr, addrlen);
}
PJ_DEF(pj_status_t) pj_ioqueue_set_concurrency(pj_ioqueue_key_t *key,
pj_bool_t allow)
{
/* Not supported, just return PJ_SUCCESS silently */
PJ_UNUSED_ARG(key);
PJ_UNUSED_ARG(allow);
return PJ_SUCCESS;
}
PJ_DEF(pj_status_t) pj_ioqueue_lock_key(pj_ioqueue_key_t *key)
{
/* Not supported, just return PJ_SUCCESS silently */
PJ_UNUSED_ARG(key);
return PJ_SUCCESS;
}
PJ_DEF(pj_status_t) pj_ioqueue_unlock_key(pj_ioqueue_key_t *key)
{
/* Not supported, just return PJ_SUCCESS silently */
PJ_UNUSED_ARG(key);
return PJ_SUCCESS;
}

File diff suppressed because it is too large Load Diff

View File

@ -18,6 +18,13 @@
*/
#pragma once
#include <pj/assert.h>
#include <pj/sock.h>
#include <pj/string.h>
#include <pj/unicode.h>
enum {
READ_TIMEOUT = 60 * 1000,
WRITE_TIMEOUT = 60 * 1000,
@ -36,6 +43,17 @@ enum PjUwpSocketState {
ref class PjUwpSocketDatagramRecvHelper;
ref class PjUwpSocketListenerHelper;
class PjUwpSocket;
typedef struct PjUwpSocketCallback
{
void (*on_read)(PjUwpSocket *s, int bytes_read);
void (*on_write)(PjUwpSocket *s, int bytes_sent);
void (*on_accept)(PjUwpSocket *s);
void (*on_connect)(PjUwpSocket *s, pj_status_t status);
} PjUwpSocketCallback;
/*
* UWP Socket Wrapper.
@ -44,19 +62,55 @@ class PjUwpSocket
{
public:
PjUwpSocket(int af_, int type_, int proto_);
PjUwpSocket* CreateAcceptSocket(Windows::Networking::Sockets::StreamSocket^ stream_sock_);
virtual ~PjUwpSocket();
pj_status_t InitSocket(enum PjUwpSocketType sock_type_);
void DeinitSocket();
void* GetUserData() { return user_data; }
void SetNonBlocking(const PjUwpSocketCallback *cb_, void *user_data_)
{
is_blocking = PJ_FALSE;
cb=*cb_;
user_data = user_data_;
}
enum PjUwpSocketType GetType() { return sock_type; }
enum PjUwpSocketState GetState() { return sock_state; }
pj_sockaddr* GetLocalAddr() { return &local_addr; }
pj_sockaddr* GetRemoteAddr() { return &remote_addr; }
pj_status_t Bind(const pj_sockaddr_t *addr = NULL);
pj_status_t Send(const void *buf, pj_ssize_t *len);
pj_status_t SendTo(const void *buf, pj_ssize_t *len, const pj_sockaddr_t *to);
pj_status_t Recv(void *buf, pj_ssize_t *len);
pj_status_t RecvFrom(void *buf, pj_ssize_t *len, pj_sockaddr_t *from);
pj_status_t Connect(const pj_sockaddr_t *addr);
pj_status_t Listen();
pj_status_t Accept(PjUwpSocket **new_sock);
void (*on_read)(PjUwpSocket *s, int bytes_read);
void (*on_write)(PjUwpSocket *s, int bytes_sent);
void (*on_accept)(PjUwpSocket *s, pj_status_t status);
void (*on_connect)(PjUwpSocket *s, pj_status_t status);
private:
PjUwpSocket* CreateAcceptSocket(Windows::Networking::Sockets::StreamSocket^ stream_sock_);
pj_status_t SendImp(const void *buf, pj_ssize_t *len);
int ConsumeReadBuffer(void *buf, int max_len);
public:
int af;
int type;
int proto;
pj_sockaddr local_addr;
pj_sockaddr remote_addr;
pj_bool_t is_blocking;
pj_bool_t has_pending_bind;
pj_bool_t has_pending_send;
pj_bool_t has_pending_recv;
void *user_data;
PjUwpSocketCallback cb;
enum PjUwpSocketType sock_type;
enum PjUwpSocketState sock_state;
@ -65,22 +119,15 @@ public:
Windows::Networking::Sockets::StreamSocketListener^ listener_sock;
/* Helper objects */
PjUwpSocketDatagramRecvHelper^ datagram_recv_helper;
PjUwpSocketDatagramRecvHelper^ dgram_recv_helper;
PjUwpSocketListenerHelper^ listener_helper;
Windows::Storage::Streams::DataReader^ socket_reader;
Windows::Storage::Streams::DataWriter^ socket_writer;
Windows::Storage::Streams::IBuffer^ send_buffer;
pj_bool_t is_busy_sending;
void *read_userdata;
void *write_userdata;
void *accept_userdata;
void (*on_read)(PjUwpSocket *s, int bytes_read);
void (*on_write)(PjUwpSocket *s, int bytes_sent);
void (*on_accept)(PjUwpSocket *s, pj_status_t status);
void (*on_connect)(PjUwpSocket *s, pj_status_t status);
friend PjUwpSocketDatagramRecvHelper;
friend PjUwpSocketListenerHelper;
};
@ -92,6 +139,7 @@ inline pj_status_t wstr_addr_to_sockaddr(const wchar_t *waddr,
const wchar_t *wport,
pj_sockaddr_t *sockaddr)
{
#if 0
char tmp_str_buf[PJ_INET6_ADDRSTRLEN+1];
pj_assert(wcslen(waddr) < sizeof(tmp_str_buf));
pj_unicode_to_ansi(waddr, wcslen(waddr), tmp_str_buf, sizeof(tmp_str_buf));
@ -100,6 +148,31 @@ inline pj_status_t wstr_addr_to_sockaddr(const wchar_t *waddr,
pj_sockaddr_parse(pj_AF_UNSPEC(), 0, &remote_host, (pj_sockaddr*)sockaddr);
pj_sockaddr_set_port((pj_sockaddr*)sockaddr, (pj_uint16_t)_wtoi(wport));
return PJ_SUCCESS;
#endif
char tmp_str_buf[PJ_INET6_ADDRSTRLEN+1];
pj_assert(wcslen(waddr) < sizeof(tmp_str_buf));
pj_unicode_to_ansi(waddr, wcslen(waddr), tmp_str_buf, sizeof(tmp_str_buf));
pj_str_t remote_host;
pj_strset(&remote_host, tmp_str_buf, pj_ansi_strlen(tmp_str_buf));
pj_sockaddr *addr = (pj_sockaddr*)sockaddr;
pj_bool_t got_addr = PJ_FALSE;
if (pj_inet_pton(PJ_AF_INET, &remote_host, &addr->ipv4.sin_addr)
== PJ_SUCCESS)
{
addr->addr.sa_family = PJ_AF_INET;
got_addr = PJ_TRUE;
} else if (pj_inet_pton(PJ_AF_INET6, &remote_host, &addr->ipv6.sin6_addr)
== PJ_SUCCESS)
{
addr->addr.sa_family = PJ_AF_INET6;
got_addr = PJ_TRUE;
}
if (!got_addr)
return PJ_EINVAL;
pj_sockaddr_set_port(addr, (pj_uint16_t)_wtoi(wport));
return PJ_SUCCESS;
}

View File

@ -752,6 +752,8 @@ static void transport_detach( pjmedia_transport *tp,
pj_assert(tp);
if (udp->attached) {
int i;
/* Lock the ioqueue keys to make sure that callbacks are
* not executed. See ticket #460 for details.
*/
@ -772,6 +774,13 @@ static void transport_detach( pjmedia_transport *tp,
udp->rtcp_cb = NULL;
udp->user_data = NULL;
/* Cancel any outstanding send */
for (i=0; i<PJ_ARRAY_SIZE(udp->rtp_pending_write); ++i) {
pj_ioqueue_post_completion(udp->rtp_key,
&udp->rtp_pending_write[i].op_key, 0);
}
pj_ioqueue_post_completion(udp->rtcp_key, &udp->rtcp_write_op, 0);
/* Unlock keys */
pj_ioqueue_unlock_key(udp->rtcp_key);
pj_ioqueue_unlock_key(udp->rtp_key);

View File

@ -470,6 +470,7 @@ void MyAppRT::init(IntAccount^ iAcc, IntCall^ iCall)
/* Create transports. */
try {
sipTpConfig->port = 5060;
ep.transportCreate(::pjsip_transport_type_e::PJSIP_TRANSPORT_TCP,
*sipTpConfig);
} catch (pj::Error& e) {
@ -582,7 +583,9 @@ void MyAppRT::registerThread(Platform::String^ name)
bool MyAppRT::isThreadRegistered()
{
return ep.libIsThreadRegistered();
// Some threads are registered using PJLIB API, ep.libIsThreadRegistered() will return false on those threads.
//return ep.libIsThreadRegistered();
return pj_thread_is_registered() != PJ_FALSE;
}
AccountInfo^ MyAppRT::getAccountInfo()