빅데이터/스파크

스파크 튜토리얼 - (3) RDD

_금융덕후_ 2019. 7. 29. 18:42
728x90
반응형

RDD

RDD는 Resilient Distributed Dataset의 약자입니다. 직역하면 탄력 분산 데이터셋이 되겠습니다.

이는 분산되어 존재하는 데이터들의 모임, 즉 클러스터에 분배되어 있는 데이터들을

하나로 관리하는 개념이라고 생각하면 편할 것 같습니다.

스파크의 모든 데이터 타입들은 RDD를 기반으로 만들어져 있고,

데이터끼리의 연산들은 RDD의 연산으로 이루어져 있습니다.

RDD는 HDFS의 파일과 같이 변경이 불가능한, 즉 쓰기가 불가능한 데이터입니다.

 

RDD 연산

RDD는 두가지 연산으로 이루어져 있습니다.

  • Transformation
  • Action

 

Transformation

트랜스포메이션은 RDD끼리의 연산입니다.

이전 포스팅에서 사용했던 filter메소드 역시 트랜스포메이션의 일종입니다.

트랜스포메이션은 호출이 될 때 바로 수행되지는 않습니다.

이를 스파크에서는 Lazy Execution 또는 Lazy Evaluation이라고 하는데,

액션을 취할때까지 기다렸다가 모든 트랜스포메이션 작업을 취합해 가장 효율적인 계산을 수행하게 됩니다.

위에서 언급했듯이 RDD는 쓰기가 불가능한 데이터셋입니다.

따라서 트랜스포메이션이 행해지면, 기존 RDD에 수행되는것이 아니라,

새로운 RDD를 만들어내고 그 새로운 RDD에 수행 결과가 적용되게 됩니다.

 

Action

액션은 트랜스포메이션들이 한번 이상 행해지고 나서 실제 Evaluation이 이뤄지는 작업입니다.

이전 포스팅에서 사용했던 first메소드가 액션의 일종입니다.

 

예제)

액션이 정말 Lazy Execution을 수행하는지 확인하려면, 한가지 실험을 해보면 됩니다.

먼저 README.md를 주피터의 workspace에서 옮기거나 지우고, 텍스트파일을 로드 해보겠습니다.

 

In [2]:
readmeRDD = sc.textFile('README.md')
readmeRDD
Out[2]:
README.md MapPartitionsRDD[2] at textFile at NativeMethodAccessorImpl.java:0

 

원래의 파이썬에서 이러한 로직이 수행되었다면 곧바로 파일을 찾을 수 없다는 에러가 났을것입니다.

 

이제 first함수를 수행해 보겠습니다. 이 때 비로소 에러가 나는것을 확인할 수 있습니다.

 

In [3]:
sparkLines = readmeRDD.filter(lambda x : "Spark" in x)
sparkLines
Out[3]:
PythonRDD[3] at RDD at PythonRDD.scala:52

 

이를 통해 우리는 RDD가 Lazy Execution을 수행한다는 것을 확인할 수 있습니다.

 

RDD 생성

RDD는 기본적으로 두가지 방법으로 생성될 수 있습니다.

데이터를 직접 만드는 방법, 그리고 외부 데이터를 로드하는 방법입니다.

데이터를 직접 만드는 방법은 아래와 같이 parallelize메소드를 사용해 생성합니다.

 

In [1]:
linesRDD = sc.parallelize(["test", "this is a test rdd"])
linesRDD
Out[1]:
ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:194

 

데이터를 외부에서 로드하는 방식은 위의 Readme를 로드했던 방식과 동일합니다.

 

RDD 트랜스포메이션

트랜스포메이션은 기존 RDD에 연산을 수행한 새로운 RDD를 만들어 돌려주는 연산 방식입니다.

RDD의 수행 결과가 트랜스포메이션인지, 액션인지 확인하고 싶을 때는 결과를 출력해보면 됩니다.

Lazy Execution때문에 트랜스포메이션의 결과는 실제 수행 결과가 아니라 RDD를 리턴힙니다.

 

In [3]:
sparkLines = readmeRDD.filter(lambda x : "Spark" in x)
sparkLines
Out[3]:
PythonRDD[3] at RDD at PythonRDD.scala:52

 

RDD 액션

액션은 기존의 트랜스포메이션 연산을 한번에 최적의 경로를 계산해 수행한 뒤, 결과를 리턴하는 연산 방식입니다.

위의 sparkLines의 연산 결과를 출력하려면 다음과 같이 실행하면 됩니다.

 

In [4]:
sparkLines.count()
Out[4]:
20

 

자주 쓰는 RDD 연산

아래의 함수들은 위에서 본 텍스트와 같이 기본 타입의 RDD에 많이 쓰이는 연산들 입니다.

파이썬이나 유사한 함수형을 지원하는 언어를 써보신 분들이라면 아래의 연산들이 많이 익숙할 것입니다.

map

map은 각 원소들에 무언가를 수행하고 싶을 때 사용하는 연산입니다.

아래는 각 원소들을 제곱하는 map연산입니다.

 

In [17]:
numbers = sc.parallelize(list(range(5)))
squared = numbers.map(lambda x : x * x).collect()
squared
Out[17]:
[0, 1, 4, 9, 16]

 

flatmap

flatmap은 중첩된 리스트들의 원소를 하나의 리스트로 flatten해서 리턴하는 연산입니다.

아래는 string의 각 원소들은 스페이스 단위로 나눈 뒤 하나의 리스트로 반환하는 연산입니다.

 

In [20]:
strings = sc.parallelize(["hello spark", "hi python"])
splitted = strings.flatMap(lambda x : x.split(" ")).collect()
splitted
Out[20]:
['hello', 'spark', 'hi', 'python']

 

filter

filter는 말그대로 원소들을 조건으로 필터링하는 연산입니다.

아래는 리스트에서 2의 배수를 필터링하는 예제입니다.

 

In [21]:
numbers = sc.parallelize(list(range(1, 30, 3)))
result = numbers.filter(lambda x : x % 2 == 0).collect()
result
Out[21]:
[4, 10, 16, 22, 28]

 

728x90
반응형