빅데이터/스파크

스파크 튜토리얼 - (4) 페어 RDD

Johnny Yoon 2019. 8. 9. 19:47
728x90
반응형

페어 RDD

페어 RDD란 key-value쌍으로 이루어진 RDD를 말합니다.

파이썬 에서는 Tuple로 이뤄진 RDD가 곧 페어 RDD가 됩니다.

 

페어 RDD 생성

먼저 간단하게 parallelize메소드를 사용해 int key-value페어로 이뤄진 페어RDD를 생성하겠습니다.

In [46]:
examplePairRDD = sc.parallelize([(1, 3), (1, 5), (2, 4), (3, 3), (4, 8), (4, 2), (3, 1)])
examplePairRDD
Out[46]:
ParallelCollectionRDD[77] at parallelize at PythonRDD.scala:194

 

페어 RDD 트랜스포메이션

페어 RDD는 기본 RDD에서 사용 가능한 메소드들은 모두 사용할 수 있다.

또한 추가적으로 key-value쌍으로 작업하기 편한 메소드들 역시 제공된다.

대표적인 페어 RDD 메소드들은 다음 표에서 확인할 수 있다.

각 키에 대해 연산을 적용 rdd.mapValues(lambda x : x + 1)
림스멮미뎐(려추) 메소드
목적 예)
reduceByKey(func) 동일 키에 대한 값들을 reduce rdd.reduceByKey(lambda x, y: x + y)
groupByKey() 동일 키에 대한 값들을 group rdd.groupByKey()

combineByKey(createCombiner,

mergeValue,

mergeCombiners,

partitioner)

다른 결과의 타입을 써서 동일 키의 값들을 combine 생략
mapValues(func) 각 키에 대해 연산을 적용 rdd.mapValues(lambda x : x + 1)
keys() 키값들을 리턴 rdd.keys()
values() value값들을 리턴 rdd.values()
sortByKey() 키로 정렬한 RDD 리턴 rdd.sortByKey()

 

reduceByKey

위에서 생성한 페어 RDD로 몇가지 예제를 실행해 보려 합니다.

먼저 아래는 reduceByKey를 사용해 각 key에 대한 value들을 모두 더하는 예제입니다.

In [49]:
{
    i:j 
    for i, j in examplePairRDD.reduceByKey(lambda x, y : x + y).collect()
}
Out[49]:
{1: 8, 2: 4, 3: 4, 4: 10}

 

예제를 보면, collect함수는 iterable한 결과값만을 리턴하기 때문에 dict comprehension을 사용해 dict를 만들어 주었습니다.

 

mapValues

다음은 각 원소들의 value값을 방문해 제곱을 해주는 예제입니다.

In [50]:
{
    i:j
    for i, j in examplePairRDD.mapValues(lambda x: x**2).collect()
}
Out[50]:
{1: 25, 2: 16, 3: 1, 4: 4}

 

고객 데이터

w3school사이트에 있는 데이터를 조금 수정해서, 고객 데이터를 key-value쌍으로 분석해 보겠습니다.

https://www.w3schools.com/sql

 

아래 테이블은 데이터에서 "고객 이름"과 "나라"만 뽑은 데이터의 샘플입니다

CustomerName Country
Alfreds Futterkiste Germany
Ana Trujillo Emparedados y helados Mexico
Antonio Moreno Taqueria Mexico
Around the Horn UK
Berglunds snabbkop Sweden

데이터는 아래 파일에 있습니다. 편의를 위해 컬럼명은 제거했습니다.

name-customers.csv
0.00MB

 

텍스트 데이터 로딩

먼저 지난번 포스팅에서와 같이 텍스트 데이터를 로딩하겠습니다.

아래는 데이터를 로딩 해 첫번째 라인을 뽑아본 것입니다.

In [8]:
customerLines = sc.textFile("name-customers.csv")
customerLines.first()
Out[8]:
'Alfreds Futterkiste,Germany'

 

위의 데이터를 전처리 해주겠습니다.

map을 사용해 각 값들을 방문하면서, 콤마 (,)로 split하고 튜플로 리턴하게 됩니다.

In [21]:
customerPairs = customerLines.map(lambda x: (x.split(",")[1], x.split(",")[0]))
customerPairs
Out[21]:
PythonRDD[54] at RDD at PythonRDD.scala:52

 

groupByKey

groupByKey는 하나의 Key에 대한 여러 값들을 리스트 형태로 리턴합니다.

다음 예제는 나라별로 고객의 이름을 리스트 형태로 리턴하게 됩니다.

그리고 해당 데이터로 dict를 만들고 UK에 사는 고객들만 출력해 보았습니다.

In [53]:
customerPairCollected = customerPairs.groupByKey().collect()
customerDict = {
    country : [c for c in customers]
    for country, customers in customerPairCollected
}
customerDict['UK']
Out[53]:
['Around the Horn',
 "B's Beverages",
 'Consolidated Holdings',
 'Eastern Connection',
 'Island Trading',
 'North/South',
 'Seven Seas Imports']

 

sortByKey

다음 예제는 RDD를 Key값으로 정렬하고, 키값들만 뽑아 상위 10개를 출력한 결과입니다.

In [54]:
[k for k in customerPairs.sortByKey().keys().collect()][:10]
Out[54]:
['Argentina',
 'Argentina',
 'Argentina',
 'Austria',
 'Austria',
 'Belgium',
 'Belgium',
 'Brazil',
 'Brazil',
 'Brazil']

 

맵리듀스

다음은 하둡에서 사용하는 맵리듀스의 예제를 보겠습니다.

아래 코드는 고객 데이터에서 나라별로 고객이 몇명이 있는지 카운트 하는 맵리듀스 예제입니다.

각 Key값에 대해 1로 카운트하고, 각 Value들을 키에 대해 더하는 것으로 리듀스를 진행합니다.

In [55]:
mapReduced = customerPairs.mapValues(lambda x : 1).reduceByKey(lambda x, y: x + y)
{
    i:j
    for i, j in mapReduced.collect()
}
Out[55]:
{'Germany': 11,
 'Mexico': 5,
 'UK': 7,
 'Sweden': 2,
 'France': 11,
 'Spain': 5,
 'Canada': 3,
 'Argentina': 3,
 'Switzerland': 2,
 'Brazil': 9,
 'Austria': 2,
 'Italy': 3,
 'Portugal': 2,
 'USA': 13,
 'Venezuela': 4,
 'Ireland': 1,
 'Belgium': 2,
 'Norway': 1,
 'Denmark': 2,
 'Finland': 2,
 'Poland': 1}

 

여기까지 페어 RDD에 대해 알아보았습니다.

 

728x90
반응형