create first app

17 May 2020

scala언어를 이용해, 가장 기본적인 application 예제를 생성해보도록 하겠습니다. maven이나 gradle과 같은 build tool로는 널리 사용되고 있는 sbt를 사용하고, spark는 현재 최신 버전인 2.4.5를 사용하도록 하겠습니다. 가장 먼저 아래와 같이 build.sbt를 작성해주도록 합니다. - build.sbt name := "spark-application-example" version := "0.1-SNAPSHOT" scalaVersion := "2.11.12"

read more

create first spark test

17 May 2020

scalatest를 이용하여, spark app을 테스트하는 예제를 작성해보도록 하겠습니다. 기본 템플릿은 이전 create first app을 참고 부탁드립니다. 기존에 만들어진 FirstApp을 기반으로, scalatest를 작성해보도록 합니다. 가장 먼저 test실행시 사용할 BaseSpec을 작성해주도록 합니다. - BaseSpec.scala package org.shashaka.test

read more

Performance Tuning

16 September 2019

일부 워크로드의 경우 메모리에 데이터를 캐싱하거나 일부 옵션을 설정하여 성능을 향상시킬 수 있습니다. Caching Data In Memory Spark SQL은 spark.catalog.cacheTable("tableName") 또는 dataFrame.cache()를 호출하여 메모리 내에 테이블을 캐시 할 수 있습니다. 그런 다음 Spark SQL은 필요한 열만 검색하고 자동으로 압축을 조정하여 메모리 사용 및 GC를 최소화합니다. spark.catalog.uncacheTable("tableName")을 호출하여 테이블을 메모리에서 제거 할 수도 있습니다. 메모리 내 캐싱 구성은 SparkSession에서 setConf 메소드를 사용하거나 SQL에서 SET key = value 명령을 실행하여 수행 할 수 있습니다. - spark.sql.inMemoryColumnarStorage.compressed Default : true Description: true로 설정하면 Spark SQL은 데이터 통계에 따라 각 열에 대한 압축 코덱을 자동으로 선택합니다. - spark.sql.inMemoryColumnarStorage.batchSize Default: 10000 Description: 컬럼 캐싱에 대한...

read more

Generic Load/Save Functions

11 September 2019

가장 간단한 형식에서, 기본 데이터 소스(parquet)는 모든 작업에 사용됩니다. Dataset<Row> usersDF = spark.read().load("examples/src/main/resources/users.parquet"); usersDF.select("name", "favorite_color").write().save("namesAndFavColors.parquet"); Manually Specifying Options 데이터 소스에 전달하려는 추가 옵션과 함께 사용될 데이터 소스를 수동으로 지정할 수도 있습니다. 데이터 소스는 완전한 이름(예 : org.apache.spark.sql.parquet)으로 지정되지만 내장 소스의 경우 짧은 이름 (json, parquet, jdbc, orc, libsvm, csv, text)을 사용할 수도 있습니다. 모든 데이터 소스 유형에서 로드된 DataFrame은 syntax를 사용하여 다른 유형으로 변환될 수 있습니다. JSON 파일을 로드하려면 다음을 사용할 수 있습니다. Dataset<Row> peopleDFCsv = spark.read().format("csv") .option("sep", ";") .option("inferSchema", "true") .option("header", "true") .load("examples/src/main/resources/people.csv"); 추가 옵션은 쓰기 작업 중에도 사용됩니다. 예를 들어 ORC 데이터 소스의 블룸 필터 및 사전 인코딩을...

read more

Aggregations

10 September 2019

내장 DataFrames 함수는 count(), countDistinct(), avg(), max(), min() 등과 같은 공통 집계를 제공합니다. 이러한 기능은 DataFrames용으로 설계되었지만 Spark SQL에는 Scala 및 Java의 일부 기능에 대해 강력한 형식의 데이터 집합을 사용할 수있는 버전도 있습니다. 또한 사용자는 사전 정의된 집계 함수 뿐만 아니라 자체적으로 작성한 함수도 사용가능합니다.

read more

Creating DataFrames

10 September 2019

read more

Creating Datasets

10 September 2019

