#!/usr/bin/env python3
"""
Real-world testing script for all scrapers.
Tests both recent posts and backlog fetching with actual data.
"""

import os
import sys
import json
import time
from pathlib import Path
from datetime import datetime
import argparse

from dotenv import load_dotenv

# Add the script's directory to sys.path so the src package imports resolve
sys.path.insert(0, str(Path(__file__).parent))

from src.base_scraper import ScraperConfig
from src.wordpress_scraper import WordPressScraper
from src.rss_scraper import RSSScraperMailChimp, RSSScraperPodcast
from src.youtube_scraper import YouTubeScraper
from src.instagram_scraper import InstagramScraper
from src.tiktok_scraper_advanced import TikTokScraperAdvanced


def test_scraper(scraper_class, scraper_name, max_items=3, test_type="recent"):
    """Test a single scraper with real data."""
    print(f"\n{'='*60}")
    print(f"Testing {scraper_name} - {test_type} ({max_items} items)")
    print('='*60)

    # Create test directories
    test_data_dir = Path(f"test_data/{test_type}")
    test_logs_dir = Path(f"test_logs/{test_type}")

    config = ScraperConfig(
        source_name=scraper_name.lower().replace(" ", "_"),
        brand_name="hvacknowitall",
        data_dir=test_data_dir,
        logs_dir=test_logs_dir,
        timezone="America/Halifax"
    )

    try:
        # Initialize scraper
        scraper = scraper_class(config)

        # For backlog testing, clear state to fetch all items
        if test_type == "backlog":
            if scraper.state_file.exists():
                scraper.state_file.unlink()
                print(f"Cleared state for {scraper_name} backlog testing")

        # Fetch content with limit
        print(f"Fetching content from {scraper_name}...")
        start_time = time.time()

        # For scrapers that support a max-items-style parameter
        if scraper_name in ["YouTube", "Instagram", "TikTok"]:
            if scraper_name == "YouTube":
                items = scraper.fetch_channel_videos(max_videos=max_items)
            elif scraper_name == "Instagram":
                items = scraper.fetch_content(max_posts=max_items)
            elif scraper_name == "TikTok":
                # For TikTok, optionally fetch captions (only in backlog mode for testing)
                fetch_captions = (test_type == "backlog" and max_items <= 5)
                if fetch_captions:
                    print(f"  Note: Fetching captions for up to {min(max_items, 3)} videos...")
                items = scraper.fetch_content(
                    max_posts=max_items,
                    fetch_captions=fetch_captions,
                    max_caption_fetches=min(max_items, 3)  # Limit to 3 for testing
                )
        else:
            # For RSS and WordPress scrapers - all now support max_items
            items = scraper.fetch_content(max_items=max_items)

        elapsed = time.time() - start_time

        if not items:
            print(f"❌ No items fetched from {scraper_name}")
            return False

        print(f"✅ Fetched {len(items)} items in {elapsed:.2f} seconds")

        # Format as markdown
        markdown = scraper.format_markdown(items)

        # Save to test file
        output_file = test_data_dir / f"{scraper_name.lower()}_{test_type}_test.md"
        output_file.parent.mkdir(parents=True, exist_ok=True)
        with open(output_file, 'w', encoding='utf-8') as f:
            f.write(markdown)
        print(f"✅ Saved to {output_file}")

        # Display summary
        print(f"\nSummary for {scraper_name}:")
        print(f"  - Items fetched: {len(items)}")
        print(f"  - Time taken: {elapsed:.2f}s")
        print(f"  - Output size: {len(markdown)} characters")

        # Display first item details
        if items:
            first_item = items[0]
            print("\nFirst item preview:")

            # Display relevant fields based on scraper type
            if 'title' in first_item:
                title = first_item.get('title', 'N/A')
                # Handle WordPress nested title structure
                if isinstance(title, dict):
                    title = title.get('rendered', 'N/A')
                print(f"  Title: {str(title)[:80]}")
            if 'description' in first_item:
                desc = first_item.get('description', 'N/A')
                if desc:
                    print(f"  Description: {desc[:80]}...")
            if 'caption' in first_item:
                caption = first_item.get('caption', 'N/A')
                if caption:
                    print(f"  Caption: {caption[:80]}...")
            if 'author' in first_item:
                print(f"  Author: {first_item.get('author', 'N/A')}")
            if 'channel' in first_item:
                print(f"  Channel: {first_item.get('channel', 'N/A')}")
            if 'publish_date' in first_item:
                print(f"  Date: {first_item.get('publish_date', 'N/A')}")
            elif 'date' in first_item:
                print(f"  Date: {first_item.get('date', 'N/A')}")
            if 'link' in first_item:
                print(f"  Link: {first_item.get('link', 'N/A')[:80]}")
            elif 'url' in first_item:
                print(f"  URL: {first_item.get('url', 'N/A')[:80]}")

        return True

    except Exception as e:
        print(f"❌ Error testing {scraper_name}: {e}")
        import traceback
        traceback.print_exc()
        return False
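
# Each source below is exercised only when its corresponding environment
# variable is set: WORDPRESS_API_URL, MAILCHIMP_RSS_URL, PODCAST_RSS_URL,
# YOUTUBE_CHANNEL_URL, INSTAGRAM_USERNAME, TIKTOK_USERNAME.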
def run_all_tests(max_items=3, test_type="recent"):
    """Run tests for all configured scrapers."""
    print(f"\n{'#'*60}")
    print(f"# Running {test_type} tests with {max_items} items per source")
    print(f"{'#'*60}")

    results = {}

    # Test WordPress
    if os.getenv('WORDPRESS_API_URL'):
        print("\n🔧 Testing WordPress Scraper")
        results['WordPress'] = test_scraper(WordPressScraper, "WordPress", max_items, test_type)
    else:
        print("\n⚠️ WordPress not configured (WORDPRESS_API_URL missing)")

    # Test MailChimp RSS
    if os.getenv('MAILCHIMP_RSS_URL'):
        print("\n🔧 Testing MailChimp RSS Scraper")
        results['MailChimp'] = test_scraper(RSSScraperMailChimp, "MailChimp", max_items, test_type)
    else:
        print("\n⚠️ MailChimp RSS not configured (MAILCHIMP_RSS_URL missing)")

    # Test Podcast RSS
    if os.getenv('PODCAST_RSS_URL'):
        print("\n🔧 Testing Podcast RSS Scraper")
        results['Podcast'] = test_scraper(RSSScraperPodcast, "Podcast", max_items, test_type)
    else:
        print("\n⚠️ Podcast RSS not configured (PODCAST_RSS_URL missing)")

    # Test YouTube
    if os.getenv('YOUTUBE_CHANNEL_URL'):
        print("\n🔧 Testing YouTube Scraper")
        results['YouTube'] = test_scraper(YouTubeScraper, "YouTube", max_items, test_type)
    else:
        print("\n⚠️ YouTube not configured (YOUTUBE_CHANNEL_URL missing)")

    # Test Instagram
    if os.getenv('INSTAGRAM_USERNAME'):
        print("\n🔧 Testing Instagram Scraper")
        print("⚠️ Note: Instagram may require manual login or rate limiting")
        results['Instagram'] = test_scraper(InstagramScraper, "Instagram", max_items, test_type)
    else:
        print("\n⚠️ Instagram not configured (INSTAGRAM_USERNAME missing)")

    # Test TikTok
    if os.getenv('TIKTOK_USERNAME'):
        print("\n🔧 Testing TikTok Scraper (Advanced with Headed Browser)")
        print("⚠️ Note: TikTok will open a browser window on DISPLAY=:0")
        results['TikTok'] = test_scraper(TikTokScraperAdvanced, "TikTok", max_items, test_type)
    else:
        print("\n⚠️ TikTok not configured (TIKTOK_USERNAME missing)")

    # Print summary
    print(f"\n{'='*60}")
    print(f"TEST SUMMARY - {test_type} ({max_items} items)")
    print('='*60)

    for scraper, success in results.items():
        status = "✅ PASSED" if success else "❌ FAILED"
        print(f"{scraper:15} {status}")

    total = len(results)
    passed = sum(1 for s in results.values() if s)
    print(f"\nTotal: {passed}/{total} passed")

    return all(results.values())
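
# Example invocations (the script filename is assumed here; the flags are
# defined in main below):
#   python test_scrapers.py                            # 3 recent items per source
#   python test_scrapers.py --items 5 --type both      # recent + backlog, 5 items each
#   python test_scrapers.py --source youtube --type backlog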
def main():
    """Main entry point."""
    parser = argparse.ArgumentParser(description="Test scrapers with real data")
    parser.add_argument('--items', type=int, default=3,
                        help='Number of items to fetch per source (default: 3)')
    parser.add_argument('--type', choices=['recent', 'backlog', 'both'], default='recent',
                        help='Test type: recent posts, backlog, or both (default: recent)')
    parser.add_argument('--source', type=str, default=None,
                        help='Test specific source only (wordpress, mailchimp, podcast, youtube, instagram, tiktok)')
    args = parser.parse_args()

    # Load environment variables
    load_dotenv()

    # Determine which tests to run
    if args.type == 'both':
        test_types = ['recent', 'backlog']
    else:
        test_types = [args.type]

    all_passed = True
    for test_type in test_types:
        if args.source:
            # Test specific source
            source_map = {
                'wordpress': (WordPressScraper, "WordPress"),
                'mailchimp': (RSSScraperMailChimp, "MailChimp"),
                'podcast': (RSSScraperPodcast, "Podcast"),
                'youtube': (YouTubeScraper, "YouTube"),
                'instagram': (InstagramScraper, "Instagram"),
                'tiktok': (TikTokScraperAdvanced, "TikTok")
            }
            if args.source.lower() in source_map:
                scraper_class, scraper_name = source_map[args.source.lower()]
                success = test_scraper(scraper_class, scraper_name, args.items, test_type)
                all_passed = all_passed and success
            else:
                print(f"Unknown source: {args.source}")
                all_passed = False
        else:
            # Test all sources
            success = run_all_tests(args.items, test_type)
            all_passed = all_passed and success

    # Exit with appropriate code
    sys.exit(0 if all_passed else 1)


if __name__ == "__main__":
    main()