草庐IT

Apache Flink——输出算子(Sink)

小波同学 2023-09-28 原文

前言

Flink 作为数据处理框架,最终还是要把计算处理的结果写入外部存储,为外部应用提供
支持。我们已经了解了 Flink 程序如何对数据进行读取、转换等操作,最后一步当然就应该将结果数据保存或输出到外部系统了。

连接到外部系统

在 Flink 中,如果我们希望将数据写入外部系统,其实并不是一件难事。我们知道所有算
子都可以通过实现函数类来自定义处理逻辑,所以只要有读写客户端,与外部系统的交互在任何一个处理算子中都可以实现。例如在 MapFunction 中,我们完全可以构建一个到 Redis 的连接,然后将当前处理的结果保存到 Redis 中。如果考虑到只需建立一次连接,我们也可以利用RichMapFunction,在 open() 生命周期中做连接操作。

这样看起来很方便,却会带来很多问题。Flink 作为一个快速的分布式实时流处理系统,
对稳定性和容错性要求极高。一旦出现故障,我们应该有能力恢复之前的状态,保障处理结果的正确性。这种性质一般被称作“状态一致性”。Flink 内部提供了一致性检查点(checkpoint)来保障我们可以回滚到正确的状态;但如果我们在处理过程中任意读写外部系统,发生故障后就很难回退到从前了。

为了避免这样的问题,Flink 的 DataStream API 专门提供了向外部写入数据的方法:
addSink。与 addSource 类似,addSink 方法对应着一个“Sink”算子,主要就是用来实现与外部系统连接、并将数据提交写入的;Flink 程序中所有对外的输出操作,一般都是利用 Sink 算子完成的。

Sink 一词有“下沉”的意思,有些资料会相对于“数据源”把它翻译为“数据汇”。不论
怎样理解,Sink 在 Flink 中代表了将结果数据收集起来、输出到外部的意思,所以我们这里统一把它直观地叫作“输出算子”。

之前我们一直在使用的 print 方法其实就是一种 Sink,它表示将数据流写入标准控制台打
印输出。查看源码可以发现,print 方法返回的就是一个 DataStreamSink。

@PublicEvolving
public DataStreamSink<T> print(String sinkIdentifier) {
    PrintSinkFunction<T> printFunction = new PrintSinkFunction(sinkIdentifier, false);
    return this.addSink(printFunction).name("Print to Std. Out");
}

与 Source 算子非常类似,除去一些 Flink 预实现的 Sink,一般情况下 Sink 算子的创建是通过调用 DataStream 的.addSink()方法实现的。

stream.addSink(new SinkFunction(…));

addSource 的参数需要实现一个 SourceFunction 接口;类似地,addSink 方法同样需要传入一个参数,实现的是 SinkFunction 接口。在这个接口中只需要重写一个方法invoke(),用来将指定的值写入到外部系统中。这个方法在每条数据记录到来时都会调用:

default void invoke(IN value, Context context) throws Exception

当然,SinkFuntion 多数情况下同样并不需要我们自己实现。Flink 官方提供了一部分的框架的 Sink 连接器。

我们可以看到,像 Kafka 之类流式系统,Flink 提供了完美对接,source/sink 两端都能连
接,可读可写;而对于 Elasticsearch、文件系统(FileSystem)、JDBC 等数据存储系统,则只提供了输出写入的 sink 连接器。

除 Flink 官方之外,Apache Bahir 作为给 Spark 和 Flink 提供扩展支持的项目,也实现了一些其他第三方系统与 Flink 的连接器。

除此以外,就需要用户自定义实现 sink 连接器了。

一、输出到文件

最简单的输出方式,当然就是写入文件了。对应着读取文件作为输入数据源,Flink 本来
也有一些非常简单粗暴的输出到文件的预实现方法:如 writeAsText()、writeAsCsv(),可以直接将输出结果保存到文本文件或 Csv 文件。但我们知道,这种方式是不支持同时写入一份文件的;所以我们往往会将最后的 Sink 操作并行度设为 1,这就大大拖慢了系统效率;而且对于故障恢复后的状态一致性,也没有任何保证。所以目前这些简单的方法已经要被弃用。

