Spaces:
Sleeping
Sleeping
import json | |
import spacy | |
from spacy.tokens import DocBin | |
def read_in_chunks(file_path, chunk_size=1024): | |
"""Read file in chunks to handle large files.""" | |
print(f"Reading file: {file_path}") | |
if not os.path.exists(file_path): | |
print(f"Error: File not found at {file_path}") | |
return | |
with open(file_path, 'r', encoding='utf-8') as file: | |
while True: | |
data = file.read(chunk_size) | |
if not data: | |
break | |
yield data | |
def extract_text_and_entities(item): | |
"""Dynamically extract text and entities, handling multiple JSON formats.""" | |
print(f"Processing item: {item}") | |
if isinstance(item, dict): | |
# Dictionary structure: {"text": ..., "entities": ...} | |
text = item.get("text", "") | |
entities = item.get("entities", []) | |
elif isinstance(item, list) and len(item) >= 2: | |
# List structure: ["text", {"entities": ...}] | |
text = item[0] if isinstance(item[0], str) else "" | |
entities = item[1].get("entities", []) if isinstance(item[1], dict) else [] | |
else: | |
print(f"Unexpected item format: {item}") | |
return None, [] # Return empty text and entities | |
valid_entities = [ | |
(start, end, label) for start, end, label in entities | |
if isinstance(start, int) and isinstance(end, int) and isinstance(label, str) | |
] | |
return text, valid_entities | |
def convert_json_to_spacy(json_file_path, spacy_file_path): | |
"""Convert JSON data to spaCy format and save as .spacy file.""" | |
try: | |
print(f"Reading JSON from: {json_file_path}") | |
file_content = "".join(chunk for chunk in read_in_chunks(json_file_path)) | |
data = json.loads(file_content) # Parse JSON data | |
print(f"Successfully loaded JSON data. Found {len(data)} items.") | |
spacy_format = [] | |
for item in data: | |
text, entities = extract_text_and_entities(item) | |
if text: # Skip if text is empty or invalid | |
spacy_format.append({"text": text, "entities": entities}) | |
# Create a blank spaCy model | |
nlp = spacy.blank("en") | |
doc_bin = DocBin() | |
for entry in spacy_format: | |
print(f"Creating spaCy Doc for text: {entry['text']}") | |
doc = nlp.make_doc(entry["text"]) | |
entities = [] | |
seen_positions = set() | |
for start, end, label in entry["entities"]: | |
if start < 0 or end > len(doc.text) or start >= end: | |
print(f"Invalid span: start={start}, end={end}, label={label}") | |
continue | |
if not any(start < e_end and end > e_start for e_start, e_end, _ in seen_positions): | |
span = doc.char_span(start, end, label=label) | |
if span is not None: | |
entities.append(span) | |
seen_positions.add((start, end, label)) | |
else: | |
print(f"Overlapping span: start={start}, end={end}, label={label}") | |
doc.ents = entities | |
doc_bin.add(doc) | |
doc_bin.to_disk(spacy_file_path) | |
print(f"Data has been successfully saved to {spacy_file_path}!") | |
except json.JSONDecodeError as e: | |
print(f"Error decoding JSON: {e}") | |
except Exception as e: | |
print(f"An unexpected error occurred: {e}") | |