빅데이터/스파크

스파크 튜토리얼 - (7) 스파크 SQL

Johnny Yoon 2019. 8. 13. 19:27
728x90
반응형

Spark SQL

스파크는 여러 방면에서 SQL을 사용할 수 있도록 지원합니다.

이전 포스팅들에서는 csv나 json파일을 통해 DataFrame을 만드는 방법을 잠깐 설명했습니다.

이 DataFrame 기능이 곧 Spark SQL에 포함되는 기능입니다.

 

 

Spark SQL과 DataFrame

SQLContext는 spark session을 내장하고 있습니다.

아래와 같이 입력하게 되면 sqlCtx가 내장하고 있는 SparkSession을 가지고 올 수 있습니다.

In [ ]:
sqlCtx.sparkSession

 

이 내용이 중요한 것은, SparkSession이 파일 로딩 및 JDBC나

여러 Connector를 사용한 SQL서버와의 연결이 가능하기 떄문입니다.

따라서 JDBC나 ODBC를 지원하는 모든 SQL서버는 스파크와 연결될 수 있습니다.

 

SQL 커넥터

이 포스팅의 예제를 수행하기 위해서는 SQL서버가 필요합니다.

MySQL서버를 설치한 후 예제를 수행해주시기 바랍니다.

저는 이 예제에서 MySQL 8버전을 사용하고 있습니다.

 

SQL에서 사용하고 있는 테이블은 다음 파일을 import해서 사용했습니다.

orders.csv
0.00MB

 

스파크가 MySQL과 커넥션을 하기 위한 JDBC Connector를 다운받아 주어야 합니다.

https://dev.mysql.com/downloads/connector/j/

위의 링크에서 Platform Independent를 선택해 다운받고, 필요한 디렉토리에 넣어두겠습니다.

tar명령어를 통해 압축을 풀어줍니다.

 

SparkContext

위의 connector를 내장하고있는 SparkContext를 생성하겠습니다.

.set함수를 통해 mysql connector를 지정해 줄 수 있습니다.

In [1]:
import pyspark

conf = pyspark.SparkConf()\
        .setAppName("spark-sql")\
        .set("spark.driver.extraClassPath", "./data/mysql-connector-java-8.0.17/mysql-connector-java-8.0.17.jar")
sc = pyspark.SparkContext(conf=conf)

 

이제 SQLContext를 생성하고, 그 안에 내장하고 있는 SparkSession을 가져옵니다.

In [2]:
from pyspark.sql import SQLContext
sqlCtx = SQLContext(sc)
spark = sqlCtx.sparkSession

 

SQL Connection

JDBC를 사용해 SQL을 연결해 주기 위한 준비를 하겠습니다.

MySQL서버에서 spark라는 사용자를 만든 뒤 비밀번호를 spark로 설정해주고,

sql_study라는 DB의 모든 권한을 주었습니다.

명령어는 다음과 같습니다.

In [ ]:
create user 'spark'@'%' identified by 'spark';
GRANT ALL PRIVILEGES ON spl_study.* TO 'spark'@'%' WITH GRANT OPTION;

 

MySQL Connection 설정을 위한 변수를 선언해줍니다.

In [3]:
sql_url = "localhost"
user = "spark"
password = "spark"
database = "sql_study"
table = "orders"

 

그리고 read의 format을 jdbc로 설정하고, connection을 생성해 줍니다.

자세히 보면 dbtable을 지정하고 있는것을 볼 수 있습니다.

Spark SQL Connection은 테이블을 지정해 주거나, SQL 테스트 명령어를 주어야 합니다.

In [5]:
jdbc = spark.read.format("jdbc")\
                .option("driver", "com.mysql.jdbc.Driver")\
                .option("url", "jdbc:mysql://{}:3306/{}?serverTimezone=Asia/Seoul ".format(sql_url, database))\
                .option("user", user)\
                .option("password", password)\
                .option("dbtable", table)\
                .load()
jdbc.show()
 
+-------+----------+----------+-------------------+---------+
|OrderID|CustomerID|EmployeeID|          OrderDate|ShipperID|
+-------+----------+----------+-------------------+---------+
|  10248|        90|         5|1996-07-04 00:00:00|        3|
|  10249|        81|         6|1996-07-05 00:00:00|        1|
|  10250|        34|         4|1996-07-08 00:00:00|        2|
|  10251|        84|         3|1996-07-08 00:00:00|        1|
|  10252|        76|         4|1996-07-09 00:00:00|        2|
|  10253|        34|         3|1996-07-10 00:00:00|        2|
|  10254|        14|         5|1996-07-11 00:00:00|        2|
|  10255|        68|         9|1996-07-12 00:00:00|        3|
|  10256|        88|         3|1996-07-15 00:00:00|        2|
|  10257|        35|         4|1996-07-16 00:00:00|        3|
|  10258|        20|         1|1996-07-17 00:00:00|        1|
|  10259|        13|         4|1996-07-18 00:00:00|        3|
|  10260|        55|         4|1996-07-19 00:00:00|        1|
|  10261|        61|         4|1996-07-19 00:00:00|        2|
|  10262|        65|         8|1996-07-22 00:00:00|        3|
|  10263|        20|         9|1996-07-23 00:00:00|        3|
|  10264|        24|         6|1996-07-24 00:00:00|        3|
|  10265|         7|         2|1996-07-25 00:00:00|        1|
|  10266|        87|         3|1996-07-26 00:00:00|        3|
|  10267|        25|         4|1996-07-29 00:00:00|        1|
+-------+----------+----------+-------------------+---------+
only showing top 20 rows

 

