Skip to content

Commit 874981e

Browse files
authored
Merge branch 'main' into index-stats-issue-10766
Signed-off-by: Andrew Ross <[email protected]>
2 parents 9e6d8fc + d9661c0 commit 874981e

File tree

13 files changed

+800
-0
lines changed

13 files changed

+800
-0
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
1212
- Create equivalents of JSM's AccessController in the java agent ([#18346](https://github.com/opensearch-project/OpenSearch/issues/18346))
1313
- Introduced a new cluster-level API to fetch remote store metadata (segments and translogs) for each shard of an index. ([#18257](https://github.com/opensearch-project/OpenSearch/pull/18257))
1414
- Add last index request timestamp columns to the `_cat/indices` API. ([#10766](https://github.com/opensearch-project/OpenSearch/issues/10766))
15+
- Introduce a new pull-based ingestion plugin for file-based indexing (for local testing) ([#18591](https://github.com/opensearch-project/OpenSearch/pull/18591))
1516
- Add support for search pipeline in search and msearch template ([#18564](https://github.com/opensearch-project/OpenSearch/pull/18564))
1617

1718
### Changed

plugins/ingestion-fs/README.md

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
# ingestion-fs plugin
2+
3+
The ingestion-fs plugin enables pull-based ingestion from the local file system into OpenSearch. It is intended primarily for local testing and development, so that pull-based ingestion can be exercised without setting up Kafka or Kinesis.
4+
5+
## Overview
6+
7+
This plugin implements a custom ingestion source for the [pull-based ingestion framework](https://docs.opensearch.org/docs/latest/api-reference/document-apis/pull-based-ingestion/). It allows OpenSearch to ingest documents from `.ndjson` files on the file system.
8+
9+
Each shard-specific file is expected to follow the path:
10+
```
11+
${base_directory}/${stream}/${shard_id}.ndjson
12+
```
13+
14+
## Usage
15+
16+
### 1. Prepare test data
17+
18+
Create the `ndjson` files with sample data following the format mentioned [here](https://docs.opensearch.org/docs/latest/api-reference/document-apis/pull-based-ingestion/).
19+
For example, create a file `${base_directory}/test-stream/0.ndjson` with the following data:
20+
21+
```
22+
{"_id":"1","_version":"1","_op_type":"index","_source":{"name":"name1", "age": 30}}
23+
{"_id":"2","_version":"1","_op_type":"index","_source":{"name":"name2", "age": 31}}
24+
```
25+
26+
### 2. Start OpenSearch with the Plugin
27+
28+
```
29+
./gradlew run -PinstalledPlugins="['ingestion-fs']"
30+
```
31+
32+
### 3. Create a pull-based index
33+
34+
Create an index by specifying the ingestion source settings as follows:
35+
36+
<pre>
37+
PUT /test-index
38+
{
39+
"settings": {
40+
"ingestion_source": {
41+
"type": "file",
42+
"param": {
43+
"stream": "test-stream",
44+
"base_directory": "path to the base directory"
45+
}
46+
},
47+
"index.number_of_shards": 1,
48+
"index.number_of_replicas": 0,
49+
"index": {
50+
"replication.type": "SEGMENT"
51+
}
52+
},
53+
"mappings": {
54+
"properties": {
55+
"name": {
56+
"type": "text"
57+
},
58+
"age": {
59+
"type": "integer"
60+
}
61+
}
62+
}
63+
}
64+
</pre>

plugins/ingestion-fs/build.gradle

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
apply plugin: 'opensearch.internal-cluster-test'
10+
11+
opensearchplugin {
12+
description = 'Pull-based ingestion plugin to read from local files for testing'
13+
classname = 'org.opensearch.plugin.ingestion.fs.FilePlugin'
14+
}
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.plugin.ingestion.fs;
10+
11+
import org.opensearch.index.IngestionConsumerFactory;
12+
13+
import java.util.Map;
14+
15+
/**
16+
* Factory for creating file-based ingestion consumers.
17+
*/
18+
public class FileConsumerFactory implements IngestionConsumerFactory<FilePartitionConsumer, FileOffset> {
19+
20+
private FileSourceConfig config;
21+
22+
/**
23+
* Initialize a FileConsumerFactory for file-based indexing.
24+
*/
25+
public FileConsumerFactory() {}
26+
27+
@Override
28+
public void initialize(Map<String, Object> params) {
29+
this.config = new FileSourceConfig(params);
30+
}
31+
32+
@Override
33+
public FilePartitionConsumer createShardConsumer(String clientId, int shardId) {
34+
assert config != null;
35+
return new FilePartitionConsumer(config, shardId);
36+
}
37+
38+
@Override
39+
public FileOffset parsePointerFromString(String pointer) {
40+
return new FileOffset(Long.parseLong(pointer));
41+
}
42+
}
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.plugin.ingestion.fs;
10+
11+
import org.opensearch.index.Message;
12+
13+
/**
14+
* Message abstraction for file-based ingestion.
15+
*/
16+
public class FileMessage implements Message<byte[]> {
17+
private final byte[] payload;
18+
private final Long timestamp;
19+
20+
/**
21+
* Create a file message.
22+
* @param payload Line contents from the file, as a byte array.
23+
* @param timestamp Millisecond timestamp for when the line was read.
24+
*/
25+
public FileMessage(byte[] payload, Long timestamp) {
26+
this.payload = payload;
27+
this.timestamp = timestamp;
28+
}
29+
30+
@Override
31+
public byte[] getPayload() {
32+
return payload;
33+
}
34+
35+
@Override
36+
public Long getTimestamp() {
37+
return timestamp;
38+
}
39+
}
Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.plugin.ingestion.fs;
10+
11+
import org.apache.lucene.document.Field;
12+
import org.apache.lucene.document.LongPoint;
13+
import org.apache.lucene.search.Query;
14+
import org.opensearch.index.IngestionShardPointer;
15+
16+
import java.nio.ByteBuffer;
17+
18+
/**
19+
* Offset for a file-based ingestion source (line number).
20+
*/
21+
public class FileOffset implements IngestionShardPointer {
22+
23+
private final long line;
24+
25+
/**
26+
* Create a new file offset based on line number.
27+
* @param line line number (offset)
28+
*/
29+
public FileOffset(long line) {
30+
assert line >= 0;
31+
this.line = line;
32+
}
33+
34+
/**
35+
* Returns the line number (offset).
36+
*/
37+
public long getLine() {
38+
return line;
39+
}
40+
41+
@Override
42+
public byte[] serialize() {
43+
ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES);
44+
buffer.putLong(line);
45+
return buffer.array();
46+
}
47+
48+
@Override
49+
public String asString() {
50+
return Long.toString(line);
51+
}
52+
53+
@Override
54+
public Field asPointField(String fieldName) {
55+
return new LongPoint(fieldName, line);
56+
}
57+
58+
@Override
59+
public Query newRangeQueryGreaterThan(String fieldName) {
60+
return LongPoint.newRangeQuery(fieldName, line, Long.MAX_VALUE);
61+
}
62+
63+
@Override
64+
public int compareTo(IngestionShardPointer o) {
65+
if (o == null || !(o instanceof FileOffset)) {
66+
throw new IllegalArgumentException("Incompatible pointer type: " + (o == null ? "null" : o.getClass()));
67+
}
68+
return Long.compare(this.line, ((FileOffset) o).line);
69+
}
70+
71+
@Override
72+
public boolean equals(Object o) {
73+
if (this == o) return true;
74+
if (!(o instanceof FileOffset)) return false;
75+
return this.line == ((FileOffset) o).line;
76+
}
77+
78+
@Override
79+
public int hashCode() {
80+
return Long.hashCode(line);
81+
}
82+
83+
@Override
84+
public String toString() {
85+
return "FileOffset{" + "line=" + line + '}';
86+
}
87+
}

0 commit comments

Comments
 (0)