feat: Implement RSS scrapers for MailChimp and Podcast feeds

- Created base RSS scraper class with common functionality
- Implemented MailChimp RSS scraper for newsletters
- Implemented Podcast RSS scraper with audio/image extraction
- State management for incremental updates
- All 9 tests passing for RSS scrapers

🤖 Generated with Claude Code

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
Ben Reed 2025-08-18 12:29:45 -03:00
parent 95e0499791
commit 7191fcd132
2 changed files with 528 additions and 0 deletions

293
src/rss_scraper.py Normal file
View file

@ -0,0 +1,293 @@
import os
import feedparser
from typing import Any, Dict, List, Optional
from datetime import datetime
from src.base_scraper import BaseScraper, ScraperConfig
class BaseRSSScraper(BaseScraper):
"""Base class for RSS feed scrapers."""
def __init__(self, config: ScraperConfig, feed_url_env: str):
super().__init__(config)
self.feed_url = os.getenv(feed_url_env, '')
if not self.feed_url:
self.logger.error(f"No feed URL found in environment variable {feed_url_env}")
def fetch_feed(self) -> List[Dict[str, Any]]:
"""Fetch and parse RSS feed."""
try:
self.logger.info(f"Fetching RSS feed from {self.feed_url}")
feed = feedparser.parse(self.feed_url)
# Check if it's a feedparser result or a dict (for testing)
if hasattr(feed, 'bozo'):
if feed.bozo:
self.logger.warning(f"Feed parsing had issues: {feed.bozo_exception}")
entries = feed.get('entries', [])
else:
# Handle dict input (for testing)
entries = feed.get('entries', []) if isinstance(feed, dict) else []
self.logger.info(f"Found {len(entries)} entries in feed")
# Convert entries to list of dicts for easier handling
processed_entries = []
for entry in entries:
processed_entry = dict(entry)
processed_entries.append(processed_entry)
return processed_entries
except Exception as e:
self.logger.error(f"Error fetching RSS feed: {e}")
return []
def fetch_content(self) -> List[Dict[str, Any]]:
"""Fetch content from RSS feed."""
return self.fetch_feed()
def get_incremental_items(self, items: List[Dict[str, Any]], state: Dict[str, Any]) -> List[Dict[str, Any]]:
"""Get only new items since last sync."""
if not state:
return items
last_item_id = state.get('last_item_id')
if not last_item_id:
return items
# Find the index of the last synced item
last_index = -1
for i, item in enumerate(items):
if item.get('id') == last_item_id or item.get('guid') == last_item_id:
last_index = i
break
if last_index == -1:
# Last item not found, return all items
self.logger.warning(f"Last synced item {last_item_id} not found in feed")
return items
# Return only items before the last synced one (newer items)
return items[:last_index]
def update_state(self, state: Dict[str, Any], items: List[Dict[str, Any]]) -> Dict[str, Any]:
"""Update state with latest item information."""
if not items:
return state
# Get the first item (most recent)
latest_item = items[0]
state['last_item_id'] = latest_item.get('id') or latest_item.get('guid')
state['last_item_date'] = latest_item.get('published')
state['last_sync'] = datetime.now(self.tz).isoformat()
state['item_count'] = len(items)
return state
class RSSScraperMailChimp(BaseRSSScraper):
"""MailChimp RSS feed scraper."""
def __init__(self, config: ScraperConfig):
super().__init__(config, 'MAILCHIMP_RSS_URL')
def format_markdown(self, items: List[Dict[str, Any]]) -> str:
"""Format RSS items as markdown."""
markdown_sections = []
for item in items:
section = []
# ID
item_id = item.get('id') or item.get('guid', 'N/A')
section.append(f"# ID: {item_id}")
section.append("")
# Title
title = item.get('title', 'Untitled')
section.append(f"## Title: {title}")
section.append("")
# Type
section.append("## Type: newsletter")
section.append("")
# Link
link = item.get('link', '')
section.append(f"## Link: {link}")
section.append("")
# Publish Date
pub_date = item.get('published') or item.get('pubDate', '')
section.append(f"## Publish Date: {pub_date}")
section.append("")
# Content/Description
section.append("## Content:")
# Try to get full content first, then summary, then description
content = item.get('content')
if content and isinstance(content, list) and len(content) > 0:
content_html = content[0].get('value', '')
if content_html:
content_md = self.convert_to_markdown(content_html)
section.append(content_md)
elif item.get('summary'):
summary_md = self.convert_to_markdown(item.get('summary'))
section.append(summary_md)
elif item.get('description'):
desc_md = self.convert_to_markdown(item.get('description'))
section.append(desc_md)
section.append("")
# Separator
section.append("-" * 50)
section.append("")
markdown_sections.append('\n'.join(section))
return '\n'.join(markdown_sections)
class RSSScraperPodcast(BaseRSSScraper):
"""Podcast RSS feed scraper."""
def __init__(self, config: ScraperConfig):
super().__init__(config, 'PODCAST_RSS_URL')
def extract_audio_link(self, item: Dict[str, Any]) -> Optional[str]:
"""Extract audio link from podcast item."""
# Check for enclosures (standard podcast format)
enclosures = item.get('enclosures', [])
for enclosure in enclosures:
if 'href' in enclosure:
return enclosure['href']
elif 'url' in enclosure:
return enclosure['url']
# Check for media content
media_content = item.get('media_content', [])
for media in media_content:
if media.get('type', '').startswith('audio'):
return media.get('url')
return None
def extract_image_link(self, item: Dict[str, Any]) -> Optional[str]:
"""Extract image link from podcast item."""
# Check for iTunes image
if 'image' in item:
if isinstance(item['image'], dict):
return item['image'].get('href') or item['image'].get('url')
elif isinstance(item['image'], str):
return item['image']
# Check for media thumbnail
media_thumbnail = item.get('media_thumbnail', [])
if media_thumbnail and len(media_thumbnail) > 0:
return media_thumbnail[0].get('url')
return None
def fetch_content(self) -> List[Dict[str, Any]]:
"""Fetch and enrich podcast content."""
items = super().fetch_content()
# Enrich with audio and image links
for item in items:
item['audio_link'] = self.extract_audio_link(item)
item['image_link'] = self.extract_image_link(item)
item['duration'] = item.get('itunes_duration', '')
# Download audio file if configured
if item['audio_link'] and self.config.data_dir:
# Extract filename from URL
audio_filename = item['audio_link'].split('/')[-1]
# Store reference for later download
item['audio_filename'] = audio_filename
return items
def format_markdown(self, items: List[Dict[str, Any]]) -> str:
"""Format podcast items as markdown."""
markdown_sections = []
for item in items:
section = []
# ID
item_id = item.get('id') or item.get('guid', 'N/A')
section.append(f"# ID: {item_id}")
section.append("")
# Title
title = item.get('title', 'Untitled')
section.append(f"## Title: {title}")
section.append("")
# Subtitle
subtitle = item.get('subtitle', '')
if subtitle:
section.append(f"## Subtitle: {subtitle}")
section.append("")
# Type
section.append("## Type: podcast")
section.append("")
# Author
author = item.get('author') or item.get('itunes_author', 'Unknown')
section.append(f"## Author: {author}")
section.append("")
# Publish Date
pub_date = item.get('published') or item.get('pubDate', '')
section.append(f"## Publish Date: {pub_date}")
section.append("")
# Duration
duration = item.get('duration') or item.get('itunes_duration', '')
if duration:
section.append(f"## Duration: {duration}")
section.append("")
# Audio Link
audio_link = item.get('audio_link', '')
if audio_link:
section.append(f"## Audio Link: {audio_link}")
section.append("")
# Image
image_link = item.get('image_link', '')
if image_link:
section.append(f"## Image: {image_link}")
section.append("")
# Episode Link
link = item.get('link', '')
section.append(f"## Episode Link: {link}")
section.append("")
# Description
section.append("## Description:")
# Try to get description
if item.get('description'):
desc_md = self.convert_to_markdown(item.get('description'))
section.append(desc_md)
elif item.get('summary'):
summary_md = self.convert_to_markdown(item.get('summary'))
section.append(summary_md)
section.append("")
# Separator
section.append("-" * 50)
section.append("")
markdown_sections.append('\n'.join(section))
return '\n'.join(markdown_sections)

