Skip to content

Commit 2c31c83

Browse files
committed
git commit -m "Implementation for : executeMultipartUploadIfEtagMatches"
1 parent 70ee923 commit 2c31c83

File tree

1 file changed

+167
-0
lines changed

1 file changed

+167
-0
lines changed

plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java

Lines changed: 167 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import software.amazon.awssdk.services.s3.model.AbortMultipartUploadRequest;
4141
import software.amazon.awssdk.services.s3.model.CommonPrefix;
4242
import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest;
43+
import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadResponse;
4344
import software.amazon.awssdk.services.s3.model.CompletedMultipartUpload;
4445
import software.amazon.awssdk.services.s3.model.CompletedPart;
4546
import software.amazon.awssdk.services.s3.model.CreateMultipartUploadRequest;
@@ -53,6 +54,7 @@
5354
import software.amazon.awssdk.services.s3.model.NoSuchKeyException;
5455
import software.amazon.awssdk.services.s3.model.ObjectAttributes;
5556
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
57+
import software.amazon.awssdk.services.s3.model.S3Exception;
5658
import software.amazon.awssdk.services.s3.model.ServerSideEncryption;
5759
import software.amazon.awssdk.services.s3.model.UploadPartRequest;
5860
import software.amazon.awssdk.services.s3.model.UploadPartResponse;
@@ -63,6 +65,7 @@
6365
import org.apache.logging.log4j.LogManager;
6466
import org.apache.logging.log4j.Logger;
6567
import org.apache.logging.log4j.message.ParameterizedMessage;
68+
import org.opensearch.OpenSearchException;
6669
import org.opensearch.action.support.PlainActionFuture;
6770
import org.opensearch.common.Nullable;
6871
import org.opensearch.common.SetOnce;
@@ -97,6 +100,7 @@
97100
import java.io.InputStream;
98101
import java.util.ArrayList;
99102
import java.util.List;
103+
import java.util.Locale;
100104
import java.util.Map;
101105
import java.util.concurrent.CompletableFuture;
102106
import java.util.concurrent.ExecutionException;
@@ -509,6 +513,169 @@ private String buildKey(String blobName) {
509513
return keyPath + blobName;
510514
}
511515

