大数据分析与应用实验任务十一

大数据分析与应用实验任务十一

实验目的

  • 通过实验掌握spark Streaming相关对象的创建方法;

  • 熟悉spark Streaming对文件流、套接字流和RDD队列流的数据接收处理方法;

  • 熟悉spark Streaming的转换操作,包括无状态和有状态转换。

  • 熟悉spark Streaming输出编程操作。

实验任务

一、DStream 操作概述
  1. 创建 StreamingContext 对象

    登录 Linux 系统后,启动 pyspark。进入 pyspark 以后,就已经获得了一个默认的 SparkConext 对象,也就是 sc。因此,可以采用如下方式来创建 StreamingContext 对象:

    from pyspark.streaming import StreamingContext 
    sscluozhongye = StreamingContext(sc, 1)
    

    image-20231207112253827

    如果是编写一个独立的 Spark Streaming 程序,而不是在 pyspark 中运行,则需要在代码文件中通过类似如下的方式创建 StreamingContext 对象:

    from pyspark import SparkContext, SparkConf 
    from pyspark.streaming import StreamingContext 
    conf = SparkConf() 
    conf.setAppName('TestDStream') 
    conf.setMaster('local[2]') 
    sc = SparkContext(conf = conf) 
    ssc = StreamingContext(sc, 1)
    print("创建成功,lzy防伪")
    

    image-20231207112652285

二、基本输入源
  1. 文件流
  • 在 pyspark 中创建文件流

    首先,在 Linux 系统中打开第 1 个终端(为了便于区分多个终端,这里记作“数据源终端”),创建一个 logfile 目录,命令如下:

    cd /root/Desktop/luozhongye/
    mkdir streaming 
    cd streaming 
    mkdir logfile
    

    image-20231207112923323

    其次,在 Linux 系统中打开第二个终端(记作“流计算终端”),启动进入 pyspark,然后,依次输入如下语句:

    from pyspark import SparkContext 
    from pyspark.streaming import StreamingContext 
    ssc = StreamingContext(sc, 10) 
    lines = ssc.textFileStream('file:///root/Desktop/luozhongye/streaming/logfile') 
    words = lines.flatMap(lambda line: line.split(' ')) 
    wordCounts = words.map(lambda x : (x,1)).reduceByKey(lambda a,b:a+b) 
    wordCounts.pprint() 
    ssc.start() 
    ssc.awaitTermination()
    

image-20231207113305405

  • 采用独立应用程序方式创建文件流

    #!/usr/bin/env python3 
    from pyspark import SparkContext, SparkConf 
    from pyspark.streaming import StreamingContext 
    conf = SparkConf() 
    conf.setAppName('TestDStream') 
    conf.setMaster('local[2]') 
    sc = SparkContext(conf = conf) 
    ssc = StreamingContext(sc, 10) 
    lines = ssc.textFileStream('file:///root/Desktop/luozhongye/streaming/logfile') 
    words = lines.flatMap(lambda line: line.split(' ')) 
    wordCounts = words.map(lambda x : (x,1)).reduceByKey(lambda a,b:a+b) 
    wordCounts.pprint() 
    ssc.start() 
    ssc.awaitTermination()
    print("2023年12月7日lzy")
    

    保存该文件,并执行以下命令:

    cd /root/Desktop/luozhongye/streaming/logfile/ 
    spark-submit FileStreaming.py
    

image-20231207114014647

  1. 套接字流
  • 使用套接字流作为数据源

    新建一个代码文件“/root/Desktop/luozhongye/streaming/socket/NetworkWordCount.py”,在NetworkWordCount.py 中输入如下内容:

    #!/usr/bin/env python3 
    from __future__ import print_function
    import sys
    from pyspark import SparkContext
    from pyspark.streaming import StreamingContextif __name__ == "__main__":if len(sys.argv) != 3:print("Usage: NetworkWordCount.py <hostname> <port>", file=sys.stderr)exit(-1)sc = SparkContext(appName="PythonStreamingNetworkWordCount")ssc = StreamingContext(sc, 1)lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))counts = lines.flatMap(lambda line: line.split(" ")).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b)counts.pprint()ssc.start()ssc.awaitTermination()
    

    使用如下 nc 命令生成一个 Socket 服务器端:

    nc -lk 9999
    

    新建一个终端(记作“流计算终端”),执行如下代码启动流计算:

    cd /root/Desktop/luozhongye/streaming/socket 
    /usr/local/spark/bin/spark-submit NetworkWordCount.py localhost 9999
    

