Spark

Spark简介

Spark概述

  • 什么是Spark
    • 基于内存的计算引擎,它的计算速度非常快。但是仅仅只涉及到数据的计算,并没有涉及到数据的存储
    • Spark的优势
      • MapReduce框架局限性
        • Map结果写磁盘,Reduce写HDFS,多个MR之间通过HDFS交换数据
        • 任务调度和启动开销大
        • 无法充分利用内存
        • 不适合迭代计算(如机器学习、图计算等等),交互式处理(数据挖掘)
        • 不适合流式处理(点击日志分析)
        • MapReduce编程不够灵活,仅支持Map和Reduce两种操作
      • Hadoop生态圈
        • 批处理:MapReduce、Hive、Pig
        • 流式计算:Storm
        • 交互式计算:impala、presto
      • 需要一种灵活的框架可同时进行批处理、流式计算、交互式计算
        • 内存计算引擎,提供cache机制来支持需要反复迭代计算或者多次数据共享,减少数据读取的IO开销
        • DAG引擎,较少多次计算之间中间结果写到HDFS的开销
        • 使用多线程模型来减少task启动开销,shuffle过程中避免不必要的sort操作以及减少磁盘IO
      • spark的缺点:吃内存,不太稳定
      • Spark特点
        • 速度快(比MapReduce在内存中快100倍,在磁盘中快10倍)
          • spaek中的job中间结果可以不落地,可以存放在内存中
          • MapReduce中Map和Reduce任务都是以进程的方式运行着,而spark中的job是以线程方式运行在进程中
        • 易用性(可以通过java/scala/python/R开发spark应用)
        • 通用性(可以使用spark sql/spark streaming/mlib/Graphx)
        • 兼容性(spark程序可以运行在standalone/yarn/mesos)

RDD概述

  • 什么是RDD
    • RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是Spark中最基本的数据抽象,它代表一个不可变、可分区、里面的元素可并行计算集合
      • Dataset:一个数据集,简单的理解为集合,用于存放数据的
      • Distributed:它的数据是分布式存储,并且可以做分布式计算
      • Resilient:弹性的
        • 它表示的是数据可以保存在磁盘,也可以保存在内存中
        • 数据分布式也是弹性的
        • 弹性:并不是指它可以动态扩展,而是容错机制
          • RDD会在多个节点上存储,就和HDFS得分布式道理一样的。HDFS文件被切分为多个block存储在各个节点上,而RDD是被切分为多个partition。不同的partition可能在不同节点上
          • spark读取HDFS的场景下,spark把HDFS的block读到内存就会抽象为spark的partition。
          • spark计算结束,一般会把数据做持久化到Hive,HBase,HDFS等等
      • 不可变:Rdd数据不可变,只能是生成一个新的Rdd
      • 可分区partition
      • 并行计算

创建RDD

  • 第一步创建sparkContext

    • SparkContext,Spark程序入口。SparkContext代表了和Spark集群的链接,在Spark集群中通过SparkContext来创建RDD
    • SparkConf创建SparkContext的时候需要一个SparkConf,用来传递Spark应用的基本信息
    1
    2
    conf = SparkConf().setAppName(appName).setMaster(master)
    sc = SparkContext(conf=conf)
  • 创建RDD

1
2
3
4
5
6
7
data = [1, 2, 3, 4, 5]
# 创建RDD
data_rdd = sc.parallelize(data)
# 创建RDD,并且分成5个分区
data_rdd = sc.parallelize(data, 5)
# 直接读取文件生成RDD
data_rdd = sc.textFile('路径')

RDD常用算子

三类RDD算子

  • transformation算子(该算子操作都是惰性的,不会立即计算出结果,会记录计算过程,只有在进行action操作才会计算结果)
    • 从一个已经存在的数据集创建一个新的数据集
      • rdd_a —–>transformation—–> rdd_b
  • action算子
    • 获取对数据进行运算操作之后的结果
  • persist操作算子
    • persist操作用于将数据缓存,可以缓存在内存中,也可以在磁盘上,也可以复制到磁盘的其它节点上

