hvac-kia-content/run_api_scrapers_production.py
Ben Reed daab901e35 refactor: Update naming convention from hvacknowitall to hkia
Major Changes:
- Updated all code references from hvacknowitall/hvacnkowitall to hkia
- Renamed all existing markdown files to use hkia_ prefix
- Updated configuration files, scrapers, and production scripts
- Modified systemd service descriptions to use HKIA
- Changed NAS sync path to /mnt/nas/hkia

Files Updated:
- 20+ source files updated with new naming convention
- 34 markdown files renamed to hkia_* format (see the sketch after this list)
- All ScraperConfig brand_name parameters now use 'hkia'
- Documentation updated to reflect new naming
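
A minimal sketch of the markdown rename, assuming the old files lived under
data/ with an hvacknowitall_ prefix (a hypothetical helper, not necessarily
the command that was actually run):

    from pathlib import Path

    # Rename hvacknowitall_*.md to hkia_*.md in place
    for old in Path('data').rglob('hvacknowitall_*.md'):
        new = old.with_name(old.name.replace('hvacknowitall_', 'hkia_', 1))
        old.rename(new)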

Rationale:
- Shorter, cleaner filenames
- Consistent branding across all outputs
- Easier to type and reference
- Maintains same functionality with improved naming

Next Steps:
- Deploy updated services to production
- Update any external references to old naming
- Monitor scrapers to ensure proper operation

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-08-19 13:35:23 -03:00


#!/usr/bin/env python3
"""
Production script for API-based content scraping.
Captures YouTube videos and MailChimp campaigns using the official APIs.
"""
import logging
import subprocess
import sys
import time
from datetime import datetime
from pathlib import Path

import pytz

sys.path.insert(0, str(Path(__file__).parent))

from src.youtube_api_scraper import YouTubeAPIScraper
from src.mailchimp_api_scraper import MailChimpAPIScraper
from src.base_scraper import ScraperConfig

# Set up logging; create the log directory first so FileHandler
# does not fail on a fresh checkout
Path('logs').mkdir(exist_ok=True)
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('logs/api_scrapers_production.log'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger('api_production')
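

# This script is meant to run unattended from the repository root, e.g. via
# the systemd services mentioned in the commit message, or cron (the exact
# unit names are not shown here). A manual run is simply:
#
#   ./run_api_scrapers_production.py
#
# API credentials are expected to be configured for the scrapers in src/
# (the exact environment variables are defined there, not assumed here).

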
def run_youtube_api_production():
    """Run the YouTube API scraper for the production backlog"""
    logger.info("=" * 60)
    logger.info("YOUTUBE API SCRAPER - PRODUCTION RUN")
    logger.info("=" * 60)

    tz = pytz.timezone('America/Halifax')
    timestamp = datetime.now(tz).strftime('%Y-%m-%dT%H%M%S')

    config = ScraperConfig(
        source_name='youtube',
        brand_name='hkia',
        data_dir=Path('data/youtube'),
        logs_dir=Path('logs/youtube'),
        timezone='America/Halifax'
    )

    try:
        scraper = YouTubeAPIScraper(config)
        logger.info("Starting YouTube API fetch for full channel...")
        start = time.time()

        # Fetch all videos, with transcripts for the top 50
        videos = scraper.fetch_content(fetch_transcripts=True)
        elapsed = time.time() - start
        logger.info(f"Fetched {len(videos)} videos in {elapsed:.1f} seconds")

        if videos:
            # Statistics
            total_views = sum(v.get('view_count', 0) for v in videos)
            total_likes = sum(v.get('like_count', 0) for v in videos)
            with_transcripts = sum(1 for v in videos if v.get('transcript'))

            logger.info("Statistics:")
            logger.info(f"  Total videos: {len(videos)}")
            logger.info(f"  Total views: {total_views:,}")
            logger.info(f"  Total likes: {total_likes:,}")
            logger.info(f"  Videos with transcripts: {with_transcripts}")
            # The YouTube Data API enforces a daily unit quota (10,000 by
            # default); the scraper tracks its own usage so runs can be
            # sized to stay under the limit
            logger.info(f"  Quota used: {scraper.quota_used}/{scraper.daily_quota_limit} units")

            # Save markdown with timestamp
            markdown = scraper.format_markdown(videos)
            output_file = Path(f'data/youtube/hkia_youtube_{timestamp}.md')
            output_file.parent.mkdir(parents=True, exist_ok=True)
            output_file.write_text(markdown, encoding='utf-8')
            logger.info(f"Markdown saved to: {output_file}")

            # Also save as "latest" for easy access
            latest_file = Path('data/youtube/hkia_youtube_latest.md')
            latest_file.write_text(markdown, encoding='utf-8')
            logger.info(f"Latest file updated: {latest_file}")

            # Update the state file so later runs can fetch incrementally
            state = scraper.load_state()
            state = scraper.update_state(state, videos)
            scraper.save_state(state)
            logger.info("State file updated for incremental updates")

            return True, len(videos), output_file
        else:
            logger.error("No videos fetched from YouTube API")
            return False, 0, None
    except Exception as e:
        logger.error(f"YouTube API scraper failed: {e}")
        return False, 0, None
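

# Each run_*_production() helper returns a (success, item_count, output_file)
# tuple, which main() aggregates into the end-of-run summary. In the MailChimp
# runner below, max_items=1000 is assumed to be an effectively unbounded cap
# (the newsletter folder is expected to hold far fewer campaigns), so every
# available campaign is fetched.

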
def run_mailchimp_api_production():
    """Run the MailChimp API scraper for the production backlog"""
    logger.info("\n" + "=" * 60)
    logger.info("MAILCHIMP API SCRAPER - PRODUCTION RUN")
    logger.info("=" * 60)

    tz = pytz.timezone('America/Halifax')
    timestamp = datetime.now(tz).strftime('%Y-%m-%dT%H%M%S')

    config = ScraperConfig(
        source_name='mailchimp',
        brand_name='hkia',
        data_dir=Path('data/mailchimp'),
        logs_dir=Path('logs/mailchimp'),
        timezone='America/Halifax'
    )

    try:
        scraper = MailChimpAPIScraper(config)
        logger.info("Starting MailChimp API fetch for all campaigns...")
        start = time.time()

        # Fetch all campaigns from the Bi-Weekly Newsletter folder
        campaigns = scraper.fetch_content(max_items=1000)  # Get all available
        elapsed = time.time() - start
        logger.info(f"Fetched {len(campaigns)} campaigns in {elapsed:.1f} seconds")

        if campaigns:
            # Statistics
            total_sent = sum(c.get('metrics', {}).get('emails_sent', 0) for c in campaigns)
            total_opens = sum(c.get('metrics', {}).get('unique_opens', 0) for c in campaigns)
            total_clicks = sum(c.get('metrics', {}).get('unique_clicks', 0) for c in campaigns)

            logger.info("Statistics:")
            logger.info(f"  Total campaigns: {len(campaigns)}")
            logger.info(f"  Total emails sent: {total_sent:,}")
            logger.info(f"  Total unique opens: {total_opens:,}")
            logger.info(f"  Total unique clicks: {total_clicks:,}")

            # campaigns is non-empty here, so the averages are safe to compute
            avg_open_rate = sum(c.get('metrics', {}).get('open_rate', 0) for c in campaigns) / len(campaigns)
            avg_click_rate = sum(c.get('metrics', {}).get('click_rate', 0) for c in campaigns) / len(campaigns)
            logger.info(f"  Average open rate: {avg_open_rate*100:.1f}%")
            logger.info(f"  Average click rate: {avg_click_rate*100:.1f}%")

            # Save markdown with timestamp
            markdown = scraper.format_markdown(campaigns)
            output_file = Path(f'data/mailchimp/hkia_mailchimp_{timestamp}.md')
            output_file.parent.mkdir(parents=True, exist_ok=True)
            output_file.write_text(markdown, encoding='utf-8')
            logger.info(f"Markdown saved to: {output_file}")

            # Also save as "latest" for easy access
            latest_file = Path('data/mailchimp/hkia_mailchimp_latest.md')
            latest_file.write_text(markdown, encoding='utf-8')
            logger.info(f"Latest file updated: {latest_file}")

            # Update the state file so later runs can fetch incrementally
            state = scraper.load_state()
            state = scraper.update_state(state, campaigns)
            scraper.save_state(state)
            logger.info("State file updated for incremental updates")

            return True, len(campaigns), output_file
        else:
            logger.warning("No campaigns found in MailChimp")
            return True, 0, None  # An empty folder is not an error
    except Exception as e:
        logger.error(f"MailChimp API scraper failed: {e}")
        return False, 0, None
def sync_to_nas():
    """Sync API scraper markdown output to the NAS share"""
    logger.info("\n" + "=" * 60)
    logger.info("SYNCING TO NAS")
    logger.info("=" * 60)

    nas_base = Path('/mnt/nas/hkia')

    for source, label in (('youtube', 'YouTube'), ('mailchimp', 'MailChimp')):
        try:
            src = Path(f'data/{source}')
            dest = nas_base / f'markdown_current/{source}'
            if src.exists() and any(src.glob('*.md')):
                # Create the destination if needed
                dest.mkdir(parents=True, exist_ok=True)
                # Sync markdown files only: rsync applies filter rules in
                # order, so --include=*.md followed by --exclude=* copies
                # the markdown and skips everything else
                cmd = ['rsync', '-av', '--include=*.md', '--exclude=*',
                       str(src) + '/', str(dest) + '/']
                result = subprocess.run(cmd, capture_output=True, text=True)
                if result.returncode == 0:
                    logger.info(f"✅ {label} data synced to NAS: {dest}")
                else:
                    logger.warning(f"{label} sync warning: {result.stderr}")
            else:
                logger.info(f"No {label} data to sync")
        except Exception as e:
            logger.error(f"Failed to sync {label} data: {e}")
def main():
    """Main production run"""
    logger.info("=" * 60)
    logger.info("HVAC KNOW IT ALL - API SCRAPERS PRODUCTION RUN")
    logger.info("=" * 60)
    logger.info(f"Started at: {datetime.now(pytz.timezone('America/Halifax')).isoformat()}")

    # Track results
    results = {
        'youtube': {'success': False, 'count': 0, 'file': None},
        'mailchimp': {'success': False, 'count': 0, 'file': None}
    }

    # Run YouTube API scraper
    success, count, output_file = run_youtube_api_production()
    results['youtube'] = {'success': success, 'count': count, 'file': output_file}

    # Run MailChimp API scraper
    success, count, output_file = run_mailchimp_api_production()
    results['mailchimp'] = {'success': success, 'count': count, 'file': output_file}

    # Sync to NAS
    sync_to_nas()

    # Summary
    logger.info("\n" + "=" * 60)
    logger.info("PRODUCTION RUN SUMMARY")
    logger.info("=" * 60)
    for source, result in results.items():
        status = "✅" if result['success'] else "❌"
        logger.info(f"{status} {source.upper()}: {result['count']} items")
        if result['file']:
            logger.info(f"  Output: {result['file']}")
    logger.info(f"\nCompleted at: {datetime.now(pytz.timezone('America/Halifax')).isoformat()}")

    # Return success if at least one scraper succeeded
    return any(r['success'] for r in results.values())


if __name__ == "__main__":
    success = main()
    sys.exit(0 if success else 1)