diff --git a/src/mavedb/lib/exceptions.py b/src/mavedb/lib/exceptions.py index 8734becb..c019262c 100644 --- a/src/mavedb/lib/exceptions.py +++ b/src/mavedb/lib/exceptions.py @@ -186,12 +186,6 @@ class SubmissionEnqueueError(ValueError): pass -class LinkingEnqueueError(ValueError): - """Raised when a linking job fails to be enqueued despite appearing as if it should have been""" - - pass - - class UniProtIDMappingEnqueueError(Exception): """Raised when a UniProt ID mapping job fails to be enqueued despite appearing as if it should have been""" diff --git a/src/mavedb/routers/score_sets.py b/src/mavedb/routers/score_sets.py index 959f9133..0df64afc 100644 --- a/src/mavedb/routers/score_sets.py +++ b/src/mavedb/routers/score_sets.py @@ -2056,15 +2056,26 @@ async def publish_score_set( db.refresh(item) # await the insertion of this job into the worker queue, not the job itself. - job = await worker.enqueue_job("refresh_published_variants_view", correlation_id_for_context()) - if job is not None: - save_to_logging_context({"worker_job_id": job.job_id}) - logger.info(msg="Enqueud published variant materialized view refresh job.", extra=logging_context()) + refresh_published_variants_job = await worker.enqueue_job( + "refresh_published_variants_view", correlation_id_for_context() + ) + if refresh_published_variants_job is not None: + save_to_logging_context({"refresh_published_variants_job_id": refresh_published_variants_job.job_id}) + logger.info(msg="Enqueued published variant materialized view refresh job.", extra=logging_context()) else: logger.warning( msg="Failed to enqueue published variant materialized view refresh job.", extra=logging_context() ) + submit_to_ldh_job = await worker.enqueue_job( + "submit_score_set_mappings_to_ldh", item.id, correlation_id_for_context() + ) + if submit_to_ldh_job is not None: + save_to_logging_context({"submit_to_ldh_job_id": submit_to_ldh_job.job_id}) + logger.info(msg="Enqueued submit score set mappings to LDH job.", extra=logging_context()) + else: + logger.warning(msg="Failed to enqueue submit score set mappings to LDH job.", extra=logging_context()) + enriched_experiment = enrich_experiment_with_num_score_sets(item.experiment, user_data) return score_set.ScoreSet.model_validate(item).copy(update={"experiment": enriched_experiment}) diff --git a/src/mavedb/worker/jobs.py b/src/mavedb/worker/jobs.py index 6bd673be..cb03be1e 100644 --- a/src/mavedb/worker/jobs.py +++ b/src/mavedb/worker/jobs.py @@ -9,7 +9,7 @@ from arq import ArqRedis from arq.jobs import Job, JobStatus from cdot.hgvs.dataproviders import RESTDataProvider -from sqlalchemy import cast, delete, null, select +from sqlalchemy import cast, delete, func, null, select from sqlalchemy.dialects.postgresql import JSONB from sqlalchemy.orm import Session @@ -20,18 +20,14 @@ CLIN_GEN_SUBMISSION_ENABLED, DEFAULT_LDH_SUBMISSION_BATCH_SIZE, LDH_SUBMISSION_ENDPOINT, - LINKED_DATA_RETRY_THRESHOLD, ) from mavedb.lib.clingen.content_constructors import construct_ldh_submission from mavedb.lib.clingen.services import ( ClinGenAlleleRegistryService, ClinGenLdhService, - clingen_allele_id_from_ldh_variation, get_allele_registry_associations, - get_clingen_variation, ) from mavedb.lib.exceptions import ( - LinkingEnqueueError, MappingEnqueueError, NonexistentMappingReferenceError, NonexistentMappingResultsError, @@ -504,6 +500,7 @@ async def map_variants_for_score_set( mapping_api_version=mapping_results["dcd_mapping_version"], error_message=mapped_score.get("error_message", null()), current=True, + 
hgvs_assay_level=get_hgvs_from_post_mapped(mapped_score.get("post_mapped")), ) db.add(mapped_variant) @@ -845,7 +842,6 @@ async def submit_score_set_mappings_to_car(ctx: dict, correlation_id: str, score text = "Could not submit mappings to ClinGen Allele Registry for score set %s. Mappings for this score set should be submitted manually." try: db: Session = ctx["db"] - redis: ArqRedis = ctx["redis"] score_set = db.scalars(select(ScoreSet).where(ScoreSet.id == score_set_id)).one() logging_context = setup_job_state(ctx, None, score_set.urn, correlation_id) @@ -874,43 +870,50 @@ async def submit_score_set_mappings_to_car(ctx: dict, correlation_id: str, score try: variant_post_mapped_objects = db.execute( - select(MappedVariant.id, MappedVariant.post_mapped) + select(MappedVariant.id, MappedVariant.hgvs_assay_level) .join(Variant) .join(ScoreSet) .where(ScoreSet.urn == score_set.urn) - .where(MappedVariant.post_mapped.is_not(None)) + .where(MappedVariant.hgvs_assay_level.is_not(None)) .where(MappedVariant.current.is_(True)) ).all() + variants_without_assay_level_hgvs = db.execute( + select(func.count(MappedVariant.id)) + .join(Variant) + .join(ScoreSet) + .where(ScoreSet.urn == score_set.urn) + .where(MappedVariant.hgvs_assay_level.is_(None)) + ).scalar_one() + + logging_context["variants_with_assay_level_hgvs"] = len(variant_post_mapped_objects) + logging_context["variants_without_assay_level_hgvs"] = variants_without_assay_level_hgvs if not variant_post_mapped_objects: logger.warning( - msg="No current mapped variants with post mapped metadata were found for this score set. Skipping CAR submission.", + msg="No current mapped variants with assay level HGVS were found for this score set. Skipping CAR submission.", extra=logging_context, ) return {"success": True, "retried": False, "enqueued_job": None} - variant_post_mapped_hgvs: dict[str, list[int]] = {} - for mapped_variant_id, post_mapped in variant_post_mapped_objects: - hgvs_for_post_mapped = get_hgvs_from_post_mapped(post_mapped) - - if not hgvs_for_post_mapped: - logger.warning( - msg=f"Could not construct a valid HGVS string for mapped variant {mapped_variant_id}. Skipping submission of this variant.", - extra=logging_context, - ) - continue + if variants_without_assay_level_hgvs > 0: + logger.warning( + msg="Some mapped variants for this score set are missing assay level HGVS strings. These variants will be skipped during CAR submission.", + extra=logging_context, + ) - if hgvs_for_post_mapped in variant_post_mapped_hgvs: - variant_post_mapped_hgvs[hgvs_for_post_mapped].append(mapped_variant_id) + variant_post_mapped_hgvs: dict[str, list[int]] = {} + for mapped_variant_id, assay_level_hgvs in variant_post_mapped_objects: + if assay_level_hgvs in variant_post_mapped_hgvs: + variant_post_mapped_hgvs[assay_level_hgvs].append(mapped_variant_id) else: - variant_post_mapped_hgvs[hgvs_for_post_mapped] = [mapped_variant_id] + variant_post_mapped_hgvs[assay_level_hgvs] = [mapped_variant_id] except Exception as e: send_slack_error(e) send_slack_message(text=text % score_set.urn) logging_context = {**logging_context, **format_raised_exception_info_as_dict(e)} logger.error( - msg="LDH mapped resource submission encountered an unexpected error while attempting to construct post mapped HGVS strings. This job will not be retried.", + msg="CAR mapped resource submission encountered an unexpected error while attempting to build a submission resource of assay level HGVS strings. 
This job will not be retried.", extra=logging_context, ) @@ -931,7 +934,7 @@ async def submit_score_set_mappings_to_car(ctx: dict, correlation_id: str, score send_slack_message(text=text % score_set.urn) logging_context = {**logging_context, **format_raised_exception_info_as_dict(e)} logger.error( - msg="LDH mapped resource submission encountered an unexpected error while attempting to authenticate to the LDH. This job will not be retried.", + msg="CAR mapped resource submission encountered an unexpected error while attempting to authenticate to the CAR. This job will not be retried.", extra=logging_context, ) @@ -960,38 +963,8 @@ async def submit_score_set_mappings_to_car(ctx: dict, correlation_id: str, score return {"success": False, "retried": False, "enqueued_job": None} - new_job_id = None - try: - new_job = await redis.enqueue_job( - "submit_score_set_mappings_to_ldh", - correlation_id, - score_set.id, - ) - - if new_job: - new_job_id = new_job.job_id - - logging_context["submit_clingen_ldh_variants_job_id"] = new_job_id - logger.info(msg="Queued a new ClinGen submission job.", extra=logging_context) - - else: - raise SubmissionEnqueueError() - - except Exception as e: - send_slack_error(e) - send_slack_message( - f"Could not submit mappings to LDH for score set {score_set.urn}. Mappings for this score set should be submitted manually." - ) - logging_context = {**logging_context, **format_raised_exception_info_as_dict(e)} - logger.error( - msg="Mapped variant ClinGen submission encountered an unexpected error while attempting to enqueue a submission job. This job will not be retried.", - extra=logging_context, - ) - - return {"success": False, "retried": False, "enqueued_job": new_job_id} - ctx["state"][ctx["job_id"]] = logging_context.copy() - return {"success": True, "retried": False, "enqueued_job": new_job_id} + return {"success": True, "retried": False, "enqueued_job": None} async def submit_score_set_mappings_to_ldh(ctx: dict, correlation_id: str, score_set_id: int): @@ -1002,7 +975,6 @@ async def submit_score_set_mappings_to_ldh(ctx: dict, correlation_id: str, score ) try: db: Session = ctx["db"] - redis: ArqRedis = ctx["redis"] score_set = db.scalars(select(ScoreSet).where(ScoreSet.id == score_set_id)).one() logging_context = setup_job_state(ctx, None, score_set.urn, correlation_id) @@ -1062,16 +1034,14 @@ async def submit_score_set_mappings_to_ldh(ctx: dict, correlation_id: str, score variant_content = [] for variant, mapped_variant in variant_objects: - variation = get_hgvs_from_post_mapped(mapped_variant.post_mapped) - - if not variation: + if not mapped_variant.hgvs_assay_level: logger.warning( - msg=f"Could not construct a valid HGVS string for mapped variant {mapped_variant.id}. Skipping submission of this variant.", + msg=f"No valid assay level HGVS string for mapped variant {mapped_variant.id}. 
Skipping submission of this variant.", extra=logging_context, ) continue - variant_content.append((variation, variant, mapped_variant)) + variant_content.append((mapped_variant.hgvs_assay_level, variant, mapped_variant)) submission_content = construct_ldh_submission(variant_content) @@ -1120,314 +1090,7 @@ async def submit_score_set_mappings_to_ldh(ctx: dict, correlation_id: str, score return {"success": False, "retried": False, "enqueued_job": None} - new_job_id = None - try: - new_job = await redis.enqueue_job( - "link_clingen_variants", - correlation_id, - score_set.id, - 1, - _defer_by=timedelta(seconds=LINKING_BACKOFF_IN_SECONDS), - ) - - if new_job: - new_job_id = new_job.job_id - - logging_context["link_clingen_variants_job_id"] = new_job_id - logger.info(msg="Queued a new ClinGen linking job.", extra=logging_context) - - else: - raise LinkingEnqueueError() - - except Exception as e: - send_slack_error(e) - send_slack_message(text=text % score_set.urn) - logging_context = {**logging_context, **format_raised_exception_info_as_dict(e)} - logger.error( - msg="LDH mapped resource submission encountered an unexpected error while attempting to enqueue a linking job. This job will not be retried.", - extra=logging_context, - ) - - return {"success": False, "retried": False, "enqueued_job": new_job_id} - - return {"success": True, "retried": False, "enqueued_job": new_job_id} - - -def do_clingen_fetch(variant_urns): - return [(variant_urn, get_clingen_variation(variant_urn)) for variant_urn in variant_urns] - - -async def link_clingen_variants(ctx: dict, correlation_id: str, score_set_id: int, attempt: int) -> dict: - logging_context = {} - score_set = None - text = "Could not link mappings to LDH for score set %s. Mappings for this score set should be linked manually." - try: - db: Session = ctx["db"] - redis: ArqRedis = ctx["redis"] - score_set = db.scalars(select(ScoreSet).where(ScoreSet.id == score_set_id)).one() - - logging_context = setup_job_state(ctx, None, score_set.urn, correlation_id) - logging_context["linkage_retry_threshold"] = LINKED_DATA_RETRY_THRESHOLD - logging_context["attempt"] = attempt - logging_context["max_attempts"] = BACKOFF_LIMIT - logger.info(msg="Started LDH mapped resource linkage", extra=logging_context) - - submission_urn = score_set.urn - assert submission_urn, "A valid URN is needed to link LDH objects for this score set." - - logging_context["current_ldh_linking_resource"] = submission_urn - logger.debug(msg="Fetched score set metadata for ldh mapped resource linkage.", extra=logging_context) - - except Exception as e: - send_slack_error(e) - if score_set: - send_slack_message(text=text % score_set.urn) - else: - send_slack_message(text=text % score_set_id) - - logging_context = {**logging_context, **format_raised_exception_info_as_dict(e)} - logger.error( - msg="LDH mapped resource linkage encountered an unexpected error during setup. This job will not be retried.", - extra=logging_context, - ) - - return {"success": False, "retried": False, "enqueued_job": None} - - try: - variant_urns = db.scalars( - select(Variant.urn) - .join(MappedVariant) - .join(ScoreSet) - .where( - ScoreSet.urn == score_set.urn, MappedVariant.current.is_(True), MappedVariant.post_mapped.is_not(None) - ) - ).all() - num_variant_urns = len(variant_urns) - - logging_context["variants_to_link_ldh"] = num_variant_urns - - if not variant_urns: - logger.warning( - msg="No current mapped variants with post mapped metadata were found for this score set. 
Skipping LDH linkage (nothing to do). A gnomAD linkage job will not be enqueued, as no variants will have a CAID.", - extra=logging_context, - ) - - return {"success": True, "retried": False, "enqueued_job": None} - - logger.info( - msg="Found current mapped variants with post mapped metadata for this score set. Attempting to link them to LDH submissions.", - extra=logging_context, - ) - - except Exception as e: - send_slack_error(e) - send_slack_message(text=text % score_set.urn) - logging_context = {**logging_context, **format_raised_exception_info_as_dict(e)} - logger.error( - msg="LDH mapped resource linkage encountered an unexpected error while attempting to build linkage urn list. This job will not be retried.", - extra=logging_context, - ) - - return {"success": False, "retried": False, "enqueued_job": None} - - try: - logger.info(msg="Attempting to link mapped variants to LDH submissions.", extra=logging_context) - - # TODO#372: Non-nullable variant urns. - blocking = functools.partial( - do_clingen_fetch, - variant_urns, # type: ignore - ) - loop = asyncio.get_running_loop() - linked_data = await loop.run_in_executor(ctx["pool"], blocking) - - except Exception as e: - send_slack_error(e) - send_slack_message(text=text % score_set.urn) - logging_context = {**logging_context, **format_raised_exception_info_as_dict(e)} - logger.error( - msg="LDH mapped resource linkage encountered an unexpected error while attempting to link LDH submissions. This job will not be retried.", - extra=logging_context, - ) - - return {"success": False, "retried": False, "enqueued_job": None} - - try: - linked_allele_ids = [ - (variant_urn, clingen_allele_id_from_ldh_variation(clingen_variation)) - for variant_urn, clingen_variation in linked_data - ] - - linkage_failures = [] - for variant_urn, ldh_variation in linked_allele_ids: - # XXX: Should we unlink variation if it is not found? Does this constitute a failure? - if not ldh_variation: - logger.warning( - msg=f"Failed to link mapped variant {variant_urn} to LDH submission. No LDH variation found.", - extra=logging_context, - ) - linkage_failures.append(variant_urn) - continue - - mapped_variant = db.scalars( - select(MappedVariant).join(Variant).where(Variant.urn == variant_urn, MappedVariant.current.is_(True)) - ).one_or_none() - - if not mapped_variant: - logger.warning( - msg=f"Failed to link mapped variant {variant_urn} to LDH submission. No mapped variant found.", - extra=logging_context, - ) - linkage_failures.append(variant_urn) - continue - - mapped_variant.clingen_allele_id = ldh_variation - db.add(mapped_variant) - - db.commit() - - except Exception as e: - db.rollback() - - send_slack_error(e) - send_slack_message(text=text % score_set.urn) - logging_context = {**logging_context, **format_raised_exception_info_as_dict(e)} - logger.error( - msg="LDH mapped resource linkage encountered an unexpected error while attempting to link LDH submissions. 
This job will not be retried.", - extra=logging_context, - ) - - return {"success": False, "retried": False, "enqueued_job": None} - - try: - num_linkage_failures = len(linkage_failures) - ratio_failed_linking = round(num_linkage_failures / num_variant_urns, 3) - logging_context["linkage_failure_rate"] = ratio_failed_linking - logging_context["linkage_failures"] = num_linkage_failures - logging_context["linkage_successes"] = num_variant_urns - num_linkage_failures - - assert ( - len(linked_allele_ids) == num_variant_urns - ), f"{num_variant_urns - len(linked_allele_ids)} appear to not have been attempted to be linked." - - job_succeeded = False - if not linkage_failures: - logger.info( - msg="Successfully linked all mapped variants to LDH submissions.", - extra=logging_context, - ) - - job_succeeded = True - - elif ratio_failed_linking < LINKED_DATA_RETRY_THRESHOLD: - logger.warning( - msg="Linkage failures exist, but did not exceed the retry threshold.", - extra=logging_context, - ) - send_slack_message( - text=f"Failed to link {len(linkage_failures)} mapped variants to LDH submissions for score set {score_set.urn}." - f"The retry threshold was not exceeded and this job will not be retried. URNs failed to link: {', '.join(linkage_failures)}." - ) - - job_succeeded = True - - except Exception as e: - send_slack_error(e) - send_slack_message(text=text % score_set.urn) - logging_context = {**logging_context, **format_raised_exception_info_as_dict(e)} - logger.error( - msg="LDH mapped resource linkage encountered an unexpected error while attempting to finalize linkage. This job will not be retried.", - extra=logging_context, - ) - - return {"success": False, "retried": False, "enqueued_job": None} - - if job_succeeded: - gnomad_linking_job_id = None - try: - new_job = await redis.enqueue_job( - "link_gnomad_variants", - correlation_id, - score_set.id, - ) - - if new_job: - gnomad_linking_job_id = new_job.job_id - - logging_context["link_gnomad_variants_job_id"] = gnomad_linking_job_id - logger.info(msg="Queued a new gnomAD linking job.", extra=logging_context) - - else: - raise LinkingEnqueueError() - - except Exception as e: - job_succeeded = False - - send_slack_error(e) - send_slack_message(text=text % score_set.urn) - logging_context = {**logging_context, **format_raised_exception_info_as_dict(e)} - logger.error( - msg="LDH mapped resource linkage encountered an unexpected error while attempting to enqueue a gnomAD linking job. GnomAD variants should be linked manually for this score set. This job will not be retried.", - extra=logging_context, - ) - finally: - return {"success": job_succeeded, "retried": False, "enqueued_job": gnomad_linking_job_id} - - # If we reach this point, we should consider the job failed (there were failures which exceeded our retry threshold). 
- new_job_id = None - max_retries_exceeded = None - try: - new_job_id, max_retries_exceeded, backoff_time = await enqueue_job_with_backoff( - ctx["redis"], "variant_mapper_manager", attempt, LINKING_BACKOFF_IN_SECONDS, correlation_id - ) - - logging_context["backoff_limit_exceeded"] = max_retries_exceeded - logging_context["backoff_deferred_in_seconds"] = backoff_time - logging_context["backoff_job_id"] = new_job_id - - except Exception as e: - send_slack_error(e) - send_slack_message(text=text % score_set.urn) - logging_context = {**logging_context, **format_raised_exception_info_as_dict(e)} - logger.critical( - msg="LDH mapped resource linkage encountered an unexpected error while attempting to retry a failed linkage job. This job will not be retried.", - extra=logging_context, - ) - else: - if new_job_id and not max_retries_exceeded: - logger.info( - msg="After a failure condition while linking mapped variants to LDH submissions, another linkage job was queued.", - extra=logging_context, - ) - send_slack_message( - text=f"Failed to link {len(linkage_failures)} ({ratio_failed_linking * 100}% of total mapped variants for {score_set.urn})." - f"This job was successfully retried. This was attempt {attempt}. Retry will occur in {backoff_time} seconds. URNs failed to link: {', '.join(linkage_failures)}." - ) - elif new_job_id is None and not max_retries_exceeded: - logger.error( - msg="After a failure condition while linking mapped variants to LDH submissions, another linkage job was unable to be queued.", - extra=logging_context, - ) - send_slack_message( - text=f"Failed to link {len(linkage_failures)} ({ratio_failed_linking} of total mapped variants for {score_set.urn})." - f"This job could not be retried due to an unexpected issue while attempting to enqueue another linkage job. This was attempt {attempt}. URNs failed to link: {', '.join(linkage_failures)}." - ) - else: - logger.error( - msg="After a failure condition while linking mapped variants to LDH submissions, the maximum retries for this job were exceeded. The reamining linkage failures will not be retried.", - extra=logging_context, - ) - send_slack_message( - text=f"Failed to link {len(linkage_failures)} ({ratio_failed_linking} of total mapped variants for {score_set.urn})." - f"The retry threshold was exceeded and this job will not be retried. URNs failed to link: {', '.join(linkage_failures)}." - ) - - finally: - return { - "success": False, - "retried": (not max_retries_exceeded and new_job_id is not None), - "enqueued_job": new_job_id, - } + return {"success": True, "retried": False, "enqueued_job": None} ######################################################################################################## diff --git a/src/mavedb/worker/settings.py b/src/mavedb/worker/settings.py index 0a9359d5..6a82b434 100644 --- a/src/mavedb/worker/settings.py +++ b/src/mavedb/worker/settings.py @@ -11,16 +11,15 @@ from mavedb.lib.logging.canonical import log_job from mavedb.worker.jobs import ( create_variants_for_score_set, + link_gnomad_variants, map_variants_for_score_set, - variant_mapper_manager, + poll_uniprot_mapping_jobs_for_score_set, refresh_materialized_views, refresh_published_variants_view, + submit_score_set_mappings_to_car, submit_score_set_mappings_to_ldh, - link_clingen_variants, - poll_uniprot_mapping_jobs_for_score_set, submit_uniprot_mapping_jobs_for_score_set, - link_gnomad_variants, - submit_score_set_mappings_to_car, + variant_mapper_manager, ) # ARQ requires at least one task on startup. 
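Note (not part of the patch): with `link_clingen_variants` dropped from the registered tasks, the CAR and LDH submission jobs now read the precomputed `hgvs_assay_level` column instead of rebuilding HGVS strings from `post_mapped`, and the CAR path deduplicates submissions by grouping mapped-variant ids under each HGVS string. The sketch below is a pure-Python restatement of that grouping, assuming the `(id, hgvs_assay_level)` rows have already been fetched; in the job itself the null filter and the missing count happen in SQL, and the helper name here is hypothetical.

```python
from collections import defaultdict
from typing import Iterable, Optional


def group_by_assay_level_hgvs(
    rows: Iterable[tuple[int, Optional[str]]],
) -> tuple[dict[str, list[int]], int]:
    """Group mapped variant ids by their stored assay-level HGVS string.

    Returns the grouping plus a count of rows with no assay-level HGVS,
    which the job logs as skipped rather than retried.
    """
    grouped: dict[str, list[int]] = defaultdict(list)
    missing = 0
    for mapped_variant_id, hgvs in rows:
        if not hgvs:
            missing += 1
            continue
        grouped[hgvs].append(mapped_variant_id)
    return dict(grouped), missing


# Example: two variants share one HGVS string; a third has none and is skipped.
grouped, missing = group_by_assay_level_hgvs(
    [(1, "NM_000000.0:c.1A>G"), (2, "NM_000000.0:c.1A>G"), (3, None)]
)
assert grouped == {"NM_000000.0:c.1A>G": [1, 2]} and missing == 1
```

Grouping before submission keeps the CAR payload free of duplicate HGVS strings while still letting every mapped-variant id be associated with the registered allele.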
@@ -30,7 +29,6 @@ map_variants_for_score_set, refresh_published_variants_view, submit_score_set_mappings_to_ldh, - link_clingen_variants, poll_uniprot_mapping_jobs_for_score_set, submit_uniprot_mapping_jobs_for_score_set, link_gnomad_variants, diff --git a/tests/helpers/util/score_set.py b/tests/helpers/util/score_set.py index b2a8b2c6..59d25826 100644 --- a/tests/helpers/util/score_set.py +++ b/tests/helpers/util/score_set.py @@ -1,10 +1,11 @@ from copy import deepcopy from datetime import date from typing import Any, Dict, Optional -from unittest.mock import patch +from unittest.mock import call, patch import cdot.hgvs.dataproviders import jsonschema +from arq import ArqRedis from fastapi.testclient import TestClient from sqlalchemy import select @@ -165,9 +166,9 @@ def create_seq_score_set_with_variants( count_columns_metadata_json_path, ) - assert score_set["numVariants"] == 3, ( - f"Could not create sequence based score set with variants within experiment {experiment_urn}" - ) + assert ( + score_set["numVariants"] == 3 + ), f"Could not create sequence based score set with variants within experiment {experiment_urn}" jsonschema.validate(instance=score_set, schema=ScoreSet.model_json_schema()) return score_set @@ -196,9 +197,9 @@ def create_acc_score_set_with_variants( count_columns_metadata_json_path, ) - assert score_set["numVariants"] == 3, ( - f"Could not create sequence based score set with variants within experiment {experiment_urn}" - ) + assert ( + score_set["numVariants"] == 3 + ), f"Could not create sequence based score set with variants within experiment {experiment_urn}" jsonschema.validate(instance=score_set, schema=ScoreSet.model_json_schema()) return score_set @@ -272,9 +273,19 @@ def mock_worker_vrs_mapping(client, db, score_set, alleles=True): return client.get(f"/api/v1/score-sets/{score_set['urn']}").json() -def publish_score_set(client: TestClient, score_set_urn: str) -> Dict[str, Any]: - response = client.post(f"/api/v1/score-sets/{score_set_urn}/publish") - assert response.status_code == 200, f"Could not publish score set {score_set_urn}" +def publish_score_set(client: TestClient, score_set_urn: str, score_set_id: int) -> Dict[str, Any]: + with ( + patch.object(ArqRedis, "enqueue_job", return_value=None) as worker_queue, + patch("mavedb.routers.score_sets.correlation_id_for_context", return_value="test-correlation-id"), + ): + response = client.post(f"/api/v1/score-sets/{score_set_urn}/publish") + assert response.status_code == 200, f"Could not publish score set {score_set_urn}" + worker_queue.assert_has_calls( + [ + call("refresh_published_variants_view", "test-correlation-id"), + call("submit_score_set_mappings_to_ldh", score_set_id, "test-correlation-id"), + ] + ) response_data = response.json() return response_data diff --git a/tests/routers/test_collections.py b/tests/routers/test_collections.py index 3b3bec65..0f845656 100644 --- a/tests/routers/test_collections.py +++ b/tests/routers/test_collections.py @@ -2,10 +2,10 @@ import re from copy import deepcopy -from unittest.mock import patch import jsonschema import pytest +from sqlalchemy import select arq = pytest.importorskip("arq") cdot = pytest.importorskip("cdot") @@ -13,13 +13,13 @@ from mavedb.lib.validation.urn_re import MAVEDB_COLLECTION_URN_RE from mavedb.models.enums.contribution_role import ContributionRole +from mavedb.models.score_set import ScoreSet as ScoreSetDbModel from mavedb.view_models.collection import Collection - from tests.helpers.constants import ( EXTRA_USER, - TEST_USER, 
TEST_COLLECTION, TEST_COLLECTION_RESPONSE, + TEST_USER, ) from tests.helpers.dependency_overrider import DependencyOverrider from tests.helpers.util.collection import create_collection @@ -235,9 +235,10 @@ def test_admin_can_add_experiment_to_collection( client, session, data_provider, unpublished_score_set, data_files / "scores.csv" ) - with patch.object(arq.ArqRedis, "enqueue_job", return_value=None) as worker_queue: - score_set = publish_score_set(client, unpublished_score_set["urn"]) - worker_queue.assert_called_once() + score_set_id = session.scalars( + select(ScoreSetDbModel.id).where(ScoreSetDbModel.urn == unpublished_score_set["urn"]) + ).one() + score_set = publish_score_set(client, unpublished_score_set["urn"], score_set_id) collection = create_collection(client) client.post(f"/api/v1/collections/{collection['urn']}/admins", json={"orcid_id": EXTRA_USER["username"]}) @@ -293,9 +294,10 @@ def test_editor_can_add_experiment_to_collection( client, session, data_provider, unpublished_score_set, data_files / "scores.csv" ) - with patch.object(arq.ArqRedis, "enqueue_job", return_value=None) as worker_queue: - score_set = publish_score_set(client, unpublished_score_set["urn"]) - worker_queue.assert_called_once() + score_set_id = session.scalars( + select(ScoreSetDbModel.id).where(ScoreSetDbModel.urn == unpublished_score_set["urn"]) + ).one() + score_set = publish_score_set(client, unpublished_score_set["urn"], score_set_id) collection = create_collection(client) client.post(f"/api/v1/collections/{collection['urn']}/editors", json={"orcid_id": EXTRA_USER["username"]}) @@ -345,9 +347,10 @@ def test_viewer_cannot_add_experiment_to_collection( client, session, data_provider, unpublished_score_set, data_files / "scores.csv" ) - with patch.object(arq.ArqRedis, "enqueue_job", return_value=None) as worker_queue: - score_set = publish_score_set(client, unpublished_score_set["urn"]) - worker_queue.assert_called_once() + score_set_id = session.scalars( + select(ScoreSetDbModel.id).where(ScoreSetDbModel.urn == unpublished_score_set["urn"]) + ).one() + score_set = publish_score_set(client, unpublished_score_set["urn"], score_set_id) collection = create_collection(client) client.post(f"/api/v1/collections/{collection['urn']}/viewers", json={"orcid_id": EXTRA_USER["username"]}) @@ -372,9 +375,10 @@ def test_unauthorized_user_cannot_add_experiment_to_collection( client, session, data_provider, unpublished_score_set, data_files / "scores.csv" ) - with patch.object(arq.ArqRedis, "enqueue_job", return_value=None) as worker_queue: - score_set = publish_score_set(client, unpublished_score_set["urn"]) - worker_queue.assert_called_once() + score_set_id = session.scalars( + select(ScoreSetDbModel.id).where(ScoreSetDbModel.urn == unpublished_score_set["urn"]) + ).one() + score_set = publish_score_set(client, unpublished_score_set["urn"], score_set_id) collection = create_collection(client) @@ -397,9 +401,10 @@ def test_anonymous_cannot_add_experiment_to_collection( client, session, data_provider, unpublished_score_set, data_files / "scores.csv" ) - with patch.object(arq.ArqRedis, "enqueue_job", return_value=None) as worker_queue: - score_set = publish_score_set(client, unpublished_score_set["urn"]) - worker_queue.assert_called_once() + score_set_id = session.scalars( + select(ScoreSetDbModel.id).where(ScoreSetDbModel.urn == unpublished_score_set["urn"]) + ).one() + score_set = publish_score_set(client, unpublished_score_set["urn"], score_set_id) collection = create_collection(client) @@ -422,9 +427,10 @@ def 
test_admin_can_add_score_set_to_collection( client, session, data_provider, unpublished_score_set, data_files / "scores.csv" ) - with patch.object(arq.ArqRedis, "enqueue_job", return_value=None) as worker_queue: - score_set = publish_score_set(client, unpublished_score_set["urn"]) - worker_queue.assert_called_once() + score_set_id = session.scalars( + select(ScoreSetDbModel.id).where(ScoreSetDbModel.urn == unpublished_score_set["urn"]) + ).one() + score_set = publish_score_set(client, unpublished_score_set["urn"], score_set_id) collection = create_collection(client) client.post(f"/api/v1/collections/{collection['urn']}/admins", json={"orcid_id": EXTRA_USER["username"]}) @@ -479,9 +485,10 @@ def test_editor_can_add_score_set_to_collection( client, session, data_provider, unpublished_score_set, data_files / "scores.csv" ) - with patch.object(arq.ArqRedis, "enqueue_job", return_value=None) as worker_queue: - score_set = publish_score_set(client, unpublished_score_set["urn"]) - worker_queue.assert_called_once() + score_set_id = session.scalars( + select(ScoreSetDbModel.id).where(ScoreSetDbModel.urn == unpublished_score_set["urn"]) + ).one() + score_set = publish_score_set(client, unpublished_score_set["urn"], score_set_id) collection = create_collection(client) client.post(f"/api/v1/collections/{collection['urn']}/editors", json={"orcid_id": EXTRA_USER["username"]}) @@ -530,9 +537,10 @@ def test_viewer_cannot_add_score_set_to_collection( client, session, data_provider, unpublished_score_set, data_files / "scores.csv" ) - with patch.object(arq.ArqRedis, "enqueue_job", return_value=None) as worker_queue: - score_set = publish_score_set(client, unpublished_score_set["urn"]) - worker_queue.assert_called_once() + score_set_id = session.scalars( + select(ScoreSetDbModel.id).where(ScoreSetDbModel.urn == unpublished_score_set["urn"]) + ).one() + score_set = publish_score_set(client, unpublished_score_set["urn"], score_set_id) collection = create_collection(client) client.post(f"/api/v1/collections/{collection['urn']}/viewers", json={"orcid_id": EXTRA_USER["username"]}) @@ -556,9 +564,10 @@ def test_unauthorized_user_cannot_add_score_set_to_collection( client, session, data_provider, unpublished_score_set, data_files / "scores.csv" ) - with patch.object(arq.ArqRedis, "enqueue_job", return_value=None) as worker_queue: - score_set = publish_score_set(client, unpublished_score_set["urn"]) - worker_queue.assert_called_once() + score_set_id = session.scalars( + select(ScoreSetDbModel.id).where(ScoreSetDbModel.urn == unpublished_score_set["urn"]) + ).one() + score_set = publish_score_set(client, unpublished_score_set["urn"], score_set_id) collection = create_collection(client) @@ -580,9 +589,10 @@ def test_anonymous_cannot_add_score_set_to_collection( client, session, data_provider, unpublished_score_set, data_files / "scores.csv" ) - with patch.object(arq.ArqRedis, "enqueue_job", return_value=None) as worker_queue: - score_set = publish_score_set(client, unpublished_score_set["urn"]) - worker_queue.assert_called_once() + score_set_id = session.scalars( + select(ScoreSetDbModel.id).where(ScoreSetDbModel.urn == unpublished_score_set["urn"]) + ).one() + score_set = publish_score_set(client, unpublished_score_set["urn"], score_set_id) collection = create_collection(client) diff --git a/tests/routers/test_experiment_set.py b/tests/routers/test_experiment_set.py index ebf5c9da..4c6e20d3 100644 --- a/tests/routers/test_experiment_set.py +++ b/tests/routers/test_experiment_set.py @@ -1,7 +1,7 @@ # ruff: noqa: E402 
import pytest -from unittest.mock import patch +from sqlalchemy import select arq = pytest.importorskip("arq") cdot = pytest.importorskip("cdot") @@ -10,7 +10,6 @@ from mavedb.models.experiment import Experiment as ExperimentDbModel from mavedb.models.experiment_set import ExperimentSet as ExperimentSetDbModel from mavedb.models.score_set import ScoreSet as ScoreSetDbModel - from tests.helpers.constants import ( TEST_USER, ) @@ -154,9 +153,9 @@ def test_users_get_one_experiment_one_score_set_from_own_public_experiment_set( score_set = create_seq_score_set_with_variants( client, session, data_provider, experiment["urn"], data_files / "scores.csv" ) - with patch.object(arq.ArqRedis, "enqueue_job", return_value=None) as worker_queue: - pub_score_set = publish_score_set(client, score_set["urn"]) - worker_queue.assert_called_once() + + score_set_id = session.scalars(select(ScoreSetDbModel.id).where(ScoreSetDbModel.urn == score_set["urn"])).one() + pub_score_set = publish_score_set(client, score_set["urn"], score_set_id) response = client.get(f"/api/v1/experiment-sets/{pub_score_set['experiment']['experimentSetUrn']}") assert response.status_code == 200 @@ -174,9 +173,8 @@ def test_users_get_one_experiment_one_score_set_from_other_public_experiment_set score_set = create_seq_score_set_with_variants( client, session, data_provider, experiment["urn"], data_files / "scores.csv" ) - with patch.object(arq.ArqRedis, "enqueue_job", return_value=None) as worker_queue: - pub_score_set = publish_score_set(client, score_set["urn"]) - worker_queue.assert_called_once() + score_set_id = session.scalars(select(ScoreSetDbModel.id).where(ScoreSetDbModel.urn == score_set["urn"])).one() + pub_score_set = publish_score_set(client, score_set["urn"], score_set_id) change_ownership(session, pub_score_set["urn"], ScoreSetDbModel) change_ownership(session, pub_score_set["experiment"]["urn"], ExperimentDbModel) diff --git a/tests/routers/test_experiments.py b/tests/routers/test_experiments.py index 9767c125..d90bd12a 100644 --- a/tests/routers/test_experiments.py +++ b/tests/routers/test_experiments.py @@ -9,6 +9,7 @@ import pytest import requests import requests_mock +from sqlalchemy import select arq = pytest.importorskip("arq") cdot = pytest.importorskip("cdot") @@ -591,9 +592,10 @@ def test_can_update_own_public_experiment_set(session, data_provider, client, se client, session, data_provider, unpublished_score_set, data_files / "scores.csv" ) - with patch.object(arq.ArqRedis, "enqueue_job", return_value=None) as worker_queue: - published_score_set = publish_score_set(client, unpublished_score_set["urn"]) - worker_queue.assert_called_once() + score_set_id = session.scalars( + select(ScoreSetDbModel.id).where(ScoreSetDbModel.urn == unpublished_score_set["urn"]) + ).one() + published_score_set = publish_score_set(client, unpublished_score_set["urn"], score_set_id) response_data = create_experiment( client, @@ -610,9 +612,10 @@ def test_cannot_update_other_users_public_experiment_set(session, data_provider, client, session, data_provider, unpublished_score_set, data_files / "scores.csv" ) - with patch.object(arq.ArqRedis, "enqueue_job", return_value=None) as worker_queue: - published_score_set = publish_score_set(client, unpublished_score_set["urn"]) - worker_queue.assert_called_once() + score_set_id = session.scalars( + select(ScoreSetDbModel.id).where(ScoreSetDbModel.urn == unpublished_score_set["urn"]) + ).one() + published_score_set = publish_score_set(client, unpublished_score_set["urn"], score_set_id) 
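Note (not part of the patch): every call site below follows the same pattern. Because `publish_score_set` now asserts that `submit_score_set_mappings_to_ldh` is enqueued with the score set's database id, each test first resolves that id from the URN. A small wrapper like the following (hypothetical name, relying on the suite's existing `client` and `session` fixtures) captures the recurring idiom; the tests in this patch inline it instead.

```python
from sqlalchemy import select

from mavedb.models.score_set import ScoreSet as ScoreSetDbModel
from tests.helpers.util.score_set import publish_score_set


def publish_by_urn(client, session, urn: str) -> dict:
    """Resolve the score set's database id from its URN, then publish it.

    publish_score_set patches ArqRedis.enqueue_job and asserts that both the
    materialized-view refresh and the LDH submission jobs were enqueued.
    """
    score_set_id = session.scalars(
        select(ScoreSetDbModel.id).where(ScoreSetDbModel.urn == urn)
    ).one()
    return publish_score_set(client, urn, score_set_id)
```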
published_experiment_set_urn = published_score_set["experiment"]["experimentSetUrn"] change_ownership(session, published_experiment_set_urn, ExperimentSetDbModel) @@ -633,9 +636,10 @@ def test_anonymous_cannot_update_others_user_public_experiment_set( client, session, data_provider, unpublished_score_set, data_files / "scores.csv" ) - with patch.object(arq.ArqRedis, "enqueue_job", return_value=None) as worker_queue: - published_score_set = publish_score_set(client, unpublished_score_set["urn"]) - worker_queue.assert_called_once() + score_set_id = session.scalars( + select(ScoreSetDbModel.id).where(ScoreSetDbModel.urn == unpublished_score_set["urn"]) + ).one() + published_score_set = publish_score_set(client, unpublished_score_set["urn"], score_set_id) published_experiment_set_urn = published_score_set["experiment"]["experimentSetUrn"] experiment_post_payload = deepcopy(TEST_MINIMAL_EXPERIMENT) @@ -658,9 +662,10 @@ def test_admin_can_update_other_users_public_experiment_set( client, session, data_provider, unpublished_score_set, data_files / "scores.csv" ) - with patch.object(arq.ArqRedis, "enqueue_job", return_value=None) as worker_queue: - published_score_set = publish_score_set(client, unpublished_score_set["urn"]) - worker_queue.assert_called_once() + score_set_id = session.scalars( + select(ScoreSetDbModel.id).where(ScoreSetDbModel.urn == unpublished_score_set["urn"]) + ).one() + published_score_set = publish_score_set(client, unpublished_score_set["urn"], score_set_id) with DependencyOverrider(admin_app_overrides): response_data = create_experiment( @@ -1179,9 +1184,10 @@ def test_users_get_one_score_set_to_own_public_experiment(session, data_provider unpublished_score_set = create_seq_score_set_with_variants( client, session, data_provider, experiment["urn"], data_files / "scores.csv" ) - with patch.object(arq.ArqRedis, "enqueue_job", return_value=None) as worker_queue: - score_set = publish_score_set(client, unpublished_score_set["urn"]) - worker_queue.assert_called_once() + score_set_id = session.scalars( + select(ScoreSetDbModel.id).where(ScoreSetDbModel.urn == unpublished_score_set["urn"]) + ).one() + score_set = publish_score_set(client, unpublished_score_set["urn"], score_set_id) response = client.get(f"/api/v1/experiments/{score_set['experiment']['urn']}") assert response.status_code == 200 @@ -1197,9 +1203,11 @@ def test_users_get_one_published_score_set_from_other_experiment( unpublished_score_set = create_seq_score_set_with_variants( client, session, data_provider, experiment["urn"], data_files / "scores.csv" ) - with patch.object(arq.ArqRedis, "enqueue_job", return_value=None) as worker_queue: - score_set = publish_score_set(client, unpublished_score_set["urn"]) - worker_queue.assert_called_once() + score_set_id = session.scalars( + select(ScoreSetDbModel.id).where(ScoreSetDbModel.urn == unpublished_score_set["urn"]) + ).one() + score_set = publish_score_set(client, unpublished_score_set["urn"], score_set_id) + change_ownership(session, score_set["experiment"]["urn"], ExperimentDbModel) change_ownership(session, score_set["urn"], ScoreSetDbModel) response = client.get(f"/api/v1/experiments/{score_set['experiment']['urn']}") @@ -1216,9 +1224,9 @@ def test_users_get_one_published_score_set_from_others_experiment_with_a_private score_set_1 = create_seq_score_set_with_variants( client, session, data_provider, experiment["urn"], data_files / "scores.csv" ) - with patch.object(arq.ArqRedis, "enqueue_job", return_value=None) as worker_queue: - pub_score_set = 
publish_score_set(client, score_set_1["urn"]) - worker_queue.assert_called_once() + score_set_id = session.scalars(select(ScoreSetDbModel.id).where(ScoreSetDbModel.urn == score_set_1["urn"])).one() + pub_score_set = publish_score_set(client, score_set_1["urn"], score_set_id) + score_set_2 = create_seq_score_set_with_variants( client, session, data_provider, pub_score_set["experiment"]["urn"], data_files / "scores.csv" ) @@ -1237,9 +1245,8 @@ def test_users_get_two_score_sets_from_own_experiment_with_a_private_and_a_publi score_set_1 = create_seq_score_set_with_variants( client, session, data_provider, experiment["urn"], data_files / "scores.csv" ) - with patch.object(arq.ArqRedis, "enqueue_job", return_value=None) as worker_queue: - pub_score_set = publish_score_set(client, score_set_1["urn"]) - worker_queue.assert_called_once() + score_set_id = session.scalars(select(ScoreSetDbModel.id).where(ScoreSetDbModel.urn == score_set_1["urn"])).one() + pub_score_set = publish_score_set(client, score_set_1["urn"], score_set_id) score_set_2 = create_seq_score_set_with_variants( client, session, data_provider, pub_score_set["experiment"]["urn"], data_files / "scores.csv" ) @@ -1258,9 +1265,8 @@ def test_users_get_one_score_set_from_own_experiment_with_a_superseding_score_se score_set = create_seq_score_set_with_variants( client, session, data_provider, experiment["urn"], data_files / "scores.csv" ) - with patch.object(arq.ArqRedis, "enqueue_job", return_value=None) as worker_queue: - pub_score_set = publish_score_set(client, score_set["urn"]) - worker_queue.assert_called_once() + score_set_id = session.scalars(select(ScoreSetDbModel.id).where(ScoreSetDbModel.urn == score_set["urn"])).one() + pub_score_set = publish_score_set(client, score_set["urn"], score_set_id) score_set_post_payload = deepcopy(TEST_MINIMAL_SEQ_SCORESET) score_set_post_payload["experimentUrn"] = pub_score_set["experiment"]["urn"] score_set_post_payload["supersededScoreSetUrn"] = pub_score_set["urn"] @@ -1299,9 +1305,10 @@ def test_search_meta_analysis_experiment(session, data_provider, client, setup_r client, session, data_provider, unpublished_score_set, data_files / "scores.csv" ) - with patch.object(arq.ArqRedis, "enqueue_job", return_value=None) as worker_queue: - score_set = publish_score_set(client, unpublished_score_set["urn"]) - worker_queue.assert_called_once() + score_set_id = session.scalars( + select(ScoreSetDbModel.id).where(ScoreSetDbModel.urn == unpublished_score_set["urn"]) + ).one() + score_set = publish_score_set(client, unpublished_score_set["urn"], score_set_id) meta_score_set = create_seq_score_set( client, @@ -1312,9 +1319,8 @@ def test_search_meta_analysis_experiment(session, data_provider, client, setup_r client, session, data_provider, meta_score_set, data_files / "scores.csv" ) - with patch.object(arq.ArqRedis, "enqueue_job", return_value=None) as worker_queue: - published_meta_score_set = publish_score_set(client, meta_score_set["urn"]) - worker_queue.assert_called_once() + score_set_id = session.scalars(select(ScoreSetDbModel.id).where(ScoreSetDbModel.urn == meta_score_set["urn"])).one() + published_meta_score_set = publish_score_set(client, meta_score_set["urn"], score_set_id) score_set_refresh = (client.get(f"/api/v1/score-sets/{score_set['urn']}")).json() search_payload = {"metaAnalysis": True} @@ -1332,9 +1338,10 @@ def test_search_exclude_meta_analysis_experiment(session, data_provider, client, client, session, data_provider, unpublished_score_set, data_files / "scores.csv" ) - with 
patch.object(arq.ArqRedis, "enqueue_job", return_value=None) as worker_queue: - score_set = publish_score_set(client, unpublished_score_set["urn"]) - worker_queue.assert_called_once() + score_set_id = session.scalars( + select(ScoreSetDbModel.id).where(ScoreSetDbModel.urn == unpublished_score_set["urn"]) + ).one() + score_set = publish_score_set(client, unpublished_score_set["urn"], score_set_id) meta_score_set = create_seq_score_set( client, @@ -1345,9 +1352,8 @@ def test_search_exclude_meta_analysis_experiment(session, data_provider, client, client, session, data_provider, meta_score_set, data_files / "scores.csv" ) - with patch.object(arq.ArqRedis, "enqueue_job", return_value=None) as worker_queue: - meta_score_set = publish_score_set(client, meta_score_set["urn"]) - worker_queue.assert_called_once() + score_set_id = session.scalars(select(ScoreSetDbModel.id).where(ScoreSetDbModel.urn == meta_score_set["urn"])).one() + meta_score_set = publish_score_set(client, meta_score_set["urn"], score_set_id) score_set_refresh = (client.get(f"/api/v1/score-sets/{score_set['urn']}")).json() search_payload = {"metaAnalysis": False} @@ -1367,9 +1373,8 @@ def test_search_score_sets_for_experiments(session, client, setup_router_db, dat score_set_unpub = create_seq_score_set(client, experiment["urn"], update={"title": "Unpublished Score Set"}) change_ownership(session, score_set_unpub["urn"], ScoreSetDbModel) - with patch.object(arq.ArqRedis, "enqueue_job", return_value=None) as worker_queue: - published_score_set = publish_score_set(client, score_set["urn"]) - worker_queue.assert_called_once() + score_set_id = session.scalars(select(ScoreSetDbModel.id).where(ScoreSetDbModel.urn == score_set["urn"])).one() + published_score_set = publish_score_set(client, score_set["urn"], score_set_id) # On score set publication, the experiment will get a new urn experiment_urn = published_score_set["experiment"]["urn"] @@ -1389,9 +1394,10 @@ def test_owner_searches_score_sets_with_unpublished_superseding_score_sets_for_e client, session, data_provider, unpublished_score_set, data_files / "scores.csv" ) - with patch.object(arq.ArqRedis, "enqueue_job", return_value=None) as worker_queue: - published_score_set = publish_score_set(client, unpublished_score_set["urn"]) - worker_queue.assert_called_once() + score_set_id = session.scalars( + select(ScoreSetDbModel.id).where(ScoreSetDbModel.urn == unpublished_score_set["urn"]) + ).one() + published_score_set = publish_score_set(client, unpublished_score_set["urn"], score_set_id) score_set_post_payload = deepcopy(TEST_MINIMAL_SEQ_SCORESET) score_set_post_payload["experimentUrn"] = published_score_set["experiment"]["urn"] @@ -1417,9 +1423,10 @@ def test_non_owner_searches_score_sets_with_unpublished_superseding_score_sets_f client, session, data_provider, unpublished_score_set, data_files / "scores.csv" ) - with patch.object(arq.ArqRedis, "enqueue_job", return_value=None) as worker_queue: - published_score_set = publish_score_set(client, unpublished_score_set["urn"]) - worker_queue.assert_called_once() + score_set_id = session.scalars( + select(ScoreSetDbModel.id).where(ScoreSetDbModel.urn == unpublished_score_set["urn"]) + ).one() + published_score_set = publish_score_set(client, unpublished_score_set["urn"], score_set_id) score_set_post_payload = deepcopy(TEST_MINIMAL_SEQ_SCORESET) score_set_post_payload["experimentUrn"] = published_score_set["experiment"]["urn"] @@ -1446,9 +1453,10 @@ def test_owner_searches_published_superseding_score_sets_for_experiments( client, session, 
data_provider, unpublished_score_set, data_files / "scores.csv" ) - with patch.object(arq.ArqRedis, "enqueue_job", return_value=None) as worker_queue: - published_score_set = publish_score_set(client, unpublished_score_set["urn"]) - worker_queue.assert_called_once() + score_set_id = session.scalars( + select(ScoreSetDbModel.id).where(ScoreSetDbModel.urn == unpublished_score_set["urn"]) + ).one() + published_score_set = publish_score_set(client, unpublished_score_set["urn"], score_set_id) # On score set publication, the experiment will get a new urn experiment_urn = published_score_set["experiment"]["urn"] @@ -1459,9 +1467,10 @@ def test_owner_searches_published_superseding_score_sets_for_experiments( client, session, data_provider, superseding_score_set, data_files / "scores.csv" ) - with patch.object(arq.ArqRedis, "enqueue_job", return_value=None) as worker_queue: - published_superseding_score_set = publish_score_set(client, superseding_score_set["urn"]) - worker_queue.assert_called_once() + score_set_id = session.scalars( + select(ScoreSetDbModel.id).where(ScoreSetDbModel.urn == superseding_score_set["urn"]) + ).one() + published_superseding_score_set = publish_score_set(client, superseding_score_set["urn"], score_set_id) response = client.get(f"/api/v1/experiments/{experiment_urn}/score-sets") assert response.status_code == 200 @@ -1478,9 +1487,10 @@ def test_non_owner_searches_published_superseding_score_sets_for_experiments( client, session, data_provider, unpublished_score_set, data_files / "scores.csv" ) - with patch.object(arq.ArqRedis, "enqueue_job", return_value=None) as worker_queue: - published_score_set = publish_score_set(client, unpublished_score_set["urn"]) - worker_queue.assert_called_once() + score_set_id = session.scalars( + select(ScoreSetDbModel.id).where(ScoreSetDbModel.urn == unpublished_score_set["urn"]) + ).one() + published_score_set = publish_score_set(client, unpublished_score_set["urn"], score_set_id) # On score set publication, the experiment will get a new urn experiment_urn = published_score_set["experiment"]["urn"] @@ -1491,9 +1501,10 @@ def test_non_owner_searches_published_superseding_score_sets_for_experiments( client, session, data_provider, superseding_score_set, data_files / "scores.csv" ) - with patch.object(arq.ArqRedis, "enqueue_job", return_value=None) as worker_queue: - published_superseding_score_set = publish_score_set(client, superseding_score_set["urn"]) - worker_queue.assert_called_once() + score_set_id = session.scalars( + select(ScoreSetDbModel.id).where(ScoreSetDbModel.urn == superseding_score_set["urn"]) + ).one() + published_superseding_score_set = publish_score_set(client, superseding_score_set["urn"], score_set_id) change_ownership(session, published_score_set["urn"], ScoreSetDbModel) change_ownership(session, published_superseding_score_set["urn"], ScoreSetDbModel) @@ -1521,9 +1532,8 @@ def test_search_score_sets_for_contributor_experiments(session, client, setup_ro TEST_USER["last_name"], ) - with patch.object(arq.ArqRedis, "enqueue_job", return_value=None) as worker_queue: - published_score_set = publish_score_set(client, score_set["urn"]) - worker_queue.assert_called_once() + score_set_id = session.scalars(select(ScoreSetDbModel.id).where(ScoreSetDbModel.urn == score_set["urn"])).one() + published_score_set = publish_score_set(client, score_set["urn"], score_set_id) # On score set publication, the experiment will get a new urn experiment_urn = published_score_set["experiment"]["urn"] @@ -1542,9 +1552,8 @@ def 
test_search_score_sets_for_my_experiments(session, client, setup_router_db, # The unpublished score set is for the current user, so it should show up in results. score_set_unpub = create_seq_score_set(client, experiment["urn"], update={"title": "Unpublished Score Set"}) - with patch.object(arq.ArqRedis, "enqueue_job", return_value=None) as worker_queue: - published_score_set = publish_score_set(client, score_set["urn"]) - worker_queue.assert_called_once() + score_set_id = session.scalars(select(ScoreSetDbModel.id).where(ScoreSetDbModel.urn == score_set["urn"])).one() + published_score_set = publish_score_set(client, score_set["urn"], score_set_id) # On score set publication, the experiment will get a new urn experiment_urn = published_score_set["experiment"]["urn"] @@ -1615,9 +1624,10 @@ def test_anonymous_cannot_delete_other_users_published_experiment( client, session, data_provider, unpublished_score_set, data_files / "scores.csv" ) - with patch.object(arq.ArqRedis, "enqueue_job", return_value=None) as worker_queue: - score_set = publish_score_set(client, unpublished_score_set["urn"]) - worker_queue.assert_called_once() + score_set_id = session.scalars( + select(ScoreSetDbModel.id).where(ScoreSetDbModel.urn == unpublished_score_set["urn"]) + ).one() + score_set = publish_score_set(client, unpublished_score_set["urn"], score_set_id) experiment_urn = score_set["experiment"]["urn"] with DependencyOverrider(anonymous_app_overrides): @@ -1642,9 +1652,10 @@ def test_cannot_delete_own_published_experiment(session, data_provider, client, client, session, data_provider, unpublished_score_set, data_files / "scores.csv" ) - with patch.object(arq.ArqRedis, "enqueue_job", return_value=None) as worker_queue: - score_set = publish_score_set(client, unpublished_score_set["urn"]) - worker_queue.assert_called_once() + score_set_id = session.scalars( + select(ScoreSetDbModel.id).where(ScoreSetDbModel.urn == unpublished_score_set["urn"]) + ).one() + score_set = publish_score_set(client, unpublished_score_set["urn"], score_set_id) experiment_urn = score_set["experiment"]["urn"] del_response = client.delete(f"/api/v1/experiments/{experiment_urn}") @@ -1686,9 +1697,10 @@ def test_contributor_cannot_delete_other_users_published_experiment( unpublished_score_set = mock_worker_variant_insertion( client, session, data_provider, unpublished_score_set, data_files / "scores.csv" ) - with patch.object(arq.ArqRedis, "enqueue_job", return_value=None) as worker_queue: - score_set = publish_score_set(client, unpublished_score_set["urn"]) - worker_queue.assert_called_once() + score_set_id = session.scalars( + select(ScoreSetDbModel.id).where(ScoreSetDbModel.urn == unpublished_score_set["urn"]) + ).one() + score_set = publish_score_set(client, unpublished_score_set["urn"], score_set_id) experiment_urn = score_set["experiment"]["urn"] change_ownership(session, experiment_urn, ExperimentDbModel) @@ -1714,9 +1726,10 @@ def test_admin_can_delete_other_users_published_experiment( client, session, data_provider, unpublished_score_set, data_files / "scores.csv" ) - with patch.object(arq.ArqRedis, "enqueue_job", return_value=None) as worker_queue: - score_set = publish_score_set(client, unpublished_score_set["urn"]) - worker_queue.assert_called_once() + score_set_id = session.scalars( + select(ScoreSetDbModel.id).where(ScoreSetDbModel.urn == unpublished_score_set["urn"]) + ).one() + score_set = publish_score_set(client, unpublished_score_set["urn"], score_set_id) experiment_urn = score_set["experiment"]["urn"] with 
DependencyOverrider(admin_app_overrides): @@ -1740,9 +1753,10 @@ def test_can_add_experiment_to_own_public_experiment_set(session, data_provider, client, session, data_provider, unpublished_score_set, data_files / "scores.csv" ) - with patch.object(arq.ArqRedis, "enqueue_job", return_value=None) as worker_queue: - published_score_set = publish_score_set(client, unpublished_score_set["urn"]) - worker_queue.assert_called_once() + score_set_id = session.scalars( + select(ScoreSetDbModel.id).where(ScoreSetDbModel.urn == unpublished_score_set["urn"]) + ).one() + published_score_set = publish_score_set(client, unpublished_score_set["urn"], score_set_id) test_experiment = deepcopy(TEST_MINIMAL_EXPERIMENT) test_experiment.update({"experimentSetUrn": published_score_set["experiment"]["experimentSetUrn"]}) @@ -1777,9 +1791,10 @@ def test_contributor_can_add_experiment_to_others_public_experiment_set( client, session, data_provider, unpublished_score_set, data_files / "scores.csv" ) - with patch.object(arq.ArqRedis, "enqueue_job", return_value=None) as worker_queue: - published_score_set = publish_score_set(client, unpublished_score_set["urn"]) - worker_queue.assert_called_once() + score_set_id = session.scalars( + select(ScoreSetDbModel.id).where(ScoreSetDbModel.urn == unpublished_score_set["urn"]) + ).one() + published_score_set = publish_score_set(client, unpublished_score_set["urn"], score_set_id) change_ownership(session, published_score_set["urn"], ScoreSetDbModel) change_ownership(session, published_score_set["experiment"]["urn"], ExperimentDbModel) @@ -1820,9 +1835,10 @@ def test_cannot_add_experiment_to_others_public_experiment_set( client, session, data_provider, unpublished_score_set, data_files / "scores.csv" ) - with patch.object(arq.ArqRedis, "enqueue_job", return_value=None) as worker_queue: - published_score_set = publish_score_set(client, unpublished_score_set["urn"]) - worker_queue.assert_called_once() + score_set_id = session.scalars( + select(ScoreSetDbModel.id).where(ScoreSetDbModel.urn == unpublished_score_set["urn"]) + ).one() + published_score_set = publish_score_set(client, unpublished_score_set["urn"], score_set_id) experiment_set_urn = published_score_set["experiment"]["experimentSetUrn"] change_ownership(session, published_score_set["urn"], ScoreSetDbModel) diff --git a/tests/routers/test_permissions.py b/tests/routers/test_permissions.py index 74405a47..59769361 100644 --- a/tests/routers/test_permissions.py +++ b/tests/routers/test_permissions.py @@ -1,13 +1,12 @@ # ruff: noqa: E402 import pytest +from sqlalchemy import select arq = pytest.importorskip("arq") cdot = pytest.importorskip("cdot") fastapi = pytest.importorskip("fastapi") -from unittest.mock import patch - from mavedb.lib.permissions import Action from mavedb.models.experiment import Experiment as ExperimentDbModel from mavedb.models.experiment_set import ExperimentSet as ExperimentSetDbModel @@ -186,9 +185,10 @@ def test_get_true_permission_from_others_public_experiment_add_score_set_check( client, session, data_provider, unpublished_score_set, data_files / "scores.csv" ) - with patch.object(arq.ArqRedis, "enqueue_job", return_value=None) as worker_queue: - published_score_set = publish_score_set(client, unpublished_score_set["urn"]) - worker_queue.assert_called_once() + score_set_id = session.scalars( + select(ScoreSetDbModel.id).where(ScoreSetDbModel.urn == unpublished_score_set["urn"]) + ).one() + published_score_set = publish_score_set(client, unpublished_score_set["urn"], score_set_id) 
     published_experiment_urn = published_score_set["experiment"]["urn"]
     change_ownership(session, published_experiment_urn, ExperimentDbModel)
diff --git a/tests/routers/test_score_calibrations.py b/tests/routers/test_score_calibrations.py
index 307394ec..30ad6e53 100644
--- a/tests/routers/test_score_calibrations.py
+++ b/tests/routers/test_score_calibrations.py
@@ -6,13 +6,18 @@
 cdot = pytest.importorskip("cdot")
 fastapi = pytest.importorskip("fastapi")
 
-from unittest.mock import patch
-
-from arq import ArqRedis
 from sqlalchemy import select
 
 from mavedb.models.score_calibration import ScoreCalibration as CalibrationDbModel
 from mavedb.models.score_set import ScoreSet as ScoreSetDbModel
+from tests.helpers.constants import (
+    EXTRA_USER,
+    TEST_BIORXIV_IDENTIFIER,
+    TEST_BRNICH_SCORE_CALIBRATION,
+    TEST_PATHOGENICITY_SCORE_CALIBRATION,
+    TEST_PUBMED_IDENTIFIER,
+    VALID_CALIBRATION_URN,
+)
 from tests.helpers.dependency_overrider import DependencyOverrider
 from tests.helpers.util.common import deepcamelize
 from tests.helpers.util.contributor import add_contributor
@@ -24,15 +29,6 @@
 )
 from tests.helpers.util.score_set import create_seq_score_set_with_mapped_variants, publish_score_set
 
-from tests.helpers.constants import (
-    EXTRA_USER,
-    TEST_BIORXIV_IDENTIFIER,
-    TEST_BRNICH_SCORE_CALIBRATION,
-    TEST_PATHOGENICITY_SCORE_CALIBRATION,
-    TEST_PUBMED_IDENTIFIER,
-    VALID_CALIBRATION_URN,
-)
-
 ###########################################################
 # GET /score-calibrations/{calibration_urn}
 ###########################################################
@@ -581,8 +577,8 @@ def test_anonymous_user_cannot_get_score_calibrations_for_score_set_when_publish
         client, score_set["urn"], deepcamelize(TEST_BRNICH_SCORE_CALIBRATION)
     )
 
-    with patch.object(ArqRedis, "enqueue_job", return_value=None):
-        score_set = publish_score_set(client, score_set["urn"])
+    score_set_id = session.scalars(select(ScoreSetDbModel.id).where(ScoreSetDbModel.urn == score_set["urn"])).one()
+    score_set = publish_score_set(client, score_set["urn"], score_set_id)
 
     with DependencyOverrider(anonymous_app_overrides):
         response = client.get(f"/api/v1/score-calibrations/score-set/{score_set['urn']}")
@@ -617,8 +613,8 @@ def test_other_user_cannot_get_score_calibrations_for_score_set_when_published_b
         client, score_set["urn"], deepcamelize(TEST_BRNICH_SCORE_CALIBRATION)
     )
 
-    with patch.object(ArqRedis, "enqueue_job", return_value=None):
-        score_set = publish_score_set(client, score_set["urn"])
+    score_set_id = session.scalars(select(ScoreSetDbModel.id).where(ScoreSetDbModel.urn == score_set["urn"])).one()
+    score_set = publish_score_set(client, score_set["urn"], score_set_id)
 
     with DependencyOverrider(extra_user_app_overrides):
         response = client.get(f"/api/v1/score-calibrations/score-set/{score_set['urn']}")
@@ -786,8 +782,8 @@ def test_anonymous_user_can_get_score_calibrations_for_score_set_when_public(
     publish_test_score_calibration_via_client(client, calibration["urn"])
 
-    with patch.object(ArqRedis, "enqueue_job", return_value=None):
-        score_set = publish_score_set(client, score_set["urn"])
+    score_set_id = session.scalars(select(ScoreSetDbModel.id).where(ScoreSetDbModel.urn == score_set["urn"])).one()
+    score_set = publish_score_set(client, score_set["urn"], score_set_id)
 
     with DependencyOverrider(anonymous_app_overrides):
         response = client.get(f"/api/v1/score-calibrations/score-set/{score_set['urn']}")
@@ -831,8 +827,8 @@ def test_other_user_can_get_score_calibrations_for_score_set_when_public(
     publish_test_score_calibration_via_client(client, calibration["urn"])
 
-    with patch.object(ArqRedis, "enqueue_job", return_value=None):
-        score_set = publish_score_set(client, score_set["urn"])
+    score_set_id = session.scalars(select(ScoreSetDbModel.id).where(ScoreSetDbModel.urn == score_set["urn"])).one()
+    score_set = publish_score_set(client, score_set["urn"], score_set_id)
 
     with DependencyOverrider(extra_user_app_overrides):
         response = client.get(f"/api/v1/score-calibrations/score-set/{score_set['urn']}")
@@ -1289,8 +1285,8 @@ def test_cannot_create_score_calibration_in_public_score_set_when_score_set_not_
         data_files / "scores.csv",
     )
 
-    with patch.object(ArqRedis, "enqueue_job", return_value=None):
-        score_set = publish_score_set(client, score_set["urn"])
+    score_set_id = session.scalars(select(ScoreSetDbModel.id).where(ScoreSetDbModel.urn == score_set["urn"])).one()
+    score_set = publish_score_set(client, score_set["urn"], score_set_id)
 
     with DependencyOverrider(extra_user_app_overrides):
         response = client.post(
@@ -1642,8 +1638,8 @@ def test_cannot_update_score_calibration_in_published_score_set_when_score_set_n
         client, score_set["urn"], deepcamelize(TEST_BRNICH_SCORE_CALIBRATION)
     )
 
-    with patch.object(ArqRedis, "enqueue_job", return_value=None):
-        score_set = publish_score_set(client, score_set["urn"])
+    score_set_id = session.scalars(select(ScoreSetDbModel.id).where(ScoreSetDbModel.urn == score_set["urn"])).one()
+    score_set = publish_score_set(client, score_set["urn"], score_set_id)
 
     with DependencyOverrider(extra_user_app_overrides):
         response = client.put(
@@ -2082,9 +2078,10 @@ def test_user_may_move_investigator_calibration_when_has_permissions_on_destinat
         EXTRA_USER["last_name"],
     )
 
-    with patch.object(ArqRedis, "enqueue_job", return_value=None):
-        score_set1 = publish_score_set(client, score_set1["urn"])
-        score_set2 = publish_score_set(client, score_set2["urn"])
+    score_set_id1 = session.scalars(select(ScoreSetDbModel.id).where(ScoreSetDbModel.urn == score_set1["urn"])).one()
+    score_set1 = publish_score_set(client, score_set1["urn"], score_set_id1)
+    score_set_id2 = session.scalars(select(ScoreSetDbModel.id).where(ScoreSetDbModel.urn == score_set2["urn"])).one()
+    score_set2 = publish_score_set(client, score_set2["urn"], score_set_id2)
 
     with DependencyOverrider(extra_user_app_overrides):
         response = client.put(
diff --git a/tests/routers/test_score_set.py b/tests/routers/test_score_set.py
index 86234392..de75bb61 100644
--- a/tests/routers/test_score_set.py
+++ b/tests/routers/test_score_set.py
@@ -509,9 +509,8 @@ def test_can_update_score_set_supporting_data_after_publication(
     score_set = create_seq_score_set(client, experiment["urn"])
     score_set = mock_worker_variant_insertion(client, session, data_provider, score_set, data_files / "scores.csv")
 
-    with patch.object(arq.ArqRedis, "enqueue_job", return_value=None) as worker_queue:
-        published_score_set = publish_score_set(client, score_set["urn"])
-        worker_queue.assert_called_once()
+    score_set_id = session.scalars(select(ScoreSetDbModel.id).where(ScoreSetDbModel.urn == score_set["urn"])).one()
+    published_score_set = publish_score_set(client, score_set["urn"], score_set_id)
 
     published_urn = published_score_set["urn"]
     response = client.get(f"/api/v1/score-sets/{published_urn}")
@@ -581,9 +580,8 @@ def test_cannot_update_score_set_target_data_after_publication(
     score_set = create_seq_score_set(client, experiment["urn"])
     score_set = mock_worker_variant_insertion(client, session, data_provider, score_set, data_files / "scores.csv")
 
-    with patch.object(arq.ArqRedis, "enqueue_job", return_value=None) as worker_queue:
-        published_score_set = publish_score_set(client, score_set["urn"])
-        worker_queue.assert_called_once()
+    score_set_id = session.scalars(select(ScoreSetDbModel.id).where(ScoreSetDbModel.urn == score_set["urn"])).one()
+    published_score_set = publish_score_set(client, score_set["urn"], score_set_id)
 
     published_urn = published_score_set["urn"]
     response = client.get(f"/api/v1/score-sets/{published_urn}")
@@ -810,9 +808,8 @@ def test_extra_user_can_only_view_published_score_calibrations_in_score_set(
     score_set = create_seq_score_set(client, experiment["urn"])
     score_set = mock_worker_variant_insertion(client, session, data_provider, score_set, data_files / "scores.csv")
 
-    with patch.object(arq.ArqRedis, "enqueue_job", return_value=None) as worker_queue:
-        published_score_set = publish_score_set(client, score_set["urn"])
-        worker_queue.assert_called_once()
+    score_set_id = session.scalars(select(ScoreSetDbModel.id).where(ScoreSetDbModel.urn == score_set["urn"])).one()
+    published_score_set = publish_score_set(client, score_set["urn"], score_set_id)
 
     create_test_score_calibration_in_score_set_via_client(
         client, published_score_set["urn"], deepcamelize(TEST_BRNICH_SCORE_CALIBRATION)
@@ -1253,9 +1250,8 @@ def test_publish_score_set(session, data_provider, client, setup_router_db, data
     score_set = create_seq_score_set(client, experiment["urn"])
     score_set = mock_worker_variant_insertion(client, session, data_provider, score_set, data_files / "scores.csv")
 
-    with patch.object(arq.ArqRedis, "enqueue_job", return_value=None) as worker_queue:
-        published_score_set = publish_score_set(client, score_set["urn"])
-        worker_queue.assert_called_once()
+    score_set_id = session.scalars(select(ScoreSetDbModel.id).where(ScoreSetDbModel.urn == score_set["urn"])).one()
+    published_score_set = publish_score_set(client, score_set["urn"], score_set_id)
 
     assert isinstance(MAVEDB_SCORE_SET_URN_RE.fullmatch(published_score_set["urn"]), re.Match)
     assert isinstance(MAVEDB_EXPERIMENT_URN_RE.fullmatch(published_score_set["experiment"]["urn"]), re.Match)
@@ -1296,11 +1292,12 @@ def test_publish_multiple_score_sets(session, data_provider, client, setup_route
     score_set_3 = create_seq_score_set(client, experiment["urn"])
     score_set_3 = mock_worker_variant_insertion(client, session, data_provider, score_set_3, data_files / "scores.csv")
 
-    with patch.object(arq.ArqRedis, "enqueue_job", return_value=None) as worker_queue:
-        pub_score_set_1_data = publish_score_set(client, score_set_1["urn"])
-        pub_score_set_2_data = publish_score_set(client, score_set_2["urn"])
-        pub_score_set_3_data = publish_score_set(client, score_set_3["urn"])
-        worker_queue.assert_called()
+    score_set_id1 = session.scalars(select(ScoreSetDbModel.id).where(ScoreSetDbModel.urn == score_set_1["urn"])).one()
+    score_set_id2 = session.scalars(select(ScoreSetDbModel.id).where(ScoreSetDbModel.urn == score_set_2["urn"])).one()
+    score_set_id3 = session.scalars(select(ScoreSetDbModel.id).where(ScoreSetDbModel.urn == score_set_3["urn"])).one()
+    pub_score_set_1_data = publish_score_set(client, score_set_1["urn"], score_set_id1)
+    pub_score_set_2_data = publish_score_set(client, score_set_2["urn"], score_set_id2)
+    pub_score_set_3_data = publish_score_set(client, score_set_3["urn"], score_set_id3)
 
     assert pub_score_set_1_data["urn"] == "urn:mavedb:00000001-a-1"
     assert pub_score_set_1_data["title"] == score_set_1["title"]
@@ -1349,9 +1346,8 @@ def test_score_calibrations_remain_private_when_score_set_is_published(
         client, score_set["urn"], deepcamelize(TEST_BRNICH_SCORE_CALIBRATION)
     )
 
-    with patch.object(arq.ArqRedis, "enqueue_job", return_value=None) as worker_queue:
-        published_score_set = publish_score_set(client, score_set["urn"])
-        worker_queue.assert_called_once()
+    score_set_id = session.scalars(select(ScoreSetDbModel.id).where(ScoreSetDbModel.urn == score_set["urn"])).one()
+    published_score_set = publish_score_set(client, score_set["urn"], score_set_id)
 
     # refresh score set to post worker state
     score_set = (client.get(f"/api/v1/score-sets/{published_score_set['urn']}")).json()
@@ -1422,9 +1418,8 @@ def test_contributor_can_publish_other_users_score_set(session, data_provider, c
         TEST_USER["last_name"],
     )
 
-    with patch.object(arq.ArqRedis, "enqueue_job", return_value=None) as worker_queue:
-        published_score_set = publish_score_set(client, score_set["urn"])
-        worker_queue.assert_called_once()
+    score_set_id = session.scalars(select(ScoreSetDbModel.id).where(ScoreSetDbModel.urn == score_set["urn"])).one()
+    published_score_set = publish_score_set(client, score_set["urn"], score_set_id)
 
     assert published_score_set["urn"] == "urn:mavedb:00000001-a-1"
     assert published_score_set["experiment"]["urn"] == "urn:mavedb:00000001-a"
@@ -1505,9 +1500,8 @@ def test_create_single_score_set_meta_analysis(session, data_provider, client, s
     score_set = create_seq_score_set(client, experiment["urn"])
     score_set = mock_worker_variant_insertion(client, session, data_provider, score_set, data_files / "scores.csv")
 
-    with patch.object(arq.ArqRedis, "enqueue_job", return_value=None) as worker_queue:
-        published_score_set = publish_score_set(client, score_set["urn"])
-        worker_queue.assert_called_once()
+    score_set_id = session.scalars(select(ScoreSetDbModel.id).where(ScoreSetDbModel.urn == score_set["urn"])).one()
+    published_score_set = publish_score_set(client, score_set["urn"], score_set_id)
 
     meta_score_set = create_seq_score_set(
         client,
@@ -1529,9 +1523,8 @@ def test_publish_single_score_set_meta_analysis(session, data_provider, client,
     score_set = create_seq_score_set(client, experiment["urn"])
     score_set = mock_worker_variant_insertion(client, session, data_provider, score_set, data_files / "scores.csv")
 
-    with patch.object(arq.ArqRedis, "enqueue_job", return_value=None) as worker_queue:
-        score_set = publish_score_set(client, score_set["urn"])
-        worker_queue.assert_called_once()
+    score_set_id = session.scalars(select(ScoreSetDbModel.id).where(ScoreSetDbModel.urn == score_set["urn"])).one()
+    score_set = publish_score_set(client, score_set["urn"], score_set_id)
 
     meta_score_set = create_seq_score_set(
         client,
@@ -1542,9 +1535,10 @@ def test_publish_single_score_set_meta_analysis(session, data_provider, client,
         client, session, data_provider, meta_score_set, data_files / "scores.csv"
     )
 
-    with patch.object(arq.ArqRedis, "enqueue_job", return_value=None) as worker_queue:
-        meta_score_set = publish_score_set(client, meta_score_set["urn"])
-        worker_queue.assert_called_once()
+    meta_score_set_id = session.scalars(
+        select(ScoreSetDbModel.id).where(ScoreSetDbModel.urn == meta_score_set["urn"])
+    ).one()
+    meta_score_set = publish_score_set(client, meta_score_set["urn"], meta_score_set_id)
 
     assert isinstance(MAVEDB_SCORE_SET_URN_RE.fullmatch(meta_score_set["urn"]), re.Match)
     assert meta_score_set["urn"] == "urn:mavedb:00000001-0-1"
@@ -1559,10 +1553,10 @@ def test_multiple_score_set_meta_analysis_single_experiment(
     score_set_2 = create_seq_score_set(client, experiment["urn"], update={"title": "Score Set 2"})
     score_set_2 = mock_worker_variant_insertion(client, session, data_provider, score_set_2, data_files / "scores.csv")
 
-    with patch.object(arq.ArqRedis, "enqueue_job", return_value=None) as worker_queue:
-        published_score_set_1 = publish_score_set(client, score_set_1["urn"])
-        published_score_set_2 = publish_score_set(client, score_set_2["urn"])
-        worker_queue.assert_called()
+    score_set_1_id = session.scalars(select(ScoreSetDbModel.id).where(ScoreSetDbModel.urn == score_set_1["urn"])).one()
+    score_set_2_id = session.scalars(select(ScoreSetDbModel.id).where(ScoreSetDbModel.urn == score_set_2["urn"])).one()
+    published_score_set_1 = publish_score_set(client, score_set_1["urn"], score_set_1_id)
+    published_score_set_2 = publish_score_set(client, score_set_2["urn"], score_set_2_id)
 
     meta_score_set = create_seq_score_set(
         client,
@@ -1582,9 +1576,10 @@ def test_multiple_score_set_meta_analysis_single_experiment(
     )
     assert published_score_set_1_refresh["metaAnalyzedByScoreSetUrns"] == [meta_score_set["urn"]]
 
-    with patch.object(arq.ArqRedis, "enqueue_job", return_value=None) as worker_queue:
-        published_meta_score_set = publish_score_set(client, meta_score_set["urn"])
-        worker_queue.assert_called_once()
+    meta_score_set_id = session.scalars(
+        select(ScoreSetDbModel.id).where(ScoreSetDbModel.urn == meta_score_set["urn"])
+    ).one()
+    published_meta_score_set = publish_score_set(client, meta_score_set["urn"], meta_score_set_id)
 
     assert isinstance(MAVEDB_SCORE_SET_URN_RE.fullmatch(published_meta_score_set["urn"]), re.Match)
     assert published_meta_score_set["urn"] == "urn:mavedb:00000001-0-1"
@@ -1600,10 +1595,10 @@ def test_multiple_score_set_meta_analysis_multiple_experiment_sets(
     score_set_2 = create_seq_score_set(client, experiment_2["urn"], update={"title": "Score Set 2"})
     score_set_2 = mock_worker_variant_insertion(client, session, data_provider, score_set_2, data_files / "scores.csv")
 
-    with patch.object(arq.ArqRedis, "enqueue_job", return_value=None) as worker_queue:
-        published_score_set_1 = publish_score_set(client, score_set_1["urn"])
-        published_score_set_2 = publish_score_set(client, score_set_2["urn"])
-        worker_queue.assert_called()
+    score_set_1_id = session.scalars(select(ScoreSetDbModel.id).where(ScoreSetDbModel.urn == score_set_1["urn"])).one()
+    score_set_2_id = session.scalars(select(ScoreSetDbModel.id).where(ScoreSetDbModel.urn == score_set_2["urn"])).one()
+    published_score_set_1 = publish_score_set(client, score_set_1["urn"], score_set_1_id)
+    published_score_set_2 = publish_score_set(client, score_set_2["urn"], score_set_2_id)
 
     meta_score_set = create_seq_score_set(
         client,
@@ -1622,9 +1617,10 @@ def test_multiple_score_set_meta_analysis_multiple_experiment_sets(
     )
     assert published_score_set_1_refresh["metaAnalyzedByScoreSetUrns"] == [meta_score_set["urn"]]
 
-    with patch.object(arq.ArqRedis, "enqueue_job", return_value=None) as worker_queue:
-        published_meta_score_set = publish_score_set(client, meta_score_set["urn"])
-        worker_queue.assert_called_once()
+    meta_score_set_id = session.scalars(
+        select(ScoreSetDbModel.id).where(ScoreSetDbModel.urn == meta_score_set["urn"])
+    ).one()
+    published_meta_score_set = publish_score_set(client, meta_score_set["urn"], meta_score_set_id)
 
     assert isinstance(MAVEDB_SCORE_SET_URN_RE.fullmatch(published_meta_score_set["urn"]), re.Match)
     assert published_meta_score_set["urn"] == "urn:mavedb:00000003-0-1"
@@ -1642,10 +1638,10 @@ def test_multiple_score_set_meta_analysis_multiple_experiments(
     score_set_2 = create_seq_score_set(client, experiment_2["urn"], update={"title": "Score Set 2"})
     score_set_2 = mock_worker_variant_insertion(client, session, data_provider, score_set_2, data_files / "scores.csv")
 
-    with patch.object(arq.ArqRedis, "enqueue_job", return_value=None) as worker_queue:
-        published_score_set_1 = publish_score_set(client, score_set_1["urn"])
-        published_score_set_2 = publish_score_set(client, score_set_2["urn"])
-        worker_queue.assert_called()
+    score_set_1_id = session.scalars(select(ScoreSetDbModel.id).where(ScoreSetDbModel.urn == score_set_1["urn"])).one()
+    score_set_2_id = session.scalars(select(ScoreSetDbModel.id).where(ScoreSetDbModel.urn == score_set_2["urn"])).one()
+    published_score_set_1 = publish_score_set(client, score_set_1["urn"], score_set_1_id)
+    published_score_set_2 = publish_score_set(client, score_set_2["urn"], score_set_2_id)
 
     meta_score_set = create_seq_score_set(
         client,
@@ -1664,9 +1660,10 @@ def test_multiple_score_set_meta_analysis_multiple_experiments(
     )
     assert published_score_set_1_refresh["metaAnalyzedByScoreSetUrns"] == [meta_score_set["urn"]]
 
-    with patch.object(arq.ArqRedis, "enqueue_job", return_value=None) as worker_queue:
-        published_meta_score_set = publish_score_set(client, meta_score_set["urn"])
-        worker_queue.assert_called_once()
+    meta_score_set_id = session.scalars(
+        select(ScoreSetDbModel.id).where(ScoreSetDbModel.urn == meta_score_set["urn"])
+    ).one()
+    published_meta_score_set = publish_score_set(client, meta_score_set["urn"], meta_score_set_id)
 
     assert isinstance(MAVEDB_SCORE_SET_URN_RE.fullmatch(published_meta_score_set["urn"]), re.Match)
     assert published_meta_score_set["urn"] == "urn:mavedb:00000001-0-1"
@@ -1695,12 +1692,22 @@ def test_multiple_score_set_meta_analysis_multiple_experiment_sets_different_sco
         client, session, data_provider, score_set_2_2, data_files / "scores.csv"
     )
 
-    with patch.object(arq.ArqRedis, "enqueue_job", return_value=None) as worker_queue:
-        published_score_set_1_1 = publish_score_set(client, score_set_1_1["urn"])
-        published_score_set_1_2 = publish_score_set(client, score_set_1_2["urn"])
-        published_score_set_2_1 = publish_score_set(client, score_set_2_1["urn"])
-        published_score_set_2_2 = publish_score_set(client, score_set_2_2["urn"])
-        worker_queue.assert_called()
+    score_set_1_1_id = session.scalars(
+        select(ScoreSetDbModel.id).where(ScoreSetDbModel.urn == score_set_1_1["urn"])
+    ).one()
+    score_set_1_2_id = session.scalars(
+        select(ScoreSetDbModel.id).where(ScoreSetDbModel.urn == score_set_1_2["urn"])
+    ).one()
+    score_set_2_1_id = session.scalars(
+        select(ScoreSetDbModel.id).where(ScoreSetDbModel.urn == score_set_2_1["urn"])
+    ).one()
+    score_set_2_2_id = session.scalars(
+        select(ScoreSetDbModel.id).where(ScoreSetDbModel.urn == score_set_2_2["urn"])
+    ).one()
+    published_score_set_1_1 = publish_score_set(client, score_set_1_1["urn"], score_set_1_1_id)
+    published_score_set_1_2 = publish_score_set(client, score_set_1_2["urn"], score_set_1_2_id)
+    published_score_set_2_1 = publish_score_set(client, score_set_2_1["urn"], score_set_2_1_id)
+    published_score_set_2_2 = publish_score_set(client, score_set_2_2["urn"], score_set_2_2_id)
 
     meta_score_set_1 = create_seq_score_set(
         client,
@@ -1749,11 +1756,18 @@ def test_multiple_score_set_meta_analysis_multiple_experiment_sets_different_sco
         client, session, data_provider, meta_score_set_3, data_files / "scores.csv"
     )
 
-    with patch.object(arq.ArqRedis, "enqueue_job", return_value=None) as worker_queue:
-        published_meta_score_set_1 = publish_score_set(client, meta_score_set_1["urn"])
-        published_meta_score_set_2 = publish_score_set(client, meta_score_set_2["urn"])
-        published_meta_score_set_3 = publish_score_set(client, meta_score_set_3["urn"])
-        worker_queue.assert_called()
+    published_meta_score_set_1_id = session.scalars(
+        select(ScoreSetDbModel.id).where(ScoreSetDbModel.urn == meta_score_set_1["urn"])
+    ).one()
+    published_meta_score_set_2_id = session.scalars(
+        select(ScoreSetDbModel.id).where(ScoreSetDbModel.urn == meta_score_set_2["urn"])
+    ).one()
+    published_meta_score_set_3_id = session.scalars(
+        select(ScoreSetDbModel.id).where(ScoreSetDbModel.urn == meta_score_set_3["urn"])
+    ).one()
+    published_meta_score_set_1 = publish_score_set(client, meta_score_set_1["urn"], published_meta_score_set_1_id)
+    published_meta_score_set_2 = publish_score_set(client, meta_score_set_2["urn"], published_meta_score_set_2_id)
+    published_meta_score_set_3 = publish_score_set(client, meta_score_set_3["urn"], published_meta_score_set_3_id)
 
     assert isinstance(MAVEDB_SCORE_SET_URN_RE.fullmatch(published_meta_score_set_1["urn"]), re.Match)
     assert isinstance(MAVEDB_SCORE_SET_URN_RE.fullmatch(published_meta_score_set_2["urn"]), re.Match)
@@ -1768,9 +1782,8 @@ def test_cannot_add_score_set_to_meta_analysis_experiment(session, data_provider
     score_set_1 = create_seq_score_set(client, experiment["urn"], update={"title": "Score Set 1"})
     score_set_1 = mock_worker_variant_insertion(client, session, data_provider, score_set_1, data_files / "scores.csv")
 
-    with patch.object(arq.ArqRedis, "enqueue_job", return_value=None) as worker_queue:
-        published_score_set_1 = publish_score_set(client, score_set_1["urn"])
-        worker_queue.assert_called()
+    score_set_1_id = session.scalars(select(ScoreSetDbModel.id).where(ScoreSetDbModel.urn == score_set_1["urn"])).one()
+    published_score_set_1 = publish_score_set(client, score_set_1["urn"], score_set_1_id)
 
     meta_score_set_1 = create_seq_score_set(
         client,
@@ -1781,9 +1794,10 @@ def test_cannot_add_score_set_to_meta_analysis_experiment(session, data_provider
         client, session, data_provider, meta_score_set_1, data_files / "scores.csv"
     )
 
-    with patch.object(arq.ArqRedis, "enqueue_job", return_value=None) as worker_queue:
-        meta_score_set_1 = publish_score_set(client, meta_score_set_1["urn"])
-        worker_queue.assert_called()
+    meta_score_set_1_id = session.scalars(
+        select(ScoreSetDbModel.id).where(ScoreSetDbModel.urn == meta_score_set_1["urn"])
+    ).one()
+    meta_score_set_1 = publish_score_set(client, meta_score_set_1["urn"], meta_score_set_1_id)
 
     assert isinstance(MAVEDB_SCORE_SET_URN_RE.fullmatch(meta_score_set_1["urn"]), re.Match)
     assert meta_score_set_1["urn"] == "urn:mavedb:00000001-0-1"
@@ -1805,9 +1819,8 @@ def test_create_single_score_set_meta_analysis_to_others_score_set(
     score_set = create_seq_score_set(client, experiment["urn"])
     score_set = mock_worker_variant_insertion(client, session, data_provider, score_set, data_files / "scores.csv")
 
-    with patch.object(arq.ArqRedis, "enqueue_job", return_value=None) as worker_queue:
-        published_score_set = publish_score_set(client, score_set["urn"])
-        worker_queue.assert_called()
+    score_set_id = session.scalars(select(ScoreSetDbModel.id).where(ScoreSetDbModel.urn == score_set["urn"])).one()
+    published_score_set = publish_score_set(client, score_set["urn"], score_set_id)
 
     change_ownership(session, published_score_set["urn"], ScoreSetDbModel)
 
@@ -1835,10 +1848,10 @@ def test_multiple_score_set_meta_analysis_single_experiment_with_different_creat
     score_set_2 = create_seq_score_set(client, experiment["urn"], update={"title": "Score Set 2"})
     score_set_2 = mock_worker_variant_insertion(client, session, data_provider, score_set_2, data_files / "scores.csv")
 
-    with patch.object(arq.ArqRedis, "enqueue_job", return_value=None) as worker_queue:
-        published_score_set_1 = publish_score_set(client, score_set_1["urn"])
-        published_score_set_2 = publish_score_set(client, score_set_2["urn"])
-        worker_queue.assert_called()
+    score_set_1_id = session.scalars(select(ScoreSetDbModel.id).where(ScoreSetDbModel.urn == score_set_1["urn"])).one()
+    published_score_set_1 = publish_score_set(client, score_set_1["urn"], score_set_1_id)
+    score_set_2_id = session.scalars(select(ScoreSetDbModel.id).where(ScoreSetDbModel.urn == score_set_2["urn"])).one()
+    published_score_set_2 = publish_score_set(client, score_set_2["urn"], score_set_2_id)
 
     change_ownership(session, published_score_set_2["urn"], ScoreSetDbModel)
     meta_score_set = create_seq_score_set(
@@ -1859,9 +1872,10 @@ def test_multiple_score_set_meta_analysis_single_experiment_with_different_creat
     )
     assert published_score_set_1_refresh["metaAnalyzedByScoreSetUrns"] == [meta_score_set["urn"]]
 
-    with patch.object(arq.ArqRedis, "enqueue_job", return_value=None) as worker_queue:
-        meta_score_set = publish_score_set(client, meta_score_set["urn"])
-        worker_queue.assert_called()
+    meta_score_set_id = session.scalars(
+        select(ScoreSetDbModel.id).where(ScoreSetDbModel.urn == meta_score_set["urn"])
+    ).one()
+    meta_score_set = publish_score_set(client, meta_score_set["urn"], meta_score_set_id)
 
     assert meta_score_set["urn"] == "urn:mavedb:00000001-0-1"
     assert isinstance(MAVEDB_SCORE_SET_URN_RE.fullmatch(meta_score_set["urn"]), re.Match)
@@ -1877,10 +1891,10 @@ def test_multiple_score_set_meta_analysis_multiple_experiment_sets_with_differen
     score_set_2 = create_seq_score_set(client, experiment_2["urn"], update={"title": "Score Set 2"})
     score_set_2 = mock_worker_variant_insertion(client, session, data_provider, score_set_2, data_files / "scores.csv")
 
-    with patch.object(arq.ArqRedis, "enqueue_job", return_value=None) as worker_queue:
-        published_score_set_1 = publish_score_set(client, score_set_1["urn"])
-        published_score_set_2 = publish_score_set(client, score_set_2["urn"])
-        worker_queue.assert_called()
+    score_set_1_id = session.scalars(select(ScoreSetDbModel.id).where(ScoreSetDbModel.urn == score_set_1["urn"])).one()
+    published_score_set_1 = publish_score_set(client, score_set_1["urn"], score_set_1_id)
+    score_set_2_id = session.scalars(select(ScoreSetDbModel.id).where(ScoreSetDbModel.urn == score_set_2["urn"])).one()
+    published_score_set_2 = publish_score_set(client, score_set_2["urn"], score_set_2_id)
 
     change_ownership(session, published_score_set_2["urn"], ScoreSetDbModel)
     meta_score_set = create_seq_score_set(
@@ -1901,9 +1915,10 @@ def test_multiple_score_set_meta_analysis_multiple_experiment_sets_with_differen
     )
     assert published_score_set_1_refresh["metaAnalyzedByScoreSetUrns"] == [meta_score_set["urn"]]
 
-    with patch.object(arq.ArqRedis, "enqueue_job", return_value=None) as worker_queue:
-        published_meta_score_set = publish_score_set(client, meta_score_set["urn"])
-        worker_queue.assert_called()
+    meta_score_set_id = session.scalars(
+        select(ScoreSetDbModel.id).where(ScoreSetDbModel.urn == meta_score_set["urn"])
+    ).one()
+    published_meta_score_set = publish_score_set(client, meta_score_set["urn"], meta_score_set_id)
 
     assert published_meta_score_set["urn"] == "urn:mavedb:00000003-0-1"
     assert isinstance(MAVEDB_SCORE_SET_URN_RE.fullmatch(published_meta_score_set["urn"]), re.Match)
@@ -2028,9 +2043,8 @@ def test_search_public_score_sets_no_match(session, data_provider, client, setup
     score_set = create_seq_score_set(client, experiment["urn"], update={"title": "Score Set 1"})
     score_set = mock_worker_variant_insertion(client, session, data_provider, score_set, data_files / "scores.csv")
 
-    with patch.object(arq.ArqRedis, "enqueue_job", return_value=None) as worker_queue:
-        publish_score_set(client, score_set["urn"])
-        worker_queue.assert_called_once()
+    score_set_id = session.scalars(select(ScoreSetDbModel.id).where(ScoreSetDbModel.urn == score_set["urn"])).one()
+    publish_score_set(client, score_set["urn"], score_set_id)
 
     search_payload = {"text": "fnord"}
     response = client.post("/api/v1/score-sets/search", json=search_payload)
@@ -2044,9 +2058,8 @@ def test_search_public_score_sets_match(session, data_provider, client, setup_ro
     score_set = create_seq_score_set(client, experiment["urn"], update={"title": "Test Fnord Score Set"})
     score_set = mock_worker_variant_insertion(client, session, data_provider, score_set, data_files / "scores.csv")
 
-    with patch.object(arq.ArqRedis, "enqueue_job", return_value=None) as worker_queue:
-        publish_score_set(client, score_set["urn"])
-        worker_queue.assert_called_once()
+    score_set_id = session.scalars(select(ScoreSetDbModel.id).where(ScoreSetDbModel.urn == score_set["urn"])).one()
+    publish_score_set(client, score_set["urn"], score_set_id)
 
     search_payload = {"text": "fnord"}
    response = client.post("/api/v1/score-sets/search", json=search_payload)
@@ -2063,9 +2076,8 @@ def test_cannot_search_public_score_sets_with_published_false(
     score_set = create_seq_score_set(client, experiment["urn"], update={"title": "Test Fnord Score Set"})
     score_set = mock_worker_variant_insertion(client, session, data_provider, score_set, data_files / "scores.csv")
 
-    with patch.object(arq.ArqRedis, "enqueue_job", return_value=None) as worker_queue:
-        publish_score_set(client, score_set["urn"])
-        worker_queue.assert_called_once()
+    score_set_id = session.scalars(select(ScoreSetDbModel.id).where(ScoreSetDbModel.urn == score_set["urn"])).one()
+    publish_score_set(client, score_set["urn"], score_set_id)
 
     search_payload = {"text": "fnord", "published": "false"}
     response = client.post("/api/v1/score-sets/search", json=search_payload)
@@ -2082,9 +2094,8 @@ def test_search_public_score_sets_invalid_limit(session, data_provider, client,
     score_set = create_seq_score_set(client, experiment["urn"], update={"title": "Test Fnord Score Set"})
     score_set = mock_worker_variant_insertion(client, session, data_provider, score_set, data_files / "scores.csv")
 
-    with patch.object(arq.ArqRedis, "enqueue_job", return_value=None) as worker_queue:
-        publish_score_set(client, score_set["urn"])
-        worker_queue.assert_called_once()
+    score_set_id = session.scalars(select(ScoreSetDbModel.id).where(ScoreSetDbModel.urn == score_set["urn"])).one()
+    publish_score_set(client, score_set["urn"], score_set_id)
 
     search_payload = {"text": "fnord", "limit": 101}
     response = client.post("/api/v1/score-sets/search", json=search_payload)
@@ -2101,9 +2112,8 @@ def test_search_public_score_sets_valid_limit(session, data_provider, client, se
     score_set = create_seq_score_set(client, experiment["urn"], update={"title": "Test Fnord Score Set"})
     score_set = mock_worker_variant_insertion(client, session, data_provider, score_set, data_files / "scores.csv")
 
-    with patch.object(arq.ArqRedis, "enqueue_job", return_value=None) as worker_queue:
-        publish_score_set(client, score_set["urn"])
-        worker_queue.assert_called_once()
+    score_set_id = session.scalars(select(ScoreSetDbModel.id).where(ScoreSetDbModel.urn == score_set["urn"])).one()
+    publish_score_set(client, score_set["urn"], score_set_id)
 
     search_payload = {"text": "fnord", "limit": 100}
     response = client.post("/api/v1/score-sets/search", json=search_payload)
@@ -2120,9 +2130,8 @@ def test_search_public_score_sets_too_many_publication_identifiers(
     score_set = create_seq_score_set(client, experiment["urn"], update={"title": "Test Fnord Score Set"})
     score_set = mock_worker_variant_insertion(client, session, data_provider, score_set, data_files / "scores.csv")
 
-    with patch.object(arq.ArqRedis, "enqueue_job", return_value=None) as worker_queue:
-        publish_score_set(client, score_set["urn"])
-        worker_queue.assert_called_once()
+    score_set_id = session.scalars(select(ScoreSetDbModel.id).where(ScoreSetDbModel.urn == score_set["urn"])).one()
+    publish_score_set(client, score_set["urn"], score_set_id)
 
     publication_identifier_search = [str(20711194 + i) for i in range(41)]
     search_payload = {"text": "fnord", "publication_identifiers": publication_identifier_search}
@@ -2140,9 +2149,8 @@ def test_search_public_score_sets_urn_with_space_match(session, data_provider, c
     score_set = create_seq_score_set(client, experiment["urn"], update={"title": "Score Set 1"})
     score_set = mock_worker_variant_insertion(client, session, data_provider, score_set, data_files / "scores.csv")
 
-    with patch.object(arq.ArqRedis, "enqueue_job", return_value=None) as worker_queue:
-        published_score_set = publish_score_set(client, score_set["urn"])
-        worker_queue.assert_called_once()
+    score_set_id = session.scalars(select(ScoreSetDbModel.id).where(ScoreSetDbModel.urn == score_set["urn"])).one()
+    published_score_set = publish_score_set(client, score_set["urn"], score_set_id)
 
     urn_with_space = published_score_set["urn"] + " "
     search_payload = {"urn": urn_with_space}
@@ -2158,9 +2166,8 @@ def test_search_others_public_score_sets_no_match(session, data_provider, client
     score_set = create_seq_score_set(client, experiment["urn"], update={"title": "Score Set 1"})
     score_set = mock_worker_variant_insertion(client, session, data_provider, score_set, data_files / "scores.csv")
 
-    with patch.object(arq.ArqRedis, "enqueue_job", return_value=None) as worker_queue:
-        published_score_set = publish_score_set(client, score_set["urn"])
-        worker_queue.assert_called_once()
+    score_set_id = session.scalars(select(ScoreSetDbModel.id).where(ScoreSetDbModel.urn == score_set["urn"])).one()
+    published_score_set = publish_score_set(client, score_set["urn"], score_set_id)
 
     change_ownership(session, published_score_set["urn"], ScoreSetDbModel)
 
@@ -2176,9 +2183,8 @@ def test_search_others_public_score_sets_match(session, data_provider, client, s
     score_set = create_seq_score_set(client, experiment["urn"], update={"title": "Test Fnord Score Set"})
     score_set = mock_worker_variant_insertion(client, session, data_provider, score_set, data_files / "scores.csv")
 
-    with patch.object(arq.ArqRedis, "enqueue_job", return_value=None) as worker_queue:
-        published_score_set = publish_score_set(client, score_set["urn"])
-        worker_queue.assert_called_once()
+    score_set_id = session.scalars(select(ScoreSetDbModel.id).where(ScoreSetDbModel.urn == score_set["urn"])).one()
+    published_score_set = publish_score_set(client, score_set["urn"], score_set_id)
 
     change_ownership(session, published_score_set["urn"], ScoreSetDbModel)
     assert session.query(ScoreSetDbModel).filter_by(urn=published_score_set["urn"]).one()
@@ -2196,9 +2202,8 @@ def test_search_others_public_score_sets_urn_match(session, data_provider, clien
     score_set = create_seq_score_set(client, experiment["urn"], update={"title": "Score Set 1"})
     score_set = mock_worker_variant_insertion(client, session, data_provider, score_set, data_files / "scores.csv")
 
-    with patch.object(arq.ArqRedis, "enqueue_job", return_value=None) as worker_queue:
-        published_score_set = publish_score_set(client, score_set["urn"])
-        worker_queue.assert_called_once()
+    score_set_id = session.scalars(select(ScoreSetDbModel.id).where(ScoreSetDbModel.urn == score_set["urn"])).one()
+    published_score_set = publish_score_set(client, score_set["urn"], score_set_id)
 
     change_ownership(session, published_score_set["urn"], ScoreSetDbModel)
     search_payload = {"urn": score_set["urn"]}
@@ -2216,9 +2221,8 @@ def test_search_others_public_score_sets_urn_with_space_match(
     score_set = create_seq_score_set(client, experiment["urn"], update={"title": "Score Set 1"})
     score_set = mock_worker_variant_insertion(client, session, data_provider, score_set, data_files / "scores.csv")
 
-    with patch.object(arq.ArqRedis, "enqueue_job", return_value=None) as worker_queue:
-        published_score_set = publish_score_set(client, score_set["urn"])
-        worker_queue.assert_called_once()
+    score_set_id = session.scalars(select(ScoreSetDbModel.id).where(ScoreSetDbModel.urn == score_set["urn"])).one()
+    published_score_set = publish_score_set(client, score_set["urn"], score_set_id)
 
     change_ownership(session, published_score_set["urn"], ScoreSetDbModel)
     urn_with_space = published_score_set["urn"] + " "
@@ -2237,9 +2241,8 @@ def test_cannot_search_private_score_sets(session, data_provider, client, setup_
     score_set_2 = create_seq_score_set(client, experiment["urn"], update={"title": "Score Set 2"})
     score_set_2 = mock_worker_variant_insertion(client, session, data_provider, score_set_2, data_files / "scores.csv")
 
-    with patch.object(arq.ArqRedis, "enqueue_job", return_value=None) as worker_queue:
-        publish_score_set(client, score_set_1["urn"])
-        worker_queue.assert_called_once()
+    score_set_1_id = session.scalars(select(ScoreSetDbModel.id).where(ScoreSetDbModel.urn == score_set_1["urn"])).one()
+    publish_score_set(client, score_set_1["urn"], score_set_1_id)
 
     search_payload = {"published": False}
     response = client.post("/api/v1/score-sets/search", json=search_payload)
@@ -2261,9 +2264,8 @@ def test_search_public_score_sets_not_showing_private_score_set(
     score_set_2 = create_seq_score_set(client, experiment["urn"], update={"title": "Score Set 2"})
     score_set_2 = mock_worker_variant_insertion(client, session, data_provider, score_set_2, data_files / "scores.csv")
 
-    with patch.object(arq.ArqRedis, "enqueue_job", return_value=None) as worker_queue:
-        published_score_set_1 = publish_score_set(client, score_set_1["urn"])
-        worker_queue.assert_called_once()
+    score_set_1_id = session.scalars(select(ScoreSetDbModel.id).where(ScoreSetDbModel.urn == score_set_1["urn"])).one()
+    published_score_set_1 = publish_score_set(client, score_set_1["urn"], score_set_1_id)
 
     search_payload = {"published": True}
     response = client.post("/api/v1/score-sets/search", json=search_payload)
@@ -2299,9 +2301,8 @@ def test_anonymous_cannot_delete_other_users_published_scoreset(
     score_set = create_seq_score_set(client, experiment["urn"])
     score_set = mock_worker_variant_insertion(client, session, data_provider, score_set, data_files / "scores.csv")
 
-    with patch.object(arq.ArqRedis, "enqueue_job", return_value=None) as worker_queue:
-        published_score_set = publish_score_set(client, score_set["urn"])
-        worker_queue.assert_called_once()
+    score_set_id = session.scalars(select(ScoreSetDbModel.id).where(ScoreSetDbModel.urn == score_set["urn"])).one()
+    published_score_set = publish_score_set(client, score_set["urn"], score_set_id)
 
     with DependencyOverrider(anonymous_app_overrides):
         del_response = client.delete(f"/api/v1/score-sets/{published_score_set['urn']}")
@@ -2326,9 +2327,8 @@ def test_cannot_delete_own_published_scoreset(session, data_provider, client, se
     score_set = create_seq_score_set(client, experiment["urn"])
     score_set = mock_worker_variant_insertion(client, session, data_provider, score_set, data_files / "scores.csv")
 
-    with patch.object(arq.ArqRedis, "enqueue_job", return_value=None) as worker_queue:
-        published_score_set = publish_score_set(client, score_set["urn"])
-        worker_queue.assert_called_once()
+    score_set_id = session.scalars(select(ScoreSetDbModel.id).where(ScoreSetDbModel.urn == score_set["urn"])).one()
+    published_score_set = publish_score_set(client, score_set["urn"], score_set_id)
 
     del_response = client.delete(f"/api/v1/score-sets/{published_score_set['urn']}")
 
@@ -2378,9 +2378,8 @@ def test_admin_can_delete_other_users_published_scoreset(
     score_set = create_seq_score_set(client, experiment["urn"])
     score_set = mock_worker_variant_insertion(client, session, data_provider, score_set, data_files / "scores.csv")
 
-    with patch.object(arq.ArqRedis, "enqueue_job", return_value=None) as worker_queue:
-        published_score_set = publish_score_set(client, score_set["urn"])
-        worker_queue.assert_called_once()
+    score_set_id = session.scalars(select(ScoreSetDbModel.id).where(ScoreSetDbModel.urn == score_set["urn"])).one()
+    published_score_set = publish_score_set(client, score_set["urn"], score_set_id)
 
     with DependencyOverrider(admin_app_overrides):
         del_response = client.delete(f"/api/v1/score-sets/{published_score_set['urn']}")
@@ -2417,9 +2416,8 @@ def test_can_add_score_set_to_own_public_experiment(session, data_provider, clie
     score_set_1 = create_seq_score_set(client, experiment["urn"])
     score_set_1 = mock_worker_variant_insertion(client, session, data_provider, score_set_1, data_files / "scores.csv")
 
-    with patch.object(arq.ArqRedis, "enqueue_job", return_value=None) as worker_queue:
-        published_score_set_1 = publish_score_set(client, score_set_1["urn"])
-        worker_queue.assert_called_once()
+    score_set_id = session.scalars(select(ScoreSetDbModel.id).where(ScoreSetDbModel.urn == score_set_1["urn"])).one()
+    published_score_set_1 = publish_score_set(client, score_set_1["urn"], score_set_id)
 
     score_set_2 = deepcopy(TEST_MINIMAL_SEQ_SCORESET)
     score_set_2["experimentUrn"] = published_score_set_1["experiment"]["urn"]
@@ -2432,9 +2430,8 @@ def test_can_add_score_set_to_others_public_experiment(session, data_provider, c
     score_set_1 = create_seq_score_set(client, experiment["urn"])
     score_set_1 = mock_worker_variant_insertion(client, session, data_provider, score_set_1, data_files / "scores.csv")
 
-    with patch.object(arq.ArqRedis, "enqueue_job", return_value=None) as worker_queue:
-        published_score_set = publish_score_set(client, score_set_1["urn"])
-        worker_queue.assert_called_once()
+    score_set_id = session.scalars(select(ScoreSetDbModel.id).where(ScoreSetDbModel.urn == score_set_1["urn"])).one()
+    published_score_set = publish_score_set(client, score_set_1["urn"], score_set_id)
 
     published_experiment_urn = published_score_set["experiment"]["urn"]
     change_ownership(session, published_experiment_urn, ExperimentDbModel)
@@ -2468,9 +2465,8 @@ def test_contributor_can_add_score_set_to_others_public_experiment(
     score_set = create_seq_score_set(client, experiment["urn"])
     score_set = mock_worker_variant_insertion(client, session, data_provider, score_set, data_files / "scores.csv")
 
-    with patch.object(arq.ArqRedis, "enqueue_job", return_value=None) as worker_queue:
-        published_score_set = publish_score_set(client, score_set["urn"])
-        worker_queue.assert_called_once()
+    score_set_id = session.scalars(select(ScoreSetDbModel.id).where(ScoreSetDbModel.urn == score_set["urn"])).one()
+    published_score_set = publish_score_set(client, score_set["urn"], score_set_id)
 
     published_experiment_urn = published_score_set["experiment"]["urn"]
     change_ownership(session, published_experiment_urn, ExperimentDbModel)
@@ -2528,9 +2524,8 @@ def test_create_superseding_score_set(session, data_provider, client, setup_rout
     score_set = create_seq_score_set(client, experiment["urn"])
     score_set = mock_worker_variant_insertion(client, session, data_provider, score_set, data_files / "scores.csv")
 
-    with patch.object(arq.ArqRedis, "enqueue_job", return_value=None) as worker_queue:
-        published_score_set = publish_score_set(client, score_set["urn"])
-        worker_queue.assert_called_once()
+    score_set_id = session.scalars(select(ScoreSetDbModel.id).where(ScoreSetDbModel.urn == score_set["urn"])).one()
+    published_score_set = publish_score_set(client, score_set["urn"], score_set_id)
 
     score_set_post_payload = deepcopy(TEST_MINIMAL_SEQ_SCORESET)
     score_set_post_payload["experimentUrn"] = published_score_set["experiment"]["urn"]
@@ -2546,9 +2541,10 @@ def test_can_view_unpublished_superseding_score_set(session, data_provider, clie
         client, session, data_provider, unpublished_score_set, data_files / "scores.csv"
     )
 
-    with patch.object(arq.ArqRedis, "enqueue_job", return_value=None) as worker_queue:
-        published_score_set = publish_score_set(client, unpublished_score_set["urn"])
-        worker_queue.assert_called_once()
+    score_set_id = session.scalars(
+        select(ScoreSetDbModel.id).where(ScoreSetDbModel.urn == unpublished_score_set["urn"])
+    ).one()
+    published_score_set = publish_score_set(client, unpublished_score_set["urn"], score_set_id)
 
     score_set_post_payload = deepcopy(TEST_MINIMAL_SEQ_SCORESET)
     score_set_post_payload["experimentUrn"] = published_score_set["experiment"]["urn"]
@@ -2571,9 +2567,10 @@ def test_cannot_view_others_unpublished_superseding_score_set(
     unpublished_score_set = mock_worker_variant_insertion(
         client, session, data_provider, unpublished_score_set, data_files / "scores.csv"
     )
 
-    with patch.object(arq.ArqRedis, "enqueue_job", return_value=None) as worker_queue:
-        published_score_set = publish_score_set(client, unpublished_score_set["urn"])
-        worker_queue.assert_called_once()
+    score_set_id = session.scalars(
+        select(ScoreSetDbModel.id).where(ScoreSetDbModel.urn == unpublished_score_set["urn"])
+    ).one()
+    published_score_set = publish_score_set(client, unpublished_score_set["urn"], score_set_id)
 
     score_set_post_payload = deepcopy(TEST_MINIMAL_SEQ_SCORESET)
     score_set_post_payload["experimentUrn"] = published_score_set["experiment"]["urn"]
@@ -2597,9 +2594,10 @@ def test_can_view_others_published_superseding_score_set(session, data_provider,
         client, session, data_provider, unpublished_score_set, data_files / "scores.csv"
    )
 
-    with patch.object(arq.ArqRedis, "enqueue_job", return_value=None) as worker_queue:
-        published_score_set = publish_score_set(client, unpublished_score_set["urn"])
-        worker_queue.assert_called_once()
+    score_set_id = session.scalars(
+        select(ScoreSetDbModel.id).where(ScoreSetDbModel.urn == unpublished_score_set["urn"])
+    ).one()
+    published_score_set = publish_score_set(client, unpublished_score_set["urn"], score_set_id)
 
     superseding_score_set = create_seq_score_set(
         client, published_score_set["experiment"]["urn"], update={"supersededScoreSetUrn": published_score_set["urn"]}
@@ -2607,9 +2605,10 @@ def test_can_view_others_published_superseding_score_set(session, data_provider,
     superseding_score_set = mock_worker_variant_insertion(
         client, session, data_provider, superseding_score_set, data_files / "scores.csv"
     )
 
-    with patch.object(arq.ArqRedis, "enqueue_job", return_value=None) as worker_queue:
-        published_superseding_score_set = publish_score_set(client, superseding_score_set["urn"])
-        worker_queue.assert_called_once()
+    score_set_id = session.scalars(
+        select(ScoreSetDbModel.id).where(ScoreSetDbModel.urn == superseding_score_set["urn"])
+    ).one()
+    published_superseding_score_set = publish_score_set(client, superseding_score_set["urn"], score_set_id)
 
     change_ownership(session, published_superseding_score_set["urn"], ScoreSetDbModel)
 
@@ -2630,9 +2629,10 @@ def test_show_correct_score_set_version_with_superseded_score_set_to_its_owner(
     unpublished_score_set = mock_worker_variant_insertion(
         client, session, data_provider, unpublished_score_set, data_files / "scores.csv"
     )
 
-    with patch.object(arq.ArqRedis, "enqueue_job", return_value=None) as worker_queue:
-        published_score_set = publish_score_set(client, unpublished_score_set["urn"])
-        worker_queue.assert_called_once()
+    score_set_id = session.scalars(
+        select(ScoreSetDbModel.id).where(ScoreSetDbModel.urn == unpublished_score_set["urn"])
+    ).one()
+    published_score_set = publish_score_set(client, unpublished_score_set["urn"], score_set_id)
 
     score_set_post_payload = deepcopy(TEST_MINIMAL_SEQ_SCORESET)
     score_set_post_payload["experimentUrn"] = published_score_set["experiment"]["urn"]
@@ -2692,9 +2692,8 @@ def test_download_variants_data_file(
     if mapped_variant is not None:
         create_mapped_variants_for_score_set(session, score_set["urn"], mapped_variant)
 
-    with patch.object(arq.ArqRedis, "enqueue_job", return_value=None) as worker_queue:
-        published_score_set = publish_score_set(client, score_set["urn"])
-        worker_queue.assert_called_once()
+    score_set_id = session.scalars(select(ScoreSetDbModel.id).where(ScoreSetDbModel.urn == score_set["urn"])).one()
+    published_score_set = publish_score_set(client, score_set["urn"], score_set_id)
 
     download_scores_csv_response = client.get(
         f"/api/v1/score-sets/{published_score_set['urn']}/variants/data?drop_na_columns=true&include_post_mapped_hgvs=true"
@@ -2741,9 +2740,8 @@ def test_download_scores_file(session, data_provider, client, setup_router_db, d
     score_set = create_seq_score_set(client, experiment["urn"])
     score_set = mock_worker_variant_insertion(client, session, data_provider, score_set, data_files / "scores.csv")
 
-    with patch.object(arq.ArqRedis, "enqueue_job", return_value=None) as worker_queue:
-        published_score_set = publish_score_set(client, score_set["urn"])
-        worker_queue.assert_called_once()
+    score_set_id = session.scalars(select(ScoreSetDbModel.id).where(ScoreSetDbModel.urn == score_set["urn"])).one()
+    published_score_set = publish_score_set(client, score_set["urn"], score_set_id)
 
     download_scores_csv_response = client.get(
f"/api/v1/score-sets/{published_score_set['urn']}/scores?drop_na_columns=true" @@ -2763,9 +2761,8 @@ def test_download_counts_file(session, data_provider, client, setup_router_db, d score_set = mock_worker_variant_insertion( client, session, data_provider, score_set, data_files / "scores.csv", data_files / "counts.csv" ) - with patch.object(arq.ArqRedis, "enqueue_job", return_value=None) as worker_queue: - published_score_set = publish_score_set(client, score_set["urn"]) - worker_queue.assert_called_once() + score_set_id = session.scalars(select(ScoreSetDbModel.id).where(ScoreSetDbModel.urn == score_set["urn"])).one() + published_score_set = publish_score_set(client, score_set["urn"], score_set_id) download_counts_csv_response = client.get( f"/api/v1/score-sets/{published_score_set['urn']}/counts?drop_na_columns=true" @@ -2786,9 +2783,8 @@ def test_download_scores_file_in_variant_data_path(session, data_provider, clien score_set = mock_worker_variant_insertion( client, session, data_provider, score_set, data_files / "scores.csv", data_files / "counts.csv" ) - with patch.object(arq.ArqRedis, "enqueue_job", return_value=None) as worker_queue: - published_score_set = publish_score_set(client, score_set["urn"]) - worker_queue.assert_called_once() + score_set_id = session.scalars(select(ScoreSetDbModel.id).where(ScoreSetDbModel.urn == score_set["urn"])).one() + published_score_set = publish_score_set(client, score_set["urn"], score_set_id) download_scores_csv_response = client.get( f"/api/v1/score-sets/{published_score_set['urn']}/variants/data?namespaces=scores&drop_na_columns=true" @@ -2809,9 +2805,8 @@ def test_download_counts_file_in_variant_data_path(session, data_provider, clien score_set = mock_worker_variant_insertion( client, session, data_provider, score_set, data_files / "scores.csv", data_files / "counts.csv" ) - with patch.object(arq.ArqRedis, "enqueue_job", return_value=None) as worker_queue: - published_score_set = publish_score_set(client, score_set["urn"]) - worker_queue.assert_called_once() + score_set_id = session.scalars(select(ScoreSetDbModel.id).where(ScoreSetDbModel.urn == score_set["urn"])).one() + published_score_set = publish_score_set(client, score_set["urn"], score_set_id) download_counts_csv_response = client.get( f"/api/v1/score-sets/{published_score_set['urn']}/variants/data?namespaces=counts&include_custom_columns=true&drop_na_columns=true" @@ -2833,9 +2828,8 @@ def test_download_scores_and_counts_file(session, data_provider, client, setup_r score_set = mock_worker_variant_insertion( client, session, data_provider, score_set, data_files / "scores.csv", data_files / "counts.csv" ) - with patch.object(arq.ArqRedis, "enqueue_job", return_value=None) as worker_queue: - published_score_set = publish_score_set(client, score_set["urn"]) - worker_queue.assert_called_once() + score_set_id = session.scalars(select(ScoreSetDbModel.id).where(ScoreSetDbModel.urn == score_set["urn"])).one() + published_score_set = publish_score_set(client, score_set["urn"], score_set_id) download_scores_and_counts_csv_response = client.get( f"/api/v1/score-sets/{published_score_set['urn']}/variants/data?namespaces=counts&namespaces=scores&include_custom_columns=true&drop_na_columns=true" @@ -2868,9 +2862,8 @@ def test_download_scores_counts_and_post_mapped_variants_file( if mapped_variant is not None: create_mapped_variants_for_score_set(session, score_set["urn"], mapped_variant) - with patch.object(arq.ArqRedis, "enqueue_job", return_value=None) as worker_queue: - published_score_set = 
publish_score_set(client, score_set["urn"]) - worker_queue.assert_called_once() + score_set_id = session.scalars(select(ScoreSetDbModel.id).where(ScoreSetDbModel.urn == score_set["urn"])).one() + published_score_set = publish_score_set(client, score_set["urn"], score_set_id) download_multiple_data_csv_response = client.get( f"/api/v1/score-sets/{published_score_set['urn']}/variants/data?namespaces=scores&namespaces=counts&include_custom_columns=true&include_post_mapped_hgvs=true&drop_na_columns=true" @@ -3045,15 +3038,11 @@ def test_cannot_get_annotated_variants_for_score_set_with_no_mapped_variants( client, session, data_provider, experiment["urn"], data_files / "scores.csv" ) - with patch.object(arq.ArqRedis, "enqueue_job", return_value=None) as queue: - publish_score_set_response = client.post(f"/api/v1/score-sets/{score_set['urn']}/publish") - assert publish_score_set_response.status_code == 200 - queue.assert_called_once() - - publish_score_set = publish_score_set_response.json() + score_set_id = session.scalars(select(ScoreSetDbModel.id).where(ScoreSetDbModel.urn == score_set["urn"])).one() + published_score_set = publish_score_set(client, score_set["urn"], score_set_id) download_scores_csv_response = client.get( - f"/api/v1/score-sets/{publish_score_set['urn']}/scores?drop_na_columns=true" + f"/api/v1/score-sets/{published_score_set['urn']}/scores?drop_na_columns=true" ) assert download_scores_csv_response.status_code == 200 download_scores_csv = download_scores_csv_response.text @@ -3063,12 +3052,12 @@ def test_cannot_get_annotated_variants_for_score_set_with_no_mapped_variants( assert "hgvs_pro" in columns assert "hgvs_splice" not in columns - response = client.get(f"/api/v1/score-sets/{publish_score_set['urn']}/annotated-variants/{annotation_type}") + response = client.get(f"/api/v1/score-sets/{published_score_set['urn']}/annotated-variants/{annotation_type}") response_data = response.json() assert response.status_code == 404 assert ( - f"No mapped variants associated with score set URN {publish_score_set['urn']} were found" + f"No mapped variants associated with score set URN {published_score_set['urn']} were found" in response_data["detail"] ) diff --git a/tests/routers/test_statistics.py b/tests/routers/test_statistics.py index 69be6ffb..3024defc 100644 --- a/tests/routers/test_statistics.py +++ b/tests/routers/test_statistics.py @@ -1,29 +1,31 @@ # ruff: noqa: E402 +from unittest.mock import patch + import pytest from humps import camelize -from unittest.mock import patch +from sqlalchemy import select arq = pytest.importorskip("arq") cdot = pytest.importorskip("cdot") fastapi = pytest.importorskip("fastapi") from mavedb.models.published_variant import PublishedVariantsMV - +from mavedb.models.score_set import ScoreSet as ScoreSetDbModel from tests.helpers.constants import ( TEST_BIORXIV_IDENTIFIER, - TEST_MINIMAL_MAPPED_VARIANT, - TEST_NT_CDOT_TRANSCRIPT, TEST_KEYWORDS, TEST_MEDRXIV_IDENTIFIER, TEST_MINIMAL_ACC_SCORESET, + TEST_MINIMAL_MAPPED_VARIANT, TEST_MINIMAL_SEQ_SCORESET, + TEST_NT_CDOT_TRANSCRIPT, TEST_PUBMED_IDENTIFIER, VALID_GENE, ) -from tests.helpers.util.score_set import publish_score_set, create_acc_score_set, create_seq_score_set from tests.helpers.util.experiment import create_experiment -from tests.helpers.util.variant import mock_worker_variant_insertion, create_mapped_variants_for_score_set +from tests.helpers.util.score_set import create_acc_score_set, create_seq_score_set, publish_score_set +from tests.helpers.util.variant import 
create_mapped_variants_for_score_set, mock_worker_variant_insertion TARGET_ACCESSION_FIELDS = ["accession", "assembly", "gene"] TARGET_SEQUENCE_FIELDS = ["sequence", "sequence-type"] @@ -53,9 +55,8 @@ def setup_acc_scoreset(setup_router_db, session, data_provider, client, data_fil client, session, data_provider, score_set, data_files / "scores_acc.csv" ) - with patch.object(arq.ArqRedis, "enqueue_job", return_value=None) as worker_queue: - publish_score_set(client, score_set["urn"]) - worker_queue.assert_called_once() + score_set_id = session.scalars(select(ScoreSetDbModel.id).where(ScoreSetDbModel.urn == score_set["urn"])).one() + publish_score_set(client, score_set["urn"], score_set_id) @pytest.fixture @@ -67,9 +68,10 @@ def setup_seq_scoreset(setup_router_db, session, data_provider, client, data_fil ) create_mapped_variants_for_score_set(session, unpublished_score_set["urn"], TEST_MINIMAL_MAPPED_VARIANT) - with patch.object(arq.ArqRedis, "enqueue_job", return_value=None) as worker_queue: - publish_score_set(client, unpublished_score_set["urn"]) - worker_queue.assert_called_once() + score_set_id = session.scalars( + select(ScoreSetDbModel.id).where(ScoreSetDbModel.urn == unpublished_score_set["urn"]) + ).one() + publish_score_set(client, unpublished_score_set["urn"], score_set_id) # Note that we have not created indexes for this view when it is generated via metadata. This differs # from the database created via alembic, which does create indexes. @@ -257,9 +259,10 @@ def test_target_gene_identifier_statistiscs( client, session, data_provider, unpublished_score_set, data_files / "scores.csv" ) - with patch.object(arq.ArqRedis, "enqueue_job", return_value=None) as worker_queue: - publish_score_set(client, unpublished_score_set["urn"]) - worker_queue.assert_called_once() + score_set_id = session.scalars( + select(ScoreSetDbModel.id).where(ScoreSetDbModel.urn == unpublished_score_set["urn"]) + ).one() + publish_score_set(client, unpublished_score_set["urn"], score_set_id) response = client.get(f"/api/v1/statistics/target/gene/{field_value}") desired_field_value = EXTERNAL_IDENTIFIERS[field_value]["identifier"]["identifier"] @@ -322,9 +325,10 @@ def test_record_publication_identifier_statistics( client, session, data_provider, unpublished_score_set, data_files / "scores.csv" ) - with patch.object(arq.ArqRedis, "enqueue_job", return_value=None) as worker_queue: - publish_score_set(client, unpublished_score_set["urn"]) - worker_queue.assert_called_once() + score_set_id = session.scalars( + select(ScoreSetDbModel.id).where(ScoreSetDbModel.urn == unpublished_score_set["urn"]) + ).one() + publish_score_set(client, unpublished_score_set["urn"], score_set_id) response = client.get(f"/api/v1/statistics/record/{model_value}/publication-identifiers") @@ -355,9 +359,10 @@ def test_record_keyword_statistics(session, data_provider, client, setup_router_ client, session, data_provider, unpublished_score_set, data_files / "scores.csv" ) - with patch.object(arq.ArqRedis, "enqueue_job", return_value=None) as worker_queue: - publish_score_set(client, unpublished_score_set["urn"]) - worker_queue.assert_called_once() + score_set_id = session.scalars( + select(ScoreSetDbModel.id).where(ScoreSetDbModel.urn == unpublished_score_set["urn"]) + ).one() + publish_score_set(client, unpublished_score_set["urn"], score_set_id) response = client.get("/api/v1/statistics/record/experiment/keywords") desired_field_values = ["SaCas9", "Endogenous locus library method", "Base editor", "Other"] @@ -380,9 +385,10 @@ def 
test_record_doi_identifier_statistics(session, data_provider, client, setup_ client, session, data_provider, unpublished_score_set, data_files / "scores.csv" ) - with patch.object(arq.ArqRedis, "enqueue_job", return_value=None) as worker_queue: - publish_score_set(client, unpublished_score_set["urn"]) - worker_queue.assert_called_once() + score_set_id = session.scalars( + select(ScoreSetDbModel.id).where(ScoreSetDbModel.urn == unpublished_score_set["urn"]) + ).one() + publish_score_set(client, unpublished_score_set["urn"], score_set_id) response = client.get(f"/api/v1/statistics/record/{model_value}/doi-identifiers") desired_field_value = record_update["doiIdentifiers"][0]["identifier"] @@ -406,9 +412,10 @@ def test_record_raw_read_identifier_statistics( client, session, data_provider, unpublished_score_set, data_files / "scores.csv" ) - with patch.object(arq.ArqRedis, "enqueue_job", return_value=None) as worker_queue: - publish_score_set(client, unpublished_score_set["urn"]) - worker_queue.assert_called_once() + score_set_id = session.scalars( + select(ScoreSetDbModel.id).where(ScoreSetDbModel.urn == unpublished_score_set["urn"]) + ).one() + publish_score_set(client, unpublished_score_set["urn"], score_set_id) response = client.get(f"/api/v1/statistics/record/{model_value}/raw-read-identifiers") desired_field_value = record_update["rawReadIdentifiers"][0]["identifier"] diff --git a/tests/routers/test_target_gene.py b/tests/routers/test_target_gene.py index 5ca1b4a2..51608b3a 100644 --- a/tests/routers/test_target_gene.py +++ b/tests/routers/test_target_gene.py @@ -1,18 +1,18 @@ # ruff: noqa: E402 + import pytest -from unittest.mock import patch +from sqlalchemy import select arq = pytest.importorskip("arq") cdot = pytest.importorskip("cdot") fastapi = pytest.importorskip("fastapi") from mavedb.models.score_set import ScoreSet as ScoreSetDbModel - from tests.helpers.constants import TEST_USER from tests.helpers.util.contributor import add_contributor from tests.helpers.util.experiment import create_experiment -from tests.helpers.util.user import change_ownership from tests.helpers.util.score_set import create_seq_score_set, publish_score_set +from tests.helpers.util.user import change_ownership from tests.helpers.util.variant import mock_worker_variant_insertion @@ -79,9 +79,8 @@ def test_search_public_target_genes_match_on_other_user(session, data_provider, experiment = create_experiment(client, {"title": "Experiment 1"}) score_set = create_seq_score_set(client, experiment["urn"]) score_set = mock_worker_variant_insertion(client, session, data_provider, score_set, data_files / "scores.csv") - with patch.object(arq.ArqRedis, "enqueue_job", return_value=None) as worker_queue: - published_score_set = publish_score_set(client, score_set["urn"]) - worker_queue.assert_called_once() + score_set_id = session.scalars(select(ScoreSetDbModel.id).where(ScoreSetDbModel.urn == score_set["urn"])).one() + published_score_set = publish_score_set(client, score_set["urn"], score_set_id) change_ownership(session, published_score_set["urn"], ScoreSetDbModel) @@ -156,9 +155,8 @@ def test_fetch_public_target_gene_by_id(session, data_provider, client, setup_ro experiment = create_experiment(client, {"title": "Experiment 1"}) score_set = create_seq_score_set(client, experiment["urn"]) score_set = mock_worker_variant_insertion(client, session, data_provider, score_set, data_files / "scores.csv") - with patch.object(arq.ArqRedis, "enqueue_job", return_value=None) as worker_queue: - published_score_set = 
publish_score_set(client, score_set["urn"]) - worker_queue.assert_called_once() + score_set_id = session.scalars(select(ScoreSetDbModel.id).where(ScoreSetDbModel.urn == score_set["urn"])).one() + published_score_set = publish_score_set(client, score_set["urn"], score_set_id) change_ownership(session, published_score_set["urn"], ScoreSetDbModel) diff --git a/tests/worker/test_jobs.py b/tests/worker/test_jobs.py index e7fd0b39..2d0012a5 100644 --- a/tests/worker/test_jobs.py +++ b/tests/worker/test_jobs.py @@ -22,7 +22,6 @@ from mavedb.lib.clingen.services import ( ClinGenAlleleRegistryService, ClinGenLdhService, - clingen_allele_id_from_ldh_variation, ) from mavedb.lib.mave.constants import HGVS_NT_COLUMN from mavedb.lib.score_sets import csv_data_to_df @@ -40,7 +39,6 @@ MAPPING_CURRENT_ID_NAME, MAPPING_QUEUE_NAME, create_variants_for_score_set, - link_clingen_variants, link_gnomad_variants, map_variants_for_score_set, poll_uniprot_mapping_jobs_for_score_set, @@ -1600,17 +1598,6 @@ async def test_mapping_manager_enqueues_mapping_process_with_successful_mapping( async def dummy_mapping_job(): return await setup_mapping_output(async_client, session, score_set) - async def dummy_ldh_submission_job(): - return [TEST_CLINGEN_SUBMISSION_RESPONSE, None] - - async def dummy_linking_job(): - return [ - (variant_urn, TEST_CLINGEN_LDH_LINKING_RESPONSE) - for variant_urn in session.scalars( - select(Variant.urn).join(ScoreSetDbModel).where(ScoreSetDbModel.urn == score_set.urn) - ).all() - ] - # We seem unable to mock requests via requests_mock that occur inside another event loop. Workaround # this limitation by instead patching the _UnixSelectorEventLoop 's executor function, with a coroutine # object that sets up test mappingn output. @@ -1618,17 +1605,15 @@ async def dummy_linking_job(): patch.object( _UnixSelectorEventLoop, "run_in_executor", - side_effect=[dummy_mapping_job(), dummy_ldh_submission_job(), dummy_linking_job()], + side_effect=[dummy_mapping_job()], ), patch.object(ClinGenAlleleRegistryService, "dispatch_submissions", return_value=[TEST_CLINGEN_ALLELE_OBJECT]), - patch.object(ClinGenLdhService, "_existing_jwt", return_value="test_jwt"), patch.object(UniProtIDMappingAPI, "submit_id_mapping", return_value=TEST_UNIPROT_JOB_SUBMISSION_RESPONSE), patch.object(UniProtIDMappingAPI, "check_id_mapping_results_ready", return_value=True), patch.object( UniProtIDMappingAPI, "get_id_mapping_results", return_value=TEST_UNIPROT_ID_MAPPING_SWISS_PROT_RESPONSE ), patch("mavedb.worker.jobs.MAPPING_BACKOFF_IN_SECONDS", 0), - patch("mavedb.worker.jobs.LINKING_BACKOFF_IN_SECONDS", 0), patch("mavedb.worker.jobs.UNIPROT_ID_MAPPING_ENABLED", True), patch("mavedb.worker.jobs.CLIN_GEN_SUBMISSION_ENABLED", True), patch("mavedb.worker.jobs.CAR_SUBMISSION_ENDPOINT", "https://reg.test.genome.network/pytest"), @@ -1640,8 +1625,8 @@ async def dummy_linking_job(): await arq_worker.async_run() num_completed_jobs = await arq_worker.run_check() - # We should have completed all jobs exactly once. - assert num_completed_jobs == 8 + # We should have completed the mapping manager, mapping, CAR submission, and the two UniProt jobs. 
+ assert num_completed_jobs == 5 score_set = session.scalars(select(ScoreSetDbModel).where(ScoreSetDbModel.urn == score_set.urn)).one() mapped_variants_for_score_set = session.scalars( @@ -1775,17 +1760,6 @@ async def test_mapping_manager_enqueues_mapping_process_with_successful_mapping_ async def dummy_mapping_job(): return await setup_mapping_output(async_client, session, score_set) - async def dummy_submission_job(): - return [TEST_CLINGEN_SUBMISSION_RESPONSE, None] - - async def dummy_linking_job(): - return [ - (variant_urn, TEST_CLINGEN_LDH_LINKING_RESPONSE) - for variant_urn in session.scalars( - select(Variant.urn).join(ScoreSetDbModel).where(ScoreSetDbModel.urn == score_set.urn) - ).all() - ] - # We seem unable to mock requests via requests_mock that occur inside another event loop. Workaround # this limitation by instead patching the _UnixSelectorEventLoop 's executor function, with a coroutine # object that sets up test mappingn output. @@ -1793,7 +1767,7 @@ async def dummy_linking_job(): patch.object( _UnixSelectorEventLoop, "run_in_executor", - side_effect=[dummy_mapping_job(), dummy_submission_job(), dummy_linking_job()], + side_effect=[dummy_mapping_job()], ), patch.object(ClinGenLdhService, "_existing_jwt", return_value="test_jwt"), patch("mavedb.worker.jobs.MAPPING_BACKOFF_IN_SECONDS", 0), @@ -1810,8 +1784,8 @@ async def dummy_linking_job(): await arq_worker.async_run() num_completed_jobs = await arq_worker.run_check() - # We should have completed the manager, mapping, submission, and linking jobs, but not the uniprot jobs. - assert num_completed_jobs == 6 + # We should have completed the manager, mapping, and allele registry submission, but not the uniprot jobs. + assert num_completed_jobs == 3 score_set = session.scalars(select(ScoreSetDbModel).where(ScoreSetDbModel.urn == score_set.urn)).one() mapped_variants_for_score_set = session.scalars( @@ -1842,17 +1816,6 @@ async def failed_mapping_job(): async def dummy_mapping_job(): return await setup_mapping_output(async_client, session, score_set) - async def dummy_ldh_submission_job(): - return [TEST_CLINGEN_SUBMISSION_RESPONSE, None] - - async def dummy_linking_job(): - return [ - (variant_urn, TEST_CLINGEN_LDH_LINKING_RESPONSE) - for variant_urn in session.scalars( - select(Variant.urn).join(ScoreSetDbModel).where(ScoreSetDbModel.urn == score_set.urn) - ).all() - ] - # We seem unable to mock requests via requests_mock that occur inside another event loop. Workaround # this limitation by instead patching the _UnixSelectorEventLoop 's executor function, with a coroutine # object that sets up test mappingn output. 
@@ -1860,12 +1823,10 @@ async def dummy_linking_job(): patch.object( _UnixSelectorEventLoop, "run_in_executor", - side_effect=[failed_mapping_job(), dummy_mapping_job(), dummy_ldh_submission_job(), dummy_linking_job()], + side_effect=[failed_mapping_job(), dummy_mapping_job()], ), - patch.object(ClinGenLdhService, "_existing_jwt", return_value="test_jwt"), patch.object(ClinGenAlleleRegistryService, "dispatch_submissions", return_value=[TEST_CLINGEN_ALLELE_OBJECT]), patch("mavedb.worker.jobs.MAPPING_BACKOFF_IN_SECONDS", 0), - patch("mavedb.worker.jobs.LINKING_BACKOFF_IN_SECONDS", 0), patch("mavedb.worker.jobs.UNIPROT_ID_MAPPING_ENABLED", False), patch("mavedb.worker.jobs.CLIN_GEN_SUBMISSION_ENABLED", True), patch("mavedb.worker.jobs.CAR_SUBMISSION_ENDPOINT", "https://reg.test.genome.network/pytest"), @@ -1877,8 +1838,8 @@ async def dummy_linking_job(): await arq_worker.async_run() num_completed_jobs = await arq_worker.run_check() - # We should have completed the mapping manager job twice, the mapping job twice, the two submission jobs, and both linking jobs. - assert num_completed_jobs == 8 + # We should have completed the mapping manager job twice, the mapping job twice and the CAR submission job (UniProt disabled). + assert num_completed_jobs == 5 score_set = session.scalars(select(ScoreSetDbModel).where(ScoreSetDbModel.urn == score_set.urn)).one() mapped_variants_for_score_set = session.scalars( @@ -1968,7 +1929,7 @@ async def test_submit_score_set_mappings_to_car_success( assert result["success"] assert not result["retried"] - assert result["enqueued_job"] is not None + assert result["enqueued_job"] is None @pytest.mark.asyncio @@ -2082,39 +2043,6 @@ async def test_submit_score_set_mappings_to_car_exception_in_allele_association( assert not result["enqueued_job"] -@pytest.mark.asyncio -async def test_submit_score_set_mappings_to_car_exception_during_ldh_enqueue( - setup_worker_db, standalone_worker_context, session, async_client, data_files, arq_worker, arq_redis -): - score_set = await setup_records_files_and_variants_with_mapping( - session, - async_client, - data_files, - TEST_MINIMAL_SEQ_SCORESET, - standalone_worker_context, - ) - - with ( - patch("mavedb.worker.jobs.CAR_SUBMISSION_ENDPOINT", "https://reg.test.genome.network/pytest"), - patch.object(ClinGenAlleleRegistryService, "dispatch_submissions", return_value=[TEST_CLINGEN_ALLELE_OBJECT]), - patch.object(arq.ArqRedis, "enqueue_job", side_effect=Exception()), - ): - result = await submit_score_set_mappings_to_car(standalone_worker_context, uuid4().hex, score_set.id) - - mapped_variants_with_caid_for_score_set = session.scalars( - select(MappedVariant) - .join(Variant) - .join(ScoreSetDbModel) - .filter(ScoreSetDbModel.urn == score_set.urn, MappedVariant.clingen_allele_id.is_not(None)) - ).all() - - assert len(mapped_variants_with_caid_for_score_set) == score_set.num_variants - - assert not result["success"] - assert not result["retried"] - assert not result["enqueued_job"] - - ############################################################################################################################################ # ClinGen LDH Submission ############################################################################################################################################ @@ -2149,7 +2077,7 @@ async def dummy_submission_job(): assert result["success"] assert not result["retried"] - assert result["enqueued_job"] is not None + assert result["enqueued_job"] is None @pytest.mark.asyncio @@ -2332,439 +2260,6 @@ async def 
dummy_submission_job(): assert not result["enqueued_job"] -@pytest.mark.asyncio -async def test_submit_score_set_mappings_to_ldh_exception_during_linking_enqueue( - setup_worker_db, standalone_worker_context, session, async_client, data_files, arq_worker, arq_redis -): - score_set = await setup_records_files_and_variants_with_mapping( - session, - async_client, - data_files, - TEST_MINIMAL_SEQ_SCORESET, - standalone_worker_context, - ) - - async def dummy_submission_job(): - return [TEST_CLINGEN_SUBMISSION_RESPONSE, None] - - # We are unable to mock requests via requests_mock that occur inside another event loop. Instead, patch the return - # value of the EventLoop itself, which would have made the request. - with ( - patch.object( - _UnixSelectorEventLoop, - "run_in_executor", - return_value=dummy_submission_job(), - ), - patch.object(ClinGenLdhService, "_existing_jwt", return_value="test_jwt"), - patch.object(arq.ArqRedis, "enqueue_job", side_effect=Exception()), - ): - result = await submit_score_set_mappings_to_ldh(standalone_worker_context, uuid4().hex, score_set.id) - - assert not result["success"] - assert not result["retried"] - assert not result["enqueued_job"] - - -@pytest.mark.asyncio -async def test_submit_score_set_mappings_to_ldh_linking_not_queued_when_expected( - setup_worker_db, standalone_worker_context, session, async_client, data_files, arq_worker, arq_redis -): - score_set = await setup_records_files_and_variants_with_mapping( - session, - async_client, - data_files, - TEST_MINIMAL_SEQ_SCORESET, - standalone_worker_context, - ) - - async def dummy_submission_job(): - return [TEST_CLINGEN_SUBMISSION_RESPONSE, None] - - # We are unable to mock requests via requests_mock that occur inside another event loop. Instead, patch the return - # value of the EventLoop itself, which would have made the request. - with ( - patch.object( - _UnixSelectorEventLoop, - "run_in_executor", - return_value=dummy_submission_job(), - ), - patch.object(ClinGenLdhService, "_existing_jwt", return_value="test_jwt"), - patch.object(arq.ArqRedis, "enqueue_job", return_value=None), - ): - result = await submit_score_set_mappings_to_ldh(standalone_worker_context, uuid4().hex, score_set.id) - - assert not result["success"] - assert not result["retried"] - assert not result["enqueued_job"] - - -############################################################################################################################################## -## ClinGen Linkage -############################################################################################################################################## - - -@pytest.mark.asyncio -async def test_link_score_set_mappings_to_ldh_objects_success( - setup_worker_db, standalone_worker_context, session, async_client, data_files, arq_worker, arq_redis -): - score_set = await setup_records_files_and_variants_with_mapping( - session, - async_client, - data_files, - TEST_MINIMAL_SEQ_SCORESET, - standalone_worker_context, - ) - - async def dummy_linking_job(): - return [ - (variant_urn, TEST_CLINGEN_LDH_LINKING_RESPONSE) - for variant_urn in session.scalars( - select(Variant.urn).join(ScoreSetDbModel).where(ScoreSetDbModel.urn == score_set.urn) - ).all() - ] - - # We are unable to mock requests via requests_mock that occur inside another event loop. Instead, patch the return - # value of the EventLoop itself, which would have made the request. 
- with patch.object( - _UnixSelectorEventLoop, - "run_in_executor", - return_value=dummy_linking_job(), - ): - result = await link_clingen_variants(standalone_worker_context, uuid4().hex, score_set.id, 1) - - assert result["success"] - assert not result["retried"] - assert result["enqueued_job"] - - for variant in session.scalars( - select(MappedVariant).join(Variant).join(ScoreSetDbModel).where(ScoreSetDbModel.urn == score_set.urn) - ): - assert variant.clingen_allele_id == clingen_allele_id_from_ldh_variation(TEST_CLINGEN_LDH_LINKING_RESPONSE) - - -@pytest.mark.asyncio -async def test_link_score_set_mappings_to_ldh_objects_exception_in_setup( - setup_worker_db, standalone_worker_context, session, async_client, data_files, arq_worker, arq_redis -): - score_set = await setup_records_files_and_variants_with_mapping( - session, - async_client, - data_files, - TEST_MINIMAL_SEQ_SCORESET, - standalone_worker_context, - ) - - with patch( - "mavedb.worker.jobs.setup_job_state", - side_effect=Exception(), - ): - result = await link_clingen_variants(standalone_worker_context, uuid4().hex, score_set.id, 1) - - assert not result["success"] - assert not result["retried"] - assert not result["enqueued_job"] - - for variant in session.scalars( - select(MappedVariant).join(Variant).join(ScoreSetDbModel).where(ScoreSetDbModel.urn == score_set.urn) - ): - assert variant.clingen_allele_id is None - - -@pytest.mark.asyncio -async def test_link_score_set_mappings_to_ldh_objects_no_variants_to_link( - setup_worker_db, standalone_worker_context, session, async_client, data_files, arq_worker, arq_redis -): - score_set = await setup_records_files_and_variants( - session, - async_client, - data_files, - TEST_MINIMAL_SEQ_SCORESET, - standalone_worker_context, - ) - - result = await link_clingen_variants(standalone_worker_context, uuid4().hex, score_set.id, 1) - - assert result["success"] - assert not result["retried"] - assert not result["enqueued_job"] - - -@pytest.mark.asyncio -async def test_link_score_set_mappings_to_ldh_objects_exception_during_linkage( - setup_worker_db, standalone_worker_context, session, async_client, data_files, arq_worker, arq_redis -): - score_set = await setup_records_files_and_variants_with_mapping( - session, - async_client, - data_files, - TEST_MINIMAL_SEQ_SCORESET, - standalone_worker_context, - ) - - # We are unable to mock requests via requests_mock that occur inside another event loop. Instead, patch the return - # value of the EventLoop itself, which would have made the request. - with patch.object( - _UnixSelectorEventLoop, - "run_in_executor", - side_effect=Exception(), - ): - result = await link_clingen_variants(standalone_worker_context, uuid4().hex, score_set.id, 1) - - assert not result["success"] - assert not result["retried"] - assert not result["enqueued_job"] - - -@pytest.mark.asyncio -async def test_link_score_set_mappings_to_ldh_objects_exception_while_parsing_linkages( - setup_worker_db, standalone_worker_context, session, async_client, data_files, arq_worker, arq_redis -): - score_set = await setup_records_files_and_variants_with_mapping( - session, - async_client, - data_files, - TEST_MINIMAL_SEQ_SCORESET, - standalone_worker_context, - ) - - async def dummy_linking_job(): - return [ - (variant_urn, TEST_CLINGEN_LDH_LINKING_RESPONSE) - for variant_urn in session.scalars( - select(Variant.urn).join(ScoreSetDbModel).where(ScoreSetDbModel.urn == score_set.urn) - ).all() - ] - - # We are unable to mock requests via requests_mock that occur inside another event loop. 
Instead, patch the return - # value of the EventLoop itself, which would have made the request. - with ( - patch.object( - _UnixSelectorEventLoop, - "run_in_executor", - return_value=dummy_linking_job(), - ), - patch( - "mavedb.worker.jobs.clingen_allele_id_from_ldh_variation", - side_effect=Exception(), - ), - ): - result = await link_clingen_variants(standalone_worker_context, uuid4().hex, score_set.id, 1) - - assert not result["success"] - assert not result["retried"] - assert not result["enqueued_job"] - - -@pytest.mark.asyncio -async def test_link_score_set_mappings_to_ldh_objects_failures_exist_but_do_not_eclipse_retry_threshold( - setup_worker_db, standalone_worker_context, session, async_client, data_files, arq_worker, arq_redis -): - score_set = await setup_records_files_and_variants_with_mapping( - session, - async_client, - data_files, - TEST_MINIMAL_SEQ_SCORESET, - standalone_worker_context, - ) - - async def dummy_linking_job(): - return [ - (variant_urn, None) - for variant_urn in session.scalars( - select(Variant.urn).join(ScoreSetDbModel).where(ScoreSetDbModel.urn == score_set.urn) - ).all() - ] - - # We are unable to mock requests via requests_mock that occur inside another event loop. Instead, patch the return - # value of the EventLoop itself, which would have made the request. - with ( - patch.object( - _UnixSelectorEventLoop, - "run_in_executor", - return_value=dummy_linking_job(), - ), - patch( - "mavedb.worker.jobs.LINKED_DATA_RETRY_THRESHOLD", - 2, - ), - ): - result = await link_clingen_variants(standalone_worker_context, uuid4().hex, score_set.id, 1) - - assert result["success"] - assert not result["retried"] - assert result["enqueued_job"] - - -@pytest.mark.asyncio -async def test_link_score_set_mappings_to_ldh_objects_failures_exist_and_eclipse_retry_threshold( - setup_worker_db, standalone_worker_context, session, async_client, data_files, arq_worker, arq_redis -): - score_set = await setup_records_files_and_variants_with_mapping( - session, - async_client, - data_files, - TEST_MINIMAL_SEQ_SCORESET, - standalone_worker_context, - ) - - async def dummy_linking_job(): - return [ - (variant_urn, None) - for variant_urn in session.scalars( - select(Variant.urn).join(ScoreSetDbModel).where(ScoreSetDbModel.urn == score_set.urn) - ).all() - ] - - # We are unable to mock requests via requests_mock that occur inside another event loop. Instead, patch the return - # value of the EventLoop itself, which would have made the request. 
- with ( - patch.object( - _UnixSelectorEventLoop, - "run_in_executor", - return_value=dummy_linking_job(), - ), - patch( - "mavedb.worker.jobs.LINKED_DATA_RETRY_THRESHOLD", - 1, - ), - patch( - "mavedb.worker.jobs.LINKING_BACKOFF_IN_SECONDS", - 0, - ), - ): - result = await link_clingen_variants(standalone_worker_context, uuid4().hex, score_set.id, 1) - - assert not result["success"] - assert result["retried"] - assert result["enqueued_job"] - - -@pytest.mark.asyncio -async def test_link_score_set_mappings_to_ldh_objects_failures_exist_and_eclipse_retry_threshold_cant_enqueue( - setup_worker_db, standalone_worker_context, session, async_client, data_files, arq_worker, arq_redis -): - score_set = await setup_records_files_and_variants_with_mapping( - session, - async_client, - data_files, - TEST_MINIMAL_SEQ_SCORESET, - standalone_worker_context, - ) - - async def dummy_linking_job(): - return [ - (variant_urn, None) - for variant_urn in session.scalars( - select(Variant.urn).join(ScoreSetDbModel).where(ScoreSetDbModel.urn == score_set.urn) - ).all() - ] - - # We are unable to mock requests via requests_mock that occur inside another event loop. Instead, patch the return - # value of the EventLoop itself, which would have made the request. - with ( - patch.object( - _UnixSelectorEventLoop, - "run_in_executor", - return_value=dummy_linking_job(), - ), - patch( - "mavedb.worker.jobs.LINKED_DATA_RETRY_THRESHOLD", - 1, - ), - patch.object(arq.ArqRedis, "enqueue_job", return_value=awaitable_exception()), - ): - result = await link_clingen_variants(standalone_worker_context, uuid4().hex, score_set.id, 1) - - assert not result["success"] - assert not result["retried"] - assert not result["enqueued_job"] - - -@pytest.mark.asyncio -async def test_link_score_set_mappings_to_ldh_objects_failures_exist_and_eclipse_retry_threshold_retries_exceeded( - setup_worker_db, standalone_worker_context, session, async_client, data_files, arq_worker, arq_redis -): - score_set = await setup_records_files_and_variants_with_mapping( - session, - async_client, - data_files, - TEST_MINIMAL_SEQ_SCORESET, - standalone_worker_context, - ) - - async def dummy_linking_job(): - return [ - (variant_urn, None) - for variant_urn in session.scalars( - select(Variant.urn).join(ScoreSetDbModel).where(ScoreSetDbModel.urn == score_set.urn) - ).all() - ] - - # We are unable to mock requests via requests_mock that occur inside another event loop. Instead, patch the return - # value of the EventLoop itself, which would have made the request. 
- with ( - patch.object( - _UnixSelectorEventLoop, - "run_in_executor", - return_value=dummy_linking_job(), - ), - patch( - "mavedb.worker.jobs.LINKED_DATA_RETRY_THRESHOLD", - 1, - ), - patch( - "mavedb.worker.jobs.LINKING_BACKOFF_IN_SECONDS", - 0, - ), - patch( - "mavedb.worker.jobs.BACKOFF_LIMIT", - 1, - ), - ): - result = await link_clingen_variants(standalone_worker_context, uuid4().hex, score_set.id, 2) - - assert not result["success"] - assert not result["retried"] - assert not result["enqueued_job"] - - -@pytest.mark.asyncio -async def test_link_score_set_mappings_to_ldh_objects_error_in_gnomad_job_enqueue( - setup_worker_db, standalone_worker_context, session, async_client, data_files, arq_worker, arq_redis -): - score_set = await setup_records_files_and_variants_with_mapping( - session, - async_client, - data_files, - TEST_MINIMAL_SEQ_SCORESET, - standalone_worker_context, - ) - - async def dummy_linking_job(): - return [ - (variant_urn, TEST_CLINGEN_LDH_LINKING_RESPONSE) - for variant_urn in session.scalars( - select(Variant.urn).join(ScoreSetDbModel).where(ScoreSetDbModel.urn == score_set.urn) - ).all() - ] - - # We are unable to mock requests via requests_mock that occur inside another event loop. Instead, patch the return - # value of the EventLoop itself, which would have made the request. - with ( - patch.object( - _UnixSelectorEventLoop, - "run_in_executor", - return_value=dummy_linking_job(), - ), - patch.object(arq.ArqRedis, "enqueue_job", return_value=awaitable_exception()), - ): - result = await link_clingen_variants(standalone_worker_context, uuid4().hex, score_set.id, 1) - - assert not result["success"] - assert not result["retried"] - assert not result["enqueued_job"] - - ################################################################################################################################################## # UniProt ID mapping ##################################################################################################################################################