Hadoop?MapReduce实现单词计数(Word?Count)

 更新时间:2023年05月22日 11:42:02   作者:orion-orion  
这篇文章主要为大家详细介绍了如何利用Hadoop实现单词计数(Word?Count)的MapReduce,文中的示例代码讲解详细,感兴趣的可以跟随小编一起学习一下
(福利推荐:【腾讯云】服务器最新限时优惠活动,云服务器1核2G仅99元/年、2核4G仅768元/3年,立即抢购>>>:9i0i.cn/qcloud

(福利推荐:你还在原价购买阿里云服务器?现在阿里云0.8折限时抢购活动来啦!4核8G企业云服务器仅2998元/3年,立即抢购>>>:9i0i.cn/aliyun

1.Map与Reduce过程

1.1 Map过程

首先,Hadoop会把输入数据划分成等长的输入分片(input split) 或分片发送到MapReduce。Hadoop为每个分片创建一个map任务,由它来运行用户自定义的map函数以分析每个分片中的记录。在我们的单词计数例子中,输入是多个文件,一般一个文件对应一个分片,如果文件太大则会划分为多个分片。map函数的输入以<key, value>形式做为输入,value为文件的每一行,key为该行在文件中的偏移量(一般我们会忽视)。这里map函数起到的作用为将每一行进行分词为多个word,并在context中写入<word, 1>以代表该单词出现一次。

map过程的示意图如下:

mapper代码编写如下:

public static class TokenizerMapper
        extends Mapper<Object, Text, Text, IntWritable> {
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();
    public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        //每次处理一行,一个mapper里的value为一行,key为该行在文件中的偏移量
        StringTokenizer iter = new StringTokenizer(value.toString());
        while (iter.hasMoreTokens()) {
            word.set(iter.nextToken());
            // 向context中写入<word, 1>
            context.write(word, one);
            System.out.println(word);
        }
    }
}

如果我们能够并行处理分片(不一定是完全并行),且分片是小块的数据,那么处理过程将会有一个好的负载平衡。但是如果分片太小,那么管理分片与map任务创建将会耗费太多时间。对于大多数作业,理想分片大小为一个HDFS块的大小,默认是64MB。

map任务的执行节点和输入数据的存储节点相同时,Hadoop的性能能达到最佳,这就是计算机系统中所谓的data locality optimization(数据局部性优化)。而最佳分片大小与块大小相同的原因就在于,它能够保证一个分片存储在单个节点上,再大就不能了。

1.2 Reduce过程

接下来我们看reducer的编写。reduce任务的多少并不是由输入大小来决定,而是需要人工单独指定的(默认为1个)。和上面map不同的是,reduce任务不再具有本地读取的优势————一个reduce任务的输入往往来自于所有mapper的输出,因此map和reduce之间的数据流被称为 shuffle(洗牌) 。Hadoop会先按照key-value对进行排序,然后将排序好的map的输出通过网络传输到reduce任务运行的节点,并在那里进行合并,然后传递到用户定义的reduce函数中。

reduce 函数示意图如下:

reducer代码编写如下:

 public static class IntSumReducer
            extends Reducer<Text, IntWritable, Text, IntWritable>{
        private IntWritable result = new IntWritable();
        public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException{
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }

2.完整代码

2.1 项目架构

关于VSCode+Java+Maven+Hadoop开发环境搭建,可以参见我的博客《VSCode+Maven+Hadoop开发环境搭建》,此处不再赘述。这里展示我们的项目架构如下:

Word-Count-Hadoop
├─ input
│  ├─ file1
│  ├─ file2
│  └─ file3
├─ output
├─ pom.xml
├─ src
│  └─ main
│     └─ java
│        └─ WordCount.java
└─ target

WordCount.java代码如下:

import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class WordCount{
    public static class TokenizerMapper
            extends Mapper<Object, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        //每次处理一行,一个mapper里的value为一行,key为该行在文件中的偏移量
            StringTokenizer iter = new StringTokenizer(value.toString());
            while (iter.hasMoreTokens()) {
                word.set(iter.nextToken());
                // 向context中写入<word, 1>
                context.write(word, one);
            }
        }
    }

    public static class IntSumReducer
            extends Reducer<Text, IntWritable, Text, IntWritable>{
        private IntWritable result = new IntWritable();
        public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException{
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }

    public static void main(String[] args) throws Exception{
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "word_count");

        job.setJarByClass(WordCount.class);

        job.setMapperClass(TokenizerMapper.class);
        //此处的Combine操作意为即第每个mapper工作完了先局部reduce一下,最后再全局reduce
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        //第0个参数是输入目录,第1个参数是输出目录
        //先判断output path是否存在,如果存在则删除
        Path path = new Path(args[1]);// 
        FileSystem fileSystem = path.getFileSystem(conf);
        if (fileSystem.exists(path)) {
            fileSystem.delete(path, true);
        }

        //设置输入目录和输出目录
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true)?0:1);
    }
}

