FAST-RIR / code_new /RT60.py
antonjeran's picture
Upload 40 files
b53f252
import numpy as np
import math
from scipy.io import wavfile
from scipy import stats
from acoustics.utils import _is_1d
from acoustics.signal import bandpass
from acoustics.bands import (_check_band_type, octave_low, octave_high, third_low, third_high)
import soundfile as sf
from multiprocessing import Pool
def t60_impulse(raw_signal,fs): # pylint: disable=too-many-locals
"""
Reverberation time from a WAV impulse response.
:param file_name: name of the WAV file containing the impulse response.
:param bands: Octave or third bands as NumPy array.
:param rt: Reverberation time estimator. It accepts `'t30'`, `'t20'`, `'t10'` and `'edt'`.
:returns: Reverberation time :math:`T_{60}`
"""
bands =np.array([62.5 ,125, 250, 500,1000, 2000])
if np.max(raw_signal)==0 and np.min(raw_signal)==0:
print('came 1')
return .5
# fs, raw_signal = wavfile.read(file_name)
band_type = _check_band_type(bands)
# if band_type == 'octave':
low = octave_low(bands[0], bands[-1])
high = octave_high(bands[0], bands[-1])
# elif band_type == 'third':
# low = third_low(bands[0], bands[-1])
# high = third_high(bands[0], bands[-1])
init = -0.0
end = -60.0
factor = 1.0
bands =bands[3:5]
low = low[3:5]
high = high[3:5]
t60 = np.zeros(bands.size)
for band in range(bands.size):
# Filtering signal
filtered_signal = bandpass(raw_signal, low[band], high[band], fs, order=8)
abs_signal = np.abs(filtered_signal) / np.max(np.abs(filtered_signal))
# Schroeder integration
sch = np.cumsum(abs_signal[::-1]**2)[::-1]
sch_db = 10.0 * np.log10(sch / np.max(sch))
if math.isnan(sch_db[1]):
print('came 2')
return .5
# print("leng sch_db ",sch_db.size)
# print("sch_db ",sch_db)
# Linear regression
sch_init = sch_db[np.abs(sch_db - init).argmin()]
sch_end = sch_db[np.abs(sch_db - end).argmin()]
init_sample = np.where(sch_db == sch_init)[0][0]
end_sample = np.where(sch_db == sch_end)[0][0]
x = np.arange(init_sample, end_sample + 1) / fs
y = sch_db[init_sample:end_sample + 1]
slope, intercept = stats.linregress(x, y)[0:2]
# Reverberation time (T30, T20, T10 or EDT)
db_regress_init = (init - intercept) / slope
db_regress_end = (end - intercept) / slope
t60[band] = factor * (db_regress_end - db_regress_init)
mean_t60 =(t60[1]+t60[0])/2
# print("meant60 is ", mean_t60)
if math.isnan(mean_t60):
print('came 3')
return .5
return mean_t60
def t60_error(filename1,filename2):
real_wave,fs = sf.read(filename1)
fake_wave,fs = sf.read(filename2)
channel = int(real_wave.size/len(real_wave))
pool = Pool(processes=8)
results =[]
for n in range(channel):
results.append(pool.apply_async(t60_parallel, args=(n,real_wave,fake_wave,fs,)))
T60_error =0
for result in results:
T60_error = T60_error + result.get()
T60_error = T60_error/channel
pool.close()
pool.join()
# T60_error = Parallel(n_jobs=64)(delayed(t60_parallel)(n, real_wave,fake_wave,fs) for n in range(channel))#np.random.randint(0,1023,size=channel))#
# T60_error = sum(results)/channel
# for n in range(channel):
# real_wave_single = real_wave[:,n]
# fake_wave_single = fake_wave[:,n]
# Real_T60_val = t60_impulse(real_wave_single,fs)
# Fake_T60_val = t60_impulse(fake_wave_single,fs)
# T60_diff = abs(Real_T60_val-Fake_T60_val)
# T60_error = T60_error + T60_diff
# T60_error = T60_error/channel
return str(T60_error)
def t60_parallel(n,real_wave,fake_wave,fs):
real_wave_single = real_wave[n,:]
fake_wave_single = fake_wave[n,:]
Real_T60_val = t60_impulse(real_wave_single,fs)
Fake_T60_val = t60_impulse(fake_wave_single,fs)
T60_diff = abs(Real_T60_val-Fake_T60_val)
return T60_diff
if __name__ == '__main__':
t60_impulse('/home/anton/Desktop/gamma101/data/evaluation_all/SF1/Hotel_SkalskyDvur_ConferenceRoom2-MicID01-SpkID01_20170906_S-09-RIR-IR_sweep_15s_45Hzto22kHz_FS16kHz.v00.wav')