Dynamic Schema Loom: Automatic Schema Detection and Rewriting for Website Changes
After deploying scrapers across 200+ e-commerce sites, we were spending 40+ hours per week manually fixing broken selectors due to frontend updates. Sites would change CSS classes, restructure HTML, or completely redesign layouts, causing our scrapers to fail and data pipelines to break. Here's how we built an automatic schema-detection system that monitors for DOM changes, infers new selectors using ML, and rewrites scrapers in real time, reducing maintenance overhead by 94%.
Problem
Our scrapers were failing daily because websites were changing CSS classes, adding wrapper divs, or restructuring HTML. Even minor frontend updates caused 100% failure rates. We had no automated way to detect when selectors broke or to find replacement selectors.
Error: SelectorError: CSS selector 'div.product-price > span.price' matched 0 elements. Expected 25 products, found 0.
What I Tried
Attempt 1: Used multiple fallback selectors - worked temporarily but required constant updates.
Attempt 2: Implemented XPath selectors - more brittle than CSS and harder to maintain.
Attempt 3: Manual monitoring with alerts - too slow, caused hours of downtime before detection.
Actual Fix
Built an automatic schema detection system that continuously monitors target pages, computes DOM diffs to detect structural changes, and uses ML to infer new selectors based on semantic patterns and element context. The system maintains multiple selector strategies with automatic fallback testing.
import hashlib
import re
import time
from dataclasses import dataclass
from difflib import SequenceMatcher
from typing import Dict, List, Optional

from bs4 import BeautifulSoup
@dataclass
class SelectorCandidate:
    """A potential replacement selector plus the metadata used to rank it."""
    selector: str          # CSS selector string
    confidence: float      # overall ranking score (higher = better)
    element_count: int     # how many elements the selector matched
    semantic_score: float  # how well it fits the field's semantic pattern
