/* * Copyright (C) 2019 by Sukchan Lee * * This file is part of Open5GS. * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License as published by * the Free Software Foundation, either version 3 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program. If not, see . */ #include "ogs-core.h" #undef OGS_LOG_DOMAIN #define OGS_LOG_DOMAIN __ogs_event_domain typedef struct ogs_queue_s { void **data; unsigned int nelts; /**< # elements */ unsigned int in; /**< next empty location */ unsigned int out; /**< next filled location */ unsigned int bounds;/**< max size of queue */ unsigned int full_waiters; unsigned int empty_waiters; ogs_thread_mutex_t one_big_mutex; ogs_thread_cond_t not_empty; ogs_thread_cond_t not_full; int terminated; } ogs_queue_t; /** * Detects when the ogs_queue_t is full. This utility function is expected * to be called from within critical sections, and is not threadsafe. */ #define ogs_queue_full(queue) ((queue)->nelts == (queue)->bounds) /** * Detects when the ogs_queue_t is empty. This utility function is expected * to be called from within critical sections, and is not threadsafe. */ #define ogs_queue_empty(queue) ((queue)->nelts == 0) /** * Callback routine that is called to destroy this * ogs_queue_t when its pool is destroyed. */ ogs_queue_t *ogs_queue_create(unsigned int capacity) { ogs_queue_t *queue = ogs_calloc(1, sizeof *queue); ogs_assert(queue); ogs_thread_mutex_init(&queue->one_big_mutex); ogs_thread_cond_init(&queue->not_empty); ogs_thread_cond_init(&queue->not_full); queue->data = ogs_calloc(1, capacity * sizeof(void*)); queue->bounds = capacity; queue->nelts = 0; queue->in = 0; queue->out = 0; queue->terminated = 0; queue->full_waiters = 0; queue->empty_waiters = 0; return queue; } void ogs_queue_destroy(ogs_queue_t *queue) { ogs_assert(queue); ogs_free(queue->data); ogs_thread_cond_destroy(&queue->not_empty); ogs_thread_cond_destroy(&queue->not_full); ogs_thread_mutex_destroy(&queue->one_big_mutex); ogs_free(queue); } static int queue_push(ogs_queue_t *queue, void *data, ogs_time_t timeout) { int rv; if (queue->terminated) { return OGS_DONE; /* no more elements ever again */ } ogs_thread_mutex_lock(&queue->one_big_mutex); if (ogs_queue_full(queue)) { if (!timeout) { ogs_thread_mutex_unlock(&queue->one_big_mutex); return OGS_RETRY; } if (!queue->terminated) { queue->full_waiters++; if (timeout > 0) { rv = ogs_thread_cond_timedwait(&queue->not_full, &queue->one_big_mutex, timeout); } else { rv = ogs_thread_cond_wait(&queue->not_full, &queue->one_big_mutex); } queue->full_waiters--; if (rv != OGS_OK) { ogs_thread_mutex_unlock(&queue->one_big_mutex); return rv; } } /* If we wake up and it's still empty, then we were interrupted */ if (ogs_queue_full(queue)) { ogs_warn("queue full (intr)"); ogs_thread_mutex_unlock(&queue->one_big_mutex); if (queue->terminated) { return OGS_DONE; /* no more elements ever again */ } else { return OGS_ERROR; } } } queue->data[queue->in] = data; queue->in++; if (queue->in >= queue->bounds) queue->in -= queue->bounds; queue->nelts++; if (queue->empty_waiters) { ogs_trace("signal !empty"); ogs_thread_cond_signal(&queue->not_empty); } ogs_thread_mutex_unlock(&queue->one_big_mutex); return OGS_OK; } int ogs_queue_push(ogs_queue_t *queue, void *data) { return queue_push(queue, data, OGS_INFINITE_TIME); } /** * Push new data onto the queue. If the queue is full, return OGS_RETRY. If * the push operation completes successfully, it signals other threads * waiting in ogs_queue_pop() that they may continue consuming sockets. */ int ogs_queue_trypush(ogs_queue_t *queue, void *data) { return queue_push(queue, data, 0); } int ogs_queue_timedpush(ogs_queue_t *queue, void *data, ogs_time_t timeout) { return queue_push(queue, data, timeout); } /** * not thread safe */ unsigned int ogs_queue_size(ogs_queue_t *queue) { return queue->nelts; } /** * Retrieves the next item from the queue. If there are no * items available, it will either return OGS_RETRY (timeout = 0), * or block until one becomes available (infinitely with timeout < 0, * otherwise until the given timeout expires). Once retrieved, the * item is placed into the address specified by 'data'. */ static int queue_pop(ogs_queue_t *queue, void **data, ogs_time_t timeout) { int rv; if (queue->terminated) { return OGS_DONE; /* no more elements ever again */ } ogs_thread_mutex_lock(&queue->one_big_mutex); /* Keep waiting until we wake up and find that the queue is not empty. */ if (ogs_queue_empty(queue)) { if (!timeout) { ogs_thread_mutex_unlock(&queue->one_big_mutex); return OGS_RETRY; } if (!queue->terminated) { queue->empty_waiters++; if (timeout > 0) { rv = ogs_thread_cond_timedwait(&queue->not_empty, &queue->one_big_mutex, timeout); } else { rv = ogs_thread_cond_wait(&queue->not_empty, &queue->one_big_mutex); } queue->empty_waiters--; if (rv != OGS_OK) { ogs_thread_mutex_unlock(&queue->one_big_mutex); return rv; } } /* If we wake up and it's still empty, then we were interrupted */ if (ogs_queue_empty(queue)) { ogs_warn("queue empty (intr)"); ogs_thread_mutex_unlock(&queue->one_big_mutex); if (queue->terminated) { return OGS_DONE; /* no more elements ever again */ } else { return OGS_ERROR; } } } *data = queue->data[queue->out]; queue->nelts--; queue->out++; if (queue->out >= queue->bounds) queue->out -= queue->bounds; if (queue->full_waiters) { ogs_trace("signal !full"); ogs_thread_cond_signal(&queue->not_full); } ogs_thread_mutex_unlock(&queue->one_big_mutex); return OGS_OK; } int ogs_queue_pop(ogs_queue_t *queue, void **data) { return queue_pop(queue, data, OGS_INFINITE_TIME); } int ogs_queue_trypop(ogs_queue_t *queue, void **data) { return queue_pop(queue, data, 0); } int ogs_queue_timedpop(ogs_queue_t *queue, void **data, ogs_time_t timeout) { return queue_pop(queue, data, timeout); } int ogs_queue_interrupt_all(ogs_queue_t *queue) { ogs_debug("interrupt all"); ogs_thread_mutex_lock(&queue->one_big_mutex); ogs_thread_cond_broadcast(&queue->not_empty); ogs_thread_cond_broadcast(&queue->not_full); ogs_thread_mutex_unlock(&queue->one_big_mutex); return OGS_OK; } int ogs_queue_term(ogs_queue_t *queue) { ogs_thread_mutex_lock(&queue->one_big_mutex); /* we must hold one_big_mutex when setting this... otherwise, * we could end up setting it and waking everybody up just after a * would-be popper checks it but right before they block */ queue->terminated = 1; ogs_thread_mutex_unlock(&queue->one_big_mutex); return ogs_queue_interrupt_all(queue); }