草庐IT

Flink系列Table API和SQL之:表和流的转换

最笨的羊羊 2025-04-24 原文

Flink系列Table API和SQL之:表和流的转换

一、表和流的转换

  • 从创建表环境开始,历经表的创建、查询转换和输出,已经可以使用Table API和SQL进行完整的流处理了。不过在应用的开发过程中,我们测试业务逻辑一般不会直接将结果直接写入到外部系统,而是在本地控制台打印输出。对于DataStream非常容易,直接调用print()方法就可以看到结果数据流的内容了。但对于Table就比较悲剧,没有提供print()方法。
  • 在Flink中可以将Table再转换成DataStream,然后进行打印输出。这就涉及了表和流的转换

二、将表(Table)转换成流(DataStream)

调用toDataStream()方法

  • 将一个Table对象转换成DataStream非常简单,只要直接调用表环境的方法toDataStream()就可以了。
Table aliceVisitTable = tableEnv.sqlQuery(
	"SELECT user,url " +
	"FROM EventTable " +
	"WHERE user = 'Alice' "
);

将表转换成数据流,这里需要将要转换的Table对象作为参数传入。

tableEnv.toDataStream(aliceVisitTable).print();

调用toChangelogStream()方法

tableEnv.createTemporaryView("clickTable",eventTable);
Table aggResult = tableEnv.sqlQuery("select user,COUNT(url) as cnt from clickTable group by user");

tableEnv.toChangelogStream(aggResult).print("agg");

三、将流转换成表

调用fromDataStream()方法

  • 想要将一个DataStream转换成表也很简单,可以通过调用表环境的fromDataStream()方法来实现,返回的就是一个Table对象。例如,可以直接将事件流eventStream转换成一个表。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

获取表环境

        //创建表执行环境
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

读取数据源

SingleOutputStreamOperator<Event> eventStream = env.addSource(...)

将数据流转换成表

Table eventTable = tableEnv.fromDataStream(eventStream);

由于流中的数据本身就是定义好的POJO类型Event,所以我们将流转换成表之后,每一行数据就对应着一个Event,而表中的列名就对应着Event中的属性。

另外,还可以在fromDataStream()方法中增加参数,用来指定提取哪些属性作为表中的字段名,并可以任意指定位置。

提取Event中的timestamp和url作为表中的列

Table eventTable2 = tableEnv.fromDataStream(eventStream,$("timestamp"),$("url"));

需要注意的是,timestamp本身是SQL中的关键字,所以我们在定义表名、列名时要尽量避免。这时可以通过表达式的as()方法对字段进行重命名。

Table eventTable2 = tableEnv.fromDataStream(eventStream,$("timestamp").as("ts"),$("url"));

调用createTemporaryView()方法

  • 调用fromDataStream()方法简单直观,可以直接实现DataStream到Table的转换。不过如果希望直接在SQL中引用这张表,就还需要调用表环境的createTemporaryView()方法来创建虚拟视图。
  • 对于这种场景,更简洁的调用方式,可以直接调用createTemporaryView()方法创建虚拟表,传入的两个参数,第一个依然是注册的表名,而第二个可以直接就是DataStream。之后可以传入多个参数,用来指定表中的字段:
tableEnv.createTemporaryView("EventTable",eventStream,$("timestamp").as("ts"),$("url"));

这样接下来就可以直接在SQL中引用表EventTable了。

调用fromChangelogStream()方法
表环境还提供了一个方法fromChangelogStream(),可以将一个更新日志流转换成表。这个方法要求流中的数据类型只能是Row,而且每一个数据都需要指定当前航的更新类型(RowKind)。所以一般是由连接器帮我们实现的。

四、支持的数据类型

  • DataStream,流中的数据类型都是定义好的POJO类。如果DataStream中的类型是简单的基本类型,还可以直接转换成表么?这就涉及了Table中支持的数据类型。
  • 整体来看,DataStream中支持的数据类型,Table中也都是支持的,只不过在进行转换时需要注意一些细节。

原子类型:

  • 在Flink中,基础数据类型(Integer、Double、String)和通用数据类型(也就是不可再拆分的数据类型)统一称做原子类型。原子类型的DataStream,转换之后就成了只有一列的Table,列字段(field)的数据类型可以由原子类型推断出。另外,还可以在fromDataStream()方法里增加参数,用来重新命名列字段。
StreamTableEnvironment tableEnv = ...;

DataStream<Long> stream = ...;

将数据流转换成动态表,动态表只有一个字段,重命名为myLong

Table table = tableEnv.fromDataStream(stream,$("myLong"));

