WebRTC VAD Wrapper
March 5, 2021, 4:51 p.m.
import collections
import contextlib
import os
import wave

import numpy as np
import webrtcvad
from scipy.io import wavfile
# Aggressiveness mode, an integer in [0, 3]: 0 is the least aggressive
# about filtering out non-speech, 3 the most aggressive.
vad = webrtcvad.Vad()
vad.set_mode(1)
def read_wave(path):
    """Reads a .wav file.

    Takes the path, and returns (PCM audio data, sample rate).
    """
    with contextlib.closing(wave.open(path, 'rb')) as wf:
        num_channels = wf.getnchannels()
        assert num_channels == 1
        sample_width = wf.getsampwidth()
        assert sample_width == 2
        sample_rate = wf.getframerate()
        assert sample_rate in (8000, 16000, 32000, 48000)
        pcm_data = wf.readframes(wf.getnframes())
        return pcm_data, sample_rate
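# Usage sketch for read_wave, as an alternative to the scipy path used
# in __main__ below ('sample.wav' is a hypothetical path):
#
#   pcm_data, sr = read_wave('sample.wav')
#   frames = frame_generator(10, pcm_data, sr)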
class Frame(object):
    """Represents a "frame" of audio data."""
    def __init__(self, bytes, timestamp, duration):
        self.bytes = bytes
        self.timestamp = timestamp
        self.duration = duration
def frame_generator(frame_duration_ms, audio, sample_rate):
    """Generates audio frames from PCM audio data.

    Takes the desired frame duration in milliseconds, the PCM data, and
    the sample rate.

    Yields Frames of the requested duration.
    """
    # Each 16-bit sample is 2 bytes, hence the factor of 2.
    n = int(sample_rate * (frame_duration_ms / 1000.0) * 2)
    offset = 0
    timestamp = 0.0
    duration = (float(n) / sample_rate) / 2.0
    while offset + n < len(audio):
        yield Frame(audio[offset:offset + n], timestamp, duration)
        timestamp += duration
        offset += n
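# Worked example of the frame-size arithmetic above (assuming the 10 ms
# frames used in __main__ below): webrtcvad accepts only 10, 20, or
# 30 ms frames, so at 16 kHz a 10 ms frame is
# 16000 * 0.010 = 160 samples = 320 bytes of 16-bit PCM,
# and one second of audio yields 100 frames.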
def vad_collector(sample_rate, frame_duration_ms,
                  padding_duration_ms, vad, frames):
    """Runs the VAD over every frame and applies a ring-buffer smoother.

    Returns two per-frame arrays: the raw frame-level decisions, and the
    smoothed decisions produced by the TRIGGERED/NOTTRIGGERED state
    machine (1 for frames inside a voiced region, 0 otherwise).
    """
    num_padding_frames = int(padding_duration_ms / frame_duration_ms)
    ring_buffer = collections.deque(maxlen=num_padding_frames)
    triggered = False
    voiced_frames = []
    is_speech_list = []  # raw per-frame decisions
    voiced_indices = []  # indices of frames inside voiced regions
    for frame_idx, frame in enumerate(frames):
        is_speech = vad.is_speech(frame.bytes, sample_rate)
        is_speech_list.append(1 if is_speech else 0)
        if not triggered:
            ring_buffer.append((frame_idx, is_speech))
            num_voiced = len([i for i, speech in ring_buffer if speech])
            # If we're NOTTRIGGERED and more than 90% of the frames in
            # the ring buffer are voiced frames, then enter the
            # TRIGGERED state.
            if num_voiced > 0.9 * ring_buffer.maxlen:
                triggered = True
                for i, _ in ring_buffer:
                    voiced_frames.append(i)
                ring_buffer.clear()
        else:
            voiced_frames.append(frame_idx)
            ring_buffer.append((frame_idx, is_speech))
            num_unvoiced = len([i for i, speech in ring_buffer if not speech])
            # If we're TRIGGERED and more than 90% of the frames in the
            # ring buffer are unvoiced, then enter the NOTTRIGGERED
            # state and flush the collected region.
            if num_unvoiced > 0.9 * ring_buffer.maxlen:
                triggered = False
                voiced_indices += voiced_frames
                ring_buffer.clear()
                voiced_frames = []
    voiced_indices += voiced_frames
    smoothed = np.zeros(len(is_speech_list), dtype=np.int32)
    for idx in voiced_indices:
        smoothed[idx] = 1
    return np.array(is_speech_list), smoothed
def one_runs(a):
    """Returns half-open [start, end) index pairs for each run of 1s in a."""
    # Create an array that is 1 where a is 1, and pad each end with an extra 0.
    isone = np.concatenate(([0], np.equal(a, 1).view(np.int8), [0]))
    absdiff = np.abs(np.diff(isone))
    # Runs start and end where absdiff is 1.
    ranges = np.where(absdiff == 1)[0].reshape(-1, 2)
    return ranges
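# A small helper built on one_runs (a sketch added for illustration, not
# part of the original script): it converts the smoothed per-frame
# decisions returned by vad_collector into (start, end) times in
# seconds, assuming the 10 ms frames used in __main__ below.
def frames_to_segments(smoothed, frame_duration_ms=10):
    """Turns a 0/1 per-frame array into (start_s, end_s) pairs."""
    frame_s = frame_duration_ms / 1000.0
    # one_runs yields half-open [start, end) frame-index ranges.
    return [(start * frame_s, end * frame_s) for start, end in one_runs(smoothed)]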
def gen_target_file_list(target_dir, target_ext='.wav'):
    file_list = []
    for root, dirs, files in os.walk(target_dir, followlinks=True):
        for f in files:
            f = os.path.join(root, f)
            ext = os.path.splitext(f)[1].lower()
            # Skip macOS '._' resource-fork files.
            if ext == target_ext and '._' not in f:
                file_list.append(f)
    return file_list
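# A batch-processing sketch using gen_target_file_list, which is
# otherwise unused above. It runs the same pipeline as __main__ over
# every .wav under a directory; 'out_dir' and the output naming are
# assumptions, not part of the original script.
def batch_vad(target_dir, out_dir):
    for wav_file in gen_target_file_list(target_dir):
        sr, sig = wavfile.read(wav_file)
        frames = frame_generator(10, sig.tobytes(), sr)
        raw, smoothed = vad_collector(sr, 10, 30, vad, frames)
        name = os.path.splitext(os.path.basename(wav_file))[0]
        np.save(os.path.join(out_dir, name + '_vad.npy'), raw)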
if __name__ == '__main__':
    wav_path = ''
    # Expects a 16-bit mono wav at 8, 16, 32, or 48 kHz.
    sr, sig = wavfile.read(wav_path)
    pcm_data = sig.tobytes()
    frames = frame_generator(10, pcm_data, sr)
    list1, list2 = vad_collector(sr, 10, 30, vad, frames)
    npy_path = 'vad.npy'
    np.save(npy_path, list1)
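    # Follow-on example (a sketch): convert the smoothed decisions into
    # time segments with the frames_to_segments helper sketched after
    # one_runs above, then keep only the voiced samples
    # ('vad_voiced.wav' is an assumed output name).
    segments = frames_to_segments(list2, frame_duration_ms=10)
    if segments:
        voiced = np.concatenate([sig[int(s * sr):int(e * sr)] for s, e in segments])
        wavfile.write('vad_voiced.wav', sr, voiced)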