在 TDengine 中使用 UDF 计算SPC和CPK
Todd

背景

在一个质量检测相关项目中,需要计算 SPC 和 CPK,但是 TDengine 中没有现成的函数,而我做的项目UI采用的 Grafana 来展示,不能从业务层解决,所以需要自己写一个 UDF。
主要是和采样有关,计算其实问题不大,因为采样的需求是每个小时或者每天采集一次, 每次采集连续采集 3 个样本,所以每个子组有 3 个样品而且从每个小时的第 10 组开始采集。

UDF 介绍

研究了 TDEngine 中关于 UDF 的章节: https://docs.taosdata.com/taos-sql/udf/

  • 作用:扩展内置函数无法实现的查询功能,支持标量函数(每行输出一个值)和聚合函数(多行输出一个值)。
  • 语言支持:C(高性能)和 Python(利用丰富生态库)。
  • 安全机制:进程分离技术确保 UDF 崩溃不影响数据库服务。

可以用 C 和 和 Python 来实现。 C 的话效率肯定高,但是咱会的太少。 主要 Python 的话可以直接使用 NumPy 库,所以就选了 Python。

Python UDF 的注意事项

官方给了 Python UDF 的安装事项,如果 Python 不开--enable-shared 的话,会报错。其实也说白了 TDengine 就是通过 共享库 如何调用 Python 模块的。

但是官方的 Docker 镜像并不包含 Python 更别说--enable-shared 了。 所以我们第一步就是构建一个包含 Python 的 Docker 镜像。

在踩了一堆坑就快放弃的时候,终于可以了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
# 使用官方的 TDengine 镜像
FROM tdengine/tdengine:3.3.6.6

# 设置环境变量
ENV DEBIAN_FRONTEND=noninteractive
ENV PYTHONUNBUFFERED=1

# 设置国内源 DEB822
# 检测 Ubuntu 版本并动态配置
RUN if grep -q "24.04" /etc/os-release; then \
echo "Types: deb\n\
URIs: http://mirrors.tuna.tsinghua.edu.cn/ubuntu-ports\n\
Suites: noble noble-updates noble-backports noble-security\n\
Components: main restricted universe multiverse\n\
Signed-By: /usr/share/keyrings/ubuntu-archive-keyring.gpg" > /etc/apt/sources.list.d/ubuntu.sources; \
else \
sed -i 's|http://[^/]*/ubuntu|http://mirrors.tuna.tsinghua.edu.cn/ubuntu|g' /etc/apt/sources.list; \
fi

