相关推荐recommended
SparkSession对象操作--学习笔记
作者:mmseoamin日期:2024-01-22

1,SparkSession对象操作

    from pyspark.sql import SparkSession
    from pyspark import SparkConf
    from pyspark.sql import functions as F
    
    """
    创建ss对象时可以指定一些参数
    如果参数在脚本中不生效,就需要通过saprk-submit指令中进行设置
    spark sql 的分区数是由catalyst引擎的优化器决定
    发生shuffle过程(遇到宽依赖算子时)分区数会调整为200个,200个分区对应者200个task任务
    可以通过spark.sql.shuffffle.partitions调整shuffle过程中的分区数(根据实际业务情况调整)
    
    
    """
    # conf = SparkConf().set('driver-mimory','2g').set('num-executors','3')
    conf = SparkConf().set('spark.sql.shuffle.partitions','6')
    ss = SparkSession.\
        builder.\
        master('yarn').\
        appName('yarn_demo').\
        config(conf=conf).\
        getOrCreate()
    
    # 创建sc对象
    sc= ss.sparkContext
    
    #读取hdfs上的文件数据转换成rdd对象
    rdd1 = sc.textFile('/test/stu.txt')
    rdd_split = rdd1.map(lambda x:x.split(','))
    print(rdd1.take(10))
    
    
    df1 = rdd_split.toDF(schema='id string,name string,gender string,age string,birthday string,major string,hobby string,create_time string')
    df1.show()
    
    #通过cast()修改字段类型,格式为df.列名.cast('修改后的列名')
    df_select4 = df1.select(df1.id.cast('int'),df1.name,df1['age'].cast('int'),df1['gender'],df1['major'],df1['birthday'])
    # print(df_select4.collect())
    df_groupby= df_select4.groupby('gender').agg(F.avg('age').alias('avg_age'))
    df_groupby.show()

2,数据源和格式

1.1数据读取

    from pyspark.sql import SparkSession,functions as F
    
    ss = SparkSession.builder.master('local[*]').appName('local_demo').getOrCreate()
    
    
    print('--------------------txt格式文件----------------')
    
    #将读取到的数据保存到value列中
    df1 = ss.read.text(paths='/test/words.txt')
    # df1.show(truncate=False)
    # df1.printSchema()
    df_txt =df1.select(F.split('value',',')[0].alias('id'),
                        F.split('value', ',')[1].alias('name'),
                        F.split('value', ',')[2].alias('age'),
                        F.split('value', ',')[3].alias('gender'),
                        )
    
    
    # df_txt.show()
    # df_txt.printSchema()
    
    
    print('--------------------csv格式文件----------------')
    #path:文件路径
    #sep:分隔符,默认时逗号
    # schema:表结构,列名,类型
    # header:加载第一行列名信息
    #inferSchema:自动解析表结构
    
    df_csv = ss.read.csv(path ='/test/stu.csv',sep=',',inferSchema=True,schema='name string,age int,gender string,phone string,email string,city string,address string')
    df_csv.show()
    #另一种书写方式
    ss.read.load(path= '/test/stu.csv',format='csv',schema='name string,age int,gender string,phone string,email string,city string,address string').show()
    
    
    # ss.read.format('csv')
    
    print('--------------------json格式文件----------------------')
    
    df_json = ss.read.json(path = '/test/x0.json')
    df_json.show(truncate=False)
    
    
    print('--------------------mysql格式文件----------------------')
    df_mysql = ss.read.jdbc(url='jdbc:mysql://node1:3306/BI_db?characterEncoding=utf-8',
                            table='test',
                            properties={'user':'root','password':'123456',
                                       'driver':'com.mysql.jdbc.Driver'})
    df_mysql.show()
    
    
    print('--------------------orc格式文件读取----------------------')
    df_orc = ss.read.orc('file:///export/server/spark/examples/src/main/resources/users.orc')
    df_orc.show()
    
    print('--------------------parquet格式文件读取----------------------')
    
    df_parquet = ss.read.parquet('file:///export/server/spark/examples/src/main/resources/users.parquet')
    df_parquet.show()

