草庐IT

Flink 自定义数据源Connector

Yaphets丶混世大魔王 2023-04-22 原文

概述

首先我们先来看一下自定义数据源,Flink系统提供的一些功能

我们可以从下面这个图看出来数据源的source和sink类的集成关系

当我们要实现自定义数据源的时候,我们需要先实现DynamicTableSourceFactory, DynamicTableSinkFactory这两个工厂类,在工厂类里面去实现参数定义和数据源的创建,然后再数据源DynamicTableSource和DynamicTableSink里面去初始化数据源的一些信息,最终在source类型的数据源的ScanRuntimeProvider或者LookupTableSource或者sink类型的数据源的SinkRuntimeProvider实现类里面去实现具体功能。

具体实现

第一步 实现工厂类

Dynamic Table Factories

动态表工厂用于根据目录和会话信息为外部存储系统配置动态表连接器。

org.apache.flink.table.factories.DynamicTableSourceFactory可以实现构造一个DynamicTableSource.

org.apache.flink.table.factories.DynamicTableSinkFactory可以实现构造一个DynamicTableSink.

默认情况下,使用connector选项的值作为工厂标识符和 Java 的服务提供者接口来发现工厂。

在 JAR 文件中,可以将对新实现的引用添加到服务文件中:

需要在resource目录下新建META-INF/services/目录,并且新建org.apache.flink.table.factories.Factory文件

META-INF/services/org.apache.flink.table.factories.Factory

然后在文件里面添加实现类的路径:com.flink.sql.connector.http.table.HttpDynamicTableFactory,只需要添加这个类路径就行,其他的不需要

如果没有这个文件个文件里面的内容,flink系统识别不到我们自定义的数据源信息

类文件的内容如下:HttpDynamicTableFactory.class

package com.flink.sql.connector.http.table;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.connector.format.DecodingFormat;
import org.apache.flink.table.connector.format.EncodingFormat;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.DeserializationFormatFactory;
import org.apache.flink.table.factories.DynamicTableSinkFactory;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.factories.SerializationFormatFactory;
import org.apache.flink.table.types.DataType;
import org.apache.flink.util.Preconditions;

import java.util.HashSet;
import java.util.Optional;
import java.util.Set;

// 我这里是同时实现了source和sink,也可以单独去实现,写在一起可以减少很多重复代码
public class HttpDynamicTableFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory {

/**
    首先定义一些数据源的参数信息,就是连接器的所有参数都需要先定义,这样才能在SQL里面去使用
*/
    public static final String IDENTIFIER = "http";
    public static final ConfigOption<String> URL = ConfigOptions.key("url")
            .stringType().noDefaultValue().withDescription("the jdbc database url.");
    public static final ConfigOption<String> HEADERS = ConfigOptions.key("headers")
            .stringType().noDefaultValue().withDescription("the http header.");
    private static final ConfigOption<String> BODY = ConfigOptions.key("body")
            .stringType().noDefaultValue().withDescription("the http body params.");
    private static final ConfigOption<String> TYPE = ConfigOptions.key("type")
            .stringType().noDefaultValue().withDescription("the http type.");
    private static final ConfigOption<String> FORMAT = ConfigOptions.key("format")
            .stringType().noDefaultValue().withDescription("the http type.");

