데이터 파이프라인 구축

클라우드 아키텍트를 넘어서 데이터 아키텍트가 되어보기

SoniaComp
16 min readJul 26, 2021

➡️ 렛츠고

나의 아키텍처 설계 원칙을 갖고, 선배 개발자들과 의논할 수 있는 사람이 되고 싶다. 그래서 내 의견을 말하고, 다른 분의 의견에 대해 반박이나 지지하는 의견을 제시할 수 있게 하자!

[ 데이터 플랫폼 구축하기 ]
- 데이터 플랫폼의 고객: 데이터
- 아키텍처
- 클라이언트와의 인터랙션
- 데이터 수집 방법
- 일괄 처리 (Batch Job)
- 스트리밍 데이터 처리 (실시간 데이터 처리)
- 데이터 접근
- Case Study) Airbnb의 이벤트 로그 처리
- Case Study) LINE 광고 데이터 파이프라인 BigDB
[ 더 알아보기 ]
- Spark
- Kafka
- Scala

[ 데이터 플랫폼 구축하기 ]

데이터 플랫폼의 고객: 데이터

정형 데이터

  • 데이터베이스의 정해진 규칙(Rule)에 맞게 데이터를 들어간 데이터 중에 수치 만으로 의미 파악이 쉬운 데이터
  • 비정형 데이터를 전처리 작업을 통해 정형 데이터로 만들기도 한다.

비정형 데이터

  • 로그: 클라이언트 생성 로그와 서버 생성 로그가 있다.
  • 정해진 규칙이 없어서 값의 의미를 쉽게 파악하기 힘든 경우
  • 텍스트, 음성, 영상과 같은 데이터

반정형 데이터

  • 일반적인 데이터 베이스는 아니지만 스키마를 가지고 있는 형태 (연산 기능을 제공하는 API 없음)
  • JSON, XML
  • 로그: 클라이언트 생성 로그와 서버 생성 로그가 있다.

Big Data

  • Volume : 크기
  • Velocity : 빠르게 처리하고 분석할 수 있는 속성
  • Variety : 다양한 종류의 데이터
  • Value + Veracity

아키텍처

[ 빅데이터 솔루션 워크로드 유형 ]

  • 미사용 빅데이터 원본의 일괄 처리
  • 사용중인 빅데이터의 실시간 처리
  • 빅데이터의 대화형 탐색
  • 예측 분석 및 기계학습

[ 빅데이터 아키텍처 구성요소 ]

1. 데이터 원본

  • 애플리케이션 데이터 저장소 (관계형 DB)
  • 애플리케이션에서 생성하는 정적파일 (웹 서버 로그 파일)
  • 실시간 데이터 원본 (IoT 디바이스)

2. 데이터 스토리지

  • Data Lake: 대용량 파일을 다양한 형식으로 저장할 수 있는 분산 파일 저장소에 저장

3. Batch Processing

  • 필터링, 집계 및 그 밖의 분석을 위한 데이터를 준비하기 위해 데이터 파일을 처리
  • Azure Data Lake Analytics

4. 실시간 메시지 수집

  • 스트림 버퍼링
  • 메시지에 대한 버퍼로 작동하고 스케일 아웃 처리, 안정적인 전달 및 기타 메시지 큐 의미 체계를 지원하는 메시지 수집 저장소
  • Azure Event Hubs, Azure IoT Hub, Kafka

5. 스트림 처리

  • 필터링, 집계 및 그 밖의 분석을 위한 데이터 준비를 통해 해당 메시지를 처리
  • 처리된 스트림 데이터는 출력 싱크에 기록
  • Azure Stream Analytics, Spark Streaming

6. 분석 데이터 저장소

  • 대다수의 빅데이터 솔루션은 분석할 데이터를 준비한 다음, 분석 도구를 사용하여 쿼리할 수 있는 구조화된 형식으로 처리된 데이터를 제공
  • BI 솔루션에서 볼 수 있는 Kimball 스타일의 관계형 데이터 웨어하우스
  • HBase 같이 대기 시간이 짧은 NoSQL 기술
  • 분산 데이터 저장소의 데이터 파일에 대한 메타 데이터 추상화를 제공하는 대화형 Hive Database
  • Azure Synapse Analytics, HDInsight

7. 분석 및 보고

  • 분석 및 보고를 통해 데이터에 대한 정보를 제공하는 것
  • Azure Analysis Services: 데이터 모델링 계층
  • BI

