文章链接:【KWDB 创作者计划】KWDB多模分布式数据库助力共享打印机物联网IoT最佳实践落地,实现高效存储与查询时序数据
作者:Sunny_媛
版本说明:KWDB是浪潮KaiwuDB的开源版本哈,目前KWDB已经捐赠给开放原子开源基金会,是基金会的孵化项目。
近年来,随着物联网(IoT)、人工智能(AI)、云计算、大数据等技术的快速发展,随着物联网设备的普及和智能设备的增加,产生了大量的时间序列数据,这些数据需要被实时存储和处理。时序数据库以其高效处理大量时间序列数据的能力,成为处理这些数据的理想选择。
那么,今天给大家带来的一款 - KWDB 作为一款面向 AIoT 场景的国产分布式数据库,凭借其高效的时序数据处理能力与多模融合特性,逐渐成为行业关注的焦点。本文将从Docker容器化部署集群方式,到KWDB落地最佳实践案例,再到KWDB的技术分析,本文将以3台ECS上部署docker集群来演示,让大家可以快速了解并体验 KWDB 多模数据库的强大功能与便捷管理功能。
时序数据库是一种专门设计用于处理时间序列数据的数据库系统。时间序列数据通常是指按照时间顺序记录的数据,通常包括时间戳和对应的观测值。时序数据广泛应用于各种领域,如物联网IoT、智能城市、金融市场分析、气象预测、交通流量监测和生产过程监控等。
时序数据库(Time-Series Database, TSDB)是专为处理带时间标签的数据而设计的数据库系统,其核心作用包括高效存储、快速查询和分析时间序列数据,适用于高频率、大规模且时间依赖性强的数据场景。
在物联网IoT时代,各类数据如潮水般涌现,数据处理与存储需求日趋复杂,数据类型的多元化直接推动了专用数据库的发展。平时也是接触过最多的是OLAP与OLTP这种关系型数据库,那么我们可以来对比一下时序数据库主要用途。
支持事务处理,其事务机制一般将一系列数据库操作组合成一个逻辑单元。比如银行转账场景中,A 账户转出 100 元,B 账户转入 100 元,该事务中的所有操作只能全部成功执行或全部不执行。分布式系统中关系型数据库的事务机制需要记录数据过程状态,并提供失败回滚机制,进行并发控制。
传感器采集的数据只来自单一数据源,每条数据都是传感器对该时刻测点数值(如打印了几张纸,打印是黑白还是彩色)的真实记录,不存在同时修改两个数值的场景,也没有写冲突的场景(多个人同时修改同一条数据),因此事务在时序数据库中是不必要的,时序数据库需要优先保证的是大量数据的高效稳定写入。
作为数据管理领域的两种核心引擎,时序数据库与关系型数据库分别服务于不同的数字化场景:前者凭借其通用性优势,广泛支撑着电商、物流、企业管理系统等业务,后者专精于处理按时间生成的时序数据,主要应用于工业物联网、金融等领域。
由于关系型数据库天生的劣势导致其无法进行高效的存储和数据的查询,因此需要一种专门针对时间序列数据来做优化的数据库系统,即时间序列数据库,主要用于处理带时间标签(按照时间的顺序变化,即时间序列化)的数据。
那么,有这么多时序数据库产品,KWDB到底有何优势呢?接下来,让我们一起来逐一来解读,KWDB在开放性、可扩展性、性能和易用性方面具备显著优势,KWDB可以在同一数据库实例内支持同时构建关系型与时序型数据库,实现统一接入、融合存储与分析处理,可高效应对多模态数据融合处理挑战。
KWDB 是一款面向 AIoT 场景的分布式、多模融合数据库产品。支持在同一个实例中建立时序库和关系库,并统一处理多种类型的数据,具备对海量时序数据的高效读写与分析能力。产品具备高可用、安全稳定、易运维等特性,广泛应用于工业物联网、数字能源、车联网、智慧矿山等多个行业领域,为用户提供一站式数据存储、管理与分析的基础平台。
①. 面向 AIoT 场景的分布式、多模融合、支持原生 AI 的数据库产品。
②. 同一实例同时建立时序库和关系库并融合处理多模数据。
③. 具备千万级设备接入、百万级数据秒级写入、亿级数据秒级读取等时序数据高效处理能力。
④. 具有稳定安全、高可用、易运维等特点。
⑤. 面向工业物联网、数字能源、车联网、智慧产业等领域。
⑥. 提供一站式数据存储、管理与分析的基座。
首先在安装集群之前,可以思考一下在集群规划阶段,需要合理规划集群的拓扑结构、硬件环境、安全性等方面,目前KWDB支持2种集群模式:
①. 单副本集群:整个集群只有1份数据副本,所有数据的存储和更新操作都由该副本负责,集群节点出现故障时,数据写入、查询和 DDL 操作可能失败。
②. 多副本集群:每份数据默认有 3 份副本,且副本分布在不同节点上,支持高可用性、能够实现故障转移、能够实现数据强一致性等优势。
那么,我们这里推荐去演示的环境也是3台设备,为了方便演示,这里使用阿里云的云ECS服务器来进行docker集群的部署,安装KWDB集群相关的软件:
本文实践为Docker 3台设备集群环境,操作系统版本依赖的是Ubuntu 22.04.1。
在部署 KWDB 集群时,系统将对配置文件、运行环境、硬件配置、软件依赖和 SSH 免密登录进行检查,需要根据以下内容检查待部署节点的硬件和软件环境是否符合要求,这里有2种安装KWDB集群的方式:
①. 裸机部署:使用软件包安装搭建KWDB集群方式。
②. 容器部署:使用docker镜像容器搭建KWDB集群方式。
在使用之前首先一定要在本地环境中准备好docker、docker-compose等必要的工具,如果没有的话,建议大家可以先使用以下shell脚本进行安装Docker。
创建一个Shell脚本文件名字叫“install_docker.sh”,在脚本中编写相关安装Docker的命令。
#!/bin/bash # 更新包列表 apt-get update # 安装必要的软件包 apt install -y apt-transport-https ca-certificates curl gnupg lsb-release software-properties-common # 创建密钥环目录 mkdir -p /etc/apt/keyrings # 获取并添加Docker的GPG密钥 curl -fsSL https://mirrors.aliyun.com/docker-ce/linux/ubuntu/gpg | sudo gpg --dearmor -o /etc/apt/keyrings/docker-archive-keyring.gpg # 添加Docker的官方APT源 echo "deb [arch=$(dpkg --print-architecture) signed-by=/etc/apt/keyrings/docker-archive-keyring.gpg] https://mirrors.aliyun.com/docker-ce/linux/ubuntu $(lsb_release -cs) stable" | sudo tee /etc/apt/sources.list.d/docker.list > /dev/null # 更新包列表 apt update # 安装Docker及相关工具 apt install -y docker-ce docker-ce-cli containerd.io docker-buildx-plugin docker-compose-plugin # 重载Docker守护进程配置 systemctl daemon-reload # 重启Docker服务 systemctl restart docker echo "Docker安装完成并重启。"
给脚本“install_docker.sh”添加执行权限并运行脚本:
chmod +x install_docker.sh && ./install_docker.sh docker version docker compose version
查看3台ECS云服务器是否全部安装完成docker和docker-compose,并且检查相关版本,我们这次docker的般般是28.1.1,docker compose的版本是v2.25.1。
注意一下,三台CES主机都需要进行安装docker和docker-compose软件,因为每台云主机都要跑一个docker镜像。
因为这次官方有版本要求(v2.2.0),执行docker pull拉取KWDB容器镜像,这里下载安装的KWDB镜像为kwdb/kwdb:2.2.0。
docker pull kwdb/kwdb:2.2.0
注意一下,三台CES主机都需要进行docker下载kwdb/kwdb:2.2.0镜像,因为每台云主机都要跑一个docker镜像。
在部署过程中,系统会自动生成相关日志。如果部署时出现错误,用户可以通过查看终端输出或 KWDB 安装目录中 log 目录里的日志文件,获取详细的错误信息。
以非安全模式部署的集群存在严重的安全风险,KWDB 强烈建议采用安全模式部署集群,通过 TLS 或 TLCP 加密技术验证节点和客户端的身份,并对节点与客户端之间的数据传输进行加密,有效防范未经授权的访问和数据篡改。
因此,我们使用的是3台ECS云服务器,所以是跨机器的TLS安全模式部署,以TLS安全模式部署 KWDB 集群, 使用以下命令创建数据库证书颁发机构、root 用户的客户端证书以及节点服务器证书,并且需要同时把这个证书copy到其它的云ECS服务器上。
①. 使用 ./kwbase cert create-node <node_ip> 命令为所有节点创建证书和密钥。
②. 将 CA 证书和密钥、节点证书和密钥传输至所有节点。
③. 如果需要在其它节点上运行 KWDB 客户端命令,还需要将 root 用户的证书和密钥复制到该节点。
④. 只有拥有 root 用户证书和密钥的节点,才能够访问集群。
docker run --rm --privileged -v /kwdb/certs:/kaiwudb/certs -w /kaiwudb/bin kwdb/kwdb:2.2.0 \ bash -c './kwbase cert create-ca --certs-dir=/kaiwudb/certs --ca-key=/kaiwudb/certs/ca.key && \ ./kwbase cert create-client root --certs-dir=/kaiwudb/certs --ca-key=/kaiwudb/certs/ca.key && \ ./kwbase cert create-node 127.0.0.1 localhost 0.0.0.0 8.154.42.14 --certs-dir=/kaiwudb/certs --ca-key=/kaiwudb/certs/ca.key'
创建完成后,即可以在本地/kwdb/certs目录下面生成了一些CA的相关证书文件,比如,用户客户端CA证书及节点服务器CA证书,在访问的时候,就可以通过这个生成的CA证书来进行TLS安全模式访问。
上面已经帮我们生成了3种CA证书了,通过tar压缩包之后,再通过scp同步到其它2个节点的ECS服务器,这样,就可以进行KWDB数据库实例的容器创建了,以下是相关的创建命令,关于创建的相关参数解释如下:
备注:具体参数详情解释可以参考官方页面:https://www.kaiwudb.com/template_version/pc/doc/db-operation/cluster-settings-config.html
docker创建相关参数解释:
docker容器启动执行命令相关参数解释:
在了解了相关的参数后,接下来我们就可以使用TLS安全模式部署KWDB集群,将分别进行3台ECS云服务器的部署,再以docker ps命令查看是否跑起来了。
第一台kwdb-cluster-server001云服务器docker创建数据库实例命令:
docker run -d --name kwdb-cluster-server001 --privileged \ --ulimit memlock=-1 --ulimit nofile=1048576 \ -p 26257:26257 -p 8080:8080 \ -v /kwdb/certs:/kaiwudb/certs \ -v /kwdb/kwdb-cluster-server001:/kaiwudb/deploy/kaiwudb-container \ --ipc shareable -w /kaiwudb/bin \ kwdb/kwdb:2.2.0 \ ./kwbase start --certs-dir=/kaiwudb/certs --listen-addr=0.0.0.0:26257 \ --advertise-addr=8.154.42.14:26257 --http-addr=0.0.0.0:8080 \ --store=/kaiwudb/deploy/kaiwudb-container --join 8.154.42.14:26257
第二台kwdb-cluster-server002云服务器docker创建数据库实例命令:
docker run -d --name kwdb-cluster-server002 --privileged \ --ulimit memlock=-1 --ulimit nofile=1048576 \ -p 26257:26257 -p 8080:8080 \ -v /kwdb/certs:/kaiwudb/certs \ -v /kwdb/kwdb-cluster-server002:/kaiwudb/deploy/kaiwudb-container \ --ipc shareable -w /kaiwudb/bin \ kwdb/kwdb:2.2.0 \ ./kwbase start --certs-dir=/kaiwudb/certs --listen-addr=0.0.0.0:26257 \ --advertise-addr=8.154.31.6:26257 --http-addr=0.0.0.0:8080 \ --store=/kaiwudb/deploy/kaiwudb-container --join 8.154.42.14:26257
第三台kwdb-cluster-server003云服务器docker创建数据库实例命令:
docker run -d --name kwdb-cluster-server003 --privileged \ --ulimit memlock=-1 --ulimit nofile=1048576 \ -p 26257:26257 -p 8081:8080 \ -v /kwdb/certs:/kaiwudb/certs \ -v /kwdb/kwdb-cluster-server003:/kaiwudb/deploy/kaiwudb-container \ --ipc shareable -w /kaiwudb/bin \ kwdb/kwdb:2.2.0 \ ./kwbase start --certs-dir=/kaiwudb/certs --listen-addr=0.0.0.0:26257 \ --advertise-addr=47.110.243.180:26257 --http-addr=0.0.0.0:8080 \ --store=/kaiwudb/deploy/kaiwudb-container --join 8.154.42.14:26257
①. 部署KWDB数据实例1(kwdb-cluster-server001),将advertise-addr和join对应的IP都改成自己对应的公网IP地址。
②. 部署KWDB数据实例2(kwdb-cluster-server002),将advertise-addr对应的ip都改成自己对应的公网IP地址,同时,设置join对应的IP设置为KWDB数据库实例1(kwdb-cluster-server001)的IP地址。
③. 部署KWDB数据实例3(kwdb-cluster-server003),将advertise-addr对应的ip都改成自己对应的公网IP地址,同时,设置join对应的IP设置为KWDB数据库实例1(kwdb-cluster-server001)的IP地址。
安全端口说明:
下表列出 KWDB 服务需要映射的端口,在安装部署前,确保目标机器的以下端口没有被占用且没有被防火墙拦截。如果是使用到云服务器的话,则需要将网络安全组的端口限制打开(刚开始没想到,调试很久了):
①. 8080端口:数据库 Web 服务端口
②. 26257端口:数据库服务端口、节点监听端口和对外连接端口
其它详细参数可以参考官网地址:https://www.kaiwudb.com/template_version/pc/doc/tool-command-reference/client-tool/kwbase-sql-reference.html#%E4%BD%BF%E7%94%A8%E4%B8%BE%E4%BE%8B
kwbase init 命令用于初始化 KWDB 集群,使用TLS安全模式来访问、验证部署的集群,连接的 KWDB 指定的IP节点。
# 初始化KWDB集群 docker exec kwdb-cluster-server001 ./kwbase init --certs-dir=/kaiwudb/certs --host=8.154.42.14:26257 # 检查所有容器实例的状态 docker exec -it kwdb-cluster-server001 ./kwbase node status --certs-dir=/kaiwudb/certs
在初始化完成后,如果执行成功会提示“Cluster successfully initialized”,就表示成功创建了集群,可以使用node status来进行查看KWDB集群的节点状态,可以看到3台集群设备都加进来了,而且状态都是is_available的,表示是可以使用的。
# kwbase sql 命令用于开启交互式 SQL Shell docker exec -it kwdb-cluster-server001 ./kwbase sql --host=8.154.42.14:26257 --certs-dir=/kaiwudb/certs
也可以使用kwbase sql 命令用于开启交互式 SQL Shell,进入数据库控制台中:
①. 创建一个时序数据库,这里需要带一个TS的标识(对应的引擎Time series)。
②. 创建完成后,使用常见的SQL查询语句show databases即可查看所有的数据库,不需要额外学习其它的SQL知识。
③. 在创建时序数据库printer_iot之后,我们可以看到其它2个cluster的节点也自动同步了数据库。
公司主要是做打印机共享办公的,在日常办公、学习生活里,打印这件事儿,总能整出不少烦心事:公司赶着开重要会议,资料却还在打印机那儿“排队难产”;学生党熬夜赶出的论文,到打印店一看,色差严重、排版错乱等众多问题,公司推出共享打印机物联网IoT模式彻底革新传统打印模式!
打印机物联网系统的数据上传物联网数据多种纬度的业务相关时序数据,但是也需要结合设备功能特征和业务场景进行针对性采集,实际采集方案需根据业务场景动态调整,例如共享打印机需强化用户计费数据采集,而工业级设备侧重设备诊断数据密度。
根据上面物联网共享打印机设备相关的数据体系结构,我们可以将创建7张不同纬度的时序数据表,用来实际的项目场景体验,如下为在创建表中遇到的一些问题,也可以帮助大家一起来规避。
问题一:创建时序表时,提示:“ERROR: tag printer_id can not be a nullable tag as primary tag”,通过查询原因发现是将printer_id设置为既是主键又是可以为空(NULL)的,增加NOT NULL属性即可解决问题。
CREATE TABLE printer_status ( timestamp TIMESTAMPTZ NOT NULL, temperature FLOAT, fan_speed INTEGER, print_head_temp FLOAT, humidity FLOAT, power_status VARCHAR(50), error_code VARCHAR(50), created_at TIMESTAMPTZ NOT NULL, model VARCHAR(50), firmware_version VARCHAR(50) ) TAGS (printer_id char(32)) PRIMARY TAGS (printer_id);
问题二:在创建时序表时,提示:“ERROR: column error_message: unsupported column type string in timeseries table”,通过查询原因发现是尝试将一个不适合直接用于时间序列分析的数据类型(如字符串类型)用在时间序列分析的函数或模型中,将error_code中的text类型改为varchar类型即可解决问题。
CREATE TABLE printer_status ( timestamp TIMESTAMPTZ NOT NULL, temperature FLOAT, fan_speed INTEGER, print_head_temp FLOAT, humidity FLOAT, power_status VARCHAR(50), error_code VARCHAR(50), created_at TIMESTAMPTZ NOT NULL, model VARCHAR(50), firmware_version VARCHAR(50) ) TAGS (printer_id char(32) NOT NULL) PRIMARY TAGS (printer_id);
最后我们将这7张物联网IoT相关的业务时序数据表如下,可以看到table_type类型也是跟时序数据库一样,是Time series table的类型,表示这是时序数据表,跟关系型数据表有略微点不同。
接下来我们使用go代码写一段自动生成的脚本来跑一下集群写入数据的方案来试试,来验证一下时序数据库在集群中的性能来对比一下。
首先是KWDB数据库连接方式与其它MySQL和PostgreSQL略有区别,一般来说,在关系型数据库中只需要提供用户名和密码,即可连接到数据库的实例中来,但是上面我们使用的是TLS安全模式的部署方式来安装的,所以,这里我们需要使用相关CA证书来连接KWDB数据库实例。
url := fmt.Sprintf("postgresql://%s@%s/%s?sslmode=verify-full&sslrootcert=%s&sslcert=%s&sslkey=%s", "root", "8.154.42.14:26257", "printer_iot", "./certs/ca.crt", "./certs/client.root.crt", "./certs/client.root.key") config, err := pgx.ParseConfig(url) if err != nil { log.Fatalf("error parsing connection configuration: %v", err) } config.RuntimeParams["application_name"] = "batch_insert_all_tables" conn, err := pgx.ConnectConfig(context.Background(), config) if err != nil { log.Fatalf("error connecting to database: %v", err) } defer conn.Close(context.Background())
需要使用到ca.crt、client.root.crt、client.root.key这几个文件来进行TLS安全模式的验证,这里我们使用的是github.com/jackc/pgx/v5的应用程序连接器,如果是其它语言的同学,可以参考官网连接示例:https://www.kaiwudb.com/kaiwudb_docs/#/development/overview.html
接下来就是表的数据生成,这里我们以printer_status表为参考实例,使用脚本来生成相关的测试数据来模拟真实场景中大批量的数据插入情况,以下是相关的代码:
func insertPrinterStatus(conn *pgx.Conn, batchSize int, totalInserted map[string]int) { var values []interface{} var placeholders []string for i := 0; i < batchSize; i++ { timestamp := time.Now().Format("2006-01-02 15:04:05.000Z07:00") values = append(values, timestamp, 59.038678+float64(i)*0.1, // temperature 1510+i, // fan_speed 83.219425+float64(i)*0.1, // print_head_temp 0.471526+float64(i)*0.01, // humidity "on", // power_status fmt.Sprintf("E%d", 2791+i), // error_code timestamp, // created_at "Model-7", // model "v3.0.3", // firmware_version fmt.Sprintf("printer_%04d", totalInserted["printer_status"]+i), // printer_id ) start := i * 11 + 1 placeholder := fmt.Sprintf("($%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d)", start, start+1, start+2, start+3, start+4, start+5, start+6, start+7, start+8, start+9, start+10) placeholders = append(placeholders, placeholder) } sql := fmt.Sprintf("insert INTO printer_status (timestamp, temperature, fan_speed, print_head_temp, humidity, power_status, error_code, created_at, model, firmware_version, printer_id) VALUES %s", strings.Join(placeholders, ",")) _, err := conn.exec(context.Background(), sql, values...) if err != nil { log.Printf("error batch inserting printer_status data: %v\n", err) return } totalInserted["printer_status"] += batchSize log.Printf("成功插入 %d 条 printer_status 记录,总计: %d\n", batchSize, totalInserted["printer_status"]) }
这里需要注意一下就是时间戳的问题,在刚开始测试时,发现使用time.Now()函数,显示成功,但是就是数据库count没有数据,只能打印SQL语句在shell控制台进行操作,发现时间上有问题,后面改为time.Now().Format(“2006-01-02 15:04:05.000Z07:00”)时间格式即可插入到KWDB多模分布式数据库中。
// 设置批量插入的参数 batchSize := 5000 // 每批次插入记录数 totalInserted := make(map[string]int) for { // 为每个表执行批量插入 insertPrinterStatus(conn, batchSize, totalInserted) insertPrinterCounter(conn, batchSize, totalInserted) insertConsumableStatus(conn, batchSize, totalInserted) insertPrintJob(conn, batchSize, totalInserted) insertPaymentRecord(conn, batchSize, totalInserted) insertFirmwareUpgrade(conn, batchSize, totalInserted) insertHardwareCheck(conn, batchSize, totalInserted) }
但是在实际的代码测试中,遇到以下问题,SQL并没有抛错,但是数据库也不增加数据?只能进行打印SQL日志在shell控制台来进行测试。
发现报错的语句在SQL shell控制台中是执行成功了,显示insert 1,表示成功了,但是查询select count(*)总数时,并没有发现成功了,经过与插入成功的数据字段类型与值进行双向比对,发现是时间的原因,更换正确的时间后插入正常。
# 报错语句: insert INTO printer_status (timestamp, temperature, fan_speed, print_head_temp, humidity, power_status, error_code, created_at, model, firmware_version, printer_id) VALUES ('2025-05-23T11:00:42Z', 58.476,534, 38.6705, 89.04077, 'error','E4380', '2025-05-23T11:00:42Z', 'Model-7', 'v2.5.3', 'printer_0000'); # 正常语句: insert INTO printer_status (timestamp, temperature, fan_speed, print_head_temp, humidity, power_status, error_code, created_at, model, firmware_version, printer_id) VALUES ('2025-05-23 11:00:42.605+00:00', 58.476,534, 38.6705, 89.04077, 'error','E4380', '2025-05-23 11:00:42.605+00:00', 'Model-7', 'v2.5.3', 'printer_0000');
批量设置插入数据参数逻辑:
// 设置批量插入的参数 batchSize := 5000 // 每批次插入记录数 totalInserted := make(map[string]int) for { // 为每个表执行批量插入 insertPrinterStatus(conn, batchSize, totalInserted) insertPrinterCounter(conn, batchSize, totalInserted) insertConsumableStatus(conn, batchSize, totalInserted) insertPrintJob(conn, batchSize, totalInserted) insertPaymentRecord(conn, batchSize, totalInserted) insertFirmwareUpgrade(conn, batchSize, totalInserted) insertHardwareCheck(conn, batchSize, totalInserted) }
接下来就是批量数据操作函数定义,这里可以设置批量插入的数量,但是发现超过5000行,就会显示“error batch inserting data: Error: more than 65535 arguments to prepared statmement: 1100000(SQLSTATE 08P01)”。
2025/05/23 21:54:12 error batch inserting data: ERROR: more than 65535 arguments to prepared statement: 1100000 (SQLSTATE 08P01) 2025/05/23 21:54:21 error batch inserting data: ERROR: prepared statement "stmtcache_ff8a6b3e681096237b63a367a2c9c29264dd5bd3d5c1af9d" already exists (SQLSTATE 42P05) 2025/05/23 21:54:30 error batch inserting data: ERROR: prepared statement "stmtcache_ff8a6b3e681096237b63a367a2c9c29264dd5bd3d5c1af9d" already exists (SQLSTATE 42P05) 2025/05/23 21:54:40 error batch inserting data: ERROR: prepared statement "stmtcache_ff8a6b3e681096237b63a367a2c9c29264dd5bd3d5c1af9d" already exists (SQLSTATE 42P05) 2025/05/23 21:54:49 error batch inserting data: ERROR: prepared statement "stmtcache_ff8a6b3e681096237b63a367a2c9c29264dd5bd3d5c1af9d" already exists (SQLSTATE 42P05) 2025/05/23 21:54:58 error batch inserting data: ERROR: prepared statement "stmtcache_ff8a6b3e681096237b63a367a2c9c29264dd5bd3d5c1af9d" already exists (SQLSTATE 42P05) 2025/05/23 21:55:07 error batch inserting data: ERROR: prepared statement "stmtcache_ff8a6b3e681096237b63a367a2c9c29264dd5bd3d5c1af9d" already exists (SQLSTATE 42P05) 2025/05/23 21:55:16 error batch inserting data: ERROR: prepared statement "stmtcache_ff8a6b3e681096237b63a367a2c9c29264dd5bd3d5c1af9d" already exists (SQLSTATE 42P05)
接下来是上面完整的代码:
package main import ( "context" "fmt" "log" "strings" "time" "math/rand" "github.com/jackc/pgx/v5" ) func main() { url := fmt.Sprintf("postgresql://%s@%s/%s?sslmode=verify-full&sslrootcert=%s&sslcert=%s&sslkey=%s", "root", "8.154.42.14:26257", "printer_iot", "./certs/ca.crt", "./certs/client.root.crt", "./certs/client.root.key") config, err := pgx.ParseConfig(url) if err != nil { log.Fatalf("error parsing connection configuration: %v", err) } config.RuntimeParams["application_name"] = "batch_insert_all_tables" conn, err := pgx.ConnectConfig(context.Background(), config) if err != nil { log.Fatalf("error connecting to database: %v", err) } defer conn.Close(context.Background()) // 设置批量插入的参数 batchSize := 5000 // 每批次插入记录数 totalInserted := make(map[string]int) for { // 为每个表执行批量插入 insertPrinterStatus(conn, batchSize, totalInserted) insertPrinterCounter(conn, batchSize, totalInserted) insertConsumableStatus(conn, batchSize, totalInserted) insertPrintJob(conn, batchSize, totalInserted) insertPaymentRecord(conn, batchSize, totalInserted) insertFirmwareUpgrade(conn, batchSize, totalInserted) insertHardwareCheck(conn, batchSize, totalInserted) } } func insertPrinterStatus(conn *pgx.Conn, batchSize int, totalInserted map[string]int) { var values []interface{} var placeholders []string for i := 0; i < batchSize; i++ { timestamp := time.Now().Format("2006-01-02 15:04:05.000Z07:00") values = append(values, timestamp, 59.038678+float64(i)*0.1, // temperature 1510+i, // fan_speed 83.219425+float64(i)*0.1, // print_head_temp 0.471526+float64(i)*0.01, // humidity "on", // power_status fmt.Sprintf("E%d", 2791+i), // error_code timestamp, // created_at "Model-7", // model "v3.0.3", // firmware_version fmt.Sprintf("printer_%04d", totalInserted["printer_status"]+i), // printer_id ) start := i * 11 + 1 placeholder := fmt.Sprintf("($%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d)", start, start+1, start+2, start+3, start+4, start+5, start+6, start+7, start+8, start+9, start+10) placeholders = append(placeholders, placeholder) } sql := fmt.Sprintf("insert INTO printer_status (timestamp, temperature, fan_speed, print_head_temp, humidity, power_status, error_code, created_at, model, firmware_version, printer_id) VALUES %s", strings.Join(placeholders, ",")) _, err := conn.exec(context.Background(), sql, values...) if err != nil { log.Printf("error batch inserting printer_status data: %v\n", err) return } totalInserted["printer_status"] += batchSize log.Printf("成功插入 %d 条 printer_status 记录,总计: %d\n", batchSize, totalInserted["printer_status"]) } func insertPrinterCounter(conn *pgx.Conn, batchSize int, totalInserted map[string]int) { var values []interface{} var placeholders []string for i := 0; i < batchSize; i++ { timestamp := time.Now().Format("2006-01-02 15:04:05.000Z07:00") values = append(values, timestamp, 10000+i, // total_pages 3000+i, // color_pages 7000+i, // grayscale_pages timestamp, // created_at fmt.Sprintf("printer_%04d", totalInserted["printer_counter"]+i), // printer_id ) start := i * 6 + 1 placeholder := fmt.Sprintf("($%d, $%d, $%d, $%d, $%d, $%d)", start, start+1, start+2, start+3, start+4, start+5) placeholders = append(placeholders, placeholder) } sql := fmt.Sprintf("insert INTO printer_counter (timestamp, total_pages, color_pages, grayscale_pages, created_at, printer_id) VALUES %s", strings.Join(placeholders, ",")) _, err := conn.exec(context.Background(), sql, values...) if err != nil { log.Printf("error batch inserting printer_counter data: %v\n", err) return } totalInserted["printer_counter"] += batchSize log.Printf("成功插入 %d 条 printer_counter 记录,总计: %d\n", batchSize, totalInserted["printer_counter"]) } func insertConsumableStatus(conn *pgx.Conn, batchSize int, totalInserted map[string]int) { var values []interface{} var placeholders []string for i := 0; i < batchSize; i++ { timestamp := time.Now().Format("2006-01-02 15:04:05.000Z07:00") values = append(values, timestamp, "toner", // type 80-i%20, // level 5000-i%1000, // estimated_pages timestamp, // created_at fmt.Sprintf("printer_%04d", totalInserted["consumable_status"]+i), // printer_id fmt.Sprintf("consumable_%04d", totalInserted["consumable_status"]+i), // consumable_id ) start := i * 7 + 1 placeholder := fmt.Sprintf("($%d, $%d, $%d, $%d, $%d, $%d, $%d)", start, start+1, start+2, start+3, start+4, start+5, start+6) placeholders = append(placeholders, placeholder) } sql := fmt.Sprintf("insert INTO consumable_status (timestamp, type, level, estimated_pages, created_at, printer_id, consumable_id) VALUES %s", strings.Join(placeholders, ",")) _, err := conn.exec(context.Background(), sql, values...) if err != nil { log.Printf("error batch inserting consumable_status data: %v\n", err) return } totalInserted["consumable_status"] += batchSize log.Printf("成功插入 %d 条 consumable_status 记录,总计: %d\n", batchSize, totalInserted["consumable_status"]) } func insertPrintJob(conn *pgx.Conn, batchSize int, totalInserted map[string]int) { var values []interface{} var placeholders []string for i := 0; i < batchSize; i++ { timestamp := time.Now().Format("2006-01-02 15:04:05.000Z07:00") values = append(values, timestamp, fmt.Sprintf("job_%04d", totalInserted["print_job"]+i), // job_id fmt.Sprintf("user_%04d", rand.Intn(1000)), // user_id "pdf", // file_type 10+rand.Intn(50), // page_count rand.Intn(10), // color_pages 1, // status "", // error_code 60+rand.Intn(300), // duration timestamp, // created_at fmt.Sprintf("printer_%04d", totalInserted["print_job"]+i), // printer_id ) start := i * 11 + 1 placeholder := fmt.Sprintf("($%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d)", start, start+1, start+2, start+3, start+4, start+5, start+6, start+7, start+8, start+9, start+10) placeholders = append(placeholders, placeholder) } sql := fmt.Sprintf("insert INTO print_job (timestamp, job_id, user_id, file_type, page_count, color_pages, status, error_code, duration, created_at, printer_id) VALUES %s", strings.Join(placeholders, ",")) _, err := conn.exec(context.Background(), sql, values...) if err != nil { log.Printf("error batch inserting print_job data: %v\n", err) return } totalInserted["print_job"] += batchSize log.Printf("成功插入 %d 条 print_job 记录,总计: %d\n", batchSize, totalInserted["print_job"]) } func insertPaymentRecord(conn *pgx.Conn, batchSize int, totalInserted map[string]int) { var values []interface{} var placeholders []string for i := 0; i < batchSize; i++ { timestamp := time.Now().Format("2006-01-02 15:04:05.000Z07:00") values = append(values, timestamp, 50.0+float64(rand.Intn(200)), // amount fmt.Sprintf("coupon_%04d", rand.Intn(1000)), // coupon_id 5.0+float64(rand.Intn(20)), // coupon_amount "wechat", // payment_method fmt.Sprintf("trans_%04d", totalInserted["payment_record"]+i), // transaction_id timestamp, // created_at fmt.Sprintf("printer_%04d", totalInserted["payment_record"]+i), // printer_id fmt.Sprintf("user_%04d", rand.Intn(1000)), // user_id ) start := i * 9 + 1 placeholder := fmt.Sprintf("($%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d)", start, start+1, start+2, start+3, start+4, start+5, start+6, start+7, start+8) placeholders = append(placeholders, placeholder) } sql := fmt.Sprintf("insert INTO payment_record (timestamp, amount, coupon_id, coupon_amount, payment_method, transaction_id, created_at, printer_id, user_id) VALUES %s", strings.Join(placeholders, ",")) _, err := conn.exec(context.Background(), sql, values...) if err != nil { log.Printf("error batch inserting payment_record data: %v\n", err) return } totalInserted["payment_record"] += batchSize log.Printf("成功插入 %d 条 payment_record 记录,总计: %d\n", batchSize, totalInserted["payment_record"]) } func insertFirmwareUpgrade(conn *pgx.Conn, batchSize int, totalInserted map[string]int) { var values []interface{} var placeholders []string for i := 0; i < batchSize; i++ { timestamp := time.Now().Format("2006-01-02 15:04:05.000Z07:00") values = append(values, timestamp, "v2.0.0", // old_version "v3.0.0", // new_version true, // success "", // error_message 300+rand.Intn(600), // duration timestamp, // created_at fmt.Sprintf("printer_%04d", totalInserted["firmware_upgrade"]+i), // printer_id ) start := i * 8 + 1 placeholder := fmt.Sprintf("($%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d)", start, start+1, start+2, start+3, start+4, start+5, start+6, start+7) placeholders = append(placeholders, placeholder) } sql := fmt.Sprintf("insert INTO firmware_upgrade (timestamp, old_version, new_version, success, error_message, duration, created_at, printer_id) VALUES %s", strings.Join(placeholders, ",")) _, err := conn.exec(context.Background(), sql, values...) if err != nil { log.Printf("error batch inserting firmware_upgrade data: %v\n", err) return } totalInserted["firmware_upgrade"] += batchSize log.Printf("成功插入 %d 条 firmware_upgrade 记录,总计: %d\n", batchSize, totalInserted["firmware_upgrade"]) } func insertHardwareCheck(conn *pgx.Conn, batchSize int, totalInserted map[string]int) { var values []interface{} var placeholders []string for i := 0; i < batchSize; i++ { timestamp := time.Now().Format("2006-01-02 15:04:05.000Z07:00") values = append(values, timestamp, "normal", // print_head_check "success", // calibration "passed", // diagnostics "normal", // temperature 0.471526+float64(i)*0.01, // humidity 220.0+float64(rand.Intn(10)), // voltage timestamp, // created_at fmt.Sprintf("printer_%04d", totalInserted["hardware_check"]+i), // printer_id ) start := i * 9 + 1 placeholder := fmt.Sprintf("($%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d)", start, start+1, start+2, start+3, start+4, start+5, start+6, start+7, start+8) placeholders = append(placeholders, placeholder) } sql := fmt.Sprintf("insert INTO hardware_check (timestamp, print_head_check, calibration, diagnostics, temperature, humidity, voltage, created_at, printer_id) VALUES %s", strings.Join(placeholders, ",")) _, err := conn.exec(context.Background(), sql, values...) if err != nil { log.Printf("error batch inserting hardware_check data: %v\n", err) return } totalInserted["hardware_check"] += batchSize log.Printf("成功插入 %d 条 hardware_check 记录,总计: %d\n", batchSize, totalInserted["hardware_check"]) }
上面通过go的脚本在跑数据,满心欢喜的去查询kwdb-cluster-server002和kwdb-cluster-server003的数据库实例的数据,但是查不了任何一张表,都是报错:“Error: no inbound stream connection”,只kwdb-cluster-server001有数据,有这是什么问题呢?
①. 创建完3台KWDB数据库实例后,也可以看到时序数据库printer_iot是可以同步过来的。
②. 创建完7张相关时序表也是可以同步到过来的。
没有办法,只能在kwdb-cluster-server001查看logs日志看看能不能发现问题呢?果然,看到这是这里提示大概意思是grpc连接不到47.110.243.180的26257端口和8.154.31.180的26257端口,但是安全组肯定是开放了的。
最后排查到,发现在插入的语句在创建证书的语句存在问题,需要将所有的ip都加入进来,不能只加kwdb-cluster-server001的服务器IP地址,将另外2台服务器的IP地址也加上即可,再生成CA相关证书,copy到其它节点上再创建一下容器即可解决这个问题。
docker run --rm --privileged -v /kwdb/certs:/kaiwudb/certs -w /kaiwudb/bin kwdb/kwdb:2.2.0 \ bash -c './kwbase cert create-ca --certs-dir=/kaiwudb/certs --ca-key=/kaiwudb/certs/ca.key && \ ./kwbase cert create-client root --certs-dir=/kaiwudb/certs --ca-key=/kaiwudb/certs/ca.key && \ ./kwbase cert create-node 127.0.0.1 localhost 0.0.0.0 8.154.42.14 47.110.243.180 8.154.31.6 --certs-dir=/kaiwudb/certs --ca-key=/kaiwudb/certs/ca.key'
按上面的步骤重新进行删除容器再进行部署操作,再重复以上操作即可以完成新的环境重新进行部署,可以发现这个问题就能进行解决,3台服务器的数据都已经进行同步了。
上面解决了其它2台数据不同步的问题的,我们就可以进行大批量插入的动作,可以看到数据同步也是比较及时的,就算略微有一些差异也是在1秒内就能同步完成,基本可以说没有太大的数据延迟的问题:
①. 【查询期间无数据插入情况】第一次查询,3台服务器都同时在12:04:50的时候进行查询,可以发现查询的表数据量都是275000条,都是一致的,在查询后马上就有数据插入的操作。
②. 【查询期间有数据插入情况】第二次查询,3台服务器都同时在12:05:00的时候进行查询,可以发现查询的表数据量都是275000条,都是一致的,在查询前马上就有数据插入的操作,但是数据应该还没有插入到数据库中。
③. 【查询期间有数据插入情况】第三次查询,3台服务器都同时在12:01:10的时候进行查询,可以发现查询的表数据量不一致的(差异不大),在查询前马上就有数据插入的操作,表示数据正在插入的过程。
同时,也可以观看以下视屏,可以发现在3台服务器数据同步的过程中,延迟不超过1s,就算略微有一些差异也是在1秒内就能同步完成,基本可以说没有太大的数据延迟的问题,之前用某云时,主从数据库会存在2-3s的延迟,就是先注册马上登陆会提示“用户数据不存在”,只能使用延迟几秒提示“正在登录中”来解决这个问题。
通过查看kwdb-cluster-server01的数据库同步的日志,如下可以分析通过了244个goroutines在跑日志,将数据同步到其它2个节点中。
通过使用docker stop容器,可以将KWDB其中一台kwdb_cluster_server002暂时停止,通过查看集群节点状态,发现第二台kwdb_cluster_server002确实已经已经is_available是flase的。
但是还想通过docker start启动时,发现启动不了,查看logs日志发现识别有点问题,像是识别成了新增的节点 ,而不是原来的节点重新启动,向官方人员咨询也没有得到很好的回复,希望尽快修复这个问题点。
* * INFO: initial startup completed. * Node will now attempt to join a running cluster, or wait for `kwbase init`. * Client connections will be accepted after this completes successfully. * Check the log file(s) for progress. * * * WARNING: The server appears to be unable to contact the other nodes in the cluster. Please try: * * - starting the other nodes, if you haven't already; * - double-checking that the '--join' and '--listen'/'--advertise' flags are set up correctly; * - running the 'kwbase init' command if you are trying to initialize a new cluster. * * If problems persist, please see on KWDB web site. * KWDB node starting at 2025-05-24 05:45:50.226961358 +0000 UTC (took 37.4s) build: 2.2.0 @ 2025/03/31 07:20:02 (go1.16.15) sql: postgresql://root@8.154.31.6:26257?sslcert=%2Fkaiwudb%2Fcerts%2Fclient.root.crt&sslkey=%2Fkaiwudb%2Fcerts%2Fclient.root.key&sslmode=verify-full&sslrootcert=%2Fkaiwudb%2Fcerts%2Fca.crt RPC client flags: ./kwbase <client cmd> --host=8.154.31.6:26257 --certs-dir=/kaiwudb/certs logs: /kaiwudb/deploy/kaiwudb-container/logs temp dir: /kaiwudb/deploy/kaiwudb-container/kwbase-temp023690807 external I/O path: /kaiwudb/deploy/kaiwudb-container/extern store[0]: path=/kaiwudb/deploy/kaiwudb-container storage engine: rocksdb status: initialized new node, joined pre-existing cluster clusterID: 1e886e85-df09-4ba1-abfc-07767fb6382b nodeID: 2 initiating graceful shutdown of server * * ERROR: Cluster expansion feature needs an enterprise license to enable. *
另外,在其它节点(kwdb_cluster_server002、kwdb_cluster_server003)的SQL shell控制台中进行查询会出现以下的报错,但是在kwdb_cluster_server001没有发现此问题,不过,通过程序查询SQL没有这种问题出现。
上面是基于Docker的KWDB TLS安全部署集群,并且使用物联网共享打印机的最佳落地方案实践顺利的完成,搭建了3台节点集群并完成了时序数据库、时序数据表的创建、访问、同步的各种场景测试,同时,体验了KWDB分布式多模数据库在IoT的强大的分布式能力与数据插入高性能的特点。