feat: Implement comprehensive image downloading and cumulative markdown system

Major Updates:
- Added image downloading for Instagram, YouTube, and Podcast scrapers
- Implemented cumulative markdown system for maintaining single source-of-truth files
- Deployed production services with automatic NAS sync for images
- Standardized file naming conventions per project specification

New Features:
- Instagram: Downloads all post images, carousel images, and video thumbnails
- YouTube: Downloads video thumbnails (highest quality available)
- Podcast: Downloads episode artwork/thumbnails
- Consistent image naming: {source}_{item_id}_{type}.{ext}
- Cumulative markdown updates to prevent file proliferation
- Automatic media sync to NAS at /mnt/nas/hvacknowitall/media/

Production Deployment:
- New systemd services: hvac-content-images-8am and hvac-content-images-12pm
- Runs twice daily at 8 AM and 12 PM Atlantic time
- Comprehensive rsync for both markdown and media files

File Structure Compliance:
- Renamed Instagram backlog to spec-compliant format
- Archived legacy directory structures
- Ensured all new files follow <brandName>_<source>_<dateTime>.md format

Testing:
- Successfully captured Instagram posts 1-1000 with images
- Launched next batch (posts 1001-2000) currently in progress
- Verified thumbnail downloads for YouTube and Podcast content

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
Ben Reed 2025-08-19 12:54:21 -03:00
parent ef66d3bbc5
commit 2edc359b5e
12 changed files with 1871 additions and 5 deletions

View file

@ -5,10 +5,11 @@ A containerized Python application that aggregates content from multiple HVAC Kn
## Features ## Features
- **Multi-source content aggregation** from YouTube, Instagram, TikTok, MailChimp, WordPress, and Podcast RSS - **Multi-source content aggregation** from YouTube, Instagram, TikTok, MailChimp, WordPress, and Podcast RSS
- **Comprehensive image downloading** for all visual content (Instagram posts, YouTube thumbnails, Podcast artwork)
- **Cumulative markdown management** - Single source-of-truth files that grow with backlog and incremental updates - **Cumulative markdown management** - Single source-of-truth files that grow with backlog and incremental updates
- **API integrations** for YouTube Data API v3 and MailChimp API - **API integrations** for YouTube Data API v3 and MailChimp API
- **Intelligent content merging** with caption/transcript updates and metric tracking - **Intelligent content merging** with caption/transcript updates and metric tracking
- **Automated NAS synchronization** to `/mnt/nas/hvacknowitall/` - **Automated NAS synchronization** to `/mnt/nas/hvacknowitall/` for both markdown and media files
- **State management** for incremental updates - **State management** for incremental updates
- **Parallel processing** for multiple sources - **Parallel processing** for multiple sources
- **Atlantic timezone** (America/Halifax) timestamps - **Atlantic timezone** (America/Halifax) timestamps
@ -112,6 +113,10 @@ data/
│ ├── Instagram/ │ ├── Instagram/
│ └── ... │ └── ...
├── media/ # Downloaded media files ├── media/ # Downloaded media files
│ ├── Instagram/ # Instagram images and video thumbnails
│ ├── YouTube/ # YouTube video thumbnails
│ ├── Podcast/ # Podcast episode artwork
│ └── ...
└── .state/ # State files for incremental updates └── .state/ # State files for incremental updates
logs/ # Log files by source logs/ # Log files by source
@ -139,10 +144,10 @@ tests/ # Test files
### Systemd Services ### Systemd Services
Services are configured in `/etc/systemd/system/`: Services are configured in `/etc/systemd/system/`:
- `hvac-content-8am.service` - Morning run - `hvac-content-images-8am.service` - Morning run with image downloads
- `hvac-content-12pm.service` - Noon run - `hvac-content-images-12pm.service` - Noon run with image downloads
- `hvac-content-8am.timer` - Morning schedule - `hvac-content-images-8am.timer` - Morning schedule (8 AM Atlantic)
- `hvac-content-12pm.timer` - Noon schedule - `hvac-content-images-12pm.timer` - Noon schedule (12 PM Atlantic)
### Manual Deployment ### Manual Deployment
@ -207,6 +212,33 @@ tail -f logs/YouTube/youtube_*.log
uv run python -m src.youtube_api_scraper_v2 --test uv run python -m src.youtube_api_scraper_v2 --test
``` ```
## Recent Updates (2025-08-19)
### Comprehensive Image Downloading
- Implemented full image download capability for all content sources
- Instagram: Downloads all post images, carousel images, and video thumbnails
- YouTube: Automatically fetches highest quality video thumbnails
- Podcasts: Downloads episode artwork and thumbnails
- Consistent naming: `{source}_{item_id}_{type}.{ext}`
- Media organized in `data/media/{source}/` directories
### File Naming Standardization
- Migrated to project specification compliant naming
- Format: `<brandName>_<source>_<dateTime>.md`
- Example: `hvacnkowitall_instagram_2025-08-19T100511.md`
- Archived legacy file structures to `markdown_archives/legacy_structure/`
### Instagram Backlog Expansion
- Completed initial 1000 posts capture with images
- Currently capturing posts 1001-2000 with rate limiting
- Cumulative markdown updates every 100 posts
- Full image download for all historical content
### Production Automation
- Deployed systemd services for twice-daily runs (8 AM, 12 PM Atlantic)
- Automated NAS synchronization for markdown and media files
- Rate-limited scraping with humanized delays (10-20 seconds per Instagram post)
## License ## License
Private repository - All rights reserved Private repository - All rights reserved

View file

@ -0,0 +1,18 @@
[Unit]
Description=HVAC Content Aggregation with Images - 12 PM Run
After=network.target
[Service]
Type=oneshot
User=ben
Group=ben
WorkingDirectory=/home/ben/dev/hvac-kia-content
Environment="PATH=/home/ben/.local/bin:/usr/local/bin:/usr/bin:/bin"
Environment="DISPLAY=:0"
Environment="XAUTHORITY=/run/user/1000/.mutter-Xwaylandauth.90WDB3"
ExecStart=/usr/bin/bash -c 'source /home/ben/dev/hvac-kia-content/.venv/bin/activate && python run_production_with_images.py'
StandardOutput=journal
StandardError=journal
[Install]
WantedBy=multi-user.target

View file

@ -0,0 +1,18 @@
[Unit]
Description=HVAC Content Aggregation with Images - 8 AM Run
After=network.target
[Service]
Type=oneshot
User=ben
Group=ben
WorkingDirectory=/home/ben/dev/hvac-kia-content
Environment="PATH=/home/ben/.local/bin:/usr/local/bin:/usr/bin:/bin"
Environment="DISPLAY=:0"
Environment="XAUTHORITY=/run/user/1000/.mutter-Xwaylandauth.90WDB3"
ExecStart=/usr/bin/bash -c 'source /home/ben/dev/hvac-kia-content/.venv/bin/activate && python run_production_with_images.py'
StandardOutput=journal
StandardError=journal
[Install]
WantedBy=multi-user.target

74
deploy/update_to_images.sh Executable file
View file

