다수의 출력
지난 포스팅에서 -D옵션을 이용해 각각 출발 지연과 도착 지연의 집계를 하나의 jar파일에서 따로 실행하는 법을 공부했습니다.
하지만 이 경우도 귀찮은 점이 있습니다. 매번 커맨드를 돌릴 때 다르게 매개변수를 주어야하고,
출발과 도착의 데이터가 병렬로 처리될 수 없다는 문제점도 있습니다.
이를 위해 하둡에서 다수의 파일 출력을 다루는 법을 공부하겠습니다.
Mapper 클래스
먼저 Mapper클래스를 다음과 같이 수정해주었습니다.
파일 이름은 MODelayCountMapper라고 지었습니다.
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class MODelayCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
private final static IntWritable outputValue = new IntWritable(1);
private Text outputkey = new Text();
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
AirlinePerformanceParser parser = new AirlinePerformanceParser(value);
if (parser.getDepartureDelayTime() > 0){
outputkey.set("D," + parser.getYear() + "," + parser.getMonth());
context.write(outputkey, outputValue);
}
if(parser.getArrivalDelayTime() > 0){
outputkey.set("A, " + parser.getYear() + "," + parser.getMonth());
context.write(outputkey, outputValue);
}
}
}
코드를 보면 알 수 있지만, 사용자 옵션에 대한 코드는 모두 지워주었고,
출력값의 가장 앞에 각각 새로운 컬럼인 D와 A를 추가해준 것을 볼 수 있습니다.
Reducer에서 이 값들을 통해 출력을 분배 해줄 것입니다
Reducer 클래스
Reduce클래스를 작성해주겠습니다.
MultipleOutput인스턴스를 만들고 Context가 아닌 MultipleOutput인스턴스에 출력해주게 됩니다.
그리고 Mapper에서 정의해 주었던 첫번째 컬럼, D 혹은 A의 값,은 여기서 사용한 뒤, 이 값을 뺀 나머지를 출력해줍니다.
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import java.io.IOException;
public class MODelayCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
private MultipleOutputs<Text, IntWritable> mos;
private Text outputKey = new Text();
private IntWritable result = new IntWritable();
@Override
protected void setup(Context context) throws IOException, InterruptedException {
mos = new MultipleOutputs<>(context);
}
public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
String[] columns = key.toString().split(",");
outputKey.set(columns[1] + "," + columns[2]);
int sum = 0;
for (IntWritable value : values)
sum += value.get();
result.set(sum);
if(columns[0].equals("D")){
mos.write("departure", outputKey, result);
} else {
mos.write("arrival", outputKey, result);
}
}
@Override
protected void cleanup(Context context) throws IOException, InterruptedException {
mos.close();
}
}
Driver클래스
Driver클래스에는 MultipleOutput관련 설정을 추가해줍니다.
또한 DelayCount에 관련된 클래스들을 모두 MODelayCount로 수정해주어야합니다.
Configuration관련 코드는 지울 필요는 없어서 그냥 두었습니다.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class MODelayCount extends Configured implements Tool {
public static void main(String[] args) throws Exception {
int res = ToolRunner.run(new Configuration(), new MODelayCount(), args);
System.out.println("MR-Job Result: " + res);
}
@Override
public int run(String[] args) throws Exception {
if (args.length != 2){ // 입출력 데이터 확인
System.err.println("Usage: MODelayCount <input> <output>");
System.exit(2);
}
String[] otherArgs = new GenericOptionsParser(getConf(), args).getRemainingArgs();
// Job 이름 변경
Job job = new Job(getConf(), "MODelayCount");
// 입출력 경로 설정
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 입출력 포맷 설정
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
// Job, Mapper, Reducer 클래스 설정
job.setJarByClass(MODelayCount.class);
job.setMapperClass(MODelayCountMapper.class);
job.setReducerClass(MODelayCountReducer.class);
// 출력 Key, Value 유형 설정
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// MultipleOutput 설정
MultipleOutputs.addNamedOutput(job, "departure", TextOutputFormat.class, Text.class, IntWritable.class);
MultipleOutputs.addNamedOutput(job, "arrival", TextOutputFormat.class, Text.class, IntWritable.class);
job.waitForCompletion(true);
return 0;
}
}
프로그램 실행
jar 빌드
지난 포스팅과 동일하게 jar빌드 설정을 만들고 빌드를 실행해줍니다.
기존 META-INF폴더와 artifact설정은 지워주었습니다.
그리고 캐시 정보가 함께 들어갈 수 있으니 Clean을 실행한 뒤 다시 Build 실행해줍니다.
jar 실행
이제 구현한 프로그램을 실행하도록 하겠습니다.
scp를 사용해 doop01기기에 jar파일을 옮겨주고, 다음 커맨드를 실행합니다.
> hadoop jar airlinedata.jar com.jyoon.study.airlinedata.MODelayCount input delay_count_mos
위를 실행하면 맵리듀스가 실행됩니다. 역시 4분정도 소요되었습니다.
병렬로 처리하기 때문에 각각 4-5분씩 걸리던 프로그램이 통합 약 6분정도가 걸렸습니다.
실행이 끝나면 명령을 사용해 결과를 출력해보겠습니다.
> hadoop fs -cat delay_count_mos/departure-r-00000 | tail -10
이전 포스팅과 동일한 다음과 같은 결과를 확인할 수 있습니다.
2008,11 157278
2008,12 263949
2008,2 252765
2008,3 271969
2008,4 220864
2008,5 220614
2008,6 271014
2008,7 253632
2008,8 231349
2008,9 147061
그리고 이전에 사용했던 part-r-00000 파일 역시 Context에 아무것도 출력해 주지 않았기 때문에 비어있음을 확인할 수 있습니다.
> hadoop fs -cat delay_count_mos/part-r-00000 | tail -10
참고자료
이 포스팅은 "시작하세요! 하둡 프로그래밍" 책의 예제를 무작정 따라해본 포스팅입니다.
https://wikibook.co.kr/beginning-hadoop-programming-2rev/
'빅데이터 > 하둡' 카테고리의 다른 글
하둡 1.0 튜토리얼 - (14) 부분정렬 (0) | 2019.07.18 |
---|---|
하둡 1.0 튜토리얼 - (13) 보조정렬 (0) | 2019.07.18 |
하둡 1.0 튜토리얼 - (11) 사용자 정의 옵션 (0) | 2019.07.17 |
하둡 1.0 튜토리얼 - (10) 항공 데이터 분석 2 (2) | 2019.07.15 |
하둡 1.0 튜토리얼 - (9) 항공 데이터 분석 1 (0) | 2019.07.15 |