Flink 为此专门提供了一个流式文件系统的连接器:StreamingFileSink,它继承自抽象类
RichSinkFunction,而且集成了 Flink 的检查点(checkpoint)机制,用来保证精确一次(exactly once)的一致性语义。

StreamingFileSink 为批处理和流处理提供了一个统一的 Sink,它可以将分区文件写入 Flink支持的文件系统。它可以保证精确一次的状态一致性,大大改进了之前流式文件 Sink 的方式。它的主要操作是将数据写入桶(buckets),每个桶中的数据都可以分割成一个个大小有限的分区文件,这样一来就实现真正意义上的分布式文件存储。我们可以通过各种配置来控制“分桶”的操作;默认的分桶方式是基于时间的,我们每小时写入一个新的桶。换句话说,每个桶内保存的文件,记录的都是 1 小时的输出数据。

StreamingFileSink 支持行编码(Row-encoded)和批量编码(Bulk-encoded,比如 Parquet)格式。这两种不同的方式都有各自的构建器(builder),调用方法也非常简单,可以直接调用StreamingFileSink 的静态方法:

  • 行编码:StreamingFileSink.forRowFormat(basePath,rowEncoder)。
  • 批量编码:StreamingFileSink.forBulkFormat(basePath,bulkWriterFactory)。

在创建行或批量编码 Sink 时,我们需要传入两个参数,用来指定存储桶的基本路径
(basePath)和数据的编码逻辑(rowEncoder 或 bulkWriterFactory)。

import com.yibo.flink.datastream.Event;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

/**
 * @Author: huangyibo
 * @Date: 2022/6/25 17:48
 * @Description: flink 数据写入到文件
 */

public class SinkFile {

    public static void main(String[] args) throws Exception {
        //创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);

        List<Event> list = new ArrayList<>();
        list.add(new Event("Mary","./home",1000L));
        list.add(new Event("Bobo","./cart",2000L));
        list.add(new Event("Alice","./cart",3000L));
        list.add(new Event("Bobo","./prod?id=1",4000L));
        list.add(new Event("Bobo","./prod?id=2",4500L));

        DataStreamSource<Event> dataStream = env.fromCollection(list);

        StreamingFileSink<String> fileSink = StreamingFileSink.<String>forRowFormat(new Path("src/main/resources/output"),
                new SimpleStringEncoder<>("UTF-8"))
                //滚动策略
                .withRollingPolicy(
                        DefaultRollingPolicy.builder()
                                //文件的大小, 超过1G, 开启新的文件
                                .withMaxPartSize(MemorySize.ofMebiBytes(1024 * 1024 * 1024))
                                //滚动周期
                                .withRolloverInterval(Duration.ofMinutes(15))
                                //当前不活跃的间隔数
                                .withInactivityInterval(Duration.ofMinutes(5))
                                .build()
                ).build();

        dataStream.map(Event::toString).addSink(fileSink);

        env.execute();
    }
}

这里我们创建了一个简单的文件 Sink,通过.withRollingPolicy()方法指定了一个“滚动策
略”。“滚动”的概念在日志文件的写入中经常遇到:因为文件会有内容持续不断地写入,所以我们应该给一个标准,到什么时候就开启新的文件,将之前的内容归档保存。也就是说,上面的代码设置了在以下 3 种情况下,我们就会滚动分区文件:

  • 至少包含 15 分钟的数据
  • 最近 5 分钟没有收到新的数据
  • 文件大小已达到 1 GB

二、输出到 Kafka

Kafka 是一个分布式的基于发布/订阅的消息系统,本身处理的也是流式数据,所以跟Flink“天生一对”,经常会作为 Flink 的输入数据源和输出系统。Flink 官方为 Kafka 提供了 Source和 Sink 的连接器,我们可以用它方便地从 Kafka 读写数据。如果仅仅是支持读写,那还说明不了 Kafka 和 Flink 关系的亲密;真正让它们密不可分的是,Flink 与 Kafka 的连接器提供了端到端的精确一次(exactly once)语义保证,这在实际项目中是最高级别的一致性保证。

