EXPORT
EXPORTβ
Nameβ
EXPORT
Descriptionβ
The EXPORT
command is used to export the data of a specified table to a designated location as a file. Currently, it supports exporting to remote storage such as HDFS, S3, BOS, and COS (Tencent Cloud) through Broker process, S3 protocol, or HDFS protocol.
EXPORT
is an asynchronous operation, and the command submits an EXPORT JOB
to Doris. The task will be successfully submitted and returns immediately. After execution, you can use the SHOW EXPORT to view the progress.
grammar
EXPORT TABLE table_name
[PARTITION (p1[,p2])]
[WHERE]
TO export_path
[opt_properties]
WITH BROKER/S3/HDFS
[broker_properties];
principle
The bottom layer of the Export
statement actually executes the select...outfile..
statement. The Export
task will be decomposed into multiple select...outfile..
statements to execute concurrently according to the value of the parallelism
parameter. Each select...outfile..
is responsible for exporting some tablets of table.
illustrate:
-
table_name
The table name of the table currently being exported. Only the export of Doris local table / View / External Table data is supported.
-
partition
It is possible to export only some specified partitions of the specified table
-
export_path
The exported file path can be a directory or a file directory with a file prefix, for example:
hdfs://path/to/my_file_
-
opt_properties
Used to specify some export parameters.
[PROPERTIES ("key"="value", ...)]
The following parameters can be specified:
-
label
: This parameter is optional, specifies the label of the export task. If this parameter is not specified, the system randomly assigns a label to the export task. -
column_separator
: Specifies the exported column separator, default is\t
, mulit-bytes is supported. This parameter is only used forCSV
file format. -
line_delimiter
: Specifies the line delimiter for export, the default is\n
, mulit-bytes is supported. This parameter is only used forCSV
file format. -
timeout
: The timeout period of the export job, the default is 2 hours, the unit is seconds. -
columns
: Specifies certain columns of the export job table -
format
: Specifies the file format, support: parquet, orc, csv, csv_with_names, csv_with_names_and_types.The default is csv format. -
parallelism
: The concurrency degree of theexport
job, the default is1
. The export job will be divided intoselect..outfile..
statements of the number ofparallelism
to execute concurrently. (If the value ofparallelism
is greater than the number of tablets in the table, the system will automatically setparallelism
to the number of tablets, that is, eachselect..outfile..
statement is responsible for one tablet) -
delete_existing_files
: defaultfalse
. If it is specified as true, you will first delete all files specified in the directory specified by the file_path, and then export the data to the directory.For example: "file_path" = "/user/tmp", then delete all files and directory under "/user/"; "file_path" = "/user/tmp/", then delete all files and directory under "/user/tmp/" -
max_file_size
: it is the limit for the size of a single file in the export job. If the result file exceeds this value, it will be split into multiple files. The valid range formax_file_size
is [5MB, 2GB], with a default value of 1GB. (When exporting to the ORC file format, the actual size of the split files will be multiples of 64MB, for example, if max_file_size is specified as 5MB, the actual split size will be 64MB; if max_file_size is specified as 65MB, the actual split size will be 128MB.) -
with_bom
: The default is false. If it is set to true, the exported file is encoded in UTF8 with BOM (valid only for CSV-related file format). -
timeout
: This is the timeout parameter of the export job, the default timeout is 2 hours, and the unit is seconds. -
compress_type
: (since 2.1.5) When specifying the export file format as Parquet or ORC, you can choose the compression method for the Parquet or ORC files. For Parquet file format, you can specify the compression method as SNAPPY, GZIP, BROTLI, ZSTD, LZ4, or PLAIN, with the default being SNAPPY. For ORC file format, you can specify the compression method as PLAIN, SNAPPY, ZLIB, or ZSTD, with the default being ZLIB. This parameter is supported starting from version 2.1.5. (PLAIN means no compression is used.)
Note that to use the
delete_existing_files
parameter, you also need to add the configurationenable_delete_existing_files = true
to the fe.conf file and restart the FE. Only then will thedelete_existing_files
parameter take effect. Settingdelete_existing_files = true
is a dangerous operation and it is recommended to only use it in a testing environment. -
-
WITH BROKER
The export function needs to write data to the remote storage through the Broker process. Here you need to define the relevant connection information for the broker to use.
WITH BROKER "broker_name"
("key"="value"[,...])
Broker related properties:
username: user name
password: password
hadoop.security.authentication: specify the authentication method as kerberos
kerberos_principal: specifies the principal of kerberos
kerberos_keytab: specifies the path to the keytab file of kerberos. The file must be the absolute path to the file on the server where the broker process is located. and can be accessed by the Broker process -
WITH HDFS
You can directly write data to the remote HDFS.
WITH HDFS ("key"="value"[,...])
HDFS related properties:
fs.defaultFS: namenode address and port
hadoop.username: hdfs username
dfs.nameservices: if hadoop enable HA, please set fs nameservice. See hdfs-site.xml
dfs.ha.namenodes.[nameservice ID]οΌunique identifiers for each NameNode in the nameservice. See hdfs-site.xml
dfs.namenode.rpc-address.[nameservice ID].[name node ID]: the fully-qualified RPC address for each NameNode to listen on. See hdfs-site.xml
dfs.client.failover.proxy.provider.[nameservice ID]οΌthe Java class that HDFS clients use to contact the Active NameNode, usually it is org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider
For a kerberos-authentication enabled Hadoop cluster, additional properties need to be set:
dfs.namenode.kerberos.principal: HDFS namenode service principal
hadoop.security.authentication: kerberos
hadoop.kerberos.principal: the Kerberos pincipal that Doris will use when connectiong to HDFS.
hadoop.kerberos.keytab: HDFS client keytab location. -
WITH S3
You can directly write data to a remote S3 object store
WITH S3 ("key"="value"[,...])
S3 related properties:
AWS_ENDPOINT
AWS_ACCESS_KEY
AWS_SECRET_KEY
AWS_REGION
use_path_style: (optional) default false . The S3 SDK uses the virtual-hosted style by default. However, some object storage systems may not be enabled or support virtual-hosted style access. At this time, we can add the use_path_style parameter to force the use of path style access method.
Exampleβ
export to localβ
Export data to the local file system needs to add
enable_outfile_to_local = true
to the fe.conf and restart the Fe.
- You can export the
test
table to a local store. Export csv format file by default.
EXPORT TABLE test TO "file:///home/user/tmp/";
- You can export the k1 and k2 columns in
test
table to a local store, and set export label. Export csv format file by default.
EXPORT TABLE test TO "file:///home/user/tmp/"
PROPERTIES (
"label" = "label1",
"columns" = "k1,k2"
);
- You can export the rows where
k1 < 50
intest
table to a local store, and set column_separator to,
. Export csv format file by default.
EXPORT TABLE test WHERE k1 < 50 TO "file:///home/user/tmp/"
PROPERTIES (
"columns" = "k1,k2",
"column_separator"=","
);
- Export partitions p1 and p2 from the test table to local storage, with the default exported file format being csv.
EXPORT TABLE test PARTITION (p1,p2) TO "file:///home/user/tmp/"
PROPERTIES ("columns" = "k1,k2");
- Export all data in the test table to local storage with a non-default file format.
// parquet file format
EXPORT TABLE test TO "file:///home/user/tmp/"
PROPERTIES (
"columns" = "k1,k2",
"format" = "parquet"
);
// orc file format
EXPORT TABLE test TO "file:///home/user/tmp/"
PROPERTIES (
"columns" = "k1,k2",
"format" = "orc"
);
// csv_with_names file format. Using 'AA' as the column delimiter and 'zz' as the line delimiter.
EXPORT TABLE test TO "file:///home/user/tmp/"
PROPERTIES (
"format" = "csv_with_names",
"column_separator"="AA",
"line_delimiter" = "zz"
);
// csv_with_names_and_types file format
EXPORT TABLE test TO "file:///home/user/tmp/"
PROPERTIES (
"format" = "csv_with_names_and_types"
);
- set max_file_sizes
EXPORT TABLE test TO "file:///home/user/tmp/"
PROPERTIES (
"format" = "parquet",
"max_file_size" = "5MB"
);
When the exported file size is larger than 5MB, the data will be split into multiple files, with each file containing a maximum of 5MB.
- set parallelism
EXPORT TABLE test TO "file:///home/user/tmp/"
PROPERTIES (
"format" = "parquet",
"max_file_size" = "5MB",
"parallelism" = "5"
);
- set delete_existing_files
EXPORT TABLE test TO "file:///home/user/tmp"
PROPERTIES (
"format" = "parquet",
"max_file_size" = "5MB",
"delete_existing_files" = "true"
)
Before exporting data, all files and directories in the /home/user/
directory will be deleted, and then the data will be exported to that directory.
export with S3β
- Export all data from the
testTbl
table to S3 using invisible character '\x07' as a delimiter for columns and rows.If you want to export data to minio, you also need to specify use_path_style=true.
EXPORT TABLE testTbl TO "s3://bucket/a/b/c"
PROPERTIES (
"column_separator"="\\x07",
"line_delimiter" = "\\x07"
) WITH s3 (
"s3.endpoint" = "xxxxx",
"s3.region" = "xxxxx",
"s3.secret_key"="xxxx",
"s3.access_key" = "xxxxx"
)
- Export all data in the test table to HDFS in the format of parquet, limit the size of a single file to 1024MB, and reserve all files in the specified directory.
export with HDFSβ
- Export all data from the
test
table to HDFS inParquet
format, with a limit of 512MB for the size of a single file in the export job, and retain all files under the specified directory.
EXPORT TABLE test TO "hdfs://hdfs_host:port/a/b/c/"
PROPERTIES(
"format" = "parquet",
"max_file_size" = "512MB",
"delete_existing_files" = "false"
)
with HDFS (
"fs.defaultFS"="hdfs://hdfs_host:port",
"hadoop.username" = "hadoop"
);
export with Brokerβ
You need to first start the broker process and add it to the FE before proceeding.
- Export the
test
table to hdfs
EXPORT TABLE test TO "hdfs://hdfs_host:port/a/b/c"
WITH BROKER "broker_name"
(
"username"="xxx",
"password"="yyy"
);
- Export partitions 'p1' and 'p2' from the 'testTbl' table to HDFS using ',' as the column delimiter and specifying a label.
EXPORT TABLE testTbl PARTITION (p1,p2) TO "hdfs://hdfs_host:port/a/b/c"
PROPERTIES (
"label" = "mylabel",
"column_separator"=","
)
WITH BROKER "broker_name"
(
"username"="xxx",
"password"="yyy"
);
- Export all data from the 'testTbl' table to HDFS using the non-visible character '\x07' as the column and row delimiter.
EXPORT TABLE testTbl TO "hdfs://hdfs_host:port/a/b/c"
PROPERTIES (
"column_separator"="\\x07",
"line_delimiter" = "\\x07"
)
WITH BROKER "broker_name"
(
"username"="xxx",
"password"="yyy"
)
Keywordsβ
EXPORT
Best Practiceβ
Concurrent Exportβ
An Export job can be configured with the parallelism
parameter to concurrently export data. The parallelism
parameter specifies the number of threads to execute the EXPORT Job
. Each thread is responsible for exporting a subset of the total tablets.
The underlying execution logic of an Export Job
is actually the SELECT INTO OUTFILE
statement. Each thread specified by the parallelism
parameter executes independent SELECT INTO OUTFILE
statements.
The specific logic for splitting an Export Job
into multiple SELECT INTO OUTFILE
is, to evenly distribute all the tablets of the table among all parallel threads. For example:
- If num(tablets) = 40 and parallelism = 3, then the three threads will be responsible for 14, 13, and 13 tablets, respectively.
- If num(tablets) = 2 and parallelism = 3, then Doris automatically sets the parallelism to 2, and each thread is responsible for one tablet.
When the number of tablets responsible for a thread exceeds the maximum_tablets_of_outfile_in_export
value (default is 10, and can be modified by adding the maximum_tablets_of_outfile_in_export
parameter in fe.conf), the thread will split the tablets which are responsibled for this thread into multiple SELECT INTO OUTFILE
statements. For example:
- If a thread is responsible for 14 tablets and
maximum_tablets_of_outfile_in_export = 10
, then the thread will be responsible for twoSELECT INTO OUTFILE
statements. The firstSELECT INTO OUTFILE
statement exports 10 tablets, and the secondSELECT INTO OUTFILE
statement exports 4 tablets. The twoSELECT INTO OUTFILE
statements are executed serially by this thread.
memory limitβ
The query plan for an Export Job
typically involves only scanning and exporting
, and does not involve compute logic that requires a lot of memory. Therefore, the default memory limit of 2GB is usually sufficient to meet the requirements.
However, in certain scenarios, such as a query plan that requires scanning too many tablets on the same BE, or when there are too many data versions of tablets, it may result in insufficient memory. In these cases, you can adjust the session variable exec_mem_limit
to increase the memory usage limit.
Precautionsβ
-
Exporting a large amount of data at one time is not recommended. The maximum recommended export data volume for an Export job is several tens of GB. An overly large export results in more junk files and higher retry costs. If the amount of table data is too large, it is recommended to export by partition.
-
If the Export job fails, the generated files will not be deleted, and the user needs to delete it manually.
-
The Export job only exports the data of the Base table / View / External table, not the data of the materialized view.
-
The export job scans data and occupies IO resources, which may affect the query latency of the system.
-
Currently, The
Export Job
is simply check whether theTablets version
is the same, it is recommended not to import data during the execution of theExport Job
. -
The maximum number of partitions that an
Export job
allows is 2000. You can add a parameter to the fe.confmaximum_number_of_export_partitions
and restart FE to modify the setting.