asterisk/res/res_pjsip/pjsip_scheduler.c
George Joseph 4bdf5d329f res_pjsip_pubsub: Correctly implement persisted subscriptions
This patch fixes 2 original issues and several more that those 2 exposed.

* When we send a NOTIFY, and the client either doesn't respond or
  responds with a non OK, pjproject only calls our
  pubsub_on_evsub_state callback, no others.  Since
  pubsub_on_evsub_state (which does the sub_tree cleanup) does not
  expect to be called back without the other callbacks being called
  first, it just returns leaving the sub_tree orphaned.  Now
  pubsub_on_evsub_state checks the event for PJSIP_EVENT_TSX_STATE
  which is what pjproject will set to tell us that it was the
  transaction that timed out or failed and not the subscription
  itself timing out or being terminated by the client. If it is
  TSX_STATE, pubsub_on_evsub_state now does the proper cleanup
  regardless of the state of the subscription.

* When a client renews a subscription, we don't update the
  persisted subscription with the new expires timestamp.  This causes
  subscription_persistence_recreate to prune the subscription if/when
  asterisk restarts.  Now, pubsub_on_rx_refresh calls
  subscription_persistence_update to apply the new expires timestamp.
  This exposed other issues however...

* When creating a dialog from rdata (which subscription_persistence_recreate
  does from the packet buffer) there must NOT be a tag on the To
  header (which there will be when a client refreshes a
  subscription).  If there is one, pjsip_dlg_create_uas will fail.
  To address this, subscription_persistence_update now accepts a flag
  that indicates that the original packet buffer must not be updated.
  New subscribes don't set the flag and renews do.  This makes sure
  that when the rdata is recreated on asterisk startup, it's done
  from the original subscribe packet which won't have the tag on To.

* When creating a dialog from rdata, we were setting the dialog's
  remote (SUBSCRIBE) cseq to be the same as the local (NOTIFY) cseq.
  When the client tried to resubscribe after a restart with the
  correct cseq, we'd reject the request with an Invalid CSeq error.

* The acts of creating a dialog and evsub by themselves when
  recreating a subscription does NOT restart pjproject's subscription
  timer.  The result was that even if we did correctly recreate the
  subscription, we never removed it if the client happened to go away
  or send a non-OK response to a NOTIFY.  However, there is no
  pjproject function exposed to just set the timer on an evsub that
  wasn't created by an incoming subscribe request.  To address this,
  we create our own timer using ast_sip_schedule_task.  This timer is
  used only for re-establishing subscriptions after a restart.

  An earlier approach was to add support for setting pjproject's
  timer (via a pjproject patch) and while that patch is still included
  here, we don't use that call at the moment.

While addressing these issues, additional debugging was added and
some existing messages made more useful.  A few formatting changes
were also made to 'pjsip show scheduled tasks' to make displaying
the subscription timers a little more friendly.

ASTERISK-26696
ASTERISK-26756

Change-Id: I8c605fc1e3923f466a74db087d5ab6f90abce68e
2017-02-15 13:11:46 -06:00

490 lines
12 KiB
C

/*
* Asterisk -- An open source telephony toolkit.
*
* Copyright (C) 2016, Fairview 5 Engineering, LLC
*
* George Joseph <george.joseph@fairview5.com>
*
* See http://www.asterisk.org for more information about
* the Asterisk project. Please do not directly contact
* any of the maintainers of this project for assistance;
* the project provides a web site, mailing lists and IRC
* channels for your use.
*
* This program is free software, distributed under the terms of
* the GNU General Public License Version 2. See the LICENSE file
* at the top of the source tree.
*/
/*! \file
*
* \brief res_pjsip Scheduler
*
* \author George Joseph <george.joseph@fairview5.com>
*/
#include "asterisk.h"
#include "asterisk/res_pjsip.h"
#include "include/res_pjsip_private.h"
#include "asterisk/res_pjsip_cli.h"
/*! Number of hash buckets for the task container (53 is prime). */
enum { TASK_BUCKETS = 53 };

