
Aggregate Key Model

The following practical examples illustrate what the Aggregate Key model is and how to use it correctly.

Importing Data Aggregation

Assume that the business has the following data table schema:

| ColumnName | Type | AggregationType | Comment |
|---|---|---|---|
| user_id | LARGEINT | | User ID |
| date | DATE | | Date of data import |
| city | VARCHAR(20) | | User city |
| age | SMALLINT | | User age |
| sex | TINYINT | | User gender |
| last_visit_date | DATETIME | REPLACE | Last user access time |
| cost | BIGINT | SUM | Total user consumption |
| max_dwell_time | INT | MAX | Maximum user dwell time |
| min_dwell_time | INT | MIN | Minimum user dwell time |

The corresponding CREATE TABLE statement is as follows (partition information is omitted):

CREATE DATABASE IF NOT EXISTS example_db;

CREATE TABLE IF NOT EXISTS example_db.example_tbl_agg1
(
`user_id` LARGEINT NOT NULL COMMENT "user id",
`date` DATE NOT NULL COMMENT "data import time",
`city` VARCHAR(20) COMMENT "city",
`age` SMALLINT COMMENT "age",
`sex` TINYINT COMMENT "gender",
`last_visit_date` DATETIME REPLACE DEFAULT "1970-01-01 00:00:00" COMMENT "last visit date time",
`cost` BIGINT SUM DEFAULT "0" COMMENT "user total cost",
`max_dwell_time` INT MAX DEFAULT "0" COMMENT "user max dwell time",
`min_dwell_time` INT MIN DEFAULT "99999" COMMENT "user min dwell time"
)
AGGREGATE KEY(`user_id`, `date`, `city`, `age`, `sex`)
DISTRIBUTED BY HASH(`user_id`) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 1"
);

As you can see, this is a typical fact table of user information and visit behaviors. In star models, user information and visit behaviors are usually stored in dimension tables and fact tables, respectively. Here, for the convenience of explanation, we store the two types of information in one single table.

The columns in the table are divided into Key (dimension) columns and Value (indicator) columns based on whether they are set with an AggregationType. Key columns, such as user_id, date, and age, have no AggregationType, while Value columns do.

When data is imported, rows with identical contents in the Key columns are aggregated into one row, and their values in the Value columns are aggregated as the AggregationType of each column specifies. Currently, the following aggregation methods are available, in addition to the agg_state type:

  • SUM: Accumulate the values in multiple rows.
  • REPLACE: The newly imported value will replace the previous value.
  • MAX: Keep the maximum value.
  • MIN: Keep the minimum value.
  • REPLACE_IF_NOT_NULL: Non-null value replacement. Unlike REPLACE, it does not replace null values.
  • HLL_UNION: Aggregation method for columns of HLL type, using the HyperLogLog algorithm for aggregation.
  • BITMAP_UNION: Aggregation method for columns of BITMAP type, performing a union aggregation of bitmaps.

Tip: If these aggregation methods cannot meet your requirements, you can use the agg_state type instead.
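
As a rough illustration of the scalar aggregation methods listed above, each one can be thought of as a function that merges the stored value with a newly imported one. The following Python sketch is only a model of those semantics, not Doris code; the function names and the `AGGREGATORS` dictionary are hypothetical, and `None` stands in for SQL NULL.

```python
# Illustrative model only: each aggregation type is a function that merges
# the stored value (old) with a newly imported value (new).

def agg_sum(old, new):
    # SUM: accumulate the values.
    return old + new

def agg_replace(old, new):
    # REPLACE: the new value always wins, even if it is None (NULL).
    return new

def agg_replace_if_not_null(old, new):
    # REPLACE_IF_NOT_NULL: a None (NULL) input leaves the stored value untouched.
    return old if new is None else new

AGGREGATORS = {
    "SUM": agg_sum,
    "REPLACE": agg_replace,
    "REPLACE_IF_NOT_NULL": agg_replace_if_not_null,
    "MAX": max,  # keep the larger value
    "MIN": min,  # keep the smaller value
}

print(AGGREGATORS["SUM"](20, 15))
print(AGGREGATORS["REPLACE"]("2017-10-01 06:00", None))
print(AGGREGATORS["REPLACE_IF_NOT_NULL"]("2017-10-01 06:00", None))
```

The last two calls show the difference between REPLACE and REPLACE_IF_NOT_NULL when the incoming value is NULL.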

Suppose that you have the following import data (raw data):

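| user_id | date | city | age | sex | last_visit_date | cost | max_dwell_time | min_dwell_time |
|---|---|---|---|---|---|---|---|---|
| 10000 | 2017-10-01 | Beijing | 20 | 0 | 2017-10-01 06:00 | 20 | 10 | 10 |
| 10000 | 2017-10-01 | Beijing | 20 | 0 | 2017-10-01 07:00 | 15 | 2 | 2 |
| 10001 | 2017-10-01 | Beijing | 30 | 1 | 2017-10-01 17:05:45 | 2 | 22 | 22 |
| 10002 | 2017-10-02 | Shanghai | 20 | 1 | 2017-10-02 12:59:12 | 200 | 5 | 5 |
| 10003 | 2017-10-02 | Guangzhou | 32 | 0 | 2017-10-02 11:20:00 | 30 | 11 | 11 |
| 10004 | 2017-10-01 | Shenzhen | 35 | 0 | 2017-10-01 10:00:15 | 100 | 3 | 3 |
| 10004 | 2017-10-03 | Shenzhen | 35 | 0 | 2017-10-03 10:20:22 | 11 | 6 | 6 |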

You can import this data with the following SQL:

insert into example_db.example_tbl_agg1 values
(10000,"2017-10-01","Beijing",20,0,"2017-10-01 06:00:00",20,10,10),
(10000,"2017-10-01","Beijing",20,0,"2017-10-01 07:00:00",15,2,2),
(10001,"2017-10-01","Beijing",30,1,"2017-10-01 17:05:45",2,22,22),
(10002,"2017-10-02","Shanghai",20,1,"2017-10-02 12:59:12",200,5,5),
(10003,"2017-10-02","Guangzhou",32,0,"2017-10-02 11:20:00",30,11,11),
(10004,"2017-10-01","Shenzhen",35,0,"2017-10-01 10:00:15",100,3,3),
(10004,"2017-10-03","Shenzhen",35,0,"2017-10-03 10:20:22",11,6,6);

This table records user behavior when visiting a certain product page. The first row of data, for example, is explained as follows:

| Data | Description |
|---|---|
| 10000 | User ID, which uniquely identifies each user |
| 2017-10-01 | Data storage time, accurate to the date |
| Beijing | User city |
| 20 | User age |
| 0 | Gender: male (1 for female) |
| 2017-10-01 06:00 | Time at which the user visited this page, accurate to the second |
| 20 | Consumption generated by the user's current visit |
| 10 | Time the user spent on the page during the current visit |
| 10 | Time the user spent on the page during the current visit (redundant) |

After this batch of data is imported into Doris correctly, it will be stored in Doris as follows:

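| user_id | date | city | age | sex | last_visit_date | cost | max_dwell_time | min_dwell_time |
|---|---|---|---|---|---|---|---|---|
| 10000 | 2017-10-01 | Beijing | 20 | 0 | 2017-10-01 07:00 | 35 | 10 | 2 |
| 10001 | 2017-10-01 | Beijing | 30 | 1 | 2017-10-01 17:05:45 | 2 | 22 | 22 |
| 10002 | 2017-10-02 | Shanghai | 20 | 1 | 2017-10-02 12:59:12 | 200 | 5 | 5 |
| 10003 | 2017-10-02 | Guangzhou | 32 | 0 | 2017-10-02 11:20:00 | 30 | 11 | 11 |
| 10004 | 2017-10-01 | Shenzhen | 35 | 0 | 2017-10-01 10:00:15 | 100 | 3 | 3 |
| 10004 | 2017-10-03 | Shenzhen | 35 | 0 | 2017-10-03 10:20:22 | 11 | 6 | 6 |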

The data of User 10000 have been aggregated to one row, while those of other users remain the same. The explanation for the aggregated data of User 10000 is as follows (the first 5 columns remain unchanged, so it starts with Column 6 last_visit_date):

  • The value in the 6th column is 2017-10-01 07:00: The last_visit_date column is aggregated by REPLACE, so 2017-10-01 07:00 has replaced 2017-10-01 06:00.
    Tip: When using REPLACE to aggregate data from the same import batch, the order of replacement is not guaranteed. In this example, the value eventually saved in Doris could therefore be 2017-10-01 06:00. Across different import batches, however, data from the newer batch is guaranteed to replace data from the older batch.

  • The value in the 7th column is 35: The cost column is aggregated by SUM, so the updated value 35 is the result of 20 + 15.
  • The value in the 8th column is 10: The max_dwell_time column is aggregated by MAX, so 10 is saved as it is the maximum between 10 and 2.
  • The value in the 9th column is 2: The min_dwell_time column is aggregated by MIN, so 2 is saved as it is the minimum between 10 and 2.
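
The aggregation of User 10000 described above can be reproduced with a small Python model. This is only a sketch of the semantics, not how Doris implements them: the `aggregate` helper and the `AGG`, `KEYS`, and `COLUMNS` names are hypothetical, only the first three raw rows are used, and REPLACE is applied in listed order, which Doris does not guarantee within a single batch.

```python
# Illustrative model of import-time aggregation; all names are hypothetical.
AGG = {
    "last_visit_date": lambda old, new: new,  # REPLACE (listed order assumed)
    "cost": lambda old, new: old + new,       # SUM
    "max_dwell_time": max,                    # MAX
    "min_dwell_time": min,                    # MIN
}
KEYS = ("user_id", "date", "city", "age", "sex")
COLUMNS = KEYS + tuple(AGG)

def aggregate(rows, table=None):
    """Merge rows into table, a dict keyed by the Key column values."""
    table = {} if table is None else table
    for row in rows:
        key = tuple(row[k] for k in KEYS)
        values = {c: row[c] for c in AGG}
        if key in table:
            # Same Key columns: combine each Value column by its aggregation type.
            values = {c: AGG[c](table[key][c], values[c]) for c in AGG}
        table[key] = values
    return table

raw = [
    (10000, "2017-10-01", "Beijing", 20, 0, "2017-10-01 06:00:00", 20, 10, 10),
    (10000, "2017-10-01", "Beijing", 20, 0, "2017-10-01 07:00:00", 15, 2, 2),
    (10001, "2017-10-01", "Beijing", 30, 1, "2017-10-01 17:05:45", 2, 22, 22),
]
table = aggregate(dict(zip(COLUMNS, r)) for r in raw)
print(table[(10000, "2017-10-01", "Beijing", 20, 0)])
```

The two rows of User 10000 collapse into one entry, while User 10001 keeps its single row unchanged.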

After aggregation, Doris only stores the aggregated data. In other words, the detailed raw data will no longer be available.

Importing Data and Aggregating with Existing Data

Assume that the table already contains the previously imported data:

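| user_id | date | city | age | sex | last_visit_date | cost | max_dwell_time | min_dwell_time |
|---|---|---|---|---|---|---|---|---|
| 10000 | 2017-10-01 | Beijing | 20 | 0 | 2017-10-01 07:00 | 35 | 10 | 2 |
| 10001 | 2017-10-01 | Beijing | 30 | 1 | 2017-10-01 17:05:45 | 2 | 22 | 22 |
| 10002 | 2017-10-02 | Shanghai | 20 | 1 | 2017-10-02 12:59:12 | 200 | 5 | 5 |
| 10003 | 2017-10-02 | Guangzhou | 32 | 0 | 2017-10-02 11:20:00 | 30 | 11 | 11 |
| 10004 | 2017-10-01 | Shenzhen | 35 | 0 | 2017-10-01 10:00:15 | 100 | 3 | 3 |
| 10004 | 2017-10-03 | Shenzhen | 35 | 0 | 2017-10-03 10:20:22 | 11 | 6 | 6 |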

Now import a new batch of data:

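| user_id | date | city | age | sex | last_visit_date | cost | max_dwell_time | min_dwell_time |
|---|---|---|---|---|---|---|---|---|
| 10004 | 2017-10-03 | Shenzhen | 35 | 0 | 2017-10-03 11:22:00 | 44 | 19 | 19 |
| 10005 | 2017-10-03 | Changsha | 29 | 1 | 2017-10-03 18:11:02 | 3 | 1 | 1 |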

With the following SQL:

insert into example_db.example_tbl_agg1 values
(10004,"2017-10-03","Shenzhen",35,0,"2017-10-03 11:22:00",44,19,19),
(10005,"2017-10-03","Changsha",29,1,"2017-10-03 18:11:02",3,1,1);

After importing, the data stored in Doris will be updated as follows:

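| user_id | date | city | age | sex | last_visit_date | cost | max_dwell_time | min_dwell_time |
|---|---|---|---|---|---|---|---|---|
| 10000 | 2017-10-01 | Beijing | 20 | 0 | 2017-10-01 07:00 | 35 | 10 | 2 |
| 10001 | 2017-10-01 | Beijing | 30 | 1 | 2017-10-01 17:05:45 | 2 | 22 | 22 |
| 10002 | 2017-10-02 | Shanghai | 20 | 1 | 2017-10-02 12:59:12 | 200 | 5 | 5 |
| 10003 | 2017-10-02 | Guangzhou | 32 | 0 | 2017-10-02 11:20:00 | 30 | 11 | 11 |
| 10004 | 2017-10-01 | Shenzhen | 35 | 0 | 2017-10-01 10:00:15 | 100 | 3 | 3 |
| 10004 | 2017-10-03 | Shenzhen | 35 | 0 | 2017-10-03 11:22:00 | 55 | 19 | 6 |
| 10005 | 2017-10-03 | Changsha | 29 | 1 | 2017-10-03 18:11:02 | 3 | 1 | 1 |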

As you can see, the existing data and the newly imported data of User 10004 have been aggregated. Meanwhile, the new data of User 10005 have been added.
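
The merged row of User 10004 for 2017-10-03 can be checked by hand. The short Python sketch below applies each column's aggregation type to the stored values and the newly imported ones; the `stored`, `incoming`, and `merged` names are purely illustrative.

```python
# Value columns of user 10004 on 2017-10-03: already stored vs. newly imported.
stored = {"last_visit_date": "2017-10-03 10:20:22", "cost": 11,
          "max_dwell_time": 6, "min_dwell_time": 6}
incoming = {"last_visit_date": "2017-10-03 11:22:00", "cost": 44,
            "max_dwell_time": 19, "min_dwell_time": 19}

merged = {
    # REPLACE: data from the newer batch replaces the older value.
    "last_visit_date": incoming["last_visit_date"],
    # SUM: 11 + 44
    "cost": stored["cost"] + incoming["cost"],
    # MAX: larger of 6 and 19
    "max_dwell_time": max(stored["max_dwell_time"], incoming["max_dwell_time"]),
    # MIN: smaller of 6 and 19
    "min_dwell_time": min(stored["min_dwell_time"], incoming["min_dwell_time"]),
}
print(merged)
```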

In Doris, data aggregation happens in the following 3 stages:

  1. The ETL stage of each batch of import data. At this stage, the batch of import data will be aggregated internally.
  2. The data compaction stage of the underlying BE. At this stage, BE will aggregate data from different batches that have been imported.
  3. The data query stage. The data involved in the query will be aggregated accordingly.

Data is aggregated to varying degrees at different stages. For example, a newly imported batch may not yet have been aggregated with the existing data. Queries, however, always return aggregated data. That is, users only ever see fully aggregated results and should not assume that the data they see is unaggregated or only partially aggregated.
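
One way to picture the three stages is a toy model in which every import batch is kept as its own set of rows until compaction. The Python sketch below models only a single SUM column, and every name in it (`merge`, `load`, `compact`, `query`, `rowsets`) is hypothetical rather than part of Doris. It shows why a query returns the same answer whether or not compaction has run yet.

```python
# Toy model: each import batch becomes its own "rowset" mapping key -> cost.
from collections import defaultdict

def merge(pairs):
    """Aggregate (key, cost) pairs with SUM semantics."""
    out = defaultdict(int)
    for key, cost in pairs:
        out[key] += cost
    return dict(out)

def load(rowsets, batch):
    # Stage 1: aggregate within the batch at import time.
    rowsets.append(merge(batch))

def compact(rowsets):
    # Stage 2: merge the rowsets of different batches into a single rowset.
    merged = merge(pair for rowset in rowsets for pair in rowset.items())
    rowsets[:] = [merged]

def query(rowsets):
    # Stage 3: merge whatever rowsets exist at read time.
    return merge(pair for rowset in rowsets for pair in rowset.items())

rowsets = []
load(rowsets, [(10004, 11)])
load(rowsets, [(10004, 44), (10005, 3)])
before = query(rowsets)  # two rowsets stored, merged on read
compact(rowsets)
after = query(rowsets)   # one rowset stored after compaction
print(before == after, after)
```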

agg_state

Tip: AGG_STATE cannot be used as a key column, and when creating a table, you need to declare the signature of the aggregation function. Users do not need to specify a length or default value. The actual storage size of the data depends on the function implementation.

CREATE TABLE

set enable_agg_state=true;
create table aggstate(
k1 int null,
k2 agg_state<sum(int)> generic,
k3 agg_state<group_concat(string)> generic
)
aggregate key (k1)
distributed BY hash(k1) buckets 3
properties("replication_num" = "1");

agg_state is used to declare the data type as agg_state, and sum/group_concat are the signatures of aggregation functions.

Please note that agg_state is a data type, similar to int, array, or string.

agg_state can only be used in conjunction with the state/merge/union function combinators.

agg_state represents an intermediate result of an aggregation function. For example, with the aggregation function sum, agg_state can represent the intermediate state of summing values like sum(1, 2, 3, 4, 5), rather than the final result.

Values of the agg_state type must be generated using the state function. For the current table, these are sum_state and group_concat_state for the sum and group_concat aggregation functions, respectively.

insert into aggstate values(1,sum_state(1),group_concat_state('a'));
insert into aggstate values(1,sum_state(2),group_concat_state('b'));
insert into aggstate values(1,sum_state(3),group_concat_state('c'));

At this point, the table contains only one row. Please note that the table below is for illustrative purposes and cannot be selected/displayed directly:

| k1 | k2 | k3 |
|---|---|---|
| 1 | sum(1,2,3) | group_concat_state(a,b,c) |

Insert another record.

insert into aggstate values(2,sum_state(4),group_concat_state('d'));

The table now contains the following:

| k1 | k2 | k3 |
|---|---|---|
| 1 | sum(1,2,3) | group_concat_state(a,b,c) |
| 2 | sum(4) | group_concat_state(d) |

We can use the merge operation to combine multiple states and return the final result calculated by the aggregation function.

mysql> select sum_merge(k2) from aggstate;
+---------------+
| sum_merge(k2) |
+---------------+
|            10 |
+---------------+

sum_merge first combines sum(1,2,3) and sum(4) into sum(1,2,3,4) and then returns the calculated result. Because the output of group_concat depends on the order in which the values are combined, its result is not stable.

mysql> select group_concat_merge(k3) from aggstate;
+------------------------+
| group_concat_merge(k3) |
+------------------------+
| c,b,a,d                |
+------------------------+
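
To make the difference between the state, union, and merge combinators more concrete, here is a minimal Python model for sum. It is a sketch under simplifying assumptions: a state is represented as a plain dictionary holding a running total, whereas the real agg_state storage format depends on the function implementation, and these Python functions only borrow the names of the SQL combinators.

```python
# Toy model of the state / union / merge combinators for sum.

def sum_state(value):
    """Wrap a single input value as an intermediate state."""
    return {"sum": value}

def sum_union(states):
    """Combine states into a new intermediate state (still not a final result)."""
    return {"sum": sum(s["sum"] for s in states)}

def sum_merge(states):
    """Combine states and return the final result of the aggregation."""
    return sum_union(states)["sum"]

# k1 = 1 received three inserts; k1 = 2 received one.
row1 = sum_union([sum_state(1), sum_state(2), sum_state(3)])
row2 = sum_state(4)
print(sum_merge([row1, row2]))
```

In this model, union returns something that can be stored and combined again later, while merge ends the chain by producing a plain number.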

If you do not want the final aggregation result, you can use the union combinator to combine multiple intermediate aggregation results into a new intermediate result.

insert into aggstate select 3,sum_union(k2),group_concat_union(k3) from aggstate ;

The table now contains the following:

| k1 | k2 | k3 |
|---|---|---|
| 1 | sum(1,2,3) | group_concat_state(a,b,c) |
| 2 | sum(4) | group_concat_state(d) |
| 3 | sum(1,2,3,4) | group_concat_state(a,b,c,d) |

Querying the table now returns the following results:

mysql> select sum_merge(k2), group_concat_merge(k3) from aggstate;
+---------------+------------------------+
| sum_merge(k2) | group_concat_merge(k3) |
+---------------+------------------------+
|            20 | c,b,a,d,c,b,a,d        |
+---------------+------------------------+

mysql> select sum_merge(k2), group_concat_merge(k3) from aggstate where k1 != 2;
+---------------+------------------------+
| sum_merge(k2) | group_concat_merge(k3) |
+---------------+------------------------+
|            16 | c,b,a,d,c,b,a          |
+---------------+------------------------+
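
The two totals can be verified with simple arithmetic. The Python snippet below lists the input values behind each row's k2 state, as shown in the illustrative table above, and sums them with and without the row where k1 is 2; the `k2` dictionary is just a stand-in for the table contents.

```python
# Input values behind each row's k2 state, keyed by k1.
k2 = {1: [1, 2, 3], 2: [4], 3: [1, 2, 3, 4]}

# Equivalent of sum_merge(k2) over all rows: 6 + 4 + 10.
total_all = sum(v for values in k2.values() for v in values)

# Equivalent of sum_merge(k2) with "where k1 != 2": 6 + 10.
total_without_2 = sum(v for k, values in k2.items() if k != 2 for v in values)

print(total_all, total_without_2)
```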

With agg_state, users can work with aggregation functions at a finer granularity, storing, combining, and finalizing intermediate results as separate steps.

Tip: agg_state comes with a certain performance overhead.