On the journey of
[PySPARK] 스파크 SQL과 데이터세트 본문
자바와 스칼라를 위한 단일 API
데이터세트는 강력한 형식의 객체를 위해 통합되고 단일한 API를 제공
- 오직 스칼라와 자바만이 강력하게 형식화된 타입으로 지정됨
- 파이썬과 R은 형식화되지 않은 타입의 데이터 프레임 API를 지원
데이터 세트는 데이터 프레임 API에서 익숙하게 사용되는 DSL 연산자나 함수형 프로그래밍을 사용하여 병렬로 작동할 수 있는 도메인별 형식화된 객체
데이터세트를 위한 스칼라 케이스 클래스와 자바빈
스파크는 작업 중 인코더를 통해 아래의 내부적 데이터 타입을 언어별 타입에 맞게 맵핑
- stringType
- BinaryType
- IntegerType
- BooleanType
- MapType
Dataset[T] 생성을 위한 예제 설명, T는 스칼라 객체이며 객체를 정의하는 case class가 필요하다.
아래와 같은 블로거 JSON파일을 예시로 설명! 티스토리는 json 언어설정이 없어서 텍스트로 적어둔다.
#JSON으로 작성!
{id: 1, first: "Jules", last: "Damji", url: "https://tinyurl.1", date:
"1/4/2016", hits: 4535, campaigns: {"twitter", "LinkedIn"}},
...
{id: 87, first: "Brooke", last: "Wenig", url: "https://tinyurl.2", date:
"5/5/2018", hits: 8908, campaigns: {"twitter", "LinkedIn"}}
분산된 Dataset[Bloggers] 생성하려면 각 개별 필드를 정의하는 스칼라 케이스 정의
// In Scala
case class Bloggers(
id:Int, first:String, last:String, url:String, date:String,
hits: Int, campaigns:Array[String]
)
정의 후 데이터 원본에서 파일 읽기
val bloggers = "../data/bloggers.json"
val bloggersDS = spark
.read
.format("json") .option("path", bloggers) .load()
.as[Bloggers]
분산 데이터 컬렉션 결과의 각 행은 Blogger 유형으로 정의된다.
자바에서도 자바빈 클래스를 생성한 다음 인코더를 사용해 똑같이 할 수 있다.
// In Java
import org.apache.spark.sql.Encoders;
import java.io.Serializable;
public class Bloggers implements Serializable {
private int id;
private String first;
private String last;
private String url;
private String date;
private int hits;
private Array[String] campaigns;
// JavaBean getters and setters
int getID() { return id; }
void setID(int i) { id = i; }
String getFirst() { return first; }
void setFirst(String f) { first = f; }
String getLast() { return last; }
void setLast(String l) { last = l; }
String getURL() { return url; }
void setURL (String u) { url = u; }
String getDate() { return date; }
Void setDate(String d) { date = d; }
int getHits() { return hits; }
void setHits(int h) { hits = h; }
Array[String] getCampaigns() { return campaigns; }
void setCampaigns(Array[String] c) { campaigns = c; } }
// Create Encoder
Encoder<Bloggers> BloggerEncoder = Encoders.bean(Bloggers.class);
String bloggers = "../bloggers.json"
Dataset<Bloggers>bloggersDS = spark
.read
.format("json")
.option("path", bloggers)
.load()
.as(BloggerEncoder);
- 데이터세트 API에서는 미리 데이터 유형을 정의하고 케이스 클래스 또는 자바빈 클래스가 스키마와 일치해야한다. 조건이 까다롭지만, 데이터세트 API로 작업하는 것은 쉽고 간결하며, 선언적이다. 또한 동일한 관계 연산자를 사용할 수 있다는 장점이 있다.
- 샘플 데이터셋을 생성하는 한가지 간단하고 동적인 방법은 SparkSession 인스턴스를 사용하는 것!
샘플 데이터 변환
- 데이터세트 : 도메인별 객체의 강력하게 정형화된 컬렉션
- 이러한 객체는 함수적 연산을 사용해 병렬로 변환할 수 있다.
- map(), reduce(), filter(), select(), aggregate()
- 이러한 객체는 함수적 연산을 사용해 병렬로 변환할 수 있다.
고차 함수 및 함수형 프로그래밍
- filter()로 사용량이 900분을 초과하는 dsUsage 데이터 세트의 모든 사용자 반환하는 방법
- filter 함수에 대한 인수로 함수 표현식 사용 ⇒ filter() 방법에 대한 인수로 람다 식 {d.usage > 900} 사용
- 함수를 정의하고 해당 함수를 filter() 함수의 인수로 제공 ⇒ def filterWithUsage(u: Usage) = u.dump > 900 사용해 스칼라 함수 정의
- 고차함수 map()을 사용해 계산된 값 반환하기
- MapFunction<T> 정의
- 익명 클래스 또는 MapFunction<T>을 상속받는 정의된 클래스일 수 있음
- 계산값이 어떤 사용자와 연관되어 있는지 확인하
- 추가 필드를 사용하여 스칼라 케이스 클래스 또는 자바빈 클래스인 UsageCost 생성
- 비용을 계산하는 함수를 정의하여 map() 함수에 사용
- 스칼라 예제 (단, 여기서 map()으로 계산된 새로운 열과 다른 모든 열을 가짐)
// cost 필드를 포함한 새로운 케이스 클래스 생성
case class UsageCost(uid: Int, uname: String, usage: Int, cost: Double)
// Usage를 인수로 사용하여 사용 비용을 계산
// 새로운 UsageCost 객체 반환
def ComputeUserCostUsage(u: Usage): UsageCost = {
val v = if (u.usage > 750) u.usage * 0.15 else u.usage * 0.50
UsageCost(u.uid, u.uname, u.usage, v)
}
// 기존 데이터세트에 map()을 사용
dsUsage.map(u => computeUserCostUsage(u)).show(5)
- Java 예제
// 자바빈 클래스를 위한 인코더 생성
Encoder<UsageCost> usageCostEncoder = Encoders.bean(UsageCost.class);
// 데이터에 map() 함수를 적용
dsUsage.map( (MapFunction<Usage, UsageCost>) u -> {
double v = 0.0;
if (u.usage > 750) v = u.usage * 0.15;
else v = u.usage * 0.50;
return new UsageCost(u.uid, u.uname, u.usage, v);
});
usageCostEncoder.show(5);
- 고차 함수 및 데이터세트 사용 시 주의점
- 입력된 JVM 객체를 함수에 대한 인수로 사용
- 읽기 쉽게 만든 도트 표기법을 사용하여 형식화된 JVM 객체 내의 개별 필드에 액세스
- 일부 기능 및 람다 시그니처는 형안전(type-safe)이 보장되어 컴파일 시점 오류 감지를 보장하고 스파크에게 어떤 데이터 유형, 어떤 작업을 수행할지 지시할 수 있음
- 코드는 람다 표현식의 자바 또는 스칼라 언어 기능을 사용 → 읽기 쉽고, 표현적이며, 간결
- 스파크는 자바나 스칼라의 고차 함수 생성자 없이도 map(), filter()와 동등한 기능을 제공 → 데이터세트 또는 데이터프레임에서 함수형 프로그래밍을 사용할 필요가 없음 단, 조건부 DSL 연산자나 SQL 표현식 사용가능
- 고차 함수 및 함수형 프로그래밍 in 데이터프레임
- 고차 함수 및 함수형 프로그래밍은 데이터세트에만 있는 것이 아니고 데이터 프레임과 함께 사용 가능하다. 데이터프레임은 Dataset[Row]이며, Row는 다양한 유형의 필드를 저장할 수 있는 일반 비정형 JVM 객체.
- 메서드 시그니처는 행에서 작동하는 식 또는 함수를 사용. 이는 각 행의 데이터 유형이 식 또는 함수에 대한 입력값이 될 수 있음을 의미
- 데이터세트의 경우 데이터 유형에 대한 JVM과 스파크의 내부 이진 형식 간 데이터를 효율적으로 변환하는 메커니즘인 인코더를 사용
- 고차 함수 및 함수형 프로그래밍 in 데이터프레임
데이터프레임을 데이터세트로 변환
- 기존 데이터 프레임 df를 SomeCaseClass 유형의 데이터 집합으로 변환하려면 df.as[SomeCaseClass] 표기법 사용
- 스칼라 예제
val bloggersDS = spark
.read.format("json")
.option("path", "/data/bloggers/bloggers.json")
.load()
.as[Blogger]
- spark.read.format(”json”): DataFrame<Row> 반환
- .as[Blogger]: 인코더를 사용하여 스파크 내부 메모리 표현에서 JVM Blogger 객체로 직렬화/역직렬화
- 인코더: JVM과 내부 텅스텐 포맷 사이를 직렬화하고 역직렬화하는 효율적인 메커니즘
데이터세트 및 데이터 프레임을 위한 메모리 관리
- 스파크는 집중적인 인메모리 분산 빅데이터 엔진 → 메모리를 효율적으로 사용하는 것이 실행 속도에 큰 영향
- Spark 1.0
- 메모리 스토리지, 직렬화 및 역직렬화에 RDD 기반 자바 객체 사용
- 리소스 측면에서 비용이 많이 들고 속다가 느림
- 스토리지가 자바 힙에 할당되어 대규모 데이터세트에 대한 JVM 가비지 컬렉션에 좌우됨
- 텅스텐(Spark 1.x에 도입된 프로젝트)
- 오프셋과 포인터를 사용하여 오프 힙 메모리에 데이터세트와 데이터 프레임을 배치하는 새로운 내부 행 기반 형식
- 오프 힙에 메모리를 직접 할당하는 것은 스파크가 GC(가비지 컬렉션)에 의해 받는 영향을 줄인다는 것을 의미
- Spark 2.x
- 전체 단계 코드 생성 및 벡터화된 칼럼 기반 메모리 레이아웃을 특징으로하는 2세대 텅스텐 엔진 도입 → 최신 컴파일러의 아이디어와 기술 기반으로 제작
- 빠른 병렬 데이터 액세스를 위하여 단일 명령, 다중 데이터(SIMD) 접근 방식의 최신 CPU 및 캐시 아키텍처 활용
데이터 집합 인코더
- 인코더는 오프 힙 메모리의 데이터를 스파크의 내부 텅스텐 포맷에서 JVM 자바 오브젝트로 변환
- 스파크 내부 형식에서 원시 데이터 유형을 포함한 JVM 객체로 데이터세트 객체를 직렬화하고 역직렬화함
- Encoder[T]는 스파크의 내부 텅스텐 형식에서 Dataset[T]로 변환됨
- 스파크는 원시 유형, 스칼라 케이스 클래스 및 자바빈에 대한 인코더를 자동으로 생성할 수 있는 내장 지원 기능을 가지고 있음
- 자바와 크리오(Kryo) 직렬화, 역직렬화에 비교했을 때 스파크 인코더는 상당히 빠름
스파크의 내부 형식과 자바 객체 형식 비교
- 스칼라의 경우, 스파크가 효율적인 변환을 위해 바이트 코드를 자동으로 생성함
- 자바 객체에는 헤더 정보, 해시 코드, 유니코드 정보 등 큰 오버헤드가 있음
- 스파크는 데이터세트 또는 데이터 프레임을 위한 JVM 기반 객체를 생성하는 대신, 오프 힙 자바 메모리를 할당하여 데이터를 레이아웃하고, 인코더를 사용하여 데이터를 메모리 내 표현에서 JVM 객체로 변환함
직렬화 및 역직렬화
- 직렬화 (인코딩) 송신자가 전송 가능한 형태(이진 형식)으로 만들어 객체들의 데이터가 연속적인 데이터로 변환된다. 스트림을 통해 데이터를 수신자가 읽을 수 있다.
- 역직렬화 (디코딩) 수신사에 의해 전송된 스트림 데이터를 다시 역으로 직렬화하여 객체의 형태로 만든다.
기본적으로 JVM에 자체 자바 직렬화기와 역직렬화기가 내장되어 있지만 비효율적이고 느린 데 비해, 데이터세트의 인코더는 몇가지 장점이 있다.
- 힙 메모리에 객체를 저장하며 크기가 작아서 공간을 적게 차지한다.
- 메모리 주소와 오프셋이 있는 간단한 포인터 연산으로 메모리를 가로질러 빠르게 직렬화한다.
- 이진 표현을 스파크 내부 표현으로 빠르게 역직렬화하기 때문에 JVM의 가비지 컬렉션(메모리 관리)로 인한 일시 중지에 영향을 받지 않는다.
데이터세트 사용 비용
데이터 프레임 vs 데이터 세트 : 데이터 세트 몇 가지 이점 존재 but 비용 듦 ex. filter(), map(), flatMap() 사용할 때 스파크 내부 텅스텐 형식 -> JVM 객체로 역직렬화 하는 비용
스파크 인코더 도입 전 경미한 비용 -> 대규모 데이터 세트, 쿼리 (비용 발생)
비용 절감 절략
- 과도한 직렬화, 역직렬화 완화하기 위한 전략
- 쿼리에서 DSL표현 사용, 람다를 고차 함수에 대한 인수로 과도하게 사용 (익명성 높이기) 람다 런타임까지 카탈리스트 옵티마이저에서 익명이며 명확하지 않음 = 사용자가 수행하는 작업을 식별할 수 없으므로 쿼리 최적화 불가
- 직렬화 및 역직렬화 최소화되도록 쿼리 함께 연결 (일반적)
- 람다 및 DSL로 쿼리를 연결하는 비효율적인 방법 vs 효율적인 방법 비교
- 비효율적인 방법은 텅스텐으로부터 직렬화하고 역직렬화 하는 부분을 반복적으로 수행한다.
- DSL만 사용하고 람다를 사용하지 않는 방법은 직렬화/역직렬화가 필요하지 않아 훨씬 효율적이다.
요약
- 스파크가 통합된 고차원 API의 일부인 데이터세트 구성을 수용하기위해, 어떻게 메모리를 관리하는 지 알아봤다.
- 데이터세트 사용과 관련된 일부 비용을 고려했고, 어떻게 비용을 줄일 수 있는 지 방법들을 알아봤다.
- 인코더가 스파크의 내부 텅스텐 이진 형식에서 JVM 객체로 직렬화/역직렬화하는 방법을 간단히 살펴봤다.
'Experiences & Study > PySPARK & Data Engineering' 카테고리의 다른 글
[PySPARK] 정형 스트리밍 Part.1 (0) | 2023.09.05 |
---|---|
[PySPARK] 스파크 애플리케이션의 최적화 및 튜닝 (0) | 2023.09.04 |
[PySPARK] 스파크 SQL과 데이터프레임 Part.2 (2) (0) | 2023.09.03 |
[PySPARK] 스파크 SQL과 데이터프레임 Part.2 (1) (0) | 2023.09.03 |
[PySPARK] 스파크 SQL과 데이터 프레임 Part 1 (0) | 2023.08.31 |