Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -148,8 +148,8 @@ harbor_images832:
multires_image: tomorecon_nersc_mpi_hdf5@sha256:cc098a2cfb6b1632ea872a202c66cb7566908da066fd8f8c123b92fa95c2a43c

ghcr_images832:
recon_image: ghcr.io/als-computing/microct:master
multires_image: ghcr.io/als-computing/microct:master
recon_image: ghcr.io/als-computing/microct@sha256:1fdfb786726ee03301d624319e3d16702045072f38e2b0cca9d6237e5ab3f5ff
multires_image: ghcr.io/als-computing/microct@sha256:1fdfb786726ee03301d624319e3d16702045072f38e2b0cca9d6237e5ab3f5ff

prefect:
deployments:
Expand Down
1 change: 1 addition & 0 deletions orchestration/flows/bl832/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,4 @@ def _beam_specific_config(self) -> None:
self.alcf832_scratch = self.endpoints["alcf832_scratch"]
self.scicat = self.config["scicat"]
self.ghcr_images832 = self.config["ghcr_images832"]
self.nersc_recon_num_nodes = 4
32 changes: 28 additions & 4 deletions orchestration/flows/bl832/dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ class FlowParameterMapper:
# From nersc.py
"nersc_recon_flow/nersc_recon_flow": [
"file_path",
"config"],
"nersc_recon_multinode_flow/nersc_recon_multinode_flow": [
"file_path",
"num_nodes",
"config"]
}

Expand All @@ -51,27 +55,37 @@ class DecisionFlowInputModel(BaseModel):
"""
file_path: Optional[str] = Field(default=None)
is_export_control: Optional[bool] = Field(default=False)
num_nodes: Optional[int] = Field(default=4)
config: Optional[Union[dict, Any]] = Field(default_factory=dict)


@task(name="setup_decision_settings")
def setup_decision_settings(alcf_recon: bool, nersc_recon: bool, new_file_832: bool) -> dict:
def setup_decision_settings(
alcf_recon: bool,
nersc_recon: bool,
nersc_recon_multinode: bool,
new_file_832: bool
) -> dict:
"""
This task is used to define the settings for the decision making process of the BL832 beamline.

:param alcf_recon: Boolean indicating whether to run the ALCF reconstruction flow.
:param nersc_recon: Boolean indicating whether to run the NERSC reconstruction flow.
:param nersc_move: Boolean indicating whether to move files to NERSC.
:param nersc_recon_multinode: Boolean indicating whether to run the NERSC multinode reconstruction flow.
:param new_file_832: Boolean indicating whether to move files to NERSC.
:return: A dictionary containing the settings for each flow.
"""
logger = get_run_logger()
try:
logger.info(f"Setting up decision settings: alcf_recon={alcf_recon}, "
f"nersc_recon={nersc_recon}, new_file_832={new_file_832}")
f"nersc_recon={nersc_recon}, "
f"nersc_recon_multinode={nersc_recon_multinode}, "
f"new_file_832={new_file_832}")
# Define which flows to run based on the input settings
settings = {
"alcf_recon_flow/alcf_recon_flow": alcf_recon,
"nersc_recon_flow/nersc_recon_flow": nersc_recon,
"nersc_recon_multinode_flow/nersc_recon_multinode_flow": nersc_recon_multinode,
"new_832_file_flow/new_file_832": new_file_832
}
# Save the settings in a JSON block for later retrieval by other flows
Expand Down Expand Up @@ -149,6 +163,11 @@ async def dispatcher(
nersc_params = FlowParameterMapper.get_flow_parameters("nersc_recon_flow/nersc_recon_flow", available_params)
tasks.append(run_recon_flow_async("nersc_recon_flow/nersc_recon_flow", nersc_params))

if decision_settings.get("nersc_recon_multinode_flow/nersc_recon_multinode_flow"):
nersc_multinode_params = FlowParameterMapper.get_flow_parameters(
"nersc_recon_multinode_flow/nersc_recon_multinode_flow", available_params)
tasks.append(run_recon_flow_async("nersc_recon_multinode_flow/nersc_recon_multinode_flow", nersc_multinode_params))

# Run ALCF and NERSC flows in parallel, if any
if tasks:
try:
Expand All @@ -169,7 +188,12 @@ async def dispatcher(
"""
try:
# Setup decision settings based on input parameters
setup_decision_settings(alcf_recon=True, nersc_recon=True, new_file_832=True)
setup_decision_settings(
alcf_recon=True,
nersc_recon=True,
nersc_recon_multinode=True,
new_file_832=True
)
# Run the main decision flow with the specified parameters
# asyncio.run(dispatcher(
# config={}, # PYTEST, ALCF, NERSC
Expand Down
Loading