草庐IT

Flink系列Table API和SQL之:时间属性

最笨的羊羊 2023-04-05 原文

Flink系列Table API和SQL之:时间属性

一、时间属性和窗口

  • 基于时间的操作(比如时间窗口),需要定义相关的时间语义和时间数据来源的信息。在Table API和SQL中,会给表单独提供一个逻辑上的时间字段,专门用来在表处理程序中指示时间。
  • 所谓的时间属性(time attributes),就是每个表模式结构(schema)的一部分。可以在创建表的DDL里直接定义一个字段,也可以在DataStream转换成表时定义。一旦定义了时间属性,它就可以作为一个普通字段引用,并且可以在基于时间的操作中使用。
  • 时间属性的数据类型为TIMESTAMP,它的行为类似于常规时间戳,可以直接访问并且进行计算。
  • 按照时间语义的不同,可以把时间属性的定义分成事件时间(event time)和处理时间(processing time)两种情况。

二、事件时间

  • 在实际应用中,最常用的就是事件时间。在事件时间语义下,允许表处理程序根据每个数据中包含的时间戳(也就是事件发生的时间)来生成结果。
  • 事件时间语义最大的用途就是处理乱序事件或者延迟事件的场景。通过设置水位线(watermark)来表示事件时间的进展,而水位线可以根据数据的最大事件戳设置一个延迟时间。这样即使在出现乱序的情况下,对数据的处理也可以获得正确的结果。
  • 为了处理无序时间,并区分流中的迟到事件。Flink需要从事件数据中提取时间戳,并生成水位线,用来推进事件时间的进展。
  • 事件时间属性可以在创建表DDL中定义,也可以在数据流和表的转换中定义。

1.在创建表的DDL中定义

  • 在创建表的DDL(CREATE TABLE语句)中,可以增加一个字段,通过WATERMARK语句来定义事件时间属性。WATERMARK语句主要用来定义水位线(watermark)的生成表达式,这个表达式会将带有事件时间戳到字段标记为事件时间属性,并在它基础上给出水位线的延迟时间。具体定义方式如下:
CREATE TABLE EventTable(
	user STRING,
	url STRING,
	ts TIMESTAMP(3),
	WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
...
);
  • 把ts字段定义为事件时间属性,而且基于ts设置了5秒的水位线延迟。
  • 这里的5秒是以时间间隔的形式定义的,格式是INTERVAL<数值><时间单位>
  • INTERVAL ‘5’ SECOND
  • 这里的数值必须用单引号引起来,而单位用SECOND和SECONDS是等效的。
  • Flink中支持的事件时间属性数据类型必须为TIMESTAMP或者TIMESTAMP_LTZ
  • 这里的TIMESTAMP_LTZ是指带有本地时区信息的时间戳(TIMESTAMP WITH LOCAL TIMEZONE)
  • 一般情况下如果数据中的时间戳是"年-月-日-时-分-秒"的形式,那就是不带时区信息的,可以将事件时间属性定义为TIMESTAMP类型。
  • 而如果原始的时间戳就是一个长整型的毫秒数,这时就需要另外定义一个字段来表示事件时间属性。类型定义为TIMESTAMP_LTZ会更方便。
CREATE TABLE events(
	user STRING,
	url STRING,
	ts BIGINT,
	ts_ltz AS TO_TIMESTAMP_LTZ(ts,3)
	WATERMARK FOR ts_ltz AS time_ltz - INTERVAL '5' SECOND
) WITH (
...
);
  • 另外定义了一个字段ts_ltz,是把长整型的ts转换为TIMESTAMP_LTZ得到的
  • 进而使用WATERMARK语句将它设为事件时间属性,并设置5秒的水位线延迟
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // 1. 在创建表的DDL中直接定义时间属性
        String createDDL = "CREATE TABLE clickTable (" +
                " user_name STRING, " +
                " url STRING, " +
                " ts BIGINT, " +
                " et AS TO_TIMESTAMP( FROM_UNIXTIME(ts / 1000) ), " +
                " WATERMARK FOR et AS et - INTERVAL '1' SECOND " +
                ") WITH (" +
                " 'connector' = 'filesystem', " +
                " 'path' = '/Users/fei.yang4/project/learn/src/main/java/com/bigdata/plus/flink/input/clicks.csv', " +
                " 'format' =  'csv' " +
                ")";

        tableEnv.executeSql(createDDL);

2.在数据流转换为表时定义

  • 事件时间属性也可以在将DataStream转换为表的时候来定义,调用fromDataStream()方法创建表时,可以追加参数来定义表中的字段结构。这时可以给某个字段加上.rowtime()后缀,就表示将当前字段指定为事件时间属性。这个字段可以是数据中本不存在、额外追加上去的逻辑字段,也可以是本身固有的字段,那么这个字段就会被事件时间属性所覆盖,类型也会被转换为TIMESTAMP。不论哪种方式,时间属性字段中保存的都是事件的时间戳(TIMESTAMP类型)。
  • 需要注意的是,这种方式只负责指定时间属性,而时间戳的提取和水位线的生成应该之前就在DataStream上定义好了,由于DataStream中没有时区概念,因此Flink会将事件时间属性解析成不带时区的TIMESTAMP类型,所有的时间值都被当作UTC标准时间。

