草庐IT

Flink中常用的去重方案

大大大大肉包 2023-07-11 原文

Flink Sql去重方案

1、状态去重

将数据保存到状态中,进行累计

select
    window_start, 
    window_end, 
    count(distinct devId) as cnt 
from table (tumble(table source_table,descriptor(rt),interval '60' minute )) --滚动窗口  
group by window_start,window_end;

2、利用HyperLogLog进行去重

select
    window_start, 
    window_end, 
    hllDistinct(distinct devId) as cnt 
from table (tumble(table source_table,descriptor(rt),interval '60' minute )) --滚动窗口  
group by window_start,window_end;

3、Deduplication方式

当rownum<=1时,flink采用的是Deduplication方式进行去重。该方式有两种去重方案:有保留第一条(Deduplicate Keep FirstRow)和保留最后一条(Deduplicate Keep LastRow)2种。

Deduplicate Keep FirstRow

保留首行的去重策略:保留KEY下第一条出现的数据,之后出现该KEY下的数据会被丢弃掉。因为STATE中只存储了KEY数据,所以性能较优。

SELECT *
FROM (
  SELECT *,
    ROW_NUMBER() OVER (PARTITION BY b ORDER BY proctime) as rowNum
  FROM T
)
WHERE rowNum = 1

Deduplicate Keep LastRow

保留末行的去重策略:保留KEY下最后一条出现的数据。因此过程中会产生变更的记录,会下下游发送变更的消息。因此,sink表需要支持update操作。

SELECT *
FROM (
  SELECT *,
    ROW_NUMBER() OVER (PARTITION BY b, d ORDER BY rowtime DESC) as rowNum
  FROM T
)
WHERE rowNum = 1

Flink 程序去重方案

package com.yyds.flink_distinct;

import org.apache.flink.api.common.state.*;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

/**
 * 方便起见使用输出类型使用Void,这里直接使用打印控制台方式查看结果,在实际中可输出到下游做一个批量的处理然后输出
 */
public class _01_DistinctProcessFunction extends KeyedProcessFunction<_01_AdKey,_01_AdvertiseMentData,Void> {


    // 定义第一个状态MapState
    MapState<String,Integer> deviceIdState ;
    // 定义第二个状态ValueState
    ValueState<Long> countState ;

    @Override
    public void open(Configuration parameters) throws Exception {
        MapStateDescriptor<String, Integer> deviceIdStateDescriptor = new MapStateDescriptor<>("deviceIdState", String.class, Integer.class);
        /*
          MapState,key表示devId, value表示一个随意的值只是为了标识,该状态表示一个广告位在某个小时的设备数据,
            如果我们使用rocksdb作为statebackend, 那么会将mapstate中key作为rocksdb中key的一部分,
            mapstate中value作为rocksdb中的value, rocksdb中value大小是有上限的,这种方式可以减少rocksdb value的大小;
         */
        deviceIdState = getRuntimeContext().getMapState(deviceIdStateDescriptor);

        ValueStateDescriptor<Long> countStateDescriptor = new ValueStateDescriptor<>("countState", Long.class);
        /*
          ValueState,存储当前MapState的数据量,是由于mapstate只能通过迭代方式获得数据量大小,每次获取都需要进行迭代,这种方式可以避免每次迭代。
         */
        countState = getRuntimeContext().getState(countStateDescriptor);

    }

    @Override
    public void processElement(_01_AdvertiseMentData data, Context context, Collector<Void> collector) throws Exception {
        // 主要考虑可能会存在滞后的数据比较严重,会影响之前的计算结果
        long currw = context.timerService().currentWatermark();
        if(context.getCurrentKey().getTime() + 1 <= currw){
            System.out.println("迟到的数据:" + data);
            return;
        }

        String devId = data.getDevId();
        Integer i = deviceIdState.get(devId);
        if(i == null){
            i = 0;
        }

        if(  i == 1  ){
            // 表示已经存在
        }else {
            // 表示不存在,放入到状态中
            deviceIdState.put(devId,1);
            // 将统计的数据 + 1
            Long count = countState.value();

            if(count == null){
                count = 0L;
            }
            count ++;
            countState.update(count);
            // 注册一个定时器,定期清理状态中的数据
            context.timerService().registerEventTimeTimer(context.getCurrentKey().getTime() + 1);
        }

        System.out.println("countState.value() = " + countState.value());
    }


    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Void> out) throws Exception {
        System.out.println(timestamp + " exec clean~~~");
        System.out.println("countState.value() = " + countState.value());
        // 清除状态
        deviceIdState.clear();
        countState.clear();
    }
}