/*! Scheduler context on which every task's timer entry is registered. */
static struct ast_sched_context *scheduler_context;
/*! All live scheduled tasks, hashed and compared by task name. */
static struct ao2_container *tasks;
/*! Counter that hands out task ids (atomically incremented per task). */
static int task_count;
/*!
 * \brief A task registered with the res_pjsip scheduler.
 *
 * Allocated by ast_sip_schedule_task() with extra room after the fixed
 * fields for the NUL-terminated task name, which is stored in the trailing
 * flexible array member.
 */
struct ast_sip_sched_task {
	/*! ast_sip_sched task id */
	uint32_t task_id;
	/*! ast_sched scheduler id of the pending timer entry */
	int current_scheduler_id;
	/*! task is currently running */
	int is_running;
	/*! task */
	ast_sip_task task;
	/*! task data */
	void *task_data;
	/*! reschedule interval in milliseconds (0 means do not reschedule) */
	int interval;
	/*! the time the task was queued */
	struct timeval when_queued;
	/*! the last time the task was started */
	struct timeval last_start;
	/*! the last time the task ended */
	struct timeval last_end;
	/*! times run */
	int run_count;
	/*! the task reschedule, cleanup and policy flags */
	enum ast_sip_scheduler_task_flags flags;
	/*! the serializer to be used (if any) */
	struct ast_taskprocessor *serializer;
	/*! A name to be associated with the task (C99 flexible array member) */
	char name[];
};
/*
 * Generate the hash, compare and sort callbacks for the tasks container,
 * all keyed on the task's "name" field.  The generated
 * ast_sip_sched_task_hash_fn/_sort_fn/_cmp_fn are passed to
 * ao2_container_alloc_hash() in ast_sip_initialize_scheduler().
 */
AO2_STRING_FIELD_HASH_FN(ast_sip_sched_task, name);
AO2_STRING_FIELD_CMP_FN(ast_sip_sched_task, name);
AO2_STRING_FIELD_SORT_FN(ast_sip_sched_task, name);
static int push_to_serializer(const void *data);

/*!
 * \internal
 * \brief Run a scheduled task and reschedule it if appropriate.
 *
 * Executed in the context of the task's serializer.  Adopts the reference
 * that push_to_serializer() took for this queued run and releases it on
 * return.
 *
 * The task is retired (interval set to 0 and unlinked from the tasks
 * container) when it returns 0, when its interval was set to 0 while it
 * was running (e.g. by ast_sip_sched_task_cancel()), or when it cannot be
 * rescheduled.
 *
 * \retval 0 the task was rescheduled
 * \retval -1 the task was retired
 */
static int run_task(void *data)
{
	/* Adopt (do not bump) the reference push_to_serializer() took for us. */
	RAII_VAR(struct ast_sip_sched_task *, schtd, data, ao2_cleanup);
	int res;
	int delay;

	ao2_lock(schtd);
	schtd->last_start = ast_tvnow();
	schtd->is_running = 1;
	schtd->run_count++;
	ao2_unlock(schtd);

	res = schtd->task(schtd->task_data);

	ao2_lock(schtd);
	schtd->is_running = 0;
	schtd->last_end = ast_tvnow();

	/*
	 * Don't restart if the task returned 0 or if the interval
	 * was set to 0 while the task was running
	 */
	if (!res || !schtd->interval) {
		schtd->interval = 0;
		ao2_unlock(schtd);
		ao2_unlink(tasks, schtd);
		return -1;
	}

	if (schtd->flags & AST_SIP_SCHED_TASK_VARIABLE) {
		/* The task's return value becomes its new interval. */
		schtd->interval = res;
	}

	if (schtd->flags & AST_SIP_SCHED_TASK_DELAY) {
		/* Wait a full interval after this run ended. */
		delay = schtd->interval;
	} else {
		/* Next run lands on the next interval boundary measured from last_start. */
		delay = schtd->interval - (ast_tvdiff_ms(schtd->last_end, schtd->last_start) % schtd->interval);
	}

	/*
	 * Record the new scheduler id while still holding the task lock so that
	 * push_to_serializer() and ast_sip_sched_task_cancel() always see the
	 * id of the entry that is actually pending.
	 * NOTE(review): the scheduler entry itself holds no reference; the
	 * task is kept alive by the container and the caller's reference.
	 */
	schtd->current_scheduler_id = ast_sched_add(scheduler_context, delay, push_to_serializer, (const void *)schtd);
	if (schtd->current_scheduler_id < 0) {
		schtd->interval = 0;
		ao2_unlock(schtd);
		ao2_unlink(tasks, schtd);
		return -1;
	}

	ao2_unlock(schtd);
	return 0;
}

