我在hdfs中实时接收文件,它们具有相同的命名约定。id_name_..._timestamp我能否以某种方式在spark(scala)上定义此命名约定,以便稍后我可以将这些与ID进行比较?谢谢 最佳答案 你使用这样的东西:注册udfspark.udf().register("get_only_file_name",(StringfullPath)->{intlastIndex=fullPath.lastIndexOf("/");returnfullPath.substring(lastIndex,fullPath.length-1
我正在构建一个docker镜像,以针对使用YARN的生产Hadoop集群在本地运行zeppelin或spark-shell。编辑:环境是macOS我可以很好地执行作业或spark-shell,但是当我尝试访问YARN上的TrackingURL时,作业正在运行,它会挂起YARN-UI整整10分钟。YARN仍在工作,如果我通过ssh连接,我可以执行yarn命令。如果我不访问SparkUI(直接或通过YARN),什么也不会发生。作业已执行且YARN-UI未挂起。更多信息:本地,在Docker上:Spark2.1.2、Hadoop2.6.0-cdh5.4.3生产环境:Spark2.1.0、Ha
我在EMR上运行Spark2.3,并尝试使用Scala将数据写入HDFS,如下所示:dataframe.write.partitionBy("column1").bucketBy(1,"column2").sortBy("column2").mode("overwrite").format("parquet").option("path","hdfs:///destination/").saveAsTable("result")写入数据并完成任务后,我收到超时错误。错误发生后,我可以在HDFS中看到已完全处理的数据。为什么会出现这个错误?有什么意义吗?看起来主节点正在尝试与另一个IP(
我正在使用ApacheSparkv2.3.1并尝试在处理后将数据卸载到AWSS3。类似的东西:data.write().parquet("s3a://"+bucketName+"/"+location);配置似乎没问题:Stringregion=System.getenv("AWS_REGION");StringaccessKeyId=System.getenv("AWS_ACCESS_KEY_ID");StringsecretAccessKey=System.getenv("AWS_SECRET_ACCESS_KEY");spark.sparkContext().hadoopConf
我通过使用map函数将RDD转换为DF创建了dataframe。当我尝试显示记录时,它给我exception。下面是我的代码://Createdcaseclasscaseclassemployees(emp_id:java.lang.Long,emp_name:String,job_name:String,manager_id:java.lang.Long,hire_date:String,salary:java.lang.Double,commision:java.lang.Double,dep_id:java.lang.Long);//CreatedDFvalemployeesDf
我在我的窗口(这是我的本地)中配置了Hadoop和spark,我在一个虚拟机(同一台机器)中设置了cloudera,它里面有hbase。我正在尝试使用sparkstream提取数据并将其放入vm中的hbase中。这有可能吗?我的尝试:打包hbaseimportorg.apache.hadoop.hbase.HBaseConfigurationimportorg.apache.hadoop.hbase.client.{ConnectionFactory,HBaseAdmin,HTable,Put,Get}objectConnect{defmain(args:Array[String]){
需要为Spark流代码实现测试。此特定代码使用thislibrary在单独的jvm中运行上述应用程序的输入是hdfs。我已经像这样启动了MiniDFSClusterexample(javaversion)但我不认为它会起作用,因为它们在两个不同的JVM中。如果我要成功测试spark流代码,模拟hdfs输入的最佳方法是什么。我大致解释了上面的场景。真正的要求是实现一个成功的cucumber测试。 最佳答案 您可以在本地模式下运行Spark并指定诸如“file:///foo/bar”之类的路径,而不是尝试模拟hdfs-然后将使用本地文件
我是新手。我正在尝试运行将数据加载到elasticsearch的spark作业。我用我的代码构建了一个fatjar,并在spark-submit期间使用了它。spark-submit\--classCLASS_NAME\--masteryarn\--deploy-modecluster\--num-executors20\--executor-cores5\--executor-memory32G\--jarsEXTERNAL_JAR_FILES\PATH_TO_FAT_JARelasticsearch-hadoop依赖的maven依赖为:org.elasticsearchelasti
我们正在将每小时JSON数据接收到HDFS中。数据大小约为每小时5-6GB。当在最终表中找到匹配记录时,然后更新(或)删除如果记录在最终数据集中不匹配,则插入记录。我们已经为USE案例尝试了Hive合并选项。这需要一个多小时来处理Hive中的合并操作。有没有其他替代方法来解决用例。所以基本上每天我们都将150GB的数据添加到配置单元中,每隔一天我们必须扫描150Gb的数据以查找是否需要更新/插入对大型数据集执行Upserts(Hadoop中的更新和插入)的最佳方法是什么。hive或HBase或尼菲。什么是流量。 最佳答案 我们正在使
我正在从宽字符串中选择列,其偏移量如下所示df2=df.select(substring(col("a"),4,6).as("c")).cast(IntegerType)但是我必须从字符串中提取1000列,如果我可以提供诸如列名、数据类型、宽度、起始位置和结束位置等详细信息,那么如何使用jsonsparkstruct模式生成select语句。另外,我不得不将一些列转换为intergertype或longtype,但是我观察到这些字段被像111111111将在转换为integertype时转换为1 最佳答案 如果可以使用configf