所以我在DataStream上运行一个映射函数,在映射函数中我想连接2个单独的数据集。只是想知道这在Flink中是否可行。我知道map函数本身作为单独分区的单独任务运行,所以想知道map函数内是否允许分布式连接? 最佳答案 好吧,事实证明你不能,因为连接数据集发生在与流处理(发生在StreamExecutionContext上)不同的上下文(ExecutionContext)上,并且Flink不允许在彼此内部具有不同执行上下文的操作。java.lang.IllegalArgumentException:Thetwoinputshav
在安全的Hadoop集群中,我尝试从YARN访问FlinkAM页面和日志,并看到以下错误:用户%remoteUser无权查看应用程序%appID看来是因为Flink这边不支持YARN中的ACL。代码是如何工作的消息来自使用ApplicationACLsManager类的hadoop/yarn/server/AppBlock类。此类执行检查并引用在RMAppManager中设置的应用程序信息:this.applicationACLsManager.addApplication(applicationId,submissionContext.getAMContainerSpec().get
当我在map中使用richfatMapFunction从hbase读取时,出现序列化错误。我想要做的是,如果数据流等于从hbase读取的特定字符串,则忽略。下面是示例程序和我遇到的错误。packagecom.abb.Flinktestimportjava.text.SimpleDateFormatimportjava.util.Propertiesimportscala.collection.concurrent.TrieMapimportorg.apache.flink.addons.hbase.TableInputFormatimportorg.apache.flink.api.c
我有一个独立的Flink安装,我想在其上运行一个将数据写入HDFS安装的流作业。HDFS安装是Cloudera部署的一部分,需要Kerberos身份验证才能读取和写入HDFS。由于我没有找到关于如何使Flink与受Kerberos保护的HDFS连接的文档,因此我不得不对该过程进行一些有根据的猜测。这是我到目前为止所做的:我为我的用户创建了一个key表文件。在我的Flink作业中,我添加了以下代码:UserGroupInformation.loginUserFromKeytab("myusername","/path/to/keytab");最后,我使用TextOutputFormat将
我正在尝试使用EMR和Flink将一些输出写入S3。我正在使用Scala2.11.7、Flink1.3.2和EMR5.11。但是,我收到以下错误:java.lang.NoSuchMethodError:org.apache.hadoop.conf.Configuration.addResource(Lorg/apache/hadoop/conf/Configuration;)Vatcom.amazon.ws.emr.hadoop.fs.EmrFileSystem.initialize(EmrFileSystem.java:93)atorg.apache.flink.runtime.fs
使用Flink1.7.1为kubernetes上的单个作业集群构建它flink无法加载核心站点xml尽管在类路径上,导致忽略配置,但是,如果我将ENV变量AWS_SECRET_ACCESS_KEYAWS_ACCESS_KEY_ID工作找到它,但如果我依赖于core-site.xml,那么没有环境变量它就永远无法工作。我目前正在复制core-site.xml,因为它显示在Dockerfile中,并且正如文档所说,将HADOOP_CONF_DIR作为指向它的环境变量。它仍然不加载它,导致NoCredentialsProvider。异常(exception)是:Causedby:org.ap
我有一个小型测试项目,用于将数据推送到S3存储桶。但是,看起来我没有读取core-site.xml文件,因为我收到错误java.io.IOException:Nofilesystemfoundwithschemes3a。如何正确读取core-site.xml文件并将数据推送到S3?这是代码:publicclassS3Sink{publicstaticvoidmain(String[]args)throwsException{Mapconfigs=ConfigUtils.loadConfigs(“path/to/config.yaml");finalParameterToolparame
我正在尝试使用flink将csv文件编写为Parquet。我正在使用以下代码并收到错误。valparquetFormat=newHadoopOutputFormat[Void,String](newAvroParquetOutputFormat,job)FileOutputFormat.setOutputPath(job,newPath(outputPath))我收到以下构建错误。有人可以帮忙吗?typemismatch;found:parquet.avro.AvroParquetOutputFormatrequired:org.apache.hadoop.mapreduce.Outp
如果我们必须在流式应用程序中读取和写入HBASE,我们该怎么做。我们通过open方法打开连接进行写入,我们如何打开连接进行读取。objecttest{if(args.length!=11){//printargsSystem.exit(1)}valArray()=argsprintln("ParametersPassed"+...);valenv=StreamExecutionEnvironment.getExecutionEnvironmentvalproperties=newProperties()properties.setProperty("bootstrap.servers"
我使用Hadoop2.9.2启动了Dataproc集群,下载了Flink1.7.2并尝试使用以下命令启动它:./bin/yarn-session.sh-n2失败并显示以下错误消息:SettingHADOOP_CONF_DIR=/etc/hadoop/confbecausenoHADOOP_CONF_DIRwasset.2019-02-1512:56:05,679INFOorg.apache.flink.configuration.GlobalConfiguration-Loadingconfigurationproperty:jobmanager.rpc.address,localho