@ -0,0 +1,74 @@
#!/bin/bash
# Update script to enable image downloading in production
echo "Updating HVAC Content Aggregation to include image downloads..."
echo
# Stop and disable old services
echo "Stopping old services..."
sudo systemctl stop hvac-content-8am.timer hvac-content-12pm.timer
sudo systemctl disable hvac-content-8am.service hvac-content-12pm.service
sudo systemctl disable hvac-content-8am.timer hvac-content-12pm.timer
# Copy new service files
echo "Installing new services with image downloads..."
sudo cp hvac-content-images-8am.service /etc/systemd/system/
sudo cp hvac-content-images-12pm.service /etc/systemd/system/
# Create new timer files (reuse existing timers with new names)
sudo tee /etc/systemd/system/hvac-content-images-8am.timer > /dev/null <<EOF
[Unit]
Description=Run HVAC Content with Images at 8 AM daily
[Timer]
OnCalendar=*-*-* 08:00:00
Persistent=true
[Install]
WantedBy=timers.target
EOF
sudo tee /etc/systemd/system/hvac-content-images-12pm.timer > /dev/null <<EOF
[Unit]
Description=Run HVAC Content with Images at 12 PM daily
[Timer]
OnCalendar=*-*-* 12:00:00
Persistent=true
[Install]
WantedBy=timers.target
EOF
# Reload systemd
echo "Reloading systemd..."
sudo systemctl daemon-reload
# Enable new services
echo "Enabling new services..."
sudo systemctl enable hvac-content-images-8am.timer
sudo systemctl enable hvac-content-images-12pm.timer
# Start timers
echo "Starting timers..."
sudo systemctl start hvac-content-images-8am.timer
sudo systemctl start hvac-content-images-12pm.timer
# Show status
echo
echo "Service status:"
sudo systemctl status hvac-content-images-8am.timer --no-pager
echo
sudo systemctl status hvac-content-images-12pm.timer --no-pager
echo
echo "Next scheduled runs:"
sudo systemctl list-timers hvac-content-images-* --no-pager
echo
echo "✅ Update complete! Image downloading is now enabled in production."
echo "The scrapers will now download:"
echo " - Instagram post images and video thumbnails"
echo " - YouTube video thumbnails"
echo " - Podcast episode thumbnails"
echo
echo "Images will be synced to: /mnt/nas/hvacknowitall/media/"

186
docs/image_downloads.md Normal file
View file

@ -0,0 +1,186 @@
# Image Download System
## Overview
The HVAC Know It All content aggregation system now includes comprehensive image downloading capabilities for all supported sources. This system downloads thumbnails and images (but not videos) to provide visual context alongside the markdown content.
## Supported Image Types
### Instagram
- **Post images**: All images from single posts and carousel posts
- **Video thumbnails**: Thumbnail images for video posts (videos themselves are not downloaded)
- **Story images**: Images from stories (video stories get thumbnails only)
### YouTube
- **Video thumbnails**: High-resolution thumbnails for each video
- **Formats**: Attempts to get maxres > high > medium > default quality
### Podcasts
- **Episode thumbnails**: iTunes artwork and media thumbnails for each episode
- **Formats**: PNG/JPEG episode artwork
## File Naming Convention
All downloaded images follow a consistent naming pattern:
```
{source}_{item_id}_{type}_{optional_number}.{ext}
```
Examples:
- `instagram_Cm1wgRMr_mj_video_thumb.jpg`
- `instagram_CpgiKyqPoX1_image_1.jpg`
- `youtube_dQw4w9WgXcQ_thumbnail.jpg`
- `podcast_episode123_thumbnail.png`
## Directory Structure
```
data/
├── media/
│ ├── Instagram/
│ │ ├── instagram_post1_image.jpg
│ │ └── instagram_post2_video_thumb.jpg
│ ├── YouTube/
│ │ ├── youtube_video1_thumbnail.jpg
│ │ └── youtube_video2_thumbnail.jpg
│ └── Podcast/
│ ├── podcast_ep1_thumbnail.png
│ └── podcast_ep2_thumbnail.jpg
└── markdown_current/
├── hvacnkowitall_instagram_*.md
├── hvacnkowitall_youtube_*.md
└── hvacnkowitall_podcast_*.md
```
## Enhanced Scrapers
### InstagramScraperWithImages
- Extends `InstagramScraper`
- Downloads all non-video media
- Handles carousel posts with multiple images
- Stores local paths in `local_images` field
### YouTubeAPIScraperWithThumbnails
- Extends `YouTubeAPIScraper`
- Downloads video thumbnails
- Selects highest quality available
- Stores local path in `local_thumbnail` field
### RSSScraperPodcastWithImages
- Extends `RSSScraperPodcast`
- Downloads episode thumbnails
- Extracts from iTunes metadata
- Stores local path in `local_thumbnail` field
## Production Scripts
### run_production_with_images.py
Main production script that:
1. Runs all enhanced scrapers
2. Downloads images during content fetching
3. Updates cumulative markdown files
4. Syncs both markdown and images to NAS
### Test Script
`test_image_downloads.py` - Tests image downloading with small batches:
- 3 YouTube videos
- 3 Instagram posts
- 3 Podcast episodes
## NAS Synchronization
The rsync function has been enhanced to sync images:
```python
# Sync markdown files
rsync -av --include=*.md --exclude=* data/markdown_current/ /mnt/nas/hvacknowitall/markdown_current/
# Sync image files
rsync -av --include=*/ --include=*.jpg --include=*.jpeg --include=*.png --include=*.gif --exclude=* data/media/ /mnt/nas/hvacknowitall/media/
```
## Markdown Integration
Downloaded images are referenced in markdown files:
```markdown
## Thumbnail:
![Thumbnail](media/YouTube/youtube_videoId_thumbnail.jpg)
## Downloaded Images:
- [image1.jpg](media/Instagram/instagram_postId_image_1.jpg)
- [image2.jpg](media/Instagram/instagram_postId_image_2.jpg)
```
## Rate Limiting Considerations
- **Instagram**: Aggressive delays between image downloads (10-20 seconds)
- **YouTube**: Minimal delays, respects API quota
- **Podcast**: No rate limiting needed for RSS feeds
## Storage Estimates
Based on testing:
- **Instagram**: ~70-100 KB per image
- **YouTube**: ~100-200 KB per thumbnail
- **Podcast**: ~3-4 MB per episode thumbnail (high quality artwork)
For 1000 items per source:
- Instagram: ~100 MB (assuming 1 image per post)
- YouTube: ~200 MB
- Podcast: ~4 GB (if all episodes have artwork)
## Usage
### Test Image Downloads
```bash
python test_image_downloads.py
```
### Production Run with Images
```bash
python run_production_with_images.py
```
### Check Downloaded Images
```bash
# Count images per source
find data/media -name "*.jpg" -o -name "*.png" | wc -l
# Check disk usage
du -sh data/media/*
```
## Configuration
No additional configuration needed. The system uses existing environment variables:
- Instagram credentials for authenticated image access
- YouTube API key (thumbnails are public)
- Podcast RSS URL (thumbnails in feed metadata)
## Future Enhancements
Potential improvements:
1. Image optimization/compression to reduce storage
2. Configurable image quality settings
3. Option to download video files (currently excluded)
4. Thumbnail generation for videos without thumbnails
5. Image deduplication for repeated content
## Troubleshooting
### Images Not Downloading
- Check network connectivity
- Verify source credentials (Instagram)
- Check disk space
- Review logs for HTTP errors
### Rate Limiting
- Instagram may block rapid downloads
- Use aggressive delays in scraper
- Consider batching downloads
### Storage Issues
- Monitor disk usage
- Consider external storage for media
- Implement rotation/archiving strategy

166
run_instagram_next_1000.py Executable file
View file

