所以,我是第一次使用NiFi。我正在尝试使用它来调用API,然后将数据通过管道传输到HDFS(HortonworksSandbox2.4)。我目前只使用2个处理器:GetHTTP和PutHDFS。我似乎两个处理器都配置正常...它们运行,但我无法找到当我通过Ambari进入Hadoop时创建的输出文件...我已将输出目录设置为/user/,但什么也没有出现。但是,我在PutHDFS处理器上收到一条警告消息,建议:WARNINGPutHDFS[...]penalizingStandardFlowFileRecord[...]androutingtofailurebecausefilewi
我设置了以下流程来读取json数据并使用convertRecord处理器将其转换为csv。但是,输出流文件仅填充了单个记录(我假设只有第一条记录)而不是所有记录。谁能帮忙提供正确的配置?源json数据:{"creation_Date":"2018-08-19","Hour_of_day":7,"log_count":2136}{"creation_Date":"2018-08-19","Hour_of_day":17,"log_count":606}{"creation_Date":"2018-08-19","Hour_of_day":14,"log_count":1328}{"cre
我有HDFS集群,其中包含Active和StanbyNamenodes。有时,当集群重新启动时,Namenode会交换它们的角色-Standby变为Active,反之亦然。然后我让NiFi流和PutParquet处理器将一些文件写入这个HDFS集群。处理器的目录属性配置为“hdfs://${namenode}/some/path”,其中${namenode}变量值类似于“first.namenode.host.com:8020”。现在,当集群重新启动并且实际的Namenode更改为“second.namenode.host.com:8020”时,NiFi中的配置没有更新,处理器仍然尝试
我正在尝试从SFTP服务器加载包含225GB(文件数量约为1,75,000)的庞大数据,并将数据复制到HDFS。为了实现上述场景,我们使用了2个处理器。GetSFTP(从SFTP服务器获取文件)已配置处理器->递归搜索=true;使用自然排序=true;远程轮询批量大小=5000;并发任务=32.PutHDFS(将数据推送到HDFS)已配置的处理器->并发任务=3;冲突解决策略=replace;Hadoop配置资源;目录但是一段时间后,数据复制停止并且它的大小在HDFS中没有更新。当我在GetSFTP中将RemotePollBatchSize配置设置为5000->推送到HDFS的总数据
我需要将一堆json文件流式传输到Nifi,然后它会转到HDFS。Nifi需要查看json文件中的创建日期(UNIX格式),然后将其路由到相应的HDFS文件夹。到目前为止,我的处理器设置如下:使用Kafka->RouteOnContent(使用正则表达式^"creationDate":\"[0-9]{4}-[0-9]{2}-[0-9]{2}$)->PutHDFS每天都有一个HDFS文件夹,如“2019-01-28”、“2019-01-29”、“2019-01-30”等。但是,“PutHDFS”处理器只会输出到一个单个目录,我显然不想拥有365个处理器。据我所知,Nifi没有办法动态创建
我注意到nifi中没有可用的处理器可以直接将文件存储到hive中。是否有任何处理器组合可以帮助解决这个问题,或者是否有任何可用的模板可以做到这一点?谢谢。 最佳答案 在下一个版本的NiFi(0.7.0)中,将有一些初始处理器用于与Hive交互。其中之一是PutHiveQL,它可以将带有HiveQL语句的流文件作为内容,并使用HiveJDBC驱动程序执行它。csv或json的可能流程是首先将数据解析为构造插入语句所需的值,可能使用ExtractText或EvaluateJsonPath,然后使用ReplaceText和表达式语言构造插
我正在尝试在ApacheNiFi中开发一个自定义处理器,它将orc文件直接写入远程hadoop集群。为了编写它们,我正在使用orccoreapi.我尝试在本地FS上写入文件,它们没问题:hive,这是它们的“最终目的地”,读取它们没有问题。问题是,在尝试创建Writer对象时,我得到了org.apache.hadoop.hdfs.DistributedFileSystem类的NoClassDefFoundError。这是使用的代码:Configurationconf=newConfiguration();conf.addResource(newPath(hadoopConfigurat
根据NiFi'shomepage,它“支持数据路由、转换和系统中介逻辑的强大且可扩展的有向图”。过去几个月我一直在使用NiFi,不禁想知道为什么不将它也用于调度批处理。假设我有一个用例,其中数据流入Hadoop,由一系列Hive\MapReduce作业处理,然后导出到一些外部NoSql数据库以供某些系统使用。使用NiFi来摄取数据并将数据流入Hadoop是NiFi的一个用例。但是,使用Nifi来安排Hadoop上的作业(“Oozie-like”)是一个我没有遇到过其他人实现的用例,并且由于它似乎完全有可能实现,我试图了解是否有原因不要这样做。在NiFi上完成这一切的好处是,人们将在一个
我需要将巨大的CSV文件从Kafka主题读取到Cassandra。我配置了ApacheNifi实现相同。流量:用户无法控制Nifi设置。他只指定了CSV所在的URL。Web应用程序将URL写入kafka主题。Nifi获取文件并插入到Cassandra中。我如何知道Nifi已将CSV文件中的所有行插入到Cassandra中?我需要让用户知道插入已完成。如有任何帮助,我们将不胜感激。 最佳答案 我找到了解决方案。使用MergeContent处理器,所有具有相同“fragment.identifier”值的FlowFiles将被组合在一起
我在MSSQL中有几个表,这些表每秒更新一次,查询或多或少看起来像这样SELECTG_ID,UpdateTime,ID,Name,T_NAMEFROMTABLE1AStable1INNERJOINTABLE2AStable2ONtable1.IP=table2.IDWHEREtable2.UpdateTime>=${lastUpdateTime}ANDtable2.G_ID>${lastID}假设选择内连接查询结果为5条记录,如下所示。如果查询是第一次运行${lastUpdateTime}和${lastG_ID}设置为0,它将返回5条以下的记录。处理记录后,查询将存储max(G_ID)