Flink+HyperLogLog实现海量实时去重计数 您所在的位置:网站首页 hyperloglog误差 Flink+HyperLogLog实现海量实时去重计数

Flink+HyperLogLog实现海量实时去重计数

2024-06-28 13:55| 来源: 网络整理| 查看: 265

今天忙到飞起(到现在还没完),写一篇超短的小技巧吧。

HyperLogLog是去重计数的利器,能够以很小的精确度误差作为trade-off大幅减少内存空间占用,在不要求100%准确的计数场景极为常用。关于它的数学原理,看官可参见之前写过的《再谈基数估计之HyperLogLog算法》,不再赘述了。

在用Flink做实时计算的过程中,也短不了做去重计数,比如统计UV。我们当然可以直接借助Redis的HyperLogLog实现,但是要在Flink job内直接整合HyperLogLog该怎么做呢?

先引入如下Maven依赖项:

net.agkn hll 1.6.0 compile

下面的聚合函数即可实现从WindowedStream按天、分键统计PV和UV。

WindowedStream windowedStream = watermarkedStream .keyBy("siteId") .window(TumblingEventTimeWindows.of(Time.days(1))) .trigger(ContinuousEventTimeTrigger.of(Time.seconds(10))); windowedStream.aggregate(new AggregateFunction() { private static final long serialVersionUID = 1L; @Override public Tuple2 createAccumulator() { return new Tuple2(0L, new HLL(14, 6)); } @Override public Tuple2 add(AnalyticsAccessLogRecord record, Tuple2 acc) { acc.f0++; acc.f1.addRaw(record.getUserId()); return acc; } @Override public Tuple2 getResult(Tuple2 acc) { return new Tuple2(acc.f0, acc.f1.cardinality()); } @Override public Tuple2 merge(Tuple2 acc1, Tuple2 acc2) { acc1.f0 += acc2.f0; acc1.f1.union(acc2.f1); return acc1; } });

上述开源HyperLogLog组件的主要方法简述如下:

HLL(int log2m, int regwidth) 创建一个HyperLogLog对象。log2m即总分桶数目以2为底的对数,regwidth则是真正用来做基数估计的比特的下标值宽度。根据Redis的思路,log2m=14,regwidth=6,即可以仅用最多12kB内存,以0.81%的误差计算接近264的基数。

void addRaw(long rawValue) 向HyperLogLog中插入元素。如果插入的元素非数值型的,则需要hash过后(推荐用Murmur3等比较快的哈希算法)再插入。

long cardinality() 返回该HyperLogLog中元素的基数。

void union(HLL other) 将两个HyperLogLog结构合并为一个。

该HyperLogLog组件如同Redis一样实现了稀疏存储与密集存储两种方式,以进一步减少内存占用量。其源码不难理解,看官可以自行参看。

最后,如果一定追求100%准确,该怎么办呢?普通的位图法显然不合适,应该采用压缩位图,如笔者之前提到过的RoaringBitmap。

继续忙去了。民那好梦。



【本文地址】

公司简介

联系我们

今日新闻

    推荐新闻

    专题文章
      CopyRight 2018-2019 实验室设备网 版权所有