[Spark] Lazy Evaluation
Spark 이론 4 :
Spark 동작 특징, Lazy Evaluation
1. Spark 동작 특징
1.1. 데이터 스토리지
- Spark는 다양한 유형의 데이터 소스와 작업 가능하다.
- HDFS(Hadoop Distributed File System), Apache Kafka, Apache Cassandra, Amazon S3 등
- Spark는 다양한 데이터 형식으로 저장 가능하다.
- CSV, JSON, Parquet, Avro 등
1.2. RDD
- RDD(Resilient Distributed Datasets)는 Spark의 기본 구성 요소이다.
- 시스템 클러스터에서 병렬로 처리할 수 있는 데이터 요소의 분산된 컬렉션
- RDD는 변경할 수 없음, 일단 생성되면 내용을 수정 불가
1.3. Spark Cluster
- Apache Spark는 하나의 시스템이 마스터 노드로 작동하고 나머지는 작업자 노드로 작동하는 시스템 클러스터에서 실행되도록 설계되었다.
- 마스터 노드는 작업을 예약하고 클러스터 전체에서 Spark 작업 실행을 조정하는 반면 작업자 노드는 데이터에 대한 실제 계산을 수행합니다.
1.4. Transformations
- Transformation(변환)은 새로운 RDD를 생성하기 위해 RDD에 적용할 수 있는 작업이다.
- 예시 : filter, map, reduceByKey, join 등
1.5. Actions
- Action(작업)은 RDD에서 변환 실행을 트리거하고 결과를 드라이버 프로그램 또는 스토리지 시스템에 반환하는 작업이다.
- 예시 : 계산, 수집, 축소, 저장 등
1.6. Lazy Evaluation
- Lazy Evaluation은 가능한 마지막 순간까지 데이터 처리 작업의 실행을 연기하는 데 사용되는 기술이다.
- Lazy Evaluation은 불필요한 계산을 피하기 때문에 성능을 향상시키고 리소스 사용량을 줄일 수 있다.
- Spark는 Lazy Evaluation를 사용하여 변환 실행을 최적화한다.
- 변환이 호출될 때 즉시 적용하는 대신 Spark는 변환을 기억하고 작업이 호출될 때만 적용한다.
- 이를 통해 Spark는 한 번에 하나의 변환이 아닌 전체 워크플로를 기반으로 실행 계획을 최적화할 수 있다.
1.7. 인메모리 계산
- Spark는 메모리 내 계산을 수행하도록 설계되었다.
- 비용이 많이 드는 디스크 읽기 및 쓰기를 피하기 위해 가능한 한 많은 데이터를 메모리에 유지하려고 한다.
2. Lazy Evaluation
2.1. Spark에서 작동 방식
-
Spark에서 Lazy Evaluation는 DAG(Directed Acyclic Graph)라는 기술을 사용하여 구현된다.
- Spark에서 Lazy Evaluation에서는 RDD의 Transformation이 즉시 실행되지 않고 Transformation을 설명하기 위해 계보가 구축된다.
- 여기서 계보란 RDD 간의 종속성을 나타내는 DAG이다.
- DAG : 원하는 출력을 생성하기 위해 입력 데이터에 적용해야 하는 Transformation 시퀀스를 나타내는 그래프 (방향성 비순환 그래프)
- DAG 노드 : 필터링 또는 그룹화와 같은 변환 작업
- DAG 엣지 : Transformation 간의 데이터 흐름
- Spark 프로그램이 실행되면 Spark는 먼저 입력 데이터에 적용해야 하는 Transformation DAG를 빌드한다.
- 그러나 이 단계에서는 실제로 어떤 Transformation도 실행되지 않음
- 대신 Spark는 RDD 또는 DataFrame에서 Action이 호출될 때까지 대기
- Action : 행 수 계산하거나 데이터를 디스크에 쓰는 것 등
- Action이 호출되면 Spark는 DAG를 사용하여 원하는 출력을 생성하기 위해 Transformation 실행 순서를 결정한다.
- 이후, DAG의 Transformation이 실행되고 그 결과가 드라이버 프로그램으로 반환
- 모든 Transformation을 느리게 평가할 수 있는 것은 아니며 일부는 네트워크를 통한 데이터 셔플링과 관련된 변환과 같이 즉각적인 실행이 필요할 수 있다.
2.2. Transformation
-
Transformation이란, 기존 RDD에서 새로운 RDD를 생성하는 작업이다.
- Transformation은 느리게 평가된다.
- 즉시 실행을 트리거하지 않고 나중에 실행할 변환을 설명하는 계보 구축
- Transformation은 변경할 수 없다.
- 원래 RDD 또는 DataFrame을 수정하지 않고 새 변환을 생성
- Transformation은 데이터 셔플링이 필요한지 여부에 따라 좁은 변환 또는 넓은 변환으로 추가 분류될 수 있다.
1) RDD 함수
map(func)
: RDD의 각 요소에 함수를 적용하고 변환된 요소와 함께 새로운 RDD를 반환filter(func)
: 조건자 함수를 기반으로 RDD의 요소를 필터링하고 필터링된 요소가 있는 새 RDD를 반환flatMap(func)
: RDD의 각 요소에 함수를 적용하고 평평하게 변환된 요소가 있는 새 RDD를 반환groupByKey()
: 키로 RDD의 키-값 쌍을 그룹화하고 그룹과 함께 새 RDD를 반환reduceByKey(func)
: 동일한 키를 가진 RDD의 키-값 쌍 값에 이항 연산자를 적용하고 감소된 값을 가진 새 RDD를 반환sortByKey()
: RDD의 키-값 쌍을 키로 정렬하고 정렬된 쌍으로 새 RDD를 반환join(otherRDD)
: RDD의 키-값 쌍을 동일한 키를 가진 다른 RDD와 결합하고 결합된 쌍으로 새 RDD를 반환mapPartitions(func)
: RDD의 각 파티션에 함수를 적용하고 변환된 파티션이 있는 새 RDD를 반환coalesce(numPartitions)
: RDD의 파티션 수를 지정된 수로 줄임repartition(numPartitions)
: RDD의 파티션 수를 지정된 수로 늘리거나 줄임
2) DataFrame 함수
select(*cols)
: DataFrame에서 지정된 열을 선택하고 선택한 열이 포함된 새 DataFrame을 반환filter(condition)
: 지정된 조건을 기반으로 DataFrame의 행을 필터링하고 필터링된 행으로 새 DataFrame을 반환groupBy(*cols)
: DataFrame의 행을 지정된 열로 그룹화하고 그룹화된 행이 있는 새 DataFrame을 반환agg(*exprs)
: 지정된 집계를 DataFrame의 열에 적용하고 집계된 값이 포함된 새 DataFrame을 반환join(otherDF, joinExprs)
: 지정된 조인 식을 기반으로 DataFrame의 행을 다른 DataFrame과 조인하고 조인된 행이 있는 새 DataFrame 반환withColumn(colName, col)
: 지정된 이름 및 값 표현식을 사용하여 DataFrame에 새 열을 추가하고 추가된 열이 있는 새 DataFrame 반환orderBy(*cols, **kwargs)
: 지정된 열을 기준으로 DataFrame의 행을 정렬하고 정렬된 행으로 새 DataFrame 반환distinct()
: 원래 DataFrame의 고유한 행이 있는 새 DataFrame을 반환drop(*cols)
: DataFrame에서 지정된 열을 삭제하고 삭제된 열 없이 새 DataFrame을 반환selectExpr(*exprs)
: 지정된 표현식을 기반으로 DataFrame에서 열을 선택하고 선택한 열이 포함된 새 DataFrame을 반환
2.3. Action
-
Action이란, Transformation 실행을 트리거하고 드라이버 프로그램에 값을 반환하는 작업이다.
-
Action은 Spark가 지연된 계산을 평가하고 빌드된 Transformation의 DAG를 실행하는 지점이다.
-
Action은 데이터를 수정하거나 외부 저장소에 쓸 수 있다.
1) RDD 함수
count()
: RDD의 요소 수를 반환collect()
: RDD의 모든 요소를 드라이버 프로그램에 목록으로 반환take(n)
: RDD의 처음 n개 요소를 목록으로 반환first()
: RDD의 첫 번째 요소를 반환foreach(func)
: RDD의 각 요소에 함수를 적용reduce(func)
: RDD의 요소에 이진 연산자를 적용하고 결과를 반환saveAsTextFile(path)
: RDD의 요소를 지정된 경로의 텍스트 파일에 저장countByKey()
: RDD에 있는 각 키의 개수와 함께 사전을 반환foreachPartition(func)
: RDD의 각 파티션에 함수를 적용top(n)
: RDD의 상위 n개 요소를 목록으로 반환
2) DataFrame 함수
count()
: DataFrame의 행 수 반환collect()
: DataFrame의 모든 행을 행 목록으로 드라이버 프로그램에 반환show(n=20, truncate=True, vertical=False)
: DataFrame의 처음 n 행을 콘솔에 출력first()
: DataFrame의 첫 번째 행 반환head(n=1)
: DataFrame의 처음 n개 행을 행 목록으로 반환foreach(func)
: DataFrame의 각 행에 함수를 적용toPandas()
: DataFrame을 Pandas DataFrame으로 변환describe(*cols)
: DataFrame의 지정된 열에 대한 요약 통계를 새 DataFrame으로 반환write.save(path, format=None, mode=None, partitionBy=None, **options)
: DataFrame을 지정된 경로에 지정된 형식으로 저장groupBy(*cols).count()
: DataFrame의 행을 지정된 열로 그룹화하고 각 그룹의 개수와 함께 새 DataFrame을 반환
2.4. toDebugStrings()
toDebugString()
는 RDD 또는 DataFrame의 계보를 트리 형식으로 시각화하는 함수로 실행될 Transformation 순서를 이해하는 데 사용할 수 있다.
- 계보에는 데이터에 적용된 변환과 RDD 간의 종속성 표시
- 어떤 Transformation이 지연되고 어떤 Transformation이 실행되었는지 표시
1) 예시
-
각 라인은 계보의 RDD 또는 단계를 나타낸다.
-
위 단계로 갈 수록 후속 단계로 RDD에서 수행되는 작업을 의미
- 맨 위 단계는 최종 출력 RDD
- 맨 아래 단계는 원래 RDD
-
df = spark.read.csv("path/to/csv/file.csv", header=True, inferSchema=True)
df_filtered = df.filter(df["age"] > 30)
df_grouped = df_filtered.groupBy("gender").count()
print(df_grouped.rdd.toDebugString())
(2) MapPartitionsRDD[6] at count at <ipython-input-4-9d8378572d7b>:8 []
| ShuffledRowRDD[5] at groupBy at <ipython-input-4-9d8378572d7b>:8 []
| MapPartitionsRDD[4] at mapPartitions at PythonRDD.scala:427 []
| MapPartitionsRDD[3] at mapPartitions at PythonRDD.scala:427 []
| FileScanRDD[2] at csv at NativeMethodAccessorImpl.java:0 []
2) ShuffledRDD
및 MapPartitionsRDD
- 둘 다 모두 Spark의 RDD 유형이며 Spark의 DAG에서 서로 다른 계산 단계를 나타낸다.
MapPartitionsRDD
- 좁은 종속성
- 상위 RDD의 각 파티션을 새 파티션에 매핑하는 Transformation을 적용하여 생성되는 RDD
- 파티션 간에 데이터를 섞지 않고 주어진 함수를 각 파티션에 독립적으로 적용
- 함수의 결과는 파티션 간에 결합되어 최종 RDD 형성
- 결과 RDD는 상위 RDD와 동일한 수의 파티션을 가짐
- 예를 들어 RDD의 각 요소에 함수를 적용하는
map
작업은 일반적으로MapPartitionsRDD
로 구현됨
- 예를 들어 RDD의 각 요소에 함수를 적용하는
ShuffledRowRDD
과ShuffleRDD
- 광범위한 종속성
- groupBy 또는 join 작업과 같은 데이터 셔플이 필요한 변환에 의해 생성되는 RDD
- 데이터는 키를 기준으로 분할된 다음 네트워크를 통해 데이터가 필요한 노드로 섞임
- 특히 데이터가 크거나 키가 많은 경우 비용이 많이 드는 작업이 될 수 있으며 데이터가 네트워크를 통해 여러 번 전송될 수 있음
ShuffledRowRDD
: 셔플되는 데이터가 행 형식일 때 사용ShuffledRDD
: 셔플되는 데이터가 행 형식이 아닐 때 사용
- 두 RDD 유형의 주요 차이점
- MapPartitionsRDD는 로컬에서 실행할 수 있는 변환
- ShuffledRDD는 데이터 셔플이 필요하고 네트워크를 통한 데이터 이동이 있을 수 있는 변환
2) 최적화 및 디버깅에 유용
toDebugString()
의 출력을 해석하면 Spark 애플리케이션의 계산 그래프를 이해하고 최적화 및 디버깅하는 데 유용하다.- 계산 그래프에서 병목 현상을 식별하고 제거할 수 있는 불필요한 Transformation 파악
- 캐싱이 성능에 미치는 영향 파악
- 종속성이 넓은 단계가 있는 경우 코드에서 불필요한 셔플이 발생할 수 있다.
- 좁은 종속성 : 작업 간에 데이터를 섞이지 않음 (MapPartitionsRDD)
- 넓은 종속성 : 작업 간에 데이터가 섞이고 있음 (ShuffledRDD, ShuffledRowRDD)
- 변환 전
- groupByKey : 네트워크에서 모든 데이터를 섞은 다음 함께 그룹화하는 함수로 대규모 데이터 세트 처리 시 성능 문제 야기
- 아래 코드는 축소 작업이 일어나기 전에 네트워크 전체에서 불필요하게 데이터가 섞일 수 있음
rdd = sc.parallelize([-2, -1, 0, 1, 2, 3, 4, 5], 4)
rdd_filtered = rdd.filter(lambda x: x > 0)
rdd_mapped = rdd_filtered.map(lambda x: (x, 1))
rdd_grouped = rdd_mapped.groupByKey()
rdd_reduced = rdd_grouped.map(lambda x: (x[0], sum(x[1])))
print("rdd_reduced lineage: ", rdd_reduced.toDebugString())
rdd_reduced lineage:
(4) PythonRDD[10] at RDD at PythonRDD.scala:53 []
| MapPartitionsRDD[9] at mapPartitions at PythonRDD.scala:141 []
| ShuffledRDD[8] at partitionBy at NativeMethodAccessorImpl.java:0 []
+-(4) MapPartitionsRDD[7] at mapPartitions at PythonRDD.scala:141 []
| FilterRDD[6] at filter at PythonRDD.scala:89 []
| ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:195 []
- 변환 후
- reduceByKey : 로컬에서 데이터를 먼저 줄인 다음 섞기 때문에 더 효율적
- 아래 코드로 수정 후 groupByKey와 sum이 없어져서 계보가 단순해짐
rdd = sc.parallelize([-2, -1, 0, 1, 2, 3, 4, 5], 4)
rdd_filtered = rdd.filter(lambda x: x > 0)
rdd_mapped = rdd_filtered.map(lambda x: (x, 1))
rdd_reduced = rdd_mapped.reduceByKey(lambda x, y: x+y)
print("rdd_reduced lineage: ", rdd_reduced.toDebugString())
rdd_reduced lineage:
PythonRDD[14] at RDD at PythonRDD.scala:53 []
| MapPartitionsRDD[13] at mapPartitions at PythonRDD.scala:141 []
| ShuffledRDD[12] at partitionBy at NativeMethodAccessorImpl.java:0 []
+-(4) MapPartitionsRDD[11] at mapPartitions at PythonRDD.scala:141 []
| FilterRDD[6] at filter at PythonRDD.scala:89 []
| ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:195 []
2.5. Spark에서 중요성
- Lazy Evaluation은 가능한 마지막 순간까지 Transformation 실행을 연기함으로써 입력 데이터 및 사용 가능한 리소스의 특성을 기반으로 Transformation를 최적화할 수 있다.
- Transformation 실행 순서(시퀀스) 최적화
- 여러 Transformation 결합 및 불필요한 계산 최소화
- 네트워크에서 데이터 이동 최소화
- 예를 들어 Spark는 파티셔닝 및 캐싱 기술을 사용하여 이미 처리된 데이터를 다시 계산하지 않도록 할 수 있고, 이로 인해 성능이 크게 향상되고 리소스 사용량이 감소할 수 있다.
3. 생각
- Lazy Evaluation은 효율적이고 최적화된 데이터 처리를 가능하게 하는 Spark의 핵심 기능이다(계산 및 네트워크 오버헤드를 최소화).
- Lazy Evaluation를 통해 계산 최적화를 하고, 인메모리 처리 방식을 통해 중복 계산을 방지하는데 이 2가지의 시너지 효과 덕분에 Spark가 효율적인 처리가 가능한 것 같다.
- Lazy Evaluation이 계산 최적화를 해주지만, 네트워크 간 데이터 이동(Shuffle)을 최소화할 수 있도록
toDebugStrings()
등을 활용하여 코드적인 부분도 더 고민해봐야겠다.