RDD Operations - Understanding closures
23 August 2019
Spark의 가장 어려운 점 중 하나는 클러스터에서 코드를 실행할 때 변수 및 메서드의 범위와 수명주기를 이해하는 것입니다.
범위 밖의 변수를 수정하는 RDD 작업은 빈번한 혼동의 원인이 될 수 있습니다.
아래 예에서는 foreach ()를 사용하여 카운터를 늘리는 코드를 살펴 보지만 다른 작업에서도 비슷한 문제가 발생할 수 있습니다.
Example
아래의 간단한 합계 RDD를 고려해봅시다.
이 작업은 동일한 JVM 내에서 실행 중인지 여부에 따라 다르게 작동 할 수 있습니다.
일반적으로 로컬 모드에서 Spark를 실행하는 경우 (--master = local [n])
Spark 응용 프로그램을 클러스터에 배포하는 경우 (예 : spark-submit to YARN)
로 비교할 수 있습니다.
int counter = 0;
JavaRDD<Integer> rdd = sc.parallelize(data);
// Wrong: Don't do this!!
rdd.foreach(x -> counter += x);
println("Counter value: " + counter);
Local vs. cluster modes
위 코드의 동작은 정의되어 있지 않으며 의도한대로 작동하지 않을 수 있습니다.
작업을 실행하기 위해 Spark는 RDD 작업 처리를 task로 나누고 각 task는 실행 프로그램에서 실행됩니다.
실행하기 전에 Spark는 closure를 계산합니다.
closure는 실행 프로그램이 RDD에서 계산을 수행하기 위해 보여야하는 변수 및 메소드(이 경우 foreach ())입니다.
이 closure는 직렬화되어 각 실행자에게 전송됩니다.
각 실행 프로그램으로 전송된 closure 내의 변수가 이제 복사되므로 foreach 함수 내에서 참조되는 카운터는 더 이상 드라이버 노드의 카운터가 아닙니다.
드라이버 노드의 메모리에는 여전히 카운터가 있지만 더 이상 executor에게 보이지 않습니다!
실행자는 serialized된 closure의 사본만 볼 수 있습니다.
따라서 카운터의 모든 작업이 serialized된 closure 내에서 값을 참조하므로 카운터의 최종 값은 여전히 0입니다.
로컬 모드의 일부 상황에서는 foreach 함수는 실제로 드라이버와 동일한 JVM 내에서 실행되며 동일한 원래 카운터를 참조하여 실제로 업데이트 할 수 있습니다.
이러한 종류의 시나리오에서 올바르게 정의 된 동작을 보장하려면 Accumulator를 사용해야합니다.
Spark의 Accumulator는 실행이 클러스터의 작업자 노드간에 분할될 때 변수를 안전하게 업데이트하는 메커니즘을 제공하기 위해 특별히 사용됩니다.
이 가이드의 Accumulators 섹션에서 이에 대해 자세히 설명합니다.
일반적으로 loop나 지역변수로 이루어져 있는 closure는 전역 상태를 변경하는 용도로 사용되어서는 안됩니다.
Spark는 클로저 외부에서 참조된 객체에 대한 돌연변이 동작을 정의하거나 보장하지 않습니다.
이 작업을 수행하는 일부 코드는 로컬 모드에서 작동 할 수 있지만 우연히 발생하는 것이며 이러한 코드는 분산 모드에서 예상대로 작동하지 않습니다.
일부 글로벌 집계가 필요한 경우에는 Accumulator를 대신 사용하십시오.
Printing elements of an RDD
또 다른 일반적인 사례는 rdd.foreach (println) 또는 rdd.map (println)을 사용하여 RDD 요소를 인쇄하려고 하는 것입니다.
이 경우, 단일 머신에서는 예상대로 출력을 생성하고 모든 RDD 요소를 인쇄합니다.
그러나 클러스터 모드에서는 executor가 호출하는 stdout의 출력은 이제 드라이버가 아닌 executor의 stdout에 기록되므로 드라이버의 stdout은 이를 표시하지 않습니다!
드라이버의 모든 요소를 인쇄하려면 collect () 메소드를 사용하여 먼저 RDD를 드라이버 노드로 가져 오십시오. rdd.collect (). foreach (println).
그러나 collect()가 전체 RDD를 단일 시스템으로 가져 오기 때문에 드라이버의 메모리 부족이 발생할 수 있습니다.
만약 RDD의 일부 요소만 출력해야하는 경우에는 다음과 같이 take()를 사용하는 것이 안전한 방법입니다. : rdd.take(100).foreach(println)
참조 : https://spark.apache.org/docs/2.1.1/programming-guide.html#understanding-closures-a-nameclosureslinka