您的位置:首页 > 健康 > 美食 > 从0开始学习pyspark--pyspark中的Spark DataFrame, Spark SQL, Pandas on Spark[第3节]

从0开始学习pyspark--pyspark中的Spark DataFrame, Spark SQL, Pandas on Spark[第3节]

2024/10/6 20:38:39 来源:https://blog.csdn.net/weixin_43817712/article/details/140105725  浏览:    关键词:从0开始学习pyspark--pyspark中的Spark DataFrame, Spark SQL, Pandas on Spark[第3节]

引言

Apache Spark 是一个开源的分布式计算系统,旨在实现大数据处理的快速和通用。PySpark 是 Spark 的 Python API,使 Python 用户能够利用 Spark 的强大功能。本文将详细探讨 PySpark 的几个核心概念:Spark DataFrame、Spark SQL 和 Pandas on Spark,并通过代码示例进行详细讲解。

Spark DataFrame

Spark DataFrame 是 Spark SQL 的核心数据结构,它是一个分布式的、不可变的表格型数据集合。DataFrame 提供了丰富的 API 用于数据操作,如选择、过滤、聚合等。

为什么选择 Spark DataFrame?
  1. 强大的 API:DataFrame 提供了丰富的 API,用于各种数据操作,包括选择、过滤、聚合、连接等。
  2. 优化的执行计划:利用 Catalyst 优化器,Spark DataFrame 的执行计划能够得到优化,从而提高性能。
  3. 扩展性和灵活性:DataFrame 可以处理各种类型的数据源,并能与 Spark 的其他组件无缝集成。
基本用法
from pyspark.sql import SparkSession# 创建 SparkSession
spark = SparkSession.builder \.appName("Spark DataFrame Example") \.getOrCreate()# 创建 DataFrame
data = [("Alice", 25), ("Bob", 30), ("Cathy", 45)]
columns = ["Name", "Age"]
df = spark.createDataFrame(data, columns)# 显示数据
df.show()# 数据操作示例:过滤年龄大于30的人
df_filtered = df.filter(df['Age'] > 30)
df_filtered.show()# 数据分组和聚合:按年龄分组并计数
df_grouped = df.groupBy("Age").count()
df_grouped.show()
进阶用法
# 读取 JSON 文件
df = spark.read.json("path/to/json/file.json")# 数据清洗:删除重复行
df = df.dropDuplicates()# 数据转换:转换列的数据类型
df = df.withColumn("Age", df["Age"].cast("integer"))# 数据聚合:按国家分组并计算总收入
total_income_by_country = df.groupBy("Country").sum("Income")
total_income_by_country.show()

Spark SQL

Spark SQL 是 Spark 的一个模块,提供了对结构化数据的处理功能。它允许你使用 SQL 查询数据,并可以与 Spark 的其他 API 无缝集成。Spark SQL 支持多种数据源,包括 Hive 表、Parquet 文件、JSON 文件等。

为什么选择 Spark SQL?
  1. 熟悉的 SQL 语法:对于熟悉 SQL 的用户,Spark SQL 提供了一种直接且高效的方式来查询数据。
  2. 灵活的数据源:支持多种数据源,能够处理各种格式的结构化数据。
  3. 强大的优化器:Spark SQL 内置的 Catalyst 优化器能够自动优化查询计划,提高查询效率。
基本用法
from pyspark.sql import SparkSession# 创建 SparkSession
spark = SparkSession.builder \.appName("Spark SQL Example") \.getOrCreate()# 创建一个 DataFrame
data = [("Alice", 25), ("Bob", 30), ("Cathy", 45)]
columns = ["Name", "Age"]
df = spark.createDataFrame(data, columns)# 创建一个临时视图
df.createOrReplaceTempView("people")# 使用 Spark SQL 查询数据
result = spark.sql("SELECT * FROM people WHERE Age > 30")
result.show()
进阶用法
# 读取 Parquet 文件
df = spark.read.parquet("path/to/parquet/file.parquet")# 创建全局视图
df.createGlobalTempView("global_people")# 使用 SQL 查询并聚合数据
average_age = spark.sql("SELECT AVG(Age) as avg_age FROM global_temp.global_people")
average_age.show()

Pandas on Spark

PySpark Pandas API 是一个用于处理数据的高效工具,它结合了 Pandas 的简单性和 Spark 的分布式计算能力。使用 Pandas API,你可以以类似于 Pandas 的方式进行数据操作,但底层的计算由 Spark 处理,从而实现了大规模数据处理的能力。

