文章链接:【KWDB 创作者计划】_Ubuntu 22.04系统KWDB数据库安装部署使用教程_案例传感器
作者:度假的小鱼
KWDB 是由开放原子开源基金会孵化及运营的开源项目,是一款面向 AIoT 场景的分布式多模数据库产品,支持在同一实例同时建立时序库和关系库并融合处理多模数据,具备千万级设备接入、百万级数据秒级写入、亿级数据秒级读取等时序数据高效处理能力,具有稳定安全、高可用、易运维等特点。

KWDB 基于浪潮 KaiwuDB 分布式多模数据库研发开源,典型应用场景包括但不限于物联网、能源电力、交通车联网、智慧政务、IT 运维、金融证券等,旨在为各行业领域提供一站式数据存储、管理与分析的基座,助力企业数智化建设,以更低的成本挖掘更大的数据价值。
KWDB 为不同角色开发者提供以下支持(包括但不限于):
为开发者提供通用连接接口,具备高速写入、极速查询、SQL 支持、随需压缩、数据生命周期管理、集群部署等特性,与第三方工具无缝集成,降低开发及学习难度,提升开发使用效率。
为运维管理人员提供快速安装部署、升级、迁移、监控等能力,降低数据库运维管理成本。
关键词:物联网(IoT)、多模数据库、分布式、时序数据处理、云边端协同
操作系统:支持 Ubuntu 20.04/22.04 LTS 版本、CentOS 7.x/8.x 版本、龙蜥(Anolis OS)8.x 版本。
硬件
内存:最低 4GB,生产环境推荐 8GB 以上。
存储:至少 50GB 可用空间。
CPU:x86_64 架构,建议 4 核以上。
软件依赖:在 Ubuntu 系统上,需要安装 dpkg - dev、devscripts、liblzma - dev、libstdc++ - static 等依赖。此外,若使用 GCC 和 G++ 7.3.0 版本,还需增加 libstdc++ - static 依赖;对于 Ubuntu 18.04 系统,由于默认的 libprotobuf 版本不满足要求,用户需要提前安装 3.6.1 或 3.12.4 等所需版本,并在编译时通过 make PROTOBUF_DIR=<protobuf_directory> 指定高版本路径。
对于生产环境,建议采用集群部署提高可用性,KWDB 集群至少需要 3 个节点1。同时,推荐使用 SSD 或者 NVMe 设备,文件系统建议使用 ext4








在终端输入以下命令:
ss -tuln | grep <端口号>
在终端输入以下命令:
ss -tuln | grep 8080
ss -tuln | grep 26257


检查libprotobuf版本,在终端输入以下命令:
protoc --version
dpkg -s libprotobuf-dev | grep Version
发现查找不到

安装libprotobuf及相关工具
sudo apt install libprotobuf-dev protobuf-compiler

再次执行以下命令查看版本
protoc --version


# 示例 sudo apt update && sudo apt install -y <依赖1> <依赖2> ...
sudo apt update && sudo apt install -y ca-certificates autoconf byacc dpkg-dev devscripts build-essential checkinstall libssl-dev libprotobuf-dev liblzma-dev libncurses-dev libatomic1 libstdc++6 protobuf-compiler
sudo apt update && sudo apt install -y \ ca-certificates \ autoconf \ # 包名需小写 byacc \ # goyacc 的替代包 dpkg-dev \ devscripts \ build-essential \ checkinstall \ libssl-dev \ # 开发库通常以 -dev 结尾 libprotobuf-dev \ liblzma-dev \ libncurses-dev \ libatomic1 \ # 运行时库,非开发库 libstdc++6 \ # 静态库通常由这个包提供 protobuf-compiler
sudo apt update && sudo apt install -y openssl libgeos-dev liblzma-dev squashfs-tools libgcc-s1 mount squashfuse
解释说明:
sudo apt update && sudo apt install -y \ openssl \ libgeos-dev \ # 地理空间库 liblzma-dev \ # xz-libs 的正确名称 squashfs-tools \ libgcc-s1 \ # libgcc 的正确名称 mount \ squashfuse

到这里环境准备工作基本就完成了
sudo apt update
sudo tar -C /usr/local/ -xvf cmake-3.23.4-linux-x86_64.tar.gz sudo mv /usr/local/cmake-3.23.4-linux-x86_64 /usr/local/cmake

