PySpark 和 Pandas 中都有 DataFrame 这个数据结构,但是他们的使用方法大有不同。
Reference:
pyspark 系列 --pandas 与 pyspark 对比
;
Pandas 和 PySpark 中的 DataFrame 比较
;
PySpark API
;
Pandas API
PySpark
分布式并行计算框架,内建并行机制 parallelism,所有的数据和操作自动并行分布在各个集群结点上。以处理 in-memory 数据的方式处理 distributed 数据。支持 Hadoop,能处理大量数据
import pyspark.sql.functions as F
导入内置函数库
Pandas
单机 single machine tool,没有并行机制 parallelism,不支持 Hadoop,处理大量数据有瓶颈
PySpark
lazy-evaluated
Pandas
not lazy-evaluated
注:在程式语言理论中, 惰性求值 (英语:Lazy Evaluation),又译为 惰性计算 、 懒惰求值 ,也称为 传需求调用 (call-by-need),是一个计算机编程中的一个概念,目的是要最小化计算机要做的工作。它有两个相关而又有区别的含意,可以表示为 “延迟求值” 和 “最小化求值”。在使用延迟求值的时候,表达式不在它被绑定到变量之后就立即求值,而是在该值被取用的时候求值
PySpark
persist () 或 cache () 将转换的 RDDs 保存在内存
Pandas
PySpark
Spark 中 RDDs 是不可变的,因此 DataFrame 也是不可变的
Pandas
Pandas 中 DataFrame 是可变的
spark_df = sc.parallelize([(1, 2), (3, 4)]).toDF(['xx', 'yy']
spark_df = SQLContext.createDataFrame(pandas_df)
spark_df = spark.read.csv(csv_path, header=True)
如果 CSV 文件有 header,则将其读取为列名
spark_df = spark.read.parquet(parquet_path)
spark_df = spark.read.json(json_path)
spark_df = sc.textFile(txt_path).toDF()
pandas_df = pd.DataFrame({'xx': {0: 1, 1: 3}, 'yy': {0: 2, 1: 4}})
pandas_df = spark_df.toPandas()
,或读取其他数据
pd.read_csv(csv_path)
pd.read_parquet(parquet_path)
,其中如果 parquet_path 如果是 HDFS 路径则需要加前缀 'hdfs://',比如:
'hdfs:///projects/path/to/parquet/'
PySpark
pandas_df = spark_df.toPandas()
ArrayType()
,
StructType()
,
MapType()
类型需要提前转换成 string,pandas 不支持
Pandas
spark_df = spark.createDataFrame(pandas_df)
转换过程中可能会遇到报错: TypeError: field xx: Can not merge type A and B
原因是该列存在空值。解决方法是转换成 String
pandas_df.xx = pandas_df.xx.astype(str)
还需要保证程序运行的 python 版本和 spark driver 的版本一致,即
1 |
import sys |
否则会报错:Exception: Python in worker has different version x.y than that in driver m.n, PySpark cannot run with different minor versions. Please check environment variables PYSPARK_PYTHON and PYSPARK_DRIVER_PYTHON are correctly set.
PySpark
1 |
df.repartition(partition_num).write.mode('overwrite'/'append').partitionBy(col_name).parquet(parquet_path) |
Pandas
1 |
df.to_csv(csv_path, index=True/False) # 是否保留index |
PySpark
没有 index 索引,若需要则要额外创建该列
1 |
df.withColumn('index', F.row_number().over(Window.orderBy(F.monotonically_increasing_id()))) |
Pandas
注:当将 pandas_df 转换为 spark_df 时如需保留索引,则可用
1 |
spark_df = SQLContext.createDataFrame(pandas_df.reset_index()) |
PySpark
Row 结构,属于 Spark DataFrame 结构
Pandas
Series 结构,属于 Pandas DataFrame 结构
PySpark
Column 结构,属于 Spark DataFrame 结构,如:
DataFrame[name: string]
Pandas
Series 结构,属于 Pandas DataFrame 结构
df.withColumnRenamed('old_name', 'new_name')
df.select(F.col('old_name').alias('new_name'), ...)
df.selectExpr('old_name as new_name', ...)
df.rename(columns={'old_name': 'new_name'})
df['xx']
列,
df.withColumn('xx', 1)
df.withColumn('xx', F.when(condition expression, true expression).otherwise(false expression))
df.withColumn('xx', F.concat(F.col('yy'), F.lit('-'), F.col('zz')))
其中 yy 和 zz 列须为 string 类型,如不是则需要提前类型转换。
df.withColumn('xx', F.input_file_name().substr(start_index, length))
df['xx']
列,
df['xx'] = 1
df.loc[condition expression, 'xx'] = true expression
df.loc[~condition expression, 'xx'] = false expression
df['xx'] = np.where(condition expression, true expression, false expression)
df['xx'] = df.apply(lambda x: true expression if condition expression else false expression, axis=1)
df['xx'] = df.yy + '-' + df.zz
其中 yy 和 zz 列须为 string 类型,如不是则需要提前类型转换。
df.show(5, truncate=100)
默认显示 20 行,每行显示长度通过 truncate 参数设置
df.printSchema()
df.columns
输出列的名字
df.columns
输出列的名字
pd.set_option('display.max_columns', None) # 显示所有列
pd.set_option('max_colwidth', 100) # 每行显示长度设置
pd.set_option('display.max_rows', None) # 显示所有行
PySpark
df.sort(df.xx.asc(),df.yy.desc())
df.sort(F.asc('xx'),F.desc('yy'))
df.sort(F.col("xx").asc(), F.col("yy").desc())
df.orderBy(F.col("xx").asc(), F.col("yy").desc())
在列中按值依次进行排序,指定先升序后降序
Pandas
df.sort_index()
按轴进行升序排序
df.sort_values(['xx', 'yy'], ascending=[True, False])
在列中按值依次进行排序,指定先升序后降序
df.sort_values(['xx', 'yy'], axis=0)
,
df.sort_values([1, 2], axis=1)
在列、行中按值进行升序排序
PySpark
df.select('xx', 'yy')
选择一列或多列
df.first()
以行的形式返回第一行。(注:行的形式为
[Row(col_name1=value1, col_name2=value2, ...)]
)
df.head(n)
,
df.take(n)
以行的形式返回前 n 行;
df.tail(n)
以行的形式返回最后 n 行
用
df.collect()
以行的形式返回所有行
Pandas
df.xx
,
df['xx']
选择列名为 xx 的列,df [k] 选择行名为 k 的行
df.iat[:, k]
,
df.iloc[:, k]
选择第 k 列,
df.iat[k]
,
df.iloc[k]
选择第 k 行
PySpark
df.filter(df['xx'] > k)
或者
df.where(df['xx'] > k)
取值存在于:
df.filter(F.col('xx').isin(filter_list))
空值处理:
df.filter(F.col('xx').isNull())
和
df.filter(F.col('xx').isNotNull())
df.filter(F.col('xx') == '')
df.filter(F.col('xx') == np.nan)
Pandas
df[df['xx']>k]
或者
s[s>k]
取值存在于:
df[df.xx.isin(filter_list)]
空值处理:包括 null,np.NaN,pd.NaT,None,不包括空字符串
df[df.xx.isnull()]
或
df[df.xx.isna()]
和
df.filter(F.col('xx').notnull())
或
df[df.xx.notnull()]
或
df[df.xx.notna()]
Examples:
1 |
>> pdf = pd.DataFrame(dict(numpy=[np.NaN], pandas=[pd.NaT], empty=[''], none=[None])) |
PySpark
df.groupBy(cols_to_group)
或者
df.groupBy(cols_to_group).avg('xx').show()
应用单个函数
df.groupBy(cols_to_group).agg(F.avg('xx'), F.min('xx'), F.max('xx')).show()
应用多个函数
Pandas
df.groupby(cols_to_group)
df.groupby(cols_to_group).avg('xx')
group filter by function:
df.groupby(cols_to_group).filter(function)
PySpark
df.count()
输出总行数
df.describe()
描述某些列的 count, mean, stddev, min, max
Pandas
df.count()
输出每一列的非空行数
df.shape
输出行数 x 列数
df.describe()
描述某些列的 count, mean, std, min, 25%, 50%, 75%, max
PySpark
df.join()
同名列不自动添加后缀,只有键值完全匹配才保留一份副本
'inner', 'outer', 'full', 'fullouter', 'full_outer', 'leftouter', 'left', 'left_outer', 'rightouter', 'right', 'right_outer', 'leftsemi', 'left_semi', 'leftanti', 'left_anti', 'cross'.
1 |
>> df_left = spark.createDataFrame(pd.DataFrame({'Keys': ['key1', 'key2', 'key3', 'key4', 'key5'], 'Values_Left': ['value1', 'value2', 'value3', 'value4', 'value5']}, index=[0, 1, 2, 3, 4])) |
df.union()
:两个 df 合并,按位置进行合并,列名以前表为准(a.union (b) 列名顺序以 a 为准)
df.unoinAll()
:同 union 方法
df.unionByName()
:两个 df 合并,按列名进行合并
df1.unionByName(df2).unionByName(df3)
Pandas
Pandas 下有
concat
方法,支持轴向合并
pd.concat([df1, df2, df3], ignore_index=True, sort=False)
df1.append([df2, df3], ignore_index=True, sort=False)
df1.join([df2, df3])
Pandas 下有
merge
方法,支持多列合并
同名列自动添加后缀,对应键仅保留一份副本
df.join()
支持多列合并
df.append()
支持多行合并
1 |
>> df_left = pd.DataFrame({'Keys': ['key1', 'key2', 'key3', 'key4', 'key5'], 'Values_Left': ['value1', 'value2', 'value3', 'value4', 'value5']}, index=[0, 1, 2, 3, 4]) |
PySpark
删除一列:
df.drop('xx')
或者
df.drop(F.col('xx'))
删除多列:
df.drop(*['xx', 'yy', ...])
删除某(些)行:使用 filter 方法
去重:
df.dropDuplicates()
或
df.drop_duplicates()
其中参数必须为数组,
df.distinct()
其中不能传入参数。
在 cols_to_group 相同的情况下保留 xx 列的值最小 / 最大的行:
1 |
df.groupBy(cols_to_group).agg(F.min/max('xx').alias('xx')) |
Pandas
删除某(些)列:
df.drop(['xx', 'xx'], axis=1)
或者
df.drop(columns=['xx', 'yy'])
删除某(些)行:
df.drop([0, 1])
其中 0,1 为 index 名字
去重:
drop_duplicates(subset=None, keep='first', inplace=False, ignore_index=False)
其中 keep 取值 {‘first’, ‘last’, False};keep 第一次或者最后一次出现。如果需要根据某列最大最小值保留的话,则需提前排序
PySpark
更改指定列的数据类型:
df = df.withColumn('xx', F.col('xx').cast(Type()))
Pandas
更改所有列的数据类型:
df.astype('type')
更改指定列的数据类型:
df.astype({'xx': 'int32'})
PySpark
不自动添加 NaNs,且不抛出错误
fillna 函数:
df.na.fill()
dropna 函数:
df.na.drop()
Pandas
对缺失数据自动添加 NaNs
fillna 函数:
df.fillna()
dropna 函数:
df.dropna()
PySpark
表格注册:把 DataFrame 结构注册成 SQL 语句使用类型
df.registerTempTable('tt')
或者
sqlContext.registerDataFrameAsTable(df, 'tt')
spark.sql('SELECT xx, yy FROM tt WHERE xx >= m AND yy <= n')
功能注册:把函数注册成 SQL 语句使用类型
spark.registerFunction('stringLengthString', lambda x: len(x))
spark.sql("SELECT stringLengthString('test')")
Pandas
import sqlite3
pd.read_sql('SELECT xx, yy FROM tt WHERE xx >= m AND yy <= n')
PySpark
df.foreach(f)
或者
df.rdd.foreach(f)
将 df 的每一列应用函数 f
df.foreachPartition(f)
或者
df.rdd.foreachPartition(f)
将 df 的每一块应用函数 f
UDF (User-defined Function):
1 |
# one-line way: |
Pandas
df.apply(f)
将 df 的每一列应用函数 f
Pandas udf in PySpark
Driver 可能缺少必要的 package:pyarrow 导致报错 ModuleNotFoundError: No module named 'pyarrow'。有多种方案解决:
参考
Python Package Management
PySpark 允许通过以下方式将 Python 文件 (
.py
)、压缩的 Python 包 (
.zip
) 和 Egg 文件 (
.egg
) 上传到执行程序:
设置配置
spark.submit.pyFiles
或者
--py-files
Spark 脚本中的设置选项或者直接调用
pyspark.SparkContext.addPyFile()
应用程序。
这是将额外的自定义 Python 代码发送到集群的直接方法。只添加单个文件或压缩整个包并上传它们。如果使用
pyspark.SparkContext.addPyFile()
,即使 job 开始运行后也允许使用上传的代码。
但是不允许添加构建为
Wheels
包,因此不允许包含与本机代码的依赖关系。
使用 Conda 打包
Conda
是使用最广泛的 Python 包管理系统之一。PySpark 可以直接使用 Conda 环境通过利用
conda-pack
来传送第三方 Python 包,它是一个命令行工具,可创建可重定位的 Conda 环境。
下面的示例创建了一个 Conda 环境以在驱动程序和执行程序上使用,并将其打包到一个存档文件中。此存档文件捕获 Python 的 Conda 环境并存储 Python 解释器及其所有相关依赖项。
1 |
conda create -y -n pyspark_conda_env -c conda-forge pyarrow pandas conda-pack |
--archives
选项或
spark.archives
配置(
spark.yarn.dist.archives
在 YARN 中)将其与脚本或代码一起发送。它会自动解压缩执行程序上的存档。
spark-submit
脚本的情况下,您可以按如下方式使用它:
1 |
export PYSPARK_DRIVER_PYTHON=python # Do not set in cluster modes. |
注意
PYSPARK_DRIVER_PYTHON
不应为 YARN 集群模式设置上述内容。
如果您使用的是常规 Python shell 或 Notebook,您可以尝试如下所示:
1 |
import os |
对于 pyspark Shell:
1 |
export PYSPARK_DRIVER_PYTHON=python |
使用本机,不用集群
1 |
SPARK_CONF = SparkConf() \ |
PySpark
df.map(func)
,
df.reduce(func)
返回类型 seqRDDs
Pandas
map-reduce操作map(func, list)
,
reduce(func, list)
返回类型 seq
PySpark
没有 diff 操作(Spark 的上下行是相互独立,分布式存储的)
Pandas
有 diff 操作,处理时间序列数据(Pandas 会对比当前行与上一行)
PySpark
1 |
df.cube(column_name/column_list).count().sort('count', ascending=False) |
Pandas
df.value_counts(ascending=False)
PySpark
格式化:
df.withColum('json_string', F.to_json(F.struct('key1', 'key2')))
选择:
df.select('json_string.key')
解析:
json_schema = spark.read.json(df.rdd.map(lambda row: row.json_string)).schema
F.get_json_object('json_string', '$.key')
F.from_json(F.get_json_object('json_string', '$.key'), schema)
Pandas
格式化:
df['json_string'] = df[['key1', 'key2']].to_dict(orient='records')
选择:
df.json_string.apply(pd.Series).key
1 |
df.join(pd.concat(list(df['json_string'].apply(lambda x: pd.json_normalize(json.loads(x)))), ignore_index=True)) |
PySpark
Explode <=> Groupby:
将 xx 列中的每行的列表 / 数组值分拆形成单独的行
1 |
df.withColumn('xx', explode(F.col('yy'))) # 忽略空值或者空列表/数组 |
Pivot <=> Melt
Label Values
1 |
def melt(df, id_vars, value_vars, var_name, value_name): |
Pandas
Explode:
1 |
>> df = pd.DataFrame( {'a':['A', 'B', 'C'], 'b':[[1], [2, 3], [4, 5, 4]]}) |
Pivot <=> Melt:
Label Values
1 |
>> import numpy as np |