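"""Convert JSON-annotated NER training data into spaCy's binary DocBin (.spacy) format."""
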
import json
import os
import spacy
from spacy.tokens import DocBin

def read_in_chunks(file_path, chunk_size=1024):
    """Read file in chunks to handle large files."""
    print(f"Reading file: {file_path}")
    if not os.path.exists(file_path):
        print(f"Error: File not found at {file_path}")
        return

    with open(file_path, 'r', encoding='utf-8') as file:
        while True:
            data = file.read(chunk_size)
            if not data:
                break
            yield data

def extract_text_and_entities(item):
    """Dynamically extract text and entities, handling multiple JSON formats."""
    print(f"Processing item: {item}")
    if isinstance(item, dict):
        # Dictionary structure: {"text": ..., "entities": ...}
        text = item.get("text", "")
        entities = item.get("entities", [])
    elif isinstance(item, list) and len(item) >= 2:
        # List structure: ["text", {"entities": ...}]
        text = item[0] if isinstance(item[0], str) else ""
        entities = item[1].get("entities", []) if isinstance(item[1], dict) else []
    else:
        print(f"Unexpected item format: {item}")
        return None, []  # No usable text; the caller skips entries whose text is falsy

    # Keep only well-formed (start, end, label) triples; malformed entries
    # (wrong length or wrong types) would otherwise raise during unpacking.
    valid_entities = [
        (entity[0], entity[1], entity[2]) for entity in entities
        if isinstance(entity, (list, tuple)) and len(entity) == 3
        and isinstance(entity[0], int) and isinstance(entity[1], int)
        and isinstance(entity[2], str)
    ]
    return text, valid_entities
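
# For reference, the two item shapes handled above (illustrative sample values,
# not real annotation data):
#   dict form: {"text": "Apple hired Tim", "entities": [[0, 5, "ORG"]]}
#   list form: ["Apple hired Tim", {"entities": [[0, 5, "ORG"]]}]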

def convert_json_to_spacy(json_file_path, spacy_file_path):
    """Convert JSON data to spaCy format and save as .spacy file."""
    try:
        print(f"Reading JSON from: {json_file_path}")
        file_content = "".join(chunk for chunk in read_in_chunks(json_file_path))

        data = json.loads(file_content)  # Parse JSON data
        print(f"Successfully loaded JSON data. Found {len(data)} items.")

        spacy_format = []
        for item in data:
            text, entities = extract_text_and_entities(item)
            if text:  # Skip if text is empty or invalid
                spacy_format.append({"text": text, "entities": entities})

        # Create a blank spaCy model
        nlp = spacy.blank("en")
        doc_bin = DocBin()

        for entry in spacy_format:
            print(f"Creating spaCy Doc for text: {entry['text']}")
            doc = nlp.make_doc(entry["text"])
            entities = []
            seen_positions = set()

            for start, end, label in entry["entities"]:
                if start < 0 or end > len(doc.text) or start >= end:
                    print(f"Invalid span: start={start}, end={end}, label={label}")
                    continue
                if not any(start < e_end and end > e_start for e_start, e_end, _ in seen_positions):
                    span = doc.char_span(start, end, label=label)
                    if span is not None:
                        entities.append(span)
                        seen_positions.add((start, end, label))
                    else:
                        # char_span returns None when the offsets do not align with
                        # token boundaries; report it instead of dropping it silently.
                        print(f"Misaligned span: start={start}, end={end}, label={label}")
                else:
                    print(f"Overlapping span: start={start}, end={end}, label={label}")

            doc.ents = entities
            doc_bin.add(doc)

        doc_bin.to_disk(spacy_file_path)
        print(f"Data has been successfully saved to {spacy_file_path}!")

    except json.JSONDecodeError as e:
        print(f"Error decoding JSON: {e}")
    except Exception as e:
        print(f"An unexpected error occurred: {e}")