本文整理自阿里巴巴算法专家赵伟波,在 Flink Forward Asia 2023 AI特征工程专场的分享。本篇内容首要分为以下四部分:
- Flink ML 概略
- 在线学习的规划与运用
- 在线推理的规划与运用
- 特征工程算法与运用
一、Flink ML 概略
Flink ML 是 Apache Flink 的子项目,遵循 Apache 社区规范,愿景是成为实时传统机器学习的事实标准。
2022年1月份 Flink ML API 发布,7月份发布齐备、高功能的 Flink ML 基础设施,2023年4月份发力特征工程算法并服务用户,6月份支撑 Flink 多版别。
二、在线学习的规划与运用
2.1 在线机器学习工作流样例
有两个模型AB,用 online 在线学习的方法去练习这两个模型,而且运用模型去进行在线推理,在推理过程中这个模型是活动形式,叫做 Model stream (模型流),以模型流的方法将模型不断地流入链路中,使模型具有更好的实时性。推理结束之后,推理样本会推荐给一些前方的客户,客户对结果进行反馈,再进行一些样本的拼接,终究返回到练习的数据流形成闭环,这就是工作流样例。
接下来以工作流样例来介绍在线学习的规划。练习数据进行切分后,切成不同的 window,每个 window 在经过 Estimator 的时分需求更新里边的模型,之后该模型会流到下面推理的链路中,跟着数据的不断流入,模型会一个接一个的往推理的链路中活动,这就是 Model stream(模型流),其思路是经过把模型做成一个行列的方法去支撑推理以到达更好的时效性。
存在的问题:
- 如何使数据拆分更加合理?对不同的事务有不同的要求,有的期望用时刻,有的期望用巨细,都需求一些战略。
- 由于数据和模型都是活动的,两个往同一个当地去流,那么如何决议一条样本来了之后用哪个模型进行推理?
- 如何保证模型的一致性?由于链路中有两个模型,假如两个模型的练习数据不一致会导致出现一些问题。
- 数据是用哪一个模型推理出来的?每一条样本是哪个模型推理出来的,猜测的好坏需求去追溯源头。
2.2 在线机器学习的规划
针对四个问题,有四条规划需求:
- 支撑将输入数据划分为多个 window 进行练习,产生一个模型流。
- 支撑运用输入的模型流来对数据进行猜测。
- 支撑用户指定推理数据和当前模型数据的时刻差。每一条样本来了之后,咱们期望用最新的模型去进行推理,可是最新的模型或许还没有练习出来,这个时分就需求设定一个时刻差,答应它用非最新的模型进行推理。
- 支撑在输出数据中暴露猜测每条数据时运用的模型版别。从猜测结果追溯出模型的需求。
针对这些需求,咱们的规划方案是:
-
添加 HasWindows 接口。
答运用户声明划分数据的不同战略。
-
为 ModelData 添加 model version 和 timestamp。 model version 的值从 0 开始,每次添加 1。 模型数据的时刻戳为练习得到该模型的数据的最大时刻戳。
-
添加 HasMaxAllowedModelDelayMs 接口。
答运用户指定猜测数据 D 时,运用的模型数据 M 早于 D 的时刻小于等于设定的阈值。
-
添加 HasModelVersionCol 接口。 推理过程中,答运用户输出猜测每条数据时运用的模型版别。
有了方案之后再回来看问题:
-
怎样切分 window:
提供 window 战略,用户能够根据自己的需求去做一些合适自己事务场景的切分。
-
挑选哪一种模型来推理当前数据:
经过阈值参数设定答应离当前数据多远的模型进行推理;理论上能够用最新模型,可是或许会造成等待之类的问题。
-
关于模型的一致性:
每一条样本在猜测的时分都会带一个模型版别,经过第一个模型猜测再到第二个模型推理的时分会主动获取版别号,两头用相同的版别进行推理,终究输出的结果会带有一个版别号。
这样就把最初提的四个问题解决了。
2.3 在线学习在阿里云实时日志聚类的运用
阿里云 ABM 运维中心会把阿里一切平台的日志都收集到一起,然后会针对过错日志做一个聚类,把过错日志发送到对应的部门,去进行后续的处理。
传统算法工程链路首先进行数据输入,用 Flink job 进行数据加工处理,数据会落盘,之后经过守时调度来拉起聚类算法,然后写出模型,这个模型再经过加载的方法拉起 Flink job 进行数据猜测,可是整个链路具有局限性,流程比较复杂,运维本钱比较高,实时性低,而且功能难以保证。
日志聚类算法流程把体系日志进行预处理和编码后分词,做特征挑选提取关键词,然后做日志的特征表明和标准化,再做层次聚类,日志的类别,终究写出到数据库,用来辅导分词。
针对该流程咱们运用 Flink ML 构建流式日志聚类就能够把这个流程串起来。经过 Flink job 拼接 SLS 与数据库全量数据,接着进行清洗和编码日志数据,然后分词和标准化,核算聚类结果,终究选取簇内典型代表日志。
把这个案例中的算子进行抽取,像 SLS 流式读取,分词,日志的向量化,特征挑选,特征的标准化,这些并不是事务独有的,而是很多在线学习事务都需求的算子,把它抽取出来,做成一个独立的组件,客户需求做在线学习流程的时分能够来复用这些算子。
日志聚类算法链路升级的收益:
- 在链路推迟方面,将本来 5 min 的推迟下降到 30s
- 运营本钱下降,现在只需求保持 1 个 Flink 作业
- 剖析本钱下降
- 算法功能提升
三、在线推理的规划与运用
推理首要分为:
- 批量推理:例如,有 100w 条数据落盘,然后起一个批的使命对这100万条数据进行推理,再进行落盘。
- Near-line (近线)推理:根据 Flink 的使命,读取 Kafka 数据,经过 Transformer 的方法对流式的数据进行推理。这种推理有一个比较大的问题是推迟比较高,一般在百毫秒量级,在实际的事务场景中,推理需求很低的推迟,一般是几十毫秒甚至几毫秒,这就需求咱们做一个推理结构去习惯高要求的事务场景。
在做这个之前咱们对 Spark ML 的推理进行了一个调研。后来发现 Spark ML 自身是没有推理模块的,它有一个 mleap,把 Spark 推理这部分做成一个推理结构,这个推理结构与引擎 Runtime 彻底无关,减少依赖抵触,是一个更轻量的结构,别的这个新结构能够为推理重写核算逻辑代码,拥有更大的优化空间。
3.1 规划需求
规划需求借鉴了 mleap 的做法:
- 数据表明(与 Flink Runtime 无关)
- 单条数据表明:Row
- 批量数据表明:DataFrame
- 数据类型表明,提供 Vector、Matrix 等类型的支撑
- 推理逻辑表明
- 模型加载
- 支撑从 Model/Transoformer#save 的文件中加载
- 支撑动态加载模型数据,而不需求重启
- Utils
- 支撑查看 Transformer/PipelineModel 是否支撑在线推理
- 串联多个推理逻辑成单个推理逻辑
在这个规划需求下,左面是推理的数据结构 DataFrame,包含了 Column names, Column types, Row,进入推理逻辑之后输出仍是相同的数据结构,这样整个推理结构就能够串起来,不需求有数据结构转化。
模型加载这边都是经过 save 函数将模型写入到磁盘,左面的 save 是 Flink ML 做的工作,右边的 loadServable 是推理结构做的工作,经过这两个函数完结了模型的保存加载和推理。
接下来以逻辑回归为例来看代码的完结,经过 save 函数把模型写出到指定的目录,下面的 load 是推理结构做的工作,以 load 模型的文件去做推理。
模型的数据更新这部分是经过把一个模型写入到 kafka 里边,kafka 再 set 到模型的 Servable 里边,当把模型写入到 kafka 里的时分模型会自然而然的流入到 Servable 里边,终究完结模型的动态更新。
下面是代码
setModelData 的输入是 InputStream,它能够从 kafka 里读入,当更新 kafka 里的数据时它就能够更新到模型里边。
别的咱们也支撑 PipelineModel 推理,能够从 PipelineModel 的模型数据构建 Servable, 查看 PipelineModel 是否支撑在线推理,不需求履行练习作业就能判别。
3.2 运用场景
终究来看运用场景,这是一个简化的 ML 模型练习、猜测和部署的流程。首先是读入数据,做特征工程,然后做评价和部署。这边运用 PipelineModel 将标准化和 GBT 分类这两个模型打到 Pipeline 里边去,再去做在线的推理服务。
以下是代码
将标准化和 GBT 两个模型经过 Pipeline 写出去,在推理模块中终究完结 Pipeline 的推理,而且推理支撑写出和动态加载。
四、特征工程算法与运用
4.1 特征工程算法
新增27个算法,总共33个,基本掩盖常用算法。
4.2 特征工程的运用
首先是做推荐,广告的评价,都需求特征的处理。第二个运用场景是用于完结一些复杂的算法,以 GBT 为例,处理数值特征和处理类别型特征。别的在大言语模型这块,Flink ML 也做了一些规划。
接下来以大言语模型为例,来看特征工程的事务。高质量的文本输入能够获得更好的大言语模型,而文本近似去重能提高文本质量。关于互联网数据来说,文本重复的份额一般 20%-60% 之间,文本规模越大,重复份额越高。
针对这个问题,咱们规划了近似去重流程:
- 不同于精确去重:不要求彻底一致,或许子串关系
- 根据部分敏感性哈希 Locality-sensitive hashing:类似的样本更容易被 Hash 到相同的 buckets 内
- 关于文本数据来说,一般根据文本特征化后的 Jaccard 距离,运用 MinHashLSH 来找到类似文本
经过这些组件,能够完结文本去重流程:
- Tokenizer:进行分词
- HashingTF:将文本变换为 Binary 特征
- MinHash:核算文本签名
- MinHashLSH:进行 SimilarityJoin,找到类似对
终究是功能的测试,这是手动构造的 Benchmark 数据集,直接经过仿制、删除之类的操作拿到一个数据集。关于5亿的数据,重复率为 50%,耗时大约 1.5h,后面是对应的去重作用。