On the journey of

[PySPARK] 스파크 SQL과 데이터세트 본문

Experiences & Study/PySPARK & Data Engineering

[PySPARK] 스파크 SQL과 데이터세트

dlrpskdi 2023. 9. 3. 11:18

 

자바와 스칼라를 위한 단일 API

데이터세트는 강력한 형식의 객체를 위해 통합되고 단일한 API를 제공

  • 오직 스칼라와 자바만이 강력하게 형식화된 타입으로 지정됨
  • 파이썬과 R은 형식화되지 않은 타입의 데이터 프레임 API를 지원

데이터 세트는 데이터 프레임 API에서 익숙하게 사용되는 DSL 연산자나 함수형 프로그래밍을 사용하여 병렬로 작동할 수 있는 도메인별 형식화된 객체


데이터세트를 위한 스칼라 케이스 클래스와 자바빈

스파크는 작업 중 인코더를 통해 아래의 내부적 데이터 타입을 언어별 타입에 맞게 맵핑

  • stringType
  • BinaryType
  • IntegerType
  • BooleanType
  • MapType

Dataset[T] 생성을 위한 예제 설명, T는 스칼라 객체이며 객체를 정의하는 case class가 필요하다.

아래와 같은 블로거 JSON파일을 예시로 설명! 티스토리는 json 언어설정이 없어서 텍스트로 적어둔다.

#JSON으로 작성!
{id: 1, first: "Jules", last: "Damji", url: "https://tinyurl.1", date:
"1/4/2016", hits: 4535, campaigns: {"twitter", "LinkedIn"}},
...
{id: 87, first: "Brooke", last: "Wenig", url: "https://tinyurl.2", date:
"5/5/2018", hits: 8908, campaigns: {"twitter", "LinkedIn"}}

분산된 Dataset[Bloggers] 생성하려면 각 개별 필드를 정의하는 스칼라 케이스 정의

// In Scala
case class Bloggers(
	id:Int, first:String, last:String, url:String, date:String, 
	hits: Int, campaigns:Array[String]
)

정의 후 데이터 원본에서 파일 읽기

val bloggers = "../data/bloggers.json" 
val bloggersDS = spark
.read
.format("json") .option("path", bloggers) .load()
.as[Bloggers]

분산 데이터 컬렉션 결과의 각 행은 Blogger 유형으로 정의된다.

자바에서도 자바빈 클래스를 생성한 다음 인코더를 사용해 똑같이 할 수 있다.

// In Java
import org.apache.spark.sql.Encoders; 
import java.io.Serializable;
public class Bloggers implements Serializable { 
	private int id;
	private String first;
	private String last;
	private String url;
	private String date;
	private int hits;
	private Array[String] campaigns;
  // JavaBean getters and setters
	int getID() { return id; }
	void setID(int i) { id = i; }
	String getFirst() { return first; } 
	void setFirst(String f) { first = f; } 
	String getLast() { return last; } 
	void setLast(String l) { last = l; } 
	String getURL() { return url; }
	void setURL (String u) { url = u; } 
	String getDate() { return date; } 
	Void setDate(String d) { date = d; } 
	int getHits() { return hits; }
	void setHits(int h) { hits = h; }

Array[String] getCampaigns() { return campaigns; } 
void setCampaigns(Array[String] c) { campaigns = c; } }

// Create Encoder
Encoder<Bloggers> BloggerEncoder = Encoders.bean(Bloggers.class);
String bloggers = "../bloggers.json"
Dataset<Bloggers>bloggersDS = spark
  .read
  .format("json")
  .option("path", bloggers)
  .load()
  .as(BloggerEncoder);
  • 데이터세트 API에서는 미리 데이터 유형을 정의하고 케이스 클래스 또는 자바빈 클래스가 스키마와 일치해야한다. 조건이 까다롭지만, 데이터세트 API로 작업하는 것은  쉽고 간결하며, 선언적이다. 또한 동일한 관계 연산자를 사용할 수 있다는 장점이 있다. 
  • 샘플 데이터셋을 생성하는 한가지 간단하고 동적인 방법은 SparkSession 인스턴스를 사용하는 것!
샘플 데이터 변환
  • 데이터세트 : 도메인별 객체의 강력하게 정형화된 컬렉션
    • 이러한 객체는 함수적 연산을 사용해 병렬로 변환할 수 있다.
      • map(), reduce(), filter(), select(), aggregate()
