빅데이터/하둡

하둡 1.0 튜토리얼 - (14) 부분정렬

_금융덕후_ 2019. 7. 18. 20:51
728x90
반응형

 

부분정렬

이번에는 같은 데이터를 다른 방식으로 정렬해보겠습니다. 항공 데이터를 운항 거리 순으로 정렬해보겠습니다.

부분 정렬이란, Map작업의 결과를 MapFile이라는 다른 형식으로 변경해 저장한 뒤, 이를 검색하는 방법입니다.

이는 먼저 입력 데이터를 시퀀스파일을 변환하고, 시퀀스파일을 맵파일로 변경합니다.

그리고 맵파일에서 데이터를 검색/조회하는 방식으로 수행됩니다.

 

프로젝트 생성

먼저 프로젝트를 생성해주도록 하겠습니다.

IntelliJ에서 partialSort라는 프로젝트를 생성합니다.

이 프로젝트에서는 3개의 각기 다른 jar빌드를 해주어야 하기 때문에, 하위 모듈들을 생성해주겠습니다.

가장 상위 모듈인 partialSort모듈에 오른쪽 클릭을 하시고, New > Module을 선택해 모듈들을 만들어줍니다.

3개의 모듈은 다음과 같습니다: seqGenerator, mapGenerator, search.

 

root(partialSort)모듈의 gradle설정

root모듈인 partialSort의 build.gradle을 변경해주겠습니다.

3개의 하위 모듈 모두 같은 dependencies를 사용할것이기 때문에 subprojects에 설정해줍니다.

buildscript {
    repositories {
        mavenCentral()
    }
}

allprojects {
    apply plugin: 'java'
    group 'com.jyoon.study'
    version '1.0-SNAPSHOT'
}

subprojects {
    repositories {
        mavenCentral()
    }
    dependencies {
        testCompile group: 'junit', name: 'junit', version: '4.12'
        implementation 'org.projectlombok:lombok:1.16.16'
        implementation 'org.apache.hadoop:hadoop-common:2.9.2'
        implementation 'org.apache.hadoop:hadoop-mapreduce-client-core:2.9.2'
    }
}

마지막으로 루트 프로젝트의 src폴더는 지워주시면 되겠습니다.

 

seqGenerator 모듈

먼저 첫번째 모듈을 작성해주겠습니다.

seqGenerator > src > main > java폴더 안의 패키지 구조를 com.jyoon.study.seqGenerator로 만들어주겠습니다.

AirlinePerformanceParser파일은 지난번 포스팅과 동일하게 사용하겠습니다.

 

Mapper 클래스

먼저 Mapper클래스를 정의해 주겠습니다.

가장 먼저 달라진점은 이전에는 mapreduce 패키지의 Mapper를 사용했지만,

MapFile형식의 출력을 mapreduce패키지에서 핸들링할 수 없기 때문에, mapred패키지의 Mapper로 바뀌었습니다.

위에서 언급한대로 데이터에서는 운항거리 즉 Distance만 뽑아오게 되어있습니다.

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;

import java.io.IOException;

public class DistanceMapper extends MapReduceBase implements Mapper<LongWritable, Text, IntWritable, Text> {
    private IntWritable outputKey = new IntWritable();
    public void map(LongWritable key, Text value, OutputCollector<IntWritable, Text> output, Reporter reporter) throws IOException {
        AirlinePerformanceParser parser = new AirlinePerformanceParser(value);
        try {
            if (parser.isArrivalDelayAvailable()) {
                outputKey.set(parser.getDistance());
                output.collect(outputKey, value);
            }
        } catch (ArrayIndexOutOfBoundsException e){
            outputKey.set(0);
            output.collect(outputKey, value);
            e.printStackTrace();
        } catch (Exception e){
            outputKey.set(0);
            output.collect(outputKey, value);
            e.printStackTrace();
        }
    }
}

 

Sequence 파일 생성

이제 위에서 정의해준 Mapper클래스를 사용해 시퀀스 파일을 생성하는 클래스를 만들어주겠습니다.

이전에 보았던 main클래스들과 많이 유사하지만, 이번에는 시퀀스파일의 출력을 압축하는 코드가 추가되었습니다.

public class SequenceFileCreator extends Configured implements Tool {
    public static void main(String[] args) throws Exception{
        int res = ToolRunner.run(new Configuration(), new SequenceFileCreator(), args);
        System.out.println("MR-Job Result: "+ res);
    }

