-
Notifications
You must be signed in to change notification settings - Fork 19
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fixes #1700: refactor the AMQP link lifecycle #1701
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -139,24 +139,6 @@ static qdr_delivery_t *qdr_node_delivery_qdr_from_pn(pn_delivery_t *dlv) | |
return ref ? (qdr_delivery_t*) ref->ref : 0; | ||
} | ||
|
||
// clean up all qdr_delivery/pn_delivery bindings for the link | ||
// | ||
void qd_link_abandoned_deliveries_handler(qd_router_t *router, qd_link_t *link) | ||
{ | ||
qd_link_ref_list_t *list = qd_link_get_ref_list(link); | ||
qd_link_ref_t *ref = DEQ_HEAD(*list); | ||
|
||
while (ref) { | ||
qdr_delivery_t *dlv = (qdr_delivery_t*) ref->ref; | ||
pn_delivery_t *pdlv = qdr_delivery_get_context(dlv); | ||
assert(pdlv && ref == (qd_link_ref_t*) pn_delivery_get_context(pdlv)); | ||
|
||
// this will remove and release the ref | ||
qdr_node_disconnect_deliveries(router->router_core, link, dlv, pdlv); | ||
ref = DEQ_HEAD(*list); | ||
} | ||
} | ||
|
||
|
||
// read the delivery-state set by the remote endpoint | ||
// | ||
|
@@ -1223,10 +1205,9 @@ static int AMQP_link_flow_handler(qd_router_t *router, qd_link_t *link) | |
/** | ||
* Link Detached Handler | ||
*/ | ||
static int AMQP_link_detach_handler(qd_router_t *router, qd_link_t *link, qd_detach_type_t dt) | ||
static int AMQP_link_detach_handler(qd_router_t *router, qd_link_t *link) | ||
{ | ||
if (!link) | ||
return 0; | ||
assert(link); | ||
|
||
pn_link_t *pn_link = qd_link_pn(link); | ||
if (!pn_link) | ||
|
@@ -1257,29 +1238,59 @@ static int AMQP_link_detach_handler(qd_router_t *router, qd_link_t *link, qd_det | |
} | ||
} | ||
|
||
qdr_link_t *rlink = (qdr_link_t*) qd_link_get_context(link); | ||
pn_condition_t *cond = qd_link_pn(link) ? pn_link_remote_condition(qd_link_pn(link)) : 0; | ||
// Notify the core that a detach has been received. | ||
|
||
qdr_link_t *rlink = (qdr_link_t *) qd_link_get_context(link); | ||
if (rlink) { | ||
// | ||
// If this is the second (response) detach or the link hasn't really detached but is being dropped due to parent | ||
// connection/session loss then this is the last proton event that will be generated for this link. The qd_link | ||
// will be freed on return from this call so remove the cross linkage between it and the qdr_link peer. | ||
|
||
if (dt == QD_LOST || qdr_link_get_context(rlink) == 0) { | ||
// note qdr_link context will be zeroed when the core sends the first detach, so if it is zero then this is | ||
// the second detach! | ||
qd_link_set_context(link, 0); | ||
qdr_link_set_context(rlink, 0); | ||
} | ||
|
||
qdr_error_t *error = qdr_error_from_pn(cond); | ||
qdr_link_detach(rlink, dt, error); | ||
pn_condition_t *cond = pn_link_remote_condition(pn_link); | ||
qdr_error_t *error = qdr_error_from_pn(cond); | ||
qdr_link_detach_received(rlink, error); | ||
} else if ((pn_link_state(pn_link) & PN_LOCAL_CLOSED) == 0) { | ||
// Normally the core would be responsible for sending the response detach to close the link (via | ||
// CORE_link_detach) but since there is no core link that will not happen. | ||
pn_link_close(pn_link); | ||
} | ||
|
||
return 0; | ||
} | ||
|
||
|
||
/** | ||
* Link closed handler | ||
* | ||
* This is the last callback for the given link - the link will be freed on return from this call! Forced is true if the | ||
* link has not properly closed (detach handshake completed). | ||
*/ | ||
static void AMQP_link_closed_handler(qd_router_t *router, qd_link_t *qd_link, bool forced) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I was wondering if the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
{ | ||
assert(qd_link); | ||
|
||
// Clean up all qdr_delivery/pn_delivery bindings for the link. | ||
|
||
qd_link_ref_list_t *list = qd_link_get_ref_list(qd_link); | ||
qd_link_ref_t *ref = DEQ_HEAD(*list); | ||
|
||
while (ref) { | ||
qdr_delivery_t *dlv = (qdr_delivery_t*) ref->ref; | ||
pn_delivery_t *pdlv = qdr_delivery_get_context(dlv); | ||
assert(pdlv && ref == (qd_link_ref_t*) pn_delivery_get_context(pdlv)); | ||
|
||
// This will decrement the qdr_delivery_t reference count - do not access the dlv pointer after this call! | ||
qdr_node_disconnect_deliveries(router->router_core, qd_link, dlv, pdlv); | ||
ref = DEQ_HEAD(*list); | ||
} | ||
|
||
qdr_link_t *qdr_link = (qdr_link_t *) qd_link_get_context(qd_link); | ||
if (qdr_link) { | ||
// Notify core that this link no longer exists | ||
qdr_link_set_context(qdr_link, 0); | ||
qd_link_set_context(qd_link, 0); | ||
qdr_link_notify_closed(qdr_link, forced); | ||
// This will cause the core to free qdr_link at some point so: | ||
qdr_link = 0; | ||
} | ||
} | ||
|
||
static void bind_connection_context(qdr_connection_t *qdrc, void* token) | ||
{ | ||
qd_connection_t *conn = (qd_connection_t*) token; | ||
|
@@ -1761,7 +1772,7 @@ static int AMQP_closed_handler(qd_router_t *router, qd_connection_t *conn, void | |
if (!!conn->listener && qdrc->role != QDR_ROLE_INTER_ROUTER_DATA) { | ||
qd_listener_remove_link(conn->listener); | ||
} | ||
qdr_connection_closed(qdrc); | ||
qdr_connection_notify_closed(qdrc); | ||
qd_connection_set_context(conn, 0); | ||
} | ||
|
||
|
@@ -1776,8 +1787,8 @@ static const qd_node_type_t router_node = {"router", 0, | |
AMQP_outgoing_link_handler, | ||
AMQP_conn_wake_handler, | ||
AMQP_link_detach_handler, | ||
AMQP_link_closed_handler, | ||
AMQP_link_attach_handler, | ||
qd_link_abandoned_deliveries_handler, | ||
AMQP_link_flow_handler, | ||
0, // node_created_handler | ||
0, // node_destroyed_handler | ||
|
@@ -1920,7 +1931,7 @@ static void CORE_link_detach(void *context, qdr_link_t *link, qdr_error_t *error | |
return; | ||
|
||
pn_link_t *pn_link = qd_link_pn(qlink); | ||
if (!pn_link) | ||
if (!pn_link || !!(pn_link_state(pn_link) & PN_LOCAL_CLOSED)) // already detached | ||
return; | ||
|
||
if (error) { | ||
|
@@ -1945,17 +1956,6 @@ static void CORE_link_detach(void *context, qdr_link_t *link, qdr_error_t *error | |
} | ||
} | ||
|
||
// | ||
// This is the last event for this link that the core is going to send into Proton so remove the core => adaptor | ||
// linkage. If this is the response attach then there will be no further proton link events to send to the core so | ||
// remove the adaptor => core linkage. If this is the first (request) detach preserve the adaptor => core linkage so | ||
// we can notify the core when the second (response) detach arrives | ||
// | ||
qdr_link_set_context(link, 0); | ||
if (!first) { | ||
qd_link_set_context(qlink, 0); | ||
} | ||
|
||
qd_link_close(qlink); | ||
} | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I like renaming the function
qdr_link_detach()
toqdr_link_detach_received()
. The new function name makes it clear that a detach (first or second) has been received from the remote peer.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks!