Committed ticket #337: Ability to restart PJSIP UDP transport

git-svn-id: https://svn.pjsip.org/repos/pjproject/trunk@1382 74dad513-b988-da41-8d7b-12977e46ad98
This commit is contained in:
Benny Prijono 2007-06-22 11:32:49 +00:00
parent dfdfc21f9f
commit dfc15beb2e
5 changed files with 488 additions and 98 deletions

View File

@ -102,6 +102,10 @@ SOURCE=..\src\samples\footprint.c
# End Source File
# Begin Source File
SOURCE=..\src\samples\invtester.c
# End Source File
# Begin Source File
SOURCE=..\src\samples\level.c
# End Source File
# Begin Source File
@ -168,6 +172,10 @@ SOURCE=..\src\samples\streamutil.c
SOURCE=..\src\samples\tonegen.c
# End Source File
# Begin Source File
SOURCE=..\src\samples\transportpausetest.c
# End Source File
# End Group
# Begin Group "Header Files"

View File

@ -222,7 +222,13 @@ PJ_BEGIN_DECL
* the specified destination.
*/
#define PJSIP_ETPNOTSUITABLE (PJSIP_ERRNO_START_PJSIP + 64) /* 171064 */
/**
* @hideinitializer
* Transport not available. This error occurs for example when the SIP stack
* is trying to use a SIP transport while the transport is being paused by
* application.
*/
#define PJSIP_ETPNOTAVAIL (PJSIP_ERRNO_START_PJSIP + 65) /* 171065 */
/************************************************************
* TRANSACTION ERRORS

View File

@ -37,6 +37,27 @@ PJ_BEGIN_DECL
* the transport to the framework.
*/
/**
* Flag that can be specified when calling #pjsip_udp_transport_pause() or
* #pjsip_udp_transport_restart().
*/
enum
{
/**
* This flag tells the transport to keep the existing/internal socket
* handle.
*/
PJSIP_UDP_TRANSPORT_KEEP_SOCKET = 1,
/**
* This flag tells the transport to destroy the existing/internal socket
* handle. Naturally this flag and PJSIP_UDP_TRANSPORT_KEEP_SOCKET are
* mutually exclusive.
*/
PJSIP_UDP_TRANSPORT_DESTROY_SOCKET = 2
};
/**
* Start UDP transport.
*
@ -81,6 +102,96 @@ PJ_DECL(pj_status_t) pjsip_udp_transport_attach(pjsip_endpoint *endpt,
pjsip_transport **p_transport);
/**
* Retrieve the internal socket handle used by the UDP transport. Note
* that this socket normally is registered to ioqueue, so if application
* wants to make use of this socket, it should temporarily pause the
* transport.
*
* @param transport The UDP transport.
*
* @return The socket handle, or PJ_INVALID_SOCKET if no socket
* is currently being used (for example, when transport
* is being paused).
*/
PJ_DECL(pj_sock_t) pjsip_udp_transport_get_socket(pjsip_transport *transport);
/**
* Temporarily pause or shutdown the transport. When transport is being
* paused, it cannot be used by the SIP stack to send or receive SIP
* messages.
*
* Two types of operations are supported by this function:
* - to temporarily make this transport unavailable for SIP uses, but
* otherwise keep the socket handle intact. Application then can
* retrieve the socket handle with #pjsip_udp_transport_get_socket()
* and use it to send/receive application data (for example, STUN
* messages). In this case, application should specify
* PJSIP_UDP_TRANSPORT_KEEP_SOCKET when calling this function, and
* also to specify this flag when calling #pjsip_udp_transport_restart()
* later.
* - to temporarily shutdown the transport, including closing down
* the internal socket handle. This is useful for example to
* temporarily suspend the application for an indefinite period. In
* this case, application should specify PJSIP_UDP_TRANSPORT_DESTROY_SOCKET
* flag when calling this function, and specify a new socket when
* calling #pjsip_udp_transport_restart().
*
* @param transport The UDP transport.
* @param option Pause option.
*
* @return PJ_SUCCESS if transport is paused successfully,
* or the appropriate error code.
*/
PJ_DECL(pj_status_t) pjsip_udp_transport_pause(pjsip_transport *transport,
unsigned option);
/**
* Restart the transport. Several operations are supported by this function:
* - if transport was made temporarily unavailable to SIP stack with
* pjsip_udp_transport_pause() and PJSIP_UDP_TRANSPORT_KEEP_SOCKET,
* application can make the transport available to the SIP stack
* again, by specifying PJSIP_UDP_TRANSPORT_KEEP_SOCKET flag here.
* - if application wants to replace the internal socket with a new
* socket, it must specify PJSIP_UDP_TRANSPORT_DESTROY_SOCKET when
* calling this function, so that the internal socket will be destroyed
* if it hasn't been closed. In this case, application has two choices
* on how to create the new socket: 1) to let the transport create
* the new socket, in this case the \a sock option should be set
* to \a PJ_INVALID_SOCKET and optionally the \a local parameter can be
* filled with the desired address and port where the new socket
* should be bound to, or 2) to specify its own socket to be used
* by this transport, by specifying a valid socket in \a sock argument
* and set the \a local argument to NULL. In both cases, application
* may specify the published address of the socket in \a a_name
* argument.
*
* @param transport The UDP transport.
* @param option Restart option.
* @param sock Optional socket to be used by the transport.
* @param local The address where the socket should be bound to.
* If this argument is NULL, socket will be bound
* to any available port.
* @param a_name Optionally specify the published address for
* this transport. If the socket is not replaced
* (PJSIP_UDP_TRANSPORT_KEEP_SOCKET flag is
* specified), then if this argument is NULL, the
* previous value will be used. If the socket is
* replaced and this argument is NULL, the bound
* address will be used as the published address
* of the transport.
*
* @return PJ_SUCCESS if transport can be restarted, or
* the appropriate error code.
*/
PJ_DECL(pj_status_t) pjsip_udp_transport_restart(pjsip_transport *transport,
unsigned option,
pj_sock_t sock,
const pj_sockaddr_in *local,
const pjsip_host_port *a_name);
PJ_END_DECL
/**

View File

@ -68,6 +68,7 @@ static const struct
PJ_BUILD_ERR( PJSIP_ERXOVERFLOW, "Rx buffer overflow"),
PJ_BUILD_ERR( PJSIP_EBUFDESTROYED, "Buffer destroyed"),
PJ_BUILD_ERR( PJSIP_ETPNOTSUITABLE, "Unsuitable transport selected"),
PJ_BUILD_ERR( PJSIP_ETPNOTAVAIL, "Transport not available for use"),
/* Transaction errors */
PJ_BUILD_ERR( PJSIP_ETSXDESTROYED, "Transaction has been destroyed"),

