分布式计算
本节讲述并行函数调用,远程函数调用,并行远程调用,以及pipeline函数。
并行函数调用
DolphinDB可以将大型任务分为多个小任务同时执行。
并行函数调用通常会使用以下两个高阶函数: peach 或 ploop。peach和ploop分别是each 和 loop的并行版本。有关each和loop的区别,请参考 loop。
以下是3种并行函数调用的场景:
(1) 相同的函数,但是多组不同参数。
$ peach(log, (1..3, 4..6));
#0 |
#1 |
---|---|
0 |
1.386294 |
0.693147 |
1.609438 |
1.098612 |
1.791759 |
$ ploop(log, (1..3, 4..6));
$ ([0,0.693147,1.098612],[1.386294,1.609438,1.791759])
(2) 不同函数,但参数相同。
$ peach(call{, 3 4 5}, (log, sum));
log |
sum |
---|---|
1.098612 |
12 |
1.386294 |
12 |
1.609438 |
12 |
$ ploop(call{, 3 4 5}, (log, sum));
$ ([1.098612,1.386294,1.609438],12)
注意在上面的例子中,我们不能写成peach((log, sum), 3 4 5)。这是因为高阶函数的第一个参数必须是函数名,而不是函数名数组。要在peach或ploop中调用多个函数,我们需要使用高阶函数 call.
(3) 不同的函数和不同的参数。
$ x=[log, exp];
$ y=[1 2 3, 4 5 6];
$ peach(call, x, y);
#0 |
#1 |
---|---|
0 |
54.59815 |
0.693147 |
148.413159 |
1.098612 |
403.428793 |
$ ploop(call, x, y);
$ ([0,0.693147,1.098612],[54.59815,148.413159,403.428793])
DolphinDB通过多线程支持并行计算。假设有n个任务和m个本地执行线程。(关于本地执行线程,请参考分布式计算的概念)。调用工作线程生成n个子任务并将n*m / (1+m)个子任务推送到本地执行线程的任务队列。剩余的n / (1+m) 个子任务将由工作线程执行。在执行n个子任务之后,工作线程将各个结果合并以产生最终结果。
要使用并行函数调用,我们需要确保配置文件里设置的本地执行线程数是一个正整数。
如果在子任务中启动并行函数调用,系统会抛出异常,因为它可能会导致死锁。如果我们在子任务中启动并行函数调用,则系统会将这些新的子任务分配给本地执行线程,但是当我们首次启动并行函数调用时(当n>1+m时),所有的本地执行线程都被分配了子任务。由于本地执行线程一次只能处理一个任务,所以我们可能会遇到本地执行线程有自相矛盾的工作流优先级,从而无法执行子任务。
如果系统配置文件中的本地执行线程的数量设置为正整数,则某些内置函数和命令已启用并行函数调用,例如: peach, ploop, pnodeRun, ploadText 和 loadText。
远程函数调用
远程函数调用是分布式系统常用的功能之一。DolphinDB的远程函数调用最强大的功能是,我们可以把调用了其他本地函数(内置函数或用户定义函数)的本地函数发送到远程节点即时运行,无需编译或者部署。系统自动把函数定义和所有相关函数的定义以及所需的本地数据序列化,并发送到远程节点。其他一些系统不能远程调用与用户定义函数相关的函数。
我们可以通过 xdb 函数创建节点之间的连接。如果要关闭连接,可以使用以下三种方法:
(1) 调用 close 函数。
(2) 将另一个对象(通常为NULL)绑定到当前连接句柄。
(3) 连接将在当前会话关闭时自动关闭。
我们可以使用 remoteRun / remoteRunWithCompression 和 rpc 函数来远程调用函数。它们的区别是:
与 remoteRun 相比,remoteRunWithCompression 对传输数据进行了压缩。
rpc 利用集群中数据节点/计算节点之间现有的异步连接;remoteRun / remoteRunWithCompression 使用 xdb 函数创建的显式连接。
rpc 的调用节点和远程节点必须在同一集群;remoteRun / remoteRunWithCompression 没有这样的限制。
以下是3种类型的远程调用:
(1) 在远程节点上执行脚本。
语法: remoteRun(conn, “script”) / remoteRunWithCompression(conn, “script”) 或 conn(“script”)
脚本要用双引号,也就是字符串形式。
$ conn = xdb("localhost",81);
$ remoteRun(conn, "x=rand(1.0,10000000); y=x pow 2; avg y");
0.333254
(2) 在远程节点上执行远程函数。该函数在远程节点上定义,而参数位于本地节点上。
语法: remoteRun(conn, “functionName”, param1, param2, …) / remoteRunWithCompression(conn, “functionName”, param1, param2, …) 或 conn(“function name”, param1, param2, …)
functionName 指定的函数可以是内置的或用户自定义的函数,且需要使用反引号,双引号或单引号。
$ conn = xdb("localhost",81);
$ remoteRun(conn, "avg", rand(1.0,10000000) pow 2);
0.333446
(3) 在远程节点上执行本地函数。本地函数是指在本地节点中定义,可以是内置函数或用户定义的函数,也可以是命名函数或匿名函数。该函数的参数也位于本地节点上。
DolphinDB 的远程函数调用最强大的功能是,可以把调用了其他本地函数(内置函数或用户定义函数)的本地函数发送到远程节点即时运行,无需编译或者部署。系统自动把函数定义和所有相关函数的定义以及所需的本地数据序列化,并发送到远程节点。其他一些系统不能远程调用与用户定义函数相关的函数。
语法:remoteRun(conn, functionName, param1, param2, …) / remoteRunWithCompression(conn, functionName, param1, param2, …) 或 conn(functionName, param1, param2, …)
functionName 指定的函数可以是内置函数或调用节点上的用户定义函数。
语法: rpc(nodeAlias, functionName, param1, param2, …)
functionName 指定的函数可以是内置函数或调用节点上的用户定义函数。
例子1:远程调用用户定义函数操作本地数据集
假设本地节点有一个表 EarningsDates ,表有2列:股票代码和日期。表中的每一个股票都对应2006年第三季度公布盈利的日期。IP地址为”localhost”,端口号为8081的远程节点上有一个表 USPrices ,该表包含了美国所有股票每天的价格。我们希望在宣布收益后的一周内,从远程节点获取 EarningsDates 表中的所有股票的价格。
在远程节点,我们导入数据文件创建表 USPrices ,然后在所有会话上共享为 sharedUSPrices :
$ USPrices = loadText("c:/DolphinDB/Data/USPrices.csv");
$ share USPrices as sharedUSPrices;
当我们创建远程节点的连接时,远程节点将为此连接创建一个新的会话。此新会话与远程节点上的其他会话完全隔离。这对于开发来说很方便,因为开发人员不必担心名称冲突。然而,在这种情况下,我们希望在同一节点上的多个会话之间共享数据。我们可以使用命令 share 来共享对象。目前只能在DolphinDB中共享表。
在本地节点,我们创建一个表EarningsDates,并将该表与脚本发送到远程节点。执行完毕后,结果将返回到本地节点。
$ EarningsDates=table(`XOM`AAPL`IBM as TICKER, 2006.10.26 2006.10.19 2006.10.17 as date)
$ def loadDailyPrice(data){
$ dateDict = dict(data.TICKER, data.date)
$ return select date, TICKER, PRC from objByName("sharedUSPrices") where dateDict[TICKER]<date<=dateDict[TICKER]+7
$ }
$ conn = xdb("localhost",8081)
$ prices = conn(loadDailyPrice, EarningsDates);
$ prices;
date |
TICKER |
PRC |
---|---|---|
2006.10.27 |
XOM |
71.46 |
2006.10.30 |
XOM |
70.84 |
2006.10.31 |
XOM |
71.42 |
2006.11.01 |
XOM |
71.06 |
2006.11.02 |
XOM |
71.19 |
2006.10.18 |
IBM |
89.82 |
2006.10.19 |
IBM |
89.86 |
2006.10.20 |
IBM |
90.48 |
2006.10.23 |
IBM |
91.56 |
2006.10.24 |
IBM |
91.49 |
2006.10.20 |
AAPL |
79.95 |
2006.10.23 |
AAPL |
81.46 |
2006.10.24 |
AAPL |
81.05 |
2006.10.25 |
AAPL |
81.68 |
2006.10.26 |
AAPL |
82.19 |
例子2:远程调用自定义函数
$ def jobDemo(n){
$ s = 0
$ for (x in 1 : n) {
$ s += sum(sin rand(1.0, 100000000)-0.5)
$ print("iteration " + x + " " + s)
$ }
$ return s
$ };
使用 remoteRun 函数远程调用:
$ conn = xdb("DFS_NODE2")
$ conn.remoteRun(submitJob, "jobDemo", "job demo", jobDemo, 10);
Output: jobDemo4
$ conn.remoteRun(getJobReturn, "jobDemo")
Output: 4238.832005
使用 rpc 函数远程调用:
在使用remoteRun的时候,需要注意死锁的问题。比如下面这个例子,当前节点为localhost:8080,执行以下脚本。
$ def testRemoteCall() {
$ h=xdb("localhost", 8080)
$ return h.remoteRun("1+2")
$ }
$ h = xdb("localhost", 8081)
$ h.remoteRun(testRemoteCall)
端口为8080的节点会把本地定义的函数testRemoteCall发送到端口号为8081的节点,端口号为8081的节点会把”1+2”的脚本发送到端口号为8080的节点执行。当节点接收到一个作业时,它会安排工作线程来执行这个作业。从8080到8081的远程调用和”1+2”脚本都在端口号为8080的节点上执行,这可能会安排同一个工作线程来执行。如果这两个作业共用同一个工作线程,就会发生死锁。
并行远程调用
如果远程调用处于阻塞模式,那么直到远程节点完成函数调用,结果才会返回。并行远程调用可以解决这个问题。并行远程调用需要 remoteRun 与 ploop 或 peach 一起使用。在以下示例中,高阶函数 each 在端口号为8081的节点上执行用户定义函数 experimentPi ,然后在端口号为8082的节点上执行;而高阶函数 peach 在端口号分别为8081和8082的节点上同时执行函数 experimentPi 。peach 比 each 节省相当多的时间。
$ def simuPi(n){
$ x=rand(1.0, n)
$ y=rand(1.0, n)
$ return 4.0 * sum(x*x + y*y<=1.0) / n
$ }
$ def experimentPi(repeat, n): avg each(simuPi, take(n, repeat));
$ // 创建两个连接
$ conns = each(xdb, "localhost", 8081 8082);
$ conns;
$ ("Conn[localhost:8081:1166953221]","Conn[localhost:8082:1166953221]")
$ timer result = each(remoteRun{, experimentPi, 10, 10000000}, conns);
Time elapsed: 6579.82 ms
$ timer result = peach(remoteRun{, experimentPi, 10, 10000000}, conns);
Time elapsed: 4122.29 ms
// 并行计算节省更多时间
$ print avg(result)
3.141691
// 关闭两个连接
$ each(close, conns);
在并行远程调用中使用 remoteRun ,我们需要使用 xdb 函数与每个远程节点建立连接。我们可以使用 pnodeRun 远程调用同一集群的节点。
语法:pnodeRun(function, [nodes], [addNodeToResult])
function:调用的本地函数,可以是内置函数或自定义函数。它必须是无参数的,可以是一个没有定义参数的函数,也可以是没有参数的封装了初始函数和参数的部分应用。
nodes:节点的别名。它是可选参数。如果没有指定,系统会调用集群中所有的活数据节点/计算节点。
addNodeToResult:是否在结果中加入节点的别名。它是可选择的。默认值是true。如果返回的结果已经包含节点的别名,我们可以把它设置成false。
pnodeRun 在多个节点上并行调用本地函数,然后把结果整合。
在下列例子中,我们把函数*sum*和参数1..10封装成了部分应用sum{1..10}。
$ pnodeRun(sum{1..10}, `nodeA`nodeB);
$ Output:
$ Node Value
$ DFS_NODE2 55
$ DFS_NODE3 55
pnodeRun 对于集群管理极其方便。例如,在一个集群中有4个节点:”DFS_NODE1”, “DFS_NODE2”, “DFS_NODE3”和”DFS_NODE4”。在每个节点上执行以下脚本:
$ def jobDemo(n){
$ s = 0
$ for (x in 1 : n) {
$ s += sum(sin rand(1.0, 100000000)-0.5)
$ print("iteration " + x + " " + s)
$ }
$ return s
$ };
$ submitJob("jobDemo1","job demo", jobDemo, 10);
$ submitJob("jobDemo2","job demo", jobDemo, 10);
$ submitJob("jobDemo3","job demo", jobDemo, 10);
查看集群中每个节点最近完成的两个批处理作业的状态:
$ pnodeRun(getRecentJobs{2});
Node |
UserID |
JobID |
JobDesc |
ReceivedTime |
StartTime |
EndTime |
ErrorMsg |
---|---|---|---|---|---|---|---|
DFS_NODE4 |
root |
jobDemo2 |
job demo |
2017.11.21T15:40:22.026 |
2017.11.21T15:40:22.027 |
2017.11.21T15:40:43.103 |
|
DFS_NODE4 |
root |
jobDemo3 |
job demo |
2017.11.21T15:40:22.027 |
2017.11.21T15:40:22.037 |
2017.11.21T15:40:43.115 |
|
DFS_NODE1 |
root |
jobDemo2 |
job demo |
2017.11.21T15:39:48.087 |
2017.11.21T15:39:48.088 |
2017.11.21T15:40:03.714 |
|
DFS_NODE1 |
root |
jobDemo3 |
job demo |
2017.11.21T15:39:48.088 |
2017.11.21T15:39:48.089 |
2017.11.21T15:40:03.767 |
|
DFS_NODE2 |
root |
jobDemo2 |
job demo |
2017.11.21T15:39:58.788 |
2017.11.21T15:39:58.788 |
2017.11.21T15:40:14.114 |
|
DFS_NODE2 |
root |
jobDemo3 |
job demo |
2017.11.21T15:39:58.788 |
2017.11.21T15:39:58.791 |
2017.11.21T15:40:14.178 |
|
DFS_NODE3 |
root |
jobDemo2 |
job demo |
2017.11.21T15:40:16.945 |
2017.11.21T15:40:16.945 |
2017.11.21T15:40:38.466 |
|
DFS_NODE3 |
root |
jobDemo3 |
job demo |
2017.11.21T15:40:16.945 |
2017.11.21T15:40:16.947 |
2017.11.21T15:40:38.789 |
$ pnodeRun(getRecentJobs{2}, `DFS_NODE3`DFS_NODE4);
Node |
UserID |
JobID |
JobDesc |
ReceivedTime |
StartTime |
EndTime |
ErrorMsg |
---|---|---|---|---|---|---|---|
DFS_NODE3 |
root |
jobDemo2 |
job demo |
2017.11.21T15:40:16.945 |
2017.11.21T15:40:16.945 |
2017.11.21T15:40:38.466 |
|
DFS_NODE3 |
root |
jobDemo3 |
job demo |
2017.11.21T15:40:16.945 |
2017.11.21T15:40:16.947 |
2017.11.21T15:40:38.789 |
|
DFS_NODE4 |
root |
jobDemo2 |
job demo |
2017.11.21T15:40:22.026 |
2017.11.21T15:40:22.027 |
2017.11.21T15:40:43.103 |
|
DFS_NODE4 |
root |
jobDemo3 |
job demo |
2017.11.21T15:40:22.027 |
2017.11.21T15:40:22.037 |
2017.11.21T15:40:43.115 |
pnodeRun 合并多个节点的结果时遵循以下规则:
(1) 如果函数返回一个标量:
返回一个表,它具有两列:节点别名和函数结果。
紧接上面的例子:
$ pnodeRun(getJobReturn{`jobDemo1})
Output:
Node Value
DFS_NODE3 2,123.5508
DFS_NODE2 (42,883.5404)
DFS_NODE1 3,337.4107
DFS_NODE4 (2,267.3681)
(2) 如果函数返回一个向量:
返回一个矩阵。矩阵中的每一列是函数在节点上返回的结果。矩阵的列标签是节点。
(3) 如果函数返回键-值形式的字典:
返回一个表,每行代表函数在一个节点上的结果。
(4) 如果函数返回一个表:
返回一个表,它是多个节点上的表的合并。
(5) 如果函数一个命令(该命令不返回任何内容)
不返回任何内容。
(6) 对于其他情况:
返回一个字典。键是节点别名,值是函数的返回内容。
分布式计算
DolphinDB提供了mr和imr函数,用户只需要指定分布式数据源和核心函数即可编写自己的分布式应用。
数据源
数据源是一种特殊类型的数据对象,包含有关数据实体的以下信息:
1. 数据实体的元描述。通过执行数据源,我们可以获得诸如表,矩阵,向量等数据实体。在DolphinDB的分布式计算框架中,轻量级数据源对象而不是大数据实体被传递到远程站点进行计算,这大大减少了网络流量。
2. 执行的位置。数据源可以有0,1或多个位置。位置为0的数据源是本地数据源。在多个位置的情况下,这些位置互为备份。系统会随机选择一个位置执行分布式计算。当数据源被指示缓存数据对象时,系统会选择我们上次成功检索数据的位置。
3. 指示系统缓存数据或清除缓存的属性。对于迭代计算算法(例如机器学习算法),数据缓存可以大大提高计算性能。当系统内存不足时,缓存数据将被清除。如果发生这种情况,系统可以恢复数据,因为数据源包含所有元描述和数据转换函数。
4. 一个数据源对象还可以包含多个数据转换函数,用以进一步处理所检索到的数据。系统会依次执行这些数据转换函数,一个函数的输出作为下一个函数的输入(和唯一的输入)。将数据转换函数包含在数据源中通常比包含核心计算操作在数据源更有效。虽然如果检索到的数据仅需要一次的时候没有性能差异,但它对于具有缓存数据对象的数据源的迭代计算会造成巨大的差异。如果转换操作在核心计算单元中,则每次迭代都需要执行转换; 如果转换操作在数据源中,则它们只被执行一次。
相关函数/命令
1. 函数 sqlDS 根据输入的SQL元代码创建一个数据源列表。如果SQL查询中的数据表有n个分区,sqlDS会生成n个数据源。如果SQL查询不包含任何分区表,sqlDS将返回一个包含数据源的元组。
语法: sqlDS(metaCode)
参数是SQL元代码。有关元代码的更多详细信息,请参考 元编程。
$ db = database("dfs://DBSeq",RANGE,`A`F`Z);
$ USPrices = loadTextEx(db, "USPrices",`TICKER ,"/home/DolphinDB/Data/USPrices.csv");
$ ds = sqlDS(<select log(SHROUT*(BID+ASK)/2) as SBA from USPrices where VOL>0>);
$ typestr ds;
ANY VECTOR
$ size ds;
2
$ ds[0];
DataSource< select [15] log(SHROUT * (BID + ASK) / 2) as SBA from USPrices where VOL > 0 [partition = /DBSeq/A_F/40r] >
$ ds[1];
DataSource< select [15] log(SHROUT * (BID + ASK) / 2) as SBA from USPrices where VOL > 0 [partition = /DBSeq/F_Z/40r] >
2. 函数 transDS! 将数据转换函数添加到数据源或数据源列表。
语法: transDS!(ds, func)
3. 函数 cacheDS! 指示系统缓存数据源。它返回true或false表示此操作是否成功。
语法: cacheDS!(ds)
4. 函数 clearDSCache! 指示系统在下次执行数据源之后清除缓存。
语法: clearDSCache!(ds)
5. 函数 cacheDSNow 立即执行并缓存数据源,并返回缓存行的总数。
语法: cacheDSNow(ds)
6. 函数 clearDSCacheNow 立即清除数据源和缓存。
语法: clearDSCacheNow(ds)
Map-Reduce
Map-Reduce函数是DolphinDB通用分布式计算框架的核心功能。
语法: mr(ds, mapFunc, [reduceFunc], [finalFunc], [parallel=true])
ds: 数据源列表。该必需参数必须是元组,元组的每个元素都是数据源对象。即使只有一个数据源,我们仍然需要一个元组来包装数据源。
mapFunc: map函数。它只接受一个参数,它是相应数据源的物化数据实体。如果我们希望map函数接受除了物化数据源之外的更多参数,我们可以使用 部分应用 将多参数函数转换为一个参数的函数。map函数调用的次数是数据源的数量。map函数返回一个常规对象(标量,对,数组,矩阵,表,集合或字典)或一个元组(包含多个常规对象)。
reduceFunc: 二元reduce函数组合了两个map函数的调用结果。在大多数情况下,reduce函数不重要。一个例子是加法函数,reduce函数是可选的。如果没有指定reduce函数,则系统将所有单独的map调用结果返回到最终函数。
finalFunc: final函数,只接受一个参数。该函数的输入是最后一个reduce函数的输出。如果未指定,系统将返回所有map函数调用结果。
parallel: 指示是否在本地并行执行map函数的可选布尔标志。默认值为true,即启用并行计算。当有非常有限的可用内存和每个map调用需要大量的内存时,我们可以禁用并行计算以防止内存不足问题。我们也可能要禁用并行选项以确保线程安全。例如,如果多个线程同时写入同一个文件,则可能会发生错误。
以下是分布式线性回归的示例。X是自变量的矩阵,y是因变量。X和Y存储在多个数据源中。为了估计最小二乘参数,我们需要计算X T X和X T y. 我们可以从每个数据源计算(X T X, X T y)的元组,然后将所有数据源的结果相加,以获得整个数据集的X T X和X T y.
$ def myOLSMap(table, yColName, xColNames, intercept){
$ if(intercept)
$ x = matrix(take(1.0, table.rows()), table[xColNames])
$ else
$ x = matrix(table[xColNames])
$ xt = x.transpose()
$ return xt.dot(x), xt.dot(table[yColName])
$ }
$ def myOLSFinal(result){
$ xtx = result[0]
$ xty = result[1]
$ return xtx.inv().dot(xty)[0]
$ }
$ def myOLSEx(ds, yColName, xColNames, intercept){
$ return mr(ds, myOLSMap{, yColName, xColNames, intercept}, +, myOLSFinal)
$ }
在上面的例子中,我们定义了map函数和final函数。实践中,我们也可为数据源定义转换函数。这些功能仅需在本地实例中定义,用户不需要编译它们或将其部署到远程实例。DolphinDB的分布式计算框架可以为最终用户快速处理这些复杂的问题。
作为经常使用的分析工具,分布式最小二乘线性回归已经在我们的核心库中实现。内置版本( olsEx)提供更多功能。
迭代计算
迭代计算是一种常用的计算方法。许多机器学习方法和统计模型使用迭代算法来估计模型参数。
DolphinDB提供了基于Map-Reduce函数的迭代计算函数 imr. 每次迭代使用上一次迭代的结果和输入数据集。每次迭代的输入数据集不变,因此可以缓存。迭代计算需要模型参数的初始值和终止标准。
语法: imr(ds, initValue, mapFunc, [reduceFunc], [finalFunc], terminateFunc, [carryover=false])
ds: 数据源列表。它必须是每个元素为数据源对象的元组。即使只有一个数据源,我们仍然需要一个元组来包装数据源。在迭代计算中,数据源自动缓存,缓存将在最后一次迭代后被清除。
initValue: 模型参数估计的初始值。初始值的格式必须与最终函数的输出相同。
mapFunc: map函数。它有两个参数。第一个参数是由相应数据源表示的数据实体。第二个参数是前一次迭代中最终函数的输出,这是对模型参数的更新估算。对于第一次迭代,它是用户给出的初始值。
reduceFunc: 二元reduce函数组合了两个一元函数调用的结果。如果有M个map调用,reduce函数将被调用M-1次。在大多数情况下,reduce函数不重要。一个例子是加法函数。reduce函数是可选的。
finalFunc: 每次迭代的最终函数。它接受两个参数。第一个参数是前一次迭代中最终函数的输出。对于第一次迭代,它是用户给出的初始值。第二个参数是调用reduce函数后的输出。如果没有指定reduce函数,第二个参数是各个map调用结果组成的元组。
terminateFunc: 这是一个确定计算是否继续的函数,或是迭代的指定次数。终止函数接受两个参数。第一个是前一次迭代中reduce函数的输出,第二个是当前迭代中reduce函数的输出。如果函数返回true,迭代将结束。
carryover:布尔值,表示map函数调用是否生成一个传递给下一次map函数调用的对象。默认值为false。如果carryover为true,那么map函数有3个参数并且最后一个参数为携带的对象,同时map函数的输出结果是一个元组,最后一个元素为携带的对象。在第一次迭代中,携带的对象为NULL。
现在我们使用分布式中位数计算的例子来说明函数imr。假设数据分布在多个节点上,我们想计算所有节点之间的变量的中位数。首先,把每个数据源的数据放入不同的桶中,并使用map函数对每个数据桶中的数据进行计数。然后使用reduce函数合并来自多个数据源的桶的计数,最后找到包含中位数的桶。在下一次迭代中,所选择的桶被分为更小的桶。当所选择的桶的长度不超过指定的数量时,迭代完成。
$ def medMap(data, range, colName){
$ return bucketCount(data[colName], double(range), 1024, true)
$ }
$ def medFinal(range, result){
$ x= result.cumsum()
$ index = x.asof(x[1025]/2.0)
$ ranges = range[1] - range[0]
$ if(index == -1)
$ return (range[0] - ranges*32):range[1]
$ else if(index == 1024)
$ return range[0]:(range[1] + ranges*32)
$ else{
$ interval = ranges / 1024.0
$ startValue = range[0] + (index - 1) * interval
$ return startValue : (startValue + interval)
$ }
$ }
$ def medEx(ds, colName, range, precision){
$ termFunc = def(prev, cur): cur[1] - cur[0] <= precision
$ return imr(ds, range, medMap{,,colName}, +, medFinal, termFunc).avg()
$ }
pipeline函数
pipeline 函数通过多线程优化符合如下条件的任务:
(1) 可分解为多个子任务。
(2) 每个子任务包含多个步骤。
(3) 第i个子任务的第k个步骤必须在第i个子任务的第k-1个步骤以及第i-1个子任务的第k个步骤完成后才能执行。
下例中,需要把分区表stockData转换成一个csv文件。该表包含了2008年到2018年的数据,超过了系统的可用内存,因此不能把整个表加载到内存后,再转换成csv文件。可把任务分为多个子任务,每个子任务包含两个步骤:加载一个月的数据到内存,然后将这些数据存储到csv文件中。每个月的数据存储到csv文件中时,必须保证该月数据已加载到内存,并且上个月的数据已经存储到csv文件中。
$ v = 2000.01M..2018.12M
$ def queryData(m){
$ return select * from loadTable("dfs://stockDB", "stockData") where TradingTime between datetime(date(m)) : datetime(date(m+1))
$ }
$ def saveData(tb){
$ tb.saveText("/hdd/hdd0/data/stockData.csv",',', true)
$ }
$ pipeline(each(partial{queryData}, v),saveData);