文档下载建议反馈入口

  • 创建数据推送管道
  • 查看数据推送管道
  • 修改数据推送管道
  • 删除数据推送管道

数据推送管道

创建数据推送管道

CREATE PIPE 语句用于创建数据推送管道。

所需权限

  • 非三权分立模式下,用户拥有目标表的 SELECT 权限。
  • 三权分立模式下,普通用户拥有目标表的 SELECT 权限。

语法格式

参数说明

参数说明
nameKaiwuDB 数据推送管道的名称。该名称系统内必须唯一。
ts_table_name时序表名称。用户可以使用 ALTER PIPE <name> SET TABLE 语句修改待监听的时序表。
ts_column待监听的时序表列。支持监听目标时序表的全部列或部分列。
- *:按照目标时序表结构监听全部列。
- 创建数据推送管道后,用户无法从目标时序表中删除已经选择推送的列或改变其类型,否则数据推送服务会报错退出。
- 用户可以添加新列,但新列不会出现在推送数据的列表中。
ts_column_condition可选参数,时序数据过滤条件。用户可以使用 ALTER PIPE <name> SET TABLE 语句修改时序数据的过滤条件。
- 支持 Primary Tag 单列的等于(=)运算,以及最多 4 列的 AND 和 OR 运算。
- 支持数据列的 <<=>=!== 运算,支持 IS NULLIS NOT NULL 取值。
- 支持 Primary Tag 列及数据列之间的 AND 和 OR 运算。
- 支持基于时序表时间戳列的条件过滤。
- 不支持普通 Tag 列的过滤条件。
- 不支持聚合函数、类型转换、排序规则、系统列引用或非不可变内置函数。
- 不支持使用 BETWEEN ... AND 进行范围过滤。
push_parameter数据推送参数。用户可以使用 ALTER PIPE <name> SET OPTIONS 语句修改数据推送管道的推送参数。支持以下参数:
- enable:是否启用实时数据推送服务。支持 onoff 两个取值。on 表示启用实时数据推送服务。off 表示停止实时数据推送。默认情况下,启用实时数据推送服务。
- sink:数据推送的目标。目前,KaiwuDB 仅支持推送数据到 Kafka 主题。如果用户未指定合法的推送目标,系统报错。
- message_format:数据推送的数据格式。目前,KaiwuDB 只支持推送 JSON 格式的数据。KaiwuDB 支持推送一条或多条记录,其中正常数据推送的消息类型为 insert,重启补发数据推送的消息类型为 snapshot
value数据推送参数的取值。

语法示例

以下示例创建一个名为 ts_table_a_pipe_a 数据推送管道,按照指定条件筛选 ts_table_a 时序表的数据并将其实时推送到 test_topic Kafka 主题中。

CREATE PIPE ts_table_a_pipe_a FOR TABLE ts_table_a(*) WHERE hostname='www.example1.com' AND max_usage_user > 60 WITH OPTIONS( enable = 'on', sink = 'kafka://localhost:9092?topic_name=test_topic');

查看数据推送管道

SHOW PIPESSHOW PIPE 语句用于查看所有或指定数据推送管道的信息,包括数据推送管道名称、目标时序表、创建参数、创建时间、创建者、当前状态、开始时间、结束时间、失败原因等重要信息。

所需权限

语法格式

参数说明

参数说明
nameKaiwuDB 数据推送管道的名称。

返回字段说明

字段说明
name数据推送管道的名称。该名称系统内必须唯一。
table_name数据推送服务的目标时序表的名称。
column_names数据推送服务的目标列。
filter数据推送服务的过滤条件。
sink数据推送服务的推送目标。
low_watermark数据推送服务的复制标识。
status是否启用实时数据推送服务。
create_at数据推送管道的创建时间。
start_time最后一次开始推送数据的时间。如未开始,则为空。
create_by数据推送管道的创建者。
end_time最后一次停止推送数据的时间。默认为空。
error_message数据推送失败的原因。

语法示例

以下示例查看 ts_table_a_pipe_a 数据推送管道的详细信息。

SHOW PIPE ts_table_a_pipe_a;

执行成功后,控制台输出以下信息:

     name    |        table_name        |     column_names     |  filter   |                       sink                        |    low_watermark     | status |            create_at             | create_by |        start_time         | end_time | error_message
-------------+--------------------------+----------------------+-----------+---------------------------------------------------+----------------------+--------+----------------------------------+-----------+---------------------------+----------+----------------
  ts_table_a_pipe_a | test_data_pipe.public.ts_table_a | k_timestamp,id,code1 | code1 = 3 | kafka://localhost:9092?topic_name=test_topic2 | 1717632000000 | Enable | 2025-02-17 01:41:42.135389+00:00 | root      | 2025-02-17 01:41:42+00:00 | NULL     |
(1 row)

修改数据推送管道

