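"""Sync blog posts from an RSS feed into Directus.

Fetches the haveibeenflocked.com feed, mirrors its entries into the Directus
`blog` collection as external-link posts, and removes posts that have dropped
out of the feed. Intended to run as an AWS Lambda function or locally via main().
"""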
import json
import logging
import os
import re
from typing import List, Dict, Optional
from urllib.parse import urlparse

import feedparser
import requests
from dateutil import parser as date_parser

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class BlogScraper:
    """RSS feed scraper that ingests blog posts into Directus CMS"""

    def __init__(self):
        self.rss_url = "https://haveibeenflocked.com/feed.xml"
        self.directus_base_url = os.getenv("DIRECTUS_BASE_URL", "https://cms.deflock.me")
        self.directus_token = os.getenv("DIRECTUS_API_TOKEN")
        if not self.directus_token:
            raise ValueError("DIRECTUS_API_TOKEN environment variable is required")
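        # Assumption: the token is a static Directus access token whose role can
        # read, create, update, and delete items in the `blog` collection.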
        self.headers = {
            "Authorization": f"Bearer {self.directus_token}",
            "Content-Type": "application/json",
            "User-Agent": "deflock-blog-scraper/1.0"
        }
        # Extract host from RSS URL for filtering
        self.rss_host = urlparse(self.rss_url).netloc

    def fetch_rss_feed(self) -> Optional[feedparser.FeedParserDict]:
        """Fetch and parse the RSS feed. Returns None if the feed cannot be retrieved or parsed."""
        logger.info(f"Fetching RSS feed from {self.rss_url}")
        try:
            feed = feedparser.parse(self.rss_url)
            if feed.bozo:
                logger.warning(f"Feed parsing warning: {feed.bozo_exception}")
            # feedparser reports network and parse failures via `bozo` instead of
            # raising; treat a broken feed with no entries as a failed fetch so the
            # sync does not delete every existing post.
            if feed.bozo and not feed.entries:
                logger.error("RSS feed is unreadable and has no entries. Skipping sync to prevent data loss.")
                return None
            logger.info(f"Successfully parsed RSS feed with {len(feed.entries)} entries")
            return feed
        except Exception as e:
            logger.error(f"Error fetching RSS feed: {e}. Skipping sync to prevent data loss.")
            return None
    def get_existing_posts(self) -> List[Dict]:
        """Get all existing blog posts from Directus that have external URLs"""
        logger.info("Fetching existing blog posts from Directus")
        try:
            # Filter for posts that have an externalUrl (RSS-ingested posts)
            url = f"{self.directus_base_url}/items/blog"
            # Pass the filter as a JSON string; requests handles the URL encoding
            filter_obj = {
                "externalUrl": {
                    "_nnull": True  # not null
                }
            }
            params = {
                "filter": json.dumps(filter_obj),
                "limit": -1  # Get all records
            }
            response = requests.get(url, headers=self.headers, params=params, timeout=30)
            response.raise_for_status()
            data = response.json()
            posts = data.get("data", [])
            logger.info(f"Found {len(posts)} existing RSS-ingested posts")
            return posts
        except Exception as e:
            logger.error(f"Error fetching existing posts: {e}")
            raise
    def is_same_host_as_rss(self, url: str) -> bool:
        """Check if the given URL has the same host as the RSS feed"""
        try:
            url_host = urlparse(url).netloc
            return url_host == self.rss_host
        except Exception:
            return False
    def create_blog_post(self, post_data: Dict) -> Optional[Dict]:
        """Create a new blog post in Directus"""
        logger.info(f"Creating new blog post: {post_data['title']}")
        try:
            url = f"{self.directus_base_url}/items/blog"
            response = requests.post(url, headers=self.headers, json=post_data, timeout=30)
            if response.status_code >= 400:
                logger.error(f"HTTP {response.status_code} error response body: {response.text}")
            response.raise_for_status()
            created_post = response.json()
            logger.info(f"Successfully created blog post with ID: {created_post['data']['id']}")
            return created_post["data"]
        except requests.exceptions.HTTPError as e:
            logger.error(f"HTTP error creating blog post '{post_data['title']}': {e}")
            logger.error(f"Response content: {e.response.text if e.response is not None else 'No response available'}")
            raise
        except Exception as e:
            logger.error(f"Error creating blog post '{post_data['title']}': {e}")
            raise

    def update_blog_post(self, post_id: int, post_data: Dict) -> Optional[Dict]:
        """Update an existing blog post in Directus"""
        logger.info(f"Updating blog post ID {post_id}: {post_data['title']}")
        try:
            url = f"{self.directus_base_url}/items/blog/{post_id}"
            response = requests.patch(url, headers=self.headers, json=post_data, timeout=30)
            if response.status_code >= 400:
                logger.error(f"HTTP {response.status_code} error response body: {response.text}")
            response.raise_for_status()
            updated_post = response.json()
            logger.info(f"Successfully updated blog post ID: {post_id}")
            return updated_post["data"]
        except requests.exceptions.HTTPError as e:
            logger.error(f"HTTP error updating blog post ID {post_id}: {e}")
            logger.error(f"Response content: {e.response.text if e.response is not None else 'No response available'}")
            raise
        except Exception as e:
            logger.error(f"Error updating blog post ID {post_id}: {e}")
            raise
    def delete_blog_post(self, post_id: int) -> None:
        """Delete a blog post from Directus"""
        logger.info(f"Deleting blog post ID {post_id}")
        try:
            url = f"{self.directus_base_url}/items/blog/{post_id}"
            response = requests.delete(url, headers=self.headers, timeout=30)
            response.raise_for_status()
            logger.info(f"Successfully deleted blog post ID: {post_id}")
        except Exception as e:
            logger.error(f"Error deleting blog post ID {post_id}: {e}")
            raise
    def parse_feed_entry(self, entry) -> Dict:
        """Parse a feed entry into Directus blog post format"""
        # Parse the publication date
        pub_date = None
        if hasattr(entry, 'published'):
            try:
                pub_date = date_parser.parse(entry.published).isoformat()
            except Exception as e:
                logger.warning(f"Could not parse date {entry.published}: {e}")
        # Extract description from summary or content
        description = ""
        if hasattr(entry, 'summary'):
            description = entry.summary
        elif hasattr(entry, 'content') and entry.content:
            # Take the first content item's value
            description = entry.content[0].value
        # Clean up the description (remove HTML tags if present)
        # For production, you might want to use a proper HTML parser like BeautifulSoup
        description = re.sub(r'<[^>]+>', '', description)
        description = description.strip()
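        # Field names below are assumed to match the Directus `blog` collection
        # schema (title, description, externalUrl, content, published).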
        post_data = {
            "title": entry.title,
            "description": description,
            "externalUrl": entry.link,
            "content": None,  # RSS posts don't have content, just external links
        }
        if pub_date:
            post_data["published"] = pub_date
        return post_data
    def sync_rss_posts(self) -> Dict[str, int]:
        """Main synchronization logic - ensures RSS feed matches Directus"""
        logger.info("Starting RSS to Directus synchronization")
        # Fetch RSS feed
        feed = self.fetch_rss_feed()
        # If feed fetch failed, return early to prevent data loss
        if feed is None:
            logger.warning("Skipping synchronization due to RSS feed fetch failure")
            return {
                "created": 0,
                "updated": 0,
                "deleted": 0,
                "errors": 1
            }
        # Get existing posts from Directus
        all_existing_posts = self.get_existing_posts()
        # Filter existing posts to only include those from the same host as the RSS feed
        existing_posts = [post for post in all_existing_posts
                          if post.get("externalUrl") and self.is_same_host_as_rss(post["externalUrl"])]
        logger.info(f"Found {len(existing_posts)} existing posts from RSS host {self.rss_host}")
        # Create lookup by external URL
        existing_by_url = {post["externalUrl"]: post for post in existing_posts}
        stats = {
            "created": 0,
            "updated": 0,
            "deleted": 0,
            "errors": 0
        }
        # Track URLs from RSS feed
        rss_urls = set()
        # Process each RSS entry
        for entry in feed.entries:
            try:
                post_data = self.parse_feed_entry(entry)
                url = post_data["externalUrl"]
                rss_urls.add(url)
                if url in existing_by_url:
                    # Update existing post if needed
                    existing_post = existing_by_url[url]
                    # Check if update is needed (compare title and description)
                    needs_update = (
                        existing_post["title"] != post_data["title"] or
                        existing_post["description"] != post_data["description"]
                    )
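                    # Note: only title and description changes are detected here; a
                    # change to the published date alone will not trigger an update.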
                    if needs_update:
                        self.update_blog_post(existing_post["id"], post_data)
                        stats["updated"] += 1
                else:
                    # Create new post
                    self.create_blog_post(post_data)
                    stats["created"] += 1
            except Exception as e:
                logger.error(f"Error processing RSS entry {entry.link}: {e}")
                stats["errors"] += 1
        # Delete posts that are no longer in RSS feed
        for existing_post in existing_posts:
            if existing_post["externalUrl"] not in rss_urls:
                try:
                    self.delete_blog_post(existing_post["id"])
                    stats["deleted"] += 1
                except Exception as e:
                    logger.error(f"Error deleting post {existing_post['id']}: {e}")
                    stats["errors"] += 1
        logger.info(f"Synchronization complete. Stats: {stats}")
        return stats


def lambda_handler(event, context):
    """AWS Lambda handler function"""
    try:
        scraper = BlogScraper()
        stats = scraper.sync_rss_posts()
        return {
            'statusCode': 200,
            'body': {
                'message': 'RSS synchronization completed successfully',
                'stats': stats
            }
        }
    except Exception as e:
        logger.error(f"Lambda execution failed: {e}")
        return {
            'statusCode': 500,
            'body': {
                'message': 'RSS synchronization failed',
                'error': str(e)
            }
        }


def main():
    """Main function for local testing"""
    try:
        scraper = BlogScraper()
        stats = scraper.sync_rss_posts()
        print(f"Synchronization completed with stats: {stats}")
    except Exception as e:
        print(f"Error: {e}")


if __name__ == "__main__":
    main()