import seaborn as sns
import matplotlib.pyplot as plt
from sklearn.feature_selection import SelectKBest, chi2
from sklearn.preprocessing import FunctionTransformer
from sklearn.pipeline import Pipeline
from sklearn.metrics import (
    accuracy_score, precision_score, recall_score, f1_score,
    roc_auc_score, confusion_matrix, classification_report,
    precision_recall_curve, roc_curve
)
from sklearn.model_selection import (
    train_test_split, cross_val_score, GridSearchCV,
    StratifiedKFold, validation_curve, cross_validate
)
from sklearn.ensemble import RandomForestClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.feature_extraction.text import TfidfVectorizer
import pandas as pd
import numpy as np
from pathlib import Path
import logging
import json
import joblib
import hashlib
import sys
import os
import time
from datetime import datetime, timedelta
from typing import Dict, Tuple, Optional, Any, List
import warnings
import re

warnings.filterwarnings('ignore')

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('/tmp/model_training.log'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)


def preprocess_text_function(texts):
    """
    Standalone function for text preprocessing - pickle-safe
    """
    def clean_single_text(text):
        text = str(text)

        # Remove URLs
        text = re.sub(r'http\S+|www\S+|https\S+', '', text)

        # Remove email addresses
        text = re.sub(r'\S+@\S+', '', text)

        # Collapse repeated punctuation
        text = re.sub(r'[!]{2,}', '!', text)
        text = re.sub(r'[?]{2,}', '?', text)
        text = re.sub(r'[.]{3,}', '...', text)

        # Keep only letters, whitespace and basic sentence punctuation
        text = re.sub(r'[^a-zA-Z\s.!?]', '', text)

        # Normalize whitespace
        text = re.sub(r'\s+', ' ', text)

        return text.strip().lower()

    processed = []
    for text in texts:
        processed.append(clean_single_text(text))

    return processed


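# Illustrative example of the cleaner's effect (documentation only, not part of
# the training flow): an input such as
#   "BREAKING!!! Read more at http://example.com ..."
# comes back roughly as "breaking! read more at ..." after URL removal,
# punctuation collapsing, whitespace normalization and lowercasing.

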
class ProgressTracker:
    """Progress tracking with time estimation"""

    def __init__(self, total_steps: int, description: str = "Training"):
        self.total_steps = total_steps
        self.current_step = 0
        self.start_time = time.time()
        self.last_update_time = self.start_time
        self.description = description
        self.step_times = []

    def update(self, step_name: str = ""):
        """Update progress and print status"""
        self.current_step += 1
        current_time = time.time()
        elapsed = current_time - self.start_time

        progress_pct = (self.current_step / self.total_steps) * 100

        if self.current_step > 0:
            avg_time_per_step = elapsed / self.current_step
            remaining_steps = self.total_steps - self.current_step
            eta_seconds = avg_time_per_step * remaining_steps
            eta = timedelta(seconds=int(eta_seconds))
        else:
            eta = "calculating..."

        bar_length = 30
        filled_length = int(bar_length * self.current_step // self.total_steps)
        bar = '█' * filled_length + '░' * (bar_length - filled_length)

        status_msg = f"\r{self.description}: [{bar}] {progress_pct:.1f}% | Step {self.current_step}/{self.total_steps}"
        if step_name:
            status_msg += f" | {step_name}"
        if eta != "calculating...":
            status_msg += f" | ETA: {eta}"

        print(status_msg, end='', flush=True)

        progress_json = {
            "type": "progress",
            "step": self.current_step,
            "total": self.total_steps,
            "percentage": progress_pct,
            "eta": str(eta) if eta != "calculating..." else None,
            "step_name": step_name,
            "elapsed": elapsed
        }
        print(f"\nPROGRESS_JSON: {json.dumps(progress_json)}")

        # Keep the durations of the three most recent steps
        self.step_times.append(current_time - self.last_update_time)
        self.step_times = self.step_times[-3:]
        self.last_update_time = current_time

    def finish(self):
        """Complete progress tracking"""
        total_time = time.time() - self.start_time
        print(f"\n{self.description} completed in {timedelta(seconds=int(total_time))}")


def estimate_training_time(dataset_size: int, enable_tuning: bool = True, cv_folds: int = 5) -> Dict:
    """Estimate training time based on dataset characteristics"""

    # Base per-stage estimates in seconds, scaled with dataset size
    base_times = {
        'preprocessing': max(0.1, dataset_size * 0.001),
        'vectorization': max(0.5, dataset_size * 0.01),
        'feature_selection': max(0.2, dataset_size * 0.005),
        'simple_training': max(1.0, dataset_size * 0.02),
        'evaluation': max(0.5, dataset_size * 0.01),
    }

    # Hyperparameter tuning multiplies the per-model training cost
    tuning_multipliers = {
        'logistic_regression': 8 if enable_tuning else 1,
        'random_forest': 12 if enable_tuning else 1,
    }

    cv_multiplier = cv_folds if dataset_size > 100 else 1

    estimates = {}

    estimates['data_loading'] = 0.5
    estimates['preprocessing'] = base_times['preprocessing']
    estimates['vectorization'] = base_times['vectorization']
    estimates['feature_selection'] = base_times['feature_selection']

    for model_name, multiplier in tuning_multipliers.items():
        model_time = base_times['simple_training'] * multiplier * cv_multiplier
        estimates[f'{model_name}_training'] = model_time
        estimates[f'{model_name}_evaluation'] = base_times['evaluation']

    estimates['cross_validation'] = base_times['simple_training'] * cv_folds * 0.5

    estimates['model_saving'] = 1.0

    total_estimate = sum(estimates.values())

    # Add a 20% buffer for overhead
    total_estimate *= 1.2

    return {
        'detailed_estimates': estimates,
        'total_seconds': total_estimate,
        'total_formatted': str(timedelta(seconds=int(total_estimate))),
        'dataset_size': dataset_size,
        'enable_tuning': enable_tuning,
        'cv_folds': cv_folds
    }


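# Rough usage sketch (the numbers are illustrative): estimate_training_time(1000)
# sums the per-stage estimates for a 1,000-row dataset with tuning enabled and
# 5 folds, applies the 20% buffer, and reports the result both as raw seconds
# ('total_seconds') and as a formatted string ('total_formatted').

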
class CrossValidationManager:
    """Advanced cross-validation management with comprehensive metrics"""

    def __init__(self, cv_folds: int = 5, random_state: int = 42):
        self.cv_folds = cv_folds
        self.random_state = random_state
        self.cv_results = {}

    def create_cv_strategy(self, X, y) -> StratifiedKFold:
        """Create appropriate CV strategy based on data characteristics"""
        n_samples = len(X)
        min_samples_per_fold = 3
        max_folds = n_samples // min_samples_per_fold

        # Stratified CV cannot use more folds than the smallest class has samples
        unique_classes = np.unique(y)
        min_class_count = min([np.sum(y == cls) for cls in unique_classes])
        max_folds_by_class = min_class_count

        actual_folds = max(2, min(self.cv_folds, max_folds, max_folds_by_class))

        logger.info(f"Using {actual_folds} CV folds (requested: {self.cv_folds})")

        return StratifiedKFold(
            n_splits=actual_folds,
            shuffle=True,
            random_state=self.random_state
        )

    def perform_cross_validation(self, pipeline, X, y, cv_strategy=None) -> Dict:
        """Perform comprehensive cross-validation with multiple metrics"""
        if cv_strategy is None:
            cv_strategy = self.create_cv_strategy(X, y)

        logger.info(f"Starting cross-validation with {cv_strategy.n_splits} folds...")

        scoring_metrics = {
            'accuracy': 'accuracy',
            'precision': 'precision_weighted',
            'recall': 'recall_weighted',
            'f1': 'f1_weighted',
            'roc_auc': 'roc_auc'
        }

        try:
            cv_scores = cross_validate(
                pipeline, X, y,
                cv=cv_strategy,
                scoring=scoring_metrics,
                return_train_score=True,
                n_jobs=1,
                verbose=0
            )

            cv_results = {
                'n_splits': cv_strategy.n_splits,
                'test_scores': {},
                'train_scores': {},
                'fold_results': []
            }

            # Aggregate per-metric statistics across folds
            for metric_name in scoring_metrics.keys():
                test_key = f'test_{metric_name}'
                train_key = f'train_{metric_name}'

                if test_key in cv_scores:
                    test_scores = cv_scores[test_key]
                    cv_results['test_scores'][metric_name] = {
                        'mean': float(np.mean(test_scores)),
                        'std': float(np.std(test_scores)),
                        'min': float(np.min(test_scores)),
                        'max': float(np.max(test_scores)),
                        'scores': test_scores.tolist()
                    }

                if train_key in cv_scores:
                    train_scores = cv_scores[train_key]
                    cv_results['train_scores'][metric_name] = {
                        'mean': float(np.mean(train_scores)),
                        'std': float(np.std(train_scores)),
                        'min': float(np.min(train_scores)),
                        'max': float(np.max(train_scores)),
                        'scores': train_scores.tolist()
                    }

            # Record per-fold scores
            for fold_idx in range(cv_strategy.n_splits):
                fold_result = {
                    'fold': fold_idx + 1,
                    'test_scores': {},
                    'train_scores': {}
                }

                for metric_name in scoring_metrics.keys():
                    test_key = f'test_{metric_name}'
                    train_key = f'train_{metric_name}'

                    if test_key in cv_scores:
                        fold_result['test_scores'][metric_name] = float(cv_scores[test_key][fold_idx])
                    if train_key in cv_scores:
                        fold_result['train_scores'][metric_name] = float(cv_scores[train_key][fold_idx])

                cv_results['fold_results'].append(fold_result)

            # Train/test accuracy gap as a simple overfitting indicator
            if 'accuracy' in cv_results['test_scores'] and 'accuracy' in cv_results['train_scores']:
                train_mean = cv_results['train_scores']['accuracy']['mean']
                test_mean = cv_results['test_scores']['accuracy']['mean']
                cv_results['overfitting_score'] = float(train_mean - test_mean)

            # Stability: 1 minus the coefficient of variation of test accuracy
            if 'accuracy' in cv_results['test_scores']:
                test_std = cv_results['test_scores']['accuracy']['std']
                test_mean = cv_results['test_scores']['accuracy']['mean']
                cv_results['stability_score'] = float(1 - (test_std / test_mean)) if test_mean > 0 else 0

            logger.info("Cross-validation completed successfully")
            mean_accuracy = cv_results['test_scores'].get('accuracy', {}).get('mean')
            mean_f1 = cv_results['test_scores'].get('f1', {}).get('mean')
            if mean_accuracy is not None:
                logger.info(f"Mean test accuracy: {mean_accuracy:.4f}")
            if mean_f1 is not None:
                logger.info(f"Mean test F1: {mean_f1:.4f}")

            return cv_results

        except Exception as e:
            logger.error(f"Cross-validation failed: {e}")
            return {
                'error': str(e),
                'n_splits': cv_strategy.n_splits if cv_strategy else self.cv_folds,
                'fallback': True
            }

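    # Shape of the dict returned on success (a sketch of the structure built above):
    # {'n_splits': ..., 'test_scores': {'f1': {'mean': ..., 'std': ..., 'min': ..., 'max': ..., 'scores': [...]}, ...},
    #  'train_scores': {...}, 'fold_results': [...], 'overfitting_score': ..., 'stability_score': ...}
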
    def compare_cv_results(self, results1: Dict, results2: Dict, metric: str = 'f1') -> Dict:
        """Compare cross-validation results between two models"""
        try:
            if 'error' in results1 or 'error' in results2:
                return {'error': 'Cannot compare results with errors'}

            scores1 = results1['test_scores'][metric]['scores']
            scores2 = results2['test_scores'][metric]['scores']

            from scipy import stats
            t_stat, p_value = stats.ttest_rel(scores1, scores2)

            comparison = {
                'metric': metric,
                'model1_mean': results1['test_scores'][metric]['mean'],
                'model2_mean': results2['test_scores'][metric]['mean'],
                'model1_std': results1['test_scores'][metric]['std'],
                'model2_std': results2['test_scores'][metric]['std'],
                'difference': results2['test_scores'][metric]['mean'] - results1['test_scores'][metric]['mean'],
                'paired_ttest': {
                    't_statistic': float(t_stat),
                    'p_value': float(p_value),
                    'significant': p_value < 0.05
                },
                'effect_size': float(abs(t_stat) / np.sqrt(len(scores1))) if len(scores1) > 0 else 0
            }

            return comparison

        except Exception as e:
            logger.error(f"CV comparison failed: {e}")
            return {'error': str(e)}


class RobustModelTrainer:
    """Production-ready model trainer with comprehensive cross-validation"""

    def __init__(self):
        self.setup_paths()
        self.setup_training_config()
        self.setup_models()
        self.progress_tracker = None
        self.cv_manager = CrossValidationManager()

    def setup_paths(self):
        """Setup all necessary paths with proper permissions"""
        self.base_dir = Path("/tmp")
        self.data_dir = self.base_dir / "data"
        self.model_dir = self.base_dir / "model"
        self.results_dir = self.base_dir / "results"

        for dir_path in [self.data_dir, self.model_dir, self.results_dir]:
            dir_path.mkdir(parents=True, exist_ok=True)
            try:
                dir_path.chmod(0o755)
            except Exception:
                pass

        self.data_path = self.data_dir / "combined_dataset.csv"
        self.model_path = Path("/tmp/model.pkl")
        self.vectorizer_path = Path("/tmp/vectorizer.pkl")
        self.pipeline_path = Path("/tmp/pipeline.pkl")
        self.metadata_path = Path("/tmp/metadata.json")
        self.evaluation_path = self.results_dir / "evaluation_results.json"

    def setup_training_config(self):
        """Setup training configuration with CV parameters"""
        self.test_size = 0.2
        self.validation_size = 0.1
        self.random_state = 42
        self.cv_folds = 5
        self.max_features = 5000
        self.min_df = 1
        self.max_df = 0.95
        self.ngram_range = (1, 2)
        self.max_iter = 500
        self.class_weight = 'balanced'
        self.feature_selection_k = 2000

    def setup_models(self):
        """Setup model configurations for comparison"""
        self.models = {
            'logistic_regression': {
                'model': LogisticRegression(
                    max_iter=self.max_iter,
                    class_weight=self.class_weight,
                    random_state=self.random_state,
                    n_jobs=-1
                ),
                'param_grid': {
                    'model__C': [0.1, 1, 10],
                    'model__penalty': ['l2']
                }
            },
            'random_forest': {
                'model': RandomForestClassifier(
                    n_estimators=50,
                    class_weight=self.class_weight,
                    random_state=self.random_state,
                    n_jobs=-1
                ),
                'param_grid': {
                    'model__n_estimators': [50, 100],
                    'model__max_depth': [10, None]
                }
            }
        }

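    # Note: the 'model__' prefix in the parameter grids follows scikit-learn's
    # Pipeline convention, addressing hyperparameters of the pipeline step named
    # 'model' (created in create_preprocessing_pipeline below).
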
    def load_and_validate_data(self) -> Tuple[bool, Optional[pd.DataFrame], str]:
        """Load and validate training data"""
        try:
            logger.info("Loading training data...")
            if self.progress_tracker:
                self.progress_tracker.update("Loading data")

            if not self.data_path.exists():
                return False, None, f"Data file not found: {self.data_path}"

            df = pd.read_csv(self.data_path)

            if df.empty:
                return False, None, "Dataset is empty"

            required_columns = ['text', 'label']
            missing_columns = [col for col in required_columns if col not in df.columns]
            if missing_columns:
                return False, None, f"Missing required columns: {missing_columns}"

            initial_count = len(df)
            df = df.dropna(subset=required_columns)
            if len(df) < initial_count:
                logger.warning(f"Removed {initial_count - len(df)} rows with missing values")

            df = df[df['text'].astype(str).str.len() > 10]

            unique_labels = df['label'].unique()
            if len(unique_labels) < 2:
                return False, None, f"Need at least 2 classes, found: {unique_labels}"

            min_samples_for_cv = self.cv_folds * 2
            if len(df) < min_samples_for_cv:
                logger.warning(f"Dataset size ({len(df)}) is small for {self.cv_folds}-fold CV")
                self.cv_manager.cv_folds = max(2, len(df) // 3)
                logger.info(f"Adjusted CV folds to {self.cv_manager.cv_folds}")

            label_counts = df['label'].value_counts()
            min_class_ratio = label_counts.min() / label_counts.max()
            if min_class_ratio < 0.1:
                logger.warning(f"Severe class imbalance detected: {min_class_ratio:.3f}")

            logger.info(f"Data validation successful: {len(df)} samples, {len(unique_labels)} classes")
            logger.info(f"Class distribution: {label_counts.to_dict()}")

            return True, df, "Data loaded successfully"

        except Exception as e:
            error_msg = f"Error loading data: {str(e)}"
            logger.error(error_msg)
            return False, None, error_msg

    def create_preprocessing_pipeline(self) -> Pipeline:
        """Create preprocessing pipeline"""
        if self.progress_tracker:
            self.progress_tracker.update("Creating pipeline")

        text_preprocessor = FunctionTransformer(
            func=preprocess_text_function,
            validate=False
        )

        vectorizer = TfidfVectorizer(
            max_features=self.max_features,
            min_df=self.min_df,
            max_df=self.max_df,
            ngram_range=self.ngram_range,
            stop_words='english',
            sublinear_tf=True,
            norm='l2'
        )

        feature_selector = SelectKBest(
            score_func=chi2,
            k=min(self.feature_selection_k, self.max_features)
        )

        pipeline = Pipeline([
            ('preprocess', text_preprocessor),
            ('vectorize', vectorizer),
            ('feature_select', feature_selector),
            ('model', None)
        ])

        return pipeline

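    # The pipeline stages run in order: raw text -> preprocess (cleaning) ->
    # vectorize (TF-IDF) -> feature_select (chi-squared top-k) -> model.
    # The 'model' step is a placeholder; hyperparameter_tuning_with_cv fills it
    # in via pipeline.set_params(model=...).
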
    def comprehensive_evaluation(self, model, X_test, y_test, X_train=None, y_train=None) -> Dict:
        """Comprehensive model evaluation with cross-validation integration"""
        if self.progress_tracker:
            self.progress_tracker.update("Evaluating model")

        y_pred = model.predict(X_test)
        # Probability of the positive class (assumes a binary problem)
        y_pred_proba = model.predict_proba(X_test)[:, 1]

        metrics = {
            'accuracy': float(accuracy_score(y_test, y_pred)),
            'precision': float(precision_score(y_test, y_pred, average='weighted')),
            'recall': float(recall_score(y_test, y_pred, average='weighted')),
            'f1': float(f1_score(y_test, y_pred, average='weighted')),
            'roc_auc': float(roc_auc_score(y_test, y_pred_proba))
        }

        cm = confusion_matrix(y_test, y_pred)
        metrics['confusion_matrix'] = cm.tolist()

        if X_train is not None and y_train is not None:
            # Cross-validate on the combined train + test data
            X_full = np.concatenate([X_train, X_test])
            y_full = np.concatenate([y_train, y_test])

            logger.info("Performing cross-validation on full dataset...")
            cv_results = self.cv_manager.perform_cross_validation(model, X_full, y_full)
            metrics['cross_validation'] = cv_results

            if 'test_scores' in cv_results and 'f1' in cv_results['test_scores']:
                cv_f1_mean = cv_results['test_scores']['f1']['mean']
                cv_f1_std = cv_results['test_scores']['f1']['std']
                logger.info(f"CV F1 Score: {cv_f1_mean:.4f} (±{cv_f1_std:.4f})")

        try:
            if X_train is not None and y_train is not None:
                y_train_pred = model.predict(X_train)
                train_accuracy = accuracy_score(y_train, y_train_pred)
                metrics['train_accuracy'] = float(train_accuracy)
                metrics['overfitting_score'] = float(train_accuracy - metrics['accuracy'])
        except Exception as e:
            logger.warning(f"Overfitting detection failed: {e}")

        return metrics

    def hyperparameter_tuning_with_cv(self, pipeline, X_train, y_train, model_name: str) -> Tuple[Any, Dict]:
        """Perform hyperparameter tuning with nested cross-validation"""
        if self.progress_tracker:
            self.progress_tracker.update(f"Tuning {model_name} with CV")

        try:
            pipeline.set_params(model=self.models[model_name]['model'])

            if len(X_train) < 20:
                logger.info(f"Skipping hyperparameter tuning for {model_name} due to small dataset")
                pipeline.fit(X_train, y_train)

                cv_results = self.cv_manager.perform_cross_validation(pipeline, X_train, y_train)

                return pipeline, {
                    'best_params': 'default_parameters',
                    'best_score': cv_results.get('test_scores', {}).get('f1', {}).get('mean', 'not_calculated'),
                    'best_estimator': pipeline,
                    'cross_validation': cv_results,
                    'note': 'Hyperparameter tuning skipped for small dataset'
                }

            param_grid = self.models[model_name]['param_grid']

            cv_strategy = self.cv_manager.create_cv_strategy(X_train, y_train)

            grid_search = GridSearchCV(
                pipeline,
                param_grid,
                cv=cv_strategy,
                scoring='f1_weighted',
                n_jobs=1,
                verbose=0,
                return_train_score=True
            )

            logger.info(f"Starting hyperparameter tuning for {model_name}...")
            grid_search.fit(X_train, y_train)

            logger.info(f"Performing final CV evaluation for {model_name}...")
            best_cv_results = self.cv_manager.perform_cross_validation(
                grid_search.best_estimator_, X_train, y_train, cv_strategy
            )

            tuning_results = {
                'best_params': grid_search.best_params_,
                'best_score': float(grid_search.best_score_),
                'best_estimator': grid_search.best_estimator_,
                'cv_folds_used': cv_strategy.n_splits,
                'cross_validation': best_cv_results,
                'grid_search_results': {
                    'mean_test_scores': grid_search.cv_results_['mean_test_score'].tolist(),
                    'std_test_scores': grid_search.cv_results_['std_test_score'].tolist(),
                    'mean_train_scores': grid_search.cv_results_['mean_train_score'].tolist() if 'mean_train_score' in grid_search.cv_results_ else [],
                    'params': grid_search.cv_results_['params']
                }
            }

            logger.info(f"Hyperparameter tuning completed for {model_name}")
            logger.info(f"Best CV score: {grid_search.best_score_:.4f}")
            logger.info(f"Best params: {grid_search.best_params_}")

            if 'test_scores' in best_cv_results and 'f1' in best_cv_results['test_scores']:
                final_f1 = best_cv_results['test_scores']['f1']['mean']
                final_f1_std = best_cv_results['test_scores']['f1']['std']
                logger.info(f"Final CV F1: {final_f1:.4f} (±{final_f1_std:.4f})")

            return grid_search.best_estimator_, tuning_results

        except Exception as e:
            logger.error(f"Hyperparameter tuning failed for {model_name}: {str(e)}")
            try:
                # Fall back to fitting the pipeline with default parameters
                pipeline.set_params(model=self.models[model_name]['model'])
                pipeline.fit(X_train, y_train)

                cv_results = self.cv_manager.perform_cross_validation(pipeline, X_train, y_train)

                return pipeline, {
                    'error': str(e),
                    'fallback': 'simple_training',
                    'cross_validation': cv_results
                }
            except Exception as e2:
                logger.error(f"Fallback training also failed for {model_name}: {str(e2)}")
                raise RuntimeError(f"Both hyperparameter tuning and fallback training failed: {str(e)} | {str(e2)}")

    def train_and_evaluate_models(self, X_train, X_test, y_train, y_test) -> Dict:
        """Train and evaluate multiple models with comprehensive CV"""
        results = {}

        for model_name in self.models.keys():
            logger.info(f"Training {model_name} with cross-validation...")

            try:
                pipeline = self.create_preprocessing_pipeline()

                best_model, tuning_results = self.hyperparameter_tuning_with_cv(
                    pipeline, X_train, y_train, model_name
                )

                evaluation_metrics = self.comprehensive_evaluation(
                    best_model, X_test, y_test, X_train, y_train
                )

                results[model_name] = {
                    'model': best_model,
                    'tuning_results': tuning_results,
                    'evaluation_metrics': evaluation_metrics,
                    'training_time': datetime.now().isoformat()
                }

                test_f1 = evaluation_metrics['f1']
                cv_results = evaluation_metrics.get('cross_validation', {})
                cv_f1_mean = cv_results.get('test_scores', {}).get('f1', {}).get('mean', 'N/A')
                cv_f1_std = cv_results.get('test_scores', {}).get('f1', {}).get('std', 'N/A')

                # Format CV scores only when they are numeric
                cv_f1_mean_str = f"{cv_f1_mean:.4f}" if cv_f1_mean != 'N/A' else 'N/A'
                cv_f1_std_str = f"{cv_f1_std:.4f}" if cv_f1_std != 'N/A' else 'N/A'
                logger.info(f"Model {model_name} - Test F1: {test_f1:.4f}, "
                            f"CV F1: {cv_f1_mean_str} (±{cv_f1_std_str})")

            except Exception as e:
                logger.error(f"Training failed for {model_name}: {str(e)}")
                results[model_name] = {'error': str(e)}

        return results

    def select_best_model(self, results: Dict) -> Tuple[str, Any, Dict]:
        """Select the best performing model based on CV results"""
        if self.progress_tracker:
            self.progress_tracker.update("Selecting best model")

        best_model_name = None
        best_model = None
        best_score = -1
        best_metrics = None
        best_score_type = None

        for model_name, result in results.items():
            if 'error' in result:
                continue

            # Prefer the mean CV F1 when available, otherwise fall back to test F1
            cv_results = result['evaluation_metrics'].get('cross_validation', {})
            if 'test_scores' in cv_results and 'f1' in cv_results['test_scores']:
                candidate_f1 = cv_results['test_scores']['f1']['mean']
                score_type = "CV F1"
            else:
                candidate_f1 = result['evaluation_metrics']['f1']
                score_type = "Test F1"

            if candidate_f1 > best_score:
                best_score = candidate_f1
                best_model_name = model_name
                best_model = result['model']
                best_metrics = result['evaluation_metrics']
                best_score_type = score_type

        if best_model_name is None:
            raise ValueError("No models trained successfully")

        logger.info(f"Best model: {best_model_name} with {best_score_type} score: {best_score:.4f}")
        return best_model_name, best_model, best_metrics

    def save_model_artifacts(self, model, model_name: str, metrics: Dict, results: Dict) -> bool:
        """Save model artifacts and enhanced metadata with CV results"""
        try:
            if self.progress_tracker:
                self.progress_tracker.update("Saving model")

            # Save the full pipeline, falling back to the model directory if needed
            try:
                joblib.dump(model, self.pipeline_path)
                logger.info(f"✅ Saved pipeline to {self.pipeline_path}")
            except Exception as e:
                logger.error(f"Failed to save pipeline: {e}")
                alt_pipeline_path = self.model_dir / "pipeline.pkl"
                joblib.dump(model, alt_pipeline_path)
                logger.info(f"✅ Saved pipeline to {alt_pipeline_path}")

            # Save the bare classifier and the fitted vectorizer separately
            try:
                if hasattr(model, 'named_steps') and 'model' in model.named_steps:
                    joblib.dump(model.named_steps['model'], self.model_path)
                    logger.info(f"✅ Saved model to {self.model_path}")
            except Exception as e:
                logger.warning(f"Could not save model component: {e}")

            try:
                if hasattr(model, 'named_steps') and 'vectorize' in model.named_steps:
                    joblib.dump(model.named_steps['vectorize'], self.vectorizer_path)
                    logger.info(f"✅ Saved vectorizer to {self.vectorizer_path}")
            except Exception as e:
                logger.warning(f"Could not save vectorizer component: {e}")

            # Placeholder data-version hash derived from the current timestamp
            data_hash = hashlib.md5(str(datetime.now()).encode()).hexdigest()

            cv_results = metrics.get('cross_validation', {})

            metadata = {
                'model_version': f"v1.0_{datetime.now().strftime('%Y%m%d_%H%M%S')}",
                'model_type': model_name,
                'data_version': data_hash,
                'test_accuracy': metrics['accuracy'],
                'test_f1': metrics['f1'],
                'test_precision': metrics['precision'],
                'test_recall': metrics['recall'],
                'test_roc_auc': metrics['roc_auc'],
                'overfitting_score': metrics.get('overfitting_score', 'Unknown'),
                'timestamp': datetime.now().isoformat(),
                'training_config': {
                    'test_size': self.test_size,
                    'cv_folds': self.cv_folds,
                    'max_features': self.max_features,
                    'ngram_range': self.ngram_range,
                    'feature_selection_k': self.feature_selection_k
                }
            }

            if cv_results and 'test_scores' in cv_results:
                metadata['cross_validation'] = {
                    'n_splits': cv_results.get('n_splits', self.cv_folds),
                    'test_scores': cv_results['test_scores'],
                    'train_scores': cv_results.get('train_scores', {}),
                    'overfitting_score': cv_results.get('overfitting_score', 'Unknown'),
                    'stability_score': cv_results.get('stability_score', 'Unknown'),
                    'individual_fold_results': cv_results.get('fold_results', [])
                }

                if 'f1' in cv_results['test_scores']:
                    metadata['cv_f1_mean'] = cv_results['test_scores']['f1']['mean']
                    metadata['cv_f1_std'] = cv_results['test_scores']['f1']['std']
                    metadata['cv_f1_min'] = cv_results['test_scores']['f1']['min']
                    metadata['cv_f1_max'] = cv_results['test_scores']['f1']['max']

                if 'accuracy' in cv_results['test_scores']:
                    metadata['cv_accuracy_mean'] = cv_results['test_scores']['accuracy']['mean']
                    metadata['cv_accuracy_std'] = cv_results['test_scores']['accuracy']['std']

            # Compare the selected model's CV scores against the other candidates
            if len(results) > 1:
                model_comparison = {}
                for other_model_name, other_result in results.items():
                    if other_model_name != model_name and 'error' not in other_result:
                        other_cv = other_result['evaluation_metrics'].get('cross_validation', {})
                        if cv_results and other_cv:
                            comparison = self.cv_manager.compare_cv_results(cv_results, other_cv)
                            model_comparison[other_model_name] = comparison

                if model_comparison:
                    metadata['model_comparison'] = model_comparison

            try:
                with open(self.metadata_path, 'w') as f:
                    json.dump(metadata, f, indent=2)
                logger.info(f"✅ Saved enhanced metadata to {self.metadata_path}")
            except Exception as e:
                logger.warning(f"Could not save metadata: {e}")

            logger.info("✅ Model artifacts saved successfully with CV results")
            return True

        except Exception as e:
            logger.error(f"Failed to save model artifacts: {str(e)}")
            try:
                joblib.dump(model, Path("/tmp/pipeline_backup.pkl"))
                logger.info("✅ Saved backup pipeline")
                return True
            except Exception as e2:
                logger.error(f"Failed to save backup pipeline: {str(e2)}")
                return False

    def train_model(self, data_path: str = None) -> Tuple[bool, str]:
        """Main training function with comprehensive CV pipeline"""
        try:
            logger.info("Starting enhanced model training with cross-validation...")

            if data_path:
                self.data_path = Path(data_path)

            success, df, message = self.load_and_validate_data()
            if not success:
                return False, message

            time_estimate = estimate_training_time(
                len(df),
                enable_tuning=True,
                cv_folds=self.cv_folds
            )

            print("\n📊 Enhanced Training Configuration:")
            print(f"Dataset size: {len(df)} samples")
            print(f"Cross-validation folds: {self.cv_folds}")
            print(f"Estimated time: {time_estimate['total_formatted']}")
            print(f"Models to train: {len(self.models)}")
            print("Hyperparameter tuning: Enabled")
            print()

            total_steps = 4 + (len(self.models) * 3) + 1
            self.progress_tracker = ProgressTracker(total_steps, "CV Training Progress")

            X = df['text'].values
            y = df['label'].values

            self.progress_tracker.update("Splitting data")

            if len(X) < 10:
                test_size = max(0.1, 1 / len(X))
            else:
                test_size = self.test_size

            label_counts = pd.Series(y).value_counts()
            min_class_count = label_counts.min()
            can_stratify = min_class_count >= 2 and len(y) >= 4

            X_train, X_test, y_train, y_test = train_test_split(
                X, y,
                test_size=test_size,
                stratify=y if can_stratify else None,
                random_state=self.random_state
            )

            logger.info(f"Data split: {len(X_train)} train, {len(X_test)} test")

            if len(X_train) < 3:
                logger.warning(f"Very small training set: {len(X_train)} samples. CV results may be unreliable.")
            if len(X_test) < 1:
                return False, "Cannot create test set. Dataset too small."

            results = self.train_and_evaluate_models(X_train, X_test, y_train, y_test)

            best_model_name, best_model, best_metrics = self.select_best_model(results)

            if not self.save_model_artifacts(best_model, best_model_name, best_metrics, results):
                return False, "Failed to save model artifacts"

            self.progress_tracker.finish()

            cv_results = best_metrics.get('cross_validation', {})
            cv_info = ""
            if 'test_scores' in cv_results and 'f1' in cv_results['test_scores']:
                cv_f1_mean = cv_results['test_scores']['f1']['mean']
                cv_f1_std = cv_results['test_scores']['f1']['std']
                cv_info = f", CV F1: {cv_f1_mean:.4f} (±{cv_f1_std:.4f})"

            success_message = (
                f"Enhanced model training completed successfully. "
                f"Best model: {best_model_name} "
                f"(Test F1: {best_metrics['f1']:.4f}, Test Accuracy: {best_metrics['accuracy']:.4f}{cv_info})"
            )

            logger.info(success_message)
            return True, success_message

        except Exception as e:
            if self.progress_tracker:
                print()
            error_message = f"Enhanced model training failed: {str(e)}"
            logger.error(error_message)
            return False, error_message


def main():
    """Main execution function with enhanced CV support"""
    import argparse

    parser = argparse.ArgumentParser(description='Train fake news detection model with cross-validation')
    parser.add_argument('--data_path', type=str, help='Path to training data CSV file')
    parser.add_argument('--config_path', type=str, help='Path to training configuration JSON file')
    parser.add_argument('--cv_folds', type=int, default=5, help='Number of cross-validation folds')
    args = parser.parse_args()

    trainer = RobustModelTrainer()

    if args.cv_folds:
        trainer.cv_folds = args.cv_folds
        trainer.cv_manager.cv_folds = args.cv_folds

    if args.config_path and Path(args.config_path).exists():
        try:
            with open(args.config_path, 'r') as f:
                config = json.load(f)

            trainer.test_size = config.get('test_size', trainer.test_size)
            trainer.cv_folds = config.get('cv_folds', trainer.cv_folds)
            trainer.cv_manager.cv_folds = trainer.cv_folds
            trainer.max_features = config.get('max_features', trainer.max_features)
            trainer.ngram_range = tuple(config.get('ngram_range', trainer.ngram_range))

            selected_models = config.get('selected_models')
            if selected_models and len(selected_models) < len(trainer.models):
                all_models = trainer.models.copy()
                trainer.models = {k: v for k, v in all_models.items() if k in selected_models}

            trainer.feature_selection_k = min(trainer.feature_selection_k, trainer.max_features)

            logger.info(f"Applied custom configuration with {trainer.cv_folds} CV folds: {config}")

        except Exception as e:
            logger.warning(f"Failed to load configuration: {e}, using defaults")

    success, message = trainer.train_model(data_path=args.data_path)

    if success:
        print(f"✅ {message}")
    else:
        print(f"❌ {message}")
        sys.exit(1)


if __name__ == "__main__":
    main()
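# Example invocation (the script filename is illustrative; the flags are the
# ones defined in main() above):
#   python train_model.py --data_path /tmp/data/combined_dataset.csv --cv_folds 5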