原文链接:【KWDB 创作者计划】Golang语言连接并操作KWDB数据库 - OSCHINA - 中文开源技术交流社区
作者:Next-Lyle
在数字化时代,数据库与编程语言的协作对构建高效应用至关重要。KWDB 作为面向 AIoT 的分布式多模数据库,优势显著。Golang 语言以高性能、简洁语法和强大并发处理能力,在后端开发中颇受认可。二者结合,在物联网、金融等领域潜力巨大。
本文将深入探索如何用 Golang 连接并操作 KWDB 数据库。从开发环境搭建、引入数据库驱动,到数据的增删查改操作,再到事务处理和性能优化,都将详细讲解。无论你是 Golang 开发老手,还是刚接触 KWDB 的新人,都能从中获取实用知识与技巧,开启高效开发之旅。
Golang 这里使用 pgx 驱动连接 KWDB 来支持对 KWDB 的 CRUD 操作。
pgx 是用 Go 语言编写的 PostgreSQL 驱动和工具包,提供了高性能的低级接口,支持用户直接利用 PostgreSQL 的特性。pgx 还包含一个适配器,与标准的数据库或 SQL 接口兼容,方便开发者进行数据库操作。
KWDB 支持用户通过 pgx 驱动连接数据库,并执行创建、插入和查询操作。本示例演示了如何使用 Go 语言通过 pgx 驱动连接 KWDB。
安装和运行 KWDB 数据库、配置数据库认证方式、创建数据库。
创建具有表级别及以上操作权限的用户。
已安装 Go 1.16 及以上版本。
使用 cmd 来输入 go version 来查看 go 的环境,我这里安装是的 go 的 1.24.0 版本,确认是可以使用的。
go env -w GOPROXY=https://goproxy.cn
还需要下载:go get github.com/jackc/pgx/v5
示例代码:
package mainimport ( "context" "fmt" "log" "time" "github.com/jackc/pgx/v5")func main() { // 使用账号密码连接 url := fmt.Sprintf("postgresql://%s:%s@%s/%s", "teacher", "Abcd1234", "8.147.135.144:26257", "ts_db") // 或者使用证书连接 // url := fmt.Sprintf("postgresql://%s@%s/%s?sslmode=verify-full&sslrootcert=%s&sslcert=%s&sslkey=%s", // "root", "127.0.0.1:26257", "defaultdb", // "/home/inspur/src/gitee.com/kwbasedb/install/certs/ca.crt", // "/home/inspur/src/gitee.com/kwbasedb/install/certs/client.root.crt", // "/home/inspur/src/gitee.com/kwbasedb/install/certs/client.root.key") config, err := pgx.ParseConfig(url) if err != nil { log.Fatalf("error parsing connection configuration: %v", err) } config.RuntimeParams["application_name"] = "sample_application_gopgx" conn, err := pgx.ConnectConfig(context.Background(), config) if err != nil { log.Fatalf("error connecting to database: %v", err) } defer conn.Close(context.Background()) // 创建时间序列数据库 _, err = conn.exec(context.Background(), "CREATE TS DATABASE db_TimeSeries") if err != nil { log.Fatalf("error creating database: %v", err) } // 创建表 _, err = conn.exec(context.Background(), "CREATE TABLE db_TimeSeries.table1 ("+ "k_timestamp timestamp NOT NULL, "+ "voltage double, "+ "current double, "+ "temperature double "+ ") TAGS ( "+ "number int NOT NULL) "+ "PRIMARY TAGS(number) "+ "ACTIVETIME 3h;") if err != nil { log.Fatalf("error creating table: %v", err) } // 插入数据 _, err = conn.exec(context.Background(), "insert INTO db_TimeSeries.table1 "+ "VALUES ("+ "'2024-07-01 10:00:00', "+ "220.0, 3.0, 20.5, "+ "123);") if err != nil { log.Fatalf("error inserting data: %v", err) } // 查询数据 rows, err := conn.Query(context.Background(), "select * from db_TimeSeries.table1") if err != nil { log.Fatalf("error querying data: %v", err) } else { for rows.Next() { values, err := rows.Values() if err != nil { log.Fatal("error while iterating dataset") } timestamp := values[0].(time.Time) voltage := values[1].(float64) current := values[2].(float64) temperature := values[3].(float64) number := values[4].(int32) log.Println("[k_timestamp:", timestamp, ", voltage:", voltage, ", current:", current, ", temperature:", temperature, ", number:", number, "]") } } }
运行效果中可以看到临街并且插入了一条数据,且成功返回。
单增加语句:
package mainimport ( "context" "fmt" "log" "github.com/jackc/pgx/v5")func main() { conn := connectDB() defer conn.Close(context.Background()) // 插入新数据 _, err := conn.exec(context.Background(), `insert INTO db_TimeSeries.table1 VALUES ('2024-07-01 11:00:00', 230.5, 3.2, 22.1, 124)`) if err != nil { log.Fatalf("插入失败: %v", err) } fmt.Println("数据创建成功") }// 公共连接方法(其他文件相同)func connectDB() *pgx.Conn { url := fmt.Sprintf("postgresql://%s:%s@%s/%s", "teacher", "Abcd1234", "8.147.135.144:26257", "ts_db") config, err := pgx.ParseConfig(url) if err != nil { log.Fatal(err) } conn, err := pgx.ConnectConfig(context.Background(), config) if err != nil { log.Fatal(err) } return conn }
并发式增加:
package mainimport ( "context" "fmt" "log" "github.com/jackc/pgx/v5")func main() { conn := connectDB() defer conn.Close(context.Background()) // 清空旧数据 _, err := conn.exec(context.Background(), "delete FROM db_TimeSeries.table1") if err != nil { log.Fatalf("清空数据失败: %v", err) } // 批量插入50条数据 for i := 0; i < 50; i++ { ts := fmt.Sprintf("2024-07-01 10:%02d:00", i) // 时间递增 _, err = conn.exec(context.Background(), `insert INTO db_TimeSeries.table1 VALUES ($1, $2, $3, $4, $5)`, ts, // 时间戳 220.0+0.1*float64(i), // 电压递增 3.0+0.02*float64(i), // 电流递增 20.5+0.3*float64(i), // 温度递增 123+i) // 编号递增 if err != nil { log.Fatalf("插入失败[%d]: %v", i, err) } } fmt.Println("成功插入50条数据") }func connectDB() *pgx.Conn { url := fmt.Sprintf("postgresql://%s:%s@%s/%s", "teacher", "Abcd1234", "8.147.135.144:26257", "ts_db") config, err := pgx.ParseConfig(url) if err != nil { log.Fatal(err) } conn, err := pgx.ConnectConfig(context.Background(), config) if err != nil { log.Fatal(err) } return conn }
go 语言代码运行效果:
数据库查看添加效果,确认 50 条数据添加成功,可以在代码中看到我先删除所有数据然后再添加,刚好 50 条。
删除语句:
package mainimport ( "context" "fmt" "log" "github.com/jackc/pgx/v5")func main() { conn := connectDB() defer conn.Close(context.Background()) // 删除指定数据 _, err := conn.exec(context.Background(), `delete FROM db_TimeSeries.table1 WHERE k_timestamp = $1 AND number = $2`, "2024-07-01 10:00:00", 123) if err != nil { log.Fatalf("删除失败: %v", err) } fmt.Println("数据删除成功") }// 公共连接方法(其他文件相同)func connectDB() *pgx.Conn { url := fmt.Sprintf("postgresql://%s:%s@%s/%s", "teacher", "Abcd1234", "8.147.135.144:26257", "ts_db") config, err := pgx.ParseConfig(url) if err != nil { log.Fatal(err) } conn, err := pgx.ConnectConfig(context.Background(), config) if err != nil { log.Fatal(err) } return conn }
查询语句:
package mainimport ( "context" "fmt" "log" "time" "github.com/jackc/pgx/v5")func main() { conn := connectDB() defer conn.Close(context.Background()) // 查询全部数据 rows, _ := conn.Query(context.Background(), "select * FROM db_TimeSeries.table1") for rows.Next() { var ( ts time.Time voltage float64 current float64 temp float64 number int32 ) err := rows.Scan(&ts, &voltage, ¤t, &temp, &number) if err != nil { log.Fatal(err) } fmt.Printf("时间: %s 电压: %.1f 编号: %d\n", ts.Format(time.RFC3339), voltage, number) } }// 公共连接方法(其他文件相同)func connectDB() *pgx.Conn { url := fmt.Sprintf("postgresql://%s:%s@%s/%s", "teacher", "Abcd1234", "8.147.135.144:26257", "ts_db") config, err := pgx.ParseConfig(url) if err != nil { log.Fatal(err) } conn, err := pgx.ConnectConfig(context.Background(), config) if err != nil { log.Fatal(err) } return conn }
在 KWDB 数据库中,由于其具备时序数据库特性,修改操作通常需要先删除旧数据再添加新数据。
原由:时序数据库中数据与时间戳紧密相关,为了保证时间序列的完整性、存储结构的稳定性以及数据一致性,KWDB 采用先删除再添加的方式进行数据修改。直接修改可能破坏时间序列、影响存储效率或导致数据冲突。
Go 语言简洁高效、原生支持并发,能充分利用多核 CPU 资源,面对 KWDB 数据库高并发的数据读写场景,可大幅提升处理效率。KWDB 作为面向 AIoT 的分布式多模时序数据库,擅长处理海量时序数据。Go 语言与之结合,在物联网场景中,可快速采集、存储设备产生的大量时序数据;在金融领域,能高效处理高频交易数据的实时写入与查询分析,满足低延迟、高吞吐的需求,为相关业务提供强大的技术支撑。
这里留一个实例,给了一些 DDL 语句与结构体,一般用于量化交易。
1.1时序数据库项目目标
专为金融时间序列数据设计(如股票行情、交易信号)
使用 CREATE TS DATABASE
创建专用时序数据库
支持自动数据清理(ACTIVETIME 参数)
1.2表结构特点:
市场数据表和交易信号表使用标签(TAGS)分区存储不同品种
订单表使用订单 ID 作为主键,设置 7 天自动清理
所有表都包含精确到秒的时间戳字段
数值字段使用 DECIMAL 类型保证计算精度
1.3量化交易数据结构:
// 市场行情数据结构type MarketData struct { Symbol string Timestamp string Open float64 // 开盘价 High float64 // 当日最高 Low float64 // 当日最低 Close float64 // 收盘价 Volume int64 // 成交量}
1.4金融场景适配
1.5防注入式写法:
// 示例参数化插入_, err = conn.exec(context.Background(), `insert INTO financial_data.market_data VALUES ($1, $2, $3, $4, $5, $6)`, data.Timestamp, data.Open, data.High, data.Low, data.Close, data.Volume, data.Symbol)
注:自动清理 7 天前订单数据(ACTIVETIME 7d)
1.6 DDL 语句:
-- 市场数据表(带品种标签)CREATE TABLE IF NOT EXISTS market_data ( ts TIMESTAMP NOT NULL, open DECIMAL(18,4), high DECIMAL(18,4), low DECIMAL(18,4), close DECIMAL(18,4), volume BIGINT) TAGS ( symbol VARCHAR(10) ) PRIMARY TAGS(symbol);-- 交易信号表(带品种标签)CREATE TABLE IF NOT EXISTS trade_signals ( ts TIMESTAMP NOT NULL, signal_type VARCHAR(20), strength DECIMAL(5,2) ) TAGS ( symbol VARCHAR(10) ) PRIMARY TAGS(symbol);-- 订单记录表(带订单ID标签)CREATE TABLE IF NOT EXISTS orders ( ts TIMESTAMP NOT NULL, direction VARCHAR(4), price DECIMAL(18,4), quantity DECIMAL(18,4), status VARCHAR(10) ) TAGS ( order_id VARCHAR(36) ) PRIMARY TAGS(order_id) ACTIVETIME 7d;
本文聚焦于用 Golang 连接并操作 KWDB 数据库,为开发者提供了全面指引。借助 pgx
驱动,实现对 KWDB 的 CRUD 操作。对环境,包括前提条件、Go 环境确认、代理设置与依赖下载做了示例。接着给出运行测试示例代码,展示数据库创建、表创建、数据插入与查询。最后提供完整 CRUD 示例,涵盖单条和并发式数据增加、数据删除及查询操作。这不仅体现了 Golang 与 KWDB 结合的强大功能,更为开发者在实际项目中运用二者提供了清晰的实现路径。
KWDB 开源库地址:https://gitee.com/kwdb/kwdb
KWDB 学习地址:https://www.kaiwudb.com/learning/
KWDB 活动地址:https://mp.weixin.qq.com/s/ZKQo7eQj_AtwamONCSl07A
希望本文能为大家带来一些价值,欢迎留言讨论。