Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

server is not making qos 1/2 assurance when subscribed client not response puback #439

Open
cxshun opened this issue Nov 25, 2024 · 3 comments
Labels
discussion Something to be discussed

Comments

@cxshun
Copy link

cxshun commented Nov 25, 2024

Background

We are testing qos assurance, but found that mochi-mqtt is not assure qos 1message's delivery.
But I walk through mochi-mqtt's readme. It says it implements Full qos 1/2.

Our testcase demonstrate like below:

  1. we create a mqtt client to subscribe specify topic like /testTopic, but not response puback packet.
  2. we publish mqtt message to the topic /testTopic with another client.
    So with this testcase, we want this message to resend interval.

Code walk through

I'm not sure if I was making the right conclusion.
Here I walk through the code, but not finding anything about qos =1/2's retrying.
Below is the code that I was walked through, Please feel free If I was missing something.

  1. publishToClient - github.com/mochi-mqtt/server/v2/server.go
select {
	case cl.State.outbound <- &out:
		atomic.AddInt32(&cl.State.outboundQty, 1)
	default:
		atomic.AddInt64(&s.Info.MessagesDropped, 1)
		cl.ops.hooks.OnPublishDropped(cl, pk)
		if out.FixedHeader.Qos > 0 {
			cl.State.Inflight.Delete(out.PacketID) // packet was dropped due to irregular circumstances, so rollback inflight.
			cl.State.Inflight.IncreaseSendQuota()
		}
		return out, packets.ErrPendingClientWritesExceeded
	}
  1. WriteLoop - github.com/mochi-mqtt/server/v2/clients.go
// WriteLoop ranges over pending outbound messages and writes them to the client connection.
func (cl *Client) WriteLoop() {
	for {
		select {
		case pk := <-cl.State.outbound:
			if err := cl.WritePacket(*pk); err != nil {
				// TODO : Figure out what to do with error
				cl.ops.log.Debug("failed publishing packet", "error", err, "client", cl.ID, "packet", pk)
			}
			atomic.AddInt32(&cl.State.outboundQty, -1)
		case <-cl.State.open.Done():
			return
		}
	}
}

I found that after WritePacket, if it didn't encouter any error, It will treat the message as publish success.

Expect

When subscribed client didn't response puback, server must resend message to ensure qos =1 or 2.

Please feel free to tell me that if I was missing something.

@cxshun cxshun changed the title server is not making qos 1 assurance when subscribed client not response puback server is not making qos 1/2 assurance when subscribed client not response puback Nov 25, 2024
@thedevop
Copy link
Collaborator

thedevop commented Nov 27, 2024

Given MQTT is based on TCP, with well behaved (conforming to MQTT spec) client and server, only when there is a break in the TCP connection may result in server not receive PubAck. Hence, the un-acked messages are only resent when the client reconnects to the server.

server/server.go

Lines 467 to 468 in 8f52b89

if sessionPresent {
err = cl.ResendInflightMessages(true)

In addition, with MQTT, there is Publisher, Broker and Subscriber:

Publisher ---> Broker ---> Subscriber

Each segment has its own respective QoS. For example, the publish QoS only dictates the delivery guarantee for Publisher to the Broker portion. Subscriber defines its QoS in the subscription message. Generally you can consider the end-to-end QoS is the min(Publisher QoS, Subscriber QoS), assuming the server allows up to QoS 2.

Given the Publish API is to "simulate" a Publisher publishing to a topic, it returns when the Broker can accept the message.

@thedevop thedevop added the discussion Something to be discussed label Dec 8, 2024
@thedevop
Copy link
Collaborator

thedevop commented Dec 8, 2024

@cxshun, does the above resolve your concern?

@cxshun
Copy link
Author

cxshun commented Dec 22, 2024

Thanks @thedevop for your reply, sorry to reply such late.
May be we are thinking two different situation.

  1. What you are talking about is about tcp disconnect then reconnect, it's already handle by mochi-mqtt.
  2. But I'm talking about is the situation like this:
    Our device may enter some place that has weak signal(may be remote mountains etc), so when it receive publish packet, then it's unable to respond normal puback, or it can not received publish packet in the worst case.

We know tcp disconnect only happen when keepalive timeout or active disconnect, so in my above situation, may be device is in its keepalive period, so it will not reconnect, so the message may not deliver again in its connection.

So the code segment that you post:

 if sessionPresent { 
 	err = cl.ResendInflightMessages(true) 

will not be able to trigger.

We try to check EMQX, It seems to implement this assurance.

Look forward to more dicussion!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
discussion Something to be discussed
Projects
None yet
Development

No branches or pull requests

2 participants