@@ -13,7 +13,6 @@ use std::collections::BTreeMap;
13
13
use std:: convert:: { self , TryInto } ;
14
14
use std:: sync:: { Arc , Mutex } ;
15
15
16
- use anyhow:: bail;
17
16
use rdkafka:: client:: ClientContext ;
18
17
use rdkafka:: consumer:: { BaseConsumer , Consumer , ConsumerContext } ;
19
18
use rdkafka:: { Offset , TopicPartitionList } ;
@@ -29,6 +28,7 @@ use mz_storage::client::connections::{
29
28
} ;
30
29
31
30
use crate :: normalize:: SqlValueOrSecret ;
31
+ use crate :: plan:: PlanError ;
32
32
33
33
enum ValType {
34
34
String { transform : fn ( String ) -> String } ,
@@ -96,7 +96,7 @@ impl Config {
96
96
fn extract (
97
97
input : & mut BTreeMap < String , SqlValueOrSecret > ,
98
98
configs : & [ Config ] ,
99
- ) -> Result < BTreeMap < String , StringOrSecret > , anyhow :: Error > {
99
+ ) -> Result < BTreeMap < String , StringOrSecret > , PlanError > {
100
100
let mut out = BTreeMap :: new ( ) ;
101
101
for config in configs {
102
102
// Look for config.name
@@ -109,7 +109,7 @@ fn extract(
109
109
Ok ( parsed_n) if * lower <= parsed_n && parsed_n <= * upper => {
110
110
StringOrSecret :: String ( n. to_string ( ) )
111
111
}
112
- _ => bail ! ( "must be a number between {} and {}" , lower, upper) ,
112
+ _ => sql_bail ! ( "must be a number between {} and {}" , lower, upper) ,
113
113
}
114
114
}
115
115
( Some ( SqlValueOrSecret :: Value ( Value :: String ( s) ) ) , ValType :: String { transform } ) => {
@@ -128,14 +128,14 @@ fn extract(
128
128
None => continue ,
129
129
} ,
130
130
( Some ( SqlValueOrSecret :: Value ( v) ) , _) => {
131
- bail ! (
131
+ sql_bail ! (
132
132
"Invalid WITH option {}={}: unexpected value type" ,
133
133
config. name,
134
134
v
135
135
) ;
136
136
}
137
137
( Some ( SqlValueOrSecret :: Secret ( _) ) , _) => {
138
- bail ! (
138
+ sql_bail ! (
139
139
"WITH option {} does not accept secret references" ,
140
140
config. name
141
141
) ;
@@ -158,7 +158,7 @@ fn extract(
158
158
/// `sql_parser::ast::Value::String`.
159
159
pub fn extract_config (
160
160
with_options : & mut BTreeMap < String , SqlValueOrSecret > ,
161
- ) -> anyhow :: Result < BTreeMap < String , StringOrSecret > > {
161
+ ) -> Result < BTreeMap < String , StringOrSecret > , PlanError > {
162
162
extract (
163
163
with_options,
164
164
& [
@@ -222,7 +222,7 @@ pub async fn create_consumer(
222
222
options : & BTreeMap < String , StringOrSecret > ,
223
223
librdkafka_log_level : tracing:: Level ,
224
224
secrets_reader : & SecretsReader ,
225
- ) -> Result < Arc < BaseConsumer < KafkaErrCheckContext > > , anyhow :: Error > {
225
+ ) -> Result < Arc < BaseConsumer < KafkaErrCheckContext > > , PlanError > {
226
226
let mut config = create_new_client_config ( librdkafka_log_level) ;
227
227
mz_storage:: client:: connections:: populate_client_config (
228
228
kafka_connection. clone ( ) ,
@@ -238,8 +238,11 @@ pub async fn create_consumer(
238
238
. get ( "bootstrap.servers" )
239
239
. expect ( "callers must have already set bootstrap.servers" ) ;
240
240
241
- let consumer: Arc < BaseConsumer < KafkaErrCheckContext > > =
242
- Arc :: new ( config. create_with_context ( KafkaErrCheckContext :: default ( ) ) ?) ;
241
+ let consumer: Arc < BaseConsumer < KafkaErrCheckContext > > = Arc :: new (
242
+ config
243
+ . create_with_context ( KafkaErrCheckContext :: default ( ) )
244
+ . map_err ( |e| sql_err ! ( "{}" , e) ) ?,
245
+ ) ;
243
246
let context = Arc :: clone ( & consumer. context ( ) ) ;
244
247
let owned_topic = String :: from ( topic) ;
245
248
// Wait for a metadata request for up to one second. This greatly
@@ -253,10 +256,11 @@ pub async fn create_consumer(
253
256
let _ = consumer. fetch_metadata ( Some ( & owned_topic) , Duration :: from_secs ( 1 ) ) ;
254
257
}
255
258
} )
256
- . await ?;
259
+ . await
260
+ . map_err ( |e| sql_err ! ( "{}" , e) ) ?;
257
261
let error = context. error . lock ( ) . expect ( "lock poisoned" ) ;
258
262
if let Some ( error) = & * error {
259
- bail ! ( "librdkafka: {}" , error)
263
+ sql_bail ! ( "librdkafka: {}" , error)
260
264
}
261
265
Ok ( consumer)
262
266
}
@@ -281,11 +285,11 @@ pub async fn lookup_start_offsets(
281
285
topic : & str ,
282
286
with_options : & BTreeMap < String , SqlValueOrSecret > ,
283
287
now : u64 ,
284
- ) -> Result < Option < Vec < i64 > > , anyhow :: Error > {
288
+ ) -> Result < Option < Vec < i64 > > , PlanError > {
285
289
let time_offset = match with_options. get ( "kafka_time_offset" ) . cloned ( ) {
286
290
None => return Ok ( None ) ,
287
291
Some ( _) if with_options. contains_key ( "start_offset" ) => {
288
- bail ! ( "`start_offset` and `kafka_time_offset` cannot be set at the same time." )
292
+ sql_bail ! ( "`start_offset` and `kafka_time_offset` cannot be set at the same time." )
289
293
}
290
294
Some ( offset) => offset,
291
295
} ;
@@ -298,15 +302,15 @@ pub async fn lookup_start_offsets(
298
302
let now: i64 = now. try_into ( ) ?;
299
303
let ts = now - ts. abs ( ) ;
300
304
if ts <= 0 {
301
- bail ! ( "Relative `kafka_time_offset` must be smaller than current system timestamp" )
305
+ sql_bail ! ( "Relative `kafka_time_offset` must be smaller than current system timestamp" )
302
306
}
303
307
ts
304
308
}
305
309
// Timestamp in millis (e.g. 1622659034343)
306
310
Ok ( ts) => ts,
307
- _ => bail ! ( "`kafka_time_offset` must be a number" ) ,
311
+ _ => sql_bail ! ( "`kafka_time_offset` must be a number" ) ,
308
312
} ,
309
- _ => bail ! ( "`kafka_time_offset` must be a number" ) ,
313
+ _ => sql_bail ! ( "`kafka_time_offset` must be a number" ) ,
310
314
} ;
311
315
312
316
// Lookup offsets
@@ -319,14 +323,18 @@ pub async fn lookup_start_offsets(
319
323
consumer. as_ref ( ) . client ( ) ,
320
324
& topic,
321
325
Duration :: from_secs ( 10 ) ,
322
- ) ?
326
+ )
327
+ . map_err ( |e| sql_err ! ( "{}" , e) ) ?
323
328
. len ( ) ;
324
329
325
330
let mut tpl = TopicPartitionList :: with_capacity ( 1 ) ;
326
331
tpl. add_partition_range ( & topic, 0 , num_partitions as i32 - 1 ) ;
327
- tpl. set_all_offsets ( Offset :: Offset ( time_offset) ) ?;
332
+ tpl. set_all_offsets ( Offset :: Offset ( time_offset) )
333
+ . map_err ( |e| sql_err ! ( "{}" , e) ) ?;
328
334
329
- let offsets_for_times = consumer. offsets_for_times ( tpl, Duration :: from_secs ( 10 ) ) ?;
335
+ let offsets_for_times = consumer
336
+ . offsets_for_times ( tpl, Duration :: from_secs ( 10 ) )
337
+ . map_err ( |e| sql_err ! ( "{}" , e) ) ?;
330
338
331
339
// Translate to `start_offsets`
332
340
let start_offsets = offsets_for_times
@@ -335,7 +343,7 @@ pub async fn lookup_start_offsets(
335
343
. map ( |elem| match elem. offset ( ) {
336
344
Offset :: Offset ( offset) => Ok ( offset) ,
337
345
Offset :: End => fetch_end_offset ( & consumer, & topic, elem. partition ( ) ) ,
338
- _ => bail ! (
346
+ _ => sql_bail ! (
339
347
"Unexpected offset {:?} for partition {}" ,
340
348
elem. offset( ) ,
341
349
elem. partition( )
@@ -344,7 +352,7 @@ pub async fn lookup_start_offsets(
344
352
. collect :: < Result < Vec < _ > , _ > > ( ) ?;
345
353
346
354
if start_offsets. len ( ) != num_partitions {
347
- bail ! (
355
+ sql_bail ! (
348
356
"Expected offsets for {} partitions, but received {}" ,
349
357
num_partitions,
350
358
start_offsets. len( ) ,
@@ -354,7 +362,8 @@ pub async fn lookup_start_offsets(
354
362
Ok ( Some ( start_offsets) )
355
363
}
356
364
} )
357
- . await ?
365
+ . await
366
+ . map_err ( |e| sql_err ! ( "{}" , e) ) ?
358
367
}
359
368
360
369
// Kafka supports bulk lookup of watermarks, but it is not exposed in rdkafka.
@@ -365,8 +374,10 @@ fn fetch_end_offset(
365
374
consumer : & BaseConsumer < KafkaErrCheckContext > ,
366
375
topic : & str ,
367
376
pid : i32 ,
368
- ) -> Result < i64 , anyhow:: Error > {
369
- let ( _low, high) = consumer. fetch_watermarks ( topic, pid, Duration :: from_secs ( 10 ) ) ?;
377
+ ) -> Result < i64 , PlanError > {
378
+ let ( _low, high) = consumer
379
+ . fetch_watermarks ( topic, pid, Duration :: from_secs ( 10 ) )
380
+ . map_err ( |e| sql_err ! ( "{}" , e) ) ?;
370
381
Ok ( high)
371
382
}
372
383
@@ -410,7 +421,7 @@ impl ClientContext for KafkaErrCheckContext {
410
421
pub fn generate_ccsr_connection (
411
422
url : Url ,
412
423
ccsr_options : & mut BTreeMap < String , SqlValueOrSecret > ,
413
- ) -> Result < CsrConnection , anyhow :: Error > {
424
+ ) -> Result < CsrConnection , PlanError > {
414
425
let mut ccsr_options = extract (
415
426
ccsr_options,
416
427
& [
@@ -435,7 +446,7 @@ pub fn generate_ccsr_connection(
435
446
let key = key. unwrap_secret ( ) ;
436
447
Some ( CsrConnectionTlsIdentity { cert, key } )
437
448
}
438
- _ => bail ! (
449
+ _ => sql_bail ! (
439
450
"Reading from SSL-auth Confluent Schema Registry \
440
451
requires both ssl.key.pem and ssl.certificate.pem"
441
452
) ,
0 commit comments