/*!
 * \internal
 * \brief Scheduler callback: hand the task to its serializer.
 *
 * Runs on the scheduler thread.  The scheduler entry that invoked us has
 * now fired, so its id is cleared; if ast_sip_sched_task_cancel() already
 * claimed the id, the run is skipped.  A reference is taken for the
 * serializer and released by run_task().  If the task cannot be queued it
 * can never run or reschedule again, so it is retired.
 *
 * \return Always 0 so the scheduler does not reschedule this entry itself.
 */
static int push_to_serializer(const void *data)
{
	struct ast_sip_sched_task *schtd = (struct ast_sip_sched_task *)data;
	int sched_id;

	ao2_lock(schtd);
	sched_id = schtd->current_scheduler_id;
	schtd->current_scheduler_id = -1;
	ao2_unlock(schtd);

	if (sched_id < 0) {
		/* Cancelled while this callback was waiting for the task lock. */
		return 0;
	}

	/* This reference is owned by the queued run_task(). */
	ao2_ref(schtd, +1);
	if (ast_sip_push_task(schtd->serializer, run_task, schtd)) {
		ao2_lock(schtd);
		schtd->interval = 0;
		ao2_unlock(schtd);
		ao2_unlink(tasks, schtd);
		/* run_task() will never run, so release its reference here. */
		ao2_ref(schtd, -1);
	}

	return 0;
}

/*!
 * \brief Cancel a scheduled task.
 *
 * Sets the task's interval to 0 so that a run already queued or in
 * progress will not reschedule it, unlinks it from the tasks container and
 * removes its pending scheduler entry, if there is one.  A run that was
 * already queued to the serializer may still execute once.
 *
 * \retval 0 nothing to cancel (interval already 0), or the task had no
 *         pending scheduler entry and will not be rescheduled
 * \retval -1 the task reference/lock could not be obtained
 * \return otherwise the result of ast_sched_del()
 */
int ast_sip_sched_task_cancel(struct ast_sip_sched_task *schtd)
{
	int sched_id;

	if (!ao2_ref_and_lock(schtd)) {
		return -1;
	}

	if (schtd->interval <= 0) {
		ao2_unlock_and_unref(schtd);
		return 0;
	}

	/*
	 * Stop any future rescheduling and claim the pending scheduler id under
	 * the lock, so it cannot be replaced by run_task() or consumed by
	 * push_to_serializer() behind our back.
	 */
	schtd->interval = 0;
	sched_id = schtd->current_scheduler_id;
	schtd->current_scheduler_id = -1;
	ao2_unlock_and_unref(schtd);

	ao2_unlink(tasks, schtd);

	if (sched_id < 0) {
		/* Already fired; the queued/running instance won't reschedule. */
		return 0;
	}

	return ast_sched_del(scheduler_context, sched_id);
}
/*!
 * \brief Cancel the scheduled task registered under \a name.
 *
 * \retval -1 \a name is empty, no such task exists, or the cancel failed
 * \return otherwise the result of ast_sip_sched_task_cancel()
 */
