Initial commit: Job scraper for privacy/open-source companies

- Scrapes job listings from Greenhouse, Lever, and Ashby platforms
- Tracks 14 companies (1Password, DuckDuckGo, GitLab, etc.)
- SQLite database for change detection
- Filters by engineering job titles and location preferences
- Generates static HTML dashboard with search/filter
- Docker support for deployment to a Debian server
commit e8eb9d3fcf (parent 251002b889)
Author: Bastian Gruber
Date:   2026-01-20 12:40:08 -04:00
16 changed files with 1613 additions and 155 deletions

.gitignore

@@ -1,164 +1,25 @@
# Python
__pycache__/
*.py[cod]
*$py.class
*.so
.Python
venv/
.venv/
ENV/

# Data
data/*.db

# IDE
.idea/
.vscode/
*.swp
*.swo

# OS
.DS_Store
Thumbs.db

# Secrets (if you add email credentials)
.env

Dockerfile (new file)

@@ -0,0 +1,16 @@
FROM python:3.12-slim
WORKDIR /app
# Install dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy application code
COPY . .
# Create data directory for SQLite database
RUN mkdir -p /app/data
# Run the scraper
CMD ["python", "main.py"]

README.md

@@ -1,3 +1,132 @@
# Job Scraper
Monitor job openings from privacy-focused and open-source companies. Runs daily and shows changes.
## Quick Start (Local)
```bash
# Create venv and install deps
python3 -m venv venv
source venv/bin/activate
pip install -r requirements.txt
# Run once
python main.py
# View dashboard
open data/dashboard.html
```
## Deploy to Debian Server
### 1. Install Docker
```bash
# Install Docker
curl -fsSL https://get.docker.com | sh
sudo usermod -aG docker $USER
# Log out and back in
# Install Docker Compose
sudo apt install docker-compose-plugin
```
### 2. Clone/Copy the project
```bash
# Copy project to server
scp -r job-scraper user@your-server:~/
# Or clone from git if you pushed it
git clone <your-repo> ~/job-scraper
```
### 3. Run with Docker Compose
```bash
cd ~/job-scraper
# Run scraper once to populate data
docker compose run --rm scraper
# Start dashboard + scheduled scraper
docker compose up -d scraper-scheduled dashboard
# View logs
docker compose logs -f
```
### 4. Access the dashboard
Open `http://your-server:8080` in your browser.
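If the page does not load, a quick sanity check (assuming the scraper has already run at least once and written `data/dashboard.html`) is to ask nginx for the page directly from the server:
```bash
# Expect HTTP 200 with Content-Type: text/html
curl -I http://localhost:8080/
```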
### Optional: Use a reverse proxy
If you want HTTPS or a custom domain, add nginx or Caddy in front:
```bash
# Example with Caddy (auto HTTPS)
sudo apt install caddy
echo "jobs.yourdomain.com {
reverse_proxy localhost:8080
}" | sudo tee /etc/caddy/Caddyfile
sudo systemctl reload caddy
```
## Commands
```bash
# Run scraper once
docker compose run --rm scraper
# Run scraper with schedule (daily 9 AM)
docker compose up -d scraper-scheduled
# Start web dashboard
docker compose up -d dashboard
# View all jobs
docker compose run --rm scraper python main.py --list
# Stop everything
docker compose down
# View logs
docker compose logs -f scraper-scheduled
```
## Configuration
Edit `config.yaml` to:
- Add/remove companies (see the example below)
- Change location filters
- Configure email/Slack notifications
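For example, adding another Greenhouse-hosted company is one extra stanza under `companies:` in `config.yaml` (the company name and board token here are placeholders, not a real entry):
```yaml
companies:
  # ...existing entries...
  - name: ExampleCo          # placeholder company name
    platform: greenhouse
    board_token: exampleco   # placeholder Greenhouse board token
```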
## Dashboard Features
- Dark theme, monospace font
- Filter jobs by typing (press `/` to focus, `Esc` to clear)
- Color-coded tags: `remote`, `canada`, `berlin`
- Jump to company links
- Updates automatically when scraper runs
## Project Structure
```
job-scraper/
├── main.py # CLI entry point
├── db.py # SQLite database
├── dashboard.py # HTML generator
├── notify.py # Notifications
├── scrapers/ # Platform scrapers
│ ├── base.py # Base class
│ ├── greenhouse.py # Greenhouse API
│ ├── lever.py # Lever API
│ └── ashby.py # Ashby API
├── config.yaml # Company list & settings
├── Dockerfile
├── docker-compose.yaml
└── data/
├── jobs.db # SQLite database
└── dashboard.html # Generated dashboard
```

config.yaml (new file)

@@ -0,0 +1,116 @@
# Job Scraper Configuration
# ===========================
# Location filters - jobs matching these locations will be highlighted
location_filters:
- remote
- canada
- toronto
- vancouver
- berlin
- germany
# Job title filters - only jobs containing these keywords will be tracked
# Leave empty or remove to track all jobs
title_filters:
- engineer
- developer
- software
- sre
- devops
- infrastructure
- platform
- backend
- frontend
- fullstack
- full-stack
- security
# Companies to monitor
# Each company needs: name, platform, and platform-specific config
companies:
# Privacy & Security Focused
- name: Signal
platform: lever
lever_company: signal
- name: DuckDuckGo
platform: ashby
ashby_company: duck-duck-go
- name: 1Password
platform: ashby
ashby_company: 1password
- name: Bitwarden
platform: greenhouse
board_token: bitwarden
# Open Source Infrastructure & DevTools
- name: GrafanaLabs
platform: greenhouse
board_token: grafanalabs
- name: GitLab
platform: greenhouse
board_token: gitlab
- name: Sourcegraph
platform: greenhouse
board_token: sourcegraph91
- name: Supabase
platform: ashby
ashby_company: supabase
- name: Tailscale
platform: greenhouse
board_token: tailscale
- name: HashiCorp
platform: greenhouse
board_token: hashicorp
# Developer Tools & Platforms
- name: Automattic
platform: greenhouse
board_token: automatticcareers
- name: Canonical
platform: greenhouse
board_token: canonical
- name: ClickHouse
platform: greenhouse
board_token: clickhouse
- name: Cloudflare
platform: greenhouse
board_token: cloudflare
# Notification settings (optional - configure as needed)
notifications:
# Console output is always enabled
console: true
# Uncomment and configure for email notifications
# email:
# smtp_host: smtp.gmail.com
# smtp_port: 587
# username: your-email@gmail.com
# password: your-app-password
# from_addr: your-email@gmail.com
# to_addr: your-email@gmail.com
# Uncomment for Slack webhook
# slack:
# webhook_url: https://hooks.slack.com/services/YOUR/WEBHOOK/URL
# Scraper settings
scraper:
# Delay between requests in seconds (be respectful!)
request_delay: 2
# Timeout for requests in seconds
timeout: 30
# Number of retries on failure
retries: 3

dashboard.py (new file)

@@ -0,0 +1,385 @@
#!/usr/bin/env python3
"""
Generate a simple text-based HTML dashboard of all tracked jobs.
"""
from datetime import datetime
from pathlib import Path
from db import Database
def generate_dashboard(output_path: str = "data/dashboard.html"):
"""Generate a static HTML dashboard."""
db = Database()
jobs = db.get_all_active_jobs()
# Group by company
companies = {}
for company_name, job in jobs:
if company_name not in companies:
companies[company_name] = []
companies[company_name].append(job)
# Sort companies by name
sorted_companies = sorted(companies.items())
html = f"""<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Job Board</title>
<style>
:root {{
--bg: #1a1a1a;
--fg: #e0e0e0;
--accent: #4a9eff;
--muted: #888;
--border: #333;
--highlight: #2a2a2a;
}}
* {{ box-sizing: border-box; margin: 0; padding: 0; }}
body {{
font-family: "SF Mono", "Monaco", "Inconsolata", "Fira Code", monospace;
font-size: 14px;
line-height: 1.6;
background: var(--bg);
color: var(--fg);
padding: 20px;
max-width: 1200px;
margin: 0 auto;
}}
header {{
border-bottom: 1px solid var(--border);
padding-bottom: 15px;
margin-bottom: 20px;
}}
h1 {{
font-size: 18px;
font-weight: normal;
color: var(--accent);
}}
.meta {{
color: var(--muted);
font-size: 12px;
margin-top: 5px;
}}
.filters {{
margin: 15px 0;
padding: 10px;
background: var(--highlight);
border-radius: 4px;
}}
.filters input {{
background: var(--bg);
border: 1px solid var(--border);
color: var(--fg);
padding: 8px 12px;
width: 100%;
max-width: 400px;
font-family: inherit;
font-size: 14px;
border-radius: 4px;
}}
.filters input:focus {{
outline: none;
border-color: var(--accent);
}}
.stats {{
display: flex;
gap: 20px;
margin: 10px 0;
font-size: 12px;
color: var(--muted);
}}
.company {{
margin-bottom: 25px;
}}
.company-header {{
display: flex;
align-items: baseline;
gap: 10px;
padding: 8px 0;
border-bottom: 1px solid var(--border);
cursor: pointer;
}}
.company-header:hover {{
color: var(--accent);
}}
.company-name {{
font-weight: bold;
color: var(--accent);
}}
.company-count {{
color: var(--muted);
font-size: 12px;
}}
.jobs {{
margin-left: 20px;
}}
.job {{
padding: 6px 0;
border-bottom: 1px solid var(--border);
display: grid;
grid-template-columns: 1fr 180px;
gap: 10px;
align-items: baseline;
}}
.job:last-child {{
border-bottom: none;
}}
.job:hover {{
background: var(--highlight);
}}
.job-title {{
overflow: hidden;
text-overflow: ellipsis;
}}
.job-title a {{
color: var(--fg);
text-decoration: none;
}}
.job-title a:hover {{
color: var(--accent);
text-decoration: underline;
}}
.job-location {{
color: var(--muted);
font-size: 12px;
text-align: right;
}}
.tag {{
display: inline-block;
padding: 2px 6px;
border-radius: 3px;
font-size: 11px;
margin-left: 5px;
}}
.tag-remote {{
background: #1a4a1a;
color: #4ade80;
}}
.tag-canada {{
background: #4a1a1a;
color: #f87171;
}}
.tag-berlin {{
background: #4a4a1a;
color: #facc15;
}}
.hidden {{
display: none;
}}
.toc {{
margin: 20px 0;
padding: 15px;
background: var(--highlight);
border-radius: 4px;
}}
.toc-title {{
font-size: 12px;
color: var(--muted);
margin-bottom: 10px;
}}
.toc-links {{
display: flex;
flex-wrap: wrap;
gap: 10px;
}}
.toc-links a {{
color: var(--accent);
text-decoration: none;
font-size: 13px;
}}
.toc-links a:hover {{
text-decoration: underline;
}}
.filter-buttons {{
display: flex;
flex-wrap: wrap;
gap: 8px;
margin-top: 10px;
}}
.filter-btn {{
background: var(--bg);
border: 1px solid var(--border);
color: var(--muted);
padding: 4px 12px;
font-family: inherit;
font-size: 12px;
border-radius: 4px;
cursor: pointer;
transition: all 0.15s;
}}
.filter-btn:hover {{
border-color: var(--accent);
color: var(--fg);
}}
.filter-btn.active {{
background: var(--accent);
border-color: var(--accent);
color: var(--bg);
}}
</style>
</head>
<body>
<header>
<h1>$ job-board</h1>
<div class="meta">
Last updated: {datetime.now().strftime("%Y-%m-%d %H:%M:%S")} |
{len(jobs)} jobs across {len(companies)} companies
</div>
</header>
<div class="filters">
<input type="text" id="search" placeholder="Filter jobs... (e.g. 'senior engineer', 'remote', 'canada')" autofocus>
<div class="filter-buttons">
<button class="filter-btn" data-filter="">All</button>
<button class="filter-btn" data-filter="engineer">Engineering</button>
<button class="filter-btn" data-filter="senior engineer">Senior Eng</button>
<button class="filter-btn" data-filter="staff principal">Staff+</button>
<button class="filter-btn" data-filter="manager director">Management</button>
<button class="filter-btn" data-filter="product">Product</button>
<button class="filter-btn" data-filter="design">Design</button>
<button class="filter-btn" data-filter="security">Security</button>
<button class="filter-btn" data-filter="remote">Remote</button>
<button class="filter-btn" data-filter="canada toronto vancouver">Canada</button>
<button class="filter-btn" data-filter="berlin germany">Berlin</button>
</div>
<div class="stats">
<span id="visible-count">{len(jobs)} jobs shown</span>
</div>
</div>
<div class="toc">
<div class="toc-title">Jump to company:</div>
<div class="toc-links">
"""
# Table of contents
for company_name, company_jobs in sorted_companies:
anchor = company_name.lower().replace(" ", "-")
html += f' <a href="#{anchor}">{company_name} ({len(company_jobs)})</a>\n'
html += """ </div>
</div>
<main id="job-list">
"""
# Job listings
for company_name, company_jobs in sorted_companies:
anchor = company_name.lower().replace(" ", "-")
html += f"""
<div class="company" id="{anchor}">
<div class="company-header">
<span class="company-name">{company_name}</span>
<span class="company-count">{len(company_jobs)} positions</span>
</div>
<div class="jobs">
"""
for job in sorted(company_jobs, key=lambda j: j.title):
location = job.location or ""
location_lower = location.lower()
# Tags
tags = ""
if job.remote_type == "remote" or "remote" in location_lower:
tags += '<span class="tag tag-remote">remote</span>'
if "canada" in location_lower or "toronto" in location_lower or "vancouver" in location_lower:
tags += '<span class="tag tag-canada">canada</span>'
if "berlin" in location_lower or "germany" in location_lower:
tags += '<span class="tag tag-berlin">berlin</span>'
html += f""" <div class="job" data-search="{job.title.lower()} {location_lower} {(job.department or '').lower()}">
<span class="job-title"><a href="{job.url}" target="_blank">{job.title}</a>{tags}</span>
<span class="job-location">{location}</span>
</div>
"""
html += """ </div>
</div>
"""
html += """ </main>
<script>
const search = document.getElementById('search');
const jobs = document.querySelectorAll('.job');
const companies = document.querySelectorAll('.company');
const visibleCount = document.getElementById('visible-count');
const filterBtns = document.querySelectorAll('.filter-btn');
function filterJobs(query) {
let visible = 0;
const terms = query.toLowerCase().trim().split(/\\s+/).filter(t => t);
companies.forEach(company => {
const companyJobs = company.querySelectorAll('.job');
let companyVisible = 0;
companyJobs.forEach(job => {
const searchText = job.dataset.search;
// Match if ANY term matches (OR logic for filter buttons)
const matches = terms.length === 0 || terms.some(term => searchText.includes(term));
job.classList.toggle('hidden', !matches);
if (matches) {
companyVisible++;
visible++;
}
});
company.classList.toggle('hidden', companyVisible === 0);
});
visibleCount.textContent = `${visible} jobs shown`;
}
search.addEventListener('input', (e) => {
// Clear active button when typing
filterBtns.forEach(btn => btn.classList.remove('active'));
filterJobs(e.target.value);
});
// Filter buttons
filterBtns.forEach(btn => {
btn.addEventListener('click', () => {
const filter = btn.dataset.filter;
search.value = filter;
filterBtns.forEach(b => b.classList.remove('active'));
btn.classList.add('active');
filterJobs(filter);
});
});
// Keyboard shortcut: / to focus search
document.addEventListener('keydown', (e) => {
if (e.key === '/' && document.activeElement !== search) {
e.preventDefault();
search.focus();
}
if (e.key === 'Escape') {
search.value = '';
filterBtns.forEach(b => b.classList.remove('active'));
filterJobs('');
search.blur();
}
});
// Set "All" as active by default
filterBtns[0].classList.add('active');
</script>
</body>
</html>
"""
# Write the file
output = Path(output_path)
output.parent.mkdir(parents=True, exist_ok=True)
output.write_text(html)
print(f"Dashboard generated: {output_path}")
return output_path
if __name__ == "__main__":
generate_dashboard()

db.py (new file)

@@ -0,0 +1,238 @@
import sqlite3
from contextlib import contextmanager
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from typing import Optional
from scrapers.base import Job
@dataclass
class StoredJob:
"""A job stored in the database."""
id: int
company_id: int
external_id: str
title: str
url: str
location: Optional[str]
department: Optional[str]
remote_type: Optional[str]
first_seen: datetime
last_seen: datetime
status: str # 'active' or 'removed'
class Database:
"""SQLite database for storing job listings."""
def __init__(self, db_path: str = "data/jobs.db"):
self.db_path = Path(db_path)
self.db_path.parent.mkdir(parents=True, exist_ok=True)
self._init_db()
@contextmanager
def _get_conn(self):
"""Get a database connection."""
conn = sqlite3.connect(self.db_path)
conn.row_factory = sqlite3.Row
try:
yield conn
conn.commit()
finally:
conn.close()
def _init_db(self):
"""Initialize the database schema."""
with self._get_conn() as conn:
conn.executescript("""
CREATE TABLE IF NOT EXISTS companies (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL UNIQUE,
jobs_url TEXT,
platform_type TEXT,
last_scraped TIMESTAMP,
active BOOLEAN DEFAULT TRUE
);
CREATE TABLE IF NOT EXISTS jobs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
company_id INTEGER REFERENCES companies(id),
external_id TEXT NOT NULL,
title TEXT NOT NULL,
url TEXT NOT NULL,
location TEXT,
department TEXT,
remote_type TEXT,
first_seen TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
last_seen TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
status TEXT DEFAULT 'active',
UNIQUE(company_id, external_id)
);
CREATE INDEX IF NOT EXISTS idx_jobs_company ON jobs(company_id);
CREATE INDEX IF NOT EXISTS idx_jobs_status ON jobs(status);
""")
def get_or_create_company(self, name: str, jobs_url: str = None, platform_type: str = None) -> int:
"""Get or create a company and return its ID."""
with self._get_conn() as conn:
cursor = conn.execute(
"SELECT id FROM companies WHERE name = ?", (name,)
)
row = cursor.fetchone()
if row:
return row["id"]
cursor = conn.execute(
"INSERT INTO companies (name, jobs_url, platform_type) VALUES (?, ?, ?)",
(name, jobs_url, platform_type)
)
return cursor.lastrowid
def update_company_scraped(self, company_id: int):
"""Update the last_scraped timestamp for a company."""
with self._get_conn() as conn:
conn.execute(
"UPDATE companies SET last_scraped = ? WHERE id = ?",
(datetime.now(), company_id)
)
def get_active_jobs(self, company_id: int) -> dict[str, StoredJob]:
"""Get all active jobs for a company, keyed by external_id."""
with self._get_conn() as conn:
cursor = conn.execute(
"""SELECT * FROM jobs WHERE company_id = ? AND status = 'active'""",
(company_id,)
)
jobs = {}
for row in cursor.fetchall():
job = StoredJob(
id=row["id"],
company_id=row["company_id"],
external_id=row["external_id"],
title=row["title"],
url=row["url"],
location=row["location"],
department=row["department"],
remote_type=row["remote_type"],
first_seen=row["first_seen"],
last_seen=row["last_seen"],
status=row["status"]
)
jobs[job.external_id] = job
return jobs
def upsert_job(self, company_id: int, job: Job) -> tuple[bool, Optional[StoredJob]]:
"""
Insert or update a job.
Returns (is_new, old_job) where old_job is the previous version if it existed.
"""
with self._get_conn() as conn:
# Check if job exists
cursor = conn.execute(
"SELECT * FROM jobs WHERE company_id = ? AND external_id = ?",
(company_id, job.external_id)
)
existing = cursor.fetchone()
if existing:
# Update last_seen and ensure status is active
conn.execute(
"""UPDATE jobs SET
title = ?, url = ?, location = ?, department = ?,
remote_type = ?, last_seen = ?, status = 'active'
WHERE id = ?""",
(job.title, job.url, job.location, job.department,
job.remote_type, datetime.now(), existing["id"])
)
old_job = StoredJob(
id=existing["id"],
company_id=existing["company_id"],
external_id=existing["external_id"],
title=existing["title"],
url=existing["url"],
location=existing["location"],
department=existing["department"],
remote_type=existing["remote_type"],
first_seen=existing["first_seen"],
last_seen=existing["last_seen"],
status=existing["status"]
)
return False, old_job
else:
# Insert new job
conn.execute(
"""INSERT INTO jobs
(company_id, external_id, title, url, location, department, remote_type)
VALUES (?, ?, ?, ?, ?, ?, ?)""",
(company_id, job.external_id, job.title, job.url,
job.location, job.department, job.remote_type)
)
return True, None
def mark_jobs_removed(self, company_id: int, external_ids: set[str]) -> list[StoredJob]:
"""Mark jobs as removed. Returns the jobs that were marked removed."""
if not external_ids:
return []
removed = []
with self._get_conn() as conn:
placeholders = ",".join("?" * len(external_ids))
cursor = conn.execute(
f"""SELECT * FROM jobs
WHERE company_id = ? AND external_id IN ({placeholders}) AND status = 'active'""",
(company_id, *external_ids)
)
for row in cursor.fetchall():
removed.append(StoredJob(
id=row["id"],
company_id=row["company_id"],
external_id=row["external_id"],
title=row["title"],
url=row["url"],
location=row["location"],
department=row["department"],
remote_type=row["remote_type"],
first_seen=row["first_seen"],
last_seen=row["last_seen"],
status=row["status"]
))
conn.execute(
f"""UPDATE jobs SET status = 'removed', last_seen = ?
WHERE company_id = ? AND external_id IN ({placeholders})""",
(datetime.now(), company_id, *external_ids)
)
return removed
def get_all_active_jobs(self) -> list[tuple[str, StoredJob]]:
"""Get all active jobs across all companies. Returns (company_name, job) tuples."""
with self._get_conn() as conn:
cursor = conn.execute(
"""SELECT c.name as company_name, j.*
FROM jobs j
JOIN companies c ON j.company_id = c.id
WHERE j.status = 'active'
ORDER BY c.name, j.title"""
)
results = []
for row in cursor.fetchall():
job = StoredJob(
id=row["id"],
company_id=row["company_id"],
external_id=row["external_id"],
title=row["title"],
url=row["url"],
location=row["location"],
department=row["department"],
remote_type=row["remote_type"],
first_seen=row["first_seen"],
last_seen=row["last_seen"],
status=row["status"]
)
results.append((row["company_name"], job))
return results

docker-compose.yaml (new file)

@@ -0,0 +1,35 @@
services:
# Run scraper once (for manual/cron triggering)
scraper:
build: .
container_name: job-scraper
volumes:
- ./data:/app/data
- ./config.yaml:/app/config.yaml:ro
environment:
- TZ=America/Toronto
# Scheduled scraper - runs daily at 9 AM
scraper-scheduled:
build: .
container_name: job-scraper-scheduled
volumes:
- ./data:/app/data
- ./config.yaml:/app/config.yaml:ro
environment:
- TZ=America/Toronto
command: ["python", "main.py", "--schedule"]
restart: unless-stopped
# Web dashboard - lightweight static file server
dashboard:
image: nginx:alpine
container_name: job-dashboard
ports:
- "8080:80"
volumes:
- ./data:/usr/share/nginx/html:ro
- ./nginx.conf:/etc/nginx/conf.d/default.conf:ro
restart: unless-stopped
depends_on:
- scraper

main.py (new file)

@@ -0,0 +1,246 @@
#!/usr/bin/env python3
"""
Job Scraper - Monitor job openings from companies you're interested in.
Usage:
python main.py # Run once
python main.py --schedule # Run daily at 09:00
python main.py --list # List all tracked jobs
"""
import argparse
import sys
import time
from datetime import datetime
from pathlib import Path
import yaml
from db import Database
from notify import ChangeReport, Notifier
from scrapers import AshbyScraper, GreenhouseScraper, LeverScraper
from scrapers.base import BaseScraper, Job
from dashboard import generate_dashboard
def load_config(config_path: str = "config.yaml") -> dict:
"""Load configuration from YAML file."""
with open(config_path) as f:
return yaml.safe_load(f)
def get_scraper(company_config: dict) -> BaseScraper:
"""Create a scraper instance based on company configuration."""
platform = company_config["platform"]
name = company_config["name"]
if platform == "greenhouse":
return GreenhouseScraper(name, company_config["board_token"])
elif platform == "lever":
return LeverScraper(name, company_config["lever_company"])
elif platform == "ashby":
return AshbyScraper(name, company_config["ashby_company"])
else:
raise ValueError(f"Unknown platform: {platform}")
def filter_jobs_by_title(jobs: list[Job], title_filters: list[str]) -> list[Job]:
"""Filter jobs to only include those matching title keywords."""
if not title_filters:
return jobs
filtered = []
for job in jobs:
title_lower = job.title.lower()
if any(keyword.lower() in title_lower for keyword in title_filters):
filtered.append(job)
return filtered
def scrape_company(company_config: dict, db: Database, config: dict) -> ChangeReport:
"""Scrape jobs for a single company and detect changes."""
name = company_config["name"]
print(f"\n🔍 Scraping {name}...", end=" ", flush=True)
try:
with get_scraper(company_config) as scraper:
# Get current jobs from the career page
all_jobs = scraper.scrape()
# Filter by title keywords if configured
title_filters = config.get("title_filters", [])
current_jobs = filter_jobs_by_title(all_jobs, title_filters)
print(f"found {len(current_jobs)} jobs (of {len(all_jobs)} total)")
# Get or create company in database
company_id = db.get_or_create_company(
name,
jobs_url=company_config.get("board_token", company_config.get("lever_company", "")),
platform_type=company_config["platform"]
)
# Get stored jobs
stored_jobs = db.get_active_jobs(company_id)
# Detect changes
current_ids = {job.external_id for job in current_jobs}
stored_ids = set(stored_jobs.keys())
new_ids = current_ids - stored_ids
removed_ids = stored_ids - current_ids
# Process new jobs
new_jobs = []
for job in current_jobs:
is_new, _ = db.upsert_job(company_id, job)
if is_new:
new_jobs.append(job)
# Mark removed jobs
removed_jobs = db.mark_jobs_removed(company_id, removed_ids)
# Update last scraped time
db.update_company_scraped(company_id)
# Apply location filters to highlight relevant jobs
location_filters = config.get("location_filters", [])
if location_filters and new_jobs:
relevant_new = []
for job in new_jobs:
if job.location:
loc_lower = job.location.lower()
if any(f.lower() in loc_lower for f in location_filters):
relevant_new.append(job)
elif job.remote_type == "remote":
relevant_new.append(job)
if relevant_new:
print(f"{len(relevant_new)} jobs match your location filters!")
return ChangeReport(
company_name=name,
new_jobs=new_jobs,
removed_jobs=removed_jobs,
total_active=len(current_jobs)
)
except Exception as e:
print(f"ERROR: {e}")
return ChangeReport(
company_name=name,
new_jobs=[],
removed_jobs=[],
total_active=0
)
def run_scraper(config: dict):
"""Run the scraper for all configured companies."""
print(f"\n{'=' * 60}")
print(f"Job Scraper - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
print(f"{'=' * 60}")
db = Database()
notifier = Notifier(config.get("notifications", {}))
companies = config.get("companies", [])
print(f"\nMonitoring {len(companies)} companies...")
reports = []
delay = config.get("scraper", {}).get("request_delay", 2)
for i, company_config in enumerate(companies):
report = scrape_company(company_config, db, config)
reports.append(report)
# Delay between companies (be respectful!)
if i < len(companies) - 1:
time.sleep(delay)
# Send notifications
notifier.notify(reports)
# Summary
total_jobs = sum(r.total_active for r in reports)
total_new = sum(len(r.new_jobs) for r in reports)
total_removed = sum(len(r.removed_jobs) for r in reports)
print(f"\n📊 Total: {total_jobs} active jobs across {len(companies)} companies")
print(f" Changes: +{total_new} new, -{total_removed} removed")
# Generate dashboard
generate_dashboard()
def list_jobs(config: dict):
"""List all tracked jobs."""
db = Database()
jobs = db.get_all_active_jobs()
if not jobs:
print("No jobs tracked yet. Run the scraper first.")
return
print(f"\n{'=' * 60}")
print(f"All Tracked Jobs ({len(jobs)} total)")
print(f"{'=' * 60}")
current_company = None
for company_name, job in jobs:
if company_name != current_company:
print(f"\n📌 {company_name}")
print("-" * 40)
current_company = company_name
location = f" [{job.location}]" if job.location else ""
remote = " 🏠" if job.remote_type == "remote" else ""
print(f"{job.title}{location}{remote}")
print(f" {job.url}")
def run_scheduled(config: dict):
"""Run the scraper on a schedule."""
import schedule
print("Starting scheduled job scraper...")
print("Will run daily at 09:00")
print("Press Ctrl+C to stop\n")
# Run immediately on start
run_scraper(config)
# Schedule daily run
schedule.every().day.at("09:00").do(run_scraper, config)
while True:
schedule.run_pending()
time.sleep(60)
def main():
parser = argparse.ArgumentParser(description="Job Scraper - Monitor job openings")
parser.add_argument("--config", default="config.yaml", help="Path to config file")
parser.add_argument("--schedule", action="store_true", help="Run on a schedule")
parser.add_argument("--list", action="store_true", help="List all tracked jobs")
args = parser.parse_args()
# Load config
config_path = Path(args.config)
if not config_path.exists():
print(f"Error: Config file not found: {config_path}")
sys.exit(1)
config = load_config(args.config)
if args.list:
list_jobs(config)
elif args.schedule:
run_scheduled(config)
else:
run_scraper(config)
if __name__ == "__main__":
main()

nginx.conf (new file)

@@ -0,0 +1,24 @@
server {
listen 80;
server_name _;
root /usr/share/nginx/html;
# Serve dashboard.html as the index
location / {
try_files /dashboard.html =404;
}
# Cache static assets
location ~* \.(html|css|js)$ {
expires 5m;
add_header Cache-Control "public, no-transform";
}
# Security headers
add_header X-Frame-Options "SAMEORIGIN" always;
add_header X-Content-Type-Options "nosniff" always;
# Gzip
gzip on;
gzip_types text/html text/css application/javascript;
}

notify.py (new file)

@@ -0,0 +1,178 @@
from dataclasses import dataclass
from typing import Optional
import json
from db import StoredJob
from scrapers.base import Job
@dataclass
class ChangeReport:
"""Report of changes detected during a scrape."""
company_name: str
new_jobs: list[Job]
removed_jobs: list[StoredJob]
total_active: int
class Notifier:
"""Handles notifications for job changes."""
def __init__(self, config: dict):
self.config = config
def notify(self, reports: list[ChangeReport]):
"""Send notifications for all changes."""
# Filter to only reports with changes
reports_with_changes = [r for r in reports if r.new_jobs or r.removed_jobs]
if not reports_with_changes:
print("\n✓ No changes detected across all companies.")
return
# Console output (always)
self._notify_console(reports_with_changes)
# Email (if configured)
email_config = self.config.get("email")
if email_config:
self._notify_email(reports_with_changes, email_config)
# Slack (if configured)
slack_config = self.config.get("slack")
if slack_config:
self._notify_slack(reports_with_changes, slack_config)
def _notify_console(self, reports: list[ChangeReport]):
"""Print changes to console."""
print("\n" + "=" * 60)
print("JOB CHANGES DETECTED")
print("=" * 60)
total_new = sum(len(r.new_jobs) for r in reports)
total_removed = sum(len(r.removed_jobs) for r in reports)
print(f"\nSummary: {total_new} new jobs, {total_removed} removed jobs\n")
for report in reports:
print(f"\n📌 {report.company_name} ({report.total_active} active jobs)")
print("-" * 40)
if report.new_jobs:
print(f"\n 🆕 NEW JOBS ({len(report.new_jobs)}):")
for job in report.new_jobs:
location_str = f" [{job.location}]" if job.location else ""
remote_str = f" 🏠" if job.remote_type == "remote" else ""
print(f"{job.title}{location_str}{remote_str}")
print(f" {job.url}")
if report.removed_jobs:
print(f"\n ❌ REMOVED JOBS ({len(report.removed_jobs)}):")
for job in report.removed_jobs:
print(f"{job.title}")
print("\n" + "=" * 60)
def _notify_email(self, reports: list[ChangeReport], config: dict):
"""Send email notification."""
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
# Build email body
body = self._build_html_report(reports)
msg = MIMEMultipart("alternative")
msg["Subject"] = f"Job Alert: {sum(len(r.new_jobs) for r in reports)} new positions"
msg["From"] = config["from_addr"]
msg["To"] = config["to_addr"]
msg.attach(MIMEText(body, "html"))
try:
with smtplib.SMTP(config["smtp_host"], config["smtp_port"]) as server:
server.starttls()
server.login(config["username"], config["password"])
server.send_message(msg)
print("✓ Email notification sent")
except Exception as e:
print(f"✗ Failed to send email: {e}")
def _notify_slack(self, reports: list[ChangeReport], config: dict):
"""Send Slack notification."""
import httpx
blocks = []
# Header
total_new = sum(len(r.new_jobs) for r in reports)
blocks.append({
"type": "header",
"text": {"type": "plain_text", "text": f"🔔 {total_new} New Job Openings"}
})
for report in reports:
if report.new_jobs:
blocks.append({"type": "divider"})
blocks.append({
"type": "section",
"text": {
"type": "mrkdwn",
"text": f"*{report.company_name}* ({len(report.new_jobs)} new)"
}
})
for job in report.new_jobs[:5]: # Limit to 5 per company
location = f"{job.location}" if job.location else ""
blocks.append({
"type": "section",
"text": {
"type": "mrkdwn",
"text": f"<{job.url}|{job.title}>{location}"
}
})
payload = {"blocks": blocks}
try:
response = httpx.post(config["webhook_url"], json=payload)
response.raise_for_status()
print("✓ Slack notification sent")
except Exception as e:
print(f"✗ Failed to send Slack notification: {e}")
def _build_html_report(self, reports: list[ChangeReport]) -> str:
"""Build HTML email body."""
total_new = sum(len(r.new_jobs) for r in reports)
html = f"""
<html>
<body style="font-family: Arial, sans-serif; max-width: 600px; margin: 0 auto;">
<h1 style="color: #333;">🔔 {total_new} New Job Openings</h1>
"""
for report in reports:
if report.new_jobs:
html += f"""
<h2 style="color: #666; border-bottom: 1px solid #ddd; padding-bottom: 5px;">
{report.company_name}
</h2>
<ul>
"""
for job in report.new_jobs:
location = f" <span style='color: #888;'>({job.location})</span>" if job.location else ""
html += f"""
<li style="margin: 10px 0;">
<a href="{job.url}" style="color: #0066cc; text-decoration: none;">
{job.title}
</a>
{location}
</li>
"""
html += "</ul>"
html += """
</body>
</html>
"""
return html

requirements.txt (new file)

@@ -0,0 +1,5 @@
httpx>=0.27.0
beautifulsoup4>=4.12.0
lxml>=5.0.0
pyyaml>=6.0
schedule>=1.2.0

scrapers/__init__.py (new file)

@@ -0,0 +1,6 @@
from .base import BaseScraper, Job
from .greenhouse import GreenhouseScraper
from .lever import LeverScraper
from .ashby import AshbyScraper
__all__ = ["BaseScraper", "Job", "GreenhouseScraper", "LeverScraper", "AshbyScraper"]

scrapers/ashby.py (new file)

@@ -0,0 +1,51 @@
from .base import BaseScraper, Job
class AshbyScraper(BaseScraper):
"""
Scraper for companies using Ashby.
Ashby provides a JSON API endpoint.
Example: https://api.ashbyhq.com/posting-api/job-board/{company}
"""
def __init__(self, company_name: str, ashby_company: str, **kwargs):
# Ashby API endpoint
jobs_url = f"https://api.ashbyhq.com/posting-api/job-board/{ashby_company}"
super().__init__(company_name, jobs_url, **kwargs)
self.ashby_company = ashby_company
def scrape(self) -> list[Job]:
"""Scrape jobs from Ashby API."""
data = self.fetch_json()
jobs = []
for job_data in data.get("jobs", []):
job_id = job_data.get("id", "")
title = job_data.get("title", "")
job_url = job_data.get("jobUrl", "")
# Location info
location = job_data.get("location", "")
department = job_data.get("department", "")
# Employment type
employment_type = job_data.get("employmentType", "")
# Check for remote
is_remote = job_data.get("isRemote", False)
if is_remote:
remote_type = "remote"
else:
remote_type = self.classify_remote(location)
jobs.append(Job(
external_id=job_id,
title=title,
url=job_url,
location=location,
department=department,
remote_type=remote_type
))
return jobs

scrapers/base.py (new file)

@@ -0,0 +1,76 @@
from abc import ABC, abstractmethod
from dataclasses import dataclass
from datetime import datetime
from typing import Optional
import httpx
@dataclass
class Job:
"""Represents a job listing."""
external_id: str
title: str
url: str
location: Optional[str] = None
department: Optional[str] = None
remote_type: Optional[str] = None # 'remote', 'hybrid', 'onsite'
def __hash__(self):
return hash(self.external_id)
def __eq__(self, other):
if isinstance(other, Job):
return self.external_id == other.external_id
return False
class BaseScraper(ABC):
"""Base class for all job scrapers."""
def __init__(self, company_name: str, jobs_url: str, timeout: int = 30):
self.company_name = company_name
self.jobs_url = jobs_url
self.timeout = timeout
self.client = httpx.Client(
timeout=timeout,
headers={
"User-Agent": "JobScraper/1.0 (Personal job search tool)"
},
follow_redirects=True
)
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.client.close()
def fetch(self, url: Optional[str] = None) -> str:
"""Fetch the content from a URL."""
target_url = url or self.jobs_url
response = self.client.get(target_url)
response.raise_for_status()
return response.text
def fetch_json(self, url: Optional[str] = None) -> dict:
"""Fetch JSON from a URL."""
target_url = url or self.jobs_url
response = self.client.get(target_url)
response.raise_for_status()
return response.json()
@abstractmethod
def scrape(self) -> list[Job]:
"""Scrape jobs from the company's career page. Must be implemented by subclasses."""
pass
def classify_remote(self, location: str) -> Optional[str]:
"""Try to classify if a job is remote based on location text."""
if not location:
return None
location_lower = location.lower()
if "remote" in location_lower:
if "hybrid" in location_lower:
return "hybrid"
return "remote"
return "onsite"

scrapers/greenhouse.py (new file)

@@ -0,0 +1,42 @@
from .base import BaseScraper, Job
class GreenhouseScraper(BaseScraper):
"""
Scraper for companies using Greenhouse.
Greenhouse provides a public JSON job-board API.
Example: https://boards-api.greenhouse.io/v1/boards/{company}/jobs
"""
def __init__(self, company_name: str, board_token: str, **kwargs):
# Greenhouse API endpoint
jobs_url = f"https://boards-api.greenhouse.io/v1/boards/{board_token}/jobs"
super().__init__(company_name, jobs_url, **kwargs)
self.board_token = board_token
def scrape(self) -> list[Job]:
"""Scrape jobs from Greenhouse API."""
data = self.fetch_json()
jobs = []
for job_data in data.get("jobs", []):
job_id = str(job_data.get("id", ""))
title = job_data.get("title", "")
location = job_data.get("location", {}).get("name", "")
absolute_url = job_data.get("absolute_url", "")
# Get department if available
departments = job_data.get("departments", [])
department = departments[0].get("name") if departments else None
jobs.append(Job(
external_id=job_id,
title=title,
url=absolute_url,
location=location,
department=department,
remote_type=self.classify_remote(location)
))
return jobs

scrapers/lever.py (new file)

@@ -0,0 +1,50 @@
from .base import BaseScraper, Job
class LeverScraper(BaseScraper):
"""
Scraper for companies using Lever.
Lever provides a JSON API at /v0/postings/{company} endpoint.
Example: https://api.lever.co/v0/postings/{company}
"""
def __init__(self, company_name: str, lever_company: str, **kwargs):
# Lever API endpoint
jobs_url = f"https://api.lever.co/v0/postings/{lever_company}"
super().__init__(company_name, jobs_url, **kwargs)
self.lever_company = lever_company
def scrape(self) -> list[Job]:
"""Scrape jobs from Lever API."""
data = self.fetch_json()
jobs = []
for job_data in data:
job_id = job_data.get("id", "")
title = job_data.get("text", "")
hosted_url = job_data.get("hostedUrl", "")
# Location info
categories = job_data.get("categories", {})
location = categories.get("location", "")
department = categories.get("department", "")
commitment = categories.get("commitment", "") # Full-time, Part-time, etc.
# Check for remote in work type
work_type = categories.get("workplaceType", "")
if work_type:
remote_type = self.classify_remote(work_type)
else:
remote_type = self.classify_remote(location)
jobs.append(Job(
external_id=job_id,
title=title,
url=hosted_url,
location=location,
department=department,
remote_type=remote_type
))
return jobs