引入kafka依赖

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka</artifactId>
    <version>1.15.0</version>
</dependency>
import com.alibaba.fastjson.JSON;
import com.yibo.flink.datastream.Event;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

import java.util.Properties;

/**
 * @Author: huangyibo
 * @Date: 2022/6/25 17:48
 * @Description: flink 从kafka中读取数据,并写入到 kafka
 */

public class SinkKafka {

    public static void main(String[] args) throws Exception {
        //创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        //数据源模仿 从kafka读取数据, 数据格式为 user=Bobo, url=./cart, timestamp=2000
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "192.168.111.188:9092");
        properties.setProperty("group.id", "consumer-group");
        properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("auto.offset.reset", "latest");
        DataStreamSource<String> kafkaDataStream = env.addSource(new FlinkKafkaConsumer<String>("test", new SimpleStringSchema(), properties));

        //用flink进行转换处理
        kafkaDataStream.map(data -> {
            String[] fields = data.split(", ");
            Event event = new Event(fields[0],fields[1], Long.valueOf(fields[2]));
            return JSON.toJSONString(event);
        }).addSink(new FlinkKafkaProducer<String>("192.168.111.188:9092","events", new SimpleStringSchema()));

        env.execute();
    }
}

这里我们可以看到,addSink 传入的参数是一个 FlinkKafkaProducer。这也很好理解,因为需要向 Kafka 写入数据,自然应该创建一个生产者。FlinkKafkaProducer 继承了抽象类TwoPhaseCommitSinkFunction,这是一个实现了“两阶段提交”的RichSinkFunction。两阶段提交提供了 Flink 向 Kafka 写入数据的事务性保证,能够真正做到精确一次(exactly once)的状态一致性。

三、输出到 Pulsar

随着数据日益膨胀,采用事件流处理数据至关重要。Apache Flink 将批流处理统一到计算引擎中,提供了一致化的编程接口。Apache Pulsar(与 Apache BookKeeper 一起)以 "流 "的方式统一数据。在 Pulsar 中,数据存储成一个副本,以流(streaming)(通过 pub-sub 接口)和 segment(用于批处理)的方式进行访问。Pulsar 解决了企业在使用不同的存储和消息技术解决方案时遇到的数据孤岛问题。

Flink 可以直接与 Pulsar broker 进行实时的流式读写,同时 Flink 也可以批量读取 Pulsar 底层离线存储,与 BookKeeper 的内容进行批次读写。同时支持批流,使得 Pulsar 和 Flink 先天就是契合的伙伴。把 Flink 和 Pulsar 结合使用,这两种开源技术可以创建一个统一的数据架构,为实时数据驱动企业提供最佳解决方案。

为了将 Pulsar 与 Flink 的功能进行整合,为用户提供更强大的开发能力,StreamNative 开发并开源了 Pulsar Flink Connector。经过多次的打磨,Pulsar Flink Connector 已合并进 Flink 代码仓库,并在 Flink 1.14.0 及其之后版本中发布!

Pulsar Flink Connector 基于 Apache Pulsar 和 Apache Flink 提供弹性数据处理,允许 Apache Flink 读写 Apache Pulsar 中的数据。使用 Pulsar Flink Connector,企业能够更专注于业务逻辑,无需关注存储问题。

引入Pulsar连接器依赖

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-pulsar</artifactId>
    <version>1.15.0</version>
</dependency>
import com.yibo.flink.datastream.Event;
import org.apache.flink.connector.pulsar.sink.PulsarSink;
import org.apache.flink.connector.pulsar.sink.writer.serializer.PulsarSerializationSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.pulsar.client.impl.schema.AvroSchema;

import java.util.ArrayList;
import java.util.List;

/**
 * @Author: huangyibo
 * @Date: 2022/6/25 17:48
 * @Description: flink 数据写入到 Pulsar
 */

public class SinkPulsar {

