草庐IT

flink - 实时 - UV统计 - 布隆过滤器实现

坨坨的大数据 2023-03-28 原文

1.知识点

  • scala输入输出样例类
  • keyBy并行度为1计算UV的技巧

map(data => ("uv", data.userId))..keyBy(_._1)

  • keyBy并行度>1 计算UV的技巧

自定义MapFunction,随机自定义key+"uv"

Random.nextString(10) + "uv"

  • WindowedStream.trigger的使用
    trigger触发器,每来一条数据直接清空窗口,放到redis进行计算
  • trigger返回WindowedStream,继续调用process(ProcessWindowFunction)
  • WindowedStream.process()的使用
    windowStream调用接口
  • 布隆过滤器的实现

2.业务目标

滚动输出最近1小时内的PV

窗口:1小时

指标:点击量

3.流程心法

总流程:创建输入输出类--->执行环境--->transform转换--->各类窗口函数的调用

主Object:

1.创建执行环境,设置时间语义,并行度等

2.transform api map转换为输入样例类,并设置watermark

3.key 定义成常量"v",那么keyBy就分为同一组,如果并行则可以自定义mapFunction

4.实现trigger

5.实现processWindowFunction

4.模块详解

4.1 创建输入输出样例类

4.2 主object实现

4.2.1 创建执行环境并添加数据源

val env = StreamExecutionEnvironment.getExecutionEnvironment    
  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)    
  env.setParallelism(1)     
// 从文件中读取数据    
val resource = getClass.getResource("/UserBehavior.csv")    
val inputStream: DataStream[String] = env.readTextFile(resource.getPath)

4.2.2 Datastream map转换为输入样例类

 // 转换成样例类类型并提取时间戳和watermark    
val dataStream: DataStream[UserBehavior] = inputStream      
  .map(data => {        
    val arr = data.split(",")        
    UserBehavior(arr(0).toLong, arr(1).toLong, arr(2).toInt, arr(3), arr(4).toLong)      
  })      
  .assignAscendingTimestamps(_.timestamp * 1000L)

4.2.3 处理逻辑(1)----filter类型,timeWindow

val uvStream = dataStream      
    .filter(_.behavior == "pv")      
    .map( data => ("uv", data.userId) ) //如果要并行,并行自定义mymapper      
    .keyBy(_._1)      
    .timeWindow(Time.hours(1))  //滚动窗口      
    .trigger(new MyTrigger())  //trigger触发器,每来一条数据直接清空窗口,放到redis计算。      
    .process( new UvCountWithBloom() )

4.2.4 处理逻辑(2)----Trigger实现

class MyTrigger() extends Trigger[(String,Long),TimeWindow]{  
  override def onElement(t: (String, Long), l: Long, w: TimeWindow, triggerContext: Trigger.TriggerContext): TriggerResult = {     
    TriggerResult.FIRE_AND_PURGE   
  }   
  //系统时间有进展时做什么操作  
  override def onProcessingTime(l: Long, w: TimeWindow, triggerContext: Trigger.TriggerContext): TriggerResult = TriggerResult.CONTINUE   

  //watermark改变做什么操作  
  override def onEventTime(l: Long, w: TimeWindow, triggerContext: Trigger.TriggerContext): TriggerResult = TriggerResult.CONTINUE   

  override def clear(w: TimeWindow, triggerContext: Trigger.TriggerContext): Unit = {   
  }
}

4.2.5 处理逻辑(2)----ProcessWindowFunction实现

1.定义redis中存储位图的key ,本例为窗口结束时间

2.定义一个redis hash表,保存统计之后的每个窗口结束时间的uv count.

表名:uvcount

KEY: 窗口结束时间

VALUE:uv count值

3. 对userid进行hash, 从位图中查看hash后的偏移量是否窜在,若存在则uvcount不操作。若不存在则uvcount+1,位图也相应更新

class UvCountWithBloom() extends ProcessWindowFunction[(String, Long), UvCount, String, TimeWindow]{  
  // 定义redis连接以及布隆过滤器  
  lazy val jedis = new Jedis("localhost", 6379)  
  lazy val bloomFilter = new Bloom(1<<29)    // 2的29次方,1左移29位。 位的个数:2^6(64) * 2^20(1M) * 2^3(8bit) ,64MB   