    public int run(String[] args) throws Exception {
        JobConf conf = new JobConf(SequenceFileCreator.class);
        conf.setJobName("SequenceFileCreator");
        conf.setMapperClass(DistanceMapper.class);
        conf.setNumReduceTasks(0);

        // 입출력 경로 설정
        FileInputFormat.setInputPaths(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));

        // 출력 데이터 상세 설정
        conf.setOutputFormat(SequenceFileOutputFormat.class);
        conf.setOutputKeyClass(IntWritable.class);
        conf.setOutputValueClass(Text.class);

        // 시퀀스파일 압축 포맷 설정
        SequenceFileOutputFormat.setCompressOutput(conf, true);
        SequenceFileOutputFormat.setOutputCompressorClass(conf, GzipCodec.class);
        SequenceFileOutputFormat.setOutputCompressionType(conf, SequenceFile.CompressionType.BLOCK);

        JobClient.runJob(conf);
        return 0;
    }
}

또한 run메소드의 4번째 줄을 보면 ReduceTask가 0으로 설정되어 있습니다.

이는 우리가 시퀀스 파일을 생성할때는 Reduce작업이 필요하지 않기 때문입니다.

 

드라이버 실행

이제 위에서 정의해준 프로그램이 제대로 동작하는지 확인해보겠습니다.

위와 같은 Artifact설정을 해주겠습니다. META-INF가 생성되는 폴더를 변경해주었습니다.

그리고 artifact의 이름을 seqGenerator로 변경해줍니다.

Jar빌드를 실행하고 Scp로 Jar파일을 옮겨줍니다.

그리고 아래 커맨드를 사용해 2008년의 데이터에 대한 시퀀스 파일을 생성해주겠습니다.

> hadoop jar seqGenerator.jar com.jyoon.study.seqGenerator.SequenceFileCreator input/2008.csv 2008_sequencefile

 

맵리듀스 작업이 끝나면 결과를 확인하기 위해 fs -ls 명령으로 2008_sequencefile내부의 파일들을 확인해보겠습니다.

Found 13 items
-rw-r--r--   3 doop supergroup          0 2019-07-17 03:42 /user/doop/2008_sequencefile/_SUCCESS
drwxr-xr-x   - doop supergroup          0 2019-07-17 03:41 /user/doop/2008_sequencefile/_logs
-rw-r--r--   3 doop supergroup   18075488 2019-07-17 03:41 /user/doop/2008_sequencefile/part-00000
-rw-r--r--   3 doop supergroup   18196309 2019-07-17 03:41 /user/doop/2008_sequencefile/part-00001
-rw-r--r--   3 doop supergroup   17768589 2019-07-17 03:41 /user/doop/2008_sequencefile/part-00002
-rw-r--r--   3 doop supergroup   18062799 2019-07-17 03:41 /user/doop/2008_sequencefile/part-00003
-rw-r--r--   3 doop supergroup   17841380 2019-07-17 03:41 /user/doop/2008_sequencefile/part-00004
-rw-r--r--   3 doop supergroup   17703777 2019-07-17 03:41 /user/doop/2008_sequencefile/part-00005
-rw-r--r--   3 doop supergroup   17796043 2019-07-17 03:42 /user/doop/2008_sequencefile/part-00006
-rw-r--r--   3 doop supergroup   17954569 2019-07-17 03:42 /user/doop/2008_sequencefile/part-00007
-rw-r--r--   3 doop supergroup   17058263 2019-07-17 03:42 /user/doop/2008_sequencefile/part-00008
-rw-r--r--   3 doop supergroup   17666089 2019-07-17 03:42 /user/doop/2008_sequencefile/part-00009
-rw-r--r--   3 doop supergroup    4655450 2019-07-17 03:42 /user/doop/2008_sequencefile/part-00010

총 11개의 시퀀스 파일들이 생성되었습니다.

 

이제 다음 명령을 사용해 파일 내용을 확인해보겠습니다.

> hadoop fs -text 2008_sequencefile/part-00000 | head -10

여기서 cat을 사용하지 않고 text를 사용한 이유는, 압축 파일의 내용을 확인하기 위해서입니다.

cat을 사용하면 binary형식의 알아볼 수 없는 데이터가 출력될것입니다.

