import os
import random
import re
import time
from datetime import datetime
from typing import Any, Dict, List, Optional

import requests
from bs4 import BeautifulSoup

from src.base_scraper import BaseScraper, ScraperConfig


class MailChimpArchiveScraper(BaseScraper):
    """MailChimp campaign archive scraper using web scraping to access historical content."""

    def __init__(self, config: ScraperConfig):
        super().__init__(config)

        # Extract user and list IDs from the RSS URL
        rss_url = os.getenv('MAILCHIMP_RSS_URL', '')
        self.user_id = self._extract_param(rss_url, 'u')
        self.list_id = self._extract_param(rss_url, 'id')

        if not self.user_id or not self.list_id:
            self.logger.error("Could not extract user ID and list ID from MAILCHIMP_RSS_URL")

        # Archive base URL
        self.archive_base = f"https://us10.campaign-archive.com/home/?u={self.user_id}&id={self.list_id}"

        # Session for persistent connections
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': (
                'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 '
                '(KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
            )
        })

    def _extract_param(self, url: str, param: str) -> str:
        """Extract a query-string parameter value from a URL."""
        # Anchor on '?' or '&' so that e.g. 'id' does not match inside another parameter name.
        match = re.search(f'[?&]{param}=([^&]+)', url)
        return match.group(1) if match else ''

    def _human_delay(self, min_seconds: float = 1, max_seconds: float = 3) -> None:
        """Add human-like delays between requests."""
        delay = random.uniform(min_seconds, max_seconds)
        self.logger.debug(f"Waiting {delay:.2f} seconds...")
        time.sleep(delay)

    def fetch_archive_pages(self, max_pages: int = 50) -> List[str]:
        """Fetch campaign archive pages and extract individual campaign URLs."""
        campaign_urls = []
        page = 1

        try:
            while page <= max_pages:
                # MailChimp archive pagination (if it exists)
                if page == 1:
                    url = self.archive_base
                else:
                    # Try common pagination patterns
                    url = f"{self.archive_base}&page={page}"

                self.logger.info(f"Fetching archive page {page}: {url}")
                response = self.session.get(url, timeout=30)
                response.raise_for_status()

                soup = BeautifulSoup(response.content, 'html.parser')

                # Look for campaign links in various formats
                campaign_links = []

                # Method 1: Look for direct campaign links
                for link in soup.find_all('a', href=True):
                    href = link['href']
                    if 'campaign-archive.com' in href and '&e=' in href:
                        if href not in campaign_links:
                            campaign_links.append(href)

                # Method 2: Look for JavaScript-embedded campaign IDs
                scripts = soup.find_all('script')
                for script in scripts:
                    if script.string:
                        # Look for campaign IDs in JavaScript
                        campaign_ids = re.findall(r'id["\']?\s*:\s*["\']([a-f0-9]+)["\']', script.string)
                        for campaign_id in campaign_ids:
                            campaign_url = f"https://us10.campaign-archive.com/?u={self.user_id}&id={campaign_id}"
                            if campaign_url not in campaign_links:
                                campaign_links.append(campaign_url)

                if not campaign_links:
                    self.logger.info(f"No more campaigns found on page {page}, stopping")
                    break

                campaign_urls.extend(campaign_links)
                self.logger.info(f"Found {len(campaign_links)} campaigns on page {page}")

                # Check for pagination indicators
                has_next = soup.find('a', string=re.compile(r'next|more|older', re.I))
                if not has_next and page > 1:
                    self.logger.info("No more pages found")
                    break

                page += 1
                self._human_delay(2, 5)  # Be respectful to MailChimp

        except Exception as e:
            self.logger.error(f"Error fetching archive pages: {e}")

        # Remove duplicates while preserving archive order (incremental sync relies on newest-first ordering)
        unique_urls = list(dict.fromkeys(campaign_urls))
        self.logger.info(f"Found {len(unique_urls)} unique campaign URLs")
        return unique_urls

    def fetch_campaign_content(self, campaign_url: str) -> Optional[Dict[str, Any]]:
        """Fetch content from a single campaign URL."""
        try:
            self.logger.debug(f"Fetching campaign: {campaign_url}")
            response = self.session.get(campaign_url, timeout=30)
            response.raise_for_status()

            soup = BeautifulSoup(response.content, 'html.parser')

            # Extract campaign data
            campaign_data = {
                'id': self._extract_campaign_id(campaign_url),
                'title': self._extract_title(soup),
                'date': self._extract_date(soup),
                'content': self._extract_content(soup),
                'link': campaign_url
            }

            return campaign_data

        except Exception as e:
            self.logger.error(f"Error fetching campaign {campaign_url}: {e}")
            return None

    def _extract_campaign_id(self, url: str) -> str:
        """Extract campaign ID from URL."""
        match = re.search(r'id=([a-f0-9]+)', url)
        return match.group(1) if match else ''

    def _extract_title(self, soup: BeautifulSoup) -> str:
        """Extract campaign title."""
        # Try multiple selectors for title
        title_selectors = ['title', 'h1', '.mcnTextContent h1', '.headerContainer h1']

        for selector in title_selectors:
            element = soup.select_one(selector)
            if element and element.get_text(strip=True):
                title = element.get_text(strip=True)
                # Clean up common MailChimp title artifacts
                title = re.sub(r'\s*\|\s*HVAC Know It All.*$', '', title)
                return title

        return "Untitled Campaign"

    def _extract_date(self, soup: BeautifulSoup) -> str:
        """Extract campaign send date."""
        # Look for date indicators in various formats
        date_patterns = [
            r'(\w+ \d{1,2}, \d{4})',     # January 15, 2023
            r'(\d{1,2}/\d{1,2}/\d{4})',  # 1/15/2023
            r'(\d{4}-\d{2}-\d{2})',      # 2023-01-15
        ]

        # Search in text content
        text = soup.get_text()
        for pattern in date_patterns:
            match = re.search(pattern, text)
            if match:
                try:
                    # Try to parse and standardize the date
                    date_str = match.group(1)
                    # You could add date parsing logic here
                    return date_str
                except Exception:
                    continue

        # Fallback to current date if no date found
        return datetime.now(self.tz).isoformat()

    def _extract_content(self, soup: BeautifulSoup) -> str:
        """Extract campaign content."""
        # Remove script and style elements
        for script in soup(["script", "style"]):
            script.decompose()

        # Try to find the main content area
        content_selectors = [
            '.mcnTextContent',
            '.bodyContainer',
            '.templateContainer',
            '#templateBody',
            'body'
        ]

        for selector in content_selectors:
            content_elem = soup.select_one(selector)
            if content_elem:
                # Convert to markdown-like format
                content = self.convert_to_markdown(str(content_elem))
                if content and len(content.strip()) > 100:  # Reasonable content length
                    return content

        # Fallback to all text
        return soup.get_text(separator='\n', strip=True)

    def fetch_content(self, max_campaigns: int = 100) -> List[Dict[str, Any]]:
        """Fetch historical MailChimp campaigns."""
        campaigns_data = []

        try:
            self.logger.info(f"Starting MailChimp archive scraping for {max_campaigns} campaigns")

            # Get campaign URLs from archive pages
            campaign_urls = self.fetch_archive_pages(max_pages=20)

            if not campaign_urls:
                self.logger.warning("No campaign URLs found")
                return campaigns_data

            # Limit to requested number
            campaign_urls = campaign_urls[:max_campaigns]

            # Fetch content from each campaign
            for i, url in enumerate(campaign_urls):
                campaign_data = self.fetch_campaign_content(url)
                if campaign_data:
                    campaigns_data.append(campaign_data)

                if (i + 1) % 10 == 0:
                    self.logger.info(f"Processed {i + 1}/{len(campaign_urls)} campaigns")

                # Rate limiting
                self._human_delay(1, 3)

            self.logger.info(f"Successfully fetched {len(campaigns_data)} campaigns")

        except Exception as e:
            self.logger.error(f"Error in fetch_content: {e}")

        return campaigns_data
    def format_markdown(self, items: List[Dict[str, Any]]) -> str:
        """Format MailChimp campaigns as markdown."""
        markdown_sections = []

        for item in items:
            section = []

            # ID
            section.append(f"# ID: {item.get('id', 'N/A')}")
            section.append("")

            # Title
            section.append(f"## Title: {item.get('title', 'Untitled')}")
            section.append("")

            # Date
            section.append(f"## Date: {item.get('date', '')}")
            section.append("")

            # Link
            section.append(f"## Link: {item.get('link', '')}")
            section.append("")

            # Content
            section.append("## Content:")
            content = item.get('content', '')
            if content:
                # Limit content length for readability
                if len(content) > 5000:
                    content = content[:5000] + "..."
                section.append(content)
            section.append("")

            # Separator
            section.append("-" * 50)
            section.append("")

            markdown_sections.append('\n'.join(section))

        return '\n'.join(markdown_sections)

    def get_incremental_items(self, items: List[Dict[str, Any]], state: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Get only new campaigns since last sync."""
        if not state:
            return items

        last_campaign_id = state.get('last_campaign_id')
        if not last_campaign_id:
            return items

        # Filter for campaigns newer than the last synced
        new_items = []
        for item in items:
            if item.get('id') == last_campaign_id:
                break  # Found the last synced campaign
            new_items.append(item)

        return new_items

    def update_state(self, state: Dict[str, Any], items: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Update state with latest campaign information."""
        if not items:
            return state

        # Get the first item (most recent)
        latest_item = items[0]
        state['last_campaign_id'] = latest_item.get('id')
        state['last_campaign_date'] = latest_item.get('date')
        state['last_sync'] = datetime.now(self.tz).isoformat()
        state['campaign_count'] = len(items)

        return state
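

# Usage sketch (illustrative only, not part of the original module): shows how the scraper
# might be driven end to end. It assumes MAILCHIMP_RSS_URL is set in the environment, that
# ScraperConfig is default-constructible, and that BaseScraper wires up `logger`, `tz`, and
# `convert_to_markdown`; adjust the ScraperConfig() call to the real config fields.
if __name__ == "__main__":
    config = ScraperConfig()  # hypothetical: replace with the project's actual config setup
    scraper = MailChimpArchiveScraper(config)

    # Fetch a small batch of historical campaigns and render them as markdown.
    campaigns = scraper.fetch_content(max_campaigns=25)
    print(scraper.format_markdown(campaigns))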