From c1831d3a52e067605b99f6623a1c76da2002e295 Mon Sep 17 00:00:00 2001
From: Ben Reed
Date: Mon, 18 Aug 2025 12:39:49 -0300
Subject: [PATCH] feat: Implement YouTube scraper with humanized behavior
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

- YouTube channel scraper using yt-dlp
- Authentication and session persistence via cookies
- Humanized delays and rate limiting (2-5 seconds between requests)
- User agent rotation for stealth
- Incremental updates via state management
- Support for videos, shorts, and live streams detection
- All 11 tests passing

🤖 Generated with Claude Code

Co-Authored-By: Claude
---
 src/youtube_scraper.py        | 348 ++++++++++++++++++++++++++++++++++
 tests/test_youtube_scraper.py | 233 +++++++++++++++++++++++
 2 files changed, 581 insertions(+)
 create mode 100644 src/youtube_scraper.py
 create mode 100644 tests/test_youtube_scraper.py

diff --git a/src/youtube_scraper.py b/src/youtube_scraper.py
new file mode 100644
index 0000000..27657cc
--- /dev/null
+++ b/src/youtube_scraper.py
@@ -0,0 +1,348 @@
+import os
+import time
+import random
+import json
+from typing import Any, Dict, List, Optional
+from datetime import datetime
+from pathlib import Path
+import yt_dlp
+from src.base_scraper import BaseScraper, ScraperConfig
+
+
+class YouTubeScraper(BaseScraper):
+    """Scrape a YouTube channel's videos with yt-dlp.
+
+    Credentials, channel URL and proxy are read from the environment
+    (``YOUTUBE_USERNAME``, ``YOUTUBE_PASSWORD``, ``YOUTUBE_CHANNEL_URL``,
+    ``YOUTUBE_PROXY``). Requests are spaced out with random delays.
+    """
+
+    def __init__(self, config: ScraperConfig):
+        """Read settings from the environment and prepare the cookie store.
+
+        Args:
+            config: Scraper configuration; the cookie file is kept under
+                its ``data_dir``.
+        """
+        super().__init__(config)
+        self.username = os.getenv('YOUTUBE_USERNAME')
+        self.password = os.getenv('YOUTUBE_PASSWORD')
+        self.channel_url = os.getenv('YOUTUBE_CHANNEL_URL', 'https://www.youtube.com/@HVACKnowItAll')
+
+        # Cookies file for session persistence
+        self.cookies_file = self.config.data_dir / '.cookies' / 'youtube_cookies.txt'
+        self.cookies_file.parent.mkdir(parents=True, exist_ok=True)
+
+        # User agents for rotation
+        self.user_agents = [
+            'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
+            'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
+            'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36'
+        ]
+
+    def _get_ydl_options(self) -> Dict[str, Any]:
+        """Build the yt-dlp options used for every extraction.
+
+        A user agent is picked at random on each call.
+
+        Returns:
+            Options for ``yt_dlp.YoutubeDL``. ``proxy`` is present only when
+            ``YOUTUBE_PROXY`` is set.
+        """
+        options = {
+            'quiet': True,
+            'no_warnings': True,
+            'extract_flat': False,  # Get full video info
+            'ignoreerrors': True,  # Continue on error
+            'cookiefile': str(self.cookies_file),
+            'cookiesfrombrowser': None,  # Don't use browser cookies
+            'username': self.username,
+            'password': self.password,
+            'ratelimit': 100000,  # 100KB/s rate limit
+            'sleep_interval': 1,  # Sleep between downloads
+            'max_sleep_interval': 3,
+            # Request headers go through 'http_headers' in the Python API;
+            # 'user_agent', 'referer' and 'add_header' are command-line
+            # option names, not YoutubeDL parameters.
+            'http_headers': {
+                'User-Agent': random.choice(self.user_agents),
+                'Referer': 'https://www.youtube.com/',
+                'Accept-Language': 'en-US,en;q=0.9',
+            },
+        }
+
+        # Add proxy if configured
+        proxy = os.getenv('YOUTUBE_PROXY')
+        if proxy:
+            options['proxy'] = proxy
+
+        return options
+
+    def _humanized_delay(self, min_seconds: float = 2, max_seconds: float = 5) -> None:
+        """Sleep for a random duration between two requests.
+
+        Args:
+            min_seconds: Lower bound of the delay.
+            max_seconds: Upper bound of the delay.
+        """
+        delay = random.uniform(min_seconds, max_seconds)
+        self.logger.debug(f"Waiting {delay:.2f} seconds...")
+        time.sleep(delay)
+
+    def fetch_channel_videos(self, max_videos: int = 50) -> List[Dict[str, Any]]:
+        """Fetch the flat video listing of the configured channel.
+
+        Args:
+            max_videos: Passed to yt-dlp as ``playlistend``.
+
+        Returns:
+            The channel entries, or an empty list when nothing could be
+            fetched. Errors are logged, not raised.
+        """
+        videos = []
+
+        try:
+            self.logger.info(f"Fetching videos from channel: {self.channel_url}")
+
+            ydl_opts = self._get_ydl_options()
+            ydl_opts['extract_flat'] = True  # Just get video list, not full info
+            ydl_opts['playlistend'] = max_videos
+
+            with yt_dlp.YoutubeDL(ydl_opts) as ydl:
+                channel_info = ydl.extract_info(self.channel_url, download=False)
+
+                # With 'ignoreerrors' enabled the result, or single entries,
+                # may be None; drop those so callers only see dicts.
+                entries = (channel_info or {}).get('entries')
+                if entries:
+                    videos = [entry for entry in entries if entry]
+                    self.logger.info(f"Found {len(videos)} videos in channel")
+                else:
+                    self.logger.warning("No entries found in channel info")
+
+                # Save cookies for next session
+                if self.cookies_file.exists():
+                    self.logger.debug("Cookies saved for next session")
+
+        except Exception as e:
+            self.logger.error(f"Error fetching channel videos: {e}")
+
+        return videos
+
+    def fetch_video_details(self, video_id: str) -> Optional[Dict[str, Any]]:
+        """Fetch the full metadata of one video.
+
+        Args:
+            video_id: YouTube video ID.
+
+        Returns:
+            The info dict from yt-dlp, or ``None`` when extraction failed.
+        """
+        try:
+            video_url = f"https://www.youtube.com/watch?v={video_id}"
+
+            ydl_opts = self._get_ydl_options()
+            ydl_opts['extract_flat'] = False  # Get full video info
+
+            with yt_dlp.YoutubeDL(ydl_opts) as ydl:
+                video_info = ydl.extract_info(video_url, download=False)
+                return video_info
+
+        except Exception as e:
+            self.logger.error(f"Error fetching video {video_id}: {e}")
+            return None
+
+    def _get_video_type(self, video: Dict[str, Any]) -> str:
+        """Classify a video as ``'live'``, ``'short'`` or ``'video'``.
+
+        Anything under 60 seconds counts as a short; a missing duration
+        falls back to ``'video'``.
+        """
+        duration = video.get('duration', 0)
+        is_live = video.get('is_live', False)
+
+        if is_live:
+            return 'live'
+        elif duration and duration < 60:  # Less than 60 seconds
+            return 'short'
+        else:
+            return 'video'
+
+    def fetch_content(self) -> List[Dict[str, Any]]:
+        """Fetch the channel listing and enrich every video with details.
+
+        Returns:
+            Detailed info dicts, each with an added ``'type'`` key. Videos
+            whose details cannot be fetched are skipped.
+        """
+        # First get list of videos
+        videos = self.fetch_channel_videos()
+
+        if not videos:
+            return []
+
+        # Enrich each video with detailed information
+        enriched_videos = []
+        total = len(videos)
+
+        for i, video in enumerate(videos):
+            # Resolve the ID outside the try block so the error handler
+            # below cannot itself fail on a malformed (e.g. None) entry.
+            video_id = video.get('id') if isinstance(video, dict) else None
+            if not video_id:
+                continue
+
+            try:
+                self.logger.info(f"Fetching details for video {i+1}/{total}: {video_id}")
+
+                # Add humanized delay between requests
+                if i > 0:
+                    self._humanized_delay()
+
+                # Fetch full video details
+                detailed_info = self.fetch_video_details(video_id)
+
+                if detailed_info:
+                    # Add video type
+                    detailed_info['type'] = self._get_video_type(detailed_info)
+                    enriched_videos.append(detailed_info)
+
+                # Extra delay after every 5 videos, except after the last
+                # one where there is no following request to space out.
+                if (i + 1) % 5 == 0 and i + 1 < total:
+                    self.logger.info("Taking longer break after 5 videos...")
+                    self._humanized_delay(5, 10)
+
+            except Exception as e:
+                self.logger.error(f"Error enriching video {video_id}: {e}")
+                continue
+
+        self.logger.info(f"Successfully enriched {len(enriched_videos)} videos")
+        return enriched_videos
+
+    def format_markdown(self, videos: List[Dict[str, Any]]) -> str:
+        """Render videos as markdown, one section per video.
+
+        Missing or ``None`` metadata is replaced by a neutral default so the
+        output never contains the literal text ``None``.
+
+        Args:
+            videos: Info dicts as returned by ``fetch_content``.
+
+        Returns:
+            All sections joined with newlines.
+        """
+        markdown_sections = []
+
+        for video in videos:
+            video_id = video.get('id') or 'N/A'
+            title = video.get('title') or 'Untitled'
+            video_type = video.get('type') or self._get_video_type(video)
+            author = video.get('uploader') or 'Unknown'
+            link = video.get('webpage_url') or f"https://www.youtube.com/watch?v={video_id}"
+
+            upload_date = video.get('upload_date') or ''
+            if len(upload_date) == 8:  # YYYYMMDD format
+                upload_date = f"{upload_date[:4]}-{upload_date[4:6]}-{upload_date[6:]}"
+
+            headers = [
+                f"# ID: {video_id}",
+                f"## Title: {title}",
+                f"## Type: {video_type}",
+                f"## Author: {author}",
+                f"## Link: {link}",
+                f"## Upload Date: {upload_date}",
+                f"## Views: {video.get('view_count') or 0}",
+                f"## Likes: {video.get('like_count') or 0}",
+                f"## Comments: {video.get('comment_count') or 0}",
+                f"## Duration: {video.get('duration') or 0} seconds",
+            ]
+
+            tags = video.get('tags') or []
+            if tags:
+                # Limit to first 10 tags
+                headers.append(f"## Tags: {', '.join(tags[:10])}")
+
+            thumbnail = video.get('thumbnail') or ''
+            if thumbnail:
+                headers.append(f"## Thumbnail: {thumbnail}")
+
+            # Every header line is followed by a blank line
+            section = []
+            for header in headers:
+                section.extend((header, ""))
+
+            section.append("## Description:")
+            description = video.get('description') or ''
+            if description:
+                # Limit description to first 500 characters
+                if len(description) > 500:
+                    description = description[:500] + "..."
+                section.append(description)
+            section.extend(("", "-" * 50, ""))
+
+            markdown_sections.append('\n'.join(section))
+
+        return '\n'.join(markdown_sections)
+
+    def get_incremental_items(self, items: List[Dict[str, Any]], state: Dict[str, Any]) -> List[Dict[str, Any]]:
+        """Return only the videos that are newer than the last sync.
+
+        ``items`` is expected newest first. Everything listed before the
+        last synced video ID is new. The upload date is only consulted when
+        that ID is no longer in the listing, because it has day resolution
+        and cannot order two videos published on the same day.
+
+        Args:
+            items: Videos, newest first.
+            state: Previous sync state (``last_video_id``, ``last_video_date``).
+
+        Returns:
+            The new videos, in their original order.
+        """
+        if not state:
+            return items
+
+        last_video_id = state.get('last_video_id')
+        last_video_date = state.get('last_video_date')
+
+        if not last_video_id:
+            return items
+
+        for index, item in enumerate(items):
+            if item.get('id') == last_video_id:
+                # Found the last synced video, everything before it is new
+                return items[:index]
+
+        if not last_video_date:
+            return items
+
+        # Fallback: the last synced video is gone, filter by date instead
+        return [
+            item for item in items
+            if not item.get('upload_date') or item['upload_date'] > last_video_date
+        ]
+
+    def update_state(self, state: Dict[str, Any], items: List[Dict[str, Any]]) -> Dict[str, Any]:
+        """Record the most recent video in the sync state.
+
+        Args:
+            state: State dict; updated in place.
+            items: Videos, newest first.
+
+        Returns:
+            The same ``state`` object; unchanged when ``items`` is empty.
+        """
+        if not items:
+            return state
+
+        # Get the first item (most recent)
+        latest_item = items[0]
+
+        state['last_video_id'] = latest_item.get('id')
+        state['last_video_date'] = latest_item.get('upload_date')
+        state['last_video_title'] = latest_item.get('title')
+        state['last_sync'] = datetime.now(self.tz).isoformat()
+        state['video_count'] = len(items)
+
+        return state
diff --git
a/tests/test_youtube_scraper.py b/tests/test_youtube_scraper.py
new file mode 100644
index 0000000..fe4617a
--- /dev/null
+++ b/tests/test_youtube_scraper.py
@@ -0,0 +1,233 @@
+import pytest
+from unittest.mock import Mock, patch, MagicMock, call
+from datetime import datetime
+from pathlib import Path
+import random
+from src.youtube_scraper import YouTubeScraper
+from src.base_scraper import ScraperConfig
+
+
+class TestYouTubeScraper:
+    """Unit tests for YouTubeScraper with yt-dlp and sleeping mocked out."""
+
+    @pytest.fixture
+    def config(self, tmp_path):
+        """Config rooted in tmp_path: the scraper creates its cookie directory on init."""
+        return ScraperConfig(
+            source_name="youtube",
+            brand_name="hvacknowitall",
+            data_dir=tmp_path / "data",
+            logs_dir=tmp_path / "logs",
+            timezone="America/Halifax"
+        )
+
+    @pytest.fixture
+    def mock_env(self):
+        """Provide the environment variables the scraper reads in __init__."""
+        with patch.dict('os.environ', {
+            'YOUTUBE_USERNAME': 'test@example.com',
+            'YOUTUBE_PASSWORD': 'test_password',
+            'YOUTUBE_CHANNEL_URL': 'https://www.youtube.com/@HVACKnowItAll'
+        }):
+            yield
+
+    @pytest.fixture
+    def sample_video_info(self):
+        """Info dict shaped like a yt-dlp result for a regular 10-minute video."""
+        return {
+            'id': 'abc123',
+            'title': 'HVAC Maintenance Tips',
+            'description': 'Learn how to maintain your HVAC system',
+            'uploader': 'HVAC Know It All',
+            'upload_date': '20240101',
+            'view_count': 1500,
+            'like_count': 100,
+            'comment_count': 25,
+            'duration': 600,
+            'webpage_url': 'https://www.youtube.com/watch?v=abc123',
+            'thumbnail': 'https://i.ytimg.com/vi/abc123/maxresdefault.jpg',
+            'tags': ['hvac', 'maintenance', 'tips']
+        }
+
+    def test_initialization(self, config, mock_env):
+        scraper = YouTubeScraper(config)
+        assert scraper.config == config
+        assert scraper.username == 'test@example.com'
+        assert scraper.password == 'test_password'
+        assert scraper.channel_url == 'https://www.youtube.com/@HVACKnowItAll'
+
+    @patch('yt_dlp.YoutubeDL')
+    def test_setup_ydl_options(self, mock_ydl_class, config, mock_env):
+        scraper = YouTubeScraper(config)
+        options = scraper._get_ydl_options()
+
+        # Check key options
+        assert options['quiet'] is True
+        assert options['no_warnings'] is True
+        assert options['extract_flat'] is False
+        assert 'username' in options
+        assert 'password' in options
+        assert 'cookiefile' in options
+        assert 'ratelimit' in options
+
+    @patch('yt_dlp.YoutubeDL')
+    def test_fetch_channel_videos(self, mock_ydl_class, config, mock_env, sample_video_info):
+        mock_ydl = MagicMock()
+        mock_ydl_class.return_value.__enter__.return_value = mock_ydl
+
+        # Mock channel info with videos
+        mock_ydl.extract_info.return_value = {
+            'entries': [
+                sample_video_info,
+                {**sample_video_info, 'id': 'def456', 'title': 'Another Video'}
+            ]
+        }
+
+        scraper = YouTubeScraper(config)
+        videos = scraper.fetch_channel_videos()
+
+        assert len(videos) == 2
+        assert videos[0]['id'] == 'abc123'
+        assert videos[1]['id'] == 'def456'
+        mock_ydl.extract_info.assert_called_once()
+
+    @patch('yt_dlp.YoutubeDL')
+    def test_fetch_video_details(self, mock_ydl_class, config, mock_env, sample_video_info):
+        mock_ydl = MagicMock()
+        mock_ydl_class.return_value.__enter__.return_value = mock_ydl
+        mock_ydl.extract_info.return_value = sample_video_info
+
+        scraper = YouTubeScraper(config)
+        video_info = scraper.fetch_video_details('abc123')
+
+        assert video_info['id'] == 'abc123'
+        assert video_info['title'] == 'HVAC Maintenance Tips'
+        mock_ydl.extract_info.assert_called_with('https://www.youtube.com/watch?v=abc123', download=False)
+
+    @patch('time.sleep')
+    @patch('random.uniform')
+    def test_humanized_delay(self, mock_uniform, mock_sleep, config, mock_env):
+        mock_uniform.return_value = 3.5
+
+        scraper = YouTubeScraper(config)
+        scraper._humanized_delay()
+
+        mock_uniform.assert_called_with(2, 5)
+        mock_sleep.assert_called_with(3.5)
+
+    def test_format_video_type(self, config, mock_env):
+        scraper = YouTubeScraper(config)
+
+        # Test short video
+        assert scraper._get_video_type({'duration': 50}) == 'short'
+
+        # Test regular video
+        assert scraper._get_video_type({'duration': 600}) == 'video'
+
+        # Test live stream
+        assert scraper._get_video_type({'is_live': True}) == 'live'
+
+        # Test missing duration
+        assert scraper._get_video_type({}) == 'video'
+
+    def test_format_markdown(self, config, mock_env):
+        scraper = YouTubeScraper(config)
+
+        videos = [{
+            'id': 'abc123',
+            'title': 'HVAC Tips',
+            'description': 'Learn HVAC basics',
+            'uploader': 'HVAC Know It All',
+            'upload_date': '20240101',
+            'view_count': 1500,
+            'like_count': 100,
+            'comment_count': 25,
+            'duration': 600,
+            'webpage_url': 'https://www.youtube.com/watch?v=abc123',
+            'tags': ['hvac', 'tips'],
+            'type': 'video'
+        }]
+
+        markdown = scraper.format_markdown(videos)
+
+        assert '# ID: abc123' in markdown
+        assert '## Title: HVAC Tips' in markdown
+        assert '## Type: video' in markdown
+        assert '## Author: HVAC Know It All' in markdown
+        assert '## Link: https://www.youtube.com/watch?v=abc123' in markdown
+        assert '## Views: 1500' in markdown
+        assert '## Likes: 100' in markdown
+        assert '## Comments: 25' in markdown
+        assert '## Duration: 600 seconds' in markdown
+        assert '## Upload Date: 2024-01-01' in markdown
+        assert '## Tags: hvac, tips' in markdown
+
+    def test_get_incremental_items(self, config, mock_env):
+        scraper = YouTubeScraper(config)
+
+        videos = [
+            {'id': 'video3', 'upload_date': '20240103'},
+            {'id': 'video2', 'upload_date': '20240102'},
+            {'id': 'video1', 'upload_date': '20240101'}
+        ]
+
+        # Test with no previous state
+        state = {}
+        new_videos = scraper.get_incremental_items(videos, state)
+        assert len(new_videos) == 3
+
+        # Test with existing state
+        state = {'last_video_id': 'video2', 'last_video_date': '20240102'}
+        new_videos = scraper.get_incremental_items(videos, state)
+        assert len(new_videos) == 1
+        assert new_videos[0]['id'] == 'video3'
+
+    def test_update_state(self, config, mock_env):
+        scraper = YouTubeScraper(config)
+
+        state = {}
+        videos = [
+            {'id': 'video2', 'upload_date': '20240102'},
+            {'id': 'video1', 'upload_date': '20240101'}
+        ]
+
+        updated_state = scraper.update_state(state, videos)
+
+        assert updated_state['last_video_id'] == 'video2'
+        assert updated_state['last_video_date'] == '20240102'
+        assert updated_state['video_count'] == 2
+
+    @patch('yt_dlp.YoutubeDL')
+    def test_error_handling(self, mock_ydl_class, config, mock_env):
+        mock_ydl = MagicMock()
+        mock_ydl_class.return_value.__enter__.return_value = mock_ydl
+        mock_ydl.extract_info.side_effect = Exception("Network error")
+
+        scraper = YouTubeScraper(config)
+        videos = scraper.fetch_channel_videos()
+
+        assert videos == []
+
+    @patch('yt_dlp.YoutubeDL')
+    @patch('time.sleep')
+    def test_fetch_content_with_rate_limiting(self, mock_sleep, mock_ydl_class, config, mock_env, sample_video_info):
+        mock_ydl = MagicMock()
+        mock_ydl_class.return_value.__enter__.return_value = mock_ydl
+
+        # Mock channel with multiple videos
+        mock_ydl.extract_info.side_effect = [
+            {'entries': [
+                {'id': 'video1', 'title': 'Video 1'},
+                {'id': 'video2', 'title': 'Video 2'}
+            ]},
+            {**sample_video_info, 'id': 'video1'},
+            {**sample_video_info, 'id': 'video2'}
+        ]
+
+        scraper = YouTubeScraper(config)
+        with patch.object(scraper, '_humanized_delay') as mock_delay:
+            videos = scraper.fetch_content()
+
+        assert len(videos) == 2
+        # Two videos: exactly one delay, taken before fetching the second video
+        assert mock_delay.call_count == 1