    public static void main(String[] args) throws Exception {
        //创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);

        List<Event> list = new ArrayList<>();
        list.add(new Event("Mary","./home",1000L));
        list.add(new Event("Bobo","./cart",2000L));
        list.add(new Event("Alice","./cart",3000L));
        list.add(new Event("Bobo","./prod?id=1",4000L));
        list.add(new Event("Bobo","./prod?id=2",4500L));

        DataStreamSource<Event> dataStream = env.fromCollection(list);

        //2. 添加sink源, 用于写入数据到 pulsar
        String serviceUrl = "pulsar://192.168.23.111:6650,192.168.23.112:6650,192.168.23.113:6650";
        String adminUrl = "http://192.168.23.111:8080,192.168.23.112:8080,192.168.23.113:8080";
        String topic = "persistent://my-tenant/my-ns/my-partitioned-topic";

        PulsarSink<Event> pulsarSink = PulsarSink.builder()
                .setServiceUrl(serviceUrl)
                .setAdminUrl(adminUrl)
                .setTopics(topic)
                .setSerializationSchema(PulsarSerializationSchema.pulsarSchema(AvroSchema.of(Event.class)))
                .build();

        dataStream.sinkTo(pulsarSink);

        env.execute();
    }

}

四、输出到 Redis

Redis 是一个开源的内存式的数据存储,提供了像字符串(string)、哈希表(hash)、列表(list)、集合(set)、排序集合(sorted set)、位图(bitmap)、地理索引和流(stream)等一系列常用的数据结构。因为它运行速度快、支持的数据类型丰富,在实际项目中已经成为了架构优化必不可少的一员,一般用作数据库、缓存,也可以作为消息代理。

Flink 没有直接提供官方的 Redis 连接器,不过 Bahir 项目还是担任了合格的辅助角色,为我们提供了 Flink-Redis 的连接工具。但版本升级略显滞后,目前连接器版本为 1.0,支持的Scala 版本最新到 2.11。由于我们的测试不涉及到 Scala 的相关版本变化,所以并不影响使用。在实际项目应用中,应该以匹配的组件版本运行。

导入的 Redis 连接器依赖

<dependency>
    <groupId>org.apache.bahir</groupId>
    <artifactId>flink-connector-redis_2.11</artifactId>
    <version>1.0</version>
</dependency>
import com.yibo.flink.datastream.Event;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;

import java.util.ArrayList;
import java.util.List;

/**
 * @Author: huangyibo
 * @Date: 2022/6/25 17:48
 * @Description: flink 数据写入到Redis
 */

public class SinkRedis {

    public static void main(String[] args) throws Exception {
        //创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);

        List<Event> list = new ArrayList<>();
        list.add(new Event("Mary","./home",1000L));
        list.add(new Event("Bobo","./cart",2000L));
        list.add(new Event("Alice","./cart",3000L));
        list.add(new Event("Bobo","./prod?id=1",4000L));
        list.add(new Event("Bobo","./prod?id=2",4500L));

        DataStreamSource<Event> dataStream = env.fromCollection(list);

        // 创建一个到 redis 连接的配置
        FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("192.168.111.188").build();

        dataStream.addSink(new RedisSink<Event>(conf, new MyRedisMapper()));

        env.execute();
    }


    /**
     * Redis 的映射类
     * 自定义实现RedisMapper接口
     */
    public static class MyRedisMapper implements RedisMapper<Event> {
        @Override
        public String getKeyFromData(Event e) {
            return e.getUser();
        }
        @Override
        public String getValueFromData(Event e) {
            return e.getUrl();
        }
        @Override
        public RedisCommandDescription getCommandDescription() {
            return new RedisCommandDescription(RedisCommand.HSET, "clicks");
        }
    }
}

这里 RedisSink 的构造方法需要传入两个参数:

  • JFlinkJedisConfigBase:Jedis 的连接配置
  • RedisMapper:Redis 映射类接口,说明怎样将数据转换成可以写入 Redis 的类型,主要就是定义一个 Redis 的映射类,实现 RedisMapper 接口。

五、输出到 Elasticsearch

ElasticSearch 是一个分布式的开源搜索和分析引擎,适用于所有类型的数据。
ElasticSearch有着简洁的 REST 风格的 API,以良好的分布式特性、速度和可扩展性而闻名,在大数据领域应用非常广泛。

