Transforming Data During Load
Doris provides powerful data transformation capabilities during data loading, allowing you to process data without additional ETL tools. It mainly supports four types of transformations:
- Column Mapping: Map source data columns to different columns in the target table.
- Column Transformation: Transform source data in real-time using functions and expressions.
- Pre-filtering: Filter out unwanted raw data before column mapping and transformation.
- Post-filtering: Filter the final results after column mapping and transformation.
These built-in transformations significantly simplify the data processing workflow, improve loading efficiency, and keep data processing logic consistent.
Load Syntax
Stream Load
Add the columns and where parameters in the HTTP header.
- columns specifies column mapping and transformation.
- where specifies post-filtering.
- Pre-filtering is not supported.
curl --location-trusted -u user:passwd \
-H "columns: k1, k2, tmp_k3, k3 = tmp_k3 + 1" \
-H "where: k1 > 1"\
-T data.csv \
http://host:port/api/example_db/example_table/_stream_load
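Stream Load is synchronous: curl returns a JSON result once the load completes. An abridged, illustrative response for the request above (field values are hypothetical); note that rows dropped by the where filter are counted in NumberUnselectedRows, not NumberFilteredRows:
{
    "Status": "Success",
    "NumberTotalRows": 4,
    "NumberLoadedRows": 3,
    "NumberFilteredRows": 0,
    "NumberUnselectedRows": 1
}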
Broker Load
Define data transformations in SQL statements, where:
- (k1, k2, tmp_k3) specifies column mapping.
- SET specifies column transformation.
- PRECEDING FILTER specifies pre-filtering.
- WHERE specifies post-filtering.
LOAD LABEL test_db.label1
(
DATA INFILE("s3://bucket_name/data.csv")
INTO TABLE `test_tbl`
(k1, k2, tmp_k3)
PRECEDING FILTER k1 = 1
SET (
k3 = tmp_k3 + 1
)
WHERE k1 > 1
)
WITH S3 (...);
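Broker Load is asynchronous, so the statement returns immediately. You can check the job's progress and final state by its label, for example:
SHOW LOAD WHERE LABEL = "label1";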
Routine Load
Define data transformations in SQL statements, where:
- COLUMNS specifies column mapping and transformation.
- PRECEDING FILTER specifies pre-filtering.
- WHERE specifies post-filtering.
CREATE ROUTINE LOAD test_db.label1 ON test_tbl
COLUMNS(k1, k2, tmp_k3, k3 = tmp_k3 + 1),
PRECEDING FILTER k1 = 1,
WHERE k1 > 1
...
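Routine Load creates a continuously running job; its state and the number of rows rejected by these clauses can be inspected with, for example:
SHOW ROUTINE LOAD FOR test_db.label1;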
Column Mapping
Column mapping is used to define the correspondence between source data columns and target table columns. It can handle the following scenarios:
- The order of source data columns differs from the order of target table columns.
- The number of source data columns differs from the number of target table columns.
Example
Suppose we have the following source data (column names are only for illustration purposes and do not actually exist):
Column1,Column2,Column3,Column4
1,100,beijing,1.1
2,200,shanghai,1.2
3,300,guangzhou,1.3
4,\N,chongqing,1.4
Note: \N in the source file represents NULL.
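To follow along locally, you can save this sample as data.csv; a minimal sketch:
# Write the four sample rows to data.csv; the quoted
# heredoc delimiter keeps \N literal.
cat > data.csv << 'EOF'
1,100,beijing,1.1
2,200,shanghai,1.2
3,300,guangzhou,1.3
4,\N,chongqing,1.4
EOF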
Adjusting Column Order
Suppose the target table has four columns: k1, k2, k3, and k4. We want to map the columns as follows:
- Column1 -> k1
- Column2 -> k3
- Column3 -> k2
- Column4 -> k4
Creating the Target Table
CREATE TABLE example_table
(
k1 INT,
k2 STRING,
k3 INT,
k4 DOUBLE
) ENGINE = OLAP
DUPLICATE KEY(k1)
DISTRIBUTED BY HASH(k1) BUCKETS 1;
Loading Data
- Stream Load
curl --location-trusted -u user:passwd \
-H "column_separator:," \
-H "columns: k1,k3,k2,k4" \
-T data.csv \
-X PUT \
http://<fe_ip>:<fe_http_port>/api/example_db/example_table/_stream_load
- Broker Load
LOAD LABEL example_db.label_broker
(
DATA INFILE("s3://bucket_name/data.csv")
INTO TABLE example_table
COLUMNS TERMINATED BY ","
(k1, k3, k2, k4)
)
WITH s3 (...);
- Routine Load
CREATE ROUTINE LOAD example_db.example_routine_load ON example_table
COLUMNS(k1, k3, k2, k4),
COLUMNS TERMINATED BY ","
FROM KAFKA (...);
Query Results
mysql> select * from example_table;
+------+-----------+------+------+
| k1   | k2        | k3   | k4   |
+------+-----------+------+------+
|    2 | shanghai  |  200 |  1.2 |
|    4 | chongqing | NULL |  1.4 |
|    3 | guangzhou |  300 |  1.3 |
|    1 | beijing   |  100 |  1.1 |
+------+-----------+------+------+
Source File Columns Exceed Table Columns
Suppose the target table has three columns: k1, k2, and k3, and only the first, second, and fourth columns of the source file are needed. The mapping relationship is as follows:
- Column1 -> k1
- Column2 -> k2
- Column4 -> k3
Creating the Target Table
CREATE TABLE example_table
(
k1 INT,
k2 STRING,
k3 DOUBLE
) ENGINE = OLAP
DUPLICATE KEY(k1)
DISTRIBUTED BY HASH(k1) BUCKETS 1;
Loading Data
- Stream Load
curl --location-trusted -u user:passwd \
-H "column_separator:," \
-H "columns: k1,k2,tmp_skip,k3" \
-T data.csv \
http://<fe_ip>:<fe_http_port>/api/example_db/example_table/_stream_load
- Broker Load
LOAD LABEL example_db.label_broker
(
DATA INFILE("s3://bucket_name/data.csv")
INTO TABLE example_table
COLUMNS TERMINATED BY ","
(tmp_k1, tmp_k2, tmp_skip, tmp_k3)
SET (
k1 = tmp_k1,
k2 = tmp_k2,
k3 = tmp_k3
)
)
WITH s3 (...);
- Routine Load
CREATE ROUTINE LOAD example_db.example_routine_load ON example_table
COLUMNS(k1, k2, tmp_skip, k3),
PROPERTIES
(
"format" = "csv",
"column_separator" = ","
)
FROM KAFKA (...);
In the examples above, the third column of the source file is mapped to tmp_skip, a placeholder column name that does not exist in the target table; its data is therefore ignored during loading and not written to the target table.
Query Results
mysql> select * from example_table;
+------+------+------+
| k1   | k2   | k3   |
+------+------+------+
|    1 | 100  |  1.1 |
|    2 | 200  |  1.2 |
|    3 | 300  |  1.3 |
|    4 | NULL |  1.4 |
+------+------+------+
Source File Columns Less Than Table Columns
Suppose the target table has five columns: k1, k2, k3, k4, and k5. The four source columns are mapped to the first four table columns, while k5 takes its default value:
- Column1 -> k1
- Column2 -> k3
- Column3 -> k2
- Column4 -> k4
- k5 uses the default value
Creating the Target Table
CREATE TABLE example_table
(
k1 INT,
k2 STRING,
k3 INT,
k4 DOUBLE,
k5 INT DEFAULT 2
) ENGINE = OLAP
DUPLICATE KEY(k1)
DISTRIBUTED BY HASH(k1) BUCKETS 1;
Loading Data
- Stream Load
curl --location-trusted -u user:passwd \
-H "column_separator:," \
-H "columns: k1,k3,k2,k4" \
-T data.csv \
http://<fe_ip>:<fe_http_port>/api/example_db/example_table/_stream_load
- Broker Load
LOAD LABEL example_db.label_broker
(
DATA INFILE("s3://bucket_name/data.csv")
INTO TABLE example_table
COLUMNS TERMINATED BY ","
(tmp_k1, tmp_k3, tmp_k2, tmp_k4)
SET (
k1 = tmp_k1,
k3 = tmp_k3,
k2 = tmp_k2,
k4 = tmp_k4
)
)
WITH s3 (...);
- Routine Load
CREATE ROUTINE LOAD example_db.example_routine_load ON example_table
COLUMNS(k1, k3, k2, k4),
COLUMNS TERMINATED BY ","
FROM KAFKA (...);
Note: since the source data provides no value for k5,
- If k5 has a default value, the default value will be used.
- If k5 is a nullable column without a default value, it will be filled with NULL.
- If k5 is a non-nullable column without a default value, the load will fail (see the sketch below).
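For example, this hypothetical variant of the table (illustrative only, not part of the walkthrough) would reject the four-column load above, because k5 is non-nullable and has no default:
CREATE TABLE example_table_strict
(
    k1 INT,
    k2 STRING,
    k3 INT,
    k4 DOUBLE,
    k5 INT NOT NULL  -- no default: the load fails because k5 receives neither a value nor a default
) ENGINE = OLAP
DUPLICATE KEY(k1)
DISTRIBUTED BY HASH(k1) BUCKETS 1;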
Query Results
mysql> select * from example_table;
+------+-----------+------+------+------+
| k1   | k2        | k3   | k4   | k5   |
+------+-----------+------+------+------+
|    1 | beijing   |  100 |  1.1 |    2 |
|    2 | shanghai  |  200 |  1.2 |    2 |
|    3 | guangzhou |  300 |  1.3 |    2 |
|    4 | chongqing | NULL |  1.4 |    2 |
+------+-----------+------+------+------+
Column Transformation
Column transformation allows you to transform source data column values. It supports most built-in functions. Column transformation is usually defined together with column mapping.
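Expressions can also nest several built-in functions in a single mapping. A hypothetical Stream Load header fragment (tmp_city and city are illustrative names, assuming the target table has a city column) that trims and uppercases a source column:
-H "columns: k1, tmp_city, city = upper(trim(tmp_city))"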
Example
Suppose we have the following source data (column names are only for illustration purposes and do not actually exist):
Column1,Column2,Column3,Column4
1,100,beijing,1.1
2,200,shanghai,1.2
3,300,guangzhou,1.3
4,\N,chongqing,1.4
Transforming Source Data Column Values
Suppose the target table has four columns: k1, k2, k3, and k4. We want to transform the column values as follows:
- Column1 -> k1
- Column2 * 100 -> k3
- Column3 -> k2
- Column4 -> k4
Creating the Target Table
CREATE TABLE example_table
(
k1 INT,
k2 STRING,
k3 INT,
k4 DOUBLE
)
ENGINE = OLAP
DUPLICATE KEY(k1)
DISTRIBUTED BY HASH(k1) BUCKETS 1;
Loading Data
- Stream Load
curl --location-trusted -u user:passwd \
-H "column_separator:," \
-H "columns: k1, tmp_k3, k2, k4, k3 = tmp_k3 * 100" \
-T data.csv \
http://host:port/api/example_db/example_table/_stream_load
- Broker Load
LOAD LABEL example_db.label1
(
DATA INFILE("s3://bucket_name/data.csv")
INTO TABLE example_table
COLUMNS TERMINATED BY ","
(k1, tmp_k3, k2, k4)
SET (
k3 = tmp_k3 * 100
)
)
WITH s3 (...);
- Routine Load
CREATE ROUTINE LOAD example_db.example_routine_load ON example_table
COLUMNS(k1, tmp_k3, k2, k4, k3 = tmp_k3 * 100),
COLUMNS TERMINATED BY ","
FROM KAFKA (...);
Query Results
mysql> select * from example_table;
+------+-----------+-------+------+
| k1   | k2        | k3    | k4   |
+------+-----------+-------+------+
|    1 | beijing   | 10000 |  1.1 |
|    2 | shanghai  | 20000 |  1.2 |
|    3 | guangzhou | 30000 |  1.3 |
|    4 | chongqing |  NULL |  1.4 |
+------+-----------+-------+------+
Using the CASE WHEN Function for Conditional Transformation
Suppose the target table has four columns: k1, k2, k3, and k4. We want to transform the column values as follows:
- Column1 -> k1
- Column2 -> k2
- Column3 -> k3 (transformed to area ID)
- Column4 -> k4
Creating the Target Table
CREATE TABLE example_table
(
k1 INT,
k2 INT,
k3 INT,
k4 DOUBLE
)
ENGINE = OLAP
DUPLICATE KEY(k1)
DISTRIBUTED BY HASH(k1) BUCKETS 1;
Loading Data
- Stream Load
curl --location-trusted -u user:passwd \
-H "column_separator:," \
-H "columns: k1, k2, tmp_k3, k4, k3 = CASE tmp_k3 WHEN 'beijing' THEN 1 WHEN 'shanghai' THEN 2 WHEN 'guangzhou' THEN 3 WHEN 'chongqing' THEN 4 ELSE NULL END" \
-T data.csv \
http://host:port/api/example_db/example_table/_stream_load
- Broker Load
LOAD LABEL example_db.label1
(
DATA INFILE("s3://bucket_name/data.csv")
INTO TABLE example_table
COLUMNS TERMINATED BY ","
(k1, k2, tmp_k3, k4)
SET (
k3 = CASE tmp_k3 WHEN 'beijing' THEN 1 WHEN 'shanghai' THEN 2 WHEN 'guangzhou' THEN 3 WHEN 'chongqing' THEN 4 ELSE NULL END
)
)
WITH s3 (...);
- Routine Load
CREATE ROUTINE LOAD example_db.example_routine_load ON example_table
COLUMNS(k1, k2, tmp_k3, k4, k3 = CASE tmp_k3 WHEN 'beijing' THEN 1 WHEN 'shanghai' THEN 2 WHEN 'guangzhou' THEN 3 WHEN 'chongqing' THEN 4 ELSE NULL END),
COLUMNS TERMINATED BY ","
FROM KAFKA (...);
Query Results
mysql> select * from example_table;
+------+------+------+------+
| k1   | k2   | k3   | k4   |
+------+------+------+------+
|    1 |  100 |    1 |  1.1 |
|    2 |  200 |    2 |  1.2 |
|    3 |  300 |    3 |  1.3 |
|    4 | NULL |    4 |  1.4 |
+------+------+------+------+
Handling NULL Values in Source Files
Suppose the target table has four columns: k1, k2, k3, and k4. We want to transform the column values as follows:
- Column1 -> k1
- Column2 (if NULL, transform to 0) -> k2
- Column3 (transformed to area ID) -> k3
- Column4 -> k4
Creating the Target Table
CREATE TABLE example_table
(
k1 INT,
k2 INT,
k3 INT,
k4 DOUBLE
)
ENGINE = OLAP
DUPLICATE KEY(k1)
DISTRIBUTED BY HASH(k1) BUCKETS 1;
Loading Data
- Stream Load
curl --location-trusted -u user:passwd \
-H "column_separator:," \
-H "columns: k1, tmp_k2, tmp_k3, k4, k2 = ifnull(tmp_k2, 0), k3 = CASE tmp_k3 WHEN 'beijing' THEN 1 WHEN 'shanghai' THEN 2 WHEN 'guangzhou' THEN 3 WHEN 'chongqing' THEN 4 ELSE NULL END" \
-T data.csv \
http://host:port/api/example_db/example_table/_stream_load
- Broker Load
LOAD LABEL example_db.label1
(
DATA INFILE("s3://bucket_name/data.csv")
INTO TABLE example_table
COLUMNS TERMINATED BY ","
(k1, tmp_k2, tmp_k3, k4)
SET (
k2 = ifnull(tmp_k2, 0),
k3 = CASE tmp_k3 WHEN 'beijing' THEN 1 WHEN 'shanghai' THEN 2 WHEN 'guangzhou' THEN 3 WHEN 'chongqing' THEN 4 ELSE NULL END
)
)
WITH s3 (...);
- Routine Load
CREATE ROUTINE LOAD example_db.example_routine_load ON example_table
COLUMNS(k1, tmp_k2, tmp_k3, k4, k2 = ifnull(tmp_k2, 0), k3 = CASE tmp_k3 WHEN 'beijing' THEN 1 WHEN 'shanghai' THEN 2 WHEN 'guangzhou' THEN 3 WHEN 'chongqing' THEN 4 ELSE NULL END),
COLUMNS TERMINATED BY ","
FROM KAFKA (...);
Query Results
mysql> select * from example_table;
+------+------+------+------+
| k1   | k2   | k3   | k4   |
+------+------+------+------+
|    1 |  100 |    1 |  1.1 |
|    2 |  200 |    2 |  1.2 |
|    3 |  300 |    3 |  1.3 |
|    4 |    0 |    4 |  1.4 |
+------+------+------+------+
Pre-filtering
Pre-filtering filters out unwanted raw data before column mapping and transformation. Because it runs on the raw data, the filter conditions reference the source columns as named in the column list. It is only supported in Broker Load and Routine Load.
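The following sketch (schematic only; tmp_k3 and the concat expression are illustrative) contrasts the two filter stages: PRECEDING FILTER sees the raw source value, while WHERE sees the transformed target column:
LOAD LABEL example_db.label_prefilter_sketch
(
    DATA INFILE("s3://bucket_name/data.csv")
    INTO TABLE example_table
    COLUMNS TERMINATED BY ","
    (k1, k2, tmp_k3, k4)
    PRECEDING FILTER tmp_k3 != 'beijing'   -- filters on the raw source column
    SET (
        k3 = concat('area-', tmp_k3)
    )
    WHERE k3 IS NOT NULL                   -- filters on the transformed column
)
WITH s3 (...);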
Example
Suppose we have the following source data (column names are only for illustration purposes and do not actually exist):
Column1,Column2,Column3,Column4
1,100,beijing,1.1
2,200,shanghai,1.2
3,300,guangzhou,1.3
4,\N,chongqing,1.4
Pre-filtering condition: Column1 > 1
Creating the Target Table
CREATE TABLE example_table
(
k1 INT,
k2 INT,
k3 STRING,
k4 DOUBLE
)
ENGINE = OLAP
DUPLICATE KEY(k1)
DISTRIBUTED BY HASH(k1) BUCKETS 1;
Loading Data
- Broker Load
LOAD LABEL example_db.label1
(
DATA INFILE("s3://bucket_name/data.csv")
INTO TABLE example_table
COLUMNS TERMINATED BY ","
(k1, k2, k3, k4)
PRECEDING FILTER k1 > 1
)
WITH s3 (...);
- Routine Load
CREATE ROUTINE LOAD example_db.example_routine_load ON example_table
COLUMNS(k1, k2, k3, k4),
COLUMNS TERMINATED BY ","
PRECEDING FILTER k1 > 1
FROM KAFKA (...);
Query Results
mysql> select * from example_table;
+------+------+-----------+------+
| k1   | k2   | k3        | k4   |
+------+------+-----------+------+
|    2 |  200 | shanghai  |  1.2 |
|    3 |  300 | guangzhou |  1.3 |
|    4 | NULL | chongqing |  1.4 |
+------+------+-----------+------+
Post-filtering
Post-filtering is used to filter the final results after column mapping and transformation.
Example
Suppose we have the following source data (column names are only for illustration purposes and do not actually exist):
Column1,Column2,Column3,Column4
1,100,beijing,1.1
2,200,shanghai,1.2
3,300,guangzhou,1.3
4,\N,chongqing,1.4
Filtering Without Column Mapping and Transformation
Suppose the target table has four columns: k1, k2, k3, and k4. We want to load only the rows where Column4 is greater than 1.2.
Creating the Target Table
CREATE TABLE example_table
(
k1 INT,
k2 INT,
k3 STRING,
k4 DOUBLE
)
ENGINE = OLAP
DUPLICATE KEY(k1)
DISTRIBUTED BY HASH(k1) BUCKETS 1;
Loading Data
- Stream Load
curl --location-trusted -u user:passwd \
-H "column_separator:," \
-H "columns: k1, k2, k3, k4" \
-H "where: k4 > 1.2" \
-T data.csv \
http://host:port/api/example_db/example_table/_stream_load
- Broker Load
LOAD LABEL example_db.label1
(
DATA INFILE("s3://bucket_name/data.csv")
INTO TABLE example_table
COLUMNS TERMINATED BY ","
(k1, k2, k3, k4)
WHERE k4 > 1.2
)
WITH s3 (...);
- Routine Load
CREATE ROUTINE LOAD example_db.example_routine_load ON example_table
COLUMNS(k1, k2, k3, k4),
COLUMNS TERMINATED BY ","
WHERE k4 > 1.2
FROM KAFKA (...);
Query Results
mysql> select * from example_table;
+------+------+-----------+------+
| k1   | k2   | k3        | k4   |
+------+------+-----------+------+
|    3 |  300 | guangzhou |  1.3 |
|    4 | NULL | chongqing |  1.4 |
+------+------+-----------+------+
Filtering After Column Transformation
Suppose the target table has four columns: k1, k2, k3, and k4. We want to transform Column3 to area ID and filter out rows where the area ID is 3.
Creating the Target Table
CREATE TABLE example_table
(
k1 INT,
k2 INT,
k3 INT,
k4 DOUBLE
)
ENGINE = OLAP
DUPLICATE KEY(k1)
DISTRIBUTED BY HASH(k1) BUCKETS 1;
Loading Data
- Stream Load
curl --location-trusted -u user:passwd \
-H "column_separator:," \
-H "columns: k1, k2, tmp_k3, k4, k3 = case tmp_k3 when 'beijing' then 1 when 'shanghai' then 2 when 'guangzhou' then 3 when 'chongqing' then 4 else null end" \
-H "where: k3 != 3" \
-T data.csv \
http://host:port/api/example_db/example_table/_stream_load
- Broker Load
LOAD LABEL example_db.label1
(
DATA INFILE("s3://bucket_name/data.csv")
INTO TABLE example_table
COLUMNS TERMINATED BY ","
(k1, k2, tmp_k3, k4)
SET (
k3 = CASE tmp_k3 WHEN 'beijing' THEN 1 WHEN 'shanghai' THEN 2 WHEN 'guangzhou' THEN 3 WHEN 'chongqing' THEN 4 ELSE NULL END
)
WHERE k3 != 3
)
WITH s3 (...);
- Routine Load
CREATE ROUTINE LOAD example_db.example_routine_load ON example_table
COLUMNS(k1, k2, tmp_k3, k4, k3 = CASE tmp_k3 WHEN 'beijing' THEN 1 WHEN 'shanghai' THEN 2 WHEN 'guangzhou' THEN 3 WHEN 'chongqing' THEN 4 ELSE NULL END),
COLUMNS TERMINATED BY ","
WHERE k3 != 3
FROM KAFKA (...);
Query Results
mysql> select * from example_table;
+------+------+------+------+
| k1   | k2   | k3   | k4   |
+------+------+------+------+
|    1 |  100 |    1 |  1.1 |
|    2 |  200 |    2 |  1.2 |
|    4 | NULL |    4 |  1.4 |
+------+------+------+------+
Multiple Conditions
Suppose the target table has four columns: k1, k2, k3, and k4. We want to load only the rows where k1 is not NULL and k4 is greater than 1.2.
Creating the Target Table
CREATE TABLE example_table
(
k1 INT,
k2 INT,
k3 STRING,
k4 DOUBLE
)
ENGINE = OLAP
DUPLICATE KEY(k1)
DISTRIBUTED BY HASH(k1) BUCKETS 1;
Loading Data
- Stream Load
curl --location-trusted -u user:passwd \
-H "column_separator:," \
-H "columns: k1, k2, k3, k4" \
-H "where: k1 is not null and k4 > 1.2" \
-T data.csv \
http://host:port/api/example_db/example_table/_stream_load
- Broker Load
LOAD LABEL example_db.label1
(
DATA INFILE("s3://bucket_name/data.csv")
INTO TABLE example_table
COLUMNS TERMINATED BY ","
(k1, k2, k3, k4)
WHERE k1 IS NOT NULL AND k4 > 1.2
)
WITH s3 (...);
- Routine Load
CREATE ROUTINE LOAD example_db.example_routine_load ON example_table
COLUMNS(k1, k2, k3, k4),
COLUMNS TERMINATED BY ","
WHERE k1 IS NOT NULL AND k4 > 1.2
FROM KAFKA (...);
Query Results
mysql> select * from example_table;
+------+------+-----------+------+
| k1   | k2   | k3        | k4   |
+------+------+-----------+------+
|    3 |  300 | guangzhou |  1.3 |
|    4 | NULL | chongqing |  1.4 |
+------+------+-----------+------+