맵리듀스 작성
먼저 항공 출발 지연 데이터를 조회하는 맵리듀스를 작성하겠습니다. 이 프로그램은 년도별로 얼마나 많은 항공기에 출발지연이 발생했는지를 집계해주는 프로그램입니다.
출발지연의 Mapper 클래스
먼저 출발지연의 Mapper클래스를 작성해주겠습니다.
이전에 언급한 바와 같이 맵과 리듀스는 입력과 출력을 <Key, Value>페어로 주고받습니다.
아래의 코드에서 Map의 입력의 Key는 오프셋 즉 Long의 숫자값이고, Value는 운항 데이터의 한줄 전체 Text입니다.
출력의 Key는 운항년도,운항월 형식의 Text이고, Value는 항상 1의 Int값을 출력합니다.
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class DepartureDelayCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
// 맵 작업의 출력 키
private Text outputKey = new Text();
// 맵 작업의 출력 값 -> 항상 1값 (카운트)
private final static IntWritable outputValue = new IntWritable(1);
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
AirlinePerformanceParser parser = new AirlinePerformanceParser(value);
// 출력키는 년도,월 형식
outputKey.set(parser.getYear() + "," + parser.getMonth());
if(parser.getDepartureDelayTime() > 0){
context.write(outputKey, outputValue);
}
}
}
출발지연의 Reducer 클래스
리듀서 클래스는 Mapper에서 1로 설정한 Int값들을 집계해주는 역할을 합니다.
아래의 코드를 DelayCountReducer.java의 파일에 작성해줍니다.
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class DelayCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable value : values)
sum += value.get();
result.set(sum);
context.write(key, result);
}
}
출발지연의 Driver 클래스
이전 포스팅에서도 언급했듯이 Driver클래스는 하둡이 맵리듀스 프로그램을 어떻게 돌릴지 설정해주고 실행의 세부 방법을 셋팅해주는 클래스 입니다. 이전 워드카운트의 드라이버 클래스와 형식은 거의 동일합니다.
파일을 DepartureDelayCount.java라고 이름을 짓고 아래 코드를 작성해줍니다. 메소드는 main하나 입니다.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
public class DepartureDelayCount {
public static void main(String[] args) throws Exception {
if (args.length != 2){ // 입출력 데이터 확인
System.err.println("Usage: DepartureDelayCount <input> <output>");
System.exit(2);
}
Configuration conf = new Configuration();
Job job = new Job(conf, "DepartureDelayCount");
// 입출력 경로 설정
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 입출력 포맷 설정
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
// Job, Mapper, Reducer 클래스 설정
job.setJarByClass(DepartureDelayCount.class);
job.setMapperClass(DepartureDelayCountMapper.class);
job.setReducerClass(DelayCountReducer.class);
// 출력 Key, Value 유형 설정
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
job.waitForCompletion(true);
}
}
도착지연의 Mapper 클래스
이제 도착지연의 Mapper클래스를 작성해 주겠습니다.
마지막에 체크하는 데이터가 다르다는 것을 제외하면, 출발지연 Mapper와 동일합니다.
ArrivalDelayCountMapper.java라고 이름 짓고 아래 코드를 작성해줍니다.
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class ArrivalDelayCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
// 맵 작업의 출력 키
private Text outputKey = new Text();
// 맵 작업의 출력 값 -> 항상 1값 (카운트)
private final static IntWritable outputValue = new IntWritable(1);
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
AirlinePerformanceParser parser = new AirlinePerformanceParser(value);
// 출력키는 년도,월 형식
outputKey.set(parser.getYear() + "," + parser.getMonth());
if(parser.getArrivalDelayTime() > 0){
context.write(outputKey, outputValue);
}
}
}
도착지연의 Reducer 클래스
리듀서는 새로 작성하지 않고 출발지연의 Reducer를 그대로 사용합니다.
도착지연의 Driver 클래스
이 클래스 역시 Job의 이름, Mapper와 Driver클래스 설정을 제외하면 출발지연의 Driver클래스와 동일합니다.
ArrivalDelayCount.java라고 이름 짓고 코드를 작성해줍니다.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
public class ArrivalDelayCount {
public static void main(String[] args) throws Exception {
if (args.length != 2){ // 입출력 데이터 확인
System.err.println("Usage: ArrivalDelayCount <input> <output>");
System.exit(2);
}
Configuration conf = new Configuration();
Job job = new Job(conf, "ArrivalDelayCount");
// 입출력 경로 설정
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 입출력 포맷 설정
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
// Job, Mapper, Reducer 클래스 설정
job.setJarByClass(ArrivalDelayCount.class);
job.setMapperClass(ArrivalDelayCountMapper.class);
job.setReducerClass(DelayCountReducer.class);
// 출력 Key, Value 유형 설정
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
job.waitForCompletion(true);
}
}
Jar빌드 및 실행
이제 프로그램을 Jar로 빌드하고 실행해주도록 하겠습니다.
Jar는 하나의 main클래스만 지정이 가능하기 때문에 두개의 main클래스를 각각 Jar 빌드를 설정해 주어야 합니다.
먼저 main > java폴더에 두개의 패키지를 생성해 주고 각각의 이름을 arrivalmeta 와 departuremeta로 설정해줍니다.
그리고 이전 방식과 동일하게 File > Project Structure에서 Artifact를 추가해줍니다.
이 때 main 클래스를 설정한 뒤 아래 Directory for META-INF설정에서 폴더를 클릭하고,
각각 생성해준 패키지폴더를 선택해 줍니다.
그리고 아래와 같이 Name옵션에서 빌드명을 다르게 설정해 주면 됩니다.
설정이 끝나면 Build > Build Artifacts에서 각각의 jar를 빌드합니다.
빌드가 끝나면 아래와 같이 out 폴더에 두개의 jar폴더가 따로 생깁니다.
각각의 jar 파일들을 이름을 바꿔 따로 저장한 뒤 하둡 VM으로 scp를 사용해 전송해줍니다.
Jar 실행
출발 지연 집계 실행
먼저 departure.jar를 실행시켜줍니다. 이 작업이 끝나는데 저는 4분정도가 소요되었습니다.
> hadoop jar departure.jar input dep_delay_count
맵과 리듀스가 모두 끝났다면 이제 출력된 파일을 확인해보겠습니다.
> hadoop fs -cat dep_delay_count/part-r-00000 | tail -10
다음과 같은 정보가 출력됩니다. (결과는 저와 다를 수 있습니다.)
2008,11 157278
2008,12 263949
2008,2 252765
2008,3 271969
2008,4 220864
2008,5 220614
2008,6 271014
2008,7 253632
2008,8 231349
2008,9 147061
도착 지연 집계 실행
같은 방식으로 arrival.jar를 실행시켜줍니다.
> hadoop jar arrival.jar input arr_delay_count
도착의 결과도 파일의 뒷부분을 확인해보겠습니다.
hadoop fs -cat arr_delay_count/part-r-00000 | tail -10
다음과 같은 정보가 출력됩니다. (역시 결과는 다를 수 있습니다.)
2008,11 181506
2008,12 280493
2008,2 278902
2008,3 294556
2008,4 256142
2008,5 254673
2008,6 295897
2008,7 264630
2008,8 239737
2008,9 169959
시각화
아래는 집계 결과를 시각화한 결과입니다.
(책에서는 엑셀로 했지만 저는 파이썬을 사용했습니다. 엑셀을 잘 다룰줄 몰라서...)
참고자료
이 포스팅은 "시작하세요! 하둡 프로그래밍" 책의 예제를 무작정 따라해본 포스팅입니다.
'빅데이터 > 하둡' 카테고리의 다른 글
하둡 1.0 튜토리얼 - (12) 다수의 파일 출력 (0) | 2019.07.17 |
---|---|
하둡 1.0 튜토리얼 - (11) 사용자 정의 옵션 (0) | 2019.07.17 |
하둡 1.0 튜토리얼 - (9) 항공 데이터 분석 1 (0) | 2019.07.15 |
하둡 1.0 튜토리얼 - (8) 맵리듀스 (0) | 2019.07.14 |
하둡 1.0 튜토리얼 - (7) HDFS 파일 입출력 (0) | 2019.07.14 |