Skip to content

Enabling Support for Conditional Multi-Part Upload #18093

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 21 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
70ee923
git commit -m "UTs for : executeMultipartUploadIfEtagMatches"
tqranjan Apr 27, 2025
2c31c83
git commit -m "Implementation for : executeMultipartUploadIfEtagMatches"
tqranjan Apr 27, 2025
eb3778c
Adds a constant HTTP_STATUS_PRECONDITION_FAILED with value 412
tqranjan Apr 28, 2025
44d45e8
UploadRequest : Add ConditionalWriteOptions
tqranjan May 27, 2025
a31d15e
Add ConditionalWrite utility for conditional blob store writes
tqranjan May 27, 2025
a1df80a
Adds logic to Upload an object to S3 conditionally using the async cl…
tqranjan May 27, 2025
2c2c72a
UTs for async Conditional upload logic
tqranjan May 27, 2025
f0857dc
Test Suite for Async Conditional upload process
tqranjan May 28, 2025
062a76d
S3 Blobcontainer Implementation for Async Conditional upload
tqranjan May 28, 2025
2f8f4de
Javadoc for builder
tqranjan May 28, 2025
9fdefaa
asyncBlobUploadConditionally : Interfaces for AsyncMultiStreamBlobCon…
tqranjan May 28, 2025
d8d8f9a
Log fix
tqranjan May 28, 2025
7a7e240
Merge branch 'main' into pr-branch-conditionalMultipartUpload
x-INFiN1TY-x May 29, 2025
5cfeb6e
SpotlessApply post Conflict resolution
tqranjan May 29, 2025
0a90680
Conflict resolution in executeMultipartUploadConditionally
tqranjan May 29, 2025
cbc0188
Conflict resolution in executeMultipartUploadConditionally's UTs
tqranjan May 29, 2025
0eedd2c
Conflict resolution in S3BlobContainerMockClientTests & AsyncTransfer…
tqranjan May 29, 2025
0e31c62
S3BlobContainer Refactor
tqranjan Jun 11, 2025
50498ac
AsyncTransferManager Refactor
tqranjan Jun 11, 2025
656a3a1
removed stale_primary_shard : generic rewording
tqranjan Jun 11, 2025
11dc264
AsyncTransferManagerTests.java refactor
tqranjan Jun 12, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import org.opensearch.common.CheckedConsumer;
import org.opensearch.common.Nullable;
import org.opensearch.common.blobstore.ConditionalWrite.ConditionalWriteOptions;
import org.opensearch.common.blobstore.stream.write.WritePriority;