810	2008,1,3,4,2003,1955,2211,2225,WN,335,N712SW,128,150,116,-14,8,IAD,TPA,810,4,8,0,,0,NA,NA,NA,NA,NA
810	2008,1,3,4,754,735,1002,1000,WN,3231,N772SW,128,145,113,2,19,IAD,TPA,810,5,10,0,,0,NA,NA,NA,NA,NA
515	2008,1,3,4,628,620,804,750,WN,448,N428WN,96,90,76,14,8,IND,BWI,515,3,17,0,,0,NA,NA,NA,NA,NA
515	2008,1,3,4,926,930,1054,1100,WN,1746,N612SW,88,90,78,-6,-4,IND,BWI,515,3,7,0,,0,NA,NA,NA,NA,NA
515	2008,1,3,4,1829,1755,1959,1925,WN,3920,N464WN,90,90,77,34,34,IND,BWI,515,3,10,0,,0,2,0,0,0,32
688	2008,1,3,4,1940,1915,2121,2110,WN,378,N726SW,101,115,87,11,25,IND,JAX,688,4,10,0,,0,NA,NA,NA,NA,NA
1591	2008,1,3,4,1937,1830,2037,1940,WN,509,N763SW,240,250,230,57,67,IND,LAS,1591,3,7,0,,0,10,0,0,0,47
1591	2008,1,3,4,1039,1040,1132,1150,WN,535,N428WN,233,250,219,-18,-1,IND,LAS,1591,7,7,0,,0,NA,NA,NA,NA,NA
451	2008,1,3,4,617,615,652,650,WN,11,N689SW,95,95,70,2,2,IND,MCI,451,6,19,0,,0,NA,NA,NA,NA,NA
451	2008,1,3,4,1620,1620,1639,1655,WN,810,N648SW,79,95,70,-16,0,IND,MCI,451,3,6,0,,0,NA,NA,NA,NA,NA

위 파일을 보면 각각의 운항거리 (810, 515 ...)들이 Key값 그리고 데이터 전체가 Value값으로 들어가 있는것을 볼 수 있습니다.

그리고 각 키 값들은 정렬이 되어있지 않습니다.

 

mapGenerator 모듈

이제 두번째 모듈을 작성해주겠습니다.

mapGenerator > src > main > java폴더 안의 패키지 구조를 com.jyoon.study.mapGenerator로 만들어주겠습니다.

 

MapFile생성

MapFile을 생성하기 위한 프로그램을 구현하겠습니다.

위의 Sequence파일을 생성하는 클래스와 유사한 Driver형식의 클래스입니다.

아래 코드를 보면 출력 Key값의 타입은 Int로 운항 거리를 Key값으로 합니다.

시퀀스를 그대로 MapFile로 변환하는 것이기 때문에 별도의 Map과 Reduce작업이 필요하지 않습니다.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MapFileCreator extends Configured implements Tool {
    public static void main(String[] args) throws Exception{
        int res = ToolRunner.run(new Configuration(), new MapFileCreator(), args);
        System.out.println("MR-Job Result: "+ res);
    }

    public int run(String[] args) throws Exception {
        JobConf conf = new JobConf(MapFileCreator.class);
        conf.setJobName("MapFileCreator");

        // 입출력 경로 설정
        FileInputFormat.setInputPaths(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));

        // 출력 데이터 상세 설정
        conf.setInputFormat(SequenceFileInputFormat.class);
        conf.setOutputFormat(MapFileOutputFormat.class);
        conf.setOutputKeyClass(IntWritable.class);

        // 시퀀스파일 압축 포맷 설정
        SequenceFileOutputFormat.setCompressOutput(conf, true);
        SequenceFileOutputFormat.setOutputCompressorClass(conf, GzipCodec.class);
        SequenceFileOutputFormat.setOutputCompressionType(conf, SequenceFile.CompressionType.BLOCK);

        JobClient.runJob(conf);
        return 0;
    }
}

 

드라이버 실행

이번에는 위와 같은 Artifact설정을 해주겠습니다. 그리고 artifact이름을 mapGenerator.jar로 변경해줍니다.

빌드가 끝나면 Jar빌드를 실행하고 Scp로 Jar파일을 옮겨줍니다.

그리고 아래 커맨드를 사용해 시퀀스를 MapFile로 변환해줍니다.

> hadoop jar mapGenerator.jar com.jyoon.study.mapGenerator.MapFileCreator input/2008.csv 2008_sequencefile

 

맵파일 생성 작업은 아래와 같이 part-00000이라는 디렉토리를 만들어 냅니다. 

Found 3 items
-rw-r--r--   3 doop supergroup          0 2019-07-17 05:19 /user/doop/2008_mapfile/_SUCCESS
drwxr-xr-x   - doop supergroup          0 2019-07-17 05:18 /user/doop/2008_mapfile/_logs
drwxr-xr-x   - doop supergroup          0 2019-07-17 05:19 /user/doop/2008_mapfile/part-00000

 

따라서 작업이 끝나면 아래의 명령어를 통해 part-00000 내부의 파일들을 보겠습니다.