    public HttpDynamicTableFactory() {
    }

// 构造source类型的数据源对象
    public DynamicTableSource createDynamicTableSource(Context context) {
        FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
        ReadableConfig config = helper.getOptions();
        helper.validate();
        this.validateConfigOptions(config);
        HttpSourceInfo httpSourceInfo = this.getHttpSource(config);
        // discover a suitable decoding format
        final DecodingFormat<DeserializationSchema<RowData>> decodingFormat = helper.discoverDecodingFormat(
                DeserializationFormatFactory.class,
                FactoryUtil.FORMAT);

        // derive the produced data type (excluding computed columns) from the catalog table
        final DataType producedDataType = context.getCatalogTable().getSchema().toPhysicalRowDataType();


        return new HttpDynamicTableSource(httpSourceInfo, decodingFormat, producedDataType);
    }

// 构造sink类型的数据源对象
    @Override
    public DynamicTableSink createDynamicTableSink(Context context) {
        FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
        ReadableConfig config = helper.getOptions();
        helper.validate();
        this.validateConfigOptions(config);
        HttpSourceInfo httpSourceInfo = this.getHttpSource(config);
        // discover a suitable encoding format
        final EncodingFormat<SerializationSchema<RowData>> encodingFormat = helper.discoverEncodingFormat(
                SerializationFormatFactory.class,
                FactoryUtil.FORMAT);

        // derive the produced data type (excluding computed columns) from the catalog table
        final DataType producedDataType = context.getCatalogTable().getSchema().toPhysicalRowDataType();
        TableSchema tableSchema = context.getCatalogTable().getSchema();
        return new HttpDynamicTableSink(httpSourceInfo, encodingFormat, producedDataType, tableSchema);
    }

// 获取自定义的HTTP连接器的参数对象,主要用来存HTTP链接的一些参数,后面用来构造HTTP请求使用
    private HttpSourceInfo getHttpSource(ReadableConfig readableConfig) {
        String url = readableConfig.get(URL);
        String headers = readableConfig.get(HEADERS);
        String body = readableConfig.get(BODY);
        String type = readableConfig.get(TYPE);
        return new HttpSourceInfo(url,type, headers, body);
    }

// 返回数据源的type字符串,标识这是一个什么数据源
    public String factoryIdentifier() {
        return IDENTIFIER;
    }

// 必填参数需要在这个方法里面去添加
    public Set<ConfigOption<?>> requiredOptions() {
        Set<ConfigOption<?>> requiredOptions = new HashSet();
        requiredOptions.add(URL);
        requiredOptions.add(TYPE);
        return requiredOptions;
    }
// 非必填参数需要在这个方法里面去添加
    public Set<ConfigOption<?>> optionalOptions() {
        Set<ConfigOption<?>> optionalOptions = new HashSet();
        optionalOptions.add(HEADERS);
        optionalOptions.add(BODY);
        optionalOptions.add(FORMAT);
        return optionalOptions;
    }

// 参数校验,根据实际情况去实现需要校验哪些参数,比如有些参数有格式校验可以在这里实现,没有可以不实现
    private void validateConfigOptions(ReadableConfig config) {
        String url = config.get(URL);
        Optional<String> urlOp = Optional.of(url);
        Preconditions.checkState(urlOp.isPresent(), "Cannot handle such http url: " + url);
        String type = config.get(TYPE);
        if ("POST".equalsIgnoreCase(type)) {
            String body = config.get(BODY);
            Optional<String> bodyOp = Optional.of(body);
            Preconditions.checkState(bodyOp.isPresent(), "Cannot handle such http post body: " + bodyOp);
        }
    }

}

第二步:实现数据源类的具体方法

Dynamic Table Source

根据定义,动态表可以随时间变化。

在读取动态表时,内容可以被认为是:

  • 一个更改日志(有限或无限),所有更改都会持续使用,直到更改日志用完。这由ScanTableSource接口表示。
  • 一个不断变化的或非常大的外部表,其内容通常不会被完全读取,而是在必要时查询单个值。这由LookupTableSource 接口表示。

一个类可以同时实现这两个接口。规划器根据指定的查询决定它们的使用。

扫描表源 ScanTableSource

ScanTableSource在运行时扫描来自外部存储系统的所有行。

扫描的行不必只包含插入,还可以包含更新和删除。因此,表源可用于读取(有限或无限)变更日志。返回的更改日志模式指示计划程序在运行时可以预期的一组更改。

对于常规的批处理场景,源可以发出有限的仅插入行流。

对于常规流式处理方案,源可以发出无限制的仅插入行流。

对于变更数据捕获 (CDC) 方案,源可以发出带有插入、更新和删除行的有界或无界流。

查找表源 LookupTableSource

LookupTableSource在运行时通过一个或多个键查找外部存储系统的行。

ScanTableSource相比,源不必读取整个表,并且可以在必要时从(可能不断变化的)外部表中懒惰地获取单个值。

ScanTableSource相比, LookupTableSource当前仅支持发出仅插入更改。

我们的例子是HTTP类型的数据源,所以采用全表扫描获取的类型去实现。

