- Implement Claude Haiku integration for content analysis
- Create structured JSON output with summaries and metadata
- Add markdown consolidation with deduplication
- Process 447 YouTube videos and 431 podcast episodes
- Generate clean classified files for Claude Desktop projects
- Include comprehensive documentation and usage examples
- Cost-effective processing at ~.30 for 878 items
- Optimize rate limiting for 80,000 tokens/minute API limit

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
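The Haiku analysis pass and the 80,000 tokens/minute ceiling called out above suggest client code along these lines: a small sliding-window token budget wrapped around a Messages API call that asks Haiku for a JSON summary. This is a minimal sketch of that pattern, not the repository's actual analyzer; the model ID, prompt wording, token estimates, and the `summary`/`topics` fields are assumptions.

```python
import json
import time

import anthropic  # assumes the official Anthropic Python SDK is installed

TOKENS_PER_MINUTE = 80_000  # the per-minute budget mentioned in the commit message


class TokenBudget:
    """Naive sliding-window throttle: sleep when the last minute's usage nears the cap."""

    def __init__(self, limit_per_minute: int = TOKENS_PER_MINUTE):
        self.limit = limit_per_minute
        self.window: list[tuple[float, int]] = []  # (timestamp, tokens used)

    def wait_for(self, estimated_tokens: int) -> None:
        while True:
            now = time.time()
            # Drop usage records older than 60 seconds, then check remaining headroom
            self.window = [(t, n) for t, n in self.window if now - t < 60]
            if sum(n for _, n in self.window) + estimated_tokens <= self.limit:
                return
            time.sleep(1)

    def record(self, tokens: int) -> None:
        self.window.append((time.time(), tokens))


def analyze_item(client: anthropic.Anthropic, budget: TokenBudget, text: str) -> dict:
    """Ask Haiku for a structured JSON summary of one content item (hypothetical prompt)."""
    prompt = (
        "Summarize the following HVAC content and return JSON with keys "
        '"summary" and "topics":\n\n' + text[:8000]
    )
    budget.wait_for(estimated_tokens=len(prompt) // 4 + 512)  # rough input + output estimate
    response = client.messages.create(
        model="claude-3-haiku-20240307",  # assumed model ID
        max_tokens=512,
        messages=[{"role": "user", "content": prompt}],
    )
    budget.record(response.usage.input_tokens + response.usage.output_tokens)
    return json.loads(response.content[0].text)
```

A production pass would also batch items, retry on rate-limit errors, and validate the returned JSON before writing it into the structured output files.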
The consolidation script itself (Python, 251 lines, 9.5 KiB, no EOL at end of file):
```python
#!/usr/bin/env python3
"""
Consolidate HVAC Know It All markdown files with deduplication.

Creates 5 clean consolidated files:
- blog.md (WordPress content)
- podcast.md (all podcast episodes)
- youtube.md (all YouTube videos)
- instagram.md (all Instagram posts)
- mailchimp.md (all MailChimp content)

Deduplicates by keeping the most recent version of each content item.
"""

import sys
from pathlib import Path
from datetime import datetime
import re

# Add src to path
sys.path.insert(0, str(Path(__file__).parent))
from src.content_analysis.content_parser import ContentParser


class MarkdownConsolidator:
    """Consolidates markdown files with deduplication."""

    def __init__(self, data_dir: Path, output_dir: Path):
        self.data_dir = data_dir
        self.output_dir = output_dir
        self.parser = ContentParser()

        # Source mapping
        self.source_patterns = {
            'blog': ['wordpress'],
            'podcast': ['podcast', 'Podcast'],
            'youtube': ['youtube', 'YouTube', 'Youtube'],
            'instagram': ['instagram', 'Instagram'],
            'mailchimp': ['mailchimp', 'MailChimp']
        }

    def find_files_for_source(self, source: str) -> list[Path]:
        """Find all markdown files for a given source."""
        patterns = self.source_patterns[source]
        files: set[Path] = set()

        # Search both current and archived directories
        search_paths = [
            self.data_dir / "markdown_current",
            self.data_dir / "markdown_archives"
        ]

        for search_path in search_paths:
            if search_path.exists():
                for pattern in patterns:
                    files.update(search_path.rglob(f"hkia_{pattern}_*.md"))
                    files.update(search_path.rglob(f"hkia_{pattern.lower()}_*.md"))

        # Overlapping case variants can match the same file more than once;
        # the set keeps each path exactly once.
        return sorted(files)

    def extract_file_timestamp(self, file_path: Path) -> datetime:
        """Extract timestamp from filename for version comparison."""
        filename = file_path.name

        # Try different timestamp patterns
        patterns = [
            r'(\d{4}-\d{2}-\d{2}T\d{6})',  # 2025-08-27T144143
            r'(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2})',  # 2025-08-27T14:41:43
            r'(\d{8}_\d{6})'  # 20250827_144143
        ]

        for pattern in patterns:
            match = re.search(pattern, filename)
            if match:
                timestamp_str = match.group(1)
                try:
                    if 'T' in timestamp_str and ':' in timestamp_str:
                        return datetime.strptime(timestamp_str, '%Y-%m-%dT%H:%M:%S')
                    elif 'T' in timestamp_str:
                        return datetime.strptime(timestamp_str, '%Y-%m-%dT%H%M%S')
                    else:
                        return datetime.strptime(timestamp_str, '%Y%m%d_%H%M%S')
                except ValueError:
                    continue

        # Fallback to file modification time
        return datetime.fromtimestamp(file_path.stat().st_mtime)

    def consolidate_source(self, source: str) -> dict:
        """Consolidate all files for a source, keeping most recent versions."""
        files = self.find_files_for_source(source)
        if not files:
            print(f"⚠️  No files found for {source}")
            return {'items': [], 'stats': {'files': 0, 'total_items': 0, 'unique_items': 0}}

        print(f"📁 Processing {source}: {len(files)} files")

        # Track all items with their timestamps
        all_items = {}  # id -> (item, timestamp, filename)
        total_items = 0

        for file_path in files:
            try:
                file_timestamp = self.extract_file_timestamp(file_path)
                items = self.parser.parse_markdown_file(file_path)
                total_items += len(items)

                print(f"   {file_path.name}: {len(items)} items ({file_timestamp})")

                for item in items:
                    item_id = item.id
                    if not item_id:
                        continue

                    # Keep the most recent version of each item
                    if item_id not in all_items or file_timestamp > all_items[item_id][1]:
                        all_items[item_id] = (item, file_timestamp, file_path.name)

            except Exception as e:
                print(f"   ❌ Error parsing {file_path.name}: {e}")
                continue

        # Sort by timestamp for consistent output, most recent first
        unique_items = sorted(all_items.values(), key=lambda entry: entry[1], reverse=True)
        final_items = [item for item, timestamp, filename in unique_items]

        stats = {
            'files': len(files),
            # Counted during the parse loop above; re-parsing every file here would be
            # wasteful and would crash on files that already failed once.
            'total_items': total_items,
            'unique_items': len(final_items)
        }

        print(f"   ✅ {source}: {stats['unique_items']} unique items (from {stats['total_items']} total)")

        return {'items': final_items, 'stats': stats}

    def write_consolidated_file(self, source: str, items: list, output_file: Path):
        """Write consolidated markdown file."""
        with open(output_file, 'w', encoding='utf-8') as f:
            f.write(f"# HVAC Know It All - {source.title()} Content\n")
            f.write("# Consolidated from all historical data\n")
            f.write(f"# Generated: {datetime.now().isoformat()}\n")
            f.write(f"# Total items: {len(items)}\n\n")

            for item in items:
                # Write item in markdown format
                f.write(f"# ID: {item.id}\n\n")

                if item.title:
                    f.write(f"## Title: {item.title}\n\n")

                if item.content_type:
                    f.write(f"## Type: {item.content_type}\n\n")

                if item.author:
                    f.write(f"## Author: {item.author}\n\n")

                if item.url:
                    f.write(f"## Link: {item.url}\n\n")

                if hasattr(item, 'published_date') and item.published_date:
                    f.write(f"## Publish Date: {item.published_date}\n\n")

                if hasattr(item, 'upload_date') and item.upload_date:
                    f.write(f"## Upload Date: {item.upload_date}\n\n")

                # Add source-specific metadata
                if hasattr(item, 'duration') and item.duration:
                    f.write(f"## Duration: {item.duration}\n\n")

                if hasattr(item, 'views') and item.views:
                    f.write(f"## Views: {item.views}\n\n")

                if hasattr(item, 'likes') and item.likes:
                    f.write(f"## Likes: {item.likes}\n\n")

                if hasattr(item, 'comments') and item.comments:
                    f.write(f"## Comments: {item.comments}\n\n")

                if hasattr(item, 'engagement_rate') and item.engagement_rate:
                    f.write(f"## Engagement Rate: {item.engagement_rate}\n\n")

                if hasattr(item, 'thumbnail') and item.thumbnail:
                    f.write(f"## Thumbnail: {item.thumbnail}\n\n")

                if hasattr(item, 'image') and item.image:
                    f.write(f"## Image: {item.image}\n\n")

                # Description/content
                if item.description:
                    f.write(f"## Description:\n{item.description}\n\n")

                # Categories/tags
                if item.categories:
                    f.write(f"## Categories: {', '.join(item.categories)}\n\n")

                f.write("-" * 50 + "\n\n")

    def consolidate_all(self):
        """Consolidate all sources."""
        self.output_dir.mkdir(parents=True, exist_ok=True)

        print("🔄 HVAC Know It All Markdown Consolidation")
        print("=" * 50)

        results = {}

        for source in self.source_patterns.keys():
            result = self.consolidate_source(source)
            results[source] = result

            if result['items']:
                output_file = self.output_dir / f"hkia_{source}_consolidated.md"
                self.write_consolidated_file(source, result['items'], output_file)
                print(f"   📝 Saved to {output_file}")
            else:
                print(f"   ⚠️  No content to save for {source}")

            print()

        # Summary
        print("📊 CONSOLIDATION SUMMARY")
        print("=" * 50)
        total_unique = 0
        for source, result in results.items():
            stats = result['stats']
            print(f"{source:12}: {stats['unique_items']:4} unique items (from {stats['total_items']:4} total, {stats['files']:2} files)")
            total_unique += stats['unique_items']

        print(f"{'TOTAL':12}: {total_unique:4} unique items")
        print(f"\n✅ Consolidated files saved to {self.output_dir}")

        return results


def main():
    """Main consolidation function."""
    data_dir = Path('data')
    output_dir = Path('data/consolidated')

    consolidator = MarkdownConsolidator(data_dir, output_dir)
    results = consolidator.consolidate_all()

    return results


if __name__ == "__main__":
    main()
```
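For reference, a minimal programmatic invocation of the consolidator, assuming the `data/markdown_current` and `data/markdown_archives` layout with `hkia_<source>_<timestamp>.md` files that the script expects. The script's filename is not shown above, so the import path here is hypothetical:

```python
from pathlib import Path

# Hypothetical module name for the script above; adjust to the actual filename.
from consolidate_markdown import MarkdownConsolidator

consolidator = MarkdownConsolidator(Path("data"), Path("data/consolidated"))
results = consolidator.consolidate_all()

# Per-source stats mirror what consolidate_source() returns:
# {'files': ..., 'total_items': ..., 'unique_items': ...}
print(results["youtube"]["stats"])
```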