#!/usr/bin/env python3
"""
LLM-Enhanced Blog Analysis Runner

Uses Claude Sonnet 3.5 for high-volume content classification and
Claude Opus 4.1 for strategic synthesis.
Cost-optimized pipeline with traditional fallback.
"""

import argparse
import asyncio
import json
import logging
import sys
import traceback
from datetime import datetime
from pathlib import Path

# Import LLM-enhanced modules
from src.competitive_intelligence.blog_analysis.llm_enhanced import (
    LLMOrchestrator,
    PipelineConfig,
)

# Import traditional modules for comparison
from src.competitive_intelligence.blog_analysis import (
    BlogTopicAnalyzer,
    ContentGapAnalyzer,
)
from src.competitive_intelligence.blog_analysis.topic_opportunity_matrix import (
    TopicOpportunityMatrixGenerator,
)

# Setup logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)


async def main() -> int:
    """Parse CLI arguments and dispatch to the selected analysis mode.

    Returns:
        Process exit status: 0 on success, 1 if the analysis raised.
    """
    parser = argparse.ArgumentParser(description='LLM-Enhanced Blog Analysis')

    # Analysis options
    parser.add_argument('--mode', choices=['llm', 'traditional', 'compare'],
                        default='llm',
                        help='Analysis mode')

    # Budget controls
    parser.add_argument('--max-budget', type=float, default=5.0,
                        help='Maximum budget in USD for LLM calls')
    parser.add_argument('--items-limit', type=int, default=500,
                        help='Maximum items to process with LLM')

    # Data directories
    parser.add_argument('--competitive-data-dir',
                        default='data/competitive_intelligence',
                        help='Directory containing competitive intelligence data')
    parser.add_argument('--hkia-blog-dir',
                        default='data/markdown_current',
                        help='Directory containing existing HKIA blog content')
    parser.add_argument('--output-dir',
                        default='analysis_results/llm_enhanced',
                        help='Directory for analysis output files')

    # Processing options
    parser.add_argument('--min-engagement', type=float, default=3.0,
                        help='Minimum engagement rate for LLM processing')
    parser.add_argument('--use-cache', action='store_true',
                        help='Use cached classifications if available')
    parser.add_argument('--dry-run', action='store_true',
                        help='Show what would be processed without making API calls')
    parser.add_argument('--verbose', action='store_true',
                        help='Enable verbose logging')

    args = parser.parse_args()

    if args.verbose:
        logging.getLogger().setLevel(logging.DEBUG)

    # Setup directories
    competitive_data_dir = Path(args.competitive_data_dir)
    hkia_blog_dir = Path(args.hkia_blog_dir)
    output_dir = Path(args.output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)

    # Check for alternative blog locations when the configured one is missing
    if not hkia_blog_dir.exists():
        alternative_paths = [
            Path('/mnt/nas/hvacknowitall/markdown_current'),
            Path('test_data/markdown_current'),
        ]
        for alt_path in alternative_paths:
            if alt_path.exists():
                logger.info(f"Using alternative blog path: {alt_path}")
                hkia_blog_dir = alt_path
                break

    logger.info("=" * 60)
    logger.info("LLM-ENHANCED BLOG ANALYSIS")
    logger.info("=" * 60)
    logger.info(f"Mode: {args.mode}")
    logger.info(f"Max Budget: ${args.max_budget:.2f}")
    logger.info(f"Items Limit: {args.items_limit}")
    logger.info(f"Min Engagement: {args.min_engagement}")
    logger.info(f"Competitive Data: {competitive_data_dir}")
    logger.info(f"HKIA Blog Data: {hkia_blog_dir}")
    logger.info(f"Output Directory: {output_dir}")
    logger.info("=" * 60)

    if args.dry_run:
        logger.info("DRY RUN MODE - No API calls will be made")
        await dry_run_analysis(competitive_data_dir, args)
        return 0  # dry run never fails the process

    try:
        if args.mode == 'llm':
            await run_llm_analysis(
                competitive_data_dir, hkia_blog_dir, output_dir, args
            )
        elif args.mode == 'traditional':
            run_traditional_analysis(
                competitive_data_dir, hkia_blog_dir, output_dir
            )
        elif args.mode == 'compare':
            await run_comparison_analysis(
                competitive_data_dir, hkia_blog_dir, output_dir, args
            )
    except Exception as e:
        logger.error(f"Analysis failed: {e}")
        traceback.print_exc()
        return 1

    return 0


