实验五 Spark Structured Streaming编程实践

一、编写程序

(1). 按照tag分组统计生成的日志数。

在新开的终端内输入 vi spark_exercise_testsyslog1.py ,贴入如下代码并运行。运行之前需要关闭“tail终端”内的tail命令并重新运行tail命令,否则多次运行测试可能导致没有新数据生成。

#!/usr/bin/env python3from functools import partialfrom pyspark.sql import SparkSessionfrom pyspark.sql.functions import *if __name__ == "__main__":spark = SparkSession \.builder \.appName("Structuredcronlog") \.getOrCreate()lines = spark \.readStream \.format("socket") \.option("host", "localhost") \.option("port", 9988) \.load()# Nov 24 13:17:01 spark CRON[18455]: (root) CMD (   cd / && run-parts --report /etc/cron.hourly)# 定义一个偏应用函数,从固定的pattern获取日志内匹配的字段fields = partial(regexp_extract, str="value", pattern="^(\w{3}\s*\d{1,2} \d{2}:\d{2}:\d{2}) (.*?) (.*?)\[*\d*\]*: (.*)$")words = lines.select(unix_timestamp(format_string('2022 %s', fields(idx=1)), 'yy MMM d H:m:s').alias("timestamp"),fields(idx=2).alias("hadooplyf316"),fields(idx=3).alias("tag"),fields(idx=4).alias("content"),)# (1).  按照tag分组统计日志数。windowedCounts1 = words \.groupBy("tag") \.count()# 开始运行查询并在控制台输出query = windowedCounts1 \.writeStream \.outputMode("append") \.format("console") \.option('truncate', 'false')\.trigger(processingTime="3 seconds") \.start()query.awaitTermination()

(2).输出所有日志内容带spark的日志。

在新开的终端内输入 vi spark_exercise_testsyslog3.py ,贴入如下代码并运行。运行之前需要关闭“tail终端”内的tail命令并重新运行tail命令,否则多次运行测试可能导致没有新数据生成。

#!/usr/bin/env python3from functools import partialfrom pyspark.sql import SparkSessionfrom pyspark.sql.functions import *if __name__ == "__main__":spark = SparkSession \.builder \.appName("Structuredcronlog") \.getOrCreate()lines = spark \.readStream \.format("socket") \.option("host", "localhost") \.option("port", 9988) \.load()# Nov 24 13:17:01 spark CRON[18455]: (root) CMD (   cd / && run-parts --report /etc/cron.hourly)# 定义一个偏应用函数,从固定的pattern获取日志内匹配的字段fields = partial(regexp_extract, str="value", pattern="^(\w{3}\s*\d{1,2} \d{2}:\d{2}:\d{2}) (.*?) (.*?)\[*\d*\]*: (.*)$")words = lines.select(unix_timestamp(format_string('2022 %s', fields(idx=1)), 'yy MMM d H:m:s').alias("timestamp"),fields(idx=2).alias("hostname"),fields(idx=3).alias("tag"),fields(idx=4).alias("content"),)# (3).  输出所有日志内容带spark的日志(根据自己模拟的日志内容进行筛选)。windowedCounts3 = words \.filter("content like '%spark%'")# 开始运行查询并在控制台输出query = windowedCounts3 \.writeStream \.outputMode("append") \.format("console") \.option('truncate', 'false')\.trigger(processingTime="3 seconds") \.start()query.awaitTermination()

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/704040.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

msvcp140_codecvt_ids.dll找不到要如何处理?简单的修复方法分享

在使用Windows操作系统时,用户可能会遇到“无法找到msvcp140_codecvt_ids.dll”这一错误信息。该提示通常发生在启动某些应用程序时,提示失去了关键的动态链接库文件(DLL)依赖。此DLL文件属于Microsoft Visual C Redistributable软…

合并K个升序链表

题目 解法一 优先级队列 思想 将每个链表中的一个节点存放到优先级队列中,本题采用小根堆,将小根堆中的根节点取出,插入到最终的链表中,并且将该节点在原链表中的下一个节点插入小根堆中(需要向下调整)&a…

七人拼团策略:深度解析奖励体系与互助合作机制

在七人拼团策略中,其精心设计的奖励体系是吸引众多参与者的核心动力。接下来,我们将详细解析这一模式中三种关键的奖励类型:直推奖、滑落奖和团队奖,并探讨它们背后的互助合作机制。 奖励体系解析 在七人拼团中,每一件…

冯喜运:5.15黄金原油晚盘分析:鲍威尔再放鹰,降息悬念重重

【黄金消息面分析】:在全球经济动荡和通胀预期不断上升的背景下,黄金作为传统的避险资产,再次成为投资者关注的焦点。当前,黄金价格交投于2370美元/盎司左右,连续两日日线呈现上涨趋势,而白银价格也在连续三…

PSAI超强插件来袭:一键提升设计效率!

无需魔法,直接在PS中完成图生图、局部重绘、线稿上色、无损放大、扩图等操作。无论你是Windows还是Mac用户,都能轻松驾驭这款强大的AI绘图工具,这款PSAI插件让你的设计工作直接起飞! 在之前的分享中,我为大家推荐过两…

【LabVIEW FPGA入门】NI 环境安装教程

注意:安装软件之前关闭杀毒软件,避免安装时损坏,安装完成在使用杀毒软件。 步骤1:判断自己是否需要LabVIEW 编程。 下面这几种情况可以调过安装LabVIEW: 不需要LabVIEW或其他语言编程,直接在MAX或仪器软面板…

LSTM与GAN创新结合!模型性能起飞,准确率超98%

今天来聊一个深度学习领域非常具有创新性的研究方向:LSTM结合GAN。 LSTM擅长处理和记忆长期的时间依赖关系,而GAN可以学习复杂的数据分布并生成逼真的数据样本。通过充分结合两者的优势,我们可以增强模型对复杂数据的处理能力,提…

react18【系列实用教程】memo —— 缓存组件 (2024最新版)

memo 的语法 如上图所示,在react中,当父组件重新渲染时,子组件也会重新渲染,即便子组件无任何变化,通过 memo 可以实现对组件的缓存,即当子组件无变化时,不再重新渲染子组件,核心代码…

MQTT_客户端安装_1.4

下载地址 MQTTX 下载 下一步直接安装即可 界面介绍

[AI]-(第1期):OpenAI-API调用

文章目录 一、OpenAI API中使用GPT-3.5-turbo模型充值方式使用模型计费方式价格说明相关限制和条款 二、接入一个OpenAI API流程1. 获取OpenAI API 密钥2. 集成ChatGPT到小程序3. 处理用户输入4. 调用OpenAI API5. 返回回复至小程序6. 持续优化7. Postman请求示例 三、通用AI客…

欢迎光临Java中的客“栈”

就目前而言,相信大家对数组、链表还有栈都基本已经有了一些了解,本篇文章将以栈为主体,探究栈和数组,栈和链表之间的一些联系。 当然在开始对栈的学习之前,我们先回顾有关数组、链表的基础知识点。 学习代码就是一个…

50.乐理基础-拍号的类型-混合拍子

混合拍子的定义: 1.由不同的单拍子组合起来的,如图1。 2.因为组合顺序有多种可能,所以次强拍的位置也有多种可能,如图3。 图1:四二拍是单拍子,四三拍也是单拍子,四二拍 与 四三拍就是 不同的单拍…