22
22
import com .datastax .oss .driver .api .core .config .DriverExecutionProfile ;
23
23
import com .datastax .oss .driver .api .core .metadata .Metadata ;
24
24
import com .datastax .oss .driver .api .core .session .Session ;
25
+ import com .datastax .oss .driver .api .core .type .codec .TypeCodec ;
26
+ import com .datastax .oss .driver .internal .core .type .codec .registry .DefaultCodecRegistry ;
25
27
import com .google .common .collect .Maps ;
28
+ import com .ing .data .cassandra .jdbc .codec .BigintToBigDecimalCodec ;
29
+ import com .ing .data .cassandra .jdbc .codec .DecimalToDoubleCodec ;
30
+ import com .ing .data .cassandra .jdbc .codec .FloatToDoubleCodec ;
31
+ import com .ing .data .cassandra .jdbc .codec .IntToLongCodec ;
32
+ import com .ing .data .cassandra .jdbc .codec .LongToIntCodec ;
33
+ import com .ing .data .cassandra .jdbc .codec .SmallintToIntCodec ;
34
+ import com .ing .data .cassandra .jdbc .codec .TimestampToLongCodec ;
35
+ import com .ing .data .cassandra .jdbc .codec .TinyintToIntCodec ;
36
+ import com .ing .data .cassandra .jdbc .codec .VarintToIntCodec ;
37
+ import com .ing .data .cassandra .jdbc .optionset .Default ;
38
+ import com .ing .data .cassandra .jdbc .optionset .OptionSet ;
26
39
import org .apache .commons .lang3 .StringUtils ;
27
40
import org .slf4j .Logger ;
28
41
import org .slf4j .LoggerFactory ;
29
42
30
43
import java .sql .Connection ;
31
44
import java .sql .DatabaseMetaData ;
32
- import java .sql .ResultSet ;
33
45
import java .sql .SQLException ;
34
46
import java .sql .SQLFeatureNotSupportedException ;
35
47
import java .sql .SQLNonTransientConnectionException ;
38
50
import java .sql .Statement ;
39
51
import java .util .ArrayList ;
40
52
import java .util .HashMap ;
53
+ import java .util .Iterator ;
41
54
import java .util .List ;
42
55
import java .util .Map ;
43
56
import java .util .Properties ;
57
+ import java .util .ServiceLoader ;
44
58
import java .util .Set ;
45
59
import java .util .concurrent .ConcurrentMap ;
46
60
import java .util .concurrent .ConcurrentSkipListSet ;
55
69
import static com .ing .data .cassandra .jdbc .Utils .NO_TRANSACTIONS ;
56
70
import static com .ing .data .cassandra .jdbc .Utils .PROTOCOL ;
57
71
import static com .ing .data .cassandra .jdbc .Utils .TAG_ACTIVE_CQL_VERSION ;
72
+ import static com .ing .data .cassandra .jdbc .Utils .TAG_COMPLIANCE_MODE ;
58
73
import static com .ing .data .cassandra .jdbc .Utils .TAG_CONSISTENCY_LEVEL ;
59
74
import static com .ing .data .cassandra .jdbc .Utils .TAG_CQL_VERSION ;
60
75
import static com .ing .data .cassandra .jdbc .Utils .TAG_DATABASE_NAME ;
@@ -117,6 +132,7 @@ public class CassandraConnection extends AbstractConnection implements Connectio
117
132
private final boolean debugMode ;
118
133
private Properties clientInfo ;
119
134
private volatile boolean isClosed ;
135
+ private final OptionSet optionSet ;
120
136
121
137
/**
122
138
* Instantiates a new JDBC connection to a Cassandra cluster.
@@ -134,6 +150,7 @@ public CassandraConnection(final SessionHolder sessionHolder) throws SQLExceptio
134
150
this .clientInfo = new Properties ();
135
151
this .url = PROTOCOL .concat (createSubName (sessionProperties ));
136
152
this .currentKeyspace = sessionProperties .getProperty (TAG_DATABASE_NAME );
153
+ this .optionSet = lookupOptionSet (sessionProperties .getProperty (TAG_COMPLIANCE_MODE ));
137
154
this .username = sessionProperties .getProperty (TAG_USER ,
138
155
defaultConfigProfile .getString (DefaultDriverOption .AUTH_PROVIDER_USER_NAME , StringUtils .EMPTY ));
139
156
final String cqlVersion = sessionProperties .getProperty (TAG_CQL_VERSION , DEFAULT_CQL_VERSION );
@@ -166,6 +183,45 @@ public CassandraConnection(final SessionHolder sessionHolder) throws SQLExceptio
166
183
});
167
184
}
168
185
186
+ /**
187
+ * Instantiates a new JDBC connection to a Cassandra cluster using preexisting session.
188
+ * @param cSession Session to use
189
+ * @param currentKeyspace Keyspace to use
190
+ * @param defaultConsistencyLevel Consistency level
191
+ * @param debugMode Debug mode flag
192
+ * @param optionSet Compliance mode option set
193
+ */
194
+ public CassandraConnection (final Session cSession , final String currentKeyspace ,
195
+ final ConsistencyLevel defaultConsistencyLevel ,
196
+ final boolean debugMode , final OptionSet optionSet ) {
197
+ this .sessionHolder = null ;
198
+ this .connectionProperties = new Properties ();
199
+
200
+ if (optionSet == null ) {
201
+ this .optionSet = lookupOptionSet (null );
202
+ } else {
203
+ this .optionSet = optionSet ;
204
+ }
205
+
206
+ this .currentKeyspace = currentKeyspace ;
207
+ this .cSession = cSession ;
208
+ this .metadata = cSession .getMetadata ();
209
+ this .defaultConsistencyLevel = defaultConsistencyLevel ;
210
+ this .debugMode = debugMode ;
211
+ final List <TypeCodec <?>> codecs = new ArrayList <>();
212
+ codecs .add (new TimestampToLongCodec ());
213
+ codecs .add (new LongToIntCodec ());
214
+ codecs .add (new IntToLongCodec ());
215
+ codecs .add (new BigintToBigDecimalCodec ());
216
+ codecs .add (new DecimalToDoubleCodec ());
217
+ codecs .add (new FloatToDoubleCodec ());
218
+ codecs .add (new VarintToIntCodec ());
219
+ codecs .add (new SmallintToIntCodec ());
220
+ codecs .add (new TinyintToIntCodec ());
221
+
222
+ codecs .forEach (codec -> ((DefaultCodecRegistry ) cSession .getContext ().getCodecRegistry ()).register (codec ));
223
+ }
224
+
169
225
/**
170
226
* Checks whether the connection is closed.
171
227
*
@@ -187,7 +243,9 @@ public void clearWarnings() throws SQLException {
187
243
188
244
@ Override
189
245
public void close () throws SQLException {
190
- this .sessionHolder .release ();
246
+ if (sessionHolder != null ) {
247
+ this .sessionHolder .release ();
248
+ }
191
249
this .isClosed = true ;
192
250
}
193
251
@@ -240,20 +298,7 @@ public void setAutoCommit(final boolean autoCommit) throws SQLException {
240
298
@ Override
241
299
public String getCatalog () throws SQLException {
242
300
checkNotClosed ();
243
-
244
- // It requires a query to table system.local since DataStax driver 4+.
245
- // If the query fails, return null.
246
- try (final Statement stmt = createStatement ()) {
247
- final ResultSet rs = stmt .executeQuery ("SELECT cluster_name FROM system.local" );
248
- if (rs .next ()) {
249
- return rs .getString ("cluster_name" );
250
- }
251
- } catch (final SQLException e ) {
252
- LOG .warn ("Unable to retrieve the cluster name." , e );
253
- return null ;
254
- }
255
-
256
- return null ;
301
+ return optionSet .getCatalog ();
257
302
}
258
303
259
304
@ Override
@@ -504,4 +549,25 @@ public <T> T unwrap(final Class<T> iface) throws SQLException {
504
549
throw new SQLFeatureNotSupportedException (String .format (NO_INTERFACE , iface .getSimpleName ()));
505
550
}
506
551
552
+
553
+ public OptionSet getOptionSet () {
554
+ return optionSet ;
555
+ }
556
+
557
+ private OptionSet lookupOptionSet (final String property ) {
558
+ final ServiceLoader <OptionSet > loader = ServiceLoader
559
+ .load (OptionSet .class );
560
+ final Iterator <OptionSet > iterator = loader .iterator ();
561
+ while (iterator .hasNext ()) {
562
+ final OptionSet optionSet = iterator .next ();
563
+ if (optionSet .getClass ().getSimpleName ().equalsIgnoreCase (property )) {
564
+ optionSet .setConnection (this );
565
+ return optionSet ;
566
+ }
567
+ }
568
+ final OptionSet optionSet = new Default ();
569
+ optionSet .setConnection (this );
570
+ return optionSet ;
571
+ }
572
+
507
573
}
0 commit comments