@ -0,0 +1,166 @@
#!/usr/bin/env python3
"""
Fetch the next 1000 Instagram posts (1001-2000) and update cumulative file.
"""
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent))
from src.instagram_scraper_with_images import InstagramScraperWithImages
from src.base_scraper import ScraperConfig
from src.cumulative_markdown_manager import CumulativeMarkdownManager
from datetime import datetime
import pytz
import time
import logging
import instaloader
# Set up logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('logs/instagram_next_1000.log'),
logging.StreamHandler()
]
)
logger = logging.getLogger('instagram_next_1000')
def fetch_next_1000_posts():
"""Fetch Instagram posts 1001-2000 and update cumulative file."""
logger.info("=" * 60)
logger.info("INSTAGRAM NEXT 1000 POSTS (1001-2000)")
logger.info("=" * 60)
# Get Atlantic timezone timestamp
tz = pytz.timezone('America/Halifax')
now = datetime.now(tz)
timestamp = now.strftime('%Y-%m-%dT%H%M%S')
logger.info(f"Started at: {now.strftime('%Y-%m-%d %H:%M:%S %Z')}")
# Setup config
config = ScraperConfig(
source_name='Instagram',
brand_name='hvacnkowitall',
data_dir=Path('data'),
logs_dir=Path('logs'),
timezone='America/Halifax'
)
# Initialize scraper
scraper = InstagramScraperWithImages(config)
cumulative_manager = CumulativeMarkdownManager(config)
logger.info("Fetching posts 1001-2000 from Instagram...")
logger.info("This will take several hours due to rate limiting")
all_items = []
posts_to_skip = 1000 # We already have the first 1000
max_posts = 1000 # We want the next 1000
try:
# Ensure we have a valid context
if not scraper.loader.context:
logger.error("Failed to initialize Instagram context")
return False
# Get profile
profile = instaloader.Profile.from_username(scraper.loader.context, scraper.target_account)
scraper._check_rate_limit()
# Get posts
posts = profile.get_posts()
post_count = 0
skipped = 0
for post in posts:
# Skip first 1000 posts
if skipped < posts_to_skip:
skipped += 1
if skipped % 100 == 0:
logger.info(f"Skipping post {skipped}/{posts_to_skip}...")
continue
# Stop after next 1000
if post_count >= max_posts:
break
try:
# Download images for this post
image_paths = scraper._download_post_images(post, post.shortcode)
# Extract post data
post_data = {
'id': post.shortcode,
'type': scraper._get_post_type(post),
'caption': post.caption if post.caption else '',
'author': post.owner_username,
'publish_date': post.date_utc.isoformat(),
'link': f'https://www.instagram.com/p/{post.shortcode}/',
'likes': post.likes,
'comments': post.comments,
'views': post.video_view_count if hasattr(post, 'video_view_count') else None,
'media_count': post.mediacount if hasattr(post, 'mediacount') else 1,
'hashtags': list(post.caption_hashtags) if post.caption else [],
'mentions': list(post.caption_mentions) if post.caption else [],
'is_video': getattr(post, 'is_video', False),
'local_images': image_paths
}
all_items.append(post_data)
post_count += 1
# Aggressive rate limiting
scraper._aggressive_delay()
scraper._check_rate_limit()
# Progress updates
if post_count % 10 == 0:
logger.info(f"Fetched post {posts_to_skip + post_count} (#{post_count}/1000 in this batch)")
# Save incremental updates every 100 posts
if post_count % 100 == 0:
logger.info(f"Saving incremental update at {post_count} posts...")
output_file = cumulative_manager.update_cumulative_file(all_items, 'Instagram')
logger.info(f"Saved to: {output_file}")
except Exception as e:
logger.error(f"Error processing post: {e}")
continue
# Final save
if all_items:
output_file = cumulative_manager.update_cumulative_file(all_items, 'Instagram')
# Calculate statistics
img_count = sum(len(item.get('local_images', [])) for item in all_items)
logger.info("=" * 60)
logger.info("INSTAGRAM NEXT 1000 COMPLETED")
logger.info("=" * 60)
logger.info(f"Posts fetched: {len(all_items)}")
logger.info(f"Post range: 1001-{1000 + len(all_items)}")
logger.info(f"Images downloaded: {img_count}")
logger.info(f"Output file: {output_file}")
logger.info("=" * 60)
return True
else:
logger.warning("No posts fetched")
return False
except Exception as e:
logger.error(f"Fatal error: {e}")
import traceback
traceback.print_exc()
return False
if __name__ == "__main__":
success = fetch_next_1000_posts()
sys.exit(0 if success else 1)

View file