image-20231208002212790

  • 使用 Socket 编程实现自定义数据源

    新建一个代码文件“/root/Desktop/luozhongye/streaming/socket/DataSourceSocket.py”,在 DataSourceSocket.py 中输入如下代码:

    #!/usr/bin/env python3 
    import socket# 生成 socket 对象
    server = socket.socket()
    # 绑定 ip 和端口
    server.bind(('localhost', 9999))
    # 监听绑定的端口
    server.listen(1)
    while 1:# 为了方便识别,打印一个“I’m waiting the connect...”print("I'm waiting the connect...")# 这里用两个值接收,因为连接上之后使用的是客户端发来请求的这个实例# 所以下面的传输要使用 conn 实例操作conn, addr = server.accept()# 打印连接成功print("Connect success! Connection is from %s " % addr[0])# 打印正在发送数据print('Sending data...')conn.send('I love hadoop I love spark hadoop is good spark is fast'.encode())conn.close()print('Connection is broken.')
    print("2023年12月7日lzy")
    

    执行如下命令启动 Socket 服务器端:

    cd /root/Desktop/luozhongye/streaming/socket 
    /usr/local/spark/bin/spark-submit DataSourceSocket.py
    

    新建一个终端(记作“流计算终端”),输入以下命令启动 NetworkWordCount 程序:

    cd /root/Desktop/luozhongye/streaming/socket 
    /usr/local/spark/bin/spark-submit NetworkWordCount.py localhost 9999
    

image-20231208003303167

  1. RDD 队列流

    Linux 系统中打开一个终端,新建一个代码文件“/root/Desktop/luozhongye/ streaming/rddqueue/ RDDQueueStream.py”,输入以下代码:

    #!/usr/bin/env python3 
    import time
    from pyspark import SparkContext
    from pyspark.streaming import StreamingContextif __name__ == "__main__":print("")sc = SparkContext(appName="PythonStreamingQueueStream")ssc = StreamingContext(sc, 2)# 创建一个队列,通过该队列可以把 RDD 推给一个 RDD 队列流rddQueue = []for i in range(5):rddQueue += [ssc.sparkContext.parallelize([j for j in range(1, 1001)], 10)]time.sleep(1)# 创建一个 RDD 队列流inputStream = ssc.queueStream(rddQueue)mappedStream = inputStream.map(lambda x: (x % 10, 1))reducedStream = mappedStream.reduceByKey(lambda a, b: a + b)reducedStream.pprint()ssc.start()ssc.stop(stopSparkContext=True, stopGraceFully=True)
    

    下面执行如下命令运行该程序:

    cd /root/Desktop/luozhongye/streaming/rddqueue 
    /usr/local/spark/bin/spark-submit RDDQueueStream.py
    

image-20231208004439462

三、转换操作
  1. 滑动窗口转换操作

    对“套接字流”中的代码 NetworkWordCount.py 进行一个小的修改,得到新的代码文件“/root/Desktop/luozhongye/streaming/socket/WindowedNetworkWordCount.py”,其内容如下:

    #!/usr/bin/env python3 
    from __future__ import print_function
    import sys
    from pyspark import SparkContext
    from pyspark.streaming import StreamingContextif __name__ == "__main__":if len(sys.argv) != 3:print("Usage: WindowedNetworkWordCount.py <hostname> <port>", file=sys.stderr)exit(-1)sc = SparkContext(appName="PythonStreamingWindowedNetworkWordCount")ssc = StreamingContext(sc, 10)ssc.checkpoint("file:///root/Desktop/luozhongye/streaming/socket/checkpoint")lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))counts = lines.flatMap(lambda line: line.split(" ")) \.map(lambda word: (word, 1)) \.reduceByKeyAndWindow(lambda x, y: x + y, lambda x, y: x - y, 30, 10)counts.pprint()ssc.start()ssc.awaitTermination()
    

为了测试程序的运行效果,首先新建一个终端(记作“数据源终端”),执行如下命令运行nc 程序:

   cd /root/Desktop/luozhongye/streaming/socket/ nc -lk 9999

然后,再新建一个终端(记作“流计算终端”),运行客户端程序 WindowedNetworkWordCount.py,命令如下:

   cd /root/Desktop/luozhongye/streaming/socket/ /usr/local/spark/bin/spark-submit WindowedNetworkWordCount.py localhost 9999

