Spark简介
Spark概述
- 什么是Spark
- 基于内存的计算引擎,它的计算速度非常快。但是仅仅只涉及到数据的计算,并没有涉及到数据的存储
- Spark的优势
- MapReduce框架局限性
- Map结果写磁盘,Reduce写HDFS,多个MR之间通过HDFS交换数据
- 任务调度和启动开销大
- 无法充分利用内存
- 不适合迭代计算(如机器学习、图计算等等),交互式处理(数据挖掘)
- 不适合流式处理(点击日志分析)
- MapReduce编程不够灵活,仅支持Map和Reduce两种操作
- Hadoop生态圈
- 批处理:MapReduce、Hive、Pig
- 流式计算:Storm
- 交互式计算:impala、presto
- 需要一种灵活的框架可同时进行批处理、流式计算、交互式计算
- 内存计算引擎,提供cache机制来支持需要反复迭代计算或者多次数据共享,减少数据读取的IO开销
- DAG引擎,较少多次计算之间中间结果写到HDFS的开销
- 使用多线程模型来减少task启动开销,shuffle过程中避免不必要的sort操作以及减少磁盘IO
- spark的缺点:吃内存,不太稳定
- Spark特点
- 速度快(比MapReduce在内存中快100倍,在磁盘中快10倍)
- spaek中的job中间结果可以不落地,可以存放在内存中
- MapReduce中Map和Reduce任务都是以进程的方式运行着,而spark中的job是以线程方式运行在进程中
- 易用性(可以通过java/scala/python/R开发spark应用)
- 通用性(可以使用spark sql/spark streaming/mlib/Graphx)
- 兼容性(spark程序可以运行在standalone/yarn/mesos)
- 速度快(比MapReduce在内存中快100倍,在磁盘中快10倍)
- MapReduce框架局限性
RDD概述
- 什么是RDD
- RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是Spark中最基本的数据抽象,它代表一个不可变、可分区、里面的元素可并行计算集合
- Dataset:一个数据集,简单的理解为集合,用于存放数据的
- Distributed:它的数据是分布式存储,并且可以做分布式计算
- Resilient:弹性的
- 它表示的是数据可以保存在磁盘,也可以保存在内存中
- 数据分布式也是弹性的
- 弹性:并不是指它可以动态扩展,而是容错机制
- RDD会在多个节点上存储,就和HDFS得分布式道理一样的。HDFS文件被切分为多个block存储在各个节点上,而RDD是被切分为多个partition。不同的partition可能在不同节点上
- spark读取HDFS的场景下,spark把HDFS的block读到内存就会抽象为spark的partition。
- spark计算结束,一般会把数据做持久化到Hive,HBase,HDFS等等
- 不可变:Rdd数据不可变,只能是生成一个新的Rdd
- 可分区partition
- 并行计算
- RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是Spark中最基本的数据抽象,它代表一个不可变、可分区、里面的元素可并行计算集合
创建RDD
第一步创建sparkContext
- SparkContext,Spark程序入口。SparkContext代表了和Spark集群的链接,在Spark集群中通过SparkContext来创建RDD
- SparkConf创建SparkContext的时候需要一个SparkConf,用来传递Spark应用的基本信息
1
2conf = SparkConf().setAppName(appName).setMaster(master)
sc = SparkContext(conf=conf)创建RDD
1 | data = [1, 2, 3, 4, 5] |
RDD常用算子
三类RDD算子
- transformation算子(该算子操作都是惰性的,不会立即计算出结果,会记录计算过程,只有在进行action操作才会计算结果)
- 从一个已经存在的数据集创建一个新的数据集
- rdd_a —–>transformation—–> rdd_b
- 从一个已经存在的数据集创建一个新的数据集
- action算子
- 获取对数据进行运算操作之后的结果
- persist操作算子
- persist操作用于将数据缓存,可以缓存在内存中,也可以在磁盘上,也可以复制到磁盘的其它节点上
Transformation算子
map
- 将func函数作用到数据集的每一个元素上,生成一个新的RDD返回
1
2
3
4rdd1 = sc.parallelize(range(1,10), 3)
rdd2 = rdd1.map(lambda x: x+1)
rdd2.collect()
[2, 3, 4, 5, 6, 7, 8, 9, 10]filter
- 选出所有func返回值为true的元素,生成一个新的RDD返回
1
2
3
4
5rdd1 = sc.parallelize(range(1,10), 3)
rdd2 = rdd1.map(lambda x: x*2)
rdd3 = rdd2.filter(lambda x: x>4)
rdd3.collect()
[6, 8, 10, 12, 14, 16, 18]flatMap
- flatMap会先执行map操作,再将所有对象合并为一个对象
1
2
3
4rdd1 = sc.parallelize(["a b c", "d e f", "h i j"])
rdd2 = rdd1.flatMap(lambda x: x.split(" "))
rdd2.collect()
['a', 'b', 'c', 'd', 'e', 'f', 'h', 'i', 'j']union
- 对两个RDD求并集
1
2
3
4
5rdd1 = sc.parallelize([("a", 1), ("b", 2)])
rdd2 = sc.parallelize([("c", 3), ("d", 4)])
rdd3 = rdd1.union(rdd2)
rdd3.collect()
[("a", 1), ("b", 2), ("c", 3), ("d", 4)]intersection
- 对两个RDD求交集
1
2
3
4
5
6rdd1 = sc.parallelize([("a", 1), ("b", 2)])
rdd2 = sc.parallelize([("c", 1), ("b", 3)])
rdd3 = rdd1.union(rdd2)
rdd4 = rdd3.intersection(rdd2)
rdd4.collect()
[("c", 1), ("b", 3)]groupByKey
- 以元组中的第0个元素作为key,进行分组,返回一个新的RDD
1
2
3
4
5
6
7
8
9
10
11
12
13rdd1 = sc.parallelize([("a",1),("b",2)])
rdd2 = sc.parallelize([("c",1),("b",3)])
rdd3 = rdd1.union(rdd2)
rdd4 = rdd3.groupByKey()
result = rdd4.collect()
result
[('a', <pyspark.resultiterable.ResultIterable object at 0x7fba6a5e5898>), ('c', <pyspark.resultiterable.ResultIterable object at 0x7fba6a5e5518>), ('b', <pyspark.resultiterable.ResultIterable object at 0x7fba6a5e5f28>)]
result[2]
('b', <pyspark.resultiterable.ResultIterable object at 0x7fba6c18e518>)
result[2][1]
<pyspark.resultiterable.ResultIterable object at 0x7fba6c18e518>
list(result[2][1])
[2, 3]reduceByKey
- 将key相同的键值,按照func进行计算
1
2
3rdd = sc.parallelize([('a', 1), ('b', 1), ('a', 1)])
rdd.reduceByKey(lambda a,y: x+y).collect()
[('b', 1), ('a', 2)]Action算子
collect
- 返回一个list,list中包含RDD中的所有元素
- 只有当数据量较小的时候使用Collect因为所有的结果都会加载到内存中
reduce
- reduce将RDD中元素两两传递给输入函数,同时产生一个新的值,新产生的值与RDD中下一个元素再被传递给输入函数直到最后一个值为止。
1
2
3rdd1 = sc.parallelize([1, 2, 3, 4])
rdd1.reduce(lambda x,y: x+y)
15first
- 返回RDD的第一个元素
1
2sc.parallelize([3,4,5]).first()
3take
- 返回RDD的前n个元素
1
2sc.parallelize([2,3,4,5]).take(2)
[2,3]count
- 返回RDD中元素的个数
1
2sc.parallelize([1,3,5]).count()
3Spark集群
spark集群架构(Standalone模式)
Application
- 用户自己写的Spark应用程序,批处理作业的集合。Application的main方法为应用程序的入口,用户通过Spark的API,定义了RDD和对RDD的操作
Master和Worker
整个集群分为Master节点和Worker节点,相当于Hadoop的Master和Slave节点
- Master:Standalone模式中主控节点,负责接收Client提交的作业,管理Worker,并命令Worker启动Driver和Executor。
- Worker:Standalone模式中slave节点上的守护进程,负责管理本节点的资源,定期向Master汇报心跳,接收Master的命令,启动Driver和Executor
Client:客户端进程,负责提交作业到Master。
Driver:一个Spark作业运行时包括一个Driver进程,也是作业的主进程,负责作业的解析、生成Stage并调度Task到Executor上。包括DAGScheduler,TaskScheduler
Executor:即真正执行作业的地方,一个集群一般包含多个Executor,每个Executor接收Deiver的命令Launch Task,一个Executor可以执行一个到多个Task
Spark作业相关概念
- Stage:一个Spark作业一般包含一到多个Stage
- Task:一个Stage包含一到多个Task,通过多个Task实现并行运行的功能
- DAGScheduler:实现将Spark作业分解成一到多个Stage,每个Stage根据RDD的Partition个数决定Task的个数,然后生成相应的Task放到TaskScheduler中。
- TaskScheduler:实现Task分配到Executor上执行。
Spark SQL 概述
Spark SQL概念
- Spark SQL是Apache Spark用于处理结构化数据的模块
- 它是spark中用于处理结构化数据的一个模块
Spark SQL历史
- Hive是目前大数据领域,事实上的数据仓库标准
- Shark:shark底层使用spark的基于内存的计算模型,从而让计算性能比Hive提升了数倍到上百倍。
- 底层很多东西还是依赖于Hive,修改了内存管理、物理计划、执行三个模块
- 2014年6月1日,spark宣布了不再开发Shark,全面转向Spark SQL的开发
Spark SQL优势
- 速度
python操作RDD,转换为可执行代码,运行再java虚拟机,涉及两个不同语言引擎之间的切换,进行进程间通信很耗费性能。
DataFrame
- 是RDD为基础的分布式数据集,类似于传统关系型数据库的二维表,dataframe记录了对应列的名称和类型
- dataFrame引入schema和off-heap(使用操作系统层面上的内存)
- 1.解决了RDD的缺点
- 序列化和反序列化开销大
- 频繁的创建和销毁对象造成大量的GC
- 2.丢失了RDD的优点
- RDD编译时进行类型检查
- RDD具有面向对象编程的特性
用scala/python编写的RDD比Spark SQL编写转换的RDD慢,涉及到执行计划
- CatalystOptimizer:Catalyst优化器
- ProjectTungsten:钨丝计划,为了提高RDD的效率而制定的计划
- Code gen:代码生成器
直接编写RDD也可以自实现优化代码,但是远不及SparkSQL面前的优化操作后转换的RDD效率高,快1倍左右
优化引擎:类似mysql等关系型数据库基于成本的优化器
首先执行逻辑执行计划,然后转换为物理执行计划(选择成本最小的),通过Code Generation最终生成RDD
- Language-independent API
用任何语言编写生成的RDD都一样,而使用spark-core编写的RDD,不同的语言生成不同的RDD
- Schema
结构化数据,可以直接看出数据的详情
在RDD中无法看出,解释性不强,无法告诉引擎信息,没法详细优化
SparkSQL优点
- 易整合
- 统一的数据源访问
- 兼容Hive
- 提供了标准的数据库连接
DataFrame
介绍
在Spark语义中,DataFrame是一个分布式的行集合,可以想象为一个关系型数据库的表。
- Immuatable:一旦RDD、DataFrame被创建,就不能更改,只能通过transformation生成新的RDD、DataFrame
- Lazy Evaluations:只有action才会触发Transformation的执行
- Distributed:DataFrame和RDD一样都是分布式的
- dataframe和dataset统一,dataframe只是dataset[ROW]的类型别名。由于Python是弱类型语言,只能使用DataFrame
DataFrame vs RDD
- RDD:分布式的对象的集合,Spark并不知道对象的详细模式信息
- DataFrame:分布式的Row对象的集合,其提供了由列组成的详细模式信息,使得Spark SQL可以进行某些形式的执行优化。
- DataFrame和普通的RDD的逻辑框架区别如下所示:
- 左侧的RDD Spark框架本身不了解Person类的内部结构
- 右侧的DataFrame提供了详细的结构信息(schema—每列的名称、类型)
- DataFrame还配套了新的操作数据方法,DataFrame API(如:df.select())和SQL(select id, name from ***)
- DataFrame还引入了off-heap,意味着JVM堆以外的内存,这些内存直接受操作系统管理(而不是JVM)
- RDD是分布式的java对象集合,DataFrame是分布式的Row对象的集合。DataFrame除了提供了比RDD更丰富的算子以外,更重要的特点是提升执行效率、减少数据读取以及执行计划的优化
- DataFrame的抽象后,我们处理数据更加简单了,甚至可以用SQL来处理数据
- 通过DataFrame API或SQL处理数据,会自动经过Spark优化器(Catalyst)的优化,即使你写的程序或SQL不高效,也可以运行很快。
- DataFrame相当于是一个带着schema的RDD
Pandas DataFrame vs Spark DataFrame
- Cluster Parallel:集群并行执行
- Lazy Evaluations:只有action才会触发Transformation的执行
- Immutable:不可更改
- Pandas rich API:比Spark SQL api丰富
创建DataFrame
- 创建DataFrame
调用方法例如:spark.read.xxx方法
其他方式创建dataframe
- createDataFrame:pandas dataframe、list、RDD
- 数据源:RDD、csv、json、parquet、orc、jdbc
1
2
3
4jsondf = spark.read.json("xxx.json")
jsondf = spark.read.format("json").load("xxx.json")
parquetdf = spark.read.parquet("xxx.parquet")
jdbcDF = spark.read.format("jdbc").option("url","jdbc:mysql://localhost:3306/db_name").option("dbtable","table_name").option("user","xxx").option("password","xxx").load()Transformation:延迟性操作
action:立即操作
1 | from pyspark.sql import SparkSession |
优化
1 | #序列化选择kryo |
提交Python环境到spark集群执行Python脚本
1 | 将conda环境包打包成压缩包 ./envs/environment 打包成environment.zip |
SparkStreaming
SparkStreaming概述
- 它是一个可扩展,高吞吐具有容错性的流式计算框架
spark-core和spark-sql都是处理属于离线批处理任务。数据一般都是在固定位置上,通常我们写好一个脚本,每天定时取处理数据、计算、保存数据结果。但是有些任务是需要实时处理的,仅仅能够容忍的延迟1秒内。
实时计算框架对比
- Storm
- 流式计算框架
- 以record为单位处理数据
- 也支持micro-batch方式(Trident)
- Spark
- 批处理计算框架
- 以RDD为单位处理数据
- 支持micro-batch流式处理数据(Spark Streaming)
- 对比
- 吞吐量:Spark Streaming优于Storm
- 延时:Spark Streaming差于Storm
SparkStreaming组件
- Streaming Context
- 一旦一个Context已经启动(调用了Streaming Context的start()),就不能有新的流算子(Dstream)建立或者是添加到context中
- 一旦一个context已经停止,不能重新启动(Streaming Context调用了stop方法之后 就不能再次调 start())
- 在JVM(java虚拟机)中, 同一时间只能有一个Streaming Context处于活跃状态, 一个SparkContext创建一个Streaming Context
- 在Streaming Context上调用Stop方法, 也会关闭SparkContext对象, 如果只想仅关闭Streaming Context对象,设置stop()的可选参数为false
- 一个SparkContext对象可以重复利用去创建多个Streaming Context对象(不关闭SparkContext前提下), 但是需要关一个再开下一个
- DStream (离散流)
- 代表一个连续的数据流
- 在内部, DStream由一系列连续的RDD组成
- DStreams中的每个RDD都包含确定时间间隔内的数据
- 任何对DStreams的操作都转换成了对DStreams隐含的RDD的操作
- 数据源
- 基本源
- TCP/IP Socket
- FileSystem
- 高级源
- Kafka
- Flume
- 基本源
1 | import os |
Spark Streaming的状态操作
- 在Spark Streaming中存在两种状态操作
- UpdateStateByKey
- Windows操作
- 使用有状态的transformation,需要开启Checkpoint
- Spark streaming的容错机制
- 它将足够多的信息checkpoint到某些具备容错性的存储系统,如HDFS上,以便出错时能够迅速恢复
updateStateByKey
- Spark Streaming实现的时一个实时批处理操作,每隔一段时间将数据进行打包,封装成RDD,是无状态的。
- 如果我们需要拿一天的数据来进行离线处理,我们得把rdd数据放让mysql中,再取再进行计算,而updateStateByKey可以解决这种问题
监听网络端口的数据,获取到每个批次的出现的单词数量,并且需要把每个批次的信息保留下来
1 | import os |
Windows
窗口长度L:运算的数据量
滑动间隔G:控制每隔多长时间做一次运算
每隔G秒,统计最近L秒的数据
- 操作细节
- Window操作是基于窗口长度和滑动间隔来工作的
- 窗口的长度控制考虑前几批次数据量
- 默认为批处理的滑动间隔来确定计算结果的频率
- 相关函数
reduceByKeyAndWindow(func,invFunc,windowLength,slidelnterval,[num,Tasks])
func:正向操作,类似于updateStateByKey
invFunc:反向操作
监听网络端口的数据,每隔3秒统计前6秒出现的单词数量
1 | import os |
转载请注明来源,欢迎对文章中的引用来源进行考证,欢迎指出任何有错误或不够清晰的表达。可以在下面评论区评论,也可以邮件至 zoubinbf@163.com