在代码中定义方式如下:

方法一:

流中数据类型为二元组Tuple2,包含两个字断,需要自定义提取时间戳并生成水位线

DataStream<Tuple2<String,String>> stream = inputStream.assignTimestampsAndWatermarks(...);

声明一个额外的逻辑字段作为事件时间属性

Table table = tEnv.fromDataStream(stream,$("user"),$("url"),$("ts").rowtime());

方法二:
流中数据类型为三元组Tuple3,最后一个字段就是事件时间戳

DataStream<Tuple3<String,String,Long>> stream = inputStream.assignTimestampsAndWatermarks(...);

不再声明额外字段,直接用最后一个字段作为事件时间属性

Table table = tEnv.fromDataStream(stream,$("user"),$("url"),$("ts").rowtime());
        // 2. 在流转换成Table时定义时间属性
        SingleOutputStreamOperator<Event> clickStream = env.addSource(new ClickSource())
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO)
                        .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                            @Override
                            public long extractTimestamp(Event event, long l) {
                                return event.timestamp;
                            }
                        }));

        Table clickTable = tableEnv.fromDataStream(clickStream, $("user"), $("url"), $("timestamp").as("ts"),
                $("et").rowtime());
        clickTable.printSchema();
(
  `user` STRING,
  `url` STRING,
  `ts` BIGINT,
  `et` TIMESTAMP(3) *ROWTIME*
)

三、处理时间

相比之下,处理时间就比较简单了,就是我们的系统时间,使用时不需要提取时间戳(timestamp)和生成水位线(watermark)。因此在定义处理时间属性时,必须要额外声明一个字段,专门用来保存当前的处理时间。

类似地,处理时间属性的定义也有两种方式:创建表DDL中定义,或者在数据流转换成表时定义。

1.在创建表的DDL中定义

在创建表的DDL(CREATE TABLE语句中),可以增加一个额外的字段,通过调用系统内置的PROCTIME()函数来指定当前的处理时间属性,返回的类型是TIMESTAMP_LTZ。

CREATE TABLE EventTable(
	user STRING,
	url STRING,
	ts AS PROCTIME()
) WITH (
	...
);

这里的时间属性,其实是以计算列(computed column)的形式定义出来的。所谓的计算列是Flink SQL中引入的特殊概念,可以用一个AS语句来在表中产生数据中不存在的列,并且可以利用原有的列、各种运算符及内置函数。
在前面事件时间属性的定义中,将ts字段转换成TIMESTAMP_LTZ类型的ts_ltz,也是计算列的定义方式。

2.在数据流转换为表时定义

处理时间属性同样可以在将DataStream转换为表的时候来定义。调用fromDataStream()方法创建表时,可以用.proctime()后缀来指定处理时间属性字段。由于处理时间是系统时间,原始数据中并没有这个字段,所以处理时间属性一定不能定义在一个已有字段上,只能定义在表结构所有字段的最后,作为额外的逻辑字段出现。

代码中定义处理时间属性的方法如下:

DataStream<Tuple2<String,String>> stream = ...;

声明一个额外的字段作为处理时间属性字段

Table table = tEnv.fromDataStream(stream,$("user"),$("url"),$("ts").proctime());