Tuple类型

  • 当原子类型不做重命名时,默认的字段名就是"f0",容易想到,其实就是将原子类型看做了一元组Tuple1的处理结果。
  • Table支持Flink中定义的元组类型Tuple,对应在表中字段名默认就是元祖中元素的属性名f0、f1、f2…。所有字段都可以被重新排序,也可以提取其中的一部分字段。字段还可以通过调用表达式的as()方法来进行重命名。
StreamTableEnvironment tableEnv = ...;

DataStream<Tuple2<Long,Integer>> stream = ... ;

将数据流转换成只包含f1字段的表

Table table = tableEnv.fromDataStream(stream,$("f1"));

将数据流转换成包含f0和f1字段的表,在表中f0和f1位置交换

Table table = tableEnv.fromDataStream(stream,$("f1"),$("f0"));

将f1字段命名为myInt,f0命名为myLong

Table table = tableEnv.fromDataStream(stream,$("f1").as("myInt"),$("f0").as("myLong"));

Row类型

  • Flink中还定义了一个在关系型表中更加通用的数据类型——行(Row),它是Table中数据的基本组织形式。Row类型也是一种复合类型,它的长度固定,而且无法直接推断出每个字段的类型,所以在使用时必须指明具体的类型信息。在创建Table时调用的CREATE语句就会将所有的字段名称和类型指定,这在Flink中被称为表的模式结构(Schema)。除此之外,Row类型还附加了一个属性RowKind,用来表示当前行在更新操作中的类型。这样,Row就可以用来表示更新日志流(changelog stream)中的数据,从而架起了Flink中流和表的转换桥梁。
  • 所以在更新日志流中,元素的类型必须是Row,而且需要调用ofKind(0方法来指定更新类型。下面是一个具体的例子:
DataStream<Row> dataStream = env.fromElements(
	Row.ofKind(RowKind.INSERT,"Alice",12),
	Row.ofKind(RowKind.INSERT,"Bob",5),
	Row.ofKind(RowKind.UPDATE_BEFORE,"Alice",12),
	Row.ofKind(RowKind.UPDATE_AFTER,"Alice",100)
);

将更新日志流转换为表

Table table = tableEnv.fromChangelogStream(dataStream);

有关Flink系列Table API和SQL之:表和流的转换的更多相关文章

  1. ruby-on-rails - 在 Rails 中将文件大小字符串转换为等效千字节 - 2

    我的目标是转换表单输入,例如“100兆字节”或“1GB”,并将其转换为我可以存储在数据库中的文件大小(以千字节为单位)。目前,我有这个:defquota_convert@regex=/([0-9]+)(.*)s/@sizes=%w{kilobytemegabytegigabyte}m=self.quota.match(@regex)if@sizes.include?m[2]eval("self.quota=#{m[1]}.#{m[2]}")endend这有效,但前提是输入是倍数(“gigabytes”,而不是“gigabyte”)并且由于使用了eval看起来疯狂不安全。所以,功能正常,

  2. python - 如何使用 Ruby 或 Python 创建一系列高音调和低音调的蜂鸣声? - 2

    关闭。这个问题是opinion-based.它目前不接受答案。想要改进这个问题?更新问题,以便editingthispost可以用事实和引用来回答它.关闭4年前。Improvethisquestion我想在固定时间创建一系列低音和高音调的哔哔声。例如:在150毫秒时发出高音调的蜂鸣声在151毫秒时发出低音调的蜂鸣声200毫秒时发出低音调的蜂鸣声250毫秒的高音调蜂鸣声有没有办法在Ruby或Python中做到这一点?我真的不在乎输出编码是什么(.wav、.mp3、.ogg等等),但我确实想创建一个输出文件。

  3. ruby - 使用 ruby​​ 将 HTML 转换为纯文本并维护结构/格式 - 2

    我想将html转换为纯文本。不过,我不想只删除标签,我想智能地保留尽可能多的格式。为插入换行符标签,检测段落并格式化它们等。输入非常简单,通常是格式良好的html(不是整个文档,只是一堆内容,通常没有anchor或图像)。我可以将几个正则表达式放在一起,让我达到80%,但我认为可能有一些现有的解决方案更智能。 最佳答案 首先,不要尝试为此使用正则表达式。很有可能你会想出一个脆弱/脆弱的解决方案,它会随着HTML的变化而崩溃,或者很难管理和维护。您可以使用Nokogiri快速解析HTML并提取文本:require'nokogiri'h

  4. ruby - 将数组的内容转换为 int - 2

    我需要读入一个包含数字列表的文件。此代码读取文件并将其放入二维数组中。现在我需要获取数组中所有数字的平均值,但我需要将数组的内容更改为int。有什么想法可以将to_i方法放在哪里吗?ClassTerraindefinitializefile_name@input=IO.readlines(file_name)#readinfile@size=@input[0].to_i@land=[@size]x=1whilex 最佳答案 只需将数组映射为整数:@land边注如果你想得到一条线的平均值,你可以这样做:values=@input[x]

  5. ruby - 将散列转换为嵌套散列 - 2

    这道题是thisquestion的逆题.给定一个散列,每个键都有一个数组,例如{[:a,:b,:c]=>1,[:a,:b,:d]=>2,[:a,:e]=>3,[:f]=>4,}将其转换为嵌套哈希的最佳方法是什么{:a=>{:b=>{:c=>1,:d=>2},:e=>3,},:f=>4,} 最佳答案 这是一个迭代的解决方案,递归的解决方案留给读者作为练习:defconvert(h={})ret={}h.eachdo|k,v|node=retk[0..-2].each{|x|node[x]||={};node=node[x]}node[

  6. ruby-on-rails - 使用一系列等级计算字母等级 - 2

    这里是Ruby新手。完成一些练习后碰壁了。练习:计算一系列成绩的字母等级创建一个方法get_grade来接受测试分数数组。数组中的每个分数应介于0和100之间,其中100是最大分数。计算平均分并将字母等级作为字符串返回,即“A”、“B”、“C”、“D”、“E”或“F”。我一直返回错误:avg.rb:1:syntaxerror,unexpectedtLBRACK,expecting')'defget_grade([100,90,80])^avg.rb:1:syntaxerror,unexpected')',expecting$end这是我目前所拥有的。我想坚持使用下面的方法或.join,

  7. ruby-on-rails - Ruby url 到 html 链接转换 - 2

    我正在使用Rails构建一个简单的聊天应用程序。当用户输入url时,我希望将其输出为html链接(即“url”)。我想知道在Ruby中是否有任何库或众所周知的方法可以做到这一点。如果没有,我有一些不错的正则表达式示例代码可以使用... 最佳答案 查看auto_linkRails提供的辅助方法。这会将所有URL和电子邮件地址变成可点击的链接(htmlanchor标记)。这是文档中的代码示例。auto_link("Gotohttp://www.rubyonrails.organdsayhellotodavid@loudthinking.

  8. ruby-on-rails - 使用 ruby​​ 将多个实例变量转换为散列的更好方法? - 2

    我收到格式为的回复#我需要将其转换为哈希值(针对活跃商家)。目前我正在遍历变量并执行此操作:response.instance_variables.eachdo|r|my_hash.merge!(r.to_s.delete("@").intern=>response.instance_eval(r.to_s.delete("@")))end这有效,它将生成{:first="charlie",:last=>"kelly"},但它似乎有点hacky和不稳定。有更好的方法吗?编辑:我刚刚意识到我可以使用instance_variable_get作为该等式的第二部分,但这仍然是主要问题。

  9. 【鸿蒙应用开发系列】- 获取系统设备信息以及版本API兼容调用方式 - 2

    在应用开发中,有时候我们需要获取系统的设备信息,用于数据上报和行为分析。那在鸿蒙系统中,我们应该怎么去获取设备的系统信息呢,比如说获取手机的系统版本号、手机的制造商、手机型号等数据。1、获取方式这里分为两种情况,一种是设备信息的获取,一种是系统信息的获取。1.1、获取设备信息获取设备信息,鸿蒙的SDK包为我们提供了DeviceInfo类,通过该类的一些静态方法,可以获取设备信息,DeviceInfo类的包路径为:ohos.system.DeviceInfo.具体的方法如下:ModifierandTypeMethodDescriptionstatic StringgetAbiList​()Obt

  10. 阿里云RDS——产品系列概述 - 2

    基础版云数据库RDS的产品系列包括基础版、高可用版、集群版、三节点企业版,本文介绍基础版实例的相关信息。RDS基础版实例也称为单机版实例,只有单个数据库节点,计算与存储分离,性价比超高。说明RDS基础版实例只有一个数据库节点,没有备节点作为热备份,因此当该节点意外宕机或者执行重启实例、变更配置、版本升级等任务时,会出现较长时间的不可用。如果业务对数据库的可用性要求较高,不建议使用基础版实例,可选择其他系列(如高可用版),部分基础版实例也支持升级为高可用版。基础版与高可用版的对比拓扑图如下所示。优势 性能由于不提供备节点,主节点不会因为实时的数据库复制而产生额外的性能开销,因此基础版的性能相对于

随机推荐