import java.io.IOException;
Expand All @@ -28,6 +29,7 @@ public class UploadRequest {
private final Long expectedChecksum;
private final Map<String, String> metadata;
private final boolean uploadRetryEnabled;
private final ConditionalWriteOptions conditionalOptions;
private volatile String serverSideEncryptionType;
private volatile String serverSideEncryptionKmsKey;
private volatile boolean serverSideEncryptionBucketKey;
Expand All @@ -45,6 +47,12 @@ public class UploadRequest {
* @param doRemoteDataIntegrityCheck A boolean to inform vendor plugins whether remote data integrity checks need to be done
* @param expectedChecksum Checksum of the file being uploaded for remote data integrity check
* @param metadata Metadata of the file being uploaded
* @param conditionalOptions Conditions that must be satisfied for the write to succeed
* @param serverSideEncryptionType Type of server-side encryption
* @param serverSideEncryptionKmsKey KMS key for server-side encryption
* @param serverSideEncryptionBucketKey Whether to use bucket keys for server-side encryption
* @param serverSideEncryptionEncryptionContext Encryption context for server-side encryption
* @param expectedBucketOwner Expected owner of the bucket
*/
public UploadRequest(
String bucket,
Expand All @@ -56,6 +64,7 @@ public UploadRequest(
Long expectedChecksum,
boolean uploadRetryEnabled,
@Nullable Map<String, String> metadata,
@Nullable ConditionalWriteOptions conditionalOptions,
String serverSideEncryptionType,
String serverSideEncryptionKmsKey,
boolean serverSideEncryptionBucketKey,
Expand All @@ -71,6 +80,7 @@ public UploadRequest(
this.expectedChecksum = expectedChecksum;
this.uploadRetryEnabled = uploadRetryEnabled;
this.metadata = metadata;
this.conditionalOptions = conditionalOptions;
this.serverSideEncryptionType = serverSideEncryptionType;
this.serverSideEncryptionKmsKey = serverSideEncryptionKmsKey;
this.serverSideEncryptionBucketKey = serverSideEncryptionBucketKey;
Expand Down Expand Up @@ -117,6 +127,14 @@ public Map<String, String> getMetadata() {
return metadata;
}

/**
 * Returns the conditional-write preconditions attached to this upload request.
 * The options are set once at construction and never modified afterwards.
 *
 * @return conditional write options for this upload, or null if none are specified
 */
@Nullable
public ConditionalWriteOptions getConditionalOptions() {
return conditionalOptions;
}

/**
 * Returns the server-side encryption type configured for this upload.
 *
 * @return the server-side encryption type as supplied at construction
 *         (nullability is not enforced here — may be null if the caller passed null)
 */
public String getServerSideEncryptionType() {
return serverSideEncryptionType;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,13 @@
import software.amazon.awssdk.services.s3.model.UploadPartResponse;

import org.apache.lucene.store.IndexInput;
import org.opensearch.OpenSearchException;
import org.opensearch.cluster.metadata.RepositoryMetadata;
import org.opensearch.common.CheckedConsumer;
import org.opensearch.common.StreamContext;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.blobstore.ConditionalWrite.ConditionalWriteOptions;
import org.opensearch.common.blobstore.ConditionalWrite.ConditionalWriteResponse;
import org.opensearch.common.blobstore.stream.write.StreamContextSupplier;
import org.opensearch.common.blobstore.stream.write.WriteContext;
import org.opensearch.common.blobstore.stream.write.WritePriority;
Expand Down Expand Up @@ -76,6 +79,7 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.IntStream;

import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;

Expand All @@ -90,6 +94,7 @@
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doCallRealMethod;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
Expand Down Expand Up @@ -774,4 +779,224 @@ private void testLargeFilesRedirectedToSlowSyncClient(boolean expectException, W
}
});
}

/**
 * Drives the conditional large-file upload path that is redirected to the slow sync S3 client,
 * with a mocked {@link S3Client} wired for one of three outcomes:
 * <ul>
 *   <li>{@code expectException == true}: {@code uploadPart} throws, the upload must fail;</li>
 *   <li>{@code conditionalResponseCode == 412}: {@code completeMultipartUpload} throws
 *       "Precondition Failed" (ETag mismatch);</li>
 *   <li>{@code conditionalResponseCode == 409}: {@code completeMultipartUpload} throws
 *       "Resource Already Exists" (if-not-exists conflict);</li>
 *   <li>otherwise the upload completes and the listener receives a response with a
 *       version identifier.</li>
 * </ul>
 * In every non-success case the multipart upload must be aborted exactly once.
 *
 * @param expectException         when true, every uploadPart call throws an SdkException
 * @param writePriority           priority lane used for the write
 * @param conditionalResponseCode 0 for success, 412 or 409 to fail the conditional complete
 */
private void testLargeFilesRedirectedToSlowSyncClientConditional(
    boolean expectException,
    WritePriority writePriority,
    int conditionalResponseCode
) throws IOException, InterruptedException {

    ByteSizeValue capacity = new ByteSizeValue(1, ByteSizeUnit.GB);
    int numberOfParts = 20;
    final ByteSizeValue partSize = new ByteSizeValue(capacity.getBytes() / numberOfParts + 1, ByteSizeUnit.BYTES);

    GenericStatsMetricPublisher genericStatsMetricPublisher = new GenericStatsMetricPublisher(10000L, 10, 10000L, 10);
    SizeBasedBlockingQ sizeBasedBlockingQ = new SizeBasedBlockingQ(
        capacity,
        transferQueueConsumerService,
        10,
        genericStatsMetricPublisher,
        SizeBasedBlockingQ.QueueEventType.NORMAL
    );

    // Blob is sized so the container takes the multipart redirect path (19 full parts + 200 MB tail).
    final long lastPartSize = new ByteSizeValue(200, ByteSizeUnit.MB).getBytes();
    final long blobSize = ((numberOfParts - 1) * partSize.getBytes()) + lastPartSize;
    CountDownLatch countDownLatch = new CountDownLatch(1);
    AtomicReference<ConditionalWriteResponse> responseRef = new AtomicReference<>();
    AtomicReference<Exception> exceptionRef = new AtomicReference<>();

    // Capture either outcome and release the latch so the test thread can assert.
    ActionListener<ConditionalWriteResponse> completionListener = ActionListener.wrap(resp -> {
        responseRef.set(resp);
        countDownLatch.countDown();
    }, ex -> {
        exceptionRef.set(ex);
        countDownLatch.countDown();
    });

    final String bucketName = randomAlphaOfLengthBetween(1, 10);

    final BlobPath blobPath = new BlobPath();
    if (randomBoolean()) {
        // NOTE(review): BlobPath.add returns a new instance and the result is discarded,
        // so this branch likely never extends the path — confirm against the
        // non-conditional variant of this test and fix both together if so.
        IntStream.of(randomIntBetween(1, 5)).forEach(value -> blobPath.add("path_" + value));
    }

    final long bufferSize = ByteSizeUnit.MB.toBytes(randomIntBetween(5, 1024));

    final S3BlobStore blobStore = mock(S3BlobStore.class);
    when(blobStore.bucket()).thenReturn(bucketName);
    when(blobStore.getStatsMetricPublisher()).thenReturn(new StatsMetricPublisher());
    when(blobStore.bufferSizeInBytes()).thenReturn(bufferSize);

    when(blobStore.getLowPrioritySizeBasedBlockingQ()).thenReturn(sizeBasedBlockingQ);
    when(blobStore.getNormalPrioritySizeBasedBlockingQ()).thenReturn(sizeBasedBlockingQ);

    final StorageClass storageClass = randomFrom(StorageClass.values());
    when(blobStore.getStorageClass()).thenReturn(storageClass);
    // Forces the large-upload redirect under test.
    when(blobStore.isRedirectLargeUploads()).thenReturn(true);
    boolean uploadRetryEnabled = randomBoolean();
    when(blobStore.isUploadRetryEnabled()).thenReturn(uploadRetryEnabled);

    final ObjectCannedACL cannedAccessControlList = randomBoolean() ? randomFrom(ObjectCannedACL.values()) : null;
    if (cannedAccessControlList != null) {
        when(blobStore.getCannedACL()).thenReturn(cannedAccessControlList);
    }

    // Randomly exercise both SSE flavours; KMS needs the extra key/context stubs.
    if (randomBoolean()) {
        when(blobStore.serverSideEncryptionType()).thenReturn(ServerSideEncryption.AES256.toString());
    } else {
        when(blobStore.serverSideEncryptionType()).thenReturn(ServerSideEncryption.AWS_KMS.toString());
        when(blobStore.serverSideEncryptionKmsKey()).thenReturn(randomAlphaOfLength(10));
        when(blobStore.serverSideEncryptionBucketKey()).thenReturn(randomBoolean());
        when(blobStore.serverSideEncryptionEncryptionContext()).thenReturn(randomAlphaOfLength(10));
    }

    final S3Client client = mock(S3Client.class);
    final AmazonS3Reference clientReference = Mockito.spy(new AmazonS3Reference(client));
    doNothing().when(clientReference).close();
    when(blobStore.clientReference()).thenReturn(clientReference);

    final String uploadId = randomAlphaOfLength(10);
    final CreateMultipartUploadResponse createMultipartUploadResponse = CreateMultipartUploadResponse.builder()
        .uploadId(uploadId)
        .build();
    when(client.createMultipartUpload(any(CreateMultipartUploadRequest.class))).thenReturn(createMultipartUploadResponse);

    if (expectException) {
        when(client.uploadPart(any(UploadPartRequest.class), any(RequestBody.class))).thenThrow(
            SdkException.create("Expected upload part request to fail", new RuntimeException())
        );
    } else {
        when(client.uploadPart(any(UploadPartRequest.class), any(RequestBody.class))).thenReturn(
            UploadPartResponse.builder().eTag("part-etag-" + randomAlphaOfLength(5)).build()
        );
    }

    // Conditional failures are surfaced at completeMultipartUpload time:
    // 412 = If-Match precondition failed, 409 = If-None-Match / already-exists conflict.
    if (conditionalResponseCode == 412) {
        when(client.completeMultipartUpload(any(CompleteMultipartUploadRequest.class))).thenThrow(
            software.amazon.awssdk.services.s3.model.S3Exception.builder().statusCode(412).message("Precondition Failed").build()
        );
    } else if (conditionalResponseCode == 409) {
        when(client.completeMultipartUpload(any(CompleteMultipartUploadRequest.class))).thenThrow(
            software.amazon.awssdk.services.s3.model.S3Exception.builder().statusCode(409).message("Resource Already Exists").build()
        );
    } else {
        String eTag = "\"multipart-etag-" + randomAlphaOfLength(5) + "\"";
        when(client.completeMultipartUpload(any(CompleteMultipartUploadRequest.class))).thenReturn(
            CompleteMultipartUploadResponse.builder().eTag(eTag).build()
        );
    }

    when(client.abortMultipartUpload(any(AbortMultipartUploadRequest.class))).thenReturn(
        AbortMultipartUploadResponse.builder().build()
    );

    List<InputStream> openInputStreams = new ArrayList<>();
    final S3BlobContainer s3BlobContainer = Mockito.spy(new S3BlobContainer(blobPath, blobStore));

    // The container is a spy; make sure the method under test runs its real implementation.
    doCallRealMethod().when(s3BlobContainer)
        .executeMultipartUploadConditionally(
            any(S3BlobStore.class),
            anyString(),
            any(InputStream.class),
            anyLong(),
            any(),
            any(ConditionalWriteOptions.class),
            ArgumentMatchers.<ActionListener<ConditionalWriteResponse>>any()
        );

    // Every part is served from a zero-filled source; streams are tracked for cleanup below.
    StreamContextSupplier streamContextSupplier = partSize1 -> new StreamContext((partNo, size, position) -> {
        InputStream inputStream = new OffsetRangeIndexInputStream(new ZeroIndexInput("desc", blobSize), size, position);
        openInputStreams.add(inputStream);
        return new InputStreamContainer(inputStream, size, position);
    }, partSize1, calculateLastPartSize(blobSize, partSize1), calculateNumberOfParts(blobSize, partSize1));

    WriteContext writeContext = new WriteContext.Builder().fileName("write_large_blob_conditional")
        .streamContextSupplier(streamContextSupplier)
        .fileSize(blobSize)
        .failIfAlreadyExists(false)
        .writePriority(writePriority)
        .uploadFinalizer(success -> {
            Assert.assertTrue(success);
        })
        .doRemoteDataIntegrityCheck(false)
        .metadata(new HashMap<>())
        .build();

    // The options only need to be shaped to match the stubbed failure; the mock client
    // never actually evaluates the ETag values.
    ConditionalWriteOptions conditionalOptions;
    if (conditionalResponseCode == 412) {
        conditionalOptions = ConditionalWriteOptions.ifMatch("invalid-etag");
    } else if (conditionalResponseCode == 409) {
        conditionalOptions = ConditionalWriteOptions.ifNotExists();
    } else {
        conditionalOptions = ConditionalWriteOptions.ifMatch("valid-etag");
    }

    s3BlobContainer.asyncBlobUploadConditionally(writeContext, conditionalOptions, completionListener);

    // NOTE(review): 5000 SECONDS (~83 min) looks like it was meant to be milliseconds —
    // confirm intent; a hung upload would stall the suite for a very long time.
    boolean awaitSuccess = countDownLatch.await(5000, TimeUnit.SECONDS);
    assertTrue(awaitSuccess);

    if (expectException || conditionalResponseCode != 0) {
        assertNotNull("Should have received an exception", exceptionRef.get());

        if (conditionalResponseCode == 412 && !expectException) {
            assertTrue(
                "Should have conditional error",
                exceptionRef.get() instanceof OpenSearchException
                    || exceptionRef.get().getMessage().contains("Precondition Failed")
                    || exceptionRef.get().getMessage().contains("ETag mismatch")
            );
        } else if (conditionalResponseCode == 409 && !expectException) {
            assertTrue(
                "Should have conflict error",
                exceptionRef.get().getMessage().contains("Resource Already Exists")
                    || exceptionRef.get().getMessage().contains("already exists")
            );
        }
    } else {
        assertNull("Should not have received an exception", exceptionRef.get());
        assertNotNull("Should have received a response", responseRef.get());
        assertNotNull("Response should have version identifier", responseRef.get().getVersionIdentifier());
    }

    verify(s3BlobContainer, times(1)).executeMultipartUploadConditionally(
        any(S3BlobStore.class),
        anyString(),
        any(InputStream.class),
        anyLong(),
        anyMap(),
        any(ConditionalWriteOptions.class),
        ArgumentMatchers.<ActionListener<ConditionalWriteResponse>>any()
    );

    // Any failure path (part failure or conditional rejection) must abort exactly once.
    // Simplified from `expectException || (conditionalResponseCode != 0 && !expectException)`,
    // which is logically equivalent.
    boolean shouldAbort = expectException || conditionalResponseCode != 0;
    verify(client, times(shouldAbort ? 1 : 0)).abortMultipartUpload(any(AbortMultipartUploadRequest.class));

    // Best-effort cleanup of the tracked test streams; close failures are irrelevant here.
    openInputStreams.forEach(inputStream -> {
        try {
            inputStream.close();
        } catch (IOException ignored) {}
    });
}

/** Injected uploadPart failure must surface an error on both priority lanes. */
public void testFailureWhenLargeFileRedirectedConditional() throws IOException, InterruptedException {
    for (WritePriority priority : new WritePriority[] { WritePriority.LOW, WritePriority.NORMAL }) {
        testLargeFilesRedirectedToSlowSyncClientConditional(true, priority, 0);
    }
}

/** Happy-path conditional multipart upload succeeds on both priority lanes. */
public void testLargeFileRedirectedConditional() throws IOException, InterruptedException {
    for (WritePriority priority : new WritePriority[] { WritePriority.LOW, WritePriority.NORMAL }) {
        testLargeFilesRedirectedToSlowSyncClientConditional(false, priority, 0);
    }
}

/** A 412 (ETag precondition failed) on complete must fail the upload on both priority lanes. */
public void testLargeFileRedirectedConditionalPreconditionFailed() throws IOException, InterruptedException {
    for (WritePriority priority : new WritePriority[] { WritePriority.LOW, WritePriority.NORMAL }) {
        testLargeFilesRedirectedToSlowSyncClientConditional(false, priority, 412);
    }
}

/** A 409 (already-exists conflict) on complete must fail the upload on both priority lanes. */
public void testLargeFileRedirectedConditionalConflict() throws IOException, InterruptedException {
    for (WritePriority priority : new WritePriority[] { WritePriority.LOW, WritePriority.NORMAL }) {
        testLargeFilesRedirectedToSlowSyncClientConditional(false, priority, 409);
    }
}

}
Loading