草庐IT

最强Flink算子大全手册,面试拿捏了~

大数据兵工厂 2023-03-28 原文
大家好,我是老兵。

Flink基于流编程模型,内置了很多强大功能的算子,可以帮助我们快速开发应用程序。

作为Flink开发老手,大多算子的写法和场景想来已是了然于胸,但是使用过程常常会有一些小小的问题:

  1. 部分算子长时间未用,忘了用法。。
  2. 某些场景选择什么算子?如何选择?含糊不清。。
工欲善其事,必先利其器!快速高效的使用合适的算子开发程序,往往可以达到事半功倍的效果。

想着好记性不如烂笔头这个道理,特此整理一份常见的Flink算子开发手册!!也作为自己的工作笔记。欢迎大家收藏~

1 DataStream API

Flink DataStream API让用户灵活且高效编写Flink流式程序。主要分为DataSource模块、Transformation模块以及DataSink模块。

  • Source模块定义数据接入功能,包括内置数据源和外部数据源。
  • Transformation模块定义DataStream数据流各种转换操作。
  • Sink模块定义数据输出功能,存储结果到外部存储介质中。
执行环境: StreamExecutionEnvironment
系统模块 :
DataSouce、Transformation和DataSink

2 DataSource 输入

DataSource输入模块定义了DataStream API中的数据输入操作,Flink输入数据源分为内置数据源第三方数据源两种类型。

  • 内置数据源包括文件Socket网络端口以及集合类型数据,不需要引入其他依赖库,在Flink系统内部已经实现。
  • 第三方数据源定义了Flink和外部系统数据交互逻辑,例如Apache Kafka ConnectorElastic Search Connector等。
  • 同时用户可以自定义数据源。

2.1 readTextFile、readFile算子

支持读取文本文件到Flink系统,转换成DataStream数据集。

  • readTextFile算子直接读取系统文本文件(.log|.txt ...)
  • readFile算子可以指定InputFormat读取特定数据类型的文件(包括CSV、JSON或者自定义InputFormat)
// 读取文本文件 val textInputStream = env.readTextFile( "/data/example.log") // 指定InputFormat,读取CSV文件 val csvInputStream = env.readFile( // 可以自定义类型(InputFormat) new CsvInputFormat[String] ( new Path("/data/example.csv") ) { override def fillRecord(out: String, onbjects: Array[AnyRef]: String) = { return null } }, "/data/example.csv" )

2.2 Socket算子

支持从Socket端口读取数据,转换成DataStream算子。

  • 算子参数:Ip地址、端口、delimiter字符串切割符、最大重试次数maxRetry
  • maxRetry主要提供任务失败重连机制。当设定为0时,Flink任务直接停止。
  • Unix环境下,执行nc -lk [:port] 启动网络服务
// Flink程序读取Socket端口(9999)数据 val socketDataStream = env.socketTextStream("localhost", 9999)

2.3 集合算子

支持操作Flink内置集合类(Collection),转换成DataStream。

  • 支持JavaScala算子常见集合类
  • 本质是将本地集合数据分发到远程执行;适用于本地测试,注意数据结构类型的一致性
// fromElements元素集合转换 val elementDataStream = env.fromElements( Tuple2('aa', 1L),Tuple2('bb', 2L) ) // fromCollection数组转换(Java) String[] collections = new String[] { "aa", "bb" }; DataStream<String> collectionDatastream = env.fromCollection( Arrays.asList(collections) ); // List列表转换(Java) List<String> arrays = new ArrayList<>(); arrays.add("aa") arrays.add("bb") DataStream<String> arrayDataStream = env.fromCollection(arrays)

2.4 外部数据源算子

支持从第三方数据源系统读取数据,转换成DataStreams算子。

  • 常见外部数据源算子: Hadoop FileSystem、ElasticSearch、 Apache Kafka、RabbitMQ等
  • 使用时需要在Maven环境中添加jar包依赖(pom)
// Maven配置 <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka-1.2_2.12</artifactId> <version>1.9.1</version> </dependency> // 读取Kafka数据源(Java) Properties prop = new Properties(); prop.setProperty("bootstrap.servers", "localhost:9092"); ... DataStream<String> kafkaStream = env.addSource( new FlinkKafkaStream010<> ( "topic-1", new SimpleStringSchema(), properties ) )

2.5 自定义数据源算子

支持实现内置的Function相关接口,自定义数据源。

具体的内置方法包含但不限于:

  • SourceFunction接口
  • ParallelSourceFunction接口
  • RichParallelSourceFunction类
