feat(storage/transfermanager): prototype #10045

Merged: 15 commits, Jun 5, 2024
mvp, just missing some tests
BrennaEpp committed Apr 25, 2024
commit a23db2e35a4ff1ce79173099c0f6eeb0098c224f
storage/transfermanager/downloader.go: 276 changes (185 additions, 91 deletions)
@@ -1,94 +1,199 @@
// Copyright 2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package transfermanager

import (
"context"
"errors"
"io"
"sync"
"time"

"cloud.google.com/go/storage"
"google.golang.org/api/iterator"
)

// Downloader manages parallel download operations from a Cloud Storage bucket.
type Downloader struct {
client *storage.Client
config *transferManagerConfig
work chan *DownloadObjectInput // Piece of work to be executed.
[Review comment, Contributor]: Presumably this should be send-only and output should be receive?

[Reply, BrennaEpp]: We are sending and receiving from both channels in different places in the downloader. Unidirectional channels could be used in subcomponents or if we were providing the channel to the user, but I don't see how we could implement this with unidirectional channels - if we only received from output, who would send us the output (and vice versa for work)?

output chan *DownloadOutput // Channel for completed downloads; used to feed results iterator.
workers *sync.WaitGroup // Keeps track of the workers that are currently running.
}
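
To illustrate the trade-off discussed in the thread above: both ends of work and output are used inside the Downloader, but a direction-restricted view can still be handed out at an API boundary, because Go converts a bidirectional channel to a unidirectional one implicitly. A minimal self-contained sketch (the manager/result names are illustrative, not part of this PR):

package main

import "fmt"

type result struct{ name string }

type manager struct {
	work   chan string  // the manager both sends (enqueue) and receives (worker loop)
	output chan *result // likewise used in both directions internally
}

// resultsView hands out output as receive-only; the conversion
// chan *result -> <-chan *result is implicit and cannot be reversed by the caller.
func (m *manager) resultsView() <-chan *result {
	return m.output
}

func main() {
	m := &manager{work: make(chan string, 1), output: make(chan *result, 1)}
	m.work <- "obj"                     // send side of work
	m.output <- &result{name: <-m.work} // receive work, send output
	fmt.Println((<-m.resultsView()).name)
}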

// DownloadObject queues the download of a single object. If it's larger than
// the specified part size, the download will automatically be broken up into
// multiple range reads. This will initiate the download but is non-blocking;
// call Downloader.Results to process the result.
func (d *Downloader) DownloadObject(ctx context.Context, input *DownloadObjectInput) {
input.ctx = ctx
d.work <- input
}

// DownloadDirectory downloads a set of objects to a local path. Downloader
// will create the needed directory structure locally as the operations progress.
// This will initiate the download but is non-blocking; wait on Downloader.Results
// to process the result. Results will be split into individual objects.
// NOTE: Do not use; DownloadDirectory is not implemented.
func (d *Downloader) DownloadDirectory(ctx context.Context, input *DownloadDirectoryInput) {
d.output <- &DownloadOutput{Bucket: input.Bucket, Err: errors.New("DownloadDirectory is not implemented")}
// This does obj listing/os.File calls and then converts to object inputs.
}

// WaitAndClose waits for all outstanding downloads to complete. The Downloader
// must not be used to download more objects or directories after this has been called.
func (d *Downloader) WaitAndClose() error {
close(d.work)
d.workers.Wait()
close(d.output)
return nil
}

// Results returns the iterator for download outputs.
func (d *Downloader) Results() *DownloadOutputIterator {
return &DownloadOutputIterator{
output: d.output,
}
}

// downloadWorker continuously processes downloads until the work channel is closed.
func (d *Downloader) downloadWorker() {
for {
input, ok := <-d.work
if !ok {
break // no more work; exit
}

// TODO: break down the input into smaller pieces if necessary; maybe as follows:
// Only request partSize data to begin with. If no error and we haven't finished
// reading the object, enqueue the remaining pieces of work and mark in the
// out var the amount of shards to wait for.
d.output <- input.downloadShard(d.client, d.config.perOperationTimeout)
}
d.workers.Done()
}
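
The TODO above sketches splitting a large object into part-sized range reads. One hedged shape for that computation, assuming the object's total size is known from a first read or a metadata call (splitIntoShards is a hypothetical helper, not in this PR):

// splitIntoShards is a hypothetical helper: given an object's total size and
// the configured part size, return the range each shard-sized piece of work
// should request.
func splitIntoShards(objectSize, partSize int64) []DownloadRange {
	var shards []DownloadRange
	for off := int64(0); off < objectSize; off += partSize {
		length := partSize
		if off+length > objectSize {
			length = objectSize - off // the final shard may be shorter
		}
		shards = append(shards, DownloadRange{Offset: off, Length: length})
	}
	return shards
}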

// NewDownloader creates a new Downloader to add operations to.
// Choice of transport, etc. is configured on the client that's passed in.
func NewDownloader(c *storage.Client, opts ...TransferManagerOption) (*Downloader, error) {

const (
chanBufferSize = 1000 // how big is it reasonable to make this?
// We should probably expose this as max concurrent ops, because programs can deadlock if calling d.WaitAndClose before processing it.Next.
)

d := &Downloader{
client: c,
config: initTransferManagerConfig(opts...),
output: make(chan *DownloadOutput, chanBufferSize),
work: make(chan *DownloadObjectInput, chanBufferSize),
workers: &sync.WaitGroup{},
}

// Start workers in background.
for i := 0; i < d.config.numWorkers; i++ {
[Review comment, Contributor]: Presumably we could optimize this by spinning up workers as needed when there are objects enqueued? Doesn't have to be in this PR though.

[Reply, BrennaEpp]: Sure, though I'm not sure how much that would optimize this by... I guess it depends on the num of workers.

[Review comment, Contributor]: Yeah, something we can test out later.
d.workers.Add(1) // Add before starting the goroutine so WaitAndClose cannot observe a zero count early.
go d.downloadWorker()
}

return d, nil
}
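
Given the chanBufferSize caveat above, a caller can avoid blocking workers on a full output channel by draining results concurrently with WaitAndClose. A hedged caller-side sketch using only the API in this file (drainThenWait is illustrative, not part of the PR):

// drainThenWait is a hypothetical caller-side pattern: consume the results
// iterator in a goroutine so workers never block sending to output, then
// wait for all downloads to finish.
func drainThenWait(d *Downloader) ([]*DownloadOutput, error) {
	it := d.Results()
	var outs []*DownloadOutput
	done := make(chan struct{})
	go func() {
		defer close(done)
		for {
			out, err := it.Next()
			if err != nil { // iterator.Done once WaitAndClose has run
				return
			}
			outs = append(outs, out)
		}
	}()
	err := d.WaitAndClose() // closes work and output, ending the iterator
	<-done                  // outs is safe to read only after the drain ends
	return outs, err
}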

// DownloadRange specifies the object range.
type DownloadRange struct {
// Offset is the starting offset (inclusive) from which the object is read.
// If offset is negative, the object is read abs(offset) bytes from the end,
// and length must also be negative to indicate all remaining bytes will be read.
Offset int64
// Length is the number of bytes to read.
// If length is negative or larger than the object size, the object is read
// until the end.
Length int64
}
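
For example, under the semantics above, a negative offset paired with a negative length reads a suffix of the object (the values here are illustrative):

// Request only the last 100 bytes of the object.
r := &DownloadRange{
	Offset: -100, // start 100 bytes from the end
	Length: -1,   // and read all remaining bytes
}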

// DownloadObjectInput is the input for a single object to download.
type DownloadObjectInput struct {
// Required fields
Bucket string
Object string
Destination io.WriterAt

// Optional fields
Generation *int64
Conditions *storage.Conditions
EncryptionKey []byte
Range *DownloadRange // if specified, reads only a range

ctx context.Context
}

// downloadShard will read a specific object into in.Destination.
// TODO: download a single shard instead of the entire object.
func (in *DownloadObjectInput) downloadShard(client *storage.Client, timeout time.Duration) (out *DownloadOutput) {
out = &DownloadOutput{Bucket: in.Bucket, Object: in.Object}

// Set timeout.
ctx := in.ctx
if timeout > 0 {
c, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
ctx = c
}

// Set options on the object.
o := client.Bucket(in.Bucket).Object(in.Object)

if in.Conditions != nil {
o = o.If(*in.Conditions)
}
if in.Generation != nil {
o = o.Generation(*in.Generation)
}
if len(in.EncryptionKey) > 0 {
o = o.Key(in.EncryptionKey)
}

var offset, length int64 = 0, -1 // get the entire object by default

if in.Range != nil {
offset, length = in.Range.Offset, in.Range.Length
}

// Read.
r, err := o.NewRangeReader(ctx, offset, length)
if err != nil {
out.Err = err
return
}

// TODO: write at a specific offset.
off := io.NewOffsetWriter(in.Destination, 0)
_, err = io.Copy(off, r)
if err != nil {
out.Err = err
r.Close()
return
}
[Review comment, Contributor]: We might need to look into whether we should be attempting to close the writer if possible -- seems annoying for end users to have to clean up. Though, I guess we don't guarantee that a close method exists.

[Reply, BrennaEpp]: Do you mean the writer provided through in.Destination? The io.WriterAt interface, and for that matter io.OffsetWriter, do not have a Close method, as you say. In any case, I feel like it'd be bad practice to close a writer we don't own.


if err = r.Close(); err != nil {
out.Err = err
return
}

out.Attrs = &r.Attrs
return
}
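
On the review thread above about cleaning up in.Destination: io.WriterAt carries no Close method, so any cleanup would have to be opportunistic. A hedged sketch of that option (closeIfCloser is hypothetical and deliberately not adopted here, since closing a caller-owned writer is arguably bad practice):

// closeIfCloser is a hypothetical helper: close the destination only if the
// caller's io.WriterAt also happens to implement io.Closer.
func closeIfCloser(w io.WriterAt) error {
	if c, ok := w.(io.Closer); ok {
		return c.Close()
	}
	return nil // nothing to close; the caller owns the writer's lifecycle
}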

@@ -106,44 +211,33 @@ type DownloadDirectoryInput struct {
// Maybe some others about local file naming.
}


// DownloadOutput provides output for a single object download, including all
// errors received while downloading object parts. If the download was successful,
// Attrs will be populated.
type DownloadOutput struct {
Bucket string
Object string
Err error // error occurring during download. Can use multi-error in Go 1.20+ if multiple failures.
Attrs *storage.ReaderObjectAttrs // Attributes of downloaded object, if successful.
}

// DownloadOutputIterator allows the end user to iterate through completed
// object downloads.
type DownloadOutputIterator struct {
output <-chan *DownloadOutput
}

// Next iterates through results. When complete, will return the iterator.Done
// error. It is considered complete once WaitAndClose() has been called on the
// Downloader.
// DownloadOutputs will be available as the downloads complete; they can
// be iterated through asynchronously or at the end of the job.
// Next will block if there are no more completed downloads (and the Downloader
// is not closed).
func (it *DownloadOutputIterator) Next() (*DownloadOutput, error) {
out, ok := <-it.output
if !ok {
return nil, iterator.Done
}
return out, nil
}
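
Taken together, a hedged end-to-end sketch of how this prototype is meant to be driven (bucket and object names are illustrative; os.File is used as the destination because it implements io.WriterAt; WithWorkers is assumed to remain available in the package after this refactor):

package main

import (
	"context"
	"log"
	"os"

	"cloud.google.com/go/storage"
	"cloud.google.com/go/storage/transfermanager"
	"google.golang.org/api/iterator"
)

func main() {
	ctx := context.Background()
	client, err := storage.NewClient(ctx)
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	d, err := transfermanager.NewDownloader(client, transfermanager.WithWorkers(4))
	if err != nil {
		log.Fatal(err)
	}

	f, err := os.Create("/tmp/my-object") // os.File implements io.WriterAt
	if err != nil {
		log.Fatal(err)
	}
	defer f.Close()

	d.DownloadObject(ctx, &transfermanager.DownloadObjectInput{
		Bucket:      "my-bucket", // illustrative
		Object:      "my-object", // illustrative
		Destination: f,
	})

	// Drain results concurrently so WaitAndClose cannot deadlock on a full
	// output channel (see the chanBufferSize comment in NewDownloader).
	done := make(chan struct{})
	it := d.Results()
	go func() {
		defer close(done)
		for {
			out, err := it.Next()
			if err == iterator.Done {
				return
			}
			if out.Err != nil {
				log.Printf("download of %q failed: %v", out.Object, out.Err)
			}
		}
	}()

	if err := d.WaitAndClose(); err != nil {
		log.Fatal(err)
	}
	<-done
}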