diff --git a/packets/tpackets.go b/packets/tpackets.go index bc00943a..081ed6c2 100644 --- a/packets/tpackets.go +++ b/packets/tpackets.go @@ -1824,13 +1824,10 @@ var TPacketData = map[byte]TPacketCases{ Case: TPublishRetainMqtt5, Desc: "retain mqtt5", RawBytes: []byte{ - Publish<<4 | 1<<0, 35, // Fixed header + Publish<<4 | 1<<0, 19, // Fixed header 0, 5, // Topic Name - LSB+MSB 'a', '/', 'b', '/', 'c', // Topic Name - 16, // properties length - 38, // User Properties (38) - 0, 5, 'h', 'e', 'l', 'l', 'o', - 0, 6, 228, 184, 150, 231, 149, 140, + 0, // properties length 'h', 'e', 'l', 'l', 'o', ' ', 'm', 'o', 'c', 'h', 'i', // Payload }, Packet: &Packet{ @@ -1838,18 +1835,11 @@ var TPacketData = map[byte]TPacketCases{ FixedHeader: FixedHeader{ Type: Publish, Retain: true, - Remaining: 35, + Remaining: 19, }, - TopicName: "a/b/c", - Properties: Properties{ - User: []UserProperty{ - { - Key: "hello", - Val: "世界", - }, - }, - }, - Payload: []byte("hello mochi"), + TopicName: "a/b/c", + Properties: Properties{}, + Payload: []byte("hello mochi"), }, }, { diff --git a/server.go b/server.go index 1d218890..d828a26c 100644 --- a/server.go +++ b/server.go @@ -26,7 +26,7 @@ import ( ) const ( - Version = "2.1.4" // the current server version. + Version = "2.1.5" // the current server version. defaultSysTopicInterval int64 = 1 // the interval between $SYS topic publishes defaultFanPoolSize uint64 = 32 // the number of concurrent workers in the pool defaultFanPoolQueueSize uint64 = 1024 // the capacity of each worker queue @@ -61,13 +61,13 @@ type Capabilities struct { ReceiveMaximum uint16 TopicAliasMaximum uint16 ServerKeepAlive uint16 + SharedSubAvailable byte + MinimumProtocolVersion byte Compatibilities Compatibilities MaximumQos byte RetainAvailable byte WildcardSubAvailable byte SubIDAvailable byte - SharedSubAvailable byte - MinimumProtocolVersion byte } // Compatibilities provides flags for using compatibility modes. @@ -785,7 +785,7 @@ func (s *Server) publishToClient(cl *Client, sub packets.Subscription, pk packet } out := pk.Copy(false) - if !sub.RetainAsPublished { // ![MQTT-3.3.1-13] + if cl.Properties.ProtocolVersion == 5 && !sub.RetainAsPublished { // ![MQTT-3.3.1-13] out.FixedHeader.Retain = false // [MQTT-3.3.1-12] } diff --git a/server_test.go b/server_test.go index d1f2c465..8a9f4175 100644 --- a/server_test.go +++ b/server_test.go @@ -1023,7 +1023,7 @@ func TestServerProcessPacketPublishAndReceive(t *testing.T) { w2.Close() }() - require.Equal(t, packets.TPacketData[packets.Publish].Get(packets.TPublishBasic).RawBytes, <-receiverBuf) + require.Equal(t, packets.TPacketData[packets.Publish].Get(packets.TPublishRetain).RawBytes, <-receiverBuf) require.Equal(t, 1, len(s.Topics.Messages("a/b/c"))) } @@ -1537,7 +1537,7 @@ func TestPublishRetainedToClient(t *testing.T) { subbed := s.Topics.Subscribe(cl.ID, packets.Subscription{Filter: "a/b/c", Qos: 2}) require.True(t, subbed) - retained := s.Topics.RetainMessage(*packets.TPacketData[packets.Publish].Get(packets.TPublishRetain).Packet) + retained := s.Topics.RetainMessage(*packets.TPacketData[packets.Publish].Get(packets.TPublishRetainMqtt5).Packet) require.Equal(t, int64(1), retained) go func() { @@ -1548,7 +1548,7 @@ func TestPublishRetainedToClient(t *testing.T) { buf, err := io.ReadAll(r) require.NoError(t, err) - require.Equal(t, packets.TPacketData[packets.Publish].Get(packets.TPublishBasic).RawBytes, buf) + require.Equal(t, packets.TPacketData[packets.Publish].Get(packets.TPublishRetain).RawBytes, buf) } func TestPublishRetainedToClientIsShared(t *testing.T) { @@ -2064,7 +2064,7 @@ func TestServerProcessSubscribeWithRetain(t *testing.T) { require.NoError(t, err) require.Equal(t, append( packets.TPacketData[packets.Suback].Get(packets.TSuback).RawBytes, - packets.TPacketData[packets.Publish].Get(packets.TPublishBasic).RawBytes..., + packets.TPacketData[packets.Publish].Get(packets.TPublishRetain).RawBytes..., ), buf) } @@ -2452,7 +2452,7 @@ func TestServerSendLWTDelayed(t *testing.T) { recv <- buf }() - require.Equal(t, packets.TPacketData[packets.Publish].Get(packets.TPublishBasic).RawBytes, <-recv) + require.Equal(t, packets.TPacketData[packets.Publish].Get(packets.TPublishRetain).RawBytes, <-recv) } func TestServerReadStore(t *testing.T) {