Transformation算子

  • map

    • 将func函数作用到数据集的每一个元素上,生成一个新的RDD返回
    1
    2
    3
    4
    rdd1 = sc.parallelize(range(1,10), 3)
    rdd2 = rdd1.map(lambda x: x+1)
    rdd2.collect()
    [2, 3, 4, 5, 6, 7, 8, 9, 10]
  • filter

    • 选出所有func返回值为true的元素,生成一个新的RDD返回
    1
    2
    3
    4
    5
    rdd1 = sc.parallelize(range(1,10), 3)
    rdd2 = rdd1.map(lambda x: x*2)
    rdd3 = rdd2.filter(lambda x: x>4)
    rdd3.collect()
    [6, 8, 10, 12, 14, 16, 18]
  • flatMap

    • flatMap会先执行map操作,再将所有对象合并为一个对象
    1
    2
    3
    4
    rdd1 = sc.parallelize(["a b c", "d e f", "h i j"])
    rdd2 = rdd1.flatMap(lambda x: x.split(" "))
    rdd2.collect()
    ['a', 'b', 'c', 'd', 'e', 'f', 'h', 'i', 'j']
  • union

    • 对两个RDD求并集
    1
    2
    3
    4
    5
    rdd1 = sc.parallelize([("a", 1), ("b", 2)])
    rdd2 = sc.parallelize([("c", 3), ("d", 4)])
    rdd3 = rdd1.union(rdd2)
    rdd3.collect()
    [("a", 1), ("b", 2), ("c", 3), ("d", 4)]
  • intersection

    • 对两个RDD求交集
    1
    2
    3
    4
    5
    6
    rdd1 = sc.parallelize([("a", 1), ("b", 2)])
    rdd2 = sc.parallelize([("c", 1), ("b", 3)])
    rdd3 = rdd1.union(rdd2)
    rdd4 = rdd3.intersection(rdd2)
    rdd4.collect()
    [("c", 1), ("b", 3)]
  • groupByKey

    • 以元组中的第0个元素作为key,进行分组,返回一个新的RDD
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    rdd1 = sc.parallelize([("a",1),("b",2)])
    rdd2 = sc.parallelize([("c",1),("b",3)])
    rdd3 = rdd1.union(rdd2)
    rdd4 = rdd3.groupByKey()
    result = rdd4.collect()
    result
    [('a', <pyspark.resultiterable.ResultIterable object at 0x7fba6a5e5898>), ('c', <pyspark.resultiterable.ResultIterable object at 0x7fba6a5e5518>), ('b', <pyspark.resultiterable.ResultIterable object at 0x7fba6a5e5f28>)]
    result[2]
    ('b', <pyspark.resultiterable.ResultIterable object at 0x7fba6c18e518>)
    result[2][1]
    <pyspark.resultiterable.ResultIterable object at 0x7fba6c18e518>
    list(result[2][1])
    [2, 3]
  • reduceByKey

    • 将key相同的键值,按照func进行计算
    1
    2
    3
    rdd = sc.parallelize([('a', 1), ('b', 1), ('a', 1)])
    rdd.reduceByKey(lambda a,y: x+y).collect()
    [('b', 1), ('a', 2)]

    Action算子

  • collect

    • 返回一个list,list中包含RDD中的所有元素
    • 只有当数据量较小的时候使用Collect因为所有的结果都会加载到内存中
  • reduce

    • reduce将RDD中元素两两传递给输入函数,同时产生一个新的值,新产生的值与RDD中下一个元素再被传递给输入函数直到最后一个值为止。
    1
    2
    3
    rdd1 = sc.parallelize([1, 2, 3, 4])
    rdd1.reduce(lambda x,y: x+y)
    15
  • first

    • 返回RDD的第一个元素
    1
    2
    sc.parallelize([3,4,5]).first()
    3
  • take

    • 返回RDD的前n个元素
    1
    2
    sc.parallelize([2,3,4,5]).take(2)
    [2,3]
  • count

    • 返回RDD中元素的个数
    1
    2
    sc.parallelize([1,3,5]).count()
    3

    Spark集群

