Skip to content

Commit bbda09f

Browse files
author
Sachin Kale
committed
Add support to clone existing pinned timestamp
Signed-off-by: Sachin Kale <[email protected]>
1 parent 5c4dcf9 commit bbda09f

File tree

3 files changed

+146
-7
lines changed

3 files changed

+146
-7
lines changed

server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStorePinnedTimestampsIT.java

Lines changed: 89 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88

99
package org.opensearch.remotestore;
1010

11+
import org.opensearch.action.LatchedActionListener;
1112
import org.opensearch.common.collect.Tuple;
1213
import org.opensearch.common.settings.Settings;
1314
import org.opensearch.common.unit.TimeValue;
@@ -17,6 +18,7 @@
1718
import org.opensearch.test.OpenSearchIntegTestCase;
1819

1920
import java.util.Set;
21+
import java.util.concurrent.CountDownLatch;
2022

2123
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
2224
public class RemoteStorePinnedTimestampsIT extends RemoteStoreBaseIntegTestCase {
@@ -75,10 +77,25 @@ public void testTimestampPinUnpin() throws Exception {
7577

7678
remoteStorePinnedTimestampService.rescheduleAsyncUpdatePinnedTimestampTask(TimeValue.timeValueMinutes(3));
7779

78-
// This should be a no-op as pinning entity is different
79-
remoteStorePinnedTimestampService.unpinTimestamp(timestamp1, "no-snapshot", noOpActionListener);
8080
// Unpinning already pinned entity
8181
remoteStorePinnedTimestampService.unpinTimestamp(timestamp2, "ss3", noOpActionListener);
82+
83+
// This should fail as timestamp is not pinned by pinning entity
84+
CountDownLatch latch = new CountDownLatch(1);
85+
remoteStorePinnedTimestampService.unpinTimestamp(timestamp1, "no-snapshot", new LatchedActionListener<>(new ActionListener<Void>() {
86+
@Override
87+
public void onResponse(Void unused) {
88+
// onResponse should not get called.
89+
fail();
90+
}
91+
92+
@Override
93+
public void onFailure(Exception e) {
94+
assertTrue(e instanceof IllegalArgumentException);
95+
}
96+
}, latch));
97+
latch.await();
98+
8299
// Adding different entity to already pinned timestamp
83100
remoteStorePinnedTimestampService.pinTimestamp(timestamp3, "ss5", noOpActionListener);
84101

@@ -93,4 +110,74 @@ public void testTimestampPinUnpin() throws Exception {
93110

94111
remoteStorePinnedTimestampService.rescheduleAsyncUpdatePinnedTimestampTask(TimeValue.timeValueMinutes(3));
95112
}
113+
114+
public void testPinnedTimestampClone() throws Exception {
115+
prepareCluster(1, 1, INDEX_NAME, 0, 2);
116+
ensureGreen(INDEX_NAME);
117+
118+
RemoteStorePinnedTimestampService remoteStorePinnedTimestampService = internalCluster().getInstance(
119+
RemoteStorePinnedTimestampService.class,
120+
primaryNodeName(INDEX_NAME)
121+
);
122+
123+
long timestamp1 = System.currentTimeMillis() + 30000L;
124+
long timestamp2 = System.currentTimeMillis() + 60000L;
125+
long timestamp3 = System.currentTimeMillis() + 900000L;
126+
remoteStorePinnedTimestampService.pinTimestamp(timestamp1, "ss2", noOpActionListener);
127+
remoteStorePinnedTimestampService.pinTimestamp(timestamp2, "ss3", noOpActionListener);
128+
remoteStorePinnedTimestampService.pinTimestamp(timestamp3, "ss4", noOpActionListener);
129+
130+
// Clone timestamp1
131+
remoteStorePinnedTimestampService.cloneTimestamp(timestamp1, "ss2", "ss2-2", noOpActionListener);
132+
133+
// With clone, set of pinned timestamp will not change
134+
remoteStorePinnedTimestampService.rescheduleAsyncUpdatePinnedTimestampTask(TimeValue.timeValueSeconds(1));
135+
assertBusy(
136+
() -> assertEquals(Set.of(timestamp1, timestamp2, timestamp3), RemoteStorePinnedTimestampService.getPinnedTimestamps().v2())
137+
);
138+
remoteStorePinnedTimestampService.rescheduleAsyncUpdatePinnedTimestampTask(TimeValue.timeValueMinutes(3));
139+
140+
// Clone timestamp1 but provide invalid existing entity
141+
CountDownLatch latch = new CountDownLatch(1);
142+
remoteStorePinnedTimestampService.cloneTimestamp(
143+
timestamp1,
144+
"ss3",
145+
"ss2-3",
146+
new LatchedActionListener<>(new ActionListener<Void>() {
147+
@Override
148+
public void onResponse(Void unused) {
149+
// onResponse should not get called.
150+
fail();
151+
}
152+
153+
@Override
154+
public void onFailure(Exception e) {
155+
assertTrue(e instanceof IllegalArgumentException);
156+
}
157+
}, latch)
158+
);
159+
latch.await();
160+
161+
remoteStorePinnedTimestampService.rescheduleAsyncUpdatePinnedTimestampTask(TimeValue.timeValueSeconds(1));
162+
assertBusy(
163+
() -> assertEquals(Set.of(timestamp1, timestamp2, timestamp3), RemoteStorePinnedTimestampService.getPinnedTimestamps().v2())
164+
);
165+
remoteStorePinnedTimestampService.rescheduleAsyncUpdatePinnedTimestampTask(TimeValue.timeValueMinutes(3));
166+
167+
// Now we have timestamp1 pinned by 2 entities, unpin 1, this should not change set of pinned timestamps
168+
remoteStorePinnedTimestampService.unpinTimestamp(timestamp1, "ss2", noOpActionListener);
169+
170+
remoteStorePinnedTimestampService.rescheduleAsyncUpdatePinnedTimestampTask(TimeValue.timeValueSeconds(1));
171+
assertBusy(
172+
() -> assertEquals(Set.of(timestamp1, timestamp2, timestamp3), RemoteStorePinnedTimestampService.getPinnedTimestamps().v2())
173+
);
174+
remoteStorePinnedTimestampService.rescheduleAsyncUpdatePinnedTimestampTask(TimeValue.timeValueMinutes(3));
175+
176+
// Now unpin second entity as well, set of pinned timestamp should be reduced by 1
177+
remoteStorePinnedTimestampService.unpinTimestamp(timestamp1, "ss2-2", noOpActionListener);
178+
179+
remoteStorePinnedTimestampService.rescheduleAsyncUpdatePinnedTimestampTask(TimeValue.timeValueSeconds(1));
180+
assertBusy(() -> assertEquals(Set.of(timestamp2, timestamp3), RemoteStorePinnedTimestampService.getPinnedTimestamps().v2()));
181+
remoteStorePinnedTimestampService.rescheduleAsyncUpdatePinnedTimestampTask(TimeValue.timeValueMinutes(3));
182+
}
96183
}

server/src/main/java/org/opensearch/index/remote/RemoteStoreUtils.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -542,8 +542,11 @@ public static List<String> filterOutMetadataFilesBasedOnAge(
542542
if (RemoteStoreSettings.isPinnedTimestampsEnabled() == false) {
543543
return new ArrayList<>(metadataFiles);
544544
}
545-
long maximumAllowedTimestamp = lastSuccessfulFetchOfPinnedTimestamps - RemoteStoreSettings.getPinnedTimestampsLookbackInterval()
546-
.getMillis();
545+
// We allow now() - loopback interval to be pinned. Also, the actual pinning can take at most loopback interval
546+
// This means the pinned timestamp can be available for read after at most (2 * loopback interval)
547+
long maximumAllowedTimestamp = lastSuccessfulFetchOfPinnedTimestamps - (2 * RemoteStoreSettings
548+
.getPinnedTimestampsLookbackInterval()
549+
.getMillis());
547550
List<String> metadataFilesWithMinAge = new ArrayList<>();
548551
for (String metadataFileName : metadataFiles) {
549552
long metadataTimestamp = getTimestampFunction.apply(metadataFileName);

server/src/main/java/org/opensearch/node/remotestore/RemoteStorePinnedTimestampService.java

Lines changed: 52 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -108,13 +108,61 @@ public void pinTimestamp(long timestamp, String pinningEntity, ActionListener<Vo
108108
"Timestamp to be pinned is less than current timestamp - value of cluster.remote_store.pinned_timestamps.lookback_interval"
109109
);
110110
}
111+
long startTime = System.nanoTime();
111112
try {
112113
logger.debug("Pinning timestamp = {} against entity = {}", timestamp, pinningEntity);
113114
blobContainer.writeBlob(getBlobName(timestamp, pinningEntity), new ByteArrayInputStream(new byte[0]), 0, true);
115+
long elapsedTime = System.nanoTime() - startTime;
116+
if (elapsedTime > RemoteStoreSettings.getPinnedTimestampsLookbackInterval().nanos()) {
117+
String errorMessage = String.format(
118+
"Timestamp pinning took %s nanoseconds which is more than limit of %s nanoseconds, failing the operation",
119+
elapsedTime,
120+
RemoteStoreSettings.getPinnedTimestampsLookbackInterval().nanos()
121+
);
122+
unpinTimestamp(timestamp, pinningEntity, ActionListener.wrap(() -> listener.onFailure(new RuntimeException(errorMessage))));
123+
} else {
124+
listener.onResponse(null);
125+
}
126+
} catch (IOException e) {
127+
listener.onFailure(e);
128+
}
129+
}
130+
131+
/**
132+
* Clones a timestamp by creating a new pinning entity for an existing timestamp.
133+
*
134+
* This method attempts to create a new pinning entity for a given timestamp that is already
135+
* associated with an existing pinning entity. If the timestamp exists for the existing entity,
136+
* a new blob is created for the new pinning entity. If the timestamp doesn't exist for the
137+
* existing entity, the operation fails with an IllegalArgumentException.
138+
*
139+
* @param timestamp The timestamp to be cloned.
140+
* @param existingPinningEntity The name of the existing entity that has pinned the timestamp.
141+
* @param newPinningEntity The name of the new entity to pin the timestamp to.
142+
* @param listener An ActionListener that will be notified of the operation's success or failure.
143+
* On success, onResponse will be called with null. On failure, onFailure will
144+
* be called with the appropriate exception.
145+
*/
146+
public void cloneTimestamp(long timestamp, String existingPinningEntity, String newPinningEntity, ActionListener<Void> listener) {
147+
try {
148+
logger.debug(
149+
"cloning timestamp = {} with existing pinningEntity = {} with new pinningEntity = {}",
150+
timestamp,
151+
existingPinningEntity,
152+
newPinningEntity
153+
);
154+
String blobName = getBlobName(timestamp, existingPinningEntity);
155+
if (blobContainer.blobExists(blobName)) {
156+
logger.debug("Pinning timestamp = {} against entity = {}", timestamp, newPinningEntity);
157+
blobContainer.writeBlob(getBlobName(timestamp, newPinningEntity), new ByteArrayInputStream(new byte[0]), 0, true);
158+
listener.onResponse(null);
159+
} else {
160+
String errorMessage = String.format("Timestamp: %s is not pinned by existing entity: %s", timestamp, existingPinningEntity);
161+
listener.onFailure(new IllegalArgumentException(errorMessage));
162+
}
114163
} catch (IOException e) {
115164
listener.onFailure(e);
116165
}
117-
listener.onResponse(null);
118166
}
119167

120168
private String getBlobName(long timestamp, String pinningEntity) {
@@ -147,13 +195,14 @@ public void unpinTimestamp(long timestamp, String pinningEntity, ActionListener<
147195
String blobName = getBlobName(timestamp, pinningEntity);
148196
if (blobContainer.blobExists(blobName)) {
149197
blobContainer.deleteBlobsIgnoringIfNotExists(List.of(blobName));
198+
listener.onResponse(null);
150199
} else {
151-
logger.warn("Timestamp: {} is not pinned by entity: {}", timestamp, pinningEntity);
200+
String errorMessage = String.format("Timestamp: %s is not pinned by entity: %s", timestamp, pinningEntity);
201+
listener.onFailure(new IllegalArgumentException(errorMessage));
152202
}
153203
} catch (IOException e) {
154204
listener.onFailure(e);
155205
}
156-
listener.onResponse(null);
157206
}
158207

159208
@Override

0 commit comments

Comments
 (0)