Messages getting mixed up #23
@sjlongland it's entirely possible that this is happening. From my travels through implementations in a few other languages (I've now implemented this in 3 😄), I've seen others multiplex incoming messages. On the other hand, the spec indicates that incoming frames should be sequential, so the way we currently handle incoming bodies should just work (though it seems this is not the case). The best course of action would be for you to submit a test case, appended to the autotests, showing this failure. I test against RabbitMQ explicitly, so you shouldn't find too much of a problem there (in fact, if you have RabbitMQ running locally without modification, you can run …). Thanks for the report, hope we can get this cleared up.
@sjlongland in your studies you'll probably want to focus on: https://github.com/mbroadst/qamqp/blob/master/src/qamqpqueue.cpp#L115. This basically assumes that there is one message being processed at a time (perhaps a wildly inappropriate assumption!). It's worked for me thus far in production, but maybe we haven't experienced the load or message sizes you are working with.
On 31/03/15 07:14, Matt Broadstone wrote:
Indeed, it's not all the time, just intermittently, so I'm not sure … I'm using RabbitMQ 3.5.0 (from their repository, not Debian's) here on … The problem is rare, it seems, so it looks as if the assumption isn't too … The good news is QAMQP is fairly compact, so there aren't big globs of …
Regards,
Stuart Longland (aka Redhatter, VK4MSL)
I haven't lost my mind...
Okay, a bit more information on the bug. It seems as if a buffer isn't cleared when the message is delivered, so old message data is left in the buffer when the next message comes along.
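To make this hypothesis concrete, here is a minimal C++ model (not QAMQP's code; deliverAll and its clearOnDeliver parameter are hypothetical) of a receiver that reuses one assembly buffer across messages. If the buffer is not cleared after each delivery, the next message's body is appended to the previous one, which matches the symptom reported in this issue.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical model, not QAMQP's implementation. Each inner vector holds the
// body chunks (one per content body frame) of a single delivered message.
std::vector<std::string> deliverAll(const std::vector<std::vector<std::string>> &messages,
                                    bool clearOnDeliver)
{
    std::vector<std::string> delivered;
    std::string buffer;  // one assembly buffer, reused for every message
    for (const auto &chunks : messages) {
        for (const auto &chunk : chunks)
            buffer += chunk;          // append each body frame's payload
        delivered.push_back(buffer);  // hand the assembled body to the application
        if (clearOnDeliver)
            buffer.clear();           // start the next message with an empty buffer
    }
    return delivered;
}
```

With clearOnDeliver set to false, two messages with bodies "ab" (sent as two frames) and "c" come out as "ab" and "abc".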
There, three of them got mixed up. When I look for that UUID in my Python-based client, I see this:
So it was supposed to be an
So if I now look at the C++ program log for that message, I get this:
By a sheer fluke, the YAML parser ignored the bits of the … I wonder if the …
As an example, you could capture this case with tcpdump or Wireshark. Compare the received packets with what QAMQP processes, and log leftSize for the current message.
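A minimal sketch of the leftSize bookkeeping suggested here, assuming standard AMQP 0-9-1 framing: the content header frame declares the total body size, and body frames are counted down until nothing is left. BodyTracker and its methods are hypothetical names, not QAMQP's API. A body frame that overruns the declared size is a strong hint that frames from two messages are being mixed.

```cpp
#include <cassert>
#include <cstdint>
#include <stdexcept>
#include <string>

// Hypothetical tracker illustrating the "leftSize" idea; not QAMQP code.
class BodyTracker {
public:
    // Content header frame: it carries the total body size for this message.
    void onHeader(std::uint64_t bodySize) {
        body_.clear();
        leftSize_ = bodySize;
    }

    // Content body frame: returns true once the whole body has arrived.
    bool onBody(const std::string &payload) {
        if (payload.size() > leftSize_)
            throw std::runtime_error("body frame overruns the declared body size");
        body_ += payload;
        leftSize_ -= payload.size();
        return leftSize_ == 0;
    }

    std::uint64_t leftSize() const { return leftSize_; }
    const std::string &body() const { return body_; }

private:
    std::uint64_t leftSize_ = 0;
    std::string body_;
};
```

Logging leftSize() after every onBody() call gives exactly the trace suggested above, and the exception turns a silent mix-up into a visible error.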
Okay, for whatever reason a "basic.deliver" got missed, or at least from my tinkering it appears that way. I've made some changes to the library so that I can trace what's going on. These are in a separate branch (master...sjlongland:investigate-issue-23). I've also added a shim layer in my code base that exposes QDebug messages to log4cpp, so that QAMQP's debug statements appear alongside my own log messages with the prefix QDebug.
Note that at this point, QAMQP has dispatched the message, and my code has dealt with it and sent its reply. Another message comes in from the same queue. Note that this time we don't get the "New message object" message seen earlier.
Now, there are two places in QAmqpQueue where a message is replaced like that: … You'll notice I've added logging statements around there. I managed to get a trace out of …
Looking at the above, it seems QAmqpQueuePrivate::deliver should get called but, for whatever reason, doesn't. I'm not sure where the headers are being picked up from. Is there some other method that plucks out header details and inserts them into the waiting message?
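To illustrate why a missed basic.deliver produces exactly this kind of mix-up, here is a hypothetical C++ model (Receiver, Frame and Message are illustrative names, not QAMQP classes). In AMQP 0-9-1 a delivery arrives as a basic.deliver method frame followed by a content header frame and then body frames. If only basic.deliver starts a fresh message, a skipped one leaves the next header and body frames attached to whatever message is still pending.

```cpp
#include <cassert>
#include <optional>
#include <string>

enum class FrameType { Deliver, Header, Body };

struct Frame {
    FrameType type;
    std::string data;  // delivery tag, header summary or body payload, depending on type
};

struct Message {
    std::string deliveryTag;
    std::string headers;
    std::string body;
};

// Hypothetical receiver: only a Deliver frame creates a new Message (C++17).
class Receiver {
public:
    void handle(const Frame &frame) {
        switch (frame.type) {
        case FrameType::Deliver:
            pending_ = Message{frame.data, {}, {}};  // fresh message object
            break;
        case FrameType::Header:
            if (pending_)
                pending_->headers = frame.data;      // overwrites the pending message's headers
            break;
        case FrameType::Body:
            if (pending_)
                pending_->body += frame.data;        // appends to the pending message's body
            break;
        }
    }

    const std::optional<Message> &pending() const { return pending_; }

private:
    std::optional<Message> pending_;
};
```

If the second message's Deliver frame never reaches handle(), its headers land on the first message and the two bodies are concatenated, much like the mixed-up payloads reported in this issue.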
@sjlongland it would also be useful to include the output with QAMQP debugging enabled (QAMQP_DEBUG=1 ./test/auto/sometest).
On 31/03/15 22:30, Matt Broadstone wrote:
Ahh, so THAT's what QAmqpDebug does. Okay, I've added that to my …
Regards,
Stuart Longland (aka Redhatter, VK4MSL)
I haven't lost my mind...
@sjlongland it's not exactly the best, but it's more data 😄
Indeed, and I think I see a problem:
So somewhere it is getting the consumer tags muddled. If I look back in the log, this was what was captured when the program started declaring that particular queue.
I'll double-check that consume() isn't being called twice by mistake. Ideally this shouldn't be happening, but if it is, that might be the culprit.
@sjlongland ah, that would be a bug indeed 😄
I think there's my problem: a state machine bug in my code, triggering a bug in QAMQP. There are a couple of ways we can prevent this from occurring accidentally. One would be to put a boolean flag that gets set when … A third would be to make a note of the last consumer tag, and ignore all message bodies not belonging to that consumer tag.
@sjlongland Ah, great sleuthing! Yes, there should be a 1-1 guarantee between a queue and a consumer tag. Can we start with a test case expressing the problem? I'm not sure I quite understand how you're triggering the error, as we already have a check to prevent multiple consumption on a given QAmqpQueue: https://github.com/mbroadst/qamqp/blob/master/src/qamqpqueue.cpp#L448.
Okay, here's what I think is happening. In my code I've got a state machine that wraps the queue, basically so my objects just hold a reference (thank-you QSharedPointer) to my state machine object.
The Queue state machine (AMQPQueueHandler) begins in the DISCONNECTED state. When it receives the connected signal from the connection (AMQPConnectionHandler, which wraps QAmqpClient), it creates a queue object, then calls … When binding to an exchange, it listens for the …
So there's a race between the queue and the exchange. Not a problem yet. AMQPQueueHandler receives … AMQPQueueHandler then receives a …
So I think the problem is me accidentally calling … Something like the following might reproduce the bug:
Since we don't wait for the … I'll have a look at some point and see if that reproduces the issue. On this note, it would appear that a buggy AMQP broker sending messages with the wrong consumer tag would trigger the same bug. It'd be worth having a flag in QAmqpQueue that ignores message body frames when the last-seen consumer tag doesn't match the one for that queue. In fact, I'd go so far as to trigger an abort at that point (or at least complain very loudly).
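A minimal sketch of the proposed guard, assuming the queue learns its own consumer tag from the broker's basic.consume-ok reply: each basic.deliver is checked against that tag, content frames are accepted only while the last delivery matched, and a mismatch is reported loudly on stderr. ConsumerTagFilter is a hypothetical name, and whether to drop, log or abort on a mismatch is a policy choice.

```cpp
#include <cassert>
#include <iostream>
#include <string>
#include <utility>

// Hypothetical guard, not part of QAMQP.
class ConsumerTagFilter {
public:
    // Record the tag the broker assigned to this queue's consumer.
    void setConsumerTag(std::string tag) { consumerTag_ = std::move(tag); }

    // Called for each basic.deliver; decides whether the header and body
    // frames that follow belong to this queue.
    bool onDeliver(const std::string &tag) {
        accepting_ = !consumerTag_.empty() && tag == consumerTag_;
        if (!accepting_)
            std::cerr << "ignoring delivery for consumer tag '" << tag
                      << "', expected '" << consumerTag_ << "'\n";
        return accepting_;
    }

    // Header and body frames are kept only while this returns true.
    bool acceptContent() const { return accepting_; }

private:
    std::string consumerTag_;
    bool accepting_ = false;
};
```

Replacing the std::cerr line with std::abort() would give the harsher behaviour suggested above.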
To me it seems more likely that you are calling declare multiple times here; there is no corresponding guard in the declare() method. Try adding a line there to bail …
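Whichever call is being repeated, declare() or consume(), a wrapper state machine like the AMQPQueueHandler described earlier can be made tolerant of repeated or out-of-order signals by acting on each event only in the state that expects it. The sketch below is a hypothetical model (QueueHandlerModel and its method names are not from the original code): it records method names instead of sending real frames, binds only once both the queue and the exchange have been declared, and issues basic.consume at most once.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical model of a queue wrapper; outgoing AMQP methods are recorded
// as strings instead of being written to a channel.
class QueueHandlerModel {
public:
    enum class State { Disconnected, Declaring, Binding, Consuming };

    void onConnected() {
        if (state_ != State::Disconnected) return;
        state_ = State::Declaring;
        sent_.push_back("queue.declare");
        sent_.push_back("exchange.declare");
    }
    void onQueueDeclared()    { queueReady_ = true;    maybeBind(); }
    void onExchangeDeclared() { exchangeReady_ = true; maybeBind(); }
    void onBound() {
        if (state_ != State::Binding) return;  // a repeated bind-ok cannot start a second consumer
        state_ = State::Consuming;
        sent_.push_back("basic.consume");
    }

    State state() const { return state_; }
    const std::vector<std::string> &sent() const { return sent_; }

private:
    // Bind once, after both declarations completed, whichever reply came first.
    void maybeBind() {
        if (state_ == State::Declaring && queueReady_ && exchangeReady_) {
            state_ = State::Binding;
            sent_.push_back("queue.bind");
        }
    }

    State state_ = State::Disconnected;
    bool queueReady_ = false;
    bool exchangeReady_ = false;
    std::vector<std::string> sent_;
};
```

Handling a reconnect would need a transition back to Disconnected that also resets the two readiness flags.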
Okay, I spent the last day reworking my code to ensure correct state when setting up objects, only to hit a separate issue with socket operation timeouts. So I've branched that code off and backtracked to where I was yesterday morning.
It appears there is some logic in QAMQP to do an automatic reconnect, and queues then get re-declared on reconnect. I'm not sure if it re-binds and restarts consumers, though, and that's something I was trying to achieve. The fact that reconnecting doesn't appear to invalidate queue/exchange pointers is a good thing.
So, looking at this, I'm looking at https://github.com/mbroadst/qamqp/blob/master/src/qamqpqueue.cpp#L440
That immediately triggers a frame to be written to the channel, if and only if the queue hasn't already been placed in the "consuming" state. Sounds sane; now, what constitutes "consuming"? It's a boolean flag, the same one returned by the … https://github.com/mbroadst/qamqp/blob/master/src/qamqpqueue.cpp#L212
It appears that if code calls …
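One way to close the gap described here is to track a pending request separately from the confirmed state. The sketch assumes (not verified against the linked code) that the consuming flag only becomes true once the broker's reply arrives, so two consume() calls in quick succession would both pass a check on that flag alone. ConsumeGuard is a hypothetical name.

```cpp
#include <cassert>

// Hypothetical guard, not QAMQP's implementation.
class ConsumeGuard {
public:
    // Returns true only when a basic.consume frame should actually be sent.
    bool requestConsume() {
        if (consuming_ || requested_)
            return false;  // already consuming, or still waiting for the broker's reply
        requested_ = true;
        ++framesSent_;
        return true;
    }

    // Called when basic.consume-ok arrives.
    void onConsumeOk() {
        assert(requested_ && "consume-ok without an outstanding request");
        requested_ = false;
        consuming_ = true;
    }

    // Called when basic.cancel-ok arrives, allowing a later re-consume.
    void onCancelOk() { consuming_ = false; }

    bool isConsuming() const { return consuming_; }
    int framesSent() const { return framesSent_; }

private:
    bool consuming_ = false;
    bool requested_ = false;
    int framesSent_ = 0;
};
```

A second requestConsume() before onConsumeOk() returns false, so only one basic.consume frame reaches the channel.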
And it looks like I've nailed it. The fixes will need some cleaning up, but this is probably closer to the target than what we had before: master...sjlongland:investigate-issue-23
There are a couple of problems superimposed on top of each other. Firstly, there is the lack of checking to see if some operations (notably …
Attempting to fix this, I ran into a separate issue: it's really easy to piss off the AMQP server if you're not careful to wait until one synchronous operation completes before starting the next. A number of operations require that you wait for the reply before sending the next message, or at least that's my reading of the spec.
An approach I've seen used in Pika is to queue the tasks and execute them later. Taking a similar approach, we can stash the frame to be sent in a buffer, then, when the previous operations are clear, recall the frame and push it down the pipe. For this purpose, I've hijacked the … When the reply is received, …
So far, having done both these things, I'm finding that subscription is a little more reliable and I'm not getting double-ups on consumers.
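The deferral scheme described here can be sketched as a per-channel FIFO: a synchronous method is sent immediately if nothing is outstanding, otherwise it is stashed, and each reply releases the next stashed frame. This is a hypothetical C++ model (SyncMethodQueue is not a QAMQP class, and frames are plain strings), and it treats every submitted method as synchronous, which is a simplification.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <functional>
#include <string>
#include <utility>

// Hypothetical per-channel queue of outgoing synchronous methods.
class SyncMethodQueue {
public:
    using Sender = std::function<void(const std::string &)>;

    explicit SyncMethodQueue(Sender send) : send_(std::move(send)) {}

    // Send now if no reply is outstanding; otherwise stash the frame for later.
    void submit(std::string frame) {
        if (awaitingReply_) {
            pending_.push_back(std::move(frame));
            return;
        }
        awaitingReply_ = true;
        send_(frame);
    }

    // Call when the reply to the outstanding method arrives (e.g. declare-ok).
    void onReply() {
        assert(awaitingReply_ && "reply received with no request outstanding");
        awaitingReply_ = false;
        if (!pending_.empty()) {
            std::string next = std::move(pending_.front());
            pending_.pop_front();
            submit(std::move(next));  // releases exactly one stashed frame
        }
    }

    std::size_t pendingCount() const { return pending_.size(); }

private:
    Sender send_;
    std::deque<std::string> pending_;
    bool awaitingReply_ = false;
};
```

Submitting declare, bind and consume back to back puts only the declare on the wire; the bind and consume follow one at a time as each reply comes in.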
@sjlongland cool, I'm glad you found a solution here. I'm still concerned that without a test case showing the behavior, we will not be able to fully verify that the issue(s) you are facing are completely solved. Your initial bug blossoming into at least one other is proof enough of that. Can you provide a PR that shows your initial bug within the context of one of the auto tests?
On 02/04/15 23:34, Matt Broadstone wrote:
I'll see what I can do. It should be possible by creating a queue, double-consuming it, then sending a message to it.
Stuart Longland (aka Redhatter, VK4MSL)
I haven't lost my mind...
Okay, see #24
Well, I did some further exploration to see what minimal fixes would nail this issue, and the result of that is in #25. I note that Travis-CI reports a successful fix. |
Hi,
I've just been experimenting with QAMQP; so far it's been a very easy introduction to AMQP in C++. The project I'm using AMQP for is a crude SCADA system for energy management. We've got energy meters being polled (Modbus or BACnet), with data being collected and reported via AMQP messaging.
For testing purposes, I've written a dummy driver. It uses the rand() function from stdlib.h to generate dummy readings. Message content uses YAML format.
I have some existing code using Python (and the Pika library) to act as a client, and I'm implementing the server in C++ for speed. One thing I observe is that QAMQP sometimes seems to get messages mixed up, so one message's content gets appended to another, unrelated message sent to the same queue. (In this case, both are for an energy meter named "B79_04_RT", so they are sent to a fan-out exchange named "entity.rq.B79_04_RT".)
Example:
The code I'm using looks like this:
Somehow, a real-time read request got mixed up with a demand-add request. It could be a timing issue: my application is single-threaded, but RabbitMQ is running on a dual-core industrial computer. The Python code interoperates with other Python scripts without issues; so far this problem has been unique to QAMQP.
I'll see if I can trace what's going on a little more closely.