有关Flink系列Table API和SQL之:时间属性的更多相关文章

  1. python - 如何使用 Ruby 或 Python 创建一系列高音调和低音调的蜂鸣声? - 2

    关闭。这个问题是opinion-based.它目前不接受答案。想要改进这个问题?更新问题,以便editingthispost可以用事实和引用来回答它.关闭4年前。Improvethisquestion我想在固定时间创建一系列低音和高音调的哔哔声。例如:在150毫秒时发出高音调的蜂鸣声在151毫秒时发出低音调的蜂鸣声200毫秒时发出低音调的蜂鸣声250毫秒的高音调蜂鸣声有没有办法在Ruby或Python中做到这一点?我真的不在乎输出编码是什么(.wav、.mp3、.ogg等等),但我确实想创建一个输出文件。

  2. ruby-on-rails - 如果为空或不验证数值,则使属性默认为 0 - 2

    我希望我的UserPrice模型的属性在它们为空或不验证数值时默认为0。这些属性是tax_rate、shipping_cost和price。classCreateUserPrices8,:scale=>2t.decimal:tax_rate,:precision=>8,:scale=>2t.decimal:shipping_cost,:precision=>8,:scale=>2endendend起初,我将所有3列的:default=>0放在表格中,但我不想要这样,因为它已经填充了字段,我想使用占位符。这是我的UserPrice模型:classUserPrice回答before_val

  3. ruby-on-rails - 在混合/模块中覆盖模型的属性访问器 - 2

    我有一个包含模块的模型。我想在模块中覆盖模型的访问器方法。例如:classBlah这显然行不通。有什么想法可以实现吗? 最佳答案 您的代码看起来是正确的。我们正在毫无困难地使用这个确切的模式。如果我没记错的话,Rails使用#method_missing作为属性setter,因此您的模块将优先,阻止ActiveRecord的setter。如果您正在使用ActiveSupport::Concern(参见thisblogpost),那么您的实例方法需要进入一个特殊的模块:classBlah

  4. ruby-on-rails - 使用一系列等级计算字母等级 - 2

    这里是Ruby新手。完成一些练习后碰壁了。练习:计算一系列成绩的字母等级创建一个方法get_grade来接受测试分数数组。数组中的每个分数应介于0和100之间,其中100是最大分数。计算平均分并将字母等级作为字符串返回,即“A”、“B”、“C”、“D”、“E”或“F”。我一直返回错误:avg.rb:1:syntaxerror,unexpectedtLBRACK,expecting')'defget_grade([100,90,80])^avg.rb:1:syntaxerror,unexpected')',expecting$end这是我目前所拥有的。我想坚持使用下面的方法或.join,

  5. ruby - 多个属性的 update_column 方法 - 2

    我有一个具有一些属性的模型:attr1、attr2和attr3。我需要在不执行回调和验证的情况下更新此属性。我找到了update_column方法,但我想同时更新三个属性。我需要这样的东西:update_columns({attr1:val1,attr2:val2,attr3:val3})代替update_column(attr1,val1)update_column(attr2,val2)update_column(attr3,val3) 最佳答案 您可以使用update_columns(attr1:val1,attr2:val2

  6. ruby - Nokogiri 剥离所有属性 - 2

    我有这个html标记:我想得到这个:我如何使用Nokogiri做到这一点? 最佳答案 require'nokogiri'doc=Nokogiri::HTML('')您可以通过xpath删除所有属性:doc.xpath('//@*').remove或者,如果您需要做一些更复杂的事情,有时使用以下方法遍历所有元素会更容易:doc.traversedo|node|node.keys.eachdo|attribute|node.deleteattributeendend 关于ruby-Nokog

  7. ruby-on-rails - Rails 模型——非持久类成员或属性? - 2

    对于Rails模型,是否可以/建议让一个类的成员不持久保存到数据库中?我想将用户最后选择的类型存储在session变量中。由于我无法从我的模型中设置session变量,我想将值存储在一个“虚拟”类成员中,该成员只是将值传递回Controller。你能有这样的类(class)成员吗? 最佳答案 将非持久属性添加到Rails模型就像任何其他Ruby类一样:classUser扩展解释:在Ruby中,所有实例变量都是私有(private)的,不需要在赋值前定义。attr_accessor创建一个setter和getter方法:classUs

  8. ruby-on-rails - Ruby 检查日期时间是否为 iso8601 并保存 - 2

    我需要检查DateTime是否采用有效的ISO8601格式。喜欢:#iso8601?我检查了ruby​​是否有特定方法,但没有找到。目前我正在使用date.iso8601==date来检查这个。有什么好的方法吗?编辑解释我的环境,并改变问题的范围。因此,我的项目将使用jsapiFullCalendar,这就是我需要iso8601字符串格式的原因。我想知道更好或正确的方法是什么,以正确的格式将日期保存在数据库中,或者让ActiveRecord完成它们的工作并在我需要时间信息时对其进行操作。 最佳答案 我不太明白你的问题。我假设您想检查

  9. ruby-on-rails - 将 Ruby 中的日期/时间格式化为 YYYY-MM-DD HH :MM:SS - 2

    这个问题在这里已经有了答案:Railsformattingdate(4个答案)关闭4年前。我想格式化Time.Now函数以显示YYYY-MM-DDHH:MM:SS而不是:“2018-03-0909:47:19+0000”该函数需要放在时间中.现在功能。require‘roo’require‘roo-xls’require‘byebug’file_name=ARGV.first||“Template.xlsx”excel_file=Roo::Spreadsheet.open(“./#{file_name}“,extension::xlsx)xml=Nokogiri::XML::Build

  10. ruby - 查找字符串中的内容类型(数字、日期、时间、字符串等) - 2

    我正在尝试解析一个CSV文件并使用SQL命令自动为其创建一个表。CSV中的第一行给出了列标题。但我需要推断每个列的类型。Ruby中是否有任何函数可以找到每个字段中内容的类型。例如,CSV行:"12012","Test","1233.22","12:21:22","10/10/2009"应该产生像这样的类型['integer','string','float','time','date']谢谢! 最佳答案 require'time'defto_something(str)if(num=Integer(str)rescueFloat(s

随机推荐