View File

@ -67,6 +67,7 @@ struct udp_transport
int rdata_cnt;
pjsip_rx_data **rdata;
int is_closing;
pj_bool_t is_paused;
};
@ -122,6 +123,10 @@ static void udp_on_read_complete( pj_ioqueue_key_t *key,
return;
}
/* Don't do anything if transport is being paused. */
if (tp->is_paused)
return;
/*
* The idea of the loop is to process immediate data received by
* pj_ioqueue_recvfrom(), as long as i < MAX_IMMEDIATE_PACKET. When
@ -205,6 +210,13 @@ static void udp_on_read_complete( pj_ioqueue_key_t *key,
op_key = &rdata->tp_info.op_key.op_key;
}
/* Only read next packet if transport is not being paused. This
* check handles the case where transport is paused while endpoint
* is still processing a SIP message.
*/
if (tp->is_paused)
return;
/* Read next packet. */
bytes_read = sizeof(rdata->pkt_info.packet);
rdata->pkt_info.src_addr_len = sizeof(rdata->pkt_info.src_addr);
@ -295,6 +307,10 @@ static pj_status_t udp_send_msg( pjsip_transport *transport,
PJ_ASSERT_RETURN(transport && tdata, PJ_EINVAL);
PJ_ASSERT_RETURN(tdata->op_key.tdata == NULL, PJSIP_EPENDINGTX);
/* Return error if transport is paused */
if (tp->is_paused)
return PJSIP_ETPNOTAVAIL;
/* Init op key. */
tdata->op_key.tdata = tdata;
tdata->op_key.token = token;
@ -395,29 +411,100 @@ static pj_status_t udp_shutdown(pjsip_transport *transport)
}
/*
* pjsip_udp_transport_attach()
*
* Attach UDP socket and start transport.
*/
PJ_DEF(pj_status_t) pjsip_udp_transport_attach( pjsip_endpoint *endpt,
pj_sock_t sock,
const pjsip_host_port *a_name,
unsigned async_cnt,
pjsip_transport **p_transport)
/* Create socket */
static pj_status_t create_socket(const pj_sockaddr_in *local_a,
pj_sock_t *p_sock)
{
enum { M = 80 };
pj_pool_t *pool;
struct udp_transport *tp;
pj_ioqueue_t *ioqueue;
pj_ioqueue_callback ioqueue_cb;
long sobuf_size;
unsigned i;
pj_sock_t sock;
pj_sockaddr_in tmp_addr;
pj_status_t status;
PJ_ASSERT_RETURN(endpt && sock!=PJ_INVALID_SOCKET && a_name && async_cnt>0,
PJ_EINVAL);
status = pj_sock_socket(PJ_AF_INET, PJ_SOCK_DGRAM, 0, &sock);
if (status != PJ_SUCCESS)
return status;
if (local_a == NULL) {
pj_sockaddr_in_init(&tmp_addr, NULL, 0);
local_a = &tmp_addr;
}
status = pj_sock_bind(sock, local_a, sizeof(*local_a));
if (status != PJ_SUCCESS) {
pj_sock_close(sock);
return status;
}
*p_sock = sock;
return PJ_SUCCESS;
}
/* Generate transport's published address */
static pj_status_t get_published_name(pj_sock_t sock,
char hostbuf[],
pjsip_host_port *bound_name)
{
pj_sockaddr_in tmp_addr;
int addr_len;
pj_status_t status;
addr_len = sizeof(tmp_addr);
status = pj_sock_getsockname(sock, &tmp_addr, &addr_len);
if (status != PJ_SUCCESS)
return status;
bound_name->host.ptr = hostbuf;
bound_name->port = pj_ntohs(tmp_addr.sin_port);
/* If bound address specifies "0.0.0.0", get the IP address
* of local hostname.
*/
if (tmp_addr.sin_addr.s_addr == PJ_INADDR_ANY) {
pj_in_addr hostip;
status = pj_gethostip(&hostip);
if (status != PJ_SUCCESS)
return status;
pj_strcpy2(&bound_name->host, pj_inet_ntoa(hostip));
} else {
/* Otherwise use bound address. */
pj_strcpy2(&bound_name->host, pj_inet_ntoa(tmp_addr.sin_addr));
}
return PJ_SUCCESS;
}
/* Set the published address of the transport */
static void udp_set_pub_name(struct udp_transport *tp,
const pjsip_host_port *a_name)
{
enum { INFO_LEN = 80 };
pj_assert(a_name->host.slen != 0);
pj_strdup_with_null(tp->base.pool, &tp->base.local_name.host,
&a_name->host);
tp->base.local_name.port = a_name->port;
/* Update transport info. */
if (tp->base.info == NULL) {
tp->base.info = (char*) pj_pool_alloc(tp->base.pool, INFO_LEN);
}
pj_ansi_snprintf(
tp->base.info, INFO_LEN, "udp %s:%d [published as %s:%d]",
pj_inet_ntoa(((pj_sockaddr_in*)&tp->base.local_addr)->sin_addr),
pj_ntohs(((pj_sockaddr_in*)&tp->base.local_addr)->sin_port),
tp->base.local_name.host.ptr,
tp->base.local_name.port);
}
/* Set the socket handle of the transport */
static void udp_set_socket(struct udp_transport *tp,
pj_sock_t sock,
const pjsip_host_port *a_name)
{
long sobuf_size;
pj_status_t status;
/* Adjust socket rcvbuf size */
sobuf_size = PJSIP_UDP_SO_RCVBUF_SIZE;
@ -441,6 +528,83 @@ PJ_DEF(pj_status_t) pjsip_udp_transport_attach( pjsip_endpoint *endpt,
status));
}
/* Set the socket. */
tp->sock = sock;
/* Init address name (published address) */
udp_set_pub_name(tp, a_name);
}
/* Register socket to ioqueue */
static pj_status_t register_to_ioqueue(struct udp_transport *tp)
{
pj_ioqueue_t *ioqueue;
pj_ioqueue_callback ioqueue_cb;
/* Register to ioqueue. */
ioqueue = pjsip_endpt_get_ioqueue(tp->base.endpt);
pj_memset(&ioqueue_cb, 0, sizeof(ioqueue_cb));
ioqueue_cb.on_read_complete = &udp_on_read_complete;
ioqueue_cb.on_write_complete = &udp_on_write_complete;
return pj_ioqueue_register_sock(tp->base.pool, ioqueue, tp->sock, tp,
&ioqueue_cb, &tp->key);
}
/* Start ioqueue asynchronous reading to all rdata */
static pj_status_t start_async_read(struct udp_transport *tp)
{
pj_ioqueue_t *ioqueue;
int i;
pj_status_t status;
ioqueue = pjsip_endpt_get_ioqueue(tp->base.endpt);
/* Start reading the ioqueue. */
for (i=0; i<tp->rdata_cnt; ++i) {
pj_ssize_t size;
size = sizeof(tp->rdata[i]->pkt_info.packet);
tp->rdata[i]->pkt_info.src_addr_len = sizeof(tp->rdata[i]->pkt_info.src_addr);
status = pj_ioqueue_recvfrom(tp->key,
&tp->rdata[i]->tp_info.op_key.op_key,
tp->rdata[i]->pkt_info.packet,
&size, PJ_IOQUEUE_ALWAYS_ASYNC,
&tp->rdata[i]->pkt_info.src_addr,
&tp->rdata[i]->pkt_info.src_addr_len);
if (status == PJ_SUCCESS) {
pj_assert(!"Shouldn't happen because PJ_IOQUEUE_ALWAYS_ASYNC!");
udp_on_read_complete(tp->key, &tp->rdata[i]->tp_info.op_key.op_key,
size);
} else if (status != PJ_EPENDING) {
/* Error! */
return status;
}
}
return PJ_SUCCESS;
}
/*
* pjsip_udp_transport_attach()
*
* Attach UDP socket and start transport.
*/
PJ_DEF(pj_status_t) pjsip_udp_transport_attach( pjsip_endpoint *endpt,
pj_sock_t sock,
const pjsip_host_port *a_name,
unsigned async_cnt,
pjsip_transport **p_transport)
{
pj_pool_t *pool;
struct udp_transport *tp;
unsigned i;
pj_status_t status;
PJ_ASSERT_RETURN(endpt && sock!=PJ_INVALID_SOCKET && a_name && async_cnt>0,
PJ_EINVAL);
/* Create pool. */
pool = pjsip_endpt_create_pool(endpt, "udp%p", PJSIP_POOL_LEN_TRANSPORT,
PJSIP_POOL_INC_TRANSPORT);
@ -489,38 +653,20 @@ PJ_DEF(pj_status_t) pjsip_udp_transport_attach( pjsip_endpoint *endpt,
if (status != PJ_SUCCESS)
goto on_error;
/* Init address name (published address) */
pj_strdup_with_null(pool, &tp->base.local_name.host, &a_name->host);
tp->base.local_name.port = a_name->port;
/* Init remote name. */
tp->base.remote_name.host = pj_str("0.0.0.0");
tp->base.remote_name.port = 0;
/* Transport info. */
tp->base.info = (char*) pj_pool_alloc(pool, M);
pj_ansi_snprintf(
tp->base.info, M, "udp %s:%d [published as %s:%d]",
pj_inet_ntoa(((pj_sockaddr_in*)&tp->base.local_addr)->sin_addr),
pj_ntohs(((pj_sockaddr_in*)&tp->base.local_addr)->sin_port),
tp->base.local_name.host.ptr,
tp->base.local_name.port);
/* Set endpoint. */
tp->base.endpt = endpt;
/* Transport manager and timer will be initialized by tpmgr */
/* Attach socket. */
tp->sock = sock;
/* Attach socket and assign name. */
udp_set_socket(tp, sock, a_name);
/* Register to ioqueue. */
ioqueue = pjsip_endpt_get_ioqueue(endpt);
pj_memset(&ioqueue_cb, 0, sizeof(ioqueue_cb));
ioqueue_cb.on_read_complete = &udp_on_read_complete;
ioqueue_cb.on_write_complete = &udp_on_write_complete;
status = pj_ioqueue_register_sock(pool, ioqueue, tp->sock, tp,
&ioqueue_cb, &tp->key);
/* Register to ioqueue */
status = register_to_ioqueue(tp);
if (status != PJ_SUCCESS)
goto on_error;
@ -562,26 +708,10 @@ PJ_DEF(pj_status_t) pjsip_udp_transport_attach( pjsip_endpoint *endpt,
}
/* Start reading the ioqueue. */
for (i=0; i<async_cnt; ++i) {
pj_ssize_t size;
size = sizeof(tp->rdata[i]->pkt_info.packet);
tp->rdata[i]->pkt_info.src_addr_len = sizeof(tp->rdata[i]->pkt_info.src_addr);
status = pj_ioqueue_recvfrom(tp->key,
&tp->rdata[i]->tp_info.op_key.op_key,
tp->rdata[i]->pkt_info.packet,
&size, PJ_IOQUEUE_ALWAYS_ASYNC,
&tp->rdata[i]->pkt_info.src_addr,
&tp->rdata[i]->pkt_info.src_addr_len);
if (status == PJ_SUCCESS) {
pj_assert(!"Shouldn't happen because PJ_IOQUEUE_ALWAYS_ASYNC!");
udp_on_read_complete(tp->key, &tp->rdata[i]->tp_info.op_key.op_key,
size);
} else if (status != PJ_EPENDING) {
/* Error! */
pjsip_transport_destroy(&tp->base);
return status;
}
status = start_async_read(tp);
if (status != PJ_SUCCESS) {
pjsip_transport_destroy(&tp->base);
return status;
}
/* Done. */
@ -615,59 +745,25 @@ PJ_DEF(pj_status_t) pjsip_udp_transport_start( pjsip_endpoint *endpt,
pj_sock_t sock;
pj_status_t status;
char addr_buf[16];
pj_sockaddr_in tmp_addr;
pjsip_host_port bound_name;
PJ_ASSERT_RETURN(endpt && async_cnt, PJ_EINVAL);
status = pj_sock_socket(PJ_AF_INET, PJ_SOCK_DGRAM, 0, &sock);
status = create_socket(local_a, &sock);
if (status != PJ_SUCCESS)
return status;
if (local_a == NULL) {
pj_sockaddr_in_init(&tmp_addr, NULL, 0);
local_a = &tmp_addr;
}
status = pj_sock_bind(sock, local_a, sizeof(*local_a));
if (status != PJ_SUCCESS) {
pj_sock_close(sock);
return status;
}
if (a_name == NULL) {
/* Address name is not specified.
* Build a name based on bound address.
*/
int addr_len;
addr_len = sizeof(tmp_addr);
status = pj_sock_getsockname(sock, &tmp_addr, &addr_len);
status = get_published_name(sock, addr_buf, &bound_name);
if (status != PJ_SUCCESS) {
pj_sock_close(sock);
return status;
}
a_name = &bound_name;
bound_name.host.ptr = addr_buf;
bound_name.port = pj_ntohs(tmp_addr.sin_port);
/* If bound address specifies "0.0.0.0", get the IP address
* of local hostname.
*/
if (tmp_addr.sin_addr.s_addr == PJ_INADDR_ANY) {
pj_in_addr hostip;
status = pj_gethostip(&hostip);
if (status != PJ_SUCCESS)
return status;
pj_strcpy2(&bound_name.host, pj_inet_ntoa(hostip));
} else {
/* Otherwise use bound address. */
pj_strcpy2(&bound_name.host, pj_inet_ntoa(tmp_addr.sin_addr));
}
}
return pjsip_udp_transport_attach( endpt, sock, a_name, async_cnt,
@ -675,3 +771,171 @@ PJ_DEF(pj_status_t) pjsip_udp_transport_start( pjsip_endpoint *endpt,
}
/*
* Retrieve the internal socket handle used by the UDP transport.
*/
PJ_DEF(pj_sock_t) pjsip_udp_transport_get_socket(pjsip_transport *transport)
{
struct udp_transport *tp;
PJ_ASSERT_RETURN(transport != NULL, PJ_INVALID_SOCKET);
tp = (struct udp_transport*) transport;
return tp->sock;
}
/*
* Temporarily pause or shutdown the transport.
*/
PJ_DEF(pj_status_t) pjsip_udp_transport_pause(pjsip_transport *transport,
unsigned option)
{
struct udp_transport *tp;
unsigned i;
PJ_ASSERT_RETURN(transport != NULL, PJ_EINVAL);
/* Flag must be specified */
PJ_ASSERT_RETURN((option & 0x03) != 0, PJ_EINVAL);
tp = (struct udp_transport*) transport;
/* Transport must not have been paused */
PJ_ASSERT_RETURN(tp->is_paused==0, PJ_EINVALIDOP);
/* Set transport to paused first, so that when the read callback is
* called by pj_ioqueue_post_completion() it will not try to
* re-register the rdata.
*/
tp->is_paused = PJ_TRUE;
/* Cancel the ioqueue operation. */
for (i=0; i<(unsigned)tp->rdata_cnt; ++i) {
pj_ioqueue_post_completion(tp->key,
&tp->rdata[i]->tp_info.op_key.op_key, -1);
}
/* Destroy the socket? */
if (option & PJSIP_UDP_TRANSPORT_DESTROY_SOCKET) {
if (tp->key) {
/* This implicitly closes the socket */
pj_ioqueue_unregister(tp->key);
tp->key = NULL;
} else {
/* Close socket. */
if (tp->sock && tp->sock != PJ_INVALID_SOCKET) {
pj_sock_close(tp->sock);
tp->sock = PJ_INVALID_SOCKET;
}
}
tp->sock = PJ_INVALID_SOCKET;
}
PJ_LOG(4,(tp->base.obj_name, "SIP UDP transport paused"));
return PJ_SUCCESS;
}
/*
* Restart transport.
*
* If option is KEEP_SOCKET, just re-activate ioqueue operation.
*
* If option is DESTROY_SOCKET:
* - if socket is specified, replace.
* - if socket is not specified, create and replace.
*/
PJ_DEF(pj_status_t) pjsip_udp_transport_restart(pjsip_transport *transport,
unsigned option,
pj_sock_t sock,
const pj_sockaddr_in *local,
const pjsip_host_port *a_name)
{
struct udp_transport *tp;
pj_status_t status;
PJ_ASSERT_RETURN(transport != NULL, PJ_EINVAL);
/* Flag must be specified */
PJ_ASSERT_RETURN((option & 0x03) != 0, PJ_EINVAL);
tp = (struct udp_transport*) transport;
if (option & PJSIP_UDP_TRANSPORT_DESTROY_SOCKET) {
char addr_buf[16];
pjsip_host_port bound_name;
/* Request to recreate transport */
/* Destroy existing socket, if any. */
if (tp->key) {
/* This implicitly closes the socket */
pj_ioqueue_unregister(tp->key);
tp->key = NULL;
} else {
/* Close socket. */
if (tp->sock && tp->sock != PJ_INVALID_SOCKET) {
pj_sock_close(tp->sock);
tp->sock = PJ_INVALID_SOCKET;
}
}
tp->sock = PJ_INVALID_SOCKET;
/* Create the socket if it's not specified */
if (sock == PJ_INVALID_SOCKET) {
status = create_socket(local, &sock);
if (status != PJ_SUCCESS)
return status;
}
/* If transport published name is not specified, calculate it
* from the bound address.
*/
if (a_name == NULL) {
status = get_published_name(sock, addr_buf, &bound_name);
if (status != PJ_SUCCESS) {
pj_sock_close(sock);
return status;
}
a_name = &bound_name;
}
/* Assign the socket and published address to transport. */
udp_set_socket(tp, sock, a_name);
} else {
/* For KEEP_SOCKET, transport must have been paused before */
PJ_ASSERT_RETURN(tp->is_paused, PJ_EINVALIDOP);
/* If address name is specified, update it */
if (a_name != NULL)
udp_set_pub_name(tp, a_name);
}
/* Re-register new or existing socket to ioqueue. */
status = register_to_ioqueue(tp);
if (status != PJ_SUCCESS) {
return status;
}
/* Restart async read operation. */
status = start_async_read(tp);
if (status != PJ_SUCCESS)
return status;
/* Everything has been set up */
tp->is_paused = PJ_FALSE;
PJ_LOG(4,(tp->base.obj_name,
"SIP UDP transport restarted, published address is %.*s:%d",
(int)tp->base.local_name.host.slen,
tp->base.local_name.host.ptr,
tp->base.local_name.port));
return PJ_SUCCESS;
}