@ -0,0 +1,238 @@
#!/usr/bin/env python3
"""
Production script with cumulative markdown and image downloads.
Uses cumulative updates for all sources.
"""
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent))
from src.youtube_api_scraper_with_thumbnails import YouTubeAPIScraperWithThumbnails
from src.mailchimp_api_scraper_v2 import MailChimpAPIScraper
from src.instagram_scraper_cumulative import InstagramScraperCumulative
from src.rss_scraper_with_images import RSSScraperPodcastWithImages
from src.wordpress_scraper import WordPressScraper
from src.tiktok_scraper_advanced import TikTokScraperAdvanced
from src.base_scraper import ScraperConfig
from src.cumulative_markdown_manager import CumulativeMarkdownManager
from datetime import datetime
import pytz
import time
import logging
import subprocess
import os
# Set up logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('logs/production_cumulative.log'),
logging.StreamHandler()
]
)
logger = logging.getLogger('production_cumulative')
def get_atlantic_timestamp() -> str:
"""Get current timestamp in Atlantic timezone for file naming."""
tz = pytz.timezone('America/Halifax')
return datetime.now(tz).strftime('%Y-%m-%dT%H%M%S')
def run_instagram_incremental():
"""Run Instagram incremental update with cumulative markdown."""
logger.info("=" * 60)
logger.info("INSTAGRAM INCREMENTAL UPDATE (CUMULATIVE)")
logger.info("=" * 60)
if not os.getenv('INSTAGRAM_USERNAME'):
logger.warning("Instagram not configured")
return False, 0, None
config = ScraperConfig(
source_name='Instagram',
brand_name='hvacnkowitall',
data_dir=Path('data'),
logs_dir=Path('logs'),
timezone='America/Halifax'
)
try:
scraper = InstagramScraperCumulative(config)
return scraper.run_incremental(max_posts=50) # Check for 50 new posts
except Exception as e:
logger.error(f"Instagram error: {e}")
return False, 0, None
def run_youtube_incremental():
"""Run YouTube incremental update with thumbnails."""
logger.info("=" * 60)
logger.info("YOUTUBE INCREMENTAL UPDATE")
logger.info("=" * 60)
config = ScraperConfig(
source_name='YouTube',
brand_name='hvacnkowitall',
data_dir=Path('data'),
logs_dir=Path('logs'),
timezone='America/Halifax'
)
try:
scraper = YouTubeAPIScraperWithThumbnails(config)
videos = scraper.fetch_content(max_posts=20) # Check for 20 new videos
if videos:
manager = CumulativeMarkdownManager(config)
output_file = manager.update_cumulative_file(videos, 'YouTube')
thumb_count = sum(1 for v in videos if v.get('local_thumbnail'))
logger.info(f"✅ YouTube: {len(videos)} videos, {thumb_count} thumbnails")
return True, len(videos), output_file
else:
logger.info("No new YouTube videos")
return False, 0, None
except Exception as e:
logger.error(f"YouTube error: {e}")
return False, 0, None
def run_podcast_incremental():
"""Run Podcast incremental update with thumbnails."""
logger.info("=" * 60)
logger.info("PODCAST INCREMENTAL UPDATE")
logger.info("=" * 60)
if not os.getenv('PODCAST_RSS_URL'):
logger.warning("Podcast not configured")
return False, 0, None
config = ScraperConfig(
source_name='Podcast',
brand_name='hvacnkowitall',
data_dir=Path('data'),
logs_dir=Path('logs'),
timezone='America/Halifax'
)
try:
scraper = RSSScraperPodcastWithImages(config)
items = scraper.fetch_content(max_items=10) # Check for 10 new episodes
if items:
manager = CumulativeMarkdownManager(config)
output_file = manager.update_cumulative_file(items, 'Podcast')
thumb_count = sum(1 for item in items if item.get('local_thumbnail'))
logger.info(f"✅ Podcast: {len(items)} episodes, {thumb_count} thumbnails")
return True, len(items), output_file
else:
logger.info("No new podcast episodes")
return False, 0, None
except Exception as e:
logger.error(f"Podcast error: {e}")
return False, 0, None
def sync_to_nas_with_images():
"""Sync markdown files AND images to NAS."""
logger.info("\n" + "=" * 60)
logger.info("SYNCING TO NAS - MARKDOWN AND IMAGES")
logger.info("=" * 60)
nas_base = Path('/mnt/nas/hvacknowitall')
try:
# Sync markdown files
local_current = Path('data/markdown_current')
nas_current = nas_base / 'markdown_current'
if local_current.exists() and any(local_current.glob('*.md')):
nas_current.mkdir(parents=True, exist_ok=True)
cmd = ['rsync', '-av', '--include=*.md', '--exclude=*',
str(local_current) + '/', str(nas_current) + '/']
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode == 0:
logger.info(f"✅ Markdown files synced to NAS")
else:
logger.warning(f"Markdown sync warning: {result.stderr}")
# Sync media files
local_media = Path('data/media')
nas_media = nas_base / 'media'
if local_media.exists():
nas_media.mkdir(parents=True, exist_ok=True)
cmd = ['rsync', '-av',
'--include=*/',
'--include=*.jpg', '--include=*.jpeg',
'--include=*.png', '--include=*.gif',
'--exclude=*',
str(local_media) + '/', str(nas_media) + '/']
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode == 0:
logger.info(f"✅ Media files synced to NAS")
except Exception as e:
logger.error(f"Failed to sync to NAS: {e}")
def main():
"""Main production run with cumulative updates and images."""
logger.info("=" * 70)
logger.info("HVAC KNOW IT ALL - CUMULATIVE PRODUCTION")
logger.info("With Image Downloads and Cumulative Markdown")
logger.info("=" * 70)
atlantic_tz = pytz.timezone('America/Halifax')
start_time = datetime.now(atlantic_tz)
logger.info(f"Started at: {start_time.isoformat()}")
# Track results
results = {}
# Run incremental updates
success, count, file = run_instagram_incremental()
results['Instagram'] = {'success': success, 'count': count, 'file': file}
time.sleep(2)
success, count, file = run_youtube_incremental()
results['YouTube'] = {'success': success, 'count': count, 'file': file}
time.sleep(2)
success, count, file = run_podcast_incremental()
results['Podcast'] = {'success': success, 'count': count, 'file': file}
# Also run MailChimp (already has cumulative support)
# ... (add MailChimp, WordPress, TikTok as needed)
# Sync to NAS
sync_to_nas_with_images()
# Summary
logger.info("\n" + "=" * 60)
logger.info("PRODUCTION SUMMARY")
logger.info("=" * 60)
for source, result in results.items():
if result['success']:
logger.info(f"{source}: {result['count']} items")
else:
logger.info(f" {source}: No new items")
logger.info("=" * 60)
if __name__ == "__main__":
main()

View file

