스파크 튜토리얼 - (6) 데이터프레임
DataFrame
DataFrame은 테이블처럼 구조화된 데이터로, 스키마를 표현할 수 있는 RDD의 확장 구조체입니다.
python의 pandas나 SQL을 써본사람이라면, 이해가 빠를것입니다.
Row
DataFrame의 행을 Row라고 합니다..
Row들이 곧 하나의 레코드가 되고, Row들이 RDD를 내장함으로서, 효과적으로 데이터를 접근할 수 있게 해줍니다.
데이터
지난번 포스팅에서 사용했던 json데이터를 사용하겠습니다.
다운로드 링크는 아래에 있습니다.
DataFrame 생성
데이터프레임은 크게 두가지 방법으로 생성할 수 있습니다.
- 스파크 세션을 통해 직접 생성
- SQL컨텍스트의 테이블을 통해 생성
두가지 방법 모두 스파크 내부에서의 동작은 크게 다르지 않습니다.
각각의 방법을 살펴보도록 하겠습니다.
먼저 손쉬운 등록을 위해 SQLContext를 import해주겠습니다.
from pyspark.sql import SQLContext
import json
sqlCtx = SQLContext(sc)
직접 DataFrame 생성
스파크 세션을 통해 직접 생성하는 방법을 살펴보겠습니다.
SQLContext는 SparkSession의 createDataFrame을 내장하고 있습니다.
따라서 아래와 같이 json파일을 RDD로 가져와, RDD를 통해 데이터 프레임을 생성합니다.
jsonRDD = inputJson = sc.textFile("./data/cars.json")\
.map(lambda x: json.loads(x))
cars = sqlCtx.createDataFrame(jsonRDD)
cars
데이터프레임을 잘 살펴보면, 서브테이블인 models는 Map으로 구성되어 있습니다.
Map은 JVM에서 사용하는 key-value쌍의 데이터구조입니다.
생성한 데이터프레임의 스키마를 살펴보겠습니다.
스키마를 보는 명령은 다음과 같습니다.
cars.printSchema()
이제 테이블 자체를 살펴보겠습니다.
테이블을 프린트하는 명령은 다음과 같습니다.
cars.show()
테이블을 잘 살펴보면, -> 화살표가 보일것입니다.
이는 Scala에서 사용하는 포인터라는 개념입니다.
첫 레코드(Row)를 출력해보겠습니다.
첫 레코드를 출력하는것은 RDD의 첫줄을 출력하는 명령과 같습니다.
cars.first()
잘 살펴보면, 서브테이블인 models는 Map구조인 파이썬의 dict와 같은 모양으로 나오는 것을 볼 수 있습니다.
SQL컨텍스트를 통해 생성
다음은 SQL컨텍스트를 통해 데이터프레임을 테이블로 등록해 보겠습니다.
아래 코드를 실행하면, 비슷한 결과를 얻게 됩니다.
자세히 보면 한가지 다른 것은, 서브테이블은 Map이 아니라 struct를 리턴하게 됩니다.
cars = sqlCtx.read.json("./data/cars.json")
cars.registerTempTable("cars")
cars
위와 같이 스키마를 출력해 보겠습니다.
이 역시 위에서 생성한 것과는 조금 다른것을 알 수 있습니다.
cars.printSchema()
이번에는 테이블을 출력해 보겠습니다.
cars.show()
models의 생김새가 위와같은 ->가 없이 파이썬의 리스트 형태로 나오는 것을 볼 수 있습니다.
마지막으로 첫번째 레코드를 출력해 보겠습니다.
cars.first()
이 역시 위와는 다르게 models는 Row로 표현된 것을 볼 수 있습니다.
여러 방면에서 보기에 SQLContext를 통해 생성한 데이터프레임이 파이썬에는 더 적합해보입니다.
아래의 예제들은 SQLContext를 통해 생성된 데이터프레임을 사용하였습니다.
(하지만 아래의 예제는 두 방식 모두 동일하게 동작합니다.)
데이터프레임 연산
데이터프레임을 사용해 테이블 연산을 수행해 보겠습니다.
아래의 연산들은 SQL을 사용해보신 분들이라면 익숙할 것입니다.
select를 통한 컬럼 출력
아래는 select를 사용해 brand컬럼을 출력해 보는 예제 입니다.
sql의 select문과 동일하게 동작합니다.
cars.select("brand").show()
이번에는 서브테이블인 models의 컬럼을 출력해 보겠습니다.
서브테이블 (struct)는 마침표를 사용해 표현합니다.
cars.select("models.price").show()
컬럼 타입 변환
이번에는 컬럼의 타입을 변환해 보겠습니다.
아래는 컬럼 변환을 하기 전에 서브테이블인 models를 brand와 같은 레벨로 flatten한 뒤,
price의 타입을 Integer로 변환시켜준 예제 입니다.
새로운 테이블은 cars_flatten에 저장이 되고, RDD와 마찬가지로,
DataFrame은 내부적으로 연산이 실행되면 새로운 DataFrame을 생성하게 됩니다.
from pyspark.sql.types import IntegerType
cars_flatten = cars.select("brand", "models.*");
cars_flatten = cars_flatten.withColumn("price", cars_flatten["price"].cast(IntegerType()))
cars_flatten.show()
비교 연산
이제 DataFrame에 비교 연산을 해보겠습니다.
아래는 price컬럼이 20000보다 높은 레코드들만 뽑는 예제입니다.
컬럼에 비교 연산을 실행할 때는, []인덱싱을 사용해 컬럼에 비교를 해줍니다.
cars_flatten.filter(cars_flatten["price"] > 20000).show()
집계 연산
마지막으로 집계 연산을 수행해 보겠습니다.
아래는 brand컬럼의 데이터로 그룹핑한 다음, 각 레코드의 수를 count해주는 예제입니다.
cars_flatten.groupBy("brand").count().show()
여기까지 데이터프레임과 관련된 연산들을 알아보았습니다.