```bash
# Set the frequency scaling governor to performance
~$ sudo cpupower frequency-set -g performance

# Disable Turbo Boost
~$ echo "1" | sudo tee /sys/devices/system/cpu/intel_pstate/no_turbo
1
```
```python
import torch
import deepspeed
from transformers import AutoTokenizer, AutoModelForCausalLM

model_name = "bigscience/bloomz-3b"
payload = "Explain in a sentence in English what is backpropagation in neural networks."

tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModelForCausalLM.from_pretrained(model_name, torch_dtype=torch.float16)

ds_model = deepspeed.init_inference(
    model=model,
    mp_size=2,
    dtype=torch.float16,
    replace_method="auto",
    replace_with_kernel_inject=True,
)
```
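To actually run the prompt, a minimal sketch (not taken from the original post) is shown below. It assumes the script is launched with the DeepSpeed launcher so each rank owns a GPU, and that the wrapped Hugging Face module still exposes its usual `generate()` API, which `init_inference` leaves intact on `ds_model.module`:

```python
# Sketch only: tokenize the payload and generate with the DeepSpeed-wrapped model.
# Assumes the deepspeed launcher has set the current CUDA device for this rank.
inputs = tokenizer(payload, return_tensors="pt").to("cuda")
with torch.no_grad():
    outputs = ds_model.module.generate(**inputs, max_new_tokens=64)
print(tokenizer.decode(outputs[0], skip_special_tokens=True))
```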
```python
# We only support three modes: 1) user specified policy for tensor-parallelism,
# 2) kernel injection (replace_with_kernel_inject), and
# 3) automatic tensor parallelism if tp_size > 1.
if self.injection_dict:
    # 1. User specified Tensor Parallelism
    assert not config.replace_with_kernel_inject, "Cannot use both user specified injection policy and kernel injection"
    for client_module, injection_policy in self.injection_dict.items():
        assert issubclass(client_module,
                          torch.nn.Module), f"{client_module} is not a subclass of torch.nn.Module"

        # construct the tuple and pass that instead of a string or dict.
        if isinstance(injection_policy, str):
            config.injection_policy_tuple = (injection_policy, )
        else:
            config.injection_policy_tuple = injection_policy

        layer_names = [name for name, _ in self.module.named_modules()]
        for policy in config.injection_policy_tuple:
            if not any(name.endswith(policy) for name in layer_names):
                raise ValueError(f"Injection policy layer '{policy}' not valid.")

        self._apply_injection_policy(config, client_module)
else:
    if config.replace_with_kernel_inject:
        # 2. DeepSpeed Kernel Injection
        self._apply_injection_policy(config)
    elif config.tensor_parallel.tp_size > 1:
        # 3. Automatic Tensor Parallelism
        parser_dict = AutoTP.tp_parser(model)
        print("AutoTP: ", parser_dict)
        for client_module, injection_policy in parser_dict:
            if isinstance(injection_policy, str):
                config.injection_policy_tuple = (injection_policy, )
            else:
                config.injection_policy_tuple = injection_policy
            self._apply_injection_policy(config, client_module)
```
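The first mode is driven entirely from user code through the `injection_policy` argument of `deepspeed.init_inference`, which becomes `self.injection_dict` above. The sketch below is hedged: it assumes a T5-style model, and the tuple lists the linear submodules whose outputs must be all-reduced; each name is matched against `named_modules()` suffixes by the validation loop in the snippet above.

```python
# Sketch of mode 1 (user-specified tensor-parallel policy); the model choice and
# submodule names are illustrative and depend on the architecture.
import torch
import deepspeed
from transformers import AutoModelForSeq2SeqLM
from transformers.models.t5.modeling_t5 import T5Block

model = AutoModelForSeq2SeqLM.from_pretrained("t5-large", torch_dtype=torch.float16)
engine = deepspeed.init_inference(
    model,
    mp_size=2,
    dtype=torch.float16,
    injection_policy={T5Block: ("SelfAttention.o", "EncDecAttention.o", "DenseReluDense.wo")},
)
```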
```python
def _apply_injection_policy(self, config, client_module=None):
    # client_module is only passed when using the injection_dict method.
    checkpoint_dir = config.checkpoint
    checkpoint = SDLoaderFactory.get_sd_loader_json(checkpoint_dir,
                                                    self.checkpoint_engine) if checkpoint_dir is not None else None

    generic_injection(self.module, dtype=config.dtype, enable_cuda_graph=config.enable_cuda_graph)

    if isinstance(self.module, torch.nn.Module):
        # config is our DeepSpeedInferenceConfig and self.config is the HF model config
        replace_transformer_layer(client_module, self.module, checkpoint, config, self.config)
```
```python
# defining globals as internally defined functions inherit these everywhere
quantize = (config.dtype == torch.int8)
# todo: Refactor later. In future, let's minimize the style used above and use config.** instead
```
```python
linear_layer_setting = None
'''
    linear_layer_setting (tuple of modules) [Optional]: shows which two classes are used for linear layers and embedding layers
'''
micro_batch_size = -1
seed = -1
local_rank = -1
```
```python
def replace_module(model, orig_class, replace_fn, _replace_policy, checkpoint=None):
    """ Scan the model for instances of ``orig_class`` to replace using ``replace_fn``.
    Arguments:
        model (torch.nn.Module): the model to augment
        orig_class (torch.nn.Module): the module to search for
        replace_fn (method): a method to convert instances of ``orig_class`` to the
                             desired type and return a new instance.
    Returns:
        A modified ``model``.
    """
    sd = None
    if checkpoint is not None:
        sd = torch.load(checkpoint, map_location='cpu')

    policy = {}
    if orig_class is not None:
        policy.update({orig_class: (replace_fn, _replace_policy)})
    else:
        for plcy in replace_policies:
            # instantiate a throw-away policy in order to populate the _orig_layer_class
            _ = plcy(None)
            if isinstance(plcy._orig_layer_class, list):
                for orig_layer_class in plcy._orig_layer_class:
                    policy.update({orig_layer_class: (replace_fn, plcy)})
            elif plcy._orig_layer_class is not None:
                policy.update({plcy._orig_layer_class: (replace_fn, plcy)})
    assert len(policy.items()) > 0,\
        "No default policy found! Please specify your policy injection_policy (like {BertLayer: HFBertLayerPolicy})." +\
        "You can find some samples here: https://github.com/microsoft/DeepSpeed/blob/master/deepspeed/module_inject/replace_policy.py"
```
```python
from .containers import HFGPT2LayerPolicy
from .containers import HFBertLayerPolicy
from .containers import BLOOMLayerPolicy
from .containers import HFGPTJLayerPolicy
from .containers import HFGPTNEOLayerPolicy
from .containers import GPTNEOXLayerPolicy
from .containers import HFOPTLayerPolicy
from .containers import MegatronLayerPolicy
from .containers import HFDistilBertLayerPolicy
from .containers import HFCLIPLayerPolicy
from .containers import LLAMALayerPolicy
from .containers import UNetPolicy
from .containers import VAEPolicy
from .containers import LLAMA2LayerPolicy
from .containers import InternLMLayerPolicy
```
```python
def replace_fn(child, _policy, layer_id=0, prefix="", state_dict=None):
    training = False  # todo: refactor this part to go in the config
    if training:
        # copy relevant state from child -> new module
        new_module = replace_with_policy(child, _policy, config.triangular_masking)
    else:
        # copy relevant state from child -> new module
        if config.replace_with_kernel_inject:
            new_module = replace_with_policy(child,
                                             _policy,
                                             config.triangular_masking,
                                             inference=True,
                                             layer_id=layer_id)
        else:
            new_module = replace_wo_policy(child, _policy, prefix=prefix, state_dict=state_dict)

    return new_module
```
```python
def _replace_module(model, policies, prefix='', layer_id=0, level_id=0, state_dict=None):
    """ Traverse model's children recursively and apply any transformations in ``policies``.
    Arguments:
        model (torch.nn.Module): model to augment
        policies (dict): Mapping of source class to replacement function.
    Returns:
        Modified ``model``.
    """
    for name, child in model.named_children():
        if child.__class__ in policies:
            replaced_module = policies[child.__class__][0](child,
                                                           policies[child.__class__][-1],
                                                           layer_id,
                                                           prefix=prefix + name,
                                                           state_dict=state_dict)
            setattr(model, name, replaced_module)
            if isinstance(model, PipelineModule):
                assert hasattr(model, 'forward_funcs'),\
                    "we require pipe-module to have the list of fwd_functions"
                model.forward_funcs[model.fwd_map[name]] = replaced_module
            layer_id += 1
        else:
            checking_key = prefix + name + '.'
            if Loading.is_load_module(child) and state_dict is not None:
                if any(checking_key in item for item in state_dict):
                    Loading.load(
                        child,
                        state_dict,
                        checking_key,
                    )
                else:
                    continue
            if len(child._buffers) != 0 and state_dict is not None:
                Loading.load_buffer(child, state_dict, checking_key)
            _, layer_id = _replace_module(child,
                                          policies,
                                          prefix if level_id == 0 and skip_level_0_prefix(model, state_dict) else \
                                          prefix + name + '.',
                                          layer_id=layer_id,
                                          level_id=level_id + 1,
                                          state_dict=state_dict)

    # Add the reset_cache func to the model, so that it can be called in the beginning of text-generation.
    model.reset_cache = transformer_inference.DeepSpeedTransformerInference.reset_cache
    return model, layer_id
```
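Stripped of checkpoint loading, pipeline-module handling, and layer-id bookkeeping, the traversal pattern `_replace_module` relies on is a recursive walk over `named_children()` plus `setattr`. Here is a self-contained toy (not DeepSpeed code) showing just that skeleton:

```python
# Toy illustration of the recursive-replacement pattern: swap matching classes
# in place via setattr and recurse into everything else.
import torch.nn as nn

def swap_modules(model: nn.Module, policies: dict) -> nn.Module:
    for name, child in model.named_children():
        if child.__class__ in policies:
            setattr(model, name, policies[child.__class__](child))
        else:
            swap_modules(child, policies)
    return model

# Example: replace every nn.ReLU with nn.GELU.
net = nn.Sequential(nn.Linear(4, 4), nn.ReLU(), nn.Sequential(nn.Linear(4, 4), nn.ReLU()))
swap_modules(net, {nn.ReLU: lambda old: nn.GELU()})
print(net)
```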
```python
def replace_with_policy(child, policy_cls, triangular_masking, inference=False, layer_id=0):
    policy = policy_cls(child, inference=inference)
    if not policy.cuda_graph_supported:
        # policy says cuda graph is not supported raise an error if set
        assert not config.enable_cuda_graph, "cuda graph is not supported with this model, please disable"
```
```python
# 4. deal with data types -- needs refactor to use dtype instead of fp16
if config.dtype in [torch.float16, torch.bfloat16, torch.int8]:
    _container.convert_to_required_dtype()

# 5. Set the quantization config
quantizer = GroupQuantizer(q_int8=quantize)
_container.set_quantization_config(quantizer)

# 6. create a DS Inference config object
_container.create_ds_model_config()

# 7. use the config and create the module
_container.create_module()

# 8. transpose the weights and bias if needed
_container.transpose()

# 9. deal with tensor parallelism.
_container.apply_tensor_parallelism(mp_replace)

# 10. copy the tensors from the model-specific container to the new module
_container.copy_data_to_new_module()

# 11. set global for generic checkpoint loading
global container_g
```
```python
# helper function to map between DS policies and DS containers
def policy_to_ds_container(**kwargs):

    from .containers import HFGPT2LayerPolicy, DS_GPT2Container
    from .containers import HFBertLayerPolicy, DS_BERTContainer
    from .containers import BLOOMLayerPolicy, DS_BloomContainer
    from .containers import HFGPTJLayerPolicy, DS_GPTJContainer
    from .containers import HFGPTNEOLayerPolicy, DS_GPTNEOContainer
    from .containers import GPTNEOXLayerPolicy, DS_GPTNEOXContainer
    from .containers import HFOPTLayerPolicy, DS_OPTContainer
    from .containers import MegatronLayerPolicy, DS_MegatronGPTContainer
    from .containers import HFDistilBertLayerPolicy, DS_DistilBERTContainer
    from .containers import LLAMALayerPolicy, DS_LLAMAContainer
    from .containers import LLAMA2LayerPolicy, DS_LLAMA2Container
    from .containers import InternLMLayerPolicy, DS_InternLMContainer
```
```python
    if policy_type not in policy_to_container:
        log_dist(f"Policy type {policy_type} not supported", [0])
    else:
        container = policy_to_container[policy_type](**kwargs)
```
```python
def set_lora_params(self):
    """
    Necessary to implement for `HybridEngineContainer`
    """
    self.lora_params = [
        maybe_get_lora(p) for p in [
            self.policy.client_module.mlp.dense_h_to_4h,
            self.policy.client_module.mlp.dense_4h_to_h,
            self.policy.client_module.self_attention.query_key_value,
            self.policy.client_module.self_attention.dense,
        ]
    ]
```
```python
def initialize_tensors(self, enable_training=False):
    # Set the tensors from policy (user module) to container (DS module)
    self.set_attention(*self.policy.attention(enable_training=enable_training))
    self.set_mlp(*self.policy.mlp(enable_training=enable_training))
    self.set_layernorm(*self.policy.layernorm())
    # self.check_meta_tensor_support()
```
```python
def convert_to_required_dtype(self):
    # Note: converting tensors to fp16 requires that we do it in-place using self.__dict__ and not make a list/dict copy
    if self.dtype in [torch.half, torch.bfloat16]:
        for k, v in self.__dict__.items():
            # The list comprehension is used for MoE tensor lists
            if isinstance(v, list) and all((isinstance(tensor, torch.Tensor) \
                    or isinstance(tensor, torch.nn.Parameter)) for tensor in v):
                self.__dict__[k] = [moe_tensor.to(self.dtype) for moe_tensor in v]
```
```python
def create_ds_model_config(self):
    self.set_hidden_heads(*self.policy.get_hidden_heads())
    assert self.num_attention_heads % self.mp_size == 0,\
        "To run the model parallel across the GPUs, the attention_heads require to be divisible by the world_size!" +\
        "This is because the attention computation is partitioned evenly among the parallel GPUs."
```
```python
if self.use_triton and deepspeed.HAS_TRITON:
    from .bert import DS_BERTContainer
    if not isinstance(self, DS_BERTContainer):
        raise NotImplementedError("Triton kernels are only for BERT-like models yet")
```
```python
if not self.config.triton_autotune:
    from deepspeed.ops.transformer.inference.triton.matmul_ext import fp16_matmul
    fp16_matmul.skip_autotune()
```
```python
class DeepSpeedTransformerInference(nn.Module):
    """Initialize the DeepSpeed Transformer Layer.

    Arguments:
        layer_id: The layer index starting from 0, e.g. if model has 24 transformer layers,
            layer_id will be 0,1,2...23 when each layer object is instantiated
        config: An object of DeepSpeedInferenceConfig
        mp_group: Model parallelism group initialized on the modeling side.
        quantize_scales: This argument groups all the layers' scales used for quantization
        quantize_groups: Number of groups used for quantizing the model
        merge_count: Shows the number of model-parallel checkpoints merged before running inference.
            We use this argument to control the quantization scale for the model parameters
            if a bigger quantize-grouping than 1 is used.
        mlp_extra_grouping: This flag is used to show a 2x higher number of groups used for the MLP part
            of a Transformer layer. We use this feature for quantization to reduce the convergence impact
            for specific downstream tasks.
    """
    layer_id = 0
```
```python
def apply_tensor_parallelism(self, mp_replace, reversed_dim=False):
    """
    Add support for reversed dim in tensor parallelism. If necessary, override
    the called methods to handle partitioned weights (i.e. if qkv is split, override the
    `attention_qkv_mp` method). If the model component is not split, it should
    be safe to use the default implementation.
    """
    # Setup the new Attention module
    self.attention_qkv_mp(mp_replace, reversed_dim=reversed_dim)
    self.attention_o_mp(mp_replace, reversed_dim=reversed_dim)

    # Setup the new MLP module
    self.mlp_inter_mp(mp_replace, reversed_dim=reversed_dim)
    self.mlp_output_mp(mp_replace, reversed_dim=reversed_dim)
```
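The `*_mp` calls above apply the usual Megatron-style partitioning: the fused QKV and the first MLP projection are split column-wise, while the attention output and second MLP projection are split row-wise so their partial results can be all-reduced. A self-contained toy (not DeepSpeed code) illustrating the weight slicing for one rank:

```python
# Toy Megatron-style partitioning of a fused QKV weight (column-parallel) and the
# attention output projection (row-parallel) for a given rank.
import torch

hidden, world_size, rank = 16, 2, 0
qkv_w = torch.randn(3 * hidden, hidden)   # fused [q; k; v] projection weight
out_w = torch.randn(hidden, hidden)       # attention output projection weight

def shard_rows(w):
    # Keep this rank's contiguous slice along the output dimension.
    return w.chunk(world_size, dim=0)[rank]

# Column-parallel: slice q, k and v separately so whole heads stay on one rank.
q, k, v = qkv_w.chunk(3, dim=0)
qkv_shard = torch.cat([shard_rows(q), shard_rows(k), shard_rows(v)], dim=0)

# Row-parallel: slice the input dimension; partial outputs are summed via all-reduce.
out_shard = out_w.chunk(world_size, dim=1)[rank]

print(qkv_shard.shape, out_shard.shape)  # (24, 16) and (16, 8) for this toy setup
```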