为什么选择 Pandas on Spark?
  1. 熟悉的接口:对于已经熟悉 Pandas 的用户,Pandas on Spark 提供了几乎相同的 API,使得学习曲线更平缓。
  2. 分布式计算:与 Pandas 只能在单台机器上运行不同,Pandas on Spark 能够在集群上运行,从而处理更大规模的数据。
  3. 高效的数据处理:利用 Spark 的优化器和分布式执行引擎,数据处理更加高效。
基本用法
import pyspark.pandas as ps# 创建一个 Pandas API DataFrame
df = ps.DataFrame({'Name': ['Alice', 'Bob', 'Cathy'],'Age': [25, 30, 45]
})# 显示数据
print(df)# 基本数据操作:增加年龄
df['Age'] = df['Age'] + 1
print(df[df['Age'] > 25])
进阶用法
# 读取 CSV 文件
df = ps.read_csv("path/to/csv/file.csv")# 数据清洗:去除缺失值
df = df.dropna()# 数据转换:创建一个新列
df['AgeGroup'] = df['Age'].apply(lambda x: 'Adult' if x >= 18 else 'Child')# 数据聚合:按年龄分组并计算平均收入
average_income = df.groupby('AgeGroup')['Income'].mean()
print(average_income)

在讨论 PySpark 的 Pandas API 和 Spark DataFrame 的性能时,我们需要考虑它们的设计目的和使用场景:

Pandas on Spark和 Spark DataFrame 的对比

Apache Spark DataFrame和Spark Pandas API是Spark中处理结构化数据的两种不同方式,它们各自有不同的使用场景和优势。以下是它们之间的主要区别:

  1. API和编程范式:
    • DataFrame: Spark DataFrame API是基于Spark SQL的,它提供了一个声明式的API来处理结构化数据。用户可以使用DSL(Domain Specific Language)进行数据转换和操作,这些操作会在Spark的优化器中生成逻辑和物理执行计划,然后分布式执行。
    • Pandas on Spark:Pandas on Spark允许用户使用Pandas的API来处理大数据。它提供了一个类似于Pandas的API,可以用于在Spark集群上执行Pandas操作。这允许用户利用Pandas丰富的数据操作功能来处理大规模数据集。
  2. 数据大小和分布式处理:
    • DataFrame: Spark DataFrame专为分布式数据处理而设计。它可以处理远超过单机内存容量的数据集,因为数据被分布在集群中的多个节点上,并且操作是并行执行的。
    • Pandas on Spark: 虽然Pandas on Spark可以在Spark集群上运行Pandas操作,但它更适合于可以放入单个节点内存的数据集。对于大规模数据集,用户需要确保操作可以在分布式环境中有效执行。
  3. 性能和优化:
    • DataFrame: Spark DataFrame可以利用Spark SQL的 Catalyst 优化器和 Tungsten 执行引擎,这些优化器可以对查询进行优化,以提高执行效率。
    • Spark Pandas API: Pandas on Spark的性能取决于Pandas操作的性能以及这些操作在分布式环境中的适应性。某些Pandas操作可能需要修改才能在Spark中有效执行。
  4. 易用性和熟悉度:
    • DataFrame: 对于熟悉SQL或RDBMS的用户来说,DataFrame API可能更容易上手,因为它提供了类似于SQL的查询语言。
    • Pandas on Spark: 对于熟悉Pandas库的用户来说,Pandas on Spark提供了一个更自然的过渡,因为它允许他们使用已知的Pandas语法和函数。
  5. 集成和生态系统:
    • DataFrame: Spark DataFrame与Spark生态系统(包括Spark Streaming、MLlib、GraphX等)紧密集成,可以轻松地在不同的Spark组件之间转换数据。
    • Pandas on Spark: 虽然Pandas on Spark可以与Spark生态系统集成,但它主要是为了简化Pandas用户的数据处理工作流程,而不是为了与其他Spark组件紧密集成。
      总结来说,Spark DataFrame是一个强大的分布式数据处理工具,适合于大规模数据集和需要复杂ETL操作的场景。而Spark Pandas API提供了一个更加用户友好和易于使用的界面,特别适合于那些已经熟悉Pandas并且需要处理中小规模数据集的用户。选择哪种工具取决于具体的应用场景、数据大小以及用户的熟悉程度。

总结

通过本文的介绍,我们详细了解了 PySpark 的几个核心概念:Spark DataFrame、Spark SQL 和 Pandas API。Spark DataFrame 是一种高效的数据结构,适合大规模数据操作;Spark SQL 提供了强大的 SQL 查询能力;Pandas API 使我们可以使用熟悉的 Pandas 语法处理大规模数据。掌握这些工具,可以大大提升你在大数据处理中的效率和能力。

希望这篇博客能帮助你更好地理解和使用 PySpark。如果你有任何问题或需要进一步的指导,请随时留言讨论。

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com