Description of changes
This PR addresses issue #827, and issue #397 more generally, for Megatron, and also provides a framework for arbitrary N-D parallelisms.
At a very high level, this is how `StreamingDataset` works with multiple workers via `StreamingDataloader`:

- `StreamingDataset.__init__` is called; `dist.barrier()` is used for synchronization.
- In `StreamingDataset.__iter__`, the `parallel_worker_world` is set up based on the `parallel_rank_world` and `num_workers` to determine how the data stream is sharded among dataloader workers.

The above setup is valid for data parallelisms like vanilla DP, ZeRO, FSDP, HSDP, etc.
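As a rough illustration of the last step, here is a minimal sketch of how a sample space can be sharded among (rank, dataloader worker) pairs. This is not the library's exact partitioning algorithm (which also handles shuffling, resumption, and uneven remainders); the function name and the contiguous-slice scheme are illustrative assumptions.

```python
# Illustrative sketch only: contiguous sharding of sample indices among
# (rank, worker) pairs, in the spirit of how StreamingDataset.__iter__
# splits the stream across dataloader workers. Not the library's real code.

def worker_shard(num_samples: int, rank: int, world_size: int,
                 worker_id: int, num_workers: int) -> range:
    """Return the slice of sample indices owned by one worker of one rank."""
    total_workers = world_size * num_workers        # global worker count
    global_worker = rank * num_workers + worker_id  # this worker's flat index
    per_worker = num_samples // total_workers       # even split; remainder ignored here
    start = global_worker * per_worker
    return range(start, start + per_worker)

# Example: 64 samples, 2 ranks, 2 workers each -> 4 disjoint shards of 16.
shards = [worker_shard(64, r, 2, w, 2) for r in range(2) for w in range(2)]
print([list(s)[:2] for s in shards])  # -> [[0, 1], [16, 17], [32, 33], [48, 49]]
```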
Now, two assumptions in the above setup can break down for general N-D parallelisms (without loss of generality, we assume that a rank initializes the dataloader iff it also iterates through it). These are:

1. Every rank initializes the dataloader, so `dist.barrier()` can wait on all ranks.
2. Every rank streams a unique shard of the data, i.e. no two ranks need identical copies of the same stream.
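To make the breakdown concrete, here is a hedged sketch of a 3-D layout. The rank ordering (tensor-parallel innermost, then pipeline-parallel, then data-parallel) and the function names are illustrative assumptions, not Megatron's guaranteed layout; the point is that only one rank per model replica needs a unique data shard, while the rest would still block in a global `dist.barrier()`.

```python
# Illustrative sketch (rank ordering is an assumption): with TP innermost,
# then PP, then DP, only one rank per model replica needs unique data.

def decompose(rank: int, tp: int, pp: int) -> tuple:
    """Split a global rank into (tp_rank, pp_rank, dp_rank)."""
    return rank % tp, (rank // tp) % pp, rank // (tp * pp)

def unique_data_ranks(world_size: int, tp: int, pp: int) -> list:
    """Example policy: only the tp_rank == pp_rank == 0 rank of each
    data-parallel replica streams a unique shard of the data."""
    return [r for r in range(world_size) if decompose(r, tp, pp)[:2] == (0, 0)]

# 8 GPUs with tp=2, pp=2 gives dp_world_size=2: only 2 of the 8 ranks need
# unique data, yet a vanilla dist.barrier() would wait on all 8 (issue 1),
# and the other 6 ranks need copies or nothing, not unique shards (issue 2).
print(unique_data_ranks(8, 2, 2))                                    # -> [0, 4]
print([decompose(r, 2, 2)[2] for r in unique_data_ranks(8, 2, 2)])   # -> [0, 1]
```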
The `replicate` argument is an attempt to address issue 2; however, it is not general enough, since it assumes contiguous GPUs stream the same data shards in the same order. A bigger issue is that `replicate` does not address issue 1, since vanilla `dist.barrier()` waits for all ranks.

The solutions to the above issues:
1. Let the distributed trainer supply a policy of which ranks actually need to instantiate the dataloader. Call the process group of those ranks the `dl_group`. We then construct the `unique_rank_world` based on this info, assigning one process in the `dl_group` per node to be a local leader, and replace all `dist.barrier()` calls with `dist.barrier(dl_group)`.
2. For each rank in the `dl_group`, we use only the
`dp_rank` and `dp_world_size` to construct the `parallel_rank_world`.

In some cases, the N-D parallelism policy might also require that certain ranks iterate through multiple copies of identical data streams, for example
in virtual pipeline parallelism in Megatron. In this case, the training code can simply instantiate multiple `StreamingDataset` instances with identical arguments but with different `local_cache`s, and likewise multiple `StreamingDataloader` instances.

Tests
Currently, a model-independent demo and tests live in `megatron_dataset_demo.py`, which can be run with torchrun in a single-node or multi-node setting. This requires installing Megatron-LM.