flash-mla/tests/test_flash_mla.py
import random

import torch
import torch.nn.functional as F

import flash_mla


# TODO: revise to use the same test as the original code
def test_flash_mla():
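    # MLA decode shapes (scaled down here for a quick smoke test; the commented-out
    # values below are a larger configuration):
    #   b        batch size
    #   s_q      query tokens per sequence
    #   mean_sk  mean KV-cache length per sequence
    #   h_q      number of query heads
    #   h_kv     number of KV heads (1: MLA shares a single latent KV cache)
    #   d        QK head dim (512-dim latent + 64-dim RoPE part = 576)
    #   dv       V head dim (the 512-dim latent only)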
    # b = 128
    # s_q = 4096
    # mean_sk = 8192
    # h_q = 16
    # h_kv = 1
    # d = 576
    # dv = 512
    b = 16
    s_q = 16
    mean_sk = 16
    h_q = 16
    h_kv = 1
    d = 576
    dv = 512
    causal = True
    varlen = False
    print(f"{b=}, {s_q=}, {mean_sk=}, {h_q=}, {h_kv=}, {d=}, {dv=}, {causal=}, {varlen=}")
    cache_seqlens = torch.full((b,), mean_sk, dtype=torch.int32)
    if varlen:
        for i in range(b):
            cache_seqlens[i] = max(random.normalvariate(mean_sk, mean_sk / 2), s_q)
    total_seqlens = cache_seqlens.sum().item()
    mean_seqlens = cache_seqlens.float().mean().int().item()
    max_seqlen = cache_seqlens.max().item()
    print(f"{total_seqlens=}, {mean_seqlens=}, {max_seqlen=}")
    # The original test pads with triton.cdiv(max_seqlen, 256) * 256; the bit trick
    # below does the same rounding without pulling in the triton dependency.
    # max_seqlen_pad = triton.cdiv(max_seqlen, 256) * 256
    max_seqlen_pad = (max_seqlen + 255) & ~255  # round up to a multiple of 256
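    # Build a paged ("blocked") KV cache: each sequence owns a contiguous run of
    # max_seqlen_pad // block_size pages of block_size tokens, and block_table maps
    # (batch index, page index) -> physical page id in blocked_k.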
    q = torch.randn(b, s_q, h_q, d)
    block_size = 64
    block_table = torch.arange(b * max_seqlen_pad // block_size, dtype=torch.int32).view(
        b, max_seqlen_pad // block_size
    )
    blocked_k = torch.randn(block_table.numel(), block_size, h_kv, d)
    print(blocked_k.shape)
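    # Poison cache entries beyond each sequence's true length with NaN, so a kernel
    # that reads past cache_seqlens would corrupt its output and be noticed.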
    for i in range(b):
        blocked_k.view(b, max_seqlen_pad, h_kv, d)[i, cache_seqlens[i].item() :] = float(
            "nan"
        )
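    # MLA keeps K and V in the same latent cache: V is the first dv channels of K,
    # so blocked_v is just a view into blocked_k.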
    blocked_v = blocked_k[..., :dv]
    print(blocked_k.shape, blocked_v.shape)
    cache_seqlens = cache_seqlens.to("cuda")
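    # get_mla_metadata precomputes tile-scheduler metadata and the number of KV
    # splits for the split-KV decode kernel; following the upstream FlashMLA test,
    # the query count per KV head is passed as s_q * h_q // h_kv.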
    tile_scheduler_metadata, num_splits = flash_mla.get_mla_metadata(
        seqlens_k=cache_seqlens,
        s_q=s_q * h_q // h_kv,
        h_kv=h_kv,
    )
    print(tile_scheduler_metadata, num_splits)
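    # A possible continuation, ported from the upstream FlashMLA test. The names and
    # signature below are assumptions -- this kernel-builder binding may differ, and
    # q / blocked_k would first need to be moved to CUDA in the kernel's dtype:
    #
    #   out, lse = flash_mla.flash_mla_with_kvcache(
    #       q, blocked_k, block_table, cache_seqlens, dv,
    #       tile_scheduler_metadata, num_splits, causal=causal,
    #   )
    #
    # `out` would then be compared against a plain PyTorch attention reference over
    # the un-blocked cache instead of the placeholder assertion below.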
    # TODO: update to expect the correct output
    assert False