sudo tar -C /usr/local -xvf go1.22.5.linux-amd64.tar.gz

mkdir -p /home/xiaoyu/go/src/gitee.com
nano ~/.bashrc
# 设置Go的安装目录 export GOROOT=/usr/local/go # 设置Go的工作目录,可根据需求修改,$HOME 会自动指向当前用户目录(如 /home/xiaoyu) export GOPATH=/home/xiaoyu/go # 将Go和CMake的可执行文件目录添加到系统的PATH环境变量中 export PATH=$PATH:/usr/local/go/bin:/usr/local/cmake/bin

source ~/.bashrc #个人用户设置 source /etc/profile #系统全局设置
(5)验证 CMake 环境变量是否设置成功:执行以下命令查看 CMake 的版本信息:
cmake --version

(6)验证 Go 环境变量是否设置成
在终端执行以下命令查看 Go 的版本信息:
go version
若能正常输出版本信息,就说明 Go 环境变量设置成功。

CMake
CMake 是一个开源、跨平台的自动化构建系统配置工具,它本身并不直接构建出最终的软件,而是为其他构建工具(如 Make、Ninja 等)生成相应的构建文件,比如在 Unix/Linux 系统中生成 Makefile 文件。其用途主要体现在以下方面:
跨平台项目构建:当你开发的项目需要在不同的操作系统(如 Windows、Linux、macOS)和编译器(如 GCC、Clang、MSVC)上进行构建时,CMake 能根据不同平台和编译器的特性生成对应的构建文件,保证项目在各平台上都能顺利构建。
简化构建配置:在大型项目中,手动编写和维护复杂的构建脚本(如 Makefile)是一项艰巨的任务。CMake 通过简单的 CMakeLists.txt 文件来描述项目的结构和构建规则,开发者只需编写一次 CMakeLists.txt 文件,CMake 就能自动生成适用于不同平台的构建文件。
依赖管理:CMake 可以方便地管理项目的依赖项,通过find_package命令查找系统中已安装的库,或者通过ExternalProject_Add命令下载和编译外部依赖库。
使用 git clone 命令
git clone https://gitee.com/kwdb/kwdb.git /home/xiaoyu/go/src/gitee.com/kwbasedb cd /home/xiaoyu/go/src/gitee.com/kwbasedb git submodule update --init git submodule update --remote

在项目目录下创建并切换到构建目录。
cd /home/xiaoyu/go/src/gitee.com/kwbasedb mkdir build && cd build # 当前工作目录下创建一个名为 build 的目录,并在创建成功后进入该目录。
运行 CMake 配置。
修改所有者
使用 chown 命令将 build 目录及其子内容的所有者修改为当前用户(假设当前用户是 xiaoyu):
sudo chown -R xiaoyu:xiaoyu /home/xiaoyu/go/src/gitee.com/kwbasedb/build
这里的 -R 选项表示递归操作,即对 build 目录及其所有子目录和文件都进行修改。
修改权限
使用 chmod 命令赋予当前用户读写和执行权限:
chmod -R 755 /home/xiaoyu/go/src/gitee.com/kwbasedb/build
755 表示所有者具有读、写、执行权限,而所属组和其他用户具有读和执行权限。
cmake .. -DCMAKE_BUILD_TYPE=Release
禁用Go模块功能
操作同1.4.5
nano ~/.bashrc export GO111MODULE=off

source ~/.bashrc
完整操作流程(避免重复错误)
# 1. 进入项目根目录(若未在此处) cd /home/xiaoyu/go/src/gitee.com/kwbasedb # 2. 直接进入已有的 build 目录(无需再次创建) cd build # 3. 清理旧文件(可选,按需执行) rm -rf * # 4. 执行 CMake 配置(明确指定 Release/Debuge # 或 Debug cmake .. -DCMAKE_BUILD_TYPE=Release # 5. 构建项目(如果生成 Makefile) make # 6. 将软件安装到系统中 make install # 7. 查看是否安装成功 进入 `kwbase` 脚本所在目录 cd /home/xiaoyu/go/src/gitee.com/kwbasedb/install/bin ./kwbase version
操作记录图


