Spaces:
Sleeping
Sleeping
| import warnings | |
| warnings.simplefilter(action='ignore', category=FutureWarning) | |
| from src.application.text.preprocessing import split_into_sentences | |
| from src.application.text.search import generate_search_phrases, search_by_google | |
| from src.application.url_reader import URLReader | |
| import numpy as np | |
| import nltk | |
| import torch | |
| from nltk.corpus import stopwords | |
| from sentence_transformers import SentenceTransformer, util | |
| import math | |
| from difflib import SequenceMatcher | |
# Download necessary NLTK data files
# (runs at import time; quiet=True keeps the downloader from printing progress).
nltk.download('punkt', quiet=True)
nltk.download('punkt_tab', quiet=True)
nltk.download('stopwords', quiet=True)

# load the model
# Use the GPU when torch reports one, otherwise fall back to the CPU.
DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
# Sentence-embedding model used by check_paraphrase. The identifier is spelled
# "PARAPHASE" (sic); it is referenced under that name elsewhere in this file.
PARAPHASE_MODEL = SentenceTransformer('paraphrase-MiniLM-L6-v2')
PARAPHASE_MODEL.to(DEVICE)

# Not referenced in the visible part of this file.
BATCH_SIZE = 8
# check_paraphrase treats a sentence pair as a paraphrase when its cosine
# similarity is strictly greater than this value.
PARAPHRASE_THRESHOLD = 0.8
# Not referenced in the visible part of this file.
PARAPHRASE_THRESHOLD_FOR_OPPOSITE = 0.7
# Word-count thresholds matching check_sentence's min_same_sentence_len and
# min_phrase_sentence_len parameters.
MIN_SAME_SENTENCE_LEN = 6
MIN_PHRASE_SENTENCE_LEN = 10
# Only referenced by a commented-out line in check_paraphrase.
MIN_RATIO_PARAPHRASE_NUM = 0.7
# detect_text_by_relative_search skips pages whose title + text exceed this
# many characters.
MAX_CHAR_SIZE = 30000
def detect_text_by_relative_search(input_text, index, is_support_opposite = False):
    """
    Looks for a web page that paraphrases ``input_text[index]`` and, when one
    is found, keeps consuming the following entries of ``input_text`` for as
    long as they are also paraphrased by that same page.

    Args:
        input_text: Indexable collection of text entries to check.
        index: Position in ``input_text`` at which to start.
        is_support_opposite: Accepted for interface compatibility; not used.

    Returns:
        A 5-tuple ``(paraphrase, url, alignment, images, index)``:
            - paraphrase: Result flag of the paraphrase check on the first
              usable page.
            - url: URL of that page, or None when no page was usable.
            - alignment: Alignment returned by check_paraphrase, extended
              with the follow-up entries that also matched ("<br>"-joined
              sentences, pairwise-averaged similarity); ``[]`` when no page
              was usable.
            - images: ``content.images`` of that page, or ``[]``.
            - index: Position of the first entry that was NOT matched
              (equals ``len(input_text)`` when every remaining entry matched).

    Note:
        Only URLs containing "bbc.com" are read, at most the first ten links
        per search phrase, and the first page that is successfully extracted
        and small enough decides the outcome.
    """
    visited = set()

    for phrase in generate_search_phrases(input_text[index]):
        response = search_by_google(phrase)
        links = [entry['link'] for entry in response.get("items", [])]

        for url in links[:10]:
            # Skip pages already visited and anything outside bbc.com.
            if url in visited or "bbc.com" not in url:
                continue

            visited.add(url)
            print(f"\t\tChecking URL: {url}")

            content = URLReader(url)
            if content.is_extracted is not True:
                continue

            if content.title is None or content.text is None:
                print("\t\t\tβββ Title or text not found")
                continue

            page_text = content.title + "\n" + content.text
            if len(page_text) > MAX_CHAR_SIZE:
                print(f"\t\t\tβββ More than {MAX_CHAR_SIZE} characters")
                continue

            print(f"\t\t\tβββ Title: {content.title}")
            paraphrase, aligned_first_sentences = check_paraphrase(input_text[index], page_text, url)

            if paraphrase is False:
                return paraphrase, url, aligned_first_sentences, content.images, index

            # Walk forward through the following entries until one is not
            # paraphrased by this page or the input is exhausted.
            while True:
                index += 1
                print(f"----search {index} < {len(input_text)}----")
                if index >= len(input_text):
                    print(f"input_text_last: {input_text[-1]}")
                    break

                print(f"input_text: {input_text[index]}")
                sub_paraphrase, sub_sentences = check_paraphrase(input_text[index], page_text, url)
                print(f"sub_paraphrase: {sub_paraphrase}")
                print(f"sub_sentences: {sub_sentences}")
                if not sub_paraphrase:
                    break

                # Fold the follow-up match into the running alignment.
                aligned_first_sentences["input_sentence"] += "<br>" + sub_sentences["input_sentence"]
                aligned_first_sentences["matched_sentence"] += "<br>" + sub_sentences["matched_sentence"]
                aligned_first_sentences["similarity"] += sub_sentences["similarity"]
                aligned_first_sentences["similarity"] /= 2

            print(f"paraphrase: {paraphrase}")
            print(f"aligned_first_sentences: {aligned_first_sentences}")
            return paraphrase, url, aligned_first_sentences, content.images, index

    return False, None, [], [], index