async def run_llm_analysis(competitive_data_dir: Path, hkia_blog_dir: Path,
                           output_dir: Path, args):
    """Run LLM-enhanced analysis pipeline.

    Configures and runs the LLMOrchestrator pipeline, logs a results
    summary, and exports the pipeline result to ``output_dir``.

    Returns:
        The pipeline result object produced by the orchestrator.
    """
    logger.info("\nšŸš€ Starting LLM-Enhanced Analysis Pipeline")

    # Configure pipeline from CLI budget/limit options
    config = PipelineConfig(
        max_budget=args.max_budget,
        min_engagement_for_llm=args.min_engagement,
        max_items_per_source=args.items_limit,
        enable_caching=args.use_cache
    )

    # Initialize orchestrator
    orchestrator = LLMOrchestrator(config)

    # Progress callback relays pipeline status lines into our logger
    def progress_update(message: str):
        logger.info(f"  šŸ“Š {message}")

    # Run pipeline
    result = await orchestrator.run_analysis_pipeline(
        competitive_data_dir,
        hkia_blog_dir,
        progress_update
    )

    # Display results
    logger.info("\nšŸ“ˆ ANALYSIS RESULTS")
    logger.info("=" * 60)

    if result.success:
        logger.info("āœ… Analysis completed successfully")
        logger.info(f"ā±ļø Processing time: {result.processing_time:.1f} seconds")
        logger.info(f"šŸ’° Total cost: ${result.cost_breakdown['total']:.2f}")
        logger.info(f"   - Sonnet: ${result.cost_breakdown.get('sonnet', 0):.2f}")
        logger.info(f"   - Opus: ${result.cost_breakdown.get('opus', 0):.2f}")

        # Display metrics
        if result.pipeline_metrics:
            logger.info("\nšŸ“Š Processing Metrics:")
            logger.info(f"   - Total items: {result.pipeline_metrics.get('total_items_processed', 0)}")
            logger.info(f"   - LLM processed: {result.pipeline_metrics.get('llm_items_processed', 0)}")
            logger.info(f"   - Cache hits: {result.pipeline_metrics.get('cache_hits', 0)}")

        # Display strategic insights
        if result.strategic_analysis:
            logger.info("\nšŸŽÆ Strategic Insights:")
            logger.info(f"   - High priority opportunities: {len(result.strategic_analysis.high_priority_opportunities)}")
            logger.info(f"   - Content series identified: {len(result.strategic_analysis.content_series_opportunities)}")
            logger.info(f"   - Emerging topics: {len(result.strategic_analysis.emerging_topics)}")

            # Show top opportunities (capped at 5 for readable console output)
            logger.info("\nšŸ“ Top Content Opportunities:")
            for i, opp in enumerate(result.strategic_analysis.high_priority_opportunities[:5], 1):
                logger.info(f"  {i}. {opp.topic}")
                logger.info(f"     - Type: {opp.opportunity_type}")
                logger.info(f"     - Impact: {opp.business_impact:.0%}")
                logger.info(f"     - Advantage: {opp.competitive_advantage}")
    else:
        logger.error("āŒ Analysis failed")
        for error in result.errors:
            logger.error(f"   - {error}")

    # Export results
    orchestrator.export_pipeline_result(result, output_dir)
    logger.info(f"\nšŸ“ Results exported to: {output_dir}")

    return result