Flink 为 ElasticSearch 专门提供了官方的 Sink 连接器,Flink 1.13 之后支持当前最新版本的ElasticSearch。

添加 Elasticsearch 连接器依赖

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch7</artifactId>
    <version>1.15.0</version>
</dependency>
import com.yibo.flink.datastream.Event;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;

/**
 * @Author: huangyibo
 * @Date: 2022/6/25 17:48
 * @Description: flink 数据写入到Redis
 */

public class SinkElasticSearch {

    public static void main(String[] args) throws Exception {
        //创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);

        List<Event> list = new ArrayList<>();
        list.add(new Event("Mary","./home",1000L));
        list.add(new Event("Bobo","./cart",2000L));
        list.add(new Event("Alice","./cart",3000L));
        list.add(new Event("Bobo","./prod?id=1",4000L));
        list.add(new Event("Bobo","./prod?id=2",4500L));

        DataStreamSource<Event> dataStream = env.fromCollection(list);

        ArrayList<HttpHost> httpHosts = new ArrayList<>();
        httpHosts.add(new HttpHost("192.168.111.188", 9200, "http"));

        // 创建一个 ElasticsearchSinkFunction
        ElasticsearchSinkFunction<Event> elasticsearchSinkFunction = new ElasticsearchSinkFunction<Event>() {
            @Override
            public void process(Event element, RuntimeContext ctx, RequestIndexer indexer) {
                HashMap<String, String> data = new HashMap<>();
                data.put(element.getUser(), element.getUrl());
                IndexRequest request = Requests.indexRequest()
                        .index("clicks")
                        // Es 6 必须定义 type, Es 7 不用定义
                        .type("type")
                        .source(data);
                indexer.add(request);
            }
        };

        dataStream.addSink(new ElasticsearchSink.Builder<Event>(httpHosts, elasticsearchSinkFunction).build());

        env.execute();
    }
}

与 RedisSink 类 似 , 连 接 器 也 为 我 们 实 现 了 写 入 到 Elasticsearch 的
SinkFunction——ElasticsearchSink。区别在于,这个类的构造方法是私有(private)的,我们需要使用 ElasticsearchSink 的 Builder 内部静态类,调用它的 build()方法才能创建出真正的SinkFunction。

而 Builder 的构造方法中又有两个参数:

  • httpHosts:连接到的 Elasticsearch 集群主机列表
  • elasticsearchSinkFunction:这并不是我们所说的 SinkFunction,而是用来说明具体处
    理逻辑、准备数据向 Elasticsearch 发送请求的函数。

具体的操作需要重写中 elasticsearchSinkFunction 中的 process 方法,我们可以将要发送的数据放在一个 HashMap 中,包装成 IndexRequest 向外部发送 HTTP 请求。

六、输出到 MySQL(JDBC)

关系型数据库有着非常好的结构化数据设计、方便的 SQL 查询,是很多企业中业务数据
存储的主要形式。MySQL 就是其中的典型代表。尽管在大数据处理中直接与 MySQL 交互的场景不多,但最终处理的计算结果是要给外部应用消费使用的,而外部应用读取的数据存储往往就是 MySQL。所以我们也需要知道如何将数据输出到 MySQL 这样的传统数据库。

添加依赖

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc</artifactId>
    <version>1.15.0</version>
</dependency>

<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.29</version>
</dependency>
import com.yibo.flink.datastream.Event;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.ArrayList;
import java.util.List;

/**
 * @Author: huangyibo
 * @Date: 2022/6/25 17:48
 * @Description: flink 数据写入到Redis
 */

public class SinkMySql {

