Skip to content

Commit

Permalink
Replace locking fieldmap with concurrent safe haxmap
Browse files Browse the repository at this point in the history
  • Loading branch information
Matej Spiller Muys committed Dec 23, 2024
1 parent 5185ff8 commit 22e752b
Show file tree
Hide file tree
Showing 7 changed files with 94 additions and 233 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
*.swp
*.swo
.idea
*.iml
vendor
_test/test
_test/echo_server
Expand Down
193 changes: 38 additions & 155 deletions field_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@ package quickfix

import (
"bytes"
"sort"
"sync"
"slices"
"time"

"github.com/alphadose/haxmap"
)

// field stores a slice of TagValues.
Expand All @@ -40,46 +41,33 @@ func writeField(f field, buffer *bytes.Buffer) {
}

// tagOrder true if tag i should occur before tag j.
type tagOrder func(i, j Tag) bool

type tagSort struct {
tags []Tag
compare tagOrder
}

func (t tagSort) Len() int { return len(t.tags) }
func (t tagSort) Swap(i, j int) { t.tags[i], t.tags[j] = t.tags[j], t.tags[i] }
func (t tagSort) Less(i, j int) bool { return t.compare(t.tags[i], t.tags[j]) }
type tagOrder func(i, j Tag) int

// FieldMap is a collection of fix fields that make up a fix message.
type FieldMap struct {
tagLookup map[Tag]field
tagSort
rwLock *sync.RWMutex
tagLookup *haxmap.Map[Tag, field]
compare tagOrder
}

// ascending tags.
func normalFieldOrder(i, j Tag) bool { return i < j }
func normalFieldOrder(i, j Tag) int { return int(i - j) }

func (m *FieldMap) init() {
m.initWithOrdering(normalFieldOrder)
}

func (m *FieldMap) initWithOrdering(ordering tagOrder) {
m.rwLock = &sync.RWMutex{}
m.tagLookup = make(map[Tag]field)
m.tagLookup = haxmap.New[Tag, field]()
m.compare = ordering
}

