草庐IT

Spark:在数组类型列上连接两个数据框

codeneng 2023-03-28 原文

Spark: Join two dataframes on an array type column

我有一个简单的用例
我有两个数据框 df1 和 df2,我正在寻找一种有效的方式来加入它们?

df1:包含我的主要数据框(数十亿条记录)

1
2
3
4
5
6
+--------+-----------+--------------+
|doc_id  |doc_name   |doc_type_id   |
+--------+-----------+--------------+
|   1    |doc_name_1 |[1,4]         |
|   2    |doc_name_2 |[3,2,6]       |
+--------+-----------+--------------+

df2:包含文档类型的标签(40000条记录),因为它是一个小的我正在广播它。

1
2
3
4
5
6
7
8
9
10
+------------+----------------+
|doc_type_id |doc_type_name   |
+------------+----------------+
|   1        |doc_type_1      |
|   2        |doc_type_2      |
|   3        |doc_type_3      |
|   4        |doc_type_4      |
|   5        |doc_type_5      |
|   6        |doc_type_5      |
+------------+----------------+

我想加入这两个数据框以产生如下结果:

1
2
3
4
5
6
+--------+------------+--------------+----------------------------------------+
|doc_id  |doc_name    |doc_type_id   |doc_type_name                           |
+--------+------------+--------------+----------------------------------------+
|   1    |doc_name_1  |[1,4]         |["doc_type_1","doc_type_4"]             |
|   2    |doc_name_2  |[3,2,6]       |["doc_type_3","doc_type_2","doc_type_6"]|
+--------+------------+--------------+----------------------------------------+

谢谢


对于这种情况,我们可以使用 array_contains groupBy collect_list 函数。

示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
val df1=Seq(("1","doc_name_1",Seq(1,4)),("2","doc_name_2",Seq(3,2,6))).toDF("doc_id","doc_name","doc_type_id")

val df2=Seq(("1","doc_type_1"),("2","doc_type_2"),("3","doc_type_3"),("4","doc_type_4"),("5","doc_type_5"),("6","doc_type_6")).toDF("doc_type_id","doc_type_name")

import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

df1.createOrReplaceTempView("tbl")
df2.createOrReplaceTempView("tbl2")

spark.sql("select a.doc_id,a.doc_name,a.doc_type_id,collect_list(b.doc_type_name) doc_type_name from tbl a join tbl2 b on array_contains(a.doc_type_id,int(b.doc_type_id)) = TRUE group by a.doc_id,a.doc_name,a.doc_type_id").show(false)

//+------+----------+-----------+------------------------------------+
//|doc_id|doc_name  |doc_type_id|doc_type_name                       |
//+------+----------+-----------+------------------------------------+
//|2     |doc_name_2|[3, 2, 6]  |[doc_type_2, doc_type_3, doc_type_6]|
//|1     |doc_name_1|[1, 4]     |[doc_type_1, doc_type_4]            |
//+------+----------+-----------+------------------------------------+

其他实现方式是使用 explode join collect_list:

1
2
3
4
5
6
7
8
9
10
11
12
13
val df3=df1.withColumn("arr",explode(col("doc_type_id")))

df3.join(df2,df2.col("doc_type_id") === df3.col("arr"),"inner").
groupBy(df3.col("doc_id"),df3.col("doc_type_id"),df3.col("doc_name")).
agg(collect_list(df2.col("doc_type_name")).alias("doc_type_name")).
show(false)

//+------+-----------+----------+------------------------------------+
//|doc_id|doc_type_id|doc_name  |doc_type_name                       |
//+------+-----------+----------+------------------------------------+
//|1     |[1, 4]     |doc_name_1|[doc_type_1, doc_type_4]            |
//|2     |[3, 2, 6]  |doc_name_2|[doc_type_2, doc_type_3, doc_type_6]|
//+------+-----------+----------+------------------------------------+

  • 感谢您的回答,第一种方法有效。作为参考,还有另一篇包含更多选项和详细信息的帖子:stackoverflow.com/questions/59534351/...

