Shared Variables
25 August 2019
일반적으로 Spark 작업 (예 : map 또는 reduce)에 전달 된 함수가 원격 클러스터 노드에서 실행될 때는 함수에 사용된 모든 변수의 별도 사본을 사용해 작동합니다.
이러한 변수는 각 machine에 복사되며 원격 machine의 변수에 대한 업데이트는 드라이버 프로그램으로 다시 전파되지 않습니다.
일반적으로, 공유 변수를 각각의 task 사이에서 읽고 쓰는 것은 비효율적입니다.
그러나 Spark는 broadcast 변수와 accumulator라는 두 가지 일반적인 사용 패턴에 대한 공유 변수를 제공합니다.
Broadcast Variables
브로드 캐스트 변수를 사용하면 task에 사본을 제공하지 않고 각각의 machine에 캐시된 read-only 변수를 제공할 수 있습니다.
예를 들어, 모든 노드에 효율적인 방식으로 큰 입력 데이터 세트의 사본을 제공하는 데 사용할 수 있습니다.
Spark는 효율적인 브로드 캐스트 알고리즘을 사용하여 브로드 캐스트 변수를 배포하여 통신 비용을 줄입니다.
스파크 작업은 분산된 "shuffle"작업으로 분리 된 일련의 단계를 통해 실행됩니다.
Spark는 각 단계에서의 작업에 필요한 공통 데이터를 자동으로 브로드 캐스트합니다.
이 방법으로 데이터는 직렬화되어 브로드캐스트되고, 각각의 task가 수행되기 전에 역직렬화됩니다.
즉, 브로드 캐스트 변수를 명시적으로 생성하는 것은 여러 단계의 작업에 동일한 데이터가 필요하거나 데이터를 역직렬화 된 형식으로 캐싱하는 것이 중요한 경우에만 유용합니다.
브로드 캐스트 변수는 SparkContext.broadcast(v)를 호출하여 변수 v에서 작성됩니다.
브로드 캐스트 변수는 v를 감싸고 있는 rapper이며 value 메소드를 호출하여 해당 값에 액세스 할 수 있습니다.
아래 코드는 이것을 보여줍니다:
Broadcast<int[]> broadcastVar = sc.broadcast(new int[] {1, 2, 3});
broadcastVar.value();
// returns [1, 2, 3]
브로드 캐스트 변수가 작성된 후에는 v가 노드에 두 번 이상 전달되지 않도록 클러스터에서 실행되는 모든 함수에서 값 v 대신 브로드캐스트 변수를 사용해야합니다.또한 모든 노드가 브로드 캐스트 변수의 동일한 값을 갖도록 오브젝트 v가 브로드 캐스트 된 후에 수정해서는 안됩니다.(예 : 변수가 나중에 새 노드에 전달되는 경우)
Accumulators
Accumulator는 연관성있고 상호작용적인 연산을 통해서만 "추가"되는 변수이므로 병렬로 효율적으로 지원 될 수 있습니다.
이것은 mapReduce와 같이 갯수를 세거나 또는 합계를 구현하는 데 사용할 수 있습니다.
Spark는 기본적으로 숫자 유형의 accumulator를 지원하며 프로그래머는 새로운 유형에 대한 지원을 추가 할 수 있습니다.
사용자는 이름이 붙거나 붙지않은 Accumulator를 만들 수 있습니다.
Long 또는 Double 유형의 값을 각각 누적하기 위해 SparkContext.longAccumulator() 또는 SparkContext.doubleAccumulator()를 호출하여 숫자 누적기를 만들 수 있습니다.
그런 다음 add 메소드를 사용하여 클러스터에서 해당 작업을 추가 할 수 있습니다.
그러나 각각의 클러스터는 값을 읽을 수 없습니다.
드라이버 프로그램만이 value 메서드를 사용하여 누산기의 값을 읽을 수 있습니다.
아래 코드는 배열 요소를 더하는 데 사용되는 accumulator를 보여줍니다.
LongAccumulator accum = jsc.sc().longAccumulator();
sc.parallelize(Arrays.asList(1, 2, 3, 4)).foreach(x -> accum.add(x));
// ...
// 10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s
accum.value();
// returns 10
이 코드에서는 long 타입을 지원하는 이미 지원되는 accumulator를 사용하였지만, 프로그래머는 AccumulatorV2의 subclass를 구현하여 새로운 타입을 지원할 수도 있습니다.AccumulatorV2 abstract 클래스에는 재정의해야하는 몇 가지 메소드가 있습니다:
accumulator를 0으로 리셋하는 reset 메서드,
accumulator에 값을 추가하는 add 메서드,
accumulator에 다른 같은 종류의 accumulator를 병합하는 merge 메서드가 있습니다.
재정의해야하는 다른 method에 대한 설명은 API 설명서에 포함되어 있습니다.
예를 들어, 수학 벡터를 나타내는 MyVector 클래스를 구현한다고 가정하면 아래와 같습니다:
class VectorAccumulatorV2 implements AccumulatorV2<MyVector, MyVector> {
private MyVector myVector = MyVector.createZeroVector();
public void reset() {
myVector.reset();
}
public void add(MyVector v) {
myVector.add(v);
}
...
}
// Then, create an Accumulator of this type:
VectorAccumulatorV2 myVectorAcc = new VectorAccumulatorV2();
// Then, register it into spark context:
jsc.sc().register(myVectorAcc, "MyVectorAcc1");
프로그래머가 자체 AccumulatorV2 유형을 정의 할 때 결과 유형은 추가된 요소의 유형과 다를 수 있습니다.Warning: Spark 작업이 완료되면 Spark는 이 작업에서 누적된 업데이트를 accumulator에 병합하려고 시도합니다.
이 작업이 실패하면 Spark는 실패를 무시하고 작업을 성공으로 표시하고 다른 작업을 계속 실행합니다.
따라서 버그가있는 accumulator는 Spark 작업에 영향을 미치지 않지만 Spark 작업이 성공하더라도 올바르게 업데이트되지 않을 수 있습니다.
내부 작업 만 수행하는 accumulator 업데이트의 경우 Spark는 각 작업의 accumulator 업데이트가 한 번만 적용된다는 것을 보장합니다.
예를 들어, 재시작된 작업은 기존의 accumulator에 다시 업데이트하지 않습니다.
하지만 transformation 작업에서, 각각의 task가 재실행되면 한 번 이상 업데이트될 수 있다는 사실을 유저는 인지하고 있어야 합니다.
RDD의 operation 내에서 update가 있을 경우, RDD가 operation의 일부로 계산된 후에만 값이 갱신됩니다.
결과적으로 map()과 변환 내에서 accumulator 업데이트가 실행되는 것은 보장되지 않습니다.
아래 코드는 이 속성을 보여줍니다.
LongAccumulator accum = jsc.sc().longAccumulator();
data.map(x -> { accum.add(x); return f(x); });
// Here, accum is still 0 because no actions have caused the `map` to be computed.
참조 : https://spark.apache.org/docs/2.1.1/programming-guide.html#shared-variables