  // 本来是收集齐所有数据、窗口触发计算的时候才会调用;现在每来一条数据都调用一次  
  override def process(key: String, context: Context, elements: Iterable[(String, Long)], out: Collector[UvCount]): Unit = {    
    // 先定义redis中存储位图的key    
    val storedBitMapKey = context.window.getEnd.toString      
    
    //另外将当前窗口的uv count值,作为状态保存到redis里,用一个叫做uvcount的hash表来保存(windowEnd,count)    
    val uvCountMap = "uvcount"    
    val currentKey = context.window.getEnd.toString    
    var count = 0L     

    // 从redis中取出当前窗口的uv count值    
    if(jedis.hget(uvCountMap, currentKey) != null)      
      count = jedis.hget(uvCountMap, currentKey).toLong     
    
    // 去重:判断当前userId的hash值对应的位图位置,是否为0    
    val userId = elements.last._2.toString    
    // 计算hash值,就对应着位图中的偏移量    
    val offset = bloomFilter.hash(userId, 61)    
    val isExist = jedis.getbit(storedBitMapKey, offset)     

    if(!isExist){      
      // 如果不存在,那么位图对应位置置1,并且将count值加1            
      jedis.setbit(storedBitMapKey, offset, true)      
      jedis.hset(uvCountMap, currentKey, (count + 1).toString)    
    }  
  }
}

4.2.6 处理逻辑(3)----布隆过滤器实现

也可以调用外部google等现成的布隆过滤器.

设计布隆过滤器的要点:
1.选好点的hash函数
2.不同userid经过hash到同一位上。不要那么稠密。
即1亿的user,我们给出2亿的位,出现碰撞的概率就特别小。
10B * 1亿,大概1GB, 用位来存,1bit * 1亿 大概10m,放redis放内存都是个很好的 选择。
即使我们扩大位防止碰撞,放6亿,也是68M,可以放到redis中。有可能出现hash碰撞

class Bloom(size: Long) extends Serializable{  
  private val cap = size    // 默认cap应该是2的整次幂   

  //hash函数 value即userid,seed随机数种子  
  def hash(value: String, seed: Int): Long = {    
    var result = 0    
    //遍历userid,对每一位进行随机数种子的处理    
    for( i <- 0 until value.length ){      
      result = result * seed + value.charAt(i)    
    }     
    // 返回hash值,要映射到cap范围内    
    (cap - 1) & result  
  }
}

4.3 完整代码






