map(data => ("uv", data.userId))..keyBy(_._1)
自定义MapFunction,随机自定义key+"uv"
Random.nextString(10) + "uv"
滚动输出最近1小时内的PV
窗口:1小时
指标:点击量
总流程:创建输入输出类--->执行环境--->transform转换--->各类窗口函数的调用
主Object:
1.创建执行环境,设置时间语义,并行度等
2.transform api map转换为输入样例类,并设置watermark
3.key 定义成常量"v",那么keyBy就分为同一组,如果并行则可以自定义mapFunction
4.实现trigger
5.实现processWindowFunction
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.setParallelism(1)
// 从文件中读取数据
val resource = getClass.getResource("/UserBehavior.csv")
val inputStream: DataStream[String] = env.readTextFile(resource.getPath)
// 转换成样例类类型并提取时间戳和watermark
val dataStream: DataStream[UserBehavior] = inputStream
.map(data => {
val arr = data.split(",")
UserBehavior(arr(0).toLong, arr(1).toLong, arr(2).toInt, arr(3), arr(4).toLong)
})
.assignAscendingTimestamps(_.timestamp * 1000L)
val uvStream = dataStream
.filter(_.behavior == "pv")
.map( data => ("uv", data.userId) ) //如果要并行,并行自定义mymapper
.keyBy(_._1)
.timeWindow(Time.hours(1)) //滚动窗口
.trigger(new MyTrigger()) //trigger触发器,每来一条数据直接清空窗口,放到redis计算。
.process( new UvCountWithBloom() )
class MyTrigger() extends Trigger[(String,Long),TimeWindow]{
override def onElement(t: (String, Long), l: Long, w: TimeWindow, triggerContext: Trigger.TriggerContext): TriggerResult = {
TriggerResult.FIRE_AND_PURGE
}
//系统时间有进展时做什么操作
override def onProcessingTime(l: Long, w: TimeWindow, triggerContext: Trigger.TriggerContext): TriggerResult = TriggerResult.CONTINUE
//watermark改变做什么操作
override def onEventTime(l: Long, w: TimeWindow, triggerContext: Trigger.TriggerContext): TriggerResult = TriggerResult.CONTINUE
override def clear(w: TimeWindow, triggerContext: Trigger.TriggerContext): Unit = {
}
}
1.定义redis中存储位图的key ,本例为窗口结束时间
2.定义一个redis hash表,保存统计之后的每个窗口结束时间的uv count.
表名:uvcount
KEY: 窗口结束时间
VALUE:uv count值
3. 对userid进行hash, 从位图中查看hash后的偏移量是否窜在,若存在则uvcount不操作。若不存在则uvcount+1,位图也相应更新
class UvCountWithBloom() extends ProcessWindowFunction[(String, Long), UvCount, String, TimeWindow]{
// 定义redis连接以及布隆过滤器
lazy val jedis = new Jedis("localhost", 6379)
lazy val bloomFilter = new Bloom(1<<29) // 2的29次方,1左移29位。 位的个数:2^6(64) * 2^20(1M) * 2^3(8bit) ,64MB
// 本来是收集齐所有数据、窗口触发计算的时候才会调用;现在每来一条数据都调用一次
override def process(key: String, context: Context, elements: Iterable[(String, Long)], out: Collector[UvCount]): Unit = {
// 先定义redis中存储位图的key
val storedBitMapKey = context.window.getEnd.toString
//另外将当前窗口的uv count值,作为状态保存到redis里,用一个叫做uvcount的hash表来保存(windowEnd,count)
val uvCountMap = "uvcount"
val currentKey = context.window.getEnd.toString
var count = 0L
// 从redis中取出当前窗口的uv count值
if(jedis.hget(uvCountMap, currentKey) != null)
count = jedis.hget(uvCountMap, currentKey).toLong
// 去重:判断当前userId的hash值对应的位图位置,是否为0
val userId = elements.last._2.toString
// 计算hash值,就对应着位图中的偏移量
val offset = bloomFilter.hash(userId, 61)
val isExist = jedis.getbit(storedBitMapKey, offset)
if(!isExist){
// 如果不存在,那么位图对应位置置1,并且将count值加1
jedis.setbit(storedBitMapKey, offset, true)
jedis.hset(uvCountMap, currentKey, (count + 1).toString)
}
}
}
也可以调用外部google等现成的布隆过滤器.
设计布隆过滤器的要点:
1.选好点的hash函数
2.不同userid经过hash到同一位上。不要那么稠密。
即1亿的user,我们给出2亿的位,出现碰撞的概率就特别小。
10B * 1亿,大概1GB, 用位来存,1bit * 1亿 大概10m,放redis放内存都是个很好的 选择。
即使我们扩大位防止碰撞,放6亿,也是68M,可以放到redis中。有可能出现hash碰撞
class Bloom(size: Long) extends Serializable{
private val cap = size // 默认cap应该是2的整次幂
//hash函数 value即userid,seed随机数种子
def hash(value: String, seed: Int): Long = {
var result = 0
//遍历userid,对每一位进行随机数种子的处理
for( i <- 0 until value.length ){
result = result * seed + value.charAt(i)
}
// 返回hash值,要映射到cap范围内
(cap - 1) & result
}
}






