hadoop - Facing an error when using TotalOrderPartitioner MapReduce
I have written the program below. It ran fine without the TotalOrderPartitioner, so I don't think there are any issues with the Mapper or Reducer classes as such.
But when I include the code for the TotalOrderPartitioner, i.e. write the partition file and put it in the DistributedCache, I get the following error and am clueless how to go about it.
[train@sandbox totalorderpartitioner]$ hadoop jar totalorderpart.jar average.AverageJob counties totpart
// counties is the input directory and totpart is the output directory
16/01/18 04:14:00 INFO input.FileInputFormat: Total input paths to process : 4
16/01/18 04:14:00 INFO partition.InputSampler: Using 6 samples
16/01/18 04:14:00 INFO zlib.ZlibFactory: Successfully loaded & initialized native-zlib library
16/01/18 04:14:00 INFO compress.CodecPool: Got brand-new compressor [.deflate]
java.io.IOException: wrong key class: org.apache.hadoop.io.LongWritable is not class org.apache.hadoop.io.Text
    at org.apache.hadoop.io.SequenceFile$RecordCompressWriter.append(SequenceFile.java:1380)
    at org.apache.hadoop.mapreduce.lib.partition.InputSampler.writePartitionFile(InputSampler.java:340)
    at average.AverageJob.run(AverageJob.java:132)
    at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
    at average.AverageJob.main(AverageJob.java:146)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
    at java.lang.reflect.Method.invoke(Method.java:597)
    at org.apache.hadoop.util.RunJar.main(RunJar.java:212)
My code:
package average;

import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.partition.InputSampler;
import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class AverageJob extends Configured implements Tool {

    public enum Counters {MAP, COMBINE, REDUCE};

    public static class AverageMapper extends Mapper<LongWritable, Text, Text, Text> {

        private Text mapOutputKey = new Text();
        private Text mapOutputValue = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] words = StringUtils.split(value.toString(), '\\', ',');
            mapOutputKey.set(words[1].trim());
            StringBuilder moValue = new StringBuilder();
            moValue.append(words[9].trim()).append(",1");
            mapOutputValue.set(moValue.toString());
            context.write(mapOutputKey, mapOutputValue);
            context.getCounter(Counters.MAP).increment(1);
        }
    }

    public static class AverageCombiner extends Reducer<Text, Text, Text, Text> {

        private Text combinerOutputValue = new Text();

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            int count = 0;
            long sum = 0;
            for (Text value : values) {
                String[] strValues = StringUtils.split(value.toString(), ',');
                sum += Long.parseLong(strValues[0]);
                count += Integer.parseInt(strValues[1]);
            }
            combinerOutputValue.set(sum + "," + count);
            context.write(key, combinerOutputValue);
            context.getCounter(Counters.COMBINE).increment(1);
        }
    }

    public static class AverageReducer extends Reducer<Text, Text, Text, DoubleWritable> {

        private DoubleWritable reduceOutputKey = new DoubleWritable();

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            int count = 0;
            double sum = 0;
            for (Text value : values) {
                String[] strValues = StringUtils.split(value.toString(), ',');
                sum += Double.parseDouble(strValues[0]);
                count += Integer.parseInt(strValues[1]);
            }
            reduceOutputKey.set(sum / count);
            context.write(key, reduceOutputKey);
            context.getCounter(Counters.REDUCE).increment(1);
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        Job job = Job.getInstance(conf);
        job.setJarByClass(getClass());

        Path in = new Path(args[0]);
        Path out = new Path(args[1]);
        FileInputFormat.setInputPaths(job, in);
        FileOutputFormat.setOutputPath(job, out);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DoubleWritable.class);
        job.setMapperClass(AverageMapper.class);
        job.setCombinerClass(AverageCombiner.class);
        job.setPartitionerClass(TotalOrderPartitioner.class);
        job.setReducerClass(AverageReducer.class);
        job.setNumReduceTasks(6);

        InputSampler.Sampler<Text, Text> sampler =
                new InputSampler.RandomSampler<Text, Text>(0.2, 6, 5);
        InputSampler.writePartitionFile(job, sampler);
        String partitionFile = TotalOrderPartitioner.getPartitionFile(conf);
        URI partitionUri = new URI(partitionFile + "#" + TotalOrderPartitioner.DEFAULT_PATH);
        job.addCacheFile(partitionUri);

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) {
        int result = 0;
        try {
            result = ToolRunner.run(new Configuration(), new AverageJob(), args);
            System.exit(result);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
The TotalOrderPartitioner does not run its sampling on the output of the mapper, but on the input dataset. Your input format has a LongWritable key and a Text value, yet you are calling the RandomSampler claiming that your format has a Text key and a Text value. This is the mismatch that the InputSampler finds when it runs, hence the message

java.io.IOException: wrong key class: org.apache.hadoop.io.LongWritable is not class org.apache.hadoop.io.Text

meaning that it was trying to find a Text key (based on your parametrization) but found a LongWritable instead.