
Commit 6eadbde

chenzl25 and Li0k authored
feat(iceberg): refine iceberg table column datatype and name. (#19690)
Co-authored-by: Li0k <[email protected]>
1 parent a2e53a5 commit 6eadbde

15 files changed: +279 -94 lines changed

e2e_test/batch/catalog/pg_cast.slt.part

Lines changed: 62 additions & 61 deletions
@@ -24,67 +24,68 @@ SELECT * FROM pg_catalog.pg_cast;
 20 20 701 i
 21 20 1700 i
 22 20 1043 a
-23 20 1301 i
-24 700 21 a
-25 700 23 a
-26 700 20 a
-27 700 701 i
-28 700 1700 a
-29 700 1043 a
-30 701 21 a
-31 701 23 a
-32 701 20 a
-33 701 700 a
-34 701 1700 a
-35 701 1043 a
-36 1700 21 a
-37 1700 23 a
-38 1700 20 a
-39 1700 700 i
-40 1700 701 i
-41 1700 1043 a
-42 1082 1043 a
-43 1082 1114 i
-44 1082 1184 i
-45 1043 16 e
-46 1043 21 e
-47 1043 23 e
-48 1043 20 e
-49 1043 700 e
-50 1043 701 e
-51 1043 1700 e
-52 1043 1082 e
-53 1043 1083 e
-54 1043 1114 e
-55 1043 1184 e
-56 1043 1186 e
-57 1043 17 e
-58 1043 3802 e
-59 1043 1301 e
-60 1083 1043 a
-61 1083 1186 i
-62 1114 1082 a
-63 1114 1043 a
-64 1114 1083 a
-65 1114 1184 i
-66 1184 1082 a
-67 1184 1043 a
-68 1184 1083 a
-69 1184 1114 a
-70 1186 1043 a
-71 1186 1083 a
-72 17 1043 a
-73 3802 16 e
-74 3802 21 e
-75 3802 23 e
-76 3802 20 e
-77 3802 700 e
-78 3802 701 e
-79 3802 1700 e
-80 3802 1043 a
-81 20 20 e
-82 1301 701 e
-83 1301 1043 a
+23 20 20 e
+24 20 1301 i
+25 700 21 a
+26 700 23 a
+27 700 20 a
+28 700 701 i
+29 700 1700 a
+30 700 1043 a
+31 701 21 a
+32 701 23 a
+33 701 20 a
+34 701 700 a
+35 701 1700 a
+36 701 1043 a
+37 1700 21 a
+38 1700 23 a
+39 1700 20 a
+40 1700 700 i
+41 1700 701 i
+42 1700 1043 a
+43 1082 1043 a
+44 1082 1114 i
+45 1082 1184 i
+46 1043 16 e
+47 1043 21 e
+48 1043 23 e
+49 1043 20 e
+50 1043 700 e
+51 1043 701 e
+52 1043 1700 e
+53 1043 1082 e
+54 1043 1083 e
+55 1043 1114 e
+56 1043 1184 e
+57 1043 1186 e
+58 1043 17 e
+59 1043 3802 e
+60 1043 1301 e
+61 1083 1043 a
+62 1083 1186 i
+63 1114 1082 a
+64 1114 1043 a
+65 1114 1083 a
+66 1114 1184 i
+67 1184 1082 a
+68 1184 1043 a
+69 1184 1083 a
+70 1184 1114 a
+71 1186 1043 a
+72 1186 1083 a
+73 17 1043 a
+74 3802 16 e
+75 3802 21 e
+76 3802 23 e
+77 3802 20 e
+78 3802 700 e
+79 3802 701 e
+80 3802 1700 e
+81 3802 1043 a
+82 20 20 e
+83 1301 701 e
+84 1301 1043 a
 
 query TT rowsort
 SELECT s.typname, t.typname

e2e_test/iceberg/test_case/iceberg_engine.slt

Lines changed: 46 additions & 3 deletions
@@ -2,20 +2,63 @@ statement ok
 set sink_decouple = false;
 
 statement ok
-create table t(id int primary key, xxname varchar) engine = iceberg;
+create table t(id int primary key, name varchar) engine = iceberg;
 
 statement ok
 insert into t values(1, 'xxx');
 
 statement ok
 FLUSH;
 
-sleep 5s
-
 query ??
 select * from t;
 ----
 1 xxx
 
 statement ok
 DROP TABLE t;
+
+statement ok
+CREATE TABLE full_type_t (
+    id bigint primary key,
+    v_small_int smallint,
+    v_int int,
+    v_long bigint,
+    v_float real,
+    v_double double,
+    v_varchar varchar,
+    v_bool boolean,
+    v_date date,
+    v_timestamp timestamptz,
+    v_ts_ntz timestamp,
+    v_decimal decimal,
+    v_map map(int, int),
+    v_array int[],
+    v_struct struct<a int,b int>,
+    v_json jsonb,
+    v_one_layer_struct struct<id bigint, v_small_int smallint, v_int int, v_long bigint, v_float real, v_double double, v_varchar varchar, v_bool boolean, v_date date, v_timestamp timestamptz, v_ts_ntz timestamp, v_decimal decimal, v_json jsonb>
+) engine = iceberg;
+
+statement ok
+INSERT INTO full_type_t VALUES
+(1, 1, 1, 1000, 1.1, 1.11, '1-1', true, '2022-03-11', '2022-03-11 01:00:00Z'::timestamptz, '2022-03-11 01:00:00',1.11, map {1:100,2:200}, array[1,2,3], row(1,2), '{"a":"foo", "b":"bar"}', row(1, 1, 1, 1000, 1.1, 1.11, '1-1', true, '2022-03-11', '2022-03-11 01:00:00Z'::timestamptz, '2022-03-11 01:00:00',1.11, '{"a":"foo", "b":"bar"}')),
+(2, 2, 2, 2000, 2.2, 2.22, '2-2', false, '2022-03-12', '2022-03-12 02:00:00Z'::timestamptz, '2022-03-12 02:00:00',2.22, map {3:300}, array[1,null,3], row(3,null), '{"k2":[2,true,4]}', row(2, 2, 2, 2000, 2.2, 2.22, '2-2', false, '2022-03-12', '2022-03-12 02:00:00Z'::timestamptz, '2022-03-12 02:00:00',2.22, '{"k2":[2,true,4]}')),
+(3, 3, 3, 3000, 3.3, 3.33, '3-3', true, '2022-03-13', '2022-03-13 03:00:00Z'::timestamptz, '2022-03-13 03:00:00','inf', null, null, null, '1', row(3, 3, 3, 3000, 3.3, 3.33, '3-3', true, '2022-03-13', '2022-03-13 03:00:00Z'::timestamptz, '2022-03-13 03:00:00','inf', '1')),
+(4, 4, 4, 4000, 4.4, 4.44, '4-4', false, '2022-03-14', '2022-03-14 04:00:00Z'::timestamptz, '2022-03-14 04:00:00','-inf', null, null, null, 'true', row(4, 4, 4, 4000, 4.4, 4.44, '4-4', false, '2022-03-14', '2022-03-14 04:00:00Z'::timestamptz, '2022-03-14 04:00:00','-inf', 'true'));
+
+statement ok
+FLUSH;
+
+query ???????????????? rowsort
+select * from full_type_t
+----
+1 1 1 1000 1.1 1.11 1-1 t 2022-03-11 2022-03-11 01:00:00+00:00 2022-03-11 01:00:00 1.1100000000 {1:100,2:200} {1,2,3} (1,2) {"a": "foo", "b": "bar"} (1,1,1,1000,1.1,1.11,1-1,t,2022-03-11,"2022-03-11 01:00:00+00:00","2022-03-11 01:00:00",1.1100000000,"{""a"": ""foo"", ""b"": ""bar""}")
+2 2 2 2000 2.2 2.22 2-2 f 2022-03-12 2022-03-12 02:00:00+00:00 2022-03-12 02:00:00 2.2200000000 {3:300} {1,NULL,3} (3,) {"k2": [2, true, 4]} (2,2,2,2000,2.2,2.22,2-2,f,2022-03-12,"2022-03-12 02:00:00+00:00","2022-03-12 02:00:00",2.2200000000,"{""k2"": [2, true, 4]}")
+3 3 3 3000 3.3 3.33 3-3 t 2022-03-13 2022-03-13 03:00:00+00:00 2022-03-13 03:00:00 999999999999999999.9999999999 NULL NULL NULL 1 (3,3,3,3000,3.3,3.33,3-3,t,2022-03-13,"2022-03-13 03:00:00+00:00","2022-03-13 03:00:00",999999999999999999.9999999999,1)
+4 4 4 4000 4.4 4.44 4-4 f 2022-03-14 2022-03-14 04:00:00+00:00 2022-03-14 04:00:00 -999999999999999999.9999999999 NULL NULL NULL true (4,4,4,4000,4.4,4.44,4-4,f,2022-03-14,"2022-03-14 04:00:00+00:00","2022-03-14 04:00:00",-999999999999999999.9999999999,true)
+
+statement ok
+DROP TABLE full_type_t;
+
+

src/common/src/array/arrow/arrow_iceberg.rs

Lines changed: 37 additions & 7 deletions
@@ -44,6 +44,10 @@ impl IcebergArrowConvert {
         FromArrow::from_record_batch(self, batch)
     }
 
+    pub fn type_from_field(&self, field: &arrow_schema::Field) -> Result<DataType, ArrayError> {
+        FromArrow::from_field(self, field)
+    }
+
     pub fn to_arrow_field(
         &self,
         name: &str,
@@ -52,10 +56,6 @@ impl IcebergArrowConvert {
         ToArrow::to_arrow_field(self, name, data_type)
     }
 
-    pub fn type_from_field(&self, field: &arrow_schema::Field) -> Result<DataType, ArrayError> {
-        FromArrow::from_field(self, field)
-    }
-
     pub fn struct_from_fields(
         &self,
         fields: &arrow_schema::Fields,
@@ -81,6 +81,36 @@ impl IcebergArrowConvert {
 }
 
 impl ToArrow for IcebergArrowConvert {
+    fn to_arrow_field(
+        &self,
+        name: &str,
+        data_type: &DataType,
+    ) -> Result<arrow_schema::Field, ArrayError> {
+        let data_type = match data_type {
+            DataType::Boolean => self.bool_type_to_arrow(),
+            DataType::Int16 => self.int32_type_to_arrow(),
+            DataType::Int32 => self.int32_type_to_arrow(),
+            DataType::Int64 => self.int64_type_to_arrow(),
+            DataType::Int256 => self.int256_type_to_arrow(),
+            DataType::Float32 => self.float32_type_to_arrow(),
+            DataType::Float64 => self.float64_type_to_arrow(),
+            DataType::Date => self.date_type_to_arrow(),
+            DataType::Time => self.time_type_to_arrow(),
+            DataType::Timestamp => self.timestamp_type_to_arrow(),
+            DataType::Timestamptz => self.timestamptz_type_to_arrow(),
+            DataType::Interval => self.interval_type_to_arrow(),
+            DataType::Varchar => self.varchar_type_to_arrow(),
+            DataType::Bytea => self.bytea_type_to_arrow(),
+            DataType::Serial => self.serial_type_to_arrow(),
+            DataType::Decimal => return Ok(self.decimal_type_to_arrow(name)),
+            DataType::Jsonb => self.varchar_type_to_arrow(),
+            DataType::Struct(fields) => self.struct_type_to_arrow(fields)?,
+            DataType::List(datatype) => self.list_type_to_arrow(datatype)?,
+            DataType::Map(datatype) => self.map_type_to_arrow(datatype)?,
+        };
+        Ok(arrow_schema::Field::new(name, data_type, true))
+    }
+
     #[inline]
     fn decimal_type_to_arrow(&self, name: &str) -> arrow_schema::Field {
         // Fixed-point decimal; precision P, scale S Scale is fixed, precision must be less than 38.
@@ -207,10 +237,10 @@ impl ToArrow for IcebergCreateTableArrowConvert {
         let data_type = match value {
            // using the inline function
            DataType::Boolean => self.bool_type_to_arrow(),
-           DataType::Int16 => self.int16_type_to_arrow(),
+           DataType::Int16 => self.int32_type_to_arrow(),
            DataType::Int32 => self.int32_type_to_arrow(),
            DataType::Int64 => self.int64_type_to_arrow(),
-           DataType::Int256 => self.int256_type_to_arrow(),
+           DataType::Int256 => self.varchar_type_to_arrow(),
            DataType::Float32 => self.float32_type_to_arrow(),
            DataType::Float64 => self.float64_type_to_arrow(),
            DataType::Date => self.date_type_to_arrow(),
@@ -222,7 +252,7 @@ impl ToArrow for IcebergCreateTableArrowConvert {
            DataType::Bytea => self.bytea_type_to_arrow(),
            DataType::Serial => self.serial_type_to_arrow(),
            DataType::Decimal => return Ok(self.decimal_type_to_arrow(name)),
-           DataType::Jsonb => return Ok(self.jsonb_type_to_arrow(name)),
+           DataType::Jsonb => self.varchar_type_to_arrow(),
            DataType::Struct(fields) => self.struct_type_to_arrow(fields)?,
            DataType::List(datatype) => self.list_type_to_arrow(datatype)?,
            DataType::Map(datatype) => self.map_type_to_arrow(datatype)?,

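Note on the mapping above: Iceberg has no 16-bit integer type and no native JSON type, so the refined converters widen Int16 to Arrow Int32 and store Jsonb as a string; the create-table converter additionally falls back to a string for Int256. A minimal standalone sketch of that widening policy, using hypothetical RwType/IcebergPrimitive enums rather than the real IcebergArrowConvert/IcebergCreateTableArrowConvert types:

// Standalone illustration of the widening policy in this commit
// (hypothetical enums; not the actual RisingWave or Arrow/Iceberg types).
#[derive(Debug, Clone, PartialEq)]
enum RwType {
    Int16,
    Int32,
    Int64,
    Int256,
    Jsonb,
    Varchar,
}

#[derive(Debug, Clone, PartialEq)]
enum IcebergPrimitive {
    Int,    // 32-bit
    Long,   // 64-bit
    String, // UTF-8
}

/// Map a RisingWave type to the Iceberg primitive used when creating the table.
fn iceberg_create_table_type(ty: &RwType) -> IcebergPrimitive {
    match ty {
        // Iceberg has no 16-bit integer, so smallint is widened to int.
        RwType::Int16 | RwType::Int32 => IcebergPrimitive::Int,
        RwType::Int64 => IcebergPrimitive::Long,
        // No 256-bit integer or JSON type either: store both as strings.
        RwType::Int256 | RwType::Jsonb | RwType::Varchar => IcebergPrimitive::String,
    }
}

fn main() {
    assert_eq!(iceberg_create_table_type(&RwType::Int16), IcebergPrimitive::Int);
    assert_eq!(iceberg_create_table_type(&RwType::Jsonb), IcebergPrimitive::String);
    println!("widening policy holds");
}
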
src/common/src/catalog/mod.rs

Lines changed: 1 addition & 0 deletions
@@ -98,6 +98,7 @@ pub fn default_key_column_name_version_mapping(version: &ColumnDescVersion) -> &
 /// [this rfc](https://github.com/risingwavelabs/rfcs/pull/20).
 pub const KAFKA_TIMESTAMP_COLUMN_NAME: &str = "_rw_kafka_timestamp";
 
+pub const RISINGWAVE_ICEBERG_ROW_ID: &str = "_risingwave_iceberg_row_id";
 pub fn is_system_schema(schema_name: &str) -> bool {
     SYSTEM_SCHEMAS.iter().any(|s| *s == schema_name)
 }

src/expr/impl/src/scalar/cast.rs

Lines changed: 1 addition & 0 deletions
@@ -86,6 +86,7 @@ pub fn jsonb_to_number<T: TryFrom<F64>>(v: JsonbRef<'_>) -> Result<T> {
 #[function("cast(int4) -> int2")]
 #[function("cast(int8) -> int2")]
 #[function("cast(int8) -> int4")]
+#[function("cast(int8) -> serial")]
 #[function("cast(serial) -> int8")]
 #[function("cast(float4) -> int2")]
 #[function("cast(float8) -> int2")]

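The new #[function("cast(int8) -> serial")] registration complements the existing cast(serial) -> int8, presumably so that 64-bit values (for example the row id stored in Iceberg as a long) can be converted back into RisingWave's serial type. Assuming serial is a thin wrapper over i64, which the pre-existing serial -> int8 cast suggests, the conversion is lossless; a minimal sketch with hypothetical standalone types:

// Rough sketch of what an int8 -> serial cast amounts to, assuming Serial is a
// thin wrapper over i64 (hypothetical standalone types, not RisingWave's).
#[derive(Debug, Clone, Copy, PartialEq)]
struct Serial(i64);

fn int8_to_serial(v: i64) -> Serial {
    // No range check is needed: both sides are 64-bit.
    Serial(v)
}

fn serial_to_int8(v: Serial) -> i64 {
    v.0
}

fn main() {
    let s = int8_to_serial(42);
    assert_eq!(serial_to_int8(s), 42);
}
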
src/expr/impl/tests/sig.rs

Lines changed: 1 addition & 1 deletion
@@ -60,7 +60,7 @@ fn test_func_sig_map() {
     let expected = expect_test::expect![[r#"
         [
             "cast(anyarray) -> character varying/anyarray",
-            "cast(bigint) -> rw_int256/integer/smallint/numeric/double precision/real/character varying",
+            "cast(bigint) -> rw_int256/serial/integer/smallint/numeric/double precision/real/character varying",
             "cast(boolean) -> integer/character varying",
             "cast(character varying) -> jsonb/interval/timestamp without time zone/time without time zone/date/rw_int256/real/double precision/numeric/smallint/integer/bigint/character varying/boolean/bytea/anyarray",
             "cast(date) -> timestamp without time zone/character varying",

src/frontend/planner_test/src/lib.rs

Lines changed: 1 addition & 1 deletion
@@ -607,7 +607,7 @@ impl TestCase {
             }
         };
 
-        let mut planner = Planner::new(context.clone());
+        let mut planner = Planner::new_for_stream(context.clone());
 
         let plan_root = match planner.plan(bound) {
             Ok(plan_root) => {

src/frontend/src/expr/type_inference/cast.rs

Lines changed: 1 addition & 1 deletion
@@ -309,7 +309,7 @@ pub static CAST_TABLE: LazyLock<CastTable> = LazyLock::new(|| {
         (". e a ", Boolean), // 0
         (" .iiiiii a ", Int16), // 1
         ("ea.iiiii a ", Int32), // 2
-        (" aa.iiii a ", Int64), // 3
+        (" aa.iiii ae", Int64), // 3
         (" aaa.ii a ", Decimal), // 4
         (" aaaa.i a ", Float32), // 5
         (" aaaaa. a ", Float64), // 6

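For context, each row of CAST_TABLE is a source type and each column a target type in the same order, with the cells following the pg_cast context codes updated earlier ('i' implicit, 'a' assignment, 'e' explicit, '.' same type, blank not castable). The Int64 row's final cell flips from blank to 'e', which lines up with the new explicit bigint -> serial cast in the expression layer. A small hypothetical sketch of reading such a cell (not the actual CastTable construction):

// Simplified sketch of a cast-allowance matrix cell lookup (hypothetical names;
// the real CAST_TABLE builds a map keyed by (source, target) types).
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
enum CastContext {
    Implicit,
    Assign,
    Explicit,
}

fn parse_cell(c: char) -> Option<CastContext> {
    match c {
        'i' => Some(CastContext::Implicit),
        'a' => Some(CastContext::Assign),
        'e' => Some(CastContext::Explicit),
        // '.' (same type) and ' ' (not castable) produce no entry.
        _ => None,
    }
}

fn main() {
    // Int64 row after this commit: the final cell is now 'e' (explicit cast allowed).
    let int64_row = " aa.iiii ae";
    let last_cell = int64_row.chars().last().unwrap();
    assert_eq!(parse_cell(last_cell), Some(CastContext::Explicit));
    println!("bigint -> serial: explicit cast allowed");
}
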
src/frontend/src/handler/create_mv.rs

Lines changed: 1 addition & 1 deletion
@@ -125,7 +125,7 @@ pub fn gen_create_mv_plan_bound(
         context.warn_to_user("EMIT ON WINDOW CLOSE is currently an experimental feature. Please use it with caution.");
     }
 
-    let mut plan_root = Planner::new(context).plan_query(query)?;
+    let mut plan_root = Planner::new_for_stream(context).plan_query(query)?;
     if let Some(col_names) = col_names {
         for name in &col_names {
             check_valid_column_name(name)?;

src/frontend/src/handler/create_sink.rs

Lines changed: 1 addition & 1 deletion
@@ -220,7 +220,7 @@ pub async fn gen_sink_plan(
     };
 
     let definition = context.normalized_sql().to_owned();
-    let mut plan_root = Planner::new(context.into()).plan_query(bound)?;
+    let mut plan_root = Planner::new_for_stream(context.into()).plan_query(bound)?;
     if let Some(col_names) = &col_names {
         plan_root.set_out_names(col_names.clone())?;
     };

src/frontend/src/handler/create_table.rs

Lines changed: 9 additions & 7 deletions
@@ -24,7 +24,7 @@ use itertools::Itertools;
 use pgwire::pg_response::{PgResponse, StatementType};
 use risingwave_common::catalog::{
     CdcTableDesc, ColumnCatalog, ColumnDesc, Engine, TableId, TableVersionId, DEFAULT_SCHEMA_NAME,
-    INITIAL_TABLE_VERSION_ID, ROWID_PREFIX,
+    INITIAL_TABLE_VERSION_ID, RISINGWAVE_ICEBERG_ROW_ID, ROWID_PREFIX,
 };
 use risingwave_common::config::MetaBackend;
 use risingwave_common::license::Feature;
@@ -1530,12 +1530,14 @@ pub async fn create_iceberg_engine_table(
 
     // For the table without primary key. We will use `_row_id` as primary key
     let sink_from = if pks.is_empty() {
-        pks = vec![ROWID_PREFIX.to_string()];
-        let [stmt]: [_; 1] =
-            Parser::parse_sql(&format!("select {}, * from {}", ROWID_PREFIX, table_name))
-                .context("unable to parse query")?
-                .try_into()
-                .unwrap();
+        pks = vec![RISINGWAVE_ICEBERG_ROW_ID.to_string()];
+        let [stmt]: [_; 1] = Parser::parse_sql(&format!(
+            "select {} as {}, * from {}",
+            ROWID_PREFIX, RISINGWAVE_ICEBERG_ROW_ID, table_name
+        ))
+        .context("unable to parse query")?
+        .try_into()
+        .unwrap();
 
         let Statement::Query(query) = &stmt else {
             panic!("unexpected statement: {:?}", stmt);

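With this change, a table without a user-defined primary key exposes its hidden _row_id to the Iceberg sink under the stable name _risingwave_iceberg_row_id by aliasing it in the generated query. A tiny sketch of the string being built, with the catalog constants inlined for illustration (hypothetical table name "t"):

// Sketch of the generated sink query for a PK-less table named "t"
// (constants inlined for illustration; the real code uses the catalog constants).
fn main() {
    let rowid_prefix = "_row_id";
    let iceberg_row_id = "_risingwave_iceberg_row_id";
    let table_name = "t";
    let sql = format!(
        "select {} as {}, * from {}",
        rowid_prefix, iceberg_row_id, table_name
    );
    assert_eq!(sql, "select _row_id as _risingwave_iceberg_row_id, * from t");
    println!("{sql}");
}
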
src/frontend/src/handler/query.rs

Lines changed: 1 addition & 1 deletion
@@ -241,7 +241,7 @@ fn gen_batch_query_plan(
         ..
     } = bind_result;
 
-    let mut planner = Planner::new(context);
+    let mut planner = Planner::new_for_batch(context);
 
     let mut logical = planner.plan(bound)?;
     let schema = logical.schema();

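The remaining planner hunks replace the generic Planner::new with explicit new_for_stream / new_for_batch constructors at the materialized-view, sink, planner-test, and batch-query call sites, making the planning mode explicit at each entry point. A simplified sketch of that constructor-per-mode pattern (hypothetical types; the real Planner takes a planning context and carries much more state):

// Constructor-per-mode pattern, as used by the Planner::new_for_stream /
// Planner::new_for_batch call sites in this commit (hypothetical simplified types).
#[derive(Debug, Clone, Copy, PartialEq)]
enum PlanMode {
    Stream,
    Batch,
}

struct Planner {
    mode: PlanMode,
}

impl Planner {
    fn new_for_stream() -> Self {
        Self { mode: PlanMode::Stream }
    }

    fn new_for_batch() -> Self {
        Self { mode: PlanMode::Batch }
    }
}

fn main() {
    assert_eq!(Planner::new_for_stream().mode, PlanMode::Stream);
    assert_eq!(Planner::new_for_batch().mode, PlanMode::Batch);
}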