빅데이터/하둡

하둡 1.0 튜토리얼 - (16) 조인

_금융덕후_ 2019. 7. 19. 19:30
728x90
반응형

 

조인

RDBMS에서의 조인이란 두개의 테이블을 한개(혹은 여러개)의 키값을 기준으로 합치는 작업입니다.

맵리듀스를 사용해 이전에 사용했던 항공데이터와 다른 데이터를 조인해 보겠습니다.

 

데이터 다운로드

먼저 데이터를 받아야합니다. 다음 명령어를 사용하시면 다운받을 수 있고,

> wget http://stat-computing.org/dataexpo/data/carriers.csv

다음 링크를 가셔도 다운 받을 수 있습니다.

http://stat-computing.org/dataexpo/2009/carriers.csv

 

데이터 정제

항공 데이터의 첫줄을 없애주었듯이 이 데이터도 없애주겠습니다.

아래 커맨드를 입력하면 첫줄과 큰다옴표가 데이터에서 사라지게 됩니다.

> perl -p -i -e 's/"//g' carriers.csv | sed -e '1d' carriers.csv > carriers_new.csv

혹은 아래 파이썬 코드를 실행하셔도 됩니다.

with open("carriers.csv", 'r') as in_file,  \
	open("carriers_new.csv", 'w') as out_file:
	for line in in_file.readlines()[1:]:
		out_file.write(line.replace("\"", ""))

 

데이터를 정제한 후 HDFS에 해당 파일을 업로드 합니다.

커맨드를 보시면 carriers_new.csv를 이름을 바꿔 meta폴더에 넣어주었습니다.

> hadoop fs -mkdir meta
> hadoop fs -put carriers_new.csv meta/carriers.csv

 

데이터 형식

먼저 기존의 항공운항데이터의 샘플은 다음과같은 테이블로 표현될 수 있습니다.

테이블에는 우리가 분석에 사용했던 컬럼들만 넣었습니다.

Year Month Departure Delay Arrival Delay Distance Code
2008 1 2 -2 100 AQ
2008 1 -5 -9 100 AQ
2008 5 -5 6 142 OO
2008 5 -15 -13 142 OO

 

아래는 우리가 조인할 항공사코드데이터의 샘플 입니다.

Code Description
AQ Aloha Airlines Inc.
AR Aerolineas Argentinas
EGA Eagle Airline
OO Skywest Airlines Inc.

 

위의 두 테이블을 겹치는 Code 컬럼으로 조인하면 다음과 같은 결과가 나옵니다.

Year Month Departure Delay Arrival Delay Distance Code Description
2008 1 2 -2 100 AQ Aloha Airlines Inc.
2008 1 -5 -9 100 AQ Aloha Airlines Inc.
2008 5 -5 6 142 OO Skywest Airlines Inc.
2008 5 -15 -13 142 OO Skywest Airlines Inc.

 

맵리듀스와 조인

맵리듀스를 사용해 조인을 하는 방법은 세가지가 있습니다.

  • 맵사이드 조인 - 분산캐시를 사용해 맵단계에서 조인
  • 리듀스사이드 조인 - 리듀스 도중에 복합키를 사용해 조인
  • 세미 조인 - 두 조인 방법을 혼합해 사용

우리는 세개의 조인 방법 중 리듀스 사이드 조인을 알아보도록 하겠습니다.

 

프로젝트 생성

먼저 개발을 위해 IntelliJ프로젝트를 생성합니다.

이름은 reduceSide라고 지었고, 패키지 구조는 com.jyoon.study.reduceSide로 생성했습니다.

gradle dependency는 기존 프로젝트들과 동일하게 다음과 같습니다.

dependencies {
    testCompile group: 'junit', name: 'junit', version: '4.12'
    implementation 'org.projectlombok:lombok:1.16.16'
    implementation 'org.apache.hadoop:hadoop-common:2.6.0'
    implementation 'org.apache.hadoop:hadoop-mapreduce-client-core:2.6.0'
}

 

리듀스사이드 조인

이제 리듀스사이드 조인을 구현해 보도록 하겠습니다.

리듀스 사이드 조인은 복합키를 사용해 정렬하고, 마지막 리듀스 단계에서 조인을 수행하게 됩니다.

따라서 이전 포스팅에서 보았던 보조정렬과 코드가 유사할 것입니다.

 

항공사코드 데이터

먼저 항공사코드데이터를 처리하는 Parser클래스를 만들어주겠습니다.

이 코드는 Lombok을 사용했습니다.

설정하지 않으셨다면 File > Setting 메뉴에서  Build > Compiler > Annotation Processor에서 체크를 해주시기 바랍니다.

import lombok.Getter;
import lombok.NoArgsConstructor;
import org.apache.hadoop.io.Text;

