SQLite Database Design

Storing and querying scraped movie data

Why I needed a database

Now that we can fetch and parse movie data, we need somewhere to store it. SQLite works well: there is no server to set up, and the whole database is a single file.

Why SQLite

  • Zero configuration: No server setup
  • Single file: Database is just one file
  • SQL support: Full SQL features
  • Built into Python: No extra dependencies

For 250 movies, SQLite is plenty. If you're dealing with millions, consider PostgreSQL.

Database schema

-- One row per scraped movie.
CREATE TABLE movies (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    rank INTEGER NOT NULL UNIQUE,   -- UNIQUE: at most one row per rank
    title TEXT NOT NULL,
    director TEXT,
    actors TEXT,        -- JSON array
    year INTEGER,
    country TEXT,
    genres TEXT,        -- JSON array
    rating REAL,
    vote_count INTEGER,
    quote TEXT,
    ai_summary TEXT,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,  -- filled in by SQLite when the row is inserted
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP   -- default only applies when the INSERT omits it
);

-- Indexes for common queries
-- NOTE: the UNIQUE constraint on rank already makes SQLite create an implicit
-- index on that column, so idx_rank duplicates it.
CREATE INDEX idx_rank ON movies(rank);
CREATE INDEX idx_rating ON movies(rating);
CREATE INDEX idx_year ON movies(year);
CREATE INDEX idx_country ON movies(country);

Design decisions

JSON for arrays: SQLite doesn't have native arrays. Storing actors and genres as JSON strings works fine for read-heavy workloads.

UNIQUE on rank: Each movie has a unique rank (1-250). Prevents duplicates if we re-scrape.

Indexes: Added indexes on frequently queried columns.

Database class

import json
import sqlite3
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional

class Database:
    """SQLite-backed store for movie records.

    Creating an instance opens the database file (creating its parent
    directory first if necessary) and makes sure the ``movies`` table and
    its indexes exist.
    """

    def __init__(self, db_path: str = "data/movies.db"):
        """Open the database at *db_path* and ensure the schema is present."""
        self.db_path = db_path
        self.connection = None
        self._connect()
        self._create_tables()

    def _connect(self):
        """Make sure the parent directory exists, then open the connection."""
        parent_dir = Path(self.db_path).parent
        parent_dir.mkdir(parents=True, exist_ok=True)

        conn = sqlite3.connect(self.db_path)
        # sqlite3.Row lets callers read columns by name as well as by index.
        conn.row_factory = sqlite3.Row
        self.connection = conn

    def _create_tables(self):
        """Run the schema script and commit.

        Every statement uses IF NOT EXISTS, so running this against an
        already-initialized database is harmless.
        """
        schema_script = """
        CREATE TABLE IF NOT EXISTS movies (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            rank INTEGER NOT NULL UNIQUE,
            title TEXT NOT NULL,
            director TEXT,
            actors TEXT,
            year INTEGER,
            country TEXT,
            genres TEXT,
            rating REAL,
            vote_count INTEGER,
            quote TEXT,
            ai_summary TEXT,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        );

        CREATE INDEX IF NOT EXISTS idx_rank ON movies(rank);
        CREATE INDEX IF NOT EXISTS idx_rating ON movies(rating);
        CREATE INDEX IF NOT EXISTS idx_year ON movies(year);
        CREATE INDEX IF NOT EXISTS idx_country ON movies(country);
        """

        db = self.connection
        db.executescript(schema_script)
        db.commit()

Inserting data

def insert_movie(self, movie: Dict[str, Any]) -> bool:
    """Insert a single movie record, replacing any row with the same rank.

    Args:
        movie: Mapping of movie fields. ``actors`` and ``genres`` are
            serialized to JSON text (a missing key is stored as ``[]``).
            Any other missing key is bound as NULL, which the schema
            rejects for the NOT NULL columns ``rank`` and ``title``.

    Returns:
        True if the row was written and committed; False if SQLite raised
        an error, in which case the error is printed and the transaction
        is rolled back.
    """
    # Twelve columns need exactly twelve placeholders. With a mismatch SQLite
    # refuses to prepare the statement, so every insert would fail.
    insert_sql = """
    INSERT OR REPLACE INTO movies (
        rank, title, director, actors, year, country, genres,
        rating, vote_count, quote, ai_summary, updated_at
    ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
    """

    # Lists are stored as JSON text; ensure_ascii=False keeps non-ASCII
    # names readable in the database.
    actors_json = json.dumps(movie.get('actors', []), ensure_ascii=False)
    genres_json = json.dumps(movie.get('genres', []), ensure_ascii=False)

    try:
        self.connection.execute(insert_sql, (
            movie.get('rank'),
            movie.get('title'),
            movie.get('director'),
            actors_json,
            movie.get('year'),
            movie.get('country'),
            genres_json,
            movie.get('rating'),
            movie.get('vote_count'),
            movie.get('quote'),
            movie.get('ai_summary'),
            datetime.now().isoformat(),  # updated_at is set by us, not by the column default
        ))

        self.connection.commit()
        return True

    except sqlite3.Error as e:
        print(f"Error inserting movie: {e}")
        self.connection.rollback()
        return False

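INSERT OR REPLACE replaces the existing row if the rank already exists. Note that SQLite does this by deleting the old row and inserting a new one, so id and created_at are reset. Useful for re-scraping.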

Batch insert

Inserting 250 movies one by one, committing after each, is slow. I used a single transaction instead:

def insert_movies_batch(self, movies: List[Dict[str, Any]]) -> int:
    """Insert multiple movies in a single transaction.

    Rows are written with INSERT OR REPLACE, so a movie whose rank is
    already present replaces the existing row. A row that SQLite rejects
    is reported and skipped; the remaining rows are still inserted.

    Args:
        movies: Movie mappings. ``actors`` and ``genres`` are serialized to
            JSON text (a missing key is stored as ``[]``); other missing
            keys are bound as NULL.

    Returns:
        The number of rows inserted and committed. If the final commit
        fails, the transaction is rolled back and 0 is returned because
        nothing was persisted.
    """
    # Twelve columns need exactly twelve placeholders. With a mismatch SQLite
    # rejects the statement, so every row in the batch would fail.
    insert_sql = """
    INSERT OR REPLACE INTO movies (
        rank, title, director, actors, year, country, genres,
        rating, vote_count, quote, ai_summary, updated_at
    ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
    """

    success_count = 0
    cursor = self.connection.cursor()

    try:
        for movie in movies:
            actors_json = json.dumps(movie.get('actors', []), ensure_ascii=False)
            genres_json = json.dumps(movie.get('genres', []), ensure_ascii=False)

            try:
                cursor.execute(insert_sql, (
                    movie.get('rank'),
                    movie.get('title'),
                    movie.get('director'),
                    actors_json,
                    movie.get('year'),
                    movie.get('country'),
                    genres_json,
                    movie.get('rating'),
                    movie.get('vote_count'),
                    movie.get('quote'),
                    movie.get('ai_summary'),
                    datetime.now().isoformat(),
                ))
                success_count += 1
            except sqlite3.Error as e:
                # One bad row must not sink the whole batch: report and move on.
                print(f"Failed to insert {movie.get('rank')}: {e}")

        # Commit all at once
        self.connection.commit()
        return success_count

    except sqlite3.Error as e:
        print(f"Batch insert failed: {e}")
        self.connection.rollback()
        # The rollback discarded every row from this batch.
        return 0

A single commit at the end is much faster than committing after each insert. I learned this the hard way: the initial implementation took 30+ seconds, and it now takes less than 1.

Querying data

def get_all_movies(self) -> List[Dict[str, Any]]:
    """Return every stored movie as a dict, sorted by ascending rank."""
    query = "SELECT * FROM movies ORDER BY rank ASC"
    records = self.connection.execute(query).fetchall()
    # Each row goes through _row_to_dict before being returned.
    return list(map(self._row_to_dict, records))

def get_movie_by_rank(self, rank: int) -> Optional[Dict[str, Any]]:
    """Look up one movie by its rank; return None when no row matches."""
    query = "SELECT * FROM movies WHERE rank = ?"
    found = self.connection.execute(query, (rank,)).fetchone()
    if not found:
        return None
    return self._row_to_dict(found)

def _row_to_dict(self, row: sqlite3.Row) -> Dict[str, Any]:
    """Convert a database row to a dict and decode its JSON columns.

    Args:
        row: A row (or any mapping accepted by ``dict()``) from ``movies``.

    Returns:
        A plain dict of the row. ``actors`` and ``genres`` are replaced by
        their decoded JSON values when they hold non-empty, valid JSON
        text. Empty or NULL values are left untouched, and a value that
        fails to decode is reported and left as the raw text.
    """
    movie = dict(row)

    # Decode each field in its own try block so one malformed value does not
    # stop the other field from being parsed.
    for field in ('actors', 'genres'):
        raw = movie.get(field)
        if not raw:
            continue
        try:
            movie[field] = json.loads(raw)
        except json.JSONDecodeError as e:
            print(f"Error parsing JSON: {e}")

    return movie

Advanced queries

def get_top_directors(self, limit: int = 10) -> List[tuple]:
    """Return up to *limit* (director, movie_count) rows, most movies first.

    Rows whose director is NULL or an empty string are ignored.
    """
    sql = """
    SELECT director, COUNT(*) as movie_count
    FROM movies
    WHERE director IS NOT NULL AND director != ''
    GROUP BY director
    ORDER BY movie_count DESC
    LIMIT ?
    """
    params = (limit,)
    cursor = self.connection.execute(sql, params)
    return cursor.fetchall()

def get_genre_distribution(self) -> List[tuple]:
    """Return (genre, count) rows, most common genre first.

    The ``genres`` JSON array of each movie is expanded with SQLite's
    ``json_each`` so that every genre is counted once per movie listing it.
    """
    sql = """
    SELECT genre, COUNT(*) as count
    FROM (
        SELECT json_each.value as genre
        FROM movies, json_each(movies.genres)
        WHERE json_valid(movies.genres)
    )
    GROUP BY genre
    ORDER BY count DESC
    """
    cursor = self.connection.execute(sql)
    return cursor.fetchall()

Context manager

I made the database class a context manager so the connection is always closed properly:

def close(self):
    """Close the underlying connection; do nothing if there is none."""
    conn = self.connection
    if not conn:
        return
    conn.close()

def __enter__(self):
    """Enter the ``with`` block, handing back this same instance."""
    return self

def __exit__(self, exc_type, exc_val, exc_tb):
    """Leave the ``with`` block by closing the connection.

    Nothing is returned, so an exception raised inside the block is not
    suppressed.
    """
    self.close()

# Usage: Database() with no argument opens data/movies.db. Leaving the
# with-block triggers __exit__, which calls close(), even if the body raises.
with Database() as db:
    movies = db.get_all_movies()
    print(f"Total movies: {len(movies)}")
# Connection automatically closed