Skip to content

Commit

Permalink
Added sorted.TimeSeries (#6)
Browse files Browse the repository at this point in the history
  • Loading branch information
kelindar authored Aug 3, 2021
1 parent 97ffeba commit 16852db
Show file tree
Hide file tree
Showing 3 changed files with 187 additions and 0 deletions.
115 changes: 115 additions & 0 deletions sorted/timeseries.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
// Copyright (c) Roman Atachiants and contributors. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for details.

package sorted

import (
	bin "encoding/binary"
	"errors"
	"math"
	"math/bits"
	"reflect"
	"sort"

	"github.com/kelindar/binary"
)

// ------------------------------------------------------------------------------

// tszCodec is the stateless codec returned by (*TimeSeries).GetBinaryCodec. It
// writes timestamps as varint deltas and values as varints of XOR-ed, bit-reversed
// float32 bit patterns.
type tszCodec struct{}

// EncodeTo encodes a TimeSeries into the encoder. The output consists of the
// number of points (uvarint), the payload size in bytes (uvarint) and the payload
// itself, which holds every timestamp delta followed by every XOR-ed value, each
// written as an unsigned varint.
//
// If the series is not sorted by timestamp it is sorted first; the slices are
// shared with the value being encoded, so that sort is visible to the caller.
// Values are narrowed to float32 before encoding, so precision beyond float32
// does not survive a round trip.
//
// An error is returned when Time and Data have different lengths, since such a
// series cannot be sorted safely nor decoded back consistently.
func (tszCodec) EncodeTo(e *binary.Encoder, rv reflect.Value) (err error) {
	data := rv.Interface().(TimeSeries)
	if len(data.Time) != len(data.Data) {
		return errors.New("sorted: time series has mismatched time and data lengths")
	}

	if !sort.IsSorted(&data) {
		sort.Sort(&data)
	}

	buffer := make([]byte, 0, 4*len(data.Time))

	// Write the timestamps into the buffer, each as the difference from the
	// previous one (the first is relative to zero)
	prev := uint64(0)
	for _, curr := range data.Time {
		diff := curr - prev
		prev = curr
		buffer = appendDelta(buffer, diff)
	}

	// Write the values into the buffer. Each value is XOR-ed with the previous
	// one; the float32 bits are reversed first so that trailing zero bits of the
	// pattern end up in the high-order positions, which keeps the varint short.
	prev = uint64(0)
	for _, v := range data.Data {
		curr := uint64(bits.Reverse32(math.Float32bits(float32(v))))
		diff := curr ^ prev
		prev = curr
		buffer = appendDelta(buffer, diff)
	}

	// Write the number of points, the payload size and the payload
	e.WriteUvarint(uint64(len(data.Time)))
	e.WriteUvarint(uint64(len(buffer)))
	e.Write(buffer)
	return nil
}

// DecodeTo decodes a TimeSeries from the decoder and stores it into rv. It reads
// exactly what EncodeTo writes: the number of points (uvarint), the payload size
// in bytes (uvarint) and the payload holding the timestamp deltas followed by the
// XOR-ed values.
//
// An error is returned if the header cannot be read, if the declared number of
// points cannot fit into the payload, or if any varint inside the payload is
// truncated or overflows 64 bits.
func (tszCodec) DecodeTo(d *binary.Decoder, rv reflect.Value) error {

	// Read the number of points
	count, err := d.ReadUvarint()
	if err != nil {
		return err
	}

	// Read the payload size in bytes
	size, err := d.ReadUvarint()
	if err != nil {
		return err
	}

	// Each point needs at least one byte for its timestamp and one for its value,
	// so reject impossible counts before allocating anything based on them.
	if count > size/2 {
		return errors.New("sorted: time series count exceeds payload size")
	}

	// Read the payload
	buffer, err := d.Slice(int(size))
	if err != nil {
		return err
	}

	result := TimeSeries{
		Time: make([]uint64, count),
		Data: make([]float64, count),
	}

	// Current offset within the payload
	offset := 0

	// Read encoded timestamps, accumulating the deltas
	prev := uint64(0)
	for i := range result.Time {
		diff, n := bin.Uvarint(buffer[offset:])
		if n <= 0 { // 0: payload too short, <0: value overflows 64 bits
			return errors.New("sorted: malformed time series timestamp")
		}

		prev += diff
		result.Time[i] = prev
		offset += n
	}

	// Read encoded values, undoing the XOR chain and the bit reversal
	prev = uint64(0)
	for i := range result.Data {
		diff, n := bin.Uvarint(buffer[offset:])
		if n <= 0 {
			return errors.New("sorted: malformed time series value")
		}

		prev ^= diff
		result.Data[i] = float64(math.Float32frombits(bits.Reverse32(uint32(prev))))
		offset += n
	}

	rv.Set(reflect.ValueOf(result))
	return nil
}

// appendDelta writes delta at the end of buffer as an unsigned varint (seven
// payload bits per byte, least-significant group first, high bit set on every
// byte except the last) and returns the extended slice.
func appendDelta(buffer []byte, delta uint64) []byte {
	for ; delta > 0x7f; delta >>= 7 {
		low := byte(delta & 0x7f)
		buffer = append(buffer, 0x80|low)
	}

	last := byte(delta)
	return append(buffer, last)
}
33 changes: 33 additions & 0 deletions sorted/timeseries_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Copyright (c) Roman Atachiants and contributors. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for details.

package sorted

import (
"testing"

"github.com/kelindar/binary"
"github.com/stretchr/testify/assert"
)

// TestTimeSeries marshals a time series that was appended in descending order and
// checks that it decodes back into the same points in ascending timestamp order.
func TestTimeSeries(t *testing.T) {
	const count = 100

	// Marshal
	ts := makeTimeSeries(count)
	b, err := binary.Marshal(ts)
	assert.NoError(t, err)
	assert.Equal(t, 341, len(b)) // Consider compressing using snappy after

	// Unmarshal
	var out TimeSeries
	assert.NoError(t, binary.Unmarshal(b, &out))
	if !assert.Equal(t, count, len(out.Time)) || !assert.Equal(t, count, len(out.Data)) {
		return
	}

	// The values 0..99 are exactly representable as float32, so the round trip
	// must reproduce them, paired with their timestamps, without any loss.
	for i := 0; i < count; i++ {
		assert.Equal(t, uint64(1500000000+i), out.Time[i])
		assert.Equal(t, float64(i), out.Data[i])
	}
}

// makeTimeSeries builds a series of count points appended in descending timestamp
// order, where point i has the timestamp 1500000000+i and the value i.
func makeTimeSeries(count int) *TimeSeries {
	series := new(TimeSeries)
	for remaining := count; remaining > 0; remaining-- {
		i := remaining - 1
		series.Append(uint64(1500000000+i), float64(i))
	}
	return series
}
39 changes: 39 additions & 0 deletions sorted/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,3 +102,42 @@ type Timestamps []uint64
func (ts *Timestamps) GetBinaryCodec() binary.Codec {
// Hand back a zero-value timestampCodec; the receiver itself is not consulted.
return timestampCodec{}
}

// ------------------------------------------------------------------------------

// TimeSeries is a series of timestamped values which is compressed when it gets
// marshaled. The encoding follows the ideas of the Gorilla paper
// (https://www.vldb.org/pvldb/vol8/p1816-teller.pdf), except that the output is
// byte-aligned instead of bit-weaved. Running snappy over the marshaled output is
// worth considering, as it compresses significantly better than this encoding does
// on its own. Note that the codec in this package narrows values to float32.
//
// Time and Data are parallel slices: the point at index i is (Time[i], Data[i]).
type TimeSeries struct {
	Time []uint64  // Sorted timestamps compressed using delta-encoding
	Data []float64 // Corresponding float-64 values
}

// Append adds the point (time, value) at the end of the series.
func (ts *TimeSeries) Append(time uint64, value float64) {
	ts.Time, ts.Data = append(ts.Time, time), append(ts.Data, value)
}

// Len reports the number of timestamps held by the series.
func (ts *TimeSeries) Len() int {
	count := len(ts.Time)
	return count
}

// Less reports whether the timestamp at index i is earlier than the one at j.
func (ts *TimeSeries) Less(i, j int) bool {
	left, right := ts.Time[i], ts.Time[j]
	return left < right
}

// Swap exchanges the points at indices i and j, moving the timestamp and its
// value together.
func (ts *TimeSeries) Swap(i, j int) {
	timeI, timeJ := ts.Time[i], ts.Time[j]
	ts.Time[i], ts.Time[j] = timeJ, timeI

	dataI, dataJ := ts.Data[i], ts.Data[j]
	ts.Data[i], ts.Data[j] = dataJ, dataI
}

// GetBinaryCodec returns the codec used to marshal and unmarshal a TimeSeries,
// which is always a tszCodec.
func (ts *TimeSeries) GetBinaryCodec() binary.Codec {
	var codec tszCodec
	return codec
}

0 comments on commit 16852db

Please sign in to comment.