Python – Spark 数据分析使用

Spark是什么

Apache Spark 是用于大规模数据处理的统一分析引擎

简单来说,Spark 是一款分布式的计算框架,用于调度成百上千的服务器集群,计算TB、PB乃EB级的海量数据

Spark作为全球顶级的分布式计算框架,支持众多的编程语言进行开发,Spark重点支持Python方向。

 

引入Spark库

PySpark是由Spark官方开发的Py三方库,可以使用pip快速安装,并直接使用

pip install pyspark

 

基本入门

想要使用PySpark库完成数据处理,首先需要构建一个执行环境入口对象

PySpark的执行环境入口对象是:SparkContext 的类对象

from pyspark import SparkConf, SparkContext

# 创建SparkConf类对象
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")

# 获取Spark的上下文对象
context = SparkContext(conf=conf)

# 打印Spark版本
print(context.version)

# 关闭Spark程序
context.stop()

 

编程模型

 

  • 通过SparkContext对象,完成数据输入
  • 输入数据后得到RDD对象,对RDD对象进行迭代计算
  • 最终通过RDD对象的成员方法,完成数据输出工作

 

RDD对象

PySpark支持多种数据的输入,在输入完成后,都会得到一个RDD类的对象

RDD全称为:弹性分布式数据集(Resilient Distributed Datasets)

PySpark针对数据的处理,都是以RDD对象作为载体

  • 数据存储在RDD内
  • 各类数据的计算方法,也都是RDD的成员方法
  • RDD的数据计算方法,返回的依旧是RDD对象

 

Py数据转RDD对象

PySpark支持通过SparkContrext对象的parallelize成员方法,把 list, tuple,set,dict,str 转换为RDD对象

from pyspark import SparkConf, SparkContext

# 创建SparkConf类对象
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")

# 获取Spark的上下文对象
context = SparkContext(conf=conf)

# 把数据转为RDD对象
rdd = context.parallelize([1, 2, 3, 4])
rdd2 = context.parallelize((1, 2, 3, 4))
rdd3 = context.parallelize("abcde")
rdd4 = context.parallelize({1, 2, 3, 4, 5})
rdd5 = context.parallelize({"key1": "value1", "key2": "value2"})

# 读取文件内容转换为RDD对象
rdd6 = context.textFile("xxx.txt")

# 把RDD 对象转回列表数据
print(rdd.collect())

# 关闭Spark程序
context.stop()

 

RDD数据计算基础

Map 方法

PySpark的数据计算,都是基于RDD对象来进行的,自然是依赖RDD对象内置丰富的成员方法来计算。

map 算法,指的是将RDD的数据一条条处理,基于map算法中接收的处理函数,并返回新的RDD

示例:

from pyspark import SparkConf, SparkContext
import os

os.environ['PYSPARK_PYTHON'] = "C:/Users/Administrator/AppData/Local/Programs/Python/Python310/python.exe"

# 创建SparkConf类对象
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")

# 获取Spark的上下文对象
context = SparkContext(conf=conf)

# 把列表数据转为RDD对象
rdd = context.parallelize([1, 2, 3, 4, 5])


def func(data):
    return data * 10


# 需求:把列表中的数据都乘以10,RDD的map会把列表中的每一个成员传给来,接收一个函数
rdd.map(lambda x: x * 10)
rdd2 = rdd.map(func).map(lambda x: x + 5)  # 这种和上面代码是一样的

print(rdd2.collect())

# 关闭Spark程序
context.stop()

 

flatMap 算法

功能,对RDD执行map操作,不过对于嵌套的列表中的数据进行解嵌套操作,如 [[1,2,3],4,5,6] 会被解嵌套为 [1,2,3,4,5,6]

示例:

from pyspark import SparkConf, SparkContext
import os

os.environ['PYSPARK_PYTHON'] = "C:/Users/Administrator/AppData/Local/Programs/Python/Python310/python.exe"

# 创建SparkConf类对象
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")

# 获取Spark的上下文对象
context = SparkContext(conf=conf)

# 把列表数据转为RDD对象
rdd = context.parallelize(["Unsoft ABC ACD ADD", "TZMing CDC CBC CFC", "Tan BBA BBQ BCD"])

