Flink史上最简单双十一实时分析案例 您所在的位置:网站首页 双十一案例双十一盲目消费的案例 Flink史上最简单双十一实时分析案例

Flink史上最简单双十一实时分析案例

2024-07-11 19:21| 来源: 网络整理| 查看: 265

import org.apache.flink.util.Collector;

import java.math.BigDecimal; import java.math.RoundingMode; import java.util.ArrayList; import java.util.Comparator; import java.util.Random;

/** * @author ChinaManor * #Description * * Desc今天我们就做一个最简单的模拟电商统计大屏的小例子, * * 需求如下: * * 1.实时计算出当天零点截止到当前时间的销售总额 * * 2.计算出各个分类的销售额最大的top3 * * 3.每秒钟更新一次统计结果 * #Date: 25/6/2021 08:28 */ public class T4 { public static void main(String[] args) throws Exception { //TODO 1.env StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC); //TODO 2.source //订单数据Tuple2 DataStreamSource orderDS = env.addSource(new MySource()); //TODO 3.transformation //-1.每秒预聚合各个分类的销售总额:从当天0点开始截止到目前为止的各个分类的销售总额 SingleOutputStreamOperator aggregateResult = orderDS.keyBy(t -> t.f0) //注意:中国使用UTC+08:00,您需要一天大小的时间窗口, //窗口从当地时间的每00:00:00开始,您可以使用{@code of(time.days(1),time.hours(-8))} .window(TumblingProcessingTimeWindows.of(Time.days(1), Time.hours(-8))) //注意:下面表示每秒触发计算 .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(1))) //聚合(可以使用之前学习的简单聚合:sum/reduce/或自定义聚合:apply或使用aggregate聚合(可以指定如何聚合及如何收集聚合结果)) .aggregate(new MyAggregate(), new MyWindow()); //输出查看下预聚合的结果 // aggregateResult.print();

//按照分类将订单金额进行聚合: //分类名称 金额 时间 /\* //男装 100 2021-11-11 11:11:11

//女装 100 2021-11-11 11:11:11 //男装 200 2021-11-11 11:11:12 //女装 200 2021-11-11 11:11:12*/

//TODO 4.sink //-2.计算所有分类的销售总额和分类销售额最大Top3 //要求每秒更新/计算所有分类目前的销售总额和分类销售额Top3

// aggregateResult.keyBy(CategoryPojo::getDateTime) aggregateResult.keyBy(c -> c.getDateTime()) .window(TumblingProcessingTimeWindows.of(Time.seconds(1))) //先按照时间对数据分组,因为后续要每秒更新/计算销售总额和分类销售额Top3 .process(new MyProcessWindowFunction());

//TODO 5.execute env.execute(); } /\*\*

* abstract class ProcessWindowFunction */ public static class MyProcessWindowFunction extends ProcessWindowFunction { @Override public void process(String key, Context context, Iterable categoryPojos, Collector out) throws Exception { Double totalAmount = 0d;//用来记录销售总额

//尝试使用外比较器进行排序 ArrayList list = new ArrayList(); for (CategoryPojo categoryPojo : categoryPojos) { //--1.计算截止到目前为止的所有分类的销售总额 totalAmount += categoryPojo.getTotalPrice(); //--2. 分类销售额最大的Top3 if (list.size()=3 CategoryPojo first = list.get(0); if (categoryPojo.getTotalPrice()>first.getTotalPrice()){ list.remove(first); list.add(categoryPojo); }//进来元素小就不用变 } } list.sort(new Comparator() { @Override public int compare(CategoryPojo o1, CategoryPojo o2) { return (int) (o1.getTotalPrice()-o2.getTotalPrice()); } }); //--3.直接在这里输出 System.out.println("================================================================================================================================"); System.out.println("----当前时间:----"); System.out.println(key); System.out.println("----销售总额:----"); System.out.println(new BigDecimal(totalAmount).setScale(2, RoundingMode.HALF_UP)); System.out.println("----销售额Top3分类:----"); list.stream() .map(c -> { c.setTotalPrice(new BigDecimal(c.getTotalPrice()).setScale(2, RoundingMode.HALF_UP).doubleValue()); return c; }) .sorted((c1, c2) -> c1.getTotalPrice() return 0d; }

//累加过程 @Override public Double add(Tuple2 value, Double accumulator) { return value.f1+accumulator; } //累加结果 @Override public Double getResult(Double accumulator) { return accumulator; } //合并结果 @Override public Double merge(Double a, Double b) { return a+b; } } /\*\*

// * interface WindowFunction // * 自定义窗口函数,实现窗口聚合数据的收集 // */ public static class MyWindow implements WindowFunction { private FastDateFormat df =FastDateFormat.getInstance(“yyyy-MM-dd HH:mm:ss”);

@Override public void apply(String key, TimeWindow window, Iterable input, Collector out) throws Exception { double totalPrice =0d; for (Double price : input) { totalPrice +=price; } CategoryPojo categoryPojo = new CategoryPojo(); categoryPojo.setCategory(key); categoryPojo.setDateTime(df.format(System.currentTimeMillis())); categoryPojo.setTotalPrice(totalPrice); out.collect(categoryPojo); } } /\*\*

* 用于存储聚合的结果 */ @Data @AllArgsConstructor @NoArgsConstructor public static class CategoryPojo { private String category;//分类名称 private double totalPrice;//该分类总销售额 private String dateTime;// 截止到当前时间的时间,本来应该是EventTime,但是我们这里简化了直接用当前系统时间即可 }

/\*\*

* 自定义数据源实时产生订单数据Tuple2 */ public static class MySource implements SourceFunction{ private boolean flag =true; private String[] categorys ={“男装”,“女装”,“童装”, “洗护”}; private Random random =new Random(); @Override public void run(SourceContext ctx) throws Exception { while (flag){ //随机生成分类和金额 int index = random.nextInt(categorys.length); String category = categorys[index];//随机分类 double price = random.nextDouble() * 100; //注意生成[0,100) ctx.collect(Tuple2.of(category,price)); Thread.sleep(20); } }

@Override public void cancel() { flag =false; } }

}

### 兄弟萌,我考完试了 这是考试的需求,多了从Kafka读取需求:

1、从kafka读取到数据给5分 2、数据简单处理切分给5分 3、给出合适的数据类型给5分 4、销售总额和分类的订单额数据要精确到小数点后两位5分 5、设置合理的窗口和触发情况给10分 6、实现销售总额正确输出,每秒钟更新一次 30分 7、实现各分类的订单额降序输出,每秒钟更新一次 30分 8、是否按照要求写注释 5分 9、代码整洁度、健壮度 5分

这是参考答案: Flink几个函数这块,我还需要加强~

package demo;

import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; import org.apache.flink.api.common.RuntimeExecutionMode; import org.apache.flink.api.common.functions.AggregateFunction; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; import org.apache.flink.streaming.api.functions.windowing.WindowFunction; import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.triggers.ContinuousProcessingTimeTrigger; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; import org.apache.flink.util.Collector;

import java.math.BigDecimal; import java.text.SimpleDateFormat; import java.util.*;

public class KafkaToFlink { public static void main(String[] args) throws Exception { //TODO 0.env StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC); env.setParallelism(1); //TODO 1.source //准备kafka连接参数 Properties props = new Properties(); props.setProperty(“bootstrap.servers”, “node1.itcast.cn:9092”);//集群地址 props.setProperty(“group.id”, “flink”);//消费者组id props.setProperty(“auto.offset.reset”,“latest”);//latest有offset记录从记录位置开始消费,没有记录从最新的/最后的消息开始消费 /earliest有offset记录从记录位置开始消费,没有记录从最早的/最开始的消息开始消费 props.setProperty(“flink.partition-discovery.interval-millis”,“5000”);//会开启一个后台线程每隔5s检测一下Kafka的分区情况,实现动态分区检测 props.setProperty(“enable.auto.commit”, “true”);//自动提交(提交到默认主题,后续学习了Checkpoint后随着Checkpoint存储在Checkpoint和默认主题中) props.setProperty(“auto.commit.interval.ms”, “2000”);//自动提交的时间间隔 //使用连接参数创建FlinkKafkaConsumer/kafkaSource FlinkKafkaConsumer kafkaSource = new FlinkKafkaConsumer(“test”, new SimpleStringSchema(), props); //使用kafkaSource DataStream kafkaDS = env.addSource(kafkaSource); DataStream sourceKafka = kafkaDS.map(new MapFunction() { @Override public Tuple2 map(String value) throws Exception { String[] lines = value.split(“:”); return Tuple2.of(lines[0], Double.valueOf(lines[1])); } });

//todo 3.transformation //3.1定义大小为一天的窗口,第二个参数表示中国使用的UTC+08:00时区比UTC时间早 //3.2定义一个1s的触发器 //3.3聚合结果 DataStream tempAggResult = sourceKafka.keyBy(t -> t.f0) .window(TumblingProcessingTimeWindows.of(Time.days(1), Time.hours(-8))) .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(1))) .aggregate(new TestAggregate(), new TestWindowResult()); //todo 4.使用上面聚合的结果,实现业务需求: //4.1.实时计算出当天零点截止到当前时间的销售总额 //4.2.计算出各个分类的销售topN //4.3.每秒钟更新一次统计结果 tempAggResult.keyBy(CategoryPojo::getDateTime) .window(TumblingProcessingTimeWindows.of(Time.seconds(1))) .process(new TestProcessWindowFunction()); //todo 5.execute env.execute(); } //public abstract class ProcessWindowFunction private static class TestProcessWindowFunction extends ProcessWindowFunction { @Override public void process(String datetime, Context context, Iterable elements, Collector out) throws Exception { double totalPrice = 0D; double roundPrice = 0D; Map map = new TreeMap(); for (CategoryPojo element : elements) { //4.1.实时计算出当天零点截止到当前时间的销售总额 totalPrice += element.totalPrice; BigDecimal bigDecimal = new BigDecimal(totalPrice); roundPrice = bigDecimal.setScale(2, BigDecimal.ROUND_HALF_UP).doubleValue();//四舍五入 // 4.2.计算出各个分类的销售topN map.put(element.category,element.totalPrice); } ArrayListlist= new ArrayList(map.entrySet()); Collections.sort(list, new Comparator() { @Override public int compare(Map.Entry o1, Map.Entry o2) { return o2.getValue().compareTo(o1.getValue()); } }); System.out.println("时间 : " + datetime + " 总价 : " + roundPrice + "\ntopN: "); for (int i = 0; i public static void main(String[] args) { //1、准备配置文件 Properties props = new Properties(); props.put(“bootstrap.servers”, “node1.itcast.cn:9092”); props.put(“acks”, “all”); props.put(“retries”, 0); props.put(“batch.size”, 16384); props.put(“linger.ms”, 1); props.put(“buffer.memory”, 33554432); props.put(“KafkaCustomPartitioner.class”, “test.KafkaCustomPartitioner”); props.put(“key.serializer”, “org.apache.kafka.common.serialization.StringSerializer”); props.put(“value.serializer”, “org.apache.kafka.common.serialization.StringSerializer”); //2、创建KafkaProducer KafkaProducer kafkaProducer = new KafkaProducer(props); String[] categorys = {“女装”, “男装”, “图书”, “家电”, “洗护”, “美妆”, “运动”, “游戏”, “户外”, “家具”, “乐器”, “办公”}; Random random = new Random(); while (true){ //随机生成分类和金额 int index = random.nextInt(categorys.length);//[0~length) ==> [0~length-1] String category = categorys[index];//获取的随机分类 double price = random.nextDouble() * 100;//注意nextDouble生成的是[01)之间的随机数,*100之后表示[0100) kafkaProducer.send(new ProducerRecord(“categories”,category+“:”+price)); //3、发送数据 System.out.println(category+“:”+price); try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } } } }

### 总结 ​ 最典型的案例便是淘宝双十一活动,每年双十一购物节,除疯狂购物外,最引人注目的就是`双十一大屏不停跳跃的成交总额`。在整个计算链路中包括从天猫交易下单购买到数据采集,数据计算,数据校验,最终落到双十一大屏上展示的全链路时间压缩在5秒以内,顶峰计算性能高达数三十万笔订单/秒,通过多条链路流计算备份确保万无一失。 以上便是大数据Flink史上最简单双十一实时分析案例喜欢的小伙伴欢迎`一键三连`!!! 感谢李胜步博主提供的思路: [原文链接](https://bbs.csdn.net/topics/618545628) ![在这里插入图片描述](https://img-blog.csdnimg.cn/cover1/247466910341660722.jpg?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,image_MjAyMDA3MTUxNjIxMDEzOC5wbmc=,size_16,color_FFFFFF,t_70,image/resize,m_lfit,w_962#pic_center) ![img](https://img-blog.csdnimg.cn/img_convert/6a6af6ca4fb4c3bbd5418fa6464b88e0.png) ![img](https://img-blog.csdnimg.cn/img_convert/652cc3377dae10cba5278fa06d8f22aa.png) ![img](https://img-blog.csdnimg.cn/img_convert/8f2822c69b3dfc7dc913a11c5431b38b.png) **既有适合小白学习的零基础资料,也有适合3年以上经验的小伙伴深入学习提升的进阶课程,涵盖了95%以上大数据知识点,真正体系化!** **由于文件比较多,这里只是将部分目录截图出来,全套包含大厂面经、学习笔记、源码讲义、实战项目、大纲路线、讲解视频,并且后续会持续更新** **[需要这份系统化资料的朋友,可以戳这里获取](https://bbs.csdn.net/topics/618545628)** aGVpdGk,shadow_10,image_MjAyMDA3MTUxNjIxMDEzOC5wbmc=,size_16,color_FFFFFF,t_70,image/resize,m_lfit,w_962#pic_center) [外链图片转存中...(img-OWEBCQmJ-1714150932526)] [外链图片转存中...(img-tvkPbBmj-1714150932526)] [外链图片转存中...(img-IIl45YU6-1714150932527)] **既有适合小白学习的零基础资料,也有适合3年以上经验的小伙伴深入学习提升的进阶课程,涵盖了95%以上大数据知识点,真正体系化!** **由于文件比较多,这里只是将部分目录截图出来,全套包含大厂面经、学习笔记、源码讲义、实战项目、大纲路线、讲解视频,并且后续会持续更新** **[需要这份系统化资料的朋友,可以戳这里获取](https://bbs.csdn.net/topics/618545628)**


【本文地址】

公司简介

联系我们

今日新闻

    推荐新闻

    专题文章
      CopyRight 2018-2019 实验室设备网 版权所有