"""
Helpers for embarrassingly parallel code.
"""
# Author: Gael Varoquaux < gael dot varoquaux at normalesup dot org >
# Copyright: 2010, Gael Varoquaux
# License: BSD 3 clause

from __future__ import division

import os
import sys
from math import sqrt
import functools
import collections
import time
import threading
import itertools
from uuid import uuid4
from numbers import Integral
import warnings
import queue
import weakref
from contextlib import nullcontext

from multiprocessing import TimeoutError

from ._multiprocessing_helpers import mp

from .logger import Logger, short_format_time
from .disk import memstr_to_bytes
from ._parallel_backends import (FallbackToBackend, MultiprocessingBackend,
                                 ThreadingBackend, SequentialBackend,
                                 LokyBackend)
from ._utils import eval_expr, _Sentinel

# Make sure that those two classes are part of the public joblib.parallel API
# so that 3rd party backend implementers can import them from here.
from ._parallel_backends import AutoBatchingMixin  # noqa
from ._parallel_backends import ParallelBackendBase  # noqa


IS_PYPY = hasattr(sys, "pypy_version_info")


BACKENDS = {
    'threading': ThreadingBackend,
    'sequential': SequentialBackend,
}
# name of the backend used by default by Parallel outside of any context
# managed by ``parallel_config`` or ``parallel_backend``.

# threading is the only backend that is always everywhere
DEFAULT_BACKEND = 'threading'

MAYBE_AVAILABLE_BACKENDS = {'multiprocessing', 'loky'}

# If multiprocessing is available, so is loky; we set loky as the default
# backend.
if mp is not None:
    BACKENDS['multiprocessing'] = MultiprocessingBackend
    from .externals import loky
    BACKENDS['loky'] = LokyBackend
    DEFAULT_BACKEND = 'loky'


DEFAULT_THREAD_BACKEND = 'threading'


# Thread local value that can be overridden by the ``parallel_config`` context
# manager
_backend = threading.local()


def _register_dask():
    """Register Dask Backend if called with parallel_config(backend="dask")"""
    try:
        from ._dask import DaskDistributedBackend
        register_parallel_backend('dask', DaskDistributedBackend)
    except ImportError as e:
        msg = ("To use the dask.distributed backend you must install both "
               "the `dask` and distributed modules.\n\n"
               "See https://dask.pydata.org/en/latest/install.html for more "
               "information.")
        raise ImportError(msg) from e


EXTERNAL_BACKENDS = {
    'dask': _register_dask,
}


# Sentinels for the default values of the Parallel constructor and
# the parallel_config and parallel_backend context managers
default_parallel_config = {
    "backend": _Sentinel(default_value=None),
    "n_jobs": _Sentinel(default_value=None),
    "verbose": _Sentinel(default_value=0),
    "temp_folder": _Sentinel(default_value=None),
    "max_nbytes": _Sentinel(default_value="1M"),
    "mmap_mode": _Sentinel(default_value="r"),
    "prefer": _Sentinel(default_value=None),
    "require": _Sentinel(default_value=None),
}


VALID_BACKEND_HINTS = ('processes', 'threads', None)
VALID_BACKEND_CONSTRAINTS = ('sharedmem', None)


def _get_config_param(param, context_config, key):
    """Return the value of a parallel config parameter

    Explicitly setting it in Parallel has priority over setting in a
    parallel_(config/backend) context manager.
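
    Illustrative example (a sketch of the priority rules, using the
    module-level ``default_parallel_config`` sentinels):

    >>> _get_config_param(4, {"n_jobs": 2}, "n_jobs")
    4
    >>> _get_config_param(
    ...     default_parallel_config["n_jobs"], {"n_jobs": 2}, "n_jobs")
    2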
    """
    if param is not default_parallel_config[key]:
        # param is explicitly set, return it
        return param

    if context_config[key] is not default_parallel_config[key]:
        # there's a context manager and the key is set, return it
        return context_config[key]

    # Otherwise, we are in the default_parallel_config,
    # return the default value
    return param.default_value


def get_active_backend(
    prefer=default_parallel_config["prefer"],
    require=default_parallel_config["require"],
    verbose=default_parallel_config["verbose"],
):
    """Return the active default backend"""
    backend, config = _get_active_backend(prefer, require, verbose)
    n_jobs = _get_config_param(
        default_parallel_config['n_jobs'], config, "n_jobs"
    )
    return backend, n_jobs
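

# Illustration (not part of the API contract): outside of any
# parallel_config/parallel_backend context, get_active_backend() typically
# returns a LokyBackend instance (or ThreadingBackend when multiprocessing is
# unavailable) together with n_jobs=None; inside
# ``with parallel_config(backend='threading', n_jobs=2)`` it returns the
# configured ThreadingBackend instance and n_jobs=2.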


def _get_active_backend(
    prefer=default_parallel_config["prefer"],
    require=default_parallel_config["require"],
    verbose=default_parallel_config["verbose"],
):
    """Return the active default backend"""

    backend_config = getattr(_backend, "config", default_parallel_config)

    backend = _get_config_param(
        default_parallel_config['backend'], backend_config, "backend"
    )
    prefer = _get_config_param(prefer, backend_config, "prefer")
    require = _get_config_param(require, backend_config, "require")
    verbose = _get_config_param(verbose, backend_config, "verbose")

    if prefer not in VALID_BACKEND_HINTS:
        raise ValueError(
            f"prefer={prefer} is not a valid backend hint, "
            f"expected one of {VALID_BACKEND_HINTS}"
        )
    if require not in VALID_BACKEND_CONSTRAINTS:
        raise ValueError(
            f"require={require} is not a valid backend constraint, "
            f"expected one of {VALID_BACKEND_CONSTRAINTS}"
        )
    if prefer == 'processes' and require == 'sharedmem':
        raise ValueError(
            "prefer == 'processes' and require == 'sharedmem'"
            " are inconsistent settings"
        )

    explicit_backend = True
    if backend is None:

        # We are either outside of the scope of any parallel_(config/backend)
        # context manager or the context manager did not set a backend.
        # create the default backend instance now.
        backend = BACKENDS[DEFAULT_BACKEND](nesting_level=0)
        explicit_backend = False

    # Try to use the backend set by the user with the context manager.

    nesting_level = backend.nesting_level
    uses_threads = getattr(backend, 'uses_threads', False)
    supports_sharedmem = getattr(backend, 'supports_sharedmem', False)
    # Force to use thread-based backend if the provided backend does not
    # match the shared memory constraint or if the backend is not explicitly
    # given and threads are preferred.
    force_threads = (require == 'sharedmem' and not supports_sharedmem)
    force_threads |= (
        not explicit_backend and prefer == 'threads' and not uses_threads
    )
    if force_threads:
        # This backend does not match the shared memory constraint:
        # fall back to the default thread-based backend.
        sharedmem_backend = BACKENDS[DEFAULT_THREAD_BACKEND](
            nesting_level=nesting_level
        )
        # Warn the user if we forced the backend to thread-based, while the
        # user explicitly specified a non-thread-based backend.
        if verbose >= 10 and explicit_backend:
            print(
                f"Using {sharedmem_backend.__class__.__name__} as "
                f"joblib backend instead of {backend.__class__.__name__} "
                "as the latter does not provide shared memory semantics."
            )
        # Force to n_jobs=1 by default
        thread_config = backend_config.copy()
        thread_config['n_jobs'] = 1
        return sharedmem_backend, thread_config

    return backend, backend_config


class parallel_config:
    """Set the default backend or configuration for :class:`~joblib.Parallel`.

    This is an alternative to directly passing keyword arguments to the
    :class:`~joblib.Parallel` class constructor. It is particularly useful when
    calling into library code that uses joblib internally but does not expose
    the various parallel configuration arguments in its own API.

    Parameters
    ----------
    backend: str or ParallelBackendBase instance, default=None
        If ``backend`` is a string it must match a previously registered
        implementation using the :func:`~register_parallel_backend` function.

        By default the following backends are available:

        - 'loky': single-host, process-based parallelism (used by default),
        - 'threading': single-host, thread-based parallelism,
        - 'multiprocessing': legacy single-host, process-based parallelism.

        'loky' is recommended to run functions that manipulate Python objects.
        'threading' is a low-overhead alternative that is most efficient for
        functions that release the Global Interpreter Lock: e.g. I/O-bound
        code or CPU-bound code in a few calls to native code that explicitly
        releases the GIL. Note that on some rare systems (such as pyodide),
        multiprocessing and loky may not be available, in which case joblib
        defaults to threading.

        In addition, if the ``dask`` and ``distributed`` Python packages are
        installed, it is possible to use the 'dask' backend for better
        scheduling of nested parallel calls without over-subscription and
        potentially distribute parallel calls over a networked cluster of
        several hosts.

        It is also possible to use the distributed 'ray' backend for
        distributing the workload to a cluster of nodes. See more details
        in the Examples section below.

        Alternatively the backend can be passed directly as an instance.

    n_jobs: int, default=None
        The maximum number of concurrently running jobs, such as the number
        of Python worker processes when ``backend="loky"`` or the size of the
        thread-pool when ``backend="threading"``.
        This argument is converted to an integer (rounded down if a float is
        given).
        If -1 is given, `joblib` tries to use all CPUs. The number of CPUs
        ``n_cpus`` is obtained with :func:`~cpu_count`.
        For n_jobs below -1, (n_cpus + 1 + n_jobs) are used. For instance,
        using ``n_jobs=-2`` will result in all CPUs but one being used.
        This argument can also go above ``n_cpus``, which will cause
        oversubscription. In some cases, slight oversubscription can be
        beneficial, e.g., for tasks with large I/O operations.
        If 1 is given, no parallel computing code is used at all, and the
        behavior amounts to a simple python `for` loop. This mode is not
        compatible with `timeout`.
        None is a marker for 'unset' that will be interpreted as n_jobs=1
        unless the call is performed under a :func:`~parallel_config`
        context manager that sets another value for ``n_jobs``.
        If n_jobs = 0 then a ValueError is raised.

    verbose: int, default=0
        The verbosity level: if non zero, progress messages are
        printed. Above 50, the output is sent to stdout.
        The frequency of the messages increases with the verbosity level.
        If it is more than 10, all iterations are reported.

    temp_folder: str or None, default=None
        Folder to be used by the pool for memmapping large arrays
        for sharing memory with worker processes. If None, this will try in
        order:

        - a folder pointed by the ``JOBLIB_TEMP_FOLDER`` environment
          variable,
        - ``/dev/shm`` if the folder exists and is writable: this is a
          RAM disk filesystem available by default on modern Linux
          distributions,
        - the default system temporary folder that can be
          overridden with ``TMP``, ``TMPDIR`` or ``TEMP`` environment
          variables, typically ``/tmp`` under Unix operating systems.

    max_nbytes: int, str, or None, optional, default='1M'
        Threshold on the size of arrays passed to the workers that
        triggers automated memory mapping in temp_folder. Can be an int
        in Bytes, or a human-readable string, e.g., '1M' for 1 megabyte.
        Use None to disable memmapping of large arrays.

    mmap_mode: {None, 'r+', 'r', 'w+', 'c'}, default='r'
        Memmapping mode for numpy arrays passed to workers. None will
        disable memmapping, other modes defined in the numpy.memmap doc:
        https://numpy.org/doc/stable/reference/generated/numpy.memmap.html
        Also, see 'max_nbytes' parameter documentation for more details.

    prefer: str in {'processes', 'threads'} or None, default=None
        Soft hint to choose the default backend.
        The default process-based backend is 'loky' and the default
        thread-based backend is 'threading'. Ignored if the ``backend``
        parameter is specified.

    require: 'sharedmem' or None, default=None
        Hard constraint to select the backend. If set to 'sharedmem',
        the selected backend will be single-host and thread-based.

    inner_max_num_threads: int, default=None
        If not None, overwrites the limit set on the number of threads
        usable in some third-party library threadpools like OpenBLAS,
        MKL or OpenMP. This is only used with the ``loky`` backend.

    backend_params: dict
        Additional parameters to pass to the backend constructor when
        backend is a string.

    Notes
    -----
    Joblib tries to limit the oversubscription by limiting the number of
    threads usable in some third-party library threadpools like OpenBLAS, MKL
    or OpenMP. The default limit in each worker is set to
    ``max(cpu_count() // effective_n_jobs, 1)`` but this limit can be
    overwritten with the ``inner_max_num_threads`` argument which will be used
    to set this limit in the child processes.

    .. versionadded:: 1.3

    Examples
    --------
    >>> from operator import neg
    >>> with parallel_config(backend='threading'):
    ...     print(Parallel()(delayed(neg)(i + 1) for i in range(5)))
    ...
    [-1, -2, -3, -4, -5]
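
    A sketch of limiting the size of inner threadpools (e.g. OpenBLAS, MKL)
    in the workers; ``my_blas_heavy_fn`` and ``inputs`` are placeholders:

    >>> with parallel_config(backend='loky', inner_max_num_threads=2):  # doctest: +SKIP
    ...     results = Parallel(n_jobs=4)(
    ...         delayed(my_blas_heavy_fn)(x) for x in inputs)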

    To use the 'ray' joblib backend add the following lines:

    >>> from ray.util.joblib import register_ray  # doctest: +SKIP
    >>> register_ray()  # doctest: +SKIP
    >>> with parallel_config(backend="ray"):  # doctest: +SKIP
    ...     print(Parallel()(delayed(neg)(i + 1) for i in range(5)))
    [-1, -2, -3, -4, -5]

    """
    def __init__(
        self,
        backend=default_parallel_config["backend"],
        *,
        n_jobs=default_parallel_config["n_jobs"],
        verbose=default_parallel_config["verbose"],
        temp_folder=default_parallel_config["temp_folder"],
        max_nbytes=default_parallel_config["max_nbytes"],
        mmap_mode=default_parallel_config["mmap_mode"],
        prefer=default_parallel_config["prefer"],
        require=default_parallel_config["require"],
        inner_max_num_threads=None,
        **backend_params
    ):
        # Save the parallel info and set the active parallel config
        self.old_parallel_config = getattr(
            _backend, "config", default_parallel_config
        )

        backend = self._check_backend(
            backend, inner_max_num_threads, **backend_params
        )

        new_config = {
            "n_jobs": n_jobs,
            "verbose": verbose,
            "temp_folder": temp_folder,
            "max_nbytes": max_nbytes,
            "mmap_mode": mmap_mode,
            "prefer": prefer,
            "require": require,
            "backend": backend
        }
        self.parallel_config = self.old_parallel_config.copy()
        self.parallel_config.update({
            k: v for k, v in new_config.items()
            if not isinstance(v, _Sentinel)
        })

        setattr(_backend, "config", self.parallel_config)

    def _check_backend(self, backend, inner_max_num_threads, **backend_params):
        if backend is default_parallel_config['backend']:
            if inner_max_num_threads is not None or len(backend_params) > 0:
                raise ValueError(
                    "inner_max_num_threads and other constructor "
                    "parameters backend_params are only supported "
                    "when backend is not None."
                )
            return backend

        if isinstance(backend, str):
            # Handle non-registered or missing backends
            if backend not in BACKENDS:
                if backend in EXTERNAL_BACKENDS:
                    register = EXTERNAL_BACKENDS[backend]
                    register()
                elif backend in MAYBE_AVAILABLE_BACKENDS:
                    warnings.warn(
                        f"joblib backend '{backend}' is not available on "
                        f"your system, falling back to {DEFAULT_BACKEND}.",
                        UserWarning,
                        stacklevel=2
                    )
                    BACKENDS[backend] = BACKENDS[DEFAULT_BACKEND]
                else:
                    raise ValueError(
                        f"Invalid backend: {backend}, expected one of "
                        f"{sorted(BACKENDS.keys())}"
                    )

            backend = BACKENDS[backend](**backend_params)

        if inner_max_num_threads is not None:
            msg = (
                f"{backend.__class__.__name__} does not accept setting the "
                "inner_max_num_threads argument."
            )
            assert backend.supports_inner_max_num_threads, msg
            backend.inner_max_num_threads = inner_max_num_threads

        # If the nesting_level of the backend is not set previously, use the
        # nesting level from the previous active_backend to set it
        if backend.nesting_level is None:
            parent_backend = self.old_parallel_config['backend']
            if parent_backend is default_parallel_config['backend']:
                nesting_level = 0
            else:
                nesting_level = parent_backend.nesting_level
            backend.nesting_level = nesting_level

        return backend

    def __enter__(self):
        return self.parallel_config

    def __exit__(self, type, value, traceback):
        self.unregister()

    def unregister(self):
        setattr(_backend, "config", self.old_parallel_config)


class parallel_backend(parallel_config):
    """Change the default backend used by Parallel inside a with block.

    .. warning::
        It is advised to use the :class:`~joblib.parallel_config` context
        manager instead, which allows more fine-grained control over the
        backend configuration.

    If ``backend`` is a string it must match a previously registered
    implementation using the :func:`~register_parallel_backend` function.

    By default the following backends are available:

    - 'loky': single-host, process-based parallelism (used by default),
    - 'threading': single-host, thread-based parallelism,
    - 'multiprocessing': legacy single-host, process-based parallelism.

    'loky' is recommended to run functions that manipulate Python objects.
    'threading' is a low-overhead alternative that is most efficient for
    functions that release the Global Interpreter Lock: e.g. I/O-bound code or
    CPU-bound code in a few calls to native code that explicitly releases the
    GIL. Note that on some rare systems (such as Pyodide),
    multiprocessing and loky may not be available, in which case joblib
    defaults to threading.

    You can also use the `Dask <https://docs.dask.org/en/stable/>`_ joblib
    backend to distribute work across machines. This works well with
    scikit-learn estimators with the ``n_jobs`` parameter, for example::

    >>> import joblib  # doctest: +SKIP
    >>> from sklearn.model_selection import GridSearchCV  # doctest: +SKIP
    >>> from dask.distributed import Client, LocalCluster # doctest: +SKIP

    >>> # create a local Dask cluster
    >>> cluster = LocalCluster()  # doctest: +SKIP
    >>> client = Client(cluster)  # doctest: +SKIP
    >>> grid_search = GridSearchCV(estimator, param_grid, n_jobs=-1)
    ... # doctest: +SKIP
    >>> with joblib.parallel_backend("dask", scatter=[X, y]):  # doctest: +SKIP
    ...     grid_search.fit(X, y)

    It is also possible to use the distributed 'ray' backend for distributing
    the workload to a cluster of nodes. To use the 'ray' joblib backend add
    the following lines::

     >>> from ray.util.joblib import register_ray  # doctest: +SKIP
     >>> register_ray()  # doctest: +SKIP
     >>> with parallel_backend("ray"):  # doctest: +SKIP
     ...     print(Parallel()(delayed(neg)(i + 1) for i in range(5)))
     [-1, -2, -3, -4, -5]

    Alternatively the backend can be passed directly as an instance.

    By default all available workers will be used (``n_jobs=-1``) unless the
    caller passes an explicit value for the ``n_jobs`` parameter.

    This is an alternative to passing a ``backend='backend_name'`` argument to
    the :class:`~Parallel` class constructor. It is particularly useful when
    calling into library code that uses joblib internally but does not expose
    the backend argument in its own API.

    >>> from operator import neg
    >>> with parallel_backend('threading'):
    ...     print(Parallel()(delayed(neg)(i + 1) for i in range(5)))
    ...
    [-1, -2, -3, -4, -5]

    Joblib also tries to limit the oversubscription by limiting the number of
    threads usable in some third-party library threadpools like OpenBLAS, MKL
    or OpenMP. The default limit in each worker is set to
    ``max(cpu_count() // effective_n_jobs, 1)`` but this limit can be
    overwritten with the ``inner_max_num_threads`` argument which will be used
    to set this limit in the child processes.

    .. versionadded:: 0.10

    See Also
    --------
    joblib.parallel_config: context manager to change the backend
        configuration.
    """
    def __init__(self, backend, n_jobs=-1, inner_max_num_threads=None,
                 **backend_params):

        super().__init__(
            backend=backend,
            n_jobs=n_jobs,
            inner_max_num_threads=inner_max_num_threads,
            **backend_params
        )

        if self.old_parallel_config is None:
            self.old_backend_and_jobs = None
        else:
            self.old_backend_and_jobs = (
                self.old_parallel_config["backend"],
                self.old_parallel_config["n_jobs"],
            )
        self.new_backend_and_jobs = (
            self.parallel_config["backend"],
            self.parallel_config["n_jobs"],
        )

    def __enter__(self):
        return self.new_backend_and_jobs


# Under Linux or OS X the default start method of multiprocessing
# can cause third party libraries to crash. Under Python 3.4+ it is possible
# to set an environment variable to switch the default start method from
# 'fork' to 'forkserver' or 'spawn' to avoid this issue albeit at the cost
# of causing semantic changes and some additional pool instantiation overhead.
DEFAULT_MP_CONTEXT = None
if hasattr(mp, 'get_context'):
    method = os.environ.get('JOBLIB_START_METHOD', '').strip() or None
    if method is not None:
        DEFAULT_MP_CONTEXT = mp.get_context(method=method)
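
# For illustration only (``my_script.py`` is a placeholder): running
#   JOBLIB_START_METHOD=spawn python my_script.py
# makes joblib build this default multiprocessing context with the 'spawn'
# start method instead of the platform default.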


class BatchedCalls(object):
    """Wrap a sequence of (func, args, kwargs) tuples as a single callable"""

    def __init__(self, iterator_slice, backend_and_jobs, reducer_callback=None,
                 pickle_cache=None):
        self.items = list(iterator_slice)
        self._size = len(self.items)
        self._reducer_callback = reducer_callback
        if isinstance(backend_and_jobs, tuple):
            self._backend, self._n_jobs = backend_and_jobs
        else:
            # this is for backward compatibility purposes. Before 0.12.6,
            # nested backends were returned without n_jobs indications.
            self._backend, self._n_jobs = backend_and_jobs, None
        self._pickle_cache = pickle_cache if pickle_cache is not None else {}

    def __call__(self):
        # Set the default nested backend to self._backend but do not
        # change the default number of processes to -1.
        with parallel_config(backend=self._backend, n_jobs=self._n_jobs):
            return [func(*args, **kwargs)
                    for func, args, kwargs in self.items]

    def __reduce__(self):
        if self._reducer_callback is not None:
            self._reducer_callback()
        # no need to pickle the callback.
        return (
            BatchedCalls,
            (self.items, (self._backend, self._n_jobs), None,
             self._pickle_cache)
        )

    def __len__(self):
        return self._size
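

# Illustration (hypothetical values, not executed): calling
#   BatchedCalls([(abs, (-1,), {}), (abs, (-2,), {})], (backend, 1))()
# runs each (func, args, kwargs) triple sequentially under a nested
# parallel_config using ``backend`` and returns [1, 2].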


# Possible exit status for a task
TASK_DONE = "Done"
TASK_ERROR = "Error"
TASK_PENDING = "Pending"


###############################################################################
# CPU count that works also when multiprocessing has been disabled via
# the JOBLIB_MULTIPROCESSING environment variable
def cpu_count(only_physical_cores=False):
    """Return the number of CPUs.

    This delegates to loky.cpu_count that takes into account additional
    constraints such as Linux CFS scheduler quotas (typically set by container
    runtimes such as docker) and CPU affinity (for instance using the taskset
    command on Linux).

    If only_physical_cores is True, do not take hyperthreading / SMT logical
    cores into account.
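
    Illustrative example (the returned values are machine-dependent):

    >>> cpu_count()  # doctest: +SKIP
    8
    >>> cpu_count(only_physical_cores=True)  # doctest: +SKIP
    4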
    """
    if mp is None:
        return 1

    return loky.cpu_count(only_physical_cores=only_physical_cores)


###############################################################################
# For verbosity

def _verbosity_filter(index, verbose):
    """ Returns False for indices increasingly apart, the distance
        depending on the value of verbose.

        We use a lag increasing as the square of index
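
        Illustrative example (follows directly from the branches below):

        >>> _verbosity_filter(0, 5)   # index 0 is always reported
        False
        >>> _verbosity_filter(3, 0)   # verbose=0 silences everything
        True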
    """
    if not verbose:
        return True
    elif verbose > 10:
        return False
    if index == 0:
        return False
    verbose = .5 * (11 - verbose) ** 2
    scale = sqrt(index / verbose)
    next_scale = sqrt((index + 1) / verbose)
    return (int(next_scale) == int(scale))


###############################################################################
def delayed(function):
    """Decorator used to capture the arguments of a function."""

    def delayed_function(*args, **kwargs):
        return function, args, kwargs
    try:
        delayed_function = functools.wraps(function)(delayed_function)
    except AttributeError:
        " functools.wraps fails on some callable objects "
    return delayed_function
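

# Illustration: ``delayed`` only packs the call, e.g.
#   delayed(sqrt)(4) == (sqrt, (4,), {})
# and ``Parallel`` later unpacks and executes these triples in the workers.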


###############################################################################
class BatchCompletionCallBack(object):
    """Callback to keep track of completed results and schedule the next tasks.

    This callable is executed by the parent process whenever a worker process
    has completed a batch of tasks.

    It is used for progress reporting, to update the estimate of the batch
    processing duration and to schedule the next batch of tasks to be
    processed.

    It is assumed that this callback will always be triggered by the backend
    right after the end of a task, in case of success as well as in case of
    failure.
    """

    ##########################################################################
    #                   METHODS CALLED BY THE MAIN THREAD                    #
    ##########################################################################
    def __init__(self, dispatch_timestamp, batch_size, parallel):
        self.dispatch_timestamp = dispatch_timestamp
        self.batch_size = batch_size
        self.parallel = parallel
        self.parallel_call_id = parallel._call_id

        # Internals to keep track of the status and outcome of the task.

        # Used to hold a reference to the future-like object returned by the
        # backend after launching this task
        # This will be set later when calling `register_job`, as it is only
        # created once the task has been submitted.
        self.job = None

        if not parallel._backend.supports_retrieve_callback:
            # The status is only used for asynchronous result retrieval in the
            # callback.
            self.status = None
        else:
            # The initial status for the job is TASK_PENDING.
            # Once it is done, it will be either TASK_DONE, or TASK_ERROR.
            self.status = TASK_PENDING

    def register_job(self, job):
        """Register the object returned by `apply_async`."""
        self.job = job

    def get_result(self, timeout):
        """Returns the raw result of the task that was submitted.

        If the task raised an exception rather than returning, this same
        exception will be raised instead.

        If the backend supports the retrieval callback, it is assumed that this
        method is only called after the result has been registered. This is
        ensured by checking that `self.get_status(timeout)` does not return
        TASK_PENDING. In this case, `get_result` directly returns the
        registered result (or raises the registered exception).

        For other backends, there are no such assumptions, but `get_result`
        still needs to synchronously retrieve the result before it can
        return it or raise. It will block at most `timeout` seconds
        waiting for retrieval to complete; after that it raises a TimeoutError.
        """

        backend = self.parallel._backend

        if backend.supports_retrieve_callback:
            # We assume that the result has already been retrieved by the
            # callback thread, and is stored internally. It's just waiting to
            # be returned.
            return self._return_or_raise()

        # For other backends, the main thread needs to run the retrieval step.
        try:
            if backend.supports_timeout:
                result = self.job.get(timeout=timeout)
            else:
                result = self.job.get()
            outcome = dict(result=result, status=TASK_DONE)
        except BaseException as e:
            outcome = dict(result=e, status=TASK_ERROR)
        self._register_outcome(outcome)

        return self._return_or_raise()

    def _return_or_raise(self):
        try:
            if self.status == TASK_ERROR:
                raise self._result
            return self._result
        finally:
            del self._result

    def get_status(self, timeout):
        """Get the status of the task.

        This function also checks whether the timeout has been reached and
        registers a TimeoutError outcome when that is the case.
        """
        if timeout is None or self.status != TASK_PENDING:
            return self.status

        # The computation is running and the status is pending.
        # Check that we did not wait for this job more than `timeout`.
        now = time.time()
        if not hasattr(self, "_completion_timeout_counter"):
            self._completion_timeout_counter = now

        if (now - self._completion_timeout_counter) > timeout:
            outcome = dict(result=TimeoutError(), status=TASK_ERROR)
            self._register_outcome(outcome)

        return self.status

    ##########################################################################
    #                     METHODS CALLED BY CALLBACK THREADS                 #
    ##########################################################################
    def __call__(self, out):
        """Function called by the callback thread after a job is completed."""

        # If the backend doesn't support callback retrievals, the next batch of
        # tasks is dispatched regardless. The result will be retrieved by the
        # main thread when calling `get_result`.
        if not self.parallel._backend.supports_retrieve_callback:
            self._dispatch_new()
            return

        # If the backend supports retrieving the result in the callback, it
        # registers the task outcome (TASK_ERROR or TASK_DONE), and schedules
        # the next batch if needed.
        with self.parallel._lock:
            # Edge case: while the task was processing, the `parallel`
            # instance was reset and a new call was issued, but the worker
            # managed to complete the task and trigger this callback just
            # before being aborted by the reset.
            if self.parallel._call_id != self.parallel_call_id:
                return

            # When aborting, stop as fast as possible and do not retrieve the
            # result as it won't be returned by the Parallel call.
            if self.parallel._aborting:
                return

            # Retrieve the result of the task in the main process and dispatch
            # a new batch if needed.
            job_succeeded = self._retrieve_result(out)

            if not self.parallel.return_ordered:
                # Append the job to the queue in the order of completion
                # instead of submission.
                self.parallel._jobs.append(self)

        if job_succeeded:
            self._dispatch_new()

    def _dispatch_new(self):
        """Schedule the next batch of tasks to be processed."""

        # This step ensures that auto-batching works as expected.
        this_batch_duration = time.time() - self.dispatch_timestamp
        self.parallel._backend.batch_completed(self.batch_size,
                                               this_batch_duration)

        # Schedule the next batch of tasks.
        with self.parallel._lock:
            self.parallel.n_completed_tasks += self.batch_size
            self.parallel.print_progress()
            if self.parallel._original_iterator is not None:
                self.parallel.dispatch_next()

    def _retrieve_result(self, out):
        """Fetch and register the outcome of a task.

        Return True if the task succeeded, False otherwise.
        This function is only called by backends that support retrieving
        the task result in the callback thread.
        """
        try:
            result = self.parallel._backend.retrieve_result_callback(out)
            outcome = dict(status=TASK_DONE, result=result)
        except BaseException as e:
            # Avoid keeping references to parallel in the error.
            e.__traceback__ = None
            outcome = dict(result=e, status=TASK_ERROR)

        self._register_outcome(outcome)
        return outcome['status'] != TASK_ERROR

    ##########################################################################
    #            This method can be called either in the main thread         #
    #                        or in the callback thread.                      #
    ##########################################################################
    def _register_outcome(self, outcome):
        """Register the outcome of a task.

        This method can be called only once; future calls will be ignored.
        """
        # Covers the edge case where the main thread tries to register a
        # `TimeoutError` while the callback thread tries to register a result
        # at the same time.
        with self.parallel._lock:
            if self.status not in (TASK_PENDING, None):
                return
            self.status = outcome["status"]

        self._result = outcome["result"]

        # Once the result and the status are extracted, the last reference to
        # the job can be deleted.
        self.job = None

        # As soon as an error has been spotted, early stopping flags are sent to
        # the `parallel` instance.
        if self.status == TASK_ERROR:
            self.parallel._exception = True
            self.parallel._aborting = True


###############################################################################
def register_parallel_backend(name, factory, make_default=False):
    """Register a new Parallel backend factory.

    The new backend can then be selected by passing its name as the backend
    argument to the :class:`~Parallel` class. Moreover, the default backend can
    be overwritten globally by setting make_default=True.

    The factory can be any callable that takes no argument and returns an
    instance of ``ParallelBackendBase``.

    Warning: this function is experimental and subject to change in a future
    version of joblib.

    .. versionadded:: 0.10
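
    Example (illustrative sketch; ``MyCustomBackend`` is a hypothetical
    ``ParallelBackendBase`` subclass):

    >>> register_parallel_backend('custom', MyCustomBackend)  # doctest: +SKIP
    >>> with parallel_config(backend='custom'):  # doctest: +SKIP
    ...     results = Parallel(n_jobs=2)(delayed(abs)(-i) for i in range(3))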
    """
    BACKENDS[name] = factory
    if make_default:
        global DEFAULT_BACKEND
        DEFAULT_BACKEND = name


def effective_n_jobs(n_jobs=-1):
    """Determine the number of jobs that can actually run in parallel

    n_jobs is the number of workers requested by the caller. Passing n_jobs=-1
    means requesting all available workers, for instance matching the number of
    CPU cores on the worker host(s).

    This method should return a guesstimate of the number of workers that can
    actually perform work concurrently with the currently enabled default
    backend. The primary use case is to make it possible for the caller to know
    in how many chunks to slice the work.

    In general working on larger data chunks is more efficient (less scheduling
    overhead and better use of CPU cache prefetching heuristics) as long as all
    the workers have enough work to do.

    Warning: this function is experimental and subject to change in a future
    version of joblib.

    .. versionadded:: 0.10
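
    Illustrative examples (the value for ``n_jobs=-1`` depends on the active
    backend and the number of CPUs on the host):

    >>> effective_n_jobs(1)
    1
    >>> effective_n_jobs(-1)  # doctest: +SKIP
    8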
    """
    if n_jobs == 1:
        return 1

    backend, backend_n_jobs = get_active_backend()
    if n_jobs is None:
        n_jobs = backend_n_jobs
    return backend.effective_n_jobs(n_jobs=n_jobs)


###############################################################################
class Parallel(Logger):
    ''' Helper class for readable parallel mapping.

        Read more in the :ref:`User Guide <parallel>`.

        Parameters
        ----------
        n_jobs: int, default=None
            The maximum number of concurrently running jobs, such as the number
            of Python worker processes when ``backend="loky"`` or the size of
            the thread-pool when ``backend="threading"``.
            This argument is converted to an integer (rounded down if a float
            is given).
            If -1 is given, `joblib` tries to use all CPUs. The number of CPUs
            ``n_cpus`` is obtained with :func:`~cpu_count`.
            For n_jobs below -1, (n_cpus + 1 + n_jobs) are used. For instance,
            using ``n_jobs=-2`` will result in all CPUs but one being used.
            This argument can also go above ``n_cpus``, which will cause
            oversubscription. In some cases, slight oversubscription can be
            beneficial, e.g., for tasks with large I/O operations.
            If 1 is given, no parallel computing code is used at all, and the
            behavior amounts to a simple python `for` loop. This mode is not
            compatible with ``timeout``.
            None is a marker for 'unset' that will be interpreted as n_jobs=1
            unless the call is performed under a :func:`~parallel_config`
            context manager that sets another value for ``n_jobs``.
            If n_jobs = 0 then a ValueError is raised.
        backend: str, ParallelBackendBase instance or None, default='loky'
            Specify the parallelization backend implementation.
            Supported backends are:

            - "loky" used by default, can induce some
              communication and memory overhead when exchanging input and
              output data with the worker Python processes. On some rare
              systems (such as Pyodide), the loky backend may not be
              available.
            - "multiprocessing" previous process-based backend based on
              `multiprocessing.Pool`. Less robust than `loky`.
            - "threading" is a very low-overhead backend but it suffers
              from the Python Global Interpreter Lock if the called function
              relies a lot on Python objects. "threading" is mostly useful
              when the execution bottleneck is a compiled extension that
              explicitly releases the GIL (for instance a Cython loop wrapped
              in a "with nogil" block or an expensive call to a library such
              as NumPy).
            - finally, you can register backends by calling
              :func:`~register_parallel_backend`. This will allow you to
              implement a backend of your liking.

            It is not recommended to hard-code the backend name in a call to
            :class:`~Parallel` in a library. Instead it is recommended to set
            soft hints (prefer) or hard constraints (require) so as to make it
            possible for library users to change the backend from the outside
            using the :func:`~parallel_config` context manager.
        return_as: str in {'list', 'generator', 'generator_unordered'}, default='list'
            If 'list', calls to this instance will return a list, only when
            all results have been processed and retrieved.
            If 'generator', it will return a generator that yields the results
            as soon as they are available, in the order in which the tasks have
            been submitted.
            If 'generator_unordered', the generator will immediately yield
            available results independently of the submission order. The output
            order is not deterministic in this case because it depends on the
            concurrency of the workers.
        prefer: str in {'processes', 'threads'} or None, default=None
            Soft hint to choose the default backend if no specific backend
            was selected with the :func:`~parallel_config` context manager.
            The default process-based backend is 'loky' and the default
            thread-based backend is 'threading'. Ignored if the ``backend``
            parameter is specified.
        require: 'sharedmem' or None, default=None
            Hard constraint to select the backend. If set to 'sharedmem',
            the selected backend will be single-host and thread-based even
            if the user asked for a non-thread based backend with
            :func:`~joblib.parallel_config`.
        verbose: int, default=0
            The verbosity level: if non zero, progress messages are
            printed. Above 50, the output is sent to stdout.
            The frequency of the messages increases with the verbosity level.
            If it is more than 10, all iterations are reported.
        timeout: float or None, default=None
            Timeout limit for each task to complete. If any task takes longer,
            a TimeoutError will be raised. Only applied when n_jobs != 1.
        pre_dispatch: {'all', integer, or expression, as in '3*n_jobs'}, default='2*n_jobs'
            The number of batches (of tasks) to be pre-dispatched.
            Default is '2*n_jobs'. When batch_size="auto" this is a reasonable
            default and the workers should never starve. Note that only basic
            arithmetic is allowed here and no modules can be used in this
            expression.
        batch_size: int or 'auto', default='auto'
            The number of atomic tasks to dispatch at once to each
            worker. When individual evaluations are very fast, dispatching
            calls to workers can be slower than sequential computation because
            of the overhead. Batching fast computations together can mitigate
            this.
            The ``'auto'`` strategy keeps track of the time it takes for a
            batch to complete, and dynamically adjusts the batch size to keep
            the time on the order of half a second, using a heuristic. The
            initial batch size is 1.
            ``batch_size="auto"`` with ``backend="threading"`` will dispatch
            batches of a single task at a time as the threading backend has
            very little overhead and using larger batch size has not proved to
            bring any gain in that case.
        temp_folder: str or None, default=None
            Folder to be used by the pool for memmapping large arrays
            for sharing memory with worker processes. If None, this will try in
            order:

            - a folder pointed by the JOBLIB_TEMP_FOLDER environment
              variable,
            - /dev/shm if the folder exists and is writable: this is a
              RAM disk filesystem available by default on modern Linux
              distributions,
            - the default system temporary folder that can be
              overridden with TMP, TMPDIR or TEMP environment
              variables, typically /tmp under Unix operating systems.

            Only active when ``backend="loky"`` or ``"multiprocessing"``.
        max_nbytes: int, str, or None, default='1M'
            Threshold on the size of arrays passed to the workers that
            triggers automated memory mapping in temp_folder. Can be an int
            in Bytes, or a human-readable string, e.g., '1M' for 1 megabyte.
            Use None to disable memmapping of large arrays.
            Only active when ``backend="loky"`` or ``"multiprocessing"``.
        mmap_mode: {None, 'r+', 'r', 'w+', 'c'}, default='r'
            Memmapping mode for numpy arrays passed to workers. None will
            disable memmapping; other modes are defined in the numpy.memmap doc:
            https://numpy.org/doc/stable/reference/generated/numpy.memmap.html
            Also, see 'max_nbytes' parameter documentation for more details.

        Notes
        -----

        This object uses workers to compute in parallel the application of a
        function to many different arguments. The main features it brings
        in addition to using the raw multiprocessing or concurrent.futures API
        are (see examples for details):

        * More readable code, in particular since it avoids
          constructing lists of arguments.

        * Easier debugging:
            - informative tracebacks even when the error happens in a
              worker process
            - using 'n_jobs=1' makes it possible to turn off parallel
              computing for debugging without changing the codepath
            - early capture of pickling errors

        * An optional progress meter.

        * Interruption of multiprocess jobs with 'Ctrl-C'

        * Flexible pickling control for the communication to and from
          the worker processes.

        * Ability to use shared memory efficiently with worker
          processes for large numpy-based datastructures.

        Note that the intended usage is to run one call at a time. Calling a
        Parallel object that is still running (for instance because the output
        generator of a previous call has not been fully consumed yet) will
        result in a ``RuntimeError``.

        Examples
        --------

        A simple example:

        >>> from math import sqrt
        >>> from joblib import Parallel, delayed
        >>> Parallel(n_jobs=1)(delayed(sqrt)(i**2) for i in range(10))
        [0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]

        Reshaping the output when the function has several return
        values:

        >>> from math import modf
        >>> from joblib import Parallel, delayed
        >>> r = Parallel(n_jobs=1)(delayed(modf)(i/2.) for i in range(10))
        >>> res, i = zip(*r)
        >>> res
        (0.0, 0.5, 0.0, 0.5, 0.0, 0.5, 0.0, 0.5, 0.0, 0.5)
        >>> i
        (0.0, 0.0, 1.0, 1.0, 2.0, 2.0, 3.0, 3.0, 4.0, 4.0)

        The progress meter: the higher the value of `verbose`, the more
        messages:

        >>> from time import sleep
        >>> from joblib import Parallel, delayed
        >>> r = Parallel(n_jobs=2, verbose=10)(
        ...     delayed(sleep)(.2) for _ in range(10)) #doctest: +SKIP
        [Parallel(n_jobs=2)]: Done   1 tasks      | elapsed:    0.6s
        [Parallel(n_jobs=2)]: Done   4 tasks      | elapsed:    0.8s
        [Parallel(n_jobs=2)]: Done  10 out of  10 | elapsed:    1.4s finished

        Traceback example: note how the line of the error is indicated
        as well as the values of the parameters passed to the function that
        triggered the exception, even though the traceback happens in the
        child process:

        >>> from heapq import nlargest
        >>> from joblib import Parallel, delayed
        >>> Parallel(n_jobs=2)(
        ... delayed(nlargest)(2, n) for n in (range(4), 'abcde', 3))
        ... # doctest: +SKIP
        -----------------------------------------------------------------------
        Sub-process traceback:
        -----------------------------------------------------------------------
        TypeError                                      Mon Nov 12 11:37:46 2012
        PID: 12934                                Python 2.7.3: /usr/bin/python
        ........................................................................
        /usr/lib/python2.7/heapq.pyc in nlargest(n=2, iterable=3, key=None)
            419         if n >= size:
            420             return sorted(iterable, key=key, reverse=True)[:n]
            421
            422     # When key is none, use simpler decoration
            423     if key is None:
        --> 424         it = izip(iterable, count(0,-1))           # decorate
            425         result = _nlargest(n, it)
            426         return map(itemgetter(0), result)          # undecorate
            427
            428     # General case, slowest method
         TypeError: izip argument #1 must support iteration
        _______________________________________________________________________


        Using pre_dispatch in a producer/consumer situation, where the
        data is generated on the fly. Note how the producer is first
        called 3 times before the parallel loop is initiated, and then
        called to generate new data on the fly:

        >>> from math import sqrt
        >>> from joblib import Parallel, delayed
        >>> def producer():
        ...     for i in range(6):
        ...         print('Produced %s' % i)
        ...         yield i
        >>> out = Parallel(n_jobs=2, verbose=100, pre_dispatch='1.5*n_jobs')(
        ...     delayed(sqrt)(i) for i in producer()) #doctest: +SKIP
        Produced 0
        Produced 1
        Produced 2
        [Parallel(n_jobs=2)]: Done 1 jobs     | elapsed:  0.0s
        Produced 3
        [Parallel(n_jobs=2)]: Done 2 jobs     | elapsed:  0.0s
        Produced 4
        [Parallel(n_jobs=2)]: Done 3 jobs     | elapsed:  0.0s
        Produced 5
        [Parallel(n_jobs=2)]: Done 4 jobs     | elapsed:  0.0s
        [Parallel(n_jobs=2)]: Done 6 out of 6 | elapsed:  0.0s remaining: 0.0s
        [Parallel(n_jobs=2)]: Done 6 out of 6 | elapsed:  0.0s finished
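
        A minimal sketch of lazy result retrieval with
        ``return_as='generator'``: results are yielded in submission order
        as soon as they are ready (skipped as a doctest because it spawns
        workers and its timing depends on scheduling):

        >>> from math import sqrt
        >>> from joblib import Parallel, delayed
        >>> out = Parallel(n_jobs=2, return_as='generator')(
        ...     delayed(sqrt)(i**2) for i in range(10)) #doctest: +SKIP
        >>> next(out) #doctest: +SKIP
        0.0
        >>> list(out) #doctest: +SKIP
        [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]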

    '''  # noqa: E501
    def __init__(
        self,
        n_jobs=default_parallel_config["n_jobs"],
        backend=default_parallel_config['backend'],
        return_as="list",
        verbose=default_parallel_config["verbose"],
        timeout=None,
        pre_dispatch='2 * n_jobs',
        batch_size='auto',
        temp_folder=default_parallel_config["temp_folder"],
        max_nbytes=default_parallel_config["max_nbytes"],
        mmap_mode=default_parallel_config["mmap_mode"],
        prefer=default_parallel_config["prefer"],
        require=default_parallel_config["require"],
    ):
        # Initiate parent Logger class state
        super().__init__()

        # Interpret n_jobs=None as 'unset'
        if n_jobs is None:
            n_jobs = default_parallel_config["n_jobs"]

        active_backend, context_config = _get_active_backend(
            prefer=prefer, require=require, verbose=verbose
        )

        nesting_level = active_backend.nesting_level

        self.verbose = _get_config_param(verbose, context_config, "verbose")
        self.timeout = timeout
        self.pre_dispatch = pre_dispatch

        if return_as not in {"list", "generator", "generator_unordered"}:
            raise ValueError(
                'Expected `return_as` parameter to be a string equal to "list"'
                f',"generator" or "generator_unordered", but got {return_as} '
                "instead."
            )
        self.return_as = return_as
        self.return_generator = return_as != "list"
        self.return_ordered = return_as != "generator_unordered"

        # Check if we are under a parallel_config or parallel_backend
        # context manager and use the config from the context manager
        # for arguments that are not explicitly set.
        self._backend_args = {
            k: _get_config_param(param, context_config, k) for param, k in [
                (max_nbytes, "max_nbytes"),
                (temp_folder, "temp_folder"),
                (mmap_mode, "mmap_mode"),
                (prefer, "prefer"),
                (require, "require"),
                (verbose, "verbose"),
            ]
        }

        if isinstance(self._backend_args["max_nbytes"], str):
            self._backend_args["max_nbytes"] = memstr_to_bytes(
                self._backend_args["max_nbytes"]
            )
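        # Only the verbosity in excess of 50 is forwarded to the backend
        # (see the documentation of the `verbose` parameter above).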
        self._backend_args["verbose"] = max(
            0, self._backend_args["verbose"] - 50
        )

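        # Record the multiprocessing context so that process-based backends
        # can create their workers with the configured start method.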
        if DEFAULT_MP_CONTEXT is not None:
            self._backend_args['context'] = DEFAULT_MP_CONTEXT
        elif hasattr(mp, "get_context"):
            self._backend_args['context'] = mp.get_context()

        if backend is default_parallel_config['backend'] or backend is None:
            backend = active_backend

        elif isinstance(backend, ParallelBackendBase):
            # Use provided backend as is, with the current nesting_level if it
            # is not set yet.
            if backend.nesting_level is None:
                backend.nesting_level = nesting_level

        elif hasattr(backend, 'Pool') and hasattr(backend, 'Lock'):
            # Make it possible to pass a custom multiprocessing context as
            # backend to change the start method to forkserver or spawn or
            # preload modules on the forkserver helper process.
            self._backend_args['context'] = backend
            backend = MultiprocessingBackend(nesting_level=nesting_level)

        elif backend not in BACKENDS and backend in MAYBE_AVAILABLE_BACKENDS:
            warnings.warn(
                f"joblib backend '{backend}' is not available on "
                f"your system, falling back to {DEFAULT_BACKEND}.",
                UserWarning,
                stacklevel=2)
            BACKENDS[backend] = BACKENDS[DEFAULT_BACKEND]
            backend = BACKENDS[DEFAULT_BACKEND](nesting_level=nesting_level)

        else:
            try:
                backend_factory = BACKENDS[backend]
            except KeyError as e:
                raise ValueError("Invalid backend: %s, expected one of %r"
                                 % (backend, sorted(BACKENDS.keys()))) from e
            backend = backend_factory(nesting_level=nesting_level)

        n_jobs = _get_config_param(n_jobs, context_config, "n_jobs")
        if n_jobs is None:
            # No specific context override and no specific value request:
            # default to the default of the backend.
            n_jobs = backend.default_n_jobs
        try:
            n_jobs = int(n_jobs)
        except ValueError:
            raise ValueError("n_jobs could not be converted to int")
        self.n_jobs = n_jobs

        if (require == 'sharedmem' and
                not getattr(backend, 'supports_sharedmem', False)):
            raise ValueError("Backend %s does not support shared memory"
                             % backend)

        if (batch_size == 'auto' or isinstance(batch_size, Integral) and
                batch_size > 0):
            self.batch_size = batch_size
        else:
            raise ValueError(
                "batch_size must be 'auto' or a positive integer, got: %r"
                % batch_size)

        if not isinstance(backend, SequentialBackend):
            if self.return_generator and not backend.supports_return_generator:
                raise ValueError(
                    "Backend {} does not support "
                    "return_as={}".format(backend, return_as)
                )
            # This lock is used to coordinate the main thread of this process
            # with the async callback thread of the pool.
            self._lock = threading.RLock()
            self._jobs = collections.deque()
            self._pending_outputs = list()
            self._ready_batches = queue.Queue()
            self._reducer_callback = None

        # Internal variables
        self._backend = backend
        self._running = False
        self._managed_backend = False
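        # Unique identifier of this Parallel instance, used e.g. to give it
        # its own temporary memmapping folder when several Parallel objects
        # share a reusable loky executor (see the reducer callback set up in
        # __call__).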
        self._id = uuid4().hex
        self._call_ref = None

    def __enter__(self):
        self._managed_backend = True
        self._calling = False
        self._initialize_backend()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self._managed_backend = False
        if self.return_generator and self._calling:
            self._abort()
        self._terminate_and_reset()

    def _initialize_backend(self):
        """Build a process or thread pool and return the number of workers"""
        try:
            n_jobs = self._backend.configure(n_jobs=self.n_jobs, parallel=self,
                                             **self._backend_args)
            if self.timeout is not None and not self._backend.supports_timeout:
                warnings.warn(
                    'The backend class {!r} does not support timeout. '
                    "You have set 'timeout={}' in Parallel but "
                    "the 'timeout' parameter will not be used.".format(
                        self._backend.__class__.__name__,
                        self.timeout))

        except FallbackToBackend as e:
            # Recursively initialize the backend in case of requested fallback.
            self._backend = e.backend
            n_jobs = self._initialize_backend()

        return n_jobs

    def _effective_n_jobs(self):
        if self._backend:
            return self._backend.effective_n_jobs(self.n_jobs)
        return 1

    def _terminate_and_reset(self):
        if hasattr(self._backend, 'stop_call') and self._calling:
            self._backend.stop_call()
        self._calling = False
        if not self._managed_backend:
            self._backend.terminate()

    def _dispatch(self, batch):
        """Queue the batch for computing, with or without multiprocessing

        WARNING: this method is not thread-safe: it should only be called
        indirectly via dispatch_one_batch.

        """
        # If job.get() catches an exception, it closes the queue:
        if self._aborting:
            return

        batch_size = len(batch)

        self.n_dispatched_tasks += batch_size
        self.n_dispatched_batches += 1

        dispatch_timestamp = time.time()

        batch_tracker = BatchCompletionCallBack(
            dispatch_timestamp, batch_size, self
        )

        if self.return_ordered:
            self._jobs.append(batch_tracker)

        # If return_ordered is False, the batch_tracker is not stored in the
        # jobs queue at the time of submission. Instead, it will be appended to
        # the queue by itself as soon as the callback is triggered to be able
        # to return the results in the order of completion.

        job = self._backend.apply_async(batch, callback=batch_tracker)
        batch_tracker.register_job(job)

    def dispatch_next(self):
        """Dispatch more data for parallel processing

        This method is meant to be called concurrently by the multiprocessing
        callback. We rely on the thread-safety of dispatch_one_batch to protect
        against concurrent consumption of the unprotected iterator.

        """
        if not self.dispatch_one_batch(self._original_iterator):
            self._iterating = False
            self._original_iterator = None

    def dispatch_one_batch(self, iterator):
        """Prefetch the tasks for the next batch and dispatch them.

        The effective size of the batch is computed here.
        If there are no more jobs to dispatch, return False, else return True.

        The iterator consumption and dispatching are protected by the same
        lock so calling this function should be thread safe.

        """

        if self._aborting:
            return False

        batch_size = self._get_batch_size()

        with self._lock:
            # To ensure an even distribution of the workload between workers,
            # we look ahead in the original iterator more than batch_size
            # tasks. However, we keep consuming only one batch at each
            # dispatch_one_batch call. The extra tasks are stored in a local
            # queue, _ready_batches, that is looked up prior to re-consuming
            # tasks from the original iterator.
            try:
                tasks = self._ready_batches.get(block=False)
            except queue.Empty:
                # slice the iterator n_jobs * batch_size items at a time. If
                # the slice returns less than that, then the current batch
                # size puts too much weight on a subset of workers, while
                # others may end up starving. So in this case, re-scale the
                # batch size accordingly to distribute the last items evenly
                # between all workers.
                n_jobs = self._cached_effective_n_jobs
                big_batch_size = batch_size * n_jobs

                try:
                    islice = list(itertools.islice(iterator, big_batch_size))
                except Exception as e:
                    # Handle the fact that the generator of tasks raised an
                    # exception. As this part of the code can be executed in
                    # a thread internal to the backend, register a task with
                    # an error that will be raised in the user's thread.
                    if isinstance(e.__context__, queue.Empty):
                        # Suppress the cause of the exception if it is
                        # queue.Empty to avoid cluttered traceback. Only do it
                        # if the __context__ is really empty to avoid messing
                        # with causes of the original error.
                        e.__cause__ = None
                    batch_tracker = BatchCompletionCallBack(
                        0, batch_size, self
                    )
                    self._jobs.append(batch_tracker)
                    batch_tracker._register_outcome(dict(
                        result=e, status=TASK_ERROR
                    ))
                    return True

                if len(islice) == 0:
                    return False
                elif (iterator is self._original_iterator and
                      len(islice) < big_batch_size):
                    # We reached the end of the original iterator (unless
                    # iterator is the ``pre_dispatch``-long initial slice of
                    # the original iterator) -- decrease the batch size to
                    # account for potential variance in the batches running
                    # time.
                    final_batch_size = max(1, len(islice) // (10 * n_jobs))
                else:
                    final_batch_size = max(1, len(islice) // n_jobs)

                # enqueue n_jobs batches in a local queue
                for i in range(0, len(islice), final_batch_size):
                    tasks = BatchedCalls(islice[i:i + final_batch_size],
                                         self._backend.get_nested_backend(),
                                         self._reducer_callback,
                                         self._pickle_cache)
                    self._ready_batches.put(tasks)

                # finally, get one task.
                tasks = self._ready_batches.get(block=False)
            if len(tasks) == 0:
                # No more tasks available in the iterator: tell caller to stop.
                return False
            else:
                self._dispatch(tasks)
                return True

    def _get_batch_size(self):
        """Returns the effective batch size for dispatch"""
        if self.batch_size == 'auto':
            return self._backend.compute_batch_size()
        else:
            # Fixed batch size strategy
            return self.batch_size

    def _print(self, msg):
        """Display the message on stout or stderr depending on verbosity"""
        # XXX: Not using the logger framework: need to
        # learn to use logger better.
        if not self.verbose:
            return
        if self.verbose < 50:
            writer = sys.stderr.write
        else:
            writer = sys.stdout.write
        writer(f"[{self}]: {msg}\n")

    def _is_completed(self):
        """Check if all tasks have been completed"""
        return self.n_completed_tasks == self.n_dispatched_tasks and not (
            self._iterating or self._aborting
        )

    def print_progress(self):
        """Display the process of the parallel execution only a fraction
           of time, controlled by self.verbose.
        """

        if not self.verbose:
            return

        elapsed_time = time.time() - self._start_time

        if self._is_completed():
            # Make sure that we get a last message telling us we are done
            self._print(
                f"Done {self.n_completed_tasks:3d} out of "
                f"{self.n_completed_tasks:3d} | elapsed: "
                f"{short_format_time(elapsed_time)} finished"
            )
            return

        # Original job iterator becomes None once it has been fully
        # consumed: at this point we know the total number of jobs and we are
        # able to display an estimation of the remaining time based on already
        # completed jobs. Otherwise, we simply display the number of completed
        # tasks.
        elif self._original_iterator is not None:
            if _verbosity_filter(self.n_dispatched_batches, self.verbose):
                return
            self._print(
                f"Done {self.n_completed_tasks:3d} tasks      | elapsed: "
                f"{short_format_time(elapsed_time)}"
            )
        else:
            index = self.n_completed_tasks
            # We are finished dispatching
            total_tasks = self.n_dispatched_tasks
            # We always display the first loop
            if not index == 0:
                # Display depending on the number of remaining items
                # A message as soon as we finish dispatching, cursor is 0
                cursor = (total_tasks - index + 1 -
                          self._pre_dispatch_amount)
                frequency = (total_tasks // self.verbose) + 1
                is_last_item = (index + 1 == total_tasks)
                if (is_last_item or cursor % frequency):
                    return
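            # Estimate the remaining time by linear extrapolation: the average
            # time per completed task so far, multiplied by the number of
            # dispatched tasks that are not completed yet.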
            remaining_time = (elapsed_time / index) * \
                             (self.n_dispatched_tasks - index * 1.0)
            # only display status if remaining time is greater or equal to 0
            self._print(
                f"Done {index:3d} out of {total_tasks:3d} | elapsed: "
                f"{short_format_time(elapsed_time)} remaining: "
                f"{short_format_time(remaining_time)}"
            )

    def _abort(self):
        # Stop dispatching new jobs in the async callback thread
        self._aborting = True

        # If the backend allows it, cancel or kill remaining running
        # tasks without waiting for the results as we will raise
        # the exception we got back to the caller instead of returning
        # any result.
        backend = self._backend
        if (not self._aborted and hasattr(backend, 'abort_everything')):
            # If the backend is managed externally we need to make sure
            # to leave it in a working state to allow for future jobs
            # scheduling.
            ensure_ready = self._managed_backend
            backend.abort_everything(ensure_ready=ensure_ready)
        self._aborted = True

    def _start(self, iterator, pre_dispatch):
        # Only set self._iterating to True if at least a batch
        # was dispatched. In particular this covers the edge
        # case of Parallel used with an exhausted iterator. If
        # self._original_iterator is None, then this means either
        # that pre_dispatch == "all", n_jobs == 1 or that the first batch
        # was very quick and its callback already dispatched all the
        # remaining jobs.
        self._iterating = False
        if self.dispatch_one_batch(iterator):
            self._iterating = self._original_iterator is not None

        while self.dispatch_one_batch(iterator):
            pass

        if pre_dispatch == "all":
            # The iterable was consumed all at once by the above while loop.
            # No need to wait for async callbacks to trigger further
            # consumption.
            self._iterating = False

    def _get_outputs(self, iterator, pre_dispatch):
        """Iterator returning the tasks' output as soon as they are ready."""
        dispatch_thread_id = threading.get_ident()
        detach_generator_exit = False
        try:
            self._start(iterator, pre_dispatch)
            # first yield returns None, for internal use only. This ensures
            # that we enter the try/except block and start dispatching the
            # tasks.
            yield

            with self._backend.retrieval_context():
                yield from self._retrieve()

        except GeneratorExit:
            # The generator has been garbage collected before being fully
            # consumed. This aborts the remaining tasks if possible and warns
            # the user if necessary.
            self._exception = True

            # In some interpreters such as PyPy, GeneratorExit can be raised in
            # a different thread than the one used to start the dispatch of the
            # parallel tasks. This can lead to a hang when a thread attempts to
            # join itself. As a workaround, we detach the execution of the
            # aborting code to a dedicated thread. We then need to make sure
            # the rest of the function does not call `_terminate_and_reset`
            # in the finally clause.
            if dispatch_thread_id != threading.get_ident():
                if not IS_PYPY:
                    warnings.warn(
                        "A generator produced by joblib.Parallel has been "
                        "gc'ed in an unexpected thread. This behavior should "
                        "not cause major -issues but to make sure, please "
                        "report this warning and your use case at "
                        "https://github.com/joblib/joblib/issues so it can "
                        "be investigated."
                    )

                detach_generator_exit = True
                _parallel = self

                class _GeneratorExitThread(threading.Thread):
                    def run(self):
                        _parallel._abort()
                        if _parallel.return_generator:
                            _parallel._warn_exit_early()
                        _parallel._terminate_and_reset()

                _GeneratorExitThread(
                    name="GeneratorExitThread"
                ).start()
                return

            # Otherwise, we are in the thread that started the dispatch: we can
            # safely abort the execution and warn the user.
            self._abort()
            if self.return_generator:
                self._warn_exit_early()

            raise

        # Note: we catch any BaseException instead of just Exception instances
        # to also include KeyboardInterrupt
        except BaseException:
            self._exception = True
            self._abort()
            raise
        finally:
            # Store the unconsumed tasks and terminate the workers if necessary
            _remaining_outputs = ([] if self._exception else self._jobs)
            self._jobs = collections.deque()
            self._running = False
            if not detach_generator_exit:
                self._terminate_and_reset()

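        # Drain the batches that were still queued when the retrieval loop
        # exited and yield their results (this collection is empty if an
        # exception was raised).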
        while len(_remaining_outputs) > 0:
            batched_results = _remaining_outputs.popleft()
            batched_results = batched_results.get_result(self.timeout)
            for result in batched_results:
                yield result

    def _wait_retrieval(self):
        """Return True if we need to continue retrieving some tasks."""

        # If the input iterable is still being iterated over, it means that
        # tasks
        # are still on the dispatch waitlist and their results will need to
        # be retrieved later on.
        if self._iterating:
            return True

        # If some of the dispatched tasks are still being processed by the
        # workers, wait for the compute to finish before starting retrieval
        if self.n_completed_tasks < self.n_dispatched_tasks:
            return True

        # For backends that do not support retrieving the results
        # asynchronously, all results must be carefully retrieved in the
        # _retrieve loop in the main thread while the backend is alive.
        # For other backends, the actual retrieval is done asynchronously in
        # the callback thread, and we can terminate the backend before the
        # `self._jobs` result list has been emptied. The remaining results
        # will be collected in the `finally` step of the generator.
        if not self._backend.supports_retrieve_callback:
            if len(self._jobs) > 0:
                return True

        return False

    def _retrieve(self):
        while self._wait_retrieval():

            # If the callback thread of a worker has signaled that its task
            # triggered an exception, or if the retrieval loop has raised an
            # exception (e.g. `GeneratorExit`), exit the loop and surface the
            # worker traceback.
            if self._aborting:
                self._raise_error_fast()
                break

            # If the next job is not ready for retrieval yet, we just wait for
            # async callbacks to progress.
            if ((len(self._jobs) == 0) or
                (self._jobs[0].get_status(
                    timeout=self.timeout) == TASK_PENDING)):
                time.sleep(0.01)
                continue

            # We need to be careful: the job list can be filling up as
            # we empty it and Python lists are not thread-safe by
            # default, hence the use of the lock
            with self._lock:
                batched_results = self._jobs.popleft()

            # Flatten the batched results to yield one output at a time
            batched_results = batched_results.get_result(self.timeout)
            for result in batched_results:
                self._nb_consumed += 1
                yield result

    def _raise_error_fast(self):
        """If we are aborting, raise if a job caused an error."""

        # Find the first job whose status is TASK_ERROR if it exists.
        with self._lock:
            error_job = next((job for job in self._jobs
                              if job.status == TASK_ERROR), None)

        # If this error job exists, immediately raise the error by
        # calling get_result. This job might not exist if abort has been
        # called directly or if the generator is gc'ed.
        if error_job is not None:
            error_job.get_result(self.timeout)

    def _warn_exit_early(self):
        """Warn the user if the generator is gc'ed before being consumned."""
        ready_outputs = self.n_completed_tasks - self._nb_consumed
        is_completed = self._is_completed()
        msg = ""
        if ready_outputs:
            msg += (
                f"{ready_outputs} tasks have been successfully executed "
                " but not used."
            )
            if not is_completed:
                msg += " Additionally, "

        if not is_completed:
            msg += (
                f"{self.n_dispatched_tasks - self.n_completed_tasks} tasks "
                "which were still being processed by the workers have been "
                "cancelled."
            )

        if msg:
            msg += (
                " You could benefit from adjusting the input task "
                "iterator to limit unnecessary computation time."
            )

            warnings.warn(msg)

    def _get_sequential_output(self, iterable):
        """Separate loop for sequential output.

        This simplifies the traceback in case of errors and reduces the
        overhead of calling sequential tasks with `joblib`.
        """
        try:
            self._iterating = True
            self._original_iterator = iterable
            batch_size = self._get_batch_size()

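            # Mirror the batching behaviour of the parallel path: consume the
            # input iterable batch_size items at a time before flattening it
            # back into individual tasks.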
            if batch_size != 1:
                it = iter(iterable)
                iterable_batched = iter(
                    lambda: tuple(itertools.islice(it, batch_size)), ()
                )
                iterable = (
                    task for batch in iterable_batched for task in batch
                )

            # first yield returns None, for internal use only. This ensures
            # that we enter the try/except block and set up the generator.
            yield None

            # Sequentially call the tasks and yield the results.
            for func, args, kwargs in iterable:
                self.n_dispatched_batches += 1
                self.n_dispatched_tasks += 1
                res = func(*args, **kwargs)
                self.n_completed_tasks += 1
                self.print_progress()
                yield res
                self._nb_consumed += 1
        except BaseException:
            self._exception = True
            self._aborting = True
            self._aborted = True
            raise
        finally:
            self.print_progress()
            self._running = False
            self._iterating = False
            self._original_iterator = None

    def _reset_run_tracking(self):
        """Reset the counters and flags used to track the execution."""

        # Make sure, in a thread-safe way, that the parallel instance is not
        # already running.
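        # The lock is only created for non-sequential backends, hence the
        # fallback to a no-op context manager.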
        with getattr(self, '_lock', nullcontext()):
            if self._running:
                msg = 'This Parallel instance is already running!'
                if self.return_generator is True:
                    msg += (
                        " Before submitting new tasks, you must wait for the "
                        "completion of all the previous tasks, or clean all "
                        "references to the output generator."
                    )
                raise RuntimeError(msg)
            self._running = True

        # Counters to keep track of the tasks dispatched and completed.
        self.n_dispatched_batches = 0
        self.n_dispatched_tasks = 0
        self.n_completed_tasks = 0

        # The following count is incremented by one each time the user
        # iterates on the output generator. It is used to prepare an
        # informative warning message in case the generator is deleted before
        # all the dispatched tasks have been consumed.
        self._nb_consumed = 0

        # The following flags are used to synchronize the threads in case one
        # of the tasks errors out, to ensure that all workers abort fast and
        # that the backend terminates properly.

        # Set to True as soon as a worker signals that a task errors out
        self._exception = False
        # Set to True in case of early termination following an incident
        self._aborting = False
        # Set to True after abortion is complete
        self._aborted = False

    def __call__(self, iterable):
        """Main function to dispatch parallel tasks."""

        self._reset_run_tracking()
        self._start_time = time.time()

        if not self._managed_backend:
            n_jobs = self._initialize_backend()
        else:
            n_jobs = self._effective_n_jobs()

        if n_jobs == 1:
            # If n_jobs==1, run the computation sequentially and return
            # immediately to avoid overheads.
            output = self._get_sequential_output(iterable)
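            # Prime the generator: the first `next` call runs its setup code
            # up to the first (blank) yield without computing any task.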
            next(output)
            return output if self.return_generator else list(output)

        # Let's create an ID that uniquely identifies the current call. If the
        # call is interrupted early and the same instance is immediately
        # re-used, this id will be used to prevent workers that were
        # concurrently finalizing a task from the previous call from running
        # the callback.
        with self._lock:
            self._call_id = uuid4().hex

        # self._effective_n_jobs should be called in the Parallel.__call__
        # thread only -- store its value in an attribute for further queries.
        self._cached_effective_n_jobs = n_jobs

        if isinstance(self._backend, LokyBackend):
            # For the loky backend, we add a callback executed when reducing
            # BatchCalls, that makes the loky executor use a temporary folder
            # specific to this Parallel object when pickling temporary memmaps.
            # This callback is necessary to ensure that several Parallel
            # objects using the same reusable executor don't use the same
            # temporary resources.

            def _batched_calls_reducer_callback():
                # Relevant implementation detail: the following lines, called
                # when reducing BatchedCalls, are called in a thread-safe
                # situation, meaning that the context of the temporary folder
                # manager will not be changed in between the callback execution
                # and the end of the BatchedCalls pickling. The reason is that
                # pickling (the only place where set_current_context is used)
                # is done from a single thread (the queue_feeder_thread).
                self._backend._workers._temp_folder_manager.set_current_context(  # noqa
                    self._id
                )
            self._reducer_callback = _batched_calls_reducer_callback

        backend_name = self._backend.__class__.__name__
        if n_jobs == 0:
            raise RuntimeError("%s has no active worker." % backend_name)

        self._print(
            f"Using backend {backend_name} with {n_jobs} concurrent workers."
        )
        if hasattr(self._backend, 'start_call'):
            self._backend.start_call()

        # Following flag prevents double calls to `backend.stop_call`.
        self._calling = True

        iterator = iter(iterable)
        pre_dispatch = self.pre_dispatch

        if pre_dispatch == 'all':
            # prevent further dispatch via multiprocessing callback thread
            self._original_iterator = None
            self._pre_dispatch_amount = 0
        else:
            self._original_iterator = iterator
            if hasattr(pre_dispatch, 'endswith'):
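                # pre_dispatch is a string expression such as '2*n_jobs':
                # substitute the current number of workers for 'n_jobs' and
                # evaluate the resulting arithmetic expression.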
                pre_dispatch = eval_expr(
                    pre_dispatch.replace("n_jobs", str(n_jobs))
                )
            self._pre_dispatch_amount = pre_dispatch = int(pre_dispatch)

            # The main thread will consume the first pre_dispatch items and
            # the remaining items will later be lazily dispatched by async
            # callbacks upon task completions.

            # TODO: this iterator should be batch_size * n_jobs
            iterator = itertools.islice(iterator, self._pre_dispatch_amount)

        # Use a caching dict for callables that are pickled with cloudpickle to
        # improve performance. This cache is used only in the case of
        # functions that are defined in the __main__ module, functions that
        # are defined locally (inside another function) and lambda expressions.
        self._pickle_cache = dict()

        output = self._get_outputs(iterator, pre_dispatch)
        self._call_ref = weakref.ref(output)

        # The first item from the output is blank, but it makes the interpreter
        # progress until it enters the Try/Except block of the generator and
        # reaches the first `yield` statement. This starts the asynchronous
        # dispatch of the tasks to the workers.
        next(output)

        return output if self.return_generator else list(output)

    def __repr__(self):
        return '%s(n_jobs=%s)' % (self.__class__.__name__, self.n_jobs)