spark集群架构(Standalone模式)

  • Application

    • 用户自己写的Spark应用程序,批处理作业的集合。Application的main方法为应用程序的入口,用户通过Spark的API,定义了RDD和对RDD的操作
  • Master和Worker

    整个集群分为Master节点和Worker节点,相当于Hadoop的Master和Slave节点

    • Master:Standalone模式中主控节点,负责接收Client提交的作业,管理Worker,并命令Worker启动Driver和Executor。
    • Worker:Standalone模式中slave节点上的守护进程,负责管理本节点的资源,定期向Master汇报心跳,接收Master的命令,启动Driver和Executor
  • Client:客户端进程,负责提交作业到Master。

  • Driver:一个Spark作业运行时包括一个Driver进程,也是作业的主进程,负责作业的解析、生成Stage并调度Task到Executor上。包括DAGScheduler,TaskScheduler

  • Executor:即真正执行作业的地方,一个集群一般包含多个Executor,每个Executor接收Deiver的命令Launch Task,一个Executor可以执行一个到多个Task

  • Spark作业相关概念

    • Stage:一个Spark作业一般包含一到多个Stage
    • Task:一个Stage包含一到多个Task,通过多个Task实现并行运行的功能
    • DAGScheduler:实现将Spark作业分解成一到多个Stage,每个Stage根据RDD的Partition个数决定Task的个数,然后生成相应的Task放到TaskScheduler中。
    • TaskScheduler:实现Task分配到Executor上执行。

Spark SQL 概述

Spark SQL概念

  • Spark SQL是Apache Spark用于处理结构化数据的模块
    • 它是spark中用于处理结构化数据的一个模块

Spark SQL历史

  • Hive是目前大数据领域,事实上的数据仓库标准
  • Shark:shark底层使用spark的基于内存的计算模型,从而让计算性能比Hive提升了数倍到上百倍。
  • 底层很多东西还是依赖于Hive,修改了内存管理、物理计划、执行三个模块
  • 2014年6月1日,spark宣布了不再开发Shark,全面转向Spark SQL的开发

Spark SQL优势

  • 速度

python操作RDD,转换为可执行代码,运行再java虚拟机,涉及两个不同语言引擎之间的切换,进行进程间通信很耗费性能。

DataFrame

  • 是RDD为基础的分布式数据集,类似于传统关系型数据库的二维表,dataframe记录了对应列的名称和类型
  • dataFrame引入schema和off-heap(使用操作系统层面上的内存)
    • 1.解决了RDD的缺点
    • 序列化和反序列化开销大
    • 频繁的创建和销毁对象造成大量的GC
    • 2.丢失了RDD的优点
    • RDD编译时进行类型检查
    • RDD具有面向对象编程的特性

用scala/python编写的RDD比Spark SQL编写转换的RDD慢,涉及到执行计划

  • CatalystOptimizer:Catalyst优化器
  • ProjectTungsten:钨丝计划,为了提高RDD的效率而制定的计划
  • Code gen:代码生成器

直接编写RDD也可以自实现优化代码,但是远不及SparkSQL面前的优化操作后转换的RDD效率高,快1倍左右

优化引擎:类似mysql等关系型数据库基于成本的优化器

首先执行逻辑执行计划,然后转换为物理执行计划(选择成本最小的),通过Code Generation最终生成RDD

  • Language-independent API

用任何语言编写生成的RDD都一样,而使用spark-core编写的RDD,不同的语言生成不同的RDD

  • Schema

结构化数据,可以直接看出数据的详情