在数据源终端内,连续输入 10 个“hadoop”,每个 hadoop 单独占一行(即每输入一个 hadoop就按回车键),再连续输入 10 个“spark”,每个 spark 单独占一行。这时,可以查看流计算终端内显示的词频动态统计结果,可以看到,随着时间的流逝,词频统计结果会发生动态变化。

image-20231208005821701

  1. updateStateByKey 操作

    在“/root/Desktop/luozhongye/streaming/stateful/”目录下新建一个代码文件 NetworkWordCountStateful.py,输入以下代码:

    #!/usr/bin/env python3 
    from __future__ import print_function
    import sys
    from pyspark import SparkContext
    from pyspark.streaming import StreamingContextif __name__ == "__main__":if len(sys.argv) != 3:print("Usage: NetworkWordCountStateful.py <hostname> <port>", file=sys.stderr)exit(-1)sc = SparkContext(appName="PythonStreamingStatefulNetworkWordCount")ssc = StreamingContext(sc, 1)ssc.checkpoint("file:///root/Desktop/luozhongye/streaming/stateful/")# RDD with initial state (key, value) pairsinitialStateRDD = sc.parallelize([(u'hello', 1), (u'world', 1)])def updateFunc(new_values, last_sum):return sum(new_values) + (last_sum or 0)lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))running_counts = lines.flatMap(lambda line: line.split(" ")) \.map(lambda word: (word, 1)) \.updateStateByKey(updateFunc, initialRDD=initialStateRDD)running_counts.pprint()ssc.start()ssc.awaitTermination()
    

    新建一个终端(记作“数据源终端”),执行如下命令启动 nc 程序:

    nc -lk 9999
    

    新建一个 Linux 终端(记作“流计算终端”),执行如下命令提交运行程序:

    cd /root/Desktop/luozhongye/streaming/stateful 
    /usr/local/spark/bin/spark-submit NetworkWordCountStateful.py localhost 9999
    

image-20231208010814959

四、把 DStream 输出到文本文件中

下面对之前已经得到的“/root/Desktop/luozhongye/streaming/stateful/NetworkWordCountStateful.py”代码进行简单的修改,把生成的词频统计结果写入文本文件中。

修改后得到的新代码文件“/root/Desktop/luozhongye/streaming/stateful/NetworkWordCountStatefulText.py”的内容如下:

#!/usr/bin/env python3 
from __future__ import print_function
import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContextif __name__ == "__main__":if len(sys.argv) != 3:print("Usage: NetworkWordCountStateful.py <hostname> <port>", file=sys.stderr)exit(-1)sc = SparkContext(appName="PythonStreamingStatefulNetworkWordCount")ssc = StreamingContext(sc, 1)ssc.checkpoint("file:///root/Desktop/luozhongye/streaming/stateful/")# RDD with initial state (key, value) pairs initialStateRDD = sc.parallelize([(u'hello', 1), (u'world', 1)])def updateFunc(new_values, last_sum):return sum(new_values) + (last_sum or 0)lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))running_counts = lines.flatMap(lambda line: line.split(" ")) \.map(lambda word: (word, 1)) \.updateStateByKey(updateFunc, initialRDD=initialStateRDD)running_counts.saveAsTextFiles("file:///root/Desktop/luozhongye/streaming/stateful/output")running_counts.pprint()ssc.start()ssc.awaitTermination()

新建一个终端(记作“数据源终端”),执行如下命令运行nc 程序:

cd /root/Desktop/luozhongye/streaming/socket/ 
nc -lk 9999

新建一个 Linux 终端(记作“流计算终端”),执行如下命令提交运行程序:

cd /root/Desktop/luozhongye/streaming/stateful 
/usr/local/spark/bin/spark-submit NetworkWordCountStatefulText.py localhost 9999

image-20231208012123002

实验心得

通过本次实验,我深入理解了Spark Streaming,包括创建StreamingContext、DStream等对象。同时,我了解了Spark Streaming对不同类型数据流的处理方式,如文件流、套接字流和RDD队列流。此外,我还熟悉了Spark Streaming的转换操作和输出编程操作,并掌握了map、flatMap、filter等方法。最后,我能够自定义输出方式和格式。总之,这次实验让我全面了解了Spark Streaming,对未来的学习和工作有很大的帮助。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/256651.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【深度学习】一维数组的聚类

在学习聚类算法的过程中&#xff0c;学习到的聚类算法大部分都是针对n维的&#xff0c;针对一维数据的聚类方式较少&#xff0c;今天就来学习下如何给一维的数据进行聚类。 方案一&#xff1a;采用K-Means对一维数据聚类 Python代码如下&#xff1a; from sklearn.cluster im…

