import copy
import functools
import inspect
import itertools
import logging
import os
import sys
import warnings
import weakref
from collections import defaultdict, deque
from contextlib import contextmanager
from dataclasses import dataclass, fields, is_dataclass
from enum import auto, Enum
from typing import Any, Callable, List, Optional, Tuple, Type
import torch
import torch.distributed as dist
from torch.autograd import Function, Variable
from torch.distributed.algorithms.join import Join, Joinable, JoinHook
from torch.utils._pytree import tree_flatten, tree_unflatten
from torch.utils.hooks import RemovableHandle
RPC_AVAILABLE = False
if dist.is_available():
from torch.distributed.distributed_c10d import (
_get_default_group,
_rank_not_in_group,
ReduceOp,
)
from torch.distributed.utils import (
_alloc_storage,
_cast_forward_inputs,
_free_storage,
_sync_module_states,
_to_kwargs,
_verify_param_shape_across_processes,
)
if torch.distributed.rpc.is_available():
RPC_AVAILABLE = True
from torch.distributed.rpc import RRef
from torch._utils import _get_device_index
from ..modules import Module
from .scatter_gather import gather, scatter_kwargs # noqa: F401
__all__ = ["DistributedDataParallel"]
logger = logging.getLogger(__name__)
@dataclass
class _MixedPrecision:
"""
This configures DDP-native mixed precision training.
Attributes:
param_dtype (torch.dtype): This specifies the dtype for model
parameters, inputs (when ``cast_forward_inputs`` is set to
``True``), and therefore the dtype for computation.
However, outside the forward and backward passes, parameters are in
full precision. Model checkpointing always happens in full
precision.
reduce_dtype (torch.dtype): This specifies the dtype for gradient
reduction, which is permitted to differ from ``param_dtype``.
buffer_dtype (torch.dtype): This specifies the dtype for buffers.
.. note:: This API is experimental and subject to change.
.. note:: Only floating point tensors are cast to their specified dtypes.
.. note:: ``state_dict`` checkpoints parameters and buffers in full
precision.
.. note:: Each low precision dtype must be specified explicitly. For
example, ``_MixedPrecision(reduce_dtype=torch.float16)`` only specifies
the reduction dtype to be low precision, and DDP will not cast
parameters or buffers.
.. note:: If a ``reduce_dtype`` is not specified, then gradient reduction
happens in ``param_dtype`` if specified or the original parameter dtype
otherwise. For example, ``_MixedPrecision(param_dtype=torch.float16)``
would result in communication occurring in fp16.
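.. note:: A minimal sketch (assuming an initialized process group and a
``model`` already moved to ``rank``) of passing this config to
``DistributedDataParallel`` via its experimental ``mixed_precision``
constructor argument:
>>> # xdoctest: +SKIP("undefined variables")
>>> mp = _MixedPrecision(
>>>     param_dtype=torch.float16,
>>>     reduce_dtype=torch.float16,
>>>     buffer_dtype=torch.float16,
>>> )
>>> ddp_model = torch.nn.parallel.DistributedDataParallel(
>>>     model, device_ids=[rank], mixed_precision=mp
>>> )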
"""
param_dtype: Optional[torch.dtype] = None
reduce_dtype: Optional[torch.dtype] = None
buffer_dtype: Optional[torch.dtype] = None
# TODO (rohan-varma): keep_low_precision_grads: bool = False
# TODO (rohan-varma): APIs to allow users to run batchnorm and layernorm
# in full precision. For DDP, this can be implemented by not performing the
# parameter cast for BN and LN units.
def _cast_buffers(mixed_precision_config, root_module):
"""Casts buffers to the given ``buffer_dtype``."""
for buf in root_module.buffers():
if hasattr(buf, "_ddp_ignored") and buf._ddp_ignored:
continue
buf.data = buf.to(dtype=mixed_precision_config.buffer_dtype)
def _setup_mixed_precision_params(mixed_precision_config, root_module):
"""Create and free storage for the mixed precision parameters."""
for param in root_module.parameters():
# Do not setup mixed precision for DDP ignored parameters.
if hasattr(param, "_ddp_ignored") and param._ddp_ignored:
continue
if not hasattr(param, "_mp_param"):
param._mp_param = torch.zeros_like(
param,
device=param.device,
dtype=mixed_precision_config.param_dtype,
requires_grad=param.requires_grad,
)
_free_storage(param._mp_param)
# _fp_param will point to the full precision param so it can be switched
# back to at the end of forward / backward.
param._fp_param = param.data
def _tree_flatten_with_rref(output):
output_is_rref = RPC_AVAILABLE and isinstance(output, RRef)
if output_is_rref:
output_tensor_list, treespec = tree_flatten(output.local_value())
else:
output_tensor_list, treespec = tree_flatten(output)
# Need to return the flattened tensors, the spec to re-pack them, and
# whether the return type was actually an RRef, so it can be reconstructed.
return output_tensor_list, treespec, output_is_rref
def _tree_unflatten_with_rref(output, treespec, output_is_rref):
output = tree_unflatten(output, treespec)
if output_is_rref:
output = RRef(output)
return output
def _find_tensors(obj):
r"""Recursively find all tensors contained in the specified object."""
if RPC_AVAILABLE and isinstance(obj, RRef):
# If the current node is the owner of the RRef, unwrap it and try to
# find Tensors.
# TODO: Expand to remote RRefs.
if obj.is_owner():
return _find_tensors(obj.local_value())
if isinstance(obj, torch.Tensor):
return [obj]
if isinstance(obj, (list, tuple)):
return itertools.chain.from_iterable(map(_find_tensors, obj))
if isinstance(obj, dict):
return itertools.chain.from_iterable(map(_find_tensors, obj.values()))
if is_dataclass(obj):
return itertools.chain.from_iterable(
map(_find_tensors, (getattr(obj, f.name) for f in fields(obj)))
)
return []
def _dump_DDP_relevant_env_vars():
relevant_env_vars = [
"RANK",
"LOCAL_RANK",
"WORLD_SIZE",
"MASTER_PORT",
"MASTER_ADDR",
"CUDA_VISIBLE_DEVICES",
"GLOO_SOCKET_IFNAME",
"GLOO_DEVICE_TRANSPORT",
"NCCL_SOCKET_IFNAME",
"TORCH_NCCL_BLOCKING_WAIT",
"NCCL_DEBUG",
"NCCL_DEBUG_SUBSYS",
"NCCL_IB_DISABLE",
# More NCCL env vars:
"NCCL_P2P_DISABLE",
"NCCL_P2P_LEVEL",
"NCCL_SHM_DISABLE",
"NCCL_SOCKET_NTHREADS",
"NCCL_NSOCKS_PERTHREAD",
"NCCL_BUFFSIZE",
"NCCL_NTHREADS",
"NCCL_RINGS",
"NCCL_MAX_NCHANNELS",
"NCCL_MIN_NCHANNELS",
"NCCL_CHECKS_DISABLE",
"NCCL_CHECK_POINTERS",
"NCCL_LAUNCH_MODE",
"NCCL_IB_HCA",
"NCCL_IB_TIMEOUT",
"NCCL_IB_RETRY_CNT",
"NCCL_IB_GID_INDEX",
"NCCL_IB_SL",
"NCCL_IB_TC",
"NCCL_IB_AR_THRESHOLD",
"NCCL_IB_CUDA_SUPPORT",
"NCCL_NET_GDR_LEVEL",
"NCCL_NET_GDR_READ",
"NCCL_SINGLE_RING_THRESHOLD",
"NCCL_LL_THRESHOLD",
"NCCL_TREE_THRESHOLD",
"NCCL_ALGO",
"NCCL_PROTO",
"NCCL_IGNORE_CPU_AFFINITY",
"NCCL_DEBUG_FILE",
"NCCL_COLLNET_ENABLE",
"NCCL_TOPO_FILE",
"NCCL_TOPO_DUMP_FILE",
"TORCH_NCCL_ASYNC_ERROR_HANDLING",
]
formatted_output = ""
for var in relevant_env_vars:
value = os.environ[var] if var in os.environ else "N/A"
formatted_output += f"env:{var}={value}\n"
print(formatted_output)
class _BufferCommHookLocation(Enum):
PRE_FORWARD = auto()
POST_FORWARD = auto()
@dataclass
class _BufferCommHook:
buffer_comm_hook: Callable
buffer_comm_hook_state: Any
buffer_comm_hook_location: _BufferCommHookLocation
# _DDPSink is added to run various functions when backward starts, such as
# queueing the callback of the outermost backward/graph task; this ensures
# the callback fires only after all gradients' computation is completed.
class _DDPSink(Function):
@staticmethod
def forward(ctx, ddp_weakref, *inputs):
# set_materialize_grads(False) will ensure that None gradients stay as
# None and are not filled with zeros.
ctx.set_materialize_grads(False)
ctx.ddp_weakref = ddp_weakref
ret = tuple(
inp.clone() if isinstance(inp, torch.Tensor) else inp for inp in inputs
)
return ret
@staticmethod
def backward(ctx, *grad_outputs):
# Enqueue delay allreduce for static graph training on the first
# iteration.
ddp_weakref = ctx.ddp_weakref()
reducer = ddp_weakref.reducer
static_graph = ddp_weakref.static_graph
delay_ar_enqueued = (
static_graph and ddp_weakref._static_graph_delay_allreduce_enqueued
)
if static_graph and not delay_ar_enqueued:
Variable._execution_engine.queue_callback( # type: ignore[call-arg,misc]
reducer._delay_all_reduce
)
ddp_weakref._static_graph_delay_allreduce_enqueued = True
return (None, *grad_outputs)
class _DDPJoinHook(JoinHook):
def __init__(self, ddp, divide_by_initial_world_size):
"""Set config variables for internal usage."""
assert isinstance(ddp, DistributedDataParallel), (
"DDP join hook requires passing in a DistributedDataParallel "
"instance as the state"
)
assert ddp.logger is not None
ddp.logger._set_uneven_input_join()
self.ddp = ddp
self.ddp._divide_by_initial_world_size = divide_by_initial_world_size
super().__init__()
def main_hook(self):
"""Shadow the DDP collective communication operations in the forward and backward passes."""
ddp = self.ddp
# Buckets are rebuilt only once during a training period
ddp.reducer._rebuild_buckets()
# Schedule a broadcast if we are syncing module buffers in the
# forward pass
# TODO: make DDP uneven inputs context manager support buffer
# comm hook (https://github.com/pytorch/pytorch/issues/65436)
ddp._check_and_sync_module_buffers()
# Check if need to sync in the backward pass
should_sync_backwards = ddp._check_global_requires_backward_grad_sync(
is_joined_rank=True
)
# Forward parameter sync is disabled in the next iteration if we
# are skipping gradient sync this iteration, so set
# `require_forward_param_sync` accordingly
ddp.require_forward_param_sync = should_sync_backwards
if not should_sync_backwards:
return
# Schedule one allreduce per gradient bucket to match the backward
# pass allreduce
ddp._match_all_reduce_for_bwd_pass()
# Check if we need to allreduce locally unused parameters
if ddp.find_unused_parameters:
ddp._match_unused_params_allreduce()
# Rebuilt parameters are pushed only once during a training period
ddp.reducer._push_all_rebuilt_params()
def post_hook(self, is_last_joiner: bool):
"""Sync the final model to ensure that the model is the same across all processes."""
self.ddp._sync_final_model(is_last_joiner)
class DistributedDataParallel(Module, Joinable):
r"""Implement distributed data parallelism based on ``torch.distributed`` at module level.
This container provides data parallelism by synchronizing gradients
across each model replica. The devices to synchronize across are
specified by the input ``process_group``, which is the entire world
by default. Note that ``DistributedDataParallel`` does not chunk or
otherwise shard the input across participating GPUs; the user is
responsible for defining how to do so, for example through the use
of a :class:`DistributedSampler`.
See also: :ref:`distributed-basics` and :ref:`cuda-nn-ddp-instead`.
The same constraints on input as in :class:`torch.nn.DataParallel` apply.
Creation of this class requires that ``torch.distributed`` be already
initialized, by calling :func:`torch.distributed.init_process_group`.
``DistributedDataParallel`` is proven to be significantly faster than
:class:`torch.nn.DataParallel` for single-node multi-GPU data
parallel training.
To use ``DistributedDataParallel`` on a host with N GPUs, you should spawn
``N`` processes, ensuring that each process exclusively works on a single
GPU from 0 to N-1. This can be done by either setting
``CUDA_VISIBLE_DEVICES`` for every process or by calling:
>>> # xdoctest: +SKIP("undefined variables")
>>> torch.cuda.set_device(i)
where ``i`` ranges from 0 to N-1. In each process, you should refer to the
following to construct this module:
>>> # xdoctest: +SKIP("undefined variables")
>>> torch.distributed.init_process_group(
>>> backend='nccl', world_size=N, init_method='...'
>>> )
>>> model = DistributedDataParallel(model, device_ids=[i], output_device=i)
To spawn multiple processes per node, you can use either
``torch.distributed.launch`` or ``torch.multiprocessing.spawn``.
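For example, a minimal sketch using ``torch.multiprocessing.spawn`` (the
``demo`` worker function, ``ToyModel``, and ``world_size`` are illustrative
names, and the rendezvous configuration is assumed to be provided via
environment variables):
>>> # xdoctest: +SKIP("undefined variables")
>>> def demo(rank, world_size):
>>>     torch.distributed.init_process_group(
>>>         backend="nccl", rank=rank, world_size=world_size
>>>     )
>>>     torch.cuda.set_device(rank)
>>>     model = ToyModel().to(rank)
>>>     ddp_model = torch.nn.parallel.DistributedDataParallel(
>>>         model, device_ids=[rank]
>>>     )
>>>     # ... training loop ...
>>>
>>> torch.multiprocessing.spawn(demo, args=(world_size,), nprocs=world_size)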
.. note::
Please refer to `PyTorch Distributed Overview <https://pytorch.org/tutorials/beginner/dist_overview.html>`__
for a brief introduction to all features related to distributed training.
.. note::
``DistributedDataParallel`` can be used in conjunction with
:class:`torch.distributed.optim.ZeroRedundancyOptimizer` to reduce
per-rank optimizer states memory footprint. Please refer to
`ZeroRedundancyOptimizer recipe <https://pytorch.org/tutorials/recipes/zero_redundancy_optimizer.html>`__
for more details.
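A minimal sketch (assuming ``ddp_model`` has already been constructed; the
learning rate is illustrative):
>>> # xdoctest: +SKIP("undefined variables")
>>> from torch.distributed.optim import ZeroRedundancyOptimizer
>>> optimizer = ZeroRedundancyOptimizer(
>>>     ddp_model.parameters(),
>>>     optimizer_class=torch.optim.SGD,
>>>     lr=0.01,
>>> )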
.. note:: The ``nccl`` backend is currently the fastest and the recommended
backend when using GPUs. This applies to both single-node and
multi-node distributed training.
.. note:: This module also supports mixed-precision distributed training.
This means that your model can have parameters of different dtypes, such as
a mix of ``fp16`` and ``fp32``; gradient reduction on these mixed-dtype
parameters will work as expected.
.. note:: If you use ``torch.save`` on one process to checkpoint the module,
and ``torch.load`` on some other processes to recover it, make sure that
``map_location`` is configured properly for every process. Without
``map_location``, ``torch.load`` would recover the module to devices
where the module was saved from.
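For example (a hedged sketch; the checkpoint path is illustrative):
>>> # xdoctest: +SKIP("undefined variables")
>>> # On rank 0:
>>> torch.save(ddp_model.state_dict(), "checkpoint.pt")
>>> # On every rank, map the checkpoint to that rank's own device:
>>> state_dict = torch.load("checkpoint.pt", map_location=f"cuda:{rank}")
>>> ddp_model.load_state_dict(state_dict)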
.. note:: When a model is trained on ``M`` nodes with ``batch=N``, the
gradient will be ``M`` times smaller compared to the same model
trained on a single node with ``batch=M*N`` if the loss is summed (NOT
averaged as usual) across instances in a batch (because the gradients
between different nodes are averaged). You should take this into
consideration when you want to obtain a mathematically equivalent
training process compared to the local training counterpart. In most
cases, however, you can treat a DistributedDataParallel-wrapped model, a
DataParallel-wrapped model, and an ordinary model on a single GPU as the
same (e.g. use the same learning rate for an equivalent batch size).
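As an illustration only (a hedged sketch, assuming the loss is summed rather
than averaged over the batch), scaling the loss by the number of processes
before calling ``backward()`` recovers the gradient scale of the equivalent
single-node run:
>>> # xdoctest: +SKIP("undefined variables")
>>> loss = criterion(ddp_model(inputs), targets)  # sum-reduced loss
>>> (loss * torch.distributed.get_world_size()).backward()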
.. note::
Parameters are never broadcast between processes. The module performs
an all-reduce step on gradients and assumes that they will be modified
by the optimizer in all processes in the same way. Buffers
(e.g. BatchNorm stats) are broadcast from the module in the process of rank
0 to all other replicas in the system in every iteration.
.. note::
If you are using DistributedDataParallel in conjunction with the
:ref:`distributed-rpc-framework`, you should always use
:meth:`torch.distributed.autograd.backward` to compute gradients and
:class:`torch.distributed.optim.DistributedOptimizer` for optimizing
parameters.
Example::
>>> # xdoctest: +SKIP("undefined variables")
>>> import torch.distributed.autograd as dist_autograd
>>> from torch.nn.parallel import DistributedDataParallel as DDP
>>> import torch
>>> from torch import optim
>>> from torch.distributed.optim import DistributedOptimizer
>>> import torch.distributed.rpc as rpc
>>> from torch.distributed.rpc import RRef
>>>
>>> t1 = torch.rand((3, 3), requires_grad=True)
>>> t2 = torch.rand((3, 3), requires_grad=True)
>>> rref = rpc.remote("worker1", torch.add, args=(t1, t2))
>>> ddp_model = DDP(my_model)
>>>
>>> # Setup optimizer
>>> optimizer_params = [rref]
>>> for param in ddp_model.parameters():
>>> optimizer_params.append(RRef(param))
>>>
>>> dist_optim = DistributedOptimizer(
>>> optim.SGD,
>>> optimizer_params,
>>> lr=0.05,
>>> )
>>>
>>> with dist_autograd.context() as context_id:
>>> pred = ddp_model(rref.to_here())
>>> loss = loss_func(pred, target)
>>> dist_autograd.backward(context_id, [loss])
>>> dist_optim.step(context_id)
.. note::
DistributedDataParallel currently offers limited support for gradient
checkpointing with :meth:`torch.utils.checkpoint`.
If the checkpoint is done with use_reentrant=False (recommended), DDP
will work as expected without any limitations.
If, however, the checkpoint is done with use_reentrant=True (the default),
DDP will work as expected when there are no unused parameters in the model
and each layer is checkpointed at most once (make sure you are not passing
``find_unused_parameters=True`` to DDP). We currently do not support the
case where a layer is checkpointed multiple times, or when there are unused
parameters in the checkpointed model.
.. note::
To let a non-DDP model load a state dict from a DDP model,
:meth:`~torch.nn.modules.utils.consume_prefix_in_state_dict_if_present`
needs to be applied to strip the prefix ``"module."`` from the DDP state dict before loading.
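For example (a hedged sketch; the checkpoint path and ``local_model`` are
illustrative):
>>> # xdoctest: +SKIP("undefined variables")
>>> from torch.nn.modules.utils import consume_prefix_in_state_dict_if_present
>>> state_dict = torch.load("ddp_checkpoint.pt")
>>> consume_prefix_in_state_dict_if_present(state_dict, "module.")  # in place
>>> local_model.load_state_dict(state_dict)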
.. warning::
Constructor, forward method, and differentiation of the output (or a
function of the output of this module) are distributed synchronization
points. Take that into account in case different processes might be
executing different code.
.. warning::
This module assumes all parameters are registered in the model by the
time it is created. No parameters should be added nor removed later.
Same applies to buffers.
.. warning::
This module assumes that the parameters registered in the model of each
distributed process are in the same order. The module itself will
conduct gradient ``allreduce`` following the reverse order of the
registered parameters of the model. In other words, it is the user's
responsibility to ensure that each distributed process has the exact
same model and thus the exact same parameter registration order.
.. warning::
This module allows parameters with non-rowmajor-contiguous strides.
For example, your model may contain some parameters whose
:class:`torch.memory_format` is ``torch.contiguous_format``
and others whose format is ``torch.channels_last``. However,
corresponding parameters in different processes must have the
same strides.
.. warning::
This module doesn't work with :func:`torch.autograd.grad` (i.e. it will
only work if gradients are to be accumulated in ``.grad`` attributes of
parameters).
.. warning::
If you plan on using this module with a ``nccl`` backend or a ``gloo``
backend (that uses Infiniband), together with a DataLoader that uses
multiple workers, please change the multiprocessing start method to
``forkserver`` (Python 3 only) or ``spawn``. Unfortunately
Gloo (that uses Infiniband) and NCCL2 are not fork safe, and you will
likely experience deadlocks if you don't change this setting.
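For example (a hedged sketch; ``dataset`` and the worker count are
illustrative), the start method can be set for just the DataLoader workers:
>>> # xdoctest: +SKIP("undefined variables")
>>> loader = torch.utils.data.DataLoader(
>>>     dataset,
>>>     batch_size=32,
>>>     num_workers=4,
>>>     multiprocessing_context="forkserver",
>>> )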
.. warning::
You should never try to change your model's parameters after wrapping
your model with ``DistributedDataParallel``, because the
``DistributedDataParallel`` constructor registers additional gradient
reduction functions on all of the model's parameters at construction
time. If you change the model's parameters afterwards, the gradient
reduction functions will no longer match the correct set of
parameters.
.. warning::
Using ``DistributedDataParallel`` in conjunction with the
:ref:`distributed-rpc-framework` is experimental and subject to change.
Args:
module (Module): module to be parallelized
device_ids (list of int or torch.device): CUDA devices.
1) For single-device modules, ``device_ids`` can
contain exactly one device id, which represents the only
CUDA device where the input module corresponding to this process resides.
Alternatively, ``device_ids`` can also be ``None``.
2) For multi-device modules and CPU modules,
``device_ids`` must be ``None``.
When ``device_ids`` is ``None`` for both cases,
both the input data for the forward pass and the actual module
must be placed on the correct device.
(default: ``None``)
output_device (int or torch.device): Device location of output for
single-device CUDA modules. For multi-device modules and
CPU modules, it must be ``None``, and the module itself
dictates the output location. (default: ``device_ids[0]``
for single-device modules)
broadcast_buffers (bool): Flag that enables syncing (broadcasting)
buffers of the module at beginning of the ``forward``
function. (default: ``True``)
process_group: The process group to be used for distributed data
all-reduction. If ``None``, the default process group, which
is created by :func:`torch.distributed.init_process_group`,
will be used. (default: ``None``)
bucket_cap_mb: ``DistributedDataParallel`` will bucket parameters into
multiple buckets so that gradient reduction of each
bucket can potentially overlap with backward computation.
:attr:`bucket_cap_mb` controls the bucket size in
MegaBytes (MB). (default: 25)
find_unused_parameters (bool): Traverse the autograd graph from all
tensors contained in the return value of the
wrapped module's ``forward`` function. Parameters
that don't receive gradients as part of this
graph are preemptively marked as being ready to
be reduced. In addition, parameters that may have
been used in the wrapped module's ``forward``
function but were not part of loss computation and
thus would also not receive gradients are
preemptively marked as ready to be reduced.
(default: ``False``)
check_reduction: This argument is deprecated.
gradient_as_bucket_view (bool): When set to ``True``, gradients will be views
pointing to different offsets of ``allreduce`` communication
buckets. This can reduce peak memory usage, where the
saved memory size will be equal to the total gradient
size. Moreover, it avoids the overhead of copying between
gradients and ``allreduce`` communication buckets. When
gradients are views, ``detach_()`` cannot be called on the
gradients. If you hit such errors, please fix it by
referring to the :meth:`~torch.optim.Optimizer.zero_grad`
function in ``torch/optim/optimizer.py`` as a solution.
Note that gradients will be views after the first iteration, so
the peak memory saving should be checked after the first iteration.
static_graph (bool): When set to ``True``, DDP knows the trained graph is
static. Static graph means 1) The set of used and unused
parameters will not change during the whole training loop; in
this case, it does not matter whether users set
``find_unused_parameters = True`` or not. 2) How the graph is trained
will not change during the whole training loop (meaning there is
no control flow depending on iterations).
When static_graph is set to ``True``, DDP supports cases that
could not be supported in the past:
1) Reentrant backwards.
2) Activation checkpointing multiple times.
3) Activation checkpointing when the model has unused parameters.
4) Model parameters that are outside of the forward function.
5) Potentially improved performance when there are unused parameters,
as DDP will not search the graph in each iteration to detect unused
parameters when static_graph is set to ``True``.
One way to check whether you can set static_graph to ``True`` is to
inspect the DDP logging data at the end of a previous training run:
if ``ddp_logging_data.get("can_set_static_graph") == True``, you can most
likely set ``static_graph = True`` as well.
Example::
>>> # xdoctest: +SKIP("undefined variables")
>>> model_DDP = torch.nn.parallel.DistributedDataParallel(model)
>>> # Training loop
>>> ...
>>> ddp_logging_data = model_DDP._get_ddp_logging_data()
>>> static_graph = ddp_logging_data.get("can_set_static_graph")
delay_all_reduce_named_params (list of tuple of str and torch.nn.Parameter): a list
of named parameters whose all-reduce is delayed until the gradient of
the parameter specified in ``param_to_hook_all_reduce`` is ready. The
other DDP arguments do not apply to the named parameters specified in
this argument, as these named parameters are ignored by the DDP reducer.
param_to_hook_all_reduce (torch.nn.Parameter): a parameter on which to hook the
delayed all-reduce of the parameters specified in
``delay_all_reduce_named_params`` (see the sketch below).
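A hedged sketch of these two arguments (the model layout, its
``embedding`` submodule, and the parameter chosen as the hook point are
illustrative):
>>> # xdoctest: +SKIP("undefined variables")
>>> delayed = list(model.embedding.named_parameters(prefix="embedding"))
>>> ddp_model = torch.nn.parallel.DistributedDataParallel(
>>>     model,
>>>     device_ids=[rank],
>>>     delay_all_reduce_named_params=delayed,
>>>     param_to_hook_all_reduce=model.output.weight,
>>> )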
Attributes:
module (Module): the module to be parallelized.
Example::
>>> # xdoctest: +SKIP("undefined variables")
>>> torch.distributed.init_process_group(backend='nccl', world_size=4, init_method='...')
>>> net = torch.nn.parallel.DistributedDataParallel(model)
"""
# used to track whether the given thread is inside ddp forward for torchdynamo purposes
_active_ddp_module: Optional["DistributedDataParallel"] = None
def __init__(
self,
module,
device_ids=None,
output_device=None,
dim=0,
broadcast_buffers=True,
process_group=None,
bucket_cap_mb=25,
find_unused_parameters=False,
check_reduction=False,
gradient_as_bucket_view=False,
static_graph=False,
delay_all_reduce_named_params=None,
param_to_hook_all_reduce=None,
mixed_precision: Optional[_MixedPrecision] = None,
device_mesh=None,
):
super().__init__()
Joinable.__init__(self)
self.logger = None
if bool(delay_all_reduce_named_params is not None) != bool(
param_to_hook_all_reduce is not None
):
self._log_and_throw(
ValueError,
"delay_all_reduce_named_params and param_to_hook_all_reduce "
"need to be set at the same time.",
)
self._delay_all_reduce_params = []
if hasattr(module, "_ddp_params_and_buffers_to_ignore"):
self.parameters_to_ignore = set(module._ddp_params_and_buffers_to_ignore)
else:
self.parameters_to_ignore = set()
if delay_all_reduce_named_params is not None:
for name, param in delay_all_reduce_named_params:
self.parameters_to_ignore.add(name)
self._delay_all_reduce_params.append(param)
self._module_parameters = [
p
for n, p in module.named_parameters()
if n not in self.parameters_to_ignore
]
if not any(p.requires_grad for p in self._module_parameters):
if len(self._delay_all_reduce_params):
logger.info("Delay the AllReduce of all parameters.")
else:
self._log_and_throw(
RuntimeError,
"DistributedDataParallel is not needed when a module "
"doesn't have any parameter that requires a gradient.",
)
if device_ids is not None and len(device_ids) > 1:
self._log_and_throw(
ValueError,
"device_ids can only be None or contain a single element.",
)
self.is_multi_device_module = (
len({p.device for p in self._module_parameters}) > 1
)
distinct_device_types = {
p.device.type for p in self._module_parameters if p.device is not None
}
if len(distinct_device_types) != 1:
self._log_and_throw(
ValueError,
"DistributedDataParallel's input module must be on "
f"the same type of devices, but input module parameters locate in {distinct_device_types}.",
)
self.device_type = next(iter(distinct_device_types))
if (
device_ids is None
or len(device_ids) == 0 # For backward compatibility.
or self.device_type == "cpu"
or self.is_multi_device_module
):
if device_ids or output_device:
self._log_and_throw(
ValueError,
"DistributedDataParallel device_ids and output_device arguments "
"only work with single-device/multiple-device GPU modules or CPU modules, "
"but got device_ids {}, output_device {}, and module parameters {}.".format(
device_ids,
output_device,
{p.device for p in self._module_parameters},
),
)
self.device_ids = None
self.output_device = None
else:
self.device_ids = [_get_device_index(x, True) for x in device_ids]
if output_device is None:
output_device = device_ids[0]
self.output_device = _get_device_index(output_device, True)
if process_group and device_mesh is not None:
raise RuntimeError(
"Cannot specify both process_group and device_mesh arguments."
)
elif process_group is None and device_mesh is None:
self.process_group = _get_default_group()
elif device_mesh is None:
self.process_group = process_group
else:
if device_mesh.ndim != 1:
raise RuntimeError(
f"Only 1D device mesh is supported, but got {device_mesh}."
)
self.device_mesh = device_mesh
self.process_group = device_mesh.get_group(mesh_dim=0)
self.static_graph = False
self.dim = dim
self.module = module
self.device = next(iter(self._module_parameters)).device
self.broadcast_buffers = broadcast_buffers
self.find_unused_parameters = find_unused_parameters
self.require_backward_grad_sync = True
self.require_forward_param_sync = True
self.gradient_as_bucket_view = gradient_as_bucket_view
self.mixed_precision = mixed_precision
if self.mixed_precision is not None:
logger.warning("Received mixed precision config %s", self.mixed_precision)
if check_reduction:
# This argument is no longer used since the reducer
# will ensure reduction completes even if some parameters
# do not receive gradients.
warnings.warn(
"The `check_reduction` argument in `DistributedDataParallel` "
"module is deprecated. Please avoid using it."
)
# Check that the module does not have uninitialized parameters
for param in self._module_parameters:
if isinstance(param, torch.nn.parameter.UninitializedParameter):
self._log_and_throw(
RuntimeError,
"Modules with uninitialized parameters can't be used with `DistributedDataParallel`. "
"Run a dummy forward pass to correctly initialize the modules",
)
# used for intra-node param sync and inter-node sync as well
self.broadcast_bucket_size = int(250 * 1024 * 1024)
# reduction bucket size
self.bucket_bytes_cap = int(bucket_cap_mb * 1024 * 1024)
# Whether to perform input tensor CPU to GPU copies on a side-stream
self.use_side_stream_for_tensor_copies = (
os.environ.get("PYTORCH_DDP_USE_SIDE_STREAM", "1") == "1"
)
# Initialize gradient buffers and register all reduce hook
self._delay_grad_buffer = None
self._delay_grad_views: List[torch.Tensor] = []
self._delay_all_reduce_all_params = False
if len(self._delay_all_reduce_params) != 0:
self._register_delay_all_reduce_hook(
bucket_cap_mb=bucket_cap_mb,
param_to_hook_all_reduce=param_to_hook_all_reduce,
device_ids=device_ids,
)
if self._delay_all_reduce_all_params:
return
# Build parameters for reducer.
parameters, expect_sparse_gradient = self._build_params_for_reducer()
# Verify model equivalence.
_verify_param_shape_across_processes(self.process_group, parameters)
# Sync params and buffers. Ensures all DDP models start off at the same value.
_sync_module_states(
module=self.module,
process_group=self.process_group,
broadcast_bucket_size=self.broadcast_bucket_size,
src=0,
params_and_buffers_to_ignore=self.parameters_to_ignore,
broadcast_buffers=self.broadcast_buffers,
)
# In debug mode, build a mapping of parameter index -> parameter.
param_to_name_mapping = self._build_debug_param_to_name_mapping(parameters)
# Builds reducer.
self._ddp_init_helper(
parameters,
expect_sparse_gradient,
param_to_name_mapping,
static_graph,
)
self._comm_hooks: List[Tuple[Callable, object]] = []
if self.mixed_precision is not None:
_setup_mixed_precision_params(self.mixed_precision, self.module)
_cast_buffers(self.mixed_precision, self.module)
# Stream used for async low precision copies.
self._mp_stream = torch.cuda.Stream()
self._submodule_to_event = defaultdict(deque) # type: ignore[var-annotated]
# Add forward pre-hook to root module to kick off copies to lower
# precision.
self.module.register_forward_pre_hook(
self._root_copy_hook, prepend=False, with_kwargs=True
)
# Add forward pre hook to all submodules to wait for copy events
# before running computation.
for module in self.module.modules():
module.register_forward_pre_hook(
self._module_wait_for_copy_hook,
prepend=False,
with_kwargs=True,
)
# Set up callbacks in backward to upcast and use full precision
# params. TODO (rohan-varma): Make this compose with general
# comm hooks and apply_optimizer_in_backward. Importing inline to
# avoid circular import issue.
from torch.distributed.algorithms.ddp_comm_hooks.mixed_precision_hooks import (
_AllreduceUpcastHookState,
_reducer_allreduce_and_upcast_hook,
)
upcast_hook_state = _AllreduceUpcastHookState(
ddp_weakref=weakref.ref(self),
upcast_stream=torch.cuda.Stream(),
)
self.register_comm_hook(
upcast_hook_state,
_reducer_allreduce_and_upcast_hook,
)
# Inform reducer of reduced precision param dtype for correctness
# of type checks between gradient and bucket.
self.reducer._set_mixed_precision_param_dtype( # type: ignore[attr-defined]
self.mixed_precision.param_dtype
)
self._has_rebuilt_buckets = False
if static_graph:
self._set_static_graph()
self._lazy_init_ran = False
# Register the AccumulateGrad post hooks if optimize_ddp is
# True. The hooks will be deregistered if compiled_autograd is not
# enabled.
self._accum_grad_hooks: List[RemovableHandle] = []
optimize_ddp = torch._dynamo.config._get_optimize_ddp_mode()
self._use_python_reducer = optimize_ddp in (
"python_reducer",
"python_reducer_without_compiled_forward",
)
self._force_to_disable_cpp_reducer = (
optimize_ddp == "python_reducer_without_compiled_forward"
)
if self._use_python_reducer:
self._register_accum_grad_hook()
def _register_accum_grad_hook(self):
import torch.distributed._functional_collectives as fcol
def compiled_accum_grad_hook(
param,
*,
param_index: int,
):
if not self.require_backward_grad_sync:
return
if param.grad is None:
return
if self._comm_hooks:
for hook, state in self._comm_hooks:
hook(state, (param.grad, param))
else:
gradient = param.grad / self.process_group.size()
gradient = fcol.all_reduce(gradient, "sum", self.process_group)
param.grad.copy_(gradient)
for index, param in enumerate(self._module_parameters):
self._accum_grad_hooks.append(
param.register_post_accumulate_grad_hook(
functools.partial(
compiled_accum_grad_hook,
param_index=index,
)
)
)
def _delayed_all_reduce_hook(self, grad):
world_size = dist.get_world_size(self.process_group)
self._delay_grad_buffer.div_(world_size) # type: ignore[union-attr]
_ = dist.all_reduce(
self._delay_grad_buffer, group=self.process_group, async_op=True
)
return grad
def _register_delay_all_reduce_hook(
self,
bucket_cap_mb,
param_to_hook_all_reduce,
device_ids,
):
# 1. Create gradient buffer
device = torch.device("cpu") if device_ids is None else device_ids[0]
self._delay_grad_buffer = torch.zeros(
sum([p.numel() for p in self._delay_all_reduce_params]),
device=device,
)
# 2. Broadcast the parameters
detached_params = [p.detach() for p in self._delay_all_reduce_params]
dist._broadcast_coalesced(self.process_group, detached_params, bucket_cap_mb, 0)
# 3. Hook all reduce to the specified parameter
param_to_hook_all_reduce.register_hook(self._delayed_all_reduce_hook)
# 4. Build tensor views for gradients
offset = 0
for param in self._delay_all_reduce_params:
grad_view = self._delay_grad_buffer[offset : (offset + param.numel())].view(
param.shape
)
self._delay_grad_views.append(grad_view)
offset = offset + param.numel()
# 5. Check whether the all reduce of all params requiring grad is delayed.
for module_name, module in self.module.named_modules():
for param_name, param in module.named_parameters(recurse=False):
if param.requires_grad:
full_name = f"{module_name}.{param_name}"
if full_name not in self.parameters_to_ignore:
# There is at least one param whose all-reduce will not be delayed.
# In this case, we should not set self._delay_all_reduce_all_params
# to True.
return
self._delay_all_reduce_all_params = True
def _setup_in_backward_optimizers(self):
# Check if user has used apply_optim_in_backward to overlap optimizer
# step + DDP backward. Current constraints:
# 1. Only allreduce is supported at the moment, no custom communication.
# 2. For DDP-managed parameters that have their optimizer run in
# backward, their gradients are set to ``None``. If your use case
# requires DDP parameters grad not to be set to ``None`` after their
# in-backward optimizer runs, please ping
# https://github.com/pytorch/pytorch/issues/90052.
# NOTE: we use self._module_parameters instead of .parameters() since
# the former excludes ignored (non-DDP managed) parameters.
if any(hasattr(p, "_in_backward_optimizers") for p in self._module_parameters):
torch._C._log_api_usage_once("ddp.optimizer_in_backward")
# Remove hooks that apply_optim_in_backward had registered because
# DDP customizes how optimizer is overlapped with backward due to
# the allreduce.
param_to_handle_map = (
dist.optim.apply_optimizer_in_backward.param_to_optim_hook_handle_map
)
for p in self._module_parameters:
for handle in param_to_handle_map.get(p, []):
handle.remove()
# Need a weakref to DDP instance to run all_reduce (from reducer)
# and get managed DDP parameters.
ddp_weakref = weakref.ref(self)
# Note: importing in function, otherwise this will cause a circular
# import.
from torch.distributed.algorithms.ddp_comm_hooks.optimizer_overlap_hooks import (
_apply_optim_in_backward_hook,
)
self.register_comm_hook(
ddp_weakref,
_apply_optim_in_backward_hook(
gradient_is_bucket_view=self.gradient_as_bucket_view
),
)
self.reducer._set_optimizer_in_backward() # type: ignore[attr-defined]
def _fire_reducer_autograd_hook(self, idx, *unused):
"""
Fire the reducer's autograd hook to allreduce params in a Reducer bucket.
Note that this is only used during mixed precision training, since the
Reducer's hooks installed at construction time would not be called,
as we are working with the low precision parameters.
"""
self.reducer._autograd_hook(idx) # type: ignore[attr-defined]
def _root_copy_hook(self, *args: Any, **kwargs: Any) -> None:
"""
For DDP mixed precision, put low precision copies on a separate stream and create events to wait for them.
When training with DDP mixed precision, this root pre-forward hook kicks
off low precision copies on a separate stream and creates respective
events to wait for them.
"""
# Clear out the previous iteration's submodule-to-event mapping, since we
# may have populated events for modules that did not end up being
# used.
self._submodule_to_event = defaultdict(deque) # type: ignore[var-annotated]
with torch.cuda.stream(self._mp_stream):
for submodule in self.module.modules():
for param in submodule.parameters(recurse=False):
# Do not cast DDP ignored parameters.
if hasattr(param, "_ddp_ignored") and param._ddp_ignored:
continue
_alloc_storage(param._mp_param, param.size())
# copy() implicitly casts to low precision
with torch.no_grad():
param._mp_param.copy_(param.data)
# TODO: when zero_grad(set_to_none=False) or in grad
# accumulation case, accumulated grads can be in fp32
# which can cause errors when running DDP backwards due
# to mismatched incoming and accumulated gradient types.
# So we manually cast the accumulated grad down for now,
# in the future we may shift to FSDP style gradient
# accumulation management where the accumulated gradient
# is saved and .grad field is set to None, bypassing
# this issue.
if param.grad is not None:
param.grad.data = param.grad.to(
self.mixed_precision.param_dtype # type: ignore[union-attr]
)
param.data = param._mp_param
copy_event = torch.cuda.Event()
copy_event.record()
self._submodule_to_event[submodule].append(copy_event)
def _module_wait_for_copy_hook(
self,
module,
*args: Any,
**kwargs: Any,
) -> None:
"""Before carrying out computation, wait on the appropriate event to ensure low precision copies have finished."""
try:
event = self._submodule_to_event[module].popleft()
except IndexError:
# copy event has already been waited on
return
event.wait(stream=torch.cuda.current_stream())
for p in module.parameters(recurse=False):
# Don't register hooks if param does not require grad
if not p.requires_grad or (hasattr(p, "_ddp_ignored") and p._ddp_ignored):
continue
# We need to register autograd hook here instead of DDP's ctor
# since we're working with the low precision param. Register them
# via obtaining the gradient accumulator.
tmp = p.expand_as(p)
grad_acc = tmp.grad_fn.next_functions[0][0]
hook = grad_acc.register_hook(
functools.partial(self._fire_reducer_autograd_hook, p._idx)
)
p._ddp_mp_hook_state = (grad_acc, hook)
def _log_and_throw(self, err_type, err_msg):
if self.logger is not None:
self.logger.set_error_and_log(f"{str(err_type)}: {err_msg}")
raise err_type(err_msg)
def _ddp_init_helper(
self,
parameters,
expect_sparse_gradient,
param_to_name_mapping,
static_graph,
):
"""
DDP init helper function to manage parameters, grad hooks, logging, and SyncBatchNorm.
Initialization helper function that does the following:
(1) bucketing the parameters for reductions
(2) resetting the bucketing states
(3) registering the grad hooks
(4) Logging construction-time DDP logging data
(5) passing a handle of DDP to SyncBatchNorm Layer
"""
# Note that the parameter order is not necessarily the order in which the
# parameters are used, especially in models with control flow.
#
# Since parameters are not presented in their real execution order, if a
# certain model also happens to
# 1) have other collective comm ops in its backward graph, or
# 2) have unused parameters on a subset of ranks in the whole world,
# bucketing could insert an ALL-REDUCE comm op too early on the ranks with
# unused parameters, unexpectedly matching up with other collective comm ops
# on other ranks.
#
# To handle this corner case, when the parameters are not in their real
# execution order, we do not do bucketing, so only one ALL-REDUCE is inserted
# after all the gradients of the whole graph are computed.
#
# Note that bucketing is only disabled for the first iteration.
# After the first iteration, it is OK to rebuild buckets, because the bucket
# rebuild bucketizes parameters based on their real execution order in the
# backward graph.
# Can remove this branching once #73732 is landed.
if static_graph is True or self.find_unused_parameters is False:
bucket_size_limits = [sys.maxsize]
else:
bucket_size_limits = [
dist._DEFAULT_FIRST_BUCKET_BYTES,
self.bucket_bytes_cap,
]
(
bucket_indices,
per_bucket_size_limits,
) = dist._compute_bucket_assignment_by_size(
parameters,
bucket_size_limits,
expect_sparse_gradient,
)
# Remember index for parameters if we are in mixed precision, as we
# need to pass in index to Reducer's autograd hook via python.
if self.mixed_precision is not None:
for i, p in enumerate(parameters):
p._idx = i
# Note: reverse list of buckets because we want to approximate the
# order in which their gradients are produced, and assume they
# are used in the forward pass in the order they are defined.
self.reducer = dist.Reducer(
parameters,
list(reversed(bucket_indices)),
list(reversed(per_bucket_size_limits)),
self.process_group,
expect_sparse_gradient,
# The bucket size limit is specified in the constructor.
# Additionally, we allow for a single small bucket for parameters
# that are defined first, such that their gradients don't spill into
# a much larger bucket, adding unnecessary latency after gradient
# computation finishes. Experiments showed 1MB is a reasonable value.
self.bucket_bytes_cap,
self.find_unused_parameters,
self.gradient_as_bucket_view,
param_to_name_mapping,
# User can set dist._DEFAULT_FIRST_BUCKET_BYTES to tune DDP first
# bucket.
dist._DEFAULT_FIRST_BUCKET_BYTES,
)
self.logger = dist.Logger(self.reducer)
# Set as a weak reference to avoid reference cycle between
# logger and reducer.
self.reducer.set_logger(self.logger)
has_sync_bn = False
for submodule in self.module.modules():
if isinstance(submodule, torch.nn.SyncBatchNorm):
has_sync_bn = True
break
# Set logging data that can be obtained at construction time.
self.logger.set_construction_data_and_log(
self.module.__class__.__name__,
[] if self.device_ids is None else self.device_ids,
-1 if self.output_device is None else self.output_device,
self.broadcast_buffers,
has_sync_bn,
static_graph,
)
# passing a handle to torch.nn.SyncBatchNorm layer
self._passing_sync_batchnorm_handle(self.module)
def __getstate__(self):
self._check_default_group()
attrs = copy.copy(self.__dict__)
del attrs["process_group"]
del attrs["reducer"]
del attrs["logger"]
return attrs
def __setstate__(self, state):
# If serializable, then the process group should be the default one
self.process_group = _get_default_group()
super().__setstate__(state)
self.__dict__.setdefault("require_forward_param_sync", True)
self.__dict__.setdefault("require_backward_grad_sync", True)
parameters, expect_sparse_gradient = self._build_params_for_reducer()
# In debug mode, build a mapping of parameter index -> parameter.
param_to_name_mapping = self._build_debug_param_to_name_mapping(parameters)
# Builds reducer.
self._ddp_init_helper(
parameters,
expect_sparse_gradient,
param_to_name_mapping,
self.static_graph,
)
if self.static_graph:
self.reducer._set_static_graph()
assert self.logger is not None
self.logger._set_static_graph()
def _build_params_for_reducer(self):
# Build tuple of (module, parameter) for all parameters that require grads.
modules_and_parameters = [
(module, parameter)
for module_name, module in self.module.named_modules()
for parameter in [
param
# Note that we access module.named_parameters instead of
# parameters(module). parameters(module) is only needed in the
# single-process multi device case, where it accesses replicated
# parameters through _former_parameters.
for param_name, param in module.named_parameters(recurse=False)
if param.requires_grad
and f"{module_name}.{param_name}" not in self.parameters_to_ignore
]
]
# Deduplicate any parameters that might be shared across child modules.
memo = set()
modules_and_parameters = [
# "p not in memo" is the deduplication check.
# "not memo.add(p)" is always True, and it's only there to cause "add(p)" if needed.
(m, p)
for m, p in modules_and_parameters
if p not in memo and not memo.add(p) # type: ignore[func-returns-value]
]
# Build list of parameters.
parameters = [parameter for _, parameter in modules_and_parameters]
# Checks if a module will produce a sparse gradient.
def produces_sparse_gradient(module):
if isinstance(module, (torch.nn.Embedding, torch.nn.EmbeddingBag)):
return module.sparse
return False
# Build list of booleans indicating whether or not to expect sparse
# gradients for the corresponding parameters.
expect_sparse_gradient = [
produces_sparse_gradient(module) for module, _ in modules_and_parameters
]
self._assign_modules_buffers()
return parameters, expect_sparse_gradient
def _assign_modules_buffers(self):
"""
Assign self.module.named_buffers to self.modules_buffers.
Assigns module buffers to self.modules_buffers which are then used to
broadcast across ranks when ``broadcast_buffers=True``. Note that this
must be called every time buffers need to be synced, because buffers can
be reassigned by the user's module;
see https://github.com/pytorch/pytorch/issues/63916.
"""
# Collect buffers for modules, filtering out buffers that should be ignored.
named_module_buffers = [
(buffer, buffer_name)
for buffer_name, buffer in self.module.named_buffers()
if buffer_name not in self.parameters_to_ignore
]
self.modules_buffers = [
buffer for (buffer, buffer_name) in named_module_buffers
]
# Dict[str, tensor] representing module buffers not ignored by DDP.
self.named_module_buffers = {
buffer_name: buffer for (buffer, buffer_name) in named_module_buffers
}
def _build_debug_param_to_name_mapping(self, parameters):
param_to_param_index = {parameters[i]: i for i in range(len(parameters))}
param_set = set(parameters)
param_index_to_param_fqn = {}
for module_name, module in self.module.named_modules():
for param_name, param in module.named_parameters(recurse=False):
fqn = f"{module_name}.{param_name}"
# Bypass ignored parameters since those are not reduced by DDP
# to begin with.
if fqn not in self.parameters_to_ignore and param.requires_grad:
if param not in param_set:
self._log_and_throw(
ValueError,
f"Param with name {fqn} found in module parameters, but not DDP parameters."
" This indicates a bug in DDP, please report an issue to PyTorch.",
)
param_index = param_to_param_index[param]
param_index_to_param_fqn[param_index] = fqn
# Ensure we covered all parameters
if len(param_set) != len(param_index_to_param_fqn):
self._log_and_throw(
ValueError,
(
"Expected param to name mapping to cover all parameters, but"
f" got conflicting lengths: {len(param_set)} vs "
f"{len(param_index_to_param_fqn)}. This indicates a bug in DDP"
", please report an issue to PyTorch."
),
)
return param_index_to_param_fqn
def _get_parameters(self, m, recurse=True):
"""Return a generator of module parameters."""
def model_parameters(m):
ps = (
m._former_parameters.values()
if hasattr(m, "_former_parameters")
else m.parameters(recurse=False)
)
yield from ps
for mod in m.modules() if recurse else [m]:
yield from model_parameters(mod)
def _check_default_group(self):
pickle_not_supported = False
try:
if self.process_group != _get_default_group():
pickle_not_supported = True
except RuntimeError:
pickle_not_supported = True
if pickle_not_supported:
self._log_and_throw(
RuntimeError,
"DDP Pickling/Unpickling are only supported "
"when using DDP with the default process "
"group. That is, when you have called "
"init_process_group and have not passed "
"process_group argument to DDP constructor",
)
@contextmanager
def no_sync(self):
r"""
Context manager to disable gradient synchronizations across DDP processes.
Within this context, gradients will be accumulated on module
variables, which will later be synchronized in the first
forward-backward pass after exiting the context.
Example::
>>> # xdoctest: +SKIP("undefined variables")
>>> ddp = torch.nn.parallel.DistributedDataParallel(model, pg)
>>> with ddp.no_sync():
>>> for input in inputs:
>>> ddp(input).backward() # no synchronization, accumulate grads
>>> ddp(another_input).backward() # synchronize grads
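A common follow-on pattern (a sketch only; ``inputs`` and the accumulation
factor ``accum_steps`` are assumed to be defined by the caller) is to skip
synchronization on every iteration except the accumulation boundary:
>>> # xdoctest: +SKIP("undefined variables")
>>> import contextlib
>>> for step, input in enumerate(inputs):
>>>     no_sync_ctx = (
>>>         ddp.no_sync()
>>>         if (step + 1) % accum_steps != 0
>>>         else contextlib.nullcontext()
>>>     )
>>>     with no_sync_ctx:
>>>         ddp(input).sum().backward()  # grads sync only on boundary steps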
.. warning::
The forward pass should be included inside the context manager, or
else gradients will still be synchronized.
"""
old_require_backward_grad_sync = self.require_backward_grad_sync
self.require_backward_grad_sync = False
try:
yield
finally:
self.require_backward_grad_sync = old_require_backward_grad_sync
@classmethod
def _get_active_ddp_module(cls):
"""`TorchDynamo` requires DDP's status and module for cooperative optimization."""
return cls._active_ddp_module
# note, this ctxmgr function is marked 'skip' in torchdynamo, so dynamo only kicks in
# for the 'module_to_run' underneath
# see torch._dynamo/eval_frame.py TorchPatcher.patch for more details
@contextmanager
@torch._disable_dynamo(recursive=False)
def _inside_ddp_forward(self):
DistributedDataParallel._active_ddp_module = self
try:
yield
finally:
DistributedDataParallel._active_ddp_module = None
def _run_ddp_forward(self, *inputs, **kwargs):
if self._use_python_reducer:
return self.module(*inputs, **kwargs) # type: ignore[index]
else:
with self._inside_ddp_forward():
return self.module(*inputs, **kwargs) # type: ignore[index]
def _clear_grad_buffer(self):
# Making param.grad point to the grad buffers before backward is based on the
# assumption that grad accumulation is done in place by the autograd engine.
# For some edge cases, if grad accumulation in the autograd engine is not in
# place, then param.grad and the grad buffers become detached.
if self._delay_grad_buffer is not None:
# We batch zero_grad for all params by resetting the whole grad
# buffer when the grad of all params is set to None.
all_param_grad_none = all(
param.grad is None for param in self._delay_all_reduce_params
)
for index, param in enumerate(self._delay_all_reduce_params):
if param.grad is None:
param.grad = self._delay_grad_views[index]
if not all_param_grad_none:
param.grad.zero_()
if all_param_grad_none:
self._delay_grad_buffer.zero_()
def _lazy_init(self):
# Initialization for DDP that occurs after construction, but lazily
# before the first forward pass.
self._setup_in_backward_optimizers()
self._lazy_init_ran = True
def _should_disable_cpp_reducer(self) -> bool:
return self._use_python_reducer and (
torch._utils.is_compiling() or self._force_to_disable_cpp_reducer
)
def _pre_forward(self, *inputs, **kwargs):
if self._should_disable_cpp_reducer():
return inputs, kwargs
# Disable the python reducer if compiled_autograd is not enabled.
if self._accum_grad_hooks:
for index, h in enumerate(self._accum_grad_hooks):
h.remove()
self._accum_grad_hooks.clear()
if not self._lazy_init_ran and not torch._utils.is_compiling():
self._lazy_init()
if self._delay_all_reduce_all_params:
return inputs, kwargs
if torch.is_grad_enabled() and self.require_backward_grad_sync:
assert self.logger is not None
self.logger.set_runtime_stats_and_log()
self.reducer.prepare_for_forward()
# Notify the join context that this process has not joined, if
# needed
work = Join.notify_join_context(self)
if work:
self.reducer._set_forward_pass_work_handle(
work, self._divide_by_initial_world_size # type: ignore[arg-type]
)
# Calling _rebuild_buckets before forward computation may allocate new
# buckets before deallocating old buckets inside _rebuild_buckets. To save
# peak memory usage, call _rebuild_buckets before peak memory usage
# increases during forward computation.
# This should be called only once during the whole training period.
if torch.is_grad_enabled() and self.reducer._rebuild_buckets():
logger.info("Reducer buckets have been rebuilt in this iteration.")
self._has_rebuilt_buckets = True
# Sync buffers according to the location (before/after forward) the user
# specified as part of the hook, if a hook was specified.
if self._check_sync_bufs_pre_fwd():
self._sync_buffers()
if self._join_config.enable:
# Notify joined ranks whether they should sync in backwards pass or not.
self._check_global_requires_backward_grad_sync(is_joined_rank=False)
if self.device_ids:
moved_inputs, moved_kwargs = _to_kwargs(
inputs,
kwargs,
torch.device(self.device_type, self.device_ids[0]),
self.use_side_stream_for_tensor_copies,
)
args, kwargs = moved_inputs[0], moved_kwargs[0]
# Cast inputs to reduced precision if needed.
if self.mixed_precision is not None:
args, kwargs = _cast_forward_inputs(
self.mixed_precision.param_dtype,
*args,
**kwargs,
)
return args, kwargs
else:
# Cast inputs to reduced precision if needed.
# TODO (rohan-varma) test this codepath.
if self.mixed_precision is not None:
inputs, kwargs = _cast_forward_inputs(
self.mixed_precision.param_dtype,
*inputs,
**kwargs,
)
return inputs, kwargs
def _post_forward(self, output):
if self._should_disable_cpp_reducer():
return output
if self._delay_all_reduce_all_params:
self._clear_grad_buffer()
return output
# Sync buffers according to the location (before/after forward) the user
# specified as part of the hook, if a hook was specified.
if self._check_sync_bufs_post_fwd():
self._sync_buffers()
if torch.is_grad_enabled() and self.require_backward_grad_sync:
self.require_forward_param_sync = True
# We'll return the output object verbatim since it is a freeform
# object. We need to find any tensors in this object, though,
# because we need to figure out which parameters were used during
# this forward pass, to ensure we short circuit reduction for any
# unused parameters. Only if `find_unused_parameters` is set.
if self.find_unused_parameters and not self.static_graph:
# Do not need to populate this for static graph.
self.reducer.prepare_for_backward(list(_find_tensors(output)))
else:
self.reducer.prepare_for_backward([])
else:
self.require_forward_param_sync = False
# TODO: DDPSink is currently enabled for unused parameter detection and
# static graph training for first iteration.
if (self.find_unused_parameters and not self.static_graph) or (
self.static_graph and not self._static_graph_delay_allreduce_enqueued
):
(
output_tensor_list,
treespec,
output_is_rref,
) = _tree_flatten_with_rref(output)
output_placeholders = [None for _ in range(len(output_tensor_list))]
# Do not touch tensors that have no grad_fn, which can cause issues
# such as https://github.com/pytorch/pytorch/issues/60733
for i, output in enumerate(output_tensor_list):
if torch.is_tensor(output) and output.grad_fn is None:
output_placeholders[i] = output
# When find_unused_parameters=True, makes tensors which require grad
# run through the DDPSink backward pass. When not all outputs are
used in the loss, this makes those corresponding tensors receive an
undefined gradient, which the reducer then handles to ensure the
param.grad field is not touched and we don't error out.
passthrough_tensor_list = _DDPSink.apply(
weakref.ref(self),
*output_tensor_list,
)
for i in range(len(output_placeholders)):
if output_placeholders[i] is None:
output_placeholders[i] = passthrough_tensor_list[i]
# Reconstruct output data structure.
output = _tree_unflatten_with_rref(
output_placeholders, treespec, output_is_rref
)
# At the end of the forward pass, reset the grad buffer and grad views
self._clear_grad_buffer()
return output
def forward(self, *inputs, **kwargs):
with torch.autograd.profiler.record_function("DistributedDataParallel.forward"):
inputs, kwargs = self._pre_forward(*inputs, **kwargs)
output = (
self.module.forward(*inputs, **kwargs)
if self._delay_all_reduce_all_params
else self._run_ddp_forward(*inputs, **kwargs)
)
return self._post_forward(output)
def scatter(self, inputs, kwargs, device_ids):
return scatter_kwargs(inputs, kwargs, device_ids, dim=self.dim)
def to_kwargs(self, inputs, kwargs, device_id):
# Kept for BC
return _to_kwargs(
inputs,
kwargs,
torch.device(self.device_type, device_id),
self.use_side_stream_for_tensor_copies,
)
def gather(self, outputs, output_device):
return gather(outputs, output_device, dim=self.dim)
def train(self, mode=True):
super().train(mode)
return self
# When running in join mode, schedules an allreduce to notify joined ranks
# of whether backwards pass synchronization will run this iteration or not.
def _check_global_requires_backward_grad_sync(self, is_joined_rank):
if not is_joined_rank and self.require_backward_grad_sync:
requires_sync_tensor = torch.ones(1, device=self.device)
else:
requires_sync_tensor = torch.zeros(1, device=self.device)
work = dist.all_reduce(
requires_sync_tensor, group=self.process_group, async_op=True
)
# (kwen2501) This if condition is a plain translation of previous
# behavior, i.e. in the `is_joined_rank=False` case, `work.wait()`
# is not called and it doesn't care about the result. I am guessing
# that it just wants to fire a matching all-reduce and does not want
# the main stream to wait.
if is_joined_rank:
work.wait()
should_sync_backwards = requires_sync_tensor.item() != 0
return should_sync_backwards
else:
return None # Return value is not/should not be used.
# When running in join mode, checks and performs sync of module buffers if
# the models have buffers that should be synchronized in the forward pass.
def _check_and_sync_module_buffers(self):
if self._check_sync_bufs_pre_fwd():
authoritative_rank = self._find_common_rank(self._distributed_rank, False)
self._sync_module_buffers(authoritative_rank)
# When running in join mode, agrees upon a common rank and broadcasts model
# parameters to all other ranks.
def _sync_final_model(self, is_last_joiner):
# Agree upon the process that will be the authoritative model copy.
# The current rank is a candidate for being the authoritative copy if
# is_last_joiner=True. We break ties via picking the larger rank.
self._authoritative_rank = self._find_common_rank(
self._distributed_rank, is_last_joiner
)
_sync_module_states(
module=self.module,
process_group=self.process_group,
broadcast_bucket_size=self.broadcast_bucket_size,
src=self._authoritative_rank,
params_and_buffers_to_ignore=self.parameters_to_ignore,
broadcast_buffers=self.broadcast_buffers,
)
# Schedule comm ops to match those scheduled in the reducer's backward
# pass.
def _match_all_reduce_for_bwd_pass(self):
comm_work = []
# Schedule comm in the same order as Reducer schedules them, i.e.
# the order of the buckets. Retrieving the bucket order from the reducer
# ensures that we keep the same order in join mode, such as when bucket
# order is rebuilt dynamically.
# Returns grad_buckets in order, but real tensors are substituted with
# zero tensors of the same shape.
grad_buckets = self.reducer._get_zeros_like_grad_buckets()
for grad_bucket in grad_buckets:
# Joined processes contribute zero gradient. In the case that
# divide_by_initial_world_size=True, we divide grads by the static
world size; if not, the dividing factor is reduced by the number
# of joined processes.
work = self.reducer._run_comm_hook(grad_bucket)
comm_work.append(work)
for work in comm_work:
work.wait()
# Allreduces the used parameter mapping across ranks.
def _match_unused_params_allreduce(self):
locally_used_param_map = self.reducer._get_local_used_map()
self.process_group.allreduce(locally_used_param_map)
def join(
self,
divide_by_initial_world_size: bool = True,
enable: bool = True,
throw_on_early_termination: bool = False,
):
r"""
Context manager for training with uneven inputs across processes in DDP.
This context manager will keep track of already-joined DDP processes,
and "shadow" the forward and backward passes by inserting collective
communication operations to match with the ones created by non-joined
DDP processes. This will ensure each collective call has a corresponding
call by already-joined DDP processes, preventing hangs or errors that
would otherwise happen when training with uneven inputs across
processes. Alternatively, if the flag ``throw_on_early_termination`` is
specified to be ``True``, all trainers will throw an error once one rank
runs out of inputs, allowing these errors to be caught and handled
according to application logic.
Once all DDP processes have joined, the context manager will broadcast
the model corresponding to the last joined process to all processes to
ensure the model is the same across all processes
(which is guaranteed by DDP).
To use this to enable training with uneven inputs across processes,
simply wrap this context manager around your training loop. No further
modifications to the model or data loading are required.
.. warning::
If the model or training loop this context manager is wrapped around
has additional distributed collective operations, such as
``SyncBatchNorm`` in the model's forward pass, then the flag
``throw_on_early_termination`` must be enabled. This is because this
context manager is not aware of non-DDP collective communication.
This flag will cause all ranks to throw when any one rank
exhausts inputs, allowing these errors to be caught and recovered
from across all ranks.
Args:
divide_by_initial_world_size (bool): If ``True``, will divide
gradients by the initial ``world_size`` DDP training was launched
with. If ``False``, will compute the effective world size
(number of ranks that have not depleted their inputs yet) and
divide gradients by that during allreduce. Set
``divide_by_initial_world_size=True`` to ensure every input
sample, including the uneven inputs, has equal weight in terms of
how much they contribute to the global gradient. This is
achieved by always dividing the gradient by the initial
``world_size`` even when we encounter uneven inputs. If you set
this to ``False``, we divide the gradient by the remaining
number of nodes. This ensures parity with training on a smaller
``world_size`` although it also means the uneven inputs would
contribute more towards the global gradient. Typically, you
would want to set this to ``True`` for cases where the last few
inputs of your training job are uneven. In extreme cases, where
there is a large discrepancy in the number of inputs, setting
this to ``False`` might provide better results.
enable (bool): Whether to enable uneven input detection or not. Pass
in ``enable=False`` to disable in cases where you know that
inputs are even across participating processes. Default is
``True``.
throw_on_early_termination (bool): Whether to throw an error
or continue training when at least one rank has exhausted
inputs. If ``True``, will throw upon the first rank reaching end
of data. If ``False``, will continue training with a smaller
effective world size until all ranks are joined. Note that if
this flag is specified, then the flag
``divide_by_initial_world_size`` will be ignored. Default
is ``False``.
Example::
>>> # xdoctest: +SKIP("Distributed")
>>> import torch
>>> import torch.distributed as dist
>>> import os
>>> import torch.multiprocessing as mp
>>> import torch.nn as nn
>>> # On each spawned worker
>>> def worker(rank):
>>> dist.init_process_group("nccl", rank=rank, world_size=2)
>>> torch.cuda.set_device(rank)
>>> model = nn.Linear(1, 1, bias=False).to(rank)
>>> model = torch.nn.parallel.DistributedDataParallel(
>>> model, device_ids=[rank], output_device=rank
>>> )
>>> # Rank 1 gets one more input than rank 0.
>>> inputs = [torch.tensor([1]).float() for _ in range(10 + rank)]
>>> with model.join():
>>> for _ in range(5):
>>> for inp in inputs:
>>> loss = model(inp).sum()
>>> loss.backward()
>>> # Without the join() API, the below synchronization will hang
>>> # blocking for rank 1's allreduce to complete.
>>> torch.cuda.synchronize(device=rank)
"""
return Join(
[self],
enable,
throw_on_early_termination,
divide_by_initial_world_size=divide_by_initial_world_size,
)
def join_hook(
self,
**kwargs,
):
r"""
DDP join hook enables training on uneven inputs by mirroring communications in forward and backward passes.
Args:
kwargs (dict): a :class:`dict` containing any keyword arguments
to modify the behavior of the join hook at run time; all
:class:`Joinable` instances sharing the same join context
manager are forwarded the same value for ``kwargs``.
The hook supports the following keyword arguments:
divide_by_initial_world_size (bool, optional):
If ``True``, then gradients are divided by the initial world
size that DDP was launched with.
If ``False``, then gradients are divided by the effective world
size (i.e. the number of non-joined processes), meaning that
the uneven inputs contribute more toward the global gradient.
Typically, this should be set to ``True`` if the degree of
unevenness is small but can be set to ``False`` in extreme
cases for possibly better results.
Default is ``True``.
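Example (a sketch; ``ddp`` and ``inputs`` are assumed to exist, and the
keyword argument is forwarded to this hook by the generic join context
manager):
>>> # xdoctest: +SKIP("Distributed")
>>> from torch.distributed.algorithms.join import Join
>>> with Join([ddp], divide_by_initial_world_size=False):
>>>     for inp in inputs:
>>>         ddp(inp).sum().backward()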
"""
divide_by_initial_world_size = kwargs.get("divide_by_initial_world_size", True)
return _DDPJoinHook(
self, divide_by_initial_world_size=divide_by_initial_world_size
)
@property
def join_device(self):
return self.device
@property
def join_process_group(self):
return self.process_group
def _register_buffer_comm_hook(
self,
state,
hook: Callable,
comm_hook_location=_BufferCommHookLocation.POST_FORWARD,
):
r"""
Allow custom registration of hooks that define how buffers are synchronized across ranks.
The hook takes in an optional state and is passed in a Dict[str, Tensor]
corresponding to buffer names and the buffers, and can run arbitrary reductions
on buffers as opposed to DDP's default broadcast from rank 0. This is useful,
for example, if a counter needs to be summed or averaged across ranks every iteration.
Args:
state (Any): Optional state that is passed to the hook.
hook (Callable): Callable with the following signature:
``hook(state: object, buffers: Dict[str, torch.Tensor]) -> Optional[List[torch.futures.Future[torch.Tensor]]]``
comm_hook_location (_BufferCommHookLocation): Enum value indicating
where to run the hook.
_BufferCommHookLocation.PRE_FORWARD means that the
hook will run _before_ the forward pass, and
_BufferCommHookLocation.POST_FORWARD means that the
hook will run _after_ the forward pass.
NOTE: To maximize performance, users can return a
List[torch.futures.Future] from their hook, and DDP will
install and await these futures appropriately at the end of
the backward pass. This will ensure all buffers are
synchronized by the end of the backward pass. If this
setting is used, it is recommended to pass
comm_hook_location=_BufferCommHookLocation.POST_FORWARD,
which will trigger the hook after the forward pass.
If _BufferCommHookLocation.PRE_FORWARD is used, users must
ensure appropriate synchronization when manipulating GPU
buffers in the forward pass.
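Example (a sketch only; the hook name ``allreduce_buffers`` is illustrative,
and averaging buffers via all-reduce is just one possible reduction):
>>> # xdoctest: +SKIP('undefined name')
>>> def allreduce_buffers(state, buffers):
>>>     # buffers is a Dict[str, torch.Tensor]; average each buffer across
>>>     # ranks instead of broadcasting it from rank 0.
>>>     futs = []
>>>     for buf in buffers.values():
>>>         buf.div_(dist.get_world_size())
>>>         futs.append(dist.all_reduce(buf, async_op=True).get_future())
>>>     return futs
>>> ddp._register_buffer_comm_hook(None, allreduce_buffers)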
"""
assert callable(hook)
self.buffer_hook = _BufferCommHook(
buffer_comm_hook=hook,
buffer_comm_hook_state=state,
buffer_comm_hook_location=comm_hook_location,
)
def register_comm_hook(self, state: object, hook: Callable):
r"""
Register communication hook for user-defined DDP aggregation of gradients across multiple workers.
This hook is useful for researchers trying out new ideas. For
example, it can be used to implement algorithms like GossipGrad
and gradient compression, which involve different communication strategies for
parameter synchronization while running DistributedDataParallel training.
Args:
state (object): Passed to the hook to maintain any state information during the training process.
Examples include error feedback in gradient compression,
peers to communicate with next in GossipGrad, etc.
It is locally stored by each worker
and shared by all the gradient tensors on the worker.
hook (Callable): Callable with the following signature:
``hook(state: object, bucket: dist.GradBucket) -> torch.futures.Future[torch.Tensor]``:
This function is called once the bucket is ready. The
hook can perform whatever processing is needed and return
a Future indicating completion of any async work (ex: allreduce).
If the hook doesn't perform any communication, it still
must return a completed Future. The Future should hold the
new value of grad bucket's tensors. Once a bucket is ready,
the c10d reducer calls this hook and uses the tensors returned
by the Future to copy grads to individual parameters.
Note that the future's return type must be a single tensor.
We also provide an API called ``get_future`` to retrieve a
Future associated with the completion of ``c10d.ProcessGroup.Work``.
``get_future`` is currently supported for NCCL and also supported for most
operations on GLOO and MPI, except for peer to peer operations (send/recv).
.. warning ::
Grad bucket's tensors will not be predivided by world_size. The user is responsible
for dividing by the world_size when using operations like allreduce.
.. warning ::
DDP communication hook can only be registered once and should be registered
before calling backward.
.. warning ::
The Future object that hook returns should contain a single tensor
that has the same shape as the tensors inside the grad bucket.
.. warning ::
``get_future`` API supports NCCL, and partially GLOO and MPI backends (no support
for peer-to-peer operations like send/recv) and will return a ``torch.futures.Future``.
Example::
Below is an example of a noop hook that returns the same tensor.
>>> # xdoctest: +SKIP('undefined name')
>>> def noop(state: object, bucket: dist.GradBucket) -> torch.futures.Future[torch.Tensor]:
>>> fut = torch.futures.Future()
>>> fut.set_result(bucket.buffer())
>>> return fut
>>> ddp.register_comm_hook(state=None, hook=noop)
Example::
Below is an example of a Parallel SGD algorithm where gradients are encoded before
allreduce, and then decoded after allreduce.
>>> # xdoctest: +SKIP('undefined name')
>>> def encode_and_decode(state: object, bucket: dist.GradBucket) -> torch.futures.Future[torch.Tensor]:
>>> encoded_tensor = encode(bucket.buffer()) # encode gradients
>>> fut = torch.distributed.all_reduce(encoded_tensor).get_future()
>>> # Define the then callback to decode.
>>> def decode(fut):
>>> decoded_tensor = decode(fut.value()[0]) # decode gradients
>>> return decoded_tensor
>>> return fut.then(decode)
>>> ddp.register_comm_hook(state=None, hook=encode_and_decode)
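Example::
Below is a usage sketch that registers one of the ready-made hooks shipped
with PyTorch (assuming ``ddp`` was constructed against the default process
group; passing ``state=None`` makes the hook fall back to the default group).
>>> # xdoctest: +SKIP('undefined name')
>>> from torch.distributed.algorithms.ddp_comm_hooks import default_hooks
>>> ddp.register_comm_hook(state=None, hook=default_hooks.fp16_compress_hook)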
"""
self._check_comm_hook(hook)
if hook.__name__ in ["bf16_compress_hook", "fp16_compress_hook"]:
# If we pass None, then the hook will try to get the world size
# by calling `dist.group.WORLD.size()`, which causes compilation
# errors. So we pre-decode the process group and pass it to the
# hook.
if state is None:
state = dist.group.WORLD
assert self.logger is not None
self.logger._set_comm_hook_name(hook.__qualname__)
self._comm_hooks.append((hook, state))
dist._register_comm_hook(self.reducer, state, hook)
def _register_builtin_comm_hook(self, comm_hook_type):
r"""
Register a built-in communication hook that specifies how DDP aggregates gradients across multiple workers.
The built-in hooks aim to provide efficient C++ implementations for certain hooks,
which might not be as efficient if implemented in Python using a Python communication hook.
Args:
comm_hook_type (dist.BuiltinCommHookType): type of communication hook, such as ALLREDUCE, FP16_COMPRESS, etc.
.. warning ::
DDP communication hook can only be registered once and should be registered
before calling backward.
Example::
Below is an example of a FP16 compression where gradients are
compressed into 16-bit floating-point numbers before allreduce, and
then decompressed after allreduce.
>>> # xdoctest: +SKIP('undefined name')
>>> ddp._register_builtin_comm_hook(dist.BuiltinCommHookType.FP16_COMPRESS)
"""
assert self.logger is not None
self.logger._set_comm_hook_name(str(comm_hook_type))
dist._register_builtin_comm_hook(self.reducer, comm_hook_type)
def _register_fused_optim(self, optim: Type, *args, optim_params=None, **kwargs):
r"""
Register an optimizer with DDP to optimize a parameter immediately after its gradient reduction.
Registers an optimizer with DDP such that the optimization for a
parameter will run immediately when that parameter's gradient is
finished with reduction, instead of waiting for all parameters'
gradients to finish reduction. This can result in a training speedup
depending on your workload since the optimizer can run while gradient
reduction for other parameters is still ongoing. In addition, this has
the potential to reduce peak memory consumption during training, as it
only needs to load the per-parameter optimizer states of a single
parameter at a time, instead of loading all per-parameter optimizer
states at once.
Args:
optim (Type): a ``torch.optim.Optimizer`` class to be registered
as a fused optimizer.
*args (Sequence[Any]): Arguments to forward to `optim`.
optim_params (Optional[Iterable[torch.Tensor]]): Set of parameters
to optimize, similar to `params` argument of traditional `torch.optim`
Optimizers. If this is omitted, all DDP model parameters will be
optimized.
**kwargs (Dict[str, Any]): Keyword arguments to forward to `optim`.
.. warning ::
_register_fused_optim should only be called once on a DDP instance,
and registering multiple fused optimizers for the same DDP model
is not currently supported. Please ping
https://github.com/pytorch/pytorch/issues/71595 if this is necessary
for your use case.
.. warning ::
_register_fused_optim and register_comm_hook currently do not
compose together, meaning that custom DDP communication hooks are
not supported with overlapped optimizers. Please ping
https://github.com/pytorch/pytorch/issues/71595 if this is necessary
for your use case.
.. warning ::
Gradient accumulation and DDP `no_sync` are currently not supported
with overlapped optimizer. Please ping
https://github.com/pytorch/pytorch/issues/71595 if this is necessary
for your use case.
Example::
>>> # xdoctest: +SKIP("No rendezvous handler")
>>> torch.distributed.init_process_group(backend='nccl', world_size=4, init_method='...')
>>> net = torch.nn.parallel.DistributedDataParallel(model, pg)
>>> lr = 1e-2
>>> betas = (0.9, 0.99)
>>> eps = 1e-6
>>> net._register_fused_optim(torch.optim.Adam, lr, betas=betas, eps=eps)
>>> # Example with subset of parameters
>>> params_to_opt = [list(net.parameters())[0]]
>>> net._register_fused_optim(
... torch.optim.Adam, lr, optim_params=params_to_opt, betas=betas, eps=eps
... )
"""
# Note: import inside the function to avoid a circular import, since the
# optimizer_overlap module needs to import DistributedDataParallel.
from torch.distributed.algorithms._optimizer_overlap import _as_overlapped_optim
overlapped_optim = _as_overlapped_optim(optim, optim_params, *args, **kwargs)
try:
overlapped_optim.register_ddp(self)
except NotImplementedError as e:
raise RuntimeError(
f"{optim} does not support overlapped DDP. Please file an issue to PyTorch or the respective owner of {optim}."
) from e
def _distributed_broadcast_coalesced(
self, tensors, buffer_size, authoritative_rank=0
):
dist._broadcast_coalesced(
self.process_group, tensors, buffer_size, authoritative_rank
)
def _check_sync_bufs_post_fwd(self):
return (
self.will_sync_module_buffers()
and hasattr(self, "buffer_hook")
and self.buffer_hook.buffer_comm_hook_location
== _BufferCommHookLocation.POST_FORWARD
)
def _check_sync_bufs_pre_fwd(self):
return self.will_sync_module_buffers() and (
not hasattr(self, "buffer_hook")
or self.buffer_hook.buffer_comm_hook_location
== _BufferCommHookLocation.PRE_FORWARD
)
def will_sync_module_buffers(self):
return (
self.require_forward_param_sync
and self.broadcast_buffers
and len(self.modules_buffers) > 0
)
def _find_common_rank(self, input_rank, rank_cond):
# -1 indicates that this rank is not under consideration to be the
# common_rank
rank_to_use = torch.tensor(
[input_rank if rank_cond else -1],
device=self.device,
)
dist.all_reduce(rank_to_use, op=ReduceOp.MAX, group=self.process_group)
if rank_to_use.item() == -1:
self._log_and_throw(
ValueError,
"BUG! Expected rank_cond to be true for at least one process."
" This indicates a bug in PyTorch, please report an issue.",
)
return rank_to_use.item()
def _sync_buffers(self):
with torch.no_grad():
# module buffer sync
# Synchronize buffers across processes.
# If we are running DDP with the join manager, we have to agree
# upon a rank to sync module buffers from, since rank 0 may
# already have been joined and have stale module buffers.
if self._join_config.enable:
authoritative_rank = self._find_common_rank(
self._distributed_rank, True
)
else:
# The process with rank 0 is considered the authoritative copy.
authoritative_rank = 0
# Update self.modules_buffers in case any buffers were
# reassigned.
self._assign_modules_buffers()
self._sync_module_buffers(authoritative_rank)
def _sync_module_buffers(self, authoritative_rank):
if not hasattr(self, "buffer_hook"):
self._default_broadcast_coalesced(authoritative_rank=authoritative_rank)
else:
hook = self.buffer_hook.buffer_comm_hook
state = self.buffer_hook.buffer_comm_hook_state
futs = hook(state, self.named_module_buffers)
if futs is not None:
self.reducer._install_post_backward_futures(futs)
def _default_broadcast_coalesced(
self, bufs=None, bucket_size=None, authoritative_rank=0
):
"""
Broadcast buffers from the authoritative rank (rank 0 by default) to all other workers.
If ``bufs`` or ``bucket_size`` is ``None``, the defaults ``self.modules_buffers``
and ``self.broadcast_bucket_size`` are used instead.
"""
if bufs is None:
bufs = self.modules_buffers
if bucket_size is None:
bucket_size = self.broadcast_bucket_size
self._distributed_broadcast_coalesced(bufs, bucket_size, authoritative_rank)
def _passing_sync_batchnorm_handle(self, module):
for layer in module.modules():
if isinstance(layer, torch.nn.modules.SyncBatchNorm):
if self.device_type == "cpu":
self._log_and_throw(
ValueError,
"SyncBatchNorm layers only work with GPU modules",
)
def _check_comm_hook(self, hook):
if not callable(hook):
self._log_and_throw(TypeError, "Communication hook must be callable.")
sig = inspect.signature(hook)
if (
sig.parameters["bucket"].annotation != inspect._empty
and sig.parameters["bucket"].annotation != dist.GradBucket
):
self._log_and_throw(
ValueError,
"Communication hook: bucket annotation should be dist.GradBucket.",
)
if (
sig.return_annotation != inspect._empty
and sig.return_annotation != torch.futures.Future[torch.Tensor]
):
self._log_and_throw(
ValueError,
"Communication hook: return annotation should be torch.futures.Future[torch.Tensor].",
)
if hook.__name__ in [
"bf16_compress_hook",
"bf16_compress_wrapper_hook",
] and (
(torch.version.cuda is None and torch.version.hip is None)
or (
torch.version.cuda is not None
and int(torch.version.cuda.split(".")[0]) < 11
)
or not dist.is_available()
or not dist.is_nccl_available()
or torch.cuda.nccl.version() < (2, 10)
):
self._log_and_throw(
TypeError,
"BF16 all reduce communication hook required CUDA 11+ and NCCL 2.10+.",
)
@property
def _distributed_rank(self):
return dist.get_rank(self.process_group)
@staticmethod
def _get_data_parallel_params(module, named_params=False):
"""Return a generator of parameters managed by a given DDP unit."""
for param in (
module.parameters() if not named_params else module.named_parameters()
):
if not hasattr(param, "_ddp_ignored"):
yield param
@staticmethod
def _set_params_and_buffers_to_ignore_for_model(
module, params_and_buffers_to_ignore
):
"""
Set parameters and buffers to be ignored by DDP.
Expected format for parameters is the fully qualified name: {module_name}.{param_name}, and
similarly, {module_name}.{buffer_name} for buffers. For example:
params_to_ignore = []
# NB: model here is vanilla PyTorch module, not yet wrapped with DDP.
for module_name, module in model.named_modules():
for param_name, param in module.named_parameters(recurse=False):
if should_ignore(param):
# Create expected format
fqn = f"{module_name}.{param_name}"
params_to_ignore.append(fqn)
torch.nn.parallel.DistributedDataParallel._set_params_and_buffers_to_ignore_for_model(
model,
params_to_ignore
)
"""
# This is a workaround to set parameters and buffers DDP should ignore
# during synchronization. It will be removed when the API is finalized
# as part of addressing https://github.com/pytorch/pytorch/issues/43690.
module._ddp_params_and_buffers_to_ignore = params_and_buffers_to_ignore
for name, param in module.named_parameters():
if name in params_and_buffers_to_ignore:
param._ddp_ignored = True
for name, buffer in module.named_buffers():
if name in params_and_buffers_to_ignore:
buffer._ddp_ignored = True
def _get_ddp_logging_data(self):
r"""
Return a dictionary of logging data for debugging and analysis.
This interface can be called after DistributedDataParallel() is
constructed. It returns a dictionary of logging data that can help
with debugging and analysis. The logging data includes DistributedDataParallel
constructor input parameters, some internal states of DistributedDataParallel,
and performance metrics. Simply print the dictionary to see what
these metrics are.
This is a prototype interface and subject to change in the future.
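Example (a sketch; ``ddp`` is assumed to be a constructed
DistributedDataParallel instance):
>>> # xdoctest: +SKIP('undefined name')
>>> for key, value in sorted(ddp._get_ddp_logging_data().items()):
>>>     print(f"{key}: {value}")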
"""
assert self.logger is not None
ddp_logging_data = self.logger._get_ddp_logging_data()
return {**ddp_logging_data.strs_map, **ddp_logging_data.ints_map}
def _set_ddp_runtime_logging_sample_rate(self, sample_rate):
r"""
Set sample_rate of collecting runtime stats.
This interface allows users to set the sample_rate for collecting
runtime stats. Runtime stats are always recorded for the first 10
iterations; after that, they are recorded once every ``sample_rate``
training iterations. The default sample rate is
``kDDPRuntimeLoggingSampleRate=100``, i.e. once every 100 training
iterations after the first 10.
This is a prototype interface and subject to change in the future.
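Example (a sketch; ``ddp`` is assumed to exist):
>>> # xdoctest: +SKIP('undefined name')
>>> # Record runtime stats every 50 iterations after the first 10.
>>> ddp._set_ddp_runtime_logging_sample_rate(50)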
"""
if sample_rate < 1:
self._log_and_throw(
ValueError,
"DDP runtime logging sample rate should be equal or greater than 1",
)
self.reducer._set_ddp_runtime_logging_sample_rate(sample_rate)
def _set_static_graph(self):
"""
Set static graph for DDP.
It is recommended to set static graph in the DDP constructor, which will
call this private API internally.
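Example (a sketch of the recommended constructor-based usage; ``model`` and
``pg`` are assumed to exist):
>>> # xdoctest: +SKIP('undefined name')
>>> ddp = torch.nn.parallel.DistributedDataParallel(
>>>     model, process_group=pg, static_graph=True
>>> )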
"""
# If self.static_graph has been set, no need to set it again
if self.static_graph:
warnings.warn(
"You've set static_graph to be True, no need to set it again."
)
return
self.static_graph = True
self._static_graph_delay_allreduce_enqueued = False
self.reducer._set_static_graph()
assert self.logger is not None
self.logger._set_static_graph()
if self.find_unused_parameters:
warnings.warn(
"You passed find_unused_parameters=true to DistributedDataParallel, "
"`_set_static_graph` will detect unused parameters automatically, so "
"you do not need to set find_unused_parameters=true, just be sure these "
"unused parameters will not change during training loop while calling "
"`_set_static_graph`."
)
def _remove_autograd_hooks(self):
"""Remove autograd hooks registered by the reducer on the model parameters."""
self.reducer._remove_autograd_hooks()
def _check_reducer_finalized(self):
"""
Check if the reducer has processed all buckets and finalized the backward appropriately.
It is useful to call this method after calling .backward() in your training loop
in order to avoid subsequent hard-to-debug errors down the road due to the
reducer not finalizing backward.
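Example (a sketch; ``ddp`` and ``inp`` are assumed to exist):
>>> # xdoctest: +SKIP('undefined name')
>>> ddp(inp).sum().backward()
>>> ddp._check_reducer_finalized()  # raises if backward did not finalize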
"""
self.reducer._check_reducer_finalized()
def _set_sparse_metadata(self, global_unique_ids):
self.reducer._set_sparse_metadata(global_unique_ids)
def _update_process_group(self, new_process_group):
"""
Dynamically updates the process group for DDP so that we can shrink/expand DDP
world size without having to reinitialize DDP.
NOTE: If you are using custom communication hooks via ``register_comm_hook``,
you need to update the process groups for those hooks separately.
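Example (a sketch; ``ddp`` is assumed to exist, every rank in the default
group calls ``dist.new_group`` collectively, and the DDP instance is then
pointed at the new, smaller group):
>>> # xdoctest: +SKIP("Distributed")
>>> new_pg = dist.new_group(ranks=[0, 1])
>>> ddp._update_process_group(new_pg)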
"""
# Force a rebuild of buckets for a new process group. This ensures all ranks
# are synchronized in terms of when they will rebuild buckets and also
# re-evaluates previous assumptions of buckets given the world size might have
# changed.
self._has_rebuilt_buckets = False
self.reducer._reset_state()
if not _rank_not_in_group(new_process_group):
self.process_group = new_process_group
self.reducer._update_process_group(new_process_group)
|