有关Flink中常用的去重方案的更多相关文章

  1. ruby - 在 jRuby 中使用 'fork' 生成进程的替代方案? - 2

    在MRIRuby中我可以这样做:deftransferinternal_server=self.init_serverpid=forkdointernal_server.runend#Maketheserverprocessrunindependently.Process.detach(pid)internal_client=self.init_client#Dootherstuffwithconnectingtointernal_server...internal_client.post('somedata')ensure#KillserverProcess.kill('KILL',

  2. Ruby 守护进程和 JRuby - 备选方案 - 2

    我有一个应用程序正在从Ruby迁移到JRuby(由于需要通过Java提供更好的Web服务安全支持)。我使用的gem之一是daemons创建后台作业。问题在于它使用fork+exec来创建后台进程,但这对JRuby来说是禁忌。那么-是否有用于创建后台作业的替代gem/wrapper?我目前的想法是只从shell脚本调用rake并让rake任务永远运行......提前致谢,克里斯。更新我们目前正在使用几个与Java线程相关的包装器,即https://github.com/jmettraux/rufus-scheduler和https://github.com/philostler/acts

  3. ruby-on-rails - 在 Rails 3 中进行身份验证最常用的方法是什么? - 2

    我需要在rail3中使用标准注册/登录/忘记密码功能进行身份验证。是否有大多数人为此使用的插件或其他东西? 最佳答案 我不确定最常用的方法是什么-但可以肯定的是,Plataformatec的“Devise”是一个非常流行的方法:http://github.com/plataformatec/devise我已经尝试了一些authgem,对我来说,它是最简单的设置和修改以满足我的需要。它内置了密码恢复、帐户确认(如果需要)和其他一些非常方便的功能。 关于ruby-on-rails-在Rail

  4. ruby-on-rails - 如何在 Rails 中添加禁用的提交按钮 - 2

    我在ruby​​表单中有一个提交按钮f.submitbtn_text,class:"btnbtn-onemgt12mgb12",id:"btn_id"我想在不使用任何javascript的情况下通过ruby​​禁用此按钮 最佳答案 添加disabled:true选项。f.submitbtn_text,class:"btnbtn-onemgt12mgb12",id:"btn_id",disabled:true 关于ruby-on-rails-如何在Rails中添加禁用的提交按钮,我们在St

  5. ruby - 如何保持我不常用的编程语言技能 - 2

    关闭。这个问题是off-topic.它目前不接受答案。想改进这个问题吗?Updatethequestion所以它是on-topic用于堆栈溢出。关闭11年前。Improvethisquestion我不经常使用ruby​​-通常它加起来相当于每两个月或更长时间编写一次脚本。我的大部分编程都是使用C++进行的,这与ruby​​有很大不同。由于我与ruby​​之间的差距如此之大,我总是忘记语言的基本方面(比如解析文本文件和其他简单的东西)。我想每天练习一些基本的东西,我想知道是否有一些我可以订阅的网站,并且会向我发送当天的Ruby问题或类似的东西。有人知道这样的站点/Internet服务吗?

  6. ruby-on-rails - 能够处理 rar/tar/zip/7z 的 Ruby/rubyzip 替代方案? - 2

    关闭。这个问题不符合StackOverflowguidelines.它目前不接受答案。要求我们推荐或查找工具、库或最喜欢的场外资源的问题对于StackOverflow来说是偏离主题的,因为它们往往会吸引自以为是的答案和垃圾邮件。相反,describetheproblem以及迄今为止为解决该问题所做的工作。关闭9年前。Improvethisquestion我想知道是否有人知道Ruby的ruby​​zip替代品,它可以处理各种格式,特别是zip/rar/7z?我知道libarchive,但它对我的目的来说并不完整(它是一个很好的gem)。(澄清一下,libarchive-对我不起作用-因为

  7. ruby-on-rails - 对于 Ruby 应用程序,是否有比 Sanitize 更好的替代方案? - 2

    我爱Sanitize.这是一个了不起的实用程序。我遇到的唯一问题是,它需要永远准备一个开发环境,因为它使用Nokogiri,这对编译时间来说是一种痛苦。是否有任何程序可以在不使用Nokogiri的情况下执行Sanitize的操作(如果没有别的,只是温和地执行它的操作)?这将以指数方式提供帮助! 最佳答案 Rails有自己的SanitizeHelper。根据http://api.rubyonrails.org/classes/ActionView/Helpers/SanitizeHelper.html,它将Thissanitizehe

  8. ruby-on-rails - rails3 中 cron 作业的解决方案 - 2

    我尝试每天在我的Rails应用程序中自动记录一些数据。我想知道是否有人知道一个好的解决方案?我找到了https://github.com/javan/whenever,但我想确保在选择之前了解所有选项。谢谢!艾略特 最佳答案 我真的很喜欢whenever-这是一个很棒的Gem,我已经在生产中使用了它。关于它还有一个很好的Railscasts插曲:http://railscasts.com/episodes/164-cron-in-ruby 关于ruby-on-rails-rails3中c

  9. Streampark集成Cloudera Flink、ldap、告警,以及部署常见问题 - 2

    集成背景我们当前集群使用的是ClouderaCDP,Flink版本为ClouderaVersion1.14,整体Flink安装目录以及配置文件结构与社区版本有较大出入。直接根据Streampark官方文档进行部署,将无法配置FlinkHome,以及后续整体Flink任务提交到集群中,因此需要进行针对化适配集成,在满足使用需求上,尽量提供完整的Streampark使用体验。集成步骤版本匹配问题解决首先解决无法识别Cloudera中的FlinkHome问题,根据报错主要明确到的事情是无法读取到Flink版本、lib下面的jar包名称无法匹配。修改对象:修改源码:(解决无法匹配clouderajar

  10. Ruby 获取可用的磁盘驱动器 - 2

    谁能告诉我如何在ruby​​中获取可用磁盘驱动器的列表?我正在创建一个打开的文件对话并且需要知道!提前致谢,嗯。 最佳答案 Brian给出的文章正确地陈述了以下代码:require'win32ole'file_system=WIN32OLE.new("Scripting.FileSystemObject")drives=file_system.Drivesdrives.eachdo|drive|puts"Availablespace:#{drive.AvailableSpace}"puts"Driveletter:#{drive.D

随机推荐