hvac-kia-content/tests/test_orchestrator.py
Ben Reed b89655c829 Add Instagram scraper with instaloader and parallel processing orchestrator
- Implement Instagram scraper with aggressive rate limiting
- Add orchestrator for running all scrapers in parallel
- Create comprehensive tests for Instagram scraper (11 tests)
- Create tests for orchestrator (9 tests)
- Fix Instagram test issues with post type detection
- All 60 tests passing successfully

🤖 Generated with Claude Code

Co-Authored-By: Claude <noreply@anthropic.com>
2025-08-18 12:56:57 -03:00

186 lines
No EOL
7.5 KiB
Python

import pytest
from unittest.mock import Mock, patch, MagicMock
from pathlib import Path
import json
from src.orchestrator import ScraperOrchestrator
from src.base_scraper import ScraperConfig
class TestScraperOrchestrator:
    """Unit tests for ``ScraperOrchestrator`` driven by mocked scrapers."""

    @staticmethod
    def _mixed_results():
        """Return a fresh result list: one successful scraper, one failed."""
        return [
            {'name': 'WordPress', 'status': 'success', 'items_count': 2,
             'new_items': 1, 'duration_seconds': 1.0, 'error': None},
            {'name': 'YouTube', 'status': 'error', 'items_count': 0,
             'new_items': 0, 'duration_seconds': 0.5, 'error': 'Connection failed'},
        ]

    @pytest.fixture
    def orchestrator(self):
        """Provide an orchestrator built with explicit test settings."""
        settings = {
            'base_data_dir': Path("test_data"),
            'base_logs_dir': Path("test_logs"),
            'brand_name': "test_brand",
            'timezone': "America/Halifax",
        }
        return ScraperOrchestrator(**settings)

    @pytest.fixture
    def mock_scrapers(self):
        """Create mock scrapers."""

        def build(fetched, fresh, markdown, filename, state):
            # Every scraper mock starts from an empty saved state; the other
            # canned return values are supplied per scraper.
            scraper = MagicMock()
            scraper.configure_mock(**{
                'load_state.return_value': {},
                'fetch_content.return_value': fetched,
                'get_incremental_items.return_value': fresh,
                'format_markdown.return_value': markdown,
                'generate_filename.return_value': filename,
                'update_state.return_value': state,
            })
            return scraper

        wordpress = build(
            fetched=[
                {'id': '1', 'title': 'Post 1'},
                {'id': '2', 'title': 'Post 2'},
            ],
            fresh=[{'id': '2', 'title': 'Post 2'}],
            markdown="# Post 2",
            filename="test_wordpress.md",
            state={'last_id': '2'},
        )
        youtube = build(
            fetched=[{'id': 'vid1', 'title': 'Video 1'}],
            fresh=[{'id': 'vid1', 'title': 'Video 1'}],
            markdown="# Video 1",
            filename="test_youtube.md",
            state={'last_video_id': 'vid1'},
        )
        return [("WordPress", wordpress), ("YouTube", youtube)]

    def test_initialization(self, orchestrator):
        """Constructor arguments are exposed as same-named attributes."""
        expected_attributes = {
            'base_data_dir': Path("test_data"),
            'base_logs_dir': Path("test_logs"),
            'brand_name': "test_brand",
            'timezone': "America/Halifax",
        }
        for attribute, expected in expected_attributes.items():
            assert getattr(orchestrator, attribute) == expected

    @patch('src.orchestrator.InstagramScraper')
    @patch('src.orchestrator.RSSScraperPodcast')
    @patch('src.orchestrator.RSSScraperMailChimp')
    @patch('src.orchestrator.WordPressScraper')
    @patch('src.orchestrator.YouTubeScraper')
    def test_initialize_scrapers(self, mock_youtube_class, mock_wordpress_class,
                                 mock_mailchimp_class, mock_podcast_class,
                                 mock_instagram_class):
        """Only the WordPress and YouTube scrapers are registered for this environment."""
        # Create a clean environment with only specific scrapers enabled
        environment = {
            'WORDPRESS_API_URL': 'https://test.com/wp-json',
            'YOUTUBE_CHANNEL_URL': 'https://youtube.com/@test',
            # Clear other environment variables
            'MAILCHIMP_RSS_URL': '',
            'PODCAST_RSS_URL': '',
            'INSTAGRAM_USERNAME': '',
        }
        with patch.dict('os.environ', environment, clear=True):
            orchestrator = ScraperOrchestrator()

            # Should only have WordPress and YouTube scrapers
            assert len(orchestrator.scrapers) == 2
            registered = [label for label, _scraper in orchestrator.scrapers]
            assert 'WordPress' in registered
            assert 'YouTube' in registered

    def test_run_scraper_success(self, orchestrator, mock_scrapers):
        """A scraper that runs cleanly yields a success result with item counts."""
        orchestrator.scrapers = mock_scrapers

        # Run first scraper
        wordpress_entry = mock_scrapers[0]
        outcome = orchestrator._run_scraper(wordpress_entry)

        expected_fields = {
            'name': 'WordPress',
            'status': 'success',
            'items_count': 2,
            'new_items': 1,
        }
        for key, expected in expected_fields.items():
            assert outcome[key] == expected
        assert outcome['error'] is None

    def test_run_scraper_error(self, orchestrator):
        """An exception raised by the scraper is reported in the result dict."""
        failing_scraper = MagicMock()
        failing_scraper.load_state.side_effect = Exception("Test error")

        outcome = orchestrator._run_scraper(("TestScraper", failing_scraper))

        assert outcome['name'] == 'TestScraper'
        assert outcome['status'] == 'error'
        assert outcome['error'] == "Test error"

    def test_run_sequential(self, orchestrator, mock_scrapers):
        """Sequential runs return one successful result per scraper, in order."""
        orchestrator.scrapers = mock_scrapers

        results = orchestrator.run_sequential()

        assert len(results) == 2
        first, second = results[0], results[1]
        assert first['name'] == 'WordPress'
        assert second['name'] == 'YouTube'
        assert not any(entry['status'] != 'success' for entry in results)

    @patch('multiprocessing.Pool')
    def test_run_parallel(self, mock_pool_class, orchestrator, mock_scrapers):
        """Parallel runs open a pool sized by ``max_workers`` and map over it once."""
        # The pool is used as a context manager, so configure the object
        # that ``__enter__`` hands back.
        pool = mock_pool_class.return_value.__enter__.return_value

        # Mock the map function to return results
        pool.map.return_value = [
            {'name': 'WordPress', 'status': 'success', 'items_count': 2, 'new_items': 1,
             'error': None, 'duration_seconds': 1.0},
            {'name': 'YouTube', 'status': 'success', 'items_count': 1, 'new_items': 1,
             'error': None, 'duration_seconds': 2.0},
        ]
        orchestrator.scrapers = mock_scrapers

        results = orchestrator.run_parallel(max_workers=2)

        assert len(results) == 2
        mock_pool_class.assert_called_once_with(processes=2)
        pool.map.assert_called_once()

    def test_save_statistics(self, orchestrator, tmp_path):
        """Saving statistics writes a JSON list holding one aggregated entry."""
        orchestrator.stats_file = tmp_path / "stats.json"

        orchestrator.save_statistics(self._mixed_results())

        # Check file was created
        assert orchestrator.stats_file.exists()

        # Load and verify stats
        stats = json.loads(orchestrator.stats_file.read_text())
        assert len(stats) == 1
        entry = stats[0]
        assert entry['total_scrapers'] == 2
        assert entry['successful'] == 1
        assert entry['failed'] == 1
        assert entry['total_items'] == 2
        assert entry['new_items'] == 1

    def test_print_summary(self, orchestrator, capsys):
        """The printed summary lists each scraper's outcome and the totals."""
        orchestrator.print_summary(self._mixed_results())

        printed = capsys.readouterr().out
        expected_fragments = (
            "SCRAPING SUMMARY",
            "✓ WordPress:",
            "✗ YouTube:",
            "Successful: 1/2",
            "Total items: 2",
        )
        for fragment in expected_fragments:
            assert fragment in printed

    @patch('src.orchestrator.ScraperOrchestrator.run_parallel')
    @patch('src.orchestrator.ScraperOrchestrator.save_statistics')
    @patch('src.orchestrator.ScraperOrchestrator.print_summary')
    def test_run_method(self, mock_print, mock_save, mock_parallel, orchestrator):
        """``run(parallel=True)`` delegates to run_parallel, then saves and prints."""
        mock_parallel.return_value = [
            {'name': 'Test', 'status': 'success', 'items_count': 1,
             'new_items': 1, 'duration_seconds': 1.0, 'error': None},
        ]
        orchestrator.scrapers = [("Test", MagicMock())]

        orchestrator.run(parallel=True)

        mock_parallel.assert_called_once_with(None)
        mock_save.assert_called_once()
        mock_print.assert_called_once()