asterisk/main/manager_channels.c
David M. Lee 2de42c2a25 Multiple revisions 399887,400138,400178,400180-400181
........
  r399887 | dlee | 2013-09-26 10:41:47 -0500 (Thu, 26 Sep 2013) | 1 line
  
  Minor performance bump by not allocate manager variable struct if we don't need it
........
  r400138 | dlee | 2013-09-30 10:24:00 -0500 (Mon, 30 Sep 2013) | 23 lines
  
  Stasis performance improvements
  
  This patch addresses several performance problems that were found in
  the initial performance testing of Asterisk 12.
  
  The Stasis dispatch object was allocated as an AO2 object, even though
  it has a very confined lifecycle. This was replaced with a straight
  ast_malloc().
  
  The Stasis message router was spending an inordinate amount of time
  searching hash tables. In this case, most of our routers had 6 or
  fewer routes in them to begin with. This was replaced with an array
  that's searched linearly for the route.
  
  We more heavily rely on AO2 objects in Asterisk 12, and the memset()
  in ao2_ref() actually became noticeable on the profile. This was
  #ifdef'ed to only run when AO2_DEBUG was enabled.
  
  After being misled by an erroneous comment in taskprocessor.c during
  profiling, the wrong comment was removed.
  
  Review: https://reviewboard.asterisk.org/r/2873/
........
  r400178 | dlee | 2013-09-30 13:26:27 -0500 (Mon, 30 Sep 2013) | 24 lines
  
  Taskprocessor optimization; switch Stasis to use taskprocessors
  
  This patch optimizes taskprocessor to use a semaphore for signaling,
  which the OS can do a better job at managing contention and waiting
  that we can with a mutex and condition.
  
  The taskprocessor execution was also slightly optimized to reduce the
  number of locks taken.
  
  The only observable difference in the taskprocessor implementation is
  that when the final reference to the taskprocessor goes away, it will
  execute all tasks to completion instead of discarding the unexecuted
  tasks.
  
  For systems where unnamed semaphores are not supported, a really
  simple semaphore implementation is provided. (Which gives identical
  performance as the original taskprocessor implementation).
  
  The way we ended up implementing Stasis caused the threadpool to be a
  burden instead of a boost to performance. This was switched to just
  use taskprocessors directly for subscriptions.
  
  Review: https://reviewboard.asterisk.org/r/2881/
........
  r400180 | dlee | 2013-09-30 13:39:34 -0500 (Mon, 30 Sep 2013) | 28 lines
  
  Optimize how Stasis forwards are dispatched
  
  This patch optimizes how forwards are dispatched in Stasis.
  
  Originally, forwards were dispatched as subscriptions that are invoked
  on the publishing thread. This did not account for the vast number of
  forwards we would end up having in the system, and the amount of work it
  would take to walk though the forward subscriptions.
  
  This patch modifies Stasis so that rather than walking the tree of
  forwards on every dispatch, when forwards and subscriptions are changed,
  the subscriber list for every topic in the tree is changed.
  
  This has a couple of benefits. First, this reduces the workload of
  dispatching messages. It also reduces contention when dispatching to
  different topics that happen to forward to the same aggregation topic
  (as happens with all of the channel, bridge and endpoint topics).
  
  Since forwards are no longer subscriptions, the bulk of this patch is
  simply changing stasis_subscription objects to stasis_forward objects
  (which, admittedly, I should have done in the first place.)
  
  Since this required me to yet again put in a growing array, I finally
  abstracted that out into a set of ast_vector macros in
  asterisk/vector.h.
  
  Review: https://reviewboard.asterisk.org/r/2883/
........
  r400181 | dlee | 2013-09-30 13:48:57 -0500 (Mon, 30 Sep 2013) | 28 lines
  
  Remove dispatch object allocation from Stasis publishing
  
  While looking for areas for performance improvement, I realized that an
  unused feature in Stasis was negatively impacting performance.
  
  When a message is sent to a subscriber, a dispatch object is allocated
  for the dispatch, containing the topic the message was published to, the
  subscriber the message is being sent to, and the message itself.
  
  The topic is actually unused by any subscriber in Asterisk today. And
  the subscriber is associated with the taskprocessor the message is being
  dispatched to.
  
  First, this patch removes the unused topic parameter from Stasis
  subscription callbacks.
  
  Second, this patch introduces the concept of taskprocessor local data,
  data that may be set on a taskprocessor and provided along with the data
  pointer when a task is pushed using the ast_taskprocessor_push_local()
  call. This allows the task to have both data specific to that
  taskprocessor, in addition to data specific to that invocation.
  
  With those two changes, the dispatch object can be removed completely,
  and the message is simply refcounted and sent directly to the
  taskprocessor.
  
  Review: https://reviewboard.asterisk.org/r/2884/
........

Merged revisions 399887,400138,400178,400180-400181 from http://svn.asterisk.org/svn/asterisk/branches/12


git-svn-id: https://origsvn.digium.com/svn/asterisk/trunk@400186 65c4cc65-6c06-0410-ace0-fbb531ad65f3
2013-09-30 18:55:27 +00:00

1196 lines
36 KiB
C

