草庐IT

scala-cats

全部标签

scala - 来自值列表的循环数,在 Spark 和 Scala 中是正数和负数的混合

有一个包含值列表的RDD,这些值是正值和负值的混合。需要根据此数据计算周期数。例如,valrange=List(sampleRange(2020,2030,2040,2050,-1000,-1010,-1020,起点,-1030,2040,-1020,2050,2040,2020,终点,-1060,-1030,-1010)上面列表中每个值之间的间隔是1秒。即,2020和2030以1秒为间隔记录,依此类推。它从负转正并保持正>=2秒的次数。如果>=2秒,则为一个循环。周期数:逻辑示例1:列表(1,2,3,4,5,6,-15,-66)循环次数为1。原因:当我们从列表的第一个元素移动到第6个

scala - NLineInputFormat 在 Spark 中不起作用

我想要的基本上是让每个数据元素由10行组成。但是,使用以下代码,每个元素仍然是一行。我在这里犯了什么错误?valconf=newSparkConf().setAppName("MyApp")conf.set("spark.serializer","org.apache.spark.serializer.KryoSerializer")conf.registerKryoClasses(Array[Class[_]](classOf[NLineInputFormat],classOf[LongWritable],classOf[Text]))valsc=newSparkContext(co

scala - Scala Spark 属性的最佳实践

我正在使用HadoopSpark开始一个项目。我将使用Scala进行开发。我正在从头开始创建项目,我想知道如何处理属性。我来自Java背景,在那里我使用.properties文件并在开始时加载它们。然后我有一个类用于访问我的属性的不同值。这在Scala中也是一种好的做法吗?尝试谷歌搜索,但没有任何与此相关的内容。 最佳答案 可以像Java一样在scala中读取properties文件importscala.io.Source.fromUrlvalreader=fromURL(getClass.getResource("conf/fp

scala - 如何从spark写入文件到cassandra

我是spark和Cassandra的新手。我使用此代码,但它给我错误。valdfprev=df.select(col="se","hu")vala=dfprev.select("se")valb=dfprev.select("hu")valcollection=sc.parallelize(Seq(a,b))collection.saveToCassandra("keyspace","table",SomeColumns("se","hu"))当我在savetocassandra上输入这段代码时,出现错误,错误是:java.lang.IllegalArgumentException:M

scala - 如何在hadoop中实现OR join(scalding/cascading)

只需将连接字段作为缩减键发送,就可以很容易地通过单键连接数据集。但是通过多个键连接记录(其中至少一个键应该相同)对我来说并不那么容易。示例我有日志,我想按用户参数对它们进行分组,我想通过(ipAddress,sessionId,visitorCockies)加入它们如果log1.ip==log2.ipORlog1.session=log2.sessionORlog1.cockie=log2.coockie,那么log1应该与log2分组。也许可以创建复合键或一些概率方法,如minHash...这可能吗? 最佳答案 问题是MapRed

scala - 计算 Hadoop 上偶数/奇数对的总和?

我想为Hadoop创建一个并行scanLeft(计算关联运算符的前缀和)函数(特别是烫伤;请参阅下文了解如何完成)。给定一个hdfs文件中的数字序列(每行一个),我想用连续偶数/奇数对的总和计算一个新序列。例如:输入序列:0,1,2,3,4,5,6,7,8,9,10输出序列:0+1、2+3、4+5、6+7、8+9、10即1,5,9,13,17,10我认为,为了做到这一点,我需要为Hadoop编写一个InputFormat和InputSplits类,但我不知道该怎么做。参见本节3.3here.以下是Scala中的示例算法://forsimplicityassumeinputlengthi

scala - Spark 作业未在本地并行化(使用本地文件系统中的 Parquet + Avro)

编辑2通过将RDD重新分区为8个分区间接解决了问题。遇到avro对象不是“javaserialisable”的障碍,找到了一个片段heretodelegateavroserialisationtokryo.原来的问题依然存在。编辑1:删除了map函数中的局部变量引用我正在编写一个驱动程序,使用parquet和avroforio/schema在spark上运行计算繁重的作业。我似乎无法得到Spark来使用我所有的核心。我究竟做错了什么?是因为我已将键设置为null吗?我刚刚开始了解hadoop如何组织文件。据我所知,因为我的文件有1GB的原始数据,我应该期望看到与默认block和页面大小

scala - 如何在我的 Spark 应用程序中使用 OpenHashSet?

根据private[spark],我知道OpenHashSet在spark中是私有(private)的,但是当数据非常大时,通常我们需要更快的HashMap或HashSet实现。我如何在自己的代码中使用这些数据结构?或者是还有其他选择吗?谢谢! 最佳答案 好吧,它是开源的,因此您可以fork/复制它,重命名包以避免冲突,并删除private[spark]限制。但是,当然这取决于Paul提到的您的具体用例。参见thisquestion法律问题。 关于scala-如何在我的Spark应用程序

scala - 使用 sc.textFile() 加载本地文件以激发

问题如何使用sc.textFile从本地文件系统加载文件到Spark?我需要更改任何-env变量吗?此外,当我在未安装Hadoop的Windows上尝试相同操作时,我遇到了同样的错误。代码>valinputFile=sc.textFile("file///C:/Users/swaapnika/Desktop/todolist")/1722:28:18INFOMemoryStore:ensureFreeSpace(63280)calledwithcurMem=0,maxMem=278019440/1722:28:18INFOMemoryStore:Blockbroadcast_0stor

scala - Spark 集群无法从远程 Scala 应用程序分配资源

因此,我一直在尝试着手运行Spark-scala。我写了一个简单的测试程序,它只是稍微扩展了SparkPi示例:defmain(args:Array[String]):Unit={test()}defcalcPi(spark:SparkContext,args:Array[String],numSlices:Long):Array[Double]={valstart=System.nanoTime()valslices=if(args.length>0)args(0).toIntelse2valn=math.min(numSlices*slices,Int.MaxValue).toIn