Spark SQL & Dataframe操作总结

标签:无 1430人阅读 评论(0)
分类:

from pyspark.sql import SparkSession

spark=SparkSession \

.builder \

.appName('my_first_app_name') \

.getOrCreate()


# 如果已经配置spark连接hive的参数,可以直接读取hive数据

spark = SparkSession \

.builder \

.enableHiveSupport() \

.master("XX") \

.appName("my_first_app_name") \

.getOrCreate()

df = spark.sql("select * from hive_tb_name")

df.show()


=================


# 读取csv

df = spark.read.csv('hdfs:///tmp/_da_exdata_path/data.csv', header=True)

# 读取json

df = spark.read.json(file)


# 写到csv

df.write.csv(path=file, header=True, sep=",", mode='overwrite')


# spark dataframe存储会基于hadoop存储为一个文件夹如a.csv文件夹,存储为单一文件的方式为

df.coalesce(1).write.csv('result.csv')


=================


# 修改列名


# spark# 使用selectExpr方法

color_df2 = color_df.selectExpr('color as color2','length as length2')


# spark# withColumnRenamed方法

color_df2 = color_df.withColumnRenamed('color','color2')\

.withColumnRenamed('length','length2')


# spark# alias 方法

color_df.select(color_df.color.alias('color2')).show()


=================


# 查看列的类型 ,同pandas

color_df.dtypes


# 查看有哪些列 ,同panda

scolor_df.columns


# 行数

color_df.count()


=================


# 获取第一行

row = df.select('col_1', 'col_2').first()


# 获取第一行的值

value = df.select('columns_name').first()[0]


# 获取一列的所有值,或者多列的所有值

rows= df.select('col_1', 'col_2').collect()


# 列的选择 select & filter

color_df.select('length').show()

color_df.select(color_df.length).show()

color_df.select(color_df[0]).show()

color_df.select(color_df['length']).show()


# 选择几列

color_df.select('length','color').show()


# 切片

color_df.select('length','color') \

.select(color_df['length']>4).show()


=================


# filter筛选

color_df.filter(color_df['length']>=4).show()

color_df.filter("color='green'").show()

color_df.filter("color like 'b%'").show()


# between 范围选择

color_df.filter(color_df.length.between(4,5) )\

.select(color_df.color.alias('mid_length')).show()


# 联合筛选

color_df.filter(color_df.length>4)\

.filter(color_df[0]!='white').show()


# where方法

color_df.where("color like '%yellow%'").show()


# 直接使用SQL语法

# 首先dataframe注册为临时表,然后执行SQL查询

color_df.createOrReplaceTempView("color_df")

spark.sql("select count(1) from color_df").show()


# 类似SQL中的case when 操作

from pyspark.sql.functions import when


# 1.case when age=2 then 3 else 4

df.select(when(df['age'] == 2, 3).otherwise(4).alias("age"))show()


# 2.case when age=2 when age=age+1

df.select(when(df.age == 2, df.age + 1).alias("age")).show()


=================


# 删除列

color_df.drop('length').show()


# 删除有缺失值的行

clean_data=final_data.na.drop()

clean_data.show()


# 如果一行至少2个缺失值才删除该行

final_data.na.drop(thresh=2).show()


=================


# 填充缺失值

# 对所有列用同一个值填充缺失值

df1.na.fill('unknown').show()


# 不同的列用不同的值填充

df1.na.fill({'LastName':'--', 'Dob':'unknown'}).show()


# 有两种空值判断,一种是数值类型是nan,另一种是普通的None


from pyspark.sql.functions import isnull, isnan


# 1.None 的空值判断

df = spark.createDataFrame([(1, None), (None, 2)], ("a", "b"))

df.select(isnull("a").alias("r1"), isnull(df.a).alias("r2")).show()


# 2.nan的空值判断

df = spark.createDataFrame([(1.0, float('nan')), (float('nan'), 2.0)], ("a", "b"))

df.select(isnan("a").alias("r1"), isnan(df.a).alias("r2")).show()


=========================


# 通过列生成另一列

data_new=concat_df.withColumn("age_incremented",concat_df.age+1)

data_new.show()


# 列自带常用方法

df1.withColumn('Initial', df1.LastName.substr(1,1)).show()


# 增加新列并赋值

from pyspark.sql.functions import lit

df1.withColumn('newCol', lit(0)).show()


=================


# spark排序

color_df.sort('color',ascending=False).show()


# 多字段排序

color_df.filter(color_df['length']>=4)\

.sort('length', 'color', ascending=False).show()


# 混合排序

color_df.sort(color_df.length.desc(), color_df.color.asc()).show()


# orderBy排序,返回的Row对象列表

color_df.orderBy('length','color').take(4)


=================

# 获取列的不同值 distinct


distinctDF = df.distinct()

print("Distinct count: "+str(distinctDF.count()))

distinctDF.show(truncate=False)


# 返回删除了重复行的新DataFrame dropDuplicates

df2 = df.dropDuplicates()

print("Distinct count: "+str(df2.count()))

df2.show(truncate=False)


# 在选定的列上删除重复的行

dropDisDF = df.dropDuplicates(["department","salary"])

print("Distinct count of department & salary : "+str(dropDisDF.count()))

dropDisDF.show(truncate=False)


=================


# 分组计算1

color_df.groupBy('length').count().show()


# 分组计算2:应用多函数

import pyspark.sql.functions as func

color_df.groupBy("color").agg(func.max("length"), func.sum("length")).show()


from pyspark.sql import functions as F

df.groupBy(["province","city"]).agg(F.sum("confirmed") ,F.max("confirmed")).show()


# 连接

# join默认是内连接,最终结果会存在重复列名

final_data = employees.join(salary, employees.emp_id == salary.emp_id,

how='left')\

.join(department, employees.emp_id==department.emp_id)

final_data.show()


# 两边的关联字段名相同

final_data = employees.join(salary, on='emp_id', how='left')\

.join(department, on='emp_id', how='left')final_data.show()


=================


# 创建udf自定义函数,对于简单的lambda函数不需要指定返回值类型

from pyspark.sql.functions import udf

concat_func = udf(lambda name,age:name+'_'+str(age))


# 应用自定义函数

concat_df = final_data.withColumn("name_age",

                                                       concat_func(final_data.name, final_data.age))concat_df.show()


查看评论

暂无评论

发表评论
  • 评论内容:
      
首页
团队介绍
发展历史
组织结构
MESA大事记
新闻中心
通知
组内动态
科研成果
专利
论文
项目
获奖
软著
人才培养
MESA毕业生
MESA在读生
MESA员工
招贤纳士
走进MESA
学长分享
招聘通知
招生宣传
知识库
文章
地址:北京市朝阳区华严北里甲22号楼五层 | 邮编:100029
邮箱:nelist@iie.ac.cn
京ICP备15019404号-1