Skip to content

Changes for session and schema #8

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 11 commits into from
Mar 24, 2022
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 66 additions & 16 deletions src/main/java/com/ing/data/cassandra/jdbc/CassandraConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,26 @@
import com.datastax.oss.driver.api.core.config.DriverExecutionProfile;
import com.datastax.oss.driver.api.core.metadata.Metadata;
import com.datastax.oss.driver.api.core.session.Session;
import com.datastax.oss.driver.api.core.type.codec.TypeCodec;
import com.datastax.oss.driver.internal.core.type.codec.registry.DefaultCodecRegistry;
import com.google.common.collect.Maps;
import com.ing.data.cassandra.jdbc.codec.BigintToBigDecimalCodec;
import com.ing.data.cassandra.jdbc.codec.DecimalToDoubleCodec;
import com.ing.data.cassandra.jdbc.codec.FloatToDoubleCodec;
import com.ing.data.cassandra.jdbc.codec.IntToLongCodec;
import com.ing.data.cassandra.jdbc.codec.LongToIntCodec;
import com.ing.data.cassandra.jdbc.codec.SmallintToIntCodec;
import com.ing.data.cassandra.jdbc.codec.TimestampToLongCodec;
import com.ing.data.cassandra.jdbc.codec.TinyintToIntCodec;
import com.ing.data.cassandra.jdbc.codec.VarintToIntCodec;
import com.ing.data.cassandra.jdbc.optionset.Default;
import com.ing.data.cassandra.jdbc.optionset.OptionSet;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.SQLFeatureNotSupportedException;
import java.sql.SQLNonTransientConnectionException;
Expand All @@ -38,9 +50,11 @@
import java.sql.Statement;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.ServiceLoader;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListSet;
Expand All @@ -55,6 +69,7 @@
import static com.ing.data.cassandra.jdbc.Utils.NO_TRANSACTIONS;
import static com.ing.data.cassandra.jdbc.Utils.PROTOCOL;
import static com.ing.data.cassandra.jdbc.Utils.TAG_ACTIVE_CQL_VERSION;
import static com.ing.data.cassandra.jdbc.Utils.TAG_COMPLIANCE_MODE;
import static com.ing.data.cassandra.jdbc.Utils.TAG_CONSISTENCY_LEVEL;
import static com.ing.data.cassandra.jdbc.Utils.TAG_CQL_VERSION;
import static com.ing.data.cassandra.jdbc.Utils.TAG_DATABASE_NAME;
Expand Down Expand Up @@ -117,6 +132,7 @@ public class CassandraConnection extends AbstractConnection implements Connectio
private final boolean debugMode;
private Properties clientInfo;
private volatile boolean isClosed;
private OptionSet optionSet;

