- Implement Claude Haiku integration for content analysis
- Create structured JSON output with summaries and metadata
- Add markdown consolidation with deduplication
- Process 447 YouTube videos and 431 podcast episodes
- Generate clean classified files for Claude Desktop projects
- Include comprehensive documentation and usage examples
- Cost-effective processing at ~.30 for 878 items
- Optimize rate limiting for 80,000 tokens/minute API limit

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
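The commit message refers to conservative rate limiting against an 80,000 tokens/minute API limit; the throttling itself lives inside ContentClassifier, which is not part of this file. Below is a minimal sketch of one way such a rolling tokens-per-minute budget could be enforced. The TokenBudgetLimiter class and its acquire method are hypothetical names for illustration, not the project's actual API.

# Hypothetical illustration only: ContentClassifier's real throttling logic is not shown in this file.
import asyncio
import time


class TokenBudgetLimiter:
    """Blocks callers until an estimated token spend fits inside a rolling one-minute window."""

    def __init__(self, tokens_per_minute: int = 80_000):
        self.tokens_per_minute = tokens_per_minute
        self._window: list[tuple[float, int]] = []  # (timestamp, tokens) spent in the last 60 s

    async def acquire(self, estimated_tokens: int) -> None:
        while True:
            now = time.monotonic()
            # Keep only spend that still counts against the rolling 60-second window.
            self._window = [(t, n) for t, n in self._window if now - t < 60]
            used = sum(n for _, n in self._window)
            if used + estimated_tokens <= self.tokens_per_minute or not self._window:
                self._window.append((now, estimated_tokens))
                return
            # Wait for the oldest entry to age out, then re-check the budget.
            await asyncio.sleep(60 - (now - self._window[0][0]) + 0.1)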
#!/usr/bin/env python3
"""
Classify ONLY YouTube and Podcast content with conservative rate limiting.
"""

import asyncio
import sys
from pathlib import Path
import json
from datetime import datetime
import logging

# Add src to path
sys.path.insert(0, str(Path(__file__).parent))

from src.content_analysis.content_classifier import ContentClassifier
from src.content_analysis.content_parser import ContentParser

# Set up logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)


async def classify_source(source_name: str, file_path: str, classifier: ContentClassifier, parser: ContentParser):
    """Classify a single source with conservative rate limiting."""
    logger.info(f"Starting {source_name} classification...")

    # Parse markdown file
    items = parser.parse_markdown_file(Path(file_path))
    logger.info(f"Found {len(items)} {source_name} items")

    if not items:
        logger.warning(f"No items found in {file_path}")
        return

    # Classify with batch size 1 for maximum rate limit control
    classified_items = await classifier.classify_content_batch(items, batch_size=1)

    # Save results
    output_file = f'data/clean_classified/{source_name.lower()}.json'
    result = {
        'source_file': file_path,
        'processed_at': datetime.now().isoformat(),
        'total_items': len(classified_items),
        'source_name': source_name,
        'classified_content': [item.to_dict() for item in classified_items]
    }

    # Ensure output directory exists
    Path(output_file).parent.mkdir(parents=True, exist_ok=True)

    with open(output_file, 'w', encoding='utf-8') as f:
        json.dump(result, f, indent=2, ensure_ascii=False)

    logger.info(f"✅ Successfully classified and saved {len(classified_items)} {source_name} items to {output_file}")


async def main():
    """Main classification function for YouTube and Podcast only."""
    logger.info("🚀 Starting YouTube and Podcast classification with conservative rate limiting")

    # Initialize classifier and parser
    classifier = ContentClassifier()  # Uses ANTHROPIC_API_KEY from environment
    parser = ContentParser()

    # Define sources to process (ONLY YouTube and Podcast as requested)
    sources = [
        ('YouTube', 'data/consolidated/hkia_youtube_consolidated.md'),
        ('Podcast', 'data/consolidated/hkia_podcast_consolidated.md')
    ]

    # Process each source sequentially to avoid rate limit conflicts
    for source_name, file_path in sources:
        try:
            await classify_source(source_name, file_path, classifier, parser)
            logger.info(f"✅ Completed {source_name} classification")

            # Brief delay between sources
            if source_name != sources[-1][0]:  # Not the last source
                logger.info("⏳ Waiting 10 seconds before next source...")
                await asyncio.sleep(10)

        except Exception as e:
            logger.error(f"❌ Error processing {source_name}: {e}")

    logger.info("🎉 All YouTube and Podcast classification completed!")


if __name__ == "__main__":
    asyncio.run(main())
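For downstream use, a minimal sketch of reading one of the generated files back follows. The path matches the script's default output location for the YouTube source; the 'title' key inside each item is an assumption, since the actual fields come from ContentClassifier's to_dict().

import json
from pathlib import Path

# Load the YouTube results written by classify_source() above.
data = json.loads(Path('data/clean_classified/youtube.json').read_text(encoding='utf-8'))
print(data['source_name'], data['total_items'])
for item in data['classified_content'][:3]:
    # 'title' is assumed for illustration; real keys depend on ContentClassifier's to_dict().
    print(item.get('title', '<untitled>'))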