516+
public void executeMultipartUploadIfEtagMatches(
517+
final S3BlobStore blobStore,
518+
final String blobName,
519+
final InputStream input,
520+
final long blobSize,
521+
final Map<String, String> metadata,
522+
final String eTag,
523+
final ActionListener<String> etagListener
524+
) throws IOException {
525+
526+
ensureMultiPartUploadSize(blobSize);
527+
528+
final long partSize = blobStore.bufferSizeInBytes();
529+
final Tuple<Long, Long> multiparts = numberOfMultiparts(blobSize, partSize);
530+
if (multiparts.v1() > Integer.MAX_VALUE) {
531+
throw new IllegalArgumentException("Too many multipart upload parts; consider a larger buffer size.");
532+
}
533+
final int nbParts = multiparts.v1().intValue();
534+
final long lastPartSize = multiparts.v2();
535+
assert blobSize == (((nbParts - 1) * partSize) + lastPartSize) : "blobSize does not match multipart sizes";
536+
// test
537+
CreateMultipartUploadRequest.Builder createRequestBuilder = CreateMultipartUploadRequest.builder()
538+
.bucket(blobStore.bucket())
539+
.key(blobName)
540+
.storageClass(blobStore.getStorageClass())
541+
.acl(blobStore.getCannedACL())
542+
.overrideConfiguration(o -> o.addMetricPublisher(blobStore.getStatsMetricPublisher().multipartUploadMetricCollector));
543+
544+
if (metadata != null && !metadata.isEmpty()) {
545+
createRequestBuilder.metadata(metadata);
546+
}
547+
if (blobStore.serverSideEncryption()) {
548+
createRequestBuilder.serverSideEncryption(ServerSideEncryption.AES256);
549+
}
550+
551+
final CreateMultipartUploadRequest createMultipartUploadRequest = createRequestBuilder.build();
552+
final SetOnce<String> uploadId = new SetOnce<>();
553+
final String bucketName = blobStore.bucket();
554+
boolean success = false;
555+
556+
final InputStream requestInputStream = blobStore.isUploadRetryEnabled()
557+
? new BufferedInputStream(input, (int) (partSize + 1))
558+
: input;
559+
560+
try (AmazonS3Reference clientReference = blobStore.clientReference()) {
561+
uploadId.set(
562+
SocketAccess.doPrivileged(() -> clientReference.get().createMultipartUpload(createMultipartUploadRequest).uploadId())
563+
);
564+
if (Strings.isEmpty(uploadId.get())) {
565+
IOException exception = new IOException("Failed to initialize multipart upload for " + blobName);
566+
etagListener.onFailure(exception);
567+
throw exception;
568+
}
569+
570+
final List<CompletedPart> parts = new ArrayList<>(nbParts);
571+
long bytesCount = 0;
572+
573+
for (int i = 1; i <= nbParts; i++) {
574+
long currentPartSize = (i < nbParts) ? partSize : lastPartSize;
575+
final UploadPartRequest uploadPartRequest = UploadPartRequest.builder()
576+
.bucket(bucketName)
577+
.key(blobName)
578+
.uploadId(uploadId.get())
579+
.partNumber(i)
580+
.contentLength(currentPartSize)
581+
.overrideConfiguration(o -> o.addMetricPublisher(blobStore.getStatsMetricPublisher().multipartUploadMetricCollector))
582+
.build();
583+
584+
bytesCount += currentPartSize;
585+
586+
final UploadPartResponse uploadResponse = SocketAccess.doPrivileged(
587+
() -> clientReference.get()
588+
.uploadPart(uploadPartRequest, RequestBody.fromInputStream(requestInputStream, currentPartSize))
589+
);
590+
591+
String partETag = uploadResponse.eTag();
592+
if (partETag == null) {
593+
IOException exception = new IOException(
594+
String.format(Locale.ROOT, "S3 part upload for [%s] part [%d] returned null ETag", blobName, i)
595+
);
596+
etagListener.onFailure(exception);
597+
throw exception;
598+
}
599+
600+
parts.add(CompletedPart.builder().partNumber(i).eTag(partETag).build());
601+
}
602+
603+
if (bytesCount != blobSize) {
604+
IOException exception = new IOException(
605+
String.format(Locale.ROOT, "Multipart upload for [%s] sent %d bytes; expected %d bytes", blobName, bytesCount, blobSize)
606+
);
607+
etagListener.onFailure(exception);
608+
throw exception;
609+
}
610+
611+
CompleteMultipartUploadRequest completeRequest = CompleteMultipartUploadRequest.builder()
612+
.bucket(bucketName)
613+
.key(blobName)
614+
.uploadId(uploadId.get())
615+
.multipartUpload(CompletedMultipartUpload.builder().parts(parts).build())
616+
.ifMatch(eTag)
617+
.overrideConfiguration(o -> o.addMetricPublisher(blobStore.getStatsMetricPublisher().multipartUploadMetricCollector))
618+
.build();
619+
620+
CompleteMultipartUploadResponse completeResponse = SocketAccess.doPrivileged(
621+
() -> clientReference.get().completeMultipartUpload(completeRequest)
622+
);
623+
624+
if (completeResponse.eTag() != null) {
625+
success = true;
626+
etagListener.onResponse(completeResponse.eTag());
627+
} else {
628+
IOException exception = new IOException(
629+
"S3 multipart upload for [" + blobName + "] returned null ETag, violating data integrity expectations"
630+
);
631+
etagListener.onFailure(exception);
632+
throw exception;
633+
}
634+
635+
} catch (S3Exception e) {
636+
if (e.statusCode() == 412) {
637+
etagListener.onFailure(new OpenSearchException("stale_primary_shard", e, "Precondition Failed : Etag Mismatch", blobName));
638+
throw new IOException("Unable to upload object [" + blobName + "] due to ETag mismatch", e);
639+
} else {
640+
IOException exception = new IOException(
641+
String.format(Locale.ROOT, "S3 error during multipart upload [%s]: %s", blobName, e.getMessage()),
642+
e
643+
);
644+
etagListener.onFailure(exception);
645+
throw exception;
646+
}
647+
} catch (SdkException e) {
648+
IOException exception = new IOException(String.format(Locale.ROOT, "S3 multipart upload failed for [%s]", blobName), e);
649+
etagListener.onFailure(exception);
650+
throw exception;
651+
} catch (Exception e) {
652+
IOException exception = new IOException(
653+
String.format(Locale.ROOT, "Unexpected error during multipart upload [%s]: %s", blobName, e.getMessage()),
654+
e
655+
);
656+
etagListener.onFailure(exception);
657+
throw exception;
658+
} finally {
659+
if (!success && Strings.hasLength(uploadId.get())) {
660+
AbortMultipartUploadRequest abortRequest = AbortMultipartUploadRequest.builder()
661+
.bucket(bucketName)
662+
.key(blobName)
663+
.uploadId(uploadId.get())
664+
.build();
665+
try (AmazonS3Reference abortClient = blobStore.clientReference()) {
666+
SocketAccess.doPrivilegedVoid(() -> abortClient.get().abortMultipartUpload(abortRequest));
667+
} catch (Exception abortException) {
668+
logger.warn(
669+
"Failed to abort incomplete multipart upload [{}] with ID [{}]. "
670+
+ "This may result in orphaned S3 data and charges.",
671+
new Object[] { blobName, uploadId.get() },
672+
abortException
673+
);
674+
}
675+
}
676+
}
677+
}
678+
512679
/**
513680
* Uploads a blob using a single upload request
514681
*/

0 commit comments

Comments
 (0)