Fluent Bit Doris Output Plugin
Fluent Bit is a fast log processor and forwarder that supports custom output plugins to write data into storage systems, with the Fluent Bit Doris output plugin being the one for outputting to Doris.
By invoking the Doris Stream Load HTTP interface, the Fluent Bit Doris output plugin writes data into Doris in real-time, offering capabilities such as multi-threaded concurrency, failure retries, custom Stream Load formats and parameters, and output write speed.
To use the Fluent Bit Doris output plugin, there are three main steps:
- Download or compile the Fluent Bit binary program that includes the Doris output plugin.
- Configure the Fluent Bit output address and other parameters.
- Start Fluent Bit to write data into Doris in real-time.
Installation (alpha)
Download from the Official Website
https://github.com/joker-star-l/fluent-bit/releases/download/v3.1.9-alpha/fluent-bit
Compile from Source Code
Clone the dev branch of https://github.com/joker-star-l/fluent-bit and run the following commands in the build/ directory
cmake -DFLB_RELEASE=ON ..
make
The build output is build/bin/fluent-bit.
Configuration
The configuration for the Fluent Bit Doris output plugin is as follows:
Configuration | Description |
---|---|
host | Stream Load HTTP host |
port | Stream Load HTTP port |
user | Doris username, this user needs to have import permissions for the corresponding Doris database and table |
password | Doris user's password |
database | The Doris database name to write into |
table | The Doris table name to write into |
label_prefix | Doris Stream Load Label prefix,the final generated Label is {label_prefix}_{timestamp}_{uuid} ,the default value is fluentbit. If set to false, no Label will be added |
time_key | The name of the timestamp column to add to the data. The default value is date. If set to false, the column will not be added |
header | Doris Stream Load headers parameter, can be set more than one |
log_request | Whether to output Doris Stream Load request and response metadata in logs for troubleshooting, default is true |
log_progress_interval | Time interval for outputting speed in logs, unit is seconds, default is 10, setting to 0 can disable this type of logging |
retry_limit | Doris Stream Load request failure retry number, default value is 1, if set to false will not limit the number of retries |
workers | Number of workers to perform Doris Stream Load, default value is 2 |
Usage Example
TEXT Log Collection Example
This example demonstrates TEXT log collection using Doris FE logs as an example.
1. Data
FE log files are typically located at the fe/log/fe.log file under the Doris installation directory. They are typical Java program logs, including fields such as timestamp, log level, thread name, code location, and log content. Not only do they contain normal logs, but also exception logs with stacktraces, which are multiline. Log collection and storage need to combine the main log and stacktrace into a single log entry.
2024-07-08 21:18:01,432 INFO (Statistics Job Appender|61) [StatisticsJobAppender.runAfterCatalogReady():70] Stats table not available, skip
2024-07-08 21:18:53,710 WARN (STATS_FETCH-0|208) [StmtExecutor.executeInternalQuery():3332] Failed to run internal SQL: OriginStatement{originStmt='SELECT * FROM __internal_schema.column_statistics WHERE part_id is NULL ORDER BY update_time DESC LIMIT 500000', idx=0}
org.apache.doris.common.UserException: errCode = 2, detailMessage = tablet 10031 has no queryable replicas. err: replica 10032's backend 10008 does not exist or not alive
at org.apache.doris.planner.OlapScanNode.addScanRangeLocations(OlapScanNode.java:931) ~[doris-fe.jar:1.2-SNAPSHOT]
at org.apache.doris.planner.OlapScanNode.computeTabletInfo(OlapScanNode.java:1197) ~[doris-fe.jar:1.2-SNAPSHOT]
2. Table Creation
The table structure includes fields such as the log's creation time, collection time, hostname, log file path, log type, log level, thread name, code location, and log content.
CREATE TABLE `doris_log` (
`log_time` datetime NULL COMMENT 'log content time',
`collect_time` datetime NULL COMMENT 'log agent collect time',
`host` text NULL COMMENT 'hostname or ip',
`path` text NULL COMMENT 'log file path',
`type` text NULL COMMENT 'log type',
`level` text NULL COMMENT 'log level',
`thread` text NULL COMMENT 'log thread',
`position` text NULL COMMENT 'log code position',
`message` text NULL COMMENT 'log message',
INDEX idx_host (`host`) USING INVERTED COMMENT '',
INDEX idx_path (`path`) USING INVERTED COMMENT '',
INDEX idx_type (`type`) USING INVERTED COMMENT '',
INDEX idx_level (`level`) USING INVERTED COMMENT '',
INDEX idx_thread (`thread`) USING INVERTED COMMENT '',
INDEX idx_position (`position`) USING INVERTED COMMENT '',
INDEX idx_message (`message`) USING INVERTED PROPERTIES("parser" = "unicode", "support_phrase" = "true") COMMENT ''
) ENGINE=OLAP
DUPLICATE KEY(`log_time`)
COMMENT 'OLAP'
PARTITION BY RANGE(`log_time`) ()
DISTRIBUTED BY RANDOM BUCKETS 10
PROPERTIES (
"replication_num" = "1",
"dynamic_partition.enable" = "true",
"dynamic_partition.time_unit" = "DAY",
"dynamic_partition.start" = "-7",
"dynamic_partition.end" = "1",
"dynamic_partition.prefix" = "p",
"dynamic_partition.buckets" = "10",
"dynamic_partition.create_history_partition" = "true",
"compaction_policy" = "time_series"
);
3. Configuration
The configuration file of Fluent Bit log collection is as follows, doris_log.conf is used to define various parts of ETL components, and parsers.conf is used to define different log parsers.
doris_log.conf:
# config for Fluent Bit service
[SERVICE]
log_level info
# parsers file
parsers_file parsers.conf
# use input tail
[INPUT]
name tail
path /path/to/your/log
# add log file name to the record, key is 'path'
path_key path
# set multiline parser
multiline.parser multiline_java
# parse log
[FILTER]
match *
name parser
key_name log
parser fe_log
reserve_data true
# add host info
[FILTER]
name sysinfo
match *
# add hostname to the record, key is 'host'
hostname_key host
# output to doris
[OUTPUT]
name doris
match *
host fehost
port feport
user your_username
password your_password
database your_db
table your_table
# add 'collect_time' to the record
time_key collect_time
# 'collect_time' is timestamp, change it to datatime
header columns collect_time=from_unixtime(collect_time)
log_request true
log_progress_interval 10
parsers.conf:
[MULTILINE_PARSER]
name multiline_java
type regex
flush_timeout 1000
# Regex rules for multiline parsing
# ---------------------------------
#
# configuration hints:
#
# - first state always has the name: start_state
# - every field in the rule must be inside double quotes
#
# rules | state name | regex pattern | next state name
# --------|----------------|---------------|-----------------
rule "start_state" "/(^[0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2})(.*)/" "cont"
rule "cont" "/(^(?![0-9]{4}-[0-9]{2}-[0-9]{2}))(.*)/" "cont"
[PARSER]
name fe_log
format regex
# parse and add 'log_time', 'level', 'thread', 'position', 'message' to the record
regex ^(?<log_time>[0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2},[0-9]{3}) (?<level>[^ ]+) \((?<thread>[^\)]+)\) \[(?<position>[^\]]+)\] (?<message>(\n|.)*)\n$
4. Running Fluent Bit
fluent-bit -c doris_log.conf
# log stream load response
[2024/10/31 18:39:55] [ info] [output:doris:doris.1] 127.0.0.1:8040, HTTP status=200
{
"TxnId": 32155,
"Label": "fluentbit_1730371195_91cca1aa-c15f-45d2-b503-fe7d2e839c2a",
"Comment": "",
"TwoPhaseCommit": "false",
"Status": "Success",
"Message": "OK",
"NumberTotalRows": 1,
"NumberLoadedRows": 1,
"NumberFilteredRows": 0,
"NumberUnselectedRows": 0,
"LoadBytes": 836,
"LoadTimeMs": 298,
"BeginTxnTimeMs": 0,
"StreamLoadPutTimeMs": 3,
"ReadDataTimeMs": 0,
"WriteDataTimeMs": 268,
"CommitAndPublishTimeMs": 25
}
# log speed info
[2024/10/31 18:40:13] [ info] [output:doris:doris.1] total 0 MB 2 ROWS, total speed 0 MB/s 0 R/s, last 10 seconds speed 0 MB/s 0 R/s
JSON Log Collection Example
This example demonstrates JSON log collection using data from the GitHub events archive.
1. Data
The GitHub events archive contains archived data of GitHub user actions, formatted as JSON. It can be downloaded from here, for example, the data for January 1, 2024, at 3 PM.
wget https://data.gharchive.org/2024-01-01-15.json.gz
Below is a sample of the data. Normally, each piece of data is on a single line, but for ease of display, it has been formatted here.
{
"id": "37066529221",
"type": "PushEvent",
"actor": {
"id": 46139131,
"login": "Bard89",
"display_login": "Bard89",
"gravatar_id": "",
"url": "https://api.github.com/users/Bard89",
"avatar_url": "https://avatars.githubusercontent.com/u/46139131?"
},
"repo": {
"id": 780125623,
"name": "Bard89/talk-to-me",
"url": "https://api.github.com/repos/Bard89/talk-to-me"
},
"payload": {
"repository_id": 780125623,
"push_id": 17799451992,
"size": 1,
"distinct_size": 1,
"ref": "refs/heads/add_mvcs",
"head": "f03baa2de66f88f5f1754ce3fa30972667f87e81",
"before": "85e6544ede4ae3f132fe2f5f1ce0ce35a3169d21"
},
"public": true,
"created_at": "2024-04-01T23:00:00Z"
}
2. Table Creation
CREATE DATABASE log_db;
USE log_db;
CREATE TABLE github_events
(
`created_at` DATETIME,
`id` BIGINT,
`type` TEXT,
`public` BOOLEAN,
`actor` VARIANT,
`repo` VARIANT,
`payload` TEXT,
INDEX `idx_id` (`id`) USING INVERTED,
INDEX `idx_type` (`type`) USING INVERTED,
INDEX `idx_actor` (`actor`) USING INVERTED,
INDEX `idx_host` (`repo`) USING INVERTED,
INDEX `idx_payload` (`payload`) USING INVERTED PROPERTIES("parser" = "unicode", "support_phrase" = "true")
)
ENGINE = OLAP
DUPLICATE KEY(`created_at`)
PARTITION BY RANGE(`created_at`) ()
DISTRIBUTED BY RANDOM BUCKETS 10
PROPERTIES (
"replication_num" = "1",
"compaction_policy" = "time_series",
"enable_single_replica_compaction" = "true",
"dynamic_partition.enable" = "true",
"dynamic_partition.create_history_partition" = "true",
"dynamic_partition.time_unit" = "DAY",
"dynamic_partition.start" = "-30",
"dynamic_partition.end" = "1",
"dynamic_partition.prefix" = "p",
"dynamic_partition.buckets" = "10",
"dynamic_partition.replication_num" = "1"
);
3. Configuration
In contrast to the previous TEXT log collection, this configuration does not use FILTER because no additional processing transformations are required.
github_events.conf:
[SERVICE]
log_level info
parsers_file github_parsers.conf
[INPUT]
name tail
parser github
path /path/to/your/log
[OUTPUT]
name doris
match *
host fehost
port feport
user your_username
password your_password
database your_db
table your_table
time_key false
log_request true
log_progress_interval 10
github_parsers.conf:
[PARSER]
name github
format json
4. Running Fluent Bit
fluent-bit -c github_events.conf