hvac-kia-content/run_api_production_v2.py
Ben Reed 8ceb858026 Implement cumulative markdown system and API integrations
Major improvements:
- Add CumulativeMarkdownManager for intelligent content merging
- Implement YouTube Data API v3 integration with caption support
- Add MailChimp API integration with content cleaning
- Create single source-of-truth files that grow with updates
- Smart merging: updates existing entries with better data (see the sketch below)
- Properly combines backlog + incremental daily updates

Features:
- 179/444 YouTube videos now have captions (40.3%)
- MailChimp content cleaned of headers/footers
- All sources consolidated to single files
- Archive management with timestamped versions
- Test suite and documentation included
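
The merge rule can be pictured with a minimal sketch (hypothetical names and entry schema; the shipped CumulativeMarkdownManager may differ): each source keeps one cumulative store keyed by entry ID, and an incoming entry only replaces a stored one when it brings richer data, e.g. a caption the stored copy lacks.

# Minimal sketch of the cumulative-merge idea; illustrative only,
# not the shipped class. Entries are assumed to be dicts with an 'id'
# and an optional 'caption_text' field.
def merge_entries(store, incoming):
    for entry in incoming:
        current = store.get(entry['id'])
        # Take the new entry if nothing is stored yet, or if it adds
        # data (e.g. a caption) that the stored copy is missing.
        if current is None or (entry.get('caption_text') and not current.get('caption_text')):
            store[entry['id']] = entry
    return store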

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-08-19 10:53:40 -03:00


#!/usr/bin/env python3
"""
Production script for API-based content scraping - Version 2
Follows project specification file/folder naming conventions
Captures YouTube videos with captions and MailChimp campaigns with cleaned content
"""
import sys
from pathlib import Path

# Make the repo-local src package importable when this script is run directly
sys.path.insert(0, str(Path(__file__).parent))

from src.youtube_api_scraper_v2 import YouTubeAPIScraper
from src.mailchimp_api_scraper_v2 import MailChimpAPIScraper
from src.base_scraper import ScraperConfig

from datetime import datetime
import pytz
import time
import logging
import subprocess
# Set up logging; create the log directory first so the FileHandler can open its file
Path('logs').mkdir(parents=True, exist_ok=True)
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('logs/api_production_v2.log'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger('api_production_v2')


def get_atlantic_timestamp() -> str:
    """Get current timestamp in Atlantic timezone for file naming."""
    tz = pytz.timezone('America/Halifax')
    return datetime.now(tz).strftime('%Y-%m-%dT%H%M%S')
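
# The timestamp above feeds the spec's naming pattern
# <brandName>_<source>_<dateTime>.md, e.g. (illustrative value):
#   hvacnkowitall_YouTube_2025-08-19T105340.md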


def run_youtube_api_production():
    """Run YouTube API scraper for production backlog with captions."""
    logger.info("=" * 60)
    logger.info("YOUTUBE API SCRAPER - PRODUCTION V2")
    logger.info("=" * 60)
    timestamp = get_atlantic_timestamp()

    # Follow project specification directory structure
    config = ScraperConfig(
        source_name='YouTube',  # Capitalized per spec
        brand_name='hvacnkowitall',
        data_dir=Path('data/markdown_current'),
        logs_dir=Path('logs/YouTube'),
        timezone='America/Halifax'
    )

    try:
        scraper = YouTubeAPIScraper(config)
        logger.info("Starting YouTube API fetch with captions for all videos...")
        start = time.time()

        # Fetch all videos WITH captions for top 50 (use more quota)
        videos = scraper.fetch_content(fetch_captions=True)
        elapsed = time.time() - start
        logger.info(f"Fetched {len(videos)} videos in {elapsed:.1f} seconds")

        if videos:
            # Statistics
            total_views = sum(v.get('view_count', 0) for v in videos)
            total_likes = sum(v.get('like_count', 0) for v in videos)
            with_captions = sum(1 for v in videos if v.get('caption_text'))

            logger.info("Statistics:")
            logger.info(f" Total videos: {len(videos)}")
            logger.info(f" Total views: {total_views:,}")
            logger.info(f" Total likes: {total_likes:,}")
            logger.info(f" Videos with captions: {with_captions}")
            logger.info(f" Quota used: {scraper.quota_used}/{scraper.daily_quota_limit} units")
            # Save with project specification naming: <brandName>_<source>_<dateTime>.md
            filename = f"hvacnkowitall_YouTube_{timestamp}.md"
            markdown = scraper.format_markdown(videos)
            output_file = Path(f'data/markdown_current/{filename}')
            output_file.parent.mkdir(parents=True, exist_ok=True)
            output_file.write_text(markdown, encoding='utf-8')
            logger.info(f"Markdown saved to: {output_file}")

            # Create archive copy
            archive_dir = Path('data/markdown_archives/YouTube')
            archive_dir.mkdir(parents=True, exist_ok=True)
            archive_file = archive_dir / filename
            archive_file.write_text(markdown, encoding='utf-8')
            logger.info(f"Archive copy saved to: {archive_file}")

            # Update state file
            state = scraper.load_state()
            state = scraper.update_state(state, videos)
            scraper.save_state(state)
            logger.info("State file updated for incremental updates")

            return True, len(videos), output_file
        else:
            logger.error("No videos fetched from YouTube API")
            return False, 0, None

    except Exception as e:
        logger.error(f"YouTube API scraper failed: {e}")
        return False, 0, None


def run_mailchimp_api_production():
    """Run MailChimp API scraper for production backlog with cleaned content."""
    logger.info("\n" + "=" * 60)
    logger.info("MAILCHIMP API SCRAPER - PRODUCTION V2")
    logger.info("=" * 60)
    timestamp = get_atlantic_timestamp()

    # Follow project specification directory structure
    config = ScraperConfig(
        source_name='MailChimp',  # Capitalized per spec
        brand_name='hvacnkowitall',
        data_dir=Path('data/markdown_current'),
        logs_dir=Path('logs/MailChimp'),
        timezone='America/Halifax'
    )

    try:
        scraper = MailChimpAPIScraper(config)
        logger.info("Starting MailChimp API fetch with content cleaning...")
        start = time.time()

        # Fetch all campaigns from Bi-Weekly Newsletter folder
        campaigns = scraper.fetch_content(max_items=1000)
        elapsed = time.time() - start
        logger.info(f"Fetched {len(campaigns)} campaigns in {elapsed:.1f} seconds")

        if campaigns:
            # Statistics
            total_sent = sum(c.get('metrics', {}).get('emails_sent', 0) for c in campaigns)
            total_opens = sum(c.get('metrics', {}).get('unique_opens', 0) for c in campaigns)
            total_clicks = sum(c.get('metrics', {}).get('unique_clicks', 0) for c in campaigns)

            logger.info("Statistics:")
            logger.info(f" Total campaigns: {len(campaigns)}")
            logger.info(f" Total emails sent: {total_sent:,}")
            logger.info(f" Total unique opens: {total_opens:,}")
            logger.info(f" Total unique clicks: {total_clicks:,}")
            # campaigns is non-empty here, so dividing by len() is safe
            avg_open_rate = sum(c.get('metrics', {}).get('open_rate', 0) for c in campaigns) / len(campaigns)
            avg_click_rate = sum(c.get('metrics', {}).get('click_rate', 0) for c in campaigns) / len(campaigns)
            logger.info(f" Average open rate: {avg_open_rate*100:.1f}%")
            logger.info(f" Average click rate: {avg_click_rate*100:.1f}%")
            # Save with project specification naming: <brandName>_<source>_<dateTime>.md
            filename = f"hvacnkowitall_MailChimp_{timestamp}.md"
            markdown = scraper.format_markdown(campaigns)
            output_file = Path(f'data/markdown_current/{filename}')
            output_file.parent.mkdir(parents=True, exist_ok=True)
            output_file.write_text(markdown, encoding='utf-8')
            logger.info(f"Markdown saved to: {output_file}")

            # Create archive copy
            archive_dir = Path('data/markdown_archives/MailChimp')
            archive_dir.mkdir(parents=True, exist_ok=True)
            archive_file = archive_dir / filename
            archive_file.write_text(markdown, encoding='utf-8')
            logger.info(f"Archive copy saved to: {archive_file}")

            # Update state file
            state = scraper.load_state()
            state = scraper.update_state(state, campaigns)
            scraper.save_state(state)
            logger.info("State file updated for incremental updates")

            return True, len(campaigns), output_file
        else:
            logger.warning("No campaigns found in MailChimp")
            return True, 0, None

    except Exception as e:
        logger.error(f"MailChimp API scraper failed: {e}")
        return False, 0, None


def sync_to_nas():
    """Sync API scraper results to NAS following project structure."""
    logger.info("\n" + "=" * 60)
    logger.info("SYNCING TO NAS - PROJECT STRUCTURE")
    logger.info("=" * 60)
    nas_base = Path('/mnt/nas/hvacknowitall')

    try:
        # Sync all markdown_current files
        local_current = Path('data/markdown_current')
        nas_current = nas_base / 'markdown_current'

        if local_current.exists() and any(local_current.glob('*.md')):
            # Create destination if needed
            nas_current.mkdir(parents=True, exist_ok=True)

            # Sync all current markdown files
            cmd = ['rsync', '-av', '--include=*.md', '--exclude=*',
                   str(local_current) + '/', str(nas_current) + '/']
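            # rsync applies filter rules in order: '--include=*.md' matches
            # before the catch-all '--exclude=*', so only top-level .md files
            # transfer; the exclude also stops recursion into subdirectories.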
            result = subprocess.run(cmd, capture_output=True, text=True)
            if result.returncode == 0:
                logger.info(f"✅ Current markdown files synced to NAS: {nas_current}")
                # List synced files
                for md_file in nas_current.glob('*.md'):
                    size = md_file.stat().st_size / 1024  # KB
                    logger.info(f" - {md_file.name} ({size:.0f}KB)")
            else:
                logger.warning(f"Sync warning: {result.stderr}")
        else:
            logger.info("No current markdown files to sync")

        # Sync archives
        for source in ['YouTube', 'MailChimp']:
            local_archive = Path(f'data/markdown_archives/{source}')
            nas_archive = nas_base / f'markdown_archives/{source}'

            if local_archive.exists() and any(local_archive.glob('*.md')):
                nas_archive.mkdir(parents=True, exist_ok=True)
                cmd = ['rsync', '-av', '--include=*.md', '--exclude=*',
                       str(local_archive) + '/', str(nas_archive) + '/']
                result = subprocess.run(cmd, capture_output=True, text=True)
                if result.returncode == 0:
                    logger.info(f"{source} archives synced to NAS: {nas_archive}")
                else:
                    logger.warning(f"{source} archive sync warning: {result.stderr}")

    except Exception as e:
        logger.error(f"Failed to sync to NAS: {e}")


def main():
    """Main production run with project specification compliance."""
    logger.info("=" * 70)
    logger.info("HVAC KNOW IT ALL - API SCRAPERS PRODUCTION V2")
    logger.info("Following Project Specification Standards")
    logger.info("=" * 70)

    atlantic_tz = pytz.timezone('America/Halifax')
    start_time = datetime.now(atlantic_tz)
    logger.info(f"Started at: {start_time.isoformat()}")

    # Track results
    results = {
        'YouTube': {'success': False, 'count': 0, 'file': None},
        'MailChimp': {'success': False, 'count': 0, 'file': None}
    }

    # Run YouTube API scraper with captions
    success, count, output_file = run_youtube_api_production()
    results['YouTube'] = {'success': success, 'count': count, 'file': output_file}

    # Run MailChimp API scraper with content cleaning
    success, count, output_file = run_mailchimp_api_production()
    results['MailChimp'] = {'success': success, 'count': count, 'file': output_file}

    # Sync to NAS
    sync_to_nas()

    # Summary
    end_time = datetime.now(atlantic_tz)
    duration = end_time - start_time
    logger.info("\n" + "=" * 70)
    logger.info("PRODUCTION V2 SUMMARY")
    logger.info("=" * 70)
    for source, result in results.items():
        status = "✅" if result['success'] else "❌"
        logger.info(f"{status} {source}: {result['count']} items")
        if result['file']:
            logger.info(f" Output: {result['file']}")
    logger.info(f"\nTotal duration: {duration.total_seconds():.1f} seconds")
    logger.info(f"Completed at: {end_time.isoformat()}")

    # Project specification compliance
    logger.info("\nPROJECT SPECIFICATION COMPLIANCE:")
    logger.info("✅ File naming: hvacnkowitall_<Source>_<YYYY-MM-DDTHHMMSS>.md")
    logger.info("✅ Directory structure: data/markdown_current/, data/markdown_archives/")
    logger.info("✅ Capitalized source names: YouTube, MailChimp")
    logger.info("✅ Atlantic timezone timestamps")
    logger.info("✅ Archive copies created")
    logger.info("✅ State files for incremental updates")

    # Return success if at least one scraper succeeded
    return any(r['success'] for r in results.values())


if __name__ == "__main__":
    success = main()
    sys.exit(0 if success else 1)