1、准备好数据源: 这里用SimulatedSource 来自己随机造一批数据
2、准备数据输入样例 `MarketUserBehavior` 和输出样例`MarketViewCountResult`
3、准备环境并设置watermark时间,和指定事件时间字段为timestamp
4、进行过滤:uninstall 的行为过滤掉(根据实际情况来改)
5、根据行为和渠道进行KeyBy统计
6、设置滑动窗口1小时,每10s输出一次
7、进行聚合输出
/**
* @author mafei
* @date 2021/1/9
*/
package com.mafei.market;
import cn.hutool.core.util.RandomUtil;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import static java.lang.Thread.sleep;
/**
* APP市场推广分析
*/
/**
* 定义一个输入数据的样例类
*/
class MarketUserBehavior {
String userId;
String behavior;
String channel;
Long timestamp;
public MarketUserBehavior(String userId, String behavior, String channel, Long timestamp) {
this.userId = userId;
this.behavior = behavior;
this.channel = channel;
this.timestamp = timestamp;
}
}
/**
* 定义一个输出数据的类
*/
class MarketViewCountResult {
Long windowStart;
Long windowEnd;
String channel;
String behavior;
Long count;
public MarketViewCountResult(Long windowStart, Long windowEnd, String channel, String behavior, Long count) {
this.windowStart = windowStart;
this.windowEnd = windowEnd;
this.channel = channel;
this.behavior = behavior;
this.count = count;
getOutput();
}
public void getOutput() {
/**
* 为了验证效果加的
*/
StringBuffer stringBuffer = new StringBuffer();
stringBuffer.append("windowsStart: " + windowStart);
stringBuffer.append(" windowEnd: " + windowEnd);
stringBuffer.append(" channel: " + channel);
stringBuffer.append(" behavior: " + behavior);
stringBuffer.append(" count: " + count);
//为了验证效果,追加打印的
System.out.println(stringBuffer.toString());
}
}
/**
* 定义一个产生随机数据源的类
*/
class SimulatedSource extends RichSourceFunction<MarketUserBehavior> {
/**
* 是否运行的标志位,主要在cancel 方法中调用
*/
Boolean running = true;
/**
* 定义用户行为和渠道的集合
*/
String[] userBeahviors = {"view", "download", "install", "uninstall"};
String[] channels = {"dingding", "wexin", "appstore"};
Long maxRunning = 64 * 10000L;
Long currentRunningCount = 0L;
@Override
public void run(SourceContext<MarketUserBehavior> sourceContext) throws Exception {
while (running && currentRunningCount < maxRunning) {
String channel = RandomUtil.randomEle(channels);
String beahvior = RandomUtil.randomEle(userBeahviors);
Long timestamp = System.currentTimeMillis() * 1000;
String userId = RandomUtil.randomString(20);
sourceContext.collect(new MarketUserBehavior(userId, beahvior, channel, timestamp));
currentRunningCount += 1;
sleep(100L);
}
}
@Override
public void cancel() {
running = false;
}
}
public class MarketChannelAnalysis {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
environment.setParallelism(1);
environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
SingleOutputStreamOperator<MarketUserBehavior> dataStream = environment.addSource(new SimulatedSource())
//设置watermark时间为5秒,并且指定事件时间字段为timestamp
.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<MarketUserBehavior>(Time.seconds(5)) {
@Override
public long extractTimestamp(MarketUserBehavior marketUserBehavior) {
return marketUserBehavior.timestamp;
}
});
DataStreamSink<MarketViewCountResult> result = dataStream
.filter(new FilterFunction<MarketUserBehavior>() {
@Override
public boolean filter(MarketUserBehavior marketUserBehavior) throws Exception {
return !marketUserBehavior.behavior.equals("uninstall");
}
})
// .keyBy("channel", "behavior") // scala的实现方式
.keyBy(new KeySelector<MarketUserBehavior, Tuple2<String, String>>() {
@Override
public Tuple2<String, String> getKey(MarketUserBehavior marketUserBehavior) throws Exception {
// return new String[]{marketUserBehavior.behavior, marketUserBehavior.channel};
return Tuple2.of(marketUserBehavior.behavior, marketUserBehavior.channel);
}
})
.timeWindow(Time.hours(1), Time.seconds(10)) //窗口大小是1小时,每10秒输出一次
.aggregate(new MyMarketChannelAnalysis(), new MyMarketChannelResult())
// .process(new MarkCountByChannel()) //用process方法也可以实现
.print();
environment.execute();
}
}
/**
* 2种实现思路,用process的时候可以用这个方法
* process不用每来一条数据都定义怎么做,而是把对应的数据会放到内存里面,当窗口结束后进行统一处理,比较耗内存,看实际使用场景
*/
class MarkCountByChannel extends ProcessWindowFunction<MarketUserBehavior, MarketViewCountResult, Tuple2<String, String>, TimeWindow> {
@Override
public void process(Tuple2<String, String> key, Context context, Iterable<MarketUserBehavior> iterable, Collector<MarketViewCountResult> collector) throws Exception {
Long startTime = context.window().getStart();
Long endTime = context.window().getEnd();
String channel = key.f1;
String behavior = key.f0;
Long count = iterable.spliterator().estimateSize();
collector.collect(new MarketViewCountResult(startTime, endTime, channel, behavior, count));
}
}
/**
* 定义聚合函数的具体操作,AggregateFunction 的3个参数:
* IN,输入的数据类型: 输入已经在源头定义为 MarketUserBehavior
* ACC,中间状态的数据类型:因为每次要算count数,所以是Long类型
* OUT,输出的数据类型:输出的是统计的次数,所以也是Long类型
*/
class MyMarketChannelAnalysis implements AggregateFunction<MarketUserBehavior, Long, Long> {
@Override
public Long createAccumulator() {
/**
* 初始化的操作,定义次数为0
*/
return 0L;
}
@Override
public Long add(MarketUserBehavior marketUserBehavior, Long aLong) {
/**
* 每来一条数据做的操作,这里直接加1就行了
*/
return aLong + 1;
}
@Override
public Long getResult(Long aLong) {
/**
* 最终输出时调用的方法
*/
return aLong;
}
@Override
public Long merge(Long aLong, Long acc1) {
/**
* 这里是多个的时候用到,主要是session window时会使用
*/
return aLong + acc1;
}
}
/**
* 定义输出的WindowFunction,要的参数可以点进去看
* IN:这里输入是上一步的输出窗口内add的数量,所以是Long类型
* OUT:自定义的输出结构,这里定义的是一个类,可以直接改
* KEY:分组的Key,就是keyBy 里头定义的Tuple2.of(marketUserBehavior.behavior, marketUserBehavior.channel);
* W extends Window:TimeWindow
*
*/
class MyMarketChannelResult implements WindowFunction<Long, MarketViewCountResult, Tuple2<String, String>, TimeWindow> {
@Override
public void apply(Tuple2<String, String> stringStringTuple2, TimeWindow window, Iterable<Long> input, Collector<MarketViewCountResult> out) {
out.collect(new MarketViewCountResult(window.getStart(), window.getEnd(), stringStringTuple2.f1, stringStringTuple2.f0, input.iterator().next()));
}
}
代码结构及运行的效果,如果要输出es、mysql、kafka之类的直接把print换成addSink就可以了
这是在Ruby中设置默认值的常用方法:classQuietByDefaultdefinitialize(opts={})@verbose=opts[:verbose]endend这是一个容易落入的陷阱:classVerboseNoMatterWhatdefinitialize(opts={})@verbose=opts[:verbose]||trueendend正确的做法是:classVerboseByDefaultdefinitialize(opts={})@verbose=opts.include?(:verbose)?opts[:verbose]:trueendend编写Verb
我想在一个没有Sass引擎的类中使用Sass颜色函数。我已经在项目中使用了sassgem,所以我认为搭载会像以下一样简单:classRectangleincludeSass::Script::FunctionsdefcolorSass::Script::Color.new([0x82,0x39,0x06])enddefrender#hamlengineexecutedwithcontextofself#sothatwithintemlateicouldcall#%stop{offset:'0%',stop:{color:lighten(color)}}endend更新:参见上面的#re
我是Google云的新手,我正在尝试对其进行首次部署。我的第一个部署是RubyonRails项目。我基本上是在关注thisguideinthegoogleclouddocumentation.唯一的区别是我使用的是我自己的项目,而不是他们提供的“helloworld”项目。这是我的app.yaml文件runtime:customvm:trueentrypoint:bundleexecrackup-p8080-Eproductionconfig.ruresources:cpu:0.5memory_gb:1.3disk_size_gb:10当我转到我的项目目录并运行gcloudprevie
当我在我的Rails应用程序根目录中运行rakedoc:app时,API文档是使用/doc/README_FOR_APP作为主页生成的。我想向该文件添加.rdoc扩展名,以便它在GitHub上正确呈现。更好的是,我想将它移动到应用程序根目录(/README.rdoc)。有没有办法通过修改包含的rake/rdoctask任务在我的Rakefile中执行此操作?是否有某个地方可以查找可以修改的主页文件的名称?还是我必须编写一个新的Rake任务?额外的问题:Rails应用程序的两个单独文件/README和/doc/README_FOR_APP背后的逻辑是什么?为什么不只有一个?
我最近决定从我的系统中卸载RVM。在thispage提出的一些论点说服我:实际上,我的决定是,我根本不想担心Ruby的多个版本。我只想使用1.9.2-p290版本而不用担心其他任何事情。但是,当我在我的Mac上运行ruby--version时,它告诉我我的版本是1.8.7。我四处寻找如何简单地从我的Mac上卸载这个Ruby,但奇怪的是我没有找到任何东西。似乎唯一想卸载Ruby的人运行linux,而使用Mac的每个人都推荐RVM。如何从我的Mac上卸载Ruby1.8.7?我想升级到1.9.2-p290版本,并且我希望我的系统上只有一个版本。 最佳答案
我正在使用Postgres.app在OSX(10.8.3)上。我已经修改了我的PATH,以便应用程序的bin文件夹位于所有其他文件夹之前。Rammy:~phrogz$whichpg_config/Applications/Postgres.app/Contents/MacOS/bin/pg_config我已经安装了rvm并且可以毫无错误地安装pggem,但是当我需要它时我得到一个错误:Rammy:~phrogz$gem-v1.8.25Rammy:~phrogz$geminstallpgFetching:pg-0.15.1.gem(100%)Buildingnativeextension
@作者:SYFStrive @博客首页:HomePage📜:微信小程序📌:个人社区(欢迎大佬们加入)👉:社区链接🔗📌:觉得文章不错可以点点关注👉:专栏连接🔗💃:感谢支持,学累了可以先看小段由小胖给大家带来的街舞👉微信小程序(🔥)目录自定义组件-behaviors 1、什么是behaviors 2、behaviors的工作方式 3、创建behavior 4、导入并使用behavior 5、behavior中所有可用的节点 6、同名字段的覆盖和组合规则总结最后自定义组件-behaviors 1、什么是behaviorsbehaviors是小程序中,用于实现
我正在学习Rails,对Sinatra和Merb知之甚少。我想知道您会在哪些情况下使用Merb/Sinatra。感谢您的反馈! 最佳答案 Sinatra是一个比Rails更小、更轻的框架。如果你想让一些东西快速运行,只需发送几个URL并返回一些简单的内容,就可以使用它。看看Sinatrahomepage;这就是启动和运行“Hello,World”所需的全部内容,而在Rails中,您需要生成整个项目结构、设置Controller和View、设置路由等等(我还没有有一段时间写了一个Rails应用程序,所以我不知道“Hello,World
s=Socket.new(Socket::AF_INET,Socket::SOCK_STREAM,0)s.connect(Socket.pack_sockaddr_in('port','hostname'))ssl=OpenSSL::SSL::SSLSocket.new(s,sslcert)ssl.connect从这里开始,如果ssl连接和底层套接字仍然是ESTABLISHED,或者它是否在默认值7200之后进入CLOSE_WAIT,我想检查一个线程几秒钟甚至更糟的是在实际上不需要.write()或.read()的情况下关闭。是用select()、IO.select()还是其他方法完成
假设我在Store的模型中有这个非常简单的方法:defgeocode_addressloc=Store.geocode(address)self.lat=loc.latself.lng=loc.lngend如果我想编写一些不受地理编码服务影响的测试脚本,这些脚本可能已关闭、有限制或取决于我的互联网连接,我该如何模拟地理编码服务?如果我可以将地理编码对象传递到该方法中,那将很容易,但我不知道在这种情况下该怎么做。谢谢!特里斯坦 最佳答案 使用内置模拟和stub的rspecs,你可以做这样的事情:setupdo@subject=MyCl