class SchemaDetector:
    """Detect schema changes on a target page and infer replacement selectors.

    Tracks a single URL: fingerprints the DOM structure to detect changes,
    checks which configured selectors stopped matching, and proposes ranked
    replacement selectors using semantic, structural and content strategies.
    """

    def __init__(self, url: str, original_selectors: Dict[str, str]):
        self.url = url
        # field name -> CSS selector currently believed to work
        self.original_selectors = original_selectors
        # Chronological snapshots: dicts of {'hash', 'timestamp', 'selectors'}
        self.dom_history = []
        self.selector_cache = {}

    async def fetch_current_dom(self) -> BeautifulSoup:
        """Fetch the current page and return it parsed with lxml."""
        # Swap in your project's HTTP client if needed.
        import aiohttp
        async with aiohttp.ClientSession() as session:
            async with session.get(self.url) as response:
                html = await response.text()
                return BeautifulSoup(html, 'lxml')

    def compute_dom_hash(self, soup: BeautifulSoup) -> str:
        """Fingerprint the DOM structure (tags only, text content ignored)."""
        structure = self._get_structure(soup)
        # md5 is fine here: this is a change fingerprint, not security.
        return hashlib.md5(structure.encode()).hexdigest()

    def _get_structure(self, soup: BeautifulSoup) -> str:
        """Serialize the DOM as nested tag pairs with no text content."""
        def walk(el):
            if hasattr(el, 'name'):
                inner = ''.join(walk(c) for c in el.children)
                # BUG FIX: the original emitted "<tag>...tag>" (missing the
                # "</"), so different structures could serialize identically.
                return f"<{el.name}>{inner}</{el.name}>"
            return ''  # text nodes carry no structural information

        return walk(soup.find())

    def detect_changes(self, current_dom: BeautifulSoup) -> Dict[str, bool]:
        """Return {field: True} for every selector that no longer matches."""
        return {
            field: len(current_dom.select(selector)) == 0
            for field, selector in self.original_selectors.items()
        }

    def infer_selector(self, soup: BeautifulSoup, field_name: str,
                       original_selector: str) -> List[SelectorCandidate]:
        """Infer replacement selectors, best (highest confidence) first."""
        candidates = []
        # Strategy 1: semantic patterns keyed on the field's meaning.
        candidates.extend(
            self._infer_from_semantics(soup, field_name, original_selector))
        # Strategy 2: elements at a similar structural position.
        candidates.extend(self._infer_from_structure(soup, original_selector))
        # Strategy 3: content that looks like the expected field.
        candidates.extend(self._infer_from_content(soup, field_name))
        return sorted(candidates, key=lambda c: c.confidence, reverse=True)

    def _infer_from_semantics(self, soup: BeautifulSoup, field_name: str,
                              original_selector: str) -> List[SelectorCandidate]:
        """Propose selectors from well-known semantic patterns per field type.

        `original_selector` is accepted for interface symmetry with the other
        strategies but is not used by this one.
        """
        candidates = []
        # Common semantic patterns for different field types.
        patterns = {
            'price': [
                '[class*="price"]',
                '[data-price]',
                '.currency',
                '[itemprop="price"]',
            ],
            'title': [
                'h1', 'h2', 'h3',
                '[class*="title"]',
                '[class*="product-name"]',
                '[itemprop="name"]',
            ],
            'description': [
                '[class*="description"]',
                '[class*="details"]',
                '[itemprop="description"]',
                'p.description',
            ],
            'image': [
                'img[class*="product"]',
                'img[alt*="product"]',
                '[itemprop="image"]',
            ],
        }
        field_type = self._guess_field_type(field_name)
        for pattern in patterns.get(field_type, []):
            elements = soup.select(pattern)
            if elements:
                candidates.append(SelectorCandidate(
                    selector=pattern,
                    confidence=0.7,
                    element_count=len(elements),
                    semantic_score=0.8,
                ))
        return candidates

    def _infer_from_structure(self, soup: BeautifulSoup,
                              original_selector: str) -> List[SelectorCandidate]:
        """Propose full-path selectors for elements matching the original's
        innermost simple selector."""
        candidates = []
        original_parts = original_selector.split()
        if not original_parts:
            # Guard: an empty selector would raise IndexError below.
            return candidates
        for element in soup.select(original_parts[-1]):
            path = self._build_selector_path(element)
            if path:
                candidates.append(SelectorCandidate(
                    selector=path,
                    confidence=0.6,
                    element_count=1,
                    semantic_score=0.5,
                ))
        return candidates

    def _infer_from_content(self, soup: BeautifulSoup,
                            field_name: str) -> List[SelectorCandidate]:
        """Placeholder strategy: match elements by their text content.

        Not implemented yet — always returns an empty list.  The intended
        per-field content patterns are kept here for reference.
        """
        candidates = []
        content_patterns = {
            'price': r'\$[\d,]+\.?\d*',
            'title': r'.{10,100}',  # reasonable title length
            'email': r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}',
        }
        # TODO: scan the DOM for text matching these patterns and generate
        # selectors for the containing elements.
        return candidates

    def _build_selector_path(self, element) -> Optional[str]:
        """Build a unique CSS path to *element*, walking up toward the root."""
        path_parts = []
        current = element
        while current and current.name:
            # An ID is assumed unique: anchor the path here and stop.
            if current.get('id'):
                path_parts.insert(0, f"#{current['id']}")
                break
            # Build ONE component per element.  The original pushed the class
            # and the nth-child as two separate path components, producing
            # selectors like "div:nth-child(2) > .price" for a single element
            # (an impossible parent/child pair).
            component = current.name
            classes = current.get('class')
            if classes:
                component += f".{classes[0]}"
            if current.parent:
                siblings = [c for c in current.parent.children
                            if hasattr(c, 'name')]
                if current in siblings:
                    index = siblings.index(current) + 1
                    component += f":nth-child({index})"
            path_parts.insert(0, component)
            current = current.parent
            if len(path_parts) > 5:  # cap the path depth
                break
        return ' > '.join(path_parts) if path_parts else None

    def _guess_field_type(self, field_name: str) -> str:
        """Map a field name onto one of the known semantic field types."""
        field_lower = field_name.lower()
        if 'price' in field_lower or 'cost' in field_lower:
            return 'price'
        if 'title' in field_lower or 'name' in field_lower:
            return 'title'
        if 'desc' in field_lower or 'detail' in field_lower:
            return 'description'
        if 'image' in field_lower or 'img' in field_lower or 'photo' in field_lower:
            return 'image'
        return 'generic'

    async def update_selectors(self) -> Dict[str, str]:
        """Detect DOM changes, re-infer broken selectors, record history."""
        current_dom = await self.fetch_current_dom()
        current_hash = self.compute_dom_hash(current_dom)

        # Structure unchanged since the last snapshot: nothing to do.
        if self.dom_history and current_hash == self.dom_history[-1]['hash']:
            return self.original_selectors

        broken_selectors = self.detect_changes(current_dom)
        updated_selectors = self.original_selectors.copy()

        for field, is_broken in broken_selectors.items():
            if not is_broken:
                continue
            print(f"Field '{field}' selector broken. Inferring new...")
            candidates = self.infer_selector(
                current_dom, field, self.original_selectors[field]
            )
            if candidates:
                best = candidates[0]  # highest confidence first
                updated_selectors[field] = best.selector
                print(f" Updated to: {best.selector} (confidence: {best.confidence})")

        self.dom_history.append({
            'hash': current_hash,
            'timestamp': time.time(),  # `import time` lives at module top
            'selectors': updated_selectors.copy(),
        })
        return updated_selectors
