
Trying to publish a message inside the 'connected' signal fails on reconnect. #41

Open
alaendle opened this issue Mar 22, 2016 · 9 comments

Comments

@alaendle

If I create a simple sender that just publishes a message on 'connected', publishing fails when the client reconnects. Take a look at the following sample (a small modification of the sender example):

Sender::Sender(QObject *parent) : QObject(parent) {
    m_client.setAutoReconnect(true);
}

void Sender::start() {
    connect(&m_client, SIGNAL(connected()), this, SLOT(clientConnected()));
    m_client.connectToHost();
}

// Runs on the initial connection and again after every automatic reconnect.
void Sender::clientConnected() {
    QAmqpExchange *defaultExchange = m_client.createExchange("test");
    // Publishes immediately, without waiting for the exchange's channel to open.
    defaultExchange->publish("Hello World!", "test");
    qDebug() << " [x] Sent 'Hello World!'";
}

If I now restart the broker, the client tries to publish before a channel has been established:

...
trying to reconnect after: 5000 ms
connecting to host: "localhost" , port: 5672
-> connection#start( version_major=0, version_minor=9, mechanisms=(PLAIN,AMQPLAIN), locales=en_US
<- connection#startOk()
-> connection#tune( channel_max=0, frame_max=131072, heartbeat=60 )
<- connection#tuneOk( channelMax=0, frameMax=131072, heartbeatDelay=60
<- connection#open( virtualHost=/, reserved-1=0, reserved-2=0
-> connection#openOk()
<- basic#publish( exchange=test, routing-key=test, mandatory=0, immediate=0 )
[x] Sent 'Hello World!'
<- channel#open( channel=1 )
-> connection#close( reply-code=504, reply-text=CHANNEL_ERROR - expected 'channel.open', class-id=60, method-id:40 )
exchange disconnected: "test"
<- connection#closeOk()
socket error: "The remote host closed the connection"
exchange disconnected: "test"
trying to reconnect after: 1000 ms
connecting to host: "localhost" , port: 5672
-> connection#start( version_major=0, version_minor=9, mechanisms=(PLAIN,AMQPLAIN), locales=en_US
<- connection#startOk()
-> connection#tune( channel_max=0, frame_max=131072, heartbeat=60 )
<- connection#tuneOk( channelMax=0, frameMax=131072, heartbeatDelay=60
<- connection#open( virtualHost=/, reserved-1=0, reserved-2=0
-> connection#openOk()
<- basic#publish( exchange=test, routing-key=test, mandatory=0, immediate=0 )
[x] Sent 'Hello World!'
<- channel#open( channel=1 )
-> connection#close( reply-code=504, reply-text=CHANNEL_ERROR - expected 'channel.open', class-id=60, method-id:40 )
exchange disconnected: "test"
<- connection#closeOk()
socket error: "The remote host closed the connection"
exchange disconnected: "test"
trying to reconnect after: 1000 ms
...

To me this looks like a bug, but maybe I'm just making wrong assumptions about the qamqp API. If I was unclear or if you need more information, just let me know. Thanks in advance for any assistance or hints on how to work around this problem.
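The 504 reply in these logs reflects an AMQP 0-9-1 rule: a channel has to be opened with channel.open before any other method is sent on it, and the broker answers a violation with a connection-level CHANNEL_ERROR. The sketch below is a hypothetical, heavily simplified model of that broker-side check. It is not RabbitMQ or qamqp code; `BrokerModel` and `Method` are illustrative names, and only two methods are modelled.

```cpp
#include <cassert>
#include <cstdint>
#include <set>

// Hypothetical, simplified model of the broker-side ordering rule: on a
// non-zero channel, channel.open must come first; any other method on an
// unopened channel closes the connection with reply-code 504 (CHANNEL_ERROR).
enum class Method { ChannelOpen, BasicPublish };

struct BrokerModel {
    std::set<std::uint16_t> openChannels;  // channels that received channel.open
    int closeReplyCode = 0;                 // 0 while the connection is healthy

    // Returns false when the frame is rejected (connection closed).
    bool receive(std::uint16_t channel, Method m) {
        assert(channel != 0 && "channel 0 carries connection-class methods only");
        if (closeReplyCode != 0) return false;  // connection already closed
        if (m == Method::ChannelOpen) {
            openChannels.insert(channel);
            return true;
        }
        if (openChannels.count(channel) == 0) {
            closeReplyCode = 504;  // CHANNEL_ERROR - expected 'channel.open'
            return false;
        }
        return true;
    }
};
```

A publish that arrives before the channel.open, as in the log, sets `closeReplyCode` to 504, matching the `connection#close( reply-code=504 ... )` line; the same two frames in the opposite order are accepted.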

@mbroadst
Owner

@alaendle sorry, I was on vacation! Hmm, this does indeed look like a bug. I'll try to reproduce it on my end and see what's going on.

@mbroadst
Owner

@alaendle can you post your whole code here? It looks like you're never declaring the exchange?

@mbroadst
Owner

@alaendle ah, I think I understand your issue. Yes, it looks like there is currently a problem if you try publishing when not properly connected. Would you be interested in helping out with this feature? The exchange would have to maintain an internal list of pending messages, which would be published once the connection is up and the channel has been established.
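The pending-message idea can be sketched in plain C++ as follows. This is a hypothetical, Qt-free illustration, not qamqp code: `PendingExchange`, `channelOpened()` and `channelClosed()` are made-up names, and in qamqp such hooks would presumably be driven by channel#openOk and the client's disconnected() signal.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical sketch: an exchange-like object that buffers publish() calls
// while its channel is not open and flushes them, in order, once the channel
// has been (re)opened.
class PendingExchange {
public:
    using Sender = std::function<void(const std::string &message,
                                      const std::string &routingKey)>;

    explicit PendingExchange(Sender send) : send_(std::move(send)) {}

    void publish(const std::string &message, const std::string &routingKey) {
        if (!open_) {
            pending_.emplace_back(message, routingKey);  // defer until open
            return;
        }
        send_(message, routingKey);
    }

    // Would be triggered by channel#openOk in a real client.
    void channelOpened() {
        open_ = true;
        // Move the queue out before sending so the loop never iterates a
        // vector that a callback might modify.
        std::vector<std::pair<std::string, std::string>> queued;
        queued.swap(pending_);
        for (const auto &m : queued) send_(m.first, m.second);
    }

    // Would be triggered by the client's disconnected() signal.
    void channelClosed() { open_ = false; }

    std::size_t pendingCount() const { return pending_.size(); }

private:
    Sender send_;
    bool open_ = false;
    std::vector<std::pair<std::string, std::string>> pending_;
};
```

In the scenario from the report, the publish() issued from the connected handler would land in the pending list and go out only after the channel is open, instead of reaching the broker before channel#open.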

@mbroadst
Owner

@alaendle I took a first stab at supporting pending sends on the 'pending-sends' branch. The work is incomplete because of other issues I uncovered with certain reconnect scenarios, but the code there does resend the data on reconnect, which solves your problem. I won't have time for this for a few days, but I'll try to get around to fixing the issue completely.

@alaendle
Author

alaendle commented Apr 5, 2016

@mbroadst please excuse the late response, I was also on vacation. I'll take a look at your work now...

@alaendle
Author

alaendle commented Apr 5, 2016

I still have a problem with my original scenario. If I simply shut down and restart RabbitMQ (v3.6.1), I can't publish messages after reconnecting: the opened flag never switches back to 'true' because channel#openOk is never received. The new log:

-> connection#start( version_major=0, version_minor=9, mechanisms=(AMQPLAIN,PLAIN), locales=en_US )
<- connection#startOk()
-> connection#tune( channel_max=0, frame_max=131072, heartbeat=60 )
<- connection#tuneOk( channelMax=0, frameMax=131072, heartbeatDelay=60 )
<- connection#open( virtualHost=/, reserved-1=0, reserved-2=0 )
-> connection#openOk()
channel name: ""
<- channel#open( channel=1 )
[x] Sent 'Hello World!'
-> channel#openOk( channel=1 )
<- basic#publish( exchange=, routing-key=test, mandatory=0, immediate=0 )
<- basic#publish( exchange=, routing-key=test, mandatory=0, immediate=0 )
[x] Sent 'Hello World!'
socket error: "The remote host closed the connection"
exchange disconnected: ""
trying to reconnect after: 1000 ms
connecting to host: "localhost" , port: 5672
socket error: "Connection refused"
trying to reconnect after: 5000 ms
connecting to host: "localhost" , port: 5672
-> connection#start( version_major=0, version_minor=9, mechanisms=(AMQPLAIN,PLAIN), locales=en_US )
<- connection#startOk()
-> connection#tune( channel_max=0, frame_max=131072, heartbeat=60 )
<- connection#tuneOk( channelMax=0, frameMax=131072, heartbeatDelay=60 )
<- connection#open( virtualHost=/, reserved-1=0, reserved-2=0 )
-> connection#openOk()
[x] Sent 'Hello World!'
channel name: ""
<- channel#open( channel=2 )
[x] Sent 'Hello World!'
[x] Sent 'Hello World!'
AMQP: Heartbeat

@alaendle
Author

alaendle commented Apr 5, 2016

The problem seems to be that the channel numbers change during reconnect, so the maps that route incoming frames to their handlers (in this case QAmqpClientPrivate::methodHandlersByChannel) are no longer correct. Two solutions come to mind: a) don't change channel numbers, or b) don't rely on channel numbers when looking up registered handlers. I did a quick proof-of-concept implementation of b) and it seems to work, but it is certainly not a fully fledged solution to the problem:

diff --git "a/C:\\Users\\laa6bh\\AppData\\Local\\Temp\\TortoiseGit\\qam37F6.tmp\\qamqpclient-7d9f803-left.cpp" "b/C:\\Tools\\qamqp\\src\\qamqpclient.cpp"
index e460e05..27c7415 100644
--- "a/C:\\Users\\laa6bh\\AppData\\Local\\Temp\\TortoiseGit\\qam37F6.tmp\\qamqpclient-7d9f803-left.cpp"
+++ "b/C:\\Tools\\qamqp\\src\\qamqpclient.cpp"
@@ -258,8 +258,13 @@ void QAmqpClientPrivate::_q_readyRead()
             if (frame.methodClass() == QAmqpFrame::Connection) {
                 _q_method(frame);
             } else {
-                foreach (QAmqpMethodFrameHandler *methodHandler, methodHandlersByChannel[frame.channel()])
-                    methodHandler->_q_method(frame);
+                QHashIterator<QAmqpChannel*, QList<QAmqpMethodFrameHandler*>> i(methodHandlersByChannel);
+                while (i.hasNext()) {
+                    i.next();
+                    if(i.key()->channelNumber() == frame.channel())
+                        foreach (QAmqpMethodFrameHandler *methodHandler, i.value())
+                            methodHandler->_q_method(frame);
+                }
             }
         }
             break;
@@ -689,7 +694,7 @@ QAmqpExchange *QAmqpClient::createExchange(const QString &name, int channelNumbe
     }

     exchange = new QAmqpExchange(channelNumber, this);
-    d->methodHandlersByChannel[exchange->channelNumber()].append(exchange->d_func());
+    d->methodHandlersByChannel[exchange].append(exchange->d_func());
     connect(this, SIGNAL(connected()), exchange, SLOT(_q_open()));
     connect(this, SIGNAL(disconnected()), exchange, SLOT(_q_disconnected()));
     exchange->d_func()->open();
@@ -716,7 +721,7 @@ QAmqpQueue *QAmqpClient::createQueue(const QString &name, int channelNumber)
     }

     queue = new QAmqpQueue(channelNumber, this);
-    d->methodHandlersByChannel[queue->channelNumber()].append(queue->d_func());
+    d->methodHandlersByChannel[queue].append(queue->d_func());
     d->contentHandlerByChannel[queue->channelNumber()].append(queue->d_func());
     d->bodyHandlersByChannel[queue->channelNumber()].append(queue->d_func());
     connect(this, SIGNAL(connected()), queue, SLOT(_q_open()));
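The diagnosis can be reproduced without Qt. In this hypothetical model (`Handler`, `ByNumberDispatcher` and `ByObjectDispatcher` are illustrative names, not qamqp types), a handler registered under channel 1 is renumbered to channel 2 by a reconnect. A table keyed by the old number then drops the channel-2 frame, while a lookup by the handler's current number, in the spirit of the proof-of-concept diff, still delivers it.

```cpp
#include <cassert>
#include <cstdint>
#include <unordered_map>
#include <vector>

// Stand-in for an exchange/queue private object whose channel number may
// change when the client reconnects.
struct Handler {
    explicit Handler(std::uint16_t n) : channelNumber(n) {}
    std::uint16_t channelNumber;
    int framesSeen = 0;
};

// Handlers indexed by the channel number they had at registration time.
struct ByNumberDispatcher {
    std::unordered_map<std::uint16_t, std::vector<Handler *>> handlers;
    void add(Handler *h) {
        assert(h != nullptr);
        handlers[h->channelNumber].push_back(h);
    }
    void dispatch(std::uint16_t channel) {
        auto it = handlers.find(channel);
        if (it == handlers.end()) return;  // frame silently dropped
        for (Handler *h : it->second) ++h->framesSeen;
    }
};

// Approach b): compare against each handler's *current* channel number at
// dispatch time, so renumbering on reconnect does not break routing.
struct ByObjectDispatcher {
    std::vector<Handler *> handlers;
    void add(Handler *h) {
        assert(h != nullptr);
        handlers.push_back(h);
    }
    void dispatch(std::uint16_t channel) {
        for (Handler *h : handlers)
            if (h->channelNumber == channel) ++h->framesSeen;
    }
};
```

The per-frame linear scan is the cost of approach b); re-keying the table whenever a channel is renumbered, or keeping the numbers stable as in approach a), would avoid it.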

@mbroadst
Owner

mbroadst commented Apr 5, 2016

@alaendle are you using the code in that branch or the code in master? The initial issue with the code in master is that QAmqpExchange tries to send data regardless of whether it's currently open, so the work in that branch focused on caching attempted sends until the exchange is open again. There are a number of caveats with this approach that would have to be tested, first and foremost that it can leak memory without bound if the exchange never connects again!
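One way to address the memory caveat is to bound the cache. The sketch below is an assumption, not something the branch does: the hypothetical `BoundedPending` keeps at most `capacity` messages, drops the oldest when full, and counts what it discarded.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <string>
#include <utility>

// Hypothetical bounded pending-send cache with a drop-oldest policy, so the
// buffer cannot grow without limit if the exchange never reconnects.
class BoundedPending {
public:
    explicit BoundedPending(std::size_t capacity) : capacity_(capacity) {
        assert(capacity_ > 0);
    }

    void push(std::string message) {
        if (queue_.size() == capacity_) {
            queue_.pop_front();  // discard the oldest message
            ++dropped_;
        }
        queue_.push_back(std::move(message));
    }

    // Hands every buffered message to `send` in FIFO order and empties the
    // buffer. Each message is moved out before `send` runs, so a callback
    // that pushes new messages cannot invalidate the one being sent.
    template <typename Send>
    void flush(Send send) {
        while (!queue_.empty()) {
            std::string message = std::move(queue_.front());
            queue_.pop_front();
            send(message);
        }
    }

    std::size_t size() const { return queue_.size(); }
    std::size_t dropped() const { return dropped_; }

private:
    std::size_t capacity_;
    std::deque<std::string> queue_;
    std::size_t dropped_ = 0;
};
```

Rejecting new messages instead, or expiring them by age, are equally plausible policies; which one fits depends on whether the newest or the oldest messages matter more to the application.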

As for methodHandlersByChannel: the channel numbers should stay the same across a reconnect, so that would be a problem with the reconnect code in the client itself. Apologies for the slow progress on the fix, but I haven't actually used this code in about three years! I'm interested in helping you fix the issue, but let's take it one step at a time if you'll bear with me.
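Keeping channel numbers stable (option a from the earlier comment) might look like the following hypothetical sketch. Numbers are allocated once per channel and deliberately not reset on disconnect, so the reconnect path can re-send channel#open with the same numbers and number-keyed tables such as methodHandlersByChannel stay valid. `ChannelRegistry` is an illustrative name, and keying channels by exchange/queue name is an assumption made for brevity.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <map>
#include <string>
#include <vector>

// Hypothetical registry that hands out each channel number once and keeps
// the name-to-number mapping across reconnects.
class ChannelRegistry {
public:
    // Returns the existing number for `name`, or allocates the next free one.
    std::uint16_t channelFor(const std::string &name) {
        auto it = numbers_.find(name);
        if (it != numbers_.end()) return it->second;
        assert(next_ != 0 && "channel numbers exhausted");  // 0 is reserved
        std::uint16_t n = next_++;
        numbers_.emplace(name, n);
        return n;
    }

    // Channels to re-open after a reconnect, in ascending number order.
    // Nothing here is cleared on disconnect: that is the point of option a).
    std::vector<std::uint16_t> channelsToReopen() const {
        std::vector<std::uint16_t> out;
        for (const auto &entry : numbers_) out.push_back(entry.second);
        std::sort(out.begin(), out.end());
        return out;
    }

private:
    std::map<std::string, std::uint16_t> numbers_;
    std::uint16_t next_ = 1;  // channel 0 is the connection channel
};
```

With such a registry, the exchange in the second log would have reopened channel 1 after the broker restart instead of moving to channel 2.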

@alaendle
Author

alaendle commented Apr 6, 2016

@mbroadst I don't want to rush you; we can and should improve the behaviour step by step. I just played around with the code in the 'pending-sends' branch and thought it might help to update this issue with the 'new' behaviour I observed. I'm also aware of what it takes to maintain an open-source project over the years, so let me take this chance to thank you for sharing the code and for your support.
