Stasis/messaging: tech subscriptions conflict with endpoint subscriptions.

When both a tech subscription and an endpoint subscription exist for a given
endpoint, TextMessageReceived events are dispatched to the tech subscription
only.

ASTERISK-29229

Change-Id: I9eac4cba5f9e27285a282509395347abc58fc2b8
This commit is contained in:
Jean Aunis 2020-12-30 14:56:47 +01:00 committed by George Joseph
parent c3fad2fd01
commit c10557c401
1 changed files with 36 additions and 22 deletions

View File

@ -289,18 +289,42 @@ static struct ast_json *msg_to_json(struct ast_msg *msg)
return json_obj;
}
static void dispatch_message(struct message_subscription *sub, const char *endpoint_name, struct ast_json *json_msg)
{
int i;
ast_debug(3, "Dispatching message to subscription %s for endpoint %s\n",
sub->token,
endpoint_name);
for (i = 0; i < AST_VECTOR_SIZE(&sub->applications); i++) {
struct application_tuple *tuple = AST_VECTOR_GET(&sub->applications, i);
tuple->callback(endpoint_name, json_msg, tuple->pvt);
}
}
static int handle_msg_cb(struct ast_msg *msg)
{
/* We have at most 3 subscriptions: TECH_WILDCARD, tech itself, and endpoint. */
struct message_subscription *matching_subscriptions[3];
struct message_subscription *sub;
int i;
int i, j;
int result;
char buf[256];
const char *endpoint_name;
struct ast_json *json_msg;
msg_to_endpoint(msg, buf, sizeof(buf));
endpoint_name = buf;
json_msg = msg_to_json(msg);
if (!json_msg) {
return -1;
}
result = -1;
/* Find subscriptions to TECH_WILDCARD and to the endpoint's technology. */
ast_rwlock_rdlock(&tech_subscriptions_lock);
for (i = 0; i < AST_VECTOR_SIZE(&tech_subscriptions); i++) {
for (i = 0, j = 0; i < AST_VECTOR_SIZE(&tech_subscriptions) && j < 2; i++) {
sub = AST_VECTOR_GET(&tech_subscriptions, i);
if (!sub) {
@ -309,40 +333,30 @@ static int handle_msg_cb(struct ast_msg *msg)
if (!strcmp(sub->token, TECH_WILDCARD)
|| !strncasecmp(sub->token, buf, strlen(sub->token))) {
ast_rwlock_unlock(&tech_subscriptions_lock);
ao2_bump(sub);
endpoint_name = buf;
goto match;
ao2_ref(sub, +1);
matching_subscriptions[j++] = sub;
}
}
ast_rwlock_unlock(&tech_subscriptions_lock);
/* Find the subscription to this particular endpoint. */
sub = ao2_find(endpoint_subscriptions, buf, OBJ_SEARCH_KEY);
if (sub) {
endpoint_name = buf;
goto match;
matching_subscriptions[j++] = sub;
}
return -1;
/* Dispatch the message to all matching subscriptions. */
for (i = 0; i < j; i++) {
sub = matching_subscriptions[i];
match:
ast_debug(3, "Dispatching message for %s\n", endpoint_name);
dispatch_message(sub, endpoint_name, json_msg);
json_msg = msg_to_json(msg);
if (!json_msg) {
ao2_ref(sub, -1);
return -1;
}
for (i = 0; i < AST_VECTOR_SIZE(&sub->applications); i++) {
struct application_tuple *tuple = AST_VECTOR_GET(&sub->applications, i);
tuple->callback(endpoint_name, json_msg, tuple->pvt);
result = 0;
}
ast_json_unref(json_msg);
ao2_ref(sub, -1);
return 0;
return result;
}
struct ast_msg_handler ari_msg_handler = {