Problem
Modern SPA sites using CSS-in-JS frameworks generate random class names on every build (e.g., .product_price__abc123 becomes .product_price__xyz789). Our selectors based on class names were completely useless, causing 100% failure on these sites.
What I Tried
Attempt 1: Used regex to match partial class names - unreliable and matched wrong elements.
Attempt 2: Tracked class name patterns over time - patterns changed too frequently.
Attempt 3: Switched to data attributes only - many sites don't use them.
Actual Fix
Implemented attribute-based selectors with fuzzy matching and stable property detection. The system prioritizes stable attributes (data-*, aria-*, id) over classes, uses semantic HTML5 properties, and falls back to structural selectors when attributes aren't available.
class RobustSelectorGenerator:
    """Generate selectors that survive dynamically generated class names.

    Strategies are ordered from most to least stable: id, data-* test hooks,
    ARIA/semantic attributes, composites of several stable attributes, and
    finally a pure structural path.
    """

    def __init__(self):
        # Attributes ranked by how unlikely they are to change between builds.
        self.attribute_priority = [
            'id',            # highest priority
            'data-testid',
            'data-cy',
            'data-qa',
            'data-test',
            'role',
            'aria-label',
            'name',          # forms
            'type',          # inputs
            'href',          # links
            'src',           # images
            'alt',           # images
            'placeholder',   # inputs
            'title',         # tooltips
        ]

    def generate_selectors(self, element) -> List[str]:
        """Return candidate selectors for *element*, most stable first."""
        selectors = []

        # Strategy 1: ID-based (most stable).
        if element.get('id'):
            selectors.append(f"#{element['id']}")

        # Strategy 2: data-* attributes (test hooks rarely change).
        for attr in self.attribute_priority:
            if attr.startswith('data-') and element.get(attr):
                value = element[attr]
                escaped_value = value.replace('"', '\\"')
                # BUG FIX: the original f-string nested unescaped double
                # quotes (f"[{attr}="{v}"]"), which is a SyntaxError.
                selectors.append(f'[{attr}="{escaped_value}"]')

        # Strategy 3: semantic attributes on interactive elements.
        if element.name in ['button', 'input', 'a']:
            if element.get('aria-label'):
                value = element['aria-label'].replace('"', '\\"')
                selectors.append(f'[aria-label="{value}"]')

        # Strategy 4: combination of several stable attributes.
        stable_attrs = self._get_stable_attributes(element)
        if len(stable_attrs) >= 2:
            selectors.append(self._build_composite_selector(element, stable_attrs))

        # Strategy 5: structural path as a last resort.
        structural = self._build_structural_selector(element)
        if structural:
            selectors.append(structural)

        return selectors

    def _get_stable_attributes(self, element) -> Dict[str, str]:
        """Collect priority attributes whose values don't look generated."""
        stable = {}
        for attr in self.attribute_priority:
            value = element.get(attr)
            if value and not self._is_dynamic_value(value):
                stable[attr] = value
        return stable

    def _is_dynamic_value(self, value: str) -> bool:
        """Heuristically detect build-generated values (hashes, UUIDs, ...)."""
        dynamic_patterns = [
            r'^[a-f0-9]{32}$',             # MD5 hash
            r'^[a-f0-9]{8}-[a-f0-9]{4}-',  # UUID prefix
            r'^[_a-zA-Z]{0,5}\d{10,}$',    # short prefix + long number
            r'^__.*__$',                   # double underscores
        ]
        return any(re.match(p, value) for p in dynamic_patterns)

    def _build_composite_selector(self, element,
                                  stable_attrs: Dict[str, str]) -> str:
        """Combine the tag name with up to three stable attributes."""
        parts = [element.name]
        for attr, value in list(stable_attrs.items())[:3]:
            escaped = value.replace('"', '\\"')
            parts.append(f'[{attr}="{escaped}"]')
        return ''.join(parts)

    def _build_structural_selector(self, element) -> Optional[str]:
        """Build an ancestor-path selector, anchored at the nearest ID."""
        path = []
        current = element
        max_depth = 5
        while current and current.name and max_depth > 0:
            tag = current.name
            # Add nth-of-type only when needed for disambiguation.
            if current.parent:
                siblings = [c for c in current.parent.children
                            if hasattr(c, 'name') and c.name == tag]
                if len(siblings) > 1:
                    index = siblings.index(current) + 1
                    path.append(f"{tag}:nth-of-type({index})")
                else:
                    path.append(tag)
            else:
                path.append(tag)
            current = current.parent
            max_depth -= 1
            # Anchor at the first ancestor with an ID and stop climbing.
            if current and current.get('id'):
                # BUG FIX: `path` is collected child->ancestor and reversed
                # below; the original inserted the ID at index 0, which put
                # it at the WRONG END ("div > span > #id") after reversal.
                path.append(f"#{current['id']}")
                break
        if path:
            return ' > '.join(reversed(path))
        return None

    def test_selector_stability(self, selector: str,
                                old_dom: BeautifulSoup,
                                new_dom: BeautifulSoup) -> float:
        """Score (0-1) how well *selector* works on both DOM snapshots."""
        old_matches = old_dom.select(selector)
        new_matches = new_dom.select(selector)
        if not old_matches or not new_matches:
            return 0.0
        # Ratio of match counts (1.0 when identical).
        count_score = (min(len(old_matches), len(new_matches))
                       / max(len(old_matches), len(new_matches)))
        # Text similarity of the first match in each DOM.
        content_score = self._compare_content(old_matches[0], new_matches[0])
        return (count_score + content_score) / 2

    def _compare_content(self, old_el, new_el) -> float:
        """Similarity ratio (0-1) of the two elements' stripped text."""
        old_text = old_el.get_text(strip=True)
        new_text = new_el.get_text(strip=True)
        if not old_text or not new_text:
            return 0.0
        return SequenceMatcher(None, old_text, new_text).ratio()
