hvac-kia-content/youtube_backlog_all_with_transcripts.py
Ben Reed daab901e35 refactor: Update naming convention from hvacknowitall to hkia
Major Changes:
- Updated all code references from hvacknowitall/hvacnkowitall to hkia
- Renamed all existing markdown files to use hkia_ prefix
- Updated configuration files, scrapers, and production scripts
- Modified systemd service descriptions to use HKIA
- Changed NAS sync path to /mnt/nas/hkia

Files Updated:
- 20+ source files updated with new naming convention
- 34 markdown files renamed to hkia_* format
- All ScraperConfig brand_name parameters now use 'hkia'
- Documentation updated to reflect new naming

Rationale:
- Shorter, cleaner filenames
- Consistent branding across all outputs
- Easier to type and reference
- Maintains same functionality with improved naming

Next Steps:
- Deploy updated services to production
- Update any external references to old naming
- Monitor scrapers to ensure proper operation

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-08-19 13:35:23 -03:00

198 lines
No EOL
7.3 KiB
Python

#!/usr/bin/env python3
"""
YouTube Backlog Capture: ALL AVAILABLE VIDEOS with Transcripts
Fetches all available videos (approximately 370) with full transcript extraction
"""
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent))
from src.base_scraper import ScraperConfig
from src.youtube_scraper import YouTubeScraper
from datetime import datetime
import logging
import time
# Set up logging: duplicate every record to a run log file (for post-run
# review) and to the console stream (for live monitoring).
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('youtube_backlog_all_transcripts.log'),
        logging.StreamHandler()
    ]
)
# Module-level logger; all functions below log through this.
logger = logging.getLogger(__name__)
def test_authentication():
    """Smoke-test YouTube auth and transcript extraction before a long run.

    Checks cookie validity via the scraper's auth handler, then fetches a
    single known video with its transcript to prove the full pipeline works.

    Returns:
        bool: True if cookies are valid and the test video plus transcript
        were fetched; False on any failure (details are logged).
    """
    logger.info("🔐 Testing YouTube authentication...")

    config = ScraperConfig(
        source_name="youtube_test",
        brand_name="hkia",  # project naming convention (renamed from 'hvacknowitall')
        data_dir=Path("test_data/auth_test"),
        logs_dir=Path("test_logs/auth_test"),
        timezone="America/Halifax"
    )
    scraper = YouTubeScraper(config)

    auth_status = scraper.auth_handler.get_status()
    if not auth_status['has_valid_cookies']:
        logger.error("❌ Authentication failed")
        return False

    # Fetch one known video (with transcript) as an end-to-end check.
    logger.info("Testing single video extraction...")
    test_video = scraper.fetch_video_details("TpdYT_itu9U", fetch_transcript=True)
    if not test_video:
        logger.error("❌ Failed to fetch test video")
        return False
    if not test_video.get('transcript'):
        logger.error("❌ Failed to fetch test transcript")
        return False

    logger.info("✅ Authentication test passed")
    logger.info(f"✅ Transcript test passed ({len(test_video['transcript'])} chars)")
    return True
