Skip to content

Commit e280866

Browse files
authored
Move DataSourceServiceImpl to core module (opensearch-project#1084)
Signed-off-by: Peng Huo <[email protected]>
1 parent fef20f8 commit e280866

File tree

24 files changed

+686
-448
lines changed

24 files changed

+686
-448
lines changed

core/src/main/java/org/opensearch/sql/datasource/DataSourceService.java

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -7,34 +7,37 @@
77

88
import java.util.Set;
99
import org.opensearch.sql.datasource.model.DataSource;
10-
import org.opensearch.sql.storage.StorageEngine;
10+
import org.opensearch.sql.datasource.model.DataSourceMetadata;
1111

1212
/**
13-
* DataSource Service manages datasources.
13+
* DataSource Service manage {@link DataSource}.
1414
*/
1515
public interface DataSourceService {
1616

1717
/**
18-
* Returns all datasource objects.
18+
* Returns all DataSource objects.
1919
*
20-
* @return DataSource datasources.
20+
* @return set of {@link DataSource}.
2121
*/
2222
Set<DataSource> getDataSources();
2323

2424
/**
25-
* Returns DataSource with corresponding to the datasource name.
25+
* Returns {@link DataSource} with corresponding to the DataSource name.
2626
*
27-
* @param dataSourceName Name of the datasource.
28-
* @return DataSource datasource.
27+
* @param dataSourceName Name of the {@link DataSource}.
28+
* @return {@link DataSource}.
2929
*/
3030
DataSource getDataSource(String dataSourceName);
3131

3232
/**
33-
* Default opensearch engine is not defined in datasources config.
34-
* So the registration of default datasource happens separately.
33+
* Register {@link DataSource} defined by {@link DataSourceMetadata}.
3534
*
36-
* @param storageEngine StorageEngine.
35+
* @param metadatas list of {@link DataSourceMetadata}.
3736
*/
38-
void registerDefaultOpenSearchDataSource(StorageEngine storageEngine);
37+
void addDataSource(DataSourceMetadata... metadatas);
3938

39+
/**
40+
* remove all the registered {@link DataSource}.
41+
*/
42+
void clear();
4043
}
Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.sql.datasource;
7+
8+
import com.google.common.base.Preconditions;
9+
import com.google.common.base.Strings;
10+
import java.util.Map;
11+
import java.util.Objects;
12+
import java.util.Set;
13+
import java.util.concurrent.ConcurrentHashMap;
14+
import java.util.stream.Collectors;
15+
import org.opensearch.sql.common.utils.StringUtils;
16+
import org.opensearch.sql.datasource.model.DataSource;
17+
import org.opensearch.sql.datasource.model.DataSourceMetadata;
18+
import org.opensearch.sql.datasource.model.DataSourceType;
19+
import org.opensearch.sql.storage.DataSourceFactory;
20+
21+
/**
22+
* Default implementation of {@link DataSourceService}. It is per-jvm single instance.
23+
*
24+
* <p>{@link DataSourceService} is constructed by the list of {@link DataSourceFactory} at service
25+
* bootstrap time. The set of {@link DataSourceFactory} is immutable. Client could add {@link
26+
* DataSource} defined by {@link DataSourceMetadata} at any time. {@link DataSourceService} use
27+
* {@link DataSourceFactory} to create {@link DataSource}.
28+
*/
29+
public class DataSourceServiceImpl implements DataSourceService {
30+
31+
private static String DATASOURCE_NAME_REGEX = "[@*A-Za-z]+?[*a-zA-Z_\\-0-9]*";
32+
33+
private final ConcurrentHashMap<String, DataSource> dataSourceMap;
34+
35+
private final Map<DataSourceType, DataSourceFactory> dataSourceFactoryMap;
36+
37+
/**
38+
* Construct from the set of {@link DataSourceFactory} at bootstrap time.
39+
*/
40+
public DataSourceServiceImpl(Set<DataSourceFactory> dataSourceFactories) {
41+
dataSourceFactoryMap =
42+
dataSourceFactories.stream()
43+
.collect(Collectors.toMap(DataSourceFactory::getDataSourceType, f -> f));
44+
dataSourceMap = new ConcurrentHashMap<>();
45+
}
46+
47+
@Override
48+
public Set<DataSource> getDataSources() {
49+
return Set.copyOf(dataSourceMap.values());
50+
}
51+
52+
@Override
53+
public DataSource getDataSource(String dataSourceName) {
54+
if (!dataSourceMap.containsKey(dataSourceName)) {
55+
throw new IllegalArgumentException(
56+
String.format("DataSource with name %s doesn't exist.", dataSourceName));
57+
}
58+
return dataSourceMap.get(dataSourceName);
59+
}
60+
61+
@Override
62+
public void addDataSource(DataSourceMetadata... metadatas) {
63+
for (DataSourceMetadata metadata : metadatas) {
64+
validateDataSourceMetaData(metadata);
65+
dataSourceMap.put(
66+
metadata.getName(),
67+
dataSourceFactoryMap.get(metadata.getConnector()).createDataSource(metadata));
68+
}
69+
}
70+
71+
@Override
72+
public void clear() {
73+
dataSourceMap.clear();
74+
}
75+
76+
/**
77+
* This can be moved to a different validator class when we introduce more connectors.
78+
*
79+
* @param metadata {@link DataSourceMetadata}.
80+
*/
81+
private void validateDataSourceMetaData(DataSourceMetadata metadata) {
82+
Preconditions.checkArgument(
83+
!Strings.isNullOrEmpty(metadata.getName()),
84+
"Missing Name Field from a DataSource. Name is a required parameter.");
85+
Preconditions.checkArgument(
86+
!dataSourceMap.containsKey(metadata.getName()),
87+
StringUtils.format(
88+
"Datasource name should be unique, Duplicate datasource found %s.",
89+
metadata.getName()));
90+
Preconditions.checkArgument(
91+
metadata.getName().matches(DATASOURCE_NAME_REGEX),
92+
StringUtils.format(
93+
"DataSource Name: %s contains illegal characters. Allowed characters: a-zA-Z0-9_-*@.",
94+
metadata.getName()));
95+
Preconditions.checkArgument(
96+
!Objects.isNull(metadata.getProperties()),
97+
"Missing properties field in catalog configuration. Properties are required parameters.");
98+
}
99+
}

core/src/main/java/org/opensearch/sql/datasource/model/DataSource.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,14 +12,17 @@
1212
import lombok.RequiredArgsConstructor;
1313
import org.opensearch.sql.storage.StorageEngine;
1414

15+
/**
16+
* Each user configured datasource mapping to one instance of DataSource per JVM.
17+
*/
1518
@Getter
1619
@RequiredArgsConstructor
1720
@EqualsAndHashCode
1821
public class DataSource {
1922

2023
private final String name;
2124

22-
private final ConnectorType connectorType;
25+
private final DataSourceType connectorType;
2326

2427
@EqualsAndHashCode.Exclude
2528
private final StorageEngine storageEngine;

core/src/main/java/org/opensearch/sql/datasource/model/DataSourceMetadata.java

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,26 +5,44 @@
55

66
package org.opensearch.sql.datasource.model;
77

8+
9+
import static org.opensearch.sql.analysis.DataSourceSchemaIdentifierNameResolver.DEFAULT_DATASOURCE_NAME;
10+
811
import com.fasterxml.jackson.annotation.JsonFormat;
912
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
1013
import com.fasterxml.jackson.annotation.JsonProperty;
14+
import com.google.common.collect.ImmutableMap;
1115
import java.util.Map;
16+
import lombok.EqualsAndHashCode;
1217
import lombok.Getter;
1318
import lombok.Setter;
19+
import org.opensearch.sql.datasource.DataSourceService;
1420

1521
@JsonIgnoreProperties(ignoreUnknown = true)
1622
@Getter
1723
@Setter
24+
@EqualsAndHashCode
1825
public class DataSourceMetadata {
1926

2027
@JsonProperty(required = true)
2128
private String name;
2229

2330
@JsonProperty(required = true)
2431
@JsonFormat(with = JsonFormat.Feature.ACCEPT_CASE_INSENSITIVE_PROPERTIES)
25-
private ConnectorType connector;
32+
private DataSourceType connector;
2633

2734
@JsonProperty(required = true)
2835
private Map<String, String> properties;
2936

37+
/**
38+
* Default OpenSearch {@link DataSourceMetadata}. Which is used to register default OpenSearch
39+
* {@link DataSource} to {@link DataSourceService}.
40+
*/
41+
public static DataSourceMetadata defaultOpenSearchDataSourceMetadata() {
42+
DataSourceMetadata dataSourceMetadata = new DataSourceMetadata();
43+
dataSourceMetadata.setName(DEFAULT_DATASOURCE_NAME);
44+
dataSourceMetadata.setConnector(DataSourceType.OPENSEARCH);
45+
dataSourceMetadata.setProperties(ImmutableMap.of());
46+
return dataSourceMetadata;
47+
}
3048
}

core/src/main/java/org/opensearch/sql/datasource/model/ConnectorType.java renamed to core/src/main/java/org/opensearch/sql/datasource/model/DataSourceType.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,6 @@
55

66
package org.opensearch.sql.datasource.model;
77

8-
public enum ConnectorType {
8+
public enum DataSourceType {
99
PROMETHEUS,OPENSEARCH
1010
}
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/*
2+
*
3+
* * Copyright OpenSearch Contributors
4+
* * SPDX-License-Identifier: Apache-2.0
5+
*
6+
*/
7+
8+
package org.opensearch.sql.storage;
9+
10+
import org.opensearch.sql.datasource.DataSourceService;
11+
import org.opensearch.sql.datasource.model.DataSource;
12+
import org.opensearch.sql.datasource.model.DataSourceMetadata;
13+
import org.opensearch.sql.datasource.model.DataSourceType;
14+
15+
/**
16+
* {@link DataSourceFactory} is used to create {@link DataSource} from {@link DataSourceMetadata}.
17+
* Each data source define {@link DataSourceFactory} and register to {@link DataSourceService}.
18+
* {@link DataSourceFactory} is one instance per JVM . Each {@link DataSourceType} mapping to one
19+
* {@link DataSourceFactory}.
20+
*/
21+
public interface DataSourceFactory {
22+
/**
23+
* Get {@link DataSourceType}.
24+
*/
25+
DataSourceType getDataSourceType();
26+
27+
/**
28+
* Create {@link DataSource}.
29+
*/
30+
DataSource createDataSource(DataSourceMetadata metadata);
31+
}

core/src/main/java/org/opensearch/sql/storage/StorageEngineFactory.java

Lines changed: 0 additions & 19 deletions
This file was deleted.

core/src/test/java/org/opensearch/sql/analysis/AnalyzerTestBase.java

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,10 @@
2222
import org.opensearch.sql.config.TestConfig;
2323
import org.opensearch.sql.data.type.ExprType;
2424
import org.opensearch.sql.datasource.DataSourceService;
25-
import org.opensearch.sql.datasource.model.ConnectorType;
2625
import org.opensearch.sql.datasource.model.DataSource;
26+
import org.opensearch.sql.datasource.model.DataSourceMetadata;
27+
import org.opensearch.sql.datasource.model.DataSourceType;
2728
import org.opensearch.sql.exception.ExpressionEvaluationException;
28-
import org.opensearch.sql.expression.DSL;
2929
import org.opensearch.sql.expression.Expression;
3030
import org.opensearch.sql.expression.ReferenceExpression;
3131
import org.opensearch.sql.expression.env.Environment;
@@ -144,9 +144,7 @@ protected Environment<Expression, ExprType> typeEnv() {
144144
@Bean
145145
protected Analyzer analyzer(ExpressionAnalyzer expressionAnalyzer,
146146
DataSourceService dataSourceService,
147-
StorageEngine storageEngine,
148147
Table table) {
149-
dataSourceService.registerDefaultOpenSearchDataSource(storageEngine);
150148
BuiltinFunctionRepository functionRepository = BuiltinFunctionRepository.getInstance();
151149
functionRepository.register("prometheus", new FunctionResolver() {
152150

@@ -195,7 +193,7 @@ private class DefaultDataSourceService implements DataSourceService {
195193

196194
private StorageEngine storageEngine = storageEngine();
197195
private final DataSource dataSource
198-
= new DataSource("prometheus", ConnectorType.PROMETHEUS, storageEngine);
196+
= new DataSource("prometheus", DataSourceType.PROMETHEUS, storageEngine);
199197

200198

201199
@Override
@@ -209,8 +207,13 @@ public DataSource getDataSource(String dataSourceName) {
209207
}
210208

211209
@Override
212-
public void registerDefaultOpenSearchDataSource(StorageEngine storageEngine) {
213-
this.storageEngine = storageEngine;
210+
public void addDataSource(DataSourceMetadata... metadatas) {
211+
throw new UnsupportedOperationException();
212+
}
213+
214+
@Override
215+
public void clear() {
216+
throw new UnsupportedOperationException();
214217
}
215218
}
216219

0 commit comments

Comments
 (0)