8. 오케스트레이션

  • 파이프라인은 DAG(방향성 있는 비순환 그래프, Directed Acyclic Graph)라는 엔티티로 통합.
  • Azure Data Factory

[ 람다 아키텍처 ]

출처: https://docs.microsoft.com/ko-kr/azure/architecture/data-guide/big-data/
  • 매우 큰 데이터 집합을 사용할 경우 클라이언트가 필요로 하는 쿼리 종류를 실행하는 데 시간이 오래 걸릴 수 있다.
  • 실시간으로 수행 X
  • MapReduce 와 같이 전체 데이터 집합에 대해 병렬로 작동하는 알고리즘이 필요
  • 원시 데이터와 별도로 저장되고 쿼리에 사용됨
  • 일부 결과를 실시간으로 가져온 후 (정확도가 다소 손실될 수 있음 — 데이터 슬라이딩 기간 처리) 결과를 일괄 처리 분석의 결과와 조합
  1. 일괄 처리 계층(실행 부하 적음): 모든 데이터를 원시 형식으로 저장 & 해당 데이터에 대해 일괄 처리 수행. 시기 적절성은 떨어짐.
    ( 데이터 변경 불가능. 특정 데이터 값의 변경 내용은 새 타임 스탬프 이벤트 레코드로 저장됨 → 모든 시점에서 재계산을 수행할 수 있음. )
    → Batch Layer [ Master Data ]: 시간이 많이 소요되는 높은 정확도의 계산
    → Serving Later [ Batch views ]: 효율적인 쿼리를 위해 인덱싱 제공
  2. 빠른 레이어(실행 부하 과다): 데이터를 실시간으로 분석. 정확도는 떨어지지만 짧은 대기시간을 제공하도록 디자인 됨.
    → Real-time views: 가장 최근 데이터를 기준으로 하는 증분 업데이트, Serving Layer 업데이트

단점: 복잡. 데이터 경로가 둘로 나뉘어서 중복된 계산 논리가 발생할 수 있다.

[ 카파 아키텍처 ]

출처: https://docs.microsoft.com/ko-kr/azure/architecture/data-guide/big-data/
  • 단일 경로
  • 데이터 전체가 수집 (Long-term store)
  • 모든 이벤트 처리가 입력 스트림에서 수행되고 실시간 보기오 유지됨 (Speed Layer)
  • 전체 데이터 집합을 다시 계산해야 하는 경우 스트림을 재생하기만 하면 된다.

[ IoT (사물 인터넷) : 이벤트 기반 아키텍처 ]

출처: https://docs.microsoft.com/ko-kr/azure/architecture/data-guide/big-data/

클라이언트와의 Interaction

  • SDK(Software Development Kit): SW 개발에 필요한 기능들을 묶어둔 것. API도 포함되어 있다. [ 도구 집합 ]
  • API 서버: 어플리케이션 서버가 제공하는 기능을 API 요청을 호출받아 제공하는 것. [ 인터페이스 ]

데이터 수집 방법

  • 정밀도와 데이터 처리 속도 사이에 상호 절충 관계가 있다.

어떤 데이터를 받아야 할까? ex. Active User

  • 사용자의 행동으로 인한 이벤트 발생에 대해 프론트에서 로그 수집
  • 이벤트 발생할때마다 보내기 보다는, 백그라운드에서 Async 하게 사용자 행동 로그를 수집. → 서비스 품질에 영향을 미치지 않음 & 데이터 누락을 줄임.
  • Logging Rule
    1) 시간
    2) Local Storage에 Data holding → 특정 수 이상, 특정 시간 이상 발송 (로컬 저장소는 한정되어 있기 때문에 무한정 홀딩할 수 없음)
  • 특정 이벤트별로 데이터 수집

데이터 전송 처리 과정

[ 전송 확인 여부 ]

  • HTTP Status Code: 200

[ 시간 기준 ]

  • 클라이언트 타임
    → 타임 싱크가 안되어 있으면?
  • 서버마다 시간이 다르고, 서버가 받는 시간과 클라이언트가 데이터를 생성하는 시간 다름
  • 시간 보정 방법
    → Tik 기술: 싱크 맞추는 옵션. 최초의 이벤트 발생 시 시간 보정.
    → 보정 정책: 데이터 정밀도가 크게 중요하지 않은 경우 차선책으로 선택.

[ 데이터 재전송으로 인한 중복 피하기 ]

  • 웹서버가 App 서버로 부터 성공 응답을 받기 전에 서버가 다운된 경우 (NginX)
  • 고유한 ID 값 부여