ALTER PIPE 语句用于修改数据推送管道,包括 ALTER PIPE SET TABLEALTER PIPE SET OPTIONS 两个子命令:

  • ALTER PIPE SET TABLE:用于修改数据推送管道的目标时序表、输出列、过滤条件。
  • ALTER PIPE SET OPTIONS:用于修改数据推送管道的推送参数和推送目标,以及开始和停止数据推送服务。

说明

ALTER PIPE SET TABLEALTER PIPE SET OPTIONS 子命令需要单独使用。KaiwuDB 不支持同时修改数据推送管道的目标时序表和推送参数。

所需权限

  • 非三权分立模式下,用户拥有目标表的 SELECT 权限。
  • 三权分立模式下,普通用户拥有目标表的 SELECT 权限。

语法格式

  • 修改数据推送管道的目标时序表、输出列或时序数据的过滤条件

  • 修改数据推送管道的推送参数

参数说明

参数说明
nameKaiwuDB 数据推送管道的名称。
ts_table_name时序表名称。
ts_column待监听的时序表列。支持监听目标时序表的全部列或部分列。
- *:按照目标时序表结构监听全部列。
- 创建数据推送管道后,用户无法从目标时序表中删除已经选择推送的列或改变其类型,否则数据推送服务会报错退出。
- 用户可以添加新列,但新列不会出现在推送数据的列表中。
ts_column_condition可选参数,时序数据过滤条件。
- 支持 Primary Tag 单列的等于(=)运算,以及最多 4 列的 AND 和 OR 运算。
- 支持数据列的 <<=>=!== 运算,支持 IS NULLIS NOT NULL 取值。
- 支持 Primary Tag 列及数据列之间的 AND 和 OR 运算。
- 支持基于时序表时间戳列的条件过滤。
- 不支持普通 Tag 列的过滤条件。
- 不支持聚合函数、类型转换、排序规则、系统列引用或非不可变内置函数。
- 不支持使用 BETWEEN ... AND 进行范围过滤。
push_parameter数据推送参数。支持以下参数:
- enable:是否启用实时数据推送服务。支持 onoff 两个取值。on 表示启用实时数据推送服务。off 表示停止实时数据推送。默认情况下,启用实时数据推送服务。
- sink:数据推送的目标。目前,KaiwuDB 仅支持推送数据到 Kafka 主题。如果用户未指定合法的推送目标,系统报错。
- message_format:数据推送的数据格式。目前,KaiwuDB 只支持推送 JSON 格式的数据。KaiwuDB 支持推送一条或多条记录,其中正常数据推送的消息类型为 insert,重启补发数据推送的消息类型为 snapshot
说明
- 目前,KaiwuDB 只支持在线修改 enable 参数。如需修改其它参数,用户需要先停止实时数据推送服务,然后再修改相关参数。
- 启动处于停止状态的数据推送服务时,需要根据复制标识确定“未复制”数据的范围。如果“未复制”数据的数据量未超过内置阈值,则系统推送这部分数据。如果“未复制”数据超过内置阈值,系统启动数据推送服务失败,并提示用户。用户可以通过修改过滤条件跳过部分数据的实时推送,并使用数据导入、导出等方式复制这部分数据。
value数据推送参数的取值。

语法示例

  • 修改数据推送管道的目标时序表

    以下示例将修改数据推送管道的过滤条件,将 ts_table_a 时序表的 max_usage_user 参数的取值设置为 > 50

    ALTER PIPE ts_table_a_pipe_a SET TABLE ts_table_a(*) WHERE hostname='www.example1.com' AND max_usage_user > 50;
    
  • 修改数据推送管道的推送参数

    以下示例修改数据推送管道的推送参数,停止 ts_table_a_pipe 数据推送管道实时数据推送。

    -- 停止 PIPE
    ALTER PIPE ts_table_a_pipe_a SET OPTIONS (enable = 'off');
    

    用户也可以直接使用 ALTER PIPE <name> SET <push_parameter> = '<value>' 的格式修改数据推送管道的推送参数。

    -- 停止 PIPE
    ALTER PIPE ts_table_a_pipe_a SET enable = 'off';
    

删除数据推送管道

DROP PIPE 语句用于删除指定的数据推送管道。若系统正在推送数据数据,当前推送正常完成后才能删除指定的数据推送管道。

所需权限

  • 非三权分立模式下,用户拥有目标表的 SELECT 权限。
  • 三权分立模式下,普通用户拥有目标表的 SELECT 权限。

语法格式

参数说明

参数说明
IF EXISTS可选关键字。当使用 IF EXISTS 关键字时,如果目标数据推送管道存在,系统删除目标数据推送管道。如果目标数据推送管道不存在,系统删除目标数据推送管道失败,但不会报错。当未使用 IF EXISTS 关键字时,如果目标数据推送管道存在,系统删除目标数据推送管道。如果目标数据推送管道不存在,系统报错,提示目标数据推送管道不存在。
nameKaiwuDB 数据推送管道的名称。

语法示例

以下示例删除 ts_table_a_pipe_a 数据推送管道。

DROP PIPE ts_table_a_pipe_a;