草庐IT

第12讲:Flink 常用的 Source 和 Connector

大数据技术派 2023-03-28 原文

Flink系列文章

  1. 第01讲:Flink 的应用场景和架构模型
  2. 第02讲:Flink 入门程序 WordCount 和 SQL 实现
  3. 第03讲:Flink 的编程模型与其他框架比较
  4. 第04讲:Flink 常用的 DataSet 和 DataStream API
  5. 第05讲:Flink SQL & Table 编程和案例
  6. 第06讲:Flink 集群安装部署和 HA 配置
  7. 第07讲:Flink 常见核心概念分析
  8. 第08讲:Flink 窗口、时间和水印
  9. 第09讲:Flink 状态与容错
  10. 第10讲:Flink Side OutPut 分流
本课时我们主要介绍 Flink 中支持的 Source 和常用的 Connector。

Flink 作为实时计算领域强大的计算能力,以及与其他系统进行对接的能力都非常强大。Flink 自身实现了多种 Source 和 Connector 方法,并且还提供了多种与第三方系统进行对接的 Connector。

我们可以把这些 Source、Connector 分成以下几个大类。

预定义和自定义 Source

在前面的第 04 课时“Flink 常用的 DataSet 和 DataStream API”中提到过几种 Flink 已经实现的新建 DataStream 方法。

基于文件

我们在本地环境进行测试时可以方便地从本地文件读取数据:

readTextFile(path) readFile(fileInputFormat, path) 可以直接在 ExecutionEnvironment 和 StreamExecutionEnvironment 类中找到 Flink 支持的读取本地文件的方法,如下图所示:

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); // read text file from local files system DataSet<String> localLines = env.readTextFile("file:///path/to/my/textfile"); // read text file from an HDFS running at nnHost:nnPort DataSet<String> hdfsLines = env.readTextFile("hdfs://nnHost:nnPort/path/to/my/textfile"); // read a CSV file with three fields DataSet<Tuple3<Integer, String, Double>> csvInput = env.readCsvFile("hdfs:///the/CSV/file") .types(Integer.class, String.class, Double.class); // read a CSV file with five fields, taking only two of them DataSet<Tuple2<String, Double>> csvInput = env.readCsvFile("hdfs:///the/CSV/file") .includeFields("10010") // take the first and the fourth field .types(String.class, Double.class); // read a CSV file with three fields into a POJO (Person.class) with corresponding fields DataSet<Person>> csvInput = env.readCsvFile("hdfs:///the/CSV/file") .pojoType(Person.class, "name", "age", "zipcode");

基于 Collections

我们也可以基于内存中的集合、对象等创建自己的 Source。一般用来进行本地调试或者验证。

例如:

fromCollection(Collection) fromElements(T ...) 我们也可以在源码中看到 Flink 支持的方法,如下图所示:

DataSet<String> text = env.fromElements( "Flink Spark Storm", "Flink Flink Flink", "Spark Spark Spark", "Storm Storm Storm" ); List data = new ArrayList<Tuple3<Integer,Integer,Integer>>(); data.add(new Tuple3<>(0,1,0)); data.add(new Tuple3<>(0,1,1)); data.add(new Tuple3<>(0,2,2)); data.add(new Tuple3<>(0,1,3)); data.add(new Tuple3<>(1,2,5)); data.add(new Tuple3<>(1,2,9)); data.add(new Tuple3<>(1,2,11)); data.add(new Tuple3<>(1,2,13)); DataStreamSource<Tuple3<Integer,Integer,Integer>> items = env.fromCollection(data);

基于 Socket

通过监听 Socket 端口,我们可以在本地很方便地模拟一个实时计算环境。

StreamExecutionEnvironment 中提供了 socketTextStream 方法可以通过 host 和 port 从一个 Socket 中以文本的方式读取数据。

DataStream<String> text = env.socketTextStream("127.0.0.1", 9000, "\n");

自定义 Source

我们可以通过实现 Flink 的SourceFunction 或者 ParallelSourceFunction 来实现单个或者多个并行度的 Source。

例如,我们在之前的课程中用到的:

public class MyStreamingSource implements SourceFunction<Item> { private boolean isRunning = true; /** * 重写run方法产生一个源源不断的数据发送源 * @param ctx * @throws Exception */ public void run(SourceContext<Item> ctx) throws Exception { while(isRunning){ Item item = generateItem(); ctx.collect(item); //每秒产生一条数据 Thread.sleep(1000); } } @Override public void cancel() { isRunning = false; } //随机产生一条商品数据 private Item generateItem(){ int i = new Random().nextInt(100); ArrayList<String> list = new ArrayList(); list.add("HAT"); list.add("TIE"); list.add("SHOE"); Item item = new Item(); item.setName(list.get(new Random().nextInt(3))); item.setId(i); return item; } }

自带连接器

Flink 中支持了比较丰富的用来连接第三方的连接器,可以在官网中找到 Flink 支持的各种各样的连接器:

Apache Kafka (source/sink)

Apache Cassandra (sink)

Amazon Kinesis Streams (source/sink)

Elasticsearch (sink)

Hadoop FileSystem (sink)

RabbitMQ (source/sink)

Apache NiFi (source/sink)

Twitter Streaming API (source)

Google PubSub (source/sink)

需注意,我们在使用这些连接器时通常需要引用相对应的 Jar 包依赖。而且一定要注意,对于某些连接器比如 Kafka 是有版本要求的,一定要去官方网站找到对应的依赖版本。

基于 Apache Bahir 发布

Flink 还会基于 Apache Bahir 来发布一些 Connector,比如我们常用的 Redis 等。

Apache Bahir 的代码最初是从 Apache Spark 项目中提取的,后作为一个独立的项目提供。Apache Bahir 通过提供多样化的流连接器(Streaming Connectors)和 SQL 数据源扩展分析平台的覆盖面,最初只是为 Apache Spark 提供拓展。目前也为 Apache Flink 提供,后续还可能为 Apache Beam 和更多平台提供拓展服务。

我们可以在 Bahir 的首页中找到目前支持的 Flink 连接器:

Flink streaming connector for ActiveMQ

Flink streaming connector for Akka

Flink streaming connector for Flume

Flink streaming connector for InfluxDB

Flink streaming connector for Kudu

Flink streaming connector for Redis

Flink streaming connector for Netty

其中就有我们非常熟悉的 Redis,很多同学 Flink 项目中访问 Redis 的方法都是自己进行的实现,推荐使用 Bahir 连接器。

在本地单机情况下:

public static class RedisExampleMapper implements RedisMapper<Tuple2<String, String>>{ @Override public RedisCommandDescription getCommandDescription() { return new RedisCommandDescription(RedisCommand.HSET, "HASH_NAME"); } @Override public String getKeyFromData(Tuple2<String, String> data) { return data.f0; } @Override public String getValueFromData(Tuple2<String, String> data) { return data.f1; } } FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("127.0.0.1").build(); DataStream<String> stream = ...; stream.addSink(new RedisSink<Tuple2<String, String>>(conf, new RedisExampleMapper()); 当然我们也可以使用在集群或者哨兵模式下使用 Redis 连接器。

集群模式:

FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder() .setNodes(new HashSet<InetSocketAddress>(Arrays.asList(new InetSocketAddress(5601)))).build(); DataStream<String> stream = ...; stream.addSink(new RedisSink<Tuple2<String, String>>(conf, new RedisExampleMapper()); 哨兵模式:

FlinkJedisSentinelConfig conf = new FlinkJedisSentinelConfig.Builder() .setMasterName("master").setSentinels(...).build(); DataStream<String> stream = ...; stream.addSink(new RedisSink<Tuple2<String, String>>(conf, new RedisExampleMapper());

基于异步 I/O 和可查询状态

异步 I/O 和可查询状态都是 Flink 提供的非常底层的与外部系统交互的方式。

其中异步 I/O 是为了解决 Flink 在实时计算中访问外部存储产生的延迟问题,如果我们按照传统的方式使用 MapFunction,那么所有对外部系统的访问都是同步进行的。在很多情况下,计算性能受制于外部系统的响应速度,长时间进行等待,会导致整体吞吐低下。

我们可以通过继承 RichAsyncFunction 来使用异步 I/O:

/** * 实现 'AsyncFunction' 用于发送请求和设置回调 */ class AsyncDatabaseRequest extends RichAsyncFunction<String, Tuple2<String, String>> { /** 能够利用回调函数并发发送请求的数据库客户端 */ private transient DatabaseClient client; @Override public void open(Configuration parameters) throws Exception { client = new DatabaseClient(host, post, credentials); } @Override public void close() throws Exception { client.close(); } @Override public void asyncInvoke(String key, final ResultFuture<Tuple2<String, String>> resultFuture) throws Exception { // 发送异步请求,接收 future 结果 final Future<String> result = client.query(key); // 设置客户端完成请求后要执行的回调函数 // 回调函数只是简单地把结果发给 future CompletableFuture.supplyAsync(new Supplier<String>() { @Override public String get() { try { return result.get(); } catch (InterruptedException | ExecutionException e) { // 显示地处理异常 return null; } } }).thenAccept( (String dbResult) -> { resultFuture.complete(Collections.singleton(new Tuple2<>(key, dbResult))); }); } } // 创建初始 DataStream DataStream<String> stream = ...; // 应用异步 I/O 转换操作 DataStream<Tuple2<String, String>> resultStream = AsyncDataStream.unorderedWait(stream, new AsyncDatabaseRequest(), 1000, TimeUnit.MILLISECONDS, 100); 其中,ResultFuture 的 complete 方法是异步的,不需要等待返回。

我们在之前讲解 Flink State 时,提到过 Flink 提供了 StateDesciptor 方法专门用来访问不同的 state,StateDesciptor 同时还可以通过 setQueryable 使状态变成可以查询状态。可查询状态目前是一个 Beta 功能,暂时不推荐使用。

总结

这一课时讲解了 Flink 主要支持的 Source 和 Connector,这些是我们用 Flink 访问其他系统的桥梁。本节课也为我们寻找合适的连接器指明了方向。其中最重要的 Kafka 连接器我们将会在后面的实战课时中单独讲解。

关注公众号:大数据技术派,回复资料,领取1024G资料。

有关第12讲:Flink 常用的 Source 和 Connector的更多相关文章

  1. ruby-on-rails - Rails 常用字符串(用于通知和错误信息等) - 2

    大约一年前,我决定确保每个包含非唯一文本的Flash通知都将从模块中的方法中获取文本。我这样做的最初原因是为了避免一遍又一遍地输入相同的字符串。如果我想更改措辞,我可以在一个地方轻松完成,而且一遍又一遍地重复同一件事而出现拼写错误的可能性也会降低。我最终得到的是这样的:moduleMessagesdefformat_error_messages(errors)errors.map{|attribute,message|"Error:#{attribute.to_s.titleize}#{message}."}enddeferror_message_could_not_find(obje

  2. ruby-on-rails - 在 Rails 3 中进行身份验证最常用的方法是什么? - 2

    我需要在rail3中使用标准注册/登录/忘记密码功能进行身份验证。是否有大多数人为此使用的插件或其他东西? 最佳答案 我不确定最常用的方法是什么-但可以肯定的是,Plataformatec的“Devise”是一个非常流行的方法:http://github.com/plataformatec/devise我已经尝试了一些authgem,对我来说,它是最简单的设置和修改以满足我的需要。它内置了密码恢复、帐户确认(如果需要)和其他一些非常方便的功能。 关于ruby-on-rails-在Rail

  3. ruby-on-rails - 如何在 Rails 中添加禁用的提交按钮 - 2

    我在ruby​​表单中有一个提交按钮f.submitbtn_text,class:"btnbtn-onemgt12mgb12",id:"btn_id"我想在不使用任何javascript的情况下通过ruby​​禁用此按钮 最佳答案 添加disabled:true选项。f.submitbtn_text,class:"btnbtn-onemgt12mgb12",id:"btn_id",disabled:true 关于ruby-on-rails-如何在Rails中添加禁用的提交按钮,我们在St

  4. ruby - 如何保持我不常用的编程语言技能 - 2

    关闭。这个问题是off-topic.它目前不接受答案。想改进这个问题吗?Updatethequestion所以它是on-topic用于堆栈溢出。关闭11年前。Improvethisquestion我不经常使用ruby​​-通常它加起来相当于每两个月或更长时间编写一次脚本。我的大部分编程都是使用C++进行的,这与ruby​​有很大不同。由于我与ruby​​之间的差距如此之大,我总是忘记语言的基本方面(比如解析文本文件和其他简单的东西)。我想每天练习一些基本的东西,我想知道是否有一些我可以订阅的网站,并且会向我发送当天的Ruby问题或类似的东西。有人知道这样的站点/Internet服务吗?

  5. ruby-on-rails - 在任何来源中都找不到 coffee-script-source-1.1.3 - 2

    您好,我正在做Rails应用程序,当我捆绑安装时它返回一个错误:无法在任何来源中找到coffee-script-source-1.1.3我知道coffee-script-source-1.1.3gem已被弃用/取消,但我的其他gem与此有依赖关系。但是这个项目正在与其他机器和heroku产品一起工作。如何在不更改我的gemfile的情况下成功捆绑安装?谢谢 最佳答案 运行这个:bundleupdate--sourcecoffee-script-source如果出现错误,如下所示:Anerroroccurredwhileinstall

  6. 深度学习12. CNN经典网络 VGG16 - 2

    深度学习12.CNN经典网络VGG16一、简介1.VGG来源2.VGG分类3.不同模型的参数数量4.3x3卷积核的好处5.关于学习率调度6.批归一化二、VGG16层分析1.层划分2.参数展开过程图解3.参数传递示例4.VGG16各层参数数量三、代码分析1.VGG16模型定义2.训练3.测试一、简介1.VGG来源VGG(VisualGeometryGroup)是一个视觉几何组在2014年提出的深度卷积神经网络架构。VGG在2014年ImageNet图像分类竞赛亚军,定位竞赛冠军;VGG网络采用连续的小卷积核(3x3)和池化层构建深度神经网络,网络深度可以达到16层或19层,其中VGG16和VGG

  7. 电脑怎么截图?进来看(8种常用截图方法) - 2

    电脑上可以截取图片吗?如果可以,该如何操作呢?相信很多小伙伴都只知道一两种截图的方式,知道的并不全面。其实,电脑上有多种方式截图的,而且非常方便。电脑怎么截图?今天我们就来教大家如何使用电脑截取图片的8种常用方式!操作环境:演示机型:Delloptiplex7050系统版本:Windows10方法一:系统自带截图具体操作:同时按下电脑的自带截图键【Windows+shift+S】,可以选择其中一种方式来截取图片:截屏有矩形截屏、任意形状截屏、窗口截屏和全屏截图。 方法二:QQ截图具体操作:在电脑登录QQ,然后同时按下【Ctrl+Alt+A】,可以任意截图你需要的界面,可以把截图的页面直接下载,

  8. ruby-on-rails - 无法构建 gem native 扩展 (mkmf (LoadError)) - Ubuntu 12.04 - 2

    这个问题在这里已经有了答案:Unabletoinstallgem-Failedtobuildgemnativeextension-cannotloadsuchfile--mkmf(LoadError)(17个答案)关闭9年前。嘿,我正在尝试在一台新的ubuntu机器上安装rails。我安装了ruby​​和rvm,但出现“无法构建gemnative扩展”错误。这是什么意思?$sudogeminstallrails-v3.2.9(没有sudo表示我没有权限)然后它会输出很多“获取”命令,最终会出现这个错误:Buildingnativeextensions.Thiscouldtakeawhi

  9. ruby - 使用 OpenSSL ruby​​ 从一个 .p12 文件中提取多个 key - 2

    我想知道如何从Apple.p12文件中提取key。根据我有限的理解,.p12文件是X504证书和私钥的组合。我看到我遇到的每个.p12文件都有一个X504证书和至少一个key,在某些情况下有两个key。这是因为每个.p12都有一个Apple开发人员key,有些还有一个额外的key(可能是Appleroot授权key)。我只考虑那些具有两个key的.p12文件是有效的。我的目标是区分具有一个key的.p12文件和具有两个key的.p12文件。到目前为止,我已经使用OpenSSL来检查X504文件和任何.p12的key。例如,我有这段代码可以检查目录中的所有.p12文件:Dir.glob(

  10. ruby - 为什么 openssl 在 windows 上产生错误但在 centos 上不产生错误:PKCS12_parse: mac verify failure (OpenSSL::PKCS12::PKCS12Error) - 2

    require'openssl'ifARGV.length==2pkcs12=OpenSSL::PKCS12.new(File.read(ARGV[0]),ARGV[1])ppkcs12.certificateelseputs"Usage:load_cert.rb"end运行它会在Windows上产生错误,但在Linux上不会。错误:OpenSSL::PKCS12::PKCS12Error:PKCS12_parse:macverifyfailurefrom(irb):21:ininitializefrom(irb):21:innewfrom(irb):21fromC:/Ruby192/

随机推荐