forked from acouzens/open5gs
1329 lines
45 KiB
C
1329 lines
45 KiB
C
/*********************************************************************************************************
|
|
* Software License Agreement (BSD License) *
|
|
* Author: Sebastien Decugis <sdecugis@freediameter.net> *
|
|
* *
|
|
* Copyright (c) 2015, WIDE Project and NICT *
|
|
* All rights reserved. *
|
|
* *
|
|
* Redistribution and use of this software in source and binary forms, with or without modification, are *
|
|
* permitted provided that the following conditions are met: *
|
|
* *
|
|
* * Redistributions of source code must retain the above *
|
|
* copyright notice, this list of conditions and the *
|
|
* following disclaimer. *
|
|
* *
|
|
* * Redistributions in binary form must reproduce the above *
|
|
* copyright notice, this list of conditions and the *
|
|
* following disclaimer in the documentation and/or other *
|
|
* materials provided with the distribution. *
|
|
* *
|
|
* * Neither the name of the WIDE Project or NICT nor the *
|
|
* names of its contributors may be used to endorse or *
|
|
* promote products derived from this software without *
|
|
* specific prior written permission of WIDE Project and *
|
|
* NICT. *
|
|
* *
|
|
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED *
|
|
* WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A *
|
|
* PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR *
|
|
* ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT *
|
|
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS *
|
|
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR *
|
|
* TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF *
|
|
* ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. *
|
|
*********************************************************************************************************/
|
|
|
|
#include "fdcore-internal.h"
|
|
|
|
/********************************************************************************/
|
|
/* First part : handling the extensions callbacks */
|
|
/********************************************************************************/
|
|
|
|
/* Lists of the callbacks, and locks to protect them */
|
|
static pthread_rwlock_t rt_fwd_lock = PTHREAD_RWLOCK_INITIALIZER;
|
|
static struct fd_list rt_fwd_list = FD_LIST_INITIALIZER_O(rt_fwd_list, &rt_fwd_lock);
|
|
|
|
static pthread_rwlock_t rt_out_lock = PTHREAD_RWLOCK_INITIALIZER;
|
|
static struct fd_list rt_out_list = FD_LIST_INITIALIZER_O(rt_out_list, &rt_out_lock);
|
|
|
|
/* Items in the lists are the same */
|
|
struct rt_hdl {
|
|
struct fd_list chain; /* link in the rt_fwd_list or rt_out_list */
|
|
void * cbdata; /* the registered data */
|
|
union {
|
|
int order; /* This value is used to sort the list */
|
|
int dir; /* It is the direction for FWD handlers */
|
|
int prio; /* and the priority for OUT handlers */
|
|
};
|
|
union {
|
|
int (*rt_fwd_cb)(void * cbdata, struct msg ** msg);
|
|
int (*rt_out_cb)(void * cbdata, struct msg ** msg, struct fd_list * candidates);
|
|
};
|
|
};
|
|
|
|
/* Add a new entry in the list */
|
|
static int add_ordered(struct rt_hdl * new, struct fd_list * list)
|
|
{
|
|
/* The list is ordered by prio parameter */
|
|
struct fd_list * li;
|
|
|
|
CHECK_POSIX( pthread_rwlock_wrlock(list->o) );
|
|
|
|
for (li = list->next; li != list; li = li->next) {
|
|
struct rt_hdl * h = (struct rt_hdl *) li;
|
|
if (new->order <= h->order)
|
|
break;
|
|
}
|
|
|
|
fd_list_insert_before(li, &new->chain);
|
|
|
|
CHECK_POSIX( pthread_rwlock_unlock(list->o) );
|
|
|
|
return 0;
|
|
}
|
|
|
|
/* Register a new FWD callback */
|
|
int fd_rt_fwd_register ( int (*rt_fwd_cb)(void * cbdata, struct msg ** msg), void * cbdata, enum fd_rt_fwd_dir dir, struct fd_rt_fwd_hdl ** handler )
|
|
{
|
|
struct rt_hdl * new;
|
|
|
|
TRACE_ENTRY("%p %p %d %p", rt_fwd_cb, cbdata, dir, handler);
|
|
CHECK_PARAMS( rt_fwd_cb );
|
|
CHECK_PARAMS( (dir >= RT_FWD_REQ) && ( dir <= RT_FWD_ANS) );
|
|
|
|
/* Create a new container */
|
|
CHECK_MALLOC(new = malloc(sizeof(struct rt_hdl)));
|
|
memset(new, 0, sizeof(struct rt_hdl));
|
|
|
|
/* Write the content */
|
|
fd_list_init(&new->chain, NULL);
|
|
new->cbdata = cbdata;
|
|
new->dir = dir;
|
|
new->rt_fwd_cb = rt_fwd_cb;
|
|
|
|
/* Save this in the list */
|
|
CHECK_FCT( add_ordered(new, &rt_fwd_list) );
|
|
|
|
/* Give it back to the extension if needed */
|
|
if (handler)
|
|
*handler = (void *)new;
|
|
|
|
return 0;
|
|
}
|
|
|
|
/* Remove it */
|
|
int fd_rt_fwd_unregister ( struct fd_rt_fwd_hdl * handler, void ** cbdata )
|
|
{
|
|
struct rt_hdl * del;
|
|
TRACE_ENTRY( "%p %p", handler, cbdata);
|
|
CHECK_PARAMS( handler );
|
|
|
|
del = (struct rt_hdl *)handler;
|
|
CHECK_PARAMS( del->chain.head == &rt_fwd_list );
|
|
|
|
/* Unlink */
|
|
CHECK_POSIX( pthread_rwlock_wrlock(&rt_fwd_lock) );
|
|
fd_list_unlink(&del->chain);
|
|
CHECK_POSIX( pthread_rwlock_unlock(&rt_fwd_lock) );
|
|
|
|
if (cbdata)
|
|
*cbdata = del->cbdata;
|
|
|
|
free(del);
|
|
return 0;
|
|
}
|
|
|
|
/* Register a new OUT callback */
|
|
int fd_rt_out_register ( int (*rt_out_cb)(void * cbdata, struct msg ** pmsg, struct fd_list * candidates), void * cbdata, int priority, struct fd_rt_out_hdl ** handler )
|
|
{
|
|
struct rt_hdl * new;
|
|
|
|
TRACE_ENTRY("%p %p %d %p", rt_out_cb, cbdata, priority, handler);
|
|
CHECK_PARAMS( rt_out_cb );
|
|
|
|
/* Create a new container */
|
|
CHECK_MALLOC(new = malloc(sizeof(struct rt_hdl)));
|
|
memset(new, 0, sizeof(struct rt_hdl));
|
|
|
|
/* Write the content */
|
|
fd_list_init(&new->chain, NULL);
|
|
new->cbdata = cbdata;
|
|
new->prio = priority;
|
|
new->rt_out_cb = rt_out_cb;
|
|
|
|
/* Save this in the list */
|
|
CHECK_FCT( add_ordered(new, &rt_out_list) );
|
|
|
|
/* Give it back to the extension if needed */
|
|
if (handler)
|
|
*handler = (void *)new;
|
|
|
|
return 0;
|
|
}
|
|
|
|
/* Remove it */
|
|
int fd_rt_out_unregister ( struct fd_rt_out_hdl * handler, void ** cbdata )
|
|
{
|
|
struct rt_hdl * del;
|
|
TRACE_ENTRY( "%p %p", handler, cbdata);
|
|
CHECK_PARAMS( handler );
|
|
|
|
del = (struct rt_hdl *)handler;
|
|
CHECK_PARAMS( del->chain.head == &rt_out_list );
|
|
|
|
/* Unlink */
|
|
CHECK_POSIX( pthread_rwlock_wrlock(&rt_out_lock) );
|
|
fd_list_unlink(&del->chain);
|
|
CHECK_POSIX( pthread_rwlock_unlock(&rt_out_lock) );
|
|
|
|
if (cbdata)
|
|
*cbdata = del->cbdata;
|
|
|
|
free(del);
|
|
return 0;
|
|
}
|
|
|
|
/********************************************************************************/
|
|
/* Some default OUT routing callbacks */
|
|
/********************************************************************************/
|
|
|
|
/* Prevent sending to peers that do not support the message application */
|
|
static int dont_send_if_no_common_app(void * cbdata, struct msg ** pmsg, struct fd_list * candidates)
|
|
{
|
|
struct msg * msg = *pmsg;
|
|
struct fd_list * li;
|
|
struct msg_hdr * hdr;
|
|
|
|
TRACE_ENTRY("%p %p %p", cbdata, msg, candidates);
|
|
CHECK_PARAMS(msg && candidates);
|
|
|
|
CHECK_FCT( fd_msg_hdr(msg, &hdr) );
|
|
|
|
/* For Base Diameter Protocol, every peer is supposed to support it, so skip */
|
|
if (hdr->msg_appl == 0)
|
|
return 0;
|
|
|
|
/* Otherwise, check that the peers support the application */
|
|
for (li = candidates->next; li != candidates; li = li->next) {
|
|
struct rtd_candidate *c = (struct rtd_candidate *) li;
|
|
struct fd_peer * peer;
|
|
struct fd_app *found;
|
|
CHECK_FCT( fd_peer_getbyid( c->diamid, c->diamidlen, 0, (void *)&peer ) );
|
|
if (peer && !peer->p_hdr.info.runtime.pir_relay) {
|
|
/* Check if the remote peer advertised the message's appli */
|
|
CHECK_FCT( fd_app_check(&peer->p_hdr.info.runtime.pir_apps, hdr->msg_appl, &found) );
|
|
if (!found)
|
|
c->score += FD_SCORE_NO_DELIVERY;
|
|
}
|
|
}
|
|
|
|
return 0;
|
|
}
|
|
|
|
/* Detect if the Destination-Host and Destination-Realm match the peer */
|
|
static int score_destination_avp(void * cbdata, struct msg ** pmsg, struct fd_list * candidates)
|
|
{
|
|
struct msg * msg = *pmsg;
|
|
struct fd_list * li;
|
|
struct avp * avp;
|
|
union avp_value *dh = NULL, *dr = NULL;
|
|
|
|
TRACE_ENTRY("%p %p %p", cbdata, msg, candidates);
|
|
CHECK_PARAMS(msg && candidates);
|
|
|
|
/* Search the Destination-Host and Destination-Realm AVPs -- we could also use fd_msg_search_avp here, but this one is slightly more efficient */
|
|
CHECK_FCT( fd_msg_browse(msg, MSG_BRW_FIRST_CHILD, &avp, NULL) );
|
|
while (avp) {
|
|
struct avp_hdr * ahdr;
|
|
CHECK_FCT( fd_msg_avp_hdr( avp, &ahdr ) );
|
|
|
|
if (! (ahdr->avp_flags & AVP_FLAG_VENDOR)) {
|
|
switch (ahdr->avp_code) {
|
|
case AC_DESTINATION_HOST:
|
|
/* Parse this AVP */
|
|
CHECK_FCT( fd_msg_parse_dict ( avp, fd_g_config->cnf_dict, NULL ) );
|
|
ASSERT( ahdr->avp_value );
|
|
dh = ahdr->avp_value;
|
|
break;
|
|
|
|
case AC_DESTINATION_REALM:
|
|
/* Parse this AVP */
|
|
CHECK_FCT( fd_msg_parse_dict ( avp, fd_g_config->cnf_dict, NULL ) );
|
|
ASSERT( ahdr->avp_value );
|
|
dr = ahdr->avp_value;
|
|
break;
|
|
}
|
|
}
|
|
|
|
if (dh && dr)
|
|
break;
|
|
|
|
/* Go to next AVP */
|
|
CHECK_FCT( fd_msg_browse(avp, MSG_BRW_NEXT, &avp, NULL) );
|
|
}
|
|
|
|
/* Now, check each candidate against these AVP values */
|
|
for (li = candidates->next; li != candidates; li = li->next) {
|
|
struct rtd_candidate *c = (struct rtd_candidate *) li;
|
|
|
|
#if 0 /* this is actually useless since the sending process will also ensure that the peer is still available */
|
|
struct fd_peer * peer;
|
|
/* Since the candidates list comes from the peers list, we do not have any issue with upper/lower case to find the peer object */
|
|
CHECK_FCT( fd_peer_getbyid( c->diamid, c->diamidlen, 0, (void *)&peer ) );
|
|
if (!peer)
|
|
continue; /* it has been deleted since the candidate list was generated; avoid sending to this one in that case. */
|
|
#endif /* 0 */
|
|
|
|
/* In the AVPs, the value comes from the network, so let's be case permissive */
|
|
if (dh && !fd_os_almostcasesrch(dh->os.data, dh->os.len, c->diamid, c->diamidlen, NULL) ) {
|
|
/* The candidate is the Destination-Host */
|
|
c->score += FD_SCORE_FINALDEST;
|
|
} else {
|
|
if (dr && !fd_os_almostcasesrch(dr->os.data, dr->os.len, c->realm, c->realmlen, NULL) ) {
|
|
/* The candidate's realm matchs the Destination-Realm */
|
|
c->score += FD_SCORE_REALM;
|
|
}
|
|
}
|
|
}
|
|
|
|
return 0;
|
|
}
|
|
|
|
/********************************************************************************/
|
|
/* Helper functions */
|
|
/********************************************************************************/
|
|
|
|
/* Find (first) '!' and '@' positions in a UTF-8 encoded string (User-Name AVP value) */
|
|
static void nai_get_indexes(union avp_value * un, int * excl_idx, int * at_idx)
|
|
{
|
|
int i;
|
|
|
|
TRACE_ENTRY("%p %p %p", un, excl_idx, at_idx);
|
|
CHECK_PARAMS_DO( un && excl_idx && at_idx, return );
|
|
|
|
*excl_idx = 0;
|
|
*at_idx = 0;
|
|
|
|
/* Search if there is a '!' before any '@' -- do we need to check it contains a '.' ? */
|
|
for (i = 0; i < un->os.len; i++) {
|
|
/* The '!' marks the decorated NAI */
|
|
if ( un->os.data[i] == (unsigned char) '!' ) {
|
|
if (!*excl_idx)
|
|
*excl_idx = i;
|
|
continue;
|
|
}
|
|
/* If we reach the realm part, we can stop */
|
|
if ( un->os.data[i] == (unsigned char) '@' ) {
|
|
*at_idx = i;
|
|
break;
|
|
}
|
|
/* Stop if we find a \0 in the middle */
|
|
if ( un->os.data[i] == 0 ) {
|
|
return;
|
|
}
|
|
/* Skip escaped characters */
|
|
if ( un->os.data[i] == (unsigned char) '\\' ) {
|
|
i++;
|
|
continue;
|
|
}
|
|
}
|
|
|
|
return;
|
|
}
|
|
|
|
/* Test if a User-Name AVP contains a Decorated NAI -- RFC4282, RFC5729 */
|
|
/* Create new User-Name and Destination-Realm values */
|
|
static int process_decorated_NAI(int * was_nai, union avp_value * un, union avp_value * dr)
|
|
{
|
|
int at_idx, sep_idx;
|
|
unsigned char * old_un;
|
|
TRACE_ENTRY("%p %p %p", was_nai, un, dr);
|
|
CHECK_PARAMS(was_nai && un && dr);
|
|
|
|
/* Save the decorated User-Name, for example 'homerealm.example.net!user@otherrealm.example.net' */
|
|
old_un = un->os.data;
|
|
|
|
/* Search the positions of the first '!' and the '@' in the string */
|
|
nai_get_indexes(un, &sep_idx, &at_idx);
|
|
if ((!sep_idx) || (sep_idx > at_idx) || !fd_os_is_valid_DiameterIdentity(old_un, sep_idx /* this is the new realm part */)) {
|
|
*was_nai = 0;
|
|
return 0;
|
|
}
|
|
|
|
*was_nai = 1;
|
|
|
|
/* Create the new User-Name value */
|
|
CHECK_MALLOC( un->os.data = malloc( at_idx ) );
|
|
memcpy( un->os.data, old_un + sep_idx + 1, at_idx - sep_idx ); /* user@ */
|
|
memcpy( un->os.data + at_idx - sep_idx, old_un, sep_idx ); /* homerealm.example.net */
|
|
|
|
/* Create the new Destination-Realm value */
|
|
CHECK_MALLOC( dr->os.data = realloc(dr->os.data, sep_idx) );
|
|
memcpy( dr->os.data, old_un, sep_idx );
|
|
dr->os.len = sep_idx;
|
|
|
|
TRACE_DEBUG(FULL, "Processed Decorated NAI : '%.*s' became '%.*s' (%.*s)",
|
|
(int)un->os.len, old_un,
|
|
(int)at_idx, un->os.data,
|
|
(int)dr->os.len, dr->os.data);
|
|
|
|
un->os.len = at_idx;
|
|
free(old_un);
|
|
|
|
return 0;
|
|
}
|
|
|
|
|
|
/* Function to return an error to an incoming request */
|
|
static int return_error(struct msg ** pmsg, char * error_code, char * error_message, struct avp * failedavp)
|
|
{
|
|
struct fd_peer * peer;
|
|
int is_loc = 0;
|
|
|
|
/* Get the source of the message */
|
|
{
|
|
DiamId_t id;
|
|
size_t idlen;
|
|
CHECK_FCT( fd_msg_source_get( *pmsg, &id, &idlen ) );
|
|
|
|
if (id == NULL) {
|
|
is_loc = 1; /* The message was issued locally */
|
|
} else {
|
|
|
|
/* Search the peer with this id */
|
|
CHECK_FCT( fd_peer_getbyid( id, idlen, 0, (void *)&peer ) );
|
|
|
|
if (!peer) {
|
|
char buf[256];
|
|
snprintf(buf, sizeof(buf), "Unable to send error '%s' to deleted peer '%s' in reply to this message.", error_code, id);
|
|
fd_hook_call(HOOK_MESSAGE_DROPPED, *pmsg, NULL, buf, fd_msg_pmdl_get(*pmsg));
|
|
fd_msg_free(*pmsg);
|
|
*pmsg = NULL;
|
|
return 0;
|
|
}
|
|
}
|
|
}
|
|
|
|
/* Create the error message */
|
|
CHECK_FCT( fd_msg_new_answer_from_req ( fd_g_config->cnf_dict, pmsg, MSGFL_ANSW_ERROR ) );
|
|
|
|
/* Set the error code */
|
|
CHECK_FCT( fd_msg_rescode_set(*pmsg, error_code, error_message, failedavp, 1 ) );
|
|
|
|
/* Send the answer */
|
|
if (is_loc) {
|
|
CHECK_FCT( fd_fifo_post(fd_g_incoming, pmsg) );
|
|
} else {
|
|
CHECK_FCT( fd_out_send(pmsg, NULL, peer, 1) );
|
|
}
|
|
|
|
/* Done */
|
|
return 0;
|
|
}
|
|
|
|
|
|
/****************************************************************************/
|
|
/* Second part : threads moving messages in the daemon */
|
|
/****************************************************************************/
|
|
|
|
/* The DISPATCH message processing */
|
|
static int msg_dispatch(struct msg * msg)
|
|
{
|
|
struct msg_hdr * hdr;
|
|
int is_req = 0;
|
|
struct session * sess;
|
|
enum disp_action action;
|
|
char * ec = NULL;
|
|
char * em = NULL;
|
|
struct msg *msgptr = msg, *error = NULL;
|
|
|
|
/* Read the message header */
|
|
CHECK_FCT( fd_msg_hdr(msg, &hdr) );
|
|
is_req = hdr->msg_flags & CMD_FLAG_REQUEST;
|
|
|
|
/* Note: if the message is for local delivery, we should test for duplicate
|
|
(draft-asveren-dime-dupcons-00). This may conflict with path validation decisions, no clear answer yet */
|
|
|
|
/* At this point, we need to understand the message content, so parse it */
|
|
CHECK_FCT_DO( fd_msg_parse_or_error( &msgptr, &error ),
|
|
{
|
|
int rescue = 0;
|
|
if (__ret__ != EBADMSG) {
|
|
fd_hook_call(HOOK_MESSAGE_DROPPED, msgptr, NULL, "Error while parsing received answer", fd_msg_pmdl_get(msgptr));
|
|
fd_msg_free(msgptr);
|
|
} else {
|
|
if (!msgptr) {
|
|
fd_hook_call(HOOK_MESSAGE_PARSING_ERROR2, error, NULL, NULL, fd_msg_pmdl_get(error));
|
|
/* error now contains the answer message to send back */
|
|
CHECK_FCT( fd_fifo_post(fd_g_outgoing, &error) );
|
|
} else if (!error) {
|
|
/* We have received an invalid answer to our query */
|
|
fd_hook_call(HOOK_MESSAGE_DROPPED, msgptr, NULL, "Received answer failed the dictionary / rules parsing", fd_msg_pmdl_get(msgptr));
|
|
fd_msg_free(msgptr);
|
|
} else {
|
|
/* We will pass the invalid received error to the application */
|
|
rescue = 1;
|
|
}
|
|
}
|
|
if (!rescue)
|
|
return 0; /* We are done with this message, go to the next */
|
|
} );
|
|
|
|
/* First, if the original request was registered with a callback and we receive the answer, call it. */
|
|
if ( ! is_req ) {
|
|
struct msg * qry;
|
|
void (*anscb)(void *, struct msg **) = NULL;
|
|
void * data = NULL;
|
|
|
|
/* Retrieve the corresponding query */
|
|
CHECK_FCT( fd_msg_answ_getq( msgptr, &qry ) );
|
|
|
|
/* Retrieve any registered handler */
|
|
CHECK_FCT( fd_msg_anscb_get( qry, &anscb, NULL, &data ) );
|
|
|
|
/* If a callback was registered, pass the message to it */
|
|
if (anscb != NULL) {
|
|
|
|
TRACE_DEBUG(FULL, "Calling callback registered when query was sent (%p, %p)", anscb, data);
|
|
(*anscb)(data, &msgptr);
|
|
|
|
/* If the message is processed, we're done */
|
|
if (msgptr == NULL) {
|
|
return 0;
|
|
}
|
|
|
|
/* otherwise continue the dispatching --hoping that the anscb callback did not mess with our message :) */
|
|
}
|
|
}
|
|
|
|
/* Retrieve the session of the message */
|
|
CHECK_FCT( fd_msg_sess_get(fd_g_config->cnf_dict, msgptr, &sess, NULL) );
|
|
|
|
/* Now, call any callback registered for the message */
|
|
CHECK_FCT( fd_msg_dispatch ( &msgptr, sess, &action, &ec, &em, &error) );
|
|
|
|
/* Now, act depending on msg and action and ec */
|
|
if (msgptr) {
|
|
switch ( action ) {
|
|
case DISP_ACT_CONT:
|
|
/* No callback has handled the message, let's reply with a generic error or relay it */
|
|
if (!fd_g_config->cnf_flags.no_fwd) {
|
|
/* requeue to fd_g_outgoing */
|
|
fd_hook_call(HOOK_MESSAGE_ROUTING_FORWARD, msgptr, NULL, NULL, fd_msg_pmdl_get(msgptr));
|
|
CHECK_FCT( fd_fifo_post(fd_g_outgoing, &msgptr) );
|
|
break;
|
|
}
|
|
/* We don't relay => reply error */
|
|
em = "The message was not handled by any extension callback";
|
|
ec = "DIAMETER_COMMAND_UNSUPPORTED";
|
|
/* and continue as if an error occurred... */
|
|
case DISP_ACT_ERROR:
|
|
/* We have a problem with delivering the message */
|
|
if (ec == NULL) {
|
|
ec = "DIAMETER_UNABLE_TO_COMPLY";
|
|
}
|
|
|
|
if (!is_req) {
|
|
fd_hook_call(HOOK_MESSAGE_DROPPED, msgptr, NULL, "Internal error: Answer received to locally issued request, but not handled by any handler.", fd_msg_pmdl_get(msgptr));
|
|
fd_msg_free(msgptr);
|
|
break;
|
|
}
|
|
|
|
/* Create an answer with the error code and message */
|
|
CHECK_FCT( fd_msg_new_answer_from_req ( fd_g_config->cnf_dict, &msgptr, 0 ) );
|
|
CHECK_FCT( fd_msg_rescode_set(msgptr, ec, em, NULL, 1 ) );
|
|
|
|
case DISP_ACT_SEND:
|
|
/* Now, send the message */
|
|
CHECK_FCT( fd_fifo_post(fd_g_outgoing, &msgptr) );
|
|
}
|
|
} else if (em) {
|
|
fd_hook_call(HOOK_MESSAGE_DROPPED, error, NULL, em, fd_msg_pmdl_get(error));
|
|
fd_msg_free(error);
|
|
}
|
|
|
|
/* We're done with dispatching this message */
|
|
return 0;
|
|
}
|
|
|
|
/* The ROUTING-IN message processing */
|
|
static int msg_rt_in(struct msg * msg)
|
|
{
|
|
struct msg_hdr * hdr;
|
|
int is_req = 0;
|
|
int is_err = 0;
|
|
DiamId_t qry_src = NULL;
|
|
struct msg *msgptr = msg;
|
|
|
|
/* Read the message header */
|
|
CHECK_FCT( fd_msg_hdr(msg, &hdr) );
|
|
is_req = hdr->msg_flags & CMD_FLAG_REQUEST;
|
|
is_err = hdr->msg_flags & CMD_FLAG_ERROR;
|
|
|
|
/* Handle incorrect bits */
|
|
if (is_req && is_err) {
|
|
fd_hook_call(HOOK_MESSAGE_PARSING_ERROR, msgptr, NULL, "R & E bits were set", fd_msg_pmdl_get(msgptr));
|
|
CHECK_FCT( return_error( &msgptr, "DIAMETER_INVALID_HDR_BITS", "R & E bits were set", NULL) );
|
|
return 0;
|
|
}
|
|
|
|
/* If it is a request, we must analyze its content to decide what we do with it */
|
|
if (is_req) {
|
|
struct avp * avp, *un = NULL;
|
|
union avp_value * un_val = NULL, *dr_val = NULL;
|
|
enum status { UNKNOWN, YES, NO };
|
|
/* Are we Destination-Host? */
|
|
enum status is_dest_host = UNKNOWN;
|
|
/* Are we Destination-Realm? */
|
|
enum status is_dest_realm = UNKNOWN;
|
|
/* Do we support the application of the message? */
|
|
enum status is_local_app = UNKNOWN;
|
|
|
|
/* Check if we have local support for the message application */
|
|
if ( (hdr->msg_appl == 0) || (hdr->msg_appl == AI_RELAY) ) {
|
|
fd_hook_call(HOOK_MESSAGE_PARSING_ERROR, msgptr, NULL, "Received a routable message with application id 0 or " _stringize(AI_RELAY) " (relay)", fd_msg_pmdl_get(msgptr));
|
|
CHECK_FCT( return_error( &msgptr, "DIAMETER_APPLICATION_UNSUPPORTED", "Routable message with application id 0 or relay", NULL) );
|
|
return 0;
|
|
} else {
|
|
struct fd_app * app;
|
|
CHECK_FCT( fd_app_check(&fd_g_config->cnf_apps, hdr->msg_appl, &app) );
|
|
is_local_app = (app ? YES : NO);
|
|
}
|
|
|
|
/* Parse the message for Dest-Host, Dest-Realm, and Route-Record */
|
|
CHECK_FCT( fd_msg_browse(msgptr, MSG_BRW_FIRST_CHILD, &avp, NULL) );
|
|
while (avp) {
|
|
struct avp_hdr * ahdr;
|
|
struct fd_pei error_info;
|
|
int ret;
|
|
|
|
memset(&error_info, 0, sizeof(struct fd_pei));
|
|
|
|
CHECK_FCT( fd_msg_avp_hdr( avp, &ahdr ) );
|
|
|
|
if (! (ahdr->avp_flags & AVP_FLAG_VENDOR)) {
|
|
switch (ahdr->avp_code) {
|
|
case AC_DESTINATION_HOST:
|
|
/* Parse this AVP */
|
|
CHECK_FCT_DO( ret = fd_msg_parse_dict ( avp, fd_g_config->cnf_dict, &error_info ),
|
|
{
|
|
if (error_info.pei_errcode) {
|
|
fd_hook_call(HOOK_MESSAGE_PARSING_ERROR, msgptr, NULL, error_info.pei_message ?: error_info.pei_errcode, fd_msg_pmdl_get(msgptr));
|
|
CHECK_FCT( return_error( &msgptr, error_info.pei_errcode, error_info.pei_message, error_info.pei_avp) );
|
|
if (error_info.pei_avp_free) { fd_msg_free(error_info.pei_avp); }
|
|
return 0;
|
|
} else {
|
|
fd_hook_call(HOOK_MESSAGE_PARSING_ERROR, msgptr, NULL, "Unspecified error while parsing Destination-Host AVP", fd_msg_pmdl_get(msgptr));
|
|
return ret;
|
|
}
|
|
} );
|
|
ASSERT( ahdr->avp_value );
|
|
/* Compare the Destination-Host AVP of the message with our identity */
|
|
if (!fd_os_almostcasesrch(ahdr->avp_value->os.data, ahdr->avp_value->os.len, fd_g_config->cnf_diamid, fd_g_config->cnf_diamid_len, NULL)) {
|
|
is_dest_host = YES;
|
|
} else {
|
|
is_dest_host = NO;
|
|
}
|
|
break;
|
|
|
|
case AC_DESTINATION_REALM:
|
|
/* Parse this AVP */
|
|
CHECK_FCT_DO( ret = fd_msg_parse_dict ( avp, fd_g_config->cnf_dict, &error_info ),
|
|
{
|
|
if (error_info.pei_errcode) {
|
|
fd_hook_call(HOOK_MESSAGE_PARSING_ERROR, msgptr, NULL, error_info.pei_message ?: error_info.pei_errcode, fd_msg_pmdl_get(msgptr));
|
|
CHECK_FCT( return_error( &msgptr, error_info.pei_errcode, error_info.pei_message, error_info.pei_avp) );
|
|
if (error_info.pei_avp_free) { fd_msg_free(error_info.pei_avp); }
|
|
return 0;
|
|
} else {
|
|
fd_hook_call(HOOK_MESSAGE_PARSING_ERROR, msgptr, NULL, "Unspecified error while parsing Destination-Realm AVP", fd_msg_pmdl_get(msgptr));
|
|
return ret;
|
|
}
|
|
} );
|
|
ASSERT( ahdr->avp_value );
|
|
dr_val = ahdr->avp_value;
|
|
/* Compare the Destination-Realm AVP of the message with our identity */
|
|
if (!fd_os_almostcasesrch(dr_val->os.data, dr_val->os.len, fd_g_config->cnf_diamrlm, fd_g_config->cnf_diamrlm_len, NULL)) {
|
|
is_dest_realm = YES;
|
|
} else {
|
|
is_dest_realm = NO;
|
|
}
|
|
break;
|
|
|
|
/* we also use User-Name for decorated NAI */
|
|
case AC_USER_NAME:
|
|
/* Parse this AVP */
|
|
CHECK_FCT_DO( ret = fd_msg_parse_dict ( avp, fd_g_config->cnf_dict, &error_info ),
|
|
{
|
|
if (error_info.pei_errcode) {
|
|
fd_hook_call(HOOK_MESSAGE_PARSING_ERROR, msgptr, NULL, error_info.pei_message ?: error_info.pei_errcode, fd_msg_pmdl_get(msgptr));
|
|
CHECK_FCT( return_error( &msgptr, error_info.pei_errcode, error_info.pei_message, error_info.pei_avp) );
|
|
if (error_info.pei_avp_free) { fd_msg_free(error_info.pei_avp); }
|
|
return 0;
|
|
} else {
|
|
fd_hook_call(HOOK_MESSAGE_PARSING_ERROR, msgptr, NULL, "Unspecified error while parsing User-Name AVP", fd_msg_pmdl_get(msgptr));
|
|
return ret;
|
|
}
|
|
} );
|
|
ASSERT( ahdr->avp_value );
|
|
un = avp;
|
|
un_val = ahdr->avp_value;
|
|
break;
|
|
|
|
case AC_ROUTE_RECORD:
|
|
/* Parse this AVP */
|
|
CHECK_FCT_DO( ret = fd_msg_parse_dict ( avp, fd_g_config->cnf_dict, &error_info ),
|
|
{
|
|
if (error_info.pei_errcode) {
|
|
fd_hook_call(HOOK_MESSAGE_PARSING_ERROR, msgptr, NULL, error_info.pei_message ?: error_info.pei_errcode, fd_msg_pmdl_get(msgptr));
|
|
CHECK_FCT( return_error( &msgptr, error_info.pei_errcode, error_info.pei_message, error_info.pei_avp) );
|
|
if (error_info.pei_avp_free) { fd_msg_free(error_info.pei_avp); }
|
|
return 0;
|
|
} else {
|
|
fd_hook_call(HOOK_MESSAGE_PARSING_ERROR, msgptr, NULL, "Unspecified error while parsing Route-Record AVP", fd_msg_pmdl_get(msgptr));
|
|
return ret;
|
|
}
|
|
} );
|
|
ASSERT( ahdr->avp_value );
|
|
/* Is this our own name ? */
|
|
if (!fd_os_almostcasesrch(ahdr->avp_value->os.data, ahdr->avp_value->os.len, fd_g_config->cnf_diamid, fd_g_config->cnf_diamid_len, NULL)) {
|
|
/* Yes: then we must return DIAMETER_LOOP_DETECTED according to Diameter RFC */
|
|
char * error = "DIAMETER_LOOP_DETECTED";
|
|
fd_hook_call(HOOK_MESSAGE_PARSING_ERROR, msgptr, NULL, error, fd_msg_pmdl_get(msgptr));
|
|
CHECK_FCT( return_error( &msgptr, error, NULL, NULL) );
|
|
return 0;
|
|
}
|
|
break;
|
|
|
|
|
|
}
|
|
}
|
|
|
|
/* Stop when we found all 3 AVPs -- they are supposed to be at the beginning of the message, so this should be fast */
|
|
if ((is_dest_host != UNKNOWN) && (is_dest_realm != UNKNOWN) && un)
|
|
break;
|
|
|
|
/* Go to next AVP */
|
|
CHECK_FCT( fd_msg_browse(avp, MSG_BRW_NEXT, &avp, NULL) );
|
|
}
|
|
|
|
/* OK, now decide what we do with the request */
|
|
|
|
/* Handle the missing routing AVPs first */
|
|
if ( is_dest_realm == UNKNOWN ) {
|
|
fd_hook_call(HOOK_MESSAGE_PARSING_ERROR, msgptr, NULL, "Non-routable message not supported (invalid bit ? missing Destination-Realm ?)", fd_msg_pmdl_get(msgptr));
|
|
CHECK_FCT( return_error( &msgptr, "DIAMETER_COMMAND_UNSUPPORTED", "Non-routable message not supported (invalid bit ? missing Destination-Realm ?)", NULL) );
|
|
return 0;
|
|
}
|
|
|
|
/* If we are listed as Destination-Host */
|
|
if (is_dest_host == YES) {
|
|
if (is_local_app == YES) {
|
|
/* Ok, give the message to the dispatch thread */
|
|
fd_hook_call(HOOK_MESSAGE_ROUTING_LOCAL, msgptr, NULL, NULL, fd_msg_pmdl_get(msgptr));
|
|
CHECK_FCT( fd_fifo_post(fd_g_local, &msgptr) );
|
|
} else {
|
|
/* We don't support the application, reply an error */
|
|
fd_hook_call(HOOK_MESSAGE_PARSING_ERROR, msgptr, NULL, "Application unsupported", fd_msg_pmdl_get(msgptr));
|
|
CHECK_FCT( return_error( &msgptr, "DIAMETER_APPLICATION_UNSUPPORTED", NULL, NULL) );
|
|
}
|
|
return 0;
|
|
}
|
|
|
|
/* If the message is explicitely for someone else */
|
|
if ((is_dest_host == NO) || (is_dest_realm == NO)) {
|
|
if (fd_g_config->cnf_flags.no_fwd) {
|
|
fd_hook_call(HOOK_MESSAGE_ROUTING_ERROR, msgptr, NULL, "Message for another realm/host", fd_msg_pmdl_get(msgptr));
|
|
CHECK_FCT( return_error( &msgptr, "DIAMETER_UNABLE_TO_DELIVER", "I am not a Diameter agent", NULL) );
|
|
return 0;
|
|
}
|
|
} else {
|
|
/* Destination-Host was not set, and Destination-Realm is matching : we may handle or pass to a fellow peer */
|
|
int is_nai = 0;
|
|
|
|
/* test for decorated NAI (RFC5729 section 4.4) */
|
|
/* Handle the decorated NAI */
|
|
if (un_val) {
|
|
CHECK_FCT_DO( process_decorated_NAI(&is_nai, un_val, dr_val),
|
|
{
|
|
/* If the process failed, we assume it is because of the AVP format */
|
|
fd_hook_call(HOOK_MESSAGE_PARSING_ERROR, msgptr, NULL, "Failed to process decorated NAI", fd_msg_pmdl_get(msgptr));
|
|
CHECK_FCT( return_error( &msgptr, "DIAMETER_INVALID_AVP_VALUE", "Failed to process decorated NAI", un) );
|
|
return 0;
|
|
} );
|
|
}
|
|
|
|
if (is_nai) {
|
|
/* We have transformed the AVP, now submit it again in the queue */
|
|
CHECK_FCT(fd_fifo_post(fd_g_incoming, &msgptr) );
|
|
return 0;
|
|
}
|
|
|
|
if (is_local_app == YES) {
|
|
/* Handle localy since we are able to */
|
|
fd_hook_call(HOOK_MESSAGE_ROUTING_LOCAL, msgptr, NULL, NULL, fd_msg_pmdl_get(msgptr));
|
|
CHECK_FCT(fd_fifo_post(fd_g_local, &msgptr) );
|
|
return 0;
|
|
}
|
|
|
|
if (fd_g_config->cnf_flags.no_fwd) {
|
|
/* We return an error */
|
|
fd_hook_call(HOOK_MESSAGE_ROUTING_ERROR, msgptr, NULL, "Application unsupported", fd_msg_pmdl_get(msgptr));
|
|
CHECK_FCT( return_error( &msgptr, "DIAMETER_APPLICATION_UNSUPPORTED", NULL, NULL) );
|
|
return 0;
|
|
}
|
|
}
|
|
|
|
/* From that point, for requests, we will call the registered callbacks, then forward to another peer */
|
|
|
|
} else {
|
|
/* The message is an answer */
|
|
struct msg * qry;
|
|
|
|
/* Retrieve the corresponding query and its origin */
|
|
CHECK_FCT( fd_msg_answ_getq( msgptr, &qry ) );
|
|
CHECK_FCT( fd_msg_source_get( qry, &qry_src, NULL ) );
|
|
|
|
if ((!qry_src) && (!is_err)) {
|
|
/* The message is a normal answer to a request issued localy, we do not call the callbacks chain on it. */
|
|
fd_hook_call(HOOK_MESSAGE_ROUTING_LOCAL, msgptr, NULL, NULL, fd_msg_pmdl_get(msgptr));
|
|
CHECK_FCT(fd_fifo_post(fd_g_local, &msgptr) );
|
|
return 0;
|
|
}
|
|
|
|
/* From that point, for answers, we will call the registered callbacks, then pass it to the dispatch module or forward it */
|
|
}
|
|
|
|
/* Call all registered callbacks for this message */
|
|
{
|
|
struct fd_list * li;
|
|
|
|
CHECK_FCT( pthread_rwlock_rdlock( &rt_fwd_lock ) );
|
|
pthread_cleanup_push( fd_cleanup_rwlock, &rt_fwd_lock );
|
|
|
|
/* requests: dir = 1 & 2 => in order; answers = 3 & 2 => in reverse order */
|
|
for ( li = (is_req ? rt_fwd_list.next : rt_fwd_list.prev) ; msgptr && (li != &rt_fwd_list) ; li = (is_req ? li->next : li->prev) ) {
|
|
struct rt_hdl * rh = (struct rt_hdl *)li;
|
|
int ret;
|
|
|
|
if (is_req && (rh->dir > RT_FWD_ALL))
|
|
break;
|
|
if ((!is_req) && (rh->dir < RT_FWD_ALL))
|
|
break;
|
|
|
|
/* Ok, call this cb */
|
|
TRACE_DEBUG(ANNOYING, "Calling next FWD callback on %p : %p", msgptr, rh->rt_fwd_cb);
|
|
CHECK_FCT_DO( ret = (*rh->rt_fwd_cb)(rh->cbdata, &msgptr),
|
|
{
|
|
char buf[256];
|
|
snprintf(buf, sizeof(buf), "A FWD routing callback returned an error: %s", strerror(ret));
|
|
fd_hook_call(HOOK_MESSAGE_ROUTING_ERROR, msgptr, NULL, buf, fd_msg_pmdl_get(msgptr));
|
|
fd_hook_call(HOOK_MESSAGE_DROPPED, msgptr, NULL, buf, fd_msg_pmdl_get(msgptr));
|
|
fd_msg_free(msgptr);
|
|
msgptr = NULL;
|
|
break;
|
|
} );
|
|
}
|
|
|
|
pthread_cleanup_pop(0);
|
|
CHECK_FCT( pthread_rwlock_unlock( &rt_fwd_lock ) );
|
|
|
|
/* If a callback has handled the message, we stop now */
|
|
if (!msgptr)
|
|
return 0;
|
|
}
|
|
|
|
/* Now pass the message to the next step: either forward to another peer, or dispatch to local extensions */
|
|
if (is_req || qry_src) {
|
|
fd_hook_call(HOOK_MESSAGE_ROUTING_FORWARD, msgptr, NULL, NULL, fd_msg_pmdl_get(msgptr));
|
|
CHECK_FCT(fd_fifo_post(fd_g_outgoing, &msgptr) );
|
|
} else {
|
|
fd_hook_call(HOOK_MESSAGE_ROUTING_LOCAL, msgptr, NULL, NULL, fd_msg_pmdl_get(msgptr));
|
|
CHECK_FCT(fd_fifo_post(fd_g_local, &msgptr) );
|
|
}
|
|
|
|
/* We're done with this message */
|
|
return 0;
|
|
}
|
|
|
|
|
|
/* The ROUTING-OUT message processing */
|
|
static int msg_rt_out(struct msg * msg)
|
|
{
|
|
struct rt_data * rtd = NULL;
|
|
struct msg_hdr * hdr;
|
|
int is_req = 0;
|
|
int ret;
|
|
struct fd_list * li, *candidates;
|
|
struct avp * avp;
|
|
struct rtd_candidate * c;
|
|
struct msg *msgptr = msg;
|
|
DiamId_t qry_src = NULL;
|
|
size_t qry_src_len = 0;
|
|
|
|
/* Read the message header */
|
|
CHECK_FCT( fd_msg_hdr(msgptr, &hdr) );
|
|
is_req = hdr->msg_flags & CMD_FLAG_REQUEST;
|
|
|
|
/* For answers, the routing is very easy */
|
|
if ( ! is_req ) {
|
|
struct msg * qry;
|
|
struct msg_hdr * qry_hdr;
|
|
struct fd_peer * peer = NULL;
|
|
|
|
/* Retrieve the corresponding query and its origin */
|
|
CHECK_FCT( fd_msg_answ_getq( msgptr, &qry ) );
|
|
CHECK_FCT( fd_msg_source_get( qry, &qry_src, &qry_src_len ) );
|
|
|
|
ASSERT( qry_src ); /* if it is NULL, the message should have been in the LOCAL queue! */
|
|
|
|
/* Find the peer corresponding to this name */
|
|
CHECK_FCT( fd_peer_getbyid( qry_src, qry_src_len, 0, (void *) &peer ) );
|
|
if (fd_peer_getstate(peer) != STATE_OPEN && fd_peer_getstate(peer) != STATE_CLOSING_GRACE) {
|
|
char buf[128];
|
|
snprintf(buf, sizeof(buf), "Unable to forward answer to deleted / closed peer '%s'.", qry_src);
|
|
fd_hook_call(HOOK_MESSAGE_ROUTING_ERROR, msgptr, NULL, buf, fd_msg_pmdl_get(msgptr));
|
|
fd_hook_call(HOOK_MESSAGE_DROPPED, msgptr, NULL, buf, fd_msg_pmdl_get(msgptr));
|
|
fd_msg_free(msgptr);
|
|
return 0;
|
|
}
|
|
|
|
/* We must restore the hop-by-hop id */
|
|
CHECK_FCT( fd_msg_hdr(qry, &qry_hdr) );
|
|
hdr->msg_hbhid = qry_hdr->msg_hbhid;
|
|
|
|
/* Push the message into this peer */
|
|
CHECK_FCT( fd_out_send(&msgptr, NULL, peer, 1) );
|
|
|
|
/* We're done with this answer */
|
|
return 0;
|
|
}
|
|
|
|
/* From that point, the message is a request */
|
|
CHECK_FCT( fd_msg_source_get( msgptr, &qry_src, &qry_src_len ) );
|
|
/* if qry_src != NULL, this message is relayed, otherwise it is locally issued */
|
|
|
|
/* Get the routing data out of the message if any (in case of re-transmit) */
|
|
CHECK_FCT( fd_msg_rt_get ( msgptr, &rtd ) );
|
|
|
|
/* If there is no routing data already, let's create it */
|
|
if (rtd == NULL) {
|
|
CHECK_FCT( fd_rtd_init(&rtd) );
|
|
|
|
/* Add all peers currently in OPEN state */
|
|
CHECK_FCT( pthread_rwlock_rdlock(&fd_g_activ_peers_rw) );
|
|
for (li = fd_g_activ_peers.next; li != &fd_g_activ_peers; li = li->next) {
|
|
struct fd_peer * p = (struct fd_peer *)li->o;
|
|
CHECK_FCT_DO( ret = fd_rtd_candidate_add(rtd,
|
|
p->p_hdr.info.pi_diamid,
|
|
p->p_hdr.info.pi_diamidlen,
|
|
p->p_hdr.info.runtime.pir_realm,
|
|
p->p_hdr.info.runtime.pir_realmlen),
|
|
{ CHECK_FCT_DO( pthread_rwlock_unlock(&fd_g_activ_peers_rw), ); return ret; } );
|
|
}
|
|
CHECK_FCT( pthread_rwlock_unlock(&fd_g_activ_peers_rw) );
|
|
|
|
/* Now let's remove all peers from the Route-Records */
|
|
CHECK_FCT( fd_msg_browse(msgptr, MSG_BRW_FIRST_CHILD, &avp, NULL) );
|
|
while (avp) {
|
|
struct avp_hdr * ahdr;
|
|
struct fd_pei error_info;
|
|
CHECK_FCT( fd_msg_avp_hdr( avp, &ahdr ) );
|
|
|
|
if ((ahdr->avp_code == AC_ROUTE_RECORD) && (! (ahdr->avp_flags & AVP_FLAG_VENDOR)) ) {
|
|
/* Parse this AVP */
|
|
CHECK_FCT_DO( ret = fd_msg_parse_dict ( avp, fd_g_config->cnf_dict, &error_info ),
|
|
{
|
|
if (error_info.pei_errcode) {
|
|
CHECK_FCT( return_error( &msgptr, error_info.pei_errcode, error_info.pei_message, error_info.pei_avp) );
|
|
if (error_info.pei_avp_free) { fd_msg_free(error_info.pei_avp); }
|
|
return 0;
|
|
} else {
|
|
return ret;
|
|
}
|
|
} );
|
|
ASSERT( ahdr->avp_value );
|
|
/* Remove this value from the list. We don't need to pay special attention to the contents here. */
|
|
fd_rtd_candidate_del(rtd, ahdr->avp_value->os.data, ahdr->avp_value->os.len);
|
|
}
|
|
|
|
/* Go to next AVP */
|
|
CHECK_FCT( fd_msg_browse(avp, MSG_BRW_NEXT, &avp, NULL) );
|
|
}
|
|
|
|
/* Save the routing information in the message */
|
|
CHECK_FCT( fd_msg_rt_associate ( msgptr, rtd ) );
|
|
}
|
|
|
|
/* Note: we reset the scores and pass the message to the callbacks, maybe we could re-use the saved scores when we have received an error ? -- TODO */
|
|
|
|
/* Ok, we have our list in rtd now, let's (re)initialize the scores */
|
|
fd_rtd_candidate_extract(rtd, &candidates, FD_SCORE_INI);
|
|
|
|
/* Pass the list to registered callbacks (even if it is empty list) */
|
|
{
|
|
CHECK_FCT( pthread_rwlock_rdlock( &rt_out_lock ) );
|
|
pthread_cleanup_push( fd_cleanup_rwlock, &rt_out_lock );
|
|
|
|
/* We call the cb by reverse priority order */
|
|
for ( li = rt_out_list.prev ; (msgptr != NULL) && (li != &rt_out_list) ; li = li->prev ) {
|
|
struct rt_hdl * rh = (struct rt_hdl *)li;
|
|
|
|
TRACE_DEBUG(ANNOYING, "Calling next OUT callback on %p : %p (prio %d)", msgptr, rh->rt_out_cb, rh->prio);
|
|
CHECK_FCT_DO( ret = (*rh->rt_out_cb)(rh->cbdata, &msgptr, candidates),
|
|
{
|
|
char buf[256];
|
|
snprintf(buf, sizeof(buf), "An OUT routing callback returned an error: %s", strerror(ret));
|
|
fd_hook_call(HOOK_MESSAGE_ROUTING_ERROR, msgptr, NULL, buf, fd_msg_pmdl_get(msgptr));
|
|
fd_hook_call(HOOK_MESSAGE_DROPPED, msgptr, NULL, buf, fd_msg_pmdl_get(msgptr));
|
|
fd_msg_free(msgptr);
|
|
msgptr = NULL;
|
|
} );
|
|
}
|
|
|
|
pthread_cleanup_pop(0);
|
|
CHECK_FCT( pthread_rwlock_unlock( &rt_out_lock ) );
|
|
|
|
/* If an error occurred or the callback disposed of the message, go to next message */
|
|
if (! msgptr) {
|
|
return 0;
|
|
}
|
|
}
|
|
|
|
/* Order the candidate peers by score attributed by the callbacks */
|
|
CHECK_FCT( fd_rtd_candidate_reorder(candidates) );
|
|
|
|
/* Now try sending the message */
|
|
for (li = candidates->prev; li != candidates; li = li->prev) {
|
|
struct fd_peer * peer;
|
|
|
|
c = (struct rtd_candidate *) li;
|
|
|
|
/* Stop when we have reached the end of valid candidates */
|
|
if (c->score < 0)
|
|
break;
|
|
|
|
/* Search for the peer */
|
|
CHECK_FCT( fd_peer_getbyid( c->diamid, c->diamidlen, 0, (void *)&peer ) );
|
|
|
|
if (fd_peer_getstate(peer) == STATE_OPEN) {
|
|
/* Send to this one */
|
|
CHECK_FCT_DO( fd_out_send(&msgptr, NULL, peer, 1), continue );
|
|
|
|
/* If the sending was successful */
|
|
break;
|
|
}
|
|
}
|
|
|
|
/* If the message has not been sent, return an error */
|
|
if (msgptr) {
|
|
fd_hook_call(HOOK_MESSAGE_ROUTING_ERROR, msgptr, NULL, "No remaining suitable candidate to route the message to", fd_msg_pmdl_get(msgptr));
|
|
return_error( &msgptr, "DIAMETER_UNABLE_TO_DELIVER", "No suitable candidate to route the message to", NULL);
|
|
}
|
|
|
|
/* We're done with this message */
|
|
|
|
return 0;
|
|
}
|
|
|
|
|
|
/********************************************************************************/
|
|
/* Management of the threads */
|
|
/********************************************************************************/
|
|
|
|
/* Note: in the first version, we only create one thread of each kind.
|
|
We could improve the scalability by using the threshold feature of the queues
|
|
to create additional threads if a queue is filling up, or at least giving a configurable
|
|
number of threads of each kind.
|
|
*/
|
|
|
|
/* Control of the threads */
|
|
static enum { RUN = 0, STOP = 1 } order_val = RUN;
|
|
static pthread_mutex_t order_state_lock = PTHREAD_MUTEX_INITIALIZER;
|
|
|
|
/* Threads report their status */
|
|
enum thread_state { NOTRUNNING = 0, RUNNING = 1 };
|
|
static void cleanup_state(void * state_loc)
|
|
{
|
|
CHECK_POSIX_DO( pthread_mutex_lock(&order_state_lock), );
|
|
*(enum thread_state *)state_loc = NOTRUNNING;
|
|
CHECK_POSIX_DO( pthread_mutex_unlock(&order_state_lock), );
|
|
}
|
|
|
|
/* This is the common thread code (same for routing and dispatching) */
|
|
static void * process_thr(void * arg, int (*action_cb)(struct msg * msg), struct fifo * queue, char * action_name)
|
|
{
|
|
TRACE_ENTRY("%p %p %p %p", arg, action_cb, queue, action_name);
|
|
|
|
/* Set the thread name */
|
|
{
|
|
char buf[48];
|
|
snprintf(buf, sizeof(buf), "%s (%p)", action_name, arg);
|
|
fd_log_threadname ( buf );
|
|
}
|
|
|
|
/* The thread reports its status when canceled */
|
|
CHECK_PARAMS_DO(arg, return NULL);
|
|
pthread_cleanup_push( cleanup_state, arg );
|
|
|
|
/* Mark the thread running */
|
|
CHECK_POSIX_DO( pthread_mutex_lock(&order_state_lock), );
|
|
*(enum thread_state *)arg = RUNNING;
|
|
CHECK_POSIX_DO( pthread_mutex_unlock(&order_state_lock), );
|
|
|
|
do {
|
|
struct msg * msg;
|
|
|
|
/* Test the current order */
|
|
{
|
|
int must_stop;
|
|
CHECK_POSIX_DO( pthread_mutex_lock(&order_state_lock), { ASSERT(0); } ); /* we lock to flush the caches */
|
|
must_stop = (order_val == STOP);
|
|
CHECK_POSIX_DO( pthread_mutex_unlock(&order_state_lock), { ASSERT(0); } );
|
|
if (must_stop)
|
|
goto end;
|
|
|
|
pthread_testcancel();
|
|
}
|
|
|
|
/* Ok, we are allowed to run */
|
|
|
|
/* Get the next message from the queue */
|
|
{
|
|
int ret;
|
|
struct timespec ts;
|
|
|
|
CHECK_SYS_DO( clock_gettime(CLOCK_REALTIME, &ts), goto fatal_error );
|
|
ts.tv_sec += 1;
|
|
|
|
ret = fd_fifo_timedget ( queue, &msg, &ts );
|
|
if (ret == ETIMEDOUT)
|
|
/* loop, check if the thread must stop now */
|
|
continue;
|
|
if (ret == EPIPE)
|
|
/* The queue was destroyed, we are probably exiting */
|
|
goto end;
|
|
|
|
/* check if another error occurred */
|
|
CHECK_FCT_DO( ret, goto fatal_error );
|
|
}
|
|
|
|
LOG_A("%s: Picked next message", action_name);
|
|
|
|
/* Now process the message */
|
|
CHECK_FCT_DO( (*action_cb)(msg), goto fatal_error);
|
|
|
|
/* We're done with this message */
|
|
|
|
} while (1);
|
|
|
|
fatal_error:
|
|
TRACE_DEBUG(INFO, "An unrecoverable error occurred, %s thread is terminating...", action_name);
|
|
CHECK_FCT_DO(fd_core_shutdown(), );
|
|
|
|
end:
|
|
; /* noop so that we get rid of "label at end of compund statement" warning */
|
|
/* Mark the thread as terminated */
|
|
pthread_cleanup_pop(1);
|
|
return NULL;
|
|
}
|
|
|
|
/* The dispatch thread */
|
|
static void * dispatch_thr(void * arg)
|
|
{
|
|
return process_thr(arg, msg_dispatch, fd_g_local, "Dispatch");
|
|
}
|
|
|
|
/* The (routing-in) thread -- see description in freeDiameter.h */
|
|
static void * routing_in_thr(void * arg)
|
|
{
|
|
return process_thr(arg, msg_rt_in, fd_g_incoming, "Routing-IN");
|
|
}
|
|
|
|
/* The (routing-out) thread -- see description in freeDiameter.h */
|
|
static void * routing_out_thr(void * arg)
|
|
{
|
|
return process_thr(arg, msg_rt_out, fd_g_outgoing, "Routing-OUT");
|
|
}
|
|
|
|
|
|
/********************************************************************************/
|
|
/* The functions for the other files */
|
|
/********************************************************************************/
|
|
|
|
static pthread_t * dispatch = NULL;
|
|
static enum thread_state * disp_state = NULL;
|
|
|
|
/* Later: make this more dynamic */
|
|
static pthread_t rt_out = (pthread_t)NULL;
|
|
static enum thread_state out_state = NOTRUNNING;
|
|
|
|
static pthread_t rt_in = (pthread_t)NULL;
|
|
static enum thread_state in_state = NOTRUNNING;
|
|
|
|
/* Initialize the routing and dispatch threads */
|
|
int fd_rtdisp_init(void)
|
|
{
|
|
int i;
|
|
|
|
/* Prepare the array for dispatch */
|
|
CHECK_MALLOC( disp_state = calloc(fd_g_config->cnf_dispthr, sizeof(enum thread_state)) );
|
|
CHECK_MALLOC( dispatch = calloc(fd_g_config->cnf_dispthr, sizeof(pthread_t)) );
|
|
|
|
/* Create the threads */
|
|
for (i=0; i < fd_g_config->cnf_dispthr; i++) {
|
|
CHECK_POSIX( pthread_create( &dispatch[i], NULL, dispatch_thr, &disp_state[i] ) );
|
|
}
|
|
CHECK_POSIX( pthread_create( &rt_out, NULL, routing_out_thr, &out_state) );
|
|
CHECK_POSIX( pthread_create( &rt_in, NULL, routing_in_thr, &in_state) );
|
|
|
|
/* Later: TODO("Set the thresholds for the queues to create more threads as needed"); */
|
|
|
|
/* Register the built-in callbacks */
|
|
CHECK_FCT( fd_rt_out_register( dont_send_if_no_common_app, NULL, 10, NULL ) );
|
|
CHECK_FCT( fd_rt_out_register( score_destination_avp, NULL, 10, NULL ) );
|
|
|
|
return 0;
|
|
}
|
|
|
|
/* Ask the thread to terminate after next iteration */
|
|
int fd_rtdisp_cleanstop(void)
|
|
{
|
|
CHECK_POSIX_DO( pthread_mutex_lock(&order_state_lock), );
|
|
order_val = STOP;
|
|
CHECK_POSIX_DO( pthread_mutex_unlock(&order_state_lock), );
|
|
|
|
return 0;
|
|
}
|
|
|
|
static void stop_thread_delayed(enum thread_state *st, pthread_t * thr, char * th_name)
|
|
{
|
|
TRACE_ENTRY("%p %p", st, thr);
|
|
CHECK_PARAMS_DO(st && thr, return);
|
|
int terminated;
|
|
|
|
CHECK_POSIX_DO( pthread_mutex_lock(&order_state_lock), );
|
|
terminated = (*st == NOTRUNNING);
|
|
CHECK_POSIX_DO( pthread_mutex_unlock(&order_state_lock), );
|
|
|
|
|
|
/* Wait for a second for the thread to complete, by monitoring my_state */
|
|
if (!terminated) {
|
|
TRACE_DEBUG(INFO, "Waiting for the %s thread to have a chance to terminate", th_name);
|
|
do {
|
|
struct timespec ts, ts_final;
|
|
|
|
CHECK_SYS_DO( clock_gettime(CLOCK_REALTIME, &ts), break );
|
|
|
|
ts_final.tv_sec = ts.tv_sec + 1;
|
|
ts_final.tv_nsec = ts.tv_nsec;
|
|
|
|
while (TS_IS_INFERIOR( &ts, &ts_final )) {
|
|
|
|
CHECK_POSIX_DO( pthread_mutex_lock(&order_state_lock), );
|
|
terminated = (*st == NOTRUNNING);
|
|
CHECK_POSIX_DO( pthread_mutex_unlock(&order_state_lock), );
|
|
if (terminated)
|
|
break;
|
|
|
|
usleep(100000);
|
|
CHECK_SYS_DO( clock_gettime(CLOCK_REALTIME, &ts), break );
|
|
}
|
|
} while (0);
|
|
}
|
|
|
|
/* Now stop the thread and reclaim its resources */
|
|
CHECK_FCT_DO( fd_thr_term(thr ), /* continue */);
|
|
|
|
}
|
|
|
|
/* Stop the thread after up to one second of wait */
|
|
int fd_rtdisp_fini(void)
|
|
{
|
|
int i;
|
|
|
|
/* Destroy the incoming queue */
|
|
CHECK_FCT_DO( fd_queues_fini(&fd_g_incoming), /* ignore */);
|
|
|
|
/* Stop the routing IN thread */
|
|
stop_thread_delayed(&in_state, &rt_in, "IN routing");
|
|
|
|
/* Destroy the outgoing queue */
|
|
CHECK_FCT_DO( fd_queues_fini(&fd_g_outgoing), /* ignore */);
|
|
|
|
/* Stop the routing OUT thread */
|
|
stop_thread_delayed(&out_state, &rt_out, "OUT routing");
|
|
|
|
/* Destroy the local queue */
|
|
CHECK_FCT_DO( fd_queues_fini(&fd_g_local), /* ignore */);
|
|
|
|
/* Stop the Dispatch threads */
|
|
if (dispatch != NULL) {
|
|
for (i=0; i < fd_g_config->cnf_dispthr; i++) {
|
|
stop_thread_delayed(&disp_state[i], &dispatch[i], "Dispatching");
|
|
}
|
|
free(dispatch);
|
|
dispatch = NULL;
|
|
}
|
|
if (disp_state != NULL) {
|
|
free(disp_state);
|
|
disp_state = NULL;
|
|
}
|
|
|
|
return 0;
|
|
}
|
|
|
|
/* Cleanup handlers */
|
|
int fd_rtdisp_cleanup(void)
|
|
{
|
|
/* Cleanup all remaining handlers */
|
|
while (!FD_IS_LIST_EMPTY(&rt_fwd_list)) {
|
|
CHECK_FCT_DO( fd_rt_fwd_unregister ( (void *)rt_fwd_list.next, NULL ), /* continue */ );
|
|
}
|
|
while (!FD_IS_LIST_EMPTY(&rt_out_list)) {
|
|
CHECK_FCT_DO( fd_rt_out_unregister ( (void *)rt_out_list.next, NULL ), /* continue */ );
|
|
}
|
|
|
|
fd_disp_unregister_all(); /* destroy remaining handlers */
|
|
|
|
return 0;
|
|
}
|
|
|
|
|
|
/********************************************************************************/
|
|
/* For extensions to register a new appl */
|
|
/********************************************************************************/
|
|
|
|
/* Add an application into the peer's supported apps */
|
|
int fd_disp_app_support ( struct dict_object * app, struct dict_object * vendor, int auth, int acct )
|
|
{
|
|
application_id_t aid = 0;
|
|
vendor_id_t vid = 0;
|
|
|
|
TRACE_ENTRY("%p %p %d %d", app, vendor, auth, acct);
|
|
CHECK_PARAMS( app && (auth || acct) );
|
|
|
|
{
|
|
enum dict_object_type type = 0;
|
|
struct dict_application_data data;
|
|
CHECK_FCT( fd_dict_gettype(app, &type) );
|
|
CHECK_PARAMS( type == DICT_APPLICATION );
|
|
CHECK_FCT( fd_dict_getval(app, &data) );
|
|
aid = data.application_id;
|
|
}
|
|
|
|
if (vendor) {
|
|
enum dict_object_type type = 0;
|
|
struct dict_vendor_data data;
|
|
CHECK_FCT( fd_dict_gettype(vendor, &type) );
|
|
CHECK_PARAMS( type == DICT_VENDOR );
|
|
CHECK_FCT( fd_dict_getval(vendor, &data) );
|
|
vid = data.vendor_id;
|
|
}
|
|
|
|
return fd_app_merge(&fd_g_config->cnf_apps, aid, vid, auth, acct);
|
|
}
|
|
|
|
|
|
|