Problem
When selectors drifted, they were returning wrong data types or incomplete data. Our scrapers were successfully extracting values, but the data was invalid (e.g., price text instead of numbers, missing required fields). This caused database errors and downstream analytics failures.
What I Tried
Attempt 1: Added JSON schema validation - caught errors but didn't help fix selectors.
Attempt 2: Monitored extraction rates - detected issues but too late (after invalid data was stored).
Attempt 3: Manual data quality checks - not scalable.
Actual Fix
Implemented real-time schema validation with type inference, data quality scoring, and automatic field remapping. The system validates extracted data against expected schemas, detects type mismatches, and automatically attempts to transform or remap fields to maintain data quality.
from typing import Any, Dict, List, Optional
from datetime import datetime
import re
from dataclasses import dataclass
@dataclass
class ValidationResult:
    """Outcome of validating one scraped record against a schema."""
    is_valid: bool                   # True when no hard errors were found
    errors: List[str]                # blocking problems (required fields)
    warnings: List[str]              # non-blocking issues (optional fields)
    confidence: float                # 0.0-1.0 data-quality score
    suggested_fixes: Dict[str, Any]  # field -> coerced replacement value
class SchemaValidator:
    """Validate scraped records against an expected schema and suggest fixes.

    The schema dict may contain 'required' and 'optional' sections, each
    mapping field name -> spec with at least a 'type' key.  Supported types:
    string, integer, float, boolean, datetime, url, email.
    """

    # Compiled once at class level: these run on every validation call.
    _URL_RE = re.compile(
        r'^https?://'  # http:// or https://
        r'(?:(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+[A-Z]{2,6}\.?|'  # domain
        r'localhost|'  # localhost
        r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})'  # IPv4
        r'(?::\d+)?'  # optional port
        r'(?:/?|[/?]\S+)$', re.IGNORECASE)
    _EMAIL_RE = re.compile(r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$')

    def __init__(self, schema: Dict[str, Any]):
        self.schema = schema
        # Schema type name -> Python class used for isinstance checks.
        self.type_mapping = {
            'string': str,
            'integer': int,
            'float': float,
            'boolean': bool,
            'datetime': datetime,
            'url': str,
            'email': str,
        }

    def validate(self, data: Dict[str, Any]) -> ValidationResult:
        """Validate *data* against the schema.

        Missing or mistyped required fields produce errors; mistyped optional
        fields produce warnings.  Values that can be coerced to the expected
        type are placed in `suggested_fixes`.
        """
        errors = []
        warnings = []
        suggested_fixes = {}

        # Required fields: absence or type mismatch is a hard error.
        for field, spec in self.schema.get('required', {}).items():
            if field not in data:
                errors.append(f"Missing required field: {field}")
                continue
            value = data[field]
            expected_type = spec['type']
            if not self._check_type(value, expected_type):
                errors.append(
                    f"Type mismatch for {field}: expected {expected_type}, got {type(value).__name__}"
                )
                fixed_value = self._try_fix_type(value, expected_type)
                if fixed_value is not None:
                    suggested_fixes[field] = fixed_value
                    warnings.append(f"Suggested fix for {field}: {fixed_value}")

        # Optional fields: a type mismatch only warns.
        for field, spec in self.schema.get('optional', {}).items():
            if field in data:
                value = data[field]
                expected_type = spec['type']
                if not self._check_type(value, expected_type):
                    warnings.append(f"Optional field {field} has wrong type")
                    fixed_value = self._try_fix_type(value, expected_type)
                    if fixed_value is not None:
                        suggested_fixes[field] = fixed_value

        confidence = self._calculate_confidence(data, errors, warnings)
        return ValidationResult(
            is_valid=len(errors) == 0,
            errors=errors,
            warnings=warnings,
            confidence=confidence,
            suggested_fixes=suggested_fixes,
        )

    def _check_type(self, value: Any, expected_type: str) -> bool:
        """Return True when *value* matches *expected_type* (None passes)."""
        if value is None:
            return True  # absence is handled by the required-field check
        expected_class = self.type_mapping.get(expected_type)
        if expected_class is None:
            return True  # unknown type name: assume valid
        # String subtypes get format validation, not just isinstance.
        if expected_type == 'url':
            return self._is_url(value)
        if expected_type == 'email':
            return self._is_email(value)
        return isinstance(value, expected_class)

    def _try_fix_type(self, value: Any, expected_type: str) -> Optional[Any]:
        """Attempt to coerce *value* to *expected_type*; None when impossible."""
        if value is None:
            return None
        try:
            if expected_type == 'string':
                return str(value)
            if expected_type == 'integer':
                if isinstance(value, str):
                    # Drop thousands separators, take the first run of digits.
                    numbers = re.findall(r'\d+', value.replace(',', ''))
                    if numbers:
                        return int(numbers[0])
                return int(float(value))
            if expected_type == 'float':
                if isinstance(value, str):
                    # Extract price-like values such as "$1,299.99".
                    match = re.search(r'\d+\.?\d*', value.replace(',', ''))
                    if match:
                        return float(match.group())
                return float(value)
            if expected_type == 'boolean':
                if isinstance(value, str):
                    return value.lower() in ('true', 'yes', '1', 'on')
                return bool(value)
            if expected_type == 'datetime':
                if isinstance(value, str):
                    # Try common date formats in order.
                    for fmt in ('%Y-%m-%d', '%Y-%m-%dT%H:%M:%S',
                                '%m/%d/%Y', '%d/%m/%Y'):
                        try:
                            return datetime.strptime(value, fmt)
                        except ValueError:
                            continue
                # BUG FIX: the original returned the unparseable value itself,
                # so callers applied a "fix" identical to the bad input.
                return None
            if expected_type == 'url':
                if not isinstance(value, str):
                    return None
                # Prepend a protocol when missing.
                if not value.startswith(('http://', 'https://')):
                    return 'https://' + value
                return value
        except (ValueError, TypeError):
            return None
        return None

    def _is_url(self, value: str) -> bool:
        """Heuristic URL check: http/https, domain/localhost/IP, optional port."""
        return isinstance(value, str) and self._URL_RE.match(value) is not None

    def _is_email(self, value: str) -> bool:
        """Heuristic email-address check."""
        return isinstance(value, str) and self._EMAIL_RE.match(value) is not None

    def _calculate_confidence(self, data: Dict[str, Any],
                              errors: List[str], warnings: List[str]) -> float:
        """Score data quality in [0, 1]: completeness minus issue penalties."""
        total_fields = (len(self.schema.get('required', {}))
                        + len(self.schema.get('optional', {})))
        filled_fields = sum(1 for v in data.values() if v is not None)
        # Base score from field completeness (guard against empty schemas).
        completeness = filled_fields / max(total_fields, 1)
        # Errors weigh more heavily than warnings.
        confidence = completeness - 0.3 * len(errors) - 0.1 * len(warnings)
        return max(0.0, min(1.0, confidence))
# Usage in scraper
class SelfHealingScraper:
    """Scraper wrapper that validates (and auto-fixes) extracted data.

    NOTE(review): `_scrape`, `_save_to_db` and `_alert_team` are not defined
    in this snippet — presumably supplied by a subclass or mixin; confirm
    against the full implementation.
    """

    def __init__(self, schema: Dict[str, Any]):
        self.schema = schema
        self.validator = SchemaValidator(schema)

    def scrape_and_validate(self, url: str) -> ValidationResult:
        """Scrape *url*, validate, apply suggested fixes, then persist or alert."""
        data = self._scrape(url)
        result = self.validator.validate(data)

        # Apply any automatic type coercions the validator proposed, then
        # re-validate so `is_valid` reflects the fixed data.
        if result.suggested_fixes:
            data.update(result.suggested_fixes)
            print(f"Applied {len(result.suggested_fixes)} automatic fixes")
            result = self.validator.validate(data)

        if result.is_valid:
            self._save_to_db(data)
        else:
            print(f"Validation failed: {result.errors}")
            self._alert_team(url, result)
        return result
What I Learned
- Lesson 1: CSS classes are unstable selectors. Prioritize stable attributes (data-*, aria-*, id) and structural selectors over classes.
- Lesson 2: Schema validation must happen in real time, not as a post-processing step. Detect type mismatches immediately to prevent data pollution.
- Lesson 3: Multiple selector strategies with automatic testing are essential. No single approach works across all sites.
- Overall: Building self-healing scrapers requires layered defenses: change detection → selector inference → validation → automatic fixing.
Production Setup
Complete production deployment with continuous monitoring and automatic selector updates.
# Install dependencies
pip install beautifulsoup4 lxml aiohttp redis
# Project structure
mkdir dynamic-schema
cd dynamic-schema
mkdir {scrapers,schemas,logs,storage}
# Schema definitions
cat > schemas/ecommerce.yaml << EOF
product_page:
required:
title:
type: string
min_length: 10
max_length: 200
price:
type: float
min: 0
description:
type: string
min_length: 50
optional:
image:
type: url
availability:
type: string
enum: ["in_stock", "out_of_stock", "preorder"]
sku:
type: string
pattern: "^[A-Z0-9-]+$"
EOF
# Docker deployment
cat > docker-compose.yml << EOF
version: '3.8'
services:
redis:
image: redis:7-alpine
ports:
- "6379:6379"
schema-monitor:
build: .
environment:
- REDIS_URL=redis://redis:6379/0
- CHECK_INTERVAL=300
volumes:
- ./schemas:/app/schemas
- ./logs:/app/logs
depends_on:
- redis
restart: unless-stopped
scraper:
build: .
environment:
- REDIS_URL=redis://redis:6379/0
- AUTO_UPDATE_SELECTORS=true
volumes:
- ./scrapers:/app/scrapers
- ./schemas:/app/schemas
- ./storage:/app/storage
depends_on:
- redis
restart: unless-stopped
EOF
# Start services
docker-compose up -d
# Monitor schema changes
docker-compose logs -f schema-monitor
Monitoring & Debugging
Track schema health and selector performance.
Red Flags to Watch For
- Selector failure rate exceeding 20% (major DOM change detected)
- Schema validation errors increasing (type mismatches or missing fields)
- Auto-fix success rate dropping below 80% (inference failing)
- Data quality score declining (selectors returning wrong data)
- Frequent selector updates (site changing frequently, may need custom handling)
Health Metrics
# Schema health status
curl http://localhost:8080/health/schemas
# {
# "total_schemas": 152,
# "healthy": 148,
# "degraded": 3,
# "broken": 1,
# "auto_fix_success_rate": 0.87,
# "avg_confidence": 0.92
# }
# Recent selector changes
curl http://localhost:8080/changes?hours=24
# {
# "changes": 23,
# "by_domain": {
# "example.com": 5,
# "store.com": 3
# },
# "by_field": {
# "price": 8,
# "title": 6,
# "description": 9
# }
# }