冠军方案之 Apache Flink 极客挑战赛

freeopen 2021-02-10 [机器学习] #top1

冠军:合肥工大 SkyPeaceLL

比赛任务

数据集

新冠病例行动数据集

天猫精灵行为数据集

四个任务

性能要求

工作流

flink-workflow

官方提供一套docker环境及baseline代码,由于新冠病例和天猫精灵在算法任务上的相似性,编写两个 workflow 配置文件(yaml),使得一套python代码运行两个算法。

基本思路:

  1. 并发读入100万条训练集(并发100),训练AutoEncoder模型,为高维特征降维;
  2. 用训练好的模型对特征向量降维(并发16);
  3. 用Proxima HnswBuilder 对降维后的特征向量创建索引;
  4. 对测试集1中的样本选出Top1024+1(样本)个候选者;
  5. 对候选者进行聚类(算法:Chinese Whisper),得出任务1的结果
  6. 从kafka读取测试集2,选出Top1,并以对应UUID作为分类label, 得出任务2的结果。

数据预处理

病例行动数据集 不含异常数据,且特征向量已经L2 Normalization,不需要特别的预处理。

天猫精灵行为数据集

存在一些异常数据,需要做以下预处理:

移除某些特征向量数据末尾多出的空格(注:如果不做相应处理,score3通常得0分)

Re-generate UUID for duplicated UUID

Processing zero vector

Processing duplicated vector

L2 Normalization

模型训练

目标:对特征向量降维

算法比较:

  1. Simple AutoEncoder (实测效果好,稳定,性能好,采用)
  2. Deep AutoEncoder(实测效果好,性能一般,最终未采用)
  3. VAE (Variational AutoEncoder) (实测效果相对较差,未采用)
  4. PCA (Principal Component Analysis) (实测效果相对较差,未采用)
  5. NMF (Non-negative matrix factorization) (实测效果相对较差,未采用)

模型参数

  1. Loss Function: MSE
  2. Active Function: Linear

维度选择

  1. 新冠病例: 512 => 128
  2. 天猫精灵: 700 => 128

技术栈性能指标

Intel Zoo Cluster Serving

  1. 支持Tensorflow Saved Model 以及PyTorch Model for Inference
  2. 支持并发Inference(本赛题设置为16个并发),在多并发下运行稳定
  3. 模型针对CPU做了优化,无需GPU环境
  4. 自动生成配置,方便部署
  5. 响应时间短。平均每个请求响应时间实测小于35ms,充分满足本方案中的性能需求。

达摩院proxima

  1. 使用Proxima HnswBuilder 创建索引,使用HnswSearch search vector
  2. 支持海量数据向量检索
  3. 召回率高,Top100 召回率超过98.5%
  4. 检索性能高,在本赛题中,平均每个请求(TopK=1024)的响应时间小于3ms,完全满足TopK筛选+再聚类这样类型的应用需求,对于实时的向量检索也毫无压力。

Online Data (kafka) 超时问题

按参考代码标准流程,可能是初始化较慢的原因,在开始时有8秒延迟,将导致16条数据被超时。

方法一、使用ai_flow 内建的算子

  1. 使用ai_flow.read_exampleai_flow.predictai_flow.transformai_flow.write_example

  2. 在其中的SourceExecutor/SinkExecutor实现类中使用PyFlink TABLE API(For Kafka) 读/写Kafka Topic

  3. 为相应Flink job的StreamExecutionEnvironment设置参数:stream_env.enable_checkpointing(250) 该参数默认为3000ms3000ms会导致每3秒才集中从Kafka Topic中读出6条数据。所以,如果不设置这个参数,必定会导致每6条数据中平均有5条会超时500ms,使得实时数据(score2和score4)得分很难超过100分(满分500分),因此必须改变这个参数设置。针对本赛题,可以设置为250ms

    方法一在产线上应用没什么问题,但是在本比赛中它有一个小问题,那就是初始会有8秒延迟,这个延迟会使得赛题程序开始发送的约16条数据被TABLE API(For Kafka)读到时都会超时500ms,从而对最终评分有所影响(实测大概影响6分左右)。 使用方法一可确保只会有少量的初始数据(实测16条左右)产生超时。

方法二、使用ai_flow的用户自定义算子

  1. ai_flow支持更为灵活的用户自定义算子af.user_define_operation
  2. 在用户自定义算子的Executor实现类中,直接使用Kafka Consumer/Producer 读写Kafka Topic
  3. 直接通过 Kafka consumer从Kafka Topic读取数据,然后call Inference API (by Zoo cluster serving) 降维,然后使用Proxima search API search Top1 UUID,然后得出分类label,最后直接通过Kafka Producer 将结果数据写入Kafka Topic。 使用方法二可避免初始16条数据的超时问题,设置好关键参数 (如fetch_max_wait_ms=200),可确保所有数据都不会超时。

Back to top