Skip to content

Changes for session and schema #8

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 11 commits into from
Mar 24, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 82 additions & 16 deletions src/main/java/com/ing/data/cassandra/jdbc/CassandraConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,26 @@
import com.datastax.oss.driver.api.core.config.DriverExecutionProfile;
import com.datastax.oss.driver.api.core.metadata.Metadata;
import com.datastax.oss.driver.api.core.session.Session;
import com.datastax.oss.driver.api.core.type.codec.TypeCodec;
import com.datastax.oss.driver.internal.core.type.codec.registry.DefaultCodecRegistry;
import com.google.common.collect.Maps;
import com.ing.data.cassandra.jdbc.codec.BigintToBigDecimalCodec;
import com.ing.data.cassandra.jdbc.codec.DecimalToDoubleCodec;
import com.ing.data.cassandra.jdbc.codec.FloatToDoubleCodec;
import com.ing.data.cassandra.jdbc.codec.IntToLongCodec;
import com.ing.data.cassandra.jdbc.codec.LongToIntCodec;
import com.ing.data.cassandra.jdbc.codec.SmallintToIntCodec;
import com.ing.data.cassandra.jdbc.codec.TimestampToLongCodec;
import com.ing.data.cassandra.jdbc.codec.TinyintToIntCodec;
import com.ing.data.cassandra.jdbc.codec.VarintToIntCodec;
import com.ing.data.cassandra.jdbc.optionset.Default;
import com.ing.data.cassandra.jdbc.optionset.OptionSet;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.SQLFeatureNotSupportedException;
import java.sql.SQLNonTransientConnectionException;
Expand All @@ -38,9 +50,11 @@
import java.sql.Statement;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.ServiceLoader;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListSet;
Expand All @@ -55,6 +69,7 @@
import static com.ing.data.cassandra.jdbc.Utils.NO_TRANSACTIONS;
import static com.ing.data.cassandra.jdbc.Utils.PROTOCOL;
import static com.ing.data.cassandra.jdbc.Utils.TAG_ACTIVE_CQL_VERSION;
import static com.ing.data.cassandra.jdbc.Utils.TAG_COMPLIANCE_MODE;
import static com.ing.data.cassandra.jdbc.Utils.TAG_CONSISTENCY_LEVEL;
import static com.ing.data.cassandra.jdbc.Utils.TAG_CQL_VERSION;
import static com.ing.data.cassandra.jdbc.Utils.TAG_DATABASE_NAME;
Expand Down Expand Up @@ -117,6 +132,7 @@ public class CassandraConnection extends AbstractConnection implements Connectio
private final boolean debugMode;
private Properties clientInfo;
private volatile boolean isClosed;
private final OptionSet optionSet;

/**
* Instantiates a new JDBC connection to a Cassandra cluster.
Expand All @@ -134,6 +150,7 @@ public CassandraConnection(final SessionHolder sessionHolder) throws SQLExceptio
this.clientInfo = new Properties();
this.url = PROTOCOL.concat(createSubName(sessionProperties));
this.currentKeyspace = sessionProperties.getProperty(TAG_DATABASE_NAME);
this.optionSet = lookupOptionSet(sessionProperties.getProperty(TAG_COMPLIANCE_MODE));
this.username = sessionProperties.getProperty(TAG_USER,
defaultConfigProfile.getString(DefaultDriverOption.AUTH_PROVIDER_USER_NAME, StringUtils.EMPTY));
final String cqlVersion = sessionProperties.getProperty(TAG_CQL_VERSION, DEFAULT_CQL_VERSION);
Expand Down Expand Up @@ -166,6 +183,45 @@ public CassandraConnection(final SessionHolder sessionHolder) throws SQLExceptio
});
}

/**
* Instantiates a new JDBC connection to a Cassandra cluster using preexisting session.
* @param cSession Session to use
* @param currentKeyspace Keyspace to use
* @param defaultConsistencyLevel Consistency level
* @param debugMode Debug mode flag
* @param optionSet Compliance mode option set
*/
public CassandraConnection(final Session cSession, final String currentKeyspace,
final ConsistencyLevel defaultConsistencyLevel,
final boolean debugMode, final OptionSet optionSet) {
this.sessionHolder = null;
this.connectionProperties = new Properties();

if (optionSet == null) {
this.optionSet = lookupOptionSet(null);
} else {
this.optionSet = optionSet;
}

this.currentKeyspace = currentKeyspace;
this.cSession = cSession;
this.metadata = cSession.getMetadata();
this.defaultConsistencyLevel = defaultConsistencyLevel;
this.debugMode = debugMode;
final List<TypeCodec<?>> codecs = new ArrayList<>();
codecs.add(new TimestampToLongCodec());
codecs.add(new LongToIntCodec());
codecs.add(new IntToLongCodec());
codecs.add(new BigintToBigDecimalCodec());
codecs.add(new DecimalToDoubleCodec());
codecs.add(new FloatToDoubleCodec());
codecs.add(new VarintToIntCodec());
codecs.add(new SmallintToIntCodec());
codecs.add(new TinyintToIntCodec());

codecs.forEach(codec -> ((DefaultCodecRegistry) cSession.getContext().getCodecRegistry()).register(codec));
}

