본문 바로가기
Big Data/Apache Spark

[Spark] Spark Streaming 의 종류 (DStream, Structured Streaming)

by seaweed_one 2023. 1. 7.
728x90

안녕하세요. 씨위드입니다.

오늘은 Spark Streaming의 종류에 대하여 알아보겠습니다. 

Spark Streaming은 DStream과 Structured Streaming으로 나눠집니다.

두 스트리밍에는 어떤 차이점이 있는지 알아보기 위하여 먼저 DStream에 대해 간단히 알아본 뒤 Structured Streaming에 대하여 조금 더 자세하게 알아보겠습니다.

 

DStream

DStream은 Discreatized Stream, 즉 불연속적 스트림입니다.

디스트림은 스파크의 코어인 RDD의 개념을 바탕으로 구축되었습니다.

카프카 등의 소스로부터 발생되는 데이터를 스파크에서 사용할 수 있도록 데이터의 형태를 재구성한 것이 바로 DStream입니다.

DStream의 생성, 연산, 사용

DStream의 생성과 연산 그리고 사용에 대하여 알아보겠습니다.

생성

DStream의 생성은 카프카 등의 외부의 입력 소스로부터 만들어지거나 다른 DStream에 transfromation연산을 적용하여 생성합니다.

DStream은 시간별로 도착한 데이터들의 연속적인 모음으로 이해하실 수 있는데요.

내부적으로 각각의 DStream은 각 시간별로 도착한 RDD들의 연속적인 모음입니다.

 

연산

DStream에는 RDD와 마찬가지로 transfromation과 action 연산이 존재합니다.

먼저 Transfromation에 대하여 알아볼까요?

Transfromation 연산은 새로운 DStream을 생성 가능합니다.

Transfromation은 stateless(무상태)와 stateful(상태유지)로 나뉘는데요.

각각의 상태에 대하여 알아보겠습니다.

 

stateless에는 일반적 RDD 트랜스포메이션들이 포함됩니다.

이 상태에서는 각 배치 처리가 앞의 배치에 있는 데이터와 상관없이 진행됩니다.

반면 stateful는 현재 배치의 결과를 만들기 위해 이전 배치의 데이터나 중간 결과를 이용하는데요.

슬라이딩 윈도나 시간별 상태 추적을 바탕으로 하는 트랜스포메이션 상태를 가지는 연산의 경우 CheckPoint를 설정이 필요합니다.

 

다음은 Action입니다. 

Action은 외부 시스템에 데이터를 써주는 결과 연산, 즉 Output Operation입니다.

DStream은 RDD에서 가능한 것과 동일한 종류의 많은 출력 연산을 지원합니다.

데이터를 외부 시스템에 쓴다는 점에서 RDD의 액션과 유사하지만 스파크 스트리밍에서는 매시간 단계마다 주기적으로 실행하고 출력을 묶음(배치) 단위로 생성합니다.

 

 

사용

마지막으로 DStream의 사용입니다.

스트리밍 기능의 주 시작점인 StreamingContext를 생성하여 데이터를 처리하는 데 사용하게 되는데요.

이는 별도의 스레드에서 실행되므로 사용자 애플리케이션 종료 시에도 작업을 유지하기 위해 스트리밍 연산 완료를 기다리게 하는 메서드인 awaitTermination()을 호출할 필요가 있습니다.

StreamingContext는 소스(DStream, RDD) 생성과 스트리밍 처리 시작, 종료 등을 수행하는데요.

스파크 스트리밍이 데이터를 받기 시작하려면 명시적으로 StreamingContext에 start를 호출해야 하며 스파크 스트리밍은 내부의 StreamingContext에 스파크 작업들을 스케줄링합니다.

StreamingContext는 한 번만 시작할 수 있으므로 DStream과 출력 연산에 대한 작업들을 완료한 후에 시작해야 한다는 점을 명심해야 하시길 바랍니다.

아래는 사용 예시입니다.

sc = new StreamingContext(conf, Second(1)) //StreamingContext 생성(시간 설정)
sc.start() //StreamingContext를 시작하고 “종료”를 기다림. 
sc.awaitTermination() //작업이 끝나기를 기다림.

 

Structured Streaming

이번에는 구조적 스트리밍에 대하여 알아보겠습니다.

