草庐IT

hadoop - Spark - 寻找重叠值或寻找共同 friend 的变体

coder 2024-01-06 原文

我有一个问题想用 Spark 解决。我是 Spark 的新手,所以我不确定设计它的最佳方式是什么。

输入:

group1=user1,user2
group2=user1,user2,user3
group3=user2,user4
group4=user1,user4
group5=user3,user5
group6=user3,user4,user5
group7=user2,user4
group8=user1,user5
group9=user2,user4,user5
group10=user4,user5

我想找到每对用户之间的相互组数。所以对于上面的输入,我期望的输出是:

输出:

1st user || 2nd user || mutual/intersection count || union count
------------------------------------------------------------
user1        user2           2                       7
user1        user3           1                       6
user1        user4           1                       9
user2        user4           3                       8

我认为有几种方法可以解决这个问题,其中一种解决方案可能是:

  • 创建一个键值对,键是用户,值是组
  • 按键分组,然后我们将有一个用户所属的组列表
  • 然后找到两组之间的交集/并集

例子:

(1st stage): Map
group1=user1,user2 ==>
          user1, group1
          user2, group1
group2=user1,user2,user3 ==>
          user1, group2
          user2, group2
          user3, group2
....
....
....


(2nd stage): Reduce by key
user1 -> group1, group2, group4, group8
user2 -> group1, group2, group3, group7, group9

但我的问题是,在我按键减少它们之后,以我想要的方式表示计数的最佳方式是什么?

有没有更好的方法来处理这个问题?用户的最大数量是恒定的,不会超过 5000,因此这是它将创建的最大 key 数量。但是输入可能包含接近 1B 行的几行。我不认为这会是一个问题,如果我错了请纠正我。

更新:

这是我对Spark一知半解(上个月才开始学Spark)想出来的解决这个问题的代码:

def createPair(line: String): Array[(String, String)] = {
    val splits = line.split("=")
    val kuid = splits(0)
    splits(1).split(",").map { segment => (segment, kuid) }
}


val input = sc.textFile("input/test.log")
val pair = input.flatMap { line => createPair(line) }

val pairListDF = pair
  .aggregateByKey(scala.collection.mutable.ListBuffer.empty[String])(
    (kuidList, kuid) => { kuidList += kuid; kuidList },
    (kuidList1, kuidList2) => { kuidList1.appendAll(kuidList2); kuidList1 })
  .mapValues(_.toList).toDF().select($"_1".alias("user"), $"_2".alias("groups"))

pairListDF.registerTempTable("table")

sqlContext.udf.register("intersectCount", (list1: WrappedArray[String], list2: WrappedArray[String]) => list1.intersect(list2).size)
sqlContext.udf.register("unionCount", (list1: WrappedArray[String], list2: WrappedArray[String]) => list1.union(list2).distinct.size)

val populationDF = sqlContext.sql("SELECT t1.user AS user_first,"
  + "t2.user AS user_second,"
  + "intersectCount(t1.groups, t2.groups) AS intersect_count,"
  + "unionCount(t1.groups, t2.groups) AS union_count"
  + " FROM table t1 INNER JOIN table t2"
  + " ON t1.user < t2.user"
  + " ORDER BY user_first,user_second")

输出:

+----------+-----------+---------------+-----------+
|user_first|user_second|intersect_count|union_count|
+----------+-----------+---------------+-----------+
|     user1|      user2|              2|          7|
|     user1|      user3|              1|          6|
|     user1|      user4|              1|          9|
|     user1|      user5|              1|          8|
|     user2|      user3|              1|          7|
|     user2|      user4|              3|          8|
|     user2|      user5|              1|          9|
|     user3|      user4|              1|          8|
|     user3|      user5|              2|          6|
|     user4|      user5|              3|          8|
+----------+-----------+---------------+-----------+

很想得到一些关于我的代码和我遗漏的东西的反馈。请随意批评我的代码,因为我刚刚开始学习 Spark。再次感谢@axiom 的回答,比我预期的更小更好的解决方案。

最佳答案

总结:

获取对数,然后利用事实

union(a, b) = count(a) + count(b) - intersection(a, b)

val data = sc.textFile("test")
//optionally data.cache(), depending on size of data.
val pairCounts  = data.flatMap(pairs).reduceByKey(_ + _)
val singleCounts = data.flatMap(singles).reduceByKey(_ + _)
val singleCountMap = sc.broadcast(singleCounts.collectAsMap())
val result = pairCounts.map{case ((user1, user2), intersectionCount) =>(user1, user2, intersectionCount, singleCountMap.value(user1) + singleCountMap.value(user2) - intersectionCount)}


详情:

  1. 总共有 5000 个用户,2500 万个 key (每对 1 个)应该不会太多。我们可以使用 reduceByKey 来计算交点数。

  2. 个人计数可以很容易地在 map 中广播

  3. 现在众所周知:

    并集(用户 1,用户 2)= 计数(用户 1)+ 计数(用户 2)- 交集(用户 1,用户 2)

