05. 데이터 큐레이션
05. 데이터 큐레이션¶
개요¶
Foundation Model의 성능은 데이터 품질과 다양성에 크게 의존합니다. "Garbage in, garbage out"이 그 어느 때보다 중요합니다. 이 레슨에서는 대규모 사전학습 데이터셋의 구축, 정제, 관리 방법을 다룹니다.
1. 주요 Pre-training 데이터셋¶
1.1 데이터셋 개요¶
┌──────────────────────────────────────────────────────────────────┐
│ Pre-training 데이터셋 진화 │
├──────────────────────────────────────────────────────────────────┤
│ │
│ 2018: BookCorpus + Wikipedia (3.3B 토큰) → BERT │
│ │ │
│ 2019: WebText (40GB, Reddit 링크) → GPT-2 │
│ │ │
│ 2020: C4 (750GB, Common Crawl 정제) → T5 │
│ │ │
│ 2020: The Pile (825GB, 22개 소스) → GPT-Neo, Pythia │
│ │ │
│ 2022: ROOTS (1.6TB, 59언어) → BLOOM │
│ │ │
│ 2023: RedPajama (1.2T 토큰) → RedPajama-INCITE │
│ │ │
│ 2024: FineWeb (15T 토큰) → 최신 오픈 모델들 │
│ │
└──────────────────────────────────────────────────────────────────┘
1.2 주요 데이터셋 비교¶
| 데이터셋 | 크기 | 소스 | 특징 |
|---|---|---|---|
| The Pile | 825GB | 22개 다양한 소스 | 코드, 학술, 책 포함 |
| C4 | 750GB | Common Crawl | 영어만, 필터링됨 |
| RedPajama | 1.2T 토큰 | LLaMA 레시피 복제 | 오픈소스 |
| ROOTS | 1.6TB | 59개 언어 | 다국어, BigScience |
| FineWeb | 15T 토큰 | Common Crawl | HuggingFace, 최신 |
| Dolma | 3T 토큰 | 다양한 소스 | Allen AI, 투명성 강조 |
1.3 The Pile 구성¶
# The Pile의 22개 하위 데이터셋
PILE_COMPONENTS = {
# 웹 텍스트
'Pile-CC': 227.12, # Common Crawl 정제
'OpenWebText2': 62.77, # Reddit 링크 웹페이지
# 책과 문학
'Books3': 100.96, # 도서
'BookCorpus2': 6.30, # 추가 도서
'Gutenberg': 10.88, # 공개 도서
# 학술
'PubMed Central': 90.27, # 의학 논문
'ArXiv': 56.21, # 과학 논문
'PubMed Abstracts': 19.26, # 논문 초록
'PhilPapers': 2.38, # 철학 논문
'NIH ExPorter': 1.89, # NIH 연구 정보
# 코드
'Github': 95.16, # 깃허브 코드
'StackExchange': 32.20, # Q&A
# 기타
'Wikipedia (en)': 16.11,
'FreeLaw': 51.15, # 법률 문서
'USPTO': 22.90, # 특허
'DM Mathematics': 7.75, # 수학 문제
'Ubuntu IRC': 5.52, # IRC 로그
'EuroParl': 4.59, # EU 의회
'HackerNews': 3.90,
'YoutubeSubtitles': 3.73,
'Enron Emails': 0.88,
}
# 비율 계산
total = sum(PILE_COMPONENTS.values())
for name, size in sorted(PILE_COMPONENTS.items(), key=lambda x: -x[1])[:5]:
print(f"{name}: {size:.1f}GB ({size/total*100:.1f}%)")
2. 데이터 수집¶
2.1 Common Crawl 활용¶
import gzip
import json
from warcio.archiveiterator import ArchiveIterator
import requests
class CommonCrawlExtractor:
"""Common Crawl에서 텍스트 추출"""
CC_INDEX_URL = "https://index.commoncrawl.org/CC-MAIN-2024-10-index"
def fetch_warc_paths(self, domain: str, limit: int = 100) -> list[str]:
"""특정 도메인의 WARC 파일 경로 조회"""
params = {
'url': f'*.{domain}/*',
'output': 'json',
'limit': limit
}
response = requests.get(self.CC_INDEX_URL, params=params)
return [json.loads(line)['filename'] for line in response.text.strip().split('\n')]
def extract_text_from_warc(self, warc_url: str) -> list[dict]:
"""WARC 파일에서 텍스트 추출"""
results = []
response = requests.get(
f"https://data.commoncrawl.org/{warc_url}",
stream=True
)
with gzip.open(response.raw, 'rb') as stream:
for record in ArchiveIterator(stream):
if record.rec_type == 'response':
url = record.rec_headers.get_header('WARC-Target-URI')
content = record.content_stream().read().decode('utf-8', errors='ignore')
# HTML에서 텍스트 추출 (trafilatura 등 사용)
text = self.extract_text(content)
if text:
results.append({
'url': url,
'text': text,
'timestamp': record.rec_headers.get_header('WARC-Date')
})
return results
def extract_text(self, html: str) -> str:
"""HTML에서 본문 텍스트 추출"""
try:
import trafilatura
return trafilatura.extract(html)
except:
from bs4 import BeautifulSoup
soup = BeautifulSoup(html, 'html.parser')
# script, style 제거
for tag in soup(['script', 'style', 'nav', 'footer']):
tag.decompose()
return soup.get_text(separator=' ', strip=True)
2.2 GitHub 코드 수집¶
import os
from github import Github
from typing import Generator
class GitHubCodeCollector:
"""GitHub에서 코드 수집"""
# 수집할 언어와 확장자
LANGUAGES = {
'python': ['.py'],
'javascript': ['.js', '.jsx', '.ts', '.tsx'],
'java': ['.java'],
'cpp': ['.cpp', '.hpp', '.c', '.h'],
'go': ['.go'],
'rust': ['.rs'],
}
def __init__(self, token: str):
self.github = Github(token)
def collect_repos(
self,
language: str,
min_stars: int = 100,
limit: int = 1000
) -> Generator[dict, None, None]:
"""인기 저장소 수집"""
query = f"language:{language} stars:>{min_stars}"
repos = self.github.search_repositories(query, sort='stars')
for i, repo in enumerate(repos):
if i >= limit:
break
yield {
'name': repo.full_name,
'stars': repo.stargazers_count,
'language': repo.language,
'license': repo.license.key if repo.license else None,
'url': repo.html_url
}
def extract_code_files(
self,
repo_name: str,
extensions: list[str]
) -> Generator[dict, None, None]:
"""저장소에서 코드 파일 추출"""
repo = self.github.get_repo(repo_name)
try:
contents = repo.get_contents("")
while contents:
file_content = contents.pop(0)
if file_content.type == "dir":
contents.extend(repo.get_contents(file_content.path))
elif any(file_content.path.endswith(ext) for ext in extensions):
try:
content = file_content.decoded_content.decode('utf-8')
yield {
'path': file_content.path,
'content': content,
'size': file_content.size
}
except:
continue
except Exception as e:
print(f"Error processing {repo_name}: {e}")
3. 데이터 정제 파이프라인¶
3.1 품질 필터링¶
import re
from typing import Optional
import fasttext
from collections import Counter
class QualityFilter:
"""텍스트 품질 필터링"""
def __init__(self, lang_model_path: str = 'lid.176.bin'):
# FastText 언어 감지 모델
self.lang_detector = fasttext.load_model(lang_model_path)
def filter_document(self, text: str, target_lang: str = 'en') -> Optional[str]:
"""
문서 필터링
Returns:
정제된 텍스트 또는 None (필터링됨)
"""
# 1. 기본 필터
if not self._basic_filter(text):
return None
# 2. 언어 필터
if not self._language_filter(text, target_lang):
return None
# 3. 품질 점수
if not self._quality_score_filter(text):
return None
# 4. 텍스트 정제
cleaned = self._clean_text(text)
return cleaned if len(cleaned) > 100 else None
def _basic_filter(self, text: str) -> bool:
"""기본 필터링 규칙"""
# 최소/최대 길이
if len(text) < 100 or len(text) > 100000:
return False
# 단어 수
words = text.split()
if len(words) < 20:
return False
# 평균 단어 길이 (너무 짧거나 긴 경우 스팸 가능성)
avg_word_len = sum(len(w) for w in words) / len(words)
if avg_word_len < 3 or avg_word_len > 15:
return False
# 알파벳 비율
alpha_chars = sum(c.isalpha() for c in text)
if alpha_chars / len(text) < 0.6:
return False
return True
def _language_filter(self, text: str, target_lang: str) -> bool:
"""언어 필터링"""
# 첫 500자로 언어 감지
sample = text[:500].replace('\n', ' ')
predictions = self.lang_detector.predict(sample, k=1)
lang = predictions[0][0].replace('__label__', '')
confidence = predictions[1][0]
return lang == target_lang and confidence > 0.8
def _quality_score_filter(self, text: str) -> bool:
"""품질 점수 기반 필터링"""
lines = text.split('\n')
# 줄 끝 구두점 비율
end_punct = sum(1 for line in lines if line.strip() and line.strip()[-1] in '.!?')
punct_ratio = end_punct / max(len(lines), 1)
# 대문자로 시작하는 줄 비율
cap_start = sum(1 for line in lines if line.strip() and line.strip()[0].isupper())
cap_ratio = cap_start / max(len(lines), 1)
# 불릿/번호 목록 비율 (너무 높으면 목록 페이지)
bullet_lines = sum(1 for line in lines if re.match(r'^\s*[\-\*\•\d\.]\s', line))
bullet_ratio = bullet_lines / max(len(lines), 1)
# 품질 점수
if punct_ratio < 0.3: # 너무 적은 구두점
return False
if bullet_ratio > 0.5: # 너무 많은 목록
return False
return True
def _clean_text(self, text: str) -> str:
"""텍스트 정제"""
# URL 제거
text = re.sub(r'https?://\S+', '', text)
# 이메일 제거
text = re.sub(r'\S+@\S+\.\S+', '[EMAIL]', text)
# 과도한 공백 정리
text = re.sub(r'\n{3,}', '\n\n', text)
text = re.sub(r' {2,}', ' ', text)
# 제어 문자 제거
text = ''.join(c for c in text if c.isprintable() or c in '\n\t')
return text.strip()
3.2 중복 제거¶
import hashlib
from datasketch import MinHash, MinHashLSH
from typing import Generator
class DeduplicationPipeline:
"""대규모 중복 제거 파이프라인"""
def __init__(
self,
num_perm: int = 128,
threshold: float = 0.8,
ngram_size: int = 5
):
self.num_perm = num_perm
self.threshold = threshold
self.ngram_size = ngram_size
# LSH 인덱스
self.lsh = MinHashLSH(threshold=threshold, num_perm=num_perm)
self.seen_hashes = set()
def get_minhash(self, text: str) -> MinHash:
"""텍스트의 MinHash 계산"""
minhash = MinHash(num_perm=self.num_perm)
# N-gram 생성
words = text.lower().split()
for i in range(len(words) - self.ngram_size + 1):
ngram = ' '.join(words[i:i + self.ngram_size])
minhash.update(ngram.encode('utf-8'))
return minhash
def exact_dedup(self, text: str) -> bool:
"""
정확한 중복 제거 (해시 기반)
Returns:
True if unique, False if duplicate
"""
# 정규화된 텍스트의 해시
normalized = ' '.join(text.lower().split())
text_hash = hashlib.md5(normalized.encode()).hexdigest()
if text_hash in self.seen_hashes:
return False
self.seen_hashes.add(text_hash)
return True
def fuzzy_dedup(self, doc_id: str, text: str) -> bool:
"""
퍼지 중복 제거 (MinHash LSH)
Returns:
True if unique, False if near-duplicate found
"""
minhash = self.get_minhash(text)
# 유사 문서 검색
result = self.lsh.query(minhash)
if result:
return False
# 새 문서 추가
self.lsh.insert(doc_id, minhash)
return True
def deduplicate_stream(
self,
documents: Generator[dict, None, None]
) -> Generator[dict, None, None]:
"""
스트리밍 중복 제거
"""
for i, doc in enumerate(documents):
text = doc['text']
doc_id = doc.get('id', str(i))
# 1단계: 정확한 중복
if not self.exact_dedup(text):
continue
# 2단계: 유사 중복
if not self.fuzzy_dedup(doc_id, text):
continue
yield doc
# 사용 예시
def deduplicate_dataset(input_path: str, output_path: str):
"""데이터셋 중복 제거"""
pipeline = DeduplicationPipeline(threshold=0.85)
def read_documents(path):
with open(path, 'r') as f:
for line in f:
yield json.loads(line)
unique_count = 0
total_count = 0
with open(output_path, 'w') as out:
for doc in pipeline.deduplicate_stream(read_documents(input_path)):
out.write(json.dumps(doc) + '\n')
unique_count += 1
total_count += 1
print(f"Total: {total_count}, Unique: {unique_count}")
print(f"Dedup ratio: {(1 - unique_count/total_count)*100:.1f}%")
4. 데이터 믹싱¶
4.1 도메인 믹싱 전략¶
import numpy as np
from dataclasses import dataclass
from typing import Iterator
@dataclass
class DataSource:
name: str
path: str
weight: float # 샘플링 가중치
quality_score: float # 품질 점수 (0-1)
class DataMixer:
"""
다양한 소스의 데이터 믹싱
전략:
1. 품질 기반: 고품질 소스 더 많이 샘플링
2. 다양성 기반: 모든 도메인 균형있게
3. Scaling law 기반: 최적 비율 탐색
"""
# LLaMA 스타일 믹싱 비율
LLAMA_MIX = {
'CommonCrawl': 0.67, # 웹
'C4': 0.15, # 정제된 웹
'Github': 0.045, # 코드
'Wikipedia': 0.045, # 백과사전
'Books': 0.045, # 도서
'ArXiv': 0.025, # 과학
'StackExchange': 0.02, # Q&A
}
def __init__(self, sources: list[DataSource]):
self.sources = sources
self.normalize_weights()
def normalize_weights(self):
"""가중치 정규화"""
total = sum(s.weight for s in self.sources)
for source in self.sources:
source.weight /= total
def temperature_sampling(
self,
temperature: float = 1.0
) -> list[float]:
"""
Temperature 기반 샘플링 확률 조정
temperature < 1: 고빈도 소스에 집중
temperature > 1: 균등하게 분산
"""
weights = np.array([s.weight for s in self.sources])
# Temperature 적용
adjusted = np.power(weights, 1 / temperature)
adjusted /= adjusted.sum()
return adjusted.tolist()
def sample_batch(
self,
batch_size: int,
temperature: float = 1.0
) -> list[tuple[str, int]]:
"""
배치 샘플링
Returns:
List of (source_name, num_samples)
"""
probs = self.temperature_sampling(temperature)
# 각 소스에서 샘플링할 문서 수
samples = np.random.multinomial(batch_size, probs)
return [
(source.name, count)
for source, count in zip(self.sources, samples)
]
def iter_mixed_data(
self,
batch_size: int = 1000,
temperature: float = 1.0
) -> Iterator[dict]:
"""혼합 데이터 이터레이터"""
source_iters = {
s.name: self._read_source(s.path)
for s in self.sources
}
while True:
batch_plan = self.sample_batch(batch_size, temperature)
for source_name, count in batch_plan:
for _ in range(count):
try:
yield next(source_iters[source_name])
except StopIteration:
# 소스 재시작 또는 종료
break
@staticmethod
def _read_source(path: str) -> Iterator[dict]:
"""데이터 소스 읽기"""
with open(path, 'r') as f:
for line in f:
yield json.loads(line)
# 최적 믹싱 비율 탐색
def find_optimal_mix(
sources: list[DataSource],
validation_data: list,
model_fn,
n_trials: int = 20
) -> dict[str, float]:
"""
Bayesian Optimization으로 최적 믹싱 비율 탐색
"""
import optuna
def objective(trial):
# 각 소스의 가중치 샘플링
weights = {}
for source in sources:
weights[source.name] = trial.suggest_float(
source.name, 0.01, 1.0
)
# 정규화
total = sum(weights.values())
weights = {k: v/total for k, v in weights.items()}
# 모델 학습 및 검증
# (실제로는 작은 모델로 프록시 실험)
val_loss = model_fn(weights, validation_data)
return val_loss
study = optuna.create_study(direction='minimize')
study.optimize(objective, n_trials=n_trials)
return study.best_params
4.2 다국어 믹싱¶
class MultilingualMixer:
"""
다국어 데이터 믹싱
전략:
1. 영어 과대표집 방지
2. 저자원 언어 업샘플링
3. 언어 유사성 기반 그룹핑
"""
# 언어별 기본 비율 (BLOOM 스타일)
BLOOM_RATIOS = {
'en': 0.30, # 영어
'zh': 0.15, # 중국어
'fr': 0.12, # 프랑스어
'es': 0.10, # 스페인어
'pt': 0.08, # 포르투갈어
'ar': 0.05, # 아랍어
# ... 기타 언어
}
def __init__(self, language_weights: dict[str, float]):
self.language_weights = language_weights
def exponential_smoothing(
self,
alpha: float = 0.3
) -> dict[str, float]:
"""
지수 스무딩으로 저자원 언어 업샘플링
P(lang) ∝ P_original(lang)^alpha
alpha < 1: 저자원 언어 비율 증가
alpha = 1: 원본 비율 유지
"""
smoothed = {
lang: weight ** alpha
for lang, weight in self.language_weights.items()
}
total = sum(smoothed.values())
return {lang: w/total for lang, w in smoothed.items()}
def sample_by_language(
self,
documents: list[dict],
target_ratio: dict[str, float]
) -> list[dict]:
"""언어별 목표 비율에 맞게 샘플링"""
by_lang = {}
for doc in documents:
lang = doc.get('lang', 'en')
by_lang.setdefault(lang, []).append(doc)
sampled = []
total_target = len(documents)
for lang, ratio in target_ratio.items():
if lang in by_lang:
n_samples = int(total_target * ratio)
lang_docs = by_lang[lang]
if len(lang_docs) >= n_samples:
# 다운샘플링
sampled.extend(np.random.choice(lang_docs, n_samples, replace=False))
else:
# 업샘플링
sampled.extend(np.random.choice(lang_docs, n_samples, replace=True))
return sampled
5. 데이터 품질 평가¶
5.1 자동 품질 점수¶
import kenlm
from transformers import AutoModelForSequenceClassification, AutoTokenizer
class DataQualityScorer:
"""데이터 품질 자동 평가"""
def __init__(
self,
perplexity_model_path: str = None,
classifier_model_name: str = None
):
# 1. Perplexity 기반 (KenLM)
if perplexity_model_path:
self.lm = kenlm.Model(perplexity_model_path)
else:
self.lm = None
# 2. 분류기 기반 (예: 위키피디아 vs 웹)
if classifier_model_name:
self.classifier = AutoModelForSequenceClassification.from_pretrained(
classifier_model_name
)
self.tokenizer = AutoTokenizer.from_pretrained(classifier_model_name)
else:
self.classifier = None
def perplexity_score(self, text: str) -> float:
"""
KenLM perplexity 점수
낮을수록 고품질 (언어 모델에 자연스러운 텍스트)
"""
if self.lm is None:
return 0.0
# 문장 단위 perplexity
score = self.lm.score(text, bos=True, eos=True)
perplexity = 10 ** (-score / len(text.split()))
return perplexity
def classifier_score(self, text: str) -> float:
"""
품질 분류기 점수 (0-1)
높을수록 고품질
"""
if self.classifier is None:
return 0.5
inputs = self.tokenizer(
text[:512],
return_tensors='pt',
truncation=True
)
with torch.no_grad():
outputs = self.classifier(**inputs)
probs = torch.softmax(outputs.logits, dim=-1)
# positive class 확률
return probs[0, 1].item()
def heuristic_score(self, text: str) -> dict[str, float]:
"""휴리스틱 기반 품질 점수"""
lines = text.split('\n')
words = text.split()
scores = {
# 1. 알파벳 비율
'alpha_ratio': sum(c.isalpha() for c in text) / max(len(text), 1),
# 2. 줄당 평균 단어 수
'words_per_line': len(words) / max(len(lines), 1),
# 3. 중복 줄 비율
'unique_lines_ratio': len(set(lines)) / max(len(lines), 1),
# 4. 구두점 비율
'punct_ratio': sum(c in '.,!?;:' for c in text) / max(len(text), 1),
# 5. 대문자 비율 (너무 높으면 스팸)
'caps_ratio': sum(c.isupper() for c in text) / max(len(text), 1),
# 6. 숫자 비율
'digit_ratio': sum(c.isdigit() for c in text) / max(len(text), 1),
}
return scores
def combined_score(self, text: str) -> float:
"""종합 품질 점수"""
heuristics = self.heuristic_score(text)
# 각 휴리스틱의 이상적 범위
score = 1.0
# 알파벳 비율: 0.7-0.9 이상적
if heuristics['alpha_ratio'] < 0.6:
score *= 0.8
# 대문자 비율: 0.1 이하 이상적
if heuristics['caps_ratio'] > 0.3:
score *= 0.7
# 중복 줄: 0.8 이상 이상적
if heuristics['unique_lines_ratio'] < 0.5:
score *= 0.6
# Perplexity 점수 (낮을수록 좋음)
ppl = self.perplexity_score(text)
if ppl > 1000:
score *= 0.5
elif ppl > 500:
score *= 0.8
return score
6. 실습: FineWeb 스타일 파이프라인¶
class FineWebPipeline:
"""
FineWeb 스타일 데이터 파이프라인
단계:
1. URL 필터링
2. 텍스트 추출
3. 언어 감지
4. 품질 필터링
5. 중복 제거
6. PII 제거
"""
def __init__(self):
self.quality_filter = QualityFilter()
self.dedup = DeduplicationPipeline()
self.quality_scorer = DataQualityScorer()
def process_batch(
self,
warc_batch: list[dict]
) -> list[dict]:
"""배치 처리"""
results = []
for record in warc_batch:
# 1. URL 필터링
if not self._url_filter(record['url']):
continue
# 2. 텍스트 추출
text = self._extract_text(record['html'])
if not text:
continue
# 3. 품질 필터링
text = self.quality_filter.filter_document(text)
if not text:
continue
# 4. 품질 점수
score = self.quality_scorer.combined_score(text)
if score < 0.5:
continue
# 5. PII 마스킹
text = self._mask_pii(text)
results.append({
'url': record['url'],
'text': text,
'quality_score': score
})
# 6. 중복 제거
return list(self.dedup.deduplicate_stream(iter(results)))
def _url_filter(self, url: str) -> bool:
"""URL 기반 필터링"""
# 블랙리스트 도메인
blacklist = ['porn', 'xxx', 'adult', 'gambling']
if any(b in url.lower() for b in blacklist):
return False
# 허용 확장자
if any(url.endswith(ext) for ext in ['.pdf', '.jpg', '.png', '.gif']):
return False
return True
def _extract_text(self, html: str) -> str:
"""HTML에서 본문 추출"""
import trafilatura
return trafilatura.extract(html) or ''
def _mask_pii(self, text: str) -> str:
"""개인정보 마스킹"""
import re
# 이메일
text = re.sub(r'\b[\w.-]+@[\w.-]+\.\w+\b', '[EMAIL]', text)
# 전화번호 (미국 형식)
text = re.sub(r'\b\d{3}[-.]?\d{3}[-.]?\d{4}\b', '[PHONE]', text)
# IP 주소
text = re.sub(r'\b\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\b', '[IP]', text)
# 신용카드
text = re.sub(r'\b\d{4}[-\s]?\d{4}[-\s]?\d{4}[-\s]?\d{4}\b', '[CARD]', text)
return text
# 실행
if __name__ == "__main__":
pipeline = FineWebPipeline()
# Common Crawl 배치 처리
warc_batch = [...] # WARC 레코드
cleaned_data = pipeline.process_batch(warc_batch)
print(f"입력: {len(warc_batch)}, 출력: {len(cleaned_data)}")
print(f"필터링 비율: {(1 - len(cleaned_data)/len(warc_batch))*100:.1f}%")
참고 자료¶
데이터셋¶
논문¶
- Gao et al. (2020). "The Pile: An 800GB Dataset of Diverse Text"
- Penedo et al. (2023). "The RefinedWeb Dataset for Falcon LLM"
- Soldaini et al. (2024). "Dolma: An Open Corpus of 3T Tokens"
도구¶
- trafilatura: HTML 텍스트 추출
- datasketch: MinHash LSH
- fasttext: 언어 감지