相关推荐recommended
pyspark笔记:over
作者:mmseoamin日期:2024-01-22

1 方法介绍

  • 在 PySpark 中,over 函数是一个非常重要的概念,尤其是在使用窗口函数(例如 row_number, rank, dense_rank, lead, lag 等)时。
  • over 函数允许你对一个数据集进行分组,然后在每个分组内应用窗口函数。

    1.1 基本概念

    • 窗口函数:

      • 在 PySpark 中,窗口函数是用于执行聚合和其他复杂操作的函数,这些操作涉及到某种形式的分区和排序。

      • 窗口函数不会导致行被折叠成单个输出行,不像标准的聚合函数那样。相反,它们会生成与输入行数相同的输出行数。

    • 窗口规范(Window Specification):

      • 在使用 over 函数时,你需要定义一个窗口规范。这个规范描述了窗口函数的作用范围,包括如何对数据进行分区(partitioning)、如何排序(ordering)以及是否有行或范围限制(frame specification)

      1.2 over函数通常步骤

      在 PySpark 中,使用 over 函数通常涉及以下步骤:

      • 定义窗口规范:

        • 使用 Window 类来定义分区和排序规则。

        • 例如,Window.partitionBy("column1").orderBy("column2") 表示按 column1 进行分区,并在每个分区内按 column2 排序。

      • 应用窗口函数:

        • 窗口函数被应用于定义的窗口规范。
        • 例如,F.row_number().over(windowSpec) 会在每个按 windowSpec 定义的窗口内对行进行编号。
          • 在这里,F.row_number() 是窗口函数,而 .over(windowSpec) 则指定了这个函数应该如何在数据上操作。
      • 2 举例

        2.1 创建DataFrame

        假设有一个如下的 DataFrame:

        pyspark笔记:over,第1张

        from pyspark.sql import Row
        data = [
             Row(id=1, Group='A',Value=10),
             Row(id=2, Group='A',Value=20),
             Row(id=3, Group='B',Value=30),
             Row(id=4, Group='B',Value=40)
        ]
         
        df = spark.createDataFrame(data)
         
        df.show()
        

         pyspark笔记:over,第2张

        现在,如果你想在每个 Group 内部对 Value 进行排名,你可以使用 over 函数与 rank() 窗口函数结合来实现这一点:

        2.2 定义窗口规范

        from pyspark.sql.window import Window
        import pyspark.sql.functions as F
        windowSpec = Window.partitionBy("Group").orderBy("Value")
        '''
        partitionBy("Group") 表示数据将根据 Group 列的值进行分区。在每个分区内,数据行将独立于其他分区处理。
        orderBy("Value") 指定了在每个分区内,数据将根据 Value 列的值进行排序。
        注:此时windowSpec 本身并不知道它将被应用于哪个 DataFrame。它只是定义了一个窗口规范
        '''

        2.3 应用窗口规范到 DataFrame

        windowSpec 本身并不知道它将被应用于哪个 DataFrame。它只是定义了一个窗口规范。当在 df.withColumn 中使用 .over(windowSpec) 时,就指定了在 df 上应用这个窗口规范。

        df.withColumn("rank", F.rank().over(windowSpec)).show()
        '''
        df.withColumn———— 创建了 df 的一个新版本,其中包含了一个新列 "rank"
        F.rank().over(windowSpec) ————计算了一个窗口函数 rank,该函数在 windowSpec 定义的每个分区内为每行分配一个排名
        '''

        pyspark笔记:over,第3张