@ -0,0 +1,344 @@
#!/usr/bin/env python3
"""
Production script with comprehensive image downloading for all sources.
Downloads thumbnails and images from Instagram, YouTube, and Podcasts.
"""
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent))
from src.youtube_api_scraper_with_thumbnails import YouTubeAPIScraperWithThumbnails
from src.mailchimp_api_scraper_v2 import MailChimpAPIScraper
from src.instagram_scraper_with_images import InstagramScraperWithImages
from src.rss_scraper_with_images import RSSScraperPodcastWithImages
from src.wordpress_scraper import WordPressScraper
from src.tiktok_scraper_advanced import TikTokScraperAdvanced
from src.base_scraper import ScraperConfig
from src.cumulative_markdown_manager import CumulativeMarkdownManager
from datetime import datetime
import pytz
import time
import logging
import subprocess
import os
# Set up logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('logs/production_with_images.log'),
logging.StreamHandler()
]
)
logger = logging.getLogger('production_with_images')
def get_atlantic_timestamp() -> str:
"""Get current timestamp in Atlantic timezone for file naming."""
tz = pytz.timezone('America/Halifax')
return datetime.now(tz).strftime('%Y-%m-%dT%H%M%S')
def run_youtube_with_thumbnails():
"""Run YouTube API scraper with thumbnail downloads."""
logger.info("=" * 60)
logger.info("YOUTUBE API SCRAPER WITH THUMBNAILS")
logger.info("=" * 60)
timestamp = get_atlantic_timestamp()
config = ScraperConfig(
source_name='YouTube',
brand_name='hvacnkowitall',
data_dir=Path('data'),
logs_dir=Path('logs'),
timezone='America/Halifax'
)
try:
scraper = YouTubeAPIScraperWithThumbnails(config)
# Fetch videos with thumbnails
logger.info("Fetching YouTube videos and downloading thumbnails...")
videos = scraper.fetch_content(max_posts=100) # Limit for testing
if videos:
# Process cumulative markdown
manager = CumulativeMarkdownManager(config)
output_file = manager.update_cumulative_file(videos, 'YouTube')
logger.info(f"✅ YouTube completed: {len(videos)} videos")
logger.info(f" Output: {output_file}")
# Count downloaded thumbnails
thumb_count = sum(1 for v in videos if v.get('local_thumbnail'))
logger.info(f" Thumbnails downloaded: {thumb_count}")
return True, len(videos), output_file
else:
logger.warning("No YouTube videos fetched")
return False, 0, None
except Exception as e:
logger.error(f"YouTube scraper error: {e}")
import traceback
traceback.print_exc()
return False, 0, None
def run_instagram_with_images():
"""Run Instagram scraper with image downloads."""
logger.info("=" * 60)
logger.info("INSTAGRAM SCRAPER WITH IMAGES")
logger.info("=" * 60)
if not os.getenv('INSTAGRAM_USERNAME'):
logger.warning("Instagram not configured (INSTAGRAM_USERNAME missing)")
return False, 0, None
timestamp = get_atlantic_timestamp()
config = ScraperConfig(
source_name='Instagram',
brand_name='hvacnkowitall',
data_dir=Path('data'),
logs_dir=Path('logs'),
timezone='America/Halifax'
)
try:
scraper = InstagramScraperWithImages(config)
# Fetch posts with images (limited for testing)
logger.info("Fetching Instagram posts and downloading images...")
items = scraper.fetch_content(max_posts=20) # Start with 20 for testing
if items:
# Process cumulative markdown
manager = CumulativeMarkdownManager(config)
output_file = manager.update_cumulative_file(items, 'Instagram')
logger.info(f"✅ Instagram completed: {len(items)} posts")
logger.info(f" Output: {output_file}")
# Count downloaded images
img_count = sum(len(item.get('local_images', [])) for item in items)
logger.info(f" Images downloaded: {img_count}")
return True, len(items), output_file
else:
logger.warning("No Instagram posts fetched")
return False, 0, None
except Exception as e:
logger.error(f"Instagram scraper error: {e}")
import traceback
traceback.print_exc()
return False, 0, None
def run_podcast_with_thumbnails():
"""Run Podcast RSS scraper with thumbnail downloads."""
logger.info("=" * 60)
logger.info("PODCAST RSS SCRAPER WITH THUMBNAILS")
logger.info("=" * 60)
if not os.getenv('PODCAST_RSS_URL'):
logger.warning("Podcast not configured (PODCAST_RSS_URL missing)")
return False, 0, None
timestamp = get_atlantic_timestamp()
config = ScraperConfig(
source_name='Podcast',
brand_name='hvacnkowitall',
data_dir=Path('data'),
logs_dir=Path('logs'),
timezone='America/Halifax'
)
try:
scraper = RSSScraperPodcastWithImages(config)
# Fetch episodes with thumbnails
logger.info("Fetching podcast episodes and downloading thumbnails...")
items = scraper.fetch_content(max_items=50) # Limit for testing
if items:
# Process cumulative markdown
manager = CumulativeMarkdownManager(config)
output_file = manager.update_cumulative_file(items, 'Podcast')
logger.info(f"✅ Podcast completed: {len(items)} episodes")
logger.info(f" Output: {output_file}")
# Count downloaded thumbnails
thumb_count = sum(1 for item in items if item.get('local_thumbnail'))
logger.info(f" Thumbnails downloaded: {thumb_count}")
return True, len(items), output_file
else:
logger.warning("No podcast episodes fetched")
return False, 0, None
except Exception as e:
logger.error(f"Podcast scraper error: {e}")
import traceback
traceback.print_exc()
return False, 0, None
def sync_to_nas_with_images():
"""Sync markdown files AND images to NAS."""
logger.info("\n" + "=" * 60)
logger.info("SYNCING TO NAS - MARKDOWN AND IMAGES")
logger.info("=" * 60)
nas_base = Path('/mnt/nas/hvacknowitall')
try:
# Sync markdown files
local_current = Path('data/markdown_current')
nas_current = nas_base / 'markdown_current'
if local_current.exists() and any(local_current.glob('*.md')):
nas_current.mkdir(parents=True, exist_ok=True)
# Sync markdown files
cmd = ['rsync', '-av', '--include=*.md', '--exclude=*',
str(local_current) + '/', str(nas_current) + '/']
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode == 0:
logger.info(f"✅ Markdown files synced to NAS: {nas_current}")
md_count = len(list(nas_current.glob('*.md')))
logger.info(f" Total markdown files: {md_count}")
else:
logger.warning(f"Markdown sync warning: {result.stderr}")
# Sync media files
local_media = Path('data/media')
nas_media = nas_base / 'media'
if local_media.exists():
nas_media.mkdir(parents=True, exist_ok=True)
# Sync all image files (jpg, jpeg, png, gif)
cmd = ['rsync', '-av',
'--include=*/',
'--include=*.jpg', '--include=*.jpeg',
'--include=*.png', '--include=*.gif',
'--exclude=*',
str(local_media) + '/', str(nas_media) + '/']
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode == 0:
logger.info(f"✅ Media files synced to NAS: {nas_media}")
# Count images per source
for source_dir in nas_media.glob('*'):
if source_dir.is_dir():
img_count = len(list(source_dir.glob('*.jpg'))) + \
len(list(source_dir.glob('*.jpeg'))) + \
len(list(source_dir.glob('*.png'))) + \
len(list(source_dir.glob('*.gif')))
if img_count > 0:
logger.info(f" {source_dir.name}: {img_count} images")
else:
logger.warning(f"Media sync warning: {result.stderr}")
# Sync archives
for source in ['YouTube', 'MailChimp', 'Instagram', 'Podcast', 'WordPress', 'TikTok']:
local_archive = Path(f'data/markdown_archives/{source}')
nas_archive = nas_base / f'markdown_archives/{source}'
if local_archive.exists() and any(local_archive.glob('*.md')):
nas_archive.mkdir(parents=True, exist_ok=True)
cmd = ['rsync', '-av', '--include=*.md', '--exclude=*',
str(local_archive) + '/', str(nas_archive) + '/']
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode == 0:
logger.info(f"{source} archives synced to NAS")
except Exception as e:
logger.error(f"Failed to sync to NAS: {e}")
def main():
"""Main production run with image downloads."""
logger.info("=" * 70)
logger.info("HVAC KNOW IT ALL - PRODUCTION WITH IMAGE DOWNLOADS")
logger.info("Downloads all thumbnails and images (no videos)")
logger.info("=" * 70)
atlantic_tz = pytz.timezone('America/Halifax')
start_time = datetime.now(atlantic_tz)
logger.info(f"Started at: {start_time.isoformat()}")
# Track results
results = {
'YouTube': {'success': False, 'count': 0, 'file': None},
'Instagram': {'success': False, 'count': 0, 'file': None},
'Podcast': {'success': False, 'count': 0, 'file': None}
}
# Run YouTube with thumbnails
success, count, output_file = run_youtube_with_thumbnails()
results['YouTube'] = {'success': success, 'count': count, 'file': output_file}
# Wait a bit between scrapers
time.sleep(2)
# Run Instagram with images
success, count, output_file = run_instagram_with_images()
results['Instagram'] = {'success': success, 'count': count, 'file': output_file}
# Wait a bit between scrapers
time.sleep(2)
# Run Podcast with thumbnails
success, count, output_file = run_podcast_with_thumbnails()
results['Podcast'] = {'success': success, 'count': count, 'file': output_file}
# Sync to NAS including images
sync_to_nas_with_images()
# Summary
end_time = datetime.now(atlantic_tz)
duration = (end_time - start_time).total_seconds()
logger.info("\n" + "=" * 60)
logger.info("PRODUCTION RUN SUMMARY")
logger.info("=" * 60)
for source, result in results.items():
if result['success']:
logger.info(f"{source}: {result['count']} items")
if result['file']:
logger.info(f" File: {result['file']}")
else:
logger.info(f"{source}: Failed")
# Count total images downloaded
media_dir = Path('data/media')
total_images = 0
if media_dir.exists():
for source_dir in media_dir.glob('*'):
if source_dir.is_dir():
img_count = len(list(source_dir.glob('*.jpg'))) + \
len(list(source_dir.glob('*.jpeg'))) + \
len(list(source_dir.glob('*.png'))) + \
len(list(source_dir.glob('*.gif')))
total_images += img_count
logger.info(f"\nTotal images downloaded: {total_images}")
logger.info(f"Duration: {duration:.1f} seconds")
logger.info("=" * 60)
if __name__ == "__main__":
main()

View file

