hadoop - Facing an error when using TotalOrderPartitioner in MapReduce


I have written the program below. When I run it without the TotalOrderPartitioner it runs well, so I don't think there are any issues with the Mapper or Reducer classes as such.

But when I include the TotalOrderPartitioner code, i.e. write the partition file and put it in the DistributedCache, I get the following error and am clueless how to go about it:

[train@sandbox totalorderpartitioner]$ hadoop jar totalorderpart.jar average.AverageJob counties totpart

// counties is the input directory, totpart is the output directory

16/01/18 04:14:00 INFO input.FileInputFormat: Total input paths to process : 4
16/01/18 04:14:00 INFO partition.InputSampler: Using 6 samples
16/01/18 04:14:00 INFO zlib.ZlibFactory: Successfully loaded & initialized native-zlib library
16/01/18 04:14:00 INFO compress.CodecPool: Got brand-new compressor [.deflate]
java.io.IOException: wrong key class: org.apache.hadoop.io.LongWritable is not class org.apache.hadoop.io.Text
    at org.apache.hadoop.io.SequenceFile$RecordCompressWriter.append(SequenceFile.java:1380)
    at org.apache.hadoop.mapreduce.lib.partition.InputSampler.writePartitionFile(InputSampler.java:340)
    at average.AverageJob.run(AverageJob.java:132)
    at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
    at average.AverageJob.main(AverageJob.java:146)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
    at java.lang.reflect.Method.invoke(Method.java:597)
    at org.apache.hadoop.util.RunJar.main(RunJar.java:212)

My code:

package average;

import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.partition.InputSampler;
import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class AverageJob extends Configured implements Tool {

    public enum Counters { MAP, COMBINE, REDUCE }

    public static class AverageMapper extends Mapper<LongWritable, Text, Text, Text> {

        private Text mapOutputKey = new Text();
        private Text mapOutputValue = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {

            String[] words = StringUtils.split(value.toString(), '\\', ',');
            mapOutputKey.set(words[1].trim());

            StringBuilder moValue = new StringBuilder();
            moValue.append(words[9].trim()).append(",1");
            mapOutputValue.set(moValue.toString());
            context.write(mapOutputKey, mapOutputValue);

            context.getCounter(Counters.MAP).increment(1);
        }
    }

    public static class AverageCombiner extends Reducer<Text, Text, Text, Text> {

        private Text combinerOutputValue = new Text();

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {

            int count = 0;
            long sum = 0;
            for (Text value : values) {
                String[] strValues = StringUtils.split(value.toString(), ',');
                sum += Long.parseLong(strValues[0]);
                count += Integer.parseInt(strValues[1]);
            }
            combinerOutputValue.set(sum + "," + count);
            context.write(key, combinerOutputValue);

            context.getCounter(Counters.COMBINE).increment(1);
        }
    }

    public static class AverageReducer extends Reducer<Text, Text, Text, DoubleWritable> {

        private DoubleWritable reduceOutputKey = new DoubleWritable();

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {

            int count = 0;
            double sum = 0;
            for (Text value : values) {
                String[] strValues = StringUtils.split(value.toString(), ',');
                sum += Double.parseDouble(strValues[0]);
                count += Integer.parseInt(strValues[1]);
            }

            reduceOutputKey.set(sum / count);
            context.write(key, reduceOutputKey);

            context.getCounter(Counters.REDUCE).increment(1);
        }
    }

    @Override
    public int run(String[] args) throws Exception {

        Configuration conf = getConf();
        Job job = Job.getInstance(conf);
        job.setJarByClass(getClass());

        Path in = new Path(args[0]);
        Path out = new Path(args[1]);
        FileInputFormat.setInputPaths(job, in);
        FileOutputFormat.setOutputPath(job, out);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DoubleWritable.class);

        job.setMapperClass(AverageMapper.class);
        job.setCombinerClass(AverageCombiner.class);

        job.setPartitionerClass(TotalOrderPartitioner.class);

        job.setReducerClass(AverageReducer.class);

        job.setNumReduceTasks(6);

        InputSampler.Sampler<Text, Text> sampler =
                new InputSampler.RandomSampler<Text, Text>(0.2, 6, 5);
        // AverageJob.java:132 in the stack trace -- the sampler fails here
        InputSampler.writePartitionFile(job, sampler);

        String partitionFile = TotalOrderPartitioner.getPartitionFile(conf);
        URI partitionUri = new URI(partitionFile + "#" + TotalOrderPartitioner.DEFAULT_PATH);
        job.addCacheFile(partitionUri);

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) {

        int result = 0;
        try {
            result = ToolRunner.run(new Configuration(), new AverageJob(), args);
            System.exit(result);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

TotalOrderPartitioner does not run its sampling on the output of the mapper, but on the input dataset. Your input format has a LongWritable key and a Text value, yet you are invoking the RandomSampler claiming that your format has a Text key and a Text value. This is the mismatch InputSampler finds when it runs, hence the message

wrong key class: org.apache.hadoop.io.LongWritable is not class org.apache.hadoop.io.Text

meaning that it was trying to find a Text key (based on your parametrization) but found a LongWritable instead.
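
In other words, TotalOrderPartitioner only works when the map input keys and the map output keys have the same type, so the data being sampled must already be keyed the way the mapper emits it. A common workaround is to chain two jobs: a first job that does the field extraction and writes its (Text, Text) pairs to a SequenceFile, and a second job that reads that SequenceFile with an identity mapper, samples it, and does the total-order reduce. Below is a minimal sketch of the second job's run() only; the first job, and the assumption that its SequenceFile output is the directory passed as args[0], are illustrative, not code from the question. It reuses AverageCombiner and AverageReducer from above and additionally needs import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;

@Override
public int run(String[] args) throws Exception {
    Job job = Job.getInstance(getConf());
    job.setJarByClass(getClass());

    // args[0]: SequenceFile<Text, Text> output of the (hypothetical) first job
    FileInputFormat.setInputPaths(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    // Keys coming out of the input format are now Text, matching the map output
    job.setInputFormatClass(SequenceFileInputFormat.class);
    job.setMapperClass(Mapper.class);              // base Mapper = identity mapper
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(Text.class);

    job.setCombinerClass(AverageCombiner.class);
    job.setReducerClass(AverageReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(DoubleWritable.class);

    job.setPartitionerClass(TotalOrderPartitioner.class);
    job.setNumReduceTasks(6);

    // Sampler and input format now agree: both see (Text, Text) records
    InputSampler.Sampler<Text, Text> sampler =
            new InputSampler.RandomSampler<Text, Text>(0.2, 6, 5);
    InputSampler.writePartitionFile(job, sampler);

    // Read the partition file path back from the job's configuration,
    // which is the copy writePartitionFile actually wrote it into
    String partitionFile = TotalOrderPartitioner.getPartitionFile(job.getConfiguration());
    job.addCacheFile(new URI(partitionFile + "#" + TotalOrderPartitioner.DEFAULT_PATH));

    return job.waitForCompletion(true) ? 0 : 1;
}

With this split, InputSampler draws Text keys from the SequenceFile and writePartitionFile appends them to a partition file whose key class (taken from the job's map output key class) is also Text, so the "wrong key class" IOException no longer triggers.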

