hvac-kia-content/src/mailchimp_api_scraper.py
Ben Reed daab901e35 refactor: Update naming convention from hvacknowitall to hkia
Major Changes:
- Updated all code references from hvacknowitall/hvacnkowitall to hkia
- Renamed all existing markdown files to use hkia_ prefix
- Updated configuration files, scrapers, and production scripts
- Modified systemd service descriptions to use HKIA
- Changed NAS sync path to /mnt/nas/hkia

Files Updated:
- 20+ source files updated with new naming convention
- 34 markdown files renamed to hkia_* format
- All ScraperConfig brand_name parameters now use 'hkia'
- Documentation updated to reflect new naming

Rationale:
- Shorter, cleaner filenames
- Consistent branding across all outputs
- Easier to type and reference
- Maintains same functionality with improved naming

Next Steps:
- Deploy updated services to production
- Update any external references to old naming
- Monitor scrapers to ensure proper operation

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-08-19 13:35:23 -03:00

355 lines
No EOL
14 KiB
Python

#!/usr/bin/env python3
"""
MailChimp API scraper for fetching campaign data and metrics.
Fetches campaigns from the "Bi-Weekly Newsletter" folder; if that folder
cannot be found, falls back to fetching all sent campaigns.
"""
import os
import time
import requests
from typing import Any, Dict, List, Optional
from datetime import datetime
from src.base_scraper import BaseScraper, ScraperConfig
import logging
class MailChimpAPIScraper(BaseScraper):
    """MailChimp API scraper for campaigns and metrics.

    Pulls sent campaigns from the MailChimp Marketing API (v3.0), preferring
    the campaign folder named by ``target_folder_name``, and enriches each
    campaign with its body content and report metrics.

    Credentials are read from the environment:

    * ``MAILCHIMP_API_KEY`` (required)
    * ``MAILCHIMP_SERVER_PREFIX`` (optional, defaults to ``us10``)

    NOTE(review): ``self.logger``, ``self.tz`` and ``self.convert_to_markdown``
    are not defined in this class; presumably they are provided by
    ``BaseScraper`` -- confirm against the base class.
    """

    # Seconds to wait on any single HTTP call. ``requests`` applies no timeout
    # by default, so without this a stalled connection blocks the scraper
    # forever.
    REQUEST_TIMEOUT_SECONDS = 30

    # Largest ``count`` the MailChimp list endpoints document as accepted.
    MAX_CAMPAIGNS_PER_REQUEST = 1000

    # Pause after each campaign is enriched, to stay clear of rate limiting.
    RATE_LIMIT_DELAY_SECONDS = 0.5

    def __init__(self, config: ScraperConfig):
        """Read API credentials from the environment and set up request state.

        Args:
            config: Scraper configuration, passed through to ``BaseScraper``.

        Raises:
            ValueError: If ``MAILCHIMP_API_KEY`` is unset or empty.
        """
        super().__init__(config)

        self.api_key = os.getenv('MAILCHIMP_API_KEY')
        self.server_prefix = os.getenv('MAILCHIMP_SERVER_PREFIX', 'us10')

        if not self.api_key:
            raise ValueError("MAILCHIMP_API_KEY not found in environment variables")

        self.base_url = f"https://{self.server_prefix}.api.mailchimp.com/3.0"
        self.headers = {
            'Authorization': f'Bearer {self.api_key}',
            'Content-Type': 'application/json'
        }

        # Folder ID is looked up lazily and cached by _get_folder_id().
        self.target_folder_id = None
        self.target_folder_name = "Bi-Weekly Newsletter"

        self.logger.info(f"Initialized MailChimp API scraper for server: {self.server_prefix}")

    def _mailchimp_get(self, path: str,
                       params: Optional[Dict[str, Any]] = None) -> "requests.Response":
        """Issue an authenticated GET against the API with a bounded timeout.

        Args:
            path: Endpoint path appended to ``self.base_url`` (leading slash
                included, e.g. ``"/ping"``).
            params: Optional query-string parameters.

        Returns:
            The raw ``requests`` response; status handling is left to callers.
        """
        return requests.get(
            f"{self.base_url}{path}",
            headers=self.headers,
            params=params,
            timeout=self.REQUEST_TIMEOUT_SECONDS
        )

    def _test_connection(self) -> bool:
        """Ping the API and report whether it answered with HTTP 200.

        Returns:
            True on a 200 response; False on any other status or on any
            exception raised while making the request (both are logged).
        """
        try:
            response = self._mailchimp_get("/ping")
        except Exception as e:
            self.logger.error(f"MailChimp API connection error: {e}")
            return False

        if response.status_code == 200:
            self.logger.info("MailChimp API connection successful")
            return True

        self.logger.error(f"MailChimp API connection failed: {response.status_code}")
        return False

    def _get_folder_id(self) -> Optional[str]:
        """Return the ID of the campaign folder named ``target_folder_name``.

        The first successful lookup is cached on ``self.target_folder_id``.
        Only the first 100 folders returned by the API are inspected.

        Returns:
            The folder ID, or None when the folder is not found, the request
            does not return HTTP 200, or an exception occurs (all logged).
        """
        if self.target_folder_id:
            return self.target_folder_id

        try:
            response = self._mailchimp_get("/campaign-folders", params={'count': 100})
            if response.status_code != 200:
                self.logger.error(f"Failed to fetch folders: {response.status_code}")
                return None

            for folder in response.json().get('folders', []):
                if folder.get('name') == self.target_folder_name:
                    self.target_folder_id = folder['id']
                    self.logger.info(f"Found '{self.target_folder_name}' folder: {self.target_folder_id}")
                    return self.target_folder_id

            self.logger.warning(f"'{self.target_folder_name}' folder not found")
        except Exception as e:
            self.logger.error(f"Error fetching folders: {e}")

        return None

    def _fetch_campaign_content(self, campaign_id: str) -> Optional[Dict[str, Any]]:
        """Fetch the content payload of one campaign.

        Args:
            campaign_id: MailChimp campaign ID.

        Returns:
            The decoded JSON body on HTTP 200, otherwise None (non-200
            statuses and exceptions are logged, not raised).
        """
        try:
            response = self._mailchimp_get(f"/campaigns/{campaign_id}/content")
            if response.status_code == 200:
                return response.json()

            self.logger.warning(f"Failed to fetch content for campaign {campaign_id}: {response.status_code}")
            return None
        except Exception as e:
            self.logger.error(f"Error fetching campaign content: {e}")
            return None

    def _fetch_campaign_report(self, campaign_id: str) -> Optional[Dict[str, Any]]:
        """Fetch the report (delivery/engagement metrics) of one campaign.

        Args:
            campaign_id: MailChimp campaign ID.

        Returns:
            The decoded JSON body on HTTP 200, otherwise None (non-200
            statuses and exceptions are logged, not raised).
        """
        try:
            response = self._mailchimp_get(f"/reports/{campaign_id}")
            if response.status_code == 200:
                return response.json()

            self.logger.warning(f"Failed to fetch report for campaign {campaign_id}: {response.status_code}")
            return None
        except Exception as e:
            self.logger.error(f"Error fetching campaign report: {e}")
            return None

    @staticmethod
    def _extract_metrics(report_data: Dict[str, Any]) -> Dict[str, Any]:
        """Flatten a report payload into the metrics dict stored per campaign.

        Sub-sections that are missing or null are treated as empty so one
        incomplete report cannot abort the whole fetch.
        """
        opens = report_data.get('opens') or {}
        clicks = report_data.get('clicks') or {}
        bounces = report_data.get('bounces') or {}
        forwards = report_data.get('forwards') or {}

        return {
            'emails_sent': report_data.get('emails_sent', 0),
            'unique_opens': opens.get('unique_opens', 0),
            'open_rate': opens.get('open_rate', 0),
            'total_opens': opens.get('opens_total', 0),
            'unique_clicks': clicks.get('unique_clicks', 0),
            'click_rate': clicks.get('click_rate', 0),
            'total_clicks': clicks.get('clicks_total', 0),
            'unsubscribed': report_data.get('unsubscribed', 0),
            'bounces': {
                'hard': bounces.get('hard_bounces', 0),
                'soft': bounces.get('soft_bounces', 0),
                'syntax_errors': bounces.get('syntax_errors', 0)
            },
            'abuse_reports': report_data.get('abuse_reports', 0),
            'forwards': {
                'count': forwards.get('forwards_count', 0),
                'opens': forwards.get('forwards_opens', 0)
            }
        }

    def _build_campaign_record(self, campaign: Dict[str, Any]) -> Dict[str, Any]:
        """Turn one raw campaign entry into an enriched record.

        Copies the basic campaign fields, then adds ``plain_text``/``html``
        (when content could be fetched) and ``metrics`` (empty dict when the
        report could not be fetched).

        Raises:
            KeyError: If the campaign entry has no ``id``.
        """
        campaign_id = campaign['id']
        settings = campaign.get('settings') or {}

        record = {
            'id': campaign_id,
            'title': settings.get('subject_line', 'Untitled'),
            'preview_text': settings.get('preview_text', ''),
            'from_name': settings.get('from_name', ''),
            'reply_to': settings.get('reply_to', ''),
            'send_time': campaign.get('send_time'),
            'status': campaign.get('status'),
            'type': campaign.get('type', 'regular'),
            'archive_url': campaign.get('archive_url', ''),
            'long_archive_url': campaign.get('long_archive_url', ''),
            'folder_id': settings.get('folder_id')
        }

        content_data = self._fetch_campaign_content(campaign_id)
        if content_data:
            record['plain_text'] = content_data.get('plain_text', '')
            record['html'] = content_data.get('html', '')

            # Only derive text from the HTML when the API supplied no
            # plain-text version of its own.
            if record['html'] and not record['plain_text']:
                record['plain_text'] = self.convert_to_markdown(
                    record['html'],
                    content_type="text/html"
                )

        report_data = self._fetch_campaign_report(campaign_id)
        record['metrics'] = self._extract_metrics(report_data) if report_data else {}

        return record

    def fetch_content(self, max_items: Optional[int] = None) -> List[Dict[str, Any]]:
        """Fetch sent campaigns from MailChimp, enriched with content and metrics.

        Campaigns are requested newest first (``send_time`` descending) in a
        single call; no pagination is performed. When the target folder can be
        resolved the request is restricted to it, otherwise all sent campaigns
        are requested.

        Args:
            max_items: Maximum number of campaigns to request. ``None`` or 0
                requests ``MAX_CAMPAIGNS_PER_REQUEST``; larger values are
                capped at that limit.

        Returns:
            A list of enriched campaign dicts. An empty list is returned when
            the connection test fails, the campaign listing does not return
            HTTP 200, or any exception occurs while listing/enriching (in
            which case campaigns enriched so far are discarded).
        """
        if not self._test_connection():
            self.logger.error("Failed to connect to MailChimp API")
            return []

        folder_id = self._get_folder_id()

        # Cap the page size at the API's documented maximum.
        page_size = min(max_items or self.MAX_CAMPAIGNS_PER_REQUEST,
                        self.MAX_CAMPAIGNS_PER_REQUEST)

        params = {
            'count': page_size,
            'status': 'sent',  # Only sent campaigns
            'sort_field': 'send_time',
            'sort_dir': 'DESC'
        }

        if folder_id:
            params['folder_id'] = folder_id
            self.logger.info(f"Fetching campaigns from '{self.target_folder_name}' folder")
        else:
            self.logger.info("Fetching all sent campaigns")

        try:
            response = self._mailchimp_get("/campaigns", params=params)
            if response.status_code != 200:
                self.logger.error(f"Failed to fetch campaigns: {response.status_code}")
                return []

            campaigns = response.json().get('campaigns', [])
            self.logger.info(f"Found {len(campaigns)} campaigns")

            enriched_campaigns = []
            for campaign in campaigns:
                enriched_campaigns.append(self._build_campaign_record(campaign))
                # Add small delay to avoid rate limiting
                time.sleep(self.RATE_LIMIT_DELAY_SECONDS)

            return enriched_campaigns
        except Exception as e:
            # Deliberately all-or-nothing: a partial batch is not returned.
            self.logger.error(f"Error fetching campaigns: {e}")
            return []

    def format_markdown(self, campaigns: List[Dict[str, Any]]) -> str:
        """Format campaigns as markdown with enhanced metrics.

        Each campaign becomes one section (ID, title, type, optional send
        date, sender details, archive URL, metrics, preview text and content)
        terminated by a 50-dash separator. Sections are joined with newlines.

        Args:
            campaigns: Campaign dicts in the shape produced by
                ``fetch_content``.

        Returns:
            The combined markdown text; an empty string for no campaigns.
        """
        markdown_sections = []

        for campaign in campaigns:
            section = []

            section.append(f"# ID: {campaign.get('id', 'N/A')}")
            section.append("")

            section.append(f"## Title: {campaign.get('title', 'Untitled')}")
            section.append("")

            section.append("## Type: email_campaign")
            section.append("")

            send_time = campaign.get('send_time', '')
            if send_time:
                section.append(f"## Send Date: {send_time}")
                section.append("")

            from_name = campaign.get('from_name', '')
            reply_to = campaign.get('reply_to', '')
            if from_name:
                section.append(f"## From: {from_name}")
            if reply_to:
                section.append(f"## Reply To: {reply_to}")
            # The blank line is emitted even when neither field is present.
            section.append("")

            archive_url = campaign.get('long_archive_url') or campaign.get('archive_url', '')
            if archive_url:
                section.append(f"## Archive URL: {archive_url}")
                section.append("")

            metrics = campaign.get('metrics', {})
            if metrics:
                # ``or 0`` / ``or {}`` guard against keys that are present
                # but null, which would otherwise break the arithmetic below.
                open_rate = metrics.get('open_rate') or 0
                click_rate = metrics.get('click_rate') or 0

                section.append("## Metrics:")
                section.append(f"### Emails Sent: {metrics.get('emails_sent', 0)}")
                section.append(f"### Opens: {metrics.get('unique_opens', 0)} unique ({open_rate*100:.1f}%)")
                section.append(f"### Clicks: {metrics.get('unique_clicks', 0)} unique ({click_rate*100:.1f}%)")
                section.append(f"### Unsubscribes: {metrics.get('unsubscribed', 0)}")

                bounces = metrics.get('bounces') or {}
                hard_bounces = bounces.get('hard') or 0
                soft_bounces = bounces.get('soft') or 0
                total_bounces = hard_bounces + soft_bounces
                if total_bounces > 0:
                    section.append(f"### Bounces: {total_bounces} (Hard: {hard_bounces}, Soft: {soft_bounces})")

                abuse_reports = metrics.get('abuse_reports') or 0
                if abuse_reports > 0:
                    section.append(f"### Abuse Reports: {abuse_reports}")

                forwards = metrics.get('forwards') or {}
                forward_count = forwards.get('count') or 0
                if forward_count > 0:
                    section.append(f"### Forwards: {forward_count}")

                section.append("")

            preview_text = campaign.get('preview_text', '')
            if preview_text:
                section.append("## Preview Text:")
                section.append(preview_text)
                section.append("")

            content = campaign.get('plain_text', '')
            if content:
                section.append("## Content:")
                section.append(content)
                section.append("")

            section.append("-" * 50)
            section.append("")

            markdown_sections.append('\n'.join(section))

        return '\n'.join(markdown_sections)

    def get_incremental_items(self, items: List[Dict[str, Any]], state: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Get only new campaigns since last sync.

        Walks ``items`` in order and stops at the campaign whose ID equals
        ``state['last_campaign_id']``, so it relies on ``items`` being ordered
        newest first (the order ``fetch_content`` requests). As a backup,
        items whose ``send_time`` is not later than ``state['last_send_time']``
        are skipped.

        NOTE(review): ``send_time`` values are compared with ``<=`` as-is;
        presumably they are ISO-8601 strings in one consistent format --
        confirm.

        Args:
            items: Campaign dicts, newest first.
            state: Previously saved sync state; may be empty or None.

        Returns:
            ``items`` unchanged when there is no usable state, otherwise the
            list of campaigns considered new.
        """
        if not state:
            return items

        last_campaign_id = state.get('last_campaign_id')
        last_send_time = state.get('last_send_time')

        if not last_campaign_id:
            return items

        new_items = []
        for item in items:
            if item.get('id') == last_campaign_id:
                break  # Found the last synced campaign

            # Also check by send time as backup
            item_send_time = item.get('send_time')
            if last_send_time and item_send_time and item_send_time <= last_send_time:
                continue

            new_items.append(item)

        return new_items

    def update_state(self, state: Dict[str, Any], items: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Update state with latest campaign information.

        The first element of ``items`` is treated as the most recent campaign.
        ``state`` is modified in place and also returned.

        Args:
            state: Sync state dict to update.
            items: Campaigns from the current run, newest first.

        Returns:
            The same ``state`` object; unchanged when ``items`` is empty.
        """
        if not items:
            return state

        latest_item = items[0]

        state['last_campaign_id'] = latest_item.get('id')
        state['last_send_time'] = latest_item.get('send_time')
        state['last_campaign_title'] = latest_item.get('title')
        state['last_sync'] = datetime.now(self.tz).isoformat()
        state['campaign_count'] = len(items)

        return state