在RDD中无法看出,解释性不强,无法告诉引擎信息,没法详细优化

SparkSQL优点

  • 易整合
  • 统一的数据源访问
  • 兼容Hive
  • 提供了标准的数据库连接

DataFrame

介绍

在Spark语义中,DataFrame是一个分布式的行集合,可以想象为一个关系型数据库的表。

  • Immuatable:一旦RDD、DataFrame被创建,就不能更改,只能通过transformation生成新的RDD、DataFrame
  • Lazy Evaluations:只有action才会触发Transformation的执行
  • Distributed:DataFrame和RDD一样都是分布式的
  • dataframe和dataset统一,dataframe只是dataset[ROW]的类型别名。由于Python是弱类型语言,只能使用DataFrame

DataFrame vs RDD

  • RDD:分布式的对象的集合,Spark并不知道对象的详细模式信息
  • DataFrame:分布式的Row对象的集合,其提供了由列组成的详细模式信息,使得Spark SQL可以进行某些形式的执行优化。
  • DataFrame和普通的RDD的逻辑框架区别如下所示:

  • 左侧的RDD Spark框架本身不了解Person类的内部结构
  • 右侧的DataFrame提供了详细的结构信息(schema—每列的名称、类型)
  • DataFrame还配套了新的操作数据方法,DataFrame API(如:df.select())和SQL(select id, name from ***)
  • DataFrame还引入了off-heap,意味着JVM堆以外的内存,这些内存直接受操作系统管理(而不是JVM)
  • RDD是分布式的java对象集合,DataFrame是分布式的Row对象的集合。DataFrame除了提供了比RDD更丰富的算子以外,更重要的特点是提升执行效率、减少数据读取以及执行计划的优化
  • DataFrame的抽象后,我们处理数据更加简单了,甚至可以用SQL来处理数据
  • 通过DataFrame API或SQL处理数据,会自动经过Spark优化器(Catalyst)的优化,即使你写的程序或SQL不高效,也可以运行很快。
  • DataFrame相当于是一个带着schema的RDD

Pandas DataFrame vs Spark DataFrame

  • Cluster Parallel:集群并行执行
  • Lazy Evaluations:只有action才会触发Transformation的执行
  • Immutable:不可更改
  • Pandas rich API:比Spark SQL api丰富

创建DataFrame

  • 创建DataFrame

调用方法例如:spark.read.xxx方法

  • 其他方式创建dataframe

    • createDataFrame:pandas dataframe、list、RDD
    • 数据源:RDD、csv、json、parquet、orc、jdbc
    1
    2
    3
    4
    jsondf = spark.read.json("xxx.json")
    jsondf = spark.read.format("json").load("xxx.json")
    parquetdf = spark.read.parquet("xxx.parquet")
    jdbcDF = spark.read.format("jdbc").option("url","jdbc:mysql://localhost:3306/db_name").option("dbtable","table_name").option("user","xxx").option("password","xxx").load()
  • Transformation:延迟性操作

  • action:立即操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
