본문 바로가기

TIL

[240318] 스파크(spark): 병렬/분산처리, 샘플링, 분할, Dask, 자동화

[spark를 이용한 빅데이터 분석 by 안창배 튜터]

 

[1] 스파크(spark): 컴퓨터와 데이터, 메모리, 클라우드 

 

1. 병렬 처리

1) 병렬 처리란? 
- 여러 개의 코어에서 동시 다발적으로 일을 나누어서 수행하는 것이 병렬처리 

- 주로 단일 컴퓨터(머신) 내의 여러 코어나 프로세서를 사용하여 작업을 동시에 수행

- 파이썬은 기본적으로 1개의 코어를 사용하나 특정 명령어 사용 시 여러 개 코어 사용 가능 

 └ 파이썬에서는 Multiprocessing이나 joblib이 흔히 사용

 └ sklearn의 njobs가 코어 개수를 제어하는 기능


2) 분산 처리

- 네트워크를 통해 연결된 여러 대의 컴퓨터(노드)가 각각의 작업을 수행하는 것

- Spark를 포함해 Ray, Dask 등의 라이브러리 사용 가능

🖥️ CPU vs GPU
- 스파크는 기본적으로 CPU를 사용
- LLM의 발달로 GPU에 대한 수요가 크게 늘었고, 스파크도 GPU 기반 병렬처리를 지원(단, 별도 영역)

 

3) 병렬 처리 vs. 벡터화(Vectorize)

- 전처리 단계에서 벡터화(Vectorize) 여부에 따라 수십 수백배 이상 속도 차이 발생 

- 한 기기의 메모리 안에서 처리가 가능한 규모라면, Spark보다는 연산 최적화하는 것을 권장

- Numpy(Pandas 포함)는 벡터화 연산을 통해 단순 반복작업을 굉장히 빠르게 처리

 └ 특히 for문의 순회를 최적화하는데 많이 사용

 └  개별 머신은 벡터화를 수행하여 속도 최적화

 

4) 병렬처리의 기술 (참고)

- Pickling과 Job broker

 └ Pickling: 스파크 포함 분산/병렬처리 도구는 내부에서 데이터를 Picklie로 변환하여 송수신 함
 └ Job broke: 분산/병렬 처리시 어떤 프로세서에 어떤 데이터를 보내고, 어떤 작업을, 어떻게 취합할지 관리하는 매니저

   · 스파크에서는 클러스터 매니저Cluster Manager가 Yarn, Mesos, Kube 등 이용
   · 병렬 처리에서 명시적으로 드러나는 매니저는 없지만, 내부적으로 해당 기능이 필요

   · Airflow의 경우 Celery나 Rabbit MQ 등을 사용할 수 있습니다

- 스레드와 프로세스
 └ 프로세스는 독립된 메모리 공간을 가지는 반면, 스레드는 같은 메모리 공간을 공유한다는 것이 가장 큰 차이

 └ 파이썬은 멀티프로세싱이 일반적이며, 메모리가 독립돼 있어 각 프로세스가 자원을 공유하지 못 하는 것이 특징 

- Joblib과 머신러닝
 └ Joblib: 파이썬 제공하는 병렬처리 패키지 중 가장 쉽고 안정적으로 쓸 수 있는 패키지(in Sklearn)
 └ k개의 멀티프로세싱을 할 때 k개 완료되어야 다음 순회로 넘어가 약간 비효율적인 것이 단점

     · 20개 작업을 4개의 프로세서로 멀티프로세싱을 하면? 1~4 끝나야 5~8로 넘어감
 └ 그 외 대안으로는 multiprocessing 패키지도 많이 사용

     · 주피터 노트북에서 안되는 것이 단점 

 

2. 대용량 데이터 처리하는 법 (Without Spark)

1) 샘플링(Sampling)

① Random sampling 

- 가장 쉽고 단순한 방법이나 문제가 많아 비추천

- 표본이 우연히 편향될 수 있고, 매번 결과가 달라지는 것이 큰 단점 

② Stratified sampling (층화추출)


- 특정 집단 편향을 줄이기 위해 집단별로 특정 비율을 뽑는 층화추출 방법

- 머신러닝 내 분류Classification 문제에서 자주 다룸 
- 랜덤 샘플링과 유사하게, 매번 결과가 달라진다는 문제가 있음

③ Systematic sampling (체계적/계통 샘플링) ★

3을 기준으로 3씩 건너 뛰는 ID만 추출


- ID와 같이, 패턴이 없거나 순서에 영향을 받지 않는 칼럼을 기준으로 추출하는 방법론 

└ ID 등 기준 칼럼에 패턴이 있는지는 한 번 검사해보는 것을 추천 

