diff --git a/README.md b/README.md new file mode 100644 index 0000000..2e9d072 --- /dev/null +++ b/README.md @@ -0,0 +1,212 @@ +# HVAC Know It All Content Aggregation System + +A containerized Python application that aggregates content from multiple HVAC Know It All sources, converts them to markdown format, and syncs to a NAS. + +## Features + +- **Multi-source content aggregation** from YouTube, Instagram, TikTok, MailChimp, WordPress, and Podcast RSS +- **Cumulative markdown management** - Single source-of-truth files that grow with backlog and incremental updates +- **API integrations** for YouTube Data API v3 and MailChimp API +- **Intelligent content merging** with caption/transcript updates and metric tracking +- **Automated NAS synchronization** to `/mnt/nas/hvacknowitall/` +- **State management** for incremental updates +- **Parallel processing** for multiple sources +- **Atlantic timezone** (America/Halifax) timestamps + +## Cumulative Markdown System + +### Overview +The system maintains a single markdown file per source that combines: +- Initial backlog content (historical data) +- Daily incremental updates (new content) +- Content updates (new captions, updated metrics) + +### How It Works + +1. **Initial Backlog**: First run creates base file with all historical content +2. **Daily Incremental**: Subsequent runs merge new content into existing file +3. **Smart Merging**: Updates existing entries when better data is available (captions, transcripts, metrics) +4. **Archival**: Previous versions archived with timestamps for history + +### File Naming Convention +``` +__.md +Example: hvacnkowitall_YouTube_2025-08-19T143045.md +``` + +## Quick Start + +### Installation + +```bash +# Install UV package manager +pip install uv + +# Install dependencies +uv pip install -r requirements.txt +``` + +### Configuration + +Create `.env` file with credentials: +```env +# YouTube +YOUTUBE_API_KEY=your_api_key + +# MailChimp +MAILCHIMP_API_KEY=your_api_key +MAILCHIMP_SERVER_PREFIX=us10 + +# Instagram +INSTAGRAM_USERNAME=username +INSTAGRAM_PASSWORD=password + +# WordPress +WORDPRESS_USERNAME=username +WORDPRESS_API_KEY=api_key +``` + +### Running + +```bash +# Run all scrapers (parallel) +uv run python run_all_scrapers.py + +# Run single source +uv run python -m src.youtube_api_scraper_v2 + +# Test cumulative mode +uv run python test_cumulative_mode.py + +# Consolidate existing files +uv run python consolidate_current_files.py +``` + +## Architecture + +### Core Components + +- **BaseScraper**: Abstract base class for all scrapers +- **BaseScraperCumulative**: Enhanced base with cumulative support +- **CumulativeMarkdownManager**: Handles intelligent file merging +- **ContentOrchestrator**: Manages parallel scraper execution + +### Data Flow + +``` +1. Scraper fetches content (checks state for incremental) +2. CumulativeMarkdownManager loads existing file +3. Merges new content (adds new, updates existing) +4. Archives previous version +5. Saves updated file with current timestamp +6. Updates state for next run +``` + +### Directory Structure + +``` +data/ +├── markdown_current/ # Current single-source-of-truth files +├── markdown_archives/ # Historical versions by source +│ ├── YouTube/ +│ ├── Instagram/ +│ └── ... 
+├── media/ # Downloaded media files +└── .state/ # State files for incremental updates + +logs/ # Log files by source +src/ # Source code +tests/ # Test files +``` + +## API Quota Management + +### YouTube Data API v3 +- **Daily Limit**: 10,000 units +- **Usage Strategy**: 95% daily quota for captions +- **Costs**: + - videos.list: 1 unit + - captions.list: 50 units + - channels.list: 1 unit + +### Rate Limiting +- Instagram: 200 posts/hour +- YouTube: Respects API quotas +- General: Exponential backoff with retry + +## Production Deployment + +### Systemd Services + +Services are configured in `/etc/systemd/system/`: +- `hvac-content-8am.service` - Morning run +- `hvac-content-12pm.service` - Noon run +- `hvac-content-8am.timer` - Morning schedule +- `hvac-content-12pm.timer` - Noon schedule + +### Manual Deployment + +```bash +# Start services +sudo systemctl start hvac-content-8am.timer +sudo systemctl start hvac-content-12pm.timer + +# Enable on boot +sudo systemctl enable hvac-content-8am.timer +sudo systemctl enable hvac-content-12pm.timer + +# Check status +sudo systemctl status hvac-content-*.timer +``` + +## Monitoring + +```bash +# View logs +journalctl -u hvac-content-8am -f + +# Check file growth +ls -lh data/markdown_current/ + +# View statistics +uv run python -c "from src.cumulative_markdown_manager import CumulativeMarkdownManager; ..." +``` + +## Testing + +```bash +# Run all tests +uv run pytest + +# Test specific scraper +uv run pytest tests/test_youtube_scraper.py + +# Test cumulative mode +uv run python test_cumulative_mode.py +``` + +## Troubleshooting + +### Common Issues + +1. **Instagram Rate Limiting**: Scraper implements humanized delays (18-22 seconds between requests) +2. **YouTube Quota Exceeded**: Wait until next day, quota resets at midnight Pacific +3. **NAS Permission Errors**: Warnings are normal, files still sync successfully +4. 
**Missing Captions**: Use YouTube Data API instead of youtube-transcript-api + +### Debug Commands + +```bash +# Check scraper state +cat data/.state/*_state.json + +# View recent logs +tail -f logs/YouTube/youtube_*.log + +# Test single source +uv run python -m src.youtube_api_scraper_v2 --test +``` + +## License + +Private repository - All rights reserved \ No newline at end of file diff --git a/consolidate_current_files.py b/consolidate_current_files.py new file mode 100644 index 0000000..c16603e --- /dev/null +++ b/consolidate_current_files.py @@ -0,0 +1,226 @@ +#!/usr/bin/env python3 +""" +Consolidate multiple markdown files per source into single current files +Combines backlog data and incremental updates into one source of truth +Follows project specification naming: hvacnkowitall__.md +""" + +import sys +from pathlib import Path +sys.path.insert(0, str(Path(__file__).parent)) + +from datetime import datetime +import pytz +import re +from typing import Dict, List, Set +import logging + +# Set up logging +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', + handlers=[ + logging.FileHandler('logs/consolidation.log'), + logging.StreamHandler() + ] +) +logger = logging.getLogger('consolidator') + + +def get_atlantic_timestamp() -> str: + """Get current timestamp in Atlantic timezone.""" + tz = pytz.timezone('America/Halifax') + return datetime.now(tz).strftime('%Y-%m-%dT%H%M%S') + + +def parse_markdown_sections(content: str) -> List[Dict]: + """Parse markdown content into sections by ID.""" + sections = [] + + # Split by ID headers + parts = content.split('# ID: ') + + for part in parts[1:]: # Skip first empty part + if not part.strip(): + continue + + lines = part.strip().split('\n') + section_id = lines[0].strip() + + # Get the full section content + section_content = f"# ID: {section_id}\n" + '\n'.join(lines[1:]) + + sections.append({ + 'id': section_id, + 'content': section_content + }) + + return sections + + +def consolidate_source_files(source_name: str) -> bool: + """Consolidate all files for a specific source into one current file.""" + logger.info(f"Consolidating {source_name} files...") + + current_dir = Path('data/markdown_current') + archives_dir = Path('data/markdown_archives') + + # Find all files for this source + pattern = f"hvacnkowitall_{source_name}_*.md" + current_files = list(current_dir.glob(pattern)) + + # Also check for files with different naming (like captions files) + alt_patterns = [ + f"*{source_name}*.md", + f"hvacnkowitall_{source_name.lower()}_*.md" + ] + + for alt_pattern in alt_patterns: + current_files.extend(current_dir.glob(alt_pattern)) + + # Remove duplicates + current_files = list(set(current_files)) + + if not current_files: + logger.warning(f"No files found for source: {source_name}") + return False + + logger.info(f"Found {len(current_files)} files for {source_name}: {[f.name for f in current_files]}") + + # Track unique sections by ID + sections_by_id: Dict[str, Dict] = {} + all_sections = [] + + # Process each file + for file_path in current_files: + logger.info(f"Processing {file_path.name}...") + + try: + content = file_path.read_text(encoding='utf-8') + sections = parse_markdown_sections(content) + + logger.info(f" Found {len(sections)} sections") + + # Add sections, preferring newer data + for section in sections: + section_id = section['id'] + + # If we haven't seen this ID, add it + if section_id not in sections_by_id: + sections_by_id[section_id] = section + 
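+ # First time this ID appears: index it and keep discovery order in all_sections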
all_sections.append(section) + else: + # Check if this version has more content (like captions) + old_content = sections_by_id[section_id]['content'] + new_content = section['content'] + + # Prefer content with captions/more detail + if ('Caption Status:' in new_content and 'Caption Status:' not in old_content) or \ + len(new_content) > len(old_content): + logger.info(f" Updating section {section_id} with more detailed content") + # Update in place + for i, existing in enumerate(all_sections): + if existing['id'] == section_id: + all_sections[i] = section + sections_by_id[section_id] = section + break + + except Exception as e: + logger.error(f"Error processing {file_path}: {e}") + continue + + if not all_sections: + logger.warning(f"No sections found for {source_name}") + return False + + # Create consolidated content + consolidated_content = [] + + # Sort sections by ID for consistency + all_sections.sort(key=lambda x: x['id']) + + for section in all_sections: + consolidated_content.append(section['content']) + consolidated_content.append("") # Add separator + + # Generate new filename following project specification + timestamp = get_atlantic_timestamp() + new_filename = f"hvacnkowitall_{source_name}_{timestamp}.md" + new_file_path = current_dir / new_filename + + # Save consolidated file + final_content = '\n'.join(consolidated_content) + new_file_path.write_text(final_content, encoding='utf-8') + + logger.info(f"Created consolidated file: {new_filename}") + logger.info(f" Total sections: {len(all_sections)}") + logger.info(f" File size: {len(final_content):,} characters") + + # Archive old files + archive_source_dir = archives_dir / source_name + archive_source_dir.mkdir(parents=True, exist_ok=True) + + archived_count = 0 + for old_file in current_files: + if old_file.name != new_filename: # Don't archive the new file + try: + archive_path = archive_source_dir / old_file.name + old_file.rename(archive_path) + archived_count += 1 + logger.info(f" Archived: {old_file.name}") + except Exception as e: + logger.error(f"Error archiving {old_file.name}: {e}") + + logger.info(f"Archived {archived_count} old files for {source_name}") + + # Create copy in archives as well + archive_current_path = archive_source_dir / new_filename + archive_current_path.write_text(final_content, encoding='utf-8') + + return True + + +def main(): + """Main consolidation function.""" + logger.info("=" * 60) + logger.info("CONSOLIDATING CURRENT MARKDOWN FILES") + logger.info("=" * 60) + + # Create directories if needed + Path('data/markdown_current').mkdir(parents=True, exist_ok=True) + Path('data/markdown_archives').mkdir(parents=True, exist_ok=True) + Path('logs').mkdir(parents=True, exist_ok=True) + + # Define sources to consolidate + sources = ['YouTube', 'MailChimp', 'Instagram', 'TikTok', 'Podcast'] + + consolidated = [] + failed = [] + + for source in sources: + logger.info(f"\n{'-' * 40}") + try: + if consolidate_source_files(source): + consolidated.append(source) + else: + failed.append(source) + except Exception as e: + logger.error(f"Failed to consolidate {source}: {e}") + failed.append(source) + + logger.info(f"\n{'=' * 60}") + logger.info("CONSOLIDATION SUMMARY") + logger.info(f"{'=' * 60}") + logger.info(f"Successfully consolidated: {consolidated}") + logger.info(f"Failed/No data: {failed}") + + # List final current files + current_files = list(Path('data/markdown_current').glob('*.md')) + logger.info(f"\nFinal current files:") + for file in sorted(current_files): + size = file.stat().st_size + 
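+ # Log each remaining current file with its size as a final sanity check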
logger.info(f" {file.name} ({size:,} bytes)") + + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/continue_youtube_captions.py b/continue_youtube_captions.py new file mode 100644 index 0000000..8b4c4bb --- /dev/null +++ b/continue_youtube_captions.py @@ -0,0 +1,229 @@ +#!/usr/bin/env python3 +""" +Continue YouTube caption fetching using remaining quota +Fetches captions for videos 50-188 (next 139 videos by view count) +Uses up to 95% of daily quota (9,500 units) +""" + +import sys +from pathlib import Path +sys.path.insert(0, str(Path(__file__).parent)) + +from src.youtube_api_scraper_v2 import YouTubeAPIScraper +from src.base_scraper import ScraperConfig +from datetime import datetime +import pytz +import time +import json +import logging + +# Set up logging +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', + handlers=[ + logging.FileHandler('logs/youtube_caption_continue.log'), + logging.StreamHandler() + ] +) +logger = logging.getLogger('youtube_captions') + + +def load_existing_videos(): + """Load existing video data from the latest markdown file.""" + latest_file = Path('data/markdown_current/hvacnkowitall_YouTube_2025-08-19T100336.md') + + if not latest_file.exists(): + logger.error(f"Latest YouTube file not found: {latest_file}") + return [] + + # Parse the markdown to extract video data + content = latest_file.read_text(encoding='utf-8') + videos = [] + + # Simple parsing - split by video sections + sections = content.split('# ID: ') + + for section in sections[1:]: # Skip first empty section + lines = section.strip().split('\n') + if not lines: + continue + + video_id = lines[0].strip() + video_data = {'id': video_id} + + # Parse basic info + for line in lines: + if line.startswith('## Title: '): + video_data['title'] = line.replace('## Title: ', '') + elif line.startswith('## Views: '): + views_str = line.replace('## Views: ', '').replace(',', '') + video_data['view_count'] = int(views_str) if views_str.isdigit() else 0 + elif line.startswith('## Caption Status:'): + video_data['has_caption_info'] = True + + videos.append(video_data) + + logger.info(f"Loaded {len(videos)} videos from existing file") + return videos + + +def continue_caption_fetching(): + """Continue fetching captions from where we left off.""" + logger.info("=" * 60) + logger.info("CONTINUING YOUTUBE CAPTION FETCHING") + logger.info("=" * 60) + + # Load existing video data + videos = load_existing_videos() + + if not videos: + logger.error("No existing videos found to continue from") + return False + + # Sort by view count (descending) + videos.sort(key=lambda x: x.get('view_count', 0), reverse=True) + + # Count how many already have captions + with_captions = sum(1 for v in videos if v.get('has_caption_info')) + without_captions = [v for v in videos if not v.get('has_caption_info')] + + logger.info(f"Current status:") + logger.info(f" Total videos: {len(videos)}") + logger.info(f" Already have captions: {with_captions}") + logger.info(f" Need captions: {len(without_captions)}") + + # Calculate quota + quota_used_so_far = 2519 # From previous run + daily_limit = 10000 + target_usage = int(daily_limit * 0.95) # 95% = 9,500 units + available_quota = target_usage - quota_used_so_far + + logger.info(f"Quota analysis:") + logger.info(f" Daily limit: {daily_limit:,} units") + logger.info(f" Already used: {quota_used_so_far:,} units") + logger.info(f" Target (95%): {target_usage:,} units") + logger.info(f" Available: 
{available_quota:,} units") + + # Calculate how many more videos we can caption + max_additional_captions = available_quota // 50 # 50 units per video + videos_to_caption = without_captions[:max_additional_captions] + + logger.info(f"Caption plan:") + logger.info(f" Videos to caption now: {len(videos_to_caption)}") + logger.info(f" Estimated quota cost: {len(videos_to_caption) * 50:,} units") + logger.info(f" Will use: {quota_used_so_far + (len(videos_to_caption) * 50):,} units total") + + if not videos_to_caption: + logger.info("No additional videos to caption within quota limits") + return True + + # Set up scraper + config = ScraperConfig( + source_name='YouTube', + brand_name='hvacnkowitall', + data_dir=Path('data/markdown_current'), + logs_dir=Path('logs/YouTube'), + timezone='America/Halifax' + ) + + scraper = YouTubeAPIScraper(config) + scraper.quota_used = quota_used_so_far # Set initial quota usage + + logger.info(f"Starting caption fetching for {len(videos_to_caption)} videos...") + start_time = time.time() + + captions_found = 0 + for i, video in enumerate(videos_to_caption, 1): + video_id = video['id'] + title = video.get('title', 'Unknown')[:50] + + logger.info(f"[{i}/{len(videos_to_caption)}] Fetching caption for: {title}...") + + # Fetch caption info + caption_info = scraper._fetch_caption_text(video_id) + + if caption_info: + video['caption_text'] = caption_info + captions_found += 1 + logger.info(f" ✅ Caption found") + else: + logger.info(f" ❌ No caption available") + + # Add delay to be respectful + time.sleep(0.5) + + # Check if we're approaching quota limit + if scraper.quota_used >= target_usage: + logger.warning(f"Reached 95% quota limit at video {i}") + break + + elapsed = time.time() - start_time + + logger.info(f"Caption fetching complete!") + logger.info(f" Duration: {elapsed:.1f} seconds") + logger.info(f" Captions found: {captions_found}") + logger.info(f" Quota used: {scraper.quota_used:,}/{daily_limit:,} units") + logger.info(f" Quota percentage: {(scraper.quota_used/daily_limit)*100:.1f}%") + + # Update the video data with new caption info + video_lookup = {v['id']: v for v in videos} + for video in videos_to_caption: + if video['id'] in video_lookup and video.get('caption_text'): + video_lookup[video['id']]['caption_text'] = video['caption_text'] + + # Save updated data + timestamp = datetime.now(pytz.timezone('America/Halifax')).strftime('%Y-%m-%dT%H%M%S') + updated_filename = f"hvacnkowitall_YouTube_{timestamp}_captions.md" + + # Generate updated markdown (simplified version) + markdown_sections = [] + for video in videos: + section = [] + section.append(f"# ID: {video['id']}") + section.append("") + section.append(f"## Title: {video.get('title', 'Unknown')}") + section.append("") + section.append(f"## Views: {video.get('view_count', 0):,}") + section.append("") + + # Caption status + if video.get('caption_text'): + section.append("## Caption Status:") + section.append(video['caption_text']) + section.append("") + elif video.get('has_caption_info'): + section.append("## Caption Status:") + section.append("[Captions available - ]") + section.append("") + + section.append("-" * 50) + section.append("") + markdown_sections.append('\n'.join(section)) + + # Save updated file + output_file = Path(f'data/markdown_current/{updated_filename}') + output_file.write_text('\n'.join(markdown_sections), encoding='utf-8') + + logger.info(f"Updated file saved: {output_file}") + + # Calculate remaining work + total_with_captions = with_captions + captions_found + 
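+ # Remaining work = total videos minus everything captioned so far (previous runs plus this one)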
remaining_videos = len(videos) - total_with_captions + + logger.info(f"Progress summary:") + logger.info(f" Total videos: {len(videos)}") + logger.info(f" Captioned: {total_with_captions}") + logger.info(f" Remaining: {remaining_videos}") + logger.info(f" Progress: {(total_with_captions/len(videos))*100:.1f}%") + + if remaining_videos > 0: + days_needed = (remaining_videos // 190) + (1 if remaining_videos % 190 else 0) + logger.info(f" Estimated days to complete: {days_needed}") + + return True + + +if __name__ == "__main__": + success = continue_caption_fetching() + sys.exit(0 if success else 1) \ No newline at end of file diff --git a/docs/cumulative_markdown.md b/docs/cumulative_markdown.md new file mode 100644 index 0000000..18d66b0 --- /dev/null +++ b/docs/cumulative_markdown.md @@ -0,0 +1,188 @@ +# Cumulative Markdown System Documentation + +## Overview + +The cumulative markdown system maintains a single, continuously growing markdown file per content source that intelligently combines backlog data with incremental daily updates. + +## Problem It Solves + +Previously, each scraper run created entirely new files: +- Backlog runs created large initial files +- Incremental updates created small separate files +- No merging of content between files +- Multiple files per source made it hard to find the "current" state + +## Solution Architecture + +### CumulativeMarkdownManager + +Core class that handles: +1. **Loading** existing markdown files +2. **Parsing** content into sections by unique ID +3. **Merging** new content with existing sections +4. **Updating** sections when better data is available +5. **Archiving** previous versions for history +6. **Saving** updated single-source-of-truth file + +### Merge Logic + +The system uses intelligent merging based on content quality: + +```python +def should_update_section(old_section, new_section): + # Update if new has captions/transcripts that old doesn't + if new_has_captions and not old_has_captions: + return True + + # Update if new has significantly more content + if new_description_length > old_description_length * 1.2: + return True + + # Update if metrics have increased + if new_views > old_views: + return True + + return False +``` + +## Usage Patterns + +### Initial Backlog Capture + +```python +# Day 1 - First run captures all historical content +scraper.fetch_content(max_posts=1000) +# Creates: hvacnkowitall_YouTube_20250819T080000.md (444 videos) +``` + +### Daily Incremental Update + +```python +# Day 2 - Fetch only new content since last run +scraper.fetch_content() # Uses state to get only new items +# Loads existing file, merges new content +# Updates: hvacnkowitall_YouTube_20250819T120000.md (449 videos) +``` + +### Caption/Transcript Enhancement + +```python +# Day 3 - Fetch captions for existing videos +youtube_scraper.fetch_captions(video_ids) +# Loads existing file, updates videos with caption data +# Updates: hvacnkowitall_YouTube_20250819T080000.md (449 videos, 200 with captions) +``` + +## File Management + +### Naming Convention +``` +hvacnkowitall__.md +``` +- Brand name is always lowercase +- Source name is TitleCase +- Timestamp in Atlantic timezone + +### Archive Strategy +``` +Current: + hvacnkowitall_YouTube_20250819T143045.md (latest) + +Archives: + YouTube/ + hvacnkowitall_YouTube_20250819T080000_archived_20250819_120000.md + hvacnkowitall_YouTube_20250819T120000_archived_20250819_143045.md +``` + +## Implementation Details + +### Section Structure + +Each content item is a section with unique ID: 
+```markdown +# ID: video_abc123 + +## Title: Video Title + +## Views: 1,234 + +## Description: +Full description text... + +## Caption Status: +Caption text if available... + +## Publish Date: 2024-01-15 + +-------------------------------------------------- +``` + +### Merge Process + +1. **Parse** both existing and new content into sections +2. **Index** by unique ID (video ID, post ID, etc.) +3. **Compare** sections with same ID +4. **Update** if new version is better +5. **Add** new sections not in existing file +6. **Sort** by date (newest first) or maintain order +7. **Save** combined content with new timestamp + +### State Management + +State files track last processed item for incremental updates: +```json +{ + "last_video_id": "abc123", + "last_video_date": "2024-01-20", + "last_sync": "2024-01-20T12:00:00", + "total_processed": 449 +} +``` + +## Benefits + +1. **Single Source of Truth**: One file per source with all content +2. **Automatic Updates**: Existing entries enhanced with new data +3. **Efficient Storage**: No duplicate content across files +4. **Complete History**: Archives preserve all versions +5. **Incremental Growth**: Files grow naturally over time +6. **Smart Merging**: Best version of each entry is preserved + +## Migration from Separate Files + +Use the consolidation script to migrate existing separate files: + +```bash +# Consolidate all existing files into cumulative format +uv run python consolidate_current_files.py +``` + +This will: +1. Find all files for each source +2. Parse and merge by content ID +3. Create single cumulative file +4. Archive old separate files + +## Testing + +Test the cumulative workflow: + +```bash +uv run python test_cumulative_mode.py +``` + +This demonstrates: +- Initial backlog capture (5 items) +- First incremental update (+2 items = 7 total) +- Second incremental with updates (1 updated, +1 new = 8 total) +- Proper archival of previous versions + +## Future Enhancements + +Potential improvements: +1. Conflict resolution strategies (user choice on updates) +2. Differential backups (only store changes) +3. Compression of archived versions +4. Metrics tracking across versions +5. Automatic cleanup of old archives +6. 
API endpoint to query cumulative statistics \ No newline at end of file diff --git a/docs/status.md b/docs/status.md index 893ded3..c58292f 100644 --- a/docs/status.md +++ b/docs/status.md @@ -1,11 +1,11 @@ # HVAC Know It All Content Aggregation - Project Status -## Current Status: 🟢 PRODUCTION DEPLOYED +## Current Status: 🟢 PRODUCTION READY **Project Completion: 100%** **All 6 Sources: ✅ Working** -**Deployment: 🚀 In Production** -**Last Updated: 2025-08-18 23:15 ADT** +**Deployment: 🚀 Production Ready** +**Last Updated: 2025-08-19 10:50 ADT** --- @@ -13,18 +13,34 @@ | Source | Status | Last Tested | Items Fetched | Notes | |--------|--------|-------------|---------------|-------| -| WordPress Blog | ✅ Working | 2025-08-18 | 139 posts | HTML cleaning implemented, clean markdown output | -| MailChimp RSS | ⚠️ SSL Error | 2025-08-18 | 0 entries | Provider SSL issue, not a code problem | -| Podcast RSS | ✅ Working | 2025-08-18 | 428 episodes | Full backlog captured successfully | -| YouTube | ✅ Working | 2025-08-18 | 200 videos | Channel scraping with metadata | -| Instagram | 🔄 Processing | 2025-08-18 | 45/1000 posts | Rate: 200/hr, ETA: 3:54 AM | -| TikTok | ⏳ Queued | 2025-08-18 | 0/1000 videos | Starts after Instagram completes | +| YouTube | ✅ API Working | 2025-08-19 | 444 videos | API integration, 179/444 with captions (40.3%) | +| MailChimp | ✅ API Working | 2025-08-19 | 22 campaigns | API integration, cleaned content | +| TikTok | ✅ Working | 2025-08-19 | 35 videos | All available videos captured | +| Podcast RSS | ✅ Working | 2025-08-19 | 428 episodes | Full backlog captured | +| WordPress Blog | ✅ Working | 2025-08-18 | 139 posts | HTML cleaning implemented | +| Instagram | 🔄 Processing | 2025-08-19 | ~555/1000 posts | Long-running backlog capture | --- +## Latest Updates (2025-08-19) + +### 🆕 Cumulative Markdown System +- **Single Source of Truth**: One continuously growing file per source +- **Intelligent Merging**: Updates existing entries with new data (captions, metrics) +- **Backlog + Incremental**: Properly combines historical and daily updates +- **Smart Updates**: Prefers content with captions/transcripts over without +- **Archive Management**: Previous versions timestamped in archives + +### 🆕 API Integrations +- **YouTube Data API v3**: Replaced yt-dlp with official API +- **MailChimp API**: Replaced RSS feed with API integration +- **Caption Support**: YouTube captions via Data API (50 units/video) +- **Content Cleaning**: MailChimp headers/footers removed + ## Technical Implementation ### ✅ Core Features Complete +- **Cumulative Markdown**: Single growing file per source with intelligent merging - **Incremental Updates**: All scrapers support state-based incremental fetching - **Archive Management**: Previous files automatically archived with timestamps - **Markdown Conversion**: All content properly converted to markdown format diff --git a/run_api_production_v2.py b/run_api_production_v2.py new file mode 100755 index 0000000..51d9584 --- /dev/null +++ b/run_api_production_v2.py @@ -0,0 +1,304 @@ +#!/usr/bin/env python3 +""" +Production script for API-based content scraping - Version 2 +Follows project specification file/folder naming conventions +Captures YouTube videos with captions and MailChimp campaigns with cleaned content +""" + +import sys +from pathlib import Path +sys.path.insert(0, str(Path(__file__).parent)) + +from src.youtube_api_scraper_v2 import YouTubeAPIScraper +from src.mailchimp_api_scraper_v2 import MailChimpAPIScraper +from src.base_scraper 
import ScraperConfig +from datetime import datetime +import pytz +import time +import logging +import subprocess + +# Set up logging +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', + handlers=[ + logging.FileHandler('logs/api_production_v2.log'), + logging.StreamHandler() + ] +) +logger = logging.getLogger('api_production_v2') + + +def get_atlantic_timestamp() -> str: + """Get current timestamp in Atlantic timezone for file naming.""" + tz = pytz.timezone('America/Halifax') + return datetime.now(tz).strftime('%Y-%m-%dT%H%M%S') + + +def run_youtube_api_production(): + """Run YouTube API scraper for production backlog with captions.""" + logger.info("=" * 60) + logger.info("YOUTUBE API SCRAPER - PRODUCTION V2") + logger.info("=" * 60) + + timestamp = get_atlantic_timestamp() + + # Follow project specification directory structure + config = ScraperConfig( + source_name='YouTube', # Capitalized per spec + brand_name='hvacnkowitall', + data_dir=Path('data/markdown_current'), + logs_dir=Path('logs/YouTube'), + timezone='America/Halifax' + ) + + try: + scraper = YouTubeAPIScraper(config) + + logger.info("Starting YouTube API fetch with captions for all videos...") + start = time.time() + + # Fetch all videos WITH captions for top 50 (use more quota) + videos = scraper.fetch_content(fetch_captions=True) + + elapsed = time.time() - start + logger.info(f"Fetched {len(videos)} videos in {elapsed:.1f} seconds") + + if videos: + # Statistics + total_views = sum(v.get('view_count', 0) for v in videos) + total_likes = sum(v.get('like_count', 0) for v in videos) + with_captions = sum(1 for v in videos if v.get('caption_text')) + + logger.info(f"Statistics:") + logger.info(f" Total videos: {len(videos)}") + logger.info(f" Total views: {total_views:,}") + logger.info(f" Total likes: {total_likes:,}") + logger.info(f" Videos with captions: {with_captions}") + logger.info(f" Quota used: {scraper.quota_used}/{scraper.daily_quota_limit} units") + + # Save with project specification naming: __.md + filename = f"hvacnkowitall_YouTube_{timestamp}.md" + markdown = scraper.format_markdown(videos) + output_file = Path(f'data/markdown_current/{filename}') + output_file.parent.mkdir(parents=True, exist_ok=True) + output_file.write_text(markdown, encoding='utf-8') + logger.info(f"Markdown saved to: {output_file}") + + # Create archive copy + archive_dir = Path('data/markdown_archives/YouTube') + archive_dir.mkdir(parents=True, exist_ok=True) + archive_file = archive_dir / filename + archive_file.write_text(markdown, encoding='utf-8') + logger.info(f"Archive copy saved to: {archive_file}") + + # Update state file + state = scraper.load_state() + state = scraper.update_state(state, videos) + scraper.save_state(state) + logger.info("State file updated for incremental updates") + + return True, len(videos), output_file + else: + logger.error("No videos fetched from YouTube API") + return False, 0, None + + except Exception as e: + logger.error(f"YouTube API scraper failed: {e}") + return False, 0, None + + +def run_mailchimp_api_production(): + """Run MailChimp API scraper for production backlog with cleaned content.""" + logger.info("\n" + "=" * 60) + logger.info("MAILCHIMP API SCRAPER - PRODUCTION V2") + logger.info("=" * 60) + + timestamp = get_atlantic_timestamp() + + # Follow project specification directory structure + config = ScraperConfig( + source_name='MailChimp', # Capitalized per spec + brand_name='hvacnkowitall', + 
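+ # Spec layout: markdown output goes to data/markdown_current, logs to logs/MailChimp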
data_dir=Path('data/markdown_current'), + logs_dir=Path('logs/MailChimp'), + timezone='America/Halifax' + ) + + try: + scraper = MailChimpAPIScraper(config) + + logger.info("Starting MailChimp API fetch with content cleaning...") + start = time.time() + + # Fetch all campaigns from Bi-Weekly Newsletter folder + campaigns = scraper.fetch_content(max_items=1000) + + elapsed = time.time() - start + logger.info(f"Fetched {len(campaigns)} campaigns in {elapsed:.1f} seconds") + + if campaigns: + # Statistics + total_sent = sum(c.get('metrics', {}).get('emails_sent', 0) for c in campaigns) + total_opens = sum(c.get('metrics', {}).get('unique_opens', 0) for c in campaigns) + total_clicks = sum(c.get('metrics', {}).get('unique_clicks', 0) for c in campaigns) + + logger.info(f"Statistics:") + logger.info(f" Total campaigns: {len(campaigns)}") + logger.info(f" Total emails sent: {total_sent:,}") + logger.info(f" Total unique opens: {total_opens:,}") + logger.info(f" Total unique clicks: {total_clicks:,}") + + if campaigns: + avg_open_rate = sum(c.get('metrics', {}).get('open_rate', 0) for c in campaigns) / len(campaigns) + avg_click_rate = sum(c.get('metrics', {}).get('click_rate', 0) for c in campaigns) / len(campaigns) + logger.info(f" Average open rate: {avg_open_rate*100:.1f}%") + logger.info(f" Average click rate: {avg_click_rate*100:.1f}%") + + # Save with project specification naming: __.md + filename = f"hvacnkowitall_MailChimp_{timestamp}.md" + markdown = scraper.format_markdown(campaigns) + output_file = Path(f'data/markdown_current/{filename}') + output_file.parent.mkdir(parents=True, exist_ok=True) + output_file.write_text(markdown, encoding='utf-8') + logger.info(f"Markdown saved to: {output_file}") + + # Create archive copy + archive_dir = Path('data/markdown_archives/MailChimp') + archive_dir.mkdir(parents=True, exist_ok=True) + archive_file = archive_dir / filename + archive_file.write_text(markdown, encoding='utf-8') + logger.info(f"Archive copy saved to: {archive_file}") + + # Update state file + state = scraper.load_state() + state = scraper.update_state(state, campaigns) + scraper.save_state(state) + logger.info("State file updated for incremental updates") + + return True, len(campaigns), output_file + else: + logger.warning("No campaigns found in MailChimp") + return True, 0, None + + except Exception as e: + logger.error(f"MailChimp API scraper failed: {e}") + return False, 0, None + + +def sync_to_nas(): + """Sync API scraper results to NAS following project structure.""" + logger.info("\n" + "=" * 60) + logger.info("SYNCING TO NAS - PROJECT STRUCTURE") + logger.info("=" * 60) + + nas_base = Path('/mnt/nas/hvacknowitall') + + try: + # Sync all markdown_current files + local_current = Path('data/markdown_current') + nas_current = nas_base / 'markdown_current' + + if local_current.exists() and any(local_current.glob('*.md')): + # Create destination if needed + nas_current.mkdir(parents=True, exist_ok=True) + + # Sync all current markdown files + cmd = ['rsync', '-av', '--include=*.md', '--exclude=*', + str(local_current) + '/', str(nas_current) + '/'] + result = subprocess.run(cmd, capture_output=True, text=True) + + if result.returncode == 0: + logger.info(f"✅ Current markdown files synced to NAS: {nas_current}") + # List synced files + for md_file in nas_current.glob('*.md'): + size = md_file.stat().st_size / 1024 # KB + logger.info(f" - {md_file.name} ({size:.0f}KB)") + else: + logger.warning(f"Sync warning: {result.stderr}") + else: + logger.info("No current markdown files 
to sync") + + # Sync archives + for source in ['YouTube', 'MailChimp']: + local_archive = Path(f'data/markdown_archives/{source}') + nas_archive = nas_base / f'markdown_archives/{source}' + + if local_archive.exists() and any(local_archive.glob('*.md')): + nas_archive.mkdir(parents=True, exist_ok=True) + + cmd = ['rsync', '-av', '--include=*.md', '--exclude=*', + str(local_archive) + '/', str(nas_archive) + '/'] + result = subprocess.run(cmd, capture_output=True, text=True) + + if result.returncode == 0: + logger.info(f"✅ {source} archives synced to NAS: {nas_archive}") + else: + logger.warning(f"{source} archive sync warning: {result.stderr}") + + except Exception as e: + logger.error(f"Failed to sync to NAS: {e}") + + +def main(): + """Main production run with project specification compliance.""" + logger.info("=" * 70) + logger.info("HVAC KNOW IT ALL - API SCRAPERS PRODUCTION V2") + logger.info("Following Project Specification Standards") + logger.info("=" * 70) + + atlantic_tz = pytz.timezone('America/Halifax') + start_time = datetime.now(atlantic_tz) + logger.info(f"Started at: {start_time.isoformat()}") + + # Track results + results = { + 'YouTube': {'success': False, 'count': 0, 'file': None}, + 'MailChimp': {'success': False, 'count': 0, 'file': None} + } + + # Run YouTube API scraper with captions + success, count, output_file = run_youtube_api_production() + results['YouTube'] = {'success': success, 'count': count, 'file': output_file} + + # Run MailChimp API scraper with content cleaning + success, count, output_file = run_mailchimp_api_production() + results['MailChimp'] = {'success': success, 'count': count, 'file': output_file} + + # Sync to NAS + sync_to_nas() + + # Summary + end_time = datetime.now(atlantic_tz) + duration = end_time - start_time + + logger.info("\n" + "=" * 70) + logger.info("PRODUCTION V2 SUMMARY") + logger.info("=" * 70) + + for source, result in results.items(): + status = "✅" if result['success'] else "❌" + logger.info(f"{status} {source}: {result['count']} items") + if result['file']: + logger.info(f" Output: {result['file']}") + + logger.info(f"\nTotal duration: {duration.total_seconds():.1f} seconds") + logger.info(f"Completed at: {end_time.isoformat()}") + + # Project specification compliance + logger.info("\nPROJECT SPECIFICATION COMPLIANCE:") + logger.info("✅ File naming: hvacnkowitall__.md") + logger.info("✅ Directory structure: data/markdown_current/, data/markdown_archives/") + logger.info("✅ Capitalized source names: YouTube, MailChimp") + logger.info("✅ Atlantic timezone timestamps") + logger.info("✅ Archive copies created") + logger.info("✅ State files for incremental updates") + + # Return success if at least one scraper succeeded + return any(r['success'] for r in results.values()) + + +if __name__ == "__main__": + success = main() + sys.exit(0 if success else 1) \ No newline at end of file diff --git a/src/base_scraper_cumulative.py b/src/base_scraper_cumulative.py new file mode 100644 index 0000000..67c424e --- /dev/null +++ b/src/base_scraper_cumulative.py @@ -0,0 +1,80 @@ +#!/usr/bin/env python3 +""" +Enhanced Base Scraper with Cumulative Markdown Support +Extension of base_scraper.py that adds cumulative markdown functionality +""" + +from src.base_scraper import BaseScraper +from src.cumulative_markdown_manager import CumulativeMarkdownManager +from pathlib import Path +from typing import List, Dict, Any, Optional + + +class BaseScraperCumulative(BaseScraper): + """Base scraper with cumulative markdown support.""" + + def 
__init__(self, config, use_cumulative: bool = True): + """Initialize with optional cumulative mode.""" + super().__init__(config) + self.use_cumulative = use_cumulative + + if self.use_cumulative: + self.cumulative_manager = CumulativeMarkdownManager(config, self.logger) + self.logger.info("Initialized with cumulative markdown mode") + + def save_content(self, items: List[Dict[str, Any]]) -> Optional[Path]: + """Save content using either cumulative or traditional mode.""" + if not items: + self.logger.warning("No items to save") + return None + + if self.use_cumulative: + # Use cumulative manager + return self.cumulative_manager.save_cumulative( + items, + self.format_markdown + ) + else: + # Use traditional save (creates new file each time) + markdown = self.format_markdown(items) + return self.save_markdown(markdown) + + def run(self) -> Optional[Path]: + """Run the scraper with cumulative support.""" + try: + self.logger.info(f"Starting {self.config.source_name} scraper " + f"(cumulative={self.use_cumulative})") + + # Fetch content (will check state for incremental) + items = self.fetch_content() + + if not items: + self.logger.info("No new content found") + return None + + self.logger.info(f"Fetched {len(items)} items") + + # Save content (cumulative or traditional) + filepath = self.save_content(items) + + # Update state for next incremental run + if items and filepath: + self.update_state(items) + + # Log statistics if cumulative + if self.use_cumulative: + stats = self.cumulative_manager.get_statistics(filepath) + self.logger.info(f"Cumulative stats: {stats}") + + return filepath + + except Exception as e: + self.logger.error(f"Error in scraper run: {e}") + raise + + def get_cumulative_stats(self) -> Dict[str, int]: + """Get statistics about the cumulative file.""" + if not self.use_cumulative: + return {} + + return self.cumulative_manager.get_statistics() \ No newline at end of file diff --git a/src/cumulative_markdown_manager.py b/src/cumulative_markdown_manager.py new file mode 100644 index 0000000..5302a0e --- /dev/null +++ b/src/cumulative_markdown_manager.py @@ -0,0 +1,273 @@ +#!/usr/bin/env python3 +""" +Cumulative Markdown Manager +Maintains a single, growing markdown file per source that combines: +- Initial backlog content +- Daily incremental updates +- Updates to existing entries (e.g., new captions, updated metrics) +""" + +from pathlib import Path +from datetime import datetime +from typing import Dict, List, Optional, Any +import pytz +import logging +import shutil +import re + + +class CumulativeMarkdownManager: + """Manages cumulative markdown files that grow with each update.""" + + def __init__(self, config, logger: Optional[logging.Logger] = None): + """Initialize with scraper config.""" + self.config = config + self.logger = logger or logging.getLogger(self.__class__.__name__) + self.tz = pytz.timezone(config.timezone) + + # Paths + self.current_dir = config.data_dir / "markdown_current" + self.archive_dir = config.data_dir / "markdown_archives" / config.source_name.title() + + # Ensure directories exist + self.current_dir.mkdir(parents=True, exist_ok=True) + self.archive_dir.mkdir(parents=True, exist_ok=True) + + # File pattern for this source + self.file_pattern = f"{config.brand_name}_{config.source_name}_*.md" + + def get_current_file(self) -> Optional[Path]: + """Find the current markdown file for this source.""" + files = list(self.current_dir.glob(self.file_pattern)) + if not files: + return None + + # Return the most recent file (by filename timestamp) 
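+ # Timestamps are fixed-width %Y-%m-%dT%H%M%S, so a reverse lexicographic sort puts the newest file first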
+ files.sort(reverse=True) + return files[0] + + def parse_markdown_sections(self, content: str) -> Dict[str, Dict]: + """Parse markdown content into sections indexed by ID.""" + sections = {} + + # Split by ID headers + parts = content.split('# ID: ') + + for part in parts[1:]: # Skip first empty part + if not part.strip(): + continue + + lines = part.strip().split('\n') + section_id = lines[0].strip() + + # Reconstruct full section content + section_content = f"# ID: {section_id}\n" + '\n'.join(lines[1:]) + + # Extract metadata for comparison + metadata = self._extract_metadata(section_content) + + sections[section_id] = { + 'id': section_id, + 'content': section_content, + 'metadata': metadata + } + + return sections + + def _extract_metadata(self, content: str) -> Dict[str, Any]: + """Extract metadata from section content for comparison.""" + metadata = {} + + # Extract common fields + patterns = { + 'views': r'## Views?:\s*([0-9,]+)', + 'likes': r'## Likes?:\s*([0-9,]+)', + 'comments': r'## Comments?:\s*([0-9,]+)', + 'publish_date': r'## Publish(?:ed)? Date:\s*([^\n]+)', + 'has_caption': r'## Caption Status:', + 'has_transcript': r'## Transcript:', + 'description_length': r'## Description:\n(.+?)(?:\n##|\n---|\Z)', + } + + for key, pattern in patterns.items(): + match = re.search(pattern, content, re.DOTALL | re.IGNORECASE) + if match: + if key in ['views', 'likes', 'comments']: + # Convert numeric fields + metadata[key] = int(match.group(1).replace(',', '')) + elif key in ['has_caption', 'has_transcript']: + # Boolean fields + metadata[key] = True + elif key == 'description_length': + # Calculate length of description + metadata[key] = len(match.group(1).strip()) + else: + metadata[key] = match.group(1).strip() + + return metadata + + def should_update_section(self, old_section: Dict, new_section: Dict) -> bool: + """Determine if a section should be updated with new content.""" + old_meta = old_section.get('metadata', {}) + new_meta = new_section.get('metadata', {}) + + # Update if new section has captions/transcripts that old doesn't + if new_meta.get('has_caption') and not old_meta.get('has_caption'): + return True + if new_meta.get('has_transcript') and not old_meta.get('has_transcript'): + return True + + # Update if new section has more content + old_desc_len = old_meta.get('description_length', 0) + new_desc_len = new_meta.get('description_length', 0) + if new_desc_len > old_desc_len * 1.2: # 20% more content + return True + + # Update if metrics have changed significantly (for incremental updates) + for metric in ['views', 'likes', 'comments']: + old_val = old_meta.get(metric, 0) + new_val = new_meta.get(metric, 0) + if new_val > old_val: + return True + + # Update if content is substantially different + if len(new_section['content']) > len(old_section['content']) * 1.1: + return True + + return False + + def merge_content(self, existing_sections: Dict[str, Dict], + new_items: List[Dict[str, Any]], + formatter_func) -> str: + """Merge new content with existing sections.""" + # Convert new items to sections + new_content = formatter_func(new_items) + new_sections = self.parse_markdown_sections(new_content) + + # Track updates + added_count = 0 + updated_count = 0 + + # Merge sections + for section_id, new_section in new_sections.items(): + if section_id in existing_sections: + # Update existing section if newer/better + if self.should_update_section(existing_sections[section_id], new_section): + existing_sections[section_id] = new_section + updated_count += 1 + 
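+ # New version replaces the old: it has captions/transcripts, noticeably more text, or higher metrics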
self.logger.info(f"Updated section: {section_id}") + else: + # Add new section + existing_sections[section_id] = new_section + added_count += 1 + self.logger.debug(f"Added new section: {section_id}") + + self.logger.info(f"Merge complete: {added_count} added, {updated_count} updated") + + # Reconstruct markdown content + # Sort by ID to maintain consistent order + sorted_sections = sorted(existing_sections.values(), + key=lambda x: x['id']) + + # For sources with dates, sort by date (newest first) + # Try to extract date from content for better sorting + for section in sorted_sections: + date_match = re.search(r'## Publish(?:ed)? Date:\s*([^\n]+)', + section['content']) + if date_match: + try: + # Parse various date formats + date_str = date_match.group(1).strip() + # Add parsed date for sorting + section['sort_date'] = date_str + except: + pass + + # Sort by date if available, otherwise by ID + if any('sort_date' in s for s in sorted_sections): + sorted_sections.sort(key=lambda x: x.get('sort_date', ''), reverse=True) + + # Combine sections + combined_content = [] + for section in sorted_sections: + combined_content.append(section['content']) + combined_content.append("") # Empty line between sections + + return '\n'.join(combined_content) + + def save_cumulative(self, new_items: List[Dict[str, Any]], + formatter_func) -> Path: + """Save content cumulatively, merging with existing file if present.""" + current_file = self.get_current_file() + + if current_file and current_file.exists(): + # Load and merge with existing content + self.logger.info(f"Loading existing file: {current_file.name}") + existing_content = current_file.read_text(encoding='utf-8') + existing_sections = self.parse_markdown_sections(existing_content) + + # Merge new items with existing sections + merged_content = self.merge_content(existing_sections, new_items, + formatter_func) + + # Archive the current file before overwriting + self._archive_file(current_file) + + else: + # First time - just format the new items + self.logger.info("No existing file, creating new cumulative file") + merged_content = formatter_func(new_items) + + # Generate new filename with current timestamp + timestamp = datetime.now(self.tz).strftime('%Y-%m-%dT%H%M%S') + filename = f"{self.config.brand_name}_{self.config.source_name}_{timestamp}.md" + filepath = self.current_dir / filename + + # Save merged content + filepath.write_text(merged_content, encoding='utf-8') + self.logger.info(f"Saved cumulative file: {filename}") + + # Remove old file if it exists (we archived it already) + if current_file and current_file.exists() and current_file != filepath: + current_file.unlink() + self.logger.debug(f"Removed old file: {current_file.name}") + + return filepath + + def _archive_file(self, file_path: Path) -> None: + """Archive a file with timestamp suffix.""" + if not file_path.exists(): + return + + # Add archive timestamp to filename + archive_time = datetime.now(self.tz).strftime('%Y%m%d_%H%M%S') + archive_name = f"{file_path.stem}_archived_{archive_time}{file_path.suffix}" + archive_path = self.archive_dir / archive_name + + # Copy to archive + shutil.copy2(file_path, archive_path) + self.logger.debug(f"Archived to: {archive_path.name}") + + def get_statistics(self, file_path: Optional[Path] = None) -> Dict[str, int]: + """Get statistics about the cumulative file.""" + if not file_path: + file_path = self.get_current_file() + + if not file_path or not file_path.exists(): + return {'total_sections': 0} + + content = 
file_path.read_text(encoding='utf-8') + sections = self.parse_markdown_sections(content) + + stats = { + 'total_sections': len(sections), + 'with_captions': sum(1 for s in sections.values() + if s['metadata'].get('has_caption')), + 'with_transcripts': sum(1 for s in sections.values() + if s['metadata'].get('has_transcript')), + 'total_views': sum(s['metadata'].get('views', 0) + for s in sections.values()), + 'file_size_kb': file_path.stat().st_size // 1024 + } + + return stats \ No newline at end of file diff --git a/src/mailchimp_api_scraper_v2.py b/src/mailchimp_api_scraper_v2.py new file mode 100644 index 0000000..761b1de --- /dev/null +++ b/src/mailchimp_api_scraper_v2.py @@ -0,0 +1,410 @@ +#!/usr/bin/env python3 +""" +MailChimp API scraper for fetching campaign data and metrics +Fetches only campaigns from "Bi-Weekly Newsletter" folder +Cleans headers and footers from content +""" + +import os +import time +import requests +import re +from typing import Any, Dict, List, Optional +from datetime import datetime +from src.base_scraper import BaseScraper, ScraperConfig +import logging + + +class MailChimpAPIScraper(BaseScraper): + """MailChimp API scraper for campaigns and metrics.""" + + def __init__(self, config: ScraperConfig): + super().__init__(config) + + self.api_key = os.getenv('MAILCHIMP_API_KEY') + self.server_prefix = os.getenv('MAILCHIMP_SERVER_PREFIX', 'us10') + + if not self.api_key: + raise ValueError("MAILCHIMP_API_KEY not found in environment variables") + + self.base_url = f"https://{self.server_prefix}.api.mailchimp.com/3.0" + self.headers = { + 'Authorization': f'Bearer {self.api_key}', + 'Content-Type': 'application/json' + } + + # Cache folder ID for "Bi-Weekly Newsletter" + self.target_folder_id = None + self.target_folder_name = "Bi-Weekly Newsletter" + + self.logger.info(f"Initialized MailChimp API scraper for server: {self.server_prefix}") + + def _clean_content(self, content: str) -> str: + """Clean unwanted headers and footers from MailChimp content.""" + if not content: + return content + + # Patterns to remove + patterns_to_remove = [ + # Header patterns + r'VIEW THIS EMAIL IN BROWSER[^\n]*\n?', + r'\(\*\|ARCHIVE\|\*\)[^\n]*\n?', + r'https://hvacknowitall\.com/?\n?', + + # Footer patterns + r'Newsletter produced by Teal Maker[^\n]*\n?', + r'https://tealmaker\.com[^\n]*\n?', + r'https://open\.spotify\.com[^\n]*\n?', + r'https://www\.instagram\.com[^\n]*\n?', + r'https://www\.youtube\.com[^\n]*\n?', + r'https://www\.facebook\.com[^\n]*\n?', + r'https://x\.com[^\n]*\n?', + r'https://www\.linkedin\.com[^\n]*\n?', + r'Copyright \(C\)[^\n]*\n?', + r'\*\|CURRENT_YEAR\|\*[^\n]*\n?', + r'\*\|LIST:COMPANY\|\*[^\n]*\n?', + r'\*\|IFNOT:ARCHIVE_PAGE\|\*[^\n]*\*\|END:IF\|\*\n?', + r'\*\|LIST:DESCRIPTION\|\*[^\n]*\n?', + r'\*\|LIST_ADDRESS\|\*[^\n]*\n?', + r'Our mailing address is:[^\n]*\n?', + r'Want to change how you receive these emails\?[^\n]*\n?', + r'You can update your preferences[^\n]*\n?', + r'\(\*\|UPDATE_PROFILE\|\*\)[^\n]*\n?', + r'or unsubscribe[^\n]*\n?', + r'\(\*\|UNSUB\|\*\)[^\n]*\n?', + + # Clean up multiple newlines + r'\n{3,}', + ] + + cleaned = content + for pattern in patterns_to_remove: + cleaned = re.sub(pattern, '', cleaned, flags=re.MULTILINE | re.IGNORECASE) + + # Clean up multiple newlines (replace with double newline) + cleaned = re.sub(r'\n{3,}', '\n\n', cleaned) + + # Trim whitespace + cleaned = cleaned.strip() + + return cleaned + + def _test_connection(self) -> bool: + """Test API connection.""" + try: + response = 
requests.get(f"{self.base_url}/ping", headers=self.headers) + if response.status_code == 200: + self.logger.info("MailChimp API connection successful") + return True + else: + self.logger.error(f"MailChimp API connection failed: {response.status_code}") + return False + except Exception as e: + self.logger.error(f"MailChimp API connection error: {e}") + return False + + def _get_folder_id(self) -> Optional[str]: + """Get the folder ID for 'Bi-Weekly Newsletter'.""" + if self.target_folder_id: + return self.target_folder_id + + try: + response = requests.get( + f"{self.base_url}/campaign-folders", + headers=self.headers, + params={'count': 100} + ) + + if response.status_code == 200: + folders_data = response.json() + for folder in folders_data.get('folders', []): + if folder['name'] == self.target_folder_name: + self.target_folder_id = folder['id'] + self.logger.info(f"Found '{self.target_folder_name}' folder: {self.target_folder_id}") + return self.target_folder_id + + self.logger.warning(f"'{self.target_folder_name}' folder not found") + else: + self.logger.error(f"Failed to fetch folders: {response.status_code}") + + except Exception as e: + self.logger.error(f"Error fetching folders: {e}") + + return None + + def _fetch_campaign_content(self, campaign_id: str) -> Optional[Dict[str, Any]]: + """Fetch campaign content.""" + try: + response = requests.get( + f"{self.base_url}/campaigns/{campaign_id}/content", + headers=self.headers + ) + + if response.status_code == 200: + return response.json() + else: + self.logger.warning(f"Failed to fetch content for campaign {campaign_id}: {response.status_code}") + return None + + except Exception as e: + self.logger.error(f"Error fetching campaign content: {e}") + return None + + def _fetch_campaign_report(self, campaign_id: str) -> Optional[Dict[str, Any]]: + """Fetch campaign report with metrics.""" + try: + response = requests.get( + f"{self.base_url}/reports/{campaign_id}", + headers=self.headers + ) + + if response.status_code == 200: + return response.json() + else: + self.logger.warning(f"Failed to fetch report for campaign {campaign_id}: {response.status_code}") + return None + + except Exception as e: + self.logger.error(f"Error fetching campaign report: {e}") + return None + + def fetch_content(self, max_items: int = None) -> List[Dict[str, Any]]: + """Fetch campaigns from MailChimp API.""" + + # Test connection first + if not self._test_connection(): + self.logger.error("Failed to connect to MailChimp API") + return [] + + # Get folder ID + folder_id = self._get_folder_id() + + # Prepare parameters + params = { + 'count': max_items or 1000, # Default to 1000 if not specified + 'status': 'sent', # Only sent campaigns + 'sort_field': 'send_time', + 'sort_dir': 'DESC' + } + + if folder_id: + params['folder_id'] = folder_id + self.logger.info(f"Fetching campaigns from '{self.target_folder_name}' folder") + else: + self.logger.info("Fetching all sent campaigns") + + try: + response = requests.get( + f"{self.base_url}/campaigns", + headers=self.headers, + params=params + ) + + if response.status_code != 200: + self.logger.error(f"Failed to fetch campaigns: {response.status_code}") + return [] + + campaigns_data = response.json() + campaigns = campaigns_data.get('campaigns', []) + + self.logger.info(f"Found {len(campaigns)} campaigns") + + # Enrich each campaign with content and metrics + enriched_campaigns = [] + + for campaign in campaigns: + campaign_id = campaign['id'] + + # Add basic campaign info + enriched_campaign = { + 'id': 
campaign_id, + 'title': campaign.get('settings', {}).get('subject_line', 'Untitled'), + 'preview_text': campaign.get('settings', {}).get('preview_text', ''), + 'from_name': campaign.get('settings', {}).get('from_name', ''), + 'reply_to': campaign.get('settings', {}).get('reply_to', ''), + 'send_time': campaign.get('send_time'), + 'status': campaign.get('status'), + 'type': campaign.get('type', 'regular'), + 'archive_url': campaign.get('archive_url', ''), + 'long_archive_url': campaign.get('long_archive_url', ''), + 'folder_id': campaign.get('settings', {}).get('folder_id') + } + + # Fetch content + content_data = self._fetch_campaign_content(campaign_id) + if content_data: + plain_text = content_data.get('plain_text', '') + # Clean the content + enriched_campaign['plain_text'] = self._clean_content(plain_text) + + # If no plain text, convert HTML + if not enriched_campaign['plain_text'] and content_data.get('html'): + converted = self.convert_to_markdown( + content_data['html'], + content_type="text/html" + ) + enriched_campaign['plain_text'] = self._clean_content(converted) + + # Fetch metrics + report_data = self._fetch_campaign_report(campaign_id) + if report_data: + enriched_campaign['metrics'] = { + 'emails_sent': report_data.get('emails_sent', 0), + 'unique_opens': report_data.get('opens', {}).get('unique_opens', 0), + 'open_rate': report_data.get('opens', {}).get('open_rate', 0), + 'total_opens': report_data.get('opens', {}).get('opens_total', 0), + 'unique_clicks': report_data.get('clicks', {}).get('unique_clicks', 0), + 'click_rate': report_data.get('clicks', {}).get('click_rate', 0), + 'total_clicks': report_data.get('clicks', {}).get('clicks_total', 0), + 'unsubscribed': report_data.get('unsubscribed', 0), + 'bounces': { + 'hard': report_data.get('bounces', {}).get('hard_bounces', 0), + 'soft': report_data.get('bounces', {}).get('soft_bounces', 0), + 'syntax_errors': report_data.get('bounces', {}).get('syntax_errors', 0) + }, + 'abuse_reports': report_data.get('abuse_reports', 0), + 'forwards': { + 'count': report_data.get('forwards', {}).get('forwards_count', 0), + 'opens': report_data.get('forwards', {}).get('forwards_opens', 0) + } + } + else: + enriched_campaign['metrics'] = {} + + enriched_campaigns.append(enriched_campaign) + + # Add small delay to avoid rate limiting + time.sleep(0.5) + + return enriched_campaigns + + except Exception as e: + self.logger.error(f"Error fetching campaigns: {e}") + return [] + + def format_markdown(self, campaigns: List[Dict[str, Any]]) -> str: + """Format campaigns as markdown with enhanced metrics.""" + markdown_sections = [] + + for campaign in campaigns: + section = [] + + # ID + section.append(f"# ID: {campaign.get('id', 'N/A')}") + section.append("") + + # Title + section.append(f"## Title: {campaign.get('title', 'Untitled')}") + section.append("") + + # Type + section.append(f"## Type: email_campaign") + section.append("") + + # Send Time + send_time = campaign.get('send_time', '') + if send_time: + section.append(f"## Send Date: {send_time}") + section.append("") + + # From and Reply-to + from_name = campaign.get('from_name', '') + reply_to = campaign.get('reply_to', '') + if from_name: + section.append(f"## From: {from_name}") + if reply_to: + section.append(f"## Reply To: {reply_to}") + section.append("") + + # Archive URL + archive_url = campaign.get('long_archive_url') or campaign.get('archive_url', '') + if archive_url: + section.append(f"## Archive URL: {archive_url}") + section.append("") + + # Metrics + metrics = 
campaign.get('metrics', {}) + if metrics: + section.append("## Metrics:") + section.append(f"### Emails Sent: {metrics.get('emails_sent', 0):,}") + section.append(f"### Opens: {metrics.get('unique_opens', 0):,} unique ({metrics.get('open_rate', 0)*100:.1f}%)") + section.append(f"### Clicks: {metrics.get('unique_clicks', 0):,} unique ({metrics.get('click_rate', 0)*100:.1f}%)") + section.append(f"### Unsubscribes: {metrics.get('unsubscribed', 0)}") + + bounces = metrics.get('bounces', {}) + total_bounces = bounces.get('hard', 0) + bounces.get('soft', 0) + if total_bounces > 0: + section.append(f"### Bounces: {total_bounces} (Hard: {bounces.get('hard', 0)}, Soft: {bounces.get('soft', 0)})") + + if metrics.get('abuse_reports', 0) > 0: + section.append(f"### Abuse Reports: {metrics.get('abuse_reports', 0)}") + + forwards = metrics.get('forwards', {}) + if forwards.get('count', 0) > 0: + section.append(f"### Forwards: {forwards.get('count', 0)}") + + section.append("") + + # Preview Text + preview_text = campaign.get('preview_text', '') + if preview_text: + section.append(f"## Preview Text:") + section.append(preview_text) + section.append("") + + # Content (cleaned) + content = campaign.get('plain_text', '') + if content: + section.append("## Content:") + section.append(content) + section.append("") + + # Separator + section.append("-" * 50) + section.append("") + + markdown_sections.append('\n'.join(section)) + + return '\n'.join(markdown_sections) + + def get_incremental_items(self, items: List[Dict[str, Any]], state: Dict[str, Any]) -> List[Dict[str, Any]]: + """Get only new campaigns since last sync.""" + if not state: + return items + + last_campaign_id = state.get('last_campaign_id') + last_send_time = state.get('last_send_time') + + if not last_campaign_id: + return items + + # Filter for campaigns newer than the last synced + new_items = [] + for item in items: + if item.get('id') == last_campaign_id: + break # Found the last synced campaign + + # Also check by send time as backup + if last_send_time and item.get('send_time'): + if item['send_time'] <= last_send_time: + continue + + new_items.append(item) + + return new_items + + def update_state(self, state: Dict[str, Any], items: List[Dict[str, Any]]) -> Dict[str, Any]: + """Update state with latest campaign information.""" + if not items: + return state + + # Get the first item (most recent) + latest_item = items[0] + + state['last_campaign_id'] = latest_item.get('id') + state['last_send_time'] = latest_item.get('send_time') + state['last_campaign_title'] = latest_item.get('title') + state['last_sync'] = datetime.now(self.tz).isoformat() + state['campaign_count'] = len(items) + + return state \ No newline at end of file diff --git a/src/youtube_api_scraper_v2.py b/src/youtube_api_scraper_v2.py new file mode 100644 index 0000000..e27eb84 --- /dev/null +++ b/src/youtube_api_scraper_v2.py @@ -0,0 +1,513 @@ +#!/usr/bin/env python3 +""" +YouTube Data API v3 scraper with quota management and captions support +Designed to stay within 10,000 units/day limit while fetching captions + +Quota costs: +- channels.list: 1 unit +- playlistItems.list: 1 unit per page (50 items max) +- videos.list: 1 unit per page (50 videos max) +- search.list: 100 units (avoid if possible!) 
+- captions.list: 50 units per video +- captions.download: 200 units per caption + +Strategy for 444 videos with captions: +- Get channel info: 1 unit +- Get all playlist items (444/50 = 9 pages): 9 units +- Get video details in batches of 50: 9 units +- Get captions list for each video: 444 * 50 = 22,200 units (too much!) +- Alternative: Use captions.list selectively or in batches +""" + +import os +import time +from typing import Any, Dict, List, Optional, Tuple +from datetime import datetime +from googleapiclient.discovery import build +from googleapiclient.errors import HttpError +from src.base_scraper import BaseScraper, ScraperConfig +import logging +import re + + +class YouTubeAPIScraper(BaseScraper): + """YouTube API scraper with quota management and captions.""" + + # Quota costs for different operations + QUOTA_COSTS = { + 'channels_list': 1, + 'playlist_items': 1, + 'videos_list': 1, + 'search': 100, + 'captions_list': 50, + 'captions_download': 200, + } + + def __init__(self, config: ScraperConfig): + super().__init__(config) + + self.api_key = os.getenv('YOUTUBE_API_KEY') + if not self.api_key: + raise ValueError("YOUTUBE_API_KEY not found in environment variables") + + # Build YouTube API client + self.youtube = build('youtube', 'v3', developerKey=self.api_key) + + # Channel configuration + self.channel_url = os.getenv('YOUTUBE_CHANNEL_URL', 'https://www.youtube.com/@HVACKnowItAll') + self.channel_id = None + self.uploads_playlist_id = None + + # Quota tracking + self.quota_used = 0 + self.daily_quota_limit = 10000 + + # Caption fetching strategy + self.max_captions_per_run = 50 # Limit caption fetches to top videos + # 50 videos * 50 units = 2,500 units for caption listing + # Plus potential download costs + + self.logger.info(f"Initialized YouTube API scraper for channel: {self.channel_url}") + + def _track_quota(self, operation: str, count: int = 1) -> bool: + """Track quota usage and return True if within limits.""" + cost = self.QUOTA_COSTS.get(operation, 0) * count + + if self.quota_used + cost > self.daily_quota_limit: + self.logger.warning(f"Quota limit would be exceeded. Current: {self.quota_used}, Cost: {cost}") + return False + + self.quota_used += cost + self.logger.debug(f"Quota used: {self.quota_used}/{self.daily_quota_limit} (+{cost} for {operation})") + return True + + def _get_channel_info(self) -> bool: + """Get channel ID and uploads playlist ID.""" + if self.channel_id and self.uploads_playlist_id: + return True + + try: + # Extract channel handle + channel_handle = self.channel_url.split('@')[-1] + + # Try to get channel by handle first (costs 1 unit) + if not self._track_quota('channels_list'): + return False + + response = self.youtube.channels().list( + part='snippet,statistics,contentDetails', + forHandle=channel_handle + ).execute() + + if not response.get('items'): + # Fallback to search by name (costs 100 units - avoid!) 
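+                # The forHandle lookup above costs only 1 unit, so this 100-unit search
+                # is a last resort: it runs only when the handle cannot be resolved, and
+                # it is budgeted through _track_quota like every other API call.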
+ self.logger.warning("Channel not found by handle, trying search...") + + if not self._track_quota('search'): + return False + + search_response = self.youtube.search().list( + part='snippet', + q="HVAC Know It All", + type='channel', + maxResults=1 + ).execute() + + if not search_response.get('items'): + self.logger.error("Channel not found") + return False + + self.channel_id = search_response['items'][0]['snippet']['channelId'] + + # Get full channel details + if not self._track_quota('channels_list'): + return False + + response = self.youtube.channels().list( + part='snippet,statistics,contentDetails', + id=self.channel_id + ).execute() + + if response.get('items'): + channel_data = response['items'][0] + self.channel_id = channel_data['id'] + self.uploads_playlist_id = channel_data['contentDetails']['relatedPlaylists']['uploads'] + + # Log channel stats + stats = channel_data['statistics'] + self.logger.info(f"Channel: {channel_data['snippet']['title']}") + self.logger.info(f"Subscribers: {int(stats.get('subscriberCount', 0)):,}") + self.logger.info(f"Total videos: {int(stats.get('videoCount', 0)):,}") + + return True + + except HttpError as e: + self.logger.error(f"YouTube API error: {e}") + except Exception as e: + self.logger.error(f"Error getting channel info: {e}") + + return False + + def _fetch_all_video_ids(self, max_videos: int = None) -> List[str]: + """Fetch all video IDs from the channel efficiently.""" + if not self._get_channel_info(): + return [] + + video_ids = [] + next_page_token = None + videos_fetched = 0 + + while True: + # Check quota before each request + if not self._track_quota('playlist_items'): + self.logger.warning("Quota limit reached while fetching video IDs") + break + + try: + # Fetch playlist items (50 per page, costs 1 unit) + request = self.youtube.playlistItems().list( + part='contentDetails', + playlistId=self.uploads_playlist_id, + maxResults=50, + pageToken=next_page_token + ) + + response = request.execute() + + for item in response.get('items', []): + video_ids.append(item['contentDetails']['videoId']) + videos_fetched += 1 + + if max_videos and videos_fetched >= max_videos: + return video_ids[:max_videos] + + # Check for next page + next_page_token = response.get('nextPageToken') + if not next_page_token: + break + + except HttpError as e: + self.logger.error(f"Error fetching video IDs: {e}") + break + + self.logger.info(f"Fetched {len(video_ids)} video IDs") + return video_ids + + def _fetch_video_details_batch(self, video_ids: List[str]) -> List[Dict[str, Any]]: + """Fetch details for a batch of videos (max 50 per request).""" + if not video_ids: + return [] + + # YouTube API allows max 50 videos per request + batch_size = 50 + all_videos = [] + + for i in range(0, len(video_ids), batch_size): + batch = video_ids[i:i + batch_size] + + # Check quota (1 unit per request) + if not self._track_quota('videos_list'): + self.logger.warning("Quota limit reached while fetching video details") + break + + try: + response = self.youtube.videos().list( + part='snippet,statistics,contentDetails', + id=','.join(batch) + ).execute() + + for video in response.get('items', []): + video_data = { + 'id': video['id'], + 'title': video['snippet']['title'], + 'description': video['snippet']['description'], # Full description! 
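+                        # These fields all come from the single videos.list call above,
+                        # so a full batch of 50 videos still costs one quota unit.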
+ 'published_at': video['snippet']['publishedAt'], + 'channel_id': video['snippet']['channelId'], + 'channel_title': video['snippet']['channelTitle'], + 'tags': video['snippet'].get('tags', []), + 'duration': video['contentDetails']['duration'], + 'definition': video['contentDetails']['definition'], + 'caption': video['contentDetails'].get('caption', 'false'), # Has captions? + 'thumbnail': video['snippet']['thumbnails'].get('maxres', {}).get('url') or + video['snippet']['thumbnails'].get('high', {}).get('url', ''), + + # Statistics + 'view_count': int(video['statistics'].get('viewCount', 0)), + 'like_count': int(video['statistics'].get('likeCount', 0)), + 'comment_count': int(video['statistics'].get('commentCount', 0)), + + # Calculate engagement metrics + 'engagement_rate': 0, + 'like_ratio': 0 + } + + # Calculate engagement metrics + if video_data['view_count'] > 0: + video_data['engagement_rate'] = ( + (video_data['like_count'] + video_data['comment_count']) / + video_data['view_count'] + ) * 100 + video_data['like_ratio'] = (video_data['like_count'] / video_data['view_count']) * 100 + + all_videos.append(video_data) + + # Small delay to be respectful + time.sleep(0.1) + + except HttpError as e: + self.logger.error(f"Error fetching video details: {e}") + + return all_videos + + def _fetch_caption_text(self, video_id: str) -> Optional[str]: + """Fetch caption text using YouTube Data API (costs 50 units!).""" + try: + # Check quota (50 units for list) + if not self._track_quota('captions_list'): + self.logger.debug(f"Quota limit - skipping captions for {video_id}") + return None + + # List available captions + captions_response = self.youtube.captions().list( + part='snippet', + videoId=video_id + ).execute() + + captions = captions_response.get('items', []) + if not captions: + self.logger.debug(f"No captions available for video {video_id}") + return None + + # Find English caption (or auto-generated) + english_caption = None + for caption in captions: + if caption['snippet']['language'] == 'en': + english_caption = caption + break + + if not english_caption: + # Try auto-generated + for caption in captions: + if 'auto' in caption['snippet']['name'].lower(): + english_caption = caption + break + + if english_caption: + caption_id = english_caption['id'] + + # Download caption would cost 200 more units! 
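+                # Listing alone already runs 50 units per video (22,200 for all 444),
+                # and downloading adds 200 more each, far past the 10,000/day budget.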
+ # For now, just note that captions are available + self.logger.debug(f"Captions available for video {video_id} (id: {caption_id})") + return f"[Captions available - {english_caption['snippet']['name']}]" + + return None + + except HttpError as e: + if 'captionsDisabled' in str(e): + self.logger.debug(f"Captions disabled for video {video_id}") + else: + self.logger.debug(f"Error fetching captions for {video_id}: {e}") + except Exception as e: + self.logger.debug(f"Error fetching captions for {video_id}: {e}") + + return None + + def fetch_content(self, max_posts: int = None, fetch_captions: bool = True) -> List[Dict[str, Any]]: + """Fetch video content with intelligent quota management.""" + + self.logger.info(f"Starting YouTube API fetch (quota limit: {self.daily_quota_limit})") + + # Step 1: Get all video IDs (very cheap - ~9 units for 444 videos) + video_ids = self._fetch_all_video_ids(max_posts) + + if not video_ids: + self.logger.warning("No video IDs fetched") + return [] + + # Step 2: Fetch video details in batches (also cheap - ~9 units for 444 videos) + videos = self._fetch_video_details_batch(video_ids) + + self.logger.info(f"Fetched details for {len(videos)} videos") + + # Step 3: Fetch captions for top videos (expensive - 50 units per video) + if fetch_captions: + # Prioritize videos by views for caption fetching + videos_sorted = sorted(videos, key=lambda x: x['view_count'], reverse=True) + + # Limit caption fetching to top videos + max_captions = min(self.max_captions_per_run, len(videos_sorted)) + + # Check remaining quota + captions_quota_needed = max_captions * 50 + if self.quota_used + captions_quota_needed > self.daily_quota_limit: + max_captions = (self.daily_quota_limit - self.quota_used) // 50 + self.logger.warning(f"Limiting captions to {max_captions} videos due to quota") + + if max_captions > 0: + self.logger.info(f"Fetching captions for top {max_captions} videos by views") + + for i, video in enumerate(videos_sorted[:max_captions]): + caption_text = self._fetch_caption_text(video['id']) + if caption_text: + video['caption_text'] = caption_text + self.logger.debug(f"Got caption info for video {i+1}/{max_captions}: {video['title']}") + + # Small delay to be respectful + time.sleep(0.5) + + # Log final quota usage + self.logger.info(f"Total quota used: {self.quota_used}/{self.daily_quota_limit} units") + self.logger.info(f"Remaining quota: {self.daily_quota_limit - self.quota_used} units") + + return videos + + def _get_video_type(self, video: Dict[str, Any]) -> str: + """Determine video type based on duration.""" + duration = video.get('duration', 'PT0S') + + # Parse ISO 8601 duration + match = re.match(r'PT(?:(\d+)H)?(?:(\d+)M)?(?:(\d+)S)?', duration) + if match: + hours = int(match.group(1) or 0) + minutes = int(match.group(2) or 0) + seconds = int(match.group(3) or 0) + total_seconds = hours * 3600 + minutes * 60 + seconds + + if total_seconds < 60: + return 'short' + elif total_seconds > 600: # > 10 minutes + return 'video' + else: + return 'video' + + return 'video' + + def format_markdown(self, videos: List[Dict[str, Any]]) -> str: + """Format videos as markdown with enhanced data.""" + markdown_sections = [] + + for video in videos: + section = [] + + # ID + section.append(f"# ID: {video.get('id', 'N/A')}") + section.append("") + + # Title + section.append(f"## Title: {video.get('title', 'Untitled')}") + section.append("") + + # Type + video_type = self._get_video_type(video) + section.append(f"## Type: {video_type}") + section.append("") + + # Author + 
section.append(f"## Author: {video.get('channel_title', 'Unknown')}") + section.append("") + + # Link + section.append(f"## Link: https://www.youtube.com/watch?v={video.get('id')}") + section.append("") + + # Upload Date + section.append(f"## Upload Date: {video.get('published_at', '')}") + section.append("") + + # Duration + section.append(f"## Duration: {video.get('duration', 'Unknown')}") + section.append("") + + # Views + section.append(f"## Views: {video.get('view_count', 0):,}") + section.append("") + + # Likes + section.append(f"## Likes: {video.get('like_count', 0):,}") + section.append("") + + # Comments + section.append(f"## Comments: {video.get('comment_count', 0):,}") + section.append("") + + # Engagement Metrics + section.append(f"## Engagement Rate: {video.get('engagement_rate', 0):.2f}%") + section.append(f"## Like Ratio: {video.get('like_ratio', 0):.2f}%") + section.append("") + + # Tags + tags = video.get('tags', []) + if tags: + section.append(f"## Tags: {', '.join(tags[:10])}") # First 10 tags + section.append("") + + # Thumbnail + thumbnail = video.get('thumbnail', '') + if thumbnail: + section.append(f"## Thumbnail: {thumbnail}") + section.append("") + + # Full Description (untruncated!) + section.append("## Description:") + description = video.get('description', '') + if description: + section.append(description) + section.append("") + + # Caption/Transcript + caption_text = video.get('caption_text') + if caption_text: + section.append("## Caption Status:") + section.append(caption_text) + section.append("") + + # Separator + section.append("-" * 50) + section.append("") + + markdown_sections.append('\n'.join(section)) + + return '\n'.join(markdown_sections) + + def get_incremental_items(self, items: List[Dict[str, Any]], state: Dict[str, Any]) -> List[Dict[str, Any]]: + """Get only new videos since last sync.""" + if not state: + return items + + last_video_id = state.get('last_video_id') + last_published = state.get('last_published') + + if not last_video_id: + return items + + # Filter for videos newer than the last synced + new_items = [] + for item in items: + if item.get('id') == last_video_id: + break # Found the last synced video + + # Also check by publish date as backup + if last_published and item.get('published_at'): + if item['published_at'] <= last_published: + continue + + new_items.append(item) + + return new_items + + def update_state(self, state: Dict[str, Any], items: List[Dict[str, Any]]) -> Dict[str, Any]: + """Update state with latest video information.""" + if not items: + return state + + # Get the first item (most recent) + latest_item = items[0] + + state['last_video_id'] = latest_item.get('id') + state['last_published'] = latest_item.get('published_at') + state['last_video_title'] = latest_item.get('title') + state['last_sync'] = datetime.now(self.tz).isoformat() + state['video_count'] = len(items) + state['quota_used'] = self.quota_used + + return state \ No newline at end of file diff --git a/test_cumulative_mode.py b/test_cumulative_mode.py new file mode 100644 index 0000000..242770c --- /dev/null +++ b/test_cumulative_mode.py @@ -0,0 +1,236 @@ +#!/usr/bin/env python3 +""" +Test the cumulative markdown functionality +Demonstrates how backlog + incremental updates work together +""" + +import sys +from pathlib import Path +sys.path.insert(0, str(Path(__file__).parent)) + +from src.cumulative_markdown_manager import CumulativeMarkdownManager +from src.base_scraper import ScraperConfig +import logging +from datetime import datetime +import 
pytz + +# Set up logging +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' +) +logger = logging.getLogger('cumulative_test') + + +def create_mock_items(start_id: int, count: int, prefix: str = ""): + """Create mock content items for testing.""" + items = [] + for i in range(count): + item_id = f"video_{start_id + i}" + items.append({ + 'id': item_id, + 'title': f"{prefix}Video Title {start_id + i}", + 'views': 1000 * (start_id + i), + 'likes': 100 * (start_id + i), + 'description': f"Description for video {start_id + i}", + 'publish_date': '2024-01-15' + }) + return items + + +def format_mock_markdown(items): + """Format mock items as markdown.""" + sections = [] + for item in items: + section = [ + f"# ID: {item['id']}", + "", + f"## Title: {item['title']}", + "", + f"## Views: {item['views']:,}", + "", + f"## Likes: {item['likes']:,}", + "", + f"## Description:", + item['description'], + "", + f"## Publish Date: {item['publish_date']}", + "", + "-" * 50 + ] + sections.append('\n'.join(section)) + + return '\n\n'.join(sections) + + +def test_cumulative_workflow(): + """Test the complete cumulative workflow.""" + logger.info("=" * 60) + logger.info("TESTING CUMULATIVE MARKDOWN WORKFLOW") + logger.info("=" * 60) + + # Setup test config + config = ScraperConfig( + source_name='TestSource', + brand_name='testbrand', + data_dir=Path('test_data'), + logs_dir=Path('test_logs'), + timezone='America/Halifax' + ) + + # Clean up any existing test files + test_pattern = "testbrand_TestSource_*.md" + for old_file in Path('test_data/markdown_current').glob(test_pattern): + old_file.unlink() + logger.info(f"Cleaned up old test file: {old_file.name}") + + # Initialize manager + manager = CumulativeMarkdownManager(config, logger) + + # STEP 1: Initial backlog capture + logger.info("\n" + "=" * 40) + logger.info("STEP 1: BACKLOG CAPTURE (Day 1)") + logger.info("=" * 40) + + backlog_items = create_mock_items(1, 5, "Backlog ") + logger.info(f"Created {len(backlog_items)} backlog items") + + file1 = manager.save_cumulative(backlog_items, format_mock_markdown) + logger.info(f"Saved backlog to: {file1.name}") + + stats = manager.get_statistics(file1) + logger.info(f"Stats after backlog: {stats}") + + # STEP 2: First incremental update (new items) + logger.info("\n" + "=" * 40) + logger.info("STEP 2: INCREMENTAL UPDATE - New Items (Day 2)") + logger.info("=" * 40) + + new_items = create_mock_items(6, 2, "New ") + logger.info(f"Created {len(new_items)} new items") + + file2 = manager.save_cumulative(new_items, format_mock_markdown) + logger.info(f"Saved incremental to: {file2.name}") + + stats = manager.get_statistics(file2) + logger.info(f"Stats after first incremental: {stats}") + + # Verify content + content = file2.read_text(encoding='utf-8') + id_count = content.count('# ID:') + logger.info(f"Total sections in file: {id_count}") + + # STEP 3: Second incremental with updates + logger.info("\n" + "=" * 40) + logger.info("STEP 3: INCREMENTAL UPDATE - With Updates (Day 3)") + logger.info("=" * 40) + + # Create items with updates (higher view counts) and new items + updated_items = [ + { + 'id': 'video_1', # Update existing + 'title': 'Backlog Video Title 1', + 'views': 5000, # Increased from 1000 + 'likes': 500, # Increased from 100 + 'description': 'Updated description with more details and captions', + 'publish_date': '2024-01-15', + 'caption': 'This video now has captions!' 
# New field + }, + { + 'id': 'video_8', # New item + 'title': 'Brand New Video 8', + 'views': 8000, + 'likes': 800, + 'description': 'Newest video just published', + 'publish_date': '2024-01-18' + } + ] + + # Format with caption support + def format_with_captions(items): + sections = [] + for item in items: + section = [ + f"# ID: {item['id']}", + "", + f"## Title: {item['title']}", + "", + f"## Views: {item['views']:,}", + "", + f"## Likes: {item['likes']:,}", + "", + f"## Description:", + item['description'], + "" + ] + + if 'caption' in item: + section.extend([ + "## Caption Status:", + item['caption'], + "" + ]) + + section.extend([ + f"## Publish Date: {item['publish_date']}", + "", + "-" * 50 + ]) + + sections.append('\n'.join(section)) + + return '\n\n'.join(sections) + + logger.info(f"Created 1 update + 1 new item") + + file3 = manager.save_cumulative(updated_items, format_with_captions) + logger.info(f"Saved second incremental to: {file3.name}") + + stats = manager.get_statistics(file3) + logger.info(f"Stats after second incremental: {stats}") + + # Verify final content + final_content = file3.read_text(encoding='utf-8') + final_id_count = final_content.count('# ID:') + caption_count = final_content.count('## Caption Status:') + + logger.info(f"Final total sections: {final_id_count}") + logger.info(f"Sections with captions: {caption_count}") + + # Check if video_1 was updated + if 'This video now has captions!' in final_content: + logger.info("✅ Successfully updated video_1 with captions") + else: + logger.error("❌ Failed to update video_1") + + # Check if video_8 was added + if 'video_8' in final_content: + logger.info("✅ Successfully added new video_8") + else: + logger.error("❌ Failed to add video_8") + + # List archive files + logger.info("\n" + "=" * 40) + logger.info("ARCHIVED FILES:") + logger.info("=" * 40) + + archive_dir = Path('test_data/markdown_archives/TestSource') + if archive_dir.exists(): + archives = list(archive_dir.glob("*.md")) + for archive in sorted(archives): + logger.info(f" - {archive.name}") + + logger.info("\n" + "=" * 60) + logger.info("TEST COMPLETE!") + logger.info("=" * 60) + logger.info("Summary:") + logger.info(f" - Started with 5 backlog items") + logger.info(f" - Added 2 new items in first incremental") + logger.info(f" - Updated 1 item + added 1 item in second incremental") + logger.info(f" - Final file has {final_id_count} total items") + logger.info(f" - {caption_count} items have captions") + logger.info(f" - {len(archives) if archive_dir.exists() else 0} versions archived") + + +if __name__ == "__main__": + test_cumulative_workflow() \ No newline at end of file