Skip to content

Commit

Permalink
introduce ALTERNATE_RESPONSE_PORT TLV (#430)
Browse files Browse the repository at this point in the history
Summary:

Introduce a new TLV as a POC to support switching port on the GM side. This is an important step towards the asymmetry compensation.
We use a `offset` flag which is uint16 (65536 combinations) which ensures consistency:
```
offset 0 = noop
offset 1 = switch to the next available port
offset 2 = switch to the next next available port
...
offset 65535 = you get the idea
```

For example we have 2 buildings with 2 paths between - short and long (ex 1km vs 3+km)
When forward and return paths are the same (short + short)  or (long + long) everything is good.
Suddenly something changed on the network and path rehashing results in short + long paths (or long + short for completeness).
At this moment we see a large path delay change to from say `(5us + 5us)/ 2 = 5us` to `(5us + 15us) / 2 = 10us`.
This is a problem which only gonna get worse with other DC types where 1km vs 10km paths are possible.
What this change is going to do is:
If we suspect the path change - we can ask GM to start sending packets from different port until we recover the path symmetry. We can't influence the exact port of the GM, but we can basically ask to switch to a next one (in test plan it's visible as `34488`->`38509`). This will be a TLV with `offset = 1`
If this doesn't help we can try jumping to a "next next" port (which in test plan seen as `38509->47977`) - this is `offset = 2`
Because GM doesn't keep this state (hello simple PTP) we now have to submit this counter every time to preserve the "shift". If we set it to 0 (or don't submit the TLV) we will get back to original port `34488`.

Randomly changing count value results in:
```
08:07:02.490110 IP6 client.ptp-event > server.ptp-event: PTPv18
08:07:02.490357 IP6 server.34488 > client.ptp-event: PTPv18
08:07:02.490434 IP6 server.47064 > client.ptp-general: PTPv18

08:07:03.490066 IP6 client.ptp-event > server.ptp-event: PTPv18
08:07:03.490379 IP6 server.38509 > client.ptp-event: PTPv18
08:07:03.490392 IP6 server.63604 > client.ptp-general: PTPv18

08:07:04.490507 IP6 client.ptp-event > server.ptp-event: PTPv18
08:07:04.490755 IP6 server.47977 > client.ptp-event: PTPv18
08:07:04.490836 IP6 server.49796 > client.ptp-general: PTPv18

08:07:05.491305 IP6 client.ptp-event > server.ptp-event: PTPv18
08:07:05.491541 IP6 server.34987 > client.ptp-event: PTPv18
08:07:05.491562 IP6 server.35270 > client.ptp-general: PTPv18
```

One can see new port usages in server -> client communication

Reviewed By: abulimov

Differential Revision: D66656872
  • Loading branch information
leoleovich authored and facebook-github-bot committed Dec 5, 2024
1 parent 7e26de0 commit 54d3f76
Show file tree
Hide file tree
Showing 15 changed files with 172 additions and 61 deletions.
5 changes: 2 additions & 3 deletions fbclock/daemon/logging_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"bytes"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -70,7 +69,7 @@ func TestLogSample_CSVRecords(t *testing.T) {
// make sure we are in sync with header
require.Equal(t, len(header), len(got))

assert.Equal(t, want, got)
require.Equal(t, want, got)
}