고차 함수 및 함수형 프로그래밍
  • filter()로 사용량이 900분을 초과하는 dsUsage 데이터 세트의 모든 사용자 반환하는 방법
    • filter 함수에 대한 인수로 함수 표현식 사용 ⇒ filter() 방법에 대한 인수로 람다 식 {d.usage > 900} 사용
    • 함수를 정의하고 해당 함수를 filter() 함수의 인수로 제공 ⇒ def filterWithUsage(u: Usage) = u.dump > 900 사용해 스칼라 함수 정의
  • 고차함수 map()을 사용해 계산된 값 반환하기
    • MapFunction<T> 정의
    • 익명 클래스 또는 MapFunction<T>을 상속받는 정의된 클래스일 수 있음
  • 계산값이 어떤 사용자와 연관되어 있는지 확인하
    1. 추가 필드를 사용하여 스칼라 케이스 클래스 또는 자바빈 클래스인 UsageCost 생성
    2. 비용을 계산하는 함수를 정의하여 map() 함수에 사용
  • 스칼라 예제 (단, 여기서 map()으로 계산된 새로운 열과 다른 모든 열을 가짐)
// cost 필드를 포함한 새로운 케이스 클래스 생성
case class UsageCost(uid: Int, uname: String, usage: Int, cost: Double)

// Usage를 인수로 사용하여 사용 비용을 계산
// 새로운 UsageCost 객체 반환
def ComputeUserCostUsage(u: Usage): UsageCost = {
	val v = if (u.usage > 750) u.usage * 0.15 else u.usage * 0.50
	UsageCost(u.uid, u.uname, u.usage, v)
}

// 기존 데이터세트에 map()을 사용
dsUsage.map(u => computeUserCostUsage(u)).show(5)
  • Java 예제
// 자바빈 클래스를 위한 인코더 생성
Encoder<UsageCost> usageCostEncoder = Encoders.bean(UsageCost.class);

// 데이터에 map() 함수를 적용
dsUsage.map( (MapFunction<Usage, UsageCost>) u -> {
	double v = 0.0;
	if (u.usage > 750) v = u.usage * 0.15;
	else v = u.usage * 0.50;
	return new UsageCost(u.uid, u.uname, u.usage, v);
});

usageCostEncoder.show(5);
  • 고차 함수 및 데이터세트 사용 시 주의점
    • 입력된 JVM 객체를 함수에 대한 인수로 사용
    • 읽기 쉽게 만든 도트 표기법을 사용하여 형식화된 JVM 객체 내의 개별 필드에 액세스
    • 일부 기능 및 람다 시그니처는 형안전(type-safe)이 보장되어 컴파일 시점 오류 감지를 보장하고 스파크에게 어떤 데이터 유형, 어떤 작업을 수행할지 지시할 수 있음
    • 코드는 람다 표현식의 자바 또는 스칼라 언어 기능을 사용 → 읽기 쉽고, 표현적이며, 간결
    • 스파크는 자바나 스칼라의 고차 함수 생성자 없이도 map(), filter()와 동등한 기능을 제공 → 데이터세트 또는 데이터프레임에서 함수형 프로그래밍을 사용할 필요가 없음 단, 조건부 DSL 연산자나 SQL 표현식 사용가능
      • 고차 함수 및 함수형 프로그래밍 in 데이터프레임
        • 고차 함수 및 함수형 프로그래밍은 데이터세트에만 있는 것이 아니고 데이터 프레임과 함께 사용 가능하다. 데이터프레임은 Dataset[Row]이며, Row는 다양한 유형의 필드를 저장할 수 있는 일반 비정형 JVM 객체.
        • 메서드 시그니처는 행에서 작동하는 식 또는 함수를 사용. 이는 각 행의 데이터 유형이 식 또는 함수에 대한 입력값이 될 수 있음을 의미
      • 데이터세트의 경우 데이터 유형에 대한 JVM과 스파크의 내부 이진 형식 간 데이터를 효율적으로 변환하는 메커니즘인코더를 사용

데이터프레임을 데이터세트로 변환
  • 기존 데이터 프레임 df를 SomeCaseClass 유형의 데이터 집합으로 변환하려면 df.as[SomeCaseClass] 표기법 사용
  • 스칼라 예제
val bloggersDS = spark
	.read.format("json")
	.option("path", "/data/bloggers/bloggers.json")
	.load()
	.as[Blogger]
  • spark.read.format(”json”): DataFrame<Row> 반환
  • .as[Blogger]: 인코더를 사용하여 스파크 내부 메모리 표현에서 JVM Blogger 객체로 직렬화/역직렬화
    • 인코더: JVM과 내부 텅스텐 포맷 사이를 직렬화하고 역직렬화하는 효율적인 메커니즘
데이터세트 및 데이터 프레임을 위한 메모리 관리
  • 스파크는 집중적인 인메모리 분산 빅데이터 엔진 → 메모리를 효율적으로 사용하는 것이 실행 속도에 큰 영향
  • Spark 1.0
    • 메모리 스토리지, 직렬화 및 역직렬화에 RDD 기반 자바 객체 사용
    • 리소스 측면에서 비용이 많이 들고 속다가 느림
    • 스토리지가 자바 힙에 할당되어 대규모 데이터세트에 대한 JVM 가비지 컬렉션에 좌우됨
  • 텅스텐(Spark 1.x에 도입된 프로젝트)
    • 오프셋과 포인터를 사용하여 오프 힙 메모리에 데이터세트와 데이터 프레임을 배치하는 새로운 내부 행 기반 형식
    • 오프 힙에 메모리를 직접 할당하는 것은 스파크가 GC(가비지 컬렉션)에 의해 받는 영향을 줄인다는 것을 의미
  • Spark 2.x
    • 전체 단계 코드 생성 및 벡터화된 칼럼 기반 메모리 레이아웃을 특징으로하는 2세대 텅스텐 엔진 도입 → 최신 컴파일러의 아이디어와 기술 기반으로 제작
    • 빠른 병렬 데이터 액세스를 위하여 단일 명령, 다중 데이터(SIMD) 접근 방식의 최신 CPU 및 캐시 아키텍처 활용
