# Spark SQL & DataFrame 操作总结 (Spark SQL & DataFrame operations — summary notes)
# Build a basic SparkSession — the entry point for the DataFrame / SQL APIs.
from pyspark.sql import SparkSession
spark=SparkSession \
.builder \
.appName('my_first_app_name') \
.getOrCreate()
# If Spark is already configured to connect to Hive, Hive tables can be
# queried directly from a Hive-enabled session.
spark = SparkSession \
.builder \
.enableHiveSupport() \
.master("XX") \
.appName("my_first_app_name") \
.getOrCreate()
df = spark.sql("select * from hive_tb_name")
df.show()
=================
# Read a CSV file from HDFS (header=True uses the first row as column names).
df = spark.read.csv('hdfs:///tmp/_da_exdata_path/data.csv', header=True)
# Read a JSON file.
df = spark.read.json(file)
# Write out as CSV.
df.write.csv(path=file, header=True, sep=",", mode='overwrite')
# Spark writes a DataFrame as a directory on Hadoop storage (e.g. a folder
# named a.csv); to get a single output file, coalesce to one partition first:
df.coalesce(1).write.csv('result.csv')
=================
# Renaming columns — three equivalent approaches.
# Option 1: selectExpr with SQL "as" aliases.
color_df2 = color_df.selectExpr('color as color2','length as length2')
# Option 2: withColumnRenamed (chainable, one column at a time).
color_df2 = color_df.withColumnRenamed('color','color2')\
.withColumnRenamed('length','length2')
# Option 3: Column.alias inside a select.
color_df.select(color_df.color.alias('color2')).show()
=================
# Inspect column types, same as pandas.
color_df.dtypes
# List the column names, same as pandas.
# (Fixed: original had a stray leading "s" — `scolor_df.columns` — a
# copy-paste artifact from the word "pandas" in the comment above.)
color_df.columns
# Number of rows.
color_df.count()
=================
# Get the first row as a Row object.
row = df.select('col_1', 'col_2').first()
# Get the value in the first row of a single column.
value = df.select('columns_name').first()[0]
# Collect all values of one (or several) columns as a list of Rows.
rows= df.select('col_1', 'col_2').collect()
# Column selection: select & filter — four equivalent ways to name a column.
color_df.select('length').show()
color_df.select(color_df.length).show()
color_df.select(color_df[0]).show()
color_df.select(color_df['length']).show()
# Select several columns at once.
color_df.select('length','color').show()
# "Slicing" example.
# NOTE(review): select() on a boolean expression yields a boolean column —
# it does not drop rows; use filter()/where() for row filtering.
color_df.select('length','color') \
.select(color_df['length']>4).show()
=================
# filter: row filtering — Column expressions or SQL-string conditions both work.
color_df.filter(color_df['length']>=4).show()
color_df.filter("color='green'").show()
color_df.filter("color like 'b%'").show()
# between: inclusive range selection.
color_df.filter(color_df.length.between(4,5) )\
.select(color_df.color.alias('mid_length')).show()
# Combined filters — chained filter() calls are ANDed together.
color_df.filter(color_df.length>4)\
.filter(color_df[0]!='white').show()
# where() is an alias of filter().
color_df.where("color like '%yellow%'").show()
# Raw SQL:
# register the DataFrame as a temp view first, then query it with spark.sql.
color_df.createOrReplaceTempView("color_df")
spark.sql("select count(1) from color_df").show()
# SQL-style CASE WHEN via pyspark.sql.functions.when.
from pyspark.sql.functions import when
# 1. case when age=2 then 3 else 4 end
# (Fixed: original was missing the "." before show() — a syntax error.)
df.select(when(df['age'] == 2, 3).otherwise(4).alias("age")).show()
# 2. case when age=2 then age+1 end (no otherwise -> NULL for non-matches)
df.select(when(df.age == 2, df.age + 1).alias("age")).show()
=================
# Drop a column.
color_df.drop('length').show()
# Drop rows containing any null value.
clean_data=final_data.na.drop()
clean_data.show()
# thresh=2: keep only rows with at least 2 non-null values
# (i.e. drop a row when it has fewer than 2 non-null values).
final_data.na.drop(thresh=2).show()
=================
# Filling missing values.
# Fill every column with the same value.
df1.na.fill('unknown').show()
# Fill different columns with different values (column -> fill value mapping).
df1.na.fill({'LastName':'--', 'Dob':'unknown'}).show()
# Two distinct kinds of "missing": float NaN vs. plain None (SQL NULL).
from pyspark.sql.functions import isnull, isnan
# 1. Detect None / NULL values.
df = spark.createDataFrame([(1, None), (None, 2)], ("a", "b"))
df.select(isnull("a").alias("r1"), isnull(df.a).alias("r2")).show()
# 2. Detect NaN values.
df = spark.createDataFrame([(1.0, float('nan')), (float('nan'), 2.0)], ("a", "b"))
df.select(isnan("a").alias("r1"), isnan(df.a).alias("r2")).show()
=========================
# Derive a new column from an existing one.
data_new=concat_df.withColumn("age_incremented",concat_df.age+1)
data_new.show()
# Built-in Column methods, e.g. substr(startPos, length) — positions are 1-based.
df1.withColumn('Initial', df1.LastName.substr(1,1)).show()
# Add a new column holding a constant value via lit().
from pyspark.sql.functions import lit
df1.withColumn('newCol', lit(0)).show()
=================
# Sorting in Spark.
color_df.sort('color',ascending=False).show()
# Sort on multiple columns (ascending=False applies to all of them).
color_df.filter(color_df['length']>=4)\
.sort('length', 'color', ascending=False).show()
# Mixed sort directions via Column.desc() / Column.asc().
color_df.sort(color_df.length.desc(), color_df.color.asc()).show()
# orderBy + take(n): returns a list of Row objects.
color_df.orderBy('length','color').take(4)
=================
# Distinct rows across all columns.
distinctDF = df.distinct()
print("Distinct count: "+str(distinctDF.count()))
distinctDF.show(truncate=False)
# dropDuplicates(): new DataFrame with duplicate rows removed (all columns).
df2 = df.dropDuplicates()
print("Distinct count: "+str(df2.count()))
df2.show(truncate=False)
# Drop duplicates considering only the selected columns.
dropDisDF = df.dropDuplicates(["department","salary"])
print("Distinct count of department & salary : "+str(dropDisDF.count()))
dropDisDF.show(truncate=False)
=================
# Group-by 1: simple count per group.
color_df.groupBy('length').count().show()
# Group-by 2: several aggregate functions applied at once.
import pyspark.sql.functions as func
color_df.groupBy("color").agg(func.max("length"), func.sum("length")).show()
from pyspark.sql import functions as F
df.groupBy(["province","city"]).agg(F.sum("confirmed") ,F.max("confirmed")).show()
# Joins.
# join() defaults to an inner join; when the condition is given as an
# expression, the result keeps both copies of the join column.
final_data = employees.join(salary, employees.emp_id == salary.emp_id,
how='left')\
.join(department, employees.emp_id==department.emp_id)
final_data.show()
# When both sides share the join column name, pass on='col' to avoid the
# duplicate column.
# (Fixed: original had two statements fused on one line —
#  `...how='left')final_data.show()` — a syntax error.)
final_data = employees.join(salary, on='emp_id', how='left')\
.join(department, on='emp_id', how='left')
final_data.show()
=================
# Define a UDF; for a simple lambda the return type may be omitted
# (udf() then defaults to StringType).
from pyspark.sql.functions import udf
concat_func = udf(lambda name,age:name+'_'+str(age))
# Apply the UDF to build a derived column.
# (Fixed: original had two statements fused on one line —
#  `...final_data.age))concat_df.show()` — a syntax error.)
concat_df = final_data.withColumn("name_age",
concat_func(final_data.name, final_data.age))
concat_df.show()