This page is an advanced user guide for jobcurator.
For installation and basic usage, see README.md. This page explains how the main classes (Job, Category, etc.) fit together, and how to use the incremental storage API.

## Job

A Job is the main unit you pass to JobCurator.
Typical fields:
- `id: str` — unique identifier for the job
- `title: str | None`
- `text: str | None`
- `categories: dict[str, list[Category]]`
- `location: Location3DField | None`
- `salary: SalaryField | None`
- `company: str | None`
- `contract_type: str | None`
- `source: str | None`
- `created_at: datetime | None`

Fields computed by JobCurator:

- `length_score: float`
- `completion_score_val: float`
- `quality: float` — used for ranking / selection
- `exact_hash: int`
- `signature: int` — 128-bit composite hash, used for diversity (Hamming distance)

You create Job objects, then JobCurator enriches them with quality + hash metadata.
## Category

Represents hierarchical category information (multi-level taxonomy).
Fields:
- `id: str`
- `label: str`
- `level: int` — depth (0 = root)
- `parent_id: str | None`
- `level_path: list[str]` — e.g. `["Engineering", "Software", "Backend"]`

A job can have multiple category dimensions at once:
```python
job.categories = {
    "job_function": [
        Category(
            id="backend",
            label="Backend",
            level=2,
            parent_id="software",
            level_path=["Engineering", "Software", "Backend"],
        )
    ],
    "industry": [
        Category(
            id="saas",
            label="SaaS",
            level=1,
            parent_id="tech",
            level_path=["Technology", "SaaS"],
        )
    ],
}
```
These categories are used in the hashing process (meta-hash, MinHash, FAISS vectors).
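To build intuition for the MinHash step, here is a self-contained sketch of how category paths could be turned into a token set and a MinHash signature. This is illustrative only (plain dicts stand in for Category objects, and the library's actual tokenization and hash functions are not shown here):

```python
import hashlib

def category_tokens(categories: dict[str, list]) -> set[str]:
    """Flatten every category dimension into 'dimension:path-prefix' tokens."""
    tokens = set()
    for dim, cats in categories.items():
        for cat in cats:
            path = cat["level_path"]
            for i in range(1, len(path) + 1):
                tokens.add(f"{dim}:" + "/".join(path[:i]))
    return tokens

def minhash(tokens: set[str], num_perm: int = 16) -> list[int]:
    """One seeded hash family per slot; keep the minimum hash per slot."""
    sig = []
    for seed in range(num_perm):
        slot_min = min(
            int.from_bytes(hashlib.sha1(f"{seed}:{t}".encode()).digest()[:8], "big")
            for t in tokens
        )
        sig.append(slot_min)
    return sig
```

Jobs with overlapping category paths share many tokens, so their MinHash signatures agree on many slots, which is what makes them land in the same LSH buckets.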
## Location3DField

A location with 3D coordinates for proper geo distance.
Inputs:

- `lat: float` — latitude (degrees)
- `lon: float` — longitude (degrees)
- `alt_m: float` — altitude in meters (optional)
- `city: str | None`
- `country_code: str | None`

Internal:

- `x, y, z: float` — Earth-centered 3D coordinates (computed once)

JobCurator uses these coordinates to compute geographic distances between jobs (e.g. for clustering with `max_cluster_distance_km`).
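The lat/lon → (x, y, z) conversion can be sketched as follows (a spherical-Earth approximation with mean radius; the library's exact Earth model is not specified here):

```python
import math

EARTH_RADIUS_M = 6_371_000.0  # mean Earth radius (spherical approximation)

def to_ecef(lat: float, lon: float, alt_m: float = 0.0) -> tuple[float, float, float]:
    """Convert latitude/longitude/altitude to Earth-centered (x, y, z) in meters."""
    lat_r, lon_r = math.radians(lat), math.radians(lon)
    r = EARTH_RADIUS_M + alt_m
    x = r * math.cos(lat_r) * math.cos(lon_r)
    y = r * math.cos(lat_r) * math.sin(lon_r)
    z = r * math.sin(lat_r)
    return x, y, z

def chord_distance_m(a: tuple[float, float, float], b: tuple[float, float, float]) -> float:
    """Straight-line distance between two 3D points; a cheap proxy for geo distance."""
    return math.dist(a, b)
```

Precomputing (x, y, z) once per job turns every later distance check into a plain Euclidean distance, which is much cheaper than repeated haversine calls.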
## SalaryField

Structured salary information:
- `min_value: float | None`
- `max_value: float | None`
- `currency: str` — e.g. `"EUR"`, `"USD"`
- `period: str` — e.g. `"year"`, `"month"`

Salary can be bucketized and used in the hashing / meta-hash steps.
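The bucketization scheme is not specified here, but the idea can be sketched as normalizing to a yearly amount and truncating into a coarse band, so near-identical salaries produce the same hash token (the band width and period table below are assumptions):

```python
PERIODS_PER_YEAR = {"year": 1, "month": 12, "week": 52, "day": 260, "hour": 2080}

def salary_bucket(value: float, period: str, currency: str, band_width: float = 10_000.0) -> str:
    """Normalize to a yearly amount, then truncate into a fixed-width band."""
    yearly = value * PERIODS_PER_YEAR[period]
    band = int(yearly // band_width)
    return f"{currency}:{band}"
```

Two postings at 50k and 52k per year fall into the same band, so a small salary difference alone does not break a duplicate match.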
## JobCurator

JobCurator works entirely in memory on a list of Job objects:
1. Scores quality — combines length, completion, freshness, etc. into a single quality score per job.
2. Computes hashes & signatures — an exact hash plus the 128-bit composite signature.
3. Clusters similar jobs — using LSH, MinHash, FAISS, etc.
4. Selects a subset (compression) — respects:
   - `ratio` (e.g. keep 50%),
   - `alpha` (quality vs diversity trade-off),
   - `max_per_cluster_in_pool`.

Canonical call:

```python
compressed_jobs = curator.dedupe_and_compress(jobs)
```
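The quality vs diversity selection can be illustrated with a self-contained greedy sketch over `(id, quality, signature)` tuples. This is not the library's actual implementation (it omits clustering and `max_per_cluster_in_pool` entirely), just the core trade-off:

```python
def hamming(a: int, b: int) -> int:
    """Number of differing bits between two integer signatures."""
    return bin(a ^ b).count("1")

def greedy_select(jobs, ratio: float, alpha: float, sig_bits: int = 128):
    """Greedily pick jobs maximizing alpha*quality + (1-alpha)*diversity.

    jobs: list of (job_id, quality, signature) tuples.
    Diversity of a candidate = min Hamming distance to already-selected
    jobs, normalized by the signature width.
    """
    k = max(1, round(len(jobs) * ratio))
    remaining = sorted(jobs, key=lambda j: j[1], reverse=True)
    selected = [remaining.pop(0)]  # seed with the highest-quality job
    while len(selected) < k and remaining:
        def score(j):
            div = min(hamming(j[2], s[2]) for s in selected) / sig_bits
            return alpha * j[1] + (1 - alpha) * div
        best = max(remaining, key=score)
        remaining.remove(best)
        selected.append(best)
    return [j[0] for j in selected]
```

With a low `alpha`, a mediocre but very different job can beat a good near-duplicate; with `alpha` close to 1, selection degenerates into a plain quality top-k.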
## Incremental pipeline

In many real-world setups, jobs arrive in successive batches (`jobs1`, `jobs2`, …) rather than as one static snapshot. The incremental strategy uses three additional pieces:
1. A CuckooFilter — a global, approximate "already seen" set that persists across batches.
2. A StoreDB interface — persistence for compressed jobs and the filter. There are ready-made implementations for:
   - SQL databases (`SqlStoreDB`),
   - local files (`LocalFileStoreDB`).
3. Helpers for incremental flows:
   - `process_batch(store, jobs, curator)`
   - `global_reselect_in_store(store, ratio, alpha)`

These live under:
```python
from jobcurator.storage import (
    StoreDB,
    SqlStoreDB,
    LocalFileStoreDB,
    process_batch,
    global_reselect_in_store,
)
```
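To build intuition for what the CuckooFilter contributes, here is a minimal, self-contained sketch of the data structure (not the library's implementation): approximate membership with small fingerprints, no false negatives for successfully inserted items, and rare false positives.

```python
import hashlib
import random

class CuckooFilterSketch:
    """Minimal cuckoo filter with 16-bit fingerprints and 2 candidate buckets."""

    def __init__(self, capacity: int, bucket_size: int = 4, max_kicks: int = 500):
        self.bucket_size = bucket_size
        self.max_kicks = max_kicks
        self.num_buckets = 1
        while self.num_buckets * bucket_size < capacity:
            self.num_buckets *= 2  # power of two so the XOR trick stays invertible
        self.buckets = [[] for _ in range(self.num_buckets)]

    def _hash(self, data: bytes) -> int:
        return int.from_bytes(hashlib.sha1(data).digest()[:4], "big")

    def _fingerprint(self, item: str) -> int:
        return (self._hash(b"fp:" + item.encode()) & 0xFFFF) or 1  # non-zero

    def _index(self, item: str) -> int:
        return self._hash(item.encode()) % self.num_buckets

    def _alt_index(self, index: int, fp: int) -> int:
        # XOR with the fingerprint's hash: applying it twice returns to `index`.
        return (index ^ self._hash(fp.to_bytes(2, "big"))) % self.num_buckets

    def insert(self, item: str) -> bool:
        fp = self._fingerprint(item)
        i1 = self._index(item)
        i2 = self._alt_index(i1, fp)
        for i in (i1, i2):
            if len(self.buckets[i]) < self.bucket_size:
                self.buckets[i].append(fp)
                return True
        # Both buckets full: evict fingerprints until one lands in a free slot.
        i = random.choice((i1, i2))
        for _ in range(self.max_kicks):
            j = random.randrange(self.bucket_size)
            fp, self.buckets[i][j] = self.buckets[i][j], fp
            i = self._alt_index(i, fp)
            if len(self.buckets[i]) < self.bucket_size:
                self.buckets[i].append(fp)
                return True
        return False  # filter considered full

    def __contains__(self, item: str) -> bool:
        fp = self._fingerprint(item)
        i1 = self._index(item)
        return fp in self.buckets[i1] or fp in self.buckets[self._alt_index(i1, fp)]
```

Unlike a Bloom filter, a cuckoo filter also supports deletion (not shown), which is why it suits a long-lived "seen" set whose contents can be rebalanced.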
Conceptually, StoreDB is:
“Anything that can store compressed jobs + one global CuckooFilter, and can list minimal per-job metadata when we want to rebalance.”
It needs to support:
CuckooFilter state:

- `load_or_create_cuckoo(capacity) -> CuckooFilter`
- `save_cuckoo(cf) -> None`

Compressed jobs:

- `insert_compressed_jobs(compressed_jobs, backend)`
- `load_all_light_jobs() -> list[LightJob]`
- `overwrite_with_selected(selected_ids)`

The algorithmic core only needs, for each job:
- `id`
- `quality`
- `signature`

Everything else (title, text, company, location, etc.) is for your own business needs.
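An in-memory toy satisfying this contract might look like the following. `InMemoryStoreDB` is hypothetical (only the method names come from the list above), and a plain set stands in for the real CuckooFilter:

```python
from typing import NamedTuple

class LightJob(NamedTuple):
    id: str
    quality: float
    signature: int

class InMemoryStoreDB:
    """Toy StoreDB: keeps compressed jobs and the filter in plain attributes."""

    def __init__(self):
        self.jobs: dict[str, LightJob] = {}
        self.cuckoo = None

    def load_or_create_cuckoo(self, capacity):
        if self.cuckoo is None:
            self.cuckoo = set()  # stand-in for a real CuckooFilter
        return self.cuckoo

    def save_cuckoo(self, cf):
        self.cuckoo = cf

    def insert_compressed_jobs(self, compressed_jobs, backend=None):
        for job in compressed_jobs:
            self.jobs[job.id] = job

    def load_all_light_jobs(self):
        return list(self.jobs.values())

    def overwrite_with_selected(self, selected_ids):
        self.jobs = {jid: self.jobs[jid] for jid in selected_ids}
```

Any backend that can answer these calls (a SQL table, a JSONL file, a key-value store) can plug into the incremental helpers.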
### process_batch

Used for each new batch of raw jobs:
```python
from jobcurator.storage import process_batch

compressed_jobsN = process_batch(
    store=my_store_db,  # SqlStoreDB or LocalFileStoreDB
    jobs=jobsN,
    curator=my_curator,
)
```
What happens:
1. The global CuckooFilter is loaded from (or created in) the store.
2. `curator.dedupe_and_compress(jobsN, seen_filter=cuckoo_filter)` is run: jobs already seen in previous batches are skipped, the rest are deduped and compressed.
3. The resulting `compressed_jobsN` are inserted into storage, and the updated filter is saved.

Result: feeding `jobs1`, `jobs2`, `jobs3`, … in order accumulates a deduplicated, compressed set across all batches.

### global_reselect_in_store

Over time, you may want to rebalance the whole stored set globally.
You can use:
```python
from jobcurator.storage import global_reselect_in_store

global_reselect_in_store(
    store=my_store_db,
    ratio=0.5,  # keep ~50% of stored compressed jobs
    alpha=0.6,  # trade-off between quality and diversity
)
```
What happens:
1. `store.load_all_light_jobs()` returns a list of light objects (id, quality, signature).
2. A global greedy selection is run, balancing quality against diversity with the same `alpha` trade-off used by JobCurator.
3. `store.overwrite_with_selected(selected_ids)` keeps only those jobs in storage.

This gives you a globally consistent compressed set over multiple batches:
- diversity preserved via the 128-bit composite hash (`signature`),
- quality balanced through the `alpha` trade-off.

### Full example: SQL storage (Postgres)

```python
from jobcurator import JobCurator
from jobcurator.storage import SqlStoreDB, process_batch, global_reselect_in_store
import psycopg2

# 1) Connect to your database
conn = psycopg2.connect("dbname=... user=... password=... host=...")

# 2) Choose a storage implementation
store = SqlStoreDB(conn)

# 3) Configure JobCurator
curator = JobCurator(
    backend="default_hash",
    ratio=0.5,
    alpha=0.6,
    max_per_cluster_in_pool=3,
    d_sim_threshold=20,
    max_cluster_distance_km=50.0,
    use_multiprobe=True,
)

# 4) Process batches incrementally
compressed_jobs1 = process_batch(store, jobs1, curator)
compressed_jobs2 = process_batch(store, jobs2, curator)
compressed_jobs3 = process_batch(store, jobs3, curator)
# ...

# 5) Periodically rebalance globally
global_reselect_in_store(store, ratio=0.5, alpha=0.6)
```
Test with SQL storage (Postgres):
```bash
python3 test_incremental.py \
  --backend default_hash \
  --ratio 0.5 \
  --alpha 0.6 \
  --storage sql \
  --dsn "dbname=mydb user=myuser password=mypass host=localhost port=5432" \
  --batches 3 \
  --n-per-batch 30
  # --no-global-reselect  # optional: add to skip the final global rebalancing
```
### Full example: local file storage

```python
from jobcurator import JobCurator
from jobcurator.storage import LocalFileStoreDB, process_batch, global_reselect_in_store

# 1) Use the local file-based store
store = LocalFileStoreDB()  # defaults to ./data/compressed_jobs.jsonl, ./data/cuckoo_filter.pkl

# 2) Configure JobCurator as usual
curator = JobCurator(
    backend="default_hash",
    ratio=0.5,
    alpha=0.6,
    max_per_cluster_in_pool=3,
    d_sim_threshold=20,
    max_cluster_distance_km=50.0,
    use_multiprobe=True,
)

# 3) Process incoming batches
compressed_jobs1 = process_batch(store, jobs1, curator)
compressed_jobs2 = process_batch(store, jobs2, curator)

# 4) Periodic global cleanup / rebalancing
global_reselect_in_store(store, ratio=0.5, alpha=0.6)
```
Test with local storage:
```bash
python3 test_incremental.py \
  --backend default_hash \
  --ratio 0.5 \
  --alpha 0.6 \
  --storage local \
  --dsn "" \
  --batches 3 \
  --n-per-batch 20 \
  --clear-local
  # --no-global-reselect  # (optional) add this flag to skip the final global rebalancing
```
### When to use the incremental pipeline

You should consider the incremental pipeline if:

- jobs arrive continuously in successive batches,
- you need a bounded global set of compressed jobs, with deduplication across batches and a controlled quality / diversity balance.
If you just want to dedupe one big static snapshot once, you can call:
```python
compressed_jobs = curator.dedupe_and_compress(jobs)
```
directly and ignore the incremental API.
For long-running production feeds, the combination of:
- `JobCurator` (in-memory dedup / compression),
- `CuckooFilter` (seen set),
- `StoreDB` (persistence),
- `process_batch` + `global_reselect_in_store`

gives you a clean, reusable pattern to scale over time.