def run_traditional_analysis(competitive_data_dir: Path, hkia_blog_dir: Path,
                             output_dir: Path):
    """Run traditional keyword-based analysis for comparison.

    Executes the three-step non-LLM pipeline (topic analysis, content gap
    analysis, opportunity matrix), exporting each stage's output with a
    shared timestamp.

    Returns:
        Tuple of (topic_analysis, gap_analysis, opportunity_matrix).
    """
    logger.info("\nšŸ“Š Running Traditional Analysis")

    # One timestamp shared by all three export files so they group together
    timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')

    # Step 1: Topic Analysis
    logger.info("  1. Analyzing topics...")
    topic_analyzer = BlogTopicAnalyzer(competitive_data_dir)
    topic_analysis = topic_analyzer.analyze_competitive_content()
    topic_output = output_dir / f'traditional_topic_analysis_{timestamp}.json'
    topic_analyzer.export_analysis(topic_analysis, topic_output)

    # Step 2: Content Gap Analysis
    logger.info("  2. Analyzing content gaps...")
    gap_analyzer = ContentGapAnalyzer(competitive_data_dir, hkia_blog_dir)
    # NOTE(review): gap analyzer takes the topic analysis as a plain dict
    gap_analysis = gap_analyzer.analyze_content_gaps(topic_analysis.__dict__)
    gap_output = output_dir / f'traditional_gap_analysis_{timestamp}.json'
    gap_analyzer.export_gap_analysis(gap_analysis, gap_output)

    # Step 3: Opportunity Matrix
    logger.info("  3. Generating opportunity matrix...")
    matrix_generator = TopicOpportunityMatrixGenerator()
    opportunity_matrix = matrix_generator.generate_matrix(topic_analysis, gap_analysis)
    matrix_output = output_dir / f'traditional_opportunity_matrix_{timestamp}'
    matrix_generator.export_matrix(opportunity_matrix, matrix_output)

    # Display summary
    logger.info("\nšŸ“Š Traditional Analysis Summary:")
    logger.info(f"   - Primary topics: {len(topic_analysis.primary_topics)}")
    logger.info(f"   - High opportunities: {len(opportunity_matrix.high_priority_opportunities)}")
    logger.info("   - Processing time: <1 minute")
    logger.info("   - Cost: $0.00")

    return topic_analysis, gap_analysis, opportunity_matrix


async def run_comparison_analysis(competitive_data_dir: Path, hkia_blog_dir: Path,
                                  output_dir: Path, args):
    """Run both LLM and traditional analysis for comparison.

    Runs the free traditional pipeline first, then the LLM pipeline, logs
    a side-by-side topic-diversity and cost comparison, and writes a
    timestamped comparison JSON to ``output_dir``.

    Returns:
        The LLM pipeline result object.
    """
    logger.info("\nšŸ”„ Running Comparison Analysis")

    # Run traditional first (fast and free)
    logger.info("\n--- Traditional Analysis ---")
    trad_topic, trad_gap, trad_matrix = run_traditional_analysis(
        competitive_data_dir, hkia_blog_dir, output_dir
    )

    # Run LLM analysis
    logger.info("\n--- LLM-Enhanced Analysis ---")
    llm_result = await run_llm_analysis(
        competitive_data_dir, hkia_blog_dir, output_dir, args
    )

    # Compare results
    logger.info("\nšŸ“Š COMPARISON RESULTS")
    logger.info("=" * 60)

    # Topic diversity comparison
    trad_topics = len(trad_topic.primary_topics) + len(trad_topic.secondary_topics)
    if llm_result.classified_content and 'statistics' in llm_result.classified_content:
        llm_topics = len(llm_result.classified_content['statistics'].get('topic_frequency', {}))
    else:
        llm_topics = 0

    logger.info("Topic Diversity:")
    logger.info(f"  Traditional: {trad_topics} topics")
    logger.info(f"  LLM-Enhanced: {llm_topics} topics")
    # max(..., 1) guards against division by zero when no topics were found
    logger.info(f"  Improvement: {((llm_topics / max(trad_topics, 1)) - 1) * 100:.0f}%")

    # Cost-benefit analysis
    logger.info("\nCost-Benefit:")
    logger.info(f"  Traditional: $0.00 for {trad_topics} topics")
    logger.info(f"  LLM-Enhanced: ${llm_result.cost_breakdown['total']:.2f} for {llm_topics} topics")
    if llm_topics > 0:
        logger.info(f"  Cost per topic: ${llm_result.cost_breakdown['total'] / llm_topics:.3f}")

    # Export comparison
    comparison_data = {
        'timestamp': datetime.now().isoformat(),
        'traditional': {
            'topics_found': trad_topics,
            'processing_time': 'sub-second',
            'cost': 0
        },
        'llm_enhanced': {
            'topics_found': llm_topics,
            'processing_time': f"{llm_result.processing_time:.1f}s",
            'cost': llm_result.cost_breakdown['total']
        },
        'improvement_factor': llm_topics / max(trad_topics, 1)
    }
    comparison_path = output_dir / f"comparison_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
    comparison_path.write_text(json.dumps(comparison_data, indent=2))

    return llm_result


