WebRTC VAD封装

March 5, 2021, 4:51 p.m.

read: 66

import webrtcvad
import contextlib
import wave
import numpy as np
from scipy.io import wavfile
import sys
import collections
import numpy as np
import os

# Module-wide VAD instance; the mode (1) is handed to the constructor
# instead of a separate set_mode() call.
vad = webrtcvad.Vad(1)
# Module-level sample rate constant.
sample_rate = 16000

def read_wave(path):
    """Load a mono, 16-bit PCM .wav file.

    The file at ``path`` is opened with the ``wave`` module and checked
    with assertions: exactly one channel, a sample width of 2 bytes and a
    frame rate of 8000, 16000, 32000 or 48000.

    Returns a ``(pcm_data, sample_rate)`` tuple holding the raw bytes of
    every frame in the file and the file's frame rate.
    """
    supported_rates = (8000, 16000, 32000, 48000)
    with contextlib.closing(wave.open(path, 'rb')) as reader:
        assert reader.getnchannels() == 1
        assert reader.getsampwidth() == 2
        rate = reader.getframerate()
        assert rate in supported_rates
        total_frames = reader.getnframes()
        return reader.readframes(total_frames), rate


class Frame(object):
    """One fixed-length slice of PCM audio.

    The constructor stores its arguments unchanged as the attributes
    ``bytes`` (the slice of audio data), ``timestamp`` and ``duration``.
    """
    def __init__(self, bytes, timestamp, duration):
        self.bytes = bytes
        self.timestamp = timestamp
        self.duration = duration


def frame_generator(frame_duration_ms, audio, sample_rate):
    """Split PCM audio data into consecutive, equally sized frames.

    Args:
        frame_duration_ms: Desired length of each frame in milliseconds.
        audio: The PCM data as a sliceable byte sequence. The ``* 2``
            factor below assumes 2 bytes per sample.
        sample_rate: Number of samples per second of ``audio``.

    Yields:
        ``Frame`` objects carrying the frame's bytes, its start time and
        its duration (both in seconds). A trailing partial frame is
        dropped, but a final frame that fits exactly is yielded.

    Raises:
        ValueError: If the computed frame size is not positive (raised
            when the generator is first advanced).
    """
    # Frame size in bytes: samples per frame times 2 bytes per sample.
    n = int(sample_rate * (frame_duration_ms / 1000.0) * 2)
    if n <= 0:
        # A non-positive step would never advance through the audio.
        raise ValueError(
            'frame size must be positive, got %d bytes' % n)
    duration = (float(n) / sample_rate) / 2.0
    timestamp = 0.0
    # The "+ 1" keeps the last frame when len(audio) is an exact
    # multiple of n; a strict "<" comparison would silently drop it.
    for offset in range(0, len(audio) - n + 1, n):
        yield Frame(audio[offset:offset + n], timestamp, duration)
        timestamp += duration


def vad_collector(sample_rate, frame_duration_ms,
                  padding_duration_ms, vad, frames):
    """Classify frames as speech and smooth the result with hysteresis.

    Each frame is passed to ``vad.is_speech(frame.bytes, sample_rate)``.
    A sliding window of ``int(padding_duration_ms / frame_duration_ms)``
    recent decisions drives a two-state machine:

    * While idle, once more than 90% of the window size is voiced, the
      frames currently in the window open a speech segment.
    * While in a segment, every frame is added to it; once more than 90%
      of the window size is unvoiced, the segment is closed.

    A segment that is still open when the frames run out is kept.

    Returns:
        A pair ``(raw, smoothed)`` of NumPy arrays with one entry per
        frame: ``raw`` holds the per-frame VAD decision as 0/1 and
        ``smoothed`` (``int32``) is 1 for frames inside a segment.
    """
    window_size = int(padding_duration_ms / frame_duration_ms)
    window = collections.deque(maxlen=window_size)
    threshold = 0.9 * window.maxlen

    in_segment = False
    raw_flags = []
    open_segment = []      # frame indices of the segment being built
    segment_indices = []   # frame indices of all closed segments

    for idx, frame in enumerate(frames):
        voiced = vad.is_speech(frame.bytes, sample_rate)
        raw_flags.append(1 if voiced else 0)

        if in_segment:
            open_segment.append(idx)
            window.append((idx, voiced))
            unvoiced_count = sum(1 for _, flag in window if not flag)
            if unvoiced_count > threshold:
                # Enough trailing silence: close the current segment.
                in_segment = False
                segment_indices.extend(open_segment)
                open_segment = []
                window.clear()
            continue

        window.append((idx, voiced))
        voiced_count = sum(1 for _, flag in window if flag)
        if voiced_count > threshold:
            # Enough speech: the buffered frames start a new segment.
            in_segment = True
            open_segment.extend(i for i, _ in window)
            window.clear()

    # Keep a segment that was still open at the end of the input.
    segment_indices.extend(open_segment)

    smoothed = np.zeros(len(raw_flags), dtype=np.int32)
    smoothed[np.asarray(segment_indices, dtype=np.intp)] = 1
    return np.array(raw_flags), smoothed

