"""Voice round-trip demo built on the iFlytek (xfyun) WebSocket APIs.

Run as a script it:

1. records a few seconds of microphone audio with PyAudio,
2. streams the raw PCM to the speech-to-text (IAT) endpoint,
3. hands the transcript to ``gettxt`` (not defined in this file; presumably
   provided by ``chatglm6b_example`` -- TODO confirm),
4. sends the reply to the text-to-speech endpoint, and
5. plays the resulting ``demo.mp3`` with VLC.
"""
# Import order is kept exactly as in the original so that the star import
# below cannot end up shadowing a different set of names than before.
import websocket
import datetime
import hashlib
import base64
import hmac
import json
from urllib.parse import urlencode
import time
import ssl
from wsgiref.handlers import format_date_time
from datetime import datetime
from time import mktime
import _thread as thread
import os
from chatglm6b_example import *
import pyaudio
import wave
import sys
import numpy as np
import vlc

STATUS_FIRST_FRAME = 0  # marks the first audio frame
STATUS_CONTINUE_FRAME = 1  # marks an intermediate audio frame
STATUS_LAST_FRAME = 2  # marks the final audio frame

# Host name that is signed and sent as the "host" auth parameter by both
# clients (the TTS client uses it even though its URL points elsewhere).
_SIGNING_HOST = "ws-api.xfyun.cn"
# File the TTS callback appends synthesized audio to.
_TTS_AUDIO_PATH = './demo.mp3'
# File the STT callbacks use to accumulate the transcript.
_TRANSCRIPT_PATH = "text.txt"


def _build_auth_url(url, request_path, api_key, api_secret,
                    host=_SIGNING_HOST):
    """Return *url* with the signed authentication query string appended.

    Args:
        url: WebSocket endpoint, e.g. ``wss://.../v2/tts``.
        request_path: Path placed in the signed request line, e.g. ``/v2/tts``.
        api_key: API key embedded in the authorization value.
        api_secret: Secret used as the HMAC-SHA256 key.
        host: Host name that is signed and sent as the ``host`` parameter.

    Returns:
        The URL followed by ``?authorization=...&date=...&host=...``.
    """
    # RFC 1123 formatted timestamp
    now = datetime.now()
    date = format_date_time(mktime(now.timetuple()))

    # String to sign: host, date and request line
    signature_origin = "host: " + host + "\n"
    signature_origin += "date: " + date + "\n"
    signature_origin += "GET " + request_path + " " + "HTTP/1.1"

    # HMAC-SHA256 over the string, then base64
    digest = hmac.new(api_secret.encode('utf-8'),
                      signature_origin.encode('utf-8'),
                      digestmod=hashlib.sha256).digest()
    signature = base64.b64encode(digest).decode(encoding='utf-8')

    authorization_origin = "api_key=\"%s\", algorithm=\"%s\", headers=\"%s\", signature=\"%s\"" % (
        api_key, "hmac-sha256", "host date request-line", signature)
    authorization = base64.b64encode(
        authorization_origin.encode('utf-8')).decode(encoding='utf-8')

    # Auth parameters that make up the query string
    v = {
        "authorization": authorization,
        "date": date,
        "host": host
    }
    return url + '?' + urlencode(v)


def _stt_frame(status, buf):
    """Build the ``data`` payload for one chunk of audio sent to the STT API."""
    return {"status": status,
            "format": "audio/L16;rate=16000",
            "audio": str(base64.b64encode(buf), 'utf-8'),
            "encoding": "raw"}


class Ws_Param(object):
    """Request parameters for one text-to-speech call."""

    def __init__(self, APPID, APIKey, APISecret, Text):
        """Store the credentials and build the request sections for *Text*."""
        self.APPID = APPID
        self.APIKey = APIKey
        self.APISecret = APISecret
        self.Text = Text

        # Common parameters
        self.CommonArgs = {"app_id": self.APPID}
        # Business parameters (more options are listed on the vendor site).
        # The received audio is appended to demo.mp3 by on_message.
        self.BusinessArgs = {
            "aue": "lame", "sfl": 1, "auf": "audio/L16;rate=16000", "vcn": "aisjiuxu", "tte": "utf8"}
        # The text is sent base64-encoded from its UTF-8 bytes.
        # NOTE(review): the original comment says minority languages need a
        # UTF-16LE encoded text instead -- confirm against the vendor docs.
        self.Data = {"status": 2, "text": str(
            base64.b64encode(self.Text.encode('utf-8')), "UTF8")}

    def create_url(self):
        """Return the signed WebSocket URL for the TTS endpoint."""
        return _build_auth_url('wss://tts-api.xfyun.cn/v2/tts', "/v2/tts",
                               self.APIKey, self.APISecret)