后续再通过env的addSource()方法添加,具体实现不展开。

3 DataStream转换

Flink对若干个DataStream操作生成新的DataStream,该过程被称为Transformation

Flink程序中大多数逻辑均在Transformation过程中完成,包含转换过滤排序连接关联选择聚合等操作。

注意和Spark中transformation的区别。

Flink中DataStream转换可以分为几种类型:

  • Single DataStream: 单个DataStream数据集元素处理逻辑
  • Multi DataStream: 多个DataStream数据集元素处理逻辑
  • 物理分区:数据集并行度和数据分区处理

3.1 Map算子(#Single)

对数据集中每个元素进行转换操作,生成新DataStream。

  • 底层为MapFunction算子。通过调用map函数,对每个元素执行操作。
  • 常用于数据清洗、计算和转换等。
val inputStream = env.fromElements( ("aa", 1), ("bb", 2), ("cc", 3) ) // 第一种写法: map操作,完成每个元素 + 1 val mapStream1 = inputStream.map( t => (t._1, t.2 + 1) ) // 第二种写法: 指定MapFunction val mapStream2 = inputStream.map( new MapFunction[(String, Int), (String, Int)] { override def map(t: (String, Int)): (String, Int) = { (t._1, t._2 + 1)} } )

3.2 FlatMap算子(#Single)

支持对数据集中所有元素转换成多个元素,生成新DataStream。

val flatDataStream = env.fromCollections() val resultStream = flatDataStream.flatMap{ line => line.split(",") }

3.3 filter算子(#Single)

支持对数据集进行过滤筛选,生成新的DataStream

// 通配符写法 val filterDataStream = dataStream.fliter { _ % 2 == 0 } // 指定运算符表达式 val filterDS = dataStream.filter( x => x % 2 == 0 )

3.4 keyBy算子(#Single)

根据指定Key对DataStream数据集分区,生成新的KeyedStream

  • 相同Key值的数据归并到同一分区
  • 类似于Spark中的groupByKey
val inputStream = env.fromElements( ("aa", 11), ("aa", 22), ("bb", 33) ) // 根据第一个字段作为key分区 // 转换为KeyedStream[(String, String), Tuple] val keyedStream: inputStream.keyBy(0)

3.5 reduce算子(#Single)

支持对输入KeyedSteam根据reduce()聚合,生成新的DataStream

  • 根据key分区聚合形成KeyedStream
  • 支持运算符和自定义reduceFunc函数
val inputStream = env.fromElements( ("aa", 11), ("bb", 33), ("cc", 22), ("aa", 21) ) // 指定第一个字段分区key val keyedStream = inputStream.keyBy(0) // 对第二个字段进行累加求和 val reduceDataStream = keyedStream.reduce { (t1, t2) => (t1._1, t1._2 + t2._2) } 自定义Reduce函数,需要实现匿名类。

val reduceDataStream = keyedStream.reeduce( new ReduceFunction[(String, Int)] { override def reduce(t1: (String,Int), t2: (String, Int)): (String, Int) = { (t1._1, t1._2 + t2._2) } } )

3.6 aggregations算子(#Single)

DataStream基础聚合算子,通过输入KeyedStream进行聚合生成新的DataStream

  • 根据指定字段聚合,可自定义聚合逻辑
  • 底层封装了sum、min、max等函数
val inputStream = env.fromElements( (1, 7), (2, 8), (3, 11), (2, 3) ) // 指定第一个字段分区key val keyedStream: [(Int, Int), Tuple] = inputStream.keyBy(0) // 第二个字段sum统计 val sumStream = keyedStream.sum(1) // 最后输出结果 sumStream.print()

3.7 Connect合并算子(#Multi)

合并多种类型数据集,并保留原数据集的数据类型,生成ConnectedStream

  • 共享状态数据,可互相获取数据集状态
  • 某些场景下可替代join算子,变相实现flink双流join功能
// 创建不同数据类型数据集 val stream1 = env.fromElements( ("aa", 3), ("bb", 4), ("cc", 11), ("dd", 22) ) val stream2 = env.fromElements( (1, 2, 11, 8) ) // 连接数据集 // 返回[(String, Int), Int] // 类似: [("aa", 3),1] val connectedStream = stream1.connect(stream2)

3.8 Connect算子—CoMap(#Multi)

ConnectedStream数据流的Map功能算子,操作合并数据集所有元素

  • 定义CoMapFunction对象,参数为输入数据类型、输出数据类型和mapFunc
  • 子map函数多线程交替执行,生成最终的合并目标数据集
// 上文Connected操作后形成的数据流 // 参数: 第1个为stream1类型;第2个为stream2类型;第3个为stream3类型 val resultStream = connnectedStream.map( new CoMapFunction[(String, Int), Int, (Int, String)] { // 定义第一个数据集处理逻辑,输入值为stream1 override def map1(in1: (String, Int)): (Int, String) = { (in1._2, in1._1) } // 定义第二个数据集处理逻辑,输入值为stream2 override def map2(in2: Int): (Int,String)={ (in2, "default") } )

3.9 Connect算子—CoFlatMap(#Multi)

ConnectedStream数据流的flatmap功能算子

在flatmap()方法中指定CoFlatMapFunction,并分别实现flatmap1()和flatmap2()函数。

val resultStream2 = connectedStream.flatMap( new CoFlatMapFunction[(String, Int), Int, (String, Int, Int)] { // 举例: 函数中共享变量,完成两个数据集合并 var value = 0 // 定义第1个数据集处理函数 override def flatMap1(in1: (String, Int), collect: Collector[(String, Int, Int)]): Unit = { collect.collect((in1._1, in1._2, value)) } } // 定义第2个数据集处理函数 override def flatMap2(in2: Int, collect: Collector[(String, Int, Int)]): Unit = { value = in2 } )

3.10 Union算子(#Multi)

将两个或者多个数据集合并,生成与输入数据集类型一致的DataStream

  • 输入数据集的数据类型要求一致
  • 输出数据集的数据类型和输入数据一致
  • 注意和connect算子的区别
val stream1 = env.fromElements( ("aa", 3), ("bb", 22), ("cc", 45) ) val stream2 = env.fromElements( ("dd", 23), ("ff", 21), ("gg", 89) ) val stream3 = .... // 合并数据集 val unionStream = stream1.union(stream2) val unionStream2 = stream1.union( stream2, stream3 )

3.11 Split算子(#Multi)

将DataStream数据集按照条件拆分,转换成两个数据集的DataStream算子

  • 将接入的数据路由到多个输出数据集,在split函数中定义拆分逻辑
  • 可以被看作是union的逆向实现
val stream1 = env.fromElements( ("aa", 3), ("bb", 33), ("cc", 56),("aa", 23), ("cc", 67) ) // 根据第二个字段的奇偶性标记数据(切分) val splitStream = stream1.split( v => if (v._2 % 2 == 1 Seq("even") else Seq("odd")) )

3.12 Select算子(#Multi)

Select筛选算子,通过条件选择数据集中元素,生成新的DataStream

// 筛选偶数数据 val evenStream = splitedStream.select("even") //筛选所有数据 val allStream = splitedStream.select("even", "odd")

3.13 window窗口算子(时间机制)

Flink的窗口算子是实时计算的核心算子,常用于某固定时间内指标统计

1)窗口API

Flink提供了高级窗口API算子,封装底层窗口操作,包括窗口类型、触发器、侧输出等。同时根据上游输入Stream流分为Non-Keyed和Keyed两种类型。

  • Non-keyed(上游为Non-KeyedStream) 直接调用windowAll(),获取全局统计
val inputStream: DataStream = ... // 当传入为KeyedStream时,调用window()函数 inputStream.keyBy(0).window(new WindowFunc(...)) // 当传入为不做处理的Non-Keyed输入Stream流 // 直接使用windowAll()全局统计 inputStream.windowAll(new WindowFunc(...))
  • keyed(上游为KeyedStream类型)
    调用DataStream的内置window()
stream.keyBy(..//keyed输入流.) .window(..//窗口类型.) .trigger(.//触发器<可选>..) .evictor(.//剔除器<可选>.) .allowdedLateness(.//延迟处理机制.) .sideOutputLateDate(.//侧输出.) .reduce/fold.aggregate/apply(.//计算函数.) 2)窗口类型

根据窗口的分配方式分为: 滚动滑动会话全局等,分别支持不同窗口流动方式和范围。

同时支持事件时间和处理时间数据流。

  • Tumbling Window Join (滚动窗口)

  • Sliding Window Join (滑动窗口)

  • Session Widnow Join(会话窗口)

以十分钟时间滑动窗口统计案例说明:

val tumblingStream = inputStream .keyBy(0) .window( TumblingEventTimeWindows.of( Time.seconds(10)) ).process(...)

4 DataSink输出

Flink读取数据源,经过系列Transform操作后,结果一般转存至外部存储介质或者下游,即Flink的DataSink过程。

Flink将外部存储的连接逻辑封装在Connector连接器中,常见的有:

  • Apache Kafka
  • ElasticSearch
  • Hadoop FileSystem
  • Redis
  • 文件系统、端口

4.1 文件|端口

支持文件、客户端、Socket网络输出,为Flink内置算子,不需要依赖三方库

常见有writeAsCSV(本地文件)、writeToSocket(Socket网络)

// 本地csv inputStream.writeAsCsv( "file://path/xx.csv", WriteMode.OVERWRITE ) // Socket网络 inputStream.writeToSocket( host, post, new SimpleStringSchema() )

4.2 外部第三方

基于SinkFunction定义,需要引入外部三方依赖库,设置三方系统参数

val dataStream = ... // 定义FlinkKafkaProducer val kafkaProducer = new FlinkKafkaProducer011[Sting] ( "localhost:9092", //kafka broker list连接 "xxx-topic", // kafka topic new SimpleStringSchema() //序列化 ) // 添加SinkFunc dataStream.addSink(kafkaProducer())

5 总结

Flink内置的算子库种类全、功能强大,熟练掌握算子的使用方式和场景应用,是实时计算的必备技能。

后面还会继续更新此系列,欢迎添加我的个人微信: youlong525,一起学习交流~

未完待续。。

》》》更多好文,欢迎关注我的公众号: 大数据兵工厂

有关最强Flink算子大全手册,面试拿捏了~的更多相关文章

  1. 【Java 面试合集】HashMap中为什么引入红黑树,而不是AVL树呢 - 2

    HashMap中为什么引入红黑树,而不是AVL树呢1.概述开始学习这个知识点之前我们需要知道,在JDK1.8以及之前,针对HashMap有什么不同。JDK1.7的时候,HashMap的底层实现是数组+链表JDK1.8的时候,HashMap的底层实现是数组+链表+红黑树我们要思考一个问题,为什么要从链表转为红黑树呢。首先先让我们了解下链表有什么不好???2.链表上述的截图其实就是链表的结构,我们来看下链表的增删改查的时间复杂度增:因为链表不是线性结构,所以每次添加的时候,只需要移动一个节点,所以可以理解为复杂度是N(1)删:算法时间复杂度跟增保持一致查:既然是非线性结构,所以查询某一个节点的时候

  2. 7个大一C语言必学的程序 / C语言经典代码大全 - 2

    嗨~大家好,这里是可莉!今天给大家带来的是7个C语言的经典基础代码~那一起往下看下去把【程序一】打印100到200之间的素数#includeintmain(){ inti; for(i=100;i 【程序二】输出乘法口诀表#includeintmain(){inti;for(i=1;i 【程序三】判断1000年---2000年之间的闰年#includeintmain(){intyear;for(year=1000;year 【程序四】给定两个整形变量的值,将两个值的内容进行交换。这里提供两种方法来进行交换,第一种为创建临时变量来进行交换,第二种是不创建临时变量而直接进行交换。1.创建临时变量来

  3. 西安华为OD面试体验 - 2

    西安华为OD面试体验开始投简历技术面试进展工作进展开始投简历去年一整年一直在考研和工作之间纠结,感觉自己的状态好像当时的疫情一样差劲。之前刚毕业的时候投了个大厂的简历,结果一面写算法的时候太拉跨了,虽然知道时dfs但是代码熟练度不够,放在平时给足时间自己可以调试通过,但是熟练度不够那面试当时就写不出来被刷了。说真的算法学到后期我感觉最重要的是熟练度和背板子(对于我这种普通玩家来说),面试题如果一上来短时间内想不出思路就完蛋了。然后由于当时找的工作不是很理想就又想考研了。但是考研是有风险的,我自我感觉自己可能冲不上那个学校,而找工作一个没成可以继续找嘛。本着抱着试试看的态度在boss上投了简历,

  4. [面试直通版]操作系统核心之进程、线程与协程(下) - 2

    点击->操作系统复习的文章集目录操作系统线程线程是什么进程与线程的关系用户态/内核态操作系统资源管理内核态用户态内核态/用户态切换程序运行类型分析计算密集型IO密集型结合进程,线程来理解程序运行类型分析协程基础上下文切换协程协程为什么叫协作式线程?协程的优缺点操作系统线程典型问题:简述进程和线程的区别以下内容带您一步步了解线程是什么比进程更小的独立运行的基本单位-线程(Threads)线程的提出主要是为了提高系统内程序并发执行的程度,从而进一步提升系统的吞吐量,充分发挥多核CPU的优越性而设计的引入进程是为了操作系统更加方便地管理程序,使得多个程序能并发管理和执行而线程则是为了减少程序在并发执

  5. 最强Http缓存策略之强缓存和协商缓存的详解与应用实例 - 2

    HTTP缓存是指浏览器或者代理服务器将已经请求过的资源保存到本地,以便下次请求时能够直接从缓存中获取资源,从而减少网络请求次数,提高网页的加载速度和用户体验。缓存分为强缓存和协商缓存两种模式。一.强缓存强缓存是指浏览器直接从本地缓存中获取资源,而不需要向web服务器发出网络请求。这是因为浏览器在第一次请求资源时,服务器会在响应头中添加相关缓存的响应头,以表明该资源的缓存策略。常见的强缓存响应头如下所述:Cache-ControlCache-Control响应头是用于控制强制缓存和协商缓存的缓存策略。该响应头中的指令如下:max-age:指定该资源在本地缓存的最长有效时间,以秒为单位。例如:Ca

  6. Streampark集成Cloudera Flink、ldap、告警,以及部署常见问题 - 2

    集成背景我们当前集群使用的是ClouderaCDP,Flink版本为ClouderaVersion1.14,整体Flink安装目录以及配置文件结构与社区版本有较大出入。直接根据Streampark官方文档进行部署,将无法配置FlinkHome,以及后续整体Flink任务提交到集群中,因此需要进行针对化适配集成,在满足使用需求上,尽量提供完整的Streampark使用体验。集成步骤版本匹配问题解决首先解决无法识别Cloudera中的FlinkHome问题,根据报错主要明确到的事情是无法读取到Flink版本、lib下面的jar包名称无法匹配。修改对象:修改源码:(解决无法匹配clouderajar

  7. 【华为OD技术面试 | 真八股 】MySQL联合索引,谈springIOC的理解,谈springAOP的理解,Erika和zookeeper等问题 - 2

    文章目录华为OD面试流程1.mysql数据库建了两个字段,且设置了联合索引,如果其中有一个字段为空会出现什么问题?2.谈谈springIOC的理解,有什么好处,解决了什么问题3.谈谈springAOP的理解,切面编程有没有实际应用,有哪些注解,作用是什么,有那些应用场景?4.Erika和zookeeper有了解过吗,作用是什么,主要解决了什么问题5.谈谈JDK、JRE、JVM的理解,区别是什么6.谈谈对泛型的理解7.JVM的组成华为OD面试流程机试:三道算法题,关于机试,橡皮擦已经准备好了各语言专栏,可以直接订阅。性格测试:机试技术一面(本专栏核心)技术二面(本专栏核心)主管面试定级定薪发of

  8. 绝对详细的 RabbitMQ 实践操作手册(一) - 2

    绝对详细的RabbitMQ实践操作手册,看完本系列就够了。一、什么是MQ?1、MQ的概念2、理解消息队列二、MQ的优势和劣势1、优势和作用2、劣势三、MQ的应用场景四、AMQP五、工作原理一、什么是MQ?1、MQ的概念MQ全称MessageQueue(消息队列),是在消息的传输过程中保存消息的容器。多用于系统之间的异步通信。下面用图来理解异步通信,并阐明与同步通信的区别。同步通信:甲乙两人面对面交流,你一句我一句必须同步进行,两人除此之外不做任何事情异步通信:异步通信相当于通过第三方转述对话,可能有消息的延迟,但不需要二人时刻保持联系,消息传给第三方后,两人可以做其他自己想做的事情,当需要获取

  9. flink on yarn - 2

    文章目录使用flinksqlclientonyarnsession模式Per-JobCluster模式flinkrunflinkrunapplication-tyarn-application配置任务退出时保留Checkpoint从外部checkpoint恢复应用资料使用安装完hadoop3.3.4之后,启动hadoop、yarn将flink1.14.6上传到各个服务器节点,解压flinksqlclientonyarnhttps://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/overview/Appli

  10. 相机面试问题总结 - 2

    1,Camera基本工作原理答案:光线通过镜头Lens进入摄像头内部,然后经过IRFilter过滤红外光,最后到达sensor(传感器),senor分为按照材质可以分为CMOS和CCD两种,可以将光学信号转换为电信号,再通过内部的ADC电路转换为数字信号,然后传输给DSP(如果有的话,如果没有则以DVP的方式传送数据到基带芯片baseband,此时的数据格式RawData,后面有讲进行加工)加工处理,转换成RGB、YUV等格式输出。数据流是如何从sensor到APP的?上述描述结束后,在ISP处理后面的阶段,数据会进行分流,分为capture,preview,video等以供后续动作使用。例如

随机推荐