    public static void main(String[] args) throws Exception {
        //创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);

        List<Event> list = new ArrayList<>();
        list.add(new Event("Mary","./home",1000L));
        list.add(new Event("Bobo","./cart",2000L));
        list.add(new Event("Alice","./cart",3000L));
        list.add(new Event("Bobo","./prod?id=1",4000L));
        list.add(new Event("Bobo","./prod?id=2",4500L));

        DataStreamSource<Event> dataStream = env.fromCollection(list);

        dataStream.addSink(
                JdbcSink.sink("INSERT INTO clicks (user, url) VALUES (?, ?)",
                        (statement, event) -> {
                            statement.setString(1, event.getUser());
                            statement.setString(2, event.getUrl());
                        },
                        JdbcExecutionOptions.builder()
                                .withBatchSize(1000)
                                .withBatchIntervalMs(200)
                                .withMaxRetries(5)
                                .build(),
                        new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                                .withUrl("jdbc:mysql://localhost:3306/userbehavior")
                                // 对于 MySQL 5.7,用"com.mysql.jdbc.Driver"
                                .withDriverName("com.mysql.cj.jdbc.Driver")
                                .withUsername("username")
                                .withPassword("password")
                                .build()
                )
        );

        env.execute();
    }

}

七、自定义 Sink 输出
如果我们想将数据存储到我们自己的存储设备中,而 Flink 并没有提供可以直接使用的连接器,又该怎么办呢?

与 Source 类似,Flink 为我们提供了通用的 SinkFunction 接口和对应的RichSinkDunction 抽象类,只要实现它,通过简单地调用 DataStream 的.addSink()方法就可以自定义写入任何外部存储。之前与外部系统的连接,其实都是连接器帮我们实现了 SinkFunction,现在既然没有现成的,我们就只好自力更生了。

在实现 SinkFunction 的时候,需要重写的一个关键方法 invoke(),在这个方法中我们就可以实现将流里的数据发送出去的逻辑。

一般情况下,不要自己定义SinkFunction ,因为发生故障的时候,怎么从原先的状态恢复以及状态一致性的保证等诸多问题都特别复杂。

参考:
https://blog.csdn.net/weixin_47491957/article/details/124379542

https://nightlies.apache.org/flink/flink-docs-release-1.15/zh/docs/connectors/datastream/pulsar/

