数据推送管道
创建数据推送管道
CREATE PIPE
语句用于创建数据推送管道。
所需权限
- 非三权分立模式下,用户拥有目标表的
SELECT
权限。 - 三权分立模式下,普通用户拥有目标表的
SELECT
权限。
语法格式
参数说明
参数 | 说明 |
---|---|
name | KaiwuDB 数据推送管道的名称。该名称系统内必须唯一。 |
ts_table_name | 时序表名称。用户可以使用 ALTER PIPE <name> SET TABLE 语句修改待监听的时序表。 |
ts_column | 待监听的时序表列。支持监听目标时序表的全部列或部分列。 - *:按照目标时序表结构监听全部列。 - 创建数据推送管道后,用户无法从目标时序表中删除已经选择推送的列或改变其类型,否则数据推送服务会报错退出。 - 用户可以添加新列,但新列不会出现在推送数据的列表中。 |
ts_column_condition | 可选参数,时序数据过滤条件。用户可以使用 ALTER PIPE <name> SET TABLE 语句修改时序数据的过滤条件。- 支持 Primary Tag 单列的等于( = )运算,以及最多 4 列的 AND 和 OR 运算。- 支持数据列的 < 、<= 、> 、>= 、!= 和 = 运算,支持 IS NULL 和 IS NOT NULL 取值。- 支持 Primary Tag 列及数据列之间的 AND 和 OR 运算。 - 支持基于时序表时间戳列的条件过滤。 - 不支持普通 Tag 列的过滤条件。 - 不支持聚合函数、类型转换、排序规则、系统列引用或非不可变内置函数。 - 不支持使用 BETWEEN ... AND 进行范围过滤。 |
push_parameter | 数据推送参数。用户可以使用 ALTER PIPE <name> SET OPTIONS 语句修改数据推送管道的推送参数。支持以下参数:- enable :是否启用实时数据推送服务。支持 on 和 off 两个取值。on 表示启用实时数据推送服务。off 表示停止实时数据推送。默认情况下,启用实时数据推送服务。- sink :数据推送的目标。目前,KaiwuDB 仅支持推送数据到 Kafka 主题。如果用户未指定合法的推送目标,系统报错。- message_format :数据推送的数据格式。目前,KaiwuDB 只支持推送 JSON 格式的数据。KaiwuDB 支持推送一条或多条记录,其中正常数据推送的消息类型为 insert ,重启补发数据推送的消息类型为 snapshot 。 |
value | 数据推送参数的取值。 |
语法示例
以下示例创建一个名为 ts_table_a_pipe_a
数据推送管道,按照指定条件筛选 ts_table_a
时序表的数据并将其实时推送到 test_topic
Kafka 主题中。
CREATE PIPE ts_table_a_pipe_a FOR TABLE ts_table_a(*) WHERE hostname='www.example1.com' AND max_usage_user > 60 WITH OPTIONS( enable = 'on', sink = 'kafka://localhost:9092?topic_name=test_topic');
查看数据推送管道
SHOW PIPES
或 SHOW PIPE
语句用于查看所有或指定数据推送管道的信息,包括数据推送管道名称、目标时序表、创建参数、创建时间、创建者、当前状态、开始时间、结束时间、失败原因等重要信息。
所需权限
无
语法格式
参数说明
参数 | 说明 |
---|---|
name | KaiwuDB 数据推送管道的名称。 |
返回字段说明
字段 | 说明 |
---|---|
name | 数据推送管道的名称。该名称系统内必须唯一。 |
table_name | 数据推送服务的目标时序表的名称。 |
column_names | 数据推送服务的目标列。 |
filter | 数据推送服务的过滤条件。 |
sink | 数据推送服务的推送目标。 |
low_watermark | 数据推送服务的复制标识。 |
status | 是否启用实时数据推送服务。 |
create_at | 数据推送管道的创建时间。 |
start_time | 最后一次开始推送数据的时间。如未开始,则为空。 |
create_by | 数据推送管道的创建者。 |
end_time | 最后一次停止推送数据的时间。默认为空。 |
error_message | 数据推送失败的原因。 |
语法示例
以下示例查看 ts_table_a_pipe_a
数据推送管道的详细信息。
SHOW PIPE ts_table_a_pipe_a;
执行成功后,控制台输出以下信息:
name | table_name | column_names | filter | sink | low_watermark | status | create_at | create_by | start_time | end_time | error_message
-------------+--------------------------+----------------------+-----------+---------------------------------------------------+----------------------+--------+----------------------------------+-----------+---------------------------+----------+----------------
ts_table_a_pipe_a | test_data_pipe.public.ts_table_a | k_timestamp,id,code1 | code1 = 3 | kafka://localhost:9092?topic_name=test_topic2 | 1717632000000 | Enable | 2025-02-17 01:41:42.135389+00:00 | root | 2025-02-17 01:41:42+00:00 | NULL |
(1 row)
修改数据推送管道
ALTER PIPE
语句用于修改数据推送管道,包括 ALTER PIPE SET TABLE
和 ALTER PIPE SET OPTIONS
两个子命令:
ALTER PIPE SET TABLE
:用于修改数据推送管道的目标时序表、输出列、过滤条件。ALTER PIPE SET OPTIONS
:用于修改数据推送管道的推送参数和推送目标,以及开始和停止数据推送服务。
说明
ALTER PIPE SET TABLE
和 ALTER PIPE SET OPTIONS
子命令需要单独使用。KaiwuDB 不支持同时修改数据推送管道的目标时序表和推送参数。
所需权限
- 非三权分立模式下,用户拥有目标表的
SELECT
权限。 - 三权分立模式下,普通用户拥有目标表的
SELECT
权限。
语法格式
修改数据推送管道的目标时序表、输出列或时序数据的过滤条件
修改数据推送管道的推送参数
参数说明
参数 | 说明 |
---|---|
name | KaiwuDB 数据推送管道的名称。 |
ts_table_name | 时序表名称。 |
ts_column | 待监听的时序表列。支持监听目标时序表的全部列或部分列。 - *:按照目标时序表结构监听全部列。 - 创建数据推送管道后,用户无法从目标时序表中删除已经选择推送的列或改变其类型,否则数据推送服务会报错退出。 - 用户可以添加新列,但新列不会出现在推送数据的列表中。 |
ts_column_condition | 可选参数,时序数据过滤条件。 - 支持 Primary Tag 单列的等于( = )运算,以及最多 4 列的 AND 和 OR 运算。- 支持数据列的 < 、<= 、> 、>= 、!= 和 = 运算,支持 IS NULL 和 IS NOT NULL 取值。- 支持 Primary Tag 列及数据列之间的 AND 和 OR 运算。 - 支持基于时序表时间戳列的条件过滤。 - 不支持普通 Tag 列的过滤条件。 - 不支持聚合函数、类型转换、排序规则、系统列引用或非不可变内置函数。 - 不支持使用 BETWEEN ... AND 进行范围过滤。 |
push_parameter | 数据推送参数。支持以下参数: - enable :是否启用实时数据推送服务。支持 on 和 off 两个取值。on 表示启用实时数据推送服务。off 表示停止实时数据推送。默认情况下,启用实时数据推送服务。- sink :数据推送的目标。目前,KaiwuDB 仅支持推送数据到 Kafka 主题。如果用户未指定合法的推送目标,系统报错。- message_format :数据推送的数据格式。目前,KaiwuDB 只支持推送 JSON 格式的数据。KaiwuDB 支持推送一条或多条记录,其中正常数据推送的消息类型为 insert ,重启补发数据推送的消息类型为 snapshot 。 说明 - 目前,KaiwuDB 只支持在线修改 enable 参数。如需修改其它参数,用户需要先停止实时数据推送服务,然后再修改相关参数。- 启动处于停止状态的数据推送服务时,需要根据复制标识确定“未复制”数据的范围。如果“未复制”数据的数据量未超过内置阈值,则系统推送这部分数据。如果“未复制”数据超过内置阈值,系统启动数据推送服务失败,并提示用户。用户可以通过修改过滤条件跳过部分数据的实时推送,并使用数据导入、导出等方式复制这部分数据。 |
value | 数据推送参数的取值。 |
语法示例
修改数据推送管道的目标时序表
以下示例将修改数据推送管道的过滤条件,将
ts_table_a
时序表的max_usage_user
参数的取值设置为> 50
。ALTER PIPE ts_table_a_pipe_a SET TABLE ts_table_a(*) WHERE hostname='www.example1.com' AND max_usage_user > 50;
修改数据推送管道的推送参数
以下示例修改数据推送管道的推送参数,停止
ts_table_a_pipe
数据推送管道实时数据推送。-- 停止 PIPE ALTER PIPE ts_table_a_pipe_a SET OPTIONS (enable = 'off');
用户也可以直接使用
ALTER PIPE <name> SET <push_parameter> = '<value>'
的格式修改数据推送管道的推送参数。-- 停止 PIPE ALTER PIPE ts_table_a_pipe_a SET enable = 'off';
删除数据推送管道
DROP PIPE
语句用于删除指定的数据推送管道。若系统正在推送数据数据,当前推送正常完成后才能删除指定的数据推送管道。
所需权限
- 非三权分立模式下,用户拥有目标表的
SELECT
权限。 - 三权分立模式下,普通用户拥有目标表的
SELECT
权限。
语法格式
参数说明
参数 | 说明 |
---|---|
IF EXISTS | 可选关键字。当使用 IF EXISTS 关键字时,如果目标数据推送管道存在,系统删除目标数据推送管道。如果目标数据推送管道不存在,系统删除目标数据推送管道失败,但不会报错。当未使用 IF EXISTS 关键字时,如果目标数据推送管道存在,系统删除目标数据推送管道。如果目标数据推送管道不存在,系统报错,提示目标数据推送管道不存在。 |
name | KaiwuDB 数据推送管道的名称。 |
语法示例
以下示例删除 ts_table_a_pipe_a
数据推送管道。
DROP PIPE ts_table_a_pipe_a;