app_queue uses a taskprocessor for device state changes
git-svn-id: https://origsvn.digium.com/svn/asterisk/trunk@115270 65c4cc65-6c06-0410-ace0-fbb531ad65f3
This commit is contained in:
parent
b0b72e89a8
commit
8fdea12f33
116
apps/app_queue.c
116
apps/app_queue.c
|
@ -92,6 +92,7 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
|
||||||
#include "asterisk/astobj2.h"
|
#include "asterisk/astobj2.h"
|
||||||
#include "asterisk/strings.h"
|
#include "asterisk/strings.h"
|
||||||
#include "asterisk/global_datastores.h"
|
#include "asterisk/global_datastores.h"
|
||||||
|
#include "asterisk/taskprocessor.h"
|
||||||
|
|
||||||
/*!
|
/*!
|
||||||
* \par Please read before modifying this file.
|
* \par Please read before modifying this file.
|
||||||
|
@ -131,6 +132,8 @@ static const struct strategy {
|
||||||
{ QUEUE_STRATEGY_WRANDOM, "wrandom"},
|
{ QUEUE_STRATEGY_WRANDOM, "wrandom"},
|
||||||
};
|
};
|
||||||
|
|
||||||
|
static struct ast_taskprocessor *devicestate_tps;
|
||||||
|
|
||||||
#define DEFAULT_RETRY 5
|
#define DEFAULT_RETRY 5
|
||||||
#define DEFAULT_TIMEOUT 15
|
#define DEFAULT_TIMEOUT 15
|
||||||
#define RECHECK 1 /*!< Recheck every second to see we we're at the top yet */
|
#define RECHECK 1 /*!< Recheck every second to see we we're at the top yet */
|
||||||
|
@ -739,18 +742,20 @@ static int update_status(const char *interface, const int status)
|
||||||
}
|
}
|
||||||
|
|
||||||
/*! \brief set a member's status based on device state of that member's interface*/
|
/*! \brief set a member's status based on device state of that member's interface*/
|
||||||
static void *handle_statechange(struct statechange *sc)
|
static int handle_statechange(void *datap)
|
||||||
{
|
{
|
||||||
struct member_interface *curint;
|
struct member_interface *curint;
|
||||||
char *loc;
|
char *loc;
|
||||||
char *technology;
|
char *technology;
|
||||||
|
struct statechange *sc = datap;
|
||||||
|
|
||||||
technology = ast_strdupa(sc->dev);
|
technology = ast_strdupa(sc->dev);
|
||||||
loc = strchr(technology, '/');
|
loc = strchr(technology, '/');
|
||||||
if (loc) {
|
if (loc) {
|
||||||
*loc++ = '\0';
|
*loc++ = '\0';
|
||||||
} else {
|
} else {
|
||||||
return NULL;
|
ast_free(sc);
|
||||||
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
AST_LIST_LOCK(&interfaces);
|
AST_LIST_LOCK(&interfaces);
|
||||||
|
@ -770,84 +775,14 @@ static void *handle_statechange(struct statechange *sc)
|
||||||
if (!curint) {
|
if (!curint) {
|
||||||
if (option_debug > 2)
|
if (option_debug > 2)
|
||||||
ast_log(LOG_DEBUG, "Device '%s/%s' changed to state '%d' (%s) but we don't care because they're not a member of any queue.\n", technology, loc, sc->state, devstate2str(sc->state));
|
ast_log(LOG_DEBUG, "Device '%s/%s' changed to state '%d' (%s) but we don't care because they're not a member of any queue.\n", technology, loc, sc->state, devstate2str(sc->state));
|
||||||
return NULL;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (option_debug)
|
if (option_debug)
|
||||||
ast_log(LOG_DEBUG, "Device '%s/%s' changed to state '%d' (%s)\n", technology, loc, sc->state, devstate2str(sc->state));
|
ast_log(LOG_DEBUG, "Device '%s/%s' changed to state '%d' (%s)\n", technology, loc, sc->state, devstate2str(sc->state));
|
||||||
|
|
||||||
update_status(sc->dev, sc->state);
|
update_status(sc->dev, sc->state);
|
||||||
|
ast_free(sc);
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
/*! \brief Data used by the device state thread */
|
|
||||||
static struct {
|
|
||||||
/*! Set to 1 to stop the thread */
|
|
||||||
unsigned int stop:1;
|
|
||||||
/*! The device state monitoring thread */
|
|
||||||
pthread_t thread;
|
|
||||||
/*! Lock for the state change queue */
|
|
||||||
ast_mutex_t lock;
|
|
||||||
/*! Condition for the state change queue */
|
|
||||||
ast_cond_t cond;
|
|
||||||
/*! Queue of state changes */
|
|
||||||
AST_LIST_HEAD_NOLOCK(, statechange) state_change_q;
|
|
||||||
} device_state = {
|
|
||||||
.thread = AST_PTHREADT_NULL,
|
|
||||||
};
|
|
||||||
|
|
||||||
/*! \brief Consumer of the statechange queue */
|
|
||||||
static void *device_state_thread(void *data)
|
|
||||||
{
|
|
||||||
struct statechange *sc = NULL;
|
|
||||||
|
|
||||||
while (!device_state.stop) {
|
|
||||||
ast_mutex_lock(&device_state.lock);
|
|
||||||
if (!(sc = AST_LIST_REMOVE_HEAD(&device_state.state_change_q, entry))) {
|
|
||||||
ast_cond_wait(&device_state.cond, &device_state.lock);
|
|
||||||
sc = AST_LIST_REMOVE_HEAD(&device_state.state_change_q, entry);
|
|
||||||
}
|
|
||||||
ast_mutex_unlock(&device_state.lock);
|
|
||||||
|
|
||||||
/* Check to see if we were woken up to see the request to stop */
|
|
||||||
if (device_state.stop)
|
|
||||||
break;
|
|
||||||
|
|
||||||
if (!sc)
|
|
||||||
continue;
|
|
||||||
|
|
||||||
handle_statechange(sc);
|
|
||||||
|
|
||||||
ast_free(sc);
|
|
||||||
sc = NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (sc)
|
|
||||||
ast_free(sc);
|
|
||||||
|
|
||||||
while ((sc = AST_LIST_REMOVE_HEAD(&device_state.state_change_q, entry)))
|
|
||||||
ast_free(sc);
|
|
||||||
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
/*! \brief Producer of the statechange queue */
|
|
||||||
static int statechange_queue(const char *dev, enum ast_device_state state)
|
|
||||||
{
|
|
||||||
struct statechange *sc;
|
|
||||||
|
|
||||||
if (!(sc = ast_calloc(1, sizeof(*sc) + strlen(dev) + 1)))
|
|
||||||
return 0;
|
|
||||||
|
|
||||||
sc->state = state;
|
|
||||||
strcpy(sc->dev, dev);
|
|
||||||
|
|
||||||
ast_mutex_lock(&device_state.lock);
|
|
||||||
AST_LIST_INSERT_TAIL(&device_state.state_change_q, sc, entry);
|
|
||||||
ast_cond_signal(&device_state.cond);
|
|
||||||
ast_mutex_unlock(&device_state.lock);
|
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -855,6 +790,8 @@ static void device_state_cb(const struct ast_event *event, void *unused)
|
||||||
{
|
{
|
||||||
enum ast_device_state state;
|
enum ast_device_state state;
|
||||||
const char *device;
|
const char *device;
|
||||||
|
struct statechange *sc;
|
||||||
|
size_t datapsize;
|
||||||
|
|
||||||
state = ast_event_get_ie_uint(event, AST_EVENT_IE_STATE);
|
state = ast_event_get_ie_uint(event, AST_EVENT_IE_STATE);
|
||||||
device = ast_event_get_ie_str(event, AST_EVENT_IE_DEVICE);
|
device = ast_event_get_ie_str(event, AST_EVENT_IE_DEVICE);
|
||||||
|
@ -863,8 +800,16 @@ static void device_state_cb(const struct ast_event *event, void *unused)
|
||||||
ast_log(LOG_ERROR, "Received invalid event that had no device IE\n");
|
ast_log(LOG_ERROR, "Received invalid event that had no device IE\n");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
datapsize = sizeof(*sc) + strlen(device) + 1;
|
||||||
statechange_queue(device, state);
|
if (!(sc = ast_calloc(1, datapsize))) {
|
||||||
|
ast_log(LOG_ERROR, "failed to calloc a state change struct\n");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
sc->state = state;
|
||||||
|
strcpy(sc->dev, device);
|
||||||
|
if (ast_taskprocessor_push(devicestate_tps, handle_statechange, sc) < 0) {
|
||||||
|
ast_free(sc);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/*! \brief allocate space for new queue member and set fields based on parameters passed */
|
/*! \brief allocate space for new queue member and set fields based on parameters passed */
|
||||||
|
@ -6249,14 +6194,6 @@ static int unload_module(void)
|
||||||
struct ao2_iterator q_iter;
|
struct ao2_iterator q_iter;
|
||||||
struct call_queue *q = NULL;
|
struct call_queue *q = NULL;
|
||||||
|
|
||||||
if (device_state.thread != AST_PTHREADT_NULL) {
|
|
||||||
device_state.stop = 1;
|
|
||||||
ast_mutex_lock(&device_state.lock);
|
|
||||||
ast_cond_signal(&device_state.cond);
|
|
||||||
ast_mutex_unlock(&device_state.lock);
|
|
||||||
pthread_join(device_state.thread, NULL);
|
|
||||||
}
|
|
||||||
|
|
||||||
ast_cli_unregister_multiple(cli_queue, sizeof(cli_queue) / sizeof(struct ast_cli_entry));
|
ast_cli_unregister_multiple(cli_queue, sizeof(cli_queue) / sizeof(struct ast_cli_entry));
|
||||||
res = ast_manager_unregister("QueueStatus");
|
res = ast_manager_unregister("QueueStatus");
|
||||||
res |= ast_manager_unregister("Queues");
|
res |= ast_manager_unregister("Queues");
|
||||||
|
@ -6296,7 +6233,7 @@ static int unload_module(void)
|
||||||
queue_unref(q);
|
queue_unref(q);
|
||||||
}
|
}
|
||||||
ao2_ref(queues, -1);
|
ao2_ref(queues, -1);
|
||||||
|
devicestate_tps = ast_taskprocessor_unreference(devicestate_tps);
|
||||||
return res;
|
return res;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -6319,10 +6256,6 @@ static int load_module(void)
|
||||||
if (queue_persistent_members)
|
if (queue_persistent_members)
|
||||||
reload_queue_members();
|
reload_queue_members();
|
||||||
|
|
||||||
ast_mutex_init(&device_state.lock);
|
|
||||||
ast_cond_init(&device_state.cond, NULL);
|
|
||||||
ast_pthread_create(&device_state.thread, NULL, device_state_thread, NULL);
|
|
||||||
|
|
||||||
ast_cli_register_multiple(cli_queue, sizeof(cli_queue) / sizeof(struct ast_cli_entry));
|
ast_cli_register_multiple(cli_queue, sizeof(cli_queue) / sizeof(struct ast_cli_entry));
|
||||||
res = ast_register_application(app, queue_exec, synopsis, descrip);
|
res = ast_register_application(app, queue_exec, synopsis, descrip);
|
||||||
res |= ast_register_application(app_aqm, aqm_exec, app_aqm_synopsis, app_aqm_descrip);
|
res |= ast_register_application(app_aqm, aqm_exec, app_aqm_synopsis, app_aqm_descrip);
|
||||||
|
@ -6345,6 +6278,11 @@ static int load_module(void)
|
||||||
res |= ast_custom_function_register(&queuememberlist_function);
|
res |= ast_custom_function_register(&queuememberlist_function);
|
||||||
res |= ast_custom_function_register(&queuewaitingcount_function);
|
res |= ast_custom_function_register(&queuewaitingcount_function);
|
||||||
res |= ast_custom_function_register(&queuememberpenalty_function);
|
res |= ast_custom_function_register(&queuememberpenalty_function);
|
||||||
|
|
||||||
|
if (!(devicestate_tps = ast_taskprocessor_get("app_queue", 0))) {
|
||||||
|
ast_log(LOG_WARNING, "devicestate taskprocessor reference failed - devicestate notifications will not occur\n");
|
||||||
|
}
|
||||||
|
|
||||||
if (!(device_state_sub = ast_event_subscribe(AST_EVENT_DEVICE_STATE, device_state_cb, NULL, AST_EVENT_IE_END)))
|
if (!(device_state_sub = ast_event_subscribe(AST_EVENT_DEVICE_STATE, device_state_cb, NULL, AST_EVENT_IE_END)))
|
||||||
res = -1;
|
res = -1;
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue