最佳实践 您所在的位置:网站首页 什么叫轻型病例病例 最佳实践

最佳实践

#最佳实践| 来源: 网络整理| 查看: 265

在SQL任务中使用Flink CEP在SQL任务中使用Flink CEP

复合事件处理(Complex Event Processing,CEP)是一种基于动态环境中事件流的分析技术,事件在这里通常是有意义的状态变化,通过分析事件间的关系,利用过滤、关联、聚合等技术,根据事件间的时序关系和聚合关系制定检测规则,持续地从事件流中查询出符合要求的事件序列,最终分析得到更复杂的复合事件。Flink 提供了专门的Flink CEP 库,它包含如下组件:Event Stream、Pattern定义、Pattern检测和生成Alert。

Flink CEP 的特征如下

目标:从有序的简单事件流中发现一些高阶特征; 输入:一个或多个简单事件构成的事件流; 处理:识别简单事件之间的内在联系,多个符合一定规则的简单事件构成复杂事件; 输出:满足规则的复杂事件。

场景 CEP 可用于① 输入的流数据,尽快产生结果;② 在2个事件流上,基于时间进行聚合类的计算;③ 提供实时/准实时的警告和通知;④ 在多样的数据源中产生关联分析模式;⑤ 高吞吐、低延迟的处理

详细请参考Flink CEP需求: 检测一个用户在3秒内连续登陆失败。

测试数据

{"userId":5402,"ip":"83.149.11.115","eventType":"fail","eventTime":1558430815000} {"userId":5402,"ip":"83.149.11.115","eventType":"fail","eventTime":1558430817000} {"userId":23064,"ip":"66.249.3.15","eventType":"fail","eventTime":1558430826000} {"userId":5692,"ip":"80.149.25.29","eventType":"fail","eventTime":1558430833000} {"userId":7233,"ip":"86.226.15.75","eventType":"fail","eventTime":1558430832000} {"userId":5692,"ip":"80.149.25.29","eventType":"fail","eventTime":1558430840000} {"userId":29607,"ip":"66.249.73.135","eventType":"fail","eventTime":1558430849000} {"userId":29607,"ip":"66.249.73.135","eventType":"success","eventTime":1558430859000}完整sql: create table appLog( userId bigint, `ip` varchar, eventType varchar, eventTime bigint, ts as PROCTIME() )with( 'connector' = 'kafka', 'topic' = 'app_log', 'properties.bootstrap.servers' = 'sloth-test1.dg.163.org:9092', 'properties.group.id' = 'logs_moudle', 'scan.startup.mode' = 'latest-offset', 'parallelism' = '1', 'format' = 'json' ); create table warning_sink( userId bigint, firstFailTime timestamp, lastFailTime timestamp, warningMsg varchar )with( 'connector' = 'print' ); insert into warning_sink select userId, firstFailTime, lastFailTime, '连续3s 内登陆失败' as warningMsg from appLog match_recognize( partition by userId order by ts MEASURES FIRST(A.ts) AS firstFailTime, LAST(A.ts) as lastFailTime one row per match AFTER MATCH SKIP TO LAST PATTERN(A+?) WITHIN INTERVAL '3' SECOND DEFINE A as A.eventType = 'fail' ); -- PATTERN 子句指定了 该需求对以下模式感兴趣:具有开始事件start_row,然后是一个或多个登陆失败typeFail , 并以typeSuccess结束。 如 AFTER MATCH SKIP TO LAST 子句所示,则从最后一个 typeSuccess 事件开始寻找下一个模式匹配 数据结果: +I(5402,2021-12-09T11:35:22.242,2021-12-09T11:35:22.242,连续3s 内登陆失败) +I(5402,2021-12-09T11:35:22.244,2021-12-09T11:35:22.244,连续3s 内登陆失败) +I(23064,2021-12-09T11:35:22.245,2021-12-09T11:35:22.245,连续3s 内登陆失败) +I(5692,2021-12-09T11:35:22.245,2021-12-09T11:35:22.245,连续3s 内登陆失败) +I(7233,2021-12-09T11:35:22.246,2021-12-09T11:35:22.246,连续3s 内登陆失败) +I(5692,2021-12-09T11:35:22.246,2021-12-09T11:35:22.246,连续3s 内登陆失败) +I(29607,2021-12-09T11:35:22.246,2021-12-09T11:35:22.246,连续3s 内登陆失败)


【本文地址】

公司简介

联系我们

今日新闻

    推荐新闻

    专题文章
      CopyRight 2018-2019 实验室设备网 版权所有