// Tags returns all of the Field Tags in this FieldMap.
func (m FieldMap) Tags() []Tag {
m.rwLock.RLock()
defer m.rwLock.RUnlock()

tags := make([]Tag, 0, len(m.tagLookup))
for t := range m.tagLookup {
tags = append(tags, t)
}
var tags []Tag
m.tagLookup.ForEach(func(tag Tag, _ field) bool {
tags = append(tags, tag)
return true
})

return tags
}
Expand All @@ -91,33 +79,13 @@ func (m FieldMap) Get(parser Field) MessageRejectError {

// Has returns true if the Tag is present in this FieldMap.
func (m FieldMap) Has(tag Tag) bool {
m.rwLock.RLock()
defer m.rwLock.RUnlock()

_, ok := m.tagLookup[tag]
_, ok := m.tagLookup.Get(tag)
return ok
}

// GetField parses of a field with Tag tag. Returned reject may indicate the field is not present, or the field value is invalid.
func (m FieldMap) GetField(tag Tag, parser FieldValueReader) MessageRejectError {
m.rwLock.RLock()
defer m.rwLock.RUnlock()

f, ok := m.tagLookup[tag]
if !ok {
return ConditionallyRequiredFieldMissing(tag)
}

if err := parser.Read(f[0].value); err != nil {
return IncorrectDataFormatForValue(tag)
}

return nil
}

// GetField parses of a field with Tag tag. Returned reject may indicate the field is not present, or the field value is invalid.
func (m FieldMap) getFieldNoLock(tag Tag, parser FieldValueReader) MessageRejectError {
f, ok := m.tagLookup[tag]
f, ok := m.tagLookup.Get(tag)
if !ok {
return ConditionallyRequiredFieldMissing(tag)
}
Expand All @@ -131,20 +99,7 @@ func (m FieldMap) getFieldNoLock(tag Tag, parser FieldValueReader) MessageReject

// GetBytes is a zero-copy GetField wrapper for []bytes fields.
func (m FieldMap) GetBytes(tag Tag) ([]byte, MessageRejectError) {
m.rwLock.RLock()
defer m.rwLock.RUnlock()

f, ok := m.tagLookup[tag]
if !ok {
return nil, ConditionallyRequiredFieldMissing(tag)
}

return f[0].value, nil
}

// getBytesNoLock is a lock free zero-copy GetField wrapper for []bytes fields.
func (m FieldMap) getBytesNoLock(tag Tag) ([]byte, MessageRejectError) {
f, ok := m.tagLookup[tag]
f, ok := m.tagLookup.Get(tag)
if !ok {
return nil, ConditionallyRequiredFieldMissing(tag)
}
Expand Down Expand Up @@ -176,26 +131,8 @@ func (m FieldMap) GetInt(tag Tag) (int, MessageRejectError) {
return int(val), err
}

// GetInt is a lock free GetField wrapper for int fields.
func (m FieldMap) getIntNoLock(tag Tag) (int, MessageRejectError) {
bytes, err := m.getBytesNoLock(tag)
if err != nil {
return 0, err
}

var val FIXInt
if val.Read(bytes) != nil {
err = IncorrectDataFormatForValue(tag)
}

return int(val), err
}

// GetTime is a GetField wrapper for utc timestamp fields.
func (m FieldMap) GetTime(tag Tag) (t time.Time, err MessageRejectError) {
m.rwLock.RLock()
defer m.rwLock.RUnlock()

bytes, err := m.GetBytes(tag)
if err != nil {
return
Expand All @@ -218,21 +155,9 @@ func (m FieldMap) GetString(tag Tag) (string, MessageRejectError) {
return string(val), nil
}

// GetString is a GetField wrapper for string fields.
func (m FieldMap) getStringNoLock(tag Tag) (string, MessageRejectError) {
var val FIXString
if err := m.getFieldNoLock(tag, &val); err != nil {
return "", err
}
return string(val), nil
}

// GetGroup is a Get function specific to Group Fields.
func (m FieldMap) GetGroup(parser FieldGroupReader) MessageRejectError {
m.rwLock.RLock()
defer m.rwLock.RUnlock()

f, ok := m.tagLookup[parser.Tag()]
f, ok := m.tagLookup.Get(parser.Tag())
if !ok {
return ConditionallyRequiredFieldMissing(parser.Tag())
}
Expand Down Expand Up @@ -277,67 +202,38 @@ func (m *FieldMap) SetString(tag Tag, value string) *FieldMap {

// Remove removes a tag from field map.
func (m *FieldMap) Remove(tag Tag) {
m.rwLock.Lock()
defer m.rwLock.Unlock()

delete(m.tagLookup, tag)
m.tagLookup.Del(tag)
}

// Clear purges all fields from field map.
func (m *FieldMap) Clear() {
m.rwLock.Lock()
defer m.rwLock.Unlock()

m.tags = m.tags[0:0]
for k := range m.tagLookup {
delete(m.tagLookup, k)
}
}

func (m *FieldMap) clearNoLock() {
m.tags = m.tags[0:0]
for k := range m.tagLookup {
delete(m.tagLookup, k)
}
m.tagLookup.Clear()
}

// CopyInto overwrites the given FieldMap with this one.
func (m *FieldMap) CopyInto(to *FieldMap) {
m.rwLock.RLock()
defer m.rwLock.RUnlock()

to.tagLookup = make(map[Tag]field)
for tag, f := range m.tagLookup {
to.tagLookup = haxmap.New[Tag, field]()
m.tagLookup.ForEach(func(tag Tag, f field) bool {
clone := make(field, 1)
clone[0] = f[0]
to.tagLookup[tag] = clone
}
to.tags = make([]Tag, len(m.tags))
copy(to.tags, m.tags)
to.tagLookup.Set(tag, clone)
return true
})
to.compare = m.compare
}

func (m *FieldMap) add(f field) {
t := fieldTag(f)
if _, ok := m.tagLookup[t]; !ok {
m.tags = append(m.tags, t)
}

m.tagLookup[t] = f
m.tagLookup.Set(fieldTag(f), f)
}

func (m *FieldMap) getOrCreate(tag Tag) field {
m.rwLock.Lock()
defer m.rwLock.Unlock()

if f, ok := m.tagLookup[tag]; ok {
if f, ok := m.tagLookup.Get(tag); ok {
f = f[:1]
return f
}

f := make(field, 1)
m.tagLookup[tag] = f
m.tags = append(m.tags, tag)
m.tagLookup.Set(tag, f)
return f
}

Expand All @@ -350,65 +246,52 @@ func (m *FieldMap) Set(field FieldWriter) *FieldMap {

// SetGroup is a setter specific to group fields.
func (m *FieldMap) SetGroup(field FieldGroupWriter) *FieldMap {
m.rwLock.Lock()
defer m.rwLock.Unlock()

_, ok := m.tagLookup[field.Tag()]
if !ok {
m.tags = append(m.tags, field.Tag())
}
m.tagLookup[field.Tag()] = field.Write()
m.tagLookup.Set(field.Tag(), field.Write())
return m
}

func (m *FieldMap) sortedTags() []Tag {
sort.Sort(m)
return m.tags
tags := m.Tags()
slices.SortFunc(tags, m.compare)
return tags
}

func (m FieldMap) write(buffer *bytes.Buffer) {
m.rwLock.Lock()
defer m.rwLock.Unlock()

for _, tag := range m.sortedTags() {
if f, ok := m.tagLookup[tag]; ok {
if f, ok := m.tagLookup.Get(tag); ok {
writeField(f, buffer)
}
}
}

func (m FieldMap) total() int {
m.rwLock.RLock()
defer m.rwLock.RUnlock()

total := 0
for _, fields := range m.tagLookup {
m.tagLookup.ForEach(func(_ Tag, fields field) bool {
for _, tv := range fields {
switch tv.tag {
case tagCheckSum: // Tag does not contribute to total.
default:
total += tv.total()
}
}
}
return true
})

return total
}

func (m FieldMap) length() int {
m.rwLock.RLock()
defer m.rwLock.RUnlock()

length := 0
for _, fields := range m.tagLookup {
m.tagLookup.ForEach(func(_ Tag, fields field) bool {
for _, tv := range fields {
switch tv.tag {
case tagBeginString, tagBodyLength, tagCheckSum: // Tags do not contribute to length.
default:
length += tv.length()
}
}
}
return true
})

return length
}
20 changes: 11 additions & 9 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,30 +3,32 @@ module github.com/quickfixgo/quickfix
go 1.21

require (
github.com/alphadose/haxmap v1.4.1
github.com/mattn/go-sqlite3 v1.14.22
github.com/pires/go-proxyproto v0.7.0
github.com/pkg/errors v0.9.1
github.com/shopspring/decimal v1.4.0
github.com/stretchr/testify v1.8.4
go.mongodb.org/mongo-driver v1.15.0
golang.org/x/net v0.24.0
github.com/stretchr/testify v1.10.0
go.mongodb.org/mongo-driver v1.17.1
golang.org/x/net v0.33.0
)

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/klauspost/compress v1.15.12 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/montanaflynn/stats v0.6.6 // indirect
github.com/montanaflynn/stats v0.7.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/stretchr/objx v0.5.0 // indirect
github.com/stretchr/objx v0.5.2 // indirect
github.com/xdg-go/pbkdf2 v1.0.0 // indirect
github.com/xdg-go/scram v1.1.2 // indirect
github.com/xdg-go/stringprep v1.0.4 // indirect
github.com/youmark/pkcs8 v0.0.0-20201027041543-1326539a0a0a // indirect
golang.org/x/crypto v0.22.0 // indirect
golang.org/x/sync v0.1.0 // indirect
golang.org/x/text v0.14.0 // indirect
github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78 // indirect
golang.org/x/crypto v0.31.0 // indirect
golang.org/x/exp v0.0.0-20221031165847-c99f073a8326 // indirect
golang.org/x/sync v0.10.0 // indirect
golang.org/x/text v0.21.0 // indirect
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
Loading

0 comments on commit 22e752b

Please sign in to comment.