本文主要对MapReduce框架对表之间的join操作的几种实现方式进行详细分析,并且根据我在实际开发过程中遇到的实际例子来进行进一步的说明。
本博文出自51CTO博客 zengzhaozheng博主,有任何问题请进入博主页面互动评论! 博文地址:http://zengzhaozheng.blog.51cto.com/8219051/1392961 |
一、概述
关于RDBMS中的join操作大伙必定十分了解,写sql的时分要十分注意细节,稍有差池就会耗时巨久形成很大的功能瓶颈,而在Hadoop中运用MapReduce结构进行join的操作时相同耗时,可是因为hadoop的散布式规划理念的特殊性,因而关于这种join操作相同也具有了必定的特殊性。本文首要对MapReduce结构对表之间的join操作的几种完结办法进行详细分析,而且依据我在实践开发过程中遇到的实践例子来进行进一步的阐明。
二、完结原理
1、在Reudce端进行衔接。
在Reudce端进行衔接是MapReduce结构进行表之间join操作最为常见的形式,其详细的完结原理如下:
Map端的首要作业:为来自不同表(文件)的key/value对打标签以差异不同来历的记载。然后用衔接字段作为key,其他部分和新加的标志作为value,***进行输出。
reduce端的首要作业:在reduce端以衔接字段作为key的分组现已完结,咱们只需要在每一个分组傍边将那些来历于不同文件的记载(在map阶段现已打标志)分隔,***进行笛卡尔只就ok了。原理十分简略,下面来看一个实例:
(1)自定义一个value回来类型:
- packagecom.mr.reduceSizeJoin;
- importjava.io.DataInput;
- importjava.io.DataOutput;
- importjava.io.IOException;
- importorg.apache.hadoop.io.Text;
- importorg.apache.hadoop.io.WritableComparable;
- publicclassCombineValuesimplementsWritableComparable<CombineValues>{
- //privatestaticfinalLoggerlogger=LoggerFactory.getLogger(CombineValues.class);
- privateTextjoinKey;//链接关键字
- privateTextflag;//文件来历标志
- privateTextsecondPart;//除了链接键外的其他部分
- publicvoidsetJoinKey(TextjoinKey){
- this.joinKey=joinKey;
- }
- publicvoidsetFlag(Textflag){
- this.flag=flag;
- }
- publicvoidsetSecondPart(TextsecondPart){
- this.secondPart=secondPart;
- }
- publicTextgetFlag(){
- returnflag;
- }
- publicTextgetSecondPart(){
- returnsecondPart;
- }
- publicTextgetJoinKey(){
- returnjoinKey;
- }
- publicCombineValues(){
- this.joinKey=newText();
- this.flag=newText();
- this.secondPart=newText();
- }
- @Override
- publicvoidwrite(DataOutputout)throwsIOException{
- this.joinKey.write(out);
- this.flag.write(out);
- this.secondPart.write(out);
- }
- @Override
- publicvoidreadFields(DataInputin)throwsIOException{
- this.joinKey.readFields(in);
- this.flag.readFields(in);
- this.secondPart.readFields(in);
- }
- @Override
- publicintcompareTo(CombineValueso){
- returnthis.joinKey.compareTo(o.getJoinKey());
- }
- @Override
- publicStringtoString(){
- //TODOAuto-generatedmethodstub
- return"[flag="+this.flag.toString()+",joinKey="+this.joinKey.toString()+",secondPart="+this.secondPart.toString()+"]";
- }
- }
(2)map、reduce主体代码
- packagecom.mr.reduceSizeJoin;
- importjava.io.IOException;
- importjava.util.ArrayList;
- importorg.apache.hadoop.conf.Configuration;
- importorg.apache.hadoop.conf.Configured;
- importorg.apache.hadoop.fs.Path;
- importorg.apache.hadoop.io.Text;
- importorg.apache.hadoop.mapreduce.Job;
- importorg.apache.hadoop.mapreduce.Mapper;
- importorg.apache.hadoop.mapreduce.Reducer;
- importorg.apache.hadoop.mapreduce.lib.input.FileInputFormat;
- importorg.apache.hadoop.mapreduce.lib.input.FileSplit;
- importorg.apache.hadoop.mapreduce.lib.input.TextInputFormat;
- importorg.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
- importorg.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
- importorg.apache.hadoop.util.Tool;
- importorg.apache.hadoop.util.ToolRunner;
- importorg.slf4j.Logger;
- importorg.slf4j.LoggerFactory;
- /**
- *@authorzengzhaozheng
- *用处阐明:
- *reudcesidejoin中的leftouterjoin
- *左衔接,两个文件别离代表2个表,衔接字段table1的id字段和table2的cityID字段
- *table1(左表):tb_dim_city(idint,namestring,orderidint,city_code,is_show)
- *tb_dim_city.dat文件内容,分隔符为"|":
- *idnameorderidcity_codeis_show
- *0其他999999990
- *1长春19011
- *2吉林29021
- *3四平39031
- *4松原49041
- *5通化59051
- *6辽源69061
- *7白城79071
- *8白山89081
- *9延吉99091
- *-------------------------风流的分割线-------------------------------
- *table2(右表):tb_user_profiles(userIDint,userNamestring,networkstring,doubleflow,cityIDint)
- *tb_user_profiles.dat文件内容,分隔符为"|":
- *userIDnetworkflowcityID
- *12G1231
- *23G3332
- *33G5551
- *42G7773
- *53G6664
- *
- *-------------------------风流的分割线-------------------------------
- *成果:
- *1长春1901112G123
- *1长春1901133G555
- *2吉林2902123G333
- *3四平3903142G777
- *4松原4904153G666
- */
- publicclassReduceSideJoin_LeftOuterJoinextendsConfiguredimplementsTool{
- privatestaticfinalLoggerlogger=LoggerFactory.getLogger(ReduceSideJoin_LeftOuterJoin.class);
- publicstaticclassLeftOutJoinMapperextendsMapper<Object,Text,Text,CombineValues>{
- privateCombineValuescombineValues=newCombineValues();
- privateTextflag=newText();
- privateTextjoinKey=newText();
- privateTextsecondPart=newText();
- @Override
- protectedvoidmap(Objectkey,Textvalue,Contextcontext)
- throwsIOException,InterruptedException{
- //取得文件输入途径
- StringpathName=((FileSplit)context.getInputSplit()).getPath().toString();
- //数据来自tb_dim_city.dat文件,标志即为"0"
- if(pathName.endsWith("tb_dim_city.dat")){
- String[]valueItems=value.toString().split("\\|");
- //过滤格局过错的记载
- if(valueItems.length!=5){
- return;
- }
- flag.set("0");
- joinKey.set(valueItems[0]);
- secondPart.set(valueItems[1]+"\t"+valueItems[2]+"\t"+valueItems[3]+"\t"+valueItems[4]);
- combineValues.setFlag(flag);
- combineValues.setJoinKey(joinKey);
- combineValues.setSecondPart(secondPart);
- context.write(combineValues.getJoinKey(),combineValues);
- }//数据来自于tb_user_profiles.dat,标志即为"1"
- elseif(pathName.endsWith("tb_user_profiles.dat")){
- String[]valueItems=value.toString().split("\\|");
- //过滤格局过错的记载
- if(valueItems.length!=4){
- return;
- }
- flag.set("1");
- joinKey.set(valueItems[3]);
- secondPart.set(valueItems[0]+"\t"+valueItems[1]+"\t"+valueItems[2]);
- combineValues.setFlag(flag);
- combineValues.setJoinKey(joinKey);
- combineValues.setSecondPart(secondPart);
- context.write(combineValues.getJoinKey(),combineValues);
- }
- }
- }
- publicstaticclassLeftOutJoinReducerextendsReducer<Text,CombineValues,Text,Text>{
- //存储一个分组中的左表信息
- privateArrayList<Text>leftTable=newArrayList<Text>();
- //存储一个分组中的右表信息
- privateArrayList<Text>rightTable=newArrayList<Text>();
- privateTextsecondPar=null;
- privateTextoutput=newText();
- /**
- *一个分组调用一次reduce函数
- */
- @Override
- protectedvoidreduce(Textkey,Iterable<CombineValues>value,Contextcontext)
- throwsIOException,InterruptedException{
- leftTable.clear();
- rightTable.clear();
- /**
- *将分组中的元素依照文件别离进行寄存
- *这种办法要注意的问题:
- *假如一个分组内的元素太多的话,可能会导致在reduce阶段呈现OOM,
- *在处理散布式问题之前***先了解数据的散布状况,依据不同的散布采纳最
- *恰当的处理办法,这样能够有用的避免导致OOM和数据过度歪斜问题。
- */
- for(CombineValuescv:value){
- secondPar=newText(cv.getSecondPart().toString());
- //左表tb_dim_city
- if("0".equals(cv.getFlag().toString().trim())){
- leftTable.add(secondPar);
- }
- //右表tb_user_profiles