前两个计数是从广播映射中读取的,而我们映射的是对计数的 rdd。

代码:

//generate ((user1, user2), 1) for pair counts
def pairs(str: String) = {
 val users = str.split("=")(1).split(",")
 val n = users.length
 for(i <- 0 until n; j <- i + 1 until n) yield {
  val pair = if(users(i) < users(j)) {
    (users(i), users(j))
  } else {
   (users(j), users(i))
  } //order of the user in a list shouldn't matter
  (pair, 1)
 } 
}

//generate (user, 1), to obtain single counts
def singles(str: String) = {
  for(user <- str.split("=")(1).split(",")) yield (user, 1)
}


//read the rdd
scala> val data = sc.textFile("test")
scala> data.collect.map(println)
group1=user1,user2
group2=user1,user2,user3
group3=user2,user4
group4=user1,user4
group5=user3,user5
group6=user3,user4,user5
group7=user2,user4
group8=user1,user5
group9=user2,user4,user5
group10=user4,user5

//get the pair counts
scala> val pairCounts  = data.flatMap(pairs).reduceByKey(_ + _)
pairCounts: org.apache.spark.rdd.RDD[((String, String), Int)] = ShuffledRDD[16] at reduceByKey at <console>:25



//just checking
scala> pairCounts.collect.map(println)
((user2,user3),1)
((user1,user3),1)
((user3,user4),1)
((user2,user5),1)
((user1,user5),1)
((user2,user4),3)
((user4,user5),3)
((user1,user4),1)
((user3,user5),2)
((user1,user2),2)

//single counts
scala> val singleCounts = data.flatMap(singles).reduceByKey(_ + _)
singleCounts: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[20] at reduceByKey at <console>:25

scala> singleCounts.collect.map(println)

(user5,5)
(user3,3)
(user1,4)
(user2,5)
(user4,6)


//broadcast single counts
scala> val singleCountMap = sc.broadcast(singleCounts.collectAsMap())

//calculate the results:

最后:

scala> val res = pairCounts.map{case ((user1, user2), intersectionCount) => (user1, user2, intersectionCount, singleCountMap.value(user1) + singleCountMap.value(user2) - intersectionCount)}
res: org.apache.spark.rdd.RDD[(String, String, Int, Int)] = MapPartitionsRDD[23] at map at <console>:33

scala> res.collect.map(println)
(user2,user3,1,7)
(user1,user3,1,6)
(user3,user4,1,8)
(user2,user5,1,9)
(user1,user5,1,8)
(user2,user4,3,8)
(user4,user5,3,8)
(user1,user4,1,9)
(user3,user5,2,6)
(user1,user2,2,7)

注意:

  1. 在生成对时,我对元组进行排序,因为我们不希望用户在列表中的顺序很重要。

  2. 请将用户名字符串编码为整数,您可能会获得显着的性能提升。

关于hadoop - Spark - 寻找重叠值或寻找共同 friend 的变体,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37577830/

