import os
import time
import requests
from typing import Any, Dict, List, Optional
from datetime import datetime

from src.base_scraper import BaseScraper, ScraperConfig


class WordPressScraper(BaseScraper):
    def __init__(self, config: ScraperConfig):
        super().__init__(config)
        self.base_url = os.getenv('WORDPRESS_URL', 'https://hvacknowitall.com/')
        self.username = os.getenv('WORDPRESS_USERNAME')
        self.api_key = os.getenv('WORDPRESS_API_KEY')
        self.auth = (self.username, self.api_key)

        # Ensure base_url ends with /
        if not self.base_url.endswith('/'):
            self.base_url += '/'

        # Cache for authors, categories, and tags
        self.author_cache = {}
        self.category_cache = {}
        self.tag_cache = {}

    def fetch_posts(self, max_posts: Optional[int] = None) -> List[Dict[str, Any]]:
        """Fetch posts from WordPress API with pagination."""
        posts = []
        page = 1

        # Optimize per_page based on max_posts
        if max_posts and max_posts <= 100:
            per_page = max_posts
        else:
            per_page = 100  # WordPress max

        try:
            while True:
                self.logger.info(f"Fetching posts page {page} (per_page={per_page})")

                # Use session with retry logic from base class
                response = self.make_request(
                    'GET',
                    f"{self.base_url}wp-json/wp/v2/posts",
                    params={'per_page': per_page, 'page': page},
                    auth=self.auth,
                    timeout=30
                )

                if response.status_code != 200:
                    self.logger.error(f"Error fetching posts: {response.status_code}")
                    break

                page_posts = response.json()
                if not page_posts:
                    break

                posts.extend(page_posts)

                # Check if we have enough posts
                if max_posts and len(posts) >= max_posts:
                    posts = posts[:max_posts]
                    break

                # Check if there are more pages
                total_pages = int(response.headers.get('X-WP-TotalPages', 1))
                if page >= total_pages:
                    break

                page += 1
                time.sleep(1)  # Rate limiting
        except Exception as e:
            self.logger.error(f"Error fetching posts: {e}")

        self.logger.info(f"Fetched {len(posts)} posts total")
        return posts

    def fetch_author(self, author_id: int) -> Dict[str, Any]:
        """Fetch author information."""
        if author_id in self.author_cache:
            return self.author_cache[author_id]

        try:
            response = self.make_request(
                'GET',
                f"{self.base_url}wp-json/wp/v2/users/{author_id}",
                auth=self.auth,
                timeout=30
            )
            if response.status_code == 200:
                author = response.json()
                self.author_cache[author_id] = author
                return author
        except Exception as e:
            self.logger.error(f"Error fetching author {author_id}: {e}")

        return {'name': 'Unknown'}

    def fetch_categories(self, category_ids: List[int]) -> List[Dict[str, Any]]:
        """Fetch category information."""
        categories = []
        for cat_id in category_ids:
            if cat_id in self.category_cache:
                categories.append(self.category_cache[cat_id])
                continue

            try:
                response = self.make_request(
                    'GET',
                    f"{self.base_url}wp-json/wp/v2/categories/{cat_id}",
                    auth=self.auth,
                    timeout=30
                )
                if response.status_code == 200:
                    category = response.json()
                    self.category_cache[cat_id] = category
                    categories.append(category)
            except Exception as e:
                self.logger.error(f"Error fetching category {cat_id}: {e}")

        return categories

    def fetch_tags(self, tag_ids: List[int]) -> List[Dict[str, Any]]:
        """Fetch tag information."""
        tags = []
        for tag_id in tag_ids:
            if tag_id in self.tag_cache:
                tags.append(self.tag_cache[tag_id])
                continue

            try:
                response = self.make_request(
                    'GET',
                    f"{self.base_url}wp-json/wp/v2/tags/{tag_id}",
                    auth=self.auth,
                    timeout=30
                )
                if response.status_code == 200:
                    tag = response.json()
                    self.tag_cache[tag_id] = tag
                    tags.append(tag)
            except Exception as e:
                self.logger.error(f"Error fetching tag {tag_id}: {e}")

        return tags

    def count_words(self, html_content: str) -> int:
        """Count words in HTML content."""
        # Convert to markdown first to get clean text
        text = self.convert_to_markdown(html_content)
        # Simple word count
        words = text.split()
        return len(words)

    def fetch_content(self, max_items: Optional[int] = None) -> List[Dict[str, Any]]:
        """Fetch and enrich content."""
        posts = self.fetch_posts(max_posts=max_items)

        # Enrich posts with author, category, and tag information
        enriched_posts = []
        for post in posts:
            try:
                # Fetch author info
                author = self.fetch_author(post.get('author', 0))
                post['author_name'] = author.get('name', 'Unknown')

                # Fetch categories
                category_ids = post.get('categories', [])
                if category_ids:
                    categories = self.fetch_categories(category_ids)
                    post['category_names'] = [cat.get('name', '') for cat in categories]
                else:
                    post['category_names'] = []

                # Fetch tags
                tag_ids = post.get('tags', [])
                if tag_ids:
                    tags = self.fetch_tags(tag_ids)
                    post['tag_names'] = [tag.get('name', '') for tag in tags]
                else:
                    post['tag_names'] = []

                # Count words
                content_html = post.get('content', {}).get('rendered', '')
                post['word_count'] = self.count_words(content_html)

                enriched_posts.append(post)
            except Exception as e:
                self.logger.error(f"Error enriching post {post.get('id')}: {e}")
                enriched_posts.append(post)

        return enriched_posts

    def format_markdown(self, posts: List[Dict[str, Any]]) -> str:
        """Format posts as markdown."""
        markdown_sections = []

        for post in posts:
            section = []

            # ID
            section.append(f"# ID: {post.get('id', 'N/A')}")
            section.append("")

            # Title
            title = post.get('title', {}).get('rendered', 'Untitled')
            section.append(f"## Title: {title}")
            section.append("")

            # Type
            section.append("## Type: blog_post")
            section.append("")

            # Author
            author = post.get('author_name', 'Unknown')
            section.append(f"## Author: {author}")
            section.append("")

            # Publish Date
            date = post.get('date', '')
            section.append(f"## Publish Date: {date}")
            section.append("")

            # Word Count
            word_count = post.get('word_count', 0)
            section.append(f"## Word Count: {word_count}")
            section.append("")

            # Categories
            categories = ', '.join(post.get('category_names', []))
            section.append(f"## Categories: {categories if categories else 'None'}")
            section.append("")

            # Tags
            tags = ', '.join(post.get('tag_names', []))
            section.append(f"## Tags: {tags if tags else 'None'}")
            section.append("")

            # Permalink
            link = post.get('link', '')
            section.append(f"## Permalink: {link}")
            section.append("")

            # Description/Content
            section.append("## Description:")
            content_html = post.get('content', {}).get('rendered', '')
            if content_html:
                content_md = self.convert_to_markdown(content_html)
                section.append(content_md)
            else:
                excerpt_html = post.get('excerpt', {}).get('rendered', '')
                if excerpt_html:
                    excerpt_md = self.convert_to_markdown(excerpt_html)
                    section.append(excerpt_md)
            section.append("")

            # Separator
            section.append("-" * 50)
            section.append("")

            markdown_sections.append('\n'.join(section))

        return '\n'.join(markdown_sections)

    def get_incremental_items(self, items: List[Dict[str, Any]],
                              state: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Get only new posts since last sync."""
        if not state:
            # No previous state, return all items
            return items

        last_post_id = state.get('last_post_id')
        last_post_date = state.get('last_post_date')

        if not last_post_id:
            return items

        # Filter for posts newer than the last synced post
        new_items = []
        for item in items:
            post_id = item.get('id')
            post_date = item.get('date')

            # Check if this is a new post; guard against a missing date in the
            # saved state so the comparison cannot raise a TypeError
            if post_id > last_post_id or (
                post_date and last_post_date and post_date > last_post_date
            ):
                new_items.append(item)

        return new_items

    def update_state(self, state: Dict[str, Any],
                     items: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Update state with latest post information."""
        if not items:
            return state

        # Sort by ID to get the latest
        sorted_items = sorted(items, key=lambda x: x.get('id', 0), reverse=True)
        latest_item = sorted_items[0]

        state['last_post_id'] = latest_item.get('id')
        state['last_post_date'] = latest_item.get('date')
        state['last_sync'] = datetime.now(self.tz).isoformat()
        state['post_count'] = len(items)

        return state
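
# Example usage (a minimal sketch, not part of the scraper itself). It assumes
# ScraperConfig can be constructed without required arguments and that
# BaseScraper supplies make_request, convert_to_markdown, logger, and tz;
# adjust to the real constructor and environment (WORDPRESS_URL,
# WORDPRESS_USERNAME, WORDPRESS_API_KEY must be set).
#
#     config = ScraperConfig()                      # hypothetical construction
#     scraper = WordPressScraper(config)
#     posts = scraper.fetch_content(max_items=10)   # fetch and enrich 10 posts
#     print(scraper.format_markdown(posts))         # render as markdown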