# 更新包管理器并安装必要的依赖
RUN apt-get update && apt-get install -y \
# 编译工具
build-essential \
gcc \
g++ \
make \
wget \
curl \
# Python编译依赖
zlib1g-dev \
libssl-dev \
libffi-dev \
libbz2-dev \
libreadline-dev \
libsqlite3-dev \
libncurses5-dev \
libncursesw5-dev \
xz-utils \
tk-dev \
libxml2-dev \
libxmlsec1-dev \
# 其他工具
vim \
&& rm -rf /var/lib/apt/lists/*

# 下载并编译Python 3.13.3(支持共享库)
WORKDIR /tmp
RUN wget https://mirrors.huaweicloud.com/python/3.13.3/Python-3.13.3.tgz && \
tar -xzf Python-3.13.3.tgz && \
cd Python-3.13.3 && \
./configure \
--enable-shared \
--enable-optimizations \
--prefix=/usr/local/python3 \
--with-ensurepip=install && \
make -j$(nproc) && \
make install && \
cd / && \
rm -rf /tmp/Python-3.13.3*

# 更新动态库路径
RUN echo "/usr/local/python3/lib" >> /etc/ld.so.conf && \
ldconfig

# 创建Python软链接
RUN ln -sf /usr/local/python3/bin/python3 /usr/bin/python3 && \
ln -sf /usr/local/python3/bin/pip3 /usr/bin/pip3

# 安装Python UDF必需的包 注意如果还想增加其他库的话,可以在这里加
RUN pip3 install --no-cache-dir -i https://pypi.tuna.tsinghua.edu.cn/simple \
taospyudf \
numpy \
pandas \
scipy

# 创建UDF目录
RUN mkdir -p /opt/tdengine/udf

# 复制UDF文件
COPY udf /opt/tdengine/udf

# 设置UDF文件权限
RUN chmod 644 /opt/tdengine/udf/*.py

RUN ldconfig

然后就是开发 UDF 了, 官方给的列子很好,基本上照着来一下就弄懂了。 因为我们这次写的 SPC 和 CPK 属于聚合函数,使用 聚合函数模板就行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
def init():
#initialization
def destroy():
#destroy
def start() -> bytes:
#return serialize(init_state)
def reduce(inputs: datablock, buf: bytes) -> bytes
# deserialize buf to state
# reduce the inputs and state into new_state.
# use inputs.data(i, j) to access python object of location(i, j)
# serialize new_state into new_state_bytes
return new_state_bytes
def finish(buf: bytes) -> output_type:
#return obj of type outputtype

上述代码定义了 3 个函数,分别用于实现一个自定义的聚合函数。具体过程如下。
首先,调用 start 函数生成最初的结果缓冲区。这个结果缓冲区用于存储聚合函数的内部状态,随着输入数据的处理而不断更新。
然后,输入数据会被分为多个行数据块。对于每个行数据块,调用 reduce 函数,并将当前行数据块(inputs)和当前的中间结果(buf)作为参数传递。reduce 函数会根据输入数据和当前状态来更新聚合函数的内部状态,并返回新的中间结果。
最后,当所有行数据块都处理完毕后,调用 finish 函数。这个函数接收最终的中间结果(buf)作为参数,并从中生成最终的输出。由于聚合函数的特性,最终输出只能包含 0 条或 1 条数据。这个输出结果将作为聚合函数的计算结果返回给调用者。

然后去注册这个 UDF , 注册的命令如下:

1
2
3
4
5
create or replace aggregate function spc as '/opt/tdengine/udf/spc.py' outputtype double bufsize 1024 language 'Python';
create or replace aggregate function cpk as '/opt/tdengine/udf/cpk.py' outputtype double bufsize 1024 language 'Python';

-- 通过 show functions 命令查看是否注册成功
show functions;

可以直接通过执行 create or replace 命令来更新 UDF 的代码。 所以如果你需要修改 UDF 的代码,只需要修改代码然后重新注册一下就行。

我这里注册了两个 UDF , 一个是 SPC , 一个是 CPK 。 其中 SPC 的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
import json
import pickle
from datetime import datetime
import io
import math
import statistics


LOG_FILE: io.TextIOBase = None


def init():
"""UDF初始化函数"""
global LOG_FILE
LOG_FILE = open("/var/log/taos/mean_shift.log", "wt")
LOG_FILE.write("init function mean_shift success")


def log(o):
LOG_FILE.write(str(o) + "\n")


def destroy():
log("close log file: mean_shift.log")
LOG_FILE.close()


def start():
"""聚合函数开始,初始化状态"""
# 初始化状态:存储每小时的数据和子组结果
initial_state = {
"hourly_data": [],
}
log(f"start state: {initial_state}")
return pickle.dumps(initial_state)


def reduce(block, buf):
"""聚合函数处理数据块"""
state = pickle.loads(buf)

# 获取数据块的行数和列数
(rows, cols) = block.shape()

log(f"rows: {rows}, cols: {cols},first: {block.data(0, 0)}")
# 从第10条开始,取三个作为一个子组,如果不够 10个 直接返回 不处理
if rows < 13:
return pickle.dumps(state)

# 从 10 开始取3 条
for i in range(10, 13):
sub_mean = 0
for j in range(0, cols):
value = block.data(i, j)
sub_mean += value
sub_mean = sub_mean / cols
state["hourly_data"].append(sub_mean)
log(f"hourly_data: {state['hourly_data']}")
return pickle.dumps(state)


def finish(buf):
"""聚合函数结束,计算SPC统计量并返回结果"""
state = pickle.loads(buf)
log(f"finish state: {state}")
if not state["hourly_data"]:
return 0.0
return statistics.mean(state["hourly_data"])

另一个代码太复杂,就不贴了。

最后执行的时候,使用 select spc(v1,v2,v3,....) from test; 就可以计算 SPC 了。

总结

有几个注意事项:

  1. 创建 UDF 的语句的 bufsize 1024 不写虽然能创建成功,但是就是执行不了。
  2. 检查 bug的话,要看两个地方,一个是 我们自己写的 log,一个是 /var/log/taos/taospyudf.log ,这个地方官方给了个坑,因为官方给的地址是错的: tail -10 /var/log/taos/taosudf.log 这个可能是 C 的 log 。
  3. 使用 UDF 的时候,不支持 * 语法,也就是说我上面的 select spc(v1,v2,v3,....) from test; 不能写成: select spc(*) from test;
  4. 替换原有版本的 TDEngine 的时候,注意提前测试,如果版本不兼容,就起不来了。

总之,TDengine 的 UDF 功能还是很强大的,可以满足很多需求。尤其是你不想改业务层,或者没有业务层的时候,可以考虑使用 UDF 来实现。这样也避免了大量数据传输到业务层,然后又传输回来的问题。
而且用到了 Python 丰富的库,可以很方便的实现一些功能。

 评论
评论插件加载失败
正在加载评论插件
由 Hexo 驱动 & 主题 Keep
本站由 提供部署服务
总字数 89.8k 访客数 访问量