hvac-kia-content/src/tiktok_scraper.py
Ben Reed daab901e35 refactor: Update naming convention from hvacknowitall to hkia
Major Changes:
- Updated all code references from hvacknowitall/hvacnkowitall to hkia
- Renamed all existing markdown files to use hkia_ prefix
- Updated configuration files, scrapers, and production scripts
- Modified systemd service descriptions to use HKIA
- Changed NAS sync path to /mnt/nas/hkia

Files Updated:
- 20+ source files updated with new naming convention
- 34 markdown files renamed to hkia_* format
- All ScraperConfig brand_name parameters now use 'hkia'
- Documentation updated to reflect new naming

Rationale:
- Shorter, cleaner filenames
- Consistent branding across all outputs
- Easier to type and reference
- Maintains same functionality with improved naming

Next Steps:
- Deploy updated services to production
- Update any external references to old naming
- Monitor scrapers to ensure proper operation

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-08-19 13:35:23 -03:00

276 lines
No EOL
11 KiB
Python

#!/usr/bin/env python3
"""
TikTok scraper using TikTokApi library with Playwright.
"""
import os
import time
import random
import asyncio
from typing import Any, Dict, List, Optional
from datetime import datetime
from pathlib import Path
from TikTokApi import TikTokApi
from src.base_scraper import BaseScraper, ScraperConfig
class TikTokScraper(BaseScraper):
    """Scrape videos from one TikTok account with TikTokApi and render them as markdown.

    Environment variables read by this class:
        TIKTOK_USERNAME / TIKTOK_PASSWORD: stored on the instance; not
            referenced elsewhere in this class.
        TIKTOK_TARGET: account handle to scrape (defaults to ``'hkia'``).
        TIKTOK_MS_TOKEN: optional token forwarded to ``create_sessions``.
    """

    def __init__(self, config: ScraperConfig):
        """Read settings from the environment and prepare the API client.

        Args:
            config: Scraper configuration; ``config.data_dir`` is used as the
                root for the session directory.
        """
        super().__init__(config)
        self.username = os.getenv('TIKTOK_USERNAME')
        self.password = os.getenv('TIKTOK_PASSWORD')
        self.target_account = os.getenv('TIKTOK_TARGET', 'hkia')

        # Session directory for persistence
        self.session_dir = self.config.data_dir / '.sessions' / 'tiktok'
        self.session_dir.mkdir(parents=True, exist_ok=True)

        # Setup API
        self.api = self._setup_api()

        # Request counter for rate limiting
        self.request_count = 0
        self.max_requests_per_hour = 100

    def _setup_api(self) -> TikTokApi:
        """Create the TikTokApi client with default arguments.

        No ms_token is supplied here; ``fetch_user_videos`` passes one to
        ``create_sessions`` when ``TIKTOK_MS_TOKEN`` is set.
        """
        return TikTokApi()

    def _humanized_delay(self, min_seconds: float = 3, max_seconds: float = 7) -> None:
        """Sleep for a uniformly random duration in ``[min_seconds, max_seconds]``.

        This is a blocking ``time.sleep`` call.
        """
        delay = random.uniform(min_seconds, max_seconds)
        self.logger.debug(f"Waiting {delay:.2f} seconds...")
        time.sleep(delay)

    def _check_rate_limit(self) -> None:
        """Count one request and pause when a threshold is reached.

        When the counter reaches ``max_requests_per_hour`` the call blocks for
        one hour and the counter is reset. Otherwise, on every 10th request, a
        longer randomized pause (15-30 seconds) is taken.
        """
        self.request_count += 1

        if self.request_count >= self.max_requests_per_hour:
            self.logger.warning(f"Rate limit reached ({self.max_requests_per_hour} requests), pausing for 1 hour...")
            time.sleep(3600)  # Wait 1 hour
            self.request_count = 0
            return

        if self.request_count % 10 == 0:
            # Take a longer break every 10 requests
            self.logger.info("Taking extended break after 10 requests...")
            self._humanized_delay(15, 30)

    @staticmethod
    def _lookup(source: Any, names: Any, default: Any = None) -> Any:
        """Return the first non-None value stored under any of *names*.

        *source* may be a dict (names are tried as keys), any other object
        (names are tried as attributes) or None. *default* is returned when
        nothing is found.
        """
        for name in names:
            if isinstance(source, dict):
                value = source.get(name)
            else:
                value = getattr(source, name, None)
            if value is not None:
                return value
        return default

    @staticmethod
    def _hashtag_name(tag: Any) -> str:
        """Return the display name of a hashtag given as str, dict or object."""
        if isinstance(tag, str):
            return tag
        if isinstance(tag, dict):
            return str(tag.get('name') or '')
        name = getattr(tag, 'name', None)
        return str(tag) if name is None else str(name)

    def _extract_video_data(self, video: Any) -> Dict[str, Any]:
        """Convert one TikTokApi video object into a plain dict.

        ``video.id`` and ``video.author.username`` are required and raise if
        absent; every other field falls back to an empty/zero value.

        NOTE(review): TikTokApi documents ``Video.stats`` as a dict with
        camelCase keys, ``Video.create_time`` as a datetime and the raw
        payload as ``Video.as_dict`` - confirm against the installed version.
        Both that layout and plain snake_case attributes are accepted here.
        """
        author = video.author
        username = author.username

        # Raw API payload, used only as a fallback source for optional fields.
        raw = getattr(video, 'as_dict', None)
        if not isinstance(raw, dict):
            raw = {}

        nickname = self._lookup(author, ('nickname',))
        if nickname is None:
            # presumably the raw payload nests author details under 'author' - TODO confirm
            nickname = self._lookup(raw.get('author'), ('nickname',), '')

        description = self._lookup(video, ('desc',))
        if description is None:
            description = self._lookup(raw, ('desc',), '')

        created = getattr(video, 'create_time', None)
        if isinstance(created, datetime):
            publish_date = created.isoformat()
        elif created is None or created == '':
            publish_date = ''
        else:
            # Numeric (or numeric string) epoch seconds.
            publish_date = datetime.fromtimestamp(float(created)).isoformat()

        stats = getattr(video, 'stats', None)

        duration = self._lookup(video, ('duration',))
        if duration is None:
            # presumably the raw payload keeps duration under 'video' - TODO confirm
            duration = self._lookup(raw.get('video'), ('duration',), 0)

        sound = self._lookup(video, ('music', 'sound'))
        tags = getattr(video, 'hashtags', None) or []

        return {
            'id': video.id,
            'author': username,
            'nickname': nickname,
            'description': description,
            'publish_date': publish_date,
            'link': f'https://www.tiktok.com/@{username}/video/{video.id}',
            'views': self._lookup(stats, ('play_count', 'playCount'), 0),
            # "digg" is the like counter; the collect counter (favourites) is
            # a different metric and must not be reported as likes.
            'likes': self._lookup(stats, ('digg_count', 'diggCount'), 0),
            'comments': self._lookup(stats, ('comment_count', 'commentCount'), 0),
            'shares': self._lookup(stats, ('share_count', 'shareCount'), 0),
            'duration': duration,
            'music': self._lookup(sound, ('title',), ''),
            # str/dict entries are kept as they are; other objects are
            # reduced to their name so the result stays plain data.
            'hashtags': [
                tag if isinstance(tag, (str, dict)) else self._hashtag_name(tag)
                for tag in tags
            ],
        }

    async def fetch_user_videos(self, max_videos: int = 20) -> List[Dict[str, Any]]:
        """Fetch up to *max_videos* videos from the target account.

        Errors are logged rather than raised: a failure on a single video
        skips that video, and a failure of the whole session returns whatever
        was collected so far.

        Args:
            max_videos: Maximum number of videos to return.

        Returns:
            A list of video dicts as produced by ``_extract_video_data``.
        """
        videos_data: List[Dict[str, Any]] = []

        try:
            self.logger.info(f"Fetching videos from @{self.target_account}")

            # Create sessions with Playwright
            async with self.api:
                # Use the ms_token from the environment when present,
                # otherwise let the API obtain one itself.
                ms_token = os.getenv('TIKTOK_MS_TOKEN')
                await self.api.create_sessions(
                    ms_tokens=[ms_token] if ms_token else [],
                    num_sessions=1,
                    sleep_after=3,
                    headless=True,
                    suppress_resource_load_types=["image", "media", "font", "stylesheet"],
                )

                user = self.api.user(self.target_account)
                self._check_rate_limit()

                # NOTE(review): _humanized_delay and _check_rate_limit wait
                # with time.sleep, so the event loop is blocked while they run.
                async for video in user.videos(count=max_videos):
                    if len(videos_data) >= max_videos:
                        break

                    try:
                        videos_data.append(self._extract_video_data(video))

                        # Rate limiting
                        self._humanized_delay()
                        self._check_rate_limit()

                        # Log progress
                        count = len(videos_data)
                        if count % 5 == 0:
                            self.logger.info(f"Fetched {count}/{max_videos} videos")
                    except Exception as e:
                        self.logger.error(f"Error processing video: {e}")
                        continue

            self.logger.info(f"Successfully fetched {len(videos_data)} videos")
        except Exception as e:
            self.logger.error(f"Error fetching videos: {e}")

        return videos_data

    def fetch_content(self) -> List[Dict[str, Any]]:
        """Synchronous wrapper around ``fetch_user_videos``.

        The coroutine always runs in a fresh event loop. If the calling
        thread already has a running loop, the fresh loop is started in a
        worker thread and this call blocks until it finishes.
        """
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            # No loop is running in this thread: asyncio.run is safe here.
            return asyncio.run(self.fetch_user_videos())

        # asyncio.run cannot be nested inside a running loop, so hand the
        # work to a separate thread that owns its own loop.
        import concurrent.futures

        with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
            future = executor.submit(lambda: asyncio.run(self.fetch_user_videos()))
            return future.result()

    def format_markdown(self, videos: List[Dict[str, Any]]) -> str:
        """Render video dicts as markdown, one section per video.

        Each section lists the video's fields as headings separated by blank
        lines, followed by the description (truncated to 500 characters plus
        ``...``) and a 50-dash separator. Nickname, music and hashtags are
        emitted only when present.

        Args:
            videos: Video dicts as produced by ``fetch_user_videos``.

        Returns:
            The sections joined with newlines; an empty string for no videos.
        """
        markdown_sections = []

        for video in videos:
            headings = [
                f"# ID: {video.get('id', 'N/A')}",
                f"## Author: {video.get('author', 'Unknown')}",
            ]

            nickname = video.get('nickname', '')
            if nickname:
                headings.append(f"## Nickname: {nickname}")

            headings.extend([
                f"## Publish Date: {video.get('publish_date', '')}",
                f"## Link: {video.get('link', '')}",
                f"## Views: {video.get('views', 0)}",
                f"## Likes: {video.get('likes', 0)}",
                f"## Comments: {video.get('comments', 0)}",
                f"## Shares: {video.get('shares', 0)}",
                f"## Duration: {video.get('duration', 0)} seconds",
            ])

            music = video.get('music', '')
            if music:
                headings.append(f"## Music: {music}")

            hashtags = video.get('hashtags', [])
            if hashtags:
                # Normalise every entry on its own so mixed lists
                # (str / dict / object) cannot break the join.
                names = [name for name in map(self._hashtag_name, hashtags) if name]
                headings.append(f"## Hashtags: {', '.join(names)}")

            # Every heading is followed by one blank line.
            section: List[str] = []
            for heading in headings:
                section.extend((heading, ""))

            section.append("## Description:")
            description = video.get('description', '')
            if description:
                # Limit description to first 500 characters
                if len(description) > 500:
                    description = description[:500] + "..."
                section.append(description)
            section.extend(("", "-" * 50, ""))

            markdown_sections.append('\n'.join(section))

        return '\n'.join(markdown_sections)

    def get_incremental_items(self, items: List[Dict[str, Any]], state: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Return the items that precede the last synced video.

        Args:
            items: Fetched videos; the loop assumes newest-first order.
            state: Sync state; ``state['last_video_id']`` marks the last
                synced video.

        Returns:
            All items when there is no usable state; otherwise the leading
            items up to (not including) the one whose id matches. If no item
            matches, every item is returned.
        """
        last_video_id = state.get('last_video_id') if state else None
        if not last_video_id:
            return items

        new_items = []
        for item in items:
            if item.get('id') == last_video_id:
                break  # Found the last synced video
            new_items.append(item)
        return new_items

    def update_state(self, state: Dict[str, Any], items: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Record the newest video and sync time in *state*.

        *state* is mutated in place and also returned. With no items it is
        returned unchanged.

        Args:
            state: Sync state dict to update.
            items: Videos from this sync; the first one is treated as the
                most recent.
        """
        if not items:
            return state

        latest_item = items[0]
        state.update({
            'last_video_id': latest_item.get('id'),
            'last_video_date': latest_item.get('publish_date'),
            'last_sync': datetime.now(self.tz).isoformat(),
            'video_count': len(items),
        })
        return state