有关hadoop - Spark - 寻找重叠值或寻找共同 friend 的变体的更多相关文章

  1. ruby - 寻找通过阅读代码确定编程语言的ruby gem? - 2

    几个月前,我读了一篇关于ruby​​gem的博客文章,它可以通过阅读代码本身来确定编程语言。对于我的生活,我不记得博客或gem的名称。谷歌搜索“ruby编程语言猜测”及其变体也无济于事。有人碰巧知道相关gem的名称吗? 最佳答案 是这个吗:http://github.com/chrislo/sourceclassifier/tree/master 关于ruby-寻找通过阅读代码确定编程语言的rubygem?,我们在StackOverflow上找到一个类似的问题:

  2. hadoop安装之保姆级教程(二)之YARN的配置 - 2

    1.1.1 YARN的介绍 为克服Hadoop1.0中HDFS和MapReduce存在的各种问题⽽提出的,针对Hadoop1.0中的MapReduce在扩展性和多框架⽀持⽅⾯的不⾜,提出了全新的资源管理框架YARN. ApacheYARN(YetanotherResourceNegotiator的缩写)是Hadoop集群的资源管理系统,负责为计算程序提供服务器计算资源,相当于⼀个分布式的操作系统平台,⽽MapReduce等计算程序则相当于运⾏于操作系统之上的应⽤程序。 YARN被引⼊Hadoop2,最初是为了改善MapReduce的实现,但是因为具有⾜够的通⽤性,同样可以⽀持其他的分布式计算模

  3. ruby - 查找重叠的正则表达式匹配项 - 2

    我想找到给定字符串中的所有匹配项,包括重叠匹配项。我怎样才能实现它?#Example"a-b-c-d".???(/\w-\w/)#=>["a-b","b-c","c-d"]expected#Solutionwithoutoverlappedresults"a-b-c-d".scan(/\w-\w/)#=>["a-b","c-d"],but"b-c"ismissing 最佳答案 在积极的前瞻中使用捕获:"a-b-c-d".scan(/(?=(\w-\w))/).flatten#=>["a-b","b-c","c-d"]参见Rubyde

  4. ruby - 确定字符串的结尾是否与单独的字符串的开头重叠 - 2

    我想查找字符串的结尾是否与单独字符串的开头重叠。例如,如果我有这两个字符串:string_1='Peoplesaynothingisimpossible,butI'string_2='butIdonothingeveryday.'如何找到string_1末尾的“butI”部分与string_2开头相同?我可以编写一个方法来遍历这两个字符串,但我希望得到一个包含我错过的Ruby字符串方法或Ruby习惯用法的答案。 最佳答案 将MARKER设置为一些从未出现在您的string_1和string_2中的字符串。有一些方法可以动态地做到这一

  5. ruby-on-rails - Spree 如何在下拉 Ruby on Rails 中显示变体? - 2

    我在我的应用程序中使用spree2.0.0稳定版。在产品展示页面上,所有变体都显示为单选按钮。我只想在下拉列表中显示它们。对此有什么想法吗?谢谢。 最佳答案 注意:此解决方案实现Spree“模板替换方法”,尤其是当您在应用程序设计或使用自定义设计中进行大量设计更改时。看这里http://guides.spreecommerce.com/developer/view.html否则,如果您使用的是Spree商店的默认设计或较小的更改,请使用“破坏”方法。前往:app/views/spree/products/_cart.html.erb

  6. ruby - 检查 ruby 中的两个范围是否重叠 - 2

    我知道我能做到:(1..30).cover?(2)=>true但是当我尝试对另一个范围执行相同操作时,它总是返回false:(1..30).cover?(2..3)=>false所以我的问题是-是否有任何优雅的方法来比较ruby​​中的两个范围?在我的例子中,我想检查两个日期时间范围是否重叠。提前致谢。 最佳答案 给定范围A的两个范围重叠,当:范围B从范围A开始,范围B在范围A内结束或范围B在范围A之前开始,在范围A之后结束例子:RangeA|-----||-----|Case1|-----|Case2|-|Case1+2|----

  7. Unity Shader 学习笔记(5)Shader变体、Shader属性定义技巧、自定义材质面板 - 2

    写在之前Shader变体、Shader属性定义技巧、自定义材质面板,这三个知识点任何一个单拿出来都是一套知识体系,不能一概而论,本文章目的在于将学习和实际工作中遇见的问题进行总结,类似于网络笔记之用,方便后续回顾查看,如有以偏概全、不祥不尽之处,还望海涵。1、Shader变体先看一段代码......Properties{ [KeywordEnum(on,off)]USL_USE_COL("IsUseColorMixTex?",int)=0 [Toggle(IS_RED_ON)]_IsRed("IsRed?",int)=0}......//中间省略,后续会有完整代码 #pragmamulti_c

  8. ruby - 寻找纯 Ruby 的图像处理库 - 2

    关闭。这个问题不符合StackOverflowguidelines.它目前不接受答案。我们不允许提问寻求书籍、工具、软件库等的推荐。您可以编辑问题,以便用事实和引用来回答。关闭7年前。Improvethisquestion我找到的大多数库/代码都是针对RoR而不是纯ruby​​。即使我在谷歌上搜索纯ruby​​图片上传,我也会得到PureRubyOnRails;)所以...我正在寻找一个gem/library/code来做一些简单的事情:检查它是否是一个有效的图像文件将图像调整为预定义的值将其保存为jpg(来自jpeg、jpg、png、gif、bmp)

  9. 大数据之Hadoop数据仓库Hive - 2

    目录:一、简介二、HQL的执行流程三、索引四、索引案例五、Hive常用DDL操作六、Hive常用DML操作七、查询结果插入到表八、更新和删除操作九、查询结果写出到文件系统十、HiveCLI和Beeline命令行的基本使用十一、Hive配置一、简介Hive是一个构建在Hadoop之上的数据仓库,它可以将结构化的数据文件映射成表,并提供类SQL查询功能,用于查询的SQL语句会被转化为MapReduce作业,然后提交到Hadoop上运行。特点:简单、容易上手(提供了类似sql的查询语言hql),使得精通sql但是不了解Java编程的人也能很好地进行大数据分析;灵活性高,可以自定义用户函数(UDF)和

  10. ruby - 寻找产品和商店的最佳组合以最小化成本的算法 - 2

    你好,Stackoverflow的人们,我经营一个网站,为用户寻找最便宜的书籍购买地点。这对于单本书来说很容易,但对于多本书来说,有时在一家商店购买一本书而在另一家商店购买另一本书会更便宜。目前我找到了销售用户列表中所有书籍的最便宜的商店,但我想要一个更智能的系统。这里有更多信息:一本书的价格对于一家商店来说是不变的。运费可能会有所不同,具体取决于书籍的数量或书籍的总值(value)。每个商店对象都可以获取一组书籍并返回运费。通常,并非每家书店都出售每一本书。不确定在这里链接到我的站点是否很酷,但它列在我的用户配置文件中。我希望能够找到最便宜的商店和书籍组合。我担心这需要一种蛮力方法-

随机推荐