DolphinDB 内置的流数据结构支撑流数据的发布、订阅、预处理、实时内存核算、杂乱目标的翻滚窗口核算等,是一个运转高效、运用快捷的流数据处理结构。

DolphinDB节点启动时的流计算自动订阅教程

本教程首要解决根据 DolphinDB 流数据处理结构完结事务代码开发后,怎么布置节点发动时的流核算主动订阅问题。

1. 事务代码开发

事务描述

以金融交易实时 Level2 的股票快照数据为例,完结以下事务需求:

(1)Level2 的股票快照数据实时写入数据库耐久化;

(2)根据原始数据的 LastPx、TotalVolumeTrade、TotalValueTrade 的值,处理出每一笔快照数据的一些中心值:如回报率(Return)、成交量(Volume)、成交金额(Amount);

(3)根据原始数据和处理的中心值生成一些分钟目标。

上述事务在 DolphinDB 中的处理流程如下图所示:

DolphinDB节点启动时的流计算自动订阅教程

处理流程图说明:

(1)snapshotStream、snapshotStreamProcess、snapshotAggr1min 都是同享的异步耐久化流数据表。snapshotStream 用于存储 Level2 的股票快照原始数据,snapshotStreamProcess 用于存储呼应式状况引擎处理后的包括中心值的成果数据,snapshotAggr1min 用于存储时刻序列引擎处理后的分钟目标数据。将内存表同享的目的是让当时节点一切其它会话对该表可见,API 写入时与 DolphinDB Server 的会话相对于界说这些表的会话不是同一个,所以需求同享。对流数据表的耐久化的首要目的有两个,一是操控该表的最大内存占用,经过设置 enableTableShareAndPersistence 函数中的 cacheSize 大小,操控该表在内存中保留的最大记录条数从而操控该表的最大内存占用;二是在节点反常封闭的情况下,从耐久化目录恢复现已写入流数据表但是未消费的数据,保证流数据“至少消费一次”的需求。流数据表耐久化采用异步的方式进行,能够有用进步流表写入的吞吐量。只有流数据表才能够被订阅消费,所以需求将这些表界说成流数据表。

(2)subExecutor 表明流数据处理线程,能够经过装备文件的 subExecutors 指定节点的最大可用流数据处理线程数。经过设置 subscribeTable 函数中的 hash 参数,指定消费 topic 的流数据处理线程。比如 subExecutor 设置为 n,则 hash 能够从 0 至 n-1 进行指定,对应流数据处理线程 1 至 n。

(3)呼应式状况引擎和时刻序列引擎是 DolphinDB 的内置的流核算引擎,都已完结增量核算。呼应式状况引擎处理了上述事务需求中的中心值,时刻序列引擎用于核算生成分钟目标。

