|
"""Kernel test utils""" |
|
|
|
import itertools |
|
import random |
|
import unittest |
|
from numbers import Number |
|
from typing import Any, Dict, List, NamedTuple, Optional, Sequence, Tuple, Union |
|
|
|
import pytest |
|
import torch |
|
from torch._prims_common import TensorLikeType |
|
|
|
|
|
|
|
DEFAULT_OPCHECK_TEST_UTILS: Tuple[str, ...] = ( |
|
"test_schema", |
|
"test_autograd_registration", |
|
"test_faketensor", |
|
) |
|
|
|
ALL_OPCHECK_TEST_UTILS: Tuple[str, ...] = ( |
|
"test_schema", |
|
"test_autograd_registration", |
|
"test_faketensor", |
|
"test_aot_dispatch_dynamic", |
|
) |
|
|
|
def to_fp8(tensor: torch.Tensor): |
|
finfo = torch.finfo(torch.float8_e4m3fn) |
|
return torch.round(tensor.clamp( |
|
min=finfo.min, max=finfo.max)).to(dtype=torch.float8_e4m3fn) |
|
|
|
def to_int8(tensor: torch.Tensor): |
|
return torch.round(tensor.clamp(min=-128, max=127)).to(dtype=torch.int8) |
|
|
|
|
|
def rand_int8(shape: tuple, device: str = "cuda"): |
|
return to_int8(torch.rand(shape, device=device) * 255 - 128) |
|
|
|
|
|
|
|
|
|
|
|
def fp8_allclose( |
|
a: TensorLikeType, |
|
b: TensorLikeType, |
|
rtol: float = 1e-05, |
|
atol: float = 1e-08, |
|
equal_nan: bool = False, |
|
) -> bool: |
|
""" |
|
Reference implementation of torch.allclose |
|
""" |
|
torch._refs._check_close_args(name="torch.allclose", a=a, b=b, rtol=rtol, atol=atol) |
|
|
|
return bool( |
|
torch.all( |
|
torch.isclose( |
|
a.double(), b.double(), rtol=rtol, atol=atol, equal_nan=equal_nan |
|
) |
|
).item() |
|
) |
|
|
|
|
|
|
|
|
|
def opcheck( |
|
op: Union[ |
|
torch._ops.OpOverload, |
|
torch._ops.OpOverloadPacket, |
|
torch._library.custom_ops.CustomOpDef, |
|
], |
|
args: Tuple[Any, ...], |
|
kwargs: Optional[Dict[str, Any]] = None, |
|
*, |
|
test_utils: Union[str, Sequence[str]] = ALL_OPCHECK_TEST_UTILS, |
|
raise_exception: bool = True, |
|
cond: bool = True |
|
) -> Dict[str, str]: |
|
with unittest.mock.patch("torch.allclose", new=fp8_allclose): |
|
return ( |
|
torch.library.opcheck( |
|
op, args, kwargs, test_utils=test_utils, raise_exception=raise_exception |
|
) |
|
if cond |
|
else {} |
|
) |
|
|
|
def baseline_scaled_mm(a: torch.Tensor, |
|
b: torch.Tensor, |
|
scale_a: torch.Tensor, |
|
scale_b: torch.Tensor, |
|
out_dtype: type[torch.dtype], |
|
bias: Optional[torch.Tensor] = None) -> torch.Tensor: |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def group_broadcast(t, shape): |
|
for i, s in enumerate(shape): |
|
if t.shape[i] != s and t.shape[i] != 1: |
|
assert s % t.shape[i] == 0 |
|
t = t.unsqueeze(i + 1)\ |
|
.expand(*t.shape[:i+1], s // t.shape[i], *t.shape[i+1:])\ |
|
.flatten(i, i + 1) |
|
return t |
|
|
|
scale_a = group_broadcast(scale_a, a.shape) |
|
scale_b = group_broadcast(scale_b, b.shape) |
|
|
|
output = torch.mm((scale_a * a.to(dtype=torch.float32)), |
|
(scale_b * b.to(dtype=torch.float32))).to(out_dtype) |
|
|
|
if bias is not None: |
|
output = output + bias |
|
|
|
return output |
|
|