# 需求,把RDD 数据中的三个成员中的所有单词取到一个列表中
# 这里我们有个误区,当我们使用 map 算法时,算出来的RDD依然是三个成员
# 但这三个成员分别是分割后的单词组成的列表,也就是说外面的列表嵌套着里面的列表
rdd2 = rdd.map(lambda x: x.split(" "))

print(
    rdd2.collect())  # [['Unsoft', 'ABC', 'ACD', 'ADD'], ['TZMing', 'CDC', 'CBC', 'CFC'], ['Tan', 'BBA', 'BBQ', 'BCD']]

# 所以要使用 flatMap 来对嵌套在里面的成员全部取到一个列表中
rdd3 = rdd.flatMap(lambda x: x.split(" "))
print(rdd3.collect())  # ['Unsoft', 'ABC', 'ACD', 'ADD', 'TZMing', 'CDC', 'CBC', 'CFC', 'Tan', 'BBA', 'BBQ', 'BCD']

# 关闭Spark程序
context.stop()

 

reduceByKey 算法

功能:针对KV型RDD,自动按照key分组,然后根据你提供的聚合逻辑,完成组内数据(value)的聚合操作

通俗的讲,就是专门用于带有KV值的,它会以Key作为分组,然后对V值进行个性化计算

用法:

rdd.reduceByKey(func)
# func: (V, V) -> V
# 接收2个传入参数(类型要一致),返回一个返回值,类型和传入要求一致

 

示例:针对【男】和【女】的数据聚合

from pyspark import SparkConf, SparkContext
import os

os.environ['PYSPARK_PYTHON'] = "C:/Users/Administrator/AppData/Local/Programs/Python/Python310/python.exe"

# 创建SparkConf类对象
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")

# 获取Spark的上下文对象
context = SparkContext(conf=conf)

# 把列表数据转为RDD对象
rdd = context.parallelize([("男", 10), ("女", 20), ("男", 15), ("女", 9)])

# 对【男】和【女】的总体数据进行分组,并计算其值
# 其中的 x和y不是数据中的 男 和 10 ,而是两组中的 value ,如 男的 10 和 男的 15
rdd2 = rdd.reduceByKey(lambda x, y: x + y)

print(rdd2.collect())  # [('男', 25), ('女', 29)]

# 关闭Spark程序
context.stop()

 

Filter 算法

功能:过滤想要的数据进行保留

语法:

rdd.filter(func)
# func: (T) -> bool  传入一个米据,返回True或False,True则会被留下,False 刚会被丢弃

示例:

from pyspark import SparkConf, SparkContext
import os

os.environ['PYSPARK_PYTHON'] = "C:/Users/Administrator/AppData/Local/Programs/Python/Python310/python.exe"

# 创建SparkConf类对象
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")

# 获取Spark的上下文对象
context = SparkContext(conf=conf)

# 把列表数据转为RDD对象
rdd = context.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9])

# 需求:只留下偶数,对于奇数我们丢弃
rdd2 = rdd.filter(lambda r: r % 2 == 0)

print(rdd2.collect())  # [2, 4, 6, 8]

# 关闭Spark程序
context.stop()

 

distinct 算法

功能:对RDD数据进行去重,返回RDD

语法:

rdd.distanct()

示例:

from pyspark import SparkConf, SparkContext
import os

os.environ['PYSPARK_PYTHON'] = "C:/Users/Administrator/AppData/Local/Programs/Python/Python310/python.exe"

# 创建SparkConf类对象
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")

# 获取Spark的上下文对象
context = SparkContext(conf=conf)

# 把列表数据转为RDD对象
rdd = context.parallelize([1, 1, 3, 3, 4, 5, 6, 6, 7, 78, 8, 8])

# 需求:对上面的数据进行去重,并得到新的RDD
rdd2 = rdd.distinct() 

print(rdd2.collect())  # [1, 3, 4, 5, 6, 7, 8, 78]

# 关闭Spark程序
context.stop()

 

 

sortBy 算法

功能:对RDD数据进行排序,基于自定义的排序依据

语法:

rdd.sortBy(func, ascending=False, numPartitions=1)
# func: (T) -> U 告知按照RDD中的哪个数据进行排序,比如 lambda x: x[1] 表示按照RDD中的第二列元素进行排序
# ascending True 为升序,False 为降序
# numPartitions 用多少分区排序,设为1则是所有数据都放到一个分区里进行排序

示例:

from pyspark import SparkConf, SparkContext
import os

os.environ['PYSPARK_PYTHON'] = "C:/Users/Administrator/AppData/Local/Programs/Python/Python310/python.exe"

# 创建SparkConf类对象
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")

# 获取Spark的上下文对象
context = SparkContext(conf=conf)

# 把列表数据转为RDD对象
rdd = context.parallelize([("A", 4), ("B", 1), ("C", 7), ("D", 8), ("E", 2), ("F", 3), ("G", 0)])

# 需求:对上面的数据进行排序,当然是按照第二个元素作为排序标准,升序
rdd2 = rdd.sortBy(lambda x: x[1], ascending=True, numPartitions=1)

print(rdd2.collect())  #

# 关闭Spark程序
context.stop()

 

 

RDD 的数据输出

当我们把RDD数据计算完成后,我们需要对RDD数据输出为Python对象。在PySpark中有多种输出方式

 

collect 算法

功能,把RDD数据输出为一个列表数据

rdd.collect()

 

reduce 算法

功能:对RDD数据集按照你传入的逻辑进行聚合

语法:

rdd.reduce(func)
# func: (T, T) -> T
# 2参数传入 1个返回值,返回做和参数要求类型一至
# 该值会俩俩相加,最后得出相加后的结果

示例

# 生成 1~10的数
rdd = context.parallelize(range(1,10))

# 把1到10俩俩相加
rdd.reduce(lambda x, y: x + y)  # 输出 55 

 

take 算法

功能,取出RDD的前N个元素,组成list返回

语法:

rdd.take(n)

示例:

# 把列表数据转为RDD对象
rdd = context.parallelize([1, 1, 3, 3, 4, 5, 6, 6, 7, 78, 8, 8])
rdd.take(3)  # [1,1,3]

 

count 算法

功能,统计RDD元素的个数

语法:

rdd.count() 

 

示例:

# 把列表数据转为RDD对象
rdd = context.parallelize([1, 1, 3, 3, 4, 5, 6, 6, 7, 78, 8, 8])
rdd.count()  # 12

 

saveAsTextFile 算法

功能:将RDD的数据写入文本文件中

语法:

rdd.saveAsTextFile("fileName")

注意:该方法依赖Hadoop,如果没有安装Hadoop的话会报错

设置 Hadoop 依赖地址:

import os

os.environ['HADOOP_HOME'] = 'HADOOP_HOME'

 

示例:

# 把列表数据转为RDD对象
rdd = context.parallelize([1, 1, 3, 3, 4, 5, 6, 6, 7, 78, 8, 8])
rdd.saveAsTextFile("../文件夹")

 

  • 输出是一个文件夹,如果文件夹存在会报错
  • 输出时可能会按多个分区进行写文件,如果希望把数据只写到一个文件中时,可以使用以下两种方式
    • 设置全局分区数为1
    • # 创建SparkConf类对象
      conf = (SparkConf()
              .setMaster("local[*]")  # 设置当前连接Spark为本地
              .setAppName("test_spark_app")  # 设置Spark应用的名称
              .set("spark.default.parallelism","1")  # 设置全局分区数为1
              )
    • 设置单个RDD的分区数为1
    • # 把列表数据转为RDD对象
      rdd = context.parallelize([1, 1, 3, 3, 4, 5, 6, 6, 7, 78, 8, 8] ,numSlices=1)

 

如果您喜欢本站,点击这儿不花一分钱捐赠本站

这些信息可能会帮助到你: 下载帮助 | 报毒说明 | 进站必看

修改版本安卓软件,加群提示为修改者自留,非本站信息,注意鉴别

THE END
分享
二维码
打赏
海报
Python – Spark 数据分析使用
Spark是什么 Apache Spark 是用于大规模数据处理的统一分析引擎 简单来说,Spark 是一款分布式的计算框架,用于调度成百上千的服务器集群,计算TB、PB乃EB级的海量数据 Spark作为全球顶级的分……
<<上一篇
下一篇>>