15 분 소요

Spark 이론 4 :

Spark 동작 특징, Lazy Evaluation

1. Spark 동작 특징

1.1. 데이터 스토리지

  • Spark는 다양한 유형의 데이터 소스와 작업 가능하다.
    • HDFS(Hadoop Distributed File System), Apache Kafka, Apache Cassandra, Amazon S3 등
  • Spark는 다양한 데이터 형식으로 저장 가능하다.
    • CSV, JSON, Parquet, Avro 등

1.2. RDD

  • RDD(Resilient Distributed Datasets)는 Spark의 기본 구성 요소이다.
  • 시스템 클러스터에서 병렬로 처리할 수 있는 데이터 요소의 분산된 컬렉션
  • RDD는 변경할 수 없음, 일단 생성되면 내용을 수정 불가

1.3. Spark Cluster

  • Apache Spark는 하나의 시스템이 마스터 노드로 작동하고 나머지는 작업자 노드로 작동하는 시스템 클러스터에서 실행되도록 설계되었다.
  • 마스터 노드는 작업을 예약하고 클러스터 전체에서 Spark 작업 실행을 조정하는 반면 작업자 노드는 데이터에 대한 실제 계산을 수행합니다.

1.4. Transformations

  • Transformation(변환)은 새로운 RDD를 생성하기 위해 RDD에 적용할 수 있는 작업이다.
    • 예시 : filter, map, reduceByKey, join 등

1.5. Actions

  • Action(작업)은 RDD에서 변환 실행을 트리거하고 결과를 드라이버 프로그램 또는 스토리지 시스템에 반환하는 작업이다.
    • 예시 : 계산, 수집, 축소, 저장 등

1.6. Lazy Evaluation

  • Lazy Evaluation은 가능한 마지막 순간까지 데이터 처리 작업의 실행을 연기하는 데 사용되는 기술이다.
  • Lazy Evaluation은 불필요한 계산을 피하기 때문에 성능을 향상시키고 리소스 사용량을 줄일 수 있다.
  • Spark는 Lazy Evaluation를 사용하여 변환 실행을 최적화한다.
  • 변환이 호출될 때 즉시 적용하는 대신 Spark는 변환을 기억하고 작업이 호출될 때만 적용한다.
  • 이를 통해 Spark는 한 번에 하나의 변환이 아닌 전체 워크플로를 기반으로 실행 계획을 최적화할 수 있다.

1.7. 인메모리 계산

  • Spark는 메모리 내 계산을 수행하도록 설계되었다.
  • 비용이 많이 드는 디스크 읽기 및 쓰기를 피하기 위해 가능한 한 많은 데이터를 메모리에 유지하려고 한다.

2. Lazy Evaluation

2.1. Spark에서 작동 방식

  • Spark에서 Lazy Evaluation는 DAG(Directed Acyclic Graph)라는 기술을 사용하여 구현된다.

  • Spark에서 Lazy Evaluation에서는 RDD의 Transformation이 즉시 실행되지 않고 Transformation을 설명하기 위해 계보가 구축된다.
  • 여기서 계보란 RDD 간의 종속성을 나타내는 DAG이다.
    • DAG : 원하는 출력을 생성하기 위해 입력 데이터에 적용해야 하는 Transformation 시퀀스를 나타내는 그래프 (방향성 비순환 그래프)
    • DAG 노드 : 필터링 또는 그룹화와 같은 변환 작업
    • DAG 엣지 : Transformation 간의 데이터 흐름
  • Spark 프로그램이 실행되면 Spark는 먼저 입력 데이터에 적용해야 하는 Transformation DAG를 빌드한다.
    • 그러나 이 단계에서는 실제로 어떤 Transformation도 실행되지 않음
    • 대신 Spark는 RDD 또는 DataFrame에서 Action이 호출될 때까지 대기
      • Action : 행 수 계산하거나 데이터를 디스크에 쓰는 것 등
  • Action이 호출되면 Spark는 DAG를 사용하여 원하는 출력을 생성하기 위해 Transformation 실행 순서를 결정한다.
    • 이후, DAG의 Transformation이 실행되고 그 결과가 드라이버 프로그램으로 반환
  • 모든 Transformation을 느리게 평가할 수 있는 것은 아니며 일부는 네트워크를 통한 데이터 셔플링과 관련된 변환과 같이 즉각적인 실행이 필요할 수 있다.