1.2数据写入

    """
    ss.write.text/json/csv/jdbc()
    mode:写入模式overwrite:覆盖写,append:追加写
    
    """
    from pyspark.sql import SparkSession,functions as F
    
    ss = SparkSession.builder.getOrCreate()
    
    
    df = ss.createDataFrame([[1,'张三',20,'男'],[2,'王五',21,'女']],schema='id int,name string,age int,gender string')
    
    df.show()
    
    
    print('-----------------text文件---------------')
    
    
    #对字符串进行处理,以字符串类型保存到value字段中
    df_text = df.select(F.concat_ws(',','id','name','age','gender').alias('value'))
    df_text.show()
    
    # path:目录路径  按照分区数据写入到目录下的文件中
    # df_text.write.text(path='/test/data_test')
    
    
    
    # df_text.write.save(path='/test/data_test',format='text',mode='append')
    df_text.write.mode('overwrite').format('text').save(path='/test/data_test')
    print('-----------------csv文件---------------')
    
    #header:是否将列名写入
    df.write.csv(path='/test/data_csv',mode='overwrite',header=True)
    
    print('-----------------json文件---------------')
    df.write.mode('overwrite').format('json').save(path='/test/data_json')
    
    print('-----------------mysql表文件---------------')
    #参数说明
    #table:表不存在的话会自动创建
    #mode:写入的模式有两种overwrite和append,需要指定,不指定第一遍创建可以成功第二遍创建会失败
    
    df.write.jdbc(url='jdbc:mysql://node1:3306/BI_db?characterEncoding=utf-8',
                  table='test2',
                  mode='append',
                  properties={'user':'root','password':'123456','driver':'com.mysql.jdbc.Driver'})

3,自定义函数

业务中需求无法是使用内置函数处理数据时,可以来自己定义函数实现需求处理

