Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes #1700: refactor the AMQP link lifecycle #1701

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 29 additions & 15 deletions include/qpid/dispatch/protocol_adaptor.h
Original file line number Diff line number Diff line change
Expand Up @@ -386,15 +386,18 @@ qdr_connection_t *qdr_connection_opened(qdr_core_t *core,
void *bind_token);

/**
* qdr_connection_closed
* qdr_connection_notify_closed
*
* This function must be called when a connection is closed, either cleanly by protocol
* or uncleanly by lost connectivity. Once this function is called, the caller must never
* again refer to or use the connection pointer.
* This function is invoked by the adaptor to notify the core that the given connection has been closed. This must be
* called when a connection is closed, either cleanly by protocol or uncleanly by lost connectivity.
*
* This must be the last core API call made by the adaptor for this connection. The core thread will free the
* qdr_connection_t as a result of this call therefore the adaptor MUST NOT reference the qdr_connection_t on return
* from this call.
*
* @param conn The pointer returned by qdr_connection_opened
*/
void qdr_connection_closed(qdr_connection_t *conn);
void qdr_connection_notify_closed(qdr_connection_t *conn);

/**
* qdr_connection_set_tracing
Expand Down Expand Up @@ -653,12 +656,6 @@ void qdr_terminus_set_dnp_address_iterator(qdr_terminus_t *term, qd_iterator_t *
******************************************************************************
*/

typedef enum {
QD_DETACHED, // Protocol detach
QD_CLOSED, // Protocol close
QD_LOST // Connection or session closed
} qd_detach_type_t;

/**
* qdr_link_set_context
*
Expand Down Expand Up @@ -810,15 +807,32 @@ qdr_link_t *qdr_link_first_attach(qdr_connection_t *conn,
void qdr_link_second_attach(qdr_link_t *link, qdr_terminus_t *source, qdr_terminus_t *target);

/**
* qdr_link_detach
* qdr_link_detach_received
*
* This function is invoked when a link detach arrives.
* This function is invoked when a link detach performative arrives from the remote peer. This may the first detach
* (peer-initiated link detach) or in response to a detach sent by the router (second detach).
*
* @param link The link pointer returned by qdr_link_first_attach or in a FIRST_ATTACH event.
* @param dt The type of detach that occurred.
* @param error The link error from the detach frame or 0 if none.
*/
void qdr_link_detach(qdr_link_t *link, qd_detach_type_t dt, qdr_error_t *error);
void qdr_link_detach_received(qdr_link_t *link, qdr_error_t *error);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like renaming the function qdr_link_detach() to qdr_link_detach_received(). The new function name makes it clear that a detach (first or second) has been received from the remote peer.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks!



/**
* qdr_link_notify_closed
*
* This function is invoked by the adaptor to notify the core that the given link has been closed. This must be called
* when the link is closed, either cleanly by protocol or uncleanly by lost connectivity (e.g. parent connection
* drop). This will also be called during adaptor shutdown on any outstanding links.
*
* This must be the last core API call made by the adaptor for this link. The core thread will free the qdr_link_t as a
* result of this call therefore the adaptor MUST NOT reference the qdr_link_t on return from this call.
*
* @param link The link pointer returned by qdr_link_first_attach or in a FIRST_ATTACH event.
* @param forced True if the link was closed due to failure or shutdown. False if closed by clean detach handshake.
*/
void qdr_link_notify_closed(qdr_link_t *link, bool forced);


/**
* qdr_link_deliver
Expand Down
102 changes: 51 additions & 51 deletions src/adaptors/amqp/amqp_adaptor.c
Original file line number Diff line number Diff line change
Expand Up @@ -139,24 +139,6 @@ static qdr_delivery_t *qdr_node_delivery_qdr_from_pn(pn_delivery_t *dlv)
return ref ? (qdr_delivery_t*) ref->ref : 0;
}

// clean up all qdr_delivery/pn_delivery bindings for the link
//
void qd_link_abandoned_deliveries_handler(qd_router_t *router, qd_link_t *link)
{
qd_link_ref_list_t *list = qd_link_get_ref_list(link);
qd_link_ref_t *ref = DEQ_HEAD(*list);

while (ref) {
qdr_delivery_t *dlv = (qdr_delivery_t*) ref->ref;
pn_delivery_t *pdlv = qdr_delivery_get_context(dlv);
assert(pdlv && ref == (qd_link_ref_t*) pn_delivery_get_context(pdlv));

// this will remove and release the ref
qdr_node_disconnect_deliveries(router->router_core, link, dlv, pdlv);
ref = DEQ_HEAD(*list);
}
}


// read the delivery-state set by the remote endpoint
//
Expand Down Expand Up @@ -1223,10 +1205,9 @@ static int AMQP_link_flow_handler(qd_router_t *router, qd_link_t *link)
/**
* Link Detached Handler
*/
static int AMQP_link_detach_handler(qd_router_t *router, qd_link_t *link, qd_detach_type_t dt)
static int AMQP_link_detach_handler(qd_router_t *router, qd_link_t *link)
{
if (!link)
return 0;
assert(link);

pn_link_t *pn_link = qd_link_pn(link);
if (!pn_link)
Expand Down Expand Up @@ -1257,29 +1238,59 @@ static int AMQP_link_detach_handler(qd_router_t *router, qd_link_t *link, qd_det
}
}

