将数据保存到状态中,进行累计
select
window_start,
window_end,
count(distinct devId) as cnt
from table (tumble(table source_table,descriptor(rt),interval '60' minute )) --滚动窗口
group by window_start,window_end;
select
window_start,
window_end,
hllDistinct(distinct devId) as cnt
from table (tumble(table source_table,descriptor(rt),interval '60' minute )) --滚动窗口
group by window_start,window_end;
当rownum<=1时,flink采用的是Deduplication方式进行去重。该方式有两种去重方案:有保留第一条(Deduplicate Keep FirstRow)和保留最后一条(Deduplicate Keep LastRow)2种。
Deduplicate Keep FirstRow
保留首行的去重策略:保留KEY下第一条出现的数据,之后出现该KEY下的数据会被丢弃掉。因为STATE中只存储了KEY数据,所以性能较优。
SELECT *
FROM (
SELECT *,
ROW_NUMBER() OVER (PARTITION BY b ORDER BY proctime) as rowNum
FROM T
)
WHERE rowNum = 1
Deduplicate Keep LastRow
保留末行的去重策略:保留KEY下最后一条出现的数据。因此过程中会产生变更的记录,会下下游发送变更的消息。因此,sink表需要支持update操作。
SELECT *
FROM (
SELECT *,
ROW_NUMBER() OVER (PARTITION BY b, d ORDER BY rowtime DESC) as rowNum
FROM T
)
WHERE rowNum = 1
package com.yyds.flink_distinct;
import org.apache.flink.api.common.state.*;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
/**
* 方便起见使用输出类型使用Void,这里直接使用打印控制台方式查看结果,在实际中可输出到下游做一个批量的处理然后输出
*/
public class _01_DistinctProcessFunction extends KeyedProcessFunction<_01_AdKey,_01_AdvertiseMentData,Void> {
// 定义第一个状态MapState
MapState<String,Integer> deviceIdState ;
// 定义第二个状态ValueState
ValueState<Long> countState ;
@Override
public void open(Configuration parameters) throws Exception {
MapStateDescriptor<String, Integer> deviceIdStateDescriptor = new MapStateDescriptor<>("deviceIdState", String.class, Integer.class);
/*
MapState,key表示devId, value表示一个随意的值只是为了标识,该状态表示一个广告位在某个小时的设备数据,
如果我们使用rocksdb作为statebackend, 那么会将mapstate中key作为rocksdb中key的一部分,
mapstate中value作为rocksdb中的value, rocksdb中value大小是有上限的,这种方式可以减少rocksdb value的大小;
*/
deviceIdState = getRuntimeContext().getMapState(deviceIdStateDescriptor);
ValueStateDescriptor<Long> countStateDescriptor = new ValueStateDescriptor<>("countState", Long.class);
/*
ValueState,存储当前MapState的数据量,是由于mapstate只能通过迭代方式获得数据量大小,每次获取都需要进行迭代,这种方式可以避免每次迭代。
*/
countState = getRuntimeContext().getState(countStateDescriptor);
}
@Override
public void processElement(_01_AdvertiseMentData data, Context context, Collector<Void> collector) throws Exception {
// 主要考虑可能会存在滞后的数据比较严重,会影响之前的计算结果
long currw = context.timerService().currentWatermark();
if(context.getCurrentKey().getTime() + 1 <= currw){
System.out.println("迟到的数据:" + data);
return;
}
String devId = data.getDevId();
Integer i = deviceIdState.get(devId);
if(i == null){
i = 0;
}
if( i == 1 ){
// 表示已经存在
}else {
// 表示不存在,放入到状态中
deviceIdState.put(devId,1);
// 将统计的数据 + 1
Long count = countState.value();
if(count == null){
count = 0L;
}
count ++;
countState.update(count);
// 注册一个定时器,定期清理状态中的数据
context.timerService().registerEventTimeTimer(context.getCurrentKey().getTime() + 1);
}
System.out.println("countState.value() = " + countState.value());
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<Void> out) throws Exception {
System.out.println(timestamp + " exec clean~~~");
System.out.println("countState.value() = " + countState.value());
// 清除状态
deviceIdState.clear();
countState.clear();
}
}
在MRIRuby中我可以这样做:deftransferinternal_server=self.init_serverpid=forkdointernal_server.runend#Maketheserverprocessrunindependently.Process.detach(pid)internal_client=self.init_client#Dootherstuffwithconnectingtointernal_server...internal_client.post('somedata')ensure#KillserverProcess.kill('KILL',
我有一个应用程序正在从Ruby迁移到JRuby(由于需要通过Java提供更好的Web服务安全支持)。我使用的gem之一是daemons创建后台作业。问题在于它使用fork+exec来创建后台进程,但这对JRuby来说是禁忌。那么-是否有用于创建后台作业的替代gem/wrapper?我目前的想法是只从shell脚本调用rake并让rake任务永远运行......提前致谢,克里斯。更新我们目前正在使用几个与Java线程相关的包装器,即https://github.com/jmettraux/rufus-scheduler和https://github.com/philostler/acts
我需要在rail3中使用标准注册/登录/忘记密码功能进行身份验证。是否有大多数人为此使用的插件或其他东西? 最佳答案 我不确定最常用的方法是什么-但可以肯定的是,Plataformatec的“Devise”是一个非常流行的方法:http://github.com/plataformatec/devise我已经尝试了一些authgem,对我来说,它是最简单的设置和修改以满足我的需要。它内置了密码恢复、帐户确认(如果需要)和其他一些非常方便的功能。 关于ruby-on-rails-在Rail
我在ruby表单中有一个提交按钮f.submitbtn_text,class:"btnbtn-onemgt12mgb12",id:"btn_id"我想在不使用任何javascript的情况下通过ruby禁用此按钮 最佳答案 添加disabled:true选项。f.submitbtn_text,class:"btnbtn-onemgt12mgb12",id:"btn_id",disabled:true 关于ruby-on-rails-如何在Rails中添加禁用的提交按钮,我们在St
关闭。这个问题是off-topic.它目前不接受答案。想改进这个问题吗?Updatethequestion所以它是on-topic用于堆栈溢出。关闭11年前。Improvethisquestion我不经常使用ruby-通常它加起来相当于每两个月或更长时间编写一次脚本。我的大部分编程都是使用C++进行的,这与ruby有很大不同。由于我与ruby之间的差距如此之大,我总是忘记语言的基本方面(比如解析文本文件和其他简单的东西)。我想每天练习一些基本的东西,我想知道是否有一些我可以订阅的网站,并且会向我发送当天的Ruby问题或类似的东西。有人知道这样的站点/Internet服务吗?
关闭。这个问题不符合StackOverflowguidelines.它目前不接受答案。要求我们推荐或查找工具、库或最喜欢的场外资源的问题对于StackOverflow来说是偏离主题的,因为它们往往会吸引自以为是的答案和垃圾邮件。相反,describetheproblem以及迄今为止为解决该问题所做的工作。关闭9年前。Improvethisquestion我想知道是否有人知道Ruby的rubyzip替代品,它可以处理各种格式,特别是zip/rar/7z?我知道libarchive,但它对我的目的来说并不完整(它是一个很好的gem)。(澄清一下,libarchive-对我不起作用-因为
我爱Sanitize.这是一个了不起的实用程序。我遇到的唯一问题是,它需要永远准备一个开发环境,因为它使用Nokogiri,这对编译时间来说是一种痛苦。是否有任何程序可以在不使用Nokogiri的情况下执行Sanitize的操作(如果没有别的,只是温和地执行它的操作)?这将以指数方式提供帮助! 最佳答案 Rails有自己的SanitizeHelper。根据http://api.rubyonrails.org/classes/ActionView/Helpers/SanitizeHelper.html,它将Thissanitizehe
我尝试每天在我的Rails应用程序中自动记录一些数据。我想知道是否有人知道一个好的解决方案?我找到了https://github.com/javan/whenever,但我想确保在选择之前了解所有选项。谢谢!艾略特 最佳答案 我真的很喜欢whenever-这是一个很棒的Gem,我已经在生产中使用了它。关于它还有一个很好的Railscasts插曲:http://railscasts.com/episodes/164-cron-in-ruby 关于ruby-on-rails-rails3中c
集成背景我们当前集群使用的是ClouderaCDP,Flink版本为ClouderaVersion1.14,整体Flink安装目录以及配置文件结构与社区版本有较大出入。直接根据Streampark官方文档进行部署,将无法配置FlinkHome,以及后续整体Flink任务提交到集群中,因此需要进行针对化适配集成,在满足使用需求上,尽量提供完整的Streampark使用体验。集成步骤版本匹配问题解决首先解决无法识别Cloudera中的FlinkHome问题,根据报错主要明确到的事情是无法读取到Flink版本、lib下面的jar包名称无法匹配。修改对象:修改源码:(解决无法匹配clouderajar
谁能告诉我如何在ruby中获取可用磁盘驱动器的列表?我正在创建一个打开的文件对话并且需要知道!提前致谢,嗯。 最佳答案 Brian给出的文章正确地陈述了以下代码:require'win32ole'file_system=WIN32OLE.new("Scripting.FileSystemObject")drives=file_system.Drivesdrives.eachdo|drive|puts"Availablespace:#{drive.AvailableSpace}"puts"Driveletter:#{drive.D