本文主要对MapReduce框架对表之间的join操作的几种实现方式进行详细分析,并且根据我在实际开发过程中遇到的实际例子来进行进一步的说明。

【博文引荐】Hadoop中MapReduce多种join完成实例剖析(mapreduce实现join)  MapReduce hadoop 第1张

本博文出自51CTO博客 zengzhaozheng博主,有任何问题请进入博主页面互动评论!
博文地址:http://zengzhaozheng.blog.51cto.com/8219051/1392961

一、概述

关于RDBMS中的join操作大伙必定十分了解,写sql的时分要十分注意细节,稍有差池就会耗时巨久形成很大的功能瓶颈,而在Hadoop中运用MapReduce结构进行join的操作时相同耗时,可是因为hadoop的散布式规划理念的特殊性,因而关于这种join操作相同也具有了必定的特殊性。本文首要对MapReduce结构对表之间的join操作的几种完结办法进行详细分析,而且依据我在实践开发过程中遇到的实践例子来进行进一步的阐明。

二、完结原理

1、在Reudce端进行衔接。

在Reudce端进行衔接是MapReduce结构进行表之间join操作最为常见的形式,其详细的完结原理如下:

Map端的首要作业:为来自不同表(文件)的key/value对打标签以差异不同来历的记载。然后用衔接字段作为key,其他部分和新加的标志作为value,***进行输出。

reduce端的首要作业:在reduce端以衔接字段作为key的分组现已完结,咱们只需要在每一个分组傍边将那些来历于不同文件的记载(在map阶段现已打标志)分隔,***进行笛卡尔只就ok了。原理十分简略,下面来看一个实例:

(1)自定义一个value回来类型:

  1. packagecom.mr.reduceSizeJoin;
  2. importjava.io.DataInput;
  3. importjava.io.DataOutput;
  4. importjava.io.IOException;
  5. importorg.apache.hadoop.io.Text;
  6. importorg.apache.hadoop.io.WritableComparable;
  7. publicclassCombineValuesimplementsWritableComparable<CombineValues>{
  8. //privatestaticfinalLoggerlogger=LoggerFactory.getLogger(CombineValues.class);
  9. privateTextjoinKey;//链接关键字
  10. privateTextflag;//文件来历标志
  11. privateTextsecondPart;//除了链接键外的其他部分
  12. publicvoidsetJoinKey(TextjoinKey){
  13. this.joinKey=joinKey;
  14. }
  15. publicvoidsetFlag(Textflag){
  16. this.flag=flag;
  17. }
  18. publicvoidsetSecondPart(TextsecondPart){
  19. this.secondPart=secondPart;
  20. }
  21. publicTextgetFlag(){
  22. returnflag;
  23. }
  24. publicTextgetSecondPart(){
  25. returnsecondPart;
  26. }
  27. publicTextgetJoinKey(){
  28. returnjoinKey;
  29. }
  30. publicCombineValues(){
  31. this.joinKey=newText();
  32. this.flag=newText();
  33. this.secondPart=newText();
  34. }
  35. @Override
  36. publicvoidwrite(DataOutputout)throwsIOException{
  37. this.joinKey.write(out);
  38. this.flag.write(out);
  39. this.secondPart.write(out);
  40. }
  41. @Override
  42. publicvoidreadFields(DataInputin)throwsIOException{
  43. this.joinKey.readFields(in);
  44. this.flag.readFields(in);
  45. this.secondPart.readFields(in);
  46. }
  47. @Override
  48. publicintcompareTo(CombineValueso){
  49. returnthis.joinKey.compareTo(o.getJoinKey());
  50. }
  51. @Override
  52. publicStringtoString(){
  53. //TODOAuto-generatedmethodstub
  54. return"[flag="+this.flag.toString()+",joinKey="+this.joinKey.toString()+",secondPart="+this.secondPart.toString()+"]";
  55. }
  56. }