[ 전송 시간으로 인한 Delay ]

일괄 처리 (Batch Job)

일괄 데이터 처리 흐름 (참고: https://docs.microsoft.com/ko-kr/azure/architecture/data-guide/big-data/batch-processing)
  • 원본 데이터는 원본 Application 자체에 의해 또는 오케스트레이션 워크플로우에 의해 데이터 스토리지에 로드
  • 데이터는 병렬 작업에 의해 내부에서 처리됨
  • 변환된 결과가 분석 및 보고 구성 요소에 의해 쿼리될 수 있는 저장소로 로드
  • ex. 기계 학습을 위한 모델링 준비 데이터 제공
  • ex. csv, json 파일 집합을 쿼리 준비가 완료된 스키마화 및 구조화 형식으로 변환

과제

  • 데이터 형식 및 인코딩: 데이터 로드 및 구분 분석 논리는 이런 문제를 감지하여 처리
  • 오케스트레이션 시간 조작: 잘못된 순서의 레코드를 처리

스트리밍 데이터 처리 (실시간 데이터 처리)

실시간 데이터 처리 흐름 (참고: https://docs.microsoft.com/ko-kr/azure/architecture/data-guide/big-data/real-time-processing)
  • 실시간으로 캡처되고 최소 대기 시간으로 처리되는 데이터 스트림을 사용하여 처리
  • ex. 센서 데이터를 사용하여 높은 트래픽 볼륨을 감지
  • 바인딩되지 않은 입력 데이터의 스트림 처리로 정의되며 처리 대기 시간이 매우 짧다
  • JSON 같은 비구조화, 반구조화 형식으로 도착
  • 실시간 사용을 지원하기 위해 좀 더 짧은 소요시간을 유지

과제

  • 실시간으로 (고용량)메시지를 수집, 처리 및 저장하는 것
    (수집 파이프라인을 차단하지 않는 방식으로 처리를 수행해야 함)
  • 데이터 저장소는 대용량 쓰기를 지원해야 함
  • 작업을 빠르게 수행하는 것

실시간 데이터

  • Near Real-Time: 초 단위 수준의 지연 시간 보장
  • Real-Time: 밀리 세컨드 수준의 데이터 처리 보장
  • Real-Real-Time: 마이크로 세컨드 수준의 데이터 처리 보장

데이터 접근

NoSQL

  • 빠르다: HashMap 구조를 사용
  • 구조가 정해져있지 않기 때문에 자유로운 데이터 추가가 가능

인덱스

출처: https://12bme.tistory.com/138

컴퓨터의 CPU 나 메모리와 같은 전기적 특성을 띤 장치의 성능은 짧은 시간 동안 매우 빠른 속도로 발전했지만, 디스크와 같은 기계식 장치의 성능은 상당히 제한적으로 발전.
→ DB 의 성능 튜닝은 어떻게 디스크 I/O 를 줄이는지와 연관됨.

[ 저장 매체 ]

  1. 내장 디스크(Internal Disk): 개인용 PC 의 본체 내에 장착된 디스크와 같은 매체.
    서버용으로 사용되는 디스크는 개인 PC 에 장착되는 것보다는 빠르고 안정적임.
  2. DAS (Direct Attached Storage)
    내장 디스크의 용량 문제를 해결하기 위해 사용하는 것. 디스크만 있음.
    컴퓨터 본체에 연결해서 사용해야 함.
    최근의 DAS 는 디스크를 최대 200개까지 장착할 수 있는 것들도 있기 때문에 대용량 디스크가 필요한 경우 적합.
    DAS 는 내장 디스크와 같이 컴퓨터 본체와 SATA 나 SAS 또는 SCSI 케이블로 연결
    단점) 반드시 하나의 컴퓨터 본체에 연결해서 사용해서 디스크 정보를 여러 컴퓨터가 동시에 공유하는 것은 불가능
  3. NAS (Network Attached Storage)
    여러 컴퓨터에서 동시에 사용할 수 있음.
    TCP/IP 를 통해 컴퓨터에 연결. SATA 나 SAS 방식의 직접 연결보다 속도가 느림.
    빈번한 데이터 읽고 쓰기가 필요한 DB 서버용으로는 거의 사용되지 않음.
  4. SAN (Storage Area Network)
    SAN 은 DAS 로 구축할 수 없는 아주 대용량의 스토리지 공간을 제공하는 장치
    여러 컴퓨터에서 동시에 사용할 수 있을 뿐더러 컴퓨터 본체와 광케이블로 연결되기 때문에 상당히 빠르고 안정적인 데이터 처리(읽고 쓰기)를 보장합니다.
    그만큼 고가의 구축 비용이 들기 때문에 각 기업에서는 중요 데이터를 보관할 경우에, 일반적으로 사용됩니다.

[ SSD : Solid State Drive ]
대부분의 저장 매체는 Platter(플래터, 디스크 드라이브 내부의 데이터 저장용 원판)를 회전 시켜서 데이터를 읽고 쓰는 기계적인 방식을 사용.
→ 플래터방식의 기계식 디스크 드라이브를 대체하기 위한 전자식 저장 매체.

SSD는 기존의 디스크 드라이브에서 데이터 저장용 플래터를 제거하고 대신 플래시 메모리를 장착하고 있음. 플래시 메모리는 전원이 공급되지 않아도 데이터가 삭제 되지 않음. 또 컴퓨터 메모리 보다는 느리지만 디스크 드라이브 보다는 훨씬 빠름.

[ 랜덤 I/O 와 순차 I/O ]
순차 I/O 는 연속된 3개의 페이지를 접근하게 되는 방식이라 디스크에 기록하기 위해 한번 시스템 콜을 요청하지만 랜덤 I/O 는 3개의 페이지를 디스크에 기록하기 위해 3번의 시스템 콜을 하게 되는 방식. 디스크의 성능은 디스크 헤더의 위치 이동 없이 얼마나 많은 데이터를 한번에 기록하는 것에 의해 결정됨.

랜덤 I/O 를 줄이기 위해, 쿼리를 처리하는 데 꼭 필요한 데이터만 읽도록 함.

  • OLTP: On-Line Transaction Processing(데이터 갱신 위주)
  • OLAP: On-Line Analytic Processing(데이터 조회 위주)

데이터 베이스 서버에는 캐시 메모리가 장착된 RAID 컨트롤러가 일반적으로 사용되는데, RAID 컨트롤러의 캐시 메모리는 아주 빈번한 파일 동기화 작업이 호출되는 순차 I/O 를 효율적으로 처리될 수 있게 변환하는 역할을 함.

[ 인덱스 ]

인덱스를 하나 더 추가할지 말지는 데이터의 저장 속도를 어디까지 희생할 수 있는지, 읽기 속도를 얼마나 더 빠르게 만들어야 하는지의 여부에 따라 결정돼야 함.

인덱스 알고리즘은 다음과 같다.

  • B-Tree 알고리즘
  • Hash 인덱스
  • Fractal-Tree 알고리즘

Case Study) Airbnb의 이벤트 로그 처리