进入 kwbase 脚本所在目录。
cd /home/xiaoyu/go/src/gitee.com/kwbasedb/install/bin
export LD_LIBRARY_PATH=../lib
启动数据库。
./kwbase start-single-node --insecure --listen-addr=:26257 --background

安装部署完 KWDB 以后,用户可以使用使用 KaiwuDB 开发者中心连接 KWDB连接和管理 KWDB

从KWDB社区下载了免安装的KWDB开发者中心

直接解压到一个文件夹下

-- 1. 创建数据表
CREATE TABLE device_metrics (
id SERIAL PRIMARY KEY, -- 自增主键
ts TIMESTAMP NOT NULL, -- 时间戳
device_id VARCHAR(50) NOT NULL, -- 设备编号
metric_name VARCHAR(50) NOT NULL, -- 指标名称
value DOUBLE PRECISION NOT NULL, -- 数值
UNIQUE(device_id, ts, metric_name) -- 确保同一设备的同一时刻只有一条记录
);
-- 2. 插入测试数据
insert INTO device_metrics (ts, device_id, metric_name, value) VALUES
('2023-10-01 10:00:00', 'Sensor_A', 'Temperature', 26.5),
('2023-10-01 10:00:00', 'Sensor_A', 'Humidity', 45.2),
('2023-10-01 10:05:00', 'Sensor_B', 'Pressure', 1013.25);
-- 3. 查询所有数据
select * FROM device_metrics;
-- 4. 按设备分组查询最新数据
select DISTINCT ON (device_id) *
FROM device_metrics
ORDER BY device_id, ts DESC;
-- 5. 更新温度数据
update device_metrics
SET value = 27.0
WHERE device_id = 'Sensor_A' AND metric_name = 'Temperature';
-- 6. 删除过期数据(保留最近7天)
delete FROM device_metrics
WHERE ts < NOW() - INTERVAL '7 days';操作4如下图


以下以浪潮开务数据库(如KaiwuDB)的工业物联网场景为例,提供一个时序数据库增删改查(CRUD)的完整案例。案例基于假设的工业设备时序数据表,包含设备ID、时间戳、温度值、振动值等字段,涵盖建表、数据插入、查询、更新和删除操作。
创建名为robot_sensor_data的时序表,包含以下字段:
设备ID:device_id(字符串,主标签,非空)
timestamp(时间戳,非空,索引)temperature(浮点数)vibration(浮点数)状态码:status_code(整数,可选)
-- 创建时序数据库(可选,若未预创建) CREATE DATABASE sensor_data WITH (type = 'timeseries'); -- 切换到工业物联网数据库 USE sensor_data; -- 创建时序表 CREATE TABLE robot_sensor_data ( timestamp TIMESTAMPTZ NOT NULL, temperature FLOAT, vibration FLOAT, status_code INT ) TAGS ( device_id VARCHAR(32) NOT NULL ) PRIMARY TAGS (device_id);

单条插入
insert INTO robot_sensor_data VALUES ( '2025-05-1 10:00:00+08:00', -- 时间戳 75.2, -- 温度值 12.3, -- 振动值 0, -- 状态码 'ROBOT-001' -- 设备ID(标签) );

