Spark Streaming
- Stream
- 일반적으로 스트림 데이터가 생성되는 시점과 처리되는 시점 사이의 간격이 짧을 수록 좋음
- Spark Streaming
- 데이터의 가치를 유지하기 위해 배치 처리보다 짧은 간격(마이크로 배치)으로 데이터를 처리하는 스파크의 서브 모듈
- Receiver
- Kafka, Flume, Kinesis, File, CustomReceiver 등
- Window Operations
- Window Length와 Sliding Interval로 구성
- 예) 10초(Sliding Interval)마다 최근 30초(Window Length)동안 수집된 데이터 분석
- Configuration
- spark.streaming.blockInterval
- Stream을 저장하기 위해 구성하는 Block 생성 주기 (기본값 200ms, 최소값 50ms)
- spark.streaming.backpressure.initialRate
- Backpressure 메커니즘 사용 시 가장 처음 배치 처리에 대한 별도 Receiving Rate 설정
Spark Cluster Manager
- Hadoop YARN (까다롭지만 안정적)
- Executor, Core 등의 설정을 상세하게 설정해줘야 함
- DynamicAllocation 기능으로 완화할 수 있으나 Mesos와는 다른 메커니즘
- Queue의 계층적 그룹화로 자원의 관리 및 우선순위 설정이 가능
- 그룹별 자원 제한으로 동일한 환경에서 Mesos보다 HeapMemory 에러가 자주 발생
- Apache Mesos (쉽고 활용성이 높지만 Customizing 어려움)
- Coarse-Grained 모드 (기본값): 1개의 Executor 당 다수의 Task를 지향
- 애플리케이션 통계로부터 Executor 수의 동적 Resize 지원
- YARN mode 보다 자원 설정이 좀 더 추상적이기 때문에 탄력적인 관리 가능
- ex) spark.executor.cores (기본값: 모든 사용 가능한 core 수)
- 계층 구조는 없지만 Zone으로 구성하는 것은 가능 (정확한 확인 필요)
- DevOps 측면에서는 YARN보다 Mesos가 우수함
Data Sharing
- cache()
- 무조건 MEMORY_ONLY로 저장
- When to use caching
- When we re-use RDD while working in iterative machine learning applications
- While we re-use RDD in standalone spark applications
- When RDD computations are expensive, we use caching mechanism. It helps in reducing the cost of recovery if, in case one executor fails
- persist(StorageLevel)
- StorageLevel을 지정하지 않으면 현재 설정된 레벨로 저장됨
- 애플리케이션 간 공유를 위해 DISK_ONLY 레벨로 HDFS에 저장할 수 있음
- 애플리케이션이 종료되어도 제거되지 않음
- broadcast(T)
- Broadcast를 이용하여 각 노드 혹은 스테이지 마다 필요한 데이터들을 효율적으로 공유함
- Spark 내부에서도 각 스테이지 마다 필요한 공통 데이터들을 Broadcast를 이용하여 배포함
- RDD는 기본적으로 애플리케이션에 종속되어 애플리케이션 간 공유 불가
- Persist를 이용한 HDFS에 저장하는 방법
- Persist 함수 사용 시 DISK_ONLY를 이용하여 HDFS에 데이터를 저장
- 다른 애플리케이션에서 HDFS에 저장된 데이터를 로드하도록 동기화
- In-memory Data Grid에 저장하는 방법
- 클러스터 내 메모리 자원을 동기화하여 애플리케이션 간 데이터 공유 가능
- 예) Ignite, Hazelcast, Alluxio 등
DynamicAllocation
- 자원 사용량에 따라 Executor가 추가되거나 삭제되는 기능
- spark.dynamicAllocation.enabled 옵션을 true로 설정하여 활성화
- spark.shuffle.service.enabled 옵션을 true로 설정하여 External Shuffle Service를 활성화
- Shuffle 서비스가 Executor에 종속적이기 때문에 Scaling에 따라 Executor가 강제 종료되는 경우 Shuffle 파일(RDD)들이 같이 삭제되는 것을 막기 위한 설정
- 옵션이 활성화되는 경우 Shuffle 파일(RDD)들은 캐시되어 삭제되지 않음
- Scale-out 옵션
- spark.dynamicAllocation.schedulerBacklogTimeout (기본값 1s)
- Backlog가 Timeout 동안 존재하는 경우 Executor를 추가 배치
- Scale-in 옵션
- spark.dynamicAllocation.executorIdleTimeout (기본값 60s)
- 해당 시간 동안 자원이 사용되지 않는 Executor는 제거
BackPressure
- Spark Streaming에서 버퍼링 현상을 막기 위해 소스 데이터를 조절
- 경우에 따라 BackPressure 대신 DynamicAllocation을 사용할 수 있음
- spark.streaming.backpressure.enabled 옵션을 true로 설정하여 활성화
- 배치 처리 시간이 데이터가 수집되는 배치 간격보다 짧도록 유지해야 함
- StreamingListeners에서 스트림 데이터 수집을 감시
- 이를 위해 RateController에서 스트림 데이터에 대한
processingEndTime, processingDelay, schedulingDelay, numRecords를 보고 - RateController로부터 수집된 정보를 이용하여 RateEstimator가 BackPressure를 위한 Rate 계산을 수행하며, Spark는 오직 PIDRateEstimator만을 이용함
- PIDRateEstimator는 PID 제어 기법을 이용하며,
DirectKafkaInputDStream, ReceiverInputDStreams에 의해 사용됨 - 위 과정으로 측정되는 Rate에 따라 스트림 처리 시간 지연에 따른 버퍼링 문제를 Sender쪽으로 보내 Spark의 부하를 줄임
- Spark Streaming에서 버퍼링 현상을 막기 위해 소스 데이터를 조절
Spark Configurations
- spark.driver.maxResultSize
- collect() 등으로 생성된 결과 값의 최대 크기(1G)
- spark.task.maxFailures
- 최대 Task 실행 횟수(기본값 4 = 첫시도 1 + 재시도 3)
- spark.memory.fraction
- 전체 힙 영역에서 Executor와 RDD 데이터 저장에 사용될 크기의 비율(0.6)
- Executor에서 객체직렬화/역직렬화, GC 방지 등을 위한 조율 필요
- 나머지 40%는 사용자 데이터 구조, 내부 메타데이터 등을 위해 사용됨
- spark.memory.storageFraction
- 할당된 메모리에서 데이터 저장에 사용할 비율(0.5)
- 이 값을 크게 할 경우 Executor에서 사용할 메모리가 줄어듬
- Executor에 의해 제거되지 않는(보존되는) 캐시된 블록들이 저장됨
- spark.default.parallelism (기본값: 모든 Executor를 위한 전체 Core 수)
- Transformation(join, reduceByKey, parallelize 등)을 위한 파티션(기본값 최대 128MB)의 수
- 기본적으로 HDFS 블록 당 1 파티션을 생성함
- 사용자가 지정하지 않는 경우 처리할 파일의 크기와 옵션에 따라 Map 태스크와 Reduce 파티션 수가 자동으로 결정됨
- 병렬 처리 수준을 늘려도 스파크는 Executor를 재사용하기 때문에 오버헤드(200ms)가 크지 않음
- 일반적으로 CPU 코어 당 2~3 Task 정도를 권장
- spark.locality.wait
- Task를 할당할 때 Locality를 보장할 수 있도록 단계(Process, Node, Rack)별 대기하는 시간
- 기본값 3초이며, 기본값으로도 일반적으로 잘 작동됨
- spark.streaming.kafka.maxRatePerPartition
- Kafka Direct API를 사용할 때 최대 처리율을 설정하지 않으면 Spark Streaming을 중단했다가 다시 시작할 때, 그 동안 처리되지 않고 쌓인 데이터를 한꺼번에 처리함
Spark Configurations (YARN)
- spark.yarn.am.(cores/memory) - Client Mode
- AM 코어/메모리 수 (Client Mode 에서는 Spark Driver가 YARN cluster에서 수행되지 않음)
- spark.driver.cores/memory - Cluster Mode
- spark.yarn.max.executor.failures
- 애플리케이션 Fail을 결정짓는 Executor Fail 수 (기본값: numExecutors * 2, minimum 3)
- spark.executor.instances (컨테이너 수)
- Static allocation에서 Executor의 수(기본값 2)
- spark.dynamicAllocation.enabled에서는 이 값을 초기값으로 할당 후 동적으로 증가
- spark.executor.cores (컨테이너 내 스레드 수)
- Executor에 할당할 코어 수(기본값 YARN: 1, Standalone/Mesos: 사용 가능한 전체 코어 수)
- spark.yarn.(executor/driver/am).memoryOverhead
- Executor 당 할당할 off-heap 메모리 크기
- executorMemory * 0.10 (minimum 384 MB)
Spark Tuning
- 메모리 사용량 측정 방법
- 데이터 셋을 RDD로 만듦 -> 캐시에 RDD를 넣음 -> 웹페이지의 “Storage”페이지에서 확인
- SizeEstimator의 estimate를 이용하여 특정 객체의 메모리 사용량을 확인하고 조율할 수 있음
- Tuning Tips
- HashMap과 같은 Java/Scala의 Wrapper 컬렉션 클래스 대신 원시 타입을 사용하라
- ‘fastutil’라이브러리에 원시 타입으로 구현된 컬랙션 클래스를 이용할 수 있음
- 원시 타입은 메모리 사용량을 줄임과 동시에 GC 오버헤드의 감소 효과를 가져옴
- Key를 구성하는 요소로 String 대신 numeric ID 혹은 enumeration 객체를 이용하라
- RDD persistence API에서 MEMORY_ONLY_SER 옵션과 같은 직렬화 저장소를 활성화하라
- KryoSerializer를 적극적으로 추천함
- 객체 크기가 큰 경우 spark.kryoserializer.buffer(.max) 옵션을 증가시켜야 함
- Unsafe serialization is greater than 23 times faster than standard use of java.io.Serializable
- Use of RandomAccessFile can speed up standard buffered serialization by almost 4 times
- GC는 힙공간에서 YOUNG(Minor GC, Eden -> Survivor1 -> Survivor2) -> OLD -> Full GC
- 각 단계별 GC의 빈도를 모니터링하여 튜닝을 수행하며, 일반적으로 Full GC가 가장 큰 영향을 미침
- GC에 의한 성능 제약을 회피하기 위해 JVM 외의 메모리 공간에 데이터를 저장
- JVM 메모리에서보다는 성능이 떨어지지만 HDD 보다는 빠름
- 메모리 클러스터에서 안정적인 HA 기술 제공
- Ignite, Hazelcast 등
- 데이터 양이 대량일 경우 Persist하지 않고 필요할 때마다 재계산하는 것이 더 빠를 수 있음
- 빠른 응답과 데이터 유실 방지를 위해서는 Replicated Storage 옵션을 사용하는 것이 유리함
- 단, 메모리 여유가 적어지기 때문에 상황에 따라 적용
https://zerogravitylabs.ca/spark-performance-tuning-checklist/
https://readme.skplanet.com/?p=12465
'Development > Spark' 카테고리의 다른 글
간단하게 Spark ML Model 로딩하기 (Decision Tree) (0) | 2019.08.11 |
---|---|
Spark Streaming vs Structured Streaming 비교 체험(?) (0) | 2019.05.12 |
Spark 공부 - 구조, RDD, Storage Level (0) | 2019.01.24 |