Merge "stasis_cache: Prune stasis_subscription_change messages"

This commit is contained in:
Joshua Colp 2018-09-07 05:40:36 -05:00 committed by Gerrit Code Review
commit c1942e1cca
1 changed files with 49 additions and 2 deletions

View File

@ -48,6 +48,7 @@ struct stasis_cache {
snapshot_get_id id_fn;
cache_aggregate_calc_fn aggregate_calc_fn;
cache_aggregate_publish_fn aggregate_publish_fn;
int registered;
};
/*! \internal */
@ -69,6 +70,8 @@ static void stasis_caching_topic_dtor(void *obj)
* be bad. */
ast_assert(stasis_subscription_is_done(caching_topic->sub));
ao2_container_unregister(stasis_topic_name(caching_topic->topic));
ao2_cleanup(caching_topic->sub);
caching_topic->sub = NULL;
ao2_cleanup(caching_topic->cache);
@ -813,7 +816,31 @@ static void caching_topic_exec(void *data, struct stasis_subscription *sub,
}
msg_type = stasis_message_type(message);
if (stasis_cache_clear_type() == msg_type) {
if (stasis_subscription_change_type() == msg_type) {
struct stasis_subscription_change *change = stasis_message_data(message);
/*
* If this change type is an unsubscribe, we need to find the original
* subscribe and remove it from the cache otherwise the cache will
* continue to grow unabated.
*/
if (strcmp(change->description, "Unsubscribe") == 0) {
struct stasis_cache_entry *sub;
ao2_wrlock(caching_topic->cache->entries);
sub = cache_find(caching_topic->cache->entries, stasis_subscription_change_type(), change->uniqueid);
if (sub) {
cache_remove(caching_topic->cache->entries, sub, stasis_message_eid(message));
ao2_cleanup(sub);
}
ao2_unlock(caching_topic->cache->entries);
ao2_cleanup(caching_topic_needs_unref);
return;
}
msg_put = message;
msg = message;
} else if (stasis_cache_clear_type() == msg_type) {
/* Cache clear event. */
msg_put = NULL;
msg = stasis_message_data(message);
@ -866,6 +893,17 @@ static void caching_topic_exec(void *data, struct stasis_subscription *sub,
ao2_cleanup(caching_topic_needs_unref);
}
static void print_cache_entry(void *v_obj, void *where, ao2_prnt_fn *prnt)
{
struct stasis_cache_entry *entry = v_obj;
if (!entry) {
return;
}
prnt(where, "Type: %s ID: %s Hash: %u", stasis_message_type_name(entry->key.type),
entry->key.id, entry->key.hash);
}
struct stasis_caching_topic *stasis_caching_topic_create(struct stasis_topic *original_topic, struct stasis_cache *cache)
{
struct stasis_caching_topic *caching_topic;
@ -886,15 +924,24 @@ struct stasis_caching_topic *stasis_caching_topic_create(struct stasis_topic *or
}
caching_topic->topic = stasis_topic_create(new_name);
ast_free(new_name);
if (caching_topic->topic == NULL) {
ao2_ref(caching_topic, -1);
ast_free(new_name);
return NULL;
}
ao2_ref(cache, +1);
caching_topic->cache = cache;
if (!cache->registered) {
if (ao2_container_register(new_name, cache->entries, print_cache_entry)) {
ast_log(LOG_ERROR, "Stasis cache container '%p' for '%s' did not register\n",
cache->entries, new_name);
} else {
cache->registered = 1;
}
}
ast_free(new_name);
caching_topic->sub = internal_stasis_subscribe(original_topic, caching_topic_exec, caching_topic, 0, 0);
if (caching_topic->sub == NULL) {