文章链接:【KWDB 创作者计划】KWDB时序数据库在工业级机器手臂生产调度中的落地实践案例,加速时序数据高效存储与检索
作者:2301_79516858
随着AI与人工智能的快速发展,工业4.0也迎来了智能制造,机器人等技术的发展,单纯的关系型数据库已经远远不能满足工业量级的存储场景了。为了满足工业级物联网的数据增量问题,急需一款高性能、分布式的物联网、工业大数据平台,来解决时序数据的问题。
在万物互联时代,设备和传感器持续产生海量实时数据,传统数据库和数据处理平台难以满足高性能和实时数据处理的需求,面临严峻挑战。这些海量实时数据的涌现,也导致了企业在研发、运营和维护成本的大幅攀升。
为应对这一挑战,本文介绍一款专为物联网、工业互联网等场景设计并优化的大数据平台“KWDB”, 这个专为物联网和工业互联网等场景设计优化的大数据平台应运而生。作为一款高性能、分布式的物联网、工业大数据平台,其核心模块是高性能、集群开源、云原生、极简的时序数据库。它能安全高效地将大量设备每天产生的高达 TB 甚至 PB 级的数据进行汇聚、存储、分析和分发,并提供 AI 智能体对数据进行预测与异常检测,提供实时的商业洞察。
接下来从“工厂4.0的机器手臂自动化测试”的场景来实际分析与实践,它融合了关系型数据库,时序数据库等多种类型的数据库,很适合用在物联网IoT的应用场景。
工厂SFC(Shop Floor Control)生产管理系统,简单来说,是一种专注于工厂车间生产现场管控的信息化管理系统,它就像是车间里的智能管家,对生产过程中的各个环节进行细致入微的管理。
在现代的互联网应用中,其中MySQL数据库扮演着重要的角色。然而,随着工厂时序的数据量持续不断的增加,用户可能会发现数据库查询的速度逐渐变慢。本文将探讨一下“KWDB分布式多模数据库"时序数据库的,并提供一些其它的数据库解决方案。
随着公司的业务快速发展,数据库中的时序数据量猛增,访问性能也变慢了,单台MySQL实例无法应对和满足大规模数据管理和请求访问,导致数据库性能下降,成为瓶颈。
关系型数据本身就比较容易形成系统瓶颈,无论是从单机存储容量、连接数、处理能力都有限。
当单表的数据量达到1000W以后,由于查询和操作的维度较广,哪怕使用了MySQL从库读写分离、优化索引等操作时,性能还是无可避免严重下降。
当架构增加Redis、RabbitMQ等消息队列。
OLTP的大数据量统计数据类异构表同步只能满足业务的T+1。
系统架构中,异步设计方案中的中间件故障,导致数据重传、数据丢失。
所以,在日常的企业级应用中,OLAP和OLTP针对不同的业务场景,有不同的解决方案。OLAP主要用于企业级决策和战略分析,需要快速的数据查询和分析技术。相反,OLTP主要用于企业日常操作,需要快速的数据更新和处理技术。那么“KWDB分布式多模数据库"有什么特点和区别呢?
KWDB 2.0 是一款自主研发的面向 AIoT 场景的分布式、多模融合、支持原生 AI 的数据库产品,可通过同一实例同时建立时序库和关系库并融合处理多模数据,具备千万级设备接入、百万级数据秒级写入、亿级数据秒级读取等时序数据高效处理能力;具有稳定安全、高可用、易运维等特点,面向工业物联网、数字能源、车联网、智慧产业等领域,提供一站式数据存储、管理与分析的基座。
时序数据,即时间序列数据(Time-Series Data),它们是一组按照时间发生先后顺序进行排列的序列数据。日常生活中,设备、传感器采集的数据就是时序数据,证券交易的记录也是时序数据。因此时序数据的处理并不陌生,特别在是工业自动化以及证券金融行业,专业的时序数据处理软件早已存在,比如工业领域的 PI System 以及金融行业的 KDB。
我们可以来思考一下,工业SFC系统的业务流程是什么呢,先来介绍一下我们生产手机的测试场景,再来结合时序数据的特别来综合分析一下。
首先我们每个设备在车间是有固定的位置,一般流水线是分为A/B边,不同的工站测试的手机数据不同,比如有一些是专门测试声音、Wifi模组、摄像头等不同的工位。
每台设备机器手臂会产出很多时序数据,比如设备的编号、型号、代码等固定的信息,这些可以算是标签的数据,用于记录采集对象的静态数据。
接下来是采集的是字段,用于记录采集对象的实时数据,这里分为2部分:一部分是机器手臂相关的运行状态数据,另外一部分是手机的测试数据,比如某个功能是否成功。
结合上面时序数据的特点,我们这个场景完全是符合要求的,相较于传统使用的关系型数据库充分利用了时序数据特点,比如结构化、无需事务、很少删除或更新、写多读少等等。
KWDB 具备完善的功能和优异的性能,充分满足不同的应用场景需求,赋能行业企业的数字化建设和转型。能广泛应用于电力、石油、制造、出行、汽车、IT 运维、金融等领域。企业使用KWDB可以安全高效地将大量设备、传感器每天产生的高达 TB 甚至 PB级的数据进行汇聚、存储、分析和分发,对业务运行状态进行实时监测、预警,提供实时的商业洞察,加速数字化转型进程,将数据价值最大化。
从上面KWDB的产品优势来看,KWDB是一款高性能、分布式、支持 SQL 的时序数据库 (Database)的多模数据库,包括集群功能全部开源(开放原子开源基金会孵化及运营的项目)。KWDB能被广泛运用于物联网、工业互联网、车联网、IT 运维、金融等领域。
除核心的时序数据库 (Database) 功能外,KWDB还提关系型数据库查询的功能,可以给大数据平台所需要的系列功能,最大程度减少研发和运维的复杂度。
融合多种数据计算引擎,根据不同模型数据特征选择不同的存储、计算模式,对外提供统一的接口,提供多种分析计算能力,提升查询效率:
①. 自适应时序引擎:支持多种时序数据特色的复杂查询和多维聚合方式。与传统关系数据库相比,KWDB 具备优异的查询性能。另外,KWDB 提供 5-30 倍的压缩能力,数据压缩后无需解压缩即可使用。
②. 事务处理引擎:支持分布式事务和 MVCC(Multi-Version Concurrency Control,多版本并发控制),具备注释、视图、约束、索引、序列等功能。
③. 预测分析引擎:提供模型生命周期管理、模型训练、模型推理预测等功能。任何拥有数据库应用开发背景的开发人员都可以轻松地完成模型管理和预测等操作。
目前,KWDB 在工业物联网、数字能源、数字政务、金融等领域均已成功完成落地实践。未来,KWDB 能够赋能工业物联网、数字能源、车联网、智慧矿山等各大行业领域,助力企业从数据中挖掘更大的商业价值。
相关文档:
KWDB官方开源仓库地址:https://gitee.com/kwdb/kwdb
KWDB官网地址:https://www.kaiwudb.com/
KWDB官网文档地址: https://www.kaiwudb.com/kaiwudb_docs/#/
KWDB 2.2.0 下载地址: https://gitee.com/kwdb/kwdb/releases/tag/V2.2.0
KWDB 支持用户根据自己的需求来选择安装的方式,这里分为单节点部署和集群部署两种方式,因为工厂内部的数据量比较大,所以,我们这里使用集群的方式来进行部署与使用:
①. 单节点部署分为:二进制安装包、容器和源码安装部署。
②. 集群部署分为:裸机部署、容器部署。
从KWDB支持的部署集群方式来看,如果是单副本集群的话,整个集群只有一份数据副本,从数据库的CAP理论来讲,从一致性 (Consistency)、可用性 (Availability) 和分区容错性 (Partition Tolerance)角度来讲,单副本集群部署方式是不太推荐的。
那么,我们只能通过最少三台物理设备来搭建KWDB集群的方式,才能符合多副本集群的基础节点要求,从官方的文档查阅,支持有3种不同的集群部署支持方式:
接下来,那我们选择一个最简单的方式“使用脚本部署”来进行安装KWDB 3节点集群方案。
为了提高可用性,降低数据丢失的风险,建议在单台计算机上只运行一个节点。KWDB 采用跨节点复制机制,如果在一台计算机上同时运行多个节点,当计算机发生故障时,更有可能丢失数据。
在安装KWDB集群,先了解一下基本的配置情况,使用裸机安装包部署 KWDB 所需的硬件规格要求和软件要求,部署 KWDB 时,系统将对配置文件、运行环境、硬件配置和软件依赖进行检查:
①. 首先单节点配置的CPU 和内存,建议不低于 4 核 8G,如果对于数据量大、复杂工作负载、高并发和高性能场景,建议配置更高的 CPU 和内存资源以确保系统的高效运行。
②. 操作系统推荐使用Ubuntu版本的。
③. KWDB有一些依赖的要求,如果缺少依赖会退出安装并提示依赖缺失,如OpenSSL、libprotobuf等。
④. KWDB的服务映射端口防止被占用,且没有被防火墙拦截,也可以自定义修改 deploy.cfg 文件中的端口配置参数。
8080:数据库 Web 服务端口
26257:数据库服务端口、节点监听端口和对外连接端口
一般情况下,我们都是使用密码来登陆服务器进行管理,如果如果频繁登录的场景,如自动化脚本或持续集成环境,就可以使用SSH免密登陆,用户无需每次登录时输入密码,节省了时间和精力,同时,最主要的是免密登录可以避免密码在网络中明文传输,减少密码被窃取的风险。
SSH免密登录:
因为测试的集群是3台节点,所以,这里我们先把3台服务器做一个标识,用来区分不同的设备机器,kwdb-db-01、kwdb-db-02、kwdb-db-03。在第一个服务器,登录当前节点,生成公私密钥对,再把生成的密钥分发到集群其它节点:
ssh-keygen -f ~/.ssh/id_rsa -N "" ssh-copy-id -f -i ~/.ssh/id_rsa.pub -o StrictHostKeyChecking=no <target_node>
参数说明:
①. -f ~/.ssh/id_rsa:指定生成的密钥对文件目录。
②. -N:将密钥密码设置为空,以实现免密登录。
确认是否可以使用 SSH 免密登录到集群其它节
ssh <target_node>
NTP时钟同步:
在Linux系统中,时钟同步通常指的是确保系统时间与外部时间源(如互联网上的NTP服务器)同步,以保证系统时间的准确性和一致性。这对于服务器尤其重要,因为它们需要精确的时间来执行各种任务,如日志记录、数据库操作等。
过网络连接到指定的NTP(Network Time Protocol)服务器,获取准确的时间并调整本地系统时钟,KWDB采用中等强度的时钟同步机制来维持数据的一致性:
①. 当节点检测到自身的机器时间与集群中至少 50% 的节点的机器时间的误差值超过集群最大允许时间误差值(默认为 500 ms)的 80% 时,该节点会自动停止。
②. 可以避免违反数据一致性,带来读写旧数据的风险。
③. 每个节点都必须运行 NTP(Network Time Protocol,网络时间协议)或其它时钟同步软件,防止时钟漂移得太远。
# 启用 NTP 服务 sudo timedatectl set-ntp on sudo apt update && sudo apt install ntp timedatectl status # 重启服务 sudo systemctl restart ntp sudo ntpq -p
检查当前系统中是否已安装 libprotobuf 及其版本是否符合要求(3.6.1 及以上版本),安装完成后可以查看 libprotobuf 版本的相关依赖:
sudo apt install -y libprotobuf-dev sudo apt-cache rdepends libprotobuf-dev
wget https://gitee.com/kwdb/kwdb/releases/download/V2.2.0/KWDB-2.2.0-ubuntu22.04-x86_64-debs.tar.gz tar -xzvf KWDB-2.2.0-ubuntu22.04-x86_64-debs.tar.gz vi deploy.cfg # 编辑deploy.cfg 配置文件,设置local本机IP地址、设置cluster服务node_addr以及ssh_user用户等信息 ./deploy.sh install --multi-replica ./deploy.sh cluster -i ./deploy.sh cluster -s
这里我们需要下载对应的安装版本,请注意一下您服务器对应的版本,如果下错了会出现报错(明明依赖安装过了),接下来,需要修改deploy.cfg的文件,这里需要注意是使用的安全模式(默认是tls的,即有一个CA证书),另外,需要修改cluster需要对应的所有服务器节点加进来,最后ssh_user要特别注意,刚开始没注意,结果一直卡在那里,默认是admin,但是我的用户名是root。
以下是kwdb安装脚本的相关的目录结构:
. ├── add_user.sh # 安装、启动 KaiwuDB 后,为 KaiwuDB 数据库创建用户。 ├── deploy.cfg # 安装部署配置文件,用于配置部署节点的 IP 地址、端口等配置信息。 ├── deploy.sh # 安装部署脚本,用于安装、卸载、启动、状态获取、关停和重启等操作。 ├── packages # 存放 DEB、RPM、镜像包和 libprotobuf 包。 │ ├── kwdb-libcommon_2.2.0-ubuntu-22.04_amd64.deb │ └── kwdb-server_2.2.0-ubuntu-22.04_amd64.deb └── utils # 存放工具类脚本。 ├── container_shell.sh ├── kaiwudb_cluster.sh ├── kaiwudb_common.sh ├── kaiwudb_hardware.sh ├── kaiwudb_install.sh ├── kaiwudb_log.sh ├── kaiwudb_operate.sh ├── kaiwudb_uninstall.sh ├── kaiwudb_upgrade.sh ├── process_bar.sh └── utils.sh 2 directories, 16 files
最后,通过命令–multi-replica即可进行多副本集群安装,如果使用命令–single-replica是进行单副本集群安装,过程中会提示需要输入密码,在输入密码等待几分钟后(取决你的网速),当提示“INSTALL COMPLETED: KaiwuDB has been installed successfuly!”时,表示已经安装集群成功了。
接下来,就是初始化并启动集群,输入命令cluster -i,就会进行初始化集群,在几秒后会输出“Cluster init successfully”即表示初始化并启动集群成功,这里有一个小小问题,就是上面我只安装了,但是在写文章,耽误了大概半小时,没有进行初始化集群操作,再进行初始化会提示我KWDB没有安装,需要注意一下。
备注:多副本集群初始化和启动大约需要 10 秒左右时间,在此期间,如果有节点死亡,可能会导致集群无法触发高可用机制。
初始化集群发生的问题:
在进行安装与初始化集群的过程中,我也遇到了不少问题,希望可以给其它同学带来参考的价值。
①. 提示:“lost connection. 9 15:04:05 Distribute files failed in 120.55.76.178:Connection closed by 120.55.76.178 port 22
”
原因:因为以为是第一台kwdb-db-01机器操作,所以,不需要进行设置SSH免登录,结果发现还是需要进行SSH免登录。
解决:执行ssh-copy-id -f -i ~/.ssh/id_rsa.pub -o StrictHostKeyChecking=no <target_node>
②. 提示:“[ERROR] 2025-05-29 23:38:42 install exec failed in 118.178.56.167: dpkg: error: dpkg frontend lock was locked by another process with pid 4343 Note: removing the lock file is always wrong, and can end up damaging the locked area and the entir e system.”
原因:可能存在网络波动,连接超时的原因。
解决:重新进行安装命令“./deploy.sh install --multi-replica”即可正常成功。
③. 提示:“KaiwuDB not install.”
原因:未知,如果再执行命令“./deploy.sh install --multi-replica”会提示已经安装了。
解决:机器重新构建镜像即可,恢复初始化状态。
在集群搭建完成后,我们可以来查看一下第一个节点相关的集群状态信息,使用“systemctl status kaiwudb”来进行查看,会输出很多相关KWDB实例服务相关的信息:
①. 显示绿色的active(running)表示服务正在运行中,服务是正常在使用的,包括服务使用的内存大小,CPU的运行时间。
②. 系统命令kaisudb.service服务,实际上是执行了一条/usr/local/kaiwudb/bin/kwbase这条命令,里面包括一些参数,比如监听服务器的IP与端口,对外通信的节点IP与端口、数据库存储的路径、加入指定IP集群的方式。
③. 还会生成RPC客户端连接的方式,通过/usr/local/kaiwudb/bin/kwbase命令,用来访问数据库控制台的shell命令。
另外,在连接到访问数据库shell控制台后,可以看到基本上操作的命令也是跟平时写的SQL语句是一样的,但是在创建时序数据库的时候,有些区别,需要加一个ts的关键词来指定是时序数据库。
create ts database factory_ts;
在集群搭建完成后,我们也可以来查看一下其它的每个节点相关的集群状态信息,这里我们来进行操作kwdb-db-02和kwdb-db-03来查看一下状态,这里可以看到在kwdb-db-02和kwdb-db-03的服务也是都是active(running)启动起来的,而且各自都有各自的RPC客户端启动命令。
上面创建的时序数据库factory_ts也可以发现已经同步到其它3个KWDB服务节点上了,说明集群已经成功了,而且说明数据也是可以进行同步的。
接下来,我们来一起在时序数据库factory_ts下面创建3张表(在实际的场景中,肯定会更多),时序表(TIME SERIES TABLE表)是用于存储时间序列数据的数据。这里面创建三张时序数据表:测试记录表(test_records)、机器手臂数据(robot_arms)、警告信息(alerts)三张纬度的时序数据表。
①. 创建表时不能使用IF NOT EXISTS语句,会报错,不过,建议还是加上比较好。
②. 时间戳字段,时序数据表对于时间类型的列(TIMESTAMPTZ 或 TIMESTAMP),默认值可以是常量,也可以是 now() 函数。如果默认值类型与列类型不匹配,设置默认值时,系统报错。支持默认值设置为 NULL。
③. 字段也不支持double类型,这里我们只能使用Float类型字段。
④. Tags表示标签列表,支持添加一个或多个标签定义,最多可指定 128 个标签。标签定义包含标签名和数据类型,标签名的最大长度为 128 字节,支持指定 NOT NULL,默认为空值。不支持 TIMESTAMP、TIMESTAMPTZ、NVARCHAR 和 GEOMETRY 数据类型。
⑤. Primary_tags表示主标签列表,支持添加一个或多个主标签名称,最多可指定 4 个。主标签必须包含在标签列表内且指定为 NOT NULL,不支持浮点类型和除 VARCHAR 之外的变长数据类型。
如下所示是插入的时序数据表的SQL语句:
CREATE TABLE factory_ts.test_records (k_TIMESTAMPTZ TIMESTAMPTZ NOT NULL, test_type VARCHAR NOT NULL, test_result VARCHAR NOT NULL, fail_reason VARCHAR NOT NULL, start_time TIMESTAMPTZ NOT NULL, end_time TIMESTAMPTZ NOT NULL, phone_model VARCHAR NOT NULL, imei VARCHAR NOT NULL, battery_level FLOAT NOT NULL, signal_strength FLOAT NOT NULL, test_duration FLOAT NOT NULL, supplier VARCHAR NOT NULL, material_batch VARCHAR NOT NULL, production_line VARCHAR NOT NULL, operator VARCHAR NOT NULL, quality_inspector VARCHAR NOT NULL, mes_order_number VARCHAR NOT NULL, alert_count INT NOT NULL, quality_score FLOAT NOT NULL) TAGS ( batch_number VARCHAR NOT NULL, product_model VARCHAR NOT NULL ) PRIMARY TAGS(batch_number); CREATE TABLE factory_ts.robot_arms ( k_TIMESTAMPTZ TIMESTAMPTZ NOT NULL, availability FLOAT NOT NULL, performance FLOAT NOT NULL, quality FLOAT NOT NULL, oee FLOAT NOT NULL, total_faults INT NOT NULL, fault_rate FLOAT NOT NULL, last_fault_time TIMESTAMPTZ NOT NULL, last_fault_type VARCHAR NOT NULL, operation_speed FLOAT NOT NULL, temperature FLOAT NOT NULL, voltage FLOAT NOT NULL, current FLOAT NOT NULL, last_maintenance_time TIMESTAMPTZ NOT NULL, next_maintenance_time TIMESTAMPTZ NOT NULL, operation_start_time TIMESTAMPTZ NOT NULL, total_runtime FLOAT NOT NULL, status VARCHAR NOT NULL )TAGS( serial_number VARCHAR NOT NULL, model VARCHAR NOT NULL, position VARCHAR NOT NULL ) PRIMARY TAGS(serial_number) ACTIVETIME 30d; CREATE TABLE factory_ts.alerts ( k_TIMESTAMPTZ TIMESTAMPTZ NOT NULL, alert_type VARCHAR NOT NULL, level VARCHAR NOT NULL, title VARCHAR NOT NULL, status VARCHAR NOT NULL, handler VARCHAR NOT NULL, handle_method VARCHAR NOT NULL, handle_time TIMESTAMPTZ NOT NULL, resolve_time TIMESTAMPTZ NOT NULL, parameter_name VARCHAR NOT NULL, current_value FLOAT NOT NULL, threshold_value FLOAT NOT NULL, unit VARCHAR NOT NULL, product_batch VARCHAR NOT NULL, production_line VARCHAR NOT NULL, workstation VARCHAR NOT NULL, operator VARCHAR NOT NULL )TAGS( source VARCHAR NOT NULL, category VARCHAR NOT NULL, notify_group VARCHAR NOT NULL ) PRIMARY TAGS(source) ACTIVETIME 30d;
KWDB 为不同角色开发者提供以下支持(包括但不限于):
①. 为开发者提供通用连接接口,具备高速写入、极速查询、SQL 支持、随需压缩、数据生命周期管理、集群部署等特性,与第三方工具无缝集成,降低开发及学习难度,提升开发使用效率。
②. 为运维管理人员提供快速安装部署、升级、迁移、监控等能力,降低数据库运维管理成本。
从官方的文档来看,pgx 是用 Go 语言编写的 PostgreSQL 驱动和工具包,提供了高性能的低级接口,支持用户直接利用 PostgreSQL 的特性。pgx 还包含一个适配器,与标准的数据库或 SQL 接口兼容,方便开发者进行数据库操作。注意:需要安装 Go 1.16 及以上版本。
KWDB 支持用户通过 pgx 驱动连接数据库,并执行创建、插入和查询操作。下面演示了如何使用 Go 语言通过 pgx 驱动连接 KWDB。
go env -w GOPROXY=https://goproxy.cn go mod tidy go get github.com/jackc/pgx/v5
接下来,通过使用代码编写3张表的数据生成的代码,可以进行批量插入的动作。
package main import ( "context" "fmt" "log" "time" "github.com/jackc/pgx/v5" ) func main() { // 使用账号密码连接 url := fmt.Sprintf("postgresql://%s@%s/%s?sslmode=verify-full&sslrootcert=%s&sslcert=%s&sslkey=%s", "root", "120.55.76.178:26257", "factory_ts", "./certs/ca.crt", "./certs/client.root.crt", "./certs/client.root.key") config, err := pgx.ParseConfig(url) if err != nil { log.Fatalf("error parsing connection configuration: %v", err) } config.RuntimeParams["application_name"] = "factory_timeseries" conn, err := pgx.ConnectConfig(context.Background(), config) if err != nil { log.Fatalf("error connecting to database: %v", err) } defer conn.Close(context.Background()) // 批量插入告警数据 _, err = conn.exec(context.Background(), ` insert INTO factory_ts.alerts VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21), ($22, $23, $24, $25, $26, $27, $28, $29, $30, $31, $32, $33, $34, $35, $36, $37, $38, $39, $40, $41, $42); `, // 第一条告警记录 time.Now(), "温度异常", "高", "设备温度过高", "设备A温度超过阈值", "未处理", "张工", "停机检查", time.Now(), time.Now().Add(time.Hour), "温度", 85.5, 80.0, "℃", "BATCH001", "生产线1", "工位A", "王工", "设备A", "设备告警", "运维组", // 第二条告警记录 time.Now(), "电压波动", "中", "电压不稳定", "设备B电压波动较大", "已处理", "李工", "调整电源", time.Now(), time.Now().Add(time.Hour), "电压", 235.0, 220.0, "V", "BATCH002", "生产线2", "工位B", "赵工", "设备B", "电气告警", "电气组", ) if err != nil { log.Printf("error inserting alert data: %v\n", err) } else { fmt.Println("告警数据插入成功!") } // 批量插入机器手臂数据 _, err = conn.exec(context.Background(), ` insert INTO factory_ts.robot_arms VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21), ($22, $23, $24, $25, $26, $27, $28, $29, $30, $31, $32, $33, $34, $35, $36, $37, $38, $39, $40, $41, $42); `, // 第一个机器手臂数据 time.Now(), 95.5, 92.0, 98.0, 85.5, 2, 0.5, time.Now().Add(-24*time.Hour), "轻微故障", 60.0, 36.5, 220.0, 10.0, time.Now().Add(-7*24*time.Hour), time.Now().Add(7*24*time.Hour), time.Now().Add(-8*time.Hour), 2000.5, "运行中", "RA001", "A型", "生产线1", // 第二个机器手臂数据 time.Now(), 88.0, 90.0, 95.0, 75.2, 5, 1.2, time.Now().Add(-12*time.Hour), "通信故障", 45.0, 38.0, 218.0, 12.0, time.Now().Add(-14*24*time.Hour), time.Now().Add(14*24*time.Hour), time.Now().Add(-4*time.Hour), 1500.8, "待维护", "RA002", "B型", "生产线2", ) if err != nil { log.Printf("error inserting robot arm data: %v\n", err) } else { fmt.Println("机器手臂数据插入成功!") } // 批量插入测试记录数据 _, err = conn.exec(context.Background(), ` insert INTO factory_ts.test_records VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21), ($22, $23, $24, $25, $26, $27, $28, $29, $30, $31, $32, $33, $34, $35, $36, $37, $38, $39, $40, $41, $42); `, // 第一条测试记录 time.Now(), "性能测试", "通过", "", time.Now().Add(-time.Hour), time.Now(), "iPhone 14", "123456789", 95.0, 85.0, 3600.0, "供应商A", "MAT001", "生产线1", "张三", "李四", "MES001", 0, 98.5, "BATCH001", "产品型号A", // 第二条测试记录 time.Now(), "稳定性测试", "失败", "信号不稳定", time.Now().Add(-2*time.Hour), time.Now().Add(-time.Hour), "iPhone 13", "987654321", 88.0, 72.0, 7200.0, "供应商B", "MAT002", "生产线2", "王五", "赵六", "MES002", 2, 85.0, "BATCH002", "产品型号B", ) if err != nil { log.Printf("error inserting test record data: %v\n", err) } else { fmt.Println("测试记录数据插入成功!") } }
执行上面的代码,结果发现报错,提示“insert (row 2) has more expressions than target columns, 42 expressions for 20 targets (SQLSTATE 42601)”,执行报错后,发现shell控制台也是报错了,提示“write:broken pipe”。
然后,我查看第一台kwdb-db-01服务节点的状态,发现从active(running)变成了failed(code=exited,status=2),然后,输出了一段go相关的报错信息,但是只能看到一少部分信息,直接报错就终止服务了。
接着,只能查找更详细的日志信息了,由于也是第一次接触,没有找到像常见软件在的日志目录中,后面在官方文档中找到了日志系统解读,感兴趣的同学可以阅读一下。
在/var/lib/kaiwudb/logs日志目录文件下,我们找到了相关日志,如下为详细的报错信息““insert INTO factory_ts.public.robot_arms DEFAULT VALUES”: runtime error: index out of range [21] with length 0”,官方人员可以针对以下报错看看能不能修改这个bug。
E250529 08:53:32.295737 81461 sql/conn_executor.go:969 [n1,client=114.255.155.2:64850,hostssl,user=root] a SQL panic has occurred while executing "insert INTO factory_ts.public.robot_arms DEFAULT VALUES": runtime error: index out of range [21] with length 0 E250529 08:53:32.295877 81461 util/log/crash_reporting.go:222 [n1,client=114.255.155.2:64850,hostssl,user=root] a panic has occurred! panic: runtime error: index out of range [21] with length 0 [recovered] panic: panic while executing 1 statements: insert INTO _._._ DEFAULT VALUES; caused by runtime error: index out of range [21] with length 0 goroutine 81461 [running]: gitee.com/kwbasedb/kwbase/pkg/sql.(*connExecutor).closeWrapper(0xc004ee8000, 0x4cef3b0, 0xc0051b9400, 0x421c6c0, 0xc005548990) /home/inspur/src/gitee.com/kwbasedb/kwbase/pkg/sql/conn_executor.go:983 +0x412 gitee.com/kwbasedb/kwbase/pkg/sql.(*Server).ServeConn.func1(0xc004ee8000, 0x4cef3b0, 0xc0051b9400) /home/inspur/src/gitee.com/kwbasedb/kwbase/pkg/sql/conn_executor.go:606 +0x65 panic(0x421c6c0, 0xc005548990) /usr/local/go/src/runtime/panic.go:965 +0x1b9 gitee.com/kwbasedb/kwbase/pkg/sql/sem/tree.(*PlaceholderTypesInfo).ValueType(...) /home/inspur/src/gitee.com/kwbasedb/kwbase/pkg/sql/sem/tree/placeholders.go:128 gitee.com/kwbasedb/kwbase/pkg/sql.(*connExecutor).execPrepare(0xc004ee8000, 0x4cef458, 0xc006286240, 0xc004344174, 0x3a, 0x4d1af98, 0xc006bffd50, 0xc0043441b2, 0xf8, 0x2a, ...) /home/inspur/src/gitee.com/kwbasedb/kwbase/pkg/sql/conn_executor_prepare.go:130 +0xb5e gitee.com/kwbasedb/kwbase/pkg/sql.(*connExecutor).execCmd(0xc004ee8000, 0x4cef3b0, 0xc0051b9400, 0x0, 0x0) /home/inspur/src/gitee.com/kwbasedb/kwbase/pkg/sql/conn_executor.go:1701 +0x2d98 gitee.com/kwbasedb/kwbase/pkg/sql.(*connExecutor).run(0xc004ee8000, 0x4cef3b0, 0xc0051b9400, 0xc000296888, 0x5400, 0x15000, 0xc000296920, 0xc007053330, 0x0, 0x0) /home/inspur/src/gitee.com/kwbasedb/kwbase/pkg/sql/conn_executor.go:1501 +0x1fc gitee.com/kwbasedb/kwbase/pkg/sql.(*Server).ServeConn(0xc00078cb00, 0x4cef3b0, 0xc0051b9400, 0xc004ee8000, 0x5400, 0x15000, 0xc000296920, 0xc007053330, 0x0, 0x0) /home/inspur/src/gitee.com/kwbasedb/kwbase/pkg/sql/conn_executor.go:608 +0xce gitee.com/kwbasedb/kwbase/pkg/sql/pgwire.(*conn).processCommandsAsync.func1(0xc000357b10, 0xc004db8318, 0x4cef3b0, 0xc0051b9400, 0xc007053330, 0xc00078cb00, 0xc0042dd800, 0x4d08a98, 0xc0055a2000, 0xc0041fdaa0, ...) /home/inspur/src/gitee.com/kwbasedb/kwbase/pkg/sql/pgwire/conn.go:750 +0x505 created by gitee.com/kwbasedb/kwbase/pkg/sql/pgwire.(*conn).processCommandsAsync /home/inspur/src/gitee.com/kwbasedb/kwbase/pkg/sql/pgwire/conn.go:664 +0x17e
上面原因找出来了,就是数据库是20个字段,结果我们插入的代码是21个,所以会报错。通过修复这个bug的问题,就可以得到如下一直循环的代码:
package main import ( "context" "fmt" "log" "math/rand" "time" "github.com/jackc/pgx/v5" ) // 生成随机时间 func randomTime() time.Time { now := time.Now() minusHours := rand.Float64() * 24 * 7 // 随机生成过去7天内的时间 return now.Add(-time.Duration(minusHours) * time.Hour) } // 生成随机浮点数 func randomFloat(min, max float64) float64 { return min + rand.Float64()*(max-min) } // 生成随机整数 func randomInt(min, max int) int { return min + rand.Intn(max-min+1) } // 从切片中随机选择一个元素 func randomChoice(items []string) string { return items[rand.Intn(len(items))] } // 生成随机字符串 func randomString(prefix string, length int) string { const charset = "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789" result := make([]byte, length) for i := range result { result[i] = charset[rand.Intn(len(charset))] } return prefix + string(result) } func main() { // 初始化随机数生成器 rand.Seed(time.Now().UnixNano()) // 使用账号密码连接 url := fmt.Sprintf("postgresql://%s@%s/%s?sslmode=verify-full&sslrootcert=%s&sslcert=%s&sslkey=%s", "root", "47.110.144.145:26257", "defaultdb", "./certs/ca.crt", "./certs/client.root.crt", "./certs/client.root.key") config, err := pgx.ParseConfig(url) if err != nil { log.Fatalf("error parsing connection configuration: %v", err) } config.RuntimeParams["application_name"] = "factory_timeseries" conn, err := pgx.ConnectConfig(context.Background(), config) if err != nil { log.Fatalf("error connecting to database: %v", err) } defer conn.Close(context.Background()) for { // 预定义一些随机选择项 alertLevels := []string{"低", "中", "高", "紧急"} alertStatus := []string{"未处理", "处理中", "已解决", "已关闭"} handleMethods := []string{"远程处理", "现场处理", "停机检查", "更换零件"} productionLines := []string{"生产线1", "生产线2", "生产线3", "生产线4"} workstations := []string{"工位A", "工位B", "工位C", "工位D"} notifyGroups := []string{"运维组", "工程组", "质检组", "管理组"} faultTypes := []string{"轻微故障", "传感器异常", "电机过热", "通信中断", "校准偏差"} testTypes := []string{"性能测试", "稳定性测试", "压力测试", "功能测试"} testResults := []string{"通过", "未通过", "待复检"} failReasons := []string{"", "性能不达标", "稳定性差", "参数超限"} phoneModels := []string{"iPhone 14", "iPhone 14 Pro", "iPhone 15", "iPhone 15 Pro"} suppliers := []string{"供应商A", "供应商B", "供应商C", "供应商D"} // 插入告警数据 _, err = conn.exec(context.Background(), ` insert INTO factory_ts.alerts (k_timestamptz, alert_type, level, title, status, handler, handle_method, handle_time, resolve_time, parameter_name, current_value, threshold_value, unit, product_batch, production_line, workstation, operator, source, category, notify_group) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20); `, randomTime().Format("2006-01-02 15:04:05.000Z07:00"), "设备告警", randomChoice(alertLevels), "设备温度异常", randomChoice(alertStatus), randomString("OP", 4), randomChoice(handleMethods), randomTime().Format("2006-01-02 15:04:05.000Z07:00"), randomTime().Format("2006-01-02 15:04:05.000Z07:00"), "温度", randomFloat(60, 90), 80.0, "℃", randomString("BATCH", 4), randomChoice(productionLines), randomChoice(workstations), randomString("WK", 4), "温度异常", "设备温度过高", randomChoice(notifyGroups), ) if err != nil { log.Printf("error inserting alert data: %v\n", err) } else { fmt.Println("告警数据插入成功!") } // 插入机器手臂数据 _, err = conn.exec(context.Background(), ` insert INTO factory_ts.robot_arms (k_timestamptz, availability, performance, quality, oee, total_faults, fault_rate, last_fault_time, last_fault_type, operation_speed, temperature, voltage, current, last_maintenance_time, next_maintenance_time, operation_start_time, total_runtime, status, serial_number, model, position) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21); `, randomTime().Format("2006-01-02 15:04:05.000Z07:00"), randomFloat(90, 100), randomFloat(85, 100), randomFloat(90, 100), randomFloat(80, 95), randomInt(0, 5), randomFloat(0, 2), randomTime().Format("2006-01-02 15:04:05.000Z07:00"), randomChoice(faultTypes), randomFloat(50, 70), randomFloat(30, 40), randomFloat(210, 230), randomFloat(8, 12), randomTime().Format("2006-01-02 15:04:05.000Z07:00"), randomTime().Add(30*24*time.Hour).Format("2006-01-02 15:04:05.000Z07:00"), randomTime().Format("2006-01-02 15:04:05.000Z07:00"), randomFloat(1000, 3000), "运行中", randomString("RA", 4), randomString("MODEL", 2), randomChoice(productionLines), ) if err != nil { log.Printf("error inserting robot arm data: %v\n", err) } else { fmt.Println("机器手臂数据插入成功!") } // 插入测试记录数据 _, err = conn.exec(context.Background(), ` insert INTO factory_ts.test_records (k_timestamptz, test_type, test_result, fail_reason, start_time, end_time, phone_model, imei, battery_level, signal_strength, test_duration, supplier, material_batch, production_line, operator, quality_inspector, mes_order_number, alert_count, quality_score, batch_number, product_model) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21); `, randomTime().Format("2006-01-02 15:04:05.000Z07:00"), randomChoice(testTypes), randomChoice(testResults), randomChoice(failReasons), randomTime().Format("2006-01-02 15:04:05.000Z07:00"), randomTime().Format("2006-01-02 15:04:05.000Z07:00"), randomChoice(phoneModels), randomString("", 15), randomFloat(80, 100), randomFloat(70, 100), randomFloat(1800, 7200), randomChoice(suppliers), randomString("MAT", 4), randomChoice(productionLines), randomString("OP", 4), randomString("QA", 4), randomString("MES", 6), randomInt(0, 5), randomFloat(90, 100), randomString("BATCH", 4), randomString("PM", 4), ) if err != nil { log.Printf("error inserting test record data: %v\n", err) } else { fmt.Println("测试记录数据插入成功!") } } }
可以看到这里在代码修复后,可以往KWDB数据库中写入数据也成功了,在查看其它节点02和节点03时,发现数据也是可以进行同步过来的,都显示38722条数据。
KaiwuDB内置监控平台,但是KWDB是浪潮KaiwuDB的开源版本哈,目前KWDB已经捐赠给开放原子开源基金会咯,是基金会的孵化项目,所以部署 KWDB 集群后,需要自己通过实际需求选择另一种监控方案,集成Prometheus 和 Grafana 开源组件监控集群状态。
Prometheus 是一款开源的系统监控和告警平台,用于采集和存储 KWDB 集群的监控和性能指标信息。Grafana 是一款开源的数据可视化工具,可以从多种数据源获取数据,并在数据面板中展示所有数据。Grafana 读取 KWDB 集群的指标数据,以可视化方式展示数据库的集群节点状态、监控指标。
KWDB 使用 Prometheus 采集和存储 KWDB 集群的监控和性能指标信息,使用 Grafana 作为可视化组件进行展示,接下来介绍如何部署 Prometheus 和 Grafana。
部署 Prometheus:
wget https://github.com/prometheus/prometheus/releases/download/v2.53.4/prometheus-2.53.4.linux-amd64.tar.gz tar -zxvf prometheus-2.53.4.linux-amd64.tar.gz
在prometheus-2.53.0.linux-amd64 目录下创建 rules 子目录,再下载 Prometheus 告警规则和聚合规则配置文件并将其放置在 rules 子目录。
KWDB 在 monitoring/rules 目录下提供 alerts.rules.yml 和 aggregation.rules.yml 文件的Prometheus 告警规则和 Prometheus 聚合规则。
①. alerts.rules.yml:告警规则配置文件。
②. aggregation.rules.yml:聚合规则配置文件。
cd prometheus-2.53.0.linux-amd64 && vi prometheus.yml
以下是配置文件示例,可以根据实际部署情况,调整配置参数及取值。
# Prometheus configuration for kaiwudb clusters. # Requires prometheus 2.X # # Run with: # $ prometheus -config.file=prometheus.yml global: scrape_interval: 10s evaluation_interval: 10s rule_files: - "rules/alerts.rules.yml" - "rules/aggregation.rules.yml" scrape_configs: - job_name: 'kaiwudb' metrics_path: '/_status/vars' # Insecure mode: scheme: 'http' # Secure mode: # scheme: 'https' tls_config: insecure_skip_verify: true static_configs: - targets: ['118.178.131.120:8080', '47.110.254.183:8080', '47.110.144.145:8080'] labels: cluster: 'my-kaiwudb-cluster'
启动 Prometheus 服务,默认情况下,Prometheus 的启动端口是 9090:
./prometheus --config.file=prometheus.yml
通过IP:9090端口即可访问prometheus的web地址。
部署 Grafana:
下载 Grafana 安装包并解压缩到本地目录,以下示例下载 Grafana v11.1.0 安装包
wget https://dl.grafana.com/enterprise/release/grafana-enterprise-11.1.0.linux-amd64.tar.gz tar -zxvf grafana-enterprise-11.1.0.linux-amd64.tar.gz # 启动 Grafana 服务 cd grafana-v11.1.0/bin && ./grafana-server
此时,通过ip:3000端口访问grafana的web服务地址。
配置 Grafana以及面板:
登录 Grafana控制台,进行添加 Prometheus 数据源,默认情况下,Grafana 的登录地址是 http://IP:3000。用户可以使用默认的用户名和密码(均为 admin)登录 Grafana。
在 Grafana 左侧边栏,单击 Connections > Data sources,在 Data sources 窗口,单击 Add data source,然后选择 Prometheus,配置完Prometheus Server 的 IP 地址,单击 Save & test,保存 Prometheus 数据源。
登录 Grafana控制台,进行添加 Prometheus 数据源,默认情况下,Grafana 的登录地址是 http://IP:3000。用户可以使用默认的用户名和密码(均为 admin)登录 Grafana。
在 Grafana 左侧边栏,单击 Connections > Data sources,在 Data sources 窗口,单击 Add data source,然后选择 Prometheus,配置完Prometheus Server 的 IP 地址,单击 Save & test,保存 Prometheus 数据源。
导入 Grafana 面板,默认情况下,KWDB 在 monitoring/grafana-dashboards 目录下提供以下指标面板模板。用户将指标面板模板(.json 格式)导入 Grafana 后,即可监控 KWDB 集群。
①. 概览:展示集群和节点的关键指标 - KaiwuDB_Console_Overview.json
②. 硬件:展示硬件相关的监控指标 - KaiwuDB_Console_Hardware.json
③. 运行时:展示运行时相关的监控指标 - KaiwuDB_Console_Runtime.json
④. SQL:展示 SQL 相关的监控指标 - KaiwuDB_Console_SQL.json
⑤. 存储:展示存储相关的监控指标 - KaiwuDB_Console_Storage.json
⑥. 副本:展示副本相关的监控指标 - KaiwuDB_Console_Replication.json
⑦. 分布式:展示分布式相关的监控指标 - KaiwuDB_Console_Distribution.json
⑧. 队列:展示队列相关的监控指标 - KaiwuDB_Console_Queue.json
⑨. 慢查询:展示慢查询相关的监控指标 - KaiwuDB_Console_Slow_Query.json
以下为三个表(test_records、robot_arms、alerts时序数据表)同时插入语句,但是是顺序同步执行的,可以看到从100条数据到2000条数据,不同的批量插入方式存在不同的时间差,经过对比可以发现:
①. 这里并没有使用到地址池。
②. 可以发现在批量插入时,由于组装的数据量越少的表,在批量2000时插入的时间越短,但是组装数据量越多的表,在批量100条时插入时间越短。
接下来,修改脚本,将同步改为异步的情况,就是增加三个goroutine可以真正并行执行:
①. 告警数据插入goroutine
②. 机器手臂数据插入goroutine
③. 测试记录数据插入goroutine
每个goroutine都会立即启动,互不干扰,同时在各自的循环中通过 time.Sleep 来控制数据插入的频率。这样可以确保三个数据插入任务并行执行,提高数据生成的效率。
接下来使用三台机器同时进行数据的写入,但是这里不是三台机器同时往一台机器进行写入数据,而是分不同的节点,刚好对应开始写入数据,刚好测试一下,在集群的情况下,数据一致性准不准,但是这个也跟网速有一定的关系。
可以从上面的结果看出,基本上数据同步也是比较快的,误差平均在100以内,而且还有可能是没有同时执行的情况下,最终结果的误差可能会更小,这里需要注意如果在部署集群的话,使用的是云厂商的服务器,最好在同一个地区,这样网络延迟会小一点。
接下来,我们使用10个SQL查询的语句来进行KWDB时序数据库的试一下查询效率的测试,以下是10个SQL语句相关查询的内容:
以下是相关查询的代码:
package main import ( "context" "fmt" "log" "time" "github.com/jackc/pgx/v5" ) func main() { ctx := context.Background() conn, err := createDBConnection() if err != nil { log.Fatalf("连接数据库失败: %v", err) } defer conn.Close(ctx) // 执行所有查询 queryFuncs := []func(context.Context, *pgx.Conn) error{ queryTestRecordsTop10, queryRobotArmFaults, queryAlertsByProductionLine, queryTestQualityDistribution, queryRobotArmPerformance, queryHighPriorityAlerts, queryTestFailureAnalysis, queryRobotArmMaintenance, queryAlertHandlingEfficiency, queryTestTimeDistribution, } for _, fn := range queryFuncs { fmt.Println("\n----------------------------------------") if err := fn(ctx, conn); err != nil { log.Printf("查询执行失败: %v\n", err) } } } // createDBConnection 创建数据库连接 func createDBConnection() (*pgx.Conn, error) { url := fmt.Sprintf("postgresql://%s@%s/%s?sslmode=verify-full&sslrootcert=%s&sslcert=%s&sslkey=%s", "root", "118.178.131.120:26257", "defaultdb", "./certs/ca.crt", "./certs/client.root.crt", "./certs/client.root.key") config, err := pgx.ParseConfig(url) if err != nil { return nil, fmt.Errorf("error parsing connection configuration: %v", err) } config.RuntimeParams["application_name"] = "factory_timeseries" conn, err := pgx.ConnectConfig(context.Background(), config) if err != nil { return nil, fmt.Errorf("error connecting to database: %v", err) } return conn, nil } // 查询各生产线最近24小时的测试记录数量TOP10 func queryTestRecordsTop10(ctx context.Context, conn *pgx.Conn) error { query := ` select production_line, COUNT(*) as test_count, AVG(quality_score) as avg_quality FROM factory_ts.test_records WHERE k_timestamptz >= NOW() - INTERVAL '24 hours' GROUP BY production_line ORDER BY test_count DESC LIMIT 10; ` rows, err := conn.Query(ctx, query) if err != nil { return fmt.Errorf("执行查询失败: %v", err) } defer rows.Close() fmt.Println("生产线测试记录统计(24小时):") fmt.Printf("%-15s %-10s %-10s\n", "生产线", "测试数量", "平均质量分") for rows.Next() { var line string var count int var quality float64 if err := rows.Scan(&line, &count, &quality); err != nil { return fmt.Errorf("读取数据失败: %v", err) } fmt.Printf("%-15s %-10d %-10.2f\n", line, count, quality) } return nil } // 查询不同故障类型的机器手臂数量统计 func queryRobotArmFaults(ctx context.Context, conn *pgx.Conn) error { query := ` select last_fault_type, COUNT(*) as fault_count, AVG(fault_rate) as avg_fault_rate FROM factory_ts.robot_arms WHERE last_fault_type IS NOT NULL GROUP BY last_fault_type ORDER BY fault_count DESC; ` rows, err := conn.Query(ctx, query) if err != nil { return fmt.Errorf("执行查询失败: %v", err) } defer rows.Close() fmt.Println("机器手臂故障类型统计:") fmt.Printf("%-20s %-10s %-15s\n", "故障类型", "故障数量", "平均故障率(%)") for rows.Next() { var faultType string var count int var rate float64 if err := rows.Scan(&faultType, &count, &rate); err != nil { return fmt.Errorf("读取数据失败: %v", err) } fmt.Printf("%-20s %-10d %-15.2f\n", faultType, count, rate*100) } return nil } // 查询各生产线的告警统计 func queryAlertsByProductionLine(ctx context.Context, conn *pgx.Conn) error { query := ` select production_line, COUNT(*) as alert_count, COUNT(DISTINCT alert_type) as alert_types, AVG(CASE WHEN status = '已解决' THEN 1 ELSE 0 END) as resolve_rate FROM factory_ts.alerts GROUP BY production_line ORDER BY alert_count DESC; ` rows, err := conn.Query(ctx, query) if err != nil { return fmt.Errorf("执行查询失败: %v", err) } defer rows.Close() fmt.Println("生产线告警统计:") fmt.Printf("%-15s %-10s %-10s %-15s\n", "生产线", "告警数量", "告警类型", "解决率(%)") for rows.Next() { var line string var count, types int var resolveRate float64 if err := rows.Scan(&line, &count, &types, &resolveRate); err != nil { return fmt.Errorf("读取数据失败: %v", err) } fmt.Printf("%-15s %-10d %-10d %-15.2f\n", line, count, types, resolveRate*100) } return nil } // 查询测试记录的质量分布 func queryTestQualityDistribution(ctx context.Context, conn *pgx.Conn) error { query := ` select WIDTH_BUCKET(quality_score, 0, 100, 10) as score_range, COUNT(*) as count, MIN(quality_score) as min_score, MAX(quality_score) as max_score FROM factory_ts.test_records GROUP BY score_range ORDER BY score_range; ` rows, err := conn.Query(ctx, query) if err != nil { return fmt.Errorf("执行查询失败: %v", err) } defer rows.Close() fmt.Println("测试质量分数分布:") fmt.Printf("%-15s %-10s %-10s %-10s\n", "分数区间", "数量", "最低分", "最高分") for rows.Next() { var range_, count int var min, max float64 if err := rows.Scan(&range_, &count, &min, &max); err != nil { return fmt.Errorf("读取数据失败: %v", err) } fmt.Printf("%d-%-12d %-10d %-10.2f %-10.2f\n", (range_-1)*10, range_*10, count, min, max) } return nil } // 查询机器手臂性能指标 func queryRobotArmPerformance(ctx context.Context, conn *pgx.Conn) error { query := ` select position, ROUND(AVG(availability)::numeric, 2) as avg_availability, ROUND(AVG(performance)::numeric, 2) as avg_performance, ROUND(AVG(quality)::numeric, 2) as avg_quality, ROUND(AVG(oee)::numeric, 2) as avg_oee FROM factory_ts.robot_arms GROUP BY position ORDER BY avg_oee DESC; ` rows, err := conn.Query(ctx, query) if err != nil { return fmt.Errorf("执行查询失败: %v", err) } defer rows.Close() fmt.Println("机器手臂性能指标:") fmt.Printf("%-15s %-10s %-10s %-10s %-10s\n", "位置", "可用率", "性能", "质量", "OEE") for rows.Next() { var pos string var avail, perf, qual, oee float64 if err := rows.Scan(&pos, &avail, &perf, &qual, &oee); err != nil { return fmt.Errorf("读取数据失败: %v", err) } fmt.Printf("%-15s %-10.2f %-10.2f %-10.2f %-10.2f\n", pos, avail, perf, qual, oee) } return nil } // 查询未处理的高优先级告警 func queryHighPriorityAlerts(ctx context.Context, conn *pgx.Conn) error { query := ` select alert_type, level, title, production_line, workstation, k_timestamptz FROM factory_ts.alerts WHERE status = '未处理' AND level IN ('高', '紧急') ORDER BY CASE level WHEN '紧急' THEN 1 WHEN '高' THEN 2 END, k_timestamptz DESC; ` rows, err := conn.Query(ctx, query) if err != nil { return fmt.Errorf("执行查询失败: %v", err) } defer rows.Close() fmt.Println("未处理的高优先级告警:") fmt.Printf("%-15s %-6s %-20s %-15s %-10s %-20s\n", "类型", "级别", "标题", "生产线", "工位", "时间") for rows.Next() { var alertType, level, title, line, station string var ts time.Time if err := rows.Scan(&alertType, &level, &title, &line, &station, &ts); err != nil { return fmt.Errorf("读取数据失败: %v", err) } fmt.Printf("%-15s %-6s %-20s %-15s %-10s %-20s\n", alertType, level, title, line, station, ts.Format("2006-01-02 15:04:05")) } return nil } // 查询测试记录失败原因分析 func queryTestFailureAnalysis(ctx context.Context, conn *pgx.Conn) error { query := ` select fail_reason, COUNT(*) as failure_count, ROUND(AVG(quality_score)::numeric, 2) as avg_quality, COUNT(DISTINCT phone_model) as affected_models FROM factory_ts.test_records WHERE test_result = '未通过' AND fail_reason IS NOT NULL GROUP BY fail_reason ORDER BY failure_count DESC; ` rows, err := conn.Query(ctx, query) if err != nil { return fmt.Errorf("执行查询失败: %v", err) } defer rows.Close() fmt.Println("测试失败原因分析:") fmt.Printf("%-20s %-10s %-15s %-15s\n", "失败原因", "失败次数", "平均质量分", "影响机型数") for rows.Next() { var reason string var count, models int var quality float64 if err := rows.Scan(&reason, &count, &quality, &models); err != nil { return fmt.Errorf("读取数据失败: %v", err) } fmt.Printf("%-20s %-10d %-15.2f %-15d\n", reason, count, quality, models) } return nil } // 查询机器手臂维护计划 func queryRobotArmMaintenance(ctx context.Context, conn *pgx.Conn) error { query := ` select serial_number, model, position, total_runtime, last_maintenance_time, next_maintenance_time, total_faults FROM factory_ts.robot_arms WHERE next_maintenance_time <= NOW() + INTERVAL '7 days' ORDER BY next_maintenance_time; ` rows, err := conn.Query(ctx, query) if err != nil { return fmt.Errorf("执行查询失败: %v", err) } defer rows.Close() fmt.Println("即将需要维护的机器手臂:") fmt.Printf("%-15s %-10s %-10s %-15s %-20s %-20s %-10s\n", "序列号", "型号", "位置", "运行时间(h)", "上次维护", "下次维护", "故障数") for rows.Next() { var sn, model, pos string var runtime, faults float64 var lastMaint, nextMaint time.Time if err := rows.Scan(&sn, &model, &pos, &runtime, &lastMaint, &nextMaint, &faults); err != nil { return fmt.Errorf("读取数据失败: %v", err) } fmt.Printf("%-15s %-10s %-10s %-15.2f %-20s %-20s %-10.0f\n", sn, model, pos, runtime, lastMaint.Format("2006-01-02 15:04"), nextMaint.Format("2006-01-02 15:04"), faults) } return nil } // 查询告警处理效率分析 func queryAlertHandlingEfficiency(ctx context.Context, conn *pgx.Conn) error { query := ` WITH alert_times AS ( select alert_type, level, handler, handle_method, EXTRACT(EPOCH FROM (handle_time - k_timestamptz)) as response_time, EXTRACT(EPOCH FROM (resolve_time - handle_time)) as resolve_time FROM factory_ts.alerts WHERE status = '已解决' AND handle_time IS NOT NULL AND resolve_time IS NOT NULL ) select alert_type, level, COUNT(*) as alert_count, ROUND(AVG(response_time/60)::numeric, 2) as avg_response_minutes, ROUND(AVG(resolve_time/60)::numeric, 2) as avg_resolve_minutes FROM alert_times GROUP BY alert_type, level ORDER BY level DESC, avg_response_minutes DESC; ` rows, err := conn.Query(ctx, query) if err != nil { return fmt.Errorf("执行查询失败: %v", err) } defer rows.Close() fmt.Println("告警处理效率分析:") fmt.Printf("%-15s %-6s %-10s %-20s %-20s\n", "告警类型", "级别", "数量", "平均响应时间(分)", "平均解决时间(分)") for rows.Next() { var alertType, level string var count int var respTime, resolveTime float64 if err := rows.Scan(&alertType, &level, &count, &respTime, &resolveTime); err != nil { return fmt.Errorf("读取数据失败: %v", err) } fmt.Printf("%-15s %-6s %-10d %-20.2f %-20.2f\n", alertType, level, count, respTime, resolveTime) } return nil } // 查询测试记录的时间分布 func queryTestTimeDistribution(ctx context.Context, conn *pgx.Conn) error { query := ` select EXTRACT(HOUR FROM k_timestamptz) as hour, COUNT(*) as test_count, ROUND(AVG(quality_score)::numeric, 2) as avg_quality, ROUND(AVG(test_duration)::numeric, 2) as avg_duration FROM factory_ts.test_records WHERE k_timestamptz >= NOW() - INTERVAL '10 minutes' GROUP BY hour ORDER BY hour; ` rows, err := conn.Query(ctx, query) if err != nil { return fmt.Errorf("执行查询失败: %v", err) } defer rows.Close() fmt.Println("24小时测试记录分布:") fmt.Printf("%-6s %-10s %-15s %-15s\n", "小时", "测试数", "平均质量分", "平均耗时(秒)") for rows.Next() { var hour float64 var count int var quality, duration float64 if err := rows.Scan(&hour, &count, &quality, &duration); err != nil { return fmt.Errorf("读取数据失败: %v", err) } fmt.Printf("%02.0f:00 %-10d %-15.2f %-15.2f\n", hour, count, quality, duration) } return nil }
由于篇幅关系,这里只描述其中的一个查询场景,以下是在近270w的数据中查询相关的数据,可以看到,只需要3s左右就可以查询出来,
非常的快速,但是这种SQL在MySQL中进行了对比,需要10几秒才能查出来,查询的性能还是非常的不错。
Grafana 支持查看 KaiwuDB 集群及各个节点的监控指标,包括指标概览、硬件指标、运行指标、SQL 指标、存储指标、副本指标、分布式指标、队列指标和慢查询指标。
在工业4.0和智能制造快速发展的背景下,传统关系型数据库已难以应对海量时序数据的存储与处理需求。KWDB作为专为物联网和工业互联网设计的分布式大数据平台,通过高性能时序数据库核心模块,有效解决了TB/PB级数据的实时汇聚、存储分析和智能预测等关键问题。该平台融合了关系型和时序数据库优势,支持云原生部署,具备集群扩展能力,特别适用于工厂自动化等IoT场景。
相比MySQL在千万级数据量时出现的性能瓶颈、同步延迟等问题,KWDB通过分布式架构和AI集成,不仅提升了数据处理效率,还实现了实时商业洞察,显著降低了企业的研发和运维成本,为工业数字化转型提供了可靠的数据基础设施支撑。