文章目录
- Pyspark sql DataFrame
- 相关文章
- RDD
- repartition 重新分区
- replace 替换
- sameSemantics dataframe是否相等
- sample 采样
- sampleBy 分层采样
- schema 显示dataframe结构
- select 查询
- selectExpr 查询
- semanticHash 获取哈希值
- show 展示dataframe
- sort 排序
- sortWithinPartitions 分区按照指定列排序
- stat 返回统计函数类型
- storageLevel 获取存储级别
- subtract 获取差集
- summary 总览
- tail 从结尾获取数据
- take 返回记录
- to 配合schema返回新结构的dataframe
Pyspark sql DataFrame
相关文章
Pyspark下操作dataframe方法(1)
Pyspark下操作dataframe方法(2)
Pyspark下操作dataframe方法(3)
Pyspark下操作dataframe方法(4)
Pyspark下操作dataframe方法(5)
RDD
返回包含ROW对象的rdd
data.show()
+-----+---+---+------+
| name|age| id|gender|
+-----+---+---+------+
| ldsx| 12| 1| 男|
|test1| 20| 1| 女|
|test2| 26| 1| 男|
|test3| 19| 1| 女|
|test4| 51| 1| 女|
|test5| 13| 1| 男|
+-----+---+---+------+
data.rdd
MapPartitionsRDD[12] at javaToPython at NativeMethodAccessorImpl.java:0data.rdd.foreach(lambda x : print(type(x),x))
<class 'pyspark.sql.types.Row'> Row(name='test3', age='19', id='1', gender='女')
<class 'pyspark.sql.types.Row'> Row(name='test4', age='51', id='1', gender='女')
<class 'pyspark.sql.types.Row'> Row(name='test5', age='13', id='1', gender='男')
<class 'pyspark.sql.types.Row'> Row(name='ldsx', age='12', id='1', gender='男')
<class 'pyspark.sql.types.Row'> Row(name='test1', age='20', id='1', gender='女')
<class 'pyspark.sql.types.Row'> Row(name='test2', age='26', id='1', gender='男')
repartition 重新分区
每一个 RDD 包含的数据被存储在系统的不同节点上。逻辑上我们可以将 RDD 理解成一个大的数组,数组中的每个元素就代表一个分区 (Partition) 。
在物理存储中,每个分区指向一个存储在内存或者硬盘中的数据块 (Block) ,其实这个数据块就是每个 task 计算出的数据块,它们可以分布在不同的节点上。
RDD 只是抽象意义的数据集合,分区内部并不会存储具体的数据,只会存储它在该 RDD 中的 index,通过该 RDD 的 ID 和分区的 index 可以唯一确定对应数据块的编号,然后通过底层存储层的接口提取到数据进行处理。
data.show()
+-----+---+---+------+
| name|age| id|gender|
+-----+---+---+------+
| ldsx| 12| 1| 男|
|test1| 20| 1| 女|
|test2| 26| 1| 男|
|test3| 19| 1| 女|
|test4| 51| 1| 女|
|test5| 13| 1| 男|
+-----+---+---+------+
# 选择以某列进行分区
data.repartition('name').rdd.getNumPartitions()
1
# 指定分区数量进行分区(可以指定多列)
data.repartition(7,'name','age').rdd.getNumPartitions()
7data = data.repartition(5,'gender')
data.rdd.glom().collect()[[], [Row(name='ldsx', age='12', id='1', gender='男'), Row(name='test1', age='20', id='1', gender='女'),Row(name='test2', age='26', id='1', gender='男'), Row(name='test3', age='19', id='1', gender='女'),Row(name='test4', age='51', id='1', gender='女'), Row(name='test5', age='13', id='1', gender='男')], [], [], []]# 直接操作rdd只能按数据分区不能按照列分区
data.rdd.repartition(1).glom().collect()
[[Row(name='ldsx', age='12', id='1', gender='男'), Row(name='test2', age='26', id='1', gender='男'), Row(name='test5', age='13', id='1', gender='男'), Row(name='test1', age='20', id='1', gender='女'), Row(name='test3', age='19', id='1', gender='女'), Row(name='test4', age='51', id='1', gender='女')]]data.repartition(2,'id').rdd.glom().collect()
[[Row(name='ldsx', age='12', id='1', gender='男'), Row(name='test1', age='20', id='1', gender='女'), Row(name='test2', age='26', id='1', gender='男'), Row(name='test3', age='19', id='1', gender='女'), Row(name='test4', age='51', id='1', gender='女'), Row(name='test5', age='13', id='1', gender='男')], []]data.repartition(2).rdd.glom().collect()
[[Row(name='test2', age='26', id='1', gender='男'), Row(name='test3', age='19', id='1', gender='女')], [Row(name='test1', age='20', id='1', gender='女'), Row(name='ldsx', age='12', id='1', gender='男'), Row(name='test4', age='51', id='1', gender='女'), Row(name='test5', age='13', id='1', gender='男')]]
replace 替换
当替换的值与原本列的数据类型不相同时会报错
df.show()
+----+------+-----+
| age|height| name|
+----+------+-----+
| 10| 80|Alice|
| 5| null| Bob|
|null| 10| Tom|
|null| null| null|
+----+------+-----+
df.fillna({'age':1,'height':'2','name':"sr"}).show()
+---+------+-----+
|age|height| name|
+---+------+-----+
| 10| 80|Alice|
| 5| 2| Bob|
| 1| 10| Tom|
| 1| 2| sr|
+---+------+-----+df.na.replace(['Alice', 'Bob'], ['A', 'B'], 'name').show()
+----+------+----+
| age|height|name|
+----+------+----+
| 10| 80| A|
| 5| null| B|
|null| 10| Tom|
|null| null|null|
+----+------+----+df.show()
+----+------+-----+
| age|height| name|
+----+------+-----+
| 10| 80|Alice|
| 5| null| Bob|
|null| 10| Tom|
|null| null| null|
+----+------+-----+df.na.replace(10,12).show()
+----+------+-----+
| age|height| name|
+----+------+-----+
| 12| 80|Alice|
| 5| null| Bob|
|null| 12| Tom|
|null| null| null|
+----+------+-----+
sameSemantics dataframe是否相等
当两个 dataframe中的逻辑查询计划相等并因此返回相同的结果时,返回 True
。
data.show()
+-----+---+---+------+------+
| name|age| id|gender|new_id|
+-----+---+---+------+------+
| ldsx| 12| 1| 男| 1|
|test1| 20| 1| 女| 1|
|test2| 26| 1| 男| 1|
|test3| 19| 1| 女| 1|
|test4| 51| 1| 女| 1|
|test5| 13| 1| 男| 1|
+-----+---+---+------+------+
data2.show()
+-----+---+---+------+------+
| name|age| id|gender|new_id|
+-----+---+---+------+------+
| ldsx| 12| 1| 男| 2.0|
|test1| 20| 1| 女| 2.0|
|test2| 26| 1| 男| 2.0|
|test3| 19| 1| 女| 2.0|
|test4| 51| 1| 女| 2.0|
|test5| 13| 1| 男| 2.0|
+-----+---+---+------+------+data.sameSemantics(data2)
False
data.sameSemantics(data)
True
sample 采样
withReplacement:是否进行有放回采样,默认为False,表示进行无放回采样;设置为True时,表示进行有放回采样
fraction: 采样比例 float
seed: 随机种子值,值固定后采样获取固定默认为空
# 取样不固定
df.sample(0.1).show()
+---+
| id|
+---+
+---+
df.sample(0.1).show()
+---+
| id|
+---+
| 9|
+---+
df.sample(0.1).show()
+---+
| id|
+---+
| 1|
| 5|
+---+# 随机种子固定,取样固定
df.sample(0.1,1).show()
+---+
| id|
+---+
| 3|
+---+
df.sample(0.1,1).show()
+---+
| id|
+---+
| 3|
+---+
sampleBy 分层采样
col:列名
fractions: 采样字典
seed: 随机种子值,值固定后采样获取固定默认为空
ataset = spark.range(0, 100).select((col("id") % 3).alias("key"))
dataset.show()+---+
|key|
+---+
| 0|
| 1|
| 2|
| 0|
| 1|
| 2|
...
...
| 0|
| 1|
| 2|
| 0|
| 1|
+---+# 列为key,中值为0取样10%,值为1取样10%,值为2取样10%
dataset.sampleBy("key", fractions={0: 0.1, 1: 0.1,2:0.1}, seed=0).show()
+---+
|key|
+---+
| 2|
| 0|
| 1|
| 2|
| 1|
| 2|
| 2|
| 1|
| 2|
+---+
# 列为key,中值为0取样10%,值为2取样10%
dataset.sampleBy("key", fractions={0: 0.1,2:0.1}, seed=0).show()
+---+
|key|
+---+
| 2|
| 0|
| 2|
| 2|
| 2|
| 2|
+---+
schema 显示dataframe结构
将此DataFrame的架构作为pyspark.sql.types返回
df.schema
StructType([StructField('id', LongType(), False)])df.printSchema()
root|-- id: long (nullable = false)
select 查询
查询并返回新dataframe,可结合多方法使用是。
df = spark.createDataFrame([(2, "Alice"), (5, "Bob")], schema=["age", "name"])df.select('*').show()
+---+-----+
|age| name|
+---+-----+
| 2|Alice|
| 5| Bob|
+---+-----+df.select(df.name, (df.age + 10).alias('age')).show()
+-----+---+
| name|age|
+-----+---+
|Alice| 12|
| Bob| 15|
+-----+---+
selectExpr 查询
接受sql表达式并执行
df = spark.createDataFrame([(2, "Alice"), (5, "Bob")], schema=["age", "name"])
df.show()
+---+-----+
|age| name|
+---+-----+
| 2|Alice|
| 5| Bob|
+---+-----+
df.selectExpr('age * 2','age+2').show()
+---------+---------+
|(age * 2)|(age + 2)|
+---------+---------+
| 4| 4|
| 10| 7|
+---------+---------+df.selectExpr('age * 2 as ldsx','age+2').show()
+----+---------+
|ldsx|(age + 2)|
+----+---------+
| 4| 4|
| 10| 7|
+----+---------+
semanticHash 获取哈希值
df.selectExpr('age * 2 as ldsx','age+2').semanticHash()
-2082183221
df.semanticHash()
1019336781
show 展示dataframe
展示前n行数据到控制台,默认展示20行
df.show(1)
+---+-----+
|age| name|
+---+-----+
| 2|Alice|
+---+-----+
only showing top 1 row
sort 排序
按照指定列排序
from pyspark.sql.functions import desc, asc
# 下面方式效果一致
df.sort(desc('age')).show()
df.sort("age", ascending=False).show()
df.orderBy(df.age.desc()).show()
+---+-----+
|age| name|
+---+-----+
| 5| Bob|
| 2|Alice|
| 2| Bob|
+---+-----+# 使用两列排序,一列降序,一列默认(升序)
df.orderBy(desc("age"), "name").show()
+---+-----+
|age| name|
+---+-----+
| 5| Bob|
| 2|Alice|
| 2| Bob|
+---+-----+
# 使用两列排序,都为降序
df.orderBy(desc("age"), desc("name")).show()
+---+-----+
|age| name|
+---+-----+
| 5| Bob|
| 2| Bob|
| 2|Alice|
+---+-----+# 两列都为降序
df.orderBy(["age", "name"], ascending=[False, False]).show()
+---+-----+
|age| name|
+---+-----+
| 5| Bob|
| 2| Bob|
| 2|Alice|
+---+-----+
sortWithinPartitions 分区按照指定列排序
df.sortWithinPartitions('age').show()
+---+-----+
|age| name|
+---+-----+
| 2|Alice|
| 2| Bob|
| 5| Bob|
+---+-----+
stat 返回统计函数类型
df.stat
<pyspark.sql.dataframe.DataFrameStatFunctions object at 0x7f55c87669e8>
storageLevel 获取存储级别
df.storageLevel
StorageLevel(False, False, False, False, 1)
df.cache().storageLevel
StorageLevel(True, True, False, True, 1)
subtract 获取差集
返回一个新的DataFrame,其中包含此DataFrame中的行,但不包含另一个DataFrame中。d1.subtarct(d2),获取d1的差集。
df1 = spark.createDataFrame([("a", 1), ("a", 1), ("b", 3), ("c", 4)], ["C1", "C2"])
df2 = spark.createDataFrame([("a", 1), ("a", 1), ("b", 3)], ["C1", "C2"])
df1.show()
+---+---+
| C1| C2|
+---+---+
| a| 1|
| a| 1|
| b| 3|
| c| 4|
+---+---+
df2.show()
+---+---+
| C1| C2|
+---+---+
| a| 1|
| a| 1|
| b| 3|
+---+---+
df1.subtract(df2).show()
+---+---+
| C1| C2|
+---+---+
| c| 4|
+---+---+
summary 总览
计算数值列和字符串列的指定统计信息。可用的统计数据有:-count-mean-stddev-min-max-指定为百分比的任意近似百分位数
如果没有给出统计数据,此函数将计算计数、平均值、标准偏差、最小值、近似四分位数(百分位数分别为25%、50%和75%)和最大值。
df.show()
+-----+---+------+------+
| name|age|weight|height|
+-----+---+------+------+
| Bob| 13| 40.3| 150.5|
|Alice| 12| 37.8| 142.3|
| Tom| 11| 44.1| 142.2|
+-----+---+------+------+df.summary().show()
24/09/19 11:24:13 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.+-------+-----+----+------------------+-----------------+
|summary| name| age| weight| height|
+-------+-----+----+------------------+-----------------+
| count| 3| 3| 3| 3|
| mean| null|12.0|40.733333333333334| 145.0|
| stddev| null| 1.0| 3.172275734127371|4.763402145525822|
| min|Alice| 11| 37.8| 142.2|
| 25%| null| 11| 37.8| 142.2|
| 50%| null| 12| 40.3| 142.3|
| 75%| null| 13| 44.1| 150.5|
| max| Tom| 13| 44.1| 150.5|
+-------+-----+----+------------------+-----------------+
tail 从结尾获取数据
运行尾部需要将数据移动到应用程序的驱动程序进程中,如果使用非常大的num,可能会导致驱动程序进程因OutOfMemoryError而崩溃。
df.show()
+-----+---+------+------+
| name|age|weight|height|
+-----+---+------+------+
| Bob| 13| 40.3| 150.5|
|Alice| 12| 37.8| 142.3|
| Tom| 11| 44.1| 142.2|
+-----+---+------+------+
df.tail(2)
[Row(name='Alice', age=12, weight=37.8, height=142.3), Row(name='Tom', age=11, weight=44.1, height=142.2)]
take 返回记录
head 调用的就是take,take调用的limit
# 源码def take(self, num: int) -> List[Row]:"""Returns the first ``num`` rows as a :class:`list` of :class:`Row`... versionadded:: 1.3.0.. versionchanged:: 3.4.0Supports Spark Connect.Parameters----------num : intNumber of records to return. Will return this number of recordsor all records if the DataFrame contains less than this number of records..Returns-------listList of rowsExamples-------->>> df = spark.createDataFrame(... [(14, "Tom"), (23, "Alice"), (16, "Bob")], ["age", "name"])Return the first 2 rows of the :class:`DataFrame`.>>> df.take(2)[Row(age=14, name='Tom'), Row(age=23, name='Alice')]"""return self.limit(num).collect()
to 配合schema返回新结构的dataframe
from pyspark.sql.types import StructField, StringType
df = spark.createDataFrame([("a", 1)], ["i", "j"])
df.show()
+---+---+
| i| j|
+---+---+
| a| 1|
+---+---+
df.schema
StructType([StructField('i', StringType(), True), StructField('j', LongType(), True)])# 设置新的scheam
schema = StructType([StructField("j", StringType()), StructField("i", StringType())])
df.schema
StructType([StructField('i', StringType(), True), StructField('j', LongType(), True)])# df使用新的scheam进行转换,查看scheam
df.to(schema).schema
# 顺序改变,字段类型改变
StructType([StructField('j', StringType(), True), StructField('i', StringType(), True)])
df.to(schema).show()
+---+---+
| j| i|
+---+---+
| 1| a|
+---+---+# 当schema设置原df不存在的列,则会默认补充null
schema = StructType([StructField("q", StringType()), StructField("w", StringType()),StructField("i", StringType())])
df.to(schema).show()
+----+----+---+
| q| w| i|
+----+----+---+
|null|null| a|
+----+----+---+