修改了原始的speech recogintion库
self.phrase_threshold = 0.3 # minimum seconds of speaking audio before we consider the speaking audio a phrase - values below this are ignored (for filtering out clicks and pops)
self.non_speaking_duration = 0.5 # seconds of non-speaking audio to keep on both sides of the recording
# add
self.pool = Pool(processes=1)
def record(self, source, duration=None, offset=None):
detector.SetSensitivity(",".join(["0.85"] * len(snowboy_hot_word_files)).encode())
detector.SetSensitivity(",".join(["0.6"] * len(snowboy_hot_word_files)).encode())
snowboy_sample_rate = detector.SampleRate()
elapsed_time = 0
resampling_state = None
damping = self.dynamic_energy_adjustment_damping ** seconds_per_buffer # account for different chunk sizes and rates
# # buffers capable of holding 5 seconds of original audio
# five_seconds_buffer_count = int(math.ceil(5 / seconds_per_buffer))
# # buffers capable of holding 0.5 seconds of resampled audio
# half_second_buffer_count = int(math.ceil(0.5 / seconds_per_buffer))
# frames = collections.deque(maxlen=five_seconds_buffer_count)
# resampled_frames = collections.deque(maxlen=half_second_buffer_count)
# # snowboy check interval
# check_interval = 0.05
# last_check = time.time()
while True:
elapsed_time += seconds_per_buffer
if timeout and elapsed_time > timeout:
buffer = source.stream.read(source.CHUNK)
if len(buffer) == 0: break # reached end of the stream
# frames.append(buffer)
# resample audio to the required sample rate
resampled_buffer, resampling_state = audioop.ratecv(buffer, source.SAMPLE_WIDTH, 1, source.SAMPLE_RATE, snowboy_sample_rate, resampling_state)
self.energy_threshold = self.energy_threshold * damping + target_energy * (1 - damping)
return b"", elapsed_time
# resampled_frames.append(resampled_buffer)
# if time.time() - last_check > check_interval:
# # run Snowboy on the resampled audio
# snowboy_result = detector.RunDetection(b"".join(resampled_frames))
# assert snowboy_result != -1, "Error initializing streams or reading audio data"
# if snowboy_result > 0: break # wake word found
# resampled_frames.clear()
# last_check = time.time()
# return b"".join(frames), elapsed_time
# def listen(self, source, timeout=None, phrase_time_limit=None, snowboy_configuration=None):
def listen(self, source, timeout=None, phrase_time_limit=None, snowboy_configuration=None, min_time_after_hot_word=3, hot_word_callback=None):
Records a single phrase from ``source`` (an ``AudioSource`` instance) into an ``AudioData`` instance, which it returns.
@ -693,7 +714,9 @@ class Recognizer(AudioSource):
snowboy_location, snowboy_hot_word_files = snowboy_configuration
buffer, delta_time = self.snowboy_wait_for_hot_word(snowboy_location, snowboy_hot_word_files, source, timeout)
elapsed_time += delta_time
# if len(buffer) == 0: break # reached end of the stream
# frames.append(buffer)
# add
if hot_word_callback is not None:
self.pool.apply_async(hot_word_callback, [])
# perform the speech recognition with the keywords file (this is inside the context manager so the file isn;t deleted until we're done)
decoder.set_kws("keywords", f.name)
decoder.process_raw(raw_data, False, True) # process audio data with recognition enabled (no_search = False), as a full utterance (full_utt = True)
decoder.end_utt() # stop utterance processing
elif grammar is not None: # a path to a FSG or JSGF grammar
if not os.path.exists(grammar):
raise ValueError("Grammar '{0}' does not exist.".format(grammar))
fsg = FsgModel(fsg_path, decoder.get_logmath(), 7.5)
decoder.set_fsg(grammar_name, fsg)
decoder.process_raw(raw_data, False, True) # process audio data with recognition enabled (no_search = False), as a full utterance (full_utt = True)
decoder.end_utt() # stop utterance processing
else: # no keywords, perform freeform recognition
decoder.start_utt() # begin utterance processing
decoder.process_raw(raw_data, False, True) # process audio data with recognition enabled (no_search = False), as a full utterance (full_utt = True)
decoder.end_utt() # stop utterance processing
if "transcript" not in best_hypothesis: raise UnknownValueError()
return best_hypothesis["transcript"]
def recognize_google_cn(self, audio_data, key=None, language="en-US", pfilter=0, show_all=False):
Performs speech recognition on ``audio_data`` (an ``AudioData`` instance), using the Google Speech Recognition API.
The Google Speech Recognition API key is specified by ``key``. If not specified, it uses a generic key that works out of the box. This should generally be used for personal or testing purposes only, as it **may be revoked by Google at any time**.
To obtain your own API key, simply following the steps on the `API Keys <http://www.chromium.org/developers/how-tos/api-keys>`__ page at the Chromium Developers site. In the Google Developers Console, Google Speech Recognition is listed as "Speech API".
The recognition language is determined by ``language``, an RFC5646 language tag like ``"en-US"`` (US English) or ``"fr-FR"`` (International French), defaulting to US English. A list of supported language tags can be found in this `StackOverflow answer <http://stackoverflow.com/a/14302134>`__.
The profanity filter level can be adjusted with ``pfilter``: 0 - No filter, 1 - Only shows the first character and replaces the rest with asterisks. The default is level 0.
Returns the most likely transcription if ``show_all`` is false (the default). Otherwise, returns the raw API response as a JSON dictionary.
Raises a ``speech_recognition.UnknownValueError`` exception if the speech is unintelligible. Raises a ``speech_recognition.RequestError`` exception if the speech recognition operation failed, if the key isn't valid, or if there is no internet connection.
assert isinstance(audio_data, AudioData), "``audio_data`` must be audio data"
assert key is None or isinstance(key, str), "``key`` must be ``None`` or a string"
assert isinstance(language, str), "``language`` must be a string"
flac_data = audio_data.get_flac_data(
convert_rate=None if audio_data.sample_rate >= 8000 else 8000, # audio samples must be at least 8 kHz
convert_width=2 # audio samples must be 16-bit
if key is None: key = "AIzaSyBOti4mM-6x9WDnZIjIeyEU21OpBXqWBgw"
url = "http://www.google.cn/speech-api/v2/recognize?{}".format(urlencode({
"client": "chromium",
"lang": language,
"key": key,
"pFilter": pfilter
request = Request(url, data=flac_data, headers={"Content-Type": "audio/x-flac; rate={}".format(audio_data.sample_rate)})
# obtain audio transcription results
response = urlopen(request, timeout=self.operation_timeout)
except HTTPError as e:
raise RequestError("recognition request failed: {}".format(e.reason))
except URLError as e:
raise RequestError("recognition connection failed: {}".format(e.reason))
response_text = response.read().decode("utf-8")
# ignore any blank blocks
actual_result = []
for line in response_text.split("\n"):
if not line: continue
result = json.loads(line)["result"]
if len(result) != 0:
actual_result = result[0]
# return results
if show_all: return actual_result
if not isinstance(actual_result, dict) or len(actual_result.get("alternative", [])) == 0: raise UnknownValueError()
if "confidence" in actual_result["alternative"]:
# return alternative with highest confidence score
best_hypothesis = max(actual_result["alternative"], key=lambda alternative: alternative["confidence"])
# when there is no confidence available, we arbitrarily choose the first hypothesis.
best_hypothesis = actual_result["alternative"][0]
if "transcript" not in best_hypothesis: raise UnknownValueError()
return best_hypothesis["transcript"]
def recognize_google_cloud(self, audio_data, credentials_json=None, language="en-US", preferred_phrases=None, show_all=False):
Performs speech recognition on ``audio_data`` (an ``AudioData`` instance), using the Google Cloud Speech API.
Raises a ``speech_recognition.UnknownValueError`` exception if the speech is unintelligible. Raises a ``speech_recognition.RequestError`` exception if the speech recognition operation failed, if the credentials aren't valid, or if there is no Internet connection.
assert isinstance(audio_data, AudioData), "``audio_data`` must be audio data"
if credentials_json is not None:
try: json.loads(credentials_json)
except Exception: raise AssertionError("``credentials_json`` must be ``None`` or a valid JSON string")
if credentials_json is None:
assert os.environ.get('GOOGLE_APPLICATION_CREDENTIALS') is not None
assert isinstance(language, str), "``language`` must be a string"
assert preferred_phrases is None or all(isinstance(preferred_phrases, (type(""), type(u""))) for preferred_phrases in preferred_phrases), "``preferred_phrases`` must be a list of strings"
# See https://cloud.google.com/speech/reference/rest/v1/RecognitionConfig
import socket
from google.cloud import speech
from google.cloud.speech import enums
from google.cloud.speech import types
from google.api_core.exceptions import GoogleAPICallError
except ImportError:
raise RequestError('missing google-cloud-speech module: ensure that google-cloud-speech is set up correctly.')
if credentials_json is not None:
client = speech.SpeechClient.from_service_account_json(credentials_json)
client = speech.SpeechClient()
flac_data = audio_data.get_flac_data(
convert_rate=None if 8000 <= audio_data.sample_rate <= 48000 else max(8000, min(audio_data.sample_rate, 48000)), # audio sample rate must be between 8 kHz and 48 kHz inclusive - clamp sample rate into this range
convert_width=2 # audio samples must be 16-bit
audio = types.RecognitionAudio(content=flac_data)
from oauth2client.client import GoogleCredentials
from googleapiclient.discovery import build
import googleapiclient.errors
# cannot simply use 'http = httplib2.Http(timeout=self.operation_timeout)'
# because discovery.build() says 'Arguments http and credentials are mutually exclusive'
import socket
import googleapiclient.http
if self.operation_timeout and socket.getdefaulttimeout() is None:
# override constant (used by googleapiclient.http.build_http())
googleapiclient.http.DEFAULT_HTTP_TIMEOUT_SEC = self.operation_timeout
if credentials_json is None:
api_credentials = GoogleCredentials.get_application_default()
# the credentials can only be read from a file, so we'll make a temp file and write in the contents to work around that
with PortableNamedTemporaryFile("w") as f:
api_credentials = GoogleCredentials.from_stream(f.name)
speech_service = build("speech", "v1", credentials=api_credentials, cache_discovery=False)
except ImportError:
raise RequestError("missing google-api-python-client module: ensure that google-api-python-client is set up correctly.")
speech_config = {"encoding": "FLAC", "sampleRateHertz": audio_data.sample_rate, "languageCode": language}
config = {
'encoding': enums.RecognitionConfig.AudioEncoding.FLAC,
'sample_rate_hertz': audio_data.sample_rate,
'language_code': language
if preferred_phrases is not None:
speech_config["speechContexts"] = [{"phrases": preferred_phrases}]
config['speechContexts'] = [types.SpeechContext(
if show_all:
speech_config["enableWordTimeOffsets"] = True # some useful extra options for when we want all the output
request = speech_service.speech().recognize(body={"audio": {"content": base64.b64encode(flac_data).decode("utf8")}, "config": speech_config})
config['enableWordTimeOffsets'] = True # some useful extra options for when we want all the output
opts = {}
if self.operation_timeout and socket.getdefaulttimeout() is None:
opts['timeout'] = self.operation_timeout
config = types.RecognitionConfig(**config)
response = request.execute()
except googleapiclient.errors.HttpError as e:
response = client.recognize(config, audio, **opts)
except GoogleAPICallError as e:
raise RequestError(e)
except URLError as e:
raise RequestError("recognition connection failed: {0}".format(e.reason))
if show_all: return response
if "results" not in response or len(response["results"]) == 0: raise UnknownValueError()
transcript = ""
for result in response["results"]:
transcript += result["alternatives"][0]["transcript"].strip() + " "
if len(response.results) == 0: raise UnknownValueError()
transcript = ''
for result in response.results:
transcript += result.alternatives[0].transcript.strip() + ' '
return transcript
def recognize_wit(self, audio_data, key, show_all=False):
if "_text" not in result or result["_text"] is None: raise UnknownValueError()
return result["_text"]
def recognize_azure(self, audio_data, key, language="en-US", result_format="simple", profanity="masked", location="westus", show_all=False):
Performs speech recognition on ``audio_data`` (an ``AudioData`` instance), using the Microsoft Azure Speech API.
The Microsoft Azure Speech API key is specified by ``key``. Unfortunately, these are not available without `signing up for an account <https://azure.microsoft.com/en-ca/pricing/details/cognitive-services/speech-api/>`__ with Microsoft Azure.
To get the API key, go to the `Microsoft Azure Portal Resources <https://portal.azure.com/>`__ page, go to "All Resources" > "Add" > "See All" > Search "Speech > "Create", and fill in the form to make a "Speech" resource. On the resulting page (which is also accessible from the "All Resources" page in the Azure Portal), go to the "Show Access Keys" page, which will have two API keys, either of which can be used for the `key` parameter. Microsoft Azure Speech API keys are 32-character lowercase hexadecimal strings.
The recognition language is determined by ``language``, a BCP-47 language tag like ``"en-US"`` (US English) or ``"fr-FR"`` (International French), defaulting to US English. A list of supported language values can be found in the `API documentation <https://docs.microsoft.com/en-us/azure/cognitive-services/speech/api-reference-rest/bingvoicerecognition#recognition-language>`__ under "Interactive and dictation mode".
Returns the most likely transcription if ``show_all`` is false (the default). Otherwise, returns the `raw API response <https://docs.microsoft.com/en-us/azure/cognitive-services/speech/api-reference-rest/bingvoicerecognition#sample-responses>`__ as a JSON dictionary.
Raises a ``speech_recognition.UnknownValueError`` exception if the speech is unintelligible. Raises a ``speech_recognition.RequestError`` exception if the speech recognition operation failed, if the key isn't valid, or if there is no internet connection.
assert isinstance(audio_data, AudioData), "Data must be audio data"
assert isinstance(key, str), "``key`` must be a string"
assert isinstance(result_format, str), "``format`` must be a string"
assert isinstance(language, str), "``language`` must be a string"
access_token, expire_time = getattr(self, "azure_cached_access_token", None), getattr(self, "azure_cached_access_token_expiry", None)
allow_caching = True
from time import monotonic # we need monotonic time to avoid being affected by system clock changes, but this is only available in Python 3.3+
except ImportError:
from monotonic import monotonic # use time.monotonic backport for Python 2 if available (from https://pypi.python.org/pypi/monotonic)
except (ImportError, RuntimeError):
expire_time = None # monotonic time not available, don't cache access tokens
allow_caching = False # don't allow caching, since monotonic time isn't available
if expire_time is None or monotonic() > expire_time: # caching not enabled, first credential request, or the access token from the previous one expired
# get an access token using OAuth
credential_url = "https://" + location + ".api.cognitive.microsoft.com/sts/v1.0/issueToken"
credential_request = Request(credential_url, data=b"", headers={
"Content-type": "application/x-www-form-urlencoded",
"Content-Length": "0",
"Ocp-Apim-Subscription-Key": key,
if allow_caching:
start_time = monotonic()
credential_response = urlopen(credential_request, timeout=60) # credential response can take longer, use longer timeout instead of default one
except HTTPError as e:
raise RequestError("credential request failed: {}".format(e.reason))
except URLError as e:
raise RequestError("credential connection failed: {}".format(e.reason))
access_token = credential_response.read().decode("utf-8")
if allow_caching:
# save the token for the duration it is valid for
self.azure_cached_access_token = access_token
self.azure_cached_access_token_expiry = start_time + 600 # according to https://docs.microsoft.com/en-us/azure/cognitive-services/Speech-Service/rest-apis#authentication, the token expires in exactly 10 minutes
wav_data = audio_data.get_wav_data(
convert_rate=16000, # audio samples must be 8kHz or 16 kHz
convert_width=2 # audio samples should be 16-bit
url = "https://" + location + ".stt.speech.microsoft.com/speech/recognition/conversation/cognitiveservices/v1?{}".format(urlencode({
"language": language,
"format": result_format,
"profanity": profanity
if sys.version_info >= (3, 6): # chunked-transfer requests are only supported in the standard library as of Python 3.6+, use it if possible
request = Request(url, data=io.BytesIO(wav_data), headers={
"Authorization": "Bearer {}".format(access_token),
"Content-type": "audio/wav; codec=\"audio/pcm\"; samplerate=16000",
"Transfer-Encoding": "chunked",
else: # fall back on manually formatting the POST body as a chunked request
ascii_hex_data_length = "{:X}".format(len(wav_data)).encode("utf-8")
chunked_transfer_encoding_data = ascii_hex_data_length + b"\r\n" + wav_data + b"\r\n0\r\n\r\n"
request = Request(url, data=chunked_transfer_encoding_data, headers={
"Authorization": "Bearer {}".format(access_token),
"Content-type": "audio/wav; codec=\"audio/pcm\"; samplerate=16000",
"Transfer-Encoding": "chunked",
response = urlopen(request, timeout=self.operation_timeout)
except HTTPError as e:
raise RequestError("recognition request failed: {}".format(e.reason))
except URLError as e:
raise RequestError("recognition connection failed: {}".format(e.reason))
response_text = response.read().decode("utf-8")
result = json.loads(response_text)
# return results
if show_all: return result
if "RecognitionStatus" not in result or result["RecognitionStatus"] != "Success" or "DisplayText" not in result: raise UnknownValueError()
return result["DisplayText"]
def recognize_bing(self, audio_data, key, language="en-US", show_all=False):
Performs speech recognition on ``audio_data`` (an ``AudioData`` instance), using the Microsoft Bing Speech API.
human_string = self.tflabels[node_id]
return human_string
def get_flac_converter():
"""Returns the absolute path of a FLAC converter executable, or raises an OSError if none can be found."""