From d0d8d68000d9e63f036d0031d81e8bc39a2fbff2 Mon Sep 17 00:00:00 2001 From: Your Name Date: Mon, 3 Feb 2025 06:47:56 +0000 Subject: [PATCH 1/8] tune memory --- examples/ppo_trainer/run_deepseek_megatron.sh | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/examples/ppo_trainer/run_deepseek_megatron.sh b/examples/ppo_trainer/run_deepseek_megatron.sh index c0194c04..8db0a22a 100644 --- a/examples/ppo_trainer/run_deepseek_megatron.sh +++ b/examples/ppo_trainer/run_deepseek_megatron.sh @@ -2,6 +2,11 @@ set -x # the config file used: verl/trainer/main_ppo/config/ppo_megatron_trainer.yaml +huggingface-cli download deepseek-ai/deepseek-llm-7b-chat --local-dir $HOME/models/deepseek-llm-7b-chat + +# ``actor_rollout_ref.rollout.tensor_model_parallel_size`` in theory could be different from +# ``**.megatron.tensor_model_parallel_size`` + python3 -m verl.trainer.main_ppo --config-path=config \ --config-name='ppo_megatron_trainer.yaml'\ data.train_files=$HOME/data/gsm8k/train.parquet \ @@ -10,19 +15,22 @@ python3 -m verl.trainer.main_ppo --config-path=config \ data.val_batch_size=1312 \ data.max_prompt_length=512 \ data.max_response_length=512 \ - actor_rollout_ref.model.path=deepseek-ai/deepseek-coder-6.7b-instruct \ + actor_rollout_ref.model.path=$HOME/models/deepseek-llm-7b-chat \ actor_rollout_ref.actor.optim.lr=2e-6 \ actor_rollout_ref.actor.ppo_mini_batch_size=256 \ - actor_rollout_ref.actor.ppo_micro_batch_size_per_gpu=8 \ + actor_rollout_ref.actor.ppo_micro_batch_size_per_gpu=4 \ + actor_rollout_ref.actor.megatron.tensor_model_parallel_size=4 \ actor_rollout_ref.rollout.log_prob_micro_batch_size_per_gpu=8 \ actor_rollout_ref.rollout.tensor_model_parallel_size=2 \ actor_rollout_ref.rollout.name=vllm \ actor_rollout_ref.rollout.gpu_memory_utilization=0.5 \ actor_rollout_ref.ref.log_prob_micro_batch_size_per_gpu=16 \ + actor_rollout_ref.ref.megatron.tensor_model_parallel_size=4 \ critic.optim.lr=2e-5 \ - critic.model.path=deepseek-ai/deepseek-coder-6.7b-instruct \ + critic.model.path=$HOME/models/deepseek-llm-7b-chat \ critic.model.enable_gradient_checkpointing=False \ - critic.ppo_micro_batch_size_per_gpu=8 \ + critic.ppo_micro_batch_size_per_gpu=4 \ + critic.megatron.tensor_model_parallel_size=4 \ algorithm.kl_ctrl.kl_coef=0.001 \ trainer.critic_warmup=0 \ trainer.logger=['console','wandb'] \ From 3e31196bced3fcb4cf6ca226508eaceb893355ee Mon Sep 17 00:00:00 2001 From: Your Name Date: Wed, 5 Feb 2025 18:41:47 +0000 Subject: [PATCH 2/8] add comments --- examples/ppo_trainer/run_deepseek_megatron.sh | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/examples/ppo_trainer/run_deepseek_megatron.sh b/examples/ppo_trainer/run_deepseek_megatron.sh index 8db0a22a..c41aae35 100644 --- a/examples/ppo_trainer/run_deepseek_megatron.sh +++ b/examples/ppo_trainer/run_deepseek_megatron.sh @@ -1,12 +1,13 @@ set -x -# the config file used: verl/trainer/main_ppo/config/ppo_megatron_trainer.yaml - +# prepare pre-trained model ckpt huggingface-cli download deepseek-ai/deepseek-llm-7b-chat --local-dir $HOME/models/deepseek-llm-7b-chat # ``actor_rollout_ref.rollout.tensor_model_parallel_size`` in theory could be different from # ``**.megatron.tensor_model_parallel_size`` +# the config file used: verl/trainer/main_ppo/config/ppo_megatron_trainer.yaml + python3 -m verl.trainer.main_ppo --config-path=config \ --config-name='ppo_megatron_trainer.yaml'\ data.train_files=$HOME/data/gsm8k/train.parquet \
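A note on the tuning in the two patches above: the dominant memory knobs are the Megatron tensor-model-parallel size (raised to 4 for actor, ref, and critic, while the vLLM rollout keeps its own tensor_model_parallel_size=2) and the PPO micro batch sizes (halved from 8 to 4). Doubling TP halves each GPU's share of the 7B weights and optimizer state, which buys room for activations next to the rollout engine's gpu_memory_utilization=0.5 reservation. A back-of-envelope sanity check (a sketch only; the 7e9 parameter count, bf16 weights, and fp32 Adam states are assumptions, not values taken from these patches):

    def per_gpu_weights_gib(n_params: float, tp: int, bytes_per_param: int = 2) -> float:
        """Per-GPU parameter memory under tensor parallelism, assuming bf16 weights."""
        return n_params * bytes_per_param / tp / 1024**3

    def per_gpu_adam_gib(n_params: float, tp: int) -> float:
        """fp32 master weights plus two Adam moments: 12 bytes per parameter,
        assumed sharded the same way as the weights (a simplification)."""
        return n_params * 12 / tp / 1024**3

    N = 7e9  # assumed parameter count for deepseek-llm-7b-chat
    for tp in (2, 4):
        print(f"TP={tp}: ~{per_gpu_weights_gib(N, tp):.1f} GiB weights, "
              f"~{per_gpu_adam_gib(N, tp):.1f} GiB Adam state per GPU")

As the comment added to the script says, the rollout TP can legitimately differ from the Megatron TP: inference holds only weights and KV cache, so it tolerates a smaller TP degree than training does.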
From 02ff6a3f07e6a9b2ec2ce22e5171b86b17206011 Mon Sep 17 00:00:00 2001 From: Your Name Date: Wed, 5 Feb 2025 19:23:43 +0000 Subject: [PATCH 3/8] update docs --- docs/experiment/ppo.rst | 41 +++++++++++++++++++++++++---------------- 1 file changed, 25 insertions(+), 16 deletions(-) diff --git a/docs/experiment/ppo.rst b/docs/experiment/ppo.rst index d73f84bf..6922313c 100644 --- a/docs/experiment/ppo.rst +++ b/docs/experiment/ppo.rst @@ -11,22 +11,31 @@ Assuming GSM8k dataset is preprocessed via ``python3 examples/data_preprocess/gsm8k.py`` Refer to the table below to reproduce PPO training from different pre-trained models. .. _Huggingface: https://huggingface.co/google/gemma-2-2b-it#benchmark-results -.. _SFT Command and logs: https://github.com/eric-haibin-lin/verl-data/blob/experiments/gsm8k/gemma-2-2b-it-sft-0.411.log -.. _SFT+PPO Command and logs: https://github.com/eric-haibin-lin/verl-data/blob/experiments/gsm8k/gemma-2-2b-it-ppo-bsz512_4-prompt1024-resp-512-0.640.log +.. _SFT Command and Logs: https://github.com/eric-haibin-lin/verl-data/blob/experiments/gsm8k/gemma-2-2b-it-sft-0.411.log +.. _SFT+PPO Command and Logs: https://github.com/eric-haibin-lin/verl-data/blob/experiments/gsm8k/gemma-2-2b-it-ppo-bsz512_4-prompt1024-resp-512-0.640.log .. _wandb: https://api.wandb.ai/links/verl-team/h7ux8602 .. _Qwen Blog: https://qwenlm.github.io/blog/qwen2.5-llm/ -.. _PPO Command and logs: https://github.com/eric-haibin-lin/verl-data/blob/experiments/gsm8k/Qwen2.5-0.5B-bsz256_2-prompt1024-resp512-0.567.log +.. _PPO Command and Logs: https://github.com/eric-haibin-lin/verl-data/blob/experiments/gsm8k/Qwen2.5-0.5B-bsz256_2-prompt1024-resp512-0.567.log +.. _Megatron PPO Command and Logs: https://github.com/eric-haibin-lin/verl-data/blob/experiments/gsm8k/deepseek-llm-7b-chat-megatron-bsz256_4-prompt512-resp512-0.695.log +.. _Qwen7b GRPO Script: https://github.com/volcengine/verl/blob/a65c9157bc0b85b64cd753de19f94e80a11bd871/examples/grpo_trainer/run_qwen2-7b_seq_balance.sh +.. 
_Megatron wandb: https://wandb.ai/verl-team/verl_megatron_gsm8k_examples/runs/10fetyr3 -+----------------------------+------------------------+------------+-----------------------------------------------------------------------------------------------+ -| Model | Method | Test score | Details | -+============================+========================+============+=====================+=========================================================================+ -| google/gemma-2-2b-it | pretrained checkpoint | 23.9 | `Huggingface`_ | -+----------------------------+------------------------+------------+-----------------------------------------------------------------------------------------------+ -| google/gemma-2-2b-it | SFT | 52.06 | `SFT Command and logs`_ | -+----------------------------+------------------------+------------+-----------------------------------------------------------------------------------------------+ -| google/gemma-2-2b-it | SFT + PPO | 64.02 | `SFT+PPO Command and logs`_, `wandb`_ | -+----------------------------+------------------------+------------+-----------------------------------------------------------------------------------------------+ -| Qwen/Qwen2.5-0.5B-Instruct | pretrained checkpoint | 36.4 | `Qwen Blog`_ | -+----------------------------+------------------------+------------+-----------------------------------------------------------------------------------------------+ -| Qwen/Qwen2.5-0.5B-Instruct | PPO | 56.7 | `PPO Command and logs`_ | -+----------------------------+------------------------+------------+-----------------------------------------------------------------------------------------------+ \ No newline at end of file ++----------------------------------+------------------------+------------+-----------------------------------------------------------------------------------------------+ +| Model | Method | Test score | Details | ++==================================+========================+============+=====================+=========================================================================+ +| google/gemma-2-2b-it | pretrained checkpoint | 23.9 | `Huggingface`_ | ++----------------------------------+------------------------+------------+-----------------------------------------------------------------------------------------------+ +| google/gemma-2-2b-it | SFT | 52.06 | `SFT Command and Logs`_ | ++----------------------------------+------------------------+------------+-----------------------------------------------------------------------------------------------+ +| google/gemma-2-2b-it | SFT + PPO | 64.02 | `SFT+PPO Command and Logs`_, `wandb`_ | ++----------------------------------+------------------------+------------+-----------------------------------------------------------------------------------------------+ +| Qwen/Qwen2.5-0.5B-Instruct | pretrained checkpoint | 36.4 | `Qwen Blog`_ | ++----------------------------------+------------------------+------------+-----------------------------------------------------------------------------------------------+ +| Qwen/Qwen2.5-0.5B-Instruct | PPO | 56.7 | `PPO Command and Logs`_ | ++----------------------------------+------------------------+------------+-----------------------------------------------------------------------------------------------+ +| deepseek-ai/deepseek-llm-7b-chat | PPO | 69.5[1] | `Megatron PPO Command and Logs`_, `Megatron wandb`_ | 
++----------------------------------+------------------------+------------+-----------------------------------------------------------------------------------------------+ +| Qwen/Qwen2-7B-Instruct | GRPO | 89 | `Qwen7b PPO Command and Logs`_ | ++----------------------------------+------------------------+------------+-----------------------------------------------------------------------------------------------+ + +.. [1] During the evaluation, we have only extracted answers following the format "####". A more flexible answer extraction, longer response length and better prompt engineering may lead to a higher score. \ No newline at end of file From 87d1d156112bbce151d41537d644ec3c6f6ed6ac Mon Sep 17 00:00:00 2001 From: Your Name Date: Wed, 5 Feb 2025 19:26:14 +0000 Subject: [PATCH 4/8] update docs --- docs/experiment/ppo.rst | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/experiment/ppo.rst b/docs/experiment/ppo.rst index 6922313c..aab67c4c 100644 --- a/docs/experiment/ppo.rst +++ b/docs/experiment/ppo.rst @@ -33,9 +33,9 @@ Refer to the table below to reproduce PPO training from different pre-trained mo +----------------------------------+------------------------+------------+-----------------------------------------------------------------------------------------------+ | Qwen/Qwen2.5-0.5B-Instruct | PPO | 56.7 | `PPO Command and Logs`_ | +----------------------------------+------------------------+------------+-----------------------------------------------------------------------------------------------+ -| deepseek-ai/deepseek-llm-7b-chat | PPO | 69.5[1] | `Megatron PPO Command and Logs`_, `Megatron wandb`_ | +| deepseek-ai/deepseek-llm-7b-chat | PPO | 69.5[1]_ | `Megatron PPO Command and Logs`_, `Megatron wandb`_ | +----------------------------------+------------------------+------------+-----------------------------------------------------------------------------------------------+ -| Qwen/Qwen2-7B-Instruct | GRPO | 89 | `Qwen7b PPO Command and Logs`_ | +| Qwen/Qwen2-7B-Instruct | GRPO | 89 | `Qwen7b PPO Script`_ | +----------------------------------+------------------------+------------+-----------------------------------------------------------------------------------------------+ .. [1] During the evaluation, we have only extracted answers following the format "####". A more flexible answer extraction, longer response length and better prompt engineering may lead to a higher score. \ No newline at end of file
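The footnote being polished across these doc patches deserves a concrete illustration: the reported 69.5 counts an answer as correct only if it appears after the canonical GSM8K "####" marker. A minimal sketch of strict versus flexible extraction (hypothetical helper names, not the scorer verl actually ships):

    import re

    def extract_strict(response: str):
        """Accept only the canonical GSM8K '#### <answer>' format."""
        m = re.search(r"####\s*(-?[\d,]+(?:\.\d+)?)", response)
        return m.group(1).replace(",", "") if m else None

    def extract_flexible(response: str):
        """Fall back to the last number in the response when '####' is absent."""
        strict = extract_strict(response)
        if strict is not None:
            return strict
        nums = re.findall(r"-?\d[\d,]*(?:\.\d+)?", response)
        return nums[-1].replace(",", "") if nums else None

    print(extract_strict("so the total is 42. #### 42"))  # '42'
    print(extract_strict("so the total is 42."))          # None: scored as wrong
    print(extract_flexible("so the total is 42."))        # '42'

A model that reasons correctly but omits the "####" marker is scored wrong under the strict rule, which is why the footnote hedges that a more flexible extractor, a longer response budget, or better prompting may raise the reported number.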
From 71b8cd99a5e2c74fd0a6b1ede89b8b62df3d0d2b Mon Sep 17 00:00:00 2001 From: Your Name Date: Wed, 5 Feb 2025 19:27:22 +0000 Subject: [PATCH 5/8] update docs --- docs/experiment/ppo.rst | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/docs/experiment/ppo.rst b/docs/experiment/ppo.rst index aab67c4c..1877dcae 100644 --- a/docs/experiment/ppo.rst +++ b/docs/experiment/ppo.rst @@ -33,9 +33,10 @@ Refer to the table below to reproduce PPO training from different pre-trained mo +----------------------------------+------------------------+------------+-----------------------------------------------------------------------------------------------+ | Qwen/Qwen2.5-0.5B-Instruct | PPO | 56.7 | `PPO Command and Logs`_ | +----------------------------------+------------------------+------------+-----------------------------------------------------------------------------------------------+ -| deepseek-ai/deepseek-llm-7b-chat | PPO | 69.5[1]_ | `Megatron PPO Command and Logs`_, `Megatron wandb`_ | +| deepseek-ai/deepseek-llm-7b-chat | PPO | 69.5 [1]_ | `Megatron PPO Command and Logs`_, `Megatron wandb`_ | +----------------------------------+------------------------+------------+-----------------------------------------------------------------------------------------------+ -| Qwen/Qwen2-7B-Instruct | GRPO | 89 | `Qwen7b PPO Script`_ | +| Qwen/Qwen2-7B-Instruct | GRPO | 89 | `Qwen7b GRPO Script`_ | +----------------------------------+------------------------+------------+-----------------------------------------------------------------------------------------------+ + .. [1] During the evaluation, we have only extracted answers following the format "####". A more flexible answer extraction, longer response length and better prompt engineering may lead to a higher score. \ No newline at end of file
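Patch 6 below carries one subtle refactor alongside the CI change: Megatron APIs expect model.config to be the Megatron ModelParallelConfig, so the wrappers now store the parallel config as self.config and keep the HuggingFace LlamaConfig under self.model_config. The pattern in miniature (illustrative stand-in classes, not verl's actual ones):

    from dataclasses import dataclass

    @dataclass
    class MegatronParallelConfig:  # stand-in for Megatron's ModelParallelConfig
        tensor_model_parallel_size: int = 4
        sequence_parallel: bool = True

    @dataclass
    class HFModelConfig:  # stand-in for transformers' LlamaConfig
        hidden_size: int = 4096
        vocab_size: int = 102400

    class ParallelCausalLM:
        def __init__(self, model_config: HFModelConfig, megatron_config: MegatronParallelConfig):
            # Megatron helpers read `model.config`, so the parallel config lives there;
            # shape-like attributes are looked up on the HF config instead.
            self.config = megatron_config
            self.model_config = model_config

    m = ParallelCausalLM(HFModelConfig(), MegatronParallelConfig())
    print(m.config.sequence_parallel, m.model_config.hidden_size)  # True 4096

The rename is easy to get wrong: every former use of config inside such a class must be re-pointed at either the Megatron or the HF side, as the RmPad and RmPadPP constructors in the patch do.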
-d "$HOME/Megatron-LM-v0.4" ] && git clone -b core_v0.4.0 https://github.com/NVIDIA/Megatron-LM $HOME/Megatron-LM-v0.4 export PYTHONPATH=$PYTHONPATH:$HOME/Megatron-LM + bash tests/e2e/run_deepseek_megatron.sh + - name: Running gsm8k e2e training tests on 8 L20 GPUs with Megatron without patches + export PYTHONPATH=$HOME/Megatron-LM-v0.4:$PYTHONPATH bash tests/e2e/run_deepseek_megatron.sh \ No newline at end of file diff --git a/verl/models/llama/megatron/modeling_llama_megatron.py b/verl/models/llama/megatron/modeling_llama_megatron.py index c693f33c..88ac4234 100644 --- a/verl/models/llama/megatron/modeling_llama_megatron.py +++ b/verl/models/llama/megatron/modeling_llama_megatron.py @@ -84,7 +84,7 @@ def __init__(self, config: LlamaConfig, megatron_config: ModelParallelConfig): embedding_kwargs = tp_utils.get_default_kwargs_for_parallel_embedding() if megatron_config is not None: assert embedding_kwargs.get('config', False), 'must have ModelParallelConfig' - tp_utils.update_kwargs_with_config(embedding_kwargs, self.megatron_config) + tp_utils.update_kwargs_with_config(embedding_kwargs, self.config) self.embed_tokens = tensor_parallel.VocabParallelEmbedding(num_embeddings=config.vocab_size, embedding_dim=config.hidden_size, **embedding_kwargs) @@ -162,7 +162,7 @@ def __init__(self, config: LlamaConfig, megatron_config: ModelParallelConfig): column_kwargs = tp_utils.get_default_kwargs_for_column_parallel_linear() if megatron_config is not None: assert column_kwargs.get('config', False), 'must have ModelParallelConfig' - tp_utils.update_kwargs_with_config(column_kwargs, self.megatron_config) + tp_utils.update_kwargs_with_config(column_kwargs, self.config) self.lm_head = tensor_parallel.ColumnParallelLinear(input_size=config.hidden_size, output_size=config.vocab_size, @@ -225,10 +225,10 @@ def __init__(self, config: LlamaConfig, megatron_config: ModelParallelConfig): self.padding_idx = config.pad_token_id self.vocab_size = config.vocab_size embedding_kwargs = tp_utils.get_default_kwargs_for_parallel_embedding() - self.megatron_config = megatron_config + self.config = megatron_config if megatron_config is not None: assert embedding_kwargs.get('config', False), 'must have ModelParallelConfig' - tp_utils.update_kwargs_with_config(embedding_kwargs, self.megatron_config) + tp_utils.update_kwargs_with_config(embedding_kwargs, self.config) self.embed_tokens = tensor_parallel.VocabParallelEmbedding(num_embeddings=config.vocab_size, embedding_dim=config.hidden_size, **embedding_kwargs) @@ -257,7 +257,7 @@ def forward(self, # (1, total_nnz, hidden_size) -> (total_nnz, 1, hidden_size) -> (total_nnz // sp, 1, hidden_size) inputs_embeds = inputs_embeds.transpose(0, 1) - if self.megatron_config.sequence_parallel: + if self.config.sequence_parallel: inputs_embeds = tensor_parallel.scatter_to_sequence_parallel_region(inputs_embeds) hidden_states = inputs_embeds @@ -278,21 +278,22 @@ def forward(self, class ParallelLlamaForCausalLMRmPad(nn.Module): - def __init__(self, config: LlamaConfig, megatron_config: ModelParallelConfig): + def __init__(self, model_config: LlamaConfig, megatron_config: ModelParallelConfig): super().__init__() - self.config = config - self.megatron_config = megatron_config + # Note(haibin.lin): to be compatible with Megatron APIs, model.config refers to megatron configs + self.config = megatron_config + self.model_config = model_config self.model = ParallelLlamaModelRmPad(config, megatron_config=megatron_config) - self.vocab_size = config.vocab_size + self.vocab_size = 
model_config.vocab_size self._init_head() def _init_head(self): column_kwargs = tp_utils.get_default_kwargs_for_column_parallel_linear() - if self.megatron_config is not None: + if self.config is not None: assert column_kwargs.get('config', False), 'must have ModelParallelConfig' - tp_utils.update_kwargs_with_config(column_kwargs, self.megatron_config) - self.lm_head = tensor_parallel.ColumnParallelLinear(input_size=self.config.hidden_size, - output_size=self.config.vocab_size, + tp_utils.update_kwargs_with_config(column_kwargs, self.config) + self.lm_head = tensor_parallel.ColumnParallelLinear(input_size=self.model_config.hidden_size, + output_size=self.model_config.vocab_size, bias=False, gather_output=False, skip_bias_add=False, @@ -328,7 +329,7 @@ def forward( # pad input_ids to multiple of tp for all tp ranks # TODO: for better performance, the sp padding should be removed at each layer. Not sure the performance gap - if self.megatron_config.sequence_parallel: + if self.config.sequence_parallel: input_ids = sp_utils.pad_to_sequence_parallel(input_ids) input_ids = input_ids.transpose(0, 1) # (1, total_nnz+pad) @@ -345,7 +346,7 @@ def forward( logits = self._forward_head(hidden_states) # remove padding from sequence parallel - if self.megatron_config.sequence_parallel: + if self.config.sequence_parallel: totol_nnz = cu_seqlens[-1] logits = logits[:totol_nnz] # (total_nnz_padded) @@ -367,17 +368,17 @@ class ParallelLlamaForValueRmPad(ParallelLlamaForCausalLMRmPad): def _init_head(self): column_kwargs = tp_utils.get_default_kwargs_for_column_parallel_linear() - if self.megatron_config is not None: + if self.config is not None: assert column_kwargs.get('config', False), 'must have ModelParallelConfig' - tp_utils.update_kwargs_with_config(column_kwargs, self.megatron_config) - self.lm_head = nn.Linear(in_features=self.config.hidden_size, out_features=1, bias=False) + tp_utils.update_kwargs_with_config(column_kwargs, self.config) + self.lm_head = nn.Linear(in_features=self.model_config.hidden_size, out_features=1, bias=False) # lm_head is effectively the same as sequence parallel sp_utils.mark_parameter_as_sequence_parallel(self.lm_head.weight) def _forward_head(self, hidden_states): logits = self.lm_head(hidden_states) # (total_nnz_padded // tp, 1, 1) logits = logits.float() - if self.megatron_config.sequence_parallel: + if self.config.sequence_parallel: logits = tensor_parallel.gather_from_sequence_parallel_region(logits, tensor_parallel_output_grad=False) return logits @@ -413,11 +414,11 @@ def __init__(self, config: LlamaConfig, megatron_config: ModelParallelConfig, pr self.vocab_size = config.vocab_size self.pre_process = pre_process self.post_process = post_process - self.megatron_config = megatron_config + self.config = megatron_config embedding_kwargs = tp_utils.get_default_kwargs_for_parallel_embedding() if megatron_config is not None: assert embedding_kwargs.get('config', False), 'must have ModelParallelConfig' - tp_utils.update_kwargs_with_config(embedding_kwargs, self.megatron_config) + tp_utils.update_kwargs_with_config(embedding_kwargs, self.config) if pre_process: self.embed_tokens = tensor_parallel.VocabParallelEmbedding(num_embeddings=config.vocab_size, embedding_dim=config.hidden_size, @@ -487,7 +488,7 @@ def forward(self, # so need to deal with it by handle here: # (1, total_nnz, hidden_size) -> (total_nnz, 1, hidden_size) -> (total_nnz // sp, 1, hidden_size) inputs_embeds = inputs_embeds.transpose(0, 1) - if self.megatron_config.sequence_parallel: + if 
self.config.sequence_parallel: inputs_embeds = tensor_parallel.scatter_to_sequence_parallel_region(inputs_embeds) hidden_states = inputs_embeds @@ -513,16 +514,17 @@ def forward(self, class ParallelLlamaForCausalLMRmPadPP(nn.Module): - def __init__(self, config: LlamaConfig, megatron_config: ModelParallelConfig, pre_process, post_process): + def __init__(self, model_config: LlamaConfig, megatron_config: ModelParallelConfig, pre_process, post_process): super().__init__() - self.config = config - self.megatron_config = megatron_config - self.model = ParallelLlamaModelRmPadPP(config, + # Note(haibin.lin): to be compatible with Megatron APIs, model.config refers to megatron configs + self.config = megatron_config + self.model_config = model_config + self.model = ParallelLlamaModelRmPadPP(model_config, megatron_config=megatron_config, pre_process=pre_process, post_process=post_process) self.share_embeddings_and_output_weights = None # workaround, megatron requires this attr - self.vocab_size = config.vocab_size + self.vocab_size = model_config.vocab_size self.pre_process = pre_process self.post_process = post_process if post_process: @@ -541,11 +543,11 @@ def set_input_tensor(self, input_tensor): def _init_head(self): column_kwargs = tp_utils.get_default_kwargs_for_column_parallel_linear() - if self.megatron_config is not None: + if self.config is not None: assert column_kwargs.get('config', False), 'must have ModelParallelConfig' - tp_utils.update_kwargs_with_config(column_kwargs, self.megatron_config) - self.lm_head = tensor_parallel.ColumnParallelLinear(input_size=self.config.hidden_size, - output_size=self.config.vocab_size, + tp_utils.update_kwargs_with_config(column_kwargs, self.config) + self.lm_head = tensor_parallel.ColumnParallelLinear(input_size=self.model_config.hidden_size, + output_size=self.model_config.vocab_size, bias=False, gather_output=False, skip_bias_add=False, @@ -586,7 +588,7 @@ def forward( # pad input_ids to multiple of tp for all tp ranks # TODO: for better performance, the sp padding should be removed at each layer. Not sure the performance gap - if self.megatron_config.sequence_parallel: + if self.config.sequence_parallel: input_ids_rmpad = sp_utils.pad_to_sequence_parallel(input_ids_rmpad) input_ids_rmpad = input_ids_rmpad.transpose(0, 1) # (1, total_nnz+pad) @@ -605,7 +607,7 @@ def forward( logits = torch.squeeze(logits, dim=1) # remove the artificial batch dimension # torch.Size([8, 32, 16]) # remove padding from sequence parallel - if self.megatron_config.sequence_parallel: + if self.config.sequence_parallel: totol_nnz = cu_seqlens[-1] logits = logits[:totol_nnz] # (total_nnz_padded) # add removed padding back. 
If input is already rmpad, we let the caller pad_input @@ -627,17 +629,17 @@ class ParallelLlamaForValueRmPadPP(ParallelLlamaForCausalLMRmPadPP): def _init_head(self): column_kwargs = tp_utils.get_default_kwargs_for_column_parallel_linear() - if self.megatron_config is not None: + if self.config is not None: assert column_kwargs.get('config', False), 'must have ModelParallelConfig' - tp_utils.update_kwargs_with_config(column_kwargs, self.megatron_config) - self.lm_head = nn.Linear(in_features=self.config.hidden_size, out_features=1, bias=False) + tp_utils.update_kwargs_with_config(column_kwargs, self.config) + self.lm_head = nn.Linear(in_features=self.model_config.hidden_size, out_features=1, bias=False) # lm_head is effectively the same as sequence parallel sp_utils.mark_parameter_as_sequence_parallel(self.lm_head.weight) def _forward_head(self, hidden_states): logits = self.lm_head(hidden_states) # (total_nnz_padded // tp, 1, 1) logits = logits.float() - if self.megatron_config.sequence_parallel: + if self.config.sequence_parallel: logits = tensor_parallel.gather_from_sequence_parallel_region(logits, tensor_parallel_output_grad=False) return logits diff --git a/verl/single_controller/ray/megatron.py b/verl/single_controller/ray/megatron.py index 3ccb23a1..c24f47b9 100644 --- a/verl/single_controller/ray/megatron.py +++ b/verl/single_controller/ray/megatron.py @@ -37,6 +37,8 @@ def __init__(self, resource_pool: RayResourcePool, ray_cls_with_init: RayClassWi class MegatronRayWorkerGroup(RayWorkerGroup, MegatronWorkerGroup): """ + Note(haibin.lin): this class is not used in the open source version of verl. Kept for internal reference only. + MegatronWorkerGroup will query each worker of its megatron rank info and store it inside the WorkerGroup so that the dispatcher can use it to dispatch data. """ diff --git a/verl/utils/megatron/optimizer.py b/verl/utils/megatron/optimizer.py index 9ae70b08..3b4aa54a 100644 --- a/verl/utils/megatron/optimizer.py +++ b/verl/utils/megatron/optimizer.py @@ -90,3 +90,144 @@ def get_megatron_optimizer( # FP32. return FP32Optimizer(optimizer, config.clip_grad, config.log_num_zeros_in_grad, check_for_nan_in_loss_and_grad, params_have_main_grad, model) + + +def _init_distributed_optimizer(self, optimizer, clip_grad, log_num_zeros_in_grad, + check_for_nan_in_grad, params_have_main_grad, fp16, + bf16, params_dtype, grad_scaler, models, overlap_param_gather: bool): + """Megatron optimizer initialized WITHOUT the dependency of **get_args()** APIs. + + See top of class definition for argument descriptions. + + The steps in this method create the core mapping between DDP grad + buffers, parameters, and parameter shard ranges, that is needed for + converting between model param indexes and main parameter shard + indexes. This method also updates the optimizer parameter groups + with the newly created shards. + """ + import torch + from megatron import get_args + from megatron import get_timers + from megatron import print_rank_0 + from megatron.core import mpu, tensor_parallel + + from megatron.optimizer.optimizer import MixedPrecisionOptimizer, _zero_grad_group_helper + from megatron.optimizer.utils import shard_buffer + + super(DistributedOptimizer, self).__init__( + optimizer, clip_grad, log_num_zeros_in_grad, + check_for_nan_in_grad, params_have_main_grad, + fp16, bf16, params_dtype, grad_scaler, models) + + assert isinstance(optimizer, Adam), \ + "Only Adam currently supported, due to checkpointing requirements." + + # Model grad buffer ranges. 
+ self.model_gbuf_ranges = [] + self.per_bucket_numel = [] + for _, model_chunk in enumerate(self.models): + self.per_bucket_numel.append( + {dtype: [bucket.data.numel() for bucket in model_chunk.grad_buffers[dtype].buckets] + for dtype in model_chunk.grad_buffers}) + self.model_gbuf_ranges.append(self.build_model_gbuf_range_map(model_chunk)) + self.model_param_gbuf_map = \ + self.build_model_param_gbuf_map(self.model_gbuf_ranges) + + # Optimizer ranges. + self.model_param_group_index_map, self.opt_group_ranges = \ + self.build_optimizer_group_ranges(self.optimizer.param_groups, + self.model_gbuf_ranges) + + # Allocate main param shards. + ( + self.model_float16_groups, + self.model_fp32_groups, + self.shard_float16_groups, + self.shard_fp32_groups, + self.shard_fp32_from_float16_groups, + ) = self.build_model_and_main_param_groups(self.model_gbuf_ranges, + self.model_param_gbuf_map, + self.opt_group_ranges) + + # Initialize param buffers. + # - These are views on the DDP model's grad buffers, that share + # storage & have their own dtype. This is safe because the param + # dtype size is always <= grad dtype size. + self.param_buffers = [] + for model_index, model in enumerate(self.models): + current_param_buffers = {} + for dtype, grad_buffer in model.grad_buffers.items(): + size_ratio = torch.finfo(dtype).bits // torch.finfo(params_dtype).bits + current_param_buffers[dtype] = [] + for bucket in grad_buffer.buckets: + + # Handle older/newer method for getting untyped storage. + try: + storage = bucket.data.storage()._untyped() + except: + storage = bucket.data.storage().untyped() + + # Typed param buffer. + param_buffer = torch.tensor( + storage, + dtype = params_dtype, + device = bucket.data.device) + + # .storage() ignores views / slices, so param_buffer now points to the start + # of the grad_buffer instead of to the start of each bucket. As a result, + # add bucket.offset to make sure param_buffers point to the right region of + # memory. + # Since we want the start of each bucket's param_buffer to coincide with the + # start of the same bucket's grad_buffer (this ensures that zeroing the grad + # buffer does not zero out params in the param_buffer before they are copied + # into the model_params), multiply the offset by the size ratio of grads and + # params. + offset = bucket.offset * size_ratio + param_buffer = param_buffer[offset:offset+bucket.data.numel()] + assert param_buffer.data_ptr() == bucket.data.data_ptr(), \ + "param_buffer and grad_buffer for same bucket should start at the same byte address" + assert param_buffer.numel() == bucket.data.numel(), \ + "param_buffer and grad_buffer for same bucket should have the same number of elements" + current_param_buffers[dtype].append(param_buffer) + self.param_buffers.append(current_param_buffers) + + # Now construct data structures to manage all-gather handles. + self.all_gather_handles = [] + self.all_gather_handle_index_to_bucket_index_map = [] + self.model_index_to_all_gather_handle_index_map = {} + self.param_to_all_gather_handle_index_map = {} + self.param_buffer_copied = [] + + self.pbuf_view_items = self.get_model_param_buffer_dp_views() + for (model_index, dtype, bucket_index, _, _) in self.pbuf_view_items: + self.all_gather_handle_index_to_bucket_index_map.append((model_index, dtype, bucket_index)) + all_gather_handle_index = len(self.all_gather_handle_index_to_bucket_index_map) - 1 + + # Store all all_gather_handle_indices relevant to a particular model chunk. 
+ if model_index not in self.model_index_to_all_gather_handle_index_map: + self.model_index_to_all_gather_handle_index_map[model_index] = [] + self.model_index_to_all_gather_handle_index_map[model_index].append(all_gather_handle_index) + + for param in self.models[model_index].grad_buffers[dtype].buckets[bucket_index].params_list: + self.param_to_all_gather_handle_index_map[param] = all_gather_handle_index + self.param_buffer_copied.append(False) + self.num_all_gather_handles = len(self.all_gather_handle_index_to_bucket_index_map) + + self.overlap_param_gather = overlap_param_gather + if self.overlap_param_gather: + self.remove_pre_hook_handle = torch.nn.modules.module.register_module_forward_pre_hook( + self._make_forward_pre_hook()) + else: + self.remove_pre_hook_handle = None + + self.update_successful = False + + # Update optimizer groups. + # - Also, leverage state_dict() and load_state_dict() to + # recast preexisting per-param state tensors. + self.optimizer.param_groups = \ + [ g["orig_group"] for g in self.opt_group_ranges ] + self.optimizer.load_state_dict(self.optimizer.state_dict()) + + +DistributedOptimizer.__init__ = _init_distributed_optimizer \ No newline at end of file diff --git a/verl/utils/megatron/pipeline_parallel.py b/verl/utils/megatron/pipeline_parallel.py index 3a3790bb..b9bf6155 100644 --- a/verl/utils/megatron/pipeline_parallel.py +++ b/verl/utils/megatron/pipeline_parallel.py @@ -49,3 +49,17 @@ def make_batch_generator(batches, vpp_size): # no vpp batch_generator = iter(batches) return batch_generator + +def require_extra_schedule_kwargs(): + """Used to work around megatron get_args() issues. To be dropped after mcore v0.7""" + from megatron.core.pipeline_parallel.schedules import forward_backward_no_pipelining + import inspect + num_args = len(inspect.signature(forward_backward_no_pipelining).parameters) + # mcore v0.4 + if num_args == 9: + return False + elif num_args == 11: + # mcore v0.4 patched version + return True + else: + raise NotImplementedError("Unknown megatron version") \ No newline at end of file diff --git a/verl/workers/actor/megatron_actor.py b/verl/workers/actor/megatron_actor.py index 9ada0230..e1d96898 100644 --- a/verl/workers/actor/megatron_actor.py +++ b/verl/workers/actor/megatron_actor.py @@ -302,20 +302,24 @@ def forward_step(batch_iter, model): # batch should be a list of batches inside micro-batches batch_generator = make_batch_generator(batches, vpp_size=len(self.actor_module)) + from verl.utils.megatron.pipeline_parallel import require_extra_schedule_kwargs + schedule_kwargs = {} + if require_extra_schedule_kwargs(): + schedule_kwargs = {'hidden_size': self.model_config.hidden_size} # TODO: we may use the new schedule instead # for flash-attn: (seq_len, batch_size, hidden_size) = (mbs*seq_len, 1, hidden_size) if mpu.get_pipeline_model_parallel_world_size() > 1: + schedule_kwargs['input_shapes'] = input_shapes losses_reduced = forward_backward_func( forward_step_func=forward_step, data_iterator=batch_generator, model=self.actor_module, num_microbatches=n_micro_batch, - input_shapes=input_shapes, # must set for flash-attn sequence packing seq_length=batch_size * seq_len, # no use when input_shapes was set - hidden_size=self.model_config.hidden_size, # no use when input_shapes was set micro_batch_size=1, # no use when input_shapes was set forward_only=forward_only, + **schedule_kwargs, ) else: losses_reduced = forward_backward_func( @@ -324,9 +328,9 @@ def forward_step(batch_iter, model): model=self.actor_module, 
num_microbatches=n_micro_batch, seq_length=batch_size * seq_len, # in use for pp = 1 - hidden_size=self.model_config.hidden_size, # in use for pp = 1 micro_batch_size=1, # in use for pp = 1 forward_only=forward_only, + **schedule_kwargs, ) # loss_reduces contains the stats returned from loss_func return losses_reduced @@ -371,3 +375,15 @@ def update_policy(self, dataloader: Iterable[DataProto]) -> Dict: torch.cuda.empty_cache() return metrics + + def _schedule_require_args(self): + """Used to work around megatron get_args() issues. To be dropped after mcore v0.7""" + from megatron.core.pipeline_parallel.schedules import forward_backward_no_pipelining + import inspect + num_args = len(inspect.signature(forward_backward_no_pipelining).parameters) + if num_args == 9: + return False + elif num_args == 11: + return True + else: + raise NotImplementedError("Unknown megatron version") diff --git a/verl/workers/critic/megatron_critic.py b/verl/workers/critic/megatron_critic.py index f0b044ad..33fab924 100644 --- a/verl/workers/critic/megatron_critic.py +++ b/verl/workers/critic/megatron_critic.py @@ -176,10 +176,15 @@ def forward_step(batch_iter, model): # batch should be a list of batches inside micro-batches batch_generator = make_batch_generator(batches, vpp_size=len(self.critic_module)) + from verl.utils.megatron.pipeline_parallel import require_extra_schedule_kwargs + schedule_kwargs = {} + if require_extra_schedule_kwargs(): + schedule_kwargs = {'hidden_size': self.model_config.hidden_size} # TODO: we may use the new schedule instead # for flash-attn: (seq_len, batch_size, hidden_size) = (mbs*seq_len, 1, hidden_size) if mpu.get_pipeline_model_parallel_world_size() > 1: + schedule_kwargs['input_shapes'] = input_shapes losses_reduced = forward_backward_func( forward_step_func=forward_step, data_iterator=batch_generator, @@ -187,9 +192,9 @@ def forward_step(batch_iter, model): num_microbatches=n_micro_batch, input_shapes=input_shapes, # must set for flash-attn sequence packing seq_length=self.config.ppo_micro_batch_size_per_gpu * seq_len, # no use when input_shapes was set - hidden_size=self.model_config.hidden_size, # no use when input_shapes was set micro_batch_size=1, # no use when input_shapes was set forward_only=forward_only, + **schedule_kwargs, ) else: losses_reduced = forward_backward_func( @@ -198,9 +203,9 @@ def forward_step(batch_iter, model): model=self.critic_module, num_microbatches=n_micro_batch, seq_length=self.config.ppo_micro_batch_size_per_gpu * seq_len, # in use for pp = 1 - hidden_size=self.model_config.hidden_size, # in use for pp = 1 micro_batch_size=1, # in use for pp = 1 forward_only=forward_only, + **schedule_kwargs, ) # loss_reduces contains the stats returned from loss_func return losses_reduced @@ -231,3 +236,15 @@ def update_critic(self, dataloader: Iterable[DataProto]): # add empty cache after each compute torch.cuda.empty_cache() return metrics + + def _schedule_require_args(self): + """Used to work around megatron get_args() issues. 
To be dropped after mcore v0.7""" + from megatron.core.pipeline_parallel.schedules import forward_backward_no_pipelining + import inspect + num_args = len(inspect.signature(forward_backward_no_pipelining).parameters) + if num_args == 9: + return False + elif num_args == 11: + return True + else: + raise NotImplementedError("Unknown megatron version") diff --git a/verl/workers/reward_model/megatron/reward_model.py b/verl/workers/reward_model/megatron/reward_model.py index 1b58f42c..6c09c7c9 100644 --- a/verl/workers/reward_model/megatron/reward_model.py +++ b/verl/workers/reward_model/megatron/reward_model.py @@ -229,20 +229,23 @@ def forward_step(batch_iter, model): # batch should be a list of batches inside micro-batches batch_generator = make_batch_generator(batches, vpp_size=len(self.reward_model_module)) - + from verl.utils.megatron.pipeline_parallel import require_extra_schedule_kwargs + schedule_kwargs = {} + if require_extra_schedule_kwargs(): + schedule_kwargs = {'hidden_size': self.model_config.hidden_size} # TODO: we may use the new schedule instead # for flash-attn: (seq_len, batch_size, hidden_size) = (mbs*seq_len, 1, hidden_size) if mpu.get_pipeline_model_parallel_world_size() > 1: + schedule_kwargs['input_shapes'] = input_shapes # must set for flash-attn sequence packing losses_reduced = forward_backward_func( forward_step_func=forward_step, data_iterator=batch_generator, model=self.reward_model_module, num_microbatches=n_micro_batch, - input_shapes=input_shapes, # must set for flash-attn sequence packing seq_length=infer_batch_size * seq_len, # no use when input_shapes was set - hidden_size=self.model_config.hidden_size, # no use when input_shapes was set micro_batch_size=1, # no use when input_shapes was set forward_only=True, + **schedule_kwargs, # hidden size is of no use when input_shapes was set ) else: losses_reduced = forward_backward_func( @@ -251,9 +254,9 @@ def forward_step(batch_iter, model): model=self.reward_model_module, num_microbatches=n_micro_batch, seq_length=infer_batch_size * seq_len, # in use for pp = 1 - hidden_size=self.model_config.hidden_size, # in use for pp = 1 micro_batch_size=1, # in use for pp = 1 forward_only=True, + **schedule_kwargs, # hidden_size in use for pp = 1 ) # loss_reduces contains the stats returned from loss_func @@ -272,4 +275,4 @@ def load_params_to_cuda(self): for reward_model_module in self.reward_model_module: for name, param in reward_model_module.named_parameters(): param.data = param.data.to(torch.cuda.current_device(), non_blocking=True) - self.device = 'cuda' + self.device = 'cuda' \ No newline at end of file From 3f89cb8afd92834041f9a8316db52eebe7c23732 Mon Sep 17 00:00:00 2001 From: Your Name Date: Fri, 7 Feb 2025 07:09:23 +0000 Subject: [PATCH 7/8] fix lint --- verl/utils/megatron/optimizer.py | 246 +++++++++--------- verl/utils/megatron/pipeline_parallel.py | 3 +- .../reward_model/megatron/reward_model.py | 8 +- 3 files changed, 126 insertions(+), 131 deletions(-) diff --git a/verl/utils/megatron/optimizer.py b/verl/utils/megatron/optimizer.py index 3b4aa54a..07aede38 100644 --- a/verl/utils/megatron/optimizer.py +++ b/verl/utils/megatron/optimizer.py @@ -92,10 +92,10 @@ def get_megatron_optimizer( params_have_main_grad, model) -def _init_distributed_optimizer(self, optimizer, clip_grad, log_num_zeros_in_grad, - check_for_nan_in_grad, params_have_main_grad, fp16, - bf16, params_dtype, grad_scaler, models, overlap_param_gather: bool): - """Megatron optimizer initialized WITHOUT the dependency of **get_args()** APIs. 
+def _init_distributed_optimizer(self, optimizer, clip_grad, log_num_zeros_in_grad, check_for_nan_in_grad, + params_have_main_grad, fp16, bf16, params_dtype, grad_scaler, models, + overlap_param_gather: bool): + """Megatron optimizer initialized WITHOUT the dependency of **get_args()** APIs. See top of class definition for argument descriptions. @@ -105,129 +105,123 @@ def _init_distributed_optimizer(self, optimizer, clip_grad, log_num_zeros_in_gra indexes. This method also updates the optimizer parameter groups with the newly created shards. """ - import torch - from megatron import get_args - from megatron import get_timers - from megatron import print_rank_0 - from megatron.core import mpu, tensor_parallel - - from megatron.optimizer.optimizer import MixedPrecisionOptimizer, _zero_grad_group_helper - from megatron.optimizer.utils import shard_buffer - - super(DistributedOptimizer, self).__init__( - optimizer, clip_grad, log_num_zeros_in_grad, - check_for_nan_in_grad, params_have_main_grad, - fp16, bf16, params_dtype, grad_scaler, models) - - assert isinstance(optimizer, Adam), \ - "Only Adam currently supported, due to checkpointing requirements." - - # Model grad buffer ranges. - self.model_gbuf_ranges = [] - self.per_bucket_numel = [] - for _, model_chunk in enumerate(self.models): - self.per_bucket_numel.append( - {dtype: [bucket.data.numel() for bucket in model_chunk.grad_buffers[dtype].buckets] - for dtype in model_chunk.grad_buffers}) - self.model_gbuf_ranges.append(self.build_model_gbuf_range_map(model_chunk)) - self.model_param_gbuf_map = \ - self.build_model_param_gbuf_map(self.model_gbuf_ranges) - - # Optimizer ranges. - self.model_param_group_index_map, self.opt_group_ranges = \ - self.build_optimizer_group_ranges(self.optimizer.param_groups, - self.model_gbuf_ranges) - - # Allocate main param shards. - ( - self.model_float16_groups, - self.model_fp32_groups, - self.shard_float16_groups, - self.shard_fp32_groups, - self.shard_fp32_from_float16_groups, - ) = self.build_model_and_main_param_groups(self.model_gbuf_ranges, - self.model_param_gbuf_map, - self.opt_group_ranges) - - # Initialize param buffers. - # - These are views on the DDP model's grad buffers, that share - # storage & have their own dtype. This is safe because the param - # dtype size is always <= grad dtype size. - self.param_buffers = [] - for model_index, model in enumerate(self.models): - current_param_buffers = {} - for dtype, grad_buffer in model.grad_buffers.items(): - size_ratio = torch.finfo(dtype).bits // torch.finfo(params_dtype).bits - current_param_buffers[dtype] = [] - for bucket in grad_buffer.buckets: - - # Handle older/newer method for getting untyped storage. - try: - storage = bucket.data.storage()._untyped() - except: - storage = bucket.data.storage().untyped() - - # Typed param buffer. - param_buffer = torch.tensor( - storage, - dtype = params_dtype, - device = bucket.data.device) - - # .storage() ignores views / slices, so param_buffer now points to the start - # of the grad_buffer instead of to the start of each bucket. As a result, - # add bucket.offset to make sure param_buffers point to the right region of - # memory. - # Since we want the start of each bucket's param_buffer to coincide with the - # start of the same bucket's grad_buffer (this ensures that zeroing the grad - # buffer does not zero out params in the param_buffer before they are copied - # into the model_params), multiply the offset by the size ratio of grads and - # params. 
- offset = bucket.offset * size_ratio - param_buffer = param_buffer[offset:offset+bucket.data.numel()] - assert param_buffer.data_ptr() == bucket.data.data_ptr(), \ - "param_buffer and grad_buffer for same bucket should start at the same byte address" - assert param_buffer.numel() == bucket.data.numel(), \ - "param_buffer and grad_buffer for same bucket should have the same number of elements" - current_param_buffers[dtype].append(param_buffer) - self.param_buffers.append(current_param_buffers) - - # Now construct data structures to manage all-gather handles. - self.all_gather_handles = [] - self.all_gather_handle_index_to_bucket_index_map = [] - self.model_index_to_all_gather_handle_index_map = {} - self.param_to_all_gather_handle_index_map = {} - self.param_buffer_copied = [] - - self.pbuf_view_items = self.get_model_param_buffer_dp_views() - for (model_index, dtype, bucket_index, _, _) in self.pbuf_view_items: - self.all_gather_handle_index_to_bucket_index_map.append((model_index, dtype, bucket_index)) - all_gather_handle_index = len(self.all_gather_handle_index_to_bucket_index_map) - 1 - - # Store all all_gather_handle_indices relevant to a particular model chunk. - if model_index not in self.model_index_to_all_gather_handle_index_map: - self.model_index_to_all_gather_handle_index_map[model_index] = [] - self.model_index_to_all_gather_handle_index_map[model_index].append(all_gather_handle_index) - - for param in self.models[model_index].grad_buffers[dtype].buckets[bucket_index].params_list: - self.param_to_all_gather_handle_index_map[param] = all_gather_handle_index - self.param_buffer_copied.append(False) - self.num_all_gather_handles = len(self.all_gather_handle_index_to_bucket_index_map) - - self.overlap_param_gather = overlap_param_gather - if self.overlap_param_gather: - self.remove_pre_hook_handle = torch.nn.modules.module.register_module_forward_pre_hook( - self._make_forward_pre_hook()) - else: - self.remove_pre_hook_handle = None + import torch + from megatron import get_args + from megatron import get_timers + from megatron import print_rank_0 + from megatron.core import mpu, tensor_parallel + + from megatron.optimizer.optimizer import MixedPrecisionOptimizer, _zero_grad_group_helper + from megatron.optimizer.utils import shard_buffer + + super(DistributedOptimizer, self).__init__(optimizer, clip_grad, log_num_zeros_in_grad, check_for_nan_in_grad, + params_have_main_grad, fp16, bf16, params_dtype, grad_scaler, models) + + assert isinstance(optimizer, Adam), \ + "Only Adam currently supported, due to checkpointing requirements." + + # Model grad buffer ranges. + self.model_gbuf_ranges = [] + self.per_bucket_numel = [] + for _, model_chunk in enumerate(self.models): + self.per_bucket_numel.append({ + dtype: [bucket.data.numel() for bucket in model_chunk.grad_buffers[dtype].buckets] + for dtype in model_chunk.grad_buffers + }) + self.model_gbuf_ranges.append(self.build_model_gbuf_range_map(model_chunk)) + self.model_param_gbuf_map = \ + self.build_model_param_gbuf_map(self.model_gbuf_ranges) + + # Optimizer ranges. + self.model_param_group_index_map, self.opt_group_ranges = \ + self.build_optimizer_group_ranges(self.optimizer.param_groups, + self.model_gbuf_ranges) + + # Allocate main param shards. 
+ ( + self.model_float16_groups, + self.model_fp32_groups, + self.shard_float16_groups, + self.shard_fp32_groups, + self.shard_fp32_from_float16_groups, + ) = self.build_model_and_main_param_groups(self.model_gbuf_ranges, self.model_param_gbuf_map, self.opt_group_ranges) + + # Initialize param buffers. + # - These are views on the DDP model's grad buffers, that share + # storage & have their own dtype. This is safe because the param + # dtype size is always <= grad dtype size. + self.param_buffers = [] + for model_index, model in enumerate(self.models): + current_param_buffers = {} + for dtype, grad_buffer in model.grad_buffers.items(): + size_ratio = torch.finfo(dtype).bits // torch.finfo(params_dtype).bits + current_param_buffers[dtype] = [] + for bucket in grad_buffer.buckets: + + # Handle older/newer method for getting untyped storage. + try: + storage = bucket.data.storage()._untyped() + except: + storage = bucket.data.storage().untyped() + + # Typed param buffer. + param_buffer = torch.tensor(storage, dtype=params_dtype, device=bucket.data.device) + + # .storage() ignores views / slices, so param_buffer now points to the start + # of the grad_buffer instead of to the start of each bucket. As a result, + # add bucket.offset to make sure param_buffers point to the right region of + # memory. + # Since we want the start of each bucket's param_buffer to coincide with the + # start of the same bucket's grad_buffer (this ensures that zeroing the grad + # buffer does not zero out params in the param_buffer before they are copied + # into the model_params), multiply the offset by the size ratio of grads and + # params. + offset = bucket.offset * size_ratio + param_buffer = param_buffer[offset:offset + bucket.data.numel()] + assert param_buffer.data_ptr() == bucket.data.data_ptr(), \ + "param_buffer and grad_buffer for same bucket should start at the same byte address" + assert param_buffer.numel() == bucket.data.numel(), \ + "param_buffer and grad_buffer for same bucket should have the same number of elements" + current_param_buffers[dtype].append(param_buffer) + self.param_buffers.append(current_param_buffers) + + # Now construct data structures to manage all-gather handles. + self.all_gather_handles = [] + self.all_gather_handle_index_to_bucket_index_map = [] + self.model_index_to_all_gather_handle_index_map = {} + self.param_to_all_gather_handle_index_map = {} + self.param_buffer_copied = [] + + self.pbuf_view_items = self.get_model_param_buffer_dp_views() + for (model_index, dtype, bucket_index, _, _) in self.pbuf_view_items: + self.all_gather_handle_index_to_bucket_index_map.append((model_index, dtype, bucket_index)) + all_gather_handle_index = len(self.all_gather_handle_index_to_bucket_index_map) - 1 + + # Store all all_gather_handle_indices relevant to a particular model chunk. 
+ if model_index not in self.model_index_to_all_gather_handle_index_map: + self.model_index_to_all_gather_handle_index_map[model_index] = [] + self.model_index_to_all_gather_handle_index_map[model_index].append(all_gather_handle_index) + + for param in self.models[model_index].grad_buffers[dtype].buckets[bucket_index].params_list: + self.param_to_all_gather_handle_index_map[param] = all_gather_handle_index + self.param_buffer_copied.append(False) + self.num_all_gather_handles = len(self.all_gather_handle_index_to_bucket_index_map) + + self.overlap_param_gather = overlap_param_gather + if self.overlap_param_gather: + self.remove_pre_hook_handle = torch.nn.modules.module.register_module_forward_pre_hook( + self._make_forward_pre_hook()) + else: + self.remove_pre_hook_handle = None - self.update_successful = False + self.update_successful = False - # Update optimizer groups. - # - Also, leverage state_dict() and load_state_dict() to - # recast preexisting per-param state tensors. - self.optimizer.param_groups = \ - [ g["orig_group"] for g in self.opt_group_ranges ] - self.optimizer.load_state_dict(self.optimizer.state_dict()) + # Update optimizer groups. + # - Also, leverage state_dict() and load_state_dict() to + # recast preexisting per-param state tensors. + self.optimizer.param_groups = \ + [ g["orig_group"] for g in self.opt_group_ranges ] + self.optimizer.load_state_dict(self.optimizer.state_dict()) -DistributedOptimizer.__init__ = _init_distributed_optimizer \ No newline at end of file +DistributedOptimizer.__init__ = _init_distributed_optimizer diff --git a/verl/utils/megatron/pipeline_parallel.py b/verl/utils/megatron/pipeline_parallel.py index b9bf6155..27d197e2 100644 --- a/verl/utils/megatron/pipeline_parallel.py +++ b/verl/utils/megatron/pipeline_parallel.py @@ -50,6 +50,7 @@ def make_batch_generator(batches, vpp_size): batch_generator = iter(batches) return batch_generator + def require_extra_schedule_kwargs(): """Used to work around megatron get_args() issues. 
To be dropped after mcore v0.7""" from megatron.core.pipeline_parallel.schedules import forward_backward_no_pipelining @@ -62,4 +63,4 @@ def require_extra_schedule_kwargs(): # mcore v0.4 patched version return True else: - raise NotImplementedError("Unknown megatron version") \ No newline at end of file + raise NotImplementedError("Unknown megatron version") diff --git a/verl/workers/reward_model/megatron/reward_model.py b/verl/workers/reward_model/megatron/reward_model.py index 6c09c7c9..0cfc1d49 100644 --- a/verl/workers/reward_model/megatron/reward_model.py +++ b/verl/workers/reward_model/megatron/reward_model.py @@ -236,7 +236,7 @@ def forward_step(batch_iter, model): # TODO: we may use the new schedule instead # for flash-attn: (seq_len, batch_size, hidden_size) = (mbs*seq_len, 1, hidden_size) if mpu.get_pipeline_model_parallel_world_size() > 1: - schedule_kwargs['input_shapes'] = input_shapes # must set for flash-attn sequence packing + schedule_kwargs['input_shapes'] = input_shapes # must set for flash-attn sequence packing losses_reduced = forward_backward_func( forward_step_func=forward_step, data_iterator=batch_generator, @@ -245,7 +245,7 @@ def forward_step(batch_iter, model): seq_length=infer_batch_size * seq_len, # no use when input_shapes was set micro_batch_size=1, # no use when input_shapes was set forward_only=True, - **schedule_kwargs, # hidden size is of no use when input_shapes was set + **schedule_kwargs, # hidden size is of no use when input_shapes was set ) else: losses_reduced = forward_backward_func( @@ -256,7 +256,7 @@ def forward_step(batch_iter, model): seq_length=infer_batch_size * seq_len, # in use for pp = 1 micro_batch_size=1, # in use for pp = 1 forward_only=True, - **schedule_kwargs, # hidden_size in use for pp = 1 + **schedule_kwargs, # hidden_size in use for pp = 1 ) # loss_reduces contains the stats returned from loss_func @@ -275,4 +275,4 @@ def load_params_to_cuda(self): for reward_model_module in self.reward_model_module: for name, param in reward_model_module.named_parameters(): param.data = param.data.to(torch.cuda.current_device(), non_blocking=True) - self.device = 'cuda' \ No newline at end of file + self.device = 'cuda' From e9d24475bd65db3d5c11f7988803273e71110d3c Mon Sep 17 00:00:00 2001 From: Your Name Date: Fri, 7 Feb 2025 18:41:40 +0000 Subject: [PATCH 8/8] remove unused function --- verl/workers/actor/megatron_actor.py | 14 +------------- verl/workers/critic/megatron_critic.py | 14 +------------- 2 files changed, 2 insertions(+), 26 deletions(-) diff --git a/verl/workers/actor/megatron_actor.py b/verl/workers/actor/megatron_actor.py index e1d96898..d27b95f9 100644 --- a/verl/workers/actor/megatron_actor.py +++ b/verl/workers/actor/megatron_actor.py @@ -374,16 +374,4 @@ def update_policy(self, dataloader: Iterable[DataProto]) -> Dict: # add empty cache after each compute torch.cuda.empty_cache() - return metrics - - def _schedule_require_args(self): - """Used to work around megatron get_args() issues. 
To be dropped after mcore v0.7""" - from megatron.core.pipeline_parallel.schedules import forward_backward_no_pipelining - import inspect - num_args = len(inspect.signature(forward_backward_no_pipelining).parameters) - if num_args == 9: - return False - elif num_args == 11: - return True - else: - raise NotImplementedError("Unknown megatron version") + return metrics \ No newline at end of file diff --git a/verl/workers/critic/megatron_critic.py b/verl/workers/critic/megatron_critic.py index 33fab924..e2134c76 100644 --- a/verl/workers/critic/megatron_critic.py +++ b/verl/workers/critic/megatron_critic.py @@ -235,16 +235,4 @@ def update_critic(self, dataloader: Iterable[DataProto]): # add empty cache after each compute torch.cuda.empty_cache() - return metrics - - def _schedule_require_args(self): - """Used to work around megatron get_args() issues. To be dropped after mcore v0.7""" - from megatron.core.pipeline_parallel.schedules import forward_backward_no_pipelining - import inspect - num_args = len(inspect.signature(forward_backward_no_pipelining).parameters) - if num_args == 9: - return False - elif num_args == 11: - return True - else: - raise NotImplementedError("Unknown megatron version") + return metrics \ No newline at end of file
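With the duplicated _schedule_require_args helpers removed above, the only remaining version probe is require_extra_schedule_kwargs in verl/utils/megatron/pipeline_parallel.py. The trick it relies on is generic and worth spelling out: when two builds of a library disagree on a function's signature, feature-detect by counting parameters instead of parsing version strings. A self-contained sketch of the pattern (the toy schedule stands in for Megatron's forward_backward_no_pipelining; its exact parameter list here is an assumption for illustration):

    import inspect

    def forward_backward_toy(forward_step_func, data_iterator, model,
                             num_microbatches, seq_length, micro_batch_size,
                             decoder_seq_length=None, forward_only=False,
                             collect_non_loss_data=False):
        """Stand-in with 9 parameters, mimicking the stock mcore v0.4 schedule."""
        return forward_only

    def requires_extra_kwargs(fn) -> bool:
        """Arity-based feature detection: 9 params = stock mcore v0.4, 11 = patched fork."""
        num_args = len(inspect.signature(fn).parameters)
        if num_args == 9:
            return False
        if num_args == 11:
            return True
        raise NotImplementedError("Unknown megatron version")

    schedule_kwargs = {"hidden_size": 4096} if requires_extra_kwargs(forward_backward_toy) else {}
    print(schedule_kwargs)  # {} for the 9-parameter variant

Detecting by arity keeps working when a fork forgets to bump its version string, and it fails loudly with NotImplementedError when a third signature appears, which is safer than silently passing the wrong keyword arguments.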