有关flink - 实时 - UV统计 - 布隆过滤器实现的更多相关文章

  1. ruby - 如何根据特征实现 FactoryGirl 的条件行为 - 2

    我有一个用户工厂。我希望默认情况下确认用户。但是鉴于unconfirmed特征,我不希望它们被确认。虽然我有一个基于实现细节而不是抽象的工作实现,但我想知道如何正确地做到这一点。factory:userdoafter(:create)do|user,evaluator|#unwantedimplementationdetailshereunlessFactoryGirl.factories[:user].defined_traits.map(&:name).include?(:unconfirmed)user.confirm!endendtrait:unconfirmeddoenden

  2. ruby-on-rails - 事件管理员日期过滤器日期格式自定义 - 2

    是否有简单的方法来更改默认ISO格式(yyyy-mm-dd)的ActiveAdmin日期过滤器显示格式? 最佳答案 您可以像这样为日期选择器提供额外的选项,而不是覆盖js:=f.input:my_date,as::datepicker,datepicker_options:{dateFormat:"mm/dd/yy"} 关于ruby-on-rails-事件管理员日期过滤器日期格式自定义,我们在StackOverflow上找到一个类似的问题: https://s

  3. 华为OD机试用Python实现 -【明明的随机数】 2023Q1A - 2

    华为OD机试题本篇题目:明明的随机数题目输入描述输出描述:示例1输入输出说明代码编写思路最近更新的博客华为od2023|什么是华为od,od薪资待遇,od机试题清单华为OD机试真题大全,用Python解华为机试题|机试宝典【华为OD机试】全流程解析+经验分享,题型分享,防作弊指南华为o

  4. 基于C#实现简易绘图工具【100010177】 - 2

    C#实现简易绘图工具一.引言实验目的:通过制作窗体应用程序(C#画图软件),熟悉基本的窗体设计过程以及控件设计,事件处理等,熟悉使用C#的winform窗体进行绘图的基本步骤,对于面向对象编程有更加深刻的体会.Tutorial任务设计一个具有基本功能的画图软件**·包括简单的新建文件,保存,重新绘图等功能**·实现一些基本图形的绘制,包括铅笔和基本形状等,学习橡皮工具的创建**·设计一个合理舒适的UI界面**注明:你可能需要先了解一些关于winform窗体应用程序绘图的基本知识,以及关于GDI+类和结构的知识二.实验环境Windows系统下的visualstudio2017C#窗体应用程序三.

  5. MIMO-OFDM无线通信技术及MATLAB实现(1)无线信道:传播和衰落 - 2

     MIMO技术的优缺点优点通过下面三个增益来总体概括:阵列增益。阵列增益是指由于接收机通过对接收信号的相干合并而活得的平均SNR的提高。在发射机不知道信道信息的情况下,MIMO系统可以获得的阵列增益与接收天线数成正比复用增益。在采用空间复用方案的MIMO系统中,可以获得复用增益,即信道容量成倍增加。信道容量的增加与min(Nt,Nr)成正比分集增益。在采用空间分集方案的MIMO系统中,可以获得分集增益,即可靠性性能的改善。分集增益用独立衰落支路数来描述,即分集指数。在使用了空时编码的MIMO系统中,由于接收天线或发射天线之间的间距较远,可认为它们各自的大尺度衰落是相互独立的,因此分布式MIMO

  6. 【Java入门】使用Java实现文件夹的遍历 - 2

    遍历文件夹我们通常是使用递归进行操作,这种方式比较简单,也比较容易理解。本文为大家介绍另一种不使用递归的方式,由于没有使用递归,只用到了循环和集合,所以效率更高一些!一、使用递归遍历文件夹整体思路1、使用File封装初始目录,2、打印这个目录3、获取这个目录下所有的子文件和子目录的数组。4、遍历这个数组,取出每个File对象4-1、如果File是否是一个文件,打印4-2、否则就是一个目录,递归调用代码实现publicclassSearchFile{publicstaticvoidmain(String[]args){//初始目录Filedir=newFile("d:/Dev");Datebeg

  7. ruby-on-rails - 在 Controller 中干净地处理多个过滤器(参数) - 2

    我有一个名为Post的类,我需要能够适应以下场景:如果用户选择了一个类别,则只显示该类别的帖子如果用户选择了一种类型,则只显示该类型的帖子如果用户选择了一个类别和类型,则只显示该类别中该类型的帖子如果用户没有选择任何内容,则显示所有帖子我想知道我的Controller是否不可避免地会因大量条件语句而显得粗糙...这是我解决此问题的错误方法-有谁知道我如何才能做到这一点?classPostsController 最佳答案 您最好遵循“胖模型,瘦Controller”的惯例,这意味着您应该将这种逻辑放在模型本身中。Post类应该能够报告

  8. ruby - Arrays Sets 和 SortedSets 在 Ruby 中是如何实现的 - 2

    通常,数组被实现为内存块,集合被实现为HashMap,有序集合被实现为跳跃列表。在Ruby中也是如此吗?我正在尝试从性能和内存占用方面评估Ruby中不同容器的使用情况 最佳答案 数组是Ruby核心库的一部分。每个Ruby实现都有自己的数组实现。Ruby语言规范只规定了Ruby数组的行为,并没有规定任何特定的实现策略。它甚至没有指定任何会强制或至少建议特定实现策略的性能约束。然而,大多数Rubyist对数组的性能特征有一些期望,这会迫使不符合它们的实现变得默默无闻,因为实际上没有人会使用它:插入、前置或追加以及删除元素的最坏情况步骤复

  9. ruby-on-rails - 如何处理 Grape 中特定操作的过滤器之前? - 2

    我正在我的Rails项目中安装Grape以构建RESTfulAPI。现在一些端点的操作需要身份验证,而另一些则不需要身份验证。例如,我有users端点,看起来像这样:moduleBackendmoduleV1classUsers现在如您所见,除了password/forget之外的所有操作都需要用户登录/验证。创建一个新的端点也没有意义,比如passwords并且只是删除password/forget从逻辑上讲,这个端点应该与用户资源。问题是Grapebefore过滤器没有像except,only这样的选项,我可以在其中说对某些操作应用过滤器。您通常如何干净利落地处理这种情况?

  10. ruby - "public/protected/private"方法是如何实现的,我该如何模拟它? - 2

    在ruby中,你可以这样做:classThingpublicdeff1puts"f1"endprivatedeff2puts"f2"endpublicdeff3puts"f3"endprivatedeff4puts"f4"endend现在f1和f3是公共(public)的,f2和f4是私有(private)的。内部发生了什么,允许您调用一个类方法,然后更改方法定义?我怎样才能实现相同的功能(表面上是创建我自己的java之类的注释)例如...classThingfundeff1puts"hey"endnotfundeff2puts"hey"endendfun和notfun将更改以下函数定

随机推荐