Skip to content

Commit 553010a

Browse files
committed
Add support for multiple hosts with specific ports (#41)
1 parent 9ffb712 commit 553010a

File tree

11 files changed

+342
-150
lines changed

11 files changed

+342
-150
lines changed

CHANGELOG.md

+2
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
66

77
## Unreleased
88
### Added
9+
- Add support for connections with multiple contact points using different ports (see feature request
10+
[#41](https://github.com/ing-bank/cassandra-jdbc-wrapper/issues/41)).
911
- Handle additional types and conversions in the methods `CassandraPreparedStatement.setObject()`:
1012
- JDBC types `BLOB`, `CLOB`, `NCLOB` and Java types `java.sql.Blob`, `java.sql.Clob`, and `java.sql.NClob` handled as
1113
arrays of bytes (CQL type `blob`)

README.md

+14-2
Original file line numberDiff line numberDiff line change
@@ -114,15 +114,27 @@ Connect to a Cassandra cluster using the following arguments:
114114
please read the section "[Connecting to DBaaS](#connecting-to-dbaas)"; to use a configuration file, please read the
115115
section "[Using a configuration file](#using-a-configuration-file)")
116116

117-
You can give the driver any number of hosts you want separated by "--".
117+
You can give the driver any number of hosts you want separated by "--". You can optionally specify a port for each host.
118+
If only one port is specified after all the listed hosts, it applies to all hosts. If no port is specified at all, the
119+
default Cassandra port (9042) is used.
118120
They will be used as contact points for the driver to discover the entire cluster.
119121
Give enough hosts taking into account that some nodes may be unavailable upon establishing the JDBC connection.
120122

123+
Here are some examples of connection strings with single or multiple contact points:
124+
125+
| Valid JDBC URL | Contact points used for connection |
126+
|--------------------------------------------------------------|------------------------------------|
127+
| jdbc:cassandra://localhost/keyspace | localhost:9042 |
128+
| jdbc:cassandra://localhost:9043/keyspace | localhost:9043 |
129+
| jdbc:cassandra://host1--host2/keyspace | host1:9042, host2:9042 |
130+
| jdbc:cassandra://host1--host2:9043/keyspace | host1:9043, host2:9043 |
131+
| jdbc:cassandra://host1:9042--host2--host3:9043/keyspace | host1:9042, host2:9043, host3:9043 |
132+
121133
You also have to specify the name of the local data center to use when the default load balancing policy is defined
122134
(see paragraph below about load balancing policies) and no configuration file is specified.
123135

124136
Statements and prepared statements can be executed as with any JDBC driver, but note that queries must be expressed in
125-
CQL3.
137+
CQL3 (Cassandra Query Language).
126138

127139
Java example:
128140
```java

src/main/java/com/ing/data/cassandra/jdbc/CassandraDataSource.java

+40-54
Original file line numberDiff line numberDiff line change
@@ -17,30 +17,28 @@
1717

1818
import com.datastax.oss.driver.api.core.ConsistencyLevel;
1919
import com.datastax.oss.driver.internal.core.loadbalancing.DefaultLoadBalancingPolicy;
20-
import com.ing.data.cassandra.jdbc.utils.JdbcUrlUtil;
20+
import com.ing.data.cassandra.jdbc.utils.ContactPoint;
2121

2222
import javax.sql.ConnectionPoolDataSource;
2323
import javax.sql.DataSource;
2424
import java.io.PrintWriter;
2525
import java.sql.DriverManager;
2626
import java.sql.SQLException;
2727
import java.sql.SQLFeatureNotSupportedException;
28-
import java.sql.SQLNonTransientConnectionException;
28+
import java.util.Collections;
29+
import java.util.List;
2930
import java.util.Properties;
3031
import java.util.logging.Logger;
3132

32-
import static com.ing.data.cassandra.jdbc.utils.ErrorConstants.HOST_REQUIRED;
3333
import static com.ing.data.cassandra.jdbc.utils.ErrorConstants.NOT_SUPPORTED;
3434
import static com.ing.data.cassandra.jdbc.utils.ErrorConstants.NO_INTERFACE;
35-
import static com.ing.data.cassandra.jdbc.utils.JdbcUrlUtil.DEFAULT_PORT;
3635
import static com.ing.data.cassandra.jdbc.utils.JdbcUrlUtil.PROTOCOL;
3736
import static com.ing.data.cassandra.jdbc.utils.JdbcUrlUtil.TAG_CONSISTENCY_LEVEL;
37+
import static com.ing.data.cassandra.jdbc.utils.JdbcUrlUtil.TAG_CONTACT_POINTS;
3838
import static com.ing.data.cassandra.jdbc.utils.JdbcUrlUtil.TAG_CQL_VERSION;
3939
import static com.ing.data.cassandra.jdbc.utils.JdbcUrlUtil.TAG_DATABASE_NAME;
4040
import static com.ing.data.cassandra.jdbc.utils.JdbcUrlUtil.TAG_LOCAL_DATACENTER;
4141
import static com.ing.data.cassandra.jdbc.utils.JdbcUrlUtil.TAG_PASSWORD;
42-
import static com.ing.data.cassandra.jdbc.utils.JdbcUrlUtil.TAG_PORT_NUMBER;
43-
import static com.ing.data.cassandra.jdbc.utils.JdbcUrlUtil.TAG_SERVER_NAME;
4442
import static com.ing.data.cassandra.jdbc.utils.JdbcUrlUtil.TAG_USER;
4543
import static com.ing.data.cassandra.jdbc.utils.JdbcUrlUtil.createSubName;
4644

@@ -63,13 +61,9 @@ public class CassandraDataSource implements ConnectionPoolDataSource, DataSource
6361
*/
6462
protected static final String DATA_SOURCE_DESCRIPTION = "Cassandra Data Source";
6563
/**
66-
* The server host name where the data source is located.
64+
* The contact points of the data source.
6765
*/
68-
protected String serverName;
69-
/**
70-
* The port number of the data source, by default {@value JdbcUrlUtil#DEFAULT_PORT}.
71-
*/
72-
protected int portNumber = DEFAULT_PORT;
66+
protected List<ContactPoint> contactPoints;
7367
/**
7468
* The database name. In case of Cassandra, i.e. the keyspace used as data source.
7569
*/
@@ -110,10 +104,26 @@ public class CassandraDataSource implements ConnectionPoolDataSource, DataSource
110104
* @param user The username used to connect.
111105
* @param password The password used to connect.
112106
* @param consistency The consistency level.
107+
* @deprecated For removal. Use {@link #CassandraDataSource(List, String, String, String, String)} instead.
113108
*/
109+
@Deprecated
114110
public CassandraDataSource(final String host, final int port, final String keyspace, final String user,
115111
final String password, final String consistency) {
116-
this(host, port, keyspace, user, password, null, consistency, null);
112+
this(Collections.singletonList(ContactPoint.of(host, port)), keyspace, user, password, null, consistency, null);
113+
}
114+
115+
/**
116+
* Constructor.
117+
*
118+
* @param contactPoints The contact points.
119+
* @param keyspace The keyspace.
120+
* @param user The username used to connect.
121+
* @param password The password used to connect.
122+
* @param consistency The consistency level.
123+
*/
124+
public CassandraDataSource(final List<ContactPoint> contactPoints, final String keyspace, final String user,
125+
final String password, final String consistency) {
126+
this(contactPoints, keyspace, user, password, null, consistency, null);
117127
}
118128

119129
/**
@@ -126,34 +136,31 @@ public CassandraDataSource(final String host, final int port, final String keysp
126136
* @param password The password used to connect.
127137
* @param version The CQL version. Deprecated, do not use anymore.
128138
* @param consistency The consistency level.
129-
* @deprecated For removal. Use {@link #CassandraDataSource(String, int, String, String, String, String)} instead.
139+
* @deprecated For removal. Use {@link #CassandraDataSource(List, String, String, String, String)} instead.
130140
*/
131141
@Deprecated
132142
public CassandraDataSource(final String host, final int port, final String keyspace, final String user,
133143
final String password, final String version, final String consistency) {
134-
this(host, port, keyspace, user, password, version, consistency, null);
144+
this(Collections.singletonList(ContactPoint.of(host, port)),
145+
keyspace, user, password, version, consistency, null);
135146
}
136147

137148
/**
138149
* Constructor specifying a local datacenter (required to use {@link DefaultLoadBalancingPolicy}).
139150
*
140-
* @param host The host name.
141-
* @param port The port.
151+
* @param contactPoints The contact points.
142152
* @param keyspace The keyspace.
143153
* @param user The username used to connect.
144154
* @param password The password used to connect.
145155
* @param version The CQL version. Deprecated, do not use anymore.
146156
* @param consistency The consistency level.
147157
* @param localDataCenter The local datacenter.
148158
*/
149-
public CassandraDataSource(final String host, final int port, final String keyspace, final String user,
159+
public CassandraDataSource(final List<ContactPoint> contactPoints, final String keyspace, final String user,
150160
final String password, final String version, final String consistency,
151161
final String localDataCenter) {
152-
if (host != null) {
153-
setServerName(host);
154-
}
155-
if (port >= 0) {
156-
setPortNumber(port);
162+
if (contactPoints != null && !contactPoints.isEmpty()) {
163+
setContactPoints(contactPoints);
157164
}
158165
if (version != null) {
159166
setVersion(version);
@@ -179,21 +186,21 @@ public String getDescription() {
179186
}
180187

181188
/**
182-
* Gets the server host name where the data source is located.
189+
* Gets the contact points of the data source.
183190
*
184-
* @return The server host name where the data source is located.
191+
* @return The contact points of the data source.
185192
*/
186-
public String getServerName() {
187-
return this.serverName;
193+
public List<ContactPoint> getContactPoints() {
194+
return this.contactPoints;
188195
}
189196

190197
/**
191-
* Sets the server host name where the data source is located.
198+
* Sets the contact points of the data source.
192199
*
193-
* @param serverName The host name.
200+
* @param contactPoints The contact points of the data source.
194201
*/
195-
public void setServerName(final String serverName) {
196-
this.serverName = serverName;
202+
public void setContactPoints(final List<ContactPoint> contactPoints) {
203+
this.contactPoints = contactPoints;
197204
}
198205

199206
/**
@@ -246,24 +253,6 @@ public void setConsistency(final String consistency) {
246253
this.consistency = consistency;
247254
}
248255

249-
/**
250-
* Gets the port number of the data source.
251-
*
252-
* @return The port number of the data source.
253-
*/
254-
public int getPortNumber() {
255-
return this.portNumber;
256-
}
257-
258-
/**
259-
* Sets the port number of the data source.
260-
*
261-
* @param portNumber The port number of the data source.
262-
*/
263-
public void setPortNumber(final int portNumber) {
264-
this.portNumber = portNumber;
265-
}
266-
267256
/**
268257
* Gets the database name. In case of Cassandra, i.e. the keyspace used as data source.
269258
*
@@ -347,12 +336,9 @@ public CassandraConnection getConnection(final String user, final String passwor
347336
this.user = user;
348337
this.password = password;
349338

350-
if (this.serverName != null) {
351-
props.setProperty(TAG_SERVER_NAME, this.serverName);
352-
} else {
353-
throw new SQLNonTransientConnectionException(HOST_REQUIRED);
339+
if (this.contactPoints != null && !this.contactPoints.isEmpty()) {
340+
props.put(TAG_CONTACT_POINTS, this.contactPoints);
354341
}
355-
props.setProperty(TAG_PORT_NUMBER, String.valueOf(this.portNumber));
356342
if (this.databaseName != null) {
357343
props.setProperty(TAG_DATABASE_NAME, this.databaseName);
358344
}

src/main/java/com/ing/data/cassandra/jdbc/CassandraDriver.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import static com.ing.data.cassandra.jdbc.utils.DriverUtil.parseVersion;
3737
import static com.ing.data.cassandra.jdbc.utils.ErrorConstants.NOT_SUPPORTED;
3838
import static com.ing.data.cassandra.jdbc.utils.JdbcUrlUtil.PROTOCOL;
39+
import static com.ing.data.cassandra.jdbc.utils.JdbcUrlUtil.TAG_CONTACT_POINTS;
3940
import static com.ing.data.cassandra.jdbc.utils.JdbcUrlUtil.TAG_PASSWORD;
4041
import static com.ing.data.cassandra.jdbc.utils.JdbcUrlUtil.TAG_USER;
4142

@@ -75,7 +76,9 @@ public Connection connect(final String url, final Properties properties) throws
7576
final Enumeration<Object> keys = properties.keys();
7677
while (keys.hasMoreElements()) {
7778
final String key = (String) keys.nextElement();
78-
params.put(key, properties.getProperty(key));
79+
if (!TAG_CONTACT_POINTS.equals(key)) {
80+
params.put(key, properties.getProperty(key));
81+
}
7982
}
8083
params.put(SessionHolder.URL_KEY, url);
8184

src/main/java/com/ing/data/cassandra/jdbc/SessionHolder.java

+11-11
Original file line numberDiff line numberDiff line change
@@ -40,19 +40,18 @@
4040
import com.ing.data.cassandra.jdbc.codec.TimestampToLongCodec;
4141
import com.ing.data.cassandra.jdbc.codec.TinyintToIntCodec;
4242
import com.ing.data.cassandra.jdbc.codec.VarintToIntCodec;
43+
import com.ing.data.cassandra.jdbc.utils.ContactPoint;
4344
import org.apache.commons.lang3.StringUtils;
4445
import org.apache.commons.lang3.math.NumberUtils;
4546
import org.slf4j.Logger;
4647
import org.slf4j.LoggerFactory;
4748

4849
import java.io.File;
49-
import java.net.InetSocketAddress;
5050
import java.sql.SQLException;
5151
import java.sql.SQLNonTransientConnectionException;
5252
import java.time.Duration;
5353
import java.time.temporal.ChronoUnit;
5454
import java.util.ArrayList;
55-
import java.util.Arrays;
5655
import java.util.HashMap;
5756
import java.util.List;
5857
import java.util.Map;
@@ -70,18 +69,17 @@
7069
import static com.ing.data.cassandra.jdbc.utils.JdbcUrlUtil.TAG_CONFIG_FILE;
7170
import static com.ing.data.cassandra.jdbc.utils.JdbcUrlUtil.TAG_CONNECT_TIMEOUT;
7271
import static com.ing.data.cassandra.jdbc.utils.JdbcUrlUtil.TAG_CONSISTENCY_LEVEL;
72+
import static com.ing.data.cassandra.jdbc.utils.JdbcUrlUtil.TAG_CONTACT_POINTS;
7373
import static com.ing.data.cassandra.jdbc.utils.JdbcUrlUtil.TAG_DATABASE_NAME;
7474
import static com.ing.data.cassandra.jdbc.utils.JdbcUrlUtil.TAG_DEBUG;
7575
import static com.ing.data.cassandra.jdbc.utils.JdbcUrlUtil.TAG_ENABLE_SSL;
7676
import static com.ing.data.cassandra.jdbc.utils.JdbcUrlUtil.TAG_KEEP_ALIVE;
7777
import static com.ing.data.cassandra.jdbc.utils.JdbcUrlUtil.TAG_LOAD_BALANCING_POLICY;
7878
import static com.ing.data.cassandra.jdbc.utils.JdbcUrlUtil.TAG_LOCAL_DATACENTER;
7979
import static com.ing.data.cassandra.jdbc.utils.JdbcUrlUtil.TAG_PASSWORD;
80-
import static com.ing.data.cassandra.jdbc.utils.JdbcUrlUtil.TAG_PORT_NUMBER;
8180
import static com.ing.data.cassandra.jdbc.utils.JdbcUrlUtil.TAG_RECONNECT_POLICY;
8281
import static com.ing.data.cassandra.jdbc.utils.JdbcUrlUtil.TAG_REQUEST_TIMEOUT;
8382
import static com.ing.data.cassandra.jdbc.utils.JdbcUrlUtil.TAG_RETRY_POLICY;
84-
import static com.ing.data.cassandra.jdbc.utils.JdbcUrlUtil.TAG_SERVER_NAME;
8583
import static com.ing.data.cassandra.jdbc.utils.JdbcUrlUtil.TAG_SSL_ENGINE_FACTORY;
8684
import static com.ing.data.cassandra.jdbc.utils.JdbcUrlUtil.TAG_SSL_HOSTNAME_VERIFICATION;
8785
import static com.ing.data.cassandra.jdbc.utils.JdbcUrlUtil.TAG_TCP_NO_DELAY;
@@ -181,6 +179,7 @@ boolean acquire() {
181179
}
182180
}
183181

182+
@SuppressWarnings("unchecked")
184183
private Session createSession(final Properties properties) throws SQLException {
185184
File configurationFile = null;
186185
boolean configurationFileExists = false;
@@ -209,8 +208,7 @@ private Session createSession(final Properties properties) throws SQLException {
209208
}
210209
}
211210

212-
final String hosts = properties.getProperty(TAG_SERVER_NAME);
213-
final int port = Integer.parseInt(properties.getProperty(TAG_PORT_NUMBER));
211+
final List<ContactPoint> contactPoints = (List<ContactPoint>) properties.get(TAG_CONTACT_POINTS);
214212
final String cloudSecureConnectBundle = properties.getProperty(TAG_CLOUD_SECURE_CONNECT_BUNDLE);
215213
final String keyspace = properties.getProperty(TAG_DATABASE_NAME);
216214
final String username = properties.getProperty(TAG_USER, StringUtils.EMPTY);
@@ -243,12 +241,14 @@ private Session createSession(final Properties properties) throws SQLException {
243241
if (StringUtils.isNotBlank(cloudSecureConnectBundle)) {
244242
driverConfigLoaderBuilder.withString(DefaultDriverOption.CLOUD_SECURE_CONNECT_BUNDLE,
245243
cloudSecureConnectBundle);
246-
LOG.info("Cloud secure connect bundle used. Host(s) {} will be ignored.", hosts);
244+
LOG.info("Cloud secure connect bundle used. Host(s) {} will be ignored.",
245+
contactPoints.stream()
246+
.map(ContactPoint::toString)
247+
.collect(Collectors.joining(", ")));
247248
} else {
248-
builder.addContactPoints(Arrays.stream(hosts.split("--"))
249-
.map(host -> InetSocketAddress.createUnresolved(host, port))
250-
.collect(Collectors.toList())
251-
);
249+
builder.addContactPoints(contactPoints.stream()
250+
.map(ContactPoint::toInetSocketAddress)
251+
.collect(Collectors.toList()));
252252
}
253253

254254
// Set request timeout (in milliseconds) if defined.

0 commit comments

Comments
 (0)