pom.xml中记得配置Hadoop的依赖环境:

    ...
  <!-- 集中定义版本号 -->
  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>17</maven.compiler.source>
    <maven.compiler.target>17</maven.compiler.target>
    <hadoop.version>3.3.1</hadoop.version>
  </properties>
  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.11</version>
      <scope>test</scope>
    </dependency>
    <!-- 导入hadoop依赖环境 -->
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-hdfs</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-mapreduce-client-core</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-yarn-api</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
  </dependencies>
  ...
</project>

此外,因为我们的程序自带输入参数,我们还需要在VSCode的launch.json中配置输入参数intput(代表输入目录)和output(代表输出目录):

...
"args": [
    "input",
    "output"
],
...

编译运行完毕后,可以查看output文件夹下的part-r-00000文件:

David    1
Goodbye    1
Hello    3
Tom    1
World    2

可见我们的程序正确地完成了单词计数的功能。

以上就是Hadoop MapReduce实现单词计数(Word Count)的详细内容,更多关于Hadoop MapReduce的资料请关注程序员之家其它相关文章!

相关文章

  • spring集成redis cluster详解

    spring集成redis cluster详解

    这篇文章主要介绍了spring集成redis cluster详解,分享了maven依赖,Spring配置,增加connect-redis.properties 配置文件等相关内容,具有一定参考价值,需要的朋友可以了解下。
    2017-11-11
  • SpringBoot3+SpringSecurity6前后端分离的项目实践

    SpringBoot3+SpringSecurity6前后端分离的项目实践

    SpringSecurity6 的用法和以前版本的有较大差别,本文主要介绍了SpringBoot3+SpringSecurity6前后端分离的项目实践,具有一定的参考价值,感兴趣的可以了解一下
    2023-12-12
  • 使用springboot结合vue实现sso单点登录

    使用springboot结合vue实现sso单点登录

    这篇文章主要为大家详细介绍了如何使用springboot+vue实现sso单点登录,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2019-06-06
  • springboot DTO字符字段与日期字段的转换问题

    springboot DTO字符字段与日期字段的转换问题

    这篇文章主要介绍了springboot DTO字符字段与日期字段的转换问题,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2020-07-07
  • jsp中存取session值简单介绍

    jsp中存取session值简单介绍

    这篇文章主要介绍了jsp中存取session值简单介绍,涉及request和session的域操作等相关内容,具有一定参考价值,需要的朋友可以了解下。
    2017-11-11
  • idea远程Debug部署在服务器上的服务

    idea远程Debug部署在服务器上的服务

    在开发的时候我们通常在本地代码上debug程序,但是服务部署到了开发环境服务器上,如何远程调试,本文主要介绍了idea远程Debug部署在服务器上的服务,具有一定的参考价值,感兴趣的可以了解一下
    2023-12-12
  • Java 11 正式发布,这 8 个逆天新特性教你写出更牛逼的代码

    Java 11 正式发布,这 8 个逆天新特性教你写出更牛逼的代码

    美国当地时间9月25日,Oracle 官方宣布 Java 11 (18.9 LTS) 正式发布,可在生产环境中使用!这是自 Java 8 后的首个长期支持版本
    2018-09-09
  • SpringBoot整合RabbitMQ, 实现生产者与消费者的功能

    SpringBoot整合RabbitMQ, 实现生产者与消费者的功能

    这篇文章主要介绍了SpringBoot整合RabbitMQ, 实现生产者与消费者的功能,帮助大家更好得理解和学习使用SpringBoot框架,感兴趣的朋友可以了解下
    2021-03-03
  • Spring Boot整合mybatis使用注解实现动态Sql、参数传递等常用操作(实现方法)

    Spring Boot整合mybatis使用注解实现动态Sql、参数传递等常用操作(实现方法)

    这篇文章主要介绍了Spring Boot整合mybatis使用注解实现动态Sql、参数传递等常用操作(实现方法),本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2020-08-08
  • java内存分布实现代码

    java内存分布实现代码

    这篇文章主要介绍了浅谈Java内存区域划分和内存分配策略,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2021-07-07

最新评论

?


http://www.vxiaotou.com