使用 Psycopg 2 连接 KaiwuDB
Psycopg 是最受欢迎的 PostgreSQL 数据库适配器,专为 Python 编程语言而设计。Psycopg 完全遵循 Python DB API 2.0 规范,支持线程安全,允许多个线程共享同一连接,特别适合高并发和多线程的应用场景。
KaiwuDB 支持用户通过 Psycopg 2 连接数据库,并执行创建、插入和查询操作。
pip install psycopg2-binary
#!/usr/bin/env python3
# -*- coding: UTF-8 -*-
import psycopg2
import random
from datetime import datetime, timedelta, timezone
def main():
try:
# 连接到KaiwuDB
con = psycopg2.connect(
database="sensor_data",
user="root",
password="",
host="192.168.0.141",
port="26257"
)
print("成功连接到KaiwuDB!")
con.set_session(autocommit=True)
cur = con.cursor()
# 测试插入一条简单记录
test_insert(cur)
# 生成并插入模拟数据
mock_data = generate_mock_data(device_count=2, data_points_per_device=5)
insert_count = 0
for data in mock_data:
try:
# 关键修改:使用字符串格式的时间戳,不带时区信息
timestamp_str = data["time"].strftime("%Y-%m-%d %H:%M:%S.%f")
insert_sql = """
insert INTO robot_sensor_data VALUES (
%s, %s, %s, %s, %s
)
"""
cur.execute(insert_sql, (
timestamp_str, # 直接传递格式化后的字符串
data["temp"],
data["vib"],
data["status"],
data["device"]
))
insert_count += 1
print(f"已插入 {insert_count} 条记录")
except psycopg2.Error as e:
print(f"插入数据时出错: {e}")
print(f"出错的数据: {data}")
# 继续尝试下一条记录而非中断
continue
print(f"\n成功插入 {insert_count} 条记录")
# 验证最后插入的5条记录
print("\n最后插入的5条记录验证:")
for data in mock_data[-5:]:
print(f"设备: {data['device']}, 时间: {data['time']}, 温度: {data['temp']}, 振动: {data['vib']}")
except psycopg2.Error as e:
print(f"连接数据库时出错: {e}")
finally:
if 'cur' in locals():
cur.close()
if 'con' in locals():
con.close()
def test_insert(cur):
"""测试插入一条简单记录"""
try:
test_sql = """
insert INTO robot_sensor_data VALUES (
NOW(), 25.5, 1.2, 0, 'TEST-001'
)
"""
cur.execute(test_sql)
print("测试记录插入成功")
# 查询验证
cur.execute("select * FROM robot_sensor_data WHERE device_id = 'TEST-001'")
row = cur.fetchone()
if row:
print("测试查询结果:", row)
else:
print("未找到测试记录")
except psycopg2.Error as e:
print(f"测试插入失败: {e}")
def generate_mock_data(device_count=2, data_points_per_device=5):
"""生成10条模拟工业机器人传感器数据(2台设备,每台5条)"""
base_time = datetime.now(timezone.utc) - timedelta(hours=24) # 生成过去24小时的数据
devices = [f"ROBOT-{str(i).zfill(3)}" for i in range(1, device_count + 1)]
data_stream = []
for device in devices:
for i in range(data_points_per_device):
# 生成随机时间戳
timestamp = base_time + timedelta(
seconds=random.randint(0, 3600),
milliseconds=random.randint(0, 999)
)
data = {
"time": timestamp,
"temp": round(random.uniform(30.0, 90.0), 2), # 温度30-90°C
"vib": round(random.uniform(0.1, 20.0), 2), # 振动0.1-20mm/s
"status": random.randint(0, 2), # 状态码0-2
"device": device # 设备ID
}
data_stream.append(data)
return data_stream
if __name__ == "__main__":
main()#!/usr/bin/env python3
# -*- coding: UTF-8 -*-
import psycopg2
import random
import math
from datetime import datetime, timedelta, timezone
def generate_mock_data(device_count=10, data_points_per_device=100):
"""
生成模拟工业机器人传感器数据
参数:
device_count: 设备数量 (默认10台)
data_points_per_device: 每台设备数据点数 (默认100条)
返回:
包含模拟数据的列表
"""
base_time = datetime.now(timezone.utc) - timedelta(days=30) # 30天内的数据
devices = [f"ROBOT-{str(i).zfill(3)}" for i in range(1, device_count + 1)]
data_stream = []
for device in devices:
device_base_time = base_time + timedelta(
hours=random.randint(0, 24*7) # 每台设备数据分布在不同的时间段
)
for i in range(data_points_per_device):
# 时间递增但加入随机波动
timestamp = device_base_time + timedelta(
minutes=i*15 + random.randint(-5, 5),
seconds=random.randint(0, 59),
milliseconds=random.randint(0, 999)
)
# 生成更真实的传感器数据模式
base_temp = random.uniform(30.0, 40.0)
temp_variation = math.sin(i/10) * 10 # 周期性波动
current_temp = round(base_temp + temp_variation + random.uniform(-2, 2), 2)
# 振动数据与温度相关性
base_vib = max(0.1, (current_temp - 35) / 10)
current_vib = round(base_vib + random.uniform(-0.5, 0.5), 2)
# 设备状态 (0正常, 1警告, 2故障)
status = 0
if current_temp > 75:
status = 1
elif current_temp > 85 or current_vib > 15:
status = 2
data = {
"timestamp": timestamp,
"temperature": current_temp,
"vibration": current_vib,
"status_code": status,
"device_id": device
}
data_stream.append(data)
# 按时间排序
data_stream.sort(key=lambda x: x["timestamp"])
return data_stream
def main():
try:
# 连接到KaiwuDB
con = psycopg2.connect(
database="sensor_data",
user="root",
password="",
host="192.168.0.141",
port="26257"
)
print("成功连接到KaiwuDB!")
con.set_session(autocommit=True)
cur = con.cursor()
# 检查实际表结构
print("\n正在检查表结构...")
cur.execute("""
select column_name, data_type
FROM information_schema.columns
WHERE table_name = 'robot_sensor_data'
""")
columns = {col[0]: col[1] for col in cur.fetchall()}
print("实际表结构:")
for col_name, col_type in columns.items():
print(f"{col_name}: {col_type}")
# 生成并插入1000条模拟数据
mock_data = generate_mock_data(device_count=10, data_points_per_device=100)
insert_count = 0
batch_params = []
for data in mock_data:
try:
batch_params.append((
data["timestamp"].strftime("%Y-%m-%d %H:%M:%S.%f"),
data["temperature"],
data["vibration"],
data["status_code"],
data["device_id"]
))
if len(batch_params) >= 100:
execute_batch_insert(cur, batch_params, columns)
insert_count += len(batch_params)
print(f"已批量插入 {insert_count} 条记录")
batch_params = []
except Exception as e:
print(f"准备数据时出错: {e}")
continue
if batch_params:
execute_batch_insert(cur, batch_params, columns)
insert_count += len(batch_params)
print(f"\n总计成功插入 {insert_count} 条记录")
except psycopg2.Error as e:
print(f"数据库操作出错: {e}")
finally:
if 'cur' in locals():
cur.close()
if 'con' in locals():
con.close()
def execute_batch_insert(cur, params, columns):
"""动态适配表结构的批量插入"""
try:
# 根据实际列名构建SQL
col_names = list(columns.keys())
placeholders = ", ".join(["%s"] * len(col_names))
insert_sql = f"""
insert INTO robot_sensor_data ({", ".join(col_names)})
VALUES ({placeholders})
"""
cur.executemany(insert_sql, params)
except psycopg2.Error as e:
print(f"批量插入出错: {e}")
raise
if __name__ == "__main__":
main()查询某设备最近1小时数据
select * FROM robot_sensor_data WHERE device_id = 'ROBOT-001' AND timestamp > NOW() - INTERVAL '1 hour' ORDER BY timestamp DESC;
-- 删除设备ROBOT-001在2025-05-1 10:00:00的数据 delete FROM robot_sensor_data WHERE device_id = 'ROBOT-001' AND timestamp = '2025-05-1 10:00:00+08:00'; -- 插入修正后的数据 insert INTO robot_sensor_data VALUES ( '2025-05-1 10:00:00+08:00', 74.8, -- 修正后的温度 12.1, -- 修正后的振动 0, 'ROBOT-001' );
删除单条数据
-- 删除设备ROBOT-001在2025-05-1 10:00:00的数据 delete FROM robot_sensor_data WHERE device_id = 'ROBOT-001' AND timestamp = '2025-05-1 10:00:00+08:00';
删除设备ROBOT-001超过30天的历史数据
delete FROM robot_sensor_data WHERE device_id = 'ROBOT-001' AND timestamp < NOW() - INTERVAL '30 days';
清空整表(谨慎操作)
truncate TABLE robot_sensor_data; -- 操作了一下这个sql没有成功,可能不支持这样写
delete from robot_sensor_data; --执行成功删除数据
通过以上案例,可以熟悉浪潮开务数据库在时序数据场景下的增删改查操作,并应用于工业物联网、能源监控等实际场景。
官网提供了 AI智能数据库交互问答 小助手,貌似不太理想,如果可以给出结果在链接到官方文档就更方便了.




问题出在 KaiwuDB 对时间戳数据类型的处理上。以下是针对性的解决方案:
问题诊断
错误信息:unsupported input type *tree.CastExpr
触发场景:尝试插入包含 datetime 对象的数据时
测试对比:
直接使用 NOW() 的测试插入成功
使用 Python datetime 对象插入失败
# 修改后 base_time = datetime.now() - timedelta(hours=24)