@NoArgsConstructor
public class CarrierCodeParser {
    @Getter
    private String carrierCode;
    @Getter
    private String carrierName;

    public CarrierCodeParser(Text text){
        try{
            String[] columns = text.toString().split(",");
            carrierCode = columns[0];
            carrierName = columns[1];
        } catch (Exception ex){
            System.out.println("Error parsing a record: " + ex.getMessage());
        }
    }
}

 

복합키

이제 복합키를 만들어주겠습니다.

복합키는 조인할 컬럼인 항공사코드와 태그로 이루어져 있습니다.

태그는 RDBMS에서 이야기하는 테이블의 Alias라고 생각하면 쉽습니다.

여기서는 0번이 항공사코드 테이블, 1번이 항공사운항 테이블 입니다.

import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableUtils;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

@NoArgsConstructor
@AllArgsConstructor
public class TaggedKey implements WritableComparable<TaggedKey> {
    @Getter @Setter // 항공사코드
    private String carrierCode;
    @Getter @Setter // 조인 태그
    private Integer tag;

    @Override
    public int compareTo(TaggedKey key){
        int result = this.carrierCode.compareTo(key.carrierCode);

        if (result == 0)
            return this.tag.compareTo(key.tag);
        return result;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        WritableUtils.writeString(out, carrierCode);
        out.writeInt(tag);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        carrierCode = WritableUtils.readString(in);
        tag = in.readInt();
    }
}

 

복합키 비교기

이 복합키를 비교해주는 Comparator클래스를 구현합니다.

먼저 항공사 코드끼리 비교하고, 태그를 비교해주는 순서입니다.

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class TaggedKeyComparator extends WritableComparator {
    protected TaggedKeyComparator(){
        super(TaggedKey.class, true);
    }

    @SuppressWarnings("rawtypes")
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        TaggedKey k1 = (TaggedKey) a;
        TaggedKey k2 = (TaggedKey) b;

        int cmp = k1.getCarrierCode().compareTo(k2.getCarrierCode());
        if (cmp != 0)
            return cmp;

        return k1.getTag().compareTo(k2.getTag());
    }
}

 

그룹키 파티셔너

이전 포스팅에서 언급하였듯이 파티셔너는 맵의 결과가 어떤 리듀서로 전달될지를 결정합니다.

이 경우에는 항공사 코드의 해시값을 사용하기 때문에 같은 항공사의 데이터는 같이 처리되게 됩니다.

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class TaggedGroupKeyPartitioner extends Partitioner<TaggedKey, Text> {
    @Override
    public int getPartition(TaggedKey key, Text val, int numPartitions) {
        int hash = key.getCarrierCode().hashCode();
        return hash % numPartitions;
    }
}

 

그룹키 비교기

해당 그룹키들을 비교해주기 위해 Comparator클래스를 구현합니다.

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class TaggedGroupKeyComparator extends WritableComparator {

    protected TaggedGroupKeyComparator() {
        super(TaggedKey.class, true);
    }

    @SuppressWarnings("rawtypes")
    @Override
    public int compare(WritableComparable w1, WritableComparable w2) {
        TaggedKey k1 = (TaggedKey) w1;
        TaggedKey k2 = (TaggedKey) w2;

        // 그룹키인 항공사 코드값 정렬
        return k1.getCarrierCode().compareTo(k2.getCarrierCode());
    }
}

 

항공사코드 Mapper클래스

항공사 코드 데이터들을 처리해주는 매퍼를 구현합니다.

코드를 보시면 Tag는 0으로 설정해주는 것을 볼 수 있습니다.

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class CarrierCodeMapper extends Mapper<LongWritable, Text, TaggedKey, Text> {
    TaggedKey outputKey = new TaggedKey();
    Text outputValue = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        CarrierCodeParser parser = new CarrierCodeParser(value);

        outputKey.setCarrierCode(parser.getCarrierCode());
        outputKey.setTag(0);
        outputValue.set(parser.getCarrierName());

        context.write(outputKey, outputValue);
    }
}

 

운항데이터 Mapper클래스

항공사 운항 데이터들을 처리해주는 매퍼를 구현합니다.

이번에는 Tag를 1로 설정해줍니다.

아래 코드에서 AirlinePerformanceParser를 사용하게 되는데, 이전 포스팅에서 썼던 코드를 그대로 사용합니다.

코드는 이 포스팅에서 찾으실 수 있습니다: https://jyoondev.tistory.com/48

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class MapperWithReduceSideJoin extends Mapper<LongWritable, Text, TaggedKey, Text> {
    TaggedKey outputKey = new TaggedKey();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        AirlinePerformanceParser parser = new AirlinePerformanceParser(value);

        outputKey.setCarrierCode(parser.getUniqueCarrier());
        outputKey.setTag(1);
        context.write(outputKey, value);
    }
}

 

