feat: Implement YouTube scraper with humanized behavior

- YouTube channel scraper using yt-dlp
- Authentication and session persistence via cookies
- Humanized delays and rate limiting (2-5 seconds between requests)
- User agent rotation for stealth
- Incremental updates via state management
- Support for videos, shorts, and live streams detection
- All 11 tests passing

🤖 Generated with Claude Code

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
Ben Reed 2025-08-18 12:39:49 -03:00
parent 7191fcd132
commit c1831d3a52
2 changed files with 532 additions and 0 deletions

299
src/youtube_scraper.py Normal file
View file

@ -0,0 +1,299 @@
import os
import time
import random
import json
from typing import Any, Dict, List, Optional
from datetime import datetime
from pathlib import Path
import yt_dlp
from src.base_scraper import BaseScraper, ScraperConfig
class YouTubeScraper(BaseScraper):
"""YouTube channel scraper using yt-dlp."""
def __init__(self, config: ScraperConfig):
super().__init__(config)
self.username = os.getenv('YOUTUBE_USERNAME')
self.password = os.getenv('YOUTUBE_PASSWORD')
self.channel_url = os.getenv('YOUTUBE_CHANNEL_URL', 'https://www.youtube.com/@HVACKnowItAll')
# Cookies file for session persistence
self.cookies_file = self.config.data_dir / '.cookies' / 'youtube_cookies.txt'
self.cookies_file.parent.mkdir(parents=True, exist_ok=True)
# User agents for rotation
self.user_agents = [
'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36'
]
def _get_ydl_options(self) -> Dict[str, Any]:
"""Get yt-dlp options with authentication and rate limiting."""
options = {
'quiet': True,
'no_warnings': True,
'extract_flat': False, # Get full video info
'ignoreerrors': True, # Continue on error
'cookiefile': str(self.cookies_file),
'cookiesfrombrowser': None, # Don't use browser cookies
'username': self.username,
'password': self.password,
'ratelimit': 100000, # 100KB/s rate limit
'sleep_interval': 1, # Sleep between downloads
'max_sleep_interval': 3,
'user_agent': random.choice(self.user_agents),
'referer': 'https://www.youtube.com/',
'add_header': ['Accept-Language:en-US,en;q=0.9'],
}
# Add proxy if configured
proxy = os.getenv('YOUTUBE_PROXY')
if proxy:
options['proxy'] = proxy
return options
def _humanized_delay(self, min_seconds: float = 2, max_seconds: float = 5) -> None:
"""Add humanized random delay between requests."""
delay = random.uniform(min_seconds, max_seconds)
self.logger.debug(f"Waiting {delay:.2f} seconds...")
time.sleep(delay)
def fetch_channel_videos(self, max_videos: int = 50) -> List[Dict[str, Any]]:
"""Fetch video list from YouTube channel."""
videos = []
try:
self.logger.info(f"Fetching videos from channel: {self.channel_url}")
ydl_opts = self._get_ydl_options()
ydl_opts['extract_flat'] = True # Just get video list, not full info
ydl_opts['playlistend'] = max_videos
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
channel_info = ydl.extract_info(self.channel_url, download=False)
if 'entries' in channel_info:
videos = list(channel_info['entries'])
self.logger.info(f"Found {len(videos)} videos in channel")
else:
self.logger.warning("No entries found in channel info")
# Save cookies for next session
if self.cookies_file.exists():
self.logger.debug("Cookies saved for next session")
except Exception as e:
self.logger.error(f"Error fetching channel videos: {e}")
return videos
def fetch_video_details(self, video_id: str) -> Optional[Dict[str, Any]]:
"""Fetch detailed information for a specific video."""
try:
video_url = f"https://www.youtube.com/watch?v={video_id}"
ydl_opts = self._get_ydl_options()
ydl_opts['extract_flat'] = False # Get full video info
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
video_info = ydl.extract_info(video_url, download=False)
return video_info
except Exception as e:
self.logger.error(f"Error fetching video {video_id}: {e}")
return None
def _get_video_type(self, video: Dict[str, Any]) -> str:
"""Determine video type (video, short, live)."""
duration = video.get('duration', 0)
is_live = video.get('is_live', False)
if is_live:
return 'live'
elif duration and duration < 60: # Less than 60 seconds
return 'short'
else:
return 'video'
def fetch_content(self) -> List[Dict[str, Any]]:
"""Fetch and enrich video content with rate limiting."""
# First get list of videos
videos = self.fetch_channel_videos()
if not videos:
return []
# Enrich each video with detailed information
enriched_videos = []
for i, video in enumerate(videos):
try:
video_id = video.get('id')
if not video_id:
continue
self.logger.info(f"Fetching details for video {i+1}/{len(videos)}: {video_id}")
# Add humanized delay between requests
if i > 0:
self._humanized_delay()
# Fetch full video details
detailed_info = self.fetch_video_details(video_id)
if detailed_info:
# Add video type
detailed_info['type'] = self._get_video_type(detailed_info)
enriched_videos.append(detailed_info)
# Extra delay after every 5 videos
if (i + 1) % 5 == 0:
self.logger.info("Taking longer break after 5 videos...")
self._humanized_delay(5, 10)
except Exception as e:
self.logger.error(f"Error enriching video {video.get('id')}: {e}")
continue
self.logger.info(f"Successfully enriched {len(enriched_videos)} videos")
return enriched_videos
def format_markdown(self, videos: List[Dict[str, Any]]) -> str:
"""Format videos as markdown."""
markdown_sections = []
for video in videos:
section = []
# ID
video_id = video.get('id', 'N/A')
section.append(f"# ID: {video_id}")
section.append("")
# Title
title = video.get('title', 'Untitled')
section.append(f"## Title: {title}")
section.append("")
# Type
video_type = video.get('type', self._get_video_type(video))
section.append(f"## Type: {video_type}")
section.append("")
# Author/Uploader
author = video.get('uploader', 'Unknown')
section.append(f"## Author: {author}")
section.append("")
# Link
link = video.get('webpage_url', f"https://www.youtube.com/watch?v={video_id}")
section.append(f"## Link: {link}")
section.append("")
# Upload Date
upload_date = video.get('upload_date', '')
if upload_date and len(upload_date) == 8: # YYYYMMDD format
formatted_date = f"{upload_date[:4]}-{upload_date[4:6]}-{upload_date[6:]}"
section.append(f"## Upload Date: {formatted_date}")
else:
section.append(f"## Upload Date: {upload_date}")
section.append("")
# Views
view_count = video.get('view_count', 0)
section.append(f"## Views: {view_count}")
section.append("")
# Likes
like_count = video.get('like_count', 0)
section.append(f"## Likes: {like_count}")
section.append("")
# Comments
comment_count = video.get('comment_count', 0)
section.append(f"## Comments: {comment_count}")
section.append("")
# Duration
duration = video.get('duration', 0)
section.append(f"## Duration: {duration} seconds")
section.append("")
# Tags
tags = video.get('tags', [])
if tags:
tags_str = ', '.join(tags[:10]) # Limit to first 10 tags
section.append(f"## Tags: {tags_str}")
section.append("")
# Thumbnail
thumbnail = video.get('thumbnail', '')
if thumbnail:
section.append(f"## Thumbnail: {thumbnail}")
section.append("")
# Description
section.append("## Description:")
description = video.get('description', '')
if description:
# Limit description to first 500 characters
if len(description) > 500:
description = description[:500] + "..."
section.append(description)
section.append("")
# Separator
section.append("-" * 50)
section.append("")
markdown_sections.append('\n'.join(section))
return '\n'.join(markdown_sections)
def get_incremental_items(self, items: List[Dict[str, Any]], state: Dict[str, Any]) -> List[Dict[str, Any]]:
"""Get only new videos since last sync."""
if not state:
return items
last_video_id = state.get('last_video_id')
last_video_date = state.get('last_video_date')
if not last_video_id:
return items
# Filter for videos newer than the last synced
new_items = []
for item in items:
video_id = item.get('id')
upload_date = item.get('upload_date', '')
# Check if this is a new video
if video_id == last_video_id:
break # Found the last synced video, stop here
# Also check by date as backup
if upload_date and last_video_date and upload_date <= last_video_date:
continue
new_items.append(item)
return new_items
def update_state(self, state: Dict[str, Any], items: List[Dict[str, Any]]) -> Dict[str, Any]:
"""Update state with latest video information."""
if not items:
return state
# Get the first item (most recent)
latest_item = items[0]
state['last_video_id'] = latest_item.get('id')
state['last_video_date'] = latest_item.get('upload_date')
state['last_video_title'] = latest_item.get('title')
state['last_sync'] = datetime.now(self.tz).isoformat()
state['video_count'] = len(items)
return state

