草庐IT

Flink实战,APP推广情况分析

马鹏飞 2023-03-28 原文
接着 https://blog.51cto.com/mapengfei/2581240 这里用Flink来实现对APP在每个渠道的推广情况包括下载、查看、卸载等等行为的分析

因为之前的文章都是用scala写的,这篇用纯java来实现一波, 分别演示下用aggregate 聚合方式和process 方式的实现和效果

整体思路

1、准备好数据源: 这里用SimulatedSource 来自己随机造一批数据 2、准备数据输入样例 `MarketUserBehavior` 和输出样例`MarketViewCountResult` 3、准备环境并设置watermark时间,和指定事件时间字段为timestamp 4、进行过滤:uninstall 的行为过滤掉(根据实际情况来改) 5、根据行为和渠道进行KeyBy统计 6、设置滑动窗口1小时,每10s输出一次 7、进行聚合输出 /** * @author mafei * @date 2021/1/9 */ package com.mafei.market; import cn.hutool.core.util.RandomUtil; import org.apache.flink.api.common.functions.AggregateFunction; import org.apache.flink.api.common.functions.FilterFunction; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.datastream.DataStreamSink; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.functions.source.RichSourceFunction; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.windowing.WindowFunction; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor; import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.util.Collector; import static java.lang.Thread.sleep; /** * APP市场推广分析 */ /** * 定义一个输入数据的样例类 */ class MarketUserBehavior { String userId; String behavior; String channel; Long timestamp; public MarketUserBehavior(String userId, String behavior, String channel, Long timestamp) { this.userId = userId; this.behavior = behavior; this.channel = channel; this.timestamp = timestamp; } } /** * 定义一个输出数据的类 */ class MarketViewCountResult { Long windowStart; Long windowEnd; String channel; String behavior; Long count; public MarketViewCountResult(Long windowStart, Long windowEnd, String channel, String behavior, Long count) { this.windowStart = windowStart; this.windowEnd = windowEnd; this.channel = channel; this.behavior = behavior; this.count = count; getOutput(); } public void getOutput() { /** * 为了验证效果加的 */ StringBuffer stringBuffer = new StringBuffer(); stringBuffer.append("windowsStart: " + windowStart); stringBuffer.append(" windowEnd: " + windowEnd); stringBuffer.append(" channel: " + channel); stringBuffer.append(" behavior: " + behavior); stringBuffer.append(" count: " + count); //为了验证效果,追加打印的 System.out.println(stringBuffer.toString()); } } /** * 定义一个产生随机数据源的类 */ class SimulatedSource extends RichSourceFunction<MarketUserBehavior> { /** * 是否运行的标志位,主要在cancel 方法中调用 */ Boolean running = true; /** * 定义用户行为和渠道的集合 */ String[] userBeahviors = {"view", "download", "install", "uninstall"}; String[] channels = {"dingding", "wexin", "appstore"}; Long maxRunning = 64 * 10000L; Long currentRunningCount = 0L; @Override public void run(SourceContext<MarketUserBehavior> sourceContext) throws Exception { while (running && currentRunningCount < maxRunning) { String channel = RandomUtil.randomEle(channels); String beahvior = RandomUtil.randomEle(userBeahviors); Long timestamp = System.currentTimeMillis() * 1000; String userId = RandomUtil.randomString(20); sourceContext.collect(new MarketUserBehavior(userId, beahvior, channel, timestamp)); currentRunningCount += 1; sleep(100L); } } @Override public void cancel() { running = false; } } public class MarketChannelAnalysis { public static void main(String[] args) throws Exception { StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment(); environment.setParallelism(1); environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); SingleOutputStreamOperator<MarketUserBehavior> dataStream = environment.addSource(new SimulatedSource()) //设置watermark时间为5秒,并且指定事件时间字段为timestamp .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<MarketUserBehavior>(Time.seconds(5)) { @Override public long extractTimestamp(MarketUserBehavior marketUserBehavior) { return marketUserBehavior.timestamp; } }); DataStreamSink<MarketViewCountResult> result = dataStream .filter(new FilterFunction<MarketUserBehavior>() { @Override public boolean filter(MarketUserBehavior marketUserBehavior) throws Exception { return !marketUserBehavior.behavior.equals("uninstall"); } }) // .keyBy("channel", "behavior") // scala的实现方式 .keyBy(new KeySelector<MarketUserBehavior, Tuple2<String, String>>() { @Override public Tuple2<String, String> getKey(MarketUserBehavior marketUserBehavior) throws Exception { // return new String[]{marketUserBehavior.behavior, marketUserBehavior.channel}; return Tuple2.of(marketUserBehavior.behavior, marketUserBehavior.channel); } }) .timeWindow(Time.hours(1), Time.seconds(10)) //窗口大小是1小时,每10秒输出一次 .aggregate(new MyMarketChannelAnalysis(), new MyMarketChannelResult()) // .process(new MarkCountByChannel()) //用process方法也可以实现 .print(); environment.execute(); } } /** * 2种实现思路,用process的时候可以用这个方法 * process不用每来一条数据都定义怎么做,而是把对应的数据会放到内存里面,当窗口结束后进行统一处理,比较耗内存,看实际使用场景 */ class MarkCountByChannel extends ProcessWindowFunction<MarketUserBehavior, MarketViewCountResult, Tuple2<String, String>, TimeWindow> { @Override public void process(Tuple2<String, String> key, Context context, Iterable<MarketUserBehavior> iterable, Collector<MarketViewCountResult> collector) throws Exception { Long startTime = context.window().getStart(); Long endTime = context.window().getEnd(); String channel = key.f1; String behavior = key.f0; Long count = iterable.spliterator().estimateSize(); collector.collect(new MarketViewCountResult(startTime, endTime, channel, behavior, count)); } } /** * 定义聚合函数的具体操作,AggregateFunction 的3个参数: * IN,输入的数据类型: 输入已经在源头定义为 MarketUserBehavior * ACC,中间状态的数据类型:因为每次要算count数,所以是Long类型 * OUT,输出的数据类型:输出的是统计的次数,所以也是Long类型 */ class MyMarketChannelAnalysis implements AggregateFunction<MarketUserBehavior, Long, Long> { @Override public Long createAccumulator() { /** * 初始化的操作,定义次数为0 */ return 0L; } @Override public Long add(MarketUserBehavior marketUserBehavior, Long aLong) { /** * 每来一条数据做的操作,这里直接加1就行了 */ return aLong + 1; } @Override public Long getResult(Long aLong) { /** * 最终输出时调用的方法 */ return aLong; } @Override public Long merge(Long aLong, Long acc1) { /** * 这里是多个的时候用到,主要是session window时会使用 */ return aLong + acc1; } } /** * 定义输出的WindowFunction,要的参数可以点进去看 * IN:这里输入是上一步的输出窗口内add的数量,所以是Long类型 * OUT:自定义的输出结构,这里定义的是一个类,可以直接改 * KEY:分组的Key,就是keyBy 里头定义的Tuple2.of(marketUserBehavior.behavior, marketUserBehavior.channel); * W extends Window:TimeWindow * */ class MyMarketChannelResult implements WindowFunction<Long, MarketViewCountResult, Tuple2<String, String>, TimeWindow> { @Override public void apply(Tuple2<String, String> stringStringTuple2, TimeWindow window, Iterable<Long> input, Collector<MarketViewCountResult> out) { out.collect(new MarketViewCountResult(window.getStart(), window.getEnd(), stringStringTuple2.f1, stringStringTuple2.f0, input.iterator().next())); } } 代码结构及运行的效果,如果要输出es、mysql、kafka之类的直接把print换成addSink就可以了

有关Flink实战,APP推广情况分析的更多相关文章

  1. ruby - 默认情况下使选项为 false - 2

    这是在Ruby中设置默认值的常用方法:classQuietByDefaultdefinitialize(opts={})@verbose=opts[:verbose]endend这是一个容易落入的陷阱:classVerboseNoMatterWhatdefinitialize(opts={})@verbose=opts[:verbose]||trueendend正确的做法是:classVerboseByDefaultdefinitialize(opts={})@verbose=opts.include?(:verbose)?opts[:verbose]:trueendend编写Verb

  2. ruby - 在没有 sass 引擎的情况下使用 sass 颜色函数 - 2

    我想在一个没有Sass引擎的类中使用Sass颜色函数。我已经在项目中使用了sassgem,所以我认为搭载会像以下一样简单:classRectangleincludeSass::Script::FunctionsdefcolorSass::Script::Color.new([0x82,0x39,0x06])enddefrender#hamlengineexecutedwithcontextofself#sothatwithintemlateicouldcall#%stop{offset:'0%',stop:{color:lighten(color)}}endend更新:参见上面的#re

  3. ruby-on-rails - 每次我尝试部署时,我都会得到 - (gcloud.preview.app.deploy) 错误响应 : [4] DEADLINE_EXCEEDED - 2

    我是Google云的新手,我正在尝试对其进行首次部署。我的第一个部署是RubyonRails项目。我基本上是在关注thisguideinthegoogleclouddocumentation.唯一的区别是我使用的是我自己的项目,而不是他们提供的“helloworld”项目。这是我的app.yaml文件runtime:customvm:trueentrypoint:bundleexecrackup-p8080-Eproductionconfig.ruresources:cpu:0.5memory_gb:1.3disk_size_gb:10当我转到我的项目目录并运行gcloudprevie

  4. ruby-on-rails - 如何重命名或移动 Rails 的 README_FOR_APP - 2

    当我在我的Rails应用程序根目录中运行rakedoc:app时,API文档是使用/doc/README_FOR_APP作为主页生成的。我想向该文件添加.rdoc扩展名,以便它在GitHub上正确呈现。更好的是,我想将它移动到应用程序根目录(/README.rdoc)。有没有办法通过修改包含的rake/rdoctask任务在我的Rakefile中执行此操作?是否有某个地方可以查找可以修改的主页文件的名称?还是我必须编写一个新的Rake任务?额外的问题:Rails应用程序的两个单独文件/README和/doc/README_FOR_APP背后的逻辑是什么?为什么不只有一个?

  5. ruby - 在不使用 RVM 的情况下在 Mac 上卸载和升级 Ruby - 2

    我最近决定从我的系统中卸载RVM。在thispage提出的一些论点说服我:实际上,我的决定是,我根本不想担心Ruby的多个版本。我只想使用1.9.2-p290版本而不用担心其他任何事情。但是,当我在我的Mac上运行ruby--version时,它告诉我我的版本是1.8.7。我四处寻找如何简单地从我的Mac上卸载这个Ruby,但奇怪的是我没有找到任何东西。似乎唯一想卸载Ruby的人运行linux,而使用Mac的每个人都推荐RVM。如何从我的Mac上卸载Ruby1.8.7?我想升级到1.9.2-p290版本,并且我希望我的系统上只有一个版本。 最佳答案

  6. ruby - 使用 postgres.app 在 rvm 下要求 pg 时出错 - 2

    我正在使用Postgres.app在OSX(10.8.3)上。我已经修改了我的PATH,以便应用程序的bin文件夹位于所有其他文件夹之前。Rammy:~phrogz$whichpg_config/Applications/Postgres.app/Contents/MacOS/bin/pg_config我已经安装了rvm并且可以毫无错误地安装pggem,但是当我需要它时我得到一个错误:Rammy:~phrogz$gem-v1.8.25Rammy:~phrogz$geminstallpgFetching:pg-0.15.1.gem(100%)Buildingnativeextension

  7. 微信小程序开发入门与实战(Behaviors使用) - 2

    @作者:SYFStrive @博客首页:HomePage📜:微信小程序📌:个人社区(欢迎大佬们加入)👉:社区链接🔗📌:觉得文章不错可以点点关注👉:专栏连接🔗💃:感谢支持,学累了可以先看小段由小胖给大家带来的街舞👉微信小程序(🔥)目录自定义组件-behaviors    1、什么是behaviors    2、behaviors的工作方式    3、创建behavior    4、导入并使用behavior    5、behavior中所有可用的节点    6、同名字段的覆盖和组合规则总结最后自定义组件-behaviors    1、什么是behaviorsbehaviors是小程序中,用于实现

  8. ruby - 在什么情况下会使用 Sinatra 或 Merb? - 2

    我正在学习Rails,对Sinatra和Merb知之甚少。我想知道您会在哪些情况下使用Merb/Sinatra。感谢您的反馈! 最佳答案 Sinatra是一个比Rails更小、更轻的框架。如果你想让一些东西快速运行,只需发送几个URL并返回一些简单的内容,就可以使用它。看看Sinatrahomepage;这就是启动和运行“Hello,World”所需的全部内容,而在Rails中,您需要生成整个项目结构、设置Controller和View、设置路由等等(我还没有有一段时间写了一个Rails应用程序,所以我不知道“Hello,World

  9. ruby - 是否可以在不实际发送或读取数据的情况下查明 ruby​​ 套接字是否处于 ESTABLISHED 或 CLOSE_WAIT 状态? - 2

    s=Socket.new(Socket::AF_INET,Socket::SOCK_STREAM,0)s.connect(Socket.pack_sockaddr_in('port','hostname'))ssl=OpenSSL::SSL::SSLSocket.new(s,sslcert)ssl.connect从这里开始,如果ssl连接和底层套接字仍然是ESTABLISHED,或者它是否在默认值7200之后进入CLOSE_WAIT,我想检查一个线程几秒钟甚至更糟的是在实际上不需要.write()或.read()的情况下关闭。是用select()、IO.select()还是其他方法完成

  10. ruby-on-rails - 在这种情况下我如何模拟一个对象?没有明显的方法可以用模拟替换对象 - 2

    假设我在Store的模型中有这个非常简单的方法:defgeocode_addressloc=Store.geocode(address)self.lat=loc.latself.lng=loc.lngend如果我想编写一些不受地理编码服务影响的测试脚本,这些脚本可能已关闭、有限制或取决于我的互联网连接,我该如何模拟地理编码服务?如果我可以将地理编码对象传递到该方法中,那将很容易,但我不知道在这种情况下该怎么做。谢谢!特里斯坦 最佳答案 使用内置模拟和stub的rspecs,你可以做这样的事情:setupdo@subject=MyCl

随机推荐