JDBC 连接器允许使用 JDBC 驱动程序从任何关系数据库读取数据并将数据写入数据。 本文档介绍如何设置 JDBC 连接器以针对关系数据库运行 SQL 查询。
如果在 DDL 上定义了主键,则 JDBC sink 以 upsert 模式与外部系统交换 UPDATE/DELETE 消息,否则,它以 append 模式运行,不支持消费 UPDATE/DELETE 消息。
为了使用 JDBC 连接器,使用构建自动化工具(例如 Maven 或 SBT)的项目和带有 SQL JAR 包的 SQL 客户端都需要以下依赖项。
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_${SCALA_VERSION}</artifactId>
<version>${FLINK_VERSION}1.13.6</version>
</dependency>
连接到指定的数据库还需要驱动程序依赖项。 以下是当前支持的驱动程序:
| Driver | Group Id | Artifact Id |
|---|---|---|
| MySQL | mysql | mysql-connector-java |
| PostgreSQL | org.postgresql | postgresql |
| Derby | org.apache.derby | derby |
JDBC 连接器和驱动程序目前不是 Flink 二进制发行版的一部分。 点此 查看如何与它们链接以进行集群执行。
jdbc表可以这样定义:
-- register a MySQL table 'users' in Flink SQL
CREATE TABLE MyUserTable (
id BIGINT,
name STRING,
age INT,
status BOOLEAN,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://localhost:3306/mydatabase',
'table-name' = 'users'
);
-- write data into the JDBC table from the other table "T"
INSERT INTO MyUserTable
SELECT id, name, age, status FROM T;
-- scan data from the JDBC table
SELECT id, name, age, status FROM MyUserTable;
-- temporal join the JDBC table as a dimension table
SELECT * FROM myTopic
LEFT JOIN MyUserTable FOR SYSTEM_TIME AS OF myTopic.proctime
ON myTopic.key = MyUserTable.id;
| Option | Required | Default | Type | Description |
|---|---|---|---|---|
| connector | required | (none) | String | Specify what connector to use, here should be 'jdbc'. |
| url | required | (none) | String | The JDBC database url. |
| table-name | required | (none) | String | The name of JDBC table to connect. |
| driver | optional | (none) | String | The class name of the JDBC driver to use to connect to this URL, if not set, it will automatically be derived from the URL. |
| username | optional | (none) | String | The JDBC user name. 'username' and 'password' must both be specified if any of them is specified. |
| password | optional | (none) | String | The JDBC password. |
| connection.max-retry-timeout | optional | 60s | Duration | Maximum timeout between retries. The timeout should be in second granularity and shouldn’t be smaller than 1 second. |
| scan.partition.column | optional | (none) | String | The column name used for partitioning the input. See the following Partitioned Scan section for more details. |
| scan.partition.num | optional | (none) | Integer | The number of partitions. |
| scan.partition.lower-bound | optional | (none) | Integer | The smallest value of the first partition. |
| scan.partition.upper-bound | optional | (none) | Integer | The largest value of the last partition. |
| scan.fetch-size | optional | 0 | Integer | The number of rows that should be fetched from the database when reading per round trip. If the value specified is zero, then the hint is ignored. |
| scan.auto-commit | optional | true | Boolean | Sets the auto-commit flag on the JDBC driver, which determines whether each statement is committed in a transaction automatically. Some JDBC drivers, specifically Postgres, may require this to be set to false in order to stream results. |
| lookup.cache.max-rows | optional | (none) | Integer | The max number of rows of lookup cache, over this value, the oldest rows will be expired. Lookup cache is disabled by default. See the following Lookup Cache section for more details. |
| lookup.cache.ttl | optional | (none) | Duration | The max time to live for each rows in lookup cache, over this time, the oldest rows will be expired. Lookup cache is disabled by default. See the following Lookup Cache section for more details. |
| lookup.max-retries | optional | 3 | Integer | The max retry times if lookup database failed. |
| sink.buffer-flush.max-rows | optional | 100 | Integer | The max size of buffered records before flush. Can be set to zero to disable it. |
| sink.buffer-flush.interval | optional | 1s | Duration | The flush interval mills, over this time, asynchronous threads will flush data. Can be set to '0' to disable it. Note, 'sink.buffer-flush.max-rows' can be set to '0' with the flush interval set allowing for complete async processing of buffered actions. |
| sink.max-retries | optional | 3 | Integer | The max retry times if writing records to database failed. |
| sink.parallelism | optional | (none) | Integer | Defines the parallelism of the JDBC sink operator. By default, the parallelism is determined by the framework using the same parallelism of the upstream chained operator. |
Flink 在将数据写入外部数据库时使用 DDL 中定义的主键。 如果定义了主键,则连接器以 upsert 模式运行,否则,连接器以追加(append)模式运行。
在 upsert 模式下,Flink 会根据主键插入新行或更新现有行,Flink 可以通过这种方式保证幂等性。 为保证输出结果符合预期,建议为表定义主键,并确保主键是底层数据库表的唯一键集或主键之一。 在 append 模式下,Flink 会将所有记录解释为 INSERT 消息,如果底层数据库发生主键或唯一约束违规,INSERT 操作可能会失败。
有关 PRIMARY KEY 语法的更多详细信息,请参阅 CREATE TABLE DDL。
为了加速并行 Source 任务实例中的数据读取,Flink 提供了 JDBC 表的分区扫描功能。
如果指定了以下所有扫描分区选项,则必须全部指定。 它们描述了从多个任务并行读取时如何对表进行分区。 scan.partition.column 必须是相关表中的数字、日期或时间戳列。 请注意,scan.partition.lower-bound 和 scan.partition.upper-bound 用于决定分区步长和过滤表中的行。 如果是批处理作业,也可以在提交 flink 作业之前先获取最大值和最小值。
在流式计算中,维表(dim_dic)是一个很常见的概念,一般用于sql的join中,对流式数据进行数据补全,或在不复杂的模型中对事实表做轻量化的维度退化。
比如我们的source stream是来自日志的订单数据,但日志中我们只是记录了订单商品的id,却没有其他的附加信息(如sku、促销活动、优惠券信息等),但我们把订单数据存入实时数仓进行数据分析的时候,却需要同步获取sku名称、优惠券等其他的信息,这种问题我们可以在进行流处理的时候通过查询维表的方式对数据进行数据补全。
维表一般存储在外部存储中,如mysql、hbase(使用phoenix操作)、redis等等。
JDBC 连接器可以在时间连接中用作查找源(又名维度表)。目前,仅支持同步查找模式。
默认情况下,查找缓存未启用。您可以通过设置如下2个属性来启用它。
查找缓存用于提高临时连接 JDBC 连接器的性能。默认情况下,查找缓存未启用,因此所有请求都发送到外部数据库。启用查找缓存后,每个进程(即 TaskManager)都会持有一个缓存。 Flink 会先查找缓存,只有在缓存缺失时才会向外部数据库发送请求,并根据返回的行更新缓存。当缓存达到最大缓存行lookup.cache.max-rows 或行超过最大存活时间lookup.cache.ttl 时,缓存中最旧的行将过期。缓存的行可能不是最新的,用户可以将 lookup.cache.ttl 调整为较小的值以获得更快的数据更新,但这可能会增加发送到数据库的请求数量。所以这是吞吐量和正确性之间的平衡。
如果在 DDL 中定义了主键,则 JDBC 接收器将使用 upsert 语义而不是普通的 INSERT 语句。 Upsert 语义是指如果底层数据库中存在唯一约束违规,则原子地添加新行或更新现有行,这提供了幂等性。
如果出现故障,Flink 作业将从上一个成功的检查点恢复并重新处理,这可能导致恢复期间重新处理消息。 强烈建议使用 upsert 模式,因为如果需要重新处理记录,它有助于避免违反约束或重复数据。
除了故障恢复之外,随着时间的推移,源主题自然也可能包含多个具有相同主键的记录,这使得 upserts 是可取的。
由于 upsert 没有标准语法,下表描述了使用的特定于数据库的 DML。
| Database | Upsert Grammar |
|---|---|
| MySQL | INSERT … ON DUPLICATE KEY UPDATE … |
| PostgreSQL | INSERT … ON CONFLICT … DO UPDATE SET … |
Flink 支持连接多个使用方言的数据库,如 MySQL、PostgresSQL、Derby。 Derby 数据库通常用于测试目的。 从关系数据库数据类型到 Flink SQL 数据类型的字段数据类型映射如下表所示,映射表可以帮助在 Flink 中轻松定义 JDBC 表。
| MySQL 类型 | PostgreSQL 类型 | Flink SQL 类型 |
|---|---|---|
TINYINT | TINYINT | |
SMALLINT TINYINT UNSIGNED | SMALLINT INT2 SMALLSERIAL SERIAL2 | SMALLINT |
INT MEDIUMINT SMALLINT UNSIGNED | INTEGER SERIAL | INT |
BIGINT INT UNSIGNED | BIGINT BIGSERIAL | BIGINT |
BIGINT UNSIGNED | DECIMAL(20, 0) | |
BIGINT | BIGINT | BIGINT |
FLOAT | REAL FLOAT4 | FLOAT |
DOUBLE DOUBLE PRECISION | FLOAT8 DOUBLE PRECISION | DOUBLE |
NUMERIC(p, s) DECIMAL(p, s) | NUMERIC(p, s) DECIMAL(p, s) | DECIMAL(p, s) |
BOOLEAN TINYINT(1) | BOOLEAN | BOOLEAN |
DATE | DATE | DATE |
TIME [(p)] | TIME [(p)] [WITHOUT TIMEZONE] | TIME [(p)] [WITHOUT TIMEZONE] |
DATETIME [(p)] | TIMESTAMP [(p)] [WITHOUT TIMEZONE] | TIMESTAMP [(p)] [WITHOUT TIMEZONE] |
CHAR(n) VARCHAR(n) TEXT | CHAR(n) CHARACTER(n) VARCHAR(n) CHARACTER VARYING(n) TEXT | STRING |
BINARY VARBINARY BLOB | BYTEA | BYTES |
ARRAY | ARRAY |
更详细信息参考 这里
输入输出:
Source:Bounded
Sink:Batch & Streaming Append / Upsert Mode
LookupSource: SyncMode
Flink 与 Hive 的集成包含两个层面。
一是利用了 Hive 的 MetaStore 作为持久化的 Catalog,用户可通过 HiveCatalog 将不同会话中的 Flink 元数据存储到 Hive Metastore 中。 例如,用户可以使用 HiveCatalog 将其 Kafka 表或 Elasticsearch 表存储在 Hive Metastore 中,并后续在 SQL 查询中重新使用它们。
二是利用 Flink 来读写 Hive 的表。HiveCatalog 的设计提供了与 Hive 良好的兼容性,用户可以”开箱即用”的访问其已有的 Hive 仓库。不需要修改现有的 Hive Metastore,也不需要更改表的数据位置或分区。
官方强烈建议用户使用 Blink planner 与 Hive 集成。
Flink 支持以下 Hive 版本:
1.0.x(1.0.0、1.0.1),1.1.x(1.1.0、1.1.1),1.2.x(1.2.0、1.2.1、1.2.2)
2.0.x(2.0.0、2.0.1),2.1.x(2.1.0、2.1.1),2.2.0,2.3.x(2.3.0、2.3.1、2.3.2、2.3.4、2.3.5、2.3.6)
3.1.x(3.1.0、3.1.1、3.1.2)
某些功能是否可用取决于您使用的 Hive 版本,这些限制不是由 Flink 所引起的:
与 Hive 集成,需要在 Flink 下的 /lib/ 目录中添加一些额外的依赖包,以便通过 Table API 或 SQL Client 与 Hive 进行交互。或者,可以将这些依赖项放在专用文件夹中,并分别使用 Table API 程序或 SQL Client 的 -C 或 -l 选项将它们添加到 classpath 中。
Apache Hive 是基于 Hadoop 之上构建的,需要设置 Hadoop 环境变量
export HADOOP_CLASSPATH=`hadoop classpath`
有两种添加 Hive 依赖项的方法。第一种是使用 Flink 提供的 Hive Jar 包。可以根据使用的 Metastore 的版本来选择对应的 Hive jar。第二个方式是分别添加每个所需的 jar 包(如果使用的 Hive 版本没有在下方列出,则这种方法会更适合)。
建议优先使用 Flink 提供的 Hive jar 包。仅在 Flink 提供的 Hive jar 不满足需求时,再考虑使用分开添加 jar 包的方式。
使用 Flink 提供的 Hive jar
下表列出了所有可用的 Hive jar。可以选择一个并放在 Flink 的 /lib/ 目录中。
| Metastore version | Maven dependency |
|---|---|
| 1.0.0 – 1.2.2 | flink-sql-connector-hive-1.2.2 |
| 2.0.0 – 2.2.0 | flink-sql-connector-hive-2.2.0 |
| 2.3.0 – 2.3.6 | flink-sql-connector-hive-2.3.6 |
| 3.0.0 – 3.1.2 | flink-sql-connector-hive-3.1.2 |
构建应用程序,则需要在 mvn 文件中添加以下依赖项。 应该在运行时添加以上的这些依赖项,而不要在已生成的 jar 文件中去包含它们(scope: provide)。
<!-- Flink Dependency -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-hive{{ site.scala_version_suffix }}</artifactId>
<version>{{site.version}}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge{{ site.scala_version_suffix }}</artifactId>
<version>{{site.version}}</version>
<scope>provided</scope>
</dependency>
<!-- Hive Dependency -->
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>${hive.version}</version>
<scope>provided</scope>
</dependency>
通过 TableEnvironment 或者 YAML 配置,连接到现有的 Hive 集群。
EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().build();
TableEnvironment tableEnv = TableEnvironment.create(settings);
String name = "myhive";
String defaultDatabase = "mydatabase";
String hiveConfDir = "/opt/hive-conf";
HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir);
tableEnv.registerCatalog("myhive", hive);
// set the HiveCatalog as the current catalog of the session
tableEnv.useCatalog("myhive");
在 Hadoop 生态系统中,Hive Metastore 多年来已经演变成事实上的元数据中心。
对于同时使用 Hive 和 Flink 的用户,HiveCatalog 允许使用 Hive Metastore 管理 Flink 的元数据。
对于只适用 Flink 的用户,HiveCatalog 是 Flink 唯一支持的开箱即用的持久化 Catalog。
如果没有持久化 Catalog,用户在创建 Kafka Table 时需要重复在每个应用中使用 Flink Create SQL 配置元数据。HiveCatalog 实现用户只创建一次表和元对象,并在以后的会话中方便地引用和管理它们。
使用 HiveCatalog
HiveCatalog可用于处理两种类型的表:Hive兼容表和通用表。 就存储层中的元数据和数据而言,兼容 Hive 的表是以兼容 Hive 的方式存储的表。 因此,通过 Flink 创建的 Hive 兼容表可以从 Hive 端查询。
另一方面,通用表特定于 Flink。 使用 HiveCatalog 创建通用表时,只是使用 HMS 来保留元数据。虽然这些表格对 Hive 可见,但 Hive 不太可能能够理解元数据。因此,在 Hive 中使用此类表会导致未定义的行为。
下面将通过一个简单的例子,演示将 Kafa 作为数据源,并将元数据保存到 Hive metastore 中,使用 Flink SQL 直接读取 Kafka。
step1. 确保 Hive Metastore 可用
安装 Hive 环境,设置 Hive 的 Metastore 配置(hive-site.xml 文件)。使用 Hive CLI 测试
hive> show databases;
OK
default
Time taken: 0.032 seconds, Fetched: 1 row(s)
hive> show tables;
OK
Time taken: 0.028 seconds, Fetched: 0 row(s)
step2. 配置 Flink 集群和 SQL CLI
将 Hive 依赖 jar 添加到 Flink 安装目录下的 /lib 目录中。修改 SQL CLI 的 YARM 配置(…/conf/sql-cli-defaults.yaml)
execution:
planner: blink
type: streaming
...
current-catalog: myhive # set the HiveCatalog as the current catalog of the session
current-database: mydatabase
catalogs:
- name: myhive
type: hive
hive-conf-dir: /opt/hive-conf # contains hive-site.xml
step3. 确保 Kafka 集群可用
step4. 启动 SQL Client,使用 Flink SQL DDL 创建 kafka table
Flink SQL> CREATE TABLE mykafka (name String, age Int) WITH (
'connector.type' = 'kafka',
'connector.version' = 'universal',
'connector.topic' = 'test',
'connector.properties.bootstrap.servers' = 'localhost:9092',
'format.type' = 'csv',
'update-mode' = 'append'
);
[INFO] Table has been created.
Flink SQL> DESCRIBE mykafka;
root
|-- name: STRING
|-- age: INT
使用 Hive CLI 测试是否可以使用该表
hive> show tables;
OK
mykafka
Time taken: 0.038 seconds, Fetched: 1 row(s)
hive> describe formatted mykafka;
OK
# col_name data_type comment
...
# Detailed Table Information
Database: default
Table Type: MANAGED_TABLE
Table Parameters:
flink.connector.properties.bootstrap.servers localhost:9092
flink.connector.topic test
flink.connector.type kafka
flink.connector.version universal
flink.format.type csv
flink.generic.table.schema.0.data-type VARCHAR(2147483647)
flink.generic.table.schema.0.name name
flink.generic.table.schema.1.data-type INT
flink.generic.table.schema.1.name age
flink.update-mode append
is_generic true
step5. 运行 Flink SQL 查询该表
Flink SQL> select * from mykafka;
SQL Query Result (Table)
Refresh: 1 s Page: Last of 1
name age
tom 15
john 21
kitty 30
amy 24
kaiky 18
从 1.11.0 开始,在使用 Hive Dialect 时,Flink 允许用户用 Hive 语法来编写 SQL 语句。旨在改善与 Hive 的互操作性,并减少在 Flink 和 Hive 之间切换来执行不同语句的情况。
使用 Hive Dialect
Flink 目前支持两种 SQL Dialect:default 和 hive。需要先切换到 Hive 方言,然后才能使用 Hive 语法编写。下面介绍如何使用 SQL 客户端和 Table API 设置方言。可以为执行的每个语句动态切换方言。无需重新启动会话即可使用其他方言。
SQL Client
以通过 table.sql-dialect 属性指定。修改 SQL CLI 的 YARM 配置(…/conf/sql-cli-defaults.yaml)
execution:
planner: blink
type: batch
result-mode: table
configuration:
table.sql-dialect: hive
可以在 SQL 客户端启动后设置方言。
Flink SQL> set table.sql-dialect=hive; -- to use hive dialect
[INFO] Session property has been set.
Flink SQL> set table.sql-dialect=default; -- to use default dialect
[INFO] Session property has been set.
EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner()...build();
TableEnvironment tableEnv = TableEnvironment.create(settings);
// to use hive dialect
tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
// to use default dialect
tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
以下是使用 Hive 方言的一些注意事项:
使用 HiveCatalog,Flink 可以统一 Hive 表的批处理和流处理。
Flink 支持以批处理和流处理模式从 Hive 读取数据。当作为批处理应用程序运行时,Flink 会处理的执行查询时的表数据。流式读取将持续监视表的更新,在新数据可用时以增量方式即时获取。默认情况,Flink 视为读取有界的表。
流式读取支持使用分区和非分区表。对于分区表,Flink 将监视新分区的生成,并在可用时以增量方式读取。对于非分区表,Flink 将监视文件夹中新文件的生成,并以增量方式读取新文件。
使用 SQL Hints 可以应用以下配置(无需修改 Hive Metastore 中的定义)
SELECT *
FROM hive_table
/*+ OPTIONS('streaming-source.enable'='true', 'streaming-source.consume-start-offset'='2020-05-20') */;
| Key | Default | Type | Description |
|---|---|---|---|
| streaming-source.enable | false | Boolean | 是否开启流式读取 Hive 数据。默认是批处理模式。 |
| streaming-source.partition.include | all | 可选值[‘all’, ‘latest’] | 设置要读取的分区,默认读所有分区。 可选项:latest:只读取最新分区数据all:读取全量分区数据latest 只能用在 temporal join 中,用于读取最新分区作为维表。 |
| streaming-source.monitor-interval | None | Duration | 监听新分区生成的时间。 Hive streaming reading 默认间隔是1分钟。Hive streaming temporal join 默认间隔是1小时。 目前的实现是每个 TaskManager 都会查询 Hive Metastore,高频的查询可能会对 Hive Metastore 产生过大的压力。 |
| streaming-source.partition-order | partition-name | 可选值[‘partition-name’, ‘create-time’, ‘partition-time’] | 加载分区顺序 可选项:partition-name:使用默认分区名称顺序加载最新分区create-time:使用分区文件创建时间顺序partition-time:使用分区时间顺序 |
| streaming-source.consume-start-offset | None | String | 流式读取 Hive 表的起始偏移量。 取决于设置的分区顺序对于 create-time 和 partition-time,应该是时间字符串(yyyy-[m]m-[d]d [hh:mm:ss])对于 partition-time,应该是分区名称字符串(例如pt_year=2020/pt_mon=10/pt_day=01) |
需要注意的
Flink 可以从 Hive 定义的视图中读取,有以下限制:
当满足以下条件时,Flink 将自动使用列式读取优化:
默认情况下是开启的,可以通过下面的配置关闭
table.exec.hive.fallback-mapred-reader=true
默认情况下,Flink 将根据文件数和每个文件中的块数推断出 Hive Reader 的最佳并行度。
Flink 也支持灵活地配置并行推理策略。可以在 TableConfig 中配置以下参数(请注意,这些参数会影响 Job 的所有 Source):
| Key | Default | Type | Description |
|---|---|---|---|
| table.exec.hive.infer-source-parallelism | true | Boolean | 如果为 true,根据文件数和文件块推断并行度。如果为 false,则由 config 设置源的并行度。 |
| table.exec.hive.infer-source-parallelism.max | 1000 | Integer | 设置 Source 的最大推断并行度。 |
可以将 Hive 表用作时态表(Temporal Table),流可以通过时态联接(Temporal Join)关联 Hive 表。
Flink 支持处理时间时态连接 Hive 表,处理时间时态连接总是关联最新版本的时态表。
Flink 支持 Hive 分区表和非分区表的时态连接,对于分区表,Flink 支持自动跟踪 Hive 表的最新分区。
Flink 还不支持事件时间临时连接 Hive 表。
对于一个随时间变化的分区表,可以把读作一个无界流,每个分区可以作为时态表的一个版本(如果每个分区都包含一个版本的完整数据)。表的最新版本保留 Hive 表的分区数据。
Flink 支持在处理时间时态连接时自动跟踪时态表的最新分区(版本),最新分区(版本)由 streaming source.partition-order 选项定义。
此功能仅在 Flink 流处理模式下支持。
下面代码演示了一个经典的业务处理,维度表来自 Hive,每天由批处理作业更新一次,kafka 流来自实时在线业务数据或日志,需要与维度表连接以丰富流。
假设 Hive 表中的数据每天更新,每天都包含最新的完整维度数据。
SET table.sql-dialect=hive;
CREATE TABLE dimension_table (
product_id STRING,
product_name STRING,
unit_price DECIMAL(10, 4),
pv_count BIGINT,
like_count BIGINT,
comment_count BIGINT,
update_time TIMESTAMP(3),
update_user STRING,
...
) PARTITIONED BY (pt_year STRING, pt_month STRING, pt_day STRING) TBLPROPERTIES (
-- 使用默认分区名顺序(partition-name),每12小时加载一次最新的分区(推荐用法)
'streaming-source.enable' = 'true',
'streaming-source.partition.include' = 'latest',
'streaming-source.monitor-interval' = '12 h',
'streaming-source.partition-order' = 'partition-name',
-- 使用分区文件创建时间顺序,每12小时加载一次最新的分区
'streaming-source.enable' = 'true',
'streaming-source.partition.include' = 'latest',
'streaming-source.partition-order' = 'create-time',
'streaming-source.monitor-interval' = '12 h'
-- 使用分区时间顺序,每12小时加载一次最新的分区
'streaming-source.enable' = 'true',
'streaming-source.partition.include' = 'latest',
'streaming-source.monitor-interval' = '12 h',
'streaming-source.partition-order' = 'partition-time',
'partition.time-extractor.kind' = 'default',
'partition.time-extractor.timestamp-pattern' = '$pt_year-$pt_month-$pt_day 00:00:00'
);
创建 Kafka Table
SET table.sql-dialect=default;
CREATE TABLE orders_table (
order_id STRING,
order_amount DOUBLE,
product_id STRING,
log_ts TIMESTAMP(3),
proctime as PROCTIME()
) WITH (...);
实现 Temporal Join 查询丰富流
SELECT * FROM orders_table AS o
JOIN dimension_table FOR SYSTEM_TIME AS OF o.proctime AS dim
ON o.product_id = dim.product_id;
对于 Hive 表,可以把读作一个有界流。在这种情况下,Hive 表只能在查询时跟踪其最新版本。表的最新版本保留 Hive 表的所有数据。
在对最新的 Hive 表执行时态联接时,Hive 表将被缓存在 Slot 内存中,并且流中的每条记录都与表联接进行匹配项。可以使用以下属性配置 Hive 表缓存的 TTL。缓存过期后,将再次扫描 Hive 表以加载最新数据。
| Key | Default | Type | Description |
|---|---|---|---|
| lookup.join.cache.ttl | 60 min | Duration | 缓存生存时间,默认60分钟。 配置只有再查找有界 Hive Table 时有效。 |
下面的演示将 hive 表的所有数据作为时态表加载。
-- 假设 Hive 表中的数据被批处理以 overwrite 的形式生成。
SET table.sql-dialect=hive;
CREATE TABLE dimension_table (
product_id STRING,
product_name STRING,
unit_price DECIMAL(10, 4),
pv_count BIGINT,
like_count BIGINT,
comment_count BIGINT,
update_time TIMESTAMP(3),
update_user STRING,
...
) TBLPROPERTIES (
'streaming-source.enable' = 'false',
'streaming-source.partition.include' = 'all',
'lookup.join.cache.ttl' = '12 h'
);
SELECT * FROM orders_table AS o
JOIN dimension_table FOR SYSTEM_TIME AS OF o.proctime AS dim
ON o.product_id = dim.product_id;
需要注意:
Flink 支持以批处理和流处理模式向 Hive 写入数据。
当作为批处理运行时,Flink 将只在作业完成时才向 Hive 表写入这些记录。既支持 Append,也支持 Overwrite。
# ------ Append 追加数据
Flink SQL> INSERT INTO mytable SELECT 'Tom', 25;
# ------ Overwrite 覆盖数据
Flink SQL> INSERT OVERWRITE mytable SELECT 'Tom', 25;
数据也可以插入到特定的分区中。
# ------ 插入静态分区
Flink SQL> INSERT OVERWRITE myparttable PARTITION (my_type='type_1', my_date='2019-08-08')
SELECT 'Tom', 25;
# ------ 插入动态分区
Flink SQL> INSERT OVERWRITE myparttable
SELECT 'Tom', 25, 'type_1', '2019-08-08';
# ------
Flink SQL> INSERT OVERWRITE myparttable PARTITION (my_type='type_1')
SELECT 'Tom', 25, '2019-08-08';
流式写入不断以增量方式向 Hive 添加新数据,提交使其可见。用户通过多个属性控制何时/如何触发提交。流式写入不支持覆盖插入。
下面的示例演示将 Kafka 中的数据写入有分区的 Hive 表,并运行批处理查询将数据读出。
SET table.sql-dialect=hive;
CREATE TABLE hive_table (
user_id STRING,
order_amount DOUBLE
) PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet TBLPROPERTIES (
'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00',
'sink.partition-commit.trigger'='partition-time',
'sink.partition-commit.delay'='1 h',
'sink.partition-commit.policy.kind'='metastore,success-file'
);
SET table.sql-dialect=default;
CREATE TABLE kafka_table (
user_id STRING,
order_amount DOUBLE,
log_ts TIMESTAMP(3),
-- Watermark 定义在 TIMESTAMP 类型字段上
WATERMARK FOR log_ts AS log_ts - INTERVAL '5' SECOND
) WITH (...);
-- 查询 kafka 插入 Hive
INSERT INTO TABLE hive_table
SELECT user_id, order_amount, DATE_FORMAT(log_ts, 'yyyy-MM-dd'), DATE_FORMAT(log_ts, 'HH')
FROM kafka_table;
-- 查询 Hive table
SELECT * FROM hive_table WHERE dt='2020-05-20' and hr='12';
如果 Watermark 定义在 TIMESTAMP_LTZ 类型字段上,sink.partition-commit.watermark-time-zone 需要配置时区(如果使用 partition-time 作为提交策略)
SET table.sql-dialect=hive;
CREATE TABLE hive_table (
user_id STRING,
order_amount DOUBLE
) PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet TBLPROPERTIES (
'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00',
'sink.partition-commit.trigger'='partition-time',
'sink.partition-commit.delay'='1 h',
-- 配置时区
'sink.partition-commit.watermark-time-zone'='Asia/Shanghai', -
'sink.partition-commit.policy.kind'='metastore,success-file'
);
SET table.sql-dialect=default;
CREATE TABLE kafka_table (
user_id STRING,
order_amount DOUBLE,
ts BIGINT,
ts_ltz AS TO_TIMESTAMP_LTZ(ts, 3),
WATERMARK FOR ts_ltz AS ts_ltz - INTERVAL '5' SECOND
) WITH (...);
Flink 支持 Hive 使用以下数据格式:Text、CSV、SequenceFile、ORC、Parquet
HiveModule 提供了 Hive 可以使用的内置函数
tableEnv.loadModue("myhive", new HiveModule("2.3.4"));
用户可以在 Flink 中使用 Hive UDF,支持的 UDF 类型包括:
Hive 的 UDF 和 GenericUDF 自动转换为 Flink 的 ScalarFunction,Hive 的 GenericUDTF 自动转换为 Flink 的 TableFunction,Hive 的 UDAF 和 GenericUDAFResolver2 转换为 Flink 的 AggregateFunction。
要使用 Hive UDF,用户必须:
创建一个 UDF,使用时注册名 “myudf”
public class TestHiveSimpleUDF extends UDF {
public IntWritable evaluate(IntWritable i) {
return new IntWritable(i.get());
}
public Text evaluate(Text text) {
return new Text(text.toString());
}
}
创建一个 Generic UDF,使用时注册名 “mygenericudf”
public class TestHiveGenericUDF extends GenericUDF {
@Override
public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
// ...
}
@Override
public Object evaluate(DeferredObject[] arguments) throws HiveException {
return arguments[0].get();
}
@Override
public String getDisplayString(String[] children) {
return "TestHiveGenericUDF";
}
}
创建一个 Generic UDTF,使用时注册名 “mygenericudtf”
public class TestHiveUDTF extends GenericUDTF {
@Override
public StructObjectInspector initialize(ObjectInspector[] argOIs) throws UDFArgumentException {
// ...
}
@Override
public void process(Object[] args) throws HiveException {
String str = (String) args[0];
for (String s : str.split(",")) {
forward(s);
forward(s);
}
}
@Override
public void close() {
}
}
在 Hive CLI 中查看
hive> show functions;
OK
mygenericudf
myudf
myudtf
在 Flink SQL 中使用
select
mygenericudf(myudf(name), 1) as a,
mygenericudf(myudf(age), 1) as b,
s
from mysourcetable,
lateral table(myudtf(name, 1)) as T(s);
参考列表:
总的来说,我对ruby还比较陌生,我正在为我正在创建的对象编写一些rspec测试用例。许多测试用例都非常基础,我只是想确保正确填充和返回值。我想知道是否有办法使用循环结构来执行此操作。不必为我要测试的每个方法都设置一个assertEquals。例如:describeitem,"TestingtheItem"doit"willhaveanullvaluetostart"doitem=Item.new#HereIcoulddotheitem.name.shouldbe_nil#thenIcoulddoitem.category.shouldbe_nilendend但我想要一些方法来使用
我试图在一个项目中使用rake,如果我把所有东西都放到Rakefile中,它会很大并且很难读取/找到东西,所以我试着将每个命名空间放在lib/rake中它自己的文件中,我添加了这个到我的rake文件的顶部:Dir['#{File.dirname(__FILE__)}/lib/rake/*.rake'].map{|f|requiref}它加载文件没问题,但没有任务。我现在只有一个.rake文件作为测试,名为“servers.rake”,它看起来像这样:namespace:serverdotask:testdoputs"test"endend所以当我运行rakeserver:testid时
作为我的Rails应用程序的一部分,我编写了一个小导入程序,它从我们的LDAP系统中吸取数据并将其塞入一个用户表中。不幸的是,与LDAP相关的代码在遍历我们的32K用户时泄漏了大量内存,我一直无法弄清楚如何解决这个问题。这个问题似乎在某种程度上与LDAP库有关,因为当我删除对LDAP内容的调用时,内存使用情况会很好地稳定下来。此外,不断增加的对象是Net::BER::BerIdentifiedString和Net::BER::BerIdentifiedArray,它们都是LDAP库的一部分。当我运行导入时,内存使用量最终达到超过1GB的峰值。如果问题存在,我需要找到一些方法来更正我的代
Rails2.3可以选择随时使用RouteSet#add_configuration_file添加更多路由。是否可以在Rails3项目中做同样的事情? 最佳答案 在config/application.rb中:config.paths.config.routes在Rails3.2(也可能是Rails3.1)中,使用:config.paths["config/routes"] 关于ruby-on-rails-Rails3中的多个路由文件,我们在StackOverflow上找到一个类似的问题
我需要从一个View访问多个模型。以前,我的links_controller仅用于提供以不同方式排序的链接资源。现在我想包括一个部分(我假设)显示按分数排序的顶级用户(@users=User.all.sort_by(&:score))我知道我可以将此代码插入每个链接操作并从View访问它,但这似乎不是“ruby方式”,我将需要在不久的将来访问更多模型。这可能会变得很脏,是否有针对这种情况的任何技术?注意事项:我认为我的应用程序正朝着单一格式和动态页面内容的方向发展,本质上是一个典型的网络应用程序。我知道before_filter但考虑到我希望应用程序进入的方向,这似乎很麻烦。最终从任何
我在我的项目中添加了一个系统来重置用户密码并通过电子邮件将密码发送给他,以防他忘记密码。昨天它运行良好(当我实现它时)。当我今天尝试启动服务器时,出现以下错误。=>BootingWEBrick=>Rails3.2.1applicationstartingindevelopmentonhttp://0.0.0.0:3000=>Callwith-dtodetach=>Ctrl-CtoshutdownserverExiting/Users/vinayshenoy/.rvm/gems/ruby-1.9.3-p0/gems/actionmailer-3.2.1/lib/action_mailer
刚入门rails,开始慢慢理解。有人可以解释或给我一些关于在application_controller中编码的好处或时间和原因的想法吗?有哪些用例。您如何为Rails应用程序使用应用程序Controller?我不想在那里放太多代码,因为据我了解,每个请求都会调用此Controller。这是真的? 最佳答案 ApplicationController实际上是您应用程序中的每个其他Controller都将从中继承的类(尽管这不是强制性的)。我同意不要用太多代码弄乱它并保持干净整洁的态度,尽管在某些情况下ApplicationContr
我想向我的Controller传递一个参数,它是一个简单的复选框,但我不知道如何在模型的form_for中引入它,这是我的观点:{:id=>'go_finance'}do|f|%>Transferirde:para:Entrada:"input",:placeholder=>"Quantofoiganho?"%>Saída:"output",:placeholder=>"Quantofoigasto?"%>Nota:我想做一个额外的复选框,但我该怎么做,模型中没有一个对象,而是一个要检查的对象,以便在Controller中创建一个ifelse,如果没有检查,请帮助我,非常感谢,谢谢
我注意到像bundler这样的项目在每个specfile中执行requirespec_helper我还注意到rspec使用选项--require,它允许您在引导rspec时要求一个文件。您还可以将其添加到.rspec文件中,因此只要您运行不带参数的rspec就会添加它。使用上述方法有什么缺点可以解释为什么像bundler这样的项目选择在每个规范文件中都需要spec_helper吗? 最佳答案 我不在Bundler上工作,所以我不能直接谈论他们的做法。并非所有项目都checkin.rspec文件。原因是这个文件,通常按照当前的惯例,只
我正在使用active_admin,我在Rails3应用程序的应用程序中有一个目录管理,其中包含模型和页面的声明。时不时地我也有一个类,当那个类有一个常量时,就像这样:classFooBAR="bar"end然后,我在每个必须在我的Rails应用程序中重新加载一些代码的请求中收到此警告:/Users/pupeno/helloworld/app/admin/billing.rb:12:warning:alreadyinitializedconstantBAR知道发生了什么以及如何避免这些警告吗? 最佳答案 在纯Ruby中:classA