가져온 결과를 보면 테이블 형식인 것을 확인할 수 있습니다.

위의 테이블은 DataFrame이므로 이전 포스팅들에서 사용한 모든 연산을 수행할 수 있습니다.

 

파일을 통해 테이블 생성

이전 포스팅에서 파일을 통해 테이블을 생성하는 예제가 있었습니다.

코드는 다음과 같습니다.

In [8]:
customers = sqlCtx.read.option("delimiter", ",")\
                        .option("header", "true")\
                        .csv("./data/customers.csv")
customers.registerTempTable("customers")
customers
Out[8]:
DataFrame[CustomerID: string, CustomerName: string, ContactName: string, Address: string, City: string, PostalCode: string, Country: string]

 

위에서 생성한 테이블에 SQL문을 직접 날릴 수 있습니다.

이는 sql함수를 통해 실행합니다.

In [10]:
sqlCtx.sql('''
SELECT CustomerName, City, Country 
FROM customers 
LIMIT 10
''').show()
 
+--------------------+-----------+-------+
|        CustomerName|       City|Country|
+--------------------+-----------+-------+
| Alfreds Futterkiste|     Berlin|Germany|
|Ana Trujillo Empa...|Mexico D.F.| Mexico|
|Antonio Moreno Ta...|Mexico D.F.| Mexico|
|     Around the Horn|     London|     UK|
|  Berglunds snabbkop|     Luleda| Sweden|
|Blauer See Delika...|   Mannheim|Germany|
|Blondel pere et fils| Strasbourg| France|
|Bolido Comidas pr...|     Madrid|  Spain|
|            Bon app'|  Marseille| France|
|Bottom-Dollar Mar...|  Tsawassen| Canada|
+--------------------+-----------+-------+

 

사용자 지정 함수

Hive QL과 같이, Spark SQL은 사용자가 함수를 생성할 수 있습니다.

아래의 코드는 사용자 지정 함수를 sql세션에 등록하는 예제입니다.

In [11]:
from pyspark.sql.types import IntegerType

sqlCtx.registerFunction("stringLength", lambda x: len(x), IntegerType())
length_df = sqlCtx.sql('''
SELECT CustomerName, stringLength(CustomerName) AS NameLength
FROM customers
LIMIT 10
''')
length_df.show()
 
+--------------------+----------+
|        CustomerName|NameLength|
+--------------------+----------+
| Alfreds Futterkiste|        19|
|Ana Trujillo Empa...|        34|
|Antonio Moreno Ta...|        23|
|     Around the Horn|        15|
|  Berglunds snabbkop|        18|
|Blauer See Delika...|        23|
|Blondel pere et fils|        20|
|Bolido Comidas pr...|        25|
|            Bon app'|         8|
|Bottom-Dollar Mar...|        22|
+--------------------+----------+

 

위에서 정의한 함수는 단순히 string의 size를 카운트하는 함수입니다.

 

SQL Join

위에서 정의한 두개의 테이블에 Join또한 수행할수 있습니다.

아래의 코드는 Join을 수행하는 예제입니다.

In [22]:
customers.join(jdbc)\
        .where(customers["CustomerID"] == jdbc["CustomerID"])\
        .select(customers['CustomerID'], 'CustomerName', 'OrderID', 'OrderDate')\
        .show()
 
+----------+--------------------+-------+-------------------+
|CustomerID|        CustomerName|OrderID|          OrderDate|
+----------+--------------------+-------+-------------------+
|        90|         Wilman Kala|  10248|1996-07-04 00:00:00|
|        81|Tradicao Hipermer...|  10249|1996-07-05 00:00:00|
|        34|       Hanari Carnes|  10250|1996-07-08 00:00:00|
|        84|Victuailles en stock|  10251|1996-07-08 00:00:00|
|        76|    Supremes delices|  10252|1996-07-09 00:00:00|
|        34|       Hanari Carnes|  10253|1996-07-10 00:00:00|
|        14|   Chop-suey Chinese|  10254|1996-07-11 00:00:00|
|        68|  Richter Supermarkt|  10255|1996-07-12 00:00:00|
|        88|Wellington Import...|  10256|1996-07-15 00:00:00|
|        35|    HILARIoN-Abastos|  10257|1996-07-16 00:00:00|
|        20|        Ernst Handel|  10258|1996-07-17 00:00:00|
|        13|Centro comercial ...|  10259|1996-07-18 00:00:00|
|        55|Old World Delicat...|  10260|1996-07-19 00:00:00|
|        61|         Que Delicia|  10261|1996-07-19 00:00:00|
|        65|Rattlesnake Canyo...|  10262|1996-07-22 00:00:00|
|        20|        Ernst Handel|  10263|1996-07-23 00:00:00|
|        24|      Folk och fa HB|  10264|1996-07-24 00:00:00|
|         7|Blondel pere et fils|  10265|1996-07-25 00:00:00|
|        87|      Wartian Herkku|  10266|1996-07-26 00:00:00|
|        25|      Frankenversand|  10267|1996-07-29 00:00:00|
+----------+--------------------+-------+-------------------+
only showing top 20 rows

 

 

이와 같이 Spark에서 SQL서버와 연결하고, SQL명령어들을 text를 통해 수행할 수 있습니다.

 

728x90
반응형