> hadoop fs -ls 2008_mapfile/part-00000

index파일과 data파일이 생긴것을 확인할 수 있습니다.

 

이제 아래 명령어로 내용을 확인해보겠습니다.

> hadoop fs -text 2008_mapfile/part-00000/data | head -10

다음과 같이 Key들이 운항거리로 잘 정렬되어 있는것을 볼 수 있습니다.

11	2008,8,10,7,1315,1220,1415,1320,OH,5572,N819CA,60,60,14,55,55,JFK,LGA,11,8,38,0,,0,55,0,0,0,0
11	2008,5,15,4,2037,1800,2125,1900,OH,4988,N806CA,48,60,31,145,157,JFK,LGA,11,10,7,0,,0,145,0,0,0,0
21	2008,5,9,5,48,100,117,130,AA,588,N061AA,29,30,11,-13,-12,MIA,FLL,21,6,12,0,,0,NA,NA,NA,NA,NA
24	2008,3,12,3,955,931,1021,948,9E,2009,91619E,26,17,10,33,24,IAH,HOU,24,7,9,0,,0,0,0,9,0,24
24	2008,11,27,4,943,940,1014,956,9E,5816,91469E,31,16,9,18,3,IAH,HOU,24,5,17,0,,0,0,0,18,0,0
24	2008,1,2,3,1245,1025,1340,1125,OH,5610,N806CA,55,60,11,135,140,IAD,DCA,24,5,39,0,,0,135,0,0,0,0
30	2008,1,6,7,2226,2200,2301,2240,CO,348,N56859,35,40,11,21,26,SJC,SFO,30,7,17,0,,0,0,0,0,0,21
30	2008,1,8,2,816,805,907,855,B6,9002,N236JB,51,50,19,12,11,JFK,HPN,30,5,27,0,,0,NA,NA,NA,NA,NA
30	2008,9,22,1,1340,1325,1553,1425,OH,6898,N710CA,133,60,27,88,15,HPN,JFK,30,12,94,0,,0,0,0,88,0,0
30	2008,8,8,5,1448,1440,1602,1540,OH,5052,N442CA,74,60,23,22,8,HPN,JFK,30,9,42,0,,0,0,0,22,0,0

 

검색(search) 모듈

위에서 MapFile은 검색/조회에 쓰이는 파일이라고 언급한적이 있습니다.

다음으로는 이 MapFile로 우리가 원하는 키에 해당하는 값들을 조회해보도록 하겠습니다.

참고로 이 프로그램은 맵리듀스 프로그램은 아닙니다.

 

드라이버 클래스

아래는 MapFile에서 Key값으로 데이터를 조회해오는 코드입니다.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.MapFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.MapFileOutputFormat;
import org.apache.hadoop.mapred.Partitioner;
import org.apache.hadoop.mapred.lib.HashPartitioner;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class SearchValueList extends Configured implements Tool {
    public static void main(String[] args) throws Exception{
        int res = ToolRunner.run(new Configuration(), new SearchValueList(), args);
        System.out.println("MR-Job Result: "+ res);
    }

    @Override
    public int run(String[] args) throws Exception {
        Path path = new Path(args[0]);
        FileSystem fs = path.getFileSystem(getConf());

        MapFile.Reader[] readers = MapFileOutputFormat.getReaders(fs, path, getConf());

        IntWritable key = new IntWritable(Integer.parseInt(args[1]));
        Text value = new Text();

        Partitioner<IntWritable, Text> partitioner = new HashPartitioner<>();
        MapFile.Reader reader = readers[partitioner.getPartition(key, value, readers.length)];

        Writable entry = reader.get(key, value);
        if(entry == null)
            System.out.println("The requested key was not found.");

        IntWritable nextKey = new IntWritable();
        do{
            System.out.println(value.toString());
        } while (reader.next(nextKey, value) && key.equals(nextKey));

        return 0;
    }
}

 

드라이버 실행

위의 코드를 다음과같이 빌드하고 실행하도록 하겠습니다.

jar 빌드가 끝나면 ssh로 해당 파일을 전송하고 다음과같이 2008_mapfile안의 로그파일들을 지워주겠습니다.

이것은 위에 구현한 검색 드라이버가 _로 시작하는 로그파일 폴더들을 뒤지다가 에러를 내기 때문입니다.

> hadoop fs -rmr 2008_mapfile/_*

2008_mapfile 뒤의 _*를 꼭 주의해서 넣어주시기 바랍니다.

 

이제 jar파일을 실행하도록 하겠습니다.

