🧠 JobCurator – Incremental Pipelines & Data Model

This page is an advanced user guide for jobcurator.


1. Core Data Model (Conceptual)

Job

A Job is the main unit you pass to JobCurator.

Typical fields include an id, a title, the job text, a company, a location, a salary, and one or more category dimensions (see the field types below).

You create Job objects, then JobCurator enriches them with quality + hash metadata.


Category

Represents hierarchical category information (multi-level taxonomy).

Fields: id, label, level, parent_id, and level_path (the human-readable path from the taxonomy root).

A job can have multiple category dimensions at once:

job.categories = {
    "job_function": [
        Category(
            id="backend",
            label="Backend",
            level=2,
            parent_id="software",
            level_path=["Engineering", "Software", "Backend"],
        )
    ],
    "industry": [
        Category(
            id="saas",
            label="SaaS",
            level=1,
            parent_id="tech",
            level_path=["Technology", "SaaS"],
        )
    ],
}

These categories are used in the hashing process (meta-hash, MinHash, FAISS vectors).


Location3DField

A location with 3D coordinates, so geographic distance can be computed properly.

JobCurator uses these coordinates to measure distances between jobs, e.g. to enforce limits such as max_cluster_distance_km during clustering.
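To illustrate why 3D coordinates make distance computation straightforward, here is a small sketch (not jobcurator's actual code) that projects latitude/longitude onto a unit sphere and derives great-circle distance from the angle between the vectors:

```python
import math

EARTH_RADIUS_KM = 6371.0

def to_unit_vector(lat_deg: float, lon_deg: float) -> tuple:
    """Project latitude/longitude onto a 3D unit sphere."""
    lat, lon = math.radians(lat_deg), math.radians(lon_deg)
    return (math.cos(lat) * math.cos(lon),
            math.cos(lat) * math.sin(lon),
            math.sin(lat))

def geo_distance_km(a: tuple, b: tuple) -> float:
    """Great-circle distance from the angle between two unit vectors."""
    # Clamp the dot product to guard against floating-point drift.
    dot = max(-1.0, min(1.0, sum(x * y for x, y in zip(a, b))))
    return EARTH_RADIUS_KM * math.acos(dot)
```

With 3D vectors there are no wrap-around problems at the antimeridian, and the distance between identical points is exactly zero.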


SalaryField

Structured salary information (e.g. a min/max range).

Salary can be bucketized and used in the hashing / meta-hash steps.
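Bucketization can be as simple as mapping a salary range onto coarse bands; this sketch shows the idea (the band size and the midpoint choice are assumptions for illustration, not jobcurator's actual scheme):

```python
def salary_bucket(min_amount: float, max_amount: float,
                  band_size: float = 10_000) -> int:
    # Map the midpoint of the range onto a coarse band, so near-identical
    # salaries end up contributing the same value to the meta-hash.
    mid = (min_amount + max_amount) / 2
    return int(mid // band_size)
```

Two postings advertising 48–52k and 49–51k then fall into the same bucket and hash alike on this dimension.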


2. What JobCurator Does (In Memory)

JobCurator works entirely in memory on a list of Job objects:

  1. Scores quality: combines length, completeness, freshness, etc. into a single quality score per job.

  2. Computes hashes & signatures:

    • exact hash (to remove strict duplicates),
    • SimHash / MinHash / FAISS vector signatures, depending on the backend.

  3. Clusters similar jobs, using LSH, MinHash, FAISS, etc.

  4. Selects a subset (compression), respecting:

    • ratio (e.g. keep 50%),
    • alpha (quality vs diversity trade-off),
    • cluster-level pooling (max_per_cluster_in_pool).

Canonical call:

compressed_jobs = curator.dedupe_and_compress(jobs)

3. Incremental Pipelines: Key Concepts

In many real-world setups, jobs arrive continuously in batches (daily feeds, crawls, recurring imports) rather than as one static snapshot, so deduplication has to work across batches, not just within one.

The incremental strategy uses three additional pieces:

  1. CuckooFilter

    • A compact "seen set" of exact hashes.
    • Lets you check "have we already seen something that looks exactly like this job?"
    • Updated each time you process a batch.
  2. StoreDB interface

    • Abstracts where compressed jobs and the CuckooFilter are stored.
    • There are ready-made implementations for:

      • SQL (SqlStoreDB)
      • local files (LocalFileStoreDB)
  3. Helpers for incremental flows

    • process_batch(store, jobs, curator)
    • global_reselect_in_store(store, ratio, alpha)

These live under:

from jobcurator.storage import (
    StoreDB,
    SqlStoreDB,
    LocalFileStoreDB,
    process_batch,
    global_reselect_in_store,
)
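To make the CuckooFilter idea concrete, here is a tiny, self-contained cuckoo filter sketch. It is purely illustrative (jobcurator ships its own implementation), but it shows why the structure is compact: it stores short fingerprints in fixed-size buckets instead of the full hashes.

```python
import hashlib
import random

class TinyCuckooFilter:
    """Toy cuckoo filter: a compact, approximate "seen set" (illustrative only)."""

    def __init__(self, num_buckets=1024, bucket_size=4, max_kicks=500):
        # num_buckets must be a power of two so XOR keeps indices in range.
        self.num_buckets = num_buckets
        self.bucket_size = bucket_size
        self.max_kicks = max_kicks
        self.buckets = [[] for _ in range(num_buckets)]

    def _fingerprint(self, item: str) -> str:
        # Short fingerprint: the filter stores this instead of the full item.
        return hashlib.sha1(item.encode()).hexdigest()[:8]

    def _index(self, key: str) -> int:
        return int(hashlib.md5(key.encode()).hexdigest(), 16) % self.num_buckets

    def _alt_index(self, index: int, fp: str) -> int:
        # Partial-key cuckoo hashing: either bucket can be derived from
        # the other one using only the fingerprint.
        return index ^ self._index(fp)

    def insert(self, item: str) -> bool:
        fp = self._fingerprint(item)
        i1 = self._index(item)
        i2 = self._alt_index(i1, fp)
        for i in (i1, i2):
            if len(self.buckets[i]) < self.bucket_size:
                self.buckets[i].append(fp)
                return True
        # Both buckets full: evict entries ("kicks") until something fits.
        i = random.choice((i1, i2))
        for _ in range(self.max_kicks):
            j = random.randrange(len(self.buckets[i]))
            fp, self.buckets[i][j] = self.buckets[i][j], fp
            i = self._alt_index(i, fp)
            if len(self.buckets[i]) < self.bucket_size:
                self.buckets[i].append(fp)
                return True
        return False  # filter is considered full

    def __contains__(self, item: str) -> bool:
        fp = self._fingerprint(item)
        i1 = self._index(item)
        return fp in self.buckets[i1] or fp in self.buckets[self._alt_index(i1, fp)]
```

Like any cuckoo filter, this answers "definitely not seen" exactly and "seen" with a small false-positive probability, which is an acceptable trade-off for dropping exact duplicates.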

4. StoreDB: The Storage Abstraction

Conceptually, StoreDB is:

"Anything that can store compressed jobs + one global CuckooFilter, and can list minimal per-job metadata when we want to rebalance."

It needs to support:

  • loading and saving the global CuckooFilter,
  • inserting newly compressed jobs,
  • listing lightweight per-job records (load_all_light_jobs),
  • keeping only a selected subset (overwrite_with_selected).

The algorithmic core only needs, for each job:

  • its id,
  • its quality score,
  • its signature.

Everything else (title, text, company, location, etc.) is there for your own business needs.
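A minimal in-memory implementation of this contract might look like the following sketch. The names load_all_light_jobs and overwrite_with_selected appear on this page; load_filter, save_filter, and insert_jobs are illustrative names, not necessarily jobcurator's actual method signatures.

```python
from dataclasses import dataclass

@dataclass
class LightJob:
    id: str
    quality: float
    signature: frozenset  # stand-in for a real hash signature

class InMemoryStoreDB:
    """Dict-backed stand-in for SqlStoreDB / LocalFileStoreDB."""

    def __init__(self):
        self._jobs = {}      # id -> compressed job record (a dict here)
        self._filter = None  # the persisted CuckooFilter (any object)

    def load_filter(self):
        return self._filter  # None means "no filter yet, create one"

    def save_filter(self, cuckoo_filter):
        self._filter = cuckoo_filter

    def insert_jobs(self, jobs):
        for job in jobs:
            self._jobs[job["id"]] = job

    def load_all_light_jobs(self):
        # Only id + quality + signature: all the selection logic needs.
        return [LightJob(j["id"], j["quality"], j["signature"])
                for j in self._jobs.values()]

    def overwrite_with_selected(self, selected_ids):
        keep = set(selected_ids)
        self._jobs = {i: j for i, j in self._jobs.items() if i in keep}
```

Any backend (SQL, files, object storage) that can provide these operations can plug into the incremental helpers.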


5. Incremental Batch Processing

5.1 process_batch

Used for each new batch of raw jobs:

from jobcurator.storage import process_batch

compressed_jobsN = process_batch(
    store=my_store_db,  # SqlStoreDB or LocalFileStoreDB
    jobs=jobsN,
    curator=my_curator,
)

What happens:

  1. Load or create a global CuckooFilter from the store.
  2. Run curator.dedupe_and_compress(jobsN, seen_filter=cuckoo_filter):

    • dedup + compress inside the batch,
    • drop jobs that seem already seen (exact hash), based on previous batches.
  3. Insert the resulting compressed_jobsN into storage.
  4. Save the updated CuckooFilter back to the store.

Result: each batch is deduped both internally and against all previous batches, and only the newly selected compressed jobs are added to storage.
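The four steps above can be sketched as follows, with a plain set standing in for the CuckooFilter and a dict standing in for the store (illustrative only; the real call also clusters and compresses within the batch):

```python
import hashlib

def exact_hash(job: dict) -> str:
    # Stand-in for jobcurator's exact hash of a job's content.
    return hashlib.sha256(job["text"].encode()).hexdigest()

def process_batch_sketch(store: dict, jobs: list) -> list:
    # 1) Load or create the global "seen" filter.
    seen = store.get("filter") or set()
    # 2) Keep only jobs whose exact hash was never seen, in this
    #    batch or in any previous one.
    fresh = []
    for job in jobs:
        h = exact_hash(job)
        if h not in seen:
            seen.add(h)
            fresh.append(job)
    # 3) Insert the surviving jobs into storage.
    store.setdefault("jobs", []).extend(fresh)
    # 4) Save the updated filter back to the store.
    store["filter"] = seen
    return fresh
```

Because the filter is persisted between calls, a job already stored in batch 1 is silently dropped when it reappears in batch 2.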


6. Global Rebalancing (Quality + Diversity)

Over time, you may want to re-apply the quality + diversity selection globally, across everything accumulated in storage, for example to shrink the stored set back to a target ratio after many batches.

You can use:

from jobcurator.storage import global_reselect_in_store

global_reselect_in_store(
    store=my_store_db,
    ratio=0.5,   # keep ~50% of stored compressed jobs
    alpha=0.6,   # trade-off between quality and diversity
)

What happens:

  1. store.load_all_light_jobs() returns a list of light objects (id, quality, signature).
  2. A global greedy selection is run:

    • same quality + diversity logic as in JobCurator.
  3. store.overwrite_with_selected(selected_ids) keeps only those jobs in storage.

This gives you a globally consistent compressed set over multiple batches, rather than a union of per-batch selections that were each only locally optimal.
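A simplified version of such a greedy quality + diversity selection might look like this. The exact scoring used by jobcurator may differ; Jaccard distance over token-set signatures is an assumption chosen here to keep the sketch self-contained.

```python
from collections import namedtuple

LightJob = namedtuple("LightJob", ["id", "quality", "signature"])

def jaccard(a: frozenset, b: frozenset) -> float:
    union = len(a | b)
    return len(a & b) / union if union else 0.0

def greedy_reselect(light_jobs, ratio=0.5, alpha=0.6):
    """Greedily pick jobs maximizing alpha * quality + (1 - alpha) * diversity,
    where diversity is the minimum distance to anything already selected."""
    k = max(1, round(ratio * len(light_jobs)))
    selected, remaining = [], list(light_jobs)
    while remaining and len(selected) < k:
        def score(job):
            if not selected:
                return job.quality  # first pick: pure quality
            div = min(1.0 - jaccard(job.signature, s.signature)
                      for s in selected)
            return alpha * job.quality + (1 - alpha) * div
        best = max(remaining, key=score)
        selected.append(best)
        remaining.remove(best)
    return [j.id for j in selected]
```

Note how a near-duplicate of an already-selected job scores poorly even with high quality, which is exactly the behavior the alpha trade-off is meant to produce.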


7. Concrete Examples

7.1 Incremental Pipeline with SQL

from jobcurator import JobCurator
from jobcurator.storage import SqlStoreDB, process_batch, global_reselect_in_store
import psycopg2

# 1) Connect to your database
conn = psycopg2.connect("dbname=... user=... password=... host=...")

# 2) Choose a storage implementation
store = SqlStoreDB(conn)

# 3) Configure JobCurator
curator = JobCurator(
    backend="default_hash",
    ratio=0.5,
    alpha=0.6,
    max_per_cluster_in_pool=3,
    d_sim_threshold=20,
    max_cluster_distance_km=50.0,
    use_multiprobe=True,
)

# 4) Process batches incrementally
compressed_jobs1 = process_batch(store, jobs1, curator)
compressed_jobs2 = process_batch(store, jobs2, curator)
compressed_jobs3 = process_batch(store, jobs3, curator)
# ...

# 5) Periodically rebalance globally
global_reselect_in_store(store, ratio=0.5, alpha=0.6)

Test with SQL storage (Postgres):

python3 test_incremental.py \
  --backend default_hash \
  --ratio 0.5 \
  --alpha 0.6 \
  --storage sql \
  --dsn "dbname=mydb user=myuser password=mypass host=localhost port=5432" \
  --batches 3 \
  --n-per-batch 30
  # add --no-global-reselect to skip the final global rebalancing (optional)

7.2 Incremental Pipeline with Local Files

from jobcurator import JobCurator
from jobcurator.storage import LocalFileStoreDB, process_batch, global_reselect_in_store

# 1) Use the local file-based store
store = LocalFileStoreDB()  # defaults to ./data/compressed_jobs.jsonl, ./data/cuckoo_filter.pkl

# 2) Configure JobCurator as usual
curator = JobCurator(
    backend="default_hash",
    ratio=0.5,
    alpha=0.6,
    max_per_cluster_in_pool=3,
    d_sim_threshold=20,
    max_cluster_distance_km=50.0,
    use_multiprobe=True,
)

# 3) Process incoming batches
compressed_jobs1 = process_batch(store, jobs1, curator)
compressed_jobs2 = process_batch(store, jobs2, curator)

# 4) Periodic global cleanup / rebalancing
global_reselect_in_store(store, ratio=0.5, alpha=0.6)

Test with local storage:

python3 test_incremental.py \
  --backend default_hash \
  --ratio 0.5 \
  --alpha 0.6 \
  --storage local \
  --dsn "" \
  --batches 3 \
  --n-per-batch 20 \
  --clear-local
  # add --no-global-reselect to skip the final global rebalancing (optional)

8. When to Use the Incremental Approach

You should consider the incremental pipeline if:

  • jobs arrive continuously in batches (feeds, crawls, recurring imports),
  • the full history is too large to re-process in memory every time,
  • you need deduplication across batches, not just within one.

If you just want to dedupe one big static snapshot once, you can call:

compressed_jobs = curator.dedupe_and_compress(jobs)

directly and ignore the incremental API.

For long-running production feeds, the combination of:

  • process_batch (per-batch dedup + compression against a persistent CuckooFilter),
  • a StoreDB implementation (SqlStoreDB or LocalFileStoreDB),
  • periodic global_reselect_in_store calls,

gives you a clean, reusable pattern to scale over time.