데이터 집합 인코더
  • 인코더는 오프 힙 메모리의 데이터를 스파크의 내부 텅스텐 포맷에서 JVM 자바 오브젝트로 변환
    • 스파크 내부 형식에서 원시 데이터 유형을 포함한 JVM 객체로 데이터세트 객체를 직렬화하고 역직렬화함
    • Encoder[T]는 스파크의 내부 텅스텐 형식에서 Dataset[T]로 변환됨
  • 스파크는 원시 유형, 스칼라 케이스 클래스 및 자바빈에 대한 인코더를 자동으로 생성할 수 있는 내장 지원 기능을 가지고 있음
    • 자바와 크리오(Kryo) 직렬화, 역직렬화에 비교했을 때 스파크 인코더는 상당히 빠름
스파크의 내부 형식과 자바 객체 형식 비교
  • 스칼라의 경우, 스파크가 효율적인 변환을 위해 바이트 코드를 자동으로 생성함
  • 자바 객체에는 헤더 정보, 해시 코드, 유니코드 정보 등 큰 오버헤드가 있음
  • 스파크는 데이터세트 또는 데이터 프레임을 위한 JVM 기반 객체를 생성하는 대신, 오프 힙 자바 메모리를 할당하여 데이터를 레이아웃하고, 인코더를 사용하여 데이터를 메모리 내 표현에서 JVM 객체로 변환함
직렬화 및 역직렬화
  • 직렬화 (인코딩) 송신자가 전송 가능한 형태(이진 형식)으로 만들어 객체들의 데이터가 연속적인 데이터로 변환된다. 스트림을 통해 데이터를 수신자가 읽을 수 있다.
  • 역직렬화 (디코딩) 수신사에 의해 전송된 스트림 데이터를 다시 역으로 직렬화하여 객체의 형태로 만든다.

기본적으로 JVM에 자체 자바 직렬화기와 역직렬화기가 내장되어 있지만 비효율적이고 느린 데 비해, 데이터세트의 인코더는 몇가지 장점이 있다.

  1. 힙 메모리에 객체를 저장하며 크기가 작아서 공간을 적게 차지한다.
  2. 메모리 주소와 오프셋이 있는 간단한 포인터 연산으로 메모리를 가로질러 빠르게 직렬화한다.
  3. 이진 표현을 스파크 내부 표현으로 빠르게 역직렬화하기 때문에 JVM의 가비지 컬렉션(메모리 관리)로 인한 일시 중지에 영향을 받지 않는다.
데이터세트 사용 비용

데이터 프레임 vs 데이터 세트 : 데이터 세트 몇 가지 이점 존재 but 비용 듦 ex. filter(), map(), flatMap() 사용할 때 스파크 내부 텅스텐 형식 -> JVM 객체로 역직렬화 하는 비용

스파크 인코더 도입 전 경미한 비용 -> 대규모 데이터 세트, 쿼리 (비용 발생)

비용 절감 절략
  • 과도한 직렬화, 역직렬화 완화하기 위한 전략
  1. 쿼리에서 DSL표현 사용, 람다를 고차 함수에 대한 인수로 과도하게 사용 (익명성 높이기) 람다 런타임까지 카탈리스트 옵티마이저에서 익명이며 명확하지 않음 = 사용자가 수행하는 작업을 식별할 수 없으므로 쿼리 최적화 불가
  2. 직렬화 및 역직렬화 최소화되도록 쿼리 함께 연결 (일반적)
  • 람다 및 DSL로 쿼리를 연결하는 비효율적인 방법 vs 효율적인 방법 비교
    • 비효율적인 방법은 텅스텐으로부터 직렬화하고 역직렬화 하는 부분을 반복적으로 수행한다.
    • DSL만 사용하고 람다를 사용하지 않는 방법은 직렬화/역직렬화가 필요하지 않아 훨씬 효율적이다.
요약
- 스파크가 통합된 고차원 API의 일부인 데이터세트 구성을 수용하기위해, 어떻게 메모리를 관리하는 지 알아봤다.
- 데이터세트 사용과 관련된 일부 비용을 고려했고, 어떻게 비용을 줄일 수 있는 지 방법들을 알아봤다.
- 인코더가 스파크의 내부 텅스텐 이진 형식에서 JVM 객체로 직렬화/역직렬화하는 방법을 간단히 살펴봤다.