Skip to content

Commit 80d9119

Browse files
committed
make old bucket.DeletePrefix concurrent
Signed-off-by: Alex Le <[email protected]>
1 parent b8a8d87 commit 80d9119

File tree

3 files changed

+7
-11
lines changed

3 files changed

+7
-11
lines changed

pkg/compactor/blocks_cleaner.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -501,7 +501,7 @@ func (c *BlocksCleaner) deleteUserMarkedForDeletion(ctx context.Context, userLog
501501
return err
502502
}
503503

504-
if deleted, err := bucket.DeletePrefixConcurrent(ctx, userBucket, bucketindex.MarkersPathname, userLogger, defaultDeleteBlocksConcurrency); err != nil {
504+
if deleted, err := bucket.DeletePrefix(ctx, userBucket, bucketindex.MarkersPathname, userLogger, defaultDeleteBlocksConcurrency); err != nil {
505505
return errors.Wrap(err, "failed to delete marker files")
506506
} else if deleted > 0 {
507507
level.Info(userLogger).Log("msg", "deleted marker files for tenant marked for deletion", "count", deleted)
@@ -513,15 +513,15 @@ func (c *BlocksCleaner) deleteUserMarkedForDeletion(ctx context.Context, userLog
513513
}
514514

515515
func (c *BlocksCleaner) deleteNonDataFiles(ctx context.Context, userLogger log.Logger, userBucket objstore.InstrumentedBucket) error {
516-
if deleted, err := bucket.DeletePrefixConcurrent(ctx, userBucket, block.DebugMetas, userLogger, defaultDeleteBlocksConcurrency); err != nil {
516+
if deleted, err := bucket.DeletePrefix(ctx, userBucket, block.DebugMetas, userLogger, defaultDeleteBlocksConcurrency); err != nil {
517517
return errors.Wrap(err, "failed to delete "+block.DebugMetas)
518518
} else if deleted > 0 {
519519
level.Info(userLogger).Log("msg", "deleted files under "+block.DebugMetas+" for tenant marked for deletion", "count", deleted)
520520
}
521521

522522
if c.cfg.CompactionStrategy == util.CompactionStrategyPartitioning {
523523
// Clean up partitioned group info files
524-
if deleted, err := bucket.DeletePrefixConcurrent(ctx, userBucket, PartitionedGroupDirectory, userLogger, defaultDeleteBlocksConcurrency); err != nil {
524+
if deleted, err := bucket.DeletePrefix(ctx, userBucket, PartitionedGroupDirectory, userLogger, defaultDeleteBlocksConcurrency); err != nil {
525525
return errors.Wrap(err, "failed to delete "+PartitionedGroupDirectory)
526526
} else if deleted > 0 {
527527
level.Info(userLogger).Log("msg", "deleted files under "+PartitionedGroupDirectory+" for tenant marked for deletion", "count", deleted)
@@ -781,7 +781,7 @@ func (c *BlocksCleaner) cleanPartitionedGroupInfo(ctx context.Context, userBucke
781781

782782
if extraInfo.status.CanDelete || extraInfo.status.DeleteVisitMarker {
783783
// Remove partition visit markers
784-
if _, err := bucket.DeletePrefixConcurrent(ctx, userBucket, GetPartitionVisitMarkerDirectoryPath(partitionedGroupInfo.PartitionedGroupID), userLogger, defaultDeleteBlocksConcurrency); err != nil {
784+
if _, err := bucket.DeletePrefix(ctx, userBucket, GetPartitionVisitMarkerDirectoryPath(partitionedGroupInfo.PartitionedGroupID), userLogger, defaultDeleteBlocksConcurrency); err != nil {
785785
level.Warn(userLogger).Log("msg", "failed to delete partition visit markers for partitioned group", "partitioned_group_info", partitionedGroupInfoFile, "err", err)
786786
} else {
787787
level.Info(userLogger).Log("msg", "deleted partition visit markers for partitioned group", "partitioned_group_info", partitionedGroupInfoFile)

pkg/storage/bucket/bucket_util.go

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,7 @@ import (
1515
// DeletePrefix removes all objects with given prefix, recursively.
1616
// It returns number of deleted objects.
1717
// If deletion of any object fails, it returns error and stops.
18-
func DeletePrefix(ctx context.Context, bkt objstore.Bucket, prefix string, logger log.Logger) (int, error) {
19-
return DeletePrefixConcurrent(ctx, bkt, prefix, logger, 1)
20-
}
21-
22-
func DeletePrefixConcurrent(ctx context.Context, bkt objstore.Bucket, prefix string, logger log.Logger, maxConcurrency int) (int, error) {
18+
func DeletePrefix(ctx context.Context, bkt objstore.Bucket, prefix string, logger log.Logger, maxConcurrency int) (int, error) {
2319
keys, err := ListPrefixes(ctx, bkt, prefix, logger)
2420
if err != nil {
2521
return 0, err

pkg/storage/bucket/bucket_util_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ func TestDeletePrefix(t *testing.T) {
2222
require.NoError(t, mem.Upload(context.Background(), "prefix/sub2/4", strings.NewReader("hello")))
2323
require.NoError(t, mem.Upload(context.Background(), "outside/obj", strings.NewReader("hello")))
2424

25-
del, err := DeletePrefix(context.Background(), mem, "prefix", log.NewNopLogger())
25+
del, err := DeletePrefix(context.Background(), mem, "prefix", log.NewNopLogger(), 1)
2626
require.NoError(t, err)
2727
assert.Equal(t, 4, del)
2828
assert.Equal(t, 2, len(mem.Objects()))
@@ -42,7 +42,7 @@ func TestDeletePrefixConcurrent(t *testing.T) {
4242
require.NoError(t, mem.Upload(context.Background(), fmt.Sprintf("prefix/sub/%d", i), strings.NewReader(fmt.Sprintf("hello%d", i))))
4343
}
4444

45-
del, err := DeletePrefixConcurrent(context.Background(), mem, "prefix", log.NewNopLogger(), 100)
45+
del, err := DeletePrefix(context.Background(), mem, "prefix", log.NewNopLogger(), 100)
4646
require.NoError(t, err)
4747
assert.Equal(t, 4+n, del)
4848
assert.Equal(t, 2, len(mem.Objects()))

0 commit comments

Comments
 (0)