RDD Operations - Understanding closures

23 August 2019

Spark의 가장 어려운 점 중 하나는 클러스터에서 코드를 실행할 때 변수 및 메서드의 범위와 수명주기를 이해하는 것입니다.
범위 밖의 변수를 수정하는 RDD 작업은 빈번한 혼동의 원인이 될 수 있습니다.
아래 예에서는 foreach ()를 사용하여 카운터를 늘리는 코드를 살펴 보지만 다른 작업에서도 비슷한 문제가 발생할 수 있습니다.

Example


아래의 간단한 합계 RDD를 고려해봅시다.
이 작업은 동일한 JVM 내에서 실행 중인지 여부에 따라 다르게 작동 할 수 있습니다.
일반적으로 로컬 모드에서 Spark를 실행하는 경우 (--master = local [n])
Spark 응용 프로그램을 클러스터에 배포하는 경우 (예 : spark-submit to YARN)
로 비교할 수 있습니다.

int counter = 0;
JavaRDD<Integer> rdd = sc.parallelize(data);

// Wrong: Don't do this!!
rdd.foreach(x -> counter += x);

println("Counter value: " + counter);

Local vs. cluster modes


위 코드의 동작은 정의되어 있지 않으며 의도한대로 작동하지 않을 수 있습니다.
작업을 실행하기 위해 Spark는 RDD 작업 처리를 task로 나누고 각 task는 실행 프로그램에서 실행됩니다.
실행하기 전에 Spark는 closure를 계산합니다.
closure는 실행 프로그램이 RDD에서 계산을 수행하기 위해 보여야하는 변수 및 메소드(이 경우 foreach ())입니다.
이 closure는 직렬화되어 각 실행자에게 전송됩니다.

각 실행 프로그램으로 전송된 closure 내의 변수가 이제 복사되므로 foreach 함수 내에서 참조되는 카운터는 더 이상 드라이버 노드의 카운터가 아닙니다.
드라이버 노드의 메모리에는 여전히 카운터가 있지만 더 이상 executor에게 보이지 않습니다!
실행자는 serialized된 closure의 사본만 볼 수 있습니다.
따라서 카운터의 모든 작업이 serialized된 closure 내에서 값을 참조하므로 카운터의 최종 값은 여전히 ​​0입니다.

로컬 모드의 일부 상황에서는 foreach 함수는 실제로 드라이버와 동일한 JVM 내에서 실행되며 동일한 원래 카운터를 참조하여 실제로 업데이트 할 수 있습니다.

이러한 종류의 시나리오에서 올바르게 정의 된 동작을 보장하려면 Accumulator를 사용해야합니다.
Spark의 Accumulator는 실행이 클러스터의 작업자 노드간에 분할될 때 변수를 안전하게 업데이트하는 메커니즘을 제공하기 위해 특별히 사용됩니다.
이 가이드의 Accumulators 섹션에서 이에 대해 자세히 설명합니다.

일반적으로 loop나 지역변수로 이루어져 있는 closure는 전역 상태를 변경하는 용도로 사용되어서는 안됩니다.
Spark는 클로저 외부에서 참조된 객체에 대한 돌연변이 동작을 정의하거나 보장하지 않습니다.
이 작업을 수행하는 일부 코드는 로컬 모드에서 작동 할 수 있지만 우연히 발생하는 것이며 이러한 코드는 분산 모드에서 예상대로 작동하지 않습니다.
일부 글로벌 집계가 필요한 경우에는 Accumulator를 대신 사용하십시오.

Printing elements of an RDD


또 다른 일반적인 사례는 rdd.foreach (println) 또는 rdd.map (println)을 사용하여 RDD 요소를 인쇄하려고 하는 것입니다.
이 경우, 단일 머신에서는 예상대로 출력을 생성하고 모든 RDD 요소를 인쇄합니다.
그러나 클러스터 모드에서는 executor가 호출하는 stdout의 출력은 이제 드라이버가 아닌 executor의 stdout에 기록되므로 드라이버의 stdout은 이를 표시하지 않습니다!
드라이버의 모든 요소를 ​​인쇄하려면 collect () 메소드를 사용하여 먼저 RDD를 드라이버 노드로 가져 오십시오. rdd.collect (). foreach (println).
그러나 collect()가 전체 RDD를 단일 시스템으로 가져 오기 때문에 드라이버의 메모리 부족이 발생할 수 있습니다.
만약 RDD의 일부 요소만 출력해야하는 경우에는 다음과 같이 take()를 사용하는 것이 안전한 방법입니다. : rdd.take(100).foreach(println)

참조 : https://spark.apache.org/docs/2.1.1/programming-guide.html#understanding-closures-a-nameclosureslinka

이 문서는 개인적인 목적이나 배포하기 위해서 복사할 수 있다. 출력물이든 디지털 문서든 각 복사본에 어떤 비용도 청구할 수 없고 모든 복사본에는 이 카피라이트 문구가 있어야 한다.