hadoop如何自定义格式化输出
短信预约 -IT技能 免费直播动态提醒
这篇文章给大家分享的是有关hadoop如何自定义格式化输出的内容。小编觉得挺实用的,因此分享给大家做个参考,一起跟随小编过来看看吧。
import java.io.IOException;import java.net.URI;import org.apache.commons.logging.Log;import org.apache.commons.logging.LogFactory;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.FSDataOutputStream;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.RecordWriter;import org.apache.hadoop.mapreduce.TaskAttemptContext;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class CustomizeOutputFormat {static final Log LOG = LogFactory.getLog(CustomizeOutputFormat.class);public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {Configuration conf = new Configuration();Job job = Job.getInstance(conf);job.setJarByClass(CustomizeOutputFormat.class);job.setMapperClass(CustMapper.class);job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(Text.class);//此处这只自定义的格式化输出job.setOutputFormatClass(CustOutputFormat.class);String jobName = "Customize outputformat test!";job.setJobName(jobName);FileInputFormat.addInputPath(job, new Path(args[0]));FileOutputFormat.setOutputPath(job, new Path(args[1]));boolean b = job.waitForCompletion(true);if(b) {LOG.info("Job "+ jobName +" is done.");}else {LOG.info("Job "+ jobName +"is going wrong,now exit.");System.exit(0);}}}class CustMapper extends Mapper<LongWritable, Text, Text, Text>{String[] textIn = null;Text outkey = new Text();Text outvalue = new Text();@Overrideprotected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context)throws IOException, InterruptedException {textIn = value.toString().split("\t");outkey.set(textIn[0]);outvalue.set(textIn[1]);context.write(outkey, outvalue);}}//自定义OutoutFormatclass CustOutputFormat extends FileOutputFormat<Text, Text>{@Overridepublic RecordWriter<Text, Text> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {//获得configrationConfiguration conf = context.getConfiguration();//获得FileSystemFileSystem fs = FileSystem.newInstance(conf);//获得输出路径Path path = CustOutputFormat.getOutputPath(context);URI uri = path.toUri();//创建两个文件,得到写入流FSDataOutputStream foa = fs.create(new Path(uri.toString()+"/out.a"));FSDataOutputStream fob = fs.create(new Path(uri.toString()+"/out.b"));//创建自定义RecordWriter 传入 两个流CustRecordWriter rw = new CustRecordWriter(foa,fob);return rw;}class CustRecordWriter extends RecordWriter<Text, Text>{ FSDataOutputStream foa = null; FSDataOutputStream fob = null;CustRecordWriter(FSDataOutputStream foa,FSDataOutputStream fob){this.foa = foa;this.fob = fob;}@Overridepublic void write(Text key, Text value) throws IOException, InterruptedException {String mText = key.toString();//根据可以长度的不同分别输入到不同的文件if(mText.length()>=5) {foa.writeUTF(mText+"\t"+value.toString()+"\n");}else {fob.writeUTF(mText+"\t"+value.toString()+"\n");}}@Overridepublic void close(TaskAttemptContext context) throws IOException, InterruptedException {//最后将两个写入流关闭if(foa!=null) {foa.close();}if(fob!=null) {fob.close();}}}}//使用MultipleInputs,c处理多个来源的文件package hgs.multipuleinput;import java.io.IOException;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import hgs.custsort.SortBean;import hgs.custsort.SortDriver;import hgs.custsort.SortMapper;import hgs.custsort.SortReducer;public class MultipuleInputDriver {public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {Configuration conf = new Configuration();Job job = Job.getInstance(conf);job.setJarByClass(SortDriver.class);job.setMapperClass(SortMapper.class);job.setReducerClass(SortReducer.class);job.setOutputKeyClass(SortBean.class);job.setOutputValueClass(NullWritable.class);MultipleInputs.addInputPath(job, new Path("/sort"), TextInputFormat.class,SortMapper.class);MultipleInputs.addInputPath(job, new Path("/sort1"), TextInputFormat.class,SortMapper.class);//FileInputFormat.setInputPaths(job, new Path("/sort"));FileOutputFormat.setOutputPath(job, new Path("/sortresult"));System.exit(job.waitForCompletion(true)==true?0:1);}}
感谢各位的阅读!关于“hadoop如何自定义格式化输出”这篇文章就分享到这里了,希望以上内容可以对大家有一定的帮助,让大家可以学到更多知识,如果觉得文章不错,可以把它分享出去让更多的人看到吧!
免责声明:
① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。
② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341