基于 ClickHouse 的数据统计平台设计简述
目录:
近年来大数据统计需求越来越多,基于此业务的数据库如雨后春笋般闪现,ClickHouse 就是此方面的重要明星,那么该如何设计一套符合业务的大数据统计系统?
本文以常见的站点用户访问统计系统为例,下文统一简写为统计系统。原始数据存储到 MongoDB
中,统计分析数据库使用 ClickHouse
,数据流转中间件为 Kafka
。
1. 为何选择 ClickHouse
统计系统需求:
- 针对大量字段的大表进行统计
- 部分字段类型为数组,因此最好支持数组,这样不需要拆分原有数据
- 针对时间等字段进行排序分组等,因此需要支持
groupBy
,orderBy
等聚合操作 - 需要统计结果具有实时性
- 不需要严格的事务支持
- 很少需要更新操作
- 易于开发
针对以上需求,考察当前流行的 OLAP 系统后发现 ClickHouse 完美契合当前需求。ClickHouse 是一个用于联机分析(OLAP)的列式数据库管理系统(DBMS)。它支持高速读取和写入大数据量,并且可以通过水平扩展轻松地扩展到多个节点上。ClickHouse 适用于各种类型的数据分析,包括实时和流数据分析,数据仓库等。OLAP场景的关键特征:
- 绝大多数是读请求
- 数据以相当大的批次插入
- 对于读取,从数据库中提取相当多的行,但只提取列的一小部分。
- 宽表,即每个表包含着大量的列
- 查询相对较少(通常每台服务器每秒查询数百次或更少)
- 对于简单查询,允许适当延迟
- 列中的数据长度相对较小
- 处理单个查询时需要高吞吐量(每台服务器每秒可达数十亿行)
- 事务不是必须的
- 对数据一致性要求低
- 每个查询有一个大表。除了他以外,其他的都很小。
- 查询结果明显小于源数据。换句话说,数据经过过滤或聚合,因此单个普通服务器也可以支持
很容易可以看出,OLAP 场景与其他通常业务场景(例如, OLTP 或 K/V )有很大的不同, 因此想要使用 OLTP 或 Key-Value 数据库去高效的处理分析查询场景,并不是非常完美的适用方案。例如,使用 OLAP 数据库去处理分析请求通常要优于使用 MongoDB 或 Redis 去处理分析请求。ClickHouse 的各项特点更为符合统计系统统计需求,因此采用 ClickHouse 数据库作为承载统计需求的底层数据库。
用户访问统计系统的业务需要实时或全量的方式把数导入到 ClickHouse 中。
2. 平台设计
2.1 技术栈
由于官方只提供了基本的 JDBC 驱动,目前还没有对应的 Spring Data 模块,为操作方便使用 Spring JdbcTemplate 获取 Connection/Statement
,减少直接操作 JDBC
。
统计指标查询使用 Querydsl,虽然 ClickHouse 提供了 JDBC 驱动,但是由于它独有的一些数据结构导致不兼容 ORM 框架,无论是 JPA 还是 Myatis,要想实现大量的查询需要编写大量的 SQL 语句,这样导致开发困难,而且bug率很增加,为了避免这些情况,决定使用 Querydsl。
Querydsl 是一个通用的查询框架,专注于通过 Java Api 构建类型安全的SQL查询。它可以通过一组通用的查询 Api 为用户构建出适合不同类型 ORM 框架或者是SQL 的查询语句,也就是说 Querydsl 是基于各种 ORM 框架以及 SQL 之上的一个通用的查询框架。借助 Querydsl 可以在任何支持的 ORM 框架或者 SQL 平台上以一种通用的 API 方式来构建查询。如果通过 Java Api 方式开发,可把大量查询操作封装,统一注入条件和参数等,这样即提高了开发效率,降低了 bug 率,而且提供了更好的封装性,不再受直接写 SQL 语句开发困难影响。
在本例中通过扩展 querydsl-sql
,针对数组,时间等数据类型进行自定义,支持 arrayJoin
、has
等函数,最大程度上提供开箱即用的 ClickHouse Java Api 开发。
查询用户访问页面 TOP 10 示例:
query
.select(QUserVisit.userVisit.path.countDistinct().as(COUNT))
.orderBy(COUNT_EXPRESSION.desc())
.limit(10)
.fetch();
2.2 业务实现
2.2.1 入库
统计系统数据处理流程:当收集到统计数据后发送数据到数据清洗消息队列(Kafka),数据经过清洗,标记等一系列流程后发送到数据归档消息队列,由入库程序负责把消息写入数据库(ClickHouse)。
分析上述流程,统计数据可以分为两类:
- Kafka 实时数据
- MongoDB 历史数据
这两种数据虽然来源不同,但是数据结构基本一致,都可以抽象为相同的数据结构,因此可以采用相同入库方式。数据层通过抽象,封装写操作保证接口一致,内部实现批量写入,这样不仅易于开发而且提高写入效率。
实时数据: 实时数据来源为 Kafka 消息队列,当客户端获取到消息后发送给 DataWriter
, DataWriter
负责把消息数据转换为 JSON 格式并通过 ClickHouseFormat.JSONEachRow
方式入库。插入数据时需要判断数据是否存在数据库中,如果存在则把该数据加入删除队列(如果使用 ReplacingMergeTree 者不需要删除操作)。
提高入库效率措施:
- 生产者批量发送和启用消息压缩以增加单批次获取的消息量
- 采用布隆过滤器来实现判断数据是否在数据库中,这样不仅内存消耗小,而且效率高
- 后台任务定时从删除队列获取数据并合并为单条 SQL 语句以执行删除操作
数据更新: 由于 ClickHouse 并不支持常规意义上的数据更新操作,而后通过后台线程来刷新合并旧数据,因此为了保证查询准确性,插入数据前先判断是否存在旧数据,如存在则删除旧数据然后插入新数据(不推荐删除操作,可使用 ReplacingMergeTree 等表引擎实现)。
插入数据前使用用户唯一标识和 “dataTime 小于当前时间(时间精确度至少为微秒)” 作为删除数据条件,插入该条件到删除任务队列。
注意: Java 提供的 SimpleDateFormat
日期格式化器只能序列化到毫秒,DateTimeFormatter
可以精确到纳秒。推荐使用 LocalDateTime
, JDK 版本高于8时精确到微秒。
全量数据: 导入全量数据时,清空数据库和布隆过滤器数据,清空删除任务队列,接下来的流程与实时保持一致。如果第一次做全量导入则全量数据的布隆过滤器创建成功,要是不做则说明是全新的系统则布隆过滤器从头开始,以上两种情况都下的布隆过滤器都存储完整数据,确保极低的数据重复率。
从数据库导入数据流程:
- 重置数据库
- 导入数据,具体实现可通过
Mongotemplat#stream
方式,合并多条数据为一个批次,其它流程和 Kafka 一致,注意记录相关日志信息以及导入进度
全量导入时,也可采用临时表方式,全量数据导入到临时表,实时数据写入切换到临时表,旧表不变,等导入完成后,旧表改名为临时表,新表改为原表名,删除临时表,注意重命名时不能做修改数据等操作。
写数据时以 JSON 格式写入,自定义 NestedList
数据类型以适配 Nested
类型,通过 NestedListSerializer
把 NestedList
数据转换为 ClickHouse 可识别的 JSON 结构,通过以下方式把序列化器注入到 Jackson 中。`
设计表时的注意点:
- 字段命名调整为小驼峰风格,便于转为 JSON 批量写入
- 每个表添加一个
dataTime
字段,表示数据插入时间,精确到微秒, 数据类型为datetime64
- 数组结构使用
Array
数据类型 - 数组內部元素为对象时使用
Nested
类型(特殊的数组)
数据入库使用DataWriter
, 该接口定义数据写入规范,DataRecord
封装数据和时间戳,批次号等数据,当抛出异常时,交由 ErrorReporter
处理,默认提供日志打印功能,用户可自定义该操作,无论是重试还是发送到其它消息队列。
DataWriter
定义:
public interface DataWriter<T> {
void write(DataRecord<T> record);
interface ErrorReporter {
Future<Void> report(DataRecord<?> record, Throwable error);
}
}
2.2.2 查询
当前统计有大量的统计指标,为了简化开发难度,增强扩展性,需要抽象统计维度,定义统计维度接口,每个维度实现类只需实现查询。数据处理层负责汇总各个维度数据,注入查询参数等。调用方查询时只需传入维度键和参数即可,如果想要扩展统计维度,只需实现查询业务即可,不需关心业务之外的编码,这样极大的降低了开发难度,提高了开发效率。
DataReader
接口定义数据写入规范,DataReaderKey
定义统计维度键,DataReaderKeys
定义同时查询多维度时的键信息,ItemDataReader
接口定义具体统计维度实现,应用需实现该类来执行具体的查询操作。QueryBuilder
提供通用查询条件构建,可以把查询条件注入到查询中,统计维度实现类不需要自己实现该条件。
相关接口定义:
public interface DataReader {
/**
* 读根据指标取结果
*/
<T> T read(DataReaderKey key, Object... condition);
/**
* 统计指标定义
*/
interface DataReaderKey {
String key();
}
/**
* 具体的指标接口定义,内部使用
*/
interface ItemDataReader<R> {
Map<String, R> read(DataReaderKey key, Object... condition);
DataReaderKey itemKey();
}
interface QueryBuilder<T> {
/**
* 构建查询条件
*/
T build(T query, CountMode countMode);
}
}
3. 参考资料
4. 常见问题
4.1 ClickHouse 安装配置
ClickHouse 安装参考文档:
安装相关命令:
deb
sudo apt-get install apt-transport-https ca-certificates dirmngr
sudo apt-key adv --keyserver hkp://keyserver.ubuntu.com:80 --recv E0C56BD4
echo "deb https://repo.clickhouse.tech/deb/stable/ main/" | sudo tee \
/etc/apt/sources.list.d/clickhouse.list
sudo apt-get update
sudo apt-get install -y clickhouse-server clickhouse-client
sudo service clickhouse-server start
clickhouse-client
rpm
sudo yum install yum-utils
sudo rpm --import https://repo.clickhouse.tech/CLICKHOUSE-KEY.GPG
sudo yum-config-manager --add-repo https://repo.clickhouse.tech/rpm/stable/x86_64
sudo yum install clickhouse-server clickhouse-client
# Exception: Effective user of the process (root) does not match the owner of the data (clickhouse). Run under 'sudo -u clickhouse'
# 注意权限问题
sudo -u clickhouse clickhouse-server --config-file=/etc/clickhouse-server/config.xml
# 用户管理参考 https://clickhouse.com/docs/en/operations/settings/settings-users/
# 生成密码
PASSWORD=$(base64 < /dev/urandom | head -c8); echo "$PASSWORD"; echo -n "$PASSWORD" | sha256sum | tr -d '-'
tar
# 21.8.3.44
export LATEST_VERSION=`curl https://api.github.com/repos/ClickHouse/ClickHouse/tags 2>/dev/null | grep -Eo '[0-9]+\.[0-9]+\.[0-9]+\.[0-9]+' | head -n 1`
curl -O https://repo.clickhouse.tech/tgz/stable/clickhouse-common-static-$LATEST_VERSION.tgz
curl -O https://repo.clickhouse.tech/tgz/stable/clickhouse-common-static-dbg-$LATEST_VERSION.tgz
curl -O https://repo.clickhouse.tech/tgz/stable/clickhouse-server-$LATEST_VERSION.tgz
curl -O https://repo.clickhouse.tech/tgz/stable/clickhouse-client-$LATEST_VERSION.tgz
export LATEST_VERSION=21.8.3.44
tar -xzvf clickhouse-common-static-$LATEST_VERSION.tgz
sudo clickhouse-common-static-$LATEST_VERSION/install/doinst.sh
tar -xzvf clickhouse-common-static-dbg-$LATEST_VERSION.tgz
sudo clickhouse-common-static-dbg-$LATEST_VERSION/install/doinst.sh
tar -xzvf clickhouse-server-$LATEST_VERSION.tgz
sudo clickhouse-server-$LATEST_VERSION/install/doinst.sh
sudo /etc/init.d/clickhouse-server start
tar -xzvf clickhouse-client-$LATEST_VERSION.tgz
sudo clickhouse-client-$LATEST_VERSION/install/doinst.sh
4.2 数据库的数据合并操作比插入慢
参考 DB::Exception: Too many parts (600). Merges are processing significantly slower than inserts, 控制单次插入数量为 10k-500k, 1-2s 插入一次,尽量合并小数据为大数据插入减少插入频率。
4.3 部分数据遗漏
MongoTemplate#stream
遍历的条件为插入时间,时间戳可能会重复,当排序字段的值大量相同且判断条件为大于时,可能导致有些数据被忽略,导致数据缺少,可通过增加一个自增批次号字段解决。
4.4 常用配置
编辑配置文件: vim /etc/clickhouse-server/config.xml
(或 /etc/clickhouse-server/config.d/listen.xml
)
允许非本机访问
<!-- Same for hosts without support for IPv6: -->
<listen_host>0.0.0.0</listen_host>
配置时区
<timezone>Asia/Shanghai</timezone>
4.5 数据初始化脚本
根据 Sql 文件初始化数据库(为密码安全 $2 可不写),推荐使用 Flyway 等数据库自动升级工具。
#!/usr/bin/env bash
# 把单表的多行建表语句处理为一行并执行
sql_file=$1
passwd=$2
echo "start write $sql_file sql file"
cat $sql_file | xargs | tr '\n' ' ' |
xargs | sed 's/;/;\n/g' | while read line; do
if [ "$line" != '' ]; then
clickhouse-client -q "$line" --password $passwd
fi
done