云计算实验:Java?MapReduce编程

 更新时间:2022年01月25日 09:34:26   作者:wow_awsl_qwq  
这篇文章主要介绍了云计算实验:Java?MapReduce编程,?居于Java围绕MapReduce编程展开详细内容,文章助大家掌握MapReduce编程,理解MapReduce原理,需要的朋友可以参考一下
(福利推荐:【腾讯云】服务器最新限时优惠活动,云服务器1核2G仅99元/年、2核4G仅768元/3年,立即抢购>>>:9i0i.cn/qcloud

(福利推荐:你还在原价购买阿里云服务器?现在阿里云0.8折限时抢购活动来啦!4核8G企业云服务器仅2998元/3年,立即抢购>>>:9i0i.cn/aliyun

实验题目:

MapReduce:编程

实验内容:

本实验利用 Hadoop 提供的 Java API 进行编程进行 MapReduce 编程。

实验目标:

  • 掌握MapReduce编程。
  • 理解MapReduce原理

【实验作业】简单流量统计

有如下这样的日志文件:

13726230503 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 2481 24681 200
13726230513 00-FD-07-A4-72-B8:CMCC 120.196.40.8 i02.c.aliimg.com 248 0 200
13826230523 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 2481 24681 200
13726230533 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 2481 24681 200
13726230543 00-FD-07-A4-72-B8:CMCC 120.196.100.82 Video website 1527 2106 200
13926230553 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 2481 24681 200
13826230563 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 2481 24681 200
13926230573 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 2481 24681 200
18912688533 00-FD-07-A4-72-B8:CMCC 220.196.100.82 Integrated portal 1938 2910 200
18912688533 00-FD-07-A4-72-B8:CMCC 220.196.100.82 i02.c.aliimg.com 3333 21321 200
13726230503 00-FD-07-A4-72-B8:CMCC 120.196.100.82 Search Engines 9531 9531 200
13826230523 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 2481 24681 200
13726230503 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 2481 24681 200

该日志文件记录了每个手机用户在一段时间内的网络流量信息,具体字段含义为:

手机号码 MAC地址 IP地址 域名 上行流量(字节数) 下行流量(字节数) 套餐类型
根据以上日志,统计出每个手机用户在该时间段内的总流量(上行流量+下行流量),统计结果的格式为:

手机号码 字节数量

实验结果:

实验代码:

WcMap.java

import java.io.IOException;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

        public class WcMap extends Mapper<LongWritable, Text, Text, LongWritable>{
        @Override
        protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
                    String str = value.toString();
                    String[] words = StringUtils.split(str," ",10);
                    int i=0;
                    for(String word : words){
                        if(i==words.length-2||i==words.length-3)
                        context.write(new Text(words[0]), new LongWritable(Integer.parseInt(word)));
                        i++;
                    }
            }
        }

WcReduce.java

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WcReduce extends Reducer<Text, LongWritable, Text, LongWritable>{
    @Override
    protected void reduce(Text key, Iterable<LongWritable> values,Context context)
            throws IOException, InterruptedException {
        long count = 0;
        for(LongWritable value : values){
            count += value.get();
        }
        context.write(key, new LongWritable(count));
    }
}

WcRunner.java

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.util.Scanner;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import java.net.URI;

public class WcRunner{
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        
        job.setJarByClass(WcRunner.class);
        
        job.setMapperClass(WcMap.class);
        job.setReducerClass(WcReduce.class);
        
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);

        Scanner sc = new Scanner(System.in);
        System.out.print("inputPath:");
        String inputPath = sc.next();
        System.out.print("outputPath:");
        String outputPath = sc.next();

        try {
            FileSystem fs0 = FileSystem.get(new URI("hdfs://master:9000"), new Configuration());
            Path hdfsPath = new Path(outputPath);
            fs0.copyFromLocalFile(new Path("/headless/Desktop/workspace/mapreduce/WordCount/data/1.txt"),new Path("/mapreduce/WordCount/input/1.txt"));
            if(fs0.delete(hdfsPath,true)){
                System.out.println("Directory "+ outputPath +" has been deleted successfully!");
            }
        }catch(Exception e) {
            e.printStackTrace();
        }
        FileInputFormat.setInputPaths(job, new Path("hdfs://master:9000"+inputPath));
        FileOutputFormat.setOutputPath(job, new Path("hdfs://master:9000"+outputPath));
        job.waitForCompletion(true);
        try {
            FileSystem fs = FileSystem.get(new URI("hdfs://master:9000"), new Configuration());
            Path srcPath = new Path(outputPath+"/part-r-00000");

            FSDataInputStream is = fs.open(srcPath);
            System.out.println("Results:");
            while(true) {
                String line = is.readLine();
                if(line == null) {
                    break;
                }
                System.out.println(line);
            }
            is.close();
        }catch(Exception e) {
            e.printStackTrace();
        }
    }
}

【实验作业】索引倒排输出行号

在索引倒排实验中,我们可以得到每个单词分布在哪些文件中,以及在每个文件中出现的次数,修改以上实现,在输出的倒排索引结果中可以得到每个单词在每个文件中的具体行号信息。输出结果的格式如下:
单词 文件名:行号,文件名:行号,文件名:行号

实验结果:

MapReduce在3.txt的第一行出现了两次所以有两个1

import java.io.*;
import java.util.StringTokenizer;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class MyMapper extends Mapper<Object,Text,Text,Text>{
    private Text keyInfo = new Text();
    private Text valueInfo = new Text();
    private FileSplit split;
    int num=0;

    public void map(Object key,Text value,Context context)
            throws IOException,InterruptedException{
        num++;
        split = (FileSplit)context.getInputSplit();
        StringTokenizer itr = new StringTokenizer(value.toString());
        while(itr.hasMoreTokens()){
            keyInfo.set(itr.nextToken()+" "+split.getPath().getName().toString());
            valueInfo.set(num+"");
            context.write(keyInfo,valueInfo);
        }
    }
}



