'Development/Spark'에 해당되는 글 4건

 

Spark Streaming

 

  • Stream
    • 일반적으로 스트림 데이터가 생성되는 시점과 처리되는 시점 사이의 간격이 짧을 수록 좋음
  • Spark Streaming
    • 데이터의 가치를 유지하기 위해 배치 처리보다 짧은 간격(마이크로 배치)으로 데이터를 처리하는 스파크의 서브 모듈

 

 

  • Receiver
    • Kafka, Flume, Kinesis, File, CustomReceiver 등
  • Window Operations
    • Window Length와 Sliding Interval로 구성
    • 예) 10초(Sliding Interval)마다 최근 30초(Window Length)동안 수집된 데이터 분석
  • Configuration
    • spark.streaming.blockInterval
      • Stream을 저장하기 위해 구성하는 Block 생성 주기 (기본값 200ms, 최소값 50ms)
    • spark.streaming.backpressure.initialRate
      • Backpressure 메커니즘 사용 시 가장 처음 배치 처리에 대한 별도 Receiving Rate 설정

 

 

 

Spark Cluster Manager

 

  • Hadoop YARN (까다롭지만 안정적)
    • Executor, Core 등의 설정을 상세하게 설정해줘야 함
      • DynamicAllocation 기능으로 완화할 수 있으나 Mesos와는 다른 메커니즘
    • Queue의 계층적 그룹화로 자원의 관리 및 우선순위 설정이 가능
      • 그룹별 자원 제한으로 동일한 환경에서 Mesos보다 HeapMemory 에러가 자주 발생
  • Apache Mesos (쉽고 활용성이 높지만 Customizing 어려움)
    • Coarse-Grained 모드 (기본값): 1개의 Executor 당 다수의 Task를 지향
      • 애플리케이션 통계로부터 Executor 수의 동적 Resize 지원
    • YARN mode 보다 자원 설정이 좀 더 추상적이기 때문에 탄력적인 관리 가능
      • ex) spark.executor.cores (기본값: 모든 사용 가능한 core 수)
    • 계층 구조는 없지만 Zone으로 구성하는 것은 가능 (정확한 확인 필요)
    • DevOps 측면에서는 YARN보다 Mesos가 우수함

 

Data Sharing

 

  • cache()
    • 무조건 MEMORY_ONLY로 저장
    • When to use caching
      • When we re-use RDD  while working in iterative machine learning applications
      • While we re-use RDD  in standalone spark applications
      • When RDD computations are expensive, we use caching mechanism. It helps in reducing the cost of recovery if, in case one executor fails
  • persist(StorageLevel)
    • StorageLevel을 지정하지 않으면 현재 설정된 레벨로 저장됨
    • 애플리케이션 간 공유를 위해 DISK_ONLY 레벨로 HDFS에 저장할 수 있음
    • 애플리케이션이 종료되어도 제거되지 않음
  • broadcast(T)
    • Broadcast를 이용하여 각 노드 혹은 스테이지 마다 필요한 데이터들을 효율적으로 공유함
    • Spark 내부에서도 각 스테이지 마다 필요한 공통 데이터들을 Broadcast를 이용하여 배포함

 

  • RDD는 기본적으로 애플리케이션에 종속되어 애플리케이션 간 공유 불가
    • Persist를 이용한 HDFS에 저장하는 방법
      • Persist 함수 사용 시 DISK_ONLY를 이용하여 HDFS에 데이터를 저장
      • 다른 애플리케이션에서 HDFS에 저장된 데이터를 로드하도록 동기화
    • In-memory Data Grid에 저장하는 방법
      • 클러스터 내 메모리 자원을 동기화하여 애플리케이션 간 데이터 공유 가능
      • 예) Ignite, Hazelcast, Alluxio 등

 

 

 

 

 

