728x90
반응형

빅데이터 41

카프카 - (5) 컨슈머

카프카 프로듀서 카프카 브로커를 서버, 파티션을 저장소라고 가정하면, 카프카 컨슈머는 카프카로부터 데이터를 제공받아 소비하는 클라이언트에 해당한다. 메시지 소비 지난 포스팅에서 생성한 토픽 kafka-test에 메시지를 소비해보자. 메시지를 소비하는 것 역시 bin 디렉토리의 스크립트 중 하나를 활용한다. --from-beginning 옵션을 주면 처리 토픽에서 보관중인 첫 오프셋의 데이터부터 모두 받아오게 된다. bin/kafka-console-consumer.sh --bootstrap-server johnny:9092 \ --topic kafka-test \ --from-beginning value1 value2 value1 value2 위의 예제에서는 value값만 가져온다. 따라서 "print.k..

카프카 - (4) 프로듀서

카프카 프로듀서 카프카 브로커를 서버, 파티션을 저장소라고 가정하면, 카프카 프로듀서는 카프카에게 데이터를 제공하는 클라이언트에 해당한다. 메시지 전송 지난 포스팅에서 생성한 토픽 kafka-test에 메시지를 전송해보자. 메시지를 전송하는 것 역시 bin 디렉토리의 스크립트 중 하나를 활용한다. bin/kafka-console-producer.sh --bootstrap-server johnny:9092 -- topic kafka-test --property "parse.key=true" --property "key.seperator=";" > key1;value1 > key2;value2 parse.key 프로퍼티는 전송할 메시지에 키를 추가하는 옵션이다. key.seperator 프로퍼티는 전송할 메..

카프카 - (3) 토픽

카프카 토픽 생성 먼저 지난번 포스팅에서 사용한 네트워크를 통한 통신으로 토픽을 생성해본다. 로컬 기기의 kafka_2.12-2.5.0 디렉토리에서 다음 커맨드를 실행해 준다. * 지난 포스팅에서 카프카가 설치된 서버를 /etc/hosts에 johnny로 등록해준 것을 기억하자. bin/kafka-topics.sh --create --bootstrap-server johnny:9092 --topic kafka-test 토픽 생성이 성공적으로 되었다면 다음과 같은 메시지를 출력할 것이다. Created topic kafka-test. 토픽 생성 규칙 카프카 토픽은 영어 대소문자와 숫자, 그리고 대쉬(-), 언더스코더(_), 마침표(.) 로 조합할 수 있다. 한가지 특별한 규칙이 있다면, 언더스코어(_)와 ..

카프카 - (2) 설치

카프카 설치 카프카는 자바기반의 언어로 작성되었기 때문에 구동하기 위해서는 자바가 필요하다. 따라서 자바를 설치해 주어야 한다. 해당 포스팅에서는 자바 1.8이 사용되었고, 리눅스 환경에서 실행 되었다. 카프카를 설치하는 서버는 virtualbox와 같은 vm이나 aws와 같은 클라우드 가상환경을 사용하는 것을 추천한다. 자바 설치가 완료되면 카프카를 설치한다. wget https://archive.apache.org/dist/kafka/2.5.0/kafka_2.12-2.5.0.tgz 설치가 완료되면 tar 파일의 압축을 풀어준다. tar xvf kafka_2.12-2.5.0.tgz 다음은 카프카가 사용할 Heap의 사이즈를 정해주어야 한다. home 디렉토리의 .bashrc 파일에 다음 한줄을 넣어주어..

카프카 - (1) 개념

카프카란? 카프카는 파편화된 데이터 수집 및 분배 작업을 위해 링크드인에서 고안된 분산 큐(queue) 시스템이다. 상용 서비스를 개발하다 보면 서비스에서 생겨나는 여러 다른 데이터를 여러 다른 시스템으로 전송하게 된다. 이러한 데이터들을 단일화된 시스템으로 처리하지 않는다면 시스템이 커질수록 관리가 어려워진다. 카프카는 이러한 문제를 하나의 확장 가능한 시스템에서 처리하기 위한 시스템이다. 빅데이터 시스템 하둡과 같은 빅데이터 시스템은 주로 두가지의 코어한 개념을 지닌다. 바로 고가용성(High Availability)과 장애허용성(Fault Tolerance) 이다. 빅데이터 시스템을 공부할 때에는 이 두가지 개념을 항상 기억하고 학습하면 도움이 된다. 고가용성 (High Availability) 고..

스파크 튜토리얼 - (8) 스파크 스트리밍