int ast_sip_sched_task_cancel_by_name(const char *name)
{
	RAII_VAR(struct ast_sip_sched_task *, schtd, NULL, ao2_cleanup);

	if (ast_strlen_zero(name)) {
		return -1;
	}

	/*
	 * Let ao2_find() lock the container: other threads link and unlink
	 * tasks concurrently, so an OBJ_NOLOCK search here is a data race.
	 */
	schtd = ao2_find(tasks, name, OBJ_SEARCH_KEY);
	if (!schtd) {
		return -1;
	}

	return ast_sip_sched_task_cancel(schtd);
}
/*!
 * \brief Copy out a task's queued / last-start / last-end timestamps.
 *
 * Any output pointer may be NULL, in which case that timestamp is skipped.
 * All copies are taken while holding the task lock.
 *
 * \retval 0 success
 * \retval -1 the task reference/lock could not be obtained
 */
int ast_sip_sched_task_get_times(struct ast_sip_sched_task *schtd,
	struct timeval *queued, struct timeval *last_start, struct timeval *last_end)
{
	if (!ao2_ref_and_lock(schtd)) {
		return -1;
	}

	if (queued) {
		*queued = schtd->when_queued;
	}
	if (last_start) {
		*last_start = schtd->last_start;
	}
	if (last_end) {
		*last_end = schtd->last_end;
	}

	ao2_unlock_and_unref(schtd);
	return 0;
}
/*!
 * \brief Look up a task by \a name and copy out its timestamps.
 *
 * \retval -1 \a name is empty, no such task exists, or the lookup failed
 * \return otherwise the result of ast_sip_sched_task_get_times()
 */
int ast_sip_sched_task_get_times_by_name(const char *name,
	struct timeval *queued, struct timeval *last_start, struct timeval *last_end)
{
	RAII_VAR(struct ast_sip_sched_task *, schtd, NULL, ao2_cleanup);

	if (ast_strlen_zero(name)) {
		return -1;
	}

	/* Container is modified concurrently; let ao2_find() take its lock. */
	schtd = ao2_find(tasks, name, OBJ_SEARCH_KEY);
	if (!schtd) {
		return -1;
	}

	return ast_sip_sched_task_get_times(schtd, queued, last_start, last_end);
}
/*!
 * \brief Copy a task's name into the caller's buffer.
 *
 * The name is copied with ast_copy_string() while holding the task lock.
 *
 * \retval 0 success
 * \retval -1 \a maxlen is 0, or the task reference/lock could not be obtained
 */
int ast_sip_sched_task_get_name(struct ast_sip_sched_task *schtd, char *name, size_t maxlen)
{
	/* maxlen is unsigned, so zero is the only unusable size. */
	if (!maxlen || !ao2_ref_and_lock(schtd)) {
		return -1;
	}

	ast_copy_string(name, schtd->name, maxlen);
	ao2_unlock_and_unref(schtd);

	return 0;
}
int ast_sip_sched_task_get_next_run(struct ast_sip_sched_task *schtd)
{
int delay;
struct timeval since_when;
struct timeval now;
if (!ao2_ref_and_lock(schtd)) {
return -1;
}
if (schtd->interval) {
delay = schtd->interval;
now = ast_tvnow();
if (schtd->flags & AST_SIP_SCHED_TASK_DELAY) {
since_when = schtd->is_running ? now : schtd->last_end;
} else {
since_when = schtd->last_start.tv_sec ? schtd->last_start : schtd->when_queued;
}
delay -= ast_tvdiff_ms(now, since_when);
delay = delay < 0 ? 0 : delay;
} else {
delay = -1;
}
ao2_unlock_and_unref(schtd);
return delay;
}
/*!
 * \brief Look up a task by \a name and estimate milliseconds to its next run.
 *
 * \retval -1 \a name is empty, no such task exists, or the task has no
 *         interval
 * \return otherwise the result of ast_sip_sched_task_get_next_run()
 */
