有没有类似Mahout'sXmlInputFormat的东西但是对于Flink呢?我有一个很大的XML文件,我想提取特定的元素。在我的例子中,它是一个维基百科转储,我需要得到所有标签。即如果我有一个文件............我想获取所有3条记录...在映射器中使用。理想情况下它应该是有效的XML,xpath查询/mediawiki/page的东西会回来的。 最佳答案 Mahout的XmlInputFormat扩展了Hadoop的TextInputFormat。Flink具有HadoopInputFormats的通用包装器,因此也应
我正在尝试以本地模式在ApacheFlink中执行示例程序。importorg.apache.flink.api.common.functions.FlatMapFunction;importorg.apache.flink.api.java.DataSet;importorg.apache.flink.api.java.ExecutionEnvironment;importorg.apache.flink.api.java.tuple.Tuple2;importorg.apache.flink.util.Collector;publicclassWordCountExample{pu
官方FlinkRestAPI文档RESTAPI|ApacheFlinkFlink接口调用地址 用户可以通过flink提供的RestAPI管理应用。RestAPI可供用户或脚本直接访问,它可以对外公开有关Flink集群和应用的信息。flink使用Web服务器来同时支持RestAPI和WebUI,该服务器会作为Dispatcher进程的一部分(Dispatcher启动JobManager)来运行。默认情况下二者都会使用8081端口。我们可以通过./conf/flink-conf.yaml来设置web服务器的ip和端口:rest.address,rest.port,同时为了避免我们提交的项目在
准备工作在这一步需要配置Oracle。主要包含。开启Archivelog开启数据库和数据表的supplementallog创建CDC用户并赋予权限注意:不要使用Oracle的SYS和SYSTEM用户做为CDC用户。因为这两个用户能够捕获到大量Oracle数据库内部的变更信息,对于业务数据来说是不必要的。Debezium会过滤掉这两个用户捕获到的变更内容。下面开始配置步骤。在安装Oracle的机器上执行:su-oraclesqlplus/assysdba进入Sqlplus。然后开启Archivelog。altersystemsetdb_recovery_file_dest_size=10G;al
我使用ApacheAmbari在4节点上安装了ApacheHadoop。我用ApacheFlink写了一个简单的作业。我想将此作业提交到我的YARN集群。但是Flink需要在本地机器上YARN配置文件(core-site.xml、yarn-site.xml等)。所以如果我没有误解的话,有两种手动方式在ResourceManager节点上启动flink作业(以查找配置文件)从ResourceManager下载配置文件到本地。我觉得,这两种方式都不是很好。如何将作业提交到远程YARN集群。有合适的方法吗? 最佳答案 在Hadoop/YA
我正在使用Flink1.3.2和hbaseTableInputFormat来自flink-connectors(flink-hbase_2.11),使用DataSetAPI。我有一个HBase表,其中行键的结构如下:|RowKey|data||0-someuniqid|data||0-someuniqid|data||2-someuniqid|data||2-someuniqid|data||4-someuniqid|data||5-someuniqid|data||5-someuniqid|data||7-someuniqid|data||8-someuniqid|data|表的前缀
我有一个在AWSEMR中运行的具有高并行度(400)的Flink应用程序。它使用BucketingSink(使用RocksDb后端进行检查点)获取Kafka并汇入S3。目的地使用“s3a://”前缀定义。Flink作业是一个连续运行的流式应用程序。在任何给定时间,所有工作人员加起来可能会生成/写入400个文件(由于400并行度)。几天后,其中一名worker将失败,但出现异常:org.apache.hadoop.fs.s3a.AWSS3IOException:copyFile(bucket/2018-09-01/05/_file-10-1.gz.in-progress,bucket/2
我已经按照文档指标部分中的说明设置了PrometheusPushGatewayReporter。我可以看到来自推送网关UI中公开的flinkjobmanager和taskmanagers的指标,以及它们已被Prometheus集群正确抓取。问题是,即使我明确设置了deleteOnJobShutdown配置选项,当通过flinkcli工具取消作业时,也只会删除jobmanager的指标。有没有办法同时删除陈旧的任务管理器指标?我的配置如下:metrics.reporter.promgateway.class:org.apache.flink.metrics.prometheus.Prom
我正在使用Flinkbucketingsink从Kafka到HDFS。Flink的版本是1.4.2。我发现每次重新启Action业时都会丢失一些数据,即使有保存点也是如此。我发现如果我设置编写器SequenceFile.CompressionType.RECORD而不是SequenceFile.CompressionType.BLOCK就可以解决这个问题。看来Flink在保存checkpoint的时候,有效长度和实际长度不一样,应该包括压缩数据。但如果我们由于磁盘使用而无法使用CompressionType.BLOCK,则可能会出现问题。如何在重新启Action业时使用block压缩来
我在maven/java项目中使用flink,需要在创建的jar中包含我的内部配置。因此,我在我的pom文件中添加了以下内容。这包括我在jar中的所有yml配置(位于src/main/resources文件夹中),我将在执行时将其名称作为参数传递。src/main/resources**/*.ymlorg.apache.maven.pluginsmaven-shade-plugin2.4.3packageshade*:*META-INF/*.SFMETA-INF/*.DSAMETA-INF/*.RSA${project.artifactId}-${project.version}tru