我有一个用户工厂。我希望默认情况下确认用户。但是鉴于unconfirmed特征,我不希望它们被确认。虽然我有一个基于实现细节而不是抽象的工作实现,但我想知道如何正确地做到这一点。factory:userdoafter(:create)do|user,evaluator|#unwantedimplementationdetailshereunlessFactoryGirl.factories[:user].defined_traits.map(&:name).include?(:unconfirmed)user.confirm!endendtrait:unconfirmeddoenden
是否有简单的方法来更改默认ISO格式(yyyy-mm-dd)的ActiveAdmin日期过滤器显示格式? 最佳答案 您可以像这样为日期选择器提供额外的选项,而不是覆盖js:=f.input:my_date,as::datepicker,datepicker_options:{dateFormat:"mm/dd/yy"} 关于ruby-on-rails-事件管理员日期过滤器日期格式自定义,我们在StackOverflow上找到一个类似的问题: https://s
华为OD机试题本篇题目:明明的随机数题目输入描述输出描述:示例1输入输出说明代码编写思路最近更新的博客华为od2023|什么是华为od,od薪资待遇,od机试题清单华为OD机试真题大全,用Python解华为机试题|机试宝典【华为OD机试】全流程解析+经验分享,题型分享,防作弊指南华为o
C#实现简易绘图工具一.引言实验目的:通过制作窗体应用程序(C#画图软件),熟悉基本的窗体设计过程以及控件设计,事件处理等,熟悉使用C#的winform窗体进行绘图的基本步骤,对于面向对象编程有更加深刻的体会.Tutorial任务设计一个具有基本功能的画图软件**·包括简单的新建文件,保存,重新绘图等功能**·实现一些基本图形的绘制,包括铅笔和基本形状等,学习橡皮工具的创建**·设计一个合理舒适的UI界面**注明:你可能需要先了解一些关于winform窗体应用程序绘图的基本知识,以及关于GDI+类和结构的知识二.实验环境Windows系统下的visualstudio2017C#窗体应用程序三.
MIMO技术的优缺点优点通过下面三个增益来总体概括:阵列增益。阵列增益是指由于接收机通过对接收信号的相干合并而活得的平均SNR的提高。在发射机不知道信道信息的情况下,MIMO系统可以获得的阵列增益与接收天线数成正比复用增益。在采用空间复用方案的MIMO系统中,可以获得复用增益,即信道容量成倍增加。信道容量的增加与min(Nt,Nr)成正比分集增益。在采用空间分集方案的MIMO系统中,可以获得分集增益,即可靠性性能的改善。分集增益用独立衰落支路数来描述,即分集指数。在使用了空时编码的MIMO系统中,由于接收天线或发射天线之间的间距较远,可认为它们各自的大尺度衰落是相互独立的,因此分布式MIMO
遍历文件夹我们通常是使用递归进行操作,这种方式比较简单,也比较容易理解。本文为大家介绍另一种不使用递归的方式,由于没有使用递归,只用到了循环和集合,所以效率更高一些!一、使用递归遍历文件夹整体思路1、使用File封装初始目录,2、打印这个目录3、获取这个目录下所有的子文件和子目录的数组。4、遍历这个数组,取出每个File对象4-1、如果File是否是一个文件,打印4-2、否则就是一个目录,递归调用代码实现publicclassSearchFile{publicstaticvoidmain(String[]args){//初始目录Filedir=newFile("d:/Dev");Datebeg
我有一个名为Post的类,我需要能够适应以下场景:如果用户选择了一个类别,则只显示该类别的帖子如果用户选择了一种类型,则只显示该类型的帖子如果用户选择了一个类别和类型,则只显示该类别中该类型的帖子如果用户没有选择任何内容,则显示所有帖子我想知道我的Controller是否不可避免地会因大量条件语句而显得粗糙...这是我解决此问题的错误方法-有谁知道我如何才能做到这一点?classPostsController 最佳答案 您最好遵循“胖模型,瘦Controller”的惯例,这意味着您应该将这种逻辑放在模型本身中。Post类应该能够报告
通常,数组被实现为内存块,集合被实现为HashMap,有序集合被实现为跳跃列表。在Ruby中也是如此吗?我正在尝试从性能和内存占用方面评估Ruby中不同容器的使用情况 最佳答案 数组是Ruby核心库的一部分。每个Ruby实现都有自己的数组实现。Ruby语言规范只规定了Ruby数组的行为,并没有规定任何特定的实现策略。它甚至没有指定任何会强制或至少建议特定实现策略的性能约束。然而,大多数Rubyist对数组的性能特征有一些期望,这会迫使不符合它们的实现变得默默无闻,因为实际上没有人会使用它:插入、前置或追加以及删除元素的最坏情况步骤复
我正在我的Rails项目中安装Grape以构建RESTfulAPI。现在一些端点的操作需要身份验证,而另一些则不需要身份验证。例如,我有users端点,看起来像这样:moduleBackendmoduleV1classUsers现在如您所见,除了password/forget之外的所有操作都需要用户登录/验证。创建一个新的端点也没有意义,比如passwords并且只是删除password/forget从逻辑上讲,这个端点应该与用户资源。问题是Grapebefore过滤器没有像except,only这样的选项,我可以在其中说对某些操作应用过滤器。您通常如何干净利落地处理这种情况?
在ruby中,你可以这样做:classThingpublicdeff1puts"f1"endprivatedeff2puts"f2"endpublicdeff3puts"f3"endprivatedeff4puts"f4"endend现在f1和f3是公共(public)的,f2和f4是私有(private)的。内部发生了什么,允许您调用一个类方法,然后更改方法定义?我怎样才能实现相同的功能(表面上是创建我自己的java之类的注释)例如...classThingfundeff1puts"hey"endnotfundeff2puts"hey"endendfun和notfun将更改以下函数定