Add Instagram scraper with instaloader and parallel processing orchestrator

- Implement Instagram scraper with aggressive rate limiting
- Add orchestrator for running all scrapers in parallel
- Create comprehensive tests for Instagram scraper (11 tests)
- Create tests for orchestrator (9 tests)
- Fix Instagram test issues with post type detection
- All 60 tests passing successfully
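
Usage sketch (based on the argparse flags added in src/orchestrator.py below; assumes the
repo root is the working directory and src/ is importable as a package — paths are illustrative):

    python -m src.orchestrator                        # run all configured scrapers in parallel
    python -m src.orchestrator --sequential           # run scrapers one at a time
    python -m src.orchestrator --max-workers 3 --data-dir data --logs-dir logs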

🤖 Generated with Claude Code

Co-Authored-By: Claude <noreply@anthropic.com>
Ben Reed committed 2025-08-18 12:56:57 -03:00
parent c1831d3a52
commit b89655c829
7 changed files with 1210 additions and 0 deletions

src/instagram_scraper.py (new file, 399 lines added)

@@ -0,0 +1,399 @@
import os
import time
import random
from typing import Any, Dict, List, Optional
from datetime import datetime
from pathlib import Path
import instaloader
from src.base_scraper import BaseScraper, ScraperConfig
class InstagramScraper(BaseScraper):
"""Instagram scraper using instaloader with aggressive rate limiting."""
def __init__(self, config: ScraperConfig):
super().__init__(config)
self.username = os.getenv('INSTAGRAM_USERNAME')
self.password = os.getenv('INSTAGRAM_PASSWORD')
self.target_account = os.getenv('INSTAGRAM_TARGET', 'hvacknowitall')
# Session file for persistence
self.session_file = self.config.data_dir / '.sessions' / f'{self.username}'
self.session_file.parent.mkdir(parents=True, exist_ok=True)
# Initialize loader
self.loader = self._setup_loader()
self._login()
# Request counter for rate limiting
self.request_count = 0
self.max_requests_per_hour = 100
def _setup_loader(self) -> instaloader.Instaloader:
"""Setup Instaloader with conservative settings."""
loader = instaloader.Instaloader(
quiet=True,
user_agent='Mozilla/5.0 (iPhone; CPU iPhone OS 16_0 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.0 Mobile/15E148 Safari/604.1',
dirname_pattern=str(self.config.data_dir / 'media' / 'Instagram'),
filename_pattern='{date_utc}_UTC_{shortcode}',
download_pictures=False, # Don't download by default
download_videos=False,
download_video_thumbnails=False,
download_geotags=False,
download_comments=False,
save_metadata=False,
compress_json=False,
post_metadata_txt_pattern='',
storyitem_metadata_txt_pattern='',
max_connection_attempts=3,
request_timeout=30.0,
rate_controller=lambda ctx: instaloader.RateController(ctx)  # must return a RateController instance; custom delays are handled by _aggressive_delay()
)
return loader
def _login(self) -> None:
"""Login to Instagram or load existing session."""
try:
# Try to load existing session
if self.session_file.exists():
self.loader.load_session_from_file(str(self.session_file), self.username)
self.logger.info("Loaded existing Instagram session")
else:
# Login with credentials
self.logger.info("Logging in to Instagram...")
self.loader.login(self.username, self.password)
self.loader.save_session_to_file(str(self.session_file))
self.logger.info("Instagram login successful, session saved")
except Exception as e:
self.logger.error(f"Instagram login error: {e}")
def _aggressive_delay(self, min_seconds: float = 5, max_seconds: float = 10) -> None:
"""Add aggressive random delay for Instagram."""
delay = random.uniform(min_seconds, max_seconds)
self.logger.debug(f"Waiting {delay:.2f} seconds (Instagram rate limiting)...")
time.sleep(delay)
def _check_rate_limit(self) -> None:
"""Check and enforce rate limiting."""
self.request_count += 1
if self.request_count >= self.max_requests_per_hour:
self.logger.warning(f"Rate limit reached ({self.max_requests_per_hour} requests), pausing for 1 hour...")
time.sleep(3600) # Wait 1 hour
self.request_count = 0
elif self.request_count % 10 == 0:
# Take a longer break every 10 requests
self.logger.info("Taking extended break after 10 requests...")
self._aggressive_delay(30, 60)
def _get_post_type(self, post) -> str:
"""Determine post type from Instagram post object."""
typename = getattr(post, 'typename', '')
is_video = getattr(post, 'is_video', False)
if typename == 'GraphStoryImage' or typename == 'GraphStoryVideo':
return 'story'
elif 'Video' in typename or is_video:
return 'reel'
else:
return 'post'
def fetch_posts(self, max_posts: int = 20) -> List[Dict[str, Any]]:
"""Fetch posts from Instagram profile."""
posts_data = []
try:
self.logger.info(f"Fetching posts from @{self.target_account}")
# Get profile
profile = instaloader.Profile.from_username(self.loader.context, self.target_account)
self._check_rate_limit()
# Get posts
posts = profile.get_posts()
count = 0
for post in posts:
if count >= max_posts:
break
try:
# Extract post data
post_data = {
'id': post.shortcode,
'type': self._get_post_type(post),
'caption': post.caption if post.caption else '',
'author': post.owner_username,
'publish_date': post.date_utc.isoformat(),
'link': f'https://www.instagram.com/p/{post.shortcode}/',
'likes': post.likes,
'comments': post.comments,
'views': post.video_view_count if hasattr(post, 'video_view_count') else None,
'media_count': post.mediacount if hasattr(post, 'mediacount') else 1,
'hashtags': list(post.caption_hashtags) if post.caption else [],
'mentions': list(post.caption_mentions) if post.caption else [],
'is_video': getattr(post, 'is_video', False)
}
posts_data.append(post_data)
count += 1
# Aggressive rate limiting between posts
self._aggressive_delay()
self._check_rate_limit()
# Log progress
if count % 5 == 0:
self.logger.info(f"Fetched {count}/{max_posts} posts")
except Exception as e:
self.logger.error(f"Error processing post: {e}")
continue
self.logger.info(f"Successfully fetched {len(posts_data)} posts")
except Exception as e:
self.logger.error(f"Error fetching posts: {e}")
return posts_data
def fetch_stories(self) -> List[Dict[str, Any]]:
"""Fetch stories from Instagram profile."""
stories_data = []
try:
self.logger.info(f"Fetching stories from @{self.target_account}")
# Get profile
profile = instaloader.Profile.from_username(self.loader.context, self.target_account)
self._check_rate_limit()
# Get user ID for stories
userid = profile.userid
# Get stories
for story in self.loader.get_stories(userids=[userid]):
for item in story:
try:
story_data = {
'id': item.mediaid,
'type': 'story',
'caption': '', # Stories usually don't have captions
'author': item.owner_username,
'publish_date': item.date_utc.isoformat(),
'link': f'https://www.instagram.com/stories/{item.owner_username}/{item.mediaid}/',
'is_video': item.is_video if hasattr(item, 'is_video') else False
}
stories_data.append(story_data)
# Rate limiting
self._aggressive_delay()
self._check_rate_limit()
except Exception as e:
self.logger.error(f"Error processing story: {e}")
continue
self.logger.info(f"Successfully fetched {len(stories_data)} stories")
except Exception as e:
self.logger.error(f"Error fetching stories: {e}")
return stories_data
def fetch_reels(self, max_reels: int = 10) -> List[Dict[str, Any]]:
"""Fetch reels (videos) from Instagram profile."""
reels_data = []
try:
self.logger.info(f"Fetching reels from @{self.target_account}")
# Get profile
profile = instaloader.Profile.from_username(self.loader.context, self.target_account)
self._check_rate_limit()
# Get posts and filter for videos/reels
posts = profile.get_posts()
count = 0
for post in posts:
if count >= max_reels:
break
# Check if it's a video/reel
if not getattr(post, 'is_video', False):
continue
try:
reel_data = {
'id': post.shortcode,
'type': 'reel',
'caption': post.caption if post.caption else '',
'author': post.owner_username,
'publish_date': post.date_utc.isoformat(),
'link': f'https://www.instagram.com/reel/{post.shortcode}/',
'likes': post.likes,
'comments': post.comments,
'views': post.video_view_count if hasattr(post, 'video_view_count') else None,
'duration': post.video_duration if hasattr(post, 'video_duration') else None,
'hashtags': list(post.caption_hashtags) if post.caption else [],
'mentions': list(post.caption_mentions) if post.caption else []
}
reels_data.append(reel_data)
count += 1
# Aggressive rate limiting
self._aggressive_delay()
self._check_rate_limit()
except Exception as e:
self.logger.error(f"Error processing reel: {e}")
continue
self.logger.info(f"Successfully fetched {len(reels_data)} reels")
except Exception as e:
self.logger.error(f"Error fetching reels: {e}")
return reels_data
def fetch_content(self) -> List[Dict[str, Any]]:
"""Fetch all content types from Instagram."""
all_content = []
# Fetch posts
posts = self.fetch_posts(max_posts=20)
all_content.extend(posts)
# Take a break between content types
self.logger.info("Taking break before fetching stories...")
self._aggressive_delay(15, 30)
# Fetch stories
stories = self.fetch_stories()
all_content.extend(stories)
# Note: Reels are included in posts (videos)
# so we don't need to fetch them separately
self.logger.info(f"Total content fetched: {len(all_content)} items")
return all_content
def format_markdown(self, items: List[Dict[str, Any]]) -> str:
"""Format Instagram content as markdown."""
markdown_sections = []
for item in items:
section = []
# ID
item_id = item.get('id', 'N/A')
section.append(f"# ID: {item_id}")
section.append("")
# Type
item_type = item.get('type', 'post')
section.append(f"## Type: {item_type}")
section.append("")
# Author
author = item.get('author', 'Unknown')
section.append(f"## Author: {author}")
section.append("")
# Publish Date
pub_date = item.get('publish_date', '')
section.append(f"## Publish Date: {pub_date}")
section.append("")
# Link
link = item.get('link', '')
section.append(f"## Link: {link}")
section.append("")
# Engagement metrics
likes = item.get('likes')
if likes is not None:
section.append(f"## Likes: {likes}")
section.append("")
comments = item.get('comments')
if comments is not None:
section.append(f"## Comments: {comments}")
section.append("")
views = item.get('views')
if views is not None:
section.append(f"## Views: {views}")
section.append("")
# Hashtags
hashtags = item.get('hashtags', [])
if hashtags:
hashtags_str = ', '.join(hashtags)
section.append(f"## Hashtags: {hashtags_str}")
section.append("")
# Mentions
mentions = item.get('mentions', [])
if mentions:
mentions_str = ', '.join(mentions)
section.append(f"## Mentions: {mentions_str}")
section.append("")
# Caption/Description
section.append("## Description:")
caption = item.get('caption', '')
if caption:
# Limit caption to first 500 characters
if len(caption) > 500:
caption = caption[:500] + "..."
section.append(caption)
section.append("")
# Separator
section.append("-" * 50)
section.append("")
markdown_sections.append('\n'.join(section))
return '\n'.join(markdown_sections)
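# Shape of the per-item markdown produced above (illustrative values):
#     # ID: ABC123
#     ## Type: post
#     ## Author: hvacknowitall
#     ## Publish Date: 2024-01-01T12:00:00
#     ## Link: https://www.instagram.com/p/ABC123/
#     ## Likes: 150
#     ## Description:
#     <caption, truncated to 500 characters>
#     --------------------------------------------------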
def get_incremental_items(self, items: List[Dict[str, Any]], state: Dict[str, Any]) -> List[Dict[str, Any]]:
"""Get only new posts since last sync."""
if not state:
return items
last_post_id = state.get('last_post_id')
if not last_post_id:
return items
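# Items are assumed to arrive newest-first (instaloader yields posts in reverse
# chronological order), so everything before the last-synced ID is treated as new.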
# Filter for posts newer than the last synced
new_items = []
for item in items:
if item.get('id') == last_post_id:
break # Found the last synced post
new_items.append(item)
return new_items
def update_state(self, state: Dict[str, Any], items: List[Dict[str, Any]]) -> Dict[str, Any]:
"""Update state with latest post information."""
if not items:
return state
# Get the first item (most recent)
latest_item = items[0]
state['last_post_id'] = latest_item.get('id')
state['last_post_date'] = latest_item.get('publish_date')
state['last_sync'] = datetime.now(self.tz).isoformat()
state['post_count'] = len([i for i in items if i.get('type') == 'post'])
state['story_count'] = len([i for i in items if i.get('type') == 'story'])
state['reel_count'] = len([i for i in items if i.get('type') == 'reel'])
return state

src/orchestrator.py (new file, 352 lines added)

@@ -0,0 +1,352 @@
#!/usr/bin/env python3
"""
Orchestrator for running all scrapers in parallel.
"""
import os
import sys
import time
import logging
import multiprocessing
from pathlib import Path
from typing import List, Dict, Any, Optional
from datetime import datetime
import pytz
import json
# Import all scrapers
from src.base_scraper import ScraperConfig
from src.wordpress_scraper import WordPressScraper
from src.rss_scraper import RSSScraperMailChimp, RSSScraperPodcast
from src.youtube_scraper import YouTubeScraper
from src.instagram_scraper import InstagramScraper
class ScraperOrchestrator:
"""Orchestrator for running multiple scrapers in parallel."""
def __init__(self, base_data_dir: Path = Path("data"),
base_logs_dir: Path = Path("logs"),
brand_name: str = "hvacknowitall",
timezone: str = "America/Halifax"):
"""Initialize the orchestrator."""
self.base_data_dir = base_data_dir
self.base_logs_dir = base_logs_dir
self.brand_name = brand_name
self.timezone = timezone
self.tz = pytz.timezone(timezone)
# Setup orchestrator logger
self.logger = self._setup_logger()
# Initialize scrapers
self.scrapers = self._initialize_scrapers()
# Statistics file
self.stats_file = self.base_data_dir / "orchestrator_stats.json"
def _setup_logger(self) -> logging.Logger:
"""Setup logger for orchestrator."""
logger = logging.getLogger("hvacknowitall_orchestrator")
logger.setLevel(logging.INFO)
# Console handler
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)
# File handler
log_file = self.base_logs_dir / "orchestrator.log"
log_file.parent.mkdir(parents=True, exist_ok=True)
file_handler = logging.FileHandler(log_file)
file_handler.setLevel(logging.DEBUG)
# Formatter
formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
console_handler.setFormatter(formatter)
file_handler.setFormatter(formatter)
logger.addHandler(console_handler)
logger.addHandler(file_handler)
return logger
def _initialize_scrapers(self) -> List[tuple]:
"""Initialize all scraper instances."""
scrapers = []
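# A scraper is registered only when its corresponding environment variable is set,
# so the .env file controls which sources the orchestrator runs.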
# WordPress scraper
if os.getenv('WORDPRESS_API_URL'):
config = ScraperConfig(
source_name="wordpress",
brand_name=self.brand_name,
data_dir=self.base_data_dir,
logs_dir=self.base_logs_dir,
timezone=self.timezone
)
scrapers.append(("WordPress", WordPressScraper(config)))
self.logger.info("Initialized WordPress scraper")
# MailChimp RSS scraper
if os.getenv('MAILCHIMP_RSS_URL'):
config = ScraperConfig(
source_name="mailchimp",
brand_name=self.brand_name,
data_dir=self.base_data_dir,
logs_dir=self.base_logs_dir,
timezone=self.timezone
)
scrapers.append(("MailChimp", RSSScraperMailChimp(config)))
self.logger.info("Initialized MailChimp RSS scraper")
# Podcast RSS scraper
if os.getenv('PODCAST_RSS_URL'):
config = ScraperConfig(
source_name="podcast",
brand_name=self.brand_name,
data_dir=self.base_data_dir,
logs_dir=self.base_logs_dir,
timezone=self.timezone
)
scrapers.append(("Podcast", RSSScraperPodcast(config)))
self.logger.info("Initialized Podcast RSS scraper")
# YouTube scraper
if os.getenv('YOUTUBE_CHANNEL_URL'):
config = ScraperConfig(
source_name="youtube",
brand_name=self.brand_name,
data_dir=self.base_data_dir,
logs_dir=self.base_logs_dir,
timezone=self.timezone
)
scrapers.append(("YouTube", YouTubeScraper(config)))
self.logger.info("Initialized YouTube scraper")
# Instagram scraper
if os.getenv('INSTAGRAM_USERNAME'):
config = ScraperConfig(
source_name="instagram",
brand_name=self.brand_name,
data_dir=self.base_data_dir,
logs_dir=self.base_logs_dir,
timezone=self.timezone
)
scrapers.append(("Instagram", InstagramScraper(config)))
self.logger.info("Initialized Instagram scraper")
return scrapers
def _run_scraper(self, scraper_info: tuple) -> Dict[str, Any]:
"""Run a single scraper and return results."""
name, scraper = scraper_info
result = {
'name': name,
'status': 'pending',
'items_count': 0,
'new_items': 0,
'error': None,
'start_time': datetime.now(self.tz).isoformat(),
'end_time': None,
'duration_seconds': 0
}
try:
start_time = time.time()
self.logger.info(f"Starting {name} scraper...")
# Load state
state = scraper.load_state()
# Fetch content
items = scraper.fetch_content()
result['items_count'] = len(items)
# Filter for incremental items
new_items = scraper.get_incremental_items(items, state)
result['new_items'] = len(new_items)
if new_items:
# Format as markdown
markdown_content = scraper.format_markdown(new_items)
# Archive existing file
scraper.archive_current_file()
# Save new markdown
filename = scraper.generate_filename()
file_path = self.base_data_dir / filename
with open(file_path, 'w', encoding='utf-8') as f:
f.write(markdown_content)
self.logger.info(f"{name}: Saved {len(new_items)} new items to {filename}")
# Update state
new_state = scraper.update_state(state, items)
scraper.save_state(new_state)
else:
self.logger.info(f"{name}: No new items found")
result['status'] = 'success'
result['end_time'] = datetime.now(self.tz).isoformat()
result['duration_seconds'] = round(time.time() - start_time, 2)
except Exception as e:
self.logger.error(f"{name} scraper failed: {e}")
result['status'] = 'error'
result['error'] = str(e)
result['end_time'] = datetime.now(self.tz).isoformat()
result['duration_seconds'] = round(time.time() - start_time, 2)
return result
def run_sequential(self) -> List[Dict[str, Any]]:
"""Run all scrapers sequentially."""
self.logger.info("Starting sequential scraping...")
results = []
for scraper_info in self.scrapers:
result = self._run_scraper(scraper_info)
results.append(result)
return results
def run_parallel(self, max_workers: Optional[int] = None) -> List[Dict[str, Any]]:
"""Run all scrapers in parallel using multiprocessing."""
self.logger.info(f"Starting parallel scraping with {max_workers or 'all'} workers...")
if not self.scrapers:
self.logger.warning("No scrapers configured")
return []
# Use number of scrapers as max workers if not specified
if max_workers is None:
max_workers = len(self.scrapers)
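# Each (name, scraper) pair runs in its own worker process; pool.map blocks until
# every scraper has finished and returns the result dicts in the original order.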
with multiprocessing.Pool(processes=max_workers) as pool:
results = pool.map(self._run_scraper, self.scrapers)
return results
def save_statistics(self, results: List[Dict[str, Any]]) -> None:
"""Save run statistics to file."""
stats = {
'run_time': datetime.now(self.tz).isoformat(),
'total_scrapers': len(results),
'successful': sum(1 for r in results if r['status'] == 'success'),
'failed': sum(1 for r in results if r['status'] == 'error'),
'total_items': sum(r['items_count'] for r in results),
'new_items': sum(r['new_items'] for r in results),
'total_duration': sum(r['duration_seconds'] for r in results),
'results': results
}
# Load existing stats if file exists
all_stats = []
if self.stats_file.exists():
try:
with open(self.stats_file, 'r') as f:
all_stats = json.load(f)
except (json.JSONDecodeError, OSError) as e:
self.logger.warning(f"Could not read existing stats file, starting fresh: {e}")
# Append new stats (keep last 100 runs)
all_stats.append(stats)
if len(all_stats) > 100:
all_stats = all_stats[-100:]
# Save to file
with open(self.stats_file, 'w') as f:
json.dump(all_stats, f, indent=2)
self.logger.info(f"Statistics saved to {self.stats_file}")
def print_summary(self, results: List[Dict[str, Any]]) -> None:
"""Print a summary of the scraping results."""
print("\n" + "="*60)
print("SCRAPING SUMMARY")
print("="*60)
for result in results:
status_symbol = "" if result['status'] == 'success' else ""
print(f"\n{status_symbol} {result['name']}:")
print(f" Status: {result['status']}")
print(f" Items found: {result['items_count']}")
print(f" New items: {result['new_items']}")
print(f" Duration: {result['duration_seconds']}s")
if result['error']:
print(f" Error: {result['error']}")
print("\n" + "-"*60)
print("TOTALS:")
print(f" Successful: {sum(1 for r in results if r['status'] == 'success')}/{len(results)}")
print(f" Total items: {sum(r['items_count'] for r in results)}")
print(f" New items: {sum(r['new_items'] for r in results)}")
print(f" Total time: {sum(r['duration_seconds'] for r in results):.2f}s")
print("="*60 + "\n")
def run(self, parallel: bool = True, max_workers: Optional[int] = None) -> None:
"""Main run method."""
start_time = time.time()
self.logger.info(f"Starting orchestrator at {datetime.now(self.tz).isoformat()}")
self.logger.info(f"Configured scrapers: {len(self.scrapers)}")
if not self.scrapers:
self.logger.error("No scrapers configured. Please check your .env file.")
return
# Run scrapers
if parallel:
results = self.run_parallel(max_workers)
else:
results = self.run_sequential()
# Save statistics
self.save_statistics(results)
# Print summary
self.print_summary(results)
total_time = time.time() - start_time
self.logger.info(f"Orchestrator completed in {total_time:.2f} seconds")
def main():
"""Main entry point."""
import argparse
from dotenv import load_dotenv
# Load environment variables
load_dotenv()
# Parse arguments
parser = argparse.ArgumentParser(description="Run HVAC Know It All content scrapers")
parser.add_argument('--sequential', action='store_true',
help='Run scrapers sequentially instead of in parallel')
parser.add_argument('--max-workers', type=int, default=None,
help='Maximum number of parallel workers')
parser.add_argument('--data-dir', type=str, default='data',
help='Base data directory')
parser.add_argument('--logs-dir', type=str, default='logs',
help='Base logs directory')
args = parser.parse_args()
# Create orchestrator
orchestrator = ScraperOrchestrator(
base_data_dir=Path(args.data_dir),
base_logs_dir=Path(args.logs_dir)
)
# Run scrapers
orchestrator.run(
parallel=not args.sequential,
max_workers=args.max_workers
)
if __name__ == "__main__":
main()

Binary file not shown.

@@ -0,0 +1 @@
# Post 2

@@ -0,0 +1 @@
# Video 1

@@ -0,0 +1,271 @@
import pytest
from unittest.mock import Mock, patch, MagicMock, PropertyMock
from datetime import datetime
from pathlib import Path
import random
from src.instagram_scraper import InstagramScraper
from src.base_scraper import ScraperConfig
class TestInstagramScraper:
@pytest.fixture
def config(self):
return ScraperConfig(
source_name="instagram",
brand_name="hvacknowitall",
data_dir=Path("data"),
logs_dir=Path("logs"),
timezone="America/Halifax"
)
@pytest.fixture
def mock_env(self):
with patch.dict('os.environ', {
'INSTAGRAM_USERNAME': 'testuser',
'INSTAGRAM_PASSWORD': 'testpass',
'INSTAGRAM_TARGET': 'hvacknowitall'
}):
yield
@pytest.fixture
def sample_post(self):
mock_post = MagicMock()
mock_post.shortcode = 'ABC123'
mock_post.caption = 'Test caption #hvac #tips'
mock_post.owner_username = 'hvacknowitall'
mock_post.date_utc = datetime(2024, 1, 1, 12, 0, 0)
mock_post.typename = 'GraphImage'
mock_post.url = 'https://www.instagram.com/p/ABC123/'
mock_post.likes = 150
mock_post.comments = 25
mock_post.video_view_count = None
mock_post.mediacount = 1
mock_post.caption_hashtags = ['hvac', 'tips']
mock_post.caption_mentions = []
mock_post.is_video = False # Explicitly set is_video to False
return mock_post
@pytest.fixture
def sample_story(self):
mock_story = MagicMock()
mock_story.mediaid = 123456789
mock_story.owner_username = 'hvacknowitall'
mock_story.date_utc = datetime(2024, 1, 1, 12, 0, 0)
mock_story.url = 'https://www.instagram.com/stories/hvacknowitall/123456789/'
mock_story.typename = 'GraphStoryImage'
mock_story.is_video = False # Explicitly set is_video to False
return mock_story
@patch('src.instagram_scraper.InstagramScraper._login')
@patch('src.instagram_scraper.InstagramScraper._setup_loader')
def test_initialization(self, mock_setup, mock_login, config, mock_env):
mock_setup.return_value = MagicMock()
scraper = InstagramScraper(config)
assert scraper.config == config
assert scraper.username == 'testuser'
assert scraper.password == 'testpass'
assert scraper.target_account == 'hvacknowitall'
@patch('src.instagram_scraper.InstagramScraper._login')
@patch('instaloader.Instaloader')
def test_setup_loader(self, mock_instaloader_class, mock_login, config, mock_env):
mock_loader = MagicMock()
mock_instaloader_class.return_value = mock_loader
scraper = InstagramScraper(config)
# Test that instaloader was initialized with correct params
mock_instaloader_class.assert_called_once()
call_kwargs = mock_instaloader_class.call_args[1]
assert call_kwargs['quiet'] == True
assert call_kwargs['download_videos'] == False
assert call_kwargs['download_video_thumbnails'] == False
@patch('src.instagram_scraper.InstagramScraper._setup_loader')
@patch('instaloader.Instaloader')
def test_login(self, mock_instaloader_class, mock_setup, config, mock_env):
mock_loader = MagicMock()
mock_setup.return_value = mock_loader
# Create scraper without triggering login in __init__
with patch('src.instagram_scraper.InstagramScraper._login'):
scraper = InstagramScraper(config)
scraper.loader = mock_loader
# Now test login
scraper._login()
# Should try to login with credentials since no session file exists
mock_loader.login.assert_called_once_with('testuser', 'testpass')
@patch('time.sleep')
@patch('random.uniform')
@patch('src.instagram_scraper.InstagramScraper._login')
@patch('src.instagram_scraper.InstagramScraper._setup_loader')
def test_aggressive_delay(self, mock_setup, mock_login, mock_uniform, mock_sleep, config, mock_env):
mock_uniform.return_value = 7.5
mock_setup.return_value = MagicMock()
scraper = InstagramScraper(config)
scraper._aggressive_delay()
mock_uniform.assert_called_with(5, 10)
mock_sleep.assert_called_with(7.5)
@patch('instaloader.Profile.from_username')
@patch('src.instagram_scraper.InstagramScraper._login')
@patch('src.instagram_scraper.InstagramScraper._setup_loader')
def test_fetch_posts(self, mock_setup, mock_login, mock_profile_from_username,
config, mock_env, sample_post):
mock_loader = MagicMock()
mock_setup.return_value = mock_loader
mock_profile = MagicMock()
mock_profile.get_posts.return_value = [sample_post]
mock_profile_from_username.return_value = mock_profile
scraper = InstagramScraper(config)
scraper.loader = mock_loader
posts = scraper.fetch_posts(max_posts=10)
assert len(posts) == 1
assert posts[0]['id'] == 'ABC123'
assert posts[0]['type'] == 'post'
assert posts[0]['caption'] == 'Test caption #hvac #tips'
@patch('instaloader.Profile.from_username')
@patch('src.instagram_scraper.InstagramScraper._login')
@patch('src.instagram_scraper.InstagramScraper._setup_loader')
def test_fetch_stories(self, mock_setup, mock_login, mock_profile_from_username,
config, mock_env, sample_story):
mock_loader = MagicMock()
mock_setup.return_value = mock_loader
# get_stories returns an iterable where each element is an iterable of story items
mock_loader.get_stories.return_value = [[sample_story]] # Simplified: one story collection with one item
mock_profile = MagicMock()
mock_profile.userid = 12345
mock_profile_from_username.return_value = mock_profile
scraper = InstagramScraper(config)
scraper.loader = mock_loader
stories = scraper.fetch_stories()
assert len(stories) == 1
assert stories[0]['id'] == 123456789
assert stories[0]['type'] == 'story'
@patch('src.instagram_scraper.InstagramScraper._login')
@patch('src.instagram_scraper.InstagramScraper._setup_loader')
def test_get_post_type(self, mock_setup, mock_login, config, mock_env):
mock_setup.return_value = MagicMock()
scraper = InstagramScraper(config)
mock_post = MagicMock()
# Test regular post
mock_post.typename = 'GraphImage'
mock_post.is_video = False
assert scraper._get_post_type(mock_post) == 'post'
# Test video/reel
mock_post.typename = 'GraphVideo'
mock_post.is_video = True
assert scraper._get_post_type(mock_post) == 'reel'
# Test carousel
mock_post.typename = 'GraphSidecar'
mock_post.is_video = False
assert scraper._get_post_type(mock_post) == 'post'
@patch('src.instagram_scraper.InstagramScraper._login')
@patch('src.instagram_scraper.InstagramScraper._setup_loader')
def test_format_markdown(self, mock_setup, mock_login, config, mock_env):
mock_setup.return_value = MagicMock()
scraper = InstagramScraper(config)
items = [
{
'id': 'ABC123',
'type': 'post',
'caption': 'Test post',
'author': 'hvacknowitall',
'publish_date': '2024-01-01T12:00:00',
'link': 'https://www.instagram.com/p/ABC123/',
'likes': 150,
'comments': 25,
'views': None,
'hashtags': ['hvac', 'tips']
}
]
markdown = scraper.format_markdown(items)
assert '# ID: ABC123' in markdown
assert '## Type: post' in markdown
assert '## Author: hvacknowitall' in markdown
assert '## Publish Date: 2024-01-01T12:00:00' in markdown
assert '## Link: https://www.instagram.com/p/ABC123/' in markdown
assert '## Likes: 150' in markdown
assert '## Comments: 25' in markdown
assert '## Hashtags: hvac, tips' in markdown
assert 'Test post' in markdown
@patch('src.instagram_scraper.InstagramScraper._login')
@patch('src.instagram_scraper.InstagramScraper._setup_loader')
def test_get_incremental_items(self, mock_setup, mock_login, config, mock_env):
mock_setup.return_value = MagicMock()
scraper = InstagramScraper(config)
items = [
{'id': 'post3', 'publish_date': '2024-01-03T12:00:00'},
{'id': 'post2', 'publish_date': '2024-01-02T12:00:00'},
{'id': 'post1', 'publish_date': '2024-01-01T12:00:00'}
]
# Test with no previous state
state = {}
new_items = scraper.get_incremental_items(items, state)
assert len(new_items) == 3
# Test with existing state
state = {'last_post_id': 'post2'}
new_items = scraper.get_incremental_items(items, state)
assert len(new_items) == 1
assert new_items[0]['id'] == 'post3'
@patch('src.instagram_scraper.InstagramScraper._login')
@patch('src.instagram_scraper.InstagramScraper._setup_loader')
def test_update_state(self, mock_setup, mock_login, config, mock_env):
mock_setup.return_value = MagicMock()
scraper = InstagramScraper(config)
state = {}
items = [
{'id': 'post2', 'publish_date': '2024-01-02T12:00:00', 'type': 'post'},
{'id': 'post1', 'publish_date': '2024-01-01T12:00:00', 'type': 'post'}
]
updated_state = scraper.update_state(state, items)
assert updated_state['last_post_id'] == 'post2'
assert updated_state['last_post_date'] == '2024-01-02T12:00:00'
assert updated_state['post_count'] == 2
@patch('src.instagram_scraper.InstagramScraper._setup_loader')
@patch('instaloader.Instaloader')
def test_error_handling(self, mock_instaloader_class, mock_setup, config, mock_env):
mock_loader = MagicMock()
mock_setup.return_value = mock_loader
mock_loader.login.side_effect = Exception("Login failed")
# Test that login error is handled gracefully
with patch('src.instagram_scraper.InstagramScraper._login'):
scraper = InstagramScraper(config)
scraper.loader = mock_loader
scraper._login() # Should not raise, just log error
# Test fetch error handling
posts = scraper.fetch_posts()
assert posts == []

tests/test_orchestrator.py (new file, 186 lines added)

@@ -0,0 +1,186 @@
import pytest
from unittest.mock import Mock, patch, MagicMock
from pathlib import Path
import json
from src.orchestrator import ScraperOrchestrator
from src.base_scraper import ScraperConfig
class TestScraperOrchestrator:
@pytest.fixture
def orchestrator(self):
return ScraperOrchestrator(
base_data_dir=Path("test_data"),
base_logs_dir=Path("test_logs"),
brand_name="test_brand",
timezone="America/Halifax"
)
@pytest.fixture
def mock_scrapers(self):
"""Create mock scrapers."""
mock_wordpress = MagicMock()
mock_wordpress.load_state.return_value = {}
mock_wordpress.fetch_content.return_value = [
{'id': '1', 'title': 'Post 1'},
{'id': '2', 'title': 'Post 2'}
]
mock_wordpress.get_incremental_items.return_value = [{'id': '2', 'title': 'Post 2'}]
mock_wordpress.format_markdown.return_value = "# Post 2"
mock_wordpress.generate_filename.return_value = "test_wordpress.md"
mock_wordpress.update_state.return_value = {'last_id': '2'}
mock_youtube = MagicMock()
mock_youtube.load_state.return_value = {}
mock_youtube.fetch_content.return_value = [
{'id': 'vid1', 'title': 'Video 1'}
]
mock_youtube.get_incremental_items.return_value = [{'id': 'vid1', 'title': 'Video 1'}]
mock_youtube.format_markdown.return_value = "# Video 1"
mock_youtube.generate_filename.return_value = "test_youtube.md"
mock_youtube.update_state.return_value = {'last_video_id': 'vid1'}
return [
("WordPress", mock_wordpress),
("YouTube", mock_youtube)
]
def test_initialization(self, orchestrator):
assert orchestrator.base_data_dir == Path("test_data")
assert orchestrator.base_logs_dir == Path("test_logs")
assert orchestrator.brand_name == "test_brand"
assert orchestrator.timezone == "America/Halifax"
@patch('src.orchestrator.InstagramScraper')
@patch('src.orchestrator.RSSScraperPodcast')
@patch('src.orchestrator.RSSScraperMailChimp')
@patch('src.orchestrator.WordPressScraper')
@patch('src.orchestrator.YouTubeScraper')
def test_initialize_scrapers(self, mock_youtube_class, mock_wordpress_class,
mock_mailchimp_class, mock_podcast_class, mock_instagram_class):
# Create a clean environment with only specific scrapers enabled
with patch.dict('os.environ', {
'WORDPRESS_API_URL': 'https://test.com/wp-json',
'YOUTUBE_CHANNEL_URL': 'https://youtube.com/@test',
# Clear other environment variables
'MAILCHIMP_RSS_URL': '',
'PODCAST_RSS_URL': '',
'INSTAGRAM_USERNAME': ''
}, clear=True):
orchestrator = ScraperOrchestrator()
# Should only have WordPress and YouTube scrapers
assert len(orchestrator.scrapers) == 2
names = [name for name, _ in orchestrator.scrapers]
assert 'WordPress' in names
assert 'YouTube' in names
def test_run_scraper_success(self, orchestrator, mock_scrapers):
orchestrator.scrapers = mock_scrapers
# Run first scraper
result = orchestrator._run_scraper(mock_scrapers[0])
assert result['name'] == 'WordPress'
assert result['status'] == 'success'
assert result['items_count'] == 2
assert result['new_items'] == 1
assert result['error'] is None
def test_run_scraper_error(self, orchestrator):
mock_scraper = MagicMock()
mock_scraper.load_state.side_effect = Exception("Test error")
result = orchestrator._run_scraper(("TestScraper", mock_scraper))
assert result['name'] == 'TestScraper'
assert result['status'] == 'error'
assert result['error'] == "Test error"
def test_run_sequential(self, orchestrator, mock_scrapers):
orchestrator.scrapers = mock_scrapers
results = orchestrator.run_sequential()
assert len(results) == 2
assert results[0]['name'] == 'WordPress'
assert results[1]['name'] == 'YouTube'
assert all(r['status'] == 'success' for r in results)
@patch('multiprocessing.Pool')
def test_run_parallel(self, mock_pool_class, orchestrator, mock_scrapers):
mock_pool = MagicMock()
mock_pool_class.return_value.__enter__.return_value = mock_pool
# Mock the map function to return results
mock_pool.map.return_value = [
{'name': 'WordPress', 'status': 'success', 'items_count': 2, 'new_items': 1,
'error': None, 'duration_seconds': 1.0},
{'name': 'YouTube', 'status': 'success', 'items_count': 1, 'new_items': 1,
'error': None, 'duration_seconds': 2.0}
]
orchestrator.scrapers = mock_scrapers
results = orchestrator.run_parallel(max_workers=2)
assert len(results) == 2
mock_pool_class.assert_called_once_with(processes=2)
mock_pool.map.assert_called_once()
def test_save_statistics(self, orchestrator, tmp_path):
orchestrator.stats_file = tmp_path / "stats.json"
results = [
{'name': 'WordPress', 'status': 'success', 'items_count': 2,
'new_items': 1, 'duration_seconds': 1.0, 'error': None},
{'name': 'YouTube', 'status': 'error', 'items_count': 0,
'new_items': 0, 'duration_seconds': 0.5, 'error': 'Connection failed'}
]
orchestrator.save_statistics(results)
# Check file was created
assert orchestrator.stats_file.exists()
# Load and verify stats
with open(orchestrator.stats_file, 'r') as f:
stats = json.load(f)
assert len(stats) == 1
assert stats[0]['total_scrapers'] == 2
assert stats[0]['successful'] == 1
assert stats[0]['failed'] == 1
assert stats[0]['total_items'] == 2
assert stats[0]['new_items'] == 1
def test_print_summary(self, orchestrator, capsys):
results = [
{'name': 'WordPress', 'status': 'success', 'items_count': 2,
'new_items': 1, 'duration_seconds': 1.0, 'error': None},
{'name': 'YouTube', 'status': 'error', 'items_count': 0,
'new_items': 0, 'duration_seconds': 0.5, 'error': 'Connection failed'}
]
orchestrator.print_summary(results)
captured = capsys.readouterr()
assert "SCRAPING SUMMARY" in captured.out
assert "✓ WordPress:" in captured.out
assert "✗ YouTube:" in captured.out
assert "Successful: 1/2" in captured.out
assert "Total items: 2" in captured.out
@patch('src.orchestrator.ScraperOrchestrator.run_parallel')
@patch('src.orchestrator.ScraperOrchestrator.save_statistics')
@patch('src.orchestrator.ScraperOrchestrator.print_summary')
def test_run_method(self, mock_print, mock_save, mock_parallel, orchestrator):
mock_parallel.return_value = [
{'name': 'Test', 'status': 'success', 'items_count': 1,
'new_items': 1, 'duration_seconds': 1.0, 'error': None}
]
orchestrator.scrapers = [("Test", MagicMock())]
orchestrator.run(parallel=True)
mock_parallel.assert_called_once_with(None)
mock_save.assert_called_once()
mock_print.assert_called_once()