镜像自地址
https://github.com/binary-husky/gpt_academic.git
已同步 2025-12-06 06:26:47 +00:00
error
这个提交包含在:
@@ -0,0 +1,81 @@
|
||||
import time, threading
|
||||
|
||||
|
||||
class AliyunASR():
|
||||
def __init__(self):
|
||||
self.event_on_result_chg = threading.Event()
|
||||
self.event_on_entence_end = threading.Event()
|
||||
|
||||
def test_on_sentence_begin(self, message, *args):
|
||||
print("test_on_sentence_begin:{}".format(message))
|
||||
|
||||
def test_on_sentence_end(self, message, *args):
|
||||
print("test_on_sentence_end:{}".format(message))
|
||||
self.event_on_entence_end.set()
|
||||
|
||||
def test_on_start(self, message, *args):
|
||||
print("test_on_start:{}".format(message))
|
||||
|
||||
def test_on_error(self, message, *args):
|
||||
print("on_error args=>{}".format(args))
|
||||
|
||||
def test_on_close(self, *args):
|
||||
print("on_close: args=>{}".format(args))
|
||||
|
||||
def test_on_result_chg(self, message, *args):
|
||||
print("test_on_chg:{}".format(message))
|
||||
self.parsed_text = message['payload']['result']
|
||||
self.event_on_result_chg.set()
|
||||
|
||||
def test_on_completed(self, message, *args):
|
||||
print("on_completed:args=>{} message=>{}".format(args, message))
|
||||
|
||||
def audio_convertion_thread(self, uuid):
|
||||
# 在一个异步线程中采集音频
|
||||
import nls # pip install git+https://github.com/aliyun/alibabacloud-nls-python-sdk.git
|
||||
from scipy import io
|
||||
from .audio_io import change_sample_rate
|
||||
NEW_SAMPLERATE = 16000
|
||||
from .audio_io import RealtimeAudioDistribution
|
||||
rad = RealtimeAudioDistribution()
|
||||
import tempfile
|
||||
temp_folder = tempfile.gettempdir()
|
||||
|
||||
URL="wss://nls-gateway.cn-shanghai.aliyuncs.com/ws/v1"
|
||||
TOKEN="f37f30e0f9934c34a992f6f64f7eba4f" # 参考https://help.aliyun.com/document_detail/450255.html获取token
|
||||
APPKEY="RoPlZrM88DnAFkZK" # 获取Appkey请前往控制台:https://nls-portal.console.aliyun.com/applist
|
||||
sr = nls.NlsSpeechTranscriber(
|
||||
url=URL,
|
||||
token=TOKEN,
|
||||
appkey=APPKEY,
|
||||
on_sentence_begin=self.test_on_sentence_begin,
|
||||
on_sentence_end=self.test_on_sentence_end,
|
||||
on_start=self.test_on_start,
|
||||
on_result_changed=self.test_on_result_chg,
|
||||
on_completed=self.test_on_completed,
|
||||
on_error=self.test_on_error,
|
||||
on_close=self.test_on_close,
|
||||
callback_args=[uuid.hex]
|
||||
)
|
||||
|
||||
r = sr.start(aformat="pcm",
|
||||
enable_intermediate_result=True,
|
||||
enable_punctuation_prediction=True,
|
||||
enable_inverse_text_normalization=True)
|
||||
|
||||
while not self.stop:
|
||||
# time.sleep(self.capture_interval)
|
||||
audio = rad.read(uuid.hex)
|
||||
if audio is not None:
|
||||
# convert to pcm file
|
||||
temp_file = f'{temp_folder}/{uuid.hex}.pcm' #
|
||||
dsdata = change_sample_rate(audio, rad.rate, NEW_SAMPLERATE) # 48000 --> 16000
|
||||
io.wavfile.write(temp_file, NEW_SAMPLERATE, dsdata)
|
||||
# read pcm binary
|
||||
with open(temp_file, "rb") as f: data = f.read()
|
||||
print('audio len:', len(audio), '\t ds len:', len(dsdata), '\t need n send:', len(data)//640)
|
||||
slices = zip(*(iter(data),) * 640) # 640个字节为一组
|
||||
for i in slices: sr.send_audio(bytes(i))
|
||||
else:
|
||||
time.sleep(0.1)
|
||||
r = sr.stop()
|
||||
@@ -1,4 +1,5 @@
|
||||
import numpy as np
|
||||
from scipy import interpolate
|
||||
|
||||
def Singleton(cls):
|
||||
_instance = {}
|
||||
@@ -15,12 +16,12 @@ def Singleton(cls):
|
||||
class RealtimeAudioDistribution():
|
||||
def __init__(self) -> None:
|
||||
self.data = {}
|
||||
self.max_len = 1024*64
|
||||
self.max_len = 1024*1024
|
||||
self.rate = 48000 # 只读,每秒采样数量
|
||||
|
||||
def feed(self, uuid, audio):
|
||||
print('feed')
|
||||
self.rate, audio_ = audio
|
||||
print('feed', len(audio_), audio_[-25:])
|
||||
if uuid not in self.data:
|
||||
self.data[uuid] = audio_
|
||||
else:
|
||||
@@ -31,7 +32,17 @@ class RealtimeAudioDistribution():
|
||||
def read(self, uuid):
|
||||
if uuid in self.data:
|
||||
res = self.data.pop(uuid)
|
||||
print('read', len(res))
|
||||
print('read', len(res), res)
|
||||
else:
|
||||
res = None
|
||||
return res
|
||||
return res
|
||||
|
||||
def change_sample_rate(audio, old_sr, new_sr):
|
||||
duration = audio.shape[0] / old_sr
|
||||
|
||||
time_old = np.linspace(0, duration, audio.shape[0])
|
||||
time_new = np.linspace(0, duration, int(audio.shape[0] * new_sr / old_sr))
|
||||
|
||||
interpolator = interpolate.interp1d(time_old, audio.T)
|
||||
new_audio = interpolator(time_new).T
|
||||
return new_audio.astype(np.int16)
|
||||
在新工单中引用
屏蔽一个用户