int ast_sip_sched_task_get_next_run_by_name(const char *name)
{
	RAII_VAR(struct ast_sip_sched_task *, schtd, NULL, ao2_cleanup);

	if (ast_strlen_zero(name)) {
		return -1;
	}

	/* Container is modified concurrently; let ao2_find() take its lock. */
	schtd = ao2_find(tasks, name, OBJ_SEARCH_KEY);
	if (!schtd) {
		return -1;
	}

	return ast_sip_sched_task_get_next_run(schtd);
}
/*!
 * \brief Report whether a task is currently executing.
 *
 * Reads the flag without taking the task lock.
 *
 * \return the task's is_running flag, or 0 when \a schtd is NULL
 */
int ast_sip_sched_is_task_running(struct ast_sip_sched_task *schtd)
{
	return schtd ? schtd->is_running : 0;
}
/*!
 * \brief Look up a task by \a name and report whether it is executing.
 *
 * \return the task's is_running flag, or 0 if \a name is empty or no such
 *         task exists
 */
int ast_sip_sched_is_task_running_by_name(const char *name)
{
	RAII_VAR(struct ast_sip_sched_task *, schtd, NULL, ao2_cleanup);

	if (ast_strlen_zero(name)) {
		return 0;
	}

	/* Container is modified concurrently; let ao2_find() take its lock. */
	schtd = ao2_find(tasks, name, OBJ_SEARCH_KEY);

	/* NULL-safe: reports 0 when the task was not found. */
	return ast_sip_sched_is_task_running(schtd);
}
/*!
 * \internal
 * \brief ao2 destructor for a scheduled task.
 *
 * Releases task_data according to the task flags:
 * - AST_SIP_SCHED_TASK_DATA_AO2: drop the reference taken by
 *   ast_sip_schedule_task(), plus the caller's reference if
 *   AST_SIP_SCHED_TASK_DATA_FREE is also set.
 * - AST_SIP_SCHED_TASK_DATA_FREE alone: ast_free() the data.
 */
static void schtd_destructor(void *data)
{
	struct ast_sip_sched_task *schtd = data;

	/* Nothing to release; also avoids ao2_ref(NULL, ...). */
	if (!schtd->task_data) {
		return;
	}

	if (schtd->flags & AST_SIP_SCHED_TASK_DATA_AO2) {
		/* release our own ref, then release the caller's if asked to do so */
		ao2_ref(schtd->task_data, (schtd->flags & AST_SIP_SCHED_TASK_DATA_FREE) ? -2 : -1);
	} else if (schtd->flags & AST_SIP_SCHED_TASK_DATA_FREE) {
		ast_free(schtd->task_data);
	}
}
/*!
 * \brief Schedule a task to run on \a serializer after \a interval ms.
 *
 * \param serializer Serializer the task runs on
 * \param interval   Milliseconds until the first run (must be >= 0)
 * \param sip_task   The task callback
 * \param name       Task name; when empty a name "task_XXXXXXXX" is generated
 * \param task_data  Data passed to \a sip_task
 * \param flags      Reschedule, cleanup and policy flags
 *
 * The task is linked into the tasks container before its timer is armed,
 * so a task that runs and retires immediately is always unlinked.
 * NOTE(review): the container rejects duplicate names; a task whose name
 * is already in use is still scheduled but is not linked, so it cannot be
 * found by name.
 *
 * \return the new task, with the allocation reference owned by the caller,
 *         or NULL if \a interval < 0 or allocation/scheduling fails
 */
