"""RSS feed scrapers: a shared base plus MailChimp newsletter and podcast feeds."""

import os
from datetime import datetime
from typing import Any, Dict, List, Optional
from urllib.parse import urlparse

import feedparser

from src.base_scraper import BaseScraper, ScraperConfig


class BaseRSSScraper(BaseScraper):
    """Base class for RSS feed scrapers.

    Resolves the feed URL from an environment variable and provides the
    shared fetch / incremental-sync / state-update machinery used by the
    concrete scrapers below.
    """

    def __init__(self, config: ScraperConfig, feed_url_env: str):
        """Initialize the scraper.

        Args:
            config: Common scraper configuration.
            feed_url_env: Name of the environment variable holding the feed URL.
        """
        super().__init__(config)
        self.feed_url = os.getenv(feed_url_env, '')
        if not self.feed_url:
            self.logger.error(f"No feed URL found in environment variable {feed_url_env}")

    def fetch_feed(self) -> List[Dict[str, Any]]:
        """Fetch and parse the RSS feed.

        Returns:
            The feed entries as plain dicts, in feed order (newest first
            for typical feeds). An empty list is returned on any error.
        """
        try:
            self.logger.info(f"Fetching RSS feed from {self.feed_url}")
            feed = feedparser.parse(self.feed_url)

            # Real feedparser results expose `bozo`; plain dicts (used by
            # the tests) do not, so fall back to dict handling otherwise.
            if hasattr(feed, 'bozo'):
                if feed.bozo:
                    self.logger.warning(f"Feed parsing had issues: {feed.bozo_exception}")
                entries = feed.get('entries', [])
            else:
                entries = feed.get('entries', []) if isinstance(feed, dict) else []

            self.logger.info(f"Found {len(entries)} entries in feed")

            # Convert FeedParserDict entries to plain dicts for easier handling.
            return [dict(entry) for entry in entries]

        except Exception as e:
            self.logger.error(f"Error fetching RSS feed: {e}")
            return []

    def fetch_content(self) -> List[Dict[str, Any]]:
        """Fetch content from the RSS feed (hook for subclasses to enrich)."""
        return self.fetch_feed()

    def get_incremental_items(self, items: List[Dict[str, Any]], state: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Return only the items that are newer than the last synced one.

        Args:
            items: Feed items, newest first.
            state: Persisted sync state; ``last_item_id`` marks the newest
                item seen during the previous sync.

        Returns:
            The items preceding ``last_item_id`` in the feed, or all items
            when there is no usable state (first sync, or the previously
            synced item has rotated out of the feed).
        """
        if not state:
            return items

        last_item_id = state.get('last_item_id')
        if not last_item_id:
            return items

        # Locate the previously synced item; everything before it is new.
        for index, item in enumerate(items):
            if item.get('id') == last_item_id or item.get('guid') == last_item_id:
                return items[:index]

        # Not found (e.g. it dropped off the feed): resync everything.
        self.logger.warning(f"Last synced item {last_item_id} not found in feed")
        return items

    def update_state(self, state: Dict[str, Any], items: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Record the newest item and sync time in *state* and return it.

        *state* is mutated in place; when *items* is empty it is returned
        unchanged.
        """
        if not items:
            return state

        # Items are newest-first, so the first one is the most recent.
        latest_item = items[0]

        state['last_item_id'] = latest_item.get('id') or latest_item.get('guid')
        state['last_item_date'] = latest_item.get('published')
        state['last_sync'] = datetime.now(self.tz).isoformat()
        state['item_count'] = len(items)

        return state


class RSSScraperMailChimp(BaseRSSScraper):
    """MailChimp RSS feed scraper (reads MAILCHIMP_RSS_URL)."""

    def __init__(self, config: ScraperConfig):
        super().__init__(config, 'MAILCHIMP_RSS_URL')

    def format_markdown(self, items: List[Dict[str, Any]]) -> str:
        """Format newsletter RSS items as a markdown document.

        Each item becomes a section with ID, title, type, link, publish
        date and content, separated by a horizontal rule of dashes.
        """
        markdown_sections = []

        for item in items:
            section = []

            # ID (feedparser exposes either `id` or `guid`)
            item_id = item.get('id') or item.get('guid', 'N/A')
            section.append(f"# ID: {item_id}")
            section.append("")

            # Title
            title = item.get('title', 'Untitled')
            section.append(f"## Title: {title}")
            section.append("")

            # Type
            section.append("## Type: newsletter")
            section.append("")

            # Link
            link = item.get('link', '')
            section.append(f"## Link: {link}")
            section.append("")

            # Publish Date
            pub_date = item.get('published') or item.get('pubDate', '')
            section.append(f"## Publish Date: {pub_date}")
            section.append("")

            # Content: prefer full content, then summary, then description.
            section.append("## Content:")

            content = item.get('content')
            if content and isinstance(content, list) and len(content) > 0:
                # feedparser `content` is a list of dicts with a `value` key.
                content_html = content[0].get('value', '')
                if content_html:
                    section.append(self.convert_to_markdown(content_html))
            elif item.get('summary'):
                section.append(self.convert_to_markdown(item.get('summary')))
            elif item.get('description'):
                section.append(self.convert_to_markdown(item.get('description')))

            section.append("")

            # Separator
            section.append("-" * 50)
            section.append("")

            markdown_sections.append('\n'.join(section))

        return '\n'.join(markdown_sections)


class RSSScraperPodcast(BaseRSSScraper):
    """Podcast RSS feed scraper (reads PODCAST_RSS_URL)."""

    def __init__(self, config: ScraperConfig):
        super().__init__(config, 'PODCAST_RSS_URL')

    def extract_audio_link(self, item: Dict[str, Any]) -> Optional[str]:
        """Extract the audio URL from a podcast item.

        Prefers enclosures explicitly typed as audio, falls back to the
        first linked enclosure (many feeds omit the MIME type), and
        finally to `media_content` entries with an audio type.
        """
        enclosures = item.get('enclosures', [])

        # Prefer enclosures whose MIME type marks them as audio; the first
        # enclosure in a feed is not always the audio one (cover art is common).
        for enclosure in enclosures:
            if enclosure.get('type', '').startswith('audio'):
                link = enclosure.get('href') or enclosure.get('url')
                if link:
                    return link

        # Fallback: first enclosure that carries a link at all.
        for enclosure in enclosures:
            link = enclosure.get('href') or enclosure.get('url')
            if link:
                return link

        # Check for media content
        media_content = item.get('media_content', [])
        for media in media_content:
            if media.get('type', '').startswith('audio'):
                return media.get('url')

        return None

    def extract_image_link(self, item: Dict[str, Any]) -> Optional[str]:
        """Extract the episode image URL from a podcast item."""
        # Check for iTunes image (dict with href/url, or a bare string)
        if 'image' in item:
            if isinstance(item['image'], dict):
                return item['image'].get('href') or item['image'].get('url')
            elif isinstance(item['image'], str):
                return item['image']

        # Check for media thumbnail
        media_thumbnail = item.get('media_thumbnail', [])
        if media_thumbnail and len(media_thumbnail) > 0:
            return media_thumbnail[0].get('url')

        return None

    def fetch_content(self) -> List[Dict[str, Any]]:
        """Fetch podcast items and enrich them with audio/image/duration."""
        items = super().fetch_content()

        for item in items:
            item['audio_link'] = self.extract_audio_link(item)
            item['image_link'] = self.extract_image_link(item)
            item['duration'] = item.get('itunes_duration', '')

            # Record a download filename when a data dir is configured.
            if item['audio_link'] and self.config.data_dir:
                # Derive the filename from the URL *path* only, so query
                # strings/fragments (e.g. "ep1.mp3?source=rss") are not
                # baked into the filename.
                audio_path = urlparse(item['audio_link']).path
                item['audio_filename'] = os.path.basename(audio_path)

        return items

    def format_markdown(self, items: List[Dict[str, Any]]) -> str:
        """Format podcast items as a markdown document.

        Each item becomes a section with ID, title, optional subtitle,
        type, author, publish date, optional duration/audio/image links,
        episode link and description, separated by a dash rule.
        """
        markdown_sections = []

        for item in items:
            section = []

            # ID
            item_id = item.get('id') or item.get('guid', 'N/A')
            section.append(f"# ID: {item_id}")
            section.append("")

            # Title
            title = item.get('title', 'Untitled')
            section.append(f"## Title: {title}")
            section.append("")

            # Subtitle (optional)
            subtitle = item.get('subtitle', '')
            if subtitle:
                section.append(f"## Subtitle: {subtitle}")
                section.append("")

            # Type
            section.append("## Type: podcast")
            section.append("")

            # Author
            author = item.get('author') or item.get('itunes_author', 'Unknown')
            section.append(f"## Author: {author}")
            section.append("")

            # Publish Date
            pub_date = item.get('published') or item.get('pubDate', '')
            section.append(f"## Publish Date: {pub_date}")
            section.append("")

            # Duration (optional)
            duration = item.get('duration') or item.get('itunes_duration', '')
            if duration:
                section.append(f"## Duration: {duration}")
                section.append("")

            # Audio Link (optional)
            audio_link = item.get('audio_link', '')
            if audio_link:
                section.append(f"## Audio Link: {audio_link}")
                section.append("")

            # Image (optional)
            image_link = item.get('image_link', '')
            if image_link:
                section.append(f"## Image: {image_link}")
                section.append("")

            # Episode Link
            link = item.get('link', '')
            section.append(f"## Episode Link: {link}")
            section.append("")

            # Description: prefer `description`, fall back to `summary`.
            section.append("## Description:")

            if item.get('description'):
                section.append(self.convert_to_markdown(item.get('description')))
            elif item.get('summary'):
                section.append(self.convert_to_markdown(item.get('summary')))

            section.append("")

            # Separator
            section.append("-" * 50)
            section.append("")

            markdown_sections.append('\n'.join(section))

        return '\n'.join(markdown_sections)
"""Tests for the MailChimp and podcast RSS scrapers."""

from pathlib import Path
from unittest.mock import patch

import pytest

from src.base_scraper import ScraperConfig
from src.rss_scraper import RSSScraperMailChimp, RSSScraperPodcast


class TestRSSScraperMailChimp:
    """Tests for the MailChimp newsletter RSS scraper."""

    @pytest.fixture
    def config(self):
        """Scraper configuration pointing at the MailChimp source."""
        return ScraperConfig(
            source_name="mailchimp",
            brand_name="hvacknowitall",
            data_dir=Path("data"),
            logs_dir=Path("logs"),
            timezone="America/Halifax"
        )

    @pytest.fixture
    def mock_env(self):
        """Provide the feed URL environment variable for the test's duration."""
        with patch.dict('os.environ', {
            'MAILCHIMP_RSS_URL': 'https://hvacknowitall.com/feed/'
        }):
            yield

    def test_initialization(self, config, mock_env):
        """The scraper picks up its feed URL from MAILCHIMP_RSS_URL."""
        scraper = RSSScraperMailChimp(config)
        assert scraper.config == config
        assert scraper.feed_url == 'https://hvacknowitall.com/feed/'

    @patch('feedparser.parse')
    def test_fetch_feed(self, mock_parse, config, mock_env):
        """fetch_feed returns the parsed entries in feed order."""
        mock_feed = {
            'entries': [
                {
                    'title': 'Newsletter Issue 1',
                    'link': 'https://hvacknowitall.com/newsletter/1',
                    'description': 'Newsletter content 1',
                    'published': 'Mon, 01 Jan 2024 12:00:00 GMT',
                    'id': 'newsletter-1'
                },
                {
                    'title': 'Newsletter Issue 2',
                    'link': 'https://hvacknowitall.com/newsletter/2',
                    'description': 'Newsletter content 2',
                    'published': 'Tue, 02 Jan 2024 12:00:00 GMT',
                    'id': 'newsletter-2'
                }
            ]
        }
        mock_parse.return_value = mock_feed

        scraper = RSSScraperMailChimp(config)
        entries = scraper.fetch_feed()

        assert len(entries) == 2
        assert entries[0]['title'] == 'Newsletter Issue 1'
        mock_parse.assert_called_once_with(scraper.feed_url)

    def test_format_markdown(self, config, mock_env):
        """Markdown output contains the expected headed sections."""
        scraper = RSSScraperMailChimp(config)

        items = [
            {
                'id': 'newsletter-1',
                'title': 'Newsletter Issue 1',
                'link': 'https://hvacknowitall.com/newsletter/1',
                'description': 'Newsletter content 1',
                'published': 'Mon, 01 Jan 2024 12:00:00 GMT'
            }
        ]

        markdown = scraper.format_markdown(items)

        assert '# ID: newsletter-1' in markdown
        assert '## Title: Newsletter Issue 1' in markdown
        assert '## Type: newsletter' in markdown
        assert '## Link: https://hvacknowitall.com/newsletter/1' in markdown
        assert '## Publish Date: Mon, 01 Jan 2024 12:00:00 GMT' in markdown
        assert 'Newsletter content 1' in markdown

    def test_get_incremental_items(self, config, mock_env):
        """Incremental sync returns only items newer than the saved state."""
        scraper = RSSScraperMailChimp(config)

        items = [
            {'id': 'newsletter-3', 'published': 'Wed, 03 Jan 2024 12:00:00 GMT'},
            {'id': 'newsletter-2', 'published': 'Tue, 02 Jan 2024 12:00:00 GMT'},
            {'id': 'newsletter-1', 'published': 'Mon, 01 Jan 2024 12:00:00 GMT'}
        ]

        # No previous state: everything is new.
        state = {}
        new_items = scraper.get_incremental_items(items, state)
        assert len(new_items) == 3

        # Existing state: only the items above the last-synced one.
        state = {'last_item_id': 'newsletter-2'}
        new_items = scraper.get_incremental_items(items, state)
        assert len(new_items) == 1
        assert new_items[0]['id'] == 'newsletter-3'


class TestRSSScraperPodcast:
    """Tests for the podcast RSS scraper."""

    @pytest.fixture
    def config(self):
        """Scraper configuration pointing at the podcast source."""
        return ScraperConfig(
            source_name="podcast",
            brand_name="hvacknowitall",
            data_dir=Path("data"),
            logs_dir=Path("logs"),
            timezone="America/Halifax"
        )

    @pytest.fixture
    def mock_env(self):
        """Provide the feed URL environment variable for the test's duration."""
        with patch.dict('os.environ', {
            'PODCAST_RSS_URL': 'https://hvacknowitall.com/podcast/feed/'
        }):
            yield

    @pytest.fixture
    def sample_podcast_feed(self):
        """A one-episode parsed feed with typical podcast metadata."""
        return {
            'entries': [
                {
                    'title': 'Episode 1: HVAC Basics',
                    'subtitle': 'Learn the basics',
                    'link': 'https://hvacknowitall.com/podcast/1',
                    'description': 'In this episode we discuss HVAC basics',
                    'published': 'Mon, 01 Jan 2024 12:00:00 GMT',
                    'id': 'episode-1',
                    'author': 'John Doe',
                    'enclosures': [{'href': 'https://hvacknowitall.com/audio/ep1.mp3'}],
                    'itunes_duration': '45:30',
                    'image': {'href': 'https://hvacknowitall.com/images/ep1.jpg'}
                }
            ]
        }

    def test_initialization(self, config, mock_env):
        """The scraper picks up its feed URL from PODCAST_RSS_URL."""
        scraper = RSSScraperPodcast(config)
        assert scraper.config == config
        assert scraper.feed_url == 'https://hvacknowitall.com/podcast/feed/'

    @patch('feedparser.parse')
    def test_fetch_feed_with_podcast_fields(self, mock_parse, config, mock_env, sample_podcast_feed):
        """Podcast-specific fields survive the fetch unchanged."""
        mock_parse.return_value = sample_podcast_feed

        scraper = RSSScraperPodcast(config)
        entries = scraper.fetch_feed()

        assert len(entries) == 1
        entry = entries[0]
        assert entry['title'] == 'Episode 1: HVAC Basics'
        assert entry.get('subtitle') == 'Learn the basics'
        assert entry.get('author') == 'John Doe'
        assert entry.get('itunes_duration') == '45:30'

    def test_extract_audio_link(self, config, mock_env):
        """The audio URL is pulled from the item's enclosures."""
        scraper = RSSScraperPodcast(config)

        item = {
            'enclosures': [
                {'href': 'https://hvacknowitall.com/audio/ep1.mp3', 'type': 'audio/mpeg'}
            ]
        }

        audio_link = scraper.extract_audio_link(item)
        assert audio_link == 'https://hvacknowitall.com/audio/ep1.mp3'

    def test_extract_image_link(self, config, mock_env):
        """The image URL is pulled from the item's iTunes image."""
        scraper = RSSScraperPodcast(config)

        item = {
            'image': {'href': 'https://hvacknowitall.com/images/ep1.jpg'}
        }

        image_link = scraper.extract_image_link(item)
        assert image_link == 'https://hvacknowitall.com/images/ep1.jpg'

    def test_format_markdown_podcast(self, config, mock_env):
        """Markdown output contains the expected podcast sections."""
        scraper = RSSScraperPodcast(config)

        items = [
            {
                'id': 'episode-1',
                'title': 'Episode 1: HVAC Basics',
                'subtitle': 'Learn the basics',
                'link': 'https://hvacknowitall.com/podcast/1',
                'description': 'In this episode we discuss HVAC basics',
                'published': 'Mon, 01 Jan 2024 12:00:00 GMT',
                'author': 'John Doe',
                'audio_link': 'https://hvacknowitall.com/audio/ep1.mp3',
                'duration': '45:30',
                'image_link': 'https://hvacknowitall.com/images/ep1.jpg'
            }
        ]

        markdown = scraper.format_markdown(items)

        assert '# ID: episode-1' in markdown
        assert '## Title: Episode 1: HVAC Basics' in markdown
        assert '## Subtitle: Learn the basics' in markdown
        assert '## Type: podcast' in markdown
        assert '## Author: John Doe' in markdown
        assert '## Duration: 45:30' in markdown
        assert '## Audio Link: https://hvacknowitall.com/audio/ep1.mp3' in markdown
        assert '## Image: https://hvacknowitall.com/images/ep1.jpg' in markdown
        assert '## Episode Link: https://hvacknowitall.com/podcast/1' in markdown