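DolphinDB Plugin Development Tutorial

This page is a tutorial article on DolphinDB plugin development. It also carries basic information such as the author and the publication date.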

Source: https://dolphindb.cn/blogs/57

What this page covers

DolphinDB Plugin Development Tutorial

This section gives the article title along with metadata such as the author and the publication date.

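Tutorial overview and the plugin development scenarios covered

This section introduces how DolphinDB supports plugins and lists the plugin development scenarios that the tutorial covers.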

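1. How to develop a plugin

This section explains the basic concepts and steps of plugin development, including function types, creating and managing variables, exception handling, and how to call built-in functions from a plugin.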
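
The argument-validation and exception pattern can be illustrated with the factorial function that the tutorial uses as its first example. What follows is a minimal standalone sketch, not the tutorial's code: it takes a plain `int` instead of a `ConstantSP`, and `std::invalid_argument` stands in for DolphinDB's `IllegalArgumentException` so that it compiles without the plugin headers. The upper bound of 20 is chosen here because 20! is the largest factorial that fits in a signed 64-bit integer (21! exceeds 2^63 - 1).

```cpp
#include <cassert>
#include <cstdint>
#include <stdexcept>

// Largest n whose factorial fits in a signed 64-bit integer (21! overflows).
constexpr int kMaxFactorialInput = 20;

// Plain C++ stand-in for the plugin function. In a real plugin the argument
// would arrive as a ConstantSP and a failed check would throw
// IllegalArgumentException; std::invalid_argument plays that role here.
std::int64_t factorial(int n) {
    if (n < 0 || n > kMaxFactorialInput) {
        throw std::invalid_argument("factorial: n must be an integer in [0, 20]");
    }
    std::int64_t result = 1;
    for (int i = 2; i <= n; ++i) {
        result *= i;
    }
    assert(result >= 1);  // holds because the range check rules out overflow
    return result;
}
```

Validating before computing keeps the failure mode explicit: an out-of-range input raises an exception instead of silently returning a wrapped-around value.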

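2. How to develop plugin functions for time-series data processing

This section uses the msum example to show how a time-series window calculation is implemented, emphasizing buffered vector access and NULL handling.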
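
The two-loop structure of a moving sum (accumulate the first window, then slide by adding the incoming value and subtracting the outgoing one) can be sketched in standalone C++. This is an illustration under assumptions rather than the tutorial's implementation: it works on `std::vector<double>` instead of DolphinDB vectors and buffers, the name `movingSum` is hypothetical, and `kNull` is a local stand-in for the NULL sentinel that the plugin API exposes as `DBL_NMIN`.

```cpp
#include <cassert>
#include <cstddef>
#include <limits>
#include <vector>

// Local stand-in for the double NULL sentinel (assumed to be the most
// negative finite double, mirroring the role of DBL_NMIN in the plugin API).
const double kNull = -std::numeric_limits<double>::max();

// Moving sum over a sliding window. NULL inputs are skipped, and the first
// window - 1 outputs are NULL because the window is not yet full.
std::vector<double> movingSum(const std::vector<double>& x, std::size_t window) {
    std::vector<double> result(x.size(), kNull);
    if (window == 0 || window > x.size()) {
        return result;  // no complete window: every output stays NULL
    }
    double sum = 0.0;
    // Loop 1: accumulate the first full window.
    for (std::size_t i = 0; i < window; ++i) {
        if (x[i] != kNull) sum += x[i];
    }
    result[window - 1] = sum;
    // Loop 2: slide the window by dropping the oldest value and adding the newest.
    for (std::size_t i = window; i < x.size(); ++i) {
        if (x[i - window] != kNull) sum -= x[i - window];
        if (x[i] != kNull) sum += x[i];
        result[i] = sum;
    }
    assert(result.size() == x.size());  // output always matches the input length
    return result;
}
```

For the input 1, 2, 3, 4, 5 with a window of 3, the sketch yields NULL, NULL, 6, 9, 12.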

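3. How to develop aggregate functions for distributed SQL

This section outlines how to implement an aggregate-function plugin for distributed SQL, including the differences between vector storage modes, the choice of access method, and how to assemble the distributed version from a script in MapReduce style.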
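
The reason a log-sum aggregate distributes well is that per-partition log sums and counts can simply be added before the final exponentiation. The standalone sketch below illustrates that decomposition for the geometric mean; it uses `std::vector` partitions rather than DolphinDB data sources, and the function names are chosen for illustration only.

```cpp
#include <cassert>
#include <cmath>
#include <cstddef>
#include <vector>

// Map step: sum of natural logarithms over one partition.
double logSum(const std::vector<double>& partition) {
    double sum = 0.0;
    for (double v : partition) {
        sum += std::log(v);
    }
    return sum;
}

// Reduce step: add the per-partition log sums and counts, then exponentiate
// the mean log to obtain the geometric mean of all values.
double geometricMean(const std::vector<std::vector<double>>& partitions) {
    double totalLogSum = 0.0;
    std::size_t totalCount = 0;
    for (const auto& partition : partitions) {
        totalLogSum += logSum(partition);
        totalCount += partition.size();
    }
    assert(totalCount > 0);  // the mean is undefined for empty input
    return std::exp(totalLogSum / static_cast<double>(totalCount));
}
```

Splitting the values 1, 2, 4 across two partitions or keeping them in one gives the same result, approximately 2, which is the property the distributed version relies on.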

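3.3 Random access to big arrays

This section explains how to use bit operations on a big array to compute the segment index and the offset within that segment, which makes random access possible.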
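
When every segment holds a power-of-two number of elements, the segment index is the element index shifted right by the number of bits, and the in-segment offset is the index masked with the segment size minus one. The sketch below demonstrates this with a hypothetical `SegmentedArray` class built on `std::vector`; it is not DolphinDB's big array, only a model of the same addressing scheme.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical model of a segmented ("big") array whose segments each hold
// 2^segmentSizeInBit elements.
class SegmentedArray {
public:
    SegmentedArray(std::size_t size, int segmentSizeInBit)
        : bits_(segmentSizeInBit),
          mask_((std::size_t{1} << segmentSizeInBit) - 1),
          size_(size) {
        const std::size_t segmentSize = std::size_t{1} << bits_;
        const std::size_t segmentCount = (size + segmentSize - 1) >> bits_;
        segments_.assign(segmentCount, std::vector<double>(segmentSize, 0.0));
    }

    // Which segment holds the element: index / segmentSize via a right shift.
    std::size_t segmentOf(std::size_t index) const { return index >> bits_; }

    // Position inside that segment: index % segmentSize via a bit mask.
    std::size_t offsetOf(std::size_t index) const { return index & mask_; }

    double& at(std::size_t index) {
        assert(index < size_);
        return segments_[segmentOf(index)][offsetOf(index)];
    }

private:
    int bits_;
    std::size_t mask_;
    std::size_t size_;
    std::vector<std::vector<double>> segments_;
};
```

With 4-element segments (2 bits), index 9 resolves to segment 2, offset 1.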

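3.4 Which vector access method to choose

This section gives guidance on choosing a vector access method: buffered access is more general, while direct access to the underlying storage can be used in specific scenarios where the storage mode and data type are known.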
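
Buffered access works regardless of how the vector is stored because the caller only ever sees a fixed-size chunk. The standalone sketch below models that loop. `readChunk` and `kBufSize` are hypothetical stand-ins (for a buffered getter and a buffer-size constant respectively), and the buffer is kept deliberately tiny so that several iterations occur.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <vector>

// Deliberately small buffer size for illustration (hypothetical stand-in for
// a library-defined buffer-size constant).
constexpr std::size_t kBufSize = 4;

// Hypothetical stand-in for a buffered getter: copies len values starting at
// start into buf and returns a read-only pointer to them.
const double* readChunk(const std::vector<double>& v, std::size_t start,
                        std::size_t len, double* buf) {
    std::copy(v.data() + start, v.data() + start + len, buf);
    return buf;
}

// Sums a vector by reading it chunk by chunk through a fixed-size buffer.
double bufferedSum(const std::vector<double>& v) {
    double buf[kBufSize];
    double total = 0.0;
    std::size_t start = 0;
    while (start < v.size()) {
        const std::size_t len = std::min(kBufSize, v.size() - start);
        assert(len <= kBufSize);  // a chunk never exceeds the buffer
        const double* chunk = readChunk(v, start, len, buf);
        for (std::size_t i = 0; i < len; ++i) {
            total += chunk[i];
        }
        start += len;
    }
    return total;
}
```

The calling code never needs to know whether the source is contiguous or segmented, which is why this style is the more general choice.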

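4. How to develop plugin functions that support new distributed algorithms

This section shows how to write custom functions such as map, reduce, and final in C++ and call DolphinDB's mr/imr to implement a distributed algorithm (a column-average example).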
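
The map/reduce/final split for a column average can be sketched without the DolphinDB runtime: map turns one partition into a (sum, count) pair, reduce adds two pairs, and final divides. In the sketch below a partition is modeled as a `std::map` from column name to values, the sequential loop in `columnAvg` stands in for what `mr` would schedule across nodes, and `kNull` is a local stand-in for the double NULL sentinel. All of these are assumptions made for illustration.

```cpp
#include <cassert>
#include <cstddef>
#include <limits>
#include <map>
#include <string>
#include <vector>

const double kNull = -std::numeric_limits<double>::max();  // stand-in NULL sentinel

using Partition = std::map<std::string, std::vector<double>>;

struct SumCount {
    double sum;
    std::size_t count;
};

// Map: sum the requested columns of one partition and count non-NULL values.
SumCount columnAvgMap(const Partition& part, const std::vector<std::string>& colNames) {
    SumCount out{0.0, 0};
    for (const auto& name : colNames) {
        for (double v : part.at(name)) {
            if (v != kNull) {
                out.sum += v;
                ++out.count;
            }
        }
    }
    return out;
}

// Reduce: element-wise addition of two partial results.
SumCount columnAvgReduce(const SumCount& a, const SumCount& b) {
    return {a.sum + b.sum, a.count + b.count};
}

// Final: turn the combined (sum, count) into the average.
double columnAvgFinal(const SumCount& total) {
    assert(total.count > 0);  // the average is undefined without any values
    return total.sum / static_cast<double>(total.count);
}

// Sequential driver standing in for the distributed mr call.
double columnAvg(const std::vector<Partition>& partitions,
                 const std::vector<std::string>& colNames) {
    SumCount total{0.0, 0};
    for (const auto& part : partitions) {
        total = columnAvgReduce(total, columnAvgMap(part, colNames));
    }
    return columnAvgFinal(total);
}
```

Because reduce is a plain addition, partial results can be combined in any order, which is what makes the computation safe to distribute.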

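5. How to develop plugin functions for stream data processing

This section gives an example plugin for a stream subscription handler: it extracts fields by index from a subscribed message and writes them to a table. It also describes the append interface and the optional error-log configuration.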
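
The handler logic (pick columns from the message by index, then append them and report success or an error message) can be modeled in standalone C++. `SimpleTable`, `Column`, and `handleMessage` below are hypothetical types and names for illustration. The `append` member only imitates the shape of an interface that returns a flag and fills in the inserted row count or an error message, and does not reproduce DolphinDB's table class.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

using Column = std::vector<double>;

// Hypothetical column-oriented table with an append that reports the number
// of inserted rows on success or an error message on failure.
struct SimpleTable {
    std::vector<Column> columns;

    bool append(const std::vector<Column>& values, std::size_t& insertedRows,
                std::string& errMsg) {
        if (values.size() != columns.size()) {
            errMsg = "column count mismatch";
            return false;
        }
        const std::size_t rows = values.empty() ? 0 : values[0].size();
        for (const auto& v : values) {
            if (v.size() != rows) {
                errMsg = "columns differ in length";
                return false;
            }
        }
        for (std::size_t c = 0; c < columns.size(); ++c) {
            columns[c].insert(columns[c].end(), values[c].begin(), values[c].end());
        }
        insertedRows = rows;
        return true;
    }
};

// Handler: select the message columns named by indices and append them.
bool handleMessage(const std::vector<Column>& msg,
                   const std::vector<std::size_t>& indices,
                   SimpleTable& table, std::string& errMsg) {
    std::vector<Column> selected;
    for (std::size_t idx : indices) {
        if (idx >= msg.size()) {
            errMsg = "index out of range";
            return false;
        }
        selected.push_back(msg[idx]);
    }
    std::size_t insertedRows = 0;
    const bool ok = table.append(selected, insertedRows, errMsg);
    // On success the reported row count equals the length of the appended columns.
    assert(!ok || insertedRows == (selected.empty() ? 0 : selected[0].size()));
    return ok;
}
```

Returning the error through `errMsg` rather than throwing lets the caller decide whether to log the failure and keep consuming messages.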

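6. How to develop plugin functions for external data sources

This section discusses the key issues for an external data source plugin (data source, schema, IO) and, using a binary flat file as the example, covers schema extraction, data loading, batched import, and data source generation.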
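
Decoding one 32-byte row of the example flat file (id, symbol, date, value, 8 bytes each) can be sketched as follows. The sketch makes two assumptions that should be checked against the real file: multi-byte fields are stored most significant byte first, and the date holds one decimal digit per byte, which is how the sample row in the Facts Index reads. `MyDataRow` and `parseRow` are hypothetical names, and the result is a plain struct rather than DolphinDB columns.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <string>

struct MyDataRow {
    std::int64_t id;
    std::string symbol;
    int year;
    int month;
    int day;
    double value;
};

// Assembles 8 bytes, most significant first, into a 64-bit unsigned integer.
std::uint64_t readBigEndian64(const unsigned char* p) {
    std::uint64_t v = 0;
    for (int i = 0; i < 8; ++i) {
        v = (v << 8) | p[i];
    }
    return v;
}

// Decodes one 32-byte row: id | symbol | date | value (8 bytes each).
MyDataRow parseRow(const unsigned char* row) {
    assert(row != nullptr);
    MyDataRow out;
    out.id = static_cast<std::int64_t>(readBigEndian64(row));

    // symbol: C string padded with zero bytes to 8 bytes
    const char* sym = reinterpret_cast<const char*>(row + 8);
    std::size_t len = 0;
    while (len < 8 && sym[len] != '\0') ++len;
    out.symbol.assign(sym, len);

    // date: assumed to be one decimal digit per byte, YYYYMMDD
    const unsigned char* d = row + 16;
    out.year = d[0] * 1000 + d[1] * 100 + d[2] * 10 + d[3];
    out.month = d[4] * 10 + d[5];
    out.day = d[6] * 10 + d[7];

    // value: IEEE 754 double, reinterpreted from its 64-bit pattern
    const std::uint64_t bits = readBigEndian64(row + 24);
    static_assert(sizeof(double) == sizeof(bits), "double must be 64 bits");
    std::memcpy(&out.value, &bits, sizeof(out.value));
    return out;
}
```

Applied to the sample bytes, the sketch decodes id 5, symbol IBM, date 2019-03-13, and value 10.1.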

Facts Index

| Entity | Attribute | Value | Confidence |
| --- | --- | --- | --- |
| DolphinDB Plugin Development Tutorial | publication_date | 2021-08-05 | high |
| DolphinDB Plugin Development Tutorial | author | Junxi | high |
| DolphinDB | supports | Dynamic loading of external plugins to extend system functionality | high |
| DolphinDB plugin | implementation_language | C++ | high |
| DolphinDB plugin | compiled_artifact | Shared library file: .so or .dll | high |
| Tutorial scope | includes | Developing plugin functions for time-series data processing | high |
| Tutorial scope | includes | Developing aggregate functions for distributed SQL processing | high |
| Tutorial scope | includes | Developing plugin functions supporting new distributed algorithms | high |
| Tutorial scope | includes | Developing plugin functions for stream data processing | high |
| Tutorial scope | includes | Developing plugin functions supporting external data sources | high |
| DolphinDB plugin function | callable_in | Can be called from scripts | high |
| Operator function (plugin) | parameter_count_limit | Accepts at most 2 parameters | high |
| System function (plugin) | parameter_count_limit | Can accept any number of parameters and supports session access operations | high |
| Operator function (plugin) | C++_prototype | ConstantSP (const ConstantSP& a, const ConstantSP& b) | high |
| Operator function (plugin) | parameter_semantics | With 2 parameters: a is the first argument, b is the second; with 1 parameter: b is a placeholder; with 0 parameters: both are placeholders | high |
| System function (plugin) | C++_prototype | ConstantSP (Heap* heap, vector<ConstantSP>& args) | high |
| System function (plugin) | argument_passing | User-passed arguments are stored in order in the C++ vector args; heap is not passed by the user | high |
| ConstantSP | represents | Most DolphinDB objects (scalar, vector, matrix, table, etc.) | high |
| VectorSP / TableSP | relationship_to_ConstantSP | Common types derived from ConstantSP (vector and table types) | high |
| ConstantSP | memory_management | Encapsulated smart pointer that frees memory automatically when the reference count reaches 0; users need not manually delete the variables they create | high |
| Scalar creation in plugin C++ | method | Use new to create the types declared in ScalarImp.h and assign the result to a ConstantSP (examples: Int, Date, String, Void) | high |
| Util.h helper functions | purpose | Provide functions to quickly create variables of certain types and formats (vectors, tuples, index vectors, matrices) | high |
| Plugin exception handling | mechanism | Use throw and try as in standard C++; DolphinDB exception types are declared in Exceptions.h | high |
| Plugin runtime error | recommended_exception | RuntimeException | high |
| Plugin parameter validation failure | recommended_exception | IllegalArgumentException | high |
| ConstantSP parameter checks | examples | getType, getCategory, getForm, isVector, isScalar, isTable, isNumber, isNull, getInt, getString, size | high |
| Parameter validation functions | location | More functions are generally found among the Constant class methods in CoreConcept.h | high |
| Factorial example plugin function | problem_definition | Compute the factorial of a non-negative integer and return a long value | high |
| DolphinDB long type | max_value | 2^63 - 1 | high |
| Factorial in long type | max_supported_factorial | 20! (the tutorial states 25!, but 21! already exceeds 2^63 - 1) | high |
| Factorial example plugin function | valid_input_range | The tutorial accepts 0 to 25 (inclusive), i.e. a non-negative integer less than 26; only 0 to 20 produce results that fit in a long | high |
| Calling built-in functions from plugins | note | Some classes define common built-in functions as methods (examples: avg, sum2, sort) | high |
| Calling other built-in functions from plugins | function_type_requirement | The plugin function must be a system function | high |
| Accessing built-in function definitions | method | Call heap->currentSession()->getFunctionDef to obtain a built-in function, then call it | high |
| FunctionDef call signature selection | rule | If the built-in is an operator function: call(Heap, const ConstantSP&, const ConstantSP&); if it is a system function: call(Heap, vector<ConstantSP>&) | high |
| Temporary flag for vectors | purpose | v->setTemporary(false) is used when a value might be modified during a built-in call and you do not want it modified | high |
| DolphinDB | time_series_support | Has good support for time series | medium |
| msum plugin example | function_signature | ConstantSP msum(const ConstantSP &X, const ConstantSP &window) | high |
| msum plugin example | behavior | Takes two parameters (a vector and a window size) and returns a vector of the same length as the input | high |
| msum plugin example | return_type_assumption | Assumes the return vector is DT_DOUBLE for simplicity and pre-allocates it via Util::createVector(DT_DOUBLE, size) | high |
| Vector access pattern in plugins | performance_guidance | Batch reading into buffers with getDoubleConst, getIntConst, etc. is more efficient than looping with getDouble or getInt | high |
| msum plugin example | buffering_strategy | Uses getDoubleConst in chunks of Util::BUF_SIZE, which returns const double* | high |
| Result vector writing strategy | method | Use result->getDoubleBuffer for a writable buffer, then result->setDouble to write back; if the buffer address matches the underlying storage, the copy can be avoided | high |
| DolphinDB double NULL representation | null_value | Uses the minimum double value macro DBL_NMIN to represent NULL; it must be checked explicitly | high |
| msum plugin example | null_output_rule | The first windowSize - 1 elements of the result are NULL; msum uses two loops (initial accumulation, then sliding add/subtract) | high |
| Distributed SQL aggregate functions in DolphinDB | typical_io | Typically accept one or more vectors and ultimately return a scalar | high |
| DolphinDB vector storage | modes | Two storage modes: regular array (contiguous in memory) and big array (segmented/chunked) | high |
| isFastMode | purpose | Used to distinguish a regular array from a big array for a vector | high |
| Regular array vector access | method | Use getDataArray to obtain a pointer to the underlying data in fast mode (the example casts to double*) | high |
| Big array vector access | method | Use getSegmentSize and getDataSegment; getDataSegment returns a pointer-to-pointer in which each entry points to a segment block | high |
| Geometric mean example | formula | geometricMean([x1..xn]) = exp((log(x1)+...+log(xn))/n) | high |
| Distributed geometric mean implementation approach | components | Implement the aggregate plugin logSum for the per-partition log sum, then define Reduce via defg and MapReduce via mapr | high |
| Underlying storage type in practice | note | The underlying storage of a vector may not be double, so developers need to consider the actual type; the example uses generic programming (code in the attachment) | medium |
| DolphinDB function selection (distributed vs non-distributed) | behavior | Typically both a non-distributed and a distributed version are implemented; the system chooses the more efficient one to call | high |
| Non-distributed geometricMean (script) | definition | def geometricMean(x) { return exp(logSum::logSum(x) \ count(x)) } | high |
| Distributed geometricMean (script) | definition | def geometricMeanMap(x) { return logSum::logSum(x) } defg geometricMeanReduce(myLogSum, myCount) { return exp(sum(myLogSum) \ sum(myCount)) } mapr geometricMean(x) { geometricMeanMap(x), count(x) -> geometricMeanReduce } | high |
| Plugin deployment in cluster for remote data | rule | If data resides on remote nodes, the plugin must be loaded on each remote node; either call loadPlugin manually on each node or use each(rpc{, loadPlugin, pathToPlugin}, getDataNodes()) | high |
| Big array random access | method | Use getSegmentSizeInBit and bit operations to compute the segment offset and the in-segment offset (index >> bits, index & mask) | high |
| Vector access method choice | guidance | Buffered access (get*Const/get*Buffer) is more general and usually preferred; direct storage access (getDataArray/getDataSegment) can be used in special cases when the storage mode and type are known | high |
| DolphinDB distributed algorithm framework | description | Map-Reduce is a general framework; DolphinDB provides mr and imr; plugin-based distributed algorithms also use these functions | high |
| Distributed algorithm plugin example (chapter 4) | goal | Compute the average of all columns with the specified column names in a distributed table (using mr) | high |
| Custom functions in distributed algorithm plugins | types_allowed | User-defined map/reduce/final/term functions can be operator functions or system functions | high |
| columnAvgMap (example) | behavior | For a table partition, sums the specified columns and counts the non-null elements; returns a length-2 tuple [sum, count] | high |
| Reduce function in columnAvg example | implementation | Uses the built-in add function obtained via heap->currentSession()->getFunctionDef("add") | high |
| columnAvgFinal (example) | behavior | Computes sum/count from the reduce result tuple to output the average | high |
| Exporting plugin functions | steps | Add extern "C" to the function declarations in the header and list the functions in the plugin load text file; they can then be obtained via heap->currentSession()->getFunctionDef for use as parameters to mr | medium |
| mr map function arity | constraint | mr only allows a map function with one parameter; if map needs more, use partial application | high |
| Partial application in columnAvg example | method | Wrap mapFunc with Util::createPartialFunction and args {new Void(), colNames} to form columnAvgMap{, colNames} | high |
| columnAvg (example) | mr_call_structure | Calls mr(ds, columnAvgMap{, colNames}, add, columnAvgFinal) by constructing mrArgs = {ds, mapWithColNames, reduceFunc, finalFunc} and invoking mr->call(heap, mrArgs) | high |
| Plugin deployment in cluster for remote data (chapter 4) | rule | If data is on remote nodes, load the plugin on each remote node; can use each(rpc{, loadPlugin, pathToPlugin}, getDataNodes()) | high |
| columnAvg usage (script) | procedure | Generate a data source via sqlDS(<select * from t>) after creating and filling a partitioned table; call columnAvg::columnAvg(ds, `v1`v2) | high |
| Stream subscription handler in DolphinDB | description | The subscriber processes received data via a handler function; the subscription message can be a table or a tuple depending on the msgAsTable parameter of subscribeTable | high |
| Streaming handler plugin example | inputs | Accepts msg as a tuple plus indices (int scalar or vector) and a table; inserts the tuple elements at the specified indices into the table | high |
| Table append interface | signature_and_behavior | bool append(vector<ConstantSP>& values, INDEX& insertedRows, string& errMsg): returns true on success and writes the inserted row count; otherwise returns false and writes the error message | high |
| Logging append failures in handler example | build_flag | Include Logger.h and compile the plugin with the macro -DLOGGING_LEVEL_2 to log errors (LOG_ERR) | high |
| External data source plugin design considerations | topics | Data source object, schema extraction, and IO bottlenecks/IO interfaces | high |
| Data source (definition) | description | Special data object containing metadata; executing a data source yields a data entity (table, matrix, vector, etc.); can be used with distributed functions such as olsEx, randomForestClassifier, mr/imr, and lower-level computing models | medium |
| sqlDS (built-in) | description | Built-in function that obtains a data source from an SQL expression | high |
| Data source representation in plugin design | representation | Often represented as a Code object that is a function call; its parameters are metadata and it returns a table | high |
| Schema (definition) | description | Describes the table's column count, column names, and data types; an external interface should provide a function to quickly obtain the schema so that users can adjust it | high |
| DolphinDB IO abstractions | interfaces | DataInputStream and DataOutputStream abstract compression, endianness, and IO types (network/disk/buffer) | high |
| Multi-thread IO implementations | names | BlockFileInputStream and BlockFileOutputStream | high |
| BlockFileInputStream/OutputStream benefits | benefits | Enable compute/IO parallelism via asynchronous prefetch; avoid multi-thread disk contention by serializing disk IO to improve throughput | high |
| Flat-file binary example format | row_layout | Binary rows start at the head of the file; each row has 4 columns: id (signed 64-bit, 8 bytes), symbol (C string, 8 bytes), date (BCD, 8 bytes), value (IEEE 754 double, 8 bytes); 32 bytes per row | high |
| Flat-file binary example format | hex_example | 0x 00 00 00 00 00 00 00 05 0x 49 42 4D 00 00 00 00 00 0x 02 00 01 09 00 03 01 03 0x 40 24 33 33 33 33 33 33 | high |
| extractMyDataSchema (example) | behavior | Creates a schema table via Util::createTable with columns name/type for the fields id, symbol, date, value with types LONG, SYMBOL, DATE, DOUBLE | high |
| loadMyData (example) | syntax | loadMyData(path, [start], [length]) | high |
| loadMyData (example) | parameters | path plus optional int start (start row) and int length (total rows to read) | high |
| loadMyData (example) | bytes_per_row_assumption | bytesPerRow = 32 | high |
| createBlockFileInputStream usage in loadMyData | behavior | Can specify the starting byte offset and total bytes to read using start*bytesPerRow and length*bytesPerRow | high |
| Binary read loop pattern (example) | behavior | Read chunks with readBytes into buf until actualLength <= 0 | high |
| Example: load binary bytes into DolphinDB char vector | implementation | Create VectorSP vec = Util::createVector(DT_CHAR, 0); call appendChar(buf, actualLength) per chunk; return as a single-column table | high |
| loadMyDataEx (example) | purpose | Avoid a memory bottleneck for huge files by saving into a DolphinDB distributed table while importing rather than loading everything into memory first | high |
| loadMyDataEx (example) | syntax | loadMyDataEx(dbHandle, tableName, partitionColumns, path, [start], [length]) | high |
| loadMyDataEx (example) | behavior_existing_table | If the table exists in the database, append the imported data to it; otherwise create the table and then append; returns the table | high |
| loadMyDataEx (example) | table_existence_check | Uses existsTable(dbPath, tableName) via getFunctionDef("existsTable")->call | high |
| loadMyDataEx (example) | load_existing_table | If the table exists, uses loadTable(db, tableName) via getFunctionDef("loadTable")->call | high |
| loadMyDataEx (example) | create_new_partitioned_table | If the table does not exist, creates the schema via extractMyDataSchema, makes an empty table from the schema using DBFileIO::createEmptyTableFromSchema, then calls createPartitionedTable(db, dummyTable, tableName, partitionColumns) | high |
| loadMyDataEx (example) | pipeline_framework_usage | Uses the Pipeline framework: the tasks are loadMyData calls with different start parameters; the follower is the partial application append!{result}, which inserts the chunks | high |
| loadMyDataEx (example) | sizePerPartition | 16 * 1024 * 1024 | high |
| loadMyDataEx (example) | task_partitioning | partitionNum is computed as fileLength / sizePerPartition; partitionLength is computed as length / partitionNum, with the last partition adjusted | high |
| Alternative frameworks note (external data import) | options | Besides Pipeline, can use StaticStageExecutor (ComputingModel.h) or the thread model Thread (Concurrent.h); the approach depends on the scenario | high |
| myDataDS (example) | behavior | Returns a tuple/vector of data sources; each data source is a Code object representing a loadMyData(path, partitionStart, partitionLength) call, created by Util::createRegularFunctionCall and wrapped in DataSource | high |
| myDataDS (example) | sizePerPartition | 16 * 1024 * 1024 | high |
| Tutorial full code | repository_url | https://github.com/dolphindb/Tutorials_CN/tree/master/plugin | high |
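
The task_partitioning arithmetic listed for loadMyDataEx in the Facts Index (partitionNum = fileLength / sizePerPartition, partitionLength = length / partitionNum, last partition adjusted) can be sketched as a standalone function. `RowRange` and `splitRows` are hypothetical names, and the clamp of partitionNum to at least 1 is an assumption added here so that files smaller than one partition do not cause a division by zero; the tutorial's own handling of that case is not stated on this page.

```cpp
#include <cassert>
#include <vector>

struct RowRange {
    long long start;   // first row of the task
    long long length;  // number of rows in the task
};

// Splits the rows [start, start + length) into tasks of roughly equal size,
// with the last task absorbing the remainder.
std::vector<RowRange> splitRows(long long start, long long length,
                                long long fileLength, long long sizePerPartition) {
    assert(sizePerPartition > 0);
    long long partitionNum = fileLength / sizePerPartition;
    if (partitionNum < 1) {
        partitionNum = 1;  // assumption of this sketch: always at least one task
    }
    const long long partitionLength = length / partitionNum;
    std::vector<RowRange> ranges;
    for (long long i = 0; i < partitionNum; ++i) {
        const long long taskStart = start + i * partitionLength;
        const long long taskLength = (i == partitionNum - 1)
                                         ? length - i * partitionLength
                                         : partitionLength;
        ranges.push_back({taskStart, taskLength});
    }
    return ranges;
}
```

With 10 rows, a file length of 48, and a partition size of 16, the sketch produces three tasks covering rows 0 to 2, 3 to 5, and 6 to 9.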