/*
* Asterisk -- An open source telephony toolkit.
*
* Copyright (C) 2013, Digium, Inc.
*
* David M. Lee, II <dlee@digium.com>
*
* See http://www.asterisk.org for more information about
* the Asterisk project. Please do not directly contact
* any of the maintainers of this project for assistance;
* the project provides a web site, mailing lists and IRC
* channels for your use.
*
* This program is free software, distributed under the terms of
* the GNU General Public License Version 2. See the LICENSE file
* at the top of the source tree.
*/
/*! \file
*
* \brief The Asterisk Management Interface - AMI (channel event handling)
*
* \author David M. Lee, II <dlee@digium.com>
*
* AMI generated many per-channel and global-channel events by converting Stasis
* messages to AMI events. It makes sense to simply put them into a single file.
*/
#include "asterisk.h"
ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
#include "asterisk/callerid.h"
#include "asterisk/channel.h"
#include "asterisk/manager.h"
#include "asterisk/stasis_message_router.h"
#include "asterisk/pbx.h"
#include "asterisk/stasis_channels.h"
/*** DOCUMENTATION
<managerEvent language="en_US" name="Newchannel">
<managerEventInstance class="EVENT_FLAG_CALL">
<synopsis>Raised when a new channel is created.</synopsis>
<syntax>
<channel_snapshot/>
</syntax>
</managerEventInstance>
</managerEvent>
<managerEvent language="en_US" name="Newstate">
<managerEventInstance class="EVENT_FLAG_CALL">
<synopsis>Raised when a channel's state changes.</synopsis>
<syntax>
<channel_snapshot/>
</syntax>
</managerEventInstance>
</managerEvent>
<managerEvent language="en_US" name="Hangup">
<managerEventInstance class="EVENT_FLAG_CALL">
<synopsis>Raised when a channel is hung up.</synopsis>
<syntax>
<channel_snapshot/>
<parameter name="Cause">
<para>A numeric cause code for why the channel was hung up.</para>
</parameter>
<parameter name="Cause-txt">
<para>A description of why the channel was hung up.</para>
</parameter>
</syntax>
</managerEventInstance>
</managerEvent>
<managerEvent language="en_US" name="HangupRequest">
<managerEventInstance class="EVENT_FLAG_CALL">
<synopsis>Raised when a hangup is requested.</synopsis>
<syntax>
<channel_snapshot/>
<xi:include xpointer="xpointer(/docs/managerEvent[@name='Hangup']/managerEventInstance/syntax/parameter[@name='Cause'])" />
</syntax>
</managerEventInstance>
</managerEvent>
<managerEvent language="en_US" name="SoftHangupRequest">
<managerEventInstance class="EVENT_FLAG_CALL">
<synopsis>Raised when a soft hangup is requested with a specific cause code.</synopsis>
<syntax>
<channel_snapshot/>
<xi:include xpointer="xpointer(/docs/managerEvent[@name='Hangup']/managerEventInstance/syntax/parameter[@name='Cause'])" />
</syntax>
</managerEventInstance>
</managerEvent>
<managerEvent language="en_US" name="NewExten">
<managerEventInstance class="EVENT_FLAG_DIALPLAN">
<synopsis>Raised when a channel enters a new context, extension, priority.</synopsis>
<syntax>
<channel_snapshot/>
<parameter name="Extension">
<para>Deprecated in 12, but kept for
backward compatability. Please use
'Exten' instead.</para>
</parameter>
<parameter name="Application">
<para>The application about to be executed.</para>
</parameter>
<parameter name="AppData">
<para>The data to be passed to the application.</para>
</parameter>
</syntax>
</managerEventInstance>
</managerEvent>
<managerEvent language="en_US" name="NewCallerid">
<managerEventInstance class="EVENT_FLAG_CALL">
<synopsis>Raised when a channel receives new Caller ID information.</synopsis>
<syntax>
<channel_snapshot/>
<parameter name="CID-CallingPres">
<para>A description of the Caller ID presentation.</para>
</parameter>
</syntax>
</managerEventInstance>
</managerEvent>
<managerEvent language="en_US" name="NewAccountCode">
<managerEventInstance class="EVENT_FLAG_CALL">
<synopsis>Raised when a Channel's AccountCode is changed.</synopsis>
<syntax>
<channel_snapshot/>
<parameter name="OldAccountCode">
<para>The channel's previous account code</para>
</parameter>
</syntax>
</managerEventInstance>
</managerEvent>
<managerEvent language="en_US" name="DialBegin">
<managerEventInstance class="EVENT_FLAG_CALL">
<synopsis>Raised when a dial action has started.</synopsis>
<syntax>
<channel_snapshot/>
<channel_snapshot prefix="Dest"/>
<parameter name="DialString">
<para>The non-technology specific device being dialed.</para>
</parameter>
</syntax>
<see-also>
<ref type="application">Dial</ref>
</see-also>
</managerEventInstance>
</managerEvent>
<managerEvent language="en_US" name="DialEnd">
<managerEventInstance class="EVENT_FLAG_CALL">
<synopsis>Raised when a dial action has completed.</synopsis>
<syntax>
<channel_snapshot/>
<channel_snapshot prefix="Dest"/>
<parameter name="DialStatus">
<para>The result of the dial operation.</para>
<enumlist>
<enum name="ANSWER" />
<enum name="BUSY" />
<enum name="CANCEL" />
<enum name="CHANUNAVAIL" />
<enum name="CONGESTION" />
<enum name="NOANSWER" />
</enumlist>
</parameter>
</syntax>
<see-also>
<ref type="application">Dial</ref>
</see-also>
</managerEventInstance>
</managerEvent>
<managerEvent language="en_US" name="Hold">
<managerEventInstance class="EVENT_FLAG_CALL">
<synopsis>Raised when a channel goes on hold.</synopsis>
<syntax>
<channel_snapshot/>
<parameter name="MusicClass">
<para>The suggested MusicClass, if provided.</para>
</parameter>
</syntax>
</managerEventInstance>
</managerEvent>
<managerEvent language="en_US" name="Unhold">
<managerEventInstance class="EVENT_FLAG_CALL">
<synopsis>Raised when a channel goes off hold.</synopsis>
<syntax>
<channel_snapshot/>
</syntax>
</managerEventInstance>
</managerEvent>
<managerEvent language="en_US" name="ChanSpyStart">
<managerEventInstance class="EVENT_FLAG_CALL">
<synopsis>Raised when one channel begins spying on another channel.</synopsis>
<syntax>
<channel_snapshot prefix="Spyer"/>
<channel_snapshot prefix="Spyee"/>
</syntax>
<see-also>
<ref type="application">ChanSpyStop</ref>
</see-also>
</managerEventInstance>
</managerEvent>
<managerEvent language="en_US" name="ChanSpyStop">
<managerEventInstance class="EVENT_FLAG_CALL">
<synopsis>Raised when a channel has stopped spying.</synopsis>
<syntax>
<channel_snapshot prefix="Spyer"/>
<channel_snapshot prefix="Spyee"/>
</syntax>
<see-also>
<ref type="application">ChanSpyStart</ref>
</see-also>
</managerEventInstance>
</managerEvent>
<managerEvent language="en_US" name="HangupHandlerRun">
<managerEventInstance class="EVENT_FLAG_DIALPLAN">
<synopsis>Raised when a hangup handler is about to be called.</synopsis>
<syntax>
<channel_snapshot/>
<parameter name="Handler">
<para>Hangup handler parameter string passed to the Gosub application.</para>
</parameter>
</syntax>
</managerEventInstance>
</managerEvent>
<managerEvent language="en_US" name="HangupHandlerPop">
<managerEventInstance class="EVENT_FLAG_DIALPLAN">
<synopsis>
Raised when a hangup handler is removed from the handler stack
by the CHANNEL() function.
</synopsis>
<syntax>
<channel_snapshot/>
<xi:include xpointer="xpointer(/docs/managerEvent[@name='HangupHandlerRun']/managerEventInstance/syntax/parameter)" />
</syntax>
<see-also>
<ref type="managerEvent">HangupHandlerPush</ref>
<ref type="function">CHANNEL</ref>
</see-also>
</managerEventInstance>
</managerEvent>
<managerEvent language="en_US" name="HangupHandlerPush">
<managerEventInstance class="EVENT_FLAG_DIALPLAN">
<synopsis>
Raised when a hangup handler is added to the handler stack by
the CHANNEL() function.
</synopsis>
<syntax>
<channel_snapshot/>
<xi:include xpointer="xpointer(/docs/managerEvent[@name='HangupHandlerRun']/managerEventInstance/syntax/parameter)" />
</syntax>
<see-also>
<ref type="managerEvent">HangupHandlerPop</ref>
<ref type="function">CHANNEL</ref>
</see-also>
</managerEventInstance>
</managerEvent>
<managerEvent language="en_US" name="FAXStatus">
<managerEventInstance class="EVENT_FLAG_CALL">
<synopsis>
Raised periodically during a fax transmission.
</synopsis>
<syntax>
<channel_snapshot/>
<parameter name="Operation">
<enumlist>
<enum name="gateway"/>
<enum name="receive"/>
<enum name="send"/>
</enumlist>
</parameter>
<parameter name="Status">
<para>A text message describing the current status of the fax</para>
</parameter>
<xi:include xpointer="xpointer(/docs/managerEvent[@name='ReceiveFAX']/managerEventInstance/syntax/parameter[@name='LocalStationID'])" />
<xi:include xpointer="xpointer(/docs/managerEvent[@name='ReceiveFAX']/managerEventInstance/syntax/parameter[@name='FileName'])" />
</syntax>
</managerEventInstance>
</managerEvent>
<managerEvent language="en_US" name="ReceiveFAX">
<managerEventInstance class="EVENT_FLAG_CALL">
<synopsis>
Raised when a receive fax operation has completed.
</synopsis>
<syntax>
<channel_snapshot/>
<parameter name="LocalStationID">
<para>The value of the <variable>LOCALSTATIONID</variable> channel variable</para>
</parameter>
<parameter name="RemoteStationID">
<para>The value of the <variable>REMOTESTATIONID</variable> channel variable</para>
</parameter>
<parameter name="PagesTransferred">
<para>The number of pages that have been transferred</para>
</parameter>
<parameter name="Resolution">
<para>The negotiated resolution</para>
</parameter>
<parameter name="TransferRate">
<para>The negotiated transfer rate</para>
</parameter>
<parameter name="FileName" multiple="yes">
<para>The files being affected by the fax operation</para>
</parameter>
</syntax>
</managerEventInstance>
</managerEvent>
<managerEvent language="en_US" name="SendFAX">
<managerEventInstance class="EVENT_FLAG_CALL">
<synopsis>
Raised when a send fax operation has completed.
</synopsis>
<syntax>
<channel_snapshot/>
<xi:include xpointer="xpointer(/docs/managerEvent[@name='ReceiveFAX']/managerEventInstance/syntax/parameter)" />
</syntax>
</managerEventInstance>
</managerEvent>
<managerEvent language="en_US" name="MusicOnHoldStart">
<managerEventInstance class="EVENT_FLAG_CALL">
<synopsis>Raised when music on hold has started on a channel.</synopsis>
<syntax>
<channel_snapshot/>
<parameter name="Class">
<para>The class of music being played on the channel</para>
</parameter>
</syntax>
<see-also>
<ref type="managerEvent">MusicOnHoldStop</ref>
<ref type="application">MusicOnHold</ref>
</see-also>
</managerEventInstance>
</managerEvent>
<managerEvent language="en_US" name="MusicOnHoldStop">
<managerEventInstance class="EVENT_FLAG_CALL">
<synopsis>Raised when music on hold has stopped on a channel.</synopsis>
<syntax>
<channel_snapshot/>
</syntax>
<see-also>
<ref type="managerEvent">MusicOnHoldStart</ref>
<ref type="application">StopMusicOnHold</ref>
</see-also>
</managerEventInstance>
</managerEvent>
<managerEvent language="en_US" name="MonitorStart">
<managerEventInstance class="EVENT_FLAG_CALL">
<synopsis>Raised when monitoring has started on a channel.</synopsis>
<syntax>
<channel_snapshot/>
</syntax>
<see-also>
<ref type="managerEvent">MonitorStop</ref>
<ref type="application">Monitor</ref>
<ref type="manager">Monitor</ref>
</see-also>
</managerEventInstance>
</managerEvent>
<managerEvent language="en_US" name="MonitorStop">
<managerEventInstance class="EVENT_FLAG_CALL">
<synopsis>Raised when monitoring has stopped on a channel.</synopsis>
<syntax>
<channel_snapshot/>
</syntax>
<see-also>
<ref type="managerEvent">MonitorStart</ref>
<ref type="application">StopMonitor</ref>
<ref type="manager">StopMonitor</ref>
</see-also>
</managerEventInstance>
</managerEvent>
***/
/*! \brief The \ref stasis subscription returned by the forwarding of the channel topic
* to the manager topic
*/
static struct stasis_forward *topic_forwarder;
struct ast_str *ast_manager_build_channel_state_string_prefix(
const struct ast_channel_snapshot *snapshot,
const char *prefix)
{
struct ast_str *out = ast_str_create(1024);
int res = 0;
if (!out) {
return NULL;
}
if (snapshot->tech_properties & AST_CHAN_TP_INTERNAL) {
ast_free(out);
return NULL;
}
res = ast_str_set(&out, 0,
"%sChannel: %s\r\n"
"%sChannelState: %d\r\n"
"%sChannelStateDesc: %s\r\n"
"%sCallerIDNum: %s\r\n"
"%sCallerIDName: %s\r\n"
"%sConnectedLineNum: %s\r\n"
"%sConnectedLineName: %s\r\n"
"%sAccountCode: %s\r\n"
"%sContext: %s\r\n"
"%sExten: %s\r\n"
"%sPriority: %d\r\n"
"%sUniqueid: %s\r\n",
prefix, snapshot->name,
prefix, snapshot->state,
prefix, ast_state2str(snapshot->state),
prefix, S_OR(snapshot->caller_number, "<unknown>"),
prefix, S_OR(snapshot->caller_name, "<unknown>"),
prefix, S_OR(snapshot->connected_number, "<unknown>"),
prefix, S_OR(snapshot->connected_name, "<unknown>"),
prefix, snapshot->accountcode,
prefix, snapshot->context,
prefix, snapshot->exten,
prefix, snapshot->priority,
prefix, snapshot->uniqueid);
if (!res) {
ast_free(out);
return NULL;
}
if (snapshot->manager_vars) {
struct ast_var_t *var;
AST_LIST_TRAVERSE(snapshot->manager_vars, var, entries) {
ast_str_append(&out, 0, "%sChanVariable: %s=%s\r\n",
prefix,
var->name, var->value);
}
}
return out;
}
struct ast_str *ast_manager_build_channel_state_string(
const struct ast_channel_snapshot *snapshot)
{
return ast_manager_build_channel_state_string_prefix(snapshot, "");
}
/*! \brief Typedef for callbacks that get called on channel snapshot updates */
typedef struct ast_manager_event_blob *(*channel_snapshot_monitor)(
struct ast_channel_snapshot *old_snapshot,
struct ast_channel_snapshot *new_snapshot);
/*! \brief Handle channel state changes */
static struct ast_manager_event_blob *channel_state_change(
struct ast_channel_snapshot *old_snapshot,
struct ast_channel_snapshot *new_snapshot)
{
int is_hungup, was_hungup;
if (!new_snapshot) {
/* Ignore cache clearing events; we'll see the hangup first */
return NULL;
}
/* The Newchannel, Newstate and Hangup events are closely related, in
* in that they are mutually exclusive, basically different flavors
* of a new channel state event.
*/
if (!old_snapshot) {
return ast_manager_event_blob_create(
EVENT_FLAG_CALL, "Newchannel", NO_EXTRA_FIELDS);
}
was_hungup = ast_test_flag(&old_snapshot->flags, AST_FLAG_DEAD) ? 1 : 0;
is_hungup = ast_test_flag(&new_snapshot->flags, AST_FLAG_DEAD) ? 1 : 0;
if (!was_hungup && is_hungup) {
return ast_manager_event_blob_create(
EVENT_FLAG_CALL, "Hangup",
"Cause: %d\r\n"
"Cause-txt: %s\r\n",
new_snapshot->hangupcause,
ast_cause2str(new_snapshot->hangupcause));
}
if (old_snapshot->state != new_snapshot->state) {
return ast_manager_event_blob_create(
EVENT_FLAG_CALL, "Newstate", NO_EXTRA_FIELDS);
}
/* No event */
return NULL;
}
static struct ast_manager_event_blob *channel_newexten(
struct ast_channel_snapshot *old_snapshot,
struct ast_channel_snapshot *new_snapshot)
{
/* No Newexten event on cache clear */
if (!new_snapshot) {
return NULL;
}
/* Empty application is not valid for a Newexten event */
if (ast_strlen_zero(new_snapshot->appl)) {
return NULL;
}
/* Ignore any updates if we're hungup */
if (ast_test_flag(&new_snapshot->flags, AST_FLAG_DEAD)) {
return NULL;
}
if (old_snapshot && ast_channel_snapshot_cep_equal(old_snapshot, new_snapshot)
&& !strcmp(old_snapshot->appl, new_snapshot->appl)) {
return NULL;
}
/* DEPRECATED: Extension field deprecated in 12; remove in 14 */
return ast_manager_event_blob_create(
EVENT_FLAG_CALL, "Newexten",
"Extension: %s\r\n"
"Application: %s\r\n"
"AppData: %s\r\n",
new_snapshot->exten,
new_snapshot->appl,
new_snapshot->data);
}
static struct ast_manager_event_blob *channel_new_callerid(
struct ast_channel_snapshot *old_snapshot,
struct ast_channel_snapshot *new_snapshot)
{
/* No NewCallerid event on cache clear or first event */
if (!old_snapshot || !new_snapshot) {
return NULL;
}
if (ast_channel_snapshot_caller_id_equal(old_snapshot, new_snapshot)) {
return NULL;
}
return ast_manager_event_blob_create(
EVENT_FLAG_CALL, "NewCallerid",
"CID-CallingPres: %d (%s)\r\n",
new_snapshot->caller_pres,
ast_describe_caller_presentation(new_snapshot->caller_pres));
}
static struct ast_manager_event_blob *channel_new_accountcode(
struct ast_channel_snapshot *old_snapshot,
struct ast_channel_snapshot *new_snapshot)
{
if (!old_snapshot || !new_snapshot) {
return NULL;
}
if (!strcmp(old_snapshot->accountcode, new_snapshot->accountcode)) {
return NULL;
}
return ast_manager_event_blob_create(
EVENT_FLAG_CALL, "NewAccountCode",
"OldAccountCode: %s\r\n", old_snapshot->accountcode);
}
channel_snapshot_monitor channel_monitors[] = {
channel_state_change,
channel_newexten,
channel_new_callerid,
channel_new_accountcode
};
static void channel_snapshot_update(void *data, struct stasis_subscription *sub,
struct stasis_message *message)
{
RAII_VAR(struct ast_str *, channel_event_string, NULL, ast_free);
struct stasis_cache_update *update;
struct ast_channel_snapshot *old_snapshot;
struct ast_channel_snapshot *new_snapshot;
size_t i;
update = stasis_message_data(message);
ast_assert(ast_channel_snapshot_type() == update->type);
old_snapshot = stasis_message_data(update->old_snapshot);
new_snapshot = stasis_message_data(update->new_snapshot);
for (i = 0; i < ARRAY_LEN(channel_monitors); ++i) {
RAII_VAR(struct ast_manager_event_blob *, ev, NULL, ao2_cleanup);
ev = channel_monitors[i](old_snapshot, new_snapshot);
if (!ev) {
continue;
}
/* If we haven't already, build the channel event string */
if (!channel_event_string) {
channel_event_string =
ast_manager_build_channel_state_string(new_snapshot);
if (!channel_event_string) {
return;
}
}
manager_event(ev->event_flags, ev->manager_event, "%s%s",
ast_str_buffer(channel_event_string),
ev->extra_fields);
}
}
static int userevent_exclusion_cb(const char *key)
{
if (!strcmp("type", key)) {
return 1;
}
if (!strcmp("eventname", key)) {
return 1;
}
return 0;
}
static void channel_user_event_cb(void *data, struct stasis_subscription *sub,
struct stasis_message *message)
{
struct ast_channel_blob *obj = stasis_message_data(message);
RAII_VAR(struct ast_str *, channel_event_string, NULL, ast_free);
RAII_VAR(struct ast_str *, body, NULL, ast_free);
const char *eventname;
eventname = ast_json_string_get(ast_json_object_get(obj->blob, "eventname"));
body = ast_manager_str_from_json_object(obj->blob, userevent_exclusion_cb);
channel_event_string = ast_manager_build_channel_state_string(obj->snapshot);
if (!channel_event_string || !body) {
return;
}
/*** DOCUMENTATION
<managerEventInstance>
<synopsis>A user defined event raised from the dialplan.</synopsis>
<syntax>
<channel_snapshot/>
<parameter name="UserEvent">
<para>The event name, as specified in the dialplan.</para>
</parameter>
</syntax>
<see-also>
<ref type="application">UserEvent</ref>
</see-also>
</managerEventInstance>
***/
manager_event(EVENT_FLAG_USER, "UserEvent",
"%s"
"UserEvent: %s\r\n"
"%s",
ast_str_buffer(channel_event_string), eventname, ast_str_buffer(body));
}
static void publish_basic_channel_event(const char *event, int class, struct ast_channel_snapshot *snapshot)
{
RAII_VAR(struct ast_str *, channel_event_string, NULL, ast_free);
channel_event_string = ast_manager_build_channel_state_string(snapshot);
if (!channel_event_string) {
return;
}
manager_event(class, event,
"%s",
ast_str_buffer(channel_event_string));
}
static void channel_hangup_request_cb(void *data,
struct stasis_subscription *sub,
struct stasis_message *message)
{
struct ast_channel_blob *obj = stasis_message_data(message);
RAII_VAR(struct ast_str *, extra, NULL, ast_free);
RAII_VAR(struct ast_str *, channel_event_string, NULL, ast_free);
struct ast_json *cause;
int is_soft;
char *manager_event = "HangupRequest";
extra = ast_str_create(20);
if (!extra) {
return;
}
channel_event_string = ast_manager_build_channel_state_string(obj->snapshot);
if (!channel_event_string) {
return;
}
cause = ast_json_object_get(obj->blob, "cause");
if (cause) {
ast_str_append(&extra, 0,
"Cause: %jd\r\n",
ast_json_integer_get(cause));
}
is_soft = ast_json_is_true(ast_json_object_get(obj->blob, "soft"));
if (is_soft) {
manager_event = "SoftHangupRequest";
}
manager_event(EVENT_FLAG_CALL, manager_event,
"%s%s",
ast_str_buffer(channel_event_string),
ast_str_buffer(extra));
}
static void channel_chanspy_stop_cb(void *data, struct stasis_subscription *sub,
struct stasis_message *message)
{
RAII_VAR(struct ast_str *, spyer_channel_string, NULL, ast_free);
struct ast_channel_snapshot *spyer;
struct ast_multi_channel_blob *payload = stasis_message_data(message);
spyer = ast_multi_channel_blob_get_channel(payload, "spyer_channel");
if (!spyer) {
ast_log(AST_LOG_WARNING, "Received ChanSpy Start event with no spyer channel!\n");
return;
}
spyer_channel_string = ast_manager_build_channel_state_string_prefix(spyer, "Spyer");
if (!spyer_channel_string) {
return;
}
manager_event(EVENT_FLAG_CALL, "ChanSpyStop",
"%s",
ast_str_buffer(spyer_channel_string));
}
static void channel_chanspy_start_cb(void *data, struct stasis_subscription *sub,
struct stasis_message *message)
{
RAII_VAR(struct ast_str *, spyer_channel_string, NULL, ast_free);
RAII_VAR(struct ast_str *, spyee_channel_string, NULL, ast_free);
struct ast_channel_snapshot *spyer;
struct ast_channel_snapshot *spyee;
struct ast_multi_channel_blob *payload = stasis_message_data(message);
spyer = ast_multi_channel_blob_get_channel(payload, "spyer_channel");
if (!spyer) {
ast_log(AST_LOG_WARNING, "Received ChanSpy Start event with no spyer channel!\n");
return;
}
spyee = ast_multi_channel_blob_get_channel(payload, "spyee_channel");
if (!spyee) {
ast_log(AST_LOG_WARNING, "Received ChanSpy Start event with no spyee channel!\n");
return;
}
spyer_channel_string = ast_manager_build_channel_state_string_prefix(spyer, "Spyer");
if (!spyer_channel_string) {
return;
}
spyee_channel_string = ast_manager_build_channel_state_string_prefix(spyee, "Spyee");
if (!spyee_channel_string) {
return;
}
manager_event(EVENT_FLAG_CALL, "ChanSpyStart",
"%s%s",
ast_str_buffer(spyer_channel_string),
ast_str_buffer(spyee_channel_string));
}
static void channel_dtmf_begin_cb(void *data, struct stasis_subscription *sub,
struct stasis_message *message)
{
struct ast_channel_blob *obj = stasis_message_data(message);
RAII_VAR(struct ast_str *, channel_event_string, NULL, ast_free);
const char *digit =
ast_json_string_get(ast_json_object_get(obj->blob, "digit"));
const char *direction =
ast_json_string_get(ast_json_object_get(obj->blob, "direction"));
channel_event_string = ast_manager_build_channel_state_string(obj->snapshot);
if (!channel_event_string) {
return;
}
/*** DOCUMENTATION
<managerEventInstance>
<synopsis>Raised when a DTMF digit has started on a channel.</synopsis>
<syntax>
<channel_snapshot/>
<parameter name="Digit">
<para>DTMF digit received or transmitted (0-9, A-E, # or *</para>
</parameter>
<parameter name="Direction">
<enumlist>
<enum name="Received"/>
<enum name="Sent"/>
</enumlist>
</parameter>
</syntax>
</managerEventInstance>
***/
manager_event(EVENT_FLAG_DTMF, "DTMFBegin",
"%s"
"Digit: %s\r\n"
"Direction: %s\r\n",
ast_str_buffer(channel_event_string),
digit, direction);
}
static void channel_dtmf_end_cb(void *data, struct stasis_subscription *sub,
struct stasis_message *message)
{
struct ast_channel_blob *obj = stasis_message_data(message);
RAII_VAR(struct ast_str *, channel_event_string, NULL, ast_free);
const char *digit =
ast_json_string_get(ast_json_object_get(obj->blob, "digit"));
const char *direction =
ast_json_string_get(ast_json_object_get(obj->blob, "direction"));
long duration_ms =
ast_json_integer_get(ast_json_object_get(obj->blob, "duration_ms"));
channel_event_string = ast_manager_build_channel_state_string(obj->snapshot);
if (!channel_event_string) {
return;
}
/*** DOCUMENTATION
<managerEventInstance>
<synopsis>Raised when a DTMF digit has ended on a channel.</synopsis>
<syntax>
<channel_snapshot/>
<parameter name="Digit">
<para>DTMF digit received or transmitted (0-9, A-E, # or *</para>
</parameter>
<parameter name="DurationMs">
<para>Duration (in milliseconds) DTMF was sent/received</para>
</parameter>
<parameter name="Direction">
<enumlist>
<enum name="Received"/>
<enum name="Sent"/>
</enumlist>
</parameter>
</syntax>
</managerEventInstance>
***/
manager_event(EVENT_FLAG_DTMF, "DTMFEnd",
"%s"
"Digit: %s\r\n"
"DurationMs: %ld\r\n"
"Direction: %s\r\n",
ast_str_buffer(channel_event_string),
digit, duration_ms, direction);
}
static void channel_hangup_handler_cb(void *data, struct stasis_subscription *sub,
struct stasis_message *message)
{
RAII_VAR(struct ast_str *, channel_event_string, NULL, ast_free);
struct ast_channel_blob *payload = stasis_message_data(message);
const char *action = ast_json_string_get(ast_json_object_get(payload->blob, "type"));
const char *handler = ast_json_string_get(ast_json_object_get(payload->blob, "handler"));
const char *event;
channel_event_string = ast_manager_build_channel_state_string(payload->snapshot);
if (!channel_event_string) {
return;
}
if (!strcmp(action, "type")) {
event = "HangupHandlerRun";
} else if (!strcmp(action, "type")) {
event = "HangupHandlerPop";
} else if (!strcmp(action, "type")) {
event = "HangupHandlerPush";
} else {
return;
}
manager_event(EVENT_FLAG_DIALPLAN, event,
"%s"
"Handler: %s\r\n",
ast_str_buffer(channel_event_string),
handler);
}
static void channel_fax_cb(void *data, struct stasis_subscription *sub,
struct stasis_message *message)
{
RAII_VAR(struct ast_str *, channel_event_string, NULL, ast_free);
RAII_VAR(struct ast_str *, event_buffer, ast_str_create(256), ast_free);
struct ast_channel_blob *payload = stasis_message_data(message);
const char *type = ast_json_string_get(ast_json_object_get(payload->blob, "type"));
struct ast_json *operation = ast_json_object_get(payload->blob, "operation");
struct ast_json *status = ast_json_object_get(payload->blob, "status");
struct ast_json *local_station_id = ast_json_object_get(payload->blob, "local_station_id");
struct ast_json *remote_station_id = ast_json_object_get(payload->blob, "remote_station_id");
struct ast_json *fax_pages = ast_json_object_get(payload->blob, "fax_pages");
struct ast_json *fax_resolution = ast_json_object_get(payload->blob, "fax_resolution");
struct ast_json *fax_bitrate = ast_json_object_get(payload->blob, "fax_bitrate");
struct ast_json *filenames = ast_json_object_get(payload->blob, "filenames");
const char *event;
size_t array_len;
size_t i;
if (!event_buffer) {
return;
}
channel_event_string = ast_manager_build_channel_state_string(payload->snapshot);
if (!channel_event_string) {
return;
}
if (!strcmp(type, "status")) {
event = "FAXStatus";
} else if (!strcmp(type, "receive")) {
event = "ReceiveFAX";
} else if (!strcmp(type, "send")) {
event = "SendFAX";
} else {
return;
}
if (operation) {
ast_str_append(&event_buffer, 0, "Operation: %s\r\n", ast_json_string_get(operation));
}
if (status) {
ast_str_append(&event_buffer, 0, "Status: %s\r\n", ast_json_string_get(status));
}
if (local_station_id) {
ast_str_append(&event_buffer, 0, "LocalStationID: %s\r\n", ast_json_string_get(local_station_id));
}
if (remote_station_id) {
ast_str_append(&event_buffer, 0, "RemoteStationID: %s\r\n", ast_json_string_get(remote_station_id));
}
if (fax_pages) {
ast_str_append(&event_buffer, 0, "PagesTransferred: %s\r\n", ast_json_string_get(fax_pages));
}
if (fax_resolution) {
ast_str_append(&event_buffer, 0, "Resolution: %s\r\n", ast_json_string_get(fax_resolution));
}
if (fax_bitrate) {
ast_str_append(&event_buffer, 0, "TransferRate: %s\r\n", ast_json_string_get(fax_bitrate));
}
if (filenames) {
array_len = ast_json_array_size(filenames);
for (i = 0; i < array_len; i++) {
ast_str_append(&event_buffer, 0, "FileName: %s\r\n", ast_json_string_get(ast_json_array_get(filenames, i)));
}
}
manager_event(EVENT_FLAG_CALL, event,
"%s"
"%s",
ast_str_buffer(channel_event_string),
ast_str_buffer(event_buffer));
}
static void channel_moh_start_cb(void *data, struct stasis_subscription *sub,
struct stasis_message *message)
{
struct ast_channel_blob *payload = stasis_message_data(message);
struct ast_json *blob = payload->blob;
RAII_VAR(struct ast_str *, channel_event_string, NULL, ast_free);
channel_event_string = ast_manager_build_channel_state_string(payload->snapshot);
if (!channel_event_string) {
return;
}
manager_event(EVENT_FLAG_CALL, "MusicOnHoldStart",
"%s"
"Class: %s\r\n",
ast_str_buffer(channel_event_string),
ast_json_string_get(ast_json_object_get(blob, "class")));
}
static void channel_moh_stop_cb(void *data, struct stasis_subscription *sub,
struct stasis_message *message)
{
struct ast_channel_blob *payload = stasis_message_data(message);
publish_basic_channel_event("MusicOnHoldStop", EVENT_FLAG_CALL, payload->snapshot);
}
static void channel_monitor_start_cb(void *data, struct stasis_subscription *sub,
struct stasis_message *message)
{
struct ast_channel_blob *payload = stasis_message_data(message);
publish_basic_channel_event("MonitorStart", EVENT_FLAG_CALL, payload->snapshot);
}
static void channel_monitor_stop_cb(void *data, struct stasis_subscription *sub,
struct stasis_message *message)
{
struct ast_channel_blob *payload = stasis_message_data(message);
publish_basic_channel_event("MonitorStop", EVENT_FLAG_CALL, payload->snapshot);
}
/*!
* \brief Callback processing messages for channel dialing
*/
static void channel_dial_cb(void *data, struct stasis_subscription *sub,
struct stasis_message *message)
{
struct ast_multi_channel_blob *obj = stasis_message_data(message);
const char *dialstatus;
const char *dialstring;
struct ast_channel_snapshot *caller;
struct ast_channel_snapshot *peer;
RAII_VAR(struct ast_str *, caller_event_string, NULL, ast_free);
RAII_VAR(struct ast_str *, peer_event_string, NULL, ast_free);
caller = ast_multi_channel_blob_get_channel(obj, "caller");
peer = ast_multi_channel_blob_get_channel(obj, "peer");
/* Peer is required - otherwise, who are we dialing? */
ast_assert(peer != NULL);
peer_event_string = ast_manager_build_channel_state_string_prefix(peer, "Dest");
if (!peer_event_string) {
return;
}
if (caller && !(caller_event_string = ast_manager_build_channel_state_string(caller))) {
return;
}
dialstatus = ast_json_string_get(ast_json_object_get(ast_multi_channel_blob_get_json(obj), "dialstatus"));
dialstring = ast_json_string_get(ast_json_object_get(ast_multi_channel_blob_get_json(obj), "dialstring"));
if (ast_strlen_zero(dialstatus)) {
manager_event(EVENT_FLAG_CALL, "DialBegin",
"%s"
"%s"
"DialString: %s\r\n",
caller_event_string ? ast_str_buffer(caller_event_string) : "",
ast_str_buffer(peer_event_string),
S_OR(dialstring, "unknown"));
} else {
manager_event(EVENT_FLAG_CALL, "DialEnd",
"%s"
"%s"
"DialStatus: %s\r\n",
caller_event_string ? ast_str_buffer(caller_event_string) : "",
ast_str_buffer(peer_event_string),
S_OR(dialstatus, "unknown"));
}
}
static void channel_hold_cb(void *data, struct stasis_subscription *sub,
struct stasis_message *message)
{
struct ast_channel_blob *obj = stasis_message_data(message);
const char *musicclass;
RAII_VAR(struct ast_str *, musicclass_string, NULL, ast_free);
RAII_VAR(struct ast_str *, channel_event_string, NULL, ast_free);
if (!(musicclass_string = ast_str_create(32))) {
return;
}
channel_event_string = ast_manager_build_channel_state_string(obj->snapshot);
if (!channel_event_string) {
return;
}
if (obj->blob) {
musicclass = ast_json_string_get(ast_json_object_get(obj->blob, "musicclass"));
if (!ast_strlen_zero(musicclass)) {
ast_str_set(&musicclass_string, 0, "MusicClass: %s\r\n", musicclass);
}
}
manager_event(EVENT_FLAG_CALL, "Hold",
"%s"
"%s",
ast_str_buffer(channel_event_string),
ast_str_buffer(musicclass_string));
}
static void channel_unhold_cb(void *data, struct stasis_subscription *sub,
struct stasis_message *message)
{
struct ast_channel_blob *obj = stasis_message_data(message);
RAII_VAR(struct ast_str *, channel_event_string, NULL, ast_free);
channel_event_string = ast_manager_build_channel_state_string(obj->snapshot);
if (!channel_event_string) {
return;
}
manager_event(EVENT_FLAG_CALL, "Unhold",
"%s",
ast_str_buffer(channel_event_string));
}
static void manager_channels_shutdown(void)
{
stasis_forward_cancel(topic_forwarder);
topic_forwarder = NULL;
}
int manager_channels_init(void)
{
int ret = 0;
struct stasis_topic *manager_topic;
struct stasis_topic *channel_topic;
struct stasis_message_router *message_router;
manager_topic = ast_manager_get_topic();
if (!manager_topic) {
return -1;
}
message_router = ast_manager_get_message_router();
if (!message_router) {
return -1;
}
channel_topic = ast_channel_topic_all_cached();
if (!channel_topic) {
return -1;
}
topic_forwarder = stasis_forward_all(channel_topic, manager_topic);
if (!topic_forwarder) {
return -1;
}
ast_register_atexit(manager_channels_shutdown);
ret |= stasis_message_router_add_cache_update(message_router,
ast_channel_snapshot_type(), channel_snapshot_update, NULL);
ret |= stasis_message_router_add(message_router,
ast_channel_user_event_type(), channel_user_event_cb, NULL);
ret |= stasis_message_router_add(message_router,
ast_channel_dtmf_begin_type(), channel_dtmf_begin_cb, NULL);
ret |= stasis_message_router_add(message_router,
ast_channel_dtmf_end_type(), channel_dtmf_end_cb, NULL);
ret |= stasis_message_router_add(message_router,
ast_channel_hangup_request_type(), channel_hangup_request_cb,
NULL);
ret |= stasis_message_router_add(message_router,
ast_channel_dial_type(), channel_dial_cb, NULL);
ret |= stasis_message_router_add(message_router,
ast_channel_hold_type(), channel_hold_cb, NULL);
ret |= stasis_message_router_add(message_router,
ast_channel_unhold_type(), channel_unhold_cb, NULL);
ret |= stasis_message_router_add(message_router,
ast_channel_fax_type(), channel_fax_cb, NULL);
ret |= stasis_message_router_add(message_router,
ast_channel_chanspy_start_type(), channel_chanspy_start_cb,
NULL);
ret |= stasis_message_router_add(message_router,
ast_channel_chanspy_stop_type(), channel_chanspy_stop_cb, NULL);
ret |= stasis_message_router_add(message_router,
ast_channel_hangup_handler_type(), channel_hangup_handler_cb,
NULL);
ret |= stasis_message_router_add(message_router,
ast_channel_moh_start_type(), channel_moh_start_cb, NULL);
ret |= stasis_message_router_add(message_router,
ast_channel_moh_stop_type(), channel_moh_stop_cb, NULL);
ret |= stasis_message_router_add(message_router,
ast_channel_monitor_start_type(), channel_monitor_start_cb,
NULL);
ret |= stasis_message_router_add(message_router,
ast_channel_monitor_stop_type(), channel_monitor_stop_cb, NULL);
/* If somehow we failed to add any routes, just shut down the whole
* thing and fail it.
*/
if (ret) {
manager_channels_shutdown();
return -1;
}
return 0;
}