(2)map、reduce主体代码

  1. packagecom.mr.reduceSizeJoin;
  2. importjava.io.IOException;
  3. importjava.util.ArrayList;
  4. importorg.apache.hadoop.conf.Configuration;
  5. importorg.apache.hadoop.conf.Configured;
  6. importorg.apache.hadoop.fs.Path;
  7. importorg.apache.hadoop.io.Text;
  8. importorg.apache.hadoop.mapreduce.Job;
  9. importorg.apache.hadoop.mapreduce.Mapper;
  10. importorg.apache.hadoop.mapreduce.Reducer;
  11. importorg.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  12. importorg.apache.hadoop.mapreduce.lib.input.FileSplit;
  13. importorg.apache.hadoop.mapreduce.lib.input.TextInputFormat;
  14. importorg.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  15. importorg.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
  16. importorg.apache.hadoop.util.Tool;
  17. importorg.apache.hadoop.util.ToolRunner;
  18. importorg.slf4j.Logger;
  19. importorg.slf4j.LoggerFactory;
  20. /**
  21. *@authorzengzhaozheng
  22. *用处阐明:
  23. *reudcesidejoin中的leftouterjoin
  24. *左衔接,两个文件别离代表2个表,衔接字段table1的id字段和table2的cityID字段
  25. *table1(左表):tb_dim_city(idint,namestring,orderidint,city_code,is_show)
  26. *tb_dim_city.dat文件内容,分隔符为"|":
  27. *idnameorderidcity_codeis_show
  28. *0其他999999990
  29. *1长春19011
  30. *2吉林29021
  31. *3四平39031
  32. *4松原49041
  33. *5通化59051
  34. *6辽源69061
  35. *7白城79071
  36. *8白山89081
  37. *9延吉99091
  38. *-------------------------风流的分割线-------------------------------
  39. *table2(右表):tb_user_profiles(userIDint,userNamestring,networkstring,doubleflow,cityIDint)
  40. *tb_user_profiles.dat文件内容,分隔符为"|":
  41. *userIDnetworkflowcityID
  42. *12G1231
  43. *23G3332
  44. *33G5551
  45. *42G7773
  46. *53G6664
  47. *
  48. *-------------------------风流的分割线-------------------------------
  49. *成果:
  50. *1长春1901112G123
  51. *1长春1901133G555
  52. *2吉林2902123G333
  53. *3四平3903142G777
  54. *4松原4904153G666
  55. */
  56. publicclassReduceSideJoin_LeftOuterJoinextendsConfiguredimplementsTool{
  57. privatestaticfinalLoggerlogger=LoggerFactory.getLogger(ReduceSideJoin_LeftOuterJoin.class);
  58. publicstaticclassLeftOutJoinMapperextendsMapper<Object,Text,Text,CombineValues>{
  59. privateCombineValuescombineValues=newCombineValues();
  60. privateTextflag=newText();
  61. privateTextjoinKey=newText();
  62. privateTextsecondPart=newText();
  63. @Override
  64. protectedvoidmap(Objectkey,Textvalue,Contextcontext)
  65. throwsIOException,InterruptedException{
  66. //取得文件输入途径
  67. StringpathName=((FileSplit)context.getInputSplit()).getPath().toString();
  68. //数据来自tb_dim_city.dat文件,标志即为"0"
  69. if(pathName.endsWith("tb_dim_city.dat")){
  70. String[]valueItems=value.toString().split("\\|");
  71. //过滤格局过错的记载
  72. if(valueItems.length!=5){
  73. return;
  74. }
  75. flag.set("0");
  76. joinKey.set(valueItems[0]);
  77. secondPart.set(valueItems[1]+"\t"+valueItems[2]+"\t"+valueItems[3]+"\t"+valueItems[4]);
  78. combineValues.setFlag(flag);
  79. combineValues.setJoinKey(joinKey);
  80. combineValues.setSecondPart(secondPart);
  81. context.write(combineValues.getJoinKey(),combineValues);
  82. }//数据来自于tb_user_profiles.dat,标志即为"1"
  83. elseif(pathName.endsWith("tb_user_profiles.dat")){
  84. String[]valueItems=value.toString().split("\\|");
  85. //过滤格局过错的记载
  86. if(valueItems.length!=4){
  87. return;
  88. }
  89. flag.set("1");
  90. joinKey.set(valueItems[3]);
  91. secondPart.set(valueItems[0]+"\t"+valueItems[1]+"\t"+valueItems[2]);
  92. combineValues.setFlag(flag);
  93. combineValues.setJoinKey(joinKey);
  94. combineValues.setSecondPart(secondPart);
  95. context.write(combineValues.getJoinKey(),combineValues);
  96. }
  97. }
  98. }
  99. publicstaticclassLeftOutJoinReducerextendsReducer<Text,CombineValues,Text,Text>{
  100. //存储一个分组中的左表信息
  101. privateArrayList<Text>leftTable=newArrayList<Text>();
  102. //存储一个分组中的右表信息
  103. privateArrayList<Text>rightTable=newArrayList<Text>();
  104. privateTextsecondPar=null;
  105. privateTextoutput=newText();
  106. /**
  107. *一个分组调用一次reduce函数
  108. */
  109. @Override
  110. protectedvoidreduce(Textkey,Iterable<CombineValues>value,Contextcontext)
  111. throwsIOException,InterruptedException{
  112. leftTable.clear();
  113. rightTable.clear();
  114. /**
  115. *将分组中的元素依照文件别离进行寄存
  116. *这种办法要注意的问题:
  117. *假如一个分组内的元素太多的话,可能会导致在reduce阶段呈现OOM,
  118. *在处理散布式问题之前***先了解数据的散布状况,依据不同的散布采纳最
  119. *恰当的处理办法,这样能够有用的避免导致OOM和数据过度歪斜问题。
  120. */
  121. for(CombineValuescv:value){
  122. secondPar=newText(cv.getSecondPart().toString());
  123. //左表tb_dim_city
  124. if("0".equals(cv.getFlag().toString().trim())){
  125. leftTable.add(secondPar);
  126. }
  127. //右表tb_user_profiles
转载请说明出处
知优网 » 【博文引荐】Hadoop中MapReduce多种join完成实例剖析(mapreduce实现join)

发表评论

您需要后才能发表评论