DolphinDB Plugin Development Tutorial
This page is a tutorial article on DolphinDB plugin development, with basic metadata such as author and publication date.
Source: https://dolphindb.cn/blogs/57
What this page covers
- Overview of DolphinDB's plugin support and the scenarios this tutorial covers.
- Plugin development basics: function types, variable creation, exception handling, and calling built-in functions.
- Key points for implementing a time-series plugin function (the msum example).
- Aggregate functions for distributed SQL, and vector storage access methods.
- A distributed-algorithm plugin example (column averages) built on mr/imr.
- A stream-subscription handler plugin example, with optional logging.
- External data source plugins: data sources, schema, IO, and a binary flat-file example.
DolphinDB Plugin Development Tutorial
This section gives the article title and its metadata; both the author and the publication date are stated explicitly in the text.
- Author: Junxi.
- Publication date: 2021-08-05.
- Topic: DolphinDB plugin development.
Tutorial overview and covered plugin development scenarios
This section introduces how DolphinDB supports plugins and lists the plugin development scenarios the tutorial covers.
- DolphinDB supports dynamically loading external plugins to extend its functionality.
- Plugins are implemented in C++.
- A compiled plugin is a shared library (.so or .dll).
- The tutorial covers developing plugin functions for time-series data processing.
- The tutorial covers developing aggregate functions for distributed SQL.
1. How to develop a plugin
This section covers plugin-development fundamentals: function types, variable creation and management, exception handling, and how to call built-in functions from within a plugin.
- Plugin functions can be called from scripts.
- An operator function accepts at most 2 parameters.
- A system function accepts any number of parameters and supports session access operations.
- ConstantSP is a reference-counted smart pointer wrapper; memory is released automatically when the reference count drops to 0.
- Plugins handle exceptions with standard C++ throw/try-catch; DolphinDB's exception types are declared in Exceptions.h.
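The tutorial's first worked example is a factorial function (valid input 0–25, returning a 64-bit long; see the Facts Index). The core logic and the parameter check can be sketched in plain C++ without the DolphinDB headers; `std::invalid_argument` here is a stand-in for the `IllegalArgumentException` a real plugin would throw from Exceptions.h, and a real plugin would receive a ConstantSP and validate it with calls like isScalar()/getCategory():

```cpp
#include <cstdint>
#include <stdexcept>

// Sketch of the tutorial's factorial example, minus the DolphinDB API.
// A 64-bit long holds at most 2^63 - 1, so 25! is the largest representable
// factorial; anything outside [0, 25] is rejected up front.
int64_t factorialCore(int n) {
    if (n < 0 || n > 25)
        throw std::invalid_argument("factorial: input must be a non-negative integer <= 25");
    int64_t result = 1;
    for (int i = 2; i <= n; ++i)
        result *= i;
    return result;
}
```

In the plugin version the same validation failure would surface to the script caller as an IllegalArgumentException.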
2. How to develop plugin functions for time-series data processing
This section uses an msum example to show how to implement windowed time-series computation, highlighting buffered vector access and NULL handling.
- The msum signature is ConstantSP msum(const ConstantSP &X, const ConstantSP &window).
- msum takes a vector and a window size and returns a vector of the same length as the input.
- Batch reads into a buffer are usually faster than per-element getDouble/getInt calls.
- A DT_DOUBLE NULL is represented by the macro DBL_NMIN and must be checked explicitly.
- The first windowSize - 1 elements of the result are NULL.
3. How to develop aggregate functions for distributed SQL
This section presents the approach for distributed SQL aggregate plugins: the two vector storage modes, how to choose an access method, and how to assemble the distributed version with script and a MapReduce-style definition.
- A distributed SQL aggregate function typically takes one or more vectors and returns a scalar.
- DolphinDB vectors have two storage modes: regular array and big array.
- isFastMode distinguishes a regular array from a big array.
- Typically both a distributed and a non-distributed version are implemented, and the system calls whichever is more efficient.
- If data resides on remote nodes, the plugin must be loaded on each remote node.
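The section's running example is a distributed geometric mean: a logSum aggregate plugin computes the per-partition log-sum, and a script-level map/reduce combines the pieces as exp(total log-sum / total count). The decomposition can be checked numerically in plain C++ (the real implementation splits the loop below across nodes via the plugin and `mapr`):

```cpp
#include <cmath>
#include <vector>

// Numeric sketch of the distributed geometric mean. Each partition
// contributes a (log-sum, count) pair (the map step); the reduce step
// combines them as exp(sum of log-sums / sum of counts), matching
// geometricMean([x1..xn]) = exp((log(x1)+...+log(xn)) / n).
double geometricMeanDistributed(const std::vector<std::vector<double>>& partitions) {
    double logSum = 0;
    long long count = 0;
    for (const auto& part : partitions) {       // map: per-partition log-sum and count
        for (double v : part) logSum += std::log(v);
        count += (long long)part.size();
    }
    return std::exp(logSum / count);            // reduce/final
}
```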
3.3 Random access to big arrays
This section explains how to randomly access a big array by computing the segment offset and in-segment offset with bit operations.
- Random access to a big array requires computing the segment offset.
- Random access to a big array requires computing the in-segment offset.
- Both can be computed with getSegmentSizeInBit and bit operations.
- The example uses index >> bits and index & mask.
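The shift/mask arithmetic is self-contained and worth seeing concretely. With a segment size of 2^bits elements (obtained from getSegmentSizeInBit in the real API), the two offsets fall out of one shift and one mask:

```cpp
#include <cstdint>

// Locate element `index` of a big array whose segments each hold
// 2^segmentSizeInBit elements: the high bits select the segment,
// the low bits select the position inside it.
struct BigArrayIndex {
    int segment;  // which segment block (index >> bits)
    int offset;   // position within that segment (index & mask)
};

BigArrayIndex locate(int64_t index, int segmentSizeInBit) {
    int64_t mask = (1LL << segmentSizeInBit) - 1;
    return { (int)(index >> segmentSizeInBit), (int)(index & mask) };
}
```

Given the segment pointer array from getDataSegment, `data[segment][offset]` then reads the element directly.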
3.4 Which vector access method to choose
This section gives guidance on choosing a vector access method: buffered access is more general; direct low-level access suits specific cases where the storage mode and type are known.
- Buffered access (get*Const/get*Buffer) is more general.
- Buffered access is usually the recommended default.
- Direct access (getDataArray/getDataSegment) fits specific scenarios.
- Direct access presupposes that the storage mode and data type are known.
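The recommended buffered pattern amounts to processing the vector in fixed-size chunks rather than calling a per-element getter. A plain-C++ sketch of the loop shape (1024 is a stand-in for Util::BUF_SIZE; the commented-out line shows where the real getDoubleConst call would go):

```cpp
#include <algorithm>
#include <cstddef>
#include <vector>

const int BUF_SIZE = 1024;  // stand-in for Util::BUF_SIZE

// Chunked summation illustrating the buffered-access loop: one bulk
// fetch per chunk, then a tight loop over the returned pointer.
double chunkedSum(const std::vector<double>& x) {
    double sum = 0;
    for (size_t start = 0; start < x.size(); start += BUF_SIZE) {
        size_t count = std::min((size_t)BUF_SIZE, x.size() - start);
        // Plugin version: const double* buf = v->getDoubleConst(start, count, tmpBuf);
        const double* buf = x.data() + start;
        for (size_t i = 0; i < count; ++i) sum += buf[i];
    }
    return sum;
}
```

This shape works for both storage modes, which is why it is the safer default when the plugin cannot assume a regular array.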
4. How to develop plugin functions supporting new distributed algorithms
This section shows how to write custom map/reduce/final functions in C++ and call DolphinDB's mr/imr to implement a distributed algorithm (a column-average example).
- Map-Reduce is a general framework; DolphinDB provides mr and imr.
- The example computes the average of specified columns in a distributed table.
- columnAvgMap sums the partition data, counts non-null elements, and returns a [sum, count] tuple.
- mr's map function allows only one parameter; use partial application when more are needed.
- If the data lives on remote nodes, the plugin must be loaded on each data node.
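The map/reduce/final split for the column average can be sketched numerically in plain C++. In the real plugin, map returns a [sum, count] tuple of ConstantSP values, reduce is the built-in `add` obtained via getFunctionDef, and the whole pipeline runs through mr->call; here a std::pair stands in for the tuple and NULL filtering is omitted:

```cpp
#include <utility>
#include <vector>

using SumCount = std::pair<double, long long>;  // the [sum, count] tuple

// Map: per-partition sum and element count (the real columnAvgMap skips NULLs).
SumCount columnAvgMap(const std::vector<double>& partition) {
    SumCount sc{0.0, 0};
    for (double v : partition) { sc.first += v; ++sc.second; }
    return sc;
}

// Reduce: element-wise addition of two tuples (the tutorial reuses built-in `add`).
SumCount columnAvgReduce(const SumCount& a, const SumCount& b) {
    return { a.first + b.first, a.second + b.second };
}

// Final: divide total sum by total count to get the average.
double columnAvgFinal(const SumCount& sc) { return sc.first / sc.second; }
```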
5. How to develop plugin functions for stream data processing
This section gives a stream-subscription handler plugin example that extracts fields from subscription messages by index and writes them to a table; it also covers the append interface and optional error logging.
- A subscriber processes incoming data through a handler function.
- A subscription message is a table or a tuple, depending on subscribeTable's msgAsTable parameter.
- The example handler takes msg (a tuple) and indices, and inserts the selected fields into a table.
- The append interface reports success or failure, writing insertedRows on success or errMsg on failure.
- To log append failures, include Logger.h and compile with -DLOGGING_LEVEL_2.
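The append contract described above — bool append(vector&lt;ConstantSP&gt;&amp; values, INDEX&amp; insertedRows, string&amp; errMsg) — can be mirrored with a toy stand-in where a std::vector&lt;double&gt; plays the role of the table, to show how a handler branches on the result:

```cpp
#include <string>
#include <vector>

// Stand-in for the table append interface: true + insertedRows on success,
// false + errMsg on failure. On failure the handler would emit errMsg via
// LOG_ERR (Logger.h, compiled with -DLOGGING_LEVEL_2).
bool appendRows(std::vector<double>& table, const std::vector<double>& values,
                int& insertedRows, std::string& errMsg) {
    if (values.empty()) {                       // hypothetical failure condition
        errMsg = "nothing to append";
        return false;
    }
    table.insert(table.end(), values.begin(), values.end());
    insertedRows = (int)values.size();
    return true;
}
```

The real handler would assemble `values` from the tuple fields selected by `indices` before calling append.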
6. How to develop plugin functions supporting external data sources
This section discusses the key issues for external data source plugins (data sources, schema, IO) and walks through a binary flat-file example covering schema extraction, data loading, chunked import, and data source generation.
- A data source is a special data object carrying metadata; executing it yields a data entity such as a table, matrix, or vector.
- A schema describes the column count, column names, and data types; it is recommended to expose an interface that returns the schema quickly.
- DataInputStream and DataOutputStream abstract away compression, byte order, and different IO types.
- In the example binary format, each row has 4 columns, 32 bytes in total per row.
- The loadMyDataEx example avoids the memory bottleneck of loading huge files entirely into memory.
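The 32-byte row layout from the Facts Index (id: signed 64-bit; symbol: 8-byte C string; date: 8 BCD digit bytes; value: IEEE 754 double) can be decoded in a few lines. Two assumptions read off the hex sample in the Facts Index: multi-byte fields are big-endian, and the date stores one decimal digit per byte (e.g. 02 00 01 09 00 03 01 03 → 2019-03-13); adjust if the real files differ:

```cpp
#include <cstdint>
#include <cstring>
#include <string>

struct MyRow {
    int64_t id;
    std::string symbol;
    int year, month, day;  // decoded from the 8 date digit bytes
    double value;
};

// Decode one 32-byte row, assuming big-endian multi-byte fields.
MyRow parseRow(const unsigned char* p) {
    MyRow r{};
    uint64_t bits = 0;
    for (int i = 0; i < 8; ++i) bits = (bits << 8) | p[i];    // id
    r.id = (int64_t)bits;
    r.symbol = std::string((const char*)p + 8);               // NUL-terminated
    const unsigned char* d = p + 16;                          // one digit per byte
    r.year  = d[0] * 1000 + d[1] * 100 + d[2] * 10 + d[3];
    r.month = d[4] * 10 + d[5];
    r.day   = d[6] * 10 + d[7];
    bits = 0;
    for (int i = 24; i < 32; ++i) bits = (bits << 8) | p[i];  // IEEE 754 double
    std::memcpy(&r.value, &bits, sizeof r.value);
    return r;
}
```

The plugin's loadMyData would apply this per-row decode to each chunk pulled from the BlockFileInputStream.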
Facts Index
| Entity | Attribute | Value | Confidence |
|---|---|---|---|
| DolphinDB插件开发教程 | publication_date | 2021-08-05 | high |
| DolphinDB插件开发教程 | author | Junxi | high |
| DolphinDB | supports | Dynamic loading of external plugins to extend system functionality | high |
| DolphinDB plugin | implementation_language | C++ | high |
| DolphinDB plugin | compiled_artifact | Shared library file: .so or .dll | high |
| Tutorial scope | includes | Developing plugin functions for time-series data processing | high |
| Tutorial scope | includes | Developing aggregate functions for distributed SQL processing | high |
| Tutorial scope | includes | Developing plugin functions supporting new distributed algorithms | high |
| Tutorial scope | includes | Developing plugin functions for stream data processing | high |
| Tutorial scope | includes | Developing plugin functions supporting external data sources | high |
| DolphinDB plugin function | callable_in | Can be called from scripts | high |
| Operator function (plugin) | parameter_count_limit | Accepts <= 2 parameters | high |
| System function (plugin) | parameter_count_limit | Can accept any number of parameters and supports session access operations | high |
| Operator function (plugin) | C++_prototype | ConstantSP (const ConstantSP& a, const ConstantSP& b) | high |
| Operator function (plugin) | parameter_semantics | When 2 params: a=first arg, b=second arg; when 1 param: b is placeholder; when 0 params: both are placeholders | high |
| System function (plugin) | C++_prototype | ConstantSP (Heap* heap, vector<ConstantSP>& args) | high |
| System function (plugin) | argument_passing | User-passed arguments are stored in order in C++ vector args; heap is not passed by user | high |
| ConstantSP | represents | Most DolphinDB objects (scalar, vector, matrix, table, etc.) | high |
| VectorSP / TableSP | relationship_to_ConstantSP | Common derived types from ConstantSP (vector/table types) | high |
| ConstantSP | memory_management | Encapsulated smart pointer that auto-frees memory when reference count reaches 0; users need not manually delete created variables | high |
| Scalar creation in plugin C++ | method | Use new to create types declared in ScalarImp.h and assign to ConstantSP (examples: Int, Date, String, Void) | high |
| Util.h helper functions | purpose | Provide functions to quickly create variables of certain types and formats (vectors, tuples, index vectors, matrices) | high |
| Plugin exception handling | mechanism | Use throw and try like standard C++; DolphinDB exception types declared in Exceptions.h | high |
| Plugin runtime error | recommended_exception | RuntimeException | high |
| Plugin parameter validation failure | recommended_exception | IllegalArgumentException | high |
| ConstantSP parameter checks | examples | getType, getCategory, getForm, isVector, isScalar, isTable, isNumber, isNull, getInt, getString, size | high |
| Parameter validation functions | location | More functions are generally in the Constant class methods in CoreConcept.h | high |
| Factorial example plugin function | problem_definition | Compute factorial for non-negative integers and return a long type value | high |
| DolphinDB long type | max_value | 2^63 - 1 | high |
| Factorial in long type | max_supported_factorial | 25! | high |
| Factorial example plugin function | valid_input_range | 0 to 25 (inclusive); input must be non-negative integer less than 26 | high |
| Calling built-in functions from plugins | note | Some classes define common built-in functions as methods (examples: avg, sum2, sort) | high |
| Calling other built-in functions from plugins | function_type_requirement | Plugin function must be a system function | high |
| Accessing built-in function definitions | method | heap->currentSession()->getFunctionDef to obtain a built-in function; then call it | high |
| FunctionDef call signature selection | rule | If built-in is operator function: call(Heap, const ConstantSP&, const ConstantSP&); if system function: call(Heap, vector<ConstantSP>&) | high |
| Temporary flag for vectors | purpose | v->setTemporary(false) used when value might be modified during built-in call and you do not want it modified | high |
| DolphinDB | time_series_support | Has good support for time series | medium |
| msum plugin example | function_signature | ConstantSP msum(const ConstantSP &X, const ConstantSP &window) | high |
| msum plugin example | behavior | Takes two parameters (a vector and window size) and returns a vector of same length as input | high |
| msum plugin example | return_type_assumption | Assumes return vector is DT_DOUBLE for simplicity and pre-allocates via Util::createVector(DT_DOUBLE, size) | high |
| Vector access pattern in plugins | performance_guidance | Batch reading with getDoubleConst/getIntConst etc. into buffers is more efficient than looping with getDouble/getInt | high |
| msum plugin example | buffering_strategy | Uses getDoubleConst in chunks of Util::BUF_SIZE returning const double* | high |
| Result vector writing strategy | method | Use result->getDoubleBuffer for writable buffer; then result->setDouble to write back; if buffer address matches underlying storage, copy can be avoided | high |
| DolphinDB double NULL representation | null_value | Uses minimum double value macro DBL_NMIN to represent NULL; must be specially checked | high |
| msum plugin example | null_output_rule | First windowSize - 1 elements of return are NULL; msum uses two loops (initial accumulate then sliding add/subtract) | high |
| Distributed SQL aggregate functions in DolphinDB | typical_io | Typically accept one or more vectors and ultimately return a scalar | high |
| DolphinDB vector storage | modes | Two storage modes: regular array (contiguous in memory) and big array (segmented/chunked) | high |
| isFastMode | purpose | Used to distinguish regular array vs big array for a vector | high |
| Regular array vector access | method | Use getDataArray to obtain pointer to underlying data for fast mode (example casts to double*) | high |
| Big array vector access | method | Use getSegmentSize and getDataSegment; getDataSegment returns pointer-to-pointer where each entry points to a segment block | high |
| Geometric mean example | formula | geometricMean([x1..xn]) = exp((log(x1)+...+log(xn))/n) | high |
| Distributed geometric mean implementation approach | components | Implement aggregate plugin logSum for partition log-sum; then define Reduce via defg and MapReduce via mapr | high |
| Underlying storage type in practice | note | Vector underlying storage may not be double; developers need to consider actual types; example uses generic programming (code in attachment) | medium |
| DolphinDB function selection (distributed vs non-distributed) | behavior | Typically implement both non-distributed and distributed versions; system chooses the more efficient version to call | high |
| Non-distributed geometricMean (script) | definition | def geometricMean(x) { return exp(logSum::logSum(x) \ count(x)) } | high |
| Distributed geometricMean (script) | definition | def geometricMeanMap(x) { return logSum::logSum(x) } defg geometricMeanReduce(myLogSum, myCount) { return exp(sum(myLogSum) \ sum(myCount)) } mapr geometricMean(x) { geometricMeanMap(x), count(x) -> geometricMeanReduce } | high |
| Plugin deployment in cluster for remote data | rule | If data resides on remote nodes, plugin must be loaded on each remote node; can manually call loadPlugin on each node or use each(rpc{, loadPlugin, pathToPlugin}, getDataNodes()) | high |
| Big array random access | method | Use getSegmentSizeInBit and bit operations to compute segment offset and in-segment offset (index >> bits, index & mask) | high |
| Vector access method choice | guidance | Buffered access (get*Const/get*Buffer) is more general and usually preferred; direct storage access (getDataArray/getDataSegment) can be used in special cases when storage mode and type are known | high |
| DolphinDB distributed algorithm framework | description | Map-Reduce is a general framework; DolphinDB provides mr and imr; plugin-based distributed algorithms also use these functions | high |
| Distributed algorithm plugin example (chapter 4) | goal | Compute average of all columns with specified column names in a distributed table (using mr) | high |
| Custom functions in distributed algorithm plugins | types_allowed | User-defined map/reduce/final/term functions can be operator functions or system functions | high |
| columnAvgMap (example) | behavior | For a table partition, sums specified columns and counts non-null elements; returns a length-2 tuple [sum, count] | high |
| Reduce function in columnAvg example | implementation | Uses built-in add function obtained via heap->currentSession()->getFunctionDef("add") | high |
| columnAvgFinal (example) | behavior | Computes sum/count from reduce result tuple to output average | high |
| Exporting plugin functions | steps | Add extern "C" to function declarations in header and list functions in the plugin load text file; then they can be obtained via heap->currentSession()->getFunctionDef for use as parameters to mr | medium |
| mr map function arity | constraint | mr only allows map function with one parameter; if map needs more, use partial application | high |
| Partial application in columnAvg example | method | Wrap mapFunc with Util::createPartialFunction and args {new Void(), colNames} to form columnAvgMap{, colNames} | high |
| columnAvg (example) | mr_call_structure | Calls mr(ds, columnAvgMap{, colNames}, add, columnAvgFinal) by constructing mrArgs = {ds, mapWithColNames, reduceFunc, finalFunc} and invoking mr->call(heap, mrArgs) | high |
| Plugin deployment in cluster for remote data (chapter 4) | rule | If data is on remote nodes, load plugin on each remote node; can use each(rpc{, loadPlugin, pathToPlugin}, getDataNodes()) | high |
| columnAvg usage (script) | procedure | Generate data source via sqlDS(<select * from t>) after creating and filling a partitioned table; call columnAvg::columnAvg(ds, `v1`v2) | high |
| Stream subscription handler in DolphinDB | description | Subscriber processes received data via a handler function; subscription message can be table or tuple depending on subscribeTable msgAsTable parameter | high |
| Streaming handler plugin example | inputs | Accepts msg as tuple plus indices (int scalar or vector) and a table; inserts tuple elements at specified indices into the table | high |
| Table append interface | signature_and_behavior | bool append(vector<ConstantSP>& values, INDEX& insertedRows, string& errMsg): returns true on success and writes inserted row count; else returns false and writes error message | high |
| Logging append failures in handler example | build_flag | Include Logger.h and compile plugin with macro -DLOGGING_LEVEL_2 to log errors (LOG_ERR) | high |
| External data source plugin design considerations | topics | Data source object, schema extraction, and IO bottlenecks/IO interfaces | high |
| Data source (definition) | description | Special data object containing metadata; executing a data source yields a data entity (table/matrix/vector/etc.); can be used with distributed functions like olsEx, randomForestClassifier, mr/imr, and lower-level computing models | medium |
| sqlDS (built-in) | description | Built-in function that obtains a data source from an SQL expression | high |
| Data source representation in plugin design | representation | Often represented as a Code object that is a function call; parameters are metadata and returns a table | high |
| Schema (definition) | description | Describes table column count, column names, and data types; external interface should provide function to quickly obtain schema for user adjustments | high |
| DolphinDB IO abstractions | interfaces | DataInputStream and DataOutputStream abstract compression, endianness, and IO types (network/disk/buffer) | high |
| Multi-thread IO implementations | names | BlockFileInputStream and BlockFileOutputStream | high |
| BlockFileInputStream/OutputStream benefits | benefits | Enable compute/IO parallelism via async prefetch; avoid multi-thread disk contention by serializing disk IO to improve throughput | high |
| Flat-file binary example format | row_layout | Binary rows start at file head; each row has 4 columns: id (signed 64-bit, 8 bytes), symbol (C string, 8 bytes), date (BCD, 8 bytes), value (IEEE 754 double, 8 bytes); 32 bytes per row | high |
| Flat-file binary example format | hex_example | 0x 00 00 00 00 00 00 00 05 0x 49 42 4D 00 00 00 00 00 0x 02 00 01 09 00 03 01 03 0x 40 24 33 33 33 33 33 33 | high |
| extractMyDataSchema (example) | behavior | Creates a schema table via Util::createTable with columns name/type for fields id, symbol, date, value with types LONG, SYMBOL, DATE, DOUBLE | high |
| loadMyData (example) | syntax | loadMyData(path, [start], [length]) | high |
| loadMyData (example) | parameters | path plus optional int start (start row) and int length (total rows to read) | high |
| loadMyData (example) | bytes_per_row_assumption | bytesPerRow = 32 | high |
| createBlockFileInputStream usage in loadMyData | behavior | Can specify starting byte offset and total bytes to read using start*bytesPerRow and length*bytesPerRow | high |
| Binary read loop pattern (example) | behavior | Read chunks with readBytes into buf until actualLength <= 0 | high |
| Example: load binary bytes into DolphinDB char vector | implementation | Create VectorSP vec = Util::createVector(DT_CHAR, 0); appendChar(buf, actualLength) per chunk; return as single-column table | high |
| loadMyDataEx (example) | purpose | Avoid memory bottleneck for huge files by importing while saving into DolphinDB distributed table rather than loading all into memory first | high |
| loadMyDataEx (example) | syntax | loadMyDataEx(dbHandle, tableName, partitionColumns, path, [start], [length]) | high |
| loadMyDataEx (example) | behavior_existing_table | If table exists in database, append imported data to existing table; otherwise create table then append; returns the table | high |
| loadMyDataEx (example) | table_existence_check | Uses existsTable(dbPath, tableName) via getFunctionDef("existsTable")->call | high |
| loadMyDataEx (example) | load_existing_table | If exists, uses loadTable(db, tableName) via getFunctionDef("loadTable")->call | high |
| loadMyDataEx (example) | create_new_partitioned_table | If not exists, creates schema via extractMyDataSchema, makes empty table from schema using DBFileIO::createEmptyTableFromSchema, then createPartitionedTable(db, dummyTable, tableName, partitionColumns) | high |
| loadMyDataEx (example) | pipeline_framework_usage | Uses Pipeline framework: tasks are loadMyData calls with different start parameters; follower is partial application append!{result} to insert chunks | high |
| loadMyDataEx (example) | sizePerPartition | 16 * 1024 * 1024 | high |
| loadMyDataEx (example) | task_partitioning | partitionNum computed as fileLength / sizePerPartition; partitionLength computed as length / partitionNum with last partition adjusted | high |
| Alternative frameworks note (external data import) | options | Besides Pipeline, can use StaticStageExecutor (ComputingModel.h) or thread model Thread (Concurrent.h); approach depends on scenario | high |
| myDataDS (example) | behavior | Returns a tuple/vector of data sources; each data source is a Code object representing a loadMyData(path, partitionStart, partitionLength) call created by Util::createRegularFunctionCall and wrapped in DataSource | high |
| myDataDS (example) | sizePerPartition | 16 * 1024 * 1024 | high |
| Tutorial full code | repository_url | https://github.com/dolphindb/Tutorials_CN/tree/master/plugin | high |