구조적 스트리밍은 Spark 2.0 이상에서 제공하는 API로 Spark Session으로 생성됩니다.

Spark Session은 모든 스파크 관련 기능의 시작점으로 스파크 세션클래스를 임포트 하여 생성하는데요.

스파크세션 사용 시 스트리밍 데이터 소스에서 DataFrame , DataSet 생성이 가능합니다.

아래는 그 예시입니다.

import org.apache.spark.sql.SparkSession
val spark = SparkSession
 .builder
 .appName("StructuredNetworkWordCount")
 .getOrCreate()

 

구조적 스트리밍은 SparkSQL 엔진을 기반으로 개발되었습니다.

배치연산을 정의하는 것과 같은 방법으로 스트리밍 연산을 정의하며 내부적으로는 마이크로배치 프로세싱 엔진을 통해 처리됩니다.

 

구조적 스트리밍은 꾸준히 생성되는 데이터를 무한히 증가하는 하나의 커다란 데이터 셋으로 간주하는데요.

데이터 스트림을 ‘데이터가 연속적으로 추가되는 테이블’처럼 다룹니다.

새로운 데이터를 Row단위로 계속해서 쌓으며 새로운 행 추가 시 데이터 테이블을 업데이트하고 결과 행을 외부 싱크(외부 저장장치)에 기록하는 것이죠.

출처 : https://spark.apache.org/docs/3.1.2/structured-streaming-programming-guide.html#conte

싱크라는 단어가 생소하신 분들도 계실 텐데요.

데이터를 읽는 장소를 소스(대표적으로 카프카가 존재), 쓰는 장소를 싱크라고 일컫습니다.

 

또한 구조적 스트리밍은 DataFrame 기반의 스트리밍을 지원합니다.

DataFrame과 결합하여 결과를 받을 수 있고 쿼리 사용 또한 가능합니다.

사용 데이터 모델은 DataFrame과 DataSet이며 '이수연 , 정연우, 김민우 , 박상용. (2020). 아파치 스파크 기반 빅 데이터 처리 프레임워크 성능 최적화 요소 분석'이라는 논문에 따르면 다른 환경 설정은 유지한 채 데이터 구조만 Dstream에서 DataFrame으로 변경한 경우 실행시간이 160초에서 85초로 성능이 약 47% 증가하였다고 합니다.

구조적 스트리밍 사용 시 데이터 구조 최적화를 통한 성능 향상을 기대해볼 수 있을 것 같습니다.

 

구조적 스트리밍에서는 데이터 처리 시 중복 데이터 관리를 위해 Time Stamp를 도입하였습니다.

무작위로 도착한 레코드 내부에 기록된 Time Stamp를 기준으로 데이터를 처리하는데요.

데이터 도착 시간이 아닌 데이터 생성 시간을 기준으로 처리하므로 데이터가 늦게 업로드되거나 순서가 뒤섞인 채 시스템에 들어와도 처리가 가능합니다.

 

또한 엔진이 자동으로 데이터의 현재 이벤트 시간을 추적하고 그에 따라 오래된 상태를 정리하는 WarterMarking이 도입되었습니다.

WarterMark는 과거 데이터의 보관 주기 제한 시 사용하는데요.

데이터가 지연되는 임계값을 지정함으로써 쿼리의 워터마크 정의가 가능합니다.

이벤트발생 시각을 기준으로 WarterMark를 설정하여 발생 시각과 도착 시간이 많이 차이가 날 경우 오래된 데이터는 폐기함으로써 이벤트 시간에서 지연되는 데이터를*WarterMarking 기법을 통해 제어하는 것이죠.

 

결과적으로 타임스탬프와 워터마크를 이용해 데이터가 지연되는 임계값지정이 가능하고 임계값 내의 데이터는 집계되지만 그 이후의 데이터는 삭제됩니다.

출처 : https://spark.apache.org/docs/3.1.2/structured-streaming-programming-guide.html#conte

워터마킹 조건에 관한 부분은 아래의 공식 문서를 참고 부탁드립니다.

 

Structured Streaming Programming Guide - Spark 3.1.2 Documentation

Structured Streaming Programming Guide Overview Structured Streaming is a scalable and fault-tolerant stream processing engine built on the Spark SQL engine. You can express your streaming computation the same way you would express a batch computation on s