def longest_common_subsequence(arr1, arr2):
    """
    Returns the length of the longest run of consecutive elements that
    appears in both lists.

    Despite the name, only contiguous runs count (a longest common
    *substring* over list elements).

    Args:
        arr1: The first list.
        arr2: The second list.

    Returns:
        The length of the longest shared contiguous run, or 0 when either
        argument is not a list or is empty.
    """
    if not (isinstance(arr1, list) and isinstance(arr2, list)):
        return 0
    if not arr1 or not arr2:
        return 0

    best = 0
    # previous_row[c] = length of the shared run ending at the previous
    # element of arr1 and at arr2[c - 1]; column 0 is a zero sentinel.
    previous_row = [0] * (len(arr2) + 1)
    for left_item in arr1:
        current_row = [0]
        for col, right_item in enumerate(arr2, start=1):
            # A mismatch resets the run because elements must be consecutive.
            run = previous_row[col - 1] + 1 if left_item == right_item else 0
            current_row.append(run)
            if run > best:
                best = run
        previous_row = current_row
    return best
def check_sentence(input_sentence, source_sentence, min_same_sentence_len,
                   min_phrase_sentence_len, verbose=False):
    """
    Decides whether two sentences overlap enough to be treated as the same text.

    Two sentences are considered similar when either
      * they are identical after stripping surrounding whitespace and contain
        at least ``min_same_sentence_len`` words, or
      * they share a run of at least ``min_phrase_sentence_len`` consecutive
        words.

    Args:
        input_sentence: The input sentence.
        source_sentence: The source sentence.
        min_same_sentence_len: Minimum word count for an exact match to count.
        min_phrase_sentence_len: Minimum length of the shared word run.
        verbose: If True, print debug information.

    Returns:
        True if the sentences are considered similar, False otherwise
        (including when either argument is not a string or is blank).
    """
    both_strings = isinstance(input_sentence, str) and isinstance(source_sentence, str)
    if not both_strings:
        return False

    candidate = input_sentence.strip()
    reference = source_sentence.strip()
    if candidate == "" or reference == "":
        return False

    # Whitespace tokenisation; split() with no argument collapses runs of blanks.
    candidate_words = candidate.split()
    reference_words = reference.split()

    if candidate == reference and len(candidate_words) >= min_same_sentence_len:
        if verbose:
            print("Exact match found.")
        return True

    shared_run = longest_common_subsequence(candidate_words, reference_words)
    if verbose:
        print(f"Max overlap length: {shared_run}")
    return True if shared_run >= min_phrase_sentence_len else False
