import sqlite3
from contextlib import contextmanager
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from typing import Optional

from scrapers.base import Job

# Maximum number of external ids bound into a single IN (...) list. SQLite
# builds older than 3.32 reject statements with more than 999 bound
# variables, so batches stay comfortably below that.
_MAX_SQL_VARIABLES = 500


def _timestamp_now() -> str:
    """Return the current local time as text for a TIMESTAMP column.

    The format is ``datetime.isoformat(sep=" ")``, which is exactly what the
    sqlite3 module's default datetime adapter used to produce. That adapter
    is deprecated since Python 3.12, so the conversion is done explicitly.

    NOTE(review): this is naive local time, while the schema's
    ``DEFAULT CURRENT_TIMESTAMP`` columns are filled by SQLite in UTC, so
    ``first_seen`` and later ``last_seen`` values may be in different zones.
    """
    return datetime.now().isoformat(sep=" ")


@dataclass
class StoredJob:
    """A job stored in the database."""

    id: int
    company_id: int
    external_id: str
    title: str
    url: str
    location: Optional[str]
    department: Optional[str]
    remote_type: Optional[str]
    # NOTE(review): connections are opened without ``detect_types``, so the
    # two timestamps below are populated with the stored text rather than
    # ``datetime`` objects. Confirm what callers expect before converting.
    first_seen: datetime
    last_seen: datetime
    status: str  # 'active' or 'removed'


def _row_to_job(row: sqlite3.Row) -> StoredJob:
    """Build a StoredJob from a row that exposes every ``jobs`` column."""
    return StoredJob(
        id=row["id"],
        company_id=row["company_id"],
        external_id=row["external_id"],
        title=row["title"],
        url=row["url"],
        location=row["location"],
        department=row["department"],
        remote_type=row["remote_type"],
        first_seen=row["first_seen"],
        last_seen=row["last_seen"],
        status=row["status"],
    )


