import torch from typing import List, Optional, Tuple, Union from diffusers.utils import ( USE_PEFT_BACKEND, BaseOutput, logging, replace_example_docstring, scale_lora_layers, unscale_lora_layers, ) from diffusers.utils.torch_utils import randn_tensor class EulerAncestralDiscreteSchedulerOutput(BaseOutput): prev_sample: torch.FloatTensor pred_original_sample: Optional[torch.FloatTensor] = None def eul_step( self, model_output: torch.FloatTensor, timestep: Union[float, torch.FloatTensor], sample: torch.FloatTensor, fusion_latent, pipe, generator: Optional[torch.Generator] = None, return_dict: bool = True, ) -> Union[EulerAncestralDiscreteSchedulerOutput, Tuple]: if ( isinstance(timestep, int) or isinstance(timestep, torch.IntTensor) or isinstance(timestep, torch.LongTensor) ): raise ValueError( ( "Passing integer indices (e.g. from `enumerate(timesteps)`) as timesteps to" " `EulerDiscreteScheduler.step()` is not supported. Make sure to pass" " one of the `scheduler.timesteps` as a timestep." ), ) if self.step_index is None: self._init_step_index(timestep) sigma = self.sigmas[self.step_index] # Upcast to avoid precision issues when computing prev_sample sample = sample.to(torch.float32) # 1. compute predicted original sample (x_0) from sigma-scaled predicted noise if self.config.prediction_type == "epsilon": ## True, 计算x_0 pred_original_sample = sample - sigma * model_output elif self.config.prediction_type == "v_prediction": # * c_out + input * c_skip pred_original_sample = model_output * (-sigma / (sigma**2 + 1) ** 0.5) + (sample / (sigma**2 + 1)) elif self.config.prediction_type == "sample": raise NotImplementedError("prediction_type not implemented yet: sample") else: raise ValueError( f"prediction_type given as {self.config.prediction_type} must be one of `epsilon`, or `v_prediction`" ) ## fusion latent pred_original_sample = fusion_latent sigma_from = self.sigmas[self.step_index] sigma_to = self.sigmas[self.step_index + 1] sigma_up = (sigma_to**2 * (sigma_from**2 - sigma_to**2) / sigma_from**2) ** 0.5 sigma_down = (sigma_to**2 - sigma_up**2) ** 0.5 # 2. Convert to an ODE derivative derivative = (sample - pred_original_sample) / sigma dt = sigma_down - sigma prev_sample = sample + derivative * dt device = model_output.device noise = randn_tensor(model_output.shape, dtype=model_output.dtype, device=device, generator=generator) prev_sample = prev_sample + noise * sigma_up # Cast sample back to model compatible dtype prev_sample = prev_sample.to(model_output.dtype) # upon completion increase step index by one self._step_index += 1 if not return_dict: return (prev_sample,) return EulerAncestralDiscreteSchedulerOutput( prev_sample=prev_sample, pred_original_sample=pred_original_sample )