原文链接:【KWDB 创作者计划】Python语言连接并操作KWDB数据库 - OSCHINA - 中文开源技术交流社区
作者:osc_13235520
前言
纯命令行操作数据还是非常的麻烦的,且没有办法进行各类批量操作,那么本文就围绕着 Python 语言对 KWDB 数据库做链接以及 CRUD 操作做一个具体的讲解,操作的前提是环境,故而还需要对环境做出妥当的部署,否则是无法正常运行我们的代码的。
在官方网站中看到了 Python 给出了两种库操作,分别是:Psycopg 2 与 Psycopg 3,这里我使用 Psycopg 2 作为测试的库,因为我在安装 Psycopg 3 的时候需要了有些辅助库无法安装的情况,但是使用 Psycopg 2 就很顺畅,那么接下来我先罗列一下环境的概述与环境需求:
Psycopg 是最受欢迎的 PostgreSQL 数据库适配器,专为 Python 编程语言而设计。Psycopg 完全遵循 Python DB API 2.0 规范,支持线程安全,允许多个线程共享同一连接,特别适合高并发和多线程的应用场景。
KWDB 支持用户通过 Psycopg 2 连接数据库,并执行创建、插入和查询操作。本示例演示了如何通过 Psycopg 2 驱动连接和使用 KWDB。
本示例使用的 Python 版本为 Python 3.10。
我这里是 python 3.13.2 版本,所以基础内容是没有问题的。
pip3 install psycopg2-binary
安装完毕后直接 pip3 list 查看是否安装成功。
开启开发工具,我这里使用的是 VSCode。
基础连接代码:
con = psycopg2.connect(database="ts_db", user="teacher", password="Abcd1234", host="8.147.135.144",port="26257") print("连接成功!") con.set_session(autocommit=True) cur = con.cursor()
运行 sql 的方式,这里的 sql 是字符串格式的,或者直接写上就行。
cur.execute(sql)
测试代码:
#!/usr/bin/env python3# -*- coding: UTF-8 -*-import psycopg2def main(): try: con = psycopg2.connect(database="ts_db", user="teacher", password="Abcd1234", host="8.147.135.144",port="26257") print("连接成功!") con.set_session(autocommit=True) cur = con.cursor() except psycopg2.Error as e: print(f"连接Kaiwudb失败: {e}") sql = "CREATE TABLE mytest_table \ (k_timestamp timestamp NOT NULL, \ voltage double, \ current double, \ temperature double \ ) TAGS ( \ number int NOT NULL) \ PRIMARY TAGS(number) \ ACTIVETIME 3h" try: cur.execute(sql) except psycopg2.Error as e: print(f"创建表失败: {e}") sql = "insert INTO mytest_table \ VALUES ('2024-07-01 10:00:00', 220.0, 3.0, 20.5,123)" try: cur.execute(sql) except psycopg2.Error as e: print(f"插入数据失败: {e}") sql = "select * from mytest_table" try: cur.execute(sql) rows = cur.fetchall() for row in rows: print(f"k_timestamp: {row[0]}, voltage: {row[1]}, current: {row[2]}, temperature: {row[3]}, number: {row[4]}") except psycopg2.Error as e: print(f"查询数据失败: {e}") cur.close() con.close() returnif __name__ == "__main__": main()
我用的是的 VSCode:
建库建表之前都操作过了,所以这里没有建表成功。可以看到查询是没有问题的,不影响全程的操作。
这里我添加了 50 条插入的循环操作,我计算了总时间以及平均插入时间。
#!/usr/bin/env python3# -*- coding: UTF-8 -*-# 在文件顶部添加time模块导入import psycopg2import time # 新增导入def main(): try: con = psycopg2.connect(database="ts_db", user="teacher", password="Abcd1234", host="8.147.135.144",port="26257") print("连接成功!") con.set_session(autocommit=True) cur = con.cursor() except psycopg2.Error as e: print(f"连接Kaiwudb失败: {e}") # 生成50条测试数据(时间戳间隔1分钟) base_time = '2024-07-01 10:00:00' data = [ ( f'2024-07-01 10:{i:02d}:00', 220.0 + i*0.1, 3.0 + i*0.02, 20.5 + i*0.3, 123 + i ) for i in range(50) ] sql = "insert INTO mytest_table VALUES (%s, %s, %s, %s, %s)" try: start_time = time.time() # 记录开始时间 cur.executemany(sql, data) end_time = time.time() # 记录结束时间 # 计算并输出耗时 total_time = (end_time - start_time) * 1000 # 转换为毫秒 avg_time = total_time / 50 print(f"成功插入50条数据,总耗时:{total_time:.4f} 毫秒,平均每条耗时:{avg_time:.4f} 毫秒") except psycopg2.Error as e: print(f"批量插入数据失败: {e}") sql = "select * from mytest_table" try: cur.execute(sql) rows = cur.fetchall() for row in rows: print(f"k_timestamp: {row[0]}, voltage: {row[1]}, current: {row[2]}, temperature: {row[3]}, number: {row[4]}") except psycopg2.Error as e: print(f"查询数据失败: {e}") cur.close() con.close() returnif __name__ == "__main__": main()
展示对应的效果,成功的插入了 50 条数据。与对一个的信息展示。
仅仅看这些数据觉得也不快啊,那么咱们分析一下。
KWDB 插入 50 条数据耗时 822.1955 毫秒,平均每条 16.4439 毫秒,该数据仅反映特定环境和数据下的情况。若数据具有明显的时间序列特征,且插入操作频繁,KWDB 的设计使其在这类场景下的插入性能可能优于 MySQL。然而,在传统关系型数据处理场景中,MySQL 经过长期优化,对复杂关联、事务处理等操作有更好的支持,插入性能可能更稳定。在电商订单处理系统中,MySQL 能更好地满足订单数据与用户信息、商品信息等多表关联及事务处理的需求;而在电力监测系统中,KWDB 更适合处理电力设备产生的大量时序数据插入。
所以说得看具体的生产环境来决定我们如何来具体分析他的快与慢。
循环这里需要改一下:for i in range (500)
具体的来看一下效果,这里是 500 条,可以看到总计消耗的时间还是可以的。每条消耗还是有些许增加的。
这里留下代码:
#!/usr/bin/env python3# -*- coding: UTF-8 -*-# 在文件顶部添加time模块导入import psycopg2import time # 新增导入def main(): try: con = psycopg2.connect(database="ts_db", user="teacher", password="Abcd1234", host="8.147.135.144",port="26257") print("连接成功!") con.set_session(autocommit=True) cur = con.cursor() except psycopg2.Error as e: print(f"连接Kaiwudb失败: {e}") # 生成5000条测试数据(时间戳间隔1分钟) base_time = '2025-07-01 10:00:00' # 修正年份匹配 data = [ ( # 修正分钟生成逻辑(原%07d改为%02d) f'2025-07-01 {10 + i//60:02d}:{i%60:02d}:00', # 正确处理分钟进位 220.0 + i*0.1, 3.0 + i*0.02, 20.5 + i*0.3, 123 + i ) for i in range(500) ] # 清空旧数据 try: # 将 truncate 改为 delete cur.execute("delete FROM mytest_table") print("已清空历史数据") except psycopg2.Error as e: print(f"清空数据失败: {e}") cur.close() con.close() return sql = "insert INTO mytest_table VALUES (%s, %s, %s, %s, %s)" try: start_time = time.time() cur.executemany(sql, data) end_time = time.time() # 记录结束时间 # 计算并输出耗时 total_time = (end_time - start_time) * 1000 # 转换为毫秒 avg_time = total_time / 500 print(f"成功插入500条数据,总耗时:{total_time:.4f} 毫秒,平均每条耗时:{avg_time:.4f} 毫秒") except psycopg2.Error as e: print(f"批量插入数据失败: {e}") sql = "select * from mytest_table" try: cur.execute(sql) rows = cur.fetchall() for row in rows: print(f"k_timestamp: {row[0]}, voltage: {row[1]}, current: {row[2]}, temperature: {row[3]}, number: {row[4]}") except psycopg2.Error as e: print(f"查询数据失败: {e}") cur.close() con.close() returnif __name__ == "__main__": main()
可以在代码中看到我执行了批量删除的操作。
# 清空旧数据 try: # 将 truncate 改为 delete cur.execute("delete FROM mytest_table") print("已清空历史数据") except psycopg2.Error as e: print(f"清空数据失败: {e}") cur.close() con.close() return
数据清空后再去添加数据,这样更精准一些,如果是要对某条信息删除的话,就直接使用 where 来做筛选就行。
时序数据库通常以时间戳为核心,数据按时间顺序排列,其数据变更具有独特的逻辑,常采用先删除再添加的方式来实现修改操作。
时序数据库主要用于处理按时间顺序变化的数据,这些数据往往具有时间上的连续性和不可变性。每一个数据点都与特定的时间戳紧密关联,代表了在该时刻的真实状态。如果直接修改数据,可能会破坏时间序列的完整性和真实性。在监测工业设备运行状态时,每个时间点的设备参数数据都是对当时设备状况的记录。若直接修改某一时刻的数据,就会改变历史记录,使得后续基于时间序列的分析,如设备故障预测、性能趋势分析等,出现偏差甚至得出错误结论。而先删除再添加数据,能保证时间戳与数据的对应关系不变,维持时间序列的准确性。
时序数据库多采用顺序存储、列式存储,并支持数据压缩,以优化写入性能和存储效率。这种存储方式下,直接修改数据可能会导致存储结构的调整,影响数据的压缩效果和存储效率。数据压缩通常依赖于数据的连续性和相似性,直接修改会打破这种规律。先删除再添加数据,可以更好地适应时序数据库的存储特性,减少对存储结构的影响,保证数据的高效存储和管理。
时序数据库在处理数据时,更强调数据的追加和批量删除,更新场景相对较少。在弱网环境下,可能会出现数据延迟的情况,但整体数据变更特征类似 Append - Only 方式。这种模式下,先删除再添加数据有助于确保数据一致性。如果在数据更新时采用直接修改的方式,在并发操作的情况下,可能会出现数据不一致的问题。多个用户同时对同一数据点进行修改,可能导致数据冲突。而先删除再添加数据,可以通过控制删除和添加操作的顺序和事务处理,有效避免这种冲突,保证数据的一致性。
所以说,我们要做修改操作,就是先删除再添加数据。
# 生成需要覆盖的数据(示例数据)data = [('2025-07-01 10:00:00', 250.0, 5.0, 25.5, 123)]try: # 先删除旧数据 delete_sql = "delete FROM mytest_table WHERE k_timestamp = %s AND number = %s" for record in data: cur.execute(delete_sql, (record[0], record[4])) # 插入新数据 insert_sql = """ insert INTO mytest_table (k_timestamp, voltage, current, temperature, number) VALUES (%s, %s, %s, %s, %s) """ cur.executemany(insert_sql, data)except psycopg2.Error as e: print(f"数据替换失败: {e}")
我这里操作的都是时序数据的操作,所以操作过程与传统的数据库区别还是很清晰的,竟然没有看到唯一的 id,其实唯一的 id 用的是时间戳来锚定的,在环境上我们对齐颗粒度,在操作上我们需要反复测试几回才能更好的适应时序数据库的具体代码操作。
KWDB 开源库地址:https://gitee.com/kwdb/kwdb
KWDB 学习地址:https://www.kaiwudb.com/learning/
KWDB 活动地址:https://mp.weixin.qq.com/s/ZKQo7eQj_AtwamONCSl07A
希望本文能为大家带来一些价值,欢迎留言讨论。