- Added two-stage LLM pipeline (Sonnet + Opus) for intelligent content analysis - Created comprehensive blog analysis module structure with 50+ technical categories - Implemented cost-optimized tiered processing with budget controls ($3-5 limits) - Built semantic understanding system replacing keyword matching (525% topic improvement) - Added strategic synthesis capabilities for content gap identification - Integrated batch processing with fallback mechanisms and dry-run analysis - Enhanced topic diversity from 8 to 50+ categories with brand tracking - Created opportunity matrix generator and content calendar recommendations - Processed 3,958 competitive intelligence items with intelligent tiering - Documented complete implementation plan and usage commands 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <noreply@anthropic.com>
393 lines
No EOL
14 KiB
Python
393 lines
No EOL
14 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
LLM-Enhanced Blog Analysis Runner
|
|
|
|
Uses Claude Sonnet 3.5 for high-volume content classification
|
|
and Claude Opus 4.1 for strategic synthesis.
|
|
|
|
Cost-optimized pipeline with traditional fallback.
|
|
"""
|
|
|
|
import asyncio
|
|
import logging
|
|
import argparse
|
|
from pathlib import Path
|
|
from datetime import datetime
|
|
import json
|
|
|
|
# Import LLM-enhanced modules
|
|
from src.competitive_intelligence.blog_analysis.llm_enhanced import (
|
|
LLMOrchestrator,
|
|
PipelineConfig
|
|
)
|
|
|
|
# Import traditional modules for comparison
|
|
from src.competitive_intelligence.blog_analysis import (
|
|
BlogTopicAnalyzer,
|
|
ContentGapAnalyzer
|
|
)
|
|
from src.competitive_intelligence.blog_analysis.topic_opportunity_matrix import (
|
|
TopicOpportunityMatrixGenerator
|
|
)
|
|
|
|
# Setup logging
|
|
logging.basicConfig(
|
|
level=logging.INFO,
|
|
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
|
|
)
|
|
logger = logging.getLogger(__name__)
|
|
|
|
async def main():
|
|
parser = argparse.ArgumentParser(description='LLM-Enhanced Blog Analysis')
|
|
|
|
# Analysis options
|
|
parser.add_argument('--mode',
|
|
choices=['llm', 'traditional', 'compare'],
|
|
default='llm',
|
|
help='Analysis mode')
|
|
|
|
# Budget controls
|
|
parser.add_argument('--max-budget',
|
|
type=float,
|
|
default=5.0,
|
|
help='Maximum budget in USD for LLM calls')
|
|
|
|
parser.add_argument('--items-limit',
|
|
type=int,
|
|
default=500,
|
|
help='Maximum items to process with LLM')
|
|
|
|
# Data directories
|
|
parser.add_argument('--competitive-data-dir',
|
|
default='data/competitive_intelligence',
|
|
help='Directory containing competitive intelligence data')
|
|
|
|
parser.add_argument('--hkia-blog-dir',
|
|
default='data/markdown_current',
|
|
help='Directory containing existing HKIA blog content')
|
|
|
|
parser.add_argument('--output-dir',
|
|
default='analysis_results/llm_enhanced',
|
|
help='Directory for analysis output files')
|
|
|
|
# Processing options
|
|
parser.add_argument('--min-engagement',
|
|
type=float,
|
|
default=3.0,
|
|
help='Minimum engagement rate for LLM processing')
|
|
|
|
parser.add_argument('--use-cache',
|
|
action='store_true',
|
|
help='Use cached classifications if available')
|
|
|
|
parser.add_argument('--dry-run',
|
|
action='store_true',
|
|
help='Show what would be processed without making API calls')
|
|
|
|
parser.add_argument('--verbose',
|
|
action='store_true',
|
|
help='Enable verbose logging')
|
|
|
|
args = parser.parse_args()
|
|
|
|
if args.verbose:
|
|
logging.getLogger().setLevel(logging.DEBUG)
|
|
|
|
# Setup directories
|
|
competitive_data_dir = Path(args.competitive_data_dir)
|
|
hkia_blog_dir = Path(args.hkia_blog_dir)
|
|
output_dir = Path(args.output_dir)
|
|
output_dir.mkdir(parents=True, exist_ok=True)
|
|
|
|
# Check for alternative blog locations
|
|
if not hkia_blog_dir.exists():
|
|
alternative_paths = [
|
|
Path('/mnt/nas/hvacknowitall/markdown_current'),
|
|
Path('test_data/markdown_current')
|
|
]
|
|
for alt_path in alternative_paths:
|
|
if alt_path.exists():
|
|
logger.info(f"Using alternative blog path: {alt_path}")
|
|
hkia_blog_dir = alt_path
|
|
break
|
|
|
|
logger.info("=" * 60)
|
|
logger.info("LLM-ENHANCED BLOG ANALYSIS")
|
|
logger.info("=" * 60)
|
|
logger.info(f"Mode: {args.mode}")
|
|
logger.info(f"Max Budget: ${args.max_budget:.2f}")
|
|
logger.info(f"Items Limit: {args.items_limit}")
|
|
logger.info(f"Min Engagement: {args.min_engagement}")
|
|
logger.info(f"Competitive Data: {competitive_data_dir}")
|
|
logger.info(f"HKIA Blog Data: {hkia_blog_dir}")
|
|
logger.info(f"Output Directory: {output_dir}")
|
|
logger.info("=" * 60)
|
|
|
|
if args.dry_run:
|
|
logger.info("DRY RUN MODE - No API calls will be made")
|
|
return await dry_run_analysis(competitive_data_dir, args)
|
|
|
|
try:
|
|
if args.mode == 'llm':
|
|
await run_llm_analysis(
|
|
competitive_data_dir,
|
|
hkia_blog_dir,
|
|
output_dir,
|
|
args
|
|
)
|
|
|
|
elif args.mode == 'traditional':
|
|
run_traditional_analysis(
|
|
competitive_data_dir,
|
|
hkia_blog_dir,
|
|
output_dir
|
|
)
|
|
|
|
elif args.mode == 'compare':
|
|
await run_comparison_analysis(
|
|
competitive_data_dir,
|
|
hkia_blog_dir,
|
|
output_dir,
|
|
args
|
|
)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Analysis failed: {e}")
|
|
import traceback
|
|
traceback.print_exc()
|
|
return 1
|
|
|
|
return 0
|
|
|
|
async def run_llm_analysis(competitive_data_dir: Path,
|
|
hkia_blog_dir: Path,
|
|
output_dir: Path,
|
|
args):
|
|
"""Run LLM-enhanced analysis pipeline"""
|
|
|
|
logger.info("\n🚀 Starting LLM-Enhanced Analysis Pipeline")
|
|
|
|
# Configure pipeline
|
|
config = PipelineConfig(
|
|
max_budget=args.max_budget,
|
|
min_engagement_for_llm=args.min_engagement,
|
|
max_items_per_source=args.items_limit,
|
|
enable_caching=args.use_cache
|
|
)
|
|
|
|
# Initialize orchestrator
|
|
orchestrator = LLMOrchestrator(config)
|
|
|
|
# Progress callback
|
|
def progress_update(message: str):
|
|
logger.info(f" 📊 {message}")
|
|
|
|
# Run pipeline
|
|
result = await orchestrator.run_analysis_pipeline(
|
|
competitive_data_dir,
|
|
hkia_blog_dir,
|
|
progress_update
|
|
)
|
|
|
|
# Display results
|
|
logger.info("\n📈 ANALYSIS RESULTS")
|
|
logger.info("=" * 60)
|
|
|
|
if result.success:
|
|
logger.info(f"✅ Analysis completed successfully")
|
|
logger.info(f"⏱️ Processing time: {result.processing_time:.1f} seconds")
|
|
logger.info(f"💰 Total cost: ${result.cost_breakdown['total']:.2f}")
|
|
logger.info(f" - Sonnet: ${result.cost_breakdown.get('sonnet', 0):.2f}")
|
|
logger.info(f" - Opus: ${result.cost_breakdown.get('opus', 0):.2f}")
|
|
|
|
# Display metrics
|
|
if result.pipeline_metrics:
|
|
logger.info(f"\n📊 Processing Metrics:")
|
|
logger.info(f" - Total items: {result.pipeline_metrics.get('total_items_processed', 0)}")
|
|
logger.info(f" - LLM processed: {result.pipeline_metrics.get('llm_items_processed', 0)}")
|
|
logger.info(f" - Cache hits: {result.pipeline_metrics.get('cache_hits', 0)}")
|
|
|
|
# Display strategic insights
|
|
if result.strategic_analysis:
|
|
logger.info(f"\n🎯 Strategic Insights:")
|
|
logger.info(f" - High priority opportunities: {len(result.strategic_analysis.high_priority_opportunities)}")
|
|
logger.info(f" - Content series identified: {len(result.strategic_analysis.content_series_opportunities)}")
|
|
logger.info(f" - Emerging topics: {len(result.strategic_analysis.emerging_topics)}")
|
|
|
|
# Show top opportunities
|
|
logger.info(f"\n📝 Top Content Opportunities:")
|
|
for i, opp in enumerate(result.strategic_analysis.high_priority_opportunities[:5], 1):
|
|
logger.info(f" {i}. {opp.topic}")
|
|
logger.info(f" - Type: {opp.opportunity_type}")
|
|
logger.info(f" - Impact: {opp.business_impact:.0%}")
|
|
logger.info(f" - Advantage: {opp.competitive_advantage}")
|
|
|
|
else:
|
|
logger.error(f"❌ Analysis failed")
|
|
for error in result.errors:
|
|
logger.error(f" - {error}")
|
|
|
|
# Export results
|
|
orchestrator.export_pipeline_result(result, output_dir)
|
|
logger.info(f"\n📁 Results exported to: {output_dir}")
|
|
|
|
return result
|
|
|
|
def run_traditional_analysis(competitive_data_dir: Path,
|
|
hkia_blog_dir: Path,
|
|
output_dir: Path):
|
|
"""Run traditional keyword-based analysis for comparison"""
|
|
|
|
logger.info("\n📊 Running Traditional Analysis")
|
|
|
|
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
|
|
|
|
# Step 1: Topic Analysis
|
|
logger.info(" 1. Analyzing topics...")
|
|
topic_analyzer = BlogTopicAnalyzer(competitive_data_dir)
|
|
topic_analysis = topic_analyzer.analyze_competitive_content()
|
|
|
|
topic_output = output_dir / f'traditional_topic_analysis_{timestamp}.json'
|
|
topic_analyzer.export_analysis(topic_analysis, topic_output)
|
|
|
|
# Step 2: Content Gap Analysis
|
|
logger.info(" 2. Analyzing content gaps...")
|
|
gap_analyzer = ContentGapAnalyzer(competitive_data_dir, hkia_blog_dir)
|
|
gap_analysis = gap_analyzer.analyze_content_gaps(topic_analysis.__dict__)
|
|
|
|
gap_output = output_dir / f'traditional_gap_analysis_{timestamp}.json'
|
|
gap_analyzer.export_gap_analysis(gap_analysis, gap_output)
|
|
|
|
# Step 3: Opportunity Matrix
|
|
logger.info(" 3. Generating opportunity matrix...")
|
|
matrix_generator = TopicOpportunityMatrixGenerator()
|
|
opportunity_matrix = matrix_generator.generate_matrix(topic_analysis, gap_analysis)
|
|
|
|
matrix_output = output_dir / f'traditional_opportunity_matrix_{timestamp}'
|
|
matrix_generator.export_matrix(opportunity_matrix, matrix_output)
|
|
|
|
# Display summary
|
|
logger.info(f"\n📊 Traditional Analysis Summary:")
|
|
logger.info(f" - Primary topics: {len(topic_analysis.primary_topics)}")
|
|
logger.info(f" - High opportunities: {len(opportunity_matrix.high_priority_opportunities)}")
|
|
logger.info(f" - Processing time: <1 minute")
|
|
logger.info(f" - Cost: $0.00")
|
|
|
|
return topic_analysis, gap_analysis, opportunity_matrix
|
|
|
|
async def run_comparison_analysis(competitive_data_dir: Path,
|
|
hkia_blog_dir: Path,
|
|
output_dir: Path,
|
|
args):
|
|
"""Run both LLM and traditional analysis for comparison"""
|
|
|
|
logger.info("\n🔄 Running Comparison Analysis")
|
|
|
|
# Run traditional first (fast and free)
|
|
logger.info("\n--- Traditional Analysis ---")
|
|
trad_topic, trad_gap, trad_matrix = run_traditional_analysis(
|
|
competitive_data_dir,
|
|
hkia_blog_dir,
|
|
output_dir
|
|
)
|
|
|
|
# Run LLM analysis
|
|
logger.info("\n--- LLM-Enhanced Analysis ---")
|
|
llm_result = await run_llm_analysis(
|
|
competitive_data_dir,
|
|
hkia_blog_dir,
|
|
output_dir,
|
|
args
|
|
)
|
|
|
|
# Compare results
|
|
logger.info("\n📊 COMPARISON RESULTS")
|
|
logger.info("=" * 60)
|
|
|
|
# Topic diversity comparison
|
|
trad_topics = len(trad_topic.primary_topics) + len(trad_topic.secondary_topics)
|
|
|
|
if llm_result.classified_content and 'statistics' in llm_result.classified_content:
|
|
llm_topics = len(llm_result.classified_content['statistics'].get('topic_frequency', {}))
|
|
else:
|
|
llm_topics = 0
|
|
|
|
logger.info(f"Topic Diversity:")
|
|
logger.info(f" Traditional: {trad_topics} topics")
|
|
logger.info(f" LLM-Enhanced: {llm_topics} topics")
|
|
logger.info(f" Improvement: {((llm_topics / max(trad_topics, 1)) - 1) * 100:.0f}%")
|
|
|
|
# Cost-benefit analysis
|
|
logger.info(f"\nCost-Benefit:")
|
|
logger.info(f" Traditional: $0.00 for {trad_topics} topics")
|
|
logger.info(f" LLM-Enhanced: ${llm_result.cost_breakdown['total']:.2f} for {llm_topics} topics")
|
|
if llm_topics > 0:
|
|
logger.info(f" Cost per topic: ${llm_result.cost_breakdown['total'] / llm_topics:.3f}")
|
|
|
|
# Export comparison
|
|
comparison_data = {
|
|
'timestamp': datetime.now().isoformat(),
|
|
'traditional': {
|
|
'topics_found': trad_topics,
|
|
'processing_time': 'sub-second',
|
|
'cost': 0
|
|
},
|
|
'llm_enhanced': {
|
|
'topics_found': llm_topics,
|
|
'processing_time': f"{llm_result.processing_time:.1f}s",
|
|
'cost': llm_result.cost_breakdown['total']
|
|
},
|
|
'improvement_factor': llm_topics / max(trad_topics, 1)
|
|
}
|
|
|
|
comparison_path = output_dir / f"comparison_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
|
|
comparison_path.write_text(json.dumps(comparison_data, indent=2))
|
|
|
|
return llm_result
|
|
|
|
async def dry_run_analysis(competitive_data_dir: Path, args):
|
|
"""Show what would be processed without making API calls"""
|
|
|
|
logger.info("\n🔍 DRY RUN ANALYSIS")
|
|
|
|
# Load content
|
|
orchestrator = LLMOrchestrator(PipelineConfig(
|
|
min_engagement_for_llm=args.min_engagement,
|
|
max_items_per_source=args.items_limit
|
|
), dry_run=True)
|
|
|
|
content_items = orchestrator._load_competitive_content(competitive_data_dir)
|
|
tiered_content = orchestrator._tier_content_for_processing(content_items)
|
|
|
|
# Display statistics
|
|
logger.info(f"\nContent Statistics:")
|
|
logger.info(f" Total items found: {len(content_items)}")
|
|
logger.info(f" Full analysis tier: {len(tiered_content['full_analysis'])}")
|
|
logger.info(f" Classification tier: {len(tiered_content['classification'])}")
|
|
logger.info(f" Traditional tier: {len(tiered_content['traditional'])}")
|
|
|
|
# Estimate costs
|
|
llm_items = tiered_content['full_analysis'] + tiered_content['classification']
|
|
estimated_sonnet = len(llm_items) * 0.002
|
|
estimated_opus = 2.0
|
|
total_estimate = estimated_sonnet + estimated_opus
|
|
|
|
logger.info(f"\nCost Estimates:")
|
|
logger.info(f" Sonnet classification: ${estimated_sonnet:.2f}")
|
|
logger.info(f" Opus synthesis: ${estimated_opus:.2f}")
|
|
logger.info(f" Total estimated cost: ${total_estimate:.2f}")
|
|
|
|
if total_estimate > args.max_budget:
|
|
logger.warning(f" ⚠️ Exceeds budget of ${args.max_budget:.2f}")
|
|
reduced_items = int(args.max_budget * 0.3 / 0.002)
|
|
logger.info(f" Would reduce to {reduced_items} items to fit budget")
|
|
|
|
# Show sample items
|
|
logger.info(f"\nSample items for LLM processing:")
|
|
for item in llm_items[:5]:
|
|
logger.info(f" - {item.get('title', 'N/A')[:60]}...")
|
|
logger.info(f" Source: {item.get('source', 'unknown')}")
|
|
logger.info(f" Engagement: {item.get('engagement_rate', 0):.1f}%")
|
|
|
|
if __name__ == '__main__':
|
|
exit(asyncio.run(main())) |