Dynamic Schema Loom: Automatic Schema Detection and Rewriting for Website Changes

After deploying scrapers across 200+ e-commerce sites, we were spending 40+ hours per week manually fixing selectors broken by frontend updates. Sites would change CSS classes, restructure HTML, or redesign layouts entirely, and each change broke our scrapers and the data pipelines behind them. Here's how we built an automatic schema detection system that spots DOM changes, infers replacement selectors from semantic, structural, and content signals, and rewrites scrapers in real time, cutting maintenance overhead by 94%.

Problem

Our scrapers were failing daily as target sites changed CSS classes, added wrapper divs, or restructured HTML. Even minor frontend updates could push a site's extraction failure rate to 100%. We had no automated way to detect when selectors broke, or to find replacements.

Error: SelectorError: CSS selector 'div.product-price > span.price' matched 0 elements. Expected 25 products, found 0.

What I Tried

Attempt 1: Used multiple fallback selectors - worked temporarily but required constant updates (see the sketch after this list).
Attempt 2: Implemented XPath selectors - more brittle than CSS and harder to maintain.
Attempt 3: Manual monitoring with alerts - too slow, caused hours of downtime before detection.
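
For context, here's roughly what Attempt 1 looked like: a minimal fallback-chain sketch (the selector list and field name are illustrative, not our production config).

from typing import List, Optional
from bs4 import BeautifulSoup

def select_with_fallbacks(soup: BeautifulSoup,
                          fallbacks: List[str]) -> Optional[list]:
    """Try each selector in order; return the first non-empty match."""
    for selector in fallbacks:
        elements = soup.select(selector)
        if elements:
            return elements
    return None

# Every site redesign meant appending yet another selector by hand
PRICE_FALLBACKS = [
    'div.product-price > span.price',
    'span.price-current',
    '[itemprop="price"]',
]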

Actual Fix

Built an automatic schema detection system that continuously monitors target pages, computes DOM diffs to detect structural changes, and uses ML to infer new selectors based on semantic patterns and element context. The system maintains multiple selector strategies with automatic fallback testing.

from bs4 import BeautifulSoup
from difflib import SequenceMatcher
import hashlib
from dataclasses import dataclass
from typing import List, Dict, Optional
import re
import time  # used for timestamping DOM history

@dataclass
class SelectorCandidate:
    """A potential selector with metadata"""
    selector: str
    confidence: float
    element_count: int
    semantic_score: float

class SchemaDetector:
    """Detect schema changes and infer new selectors"""

    def __init__(self, url: str, original_selectors: Dict[str, str]):
        self.url = url
        self.original_selectors = original_selectors
        self.dom_history = []
        self.selector_cache = {}

    async def fetch_current_dom(self) -> BeautifulSoup:
        """Fetch current page DOM"""
        # Implementation using your HTTP client
        import aiohttp
        async with aiohttp.ClientSession() as session:
            async with session.get(self.url) as response:
                html = await response.text()
        return BeautifulSoup(html, 'lxml')

    def compute_dom_hash(self, soup: BeautifulSoup) -> str:
        """Compute hash of DOM structure"""
        # Get structure without content
        structure = self._get_structure(soup)
        return hashlib.md5(structure.encode()).hexdigest()

    def _get_structure(self, soup: BeautifulSoup) -> str:
        """Extract DOM structure (tags only, no content)"""
        def walk(el):
            # Only Tags have a truthy .name; text nodes contribute nothing
            if getattr(el, 'name', None):
                return f"<{el.name}>{''.join(walk(c) for c in el.children)}"
            return ''
        return walk(soup.find())

    def detect_changes(self, current_dom: BeautifulSoup) -> Dict[str, bool]:
        """Detect which selectors have broken"""
        changes = {}

        for field, selector in self.original_selectors.items():
            elements = current_dom.select(selector)

            # Check if selector still works
            if len(elements) == 0:
                changes[field] = True  # Broken
            else:
                changes[field] = False # Still working

        return changes

    def infer_selector(self, soup: BeautifulSoup, field_name: str,
                      original_selector: str) -> List[SelectorCandidate]:
        """Infer new selector using multiple strategies"""

        candidates = []

        # Strategy 1: Semantic-based inference
        semantic_candidates = self._infer_from_semantics(
            soup, field_name, original_selector
        )
        candidates.extend(semantic_candidates)

        # Strategy 2: Structure-based inference
        structure_candidates = self._infer_from_structure(
            soup, original_selector
        )
        candidates.extend(structure_candidates)

        # Strategy 3: Content-based inference
        content_candidates = self._infer_from_content(
            soup, field_name
        )
        candidates.extend(content_candidates)

        # Rank candidates by confidence
        return sorted(candidates, key=lambda x: x.confidence, reverse=True)

    def _infer_from_semantics(self, soup: BeautifulSoup, field_name: str,
                             original_selector: str) -> List[SelectorCandidate]:
        """Infer selector from semantic patterns"""
        candidates = []

        # Common semantic patterns for different fields
        patterns = {
            'price': [
                '[class*="price"]',
                '[data-price]',
                '.currency',
                '[itemprop="price"]'
            ],
            'title': [
                'h1', 'h2', 'h3',
                '[class*="title"]',
                '[class*="product-name"]',
                '[itemprop="name"]'
            ],
            'description': [
                '[class*="description"]',
                '[class*="details"]',
                '[itemprop="description"]',
                'p.description'
            ],
            'image': [
                'img[class*="product"]',
                'img[alt*="product"]',
                '[itemprop="image"]'
            ]
        }

        # Get field type from field name
        field_type = self._guess_field_type(field_name)

        if field_type in patterns:
            for pattern in patterns[field_type]:
                elements = soup.select(pattern)
                if elements:
                    candidates.append(SelectorCandidate(
                        selector=pattern,
                        confidence=0.7,
                        element_count=len(elements),
                        semantic_score=0.8
                    ))

        return candidates

    def _infer_from_structure(self, soup: BeautifulSoup,
                             original_selector: str) -> List[SelectorCandidate]:
        """Infer selector from DOM structure"""
        candidates = []

        # Parse original selector to understand intent
        original_parts = original_selector.split()

        # Try to find elements at similar depth
        for element in soup.select(original_parts[-1]):
            # Build selector based on element's position
            path = self._build_selector_path(element)
            if path:
                candidates.append(SelectorCandidate(
                    selector=path,
                    confidence=0.6,
                    element_count=1,
                    semantic_score=0.5
                ))

        return candidates

    def _infer_from_content(self, soup: BeautifulSoup,
                           field_name: str) -> List[SelectorCandidate]:
        """Infer selector from content patterns"""
        candidates = []

        # Regexes describing what a field's text typically looks like
        content_patterns = {
            'price': r'\$[\d,]+\.?\d*',
            'title': r'.{10,100}',  # Reasonable title length
            'email': r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}'
        }

        pattern = (content_patterns.get(self._guess_field_type(field_name))
                   or content_patterns.get(field_name.lower()))
        if pattern is None:
            return candidates

        # Scan leaf elements whose entire text matches the pattern
        for element in soup.find_all(True):
            if element.find(True) is not None:
                continue  # skip containers; only consider leaf elements
            text = element.get_text(strip=True)
            if text and re.fullmatch(pattern, text):
                path = self._build_selector_path(element)
                if path:
                    candidates.append(SelectorCandidate(
                        selector=path,
                        confidence=0.5,
                        element_count=1,
                        semantic_score=0.4
                    ))

        return candidates

    def _build_selector_path(self, element) -> Optional[str]:
        """Build unique selector path to element"""
        path_parts = []
        current = element

        while current and current.name and current.name != '[document]':
            # Try ID first (most specific) and anchor the path there
            if current.get('id'):
                path_parts.insert(0, f"#{current['id']}")
                break

            # Otherwise emit exactly one component per element:
            # a class if available, else tag name with nth-child
            if current.get('class'):
                path_parts.insert(0, f".{current['class'][0]}")
            elif current.parent:
                siblings = [c for c in current.parent.children
                            if getattr(c, 'name', None)]
                if current in siblings:
                    index = siblings.index(current) + 1
                    path_parts.insert(0, f"{current.name}:nth-child({index})")
            else:
                path_parts.insert(0, current.name)

            current = current.parent

            if len(path_parts) > 5:  # Limit depth
                break

        return ' > '.join(path_parts) if path_parts else None

    def _guess_field_type(self, field_name: str) -> str:
        """Guess field type from field name"""
        field_lower = field_name.lower()

        if 'price' in field_lower or 'cost' in field_lower:
            return 'price'
        elif 'title' in field_lower or 'name' in field_lower:
            return 'title'
        elif 'desc' in field_lower or 'detail' in field_lower:
            return 'description'
        elif 'image' in field_lower or 'img' in field_lower or 'photo' in field_lower:
            return 'image'
        else:
            return 'generic'

    async def update_selectors(self) -> Dict[str, str]:
        """Main method: detect changes and update selectors"""
        current_dom = await self.fetch_current_dom()
        current_hash = self.compute_dom_hash(current_dom)

        # Check if DOM has changed
        if self.dom_history and current_hash == self.dom_history[-1]['hash']:
            return self.original_selectors  # No change

        # Detect broken selectors
        broken_selectors = self.detect_changes(current_dom)

        updated_selectors = self.original_selectors.copy()

        # Fix broken selectors
        for field, is_broken in broken_selectors.items():
            if is_broken:
                print(f"Field '{field}' selector broken. Inferring new...")

                # Infer new selectors
                candidates = self.infer_selector(
                    current_dom, field, self.original_selectors[field]
                )

                if candidates:
                    # Use best candidate
                    best = candidates[0]
                    updated_selectors[field] = best.selector
                    print(f"  Updated to: {best.selector} (confidence: {best.confidence})")

        # Store in history
        self.dom_history.append({
            'hash': current_hash,
            'timestamp': time.time(),
            'selectors': updated_selectors.copy()
        })

        return updated_selectors
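
A minimal sketch of driving the detector (the URL and selector map below are placeholders):

import asyncio

detector = SchemaDetector(
    url='https://example.com/products',  # placeholder target
    original_selectors={
        'title': 'h1.product-title',
        'price': 'div.product-price > span.price',
    },
)

# Fetches the page, diffs against history, and repairs broken selectors
selectors = asyncio.run(detector.update_selectors())
print(selectors)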

Problem

Modern SPA sites built with CSS-in-JS frameworks generate new class names on every build (e.g., .product_price__abc123 becomes .product_price__xyz789). Selectors keyed on class names were useless on these sites, failing 100% of the time.

What I Tried

Attempt 1: Used regex to match partial class names - unreliable and matched wrong elements (see the sketch after this list).
Attempt 2: Tracked class name patterns over time - patterns changed too frequently.
Attempt 3: Switched to data attributes only - many sites don't use them.
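
For illustration, roughly what Attempt 1 looked like (the class fragment and HTML are examples):

import re
from bs4 import BeautifulSoup

def find_by_partial_class(soup: BeautifulSoup, fragment: str) -> list:
    """Match any element whose class contains the fragment."""
    return soup.find_all(class_=re.compile(fragment))

# "price" also matched "old-price", "price-disclaimer", shipping
# estimates, etc. -- exactly how we ended up scraping wrong elements
soup = BeautifulSoup('<span class="old-price">$5.00</span>', 'lxml')
print(find_by_partial_class(soup, 'price'))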

Actual Fix

Implemented attribute-based selectors with fuzzy matching and stable property detection. The system prioritizes stable attributes (data-*, aria-*, id) over classes, uses semantic HTML5 properties, and falls back to structural selectors when attributes aren't available.

class RobustSelectorGenerator:
    """Generate robust selectors that survive dynamic class changes"""

    def __init__(self):
        # Priority order for selector attributes
        self.attribute_priority = [
            'id',                    # Highest priority
            'data-testid',
            'data-cy',
            'data-qa',
            'data-test',
            'role',
            'aria-label',
            'name',                  # Forms
            'type',                  # Inputs
            'href',                  # Links
            'src',                   # Images
            'alt',                   # Images
            'placeholder',           # Inputs
            'title'                  # Tooltips
        ]

    def generate_selectors(self, element) -> List[str]:
        """Generate multiple robust selectors for an element"""
        selectors = []

        # Strategy 1: ID-based (most stable)
        if element.get('id'):
            selectors.append(f"#{element['id']}")

        # Strategy 2: Data attributes
        for attr in self.attribute_priority:
            if attr.startswith('data-') and element.get(attr):
                value = element[attr]
                # Escape special characters
                escaped_value = value.replace('"', '\\"')
                selectors.append(f"[{attr}="{escaped_value}"]")

        # Strategy 3: Semantic HTML5 attributes
        if element.name in ['button', 'input', 'a']:
            if element.get('aria-label'):
                value = element['aria-label'].replace('"', '\\"')
                selectors.append(f'[aria-label="{value}"]')

        # Strategy 4: Combination of stable attributes
        stable_attrs = self._get_stable_attributes(element)
        if len(stable_attrs) >= 2:
            selector = self._build_composite_selector(element, stable_attrs)
            selectors.append(selector)

        # Strategy 5: Structural selector with tag
        structural = self._build_structural_selector(element)
        if structural:
            selectors.append(structural)

        return selectors

    def _get_stable_attributes(self, element) -> Dict[str, str]:
        """Get attributes that are likely to be stable"""
        stable = {}

        # Check each priority attribute
        for attr in self.attribute_priority:
            value = element.get(attr)
            if value and not self._is_dynamic_value(value):
                stable[attr] = value

        return stable

    def _is_dynamic_value(self, value: str) -> bool:
        """Check if value looks dynamically generated"""
        # Patterns that indicate dynamic values
        dynamic_patterns = [
            r'^[a-f0-9]{32}$',      # MD5 hash
            r'^[a-f0-9]{8}-[a-f0-9]{4}-',  # UUID
            r'^[_a-zA-Z]{0,5}\d{10,}$',    # Short prefix + long number
            r'^__.*__$'                # Double underscores
        ]

        for pattern in dynamic_patterns:
            if re.match(pattern, value):
                return True

        return False

    def _build_composite_selector(self, element,
                                  stable_attrs: Dict[str, str]) -> str:
        """Build selector from multiple stable attributes"""
        parts = [element.name]

        # Add 2-3 most specific attributes
        for attr, value in list(stable_attrs.items())[:3]:
            escaped = value.replace('"', '\\"')
            parts.append(f'[{attr}="{escaped}"]')

        return ''.join(parts)

    def _build_structural_selector(self, element) -> Optional[str]:
        """Build selector based on DOM structure"""
        path = []
        current = element
        max_depth = 5

        while current and current.name and max_depth > 0:
            # Use tag name
            tag = current.name

            # Add position if it provides specificity
            if current.parent:
                siblings = [c for c in current.parent.children
                           if hasattr(c, 'name') and c.name == tag]
                if len(siblings) > 1:
                    index = siblings.index(current) + 1
                    path.append(f"{tag}:nth-of-type({index})")
                else:
                    path.append(tag)
            else:
                path.append(tag)

            current = current.parent
            max_depth -= 1

            # Stop once we hit a stable ancestor; append it so it leads
            # the path after the final reversed() join below
            if current and current.get('id'):
                path.append(f"#{current['id']}")
                break

        if path:
            return ' > '.join(reversed(path))
        return None

    def test_selector_stability(self, selector: str,
                               old_dom: BeautifulSoup,
                               new_dom: BeautifulSoup) -> float:
        """Test if selector works across both DOMs"""
        old_matches = old_dom.select(selector)
        new_matches = new_dom.select(selector)

        if not old_matches or not new_matches:
            return 0.0

        # Compare match counts
        count_score = min(len(old_matches), len(new_matches)) / \
                     max(len(old_matches), len(new_matches))

        # Compare content similarity
        content_score = self._compare_content(old_matches[0], new_matches[0])

        return (count_score + content_score) / 2

    def _compare_content(self, old_el, new_el) -> float:
        """Compare content similarity between elements"""
        old_text = old_el.get_text(strip=True)
        new_text = new_el.get_text(strip=True)

        if not old_text or not new_text:
            return 0.0

        return SequenceMatcher(None, old_text, new_text).ratio()
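
A quick usage sketch of the generator (the HTML snippet is illustrative):

from bs4 import BeautifulSoup

html = '<div id="main"><button data-testid="buy-now" class="btn_x8f2k1">Buy</button></div>'
button = BeautifulSoup(html, 'lxml').find('button')

generator = RobustSelectorGenerator()
for selector in generator.generate_selectors(button):
    print(selector)
# '[data-testid="buy-now"]' comes first; the hashed class btn_x8f2k1 is
# never used, so the selector survives the next build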

Problem

When selectors drifted, they returned wrong data types or incomplete data. The scrapers extracted values successfully, but the data was invalid (e.g., price text instead of numbers, missing required fields), causing database errors and downstream analytics failures.

What I Tried

Attempt 1: Added JSON schema validation - caught errors but didn't help fix selectors (see the sketch after this list).
Attempt 2: Monitored extraction rates - detected issues but too late (after invalid data was stored).
Attempt 3: Manual data quality checks - not scalable.
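
Attempt 1 in sketch form, using the jsonschema library (the schema is a toy example). It flagged bad records, but the broken selector still had to be fixed by hand:

from jsonschema import ValidationError, validate

PRODUCT_SCHEMA = {
    'type': 'object',
    'required': ['title', 'price'],
    'properties': {
        'title': {'type': 'string'},
        'price': {'type': 'number'},
    },
}

try:
    validate(instance={'title': 'Widget', 'price': '$19.99'}, schema=PRODUCT_SCHEMA)
except ValidationError as e:
    print(e.message)  # we knew the data was bad, but nothing got fixed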

Actual Fix

Implemented real-time schema validation with type inference, data quality scoring, and automatic field remapping. The system validates extracted data against expected schemas, detects type mismatches, and automatically attempts to transform or remap fields to maintain data quality.

from typing import Any, Dict, List, Optional
from datetime import datetime
import re
from dataclasses import dataclass

@dataclass
class ValidationResult:
    """Schema validation result"""
    is_valid: bool
    errors: List[str]
    warnings: List[str]
    confidence: float
    suggested_fixes: Dict[str, Any]

class SchemaValidator:
    """Validate and fix scraped data against expected schema"""

    def __init__(self, schema: Dict[str, Any]):
        self.schema = schema
        self.type_mapping = {
            'string': str,
            'integer': int,
            'float': float,
            'boolean': bool,
            'datetime': datetime,
            'url': str,
            'email': str
        }

    def validate(self, data: Dict[str, Any]) -> ValidationResult:
        """Validate data against schema"""
        errors = []
        warnings = []
        suggested_fixes = {}

        # Check required fields
        for field, spec in self.schema.get('required', {}).items():
            if field not in data:
                errors.append(f"Missing required field: {field}")
            else:
                # Validate type
                value = data[field]
                expected_type = spec['type']

                if not self._check_type(value, expected_type):
                    error_msg = f"Type mismatch for {field}: expected {expected_type}, got {type(value).__name__}"
                    errors.append(error_msg)

                    # Try to fix
                    fixed_value = self._try_fix_type(value, expected_type)
                    if fixed_value is not None:
                        suggested_fixes[field] = fixed_value
                        warnings.append(f"Suggested fix for {field}: {fixed_value}")

        # Check optional fields
        for field, spec in self.schema.get('optional', {}).items():
            if field in data:
                value = data[field]
                expected_type = spec['type']

                if not self._check_type(value, expected_type):
                    warnings.append(f"Optional field {field} has wrong type")

                    # Try to fix
                    fixed_value = self._try_fix_type(value, expected_type)
                    if fixed_value is not None:
                        suggested_fixes[field] = fixed_value

        # Calculate confidence score
        confidence = self._calculate_confidence(data, errors, warnings)

        return ValidationResult(
            is_valid=len(errors) == 0,
            errors=errors,
            warnings=warnings,
            confidence=confidence,
            suggested_fixes=suggested_fixes
        )

    def _check_type(self, value: Any, expected_type: str) -> bool:
        """Check if value matches expected type"""
        if value is None:
            return True

        expected_class = self.type_mapping.get(expected_type)
        if expected_class is None:
            return True  # Unknown type, assume valid

        # Special checks for string subtypes
        if expected_type == 'url':
            return self._is_url(value)
        elif expected_type == 'email':
            return self._is_email(value)

        return isinstance(value, expected_class)

    def _try_fix_type(self, value: Any, expected_type: str) -> Optional[Any]:
        """Attempt to fix type mismatch"""
        if value is None:
            return None

        try:
            if expected_type == 'string':
                return str(value)

            elif expected_type == 'integer':
                # Strip thousands separators, then take the first run of digits
                if isinstance(value, str):
                    numbers = re.findall(r'\d+', value.replace(',', ''))
                    if numbers:
                        return int(numbers[0])
                return int(float(value))

            elif expected_type == 'float':
                if isinstance(value, str):
                    # Extract price-like values ("$1,299.99" -> 1299.99)
                    match = re.search(r'\d+\.?\d*', value.replace(',', ''))
                    if match:
                        return float(match.group())
                return float(value)

            elif expected_type == 'boolean':
                if isinstance(value, str):
                    return value.lower() in ('true', 'yes', '1', 'on')
                return bool(value)

            elif expected_type == 'datetime':
                if isinstance(value, str):
                    # Try common date formats
                    formats = [
                        '%Y-%m-%d',
                        '%Y-%m-%dT%H:%M:%S',
                        '%m/%d/%Y',
                        '%d/%m/%Y'
                    ]
                    for fmt in formats:
                        try:
                            return datetime.strptime(value, fmt)
                        except ValueError:
                            continue
                return None  # unparseable; don't pass the raw value through as a "fix"

            elif expected_type == 'url':
                if not isinstance(value, str):
                    return None
                # Add protocol if missing
                if not value.startswith(('http://', 'https://')):
                    return 'https://' + value
                return value

        except (ValueError, TypeError):
            return None

        return None

    def _is_url(self, value: str) -> bool:
        """Check if value is a valid URL"""
        url_pattern = re.compile(
            r'^https?://'  # http:// or https://
            r'(?:(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+[A-Z]{2,6}\.?|'  # domain
            r'localhost|'  # localhost
            r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})'  # IP
            r'(?::\d+)?'  # optional port
            r'(?:/?|[/?]\S+)$', re.IGNORECASE)
        return isinstance(value, str) and url_pattern.match(value) is not None

    def _is_email(self, value: str) -> bool:
        """Check if value is a valid email"""
        email_pattern = re.compile(r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$')
        return isinstance(value, str) and email_pattern.match(value) is not None

    def _calculate_confidence(self, data: Dict[str, Any],
                            errors: List[str], warnings: List[str]) -> float:
        """Calculate confidence score for data quality"""
        total_fields = len(self.schema.get('required', {})) + \
                      len(self.schema.get('optional', {}))

        filled_fields = sum(1 for v in data.values() if v is not None)

        # Base score from field completeness
        completeness = filled_fields / max(total_fields, 1)

        # Penalize errors and warnings
        error_penalty = len(errors) * 0.3
        warning_penalty = len(warnings) * 0.1

        confidence = completeness - error_penalty - warning_penalty

        return max(0.0, min(1.0, confidence))

# Usage in scraper
class SelfHealingScraper:
    """Scraper with automatic schema validation and fixing"""

    def __init__(self, schema: Dict[str, Any]):
        self.schema = schema
        self.validator = SchemaValidator(schema)

    def scrape_and_validate(self, url: str) -> ValidationResult:
        """Scrape with automatic validation and fixing"""
        # Scrape data (_scrape is the site-specific extraction step, not shown)
        data = self._scrape(url)

        # Validate
        result = self.validator.validate(data)

        # Apply fixes if available, then re-validate
        if result.suggested_fixes:
            data.update(result.suggested_fixes)
            print(f"Applied {len(result.suggested_fixes)} automatic fixes")
            result = self.validator.validate(data)

        if result.is_valid:
            self._save_to_db(data)  # persistence hook (not shown)
        else:
            print(f"Validation failed: {result.errors}")
            self._alert_team(url, result)  # alerting hook (not shown)

        return result
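
For reference, a minimal sketch of driving the validator directly (the schema and record are toy examples):

schema = {
    'required': {
        'title': {'type': 'string'},
        'price': {'type': 'float'},
    },
    'optional': {
        'image': {'type': 'url'},
    },
}

validator = SchemaValidator(schema)
result = validator.validate({'title': 'Blue Widget', 'price': '$1,299.99'})
print(result.is_valid)         # False: price arrived as a string
print(result.suggested_fixes)  # {'price': 1299.99}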

What I Learned

Stable attributes (id, data-*, aria-*) outlive CSS classes; class names generated by CSS-in-JS builds are the first thing to break. Detecting a broken selector isn't enough, because a drifted selector can match the wrong element and return plausible-looking garbage, so validate the extracted data as well. And keeping multiple ranked selector candidates per field, with automatic fallback and confidence scoring, is what actually cut the manual maintenance.

Production Setup

Complete production deployment with continuous monitoring and automatic selector updates.

# Install dependencies
pip install beautifulsoup4 lxml aiohttp redis

# Project structure
mkdir dynamic-schema
cd dynamic-schema
mkdir {scrapers,schemas,logs,storage}

# Schema definitions
cat > schemas/ecommerce.yaml << EOF
product_page:
  required:
    title:
      type: string
      min_length: 10
      max_length: 200
    price:
      type: float
      min: 0
    description:
      type: string
      min_length: 50
  optional:
    image:
      type: url
    availability:
      type: string
      enum: ["in_stock", "out_of_stock", "preorder"]
    sku:
      type: string
      pattern: "^[A-Z0-9-]+$"
EOF

# Docker deployment
cat > docker-compose.yml << EOF
version: '3.8'
services:
  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"

  schema-monitor:
    build: .
    environment:
      - REDIS_URL=redis://redis:6379/0
      - CHECK_INTERVAL=300
    volumes:
      - ./schemas:/app/schemas
      - ./logs:/app/logs
    depends_on:
      - redis
    restart: unless-stopped

  scraper:
    build: .
    environment:
      - REDIS_URL=redis://redis:6379/0
      - AUTO_UPDATE_SELECTORS=true
    volumes:
      - ./scrapers:/app/scrapers
      - ./schemas:/app/schemas
      - ./storage:/app/storage
    depends_on:
      - redis
    restart: unless-stopped
EOF

# Start services
docker-compose up -d

# Monitor schema changes
docker-compose logs -f schema-monitor
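
The schema-monitor service is essentially a loop around SchemaDetector.update_selectors(). A minimal sketch of what it runs (the site registry and Redis key layout are assumptions, not our exact service):

import asyncio
import json
import os

import redis

# Assumed site registry; in production this comes from config
SITES = {
    'https://example.com/products': {
        'title': 'h1.product-title',
        'price': 'div.product-price > span.price',
    },
}

async def monitor_loop():
    r = redis.from_url(os.environ.get('REDIS_URL', 'redis://localhost:6379/0'))
    interval = int(os.environ.get('CHECK_INTERVAL', '300'))

    while True:
        for url, selectors in SITES.items():
            detector = SchemaDetector(url, selectors)
            updated = await detector.update_selectors()
            if updated != selectors:
                # Publish new selectors so running scrapers pick them up
                r.set(f"selectors:{url}", json.dumps(updated))
        await asyncio.sleep(interval)

if __name__ == '__main__':
    asyncio.run(monitor_loop())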

Monitoring & Debugging

Track schema health and selector performance.

Red Flags to Watch For

avg_confidence trending down across schemas means selectors are drifting faster than the auto-fixer can repair them. A spike in selector changes concentrated on a single domain usually signals a full redesign that needs a human look. And a falling auto_fix_success_rate means inferred selectors are matching the wrong elements.

Health Metrics

# Schema health status
curl http://localhost:8080/health/schemas
# {
#   "total_schemas": 152,
#   "healthy": 148,
#   "degraded": 3,
#   "broken": 1,
#   "auto_fix_success_rate": 0.87,
#   "avg_confidence": 0.92
# }

# Recent selector changes
curl http://localhost:8080/changes?hours=24
# {
#   "changes": 23,
#   "by_domain": {
#     "example.com": 5,
#     "store.com": 3
#   },
#   "by_field": {
#     "price": 8,
#     "title": 6,
#     "description": 9
#   }
# }
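
The health endpoints above are served by a small internal API. A rough sketch of the shape, using aiohttp (the Redis key layout is an assumption):

import json

from aiohttp import web
import redis

# Sync Redis client kept for brevity; fine for a low-traffic health endpoint
r = redis.from_url('redis://localhost:6379/0')

async def schema_health(request: web.Request) -> web.Response:
    # Assumed layout: one JSON status blob per schema under schema:*
    statuses = [json.loads(r.get(key)) for key in r.scan_iter('schema:*')]
    counts = {'healthy': 0, 'degraded': 0, 'broken': 0}
    for status in statuses:
        state = status.get('state', 'broken')
        counts[state] = counts.get(state, 0) + 1
    return web.json_response({'total_schemas': len(statuses), **counts})

app = web.Application()
app.add_routes([web.get('/health/schemas', schema_health)])

if __name__ == '__main__':
    web.run_app(app, port=8080)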
