Generic Load/Save Functions

11 September 2019

가장 간단한 형식에서, 기본 데이터 소스(parquet)는 모든 작업에 사용됩니다.

Dataset<Row> usersDF = spark.read().load("examples/src/main/resources/users.parquet");
usersDF.select("name", "favorite_color").write().save("namesAndFavColors.parquet");

Manually Specifying Options


데이터 소스에 전달하려는 추가 옵션과 함께 사용될 데이터 소스를 수동으로 지정할 수도 있습니다.
데이터 소스는 완전한 이름(예 : org.apache.spark.sql.parquet)으로 지정되지만 내장 소스의 경우 짧은 이름 (json, parquet, jdbc, orc, libsvm, csv, text)을 사용할 수도 있습니다.
모든 데이터 소스 유형에서 로드된 DataFrame은 syntax를 사용하여 다른 유형으로 변환될 수 있습니다.

JSON 파일을 로드하려면 다음을 사용할 수 있습니다.
Dataset<Row> peopleDFCsv = spark.read().format("csv")
  .option("sep", ";")
  .option("inferSchema", "true")
  .option("header", "true")
  .load("examples/src/main/resources/people.csv");

추가 옵션은 쓰기 작업 중에도 사용됩니다.
예를 들어 ORC 데이터 소스의 블룸 필터 및 사전 인코딩을 제어 할 수 있습니다.
다음 ORC 예제는 favorite_color에 블룸 필터를 만들고 name 및 favorite_color에 사전 인코딩을 사용합니다.
Parquet의 경우 parquet.enable.dictionary도 있습니다.

usersDF.write().format("orc")
  .option("orc.bloom.filter.columns", "favorite_color")
  .option("orc.dictionary.key.threshold", "1.0")
  .save("users_with_options.orc");

Run SQL on files directly


read API를 사용하여 파일을 DataFrame에 로드하고 쿼리하는 대신, SQL을 사용하여 해당 파일을 직접 쿼리 할 수도 있습니다.

Dataset sqlDF =
  spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`");

Save Modes


Save 기능은 선택적으로 SaveMode를 취할 수 있으며, 이는 데이터가 존재하는 경우 처리하는 방법을 지정합니다.
이러한 SaveMode를는 Lock을 사용하지도 않고, atomic 이지도 않습니다.
또한 덮어 쓰기를 수행하면 새 데이터를 쓰기 전에 기존 데이터가 삭제됩니다.

SaveMode.ErrorIfExists (default)
데이터 소스에 DataFrame을 저장할 때 데이터가 이미 존재하면 예외가 발생합니다.

SaveMode.Append
데이터 소스에 DataFrame을 저장할 때 데이터 / 테이블이 이미 존재하는 경우 DataFrame의 내용이 기존 데이터에 추가 될 것으로 예상됩니다.

SaveMode.Overwrite
덮어 쓰기 모드는 DataFrame을 데이터 소스에 저장할 때 데이터 / 테이블이 이미 존재하는 경우 DataFrame의 내용으로 기존 데이터를 덮어 씁니다.

SaveMode.Ignore
Ignore 모드는 데이터 소스에 DataFrame을 저장할 때 데이터가 이미 존재하는 경우 저장 작업을 수행하면 DataFrame의 내용을 저장하지 않고 기존 데이터를 변경하지 않습니다.
이것은 SQL에 존재하는 CREATE TABLE IF NOT EXISTS 과 유사합니다.

Saving to Persistent Tables


saveAsTable 명령을 사용하여 DataFrame을 Hive 메타 스토어에 저장할 수도 있습니다.
이 기능을 사용하기 위해 기존 Hive 배포는 필요하지 않습니다.
Spark는 기본 로컬 Hive 메타 스토어(Derby 사용)를 생성할 것입니다.
createOrReplaceTempView 명령과 달리 saveAsTable은 DataFrame의 내용을 구체화하고 Hive 메타 스토어의 데이터에 대한 포인터를 만듭니다.
동일한 메타 스토어에 대한 연결을 유지하는 한 Spark 프로그램이 재시작된 후에도 영구 테이블이 계속 존재합니다.
DataFrame은 SparkSession에 테이블 이름을 명시하여 table 메서드를 호출함으로써 table에 생성될 수 있습니다.
text, parquet, json 등의 파일 기반 데이터소스에서는, path 옵션을 이용해 custom table을 명시할 수 있습니다. (예 : df.write.option("path", "/some/path").saveAsTable("t").)
custom 테이블이 삭제되면 사용자 정의 테이블 경로는 제거되지 않고 테이블 데이터가 여전히 존재합니다.
사용자 정의 테이블 경로를 지정하지 않으면 Spark는 웨어 하우스 디렉토리 아래의 기본 테이블 경로에 데이터를 씁니다.
default 테이블이 삭제되면 default 테이블 경로도 제거됩니다.

Spark 2.1부터 영구 데이터 소스 테이블에는 Hive 메타 스토어에 저장된 파티션 별 메타 데이터가 있습니다.
이러한 동작은 여러 가지 이점이 있습니다. :

- 메타 스토어는 쿼리에 필요한 파티션만 리턴 할 수 있으므로 첫 번째 쿼리의 모든 파티션을 테이블로 감지하는 것은 더 이상 필요하지 않습니다.
- ALTER TABLE PARTITION ... SET LOCATION과 같은 Hive DDL을 이제 Datasource API로 작성된 테이블에 사용할 수 있습니다.

외부 데이터 소스 테이블을 생성 할 때 기본적으로 파티션 정보는 수집되지 않습니다.
메타 스토어에서 파티션 정보를 동기화하기 위해 MSCK REPAIR TABLE을 호출 할 수 있습니다.

Bucketing, Sorting and Partitioning


파일 기반 데이터 소스의 경우 출력을 버킷 및 정렬 또는 분할할 수도 있습니다.
bucketing 및 정렬은 영구 테이블에만 적용됩니다.:

peopleDF.write().bucketBy(42, "name").sortBy("age").saveAsTable("people_bucketed");

데이터 세트 API를 사용할 때 파티셔닝은 save 및 saveAsTable과 함께 사용할 수 있습니다.

usersDF
  .write()
  .partitionBy("favorite_color")
  .format("parquet")
  .save("namesPartByColor.parquet");

단일 테이블에 파티셔닝과 bucketing을 모두 사용할 수 있습니다.

peopleDF
  .write()
  .partitionBy("favorite_color")
  .bucketBy(42, "name")
  .saveAsTable("people_partitioned_bucketed");

partitionBy는 Partition Discovery 섹션에 설명 된대로 디렉토리 구조를 만듭니다.
따라서 카디널리티가 높은 열에는 적용이 제한됩니다.
반대로 bucketBy는 고정 된 수의 버킷에 데이터를 분산 시키며 여러 고유 한 값이 제한되지 않은 경우 사용할 수 있습니다.


https://spark.apache.org/docs/latest/sql-data-sources-load-save-functions.html#manually-specifying-options

이 문서는 개인적인 목적이나 배포하기 위해서 복사할 수 있다. 출력물이든 디지털 문서든 각 복사본에 어떤 비용도 청구할 수 없고 모든 복사본에는 이 카피라이트 문구가 있어야 한다.