빅데이터/스파크

스파크 튜토리얼 - (5) 파일 로딩

Johnny Yoon 2019. 8. 11. 11:37
728x90
반응형

파이썬 파일 로딩

Python은 여러 구조화된 파일을 로딩하는데 좋은 라이브러리들을 제공합니다.

이번 포스팅에서는 Python에서 json과 csv파일을 로딩하고,

그것들을 RDD에 로딩하는 방법을 알아보겠습니다.

 

예제에 사용할 파일

에제에서는 두개의 파일을 사용할 것입니다.

json파일과 csv파일을 다운받아 사용하시기 바랍니다.

cars.json
0.00MB
customers.csv
0.01MB

 

json

json파일은 구조화된 파일로, 주로 웹에서 데이터를 주고받을 때 쓰입니다.

파이썬에서 이 파일 포맷을 로딩할 때는 json패키지를 사용합니다.

 

다음은 rdd에서 텍스트 파일 형식으로 읽어온 뒤,

각 line의 json을 로딩하는 예제입니다.

In [5]:
import json
inputJson = sc.textFile("./data/cars.json")\
              .map(lambda x: json.loads(x))
inputJson
Out[5]:
PythonRDD[6] at RDD at PythonRDD.scala:53

 

위의 코드를 보면, 텍스트파일의 각 라인을 json.load로,

파이썬에서 사용 가능한 json포맷으로 바꿔주어 로딩하는 것을 볼 수 있습니다.

 

이제 위에서 로딩한 json을 rdd의 첫 라인을 통해 살펴보겠습니다.

In [6]:
inputJson.first()
Out[6]:
{'brand': 'Ford', 'models': {'name': 'Fiesta', 'price': '14260'}}

 

json파일이 잘 로딩이 된것을 확인할 수 있습니다.

 

csv

이번에는 같은 형식으로 csv파일을 로딩해 주겠습니다.

csv파일은 일반 텍스트 파일과 많이 다를바가 없지만, 콤마 (,)나 탭 (\t)과 같은.

delemeter를 통해 테이블 구조를 표현합니다.

이 때문에 excel에서도 많이 쓰이죠.

 

이 파일의 특별한 점은, header즉 컬럼이름이 있는 줄이 있을 수 있다는 것입니다.

예제 파일은 header를 가지고 있기 때문에 해당 row를 파일에서 지우고 따로 보관해 둡니다.

In [2]:
import csv
from io import StringIO

inputFile = sc.textFile('./data/customers.csv')

# retrieve and remove header line
header = inputFile.first()
inputFile = inputFile.filter(lambda line: line != header)

 

전처리가 완료 되었으면 이제 csv파일을 한줄 씩 로딩하는 코드를 작성합니다.

json파일은 json.load를 사용해 손쉽게 해결했지만,

csv파일은 각 라인을 처리하는 함수를 따로 작성해 주었습니다.

In [3]:
def loadRecord(line):
    inputLine = StringIO(line)
    reader = csv.DictReader(inputLine, fieldnames=header.split(","))
    return next(reader)

 

위 코드를 잘 보시면, 각 라인마다 header를 지정해 주고 있습니다.

파일을 로딩한 결과는 아래의 코드에서 확인할 수 있습니다.

rdd의 첫 줄을 출력해 확인해 줍니다.

In [4]:
inputCSV = inputFile.map(loadRecord)
inputCSV.first()
Out[4]:
OrderedDict([('CustomerID', '1'),
             ('CustomerName', 'Alfreds Futterkiste'),
             ('ContactName', 'Maria Anders'),
             ('Address', 'Obere Str. 57'),
             ('City', 'Berlin'),
             ('PostalCode', '12209'),
             ('Country', 'Germany')])

 

Spark SQL

Spark SQL에 대해서는 추후에 다른 포스팅에서 더 자세하게 살펴볼 것이지만,

Spark SQL에서도 구조화된 파일을 로딩해, 테이블 처럼 사용하고,

Spark에서 제공하는 SQL기능을 사용해 손쉽게 데이터를 조회할 수 있습니다.

 

json

json파일을 로딩해서 SQL테이블과 같이 사용하는 방법은 다음과 같습니다.

하둡에서 사용하는 Hive를 사용하는 HiveContext를 가져옵니다.

이 때 Spark는 DataFraem이라는 구조체를 사용합니다.

DataFrame에 대해서는 추후에 Spark SQL에서 설명하겠지만, 테이블과 비슷한 구조라고 이해하시면 됩니다.

In [18]:
from pyspark.sql import HiveContext

hiveCtx = HiveContext(sc)
cars = hiveCtx.read.json("./data/cars.json")
cars.registerTempTable("cars")
cars
Out[18]:
DataFrame[brand: string, models: struct<name:string,price:string>]

 

위 코드를 보시면, json파일을 통해 hive 테이블을 생성합니다.

결과를 보면 DataFrame이라는 구조체와 함께, 스키마를 설명하고 있는 것을 볼 수 있습니다.

 

아래는 생성한 hive테이블에서 데이터를 조회하는 코드입니다.

이 때 SQL과 거의 유사한 HiveQL을 사용하게 됩니다.

In [19]:
carsResult = hiveCtx.sql("SELECT brand, models.name FROM cars")
carsResult.collect()
Out[19]:
[Row(brand='Ford', name='Fiesta'),
 Row(brand='Ford', name='Focus'),
 Row(brand='Ford', name='Mustang'),
 Row(brand='BMW', name='320'),
 Row(brand='BMW', name='X3'),
 Row(brand='BMW', name='X5'),
 Row(brand='Fiat', name='500')]

 

csv

csv역시 HiveContext를 통해 손쉽게 가져와 테이블로 생성할 수 있습니다.

아래는 같은 방식으로 csv를 DataFrame으로 만드는 코드 입니다.

In [21]:
customers = hiveCtx.read.option("delimiter", ",")\
                        .option("header", "true")\
                        .csv("./data/customers.csv")
customers.registerTempTable("customers")
customers
Out[21]:
DataFrame[CustomerID: string, CustomerName: string, ContactName: string, Address: string, City: string, PostalCode: string, Country: string]

 

역시 비슷한 방식으로 HQL을 사용해 데이터를 조회합니다.

customer테이블은 레코드가 더 많기 때문에, 조회한 결과 중 상위 10개만 출력합니다.

In [22]:
customerResult = hiveCtx.sql("SELECT CustomerName, City FROM customers")
customerResult.collect()[:10]
Out[22]:
[Row(CustomerName='Alfreds Futterkiste', City='Berlin'),
 Row(CustomerName='Ana Trujillo Emparedados y helados', City='Mexico D.F.'),
 Row(CustomerName='Antonio Moreno Taqueria', City='Mexico D.F.'),
 Row(CustomerName='Around the Horn', City='London'),
 Row(CustomerName='Berglunds snabbkop', City='Luleda'),
 Row(CustomerName='Blauer See Delikatessen', City='Mannheim'),
 Row(CustomerName='Blondel pere et fils', City='Strasbourg'),
 Row(CustomerName='Bolido Comidas preparadas', City='Madrid'),
 Row(CustomerName="Bon app'", City='Marseille'),
 Row(CustomerName='Bottom-Dollar Marketse', City='Tsawassen')]

 

 

SQL을 사용해 보신 분은 느끼시겠지만,

메모리에 로딩해놓은 임시 Hive테이블을 사용하기 때문에

SQL에 비해서 속도가 월등하게 빠른것을 느끼실 것입니다.

 

728x90
반응형