- 건너뛰는 단위는 소수(3, 5, 7, 11 등)를 사용하는 것이 특정 패턴을 피하기에 좋음
└ 예를 들어 짝수를 사용할 경우, 짝/홀에 대한 패턴이 잡힐 수 있기 때문

 

▶ 샘플링 시나리오 사례: 커머스의 캠페인 매출 효과 분석 (Spark 활용)

더보기

- 시나리오
 └ 10만명의 유저가 상품 X와 관련된 캠페인 C에 참여 

 └ 상품 X는 다른 프로모션 P에서도 활용이 되었고, 오가닉 판매도 있음 

 └ 상품 X의 판매에 대해 캠페인 C가 얼마나 기여했는지 정량적인 측정이 필요한 상황
- 분석 계획
 (1) 캠페인에 참여한 유저 목록을 테이블로 추출
 (2) 각 유저에 대해, 최근(보통 1주일) 해당 상품에 어떤 경로로 얼마나 노출되었는지 집계         

예시:  프로모션 1회, 자연 노출 2회, 캠페인 1회


 (3)  매출액을 각 비율에 맞추어 분배합니다
     └ 특정 유저의 매출이 10만원이면, 기여도는 프로모션/캠페인 각 2.5만원(25%), 자연 노출 5만원(50%)
     └ 최근 노출에 가중치를 주거나, 세션 내 구매면 바로 직전 노출에 기여도를 할당하는 등 variation 가능

 (4) 각 유저에 대한 복잡한 연산이 들어가므로, 일부 유저에 한 해 먼저 검사하며 로을 개발

 (5) 전체 유저에 대해서는 Spark나 분할처리 사용


2) 분할 처리

- 컴퓨터 메모리에 전체 데이터의 10%가 올라가는 것이 한계라면, 10%씩 나누어 10번 작업하는 것 

- 샘플링과 궁합이 좋은 방법론

- 데이터가 완벽하게 분할되는 경우에 한 해 가능(소셜 네트워크와 같은 그래프는 독립성이 떨어져 분할이 어려움)


① 데이터 분할 설계
- 단일 기기에서 최대 처리 가능한 유저의 수에 따라 소수(ex. 13, 17..) 개로 데이터 분할 partition  

- 13개 파티션을 user_id 기준으로 정의한다면? 

 └ i번째 파티션은 user_id를 13으로 나눈 나머지가 i인 유저의 집단
 └ ex) 14 % 13 = 1이므로, user_id가 14인 user는 1번 파티션에 할당

 └ 분할 대상 칼럼이 숫자가 아니어도, 해싱을 통해 수치형으로 변환 가능


② 집계 데이터 저장
- 각 파티션의 데이터를 불러와, 필요한 연산을 수행한 뒤 최종 결과만을 집계하여 저장 

 └ 모든 분할된 데이터를 하나의 컴퓨터로 올릴 순 없어도, 집계 결과는 한 컴퓨터에서 확인 가능하기 때문

 └ 각 유저와 상품에 대한 모든 이벤트를 포함한 전체 데이터보다 집계된 데이터가 훨씬 작아서 가능! 

- 집계는 결과만 딱 남기는 것보다는, 중간 계산 과정이 있어야 추후 검사가 가능하므로 유저 단위로 남기는 게 좋음

#결과값만 남기는 것은 지양
{
     "campaign_c":568244534.772,
     "promotion_p":420385423.312,
     "organic":15532259.246
}
    
