Peak finding: signal.find_peaks(spectrum, prominence=0, distance=200)

  • prominence: the prominence threshold. From a peak, extend a horizontal line to the left and to the right until it meets a point higher than the peak (or the end of the signal); take the minimum of each of the two segments, and the prominence is the difference between the peak value and the larger of these two minima;
  • distance: the minimum horizontal distance (in samples) required between neighbouring peaks.
a = np.array([2, 3, 1, 5, 4, 7, 2, 6, 2])
peak_idx, props = signal.find_peaks(a, distance=1, prominence=0, height=3)
peak_idx, props
(array([1, 3, 5, 7], dtype=int64),
 {'peak_heights': array([3., 5., 7., 6.]),
  'prominences': array([1., 1., 5., 4.]),
  'left_bases': array([0, 2, 2, 6], dtype=int64),
  'right_bases': array([2, 4, 6, 8], dtype=int64)})
The qualifying peak indices are [1, 3, 5, 7], with corresponding values [3, 5, 7, 6]. Their prominences are computed as follows (and verified with scipy in the sketch after this list):
1. Prominence of peak 3: from the peak, extend a line to the left and to the right up to the last point that does not exceed the current peak; the minimum of the left segment is the left_base and the minimum of the right segment is the right_base. Peak 3 extends left to 2 (index 0) and right to 1 (index 2), so its prominence is 3 - max(a[0], a[2]) = 1;
2. Prominence of peak 5: 5 extends left to 1 (index 2) and right to 4 (index 4), so its prominence is 5 - max(a[2], a[4]) = 1;
3. Prominence of peak 7: 7 extends left to 1 (index 2) and right to 2 (index 6), so its prominence is 7 - max(a[2], a[6]) = 5;
4. Prominence of peak 6: 6 extends left to 2 (index 6) and right to 2 (index 8), so its prominence is 6 - max(a[6], a[8]) = 4.
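
These hand-computed values can be checked directly with scipy.signal.peak_prominences; a minimal sketch on the same array a (the function returns the prominences together with the left and right bases):

import numpy as np
from scipy import signal
a = np.array([2, 3, 1, 5, 4, 7, 2, 6, 2])
peak_idx = np.array([1, 3, 5, 7])
prominences, left_bases, right_bases = signal.peak_prominences(a, peak_idx)
print(prominences)   # [1. 1. 5. 4.]
print(left_bases)    # [0 2 2 6]
print(right_bases)   # [2 4 6 8]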

4. Combine neighbouring feature points into feature-point pairs

5. Hash-encode each feature-point pair

Encoding: f1 and f2 are each quantized to 10 bits, and the remaining bits store the time offset, together forming a 32-bit hash:

Hash = f1 | f2<<10 | diff_t<<20, and the stored record is (t1, Hash)
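
A minimal sketch of this bit-packing scheme (the values of f1, f2 and diff_t below are made up for illustration; the 0x3FF masks follow from the 10-bit quantization):

# Pack two 10-bit quantized frequencies and a time offset into one 32-bit hash
f1, f2, diff_t = 523, 880, 7
h = f1 | (f2 << 10) | (diff_t << 20)
# Unpack by masking the corresponding bit fields
f1_dec = h & 0x3FF            # lowest 10 bits
f2_dec = (h >> 10) & 0x3FF    # next 10 bits
diff_dec = h >> 20            # remaining bits hold the time offset
print(f1_dec, f2_dec, diff_dec)  # 523 880 7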

The full procedure is as follows:

  • Apply the Fourier transform to obtain the spectrum of each frame;
  • In each frame, select n_peaks local maxima (prominence-based peak finding) -> (i, freq), and append all feature points to a list;
  • For each feature point, form pairs with the feature points in a nearby neighbourhood (e.g. the next 100 points, subject to a time constraint such as diff = other_time - cur_time with 1 < diff <= 10), and encode each pair as a 32-bit hash, diff<<20 | other_freq<<10 | cur_freq; store each hash in the database as Hash -> (ref_i, song_id);
  • Extract the hashes of the query audio and look up which songs in the database contain each hash: song_id -> (Hash, ref_i, test_i);
  • For each song_id, count how often each relative time offset ref_i - test_i occurs and keep the largest count (see the toy sketch after this list);
  • Across all song_ids, the song whose best offset has the largest count is the retrieved song_id.
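
A minimal toy illustration of the offset-voting step (the (ref_i, test_i) match pairs below are made up; in the real system they come from the database lookup shown in the implementation):

from collections import Counter
# (ref_i, test_i) pairs for one candidate song: database frame index vs. query frame index
matches = [(120, 3), (125, 8), (130, 13), (200, 50), (135, 18)]
# Vote on the relative offset ref_i - test_i; a true match piles up on a single offset
offsets = Counter(ref - test for ref, test in matches)
best_offset, votes = offsets.most_common(1)[0]
print(best_offset, votes)  # 117 4 -> four hashes agree on the same alignment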

The implementation is as follows:

import numpy as np
import librosa
from scipy import signal
import pickle
import os
fix_rate = 16000            # resampling rate (Hz)
win_length_seconds = 0.5    # STFT window length in seconds
frequency_bits = 10         # bits used to quantize each frequency
num_peaks = 15              # feature points kept per frame
# Build a dictionary mapping song id to song file path
def song_collect(base_path):
    index = 0
    dic_idx2song = {}
    for roots, dirs, files in os.walk(base_path):
        for file in files:
            if file.endswith(('.mp3', '.wav')):
                file_song = os.path.join(roots, file)
                dic_idx2song[index] = file_song
                index += 1
    return dic_idx2song
# Extract local-maximum feature points (the constellation map)
def collect_map(y, fs, win_length_seconds=0.5, num_peaks=15):
    win_length = int(win_length_seconds * fs)
    hop_length = int(win_length // 2)
    n_fft = int(2**np.ceil(np.log2(win_length)))
    S = librosa.stft(y, n_fft=n_fft, win_length=win_length, hop_length=hop_length)
    S = np.abs(S)  # magnitude spectrogram
    D, T = np.shape(S)
    constellation_map = []  
    for i in range(T):
        spectrum = S[:, i]
        peaks_index, props = signal.find_peaks(spectrum, prominence=0, distance=200)
        # Keep only the num_peaks most prominent peaks
        n_peaks = min(num_peaks, len(peaks_index))
        if n_peaks == 0:
            continue  # no peaks in this frame (e.g. silence)
        largest_peaks_index = np.argpartition(props['prominences'], -n_peaks)[-n_peaks:]
        for peak_index in peaks_index[largest_peaks_index]:
            frequency = fs / n_fft * peak_index
            # Save the (frame index, frequency) of each local maximum
            constellation_map.append([i, frequency])
    return constellation_map
# Hash encoding
def create_hash(constellation_map, fs, frequency_bits=10, song_id=None):
    upper_frequency = fs / 2
    hashes = {}
    for idx, (time, freq) in enumerate(constellation_map):
        for other_time, other_freq in constellation_map[idx: idx + 100]: # pair with up to the next 100 feature points
            diff = int(other_time - time)
            if diff <= 1 or diff > 10: # keep only pairs within a limited time offset (1 < diff <= 10)
                continue
            freq_binned = int(freq / upper_frequency * (2 ** frequency_bits))
            other_freq_binned = int(other_freq / upper_frequency * (2 ** frequency_bits))
            hash = int(freq_binned) | (int(other_freq_binned) << 10) | (int(diff) << 20)
            hashes[hash] = (time, song_id)
    return hashes

Feature extraction: feature_collect.py

# Collect all songs in the database directory
path_music = 'data'
current_path = os.getcwd()
path_songs = os.path.join(current_path, path_music)
dic_idx2song = song_collect(path_songs)
# Extract features from every song
database = {}
for song_id in dic_idx2song.keys():
    file = dic_idx2song[song_id]
    print("collect info of file", file)
    # Load the audio
    y, fs = librosa.load(file, sr=fix_rate)
    # Extract the feature points (constellation map)
    constellation_map = collect_map(y, fs, win_length_seconds=win_length_seconds, num_peaks=num_peaks)
    # Compute the hashes
    hashes = create_hash(constellation_map, fs, frequency_bits=frequency_bits, song_id=song_id)
    # Add the hashes to the database
    for hash, time_index_pair in hashes.items():
        if hash not in database:
            database[hash] = []
        database[hash].append(time_index_pair)
# Save the database and the song index
with open('database.pickle', 'wb') as db:
    pickle.dump(database, db, pickle.HIGHEST_PROTOCOL)
with open('song_index.pickle', 'wb') as songs:
    pickle.dump(dic_idx2song, songs, pickle.HIGHEST_PROTOCOL)
# Load the database
database = pickle.load(open('database.pickle', 'rb'))
dic_idx2song = pickle.load(open('song_index.pickle', 'rb'))
print(len(database))
# Retrieval
def getscores(y, fs, database):
    # Extract hashes from the query audio
    constellation_map = collect_map(y, fs)
    hashes = create_hash(constellation_map, fs, frequency_bits=10, song_id=None)
    # Collect hash matches against every song in the database
    matches_per_song = {}
    for hash, (sample_time, _) in hashes.items():
        if hash in database:
            matching_occurrences = database[hash]
            for source_time, song_index in matching_occurrences:
                if song_index not in matches_per_song:
                    matches_per_song[song_index] = []
                matches_per_song[song_index].append((hash, sample_time, source_time))
    scores = {}
    # For each matching hash, compute the offset between the query time and the database time
    for song_index, matches in matches_per_song.items():
#         scores[song_index] = len(matches)
        song_scores_by_offset = {}
        # Accumulate votes for identical time offsets
        for hash, sample_time, source_time in matches:
            delta = source_time - sample_time
            if delta not in song_scores_by_offset:
                song_scores_by_offset[delta] = 0
            song_scores_by_offset[delta] += 1
        # Keep the offset with the largest vote count for this song
        song_scores_by_offset = sorted(song_scores_by_offset.items(), key=lambda x: x[1], reverse=True)
        scores[song_index] = song_scores_by_offset[0]
    scores = sorted(scores.items(), key=lambda x:x[1][1], reverse=True)
    return scores

Music retrieval: music_research.py

import threading
from playsound import playsound
def cycle(path):
    while 1:
        playsound(path)
def play(path, cyc=False):
    if cyc:
        cycle(path)
    else:
        playsound(path)
path = 'test_music/record4.wav'
y, fs = librosa.load(path, sr=fix_rate)
# Play the query audio in a background thread
music = threading.Thread(target=play, args=(path,))
music.start()
# Score the query against the database
scores = getscores(y, fs, database)
# Print the match details (name, best offset, vote count)
for k, v in scores:
    file = dic_idx2song[k]
    name = os.path.split(file)[-1]
    print("%s: %d: %d"%(name, v[0], v[1]))
# Print the result
if len(scores) > 0 and scores[0][1][1] > 50:
    print("Best match:", os.path.split(dic_idx2song[scores[0][0]])[-1])
else:
    print("No matching song found")

Identifying music from a microphone recording:

import pyaudio
import wave
RATE = 48000 # sampling rate (Hz)
CHUNK = 1024 # frames per buffer
record_seconds = 10 # recording length in seconds
CHANNELS = 2 # number of channels
# Open a PyAudio input stream
audio = pyaudio.PyAudio()
stream = audio.open(format=pyaudio.paInt16, # 16-bit samples
                   channels=CHANNELS, # number of input channels
                   rate=RATE, # sampling rate
                   input=True, # open as an input stream
                   frames_per_buffer=CHUNK) # buffer size
frames = [] # recorded audio chunks
# Start recording
print('Recording...')
for i in range(0, int(RATE / CHUNK * record_seconds)):
    # Read one chunk from the microphone
    data = stream.read(CHUNK)
    # Append the chunk to the list
    frames.append(data)
# Stop recording and close the input stream
stream.stop_stream()
stream.close()
audio.terminate()
# Write the recording to a WAV file
with wave.open('test_music/test.wav', 'wb') as wf:
    wf.setnchannels(CHANNELS)
    wf.setsampwidth(audio.get_sample_size(pyaudio.paInt16))
    wf.setframerate(RATE)
    wf.writeframes(b''.join(frames))
# Load the recorded file
path = 'test_music/test.wav'
y, fs = librosa.load(path, sr=fix_rate)
# Optionally play the query audio in a background thread
# music = threading.Thread(target=play, args=(path,))
# music.start()
# Music retrieval
print('Searching...')
scores = getscores(y, fix_rate, database)
# Optionally print the match details
# for k, v in scores:
#     file = dic_idx2song[k]
#     name = os.path.split(file)[-1]
#     print("%s: %d: %d"%(name, v[0], v[1]))
# Print the result
if len(scores) > 0 and scores[0][1][1] > 50:
    print("Best match:", os.path.split(dic_idx2song[scores[0][0]])[-1])
else:
    print("No matching song found")

Reference: 音乐检索-Shazam算法原理_哔哩哔哩_bilibili (Music Retrieval: How the Shazam Algorithm Works, Bilibili)