def check_paraphrase(input_text, page_text, url):
    """
    Checks if the input text is paraphrased in the given page text.

    Every input sentence is embedded and compared (cosine similarity) with
    every page sentence; a sentence counts as paraphrased when its best
    similarity is strictly greater than PARAPHRASE_THRESHOLD.

    Args:
        input_text: The text to check for paraphrase.
        page_text: The text of the web page to compare with.
        url: URL of the page; copied into the alignment of a matched sentence.

    Returns:
        A tuple ``(is_paraphrase, alignment)``:
            - is_paraphrase: True if at least one input sentence is
              paraphrased in the page, False otherwise.
            - alignment: ``[]`` when the inputs are invalid or empty.
              Otherwise a single dict describing the input sentence with the
              highest similarity, so that ``alignment["paraphrase"]`` always
              agrees with ``is_paraphrase``:
                - input_sentence: The sentence from the input text.
                - matched_sentence: The best page sentence, or "" when the
                  pair is below the threshold.
                - similarity: The cosine similarity of the pair.
                - label: Always "" here.
                - paraphrase: Plain bool, True if the pair is above the
                  threshold.
                - url: ``url`` for a paraphrased pair, otherwise "".
    """
    if not isinstance(input_text, str) or not isinstance(page_text, str):
        return False, []
    if not page_text:
        return False, []

    # Extract sentences from input text and web page
    input_sentences = split_into_sentences(input_text)
    page_sentences = split_into_sentences(page_text)
    if not input_sentences or not page_sentences:
        return False, []

    # Also offer a copy of each page sentence with the ", external" marker
    # removed (presumably link-annotation residue in the scraped text), so the
    # marker cannot lower the similarity of an otherwise matching sentence.
    page_sentences.extend(
        [sentence.replace(", external", "")
         for sentence in page_sentences
         if ", external" in sentence]
    )

    # Encode sentences into embeddings
    input_embeddings = PARAPHASE_MODEL.encode(input_sentences, convert_to_tensor=True, device=DEVICE)
    page_embeddings = PARAPHASE_MODEL.encode(page_sentences, convert_to_tensor=True, device=DEVICE)

    # Compute cosine similarity matrix (rows: input sentences, cols: page sentences)
    similarity_matrix = util.cos_sim(input_embeddings, page_embeddings).cpu().numpy()

    # Find sentence alignments, keeping the strongest one
    best_alignment = {}
    for row, sentence in enumerate(input_sentences):
        best_col = int(np.argmax(similarity_matrix[row]))
        similarity = similarity_matrix[row][best_col]

        # The comparison yields a numpy.bool_; convert it so that identity
        # checks such as `x is False` (here and in callers) behave as intended.
        is_paraphrase_sentence = bool(similarity > PARAPHRASE_THRESHOLD)

        candidate = {
            "input_sentence": sentence,
            "matched_sentence": page_sentences[best_col] if is_paraphrase_sentence else "",
            "similarity": similarity,
            "label": "",
            "paraphrase": is_paraphrase_sentence,
            "url": url if is_paraphrase_sentence else "",
        }
        if not best_alignment or similarity > best_alignment["similarity"]:
            best_alignment = candidate

    # The strongest pair is above the threshold exactly when at least one
    # sentence is, so the flag and the returned alignment cannot disagree.
    is_paraphrase_text = best_alignment["paraphrase"]
    return is_paraphrase_text, best_alignment
def similarity_ratio(a, b):
    """
    Measures how alike two strings are with difflib's SequenceMatcher.

    Args:
        a: The first string.
        b: The second string.

    Returns:
        The SequenceMatcher ratio, a float between 0.0 and 1.0.
        0.0 is returned when either argument is not a string (None included).
    """
    for value in (a, b):
        # None is not a str either, so this single check covers it.
        if not isinstance(value, str):
            return 0.0
    matcher = SequenceMatcher(a=a, b=b)
    return matcher.ratio()
def check_human(alligned_sentences):
    """
    Tells whether an alignment is a near-exact match with its source.

    Args:
        alligned_sentences: Alignment mapping with a "similarity" entry; may
            be empty or None.

    Returns:
        bool: True when the alignment is non-empty and its "similarity" is at
        least 0.99, False otherwise.
    """
    if alligned_sentences and alligned_sentences["similarity"] >= 0.99:
        return True
    return False
# Running this module as a script does nothing beyond the import-time setup
# above; the guard is a placeholder.
if __name__ == '__main__':
    pass