# 유저별 데이터 등 중간 계산 과정 확인 가능하도록 저장
[
     1:{
         "campaign_c":200
         "promotion_p":500
         "organic":0
     },
     3:{
         "campaign_c":1000
         "promotion_p":1000
         "organic":3000
     },
 ...


    
③ Progress 관리하기: Supervisord
* 난이도가 높고 Window에서 활용하기 어려우니 개념만 참고

- Supervisord란? 

└ 다양한 사유로 죽는 프로세스를 반복적으로 부활시켜 컴퓨터게 계속 돌게 하는 기능

특정 배시 스크립트(ex. python3 main.py)를 실행하고, 해당 스크립트가 죽을 때 마다 다시 실행

[참조]

- 설치 및 실행 시 참조 

더보기

[설치]
- Linux: apt-get install supervisor
- Centos: yum install supervisord
- 모두 안될 시 pip 사용: pip install supervisord
 └ config 셋업이 안돼서 비추
- unable to locate package 해결책 [참조]
- unix:///var/run/supervisor.sock no such file 해결

 └ sudo service supervisor stop

 └ sudo service supervisor start

 

[Config]
- vi /etc/supervisord/conf.d 이 폴더 안에 실행할 프로세스를 설정
 └ 기본 스크립트: 파일이 없으면 만들어서 붙여넣기 (pip이나 yum으로 설치할 때 이럴 수 있음)

[unix_http_server]
        file=/tmp/supervisor.sock   ; the path to the socket file
        
        [supervisord]
        logfile=/tmp/supervisord.log ; main log file; default $CWD/supervisord.log
        logfile_maxbytes=50MB        ; max main logfile bytes b4 rotation; default 50MB
        logfile_backups=10           ; # of main logfile backups; 0 means none, default 10
        loglevel=info                ; log level; default info; others: debug,warn,trace
        pidfile=/tmp/supervisord.pid ; supervisord pidfile; default supervisord.pid
        nodaemon=false               ; start in foreground if true; default false
        minfds=1024                  ; min. avail startup file descriptors; default 1024
        minprocs=200                 ; min. avail process descriptors;default 200
        
        [rpcinterface:supervisor]
        supervisor.rpcinterface_factory = supervisor.rpcinterface:make_main_rpcinterface
        
        [supervisorctl]
        serverurl=unix:///tmp/supervisor.sock ; use a unix:// URL  for a unix socket
        
        [program:mysh]
        command=sh my.sh               ; the program (relative uses PATH, can take args)
        process_name=%(program_name)s  ; process_name expr (default %(program_name)s)
        numprocs=1                     ; number of processes copies to start (def 1)
        directory=/tmp                 ; directory to cwd to before exec (def no cwd)
        ;umask=022                     ; umask for process (default None)
        ;priority=999                  ; the relative start priority (default 999)
        ;autostart=true                ; start at supervisord start (default: true)
        startsecs=1                    ; # of secs prog must stay up to be running (def. 1)
        startretries=3                 ; max # of serial start failures when starting (default 3)
        autorestart=true               ; when to restart if exited after running (def: unexpected)
        ;exitcodes=0,2                 ; 'expected' exit codes used with autorestart (default 0,2)
        ;stopsignal=QUIT               ; signal used to kill process (default TERM)
        ;stopwaitsecs=10               ; max num secs to wait b4 SIGKILL (default 10)
        ;stopasgroup=false             ; send stop signal to the UNIX process group (default false)
        ;killasgroup=false             ; SIGKILL the UNIX process group (def false)
        ;user=chrism                   ; setuid to this UNIX account to run the program
        ;redirect_stderr=true          ; redirect proc stderr to stdout (default false)
        stdout_logfile=/tmp/mysh.log   ; stdout log path, NONE for none; default AUTO
        ;stdout_logfile_maxbytes=1MB   ; max # logfile bytes b4 rotation (default 50MB)
        ;stdout_logfile_backups=10     ; # of stdout logfile backups (0 means none, default 10)
        ;stdout_capture_maxbytes=1MB   ; number of bytes in 'capturemode' (default 0)
        ;stdout_events_enabled=false   ; emit events on stdout writes (default false)
        stderr_logfile=/tmp/mysh.err   ; stderr log path, NONE for none; default AUTO
        ;stderr_logfile_maxbytes=1MB   ; max # logfile bytes b4 rotation (default 50MB)
        ;stderr_logfile_backups=10     ; # of stderr logfile backups (0 means none, default 10)
        ;stderr_capture_maxbytes=1MB   ; number of bytes in 'capturemode' (default 0)
        environment=ACTOR="kildong"    ; process environment additions (def no adds)


- 설정 상세
 └ 위의 conf폴더 안에 {process_name}.conf 형식으로 컨피그 파일 생성
 └ 아래는 토큰 카운트 프로세스 예시

    [program:token_count]
    command = python /data/private/repos/llm-stork/src/token_count.py # 실행 스크립트
    directory = /data/private/repos/llm-stork/src # 실행 디렉토리
    user = root # 실행할 유저
    stdout_logfile = /data/private/logs/token_count_larget.py # 로그 저장 위치
    redirect_stderr = true # 에러 출력 여부
    
    [program:program2]
    ...


[Supervisord 실행]
- 실행: sudo supervisord
- 프로세스 실행
 └ 전부 실행: supervisorctl start all
 └ 하나만 실행: supervisorctl start {program_name}
- supervisorctl로 들어가서 status나 start해도 됨
 └ supervisorctl status라는 명령어는
 └  supervisorctl로 들어가서 status를 치는 것과 같음
- 관련 명령어  

 └ status: 현상태 보기
 └ start: 프로세스 시작  

 └ reload: 컨피그 다시 읽으며 재시작 → 설정한 뒤 이걸 해줘야함
 └ stop: 프로세스 중단
 └ restar`: 프로세스 재시작
- 잘 실행되는 예시

- 서버가 죽어서 꼬일 시
 └ 인지 못한 사이에 죽었을 때, 포트가 물려있을 수 있다
 └ 재구동하면 [Errno 98] Address already in use만 반복될 시 (이것은 fastapi의 예시)
 └ netstat -tnlp로 사용 중인 포트 확인 후
 └ 해당 포트에 물려있는 프로세스를 보고 (PID 확인)
 └ kill -0 {PID}

 

3) Dask 

① Dask란?

- 파이썬 한정 Spark 다음으로 유명한 분산 처리 도구 (파이썬 외에는 도구가 다양함)

- NumPy, Pandas 및 sklearn과 같은 Python 라이브러리를 쉽게 확장할 수 있어, 코드가 쉽고 학습 난이도가 낮음

└ 반대로 Spark를 쓸 때는 코드를 많이 바꿔야 함 

- Spark보다는 비교적 가볍게 사용 가능하나 구조 자체는 유사함

 

🖥️ Pandas vs. Dask vs. Spark

 

- 흔히 중간 규모(1~100GB) 데이터를 Dask로 하라고 권장

- 단, 3개를 모두 공부해야 하므로 웬만하면 Pandas, 감당 안되면 Spark 사용도 괜찮은 방법 

 

 

② Dask의 유용성: 싱글 머신으로 큰 파일을 읽을 때

- 수십 기가의 큰 파일을 꼭 읽어야 할 때, dask로 파일 쪼개는 게 유용

└ 보통 parquet는 2기가 짜리로 파일을 청킹(짤라내는 것)하나, 몇십 기가 수준으로 용량이 큰 경우엔 dask 사용 고려

└ dask로 파일을 쪼갠 뒤, pandas로 세부적인 전처리 및 분석 

- 아래 코드와 같이 사용하면 메모리를 초과하는 파일도 처리 가능

from dask import dataframe as dd
from tqdm import tqdm

path = ## 파일 경로
df = dd.read_parquet(path)

df = df.repartition(npartitions=20) # 20개로 분할

for i in tqdm(range(df.npartitions)):
    df.partitions[i].to_parquet(f"{i}.parquet")

 

③ Ray, modin, RAPIDs… 


- 많은 분산처리 도구가 있고, 각기 장점이 있다고 하지만, Spark와 Pandas로 충분

- 보통 일부 기능이 구현돼 있지 않거나, 굳이 따로 공부하는 게 번거롭거나, 자잘한 오류가 많음 

 

4) 자동화

- 스파크는 보통 주기적인 데이터 처리에 많이 쓰이기 때문에 스케쥴링 및 자동화를 함께 다루는 경우가 다수 

 

① 자동화가 필요한 이유는? 

└ Batch job:주기적인 대규모 데이터에 대한 처리

Streaming job: 실시간으로 발생하는 데이터에 대한 처리


② 스케쥴링과 Cron
- Cron은 가장 쉽고 단순한 스케쥴링 도구로 프트웨어 이름인 동시에, 스케쥴링 표현식의 이름이기도 함

 * 윈도우에서는 사용이 다소 다르고, 복잡

crontab에 하루에 1번 돌려라는 식으로 스케쥴링 명령어를 적는 것
스케쥴링 명령어: 5개의 문자열로 스케쥴을 관리


- 쉽게 세팅할 수 있으나 단점이 많아 규모 있는 회사에서는 Airflow나 Jenkins를 많이 사용
└ cron job이 실패할 경우, 따로 알람이 오지는 않으며, 알림 세팅해도 프레세스 죽으면 끝

cron이 도는 서버 자체가 죽으면 확인이 어렵고, 로그 제외 히스토리 파악이 힘듦

GUI를 따로 제공하지 않아 명령어 기반으로 해야 하며, 로드 밸런싱을 포함한 자원 관리를 지원하지 않음

③ 모니터링과 히스토리 관리
- 주기적인 Batch job을 설정 후, 의도대로 수행되지 않았을 때 알림이 오도록 설정하는 것이 모니터링 

└ 보통 슬랙 등 메신저로 수신하거나 자체 모니터링 도구 지원 툴이 있지만 수동 설정도 어렵지 않음 

다음은 일반적인 batch job 함수 예시

...

if __name__ == "__main__":
    try:
        main()
    except Exception as e:   #에러 메시지 보내기
        post_to_slack(str(e), True)


- 완료된 job들이 언제 어떻게 수행됐는지 파악하는 것 또한 중요
└ 히스토리 관리를 위해 코드 단에서 어떻게 작업이 수행되었는지 기록logging하는 것이 중요
다음은 파이썬의 logging 사용 예시

import logging

logging.basicConfig(
    format='%(asctime)s %(levelname)s:%(message)s',
    level=logging.DEBUG,
    datefmt='%m/%d/%Y %I:%M:%S %p',
)

logging.debug("Sample debug message")
>> 04/23/2022 04:33:13 PM DEBUG:Sample debug message