当我运行这个命令时hduser@ubuntu:/usr/local/flume$bin/flume-ngagent--conf./conf/-fconf/twitterflume-agent.conf-Dflume.root.logger=DEBUG,console-nTwitterAgent水槽正在启动,但一段时间后它抛出异常,不允许水槽下载。我收到以下错误:2015-10-3110:18:32,152(conf-file-poller-0)[INFO-org.apache.flume.sink.DefaultSinkFactory.create(DefaultSinkFactory
如何根据Kafka消息中的消息类型使用水槽写入自定义hdfs目录?说kafka消息:{"type":"A","data":"blah"}在类型字段中有"A"应该写入/data/A,message:{"type":"B","data":"blah"}在类型字段中有"B"应该写入/data/B等 最佳答案 我认为您需要自定义水槽。ApacheFlumecustomsink 关于hadoop-根据Kafka的消息数据写入自定义HDFS目录->Flume->hdfs摄取,我们在StackOver
我正在尝试使用flume假脱机目录将数据摄取到HDFS(SpoolDir>MemoryChannel>HDFS)。我正在使用ClouderaHadoop5.4.2。(Hadoop2.6.0,Flume1.5.0)。它适用于较小的文件,但不适用于较大的文件。请在下面找到我的测试场景:大小为KB到50-60MBytes的文件,处理无问题。大于50-60MB的文件,它将大约50MB写入HDFS,然后我发现flumeagent意外退出。水槽日志中没有错误消息。我发现它试图多次创建“.tmp”文件(HDFS),并且每次在意外退出之前写入几兆字节(有时2MB,有时45MB)。一段时间后,最后尝试的
我正在编写自定义水槽源和接收器,目前我没有看到重新使用现有的flume-ng代码的最佳方式。例如,我无法扩展AvroSource类并轻松自定义一些功能,而无需必须复制/粘贴大量现有代码。是不是故意这样还是我在这里做错了什么? 最佳答案 如果您想要一个接受Avro消息的源,您只需要自定义AvroSource。这将是编写自定义源和接收器的一个令人惊讶的理由。要编写自定义接收器,请遵循customsinkdocs.自定义源在同一文档中。顺便说一句,为什么您需要自定义源和接收器?最后,1.4.0刚刚发布-如果可能,您应该使用它。
我只是想知道是否有人遇到过需要将数据从excel导入或读取到Hadoop的场景?有没有FlumeExcel之类的源码?顺便说一句,我知道我可以将excel文件转换为csv然后处理它。真的只是想在这里进一步探索水槽源。 最佳答案 SpoolingDirectorySource可以配置为从Excel文件(或任何其他格式)读取。如文档所述,此源可以配置为EventDeserializer-实现将文件解析为事件的逻辑的类。我不认为有人已经实现了这样的反序列化器,但使用ApachePOI库似乎很容易完成任务。
如何使用Flume将一组csv文件从我的本地目录复制到HDFS?我尝试使用假脱机目录作为我的来源,但未能复制。然后我使用以下水槽配置来获得我的结果:agent1.sources=tailagent1.channels=MemoryChannel-2agent1.sinks=HDFSagent1.sources.tail.type=execagent1.sources.tail.command=tail-F/home/cloudera/runs/*agent1.sources.tail.channels=MemoryChannel-2agent1.sinks.HDFS.channel=M
我有一个为Flume代码编写的拦截器如下:publicEventintercept(Eventevent){byte[]xmlstr=event.getBody();InputStreaminstr=newByteArrayInputStream(xmlstr);//TransformerFactoryfactory=TransformerFactory.newInstance(TRANSFORMER_FACTORY_CLASS,TRANSFORMER_FACTORY_CLASS.getClass().getClassLoader());TransformerFactoryfactor
我正在使用以下代码将源目录中的文件写入hdfs。#Initializeagent'ssource,channelandsinkagent.sources=testagent.channels=memoryChannelagent.sinks=flumeHDFS#Settingthesourcetospooldirectorywherethefileexistsagent.sources.test.type=spooldiragent.sources.test.spoolDir=/Data#Settingthechanneltomemoryagent.channels.memoryCha
我有这样的GIS数据-'111,2011-02-0120:30:30,116.50443,40.00951''111,2011-02-0120:30:31,116.50443,40.00951''112,2011-02-0120:30:30,116.58197,40.06665''112,2011-02-0120:30:31,116.58197,40.06665'第一列是driver_id,第二个是timestamp,第三个是longitude&第四个是latitude.我正在使用Flume摄取此类数据,我的接收器是HBase(类型-AsyncHBaseSink)。默认情况下,HBas
我需要将数据从hadoop自动加载到hive,但我不想设置其他服务来执行此操作。我已经使用flume来收集我的日志了……那我该怎么办呢?flume是否可以执行命令(查询hive就像LOAD.....)? 最佳答案 抱歉,我来晚了一点,但实际上我已经整理了一个非常完整的示例,说明如何执行此操作并公开了所有细节。也许,它会帮助别人http://www.lopakalogic.com/articles/hadoop-articles/log-files-flume-hive/祝你好运! 关于h