ProcessFunction
KeyedProcessFunction
CoProcessFunction
ProcessJoinFunction
BroadcastProcessFunction
KeyedBroadcastProcessFunction
ProcessWindowFunction
下面几个栗子来一一说明:
package com.mafei.apitest
import com.mafei.sinktest.SensorReadingTest5
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector
object ProcessFunctionTest {
def main(args: Array[String]): Unit = {
//创建执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.getConfig.setAutoWatermarkInterval(200) //直接全局设置watermark的时间为200毫秒
// val inputStream = env.readTextFile("/opt/java2020_study/maven/flink1/src/main/resources/sensor.txt")
//接收一个socket文本流
val inputStream = env.socketTextStream("127.0.0.1",6666)
env.setParallelism(1)
//先转换成样例类类型
val dataStream = inputStream
.map(data => {
val arr = data.split(",") //按照,分割数据,获取结果
SensorReadingTest5(arr(0), arr(1).toLong, arr(2).toDouble) //生成一个传感器类的数据,参数中传toLong和toDouble是因为默认分割后是字符串类别
})
.keyBy(_.id)
// .process(new TestKeydProcessFunction) //demo
.process(new TempIncreWarning(15000L))
dataStream.print()
env.execute("定时器KeydProcessFunction")
}
}
/**
* 定义3个参数: Key 因为上面是按照id做groupby的,所以是string
* 输入数据: SensorReadingTest5
* 输出数据: 这个直接定,可以根据实际情况来改
*/
class TempIncreWarning(alertInterval: Long) extends KeyedProcessFunction[String, SensorReadingTest5,String]{
//定义状态: 保存上一个温度进行比较,保存注册定时器的时间用于删除
lazy val lastTempValue: ValueState[Double] = getRuntimeContext.getState(new ValueStateDescriptor[Double]("lastTempValue", classOf[Double]))
//定时器时间戳
lazy val timerTimestampState: ValueState[Long] = getRuntimeContext.getState(new ValueStateDescriptor[Long]("timerTimestampState", classOf[Long]))
//每条数据都会经过这个方法
override def processElement(value: SensorReadingTest5, ctx: KeyedProcessFunction[String, SensorReadingTest5, String]#Context, out: Collector[String]): Unit = {
//先把上一次的值,和定时器的时间给拿出来
var lastTemp = lastTempValue.value()
var timerTimestamp = timerTimestampState.value()
//把上一次的值,设置成这一次的,用在下次调用
lastTempValue.update(value.temperature)
//用这次的温度和上一次的温度值做比较,如果比上次大,那说明在升温
if (value.temperature > lastTemp){
//说明是第一次,没有定时器被设定(定义的没有默认值,长整型所以是0
if (timerTimestamp == 0){
val ts = ctx.timerService().currentProcessingTime() + alertInterval
ctx.timerService().registerProcessingTimeTimer(ts)
timerTimestampState.update(ts)
}
}else if( value.temperature <= lastTemp){ //如果温度值没有在上升,那就需要把这个定时器给销毁掉,因为不满足15秒持续上升条件了
ctx.timerService().deleteProcessingTimeTimer(timerTimestamp)
// timerTimestampState.update(0L)// 可以直接设置成0
timerTimestampState.clear() //调用这个清空方法也是一样的效果
}
}
//定义触发的时候实际要做的操作
override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[String, SensorReadingTest5, String]#OnTimerContext, out: Collector[String]): Unit = {
// ctx.getCurrentKey就是当前的sensor ID ,是上面.keyBy(_.id)这一步定义的
out.collect("传感器 "+ctx.getCurrentKey+"温度持续了"+alertInterval +"秒在持续上升!!!!")
timerTimestampState.clear() //已经触发了,那需要把定时器的时间给清空掉
}
}
/**
* 在KeyedProcessFunction中,点进去就可以看到要传的3个参数
* * @param <K> Type of the key.
* * @param <I> Type of the input elements.
* * @param <O> Type of the output elements.
*/
class TestKeydProcessFunction extends KeyedProcessFunction[String, SensorReadingTest5, String]{
// var stateTest1: valueState[Int] = _
override def processElement(value: SensorReadingTest5, ctx: KeyedProcessFunction[String, SensorReadingTest5, String]#Context, out: Collector[String]): Unit = {
// ctx.output() //定义一个侧输出流
ctx.getCurrentKey // 获取当前key, 跟从value中一个效果
ctx.timerService().currentWatermark() //获取当前水印
ctx.timerService().currentProcessingTime() //当前处理时间
ctx.timerService().registerEventTimeTimer(ctx.timestamp()+ 30000L) //注册一个定时器到当前时间30秒之后
ctx.timerService().registerProcessingTimeTimer(ctx.timestamp() * 30000L) //跟上面一样,换成processTime
ctx.timerService().deleteEventTimeTimer(ctx.timestamp()+ 30000L) //删除一个定时器,这里的时间跟定义的时间要对的上,因为可以注册多个
}
override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[String, SensorReadingTest5, String]#OnTimerContext, out: Collector[String]): Unit = super.onTimer(timestamp, ctx, out)
}
代码结构和运行效果:
package com.mafei.apitest
import com.mafei.sinktest.SensorReadingTest5
import jdk.jfr.Threshold
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector
object SideOutputTest {
def main(args: Array[String]): Unit = {
//使用ProcessFunction,利用侧输出流实现一个分流操作
//创建执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.getConfig.setAutoWatermarkInterval(200) //直接全局设置watermark的时间为200毫秒
// val inputStream = env.readTextFile("/opt/java2020_study/maven/flink1/src/main/resources/sensor.txt")
//接收一个socket文本流
val inputStream = env.socketTextStream("127.0.0.1",6666)
env.setParallelism(1)
//先转换成样例类类型
val dataStream = inputStream
.map(data => {
val arr = data.split(",") //按照,分割数据,获取结果
SensorReadingTest5(arr(0), arr(1).toLong, arr(2).toDouble) //生成一个传感器类的数据,参数中传toLong和toDouble是因为默认分割后是字符串类别
})
// .process(new TestKeydProcessFunction) //demo
val highTempStream = dataStream.process(new SplitTempProcess(10.1))
highTempStream.print("this is high stream: ")
/**
* new OutputTag[(String,Long, Double)]("low")
* 这里[]内的是定义输出的格式,根据实际情况来改,()里面的low是标签的名称
*/
val lowTempStream = highTempStream.getSideOutput(new OutputTag[(String,Long, Double)]("low"))
lowTempStream.print("this is low stream: ")
env.execute("side output test")
}
}
//实现自定义的processFunction,利用侧输出流,进行分流操作
/**
*
* @param threshold
* ProcessFunction传2个参数,第一个是输入的数据类型,第二个是输出的数据类型,都可以自定义
*
*
*/
class SplitTempProcess(threshold: Double) extends ProcessFunction[SensorReadingTest5, SensorReadingTest5]{
override def processElement(value: SensorReadingTest5, ctx: ProcessFunction[SensorReadingTest5, SensorReadingTest5]#Context, out: Collector[SensorReadingTest5]): Unit = {
//如果温度值大于设置的阈值,那直接输出
if (value.temperature > threshold){
out.collect(value)
}else{ //如果小于等于就输出到侧输出流
/**
* 这里侧输出流的定义必须数据类型和id都要跟上面对的上,low后边的参数代表具体要输出的数据,
*/
ctx.output(new OutputTag[(String,Long, Double)]("low"),(value.id, value.timestamp, value.temperature))
}
}
}
代码结构及运行效果:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-statebackend-rocksdb_2.12</artifactId>
<version>1.10.1</version>
</dependency>
在代码中配置:
// env.setStateBackend(new MemoryStateBackend())
// env.setStateBackend(new FsStateBackend(""))
// env.setStateBackend(new RocksDBStateBackend(""))
我正在使用RubyonRails3.0.9,我想生成一个传递一些自定义参数的link_toURL。也就是说,有一个articles_path(www.my_web_site_name.com/articles)我想生成如下内容:link_to'Samplelinktitle',...#HereIshouldimplementthecode#=>'http://www.my_web_site_name.com/articles?param1=value1¶m2=value2&...我如何编写link_to语句“alàRubyonRailsWay”以实现该目的?如果我想通过传递一些
目录前言滤波电路科普主要分类实际情况单位的概念常用评价参数函数型滤波器简单分析滤波电路构成低通滤波器RC低通滤波器RL低通滤波器高通滤波器RC高通滤波器RL高通滤波器部分摘自《LC滤波器设计与制作》,侵权删。前言最近需要学习放大电路和滤波电路,但是由于只在之前做音乐频谱分析仪的时候简单了解过一点点运放,所以也是相当从零开始学习了。滤波电路科普主要分类滤波器:主要是从不同频率的成分中提取出特定频率的信号。有源滤波器:由RC元件与运算放大器组成的滤波器。可滤除某一次或多次谐波,最普通易于采用的无源滤波器结构是将电感与电容串联,可对主要次谐波(3、5、7)构成低阻抗旁路。无源滤波器:无源滤波器,又称
在VMware16.2.4安装Ubuntu一、安装VMware1.打开VMwareWorkstationPro官网,点击即可进入。2.进入后向下滑动找到Workstation16ProforWindows,点击立即下载。3.下载完成,文件大小615MB,如下图:4.鼠标右击,以管理员身份运行。5.点击下一步6.勾选条款,点击下一步7.先勾选,再点击下一步8.去掉勾选,点击下一步9.点击下一步10.点击安装11.点击许可证12.在百度上搜索VM16许可证,复制填入,然后点击输入即可,亲测有效。13.点击完成14.重启系统,点击是15.双击VMwareWorkstationPro图标,进入虚拟机主
@作者:SYFStrive @博客首页:HomePage📜:微信小程序📌:个人社区(欢迎大佬们加入)👉:社区链接🔗📌:觉得文章不错可以点点关注👉:专栏连接🔗💃:感谢支持,学累了可以先看小段由小胖给大家带来的街舞👉微信小程序(🔥)目录自定义组件-behaviors 1、什么是behaviors 2、behaviors的工作方式 3、创建behavior 4、导入并使用behavior 5、behavior中所有可用的节点 6、同名字段的覆盖和组合规则总结最后自定义组件-behaviors 1、什么是behaviorsbehaviors是小程序中,用于实现
遍历文件夹我们通常是使用递归进行操作,这种方式比较简单,也比较容易理解。本文为大家介绍另一种不使用递归的方式,由于没有使用递归,只用到了循环和集合,所以效率更高一些!一、使用递归遍历文件夹整体思路1、使用File封装初始目录,2、打印这个目录3、获取这个目录下所有的子文件和子目录的数组。4、遍历这个数组,取出每个File对象4-1、如果File是否是一个文件,打印4-2、否则就是一个目录,递归调用代码实现publicclassSearchFile{publicstaticvoidmain(String[]args){//初始目录Filedir=newFile("d:/Dev");Datebeg
ES一、简介1、ElasticStackES技术栈:ElasticSearch:存数据+搜索;QL;Kibana:Web可视化平台,分析。LogStash:日志收集,Log4j:产生日志;log.info(xxx)。。。。使用场景:metrics:指标监控…2、基本概念Index(索引)动词:保存(插入)名词:类似MySQL数据库,给数据Type(类型)已废弃,以前类似MySQL的表现在用索引对数据分类Document(文档)真正要保存的一个JSON数据{name:"tcx"}二、入门实战{"name":"DESKTOP-1TSVGKG","cluster_name":"elasticsear
我想找到在某些文本中找到一些(让它是两个)句子的好方法。什么会更好-使用正则表达式或拆分方法?你的想法?应JeremyStein的要求-有一些例子示例:输入:ThefirstthingtodoistocreatetheCommentmodel.We’llcreatethisinthenormalway,butwithonesmalldifference.IfwewerejustcreatingcommentsforanArticlewe’dhaveanintegerfieldcalledarticle_idinthemodeltostoretheforeignkey,butinthis
我已经开始了:defsplit_array(array,size)index=0results=[]ifsize>0whileindex如果我在[1,2,3,4,5,6]上运行它,比如split_array([1,2,3,4,5,6],3)它将产生这个数组:[[1,2,3],[4,5,6]]。在Ruby1.8.7中是否已经有可用的东西可以做到这一点? 最佳答案 [1,2,3,4,5,6].each_slice(3).to_a#=>[[1,2,3],[4,5,6]]对于1.8.6:require'enumerator'[1,2,3,4
如果我想要“00001”而不是“1”,除了我自己写填零方法之外,有没有内置的方法可以帮助我为整数填零? 最佳答案 puts"%05d"%1#00001参见:String::%,Kernel::sprintf这是正在发生的事情。%左侧的"%05d"是C风格的格式说明符。%右边的变量就是要格式化的东西。格式说明符可以像这样解码:%-格式说明符的开头0-用前导零填充5-长度为5个字符d-被格式化的是一个整数如果你要格式化多个东西,你会把它们放在一个数组中:"%d-%s"%[1,"One"]#=>1-one
我正在使用ruby1.8.7。p=lambda{return10;}deflab(block)puts'before'putsblock.callputs'after'endlabp以上代码输出为before10after我将相同的代码重构到这里deflab(&block)puts'before'putsblock.callputs'after'endlab{return10;}现在我收到LocalJumpError:意外返回。对我来说,这两个代码都在做同样的事情。是的,在第一种情况下我传递了一个过程,在第二种情况下我传递了一个block。但是&block将该block转换为pro