Spark Streaming 스트리밍이랑 실시간으로 끊임없이 들어오는 데이터를 의미합니다. Spark Streaming이란 이렇게 실시간으로 들어오는 데이터를 처리하기 위한 모듈입니다. 이러한 스트리밍 데이터는 개발자가 지정한 단위의 시간동안 들어온 데이터를, 묶음으로 Batch 처리를 하게 됩니다. 아래의 그림을 보면 이해가 빠를 것입니다. 이렇게 들어오는 데이터 소스는, Apache Kafka, Kinesis와 같은 메시지 서버일 가능성이 높습니다. 하지만 Kafka나 Kinesis를 실습하기에는 너무 길어지기에, 파이썬 웹소켓을 통해 스트리밍 데이터를 보내보도록 하겠습니다. Spark Streaming Context 생성 Spark Streaming은 pyspark.streaming패키지에서 불러올..

스파크 튜토리얼 - (7) 스파크 SQL

Spark SQL 스파크는 여러 방면에서 SQL을 사용할 수 있도록 지원합니다. 이전 포스팅들에서는 csv나 json파일을 통해 DataFrame을 만드는 방법을 잠깐 설명했습니다. 이 DataFrame 기능이 곧 Spark SQL에 포함되는 기능입니다. Spark SQL과 DataFrame SQLContext는 spark session을 내장하고 있습니다. 아래와 같이 입력하게 되면 sqlCtx가 내장하고 있는 SparkSession을 가지고 올 수 있습니다. In [ ]: sqlCtx.sparkSession 이 내용이 중요한 것은, SparkSession이 파일 로딩 및 JDBC나 여러 Connector를 사용한 SQL서버와의 연결이 가능하기 떄문입니다. 따라서 JDBC나 ODBC를 지원하는 모든 S..

스파크 튜토리얼 - (6) 데이터프레임

DataFrame DataFrame은 테이블처럼 구조화된 데이터로, 스키마를 표현할 수 있는 RDD의 확장 구조체입니다. python의 pandas나 SQL을 써본사람이라면, 이해가 빠를것입니다. Row DataFrame의 행을 Row라고 합니다.. Row들이 곧 하나의 레코드가 되고, Row들이 RDD를 내장함으로서, 효과적으로 데이터를 접근할 수 있게 해줍니다. 데이터 지난번 포스팅에서 사용했던 json데이터를 사용하겠습니다. 다운로드 링크는 아래에 있습니다. DataFrame 생성 데이터프레임은 크게 두가지 방법으로 생성할 수 있습니다. 스파크 세션을 통해 직접 생성 SQL컨텍스트의 테이블을 통해 생성 두가지 방법 모두 스파크 내부에서의 동작은 크게 다르지 않습니다. 각각의 방법을 살펴보도록 하겠습..

스파크 튜토리얼 - (5) 파일 로딩

파이썬 파일 로딩 Python은 여러 구조화된 파일을 로딩하는데 좋은 라이브러리들을 제공합니다. 이번 포스팅에서는 Python에서 json과 csv파일을 로딩하고, 그것들을 RDD에 로딩하는 방법을 알아보겠습니다. 예제에 사용할 파일 에제에서는 두개의 파일을 사용할 것입니다. json파일과 csv파일을 다운받아 사용하시기 바랍니다. json json파일은 구조화된 파일로, 주로 웹에서 데이터를 주고받을 때 쓰입니다. 파이썬에서 이 파일 포맷을 로딩할 때는 json패키지를 사용합니다. 다음은 rdd에서 텍스트 파일 형식으로 읽어온 뒤, 각 line의 json을 로딩하는 예제입니다. In [5]: import json inputJson = sc.textFile("./data/cars.json")\ .map(..

스파크 튜토리얼 - (4) 페어 RDD

페어 RDD 페어 RDD란 key-value쌍으로 이루어진 RDD를 말합니다. 파이썬 에서는 Tuple로 이뤄진 RDD가 곧 페어 RDD가 됩니다. 페어 RDD 생성 먼저 간단하게 parallelize메소드를 사용해 int key-value페어로 이뤄진 페어RDD를 생성하겠습니다. In [46]: examplePairRDD = sc.parallelize([(1, 3), (1, 5), (2, 4), (3, 3), (4, 8), (4, 2), (3, 1)]) examplePairRDD Out[46]: ParallelCollectionRDD[77] at parallelize at PythonRDD.scala:194 페어 RDD 트랜스포메이션 페어 RDD는 기본 RDD에서 사용 가능한 메소드들은 모두 사용할 수..

728x90
반응형