6 changes: 0 additions & 6 deletions src/mavedb/lib/exceptions.py
@@ -186,12 +186,6 @@ class SubmissionEnqueueError(ValueError):
pass


class LinkingEnqueueError(ValueError):
"""Raised when a linking job fails to be enqueued despite appearing as if it should have been"""

pass


class UniProtIDMappingEnqueueError(Exception):
"""Raised when a UniProt ID mapping job fails to be enqueued despite appearing as if it should have been"""

19 changes: 15 additions & 4 deletions src/mavedb/routers/score_sets.py
@@ -2056,15 +2056,26 @@ async def publish_score_set(
db.refresh(item)

# await the insertion of this job into the worker queue, not the job itself.
job = await worker.enqueue_job("refresh_published_variants_view", correlation_id_for_context())
if job is not None:
save_to_logging_context({"worker_job_id": job.job_id})
logger.info(msg="Enqueud published variant materialized view refresh job.", extra=logging_context())
refresh_published_variants_job = await worker.enqueue_job(
"refresh_published_variants_view", correlation_id_for_context()
)
if refresh_published_variants_job is not None:
save_to_logging_context({"refresh_published_variants_job_id": refresh_published_variants_job.job_id})
logger.info(msg="Enqueued published variant materialized view refresh job.", extra=logging_context())
else:
logger.warning(
msg="Failed to enqueue published variant materialized view refresh job.", extra=logging_context()
)

submit_to_ldh_job = await worker.enqueue_job(
"submit_score_set_mappings_to_ldh", item.id, correlation_id_for_context()
)
if submit_to_ldh_job is not None:
save_to_logging_context({"submit_to_ldh_job_id": submit_to_ldh_job.job_id})
logger.info(msg="Enqueued submit score set mappings to LDH job.", extra=logging_context())
else:
logger.warning(msg="Failed to enqueue submit score set mappings to LDH job.", extra=logging_context())

enriched_experiment = enrich_experiment_with_num_score_sets(item.experiment, user_data)
return score_set.ScoreSet.model_validate(item).copy(update={"experiment": enriched_experiment})

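For context, both hunks above share one enqueue-and-log pattern: arq's `enqueue_job` returns `None` when the job is not enqueued (for example, when a job with the same job ID is already queued), so each outcome is logged explicitly. A minimal sketch of that pattern — the `enqueue_and_log` helper is hypothetical and not part of this PR:

```python
import logging
from typing import Any, Optional

from arq import ArqRedis
from arq.jobs import Job

logger = logging.getLogger(__name__)


async def enqueue_and_log(worker: ArqRedis, job_name: str, *args: Any) -> Optional[Job]:
    """Enqueue an arq job and log whether the enqueue actually happened.

    arq returns None instead of a Job when it declines to enqueue,
    e.g. because a job with the same job ID is already queued.
    """
    job = await worker.enqueue_job(job_name, *args)
    if job is not None:
        logger.info("Enqueued %s job (job_id=%s).", job_name, job.job_id)
    else:
        logger.warning("Failed to enqueue %s job.", job_name)
    return job
```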
401 changes: 32 additions & 369 deletions src/mavedb/worker/jobs.py

Large diffs are not rendered by default.

10 changes: 4 additions & 6 deletions src/mavedb/worker/settings.py
@@ -11,16 +11,15 @@
from mavedb.lib.logging.canonical import log_job
from mavedb.worker.jobs import (
create_variants_for_score_set,
link_gnomad_variants,
map_variants_for_score_set,
variant_mapper_manager,
poll_uniprot_mapping_jobs_for_score_set,
refresh_materialized_views,
refresh_published_variants_view,
submit_score_set_mappings_to_car,
submit_score_set_mappings_to_ldh,
link_clingen_variants,
poll_uniprot_mapping_jobs_for_score_set,
submit_uniprot_mapping_jobs_for_score_set,
link_gnomad_variants,
submit_score_set_mappings_to_car,
variant_mapper_manager,
)

# ARQ requires at least one task on startup.
@@ -30,7 +29,6 @@
map_variants_for_score_set,
refresh_published_variants_view,
submit_score_set_mappings_to_ldh,
link_clingen_variants,
poll_uniprot_mapping_jobs_for_score_set,
submit_uniprot_mapping_jobs_for_score_set,
link_gnomad_variants,
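For reference, arq only runs coroutines that are registered on the worker settings class, which is why dropped jobs such as `link_clingen_variants` must also be removed from these lists — a job name that is not registered fails at execution time with arq's "function not found" error. A minimal sketch of the registration shape, assuming the job imports above (the class name and `redis_settings` value are illustrative, not this repo's actual configuration):

```python
from arq.connections import RedisSettings


class WorkerSettings:
    # arq resolves enqueued job names against this list when a worker
    # picks the job up; unregistered names cannot be executed.
    functions = [
        create_variants_for_score_set,
        map_variants_for_score_set,
        refresh_published_variants_view,
        submit_score_set_mappings_to_ldh,
        submit_uniprot_mapping_jobs_for_score_set,
        poll_uniprot_mapping_jobs_for_score_set,
        link_gnomad_variants,
        submit_score_set_mappings_to_car,
        refresh_materialized_views,
        variant_mapper_manager,
    ]
    redis_settings = RedisSettings(host="localhost", port=6379)
```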
31 changes: 21 additions & 10 deletions tests/helpers/util/score_set.py
@@ -1,10 +1,11 @@
from copy import deepcopy
from datetime import date
from typing import Any, Dict, Optional
from unittest.mock import patch
from unittest.mock import call, patch

import cdot.hgvs.dataproviders
import jsonschema
from arq import ArqRedis
from fastapi.testclient import TestClient
from sqlalchemy import select

@@ -165,9 +166,9 @@ def create_seq_score_set_with_variants(
count_columns_metadata_json_path,
)

assert score_set["numVariants"] == 3, (
f"Could not create sequence based score set with variants within experiment {experiment_urn}"
)
assert (
score_set["numVariants"] == 3
), f"Could not create sequence based score set with variants within experiment {experiment_urn}"

jsonschema.validate(instance=score_set, schema=ScoreSet.model_json_schema())
return score_set
@@ -196,9 +197,9 @@ def create_acc_score_set_with_variants(
count_columns_metadata_json_path,
)

assert score_set["numVariants"] == 3, (
f"Could not create sequence based score set with variants within experiment {experiment_urn}"
)
assert (
    score_set["numVariants"] == 3
), f"Could not create accession based score set with variants within experiment {experiment_urn}"

jsonschema.validate(instance=score_set, schema=ScoreSet.model_json_schema())
return score_set
@@ -272,9 +273,19 @@ def mock_worker_vrs_mapping(client, db, score_set, alleles=True):
return client.get(f"/api/v1/score-sets/{score_set['urn']}").json()


def publish_score_set(client: TestClient, score_set_urn: str) -> Dict[str, Any]:
response = client.post(f"/api/v1/score-sets/{score_set_urn}/publish")
assert response.status_code == 200, f"Could not publish score set {score_set_urn}"
def publish_score_set(client: TestClient, score_set_urn: str, score_set_id: int) -> Dict[str, Any]:
with (
patch.object(ArqRedis, "enqueue_job", return_value=None) as worker_queue,
patch("mavedb.routers.score_sets.correlation_id_for_context", return_value="test-correlation-id"),
):
response = client.post(f"/api/v1/score-sets/{score_set_urn}/publish")
assert response.status_code == 200, f"Could not publish score set {score_set_urn}"
worker_queue.assert_has_calls(
[
call("refresh_published_variants_view", "test-correlation-id"),
call("submit_score_set_mappings_to_ldh", score_set_id, "test-correlation-id"),
]
)

response_data = response.json()
return response_data
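Callers of the updated helper now look up the score set's database id first, so the helper can assert that the LDH submission job is enqueued with it. The calling pattern, lifted from the test updates below (`client` and `session` are the usual test fixtures):

```python
from sqlalchemy import select

from mavedb.models.score_set import ScoreSet as ScoreSetDbModel

# Resolve the integer primary key for the score set under test, then publish.
score_set_id = session.scalars(
    select(ScoreSetDbModel.id).where(ScoreSetDbModel.urn == score_set["urn"])
).one()
published_score_set = publish_score_set(client, score_set["urn"], score_set_id)
```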
76 changes: 43 additions & 33 deletions tests/routers/test_collections.py
@@ -2,24 +2,24 @@

import re
from copy import deepcopy
from unittest.mock import patch

import jsonschema
import pytest
from sqlalchemy import select

arq = pytest.importorskip("arq")
cdot = pytest.importorskip("cdot")
fastapi = pytest.importorskip("fastapi")

from mavedb.lib.validation.urn_re import MAVEDB_COLLECTION_URN_RE
from mavedb.models.enums.contribution_role import ContributionRole
from mavedb.models.score_set import ScoreSet as ScoreSetDbModel
from mavedb.view_models.collection import Collection

from tests.helpers.constants import (
EXTRA_USER,
TEST_USER,
TEST_COLLECTION,
TEST_COLLECTION_RESPONSE,
TEST_USER,
)
from tests.helpers.dependency_overrider import DependencyOverrider
from tests.helpers.util.collection import create_collection
@@ -235,9 +235,10 @@ def test_admin_can_add_experiment_to_collection(
client, session, data_provider, unpublished_score_set, data_files / "scores.csv"
)

with patch.object(arq.ArqRedis, "enqueue_job", return_value=None) as worker_queue:
score_set = publish_score_set(client, unpublished_score_set["urn"])
worker_queue.assert_called_once()
score_set_id = session.scalars(
select(ScoreSetDbModel.id).where(ScoreSetDbModel.urn == unpublished_score_set["urn"])
).one()
score_set = publish_score_set(client, unpublished_score_set["urn"], score_set_id)

collection = create_collection(client)
client.post(f"/api/v1/collections/{collection['urn']}/admins", json={"orcid_id": EXTRA_USER["username"]})
@@ -293,9 +294,10 @@ def test_editor_can_add_experiment_to_collection(
client, session, data_provider, unpublished_score_set, data_files / "scores.csv"
)

with patch.object(arq.ArqRedis, "enqueue_job", return_value=None) as worker_queue:
score_set = publish_score_set(client, unpublished_score_set["urn"])
worker_queue.assert_called_once()
score_set_id = session.scalars(
select(ScoreSetDbModel.id).where(ScoreSetDbModel.urn == unpublished_score_set["urn"])
).one()
score_set = publish_score_set(client, unpublished_score_set["urn"], score_set_id)

collection = create_collection(client)
client.post(f"/api/v1/collections/{collection['urn']}/editors", json={"orcid_id": EXTRA_USER["username"]})
@@ -345,9 +347,10 @@ def test_viewer_cannot_add_experiment_to_collection(
client, session, data_provider, unpublished_score_set, data_files / "scores.csv"
)

with patch.object(arq.ArqRedis, "enqueue_job", return_value=None) as worker_queue:
score_set = publish_score_set(client, unpublished_score_set["urn"])
worker_queue.assert_called_once()
score_set_id = session.scalars(
select(ScoreSetDbModel.id).where(ScoreSetDbModel.urn == unpublished_score_set["urn"])
).one()
score_set = publish_score_set(client, unpublished_score_set["urn"], score_set_id)

collection = create_collection(client)
client.post(f"/api/v1/collections/{collection['urn']}/viewers", json={"orcid_id": EXTRA_USER["username"]})
@@ -372,9 +375,10 @@ def test_unauthorized_user_cannot_add_experiment_to_collection(
client, session, data_provider, unpublished_score_set, data_files / "scores.csv"
)

with patch.object(arq.ArqRedis, "enqueue_job", return_value=None) as worker_queue:
score_set = publish_score_set(client, unpublished_score_set["urn"])
worker_queue.assert_called_once()
score_set_id = session.scalars(
select(ScoreSetDbModel.id).where(ScoreSetDbModel.urn == unpublished_score_set["urn"])
).one()
score_set = publish_score_set(client, unpublished_score_set["urn"], score_set_id)

collection = create_collection(client)

@@ -397,9 +401,10 @@ def test_anonymous_cannot_add_experiment_to_collection(
client, session, data_provider, unpublished_score_set, data_files / "scores.csv"
)

with patch.object(arq.ArqRedis, "enqueue_job", return_value=None) as worker_queue:
score_set = publish_score_set(client, unpublished_score_set["urn"])
worker_queue.assert_called_once()
score_set_id = session.scalars(
select(ScoreSetDbModel.id).where(ScoreSetDbModel.urn == unpublished_score_set["urn"])
).one()
score_set = publish_score_set(client, unpublished_score_set["urn"], score_set_id)

collection = create_collection(client)

@@ -422,9 +427,10 @@ def test_admin_can_add_score_set_to_collection(
client, session, data_provider, unpublished_score_set, data_files / "scores.csv"
)

with patch.object(arq.ArqRedis, "enqueue_job", return_value=None) as worker_queue:
score_set = publish_score_set(client, unpublished_score_set["urn"])
worker_queue.assert_called_once()
score_set_id = session.scalars(
select(ScoreSetDbModel.id).where(ScoreSetDbModel.urn == unpublished_score_set["urn"])
).one()
score_set = publish_score_set(client, unpublished_score_set["urn"], score_set_id)

collection = create_collection(client)
client.post(f"/api/v1/collections/{collection['urn']}/admins", json={"orcid_id": EXTRA_USER["username"]})
@@ -479,9 +485,10 @@ def test_editor_can_add_score_set_to_collection(
client, session, data_provider, unpublished_score_set, data_files / "scores.csv"
)

with patch.object(arq.ArqRedis, "enqueue_job", return_value=None) as worker_queue:
score_set = publish_score_set(client, unpublished_score_set["urn"])
worker_queue.assert_called_once()
score_set_id = session.scalars(
select(ScoreSetDbModel.id).where(ScoreSetDbModel.urn == unpublished_score_set["urn"])
).one()
score_set = publish_score_set(client, unpublished_score_set["urn"], score_set_id)

collection = create_collection(client)
client.post(f"/api/v1/collections/{collection['urn']}/editors", json={"orcid_id": EXTRA_USER["username"]})
@@ -530,9 +537,10 @@ def test_viewer_cannot_add_score_set_to_collection(
client, session, data_provider, unpublished_score_set, data_files / "scores.csv"
)

with patch.object(arq.ArqRedis, "enqueue_job", return_value=None) as worker_queue:
score_set = publish_score_set(client, unpublished_score_set["urn"])
worker_queue.assert_called_once()
score_set_id = session.scalars(
select(ScoreSetDbModel.id).where(ScoreSetDbModel.urn == unpublished_score_set["urn"])
).one()
score_set = publish_score_set(client, unpublished_score_set["urn"], score_set_id)

collection = create_collection(client)
client.post(f"/api/v1/collections/{collection['urn']}/viewers", json={"orcid_id": EXTRA_USER["username"]})
@@ -556,9 +564,10 @@ def test_unauthorized_user_cannot_add_score_set_to_collection(
client, session, data_provider, unpublished_score_set, data_files / "scores.csv"
)

with patch.object(arq.ArqRedis, "enqueue_job", return_value=None) as worker_queue:
score_set = publish_score_set(client, unpublished_score_set["urn"])
worker_queue.assert_called_once()
score_set_id = session.scalars(
select(ScoreSetDbModel.id).where(ScoreSetDbModel.urn == unpublished_score_set["urn"])
).one()
score_set = publish_score_set(client, unpublished_score_set["urn"], score_set_id)

collection = create_collection(client)

@@ -580,9 +589,10 @@ def test_anonymous_cannot_add_score_set_to_collection(
client, session, data_provider, unpublished_score_set, data_files / "scores.csv"
)

with patch.object(arq.ArqRedis, "enqueue_job", return_value=None) as worker_queue:
score_set = publish_score_set(client, unpublished_score_set["urn"])
worker_queue.assert_called_once()
score_set_id = session.scalars(
select(ScoreSetDbModel.id).where(ScoreSetDbModel.urn == unpublished_score_set["urn"])
).one()
score_set = publish_score_set(client, unpublished_score_set["urn"], score_set_id)

collection = create_collection(client)

14 changes: 6 additions & 8 deletions tests/routers/test_experiment_set.py
@@ -1,7 +1,7 @@
# ruff: noqa: E402

import pytest
from unittest.mock import patch
from sqlalchemy import select

arq = pytest.importorskip("arq")
cdot = pytest.importorskip("cdot")
@@ -10,7 +10,6 @@
from mavedb.models.experiment import Experiment as ExperimentDbModel
from mavedb.models.experiment_set import ExperimentSet as ExperimentSetDbModel
from mavedb.models.score_set import ScoreSet as ScoreSetDbModel

from tests.helpers.constants import (
TEST_USER,
)
@@ -154,9 +153,9 @@ def test_users_get_one_experiment_one_score_set_from_own_public_experiment_set(
score_set = create_seq_score_set_with_variants(
client, session, data_provider, experiment["urn"], data_files / "scores.csv"
)
with patch.object(arq.ArqRedis, "enqueue_job", return_value=None) as worker_queue:
pub_score_set = publish_score_set(client, score_set["urn"])
worker_queue.assert_called_once()

score_set_id = session.scalars(select(ScoreSetDbModel.id).where(ScoreSetDbModel.urn == score_set["urn"])).one()
pub_score_set = publish_score_set(client, score_set["urn"], score_set_id)

response = client.get(f"/api/v1/experiment-sets/{pub_score_set['experiment']['experimentSetUrn']}")
assert response.status_code == 200
@@ -174,9 +173,8 @@ def test_users_get_one_experiment_one_score_set_from_other_public_experiment_set
score_set = create_seq_score_set_with_variants(
client, session, data_provider, experiment["urn"], data_files / "scores.csv"
)
with patch.object(arq.ArqRedis, "enqueue_job", return_value=None) as worker_queue:
pub_score_set = publish_score_set(client, score_set["urn"])
worker_queue.assert_called_once()
score_set_id = session.scalars(select(ScoreSetDbModel.id).where(ScoreSetDbModel.urn == score_set["urn"])).one()
pub_score_set = publish_score_set(client, score_set["urn"], score_set_id)

change_ownership(session, pub_score_set["urn"], ScoreSetDbModel)
change_ownership(session, pub_score_set["experiment"]["urn"], ExperimentDbModel)