有关Apache Flink——输出算子(Sink)的更多相关文章

  1. ruby - 检查 "command"的输出应该包含 NilClass 的意外崩溃 - 2

    为了将Cucumber用于命令行脚本,我按照提供的说明安装了arubagem。它在我的Gemfile中,我可以验证是否安装了正确的版本并且我已经包含了require'aruba/cucumber'在'features/env.rb'中为了确保它能正常工作,我写了以下场景:@announceScenario:Testingcucumber/arubaGivenablankslateThentheoutputfrom"ls-la"shouldcontain"drw"假设事情应该失败。它确实失败了,但失败的原因是错误的:@announceScenario:Testingcucumber/ar

  2. ruby - 通过 erb 模板输出 ruby​​ 数组 - 2

    我正在使用puppet为ruby​​程序提供一组常量。我需要提供一组主机名,我的程序将对其进行迭代。在我之前使用的bash脚本中,我只是将它作为一个puppet变量hosts=>"host1,host2"我将其提供给bash脚本作为HOSTS=显然这对ruby​​不太适用——我需要它的格式hosts=["host1","host2"]自从phosts和putsmy_array.inspect提供输出["host1","host2"]我希望使用其中之一。不幸的是,我终其一生都无法弄清楚如何让它发挥作用。我尝试了以下各项:我发现某处他们指出我需要在函数调用前放置“function_”……这

  3. ruby - 如何进行排列以有效地定制输出 - 2

    这是一道面试题,我没有答对,但还是很好奇怎么解。你有N个人的大家庭,分别是1,2,3,...,N岁。你想给你的大家庭拍张照片。所有的家庭成员都排成一排。“我是家里的friend,建议家庭成员安排如下:”1岁的家庭成员坐在这一排的最左边。每两个坐在一起的家庭成员的年龄相差不得超过2岁。输入:整数N,1≤N≤55。输出:摄影师可以拍摄的照片数量。示例->输入:4,输出:4符合条件的数组:[1,2,3,4][1,2,4,3][1,3,2,4][1,3,4,2]另一个例子:输入:5输出:6符合条件的数组:[1,2,3,4,5][1,2,3,5,4][1,2,4,3,5][1,2,4,5,3][

  4. ruby - 将 spawn() 的标准输出/标准错误重定向到 Ruby 中的字符串 - 2

    我想使用spawn(针对多个并发子进程)在Ruby中执行一个外部进程,并将标准输出或标准错误收集到一个字符串中,其方式类似于使用Python的子进程Popen.communicate()可以完成的操作。我尝试将:out/:err重定向到一个新的StringIO对象,但这会生成一个ArgumentError,并且临时重新定义$stdxxx会混淆子进程的输出。 最佳答案 如果你不喜欢popen,这是我的方法:r,w=IO.pipepid=Process.spawn(command,:out=>w,:err=>[:child,:out])

  5. ruby - Ruby 是否使用 $stdout 来写入 puts 和 return 的输出? - 2

    我想知道Ruby用来在命令行打印这些东西的输出流:irb(main):001:0>a="test"=>"test"irb(main):002:0>putsatest=>nilirb(main):003:0>a=>"test"$stdout是否用于irb(main):002:0>和irb(main):003:0>?而且,在这两次调用之间,$stdout的值是否有任何变化?另外,有人能告诉我打印/写入这些内容的Ruby源代码吗? 最佳答案 是的。而且很容易向自己测试/证明。在命令行试试这个:ruby-e'puts"foo"'>test.

  6. ruby-on-rails - 无法在 Rails 助手中捕获 block 的输出 - 2

    我在使用自定义RailsFormBuilder时遇到了问题,从昨天晚上开始我就发疯了。基本上我想对我的构建器方法之一有一个可选block,以便我可以在我的主要content_tag中显示其他内容。:defform_field(method,&block)content_tag(:div,class:'field')doconcatlabel(method,"Label#{method}")concattext_field(method)capture(&block)ifblock_given?endend当我在我的一个Slim模板中调用该方法时,如下所示:=f.form_field:e

  7. ruby-on-rails - 连接字符串时如何在 <%=%> block 内输出 html_safe? - 2

    考虑一下:现在这些情况:#output:http://domain.com/?foo=1&bar=2#output:http://domain.com/?foo=1&bar=2#output:http://domain.com/?foo=1&bar=2#output:http://domain.com/?foo=1&bar=2我需要用其他字符串输出URL。我如何保证&符号不会被转义?由于我无法控制的原因,我无法发送&。求助!把我的头发拉到这里:\编辑:为了澄清,我实际上有一个像这样的数组:@images=[{:id=>"fooid",:url=>"http://

  8. ruby - 捕获 Ruby Logger 输出以进行测试 - 2

    我有一个像这样的ruby​​类:require'logger'classTdefdo_somethinglog=Logger.new(STDERR)log.info("Hereisaninfomessage")endend测试脚本行如下:#!/usr/bin/envrubygem"minitest"require'minitest/autorun'require_relative't'classTestMailProcessorClasses当我运行这个测试时,out和err都是空字符串。我看到消息打印在stderr上(在终端上)。有没有办法让Logger和capture_io一起玩得

  9. ruby - Cucumber/Savon 省略或删除日志输出 - 2

    在运行Cucumber测试时,我得到(除了测试结果)大量调试/日志相关的输出形式:D,[2013-03-06T12:21:38.911829#49031]DEBUG--:SOAPrequest:D,[2013-03-06T12:21:38.911919#49031]DEBUG--:Pragma:no-cache,SOAPAction:"",Content-Type:text/xml;charset=UTF-8,Content-Length:1592W,[2013-03-06T12:21:38.912360#49031]WARN--:HTTPIexecutesHTTPPOSTusingt

  10. ruby-on-rails - Rails 控制台的 YAML 输出 - 2

    在Rails控制台中执行类似yGrau.all的命令时,我得到这些奇怪的!binary字符串而不是属性名称。知道如何解决这个问题吗?谢谢。irb(main):003:0>yGrau.all←[1m←[36mGrauLoad(0.0ms)←[0m←[1mSELECT"graus".*FROM"gr←[1m←[35mEXPLAIN(0.0ms)←[0mEXPLAINQUERYPLANSELECT"grauEXPLAINfor:SELECT"graus".*FROM"graus"0|0|0|SCANTABLEgraus(~1000000rows)----!ruby/object:Grauat

随机推荐