def one_runs(a):
    """Locate the runs of consecutive 1s in a one-dimensional sequence.

    Returns an array of shape ``(num_runs, 2)`` whose rows are
    ``[start, end)`` index pairs, one row per run of elements equal to 1.
    """
    # 1 where ``a`` equals 1, 0 elsewhere.
    is_one = np.equal(a, 1).view(np.int8)
    # Pad both ends with 0 so runs touching either edge still get a
    # start and an end transition.
    padded = np.concatenate(([0], is_one, [0]))
    transitions = np.abs(np.diff(padded))
    # Transitions alternate start, end, start, end, ...
    boundaries, = np.where(transitions == 1)
    return boundaries.reshape(-1, 2)

def gen_target_file_list(target_dir, target_ext='.wav'):
    """Recursively collect files under ``target_dir`` by extension.

    Args:
        target_dir: Directory to walk; symbolic links to directories are
            followed.
        target_ext: Extension to match, including the leading dot. The
            comparison is case-insensitive on both sides, so ``'.WAV'``
            and ``'.wav'`` select the same files.

    Returns:
        A list of paths (each joined onto the directory it was found in)
        in the order ``os.walk`` produced them. Any path containing the
        substring ``'._'`` is skipped.
    """
    # File extensions are lower-cased below, so the target must be
    # lower-cased as well or an upper-case target could never match.
    wanted_ext = target_ext.lower()
    matches = []
    for root, _dirs, files in os.walk(target_dir, followlinks=True):
        for name in files:
            path = os.path.join(root, name)
            ext = os.path.splitext(path)[1].lower()
            # NOTE(review): the '._' test looks at the whole path, not just
            # the file name - presumably meant to skip '._*' sidecar files.
            if ext == wanted_ext and '._' not in path:
                matches.append(path)
    return matches

if __name__ == '__main__':
    # Path of the .wav file to analyse; must be filled in before running.
    wav_path = ''
    frame_ms = 10
    padding_ms = 30

    sr, sig = wavfile.read(wav_path)
    # NOTE(review): tobytes() hands the samples to the VAD as-is, which
    # presumably requires 16-bit mono data - confirm for your input files.
    pcm_data = sig.tobytes()

    # Use the rate actually read from the file for both framing and
    # classification, so the frame size always matches the rate the VAD is
    # told about (previously one was hard-coded and the other a global).
    frames = frame_generator(frame_ms, pcm_data, sr)
    list1, list2 = vad_collector(sr, frame_ms, padding_ms, vad, frames)

    # Only the raw per-frame decisions are saved.
    npy_path = 'vad.npy'
    np.save(npy_path, list1)




评论

评论模块试运行中,标*的为必填项
昵称:
邮箱:
*内容:
评论

Golang安装

方案一:apt 安装(安装的不是最新版本):sudo apt-get install golang-go。方案二:预编译二进制安装(下载地址请替换成最新的安装包):wget -c https://dl.…

OPENVINO pip环境安装

文章标题:OPENVINO pip环境安装。文章内容:pip 无法安装支持 V7 版本 API 的 OpenVINO 环境;如果仅需 V10 版本的 OpenVINO 环境,可执行 pip install openvino 安…

推荐使用 Firefox 访问此站点 | 服务器由huxia赞助 | 友情链接: 张鹏的博客  杨洋的博客   李号的博客
Developed by zhangpeng && hupeng | Powered by ASP.NET Run on IIS | © 2018-2021 hupeng.me. All Rights Reserved.