기존 데이터의 문제
지난 포스팅에서 집계한 데이터에는 한가지 문제가 있습니다. 바로 키 값들이 정렬이 안되어 있습니다.
데이터를 조금 더 뽑아보면 문제를 알 수 있습니다.
> hadoop fs -cat delay_count_mos/departure-r-00000 | tail -15
마지막 15줄의 데이터를 뽑아보면 다음과 같이 나옵니다.
2007,7 307864
2007,8 298530
2007,9 195615
2008,1 247948
2008,10 162531
2008,11 157278
2008,12 263949
2008,2 252765
2008,3 271969
2008,4 220864
2008,5 220614
2008,6 271014
2008,7 253632
2008,8 231349
2008,9 147061
문제가 보이시나요?
단순 String을 비교하기 때문에 월별로 정렬되어 있지가 않습니다.
1 다음 2가 아니라 10, 11, 12가 나와버리는 것이 문제입니다.
이 문제를 해결하기 위해 복합키를 구현하고 보조 정렬을 하겠습니다.
프로젝트 생성
먼저 프로젝트를 생성하겠습니다.
IntelliJ에서 새로운 프로젝트를 만들고 secondarySort라고 이름 짓습니다.
이전에 했던것처럼 com.jyoon.study.secondarySort의 패키지 구조를 만들어주었습니다.
그리고 gradle dependency는 이전 프로젝트와 같습니다.
Lombok 플러그인
코드의 생산성을 위해 한가지 플러그인을 사용하겠습니다.
gradle dependency에 다음 한줄을 추가해줍니다.
dependencies {
...
implementation 'org.projectlombok:lombok:1.16.16'
...
}
이 Lombok 플러그인은 Getter Setter나 Constructor와 같은 것들을 자동으로 생성해주는 라이브러리입니다.
이를 빌드시 적용하려면 설정을 하나 해주어야 합니다.
File > Setting 메뉴로 들어가셔서 Build, Execution, Deployment 메뉴를 열어주시고,
Annotation Processors메뉴에서 Enable Annotation Processing 체크박스를 클릭해줍니다.
보조 정렬
보조 정렬은 데이터의 키 값을 그룹핑하고, 그 그룹핑된 키를 따로 비교해 정렬하는 하둡의 정렬 방식입니다.
이를 구현하기 위해 먼저 복합키를 구현하겠습니다.
복합키
import lombok.*;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableUtils;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
@NoArgsConstructor
@AllArgsConstructor
public class DataKey implements WritableComparable<DataKey> {
@Getter @Setter
private String year;
@Getter @Setter
private Integer month;
@Override
public void readFields(DataInput dataInput) throws IOException {
year = WritableUtils.readString(dataInput);
month = dataInput.readInt();
}
@Override
public void write(DataOutput dataOutput) throws IOException {
WritableUtils.writeString(dataOutput, year);
dataOutput.writeInt(month);
}
@Override
public int compareTo(DataKey dataKey) {
int result = year.compareTo(dataKey.year);
if(result == 0)
result = month.compareTo(dataKey.month);
return result;
}
@Override
public String toString() {
return new StringBuilder().append(year).append(",").append(month).toString();
}
}
먼저 코드에는 Lombok을 사용한 플러그인들이 보일겁니다.
두개의 Constructor들을 만들어주고, 두개의 멤버 변수들에 Getter와 Setter를 Annotation을 통해 만들어주었습니다.
readFields메소드는 데이터를 통해 키를 만들어주는 역할을 합니다.
write는 키를 output에 출력해주는 역할을 합니다.
compareTo는 키값들을 비교할 때 사용합니다.
자세히 보시면 먼저 year로 비교하고 나중에 month로 비교하는 것을 보실 수 있습니다.
이를 통해 년도와 월별로 잘 비교된 정렬이 나올 수 있습니다.
마지막으로 toString을 재정의 해주는 이유는, Mapper와 Reducer에서 해당 키를 toString을 호출해 사용하기 때문입니다.
복합키 Comparator
Comparator는 DataKey들을 비교하고 순서를 부여하는 클래스 입니다.
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
public class DataKeyComparator extends WritableComparator {
protected DataKeyComparator(){
super(DataKey.class, true);
}
@SuppressWarnings("rawtypes")
@Override
public int compare(WritableComparable a, WritableComparable b) {
DataKey k1 = (DataKey) a;
DataKey k2 = (DataKey) b;
return k1.compareTo(k2);
}
}
compare 메소드를 재정의하는데, DataKey에 정의해주었던 compareTo 메소드를 사용해 비교해줍니다.
그룹키 Partitioner
Partitioner는 맵의 결과가 어떤 리듀스의 입력 데이터로 보내질지를 결정합니다.
아래 파티셔너는 년도의 해시값을 사용해 파티셔너를 결정하게 됩니다.
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Partitioner;
public class GroupKeyPartitioner extends Partitioner<DataKey, IntWritable> {
@Override
public int getPartition(DataKey dataKey, IntWritable intWritable, int numPartitions) {
int hash = dataKey.getYear().hashCode();
return hash % numPartitions;
}
}
그룹키 Comparator
앞서 Partitioner에서 년도의 해시값을 사용했기 때문에, 그룹키의 비교는 년도값으로 이뤄집니다.
따라서 그룹키의 Comparator에서는 각 키의 년도를 비교합니다.
이는 같은 년도의 데이터를 하나의 Reducer에서 처리하기 위함입니다.
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
public class GroupKeyComparator extends WritableComparator {
protected GroupKeyComparator(){
super(DataKey.class, true);
}
@SuppressWarnings("rawtypes")
@Override
public int compare(WritableComparable a, WritableComparable b) {
DataKey k1 = (DataKey) a;
DataKey k2 = (DataKey) b;
return k1.getYear().compareTo(k2.getYear());
}
}
맵리듀스 구현
Row Parser
먼저 AirlinePerformanceParser.java파일의 내용을 복사해 오도록 하겠습니다.
이 코드에는 Lombok플러그인을 사용한 Getter와 Setter생성이 있습니다.
import lombok.Getter;
import lombok.Setter;
import org.apache.hadoop.io.Text;
public class AirlinePerformanceParser {
@Getter @Setter
private int year;
@Getter @Setter
private int month;
@Getter @Setter
private int arrivalDelayTime = 0;
@Getter @Setter
private int departureDelayTime = 0;
@Getter @Setter
private int distance = 0;
@Getter @Setter
private boolean arrivalDelayAvailable = true;
@Getter @Setter
private boolean departureDelayAvailable = true;
@Getter @Setter
private boolean distanceAvailable = true;
@Getter @Setter
private String uniqueCarrier;
public AirlinePerformanceParser(Text text){
try {
// 한 row를 comma로 split 한다
String[] columns = text.toString().split(",");
year = Integer.parseInt(columns[0]); // 운항 년도
month = Integer.parseInt(columns[1]); // 운항 월
uniqueCarrier = columns[8]; // 항공사 코드
if (!columns[15].equals("NA")) // 항공기 출발 지연 시간
departureDelayTime = Integer.parseInt(columns[15]);
else
departureDelayAvailable = false;
if (!columns[14].equals("NA")) // 항공기 도착 지연 시간
arrivalDelayTime = Integer.parseInt(columns[14]);
else
arrivalDelayAvailable = false;
if (!columns[18].equals("NA")) // 운항 거리
distance = Integer.parseInt(columns[18]);
else
distanceAvailable = false;
} catch (Exception ex){
System.out.println("Error parsing a record: " + ex.getMessage());
}
}
}
Mapper 클래스
Mapper 클래스 역시 지난번 클래스와 유사합니다.
출발과 도착 데이터를 구분할 수 있게 year 앞에 D와 A를 붙여주었습니다.
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class DelayCountMapperWithDataKey extends Mapper<LongWritable, Text, DataKey, IntWritable> {
private final static IntWritable outputValue = new IntWritable(1);
private DataKey outputKey = new DataKey();
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
AirlinePerformanceParser parser = new AirlinePerformanceParser(value);
if (parser.isDepartureDelayAvailable()){
if (parser.getDepartureDelayTime() > 0) {
outputKey.setYear("D," + parser.getYear());
outputKey.setMonth(parser.getMonth());
context.write(outputKey, outputValue);
}
}
if (parser.isArrivalDelayAvailable()){
if(parser.getArrivalDelayTime() > 0){
outputKey.setYear("A," + parser.getYear());
outputKey.setMonth(parser.getMonth());
context.write(outputKey, outputValue);
}
}
}
}
Reducer 클래스
Reducer클래스는 이전 Reducer와 많이 다른것을 볼 수 있습니다.
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import java.io.IOException;
public class DelayCountReducerWithDataKey extends Reducer<DataKey, IntWritable, DataKey, IntWritable> {
private MultipleOutputs<DataKey, IntWritable> mos;
private DataKey outputKey = new DataKey();
private IntWritable result = new IntWritable();
@Override
protected void setup(Context context) throws IOException, InterruptedException {
mos = new MultipleOutputs<>(context);
}
public void reduce(DataKey key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
String[] columns = key.getYear().split(",");
int sum = 0;
Integer month = key.getMonth();
String filePrefix;
if(columns[0].equals("D"))
filePrefix = "departure";
else
filePrefix = "arrival";
for (IntWritable value : values){
if(!month.equals(key.getMonth())){
writeTo(filePrefix, key, month, sum);
sum = 0;
}
sum += value.get();
month = key.getMonth();
System.out.println("current month in reducer: " + month);
}
if (key.getMonth().equals(month)){
writeTo(filePrefix, key, key.getMonth(), sum);
}
}
private void writeTo(String filePrefix, DataKey key, Integer month, Integer sum) throws IOException, InterruptedException {
result.set(sum);
outputKey.setYear(key.getYear().substring(2)); // 앞두자를 잘라냄
outputKey.setMonth(month);
mos.write(filePrefix, outputKey, result);
}
@Override
protected void cleanup(Context context) throws IOException, InterruptedException {
mos.close();
}
}
Reducer에서는 데이터의 제일 앞 컬럼을 비교해 departure와 arrival로 출력을 나눠주고,
월별로 다른 집계를 내는 역할을 하고있습니다.
Driver 클래스
이제 맵리듀스를 설정해주는 Driver클래스를 구현하겠습니다.
아래의 설정들을 보시면, 입출력 형식에 DataKey가 포함되었고,
정렬수행에 관련된 클래스들이 설정된 것을 볼 수 있습니다.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class DelayCountWithDataKey extends Configured implements Tool {
public static void main(String[] args) throws Exception {
int res = ToolRunner.run(new Configuration(), new DelayCountWithDataKey(), args);
System.out.println("MR-Job Result: " + res);
}
@Override
public int run(String[] args) throws Exception {
if (args.length != 2){ // 입출력 데이터 확인
System.err.println("Usage: DelayCountWithDataKey <input> <output>");
System.exit(2);
}
String[] otherArgs = new GenericOptionsParser(getConf(), args).getRemainingArgs();
Job job = new Job(getConf(), "DelayCountWithDataKey");
// 입출력 경로 설정
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
// 입력 Key, Value 유형 설정
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
// 출력 Key, Value 유형 설정
job.setOutputKeyClass(DataKey.class);
job.setOutputValueClass(IntWritable.class);
// 정렬 수행관련 설정
job.setJarByClass(DelayCountWithDataKey.class);
job.setPartitionerClass(GroupKeyPartitioner.class);
job.setGroupingComparatorClass(GroupKeyComparator.class);
job.setSortComparatorClass(DataKeyComparator.class);
job.setMapperClass(DelayCountMapperWIthDataKey.class);
job.setReducerClass(DelayCountReducerWithDataKey.class);
// MultipleOutput 설정
MultipleOutputs.addNamedOutput(job, "departure", TextOutputFormat.class, DataKey.class, IntWritable.class);
MultipleOutputs.addNamedOutput(job, "arrival", TextOutputFormat.class, DataKey.class, IntWritable.class);
job.waitForCompletion(true);
return 0;
}
}
프로그램 실행
jar 빌드
이제 jar를 빌드하고 실행해보도록 하겠습니다.
artifact를 설정한 후 jar빌드를 하고 scp로 doop01기기로 보내주시면 됩니다.
jar 실행
처음 기기는 20기가씩 밖에 설정하지 않았기 때문에, 모든 데이터를 전부 정렬 하기에는 저장공간의 문제가 있습니다.
이는 하둡이 각 기기에 tmp파일을 사용해 맵과 리듀스의 결과를 저장하기 때문인데요,
실행하다 저장공간 때문에 에러가 나는 상황을 방지하기 위해 2000년대의 데이터만 집계를 해주겠습니다.
> hadoop jar secondarySort.jar com.jyoon.study.secondarySort.DelayCountWithDataKey input/200* delay_count_sort
위의 커맨드를 보시면 input 디렉터리 안의 200으로 시작하는 파일에만 집계를 실행하도록 되어있습니다.
따라서 2000.csv 부터 2008.csv까지만 정렬과 집계가 실행됩니다.
결과 확인
맵리듀스 작업이 끝나면 아래의 커맨드로 결과를 확인해보겠습니다.
> hadoop fs -cat delay_count_sort/departure-r-00000 | tail -15
결과는 다음과 같이 나옵니다.
2007,10 231129
2007,11 217557
2007,12 304011
2008,1 247948
2008,2 252765
2008,3 271969
2008,4 220864
2008,5 220614
2008,6 271014
2008,7 253632
2008,8 231349
2008,9 147061
2008,10 162531
2008,11 157278
2008,12 263949
보이는 바와 같이 1부터 12월까지 월별로 잘 정렬이 된것을 볼 수 있습니다.
참고자료
이 포스팅은 "시작하세요! 하둡 프로그래밍" 책의 예제를 무작정 따라해본 포스팅입니다.
https://wikibook.co.kr/beginning-hadoop-programming-2rev/
'빅데이터 > 하둡' 카테고리의 다른 글
하둡 1.0 튜토리얼 - (15) 전체정렬 (0) | 2019.07.18 |
---|---|
하둡 1.0 튜토리얼 - (14) 부분정렬 (0) | 2019.07.18 |
하둡 1.0 튜토리얼 - (12) 다수의 파일 출력 (0) | 2019.07.17 |
하둡 1.0 튜토리얼 - (11) 사용자 정의 옵션 (0) | 2019.07.17 |
하둡 1.0 튜토리얼 - (10) 항공 데이터 분석 2 (2) | 2019.07.15 |