import java.io.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Reducer;

public class MyCombiner extends Reducer<Text,Text,Text,Text>{

    private Text info = new Text();

    public void reduce(Text key,Iterable<Text>values,Context context)
            throws IOException, InterruptedException{
        String  sum = "";
        for(Text value:values){
            sum += value.toString()+" ";
        }

                String record = key.toString();
        String[] str = record.split(" ");

        key.set(str[0]);
        info.set(str[1]+":"+sum);
        context.write(key,info);
    }
}

import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class MyReducer extends Reducer<Text,Text,Text,Text>{
    private Text result = new Text();
    public void reduce(Text key,Iterable<Text>values,Context context) throws

            IOException, InterruptedException{
        String value =new String();
        for(Text value1:values){
            value += value1.toString()+" ; ";
        }
        result.set(value);
        context.write(key,result);
    }
}

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.util.Scanner;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import java.net.URI;

public class MyRunner {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();

        Job job = Job.getInstance(conf);

        job.setJarByClass(MyRunner.class);

        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);
        job.setCombinerClass(MyCombiner.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);


        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        Scanner sc = new Scanner(System.in);
        System.out.print("inputPath:");
        String inputPath = sc.next();
        System.out.print("outputPath:");
        String outputPath = sc.next();

        try {
            FileSystem fs0 = FileSystem.get(new URI("hdfs://master:9000"), new Configuration());
            Path hdfsPath = new Path(outputPath);
            if(fs0.delete(hdfsPath,true)){
                System.out.println("Directory "+ outputPath +" has been deleted successfully!");
            }
        }catch(Exception e) {
            e.printStackTrace();
        }

        FileInputFormat.setInputPaths(job, new Path("hdfs://master:9000"+inputPath));

        FileOutputFormat.setOutputPath(job, new Path("hdfs://master:9000"+outputPath));

        job.waitForCompletion(true);

        try {
            FileSystem fs = FileSystem.get(new URI("hdfs://master:9000"), new Configuration());
            Path srcPath = new Path(outputPath+"/part-r-00000");

            FSDataInputStream is = fs.open(srcPath);
            System.out.println("Results:");
            while(true) {
                String line = is.readLine();
                if(line == null) {
                    break;
                }
                System.out.println(line);
            }
            is.close();
        }catch(Exception e) {
            e.printStackTrace();
        }

    }
}

到此这篇关于云计算实验:Java MapReduce编程的文章就介绍到这了,更多相关Java MapReduce编程内容请搜索程序员之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持程序员之家!

相关文章

  • spring boot自带图片服务器使用详解

    spring boot自带图片服务器使用详解

    这篇文章主要为大家详细介绍了spring boot自带图片服务器的使用 ,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2017-08-08
  • shiro并发人数登录控制的实现代码

    shiro并发人数登录控制的实现代码

    在做项目中遇到这样的需求要求每个账户同时只能有一个人登录或几个人同时登录,如果是同时登录的多人,要么不让后者登录,要么踢出前者登录,怎么实现这样的功能呢?下面小编给大家带来了shiro并发人数登录控制的实现代码,一起看看吧
    2017-09-09
  • 详解Java中StringBuffer类常用方法

    详解Java中StringBuffer类常用方法

    这篇文章主要为大家介绍了java中StringBuffer类常用方法
    2016-01-01
  • spring.datasource.schema配置详解

    spring.datasource.schema配置详解

    本文主要介绍了spring.datasource.schema配置,文中通过示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2022-05-05
  • java异常:异常处理--try-catch结构详解

    java异常:异常处理--try-catch结构详解

    今天小编就为大家分享一篇关于Java异常处理之try...catch...finally详解,小编觉得内容挺不错的,现在分享给大家,具有很好的参考价值,需要的朋友一起跟随小编来看看吧
    2021-09-09
  • SpringBoot集成使用Redis及搭建过程

    SpringBoot集成使用Redis及搭建过程

    jackson-json 工具提供了 javabean 与 json 之 间的转换能力,可以将 pojo 实例序列化成 json 格式存储在 redis 中,也可以将 json 格式的数据转换成 pojo 实例,本文给大家介绍SpringBoot集成使用Redis及搭建过程,感兴趣的朋友一起看看吧
    2022-01-01
  • Java使用agent实现main方法之前的实例详解

    Java使用agent实现main方法之前的实例详解

    这篇文章主要介绍了Java使用agent实现main方法之前的实例详解的相关资料,希望通过本文能帮助到大家,让大家理解这部分内容,需要的朋友可以参考下
    2017-10-10
  • java二维数组实现推箱子小游戏

    java二维数组实现推箱子小游戏

    这篇文章主要为大家详细介绍了java二维数组实现推箱子小游戏,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2020-11-11
  • mybatisplus如何解决分页最多500条数据

    mybatisplus如何解决分页最多500条数据

    这篇文章主要介绍了mybatisplus如何解决分页最多500条数据的问题,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2022-07-07
  • RocketMQ消息积压原因及处理方法

    RocketMQ消息积压原因及处理方法

    RocketMQ是一种可靠的、可扩展的消息中间件,广泛应用于分布式系统中的消息通信,然而,在高并发的场景下,由于消息产生速度超过消费速度,可能会导致消息积压的问题,本文将介绍RocketMQ消息积压的原因和如何处理积压问题
    2023-06-06

最新评论

?


http://www.vxiaotou.com