文章链接:【KWDB创作者计划】KWDB高性能时序数据库能力,助力MT4期货交易系统实现时序数据管理
作者:蓝精灵1
随着互联网的发展,企业的规模也在不断的发展,企业内部的在线业务量也随之骤增,海量的数据访问和存储压力已经日益不满足公司业务的需求了,而且搭建传统集中式数据库也增加运维工作量和公司软硬件成本,对于突发的业务,无法满足更快更稳定的业务性能。
今天在CSDN看到有一个时序数据库相关的KWDB,经过了解了发现感叹技术发展越来越快了,KWDB分布式多模数据库是专门聚焦自主研发的面向 AIoT 场景的分布式、多模融合、支持原生 AI 的数据库产品,实现时序数据采集、写入、存储、查询、分析、应用全周期生命覆盖,让企业用更低的成本挖掘更大的数据价值。
KWDB高性能时序数据库能力,助力MT4期货交易系统实现时序数据管理
KWDB 分布式多模数据库可通过同一实例同时建立时序库和关系库并融合处理多模数据,具备千万级设备接入、百万级数据秒级写入、亿级数据秒级读取等时序数据高效处理能力;具有稳定安全、高可用、易运维等特点,面向工业物联网、数字能源、车联网、智慧产业等领域,提供一站式数据存储、管理与分析的基座。
公司从23年上半年开发了一个MT4市场行情接收软件,在投入运营阶段发现在业务的初期5、6、7、8月初,用户量比较少,基本上还不到100的访问量,后经过公司的策略改变,加上运营团队进行了几波疯狂有效的推广与促销,使系统的访问量逐渐在这2个月起来了,以下是公司的流量趋势图。
项目上线后,每个月还要消耗大量的IT软硬件资源成本。
①. 软硬件成本开销大,比如需要搭建自己的主从MySQL,需要够买云服务器、需要运维人力大量的工作支持
②. 改造难度系数高,比如,我们公司用的MySQL,但是想使用PostgreSQL来改造,可以发现有些语法兼容性不足,导致需要对原有业务系统进行大量改造。
③. 扩容难:为满足业务的快速增长带来的业务的计算能力与存储能力的需求,用户搭建一主多从、多主多从来进行扩容,而且作业时,需要中断业务数据才能进行完成扩容操作,还需要重启才能生效。
④. 迁移困难:比如说我们公司早期有一些人写存储过程,但是在迁移时,非常不方便,进一步增加企业的数据迁移改造成本。
基于这些痛点,我们迫切的想要一款易用的数据库来解决实际的问题,经过探索和学习KWDB分布式多模数据库之后,发现这个数据库非常契合我们在MT4的业务关于时序的数据收集与管理。
二、什么是时序数据和时序数据库?
在此需要了解一下,什么是时序数据库?只有知己知彼,才能很好的进行运用这个KWDB数据库。
对于数据库的概念大家应该并不陌生,但是‘时序’是什么?时序数据库有哪些特点,基本架构是什么,市面上又有哪些产品类型?
时序数据库顾名思义,是“管理时序数据的数据库”,所以在了解时序数据库之前,首先需要了解什么是时序数据。时序数据是按时间维度,记录系统、设备状态变化的数据类型。它的基本结构特点就是数据中自带数据产生的时间,也就是数据带有时间戳。在网络良好的情况下,时序数据是以时间顺序上报的。
物联网、工业物联网、金融、医疗等领域各种类型的设备和传感器网络都会产生海量的时序数据,以风机运行场景为例,测风仪可能随着环境因素(震动,腐蚀等)出现偏差,主控系统依据错误风向数据偏航导致迎风角错误,将会导致风机发电效率衰减,影响产能。
时序数据库是一种专门用于存储、管理和处理时序数据的数据库管理系统。因为时序数据一般存在采样频次高、实时性强、数据量大等显著特点,导致写入要求高、存储代价大、处理难度高,一旦时序数据体量增加,单靠传统数据库可能力不从心。
而时序数据库在管理时序数据方面就存在很多性能优势,主要包括:
①. 高吞吐写入能力:时序数据往往体量庞大,并可能存在高频数据上报。时序数据库通过优化数据结构和存储机制,可以在高并发的情况下保持高效的写入性能。
②. 高压缩存储能力:使用传统方法的情况下,海量时序数据的存储往往占用空间大、存储成本高。而时序数据库可以通过时序数据处理的相关技术,来大幅减少存储空间。
③. 低延迟查询能力:时序数据时间属性强,时序数据库能够支持用户用更简单的代码逻辑,实时进行基于时间范围的多类查询,方便灵活地获取所需结果。
在没有专门管理时序数据的数据库之前,通常使用关系型数据库管理时序数据,因此部分时序数据库的架构是基于关系型数据库进行优化的。还有一类基于 KV (key-value)存储的时序数据库,通过扩展 NoSQL 数据库实现时序数据存储,并使用分布式文件系统保障其扩展性,可以看到下面是公司MT4的业务流程,我们之前是使用Redis来进行管理的。
随着大数据时代到来,2010 年之后,时序数据爆发式增长,时序数据库的发展走上了“快车道”。为了适应更多的场景,实现更好的性能,面向时序数据存储全新研发的原生时序数据库也越来越多,现在给大家带来KWDB分布式多模数据库。
随着物联网、云计算、大数据等技术的发展,时序数据库在未来将迎来更多的发展机会和挑战,在实时数据处理、智能分析、跨平台兼容性、数据安全性等方面,时序数据库还在持续创新。
KWDB 是一款面向 AIoT 场景的分布式、多模融合、支持原生 AI 的数据库产品,支持同一实例同时建立时序库和关系库并融合处理多模数据,具备时序数据高效处理能力,具有稳定安全、高可用、易运维等特点。面向工业物联网、数字能源、车联网、智慧产业等领域,KWDB 提供一站式数据存储、管理与分析的基座。
KaiwuDB具备自适应时序、事务处理、预测分析计算、超速分析计算等计算引擎:
①. 自适应时序引擎:支持多种时序数据特色的复杂查询及多维聚合方式;提供 5-30 倍以上压缩能力,数据压缩后无需解压缩即可使用;
②. 事务处理引擎:支持分布式事务和 MVCC 多版本并发控制,具备注释、视图、约束、索引、序列等功能;
③. 预测分析引擎:提供模型生命周期管理、模型训练、模型推理预测等功能,任何具备数据库应用开发背景的应用开发人员都可以轻松地完成模型管理和预测等操作;
④. 超速分析引擎:提供多维复杂查询、关联查询等能力,加速复杂查询、实时查询效率。
①. 面向时间序列数据的优化:对物联网场景中常见的时序数据进行了深度优化。
②. 高吞吐与低延迟:适合实时数据写入和快速查询的场景。
③. 分布式架构:支持大规模横向扩展,能够处理 PB 级数据。
④. 查询灵活性:支持多种查询模式,包括时序查询、聚合分析和多维数据查询。
这里通过单节点部署方式快速上手体验 KWDB 数据库,这里安装KWDB数据库有3种方式,因为是非生产环境,可以选择以下任意一种方式部署 KWDB 数据库:
①. 使用脚本部署:通过安装包内提供的部署脚本进行部署,支持配置数据库的部署模式、数据存储路径、端口等参数。
②. 使用 kwbase CLI 部署:通过安装包内提供的服务组件和库进行部署,同样支持配置数据库的部署模式、数据存储路径、端口等参数。
这里我们使用脚本的方式来进行部署,不过,在此之前需要注意一些环境和依赖的检查准备,单节点配置建议不低于 4 核 8G。对于数据量大、复杂的工作负载、高并发和高性能场景,建议配置更高的 CPU 和内存资源以确保系统的高效运行,这里我选择Ubuntu 22.02的版本的系统。
apt-get update sudo apt install libprotobuf-dev wget https://gitee.com/kwdb/kwdb/releases/download/V2.2.0/KWDB-2.2.0-ubuntu22.04-x86_64-debs.tar.gz tar -zxvf KWDB-2.2.0-ubuntu22.04-x86_64-debs.tar.gz
这里可以登录待部署节点,编辑安装包目录下的 deploy.cfg 配置文件,设置安全模式、管理用户、服务端口等信息,其中,一些主要的参数做了解释。
但是,我们在集群部署中,这里一直输入密码,发现并没有成功,我很确定密码没有输错,这时候只能来看一下官方的文档来看看是什么问题,这里是因为我们开启 TLS 安全模式。开启安全模式后,KaiwuDB 生成 TLS 证书,作为客户端或应用程序连接数据库的凭证。生成的客户端相关证书存放在 /etc/kaiwudb/certs 目录,可以看到有一个SSH 免密登录。
使用脚本部署集群时,系统将对配置文件、运行环境、硬件配置、软件依赖和 SSH 免密登录进行检查。
①. 如果硬件配置未满足要求,系统将继续安装,并提示硬件规格不满足要求。
②. 如果软件依赖未满足要求,系统会中止安装并提供相应的错误信息。
# 登录当前节点,生成公私密钥对。-f ~/.ssh/id_rsa:指定生成的密钥对文件目录。-N:将密钥密码设置为空,以实现免密登录 ssh-keygen -f ~/.ssh/id_rsa -N "" # 将密钥分发至集群其它节点。 ssh-copy-id -f -i ~/.ssh/id_rsa.pub -o StrictHostKeyChecking=no <target_node> # 多副本集群 ./deploy.sh install --multi-replica # 初始化并启动集群 ./deploy.sh cluster -i # 查看集群节点状态 ./deploy.sh cluster -s
多副本集群初始化和启动大约需要 10 秒左右时间。在此期间,如果有节点死亡,可能会导致集群无法触发高可用机制。
KWDB 时序数据库支持在创建数据库的时候设置数据库的生命周期和分区时间范围。数据库生命周期和分区时间范围的设置与系统的存储空间密切相关。
生命周期越长,分区时间范围越大,系统所需的存储空间也越大。当用户单独指定或者修改数据库内某一时序表的生命周期或分区时间范围时,该配置只适用于该时序表。
这里可以建2个不同类型的数据库来进行比较:
mt4_ts; mt4_re;
MT4平台返回的交易数据主要包括实时报价、历史交易记录和账户信息三大类字段,这些字段用于支持交易决策和策略分析。以下分类详述各字段内容,基于MT4的标准数据结构和功能实现。
以下SQL结构包含三个核心表:market_data存储实时行情,order_history记录完整交易历史,account_status跟踪资金变化,所有数值字段均设置精确小数位,关键字段建立主键和索引。
-- 订单表 CREATE TABLE mt4_re.orders ( order_id BIGINT PRIMARY KEY, account_id INT NOT NULL, symbol VARCHAR(12) NOT NULL, type VARCHAR(10) NOT NULL, open_price DECIMAL(12,5) NOT NULL, close_price DECIMAL(12,5), volume DECIMAL(8,2) NOT NULL, stop_loss DECIMAL(12,5), take_profit DECIMAL(12,5), open_time TIMESTAMPTZ NOT NULL, close_time TIMESTAMPTZ ); -- 实时报价表 CREATE TABLE mt4_ts.market_data ( time TIMESTAMPTZ NOT NULL, symbol VARCHAR(10) NOT NULL, bid FLOAT NOT NULL, ask FLOAT NOT NULL, high FLOAT NOT NULL, low FLOAT NOT NULL, volume INT NOT NULL ) TAGS ( uuid char(32) NOT NULL ) PRIMARY TAGS (uuid); -- 历史订单表 CREATE TABLE mt4_ts.order_history ( open_time TIMESTAMP NOT NULL, close_time TIMESTAMP NOT NULL, ticket VARCHAR(50) NOT NULL, account INT NOT NULL, symbol VARCHAR(10) NOT NULL, type VARCHAR(10) NOT NULL, volume FLOAT NOT NULL, open_price FLOAT NOT NULL, close_price FLOAT NOT NULL, stop_loss FLOAT NOT NULL, take_profit FLOAT NOT NULL, commission FLOAT NOT NULL, swap FLOAT NOT NULL, profit FLOAT NOT NULL ) TAGS ( uuid char(32) NOT NULL ) PRIMARY TAGS (uuid); -- 账户状态表 CREATE TABLE mt4_re.account_status ( account_number INT PRIMARY KEY, balance DECIMAL(15,2) NOT NULL, equity DECIMAL(15,2) NOT NULL, free_margin DECIMAL(15,2) NOT NULL, margin DECIMAL(15,2) NOT NULL, margin_level DECIMAL(10,2) NOT NULL, last_update TIMESTAMP DEFAULT CURRENT_TIMESTAMP );
然而,在创建时序数据表和关系数据表时,会发现有一些SQL的语法是不支持的,以下为列举的相关不支持的创建表语句:
①. 时序数据表第一列必须要是时间格式的
②. 不支持dataetime类型字段
③. 不支持innoDB引擎类型
④. 不支持enum枚举字段
⑤. 不支持``创建表结构
pgx 是用 Go 语言编写的 PostgreSQL 驱动和工具包,提供了高性能的低级接口,支持用户直接利用 PostgreSQL 的特性。pgx 还包含一个适配器,与标准的数据库或 SQL 接口兼容,方便开发者进行数据库操作。
KWDB 支持用户通过 pgx 驱动连接数据库,并执行创建、插入和查询操作。下面进行演示了如何使用 Go 语言通过 pgx 驱动连接 KWDB。
package main import ( "context" "fmt" "log" "math/rand" "os" "os/signal" "sync" "syscall" "time" "github.com/jackc/pgx/v4" "github.com/jackc/pgx/v4/pgxpool" ) const ( batchSize = 5000 // 批量插入的大小 workers = 4 // 每个表的写入goroutine数 ) // 生成随机浮点数 func randFloat(min, max float64) float64 { return min + rand.Float64()*(max-min) } // 生成随机字符串 func randString(length int) string { const charset = "ABCDEFGHIJKLMNOPQRSTUVWXYZ" b := make([]byte, length) for i := range b { b[i] = charset[rand.Intn(len(charset))] } return string(b) } // 插入订单表数据 func insertOrders(ctx context.Context, pool *pgxpool.Pool, wg *sync.WaitGroup) { defer wg.Done() for i := 0; i < workers; i++ { workerID := i // 避免闭包问题 go func(workerID int) { batch := &pgx.Batch{} count := 0 for { // 准备批量插入数据 batch.Queue(` insert INTO mt4_re.orders ( order_id, account_id, symbol, type, open_price, close_price, volume, stop_loss, take_profit, open_time, close_time ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) `, rand.Int63(), // order_id rand.Intn(1000), // account_id randString(6), // symbol "BUY", // type randFloat(1.0, 100.0), // open_price randFloat(1.0, 100.0), // close_price randFloat(0.1, 10.0), // volume randFloat(1.0, 100.0), // stop_loss randFloat(1.0, 100.0), // take_profit time.Now(), // open_time time.Now().Add(time.Hour*24), // close_time ) count++ if count >= batchSize { br := pool.SendBatch(ctx, batch) if err := br.Close(); err != nil { log.Printf("Error sending batch: %v", err) } batch = &pgx.Batch{} count = 0 } } }(workerID) } } // 插入市场数据 func insertMarketData(ctx context.Context, pool *pgxpool.Pool, wg *sync.WaitGroup) { defer wg.Done() for i := 0; i < workers; i++ { workerID := i // 避免闭包问题 go func(workerID int) { batch := &pgx.Batch{} count := 0 for { batch.Queue(` insert INTO mt4_ts.market_data ( time, symbol, bid, ask, high, low, volume, uuid ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8) `, time.Now(), // time randString(6), // symbol randFloat(1.0, 100.0), // bid randFloat(1.0, 100.0), // ask randFloat(1.0, 100.0), // high randFloat(1.0, 100.0), // low rand.Intn(1000), // volume randString(32), // uuid ) count++ if count >= batchSize { br := pool.SendBatch(ctx, batch) if err := br.Close(); err != nil { log.Printf("Error sending batch: %v", err) } batch = &pgx.Batch{} count = 0 } } }(workerID) } } // 插入历史订单数据 func insertOrderHistory(ctx context.Context, pool *pgxpool.Pool, wg *sync.WaitGroup) { defer wg.Done() for i := 0; i < workers; i++ { workerID := i // 避免闭包问题 go func(workerID int) { batch := &pgx.Batch{} count := 0 for { batch.Queue(` insert INTO mt4_ts.order_history ( open_time, close_time, ticket, account, symbol, type, volume, open_price, close_price, stop_loss, take_profit, commission, swap, profit, uuid ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15) `, time.Now(), // open_time time.Now().Add(time.Hour*24), // close_time fmt.Sprintf("T%d", rand.Int()), // ticket rand.Intn(1000), // account randString(6), // symbol "SELL", // type randFloat(0.1, 10.0), // volume randFloat(1.0, 100.0), // open_price randFloat(1.0, 100.0), // close_price randFloat(1.0, 100.0), // stop_loss randFloat(1.0, 100.0), // take_profit randFloat(0.0, 10.0), // commission randFloat(-5.0, 5.0), // swap randFloat(-100.0, 100.0), // profit randString(32), // uuid ) count++ if count >= batchSize { br := pool.SendBatch(ctx, batch) if err := br.Close(); err != nil { log.Printf("Error sending batch: %v", err) } batch = &pgx.Batch{} count = 0 } } }(workerID) } } // 插入账户状态数据 func insertAccountStatus(ctx context.Context, pool *pgxpool.Pool, wg *sync.WaitGroup) { defer wg.Done() for i := 0; i < workers; i++ { workerID := i // 避免闭包问题 go func(workerID int) { batch := &pgx.Batch{} count := 0 for { batch.Queue(` insert INTO mt4_re.account_status ( account_number, balance, equity, free_margin, margin, margin_level, last_update ) VALUES ($1, $2, $3, $4, $5, $6, $7) ON CONFLICT (account_number) DO update SET balance = EXCLUDED.balance, equity = EXCLUDED.equity, free_margin = EXCLUDED.free_margin, margin = EXCLUDED.margin, margin_level = EXCLUDED.margin_level, last_update = EXCLUDED.last_update `, rand.Intn(1000), // account_number randFloat(1000, 100000), // balance randFloat(1000, 100000), // equity randFloat(0, 50000), // free_margin randFloat(0, 50000), // margin randFloat(0, 1000), // margin_level time.Now(), // last_update ) count++ if count >= batchSize { br := pool.SendBatch(ctx, batch) if err := br.Close(); err != nil { log.Printf("Error sending batch: %v", err) } batch = &pgx.Batch{} count = 0 } } }(workerID) } } func main() { // 设置随机数种子 rand.Seed(time.Now().UnixNano()) // 数据库连接配置 url := fmt.Sprintf("postgresql://%s@%s/%s?sslmode=verify-full&sslrootcert=%s&sslcert=%s&sslkey=%s", "root", "47.110.144.145:26257", "defaultdb", "./certs/ca.crt", "./certs/client.root.crt", "./certs/client.root.key") config, err := pgxpool.ParseConfig(url) if err != nil { log.Fatalf("error parsing connection configuration: %v", err) } config.MaxConns = 50 // 设置连接池大小 // 创建连接池 ctx := context.Background() pool, err := pgxpool.ConnectConfig(ctx, config) if err != nil { log.Fatalf("error connecting to database: %v", err) } defer pool.Close() // 使用WaitGroup等待所有goroutine完成 var wg sync.WaitGroup wg.Add(4) // 启动4个独立的goroutine写入数据 go insertOrders(ctx, pool, &wg) go insertMarketData(ctx, pool, &wg) go insertOrderHistory(ctx, pool, &wg) go insertAccountStatus(ctx, pool, &wg)金仓数据库 2025 征文大赛 // 等待所有goroutine完成 // 添加信号处理 sigChan := make(chan os.Signal, 1) signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM) // 等待中断信号 <-sigChan log.Println("Received shutdown signal, cleaning up...") // 关闭连接池并等待所有goroutine完成 pool.Close() wg.Wait() }
这里会有一个问题,就是在"github.com/jackc/pgx/v5",这里并没有发现pgxpool地址池的东西,但是我们在v4的版本是有的,所以,这里我们使用v4版本的pgxpool来进行地址池的数据写入操作。
可以看到数据写入到30w左右的时候,发现报错了,每个表都有4个worker(goroutine)同时工作,每5000条记录进行一次批量插入,这样可以获得较好的插入性能,但是在插入到30w的数据时,发现了报错。
F250602 09:06:06.756652 140 kv/kvserver/store_raft.go:531 [n1,s1,r62/1:/Table/83{-/400}] unable to put data: non-deterministic failure: unable to put data: could not PutData: PutData Error!,RangeGroup:99 goroutine 140 [running]: gitee.com/kwbasedb/kwbase/pkg/util/log.getStacks(0x6b7d500, 0xedfcf607e, 0x0, 0xc0094905e0) /home/inspur/src/gitee.com/kwbasedb/kwbase/pkg/util/log/get_stacks.go:29 +0xb9 gitee.com/kwbasedb/kwbase/pkg/util/log.(*loggerT).outputLogEntry(0x6b7a6a0, 0xc000000004, 0xc0094905c0, 0x19, 0x213, 0xc0007a1400, 0x91) /home/inspur/src/gitee.com/kwbasedb/kwbase/pkg/util/log/clog.go:227 +0xab6 gitee.com/kwbasedb/kwbase/pkg/util/log.addStructured(0x4cef458, 0xc007e9bc80, 0x4, 0x3, 0x442808f, 0x7, 0xc003427ca8, 0x2, 0x2) /home/inspur/src/gitee.com/kwbasedb/kwbase/pkg/util/log/structured.go:80 +0x2b4 gitee.com/kwbasedb/kwbase/pkg/util/log.logDepth(...) /home/inspur/src/gitee.com/kwbasedb/kwbase/pkg/util/log/log.go:66 gitee.com/kwbasedb/kwbase/pkg/util/log.FatalfDepth(...) /home/inspur/src/gitee.com/kwbasedb/kwbase/pkg/util/log/log.go:196 gitee.com/kwbasedb/kwbase/pkg/kv/kvserver.maybeFatalOnRaftReadyErr(0x4cef458, 0xc007e9bc80, 0x444e3e6, 0x12, 0x4c83980, 0xc0092cb940, 0x0) /home/inspur/src/gitee.com/kwbasedb/kwbase/pkg/kv/kvserver/replica_raft.go:1586 +0x1e5 gitee.com/kwbasedb/kwbase/pkg/kv/kvserver.(*Store).processReady(0xc00080ce00, 0x4cef458, 0xc000b840c0, 0x3e) /home/inspur/src/gitee.com/kwbasedb/kwbase/pkg/kv/kvserver/store_raft.go:531 +0x1d4 gitee.com/kwbasedb/kwbase/pkg/kv/kvserver.(*raftScheduler).worker(0xc00052dc20, 0x4cef458, 0xc000b840c0) /home/inspur/src/gitee.com/kwbasedb/kwbase/pkg/kv/kvserver/scheduler.go:282 +0x26b gitee.com/kwbasedb/kwbase/pkg/kv/kvserver.(*raftScheduler).Start.func2(0x4cef458, 0xc000b840c0) /home/inspur/src/gitee.com/kwbasedb/kwbase/pkg/kv/kvserver/scheduler.go:208 +0x3e gitee.com/kwbasedb/kwbase/pkg/util/stop.(*Stopper).RunWorker.func1(0xc000b58140, 0xc00052cbd0, 0xc000b58130) /home/inspur/src/gitee.com/kwbasedb/kwbase/pkg/util/stop/stopper.go:212 +0x103 created by gitee.com/kwbasedb/kwbase/pkg/util/stop.(*Stopper).RunWorker /home/inspur/src/gitee.com/kwbasedb/kwbase/pkg/util/stop/stopper.go:205 +0xa8 **************************************************************************** This node experienced a fatal error (printed above), and as a result the process is terminating. Fatal errors can occur due to faulty hardware (disks, memory, clocks) or a problem in KWDB. You can contact your vendor or report an issue to KWDB community.
为了提高可用性,降低数据丢失的风险,建议在单台计算机上只运行一个节点。KWDB 采用跨节点复制机制,如果在一台计算机上同时运行多个节点,当计算机发生故障时,更有可能丢失数据。
上面在面对这么多数据库的压力下,4核8G感觉有点吃力,那我们把数据库换成8核16G集群的方案来试试,看一下数据量是否能支撑的了业务的急剧数据增长呢?
I250602 09:29:58.700637 1 cli/start.go:1075 process identity: uid 1000 euid 1000 gid 1000 egid 1000 I250602 09:29:58.702088 1 cli/start.go:639 starting kwbase node I250602 09:29:58.709035 82 base/addr_validation.go:310 [n?] server certificate addresses: IP=47.110.144.145,47.110.144.145,127.0.0.1,0.0.0.0; DNS=localhost; CN=node I250602 09:29:58.709056 82 base/addr_validation.go:356 [n?] web UI certificate addresses: IP=47.110.144.145,47.110.144.145,127.0.0.1,0.0.0.0; DNS=localhost; CN=node I250602 09:29:58.709745 82 storage/rocksdb.go:625 opening rocksdb instance at "/var/lib/kaiwudb/kwbase-temp039940152" I250602 09:29:58.740661 82 server/server.go:1232 [n?] monitoring forward clock jumps based on server.clock.forward_jump_check_enabled I250602 09:29:58.741079 82 storage/rocksdb.go:625 opening rocksdb instance at "/var/lib/kaiwudb" I250602 09:29:58.958551 82 server/config.go:663 [n?] 1 storage engine initialized I250602 09:29:58.958560 82 server/config.go:666 [n?] RocksDB cache size: 128 MiB I250602 09:29:58.958566 82 server/config.go:666 [n?] store 0: RocksDB, max size 0 B, max open file limit 1043576 E250602 09:29:58.964930 82 cli/start.go:594 Cluster expansion feature needs an enterprise license to enable. panic: Cluster expansion feature needs an enterprise license to enable. goroutine 82 [running]: gitee.com/kwbasedb/kwbase/pkg/cli.runStart.func1(0xc000852000) /home/inspur/src/gitee.com/kwbasedb/kwbase/pkg/cli/start.go:595 +0x7d1 gitee.com/kwbasedb/kwbase/pkg/server.(*Server).Start(0xc000852000, 0x4cef458, 0xc00062e1e0, 0x0, 0x0) /home/inspur/src/gitee.com/kwbasedb/kwbase/pkg/server/server.go:1762 +0x5148 gitee.com/kwbasedb/kwbase/pkg/cli.runStart.func3.2(0xc000408ab0, 0xc000424230, 0xc00024b380, 0x4cef458, 0xc00062e1e0, 0x0, 0x2941b08b, 0xedfcf6616, 0x0, 0x0, ...) /home/inspur/src/gitee.com/kwbasedb/kwbase/pkg/cli/start.go:698 +0x105 gitee.com/kwbasedb/kwbase/pkg/cli.runStart.func3(0xc000424230, 0x4cef458, 0xc00062e1e0, 0x4d47920, 0xc0002d19b0, 0xc000408ab0, 0xc00024b380, 0x0, 0x2941b08b, 0xedfcf6616, ...) /home/inspur/src/gitee.com/kwbasedb/kwbase/pkg/cli/start.go:818 +0x131 created by gitee.com/kwbasedb/kwbase/pkg/cli.runStart /home/inspur/src/gitee.com/kwbasedb/kwbase/pkg/cli/start.go:654 +0xebe
这里在重启kwdb服务时,会提示这个“Cluster expansion feature needs an enterprise license to enable.”,是只有企业版本才支持吗?反正,弄了挺久的没弄好。
这里表示两台集群的KWDB多模数据库就搭建完成了,为了提高可用性,降低数据丢失的风险,建议在单台计算机上只运行一个节点。KaiwuDB 采用跨节点复制机制,如果在一台计算机上同时运行多个节点,当计算机发生故障时,更有可能丢失数据。
这里可以看到这里集群里面数据同步也是非常快,不管是时序数据库和关系型数据库都可以很好的进行同步,虽然有一点点误差,但是还是在接受的范围内,可以看到只有不到20条数据的误差。
跨模查询是一种用于检索相关联数据的查询方法,通常用于在不同类型的数据库之间进行查询,例如在关系数据库和时序数据库之间检索相关的数据,KWDB 跨模查询支持对关系表和时序表进行关联查询、嵌套查询、联合查询。
KWDB 跨模查询支持以下关联查询:
①. 内连接(INNER JOIN)
②. 左连接(LEFT JOIN)
③. 右连接(RIGHT JOIN)
④. 全连接(FULL JOIN)
KWDB 跨模查询支持以下嵌套查询:
①. 相关子查询(Correlated Subquery):内部查询依赖于外部查询的结果,每次外部查询的都触发内部查询的执行。
②. 非相关子查询(Non-Correlated Subquery):内部查询独立于外部查询,只执行一次内部查询并返回固定的结果。
③. 相关投影子查询(Correlated Scalar Subquery): 内部查询依赖于外部查询的结果,并且只返回一个单一的值作为外部查询的结果。
④. 非相关投影子查询(Non-Correlated Scalar Subquery):内部查询独立于外部查询,并且只返回一个单一的值作为外部查询的结果。
⑤. FROM 子查询:将一个完整的 SQL 查询嵌套在另一个查询的 FROM 子句中,作为临时表格使用。
这个程序中包含了四个主要的查询语句,让我来详细解释每一个:
1. 查询账户余额TOP10:
select account_number, balance, equity, free_margin, margin, margin_level, last_update FROM mt4_re.account_status ORDER BY balance DESC LIMIT 10
这个查询从账户状态表中选择前10个余额最高的账户,显示他们的账号、余额、净值、可用保证金、已用保证金、保证金水平和最后更新时间。结果按余额从高到低排序。
2. 查询活跃交易订单:
select order_id, account_id, symbol, type, open_price, close_price, volume, stop_loss, take_profit, open_time, close_time FROM mt4_re.orders WHERE close_time > NOW() ORDER BY open_time DESC LIMIT 10
这个查询显示当前活跃的交易订单(未平仓的订单),包括订单号、账号、交易品种、订单类型、开仓价、平仓价、交易量、止损价、止盈价、开仓时间和平仓时间。结果按开仓时间从新到旧排序,最多显示10条记录。
3. 查询最新市场报价:
select time, symbol, bid, ask, high, low, volume, uuid FROM mt4_ts.market_data ORDER BY time DESC LIMIT 10
这个查询获取最新的市场报价数据,包括时间、交易品种、买价、卖价、最高价、最低价、成交量和唯一标识符。结果按时间从新到旧排序,显示最新的10条报价记录。
4. 查询历史订单盈亏统计:
select symbol, COUNT(*) as total_trades, SUM(close_price - open_price) as total_profit, AVG(close_price - open_price) as avg_profit, SUM(commission) as total_commission, SUM(swap) as total_swap FROM mt4_ts.order_history GROUP BY symbol ORDER BY total_profit DESC LIMIT 10
这个查询统计每个交易品种的历史交易情况,包括:
总交易次数(COUNT)
总盈亏(开平仓价差的总和)
平均盈亏(开平仓价差的平均值)
总佣金
package main import ( "context" "fmt" "log" "time" "github.com/jackc/pgx/v4" "github.com/jackc/pgx/v4/pgxpool" ) // 查询函数 func runQueries(ctx context.Context, pool *pgxpool.Pool) { var rows pgx.Rows var err error // 查询账户余额TOP10 queryTopAccounts := ` select account_number, balance, equity, free_margin, margin, margin_level, last_update FROM mt4_re.account_status ORDER BY balance DESC LIMIT 10 ` rows, err = pool.Query(ctx, queryTopAccounts) if err != nil { log.Printf("Failed to query top accounts: %v", err) return } defer rows.Close() fmt.Println("\n=== Top 10 Accounts by Balance ===") for rows.Next() { var accountNumber int var balance, equity, freeMargin, margin, marginLevel float64 var lastUpdate time.Time err := rows.Scan(&accountNumber, &balance, &equity, &freeMargin, &margin, &marginLevel, &lastUpdate) if err != nil { log.Printf("Error scanning row: %v\n", err) continue } if marginLevel > 1000 { marginLevel = 1000 // 限制显示范围 } fmt.Printf("Account: %d, Balance: %.2f, Equity: %.2f, Free Margin: %.2f, Margin: %.2f, Level: %.2f%%, Updated: %s\n", accountNumber, balance, equity, freeMargin, margin, marginLevel, lastUpdate.Format(time.RFC3339)) } // 查询活跃交易订单 queryActiveOrders := ` select order_id, account_id, symbol, type, open_price, close_price, volume, stop_loss, take_profit, open_time, close_time FROM mt4_re.orders WHERE close_time > NOW() ORDER BY open_time DESC LIMIT 10 ` rows, err = pool.Query(ctx, queryActiveOrders) if err != nil { log.Printf("Failed to query active orders: %v", err) return } defer rows.Close() fmt.Println("\n=== Active Trading Orders ===") for rows.Next() { var orderID int64 var accountID int var symbol, orderType string var openPrice, closePrice, volume, stopLoss, takeProfit float64 var openTime, closeTime time.Time err := rows.Scan(&orderID, &accountID, &symbol, &orderType, &openPrice, &closePrice, &volume, &stopLoss, &takeProfit, &openTime, &closeTime) if err != nil { log.Printf("Error scanning row: %v\n", err) continue } fmt.Printf("Order ID: %d, Account: %d, Symbol: %s, Type: %s, Open: %.5f, Close: %.5f, Vol: %.2f, SL: %.5f, TP: %.5f, Opened: %s\n", orderID, accountID, symbol, orderType, openPrice, closePrice, volume, stopLoss, takeProfit, openTime.Format(time.RFC3339)) } // 查询最新市场报价 queryMarketData := ` select time, symbol, bid, ask, high, low, volume, uuid FROM mt4_ts.market_data ORDER BY time DESC LIMIT 10 ` rows, err = pool.Query(ctx, queryMarketData) if err != nil { log.Printf("Failed to query market data: %v", err) return } defer rows.Close() fmt.Println("\n=== Latest Market Data ===") for rows.Next() { var timestamp time.Time var symbol, uuid string var bid, ask, high, low float64 var volume int err := rows.Scan(×tamp, &symbol, &bid, &ask, &high, &low, &volume, &uuid) if err != nil { log.Printf("Error scanning row: %v\n", err) continue } fmt.Printf("Time: %s, Symbol: %s, Bid: %.5f, Ask: %.5f, H: %.5f, L: %.5f, Vol: %.2f, UUID: %s\n", timestamp.Format(time.RFC3339), symbol, bid, ask, high, low, volume, uuid) } // 查询历史订单盈亏统计 queryProfitStats := ` select symbol, COUNT(*) as total_trades, SUM(close_price - open_price) as total_profit, AVG(close_price - open_price) as avg_profit, SUM(commission) as total_commission, SUM(swap) as total_swap FROM mt4_ts.order_history GROUP BY symbol ORDER BY total_profit DESC LIMIT 10 ` rows, err = pool.Query(ctx, queryProfitStats) if err != nil { log.Printf("Failed to query profit statistics: %v", err) return } defer rows.Close() fmt.Println("\n=== Trading Profit Statistics by Symbol ===") for rows.Next() { var symbol string var totalTrades int var totalProfit, avgProfit, totalCommission, totalSwap float64 err := rows.Scan(&symbol, &totalTrades, &totalProfit, &avgProfit, &totalCommission, &totalSwap) if err != nil { log.Printf("Error scanning row: %v\n", err) continue } fmt.Printf("Symbol: %s, Trades: %d, Total Profit: %.2f, Avg Profit: %.2f, Commission: %.2f, Swap: %.2f\n", symbol, totalTrades, totalProfit, avgProfit, totalCommission, totalSwap) } } func main() { // 数据库连接配置 url := fmt.Sprintf("postgresql://%s@%s/%s?sslmode=verify-full&sslrootcert=%s&sslcert=%s&sslkey=%s", "root", "47.110.144.145:26257", "defaultdb", "./certs/ca.crt", "./certs/client.root.crt", "./certs/client.root.key") config, err := pgxpool.ParseConfig(url) if err != nil { log.Fatalf("解析连接配置失败: %v", err) } config.MaxConns = 10 // 创建连接池 ctx := context.Background() pool, err := pgxpool.ConnectConfig(ctx, config) if err != nil { log.Fatalf("连接数据库失败: %v", err) } defer pool.Close() // 运行查询 runQueries(ctx, pool) }
①. select子句:指定了查询结果中要包含的列,即account_number, balance, equity, free_margin, margin, margin_level, 和 last_update。
②. FROM子句:指定了查询的数据表,即mt4_re.account_status。
③. ORDER BY子句:指定了查询结果按照balance列进行降序排序(DESC)。
④. LIMIT子句:限制查询结果只返回前10条记录。
⑤. 执行计划树解析
distributed:表示查询是在分布式环境中执行的,true表示确实是在分布式环境中执行。
vectorized:表示查询是否使用了向量化执行,false表示没有使用向量化执行。
limit:表示查询结果将被限制为一定数量的记录,这里是10条记录。
sort:表示查询结果将按照某个字段进行排序,这里是按照balance字段的降序(-balance表示降序)。
scan:表示对account_status表进行扫描以获取数据,spans字段的FULL SCAN表示这是一个全表扫描。
⑥. 总结:这条SQL查询语句的作用是从mt4_re.account_status表中选取account_number, balance, equity, free_margin, margin, margin_level, 和 last_update列的数据,按照balance列的值进行降序排序,并只返回前10条记录。执行计划表明,查询是在分布式环境中执行的,没有使用向量化执行,且需要对account_status表进行全表扫描。
①. select子句:指定了查询结果中要包含的列,包括订单ID、账户ID、交易品种、订单类型、开仓价格、平仓价格、交易量、止损价格、止盈价格和开仓时间、平仓时间。
②. FROM子句:指定了查询的数据表,即mt4_re.orders。
③. WHERE子句:设置了查询条件,仅选择close_time(平仓时间)大于当前时间(NOW())的记录。
④. ORDER BY子句:指定了查询结果按照open_time(开仓时间)列进行降序排序。
⑤. LIMIT子句:限制查询结果只返回前10条记录。
⑥. 执行计划树解析:
distributed:表示查询是在分布式环境中执行的,true表示确实是在分布式环境中执行。
vectorized:表示查询是否使用了向量化执行,false表示没有使用向量化执行。
limit:表示查询结果将被限制为一定数量的记录,这里是10条记录。
sort:表示查询结果将按照某个字段进行排序,这里是按照open_time字段的降序(-open_time表示降序)。
scan:表示对orders表进行扫描以获取数据,spans字段的FULL SCAN表示这是一个全表扫描。
filter:表示在扫描过程中会应用一个过滤条件,即close_time > ‘2025-06-02 14:32:52.01576365+00:00’,这个时间戳是查询执行时的当前时间,表示只选择平仓时间晚于该时间的记录。
⑦. 该SQL查询语句旨在从mt4_re.orders表中选取符合条件的订单记录,包括订单ID、账户ID等关键信息。查询条件是平仓时间必须晚于当前时间,结果按照开仓时间降序排序,并只返回前10条记录。执行计划表明,查询是在分布式环境中执行的,没有使用向量化执行,且需要对orders表进行全表扫描,并在扫描过程中应用过滤条件。
①. select子句:指定了查询结果中要包含的列,包括时间(time)、交易品种(symbol)、买价(bid)、卖价(ask)、最高价(high)、最低价(low)、交易量(volume)和唯一标识符(uuid)。
②. FROM子句:指定了查询的数据表,即mt4_ts.market_data。
③. ORDER BY子句:指定了查询结果按照time列进行降序排序(DESC)。
④. LIMIT子句:限制查询结果只返回前10条记录。
⑤. 执行计划树解析
distributed:表示查询是在分布式环境中执行的,true表示确实是在分布式环境中执行。
vectorized:表示查询是否使用了向量化执行,false表示没有使用向量化执行。
limit:表示查询结果将被限制为一定数量的记录,这里是10条记录。此外,还指出查询引擎类型为时间序列(time series),并且使用了排序扫描(useSorterScan为true)。
sort:表示查询结果将按照某个字段进行排序,这里是按照time字段的降序("-"time表示降序)。
synchronizer:表示在排序之前,数据可能需要同步处理。
ts scan:表示对时间序列表market_data进行扫描以获取数据。
ts-table:指出扫描的是时间序列表。
access mode:表示访问模式为tableTableMeta,这可能指的是访问表元数据的方式。
⑥. 该SQL查询语句旨在从mt4_ts.market_data时间序列表中选取最新的10条市场数据记录,包括时间、交易品种、买卖价格、最高最低价和交易量等信息。查询结果按照时间降序排序。执行计划表明,查询是在分布式环境中执行的,没有使用向量化执行,且利用了时间序列引擎的排序扫描功能。在扫描时间序列表时,可能还涉及了表元数据的访问。整个查询过程高效且针对时间序列数据进行了优化。
<!DOCTYPE html> <html> <head> <meta charset="UTF-8"> <meta name="viewport" content="width=device-width, initial-scale=1.0"> <title>MT4交易数据监控</title> <script src="https://cdn.jsdelivr.net/npm/echarts@5.4.3/dist/echarts.min.js"></script> <style> body { margin: 0; font-family: Arial, sans-serif; background: #1a1a1a; color: #fff; } .container { padding: 20px; } .header { display: flex; justify-content: space-between; margin-bottom: 20px; } .card { background: #2a2a2a; border-radius: 8px; padding: 15px; margin-bottom: 20px; } .grid { display: grid; grid-template-columns: repeat(auto-fit, minmax(300px, 1fr)); gap: 20px; } .chart-container { height: 600px; background: #2a2a2a; border-radius: 8px; padding: 15px; } table { width: 100%; border-collapse: collapse; margin-top: 10px; } th, td { padding: 12px; text-align: left; border-bottom: 1px solid #3a3a3a; } th { background: #3a3a3a; } .positive { color: #00c853; } .negative { color: #ff3d00; } </style> </head> <body> <div> <div> <h1>MT4交易数据监控</h1> <div id="clock"></div> </div> <div> <div> <h2>账户余额TOP10</h2> <table id="accountTable"> <thead> <tr> <th>账号</th> <th>余额</th> <th>净值</th> <th>保证金水平</th> </tr> </thead> <tbody></tbody> </table> </div> <div> <h2>活跃交易订单</h2> <table id="ordersTable"> <thead> <tr> <th>订单号</th> <th>交易品种</th> <th>类型</th> <th>开仓价</th> <th>止损</th> <th>止盈</th> </tr> </thead> <tbody></tbody> </table> </div> </div> <div> <div id="klineChart" style="height: 100%;"></div> </div> </div> <script> // 初始化图表 const chart = echarts.init(document.getElementById('klineChart')); const basePrice = 3500; const data = []; let now = new Date(); let price = basePrice; for (let i = 0; i < 100; i++) { const open = price + Math.random() * 20 - 10; const close = open + Math.random() * 20 - 10; const low = Math.min(open, close) - Math.random() * 10; const high = Math.max(open, close) + Math.random() * 10; const volume = Math.round(Math.random() * 1000); data.push({ time: now.getTime(), open, close, low, high, volume }); now = new Date(now.getTime() - 60000); // 前一分钟 price = close; } return data.reverse(); } // 更新图表选项 function updateChartOption(data) { const option = { backgroundColor: '#2a2a2a', animation: false, legend: { top: 10, left: 'center', textStyle: { color: '#fff' } }, tooltip: { trigger: 'axis', axisPointer: { type: 'cross' }, backgroundColor: 'rgba(42, 42, 42, 0.8)', borderWidth: 0, textStyle: { color: '#fff' } }, grid: [ { left: '10%', right: '8%', height: '60%' }, { left: '10%', right: '8%', top: '75%', height: '15%' } ], xAxis: [ { type: 'category', data: data.map(item => new Date(item.time).toLocaleTimeString()), axisLine: { lineStyle: { color: '#8392A5' } } }, { type: 'category', gridIndex: 1, data: data.map(item => new Date(item.time).toLocaleTimeString()), axisLine: { lineStyle: { color: '#8392A5' } } } ], yAxis: [ { scale: true, splitLine: { show: false }, axisLine: { lineStyle: { color: '#8392A5' } }, textStyle: { color: '#fff' } }, { scale: true, gridIndex: 1, splitNumber: 2, axisLine: { lineStyle: { color: '#8392A5' } }, axisLabel: { show: false }, splitLine: { show: false } } ], dataZoom: [ { type: 'inside', xAxisIndex: [0, 1], start: 50, end: 100 } ], series: [ { name: 'K线', type: 'candlestick', data: data.map(item => [item.open, item.close, item.low, item.high]), itemStyle: { color: '#00c853', color0: '#ff3d00', borderColor: '#00c853', borderColor0: '#ff3d00' } }, { name: '成交量', type: 'bar', xAxisIndex: 1, yAxisIndex: 1, data: data.map(item => item.volume), itemStyle: { color: '#7fbe9e' } } ] }; chart.setOption(option); } // 生成模拟账户数据 function generateAccountData() { const accounts = []; for (let i = 0; i < 10; i++) { accounts.push({ accountNumber: Math.floor(Math.random() * 10000), balance: Math.round(Math.random() * 100000), equity: Math.round(Math.random() * 100000), marginLevel: Math.round(Math.random() * 1000) }); } return accounts; } // 生成模拟订单数据 function generateOrdersData() { const orders = []; const symbols = ['EURUSD', 'GBPUSD', 'USDJPY', 'AUDUSD']; const types = ['BUY', 'SELL']; for (let i = 0; i < 5; i++) { orders.push({ orderId: Math.floor(Math.random() * 1000000), symbol: symbols[Math.floor(Math.random() * symbols.length)], type: types[Math.floor(Math.random() * types.length)], openPrice: (Math.random() * 2 + 1).toFixed(5), stopLoss: (Math.random() * 2 + 1).toFixed(5), takeProfit: (Math.random() * 2 + 1).toFixed(5) }); } return orders; } // 更新账户表格 function updateAccountTable() { const accounts = generateAccountData(); const tbody = document.querySelector('#accountTable tbody'); tbody.innerHTML = accounts.map(account => ` <tr> <td>${account.accountNumber}</td> <td>${account.balance.toFixed(2)}</td> <td>${account.equity.toFixed(2)}</td> <td>${account.marginLevel.toFixed(2)}%</td> </tr> `).join(''); } // 更新订单表格 function updateOrdersTable() { const orders = generateOrdersData(); const tbody = document.querySelector('#ordersTable tbody'); tbody.innerHTML = orders.map(order => ` <tr> <td>${order.orderId}</td> <td>${order.symbol}</td> <td class="${order.type === 'BUY' ? 'positive' : 'negative'}">${order.type}</td> <td>${order.openPrice}</td> <td>${order.stopLoss}</td> <td>${order.takeProfit}</td> </tr> `).join(''); } // 更新时钟 function updateClock() { const clock = document.getElementById('clock'); clock.textContent = new Date().toLocaleString(); } // 初始化数据和更新 let klineData = generateKLineData(); updateChartOption(klineData); updateAccountTable(); updateOrdersTable(); updateClock(); // 定时更新数据 setInterval(() => { const lastData = klineData[klineData.length - 1]; const newData = { time: new Date().getTime(), open: lastData.close, close: lastData.close + Math.random() * 20 - 10, low: lastData.close - Math.random() * 10, high: lastData.close + Math.random() * 10, volume: Math.round(Math.random() * 1000) }; klineData.push(newData); klineData.shift(); updateChartOption(klineData); updateAccountTable(); updateOrdersTable(); updateClock(); }, 1000); // 响应窗口大小变化 window.addEventListener('resize', () => chart.resize()); </script> </body> </html>
KWDB在时序处理、多模融合、成本控制上的创新,高度契合金融行情系统对海量时序数据的实时处理需求。建议在非核心业务先行试点,重点验证高频写入稳定性及AI引擎预测准确性,逐步替代现有Redis+MySQL混合架构,构建新一代行情数据基座。
MT4业务作为典型时序场景(每秒高频报价),KWDB的时序引擎可显著优化K线计算、历史回测等操作;其分布式特性则直接解决流量激增带来的扩容瓶颈,降低服务器成本30%+(按截图推算)。
KWDB分布式多模数据库通过融合时序与关系型数据处理能力,完美解决企业面临的成本高、扩容难、迁移复杂等痛点,其千万级设备接入、百万级秒级写入的性能特别适合MT4行情系统的高频时序数据处理,同时原生AI支持可挖掘数据价值,建议通过非核心业务试点验证其压缩率(宣称5-30倍)和稳定性后逐步替代现有架构,实现降本增效。