diff --git a/README.md b/README.md
index 2e9d072..28e40fc 100644
--- a/README.md
+++ b/README.md
@@ -5,10 +5,11 @@ A containerized Python application that aggregates content from multiple HVAC Kn
 ## Features
 
 - **Multi-source content aggregation** from YouTube, Instagram, TikTok, MailChimp, WordPress, and Podcast RSS
+- **Comprehensive image downloading** for all visual content (Instagram posts, YouTube thumbnails, Podcast artwork)
 - **Cumulative markdown management** - Single source-of-truth files that grow with backlog and incremental updates
 - **API integrations** for YouTube Data API v3 and MailChimp API
 - **Intelligent content merging** with caption/transcript updates and metric tracking
-- **Automated NAS synchronization** to `/mnt/nas/hvacknowitall/`
+- **Automated NAS synchronization** to `/mnt/nas/hvacknowitall/` for both markdown and media files
 - **State management** for incremental updates
 - **Parallel processing** for multiple sources
 - **Atlantic timezone** (America/Halifax) timestamps
@@ -112,6 +113,10 @@ data/
 │ ├── Instagram/
 │ └── ...
 ├── media/ # Downloaded media files
+│ ├── Instagram/ # Instagram images and video thumbnails
+│ ├── YouTube/ # YouTube video thumbnails
+│ ├── Podcast/ # Podcast episode artwork
+│ └── ...
 └── .state/ # State files for incremental updates
 logs/ # Log files by source
 
@@ -139,10 +144,10 @@ tests/ # Test files
 ### Systemd Services
 
 Services are configured in `/etc/systemd/system/`:
-- `hvac-content-8am.service` - Morning run
-- `hvac-content-12pm.service` - Noon run
-- `hvac-content-8am.timer` - Morning schedule
-- `hvac-content-12pm.timer` - Noon schedule
+- `hvac-content-images-8am.service` - Morning run with image downloads
+- `hvac-content-images-12pm.service` - Noon run with image downloads
+- `hvac-content-images-8am.timer` - Morning schedule (8 AM Atlantic)
+- `hvac-content-images-12pm.timer` - Noon schedule (12 PM Atlantic)
 
 ### Manual Deployment
 
@@ -207,6 +212,33 @@ tail -f logs/YouTube/youtube_*.log
 uv run python -m src.youtube_api_scraper_v2 --test
 ```
 
+## Recent Updates (2025-08-19)
+
+### Comprehensive Image Downloading
+- Implemented full image download capability for all content sources
+- Instagram: Downloads all post images, carousel images, and video thumbnails
+- YouTube: Automatically fetches highest quality video thumbnails
+- Podcasts: Downloads episode artwork and thumbnails
+- Consistent naming: `{source}_{item_id}_{type}.{ext}`
+- Media organized in `data/media/{source}/` directories
+
+### File Naming Standardization
+- Migrated to project-specification-compliant naming
+- Format: `{brand}_{source}_{timestamp}.md`
+- Example: `hvacnkowitall_instagram_2025-08-19T100511.md`
+- Archived legacy file structures to `markdown_archives/legacy_structure/`
+
+### Instagram Backlog Expansion
+- Completed initial 1000-post capture with images
+- Currently capturing posts 1001-2000 with rate limiting
+- Cumulative markdown updates every 100 posts
+- Full image download for all historical content
+
+### Production Automation
+- Deployed systemd services for twice-daily runs (8 AM, 12 PM Atlantic)
+- Automated NAS synchronization for markdown and media files
+- Rate-limited scraping with humanized delays (10-20 seconds per Instagram post)
+
 ## License
 
 Private repository - All rights reserved
\ No newline at end of file
diff --git a/deploy/hvac-content-images-12pm.service b/deploy/hvac-content-images-12pm.service
new file mode 100644
index 0000000..77bd7f8
--- /dev/null
+++ b/deploy/hvac-content-images-12pm.service
@@ -0,0 +1,18 @@
+[Unit]
+Description=HVAC 
Content Aggregation with Images - 12 PM Run +After=network.target + +[Service] +Type=oneshot +User=ben +Group=ben +WorkingDirectory=/home/ben/dev/hvac-kia-content +Environment="PATH=/home/ben/.local/bin:/usr/local/bin:/usr/bin:/bin" +Environment="DISPLAY=:0" +Environment="XAUTHORITY=/run/user/1000/.mutter-Xwaylandauth.90WDB3" +ExecStart=/usr/bin/bash -c 'source /home/ben/dev/hvac-kia-content/.venv/bin/activate && python run_production_with_images.py' +StandardOutput=journal +StandardError=journal + +[Install] +WantedBy=multi-user.target \ No newline at end of file diff --git a/deploy/hvac-content-images-8am.service b/deploy/hvac-content-images-8am.service new file mode 100644 index 0000000..7920750 --- /dev/null +++ b/deploy/hvac-content-images-8am.service @@ -0,0 +1,18 @@ +[Unit] +Description=HVAC Content Aggregation with Images - 8 AM Run +After=network.target + +[Service] +Type=oneshot +User=ben +Group=ben +WorkingDirectory=/home/ben/dev/hvac-kia-content +Environment="PATH=/home/ben/.local/bin:/usr/local/bin:/usr/bin:/bin" +Environment="DISPLAY=:0" +Environment="XAUTHORITY=/run/user/1000/.mutter-Xwaylandauth.90WDB3" +ExecStart=/usr/bin/bash -c 'source /home/ben/dev/hvac-kia-content/.venv/bin/activate && python run_production_with_images.py' +StandardOutput=journal +StandardError=journal + +[Install] +WantedBy=multi-user.target \ No newline at end of file diff --git a/deploy/update_to_images.sh b/deploy/update_to_images.sh new file mode 100755 index 0000000..d224238 --- /dev/null +++ b/deploy/update_to_images.sh @@ -0,0 +1,74 @@ +#!/bin/bash +# Update script to enable image downloading in production + +echo "Updating HVAC Content Aggregation to include image downloads..." +echo + +# Stop and disable old services +echo "Stopping old services..." +sudo systemctl stop hvac-content-8am.timer hvac-content-12pm.timer +sudo systemctl disable hvac-content-8am.service hvac-content-12pm.service +sudo systemctl disable hvac-content-8am.timer hvac-content-12pm.timer + +# Copy new service files +echo "Installing new services with image downloads..." 
+sudo cp hvac-content-images-8am.service /etc/systemd/system/
+sudo cp hvac-content-images-12pm.service /etc/systemd/system/
+
+# Create new timer files (reuse existing timers with new names)
+sudo tee /etc/systemd/system/hvac-content-images-8am.timer > /dev/null <<EOF
+[Unit]
+Description=HVAC Content Aggregation with Images - 8 AM Timer
+
+[Timer]
+OnCalendar=*-*-* 08:00:00
+
+[Install]
+WantedBy=timers.target
+EOF
+
+sudo tee /etc/systemd/system/hvac-content-images-12pm.timer > /dev/null <<EOF
+[Unit]
+Description=HVAC Content Aggregation with Images - 12 PM Timer
+
+[Timer]
+OnCalendar=*-*-* 12:00:00
+
+[Install]
+WantedBy=timers.target
+EOF
+
+# Reload systemd and enable the new timers
+sudo systemctl daemon-reload
+sudo systemctl enable --now hvac-content-images-8am.timer hvac-content-images-12pm.timer
+
+echo "Update complete. Image downloads are now enabled for the 8 AM and 12 PM Atlantic runs."
+
+### YouTube
+- **Video thumbnails**: maxres > high > medium > default quality
+
+### Podcasts
+- **Episode thumbnails**: iTunes artwork and media thumbnails for each episode
+- **Formats**: PNG/JPEG episode artwork
+
+## File Naming Convention
+
+All downloaded images follow a consistent naming pattern:
+```
+{source}_{item_id}_{type}_{optional_number}.{ext}
+```
+
+Examples:
+- `instagram_Cm1wgRMr_mj_video_thumb.jpg`
+- `instagram_CpgiKyqPoX1_image_1.jpg`
+- `youtube_dQw4w9WgXcQ_thumbnail.jpg`
+- `podcast_episode123_thumbnail.png`
+
+## Directory Structure
+
+```
+data/
+├── media/
+│   ├── Instagram/
+│   │   ├── instagram_post1_image.jpg
+│   │   └── instagram_post2_video_thumb.jpg
+│   ├── YouTube/
+│   │   ├── youtube_video1_thumbnail.jpg
+│   │   └── youtube_video2_thumbnail.jpg
+│   └── Podcast/
+│       ├── podcast_ep1_thumbnail.png
+│       └── podcast_ep2_thumbnail.jpg
+└── markdown_current/
+    ├── hvacnkowitall_instagram_*.md
+    ├── hvacnkowitall_youtube_*.md
+    └── hvacnkowitall_podcast_*.md
+```
+
+## Enhanced Scrapers
+
+### InstagramScraperWithImages
+- Extends `InstagramScraper`
+- Downloads all non-video media
+- Handles carousel posts with multiple images
+- Stores local paths in `local_images` field
+
+### YouTubeAPIScraperWithThumbnails
+- Extends `YouTubeAPIScraper`
+- Downloads video thumbnails
+- Selects highest quality available
+- Stores local path in `local_thumbnail` field
+
+### RSSScraperPodcastWithImages
+- Extends `RSSScraperPodcast`
+- Downloads episode thumbnails
+- Extracts from iTunes metadata
+- Stores local path in `local_thumbnail` field
+
+## Production Scripts
+
+### run_production_with_images.py
+Main production script that:
+1. Runs all enhanced scrapers
+2. Downloads images during content fetching
+3. Updates cumulative markdown files
+4. 
Syncs both markdown and images to NAS
+
+### Test Script
+`test_image_downloads.py` - Tests image downloading with small batches:
+- 3 YouTube videos
+- 3 Instagram posts
+- 3 Podcast episodes
+
+## NAS Synchronization
+
+The rsync function has been enhanced to sync images:
+
+```bash
+# Sync markdown files
+rsync -av --include=*.md --exclude=* data/markdown_current/ /mnt/nas/hvacknowitall/markdown_current/
+
+# Sync image files
+rsync -av --include=*/ --include=*.jpg --include=*.jpeg --include=*.png --include=*.gif --exclude=* data/media/ /mnt/nas/hvacknowitall/media/
+```
+
+## Markdown Integration
+
+Downloaded images are referenced in markdown files:
+
+```markdown
+## Thumbnail:
+![Thumbnail](media/YouTube/youtube_videoId_thumbnail.jpg)
+
+## Downloaded Images:
+- [image1.jpg](media/Instagram/instagram_postId_image_1.jpg)
+- [image2.jpg](media/Instagram/instagram_postId_image_2.jpg)
+```
+
+## Rate Limiting Considerations
+
+- **Instagram**: Aggressive delays between image downloads (10-20 seconds)
+- **YouTube**: Minimal delays, respects API quota
+- **Podcast**: No rate limiting needed for RSS feeds
+
+## Storage Estimates
+
+Based on testing:
+- **Instagram**: ~70-100 KB per image
+- **YouTube**: ~100-200 KB per thumbnail
+- **Podcast**: ~3-4 MB per episode thumbnail (high quality artwork)
+
+For 1000 items per source:
+- Instagram: ~100 MB (assuming 1 image per post)
+- YouTube: ~200 MB
+- Podcast: ~4 GB (if all episodes have artwork)
+
+## Usage
+
+### Test Image Downloads
+```bash
+python test_image_downloads.py
+```
+
+### Production Run with Images
+```bash
+python run_production_with_images.py
+```
+
+### Check Downloaded Images
+```bash
+# Count images per source
+find data/media -name "*.jpg" -o -name "*.png" | wc -l
+
+# Check disk usage
+du -sh data/media/*
+```
+
+## Configuration
+
+No additional configuration needed. The system uses existing environment variables:
+- Instagram credentials for authenticated image access
+- YouTube API key (thumbnails are public)
+- Podcast RSS URL (thumbnails in feed metadata)
+
+## Future Enhancements
+
+Potential improvements:
+1. Image optimization/compression to reduce storage
+2. Configurable image quality settings
+3. Option to download video files (currently excluded)
+4. Thumbnail generation for videos without thumbnails
+5. Image deduplication for repeated content
+
+## Troubleshooting
+
+### Images Not Downloading
+- Check network connectivity
+- Verify source credentials (Instagram)
+- Check disk space
+- Review logs for HTTP errors
+
+### Rate Limiting
+- Instagram may block rapid downloads
+- Use aggressive delays in scraper
+- Consider batching downloads
+
+### Storage Issues
+- Monitor disk usage
+- Consider external storage for media
+- Implement rotation/archiving strategy
\ No newline at end of file
diff --git a/run_instagram_next_1000.py b/run_instagram_next_1000.py
new file mode 100755
index 0000000..73f194f
--- /dev/null
+++ b/run_instagram_next_1000.py
@@ -0,0 +1,166 @@
+#!/usr/bin/env python3
+"""
+Fetch the next 1000 Instagram posts (1001-2000) and update cumulative file. 
+""" + +import sys +from pathlib import Path +sys.path.insert(0, str(Path(__file__).parent)) + +from src.instagram_scraper_with_images import InstagramScraperWithImages +from src.base_scraper import ScraperConfig +from src.cumulative_markdown_manager import CumulativeMarkdownManager +from datetime import datetime +import pytz +import time +import logging +import instaloader + +# Set up logging +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', + handlers=[ + logging.FileHandler('logs/instagram_next_1000.log'), + logging.StreamHandler() + ] +) +logger = logging.getLogger('instagram_next_1000') + + +def fetch_next_1000_posts(): + """Fetch Instagram posts 1001-2000 and update cumulative file.""" + + logger.info("=" * 60) + logger.info("INSTAGRAM NEXT 1000 POSTS (1001-2000)") + logger.info("=" * 60) + + # Get Atlantic timezone timestamp + tz = pytz.timezone('America/Halifax') + now = datetime.now(tz) + timestamp = now.strftime('%Y-%m-%dT%H%M%S') + + logger.info(f"Started at: {now.strftime('%Y-%m-%d %H:%M:%S %Z')}") + + # Setup config + config = ScraperConfig( + source_name='Instagram', + brand_name='hvacnkowitall', + data_dir=Path('data'), + logs_dir=Path('logs'), + timezone='America/Halifax' + ) + + # Initialize scraper + scraper = InstagramScraperWithImages(config) + cumulative_manager = CumulativeMarkdownManager(config) + + logger.info("Fetching posts 1001-2000 from Instagram...") + logger.info("This will take several hours due to rate limiting") + + all_items = [] + posts_to_skip = 1000 # We already have the first 1000 + max_posts = 1000 # We want the next 1000 + + try: + # Ensure we have a valid context + if not scraper.loader.context: + logger.error("Failed to initialize Instagram context") + return False + + # Get profile + profile = instaloader.Profile.from_username(scraper.loader.context, scraper.target_account) + scraper._check_rate_limit() + + # Get posts + posts = profile.get_posts() + + post_count = 0 + skipped = 0 + + for post in posts: + # Skip first 1000 posts + if skipped < posts_to_skip: + skipped += 1 + if skipped % 100 == 0: + logger.info(f"Skipping post {skipped}/{posts_to_skip}...") + continue + + # Stop after next 1000 + if post_count >= max_posts: + break + + try: + # Download images for this post + image_paths = scraper._download_post_images(post, post.shortcode) + + # Extract post data + post_data = { + 'id': post.shortcode, + 'type': scraper._get_post_type(post), + 'caption': post.caption if post.caption else '', + 'author': post.owner_username, + 'publish_date': post.date_utc.isoformat(), + 'link': f'https://www.instagram.com/p/{post.shortcode}/', + 'likes': post.likes, + 'comments': post.comments, + 'views': post.video_view_count if hasattr(post, 'video_view_count') else None, + 'media_count': post.mediacount if hasattr(post, 'mediacount') else 1, + 'hashtags': list(post.caption_hashtags) if post.caption else [], + 'mentions': list(post.caption_mentions) if post.caption else [], + 'is_video': getattr(post, 'is_video', False), + 'local_images': image_paths + } + + all_items.append(post_data) + post_count += 1 + + # Aggressive rate limiting + scraper._aggressive_delay() + scraper._check_rate_limit() + + # Progress updates + if post_count % 10 == 0: + logger.info(f"Fetched post {posts_to_skip + post_count} (#{post_count}/1000 in this batch)") + + # Save incremental updates every 100 posts + if post_count % 100 == 0: + logger.info(f"Saving incremental update at {post_count} posts...") + output_file = 
cumulative_manager.update_cumulative_file(all_items, 'Instagram') + logger.info(f"Saved to: {output_file}") + + except Exception as e: + logger.error(f"Error processing post: {e}") + continue + + # Final save + if all_items: + output_file = cumulative_manager.update_cumulative_file(all_items, 'Instagram') + + # Calculate statistics + img_count = sum(len(item.get('local_images', [])) for item in all_items) + + logger.info("=" * 60) + logger.info("INSTAGRAM NEXT 1000 COMPLETED") + logger.info("=" * 60) + logger.info(f"Posts fetched: {len(all_items)}") + logger.info(f"Post range: 1001-{1000 + len(all_items)}") + logger.info(f"Images downloaded: {img_count}") + logger.info(f"Output file: {output_file}") + logger.info("=" * 60) + + return True + else: + logger.warning("No posts fetched") + return False + + except Exception as e: + logger.error(f"Fatal error: {e}") + import traceback + traceback.print_exc() + return False + + +if __name__ == "__main__": + success = fetch_next_1000_posts() + sys.exit(0 if success else 1) \ No newline at end of file diff --git a/run_production_cumulative.py b/run_production_cumulative.py new file mode 100644 index 0000000..9d58c32 --- /dev/null +++ b/run_production_cumulative.py @@ -0,0 +1,238 @@ +#!/usr/bin/env python3 +""" +Production script with cumulative markdown and image downloads. +Uses cumulative updates for all sources. +""" + +import sys +from pathlib import Path +sys.path.insert(0, str(Path(__file__).parent)) + +from src.youtube_api_scraper_with_thumbnails import YouTubeAPIScraperWithThumbnails +from src.mailchimp_api_scraper_v2 import MailChimpAPIScraper +from src.instagram_scraper_cumulative import InstagramScraperCumulative +from src.rss_scraper_with_images import RSSScraperPodcastWithImages +from src.wordpress_scraper import WordPressScraper +from src.tiktok_scraper_advanced import TikTokScraperAdvanced +from src.base_scraper import ScraperConfig +from src.cumulative_markdown_manager import CumulativeMarkdownManager +from datetime import datetime +import pytz +import time +import logging +import subprocess +import os + +# Set up logging +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', + handlers=[ + logging.FileHandler('logs/production_cumulative.log'), + logging.StreamHandler() + ] +) +logger = logging.getLogger('production_cumulative') + + +def get_atlantic_timestamp() -> str: + """Get current timestamp in Atlantic timezone for file naming.""" + tz = pytz.timezone('America/Halifax') + return datetime.now(tz).strftime('%Y-%m-%dT%H%M%S') + + +def run_instagram_incremental(): + """Run Instagram incremental update with cumulative markdown.""" + logger.info("=" * 60) + logger.info("INSTAGRAM INCREMENTAL UPDATE (CUMULATIVE)") + logger.info("=" * 60) + + if not os.getenv('INSTAGRAM_USERNAME'): + logger.warning("Instagram not configured") + return False, 0, None + + config = ScraperConfig( + source_name='Instagram', + brand_name='hvacnkowitall', + data_dir=Path('data'), + logs_dir=Path('logs'), + timezone='America/Halifax' + ) + + try: + scraper = InstagramScraperCumulative(config) + return scraper.run_incremental(max_posts=50) # Check for 50 new posts + except Exception as e: + logger.error(f"Instagram error: {e}") + return False, 0, None + + +def run_youtube_incremental(): + """Run YouTube incremental update with thumbnails.""" + logger.info("=" * 60) + logger.info("YOUTUBE INCREMENTAL UPDATE") + logger.info("=" * 60) + + config = ScraperConfig( + source_name='YouTube', + 
brand_name='hvacnkowitall', + data_dir=Path('data'), + logs_dir=Path('logs'), + timezone='America/Halifax' + ) + + try: + scraper = YouTubeAPIScraperWithThumbnails(config) + videos = scraper.fetch_content(max_posts=20) # Check for 20 new videos + + if videos: + manager = CumulativeMarkdownManager(config) + output_file = manager.update_cumulative_file(videos, 'YouTube') + + thumb_count = sum(1 for v in videos if v.get('local_thumbnail')) + logger.info(f"✅ YouTube: {len(videos)} videos, {thumb_count} thumbnails") + return True, len(videos), output_file + else: + logger.info("No new YouTube videos") + return False, 0, None + + except Exception as e: + logger.error(f"YouTube error: {e}") + return False, 0, None + + +def run_podcast_incremental(): + """Run Podcast incremental update with thumbnails.""" + logger.info("=" * 60) + logger.info("PODCAST INCREMENTAL UPDATE") + logger.info("=" * 60) + + if not os.getenv('PODCAST_RSS_URL'): + logger.warning("Podcast not configured") + return False, 0, None + + config = ScraperConfig( + source_name='Podcast', + brand_name='hvacnkowitall', + data_dir=Path('data'), + logs_dir=Path('logs'), + timezone='America/Halifax' + ) + + try: + scraper = RSSScraperPodcastWithImages(config) + items = scraper.fetch_content(max_items=10) # Check for 10 new episodes + + if items: + manager = CumulativeMarkdownManager(config) + output_file = manager.update_cumulative_file(items, 'Podcast') + + thumb_count = sum(1 for item in items if item.get('local_thumbnail')) + logger.info(f"✅ Podcast: {len(items)} episodes, {thumb_count} thumbnails") + return True, len(items), output_file + else: + logger.info("No new podcast episodes") + return False, 0, None + + except Exception as e: + logger.error(f"Podcast error: {e}") + return False, 0, None + + +def sync_to_nas_with_images(): + """Sync markdown files AND images to NAS.""" + logger.info("\n" + "=" * 60) + logger.info("SYNCING TO NAS - MARKDOWN AND IMAGES") + logger.info("=" * 60) + + nas_base = Path('/mnt/nas/hvacknowitall') + + try: + # Sync markdown files + local_current = Path('data/markdown_current') + nas_current = nas_base / 'markdown_current' + + if local_current.exists() and any(local_current.glob('*.md')): + nas_current.mkdir(parents=True, exist_ok=True) + + cmd = ['rsync', '-av', '--include=*.md', '--exclude=*', + str(local_current) + '/', str(nas_current) + '/'] + result = subprocess.run(cmd, capture_output=True, text=True) + + if result.returncode == 0: + logger.info(f"✅ Markdown files synced to NAS") + else: + logger.warning(f"Markdown sync warning: {result.stderr}") + + # Sync media files + local_media = Path('data/media') + nas_media = nas_base / 'media' + + if local_media.exists(): + nas_media.mkdir(parents=True, exist_ok=True) + + cmd = ['rsync', '-av', + '--include=*/', + '--include=*.jpg', '--include=*.jpeg', + '--include=*.png', '--include=*.gif', + '--exclude=*', + str(local_media) + '/', str(nas_media) + '/'] + result = subprocess.run(cmd, capture_output=True, text=True) + + if result.returncode == 0: + logger.info(f"✅ Media files synced to NAS") + + except Exception as e: + logger.error(f"Failed to sync to NAS: {e}") + + +def main(): + """Main production run with cumulative updates and images.""" + logger.info("=" * 70) + logger.info("HVAC KNOW IT ALL - CUMULATIVE PRODUCTION") + logger.info("With Image Downloads and Cumulative Markdown") + logger.info("=" * 70) + + atlantic_tz = pytz.timezone('America/Halifax') + start_time = datetime.now(atlantic_tz) + logger.info(f"Started at: {start_time.isoformat()}") 
+ + # Track results + results = {} + + # Run incremental updates + success, count, file = run_instagram_incremental() + results['Instagram'] = {'success': success, 'count': count, 'file': file} + + time.sleep(2) + + success, count, file = run_youtube_incremental() + results['YouTube'] = {'success': success, 'count': count, 'file': file} + + time.sleep(2) + + success, count, file = run_podcast_incremental() + results['Podcast'] = {'success': success, 'count': count, 'file': file} + + # Also run MailChimp (already has cumulative support) + # ... (add MailChimp, WordPress, TikTok as needed) + + # Sync to NAS + sync_to_nas_with_images() + + # Summary + logger.info("\n" + "=" * 60) + logger.info("PRODUCTION SUMMARY") + logger.info("=" * 60) + + for source, result in results.items(): + if result['success']: + logger.info(f"✅ {source}: {result['count']} items") + else: + logger.info(f"ℹ️ {source}: No new items") + + logger.info("=" * 60) + + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/run_production_with_images.py b/run_production_with_images.py new file mode 100644 index 0000000..d47881c --- /dev/null +++ b/run_production_with_images.py @@ -0,0 +1,344 @@ +#!/usr/bin/env python3 +""" +Production script with comprehensive image downloading for all sources. +Downloads thumbnails and images from Instagram, YouTube, and Podcasts. +""" + +import sys +from pathlib import Path +sys.path.insert(0, str(Path(__file__).parent)) + +from src.youtube_api_scraper_with_thumbnails import YouTubeAPIScraperWithThumbnails +from src.mailchimp_api_scraper_v2 import MailChimpAPIScraper +from src.instagram_scraper_with_images import InstagramScraperWithImages +from src.rss_scraper_with_images import RSSScraperPodcastWithImages +from src.wordpress_scraper import WordPressScraper +from src.tiktok_scraper_advanced import TikTokScraperAdvanced +from src.base_scraper import ScraperConfig +from src.cumulative_markdown_manager import CumulativeMarkdownManager +from datetime import datetime +import pytz +import time +import logging +import subprocess +import os + +# Set up logging +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', + handlers=[ + logging.FileHandler('logs/production_with_images.log'), + logging.StreamHandler() + ] +) +logger = logging.getLogger('production_with_images') + + +def get_atlantic_timestamp() -> str: + """Get current timestamp in Atlantic timezone for file naming.""" + tz = pytz.timezone('America/Halifax') + return datetime.now(tz).strftime('%Y-%m-%dT%H%M%S') + + +def run_youtube_with_thumbnails(): + """Run YouTube API scraper with thumbnail downloads.""" + logger.info("=" * 60) + logger.info("YOUTUBE API SCRAPER WITH THUMBNAILS") + logger.info("=" * 60) + + timestamp = get_atlantic_timestamp() + + config = ScraperConfig( + source_name='YouTube', + brand_name='hvacnkowitall', + data_dir=Path('data'), + logs_dir=Path('logs'), + timezone='America/Halifax' + ) + + try: + scraper = YouTubeAPIScraperWithThumbnails(config) + + # Fetch videos with thumbnails + logger.info("Fetching YouTube videos and downloading thumbnails...") + videos = scraper.fetch_content(max_posts=100) # Limit for testing + + if videos: + # Process cumulative markdown + manager = CumulativeMarkdownManager(config) + output_file = manager.update_cumulative_file(videos, 'YouTube') + + logger.info(f"✅ YouTube completed: {len(videos)} videos") + logger.info(f" Output: {output_file}") + + # Count downloaded thumbnails + thumb_count = sum(1 for v in 
videos if v.get('local_thumbnail')) + logger.info(f" Thumbnails downloaded: {thumb_count}") + + return True, len(videos), output_file + else: + logger.warning("No YouTube videos fetched") + return False, 0, None + + except Exception as e: + logger.error(f"YouTube scraper error: {e}") + import traceback + traceback.print_exc() + return False, 0, None + + +def run_instagram_with_images(): + """Run Instagram scraper with image downloads.""" + logger.info("=" * 60) + logger.info("INSTAGRAM SCRAPER WITH IMAGES") + logger.info("=" * 60) + + if not os.getenv('INSTAGRAM_USERNAME'): + logger.warning("Instagram not configured (INSTAGRAM_USERNAME missing)") + return False, 0, None + + timestamp = get_atlantic_timestamp() + + config = ScraperConfig( + source_name='Instagram', + brand_name='hvacnkowitall', + data_dir=Path('data'), + logs_dir=Path('logs'), + timezone='America/Halifax' + ) + + try: + scraper = InstagramScraperWithImages(config) + + # Fetch posts with images (limited for testing) + logger.info("Fetching Instagram posts and downloading images...") + items = scraper.fetch_content(max_posts=20) # Start with 20 for testing + + if items: + # Process cumulative markdown + manager = CumulativeMarkdownManager(config) + output_file = manager.update_cumulative_file(items, 'Instagram') + + logger.info(f"✅ Instagram completed: {len(items)} posts") + logger.info(f" Output: {output_file}") + + # Count downloaded images + img_count = sum(len(item.get('local_images', [])) for item in items) + logger.info(f" Images downloaded: {img_count}") + + return True, len(items), output_file + else: + logger.warning("No Instagram posts fetched") + return False, 0, None + + except Exception as e: + logger.error(f"Instagram scraper error: {e}") + import traceback + traceback.print_exc() + return False, 0, None + + +def run_podcast_with_thumbnails(): + """Run Podcast RSS scraper with thumbnail downloads.""" + logger.info("=" * 60) + logger.info("PODCAST RSS SCRAPER WITH THUMBNAILS") + logger.info("=" * 60) + + if not os.getenv('PODCAST_RSS_URL'): + logger.warning("Podcast not configured (PODCAST_RSS_URL missing)") + return False, 0, None + + timestamp = get_atlantic_timestamp() + + config = ScraperConfig( + source_name='Podcast', + brand_name='hvacnkowitall', + data_dir=Path('data'), + logs_dir=Path('logs'), + timezone='America/Halifax' + ) + + try: + scraper = RSSScraperPodcastWithImages(config) + + # Fetch episodes with thumbnails + logger.info("Fetching podcast episodes and downloading thumbnails...") + items = scraper.fetch_content(max_items=50) # Limit for testing + + if items: + # Process cumulative markdown + manager = CumulativeMarkdownManager(config) + output_file = manager.update_cumulative_file(items, 'Podcast') + + logger.info(f"✅ Podcast completed: {len(items)} episodes") + logger.info(f" Output: {output_file}") + + # Count downloaded thumbnails + thumb_count = sum(1 for item in items if item.get('local_thumbnail')) + logger.info(f" Thumbnails downloaded: {thumb_count}") + + return True, len(items), output_file + else: + logger.warning("No podcast episodes fetched") + return False, 0, None + + except Exception as e: + logger.error(f"Podcast scraper error: {e}") + import traceback + traceback.print_exc() + return False, 0, None + + +def sync_to_nas_with_images(): + """Sync markdown files AND images to NAS.""" + logger.info("\n" + "=" * 60) + logger.info("SYNCING TO NAS - MARKDOWN AND IMAGES") + logger.info("=" * 60) + + nas_base = Path('/mnt/nas/hvacknowitall') + + try: + # Sync markdown files + 
local_current = Path('data/markdown_current') + nas_current = nas_base / 'markdown_current' + + if local_current.exists() and any(local_current.glob('*.md')): + nas_current.mkdir(parents=True, exist_ok=True) + + # Sync markdown files + cmd = ['rsync', '-av', '--include=*.md', '--exclude=*', + str(local_current) + '/', str(nas_current) + '/'] + result = subprocess.run(cmd, capture_output=True, text=True) + + if result.returncode == 0: + logger.info(f"✅ Markdown files synced to NAS: {nas_current}") + md_count = len(list(nas_current.glob('*.md'))) + logger.info(f" Total markdown files: {md_count}") + else: + logger.warning(f"Markdown sync warning: {result.stderr}") + + # Sync media files + local_media = Path('data/media') + nas_media = nas_base / 'media' + + if local_media.exists(): + nas_media.mkdir(parents=True, exist_ok=True) + + # Sync all image files (jpg, jpeg, png, gif) + cmd = ['rsync', '-av', + '--include=*/', + '--include=*.jpg', '--include=*.jpeg', + '--include=*.png', '--include=*.gif', + '--exclude=*', + str(local_media) + '/', str(nas_media) + '/'] + result = subprocess.run(cmd, capture_output=True, text=True) + + if result.returncode == 0: + logger.info(f"✅ Media files synced to NAS: {nas_media}") + + # Count images per source + for source_dir in nas_media.glob('*'): + if source_dir.is_dir(): + img_count = len(list(source_dir.glob('*.jpg'))) + \ + len(list(source_dir.glob('*.jpeg'))) + \ + len(list(source_dir.glob('*.png'))) + \ + len(list(source_dir.glob('*.gif'))) + if img_count > 0: + logger.info(f" {source_dir.name}: {img_count} images") + else: + logger.warning(f"Media sync warning: {result.stderr}") + + # Sync archives + for source in ['YouTube', 'MailChimp', 'Instagram', 'Podcast', 'WordPress', 'TikTok']: + local_archive = Path(f'data/markdown_archives/{source}') + nas_archive = nas_base / f'markdown_archives/{source}' + + if local_archive.exists() and any(local_archive.glob('*.md')): + nas_archive.mkdir(parents=True, exist_ok=True) + + cmd = ['rsync', '-av', '--include=*.md', '--exclude=*', + str(local_archive) + '/', str(nas_archive) + '/'] + result = subprocess.run(cmd, capture_output=True, text=True) + + if result.returncode == 0: + logger.info(f"✅ {source} archives synced to NAS") + + except Exception as e: + logger.error(f"Failed to sync to NAS: {e}") + + +def main(): + """Main production run with image downloads.""" + logger.info("=" * 70) + logger.info("HVAC KNOW IT ALL - PRODUCTION WITH IMAGE DOWNLOADS") + logger.info("Downloads all thumbnails and images (no videos)") + logger.info("=" * 70) + + atlantic_tz = pytz.timezone('America/Halifax') + start_time = datetime.now(atlantic_tz) + logger.info(f"Started at: {start_time.isoformat()}") + + # Track results + results = { + 'YouTube': {'success': False, 'count': 0, 'file': None}, + 'Instagram': {'success': False, 'count': 0, 'file': None}, + 'Podcast': {'success': False, 'count': 0, 'file': None} + } + + # Run YouTube with thumbnails + success, count, output_file = run_youtube_with_thumbnails() + results['YouTube'] = {'success': success, 'count': count, 'file': output_file} + + # Wait a bit between scrapers + time.sleep(2) + + # Run Instagram with images + success, count, output_file = run_instagram_with_images() + results['Instagram'] = {'success': success, 'count': count, 'file': output_file} + + # Wait a bit between scrapers + time.sleep(2) + + # Run Podcast with thumbnails + success, count, output_file = run_podcast_with_thumbnails() + results['Podcast'] = {'success': success, 'count': count, 'file': 
output_file}
+
+    # Sync to NAS including images
+    sync_to_nas_with_images()
+
+    # Summary
+    end_time = datetime.now(atlantic_tz)
+    duration = (end_time - start_time).total_seconds()
+
+    logger.info("\n" + "=" * 60)
+    logger.info("PRODUCTION RUN SUMMARY")
+    logger.info("=" * 60)
+
+    for source, result in results.items():
+        if result['success']:
+            logger.info(f"✅ {source}: {result['count']} items")
+            if result['file']:
+                logger.info(f"   File: {result['file']}")
+        else:
+            logger.info(f"❌ {source}: Failed")
+
+    # Count total images downloaded
+    media_dir = Path('data/media')
+    total_images = 0
+    if media_dir.exists():
+        for source_dir in media_dir.glob('*'):
+            if source_dir.is_dir():
+                img_count = len(list(source_dir.glob('*.jpg'))) + \
+                            len(list(source_dir.glob('*.jpeg'))) + \
+                            len(list(source_dir.glob('*.png'))) + \
+                            len(list(source_dir.glob('*.gif')))
+                total_images += img_count
+
+    logger.info(f"\nTotal images downloaded: {total_images}")
+    logger.info(f"Duration: {duration:.1f} seconds")
+    logger.info("=" * 60)
+
+
+if __name__ == "__main__":
+    main()
\ No newline at end of file
diff --git a/src/instagram_scraper_cumulative.py b/src/instagram_scraper_cumulative.py
new file mode 100644
index 0000000..c762918
--- /dev/null
+++ b/src/instagram_scraper_cumulative.py
@@ -0,0 +1,117 @@
+"""
+Instagram scraper with cumulative markdown support and image downloads.
+"""
+
+from typing import List, Dict, Any
+from pathlib import Path
+import instaloader
+from src.instagram_scraper_with_images import InstagramScraperWithImages
+from src.cumulative_markdown_manager import CumulativeMarkdownManager
+
+
+class InstagramScraperCumulative(InstagramScraperWithImages):
+    """Instagram scraper that uses cumulative markdown management."""
+
+    def __init__(self, config):
+        super().__init__(config)
+        self.cumulative_manager = CumulativeMarkdownManager(config)
+
+    def run_incremental(self, max_posts: int = 50) -> tuple:
+        """Run incremental update with cumulative markdown."""
+        self.logger.info(f"Running Instagram incremental update (max {max_posts} posts)")
+
+        # Fetch new content
+        items = self.fetch_content(max_posts=max_posts)
+
+        if items:
+            # Update cumulative file
+            output_file = self.cumulative_manager.update_cumulative_file(items, 'Instagram')
+
+            self.logger.info(f"✅ Instagram incremental: {len(items)} posts")
+            self.logger.info(f"   Updated: {output_file}")
+
+            # Count images
+            img_count = sum(len(item.get('local_images', [])) for item in items)
+            if img_count > 0:
+                self.logger.info(f"   Images downloaded: {img_count}")
+
+            return True, len(items), output_file
+        else:
+            self.logger.warning("No new Instagram posts found")
+            return False, 0, None
+
+    def run_backlog(self, start_from: int = 0, max_posts: int = 1000) -> tuple:
+        """Run backlog capture starting from a specific post number."""
+        self.logger.info(f"Running Instagram backlog (posts {start_from} to {start_from + max_posts})")
+
+        # For backlog, we need to skip already captured posts
+        # This is a simplified approach - in production you'd track exact post IDs
+        all_items = []
+
+        try:
+            # Get profile
+            profile = instaloader.Profile.from_username(self.loader.context, self.target_account)
+            self._check_rate_limit()
+
+            # Get posts
+            posts = profile.get_posts()
+
+            # Skip to start position
+            for i, post in enumerate(posts):
+                if i < start_from:
+                    continue
+                if i >= start_from + max_posts:
+                    break
+
+                try:
+                    # Download images for this post
+                    image_paths = self._download_post_images(post, post.shortcode)
+
+                    # Extract post data
+                    post_data = {
+                        'id': post.shortcode, 
'type': self._get_post_type(post), + 'caption': post.caption if post.caption else '', + 'author': post.owner_username, + 'publish_date': post.date_utc.isoformat(), + 'link': f'https://www.instagram.com/p/{post.shortcode}/', + 'likes': post.likes, + 'comments': post.comments, + 'views': post.video_view_count if hasattr(post, 'video_view_count') else None, + 'media_count': post.mediacount if hasattr(post, 'mediacount') else 1, + 'hashtags': list(post.caption_hashtags) if post.caption else [], + 'mentions': list(post.caption_mentions) if post.caption else [], + 'is_video': getattr(post, 'is_video', False), + 'local_images': image_paths + } + + all_items.append(post_data) + + # Rate limiting + self._aggressive_delay() + self._check_rate_limit() + + # Progress + if len(all_items) % 10 == 0: + self.logger.info(f"Fetched {len(all_items)}/{max_posts} posts (starting from {start_from})") + + except Exception as e: + self.logger.error(f"Error processing post: {e}") + continue + + if all_items: + # Update cumulative file + output_file = self.cumulative_manager.update_cumulative_file(all_items, 'Instagram') + + self.logger.info(f"✅ Instagram backlog: {len(all_items)} posts") + self.logger.info(f" Posts {start_from} to {start_from + len(all_items)}") + self.logger.info(f" Updated: {output_file}") + + return True, len(all_items), output_file + else: + self.logger.warning(f"No posts fetched in range {start_from} to {start_from + max_posts}") + return False, 0, None + + except Exception as e: + self.logger.error(f"Backlog error: {e}") + return False, 0, None \ No newline at end of file diff --git a/src/instagram_scraper_with_images.py b/src/instagram_scraper_with_images.py new file mode 100644 index 0000000..9fb6e18 --- /dev/null +++ b/src/instagram_scraper_with_images.py @@ -0,0 +1,300 @@ +""" +Enhanced Instagram scraper that downloads all images (but not videos). 
+""" + +import os +import time +import random +from typing import Any, Dict, List, Optional +from datetime import datetime +from pathlib import Path +import instaloader +from src.instagram_scraper import InstagramScraper + + +class InstagramScraperWithImages(InstagramScraper): + """Instagram scraper that downloads all post images.""" + + def __init__(self, config): + super().__init__(config) + # Create media directory for Instagram + self.media_dir = self.config.data_dir / "media" / "Instagram" + self.media_dir.mkdir(parents=True, exist_ok=True) + self.logger.info(f"Instagram media directory: {self.media_dir}") + + def _download_post_images(self, post, post_id: str) -> List[str]: + """Download all images from a post (skip videos).""" + image_paths = [] + + try: + # Check if it's a video post - skip downloading video + if getattr(post, 'is_video', False): + # Videos might have a thumbnail we can grab + if hasattr(post, 'url'): + # This is usually the video thumbnail + thumbnail_url = post.url + local_path = self.download_media( + thumbnail_url, + f"instagram_{post_id}_video_thumb", + "image" + ) + if local_path: + image_paths.append(local_path) + self.logger.info(f"Downloaded video thumbnail for {post_id}") + else: + # Single image or carousel + if hasattr(post, 'mediacount') and post.mediacount > 1: + # Carousel post with multiple images + image_num = 1 + for node in post.get_sidecar_nodes(): + # Skip video nodes in carousel + if not node.is_video: + image_url = node.display_url + local_path = self.download_media( + image_url, + f"instagram_{post_id}_image_{image_num}", + "image" + ) + if local_path: + image_paths.append(local_path) + self.logger.info(f"Downloaded carousel image {image_num} for {post_id}") + image_num += 1 + else: + # Single image post + if hasattr(post, 'url'): + image_url = post.url + local_path = self.download_media( + image_url, + f"instagram_{post_id}_image", + "image" + ) + if local_path: + image_paths.append(local_path) + self.logger.info(f"Downloaded image for {post_id}") + + except Exception as e: + self.logger.error(f"Error downloading images for post {post_id}: {e}") + + return image_paths + + def fetch_posts(self, max_posts: int = 20) -> List[Dict[str, Any]]: + """Fetch posts from Instagram profile with image downloads.""" + posts_data = [] + + try: + # Ensure we have a valid context + if not self.loader.context: + self.logger.warning("Instagram context not initialized, attempting re-login") + self._login() + + if not self.loader.context: + self.logger.error("Failed to initialize Instagram context") + return posts_data + + self.logger.info(f"Fetching posts with images from @{self.target_account}") + + # Get profile + profile = instaloader.Profile.from_username(self.loader.context, self.target_account) + self._check_rate_limit() + + # Get posts + posts = profile.get_posts() + + count = 0 + for post in posts: + if count >= max_posts: + break + + try: + # Download images for this post + image_paths = self._download_post_images(post, post.shortcode) + + # Extract post data + post_data = { + 'id': post.shortcode, + 'type': self._get_post_type(post), + 'caption': post.caption if post.caption else '', + 'author': post.owner_username, + 'publish_date': post.date_utc.isoformat(), + 'link': f'https://www.instagram.com/p/{post.shortcode}/', + 'likes': post.likes, + 'comments': post.comments, + 'views': post.video_view_count if hasattr(post, 'video_view_count') else None, + 'media_count': post.mediacount if hasattr(post, 'mediacount') else 1, + 'hashtags': 
list(post.caption_hashtags) if post.caption else [], + 'mentions': list(post.caption_mentions) if post.caption else [], + 'is_video': getattr(post, 'is_video', False), + 'local_images': image_paths # Add downloaded image paths + } + + posts_data.append(post_data) + count += 1 + + # Aggressive rate limiting between posts + self._aggressive_delay() + self._check_rate_limit() + + # Log progress + if count % 5 == 0: + self.logger.info(f"Fetched {count}/{max_posts} posts with images") + + except Exception as e: + self.logger.error(f"Error processing post: {e}") + continue + + self.logger.info(f"Successfully fetched {len(posts_data)} posts with images") + + except Exception as e: + self.logger.error(f"Error fetching posts: {e}") + + return posts_data + + def fetch_stories(self) -> List[Dict[str, Any]]: + """Fetch stories from Instagram profile with image downloads.""" + stories_data = [] + + try: + # Ensure we have a valid context + if not self.loader.context: + self.logger.warning("Instagram context not initialized, attempting re-login") + self._login() + + if not self.loader.context: + self.logger.error("Failed to initialize Instagram context") + return stories_data + + self.logger.info(f"Fetching stories with images from @{self.target_account}") + + # Get profile + profile = instaloader.Profile.from_username(self.loader.context, self.target_account) + self._check_rate_limit() + + # Get user ID for stories + userid = profile.userid + + # Get stories + for story in self.loader.get_stories(userids=[userid]): + for item in story: + try: + # Download story image (skip video stories) + image_paths = [] + if not item.is_video and hasattr(item, 'url'): + local_path = self.download_media( + item.url, + f"instagram_{item.mediaid}_story", + "image" + ) + if local_path: + image_paths.append(local_path) + self.logger.info(f"Downloaded story image {item.mediaid}") + + story_data = { + 'id': item.mediaid, + 'type': 'story', + 'caption': '', # Stories usually don't have captions + 'author': item.owner_username, + 'publish_date': item.date_utc.isoformat(), + 'link': f'https://www.instagram.com/stories/{item.owner_username}/{item.mediaid}/', + 'is_video': item.is_video if hasattr(item, 'is_video') else False, + 'local_images': image_paths # Add downloaded image paths + } + + stories_data.append(story_data) + + # Rate limiting + self._aggressive_delay() + self._check_rate_limit() + + except Exception as e: + self.logger.error(f"Error processing story: {e}") + continue + + self.logger.info(f"Successfully fetched {len(stories_data)} stories with images") + + except Exception as e: + self.logger.error(f"Error fetching stories: {e}") + + return stories_data + + def format_markdown(self, items: List[Dict[str, Any]]) -> str: + """Format Instagram content as markdown with image references.""" + markdown_sections = [] + + for item in items: + section = [] + + # ID + section.append(f"# ID: {item.get('id', 'N/A')}") + section.append("") + + # Type + section.append(f"## Type: {item.get('type', 'post')}") + section.append("") + + # Link + section.append(f"## Link: {item.get('link', '')}") + section.append("") + + # Author + section.append(f"## Author: {item.get('author', 'N/A')}") + section.append("") + + # Publish Date + section.append(f"## Publish Date: {item.get('publish_date', 'N/A')}") + section.append("") + + # Caption + if item.get('caption'): + section.append("## Caption:") + section.append(item['caption']) + section.append("") + + # Engagement metrics + if item.get('likes') is not None: + section.append(f"## 
Likes: {item.get('likes', 0)}") + section.append("") + + if item.get('comments') is not None: + section.append(f"## Comments: {item.get('comments', 0)}") + section.append("") + + if item.get('views') is not None: + section.append(f"## Views: {item.get('views', 0)}") + section.append("") + + # Local images + if item.get('local_images'): + section.append("## Downloaded Images:") + for img_path in item['local_images']: + # Convert to relative path for markdown + rel_path = Path(img_path).relative_to(self.config.data_dir) + section.append(f"- [{rel_path.name}]({rel_path})") + section.append("") + + # Hashtags + if item.get('hashtags'): + section.append(f"## Hashtags: {' '.join(['#' + tag for tag in item['hashtags']])}") + section.append("") + + # Mentions + if item.get('mentions'): + section.append(f"## Mentions: {' '.join(['@' + mention for mention in item['mentions']])}") + section.append("") + + # Media count + if item.get('media_count') and item['media_count'] > 1: + section.append(f"## Media Count: {item['media_count']}") + section.append("") + + # Is video + if item.get('is_video'): + section.append("## Media Type: Video (thumbnail downloaded)") + section.append("") + + # Separator + section.append("-" * 50) + section.append("") + + markdown_sections.append('\n'.join(section)) + + return '\n'.join(markdown_sections) \ No newline at end of file diff --git a/src/rss_scraper_with_images.py b/src/rss_scraper_with_images.py new file mode 100644 index 0000000..5304d4b --- /dev/null +++ b/src/rss_scraper_with_images.py @@ -0,0 +1,152 @@ +""" +Enhanced RSS scrapers that download podcast episode thumbnails. +""" + +from typing import Dict, List, Any, Optional +from pathlib import Path +from src.rss_scraper import RSSScraperPodcast, RSSScraperMailChimp + + +class RSSScraperPodcastWithImages(RSSScraperPodcast): + """Podcast RSS scraper that downloads episode thumbnails.""" + + def __init__(self, config): + super().__init__(config) + # Create media directory for Podcast + self.media_dir = self.config.data_dir / "media" / "Podcast" + self.media_dir.mkdir(parents=True, exist_ok=True) + self.logger.info(f"Podcast media directory: {self.media_dir}") + + def _download_episode_thumbnail(self, episode_id: str, image_url: str) -> Optional[str]: + """Download podcast episode thumbnail.""" + if not image_url: + return None + + try: + # Clean episode ID for filename + safe_id = episode_id.replace('/', '_').replace('\\', '_')[:50] + + local_path = self.download_media( + image_url, + f"podcast_{safe_id}_thumbnail", + "image" + ) + if local_path: + self.logger.info(f"Downloaded thumbnail for episode {safe_id}") + return local_path + except Exception as e: + self.logger.error(f"Error downloading thumbnail for {episode_id}: {e}") + return None + + def fetch_content(self, max_items: Optional[int] = None) -> List[Dict[str, Any]]: + """Fetch RSS feed content with thumbnail downloads.""" + items = super().fetch_content(max_items) + + # Download thumbnails for each episode + for item in items: + image_url = self.extract_image_link(item) + if image_url: + episode_id = item.get('id') or item.get('guid', 'unknown') + local_thumbnail = self._download_episode_thumbnail(episode_id, image_url) + item['local_thumbnail'] = local_thumbnail + item['thumbnail_url'] = image_url + + # Also store audio link for reference (but don't download) + audio_link = self.extract_audio_link(item) + if audio_link: + item['audio_url'] = audio_link + + return items + + def format_markdown(self, items: List[Dict[str, Any]]) -> str: + """Format 
podcast items as markdown with thumbnail references.""" + markdown_sections = [] + + for item in items: + section = [] + + # ID + item_id = item.get('id') or item.get('guid', 'N/A') + section.append(f"# ID: {item_id}") + section.append("") + + # Title + title = item.get('title', 'Untitled') + section.append(f"## Title: {title}") + section.append("") + + # Type + section.append("## Type: podcast") + section.append("") + + # Link + link = item.get('link', '') + section.append(f"## Link: {link}") + section.append("") + + # Audio URL + if item.get('audio_url'): + section.append(f"## Audio: {item['audio_url']}") + section.append("") + + # Publish Date + pub_date = item.get('published') or item.get('pubDate', '') + section.append(f"## Publish Date: {pub_date}") + section.append("") + + # Duration + duration = item.get('itunes_duration', '') + if duration: + section.append(f"## Duration: {duration}") + section.append("") + + # Thumbnail + if item.get('local_thumbnail'): + section.append("## Thumbnail:") + # Convert to relative path for markdown + rel_path = Path(item['local_thumbnail']).relative_to(self.config.data_dir) + section.append(f"![Thumbnail]({rel_path})") + section.append("") + elif item.get('thumbnail_url'): + section.append(f"## Thumbnail URL: {item['thumbnail_url']}") + section.append("") + + # Description + section.append("## Description:") + + # Try to get full content first, then summary, then description + content = item.get('content') + if content and isinstance(content, list) and len(content) > 0: + content_html = content[0].get('value', '') + if content_html: + content_md = self.convert_to_markdown(content_html) + section.append(content_md) + elif item.get('summary'): + summary_md = self.convert_to_markdown(item.get('summary')) + section.append(summary_md) + elif item.get('description'): + desc_md = self.convert_to_markdown(item.get('description')) + section.append(desc_md) + + section.append("") + + # iTunes metadata if available + if item.get('itunes_author'): + section.append(f"## Author: {item['itunes_author']}") + section.append("") + + if item.get('itunes_episode'): + section.append(f"## Episode Number: {item['itunes_episode']}") + section.append("") + + if item.get('itunes_season'): + section.append(f"## Season: {item['itunes_season']}") + section.append("") + + # Separator + section.append("-" * 50) + section.append("") + + markdown_sections.append('\n'.join(section)) + + return '\n'.join(markdown_sections) \ No newline at end of file diff --git a/src/youtube_api_scraper_with_thumbnails.py b/src/youtube_api_scraper_with_thumbnails.py new file mode 100644 index 0000000..f3203ea --- /dev/null +++ b/src/youtube_api_scraper_with_thumbnails.py @@ -0,0 +1,222 @@ +""" +Enhanced YouTube API scraper that downloads video thumbnails. 
+""" + +from typing import List, Dict, Any, Optional +from pathlib import Path +from src.youtube_api_scraper_v2 import YouTubeAPIScraper + + +class YouTubeAPIScraperWithThumbnails(YouTubeAPIScraper): + """YouTube API scraper that downloads video thumbnails.""" + + def __init__(self, config): + super().__init__(config) + # Create media directory for YouTube + self.media_dir = self.config.data_dir / "media" / "YouTube" + self.media_dir.mkdir(parents=True, exist_ok=True) + self.logger.info(f"YouTube media directory: {self.media_dir}") + + def _download_thumbnail(self, video_id: str, thumbnail_url: str) -> Optional[str]: + """Download video thumbnail.""" + if not thumbnail_url: + return None + + try: + local_path = self.download_media( + thumbnail_url, + f"youtube_{video_id}_thumbnail", + "image" + ) + if local_path: + self.logger.info(f"Downloaded thumbnail for video {video_id}") + return local_path + except Exception as e: + self.logger.error(f"Error downloading thumbnail for {video_id}: {e}") + return None + + def fetch_content(self, max_posts: int = None, fetch_captions: bool = True) -> List[Dict[str, Any]]: + """Fetch YouTube videos with thumbnail downloads.""" + # Call parent method to get videos + videos = super().fetch_content(max_posts, fetch_captions) + + # Download thumbnails for each video + for video in videos: + if video.get('thumbnail'): + local_thumbnail = self._download_thumbnail(video['id'], video['thumbnail']) + video['local_thumbnail'] = local_thumbnail + + return videos + + def fetch_video_details(self, video_ids: List[str]) -> List[Dict[str, Any]]: + """Fetch detailed video information with thumbnail downloads.""" + if not video_ids: + return [] + + # YouTube API allows max 50 videos per request + batch_size = 50 + all_videos = [] + + for i in range(0, len(video_ids), batch_size): + batch = video_ids[i:i + batch_size] + + # Check quota (1 unit per request) + if not self._track_quota('videos_list'): + self.logger.warning("Quota limit reached while fetching video details") + break + + try: + response = self.youtube.videos().list( + part='snippet,statistics,contentDetails', + id=','.join(batch) + ).execute() + + for video in response.get('items', []): + # Get thumbnail URL (highest quality available) + thumbnail_url = ( + video['snippet']['thumbnails'].get('maxres', {}).get('url') or + video['snippet']['thumbnails'].get('high', {}).get('url') or + video['snippet']['thumbnails'].get('medium', {}).get('url') or + video['snippet']['thumbnails'].get('default', {}).get('url', '') + ) + + # Download thumbnail + local_thumbnail = self._download_thumbnail(video['id'], thumbnail_url) + + video_data = { + 'id': video['id'], + 'title': video['snippet']['title'], + 'description': video['snippet']['description'], + 'published_at': video['snippet']['publishedAt'], + 'channel_id': video['snippet']['channelId'], + 'channel_title': video['snippet']['channelTitle'], + 'tags': video['snippet'].get('tags', []), + 'duration': video['contentDetails']['duration'], + 'definition': video['contentDetails']['definition'], + 'caption': video['contentDetails'].get('caption', 'false'), + 'thumbnail': thumbnail_url, + 'local_thumbnail': local_thumbnail, # Add local thumbnail path + + # Statistics + 'view_count': int(video['statistics'].get('viewCount', 0)), + 'like_count': int(video['statistics'].get('likeCount', 0)), + 'comment_count': int(video['statistics'].get('commentCount', 0)), + + # Calculate engagement metrics + 'engagement_rate': 0, + 'like_ratio': 0 + } + + # Calculate engagement metrics + if 
video_data['view_count'] > 0: + video_data['engagement_rate'] = ( + (video_data['like_count'] + video_data['comment_count']) / + video_data['view_count'] + ) * 100 + video_data['like_ratio'] = (video_data['like_count'] / video_data['view_count']) * 100 + + all_videos.append(video_data) + + # Small delay to be respectful + import time + time.sleep(0.1) + + except Exception as e: + self.logger.error(f"Error fetching video details: {e}") + + return all_videos + + def format_markdown(self, videos: List[Dict[str, Any]]) -> str: + """Format videos as markdown with thumbnail references.""" + markdown_sections = [] + + for video in videos: + section = [] + + # ID + section.append(f"# ID: {video.get('id', 'N/A')}") + section.append("") + + # Title + section.append(f"## Title: {video.get('title', 'Untitled')}") + section.append("") + + # Type + section.append("## Type: video") + section.append("") + + # Link + section.append(f"## Link: https://www.youtube.com/watch?v={video.get('id', '')}") + section.append("") + + # Channel + section.append(f"## Channel: {video.get('channel_title', 'N/A')}") + section.append("") + + # Published Date + section.append(f"## Published: {video.get('published_at', 'N/A')}") + section.append("") + + # Duration + if video.get('duration'): + section.append(f"## Duration: {video['duration']}") + section.append("") + + # Description + if video.get('description'): + section.append("## Description:") + section.append(video['description'][:1000]) # Limit description length + if len(video.get('description', '')) > 1000: + section.append("... [truncated]") + section.append("") + + # Statistics + section.append("## Statistics:") + section.append(f"- Views: {video.get('view_count', 0):,}") + section.append(f"- Likes: {video.get('like_count', 0):,}") + section.append(f"- Comments: {video.get('comment_count', 0):,}") + section.append(f"- Engagement Rate: {video.get('engagement_rate', 0):.2f}%") + section.append(f"- Like Ratio: {video.get('like_ratio', 0):.2f}%") + section.append("") + + # Caption/Transcript + if video.get('caption_text'): + section.append("## Transcript:") + # Show first 500 chars of transcript + transcript_preview = video['caption_text'][:500] + section.append(transcript_preview) + if len(video.get('caption_text', '')) > 500: + section.append("... [See full transcript below]") + section.append("") + + # Add full transcript at the end + section.append("### Full Transcript:") + section.append(video['caption_text']) + section.append("") + elif video.get('caption') == 'true': + section.append("## Captions: Available (not fetched)") + section.append("") + + # Thumbnail + if video.get('local_thumbnail'): + section.append("## Thumbnail:") + # Convert to relative path for markdown + rel_path = Path(video['local_thumbnail']).relative_to(self.config.data_dir) + section.append(f"![Thumbnail]({rel_path})") + section.append("") + elif video.get('thumbnail'): + section.append(f"## Thumbnail URL: {video['thumbnail']}") + section.append("") + + # Tags + if video.get('tags'): + section.append(f"## Tags: {', '.join(video['tags'][:10])}") # Limit to 10 tags + section.append("") + + # Separator + section.append("-" * 50) + section.append("") + + markdown_sections.append('\n'.join(section)) + + return '\n'.join(markdown_sections) \ No newline at end of file
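
All of the enhanced scrapers in this diff delegate the actual HTTP fetch to a `download_media()` helper inherited from the shared base scraper, which is not part of this change set. For readers following the data flow, here is a minimal sketch of what such a helper could look like; it assumes a `requests`-based download, the `data/media/{Source}/` layout, and URL-derived file extensions, and it is written as a standalone function rather than the real `BaseScraper` method, whose signature and behaviour may differ.

```python
# Hypothetical sketch only - the real helper lives in src/base_scraper.py and may differ.
from pathlib import Path
from typing import Optional
from urllib.parse import urlparse

import requests


def download_media(media_dir: Path, url: str, filename_stem: str, media_type: str = "image") -> Optional[str]:
    """Download one media file into media_dir and return its local path, or None on failure."""
    if not url:
        return None

    # Derive the extension from the URL path; fall back to .jpg for images.
    ext = Path(urlparse(url).path).suffix or (".jpg" if media_type == "image" else ".bin")
    dest = media_dir / f"{filename_stem}{ext}"

    # Reuse files that were already downloaded on a previous run.
    if dest.exists():
        return str(dest)

    try:
        dest.parent.mkdir(parents=True, exist_ok=True)
        response = requests.get(url, timeout=30, stream=True)
        response.raise_for_status()
        with open(dest, "wb") as fh:
            for chunk in response.iter_content(chunk_size=65536):
                fh.write(chunk)
        return str(dest)
    except requests.RequestException:
        return None


if __name__ == "__main__":
    # Example: fetch a public YouTube thumbnail into data/media/YouTube/,
    # matching the youtube_{video_id}_thumbnail naming used by the scrapers above.
    local_path = download_media(
        Path("data/media/YouTube"),
        "https://i.ytimg.com/vi/dQw4w9WgXcQ/maxresdefault.jpg",
        "youtube_dQw4w9WgXcQ_thumbnail",
    )
    print(local_path)
```

A `None` return simply means no local file was saved; the markdown formatters above only emit a "Thumbnail" or "Downloaded Images" section when a local path is present, so failed downloads degrade gracefully to the remote URL reference.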