
Optimise dataframe_to_mds for Large-Scale Datasets via Intermediate Aggressive Disk Cleanup and Distributed Metadata Handling #971

Open
AbhishekKumarSingh wants to merge 1 commit into mosaicml:main from AbhishekKumarSingh:converter_fix

Conversation


AbhishekKumarSingh commented Feb 2, 2026

Optimise dataframe_to_mds for Large-Scale Datasets via Intermediate Aggressive Disk Cleanup and Distributed Metadata Handling

Description of changes:

This PR introduces a significant architectural optimisation to the dataframe_to_mds conversion process. The current implementation often fails on massive datasets due to worker disk exhaustion ("No space left on device" errors) and potential Driver-side Out-Of-Memory (OOM) errors. This update shifts the conversion to a "No-Collect" strategy, keeping Driver memory and worker disk usage constant regardless of dataset size.

Key Enhancements

🚀 Scalability & Memory Efficiency

  • No-Collect Strategy: Replaced the Driver-side .collect() of partition data with a distributed metadata buffering system. Worker metadata is now staged in a temporary remote Parquet directory, preventing the Driver from crashing on high partition counts (see the sketch after this list).
  • Incremental Index Merging: The Driver now only downloads lightweight index.json files for the final merge, keeping memory overhead $O(1)$ relative to total dataset size.
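
A minimal PySpark sketch of the staging pattern, with hypothetical bucket paths, a toy input DataFrame, and the actual shard writing elided (the real change lives inside dataframe_to_mds itself):

```python
from pyspark import TaskContext
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(1, 'hello'), (2, 'world')], ['id', 'text'])  # toy input

remote_root = 's3://my-bucket/mds-out'             # hypothetical remote target
metadata_staging = 's3://my-bucket/_tmp_metadata'  # hypothetical staging path

def convert_partition(rows):
    """Write one partition's shards, then yield a single lightweight metadata row."""
    task_id = TaskContext.get().partitionId()
    for _ in rows:
        pass  # MDS shard writing elided; see the worker-side sketch below
    yield (task_id, f'{remote_root}/task_{task_id}/index.json')

# Instead of df.rdd.mapPartitions(...).collect(), which accumulates every
# partition's metadata on the Driver, stage it as a remote Parquet dataset:
(df.rdd
   .mapPartitions(convert_partition)
   .toDF(['task_id', 'index_path'])
   .write.mode('overwrite').parquet(metadata_staging))

# The Driver later reads back only this tiny table, keeping its memory
# footprint O(1) in the dataset size.
index_paths = [row.index_path for row in spark.read.parquet(metadata_staging).collect()]
```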

💾 Aggressive Disk Management

  • Worker Disk Optimization: Enforces keep_local=False within worker tasks. Instead of retaining all shards until the job completes, shards are uploaded to remote_root and immediately purged from the executor's local disk (see the sketch after this list).
  • Disk Safety: This allows the job to process datasets significantly larger than the combined disk capacity of the worker nodes, eliminating "No space left on device" errors.
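
A minimal sketch of the per-worker write path using streaming's MDSWriter, with a hypothetical remote prefix and toy samples:

```python
from streaming import MDSWriter

columns = {'id': 'int', 'text': 'str'}            # example MDS schema
local_scratch = '/tmp/mds/task_0'                 # unique per-task scratch dir
remote_target = 's3://my-bucket/mds-out/task_0'   # hypothetical remote prefix

# With keep_local=False, MDSWriter uploads each finished shard to the remote
# and deletes the local copy, so worker disk usage stays bounded by the
# in-flight shard rather than growing with the dataset.
with MDSWriter(out=(local_scratch, remote_target),
               columns=columns,
               keep_local=False) as writer:
    for sample in [{'id': 1, 'text': 'hello'}, {'id': 2, 'text': 'world'}]:
        writer.write(sample)
```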

🛡️ Robustness & Cleanup

  • Fault-Tolerant Cleanup: Integrated try...finally blocks to ensure that all temporary artifacts (including Driver-side staging folders and remote metadata dumps) are purged even if the Spark job fails (see the sketch after this list).
  • Task Isolation: Utilizes unique task-based subdirectories to prevent file contention and race conditions during high-concurrency writes to cloud object stores.
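
A sketch of the cleanup pattern, where run_conversion and remove_remote_dir are hypothetical stand-ins; the essentials are the finally block plus unique, per-run staging paths:

```python
import shutil
import tempfile
import uuid

staging_dir = tempfile.mkdtemp(prefix='mds_driver_staging_')        # Driver-side scratch
metadata_dump = f's3://my-bucket/_tmp_metadata/{uuid.uuid4().hex}'  # unique per run

try:
    run_conversion(staging_dir, metadata_dump)  # hypothetical conversion entry point
finally:
    # Executes whether the Spark job succeeded or raised, so no temporary
    # artifacts outlive a failed conversion.
    shutil.rmtree(staging_dir, ignore_errors=True)
    remove_remote_dir(metadata_dump)            # hypothetical remote cleanup helper
```

The per-run uuid suffix doubles as the isolation mechanism: concurrent jobs never write into each other's staging directories.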

Architecture Overview

  1. Driver Setup: Infers schema and sets up a _spark_metadata path.
  2. Distributed Write (Workers): Each task writes data to a unique task_{id} subdirectory locally.
    • Optimisation: If a remote path is present, shards are uploaded to the remote object store immediately and the intermediate local shards on the workers are cleaned up.
  3. Merge Phase (Driver), sketched after this list:
    • The driver reads the lightweight Parquet metadata.
    • Downloads only the small index.json files from the remote storage.
    • Merges them into a master index.json and uploads it.
    • Performs cleanup.
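
A minimal sketch of the merge step, assuming the standard MDS index layout (a top-level version field plus a shards list, each shard naming its files under raw_data/zip_data); paths and the function name are illustrative, and streaming's merge_index utility covers a similar job:

```python
import json
import os

def merge_partition_indexes(local_index_paths, out_path):
    """Merge per-task index.json files into one master index."""
    shards = []
    for path in local_index_paths:
        with open(path) as f:
            partial = json.load(f)
        # Each task wrote under its own task_{id} subdirectory, so prefix the
        # shard basenames to keep them resolvable from the dataset root.
        subdir = os.path.basename(os.path.dirname(path))
        for shard in partial['shards']:
            for key in ('raw_data', 'zip_data'):
                if shard.get(key):
                    shard[key]['basename'] = os.path.join(subdir, shard[key]['basename'])
            shards.append(shard)
    with open(out_path, 'w') as f:
        json.dump({'version': 2, 'shards': shards}, f)
```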

This architecture maintains constant worker disk usage by purging shards immediately after upload, ensuring stability regardless of total dataset size.

Issue #, if available:

Issue #970

Merge Checklist:

Put an x (without spaces) in the boxes that apply. If you are unsure about any checklist item, please don't hesitate to ask. We are here to help! This is simply a reminder of what we are going to look for before merging your pull request.

General

  • I have read the contributor guidelines
  • This is a documentation change or typo fix. If so, skip the rest of this checklist.
  • I certify that the changes I am introducing will be backward compatible, and I have discussed concerns about this, if any, with the MosaicML team.
  • I have updated any necessary documentation, including README and API docs (if appropriate).

Tests

  • I ran pre-commit on my change. (check out the pre-commit section of prerequisites)
  • I have added tests that prove my fix is effective or that my feature works (if appropriate).
  • I ran the tests locally to make sure they pass. (check out testing)
  • I have added unit and/or integration tests as appropriate to ensure backward compatibility of the changes.

