빅데이터/스파크

스파크 튜토리얼 - (8) 스파크 스트리밍

Johnny Yoon 2019. 8. 14. 19:57
728x90
반응형

 

Spark Streaming

스트리밍이랑 실시간으로 끊임없이 들어오는 데이터를 의미합니다. 
Spark Streaming이란 이렇게 실시간으로 들어오는 데이터를 처리하기 위한 모듈입니다. 
이러한 스트리밍 데이터는 개발자가 지정한 단위의 시간동안 들어온 데이터를, 묶음으로 Batch 처리를 하게 됩니다.
아래의 그림을 보면 이해가 빠를 것입니다.

출처: Learning Spark, O'rielly

이렇게 들어오는 데이터 소스는, Apache Kafka, Kinesis와 같은 메시지 서버일 가능성이 높습니다.

하지만 Kafka나 Kinesis를 실습하기에는 너무 길어지기에, 파이썬 웹소켓을 통해 스트리밍 데이터를 보내보도록 하겠습니다.

 

 

Spark Streaming Context 생성

Spark Streaming은 pyspark.streaming패키지에서 불러올 수 있고, 
SparkContext를 주입하여 사용 가능합니다. 
그리고 마지막에 주입되는 인자 1은 1초에 한번씩 들어오는 데이터를 처리한다는 의미입니다.

In [1]:
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext(appName="streaming")
ssc = StreamingContext(sc, 1)

 

 

Socket설정

Spark Streaming은 특정 포트를 통해 데이터를 받아들입니다. 
해당 포트를 9999로 설정해 줍니다.

In [2]:
lines = ssc.socketTextStream("localhost", 9999)
lines, type(lines)
Out[2]:
(<pyspark.streaming.dstream.DStream at 0x7f8724875ba8>,
 pyspark.streaming.dstream.DStream)

 

 

작업 설정

Streaming으로 받아온 텍스트에 수행해줄 작업을 설정합니다. 
간단하게 's' 혹은 'S'가 포함된 라인들을 모두 뽑아 출력하는 작업을 설정했습니다.

In [3]:
words = lines.filter(lambda line : 'h' in line or 'H' in line)
words.pprint()

 

 

스파크 스트리밍 서버

스파크 스트리밍 서버를 구동시켜 보겠습니다.

다음과 같이 입력하면, 스파크 스트리밍 서버가 시작됩니다.

해당 서버는 웹소켓을 사용해 통신하게 됩니다.

In [6]:
ssc.start()
ssc.awaitTermination()

 

 

스트리밍 서버로 데이터 전송

이제 Linux명령어를 사용해 스트리밍 서버로 데이터를 전송해 보겠습니다.

nc -lk localhost 9999
Hello
Spark Spark Spark
Hello Spark

 

명령어를 입력한 뒤 각각의 텍스트를 입력해줍니다.

위를 입력하고 결과를 보면, 텍스트가 잘 들어왔고,

두번째 줄인 "Spark Spark Spark"는 filter작업에 의해 잘 걸러지는 것을 볼 수 있습니다.

 
-------------------------------------------
Time: 2019-08-14 14:15:53
-------------------------------------------
Hello

-------------------------------------------
Time: 2019-08-14 14:15:54
-------------------------------------------

-------------------------------------------
Time: 2019-08-14 14:15:55
-------------------------------------------
Hello Spark

-------------------------------------------
Time: 2019-08-14 14:15:56
-------------------------------------------

 

(스트리밍 서버는 프로세스 종료를 통해 멈춰주었습니다.)

 

파이썬 Socket을 이용한 스트리밍 전송

이번에는 파이썬 코드를 이용해 WebSocket으로 스트리밍 데이터를 전송해 보겠습니다.

사용할 코드는 다음과 같습니다.

import time
import socket

URL = 'localhost'
PORT = 9999

sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.bind((URL, PORT))
sock.listen(1)
conn, addr = sock.accept()
with open('./data/dummy.txt') as infile:
  for line in infile.readlines():
    print('sending line: ', line)
    conn.send(line.encode())
    time.sleep(0.3)

conn.close()
sock.close()

 

스트리밍 서버에서 전송할 텍스트파일을 첨부하겠습니다.

해당파일은, 유명한 더미텍스트 몇가지를 모아 문장별로 나눠놓은 것입니다.

dummy.txt
6.3 kB

 

다시 스트리밍 서버를 구동시키겠습니다.

In [7]:
ssc.start()
ssc.awaitTermination()

 

 

파이썬 코드를 실행하면, 웹소켓 서버가 실행되고, 해당 서버에서 텍스트를 0.3초에 한줄 씩 보내게 됩니다.

결과를 확인하시면, 다음과 같이 나옵니다.

 
-------------------------------------------
Time: 2019-08-14 14:17:44
-------------------------------------------
Phasellus nec posuere arcu.
Quisque rhoncus ornare magna viverra maximus.

-------------------------------------------
Time: 2019-08-14 14:17:45
-------------------------------------------

-------------------------------------------
Time: 2019-08-14 14:17:46
-------------------------------------------
Suspendisse vel diam et urna cursus volutpat ut nec nibh.

-------------------------------------------
Time: 2019-08-14 14:17:47
-------------------------------------------

-------------------------------------------
Time: 2019-08-14 14:17:48
-------------------------------------------
Class aptent taciti sociosqu ad litora torquent per conubia nostra, per inceptos himenaeos.

-------------------------------------------
Time: 2019-08-14 14:17:49
-------------------------------------------
Etiam malesuada tellus quis erat elementum, quis elementum tellus pharetra.

-------------------------------------------
Time: 2019-08-14 14:17:50
-------------------------------------------
Phasellus rutrum imperdiet magna, at pretium enim.
Donec ornare vehicula purus et posuere.

-------------------------------------------
Time: 2019-08-14 14:17:51
-------------------------------------------

-------------------------------------------
Time: 2019-08-14 14:17:52
-------------------------------------------
Phasellus non rutrum ipsum.

-------------------------------------------
Time: 2019-08-14 14:17:53
-------------------------------------------
Nunc a tincidunt nibh, id ullamcorper ante.

-------------------------------------------
Time: 2019-08-14 14:17:54
-------------------------------------------
Aliquam ultricies sed nisi vehicula rutrum.

-------------------------------------------
Time: 2019-08-14 14:17:55
-------------------------------------------

-------------------------------------------
Time: 2019-08-14 14:17:56
-------------------------------------------
Vestibulum imperdiet, tortor sit amet bibendum euismod, diam nisl aliquam elit, vehicula laoreet libero enim eu tortor.

-------------------------------------------
Time: 2019-08-14 14:17:57
-------------------------------------------

 

결과를 자세히 보면 걸러진 데이터의 양이 제각각인 것을 볼 수 있습니다.

보내는 쪽에서는 0.3초에 한개씩 보내고, 스파크는 1초에 한번씩 배치처리를 하기 때문에,

양이 제각각이 되는 것입니다.

 

여기까지 스파크 스트리밍에 대해서 알아보았습니다.

 

 

 

728x90
반응형