spark.apache.org

 

Streaming DataFrame & DataSet 생성, 연산, 사용

생성

구조적 스트리밍 처리를 위한 DataSet 생성 방법으로는 DataStreamReader를 사용하여야 합니다.

DataStreamReader의 경우는 SparkSession의 readStream()을 통해 생성이 가능한데요.

스파크 2.3.0 버전 기준으로 파일, 카프카, 소켓 소스 등을 지원합니다.

 

사용

스트리밍 데이터를 처리하기 위해서는 DataStreamWriter를 이용한 쿼리 작업이 필요합니다.

DataStreamWriter는 데이터 셋의 writeStream 메서드를 사용하며 저장 모드, 쿼리명, 트리거 주기, 체크포인트 설정 가능한데요. DataStreamWriter는 스트리밍 애플리케이션을 시작하기 위한 start() 메서드를 제공하며 start 메서드의 반환 값은 Streaming Query 타입입니다. 

이를 통해 실행 중인 애플리케이션의 모니터링 및 관리가 가능하고 쿼리명, id 조회, 쿼리 중지, 소스 및 싱크의 상태 정보 조회 또한 가능합니다.

awaitTermination을 통한 실행 동작 제어가 가능합니다.

 

저장

결과를 언제 output 할지는 trigger를 통해 정의가 가능합니다.

시간단위뿐만 아니라 메가바이트 단위 등의 주기에 따라서도 아웃풋이 가능한데요.

만약 정의하지 않으면 가능한 빠르게 트리거가 배치 처리를 수행합니다.

엔진은 모든 스트리밍 소스의 위치 추적을 위해 체크포인팅 기법이나 선행기입로그(write-ahead logs)를 사용하여 각 트리거에서 처리되는 데이터의 오프셋 범위를 기록합니다.

트리거 타입 지정을 통해 어떤 방식으로 실행될지를 정의하는데 지원하는 trigger , output 타입에 대해 알아보겠습니다.

 

먼저 Trigger입니다.

Trigger는 기본값, 고정간격 마이크로 배치, 일회성 마이크로 배치, 연속형 처리로 나눠지게 됩니다.

기본 값

이전 작업 종료 시 (마이크로 배치) 바로 다음 작업에 들어갑니다.

고정 간격 마이크로 배치(Fixed interval micro-batches)

고정된 인터벌을 주고 작업합니다.

일회성 마이크로배치(One-time micro-batch)

사용 가능한 모든 데이터를 처리하고 자체적으로 중지하기 위해 단 한 번의 마이크로 배치를 실행합니다.

연속형 처리(Continuous with fixed checkpoint interval (experimental))

마이크로 배치 방식은 데이터가 쌓이길 기다려 한꺼번에 효율적으로 작업이 가능하지만 지연 시간이 발생합니다.

반면 연속형 처리는 레코드가 오면 하나씩 바로 작업하는 방식으로 굉장히 빠른 응답 속도를 보여줍니다.

 

해당 기능은 spark 2.3 이상 사용 가능합니다.

 

이번에는 Output 타입에 대하여 알아보겠습니다.

DataStreamWriter의 저장모드에는 아래와 같이 세 가지가 존재합니다.

Append

마지막 트리거 이후 새로 처리된 행만 출력합니다.

새롭게 추가된 데이터만 출력하기 때문에 결과 테이블의 기존 Row를 변경하지 않은 쿼리에만 적용 가능합니다.

Update

마지막 트리거 이후 업데이트된 행만 출력합니다.

마지막 출력시점부터 다음 출력 발생하는 시점 동안 변경된 데이터만 출력하게 됩니다.

Complete

지금까지 처리된 모든 행을 출력합니다.

데이터 프레임이 가지고 있는 전체 데이터를 모두 출력하는 방법입니다.

 

728x90

'Big Data > Apache Spark' 카테고리의 다른 글

[Spark] Spark & ulimit error  (3) 2023.10.15
[Spark] Spark Streaming 실행 방식  (0) 2023.01.08
[Spark] Spark Streaming 이란?  (0) 2022.12.21
[Spark] Spark Cluster Manager  (2) 2022.12.16
[Spark] Spark Deploy Mode  (0) 2022.12.16