hvac-kia-content/classify_youtube_podcast_only.py
Ben Reed fc3af8e19f feat: Add AI-powered content classification system
- Implement Claude Haiku integration for content analysis
- Create structured JSON output with summaries and metadata
- Add markdown consolidation with deduplication
- Process 447 YouTube videos and 431 podcast episodes
- Generate clean classified files for Claude Desktop projects
- Include comprehensive documentation and usage examples
- Cost-effective processing at ~$0.30 for 878 items
- Optimize rate limiting for 80,000 tokens/minute API limit

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-09-03 19:33:32 -03:00
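
The 80,000 tokens/minute limit mentioned above is presumably enforced inside `src/content_analysis/content_classifier.py`, which is not included on this page. Purely as a sketch of the general technique — a sliding-window token budget, with `TokenBudget` being a name invented here rather than anything from the repo — the pacing could look like:

```python
# Illustrative only: a sliding-window pacer for a tokens-per-minute API cap.
# TokenBudget is a hypothetical name, not the project's actual implementation.
import asyncio
import time


class TokenBudget:
    """Block callers so that spend stays under a tokens-per-minute limit."""

    def __init__(self, tokens_per_minute: int = 80_000):
        self.limit = tokens_per_minute
        self.spend: list[tuple[float, int]] = []  # (timestamp, tokens)

    async def acquire(self, tokens: int) -> None:
        # Assumes a single request never exceeds the per-minute limit.
        while True:
            now = time.monotonic()
            # Drop spend that has aged out of the 60-second window.
            self.spend = [(t, n) for t, n in self.spend if now - t < 60]
            if sum(n for _, n in self.spend) + tokens <= self.limit:
                self.spend.append((now, tokens))
                return
            # Wait until the oldest recorded spend leaves the window.
            oldest = min(t for t, _ in self.spend)
            await asyncio.sleep(max(60.0 - (now - oldest), 0.1))
```

Each API call would then be preceded by `await budget.acquire(estimated_tokens)`, which blocks only as long as it takes for old spend to age out of the 60-second window.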

86 lines · No EOL · 3.2 KiB · Python

#!/usr/bin/env python3
"""
Classify ONLY YouTube and Podcast content with conservative rate limiting.
"""
import asyncio
import json
import logging
import sys
from datetime import datetime
from pathlib import Path

# Add src to path
sys.path.insert(0, str(Path(__file__).parent))

from src.content_analysis.content_classifier import ContentClassifier
from src.content_analysis.content_parser import ContentParser

# Set up logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)


async def classify_source(source_name: str, file_path: str, classifier: ContentClassifier, parser: ContentParser):
    """Classify a single source with conservative rate limiting."""
    logger.info(f"Starting {source_name} classification...")

    # Parse the consolidated markdown file into individual content items
    items = parser.parse_markdown_file(Path(file_path))
    logger.info(f"Found {len(items)} {source_name} items")

    if not items:
        logger.warning(f"No items found in {file_path}")
        return

    # Classify with batch size 1 for maximum rate limit control
    classified_items = await classifier.classify_content_batch(items, batch_size=1)

    # Save results
    output_file = f'data/clean_classified/{source_name.lower()}.json'
    result = {
        'source_file': file_path,
        'processed_at': datetime.now().isoformat(),
        'total_items': len(classified_items),
        'source_name': source_name,
        'classified_content': [item.to_dict() for item in classified_items]
    }

    # Ensure output directory exists
    Path(output_file).parent.mkdir(parents=True, exist_ok=True)

    # ensure_ascii=False emits raw non-ASCII text, so write UTF-8 explicitly
    with open(output_file, 'w', encoding='utf-8') as f:
        json.dump(result, f, indent=2, ensure_ascii=False)

    logger.info(f"✅ Successfully classified and saved {len(classified_items)} {source_name} items to {output_file}")
async def main():
    """Main classification function for YouTube and Podcast only."""
    logger.info("🚀 Starting YouTube and Podcast classification with conservative rate limiting")

    # Initialize classifier and parser
    classifier = ContentClassifier()  # Uses ANTHROPIC_API_KEY from environment
    parser = ContentParser()

    # Define sources to process (ONLY YouTube and Podcast as requested)
    sources = [
        ('YouTube', 'data/consolidated/hkia_youtube_consolidated.md'),
        ('Podcast', 'data/consolidated/hkia_podcast_consolidated.md')
    ]

    # Process each source sequentially to avoid rate limit conflicts
    for source_name, file_path in sources:
        try:
            await classify_source(source_name, file_path, classifier, parser)
            logger.info(f"✅ Completed {source_name} classification")

            # Brief delay between sources
            if source_name != sources[-1][0]:  # Not the last source
                logger.info("⏳ Waiting 10 seconds before next source...")
                await asyncio.sleep(10)
        except Exception as e:
            logger.error(f"❌ Error processing {source_name}: {e}")

    logger.info("🎉 All YouTube and Podcast classification completed!")


if __name__ == "__main__":
    asyncio.run(main())
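
Downstream consumers (for instance, when assembling the clean classified files for Claude Desktop projects) can read the output with the standard library alone; the top-level keys below are exactly the ones written by `classify_source()`:

```python
import json
from pathlib import Path

# Summarize every classified file produced by the script above.
for path in sorted(Path('data/clean_classified').glob('*.json')):
    data = json.loads(path.read_text(encoding='utf-8'))
    print(f"{data['source_name']}: {data['total_items']} items "
          f"(from {data['source_file']}, processed {data['processed_at']})")
```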