스파크 튜토리얼 - (4) 페어 RDD
페어 RDD
페어 RDD란 key-value쌍으로 이루어진 RDD를 말합니다.
파이썬 에서는 Tuple로 이뤄진 RDD가 곧 페어 RDD가 됩니다.
페어 RDD 생성
먼저 간단하게 parallelize메소드를 사용해 int key-value페어로 이뤄진 페어RDD를 생성하겠습니다.
examplePairRDD = sc.parallelize([(1, 3), (1, 5), (2, 4), (3, 3), (4, 8), (4, 2), (3, 1)])
examplePairRDD
페어 RDD 트랜스포메이션
페어 RDD는 기본 RDD에서 사용 가능한 메소드들은 모두 사용할 수 있다.
또한 추가적으로 key-value쌍으로 작업하기 편한 메소드들 역시 제공된다.
대표적인 페어 RDD 메소드들은 다음 표에서 확인할 수 있다.
각 키에 대해 연산을 적용 rdd.mapValues(lambda x : x + 1) 림스멮미뎐(려추) 메소드 |
목적 | 예) |
reduceByKey(func) | 동일 키에 대한 값들을 reduce | rdd.reduceByKey(lambda x, y: x + y) |
groupByKey() | 동일 키에 대한 값들을 group | rdd.groupByKey() |
combineByKey(createCombiner, mergeValue, mergeCombiners, partitioner) |
다른 결과의 타입을 써서 동일 키의 값들을 combine | 생략 |
mapValues(func) | 각 키에 대해 연산을 적용 | rdd.mapValues(lambda x : x + 1) |
keys() | 키값들을 리턴 | rdd.keys() |
values() | value값들을 리턴 | rdd.values() |
sortByKey() | 키로 정렬한 RDD 리턴 | rdd.sortByKey() |
reduceByKey
위에서 생성한 페어 RDD로 몇가지 예제를 실행해 보려 합니다.
먼저 아래는 reduceByKey를 사용해 각 key에 대한 value들을 모두 더하는 예제입니다.
{
i:j
for i, j in examplePairRDD.reduceByKey(lambda x, y : x + y).collect()
}
예제를 보면, collect함수는 iterable한 결과값만을 리턴하기 때문에 dict comprehension을 사용해 dict를 만들어 주었습니다.
mapValues
다음은 각 원소들의 value값을 방문해 제곱을 해주는 예제입니다.
{
i:j
for i, j in examplePairRDD.mapValues(lambda x: x**2).collect()
}
고객 데이터
w3school사이트에 있는 데이터를 조금 수정해서, 고객 데이터를 key-value쌍으로 분석해 보겠습니다.
아래 테이블은 데이터에서 "고객 이름"과 "나라"만 뽑은 데이터의 샘플입니다
CustomerName | Country |
Alfreds Futterkiste | Germany |
Ana Trujillo Emparedados y helados | Mexico |
Antonio Moreno Taqueria | Mexico |
Around the Horn | UK |
Berglunds snabbkop | Sweden |
데이터는 아래 파일에 있습니다. 편의를 위해 컬럼명은 제거했습니다.
텍스트 데이터 로딩
먼저 지난번 포스팅에서와 같이 텍스트 데이터를 로딩하겠습니다.
아래는 데이터를 로딩 해 첫번째 라인을 뽑아본 것입니다.
customerLines = sc.textFile("name-customers.csv")
customerLines.first()
위의 데이터를 전처리 해주겠습니다.
map을 사용해 각 값들을 방문하면서, 콤마 (,)로 split하고 튜플로 리턴하게 됩니다.
customerPairs = customerLines.map(lambda x: (x.split(",")[1], x.split(",")[0]))
customerPairs
groupByKey
groupByKey는 하나의 Key에 대한 여러 값들을 리스트 형태로 리턴합니다.
다음 예제는 나라별로 고객의 이름을 리스트 형태로 리턴하게 됩니다.
그리고 해당 데이터로 dict를 만들고 UK에 사는 고객들만 출력해 보았습니다.
customerPairCollected = customerPairs.groupByKey().collect()
customerDict = {
country : [c for c in customers]
for country, customers in customerPairCollected
}
customerDict['UK']
sortByKey
다음 예제는 RDD를 Key값으로 정렬하고, 키값들만 뽑아 상위 10개를 출력한 결과입니다.
[k for k in customerPairs.sortByKey().keys().collect()][:10]
맵리듀스
다음은 하둡에서 사용하는 맵리듀스의 예제를 보겠습니다.
아래 코드는 고객 데이터에서 나라별로 고객이 몇명이 있는지 카운트 하는 맵리듀스 예제입니다.
각 Key값에 대해 1로 카운트하고, 각 Value들을 키에 대해 더하는 것으로 리듀스를 진행합니다.
mapReduced = customerPairs.mapValues(lambda x : 1).reduceByKey(lambda x, y: x + y)
{
i:j
for i, j in mapReduced.collect()
}
여기까지 페어 RDD에 대해 알아보았습니다.