# Copyright (c) Facebook, Inc. and its affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.

import argparse
import unittest

import torch

from fairseq.sequence_generator import SequenceGenerator

import tests.utils as test_utils


class TestSequenceGeneratorBase(unittest.TestCase):

    def assertHypoTokens(self, hypo, tokens):
        self.assertTensorEqual(hypo['tokens'], torch.LongTensor(tokens))

    def assertHypoScore(self, hypo, pos_probs, normalized=True, lenpen=1.):
        pos_scores = torch.FloatTensor(pos_probs).log()
        self.assertAlmostEqual(hypo['positional_scores'], pos_scores)
        self.assertEqual(pos_scores.numel(), hypo['tokens'].numel())
        score = pos_scores.sum()
        if normalized:
            score /= pos_scores.numel() ** lenpen
        self.assertLess(abs(score - hypo['score']), 1e-6)

    def assertAlmostEqual(self, t1, t2):
        self.assertEqual(t1.size(), t2.size(), "size mismatch")
        self.assertLess((t1 - t2).abs().max(), 1e-4)

    def assertTensorEqual(self, t1, t2):
        self.assertEqual(t1.size(), t2.size(), "size mismatch")
        self.assertEqual(t1.ne(t2).long().sum(), 0)


class TestSequenceGenerator(TestSequenceGeneratorBase):

    def setUp(self):
        self.tgt_dict, self.w1, self.w2, src_tokens, src_lengths, self.model = (
            test_utils.sequence_generator_setup()
        )
        self.sample = {
            'net_input': {
                'src_tokens': src_tokens, 'src_lengths': src_lengths,
            },
        }

    def test_with_normalization(self):
        generator = SequenceGenerator(self.tgt_dict, beam_size=2)
        hypos = generator.generate([self.model], self.sample)
        eos, w1, w2 = self.tgt_dict.eos(), self.w1, self.w2
        # sentence 1, beam 1
        self.assertHypoTokens(hypos[0][0], [w1, eos])
        self.assertHypoScore(hypos[0][0], [0.9, 1.0])
        # sentence 1, beam 2
        self.assertHypoTokens(hypos[0][1], [w2, w1, w2, eos])
        self.assertHypoScore(hypos[0][1], [0.1, 0.9, 0.9, 1.0])
        # sentence 2, beam 1
        self.assertHypoTokens(hypos[1][0], [w1, w2, w1, eos])
        self.assertHypoScore(hypos[1][0], [0.7, 0.4, 0.4, 1.0])
        # sentence 2, beam 2
        self.assertHypoTokens(hypos[1][1], [w1, w2, eos])
        self.assertHypoScore(hypos[1][1], [0.7, 0.4, 0.6])
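    # The scoring rule checked by assertHypoScore is
    #     score = sum(log p_i) / n ** lenpen
    # where n is the hypothesis length including eos. Worked example for
    # sentence 1 above, with the default lenpen=1.0:
    #     beam 1: (log 0.9 + log 1.0) / 2 ** 1.0              ~= -0.053
    #     beam 2: (log 0.1 + 2 * log 0.9 + log 1.0) / 4 ** 1.0 ~= -0.628
    # so the shorter hypothesis ranks first. A larger lenpen divides long
    # hypotheses by a much larger n ** lenpen, pulling their scores toward
    # zero (less negative); that is why lenpen=5.0 below flips the beam
    # order while lenpen=0.6 keeps the short hypotheses on top.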
    def test_without_normalization(self):
        # Sentence 1: unchanged from the normalized case
        # Sentence 2: beams swap order
        generator = SequenceGenerator(self.tgt_dict, beam_size=2, normalize_scores=False)
        hypos = generator.generate([self.model], self.sample)
        eos, w1, w2 = self.tgt_dict.eos(), self.w1, self.w2
        # sentence 1, beam 1
        self.assertHypoTokens(hypos[0][0], [w1, eos])
        self.assertHypoScore(hypos[0][0], [0.9, 1.0], normalized=False)
        # sentence 1, beam 2
        self.assertHypoTokens(hypos[0][1], [w2, w1, w2, eos])
        self.assertHypoScore(hypos[0][1], [0.1, 0.9, 0.9, 1.0], normalized=False)
        # sentence 2, beam 1
        self.assertHypoTokens(hypos[1][0], [w1, w2, eos])
        self.assertHypoScore(hypos[1][0], [0.7, 0.4, 0.6], normalized=False)
        # sentence 2, beam 2
        self.assertHypoTokens(hypos[1][1], [w1, w2, w1, eos])
        self.assertHypoScore(hypos[1][1], [0.7, 0.4, 0.4, 1.0], normalized=False)

    def test_with_lenpen_favoring_short_hypos(self):
        lenpen = 0.6
        generator = SequenceGenerator(self.tgt_dict, beam_size=2, len_penalty=lenpen)
        hypos = generator.generate([self.model], self.sample)
        eos, w1, w2 = self.tgt_dict.eos(), self.w1, self.w2
        # sentence 1, beam 1
        self.assertHypoTokens(hypos[0][0], [w1, eos])
        self.assertHypoScore(hypos[0][0], [0.9, 1.0], lenpen=lenpen)
        # sentence 1, beam 2
        self.assertHypoTokens(hypos[0][1], [w2, w1, w2, eos])
        self.assertHypoScore(hypos[0][1], [0.1, 0.9, 0.9, 1.0], lenpen=lenpen)
        # sentence 2, beam 1
        self.assertHypoTokens(hypos[1][0], [w1, w2, eos])
        self.assertHypoScore(hypos[1][0], [0.7, 0.4, 0.6], lenpen=lenpen)
        # sentence 2, beam 2
        self.assertHypoTokens(hypos[1][1], [w1, w2, w1, eos])
        self.assertHypoScore(hypos[1][1], [0.7, 0.4, 0.4, 1.0], lenpen=lenpen)

    def test_with_lenpen_favoring_long_hypos(self):
        lenpen = 5.0
        generator = SequenceGenerator(self.tgt_dict, beam_size=2, len_penalty=lenpen)
        hypos = generator.generate([self.model], self.sample)
        eos, w1, w2 = self.tgt_dict.eos(), self.w1, self.w2
        # sentence 1, beam 1
        self.assertHypoTokens(hypos[0][0], [w2, w1, w2, eos])
        self.assertHypoScore(hypos[0][0], [0.1, 0.9, 0.9, 1.0], lenpen=lenpen)
        # sentence 1, beam 2
        self.assertHypoTokens(hypos[0][1], [w1, eos])
        self.assertHypoScore(hypos[0][1], [0.9, 1.0], lenpen=lenpen)
        # sentence 2, beam 1
        self.assertHypoTokens(hypos[1][0], [w1, w2, w1, eos])
        self.assertHypoScore(hypos[1][0], [0.7, 0.4, 0.4, 1.0], lenpen=lenpen)
        # sentence 2, beam 2
        self.assertHypoTokens(hypos[1][1], [w1, w2, eos])
        self.assertHypoScore(hypos[1][1], [0.7, 0.4, 0.6], lenpen=lenpen)

    def test_maxlen(self):
        generator = SequenceGenerator(self.tgt_dict, beam_size=2, max_len_b=2)
        hypos = generator.generate([self.model], self.sample)
        eos, w1, w2 = self.tgt_dict.eos(), self.w1, self.w2
        # sentence 1, beam 1
        self.assertHypoTokens(hypos[0][0], [w1, eos])
        self.assertHypoScore(hypos[0][0], [0.9, 1.0])
        # sentence 1, beam 2
        self.assertHypoTokens(hypos[0][1], [w2, w2, eos])
        self.assertHypoScore(hypos[0][1], [0.1, 0.1, 0.6])
        # sentence 2, beam 1
        self.assertHypoTokens(hypos[1][0], [w1, w2, eos])
        self.assertHypoScore(hypos[1][0], [0.7, 0.4, 0.6])
        # sentence 2, beam 2
        self.assertHypoTokens(hypos[1][1], [w2, w2, eos])
        self.assertHypoScore(hypos[1][1], [0.3, 0.9, 0.01])
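# A minimal sketch of the Hamming-diversity penalty behind diverse beam
# search (Vijayakumar et al., 2016), which TestDiverseBeamSearch below
# exercises. The helper name `_hamming_diversity_penalty` is hypothetical
# and this is an illustrative stand-in, not fairseq's actual
# DiverseBeamSearch implementation: beams are split into groups, and each
# group's log-probabilities are penalized by `strength` times the number of
# times earlier groups already picked each token at the current step. With
# diverse_beam_strength=0., as in the test below, the penalty vanishes and
# each group degenerates to an independent beam search of size
# beam_size // diverse_beam_groups (here 2 // 2 = 1, i.e. greedy), which is
# why both beams return identical hypotheses.
def _hamming_diversity_penalty(lprobs, prev_group_tokens, strength):
    # lprobs: 1D tensor of log-probabilities over the vocabulary for one
    # hypothesis in the current group; prev_group_tokens: tokens already
    # chosen by earlier groups at this decoding step.
    penalized = lprobs.clone()
    for tok in prev_group_tokens:
        penalized[tok] -= strength
    return penalized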
class TestDiverseBeamSearch(TestSequenceGeneratorBase):

    def setUp(self):
        # construct dummy dictionary
        d = test_utils.dummy_dictionary(vocab_size=2)
        self.assertEqual(d.pad(), 1)
        self.assertEqual(d.eos(), 2)
        self.assertEqual(d.unk(), 3)
        self.eos = d.eos()
        self.w1 = 4
        self.w2 = 5

        # construct source data
        self.src_tokens = torch.LongTensor([
            [self.w1, self.w2, self.eos],
            [self.w1, self.w2, self.eos],
        ])
        self.src_lengths = torch.LongTensor([2, 2])

        args = argparse.Namespace()
        unk = 0.
        # each row is one (sentence, beam) hypothesis; columns give the
        # model's output distribution over [eos, unk, w1, w2] at that step
        args.beam_probs = [
            # step 0:
            torch.FloatTensor([
                # eos      w1   w2
                # sentence 1:
                [0.0, unk, 0.9, 0.1],  # beam 1
                [0.0, unk, 0.9, 0.1],  # beam 2
                # sentence 2:
                [0.0, unk, 0.7, 0.3],
                [0.0, unk, 0.7, 0.3],
            ]),
            # step 1:
            torch.FloatTensor([
                # eos       w1    w2
                # sentence 1:
                [0.0, unk, 0.6, 0.4],
                [0.0, unk, 0.6, 0.4],
                # sentence 2:
                [0.25, unk, 0.35, 0.4],
                [0.25, unk, 0.35, 0.4],
            ]),
            # step 2:
            torch.FloatTensor([
                # eos      w1   w2
                # sentence 1:
                [1.0, unk, 0.0, 0.0],
                [1.0, unk, 0.0, 0.0],
                # sentence 2:
                [0.9, unk, 0.1, 0.0],
                [0.9, unk, 0.1, 0.0],
            ]),
        ]

        task = test_utils.TestTranslationTask.setup_task(args, d, d)
        self.model = task.build_model(args)
        self.tgt_dict = task.target_dictionary

    def test_diverse_beam_search(self):
        generator = SequenceGenerator(
            self.tgt_dict, beam_size=2, diverse_beam_groups=2, diverse_beam_strength=0.,
        )
        sample = {'net_input': {'src_tokens': self.src_tokens, 'src_lengths': self.src_lengths}}
        hypos = generator.generate([self.model], sample)
        eos, w1, w2 = self.eos, self.w1, self.w2
        # sentence 1, beam 1
        self.assertHypoTokens(hypos[0][0], [w1, w1, eos])
        self.assertHypoScore(hypos[0][0], [0.9, 0.6, 1.0])
        # sentence 1, beam 2
        self.assertHypoTokens(hypos[0][1], [w1, w1, eos])
        self.assertHypoScore(hypos[0][1], [0.9, 0.6, 1.0])
        # sentence 2, beam 1
        self.assertHypoTokens(hypos[1][0], [w1, w2, eos])
        self.assertHypoScore(hypos[1][0], [0.7, 0.4, 0.9])
        # sentence 2, beam 2
        self.assertHypoTokens(hypos[1][1], [w1, w2, eos])
        self.assertHypoScore(hypos[1][1], [0.7, 0.4, 0.9])


class TestTopPSamplingSearch(TestSequenceGeneratorBase):

    def setUp(self):
        # construct dummy dictionary
        d = test_utils.dummy_dictionary(vocab_size=2)
        self.assertEqual(d.pad(), 1)
        self.assertEqual(d.eos(), 2)
        self.assertEqual(d.unk(), 3)
        self.eos = d.eos()
        self.w1 = 4
        self.w2 = 5

        # construct source data
        self.src_tokens = torch.LongTensor([
            [self.w1, self.w2, self.eos],
            [self.w1, self.w2, self.eos],
        ])
        self.src_lengths = torch.LongTensor([2, 2])

        args = argparse.Namespace()
        unk = 0.
        # Cumulative probability of the two most likely tokens (w1 + w2).
        self.min_top2_prob = 0.75
        # Probability of the single most likely token (w1).
        self.min_top1_prob = 0.4

        w1_prob = self.min_top1_prob
        w2_prob = self.min_top2_prob - self.min_top1_prob
        eos_prob = 1 - self.min_top2_prob

        args.beam_probs = [
            # step 0:
            torch.FloatTensor([
                # eos      w1   w2
                [0.0, unk, 1.0, 0.0],
                [0.0, unk, 1.0, 0.0],
                [0.0, unk, 1.0, 0.0],
                [0.0, unk, 1.0, 0.0],
            ]),
            # step 1:
            torch.FloatTensor([
                # eos           w1       w2
                [eos_prob, unk, w1_prob, w2_prob],
                [eos_prob, unk, w1_prob, w2_prob],
                [eos_prob, unk, w1_prob, w2_prob],
                [eos_prob, unk, w1_prob, w2_prob],
            ]),
            # step 2:
            torch.FloatTensor([
                # eos      w1   w2
                [1.0, unk, 0.0, 0.0],
                [1.0, unk, 0.0, 0.0],
                [1.0, unk, 0.0, 0.0],
                [1.0, unk, 0.0, 0.0],
            ]),
        ]

        task = test_utils.TestTranslationTask.setup_task(args, d, d)
        self.model = task.build_model(args)
        self.tgt_dict = task.target_dictionary
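    # At decoding step 1 the distribution over [eos, w1, w2] is
    # [0.25, 0.4, 0.35]. Top-p (nucleus) sampling keeps the smallest set of
    # highest-probability tokens whose cumulative mass reaches sampling_topp:
    #     topp = 0.4 / 2 = 0.2            -> nucleus {w1}, deterministic
    #     topp = (0.4 + 0.75) / 2 = 0.575 -> nucleus {w1, w2}
    # The two tests below cover these regimes. The reported
    # positional_scores keep the original (unrenormalized) probabilities,
    # as the expected 0.4 / 0.35 values in the high-prob test show.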
    def test_topp_sampling_search_low_prob(self):
        # With a sampling_topp this low, only the top-1 token can fall
        # inside the sampled nucleus, so the output is always the same.
        low_sampling_topp = self.min_top1_prob / 2.0
        generator = SequenceGenerator(
            self.tgt_dict, beam_size=2, sampling=True, sampling_topp=low_sampling_topp,
        )
        sample = {
            'net_input': {
                'src_tokens': self.src_tokens,
                'src_lengths': self.src_lengths,
            },
        }
        hypos = generator.generate([self.model], sample)
        eos, w1 = self.eos, self.w1
        # sentence 1, beam 1
        self.assertHypoTokens(hypos[0][0], [w1, w1, eos])
        self.assertHypoScore(hypos[0][0], [1.0, 0.4, 1.0])
        # sentence 1, beam 2
        self.assertHypoTokens(hypos[0][1], [w1, w1, eos])
        self.assertHypoScore(hypos[0][1], [1.0, 0.4, 1.0])
        # sentence 2, beam 1
        self.assertHypoTokens(hypos[1][0], [w1, w1, eos])
        self.assertHypoScore(hypos[1][0], [1.0, 0.4, 1.0])
        # sentence 2, beam 2
        self.assertHypoTokens(hypos[1][1], [w1, w1, eos])
        self.assertHypoScore(hypos[1][1], [1.0, 0.4, 1.0])

    def test_topp_sampling_search_high_prob(self):
        # With a sampling_topp this high, either of the top-2 tokens can be
        # sampled, so outputs may differ from run to run.
        high_sampling_topp = (self.min_top1_prob + self.min_top2_prob) / 2.0
        generator = SequenceGenerator(
            self.tgt_dict, beam_size=2, sampling=True, sampling_topp=high_sampling_topp,
        )
        sample = {
            'net_input': {
                'src_tokens': self.src_tokens,
                'src_lengths': self.src_lengths,
            },
        }
        hypos = generator.generate([self.model], sample)
        eos, w1, w2 = self.eos, self.w1, self.w2
        # sentence 1, beam 1
        self.assertTrue(self.hypoTokens(hypos[0][0], [w1, w1, eos])
                        or self.hypoTokens(hypos[0][0], [w1, w2, eos]))
        self.assertTrue(self.hypoScore(hypos[0][0], [1.0, 0.4, 1.0])
                        or self.hypoScore(hypos[0][0], [1.0, 0.35, 1.0]))
        # sentence 1, beam 2
        self.assertTrue(self.hypoTokens(hypos[0][1], [w1, w1, eos])
                        or self.hypoTokens(hypos[0][1], [w1, w2, eos]))
        self.assertTrue(self.hypoScore(hypos[0][1], [1.0, 0.4, 1.0])
                        or self.hypoScore(hypos[0][1], [1.0, 0.35, 1.0]))
        # sentence 2, beam 1
        self.assertTrue(self.hypoTokens(hypos[1][0], [w1, w1, eos])
                        or self.hypoTokens(hypos[1][0], [w1, w2, eos]))
        self.assertTrue(self.hypoScore(hypos[1][0], [1.0, 0.4, 1.0])
                        or self.hypoScore(hypos[1][0], [1.0, 0.35, 1.0]))
        # sentence 2, beam 2
        self.assertTrue(self.hypoTokens(hypos[1][1], [w1, w1, eos])
                        or self.hypoTokens(hypos[1][1], [w1, w2, eos]))
        self.assertTrue(self.hypoScore(hypos[1][1], [1.0, 0.4, 1.0])
                        or self.hypoScore(hypos[1][1], [1.0, 0.35, 1.0]))

    def hypoTokens(self, hypo, tokens):
        # non-asserting variant of assertHypoTokens, for use in disjunctions
        return self.tensorEqual(hypo['tokens'], torch.LongTensor(tokens))

    def hypoScore(self, hypo, pos_probs, normalized=True, lenpen=1.):
        # non-asserting variant of assertHypoScore, for use in disjunctions
        pos_scores = torch.FloatTensor(pos_probs).log()
        if not self.almostEqual(hypo['positional_scores'], pos_scores):
            return False
        if pos_scores.numel() != hypo['tokens'].numel():
            return False
        score = pos_scores.sum()
        if normalized:
            score /= pos_scores.numel() ** lenpen
        return abs(score - hypo['score']) < 1e-6

    def almostEqual(self, t1, t2):
        return t1.size() == t2.size() and (t1 - t2).abs().max() < 1e-4

    def tensorEqual(self, t1, t2):
        return t1.size() == t2.size() and t1.ne(t2).long().sum() == 0


if __name__ == '__main__':
    unittest.main()
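# Example invocation (assuming the standard fairseq checkout layout, run
# from the repository root):
#     python -m unittest tests.test_sequence_generator -v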