Feature/snowflake s3 stage operations #18
Conversation
- Add `query_pandas_from_snowflake_via_s3_stage()` for efficient large query results (>10M rows); see the sketch after this list
- Add `publish_pandas_via_s3_stage()` for efficient large DataFrame writes (>10M rows)
- Add `make_batch_predictions_from_snowflake_via_s3_stage()` for batch ML predictions
- Support dev/prod environment switching via `current.is_production`
- Add helper functions for S3 operations and SQL generation
- Add `metaflow_s3/utils.py` with S3 utility functions
- Add comprehensive functional tests
- Integrate with existing Metaflow card system and cost tracking
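As a usage illustration only, here is a minimal sketch of how these helpers might be called from a Metaflow step. The function names come from the list above; the import path, argument names, and return types are assumptions, not the package's confirmed API.

```python
from metaflow import FlowSpec, step


class LargeQueryFlow(FlowSpec):
    @step
    def start(self):
        # Assumed import location; only the function names appear in this PR's description.
        from ds_platform_utils.metaflow import (
            publish_pandas_via_s3_stage,
            query_pandas_from_snowflake_via_s3_stage,
        )

        # Large (>10M row) result sets are unloaded to S3 as parquet and read
        # back with pandas instead of being fetched through the connector.
        df = query_pandas_from_snowflake_via_s3_stage(
            "SELECT * FROM PATTERN_DB.{schema}.SOURCE_TABLE"  # hypothetical query argument
        )

        # Large DataFrames are written to S3 as parquet and loaded with COPY INTO.
        publish_pandas_via_s3_stage(df, table="TARGET_TABLE")  # hypothetical signature
        self.next(self.end)

    @step
    def end(self):
        pass
```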
Pull request overview
Adds an S3-stage based path for moving data between Pandas and Snowflake in Metaflow flows (via Snowflake COPY INTO + S3 parquet), and updates CI to run tests in parallel.
Changes:
- Add S3/Snowflake stage utilities (`copy_snowflake_to_s3`, `copy_s3_to_snowflake`) and S3 dataframe read/write helpers.
- Extend `publish_pandas`/`query_pandas_from_snowflake` with a `use_s3_stage` option (see the sketch below).
- Add a functional Metaflow test for the S3-stage flow and enable pytest-xdist in CI.
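For the flag-based entry points, a round trip might look roughly like the following. The function names and the `use_s3_stage` flag are taken from this review; the remaining arguments and the import path are assumptions.

```python
import pandas as pd

# Assumed import path; only the function and flag names are confirmed by this review.
from ds_platform_utils.metaflow import publish_pandas, query_pandas_from_snowflake

df = pd.DataFrame({"id": [1, 2, 3], "score": [0.1, 0.2, 0.3]})

# Write through the S3 stage (parquet + COPY INTO) instead of a direct write_pandas call.
publish_pandas(df, table="MY_TABLE", use_s3_stage=True)  # remaining arguments assumed

# Read back the same way; use_s3_stage=False would use the regular connector fetch.
result = query_pandas_from_snowflake(
    "SELECT * FROM PATTERN_DB.{schema}.MY_TABLE",
    use_s3_stage=True,
)
```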
Reviewed changes
Copilot reviewed 9 out of 10 changed files in this pull request and generated 19 comments.
| File | Description |
|---|---|
| tests/functional_tests/metaflow/test__pandas_s3.py | New functional flow test exercising publish_pandas/query_pandas_from_snowflake via S3-stage. |
| src/ds_platform_utils/metaflow/s3_stage.py | New Snowflake↔S3 stage COPY helpers and schema inference. |
| src/ds_platform_utils/metaflow/s3.py | New Metaflow S3 client helpers for parquet IO and folder chunking. |
| src/ds_platform_utils/metaflow/pandas.py | Adds use_s3_stage option for publish/query and updates schema selection to DEV_SCHEMA. |
| src/ds_platform_utils/metaflow/batch_inference.py | New batch inference pipeline leveraging S3 as an intermediate store. |
| src/ds_platform_utils/metaflow/_consts.py | Replaces NON_PROD_SCHEMA with DEV_SCHEMA; adds S3 stage constants. |
| src/ds_platform_utils/_snowflake/write_audit_publish.py | Updates to new DEV_SCHEMA constant. |
| pyproject.toml | Version bump to 0.4.0; adds pytest-xdist. |
| .github/workflows/ci-cd-ds-platform-utils.yaml | Runs pytest with xdist (-n auto). |
Comments suppressed due to low confidence (1)
src/ds_platform_utils/metaflow/pandas.py:208
- Docstring says the non-prod schema will be `NON_PROD_SCHEMA`, but that constant no longer exists and the code uses `DEV_SCHEMA`. Update this note to match the new constant naming.
**NOTE:** If the query contains `{schema}` placeholders, they will be replaced with the appropriate schema name.
The schema name will be determined based on the current environment:
- If in production, it will be set to `PROD_SCHEMA`.
- If not in production, it will be set to `NON_PROD_SCHEMA`.
- If the query does not contain `{schema}` placeholders, the schema name will not be modified.
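To make the placeholder behaviour concrete, here is a small sketch of the substitution the note describes. The import path and the exact rendering logic are assumptions; per the comment above, the non-production constant is now `DEV_SCHEMA`.

```python
from metaflow import current

# Assumed import path; _consts.py in this PR defines DEV_SCHEMA, and PROD_SCHEMA
# is referenced by the docstring above.
from ds_platform_utils.metaflow._consts import DEV_SCHEMA, PROD_SCHEMA

# A query using the {schema} placeholder described in the note.
query = "SELECT * FROM PATTERN_DB.{schema}.CUSTOMER_FEATURES"

# Roughly the substitution the helper performs: per this review, DEV_SCHEMA has
# replaced the old NON_PROD_SCHEMA constant for the non-production case.
schema = PROD_SCHEMA if current.is_production else DEV_SCHEMA
rendered_query = query.format(schema=schema)
```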
```python
import pandas as pd
from metaflow import S3, current


def _get_metaflow_s3_client():
    return S3(role="arn:aws:iam::209479263910:role/outerbounds_iam_role")
```
The S3 client role ARN is hardcoded inside the library. This makes the package environment/account-specific and hard to reuse or test. Consider moving it to configuration (env var / Metaflow config) or at least into _consts.py so it can be overridden per environment.
Suggested change:

```diff
-import pandas as pd
-from metaflow import S3, current
-def _get_metaflow_s3_client():
-    return S3(role="arn:aws:iam::209479263910:role/outerbounds_iam_role")
+import os
+import pandas as pd
+from metaflow import S3, current
+def _get_metaflow_s3_client():
+    """
+    Return a Metaflow S3 client.
+    The role ARN can be configured via the METAFLOW_S3_ROLE_ARN environment variable.
+    If not set, the default Metaflow credential resolution is used.
+    """
+    role_arn = os.getenv("METAFLOW_S3_ROLE_ARN")
+    if role_arn:
+        return S3(role=role_arn)
+    return S3()
```
The parameter name `table_defination` is misspelled. Since this is part of the public API for S3-stage loading, consider renaming it consistently across `copy_s3_to_snowflake`, `_generate_s3_to_snowflake_copy_query`, and `publish_pandas` (e.g., `table_definition` or `table_schema`) to avoid propagating the typo.
Suggested change:

```diff
-table_defination: Optional[List[Tuple[str, str]]] = None,
+table_definition: Optional[List[Tuple[str, str]]] = None,
```
```python
conn: SnowflakeConnection = get_snowflake_connection(use_utc)

# set warehouse
if warehouse is not None:
    _execute_sql(conn, f"USE WAREHOUSE {warehouse};")
_execute_sql(conn, f"USE SCHEMA PATTERN_DB.{schema};")

if use_s3_stage:
    s3_bucket, _ = _get_s3_config(current.is_production)
    data_folder = "publish_" + str(pd.Timestamp.now().strftime("%Y%m%d_%H%M%S_%f"))
    s3_path = f"{s3_bucket}/{S3_DATA_FOLDER}/{data_folder}"
```
When `use_s3_stage=True`, a Snowflake connection is opened and `USE WAREHOUSE`/`USE SCHEMA` is executed, but the connection is not used for the actual load (which opens its own connection in `copy_s3_to_snowflake`). Consider creating the connection only in the `write_pandas` branch (or reusing the same connection in `copy_s3_to_snowflake`) to avoid an unnecessary connection open/close.
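One way to implement the reuse the comment suggests, as a sketch only: the optional `conn` parameter is hypothetical, and the helper names (`get_snowflake_connection`, `_execute_sql`, `_generate_s3_to_snowflake_copy_query`) are taken from elsewhere in this diff with assumed signatures.

```python
from typing import Optional


def copy_s3_to_snowflake(
    s3_path: str,
    table_name: str,
    conn: Optional["SnowflakeConnection"] = None,  # hypothetical parameter
) -> None:
    """Load staged parquet files into Snowflake, reusing a caller-supplied connection if given."""
    owns_conn = conn is None
    if owns_conn:
        conn = get_snowflake_connection()  # signature assumed
    try:
        # COPY INTO generation helper lives in s3_stage.py; its arguments are assumed here.
        _execute_sql(conn, _generate_s3_to_snowflake_copy_query(s3_path, table_name))
    finally:
        # Only close connections this function opened itself.
        if owns_conn:
            conn.close()
```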
The public API parameter is named `table_defination` (misspelled) but the docstring below refers to `table_schema`, and callers/tests are using `table_schema`. Rename this parameter consistently (e.g., `table_schema` or `table_definition`) and update the call into `copy_s3_to_snowflake` accordingly.
Suggested change:

```diff
-table_defination: Optional[List[Tuple[str, str]]] = None,
+table_schema: Optional[List[Tuple[str, str]]] = None,
```
```python
with tempfile.NamedTemporaryFile(
    prefix=str(Path(current.tempdir).absolute()) + "/",  # type: ignore
    suffix=".parquet",
) as tmp_file:
```
`NamedTemporaryFile(prefix=str(Path(current.tempdir).absolute()) + "/")` uses `prefix` to try to control the directory, but `prefix` is part of the filename and including path separators can cause invalid paths at runtime. Use the `dir=` parameter to place the temp file in `current.tempdir` and keep `prefix` as a simple name prefix.
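A minimal sketch of the `dir=` fix, assuming `current.tempdir` is a writable directory as in the snippet above; the prefix value is illustrative.

```python
import tempfile
from pathlib import Path

from metaflow import current

# Use dir= to place the file in current.tempdir; keep prefix as a plain name prefix.
with tempfile.NamedTemporaryFile(
    dir=str(Path(current.tempdir).absolute()),  # type: ignore
    prefix="publish_",
    suffix=".parquet",
) as tmp_file:
    ...  # existing parquet write / upload logic unchanged
```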
`TRUNCATE TABLE IF EXISTS` is not valid Snowflake syntax. If you need conditional behavior, check existence first; otherwise use `TRUNCATE TABLE <name>;` (or `DELETE FROM <name>;` if you need transactional semantics).
| print(f"Generated TRUNCATE TABLE query:\nTRUNCATE TABLE IF EXISTS {table_name};") | |
| sql_statements.append(f"TRUNCATE TABLE IF EXISTS {table_name};") | |
| print(f"Generated TRUNCATE TABLE query:\nTRUNCATE TABLE {table_name};") | |
| sql_statements.append(f"TRUNCATE TABLE {table_name};") |
```sql
FILE_FORMAT = (TYPE = 'parquet')
MAX_FILE_SIZE = {max_file_size}
SINGLE = {single}
HEADER = TRUE
```
`HEADER = TRUE` is a CSV-specific unload option and is not applicable to Parquet exports. Remove it from the `COPY INTO` statement to avoid Snowflake rejecting the query or silently ignoring an irrelevant setting.
Suggested change:

```diff
-HEADER = TRUE
```
`_generate_snowflake_to_s3_copy_query` is called with `batch_size_in_mb=...`, but the current helper signature (in `s3_stage.py`) only accepts `query` and `snowflake_stage_path`. Either update the helper to support batch sizing, or remove this argument and adjust file sizing via `MAX_FILE_SIZE` logic in the helper.
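A hedged sketch of the first option, wiring `batch_size_in_mb` into Snowflake's `MAX_FILE_SIZE`. The default value, docstring, and the exact COPY INTO template are assumptions built from the fragment shown above, not the current helper.

```python
def _generate_snowflake_to_s3_copy_query(
    query: str,
    snowflake_stage_path: str,
    batch_size_in_mb: int = 128,  # hypothetical default; callers pass batch_size_in_mb=...
) -> str:
    """Generate a COPY INTO statement that unloads `query` results to the S3 stage as parquet."""
    # Snowflake's MAX_FILE_SIZE is expressed in bytes, so translate the MB batch size.
    max_file_size = batch_size_in_mb * 1024 * 1024
    return f"""
    COPY INTO {snowflake_stage_path}
    FROM ({query})
    FILE_FORMAT = (TYPE = 'parquet')
    MAX_FILE_SIZE = {max_file_size}
    SINGLE = FALSE
    """
```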