背景 在一个质量检测相关项目中,需要计算 SPC 和 CPK,但是 TDengine 中没有现成的函数,而我做的项目UI采用的 Grafana 来展示,不能从业务层解决,所以需要自己写一个 UDF。 主要是和采样有关,计算其实问题不大,因为采样的需求是每个小时或者每天采集一次, 每次采集连续采集 3 个样本,所以每个子组有 3 个样品而且从每个小时的第 10 组开始采集。
UDF 介绍 研究了 TDEngine 中关于 UDF 的章节: https://docs.taosdata.com/taos-sql/udf/
作用:扩展内置函数无法实现的查询功能,支持标量函数(每行输出一个值)和聚合函数(多行输出一个值)。 语言支持:C(高性能)和 Python(利用丰富生态库)。 安全机制:进程分离技术确保 UDF 崩溃不影响数据库服务。 可以用 C 和 和 Python 来实现。 C 的话效率肯定高,但是咱会的太少。 主要 Python 的话可以直接使用 NumPy 库,所以就选了 Python。
Python UDF 的注意事项 官方给了 Python UDF 的安装事项,如果 Python 不开--enable-shared
的话,会报错。其实也说白了 TDengine 就是通过 共享库 如何调用 Python 模块的。
但是官方的 Docker 镜像并不包含 Python 更别说--enable-shared
了。 所以我们第一步就是构建一个包含 Python 的 Docker 镜像。
在踩了一堆坑就快放弃的时候,终于可以了:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 FROM tdengine/tdengine:3.3 .6.6 ENV DEBIAN_FRONTEND=noninteractiveENV PYTHONUNBUFFERED=1 RUN if grep -q "24.04" /etc/os-release; then \ echo "Types: deb\n\ URIs: http://mirrors.tuna.tsinghua.edu.cn/ubuntu-ports\n\ Suites: noble noble-updates noble-backports noble-security\n\ Components: main restricted universe multiverse\n\ Signed-By: /usr/share/keyrings/ubuntu-archive-keyring.gpg" > /etc/apt/sources.list.d/ubuntu.sources; \ else \ sed -i 's|http://[^/]*/ubuntu|http://mirrors.tuna.tsinghua.edu.cn/ubuntu|g' /etc/apt/sources.list; \ fi RUN apt-get update && apt-get install -y \ build-essential \ gcc \ g++ \ make \ wget \ curl \ zlib1g-dev \ libssl-dev \ libffi-dev \ libbz2-dev \ libreadline-dev \ libsqlite3-dev \ libncurses5-dev \ libncursesw5-dev \ xz-utils \ tk-dev \ libxml2-dev \ libxmlsec1-dev \ vim \ && rm -rf /var/lib/apt/lists/* WORKDIR /tmp RUN wget https://mirrors.huaweicloud.com/python/3.13.3/Python-3.13.3.tgz && \ tar -xzf Python-3.13.3.tgz && \ cd Python-3.13.3 && \ ./configure \ --enable-shared \ --enable-optimizations \ --prefix=/usr/local/python3 \ --with-ensurepip=install && \ make -j$(nproc ) && \ make install && \ cd / && \ rm -rf /tmp/Python-3.13.3* RUN echo "/usr/local/python3/lib" >> /etc/ld.so.conf && \ ldconfig RUN ln -sf /usr/local/python3/bin/python3 /usr/bin/python3 && \ ln -sf /usr/local/python3/bin/pip3 /usr/bin/pip3 RUN pip3 install --no-cache-dir -i https://pypi.tuna.tsinghua.edu.cn/simple \ taospyudf \ numpy \ pandas \ scipy RUN mkdir -p /opt/tdengine/udf COPY udf /opt/tdengine/udf RUN chmod 644 /opt/tdengine/udf/*.py RUN ldconfig
然后就是开发 UDF 了, 官方给的列子很好,基本上照着来一下就弄懂了。 因为我们这次写的 SPC 和 CPK 属于聚合函数,使用 聚合函数模板就行。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 def init (): def destroy (): def start () -> bytes : def reduce (inputs: datablock, buf: bytes ) -> bytes return new_state_bytes def finish (buf: bytes ) -> output_type:
上述代码定义了 3 个函数,分别用于实现一个自定义的聚合函数。具体过程如下。 首先,调用 start 函数生成最初的结果缓冲区。这个结果缓冲区用于存储聚合函数的内部状态,随着输入数据的处理而不断更新。 然后,输入数据会被分为多个行数据块。对于每个行数据块,调用 reduce 函数,并将当前行数据块(inputs)和当前的中间结果(buf)作为参数传递。reduce 函数会根据输入数据和当前状态来更新聚合函数的内部状态,并返回新的中间结果。 最后,当所有行数据块都处理完毕后,调用 finish 函数。这个函数接收最终的中间结果(buf)作为参数,并从中生成最终的输出。由于聚合函数的特性,最终输出只能包含 0 条或 1 条数据。这个输出结果将作为聚合函数的计算结果返回给调用者。
然后去注册这个 UDF , 注册的命令如下:
1 2 3 4 5 create or replace aggregate function spc as '/opt/tdengine/udf/spc.py' outputtype double bufsize 1024 language 'Python' ;create or replace aggregate function cpk as '/opt/tdengine/udf/cpk.py' outputtype double bufsize 1024 language 'Python' ;show functions;
可以直接通过执行 create or replace
命令来更新 UDF 的代码。 所以如果你需要修改 UDF 的代码,只需要修改代码然后重新注册一下就行。
我这里注册了两个 UDF , 一个是 SPC , 一个是 CPK 。 其中 SPC 的代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 import jsonimport picklefrom datetime import datetimeimport ioimport mathimport statisticsLOG_FILE: io.TextIOBase = None def init (): """UDF初始化函数""" global LOG_FILE LOG_FILE = open ("/var/log/taos/mean_shift.log" , "wt" ) LOG_FILE.write("init function mean_shift success" ) def log (o ): LOG_FILE.write(str (o) + "\n" ) def destroy (): log("close log file: mean_shift.log" ) LOG_FILE.close() def start (): """聚合函数开始,初始化状态""" initial_state = { "hourly_data" : [], } log(f"start state: {initial_state} " ) return pickle.dumps(initial_state) def reduce (block, buf ): """聚合函数处理数据块""" state = pickle.loads(buf) (rows, cols) = block.shape() log(f"rows: {rows} , cols: {cols} ,first: {block.data(0 , 0 )} " ) if rows < 13 : return pickle.dumps(state) for i in range (10 , 13 ): sub_mean = 0 for j in range (0 , cols): value = block.data(i, j) sub_mean += value sub_mean = sub_mean / cols state["hourly_data" ].append(sub_mean) log(f"hourly_data: {state['hourly_data' ]} " ) return pickle.dumps(state) def finish (buf ): """聚合函数结束,计算SPC统计量并返回结果""" state = pickle.loads(buf) log(f"finish state: {state} " ) if not state["hourly_data" ]: return 0.0 return statistics.mean(state["hourly_data" ])
另一个代码太复杂,就不贴了。
最后执行的时候,使用 select spc(v1,v2,v3,....) from test;
就可以计算 SPC 了。
总结 有几个注意事项:
创建 UDF 的语句的 bufsize 1024
不写虽然能创建成功,但是就是执行不了。 检查 bug的话,要看两个地方,一个是 我们自己写的 log,一个是 /var/log/taos/taospyudf.log
,这个地方官方给了个坑,因为官方给的地址是错的: tail -10 /var/log/taos/taosudf.log
这个可能是 C 的 log 。 使用 UDF 的时候,不支持 * 语法,也就是说我上面的 select spc(v1,v2,v3,....) from test;
不能写成: select spc(*) from test;
替换原有版本的 TDEngine 的时候,注意提前测试,如果版本不兼容,就起不来了。 总之,TDengine 的 UDF 功能还是很强大的,可以满足很多需求。尤其是你不想改业务层,或者没有业务层的时候,可以考虑使用 UDF 来实现。这样也避免了大量数据传输到业务层,然后又传输回来的问题。 而且用到了 Python 丰富的库,可以很方便的实现一些功能。