Python – Spark 数据分析使用
Spark是什么
Apache Spark 是用于大规模数据处理的统一分析引擎
简单来说,Spark 是一款分布式的计算框架,用于调度成百上千的服务器集群,计算TB、PB乃EB级的海量数据
Spark作为全球顶级的分布式计算框架,支持众多的编程语言进行开发,Spark重点支持Python方向。
引入Spark库
PySpark是由Spark官方开发的Py三方库,可以使用pip快速安装,并直接使用
pip install pyspark
基本入门
想要使用PySpark库完成数据处理,首先需要构建一个执行环境入口对象
PySpark的执行环境入口对象是:SparkContext 的类对象
from pyspark import SparkConf, SparkContext
# 创建SparkConf类对象
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
# 获取Spark的上下文对象
context = SparkContext(conf=conf)
# 打印Spark版本
print(context.version)
# 关闭Spark程序
context.stop()
编程模型
- 通过SparkContext对象,完成数据输入
- 输入数据后得到RDD对象,对RDD对象进行迭代计算
- 最终通过RDD对象的成员方法,完成数据输出工作
RDD对象
PySpark支持多种数据的输入,在输入完成后,都会得到一个RDD类的对象
RDD全称为:弹性分布式数据集(Resilient Distributed Datasets)
PySpark针对数据的处理,都是以RDD对象作为载体
- 数据存储在RDD内
- 各类数据的计算方法,也都是RDD的成员方法
- RDD的数据计算方法,返回的依旧是RDD对象
Py数据转RDD对象
PySpark支持通过SparkContrext对象的parallelize成员方法,把 list, tuple,set,dict,str 转换为RDD对象
from pyspark import SparkConf, SparkContext
# 创建SparkConf类对象
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
# 获取Spark的上下文对象
context = SparkContext(conf=conf)
# 把数据转为RDD对象
rdd = context.parallelize([1, 2, 3, 4])
rdd2 = context.parallelize((1, 2, 3, 4))
rdd3 = context.parallelize("abcde")
rdd4 = context.parallelize({1, 2, 3, 4, 5})
rdd5 = context.parallelize({"key1": "value1", "key2": "value2"})
# 读取文件内容转换为RDD对象
rdd6 = context.textFile("xxx.txt")
# 把RDD 对象转回列表数据
print(rdd.collect())
# 关闭Spark程序
context.stop()
RDD数据计算基础
Map 方法
PySpark的数据计算,都是基于RDD对象来进行的,自然是依赖RDD对象内置丰富的成员方法来计算。
map 算法,指的是将RDD的数据一条条处理,基于map算法中接收的处理函数,并返回新的RDD
示例:
from pyspark import SparkConf, SparkContext
import os
os.environ['PYSPARK_PYTHON'] = "C:/Users/Administrator/AppData/Local/Programs/Python/Python310/python.exe"
# 创建SparkConf类对象
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
# 获取Spark的上下文对象
context = SparkContext(conf=conf)
# 把列表数据转为RDD对象
rdd = context.parallelize([1, 2, 3, 4, 5])
def func(data):
return data * 10
# 需求:把列表中的数据都乘以10,RDD的map会把列表中的每一个成员传给来,接收一个函数
rdd.map(lambda x: x * 10)
rdd2 = rdd.map(func).map(lambda x: x + 5) # 这种和上面代码是一样的
print(rdd2.collect())
# 关闭Spark程序
context.stop()
flatMap 算法
功能,对RDD执行map操作,不过对于嵌套的列表中的数据进行解嵌套操作,如 [[1,2,3],4,5,6] 会被解嵌套为 [1,2,3,4,5,6]
示例:
from pyspark import SparkConf, SparkContext
import os
os.environ['PYSPARK_PYTHON'] = "C:/Users/Administrator/AppData/Local/Programs/Python/Python310/python.exe"
# 创建SparkConf类对象
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
# 获取Spark的上下文对象
context = SparkContext(conf=conf)
# 把列表数据转为RDD对象
rdd = context.parallelize(["Unsoft ABC ACD ADD", "TZMing CDC CBC CFC", "Tan BBA BBQ BCD"])
# 需求,把RDD 数据中的三个成员中的所有单词取到一个列表中
# 这里我们有个误区,当我们使用 map 算法时,算出来的RDD依然是三个成员
# 但这三个成员分别是分割后的单词组成的列表,也就是说外面的列表嵌套着里面的列表
rdd2 = rdd.map(lambda x: x.split(" "))
print(
rdd2.collect()) # [['Unsoft', 'ABC', 'ACD', 'ADD'], ['TZMing', 'CDC', 'CBC', 'CFC'], ['Tan', 'BBA', 'BBQ', 'BCD']]
# 所以要使用 flatMap 来对嵌套在里面的成员全部取到一个列表中
rdd3 = rdd.flatMap(lambda x: x.split(" "))
print(rdd3.collect()) # ['Unsoft', 'ABC', 'ACD', 'ADD', 'TZMing', 'CDC', 'CBC', 'CFC', 'Tan', 'BBA', 'BBQ', 'BCD']
# 关闭Spark程序
context.stop()
reduceByKey 算法
功能:针对KV型RDD,自动按照key分组,然后根据你提供的聚合逻辑,完成组内数据(value)的聚合操作
通俗的讲,就是专门用于带有KV值的,它会以Key作为分组,然后对V值进行个性化计算
用法:
rdd.reduceByKey(func)
# func: (V, V) -> V
# 接收2个传入参数(类型要一致),返回一个返回值,类型和传入要求一致
示例:针对【男】和【女】的数据聚合
from pyspark import SparkConf, SparkContext
import os
os.environ['PYSPARK_PYTHON'] = "C:/Users/Administrator/AppData/Local/Programs/Python/Python310/python.exe"
# 创建SparkConf类对象
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
# 获取Spark的上下文对象
context = SparkContext(conf=conf)
# 把列表数据转为RDD对象
rdd = context.parallelize([("男", 10), ("女", 20), ("男", 15), ("女", 9)])
# 对【男】和【女】的总体数据进行分组,并计算其值
# 其中的 x和y不是数据中的 男 和 10 ,而是两组中的 value ,如 男的 10 和 男的 15
rdd2 = rdd.reduceByKey(lambda x, y: x + y)
print(rdd2.collect()) # [('男', 25), ('女', 29)]
# 关闭Spark程序
context.stop()
Filter 算法
功能:过滤想要的数据进行保留
语法:
rdd.filter(func)
# func: (T) -> bool
传入一个米据,返回True或False,True则会被留下,False 刚会被丢弃
示例:
from pyspark import SparkConf, SparkContext
import os
os.environ['PYSPARK_PYTHON'] = "C:/Users/Administrator/AppData/Local/Programs/Python/Python310/python.exe"
# 创建SparkConf类对象
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
# 获取Spark的上下文对象
context = SparkContext(conf=conf)
# 把列表数据转为RDD对象
rdd = context.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9])
# 需求:只留下偶数,对于奇数我们丢弃
rdd2 = rdd.filter(lambda r: r % 2 == 0)
print(rdd2.collect()) # [2, 4, 6, 8]
# 关闭Spark程序
context.stop()
distinct 算法
功能:对RDD数据进行去重,返回RDD
语法:
rdd.distanct()
示例:
from pyspark import SparkConf, SparkContext
import os
os.environ['PYSPARK_PYTHON'] = "C:/Users/Administrator/AppData/Local/Programs/Python/Python310/python.exe"
# 创建SparkConf类对象
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
# 获取Spark的上下文对象
context = SparkContext(conf=conf)
# 把列表数据转为RDD对象
rdd = context.parallelize([1, 1, 3, 3, 4, 5, 6, 6, 7, 78, 8, 8])
# 需求:对上面的数据进行去重,并得到新的RDD
rdd2 = rdd.distinct()
print(rdd2.collect()) # [1, 3, 4, 5, 6, 7, 8, 78]
# 关闭Spark程序
context.stop()
sortBy 算法
功能:对RDD数据进行排序,基于自定义的排序依据
语法:
rdd.sortBy(func, ascending=False, numPartitions=1)
# func: (T) -> U 告知按照RDD中的哪个数据进行排序,比如 lambda x: x[1] 表示按照RDD中的第二列元素进行排序
# ascending True 为升序,False 为降序
# numPartitions 用多少分区排序,设为1则是所有数据都放到一个分区里进行排序
示例:
from pyspark import SparkConf, SparkContext
import os
os.environ['PYSPARK_PYTHON'] = "C:/Users/Administrator/AppData/Local/Programs/Python/Python310/python.exe"
# 创建SparkConf类对象
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
# 获取Spark的上下文对象
context = SparkContext(conf=conf)
# 把列表数据转为RDD对象
rdd = context.parallelize([("A", 4), ("B", 1), ("C", 7), ("D", 8), ("E", 2), ("F", 3), ("G", 0)])
# 需求:对上面的数据进行排序,当然是按照第二个元素作为排序标准,升序
rdd2 = rdd.sortBy(lambda x: x[1], ascending=True, numPartitions=1)
print(rdd2.collect()) #
# 关闭Spark程序
context.stop()
RDD 的数据输出
当我们把RDD数据计算完成后,我们需要对RDD数据输出为Python对象。在PySpark中有多种输出方式
collect 算法
功能,把RDD数据输出为一个列表数据
rdd.collect()
reduce 算法
功能:对RDD数据集按照你传入的逻辑进行聚合
语法:
rdd.reduce(func)
# func: (T, T) -> T
# 2参数传入 1个返回值,返回做和参数要求类型一至
# 该值会俩俩相加,最后得出相加后的结果
示例
# 生成 1~10的数
rdd = context.parallelize(range(1,10))
# 把1到10俩俩相加
rdd.reduce(lambda x, y: x + y) # 输出 55
take 算法
功能,取出RDD的前N个元素,组成list返回
语法:
rdd.take(n)
示例:
# 把列表数据转为RDD对象
rdd = context.parallelize([1, 1, 3, 3, 4, 5, 6, 6, 7, 78, 8, 8])
rdd.take(3) # [1,1,3]
count 算法
功能,统计RDD元素的个数
语法:
rdd.count()
示例:
# 把列表数据转为RDD对象
rdd = context.parallelize([1, 1, 3, 3, 4, 5, 6, 6, 7, 78, 8, 8])
rdd.count() # 12
saveAsTextFile 算法
功能:将RDD的数据写入文本文件中
语法:
rdd.saveAsTextFile("fileName")
注意:该方法依赖Hadoop,如果没有安装Hadoop的话会报错
设置 Hadoop 依赖地址:
import os
os.environ['HADOOP_HOME'] = 'HADOOP_HOME'
示例:
# 把列表数据转为RDD对象
rdd = context.parallelize([1, 1, 3, 3, 4, 5, 6, 6, 7, 78, 8, 8])
rdd.saveAsTextFile("../文件夹")
- 输出是一个文件夹,如果文件夹存在会报错
- 输出时可能会按多个分区进行写文件,如果希望把数据只写到一个文件中时,可以使用以下两种方式
- 设置全局分区数为1
-
# 创建SparkConf类对象 conf = (SparkConf() .setMaster("local[*]") # 设置当前连接Spark为本地 .setAppName("test_spark_app") # 设置Spark应用的名称 .set("spark.default.parallelism","1") # 设置全局分区数为1 )
- 设置单个RDD的分区数为1
-
# 把列表数据转为RDD对象 rdd = context.parallelize([1, 1, 3, 3, 4, 5, 6, 6, 7, 78, 8, 8] ,numSlices=1)
共有 0 条评论