代码实现类:HttpDynamicTableSource.class

package com.flink.sql.connector.http.table;

import com.flink.sql.connector.http.HttpSourceFunction;
import com.flink.sql.connector.http.HttpSourceInfo;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.format.DecodingFormat;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.SourceFunctionProvider;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;

import java.util.Objects;

public class HttpDynamicTableSource implements ScanTableSource {

    private HttpSourceInfo httpSourceInfo;

    private DecodingFormat<DeserializationSchema<RowData>> decodingFormat;
    private DataType producedDataType;

    public HttpDynamicTableSource() {

    }

    public HttpDynamicTableSource(HttpSourceInfo httpSourceInfo,
                                  DecodingFormat<DeserializationSchema<RowData>> decodingFormat,
                                  DataType producedDataType) {
        this.httpSourceInfo = httpSourceInfo;
        this.decodingFormat = decodingFormat;
        this.producedDataType = producedDataType;
    }

    @Override
    public ChangelogMode getChangelogMode() {
        return decodingFormat.getChangelogMode();
    }

// 最主要的方法,就是实现这个接口方法,在方法里面返回具体的实现逻辑类的构造
    @Override
    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {

        final DeserializationSchema<RowData> deserializer = decodingFormat.createRuntimeDecoder(
                scanContext,
                producedDataType);
        HttpSourceFunction.Builder builder = HttpSourceFunction.builder()
                .setUrl(httpSourceInfo.getUrl()).setBody(httpSourceInfo.getBody())
                .setType(httpSourceInfo.getType()).setDeserializer(deserializer);

        return SourceFunctionProvider.of(builder.build(), true);
    }

    @Override
    public DynamicTableSource copy() {
        return new HttpDynamicTableSource(this.httpSourceInfo, this.decodingFormat, this.producedDataType);
    }

    @Override
    public String asSummaryString() {
        return "HTTP Table Source";
    }

    public boolean equals(Object o) {
        if (this == o) {
            return true;
        } else if (!(o instanceof HttpDynamicTableSource)) {
            return false;
        } else {
            HttpDynamicTableSource that = (HttpDynamicTableSource) o;
            return Objects.equals(this.httpSourceInfo, that.httpSourceInfo)
                    && Objects.equals(this.decodingFormat, that.decodingFormat)
                    && Objects.equals(this.producedDataType, that.producedDataType);
        }
    }

    public int hashCode() {
        return Objects.hash(new Object[]{this.httpSourceInfo, this.decodingFormat, this.producedDataType});
    }
}

代码实现类:HttpSourceFunction.class

package com.flink.sql.connector.http;

import com.alibaba.fastjson.JSONObject;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.table.data.RowData;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class HttpSourceFunction extends RichSourceFunction<RowData> implements ResultTypeQueryable<RowData> {

    private String url;
    private String body;
    private String type;
    private String result;
    private static final String HTTP_POST = "POST";
    private static final String HTTP_GET = "GET";
    private transient boolean hasNext;
    private DeserializationSchema<RowData> deserializer;

    public HttpSourceFunction() {

    }

    public HttpSourceFunction(String url, String body, String type, DeserializationSchema<RowData> deserializer) {
        this.url = url;
        this.body = body;
        this.type = type;
        this.deserializer = deserializer;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        deserializer.open(() -> getRuntimeContext().getMetricGroup());
    }

    @Override
    public void close() throws IOException {

    }

    @Override
    public TypeInformation<RowData> getProducedType() {
        return deserializer.getProducedType();
    }

    public static HttpSourceFunction.Builder builder() {
        return new HttpSourceFunction.Builder();
    }

// 重点关注地方,在run方法里面去实现具体调用HTTP请求获取数据的逻辑。我这里是实现了一个工具类去调用HTTP的各种POST或者GET请求,大家可以自己去实现,功能就是执行请求获取数据,具体代码就不贴了。
    @Override
    public void run(SourceContext<RowData> sourceContext) throws Exception {

        // open and consume from socket
        try {
            String response = "{}";
            if (StringUtils.isNotBlank(type) && HTTP_GET.equals(type.toUpperCase())) {
                response = DtHttpClient.get(url);
            } else {
                response = DtHttpClient.post(url, body);
            }
            JSONObject jsonObject = JSONObject.parseObject(response);
            this.result = jsonObject.getString("response");
            sourceContext.collect(deserializer.deserialize(result.getBytes(StandardCharsets.UTF_8)));
        } catch (Throwable t) {
            t.printStackTrace(); // print and continue
        }
    }

    @Override
    public void cancel() {

    }

    public static class Builder {
        private String url;
        private String body;
        private String type;
        private DeserializationSchema<RowData> deserializer;

        public Builder () {

        }

        public Builder setUrl(String url) {
            this.url = url;
            return this;
        }

        public Builder setBody(String body) {
            this.body = body;
            return this;
        }

        public Builder setType(String type) {
            this.type = type;
            return this;
        }

        public Builder setDeserializer(DeserializationSchema<RowData> deserializer) {
            this.deserializer = deserializer;
            return this;
        }

        public HttpSourceFunction build() {
            if (StringUtils.isBlank(url) || StringUtils.isBlank(body) || StringUtils.isBlank(type)) {
                throw new IllegalArgumentException("params has null");
            }
            return new HttpSourceFunction(this.url, this.body, this.type, this.deserializer);
        }

    }
}

