Major improvements:
- Add CumulativeMarkdownManager for intelligent content merging
- Implement YouTube Data API v3 integration with caption support
- Add MailChimp API integration with content cleaning
- Create single source-of-truth files that grow with updates
- Smart merging: updates existing entries with better data
- Properly combines backlog + incremental daily updates

Features:
- 179/444 YouTube videos now have captions (40.3%)
- MailChimp content cleaned of headers/footers
- All sources consolidated to single files
- Archive management with timestamped versions
- Test suite and documentation included

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
#!/usr/bin/env python3
"""
Consolidate multiple markdown files per source into single current files.

Combines backlog data and incremental updates into one source of truth.
Follows project specification naming: hvacnkowitall_<source>_<dateTime>.md
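for example: hvacnkowitall_YouTube_2025-08-15T093000.md (timestamp in Atlantic time)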
"""

import sys
from pathlib import Path

sys.path.insert(0, str(Path(__file__).parent))

from datetime import datetime
import pytz
import re
from typing import Dict, List, Set
import logging

# Ensure the log directory exists before the FileHandler is created at import time
Path('logs').mkdir(parents=True, exist_ok=True)

# Set up logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('logs/consolidation.log'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger('consolidator')


def get_atlantic_timestamp() -> str:
    """Get current timestamp in Atlantic timezone."""
    tz = pytz.timezone('America/Halifax')
    return datetime.now(tz).strftime('%Y-%m-%dT%H%M%S')


def parse_markdown_sections(content: str) -> List[Dict]:
    """Parse markdown content into sections by ID."""
    sections = []

    # Split by ID headers
    parts = content.split('# ID: ')

    for part in parts[1:]:  # Skip any text before the first ID header
        if not part.strip():
            continue

        lines = part.strip().split('\n')
        section_id = lines[0].strip()

        # Rebuild the full section content, including its header line
        section_content = f"# ID: {section_id}\n" + '\n'.join(lines[1:])

        sections.append({
            'id': section_id,
            'content': section_content
        })

    return sections


def consolidate_source_files(source_name: str) -> bool:
    """Consolidate all files for a specific source into one current file."""
    logger.info(f"Consolidating {source_name} files...")

    current_dir = Path('data/markdown_current')
    archives_dir = Path('data/markdown_archives')

    # Find all files for this source
    pattern = f"hvacnkowitall_{source_name}_*.md"
    current_files = list(current_dir.glob(pattern))

    # Also check for files with different naming (like captions files)
    alt_patterns = [
        f"*{source_name}*.md",
        f"hvacnkowitall_{source_name.lower()}_*.md"
    ]

    for alt_pattern in alt_patterns:
        current_files.extend(current_dir.glob(alt_pattern))

    # Remove duplicates
    current_files = list(set(current_files))

    if not current_files:
        logger.warning(f"No files found for source: {source_name}")
        return False

    logger.info(f"Found {len(current_files)} files for {source_name}: {[f.name for f in current_files]}")

    # Track unique sections by ID
    sections_by_id: Dict[str, Dict] = {}
    all_sections = []

    # Process each file
    for file_path in current_files:
        logger.info(f"Processing {file_path.name}...")

        try:
            content = file_path.read_text(encoding='utf-8')
            sections = parse_markdown_sections(content)

            logger.info(f"  Found {len(sections)} sections")

            # Add sections, preferring newer data
            for section in sections:
                section_id = section['id']

                # If we haven't seen this ID, add it
                if section_id not in sections_by_id:
                    sections_by_id[section_id] = section
                    all_sections.append(section)
                else:
                    # Check if this version has more content (like captions)
                    old_content = sections_by_id[section_id]['content']
                    new_content = section['content']

                    # Prefer content with captions/more detail
                    if ('Caption Status:' in new_content and 'Caption Status:' not in old_content) or \
                            len(new_content) > len(old_content):
                        logger.info(f"  Updating section {section_id} with more detailed content")
                        # Update in place
                        for i, existing in enumerate(all_sections):
                            if existing['id'] == section_id:
                                all_sections[i] = section
                                sections_by_id[section_id] = section
                                break

        except Exception as e:
            logger.error(f"Error processing {file_path}: {e}")
            continue

    if not all_sections:
        logger.warning(f"No sections found for {source_name}")
        return False

    # Create consolidated content
    consolidated_content = []

    # Sort sections by ID for consistency
    all_sections.sort(key=lambda x: x['id'])

    for section in all_sections:
        consolidated_content.append(section['content'])
        consolidated_content.append("")  # Add separator

    # Generate new filename following project specification
    timestamp = get_atlantic_timestamp()
    new_filename = f"hvacnkowitall_{source_name}_{timestamp}.md"
    new_file_path = current_dir / new_filename

    # Save consolidated file
    final_content = '\n'.join(consolidated_content)
    new_file_path.write_text(final_content, encoding='utf-8')

    logger.info(f"Created consolidated file: {new_filename}")
    logger.info(f"  Total sections: {len(all_sections)}")
    logger.info(f"  File size: {len(final_content):,} characters")

    # Archive old files
    archive_source_dir = archives_dir / source_name
    archive_source_dir.mkdir(parents=True, exist_ok=True)

    archived_count = 0
    for old_file in current_files:
        if old_file.name != new_filename:  # Don't archive the new file
            try:
                archive_path = archive_source_dir / old_file.name
                old_file.rename(archive_path)
                archived_count += 1
                logger.info(f"  Archived: {old_file.name}")
            except Exception as e:
                logger.error(f"Error archiving {old_file.name}: {e}")

    logger.info(f"Archived {archived_count} old files for {source_name}")

    # Create a copy in the archives as well
    archive_current_path = archive_source_dir / new_filename
    archive_current_path.write_text(final_content, encoding='utf-8')

    return True


def main():
    """Main consolidation function."""
    logger.info("=" * 60)
    logger.info("CONSOLIDATING CURRENT MARKDOWN FILES")
    logger.info("=" * 60)

    # Create directories if needed
    Path('data/markdown_current').mkdir(parents=True, exist_ok=True)
    Path('data/markdown_archives').mkdir(parents=True, exist_ok=True)
    Path('logs').mkdir(parents=True, exist_ok=True)

    # Define sources to consolidate
    sources = ['YouTube', 'MailChimp', 'Instagram', 'TikTok', 'Podcast']

    consolidated = []
    failed = []

    for source in sources:
        logger.info(f"\n{'-' * 40}")
        try:
            if consolidate_source_files(source):
                consolidated.append(source)
            else:
                failed.append(source)
        except Exception as e:
            logger.error(f"Failed to consolidate {source}: {e}")
            failed.append(source)

    logger.info(f"\n{'=' * 60}")
    logger.info("CONSOLIDATION SUMMARY")
    logger.info(f"{'=' * 60}")
    logger.info(f"Successfully consolidated: {consolidated}")
    logger.info(f"Failed/No data: {failed}")

    # List final current files
    current_files = list(Path('data/markdown_current').glob('*.md'))
    logger.info(f"\nFinal current files:")
    for file in sorted(current_files):
        size = file.stat().st_size
        logger.info(f"  {file.name} ({size:,} bytes)")


if __name__ == "__main__":
    main()