
Feature/snowflake s3 stage operations #18

Closed
abhishek-pattern wants to merge 100 commits into main from feature/snowflake-s3-stage-operations

Conversation

@abhishek-pattern

No description provided.

- Add query_pandas_from_snowflake_via_s3_stage() for efficient large query results (>10M rows)
- Add publish_pandas_via_s3_stage() for efficient large DataFrame writes (>10M rows)
- Add make_batch_predictions_from_snowflake_via_s3_stage() for batch ML predictions
- Support dev/prod environment switching via current.is_production
- Add helper functions for S3 operations and SQL generation
- Add metaflow_s3/utils.py with S3 utility functions
- Add comprehensive functional tests
- Integrate with existing Metaflow card system and cost tracking
…function for improved readability and maintainability
Copilot AI review requested due to automatic review settings February 18, 2026 09:27

Copilot AI left a comment

Pull request overview

Adds an S3-stage-based path for moving data between Pandas and Snowflake in Metaflow flows (via Snowflake COPY INTO and S3 parquet files), and updates CI to run tests in parallel.

Changes:

  • Add S3/Snowflake stage utilities (copy_snowflake_to_s3, copy_s3_to_snowflake) and S3 dataframe read/write helpers.
  • Extend publish_pandas / query_pandas_from_snowflake with a use_s3_stage option.
  • Add a functional Metaflow test for the S3-stage flow and enable pytest-xdist in CI.
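
A minimal usage sketch of the use_s3_stage option described above. The function names, module path, and the use_s3_stage flag come from this PR; the other argument names are assumptions for illustration only:

import pandas as pd

from ds_platform_utils.metaflow.pandas import publish_pandas, query_pandas_from_snowflake

# Write a large DataFrame to Snowflake via S3 parquet + COPY INTO
# (argument names other than use_s3_stage are illustrative).
df = pd.DataFrame({"id": [1, 2, 3], "value": ["a", "b", "c"]})
publish_pandas(df, table="MY_TABLE", use_s3_stage=True)

# Read a large query result back through the S3 stage.
result = query_pandas_from_snowflake("SELECT * FROM {schema}.MY_TABLE", use_s3_stage=True)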

Reviewed changes

Copilot reviewed 9 out of 10 changed files in this pull request and generated 19 comments.

Summary per file:

  • tests/functional_tests/metaflow/test__pandas_s3.py: New functional flow test exercising publish_pandas/query_pandas_from_snowflake via the S3 stage.
  • src/ds_platform_utils/metaflow/s3_stage.py: New Snowflake↔S3 stage COPY helpers and schema inference.
  • src/ds_platform_utils/metaflow/s3.py: New Metaflow S3 client helpers for parquet IO and folder chunking.
  • src/ds_platform_utils/metaflow/pandas.py: Adds a use_s3_stage option for publish/query and updates schema selection to DEV_SCHEMA.
  • src/ds_platform_utils/metaflow/batch_inference.py: New batch inference pipeline leveraging S3 as an intermediate store.
  • src/ds_platform_utils/metaflow/_consts.py: Replaces NON_PROD_SCHEMA with DEV_SCHEMA; adds S3 stage constants.
  • src/ds_platform_utils/_snowflake/write_audit_publish.py: Updates to the new DEV_SCHEMA constant.
  • pyproject.toml: Version bump to 0.4.0; adds pytest-xdist.
  • .github/workflows/ci-cd-ds-platform-utils.yaml: Runs pytest with xdist (-n auto).
Comments suppressed due to low confidence (1)

src/ds_platform_utils/metaflow/pandas.py:208

  • Docstring says non-prod schema will be NON_PROD_SCHEMA, but that constant no longer exists and the code uses DEV_SCHEMA. Update this note to match the new constant naming.
    **NOTE:** If the query contains `{schema}` placeholders, they will be replaced with the appropriate schema name.
    The schema name will be determined based on the current environment:
    - If in production, it will be set to `PROD_SCHEMA`.
    - If not in production, it will be set to `NON_PROD_SCHEMA`.
    - If the query does not contain `{schema}` placeholders, the schema name will not be modified.
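
A minimal sketch of the substitution this note describes, assuming both constants live in _consts.py as listed in this PR; the helper name is hypothetical:

from metaflow import current

from ds_platform_utils.metaflow._consts import DEV_SCHEMA, PROD_SCHEMA  # assumed to both live in _consts.py


def _resolve_schema_placeholder(query: str) -> str:
    # Queries without a {schema} placeholder are returned unchanged.
    if "{schema}" not in query:
        return query
    schema = PROD_SCHEMA if current.is_production else DEV_SCHEMA
    return query.format(schema=schema)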


Comment on lines +3 to +11

import pandas as pd
from metaflow import S3, current


def _get_metaflow_s3_client():
    return S3(role="arn:aws:iam::209479263910:role/outerbounds_iam_role")



Copilot AI Feb 18, 2026

The S3 client role ARN is hardcoded inside the library. This makes the package environment/account-specific and hard to reuse or test. Consider moving it to configuration (env var / Metaflow config) or at least into _consts.py so it can be overridden per environment.

Suggested change

Before:

import pandas as pd
from metaflow import S3, current


def _get_metaflow_s3_client():
    return S3(role="arn:aws:iam::209479263910:role/outerbounds_iam_role")

After:

import os

import pandas as pd
from metaflow import S3, current


def _get_metaflow_s3_client():
    """
    Return a Metaflow S3 client.

    The role ARN can be configured via the METAFLOW_S3_ROLE_ARN environment variable.
    If not set, the default Metaflow credential resolution is used.
    """
    role_arn = os.getenv("METAFLOW_S3_ROLE_ARN")
    if role_arn:
        return S3(role=role_arn)
    return S3()


Copilot AI Feb 18, 2026

The parameter name table_defination is misspelled. Since this is part of the public API for S3-stage loading, consider renaming it consistently across copy_s3_to_snowflake, _generate_s3_to_snowflake_copy_query, and publish_pandas (e.g., table_definition or table_schema) to avoid propagating the typo.

Suggested change
table_defination: Optional[List[Tuple[str, str]]] = None,
table_definition: Optional[List[Tuple[str, str]]] = None,

Comment on lines 130 to +138
conn: SnowflakeConnection = get_snowflake_connection(use_utc)

# set warehouse
if warehouse is not None:
    _execute_sql(conn, f"USE WAREHOUSE {warehouse};")
_execute_sql(conn, f"USE SCHEMA PATTERN_DB.{schema};")

if use_s3_stage:
    s3_bucket, _ = _get_s3_config(current.is_production)
    data_folder = "publish_" + str(pd.Timestamp.now().strftime("%Y%m%d_%H%M%S_%f"))
    s3_path = f"{s3_bucket}/{S3_DATA_FOLDER}/{data_folder}"

Copilot AI Feb 18, 2026

When use_s3_stage=True, a Snowflake connection is opened and USE WAREHOUSE/SCHEMA is executed, but the connection is not used for the actual load (which opens its own connection in copy_s3_to_snowflake). Consider creating the connection only in the write_pandas branch (or reusing the same connection in copy_s3_to_snowflake) to avoid an unnecessary connection open/close.
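
A sketch of the restructuring this comment suggests, reusing the names from the snippet above; the surrounding function body is assumed:

if use_s3_stage:
    # copy_s3_to_snowflake manages its own connection, so none is opened here.
    s3_bucket, _ = _get_s3_config(current.is_production)
    data_folder = "publish_" + pd.Timestamp.now().strftime("%Y%m%d_%H%M%S_%f")
    s3_path = f"{s3_bucket}/{S3_DATA_FOLDER}/{data_folder}"
else:
    # Only the write_pandas path needs a connection opened here.
    conn: SnowflakeConnection = get_snowflake_connection(use_utc)
    if warehouse is not None:
        _execute_sql(conn, f"USE WAREHOUSE {warehouse};")
    _execute_sql(conn, f"USE SCHEMA PATTERN_DB.{schema};")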


Copilot AI Feb 18, 2026

The public API parameter is named table_defination (misspelled) but the docstring below refers to table_schema, and callers/tests are using table_schema. Rename this parameter consistently (e.g., table_schema or table_definition) and update the call into copy_s3_to_snowflake accordingly.

Suggested change
table_defination: Optional[List[Tuple[str, str]]] = None,
table_schema: Optional[List[Tuple[str, str]]] = None,

Comment on lines +52 to +55
with tempfile.NamedTemporaryFile(
    prefix=str(Path(current.tempdir).absolute()) + "/",  # type: ignore
    suffix=".parquet",
) as tmp_file:

Copilot AI Feb 18, 2026

NamedTemporaryFile(prefix=str(Path(current.tempdir).absolute()) + "/") uses prefix to try to control the directory, but prefix is part of the filename and including path separators can cause invalid paths at runtime. Use the dir= parameter to place the temp file in current.tempdir and keep prefix as a simple name prefix.
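
A minimal sketch of the dir=-based placement this comment suggests, usable inside a Metaflow step where current.tempdir is available (the prefix value is illustrative):

import tempfile

from metaflow import current

# Place the temp file inside current.tempdir via dir=; prefix stays a plain filename prefix.
with tempfile.NamedTemporaryFile(
    dir=current.tempdir,
    prefix="s3_stage_",
    suffix=".parquet",
) as tmp_file:
    print(tmp_file.name)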

Comment on lines 105 to 106

Copilot AI Feb 18, 2026

TRUNCATE TABLE IF EXISTS is not valid Snowflake syntax. If you need conditional behavior, check existence first; otherwise use TRUNCATE TABLE <name>; (or DELETE FROM <name>; if you need transactional semantics).

Suggested change
print(f"Generated TRUNCATE TABLE query:\nTRUNCATE TABLE IF EXISTS {table_name};")
sql_statements.append(f"TRUNCATE TABLE IF EXISTS {table_name};")
print(f"Generated TRUNCATE TABLE query:\nTRUNCATE TABLE {table_name};")
sql_statements.append(f"TRUNCATE TABLE {table_name};")

FILE_FORMAT = (TYPE = 'parquet')
MAX_FILE_SIZE = {max_file_size}
SINGLE = {single}
HEADER = TRUE

Copilot AI Feb 18, 2026

HEADER = TRUE is a CSV-specific unload option and is not applicable to Parquet exports. Remove it from the COPY INTO statement to avoid Snowflake rejecting the query or silently ignoring an irrelevant setting.

Suggested change
HEADER = TRUE

Comment on lines 140 to 144

Copilot AI Feb 18, 2026

_generate_snowflake_to_s3_copy_query is called with batch_size_in_mb=..., but the current helper signature (in s3_stage.py) only accepts query and snowflake_stage_path. Either update the helper to support batch sizing, or remove this argument and adjust file sizing via MAX_FILE_SIZE logic in the helper.
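
One way to reconcile the call with the helper, sketched under the assumption that MAX_FILE_SIZE (in bytes) is the intended sizing mechanism; this signature is hypothetical, not the one currently in s3_stage.py:

def _generate_snowflake_to_s3_copy_query(
    query: str,
    snowflake_stage_path: str,
    batch_size_in_mb: int = 100,
) -> str:
    # Snowflake's MAX_FILE_SIZE unload option is specified in bytes.
    max_file_size = batch_size_in_mb * 1024 * 1024
    return f"""
    COPY INTO {snowflake_stage_path}
    FROM ({query})
    FILE_FORMAT = (TYPE = 'parquet')
    MAX_FILE_SIZE = {max_file_size}
    """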
