diff --git a/src/instagram_scraper.py b/src/instagram_scraper.py
new file mode 100644
index 0000000..6dc268f
--- /dev/null
+++ b/src/instagram_scraper.py
@@ -0,0 +1,399 @@
+import os
+import time
+import random
+from typing import Any, Dict, List, Optional
+from datetime import datetime
+from pathlib import Path
+import instaloader
+from src.base_scraper import BaseScraper, ScraperConfig
+
+
+class ConservativeRateController(instaloader.RateController):
+    """Rate controller that adds 5-10 s of random jitter on top of instaloader's own wait times."""
+
+    def sleep(self, secs: float) -> None:
+        time.sleep(secs + random.uniform(5, 10))
+
+
+class InstagramScraper(BaseScraper):
+    """Instagram scraper using instaloader with aggressive rate limiting."""
+
+    def __init__(self, config: ScraperConfig):
+        super().__init__(config)
+        self.username = os.getenv('INSTAGRAM_USERNAME')
+        self.password = os.getenv('INSTAGRAM_PASSWORD')
+        self.target_account = os.getenv('INSTAGRAM_TARGET', 'hvacknowitall')
+
+        # Session file for persistence
+        self.session_file = self.config.data_dir / '.sessions' / f'{self.username}'
+        self.session_file.parent.mkdir(parents=True, exist_ok=True)
+
+        # Initialize loader
+        self.loader = self._setup_loader()
+        self._login()
+
+        # Request counter for rate limiting
+        self.request_count = 0
+        self.max_requests_per_hour = 100
+
+    def _setup_loader(self) -> instaloader.Instaloader:
+        """Set up Instaloader with conservative settings."""
+        loader = instaloader.Instaloader(
+            quiet=True,
+            user_agent='Mozilla/5.0 (iPhone; CPU iPhone OS 16_0 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.0 Mobile/15E148 Safari/604.1',
+            dirname_pattern=str(self.config.data_dir / 'media' / 'Instagram'),
+            filename_pattern='{date_utc}_UTC_{shortcode}',
+            download_pictures=False,  # Don't download by default
+            download_videos=False,
+            download_video_thumbnails=False,
+            download_geotags=False,
+            download_comments=False,
+            save_metadata=False,
+            compress_json=False,
+            post_metadata_txt_pattern='',
+            storyitem_metadata_txt_pattern='',
+            max_connection_attempts=3,
+            request_timeout=30.0,
+            # rate_controller expects a factory returning a RateController instance
+            rate_controller=lambda ctx: ConservativeRateController(ctx)
+        )
+        return loader
+
+    def _login(self) -> None:
+        """Log in to Instagram or load an existing session."""
+        try:
+            # Try to load existing session
+            if self.session_file.exists():
+                self.loader.load_session_from_file(self.username, str(self.session_file))
+                self.logger.info("Loaded existing Instagram session")
+            else:
+                # Login with credentials
+                self.logger.info("Logging in to Instagram...")
+                self.loader.login(self.username, self.password)
+                self.loader.save_session_to_file(str(self.session_file))
+                self.logger.info("Instagram login successful, session saved")
+
+        except Exception as e:
+            self.logger.error(f"Instagram login error: {e}")
+
+    def _aggressive_delay(self, min_seconds: float = 5, max_seconds: float = 10) -> None:
+        """Add an aggressive random delay for Instagram."""
+        delay = random.uniform(min_seconds, max_seconds)
+        self.logger.debug(f"Waiting {delay:.2f} seconds (Instagram rate limiting)...")
+        time.sleep(delay)
+
+    def _check_rate_limit(self) -> None:
+        """Check and enforce rate limiting."""
+        self.request_count += 1
+
+        if self.request_count >= self.max_requests_per_hour:
+            self.logger.warning(f"Rate limit reached ({self.max_requests_per_hour} requests), pausing for 1 hour...")
+            time.sleep(3600)  # Wait 1 hour
+            self.request_count = 0
+        elif self.request_count % 10 == 0:
+            # Take a longer break every 10 requests
+            self.logger.info("Taking extended break after 10 requests...")
+            self._aggressive_delay(30, 60)
+
+    def _get_post_type(self, post) -> str:
+        """Determine the post type from an Instagram post object."""
+        typename = getattr(post, 'typename', '')
+        is_video = getattr(post, 'is_video', False)
+
+        if typename in ('GraphStoryImage', 'GraphStoryVideo'):
+            return 'story'
+        elif 'Video' in typename or is_video:
+            return 'reel'
+        else:
+            return 'post'
+
+    def fetch_posts(self, max_posts: int = 20) -> List[Dict[str, Any]]:
+        """Fetch posts from Instagram profile."""
+        posts_data = []
+
+        try:
+            self.logger.info(f"Fetching posts from @{self.target_account}")
+
+            # Get profile
+            profile = instaloader.Profile.from_username(self.loader.context, self.target_account)
+            self._check_rate_limit()
+
+            # Get posts
+            posts = profile.get_posts()
+
+            count = 0
+            for post in posts:
+                if count >= max_posts:
+                    break
+
+                try:
+                    # Extract post data
+                    post_data = {
+                        'id': post.shortcode,
+                        'type': self._get_post_type(post),
+                        'caption': post.caption if post.caption else '',
+                        'author': post.owner_username,
+                        'publish_date': post.date_utc.isoformat(),
+                        'link': f'https://www.instagram.com/p/{post.shortcode}/',
+                        'likes': post.likes,
+                        'comments': post.comments,
+                        'views': post.video_view_count if hasattr(post, 'video_view_count') else None,
+                        'media_count': post.mediacount if hasattr(post, 'mediacount') else 1,
+                        'hashtags': list(post.caption_hashtags) if post.caption else [],
+                        'mentions': list(post.caption_mentions) if post.caption else [],
+                        'is_video': getattr(post, 'is_video', False)
+                    }
+
+                    posts_data.append(post_data)
+                    count += 1
+
+                    # Aggressive rate limiting between posts
+                    self._aggressive_delay()
+                    self._check_rate_limit()
+
+                    # Log progress
+                    if count % 5 == 0:
+                        self.logger.info(f"Fetched {count}/{max_posts} posts")
+
+                except Exception as e:
+                    self.logger.error(f"Error processing post: {e}")
+                    continue
+
+            self.logger.info(f"Successfully fetched {len(posts_data)} posts")
+
+        except Exception as e:
+            self.logger.error(f"Error fetching posts: {e}")
+
+        return posts_data
+
+    def fetch_stories(self) -> List[Dict[str, Any]]:
+        """Fetch stories from Instagram profile."""
+        stories_data = []
+
+        try:
+            self.logger.info(f"Fetching stories from @{self.target_account}")
+
+            # Get profile
+            profile = instaloader.Profile.from_username(self.loader.context, self.target_account)
+            self._check_rate_limit()
+
+            # Get user ID for stories
+            userid = profile.userid
+
+            # Get stories; each Story object exposes its items via get_items()
+            for story in self.loader.get_stories(userids=[userid]):
+                for item in story.get_items():
+                    try:
+                        story_data = {
+                            'id': item.mediaid,
+                            'type': 'story',
+                            'caption': '',  # Stories usually don't have captions
+                            'author': item.owner_username,
+                            'publish_date': item.date_utc.isoformat(),
+                            'link': f'https://www.instagram.com/stories/{item.owner_username}/{item.mediaid}/',
+                            'is_video': item.is_video if hasattr(item, 'is_video') else False
+                        }
+
+                        stories_data.append(story_data)
+
+                        # Rate limiting
+                        self._aggressive_delay()
+                        self._check_rate_limit()
+
+                    except Exception as e:
+                        self.logger.error(f"Error processing story: {e}")
+                        continue
+
+            self.logger.info(f"Successfully fetched {len(stories_data)} stories")
+
+        except Exception as e:
+            self.logger.error(f"Error fetching stories: {e}")
+
+        return stories_data
+
+    def fetch_reels(self, max_reels: int = 10) -> List[Dict[str, Any]]:
+        """Fetch reels (videos) from Instagram profile."""
+        reels_data = []
+
+        try:
+            self.logger.info(f"Fetching reels from @{self.target_account}")
+
+            # Get profile
+            profile = instaloader.Profile.from_username(self.loader.context, self.target_account)
+            self._check_rate_limit()
+
+            # Get posts and filter for videos/reels
+            posts = profile.get_posts()
+
+            count = 0
+            for post in posts:
+                if count >= max_reels:
+                    break
+
+                # Check if it's a video/reel
+                if not getattr(post, 'is_video', False):
+                    continue
+
+                try:
+                    reel_data = {
+                        'id': post.shortcode,
+                        'type': 'reel',
+                        'caption': post.caption if post.caption else '',
+                        'author': post.owner_username,
+                        'publish_date': post.date_utc.isoformat(),
+                        'link': f'https://www.instagram.com/reel/{post.shortcode}/',
+                        'likes': post.likes,
+                        'comments': post.comments,
+                        'views': post.video_view_count if hasattr(post, 'video_view_count') else None,
+                        'duration': post.video_duration if hasattr(post, 'video_duration') else None,
+                        'hashtags': list(post.caption_hashtags) if post.caption else [],
+                        'mentions': list(post.caption_mentions) if post.caption else []
+                    }
+
+                    reels_data.append(reel_data)
+                    count += 1
+
+                    # Aggressive rate limiting
+                    self._aggressive_delay()
+                    self._check_rate_limit()
+
+                except Exception as e:
+                    self.logger.error(f"Error processing reel: {e}")
+                    continue
+
+            self.logger.info(f"Successfully fetched {len(reels_data)} reels")
+
+        except Exception as e:
+            self.logger.error(f"Error fetching reels: {e}")
+
+        return reels_data
+
+    def fetch_content(self) -> List[Dict[str, Any]]:
+        """Fetch all content types from Instagram."""
+        all_content = []
+
+        # Fetch posts
+        posts = self.fetch_posts(max_posts=20)
+        all_content.extend(posts)
+
+        # Take a break between content types
+        self.logger.info("Taking break before fetching stories...")
+        self._aggressive_delay(15, 30)
+
+        # Fetch stories
+        stories = self.fetch_stories()
+        all_content.extend(stories)
+
+        # Note: reels are included in posts (videos),
+        # so we don't need to fetch them separately.
+
+        self.logger.info(f"Total content fetched: {len(all_content)} items")
+        return all_content
+
+    def format_markdown(self, items: List[Dict[str, Any]]) -> str:
+        """Format Instagram content as markdown."""
+        markdown_sections = []
+
+        for item in items:
+            section = []
+
+            # ID
+            item_id = item.get('id', 'N/A')
+            section.append(f"# ID: {item_id}")
+            section.append("")
+
+            # Type
+            item_type = item.get('type', 'post')
+            section.append(f"## Type: {item_type}")
+            section.append("")
+
+            # Author
+            author = item.get('author', 'Unknown')
+            section.append(f"## Author: {author}")
+            section.append("")
+
+            # Publish Date
+            pub_date = item.get('publish_date', '')
+            section.append(f"## Publish Date: {pub_date}")
+            section.append("")
+
+            # Link
+            link = item.get('link', '')
+            section.append(f"## Link: {link}")
+            section.append("")
+
+            # Engagement metrics
+            likes = item.get('likes')
+            if likes is not None:
+                section.append(f"## Likes: {likes}")
+                section.append("")
+
+            comments = item.get('comments')
+            if comments is not None:
+                section.append(f"## Comments: {comments}")
+                section.append("")
+
+            views = item.get('views')
+            if views is not None:
+                section.append(f"## Views: {views}")
+                section.append("")
+
+            # Hashtags
+            hashtags = item.get('hashtags', [])
+            if hashtags:
+                hashtags_str = ', '.join(hashtags)
+                section.append(f"## Hashtags: {hashtags_str}")
+                section.append("")
+
+            # Mentions
+            mentions = item.get('mentions', [])
+            if mentions:
+                mentions_str = ', '.join(mentions)
+                section.append(f"## Mentions: {mentions_str}")
+                section.append("")
+
+            # Caption/Description
+            section.append("## Description:")
+            caption = item.get('caption', '')
+            if caption:
+                # Limit caption to first 500 characters
+                if len(caption) > 500:
+                    caption = caption[:500] + "..."
+            section.append(caption)
+            section.append("")
+
+            # Separator
+            section.append("-" * 50)
+            section.append("")
+
+            markdown_sections.append('\n'.join(section))
+
+        return '\n'.join(markdown_sections)
+
+    def get_incremental_items(self, items: List[Dict[str, Any]], state: Dict[str, Any]) -> List[Dict[str, Any]]:
+        """Get only new posts since last sync."""
+        if not state:
+            return items
+
+        last_post_id = state.get('last_post_id')
+
+        if not last_post_id:
+            return items
+
+        # Items are newest-first; collect until we hit the last synced post
+        new_items = []
+        for item in items:
+            if item.get('id') == last_post_id:
+                break  # Found the last synced post
+            new_items.append(item)
+
+        return new_items
+
+    def update_state(self, state: Dict[str, Any], items: List[Dict[str, Any]]) -> Dict[str, Any]:
+        """Update state with latest post information."""
+        if not items:
+            return state
+
+        # Get the first item (most recent)
+        latest_item = items[0]
+
+        state['last_post_id'] = latest_item.get('id')
+        state['last_post_date'] = latest_item.get('publish_date')
+        state['last_sync'] = datetime.now(self.tz).isoformat()
+        state['post_count'] = len([i for i in items if i.get('type') == 'post'])
+        state['story_count'] = len([i for i in items if i.get('type') == 'story'])
+        state['reel_count'] = len([i for i in items if i.get('type') == 'reel'])
+
+        return state
\ No newline at end of file
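For a quick manual check, the Instagram scraper can be driven on its own; a minimal sketch, assuming `.env` provides the `INSTAGRAM_*` variables and using the same `ScraperConfig` fields the orchestrator below passes:

```python
from pathlib import Path
from dotenv import load_dotenv
from src.base_scraper import ScraperConfig
from src.instagram_scraper import InstagramScraper

load_dotenv()  # supplies INSTAGRAM_USERNAME / INSTAGRAM_PASSWORD / INSTAGRAM_TARGET

config = ScraperConfig(
    source_name="instagram",
    brand_name="hvacknowitall",
    data_dir=Path("data"),
    logs_dir=Path("logs"),
    timezone="America/Halifax",
)
scraper = InstagramScraper(config)         # logs in, or reuses data/.sessions/<username>
posts = scraper.fetch_posts(max_posts=5)   # slow by design: 5-10 s between requests
print(scraper.format_markdown(posts))
```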
diff --git a/src/orchestrator.py b/src/orchestrator.py
new file mode 100644
index 0000000..065ca5b
--- /dev/null
+++ b/src/orchestrator.py
@@ -0,0 +1,352 @@
+#!/usr/bin/env python3
+"""
+Orchestrator for running all scrapers in parallel.
+"""
+
+import os
+import sys
+import time
+import logging
+import multiprocessing
+from pathlib import Path
+from typing import List, Dict, Any, Optional
+from datetime import datetime
+import pytz
+import json
+
+# Import all scrapers
+from src.base_scraper import ScraperConfig
+from src.wordpress_scraper import WordPressScraper
+from src.rss_scraper import RSSScraperMailChimp, RSSScraperPodcast
+from src.youtube_scraper import YouTubeScraper
+from src.instagram_scraper import InstagramScraper
+
+
+class ScraperOrchestrator:
+    """Orchestrator for running multiple scrapers in parallel."""
+
+    def __init__(self, base_data_dir: Path = Path("data"),
+                 base_logs_dir: Path = Path("logs"),
+                 brand_name: str = "hvacknowitall",
+                 timezone: str = "America/Halifax"):
+        """Initialize the orchestrator."""
+        self.base_data_dir = base_data_dir
+        self.base_logs_dir = base_logs_dir
+        self.brand_name = brand_name
+        self.timezone = timezone
+        self.tz = pytz.timezone(timezone)
+
+        # Setup orchestrator logger
+        self.logger = self._setup_logger()
+
+        # Initialize scrapers
+        self.scrapers = self._initialize_scrapers()
+
+        # Statistics file
+        self.stats_file = self.base_data_dir / "orchestrator_stats.json"
+
+    def _setup_logger(self) -> logging.Logger:
+        """Setup logger for orchestrator."""
+        logger = logging.getLogger("hvacknowitall_orchestrator")
+        logger.setLevel(logging.INFO)
+
+        # Console handler
+        console_handler = logging.StreamHandler()
+        console_handler.setLevel(logging.INFO)
+
+        # File handler
+        log_file = self.base_logs_dir / "orchestrator.log"
+        log_file.parent.mkdir(parents=True, exist_ok=True)
+        file_handler = logging.FileHandler(log_file)
+        file_handler.setLevel(logging.DEBUG)
+
+        # Formatter
+        formatter = logging.Formatter(
+            '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
+        )
+        console_handler.setFormatter(formatter)
+        file_handler.setFormatter(formatter)
+
+        logger.addHandler(console_handler)
+        logger.addHandler(file_handler)
+
+        return logger
+
+    def _initialize_scrapers(self) -> List[tuple]:
+        """Initialize all scraper instances."""
+        scrapers = []
+
+        # WordPress scraper
+        if os.getenv('WORDPRESS_API_URL'):
+            config = ScraperConfig(
+                source_name="wordpress",
+                brand_name=self.brand_name,
+                data_dir=self.base_data_dir,
+                logs_dir=self.base_logs_dir,
+                timezone=self.timezone
+            )
+            scrapers.append(("WordPress", WordPressScraper(config)))
+            self.logger.info("Initialized WordPress scraper")
+
+        # MailChimp RSS scraper
+        if os.getenv('MAILCHIMP_RSS_URL'):
+            config = ScraperConfig(
+                source_name="mailchimp",
+                brand_name=self.brand_name,
+                data_dir=self.base_data_dir,
+                logs_dir=self.base_logs_dir,
+                timezone=self.timezone
+            )
+            scrapers.append(("MailChimp", RSSScraperMailChimp(config)))
+            self.logger.info("Initialized MailChimp RSS scraper")
+
+        # Podcast RSS scraper
+        if os.getenv('PODCAST_RSS_URL'):
+            config = ScraperConfig(
+                source_name="podcast",
+                brand_name=self.brand_name,
+                data_dir=self.base_data_dir,
+                logs_dir=self.base_logs_dir,
+                timezone=self.timezone
+            )
+            scrapers.append(("Podcast", RSSScraperPodcast(config)))
+            self.logger.info("Initialized Podcast RSS scraper")
+
+        # YouTube scraper
+        if os.getenv('YOUTUBE_CHANNEL_URL'):
+            config = ScraperConfig(
+                source_name="youtube",
+                brand_name=self.brand_name,
+                data_dir=self.base_data_dir,
+                logs_dir=self.base_logs_dir,
+                timezone=self.timezone
+            )
+            scrapers.append(("YouTube", YouTubeScraper(config)))
+            self.logger.info("Initialized YouTube scraper")
+
+        # Instagram scraper
+        if os.getenv('INSTAGRAM_USERNAME'):
+            config = ScraperConfig(
+                source_name="instagram",
+                brand_name=self.brand_name,
+                data_dir=self.base_data_dir,
+                logs_dir=self.base_logs_dir,
+                timezone=self.timezone
+            )
+            scrapers.append(("Instagram", InstagramScraper(config)))
+            self.logger.info("Initialized Instagram scraper")
+
+        return scrapers
+
+    def _run_scraper(self, scraper_info: tuple) -> Dict[str, Any]:
+        """Run a single scraper and return results."""
+        name, scraper = scraper_info
+        result = {
+            'name': name,
+            'status': 'pending',
+            'items_count': 0,
+            'new_items': 0,
+            'error': None,
+            'start_time': datetime.now(self.tz).isoformat(),
+            'end_time': None,
+            'duration_seconds': 0
+        }
+
+        try:
+            start_time = time.time()
+            self.logger.info(f"Starting {name} scraper...")
+
+            # Load state
+            state = scraper.load_state()
+
+            # Fetch content
+            items = scraper.fetch_content()
+            result['items_count'] = len(items)
+
+            # Filter for incremental items
+            new_items = scraper.get_incremental_items(items, state)
+            result['new_items'] = len(new_items)
+
+            if new_items:
+                # Format as markdown
+                markdown_content = scraper.format_markdown(new_items)
+
+                # Archive existing file
+                scraper.archive_current_file()
+
+                # Save new markdown
+                filename = scraper.generate_filename()
+                file_path = self.base_data_dir / filename
+
+                with open(file_path, 'w', encoding='utf-8') as f:
+                    f.write(markdown_content)
+
+                self.logger.info(f"{name}: Saved {len(new_items)} new items to {filename}")
+
+                # Update state
+                new_state = scraper.update_state(state, items)
+                scraper.save_state(new_state)
+            else:
+                self.logger.info(f"{name}: No new items found")
+
+            result['status'] = 'success'
+            result['end_time'] = datetime.now(self.tz).isoformat()
+            result['duration_seconds'] = round(time.time() - start_time, 2)
+
+        except Exception as e:
+            self.logger.error(f"{name} scraper failed: {e}")
+            result['status'] = 'error'
+            result['error'] = str(e)
+            result['end_time'] = datetime.now(self.tz).isoformat()
+            result['duration_seconds'] = round(time.time() - start_time, 2)
+
+        return result
+
+    def run_sequential(self) -> List[Dict[str, Any]]:
+        """Run all scrapers sequentially."""
+        self.logger.info("Starting sequential scraping...")
+        results = []
+
+        for scraper_info in self.scrapers:
+            result = self._run_scraper(scraper_info)
+            results.append(result)
+
+        return results
+
+    def run_parallel(self, max_workers: Optional[int] = None) -> List[Dict[str, Any]]:
+        """Run all scrapers in parallel using multiprocessing."""
+        self.logger.info(f"Starting parallel scraping with {max_workers or 'all'} workers...")
+
+        if not self.scrapers:
+            self.logger.warning("No scrapers configured")
+            return []
+
+        # Use number of scrapers as max workers if not specified
+        if max_workers is None:
+            max_workers = len(self.scrapers)
+
+        # Note: scraper instances (and anything they hold, e.g. sessions) must be
+        # picklable to cross the process boundary here.
+        with multiprocessing.Pool(processes=max_workers) as pool:
+            results = pool.map(self._run_scraper, self.scrapers)
+
+        return results
+
+    def save_statistics(self, results: List[Dict[str, Any]]) -> None:
+        """Save run statistics to file."""
+        stats = {
+            'run_time': datetime.now(self.tz).isoformat(),
+            'total_scrapers': len(results),
+            'successful': sum(1 for r in results if r['status'] == 'success'),
+            'failed': sum(1 for r in results if r['status'] == 'error'),
+            'total_items': sum(r['items_count'] for r in results),
+            'new_items': sum(r['new_items'] for r in results),
+            'total_duration': sum(r['duration_seconds'] for r in results),
+            'results': results
+        }
+
+        # Load existing stats if file exists
+        all_stats = []
+        if self.stats_file.exists():
+            try:
+                with open(self.stats_file, 'r') as f:
+                    all_stats = json.load(f)
+            except (json.JSONDecodeError, OSError):
+                pass  # Corrupt or unreadable stats file; start a fresh history
+
+        # Append new stats (keep last 100 runs)
+        all_stats.append(stats)
+        if len(all_stats) > 100:
+            all_stats = all_stats[-100:]
+
+        # Save to file
+        with open(self.stats_file, 'w') as f:
+            json.dump(all_stats, f, indent=2)
+
+        self.logger.info(f"Statistics saved to {self.stats_file}")
+
+    def print_summary(self, results: List[Dict[str, Any]]) -> None:
+        """Print a summary of the scraping results."""
+        print("\n" + "="*60)
+        print("SCRAPING SUMMARY")
+        print("="*60)
+
+        for result in results:
+            status_symbol = "✓" if result['status'] == 'success' else "✗"
+            print(f"\n{status_symbol} {result['name']}:")
+            print(f"  Status: {result['status']}")
+            print(f"  Items found: {result['items_count']}")
+            print(f"  New items: {result['new_items']}")
+            print(f"  Duration: {result['duration_seconds']}s")
+            if result['error']:
+                print(f"  Error: {result['error']}")
+
+        print("\n" + "-"*60)
+        print("TOTALS:")
+        print(f"  Successful: {sum(1 for r in results if r['status'] == 'success')}/{len(results)}")
+        print(f"  Total items: {sum(r['items_count'] for r in results)}")
+        print(f"  New items: {sum(r['new_items'] for r in results)}")
+        print(f"  Total time: {sum(r['duration_seconds'] for r in results):.2f}s")
+        print("="*60 + "\n")
+
+    def run(self, parallel: bool = True, max_workers: Optional[int] = None) -> None:
+        """Main run method."""
+        start_time = time.time()
+
+        self.logger.info(f"Starting orchestrator at {datetime.now(self.tz).isoformat()}")
+        self.logger.info(f"Configured scrapers: {len(self.scrapers)}")
+
+        if not self.scrapers:
+            self.logger.error("No scrapers configured. Please check your .env file.")
+            return
+
+        # Run scrapers
+        if parallel:
+            results = self.run_parallel(max_workers)
+        else:
+            results = self.run_sequential()
+
+        # Save statistics
+        self.save_statistics(results)
+
+        # Print summary
+        self.print_summary(results)
+
+        total_time = time.time() - start_time
+        self.logger.info(f"Orchestrator completed in {total_time:.2f} seconds")
+
+
+def main():
+    """Main entry point."""
+    import argparse
+    from dotenv import load_dotenv
+
+    # Load environment variables
+    load_dotenv()
+
+    # Parse arguments
+    parser = argparse.ArgumentParser(description="Run HVAC Know It All content scrapers")
+    parser.add_argument('--sequential', action='store_true',
+                        help='Run scrapers sequentially instead of in parallel')
+    parser.add_argument('--max-workers', type=int, default=None,
+                        help='Maximum number of parallel workers')
+    parser.add_argument('--data-dir', type=str, default='data',
+                        help='Base data directory')
+    parser.add_argument('--logs-dir', type=str, default='logs',
+                        help='Base logs directory')
+
+    args = parser.parse_args()
+
+    # Create orchestrator
+    orchestrator = ScraperOrchestrator(
+        base_data_dir=Path(args.data_dir),
+        base_logs_dir=Path(args.logs_dir)
+    )
+
+    # Run scrapers
+    orchestrator.run(
+        parallel=not args.sequential,
+        max_workers=args.max_workers
+    )
+
+
+if __name__ == "__main__":
+    main()
\ No newline at end of file
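The orchestrator is normally launched through `main()`, but it can also be embedded directly; a minimal sketch with the same defaults as the CLI, where `parallel=False` mirrors the `--sequential` flag:

```python
from pathlib import Path
from dotenv import load_dotenv
from src.orchestrator import ScraperOrchestrator

load_dotenv()  # scrapers are enabled purely by which env vars are set

orchestrator = ScraperOrchestrator(
    base_data_dir=Path("data"),
    base_logs_dir=Path("logs"),
)
# Sequential mode avoids pickling scraper instances (e.g. InstagramScraper's
# instaloader session) across process boundaries.
orchestrator.run(parallel=False)
```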
diff --git a/test_data/.sessions/bengizmo b/test_data/.sessions/bengizmo
new file mode 100644
index 0000000..7183654
Binary files /dev/null and b/test_data/.sessions/bengizmo differ
diff --git a/test_data/test_wordpress.md b/test_data/test_wordpress.md
new file mode 100644
index 0000000..7f0d6fa
--- /dev/null
+++ b/test_data/test_wordpress.md
@@ -0,0 +1 @@
+# Post 2
\ No newline at end of file
diff --git a/test_data/test_youtube.md b/test_data/test_youtube.md
new file mode 100644
index 0000000..c2ec21a
--- /dev/null
+++ b/test_data/test_youtube.md
@@ -0,0 +1 @@
+# Video 1
\ No newline at end of file
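The binary blob under `test_data/.sessions/` is the kind of file instaloader's `save_session_to_file` produces. A one-time session bootstrap on a trusted machine looks roughly like the sketch below (it reuses the scraper's env vars; two-factor or checkpoint challenges would still need instaloader's interactive flow):

```python
import os
from pathlib import Path
import instaloader

username = os.environ["INSTAGRAM_USERNAME"]
session_file = Path("data") / ".sessions" / username
session_file.parent.mkdir(parents=True, exist_ok=True)

loader = instaloader.Instaloader(quiet=True)
loader.login(username, os.environ["INSTAGRAM_PASSWORD"])  # may raise on bad credentials or 2FA
loader.save_session_to_file(str(session_file))
print(f"Session saved to {session_file}")
```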
diff --git a/tests/test_instagram_scraper.py b/tests/test_instagram_scraper.py
new file mode 100644
index 0000000..f568a52
--- /dev/null
+++ b/tests/test_instagram_scraper.py
@@ -0,0 +1,271 @@
+import pytest
+from unittest.mock import Mock, patch, MagicMock, PropertyMock
+from datetime import datetime
+from pathlib import Path
+import random
+from src.instagram_scraper import InstagramScraper
+from src.base_scraper import ScraperConfig
+
+
+class TestInstagramScraper:
+    @pytest.fixture
+    def config(self):
+        return ScraperConfig(
+            source_name="instagram",
+            brand_name="hvacknowitall",
+            data_dir=Path("data"),
+            logs_dir=Path("logs"),
+            timezone="America/Halifax"
+        )
+
+    @pytest.fixture
+    def mock_env(self):
+        with patch.dict('os.environ', {
+            'INSTAGRAM_USERNAME': 'testuser',
+            'INSTAGRAM_PASSWORD': 'testpass',
+            'INSTAGRAM_TARGET': 'hvacknowitall'
+        }):
+            yield
+
+    @pytest.fixture
+    def sample_post(self):
+        mock_post = MagicMock()
+        mock_post.shortcode = 'ABC123'
+        mock_post.caption = 'Test caption #hvac #tips'
+        mock_post.owner_username = 'hvacknowitall'
+        mock_post.date_utc = datetime(2024, 1, 1, 12, 0, 0)
+        mock_post.typename = 'GraphImage'
+        mock_post.url = 'https://www.instagram.com/p/ABC123/'
+        mock_post.likes = 150
+        mock_post.comments = 25
+        mock_post.video_view_count = None
+        mock_post.mediacount = 1
+        mock_post.caption_hashtags = ['hvac', 'tips']
+        mock_post.caption_mentions = []
+        mock_post.is_video = False  # Explicitly set is_video to False
+        return mock_post
+
+    @pytest.fixture
+    def sample_story(self):
+        mock_story = MagicMock()
+        mock_story.mediaid = 123456789
+        mock_story.owner_username = 'hvacknowitall'
+        mock_story.date_utc = datetime(2024, 1, 1, 12, 0, 0)
+        mock_story.url = 'https://www.instagram.com/stories/hvacknowitall/123456789/'
+        mock_story.typename = 'GraphStoryImage'
+        mock_story.is_video = False  # Explicitly set is_video to False
+        return mock_story
+
+    @patch('src.instagram_scraper.InstagramScraper._login')
+    @patch('src.instagram_scraper.InstagramScraper._setup_loader')
+    def test_initialization(self, mock_setup, mock_login, config, mock_env):
+        mock_setup.return_value = MagicMock()
+        scraper = InstagramScraper(config)
+        assert scraper.config == config
+        assert scraper.username == 'testuser'
+        assert scraper.password == 'testpass'
+        assert scraper.target_account == 'hvacknowitall'
+
+    @patch('src.instagram_scraper.InstagramScraper._login')
+    @patch('instaloader.Instaloader')
+    def test_setup_loader(self, mock_instaloader_class, mock_login, config, mock_env):
+        mock_loader = MagicMock()
+        mock_instaloader_class.return_value = mock_loader
+
+        scraper = InstagramScraper(config)
+
+        # Test that instaloader was initialized with correct params
+        mock_instaloader_class.assert_called_once()
+        call_kwargs = mock_instaloader_class.call_args[1]
+        assert call_kwargs['quiet'] == True
+        assert call_kwargs['download_videos'] == False
+        assert call_kwargs['download_video_thumbnails'] == False
+
+    @patch('src.instagram_scraper.InstagramScraper._setup_loader')
+    @patch('instaloader.Instaloader')
+    def test_login(self, mock_instaloader_class, mock_setup, config, mock_env):
+        mock_loader = MagicMock()
+        mock_setup.return_value = mock_loader
+
+        # Create scraper without triggering login in __init__
+        with patch('src.instagram_scraper.InstagramScraper._login'):
+            scraper = InstagramScraper(config)
+            scraper.loader = mock_loader
+
+        # Now test login
+        scraper._login()
+
+        # Should try to login with credentials since no session file exists
+        mock_loader.login.assert_called_once_with('testuser', 'testpass')
+
+    @patch('time.sleep')
+    @patch('random.uniform')
+    @patch('src.instagram_scraper.InstagramScraper._login')
+    @patch('src.instagram_scraper.InstagramScraper._setup_loader')
+    def test_aggressive_delay(self, mock_setup, mock_login, mock_uniform, mock_sleep, config, mock_env):
+        mock_uniform.return_value = 7.5
+        mock_setup.return_value = MagicMock()
+
+        scraper = InstagramScraper(config)
+        scraper._aggressive_delay()
+
+        mock_uniform.assert_called_with(5, 10)
+        mock_sleep.assert_called_with(7.5)
+
+    @patch('instaloader.Profile.from_username')
+    @patch('src.instagram_scraper.InstagramScraper._login')
+    @patch('src.instagram_scraper.InstagramScraper._setup_loader')
+    def test_fetch_posts(self, mock_setup, mock_login, mock_profile_from_username,
+                         config, mock_env, sample_post):
+        mock_loader = MagicMock()
+        mock_setup.return_value = mock_loader
+
+        mock_profile = MagicMock()
+        mock_profile.get_posts.return_value = [sample_post]
+        mock_profile_from_username.return_value = mock_profile
+
+        scraper = InstagramScraper(config)
+        scraper.loader = mock_loader
+        posts = scraper.fetch_posts(max_posts=10)
+
+        assert len(posts) == 1
+        assert posts[0]['id'] == 'ABC123'
+        assert posts[0]['type'] == 'post'
+        assert posts[0]['caption'] == 'Test caption #hvac #tips'
+
+    @patch('instaloader.Profile.from_username')
+    @patch('src.instagram_scraper.InstagramScraper._login')
+    @patch('src.instagram_scraper.InstagramScraper._setup_loader')
+    def test_fetch_stories(self, mock_setup, mock_login, mock_profile_from_username,
+                           config, mock_env, sample_story):
+        mock_loader = MagicMock()
+        mock_setup.return_value = mock_loader
+        # get_stories yields Story objects; each exposes its items via get_items()
+        mock_story_obj = MagicMock()
+        mock_story_obj.get_items.return_value = [sample_story]  # one story collection with one item
+        mock_loader.get_stories.return_value = [mock_story_obj]
+
+        mock_profile = MagicMock()
+        mock_profile.userid = 12345
+        mock_profile_from_username.return_value = mock_profile
+
+        scraper = InstagramScraper(config)
+        scraper.loader = mock_loader
+        stories = scraper.fetch_stories()
+
+        assert len(stories) == 1
+        assert stories[0]['id'] == 123456789
+        assert stories[0]['type'] == 'story'
+
+    @patch('src.instagram_scraper.InstagramScraper._login')
+    @patch('src.instagram_scraper.InstagramScraper._setup_loader')
+    def test_get_post_type(self, mock_setup, mock_login, config, mock_env):
+        mock_setup.return_value = MagicMock()
+        scraper = InstagramScraper(config)
+
+        mock_post = MagicMock()
+
+        # Test regular post
+        mock_post.typename = 'GraphImage'
+        mock_post.is_video = False
+        assert scraper._get_post_type(mock_post) == 'post'
+
+        # Test video/reel
+        mock_post.typename = 'GraphVideo'
+        mock_post.is_video = True
+        assert scraper._get_post_type(mock_post) == 'reel'
+
+        # Test carousel
+        mock_post.typename = 'GraphSidecar'
+        mock_post.is_video = False
+        assert scraper._get_post_type(mock_post) == 'post'
+
+    @patch('src.instagram_scraper.InstagramScraper._login')
+    @patch('src.instagram_scraper.InstagramScraper._setup_loader')
+    def test_format_markdown(self, mock_setup, mock_login, config, mock_env):
+        mock_setup.return_value = MagicMock()
+        scraper = InstagramScraper(config)
+
+        items = [
+            {
+                'id': 'ABC123',
+                'type': 'post',
+                'caption': 'Test post',
+                'author': 'hvacknowitall',
+                'publish_date': '2024-01-01T12:00:00',
+                'link': 'https://www.instagram.com/p/ABC123/',
+                'likes': 150,
+                'comments': 25,
+                'views': None,
+                'hashtags': ['hvac', 'tips']
+            }
+        ]
+
+        markdown = scraper.format_markdown(items)
+
+        assert '# ID: ABC123' in markdown
+        assert '## Type: post' in markdown
+        assert '## Author: hvacknowitall' in markdown
+        assert '## Publish Date: 2024-01-01T12:00:00' in markdown
+        assert '## Link: https://www.instagram.com/p/ABC123/' in markdown
+        assert '## Likes: 150' in markdown
+        assert '## Comments: 25' in markdown
+        assert '## Hashtags: hvac, tips' in markdown
+        assert 'Test post' in markdown
+
+    @patch('src.instagram_scraper.InstagramScraper._login')
+    @patch('src.instagram_scraper.InstagramScraper._setup_loader')
+    def test_get_incremental_items(self, mock_setup, mock_login, config, mock_env):
+        mock_setup.return_value = MagicMock()
+        scraper = InstagramScraper(config)
+
+        items = [
+            {'id': 'post3', 'publish_date': '2024-01-03T12:00:00'},
+            {'id': 'post2', 'publish_date': '2024-01-02T12:00:00'},
+            {'id': 'post1', 'publish_date': '2024-01-01T12:00:00'}
+        ]
+
+        # Test with no previous state
+        state = {}
+        new_items = scraper.get_incremental_items(items, state)
+        assert len(new_items) == 3
+
+        # Test with existing state
+        state = {'last_post_id': 'post2'}
+        new_items = scraper.get_incremental_items(items, state)
+        assert len(new_items) == 1
+        assert new_items[0]['id'] == 'post3'
+
+    @patch('src.instagram_scraper.InstagramScraper._login')
+    @patch('src.instagram_scraper.InstagramScraper._setup_loader')
+    def test_update_state(self, mock_setup, mock_login, config, mock_env):
+        mock_setup.return_value = MagicMock()
+        scraper = InstagramScraper(config)
+
+        state = {}
+        items = [
+            {'id': 'post2', 'publish_date': '2024-01-02T12:00:00', 'type': 'post'},
+            {'id': 'post1', 'publish_date': '2024-01-01T12:00:00', 'type': 'post'}
+        ]
+
+        updated_state = scraper.update_state(state, items)
+
+        assert updated_state['last_post_id'] == 'post2'
+        assert updated_state['last_post_date'] == '2024-01-02T12:00:00'
+        assert updated_state['post_count'] == 2
+
+    @patch('src.instagram_scraper.InstagramScraper._setup_loader')
+    @patch('instaloader.Instaloader')
+    def test_error_handling(self, mock_instaloader_class, mock_setup, config, mock_env):
+        mock_loader = MagicMock()
+        mock_setup.return_value = mock_loader
+        mock_loader.login.side_effect = Exception("Login failed")
+
+        # Test that login error is handled gracefully
+        with patch('src.instagram_scraper.InstagramScraper._login'):
+            scraper = InstagramScraper(config)
+            scraper.loader = mock_loader
+
+        scraper._login()  # Should not raise, just log the error
+
+        # Test fetch error handling
+        posts = scraper.fetch_posts()
+        assert posts == []
\ No newline at end of file
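The orchestrator tests that follow stub scrapers with `MagicMock`; written out, the duck-typed contract that `ScraperOrchestrator._run_scraper` expects from every scraper looks roughly like this (illustrative only; concrete scrapers inherit most of it from `BaseScraper`):

```python
from typing import Any, Dict, List

class ScraperProtocol:
    """Sketch of the methods _run_scraper calls on each scraper."""

    def load_state(self) -> Dict[str, Any]: ...
    def fetch_content(self) -> List[Dict[str, Any]]: ...
    def get_incremental_items(self, items: List[Dict[str, Any]],
                              state: Dict[str, Any]) -> List[Dict[str, Any]]: ...
    def format_markdown(self, items: List[Dict[str, Any]]) -> str: ...
    def archive_current_file(self) -> None: ...
    def generate_filename(self) -> str: ...
    def update_state(self, state: Dict[str, Any],
                     items: List[Dict[str, Any]]) -> Dict[str, Any]: ...
    def save_state(self, state: Dict[str, Any]) -> None: ...
```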
diff --git a/tests/test_orchestrator.py b/tests/test_orchestrator.py
new file mode 100644
index 0000000..b3f53ac
--- /dev/null
+++ b/tests/test_orchestrator.py
@@ -0,0 +1,186 @@
+import pytest
+from unittest.mock import Mock, patch, MagicMock
+from pathlib import Path
+import json
+from src.orchestrator import ScraperOrchestrator
+from src.base_scraper import ScraperConfig
+
+
+class TestScraperOrchestrator:
+    @pytest.fixture
+    def orchestrator(self):
+        return ScraperOrchestrator(
+            base_data_dir=Path("test_data"),
+            base_logs_dir=Path("test_logs"),
+            brand_name="test_brand",
+            timezone="America/Halifax"
+        )
+
+    @pytest.fixture
+    def mock_scrapers(self):
+        """Create mock scrapers."""
+        mock_wordpress = MagicMock()
+        mock_wordpress.load_state.return_value = {}
+        mock_wordpress.fetch_content.return_value = [
+            {'id': '1', 'title': 'Post 1'},
+            {'id': '2', 'title': 'Post 2'}
+        ]
+        mock_wordpress.get_incremental_items.return_value = [{'id': '2', 'title': 'Post 2'}]
+        mock_wordpress.format_markdown.return_value = "# Post 2"
+        mock_wordpress.generate_filename.return_value = "test_wordpress.md"
+        mock_wordpress.update_state.return_value = {'last_id': '2'}
+
+        mock_youtube = MagicMock()
+        mock_youtube.load_state.return_value = {}
+        mock_youtube.fetch_content.return_value = [
+            {'id': 'vid1', 'title': 'Video 1'}
+        ]
+        mock_youtube.get_incremental_items.return_value = [{'id': 'vid1', 'title': 'Video 1'}]
+        mock_youtube.format_markdown.return_value = "# Video 1"
+        mock_youtube.generate_filename.return_value = "test_youtube.md"
+        mock_youtube.update_state.return_value = {'last_video_id': 'vid1'}
+
+        return [
+            ("WordPress", mock_wordpress),
+            ("YouTube", mock_youtube)
+        ]
+
+    def test_initialization(self, orchestrator):
+        assert orchestrator.base_data_dir == Path("test_data")
+        assert orchestrator.base_logs_dir == Path("test_logs")
+        assert orchestrator.brand_name == "test_brand"
+        assert orchestrator.timezone == "America/Halifax"
+
+    @patch('src.orchestrator.InstagramScraper')
+    @patch('src.orchestrator.RSSScraperPodcast')
+    @patch('src.orchestrator.RSSScraperMailChimp')
+    @patch('src.orchestrator.WordPressScraper')
+    @patch('src.orchestrator.YouTubeScraper')
+    def test_initialize_scrapers(self, mock_youtube_class, mock_wordpress_class,
+                                 mock_mailchimp_class, mock_podcast_class, mock_instagram_class):
+        # Create a clean environment with only specific scrapers enabled
+        with patch.dict('os.environ', {
+            'WORDPRESS_API_URL': 'https://test.com/wp-json',
+            'YOUTUBE_CHANNEL_URL': 'https://youtube.com/@test',
+            # Clear other environment variables
+            'MAILCHIMP_RSS_URL': '',
+            'PODCAST_RSS_URL': '',
+            'INSTAGRAM_USERNAME': ''
+        }, clear=True):
+            orchestrator = ScraperOrchestrator()
+            # Should only have WordPress and YouTube scrapers
+            assert len(orchestrator.scrapers) == 2
+            names = [name for name, _ in orchestrator.scrapers]
+            assert 'WordPress' in names
+            assert 'YouTube' in names
+
+    def test_run_scraper_success(self, orchestrator, mock_scrapers):
+        orchestrator.scrapers = mock_scrapers
+
+        # Run first scraper
+        result = orchestrator._run_scraper(mock_scrapers[0])
+
+        assert result['name'] == 'WordPress'
+        assert result['status'] == 'success'
+        assert result['items_count'] == 2
+        assert result['new_items'] == 1
+        assert result['error'] is None
+
+    def test_run_scraper_error(self, orchestrator):
+        mock_scraper = MagicMock()
+        mock_scraper.load_state.side_effect = Exception("Test error")
+
+        result = orchestrator._run_scraper(("TestScraper", mock_scraper))
+
+        assert result['name'] == 'TestScraper'
+        assert result['status'] == 'error'
+        assert result['error'] == "Test error"
+
+    def test_run_sequential(self, orchestrator, mock_scrapers):
+        orchestrator.scrapers = mock_scrapers
+
+        results = orchestrator.run_sequential()
+
+        assert len(results) == 2
+        assert results[0]['name'] == 'WordPress'
+        assert results[1]['name'] == 'YouTube'
+        assert all(r['status'] == 'success' for r in results)
+
+    @patch('multiprocessing.Pool')
+    def test_run_parallel(self, mock_pool_class, orchestrator, mock_scrapers):
+        mock_pool = MagicMock()
+        mock_pool_class.return_value.__enter__.return_value = mock_pool
+
+        # Mock the map function to return results
+        mock_pool.map.return_value = [
+            {'name': 'WordPress', 'status': 'success', 'items_count': 2, 'new_items': 1,
+             'error': None, 'duration_seconds': 1.0},
+            {'name': 'YouTube', 'status': 'success', 'items_count': 1, 'new_items': 1,
+             'error': None, 'duration_seconds': 2.0}
+        ]
+
+        orchestrator.scrapers = mock_scrapers
+        results = orchestrator.run_parallel(max_workers=2)
+
+        assert len(results) == 2
+        mock_pool_class.assert_called_once_with(processes=2)
+        mock_pool.map.assert_called_once()
+
+    def test_save_statistics(self, orchestrator, tmp_path):
+        orchestrator.stats_file = tmp_path / "stats.json"
+
+        results = [
+            {'name': 'WordPress', 'status': 'success', 'items_count': 2,
+             'new_items': 1, 'duration_seconds': 1.0, 'error': None},
+            {'name': 'YouTube', 'status': 'error', 'items_count': 0,
+             'new_items': 0, 'duration_seconds': 0.5, 'error': 'Connection failed'}
+        ]
+
+        orchestrator.save_statistics(results)
+
+        # Check file was created
+        assert orchestrator.stats_file.exists()
+
+        # Load and verify stats
+        with open(orchestrator.stats_file, 'r') as f:
+            stats = json.load(f)
+
+        assert len(stats) == 1
+        assert stats[0]['total_scrapers'] == 2
+        assert stats[0]['successful'] == 1
+        assert stats[0]['failed'] == 1
+        assert stats[0]['total_items'] == 2
+        assert stats[0]['new_items'] == 1
+
+    def test_print_summary(self, orchestrator, capsys):
+        results = [
+            {'name': 'WordPress', 'status': 'success', 'items_count': 2,
+             'new_items': 1, 'duration_seconds': 1.0, 'error': None},
+            {'name': 'YouTube', 'status': 'error', 'items_count': 0,
+             'new_items': 0, 'duration_seconds': 0.5, 'error': 'Connection failed'}
+        ]
+
+        orchestrator.print_summary(results)
+
+        captured = capsys.readouterr()
+        assert "SCRAPING SUMMARY" in captured.out
+        assert "✓ WordPress:" in captured.out
+        assert "✗ YouTube:" in captured.out
+        assert "Successful: 1/2" in captured.out
+        assert "Total items: 2" in captured.out
+
+    @patch('src.orchestrator.ScraperOrchestrator.run_parallel')
+    @patch('src.orchestrator.ScraperOrchestrator.save_statistics')
+    @patch('src.orchestrator.ScraperOrchestrator.print_summary')
+    def test_run_method(self, mock_print, mock_save, mock_parallel, orchestrator):
+        mock_parallel.return_value = [
+            {'name': 'Test', 'status': 'success', 'items_count': 1,
+             'new_items': 1, 'duration_seconds': 1.0, 'error': None}
+        ]
+
+        orchestrator.scrapers = [("Test", MagicMock())]
+        orchestrator.run(parallel=True)
+
+        mock_parallel.assert_called_once_with(None)
+        mock_save.assert_called_once()
+        mock_print.assert_called_once()
\ No newline at end of file
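Both suites can be run without shelling out by invoking pytest programmatically; a small sketch, assuming pytest is installed and the repository root is the working directory:

```python
import sys
import pytest

# -q keeps the output terse; pass the new test modules explicitly.
sys.exit(pytest.main(["-q",
                      "tests/test_instagram_scraper.py",
                      "tests/test_orchestrator.py"]))
```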