Dynamic Table Sink

根据定义,动态表可以随时间变化。

在编写动态表时,可以始终将内容视为更改日志(有限或无限),其中所有更改都被连续写出,直到更改日志用完为止。返回的更改日志模式 指示接收器在运行时接受的更改集。

对于常规批处理场景,接收器可以仅接受仅插入行并写出有界流。

对于常规的流式处理方案,接收器只能接受仅插入行,并且可以写出无界流。

对于变更数据捕获 (CDC) 场景,接收器可以使用插入、更新和删除行写出有界或无界流。

代码实现类:HttpDynamicTableSink.class

package com.flink.sql.connector.http.table;

import com.flink.sql.connector.http.HttpSinkFunction;
import com.flink.sql.connector.http.HttpSourceInfo;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.format.EncodingFormat;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.SinkFunctionProvider;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;

import java.util.Objects;

public class HttpDynamicTableSink implements DynamicTableSink {

    private HttpSourceInfo httpSourceInfo;
    private EncodingFormat<SerializationSchema<RowData>> encodingFormat;
    private DataType producedDataType;
    private TableSchema tableSchema;

    public HttpDynamicTableSink() {

    }

    public HttpDynamicTableSink(HttpSourceInfo httpSourceInfo,
                                EncodingFormat<SerializationSchema<RowData>> encodingFormat,
                                DataType producedDataType,
                                TableSchema tableSchema) {
        this.httpSourceInfo = httpSourceInfo;
        this.encodingFormat = encodingFormat;
        this.producedDataType = producedDataType;
        this.tableSchema = tableSchema;
    }

    @Override
    public ChangelogMode getChangelogMode(ChangelogMode changelogMode) {
        return changelogMode;
    }

// 重点关注地方,主要通过这个方法去构造输出数据源的实现类
    @Override
    public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
        final SerializationSchema<RowData> deserializer = encodingFormat.createRuntimeEncoder(
                context,
                producedDataType);
        HttpSinkFunction httpSinkFunction =
                HttpSinkFunction.builder().setUrl(httpSourceInfo.getUrl())
                        .setBody(httpSourceInfo.getBody()).setDeserializer(deserializer)
                        .setType(httpSourceInfo.getType()).setFields(tableSchema.getFieldNames())
                        .setDataTypes(tableSchema.getFieldDataTypes()).build();
        return SinkFunctionProvider.of(httpSinkFunction);
    }

    @Override
    public DynamicTableSink copy() {
        return new HttpDynamicTableSink(this.httpSourceInfo, this.encodingFormat,
                this.producedDataType, this.tableSchema);
    }

    @Override
    public String asSummaryString() {
        return "HTTP Table Sink";
    }

    public boolean equals(Object o) {
        if (this == o) {
            return true;
        } else if (!(o instanceof HttpDynamicTableSink)) {
            return false;
        } else {
            HttpDynamicTableSink that = (HttpDynamicTableSink) o;
            return Objects.equals(this.httpSourceInfo, that.httpSourceInfo)
                    && Objects.equals(this.encodingFormat, that.encodingFormat)
                    && Objects.equals(this.producedDataType, that.producedDataType);
        }
    }

    public int hashCode() {
        return Objects.hash(new Object[]{this.httpSourceInfo, this.encodingFormat,
                this.producedDataType, this.tableSchema});
    }
}