참고: https://brunch.co.kr/@sonjoosik/3

Airstream : Airbnb의 Streaming 처리 프레임워크

  1. Spark 의 병렬성은 Kafka 파티션 수에 의해 결정
    → 균형잡힌 Spark Kafka Reader
    Kafka 파티션과 Spark Task 사이에는 1대 1 관계가 성립
    기본적으로 이벤트가 Spark 에서 처리될 때 순서를 보장하기 위해서 하나의 Spark Task는 하나의 Kafka 파티션에서 데이터를 읽도록 만들어져 있음
  2. 이벤트 용량과 크기 쏠림 현상
    → 균형 잡힌 파티션 알고리즘
    다양한 이벤트 종류들은 저마다 제 각각의 용량과 크기로 기록된다.
    QPS(Queries-per-seoncd)가 서비스마다 다르다.
  3. 실시간 수집과 여유 공간
    → 상단과 하단 시스템의 발전
    실시간 작업의 속도를 따라잡기 위해 상당히 긴 시간이 걸림.
    수평적인 확장을 용이하게 하기 위해 어떻게 partition 을 구성해야 하는가 하는 문제
    → Kafka Reader 를 개선함으로써 완벽하게 극복. 이벤트 로그 처리 시간의 불균형이 병목 지점이니까 이런 불균형을 없애기 위해 처리 시간을 균등하게 분배해야 함.

Case Study) LINE 광고 데이터 파이프라인 BigDB

참고: https://engineering.linecorp.com/ko/blog/bigdb-an-ad-data-pipeline-for-line/

