데이터 품질과 거버넌스
데이터 품질과 거버넌스¶
개요¶
데이터 품질은 데이터의 정확성, 완전성, 일관성을 보장하는 것이고, 데이터 거버넌스는 데이터 자산을 체계적으로 관리하는 프레임워크입니다. 신뢰할 수 있는 데이터 파이프라인을 위해 필수적입니다.
1. 데이터 품질 차원¶
1.1 품질 차원 정의¶
┌────────────────────────────────────────────────────────────────┐
│ 데이터 품질 6대 차원 │
├────────────────────────────────────────────────────────────────┤
│ │
│ 1. 정확성 (Accuracy) │
│ - 데이터가 실제 값을 올바르게 반영하는가? │
│ - 예: 고객 이메일이 유효한 형식인가? │
│ │
│ 2. 완전성 (Completeness) │
│ - 필요한 모든 데이터가 존재하는가? │
│ - 예: 필수 필드에 NULL이 없는가? │
│ │
│ 3. 일관성 (Consistency) │
│ - 데이터가 여러 시스템 간 일치하는가? │
│ - 예: 주문 수가 주문 테이블과 집계 테이블에서 동일한가? │
│ │
│ 4. 적시성 (Timeliness) │
│ - 데이터가 적절한 시간 내에 제공되는가? │
│ - 예: 실시간 대시보드가 5분 내 갱신되는가? │
│ │
│ 5. 유일성 (Uniqueness) │
│ - 중복 데이터가 없는가? │
│ - 예: 동일한 주문이 중복 기록되지 않았는가? │
│ │
│ 6. 유효성 (Validity) │
│ - 데이터가 정의된 규칙을 준수하는가? │
│ - 예: 날짜가 올바른 형식인가? │
│ │
└────────────────────────────────────────────────────────────────┘
1.2 품질 메트릭 예시¶
from dataclasses import dataclass
from typing import Optional
import pandas as pd
@dataclass
class DataQualityMetrics:
"""데이터 품질 메트릭"""
table_name: str
row_count: int
null_count: dict[str, int]
duplicate_count: int
freshness_hours: float
schema_valid: bool
def calculate_quality_metrics(df: pd.DataFrame, table_name: str) -> DataQualityMetrics:
"""품질 메트릭 계산"""
# 완전성: NULL 수
null_count = {col: df[col].isna().sum() for col in df.columns}
# 유일성: 중복 수
duplicate_count = df.duplicated().sum()
return DataQualityMetrics(
table_name=table_name,
row_count=len(df),
null_count=null_count,
duplicate_count=duplicate_count,
freshness_hours=0, # 별도 계산 필요
schema_valid=True # 별도 검증 필요
)
def quality_score(metrics: DataQualityMetrics) -> float:
"""0-100 품질 점수 계산"""
scores = []
# 완전성 점수 (NULL 비율)
total_cells = metrics.row_count * len(metrics.null_count)
total_nulls = sum(metrics.null_count.values())
completeness = (1 - total_nulls / total_cells) * 100 if total_cells > 0 else 100
scores.append(completeness)
# 유일성 점수 (중복 비율)
uniqueness = (1 - metrics.duplicate_count / metrics.row_count) * 100 if metrics.row_count > 0 else 100
scores.append(uniqueness)
return sum(scores) / len(scores)
2. Great Expectations¶
2.1 설치 및 초기화¶
# 설치
pip install great_expectations
# 프로젝트 초기화
great_expectations init
2.2 기본 사용법¶
import great_expectations as gx
import pandas as pd
# Context 생성
context = gx.get_context()
# 데이터 소스 추가
datasource = context.sources.add_pandas("my_datasource")
# 데이터 에셋 정의
data_asset = datasource.add_dataframe_asset(name="orders")
# DataFrame 로드
df = pd.read_csv("orders.csv")
# Batch Request
batch_request = data_asset.build_batch_request(dataframe=df)
# Expectation Suite 생성
suite = context.add_expectation_suite("orders_suite")
# Validator 생성
validator = context.get_validator(
batch_request=batch_request,
expectation_suite_name="orders_suite"
)
2.3 Expectations 정의¶
# 기본 Expectations
# NULL 없음
validator.expect_column_values_to_not_be_null("order_id")
# 유니크
validator.expect_column_values_to_be_unique("order_id")
# 값 범위
validator.expect_column_values_to_be_between(
"amount",
min_value=0,
max_value=1000000
)
# 허용 값 목록
validator.expect_column_values_to_be_in_set(
"status",
["pending", "completed", "cancelled", "refunded"]
)
# 정규식 매칭
validator.expect_column_values_to_match_regex(
"email",
r"^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$"
)
# 테이블 행 수
validator.expect_table_row_count_to_be_between(
min_value=1000,
max_value=1000000
)
# 컬럼 존재
validator.expect_table_columns_to_match_set(
["order_id", "customer_id", "amount", "status", "order_date"]
)
# 날짜 형식
validator.expect_column_values_to_match_strftime_format(
"order_date",
"%Y-%m-%d"
)
# 참조 무결성 (다른 테이블)
validator.expect_column_values_to_be_in_set(
"customer_id",
customer_ids_list # 고객 테이블의 ID 목록
)
# Suite 저장
validator.save_expectation_suite(discard_failed_expectations=False)
2.4 검증 실행¶
# Checkpoint 생성 및 실행
checkpoint = context.add_or_update_checkpoint(
name="orders_checkpoint",
validations=[
{
"batch_request": batch_request,
"expectation_suite_name": "orders_suite"
}
]
)
# 검증 실행
result = checkpoint.run()
# 결과 확인
print(f"Success: {result.success}")
print(f"Statistics: {result.statistics}")
# 실패한 Expectations 확인
for validation_result in result.list_validation_results():
for exp_result in validation_result.results:
if not exp_result.success:
print(f"Failed: {exp_result.expectation_config.expectation_type}")
print(f" Column: {exp_result.expectation_config.kwargs.get('column')}")
print(f" Result: {exp_result.result}")
2.5 데이터 문서 생성¶
# Data Docs 빌드 및 열기
context.build_data_docs()
context.open_data_docs()
3. Airflow 통합¶
3.1 Great Expectations Operator¶
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
import great_expectations as gx
def validate_data(**kwargs):
"""Great Expectations 검증 Task"""
context = gx.get_context()
# Checkpoint 실행
result = context.run_checkpoint(
checkpoint_name="orders_checkpoint"
)
if not result.success:
raise ValueError("Data quality check failed!")
return result.statistics
with DAG(
'data_quality_dag',
start_date=datetime(2024, 1, 1),
schedule_interval='@daily',
) as dag:
validate = PythonOperator(
task_id='validate_orders',
python_callable=validate_data,
)
3.2 커스텀 품질 검사¶
from airflow.operators.python import PythonOperator, BranchPythonOperator
def check_row_count(**kwargs):
"""행 수 검증"""
import pandas as pd
df = pd.read_parquet(f"/data/{kwargs['ds']}/orders.parquet")
row_count = len(df)
# XCom으로 메트릭 저장
kwargs['ti'].xcom_push(key='row_count', value=row_count)
if row_count < 1000:
raise ValueError(f"Row count too low: {row_count}")
return row_count
def check_freshness(**kwargs):
"""데이터 신선도 검증"""
from datetime import datetime, timedelta
# 파일 수정 시간 확인
import os
file_path = f"/data/{kwargs['ds']}/orders.parquet"
mtime = datetime.fromtimestamp(os.path.getmtime(file_path))
age_hours = (datetime.now() - mtime).total_seconds() / 3600
if age_hours > 24:
raise ValueError(f"Data too old: {age_hours:.1f} hours")
return age_hours
def decide_next_step(**kwargs):
"""품질 결과에 따른 분기"""
ti = kwargs['ti']
row_count = ti.xcom_pull(task_ids='check_row_count', key='row_count')
if row_count > 10000:
return 'process_large_batch'
else:
return 'process_small_batch'
with DAG('quality_checks_dag', ...) as dag:
check_rows = PythonOperator(
task_id='check_row_count',
python_callable=check_row_count,
)
check_fresh = PythonOperator(
task_id='check_freshness',
python_callable=check_freshness,
)
branch = BranchPythonOperator(
task_id='decide_processing',
python_callable=decide_next_step,
)
[check_rows, check_fresh] >> branch
4. 데이터 카탈로그¶
4.1 카탈로그 개념¶
┌────────────────────────────────────────────────────────────────┐
│ 데이터 카탈로그 │
├────────────────────────────────────────────────────────────────┤
│ │
│ 메타데이터 관리 시스템: │
│ │
│ ┌────────────────────────────────────────────────────────┐ │
│ │ 기술 메타데이터 │ │
│ │ - 스키마, 데이터 타입, 파티션 │ │
│ │ - 위치, 형식, 크기 │ │
│ │ - 생성일, 수정일 │ │
│ └────────────────────────────────────────────────────────┘ │
│ │
│ ┌────────────────────────────────────────────────────────┐ │
│ │ 비즈니스 메타데이터 │ │
│ │ - 설명, 정의, 용어 │ │
│ │ - 소유자, 관리자 │ │
│ │ - 태그, 분류 │ │
│ └────────────────────────────────────────────────────────┘ │
│ │
│ ┌────────────────────────────────────────────────────────┐ │
│ │ 운영 메타데이터 │ │
│ │ - 사용 빈도, 쿼리 패턴 │ │
│ │ - 품질 점수, 이슈 │ │
│ │ - 접근 권한 │ │
│ └────────────────────────────────────────────────────────┘ │
│ │
└────────────────────────────────────────────────────────────────┘
4.2 카탈로그 도구¶
| 도구 | 유형 | 특징 |
|---|---|---|
| DataHub | 오픈소스 | LinkedIn 개발, 범용 |
| Apache Atlas | 오픈소스 | Hadoop 생태계 |
| Amundsen | 오픈소스 | Lyft 개발, 검색 중심 |
| OpenMetadata | 오픈소스 | 올인원 플랫폼 |
| Atlan | 상용 | 협업 중심 |
| Alation | 상용 | 엔터프라이즈 |
4.3 DataHub 예시¶
# DataHub 메타데이터 수집 예시
from datahub.emitter.mce_builder import make_dataset_urn
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.metadata.schema_classes import (
DatasetPropertiesClass,
SchemaMetadataClass,
SchemaFieldClass,
StringTypeClass,
NumberTypeClass,
)
# Emitter 생성
emitter = DatahubRestEmitter(gms_server="http://localhost:8080")
# 데이터셋 URN
dataset_urn = make_dataset_urn(
platform="postgres",
name="analytics.public.fact_orders",
env="PROD"
)
# 데이터셋 속성
properties = DatasetPropertiesClass(
description="주문 팩트 테이블",
customProperties={
"owner": "data-team@company.com",
"sla": "daily",
"pii": "false"
}
)
# 스키마 정의
schema = SchemaMetadataClass(
schemaName="fact_orders",
platform=f"urn:li:dataPlatform:postgres",
fields=[
SchemaFieldClass(
fieldPath="order_id",
type=StringTypeClass(),
description="주문 고유 ID"
),
SchemaFieldClass(
fieldPath="amount",
type=NumberTypeClass(),
description="주문 금액"
),
]
)
# 메타데이터 emit
emitter.emit_mce(properties)
emitter.emit_mce(schema)
5. 데이터 리니지¶
5.1 리니지 개념¶
┌────────────────────────────────────────────────────────────────┐
│ 데이터 리니지 (Lineage) │
├────────────────────────────────────────────────────────────────┤
│ │
│ 데이터의 출처와 변환 과정을 추적: │
│ │
│ Raw Sources Staging Marts │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ orders │───────→│stg_orders│─────→│fct_orders│ │
│ │ (raw) │ │ │ │ │ │
│ └──────────┘ └──────────┘ └────┬─────┘ │
│ │ │
│ ┌──────────┐ ┌──────────┐ │ │
│ │customers │───────→│stg_customers│────────→│ │
│ │ (raw) │ │ │ │ │
│ └──────────┘ └──────────┘ │ │
│ ↓ │
│ ┌──────────┐ │
│ │ dashboard│ │
│ │ (BI) │ │
│ └──────────┘ │
│ │
│ 활용: │
│ - 영향 분석: 소스 변경 시 영향받는 대상 파악 │
│ - 근본 원인 분석: 데이터 이슈의 원인 추적 │
│ - 규정 준수: 데이터 흐름 감사 │
│ │
└────────────────────────────────────────────────────────────────┘
5.2 dbt 리니지¶
# dbt 리니지 생성
dbt docs generate
# 리니지 확인 (docs 서버)
dbt docs serve
# dbt 모델 메타데이터
version: 2
models:
- name: fct_orders
description: "주문 팩트 테이블"
meta:
owner: "data-team"
upstream:
- stg_orders
- stg_customers
downstream:
- sales_dashboard
- ml_model_features
5.3 OpenLineage¶
# OpenLineage를 사용한 리니지 추적
from openlineage.client import OpenLineageClient
from openlineage.client.run import Run, Job, RunEvent, RunState
from openlineage.client.facet import (
SqlJobFacet,
SchemaDatasetFacet,
SchemaField,
)
from datetime import datetime
import uuid
client = OpenLineageClient(url="http://localhost:5000")
# Job 정의
job = Job(
namespace="my_pipeline",
name="transform_orders"
)
# Run 시작
run_id = str(uuid.uuid4())
run = Run(runId=run_id)
# 입력 데이터셋
input_datasets = [
{
"namespace": "postgres",
"name": "raw.orders",
"facets": {
"schema": SchemaDatasetFacet(
fields=[
SchemaField(name="order_id", type="string"),
SchemaField(name="amount", type="decimal"),
]
)
}
}
]
# 출력 데이터셋
output_datasets = [
{
"namespace": "postgres",
"name": "analytics.fct_orders",
}
]
# Start 이벤트
client.emit(
RunEvent(
eventType=RunState.START,
eventTime=datetime.now().isoformat(),
run=run,
job=job,
inputs=input_datasets,
outputs=output_datasets,
)
)
# ... 실제 변환 작업 ...
# Complete 이벤트
client.emit(
RunEvent(
eventType=RunState.COMPLETE,
eventTime=datetime.now().isoformat(),
run=run,
job=job,
)
)
6. 거버넌스 프레임워크¶
6.1 데이터 거버넌스 구성 요소¶
┌────────────────────────────────────────────────────────────────┐
│ 데이터 거버넌스 프레임워크 │
├────────────────────────────────────────────────────────────────┤
│ │
│ 1. 조직 (Organization) │
│ - 데이터 스튜어드 지정 │
│ - 역할과 책임 정의 │
│ - 거버넌스 위원회 │
│ │
│ 2. 정책 (Policies) │
│ - 데이터 분류 정책 │
│ - 접근 제어 정책 │
│ - 보존/삭제 정책 │
│ - 품질 기준 │
│ │
│ 3. 프로세스 (Processes) │
│ - 데이터 요청/승인 프로세스 │
│ - 이슈 관리 프로세스 │
│ - 변경 관리 프로세스 │
│ │
│ 4. 기술 (Technology) │
│ - 데이터 카탈로그 │
│ - 품질 모니터링 │
│ - 접근 제어 시스템 │
│ - 감사 로그 │
│ │
└────────────────────────────────────────────────────────────────┘
6.2 데이터 분류¶
from enum import Enum
class DataClassification(Enum):
"""데이터 민감도 분류"""
PUBLIC = "public" # 공개 가능
INTERNAL = "internal" # 내부 사용
CONFIDENTIAL = "confidential" # 기밀
RESTRICTED = "restricted" # 제한적 (PII, 금융)
class DataClassifier:
"""자동 데이터 분류"""
PII_PATTERNS = {
'email': r'[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+',
'phone': r'\d{3}-\d{3,4}-\d{4}',
'ssn': r'\d{3}-\d{2}-\d{4}',
'credit_card': r'\d{4}[\s-]?\d{4}[\s-]?\d{4}[\s-]?\d{4}',
}
PII_COLUMN_NAMES = [
'email', 'phone', 'ssn', 'social_security',
'credit_card', 'password', 'address'
]
@classmethod
def classify_column(cls, column_name: str, sample_values: list) -> DataClassification:
"""컬럼 분류"""
column_lower = column_name.lower()
# 컬럼명 기반 분류
if any(pii in column_lower for pii in cls.PII_COLUMN_NAMES):
return DataClassification.RESTRICTED
# 값 패턴 기반 분류
import re
for value in sample_values[:100]: # 샘플링
if value is None:
continue
for pii_type, pattern in cls.PII_PATTERNS.items():
if re.match(pattern, str(value)):
return DataClassification.RESTRICTED
return DataClassification.INTERNAL
연습 문제¶
문제 1: Great Expectations¶
주문 데이터에 대한 Expectation Suite를 작성하세요 (NULL 체크, 유니크, 값 범위, 참조 무결성).
문제 2: 품질 대시보드¶
일별 데이터 품질 점수를 계산하고 시각화하는 파이프라인을 설계하세요.
문제 3: 리니지 추적¶
ETL 파이프라인의 리니지를 자동으로 추적하는 시스템을 설계하세요.
요약¶
| 개념 | 설명 |
|---|---|
| 데이터 품질 | 정확성, 완전성, 일관성, 적시성 보장 |
| Great Expectations | Python 기반 데이터 품질 프레임워크 |
| 데이터 카탈로그 | 메타데이터 관리 시스템 |
| 데이터 리니지 | 데이터 출처와 변환 추적 |
| 데이터 거버넌스 | 데이터 자산의 체계적 관리 |