草庐IT

持久化Spark

全部标签

java - 当队列持久时,HornetQ Producer 变慢

我已经尝试在horntQ中使用PersistentQueue。我做了两个单独的例子(生产者,消费者)。我的消费者运行良好,但生产者花费太多时间来完成发送消息。我分别跑过和一起跑过。可能是什么问题呢?我的代码是:publicclassHornetProducerimplementsRunnable{Contextic=null;ConnectionFactorycf=null;Connectionconnection=null;Queuequeue=null;Sessionsession=null;MessageProducerpublisher=null;TextMessagemess

java - 将 spark 数据帧写入 Parquet 格式时出现内存不足错误

我正在尝试从数据库中查询数据,对其进行一些转换并将新数据以Parquet格式保存在hdfs上。由于数据库查询返回大量行,我正在分批获取数据并对每个传入批处理运行上述过程。更新2:批处理逻辑是:importscala.collection.JavaConverters._importorg.apache.spark.SparkContextimportorg.apache.spark.sql.SQLContextimportorg.apache.spark.sql.Rowimportorg.apache.spark.sql.types.{StructType,StructField,St

java - Apache Spark 如何将函数发送到引擎盖下的其他机器

我开始使用Pyspark进行一些数据处理。我可以做一些像这样的事情对我来说很有趣rdd.map(lambdax:(x['somekey'],1)).reduceByKey(lambdax,y:x+y).count()它会将这些函数中的逻辑发送到可能多台机器上以并行执行。现在,如果我有Java背景,如果我想将包含某些方法的对象发送到另一台机器,那台机器需要知道通过网络流式传输的对象的类定义。最近java有了函数式接口(interface)的想法,它将在编译时为我创建该接口(interface)的实现(即MyInterfaceimpl=()->System.out.println("Stu

java - Spark Kryo 注册数组类

我正在尝试用数组注册一个类(激活了Kryo的SparkJava),日志显示一条​​明确的消息:Classisnotregistered:org.apache.spark.sql.execution.datasources.InMemoryFileIndex$SerializableBlockLocation[]我已经写了几个组合,但这些都不起作用:kryo.register(Class.forName("org.apache.spark.sql.execution.datasources.InMemoryFileIndex$SerializableBlockLocation[]"));

java - HashMap 作为 Spark Streaming 中的广播变量?

我有一些数据需要在sparkstreaming中分类。分类键值在程序开始时加载到HashMap中。因此,每个传入的数据包都需要与这些key进行比较并进行相应标记。我意识到spark有称为广播变量和累加器的变量来分发对象。教程中的示例使用简单的变量,例如etc。如何使用HashMap在所有sparkworker上共享我的HashMap。或者,是否有更好的方法来执行此操作?我正在用Java编写我的SparkStreaming应用程序。 最佳答案 在spark中,您可以用相同的方式广播任何可序列化的对象。这是最好的方法,因为您只需将数据发

java - "spark.memory.fraction"好像没有作用

在Spark中,当我从一个函数中从HDFS读取一个大约1GB的字符串时,我遇到了java.lang.OutOfMemoryError:Javaheapspace错误。我使用的执行程序内存是6GB。为了增加用户内存,我什至将spark.memory.fraction减少到0.3,但我仍然遇到同样的错误。似乎降低该值没有效果。我正在使用Spark1.6.1并使用Spark1.6核心库进行编译。我在这里做错了什么吗? 最佳答案 请参阅SparkConfSparkExecutorOOM:如何在Spark上设置内存参数一旦应用程序运行,您将看

java - 比较 Spark 中的两个数据帧(性能)

我需要比较我的spark应用程序中的两个数据帧。我浏览了以下帖子。HowtoobtainthedifferencebetweentwoDataFrames?但是,我不明白为什么最佳答案中的方法df1.unionAll(df2).except(df1.intersect(df2))比问题中的那个好df1.except(df2).union(df2.except(df1))谁能解释一下?据我了解,后者适用于两个较小的数据集,而前者适用于大型数据集。是因为后者将不同作为联合的一部分吗?即使那样,如果两个数据框有相同记录的可能性更大,那么在后一种情况下我们处理的是一个小数据集。

java - 令人困惑的Tomcat持久 session 内存配置文件

与任何内存管理问题一样,这是一个很长的故事,所以请牢牢捕获。我们的应用程序遇到了一些内存管理问题,因此,我一直在尝试对该应用程序进行概要分析,以了解问题出在哪里。我今天早些时候看到了这个线程:TomcatSessionEvictiontoAvoidOutOfMemoryError……似乎跟我在探查器中看到的一样。基本上,如果我用Jmeter吸引了一群用户使用该应用程序,它将长时间保留在堆内存上,最终直到session开始过期为止。但是,与该线程中的发布者不同,我拥有源代码,并且可以尝试使用Tomcat来实现持久状态session,这是我今天一直在尝试的工作,但取得的成功有限。我认为这是

java - 实体不通过 TomEE 上的 Spring(CrudRepository) 持久化

我在尝试使用SpringsCrudRepository接口(interface)将实体持久保存到PostgreSQL数据库时遇到问题。我在TomEE上使用正确的值设置它和调整Spring本身的配置时遇到了很多问题。我曾短暂地尝试过使用hibernate,但问题变得更糟,所以我切换回与TomEE捆绑在一起的OpenJpa。我对纯JavaEE有一点经验,正在创建这个应用程序来学习Spring,我的一个friend需要一个Spring应用程序来部署在基于TomEE的ApplicationServer上,因此这两个是必需的。我附加的代码量可能有点过头了,但我宁愿让您看看那里有什么,这样您就可以

java - Apache Spark - 内存异常错误 - IntelliJ 设置

当我尝试运行使用ApacheSpark的测试时,我遇到了以下异常:Exceptionencounteredwheninvokingrunonanestedsuite-Systemmemory259522560mustbeatleast4.718592E8.Pleaseusealargerheapsize.java.lang.IllegalArgumentException:Systemmemory259522560mustbeatleast4.718592E8.Pleaseusealargerheapsize.我可以通过更改配置中的vm选项来绕过错误,使其具有:-Xms128m-Xmx