def on_message(ws, message):
    """Handle one TTS response: append its audio to demo.mp3.

    The ``code`` field is checked before the payload is touched so that a
    server-side error is reported as such instead of surfacing as a parse
    exception. The socket is closed once a message with status 2 arrives.
    """
    try:
        message = json.loads(message)
        code = message["code"]
        sid = message.get("sid")
        if code != 0:
            errMsg = message["message"]
            print("sid:%s call error:%s code is:%s" % (sid, errMsg, code))
            return

        data = message["data"]
        audio = data.get("audio")
        if audio:
            with open(_TTS_AUDIO_PATH, 'ab') as f:
                f.write(base64.b64decode(audio))
        if data["status"] == 2:
            ws.close()
    except Exception as e:
        print("receive msg,but parse exception:", e)


def on_error(ws, error):
    """Report a WebSocket error on the TTS connection."""
    print("### error:", error)


def on_close(ws, *args):
    """Report that the TTS connection closed.

    ``*args`` absorbs the close status code and message that newer
    websocket-client releases pass in addition to ``ws``.
    """
    print("### closed ###")


def on_open(ws, params=None):
    """Send the TTS request once the connection is established.

    Args:
        ws: The open WebSocket connection.
        params: ``Ws_Param`` to send; defaults to the module-level
            ``wsParam`` set by the script entry point.
    """
    if params is None:
        params = wsParam

    def run(*args):
        # Drop audio left over from an earlier run *before* sending, so the
        # removal cannot race with on_message appending the new audio.
        if os.path.exists(_TTS_AUDIO_PATH):
            os.remove(_TTS_AUDIO_PATH)
        d = {"common": params.CommonArgs,
             "business": params.BusinessArgs,
             "data": params.Data,
             }
        ws.send(json.dumps(d))

    thread.start_new_thread(run, ())


class Ws_Param_stt(object):
    """Request parameters for one speech-to-text call."""

    def __init__(self, APPID, APIKey, APISecret, AudioFile):
        """Store the credentials and the path of the audio file to upload."""
        self.APPID = APPID
        self.APIKey = APIKey
        self.APISecret = APISecret
        self.AudioFile = AudioFile

        # Common parameters
        self.CommonArgs = {"app_id": self.APPID}
        # Business parameters (more options are listed on the vendor site)
        self.BusinessArgs = {"domain": "iat", "language": "zh_cn", "accent": "mandarin", "vinfo":1,"vad_eos":10000}

    def create_url(self):
        """Return the signed WebSocket URL for the STT endpoint."""
        return _build_auth_url('wss://ws-api.xfyun.cn/v2/iat', "/v2/iat",
                               self.APIKey, self.APISecret)


def on_message_stt(ws, message):
    """Handle one STT response: append the recognized words to text.txt.

    Only the first candidate (``cw[0]``) of every word is kept.
    """
    try:
        payload = json.loads(message)
        code = payload["code"]
        sid = payload["sid"]
        if code != 0:
            errMsg = payload["message"]
            print("sid:%s call error:%s code is:%s" % (sid, errMsg, code))
        else:
            data = payload["data"]["result"]["ws"]
            recognized = "".join(word['cw'][0]['w'] for word in data)
            with open(_TRANSCRIPT_PATH, "a", encoding="utf-8") as f:
                f.write(recognized)
    except Exception as e:
        print("receive msg,but parse exception:", e)


def on_error_stt(ws, error):
    """Report a WebSocket error on the STT connection."""
    print("### error:", error)


def on_close_stt(ws, a=None, b=None):
    """Report that the STT connection closed and print the transcript so far.

    ``a`` and ``b`` receive whatever extra arguments the WebSocket library
    passes on close; they are unused and optional.
    """
    print("### closed ###")
    if os.path.exists(_TRANSCRIPT_PATH):
        with open(_TRANSCRIPT_PATH, "r", encoding="utf-8") as f:
            print(f.read())


def on_open_stt(ws, params=None):
    """Stream the audio file to the STT endpoint once the socket is open.

    Args:
        ws: The open WebSocket connection.
        params: ``Ws_Param_stt`` describing the upload; defaults to the
            module-level ``wsParam_stt`` set by the script entry point.
    """
    if params is None:
        params = wsParam_stt

    def run(*args):
        frameSize = 8000  # bytes of audio per frame
        intervel = 0.04  # pause between frames, in seconds
        status = STATUS_FIRST_FRAME  # first / intermediate / last frame marker

        try:
            # Start from an empty transcript
            with open(_TRANSCRIPT_PATH, "w", encoding="utf-8") as f:
                f.write("")

            with open(params.AudioFile, "rb") as fp:
                frames = 0
                while True:
                    buf = fp.read(frameSize)
                    # End of file: whatever is sent now is the last frame
                    if not buf:
                        status = STATUS_LAST_FRAME

                    d = {"data": _stt_frame(status, buf)}
                    if status == STATUS_FIRST_FRAME:
                        # Only the first frame carries the common (app id)
                        # and business sections
                        d = {"common": params.CommonArgs,
                             "business": params.BusinessArgs,
                             "data": d["data"]}
                    ws.send(json.dumps(d))

                    if status == STATUS_LAST_FRAME:
                        time.sleep(1)
                        break
                    status = STATUS_CONTINUE_FRAME

                    # Simulate the audio sampling interval
                    time.sleep(intervel)
                    frames += 1
                    print(frames)
        finally:
            # Close even when reading or sending failed, so run_forever()
            # in the caller is not left waiting on this worker
            ws.close()

    thread.start_new_thread(run, ())