代码实现类:HttpSinkFunction.class

package com.flink.sql.connector.http;

import com.flink.sql.util.PluginUtil;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.TimestampType;

import java.io.IOException;
import java.sql.Date;
import java.text.SimpleDateFormat;
import java.time.LocalDate;
import java.util.HashMap;

public class HttpSinkFunction extends RichSinkFunction<RowData> {

    private String url;
    private String body;
    private String type;
    private SerializationSchema<RowData> serializer;
    private String[] fields;
    private DataType[] dataTypes;
    private final SimpleDateFormat dateFormatter = new SimpleDateFormat("yyyy-MM-dd");
    private final SimpleDateFormat dateTimeFormatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    private static final long serialVersionUID = 1L;

    public HttpSinkFunction() {

    }

    public HttpSinkFunction(String url, String body, String type,
                            SerializationSchema<RowData> serializer,
                            String[] fields,
                            DataType[] dataTypes) {
        this.url = url;
        this.body = body;
        this.type = type;
        this.serializer = serializer;
        this.fields = fields;
        this.dataTypes = dataTypes;
    }

// 重点关注,这个invoke方法实现对数据的写出,参数RowData value就是需要输出的数据,这个对象里面具体有多少数据是不确定的,因为默认是流式输出,如果需要考虑性能问题(并且对于实时性没有太高要求),可以自定义实现批量输出,先把这个里面的数据缓存起来,然后当一定时间,或者数据量达到一定阈值的时候再去调研接口输出数据。
    @Override
    public void invoke(RowData value, Context context) throws Exception {
        Object[] objValue =  transform(value);
        HashMap<String, Object> map = new HashMap<>();
        for (int i = 0; i < fields.length; i++) {
            map.put(fields[i], objValue[i]);
        }
        String body = PluginUtil.objectToString(map);
        DtHttpClient.post(url, body);
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        serializer.open(() -> getRuntimeContext().getMetricGroup());
    }

    @Override
    public void close() throws IOException {

    }

    public static HttpSinkFunction.Builder builder() {
        return new HttpSinkFunction.Builder();
    }

    public static class Builder {
        private String url;
        private String body;
        private String type;
        private SerializationSchema<RowData> serializer;
        private String[] fields;
        private DataType[] dataTypes;

        public Builder () {

        }

        public Builder setUrl(String url) {
            this.url = url;
            return this;
        }

        public Builder setFields(String[] fields) {
            this.fields = fields;
            return this;
        }

        public Builder setBody(String body) {
            this.body = body;
            return this;
        }

        public Builder setType(String type) {
            this.type = type;
            return this;
        }

        public Builder setDataTypes(DataType[] dataTypes) {
            this.dataTypes = dataTypes;
            return this;
        }

        public Builder setDeserializer(SerializationSchema<RowData> serializer) {
            this.serializer = serializer;
            return this;
        }

        public HttpSinkFunction build() {
            if (StringUtils.isBlank(url) || StringUtils.isBlank(body) || StringUtils.isBlank(type)) {
                throw new IllegalArgumentException("params has null");
            }
            return new HttpSinkFunction(this.url, this.body, this.type, this.serializer,
                    this.fields, this.dataTypes);
        }

    }


// 这个方法是用来把RowData对象转换为HTTP接口能够识别的JSON对象的,因为默认HTTP接口不能识别这种复杂对象并且转换为我们常用的JSON对象,所以需要我们自己去解析。当然直接把这个对象丢给HTTP也是可以的,那么就需要在接收方去解析RowData对象,但是默认来说,肯定解析为更通用的类型最合适
    public Object[] transform(RowData record) {
        Object[] values = new Object[dataTypes.length];
        int idx = 0;
        int var6 = dataTypes.length;

        for (int i = 0; i < var6; ++i) {
            DataType dataType = dataTypes[i];
            values[idx] = this.typeConvertion(dataType.getLogicalType(), record, idx);
            ++idx;
        }

        return values;
    }