2.2. Transformation

  • Transformation이란, 기존 RDD에서 새로운 RDD를 생성하는 작업이다.

  • Transformation은 느리게 평가된다.
    • 즉시 실행을 트리거하지 않고 나중에 실행할 변환을 설명하는 계보 구축
  • Transformation은 변경할 수 없다.
    • 원래 RDD 또는 DataFrame을 수정하지 않고 새 변환을 생성
  • Transformation은 데이터 셔플링이 필요한지 여부에 따라 좁은 변환 또는 넓은 변환으로 추가 분류될 수 있다.

1) RDD 함수

  • map(func): RDD의 각 요소에 함수를 적용하고 변환된 요소와 함께 새로운 RDD를 반환
  • filter(func): 조건자 함수를 기반으로 RDD의 요소를 필터링하고 필터링된 요소가 있는 새 RDD를 반환
  • flatMap(func): RDD의 각 요소에 함수를 적용하고 평평하게 변환된 요소가 있는 새 RDD를 반환
  • groupByKey(): 키로 RDD의 키-값 쌍을 그룹화하고 그룹과 함께 새 RDD를 반환
  • reduceByKey(func): 동일한 키를 가진 RDD의 키-값 쌍 값에 이항 연산자를 적용하고 감소된 값을 가진 새 RDD를 반환
  • sortByKey(): RDD의 키-값 쌍을 키로 정렬하고 정렬된 쌍으로 새 RDD를 반환
  • join(otherRDD): RDD의 키-값 쌍을 동일한 키를 가진 다른 RDD와 결합하고 결합된 쌍으로 새 RDD를 반환
  • mapPartitions(func): RDD의 각 파티션에 함수를 적용하고 변환된 파티션이 있는 새 RDD를 반환
  • coalesce(numPartitions): RDD의 파티션 수를 지정된 수로 줄임
  • repartition(numPartitions): RDD의 파티션 수를 지정된 수로 늘리거나 줄임

2) DataFrame 함수

  • select(*cols): DataFrame에서 지정된 열을 선택하고 선택한 열이 포함된 새 DataFrame을 반환
  • filter(condition): 지정된 조건을 기반으로 DataFrame의 행을 필터링하고 필터링된 행으로 새 DataFrame을 반환
  • groupBy(*cols): DataFrame의 행을 지정된 열로 그룹화하고 그룹화된 행이 있는 새 DataFrame을 반환
  • agg(*exprs): 지정된 집계를 DataFrame의 열에 적용하고 집계된 값이 포함된 새 DataFrame을 반환
  • join(otherDF, joinExprs): 지정된 조인 식을 기반으로 DataFrame의 행을 다른 DataFrame과 조인하고 조인된 행이 있는 새 DataFrame 반환
  • withColumn(colName, col): 지정된 이름 및 값 표현식을 사용하여 DataFrame에 새 열을 추가하고 추가된 열이 있는 새 DataFrame 반환
  • orderBy(*cols, **kwargs): 지정된 열을 기준으로 DataFrame의 행을 정렬하고 정렬된 행으로 새 DataFrame 반환
  • distinct(): 원래 DataFrame의 고유한 행이 있는 새 DataFrame을 반환
  • drop(*cols): DataFrame에서 지정된 열을 삭제하고 삭제된 열 없이 새 DataFrame을 반환
  • selectExpr(*exprs): 지정된 표현식을 기반으로 DataFrame에서 열을 선택하고 선택한 열이 포함된 새 DataFrame을 반환

2.3. Action

  • Action이란, Transformation 실행을 트리거하고 드라이버 프로그램에 값을 반환하는 작업이다.

  • Action은 Spark가 지연된 계산을 평가하고 빌드된 Transformation의 DAG를 실행하는 지점이다.

  • Action은 데이터를 수정하거나 외부 저장소에 쓸 수 있다.

1) RDD 함수

  • count(): RDD의 요소 수를 반환
  • collect(): RDD의 모든 요소를 드라이버 프로그램에 목록으로 반환
  • take(n): RDD의 처음 n개 요소를 목록으로 반환
  • first(): RDD의 첫 번째 요소를 반환
  • foreach(func): RDD의 각 요소에 함수를 적용
  • reduce(func): RDD의 요소에 이진 연산자를 적용하고 결과를 반환
  • saveAsTextFile(path): RDD의 요소를 지정된 경로의 텍스트 파일에 저장
  • countByKey(): RDD에 있는 각 키의 개수와 함께 사전을 반환
  • foreachPartition(func): RDD의 각 파티션에 함수를 적용
  • top(n): RDD의 상위 n개 요소를 목록으로 반환

