File size: 3,849 Bytes
a31c0b9
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
f68fadb
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
import os
import tempfile
from TTS.api import TTS



class TTSTalker():
    """Text-to-speech helper built on the ``TTS`` class from ``TTS.api``.

    On construction the first entry of ``TTS.list_models()`` is loaded and
    kept in ``self.tts``.
    """

    def __init__(self) -> None:
        # Whatever model the installed TTS package lists first is used.
        model_name = TTS.list_models()[0]
        self.tts = TTS(model_name)

    def test(self, text, language='en'):
        """Synthesize *text* into a temporary ``.wav`` file.

        Args:
            text: The text to speak.
            language: Language code forwarded to ``tts_to_file``.

        Returns:
            Path of the generated ``.wav`` file. The file is created with
            ``delete=False``, so the caller is responsible for removing it.
        """
        # Only a unique path is needed here. Leaving the ``with`` block closes
        # the handle right away (the file itself is kept because of
        # delete=False), so no descriptor is leaked and the TTS backend can
        # open the path for writing on platforms that forbid a second open.
        with tempfile.NamedTemporaryFile(delete=False, suffix='.wav') as tempf:
            wav_path = tempf.name

        # The first speaker exposed by the loaded model is always used.
        self.tts.tts_to_file(
            text,
            speaker=self.tts.speakers[0],
            language=language,
            file_path=wav_path,
        )

        return wav_path
    
import urllib.request
import tempfile
import requests
import json
import time


class TTSTalkerPlayHT():
    """Text-to-speech client for the HTTP API at ``https://play.ht/api/v1``.

    ``test`` submits text to the ``/convert`` endpoint, polls
    ``/articleStatus`` until the conversion is reported as finished, and
    downloads the resulting audio into a temporary ``.mp3`` file.
    """

    # Maximum number of seconds to wait for a conversion to finish.
    POLL_TIMEOUT = 10
    # Seconds to sleep between two status polls.
    POLL_INTERVAL = 0.5
    # Per-request network timeout in seconds, so that a stalled connection
    # cannot block the caller forever.
    REQUEST_TIMEOUT = 30

    def __init__(self) -> None:
        self.url = "https://play.ht/api/v1"
        # NOTE(review): API credentials are hard-coded in source control. They
        # are kept only as backward-compatible fallbacks; set PLAYHT_API_KEY
        # and PLAYHT_USER_ID in the environment instead, and rotate the
        # embedded values.
        self.headers = {
            'Authorization': os.environ.get(
                'PLAYHT_API_KEY', 'f35fc9d7ce0549a88f6cdc15ec860b6e'),
            'X-User-ID': os.environ.get(
                'PLAYHT_USER_ID', '96tPb0H2cXbobV9u8iLVGyJPUPc2'),
            'Content-Type': 'application/json'
        }

    @staticmethod
    def _poll_until_converted(fetch_status, timeout, interval):
        """Poll *fetch_status* until it reports a finished conversion.

        Args:
            fetch_status: Zero-argument callable returning the decoded status
                dict, or ``None`` when the status request itself failed.
            timeout: Maximum number of seconds to keep polling.
            interval: Seconds to sleep between unsuccessful polls.

        Returns:
            The ``audioUrl`` from the first status that is converted and
            carries a URL, or ``None`` on request failure or timeout.
        """
        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline:
            status = fetch_status()
            if status is None:
                return None
            # A status response does not necessarily carry an audioUrl right
            # away, so require both the flag and the URL. Returning here stops
            # polling as soon as the audio is ready.
            if status.get('converted') and status.get('audioUrl'):
                return status['audioUrl']
            time.sleep(interval)
        return None

    @staticmethod
    def _download_to_tempfile(url, suffix, chunk_size=1024, timeout=30):
        """Stream *url* into a new temporary file and return its path.

        Returns ``None`` (after printing the status) when the server answers
        with an error, so an error body is never saved as audio.
        """
        headers = {'user-agent': 'Wget/1.16 (linux-gnu)'}
        # Using the response as a context manager releases the streamed
        # connection even if writing fails.
        with requests.get(url, stream=True, headers=headers,
                          timeout=timeout) as response:
            if not response.ok:
                print(response.status_code)
                return None
            # Write through the temp file's own handle; the ``with`` block
            # closes it while delete=False keeps the file on disk.
            with tempfile.NamedTemporaryFile(delete=False,
                                             suffix=suffix) as out:
                for chunk in response.iter_content(chunk_size=chunk_size):
                    if chunk:
                        out.write(chunk)
                return out.name

    def test(self, text, language='en', **kwargs):
        """Convert *text* to speech and return the path of an ``.mp3`` file.

        Args:
            text: The text to speak.
            language: Accepted for signature parity with ``TTSTalker.test``;
                not used, the voice is fixed to ``en-US-MichelleNeural``.
            **kwargs: Accepted and ignored.

        Returns:
            Path of the downloaded temporary ``.mp3`` file (the caller must
            delete it), or ``None`` when a request fails or the conversion
            does not finish within ``POLL_TIMEOUT`` seconds.
        """
        payload = json.dumps({
            "title": "Testing public api convertion",
            "voice": "en-US-MichelleNeural",
            "content": [text],
        })
        response = requests.post(
            self.url + '/convert',
            headers=self.headers,
            data=payload,
            timeout=self.REQUEST_TIMEOUT,
        )
        if not response.ok:
            print(response.text)
            print(response.status_code)
            return None

        # On success the transcriptionId is returned immediately.
        transcription_id = json.loads(response.text)['transcriptionId']

        def fetch_status():
            # One status poll; None signals an HTTP error to the poll loop.
            status_response = requests.get(
                self.url + '/articleStatus',
                params={'transcriptionId': transcription_id},
                headers=self.headers,
                timeout=self.REQUEST_TIMEOUT,
            )
            if not status_response.ok:
                print(status_response.text)
                print(status_response.status_code)
                return None
            return json.loads(status_response.text)

        audio_url = self._poll_until_converted(
            fetch_status, self.POLL_TIMEOUT, self.POLL_INTERVAL)
        if audio_url is None:
            print('conversion did not complete')
            return None

        return self._download_to_tempfile(
            audio_url, '.mp3', timeout=self.REQUEST_TIMEOUT)