from pyspark.sql import SparkSession
from pyspark.sql import Row
spark = SparkSession.builder.appName('test').getOrCreate()
sc = spark.sparkContext
## 直接创建
list_rdd = [('a', 11), ('b', 15), ('c', 20), ('d', 25)]
rdd = sc.parallelize(list_rdd)
#为数据添加列名
people = rdd.map(lambda x: Row(name=x[0], age=int(x[1])))
# 创建DataFrame
df = spark.createDataFrame(people)
# 显示数据结构
df.printSchema()
# 显示前10条数据
df.show()
# 统计行数
df.count()
# 显示列名
df.columns
# 增加一个新列,如果列名是原本就有的,就会替换原有列
df.withColumn("列名",df.age*2)
# 删除列
df.drop('列名').show()
# 提取部分列
df.select('列名1''列名2').show()
# 分组统计
df.groupby().age({'列名1':'函数名', '列名2':'函数名'}).show()
# 自带函数
# avg(), count(), countDistinct(), first(), kurtosis(),
# max(), mean(), min(), skewness(), stddev(), stddev_pop(),
# stddev_samp(), sum(), sumDistinct(), var_pop(), var_samp() variance()
# 自定义的汇总方法
import pyspark.sql.functions as fn
# 调用函数并起一个别名
df.agg(fn.count('SepalWidth').alias('width_count'),fn.countDistinct('cls').alias('distinct_cls_count')).show()
# 按比例拆分数据集
trainDF,testDF = df.randomSplit([0.8, 0.2])
# 采样数据sample(是否有放回的采样,采样比例,随机种子)
df.sample(False, 0.2, 100).show()
# 查看两个数据集在类别上的差异
testDF.select('cls').subtract(trainDF.select('cls')).distinct().count()
# 交叉表
df.crosstab('cls','SepalLength').show()

优化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
#序列化选择kryo
#内存管理
spark.memory.fraction=0.6 #设置计算和存储的内存使用率
spark.memory.storageFraction=0.5 #设置存储内存避免被驱逐的比例
#查看ddr每次使用
# 广播变量
broadcast_var = sc.broadcast([1,2,3]) #会在每台机器上保存这个变量,之后使用可以直接在本机上获取
# 数据本地性 同jvm,同node,同机架,同网络
spark.locality.wait=3s #等待节点资源时间

--master yarn-cluster (or yarn-client)(集群模式)
--num-executors 50 (Executor个数)
--executor-memory 6G (每个Executor进程的内存)
--conf spark.executor.cores=4 (每个Executor进程的CPU core数量)
--conf spark.yarn.executor.memoryOverhead=2048(Ececutor堆外内存)
--driver-memory 2G (Driver进程的内存)
--conf spark.default.parallelism=150 (分区个数)
--conf spark.dynamicAllocation.enable=true //打开动态executor模式
--conf spark.shuffle.service.enabled=true //动态executor需要的服务,需要和上面的spark.dynamicAllocation.enable同时打开
--conf spark.storage.memoryFraction=0.2(持久化数据在Executor内存占比)
exector、storage内存分配
--executor-memory 10G
--conf spark.shuffle.memoryFraction=0.5
--conf spark.sql.shuffle.partitions=20 (shuffle后partition个数)
--conf spark.shuffle.compress=true (shuffle过程是否压缩)
--conf spark.shuffle.file.buffer=512 (数据写入磁盘前buffer缓冲)
--conf spark.reducer.maxSizeInFlight=256m
--conf spark.shuffle.io.maxRetries=20 (网络问题重试次数)
--spark.shuffle.io.retryWait=5s (重试间隔时间)

提交Python环境到spark集群执行Python脚本

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# 将conda环境包打包成压缩包 ./envs/environment   打包成environment.zip
# 可以使用下面的命令执行python脚本了

spark-submit \
--master yarn \
--deploy-mode cluster \
--num-executors 1 \
--executor-cores 1 \
--archives hdfs://tmp/zoubin1/environment.zip#environment \
--conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=./environment/environment/bin/python3.6 \
--conf spark.yarn.appMasterEnv.PYSPARK_DRIVER_PYTHON=./environment/environment/bin/python3.6 \
--conf spark.executorEnv.PYSPARK_PYTHON=./environment/environment/bin/python3.6 \
--conf spark.executorEnv.PYSPARK_DRIVER_PYTHON=./environment/environment/bin/python3.6 \
aa.py


SparkStreaming

SparkStreaming概述

  • 它是一个可扩展,高吞吐具有容错性的流式计算框架

spark-core和spark-sql都是处理属于离线批处理任务。数据一般都是在固定位置上,通常我们写好一个脚本,每天定时取处理数据、计算、保存数据结果。但是有些任务是需要实时处理的,仅仅能够容忍的延迟1秒内。