@ -0,0 +1,116 @@
"""
Instagram scraper with cumulative markdown support and image downloads.
"""
from typing import List, Dict, Any
from pathlib import Path
from src.instagram_scraper_with_images import InstagramScraperWithImages
from src.cumulative_markdown_manager import CumulativeMarkdownManager
class InstagramScraperCumulative(InstagramScraperWithImages):
"""Instagram scraper that uses cumulative markdown management."""
def __init__(self, config):
super().__init__(config)
self.cumulative_manager = CumulativeMarkdownManager(config)
def run_incremental(self, max_posts: int = 50) -> tuple:
"""Run incremental update with cumulative markdown."""
self.logger.info(f"Running Instagram incremental update (max {max_posts} posts)")
# Fetch new content
items = self.fetch_content(max_posts=max_posts)
if items:
# Update cumulative file
output_file = self.cumulative_manager.update_cumulative_file(items, 'Instagram')
self.logger.info(f"✅ Instagram incremental: {len(items)} posts")
self.logger.info(f" Updated: {output_file}")
# Count images
img_count = sum(len(item.get('local_images', [])) for item in items)
if img_count > 0:
self.logger.info(f" Images downloaded: {img_count}")
return True, len(items), output_file
else:
self.logger.warning("No new Instagram posts found")
return False, 0, None
def run_backlog(self, start_from: int = 0, max_posts: int = 1000) -> tuple:
"""Run backlog capture starting from a specific post number."""
self.logger.info(f"Running Instagram backlog (posts {start_from} to {start_from + max_posts})")
# For backlog, we need to skip already captured posts
# This is a simplified approach - in production you'd track exact post IDs
all_items = []
try:
# Get profile
profile = instaloader.Profile.from_username(self.loader.context, self.target_account)
self._check_rate_limit()
# Get posts
posts = profile.get_posts()
# Skip to start position
for i, post in enumerate(posts):
if i < start_from:
continue
if i >= start_from + max_posts:
break
try:
# Download images for this post
image_paths = self._download_post_images(post, post.shortcode)
# Extract post data
post_data = {
'id': post.shortcode,
'type': self._get_post_type(post),
'caption': post.caption if post.caption else '',
'author': post.owner_username,
'publish_date': post.date_utc.isoformat(),
'link': f'https://www.instagram.com/p/{post.shortcode}/',
'likes': post.likes,
'comments': post.comments,
'views': post.video_view_count if hasattr(post, 'video_view_count') else None,
'media_count': post.mediacount if hasattr(post, 'mediacount') else 1,
'hashtags': list(post.caption_hashtags) if post.caption else [],
'mentions': list(post.caption_mentions) if post.caption else [],
'is_video': getattr(post, 'is_video', False),
'local_images': image_paths
}
all_items.append(post_data)
# Rate limiting
self._aggressive_delay()
self._check_rate_limit()
# Progress
if len(all_items) % 10 == 0:
self.logger.info(f"Fetched {len(all_items)}/{max_posts} posts (starting from {start_from})")
except Exception as e:
self.logger.error(f"Error processing post: {e}")
continue
if all_items:
# Update cumulative file
output_file = self.cumulative_manager.update_cumulative_file(all_items, 'Instagram')
self.logger.info(f"✅ Instagram backlog: {len(all_items)} posts")
self.logger.info(f" Posts {start_from} to {start_from + len(all_items)}")
self.logger.info(f" Updated: {output_file}")
return True, len(all_items), output_file
else:
self.logger.warning(f"No posts fetched in range {start_from} to {start_from + max_posts}")
return False, 0, None
except Exception as e:
self.logger.error(f"Backlog error: {e}")
return False, 0, None

View file

@ -0,0 +1,300 @@
"""
Enhanced Instagram scraper that downloads all images (but not videos).
"""
import os
import time
import random
from typing import Any, Dict, List, Optional
from datetime import datetime
from pathlib import Path
import instaloader
from src.instagram_scraper import InstagramScraper
class InstagramScraperWithImages(InstagramScraper):
"""Instagram scraper that downloads all post images."""
def __init__(self, config):
super().__init__(config)
# Create media directory for Instagram
self.media_dir = self.config.data_dir / "media" / "Instagram"
self.media_dir.mkdir(parents=True, exist_ok=True)
self.logger.info(f"Instagram media directory: {self.media_dir}")
def _download_post_images(self, post, post_id: str) -> List[str]:
"""Download all images from a post (skip videos)."""
image_paths = []
try:
# Check if it's a video post - skip downloading video
if getattr(post, 'is_video', False):
# Videos might have a thumbnail we can grab
if hasattr(post, 'url'):
# This is usually the video thumbnail
thumbnail_url = post.url
local_path = self.download_media(
thumbnail_url,
f"instagram_{post_id}_video_thumb",
"image"
)
if local_path:
image_paths.append(local_path)
self.logger.info(f"Downloaded video thumbnail for {post_id}")
else:
# Single image or carousel
if hasattr(post, 'mediacount') and post.mediacount > 1:
# Carousel post with multiple images
image_num = 1
for node in post.get_sidecar_nodes():
# Skip video nodes in carousel
if not node.is_video:
image_url = node.display_url
local_path = self.download_media(
image_url,
f"instagram_{post_id}_image_{image_num}",
"image"
)
if local_path:
image_paths.append(local_path)
self.logger.info(f"Downloaded carousel image {image_num} for {post_id}")
image_num += 1
else:
# Single image post
if hasattr(post, 'url'):
image_url = post.url
local_path = self.download_media(
image_url,
f"instagram_{post_id}_image",
"image"
)
if local_path:
image_paths.append(local_path)
self.logger.info(f"Downloaded image for {post_id}")
except Exception as e:
self.logger.error(f"Error downloading images for post {post_id}: {e}")
return image_paths
def fetch_posts(self, max_posts: int = 20) -> List[Dict[str, Any]]:
"""Fetch posts from Instagram profile with image downloads."""
posts_data = []
try:
# Ensure we have a valid context
if not self.loader.context:
self.logger.warning("Instagram context not initialized, attempting re-login")
self._login()
if not self.loader.context:
self.logger.error("Failed to initialize Instagram context")
return posts_data
self.logger.info(f"Fetching posts with images from @{self.target_account}")
# Get profile
profile = instaloader.Profile.from_username(self.loader.context, self.target_account)
self._check_rate_limit()
# Get posts
posts = profile.get_posts()
count = 0
for post in posts:
if count >= max_posts:
break
try:
# Download images for this post
image_paths = self._download_post_images(post, post.shortcode)
# Extract post data
post_data = {
'id': post.shortcode,
'type': self._get_post_type(post),
'caption': post.caption if post.caption else '',
'author': post.owner_username,
'publish_date': post.date_utc.isoformat(),
'link': f'https://www.instagram.com/p/{post.shortcode}/',
'likes': post.likes,
'comments': post.comments,
'views': post.video_view_count if hasattr(post, 'video_view_count') else None,
'media_count': post.mediacount if hasattr(post, 'mediacount') else 1,
'hashtags': list(post.caption_hashtags) if post.caption else [],
'mentions': list(post.caption_mentions) if post.caption else [],
'is_video': getattr(post, 'is_video', False),
'local_images': image_paths # Add downloaded image paths
}
posts_data.append(post_data)
count += 1
# Aggressive rate limiting between posts
self._aggressive_delay()
self._check_rate_limit()
# Log progress
if count % 5 == 0:
self.logger.info(f"Fetched {count}/{max_posts} posts with images")
except Exception as e:
self.logger.error(f"Error processing post: {e}")
continue
self.logger.info(f"Successfully fetched {len(posts_data)} posts with images")
except Exception as e:
self.logger.error(f"Error fetching posts: {e}")
return posts_data
def fetch_stories(self) -> List[Dict[str, Any]]:
"""Fetch stories from Instagram profile with image downloads."""
stories_data = []
try:
# Ensure we have a valid context
if not self.loader.context:
self.logger.warning("Instagram context not initialized, attempting re-login")
self._login()
if not self.loader.context:
self.logger.error("Failed to initialize Instagram context")
return stories_data
self.logger.info(f"Fetching stories with images from @{self.target_account}")
# Get profile
profile = instaloader.Profile.from_username(self.loader.context, self.target_account)
self._check_rate_limit()
# Get user ID for stories
userid = profile.userid
# Get stories
for story in self.loader.get_stories(userids=[userid]):
for item in story:
try:
# Download story image (skip video stories)
image_paths = []
if not item.is_video and hasattr(item, 'url'):
local_path = self.download_media(
item.url,
f"instagram_{item.mediaid}_story",
"image"
)
if local_path:
image_paths.append(local_path)
self.logger.info(f"Downloaded story image {item.mediaid}")
story_data = {
'id': item.mediaid,
'type': 'story',
'caption': '', # Stories usually don't have captions
'author': item.owner_username,
'publish_date': item.date_utc.isoformat(),
'link': f'https://www.instagram.com/stories/{item.owner_username}/{item.mediaid}/',
'is_video': item.is_video if hasattr(item, 'is_video') else False,
'local_images': image_paths # Add downloaded image paths
}
stories_data.append(story_data)
# Rate limiting
self._aggressive_delay()
self._check_rate_limit()
except Exception as e:
self.logger.error(f"Error processing story: {e}")
continue
self.logger.info(f"Successfully fetched {len(stories_data)} stories with images")
except Exception as e:
self.logger.error(f"Error fetching stories: {e}")
return stories_data
def format_markdown(self, items: List[Dict[str, Any]]) -> str:
"""Format Instagram content as markdown with image references."""
markdown_sections = []
for item in items:
section = []
# ID
section.append(f"# ID: {item.get('id', 'N/A')}")
section.append("")
# Type
section.append(f"## Type: {item.get('type', 'post')}")
section.append("")
# Link
section.append(f"## Link: {item.get('link', '')}")
section.append("")
# Author
section.append(f"## Author: {item.get('author', 'N/A')}")
section.append("")
# Publish Date
section.append(f"## Publish Date: {item.get('publish_date', 'N/A')}")
section.append("")
# Caption
if item.get('caption'):
section.append("## Caption:")
section.append(item['caption'])
section.append("")
# Engagement metrics
if item.get('likes') is not None:
section.append(f"## Likes: {item.get('likes', 0)}")
section.append("")
if item.get('comments') is not None:
section.append(f"## Comments: {item.get('comments', 0)}")
section.append("")
if item.get('views') is not None:
section.append(f"## Views: {item.get('views', 0)}")
section.append("")
# Local images
if item.get('local_images'):
section.append("## Downloaded Images:")
for img_path in item['local_images']:
# Convert to relative path for markdown
rel_path = Path(img_path).relative_to(self.config.data_dir)
section.append(f"- [{rel_path.name}]({rel_path})")
section.append("")
# Hashtags
if item.get('hashtags'):
section.append(f"## Hashtags: {' '.join(['#' + tag for tag in item['hashtags']])}")
section.append("")
# Mentions
if item.get('mentions'):
section.append(f"## Mentions: {' '.join(['@' + mention for mention in item['mentions']])}")
section.append("")
# Media count
if item.get('media_count') and item['media_count'] > 1:
section.append(f"## Media Count: {item['media_count']}")
section.append("")
# Is video
if item.get('is_video'):
section.append("## Media Type: Video (thumbnail downloaded)")
section.append("")
# Separator
section.append("-" * 50)
section.append("")
markdown_sections.append('\n'.join(section))
return '\n'.join(markdown_sections)