3.1分类

  • UDF函数
    • 一对一关系, df中的一行数据经过函数处理返回一行计算结果
    • concat()/concat_ws()/split()…
    • 可以自定义
    • UDAF函数 聚合函数
      • 多对一关系, df中的多行数据经过函数处理返回一行计算结果
      • sum()/avg()/count()…
      • 可以自定义
      • UDTF函数
        • 一对多关系, df中的一行数据经过函数处理返回多行计算结果
        • explode() -> 爆炸函数, 接受容器类型(array or map type), 将容器中的元素拆分成多行

          3.1UDTF函数

              from pyspark.sql import SparkSession,functions as F
              
              
              ss = SparkSession.builder.getOrCreate()
              
              
              #读取文件
              
              df1 = ss.read.text(paths='/test/words.txt')
              df1.show(truncate=False)
              #对value字段中的字符串数据以逗号进行分割,返回列表
              df_split = df1.select(F.split('value',',').alias('words_list'))
              df_split.show(truncate=False)
              #使用udtf函数对列表进行拆分
              df3 = df_split.select(F.explode('words_list').alias('word'))
              
              df3.show()
              df3.groupby('word').count().orderBy('count',ascending=False).show()
              
              #sql方式
              df1.createTempView('test')
              res_df = ss.sql('select split(value,',') from test')
              res_df.show()
              res_df.printSchema()
          

          3.2,UDF函数使用

          自定义udf函数需要啊先注册才能够使用

          两种注册方式:

          普通注册:
              import re
              
              from pyspark.sql import SparkSession, functions as F
              from pyspark.sql.types import StringType, ArrayType
              
              ss = SparkSession.builder.master('local[*]').appName('local_demo').getOrCreate()
              
              df_csv = ss.read.csv(path='/test/stu.csv', sep=',', inferSchema=True,
                                   schema='name string,age int,gender string,phone string,email string,city string,address string')
              df_csv.show()
              
              
              # 需求获取用户名称公司名称信息
              def get_emial(x):
                  # print('x的值是'+x)
                  # 通过正则表达式获取想要的部分
                  res = re.match("(.*?)@(.*?)\.(.*)", x)
                  # print(res)
                  name = res.group(1)
                  company = res.group(2)
                  # 返回列表
                  return [name, company]
              
              
              # 方式一:普通注册
              email_func = ss.udf.register(name='new_func', f=get_emial, returnType=ArrayType(StringType()))
              
              # 在df对象中使用自定义函数
              new_df = df_csv.select('name', 'age', email_func('email'))
              new_df.show()
              
              # Sql方式
              df_csv.createTempView('stu')
              sql_df = ss.sql("select name,age,new_func(email)[0] as user_name,new_func(email)[1] as company from stu")
              sql_df.show()
          
          装饰器注册方式

          UDF只能在DSL方式中使用

              import re
              
              from pyspark.sql import SparkSession, functions as F
              from pyspark.sql.types import *
              
              ss = SparkSession.builder.master('local[*]').appName('local_demo').getOrCreate()
              
              df_csv = ss.read.csv(path='/test/stu.csv', sep=',', inferSchema=True,
                                   schema='name string,age int,gender string,phone string,email string,city string,address string')
              # 需求获取用户名称公司名称信息
              #步骤一:定义函数
              #步骤2,将自定义
              
              @F.udf(returnType=ArrayType(StringType()))
              def get_emial(x):
                  # print('x的值是'+x)
                  # 通过正则表达式获取想要的部分
                  res = re.match("(.*?)@(.*?)\.(.*)", x)
                  # print(res)
                  name = res.group(1)
                  company = res.group(2)
                  # 返回列表
                  return [name, company]
              
              
              # 在df对象中使用自定义函数
              new_df = df_csv.select('name', 'age', get_emial('email'))
              new_df.show()
              
              # # Sql方式,装饰器方式不能在sql方式中使用
              # df_csv.createTempView('stu')
              # sql_df = ss.sql("select name,age,new_func(email)[0] as user_name,new_func(email)[1] as company from stu")
              # sql_df.show()
          

          4,UDAF函数

          注意:UDAF函数需要借助pandas中的series类型进行操作

          UDAF函数中多行数就是pandas中的series类型数据

          pandas介绍:

          pandas是python中一个数据分析包,需要通过pip install pandas进行安装

          4.1pandas有两种数据类型:Series和DataFrame

              import pandas as pd
              
              
              #创建series对象
              s1 = pd.Series(data=[1,2,3,4])
              #不指定索引时默认生成0,1,2,3,4
              print(s1)
              #指定行索引 index=
              s2 = pd.Series(data=(5,6,7,8),index=['a','b','c','d'])
              print(s2)
              print(type(s2))
              print(type(s1))
              #获取具体值
              #根据行索引获取对应位置的值
              print(s1[0])
              #通过key获取值
              print(s2['a'])
              
              #使用聚合函数
              print(s1.sum())
              print(s1.mean())
              # print(s1.cumsum())
              #获取行索引
              print(s1.index)
          

          4.2dataFrame对象操作

          import pandas as pd
          #创建对象
          df = pd.DataFrame(data=[[1,'张三',12],[2,'李四',23]])
          print(df)
          #指定行列索引
          df2 = pd.DataFrame(data=[[1,'张三',12],[2,'李四',23],[3,'w',12]],
                             index=['a','b','c'],
                             columns=['id','name','age'])
          print(df2)
          #获取df中的数据值
          #通过df[列名]->获取列数据
          print(df2['id'])
          print(df2.age)
          #得到一个df对象
          print(df2[['id','name']])
          """
          loc:通过索引标签值获取数据
          iloc:通过索引下标值获取数据
          """
          #获取行数据
          print(df2.loc['a'])
          print(df2.iloc[0])
          #获取列数据
          print(df2.loc[:,'id'])
          print(df2.loc[:,['id','name']])
          print(df2.iloc[:,0])
          print(df2.iloc[:,[0,2]])
          #获取行列数据
          print(df2.loc['b','name'])
          print(df2.iloc[1,1])
          #聚合函数
          print(df2.sum())
          print(df2['age'].mean())
          #分组聚合
          print(df2.groupby('id')['age'].sum())
          

          4.3pandas和spark的df相互转换

          import pandas as pd
          df2 = pd.DataFrame(data=[[1,'张三',12],[2,'李四',23],[3,'w',12]],
                             index=['a','b','c'],
                             columns=['id','name','age'])
          from pyspark.sql import SparkSession
          # 创建ss对象
          ss = SparkSession.builder.getOrCreate()
          spark_df = ss.createDataFrame(data=df2,schema='id int,name string,age int')
          spark_df.show()
          #saprk的df对象转换成pandas的df对象
          new_pandas_dfd = spark_df.toPandas()
          print(new_pandas_dfd)
          

          4.4UDAF函数使用

          注意点:需要安装pyspark模块

          pyspark代码是会转换成java代码, 而pandas是python中特有的模块, java中没有此模块

          自定义UDAF函数只能通过装饰器方式注册

          自定义UDAF函数只能在DSL方式中使用

          import pandas as pd
          from pyspark.sql import SparkSession,functions as F
          from pyspark.sql.types import *
          ss = SparkSession.builder.getOrCreate()
          df_csv = ss.read.csv(path ='/test/stu.csv',sep=',',inferSchema=True,schema='name string,age int,gender string,phone string,email string,city string,address string')
          df_csv.groupby('gender').agg(F.mean('age').alias('avg_age')).show()
          #手写聚合函数mean()
          """
          注意点:
          ①:需要指定自定义函数的参数类型,pandas的series类型
          ②:需要指定自定义函数的返回值类型python的类型
          """
          @F.pandas_udf(returnType=FloatType())
          def avg_age(age:pd.Series) ->float:
              print('age的值',age)
              res = age.mean()
              return res
          #第三步在df对象中使用udaf函数
          df_csv.select(avg_age('age')).show()
          #sql方式
          #将自定义rdaf函数
          new_func = ss.udf.register('new_func',avg_age)
          df_csv.createTempView('stu')
          ss.sql("select gender,new_func(age) from stu group by gender").show()