import os
import logging
import datetime
import requests
import torch

from io import BytesIO
from uuid import uuid4

from google.cloud import storage
from transformers import AutoImageProcessor, AutoModelForObjectDetection
from label_studio_ml.model import LabelStudioMLBase
from lxml import etree
from PIL import Image

from creds import get_credentials

os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = get_credentials()
print("cred set")
def generate_download_signed_url_v4(blob_name):
    """Generates a v4 signed URL for downloading a blob.

    Note that this method requires a service account key file. You cannot use
    it with Application Default Credentials from Google Compute Engine or from
    the Google Cloud SDK.
    """
    bucket_name = os.getenv("bucket")
    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob(blob_name)

    url = blob.generate_signed_url(
        version="v4",
        # This URL is valid for 15 minutes.
        expiration=datetime.timedelta(minutes=15),
        # Allow GET requests using this URL.
        method="GET",
    )

    print("Generated GET signed URL:")
    print(url)
    print("You can use this URL with any user agent, for example:")
    print(f"curl '{url}'")
    return url
class Model(LabelStudioMLBase):

    image_processor = AutoImageProcessor.from_pretrained("diegokauer/conditional-detr-coe-int")
    model = AutoModelForObjectDetection.from_pretrained("diegokauer/conditional-detr-coe-int")
    def predict(self, tasks, **kwargs):
        """This is where inference happens: the model returns a list of
        predictions based on the input list of tasks.
        """
        predictions = []
        for task in tasks:
            url = task["data"]["image"]
            response = requests.get(generate_download_signed_url_v4(url))
            image = Image.open(BytesIO(response.content))
            original_width, original_height = image.size

            with torch.no_grad():
                # Class attributes must be referenced through self inside methods.
                inputs = self.image_processor(images=image, return_tensors="pt")
                outputs = self.model(**inputs)
                target_sizes = torch.tensor([image.size[::-1]])  # (height, width)
                results = self.image_processor.post_process_object_detection(
                    outputs, threshold=0.5, target_sizes=target_sizes
                )[0]
            result_list = []
            for score, label, box in zip(results['scores'], results['labels'], results['boxes']):
                label_id = str(uuid4())[:4]
                x, y, x2, y2 = box.tolist()
                result_list.append({
                    'id': label_id,
                    'original_width': original_width,
                    'original_height': original_height,
                    'from_name': "label",
                    'to_name': "image",
                    'type': 'labels',
                    'score': score.item(),  # per-region score, visible in the editor
                    'value': {
                        # Label Studio expects region coordinates as percentages
                        # of the original image size, not pixels.
                        'x': 100 * x / original_width,
                        'y': 100 * y / original_height,
                        'width': 100 * (x2 - x) / original_width,
                        'height': 100 * (y2 - y) / original_height,
                        'rotation': 0,
                        'labels': [self.model.config.id2label[label.item()]]
                    }
                })
            predictions.append({
                'score': results['scores'].mean().item(),  # overall prediction score, visible in the data manager columns
                'model_version': 'diegokauer/conditional-detr-coe-int',  # predictions are differentiated by model version
                'result': result_list
            })
        return predictions
    def fit(self, event, annotations, **kwargs):
        """This is where training happens: train the model on the given list of
        annotations, then return a dict with the created links and resources.
        """
        return {'path/to/created/model': 'my/model.bin'}
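
# --- Illustrative smoke test (a minimal sketch, not part of the original backend) ---
# This exercises the same detector and signed-URL helper as Model.predict, without
# needing a running Label Studio instance. It assumes GOOGLE_APPLICATION_CREDENTIALS
# and the "bucket" environment variable are configured; "uploads/example.jpg" is a
# hypothetical blob name used only for illustration.
if __name__ == "__main__":
    signed_url = generate_download_signed_url_v4("uploads/example.jpg")
    image = Image.open(BytesIO(requests.get(signed_url).content))

    processor = AutoImageProcessor.from_pretrained("diegokauer/conditional-detr-coe-int")
    detector = AutoModelForObjectDetection.from_pretrained("diegokauer/conditional-detr-coe-int")

    with torch.no_grad():
        outputs = detector(**processor(images=image, return_tensors="pt"))
    detections = processor.post_process_object_detection(
        outputs, threshold=0.5, target_sizes=torch.tensor([image.size[::-1]])
    )[0]
    for score, label in zip(detections['scores'], detections['labels']):
        print(detector.config.id2label[label.item()], round(score.item(), 3))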