def speech2text(wsParam):
    """Transcribe the audio file described by *wsParam* and return the text.

    Args:
        wsParam: A ``Ws_Param_stt`` instance.

    Returns:
        The contents of text.txt after the WebSocket session has ended.
    """
    # Reset the transcript up front so a connection that fails before
    # on_open cannot make this return text from a previous run
    with open(_TRANSCRIPT_PATH, "w", encoding="utf-8") as f:
        f.write("")

    websocket.enableTrace(False)
    wsUrl = wsParam.create_url()
    ws = websocket.WebSocketApp(wsUrl, on_message=on_message_stt, on_error=on_error_stt, on_close=on_close_stt)
    # Pass the argument through instead of relying on a global being set
    ws.on_open = lambda conn: on_open_stt(conn, wsParam)
    # NOTE(security): CERT_NONE disables TLS certificate verification
    ws.run_forever(sslopt={"cert_reqs": ssl.CERT_NONE})
    with open(_TRANSCRIPT_PATH, "r", encoding="utf-8") as f:
        text = f.read()
    return text


if __name__ == "__main__":
    # Recording settings
    CHUNK = 512
    FORMAT = pyaudio.paInt16
    CHANNELS = 1
    RATE = 16000
    RECORD_SECONDS = 5
    WAVE_OUTPUT_FILENAME = "output.wav"

    # NOTE(security): credentials should not live in source control. They
    # can be supplied through the environment; the original literals remain
    # only as a fallback so existing setups keep working.
    APP_ID = os.environ.get("XFYUN_APPID", '44385032')
    API_SECRET = os.environ.get("XFYUN_API_SECRET", 'YWNkNTZhMTkwZWFiZjhjNjU4MDg2MzY4')
    API_KEY = os.environ.get("XFYUN_API_KEY", '83f2198f66ed394e7a73deb57c1c1b44')

    # --- 1. record from the microphone -----------------------------------
    p = pyaudio.PyAudio()
    stream = p.open(format=FORMAT,
                    channels=CHANNELS,
                    rate=RATE,
                    input=True,
                    frames_per_buffer=CHUNK)
    print("recording...")
    frames = []
    for i in range(0, int(RATE / CHUNK * RECORD_SECONDS)):
        frames.append(stream.read(CHUNK))
    print("done")
    stream.stop_stream()
    stream.close()
    # Query the sample width while the PyAudio instance is still alive
    sample_width = p.get_sample_size(FORMAT)
    p.terminate()

    with wave.open(WAVE_OUTPUT_FILENAME, 'wb') as wf:
        wf.setnchannels(CHANNELS)
        wf.setsampwidth(sample_width)
        wf.setframerate(RATE)
        wf.writeframes(b''.join(frames))

    # --- 2. strip the WAV header to get raw PCM --------------------------
    # Skips the first 44 bytes, as the original did (assumes the header
    # written above is exactly 44 bytes long -- TODO confirm)
    with open(WAVE_OUTPUT_FILENAME, 'rb') as f:
        f.read(44)
        data = np.fromfile(f, dtype=np.int16)
    data.tofile("output.pcm")

    # --- 3. speech -> text -> reply --------------------------------------
    wsParam_stt = Ws_Param_stt(APPID=APP_ID, APISecret=API_SECRET,
                               APIKey=API_KEY,
                               AudioFile=r'./output.pcm')
    temp = speech2text(wsParam_stt)
    text = gettxt(temp)
    print(text)
    text = text.replace('\n', '')

    # --- 4. reply -> speech ----------------------------------------------
    # on_open reads this module-level name when no params are passed
    wsParam = Ws_Param(APPID=APP_ID, APISecret=API_SECRET,
                       APIKey=API_KEY,
                       Text=text)
    websocket.enableTrace(False)
    wsUrl = wsParam.create_url()
    ws = websocket.WebSocketApp(
        wsUrl, on_message=on_message, on_error=on_error, on_close=on_close)
    ws.on_open = on_open
    # NOTE(security): CERT_NONE disables TLS certificate verification
    ws.run_forever(sslopt={"cert_reqs": ssl.CERT_NONE})

    # --- 5. play the synthesized audio -----------------------------------
    file = "demo.mp3"
    player = vlc.MediaPlayer(file)
    player.play()
    # Block so the process stays alive while VLC plays in the background
    input("playing")