haStreamTable

语法

haStreamTable(raftGroup, table, tableName, cacheLimit, [keyColumn], [retentionMinutes=1440])

参数

raftGroup 是一个大于1的整数,表示Raft组的ID。

table 是一个表对象。它必须是一个由table函数创建的空表。

tableName 是一个字符串,表示高可用流数据表的名称。

cacheLimit 是一个整数,表示高可用流数据表在内存中最多保留多少行。如果cacheLimit是小于100,000的正整数,它会被自动调整为100,000。

keyColumn 是一个字符串标量或向量,表示主键。它是一个可选参数。

retentionMinutes 是一个整数,表示保留大小超过 1GB 的 log 文件的时间(从文件的最后修改时间开始计算),单位是分钟。默认值是1440,即一天。

详情

创建高可用流数据表。该函数只能在启用流数据高可用后使用。要启用流数据高可用,用户需要在集群配置文件cluster.cfg中指定配置参数streamingHAMode和streamingRaftGroups。系统启动时,配置参数streamingRaftGroup指定的数据节点/计算节点组成Raft组,一个数据节点/计算节点作为Leader,其他数据节点/计算节点作为Follower。Raft组的每个/计算节点上都有流数据表的副本。

客户端只需订阅Raft组中任意一个数据节点/计算节点上的高可用流数据表,并启用订阅的自动重连功能,即把reconnect参数设置为true。Leader上的高可用流数据表会向客户端发布数据。如果Raft组中的Leader宕机,系统会选举出新的Leader继续发布数据,客户端会自动切换订阅到新的Leader上的高可用流数据表。

一个Raft组可以包含多个高可用流数据表。

例子

假设配置参数streamingRaftGroup=11:NODE1:NODE2:NODE3,在Raft组的任意一个节点如NODE1上执行以下脚本创建高可用流数据表trades:

$ colNames = `timestamp`sym`qty`price
$ colTypes = [TIMESTAMP,SYMBOL,INT,DOUBLE]
$ t=table(1:0,colNames,colTypes)
$ haStreamTable(11,t,`trades,100000);

在集群的另外一个节点NODE4上执行以下脚本订阅表trades,把订阅的数据保存至分布式数据库中。

$ if(existsDatabase("dfs://stock")){
$    dropDatabase("dfs://stock")
$ }
$ db=database(directory="dfs://stock",partitionType=VALUE,partitionScheme=2018.08.01..2019.12.30)
$ t=table(1:0,`timestamp`sym`qty`price,[TIMESTAMP,SYMBOL,INT,DOUBLE])
$ trades_slave=db.createPartitionedTable(table=t,tableName=`trades_slave,partitionColumns=`timestamp);
$ subscribeTable(server=NODE2,tableName=`trades,actionName=`sub_trades,offset=-1,handler=append!{trades_slave},msgAsTable=true,batchSize=1000,throttle=1,hash=-1,reconnect=true);

//这里subscribeTable函数的第一个参数可以是NODE1,NODE2,NODE3中的任意一个,reconnect参数必须为true。

在NODE4上执行以下脚本取消订阅:

$ unsubscribeTable(server=NODE2,tableName=`trades,actionName=`sub_trades);
//这里unsubscribeTable函数的第一个参数可以是NODE1,NODE2,NODE3中的任意一个。

相关函数:dropStreamTable, getStreamingLeader, getStreamingRaftGroups