#!/usr/bin/env python3
"""
Unified cookie management system for YouTube authentication
Based on the compendium project's successful implementation
"""

import os
import fcntl
import shutil
from pathlib import Path
from typing import Optional, List, Dict, Any
from datetime import datetime, timedelta
import logging

logger = logging.getLogger(__name__)


class CookieManager:
    """Unified cookie discovery and validation system"""

    def __init__(self):
        self.priority_paths = self._get_priority_paths()
        self.max_age_days = 90
        self.min_size = 50  # bytes; anything smaller cannot hold a real cookie set
        self.max_size = 50 * 1024 * 1024  # 50MB

    def _get_priority_paths(self) -> List[Path]:
        """Get cookie paths in priority order"""
        paths = []

        # 1. Environment variable (highest priority)
        env_path = os.getenv('YOUTUBE_COOKIES_PATH')
        if env_path:
            paths.append(Path(env_path))

        # 2. Container paths
        paths.extend([
            Path('/app/youtube_cookies.txt'),
            Path('/app/cookies.txt'),
        ])

        # 3. NAS production paths
        nas_base = Path('/mnt/nas/app_data')
        if nas_base.exists():
            paths.extend([
                nas_base / 'cookies' / 'youtube_cookies.txt',
                nas_base / 'cookies' / 'cookies.txt',
            ])

        # 4. Local development paths
        project_root = Path(__file__).parent.parent
        paths.extend([
            project_root / 'data_production_backlog' / '.cookies' / 'youtube_cookies.txt',
            project_root / 'data_production_backlog' / '.cookies' / 'cookies.txt',
            project_root / '.cookies' / 'youtube_cookies.txt',
            project_root / '.cookies' / 'cookies.txt',
        ])

        return paths

    def find_valid_cookies(self) -> Optional[Path]:
        """Find the first valid cookie file in priority order"""
        for cookie_path in self.priority_paths:
            if self._validate_cookie_file(cookie_path):
                logger.info(f"Found valid cookies: {cookie_path}")
                return cookie_path

        logger.warning("No valid cookie files found")
        return None

    def _validate_cookie_file(self, cookie_path: Path) -> bool:
        """Validate a cookie file"""
        try:
            # Check existence and accessibility
            if not cookie_path.exists():
                return False
            if not cookie_path.is_file():
                return False
            if not os.access(cookie_path, os.R_OK):
                logger.warning(f"Cookie file not readable: {cookie_path}")
                return False

            # Check file size
            file_size = cookie_path.stat().st_size
            if file_size < self.min_size:
                logger.warning(f"Cookie file too small ({file_size} bytes): {cookie_path}")
                return False
            if file_size > self.max_size:
                logger.warning(f"Cookie file too large ({file_size} bytes): {cookie_path}")
                return False

            # Check file age
            mtime = datetime.fromtimestamp(cookie_path.stat().st_mtime)
            age = datetime.now() - mtime
            if age > timedelta(days=self.max_age_days):
                logger.warning(f"Cookie file too old ({age.days} days): {cookie_path}")
                return False

            # Validate Netscape format
            if not self._validate_netscape_format(cookie_path):
                return False

            logger.debug(
                f"Cookie file validated: {cookie_path} "
                f"({file_size} bytes, {age.days} days old)"
            )
            return True

        except Exception as e:
            logger.warning(f"Error validating cookie file {cookie_path}: {e}")
            return False
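
    # Netscape cookie files begin with a "# Netscape HTTP Cookie File" header
    # followed by one tab-separated record per cookie. An illustrative line
    # (fabricated values, with <TAB> standing in for real tab characters):
    #
    #   .youtube.com<TAB>TRUE<TAB>/<TAB>TRUE<TAB>1735689600<TAB>PREF<TAB>f6=40000000
    #
    # Fields: domain, include-subdomains flag, path, secure flag,
    # expiration (Unix epoch seconds), name, value.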
    def _validate_netscape_format(self, cookie_path: Path) -> bool:
        """Validate cookie file is in proper Netscape format"""
        try:
            content = cookie_path.read_text(encoding='utf-8', errors='ignore')
            lines = content.strip().split('\n')

            # Should have header
            if not any('Netscape HTTP Cookie File' in line for line in lines[:5]):
                logger.warning(f"Missing Netscape header: {cookie_path}")
                return False

            # Count valid cookie lines (non-comment, non-empty)
            cookie_count = 0
            for line in lines:
                line = line.strip()
                if line and not line.startswith('#'):
                    # Basic tab-separated format check
                    parts = line.split('\t')
                    if len(parts) >= 6:  # domain, flag, path, secure, expiration, name, [value]
                        cookie_count += 1

            if cookie_count < 3:  # Need at least a few cookies
                logger.warning(f"Too few valid cookies ({cookie_count}): {cookie_path}")
                return False

            logger.debug(f"Found {cookie_count} valid cookies in {cookie_path}")
            return True

        except Exception as e:
            logger.warning(f"Error reading cookie file {cookie_path}: {e}")
            return False

    def backup_cookies(self, cookie_path: Path) -> Optional[Path]:
        """Create backup of cookie file"""
        try:
            timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
            backup_path = cookie_path.with_suffix(f'.backup_{timestamp}')
            shutil.copy2(cookie_path, backup_path)
            logger.info(f"Backed up cookies to: {backup_path}")
            return backup_path
        except Exception as e:
            logger.error(f"Failed to backup cookies {cookie_path}: {e}")
            return None

    def update_cookies(self, new_cookie_path: Path,
                       target_path: Optional[Path] = None) -> bool:
        """Atomically update cookie file with new cookies"""
        if target_path is None:
            target_path = self.find_valid_cookies()
            if target_path is None:
                # Use first priority path as default
                target_path = self.priority_paths[0]

        # Make sure the destination directory exists before writing
        target_path.parent.mkdir(parents=True, exist_ok=True)

        try:
            # Validate new cookies first
            if not self._validate_cookie_file(new_cookie_path):
                logger.error(f"New cookie file failed validation: {new_cookie_path}")
                return False

            # Backup existing cookies
            if target_path.exists():
                backup_path = self.backup_cookies(target_path)
                if backup_path is None:
                    logger.warning("Failed to backup existing cookies, proceeding anyway")

            # Atomic replacement using file locking
            temp_path = target_path.with_suffix('.tmp')
            try:
                # Copy new cookies to temp file
                shutil.copy2(new_cookie_path, temp_path)

                # Lock and replace atomically; Path.replace() is an atomic
                # rename on POSIX filesystems
                with open(temp_path, 'r+b') as f:
                    fcntl.flock(f.fileno(), fcntl.LOCK_EX)
                    temp_path.replace(target_path)

                logger.info(f"Successfully updated cookies: {target_path}")
                return True

            finally:
                if temp_path.exists():
                    temp_path.unlink()

        except Exception as e:
            logger.error(f"Failed to update cookies: {e}")
            return False

    def get_cookie_stats(self) -> Dict[str, Any]:
        """Get statistics about available cookie files"""
        stats = {
            'valid_files': [],
            'invalid_files': [],
            'total_cookies': 0,
            'newest_file': None,
            'oldest_file': None,
        }

        for cookie_path in self.priority_paths:
            if cookie_path.exists():
                if self._validate_cookie_file(cookie_path):
                    file_info = {
                        'path': str(cookie_path),
                        'size': cookie_path.stat().st_size,
                        'mtime': datetime.fromtimestamp(cookie_path.stat().st_mtime),
                        'cookie_count': self._count_cookies(cookie_path),
                    }
                    stats['valid_files'].append(file_info)
                    stats['total_cookies'] += file_info['cookie_count']

                    if stats['newest_file'] is None or file_info['mtime'] > stats['newest_file']['mtime']:
                        stats['newest_file'] = file_info
                    if stats['oldest_file'] is None or file_info['mtime'] < stats['oldest_file']['mtime']:
                        stats['oldest_file'] = file_info
                else:
                    stats['invalid_files'].append(str(cookie_path))

        return stats

    def _count_cookies(self, cookie_path: Path) -> int:
        """Count valid cookies in file"""
        try:
            content = cookie_path.read_text(encoding='utf-8', errors='ignore')
            lines = content.strip().split('\n')
            count = 0
            for line in lines:
                line = line.strip()
                if line and not line.startswith('#'):
                    parts = line.split('\t')
                    if len(parts) >= 6:
                        count += 1
            return count
        except Exception:
            return 0

    def cleanup_old_backups(self, keep_count: int = 5) -> None:
        """Clean up old backup files, keeping only the most recent"""
        for cookie_path in self.priority_paths:
            if cookie_path.exists():
                backup_pattern = f"{cookie_path.stem}.backup_*"
                backup_files = list(cookie_path.parent.glob(backup_pattern))

                if len(backup_files) > keep_count:
                    # Sort by modification time (newest first)
                    backup_files.sort(key=lambda p: p.stat().st_mtime, reverse=True)

                    # Remove old backups
                    for old_backup in backup_files[keep_count:]:
                        try:
                            old_backup.unlink()
                            logger.debug(f"Removed old backup: {old_backup}")
                        except Exception as e:
                            logger.warning(f"Failed to remove backup {old_backup}: {e}")
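

# Hypothetical integration sketch (not part of the original module): if this
# manager feeds a yt-dlp based downloader, the discovered cookie file can be
# passed through yt-dlp's `cookiefile` option, which accepts a Netscape-format
# cookie file. The function name below is illustrative.
def build_ytdlp_options(extra: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
    """Build a yt-dlp options dict that includes cookies when available."""
    opts: Dict[str, Any] = dict(extra or {})
    cookies = CookieManager().find_valid_cookies()
    if cookies is not None:
        opts['cookiefile'] = str(cookies)  # yt-dlp reads Netscape cookie files
    return opts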

# Convenience functions
def get_youtube_cookies() -> Optional[Path]:
    """Get valid YouTube cookies file"""
    manager = CookieManager()
    return manager.find_valid_cookies()


def update_youtube_cookies(new_cookie_path: Path) -> bool:
    """Update YouTube cookies"""
    manager = CookieManager()
    return manager.update_cookies(new_cookie_path)


def get_cookie_stats() -> Dict[str, Any]:
    """Get cookie file statistics"""
    manager = CookieManager()
    return manager.get_cookie_stats()
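

# Minimal smoke-test sketch (added for illustration): running the module
# directly reports which cookie file, if any, would be used and summarizes
# every known location.
if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO)

    active = get_youtube_cookies()
    print(f"Active cookie file: {active if active else 'none found'}")

    stats = get_cookie_stats()
    print(f"Valid files: {len(stats['valid_files'])}, "
          f"invalid files: {len(stats['invalid_files'])}, "
          f"total cookies: {stats['total_cookies']}")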