    private Object typeConvertion(LogicalType type, RowData record, int pos) {
        if (record.isNullAt(pos)) {
            return null;
        } else {
            switch (type.getTypeRoot()) {
                case BOOLEAN:
                    return record.getBoolean(pos) ? 1L : 0L;
                case TINYINT:
                    return record.getByte(pos);
                case SMALLINT:
                    return record.getShort(pos);
                case INTEGER:
                    return record.getInt(pos);
                case BIGINT:
                    return record.getLong(pos);
                case FLOAT:
                    return record.getFloat(pos);
                case DOUBLE:
                    return record.getDouble(pos);
                case CHAR:
                case VARCHAR:
                    return record.getString(pos).toString();
                case DATE:
                    return this.dateFormatter.format(Date.valueOf(LocalDate.ofEpochDay(record.getInt(pos))));
                case TIMESTAMP_WITHOUT_TIME_ZONE:
                    int timestampPrecision = ((TimestampType) type).getPrecision();
                    return this.dateTimeFormatter.format(new Date(record.getTimestamp(pos, timestampPrecision)
                            .toTimestamp().getTime()));
                case DECIMAL:
                    int decimalPrecision = ((DecimalType) type).getPrecision();
                    int decimalScale = ((DecimalType) type).getScale();
                    return record.getDecimal(pos, decimalPrecision, decimalScale).toBigDecimal();
                default:
                    throw new UnsupportedOperationException("Unsupported type:" + type);
            }
        }
    }
}

最后在FlinkSQL里面使用:

CREATE TABLE source_table(
    id BIGINT,
    name STRING,
    descs STRING,
    valuess DOUBLE
   )
    WITH(
        'connector' ='http',
        'url' ='http//10.36.248.26:8080/test_flink?username=test',
        'type' ='GET'
        );

CREATE TABLE sink_table(
   id BIGINT,
   name STRING,
   descs STRING,
   valuess DOUBLE )
    WITH(
        'connector' ='http',
        'url' ='http//10.36.248.26:8080/post_flink',
        'type' ='POST');

 源码地址:https://download.csdn.net/download/tianhouquan/87424283

