冠军方案之 Apache Flink 极客挑战赛
freeopen 2021-02-10 [机器学习] #top1冠军:合肥工大 SkyPeaceLL
比赛任务
数据集
新冠病例行动数据集
- 病例历史行动数据集(训练集1) 1M+
- 确诊病例数据 (测试集1) 500+
- 实时病例行动数据集(测试集2) 1000+
- 人脸特征512维
天猫精灵行为数据集
- 天猫精灵历史行为数据集(训练集2) 1M+
- 用户行为数据集(测试集3) 500+
- 实时用户行为数据集(测试集4) 1000+
- 行为特征700维
四个任务
- 根据测试集1每条数据的特征向量,在训练集1中找出该病例(人)对应的所有记录。
- 对测试集2的每条数据,根据其特征向量进行实时分类(人)。
- 根据测试集3每条数据的特征向量,在训练集2中找出该用户行为(领域+意图)对应的所有记录。
- 对测试集4的每条数据,根据其特征向量进行实时分类(领域+意图)。
性能要求
-
四个任务总运行时间不能超过3小时。
-
对每条实时数据完成实时分类的响应时间不能超过500ms。
-
平台和组件
- Flink,PyFlink,Flink ai_flow,Proxima,Intel Zoo cluster serving
工作流
官方提供一套docker环境及baseline代码,由于新冠病例和天猫精灵在算法任务上的相似性,编写两个 workflow 配置文件(yaml),使得一套python代码运行两个算法。
基本思路:
- 并发读入100万条训练集(并发100),训练AutoEncoder模型,为高维特征降维;
- 用训练好的模型对特征向量降维(并发16);
- 用Proxima HnswBuilder 对降维后的特征向量创建索引;
- 对测试集1中的样本选出Top1024+1(样本)个候选者;
- 对候选者进行聚类(算法:Chinese Whisper),得出任务1的结果
- 从kafka读取测试集2,选出Top1,并以对应UUID作为分类label, 得出任务2的结果。
数据预处理
病例行动数据集 不含异常数据,且特征向量已经L2 Normalization,不需要特别的预处理。
天猫精灵行为数据集
存在一些异常数据,需要做以下预处理:
移除某些特征向量数据末尾多出的空格(注:如果不做相应处理,score3通常得0分)
Re-generate UUID for duplicated UUID
Processing zero vector
Processing duplicated vector
L2 Normalization
模型训练
目标:对特征向量降维
算法比较:
- Simple AutoEncoder (实测效果好,稳定,性能好,采用)
- Deep AutoEncoder(实测效果好,性能一般,最终未采用)
- VAE (Variational AutoEncoder) (实测效果相对较差,未采用)
- PCA (Principal Component Analysis) (实测效果相对较差,未采用)
- NMF (Non-negative matrix factorization) (实测效果相对较差,未采用)
模型参数
- Loss Function: MSE
- Active Function: Linear
维度选择
- 新冠病例: 512 => 128
- 天猫精灵: 700 => 128
技术栈性能指标
Intel Zoo Cluster Serving
- 支持Tensorflow Saved Model 以及PyTorch Model for Inference
- 支持并发Inference(本赛题设置为16个并发),在多并发下运行稳定
- 模型针对CPU做了优化,无需GPU环境
- 自动生成配置,方便部署
- 响应时间短。平均每个请求响应时间实测小于35ms,充分满足本方案中的性能需求。
达摩院proxima
- 使用Proxima HnswBuilder 创建索引,使用HnswSearch search vector
- 支持海量数据向量检索
- 召回率高,Top100 召回率超过98.5%
- 检索性能高,在本赛题中,平均每个请求(TopK=1024)的响应时间小于3ms,完全满足TopK筛选+再聚类这样类型的应用需求,对于实时的向量检索也毫无压力。
Online Data (kafka) 超时问题
按参考代码标准流程,可能是初始化较慢的原因,在开始时有8秒延迟,将导致16条数据被超时。
方法一、使用ai_flow 内建的算子
-
使用
ai_flow.read_example
、ai_flow.predict
、ai_flow.transform
和ai_flow.write_example
-
在其中的
SourceExecutor/SinkExecutor
实现类中使用PyFlink TABLE API(For Kafka) 读/写Kafka Topic -
为相应Flink job的
StreamExecutionEnvironment
设置参数:stream_env.enable_checkpointing(250)
该参数默认为3000ms
,3000ms
会导致每3秒才集中从Kafka Topic中读出6条数据。所以,如果不设置这个参数,必定会导致每6条数据中平均有5条会超时500ms,使得实时数据(score2和score4)得分很难超过100分(满分500分),因此必须改变这个参数设置。针对本赛题,可以设置为250ms
。方法一在产线上应用没什么问题,但是在本比赛中它有一个小问题,那就是初始会有8秒延迟,这个延迟会使得赛题程序开始发送的约16条数据被TABLE API(For Kafka)读到时都会超时500ms,从而对最终评分有所影响(实测大概影响6分左右)。 使用方法一可确保只会有少量的初始数据(实测16条左右)产生超时。
方法二、使用ai_flow的用户自定义算子
- ai_flow支持更为灵活的用户自定义算子
af.user_define_operation
- 在用户自定义算子的Executor实现类中,直接使用Kafka Consumer/Producer 读写Kafka Topic
- 直接通过 Kafka consumer从Kafka Topic读取数据,然后call Inference API (by Zoo cluster serving) 降维,然后使用Proxima search API search Top1 UUID,然后得出分类label,最后直接通过Kafka Producer 将结果数据写入Kafka Topic。
使用方法二可避免初始16条数据的超时问题,设置好关键参数 (如
fetch_max_wait_ms=200
),可确保所有数据都不会超时。