- Implement Instagram scraper with aggressive rate limiting
- Add orchestrator for running all scrapers in parallel
- Create comprehensive tests for Instagram scraper (11 tests)
- Create tests for orchestrator (9 tests)
- Fix Instagram test issues with post type detection
- All 60 tests passing successfully
🤖 Generated with Claude Code
Co-Authored-By: Claude <noreply@anthropic.com>
186 lines
No EOL
7.5 KiB
Python
186 lines
No EOL
7.5 KiB
Python
import pytest
|
|
from unittest.mock import Mock, patch, MagicMock
|
|
from pathlib import Path
|
|
import json
|
|
from src.orchestrator import ScraperOrchestrator
|
|
from src.base_scraper import ScraperConfig
|
|
|
|
|
|
class TestScraperOrchestrator:
    """Tests for ``ScraperOrchestrator`` driven entirely by mock scrapers.

    Scrapers are represented as ``(name, scraper)`` tuples, matching the shape
    these tests assign to ``orchestrator.scrapers``.
    """

    @staticmethod
    def _fake_scraper(fetched, incremental, markdown, filename, state):
        """Return a ``MagicMock`` scraper whose methods yield the given canned values."""
        scraper = MagicMock()
        scraper.load_state.return_value = {}
        scraper.fetch_content.return_value = fetched
        scraper.get_incremental_items.return_value = incremental
        scraper.format_markdown.return_value = markdown
        scraper.generate_filename.return_value = filename
        scraper.update_state.return_value = state
        return scraper

    @staticmethod
    def _result(name, status, items_count, new_items, duration_seconds, error):
        """Build one per-scraper result dict in the shape the tests feed to the orchestrator."""
        return {
            'name': name,
            'status': status,
            'items_count': items_count,
            'new_items': new_items,
            'duration_seconds': duration_seconds,
            'error': error,
        }

    @classmethod
    def _mixed_results(cls):
        """Return one successful and one failed result, shared by the reporting tests."""
        return [
            cls._result('WordPress', 'success', 2, 1, 1.0, None),
            cls._result('YouTube', 'error', 0, 0, 0.5, 'Connection failed'),
        ]

    @pytest.fixture
    def orchestrator(self):
        """Provide an orchestrator configured with fixed test directories and brand."""
        settings = {
            "base_data_dir": Path("test_data"),
            "base_logs_dir": Path("test_logs"),
            "brand_name": "test_brand",
            "timezone": "America/Halifax",
        }
        return ScraperOrchestrator(**settings)

    @pytest.fixture
    def mock_scrapers(self):
        """Create mock scrapers."""
        wordpress = self._fake_scraper(
            fetched=[
                {'id': '1', 'title': 'Post 1'},
                {'id': '2', 'title': 'Post 2'},
            ],
            incremental=[{'id': '2', 'title': 'Post 2'}],
            markdown="# Post 2",
            filename="test_wordpress.md",
            state={'last_id': '2'},
        )
        youtube = self._fake_scraper(
            fetched=[{'id': 'vid1', 'title': 'Video 1'}],
            incremental=[{'id': 'vid1', 'title': 'Video 1'}],
            markdown="# Video 1",
            filename="test_youtube.md",
            state={'last_video_id': 'vid1'},
        )
        return [("WordPress", wordpress), ("YouTube", youtube)]

    def test_initialization(self, orchestrator):
        """Constructor arguments are exposed unchanged as attributes."""
        expected_attributes = {
            "base_data_dir": Path("test_data"),
            "base_logs_dir": Path("test_logs"),
            "brand_name": "test_brand",
            "timezone": "America/Halifax",
        }
        for attribute, expected in expected_attributes.items():
            assert getattr(orchestrator, attribute) == expected

    @patch('src.orchestrator.InstagramScraper')
    @patch('src.orchestrator.RSSScraperPodcast')
    @patch('src.orchestrator.RSSScraperMailChimp')
    @patch('src.orchestrator.WordPressScraper')
    @patch('src.orchestrator.YouTubeScraper')
    def test_initialize_scrapers(self, mock_youtube_class, mock_wordpress_class,
                                 mock_mailchimp_class, mock_podcast_class,
                                 mock_instagram_class):
        """Only scrapers with a non-empty environment setting are registered.

        The scraper classes are patched in ``src.orchestrator`` so that
        constructing the orchestrator instantiates mocks, not real scrapers.
        """
        # clear=True drops every pre-existing variable for the duration of the
        # block; the three remaining keys are present but set to empty strings.
        environment = {
            'WORDPRESS_API_URL': 'https://test.com/wp-json',
            'YOUTUBE_CHANNEL_URL': 'https://youtube.com/@test',
            'MAILCHIMP_RSS_URL': '',
            'PODCAST_RSS_URL': '',
            'INSTAGRAM_USERNAME': '',
        }
        with patch.dict('os.environ', environment, clear=True):
            orchestrator = ScraperOrchestrator()

            # Should only have WordPress and YouTube scrapers
            assert len(orchestrator.scrapers) == 2
            registered = [label for label, _ in orchestrator.scrapers]
            for wanted in ('WordPress', 'YouTube'):
                assert wanted in registered

    def test_run_scraper_success(self, orchestrator, mock_scrapers):
        """A healthy scraper yields a success result with item counts and no error."""
        orchestrator.scrapers = mock_scrapers
        wordpress_entry = mock_scrapers[0]

        outcome = orchestrator._run_scraper(wordpress_entry)

        assert (outcome['name'], outcome['status']) == ('WordPress', 'success')
        assert (outcome['items_count'], outcome['new_items']) == (2, 1)
        assert outcome['error'] is None

    def test_run_scraper_error(self, orchestrator):
        """An exception raised by the scraper is reported in the result, not propagated."""
        failing = MagicMock()
        failing.load_state.side_effect = Exception("Test error")

        outcome = orchestrator._run_scraper(("TestScraper", failing))

        assert outcome['name'] == 'TestScraper'
        assert outcome['status'] == 'error'
        assert outcome['error'] == "Test error"

    def test_run_sequential(self, orchestrator, mock_scrapers):
        """Sequential execution returns one success result per scraper, in order."""
        orchestrator.scrapers = mock_scrapers

        results = orchestrator.run_sequential()

        assert len(results) == 2
        assert results[0]['name'] == 'WordPress'
        assert results[1]['name'] == 'YouTube'
        for entry in results:
            assert entry['status'] == 'success'

    @patch('multiprocessing.Pool')
    def test_run_parallel(self, mock_pool_class, orchestrator, mock_scrapers):
        """Parallel execution creates a pool of the requested size and maps over it once."""
        # The object yielded by ``with Pool(...) as pool`` is the context
        # manager's __enter__ return value; its map() gets the canned results.
        pool = mock_pool_class.return_value.__enter__.return_value
        pool.map.return_value = [
            {'name': 'WordPress', 'status': 'success', 'items_count': 2, 'new_items': 1,
             'error': None, 'duration_seconds': 1.0},
            {'name': 'YouTube', 'status': 'success', 'items_count': 1, 'new_items': 1,
             'error': None, 'duration_seconds': 2.0},
        ]
        orchestrator.scrapers = mock_scrapers

        results = orchestrator.run_parallel(max_workers=2)

        assert len(results) == 2
        mock_pool_class.assert_called_once_with(processes=2)
        pool.map.assert_called_once()

    def test_save_statistics(self, orchestrator, tmp_path):
        """Saving statistics writes a JSON list with one aggregated entry for the run."""
        orchestrator.stats_file = tmp_path / "stats.json"

        orchestrator.save_statistics(self._mixed_results())

        # Check file was created
        assert orchestrator.stats_file.exists()

        # Load and verify stats
        stats = json.loads(orchestrator.stats_file.read_text())
        assert len(stats) == 1

        run_entry = stats[0]
        expected_totals = {
            'total_scrapers': 2,
            'successful': 1,
            'failed': 1,
            'total_items': 2,
            'new_items': 1,
        }
        for key, expected in expected_totals.items():
            assert run_entry[key] == expected

    def test_print_summary(self, orchestrator, capsys):
        """The printed summary marks each scraper's outcome and reports the totals."""
        orchestrator.print_summary(self._mixed_results())

        printed = capsys.readouterr().out
        expected_fragments = (
            "SCRAPING SUMMARY",
            "✓ WordPress:",
            "✗ YouTube:",
            "Successful: 1/2",
            "Total items: 2",
        )
        for fragment in expected_fragments:
            assert fragment in printed

    @patch('src.orchestrator.ScraperOrchestrator.run_parallel')
    @patch('src.orchestrator.ScraperOrchestrator.save_statistics')
    @patch('src.orchestrator.ScraperOrchestrator.print_summary')
    def test_run_method(self, mock_print, mock_save, mock_parallel, orchestrator):
        """``run(parallel=True)`` delegates to run_parallel, then saves and prints once each."""
        mock_parallel.return_value = [self._result('Test', 'success', 1, 1, 1.0, None)]
        orchestrator.scrapers = [("Test", MagicMock())]

        orchestrator.run(parallel=True)

        mock_parallel.assert_called_once_with(None)
        mock_save.assert_called_once()
        mock_print.assert_called_once()