有关Spark:在数组类型列上连接两个数据框的更多相关文章

  1. ruby-on-rails - 在 Ruby 中循环遍历多个数组 - 2

    我有多个ActiveRecord子类Item的实例数组,我需要根据最早的事件循环打印。在这种情况下,我需要打印付款和维护日期,如下所示:ItemAmaintenancerequiredin5daysItemBpaymentrequiredin6daysItemApaymentrequiredin7daysItemBmaintenancerequiredin8days我目前有两个查询,用于查找maintenance和payment项目(非排他性查询),并输出如下内容:paymentrequiredin...maintenancerequiredin...有什么方法可以改善上述(丑陋的)代

  2. ruby - 解析 RDFa、微数据等的最佳方式是什么,使用统一的模式/词汇(例如 schema.org)存储和显示信息 - 2

    我主要使用Ruby来执行此操作,但到目前为止我的攻击计划如下:使用gemsrdf、rdf-rdfa和rdf-microdata或mida来解析给定任何URI的数据。我认为最好映射到像schema.org这样的统一模式,例如使用这个yaml文件,它试图描述数据词汇表和opengraph到schema.org之间的转换:#SchemaXtoschema.orgconversion#data-vocabularyDV:name:namestreet-address:streetAddressregion:addressRegionlocality:addressLocalityphoto:i

  3. ruby - 多次弹出/移动 ruby​​ 数组 - 2

    我的代码目前看起来像这样numbers=[1,2,3,4,5]defpop_threepop=[]3.times{pop有没有办法在一行中完成pop_three方法中的内容?我基本上想做类似numbers.slice(0,3)的事情,但要删除切片中的数组项。嗯...嗯,我想我刚刚意识到我可以试试slice! 最佳答案 是numbers.pop(3)或者numbers.shift(3)如果你想要另一边。 关于ruby-多次弹出/移动ruby​​数组,我们在StackOverflow上找到一

  4. ruby - 将数组的内容转换为 int - 2

    我需要读入一个包含数字列表的文件。此代码读取文件并将其放入二维数组中。现在我需要获取数组中所有数字的平均值,但我需要将数组的内容更改为int。有什么想法可以将to_i方法放在哪里吗?ClassTerraindefinitializefile_name@input=IO.readlines(file_name)#readinfile@size=@input[0].to_i@land=[@size]x=1whilex 最佳答案 只需将数组映射为整数:@land边注如果你想得到一条线的平均值,你可以这样做:values=@input[x]

  5. ruby-on-rails - 如何在 ruby​​ 中使用两个参数异步运行 exe? - 2

    exe应该在我打开页面时运行。异步进程需要运行。有什么方法可以在ruby​​中使用两个参数异步运行exe吗?我已经尝试过ruby​​命令-system()、exec()但它正在等待过程完成。我需要用参数启动exe,无需等待进程完成是否有任何ruby​​gems会支持我的问题? 最佳答案 您可以使用Process.spawn和Process.wait2:pid=Process.spawn'your.exe','--option'#Later...pid,status=Process.wait2pid您的程序将作为解释器的子进程执行。除

  6. ruby - 通过 erb 模板输出 ruby​​ 数组 - 2

    我正在使用puppet为ruby​​程序提供一组常量。我需要提供一组主机名,我的程序将对其进行迭代。在我之前使用的bash脚本中,我只是将它作为一个puppet变量hosts=>"host1,host2"我将其提供给bash脚本作为HOSTS=显然这对ruby​​不太适用——我需要它的格式hosts=["host1","host2"]自从phosts和putsmy_array.inspect提供输出["host1","host2"]我希望使用其中之一。不幸的是,我终其一生都无法弄清楚如何让它发挥作用。我尝试了以下各项:我发现某处他们指出我需要在函数调用前放置“function_”……这

  7. ruby - 检查数组是否在增加 - 2

    这个问题在这里已经有了答案:Checktoseeifanarrayisalreadysorted?(8个答案)关闭9年前。我只是想知道是否有办法检查数组是否在增加?这是我的解决方案,但我正在寻找更漂亮的方法:n=-1@arr.flatten.each{|e|returnfalseife

  8. ruby - 续集在添加关联时访问many_to_many连接表 - 2

    我正在使用Sequel构建一个愿望list系统。我有一个wishlists和itemstable和一个items_wishlists连接表(该名称是续集选择的名称)。items_wishlists表还有一个用于facebookid的额外列(因此我可以存储opengraph操作),这是一个NOTNULL列。我还有Wishlist和Item具有续集many_to_many关联的模型已建立。Wishlist类也有:selectmany_to_many关联的选项设置为select:[:items.*,:items_wishlists__facebook_action_id].有没有一种方法可以

  9. ruby - Infinity 和 NaN 的类型是什么? - 2

    我可以得到Infinity和NaNn=9.0/0#=>Infinityn.class#=>Floatm=0/0.0#=>NaNm.class#=>Float但是当我想直接访问Infinity或NaN时:Infinity#=>uninitializedconstantInfinity(NameError)NaN#=>uninitializedconstantNaN(NameError)什么是Infinity和NaN?它们是对象、关键字还是其他东西? 最佳答案 您看到打印为Infinity和NaN的只是Float类的两个特殊实例的字符串

  10. ruby - 检查方法参数的类型 - 2

    我不确定传递给方法的对象的类型是否正确。我可能会将一个字符串传递给一个只能处理整数的函数。某种运行时保证怎么样?我看不到比以下更好的选择:defsomeFixNumMangler(input)raise"wrongtype:integerrequired"unlessinput.class==FixNumother_stuffend有更好的选择吗? 最佳答案 使用Kernel#Integer在使用之前转换输入的方法。当无法以任何合理的方式将输入转换为整数时,它将引发ArgumentError。defmy_method(number)

随机推荐