/**
* Checks whether the connection is closed.
*
Expand All @@ -187,7 +243,9 @@ public void clearWarnings() throws SQLException {

@Override
public void close() throws SQLException {
this.sessionHolder.release();
if (sessionHolder != null) {
this.sessionHolder.release();
}
this.isClosed = true;
}

Expand Down Expand Up @@ -240,20 +298,7 @@ public void setAutoCommit(final boolean autoCommit) throws SQLException {
@Override
public String getCatalog() throws SQLException {
checkNotClosed();

// It requires a query to table system.local since DataStax driver 4+.
// If the query fails, return null.
try (final Statement stmt = createStatement()) {
final ResultSet rs = stmt.executeQuery("SELECT cluster_name FROM system.local");
if (rs.next()) {
return rs.getString("cluster_name");
}
} catch (final SQLException e) {
LOG.warn("Unable to retrieve the cluster name.", e);
return null;
}

return null;
return optionSet.getCatalog();
}

@Override
Expand Down Expand Up @@ -504,4 +549,25 @@ public <T> T unwrap(final Class<T> iface) throws SQLException {
throw new SQLFeatureNotSupportedException(String.format(NO_INTERFACE, iface.getSimpleName()));
}


public OptionSet getOptionSet() {
return optionSet;
}

private OptionSet lookupOptionSet(final String property) {
final ServiceLoader<OptionSet> loader = ServiceLoader
.load(OptionSet.class);
final Iterator<OptionSet> iterator = loader.iterator();
while (iterator.hasNext()) {
final OptionSet optionSet = iterator.next();
if (optionSet.getClass().getSimpleName().equalsIgnoreCase(property)) {
optionSet.setConnection(this);
return optionSet;
}
}
final OptionSet optionSet = new Default();
optionSet.setConnection(this);
return optionSet;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -416,8 +416,7 @@ public ResultSet executeQuery(final String cql) throws SQLException {
public int executeUpdate(final String cql) throws SQLException {
checkNotClosed();
doExecute(cql);
// There is no updateCount available in Datastax Java driver, so return 0.
return 0;
return connection.getOptionSet().getSQLUpdateResponse();
}

@Override
Expand Down
8 changes: 8 additions & 0 deletions src/main/java/com/ing/data/cassandra/jdbc/Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,10 @@ public final class Utils {
* JDBC URL parameter key for the configuration file.
*/
public static final String KEY_CONFIG_FILE = "configfile";
/**
* JDBC URL parameter key for the compliance mode.
*/
public static final String KEY_COMPLIANCE_MODE = "compliancemode";