[VSCode] Java开发环境配置

文章目录 1 VSCode & Java 安装1.1 安装 VSCode1.2 安装 JDK 2 环境变量配置3 在 VSCode 中安装 Java 扩展4 运行测试 1 VSCode & Java 安装 1.1 安装 VSCode Visual Studio Code 官方下载 地址&#xff1a; https://code.visualstudio.com/详细安装步骤这里不做赘…

m1编译xgboost的jar报错

1、编译 cd jvm-package包&#xff0c;然后进行编译mvn install -P libxgboost,java -DskipTests 2、报错信息&#xff1a; 3、解决方法 A、在jvm-packages包下找到xgboost4j中找到pom.xml&#xff0c;如图&#xff1a; B、修改python的额目录或者脚本&#xff1a; C、继续执…

图像处理之把模糊的图片变清晰

1.图片如果是有雾化效果的对图像产生影响的,要先进行图形增强,Retinex是基于深度神经网络了,我在之前图形处理的文章一路从神经网络(概率统计)—>积卷神经网络(对区域进行概率统计,对图片进行切割多个识别对象)–>深度积卷神经网络(RetinexNet也是模拟人脑的处理过程,增加…

【无线网络技术】——无线城域网(学习笔记)

&#x1f4d6; 前言&#xff1a;无线城域网&#xff08;WMAN&#xff09;是指在地域上覆盖城市及其郊区范围的分布节点之间传输信息的本地分配无线网络。能实现语音、数据、图像、多媒体、IP等多业务的接入服务。其覆盖范围的典型值为3~5km&#xff0c;点到点链路的覆盖可以高达…

Pytorch线性回归教程

import torch import numpy as np import torch.nn as nn import matplotlib.pyplot as plt生成测试数据 # 长期趋势 def trend(time, slope0):return slope * time# 季节趋势 def seasonal_pattern(season_time):return np.where(season_time < 0.4,np.cos(season_time * …

微信小程序js数组对象根据某个字段排序

一、排序栗子 注: 属性字段需要进行转换,如String类型或者Number类型 //升序排序 首元素(element1)在前 降序则(element1)元素在后 data data.sort((element1, element2) >element1.属性 - element2.属性 ); 二、代码 Page({/*** 页面的初始数据*/data: {user:…

算能 MilkV Duo开发板实战——opencv-mobile (迷你版opencv库)的移植和应用

前言 OpenCV是一种开源的计算机视觉和机器学习软件库&#xff0c;旨在提供一组通用的计算机视觉工具。它用于图像处理、目标识别、人脸识别、机器学习等领域&#xff0c;广泛应用于计算机视觉任务。 OpenCV-Mobile是OpenCV库的轻量版本&#xff0c;专为移动平台&#xff08;A…

水果党flstudio用什么midi键盘?哪个版本的FL Studio更适合我

好消息&#xff01;好消息&#xff01;特大好消息&#xff01; 水果党们&#xff01;终于有属于自己的专用MIDI键盘啦&#xff01; 万众期待的Novation FLKEY系列 正式出炉&#xff01; 话有点多话&#xff0c;先分享一份干货&#xff0c;尽快下载 FL Studio 21 Win-安装包&…

Android Audio实战——音频链路分析(二十五)

在 Android 系统的开发过程当中,音频异常问题通常有如下几类:无声、调节不了声音、爆音、声音卡顿和声音效果异常(忽大忽小,低音缺失等)等。尤其声音效果这部分问题通常从日志上信息量较少,相对难定位根因。想要分析此类问题,便需要对声音传输链路有一定的了解,能够在链…

qt creator配置opencv库 (MSVC版本)

目录 1. MSVC版本 1.1 使用cmake编译opencv 1.2 再使用visual studio 2019生成opencv的lib,dll 1.3 配置opencv的系统环境变量 1.4 新建qt项目 1. MSVC版本 1.1 使用cmake编译opencv 1.2 再使用visual studio 2019生成opencv的lib,dll 1.3 配置opencv的系统环境变量 D:…

【推荐系统】了解推荐系统的生态(重点:推荐算法的主要分类)

【大家好&#xff0c;我是爱干饭的猿&#xff0c;本文重点介绍推荐系统的关键元素和思维模式、推荐算法的主要分类、推荐系统常见的问题、推荐系统效果评测。 后续会继续分享其他重要知识点总结&#xff0c;如果喜欢这篇文章&#xff0c;点个赞&#x1f44d;&#xff0c;关注一…