데이터 세트는 RDD와 유사하지만, 데이터 세트는 Java 직렬화 또는 Kryo 대신 특수 인코더를 사용하여 객체를 직렬화하고 네트워크를 통해 전송합니다. 인코더와 표준적인 직렬화는 둘다 객체를 바이트로 변환하는 역할을 합니다. 하지만 인코더는 코드를 동적으로 생성하며 Spark가 바이트를 객체로 역직렬화하지 않고 필터링, 정렬 및 해싱과 같은 많은 작업을 수행 할 수있는 형식을 사용합니다. import java.util.Arrays; import java.util.Collections; import java.io.Serializable;

read more

Creating Datasets

10 September 2019

Spark SQL의 임시 뷰는 세션 범위이며 이를 생성하는 세션이 종료되면 사라집니다. 모든 세션에서 공유되는 임시 뷰를 원하고 Spark 애플리케이션이 종료 될 때까지 활성 상태로 유지하려면 global temporary view를 작성할 수 있습니다. Global temporary view는 데이터베이스 global_temp에 연결되어 있으며 정규화된 이름을 사용하여 참조해야합니다 (예 : select * FROM global_temp.view1.) // Register the DataFrame as a global temporary view df.createGlobalTempView("people");

read more

Interoperating with RDDs

10 September 2019

Spark SQL은 기존 RDD를 데이터 세트로 변환하는 두 가지 방법을 지원합니다. 첫 번째 방법은 리플렉션을 사용하여 특정 유형의 객체를 포함하는 RDD의 스키마를 유추합니다. 이 리플렉션 기반 접근 방식은 보다 간결한 코드로 연결되며 Spark 애플리케이션을 작성하는 동안 이미 스키마를 알고있을 때 잘 작동합니다. 데이터 집합을 만드는 두 번째 방법은 스키마를 구성한 다음 기존 RDD에 적용 할 수 있는 프로그래밍 인터페이스를 사용하는 것입니다. 이 방법은 더 장황하지만 런타임까지 열과 해당 유형을 알 수 없는 경우에도 데이터 집합을 구성 할 수 있습니다. Inferring the Schema Using Reflection Spark SQL은 JavaBeans RDD를 DataFrame으로 자동 변환하는 기능을 지원합니다. 리플렉션을 사용하여 얻은 BeanInfo는 테이블의 스키마를 정의합니다....

read more

Running SQL Queries Programmatically

10 September 2019

SparkSession의 sql 함수를 사용하면 응용 프로그램에서 프로그래밍 방식으로 SQL 쿼리를 실행하고 결과를 Dataset<Row>로 반환 할 수 있습니다. import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row;

read more

Untyped Dataset Operations

10 September 2019

DataFrames는 구조화된 데이터 조작을 위한 Scala, Java, Python, R에 대한 도메인 별 언어를 제공합니다. Spark 2.0에서 DataFrames는 Scala 및 Java API에서 제공되는 rows 데이터 집합입니다. 이러한 유형의 작업은 강력한 유형의 스칼라 / 자바 데이터 세트와 함께 제공되는 "유형 변환"과 달리 "비 유형 변환"이라고도합니다. 아래에서 dataSet을 활용한 구조화된 데이터 작업의 예를 확인할 수 있습니다.

read more

Cluster Mode Overview

01 September 2019

Components

read more

Shared Variables

25 August 2019

일반적으로 Spark 작업 (예 : map 또는 reduce)에 전달 된 함수가 원격 클러스터 노드에서 실행될 때는 함수에 사용된 모든 변수의 별도 사본을 사용해 작동합니다. 이러한 변수는 각 machine에 복사되며 원격 machine의 변수에 대한 업데이트는 드라이버 프로그램으로 다시 전파되지 않습니다. 일반적으로, 공유 변수를 각각의 task 사이에서 읽고 쓰는 것은 비효율적입니다. 그러나 Spark는 broadcast 변수와 accumulator라는 두 가지 일반적인 사용 패턴에 대한 공유 변수를 제공합니다. Broadcast Variables 브로드 캐스트 변수를 사용하면 task에 사본을 제공하지 않고 각각의 machine에 캐시된 read-only 변수를 제공할 수 있습니다. 예를 들어, 모든 노드에 효율적인 방식으로 큰 입력 데이터 세트의 사본을 제공하는 데 사용할 수 있습니다. Spark는 효율적인 브로드...

