基本介绍
Apache Spark是专为大规模数据处理而规划的快速通用的核算引擎。现在形成一个高速开展运用广泛的生态体系。
特色介绍
Spark 首要有三个特色:
首要,高级 API 剥离了对集群自身的重视,Spark 运用开发者能够专心于运用所要做的核算自身。
其次,Spark 很快,支撑交互式核算和复杂算法。
最后,Spark 是一个通用引擎,可用它来完结各式各样的运算,包含 SQL 查询、文本处理、机器学习等,而在 Spark 出现之前,咱们一般需求学习各式各样的引擎来别离处理这些需求。(来源百度百科)
park对python语言的支撑—>PySpark
Spark对Python语言的支撑,要点体现在Python的第三方库: PySpark
PySpark是由Spark官方开发的Python语言第三方库Python开发者能够运用pip程序快速的装置PySpark并像其它三方库那样直接运用。
根底预备
PySpark库的装置
在命令行中输入:pip install pyspark
运用国内署理镜像网站:pip install -i https://pypi.tuna.tsinghua.edu.cn/simple pyspark
PySpark履行环境进口目标的构建
想要运用PySpark库完结数据处理,首要需求构建一个履行环境进口目标。
PySpark的履行环境进口目标是:类SparkContext的类目标
如何经过代码获得类目标,代码如下:
from pyspark import SparkConf, SparkContext
# 创立SparkConf类目标
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app") # 链式调用,不论调用什么样的办法,回来的都是同一个目标
# 根据SparkConf类目标创立SparkContext目标
sc = SparkContext(conf=conf)
# 打印pyspark的运转版本
print(sc.version)
# 中止SparkContext目标的运转(中止pyspark程序)
sc.stop()
PySpark的编程模型
PySpark的编程,首要分为如下三大过程:
数据输入
RDD目标
PySpark支撑多种数据的输入,在输入完结后,都会得到一个:RDD类的目标
RDD全称为: 弹性分布式数据集(Resilient Distributed Datasets)
PySpark针对数据的处理,都是以RDD目标作为载体即:
(1)数据存储在RDD内
(2)各类数据的核算办法,也都是RDD的成员办法
(3)RDD的数据核算办法,回来值依旧是RDD目标
Python数据容器转RDD目标
PySpark支撑经过SparkContext目标的parallelize成员办法,将list、tuple、set、dict、str转换为PySpark的RDD目标。
from pyspark import SparkConf, SparkContext
# 创立SparkConf类目标
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app") # 链式调用,不论调用什么样的办法,回来的都是同一个目标
# 根据SparkConf类目标创立SparkContext目标
sc = SparkContext(conf=conf)
rdd1 = sc.parallelize([1, 2, 3, 4, 5]) # 列表
rdd2 = sc.parallelize((1, 2, 3, 4, 5)) # 元组
rdd3 = sc.parallelize("study python") # 字符串
rdd4 = sc.parallelize({1, 2, 3, 4, 5}) # 集合
rdd5 = sc.parallelize({"key1": "value1", "key2": "value2", "key3": "value3"}) # 字典
# 假如要检查RDD里边有什么内容,需求用collect()办法
print(rdd1.collect())
print(rdd2.collect())
print(rdd3.collect())
print(rdd4.collect())
print(rdd5.collect())
# # 中止SparkContext目标的运转(中止pyspark程序)
sc.stop()
注意:
(1)字符串会被拆分出1个个的字符,存入RDD目标字典
(2)仅有key会被存入RDD目标
读取文件转RDD目标
PySpark支撑经过SparkContext进口目标,来读取文件并构建出RDD目标
前面逻辑都是相同的,仅仅调用办法不相同,需求运用sc.textFile
from pyspark import SparkConf, SparkContext
# 创立SparkConf类目标
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app") # 链式调用,不论调用什么样的办法,回来的都是同一个目标
# 根据SparkConf类目标创立SparkContext目标
sc = SparkContext(conf=conf)
rdd = sc.textFile("G:\材料\2011年1月销售数据.txt")
# 假如要检查RDD里边有什么内容,需求用collect()办法
print(rdd.collect())
# # 中止SparkContext目标的运转(中止pyspark程序)
sc.stop()
数据核算
PySpark的数据核算,都是依靠RDD目标内置丰厚的“成员办法(算子)”来进行的
由于spark是一个分布式程序,内部运转机制比较复杂,暂不讨论,咱们只需求知道在python中运转spark程序时,需求额外增加以下代码,否则会报错:spark找不到python程序
# 增加'PYSPARK_PYTHON'和'PYSPARK_DRIVER_PYTHON'的履行环境
os.environ['PYSPARK_PYTHON'] = sys.executable # "E:/python/python.exe"
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable # "E:/python/python.exe"
(1)map办法
功用:将RDD的数据一条条处理,回来新的RDD
语法:rdd.map(fun)
# 需求传入一个函数
需求:给[1,2,3,4,5]
每个数字乘以10
代码如下:
import sys
from pyspark import SparkConf, SparkContext
import os
# 增加'PYSPARK_PYTHON'和'PYSPARK_DRIVER_PYTHON'的履行环境
os.environ['PYSPARK_PYTHON'] = sys.executable # "E:/python/python.exe"
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable # "E:/python/python.exe"
# 创立SparkConf类目标
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app") # 链式调用,不论调用什么样的办法,回来的都是同一个目标
# 根据SparkConf类目标创立SparkContext目标
sc = SparkContext(conf=conf)
# 需求:经过map办法将悉数数据都乘以10
rdd = sc.parallelize([1, 2, 3, 4, 5])
# 把rdd中的每一个数据都调用匿名函数(也可经过def从头界说函数)去处理
rdd2 = rdd.map(lambda x: x * 10)
print(rdd2.collect())
# 中止SparkContext目标的运转(中止pyspark程序)
sc.stop()
(2)flatMap办法
功用:对RDD履行map操作,然后进行解除嵌套操作
# 嵌套的list
list = [[1,2,3],[4,5,6],[7,8,9]]
# 解除了嵌套
list = [1,2,3,4,5,6,7,8,9]
示例代码:
import sys
from pyspark import SparkConf, SparkContext
import os
# 增加'PYSPARK_PYTHON'和'PYSPARK_DRIVER_PYTHON'的履行环境
os.environ['PYSPARK_PYTHON'] = sys.executable # "E:/python/python.exe"
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable # "E:/python/python.exe"
# 创立SparkConf类目标
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app") # 链式调用,不论调用什么样的办法,回来的都是同一个目标
# 根据SparkConf类目标创立SparkContext目标
sc = SparkContext(conf=conf)
rdd = sc.parallelize(["hello python", "hello word", "hello friend"])
# 需求:将RDD中的每一个单词都提取出来
rdd2 = rdd.flatMap(lambda x: x.split(" "))
print(rdd2.collect())
(3)reduceByKey办法
功用:针对KV型RDD(二元元组),自动依照key分组,然后根据你提供的聚合逻辑,完结组内数据(value)的聚合操作。
语法:
rdd.reduceByKey(func)
# func:(V,V)-> V
# 接受2个传入参数(类型要共同),回来一个回来值,类型和传入要求共同。
示例代码:
# 前面创立RDD目标不做赘述了
# 预备一个RDD
rdd = sc.parallelize([('男', 99), ('男', 80), ('男', 70), ('女', 100), ('女', 85)])
# 求男生和女生两个组的成绩之和
rdd2 = rdd.reduceByKey(lambda a, b: a + b)
print(rdd2.collect())
事例1
对下面txt文件中的单词进行计数核算
代码如下:
import sys
from pyspark import SparkConf, SparkContext
import os
# 增加'PYSPARK_PYTHON'和'PYSPARK_DRIVER_PYTHON'的履行环境
os.environ['PYSPARK_PYTHON'] = sys.executable # "E:/python/python.exe"
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable # "E:/python/python.exe"
# 创立SparkConf类目标
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app") # 链式调用,不论调用什么样的办法,回来的都是同一个目标
# 根据SparkConf类目标创立SparkContext目标
sc = SparkContext(conf=conf)
# 读取数据文件
rdd = sc.textFile("G:/hello.txt")
# 读取悉数单词
word_rdd = rdd.flatMap(lambda x: x.split(" "))
# 将所有单词都转换成二元元组,单词为key,value设置为1
word_one_rdd = word_rdd.map(lambda word: (word, 1))
# 分组并求和传承传承
result_rdd = word_one_rdd.reduceByKey(lambda a, b: a + b)
print(result_rdd.collect())
(4)filter办法
功用:过滤想要的数据进行保存
语法:
rdd.filter(func)
# func: (T)-->bool 传入1个参数进来随意类型,回来值有必要是true or false
示例代码:
# 前面创立RDD目标不做赘述了
# 预备一个RDD
rdd = sc.parallelize([1, 2, 3, 4, 5])
# 对RDD的数据进行过滤,偶数回来true,保存偶数
rdd2 = rdd.filter(lambda num: num % 2 == 0)
print(rdd2.collect())
(5)distinct办法
功用:对RDD数据进行去重,回来新RDD
语法:rdd.distinct()
实例代码:
# 前面创立RDD目标不做赘述了
# 预备一个RDD
rdd = sc.parallelize([1, 1, 2, 2, 3, 3, 4, 5, 6, 7, 7, 8, 9])
# 对RDD的数据进行去重
rdd2 = rdd.distinct()
print(rdd2.collect())
(6)stortBy办法
功用:对RDD数据进行排序,根据你指定的排序依据
语法:
rdd.sortBy(func,ascending=False, numPartitions=1)
# func: (T) -> U: 告知依照rdd中的哪个数据进行排序,比如 lambda x: [1] 表依照rdd中的第二列元素进行排序
# ascending True升序 False 降序
# numPartitions: 用多少分区排序
示例代码:
以上对单词进行计数核算的事例1中,对成果依照单词出现的次数从大到小进行排序
# 对成果进行排序
final_rdd = result_rdd.sortBy(lambda x: x[1], ascending=False, numPartitions=1)
print(final_rdd.collect())
事例2
需求:对以下文件运用spark读取文件进行核算:
(1)各个城市销售额排名,从大到小
(2)悉数城市,有哪些产品类别在售卖
(3)北京市有哪些产品类别在售卖
事例代码:
import sys
from pyspark import SparkConf, SparkContext
import os
import json
# 增加'PYSPARK_PYTHON'和'PYSPARK_DRIVER_PYTHON'的履行环境
os.environ['PYSPARK_PYTHON'] = sys.executable # "E:/python/python.exe"
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable # "E:/python/python.exe"
# 创立SparkConf类目标
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app") # 链式调用,不论调用什么样的办法,回来的都是同一个目标
# 根据SparkConf类目标创立SparkContext目标
sc = SparkContext(conf=conf)
# (1)城市销售额排名
# 读取数据文件
file_rdd = sc.textFile("G:/orders.txt")
# 读取文件中单个json字符串
json_str_rdd = file_rdd.flatMap(lambda x: x.split("|"))
# 将单个字符串转换为字典
dict_rdd = json_str_rdd.map(lambda x: json.loads(x))
# 取出城市和销售额数据(城市,销售额)
city_money_rdd = dict_rdd.map(lambda x: (x['areaName'], int(x['money'])))
# 依照城市分组,依照销售额累计
city_result_rdd = city_money_rdd.reduceByKey(lambda a, b: a + b)
# 依照销售额累计成果进行排序
result1_rdd = city_result_rdd.sortBy(lambda x: x[1], ascending=False, numPartitions=1)
print("各个城市销售额排名,从大到小:", result1_rdd.collect())
# (2)悉数城市,有哪些产品类别在售卖
category_rdd = dict_rdd.map(lambda x: x['category']).distinct()
print("悉数城市,有哪些产品类别在售卖:", category_rdd.collect())
# (3)北京市有哪些产品类别在售卖
beijing_data_rdd = dict_rdd.filter(lambda x: x['areaName'] == '北京')
beijing_category = beijing_data_rdd.map(lambda x: x['category']).distinct()
print('北京市有哪些产品类别在售卖:', beijing_category.collect())
数据输出
RDD的成果输出为Python目标的各类办法
(1)collect办法
功用:将RDD各个分区内的数据,一致收集到Driver中,形成一个List目标
语法:rdd.collect()
,回来值是一个list
此办法咱们前面一直在运用,不做赘述
(2)reduce办法
功用:对RDD数据集依照你传入的逻辑进行聚合
语法:rdd.reduce(func)
,两参数传入,1个回来值,回来值和参数要求类型共同
示例代码:
rdd = sc.parallelize([1, 2, 3, 4, 5])
print(rdd.reduce(lambda a, b: a + b))
# 输出成果为两两相加的值:15
(3)take办法
功用:取RDD的前n个元素,组成list回来
语法:rdd.take(num)
,num代表前几个元素
示例代码:
rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
print(rdd.take(5))
# 输出成果为:[1, 2, 3, 4, 5]
(4)count办法
功用:核算RDD有多少条数据,回来值是一个数字
用法:rdd.count()
示例代码:
rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
print(rdd.count())
# 输出成果为:10
注意:
关于Spark的办法(算子)还有很多很多,咱们目前不是深化学习python语言,一起也没有学习分布式,仅仅简单学习对自动化测试打下根底,所以本篇文章只介绍了最根底的几个。
将RDD的内容输出到文件中
(5)saveAsTextFile办法
功用:将RDD的数据写入文本文件中,支撑本地写出,hdfs(Hadoop分布式文件体系)等文件体系。
语法:
rdd.saveAsTextFile("../data/output/test.txt")
想要这个办法正常运转,还需求配置Hadoop依靠,自行百度配置
运转之后,内容会存在多个分区中,输出的成果是一个文件夹,有几个分区就输出多少个成果文件
修改RDD分区为1个
办法1:SparkConf目标设置conf.set("spark.default.parallelism", "1")
办法2:创立RDD的时候,sc.parallelize办法传入numSlices参数为1
# 办法1,SparkConf目标设置属性全局并行度为1:
conf = SparkConf().setMaster("Tocal[*]").setAppName("test_spark")
conf.set("spark.default.parallelism", "1")
sc = SparkContext(conf=conf)
# 办法2, 创立RDD的时候设置(parallelize办法传入numSlices参数为1)
rdd1 = sc.parallelize([1, 2, 3, 4, 5], numSlices=1)
rdd1 = sc.parallelize([1, 2, 3, 4, 5], 1)