File size: 3,864 Bytes
08fa61a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
from keras._tf_keras.keras.models import load_model
import keras
import warnings
import traceback
import cv2
import sys
import tensorflow as tf
import numpy as np
import exceptions
import os
from PIL import Image
from exceptions.NotFaceError import NotFaceError
from inference_sdk import InferenceHTTPClient
from transformers import pipeline, SegformerForSemanticSegmentation, SegformerImageProcessor, SegformerFeatureExtractor

def warning_with_traceback(message, category, filename, lineno, file=None, line=None):
    log = file if hasattr(file,'write') else sys.stderr
    traceback.print_stack(file=log)
    log.write(warnings.formatwarning(message, category, filename, lineno, line))
    
warnings.showwarning = warning_with_traceback


@keras.saving.register_keras_serializable()
class CustomPreprocessingLayer(tf.keras.layers.Layer):
    def __init__(self, input_shape, **kwargs):
        self.input_shape = input_shape
        super(CustomPreprocessingLayer, self).__init__(**kwargs)
        
    def build(self, input_shape):
        pass  # No trainable weights to build

    def call(self, image_matrix):
        image = tf.convert_to_tensor(image_matrix, dtype=tf.int32)
        image = tf.image.resize(image, [self.input_shape[0], self.input_shape[1]])
        return image
    def get_config(self):
        config = super(CustomPreprocessingLayer, self).get_config()
        config.update({'input_shape': self.input_shape})
        return config
    
    @classmethod
    def from_config(cls, config):
        return cls(**config)

class FaceClassifierModel:
    def __init__(self,  client:InferenceHTTPClient, image_size=224, batcb_size=16):
        self.model = load_model("./models/efficientnet_face_detection.h5")
        self.image_size = image_size
        self.batch_size = batcb_size
        self.seed = 42
        self.client = client
    
    async def classify(self, image_bytes: str, confidence_threshold=0.5):
        tf.random.set_seed(self.seed)
        nparr = np.frombuffer(image_bytes, np.uint8)
        
        # Dekode array NumPy menjadi citra OpenCV
        image = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
        image = cv2.resize(image, [self.image_size, self.image_size])
        image_expanded = tf.expand_dims(image,axis=0)
        image_batch = tf.data.Dataset.from_tensor_slices(image_expanded).batch(self.batch_size)
        pred = self.model.predict(image_batch)
        if pred[0][0] <= confidence_threshold:
            raise NotFaceError("Ini bukan wajah")
        
        # lanjut klasifikasi muka
        result = await self.client.infer_async(image, model_id="skinclassification-kyxvj/1")
        result["face_confidence"] = float(pred[0][0])
        return result
    
    
class FaceSegmentationModel:
    def __init__(self):
        model_checkpoint = os.path.join("models","segformer-b0-finetuned-segments-skin-outputs", "checkpoint-1640")
        self.model = SegformerForSemanticSegmentation.from_pretrained(model_checkpoint, local_files_only=True)
        self.image_processor = SegformerImageProcessor.from_pretrained(model_checkpoint, local_files_only=True)
        self.pipeline = pipeline("image-segmentation", model=self.model, image_processor=self.image_processor)
    
    def infer(self, image:Image.Image):
        '''
        Infer the input image. it will return list of {'score', 'label', and 'mask'}
        
        Example:
            [{'score': None,
            'label': 'background',
            'mask': <PIL.Image.Image image mode=L size=500x500>},
            {'score': None,
            'label': 'acne',
            'mask': <PIL.Image.Image image mode=L size=500x500>},
            {'score': None,
            'label': 'dry',
            'mask': <PIL.Image.Image image mode=L size=500x500>}]
        '''
        results = self.pipeline(image)
        return results