2) DataFrame 함수

  • count(): DataFrame의 행 수 반환
  • collect(): DataFrame의 모든 행을 행 목록으로 드라이버 프로그램에 반환
  • show(n=20, truncate=True, vertical=False): DataFrame의 처음 n 행을 콘솔에 출력
  • first(): DataFrame의 첫 번째 행 반환
  • head(n=1): DataFrame의 처음 n개 행을 행 목록으로 반환
  • foreach(func): DataFrame의 각 행에 함수를 적용
  • toPandas(): DataFrame을 Pandas DataFrame으로 변환
  • describe(*cols): DataFrame의 지정된 열에 대한 요약 통계를 새 DataFrame으로 반환
  • write.save(path, format=None, mode=None, partitionBy=None, **options): DataFrame을 지정된 경로에 지정된 형식으로 저장
  • groupBy(*cols).count(): DataFrame의 행을 지정된 열로 그룹화하고 각 그룹의 개수와 함께 새 DataFrame을 반환

2.4. toDebugStrings()

toDebugString() 는 RDD 또는 DataFrame의 계보를 트리 형식으로 시각화하는 함수로 실행될 Transformation 순서를 이해하는 데 사용할 수 있다.

  • 계보에는 데이터에 적용된 변환과 RDD 간의 종속성 표시
  • 어떤 Transformation이 지연되고 어떤 Transformation이 실행되었는지 표시

1) 예시

  • 각 라인은 계보의 RDD 또는 단계를 나타낸다.

    • 위 단계로 갈 수록 후속 단계로 RDD에서 수행되는 작업을 의미

    • 맨 위 단계는 최종 출력 RDD
    • 맨 아래 단계는 원래 RDD
df = spark.read.csv("path/to/csv/file.csv", header=True, inferSchema=True)
df_filtered = df.filter(df["age"] > 30)
df_grouped = df_filtered.groupBy("gender").count()
print(df_grouped.rdd.toDebugString())
(2) MapPartitionsRDD[6] at count at <ipython-input-4-9d8378572d7b>:8 []
 |  ShuffledRowRDD[5] at groupBy at <ipython-input-4-9d8378572d7b>:8 []
 |  MapPartitionsRDD[4] at mapPartitions at PythonRDD.scala:427 []
 |  MapPartitionsRDD[3] at mapPartitions at PythonRDD.scala:427 []
 |  FileScanRDD[2] at csv at NativeMethodAccessorImpl.java:0 []

2) ShuffledRDDMapPartitionsRDD

  • 둘 다 모두 Spark의 RDD 유형이며 Spark의 DAG에서 서로 다른 계산 단계를 나타낸다.
  • MapPartitionsRDD
    • 좁은 종속성
    • 상위 RDD의 각 파티션을 새 파티션에 매핑하는 Transformation을 적용하여 생성되는 RDD
    • 파티션 간에 데이터를 섞지 않고 주어진 함수를 각 파티션에 독립적으로 적용
    • 함수의 결과는 파티션 간에 결합되어 최종 RDD 형성
    • 결과 RDD는 상위 RDD와 동일한 수의 파티션을 가짐
      • 예를 들어 RDD의 각 요소에 함수를 적용하는 map 작업은 일반적으로 MapPartitionsRDD로 구현됨
  • ShuffledRowRDDShuffleRDD
    • 광범위한 종속성
    • groupBy 또는 join 작업과 같은 데이터 셔플이 필요한 변환에 의해 생성되는 RDD
    • 데이터는 키를 기준으로 분할된 다음 네트워크를 통해 데이터가 필요한 노드로 섞임
    • 특히 데이터가 크거나 키가 많은 경우 비용이 많이 드는 작업이 될 수 있으며 데이터가 네트워크를 통해 여러 번 전송될 수 있음
    • ShuffledRowRDD : 셔플되는 데이터가 행 형식일 때 사용
    • ShuffledRDD : 셔플되는 데이터가 행 형식이 아닐 때 사용
  • 두 RDD 유형의 주요 차이점
    • MapPartitionsRDD는 로컬에서 실행할 수 있는 변환
    • ShuffledRDD는 데이터 셔플이 필요하고 네트워크를 통한 데이터 이동이 있을 수 있는 변환