func TestCSVLogger_Log(t *testing.T) {
Expand All @@ -91,5 +90,5 @@ func TestCSVLogger_Log(t *testing.T) {
1.1,1.2,1.3,1.4,1.5,1.6,1.7,1.8,1.9,2,2.1,2.2,2.3,25.1
`

assert.Equal(t, want, got)
require.Equal(t, want, got)
}
15 changes: 7 additions & 8 deletions fbclock/daemon/math_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package daemon
import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -53,7 +52,7 @@ func TestConvolve(t *testing.T) {
-1.67567686e+08,
}
require.Equal(t, len(want1), len(got1))
assert.InEpsilonSlice(t, want1, got1, 1e+10)
require.InEpsilonSlice(t, want1, got1, 1e+10)

coeffs2 := []float64{1, 3, -3, 1}
got2, err := convolve(input, coeffs2)
Expand All @@ -67,25 +66,25 @@ func TestConvolve(t *testing.T) {
-3.29784203e+08,
}
require.Equal(t, len(want2), len(got2))
assert.InEpsilonSlice(t, want2, got2, 1e+10)
require.InEpsilonSlice(t, want2, got2, 1e+10)
}

func TestMean(t *testing.T) {
input := []float64{3, 5, 8, 8}
want := 6.0
assert.Equal(t, want, mean(input))
require.Equal(t, want, mean(input))
input = []float64{1, 4, 0, 3, 8}
want = 3.2
assert.Equal(t, want, mean(input))
require.Equal(t, want, mean(input))
}

func TestVariance(t *testing.T) {
input := []float64{8, 8, 8, 8}
want := 0.0
assert.Equal(t, want, variance(input))
require.Equal(t, want, variance(input))
input = []float64{1, 4, 0, 3, 8}
want = 9.7
assert.Equal(t, want, variance(input))
require.Equal(t, want, variance(input))
}

func TestPrepareExpression(t *testing.T) {
Expand All @@ -103,7 +102,7 @@ func TestPrepareExpression(t *testing.T) {
want := 250.1909601786838
got, err := expr.Evaluate(parameters)
require.Nil(t, err)
assert.Equal(t, want, got)
require.Equal(t, want, got)
}

func TestPrepareExpressionWrongVari(t *testing.T) {
Expand Down
4 changes: 2 additions & 2 deletions ptp/linearizability/linearizability_ptp4l.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,8 @@ func reqDelay(clockID ptp.ClockIdentity, port uint16) *ptp.SyncDelayReq {
Header: ptp.Header{
SdoIDAndMsgType: ptp.NewSdoIDAndMsgType(ptp.MessageDelayReq, 0),
Version: ptp.Version,
SequenceID: 0, // will be populated on sending
MessageLength: uint16(binary.Size(ptp.SyncDelayReq{})),
SequenceID: 0, // will be populated on sending
MessageLength: uint16(binary.Size(ptp.Header{}) + binary.Size(ptp.SyncDelayReqBody{})), //#nosec G115
FlagField: ptp.FlagUnicast,
SourcePortIdentity: ptp.PortIdentity{
PortNumber: port,
Expand Down
27 changes: 18 additions & 9 deletions ptp/protocol/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,14 @@ var (
PortGeneral = 320
)

// TrailingBytes - PTP over UDPv6 requires adding extra two bytes that
// may be modified by the initiator or an intermediate PTP Instance to ensure that the UDP checksum
// remains uncompromised after any modification of PTP fields.
// We simply always add them - in worst case they add extra 2 unused bytes when used over UDPv4.
const TrailingBytes = 2

var twoZeros = []byte{0, 0}

// MgmtLogMessageInterval is the default LogInterval value used in Management packets
const MgmtLogMessageInterval LogInterval = 0x7f // as per Table 42 Values of logMessageInterval field

Expand Down Expand Up @@ -242,6 +250,7 @@ type SyncDelayReqBody struct {
type SyncDelayReq struct {
Header
SyncDelayReqBody
TLVs []TLV
}

// MarshalBinaryTo marshals bytes to SyncDelayReq
Expand All @@ -252,12 +261,14 @@ func (p *SyncDelayReq) MarshalBinaryTo(b []byte) (int, error) {
n := headerMarshalBinaryTo(&p.Header, b)
copy(b[n:], p.OriginTimestamp.Seconds[:]) //uint48
binary.BigEndian.PutUint32(b[n+6:], p.OriginTimestamp.Nanoseconds)
return n + 10, nil
pos := n + 10
tlvLen, err := writeTLVs(p.TLVs, b[pos:])
return pos + tlvLen, err
}

// MarshalBinary converts packet to []bytes
func (p *SyncDelayReq) MarshalBinary() ([]byte, error) {
buf := make([]byte, 44)
buf := make([]byte, 50)
n, err := p.MarshalBinaryTo(buf)
return buf[:n], err
}
Expand All @@ -273,7 +284,11 @@ func (p *SyncDelayReq) UnmarshalBinary(b []byte) error {
}
copy(p.OriginTimestamp.Seconds[:], b[headerSize:]) //uint48
p.OriginTimestamp.Nanoseconds = binary.BigEndian.Uint32(b[headerSize+6:])
return nil

pos := headerSize + 10
var err error
p.TLVs, err = readTLVs(p.TLVs, int(p.MessageLength)-pos, b[pos:])
return err
}

// FollowUpBody Table 45 Follow_Up message fields
Expand Down Expand Up @@ -426,13 +441,7 @@ func BytesTo(p BinaryMarshalerTo, buf []byte) (int, error) {
return n + 2, nil
}

var twoZeros = []byte{0, 0}

// Bytes converts any packet to []bytes
// PTP over UDPv6 requires adding extra two bytes that
// may be modified by the initiator or an intermediate PTP Instance to ensure that the UDP checksum
// remains uncompromised after any modification of PTP fields.
// We simply always add them - in worst case they add extra 2 unused bytes when used over UDPv4.
func Bytes(p Packet) ([]byte, error) {
// interface smuggling
if pp, ok := p.(encoding.BinaryMarshaler); ok {
Expand Down
14 changes: 14 additions & 0 deletions ptp/protocol/protocol_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -542,6 +542,20 @@ func FuzzDecodePacket(f *testing.F) {
return
}
}
case MessageSync, MessageDelayReq:
m := packet.(*SyncDelayReq)
// ignore Sync/DelayReq with GrantUnicastTransmissionTLV, TLVPathTrace or TLVAlternateTimeOffsetIndicator TLVs
for _, tlv := range m.TLVs {
if tlv.Type() == TLVGrantUnicastTransmission {
return
}
if tlv.Type() == TLVAlternateTimeOffsetIndicator {
return
}
if tlv.Type() == TLVPathTrace {
return
}
}
}
bb, err := Bytes(packet)
require.NoError(t, err)
Expand Down
42 changes: 40 additions & 2 deletions ptp/protocol/tlvs.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ func writeTLVs(tlvs []TLV, b []byte) (int, error) {
return pos, nil
}

// readTLVs reads TLVs from the bytes.
// tlvs is passed to save on allocations and it's user's task to ensure it's empty
func readTLVs(tlvs []TLV, maxLength int, b []byte) ([]TLV, error) {
pos := 0
var tlvType TLVType
Expand All @@ -109,15 +111,13 @@ func readTLVs(tlvs []TLV, maxLength int, b []byte) ([]TLV, error) {
}
tlvs = append(tlvs, tlv)
pos += tlvHeadSize + int(tlv.LengthField)

case TLVGrantUnicastTransmission:
tlv := &GrantUnicastTransmissionTLV{}
if err := tlv.UnmarshalBinary(b[pos:]); err != nil {
return tlvs, err
}
tlvs = append(tlvs, tlv)
pos += tlvHeadSize + int(tlv.LengthField)

case TLVRequestUnicastTransmission:
tlv := &RequestUnicastTransmissionTLV{}
if err := tlv.UnmarshalBinary(b[pos:]); err != nil {
Expand Down Expand Up @@ -146,6 +146,13 @@ func readTLVs(tlvs []TLV, maxLength int, b []byte) ([]TLV, error) {
}
tlvs = append(tlvs, tlv)
pos += tlvHeadSize + int(tlv.LengthField)
case TLVAlternateResponsePort:
tlv := &AlternateResponsePortTLV{}
if err := tlv.UnmarshalBinary(b[pos:]); err != nil {
return tlvs, err
}
tlvs = append(tlvs, tlv)
pos += tlvHeadSize + int(tlv.LengthField)
default:
return tlvs, fmt.Errorf("reading TLV %s (%d) is not yet implemented", tlvType, tlvType)
}
Expand Down Expand Up @@ -365,3 +372,34 @@ func (t *AlternateTimeOffsetIndicatorTLV) UnmarshalBinary(b []byte) error {
}
return nil
}

// AlternateResponsePortTLV is a CSPTP optional TLV to switch response source port of the server
// Offset flag indicates the number of the port steps, not the port number itself.
// Ex:
// 0 means no switch (use default port). For example 1234
// 1 means next port. For example 4567
// 2 means next next port. For example 6789
// etc
type AlternateResponsePortTLV struct {
TLVHead
Offset uint16
}

// MarshalBinaryTo marshals bytes to AlternateResponsePortTLV
func (a *AlternateResponsePortTLV) MarshalBinaryTo(b []byte) (int, error) {
tlvHeadMarshalBinaryTo(&a.TLVHead, b)
binary.BigEndian.PutUint16(b[tlvHeadSize:], a.Offset)
return tlvHeadSize + 2, nil
}

// UnmarshalBinary parses []byte and populates struct fields
func (a *AlternateResponsePortTLV) UnmarshalBinary(b []byte) error {
if err := unmarshalTLVHeader(&a.TLVHead, b); err != nil {
return err
}
if err := checkTLVLength(&a.TLVHead, len(b), 2, true); err != nil {
return err
}
a.Offset = binary.BigEndian.Uint16(b[tlvHeadSize:])
return nil
}
45 changes: 40 additions & 5 deletions ptp/protocol/tlvs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -83,12 +82,12 @@ func TestParseAnnounceWithPathTrace(t *testing.T) {
require.Equal(t, want, *packet)
b, err := Bytes(packet)
require.Nil(t, err)
assert.Equal(t, raw, b)
require.Equal(t, raw, b)

// test generic DecodePacket as well
pp, err := DecodePacket(b)
require.Nil(t, err)
assert.Equal(t, &want, pp)
require.Equal(t, &want, pp)
}

func TestParseAnnounceWithAlternateTimeOffsetIndicator(t *testing.T) {
Expand Down Expand Up @@ -142,10 +141,46 @@ func TestParseAnnounceWithAlternateTimeOffsetIndicator(t *testing.T) {
require.Equal(t, want, *packet)
b, err := Bytes(packet)
require.Nil(t, err)
assert.Equal(t, raw, b)
require.Equal(t, raw, b)

// test generic DecodePacket as well
pp, err := DecodePacket(raw)
require.Nil(t, err)
assert.Equal(t, &want, pp)
require.Equal(t, &want, pp)
}

func TestParseSyncDelayReqWithAlternateResponsePort(t *testing.T) {
raw := []byte{1, 18, 0, 50, 0, 0, 36, 0, 0, 0, 0, 0, 6, 32, 0, 2, 0, 0, 0, 0, 184, 206, 246, 255, 254, 68, 148, 144, 0, 1, 149, 17, 0, 127, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 32, 7, 0, 2, 16, 146, 0, 0}
packet := new(SyncDelayReq)
err := FromBytes(raw, packet)
require.Nil(t, err)

want := SyncDelayReq{
Header: Header{
SdoIDAndMsgType: NewSdoIDAndMsgType(MessageDelayReq, 0),
Version: Version,
SequenceID: 38161,
MessageLength: 50,
FlagField: FlagUnicast | FlagProfileSpecific1,
SourcePortIdentity: PortIdentity{
PortNumber: 1,
ClockIdentity: 13316852727524136080,
},
LogMessageInterval: 0x7f,
CorrectionField: 102760450,
},
TLVs: []TLV{&AlternateResponsePortTLV{
TLVHead: TLVHead{TLVType: TLVAlternateResponsePort, LengthField: uint16(2)},
Offset: uint16(4242),
}},
}
require.Equal(t, want, *packet)
b, err := Bytes(packet)
require.Nil(t, err)
require.Equal(t, raw, b)

// test generic DecodePacket as well
pp, err := DecodePacket(raw)
require.Nil(t, err)
require.Equal(t, &want, pp)
}
2 changes: 2 additions & 0 deletions ptp/protocol/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ const (
TLVAcknowledgeCancelUnicastTransmission TLVType = 0x0007
TLVPathTrace TLVType = 0x0008
TLVAlternateTimeOffsetIndicator TLVType = 0x0009
TLVAlternateResponsePort TLVType = 0x2007
// Remaining 52 tlvType TLVs not implemented
)

Expand All @@ -112,6 +113,7 @@ var TLVTypeToString = map[TLVType]string{
TLVAcknowledgeCancelUnicastTransmission: "ACKNOWLEDGE_CANCEL_UNICAST_TRANSMISSION",
TLVPathTrace: "PATH_TRACE",
TLVAlternateTimeOffsetIndicator: "ALTERNATE_TIME_OFFSET_INDICATOR",
TLVAlternateResponsePort: "ALTERNATE_RESPONSE_PORT",
}

func (t TLVType) String() string {
Expand Down
Loading

0 comments on commit 54d3f76

Please sign in to comment.