有关Flink 自定义数据源Connector的更多相关文章

  1. ruby - Facter::Util::Uptime:Module 的未定义方法 get_uptime (NoMethodError) - 2

    我正在尝试设置一个puppet节点,但ruby​​gems似乎不正常。如果我通过它自己的二进制文件(/usr/lib/ruby/gems/1.8/gems/facter-1.5.8/bin/facter)在cli上运行facter,它工作正常,但如果我通过由ruby​​gems(/usr/bin/facter)安装的二进制文件,它抛出:/usr/lib/ruby/1.8/facter/uptime.rb:11:undefinedmethod`get_uptime'forFacter::Util::Uptime:Module(NoMethodError)from/usr/lib/ruby

  2. ruby - 解析 RDFa、微数据等的最佳方式是什么,使用统一的模式/词汇(例如 schema.org)存储和显示信息 - 2

    我主要使用Ruby来执行此操作,但到目前为止我的攻击计划如下:使用gemsrdf、rdf-rdfa和rdf-microdata或mida来解析给定任何URI的数据。我认为最好映射到像schema.org这样的统一模式,例如使用这个yaml文件,它试图描述数据词汇表和opengraph到schema.org之间的转换:#SchemaXtoschema.orgconversion#data-vocabularyDV:name:namestreet-address:streetAddressregion:addressRegionlocality:addressLocalityphoto:i

  3. ruby-on-rails - Rails 3.2.1 中 ActionMailer 中的未定义方法 'default_content_type=' - 2

    我在我的项目中添加了一个系统来重置用户密码并通过电子邮件将密码发送给他,以防他忘记密码。昨天它运行良好(当我实现它时)。当我今天尝试启动服务器时,出现以下错误。=>BootingWEBrick=>Rails3.2.1applicationstartingindevelopmentonhttp://0.0.0.0:3000=>Callwith-dtodetach=>Ctrl-CtoshutdownserverExiting/Users/vinayshenoy/.rvm/gems/ruby-1.9.3-p0/gems/actionmailer-3.2.1/lib/action_mailer

  4. ruby-on-rails - form_for 中不在模型中的自定义字段 - 2

    我想向我的Controller传递一个参数,它是一个简单的复选框,但我不知道如何在模型的form_for中引入它,这是我的观点:{:id=>'go_finance'}do|f|%>Transferirde:para:Entrada:"input",:placeholder=>"Quantofoiganho?"%>Saída:"output",:placeholder=>"Quantofoigasto?"%>Nota:我想做一个额外的复选框,但我该怎么做,模型中没有一个对象,而是一个要检查的对象,以便在Controller中创建一个ifelse,如果没有检查,请帮助我,非常感谢,谢谢

  5. ruby - 主要 :Object when running build from sublime 的未定义方法 `require_relative' - 2

    我已经从我的命令行中获得了一切,所以我可以运行rubymyfile并且它可以正常工作。但是当我尝试从sublime中运行它时,我得到了undefinedmethod`require_relative'formain:Object有人知道我的sublime设置中缺少什么吗?我正在使用OSX并安装了rvm。 最佳答案 或者,您可以只使用“require”,它应该可以正常工作。我认为“require_relative”仅适用于ruby​​1.9+ 关于ruby-主要:Objectwhenrun

  6. ruby - 在 Ruby 中有条件地定义函数 - 2

    我有一些代码在几个不同的位置之一运行:作为具有调试输出的命令行工具,作为不接受任何输出的更大程序的一部分,以及在Rails环境中。有时我需要根据代码的位置对代码进行细微的更改,我意识到以下样式似乎可行:print"Testingnestedfunctionsdefined\n"CLI=trueifCLIdeftest_printprint"CommandLineVersion\n"endelsedeftest_printprint"ReleaseVersion\n"endendtest_print()这导致:TestingnestedfunctionsdefinedCommandLin

  7. ruby - 定义方法参数的条件 - 2

    我有一个只接受一个参数的方法:defmy_method(number)end如果使用number调用方法,我该如何引发错误??通常,我如何定义方法参数的条件?比如我想在调用的时候报错:my_method(1) 最佳答案 您可以添加guard在函数的开头,如果参数无效则引发异常。例如:defmy_method(number)failArgumentError,"Inputshouldbegreaterthanorequalto2"ifnumbereputse.messageend#=>Inputshouldbegreaterthano

  8. ruby - 如何在 Grape 中定义哈希数组? - 2

    我使用Ember作为我的前端和GrapeAPI来为我的API提供服务。前端发送类似:{"service"=>{"name"=>"Name","duration"=>"30","user"=>nil,"organization"=>"org","category"=>nil,"description"=>"description","disabled"=>true,"color"=>nil,"availabilities"=>[{"day"=>"Saturday","enabled"=>false,"timeSlots"=>[{"startAt"=>"09:00AM","endAt"=>

  9. ruby - Ruby 有 `Pair` 数据类型吗? - 2

    有时我需要处理键/值数据。我不喜欢使用数组,因为它们在大小上没有限制(很容易不小心添加超过2个项目,而且您最终需要稍后验证大小)。此外,0和1的索引变成了魔数(MagicNumber),并且在传达含义方面做得很差(“当我说0时,我的意思是head...”)。散列也不合适,因为可能会不小心添加额外的条目。我写了下面的类来解决这个问题:classPairattr_accessor:head,:taildefinitialize(h,t)@head,@tail=h,tendend它工作得很好并且解决了问题,但我很想知道:Ruby标准库是否已经带有这样一个类? 最佳

  10. ruby - 获取模块中定义的所有常量的值 - 2

    我想获取模块中定义的所有常量的值:moduleLettersA='apple'.freezeB='boy'.freezeendconstants给了我常量的名字:Letters.constants(false)#=>[:A,:B]如何获取它们的值的数组,即["apple","boy"]? 最佳答案 为了做到这一点,请使用mapLetters.constants(false).map&Letters.method(:const_get)这将返回["a","b"]第二种方式:Letters.constants(false).map{|c

随机推荐