struct ast_sip_sched_task *ast_sip_schedule_task(struct ast_taskprocessor *serializer,
	int interval, ast_sip_task sip_task, char *name, void *task_data, enum ast_sip_scheduler_task_flags flags)
{
#define ID_LEN 13 /* task_deadbeef */
	struct ast_sip_sched_task *schtd;
	size_t name_len;
	int res;

	if (interval < 0) {
		return NULL;
	}

	name_len = !ast_strlen_zero(name) ? strlen(name) : ID_LEN;
	schtd = ao2_alloc(sizeof(*schtd) + name_len + 1, schtd_destructor);
	if (!schtd) {
		return NULL;
	}

	schtd->task_id = ast_atomic_fetchadd_int(&task_count, 1);
	schtd->serializer = serializer;
	schtd->task = sip_task;
	if (!ast_strlen_zero(name)) {
		strcpy(schtd->name, name); /* Safe: buffer sized from strlen(name) above */
	} else {
		snprintf(schtd->name, ID_LEN + 1, "task_%08x", schtd->task_id);
	}
	schtd->task_data = task_data;
	schtd->flags = flags;
	schtd->interval = interval;
	schtd->when_queued = ast_tvnow();
	/* No scheduler entry exists yet. */
	schtd->current_scheduler_id = -1;
	if ((flags & AST_SIP_SCHED_TASK_DATA_AO2) && task_data) {
		ao2_ref(task_data, +1);
	}

	/*
	 * Hold the task lock until the scheduler id is recorded so that a
	 * callback that fires immediately, or a concurrent cancel, never sees
	 * a missing id.  Link first so run_task()'s unlink always finds it.
	 */
	ao2_lock(schtd);
	ao2_link(tasks, schtd);
	res = ast_sched_add(scheduler_context, interval, push_to_serializer, (const void *)schtd);
	if (res < 0) {
		schtd->interval = 0;
		ao2_unlock(schtd);
		ao2_unlink(tasks, schtd);
		ao2_ref(schtd, -1);
		return NULL;
	}
	schtd->current_scheduler_id = res;
	ao2_unlock(schtd);

	return schtd;
#undef ID_LEN
}
/*!
 * \internal
 * \brief CLI handler for "pjsip show scheduled_tasks".
 *
 * Prints a header whose date columns are sized to the logger's date format,
 * then one row per task in the tasks container: name, interval in seconds,
 * times run, run/wait state, when queued, last start ("not yet started" if
 * never run), estimated next start, and whole seconds until that start.
 */
