streamFilter
New in version 1.30.17.
详情
创建流数据过滤引擎,对输入引擎的数据进行过滤和分发处理,返回一个表对象。
该引擎的处理流程包含以下步骤:
1. 输入数据为异构流表(由 replay 函数回放得到)时,解析(反序列化)数据。输入数据为普通流表时无此步骤。注意:1.30.18 版本开始,streamFilter 函数支持过滤普通流表数据,并进行分发处理。
2. 根据 filter 指定的规则对流表进行过滤。
3. 过滤后的数据按照时间顺序分发到不同的 handler 处理。
注意:
1.30.17 版本开始,DolphinDB 支持通过 replay 函数将多个结构不同的流数据表,回放(序列化)到一个流表里,这个流表被称为异构流表。streamFilter 函数用于解析(反序列化)和过滤异构流表的数据并进行分发处理。
语法
streamFilter(name, dummyTable, filter, [msgSchema], [timeColumn], [conditionColumn])
参数
name 字符串标量,表示流数据过滤引擎的名称,可包含字母,数字和下划线,但必须以字母开头。
dummyTable 表对象,和订阅的流数据表的 schema 一致,可以含有数据,亦可为空表。
filter 单个字典或包含多个字典的 tuple。对注入引擎的数据进行过滤和处理。
字典可选的 key-value 说明如下:
‘timeRange’:可选字段,一个数据对或包含多个数据对的 tuple。以指定的 timeRange 对 timeColumn 进行过滤。注意:若分发普通流表,则 timeRange 必须和 timeColumn 的数据类型一致;异构流表无该限制。
‘condition’:表示过滤条件。
若分发异构流表,则 ‘condition’ 是一个字符串,且必须是 replay 参数 inputTables 的键值(表的标识)。用于过滤出异构表中具有 condition 标识的数据。
若分发普通流表,则 ‘condition’ 可以是一个字符串标量/向量,或者是包含一个或多个布尔表达式的元代码。其中表达式中可以使用内置函数,但不支持部分应用。请注意,若 ‘condition’ 指定为字符串标量/向量,必须为 conditionColumn 指定列的元素。
‘handler’:一元函数或表对象(包括流数据引擎返回的表对象)。
如果它是一元函数,它唯一的参数是经过解析和过滤后的数据表。
如果它是表对象,经过解析和过滤后的数据会直接插入到该表中。
msgSchema 可选字段,字典。
若分发异构流表,其结构与 replay 回放到异构流表的数据表结构保持一致。引擎将根据 msgSchema 指定的结构对注入的数据解析;
若分发普通流表,该参数必须设置为空。
以下参数仅在分发普通流表时指定:
timeColumn 字符串,指定为 dummyTable 中时间列的名称。若不指定,则取 dummyTable 第一列的列名。
conditionColumn 字符串,指定为 dummyTable 中的列名。注意 conditionColumn 只能指定 STRING 或 SYMBOL 类型的列。若不指定该参数,则 filter 的 condition 字段将失效。
例子
(1) 异构流表分发:
一个典型场景:对分布式数据库中的订单表和交易表进行回放,模拟两者实时 asof join 的场景。如使用 replay 进行 N 对 N 回放,则无法保证数据严格按照时间戳 顺序注入 asof join 引擎的左表和右表(原因详见 replay 说明)。但通过 replay 将两表回放到一个异构流表则可以保证表内所有数据按照时间戳顺序排序, 之后通过 streamFilter 将表中数据分发至 asof join 引擎的左、右表时,可保证数据按时间戳顺序注入引擎左、右两表。
//创建 order 表
$ n=1000
$ sym = take(take("IBM",n).join(take("GS",n)), n*2*3)
$ date=take(2022.01.04..2022.01.06, n*2*3).sort!()
$ timestamp1=take(2022.01.04 09:30:00.000+rand(1000,n),n) join take(2022.01.04 09:31:00.000+rand(1000,n),n)
$ timestamp2=take(2022.01.05 09:30:00.000+rand(1000,n),n) join take(2022.01.05 09:31:00.000+rand(1000,n),n)
$ timestamp3=take(2022.01.06 09:30:00.000+rand(1000,n),n) join take(2022.01.06 09:31:00.000+rand(1000,n),n)
$ timestamp=timestamp1 join timestamp2 join timestamp3
$ volume = rand(100, n*2*3)
$ t=table(sym,date,timestamp,volume)
$ if(existsDatabase("dfs://test_order")){
$ dropDatabase("dfs://test_order")
$ }
$ db1_or=database("",RANGE, 2022.01.04..2022.01.07)
$ db2_or=database("",VALUE,`IBM`GS)
$ db_or=database("dfs://test_order",COMPO,[db1_or, db2_or])
$ orders=db_or.createPartitionedTable(t,`orders,`date`sym)
$ orders.append!(t);
$ select count(*) from orders
6000
//创建trades表
$ n=2000
$ sym = take(take("IBM",n).join(take("GS",n)), n*2*3)
$ date=take(2022.01.04..2022.01.06, n*2*3).sort!()
$ timestamp1=take(2022.01.04 09:30:00.000+rand(1000,n),n) join take(2022.01.04 09:31:00.000+rand(1000,n),n)
$ timestamp2=take(2022.01.05 09:30:00.000+rand(1000,n),n) join take(2022.01.05 09:31:00.000+rand(1000,n),n)
$ timestamp3=take(2022.01.06 09:30:00.000+rand(1000,n),n) join take(2022.01.06 09:31:00.000+rand(1000,n),n)
$ timestamp=timestamp1 join timestamp2 join timestamp3
$ volume = rand(100, n*2*3)
$ price = rand(50.0, n*3) join rand(20.0, n*3)
$ t=table(sym,date,timestamp,volume,price)
$ if(existsDatabase("dfs://test_trades")){
$ dropDatabase("dfs://test_trades")
$ }
$ db1=database("",RANGE, 2022.01.04..2022.01.07)
$ db2=database("",VALUE,`IBM`GS)
$ db=database("dfs://test_trades",COMPO,[db1, db2])
$ trades=db.createPartitionedTable(t,`trades,`date`sym)
$ trades.append!(t);
$ select count(*) from trades
12000
//准备回放的数据源和输出表
$ ds_or = replayDS(sqlObj=<select * from loadTable(db_or, `orders)>, dateColumn=`date, timeColumn=`timestamp)
$ ds = replayDS(sqlObj=<select * from loadTable(db, `trades)>, dateColumn=`date, timeColumn=`timestamp)
$ input_dict=dict(["orders","trades"], [ds_or, ds])
$ share streamTable(100:0,`timestamp`sym`blob`volume, [TIMESTAMP,SYMBOL, BLOB, INT]) as opt
//订阅异构流表回放数据到分配引擎
$ share streamTable(100:0,`timestamp`sym`blob`volume, [TIMESTAMP,SYMBOL, BLOB, INT]) as streamFilterOpt
$ share streamTable(100:0, `sym`date`timestamp`volume, [SYMBOL, DATE, TIMESTAMP, INT] ) as streamOrders
$ share streamTable(100:0, `sym`date`timestamp`volume`price, [SYMBOL, DATE, TIMESTAMP, INT, DOUBLE] ) as streamTrades
$ streamOpt=table(100:0, `timestamp`sym`volume`price`result, [TIMESTAMP, SYMBOL, INT, DOUBLE, DOUBLE])
$ filter1=dict(STRING,ANY)
$ filter1['condition']=`orders
$ filter1['timeRange']=09:30:00.000:09:30:00.005
$ filter2=dict(STRING,ANY)
$ filter2['condition']=`trades
$ filter2['timeRange']=09:30:00.000:09:30:00.005
$ ajEngine=createAsofJoinEngine(name="ajEngine", leftTable=streamOrders, rightTable=streamTrades, outputTable=streamOpt, metrics=<[volume,price,price*volume]>, matchingColumn=`sym, useSystemTime=true)
$ filter1['handler']=getLeftStream(ajEngine)
$ filter2['handler']=getRightStream(ajEngine)
$ schema=dict(["orders","trades"], [streamOrders, streamTrades])
$ engine=streamFilter(name=`streamFilter,dummyTable=streamFilterOpt, filter=[filter1,filter2],msgSchema=schema)
$ subscribeTable(tableName="opt", actionName="sub1", offset=0, handler=engine, msgAsTable=true)
//结构不同的两个表数据回放到opt
$ replay(inputTables=input_dict,outputTables=opt, timeColumn=`timestamp)
$ select count(*) from streamOpt
20
//取消订阅
unsubscribeTable(tableName="opt", actionName="sub1")
dropStreamEngine(`streamFilter)
dropStreamEngine(`ajEngine)
(2) 普通流表分发:
下例,将普通流表(trades)数据注入分发引擎中,根据过滤条件进行分发处理。
$ n=20
$ sym = symbol(take(`A`B`C,n))
$ name = string(rand(1..10,n))
$ date = temporalAdd(2012.12.06,0..(n-1),'d')
$ time = temporalAdd(09:30:00.000,0..(n-1),'ms')
$ vol = 100+take(1..8,20)
$ t = table(date,time,sym,name,vol)
//定义两个流数据引擎,作为分发引擎 handler
$ share streamTable(100:0,`date`time`sym`name`vol,[DATE,TIME,SYMBOL,STRING,INT]) as st1
$ share streamTable(100:0,`date`time`sym`name`vol,[DATE,TIME,SYMBOL,STRING,INT]) as st2
$ share streamTable(100:0,`date`time`sym`name`vol,[DATE,TIME,SYMBOL,STRING,INT]) as st3
//handler为引擎时的输入流表
$ share streamTable(100:0,`time`sym`sum_vol,[TIME,SYMBOL,INT]) as output1
$ share streamTable(100:0,`time`avg_vol,[TIME,INT]) as output2
$ engine1=createTimeSeriesEngine(name="timeEngine", windowSize=3, step=3, metrics=<[sum(vol)]>, dummyTable=st3, outputTable=output1, timeColumn=`time, useSystemTime=false, keyColumn=`sym, garbageSize=50)
$ engine2=createReactiveStateEngine(name="reactiveEngine", metrics=<[mavg(vol, 3)]>, dummyTable=st1, outputTable=output2, keyColumn=`sym)
//需要注入分发引擎的流表
$ share streamTable(100:0,`date`time`sym`name`vol,[DATE,TIME,SYMBOL,STRING,INT]) as trades
//设置第一个过滤条件,将 trades 表中 sym 列对应元素为 A 且时间在09:30:00.001:09:30:00.010和09:29:00.000:09:30:00.000的数据输入状态引擎进行处理
$ filter1 = dict(STRING,ANY)
$ filter1['condition']=`A
$ filter1['handler']=engine2
$ filter1['timeRange']=(09:30:00.001:09:30:00.010,09:29:00.000:09:30:00.000)
//设置第二个过滤条件,将 trades 表时间范围在09:30:00.002:09:30:00.005里的数据全部输出到st2表里
$ filter2 = dict(STRING,ANY)
$ filter2['handler']=st2
$ filter2['timeRange']=09:30:00.002:09:30:00.005
//设置第三个过滤条件,将 trades 表中 sym 列对应元素为 A 和 C 的数据输入时序引擎进行处理
$ filter3 = dict(STRING,ANY)
$ filter3['condition']=`C`A
$ filter3['handler']=engine1
//订阅分发引擎,根据以上三个过滤条件,对 trades 表中的数据进行分发处理
$ streamFilter2=streamFilter(name="streamFilterDemo",dummyTable=trades,filter=[filter1,filter2,filter3], timeColumn=`time, conditionColumn=`sym)
$ subscribeTable(tableName="trades", actionName="sub1", offset=0, handler=streamFilter2, msgAsTable=true)
$ trades.append!(t)
$ select * from output1
time |
sym |
sum_vol |
---|---|---|
09:30:00.003 |
A |
101 |
09:30:00.003 |
C |
103 |
09:30:00.006 |
A |
104 |
09:30:00.006 |
C |
106 |
09:30:00.009 |
A |
107 |
09:30:00.009 |
C |
101 |
09:30:00.012 |
A |
102 |
09:30:00.012 |
C |
104 |
09:30:00.015 |
A |
105 |
09:30:00.015 |
C |
107 |
09:30:00.018 |
A |
108 |
$ select * from output2
time |
avg_vol |
---|---|
00:00:00.001 |
|
00:00:00.001 |
|
00:00:00.001 |
104 |
00:00:00.001 |
104 |
$ select * from st2
date |
time |
sym |
name |
vol |
---|---|---|---|---|
2012.12.08 |
09:30:00.002 |
C |
6 |
103 |
2012.12.09 |
09:30:00.003 |
A |
8 |
104 |
2012.12.10 |
09:30:00.004 |
B |
10 |
105 |
2012.12.11 |
09:30:00.005 |
C |
10 |
106 |
2012.12.12 |
09:30:00.006 |
A |
10 |
107 |
2012.12.13 |
09:30:00.007 |
B |
1 |
108 |
2012.12.14 |
09:30:00.008 |
C |
3 |
101 |
2012.12.15 |
09:30:00.009 |
A |
4 |
102 |
2012.12.16 |
09:30:00.010 |
B |
9 |
103 |
可以为 condition 指定表达式,以支持更复杂的过滤逻辑,将上例中 filter2 的 condition 指定为条件表达式,同时对 vol 和 date 列进行过滤。
$ filter2 = dict(STRING,ANY)
$ filter2['condition'] = <sym==`A and 101<vol<105 and date<2012.12.15>
$ filter2['handler'] = st2
$ filter2['timeRange'] = 09:30:00.002:09:30:00.010
$ select * from st2
date |
time |
sym |
name |
vol |
---|---|---|---|---|
2012.12.09 |
09:30:00.003 |
A |
7 |
104 |