实时计算框架对比

  • Storm
    • 流式计算框架
    • 以record为单位处理数据
    • 也支持micro-batch方式(Trident)
  • Spark
    • 批处理计算框架
    • 以RDD为单位处理数据
    • 支持micro-batch流式处理数据(Spark Streaming)
  • 对比
    • 吞吐量:Spark Streaming优于Storm
    • 延时:Spark Streaming差于Storm

SparkStreaming组件

  • Streaming Context
    • 一旦一个Context已经启动(调用了Streaming Context的start()),就不能有新的流算子(Dstream)建立或者是添加到context中
    • 一旦一个context已经停止,不能重新启动(Streaming Context调用了stop方法之后 就不能再次调 start())
    • 在JVM(java虚拟机)中, 同一时间只能有一个Streaming Context处于活跃状态, 一个SparkContext创建一个Streaming Context
    • 在Streaming Context上调用Stop方法, 也会关闭SparkContext对象, 如果只想仅关闭Streaming Context对象,设置stop()的可选参数为false
    • 一个SparkContext对象可以重复利用去创建多个Streaming Context对象(不关闭SparkContext前提下), 但是需要关一个再开下一个
  • DStream (离散流)
    • 代表一个连续的数据流
    • 在内部, DStream由一系列连续的RDD组成
    • DStreams中的每个RDD都包含确定时间间隔内的数据
    • 任何对DStreams的操作都转换成了对DStreams隐含的RDD的操作
    • 数据源
      • 基本源
        • TCP/IP Socket
        • FileSystem
      • 高级源
        • Kafka
        • Flume
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
import os
# 配置spark driver和pyspark运行时,所使用的python解释器路径
PYSPARK_PYTHON = "python解释器路径"
JAVA_HOME='jdk路径'
SPARK_HOME = "spark路径"
# 当存在多个版本时,不指定很可能会导致出错
os.environ["PYSPARK_PYTHON"] = PYSPARK_PYTHON
os.environ["PYSPARK_DRIVER_PYTHON"] = PYSPARK_PYTHON
os.environ['JAVA_HOME']=JAVA_HOME
os.environ["SPARK_HOME"] = SPARK_HOME

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

if __name__ == "__main__":

sc = SparkContext("local[2]",appName="NetworkWordCount")
#参数2:指定执行计算的时间间隔
ssc = StreamingContext(sc, 1)
#监听ip,端口上的上的数据
lines = ssc.socketTextStream('localhost',9999)
#将数据按空格进行拆分为多个单词
words = lines.flatMap(lambda line: line.split(" "))
#将单词转换为(单词,1)的形式
pairs = words.map(lambda word:(word,1))
#统计单词个数
wordCounts = pairs.reduceByKey(lambda x,y:x+y)
#打印结果信息,会使得前面的transformation操作执行
wordCounts.pprint()
#启动StreamingContext
ssc.start()
#等待计算结束
ssc.awaitTermination()

Spark Streaming的状态操作

  • 在Spark Streaming中存在两种状态操作
    • UpdateStateByKey
    • Windows操作
  • 使用有状态的transformation,需要开启Checkpoint
    • Spark streaming的容错机制
    • 它将足够多的信息checkpoint到某些具备容错性的存储系统,如HDFS上,以便出错时能够迅速恢复

updateStateByKey

  • Spark Streaming实现的时一个实时批处理操作,每隔一段时间将数据进行打包,封装成RDD,是无状态的。
  • 如果我们需要拿一天的数据来进行离线处理,我们得把rdd数据放让mysql中,再取再进行计算,而updateStateByKey可以解决这种问题

