from pyspark.sql import SparkSession from pyspark import SparkConf from pyspark.sql import functions as F """ 创建ss对象时可以指定一些参数 如果参数在脚本中不生效,就需要通过saprk-submit指令中进行设置 spark sql 的分区数是由catalyst引擎的优化器决定 发生shuffle过程(遇到宽依赖算子时)分区数会调整为200个,200个分区对应者200个task任务 可以通过spark.sql.shuffffle.partitions调整shuffle过程中的分区数(根据实际业务情况调整) """ # conf = SparkConf().set('driver-mimory','2g').set('num-executors','3') conf = SparkConf().set('spark.sql.shuffle.partitions','6') ss = SparkSession.\ builder.\ master('yarn').\ appName('yarn_demo').\ config(conf=conf).\ getOrCreate() # 创建sc对象 sc= ss.sparkContext #读取hdfs上的文件数据转换成rdd对象 rdd1 = sc.textFile('/test/stu.txt') rdd_split = rdd1.map(lambda x:x.split(',')) print(rdd1.take(10)) df1 = rdd_split.toDF(schema='id string,name string,gender string,age string,birthday string,major string,hobby string,create_time string') df1.show() #通过cast()修改字段类型,格式为df.列名.cast('修改后的列名') df_select4 = df1.select(df1.id.cast('int'),df1.name,df1['age'].cast('int'),df1['gender'],df1['major'],df1['birthday']) # print(df_select4.collect()) df_groupby= df_select4.groupby('gender').agg(F.avg('age').alias('avg_age')) df_groupby.show()
from pyspark.sql import SparkSession,functions as F ss = SparkSession.builder.master('local[*]').appName('local_demo').getOrCreate() print('--------------------txt格式文件----------------') #将读取到的数据保存到value列中 df1 = ss.read.text(paths='/test/words.txt') # df1.show(truncate=False) # df1.printSchema() df_txt =df1.select(F.split('value',',')[0].alias('id'), F.split('value', ',')[1].alias('name'), F.split('value', ',')[2].alias('age'), F.split('value', ',')[3].alias('gender'), ) # df_txt.show() # df_txt.printSchema() print('--------------------csv格式文件----------------') #path:文件路径 #sep:分隔符,默认时逗号 # schema:表结构,列名,类型 # header:加载第一行列名信息 #inferSchema:自动解析表结构 df_csv = ss.read.csv(path ='/test/stu.csv',sep=',',inferSchema=True,schema='name string,age int,gender string,phone string,email string,city string,address string') df_csv.show() #另一种书写方式 ss.read.load(path= '/test/stu.csv',format='csv',schema='name string,age int,gender string,phone string,email string,city string,address string').show() # ss.read.format('csv') print('--------------------json格式文件----------------------') df_json = ss.read.json(path = '/test/x0.json') df_json.show(truncate=False) print('--------------------mysql格式文件----------------------') df_mysql = ss.read.jdbc(url='jdbc:mysql://node1:3306/BI_db?characterEncoding=utf-8', table='test', properties={'user':'root','password':'123456', 'driver':'com.mysql.jdbc.Driver'}) df_mysql.show() print('--------------------orc格式文件读取----------------------') df_orc = ss.read.orc('file:///export/server/spark/examples/src/main/resources/users.orc') df_orc.show() print('--------------------parquet格式文件读取----------------------') df_parquet = ss.read.parquet('file:///export/server/spark/examples/src/main/resources/users.parquet') df_parquet.show()
""" ss.write.text/json/csv/jdbc() mode:写入模式overwrite:覆盖写,append:追加写 """ from pyspark.sql import SparkSession,functions as F ss = SparkSession.builder.getOrCreate() df = ss.createDataFrame([[1,'张三',20,'男'],[2,'王五',21,'女']],schema='id int,name string,age int,gender string') df.show() print('-----------------text文件---------------') #对字符串进行处理,以字符串类型保存到value字段中 df_text = df.select(F.concat_ws(',','id','name','age','gender').alias('value')) df_text.show() # path:目录路径 按照分区数据写入到目录下的文件中 # df_text.write.text(path='/test/data_test') # df_text.write.save(path='/test/data_test',format='text',mode='append') df_text.write.mode('overwrite').format('text').save(path='/test/data_test') print('-----------------csv文件---------------') #header:是否将列名写入 df.write.csv(path='/test/data_csv',mode='overwrite',header=True) print('-----------------json文件---------------') df.write.mode('overwrite').format('json').save(path='/test/data_json') print('-----------------mysql表文件---------------') #参数说明 #table:表不存在的话会自动创建 #mode:写入的模式有两种overwrite和append,需要指定,不指定第一遍创建可以成功第二遍创建会失败 df.write.jdbc(url='jdbc:mysql://node1:3306/BI_db?characterEncoding=utf-8', table='test2', mode='append', properties={'user':'root','password':'123456','driver':'com.mysql.jdbc.Driver'})
业务中需求无法是使用内置函数处理数据时,可以来自己定义函数实现需求处理
3.1分类
from pyspark.sql import SparkSession,functions as F ss = SparkSession.builder.getOrCreate() #读取文件 df1 = ss.read.text(paths='/test/words.txt') df1.show(truncate=False) #对value字段中的字符串数据以逗号进行分割,返回列表 df_split = df1.select(F.split('value',',').alias('words_list')) df_split.show(truncate=False) #使用udtf函数对列表进行拆分 df3 = df_split.select(F.explode('words_list').alias('word')) df3.show() df3.groupby('word').count().orderBy('count',ascending=False).show() #sql方式 df1.createTempView('test') res_df = ss.sql('select split(value,',') from test') res_df.show() res_df.printSchema()
自定义udf函数需要啊先注册才能够使用
两种注册方式:
import re from pyspark.sql import SparkSession, functions as F from pyspark.sql.types import StringType, ArrayType ss = SparkSession.builder.master('local[*]').appName('local_demo').getOrCreate() df_csv = ss.read.csv(path='/test/stu.csv', sep=',', inferSchema=True, schema='name string,age int,gender string,phone string,email string,city string,address string') df_csv.show() # 需求获取用户名称公司名称信息 def get_emial(x): # print('x的值是'+x) # 通过正则表达式获取想要的部分 res = re.match("(.*?)@(.*?)\.(.*)", x) # print(res) name = res.group(1) company = res.group(2) # 返回列表 return [name, company] # 方式一:普通注册 email_func = ss.udf.register(name='new_func', f=get_emial, returnType=ArrayType(StringType())) # 在df对象中使用自定义函数 new_df = df_csv.select('name', 'age', email_func('email')) new_df.show() # Sql方式 df_csv.createTempView('stu') sql_df = ss.sql("select name,age,new_func(email)[0] as user_name,new_func(email)[1] as company from stu") sql_df.show()
UDF只能在DSL方式中使用
import re from pyspark.sql import SparkSession, functions as F from pyspark.sql.types import * ss = SparkSession.builder.master('local[*]').appName('local_demo').getOrCreate() df_csv = ss.read.csv(path='/test/stu.csv', sep=',', inferSchema=True, schema='name string,age int,gender string,phone string,email string,city string,address string') # 需求获取用户名称公司名称信息 #步骤一:定义函数 #步骤2,将自定义 @F.udf(returnType=ArrayType(StringType())) def get_emial(x): # print('x的值是'+x) # 通过正则表达式获取想要的部分 res = re.match("(.*?)@(.*?)\.(.*)", x) # print(res) name = res.group(1) company = res.group(2) # 返回列表 return [name, company] # 在df对象中使用自定义函数 new_df = df_csv.select('name', 'age', get_emial('email')) new_df.show() # # Sql方式,装饰器方式不能在sql方式中使用 # df_csv.createTempView('stu') # sql_df = ss.sql("select name,age,new_func(email)[0] as user_name,new_func(email)[1] as company from stu") # sql_df.show()
注意:UDAF函数需要借助pandas中的series类型进行操作
UDAF函数中多行数就是pandas中的series类型数据
pandas是python中一个数据分析包,需要通过pip install pandas进行安装
import pandas as pd #创建series对象 s1 = pd.Series(data=[1,2,3,4]) #不指定索引时默认生成0,1,2,3,4 print(s1) #指定行索引 index= s2 = pd.Series(data=(5,6,7,8),index=['a','b','c','d']) print(s2) print(type(s2)) print(type(s1)) #获取具体值 #根据行索引获取对应位置的值 print(s1[0]) #通过key获取值 print(s2['a']) #使用聚合函数 print(s1.sum()) print(s1.mean()) # print(s1.cumsum()) #获取行索引 print(s1.index)
4.2dataFrame对象操作
import pandas as pd #创建对象 df = pd.DataFrame(data=[[1,'张三',12],[2,'李四',23]]) print(df) #指定行列索引 df2 = pd.DataFrame(data=[[1,'张三',12],[2,'李四',23],[3,'w',12]], index=['a','b','c'], columns=['id','name','age']) print(df2) #获取df中的数据值 #通过df[列名]->获取列数据 print(df2['id']) print(df2.age) #得到一个df对象 print(df2[['id','name']]) """ loc:通过索引标签值获取数据 iloc:通过索引下标值获取数据 """ #获取行数据 print(df2.loc['a']) print(df2.iloc[0]) #获取列数据 print(df2.loc[:,'id']) print(df2.loc[:,['id','name']]) print(df2.iloc[:,0]) print(df2.iloc[:,[0,2]]) #获取行列数据 print(df2.loc['b','name']) print(df2.iloc[1,1]) #聚合函数 print(df2.sum()) print(df2['age'].mean()) #分组聚合 print(df2.groupby('id')['age'].sum())
4.3pandas和spark的df相互转换
import pandas as pd df2 = pd.DataFrame(data=[[1,'张三',12],[2,'李四',23],[3,'w',12]], index=['a','b','c'], columns=['id','name','age']) from pyspark.sql import SparkSession # 创建ss对象 ss = SparkSession.builder.getOrCreate() spark_df = ss.createDataFrame(data=df2,schema='id int,name string,age int') spark_df.show() #saprk的df对象转换成pandas的df对象 new_pandas_dfd = spark_df.toPandas() print(new_pandas_dfd)
4.4UDAF函数使用
注意点:需要安装pyspark模块
pyspark代码是会转换成java代码, 而pandas是python中特有的模块, java中没有此模块
自定义UDAF函数只能通过装饰器方式注册
自定义UDAF函数只能在DSL方式中使用
import pandas as pd from pyspark.sql import SparkSession,functions as F from pyspark.sql.types import * ss = SparkSession.builder.getOrCreate() df_csv = ss.read.csv(path ='/test/stu.csv',sep=',',inferSchema=True,schema='name string,age int,gender string,phone string,email string,city string,address string') df_csv.groupby('gender').agg(F.mean('age').alias('avg_age')).show() #手写聚合函数mean() """ 注意点: ①:需要指定自定义函数的参数类型,pandas的series类型 ②:需要指定自定义函数的返回值类型python的类型 """ @F.pandas_udf(returnType=FloatType()) def avg_age(age:pd.Series) ->float: print('age的值',age) res = age.mean() return res #第三步在df对象中使用udaf函数 df_csv.select(avg_age('age')).show() #sql方式 #将自定义rdaf函数 new_func = ss.udf.register('new_func',avg_age) df_csv.createTempView('stu') ss.sql("select gender,new_func(age) from stu group by gender").show()