#!/usr/bin/env python3
"""
Classify ONLY YouTube and Podcast content with conservative rate limiting.
"""

import asyncio
import json
import logging
import sys
from datetime import datetime
from pathlib import Path

# Add src to path
sys.path.insert(0, str(Path(__file__).parent))

from src.content_analysis.content_classifier import ContentClassifier
from src.content_analysis.content_parser import ContentParser

# Set up logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)


async def classify_source(source_name: str, file_path: str,
                          classifier: ContentClassifier, parser: ContentParser):
    """Classify a single source with conservative rate limiting."""
    logger.info(f"Starting {source_name} classification...")

    # Parse the consolidated markdown file into content items
    items = parser.parse_markdown_file(Path(file_path))
    logger.info(f"Found {len(items)} {source_name} items")

    if not items:
        logger.warning(f"No items found in {file_path}")
        return

    # Classify with batch size 1 for maximum rate limit control
    classified_items = await classifier.classify_content_batch(items, batch_size=1)

    # Save results
    output_file = f'data/clean_classified/{source_name.lower()}.json'
    result = {
        'source_file': file_path,
        'processed_at': datetime.now().isoformat(),
        'total_items': len(classified_items),
        'source_name': source_name,
        'classified_content': [item.to_dict() for item in classified_items]
    }

    # Ensure output directory exists
    Path(output_file).parent.mkdir(parents=True, exist_ok=True)

    # ensure_ascii=False writes raw Unicode, so pin the file encoding to UTF-8
    with open(output_file, 'w', encoding='utf-8') as f:
        json.dump(result, f, indent=2, ensure_ascii=False)

    logger.info(
        f"✅ Successfully classified and saved {len(classified_items)} "
        f"{source_name} items to {output_file}"
    )


async def main():
    """Main classification function for YouTube and Podcast only."""
    logger.info("🚀 Starting YouTube and Podcast classification with conservative rate limiting")

    # Initialize classifier and parser
    classifier = ContentClassifier()  # Uses ANTHROPIC_API_KEY from environment
    parser = ContentParser()

    # Define sources to process (ONLY YouTube and Podcast as requested)
    sources = [
        ('YouTube', 'data/consolidated/hkia_youtube_consolidated.md'),
        ('Podcast', 'data/consolidated/hkia_podcast_consolidated.md'),
    ]

    # Process each source sequentially to avoid rate limit conflicts
    for source_name, file_path in sources:
        try:
            await classify_source(source_name, file_path, classifier, parser)
            logger.info(f"✅ Completed {source_name} classification")

            # Brief delay between sources
            if source_name != sources[-1][0]:  # Not the last source
                logger.info("⏳ Waiting 10 seconds before next source...")
                await asyncio.sleep(10)
        except Exception as e:
            logger.error(f"❌ Error processing {source_name}: {e}")

    logger.info("🎉 All YouTube and Podcast classification completed!")


if __name__ == "__main__":
    asyncio.run(main())