async def dry_run_analysis(competitive_data_dir: Path, args):
    """Show what would be processed without making API calls.

    Loads and tiers the competitive content exactly as the real pipeline
    would, then logs item counts, cost estimates, and a small sample of
    the items that would be sent to the LLM.
    """
    logger.info("\nšŸ” DRY RUN ANALYSIS")

    # Load content via the orchestrator's own loaders so the dry-run
    # tiering matches a real run
    orchestrator = LLMOrchestrator(PipelineConfig(
        min_engagement_for_llm=args.min_engagement,
        max_items_per_source=args.items_limit
    ), dry_run=True)

    content_items = orchestrator._load_competitive_content(competitive_data_dir)
    tiered_content = orchestrator._tier_content_for_processing(content_items)

    # Display statistics
    logger.info("\nContent Statistics:")
    logger.info(f"  Total items found: {len(content_items)}")
    logger.info(f"  Full analysis tier: {len(tiered_content['full_analysis'])}")
    logger.info(f"  Classification tier: {len(tiered_content['classification'])}")
    logger.info(f"  Traditional tier: {len(tiered_content['traditional'])}")

    # Estimate costs
    # NOTE(review): $0.002/item Sonnet and flat $2.00 Opus are rough
    # heuristics — confirm against current model pricing
    llm_items = tiered_content['full_analysis'] + tiered_content['classification']
    estimated_sonnet = len(llm_items) * 0.002
    estimated_opus = 2.0
    total_estimate = estimated_sonnet + estimated_opus

    logger.info("\nCost Estimates:")
    logger.info(f"  Sonnet classification: ${estimated_sonnet:.2f}")
    logger.info(f"  Opus synthesis: ${estimated_opus:.2f}")
    logger.info(f"  Total estimated cost: ${total_estimate:.2f}")

    if total_estimate > args.max_budget:
        logger.warning(f"  āš ļø Exceeds budget of ${args.max_budget:.2f}")
        # Reserve ~30% of the budget for classification when scaling down
        reduced_items = int(args.max_budget * 0.3 / 0.002)
        logger.info(f"  Would reduce to {reduced_items} items to fit budget")

    # Show sample items
    logger.info("\nSample items for LLM processing:")
    for item in llm_items[:5]:
        logger.info(f"  - {item.get('title', 'N/A')[:60]}...")
        logger.info(f"    Source: {item.get('source', 'unknown')}")
        logger.info(f"    Engagement: {item.get('engagement_rate', 0):.1f}%")


if __name__ == '__main__':
    sys.exit(asyncio.run(main()))