From b5bc9dd3fa4e7e6faf66202e06abd36c5252947f Mon Sep 17 00:00:00 2001 From: zzstoatzz Date: Tue, 12 Nov 2024 08:11:33 -0800 Subject: [PATCH 1/7] deploy example and fix typing --- examples/deploy.py | 27 +++++++++++++++++++++++++++ src/controlflow/decorators.py | 30 +++++++++++++++++------------- src/controlflow/instructions.py | 6 ++++-- 3 files changed, 48 insertions(+), 15 deletions(-) create mode 100644 examples/deploy.py diff --git a/examples/deploy.py b/examples/deploy.py new file mode 100644 index 00000000..ad4a086c --- /dev/null +++ b/examples/deploy.py @@ -0,0 +1,27 @@ +import sys +from pathlib import Path + +import controlflow as cf + + +@cf.task() +def write_poem(topic: str) -> str: + """Write four lines that rhyme""" + return f"The topic is {topic}" + + +@cf.flow() +def write_poems(topics: list[str]) -> list[str]: + return write_poem.map(topics).result() + + +if __name__ == "__main__": + if len(sys.argv) > 1 and sys.argv[1] == "serve": + write_poems.serve() + elif len(sys.argv) > 1 and sys.argv[1] == "deploy": + write_poems.from_source( + source=str((p := Path(__file__)).parent.resolve()), + entrypoint=f"{p.name}:something", + ).deploy(name="some-deployment", work_pool_name="local-process-pool") + else: + write_poems(["roses", "violets", "sugar", "spice"]) diff --git a/src/controlflow/decorators.py b/src/controlflow/decorators.py index 82df0e71..44a852b3 100644 --- a/src/controlflow/decorators.py +++ b/src/controlflow/decorators.py @@ -1,8 +1,9 @@ import asyncio import functools import inspect -from typing import Any, Callable, Optional, Union +from typing import Any, Callable, Optional, ParamSpec, TypeVar, Union, cast +from prefect import Flow as PrefectFlow from prefect.utilities.asyncutils import run_coro_as_sync import controlflow @@ -14,11 +15,14 @@ # from controlflow.utilities.marvin import patch_marvin +P = ParamSpec("P") +R = TypeVar("R") + logger = get_logger(__name__) def flow( - fn: Optional[Callable[..., Any]] = None, + fn: Optional[Callable[P, R]] = None, *, thread: Optional[str] = None, instructions: Optional[str] = None, @@ -30,7 +34,7 @@ def flow( prefect_kwargs: Optional[dict[str, Any]] = None, context_kwargs: Optional[list[str]] = None, **kwargs: Optional[dict[str, Any]], -): +) -> Callable[[Callable[P, R]], PrefectFlow[P, R]]: """ A decorator that wraps a function as a ControlFlow flow. @@ -54,7 +58,7 @@ def flow( callable: The wrapped function or a new flow decorator if `fn` is not provided. """ if fn is None: - return functools.partial( + return functools.partial( # type: ignore flow, thread=thread, instructions=instructions, @@ -72,11 +76,11 @@ def flow( def create_flow_context(bound_args): flow_kwargs = kwargs.copy() if thread is not None: - flow_kwargs.setdefault("thread_id", thread) + flow_kwargs.setdefault("thread_id", thread) # type: ignore if tools is not None: - flow_kwargs.setdefault("tools", tools) + flow_kwargs.setdefault("tools", tools) # type: ignore if default_agent is not None: - flow_kwargs.setdefault("default_agent", default_agent) + flow_kwargs.setdefault("default_agent", default_agent) # type: ignore context = {} if context_kwargs: @@ -92,7 +96,7 @@ def create_flow_context(bound_args): if asyncio.iscoroutinefunction(fn): @functools.wraps(fn) - async def wrapper(*wrapper_args, **wrapper_kwargs): + async def wrapper(*wrapper_args, **wrapper_kwargs): # type: ignore bound = sig.bind(*wrapper_args, **wrapper_kwargs) bound.apply_defaults() with ( @@ -112,13 +116,13 @@ def wrapper(*wrapper_args, **wrapper_kwargs): ): return fn(*wrapper_args, **wrapper_kwargs) - wrapper = prefect_flow( + prefect_wrapper = prefect_flow( timeout_seconds=timeout_seconds, retries=retries, retry_delay_seconds=retry_delay_seconds, **(prefect_kwargs or {}), )(wrapper) - return wrapper + return cast(Callable[[Callable[P, R]], PrefectFlow[P, R]], prefect_wrapper) def task( @@ -222,13 +226,13 @@ def wrapper(*args, **kwargs): task = _get_task(*args, **kwargs) return task.run() - wrapper = prefect_task( + prefect_wrapper = prefect_task( timeout_seconds=timeout_seconds, retries=retries, retry_delay_seconds=retry_delay_seconds, )(wrapper) # store the `as_task` method for loading the task object - wrapper.as_task = _get_task + prefect_wrapper.as_task = _get_task - return wrapper + return cast(Callable[[Callable[..., Any]], Task], prefect_wrapper) diff --git a/src/controlflow/instructions.py b/src/controlflow/instructions.py index 55fbaf76..eb863278 100644 --- a/src/controlflow/instructions.py +++ b/src/controlflow/instructions.py @@ -1,5 +1,5 @@ from contextlib import contextmanager -from typing import Generator, List +from typing import Generator, List, Union from controlflow.utilities.context import ctx from controlflow.utilities.logging import get_logger @@ -8,7 +8,9 @@ @contextmanager -def instructions(instructions: str) -> Generator[list[str], None, None]: +def instructions( + instructions: Union[str, None], +) -> Generator[Union[list[str], None], None, None]: """ Temporarily add instructions to the current instruction stack. The instruction is removed when the context is exited. From be41a7c85aad62977ff3577e4e654893c0201840 Mon Sep 17 00:00:00 2001 From: zzstoatzz Date: Tue, 12 Nov 2024 16:08:40 -0800 Subject: [PATCH 2/7] update example --- examples/deploy.py | 27 ------------------ examples/prefect-deploy.Dockerfile | 10 +++++++ examples/prefect_deploy.py | 44 ++++++++++++++++++++++++++++++ 3 files changed, 54 insertions(+), 27 deletions(-) delete mode 100644 examples/deploy.py create mode 100644 examples/prefect-deploy.Dockerfile create mode 100644 examples/prefect_deploy.py diff --git a/examples/deploy.py b/examples/deploy.py deleted file mode 100644 index ad4a086c..00000000 --- a/examples/deploy.py +++ /dev/null @@ -1,27 +0,0 @@ -import sys -from pathlib import Path - -import controlflow as cf - - -@cf.task() -def write_poem(topic: str) -> str: - """Write four lines that rhyme""" - return f"The topic is {topic}" - - -@cf.flow() -def write_poems(topics: list[str]) -> list[str]: - return write_poem.map(topics).result() - - -if __name__ == "__main__": - if len(sys.argv) > 1 and sys.argv[1] == "serve": - write_poems.serve() - elif len(sys.argv) > 1 and sys.argv[1] == "deploy": - write_poems.from_source( - source=str((p := Path(__file__)).parent.resolve()), - entrypoint=f"{p.name}:something", - ).deploy(name="some-deployment", work_pool_name="local-process-pool") - else: - write_poems(["roses", "violets", "sugar", "spice"]) diff --git a/examples/prefect-deploy.Dockerfile b/examples/prefect-deploy.Dockerfile new file mode 100644 index 00000000..05addc9c --- /dev/null +++ b/examples/prefect-deploy.Dockerfile @@ -0,0 +1,10 @@ +FROM ghcr.io/astral-sh/uv:python3.12-bookworm-slim + +WORKDIR /app + +ENV UV_SYSTEM_PYTHON=1 +ENV PATH="/root/.local/bin:$PATH" + +RUN --mount=type=cache,target=/root/.cache/uv \ + uv pip install controlflow + diff --git a/examples/prefect_deploy.py b/examples/prefect_deploy.py new file mode 100644 index 00000000..271ea775 --- /dev/null +++ b/examples/prefect_deploy.py @@ -0,0 +1,44 @@ +import sys +from pathlib import Path + +from prefect.docker import DockerImage + +import controlflow as cf + + +@cf.task() +def write_poem(topic: str) -> str: + """Write four lines that rhyme""" + return f"The topic is {topic}" + + +@cf.flow() +def write_poems(topics: list[str]) -> list[str]: + return write_poem.map(topics).result() + + +if __name__ == "__main__": + if len(sys.argv) > 1 and sys.argv[1] == "serve": + write_poems.serve() + elif len(sys.argv) > 1 and sys.argv[1] == "local_deploy": + write_poems.from_source( + source=str((p := Path(__file__)).parent.resolve()), + entrypoint=f"{p.name}:write_poem", + ).deploy(name="local-deployment", work_pool_name="local-process-pool") + elif len(sys.argv) > 1 and sys.argv[1] == "docker_deploy": + write_poems.from_source( + source="https://github.com/PrefectHQ/controlflow.git@example-deploy", + entrypoint="examples/prefect_deploy.py:write_poems", + ).deploy( + name="docker-deployment", + image=DockerImage( + name="zzstoatzz/cf-test-deploy", + tag="latest", + dockerfile=str( + Path(__file__).parent.resolve() / "prefect-deploy.Dockerfile" + ), + ), + work_pool_name="docker-pool", + ) + else: + write_poems(["roses", "violets", "sugar", "spice"]) From 041efb055fa2045fcc648ce7c6ea1323517648e6 Mon Sep 17 00:00:00 2001 From: zzstoatzz Date: Tue, 12 Nov 2024 16:09:50 -0800 Subject: [PATCH 3/7] clarify --- examples/prefect_deploy.py | 1 + 1 file changed, 1 insertion(+) diff --git a/examples/prefect_deploy.py b/examples/prefect_deploy.py index 271ea775..453990dd 100644 --- a/examples/prefect_deploy.py +++ b/examples/prefect_deploy.py @@ -41,4 +41,5 @@ def write_poems(topics: list[str]) -> list[str]: work_pool_name="docker-pool", ) else: + print(f"just running the code\n\n\n\n\n\n") write_poems(["roses", "violets", "sugar", "spice"]) From 6e5a43d6ac7910707c4a8f67982214b8ca3b4906 Mon Sep 17 00:00:00 2001 From: zzstoatzz Date: Tue, 12 Nov 2024 21:02:29 -0600 Subject: [PATCH 4/7] add docker example --- examples/prefect-deploy.Dockerfile | 7 +++++-- examples/prefect_deploy.py | 12 ++++++++++-- 2 files changed, 15 insertions(+), 4 deletions(-) diff --git a/examples/prefect-deploy.Dockerfile b/examples/prefect-deploy.Dockerfile index 05addc9c..55fb0fdb 100644 --- a/examples/prefect-deploy.Dockerfile +++ b/examples/prefect-deploy.Dockerfile @@ -1,10 +1,13 @@ FROM ghcr.io/astral-sh/uv:python3.12-bookworm-slim +RUN apt-get update && apt-get install -y git + +RUN rm -rf /var/lib/apt/lists/* + WORKDIR /app ENV UV_SYSTEM_PYTHON=1 ENV PATH="/root/.local/bin:$PATH" -RUN --mount=type=cache,target=/root/.cache/uv \ - uv pip install controlflow +RUN uv pip install controlflow diff --git a/examples/prefect_deploy.py b/examples/prefect_deploy.py index 453990dd..b6c71c24 100644 --- a/examples/prefect_deploy.py +++ b/examples/prefect_deploy.py @@ -1,7 +1,9 @@ +import os import sys from pathlib import Path from prefect.docker import DockerImage +from prefect.runner.storage import GitRepository import controlflow as cf @@ -26,8 +28,12 @@ def write_poems(topics: list[str]) -> list[str]: entrypoint=f"{p.name}:write_poem", ).deploy(name="local-deployment", work_pool_name="local-process-pool") elif len(sys.argv) > 1 and sys.argv[1] == "docker_deploy": + repo = GitRepository( + url="https://github.com/PrefectHQ/controlflow.git", + branch="example-deploy", + ) write_poems.from_source( - source="https://github.com/PrefectHQ/controlflow.git@example-deploy", + source=repo, entrypoint="examples/prefect_deploy.py:write_poems", ).deploy( name="docker-deployment", @@ -38,7 +44,9 @@ def write_poems(topics: list[str]) -> list[str]: Path(__file__).parent.resolve() / "prefect-deploy.Dockerfile" ), ), - work_pool_name="docker-pool", + work_pool_name="docker-work", + parameters={"topics": ["roses", "violets", "sugar", "spice"]}, + job_variables={"env": {"OPENAI_API_KEY": os.getenv("OPENAI_API_KEY")}}, ) else: print(f"just running the code\n\n\n\n\n\n") From 85fa4fdc47f53802bf70e40116f4b1d2d46f520f Mon Sep 17 00:00:00 2001 From: zzstoatzz Date: Tue, 12 Nov 2024 21:04:11 -0600 Subject: [PATCH 5/7] make 3.9 happy --- src/controlflow/decorators.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/controlflow/decorators.py b/src/controlflow/decorators.py index 44a852b3..92f06efb 100644 --- a/src/controlflow/decorators.py +++ b/src/controlflow/decorators.py @@ -1,10 +1,11 @@ import asyncio import functools import inspect -from typing import Any, Callable, Optional, ParamSpec, TypeVar, Union, cast +from typing import Any, Callable, Optional, TypeVar, Union, cast from prefect import Flow as PrefectFlow from prefect.utilities.asyncutils import run_coro_as_sync +from typing_extensions import ParamSpec import controlflow from controlflow.agents import Agent From 329cf8e1d880142594caa255425cefce098c6d8b Mon Sep 17 00:00:00 2001 From: zzstoatzz Date: Tue, 12 Nov 2024 21:58:08 -0600 Subject: [PATCH 6/7] make friendly to any user better example updates fix kwargs --- examples/prefect_deploy.py | 53 ------ ...t-deploy.Dockerfile => read-hn.Dockerfile} | 1 - examples/read_hn.py | 99 +++++++++++ src/controlflow/decorators.py | 157 +++++++++--------- 4 files changed, 175 insertions(+), 135 deletions(-) delete mode 100644 examples/prefect_deploy.py rename examples/{prefect-deploy.Dockerfile => read-hn.Dockerfile} (85%) create mode 100644 examples/read_hn.py diff --git a/examples/prefect_deploy.py b/examples/prefect_deploy.py deleted file mode 100644 index b6c71c24..00000000 --- a/examples/prefect_deploy.py +++ /dev/null @@ -1,53 +0,0 @@ -import os -import sys -from pathlib import Path - -from prefect.docker import DockerImage -from prefect.runner.storage import GitRepository - -import controlflow as cf - - -@cf.task() -def write_poem(topic: str) -> str: - """Write four lines that rhyme""" - return f"The topic is {topic}" - - -@cf.flow() -def write_poems(topics: list[str]) -> list[str]: - return write_poem.map(topics).result() - - -if __name__ == "__main__": - if len(sys.argv) > 1 and sys.argv[1] == "serve": - write_poems.serve() - elif len(sys.argv) > 1 and sys.argv[1] == "local_deploy": - write_poems.from_source( - source=str((p := Path(__file__)).parent.resolve()), - entrypoint=f"{p.name}:write_poem", - ).deploy(name="local-deployment", work_pool_name="local-process-pool") - elif len(sys.argv) > 1 and sys.argv[1] == "docker_deploy": - repo = GitRepository( - url="https://github.com/PrefectHQ/controlflow.git", - branch="example-deploy", - ) - write_poems.from_source( - source=repo, - entrypoint="examples/prefect_deploy.py:write_poems", - ).deploy( - name="docker-deployment", - image=DockerImage( - name="zzstoatzz/cf-test-deploy", - tag="latest", - dockerfile=str( - Path(__file__).parent.resolve() / "prefect-deploy.Dockerfile" - ), - ), - work_pool_name="docker-work", - parameters={"topics": ["roses", "violets", "sugar", "spice"]}, - job_variables={"env": {"OPENAI_API_KEY": os.getenv("OPENAI_API_KEY")}}, - ) - else: - print(f"just running the code\n\n\n\n\n\n") - write_poems(["roses", "violets", "sugar", "spice"]) diff --git a/examples/prefect-deploy.Dockerfile b/examples/read-hn.Dockerfile similarity index 85% rename from examples/prefect-deploy.Dockerfile rename to examples/read-hn.Dockerfile index 55fb0fdb..26a5c787 100644 --- a/examples/prefect-deploy.Dockerfile +++ b/examples/read-hn.Dockerfile @@ -7,7 +7,6 @@ RUN rm -rf /var/lib/apt/lists/* WORKDIR /app ENV UV_SYSTEM_PYTHON=1 -ENV PATH="/root/.local/bin:$PATH" RUN uv pip install controlflow diff --git a/examples/read_hn.py b/examples/read_hn.py new file mode 100644 index 00000000..e3dff0bd --- /dev/null +++ b/examples/read_hn.py @@ -0,0 +1,99 @@ +# /// script +# dependencies = ["controlflow"] +# /// + +import os +import sys +from pathlib import Path +from typing import Annotated, TypedDict + +import httpx +from prefect.artifacts import create_markdown_artifact +from prefect.blocks.system import Secret +from prefect.docker import DockerImage +from prefect.runner.storage import GitCredentials, GitRepository +from pydantic import AnyHttpUrl, Field + +import controlflow as cf + + +class HNArticleSummary(TypedDict): + link: AnyHttpUrl + title: str + main_topics: Annotated[set[str], Field(min_length=1, max_length=5)] + key_takeaways: Annotated[set[str], Field(min_length=1, max_length=5)] + tech_domains: Annotated[set[str], Field(min_length=1, max_length=5)] + + +@cf.task(instructions="concise, main details") +def analyze_article(id: str) -> HNArticleSummary: + """Analyze a HackerNews article and return structured insights""" + content = httpx.get(f"https://hacker-news.firebaseio.com/v0/item/{id}.json").json() + return f"here is the article content: {content}" # type: ignore + + +@cf.task() +def summarize_article_briefs( + briefs: list[HNArticleSummary], +) -> Annotated[str, Field(description="markdown summary")]: + """Summarize a list of article briefs""" + return f"here are the article briefs: {briefs}" # type: ignore + + +@cf.flow(retries=2) +def analyze_hn_articles(n: int = 5): + top_article_ids = httpx.get( + "https://hacker-news.firebaseio.com/v0/topstories.json" + ).json()[:n] + briefs = analyze_article.map(top_article_ids).result() + create_markdown_artifact( + key="hn-article-exec-summary", + markdown=summarize_article_briefs(briefs), + ) + + +if __name__ == "__main__": + EVERY_12_HOURS_CRON = "0 */12 * * *" + if len(sys.argv) > 1 and sys.argv[1] == "serve": + analyze_hn_articles.serve( + parameters={"n": 5}, + cron=EVERY_12_HOURS_CRON, + ) + elif len(sys.argv) > 1 and sys.argv[1] == "local_deploy": + analyze_hn_articles.from_source( + source=str((p := Path(__file__)).parent.resolve()), + entrypoint=f"{p.name}:analyze_hn_articles", + ).deploy( + name="local-deployment", + work_pool_name="local", + cron=EVERY_12_HOURS_CRON, + ) + elif len(sys.argv) > 1 and sys.argv[1] == "docker_deploy": + repo = GitRepository( + url="https://github.com/PrefectHQ/controlflow.git", + branch="main", + credentials=None, # replace with `dict(username="", access_token="")` for private repos + ) + analyze_hn_articles.from_source( + source=repo, + entrypoint="examples/read_hn.py:analyze_articles", + ).deploy( + name="docker-deployment", + # image=DockerImage( # uncomment and replace with your own image if desired + # name="zzstoatzz/cf-read-hn", + # tag="latest", + # dockerfile=str(Path(__file__).parent.resolve() / "read-hn.Dockerfile"), + # ), + work_pool_name="docker-work", # uv pip install -U prefect-docker prefect worker start --pool docker-work --type docker + cron=EVERY_12_HOURS_CRON, + parameters={"n": 5}, + job_variables={ + "env": {"OPENAI_API_KEY": os.getenv("OPENAI_API_KEY")}, + "image": "zzstoatzz/cf-read-hn:latest", # publicly available image on dockerhub + }, + build=False, + push=False, + ) + else: + print(f"just running the code\n\n\n\n\n\n") + analyze_hn_articles(5) diff --git a/src/controlflow/decorators.py b/src/controlflow/decorators.py index 92f06efb..8cbf7039 100644 --- a/src/controlflow/decorators.py +++ b/src/controlflow/decorators.py @@ -4,6 +4,7 @@ from typing import Any, Callable, Optional, TypeVar, Union, cast from prefect import Flow as PrefectFlow +from prefect import Task as PrefectTask from prefect.utilities.asyncutils import run_coro_as_sync from typing_extensions import ParamSpec @@ -34,7 +35,7 @@ def flow( timeout_seconds: Optional[Union[float, int]] = None, prefect_kwargs: Optional[dict[str, Any]] = None, context_kwargs: Optional[list[str]] = None, - **kwargs: Optional[dict[str, Any]], + **kwargs: Any, ) -> Callable[[Callable[P, R]], PrefectFlow[P, R]]: """ A decorator that wraps a function as a ControlFlow flow. @@ -75,13 +76,15 @@ def flow( sig = inspect.signature(fn) def create_flow_context(bound_args): - flow_kwargs = kwargs.copy() + flow_kwargs: dict[str, Any] = kwargs.copy() if thread is not None: - flow_kwargs.setdefault("thread_id", thread) # type: ignore + flow_kwargs["thread_id"] = thread if tools is not None: - flow_kwargs.setdefault("tools", tools) # type: ignore + flow_kwargs["tools"] = tools if default_agent is not None: - flow_kwargs.setdefault("default_agent", default_agent) # type: ignore + flow_kwargs["default_agent"] = default_agent + + flow_kwargs.update(kwargs) context = {} if context_kwargs: @@ -117,17 +120,19 @@ def wrapper(*wrapper_args, **wrapper_kwargs): ): return fn(*wrapper_args, **wrapper_kwargs) - prefect_wrapper = prefect_flow( - timeout_seconds=timeout_seconds, - retries=retries, - retry_delay_seconds=retry_delay_seconds, - **(prefect_kwargs or {}), - )(wrapper) - return cast(Callable[[Callable[P, R]], PrefectFlow[P, R]], prefect_wrapper) + return cast( + Callable[[Callable[P, R]], PrefectFlow[P, R]], + prefect_flow( + timeout_seconds=timeout_seconds, + retries=retries, + retry_delay_seconds=retry_delay_seconds, + **(prefect_kwargs or {}), + )(wrapper), + ) def task( - fn: Optional[Callable[..., Any]] = None, + fn: Optional[Callable[P, R]] = None, *, objective: Optional[str] = None, instructions: Optional[str] = None, @@ -138,8 +143,8 @@ def task( retries: Optional[int] = None, retry_delay_seconds: Optional[Union[float, int]] = None, timeout_seconds: Optional[Union[float, int]] = None, - **task_kwargs: Optional[dict[str, Any]], -): + **task_kwargs: Any, +) -> Callable[[Callable[P, R]], PrefectTask[P, R]]: """ A decorator that turns a Python function into a Task. The Task objective is set to the function name, and the instructions are set to the function @@ -162,78 +167,68 @@ def task( callable: The wrapped function or a new task decorator if `fn` is not provided. """ - if fn is None: - return functools.partial( - task, - objective=objective, - instructions=instructions, - name=name, - agents=agents, - tools=tools, - interactive=interactive, - retries=retries, - retry_delay_seconds=retry_delay_seconds, - timeout_seconds=timeout_seconds, - **task_kwargs, - ) - - sig = inspect.signature(fn) - - if name is None: - name = fn.__name__ - - if objective is None: - objective = fn.__doc__ or "" + def decorator(func: Callable[P, R]) -> PrefectTask[P, R]: + sig = inspect.signature(func) - result_type = fn.__annotations__.get("return") - - def _get_task(*args, **kwargs) -> Task: - # first process callargs - bound = sig.bind(*args, **kwargs) - bound.apply_defaults() - context = bound.arguments.copy() - - # call the function to see if it produces an updated objective - maybe_coro = fn(*args, **kwargs) - if asyncio.iscoroutine(maybe_coro): - result = run_coro_as_sync(maybe_coro) + if name is None: + task_name = func.__name__ else: - result = maybe_coro - if result is not None: - context["Additional context"] = result + task_name = name - return Task( - objective=objective, - instructions=instructions, - name=name, - agents=agents, - context=context, - result_type=result_type, - interactive=interactive or False, - tools=tools or [], - **task_kwargs, - ) + if objective is None: + task_objective = func.__doc__ or "" + else: + task_objective = objective - if asyncio.iscoroutinefunction(fn): + result_type = func.__annotations__.get("return") - @functools.wraps(fn) - async def wrapper(*args, **kwargs): - task = _get_task(*args, **kwargs) - return await task.run_async() - else: + def _get_task(*args, **kwargs) -> Task: + bound = sig.bind(*args, **kwargs) + bound.apply_defaults() + context = bound.arguments.copy() + + maybe_coro = func(*args, **kwargs) + if asyncio.iscoroutine(maybe_coro): + result = run_coro_as_sync(maybe_coro) + else: + result = maybe_coro + if result is not None: + context["Additional context"] = result + + return Task( + objective=task_objective, + instructions=instructions, + name=task_name, + agents=agents, + context=context, + result_type=result_type, + interactive=interactive or False, + tools=tools or [], + **task_kwargs, + ) + + if asyncio.iscoroutinefunction(func): + + @functools.wraps(func) + async def wrapper(*args: P.args, **kwargs: P.kwargs) -> R: # type: ignore + task = _get_task(*args, **kwargs) + return await task.run_async() # type: ignore + else: - @functools.wraps(fn) - def wrapper(*args, **kwargs): - task = _get_task(*args, **kwargs) - return task.run() + @functools.wraps(func) + def wrapper(*args: P.args, **kwargs: P.kwargs) -> R: + task = _get_task(*args, **kwargs) + return task.run() # type: ignore - prefect_wrapper = prefect_task( - timeout_seconds=timeout_seconds, - retries=retries, - retry_delay_seconds=retry_delay_seconds, - )(wrapper) + prefect_wrapper = prefect_task( + timeout_seconds=timeout_seconds, + retries=retries, + retry_delay_seconds=retry_delay_seconds, + )(wrapper) - # store the `as_task` method for loading the task object - prefect_wrapper.as_task = _get_task + setattr(prefect_wrapper, "as_task", _get_task) + return cast(PrefectTask[P, R], prefect_wrapper) - return cast(Callable[[Callable[..., Any]], Task], prefect_wrapper) + if fn is None: + return decorator + return decorator(fn) # type: ignore From ce258927dda97368421cc9a258c990552b1def78 Mon Sep 17 00:00:00 2001 From: zzstoatzz Date: Wed, 13 Nov 2024 01:08:56 -0600 Subject: [PATCH 7/7] spiffy doesnt need to be a cf flow wrap it in task name --- examples/read_hn.py | 27 +++++++++++++-------------- 1 file changed, 13 insertions(+), 14 deletions(-) diff --git a/examples/read_hn.py b/examples/read_hn.py index e3dff0bd..75acb0f0 100644 --- a/examples/read_hn.py +++ b/examples/read_hn.py @@ -8,11 +8,11 @@ from typing import Annotated, TypedDict import httpx +from prefect import flow, task from prefect.artifacts import create_markdown_artifact -from prefect.blocks.system import Secret from prefect.docker import DockerImage from prefect.runner.storage import GitCredentials, GitRepository -from pydantic import AnyHttpUrl, Field +from pydantic import AnyHttpUrl, Field, TypeAdapter import controlflow as cf @@ -32,24 +32,22 @@ def analyze_article(id: str) -> HNArticleSummary: return f"here is the article content: {content}" # type: ignore -@cf.task() -def summarize_article_briefs( - briefs: list[HNArticleSummary], -) -> Annotated[str, Field(description="markdown summary")]: - """Summarize a list of article briefs""" - return f"here are the article briefs: {briefs}" # type: ignore - - -@cf.flow(retries=2) -def analyze_hn_articles(n: int = 5): +@flow(retries=2) +def analyze_hn_articles(n: int = 5) -> list[HNArticleSummary]: top_article_ids = httpx.get( "https://hacker-news.firebaseio.com/v0/topstories.json" ).json()[:n] briefs = analyze_article.map(top_article_ids).result() create_markdown_artifact( key="hn-article-exec-summary", - markdown=summarize_article_briefs(briefs), + markdown=task(task_run_name=f"make summary of {len(briefs)} articles")(cf.run)( + objective="markdown summary of all extracted article briefs", + result_type=Annotated[str, Field(description="markdown summary")], + context=dict(briefs=briefs), + ), + description="executive summary of all extracted article briefs", ) + return briefs if __name__ == "__main__": @@ -96,4 +94,5 @@ def analyze_hn_articles(n: int = 5): ) else: print(f"just running the code\n\n\n\n\n\n") - analyze_hn_articles(5) + briefs = analyze_hn_articles(5) # type: ignore + TypeAdapter(list[HNArticleSummary]).validate_python(briefs)