public static final String TAG_USER = "user";
public static final String TAG_PASSWORD = "password";
Expand All @@ -135,6 +139,7 @@ public final class Utils {
public static final String TAG_CLOUD_SECURE_CONNECT_BUNDLE = "secureConnectBundle";
public static final String TAG_CONFIG_FILE = "configFile";
public static final String TAG_REQUEST_TIMEOUT = "requestTimeout";
public static final String TAG_COMPLIANCE_MODE = "complianceMode";

public static final String JSSE_TRUSTSTORE_PROPERTY = "javax.net.ssl.trustStore";
public static final String JSSE_TRUSTSTORE_PASSWORD_PROPERTY = "javax.net.ssl.trustStorePassword";
Expand Down Expand Up @@ -306,6 +311,9 @@ public static Properties parseURL(final String url) throws SQLException {
if (params.containsKey(KEY_CONFIG_FILE)) {
props.setProperty(TAG_CONFIG_FILE, params.get(KEY_CONFIG_FILE));
}
if (params.containsKey(KEY_COMPLIANCE_MODE)) {
props.setProperty(TAG_COMPLIANCE_MODE, params.get(KEY_COMPLIANCE_MODE));
}
} else if (isDbaasConnection) {
throw new SQLNonTransientConnectionException(SECURECONENCTBUNDLE_REQUIRED);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.ing.data.cassandra.jdbc.optionset;

import com.ing.data.cassandra.jdbc.CassandraConnection;

/**
* Abstract option set to set common parameter used by option sets.
*/
public abstract class AbstractOptionSet implements OptionSet {

private CassandraConnection connection;

@Override
public CassandraConnection getConnection() {
return connection;
}

public void setConnection(final CassandraConnection connection) {
this.connection = connection;
}
}
53 changes: 53 additions & 0 deletions src/main/java/com/ing/data/cassandra/jdbc/optionset/Default.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.ing.data.cassandra.jdbc.optionset;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

/**
* Default Option set.
*/

public class Default extends AbstractOptionSet {
private static final Logger LOG = LoggerFactory.getLogger(AbstractOptionSet.class);

@Override
public String getCatalog() {
// It requires a query to table system.local since DataStax driver 4+.
// If the query fails, return null.
try (final Statement stmt = getConnection().createStatement()) {
final ResultSet rs = stmt.executeQuery("SELECT cluster_name FROM system.local");
if (rs.next()) {
return rs.getString("cluster_name");
}
} catch (final SQLException e) {
LOG.warn("Unable to retrieve the cluster name.", e);
return null;
}

return null;
}

@Override
public int getSQLUpdateResponse() {
return 0;
}
}
33 changes: 33 additions & 0 deletions src/main/java/com/ing/data/cassandra/jdbc/optionset/Liquibase.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.ing.data.cassandra.jdbc.optionset;

/**
* Option Set for liquibase compatability and flavour of JDBC.
*/
public class Liquibase extends AbstractOptionSet {


@Override
public String getCatalog() {
return null;
}

@Override
public int getSQLUpdateResponse() {
return -1;
}
}
53 changes: 53 additions & 0 deletions src/main/java/com/ing/data/cassandra/jdbc/optionset/OptionSet.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.ing.data.cassandra.jdbc.optionset;

import com.ing.data.cassandra.jdbc.CassandraConnection;

/**
* Option Set for compliance mode.
* Different use cases require one or more adjustments to the wrapper, to be compatible.
* Thus, OptionSet would provide convenience to set for different flavours.
*
*/
public interface OptionSet {
/**
* There is no Catalog concept in cassandra. Different flavour requires different response.
*
* @return Catalog
*/
String getCatalog();

/**
* There is no updateCount available in Datastax Java driver, different flavour requires different response.
*
* @return Predefined update response
*/
int getSQLUpdateResponse();

/**
* Set referenced connection. See @{@link AbstractOptionSet}
* @param connection Connection to set
*/
void setConnection(CassandraConnection connection);

/**
* Get referenced connection. See @{@link AbstractOptionSet}
*
* @return referenced connection
*/
CassandraConnection getConnection();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
com.ing.data.cassandra.jdbc.optionset.Liquibase
com.ing.data.cassandra.jdbc.optionset.Default
Loading