文档下载建议反馈入口

  • 前提条件
  • 配置数据库
  • 配置连接
  • 配置示例

使用 KaiwuDB JDBC 扩展接口优化批量数据写入

KaiwuDB JDBC 是 KaiwuDB 的官方 Java 语言连接器,基于 PgJDBC 扩展实现,符合 JDBC 4.0、JDBC 4.1 和 JDBC 4.2 规范。Java 开发人员可以使用 KaiwuDB JDBC 驱动程序连接 KaiwuDB 的服务进程,进行数据增删改查操作。

KaiwuDB JDBC 提供了传统的批量执行 SQL 接口,用户可以通过手动拼接 SQL 实现批量数据写入,同时提供了 addBatchInsertexecuteBatchInsertclearBatchInsert 接口,能够将同一张时序表的多次数据写入合并到一条 SQL 语句,降低 CPU 占用,提升写入性能。

使用批量接口写入数据时,如果待写入的值与列的数据类型不符或者待写入的字段不存在,KaiwuDB JDBC 会返回成功写入条数、写入失败条数,并将具体错误信息记录到日志中。本文提供了使用 KaiwuDB JDBC 批量接口写入数据的最佳实践。

说明

目前,批量写入功能只适用于 KaiwuDB 单机版本。

有关 KaiwuDB JDBC 的数据库连接方式、基础使用、数据类型和异常处理,参见使用 JDBC 连接 KaiwuDB 数据库;更多错误码信息,参见 KaiwuDB JDBC Driver 错误码;更多故障排查信息,参见 KaiwuDB JDBC 故障排查

前提条件

配置数据库

默认情况下,数据库开启时序写入短接功能。开启后,系统直接将数据写入存储,减少中间处理环节。此外,用户可以设置允许写入时跳过错误数据,正常写入其他数据,从而提高写入性能。

SET SESSION ts_ignore_batcherror=true;

配置连接

  1. pom.xml 中添加依赖,将 KaiwuDB JDBC 引入 Java 项目:

    <dependency>
      <groupId>com.kaiwudb</groupId>
      <artifactId>kaiwudb-jdbc</artifactId>
      <version>2.2.0</version>
    </dependency>
    
  2. 如果 KaiwuDB JDBC 无法正常加载使用,执行以下命令,将驱动安装到本地 Maven 仓库中:

    mvn install:install-file "-Dfile=../kaiwudb-jdbc-2.2.0.jar" "-DgroupId=com.kaiwudb" "-DartifactId=kaiwudb-jdbc" "-Dversion=2.2.0" "-Dpackaging=jar"
    

配置示例

