chore(storage/transfermanager): non-positive partSize will turn off sharding #10509

Merged · 4 commits · Jul 9, 2024
12 changes: 11 additions & 1 deletion storage/transfermanager/downloader.go
@@ -706,8 +706,13 @@ func numShards(attrs *storage.ReaderObjectAttrs, r *DownloadRange, partSize int64) int {
		return 1
	}

	// Sharding turned off with partSize < 1.
	if partSize < 1 {
		return 1
	}

	// Divide entire object into shards if no range given.
	if r == nil {
		return int(math.Ceil(float64(objectSize) / float64(partSize)))
	}
	// Negative offset reads the whole object in one go.
@@ -742,6 +747,11 @@ func shardRange(r *DownloadRange, partSize int64, shard int) DownloadRange {
		}
	}

	// No sharding if partSize is less than 1.
	if partSize < 1 {
		return *r
	}

	// Negative offset reads the whole object in one go.
	if r.Offset < 0 {
		return *r
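For readers skimming the diff, here is a minimal, self-contained sketch of the behavior these two hunks introduce. It is a simplification, not the library code: it takes the object size as a plain int64 instead of a *storage.ReaderObjectAttrs, and the ranged-shard arithmetic is an assumed reconstruction that omits the transcoding and negative-offset branches handled above.

```go
package main

import (
	"fmt"
	"math"
)

// DownloadRange mirrors the exported transfermanager type: a byte range
// within an object.
type DownloadRange struct {
	Offset, Length int64
}

// numShards reports how many pieces a download is split into. A partSize
// below 1 disables sharding entirely, so the whole object is one shard.
func numShards(objectSize int64, r *DownloadRange, partSize int64) int {
	if partSize < 1 {
		return 1
	}
	if r == nil {
		// Divide the entire object into shards if no range is given.
		return int(math.Ceil(float64(objectSize) / float64(partSize)))
	}
	return int(math.Ceil(float64(r.Length) / float64(partSize)))
}

// shardRange returns the byte range covered by a given shard. With sharding
// off (partSize < 1), the requested range passes through untouched.
func shardRange(r DownloadRange, partSize int64, shard int) DownloadRange {
	if partSize < 1 {
		return r
	}
	offset := r.Offset + int64(shard)*partSize
	length := partSize
	if remaining := r.Offset + r.Length - offset; remaining < length {
		length = remaining
	}
	return DownloadRange{Offset: offset, Length: length}
}

func main() {
	const gib = 1024 * 1024 * 1024
	fmt.Println(numShards(gib, nil, 32*1024*1024)) // 32 shards at the 32 MiB download default
	fmt.Println(numShards(gib, nil, 0))            // 1: sharding turned off
	fmt.Println(shardRange(DownloadRange{Offset: 512, Length: 4096}, 0, 0))
	// {512 4096}: the requested range is returned unchanged
}
```

Running it prints 32, 1, and {512 4096}: with a non-positive partSize, both functions collapse to the single-shot path.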
13 changes: 13 additions & 0 deletions storage/transfermanager/downloader_test.go
@@ -356,6 +356,19 @@ func TestCalculateRange(t *testing.T) {
				Length: 1000,
			},
		},
		{
			desc: "sharding turned off",
			objRange: &DownloadRange{
				Offset: 1024 * 1024 * 1024 * 1024 / 2,
				Length: 1024 * 1024 * 1024 * 1024,
			},
			partSize: 0,
			shard:    0,
			want: DownloadRange{
				Offset: 1024 * 1024 * 1024 * 1024 / 2,
				Length: 1024 * 1024 * 1024 * 1024,
			},
		},
		{
			desc: "large object",
			objRange: &DownloadRange{
11 changes: 8 additions & 3 deletions storage/transfermanager/option.go
@@ -69,10 +69,14 @@ func (wpt withPerOpTimeout) apply(tm *transferManagerConfig) {
	tm.perOperationTimeout = wpt.timeout
}

// WithPartSize returns a TransferManagerOption that specifies the size in bytes
// of the shards to transfer; that is, if the object is larger than partSize,
// it will be uploaded or downloaded in concurrent pieces of size partSize.
//
// The default is 32 MiB for downloads.
//
// To turn off sharding, set partSize to 0.
//
// Note that files that support decompressive transcoding will be downloaded in
// a single piece regardless of the partSize set here.
func WithPartSize(partSize int64) Option {
@@ -93,6 +97,7 @@ type transferManagerConfig struct {

	// Size of shards to transfer; Python found 32 MiB to be a good default
	// for JSON downloads, but gRPC may benefit from larger.
	// A partSize smaller than 1 turns off sharding.
	partSize int64

	// Timeout for a single operation (including all retries). Zero value means
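As a usage illustration of the documented option, a hedged sketch follows. It assumes the preview transfermanager surface as of this PR (NewDownloader, DownloadObjectInput, and WaitAndClose); the bucket name, object name, and output path are hypothetical placeholders.

```go
package main

import (
	"context"
	"log"
	"os"

	"cloud.google.com/go/storage"
	"cloud.google.com/go/storage/transfermanager"
)

func main() {
	ctx := context.Background()
	client, err := storage.NewClient(ctx)
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	// WithPartSize(0) turns sharding off: each object is fetched in a
	// single piece instead of concurrent 32 MiB parts (the download default).
	d, err := transfermanager.NewDownloader(client, transfermanager.WithPartSize(0))
	if err != nil {
		log.Fatal(err)
	}

	f, err := os.Create("/tmp/object.bin")
	if err != nil {
		log.Fatal(err)
	}
	defer f.Close()

	// Bucket and object names here are placeholders.
	if err := d.DownloadObject(ctx, &transfermanager.DownloadObjectInput{
		Bucket:      "my-bucket",
		Object:      "my-object",
		Destination: f, // *os.File satisfies io.WriterAt
	}); err != nil {
		log.Fatal(err)
	}

	// WaitAndClose blocks until all queued downloads have finished.
	if _, err := d.WaitAndClose(); err != nil {
		log.Fatal(err)
	}
}
```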
57 changes: 39 additions & 18 deletions storage/transfermanager/option_test.go
@@ -22,25 +22,46 @@ import (
)

func TestApply(t *testing.T) {
	for _, test := range []struct {
		desc string
		opts []Option
		want transferManagerConfig
	}{
		{
			desc: "all options",
			opts: []Option{
				WithWorkers(3),
				WithPerOpTimeout(time.Hour),
				WithCallbacks(),
				WithPartSize(300000),
			},
			want: transferManagerConfig{
				numWorkers:          3,
				perOperationTimeout: time.Hour,
				asynchronous:        true,
				partSize:            300000,
			},
		},
		{
			desc: "small partSize",
			opts: []Option{
				WithPartSize(30),
			},
			want: transferManagerConfig{
				partSize: 30,
			},
		},
	} {
		t.Run(test.desc, func(t *testing.T) {
			var got transferManagerConfig
			for _, opt := range test.opts {
				opt.apply(&got)
			}

			if got != test.want {
				t.Errorf("got: %+v, want: %+v", got, test.want)
			}
		})
	}
}
