草庐IT

关于 apache spark:Extracting value using Window and Partition

codeneng 2023-03-28 原文

Extracting value using Window and Partition

我在 pyspark 中有一个数据框

1
2
3
4
5
6
7
8
9
10
id | value

1     0
1     1
1     0
2     1
2     0
3     0
3     0
3     1

我想提取同一 id 组中 value 列中第一次出现 1 之后的所有行。我创建了带有 Id 分区的窗口,但不知道如何获取值 1 之后存在的行。

我期待结果是

1
2
3
4
5
6
7
 id | value

    1     1
    1     0
    2     1
    2     0
    3     1

  • 你有定义窗口内排序的东西吗?否则我认为结果将是不确定的
  • 我只能按 id 列订购。
  • 底层数据模型是一个集合,而不是一个列表,例如对于 id=1,值 0、1 和 0 可以按任何顺序处理。如果没有洗牌,顺序保持不变,但不会基于该假设构建任何东西


以下解决方案可能与此相关(它对于小数据非常有效,但如果 id 在多个分区上可能会导致大数据出现问题)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
df = sqlContext.createDataFrame([
         [1, 0],
         [1, 1],
         [1, 0],
         [2, 1],
         [2, 0],
         [3, 0],
         [3, 0],
         [3, 1]
    ],
    ['id', 'Value']
)
df.show()
+---+-----+
| id|Value|
+---+-----+
|  1|    0|
|  1|    1|
|  1|    0|
|  2|    1|
|  2|    0|
|  3|    0|
|  3|    0|
|  3|    1|
+---+-----+

#importing Libraries
from pyspark.sql import functions as F
from pyspark.sql.window import Window as W
import sys

#This way we can generate a cumulative sum for values
df.withColumn(
   "sum",
    F.sum(
       "value"
    ).over(W.partitionBy(["id"]).rowsBetween(-sys.maxsize, 0))
).show()
+---+-----+-----+
| id|Value|sum  |
+---+-----+-----+
|  1|    0|    0|
|  1|    1|    1|
|  1|    0|    1|
|  3|    0|    0|
|  3|    0|    0|
|  3|    1|    1|
|  2|    1|    1|
|  2|    0|    1|
+---+-----+-----+

#Filter all those which are having sum > 0
df.withColumn(
   "sum",
    F.sum(
       "value"
    ).over(W.partitionBy(["id"]).rowsBetween(-sys.maxsize, 0))
).where("sum > 0").show()

+---+-----+-----+
| id|Value|sum  |
+---+-----+-----+
|  1|    1|    1|
|  1|    0|    1|
|  3|    1|    1|
|  2|    1|    1|
|  2|    0|    1|
+---+-----+-----+

Before running this you must be sure that data related to ID should be partitioned and no id can be on 2 partitions.


