diff --git a/examples/embedded2x/README.md b/examples/embedded2x/README.md
new file mode 100644
index 0000000..84b53f8
--- /dev/null
+++ b/examples/embedded2x/README.md
@@ -0,0 +1,21 @@
+# Parquetgen example
+
+To generate the code needed to run this example:
+
+    cd cmd/parquetgen
+    go get ./...
+    go install
+    cd ../../examples/embedded2x
+    go generate
+
+Go generate calls (see the top of main.go for the go:generate command):
+
+    parquetgen -input main.go -type C -package main
+
+which produces a file called parquet.go. Now run:
+
+    go run .
+
+You should now have a file named "parquet" that encodes a C struct
+(built from embedded structs); the program also reads the values
+back and prints each record as JSON.
diff --git a/examples/embedded2x/main.go b/examples/embedded2x/main.go
new file mode 100644
index 0000000..b1407f0
--- /dev/null
+++ b/examples/embedded2x/main.go
@@ -0,0 +1,72 @@
+package main
+
+//go:generate parquetgen -input main.go -type C -package main
+
+import (
+	"encoding/json"
+	"log"
+	"os"
+)
+
+func main() {
+	f, err := os.Create("parquet")
+	if err != nil {
+		log.Fatal(err)
+	}
+
+	defer f.Close()
+
+	w, err := NewParquetWriter(f, MaxPageSize(2))
+	if err != nil {
+		log.Fatal(err)
+	}
+
+	w.Add(C{B: B{Name: "a", A: A{ID: 1}}})
+	w.Add(C{B: B{Name: "b", A: A{ID: 2}}})
+	w.Add(C{B: B{Name: "c", A: A{ID: 3}}})
+
+	if err := w.Write(); err != nil {
+		log.Fatal(err)
+	}
+
+	if err := w.Close(); err != nil {
+		log.Fatal(err)
+	}
+
+	f2, err := os.Open("parquet")
+	if err != nil {
+		log.Fatal(err)
+	}
+	defer f2.Close()
+
+	r, err := NewParquetReader(f2)
+	if err != nil {
+		log.Fatal(err)
+	}
+
+	enc := json.NewEncoder(os.Stdout)
+	for r.Next() {
+		var c C
+		r.Scan(&c)
+		enc.Encode(c)
+	}
+
+	if err := r.Error(); err != nil {
+		log.Fatal(err)
+	}
+}
+
+// A is split out only to show how embedded structs
+// are handled.
+type A struct {
+	ID int32 `parquet:"id"`
+}
+
+type B struct {
+	A
+	Name string `parquet:"name"`
+}
+
+type C struct {
+	B
+}
diff --git a/examples/embedded2x/parquet b/examples/embedded2x/parquet
new file mode 100644
index 0000000..d4e9840
Binary files /dev/null and b/examples/embedded2x/parquet differ
diff --git a/examples/embedded2x/parquet.go b/examples/embedded2x/parquet.go
new file mode 100644
index 0000000..a214f72
--- /dev/null
+++ b/examples/embedded2x/parquet.go
@@ -0,0 +1,634 @@
+package main
+
+// This code is generated by github.com/parsyl/parquet.
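+//
+// It defines a ParquetWriter and ParquetReader for the C type. The
+// embedded A and B structs are flattened into two required columns,
+// "id" (int32) and "name" (string), matching the parquet struct tags.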
+
+import (
+	"bytes"
+	"encoding/binary"
+	"fmt"
+	"io"
+	"strings"
+
+	"github.com/parsyl/parquet"
+
+	"math"
+	"sort"
+)
+
+type compression int
+
+const (
+	compressionUncompressed compression = 0
+	compressionSnappy       compression = 1
+	compressionUnknown      compression = -1
+)
+
+// ParquetWriter represents a row group
+type ParquetWriter struct {
+	fields []Field
+
+	len int
+
+	// child points to the next page
+	child *ParquetWriter
+
+	// max is the number of Record items that can get written before
+	// a new set of column chunks is written
+	max int
+
+	meta        *parquet.Metadata
+	w           io.Writer
+	compression compression
+}
+
+func Fields(compression compression) []Field {
+	return []Field{
+		NewInt32Field(readID, writeID, []string{"id"}, fieldCompression(compression)),
+		NewStringField(readName, writeName, []string{"name"}, fieldCompression(compression)),
+	}
+}
+
+func readID(x C) int32 {
+	return x.ID
+}
+
+func writeID(x *C, vals []int32) {
+	x.ID = vals[0]
+}
+
+func readName(x C) string {
+	return x.Name
+}
+
+func writeName(x *C, vals []string) {
+	x.Name = vals[0]
+}
+
+func fieldCompression(c compression) func(*parquet.RequiredField) {
+	switch c {
+	case compressionUncompressed:
+		return parquet.RequiredFieldUncompressed
+	case compressionSnappy:
+		return parquet.RequiredFieldSnappy
+	default:
+		return parquet.RequiredFieldUncompressed
+	}
+}
+
+func optionalFieldCompression(c compression) func(*parquet.OptionalField) {
+	switch c {
+	case compressionUncompressed:
+		return parquet.OptionalFieldUncompressed
+	case compressionSnappy:
+		return parquet.OptionalFieldSnappy
+	default:
+		return parquet.OptionalFieldUncompressed
+	}
+}
+
+func NewParquetWriter(w io.Writer, opts ...func(*ParquetWriter) error) (*ParquetWriter, error) {
+	return newParquetWriter(w, append(opts, begin)...)
+}
+
+func newParquetWriter(w io.Writer, opts ...func(*ParquetWriter) error) (*ParquetWriter, error) {
+	p := &ParquetWriter{
+		max:         1000,
+		w:           w,
+		compression: compressionSnappy,
+	}
+
+	for _, opt := range opts {
+		if err := opt(p); err != nil {
+			return nil, err
+		}
+	}
+
+	p.fields = Fields(p.compression)
+	if p.meta == nil {
+		ff := Fields(p.compression)
+		schema := make([]parquet.Field, len(ff))
+		for i, f := range ff {
+			schema[i] = f.Schema()
+		}
+		p.meta = parquet.New(schema...)
+	}
+
+	return p, nil
+}
+
+// MaxPageSize sets the maximum number of rows in each page of a row group.
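+// Once max rows have been added, Add (below) chains a child writer so
+// that subsequent rows land on a new page; main.go uses MaxPageSize(2),
+// so its three rows span two pages.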
+func MaxPageSize(m int) func(*ParquetWriter) error {
+	return func(p *ParquetWriter) error {
+		p.max = m
+		return nil
+	}
+}
+
+func begin(p *ParquetWriter) error {
+	_, err := p.w.Write([]byte("PAR1"))
+	return err
+}
+
+func withMeta(m *parquet.Metadata) func(*ParquetWriter) error {
+	return func(p *ParquetWriter) error {
+		p.meta = m
+		return nil
+	}
+}
+
+func Uncompressed(p *ParquetWriter) error {
+	p.compression = compressionUncompressed
+	return nil
+}
+
+func Snappy(p *ParquetWriter) error {
+	p.compression = compressionSnappy
+	return nil
+}
+
+func withCompression(c compression) func(*ParquetWriter) error {
+	return func(p *ParquetWriter) error {
+		p.compression = c
+		return nil
+	}
+}
+
+func (p *ParquetWriter) Write() error {
+	for i, f := range p.fields {
+		if err := f.Write(p.w, p.meta); err != nil {
+			return err
+		}
+
+		for child := p.child; child != nil; child = child.child {
+			if err := child.fields[i].Write(p.w, p.meta); err != nil {
+				return err
+			}
+		}
+	}
+
+	p.fields = Fields(p.compression)
+	p.child = nil
+	p.len = 0
+
+	schema := make([]parquet.Field, len(p.fields))
+	for i, f := range p.fields {
+		schema[i] = f.Schema()
+	}
+	p.meta.StartRowGroup(schema...)
+	return nil
+}
+
+func (p *ParquetWriter) Close() error {
+	if err := p.meta.Footer(p.w); err != nil {
+		return err
+	}
+
+	_, err := p.w.Write([]byte("PAR1"))
+	return err
+}
+
+func (p *ParquetWriter) Add(rec C) {
+	if p.len == p.max {
+		if p.child == nil {
+			// an error can't happen here
+			p.child, _ = newParquetWriter(p.w, MaxPageSize(p.max), withMeta(p.meta), withCompression(p.compression))
+		}
+
+		p.child.Add(rec)
+		return
+	}
+
+	p.meta.NextDoc()
+	for _, f := range p.fields {
+		f.Add(rec)
+	}
+
+	p.len++
+}
+
+type Field interface {
+	Add(r C)
+	Write(w io.Writer, meta *parquet.Metadata) error
+	Schema() parquet.Field
+	Scan(r *C)
+	Read(r io.ReadSeeker, pg parquet.Page) error
+	Name() string
+	Key() string
+	Levels() ([]uint8, []uint8)
+}
+
+func getFields(ff []Field) map[string]Field {
+	m := make(map[string]Field, len(ff))
+	for _, f := range ff {
+		m[f.Key()] = f
+	}
+	return m
+}
+
+func NewParquetReader(r io.ReadSeeker, opts ...func(*ParquetReader)) (*ParquetReader, error) {
+	ff := Fields(compressionUnknown)
+	pr := &ParquetReader{
+		r: r,
+	}
+
+	for _, opt := range opts {
+		opt(pr)
+	}
+
+	schema := make([]parquet.Field, len(ff))
+	for i, f := range ff {
+		pr.fieldNames = append(pr.fieldNames, f.Name())
+		schema[i] = f.Schema()
+	}
+
+	meta := parquet.New(schema...)
+	if err := meta.ReadFooter(r); err != nil {
+		return nil, err
+	}
+	pr.rows = meta.Rows()
+	var err error
+	pr.pages, err = meta.Pages()
+	if err != nil {
+		return nil, err
+	}
+
+	pr.rowGroups = meta.RowGroups()
+	_, err = r.Seek(4, io.SeekStart)
+	if err != nil {
+		return nil, err
+	}
+	pr.meta = meta
+
+	return pr, pr.readRowGroup()
+}
+
+func readerIndex(i int) func(*ParquetReader) {
+	return func(p *ParquetReader) {
+		p.index = i
+	}
+}
+
+// ParquetReader reads one page from a row group.
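+// The expected loop is: call Next until it returns false, Scan each
+// row into a *C, then check Error, as in main.go above.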
+type ParquetReader struct {
+	fields         map[string]Field
+	fieldNames     []string
+	index          int
+	cursor         int64
+	rows           int64
+	rowGroupCursor int64
+	rowGroupCount  int64
+	pages          map[string][]parquet.Page
+	meta           *parquet.Metadata
+	err            error
+
+	r         io.ReadSeeker
+	rowGroups []parquet.RowGroup
+}
+
+type Levels struct {
+	Name string
+	Defs []uint8
+	Reps []uint8
+}
+
+func (p *ParquetReader) Levels() []Levels {
+	var out []Levels
+	for _, name := range p.fieldNames {
+		f := p.fields[name]
+		d, r := f.Levels()
+		out = append(out, Levels{Name: f.Name(), Defs: d, Reps: r})
+	}
+	return out
+}
+
+func (p *ParquetReader) Error() error {
+	return p.err
+}
+
+func (p *ParquetReader) readRowGroup() error {
+	p.rowGroupCursor = 0
+
+	if len(p.rowGroups) == 0 {
+		p.rowGroupCount = 0
+		return nil
+	}
+
+	rg := p.rowGroups[0]
+	p.fields = getFields(Fields(compressionUnknown))
+	p.rowGroupCount = rg.Rows
+	p.rowGroupCursor = 0
+	for _, col := range rg.Columns() {
+		name := strings.ToLower(strings.Join(col.MetaData.PathInSchema, "."))
+		f, ok := p.fields[name]
+		if !ok {
+			return fmt.Errorf("unknown field: %s", name)
+		}
+		pages := p.pages[name]
+		if len(pages) <= p.index {
+			break
+		}
+
+		pg := pages[0]
+		if err := f.Read(p.r, pg); err != nil {
+			return fmt.Errorf("unable to read field %s, err: %s", f.Name(), err)
+		}
+		p.pages[name] = p.pages[name][1:]
+	}
+	p.rowGroups = p.rowGroups[1:]
+	return nil
+}
+
+func (p *ParquetReader) Rows() int64 {
+	return p.rows
+}
+
+func (p *ParquetReader) Next() bool {
+	if p.err == nil && p.cursor >= p.rows {
+		return false
+	}
+	if p.rowGroupCursor >= p.rowGroupCount {
+		p.err = p.readRowGroup()
+		if p.err != nil {
+			return false
+		}
+	}
+
+	p.cursor++
+	p.rowGroupCursor++
+	return true
+}
+
+func (p *ParquetReader) Scan(x *C) {
+	if p.err != nil {
+		return
+	}
+
+	for _, name := range p.fieldNames {
+		f := p.fields[name]
+		f.Scan(x)
+	}
+}
+
+type Int32Field struct {
+	vals []int32
+	parquet.RequiredField
+	read  func(r C) int32
+	write func(r *C, vals []int32)
+	stats *int32stats
+}
+
+func NewInt32Field(read func(r C) int32, write func(r *C, vals []int32), path []string, opts ...func(*parquet.RequiredField)) *Int32Field {
+	return &Int32Field{
+		read:          read,
+		write:         write,
+		RequiredField: parquet.NewRequiredField(path, opts...),
+		stats:         newInt32stats(),
+	}
+}
+
+func (f *Int32Field) Schema() parquet.Field {
+	return parquet.Field{Name: f.Name(), Path: f.Path(), Type: parquet.Int32Type, RepetitionType: parquet.RepetitionRequired, Types: []int{0}}
+}
+
+func (f *Int32Field) Read(r io.ReadSeeker, pg parquet.Page) error {
+	rr, _, err := f.DoRead(r, pg)
+	if err != nil {
+		return err
+	}
+
+	v := make([]int32, int(pg.N))
+	err = binary.Read(rr, binary.LittleEndian, &v)
+	f.vals = append(f.vals, v...)
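+	// A page holds exactly pg.N values; binary.Read returns
+	// io.ErrUnexpectedEOF if the page is shorter than that.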
+	return err
+}
+
+func (f *Int32Field) Write(w io.Writer, meta *parquet.Metadata) error {
+	var buf bytes.Buffer
+	for _, v := range f.vals {
+		if err := binary.Write(&buf, binary.LittleEndian, v); err != nil {
+			return err
+		}
+	}
+	return f.DoWrite(w, meta, buf.Bytes(), len(f.vals), f.stats)
+}
+
+func (f *Int32Field) Scan(r *C) {
+	if len(f.vals) == 0 {
+		return
+	}
+
+	f.write(r, f.vals)
+	f.vals = f.vals[1:]
+}
+
+func (f *Int32Field) Add(r C) {
+	v := f.read(r)
+	f.stats.add(v)
+	f.vals = append(f.vals, v)
+}
+
+func (f *Int32Field) Levels() ([]uint8, []uint8) {
+	return nil, nil
+}
+
+type StringField struct {
+	parquet.RequiredField
+	vals  []string
+	read  func(r C) string
+	write func(r *C, vals []string)
+	stats *stringStats
+}
+
+func NewStringField(read func(r C) string, write func(r *C, vals []string), path []string, opts ...func(*parquet.RequiredField)) *StringField {
+	return &StringField{
+		read:          read,
+		write:         write,
+		RequiredField: parquet.NewRequiredField(path, opts...),
+		stats:         newStringStats(),
+	}
+}
+
+func (f *StringField) Schema() parquet.Field {
+	return parquet.Field{Name: f.Name(), Path: f.Path(), Type: parquet.StringType, RepetitionType: parquet.RepetitionRequired, Types: []int{0}}
+}
+
+func (f *StringField) Write(w io.Writer, meta *parquet.Metadata) error {
+	buf := bytes.Buffer{}
+
+	for _, s := range f.vals {
+		if err := binary.Write(&buf, binary.LittleEndian, int32(len(s))); err != nil {
+			return err
+		}
+		buf.Write([]byte(s))
+	}
+
+	return f.DoWrite(w, meta, buf.Bytes(), len(f.vals), f.stats)
+}
+
+func (f *StringField) Read(r io.ReadSeeker, pg parquet.Page) error {
+	rr, _, err := f.DoRead(r, pg)
+	if err != nil {
+		return err
+	}
+
+	for j := 0; j < pg.N; j++ {
+		var x int32
+		if err := binary.Read(rr, binary.LittleEndian, &x); err != nil {
+			return err
+		}
+		s := make([]byte, x)
+		// io.ReadFull guarantees the whole string is read (a bare
+		// Read may return fewer than len(s) bytes).
+		if _, err := io.ReadFull(rr, s); err != nil {
+			return err
+		}
+
+		f.vals = append(f.vals, string(s))
+	}
+	return nil
+}
+
+func (f *StringField) Scan(r *C) {
+	if len(f.vals) == 0 {
+		return
+	}
+
+	f.write(r, f.vals)
+	f.vals = f.vals[1:]
+}
+
+func (f *StringField) Add(r C) {
+	v := f.read(r)
+	f.stats.add(v)
+	f.vals = append(f.vals, v)
+}
+
+func (f *StringField) Levels() ([]uint8, []uint8) {
+	return nil, nil
+}
+
+type int32stats struct {
+	min int32
+	max int32
+}
+
+func newInt32stats() *int32stats {
+	return &int32stats{
+		min: int32(math.MaxInt32),
+	}
+}
+
+func (i *int32stats) add(val int32) {
+	if val < i.min {
+		i.min = val
+	}
+	if val > i.max {
+		i.max = val
+	}
+}
+
+func (f *int32stats) bytes(val int32) []byte {
+	var buf bytes.Buffer
+	binary.Write(&buf, binary.LittleEndian, val)
+	return buf.Bytes()
+}
+
+func (f *int32stats) NullCount() *int64 {
+	return nil
+}
+
+func (f *int32stats) DistinctCount() *int64 {
+	return nil
+}
+
+func (f *int32stats) Min() []byte {
+	return f.bytes(f.min)
+}
+
+func (f *int32stats) Max() []byte {
+	return f.bytes(f.max)
+}
+
+type stringStats struct {
+	vals []string
+	min  []byte
+	max  []byte
+}
+
+func newStringStats() *stringStats {
+	return &stringStats{}
+}
+
+func (s *stringStats) add(val string) {
+	s.vals = append(s.vals, val)
+}
+
+func (s *stringStats) NullCount() *int64 {
+	return nil
+}
+
+func (s *stringStats) DistinctCount() *int64 {
+	return nil
+}
+
+func (s *stringStats) Min() []byte {
+	if s.min == nil {
+		s.minMax()
+	}
+	return s.min
+}
+
+func (s *stringStats) Max() []byte {
+	if s.max == nil {
+		s.minMax()
+	}
+	return s.max
+}
+
+func (s *stringStats) minMax() {
+	if len(s.vals) == 0 {
+		return
+	}
+
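+	// Sort a copy so computing min/max does not reorder the
+	// values still pending a write.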
+	tmp := make([]string, len(s.vals))
+	copy(tmp, s.vals)
+	sort.Strings(tmp)
+	s.min = []byte(tmp[0])
+	s.max = []byte(tmp[len(tmp)-1])
+}
+
+func pint32(i int32) *int32       { return &i }
+func puint32(i uint32) *uint32    { return &i }
+func pint64(i int64) *int64       { return &i }
+func puint64(i uint64) *uint64    { return &i }
+func pbool(b bool) *bool          { return &b }
+func pstring(s string) *string    { return &s }
+func pfloat32(f float32) *float32 { return &f }
+func pfloat64(f float64) *float64 { return &f }
+
+// keeps track of the indices of repeated fields
+// that have already been handled by a previous field
+type indices []int
+
+func (i indices) rep(rep uint8) {
+	if rep > 0 {
+		r := int(rep) - 1
+		i[r] = i[r] + 1
+		for j := int(rep); j < len(i); j++ {
+			i[j] = 0
+		}
+	}
+}
+
+func maxDef(types []int) uint8 {
+	var out uint8
+	for _, typ := range types {
+		if typ > 0 {
+			out++
+		}
+	}
+	return out
+}
diff --git a/examples/embedded2x/people.parquet b/examples/embedded2x/people.parquet
new file mode 100644
index 0000000..bcd2ced
Binary files /dev/null and b/examples/embedded2x/people.parquet differ