Hadoop: word count, with the reduce results written to ES

1. Dependencies (the ES version is 7.6.2)

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>TestHadoop3</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>3.2.1</version>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <version>1.2.3</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.47</version>
        </dependency>
        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
            <version>2.8.6</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.12.3</version>
        </dependency>
        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>elasticsearch-rest-high-level-client</artifactId>
            <version>7.6.2</version>
        </dependency>
    </dependencies>
</project>

2. Define the entity class that represents a word count result

package cn.edu.tju;

import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.lib.db.DBWritable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

public class MyDBWritable implements DBWritable, Writable {
    private String word;
    private int count;

    public String getWord() {
        return word;
    }

    public void setWord(String word) {
        this.word = word;
    }

    public int getCount() {
        return count;
    }

    public void setCount(int count) {
        this.count = count;
    }

    @Override
    public void write(PreparedStatement statement) throws SQLException {
        statement.setString(1, word);
        statement.setInt(2, count);
    }

    @Override
    public void readFields(ResultSet resultSet) throws SQLException {
        word = resultSet.getString(1);
        count = resultSet.getInt(2);
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(word);
        out.writeInt(count);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        word = in.readUTF();
        count = in.readInt();
    }
}
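
For reference, the document that later ends up in ES is simply the Gson serialization of this entity (Gson serializes the private word and count fields by default). A minimal sketch with made-up sample values:

MyDBWritable w = new MyDBWritable();
w.setWord("hello");
w.setCount(3);
String json = new com.google.gson.Gson().toJson(w);
System.out.println(json);   // prints {"word":"hello","count":3}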

3. Define the mapper and reducer

package cn.edu.tju;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;

public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    private IntWritable one = new IntWritable(1);

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String valueString = value.toString();
        String[] values = valueString.split(" ");
        for (String val : values) {
            context.write(new Text(val), one);
        }
    }
}

package cn.edu.tju;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.util.Iterator;

public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        Iterator<IntWritable> iterator = values.iterator();
        int count = 0;
        while (iterator.hasNext()) {
            count += iterator.next().get();
        }
        context.write(key, new IntWritable(count));
    }
}
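
The main class in section 5 contains a commented-out job.setCombinerClass(MyCombiner.class); call. MyCombiner itself is not shown in the original post; because the word-count sum is associative and commutative, a combiner can simply reuse the reducer logic (the reducer class itself could also be registered as the combiner). A minimal sketch of such a hypothetical MyCombiner:

package cn.edu.tju;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

// Hypothetical combiner: pre-aggregates counts on the map side
// before the data is shuffled to the reducers.
public class MyCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int count = 0;
        for (IntWritable value : values) {
            count += value.get();
        }
        context.write(key, new IntWritable(count));
    }
}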

4. Define a custom class that extends OutputFormat

package cn.edu.tju;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;

import java.io.IOException;

public class MyOutputFormat extends OutputFormat<Text, IntWritable> {
    @Override
    public RecordWriter<Text, IntWritable> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {
        return new MyRecordWriter();
    }

    @Override
    public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException {
    }

    @Override
    public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException {
        NullOutputFormat<NullWritable, NullWritable> format = new NullOutputFormat<>();
        OutputCommitter outputCommitter = format.getOutputCommitter(context);
        return outputCommitter;
    }
}

The getRecordWriter method returns an instance of a custom RecordWriter. checkOutputSpecs is left empty because there is no output path to validate, and getOutputCommitter simply reuses the no-op committer from NullOutputFormat, since the actual output is written to ES inside the RecordWriter. The custom RecordWriter class is as follows:

package cn.edu.tju;

import com.google.gson.Gson;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;

import java.io.IOException;
import java.util.Random;
import java.util.UUID;

public class MyRecordWriter extends RecordWriter<Text, IntWritable> {
    private RestHighLevelClient client;

    public MyRecordWriter() {
        client = new RestHighLevelClient(RestClient.builder(HttpHost.create("xx.xx.xx.xx:9200")));
    }

    @Override
    public void write(Text key, IntWritable value) throws IOException, InterruptedException {
        IndexRequest indexRequest = new IndexRequest("demo6");
        indexRequest.id(UUID.randomUUID().toString());
        MyDBWritable myDBWritable = new MyDBWritable();
        myDBWritable.setWord(key.toString());
        myDBWritable.setCount(value.get());
        Gson gson = new Gson();
        String s = gson.toJson(myDBWritable);
        indexRequest.source(s, XContentType.JSON);
        IndexResponse indexResponse = client.index(indexRequest, RequestOptions.DEFAULT);
        System.out.println(indexResponse);
    }

    @Override
    public void close(TaskAttemptContext context) throws IOException, InterruptedException {
        client.close();
    }
}

The constructor creates an ES client object, and the write method writes the data into ES.
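
As written, write issues one index request (one HTTP round trip) per key/value pair. For larger outputs, one option is to buffer the documents and flush them in batches with the client's bulk API. Below is a minimal sketch of such a variant; the class name BulkRecordWriter and the batch size of 500 are assumptions, not part of the original code:

package cn.edu.tju;

import com.google.gson.Gson;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.http.HttpHost;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;

import java.io.IOException;
import java.util.UUID;

// Hypothetical variant of MyRecordWriter that buffers documents
// and sends them to ES in batches instead of one by one.
public class BulkRecordWriter extends RecordWriter<Text, IntWritable> {
    private static final int BATCH_SIZE = 500;   // assumed batch size
    private final RestHighLevelClient client;
    private final Gson gson = new Gson();
    private BulkRequest bulkRequest = new BulkRequest();

    public BulkRecordWriter() {
        client = new RestHighLevelClient(RestClient.builder(HttpHost.create("xx.xx.xx.xx:9200")));
    }

    @Override
    public void write(Text key, IntWritable value) throws IOException, InterruptedException {
        MyDBWritable myDBWritable = new MyDBWritable();
        myDBWritable.setWord(key.toString());
        myDBWritable.setCount(value.get());
        IndexRequest indexRequest = new IndexRequest("demo6")
                .id(UUID.randomUUID().toString())
                .source(gson.toJson(myDBWritable), XContentType.JSON);
        bulkRequest.add(indexRequest);
        if (bulkRequest.numberOfActions() >= BATCH_SIZE) {
            flush();
        }
    }

    private void flush() throws IOException {
        if (bulkRequest.numberOfActions() > 0) {
            client.bulk(bulkRequest, RequestOptions.DEFAULT);
            bulkRequest = new BulkRequest();
        }
    }

    @Override
    public void close(TaskAttemptContext context) throws IOException, InterruptedException {
        flush();                 // send any remaining buffered documents
        client.close();
    }
}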

5. Define the main class

package cn.edu.tju;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class MyWordCount6 {
    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration(true);
        // HDFS address
        configuration.set("fs.defaultFS", "hdfs://xx.xx.xx.xx:9000/");
        // run in local mode
        configuration.set("mapreduce.framework.name", "local");
        configuration.set("yarn.resourcemanager.hostname", "xx.xx.xx.xx");
        //configuration.set("mapreduce.job.jar", "target\\TestHadoop3-1.0-SNAPSHOT.jar");

        // create the job
        Job job = Job.getInstance(configuration);
        //job.setJarByClass(MyWordCount.class);
        // job name
        job.setJobName("wordcount-" + System.currentTimeMillis());

        // input data path
        FileInputFormat.setInputPaths(job, new Path("/user/root/tju/"));

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setMapperClass(WordCountMapper.class);
        //job.setCombinerClass(MyCombiner.class);
        job.setReducerClass(WordCountReducer.class);
        job.setOutputFormatClass(MyOutputFormat.class);

        // wait for the job to finish
        job.waitForCompletion(true);
    }
}

Here, job.setOutputFormatClass(MyOutputFormat.class); specifies where the reduce output is written. The input data comes from HDFS.
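
The job reads its input from /user/root/tju/ on HDFS, so the text files to be counted have to be uploaded there first. A minimal sketch of doing that with the Hadoop FileSystem API (the class name UploadInput and the local file name input.txt are placeholders):

package cn.edu.tju;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Hypothetical helper that copies a local text file into the
// input directory the word count job reads from.
public class UploadInput {
    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration(true);
        configuration.set("fs.defaultFS", "hdfs://xx.xx.xx.xx:9000/");
        FileSystem fs = FileSystem.get(configuration);
        // "input.txt" is a placeholder for a local file with the words to count
        fs.copyFromLocalFile(new Path("input.txt"), new Path("/user/root/tju/"));
        fs.close();
    }
}
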
6. Query the data in ES after the job has run
(screenshot: the word count documents in the demo6 index)
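
One way to verify the result is to query the demo6 index with the same REST high-level client. A minimal sketch (the class name QueryResult and the query parameters are assumptions):

package cn.edu.tju;

import org.apache.http.HttpHost;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;

// Hypothetical helper that reads back the indexed word counts after the job has run.
public class QueryResult {
    public static void main(String[] args) throws Exception {
        RestHighLevelClient client = new RestHighLevelClient(
                RestClient.builder(HttpHost.create("xx.xx.xx.xx:9200")));

        SearchRequest searchRequest = new SearchRequest("demo6");
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
        // match all documents; use QueryBuilders.matchQuery("word", "hello")
        // instead to look up a single word
        sourceBuilder.query(QueryBuilders.matchAllQuery());
        sourceBuilder.size(20);
        searchRequest.source(sourceBuilder);

        SearchResponse response = client.search(searchRequest, RequestOptions.DEFAULT);
        for (SearchHit hit : response.getHits()) {
            System.out.println(hit.getSourceAsString());   // e.g. {"word":"hello","count":3}
        }
        client.close();
    }
}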