DynamicAllocation

 

  • 자원 사용량에 따라 Executor가 추가되거나 삭제되는 기능
    • spark.dynamicAllocation.enabled 옵션을 true로 설정하여 활성화
    • spark.shuffle.service.enabled 옵션을 true로 설정하여 External Shuffle Service를 활성화
      • Shuffle 서비스가 Executor에 종속적이기 때문에 Scaling에 따라 Executor가 강제 종료되는 경우 Shuffle 파일(RDD)들이 같이 삭제되는 것을 막기 위한 설정
      • 옵션이 활성화되는 경우 Shuffle 파일(RDD)들은 캐시되어 삭제되지 않음
    • Scale-out 옵션
      • spark.dynamicAllocation.schedulerBacklogTimeout (기본값 1s)
        • Backlog가 Timeout 동안 존재하는 경우 Executor를 추가 배치
    • Scale-in 옵션
      • spark.dynamicAllocation.executorIdleTimeout (기본값 60s)
        • 해당 시간 동안 자원이 사용되지 않는 Executor는 제거

 

BackPressure

 

  • Spark Streaming에서 버퍼링 현상을 막기 위해 소스 데이터를 조절
    • 경우에 따라 BackPressure 대신 DynamicAllocation을 사용할 수 있음
    • spark.streaming.backpressure.enabled 옵션을 true로 설정하여 활성화
    • 배치 처리 시간이 데이터가 수집되는 배치 간격보다 짧도록 유지해야 함
      • StreamingListeners에서 스트림 데이터 수집을 감시
      • 이를 위해 RateController에서 스트림 데이터에 대한
        processingEndTime, processingDelay, schedulingDelay, numRecords를 보고
      • RateController로부터 수집된 정보를 이용하여 RateEstimator가 BackPressure를 위한 Rate 계산을 수행하며, Spark는 오직 PIDRateEstimator만을 이용함
        • PIDRateEstimator는 PID 제어 기법을 이용하며,
          DirectKafkaInputDStream, ReceiverInputDStreams에 의해 사용됨
    • 위 과정으로 측정되는 Rate에 따라 스트림 처리 시간 지연에 따른 버퍼링 문제를 Sender쪽으로 보내 Spark의 부하를 줄임

 

  • Spark Streaming에서 버퍼링 현상을 막기 위해 소스 데이터를 조절

 

 

 

 

Spark Configurations

 

  • spark.driver.maxResultSize
    • collect() 등으로 생성된 결과 값의 최대 크기(1G)

 

  • spark.task.maxFailures
    • 최대 Task 실행 횟수(기본값 4 = 첫시도 1 + 재시도 3)

 

  • spark.memory.fraction
    • 전체 힙 영역에서 Executor와 RDD 데이터 저장에 사용될 크기의 비율(0.6)
    • Executor에서 객체직렬화/역직렬화, GC 방지 등을 위한 조율 필요
    • 나머지 40%는 사용자 데이터 구조, 내부 메타데이터 등을 위해 사용됨

 

  • spark.memory.storageFraction
    • 할당된 메모리에서 데이터 저장에 사용할 비율(0.5)
    • 이 값을 크게 할 경우 Executor에서 사용할 메모리가 줄어듬
    • Executor에 의해 제거되지 않는(보존되는) 캐시된 블록들이 저장됨

 

  • spark.default.parallelism (기본값: 모든 Executor를 위한 전체 Core 수)
    • Transformation(join, reduceByKey, parallelize 등)을 위한 파티션(기본값 최대 128MB)의 수
      • 기본적으로 HDFS 블록 당 1 파티션을 생성함
    • 사용자가 지정하지 않는 경우 처리할 파일의 크기와 옵션에 따라 Map 태스크와 Reduce 파티션 수가 자동으로 결정됨
    • 병렬 처리 수준을 늘려도 스파크는 Executor를 재사용하기 때문에 오버헤드(200ms)가 크지 않음
    • 일반적으로 CPU 코어 당 2~3 Task 정도를 권장

 

  • spark.locality.wait
    • Task를 할당할 때 Locality를 보장할 수 있도록 단계(Process, Node, Rack)별 대기하는 시간
    • 기본값 3초이며, 기본값으로도 일반적으로 잘 작동됨
  • spark.streaming.kafka.maxRatePerPartition
    • Kafka Direct API를 사용할 때 최대 처리율을 설정하지 않으면 Spark Streaming을 중단했다가 다시 시작할 때, 그 동안 처리되지 않고 쌓인 데이터를 한꺼번에 처리함

 

