#!/usr/bin/env python3 """ Production script for API-based content scraping - Version 2 Follows project specification file/folder naming conventions Captures YouTube videos with captions and MailChimp campaigns with cleaned content """ import sys from pathlib import Path sys.path.insert(0, str(Path(__file__).parent)) from src.youtube_api_scraper_v2 import YouTubeAPIScraper from src.mailchimp_api_scraper_v2 import MailChimpAPIScraper from src.base_scraper import ScraperConfig from datetime import datetime import pytz import time import logging import subprocess # Set up logging logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler('logs/api_production_v2.log'), logging.StreamHandler() ] ) logger = logging.getLogger('api_production_v2') def get_atlantic_timestamp() -> str: """Get current timestamp in Atlantic timezone for file naming.""" tz = pytz.timezone('America/Halifax') return datetime.now(tz).strftime('%Y-%m-%dT%H%M%S') def run_youtube_api_production(): """Run YouTube API scraper for production backlog with captions.""" logger.info("=" * 60) logger.info("YOUTUBE API SCRAPER - PRODUCTION V2") logger.info("=" * 60) timestamp = get_atlantic_timestamp() # Follow project specification directory structure config = ScraperConfig( source_name='YouTube', # Capitalized per spec brand_name='hvacnkowitall', data_dir=Path('data/markdown_current'), logs_dir=Path('logs/YouTube'), timezone='America/Halifax' ) try: scraper = YouTubeAPIScraper(config) logger.info("Starting YouTube API fetch with captions for all videos...") start = time.time() # Fetch all videos WITH captions for top 50 (use more quota) videos = scraper.fetch_content(fetch_captions=True) elapsed = time.time() - start logger.info(f"Fetched {len(videos)} videos in {elapsed:.1f} seconds") if videos: # Statistics total_views = sum(v.get('view_count', 0) for v in videos) total_likes = sum(v.get('like_count', 0) for v in videos) with_captions = sum(1 for v in videos if v.get('caption_text')) logger.info(f"Statistics:") logger.info(f" Total videos: {len(videos)}") logger.info(f" Total views: {total_views:,}") logger.info(f" Total likes: {total_likes:,}") logger.info(f" Videos with captions: {with_captions}") logger.info(f" Quota used: {scraper.quota_used}/{scraper.daily_quota_limit} units") # Save with project specification naming: __.md filename = f"hvacnkowitall_YouTube_{timestamp}.md" markdown = scraper.format_markdown(videos) output_file = Path(f'data/markdown_current/{filename}') output_file.parent.mkdir(parents=True, exist_ok=True) output_file.write_text(markdown, encoding='utf-8') logger.info(f"Markdown saved to: {output_file}") # Create archive copy archive_dir = Path('data/markdown_archives/YouTube') archive_dir.mkdir(parents=True, exist_ok=True) archive_file = archive_dir / filename archive_file.write_text(markdown, encoding='utf-8') logger.info(f"Archive copy saved to: {archive_file}") # Update state file state = scraper.load_state() state = scraper.update_state(state, videos) scraper.save_state(state) logger.info("State file updated for incremental updates") return True, len(videos), output_file else: logger.error("No videos fetched from YouTube API") return False, 0, None except Exception as e: logger.error(f"YouTube API scraper failed: {e}") return False, 0, None def run_mailchimp_api_production(): """Run MailChimp API scraper for production backlog with cleaned content.""" logger.info("\n" + "=" * 60) logger.info("MAILCHIMP API SCRAPER - PRODUCTION V2") logger.info("=" * 60) timestamp = get_atlantic_timestamp() # Follow project specification directory structure config = ScraperConfig( source_name='MailChimp', # Capitalized per spec brand_name='hvacnkowitall', data_dir=Path('data/markdown_current'), logs_dir=Path('logs/MailChimp'), timezone='America/Halifax' ) try: scraper = MailChimpAPIScraper(config) logger.info("Starting MailChimp API fetch with content cleaning...") start = time.time() # Fetch all campaigns from Bi-Weekly Newsletter folder campaigns = scraper.fetch_content(max_items=1000) elapsed = time.time() - start logger.info(f"Fetched {len(campaigns)} campaigns in {elapsed:.1f} seconds") if campaigns: # Statistics total_sent = sum(c.get('metrics', {}).get('emails_sent', 0) for c in campaigns) total_opens = sum(c.get('metrics', {}).get('unique_opens', 0) for c in campaigns) total_clicks = sum(c.get('metrics', {}).get('unique_clicks', 0) for c in campaigns) logger.info(f"Statistics:") logger.info(f" Total campaigns: {len(campaigns)}") logger.info(f" Total emails sent: {total_sent:,}") logger.info(f" Total unique opens: {total_opens:,}") logger.info(f" Total unique clicks: {total_clicks:,}") if campaigns: avg_open_rate = sum(c.get('metrics', {}).get('open_rate', 0) for c in campaigns) / len(campaigns) avg_click_rate = sum(c.get('metrics', {}).get('click_rate', 0) for c in campaigns) / len(campaigns) logger.info(f" Average open rate: {avg_open_rate*100:.1f}%") logger.info(f" Average click rate: {avg_click_rate*100:.1f}%") # Save with project specification naming: __.md filename = f"hvacnkowitall_MailChimp_{timestamp}.md" markdown = scraper.format_markdown(campaigns) output_file = Path(f'data/markdown_current/{filename}') output_file.parent.mkdir(parents=True, exist_ok=True) output_file.write_text(markdown, encoding='utf-8') logger.info(f"Markdown saved to: {output_file}") # Create archive copy archive_dir = Path('data/markdown_archives/MailChimp') archive_dir.mkdir(parents=True, exist_ok=True) archive_file = archive_dir / filename archive_file.write_text(markdown, encoding='utf-8') logger.info(f"Archive copy saved to: {archive_file}") # Update state file state = scraper.load_state() state = scraper.update_state(state, campaigns) scraper.save_state(state) logger.info("State file updated for incremental updates") return True, len(campaigns), output_file else: logger.warning("No campaigns found in MailChimp") return True, 0, None except Exception as e: logger.error(f"MailChimp API scraper failed: {e}") return False, 0, None def sync_to_nas(): """Sync API scraper results to NAS following project structure.""" logger.info("\n" + "=" * 60) logger.info("SYNCING TO NAS - PROJECT STRUCTURE") logger.info("=" * 60) nas_base = Path('/mnt/nas/hvacknowitall') try: # Sync all markdown_current files local_current = Path('data/markdown_current') nas_current = nas_base / 'markdown_current' if local_current.exists() and any(local_current.glob('*.md')): # Create destination if needed nas_current.mkdir(parents=True, exist_ok=True) # Sync all current markdown files cmd = ['rsync', '-av', '--include=*.md', '--exclude=*', str(local_current) + '/', str(nas_current) + '/'] result = subprocess.run(cmd, capture_output=True, text=True) if result.returncode == 0: logger.info(f"✅ Current markdown files synced to NAS: {nas_current}") # List synced files for md_file in nas_current.glob('*.md'): size = md_file.stat().st_size / 1024 # KB logger.info(f" - {md_file.name} ({size:.0f}KB)") else: logger.warning(f"Sync warning: {result.stderr}") else: logger.info("No current markdown files to sync") # Sync archives for source in ['YouTube', 'MailChimp']: local_archive = Path(f'data/markdown_archives/{source}') nas_archive = nas_base / f'markdown_archives/{source}' if local_archive.exists() and any(local_archive.glob('*.md')): nas_archive.mkdir(parents=True, exist_ok=True) cmd = ['rsync', '-av', '--include=*.md', '--exclude=*', str(local_archive) + '/', str(nas_archive) + '/'] result = subprocess.run(cmd, capture_output=True, text=True) if result.returncode == 0: logger.info(f"✅ {source} archives synced to NAS: {nas_archive}") else: logger.warning(f"{source} archive sync warning: {result.stderr}") except Exception as e: logger.error(f"Failed to sync to NAS: {e}") def main(): """Main production run with project specification compliance.""" logger.info("=" * 70) logger.info("HVAC KNOW IT ALL - API SCRAPERS PRODUCTION V2") logger.info("Following Project Specification Standards") logger.info("=" * 70) atlantic_tz = pytz.timezone('America/Halifax') start_time = datetime.now(atlantic_tz) logger.info(f"Started at: {start_time.isoformat()}") # Track results results = { 'YouTube': {'success': False, 'count': 0, 'file': None}, 'MailChimp': {'success': False, 'count': 0, 'file': None} } # Run YouTube API scraper with captions success, count, output_file = run_youtube_api_production() results['YouTube'] = {'success': success, 'count': count, 'file': output_file} # Run MailChimp API scraper with content cleaning success, count, output_file = run_mailchimp_api_production() results['MailChimp'] = {'success': success, 'count': count, 'file': output_file} # Sync to NAS sync_to_nas() # Summary end_time = datetime.now(atlantic_tz) duration = end_time - start_time logger.info("\n" + "=" * 70) logger.info("PRODUCTION V2 SUMMARY") logger.info("=" * 70) for source, result in results.items(): status = "✅" if result['success'] else "❌" logger.info(f"{status} {source}: {result['count']} items") if result['file']: logger.info(f" Output: {result['file']}") logger.info(f"\nTotal duration: {duration.total_seconds():.1f} seconds") logger.info(f"Completed at: {end_time.isoformat()}") # Project specification compliance logger.info("\nPROJECT SPECIFICATION COMPLIANCE:") logger.info("✅ File naming: hvacnkowitall__.md") logger.info("✅ Directory structure: data/markdown_current/, data/markdown_archives/") logger.info("✅ Capitalized source names: YouTube, MailChimp") logger.info("✅ Atlantic timezone timestamps") logger.info("✅ Archive copies created") logger.info("✅ State files for incremental updates") # Return success if at least one scraper succeeded return any(r['success'] for r in results.values()) if __name__ == "__main__": success = main() sys.exit(0 if success else 1)