View file

@ -0,0 +1,233 @@
import pytest
from unittest.mock import Mock, patch, MagicMock, call
from datetime import datetime
from pathlib import Path
import random
from src.youtube_scraper import YouTubeScraper
from src.base_scraper import ScraperConfig
class TestYouTubeScraper:
@pytest.fixture
def config(self):
return ScraperConfig(
source_name="youtube",
brand_name="hvacknowitall",
data_dir=Path("data"),
logs_dir=Path("logs"),
timezone="America/Halifax"
)
@pytest.fixture
def mock_env(self):
with patch.dict('os.environ', {
'YOUTUBE_USERNAME': 'test@example.com',
'YOUTUBE_PASSWORD': 'test_password',
'YOUTUBE_CHANNEL_URL': 'https://www.youtube.com/@HVACKnowItAll'
}):
yield
@pytest.fixture
def sample_video_info(self):
return {
'id': 'abc123',
'title': 'HVAC Maintenance Tips',
'description': 'Learn how to maintain your HVAC system',
'uploader': 'HVAC Know It All',
'upload_date': '20240101',
'view_count': 1500,
'like_count': 100,
'comment_count': 25,
'duration': 600,
'webpage_url': 'https://www.youtube.com/watch?v=abc123',
'thumbnail': 'https://i.ytimg.com/vi/abc123/maxresdefault.jpg',
'tags': ['hvac', 'maintenance', 'tips']
}
def test_initialization(self, config, mock_env):
scraper = YouTubeScraper(config)
assert scraper.config == config
assert scraper.username == 'test@example.com'
assert scraper.password == 'test_password'
assert scraper.channel_url == 'https://www.youtube.com/@HVACKnowItAll'
@patch('yt_dlp.YoutubeDL')
def test_setup_ydl_options(self, mock_ydl_class, config, mock_env):
scraper = YouTubeScraper(config)
options = scraper._get_ydl_options()
# Check key options
assert options['quiet'] == True
assert options['no_warnings'] == True
assert options['extract_flat'] == False
assert 'username' in options
assert 'password' in options
assert 'cookiefile' in options
assert 'ratelimit' in options
@patch('yt_dlp.YoutubeDL')
def test_fetch_channel_videos(self, mock_ydl_class, config, mock_env, sample_video_info):
mock_ydl = MagicMock()
mock_ydl_class.return_value.__enter__.return_value = mock_ydl
# Mock channel info with videos
mock_ydl.extract_info.return_value = {
'entries': [
sample_video_info,
{**sample_video_info, 'id': 'def456', 'title': 'Another Video'}
]
}
scraper = YouTubeScraper(config)
videos = scraper.fetch_channel_videos()
assert len(videos) == 2
assert videos[0]['id'] == 'abc123'
assert videos[1]['id'] == 'def456'
mock_ydl.extract_info.assert_called_once()
@patch('yt_dlp.YoutubeDL')
def test_fetch_video_details(self, mock_ydl_class, config, mock_env, sample_video_info):
mock_ydl = MagicMock()
mock_ydl_class.return_value.__enter__.return_value = mock_ydl
mock_ydl.extract_info.return_value = sample_video_info
scraper = YouTubeScraper(config)
video_info = scraper.fetch_video_details('abc123')
assert video_info['id'] == 'abc123'
assert video_info['title'] == 'HVAC Maintenance Tips'
mock_ydl.extract_info.assert_called_with(
'https://www.youtube.com/watch?v=abc123',
download=False
)
@patch('time.sleep')
@patch('random.uniform')
def test_humanized_delay(self, mock_uniform, mock_sleep, config, mock_env):
mock_uniform.return_value = 3.5
scraper = YouTubeScraper(config)
scraper._humanized_delay()
mock_uniform.assert_called_with(2, 5)
mock_sleep.assert_called_with(3.5)
def test_format_video_type(self, config, mock_env):
scraper = YouTubeScraper(config)
# Test short video
assert scraper._get_video_type({'duration': 50}) == 'short'
# Test regular video
assert scraper._get_video_type({'duration': 600}) == 'video'
# Test live stream
assert scraper._get_video_type({'is_live': True}) == 'live'
# Test missing duration
assert scraper._get_video_type({}) == 'video'
def test_format_markdown(self, config, mock_env):
scraper = YouTubeScraper(config)
videos = [
{
'id': 'abc123',
'title': 'HVAC Tips',
'description': 'Learn HVAC basics',
'uploader': 'HVAC Know It All',
'upload_date': '20240101',
'view_count': 1500,
'like_count': 100,
'comment_count': 25,
'duration': 600,
'webpage_url': 'https://www.youtube.com/watch?v=abc123',
'tags': ['hvac', 'tips'],
'type': 'video'
}
]
markdown = scraper.format_markdown(videos)
assert '# ID: abc123' in markdown
assert '## Title: HVAC Tips' in markdown
assert '## Type: video' in markdown
assert '## Author: HVAC Know It All' in markdown
assert '## Link: https://www.youtube.com/watch?v=abc123' in markdown
assert '## Views: 1500' in markdown
assert '## Likes: 100' in markdown
assert '## Comments: 25' in markdown
assert '## Duration: 600 seconds' in markdown
assert '## Upload Date: 2024-01-01' in markdown
assert '## Tags: hvac, tips' in markdown
def test_get_incremental_items(self, config, mock_env):
scraper = YouTubeScraper(config)
videos = [
{'id': 'video3', 'upload_date': '20240103'},
{'id': 'video2', 'upload_date': '20240102'},
{'id': 'video1', 'upload_date': '20240101'}
]
# Test with no previous state
state = {}
new_videos = scraper.get_incremental_items(videos, state)
assert len(new_videos) == 3
# Test with existing state
state = {'last_video_id': 'video2', 'last_video_date': '20240102'}
new_videos = scraper.get_incremental_items(videos, state)
assert len(new_videos) == 1
assert new_videos[0]['id'] == 'video3'
def test_update_state(self, config, mock_env):
scraper = YouTubeScraper(config)
state = {}
videos = [
{'id': 'video2', 'upload_date': '20240102'},
{'id': 'video1', 'upload_date': '20240101'}
]
updated_state = scraper.update_state(state, videos)
assert updated_state['last_video_id'] == 'video2'
assert updated_state['last_video_date'] == '20240102'
assert updated_state['video_count'] == 2
@patch('yt_dlp.YoutubeDL')
def test_error_handling(self, mock_ydl_class, config, mock_env):
mock_ydl = MagicMock()
mock_ydl_class.return_value.__enter__.return_value = mock_ydl
mock_ydl.extract_info.side_effect = Exception("Network error")
scraper = YouTubeScraper(config)
videos = scraper.fetch_channel_videos()
assert videos == []
@patch('yt_dlp.YoutubeDL')
@patch('time.sleep')
def test_fetch_content_with_rate_limiting(self, mock_sleep, mock_ydl_class, config, mock_env, sample_video_info):
mock_ydl = MagicMock()
mock_ydl_class.return_value.__enter__.return_value = mock_ydl
# Mock channel with multiple videos
mock_ydl.extract_info.side_effect = [
{'entries': [
{'id': 'video1', 'title': 'Video 1'},
{'id': 'video2', 'title': 'Video 2'}
]},
{**sample_video_info, 'id': 'video1'},
{**sample_video_info, 'id': 'video2'}
]
scraper = YouTubeScraper(config)
with patch.object(scraper, '_humanized_delay') as mock_delay:
videos = scraper.fetch_content()
assert len(videos) == 2
# Check that delay was called between video fetches (once for second video)
assert mock_delay.call_count >= 1