揭秘字节跳动埋点数据实时动态处理引擎(附源码) 您所在的位置:网站首页 G681实时动态 揭秘字节跳动埋点数据实时动态处理引擎(附源码)

揭秘字节跳动埋点数据实时动态处理引擎(附源码)

2024-07-17 16:55| 来源: 网络整理| 查看: 265

感谢您的关注 + 点赞 + 再看,对博主的肯定,会督促博主持续的输出更多的优质实战内容!!! ”

1.序篇-先说结论

宝贝们,还记得前几天博主去的火山引擎大数据场嘛,其中比较令大家感兴趣的就是最后一讲,字节一站式埋点平台的 flink 标准化清洗及拆流任务。

其中大家感觉比较流啤的就是的就是字节做到了:

不重启任务可以上下线新的拆流及清洗规则,所有的规则变更都不需要涉及到任务的重启。清洗 udf,rpc 接口热加载

总的来说就是任务永不停,不可能停止的,好么,beiber。

★字节火山引擎 PPT。公众号回复 20210724 获取。 ”

6

★本文博主就主要介绍第一点,即做到规则动态变化,可以做到动态添加一个 sink kafka topic,动态删除一个 sink kafka topic,而不重启任务。相信能抛砖引玉,给大家一些启发。 ”

本文从以下几个章节详细介绍框架实现:

背景篇-为啥需要这么个框架定义、目标篇-做这个框架的目标、预期效果是什么难点剖析篇-此框架建设的难点、业界目前的实现数据建设篇-框架具体方案设计数据保障篇-框架的保障方案总结与展望篇2.背景篇-为啥需要这么个框架

首先来看看字节他们做这件事情的背景:

任务重启造成数据的延迟:对于字节这种企业来说且每天都会新上线很多的埋点,把这些新的埋点拆流条件加入 flink 任务就要重启,但是字节客户端日志流量都是千万级别 qps 的,就意味着这个 flink 任务一旦重启耗时肯定是很长的,这对时延敏感的业务是不可接受的。减少对于原始客户端日志的烟囱式消费,节约资源统一标准化的埋点平台:用户能通过埋点平台用到正确的数据与埋点平台联动的、统一化的、标准化的流式数据处理平台:用户能通过这个平台去获取想要的统一标准化的数据数据的分级保障能力:Dump 日志,日志的产出需要分优先级进行保障(死保、尽力保...),用户能放心的用数据

如图:

因此诞生了这个框架。

项目代码、在博主公众号回复【揭秘字节跳动埋点数据实时动态处理引擎】获取:

3.定义、目标篇-做这个框架的目标、预期效果是什么

上述的痛点很多,本节就从最痛的任务重启的延迟角度出发解决问题,揭秘字节动态配置化的 flink 任务的实现。

预期效果如下:

1.即在任务不停止的情况下可以动态的上线一个动态规则、一个 sink kafka topic,上线某个、某类埋点对应的流数据的 kafka topic

如图左边是修改配置,添加了一个拆流规则以及对应 topic,右边这个规则 topic 就开始产出数据,对应的 console consumer 就消费到了复合规则的数据。(gif 加载可能比较慢)

8

2.即在任务不停止的情况下可以动态的下线一个动态规则、一个 sink kafka topic,下线某个、某类埋点对应的流数据的 kafka topic

如图左边是修改配置,删除了一个拆流规则以及对应 topic,右边这个规则 topic 就不产出数据了,对应的 console consumer 就没有新数据可以消费了。(gif 加载可能比较慢)

9

3.总体效果如下:

4.难点剖析篇-此框‍架建设的难点、业界目前的实现

首先带大家分析下,实现这个框架,最基本的模块都需要包含什么:

flink 任务:本身就是一个 Map 任务,逻辑简单动态上下线规则配置:肯定得有一个动态配置中心去告诉 flink 任务需要新上下线一个 kafka topic动态规则过滤引擎:flink 任务监听到规则发生动态变化之后,要热更新规则,将新的规律规则应用起来。需要一个动态代码执行引擎动态上下线 Kafka topic:目前大多数公司用的是 flink 自带的 kafka-connector,一旦涉及到需要添加一个下游,就需要添加一个 kafka producer operator,因为涉及到多加了一个 operator,那肯定得重启任务。需要动态添加删除 producer 的能力。5.数据建设篇-框架具体方案设计5.1.方案设计5.1.1.方案

先说说方案选择的结论:

flink 入口任务:Map 模型使用 ProcessFunction 底层算子动态上下线规则配置:配置中心开源的有很多,这里为了实现轻量化,实现简单,使用 zookeeper 作为动态规则配置中心。当然如果对 zk 压力大,也可以使用广播配置实现。动态规则引擎:规则引擎很多,比如常见的可以使用 JavaScript、Groovy、jython、mvel2、freemarker 等等,太多了。考虑到性能、易用性选用 janino 将动态规则动态编译出 class。然后作为动态规则引擎使用。后面会详述选用 janino 的原因。动态上下线 Kafka topic:去除 flink-kafka-connector,直接在 ProcessFunction 中使用原生 kafka-clients 输出数据,维护一个 producer 池。