View file

@ -0,0 +1,152 @@
"""
Enhanced RSS scrapers that download podcast episode thumbnails.
"""
from typing import Dict, List, Any, Optional
from pathlib import Path
from src.rss_scraper import RSSScraperPodcast, RSSScraperMailChimp
class RSSScraperPodcastWithImages(RSSScraperPodcast):
"""Podcast RSS scraper that downloads episode thumbnails."""
def __init__(self, config):
super().__init__(config)
# Create media directory for Podcast
self.media_dir = self.config.data_dir / "media" / "Podcast"
self.media_dir.mkdir(parents=True, exist_ok=True)
self.logger.info(f"Podcast media directory: {self.media_dir}")
def _download_episode_thumbnail(self, episode_id: str, image_url: str) -> Optional[str]:
"""Download podcast episode thumbnail."""
if not image_url:
return None
try:
# Clean episode ID for filename
safe_id = episode_id.replace('/', '_').replace('\\', '_')[:50]
local_path = self.download_media(
image_url,
f"podcast_{safe_id}_thumbnail",
"image"
)
if local_path:
self.logger.info(f"Downloaded thumbnail for episode {safe_id}")
return local_path
except Exception as e:
self.logger.error(f"Error downloading thumbnail for {episode_id}: {e}")
return None
def fetch_content(self, max_items: Optional[int] = None) -> List[Dict[str, Any]]:
"""Fetch RSS feed content with thumbnail downloads."""
items = super().fetch_content(max_items)
# Download thumbnails for each episode
for item in items:
image_url = self.extract_image_link(item)
if image_url:
episode_id = item.get('id') or item.get('guid', 'unknown')
local_thumbnail = self._download_episode_thumbnail(episode_id, image_url)
item['local_thumbnail'] = local_thumbnail
item['thumbnail_url'] = image_url
# Also store audio link for reference (but don't download)
audio_link = self.extract_audio_link(item)
if audio_link:
item['audio_url'] = audio_link
return items
def format_markdown(self, items: List[Dict[str, Any]]) -> str:
"""Format podcast items as markdown with thumbnail references."""
markdown_sections = []
for item in items:
section = []
# ID
item_id = item.get('id') or item.get('guid', 'N/A')
section.append(f"# ID: {item_id}")
section.append("")
# Title
title = item.get('title', 'Untitled')
section.append(f"## Title: {title}")
section.append("")
# Type
section.append("## Type: podcast")
section.append("")
# Link
link = item.get('link', '')
section.append(f"## Link: {link}")
section.append("")
# Audio URL
if item.get('audio_url'):
section.append(f"## Audio: {item['audio_url']}")
section.append("")
# Publish Date
pub_date = item.get('published') or item.get('pubDate', '')
section.append(f"## Publish Date: {pub_date}")
section.append("")
# Duration
duration = item.get('itunes_duration', '')
if duration:
section.append(f"## Duration: {duration}")
section.append("")
# Thumbnail
if item.get('local_thumbnail'):
section.append("## Thumbnail:")
# Convert to relative path for markdown
rel_path = Path(item['local_thumbnail']).relative_to(self.config.data_dir)
section.append(f"![Thumbnail]({rel_path})")
section.append("")
elif item.get('thumbnail_url'):
section.append(f"## Thumbnail URL: {item['thumbnail_url']}")
section.append("")
# Description
section.append("## Description:")
# Try to get full content first, then summary, then description
content = item.get('content')
if content and isinstance(content, list) and len(content) > 0:
content_html = content[0].get('value', '')
if content_html:
content_md = self.convert_to_markdown(content_html)
section.append(content_md)
elif item.get('summary'):
summary_md = self.convert_to_markdown(item.get('summary'))
section.append(summary_md)
elif item.get('description'):
desc_md = self.convert_to_markdown(item.get('description'))
section.append(desc_md)
section.append("")
# iTunes metadata if available
if item.get('itunes_author'):
section.append(f"## Author: {item['itunes_author']}")
section.append("")
if item.get('itunes_episode'):
section.append(f"## Episode Number: {item['itunes_episode']}")
section.append("")
if item.get('itunes_season'):
section.append(f"## Season: {item['itunes_season']}")
section.append("")
# Separator
section.append("-" * 50)
section.append("")
markdown_sections.append('\n'.join(section))
return '\n'.join(markdown_sections)