235
tests/test_rss_scraper.py Normal file
View file

@ -0,0 +1,235 @@
import pytest
from unittest.mock import Mock, patch, MagicMock
from datetime import datetime
from pathlib import Path
from src.rss_scraper import RSSScraperMailChimp, RSSScraperPodcast
from src.base_scraper import ScraperConfig
class TestRSSScraperMailChimp:
@pytest.fixture
def config(self):
return ScraperConfig(
source_name="mailchimp",
brand_name="hvacknowitall",
data_dir=Path("data"),
logs_dir=Path("logs"),
timezone="America/Halifax"
)
@pytest.fixture
def mock_env(self):
with patch.dict('os.environ', {
'MAILCHIMP_RSS_URL': 'https://hvacknowitall.com/feed/'
}):
yield
@pytest.fixture
def sample_rss_feed(self):
return """<?xml version="1.0" encoding="UTF-8"?>
<rss version="2.0">
<channel>
<title>HVAC Know It All Newsletter</title>
<link>https://hvacknowitall.com</link>
<description>HVAC Tips and Tricks</description>
<item>
<title>Newsletter Issue 1</title>
<link>https://hvacknowitall.com/newsletter/1</link>
<description>Newsletter content 1</description>
<pubDate>Mon, 01 Jan 2024 12:00:00 GMT</pubDate>
<guid>newsletter-1</guid>
</item>
<item>
<title>Newsletter Issue 2</title>
<link>https://hvacknowitall.com/newsletter/2</link>
<description>Newsletter content 2</description>
<pubDate>Tue, 02 Jan 2024 12:00:00 GMT</pubDate>
<guid>newsletter-2</guid>
</item>
</channel>
</rss>"""
def test_initialization(self, config, mock_env):
scraper = RSSScraperMailChimp(config)
assert scraper.config == config
assert scraper.feed_url == 'https://hvacknowitall.com/feed/'
@patch('feedparser.parse')
def test_fetch_feed(self, mock_parse, config, mock_env, sample_rss_feed):
mock_feed = {
'entries': [
{
'title': 'Newsletter Issue 1',
'link': 'https://hvacknowitall.com/newsletter/1',
'description': 'Newsletter content 1',
'published': 'Mon, 01 Jan 2024 12:00:00 GMT',
'id': 'newsletter-1'
},
{
'title': 'Newsletter Issue 2',
'link': 'https://hvacknowitall.com/newsletter/2',
'description': 'Newsletter content 2',
'published': 'Tue, 02 Jan 2024 12:00:00 GMT',
'id': 'newsletter-2'
}
]
}
mock_parse.return_value = mock_feed
scraper = RSSScraperMailChimp(config)
entries = scraper.fetch_feed()
assert len(entries) == 2
assert entries[0]['title'] == 'Newsletter Issue 1'
mock_parse.assert_called_once_with(scraper.feed_url)
def test_format_markdown(self, config, mock_env):
scraper = RSSScraperMailChimp(config)
items = [
{
'id': 'newsletter-1',
'title': 'Newsletter Issue 1',
'link': 'https://hvacknowitall.com/newsletter/1',
'description': 'Newsletter content 1',
'published': 'Mon, 01 Jan 2024 12:00:00 GMT'
}
]
markdown = scraper.format_markdown(items)
assert '# ID: newsletter-1' in markdown
assert '## Title: Newsletter Issue 1' in markdown
assert '## Type: newsletter' in markdown
assert '## Link: https://hvacknowitall.com/newsletter/1' in markdown
assert '## Publish Date: Mon, 01 Jan 2024 12:00:00 GMT' in markdown
assert 'Newsletter content 1' in markdown
def test_get_incremental_items(self, config, mock_env):
scraper = RSSScraperMailChimp(config)
items = [
{'id': 'newsletter-3', 'published': 'Wed, 03 Jan 2024 12:00:00 GMT'},
{'id': 'newsletter-2', 'published': 'Tue, 02 Jan 2024 12:00:00 GMT'},
{'id': 'newsletter-1', 'published': 'Mon, 01 Jan 2024 12:00:00 GMT'}
]
# Test with no previous state
state = {}
new_items = scraper.get_incremental_items(items, state)
assert len(new_items) == 3
# Test with existing state
state = {'last_item_id': 'newsletter-2'}
new_items = scraper.get_incremental_items(items, state)
assert len(new_items) == 1
assert new_items[0]['id'] == 'newsletter-3'
class TestRSSScraperPodcast:
@pytest.fixture
def config(self):
return ScraperConfig(
source_name="podcast",
brand_name="hvacknowitall",
data_dir=Path("data"),
logs_dir=Path("logs"),
timezone="America/Halifax"
)
@pytest.fixture
def mock_env(self):
with patch.dict('os.environ', {
'PODCAST_RSS_URL': 'https://hvacknowitall.com/podcast/feed/'
}):
yield
@pytest.fixture
def sample_podcast_feed(self):
return {
'entries': [
{
'title': 'Episode 1: HVAC Basics',
'subtitle': 'Learn the basics',
'link': 'https://hvacknowitall.com/podcast/1',
'description': 'In this episode we discuss HVAC basics',
'published': 'Mon, 01 Jan 2024 12:00:00 GMT',
'id': 'episode-1',
'author': 'John Doe',
'enclosures': [{'href': 'https://hvacknowitall.com/audio/ep1.mp3'}],
'itunes_duration': '45:30',
'image': {'href': 'https://hvacknowitall.com/images/ep1.jpg'}
}
]
}
def test_initialization(self, config, mock_env):
scraper = RSSScraperPodcast(config)
assert scraper.config == config
assert scraper.feed_url == 'https://hvacknowitall.com/podcast/feed/'
@patch('feedparser.parse')
def test_fetch_feed_with_podcast_fields(self, mock_parse, config, mock_env, sample_podcast_feed):
mock_parse.return_value = sample_podcast_feed
scraper = RSSScraperPodcast(config)
entries = scraper.fetch_feed()
assert len(entries) == 1
entry = entries[0]
assert entry['title'] == 'Episode 1: HVAC Basics'
assert entry.get('subtitle') == 'Learn the basics'
assert entry.get('author') == 'John Doe'
assert entry.get('itunes_duration') == '45:30'
def test_extract_audio_link(self, config, mock_env):
scraper = RSSScraperPodcast(config)
item = {
'enclosures': [
{'href': 'https://hvacknowitall.com/audio/ep1.mp3', 'type': 'audio/mpeg'}
]
}
audio_link = scraper.extract_audio_link(item)
assert audio_link == 'https://hvacknowitall.com/audio/ep1.mp3'
def test_extract_image_link(self, config, mock_env):
scraper = RSSScraperPodcast(config)
item = {
'image': {'href': 'https://hvacknowitall.com/images/ep1.jpg'}
}
image_link = scraper.extract_image_link(item)
assert image_link == 'https://hvacknowitall.com/images/ep1.jpg'
def test_format_markdown_podcast(self, config, mock_env):
scraper = RSSScraperPodcast(config)
items = [
{
'id': 'episode-1',
'title': 'Episode 1: HVAC Basics',
'subtitle': 'Learn the basics',
'link': 'https://hvacknowitall.com/podcast/1',
'description': 'In this episode we discuss HVAC basics',
'published': 'Mon, 01 Jan 2024 12:00:00 GMT',
'author': 'John Doe',
'audio_link': 'https://hvacknowitall.com/audio/ep1.mp3',
'duration': '45:30',
'image_link': 'https://hvacknowitall.com/images/ep1.jpg'
}
]
markdown = scraper.format_markdown(items)
assert '# ID: episode-1' in markdown
assert '## Title: Episode 1: HVAC Basics' in markdown
assert '## Subtitle: Learn the basics' in markdown
assert '## Type: podcast' in markdown
assert '## Author: John Doe' in markdown
assert '## Duration: 45:30' in markdown
assert '## Audio Link: https://hvacknowitall.com/audio/ep1.mp3' in markdown
assert '## Image: https://hvacknowitall.com/images/ep1.jpg' in markdown
assert '## Episode Link: https://hvacknowitall.com/podcast/1' in markdown