监听网络端口的数据,获取到每个批次的出现的单词数量,并且需要把每个批次的信息保留下来

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
import os
# 配置spark driver和pyspark运行时,所使用的python解释器路径
PYSPARK_PYTHON = "python解释器路径"
JAVA_HOME='java路径'
SPARK_HOME = "spark路径"
# 当存在多个版本时,不指定很可能会导致出错
os.environ["PYSPARK_PYTHON"] = PYSPARK_PYTHON
os.environ["PYSPARK_DRIVER_PYTHON"] = PYSPARK_PYTHON
os.environ['JAVA_HOME']=JAVA_HOME
os.environ["SPARK_HOME"] = SPARK_HOME
from pyspark.streaming import StreamingContext
from pyspark.sql.session import SparkSession

# 创建SparkContext
spark = SparkSession.builder.master("local[2]").getOrCreate()
sc = spark.sparkContext

ssc = StreamingContext(sc, 3)
#开启检查点
ssc.checkpoint("checkpoint")

#定义state更新函数
def updateFunc(new_values, last_sum):
return sum(new_values) + (last_sum or 0)

lines = ssc.socketTextStream("localhost", 9999)
# 对数据以空格进行拆分,分为多个单词
counts = lines.flatMap(lambda line: line.split(" ")) \
.map(lambda word: (word, 1)) \
.updateStateByKey(updateFunc=updateFunc)#应用updateStateByKey函数

counts.pprint()

ssc.start()
ssc.awaitTermination()

Windows

  • 窗口长度L:运算的数据量

  • 滑动间隔G:控制每隔多长时间做一次运算

每隔G秒,统计最近L秒的数据

  • 操作细节
    • Window操作是基于窗口长度和滑动间隔来工作的
    • 窗口的长度控制考虑前几批次数据量
    • 默认为批处理的滑动间隔来确定计算结果的频率
  • 相关函数

reduceByKeyAndWindow(func,invFunc,windowLength,slidelnterval,[num,Tasks])

func:正向操作,类似于updateStateByKey

invFunc:反向操作

监听网络端口的数据,每隔3秒统计前6秒出现的单词数量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
import os
# 配置spark driver和pyspark运行时,所使用的python解释器路径
PYSPARK_PYTHON = "python解释器"
JAVA_HOME='java路径'
SPARK_HOME = "spark路径"
# 当存在多个版本时,不指定很可能会导致出错
os.environ["PYSPARK_PYTHON"] = PYSPARK_PYTHON
os.environ["PYSPARK_DRIVER_PYTHON"] = PYSPARK_PYTHON
os.environ['JAVA_HOME']=JAVA_HOME
os.environ["SPARK_HOME"] = SPARK_HOME
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.sql.session import SparkSession

def get_countryname(line):
country_name = line.strip()

if country_name == 'usa':
output = 'USA'
elif country_name == 'ind':
output = 'India'
elif country_name == 'aus':
output = 'Australia'
else:
output = 'Unknown'

return (output, 1)

if __name__ == "__main__":
#定义处理的时间间隔
batch_interval = 1 # base time unit (in seconds)
#定义窗口长度
window_length = 6 * batch_interval
#定义滑动时间间隔
frequency = 3 * batch_interval

#获取StreamingContext
spark = SparkSession.builder.master("local[2]").getOrCreate()
sc = spark.sparkContext
ssc = StreamingContext(sc, batch_interval)

#需要设置检查点
ssc.checkpoint("checkpoint")

lines = ssc.socketTextStream('localhost', 9999)
addFunc = lambda x, y: x + y
invAddFunc = lambda x, y: x - y
#调用reduceByKeyAndWindow,来进行窗口函数的调用
window_counts = lines.map(get_countryname) \
.reduceByKeyAndWindow(addFunc, invAddFunc, window_length, frequency)
#输出处理结果信息
window_counts.pprint()

ssc.start()
ssc.awaitTermination()

转载请注明来源,欢迎对文章中的引用来源进行考证,欢迎指出任何有错误或不够清晰的表达。可以在下面评论区评论,也可以邮件至 zoubinbf@163.com

×

喜欢就点赞,疼爱就打赏