View file

@ -0,0 +1,222 @@
"""
Enhanced YouTube API scraper that downloads video thumbnails.
"""
from typing import List, Dict, Any, Optional
from pathlib import Path
from src.youtube_api_scraper_v2 import YouTubeAPIScraper
class YouTubeAPIScraperWithThumbnails(YouTubeAPIScraper):
"""YouTube API scraper that downloads video thumbnails."""
def __init__(self, config):
super().__init__(config)
# Create media directory for YouTube
self.media_dir = self.config.data_dir / "media" / "YouTube"
self.media_dir.mkdir(parents=True, exist_ok=True)
self.logger.info(f"YouTube media directory: {self.media_dir}")
def _download_thumbnail(self, video_id: str, thumbnail_url: str) -> Optional[str]:
"""Download video thumbnail."""
if not thumbnail_url:
return None
try:
local_path = self.download_media(
thumbnail_url,
f"youtube_{video_id}_thumbnail",
"image"
)
if local_path:
self.logger.info(f"Downloaded thumbnail for video {video_id}")
return local_path
except Exception as e:
self.logger.error(f"Error downloading thumbnail for {video_id}: {e}")
return None
def fetch_content(self, max_posts: int = None, fetch_captions: bool = True) -> List[Dict[str, Any]]:
"""Fetch YouTube videos with thumbnail downloads."""
# Call parent method to get videos
videos = super().fetch_content(max_posts, fetch_captions)
# Download thumbnails for each video
for video in videos:
if video.get('thumbnail'):
local_thumbnail = self._download_thumbnail(video['id'], video['thumbnail'])
video['local_thumbnail'] = local_thumbnail
return videos
def fetch_video_details(self, video_ids: List[str]) -> List[Dict[str, Any]]:
"""Fetch detailed video information with thumbnail downloads."""
if not video_ids:
return []
# YouTube API allows max 50 videos per request
batch_size = 50
all_videos = []
for i in range(0, len(video_ids), batch_size):
batch = video_ids[i:i + batch_size]
# Check quota (1 unit per request)
if not self._track_quota('videos_list'):
self.logger.warning("Quota limit reached while fetching video details")
break
try:
response = self.youtube.videos().list(
part='snippet,statistics,contentDetails',
id=','.join(batch)
).execute()
for video in response.get('items', []):
# Get thumbnail URL (highest quality available)
thumbnail_url = (
video['snippet']['thumbnails'].get('maxres', {}).get('url') or
video['snippet']['thumbnails'].get('high', {}).get('url') or
video['snippet']['thumbnails'].get('medium', {}).get('url') or
video['snippet']['thumbnails'].get('default', {}).get('url', '')
)
# Download thumbnail
local_thumbnail = self._download_thumbnail(video['id'], thumbnail_url)
video_data = {
'id': video['id'],
'title': video['snippet']['title'],
'description': video['snippet']['description'],
'published_at': video['snippet']['publishedAt'],
'channel_id': video['snippet']['channelId'],
'channel_title': video['snippet']['channelTitle'],
'tags': video['snippet'].get('tags', []),
'duration': video['contentDetails']['duration'],
'definition': video['contentDetails']['definition'],
'caption': video['contentDetails'].get('caption', 'false'),
'thumbnail': thumbnail_url,
'local_thumbnail': local_thumbnail, # Add local thumbnail path
# Statistics
'view_count': int(video['statistics'].get('viewCount', 0)),
'like_count': int(video['statistics'].get('likeCount', 0)),
'comment_count': int(video['statistics'].get('commentCount', 0)),
# Calculate engagement metrics
'engagement_rate': 0,
'like_ratio': 0
}
# Calculate engagement metrics
if video_data['view_count'] > 0:
video_data['engagement_rate'] = (
(video_data['like_count'] + video_data['comment_count']) /
video_data['view_count']
) * 100
video_data['like_ratio'] = (video_data['like_count'] / video_data['view_count']) * 100
all_videos.append(video_data)
# Small delay to be respectful
import time
time.sleep(0.1)
except Exception as e:
self.logger.error(f"Error fetching video details: {e}")
return all_videos
def format_markdown(self, videos: List[Dict[str, Any]]) -> str:
"""Format videos as markdown with thumbnail references."""
markdown_sections = []
for video in videos:
section = []
# ID
section.append(f"# ID: {video.get('id', 'N/A')}")
section.append("")
# Title
section.append(f"## Title: {video.get('title', 'Untitled')}")
section.append("")
# Type
section.append("## Type: video")
section.append("")
# Link
section.append(f"## Link: https://www.youtube.com/watch?v={video.get('id', '')}")
section.append("")
# Channel
section.append(f"## Channel: {video.get('channel_title', 'N/A')}")
section.append("")
# Published Date
section.append(f"## Published: {video.get('published_at', 'N/A')}")
section.append("")
# Duration
if video.get('duration'):
section.append(f"## Duration: {video['duration']}")
section.append("")
# Description
if video.get('description'):
section.append("## Description:")
section.append(video['description'][:1000]) # Limit description length
if len(video.get('description', '')) > 1000:
section.append("... [truncated]")
section.append("")
# Statistics
section.append("## Statistics:")
section.append(f"- Views: {video.get('view_count', 0):,}")
section.append(f"- Likes: {video.get('like_count', 0):,}")
section.append(f"- Comments: {video.get('comment_count', 0):,}")
section.append(f"- Engagement Rate: {video.get('engagement_rate', 0):.2f}%")
section.append(f"- Like Ratio: {video.get('like_ratio', 0):.2f}%")
section.append("")
# Caption/Transcript
if video.get('caption_text'):
section.append("## Transcript:")
# Show first 500 chars of transcript
transcript_preview = video['caption_text'][:500]
section.append(transcript_preview)
if len(video.get('caption_text', '')) > 500:
section.append("... [See full transcript below]")
section.append("")
# Add full transcript at the end
section.append("### Full Transcript:")
section.append(video['caption_text'])
section.append("")
elif video.get('caption') == 'true':
section.append("## Captions: Available (not fetched)")
section.append("")
# Thumbnail
if video.get('local_thumbnail'):
section.append("## Thumbnail:")
# Convert to relative path for markdown
rel_path = Path(video['local_thumbnail']).relative_to(self.config.data_dir)
section.append(f"![Thumbnail]({rel_path})")
section.append("")
elif video.get('thumbnail'):
section.append(f"## Thumbnail URL: {video['thumbnail']}")
section.append("")
# Tags
if video.get('tags'):
section.append(f"## Tags: {', '.join(video['tags'][:10])}") # Limit to 10 tags
section.append("")
# Separator
section.append("-" * 50)
section.append("")
markdown_sections.append('\n'.join(section))
return '\n'.join(markdown_sections)