3月20日

news/2025/3/26 3:14:58/文章来源:https://www.cnblogs.com/kuandong24/p/18788666

实现窗口实时数据统计程序

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.state import StateSpec# 初始化Spark Streaming上下文
sc = SparkContext(appName="SparkStreamingAdvanced")
ssc = StreamingContext(sc, batchDuration=5)  # 设置微批处理的时间间隔为5秒
ssc.checkpoint("checkpoint")  # 设置检查点目录,用于状态恢复# 定义状态更新函数
def update_word_count(new_values, running_count):"""状态管理:更新单词计数"""if running_count is None:running_count = 0return sum(new_values, running_count)# 创建一个DStream,从Socket接收数据
# 假设数据通过Socket发送到localhost的9999端口
lines = ssc.socketTextStream("localhost", 9999)# 对接收到的每行数据进行处理
words = lines.flatMap(lambda line: line.split(" "))  # 将每行数据分割为单词
word_pairs = words.map(lambda word: (word, 1))  # 将单词映射为键值对 (word, 1)# 使用状态管理更新单词计数
word_counts = word_pairs.updateStateByKey(update_word_count)# 使用窗口操作统计过去30秒内的单词计数,窗口滑动间隔为10秒
windowed_word_counts = word_pairs.reduceByKeyAndWindow(lambda x, y: x + y,  # 窗口内数据的聚合函数lambda x, y: x - y,  # 窗口移除数据的聚合函数windowDuration=30,   # 窗口持续时间slideDuration=10     # 窗口滑动时间
)# 打印状态管理和窗口操作的结果
word_counts.pprint()
windowed_word_counts.pprint()# 启动StreamingContext
ssc.start()# 等待程序运行
ssc.awaitTermination()

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/904634.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

读DAMA数据管理知识体系指南29文件和内容管理活动

读DAMA数据管理知识体系指南29文件和内容管理活动1. 规划生命周期的管理 1.1. 从文件的创建或接收文件后的分发、存储、检索、归档和潜在的销毁 1.2. 规划包括开发分类/索引系统和分类法,以实现文件的存储和检索 1.3. 重要的是,生命周期规划中需要为档案建立具体的制度 1.4. …

c语言实验2

1 #include <stdio.h>2 #include <stdlib.h>3 #include <time.h>4 5 #define N 56 7 int main() {8 int number;9 int i; 10 11 srand(time(0)); // 以当前系统时间作为随机种子 12 for(i = 0; i < N; ++i) { 13 number = r…

使用 Browser-Use WebUI + DeepSeek 实现浏览器AI自动化全攻略

使用 Browser-Use WebUI + DeepSeek 实现浏览器AI自动化全攻略 环境准备 1. 安装 Python 环境版本要求:Python 3.11 或更高版本 验证安装:命令行执行 python --version 注意:安装时需勾选 "Add to PATH" 选项(Windows用户)2. 核心工具安装 # 安装 browser-use 框…

20234214 2024-2025-2 《Python程序设计》实验一报告

20234214 2024-2025-2 《Python程序设计》实验一报告 课程:《Python程序设计》 班级: 2342 姓名: 唐果儿 学号:20234214 实验教师:王志强 实验日期:2025年3月18日 必修/选修: 公选课 1.实验内容 (一)实验内容 1.熟悉Python开发环境; 2.练习Python运行、调试技能; …

WinForm 使用 Win32 API 实现的无边框窗口

WinForm 使用 Win32 API 实现的无边框窗口前言 时光荏苒,转眼已近是2025年了。不知不觉两年多没有研究代码了,在这期间 .NET 10 都快 RC 了,前几天刷手机看到张队公众号里有关于 .NET 9.0 AOT 发布的内容,所以写了这些代码来测试一下 AOT 编译的效果,并评估未来是否开发支…

C语言打卡学习第4天(2025.3.23)

今天只写了几道基础题,又看了下数组和冒泡排序,概念搞懂了但是写代码还是比较困难,准备明天把排序这类题好好看看。

一文速通Python并行计算:01 Python多线程编程-基本概念、切换流程、GIL锁机制和生产者与消费者模型

多线程允许程序同时执行多个任务,提升效率和响应性。线程分为新建、就绪、运行、阻塞和死亡五种状态。Python的GIL锁限制多线程并行执行,适合I/O密集型任务。生产者-消费者模型通过共享缓冲区和条件变量实现线程协作,解决数据共享问题。一文速通 Python 并行计算:01 Python…

Spring的三级缓存详解

目录 1、什么是三级缓存 2、三级缓存详解Bean实例化前属性赋值/注入前初始化后总结3、怎么解决的循环依赖 4、不用三级缓存不行吗 5、总结 一、什么是三级缓存 就是在Bean生成流程中保存Bean对象三种形态的三个Map集合,如下:

20244207 实验一 《python程序设计》实验报告

# 20244207 2024-2025-2 《Python程序设计》实验一报告 课程:《Python程序设计》 班级: 2442 姓名: 赵文萱 学号:20244207 实验教师:王志强 实验日期:2025年3月18日 必修/选修: 公选课 1.实验内容 1.熟悉Python开发环境; 2.练习Python运行、调试技能; 3.编写程序,…

20244202 《Python程序设计》实验一报告

20244202 《Python程序设计》实验一报告 课程:《Python程序设计》 班级: 2442 姓名: 陈艺豪 学号:20244219 实验教师:王志强 实验日期:2025年3月23日 必修/选修: 公选课 1.实验内容 (1).熟悉Python开发环境; (2).练习Python运行、调试技能; (3).编写程序,练习变量和类…

USTCPC 2025 游记

队名 合肥一中能不能多请点OI教练,二人队。队长 @包涵宇 ,省队爷。 Day -inf~0 随机写了几道有意思的 cf 。 bhy 又在做黑的插头 dp 。膜拜。 Day 1 早上被父母逼着学习文化课,结果作业做不完直接半红温状态,, 加上昨天做了 ~8h 文化课作业只完成了 1/2 ,然后直接不做了…