godev/devtools/cmd/copyuploads: download data from the public endpoint
Download and split telemetry reports from the public merged bucket in
the copyuploads tool, so that GCS permissions are not a barrier to local
development.

Change-Id: Ia68231db08556df377c07c1cf1d964a41f0599bd
Reviewed-on: https://go-review.googlesource.com/c/telemetry/+/617176
Reviewed-by: Hyang-Ah Hana Kim <[email protected]>
LUCI-TryBot-Result: Go LUCI <[email protected]>
findleyr committed Oct 3, 2024
1 parent 338fe24 commit 3be6e8d
Showing 1 changed file with 59 additions and 44 deletions.
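
The approach described in the commit message can be illustrated with a small standalone sketch (not part of the commit): download one day's merged report from the public bucket and split it back into per-upload files keyed by each report's X value. It assumes only what the diff below relies on, namely that merged reports are served at https://storage.googleapis.com/prod-telemetry-merged/<date>.json as newline-delimited JSON objects carrying a float64 X field.

package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"io"
	"log"
	"net/http"
	"time"
)

func main() {
	// Use yesterday's date: today's merged report may not yet be available.
	date := time.Now().AddDate(0, 0, -1).Format(time.DateOnly)
	url := fmt.Sprintf("https://storage.googleapis.com/prod-telemetry-merged/%s.json", date)

	resp, err := http.Get(url)
	if err != nil {
		log.Fatal(err)
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		log.Fatalf("GET %s: %s", url, resp.Status)
	}
	data, err := io.ReadAll(resp.Body)
	if err != nil {
		log.Fatal(err)
	}

	// Each non-empty line of the merged file is one uploaded report; its X
	// field recovers the original upload's file name (<X>.json).
	for _, line := range bytes.Split(data, []byte("\n")) {
		line = bytes.TrimSpace(line)
		if len(line) == 0 {
			continue
		}
		var report struct{ X float64 }
		if err := json.Unmarshal(line, &report); err != nil {
			log.Fatal(err)
		}
		fmt.Printf("%s/%g.json: %d bytes\n", date, report.X, len(line))
	}
}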
godev/devtools/cmd/copyuploads/copyuploads.go (103 changes: 59 additions, 44 deletions)
@@ -5,75 +5,58 @@
 // The copyuploads command copies uploads from GCS to the local filesystem
 // storage, for use with local development of the worker.
 //
-// By default, this command copies the last 3 days of uploads from the
-// dev-telemetry-uploaded bucket in GCS to the local filesystem bucket
-// local-telemetry-uploaded, at which point this data will be available when
-// running ./godev/cmd/worker with no arguments.
-//
-// This command requires read permission to the go-telemetry GCS buckets.
-// TODO(rfindley): we could avoid the need for read permission by instead
-// downloading the public merged reports, and reassembling the individual
-// uploads.
+// By default, this command copies the last 3 days of uploads from
+// telemetry.go.dev to the local filesystem bucket local-telemetry-uploaded, at
+// which point this data will be available when running ./godev/cmd/worker with
+// no arguments.
 //
 // See --help for more details.
 package main
 
 import (
+	"bytes"
 	"context"
-	"errors"
+	"encoding/json"
 	"flag"
+	"fmt"
+	"io"
 	"log"
+	"net/http"
 	"os"
-	"strings"
+	"path"
 	"time"
 
-	"golang.org/x/sync/errgroup"
 	"golang.org/x/telemetry/godev/internal/config"
 	"golang.org/x/telemetry/godev/internal/storage"
 )
 
 var (
-	bucket   = flag.String("bucket", "dev-telemetry-uploaded", "The bucket to copy from.")
 	daysBack = flag.Int("days_back", 3, "The number of days back to copy")
 	verbose  = flag.Bool("v", false, "If set, enable verbose logging.")
 )
 
 func main() {
 	flag.Parse()
 
-	if !strings.HasSuffix(*bucket, "-uploaded") {
-		log.Fatal("-bucket must end in -uploaded")
-	}
-
 	cfg := config.NewConfig()
 	ctx := context.Background()
 
-	gcs, err := storage.NewGCSBucket(ctx, cfg.ProjectID, *bucket)
-	if err != nil {
-		log.Fatal(err)
-	}
 	fs, err := storage.NewFSBucket(ctx, cfg.LocalStorage, "local-telemetry-uploaded")
 	if err != nil {
 		log.Fatal(err)
 	}
 
-	// Copy files concurrently.
-	const concurrency = 5
-	g, ctx := errgroup.WithContext(ctx)
-	g.SetLimit(concurrency)
-
 	start := time.Now()
 	for dayOffset := range *daysBack {
-		date := start.AddDate(0, 0, -dayOffset)
-		it := gcs.Objects(ctx, date.Format(time.DateOnly))
-		for {
-			name, err := it.Next()
-			if errors.Is(err, storage.ErrObjectIteratorDone) {
-				break
-			}
-
+		date := start.AddDate(0, 0, -dayOffset-1) // today's merged reports may not yet be available
+		dateString := date.Format(time.DateOnly)
+		byFile, err := downloadData(dateString)
+		if err != nil {
+			log.Fatalf("Downloading data for %s: %v", dateString, err)
+		}
+		for name, content := range byFile {
 			// Skip objects that already exist in local storage.
-			dest := fs.Object(name)
+			dest := fs.Object(path.Join(dateString, name))
 			if _, err := os.Stat(dest.(*storage.FSObject).Filename()); err == nil {
 				if *verbose {
 					log.Printf("Skipping existing object %s", name)
@@ -83,17 +66,49 @@ func main()
 			if *verbose {
 				log.Printf("Starting copying object %s", name)
 			}
-
-			g.Go(func() error {
-				if err != nil {
-					return err
-				}
-				return storage.Copy(ctx, dest, gcs.Object(name))
-			})
+			w, err := dest.NewWriter(ctx)
+			if err != nil {
+				log.Fatal(err)
+			}
+			if _, err := io.Copy(w, bytes.NewReader(content)); err != nil {
+				log.Fatal(err)
+			}
 		}
 	}
-
-	if err := g.Wait(); err != nil {
-		log.Fatal(err)
-	}
 }
+
+// downloadData downloads the merged telemetry data for the given date string
+// (which must be in time.DateOnly format), and splits it back into individual
+// uploaded files, keyed by their original name (<X>.json).
+func downloadData(dateString string) (map[string][]byte, error) {
+	url := fmt.Sprintf("https://storage.googleapis.com/prod-telemetry-merged/%s.json", dateString)
+	resp, err := http.Get(url)
+	if err != nil {
+		return nil, err
+	}
+	defer resp.Body.Close()
+
+	if resp.StatusCode != 200 {
+		return nil, fmt.Errorf("downloading %s failed with status %d", url, resp.StatusCode)
+	}
+
+	data, err := io.ReadAll(resp.Body)
+	if err != nil {
+		return nil, err
+	}
+
+	byFile := make(map[string][]byte)
+	for _, line := range bytes.Split(data, []byte("\n")) {
+		line = bytes.TrimSpace(line)
+		if len(line) == 0 {
+			continue // defensive: skip empty lines
+		}
+		var x struct{ X float64 }
+		if err := json.Unmarshal(line, &x); err != nil {
+			return nil, err
+		}
+		file := fmt.Sprintf("%g.json", x.X)
+		byFile[file] = append(line, '\n') // uploaded data is newline terminated
+	}
+	return byFile, nil
+}
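
For context on where the split reports end up: the new code writes each upload into the local-telemetry-uploaded filesystem bucket under <date>/<X>.json, which ./godev/cmd/worker then picks up when run with no arguments. The sketch below is an illustrative check, not part of the commit: it walks such a directory and verifies that each file's X field matches its file name. The root path is a placeholder; the real location is derived from config.NewConfig().LocalStorage.

package main

import (
	"encoding/json"
	"fmt"
	"io/fs"
	"log"
	"os"
	"path/filepath"
	"strings"
)

func main() {
	// Placeholder root: substitute the directory that NewFSBucket created
	// under your LocalStorage setting.
	root := "local-telemetry-uploaded"

	err := filepath.WalkDir(root, func(p string, d fs.DirEntry, err error) error {
		if err != nil {
			return err
		}
		if d.IsDir() || !strings.HasSuffix(p, ".json") {
			return nil
		}
		data, err := os.ReadFile(p)
		if err != nil {
			return err
		}
		var report struct{ X float64 }
		if err := json.Unmarshal(data, &report); err != nil {
			return fmt.Errorf("%s: %v", p, err)
		}
		if want := fmt.Sprintf("%g.json", report.X); filepath.Base(p) != want {
			return fmt.Errorf("%s: X=%g does not match file name", p, report.X)
		}
		fmt.Printf("ok %s (%d bytes)\n", p, len(data))
		return nil
	})
	if err != nil {
		log.Fatal(err)
	}
}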