read more

RDD Operations - Actions

24 August 2019

다음 표는 Spark에서 지원하는 몇 가지 일반적인 작업을 보여줍니다. RDD API 문서 (Scala, Java, Python, R)를 참조하십시오. 자세한 내용은 RDD 함수 doc (Scala, Java)을 참조하십시오. * reduce(func) 함수 func 두 개의 인수를 사용하여 하나를 반환)를 사용하여 데이터 집합의 요소를 집계합니다. 함수는 병렬로 올바르게 계산될 수 있도록 정류적이고 연관성이 있어야합니다. * collect() 데이터 세트의 모든 요소를 ​​드라이버 프로그램에 배열로 반환합니다. 이것은 일반적으로 데이터의 작은 하위 집합을 반환하는 필터 또는 다른 작업 후에 유용합니다. * count() 데이터 세트의 전체 요소의 갯수를 반환합니다. * first() 데이터 세트의 가장 처음 요소를 반환합니다. take(1) 메서드 호출과 비슷한 기능입니다. * take(n) 데이터 세트의 처음 n개 요소가있는...

read more

RDD Operations - RDD Persistence

24 August 2019

Spark에서 가장 중요한 기능 중 하나는 여러 작업에서 전달하는 메모리의 데이터 집합을 유지(또는 캐싱)하는 것입니다. RDD의 결과값을 저장하면서, 각각의 노드는 다른 파티션의 메모리에서 계산된 내용을 저장하고 계산된 데이터 세트(혹은 데이터 세트로부터 파생된 값)에 대한 Actino이 수행될 때 그 값을 재사용합니다. 이를 통해 향후 작업이 훨씬 빨라질 수 있습니다 (보통 10배 이상). 캐싱은 반복 알고리즘과 빠른 대화식 사용을 위한 핵심 도구입니다. persist() 또는 cache() 메소드를 사용하여 해당 RDD가 메모리에 지속되도록 표시 할 수 있습니다. 해당 값들은 Action에서 처음 계산 될 때 노드의 메모리에 유지됩니다. 스파크의 캐시는 실패에 대한 대응이 되어있기 때문에, RDD의 파티션이 손실되면 원래 RDD를 생성한 변환을 사용하여 RDD가 자동으로...

read more

RDD Operations - Shuffle operations

24 August 2019

Spark 내의 특정 작업은 셔플이라고하는 이벤트를 트리거합니다. 셔플 (shuffle)은 데이터를 재분배하여 Spark간에 다르게 그룹화되는 Spark의 메커니즘입니다. 이것은 일반적으로 executor와 machine간에 데이터를 복사하고, 복잡하고 값비싼 작업인 shuffle을 만드는 것을 포함하고 있습니다.

read more

RDD Operations - Transformations

24 August 2019

다음 표는 Spark에서 지원하는 몇 가지 일반적인 변환을 보여줍니다. 자세한 내용은 RDD API 문서(Scala, Java, Python, R) 및 pair RDD 함수 문서(Scala, Java)를 참조하십시오. * map(func) 소스의 각 요소를 함수 func를 통해 전달하여 형성된 새로운 분산 데이터 세트를 반환합니다. * filter(func) 해당 func이 true인 경우만을 택하는 새로운 분산 데이터 세트를 반환합니다. * flatMap(func) map과 비슷하지만 각 입력 항목에 대해 0개 이상의 출력 항목을 생성할 수 있습니다. 따라서 func은 단일 항목이 아닌 Seq를 반환해야합니다. * mapPartitions(func) map과 유사하지만 RDD의 각 파티션(블록)에서 별도로 실행되므로 func는 T 유형의 RDD에서 실행될 때 Iterator <T> => Iterator <U> 유형으로 바뀌게 됩니다. * mapPartitionsWithIndex(func) mapPartitions와 유사하지만...

read more

RDD Operations - Working with Key-Value Pairs

23 August 2019

