From 3be6e8dcb6d1128c3a106985ba1b995d94e6ab2d Mon Sep 17 00:00:00 2001 From: Rob Findley Date: Tue, 1 Oct 2024 17:49:48 +0000 Subject: [PATCH] godev/devtools/cmd/copyuploads: download data from the public endpoint Download and split telemetry reports from the public merged bucket in the copyuploads tool, so that GCS permissions are not a barrier to local development. Change-Id: Ia68231db08556df377c07c1cf1d964a41f0599bd Reviewed-on: https://go-review.googlesource.com/c/telemetry/+/617176 Reviewed-by: Hyang-Ah Hana Kim LUCI-TryBot-Result: Go LUCI --- godev/devtools/cmd/copyuploads/copyuploads.go | 103 ++++++++++-------- 1 file changed, 59 insertions(+), 44 deletions(-) diff --git a/godev/devtools/cmd/copyuploads/copyuploads.go b/godev/devtools/cmd/copyuploads/copyuploads.go index bfc5652..f1e6335 100644 --- a/godev/devtools/cmd/copyuploads/copyuploads.go +++ b/godev/devtools/cmd/copyuploads/copyuploads.go @@ -5,35 +5,32 @@ // The copyuploads command copies uploads from GCS to the local filesystem // storage, for use with local development of the worker. // -// By default, this command copies the last 3 days of uploads from the -// dev-telemetry-uploaded bucket in GCS to the local filesystem bucket -// local-telemetry-uploaded, at which point this data will be available when -// running ./godev/cmd/worker with no arguments. -// -// This command requires read permission to the go-telemetry GCS buckets. -// TODO(rfindley): we could avoid the need for read permission by instead -// downloading the public merged reports, and reassembling the individual -// uploads. +// By default, this command copies the last 3 days of uploads from +// telemetry.go.dev to the local filesystem bucket local-telemetry-uploaded, at +// which point this data will be available when running ./godev/cmd/worker with +// no arguments. // // See --help for more details. package main import ( + "bytes" "context" - "errors" + "encoding/json" "flag" + "fmt" + "io" "log" + "net/http" "os" - "strings" + "path" "time" - "golang.org/x/sync/errgroup" "golang.org/x/telemetry/godev/internal/config" "golang.org/x/telemetry/godev/internal/storage" ) var ( - bucket = flag.String("bucket", "dev-telemetry-uploaded", "The bucket to copy from.") daysBack = flag.Int("days_back", 3, "The number of days back to copy") verbose = flag.Bool("v", false, "If set, enable verbose logging.") ) @@ -41,39 +38,25 @@ var ( func main() { flag.Parse() - if !strings.HasSuffix(*bucket, "-uploaded") { - log.Fatal("-bucket must end in -uploaded") - } - cfg := config.NewConfig() ctx := context.Background() - gcs, err := storage.NewGCSBucket(ctx, cfg.ProjectID, *bucket) - if err != nil { - log.Fatal(err) - } fs, err := storage.NewFSBucket(ctx, cfg.LocalStorage, "local-telemetry-uploaded") if err != nil { log.Fatal(err) } - // Copy files concurrently. - const concurrency = 5 - g, ctx := errgroup.WithContext(ctx) - g.SetLimit(concurrency) - start := time.Now() for dayOffset := range *daysBack { - date := start.AddDate(0, 0, -dayOffset) - it := gcs.Objects(ctx, date.Format(time.DateOnly)) - for { - name, err := it.Next() - if errors.Is(err, storage.ErrObjectIteratorDone) { - break - } - + date := start.AddDate(0, 0, -dayOffset-1) // today's merged reports may not yet be available + dateString := date.Format(time.DateOnly) + byFile, err := downloadData(dateString) + if err != nil { + log.Fatalf("Downloading data for %s: %v", dateString, err) + } + for name, content := range byFile { // Skip objects that already exist in local storage. - dest := fs.Object(name) + dest := fs.Object(path.Join(dateString, name)) if _, err := os.Stat(dest.(*storage.FSObject).Filename()); err == nil { if *verbose { log.Printf("Skipping existing object %s", name) @@ -83,17 +66,49 @@ func main() { if *verbose { log.Printf("Starting copying object %s", name) } - - g.Go(func() error { - if err != nil { - return err - } - return storage.Copy(ctx, dest, gcs.Object(name)) - }) + w, err := dest.NewWriter(ctx) + if err != nil { + log.Fatal(err) + } + if _, err := io.Copy(w, bytes.NewReader(content)); err != nil { + log.Fatal(err) + } } } +} - if err := g.Wait(); err != nil { - log.Fatal(err) +// downloadData downloads the merged telemetry data for the given date string +// (which must be in time.DateOnly format), and splits it back into individual +// uploaded files, keyed by their original name (.json). +func downloadData(dateString string) (map[string][]byte, error) { + url := fmt.Sprintf("https://storage.googleapis.com/prod-telemetry-merged/%s.json", dateString) + resp, err := http.Get(url) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + if resp.StatusCode != 200 { + return nil, fmt.Errorf("downloading %s failed with status %d", url, resp.StatusCode) + } + + data, err := io.ReadAll(resp.Body) + if err != nil { + return nil, err + } + + byFile := make(map[string][]byte) + for _, line := range bytes.Split(data, []byte("\n")) { + line = bytes.TrimSpace(line) + if len(line) == 0 { + continue // defensive: skip empty lines + } + var x struct{ X float64 } + if err := json.Unmarshal(line, &x); err != nil { + return nil, err + } + file := fmt.Sprintf("%g.json", x.X) + byFile[file] = append(line, '\n') // uploaded data is newline terminated } + return byFile, nil }