Big DB의 기능

  • 스키마의 생성과 관리를 통해 스트리밍 데이터와 정적인 데이터를 hive Context Table로 생성하는 흐름을 제어하고, 사용자 요청의 데이터 형식을 지정하여 데이터를 조회하는 기능을 제공
  • 데이터 읽기, 쓰기를 위한 멀티 세션을 지원
  • 분석 전에 분석에 필요한 사용자 요청의 데이터를 Spark SQL 기반으로 실시간 데이터 Join 기능을 제공
  • 저장소를 크게 두 개로 구분하고, 사용자 요청의 가공 데이터를 스케일에 맞는 저장소를 지정하여 가용성을 확보할 수 있는 구조를 제공

컴포넌트

  • Message Proxy: JSON 데이터를 수집하고 Kafka 의 정해진 Topic 에 Produce함
  • Kafka: 수집된 JSON 데이터를 7일간 보관하며, Partition은 스트리밍에서 사용하는 코어의 개수에 따라 변경
  • Streaming: Spark를 사용하며, 5초 주기로 Task를 수행하고 간단한 데이터 변환 작업 후 스키마에서 지정한 Table에 Data Frame을 InsertionInto로 추가하게 됨. Streaming 이 두개로 나누어져 있고, 왼쪽은 집계 데이터를 위한 Table, 오른쪽은 원본 데이터와 원본과의 Join 데이터를 위한 Table을 다루게 됩니다.
  • HDFS: 수집된 JSON 데이터를 압축한 후 Parquet 형식으로 저장하게 됨. 왼쪽은 SSD 디스크(읽기/쓰기 빠름)를, 오른쪽은 SATA 디스크(용량이 큼)를 사용하고 있다. Federation 설정으로 각각의 HDFS 는 Namespace 만으로 편하게 접근할 수 있다.
  • Spark: 재플린, Streaming, BigDB 각각 별도의 세션으로 동작을 하며, Hive Meta-Store를 사용하여 Table은 세션 간에 공유됩니다. Locality를 위해서 Table(실제 데이터 파일)이 존재하는 파트의 자원을 사용
  • End Point: Web/Zeppelin/BigDB API 등에서 Spark의 자원과 Table을 사용하게 됩니다.
  • BigDB Core/API: akka.http를 사용하고 있습니다. 스키마의 생성과 관리, 쿼리 기반의 집계나 스케줄링된 작업의 수행을 관리합니다.

[ 더 알아보기 ]

Spark

  • In-Memory 기반의 데이터 처리
    디스크 기반인 Hadoop 보다 훨씬 빠르다.
  • 분산 병렬 처리
출처: https://spark.apache.org/docs/latest/cluster-overview.html

Kafka

참고 medium 블로그 링크

  • 아파치 재단의 카프카는 Pub-Sub 모델 메시지 큐
  • 분산 환경에 특화되어 설계되어 있다는 특징을 가짐
  • 클러스터 구성, fail-over, replication과 같은 여러 가지 특징을 가짐

Topic, Partition

참고 링크

  • Topic: 데이터를 최종적으로 저장하는 곳
  • 데이터를 구분하기 위한 분류값 혹은 구분된 저장소
  • 카프카는 데이터를 주고 받을 때 지정된 토픽으로 주고 받게 되며, 토픽을 어떻게 설계할 것인지가 핵심이 될 수 있다.
  • Partition: 저장소 안에 분리되어진 공간
  • 데이터를 더 많이 빨리 보내고 처리하기 이해 만들어진 곳
  • 파티션이 많아지면, 노드(브로커)가 다운이 됐을 경우 관리해야 할 포인트가 많아지고 오래 걸림. → 파티션 단위로 분산처리를 하기 때문.

Producer, Consumer

  • Producer: 메시지를 생산하는 주체
  • 메시지를 만들고 Topic 에 메시지를 쓴다.
  • Consumer: 메시지를 소비하는 주체

Consumer Group

  • Consumer Group: 하나의 Topic 에 대한 책임을 가지고 있음.

Broker, Zookeeper

  • Broker: 카프카 서버

Replication

  • 수평적 스케일 아웃

Scala

나는 이전에 파이썬 개발자였던 만큼 파이썬을 기준으로 설명하겠다.

  • val, var : 상수와 변수 구분
  • 파이썬은 인덴테이션으로 블록 구분 하지만, Scala 는 { } 로 구분
  • 파이썬에서 지원하는 API 와 유사한 API
  • Collection API 로 Array, Set, List(불연속 메모리 할당, 포인터 접근), Map
  • 객체지향과 함수형을 모두 지원

--

--

SoniaComp

Data Engineer interested in Data Infrastructure Powering Fintech Innovation (https://www.linkedin.com/in/sonia-comp/)