hadoop

import org.apache.hadoop.conf.Configuration;  
import org.apache.hadoop.conf.Configured;  
import org.apache.hadoop.fs.FileSystem;  
import org.apache.hadoop.fs.Path;  
import org.apache.hadoop.io.IntWritable;  
import org.apache.hadoop.io.Text;  
import org.apache.hadoop.mapreduce.Job;  
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;  
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;  
import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;  
import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;  
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;  
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;  
import org.apache.hadoop.mapreduce.lib.reduce.IntSumReducer;  
import org.apache.hadoop.util.GenericOptionsParser;  
import org.apache.hadoop.util.Tool;  
import org.apache.hadoop.util.ToolRunner;  
  
public class MyNaiveBayesClassifier extends Configured implements Tool {  
    private static String[] otherArgs;  
  
  
    private void printUsage() {  
        System.out.println("Usage : <InputDataPath> <Output1> <Output2> <Output3> <TestDataPath> <Output4>" );  
    }  
  
    public static String[] getArgs(){return otherArgs;}  
  
    public int run(String[] args) throws Exception {  
        Configuration conf = new Configuration();  
        otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); // strip generic Hadoop options, keep the application's own arguments
  
        if(otherArgs.length != 6) {  
            printUsage();  
            return 2; // nonzero exit status signals a usage error
        }  
  
        FileSystem hdfs = FileSystem.get(conf);  
  
        // If any output directory already exists, delete it so the jobs can rerun cleanly
        Path OutPath1 = new Path(otherArgs[1]);  
        Path OutPath2 = new Path(otherArgs[2]);  
        Path OutPath3 = new Path(otherArgs[3]);  
        Path OutPath4 = new Path(otherArgs[5]);  
        if(hdfs.exists(OutPath1))  
            hdfs.delete(OutPath1, true);  
        if(hdfs.exists(OutPath2))  
            hdfs.delete(OutPath2, true);  
        if(hdfs.exists(OutPath3))  
            hdfs.delete(OutPath3, true);  
        if(hdfs.exists(OutPath4))  
            hdfs.delete(OutPath4, true);  
  
//  1. MultiFileWordCount
//  Purpose: extract every word from the input texts
//  Input:  args[0]; after MyFileInputFormat runs, the key is the file path and the value is the file content <word1 word2 ...>
//  Output: args[1]; the key is <class:word> and the value is that word's count, i.e. <<class:word>, TotalCounts>
//  (an illustrative mapper sketch appears after the class)
  
        Job job1 = Job.getInstance(conf, "MultiFileWordCount");  
        job1.setJarByClass(MyNaiveBayesClassifier.class);  
        job1.setInputFormatClass(MyFileInputFormat.class);  
        // Write the output as a SequenceFile so the next job can read it directly
        job1.setOutputFormatClass(SequenceFileOutputFormat.class);
        job1.setMapperClass(MapsAndReduces.MultiFileWordCountMap.class);
        job1.setCombinerClass(IntSumReducer.class);
        job1.setReducerClass(IntSumReducer.class);

        job1.setMapOutputKeyClass(Text.class);          // key type emitted by the map phase
        job1.setMapOutputValueClass(IntWritable.class); // value type emitted by the map phase
        job1.setOutputKeyClass(Text.class);             // key type emitted by the reduce phase
        job1.setOutputValueClass(IntWritable.class);    // value type emitted by the reduce phase

        FileInputFormat.addInputPaths(job1, otherArgs[0]);
        FileOutputFormat.setOutputPath(job1, OutPath1);
  
        // Wrap the job in a ControlledJob so JobControl can manage its dependencies
        ControlledJob ctrljob1 = new  ControlledJob(conf);  
        ctrljob1.setJob(job1);  
  
  
  
//  2. ClassWordCount
//  Purpose: compute the total number of words in each class
//  Input:  args[1], in the format <<class:word>, counts>
//  Output: args[2]; the key is the class name and the value is its word total, i.e. <class, WordCount>
//  (an illustrative mapper sketch appears after the class)
  
        Job job2 = Job.getInstance(conf, "ClassWordCount");  
        job2.setJarByClass(MyNaiveBayesClassifier.class);  
        job2.setInputFormatClass(SequenceFileInputFormat.class);  
        job2.setOutputFormatClass(SequenceFileOutputFormat.class);  
        job2.setMapperClass(MapsAndReduces.ClassWordCountMap.class);  
        job2.setMapOutputKeyClass(Text.class);  
        job2.setMapOutputValueClass(IntWritable.class);  
        job2.setReducerClass(MapsAndReduces.ClassWordCountReduce.class);  
        job2.setOutputKeyClass(Text.class);  
        job2.setOutputValueClass(IntWritable.class);  
        // Wrap in a ControlledJob
        ControlledJob ctrljob2 = new ControlledJob(conf);
        ctrljob2.setJob(job2);
        // Input/output paths for job2 (assumes job1 ran with a single reducer, so its output is part-r-00000)
        FileInputFormat.addInputPath(job2, new Path(otherArgs[1] + "/part-r-00000"));
        FileOutputFormat.setOutputPath(job2, OutPath2);  
  
//  3. ExistingWord
//  Purpose: collect the distinct words that appear in the training set (the vocabulary)
//  Input:  args[1], in the format <<class:word>, counts>
//  Output: args[3]; the key is a word that appeared and the value is 1, i.e. <word, one>
//  (an illustrative mapper/reducer sketch appears after the class)
  
        Job job3 = Job.getInstance(conf, "ExistingWord");  
        job3.setJarByClass(MyNaiveBayesClassifier.class);  
        job3.setInputFormatClass(SequenceFileInputFormat.class);  
        job3.setOutputFormatClass(SequenceFileOutputFormat.class);  
        job3.setMapperClass(MapsAndReduces.ExistingWordMap.class);  
        job3.setMapOutputKeyClass(Text.class);  
        job3.setMapOutputValueClass(IntWritable.class);  
//      job3.setCombinerClass(MapsAndReduces.ExistingWordReduce.class);  
        job3.setReducerClass(MapsAndReduces.ExistingWordReduce.class);  
        job3.setOutputKeyClass(Text.class);  
        job3.setOutputValueClass(IntWritable.class);  
        // Wrap in a ControlledJob
        ControlledJob ctrljob3 = new ControlledJob(conf);
        ctrljob3.setJob(job3);
        // Input/output paths for job3 (same single-reducer assumption as job2)
        FileInputFormat.addInputPath(job3, new Path(otherArgs[1] + "/part-r-00000"));
        FileOutputFormat.setOutputPath(job3, OutPath3);  
  
//  4. Classify
//  Purpose: classify each document using the trained Naive Bayes model
//  Input:  args[4], the test data path; after MyFileInputFormat runs, each record looks like <Path, Content ...>
//  Output: args[5]; for every document, the class the Bayes classifier assigns to it, i.e. <doc, class>
//  (an illustrative sketch of the scoring rule appears after the class)
  
        Job job4 = Job.getInstance(conf, "Classify");  
        job4.setJarByClass(MyNaiveBayesClassifier.class);  
        job4.setInputFormatClass(MyFileInputFormat.class);  
        job4.setOutputFormatClass(SequenceFileOutputFormat.class);  
        job4.setMapperClass(MapsAndReduces.ClassifyMap.class);  
        job4.setMapOutputKeyClass(Text.class);  
        job4.setMapOutputValueClass(Text.class);  
        job4.setReducerClass(MapsAndReduces.ClassifyReduce.class);  
        job4.setOutputKeyClass(Text.class);  
        job4.setOutputValueClass(Text.class);  
        // Wrap in a ControlledJob
        ControlledJob ctrljob4 = new ControlledJob(conf);
        ctrljob4.setJob(job4);
        // Input/output paths for job4
        FileInputFormat.addInputPath(job4, new Path(otherArgs[4]));
        FileOutputFormat.setOutputPath(job4, OutPath4);  
  
        // Dependencies between the jobs: job2 and job3 both need job1's output, and job4 needs both of theirs
        ctrljob2.addDependingJob(ctrljob1);  
        ctrljob3.addDependingJob(ctrljob1);  
        ctrljob4.addDependingJob(ctrljob3);  
        ctrljob4.addDependingJob(ctrljob2);  
  
        // Master controller that drives the four sub-jobs
        JobControl jobCtrl = new JobControl("NaiveBayes");
        // Register each ControlledJob with the master controller
        jobCtrl.addJob(ctrljob1);  
        jobCtrl.addJob(ctrljob2);  
        jobCtrl.addJob(ctrljob3);  
        jobCtrl.addJob(ctrljob4);  
  
        // JobControl must run in its own thread; nothing starts without this
        Thread theController = new Thread(jobCtrl);
        theController.start();
        while (!jobCtrl.allFinished()) {
            Thread.sleep(500); // poll instead of busy-waiting
        }
        // Report the jobs that completed successfully, then shut the controller down
        System.out.println(jobCtrl.getSuccessfulJobList());
        jobCtrl.stop();
        return jobCtrl.getFailedJobList().isEmpty() ? 0 : 1;
    }  
  
    public static void main(String[] args) throws Exception {  
        int ret = ToolRunner.run(new MyNaiveBayesClassifier(), args);  
        System.exit(ret);  
    }  
}  
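
The helper classes MyFileInputFormat and MapsAndReduces are not part of this paste. As a purely illustrative sketch of the contract job1 appears to expect, a mapper along the following lines would emit <class:word, 1> pairs for IntSumReducer to total. Every name and assumption here (the key carrying the file path, the class label being the parent directory) is hypothetical, not the author's code. With the class packaged into a jar, the driver itself would be launched via hadoop jar with the six paths listed in printUsage.

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Hypothetical stand-in for MapsAndReduces.MultiFileWordCountMap. Assumes the
// input key holds the file path (".../<class>/<docname>") and the input value
// holds the entire file content, as the comments on job1 describe.
public class MultiFileWordCountMapSketch
        extends Mapper<Text, Text, Text, IntWritable> {

    private static final IntWritable ONE = new IntWritable(1);
    private final Text outKey = new Text();

    @Override
    protected void map(Text key, Text value, Context context)
            throws IOException, InterruptedException {
        // Assumption: the parent directory of the file names its class.
        String[] parts = key.toString().split("/");
        String label = parts.length >= 2 ? parts[parts.length - 2] : "unknown";
        for (String word : value.toString().split("\\s+")) {
            if (word.isEmpty()) {
                continue;
            }
            outKey.set(label + ":" + word); // key format <class:word>
            context.write(outKey, ONE);     // summed by IntSumReducer
        }
    }
}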
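
Job2 only has to strip the word out of the <class:word> key so that its reducer can sum a per-class total. A minimal sketch of what MapsAndReduces.ClassWordCountMap might look like, with all names assumed:

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Hypothetical stand-in for MapsAndReduces.ClassWordCountMap. Reads job1's
// <<class:word>, count> records and re-keys them by class alone; the reducer
// (ClassWordCountReduce) would then sum the counts per class.
public class ClassWordCountMapSketch
        extends Mapper<Text, IntWritable, Text, IntWritable> {

    private final Text outKey = new Text();

    @Override
    protected void map(Text key, IntWritable value, Context context)
            throws IOException, InterruptedException {
        // <class:word> -> <class>; the count passes through unchanged.
        outKey.set(key.toString().split(":", 2)[0]);
        context.write(outKey, value);
    }
}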
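
Job3 builds the vocabulary by deduplicating words across classes. One plausible shape for the ExistingWordMap/ExistingWordReduce pair, again as an assumption rather than the author's code:

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

// Hypothetical sketch of job3's pair: the mapper strips the class label from
// <class:word> and emits <word, 1>; the reducer collapses duplicates so each
// vocabulary word appears exactly once.
public class ExistingWordSketch {

    public static class ExistingWordMapSketch
            extends Mapper<Text, IntWritable, Text, IntWritable> {
        private static final IntWritable ONE = new IntWritable(1);
        private final Text word = new Text();

        @Override
        protected void map(Text key, IntWritable value, Context context)
                throws IOException, InterruptedException {
            word.set(key.toString().split(":", 2)[1]); // drop the class label
            context.write(word, ONE);
        }
    }

    public static class ExistingWordReduceSketch
            extends Reducer<Text, IntWritable, Text, IntWritable> {
        private static final IntWritable ONE = new IntWritable(1);

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values,
                              Context context)
                throws IOException, InterruptedException {
            context.write(key, ONE); // one entry per distinct word
        }
    }
}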
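
ClassifyMap itself would first have to load the three model artifacts from HDFS, which is omitted here. The sketch below shows only the multinomial Naive Bayes scoring rule with add-one (Laplace) smoothing that such a mapper would apply; the plain maps stand in for the outputs of jobs 1-3, and every name is hypothetical.

import java.util.Map;

// Hypothetical sketch of the scoring rule behind MapsAndReduces.ClassifyMap:
// score(c) = log P(c) + sum over document words of log P(w|c), with
// P(w|c) = (count(c,w) + 1) / (totalWords(c) + |V|).
public class NaiveBayesScoreSketch {

    static double logScore(String clazz,
                           String[] docWords,
                           Map<String, Integer> classWordCounts, // "<class:word>" -> count (job1 output)
                           Map<String, Integer> classTotals,     // class -> total word count (job2 output)
                           int vocabularySize,                   // |V|, the size of job3's output
                           double classPrior) {                  // P(c), e.g. the class's share of documents
        double score = Math.log(classPrior);
        double denom = classTotals.getOrDefault(clazz, 0) + vocabularySize;
        for (String w : docWords) {
            int count = classWordCounts.getOrDefault(clazz + ":" + w, 0);
            score += Math.log((count + 1.0) / denom); // add-one smoothing
        }
        return score; // the mapper would emit the argmax class for the doc
    }
}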