static char *cli_show_tasks(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
{
	struct ao2_iterator i;
	struct ast_sip_sched_task *schtd;
	const char *log_format = ast_logger_get_dateformat();
	struct ast_tm tm;
	char queued[32];
	char last_start[32];
	char next_start[32];
	int datelen;
	struct timeval now = ast_tvnow();
	const char *separator = "======================================";
	switch (cmd) {
	case CLI_INIT:
		e->command = "pjsip show scheduled_tasks";
		e->usage = "Usage: pjsip show scheduled_tasks\n"
			" Show all scheduled tasks\n";
		return NULL;
	case CLI_GENERATE:
		return NULL;
	}
	if (a->argc != 3) {
		return CLI_SHOWUSAGE;
	}
	/* Format "now" once only to learn how wide a formatted date column is. */
	ast_localtime(&now, &tm, NULL);
	datelen = ast_strftime(queued, sizeof(queued), log_format, &tm);
	ast_cli(a->fd, "PJSIP Scheduled Tasks:\n\n");
	/* Positional args: %5$ is the date column width used by the date columns. */
	ast_cli(a->fd, " %1$-24s %2$-9s %3$-9s %4$-5s %6$-*5$s %7$-*5$s %8$-*5$s %9$7s\n",
		"Task Name", "Interval", "Times Run", "State",
		datelen, "Queued", "Last Started", "Next Start", "( secs)");
	/* Underline each column; the last one spans "Next Start" plus the seconds column. */
	ast_cli(a->fd, " %1$-24.24s %2$-9.9s %3$-9.9s %4$-5.5s %6$-*5$.*5$s %7$-*5$.*5$s %9$-*8$.*8$s\n",
		separator, separator, separator, separator,
		datelen, separator, separator, datelen + 8, separator);
	/* Keep the container alive and read-locked while iterating. */
	ao2_ref(tasks, +1);
	ao2_rdlock(tasks);
	i = ao2_iterator_init(tasks, 0);
	while ((schtd = ao2_iterator_next(&i))) {
		/*
		 * Truncated to whole seconds.  get_next_run() returns -1 for a task
		 * with no interval, which divides to 0 here.
		 */
		int next_run_sec = ast_sip_sched_task_get_next_run(schtd) / 1000;
		struct timeval next = ast_tvadd(now, (struct timeval) {next_run_sec, 0});
		/*
		 * NOTE(review): the fields below are read without the task lock;
		 * acceptable for display, but values may be momentarily inconsistent.
		 */
		ast_localtime(&schtd->when_queued, &tm, NULL);
		ast_strftime(queued, sizeof(queued), log_format, &tm);
		if (ast_tvzero(schtd->last_start)) {
			strcpy(last_start, "not yet started");
		} else {
			ast_localtime(&schtd->last_start, &tm, NULL);
			ast_strftime(last_start, sizeof(last_start), log_format, &tm);
		}
		ast_localtime(&next, &tm, NULL);
		ast_strftime(next_start, sizeof(next_start), log_format, &tm);
		ast_cli(a->fd, " %1$-24.24s %2$9.3f %3$9d %4$-5s %6$-*5$s %7$-*5$s %8$-*5$s (%9$5d)\n",
			schtd->name,
			schtd->interval / 1000.0,
			schtd->run_count,
			schtd->is_running ? "run" : "wait",
			datelen, queued, last_start,
			next_start,
			next_run_sec);
		/* Drop the reference handed out by ao2_iterator_next(). */
		ao2_cleanup(schtd);
	}
	ao2_iterator_destroy(&i);
	ao2_unlock(tasks);
	ao2_ref(tasks, -1);
	ast_cli(a->fd, "\n");
	return CLI_SUCCESS;
}
/*! CLI commands registered by ast_sip_initialize_scheduler(). */
static struct ast_cli_entry cli_commands[] = {
	AST_CLI_DEFINE(cli_show_tasks, "Show all scheduled tasks")
};
int ast_sip_initialize_scheduler(void)
{
if (!(scheduler_context = ast_sched_context_create())) {
ast_log(LOG_ERROR, "Failed to create scheduler. Aborting load\n");
return -1;
}
if (ast_sched_start_thread(scheduler_context)) {
ast_log(LOG_ERROR, "Failed to start scheduler. Aborting load\n");
ast_sched_context_destroy(scheduler_context);
return -1;
}
tasks = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_RWLOCK, AO2_CONTAINER_ALLOC_OPT_DUPS_REJECT,
TASK_BUCKETS, ast_sip_sched_task_hash_fn, ast_sip_sched_task_sort_fn, ast_sip_sched_task_cmp_fn);
if (!tasks) {
ast_log(LOG_ERROR, "Failed to allocate task container. Aborting load\n");
ast_sched_context_destroy(scheduler_context);
return -1;
}
ast_cli_register_multiple(cli_commands, ARRAY_LEN(cli_commands));
return 0;
}
/*!
 * \brief Unregister the CLI commands, destroy the scheduler context and
 *        release the task container.
 *
 * Both globals are reset to NULL, so calling this again (or after a failed
 * initialization) is harmless.
 *
 * \return Always 0
 */
int ast_sip_destroy_scheduler(void)
{
	ast_cli_unregister_multiple(cli_commands, ARRAY_LEN(cli_commands));

	if (scheduler_context) {
		ast_sched_context_destroy(scheduler_context);
		scheduler_context = NULL;
	}

	ao2_cleanup(tasks);
	tasks = NULL;

	return 0;
}