Spark Configurations (YARN)

 

  • spark.yarn.am.(cores/memory) - Client Mode
    • AM 코어/메모리 수 (Client Mode 에서는 Spark Driver가 YARN cluster에서 수행되지 않음)
    • spark.driver.cores/memory - Cluster Mode
  • spark.yarn.max.executor.failures
    • 애플리케이션 Fail을 결정짓는 Executor Fail 수 (기본값: numExecutors * 2, minimum 3)

 

  • spark.executor.instances (컨테이너 수)
    • Static allocation에서 Executor의 수(기본값 2)
    • spark.dynamicAllocation.enabled에서는 이 값을 초기값으로 할당 후 동적으로 증가
  • spark.executor.cores (컨테이너 내 스레드 수)
    • Executor에 할당할 코어 수(기본값 YARN: 1, Standalone/Mesos: 사용 가능한 전체 코어 수)

 

  • spark.yarn.(executor/driver/am).memoryOverhead
    • Executor 당 할당할 off-heap 메모리 크기
    • executorMemory * 0.10 (minimum 384 MB)

 

Spark Tuning

 

  • 메모리 사용량 측정 방법
    • 데이터 셋을 RDD로 만듦 -> 캐시에 RDD를 넣음 -> 웹페이지의 “Storage”페이지에서 확인
    • SizeEstimator의 estimate를 이용하여 특정 객체의 메모리 사용량을 확인하고 조율할 수 있음
  • Tuning Tips
    • HashMap과 같은 Java/Scala의 Wrapper 컬렉션 클래스 대신 원시 타입을 사용하라
      • ‘fastutil’라이브러리에 원시 타입으로 구현된 컬랙션 클래스를 이용할 수 있음
      • 원시 타입은 메모리 사용량을 줄임과 동시에 GC 오버헤드의 감소 효과를 가져옴
    • Key를 구성하는 요소로 String 대신 numeric ID 혹은 enumeration 객체를 이용하라
    • RDD persistence API에서 MEMORY_ONLY_SER 옵션과 같은 직렬화 저장소를 활성화하라
      • KryoSerializer를 적극적으로 추천함
      • 객체 크기가 큰 경우 spark.kryoserializer.buffer(.max) 옵션을 증가시켜야 함
      • Unsafe serialization is greater than 23 times faster than standard use of java.io.Serializable
      • Use of RandomAccessFile can speed up standard buffered serialization by almost 4 times
    • GC는 힙공간에서 YOUNG(Minor GC, Eden -> Survivor1 -> Survivor2) -> OLD -> Full GC
      • 각 단계별 GC의 빈도를 모니터링하여 튜닝을 수행하며, 일반적으로 Full GC가 가장 큰 영향을 미침
    • GC에 의한 성능 제약을 회피하기 위해 JVM 외의 메모리 공간에 데이터를 저장
      • JVM 메모리에서보다는 성능이 떨어지지만 HDD 보다는 빠름
      • 메모리 클러스터에서 안정적인 HA 기술 제공
      • Ignite, Hazelcast
    • 데이터 양이 대량일 경우 Persist하지 않고 필요할 때마다 재계산하는 것이 더 빠를 수 있음
    • 빠른 응답과 데이터 유실 방지를 위해서는 Replicated Storage 옵션을 사용하는 것이 유리함
      • 단, 메모리 여유가 적어지기 때문에 상황에 따라 적용

 

https://zerogravitylabs.ca/spark-performance-tuning-checklist/

 

https://readme.skplanet.com/?p=12465

 

 

 

블로그 이미지

나뷜나뷜

,