2) 최적화 및 디버깅에 유용

  • toDebugString() 의 출력을 해석하면 Spark 애플리케이션의 계산 그래프를 이해하고 최적화 및 디버깅하는 데 유용하다.
    • 계산 그래프에서 병목 현상을 식별하고 제거할 수 있는 불필요한 Transformation 파악
    • 캐싱이 성능에 미치는 영향 파악
  • 종속성이 넓은 단계가 있는 경우 코드에서 불필요한 셔플이 발생할 수 있다.
    • 좁은 종속성 : 작업 간에 데이터를 섞이지 않음 (MapPartitionsRDD)
    • 넓은 종속성 : 작업 간에 데이터가 섞이고 있음 (ShuffledRDD, ShuffledRowRDD)
  • 변환 전
    • groupByKey : 네트워크에서 모든 데이터를 섞은 다음 함께 그룹화하는 함수로 대규모 데이터 세트 처리 시 성능 문제 야기
    • 아래 코드는 축소 작업이 일어나기 전에 네트워크 전체에서 불필요하게 데이터가 섞일 수 있음
rdd = sc.parallelize([-2, -1, 0, 1, 2, 3, 4, 5], 4)
rdd_filtered = rdd.filter(lambda x: x > 0)
rdd_mapped = rdd_filtered.map(lambda x: (x, 1))
rdd_grouped = rdd_mapped.groupByKey()
rdd_reduced = rdd_grouped.map(lambda x: (x[0], sum(x[1])))
print("rdd_reduced lineage: ", rdd_reduced.toDebugString())

rdd_reduced lineage:  
(4) PythonRDD[10] at RDD at PythonRDD.scala:53 []
 |  MapPartitionsRDD[9] at mapPartitions at PythonRDD.scala:141 []
 |  ShuffledRDD[8] at partitionBy at NativeMethodAccessorImpl.java:0 []
 +-(4) MapPartitionsRDD[7] at mapPartitions at PythonRDD.scala:141 []
    |  FilterRDD[6] at filter at PythonRDD.scala:89 []
    |  ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:195 []
  • 변환 후
    • reduceByKey : 로컬에서 데이터를 먼저 줄인 다음 섞기 때문에 더 효율적
    • 아래 코드로 수정 후 groupByKey와 sum이 없어져서 계보가 단순해짐
rdd = sc.parallelize([-2, -1, 0, 1, 2, 3, 4, 5], 4)
rdd_filtered = rdd.filter(lambda x: x > 0)
rdd_mapped = rdd_filtered.map(lambda x: (x, 1))
rdd_reduced = rdd_mapped.reduceByKey(lambda x, y: x+y)
print("rdd_reduced lineage: ", rdd_reduced.toDebugString())

rdd_reduced lineage:  
PythonRDD[14] at RDD at PythonRDD.scala:53 []
 |  MapPartitionsRDD[13] at mapPartitions at PythonRDD.scala:141 []
 |  ShuffledRDD[12] at partitionBy at NativeMethodAccessorImpl.java:0 []
 +-(4) MapPartitionsRDD[11] at mapPartitions at PythonRDD.scala:141 []
    |  FilterRDD[6] at filter at PythonRDD.scala:89 []
    |  ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:195 []

2.5. Spark에서 중요성

  • Lazy Evaluation은 가능한 마지막 순간까지 Transformation 실행을 연기함으로써 입력 데이터 및 사용 가능한 리소스의 특성을 기반으로 Transformation를 최적화할 수 있다.
    • Transformation 실행 순서(시퀀스) 최적화
    • 여러 Transformation 결합 및 불필요한 계산 최소화
    • 네트워크에서 데이터 이동 최소화
  • 예를 들어 Spark는 파티셔닝 및 캐싱 기술을 사용하여 이미 처리된 데이터를 다시 계산하지 않도록 할 수 있고, 이로 인해 성능이 크게 향상되고 리소스 사용량이 감소할 수 있다.

3. 생각

  • Lazy Evaluation은 효율적이고 최적화된 데이터 처리를 가능하게 하는 Spark의 핵심 기능이다(계산 및 네트워크 오버헤드를 최소화).
  • Lazy Evaluation를 통해 계산 최적화를 하고, 인메모리 처리 방식을 통해 중복 계산을 방지하는데 이 2가지의 시너지 효과 덕분에 Spark가 효율적인 처리가 가능한 것 같다.
  • Lazy Evaluation이 계산 최적화를 해주지만, 네트워크 간 데이터 이동(Shuffle)을 최소화할 수 있도록 toDebugStrings() 등을 활용하여 코드적인 부분도 더 고민해봐야겠다.

REFERENCES