理想情况下,您需要:

  • 创建一个由 id 分区的窗口,并以与数据框相同的方式排序
  • 只保留窗口中前面有"一"的行
  • AFAIK,Spark 的窗口中没有查找功能。但是,您可以遵循这个想法并解决一些问题。让我们首先创建数据并导入函数和窗口。

    1
    2
    3
    4
    5
    import pyspark.sql.functions as F
    from pyspark.sql.window import Window

    l = [(1, 0), (1, 1), (1, 0), (2, 1), (2, 0), (3, 0), (3, 0), (3, 1)]
    df = spark.createDataFrame(l, ['id', 'value'])

    然后,让我们在数据框上添加一个索引(它是免费的),以便能够对窗口进行排序。

    1
    indexedDf = df.withColumn("index", F.monotonically_increasing_id())

    然后我们创建一个窗口,它只查看当前行之前的值,按该索引排序并按 id 分区。

    1
    w = Window.partitionBy("id").orderBy("index").rowsBetween(Window.unboundedPreceding, 0)

    最后,我们使用该窗口收集每行的前面值集合,并过滤掉不包含 1 的值。可选地,我们按 index 排序,因为窗口化不保留按 id 列的顺序。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    indexedDf\\
        .withColumn('set', F.collect_set(F.col('value')).over(w))\\
        .where(F.array_contains(F.col('set'), 1))\\
        .orderBy("index")\\
        .select("id","value").show()

    +---+-----+
    | id|value|
    +---+-----+
    |  1|    1|
    |  1|    0|
    |  2|    1|
    |  2|    0|
    |  3|    1|
    +---+-----+

    有关关于 apache spark:Extracting value using Window and Partition的更多相关文章

    1. ruby-on-rails - 关于 Ruby 的一般问题 - 2

      我在我的rails应用程序中安装了来自github.com的acts_as_versioned插件,但有一段代码我不完全理解,我希望有人能帮我解决这个问题class_eval我知道block内的方法(或任何它是什么)被定义为类内的实例方法,但我在插件的任何地方都找不到定义为常量的CLASS_METHODS,而且我也不确定是什么here,并且有问题的代码从lib/acts_as_versioned.rb的第199行开始。如果有人愿意告诉我这里的内幕,我将不胜感激。谢谢-C 最佳答案 这是一个异端。http://en.wikipedia

    2. ruby - 我怎样才能更好地了解/了解更多关于 Ruby 的知识? - 2

      按照目前的情况,这个问题不适合我们的问答形式。我们希望答案得到事实、引用或专业知识的支持,但这个问题可能会引发辩论、争论、投票或扩展讨论。如果您觉得这个问题可以改进并可能重新打开,visitthehelpcenter指导。关闭9年前。我最近开始学习Ruby,这是我的第一门编程语言。我对语法感到满意,并且我已经完成了许多只教授相同基础知识的教程。我已经写了一些小程序(包括我自己的数组排序方法,在有人告诉我谷歌“冒泡排序”之前我认为它非常聪明),但我觉得我需要尝试更大更难的东西来理解更多关于Ruby.关于如何执行此操作的任何想法?

    3. ruby - 关于 Ruby 中 Dir[] 和 File.join() 的混淆 - 2

      我在Ruby中遇到了一个关于Dir[]和File.join()的简单程序,blobs_dir='/path/to/dir'Dir[File.join(blobs_dir,"**","*")].eachdo|file|FileUtils.rm_rf(file)ifFile.symlink?(file)我有两个困惑:首先,File.join(@blobs_dir,"**","*")中的第二个和第三个参数是什么意思?其次,Dir[]在Ruby中有什么用?我只知道它等价于Dir.glob(),但是,我对Dir.glob()确实不是很清楚。 最佳答案

    4. elasticsearch源码关于TransportSearchAction【阶段三】 - 2

      1.回顾.TransportServicepublicclassTransportServiceextendsAbstractLifecycleComponentTransportService:方法:1publicfinalTextendsTransportResponse>voidsendRequest(finalTransport.Connectionconnection,finalStringaction,finalTransportRequestrequest,finalTransportRequestOptionsoptions,TransportResponseHandlerT>

    5. 关于Qt程序打包后运行库依赖的常见问题分析及解决方法 - 2

      目录一.大致如下常见问题:(1)找不到程序所依赖的Qt库version`Qt_5'notfound(requiredby(2)CouldnotLoadtheQtplatformplugin"xcb"in""eventhoughitwasfound(3)打包到在不同的linux系统下,或者打包到高版本的相同系统下,运行程序时,直接提示段错误即segmentationfault,或者Illegalinstruction(coredumped)非法指令(4)ldd应用程序或者库,查看运行所依赖的库时,直接报段错误二.问题逐个分析,得出解决方法:(1)找不到程序所依赖的Qt库version`Qt_5'

    6. ruby - 关于 Ruby/ChefSpec 编码风格的反馈 - 2

      我是Ruby的新手,但过去两周我一直在对Chef测试进行大量研究。该测试使用ChefSpec和Fauxhai,但它看起来不是很“像ruby”,我希望社区能给我一些编码风格的建议。有没有更好的方法来编写这样的嵌套循环?Recipe/foo/recipes/default.rbpackage"foo"doaction:installendRecipe/foo/spec/default_spec.rbrequire'chefspec'describe'foo::default'doplatforms={"debian"=>['6.0.5'],"ubuntu"=>['12.04','10.04

    7. ruby - 关于 ruby​​ 类变量的困惑 - 2

      假设一个使用类变量的简单ruby​​程序,classHolder@@var=99defHolder.var=(val)@@var=valenddefvar@@varendend@@var="toplevelvariable"a=Holder.newputsa.var我猜结果应该是99,但输出不是99。我想知道为什么。由于类变量的范围是类,我假设@@var="toplevelvariable"行不会影响类中的变量。 最佳答案 @@var是Holder的类变量。而顶层的@@var不是Holder的同名类变量@@var,是你在创建类Obj

    8. 一文解决关于VLAN所有的疑惑 - 2

      一文解决关于VLAN所有的疑惑VLAN基本概念为什么需要VLAN?怎么在交换机上划分VLAN,VLAN的工作原理有了子网,已经隔离了广播,还需要VLAN干啥?只进行子网划分,不进行VLAN划分VLAN划分与子网划分附加VLAN信息的方法VLAN划分交换机的端口类型(Access和Trunk)一、访问链接二、汇聚链接汇聚链接VLAN间通信为什么要进行VLAN间通信?路由器实现VLAN间通信路由器和交换机的连接方式通信细节三层交换机实现VLAN间通信加速VLAN间通信三层交换机与路由器三层交换机路由器路由器和交换机配合构建LAN的实例使用VLAN设计局域网的特点VLAN增加网络的灵活性不使用VLA

    9. ruby - 关于 CoffeeScript 变量范围的困惑 - 2

      我正在尝试了解CoffeeScript变量的范围。根据文档:ThisbehavioriseffectivelyidenticaltoRuby'sscopeforlocalvariables.但是,我发现它的工作方式不同。在CoffeeScript中a=1changeValue=->a=3changeValue()console.log"a:#{a}"#Thisdisplays3在ruby中a=1deffa=3endputsa#Thisdisplays1有人能解释一下吗? 最佳答案 Ruby的局部变量(以[a-z_]开头)arerea

    10. 关于yolov5训练时参数workers和batch-size的理解 - 2

      关于yolov5训练时参数workers和batch-size的理解yolov5训练命令workers和batch-size参数的理解两个参数的调优总结yolov5训练命令python.\train.py--datamy.yaml--workers8--batch-size32--epochs100yolov5的训练很简单,下载好仓库,装好依赖后,只需自定义一下data目录中的yaml文件就可以了。这里我使用自定义的my.yaml文件,里面就是定义数据集位置和训练种类数和名字。workers和batch-size参数的理解一般训练主要需要调整的参数是这两个:workers指数据装载时cpu所使

    随机推荐