hvac-kia-content/consolidate_markdown_sources.py
Ben Reed fc3af8e19f feat: Add AI-powered content classification system
- Implement Claude Haiku integration for content analysis
- Create structured JSON output with summaries and metadata
- Add markdown consolidation with deduplication
- Process 447 YouTube videos and 431 podcast episodes
- Generate clean classified files for Claude Desktop projects
- Include comprehensive documentation and usage examples
- Cost-effective processing at ~.30 for 878 items
- Optimize rate limiting for the 80,000 tokens/minute API limit (see the sketch after these notes)
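
The rate limiting mentioned in the last note lives in the classification pipeline rather than in the file below, so what follows is only a minimal sketch of the idea: a rolling 60-second window that blocks until a request's estimated token count fits under the per-minute budget. The class, its method names, and the way the 80,000-token figure is applied here are illustrative assumptions, not the project's actual implementation.

import time
from collections import deque


class TokenRateLimiter:
    """Illustrative pacing helper: keep total request tokens under a per-minute budget."""

    def __init__(self, tokens_per_minute: int = 80_000):
        self.tokens_per_minute = tokens_per_minute
        self.window = deque()  # (monotonic timestamp, tokens) pairs from the last 60 seconds

    def wait_for_budget(self, tokens_needed: int) -> None:
        """Block until tokens_needed fits inside the rolling 60-second window."""
        while True:
            now = time.monotonic()
            # Drop spend that has aged out of the window
            while self.window and now - self.window[0][0] > 60:
                self.window.popleft()
            used = sum(tokens for _, tokens in self.window)
            if not self.window or used + tokens_needed <= self.tokens_per_minute:
                self.window.append((now, tokens_needed))
                return
            # Sleep until the oldest recorded spend expires, then re-check
            time.sleep(max(0.1, 60 - (now - self.window[0][0])))


# Hypothetical call site, before each Claude Haiku request:
#   limiter = TokenRateLimiter()
#   limiter.wait_for_budget(estimated_prompt_tokens)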

#!/usr/bin/env python3
"""
Consolidate HVAC Know It All markdown files with deduplication.
Creates 5 clean consolidated files:
- blog.md (WordPress content)
- podcast.md (all podcast episodes)
- youtube.md (all YouTube videos)
- instagram.md (all Instagram posts)
- mailchimp.md (all MailChimp content)
Deduplicates by keeping the most recent version of each content item.
"""
import sys
from pathlib import Path
from collections import defaultdict, OrderedDict
from datetime import datetime
import re
# Add src to path
sys.path.insert(0, str(Path(__file__).parent))
from src.content_analysis.content_parser import ContentParser


class MarkdownConsolidator:
    """Consolidates markdown files with deduplication."""

    def __init__(self, data_dir: Path, output_dir: Path):
        self.data_dir = data_dir
        self.output_dir = output_dir
        self.parser = ContentParser()
        # Source mapping
        self.source_patterns = {
            'blog': ['wordpress'],
            'podcast': ['podcast', 'Podcast'],
            'youtube': ['youtube', 'YouTube', 'Youtube'],
            'instagram': ['instagram', 'Instagram'],
            'mailchimp': ['mailchimp', 'MailChimp']
        }

    def find_files_for_source(self, source: str) -> list[Path]:
        """Find all markdown files for a given source."""
        patterns = self.source_patterns[source]
        files = []
        # Search both current and archived directories
        search_paths = [
            self.data_dir / "markdown_current",
            self.data_dir / "markdown_archives"
        ]
        for search_path in search_paths:
            if search_path.exists():
                for pattern in patterns:
                    files.extend(search_path.rglob(f"hkia_{pattern}_*.md"))
                    files.extend(search_path.rglob(f"hkia_{pattern.lower()}_*.md"))
        # De-duplicate: case variants of a pattern can match the same file more than once
        return sorted(set(files))

    def extract_file_timestamp(self, file_path: Path) -> datetime:
        """Extract timestamp from filename for version comparison."""
        filename = file_path.name
        # Try different timestamp patterns
        patterns = [
            r'(\d{4}-\d{2}-\d{2}T\d{6})',              # 2025-08-27T144143
            r'(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2})',  # 2025-08-27T14:41:43
            r'(\d{8}_\d{6})'                           # 20250827_144143
        ]
        for pattern in patterns:
            match = re.search(pattern, filename)
            if match:
                timestamp_str = match.group(1)
                try:
                    if 'T' in timestamp_str and ':' in timestamp_str:
                        return datetime.strptime(timestamp_str, '%Y-%m-%dT%H:%M:%S')
                    elif 'T' in timestamp_str:
                        return datetime.strptime(timestamp_str, '%Y-%m-%dT%H%M%S')
                    else:
                        return datetime.strptime(timestamp_str, '%Y%m%d_%H%M%S')
                except ValueError:
                    continue
        # Fallback to file modification time
        return datetime.fromtimestamp(file_path.stat().st_mtime)

    def consolidate_source(self, source: str) -> dict:
        """Consolidate all files for a source, keeping most recent versions."""
        files = self.find_files_for_source(source)
        if not files:
            print(f"⚠️ No files found for {source}")
            return {'items': [], 'stats': {'files': 0, 'total_items': 0, 'unique_items': 0}}
        print(f"📁 Processing {source}: {len(files)} files")
        # Track all items with their timestamps
        all_items = {}  # id -> (item, timestamp, file)
        for file_path in files:
            try:
                file_timestamp = self.extract_file_timestamp(file_path)
                items = self.parser.parse_markdown_file(file_path)
                print(f" {file_path.name}: {len(items)} items ({file_timestamp})")
                for item in items:
                    item_id = item.id
                    if not item_id:
                        continue
                    # Keep the most recent version
                    if item_id not in all_items or file_timestamp > all_items[item_id][1]:
                        all_items[item_id] = (item, file_timestamp, file_path.name)
            except Exception as e:
                print(f" ❌ Error parsing {file_path.name}: {e}")
                continue
        # Sort by timestamp for consistent output
        unique_items = []
        for item_id, (item, timestamp, filename) in all_items.items():
            unique_items.append((item, timestamp))
        unique_items.sort(key=lambda x: x[1], reverse=True)  # Most recent first
        final_items = [item for item, timestamp in unique_items]
        stats = {
            'files': len(files),
            'total_items': sum(len(self.parser.parse_markdown_file(f)) for f in files),
            'unique_items': len(final_items)
        }
        print(f"{source}: {stats['unique_items']} unique items (from {stats['total_items']} total)")
        return {'items': final_items, 'stats': stats}

    def write_consolidated_file(self, source: str, items: list, output_file: Path):
        """Write consolidated markdown file."""
        with open(output_file, 'w', encoding='utf-8') as f:
            f.write(f"# HVAC Know It All - {source.title()} Content\n")
            f.write(f"# Consolidated from all historical data\n")
            f.write(f"# Generated: {datetime.now().isoformat()}\n")
            f.write(f"# Total items: {len(items)}\n\n")
            for item in items:
                # Write item in markdown format
                f.write(f"# ID: {item.id}\n\n")
                if item.title:
                    f.write(f"## Title: {item.title}\n\n")
                if item.content_type:
                    f.write(f"## Type: {item.content_type}\n\n")
                if item.author:
                    f.write(f"## Author: {item.author}\n\n")
                if item.url:
                    f.write(f"## Link: {item.url}\n\n")
                if hasattr(item, 'published_date') and item.published_date:
                    f.write(f"## Publish Date: {item.published_date}\n\n")
                if hasattr(item, 'upload_date') and item.upload_date:
                    f.write(f"## Upload Date: {item.upload_date}\n\n")
                # Add source-specific metadata
                if hasattr(item, 'duration') and item.duration:
                    f.write(f"## Duration: {item.duration}\n\n")
                if hasattr(item, 'views') and item.views:
                    f.write(f"## Views: {item.views}\n\n")
                if hasattr(item, 'likes') and item.likes:
                    f.write(f"## Likes: {item.likes}\n\n")
                if hasattr(item, 'comments') and item.comments:
                    f.write(f"## Comments: {item.comments}\n\n")
                if hasattr(item, 'engagement_rate') and item.engagement_rate:
                    f.write(f"## Engagement Rate: {item.engagement_rate}\n\n")
                if hasattr(item, 'thumbnail') and item.thumbnail:
                    f.write(f"## Thumbnail: {item.thumbnail}\n\n")
                if hasattr(item, 'image') and item.image:
                    f.write(f"## Image: {item.image}\n\n")
                # Description/content
                if item.description:
                    f.write(f"## Description:\n{item.description}\n\n")
                # Categories/tags
                if item.categories:
                    f.write(f"## Categories: {', '.join(item.categories)}\n\n")
                f.write("-" * 50 + "\n\n")

    def consolidate_all(self):
        """Consolidate all sources."""
        self.output_dir.mkdir(parents=True, exist_ok=True)
        print("🔄 HVAC Know It All Markdown Consolidation")
        print("=" * 50)
        results = {}
        for source in self.source_patterns.keys():
            result = self.consolidate_source(source)
            results[source] = result
            if result['items']:
                output_file = self.output_dir / f"hkia_{source}_consolidated.md"
                self.write_consolidated_file(source, result['items'], output_file)
                print(f" 📝 Saved to {output_file}")
            else:
                print(f" ⚠️ No content to save for {source}")
            print()
        # Summary
        print("📊 CONSOLIDATION SUMMARY")
        print("=" * 50)
        total_unique = 0
        for source, result in results.items():
            stats = result['stats']
            print(f"{source:12}: {stats['unique_items']:4} unique items (from {stats['total_items']:4} total, {stats['files']:2} files)")
            total_unique += stats['unique_items']
        print(f"{'TOTAL':12}: {total_unique:4} unique items")
        print(f"\n✅ Consolidated files saved to {self.output_dir}")
        return results


def main():
    """Main consolidation function."""
    data_dir = Path('data')
    output_dir = Path('data/consolidated')
    consolidator = MarkdownConsolidator(data_dir, output_dir)
    results = consolidator.consolidate_all()
    return results


if __name__ == "__main__":
    main()
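
For reference, a minimal usage sketch. The directory names mirror the defaults in main(); anything beyond what the file itself defines (for example, driving it from another script) is an assumption to adjust for your layout.

# Run from the repository root so the relative data/ paths resolve:
#   python consolidate_markdown_sources.py
#
# Or drive it from another script with explicit directories:
from pathlib import Path

from consolidate_markdown_sources import MarkdownConsolidator

consolidator = MarkdownConsolidator(
    data_dir=Path("data"),                 # expects markdown_current/ and markdown_archives/ inside
    output_dir=Path("data/consolidated"),  # hkia_<source>_consolidated.md files land here
)
results = consolidator.consolidate_all()
print(results["youtube"]["stats"])         # {'files': ..., 'total_items': ..., 'unique_items': ...}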