class Database:
    """SQLite database for storing job listings."""

    def __init__(self, db_path: str = "data/jobs.db"):
        """Create the parent directory if needed and ensure the schema exists."""
        self.db_path = Path(db_path)
        self.db_path.parent.mkdir(parents=True, exist_ok=True)
        self._init_db()

    @contextmanager
    def _get_conn(self):
        """Yield a fresh connection; commit on success, always close.

        If the body raises, the commit is skipped and the connection is
        closed, which discards the uncommitted changes.
        """
        conn = sqlite3.connect(self.db_path, timeout=30.0)
        conn.row_factory = sqlite3.Row
        # Enable WAL mode for better concurrency
        conn.execute("PRAGMA journal_mode=WAL")
        conn.execute("PRAGMA busy_timeout=30000")
        try:
            yield conn
            conn.commit()
        finally:
            conn.close()

    def _init_db(self):
        """Initialize the database schema (tables and indexes) if missing."""
        with self._get_conn() as conn:
            conn.executescript("""
                CREATE TABLE IF NOT EXISTS companies (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    name TEXT NOT NULL UNIQUE,
                    jobs_url TEXT,
                    platform_type TEXT,
                    last_scraped TIMESTAMP,
                    active BOOLEAN DEFAULT TRUE
                );

                CREATE TABLE IF NOT EXISTS jobs (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    company_id INTEGER REFERENCES companies(id),
                    external_id TEXT NOT NULL,
                    title TEXT NOT NULL,
                    url TEXT NOT NULL,
                    location TEXT,
                    department TEXT,
                    remote_type TEXT,
                    first_seen TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    last_seen TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    status TEXT DEFAULT 'active',
                    UNIQUE(company_id, external_id)
                );

                CREATE INDEX IF NOT EXISTS idx_jobs_company ON jobs(company_id);
                CREATE INDEX IF NOT EXISTS idx_jobs_status ON jobs(status);
            """)

    def get_or_create_company(
        self,
        name: str,
        jobs_url: Optional[str] = None,
        platform_type: Optional[str] = None,
    ) -> int:
        """Get or create a company and return its ID.

        ``jobs_url`` and ``platform_type`` are only stored when the company
        is created; an existing row is returned untouched.
        """
        select_sql = "SELECT id FROM companies WHERE name = ?"
        with self._get_conn() as conn:
            row = conn.execute(select_sql, (name,)).fetchone()
            if row is not None:
                return row["id"]

            # OR IGNORE: if another writer inserted the same name between the
            # SELECT above and this statement, the UNIQUE(name) conflict is
            # skipped instead of raising IntegrityError.
            conn.execute(
                "INSERT OR IGNORE INTO companies (name, jobs_url, platform_type) "
                "VALUES (?, ?, ?)",
                (name, jobs_url, platform_type),
            )
            # Re-select rather than trusting lastrowid, which does not refer
            # to this company when the insert was ignored.
            row = conn.execute(select_sql, (name,)).fetchone()
            return row["id"]

    def update_company_scraped(self, company_id: int):
        """Update the last_scraped timestamp for a company."""
        with self._get_conn() as conn:
            conn.execute(
                "UPDATE companies SET last_scraped = ? WHERE id = ?",
                (_timestamp_now(), company_id),
            )

    def get_active_jobs(self, company_id: int) -> dict[str, StoredJob]:
        """Get all active jobs for a company, keyed by external_id."""
        with self._get_conn() as conn:
            rows = conn.execute(
                """SELECT * FROM jobs
                   WHERE company_id = ? AND status = 'active'""",
                (company_id,),
            ).fetchall()
            return {row["external_id"]: _row_to_job(row) for row in rows}

    def upsert_job(self, company_id: int, job: Job) -> tuple[bool, Optional[StoredJob]]:
        """
        Insert or update a job.

        Returns (is_new, old_job) where old_job is the previous version if it
        existed. An existing row (whatever its status) gets its descriptive
        fields overwritten, ``last_seen`` refreshed and ``status`` set back to
        'active'; ``old_job`` reflects the row as it was before that update.
        """
        with self._get_conn() as conn:
            existing = conn.execute(
                "SELECT * FROM jobs WHERE company_id = ? AND external_id = ?",
                (company_id, job.external_id),
            ).fetchone()

            if existing is None:
                # first_seen / last_seen / status come from the column defaults.
                conn.execute(
                    """INSERT INTO jobs
                       (company_id, external_id, title, url, location,
                        department, remote_type)
                       VALUES (?, ?, ?, ?, ?, ?, ?)""",
                    (company_id, job.external_id, job.title, job.url,
                     job.location, job.department, job.remote_type),
                )
                return True, None

            conn.execute(
                """UPDATE jobs
                   SET title = ?, url = ?, location = ?, department = ?,
                       remote_type = ?, last_seen = ?, status = 'active'
                   WHERE id = ?""",
                (job.title, job.url, job.location, job.department,
                 job.remote_type, _timestamp_now(), existing["id"]),
            )
            # `existing` was fetched before the UPDATE, so it is the old version.
            return False, _row_to_job(existing)

    def mark_jobs_removed(self, company_id: int, external_ids: set[str]) -> list[StoredJob]:
        """Mark jobs as removed. Returns the jobs that were marked removed.

        Only jobs that are currently 'active' are touched and returned; rows
        already marked 'removed' keep their existing ``last_seen``. The
        returned objects show each row as it was before the update.
        """
        if not external_ids:
            return []

        ids = list(external_ids)
        removed: list[StoredJob] = []
        with self._get_conn() as conn:
            now = _timestamp_now()
            # Work in batches so one statement never binds more variables
            # than SQLite allows; all batches share one transaction.
            for start in range(0, len(ids), _MAX_SQL_VARIABLES):
                batch = ids[start:start + _MAX_SQL_VARIABLES]
                placeholders = ",".join("?" * len(batch))
                rows = conn.execute(
                    f"""SELECT * FROM jobs
                        WHERE company_id = ?
                          AND external_id IN ({placeholders})
                          AND status = 'active'""",
                    (company_id, *batch),
                ).fetchall()
                removed.extend(_row_to_job(row) for row in rows)

                # Same status filter as the SELECT above, so the rows updated
                # are exactly the rows reported.
                conn.execute(
                    f"""UPDATE jobs
                        SET status = 'removed', last_seen = ?
                        WHERE company_id = ?
                          AND external_id IN ({placeholders})
                          AND status = 'active'""",
                    (now, company_id, *batch),
                )
        return removed

    def get_all_active_jobs(self) -> list[tuple[str, StoredJob]]:
        """Get all active jobs across all companies.

        Returns (company_name, job) tuples ordered by company name, then title.
        """
        with self._get_conn() as conn:
            rows = conn.execute(
                """SELECT c.name as company_name, j.*
                   FROM jobs j
                   JOIN companies c ON j.company_id = c.id
                   WHERE j.status = 'active'
                   ORDER BY c.name, j.title"""
            ).fetchall()
            return [(row["company_name"], _row_to_job(row)) for row in rows]

    def get_all_companies(self) -> list[str]:
        """Get the names of all companies flagged active, sorted by name."""
        with self._get_conn() as conn:
            cursor = conn.execute(
                "SELECT name FROM companies WHERE active = TRUE ORDER BY name"
            )
            return [row["name"] for row in cursor.fetchall()]

    def cleanup_removed_companies(self, active_company_names: list[str]) -> list[str]:
        """
        Remove companies (and their jobs) that are no longer in the config.

        Returns list of removed company names.

        NOTE: an empty ``active_company_names`` means no company is kept, so
        every company and every job is deleted. Callers should make sure the
        config actually loaded before calling this.
        """
        keep = set(active_company_names)
        with self._get_conn() as conn:
            # Filter in Python instead of binding every name into NOT IN (...),
            # which would hit SQLite's bound-variable limit for long lists.
            rows = conn.execute("SELECT id, name FROM companies").fetchall()
            stale = [(row["id"], row["name"]) for row in rows if row["name"] not in keep]

            for company_id, _name in stale:
                # Delete jobs first (foreign key)
                conn.execute("DELETE FROM jobs WHERE company_id = ?", (company_id,))
                conn.execute("DELETE FROM companies WHERE id = ?", (company_id,))

        return [name for _company_id, name in stale]