[spark를 이용한 빅데이터 분석 by 안창배 튜터]
[1] 스파크(spark): 컴퓨터와 데이터, 메모리, 클라우드
[2] 스파크(spark): 병렬/분산처리, 샘플링, 분할, Dask, 자동화
1. Spark란?
1) 스파크 등장 배경
- 과거엔 대규모 연산 및 데이터 처리를 프로세서 성능 향상에 맡겼으나, 물리적 방열 등 단일 CPU 기능 개선에는 한계가 있었음
- 하여, 모든 코어가 같은 속도로 동작하는 병렬 CPU 코어를 더 많이 추가하는 방향으로 발전 선회
- 데이터 수집 비용이 저렴해져 데이터 클러스터에서 처리해야 할 만큼 거대해짐
2) 스파크의 역사
- 요약: 스파크는 많은 변화를 겪어 왔으며, 최소 2022년 이후 자료를 살펴보는 것을 추천
- UC 버클리 대학교에서 2009년 스파크 연구 프로젝트로 시작
- 1.0 이전 스파크 초기 버전은 함수형 연산 관점에서 API를 정의
- 스파크 1.0 버전부터 구조화된 데이터를 기반으로 동작하는 신규 API인 스파크 SQL 추가
- 이후 DataFrame, 머신러닝 파이프라인, 자동 최적화를 수행하는 구조의 스트리밍 등 더 강력한 구조체 기반 신규 API 추가
3) 아파치 스파크의 철학
- 통합: 데이터 읽기부터 SQL/머신러닝/스트림까지, 같은 연산 엔진과 일관성 있는 API로 수행 가능하도록 설계 (하나의 프레임워크)
- 컴퓨팅 엔진: 데이터를 연산하는 역할만 수행할 뿐 영구 저장소 역할은 수행하지 않음 (저장소 연동 필요)
- 라이브러리: 엔진에서 제공하는 표준 라이브러리와 오픈 소스(서드파트 패키지 형태)로 제공하는 다양한 외부 라이브러리 지원
2. How Spark Works?
1) 스파크의 기본 구조
① Driver와 Executor
- 관리자인 Driver와 일꾼인 Executor로 구성되며, Cluster Manager가 전반적인 과정을 다 통제함
└ Driver가 일을 나눠주는 역할을 하는데, 스파크 언어 API를 통해 다양한 언어(Pyspark, Scala 등)로 표현
└ 실제 일하는 것은 Executor로, 내부 스파크 코드를 수행함
② JVM와 Scala, Pyspark
- Spark는 Java를 기반으로 개발되어 JVM(Java Virtual Machine)을 통해 구동
└ 단, 파이썬 코드와 통용되는 Pyspark로도 어지간한 것은 감당 가능해 Java와 Scala까지는 안 가도 됨
③ 분산 처리와 Partition
- Partition이란, 모든 executor가 병렬로 일할 수 있도록 데이터를 쪼갠 단위
└ 다른 말로 하면, 클러스터의 하나의 물리적 시스템에 있는 row의 모음 (전체 데이터 쪼개놓은 것 중 하나)
└어느 데이터가 어느 파티션에 들어가도록 직접 조작은 불가하나, 하이레벨 설계만 가능(실행은 스파크가 알아서)
└ 특정 Partition의 메모리 부하가 커지면 OOM으로 스파크가 종료될 수 있음
- Repartition을 통해 파티션 재구성 가능하며, 적절한 Repartition으로 밸런스 조정
- Partition, Repartition 사이의 데이터 통신은 네트워크 통신이므로, 비싼 연산임(ex. 정렬)
└ 가능하면 partition 내부에서 연산이 가능하도록 하는 것이 좋음
2) 스파크의 특징
① Transformation: 게으른 컴퓨터(Lazy computation)
- Spark는 바로바로 주어진 변환Transformation을 수행하지 않음
- 일을 미루고 여러 변환들을 차곡차곡 쌓아둔 뒤, 한 번에 최적화 수행
② Action: 리니지(Lineage)와 Optimization
- 변환이 필요한 일을 수행하도록 구동하는 작업이 Action
└ count, collect, take, top, show, write, toDF 등
- 이때 스파크가 의도적으로 미루어둔 쌓인 일은 Lineage로, 전체 Lineage를 보고 필요 없는 연산은 제거
- Lineage로 병목현상이나 OOM이 확인되기도 함
③ RDD, Dataset, DataFrame
- 2014년 정식 출시된 Spark v1은 RDD API를 이용하여 데이터를 처리했지만, 불편함이 많았음
- Spark 1.3d에서 처리 속도 증가를 위해 DataFrame이 소개되었으며, 현재 PySpark는 대부분 이를 기반으로 함
- Spark 1.6에서 Dataset이 추가되었으며 다양한 용도가 있음
3) Spark 작업의 수행 과정
① DataFrame 등을 이용해 코드 작성
② 정상적인 코드라면 스파크가 논리적 실행 계획으로 변환
└ Driver와 Execute의 정보를 고려하지 않고, 논리적인 가능성만 참조
③ 논리적 실행 계획을 물리적 실행 계획으로 변환
└ 변환 과정에서 테이블의 크기와 파티션 수 등을 고려해 추가 최적화 가능 여부 확인
④ 클러스터에서 물리적 실행 계획(RDD 처리)을 실행
4) Spark 데이터 타입
- Pandas와는 미묘하게 다르지만, 유사
5) 스파크 대시보드
3. 스파크 꿀팁 기능
1) 브로드 캐스트
- 모든 Worker에게 큰 규모의 입력 데이터셋을 효율적으로 제공할 때 사용하는 방법
- 메모리를 효율적으로 사용하는 방법론 중 하나
- Spark에서 브로드캐스트는 모든 Executor가 접근 가능한 공통의 변수
└ spark.sparkContext.broadcast()
└ 너무 큰 데이터는 부하가 심해질 수 있음
- 모든 Executor가 공유하되 반대로 그 값이 가변하는 값은 어큐물레이터
- (cf) 브로드 캐스트는 numpy나 pandas에서도 존재하는 개념 ex) df['num'] + 1
2) Join
- 두 개의 테이블을 병합하는 것으로 pandas의 merge 같은 기능
- 테이블 크기에 따라 전략이 다르나 결국 클러스터 간 통신을 하느냐, 마느냐의 문제
- Big to Big join: 셔플 조인이라고도 부르며, 두 개의 큰 테이블을 병합
└ 모든 Executor가 다른 Executor와 통신
└ 데이터가 어떻게 분할되어있느냐에 따라 혼잡도가 달라짐
- Big to Small join: 하나의 큰 테이블과 다른 작은 테이블을 병합
└ 작은 테이블을 복사 떠서 모든 Executor에게 전송
└ 처음에 대규모 통신을 하고, 이후 노드간 통신이 필요없어 보통 더 빠른 작업이 가능
- Small to Small join: 2개의 작은 테이블 병합. 시간도 오래 안걸리고 공간 차지도 적음
- 기타 Join 팁
└ 미리 데이터가 잘 분할되어 있다면, 셔플 조인도 빠르게 수행 가능
└ 어떤 Join은 Filter의 역할을 수행하기 때문에, Join 순서에 따라 속도가 달라질 수도 있음
3) 고급 I/O
- Inferschema: 데이터 타입을 자동으로 추론하는 기능
└ 다소 느리고 의도하지 않은 타입이 할당되기도 하므로 아예 스키마를 강제하는 것도 좋은 방법
- Write options
└ mode: 'overwrite'라고 명시하지 않으면, 기존의 파일이 있을 때 에러가 남
└ partitionBy: 어떤 칼럼을 기준으로 데이터를 분할해 저장할지 명시
└ maxPartitionBytes: 분할된 파일의 최대 크기를 조정
- 파일 크기 관리가 굉장히 중요한데, 일반적으로 500MB ~ 2GB 정도가 적절
└ 작은 파일을 많이 생성하면 메타 데이터 관리에 부하 발생
└ 파일이 커지면 일부 row만 필요하더라도 전체 데이터 블록 읽어야 하기 때문에 비효율적
- 여러 익스큐터가 같은 파일을 동시에 읽을 수 없지만, 여러 파일을 동시에 읽을 수는 있음
└ 쓰기 작업 전 파티션 수를 조절로 병렬 처리 파일 수 제어 가능
- 복합 데이터 유형(nested)은 모든 파일 포맷에서 적합하지 않으므로 비추천
'TIL' 카테고리의 다른 글
[240321] 클러스터링 분석 - ② 차원 축소: 주성분 분석(PCA)과 t-SNE (0) | 2024.03.21 |
---|---|
[240320] 파이썬으로 하는 클러스터링 분석 - ① 기본 개념과 거리 계산 (0) | 2024.03.20 |
[240319] 파이썬: 코드카타 44 & SQL: 코드카타 156~157 (0) | 2024.03.19 |
[240318] 스파크(spark): 병렬/분산처리, 샘플링, 분할, Dask, 자동화 (3) | 2024.03.18 |
[240318] 파이썬: 코드카타 43 & SQL: 코드카타 149~155 (0) | 2024.03.18 |