대부분의 Spark 작업은 모든 유형의 객체가 포함된 RDD에서 작동하지만 일부 특수한 작업은 키-값 쌍의 RDD에서만 사용할 수 있습니다. 가장 일반적인 예는 요소를 key를 기준으로 그룹화 또는 집계하는 것과 같은 분산된 "suffle"작업입니다. Java에서 키-값 쌍은 Scala 표준 라이브러리의 scala.Tuple2 클래스를 사용하여 표시됩니다. new Tuple2(a, b)를 호출하여 튜플을 만들고 나중에 tuple._1 () 및 tuple._2 ()를 사용하여 해당 필드에 액세스 할 수 있습니다. 키-값 쌍의 RDD는 JavaPairRDD 클래스로 표시됩니다. mapToPair 및 flatMapToPair와 같은 특수한 경우의 map 작업을 사용하여 JavaRDD로부터 JavaPairRDD를 만들 수 있습니다. 예를 들어, 다음 코드는 키-값 쌍에서 reduceByKey 연산을 사용하여 파일에서 각 텍스트 행이 몇 번 발생하는지 계산합니다.

read more

RDD Operations - Passing Functions to Spark

23 August 2019

Spark의 API는 클러스터 위에서 동작하기 위해서 driver프로그램의 passing function에 크게 의존합니다. Java에서 함수는 org.apache.spark.api.java.function 패키지의 인터페이스를 구현하는 클래스로 표시됩니다. 이러한 기능을 만드는 방법에는 두 가지가 있습니다. - 익명의 내부 클래스 또는 명명 된 클래스로 자체 클래스에서 함수 인터페이스를 구현하고 해당 인스턴스를 Spark에 전달하십시오. - lambda expression을 사용하여 구현을 간결하게 정의하십시오. 이 가이드의 대부분은 간결성을 위해 람다 구문을 사용하지만, 동일한 API를 모두 긴 형식으로 사용하는 것 역시 쉽습니다. 예를 들어, 다음과 같이 코드를 작성할 수 있습니다.

read more

RDD Operations - Understanding closures

23 August 2019

Spark의 가장 어려운 점 중 하나는 클러스터에서 코드를 실행할 때 변수 및 메서드의 범위와 수명주기를 이해하는 것입니다. 범위 밖의 변수를 수정하는 RDD 작업은 빈번한 혼동의 원인이 될 수 있습니다. 아래 예에서는 foreach ()를 사용하여 카운터를 늘리는 코드를 살펴 보지만 다른 작업에서도 비슷한 문제가 발생할 수 있습니다.

read more

RDD Operations - Basic

21 August 2019

RDD는 아래 두가지 타입의 operation을 지원합니다. transformation : 기존 데이터에서 새 데이터 세트를 생성하는 작업. action: dataSet에 대한 계산을 실행 한 후 드라이버 프로그램에 값을 반환하는 작업. 예를 들어, map은 함수를 통해 각 데이터 집합 요소를 전달하고 새 RDD를 결과로 반환하는 transformation입니다. 반면에 reduce는 일부 함수를 사용하여 RDD의 모든 요소를 ​​집계하고 최종 결과를 드라이버 프로그램으로 리턴하는 조치입니다 (분산 dataSet를 리턴하는 병렬 reduceByKey도 있음). Spark의 모든 변환은 lazy 기반이라 결과를 그때 그때 바로 계산합니다. 대신,일부 기본 데이터 세트(예 : 파일)에 적용된 변환 만 기억합니다. transformation은 드라이버 프로그램으로 결과를 리턴해야하는 경우에만 실행됩니다. 이 디자인은 Spark가 보다 효율적으로 실행되도록합니다. 예를 들어, map을 통해...

read more

External Datasets

19 August 2019

Spark는 로컬 파일 시스템, HDFS, Cassandra, HBase, Amazon S3 등 Hadoop이 지원하는 모든 스토리지 소스에서 분산 데이터 세트를 생성 할 수 있습니다. Spark는 텍스트 파일, SequenceFile 및 기타 Hadoop InputFormat을 지원합니다. 텍스트 파일 RDD는 SparkContext의 textFile 메서드를 사용하여 만들 수 있습니다. 이 메소드는 파일의 URI (시스템의 로컬 경로 또는 hdfs : //, s3a : // 등의 URI)를 가져 와서 각각의 행으로 이루어진 collection처럼 읽습니다. 다음은 호출 예제입니다.

read more