Skip to content

Commit bb0ed2b

Browse files
committed
PR Part 1: Support for Singular Conditional Upload function and its unit tests for meta opensearch-project#17859
1 parent caf5d71 commit bb0ed2b

File tree

2 files changed

+606
-23
lines changed

2 files changed

+606
-23
lines changed

plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java

Lines changed: 82 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -32,30 +32,13 @@
3232

3333
package org.opensearch.repositories.s3;
3434

35+
import org.opensearch.OpenSearchException;
3536
import software.amazon.awssdk.core.ResponseInputStream;
3637
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
3738
import software.amazon.awssdk.core.exception.SdkException;
3839
import software.amazon.awssdk.core.sync.RequestBody;
3940
import software.amazon.awssdk.services.s3.S3AsyncClient;
40-
import software.amazon.awssdk.services.s3.model.AbortMultipartUploadRequest;
41-
import software.amazon.awssdk.services.s3.model.CommonPrefix;
42-
import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest;
43-
import software.amazon.awssdk.services.s3.model.CompletedMultipartUpload;
44-
import software.amazon.awssdk.services.s3.model.CompletedPart;
45-
import software.amazon.awssdk.services.s3.model.CreateMultipartUploadRequest;
46-
import software.amazon.awssdk.services.s3.model.GetObjectAttributesRequest;
47-
import software.amazon.awssdk.services.s3.model.GetObjectAttributesResponse;
48-
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
49-
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
50-
import software.amazon.awssdk.services.s3.model.HeadObjectRequest;
51-
import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
52-
import software.amazon.awssdk.services.s3.model.ListObjectsV2Response;
53-
import software.amazon.awssdk.services.s3.model.NoSuchKeyException;
54-
import software.amazon.awssdk.services.s3.model.ObjectAttributes;
55-
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
56-
import software.amazon.awssdk.services.s3.model.ServerSideEncryption;
57-
import software.amazon.awssdk.services.s3.model.UploadPartRequest;
58-
import software.amazon.awssdk.services.s3.model.UploadPartResponse;
41+
import software.amazon.awssdk.services.s3.model.*;
5942
import software.amazon.awssdk.services.s3.paginators.ListObjectsV2Iterable;
6043
import software.amazon.awssdk.services.s3.paginators.ListObjectsV2Publisher;
6144
import software.amazon.awssdk.utils.CollectionUtils;
@@ -97,12 +80,14 @@
9780
import java.io.InputStream;
9881
import java.util.ArrayList;
9982
import java.util.List;
83+
import java.util.Locale;
10084
import java.util.Map;
10185
import java.util.concurrent.CompletableFuture;
10286
import java.util.concurrent.ExecutionException;
10387
import java.util.concurrent.atomic.AtomicLong;
10488
import java.util.function.Function;
10589
import java.util.stream.Collectors;
90+
import java.util.Collections;
10691

10792
import org.reactivestreams.Subscriber;
10893
import org.reactivestreams.Subscription;
@@ -508,6 +493,84 @@ private ListObjectsV2Request listObjectsRequest(String keyPath, int limit) {
508493
private String buildKey(String blobName) {
509494
return keyPath + blobName;
510495
}
496+
/**
497+
* Executes a upload to S3 using a conditional If-Match header.
498+
* The upload only proceeds if the existing object's ETag matches the provided value.
499+
*
500+
* @param blobStore the S3 blob store
501+
* @param blobName the key (name) of the blob
502+
* @param input the input stream containing the blob data
503+
* @param blobSize the size of the blob in bytes
504+
* @param metadata optional metadata to be associated with the blob
505+
* @param ETag the expected ETag value for conditional upload
506+
* @param etagListener listener to handle the resulting ETag or error notifications
507+
* @throws IOException if an error occurs during upload or if validations fail
508+
*/
509+
void executeSingleUploadIfEtagMatches(final S3BlobStore blobStore,
510+
final String blobName,
511+
final InputStream input,
512+
final long blobSize,
513+
final Map<String, String> metadata,
514+
final String ETag,
515+
ActionListener<String> etagListener) throws IOException {
516+
// Extra safety checks remain the same.
517+
if (blobSize > MAX_FILE_SIZE.getBytes()) {
518+
throw new IllegalArgumentException("Upload request size [" + blobSize + "] can't be larger than " + MAX_FILE_SIZE);
519+
}
520+
if (blobSize > blobStore.bufferSizeInBytes()) {
521+
throw new IllegalArgumentException("Upload request size [" + blobSize + "] can't be larger than buffer size");
522+
}
523+
524+
PutObjectRequest.Builder putObjectRequestBuilder = PutObjectRequest.builder()
525+
.bucket(blobStore.bucket())
526+
.key(blobName)
527+
.contentLength(blobSize)
528+
.storageClass(blobStore.getStorageClass())
529+
.ifMatch(ETag)
530+
.acl(blobStore.getCannedACL())
531+
.overrideConfiguration(o -> o.addMetricPublisher(blobStore.getStatsMetricPublisher().putObjectMetricPublisher));
532+
533+
if (CollectionUtils.isNotEmpty(metadata)) {
534+
putObjectRequestBuilder = putObjectRequestBuilder.metadata(metadata);
535+
}
536+
if (blobStore.serverSideEncryption()) {
537+
putObjectRequestBuilder.serverSideEncryption(ServerSideEncryption.AES256);
538+
}
539+
540+
PutObjectRequest putObjectRequest = putObjectRequestBuilder.build();
541+
542+
try (AmazonS3Reference clientReference = blobStore.clientReference()) {
543+
final InputStream requestInputStream = blobStore.isUploadRetryEnabled()
544+
? new BufferedInputStream(input, (int) (blobSize + 1))
545+
: input;
546+
547+
PutObjectResponse response = SocketAccess.doPrivileged(() ->
548+
clientReference.get().putObject(putObjectRequest, RequestBody.fromInputStream(requestInputStream, blobSize))
549+
);
550+
551+
if(response.eTag()!=null){
552+
etagListener.onResponse(response.eTag());
553+
}
554+
555+
}
556+
catch (S3Exception e) {
557+
if (e.statusCode() == 412) {
558+
etagListener.onFailure(new OpenSearchException(
559+
"stale_primary_shard",
560+
"Precondition Failed : Etag Mismatch",
561+
blobName,
562+
e
563+
));
564+
} else {
565+
etagListener.onFailure(new IOException(
566+
String.format(Locale.ROOT, "S3 error during upload [%s]: %s", blobName, e.getMessage()), e));
567+
}
568+
} catch (SdkException e) {
569+
etagListener.onFailure(new IOException(
570+
String.format(Locale.ROOT, "S3 upload failed for [%s]", blobName), e));
571+
}
572+
}
573+
511574

512575
/**
513576
* Uploads a blob using a single upload request

0 commit comments

Comments
 (0)