Skip to content

Commit 82d55c6

Browse files
sachinpkale (Sachin Kale)
authored and committed
Upload segment to remote store post refresh (#3403)
Signed-off-by: Sachin Kale <[email protected]>
1 parent f2a3889 commit 82d55c6

File tree

2 files changed

+192
-3
lines changed

2 files changed

+192
-3
lines changed

server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java

Lines changed: 53 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -8,10 +8,19 @@
88

99
package org.opensearch.index.shard;
1010

11+
import org.apache.logging.log4j.LogManager;
12+
import org.apache.logging.log4j.Logger;
13+
import org.apache.logging.log4j.message.ParameterizedMessage;
1114
import org.apache.lucene.search.ReferenceManager;
1215
import org.apache.lucene.store.Directory;
16+
import org.apache.lucene.store.IOContext;
1317

1418
import java.io.IOException;
19+
import java.nio.file.NoSuchFileException;
20+
import java.util.Arrays;
21+
import java.util.HashSet;
22+
import java.util.Set;
23+
import java.util.stream.Collectors;
1524

1625
/**
1726
* RefreshListener implementation to upload newly created segment files to the remote store
@@ -20,19 +29,60 @@ public class RemoteStoreRefreshListener implements ReferenceManager.RefreshListe
2029

2130
// Local directory whose files are listed after a refresh and used as the copy source.
private final Directory storeDirectory;
2231
// Remote directory that files are copied to and deleted from.
private final Directory remoteDirectory;
32+
// ToDo: This can be a map with metadata of the uploaded file as value of the map (GitHub #3398)
33+
// Names of the files tracked as present in the remote directory; seeded from remoteDirectory.listAll()
// in the constructor and updated after each successful upload or delete.
private final Set<String> filesUploadedToRemoteStore;
34+
// Logger used to report upload and delete failures.
private static final Logger logger = LogManager.getLogger(RemoteStoreRefreshListener.class);
2335

24-
public RemoteStoreRefreshListener(Directory storeDirectory, Directory remoteDirectory) {
36+
public RemoteStoreRefreshListener(Directory storeDirectory, Directory remoteDirectory) throws IOException {
2537
this.storeDirectory = storeDirectory;
2638
this.remoteDirectory = remoteDirectory;
39+
// ToDo: Handle failures in reading list of files (GitHub #3397)
40+
this.filesUploadedToRemoteStore = new HashSet<>(Arrays.asList(remoteDirectory.listAll()));
2741
}
2842

2943
@Override
3044
public void beforeRefresh() throws IOException {
31-
// ToDo Add implementation
45+
// Do Nothing
3246
}
3347

48+
/**
49+
* Upload new segment files created as part of the last refresh to the remote segment store.
50+
* The method also deletes segment files from remote store which are not part of local filesystem.
51+
* @param didRefresh true if the refresh opened a new reference
52+
* @throws IOException in case of I/O error in reading list of local files
53+
*/
3454
@Override
3555
public void afterRefresh(boolean didRefresh) throws IOException {
36-
// ToDo Add implementation
56+
if (didRefresh) {
57+
Set<String> localFiles = Arrays.stream(storeDirectory.listAll()).collect(Collectors.toSet());
58+
localFiles.stream().filter(file -> !filesUploadedToRemoteStore.contains(file)).forEach(file -> {
59+
try {
60+
remoteDirectory.copyFrom(storeDirectory, file, file, IOContext.DEFAULT);
61+
filesUploadedToRemoteStore.add(file);
62+
} catch (NoSuchFileException e) {
63+
logger.info(
64+
() -> new ParameterizedMessage("The file {} does not exist anymore. It can happen in case of temp files", file),
65+
e
66+
);
67+
} catch (IOException e) {
68+
// ToDO: Handle transient and permanent un-availability of the remote store (GitHub #3397)
69+
logger.warn(() -> new ParameterizedMessage("Exception while uploading file {} to the remote segment store", file), e);
70+
}
71+
});
72+
73+
Set<String> remoteFilesToBeDeleted = new HashSet<>();
74+
// ToDo: Instead of deleting files in sync, mark them and delete in async/periodic flow (GitHub #3142)
75+
filesUploadedToRemoteStore.stream().filter(file -> !localFiles.contains(file)).forEach(file -> {
76+
try {
77+
remoteDirectory.deleteFile(file);
78+
remoteFilesToBeDeleted.add(file);
79+
} catch (IOException e) {
80+
// ToDO: Handle transient and permanent un-availability of the remote store (GitHub #3397)
81+
logger.warn(() -> new ParameterizedMessage("Exception while deleting file {} from the remote segment store", file), e);
82+
}
83+
});
84+
85+
remoteFilesToBeDeleted.forEach(filesUploadedToRemoteStore::remove);
86+
}
3787
}
3888
}
Lines changed: 139 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,139 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.index.shard;
10+
11+
import org.apache.lucene.store.Directory;
12+
import org.apache.lucene.store.IOContext;
13+
import org.opensearch.test.OpenSearchTestCase;
14+
15+
import java.io.IOException;
16+
import java.nio.file.NoSuchFileException;
17+
18+
import static org.mockito.ArgumentMatchers.any;
19+
import static org.mockito.Mockito.mock;
20+
import static org.mockito.Mockito.when;
21+
import static org.mockito.Mockito.verify;
22+
import static org.mockito.Mockito.times;
23+
import static org.mockito.Mockito.doThrow;
24+
25+
public class RemoteStoreRefreshListenerTests extends OpenSearchTestCase {
26+
private Directory storeDirectory;
27+
private Directory remoteDirectory;
28+
29+
private RemoteStoreRefreshListener remoteStoreRefreshListener;
30+
31+
public void setup(String[] remoteFiles) throws IOException {
32+
storeDirectory = mock(Directory.class);
33+
remoteDirectory = mock(Directory.class);
34+
when(remoteDirectory.listAll()).thenReturn(remoteFiles);
35+
remoteStoreRefreshListener = new RemoteStoreRefreshListener(storeDirectory, remoteDirectory);
36+
}
37+
38+
public void testAfterRefreshFalse() throws IOException {
39+
setup(new String[0]);
40+
remoteStoreRefreshListener.afterRefresh(false);
41+
verify(storeDirectory, times(0)).listAll();
42+
}
43+
44+
public void testAfterRefreshTrueNoLocalFiles() throws IOException {
45+
setup(new String[0]);
46+
47+
when(storeDirectory.listAll()).thenReturn(new String[0]);
48+
49+
remoteStoreRefreshListener.afterRefresh(true);
50+
verify(storeDirectory).listAll();
51+
verify(remoteDirectory, times(0)).copyFrom(any(), any(), any(), any());
52+
verify(remoteDirectory, times(0)).deleteFile(any());
53+
}
54+
55+
public void testAfterRefreshOnlyUploadFiles() throws IOException {
56+
setup(new String[0]);
57+
58+
String[] localFiles = new String[] { "segments_1", "0.si", "0.cfs", "0.cfe" };
59+
when(storeDirectory.listAll()).thenReturn(localFiles);
60+
61+
remoteStoreRefreshListener.afterRefresh(true);
62+
verify(storeDirectory).listAll();
63+
verify(remoteDirectory).copyFrom(storeDirectory, "segments_1", "segments_1", IOContext.DEFAULT);
64+
verify(remoteDirectory).copyFrom(storeDirectory, "0.si", "0.si", IOContext.DEFAULT);
65+
verify(remoteDirectory).copyFrom(storeDirectory, "0.cfs", "0.cfs", IOContext.DEFAULT);
66+
verify(remoteDirectory).copyFrom(storeDirectory, "0.cfe", "0.cfe", IOContext.DEFAULT);
67+
verify(remoteDirectory, times(0)).deleteFile(any());
68+
}
69+
70+
public void testAfterRefreshOnlyUploadAndDelete() throws IOException {
71+
setup(new String[] { "0.si", "0.cfs" });
72+
73+
String[] localFiles = new String[] { "segments_1", "1.si", "1.cfs", "1.cfe" };
74+
when(storeDirectory.listAll()).thenReturn(localFiles);
75+
76+
remoteStoreRefreshListener.afterRefresh(true);
77+
verify(storeDirectory).listAll();
78+
verify(remoteDirectory).copyFrom(storeDirectory, "segments_1", "segments_1", IOContext.DEFAULT);
79+
verify(remoteDirectory).copyFrom(storeDirectory, "1.si", "1.si", IOContext.DEFAULT);
80+
verify(remoteDirectory).copyFrom(storeDirectory, "1.cfs", "1.cfs", IOContext.DEFAULT);
81+
verify(remoteDirectory).copyFrom(storeDirectory, "1.cfe", "1.cfe", IOContext.DEFAULT);
82+
verify(remoteDirectory).deleteFile("0.si");
83+
verify(remoteDirectory).deleteFile("0.cfs");
84+
}
85+
86+
public void testAfterRefreshOnlyDelete() throws IOException {
87+
setup(new String[] { "0.si", "0.cfs" });
88+
89+
String[] localFiles = new String[] { "0.si" };
90+
when(storeDirectory.listAll()).thenReturn(localFiles);
91+
92+
remoteStoreRefreshListener.afterRefresh(true);
93+
verify(storeDirectory).listAll();
94+
verify(remoteDirectory, times(0)).copyFrom(any(), any(), any(), any());
95+
verify(remoteDirectory).deleteFile("0.cfs");
96+
}
97+
98+
public void testAfterRefreshTempLocalFile() throws IOException {
99+
setup(new String[0]);
100+
101+
String[] localFiles = new String[] { "segments_1", "0.si", "0.cfs.tmp" };
102+
when(storeDirectory.listAll()).thenReturn(localFiles);
103+
doThrow(new NoSuchFileException("0.cfs.tmp")).when(remoteDirectory)
104+
.copyFrom(storeDirectory, "0.cfs.tmp", "0.cfs.tmp", IOContext.DEFAULT);
105+
106+
remoteStoreRefreshListener.afterRefresh(true);
107+
verify(storeDirectory).listAll();
108+
verify(remoteDirectory).copyFrom(storeDirectory, "segments_1", "segments_1", IOContext.DEFAULT);
109+
verify(remoteDirectory).copyFrom(storeDirectory, "0.si", "0.si", IOContext.DEFAULT);
110+
verify(remoteDirectory, times(0)).deleteFile(any());
111+
}
112+
113+
public void testAfterRefreshConsecutive() throws IOException {
114+
setup(new String[0]);
115+
116+
String[] localFiles = new String[] { "segments_1", "0.si", "0.cfs", "0.cfe" };
117+
when(storeDirectory.listAll()).thenReturn(localFiles);
118+
doThrow(new IOException("0.cfs")).when(remoteDirectory).copyFrom(storeDirectory, "0.cfs", "0.cfe", IOContext.DEFAULT);
119+
doThrow(new IOException("0.cfe")).when(remoteDirectory).copyFrom(storeDirectory, "0.cfe", "0.cfe", IOContext.DEFAULT);
120+
121+
remoteStoreRefreshListener.afterRefresh(true);
122+
verify(storeDirectory).listAll();
123+
verify(remoteDirectory).copyFrom(storeDirectory, "segments_1", "segments_1", IOContext.DEFAULT);
124+
verify(remoteDirectory).copyFrom(storeDirectory, "0.si", "0.si", IOContext.DEFAULT);
125+
verify(remoteDirectory).copyFrom(storeDirectory, "0.cfs", "0.cfs", IOContext.DEFAULT);
126+
verify(remoteDirectory).copyFrom(storeDirectory, "0.cfe", "0.cfe", IOContext.DEFAULT);
127+
verify(remoteDirectory, times(0)).deleteFile(any());
128+
129+
String[] localFilesSecondRefresh = new String[] { "segments_1", "0.cfs", "1.cfs", "1.cfe" };
130+
when(storeDirectory.listAll()).thenReturn(localFilesSecondRefresh);
131+
132+
remoteStoreRefreshListener.afterRefresh(true);
133+
134+
verify(remoteDirectory).copyFrom(storeDirectory, "0.cfs", "0.cfs", IOContext.DEFAULT);
135+
verify(remoteDirectory).copyFrom(storeDirectory, "1.cfs", "1.cfs", IOContext.DEFAULT);
136+
verify(remoteDirectory).copyFrom(storeDirectory, "1.cfe", "1.cfe", IOContext.DEFAULT);
137+
verify(remoteDirectory).deleteFile("0.si");
138+
}
139+
}

0 commit comments

Comments
 (0)