> hadoop jar search.jar com.jyoon.study.search.SearchValueList 2008_mapfile 100 | head -10

 

100의 운항 거리를 가진 기록들 중 상위 10개는 다음과 같습니다.

2008,1,15,2,1512,1510,1542,1544,AQ,215,N841AL,30,34,24,-2,2,OGG,HNL,100,3,3,0,,0,NA,NA,NA,NA,NA
2008,1,16,3,1505,1510,1535,1544,AQ,215,N841AL,30,34,22,-9,-5,OGG,HNL,100,3,5,0,,0,NA,NA,NA,NA,NA
2008,1,17,4,1508,1510,1538,1544,AQ,215,N841AL,30,34,23,-6,-2,OGG,HNL,100,4,3,0,,0,NA,NA,NA,NA,NA
2008,1,18,5,1514,1510,1548,1544,AQ,215,N841AL,34,34,22,4,4,OGG,HNL,100,3,9,0,,0,NA,NA,NA,NA,NA
2008,1,19,6,1515,1510,1549,1544,AQ,215,N836AL,34,34,24,5,5,OGG,HNL,100,5,5,0,,0,NA,NA,NA,NA,NA
2008,1,20,7,1509,1510,1540,1544,AQ,215,N841AL,31,34,23,-4,-1,OGG,HNL,100,3,5,0,,0,NA,NA,NA,NA,NA
2008,1,21,1,1507,1510,1543,1544,AQ,215,N841AL,36,34,25,-1,-3,OGG,HNL,100,3,8,0,,0,NA,NA,NA,NA,NA
2008,1,22,2,1506,1510,1543,1544,AQ,215,N841AL,37,34,23,-1,-4,OGG,HNL,100,10,4,0,,0,NA,NA,NA,NA,NA
2008,1,23,3,1512,1510,1541,1544,AQ,215,N841AL,29,34,23,-3,2,OGG,HNL,100,3,3,0,,0,NA,NA,NA,NA,NA
2008,1,24,4,1454,1510,1531,1544,AQ,215,N841AL,37,34,21,-13,-16,OGG,HNL,100,5,11,0,,0,NA,NA,NA,NA,NA

데이터를 잘 살펴보시면 모든 행이 100을 가지고있는것을 보실 수 있을 것입니다.

 

 

거리를 143으로 바꾸면 다음과 같은 결과를 얻을 수 있습니다.

2008,5,18,7,1341,1346,1441,1435,OO,6573,N965SW,60,49,32,6,-5,HDN,DEN,142,19,9,0,,0,NA,NA,NA,NA,NA
2008,5,12,1,1331,1346,1422,1435,OO,6573,N945SW,51,49,32,-13,-15,HDN,DEN,142,8,11,0,,0,NA,NA,NA,NA,NA
2008,5,5,1,1622,1622,1712,1711,OO,6610,N960SW,50,49,32,1,0,HDN,DEN,142,8,10,0,,0,NA,NA,NA,NA,NA
2008,5,5,1,1504,1505,1551,1555,OO,6610,N960SW,47,50,29,-4,-1,DEN,HDN,142,3,15,0,,0,NA,NA,NA,NA,NA
2008,5,12,1,1225,1229,1305,1319,OO,6573,N945SW,40,50,24,-14,-4,DEN,HDN,142,3,13,0,,0,NA,NA,NA,NA,NA
2008,5,19,1,1505,1505,1544,1555,OO,6610,N953SW,39,50,24,-11,0,DEN,HDN,142,3,12,0,,0,NA,NA,NA,NA,NA
2008,5,11,7,1330,1346,1444,1435,OO,6573,N945SW,74,49,27,9,-16,HDN,DEN,142,30,17,0,,0,NA,NA,NA,NA,NA
2008,5,2,5,1227,1229,1315,1319,OO,6573,N915SW,48,50,25,-4,-2,DEN,HDN,142,3,20,0,,0,NA,NA,NA,NA,NA
2008,5,11,7,1504,1505,1552,1555,OO,6610,N945SW,48,50,25,-3,-1,DEN,HDN,142,3,20,0,,0,NA,NA,NA,NA,NA
2008,5,2,5,1341,1345,1441,1434,OO,6573,N915SW,60,49,40,7,-4,HDN,DEN,142,8,12,0,,0,NA,NA,NA,NA,NA

 

참고자료

이 포스팅은 "시작하세요! 하둡 프로그래밍" 책의 예제를 무작정 따라해본 포스팅입니다.

https://wikibook.co.kr/beginning-hadoop-programming-2rev/

728x90
반응형