(4)loadTable(“dfs://snapshot”, “snapshot”) 用于存储原始数据,做数据的耐久化。

事务代码

本教程事务代码根据 1.30.15 版别开发,1.30 一切版别都能够运转本教程的示例代码,具体代码内容见附件。

2. DolphinDB 体系发动流程

DolphinDB 体系的发动流程如下图所示:

DolphinDB节点启动时的流计算自动订阅教程

  • 体系初始化脚本(dolphindb.dos)
    体系初始化脚本是必需的,默许加载版别发布目录中的 dolphindb.dos,不建议做修改,因为版别晋级的时候需求用新版别发布包中的体系初始化脚本覆盖。

  • 用户发动脚本(startup.dos)
    用户发动脚本是经过装备参数 startup 后才会履行,单机 single 形式在 dolphindb.cfg 中装备,集群形式在 cluster.cfg 中装备,可装备绝对路径或相对路径。若装备了相对路径或许没有指定目录,体系会顺次查找本地节点的 home 目录、工作目录和可履行文件地点目录。
    装备举例如下:

    startup=/opt/DolphinDB/server/startup.dos

将上述事务代码添加到 /opt/DolphinDB/server 目录的 startup.dos 文件中,并在对应的装备文件中装备参数 startup,即可完结节点发动时的流核算主动订阅布置。

  • 守时使命脚本(postStart.dos)
    DolphinDB 中经过 scheduleJob 函数界说的守时使命是做耐久化的,所以在节点从头发动时,体系会在履行完用户发动脚本后在初始化守时使命模块时完结耐久化守时使命的加载。完结上述过程后,体系会履行守时使命脚本,此刻能够在守时使命脚本中调用 scheduleJob 函数界说新的守时使命。本教程中未运用该功用,所以不需求敞开该装备项。1.30.15 和 2.00.3 版别开端支撑装备 postStart.dos 完结节点发动主动履行守时使命脚本。上述事务代码添加到 /opt/DolphinDB/server 目录的 startup.dos 文件中,并在对应的装备文件中装备参数 startup,即可完结节点发动时的流核算主动订阅布置。

3 单节点 single 形式布置流核算主动订阅

装备文件 dolphindb.cfg

localSite=localhost:8848:local8848
mode=single
maxMemSize=128
maxConnections=512
workerNum=8
localExecutors=7
maxConnectionPerSite=15
newValuePartitionPolicy=add
webWorkerNum=8
dataSync=1
chunkCacheEngineMemSize=8
persistenceDir=/opt/DolphinDB/server/local8848/persistenceDir
startup=/opt/DolphinDB/server/startup.dos
maxPubConnections=64
subExecutors=7
subPort=8849
subThrottle=1
persistenceWorkerNum=1
lanCluster=0

发动 DolphinDB Server,履行下述语句调查运转日志 dolphindb.log 中近期的日志:

tail -1000f /opt/DolphinDB/server/dolphindb.log

调查到用户发动脚本中履行成功打印的日志信息:

2021-12-01 00:23:56.314159 <INFO> :---------------------------------------------------------------------
2021-12-01 00:23:56.314172 <INFO> :dfs://snapshot created successfully !
2021-12-01 00:23:56.314178 <INFO> :---------------------------------------------------------------------
2021-12-01 00:23:56.315084 <INFO> :Prepare to share a stream table: tableName=snapshotStream raftGroup=-1
2021-12-01 00:23:56.315132 <INFO> :enableTablePersistence tableName=snapshotStream hashValue=0 offset=0 cacheSize=5000000
2021-12-01 00:23:56.315163 <INFO> :---------------------------------------------------------------------
2021-12-01 00:23:56.315174 <INFO> :sharedTable1:snapshotStream created  successfully !
2021-12-01 00:23:56.315182 <INFO> :---------------------------------------------------------------------
2021-12-01 00:23:56.315512 <INFO> :Prepare to share a stream table: tableName=snapshotStreamProcess raftGroup=-1
2021-12-01 00:23:56.315534 <INFO> :enableTablePersistence tableName=snapshotStreamProcess hashValue=1 offset=0 cacheSize=5000000
2021-12-01 00:23:56.315549 <INFO> :---------------------------------------------------------------------
2021-12-01 00:23:56.315562 <INFO> :sharedTable2:snapshotStreamProcess created successfully !
2021-12-01 00:23:56.315569 <INFO> :---------------------------------------------------------------------
2021-12-01 00:23:56.315783 <INFO> :Prepare to share a stream table: tableName=snapshotAggr1min raftGroup=-1
2021-12-01 00:23:56.315806 <INFO> :enableTablePersistence tableName=snapshotAggr1min hashValue=2 offset=0 cacheSize=2000000
2021-12-01 00:23:56.315821 <INFO> :---------------------------------------------------------------------
2021-12-01 00:23:56.315833 <INFO> :sharedTable3:snapshotAggr1min created successfully !
2021-12-01 00:23:56.315840 <INFO> :---------------------------------------------------------------------
2021-12-01 00:23:56.316775 <INFO> :---------------------------------------------------------------------
2021-12-01 00:23:56.316793 <INFO> :ReactiveStateEngine:snapshotProcessing created successfully !
2021-12-01 00:23:56.316800 <INFO> :---------------------------------------------------------------------
2021-12-01 00:23:56.316852 <INFO> :Begin to subscription topic=localhost:24110:local24110/snapshotStream/snapshotProcessing
2021-12-01 00:23:56.316888 <INFO> :Enable reconnection for topic=localhost:24110:local24110/snapshotStream/snapshotProcessing site=local24110:1
2021-12-01 00:23:56.316915 <INFO> :[subscribeTable] #attempt=0 topic=localhost:24110:local24110/snapshotStream/snapshotProcessing conn=
2021-12-01 00:23:56.316940 <INFO> :Received a request to publish table [snapshotStream] to site localhost:24111.Offset=-1
2021-12-01 00:23:56.317229 <INFO> :Subscription topic=localhost:24110:local24110/snapshotStream/snapshotProcessing hashValue=0
2021-12-01 00:23:56.317252 <INFO> :---------------------------------------------------------------------
2021-12-01 00:23:56.317259 <INFO> :subscribe1:snapshotStream subscribed successfully !
2021-12-01 00:23:56.317264 <INFO> :---------------------------------------------------------------------
2021-12-01 00:23:56.318486 <INFO> :Begin to subscription topic=localhost:24110:local24110/snapshotStreamProcess/snapshotAggr1min
2021-12-01 00:23:56.318531 <INFO> :Enable reconnection for topic=localhost:24110:local24110/snapshotStreamProcess/snapshotAggr1min site=local24110:1
2021-12-01 00:23:56.318555 <INFO> :[subscribeTable] #attempt=0 topic=localhost:24110:local24110/snapshotStreamProcess/snapshotAggr1min conn=
2021-12-01 00:23:56.318574 <INFO> :Received a request to publish table [snapshotStreamProcess] to site localhost:24111.Offset=-1
2021-12-01 00:23:56.318844 <INFO> :Subscription topic=localhost:24110:local24110/snapshotStreamProcess/snapshotAggr1min hashValue=1
2021-12-01 00:23:56.318871 <INFO> :---------------------------------------------------------------------
2021-12-01 00:23:56.318883 <INFO> :subscribe2:snapshotStreamProcess subscribed successfully !
2021-12-01 00:23:56.318891 <INFO> :---------------------------------------------------------------------
2021-12-01 00:23:56.318942 <INFO> :Begin to subscription topic=localhost:24110:local24110/snapshotStream/snapshotToDatabase
2021-12-01 00:23:56.318968 <INFO> :Enable reconnection for topic=localhost:24110:local24110/snapshotStream/snapshotToDatabase site=local24110:1
2021-12-01 00:23:56.318996 <INFO> :[subscribeTable] #attempt=0 topic=localhost:24110:local24110/snapshotStream/snapshotToDatabase conn=
2021-12-01 00:23:56.319011 <INFO> :Received a request to publish table [snapshotStream] to site localhost:24111.Offset=-1
2021-12-01 00:23:56.319042 <INFO> :Subscription topic=localhost:24110:local24110/snapshotStream/snapshotToDatabase hashValue=2
2021-12-01 00:23:56.319058 <INFO> :---------------------------------------------------------------------
2021-12-01 00:23:56.319065 <INFO> :subscribe3:snapshotStream subscribed successfully !
2021-12-01 00:23:56.319071 <INFO> :---------------------------------------------------------------------

假如没有 error 信息提示,且看到用户发动脚本成功履行到了最终一行代码,打印了 subscribe3:snapshotStream subscribed successfully ! 的信息,说明用户发动脚本成功履行,流核算主动订阅布置成功。

经过 GUI 衔接相应的数据节点,履行登录代码:

login("admin", "123456")

能够在右下角的变量栏调查到耐久化流表 snapshotStream、snapshotStreamProcess、snapshotAggr1min 现已成功界说加载:

DolphinDB节点启动时的流计算自动订阅教程

履行流数据发布表订阅状信息询函数:

getStreamingStat().pubTables

回来一切订阅信息:

DolphinDB节点启动时的流计算自动订阅教程

履行呼应式状况引擎信息询函数:

getStreamEngineStat().ReactiveStreamEngine

回来信息:

DolphinDB节点启动时的流计算自动订阅教程

履行时刻序列引擎信息询函数:

getStreamEngineStat().TimeSeriesEngine

回来信息:

DolphinDB节点启动时的流计算自动订阅教程

此刻,只需求将实时 Level2 的股票快照数据经过 API 写入流数据表 snapshotStream,就会按照上述的事务处理逻辑对流数据进行实时处理。

4. DolphinDB 数据回放东西

DolphinDB 供给了快捷的历史数据回放东西,首要是经过 replay 函数完结历史数据的回放功用。

本教程回放的对象是一个内存表数据,包括了 5 只股票某天的历史 Level2 快照数据,csv 文本文件(见附件)的存储路径为:/data/snapshot/stockData.csv,回放的代码如下,在时刻维度加快了 1000 倍进行回放:

//replay
filePath = "/data/snapshot/stockData.csv"
schema = table(loadTable("dfs://snapshot", "snapshot").schema().colDefs.name as name, loadTable("dfs://snapshot", "snapshot").schema().colDefs.typeString as type)
snapshot = loadText(filename=filePath, schema=schema)
replay(inputTables=snapshot, outputTables=snapshotStream, dateColumn=`Date, timeColumn=`Time, replayRate=1000, absoluteRate=true)

大约 26 s 今后,5 只股票某天的历史数据回放结束,履行下述代码查看某只股票的分钟处理目标:

select * from snapshotAggr1min where SecurityID="000001.SZ"

查询成果回来至客户端:

DolphinDB节点启动时的流计算自动订阅教程

一起能够履行下述代码,查询写入数据库的耐久化数据的条数:

select count(*) from loadTable("dfs://snapshot", "snapshot")

查询成果回来至客户端:

DolphinDB节点启动时的流计算自动订阅教程

调查下图后可知,本次回放的 snapshot 内存表的数据条数是 24909 条,说明写入 snapshotStream 流数据表的一切数据全部被写入数据库完结了数据耐久化,未产生数据丢掉。

DolphinDB节点启动时的流计算自动订阅教程

5. 总结

本教程首要根据一个金融交易实时 Level2 的股票快照数据流核算案例,结合事务逻辑处理代码具体叙述了节点发动时的流核算主动订阅布置过程,以及在开发布置过程中的常用调试手段,比如运用 writeLog 函数打印代码调试日志、合理运用 go 语句分段解析并履行代码块、调查节点运转日志判断用户发动脚本的履行情况、经过流数据状况信息查询函数承认订阅信息和流核算引擎的界说情况等。目的是为了进步开发人员在运用 DolphinDB 内置的流数据结构开发流核算事务场景时的开发功率、下降开发难度,一起下降开发结束的流核算事务代码的主动化布置和运维难度,更好地挖掘 DolphinDB 在杂乱实时流核算场景中的价值。

附件

事务代码

csv 文本文件