
Megatron streaming dataset #976

Open

bodsul wants to merge 2 commits into mosaicml:main from Zyphra:megatron_streaming_dataset

Conversation


@bodsul bodsul commented Feb 12, 2026

Description of changes

This PR addresses issues #827 and #397, more generally, for Megatron, and also provides a framework for arbitrary N-D parallelisms.

At a very high level, StreamingDataset with multiple workers via StreamingDataLoader:

  1. Assumes all ranks in a multi-node, multi-GPU setup need to iterate through a potentially remote data stream
  2. Sets up a local leader per node to handle downloading index.json and setting up resources such as shared memory objects (shms). This is the essence of unique_rank_world and unique_worker_world for the single-worker and multi-worker cases, respectively.
  3. During initialization, i.e. when StreamingDataset.__init__ is called, dist.barrier() is used for synchronization
  4. At the beginning of StreamingDataset.__iter__, parallel_worker_world is set up based on parallel_rank_world and num_workers to determine how the data stream is sharded among dataloader workers

The above setup is valid for data parallelisms such as vanilla DP, ZeRO, FSDP, HSDP, etc.
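
For reference, the standard data-parallel usage looks roughly like the following (the remote and local paths are placeholders); every rank launched by the trainer constructs and iterates its own StreamingDataset:

```python
from streaming import StreamingDataset, StreamingDataLoader

# Placeholder paths: each rank streams shards from the same remote and
# caches them under a node-local directory.
dataset = StreamingDataset(
    remote='s3://my-bucket/my-dataset',  # placeholder remote
    local='/tmp/streaming_cache',        # placeholder local cache
    batch_size=8,
)
loader = StreamingDataLoader(dataset, batch_size=8, num_workers=4)

for batch in loader:
    pass  # training step
```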

Now two assumptions in the above setup can break down for general N-D parallelisms. These are:

  1. All ranks participate in dataloader instantiation and iteration
  2. All ranks iterate through different shards of the data stream

Without loss of generality, we can assume that a rank initializes the dataloader iff it also iterates through it.

The replicate argument is an attempt to address assumption 2; however, it is not general enough, since it assumes that contiguous GPUs stream the same data shards in the same order.

A bigger problem is that replicate does not address assumption 1, since a vanilla dist.barrier() waits for all ranks.
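
To illustrate the failure mode (this is a schematic, not streaming's actual internals): a barrier on the default process group only returns once every rank in the job reaches it, so if only a subset of ranks constructs the dataset, that subset hangs:

```python
import torch.distributed as dist

def init_dataset_on_subset(dataloader_ranks: set) -> None:
    """Schematic of the hang when only some ranks build the dataset."""
    if dist.get_rank() in dataloader_ranks:
        # StreamingDataset.__init__ synchronizes with dist.barrier(). On the
        # default group this waits for *all* ranks, including those that never
        # construct a dataset, so it hangs whenever dataloader_ranks is a
        # strict subset of the world.
        dist.barrier()
        # A barrier restricted to the participating ranks avoids the hang:
        # dist.barrier(group=dl_group)
```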

The solutions to the above issues are:

  1. Let the distributed trainer supply a policy specifying which ranks actually need to instantiate the dataloader. Call the process group of those ranks the dl_group; we then construct the unique_rank_world from this information, assigning one process in the dl_group per node to be a local leader, and replace all dist.barrier() calls with dist.barrier(dl_group). A sketch of such a policy follows this list.

  2. For each rank in the dl_group, we use only the dp_rank and dp_world_size to construct the parallel_rank_world.
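
A minimal sketch of what the trainer-side policy could look like with Megatron's parallel_state. The example policy, the placeholder paths, and the commented-out dl_group keyword are assumptions for illustration, not the final API of this PR:

```python
import torch.distributed as dist
from megatron.core import parallel_state
from streaming import StreamingDataset, StreamingDataLoader

def needs_dataloader() -> bool:
    """Example policy: only the first tensor-parallel rank of the first
    pipeline stage consumes data. Any trainer-defined policy works here."""
    return (parallel_state.get_tensor_model_parallel_rank() == 0
            and parallel_state.is_pipeline_first_stage())

# Build dl_group collectively: every rank gathers the flags and calls
# new_group() with the same rank list, as torch.distributed requires.
flags = [None] * dist.get_world_size()
dist.all_gather_object(flags, needs_dataloader())
dataloader_ranks = [r for r, flag in enumerate(flags) if flag]
dl_group = dist.new_group(ranks=dataloader_ranks)

if needs_dataloader():
    # Sharding is driven only by the data-parallel coordinates.
    dp_rank = parallel_state.get_data_parallel_rank()
    dp_world_size = parallel_state.get_data_parallel_world_size()

    dataset = StreamingDataset(
        remote='s3://my-bucket/my-dataset',  # placeholder remote
        local='/tmp/streaming_cache',        # placeholder local cache
        batch_size=8,
        # Hypothetical keyword from this PR: internal barriers become
        # dist.barrier(dl_group) and the parallel_rank_world is built from
        # dp_rank / dp_world_size rather than the global rank.
        # dl_group=dl_group,
    )
    loader = StreamingDataLoader(dataset, batch_size=8, num_workers=2)
```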

In some cases the N-D parallelism policy might also require that certain ranks iterate through multiple copies of identical data streams, for example in virtual pipeline parallelism in Megatron. In this case the training code can simply instantiate multiple StreamingDataset instances with identical arguments but with different local caches. The same applies to StreamingDataLoader instances.
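
For instance, a rank that needs two identical copies of the stream (say, for two virtual pipeline stages) could do something like the following; the paths are placeholders:

```python
from streaming import StreamingDataset, StreamingDataLoader

# Identical arguments except for the local cache, so the two instances do
# not collide on cache directories or shared-memory resources.
common = dict(remote='s3://my-bucket/my-dataset', batch_size=8)

dataset_vpp0 = StreamingDataset(local='/tmp/streaming_cache_vpp0', **common)
dataset_vpp1 = StreamingDataset(local='/tmp/streaming_cache_vpp1', **common)

loader_vpp0 = StreamingDataLoader(dataset_vpp0, batch_size=8, num_workers=2)
loader_vpp1 = StreamingDataLoader(dataset_vpp1, batch_size=8, num_workers=2)
```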

Tests

Currently, a model-independent demo and tests are in megatron_dataset_demo.py, which can be run with torchrun in a single-node or multi-node setting. This requires installing Megatron-LM.

