스파크 튜토리얼 - (7) 스파크 SQL
Spark SQL
스파크는 여러 방면에서 SQL을 사용할 수 있도록 지원합니다.
이전 포스팅들에서는 csv나 json파일을 통해 DataFrame을 만드는 방법을 잠깐 설명했습니다.
이 DataFrame 기능이 곧 Spark SQL에 포함되는 기능입니다.
Spark SQL과 DataFrame
SQLContext는 spark session을 내장하고 있습니다.
아래와 같이 입력하게 되면 sqlCtx가 내장하고 있는 SparkSession을 가지고 올 수 있습니다.
sqlCtx.sparkSession
이 내용이 중요한 것은, SparkSession이 파일 로딩 및 JDBC나
여러 Connector를 사용한 SQL서버와의 연결이 가능하기 떄문입니다.
따라서 JDBC나 ODBC를 지원하는 모든 SQL서버는 스파크와 연결될 수 있습니다.
SQL 커넥터
이 포스팅의 예제를 수행하기 위해서는 SQL서버가 필요합니다.
MySQL서버를 설치한 후 예제를 수행해주시기 바랍니다.
저는 이 예제에서 MySQL 8버전을 사용하고 있습니다.
SQL에서 사용하고 있는 테이블은 다음 파일을 import해서 사용했습니다.
스파크가 MySQL과 커넥션을 하기 위한 JDBC Connector를 다운받아 주어야 합니다.
https://dev.mysql.com/downloads/connector/j/
위의 링크에서 Platform Independent를 선택해 다운받고, 필요한 디렉토리에 넣어두겠습니다.
tar명령어를 통해 압축을 풀어줍니다.
SparkContext
위의 connector를 내장하고있는 SparkContext를 생성하겠습니다.
.set함수를 통해 mysql connector를 지정해 줄 수 있습니다.
import pyspark
conf = pyspark.SparkConf()\
.setAppName("spark-sql")\
.set("spark.driver.extraClassPath", "./data/mysql-connector-java-8.0.17/mysql-connector-java-8.0.17.jar")
sc = pyspark.SparkContext(conf=conf)
이제 SQLContext를 생성하고, 그 안에 내장하고 있는 SparkSession을 가져옵니다.
from pyspark.sql import SQLContext
sqlCtx = SQLContext(sc)
spark = sqlCtx.sparkSession
SQL Connection
JDBC를 사용해 SQL을 연결해 주기 위한 준비를 하겠습니다.
MySQL서버에서 spark라는 사용자를 만든 뒤 비밀번호를 spark로 설정해주고,
sql_study라는 DB의 모든 권한을 주었습니다.
명령어는 다음과 같습니다.
create user 'spark'@'%' identified by 'spark';
GRANT ALL PRIVILEGES ON spl_study.* TO 'spark'@'%' WITH GRANT OPTION;
MySQL Connection 설정을 위한 변수를 선언해줍니다.
sql_url = "localhost"
user = "spark"
password = "spark"
database = "sql_study"
table = "orders"
그리고 read의 format을 jdbc로 설정하고, connection을 생성해 줍니다.
자세히 보면 dbtable을 지정하고 있는것을 볼 수 있습니다.
Spark SQL Connection은 테이블을 지정해 주거나, SQL 테스트 명령어를 주어야 합니다.
jdbc = spark.read.format("jdbc")\
.option("driver", "com.mysql.jdbc.Driver")\
.option("url", "jdbc:mysql://{}:3306/{}?serverTimezone=Asia/Seoul ".format(sql_url, database))\
.option("user", user)\
.option("password", password)\
.option("dbtable", table)\
.load()
jdbc.show()
가져온 결과를 보면 테이블 형식인 것을 확인할 수 있습니다.
위의 테이블은 DataFrame이므로 이전 포스팅들에서 사용한 모든 연산을 수행할 수 있습니다.
파일을 통해 테이블 생성
이전 포스팅에서 파일을 통해 테이블을 생성하는 예제가 있었습니다.
코드는 다음과 같습니다.
customers = sqlCtx.read.option("delimiter", ",")\
.option("header", "true")\
.csv("./data/customers.csv")
customers.registerTempTable("customers")
customers
위에서 생성한 테이블에 SQL문을 직접 날릴 수 있습니다.
이는 sql함수를 통해 실행합니다.
sqlCtx.sql('''
SELECT CustomerName, City, Country
FROM customers
LIMIT 10
''').show()
사용자 지정 함수
Hive QL과 같이, Spark SQL은 사용자가 함수를 생성할 수 있습니다.
아래의 코드는 사용자 지정 함수를 sql세션에 등록하는 예제입니다.
from pyspark.sql.types import IntegerType
sqlCtx.registerFunction("stringLength", lambda x: len(x), IntegerType())
length_df = sqlCtx.sql('''
SELECT CustomerName, stringLength(CustomerName) AS NameLength
FROM customers
LIMIT 10
''')
length_df.show()
위에서 정의한 함수는 단순히 string의 size를 카운트하는 함수입니다.
SQL Join
위에서 정의한 두개의 테이블에 Join또한 수행할수 있습니다.
아래의 코드는 Join을 수행하는 예제입니다.
customers.join(jdbc)\
.where(customers["CustomerID"] == jdbc["CustomerID"])\
.select(customers['CustomerID'], 'CustomerName', 'OrderID', 'OrderDate')\
.show()
이와 같이 Spark에서 SQL서버와 연결하고, SQL명령어들을 text를 통해 수행할 수 있습니다.