实时数据推送
KaiwuDB 提供开箱即用的实时数据推送功能,旨在获取数据库中关键业务的数据变化信息,将这些信息包装为消息对象推送到 Kafka 主题(Kafka Topic)中,方便下游业务获取和消费。开启实时数据推送服务后,KaiwuDB 自动捕获用户通过 INSERT
语句插入的新数据,基于灵活过滤规则筛选数据,并将符合条件的数据以 JSON 格式推送至 Kafka 主题。
工作原理
KaiwuDB 支持使用 SQL 语句创建数据推送任务,将增量时序数据实时同步到 Kafka 主题中。用户使用 Kafka 客户端读取 Kafka 主题中的数据,然后写入到 Kafka 第三方消息组件或者目标数据库。
使用限制
- KaiwuDB 不支持推送时序表的历史数据。
- KaiwuDB 不支持推送多个时序表、时序库或关系表。
- KaiwuDB 不支持推送
DELETE
、UPDATE
、DDL
语句产生的数据。 - KaiwuDB 不支持普通 Tag 列、聚合函数、类型转换、排序规则、系统列引用或非不可变内置函数的过滤条件,不支持使用
BETWEEN ... AND
进行范围过滤。 - KaiwuDB 不支持实时数据推送时应用数据去重规则。
功能特性
数据推送对象
- KaiwuDB 支持推送单个时序表,其数据来源为用户通过
INSERT
语句插入的数据,不包含时序表中已存在的历史数据。 - 支持使用
WHERE
子句创建带有过滤条件的数据推送任务。 - 支持使用
WITH
关键字指定包括推送目标在内的数据推送参数。
- KaiwuDB 支持推送单个时序表,其数据来源为用户通过
数据过滤规则:支持对实时数据推送中的数据进行行级简单条件过滤,包括:
- Primary Tag 单列:支持等于(
=
)运算,以及最多 4 列的 AND 和 OR 运算。 - 数据列:支持小于(
<
)、小于等于(<=
)、大于(>
)、大于等于(>=
)、不等于(!=
) 和等于(=
)运算、以及IS NULL
和IS NOT NULL
。 - 时间戳列:支持基于时序表时间戳列的条件过滤。
- 支持 Primary Tag 列及数据列之间的 AND 和 OR 运算。
- Primary Tag 单列:支持等于(
数据推送配置
KaiwuDB 采用异步方式将过滤后的时序数据以 JSON 的格式作为消息推送到 Kafka 主题中。如果用户指定的推送目标不合法,则系统在尝试推送后以错误消息形式提示用户。Kafka 连接示例如下:
kafka://{kafka_cluster_address}:{port}?topic_name={topic_name}&tls_enabled=true&ca_cert={ca_cert}&sasl_enabled=true&sasl_user={sasl_user}&sasl_password={sasl_password}
kafka_cluster_address
:Kafka 集群的 IP 地址。port
:端口号,默认为9093
。topic_name
:数据推送的目标 Kafka 主题的名称。- Kafka 认证方式及其参数
tls_enabled
:若设置为true
,开启与 Kafka 的 TLS 连接。需与ca_cert
参数配合使用。ca_cert
: Base64 编码的 CA 证书,用于 Kafka 的 TLS 认证。说明:用户可以使用base64 -w 0 ca.cert
命令编码 CA 证书。client_cert
:Base64 编码的 PEM 证书,需与client_key
参数配合使用。client_key
:Base64 编码的 PEM 证书私钥,需与client_cert
参数配合使用。sasl_enabled
:若设置为true
,表示采用 SASL/PLAIN 认证协议。必须同时设置sasl_user
和sasl_password
参数。sasl_user
:SASL 用户名。sasl_password
:SASL 密码。
目前,KaiwuDB 仅支持推送一到多条 JSON 格式的数据。消息示例如下:
正常数据推送消息(消息类型为
insert
){ "change": [ { "kind": "insert", "database": "benchmark", "schema": "public", "table": "cpu", "columnnames": ["k_timestamp", "hostname", "max_usage_user" ], "columntypes": [ "integer", "text", "integer" ], "columnvalues": [ [ 173441827100000, "www.example1.com", 77], [ 173441827200000, "www.example1.com", 80], [ 173441827300000, "www.example1.com", 65] ] } ] }
重启补发数据消息(消息类型为
snapshot
){ "change": [ { "kind": "snapshot", "database": "benchmark", "schema": "public", "table": "cpu", "columnnames": ["k_timestamp", "hostname", "max_usage_user" ], "columntypes": [ "integer", "text", "integer" ], "columnvalues": [ [ 173441827100000, "www.example1.com", 77], [ 173441827200000, "www.example1.com", 80], [ 173441827300000, "www.example1.com", 65] ] } ] }
用户可以设置全局最大数据推送失败次数。当最大数据推送失败次数超过设置值后,停止向外推送数据。用户可以使用
SET CLUSTER SETTING ts.pipe.sink_max_retries
命令设置最大数据推送失败次数。目前,最大数据推送失败次数为 5。
断点续传
实时数据推送期间,KaiwuDB 使用时序表中已推送数据的最大数据时间(即时序表最低推送水位线)作为复制标识记录数据推送进度。如果用户停止数据推送服务,并在一段时间后重新启动服务,这期间可能有大量的数据入库。这些数据没有被推送到用户指定的目标端,称为“未推送数据”。这些数据和历史数据类似,不适合使用实时数据推送功能进行处理。在启动数据推送服务时,系统使用复制标识确认未推送数据的数量及范围。如果其数量没有超过内置阈值,则系统读取并推送这部分数据。如果“未推送数据“超过内置阈值,系统启动数据推送服务失败,并提示用户。用户可以通过修改过滤条件跳过部分数据的实时推送,并使用数据导入/导出等方式复制这部分数据。
说明
- 由于时序数据的数据时间和入库时间之间存在不确定且不一致的间隔,断点续传可能会导致重复推送部分数据。
- 仅支持集群环境的断点续传,不支持分布式集群高可用。
数据推送任务管理
- 支持启动和停止数据推送服务。
- 支持重新启动数据推送服务时,从中断位置续传可能存在的“未推送数据”。
- 支持查看目前正在运行的数据推送任务,包含任务名称、推送内容、任务的创建时间、创建者、任务当前状态、开始执行时间、结束时间、失败执行时间、失败原因。
- 支持删除指定的数据推送任务(若当前批次的推送正在执行,则完成本次推送后再停止或删除数据推送任务)。 有关数据推送任务管理的详细信息,参见数据推送管道。