249 lines
9.4 KiB
Python
249 lines
9.4 KiB
Python
import sqlite3
|
|
from contextlib import contextmanager
|
|
from dataclasses import dataclass
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
from typing import Optional
|
|
|
|
from scrapers.base import Job
|
|
|
|
|
|
@dataclass
class StoredJob:
    """A job row as read back from the ``jobs`` table.

    The timestamp fields hold the raw TIMESTAMP text that ``sqlite3``
    returns: the database connection is opened without ``detect_types``,
    so these columns are never converted to ``datetime`` objects.
    """

    id: int
    company_id: int
    external_id: str
    title: str
    url: str
    location: Optional[str]
    department: Optional[str]
    remote_type: Optional[str]
    first_seen: str  # raw TIMESTAMP text from SQLite
    last_seen: str  # raw TIMESTAMP text; refreshed when the job is seen again or removed
    status: str  # 'active' or 'removed'
|
|
|
|
|
|
class Database:
    """SQLite database for storing job listings.

    Every public method opens its own short-lived connection through
    ``_get_conn``. That connection is committed when the method finishes
    normally and rolled back if it raises.

    All timestamps this class writes (``first_seen``, ``last_seen``,
    ``last_scraped``) come from SQLite's ``CURRENT_TIMESTAMP``, the same
    source as the column defaults. Stored values therefore share one clock,
    one timezone (UTC) and one text format. Rows are read back without
    ``detect_types``, so timestamp columns are returned as strings.
    """

    # SQLite builds before 3.32 allow at most 999 bound parameters per
    # statement; IN (...) lists are chunked below that to stay portable.
    _MAX_IN_PARAMS = 500

    def __init__(self, db_path: str = "data/jobs.db"):
        """Open the database at *db_path*, creating its directory and schema if missing."""
        self.db_path = Path(db_path)
        self.db_path.parent.mkdir(parents=True, exist_ok=True)
        self._init_db()

    @contextmanager
    def _get_conn(self):
        """Yield a connection: commit on success, roll back on error, always close."""
        conn = sqlite3.connect(self.db_path, timeout=30.0)
        try:
            conn.row_factory = sqlite3.Row
            # Enable WAL mode for better concurrency
            conn.execute("PRAGMA journal_mode=WAL")
            conn.execute("PRAGMA busy_timeout=30000")
            yield conn
            conn.commit()
        except BaseException:
            # Discard partial writes explicitly instead of relying on close().
            conn.rollback()
            raise
        finally:
            # Inside the try so the connection is closed even if a PRAGMA fails.
            conn.close()

    def _init_db(self):
        """Create the ``companies`` and ``jobs`` tables and indexes if they don't exist."""
        with self._get_conn() as conn:
            conn.executescript("""
                CREATE TABLE IF NOT EXISTS companies (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    name TEXT NOT NULL UNIQUE,
                    jobs_url TEXT,
                    platform_type TEXT,
                    last_scraped TIMESTAMP,
                    active BOOLEAN DEFAULT TRUE
                );

                CREATE TABLE IF NOT EXISTS jobs (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    company_id INTEGER REFERENCES companies(id),
                    external_id TEXT NOT NULL,
                    title TEXT NOT NULL,
                    url TEXT NOT NULL,
                    location TEXT,
                    department TEXT,
                    remote_type TEXT,
                    first_seen TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    last_seen TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    status TEXT DEFAULT 'active',
                    UNIQUE(company_id, external_id)
                );

                CREATE INDEX IF NOT EXISTS idx_jobs_company ON jobs(company_id);
                CREATE INDEX IF NOT EXISTS idx_jobs_status ON jobs(status);
            """)

    @staticmethod
    def _row_to_job(row: sqlite3.Row) -> StoredJob:
        """Build a StoredJob from a row holding the ``jobs`` columns (extra columns ignored)."""
        return StoredJob(
            id=row["id"],
            company_id=row["company_id"],
            external_id=row["external_id"],
            title=row["title"],
            url=row["url"],
            location=row["location"],
            department=row["department"],
            remote_type=row["remote_type"],
            first_seen=row["first_seen"],
            last_seen=row["last_seen"],
            status=row["status"],
        )

    def get_or_create_company(
        self,
        name: str,
        jobs_url: Optional[str] = None,
        platform_type: Optional[str] = None,
    ) -> int:
        """Return the id of the company called *name*, inserting it if absent.

        ``jobs_url`` and ``platform_type`` are only used when a new row is
        created; an existing company's stored values are left unchanged.

        Raises:
            sqlite3.IntegrityError: if the insert violates a constraint for a
                reason other than a concurrent insert of the same name.
        """
        select_sql = "SELECT id FROM companies WHERE name = ?"
        with self._get_conn() as conn:
            row = conn.execute(select_sql, (name,)).fetchone()
            if row is not None:
                return row["id"]
            try:
                cursor = conn.execute(
                    "INSERT INTO companies (name, jobs_url, platform_type) VALUES (?, ?, ?)",
                    (name, jobs_url, platform_type),
                )
            except sqlite3.IntegrityError:
                # Another connection may have inserted the same name between the
                # SELECT and the INSERT (name is UNIQUE); reuse that row.
                row = conn.execute(select_sql, (name,)).fetchone()
                if row is None:
                    raise
                return row["id"]
            return cursor.lastrowid

    def update_company_scraped(self, company_id: int):
        """Set the company's ``last_scraped`` to the current time (SQLite CURRENT_TIMESTAMP, UTC)."""
        with self._get_conn() as conn:
            conn.execute(
                "UPDATE companies SET last_scraped = CURRENT_TIMESTAMP WHERE id = ?",
                (company_id,),
            )

    def get_active_jobs(self, company_id: int) -> dict[str, StoredJob]:
        """Get all active jobs for a company, keyed by external_id."""
        with self._get_conn() as conn:
            rows = conn.execute(
                "SELECT * FROM jobs WHERE company_id = ? AND status = 'active'",
                (company_id,),
            ).fetchall()
            return {row["external_id"]: self._row_to_job(row) for row in rows}

    def upsert_job(self, company_id: int, job: Job) -> tuple[bool, Optional[StoredJob]]:
        """
        Insert or update a job.

        Returns (is_new, old_job). old_job is the row as it was before this
        call, or None when the job was newly inserted. An existing job has its
        details overwritten, its ``last_seen`` bumped and its status set back
        to 'active'.
        """
        with self._get_conn() as conn:
            existing = conn.execute(
                "SELECT * FROM jobs WHERE company_id = ? AND external_id = ?",
                (company_id, job.external_id),
            ).fetchone()

            if existing is None:
                # first_seen / last_seen / status come from the column defaults.
                conn.execute(
                    """INSERT INTO jobs
                       (company_id, external_id, title, url, location, department, remote_type)
                       VALUES (?, ?, ?, ?, ?, ?, ?)""",
                    (company_id, job.external_id, job.title, job.url,
                     job.location, job.department, job.remote_type),
                )
                return True, None

            # CURRENT_TIMESTAMP (not datetime.now()) keeps last_seen in the same
            # timezone and format as the first_seen column default.
            conn.execute(
                """UPDATE jobs SET
                       title = ?, url = ?, location = ?, department = ?,
                       remote_type = ?, last_seen = CURRENT_TIMESTAMP, status = 'active'
                   WHERE id = ?""",
                (job.title, job.url, job.location, job.department,
                 job.remote_type, existing["id"]),
            )
            return False, self._row_to_job(existing)

    def mark_jobs_removed(self, company_id: int, external_ids: set[str]) -> list[StoredJob]:
        """Mark the company's active jobs listed in *external_ids* as removed.

        Returns snapshots of the affected rows taken before the update, so
        their ``status`` still reads 'active'. Ids that are unknown or
        already removed are skipped and left untouched.
        """
        ids = list(external_ids)
        if not ids:
            return []

        removed: list[StoredJob] = []
        with self._get_conn() as conn:
            # Chunk to stay under SQLite's bound-parameter limit for big id sets.
            for start in range(0, len(ids), self._MAX_IN_PARAMS):
                chunk = ids[start:start + self._MAX_IN_PARAMS]
                placeholders = ",".join("?" * len(chunk))
                rows = conn.execute(
                    f"""SELECT * FROM jobs
                        WHERE company_id = ? AND external_id IN ({placeholders})
                          AND status = 'active'""",
                    (company_id, *chunk),
                ).fetchall()
                removed.extend(self._row_to_job(row) for row in rows)

                # Same status filter as the SELECT, so already-removed jobs keep
                # the last_seen recorded when they were actually removed.
                conn.execute(
                    f"""UPDATE jobs SET status = 'removed', last_seen = CURRENT_TIMESTAMP
                        WHERE company_id = ? AND external_id IN ({placeholders})
                          AND status = 'active'""",
                    (company_id, *chunk),
                )

        return removed

    def get_all_active_jobs(self) -> list[tuple[str, StoredJob]]:
        """Get all active jobs across all companies. Returns (company_name, job) tuples."""
        with self._get_conn() as conn:
            rows = conn.execute(
                """SELECT c.name as company_name, j.*
                   FROM jobs j
                   JOIN companies c ON j.company_id = c.id
                   WHERE j.status = 'active'
                   ORDER BY c.name, j.title"""
            ).fetchall()
            return [(row["company_name"], self._row_to_job(row)) for row in rows]

    def get_all_companies(self) -> list[str]:
        """Get the names of all active companies, sorted by name."""
        with self._get_conn() as conn:
            cursor = conn.execute(
                "SELECT name FROM companies WHERE active = TRUE ORDER BY name"
            )
            return [row["name"] for row in cursor.fetchall()]
|