Reducer구현

이제 조인을 수행하는 리듀서를 구현해주겠습니다.

그룹키 비교기에 의해 비교가 끝난 결과는 다음과 같이 정렬되게 됩니다.

항공사코드, 0	항공사이름
항공사코드, 1	항공사운항정보1
항공사코드, 1	항공사운항정보2
항공사코드, 1	항공사운항정보3
...

따라서 첫번째 줄인 항공사 이름을 먼저 가져오고, 그 뒤에 모든 운항정보를 가져오는 코드가 되겠습니다.

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.util.Iterator;

public class ReducerWithReduceSideJoin extends Reducer<TaggedKey, Text, Text, Text> {
    private Text outputKey = new Text();
    private Text outputValue = new Text();

    @Override
    protected void reduce(TaggedKey key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        Iterator<Text> iterator = values.iterator();
        // 순차적으로 항공사 이름을 조회
        Text carrierName = new Text(iterator.next());

        // 운항 정보를 조회
        while (iterator.hasNext()){
            Text record = iterator.next();
            outputKey.set(key.getCarrierCode());
            outputValue = new Text(carrierName.toString() + "\t" + record.toString());
            context.write(outputKey, outputValue);
        }
    }
}

 

리듀스사이드 조인 실행

이제 작성한 맵리듀스 프로그램을 실행하도록 하겠습니다.

artifact를 설정한 뒤 jar빌드를 하고 파일을 전송해줍니다.

아래 커맨드로 실행해보도록 하겠습니다.

> hadoop jar reduceSide.jar com.jyoon.study.reduceSide.ReduceSideJoin meta/carriers.csv input reduce_join_output

공간이 부족할 수 있으니 파일 한두개로만 실행하는게 좋을 것 같습니다.

 

맵리듀스가 끝나면 결과를 확인해봅니다.

> hadoop fs -cat reduce_join_output/part-r-00000 | head -10

결과는 아래와 같습니다.

9E	Pinnacle Airlines Inc.	2007,7,23,1,1512,1515,1625,1631,9E,2912,89189E,133,136,93,-6,-3,DTW,XNA,716,26,14,0,,0,0,0,0,0,0
9E	Pinnacle Airlines Inc.	2007,7,24,2,1513,1515,1620,1631,9E,2912,87189E,127,136,92,-11,-2,DTW,XNA,716,24,11,0,,0,0,0,0,0,0
9E	Pinnacle Airlines Inc.	2007,7,31,2,1940,1940,2049,2045,9E,5969,80369E,69,65,40,4,0,MEM,SGF,243,11,18,0,,0,0,0,0,0,0
9E	Pinnacle Airlines Inc.	2007,7,30,1,1937,1940,2036,2045,9E,5969,85169E,59,65,40,-9,-3,MEM,SGF,243,9,10,0,,0,0,0,0,0,0
9E	Pinnacle Airlines Inc.	2007,7,29,7,2009,1940,2107,2045,9E,5969,86969E,58,65,42,22,29,MEM,SGF,243,6,10,0,,0,22,0,0,0,0
9E	Pinnacle Airlines Inc.	2007,7,28,6,2116,1940,2221,2045,9E,5969,80259E,65,65,44,96,96,MEM,SGF,243,7,14,0,,0,0,0,0,0,96
9E	Pinnacle Airlines Inc.	2007,7,27,5,1936,1940,2034,2045,9E,5969,85059E,58,65,43,-11,-4,MEM,SGF,243,2,13,0,,0,0,0,0,0,0
9E	Pinnacle Airlines Inc.	2007,7,26,4,2203,1940,2310,2045,9E,5969,80009E,67,65,42,145,143,MEM,SGF,243,7,18,0,,0,0,0,2,0,143
9E	Pinnacle Airlines Inc.	2007,7,25,3,1937,1940,2040,2045,9E,5969,89369E,63,65,40,-5,-3,MEM,SGF,243,5,18,0,,0,0,0,0,0,0
9E	Pinnacle Airlines Inc.	2007,7,24,2,1935,1940,2038,2045,9E,5969,84959E,63,65,41,-7,-5,MEM,SGF,243,7,15,0,,0,0,0,0,0,0

결과를 보면 항공사 이름이 잘 들어와 있는것을 볼 수 있습니다.

우선 눈에 띄게 하기 위해 \t를 사용했지만, 실제 조인을 수행하려면 기존 데이터와 같은 형식으로 comma를 사용하면 될 듯 합니다.

 

참고자료

이 포스팅은 "시작하세요! 하둡 프로그래밍" 책의 예제를 무작정 따라해본 포스팅입니다.

https://wikibook.co.kr/beginning-hadoop-programming-2rev/

728x90
반응형