qdr_link_t *rlink = (qdr_link_t*) qd_link_get_context(link);
pn_condition_t *cond = qd_link_pn(link) ? pn_link_remote_condition(qd_link_pn(link)) : 0;
// Notify the core that a detach has been received.

qdr_link_t *rlink = (qdr_link_t *) qd_link_get_context(link);
if (rlink) {
//
// If this is the second (response) detach or the link hasn't really detached but is being dropped due to parent
// connection/session loss then this is the last proton event that will be generated for this link. The qd_link
// will be freed on return from this call so remove the cross linkage between it and the qdr_link peer.

if (dt == QD_LOST || qdr_link_get_context(rlink) == 0) {
// note qdr_link context will be zeroed when the core sends the first detach, so if it is zero then this is
// the second detach!
qd_link_set_context(link, 0);
qdr_link_set_context(rlink, 0);
}

qdr_error_t *error = qdr_error_from_pn(cond);
qdr_link_detach(rlink, dt, error);
pn_condition_t *cond = pn_link_remote_condition(pn_link);
qdr_error_t *error = qdr_error_from_pn(cond);
qdr_link_detach_received(rlink, error);
} else if ((pn_link_state(pn_link) & PN_LOCAL_CLOSED) == 0) {
// Normally the core would be responsible for sending the response detach to close the link (via
// CORE_link_detach) but since there is no core link that will not happen.
pn_link_close(pn_link);
}

return 0;
}


/**
* Link closed handler
*
* This is the last callback for the given link - the link will be freed on return from this call! Forced is true if the
* link has not properly closed (detach handshake completed).
*/
static void AMQP_link_closed_handler(qd_router_t *router, qd_link_t *qd_link, bool forced)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was wondering if the AMQP_ prefix should be used only for functions that are called directly as a response to proton AMQP events. This function can be called qd_link_close() ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could be... but I'm not sure if other devs would be ok with that new naming concept. @ssorj @ted-ross and others - opinions?

{
assert(qd_link);

// Clean up all qdr_delivery/pn_delivery bindings for the link.

qd_link_ref_list_t *list = qd_link_get_ref_list(qd_link);
qd_link_ref_t *ref = DEQ_HEAD(*list);

while (ref) {
qdr_delivery_t *dlv = (qdr_delivery_t*) ref->ref;
pn_delivery_t *pdlv = qdr_delivery_get_context(dlv);
assert(pdlv && ref == (qd_link_ref_t*) pn_delivery_get_context(pdlv));

// This will decrement the qdr_delivery_t reference count - do not access the dlv pointer after this call!
qdr_node_disconnect_deliveries(router->router_core, qd_link, dlv, pdlv);
ref = DEQ_HEAD(*list);
}

qdr_link_t *qdr_link = (qdr_link_t *) qd_link_get_context(qd_link);
if (qdr_link) {
// Notify core that this link no longer exists
qdr_link_set_context(qdr_link, 0);
qd_link_set_context(qd_link, 0);
qdr_link_notify_closed(qdr_link, forced);
// This will cause the core to free qdr_link at some point so:
qdr_link = 0;
}
}

