#!/usr/bin/env python3
"""
Consolidate HVAC Know It All markdown files with deduplication.

Creates 5 clean consolidated files:
- hkia_blog_consolidated.md (WordPress content)
- hkia_podcast_consolidated.md (all podcast episodes)
- hkia_youtube_consolidated.md (all YouTube videos)
- hkia_instagram_consolidated.md (all Instagram posts)
- hkia_mailchimp_consolidated.md (all MailChimp content)

Deduplicates by keeping the most recent version of each content item.
"""

import re
import sys
from datetime import datetime
from pathlib import Path

# Add src to path
sys.path.insert(0, str(Path(__file__).parent))

from src.content_analysis.content_parser import ContentParser


class MarkdownConsolidator:
    """Consolidates markdown files with deduplication."""

    def __init__(self, data_dir: Path, output_dir: Path):
        self.data_dir = data_dir
        self.output_dir = output_dir
        self.parser = ContentParser()

        # Source mapping
        self.source_patterns = {
            'blog': ['wordpress'],
            'podcast': ['podcast', 'Podcast'],
            'youtube': ['youtube', 'YouTube', 'Youtube'],
            'instagram': ['instagram', 'Instagram'],
            'mailchimp': ['mailchimp', 'MailChimp'],
        }

    def find_files_for_source(self, source: str) -> list[Path]:
        """Find all markdown files for a given source."""
        patterns = self.source_patterns[source]
        files = []

        # Search both current and archived directories
        search_paths = [
            self.data_dir / "markdown_current",
            self.data_dir / "markdown_archives",
        ]

        for search_path in search_paths:
            if search_path.exists():
                for pattern in patterns:
                    files.extend(search_path.rglob(f"hkia_{pattern}_*.md"))
                    files.extend(search_path.rglob(f"hkia_{pattern.lower()}_*.md"))

        return sorted(files)

    def extract_file_timestamp(self, file_path: Path) -> datetime:
        """Extract timestamp from filename for version comparison."""
        filename = file_path.name

        # Try different timestamp patterns
        patterns = [
            r'(\d{4}-\d{2}-\d{2}T\d{6})',              # 2025-08-27T144143
            r'(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2})',  # 2025-08-27T14:41:43
            r'(\d{8}_\d{6})',                          # 20250827_144143
        ]

        for pattern in patterns:
            match = re.search(pattern, filename)
            if match:
                timestamp_str = match.group(1)
                try:
                    if 'T' in timestamp_str and ':' in timestamp_str:
                        return datetime.strptime(timestamp_str, '%Y-%m-%dT%H:%M:%S')
                    elif 'T' in timestamp_str:
                        return datetime.strptime(timestamp_str, '%Y-%m-%dT%H%M%S')
                    else:
                        return datetime.strptime(timestamp_str, '%Y%m%d_%H%M%S')
                except ValueError:
                    continue

        # Fallback to file modification time
        return datetime.fromtimestamp(file_path.stat().st_mtime)

    def consolidate_source(self, source: str) -> dict:
        """Consolidate all files for a source, keeping most recent versions."""
        files = self.find_files_for_source(source)

        if not files:
            print(f"āš ļø No files found for {source}")
            return {'items': [], 'stats': {'files': 0, 'total_items': 0, 'unique_items': 0}}

        print(f"šŸ“ Processing {source}: {len(files)} files")

        # Track all items with their timestamps
        all_items = {}  # id -> (item, timestamp, filename)
        total_parsed = 0  # count items in a single pass instead of re-parsing for stats

        for file_path in files:
            try:
                file_timestamp = self.extract_file_timestamp(file_path)
                items = self.parser.parse_markdown_file(file_path)
                total_parsed += len(items)

                print(f"  {file_path.name}: {len(items)} items ({file_timestamp})")

                for item in items:
                    item_id = item.id
                    if not item_id:
                        continue

                    # Keep the most recent version
                    if item_id not in all_items or file_timestamp > all_items[item_id][1]:
                        all_items[item_id] = (item, file_timestamp, file_path.name)

            except Exception as e:
                print(f"  āŒ Error parsing {file_path.name}: {e}")
                continue

        # Sort by timestamp for consistent output
        unique_items = [(item, timestamp) for item, timestamp, _filename in all_items.values()]
        unique_items.sort(key=lambda x: x[1], reverse=True)  # Most recent first
        final_items = [item for item, _timestamp in unique_items]

        stats = {
            'files': len(files),
            'total_items': total_parsed,
            'unique_items': len(final_items),
        }

        print(f"  āœ… {source}: {stats['unique_items']} unique items (from {stats['total_items']} total)")

        return {'items': final_items, 'stats': stats}

    def write_consolidated_file(self, source: str, items: list, output_file: Path):
        """Write consolidated markdown file."""
        with open(output_file, 'w', encoding='utf-8') as f:
            f.write(f"# HVAC Know It All - {source.title()} Content\n")
            f.write("# Consolidated from all historical data\n")
            f.write(f"# Generated: {datetime.now().isoformat()}\n")
            f.write(f"# Total items: {len(items)}\n\n")

            for item in items:
                # Write item in markdown format
                f.write(f"# ID: {item.id}\n\n")

                if item.title:
                    f.write(f"## Title: {item.title}\n\n")
                if item.content_type:
                    f.write(f"## Type: {item.content_type}\n\n")
                if item.author:
                    f.write(f"## Author: {item.author}\n\n")
                if item.url:
                    f.write(f"## Link: {item.url}\n\n")
                if hasattr(item, 'published_date') and item.published_date:
                    f.write(f"## Publish Date: {item.published_date}\n\n")
                if hasattr(item, 'upload_date') and item.upload_date:
                    f.write(f"## Upload Date: {item.upload_date}\n\n")

                # Add source-specific metadata
                if hasattr(item, 'duration') and item.duration:
                    f.write(f"## Duration: {item.duration}\n\n")
                if hasattr(item, 'views') and item.views:
                    f.write(f"## Views: {item.views}\n\n")
                if hasattr(item, 'likes') and item.likes:
                    f.write(f"## Likes: {item.likes}\n\n")
                if hasattr(item, 'comments') and item.comments:
                    f.write(f"## Comments: {item.comments}\n\n")
                if hasattr(item, 'engagement_rate') and item.engagement_rate:
                    f.write(f"## Engagement Rate: {item.engagement_rate}\n\n")
                if hasattr(item, 'thumbnail') and item.thumbnail:
                    f.write(f"## Thumbnail: {item.thumbnail}\n\n")
                if hasattr(item, 'image') and item.image:
                    f.write(f"## Image: {item.image}\n\n")

                # Description/content
                if item.description:
                    f.write(f"## Description:\n{item.description}\n\n")

                # Categories/tags
                if item.categories:
                    f.write(f"## Categories: {', '.join(item.categories)}\n\n")

                f.write("-" * 50 + "\n\n")

    def consolidate_all(self):
        """Consolidate all sources."""
        self.output_dir.mkdir(parents=True, exist_ok=True)

        print("šŸ”„ HVAC Know It All Markdown Consolidation")
        print("=" * 50)

        results = {}

        for source in self.source_patterns.keys():
            result = self.consolidate_source(source)
            results[source] = result

            if result['items']:
                output_file = self.output_dir / f"hkia_{source}_consolidated.md"
                self.write_consolidated_file(source, result['items'], output_file)
                print(f"  šŸ“ Saved to {output_file}")
            else:
                print(f"  āš ļø No content to save for {source}")

            print()

        # Summary
        print("šŸ“Š CONSOLIDATION SUMMARY")
        print("=" * 50)

        total_unique = 0
        for source, result in results.items():
            stats = result['stats']
            print(f"{source:12}: {stats['unique_items']:4} unique items "
                  f"(from {stats['total_items']:4} total, {stats['files']:2} files)")
            total_unique += stats['unique_items']

        print(f"{'TOTAL':12}: {total_unique:4} unique items")
        print(f"\nāœ… Consolidated files saved to {self.output_dir}")

        return results


def main():
    """Main consolidation function."""
    data_dir = Path('data')
    output_dir = Path('data/consolidated')

    consolidator = MarkdownConsolidator(data_dir, output_dir)
    results = consolidator.consolidate_all()

    return results


if __name__ == "__main__":
    main()
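# Illustrative example of the deduplication behaviour (hypothetical filenames,
# assuming the data/markdown_current and data/markdown_archives layout above):
#
#     data/markdown_current/hkia_youtube_2025-08-27T144143.md
#     data/markdown_archives/hkia_youtube_2025-07-01T090000.md
#
# If both files contain an item with the same ID, the 2025-08-27 copy wins (newer
# filename timestamp) and the surviving items are written to
# data/consolidated/hkia_youtube_consolidated.md, sorted most recent first.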