以下示例演示了如何使用 KaiwuDB JDBC 将不同设备的数据批量写入到时序表。

  1. 创建时序表。

    以下示例创建 tbl_raw_1tbl_raw_10 多个时序表用于批量插入不同设备的数据。每个表的数据结构相同。

    CREATE TABLE tsdb.tbl_raw_1 (ts TIMESTAMPTZ NOT NULL, data FLOAT8 NULL, type CHAR(10) NULL, parse VARCHAR NULL) TAGS (device CHAR(10) NOT NULL, iot_hub_name VARCHAR(64) NOT NULL) PRIMARY TAGS (device, iot_hub_name);
    
    CREATE TABLE tsdb.tbl_raw_2 (ts TIMESTAMPTZ NOT NULL, data FLOAT8 NULL, type CHAR(10) NULL, parse VARCHAR NULL) TAGS (device CHAR(10) NOT NULL, iot_hub_name VARCHAR(64) NOT NULL) PRIMARY TAGS (device, iot_hub_name);
    
    CREATE TABLE tsdb.tbl_raw_3 (ts TIMESTAMPTZ NOT NULL, data FLOAT8 NULL, type CHAR(10) NULL, parse VARCHAR NULL) TAGS (device CHAR(10) NOT NULL, iot_hub_name VARCHAR(64) NOT NULL) PRIMARY TAGS (device, iot_hub_name);
    
    CREATE TABLE tsdb.tbl_raw_4 (ts TIMESTAMPTZ NOT NULL, data FLOAT8 NULL, type CHAR(10) NULL, parse VARCHAR NULL) TAGS (device CHAR(10) NOT NULL, iot_hub_name VARCHAR(64) NOT NULL) PRIMARY TAGS (device, iot_hub_name);
    
    CREATE TABLE tsdb.tbl_raw_5 (ts TIMESTAMPTZ NOT NULL, data FLOAT8 NULL, type CHAR(10) NULL, parse VARCHAR NULL) TAGS (device CHAR(10) NOT NULL, iot_hub_name VARCHAR(64) NOT NULL) PRIMARY TAGS (device, iot_hub_name);
    
    CREATE TABLE tsdb.tbl_raw_6 (ts TIMESTAMPTZ NOT NULL, data FLOAT8 NULL, type CHAR(10) NULL, parse VARCHAR NULL) TAGS (device CHAR(10) NOT NULL, iot_hub_name VARCHAR(64) NOT NULL) PRIMARY TAGS (device, iot_hub_name);
    
    CREATE TABLE tsdb.tbl_raw_7 (ts TIMESTAMPTZ NOT NULL, data FLOAT8 NULL, type CHAR(10) NULL, parse VARCHAR NULL) TAGS (device CHAR(10) NOT NULL, iot_hub_name VARCHAR(64) NOT NULL) PRIMARY TAGS (device, iot_hub_name);
    
    CREATE TABLE tsdb.tbl_raw_8 (ts TIMESTAMPTZ NOT NULL, data FLOAT8 NULL, type CHAR(10) NULL, parse VARCHAR NULL) TAGS (device CHAR(10) NOT NULL, iot_hub_name VARCHAR(64) NOT NULL) PRIMARY TAGS (device, iot_hub_name);
    
    CREATE TABLE tsdb.tbl_raw_9 (ts TIMESTAMPTZ NOT NULL, data FLOAT8 NULL, type CHAR(10) NULL, parse VARCHAR NULL) TAGS (device CHAR(10) NOT NULL, iot_hub_name VARCHAR(64) NOT NULL) PRIMARY TAGS (device, iot_hub_name);
    
    CREATE TABLE tsdb.tbl_raw_10 (ts TIMESTAMPTZ NOT NULL, data FLOAT8 NULL, type CHAR(10) NULL, parse VARCHAR NULL) TAGS (device CHAR(10) NOT NULL, iot_hub_name VARCHAR(64) NOT NULL) PRIMARY TAGS (device, iot_hub_name);
    
  2. 向时序表中批量写入数据。

    public class BatchInsertTest {
    
    public static void main(String[] args) {
       String url = "jdbc:kaiwudb://127.0.0.1:26257/tsdb?preferQueryMode=simple";
       String user = "<user_name>";
       String password = "<password>";
    
       try (Connection connection = DriverManager.getConnection(url, user, password)) {
          KwStatement statement = (KwStatement) connection.createStatement();
          long timestamp = 1731373200000L; // 2024-11-12 09:00:00.000 初始时间戳
          for (int i = 0; i < 1000; i++) {
          // 循环1000次,每次写入1000行数据,共计100万行数据;每次循环插入20个设备,每个设备50行的数据
          for (int row = 1; row <= 50; row++) {
          int index = (row - 1) % 10 + 1;
             long finalTime = timestamp + (row * 1000L) + (i * 50 * 1000L);
             for (int num = 1; num <= 20; num++) {
                String device = "device_" + num;
                String iot = "iot_" + num;
                statement.addBatchInsert(finalTime, ("tbl_raw_" + index),
                new LinkedHashMap<String, Object>() {{
                   put("ts", finalTime);
                   put("data", ThreadLocalRandom.current().nextDouble());
                   put("type", "t_001");
                   put("parse", UUID.randomUUID() + "'123");
                }},
                new LinkedHashMap<String, Object>() {{
                   put("device", device);
                   put("iot_hub_name", iot);
                }});
             }
          }
    
          // execute batch insert sql data
          statement.executeBatchInsert();
    
          // clear batch insert temp data
          statement.clearBatchInsert();
          }
          // close statement
          statement.colse();
       } catch (SQLException ex) {
          ex.printStackTrace();
       }
    }
    }