static void bind_connection_context(qdr_connection_t *qdrc, void* token)
{
qd_connection_t *conn = (qd_connection_t*) token;
Expand Down Expand Up @@ -1761,7 +1772,7 @@ static int AMQP_closed_handler(qd_router_t *router, qd_connection_t *conn, void
if (!!conn->listener && qdrc->role != QDR_ROLE_INTER_ROUTER_DATA) {
qd_listener_remove_link(conn->listener);
}
qdr_connection_closed(qdrc);
qdr_connection_notify_closed(qdrc);
qd_connection_set_context(conn, 0);
}

Expand All @@ -1776,8 +1787,8 @@ static const qd_node_type_t router_node = {"router", 0,
AMQP_outgoing_link_handler,
AMQP_conn_wake_handler,
AMQP_link_detach_handler,
AMQP_link_closed_handler,
AMQP_link_attach_handler,
qd_link_abandoned_deliveries_handler,
AMQP_link_flow_handler,
0, // node_created_handler
0, // node_destroyed_handler
Expand Down Expand Up @@ -1920,7 +1931,7 @@ static void CORE_link_detach(void *context, qdr_link_t *link, qdr_error_t *error
return;

pn_link_t *pn_link = qd_link_pn(qlink);
if (!pn_link)
if (!pn_link || !!(pn_link_state(pn_link) & PN_LOCAL_CLOSED)) // already detached
return;

if (error) {
Expand All @@ -1945,17 +1956,6 @@ static void CORE_link_detach(void *context, qdr_link_t *link, qdr_error_t *error
}
}

//
// This is the last event for this link that the core is going to send into Proton so remove the core => adaptor
// linkage. If this is the response attach then there will be no further proton link events to send to the core so
// remove the adaptor => core linkage. If this is the first (request) detach preserve the adaptor => core linkage so
// we can notify the core when the second (response) detach arrives
//
qdr_link_set_context(link, 0);
if (!first) {
qd_link_set_context(qlink, 0);
}

qd_link_close(qlink);
}

Expand Down
60 changes: 34 additions & 26 deletions src/adaptors/amqp/container.c
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ struct qd_link_t {
ALLOC_DEFINE_SAFE(qd_link_t);
ALLOC_DEFINE(qd_link_ref_t);

static void qd_link_free(qd_link_t *);


/** Encapsulates a proton session */
struct qd_session_t {
DEQ_LINKS(qd_session_t);
Expand Down Expand Up @@ -177,7 +180,7 @@ static qd_link_t *setup_outgoing_link(qd_container_t *container, pn_link_t *pn_l
qd_session_incref(link->qd_session);

pn_link_set_context(pn_link, link);
container->ntype->outgoing_handler(container->qd_router, link);
container->ntype->outgoing_link_handler(container->qd_router, link);
return link;
}

Expand Down Expand Up @@ -209,7 +212,7 @@ static qd_link_t *setup_incoming_link(qd_container_t *container, pn_link_t *pn_l
pn_link_set_max_message_size(pn_link, max_size);
}
pn_link_set_context(pn_link, link);
container->ntype->incoming_handler(container->qd_router, link);
container->ntype->incoming_link_handler(container->qd_router, link);
return link;
}

Expand Down Expand Up @@ -277,7 +280,8 @@ static void notify_closed(qd_container_t *container, qd_connection_t *conn, void


// The given connection has dropped. There will be no further link events for this connection so manually clean up all
// links
// links. Note that we do not free the pn_link_t - proton will free all links when the parent connection is freed.
//
static void close_links(qd_container_t *container, pn_connection_t *conn, bool print_log)
{
pn_link_t *pn_link = pn_link_head(conn, 0);
Expand All @@ -289,7 +293,7 @@ static void close_links(qd_container_t *container, pn_connection_t *conn, bool p
if (print_log)
qd_log(LOG_CONTAINER, QD_LOG_DEBUG, "Aborting link '%s' due to parent connection end",
pn_link_name(pn_link));
container->ntype->link_detach_handler(container->qd_router, qd_link, QD_LOST);
container->ntype->link_closed_handler(container->qd_router, qd_link, true); // true == forced
qd_link_free(qd_link);
}

Expand Down Expand Up @@ -318,6 +322,7 @@ static void cleanup_link(qd_link_t *link)
// cleanup any inbound message that has not been forwarded
qd_message_t *msg = qd_alloc_deref_safe_ptr(&link->incoming_msg);
if (msg) {
qd_nullify_safe_ptr(&link->incoming_msg);
qd_message_free(msg);
}
}
Expand All @@ -326,8 +331,7 @@ static void cleanup_link(qd_link_t *link)
static int close_handler(qd_container_t *container, pn_connection_t *conn, qd_connection_t* qd_conn)
{
//
// Close all links, passing QD_LOST as the reason. These links are not
// being properly 'detached'. They are being orphaned.
// Close all links. These links are not being properly 'detached'. They are being orphaned.
//
if (qd_conn)
qd_conn->closed = true;
Expand Down Expand Up @@ -508,9 +512,9 @@ void qd_container_handle_event(qd_container_t *container, pn_event_t *event,
}
if (!(pn_connection_state(conn) & PN_LOCAL_CLOSED)) {
if (pn_session_state(ssn) == (PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED)) {
// Remote has nuked our session. Check for any links that were
// left open and forcibly detach them, since no detaches will
// arrive on this session.
// Remote has closed the session. Check for any child links and forcibly close them since there will be
// no detach performatives arriving for these links. Note that we do not free the pn_link_t since proton
// will free all child pn_link_t when it frees the session.
pn_link = pn_link_head(conn, 0);
while (pn_link) {
pn_link_t *next_link = pn_link_next(pn_link, 0);
Expand All @@ -529,7 +533,7 @@ void qd_container_handle_event(qd_container_t *container, pn_event_t *event,
}
qd_log(LOG_CONTAINER, QD_LOG_DEBUG,
"Aborting link '%s' due to parent session end", pn_link_name(pn_link));
container->ntype->link_detach_handler(container->qd_router, qd_link, QD_LOST);
container->ntype->link_closed_handler(container->qd_router, qd_link, true);
qd_link_free(qd_link);
}
}
Expand Down Expand Up @@ -590,10 +594,6 @@ void qd_container_handle_event(qd_container_t *container, pn_event_t *event,
pn_link = pn_event_link(event);
qd_link = (qd_link_t*) pn_link_get_context(pn_link);
if (qd_link) {
qd_detach_type_t dt = pn_event_type(event) == PN_LINK_REMOTE_CLOSE ? QD_CLOSED : QD_DETACHED;
if (qd_link->pn_link == pn_link) {
pn_link_close(pn_link);
}
if (qd_link->policy_counted) {
qd_link->policy_counted = false;
if (pn_link_is_sender(pn_link)) {
Expand All @@ -609,25 +609,35 @@ void qd_container_handle_event(qd_container_t *container, pn_event_t *event,
}
}

container->ntype->link_detach_handler(container->qd_router, qd_link, dt);
// notify arrival of inbound detach
container->ntype->link_detach_handler(container->qd_router, qd_link);

if (pn_link_state(pn_link) & PN_LOCAL_CLOSED) {
// link fully closed
add_link_to_free_list(&qd_conn->free_link_list, pn_link);
if (pn_link_state(pn_link) == (PN_LOCAL_CLOSED | PN_REMOTE_CLOSED)) {
// Link now fully detached
container->ntype->link_closed_handler(container->qd_router, qd_link, false);
qd_link_free(qd_link);
add_link_to_free_list(&qd_conn->free_link_list, pn_link);
}
} else { // no qd_link, manually detach or free
if ((pn_link_state(pn_link) & PN_LOCAL_CLOSED) == 0) {
pn_link_close(pn_link);
} else {
add_link_to_free_list(&qd_conn->free_link_list, pn_link);
}

} else {
add_link_to_free_list(&qd_conn->free_link_list, pn_link);
}
}
break;

case PN_LINK_LOCAL_CLOSE:
pn_link = pn_event_link(event);
if (pn_link_state(pn_link) == (PN_LOCAL_CLOSED | PN_REMOTE_CLOSED)) {
add_link_to_free_list(&qd_conn->free_link_list, pn_link);
qd_link_free((qd_link_t *) pn_link_get_context(pn_link));
qd_link_t *qd_link = (qd_link_t*) pn_link_get_context(pn_link);
if (qd_link) {
// Link now fully detached
container->ntype->link_closed_handler(container->qd_router, qd_link, false);
qd_link_free(qd_link);
}
add_link_to_free_list(&qd_conn->free_link_list, pn_link); // why???
}
break;

Expand Down Expand Up @@ -775,16 +785,14 @@ qd_link_t *qd_link(qd_connection_t *conn, qd_direction_t dir, const char* name,
}


void qd_link_free(qd_link_t *link)
static void qd_link_free(qd_link_t *link)
{
if (!link) return;

sys_mutex_lock(&amqp_adaptor.container->lock);
DEQ_REMOVE(amqp_adaptor.container->links, link);
sys_mutex_unlock(&amqp_adaptor.container->lock);

amqp_adaptor.container->ntype->link_abandoned_deliveries_handler(amqp_adaptor.container->qd_router, link);

cleanup_link(link);
free_qd_link_t(link);
}
Expand Down
Loading
Loading