hvac-kia-content/tests/test_orchestrator_integration.py
Ben Reed 8d5750b1d1 Add comprehensive test infrastructure
- Created unit tests for BaseScraper with mocking
- Added integration tests for parallel processing
- Created end-to-end tests with realistic mock data
- Fixed initialization order in BaseScraper (logger before user agent)
- Fixed orchestrator method name (archive_current_file)
- Added tenacity dependency for retry logic (see the sketch after this list)
- Validated parallel processing performance and overlap detection
- Confirmed spec-compliant markdown formatting in tests
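
For context, a minimal sketch of the kind of tenacity-based retry wrapper these tests assume `BaseScraper` applies to its network calls; the decorator parameters and the `fetch_with_retry` helper are illustrative assumptions, not the actual implementation:

```python
from tenacity import retry, stop_after_attempt, wait_exponential
import urllib.request

# Hypothetical helper: retries up to 3 times with exponential backoff,
# mirroring the retry behavior the test suite exercises on BaseScraper.
@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, max=10))
def fetch_with_retry(url: str) -> bytes:
    with urllib.request.urlopen(url, timeout=10) as response:
        return response.read()
```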

Tests cover:
- Base scraper functionality (state, markdown, retry logic, media downloads)
- Parallel vs sequential execution timing (see the executor sketch below)
- Error isolation between scrapers
- Directory structure creation
- State management across runs
- Full workflow with realistic data
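
The parallel path these tests exercise maps scrapers onto a thread pool. A minimal sketch of that pattern, assuming `run_all_scrapers` is built on `ThreadPoolExecutor` (the real orchestrator may differ):

```python
import logging
from concurrent.futures import ThreadPoolExecutor, as_completed

def run_all_scrapers(scrapers: dict, max_workers: int = 3) -> None:
    # Submit each scraper's run() and resolve each future individually,
    # so one failing scraper cannot take the others down (the behavior
    # test_error_isolation asserts).
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = {pool.submit(s.run): name for name, s in scrapers.items()}
        for future in as_completed(futures):
            name = futures[future]
            try:
                future.result()
            except Exception as exc:
                logging.getLogger("orchestrator").error("%s failed: %s", name, exc)
```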

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-08-18 21:16:14 -03:00

#!/usr/bin/env python3
"""
Integration tests for ContentOrchestrator parallel processing
"""
import pytest
import json
import time
import tempfile
from pathlib import Path
from unittest.mock import patch

# Add the project root to the path so the `src` package imports resolve
import sys
sys.path.insert(0, str(Path(__file__).parent.parent))

from src.orchestrator import ContentOrchestrator
from src.base_scraper import BaseScraper, ScraperConfig


class MockScraper(BaseScraper):
    """Mock scraper for testing parallel processing"""

    def __init__(self, config: ScraperConfig, delay: float = 0.1, fail: bool = False):
        super().__init__(config)
        self.delay = delay
        self.fail = fail
        self.run_called = False
        self.run_start_time = None
        self.run_end_time = None

    def fetch_content(self):
        return [
            {
                'id': f'{self.config.source_name}_1',
                'title': f'Test {self.config.source_name} Title',
                'url': f'https://example.com/{self.config.source_name}/1',
                'description': f'Test description for {self.config.source_name}',
                'likes': 10,
                'comments': 5
            }
        ]

    def get_incremental_items(self, items, state):
        # Return all items for testing
        return items

    def update_state(self, state, items):
        state['last_id'] = items[-1]['id'] if items else None
        return state

    def run(self):
        self.run_called = True
        self.run_start_time = time.time()
        try:
            if self.fail:
                self.logger.error(f"Mock failure for {self.config.source_name}")
                raise Exception(f"Mock failure for {self.config.source_name}")
            # Simulate processing time
            time.sleep(self.delay)
            # Call the parent run method for the actual processing
            super().run()
        finally:
            # Record the end time even on failure so overlap checks always work
            self.run_end_time = time.time()


class TestOrchestratorIntegration:
    """Integration tests for ContentOrchestrator"""

    @pytest.fixture
    def temp_config(self):
        """Create a temporary config for testing"""
        with tempfile.TemporaryDirectory() as temp_dir:
            temp_path = Path(temp_dir)
            config = ScraperConfig(
                source_name="integration_test",
                brand_name="testbrand",
                data_dir=temp_path / "data",
                logs_dir=temp_path / "logs",
                timezone="America/Halifax"
            )
            yield config

    @pytest.fixture
    def orchestrator(self, temp_config):
        """Create an orchestrator with mock scrapers of varying speed"""
        orchestrator = ContentOrchestrator(temp_config.data_dir, temp_config.logs_dir)
        # Replace the real scrapers with mock scrapers
        orchestrator.scrapers = {
            name: MockScraper(
                ScraperConfig(
                    source_name=name,
                    brand_name=temp_config.brand_name,
                    data_dir=temp_config.data_dir,
                    logs_dir=temp_config.logs_dir,
                    timezone=temp_config.timezone
                ),
                delay=delay
            )
            for name, delay in [
                ('fast_scraper', 0.1),
                ('slow_scraper', 0.3),
                ('medium_scraper', 0.2),
            ]
        }
        return orchestrator

    def test_parallel_execution_timing(self, orchestrator):
        """Test that parallel execution is faster than sequential"""
        scrapers = orchestrator.scrapers

        # Time parallel execution
        start_time = time.time()
        orchestrator.run_all_scrapers(parallel=True, max_workers=3)
        parallel_time = time.time() - start_time

        # Reset the scrapers for the sequential run
        for scraper in scrapers.values():
            scraper.run_called = False
            scraper.run_start_time = None
            scraper.run_end_time = None

        # Time sequential execution
        start_time = time.time()
        orchestrator.run_all_scrapers(parallel=False)
        sequential_time = time.time() - start_time

        # Parallel should be significantly faster
        assert parallel_time < sequential_time
        assert parallel_time < 0.5  # well under the total of the mock delays
        assert sequential_time > 0.6  # close to the sum of delays (0.1 + 0.2 + 0.3)

        # Verify all scrapers ran
        for scraper in scrapers.values():
            assert scraper.run_called

    def test_parallel_scraper_overlap(self, orchestrator):
        """Test that scrapers actually run in parallel (overlapping execution)"""
        orchestrator.run_all_scrapers(parallel=True, max_workers=3)
        scrapers = list(orchestrator.scrapers.values())

        # Check that at least two scrapers had overlapping execution:
        # intervals [start_a, end_a] and [start_b, end_b] overlap exactly
        # when start_a < end_b and start_b < end_a
        overlaps = 0
        for i in range(len(scrapers)):
            for j in range(i + 1, len(scrapers)):
                scraper_a = scrapers[i]
                scraper_b = scrapers[j]
                if (scraper_a.run_start_time < scraper_b.run_end_time and
                        scraper_b.run_start_time < scraper_a.run_end_time):
                    overlaps += 1
        assert overlaps > 0, "No overlapping execution detected - scrapers may not be running in parallel"

    def test_error_isolation(self, orchestrator):
        """Test that one failing scraper doesn't crash the others"""
        # Make one scraper fail
        orchestrator.scrapers['slow_scraper'].fail = True

        # Run parallel processing
        orchestrator.run_all_scrapers(parallel=True, max_workers=3)

        # Check that the other scrapers still completed successfully
        assert orchestrator.scrapers['fast_scraper'].run_called
        assert orchestrator.scrapers['medium_scraper'].run_called
        assert orchestrator.scrapers['slow_scraper'].run_called  # it ran but failed

        # Check that only the successful scrapers produced output
        current_dir = orchestrator.config.data_dir / "markdown_current"
        fast_files = list(current_dir.glob("*fast_scraper*"))
        medium_files = list(current_dir.glob("*medium_scraper*"))
        slow_files = list(current_dir.glob("*slow_scraper*"))
        assert len(fast_files) > 0, "Fast scraper should have produced output"
        assert len(medium_files) > 0, "Medium scraper should have produced output"
        assert len(slow_files) == 0, "Slow scraper should not have produced output due to failure"

    def test_worker_pool_limits(self, orchestrator):
        """Test that the max_workers parameter is respected"""
        # Add more scrapers than workers
        for i in range(5):
            scraper_name = f'extra_scraper_{i}'
            orchestrator.scrapers[scraper_name] = MockScraper(
                ScraperConfig(
                    source_name=scraper_name,
                    brand_name=orchestrator.config.brand_name,
                    data_dir=orchestrator.config.data_dir,
                    logs_dir=orchestrator.config.logs_dir,
                    timezone=orchestrator.config.timezone
                ),
                delay=0.1
            )

        # Run with a limited worker pool
        start_time = time.time()
        orchestrator.run_all_scrapers(parallel=True, max_workers=2)
        execution_time = time.time() - start_time

        # With 8 scrapers and 2 workers this should take longer than with
        # unlimited workers, but still benefit from parallelism
        assert execution_time > 0.3  # should take multiple batches
        assert execution_time < 1.0  # but still faster than fully sequential

        # Verify all scrapers ran
        for scraper in orchestrator.scrapers.values():
            assert scraper.run_called

    def test_markdown_output_format(self, orchestrator):
        """Test that parallel processing produces spec-compliant markdown"""
        orchestrator.run_all_scrapers(parallel=True, max_workers=3)

        # Check that the markdown files were created
        markdown_files = list((orchestrator.config.data_dir / "markdown_current").glob("*.md"))
        assert len(markdown_files) == 3  # one per successful scraper

        # Verify the spec-compliant format of each file
        for file_path in markdown_files:
            content = file_path.read_text(encoding='utf-8')
            assert "# ID:" in content
            assert "## Title:" in content
            assert "## Type:" in content
            assert "## Permalink:" in content
            assert "## Description:" in content
            assert "## Metadata:" in content
            assert "### Comments:" in content
            assert "### Likes:" in content
            assert "### Tags:" in content

    def test_state_management_isolation(self, orchestrator):
        """Test that state management works correctly in parallel"""
        # Run the scrapers once so each one writes its own state file
        orchestrator.run_all_scrapers(parallel=True, max_workers=3)

        # Check that the state files were created
        state_files = list((orchestrator.config.data_dir / ".state").glob("*_state.json"))
        assert len(state_files) == 3

        # Verify the state content
        for state_file in state_files:
            state_data = json.loads(state_file.read_text())
            assert 'last_update' in state_data
            assert 'last_item_count' in state_data
            assert state_data['last_item_count'] == 1  # mock scrapers return one item

    def test_directory_structure_creation(self, orchestrator):
        """Test that parallel processing creates the proper directory structure"""
        orchestrator.run_all_scrapers(parallel=True, max_workers=3)
        base_data_dir = orchestrator.config.data_dir

        # Check that all required directories exist
        assert (base_data_dir / "markdown_current").exists()
        assert (base_data_dir / ".state").exists()

        # Check the source-specific directories
        for source_name in orchestrator.scrapers.keys():
            source_title = source_name.title()
            assert (base_data_dir / "markdown_archives" / source_title).exists()
            assert (base_data_dir / "media" / source_title).exists()
            assert (orchestrator.config.logs_dir / source_title).exists()

    @patch('src.orchestrator.ContentOrchestrator.sync_to_nas')
    def test_nas_sync_called(self, mock_sync, orchestrator):
        """Test that NAS sync is called after parallel processing"""
        orchestrator.run_all_scrapers(parallel=True, max_workers=3)

        # Verify the sync was called exactly once
        mock_sync.assert_called_once()

    def test_logging_isolation(self, orchestrator):
        """Test that logging works correctly during parallel processing"""
        orchestrator.run_all_scrapers(parallel=True, max_workers=3)

        # Check that a log file was created for each scraper
        log_files = []
        for source_name in orchestrator.scrapers.keys():
            source_title = source_name.title()
            log_file = orchestrator.config.logs_dir / source_title / f"{source_name}.log"
            if log_file.exists():
                log_files.append(log_file)
        assert len(log_files) == 3

        # Verify the log content
        for log_file in log_files:
            log_content = log_file.read_text()
            assert "Starting" in log_content
            assert "Successfully processed" in log_content


if __name__ == '__main__':
    pytest.main([__file__, '-v'])