def fetch_all_videos_with_transcripts():
    """Fetch every available YouTube video with transcripts and save markdown.

    Clears the scraper's incremental state to force a full (non-incremental)
    capture, fetches all videos with transcripts, renders one markdown
    document, and writes it to <data_dir>/markdown_current with a
    timestamped filename.

    Returns:
        bool: True on success, False if no videos were fetched or an
        exception occurred (logged with traceback).
    """
    logger.info("🎥 YOUTUBE FULL BACKLOG: Fetching ALL videos with transcripts")
    logger.info("Expected: ~370 videos (entire channel history)")
    logger.info("Estimated time: 20-30 minutes")
    logger.info("=" * 70)

    # Production backlog configuration
    config = ScraperConfig(
        source_name="youtube",
        brand_name="hkia",  # project naming convention (renamed from 'hvacknowitall')
        data_dir=Path("data_production_backlog"),
        logs_dir=Path("logs_production_backlog"),
        timezone="America/Halifax"
    )
    scraper = YouTubeScraper(config)

    # Remove any incremental state so the capture covers the full history.
    if scraper.state_file.exists():
        scraper.state_file.unlink()
        logger.info("Cleared existing state for full backlog capture")

    start_time = time.time()
    try:
        # No max_posts limit = fetch every available video
        logger.info("Starting full backlog capture with transcripts...")
        videos = scraper.fetch_content(fetch_transcripts=True)
        if not videos:
            logger.error("❌ No videos fetched")
            return False

        # Transcript coverage statistics
        transcript_count = sum(1 for video in videos if video.get('transcript'))
        total_transcript_chars = sum(len(video.get('transcript', '')) for video in videos)

        # Generate markdown
        logger.info("\nGenerating comprehensive markdown with transcripts...")
        markdown = scraper.format_markdown(videos)

        # Save with timestamp; hkia_ filename prefix per project convention.
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = f"hkia_youtube_full_backlog_transcripts_{timestamp}.md"
        output_dir = config.data_dir / "markdown_current"
        output_dir.mkdir(parents=True, exist_ok=True)
        output_file = output_dir / filename
        output_file.write_text(markdown, encoding='utf-8')

        # Duration stats (videos is non-empty here, so division is safe)
        duration = time.time() - start_time
        avg_time_per_video = duration / len(videos)

        # Final statistics
        logger.info("\n" + "=" * 70)
        logger.info("🎉 YOUTUBE FULL BACKLOG CAPTURE COMPLETE")
        logger.info("📊 FINAL STATISTICS:")
        logger.info(f" Total videos fetched: {len(videos)}")
        logger.info(f" Videos with transcripts: {transcript_count}")
        logger.info(f" Transcript success rate: {transcript_count/len(videos)*100:.1f}%")
        logger.info(f" Total transcript characters: {total_transcript_chars:,}")
        logger.info(f" Average transcript length: {total_transcript_chars/transcript_count if transcript_count > 0 else 0:,.0f} chars")
        logger.info(f" Total processing time: {duration/60:.1f} minutes")
        logger.info(f" Average time per video: {avg_time_per_video:.1f} seconds")
        logger.info(f" Markdown file size: {output_file.stat().st_size / 1024 / 1024:.1f} MB")
        logger.info(f"📄 Saved to: {output_file}")

        # Sanity check that the capture looks like a full backlog.
        expected_minimum = 300  # expect at least 300 of the ~370 channel videos
        if len(videos) < expected_minimum:
            logger.warning(f"⚠️ Only {len(videos)} videos captured, expected ~370")
        else:
            logger.info(f"✅ Captured {len(videos)} videos - full backlog complete")

        # Show transcript quality samples (first five videos with transcripts)
        logger.info("\n📝 TRANSCRIPT QUALITY SAMPLES:")
        transcript_videos = [v for v in videos if v.get('transcript')][:5]
        for i, video in enumerate(transcript_videos):
            full_title = video.get('title', 'Unknown')
            # Only ellipsize titles that were actually truncated.
            title = full_title[:40] + "..." if len(full_title) > 40 else full_title
            transcript = video.get('transcript', '')
            logger.info(f" {i+1}. {title}")
            logger.info(f"    Length: {len(transcript):,} chars")
            preview = transcript[:80] + "..." if len(transcript) > 80 else transcript
            logger.info(f"    Preview: {preview}")

        return True

    except Exception as e:
        logger.error(f"❌ Backlog capture failed: {e}")
        import traceback
        logger.error(traceback.format_exc())
        return False
def main():
    """Interactive driver: auth smoke test, user confirmation, full capture.

    Returns:
        bool: True if the full backlog capture completed successfully;
        False if the auth test failed or the user cancelled.
    """
    intro_lines = (
        "\n🎥 YouTube Full Backlog Capture with Transcripts",
        "=" * 55,
        "This will capture ALL available YouTube videos (~370) with transcripts",
        "Expected time: 20-30 minutes",
        "Output: Complete backlog markdown with transcripts",
    )
    for line in intro_lines:
        print(line)

    # Step 1: verify auth before committing to a 20-30 minute run.
    print("\nStep 1: Testing authentication...")
    if not test_authentication():
        print("❌ Authentication test failed. Please ensure you're logged into YouTube in Firefox.")
        return False
    print("✅ Authentication test passed")

    # Step 2: explicit user confirmation before the long capture starts.
    print(f"\nStep 2: Ready to capture full backlog")
    print("Press Enter to start full backlog capture or Ctrl+C to cancel...")
    try:
        input()
    except KeyboardInterrupt:
        print("\nCancelled by user")
        return False

    # Step 3: run the full backlog capture and propagate its result.
    return fetch_all_videos_with_transcripts()
if __name__ == "__main__":
    try:
        success = main()
        # Exit code 0 on success, 1 on failure or user cancellation.
        sys.exit(0 if success else 1)
    except KeyboardInterrupt:
        # Ctrl+C outside main()'s own confirmation prompt lands here.
        logger.info("\nBacklog capture interrupted by user")
        sys.exit(1)
    except Exception as e:
        # Top-level boundary: log and exit with a distinct code (2) so
        # callers can distinguish crashes from ordinary failures.
        logger.critical(f"Backlog capture failed: {e}")
        sys.exit(2)