/**
* Instantiates a new JDBC connection to a Cassandra cluster.
Expand All @@ -134,6 +150,7 @@ public CassandraConnection(final SessionHolder sessionHolder) throws SQLExceptio
this.clientInfo = new Properties();
this.url = PROTOCOL.concat(createSubName(sessionProperties));
this.currentKeyspace = sessionProperties.getProperty(TAG_DATABASE_NAME);
this.optionSet = lookupOptionSet(sessionProperties.getProperty(TAG_COMPLIANCE_MODE));
this.username = sessionProperties.getProperty(TAG_USER,
defaultConfigProfile.getString(DefaultDriverOption.AUTH_PROVIDER_USER_NAME, StringUtils.EMPTY));
final String cqlVersion = sessionProperties.getProperty(TAG_CQL_VERSION, DEFAULT_CQL_VERSION);
Expand Down Expand Up @@ -166,6 +183,29 @@ public CassandraConnection(final SessionHolder sessionHolder) throws SQLExceptio
});
}

public CassandraConnection(Session cSession, String currentKeyspace, ConsistencyLevel defaultConsistencyLevel, boolean debugMode) {
this.sessionHolder = null;
this.connectionProperties = new Properties();
this.optionSet = optionSet == null ? lookupOptionSet(null) : optionSet;
this.currentKeyspace = currentKeyspace;
this.cSession = cSession;
this.metadata = cSession.getMetadata();
this.defaultConsistencyLevel = defaultConsistencyLevel;
this.debugMode = debugMode;
final List<TypeCodec<?>> codecs = new ArrayList<>();
codecs.add(new TimestampToLongCodec());
codecs.add(new LongToIntCodec());
codecs.add(new IntToLongCodec());
codecs.add(new BigintToBigDecimalCodec());
codecs.add(new DecimalToDoubleCodec());
codecs.add(new FloatToDoubleCodec());
codecs.add(new VarintToIntCodec());
codecs.add(new SmallintToIntCodec());
codecs.add(new TinyintToIntCodec());

codecs.forEach(codec -> ((DefaultCodecRegistry) cSession.getContext().getCodecRegistry()).register(codec));
}

/**
* Checks whether the connection is closed.
*
Expand All @@ -187,7 +227,9 @@ public void clearWarnings() throws SQLException {

@Override
public void close() throws SQLException {
this.sessionHolder.release();
if (sessionHolder!= null) {
this.sessionHolder.release();
}
this.isClosed = true;
}

Expand Down Expand Up @@ -240,20 +282,7 @@ public void setAutoCommit(final boolean autoCommit) throws SQLException {
@Override
public String getCatalog() throws SQLException {
checkNotClosed();

// It requires a query to table system.local since DataStax driver 4+.
// If the query fails, return null.
try (final Statement stmt = createStatement()) {
final ResultSet rs = stmt.executeQuery("SELECT cluster_name FROM system.local");
if (rs.next()) {
return rs.getString("cluster_name");
}
} catch (final SQLException e) {
LOG.warn("Unable to retrieve the cluster name.", e);
return null;
}

return null;
return optionSet.getCatalog();
}

@Override
Expand Down Expand Up @@ -504,4 +533,25 @@ public <T> T unwrap(final Class<T> iface) throws SQLException {
throw new SQLFeatureNotSupportedException(String.format(NO_INTERFACE, iface.getSimpleName()));
}


public OptionSet getOptionSet() {
return optionSet;
}

private OptionSet lookupOptionSet(String property) {
ServiceLoader<OptionSet> loader = ServiceLoader
.load(OptionSet.class);
Iterator<OptionSet> iterator = loader.iterator();
while (iterator.hasNext()) {
OptionSet optionSet = iterator.next();
if (optionSet.getClass().getSimpleName().equalsIgnoreCase(property)) {
optionSet.setConnection(this);
return optionSet;
}
}
OptionSet optionSet = new Default();
optionSet.setConnection(this);
return optionSet;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -416,8 +416,7 @@ public ResultSet executeQuery(final String cql) throws SQLException {
public int executeUpdate(final String cql) throws SQLException {
checkNotClosed();
doExecute(cql);
// There is no updateCount available in Datastax Java driver, so return 0.
return 0;
return connection.getOptionSet().getSQLUpdateResponse();
}

@Override
Expand Down
1 change: 1 addition & 0 deletions src/main/java/com/ing/data/cassandra/jdbc/Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ public final class Utils {
public static final String TAG_CLOUD_SECURE_CONNECT_BUNDLE = "secureConnectBundle";
public static final String TAG_CONFIG_FILE = "configFile";
public static final String TAG_REQUEST_TIMEOUT = "requestTimeout";
public static final String TAG_COMPLIANCE_MODE = "complianceMode";

public static final String JSSE_TRUSTSTORE_PROPERTY = "javax.net.ssl.trustStore";
public static final String JSSE_TRUSTSTORE_PASSWORD_PROPERTY = "javax.net.ssl.trustStorePassword";
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package com.ing.data.cassandra.jdbc.optionset;

import com.ing.data.cassandra.jdbc.CassandraConnection;

public abstract class AbstractOptionSet implements OptionSet {

private CassandraConnection connection;

@Override
public CassandraConnection getConnection() {
return connection;
}

public void setConnection(CassandraConnection connection) {
this.connection = connection;
}
}
35 changes: 35 additions & 0 deletions src/main/java/com/ing/data/cassandra/jdbc/optionset/Default.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package com.ing.data.cassandra.jdbc.optionset;

import com.ing.data.cassandra.jdbc.CassandraConnection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

public class Default extends AbstractOptionSet {
private static final Logger LOG = LoggerFactory.getLogger(AbstractOptionSet.class);

@Override
public String getCatalog() {
// It requires a query to table system.local since DataStax driver 4+.
// If the query fails, return null.
try (final Statement stmt = getConnection().createStatement()) {
final ResultSet rs = stmt.executeQuery("SELECT cluster_name FROM system.local");
if (rs.next()) {
return rs.getString("cluster_name");
}
} catch (final SQLException e) {
LOG.warn("Unable to retrieve the cluster name.", e);
return null;
}

return null;
}

@Override
public int getSQLUpdateResponse() {
return 0;
}
}
14 changes: 14 additions & 0 deletions src/main/java/com/ing/data/cassandra/jdbc/optionset/Liquibase.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package com.ing.data.cassandra.jdbc.optionset;

public class Liquibase extends AbstractOptionSet {

@Override
public String getCatalog() {
return null;
}

@Override
public int getSQLUpdateResponse() {
return -1;
}
}
32 changes: 32 additions & 0 deletions src/main/java/com/ing/data/cassandra/jdbc/optionset/OptionSet.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package com.ing.data.cassandra.jdbc.optionset;

import com.ing.data.cassandra.jdbc.CassandraConnection;

/**
* Option Set for compliance mode.
* Different use cases require one or more adjustments to the wrapper, to be compatible.
* Thus, OptionSet would provide convenience to set for different flavours.
*
*/
public interface OptionSet {
/**
* There is no Catalog concept in cassandra. Different flavour requires different response.
*
* @return
*/
String getCatalog();

/**
* There is no updateCount available in Datastax Java driver, different flavour requires different response.
*
* @return
*/
int getSQLUpdateResponse();

/**
* Referenced connection. See @{@link AbstractOptionSet}
* @param connection
*/
void setConnection(CassandraConnection connection);
CassandraConnection getConnection();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
com.ing.data.cassandra.jdbc.optionset.Liquibase
com.ing.data.cassandra.jdbc.optionset.Default