'spark'에 해당되는 글 3건

 

현재 개발/운영 중인 플랫폼에서는 Spark Streaming을 중심으로 데이터 처리를 진행하고 있다.

 

Spark Streaming은 Micro-batch 방식으로 데이터를 처리하는 RDD 기반 프레임워크인데...

 

문제는 Catalyst Optimizer와 거리가 먼 RDD 기반이라는 점이다(?)

 

이에 따라 Spark 2.0 이후 Structured Streaming이 추가었으며, 이제는 Continuous Processing을 제외학곤 [Experimental] 딱지를 지웠다.

 

Structured Streaming is a scalable and fault-tolerant stream processing engine built on the Spark SQL engine. You can express your streaming computation the same way you would express a batch computation on static data. The Spark SQL engine will take care of running it incrementally and continuously and updating the final result as streaming data continues to arrive. You can use the Dataset/DataFrame API in Scala, Java, Python or R to express streaming aggregations, event-time windows, stream-to-batch joins, etc. The computation is executed on the same optimized Spark SQL engine. Finally, the system ensures end-to-end exactly-once fault-tolerance guarantees through checkpointing and Write-Ahead Logs. In short, Structured Streaming provides fast, scalable, fault-tolerant, end-to-end exactly-once stream processing without the user having to reason about streaming.

 

물론 RDD를 굳이 Dataset으로 변환하여 Optimizer의 강점을 활용할 수도 있다.

 

Dataset<Row> peopleDF = spark.createDataFrame(peopleRDD, Person.class);

 

하지만 Structued Streaming은 애초에 RDD가 아닌 Dataset을 기반으로 운영된다.

 

따라서 Spark Streaming이 RDD를 기반으로 Micro-batch를 수행했다면, Structured Streaming은 Dataset을 기반으로 Micro-batch를 수행한다.

 

Spark Streaming은 Kafka를 소스로 다음과 같이 데이터 처리 로직을 개발한다.

 

Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", "localhost:9092,anotherhost:9092");
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", StringDeserializer.class);
kafkaParams.put("group.id", "use_a_separate_group_id_for_each_stream");
kafkaParams.put("auto.offset.reset", "latest");
kafkaParams.put("enable.auto.commit", false);

Collection<String> topics = Arrays.asList("topicA", "topicB");

JavaInputDStream<ConsumerRecord<String, String>> stream =
  KafkaUtils.createDirectStream(
    streamingContext,
    LocationStrategies.PreferConsistent(),
    ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
  );

stream.mapToPair(record -> new Tuple2<>(record.key(), record.value()));

 

Spark Streaming 라이브러리를 이용한 Kafka Consumer로 JavaInputDStream을 소스 데이터로 Transform을 수행한다.

 

반면, Structured Streaming은 Kafka를 소스로 다음과 같이 데이터 처리 로직을 개발한다.

 

Dataset<Row> df = sparkSession
    .readStream()
    .format("kafka")
    .option("kafka.bootstrap.servers", "host1:port1,host2:port2,...")
    .option("subscribe", "topic1,topic2")
    .load();
    
StreamingQuery query = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
	.writeStream()
    .format("kafka")
    .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
    .option("topic", "topic1")
    .start();
    
query.awaitTermination();

 

Structured Streaming에서는 위와 같이 readStream, writeStream을 통해 source와 output을 설정함에 따라 로직을 수행한다.

 

Kafka를 사용할 때 특징 중 하나는 Commit을 하지 않고 Offset 관리를 checkpointLocation으로 진행한다는 점이다.

 

이전에 Kafka 라이브러리의 Commit을 이용했다면 처음 접할 시 어색한 부분일 수 있다.

 

하지만 Kafka Commit을 수행하지 않고 Checkpoint를 기반으로 Offset을 관리함에 따라 실시간 스트림이 유입되는 Kafka Broker의 Workload를 가중시키지 않고 단순화 시켜 부하를 Spark, HDFS로 옮길 수 있다.

 

Structured Streaming에서는 단지 Checkpoint로 저장된 Offset을 읽어와 해당 Kafka Topic으로 maxOffsetsPerTrigger에 해당하는 메시지를 요청할 뿐이다.

 

따라서 Kafka 기반의 데이터 처리 로직을 개발할 때 Structured Streaming을 이용하면, 높은 성능과 효율적 Kafka 연동이 가능하다.

 

이를 통해 쉽게 Source에서 Output까지 스트림 처리 라인을 구축할 수 있다...만 사실 기존 RDD 기반 환경을 Dataset으로 변환한다고 크게 성능이 향상되거나 장애 처리가 쉬워지지는 않는다.

 

다만, Spark Eco System의 릴리즈 방향성을 본다면 점차 Structured Streaming을 주도적으로 개발 프로세스를 맞춰갈 필요는 있을 것 같다는 생각이다.

 

참고: Spark Streaming, Structured Streaming

블로그 이미지

나뷜나뷜

,