整体方案架构图如图所示:

项目代码、在博主公众号回复【揭秘字节跳动埋点数据实时动态处理引擎】获取:

5.1.2.预期效果5.1.2.1.上线配置

4

5.1.2.2.下线配置

5

5.2.具体实现

整个任务的实现非常简单。

本地运行,可以参考下面两篇安装 zk 和 kafka。

zk:https://www.jianshu.com/p/5491d16e6abdkafka:https://www.jianshu.com/p/dd2578d47ff65.2.1.flink 任务入口逻辑

首先来看看整个任务的入口逻辑,ProcessFunction 的功能很简单:

针对数据源的每一条日志数据,遍历动态规则引擎池只要这条数据满足某一条规则的条件,就将这条日志数据写出到规则对应的 topic 中代码语言:javascript复制 env.addSource(new UserDefinedSource()) .process(new ProcessFunction() { // 动态规则配置中心 private ZkBasedConfigCenter zkBasedConfigCenter; // kafka producer 管理中心 private KafkaProducerCenter kafkaProducerCenter; @Override public void open(Configuration parameters) throws Exception { super.open(parameters); this.zkBasedConfigCenter = ZkBasedConfigCenter.getInstance(); this.kafkaProducerCenter = KafkaProducerCenter.getInstance(); } @Override public void processElement(ClientLogSource clientLogSource, Context context, Collector collector) throws Exception { // 遍历所有的动态规则 this.zkBasedConfigCenter.getMap().forEach(new BiConsumer() { @Override public void accept(Long id, DynamicProducerRule dynamicProducerRule) { // 验证该条数据是否符合该条规则 if (dynamicProducerRule.eval(clientLogSource)) { // 将符合规则的数据发向对应规则的 topic 中 kafkaProducerCenter.send(dynamicProducerRule.getTargetTopic(), clientLogSource.toString()); } } }); } @Override public void close() throws Exception { super.close(); // 关闭规则池 this.zkBasedConfigCenter.close(); // 关闭 producer 池 this.kafkaProducerCenter.close(); } }); env.execute(); 5.2.2.动态上下线规则配置

来看 flink ProcessFunction 中的核心点,第一部分就是 ZkBasedConfigCenter。其功能包含:

任务启动时,初始化加载 zk 配置,初始化规则池,将规则池中的配置规则编译成 class 可执行规则监听 zk 配置变更,将新增配置加入规则池,将下线配置从规则池删除5.2.2.1.动态规则 schema 设计

动态规则包含的内容与用户需求息息相关:

举例:用户需要将在首页上报 + id > 300 用户的客户端日志都写入 topic_id_bigger_than_300_and_main_page 的 kafka topic 中。

那么针对这个 flink 任务来说就有以下三项用户的输入:

动态规则的过滤条件:即上游每一条数据过来之后检验这条数据是否满足规则条件。上面这个例子的条件就是 clientLogSource.getId() > 300 && clientLogSource.getPage().equals("首页");其中 clientLogSource 是原始日志 model动态规则要写入的 topic 名称:这条规则过滤出来的数据要写入哪个 topic。上面这个例子的 topic 就是 topic_id_bigger_than_300_and_main_page动态规则的唯一 id:唯一标识一个过滤规则的 id

针对上述要求设计动态规则配置的 schema 如下:

代码语言:javascript复制{ "id-数值类型 string": { "condition-过滤条件": "1==1", "targetTopic-目标 topic 名称": "tuzisir1" } "1": { "condition": "clientLogSource.getId() > 300 && clientLogSource.getPage().equals(\"首页\")", "targetTopic": "topic_id_bigger_than_300_and_main_page" }, "2": { "condition": "clientLogSource.getPage().equals(\"个人主页\")", "targetTopic": "topic_profile_page" } }

对应动态规则 model 设计如下:

项目代码、在博主公众号回复【揭秘字节跳动埋点数据实时动态处理引擎】获取:

代码语言:javascript复制public class DynamicProducerRule implements Evaluable { // 具体过滤规则 private String condition; // 具体写入 topic private String targetTopic; // 使用 janino 编译的规则过滤器 private Evaluable evaluable; public void init(Long id) { try { // 使用 janino 初始化规则 Class clazz = JaninoUtils.genCodeAndGetClazz(id, targetTopic, condition); this.evaluable = clazz.newInstance(); } catch (Exception e) { throw new RuntimeException(e); } } @Override public boolean eval(ClientLogSource clientLogSource) { return this.evaluable.eval(clientLogSource); } }

重点在于 Evaluable 接口,动态生成代码就是继承了这个接口,用于执行过滤规则的基础接口。

代码动态生成下面会详细介绍。

代码语言:javascript复制public interface Evaluable { // 动态规则接口过滤方法 boolean eval(ClientLogSource clientLogSource); } 5.2.2.2.基于 zk 的动态配置中心

使用了 zk 作为动态配置中心,来动态监听规则配置以及更新规则池。

代码语言:javascript复制public class ZkBasedConfigCenter { // zk config 变化监听器 private TreeCache treeCache; // zk 客户端 private CuratorFramework zkClient; private ZkBasedConfigCenter() { try { open(); } catch (Exception e) { throw new RuntimeException(e); } } // !!!规则池!!!规则池!!!规则池 private ConcurrentMap map = new ConcurrentHashMap(); private void open() throws Exception { // 初始化规则 // 初始化 zk config 监听器 // 当有配置变更时 // 调用 private void update(String json) 更新规则 } public void close() { this.treeCache.close(); this.zkClient.close(); } private void update(String json) { Map result = getNewMap(json); // 1.将新增规则添加进规则池 // 2.将下线规则从规则池删除 } private Map getNewMap(String json) { // 将新规则解析,并使用 janino 进行初始化 } }

可以使用一个固定路径的配置,如图博主使用的是 /kafka-config 这个路径

7

5.2.3.动态规则引擎

目前字节使用的引擎是 Groovy,但是博主常用 flink sql,sql 中的代码生成是使用 janino 做的,因此就比较了 janino 和 groovy 的性能差异,janino 编译出的原生 class 性能接近原生 class,是 Groovy 的 4 倍左右。其他的引擎不考虑,要么易用性差,要么性能差。

★Notes:性能这一点真的是很重要,1:4 的差距可以说是差别很大了。如果你的场景也是大流量,非常耗费性能的场景,建议直接入手 janino!!! ”

来看看具体的 benchmark case 代码:

代码语言:javascript复制// ClientLogSource 是原始日志 boolean eval(flink.examples.datastream._01.bytedance.split.model.ClientLogSource clientLogSource) { return String.valueOf(clientLogSource.getId()).equals("1"); }

上面这段代码,在博主 mac 本地执行,每次循环执行 5kw 次,总计执行 5 次 得出的结果如下:

代码语言:javascript复制java:847 ms janino:745 ms groovy:4110 ms java:1097 ms janino:1170 ms groovy:4052 ms java:916 ms janino:1117 ms groovy:4311 ms java:915 ms janino:1112 ms groovy:4382 ms java:921 ms janino:1104 ms groovy:4321 ms

重复执行了很多次:java object : janino 编译原生 class :groovy :几乎都是 1:1:4 的耗时。所以此处我们选择性能更好的 janino。

代码语言:javascript复制public class JaninoUtils { public static Class genCodeAndGetClazz(Long id, String topic, String condition) throws Exception { // 动态生成代码 // 初始化 Class 并返回 } } 5.2.4.动态上下线 Kafka topic

来看入口类中的第二个核心点,就是 KafkaProducerCenter。其功能包含:

维护所有的 producer 池提供消息发送接口

项目代码、在博主公众号回复【揭秘字节跳动埋点数据实时动态处理引擎】获取:

代码语言:javascript复制public class KafkaProducerCenter { // kafka producer 池 private final ConcurrentMap producerConcurrentMap = new ConcurrentHashMap(); private Producer getProducer(String topicName) { // 如果 kafka producer 池中有当前 topic 的 producer,则直接返回 // 如果没有,则初始化一个新的 producer 然后返回 } public void send(String topicName, String message) { final ProducerRecord record = new ProducerRecord(topicName, "", message); try { RecordMetadata metadata = getProducer(topicName).send(record).get(); } catch (Exception e) { throw new RuntimeException(e); } } public void close() { // 关闭所有 producer 连接 } }

上面就是所有的代码、逻辑实现方案。其实整体看下来是非常简单的。

6. 数据保障篇-框架的保障方案配置中心挂了怎么办?

为这个任务分配独立的队列资源,每当这个任务加载到最新配置时,都将配置在本地存储一份。当配置中心挂了的时候,还可以直接加载机器本地的配置,不至于什么都产出不了。

怎么保障用户的配置是无误的?上线前审批:有专门的埋点管理人员进行逻辑验证及管理上线前自动化测试:在埋点管理平台自动化验证逻辑正确性,保障上线到 flink 任务里的配置都是正确的AOP 异常处理、报警:在环境中做 AOP 异常处理,将异常数据 dump 到专用异常 topic 中,也需要自动化把报警信息透出结果验证:针对最终的结果需要有数据准确性验证机制7. 总结与展望篇7.1.总结

本文主要揭秘、实现了字节跳动埋点数据实时动态处理引擎。

7.2.展望本文主要实现了拆流的动态化,输出数据和输入数据完全相同,但是很多情况下,下游只需要其中的一些字段。因此之后还可以做到对于 sink message 字段、消息的个性化。比如可以加一个动态化的 Map 逻辑,将数据源中的 ClientLogSource 转化为任何用户想要的 Model。比如使用 Dynamic Message 或者使用代码生成去做。目前过滤条件完全是 java 语法,之后可以扩展成为 sql 语法,提高可读性函数、rpc 热加载


【本文地址】

公司简介

联系我们

今日新闻

    推荐新闻

      专题文章
        CopyRight 2018-2019 实验室设备网 版权所有