package audio import ( "bytes" "io" "math" "testing" ) // TestSincResamplerUpsampling 测试上采样 16000Hz → 44100Hz func TestSincResamplerUpsampling(t *testing.T) { // VeryHigh 质量 FIR 延迟约 969 输入样本,数据量需远超延迟 inputSamples := make([]int16, 8000) for i := range inputSamples { inputSamples[i] = int16(math.Sin(2*math.Pi*440.0*float64(i)/16000.0) * 8000) } inputData := encodeInt16LE(inputSamples) r := newSincResampler(inputData, 16000, 44100, 2).(*sincResampler) outputSamples := readAllSamples(t, r) expectedSamples := int(float64(len(inputSamples)) * 44100.0 / 16000.0) t.Logf("输入: %d 样本 @ 16000Hz", len(inputSamples)) t.Logf("输出: %d 样本 @ 44100Hz (期望 ~%d)", outputSamples, expectedSamples) if outputSamples == 0 { t.Fatal("没有输出数据") } // 上采样:输出应多于输入 if outputSamples <= len(inputSamples) { t.Errorf("上采样失败:输出(%d) 应多于输入(%d)", outputSamples, len(inputSamples)) } assertWithinTolerance(t, outputSamples, expectedSamples, 0.15) } // TestSincResamplerPassthrough 测试采样率相同时直接透传 func TestSincResamplerPassthrough(t *testing.T) { inputSamples := []int16{100, 200, 300, 400, 500, 600} inputData := encodeInt16LE(inputSamples) r := newSincResampler(inputData, 16000, 16000, 2) if _, ok := r.(*bytes.Buffer); !ok { t.Error("采样率相同时应该直接透传原始 reader") } } // TestSincResamplerDownsampling 测试下采样 44100Hz → 16000Hz func TestSincResamplerDownsampling(t *testing.T) { inputSamples := make([]int16, 8000) for i := range inputSamples { inputSamples[i] = int16(math.Sin(2*math.Pi*440.0*float64(i)/44100.0) * 8000) } inputData := encodeInt16LE(inputSamples) r := newSincResampler(inputData, 44100, 16000, 2).(*sincResampler) outputSamples := readAllSamples(t, r) expectedSamples := int(float64(len(inputSamples)) * 16000.0 / 44100.0) t.Logf("输入: %d 样本 @ 44100Hz", len(inputSamples)) t.Logf("输出: %d 样本 @ 16000Hz (期望 ~%d)", outputSamples, expectedSamples) if outputSamples == 0 { t.Fatal("没有输出数据") } // 下采样:输出应少于输入 if outputSamples >= len(inputSamples) { t.Errorf("下采样失败:输出(%d) 应少于输入(%d)", outputSamples, len(inputSamples)) } assertWithinTolerance(t, outputSamples, expectedSamples, 0.15) } // TestSincResamplerFlush 测试小数据量时 Flush 获取尾部残留 func TestSincResamplerFlush(t *testing.T) { // 小数据集:输入少于 FIR 延迟,输出主要来自 Flush inputSamples := make([]int16, 500) for i := range inputSamples { inputSamples[i] = int16(i * 100) } inputData := encodeInt16LE(inputSamples) r := newSincResampler(inputData, 16000, 44100, 2).(*sincResampler) outputSamples := readAllSamples(t, r) t.Logf("小数据输入: %d 样本, 输出: %d 样本 (来自 Flush)", len(inputSamples), outputSamples) // 即使输入小于延迟,Flush 也应产出数据 if outputSamples == 0 { t.Fatal("Flush 未产生任何数据") } } // TestSincResamplerShortBuffer 测试 io.Reader 边界行为 func TestSincResamplerShortBuffer(t *testing.T) { inputSamples := make([]int16, 2000) for i := range inputSamples { inputSamples[i] = int16(i) } inputData := encodeInt16LE(inputSamples) r := newSincResampler(inputData, 16000, 44100, 2).(*sincResampler) // 1 字节 buffer → ErrShortBuffer _, err := r.Read(make([]byte, 1)) if err != io.ErrShortBuffer { t.Errorf("期望 io.ErrShortBuffer,得到: %v", err) } // 2 字节 buffer → 正常工作 buf := make([]byte, 2) n, err := r.Read(buf) if n != 2 || err != nil { t.Errorf("2 字节 buffer 应正常读取: n=%d, err=%v", n, err) } } // TestSincResamplerStreaming 测试流式多次 Read 的正确性 func TestSincResamplerStreaming(t *testing.T) { inputSamples := make([]int16, 10000) for i := range inputSamples { inputSamples[i] = int16(math.Sin(2*math.Pi*440.0*float64(i)/16000.0) * 8000) } inputData := encodeInt16LE(inputSamples) r := newSincResampler(inputData, 16000, 44100, 2).(*sincResampler) // 小 buffer 模拟流式读取 buf := make([]byte, 128) totalSamples := 0 readCount := 0 for { n, err := r.Read(buf) if n > 0 { totalSamples += n / 2 readCount++ } if err == io.EOF { break } if err != nil { t.Fatalf("读取失败: %v", err) } } expectedSamples := int(float64(len(inputSamples)) * 44100.0 / 16000.0) t.Logf("流式读取: %d 次, 共 %d 样本 (期望 ~%d)", readCount, totalSamples, expectedSamples) if readCount < 50 { t.Errorf("流式读取次数过少: %d", readCount) } assertWithinTolerance(t, totalSamples, expectedSamples, 0.15) } // TestSincResamplerSineWave 测试已知正弦波信号的重采样 func TestSincResamplerSineWave(t *testing.T) { const freq = 440.0 const inRate = 16000 inputSamples := make([]int16, inRate/4) // 0.25 秒 for i := range inputSamples { inputSamples[i] = int16(math.Sin(2*math.Pi*freq*float64(i)/float64(inRate)) * 16000) } inputData := encodeInt16LE(inputSamples) r := newSincResampler(inputData, inRate, 44100, 2).(*sincResampler) output := readAllSamples(t, r) expected := int(float64(len(inputSamples)) * 44100.0 / float64(inRate)) t.Logf("440Hz 正弦波: %d → %d 样本 (期望 ~%d)", len(inputSamples), output, expected) if output == 0 { t.Fatal("正弦波重采样无输出") } assertWithinTolerance(t, output, expected, 0.15) } // --- 辅助函数 --- func encodeInt16LE(samples []int16) *bytes.Buffer { buf := bytes.NewBuffer(nil) for _, s := range samples { buf.Write([]byte{byte(s), byte(s >> 8)}) } return buf } func readAllSamples(t *testing.T, r io.Reader) int { t.Helper() outputData := bytes.NewBuffer(nil) buf := make([]byte, 4096) for { n, err := r.Read(buf) if n > 0 { outputData.Write(buf[:n]) } if err == io.EOF { break } if err != nil { t.Fatalf("读取失败: %v", err) } } return outputData.Len() / 2 } func assertWithinTolerance(t *testing.T, actual, expected int, tolerance float64) { t.Helper() delta := math.Abs(float64(actual - expected)) maxDelta := float64(expected) * tolerance if delta > maxDelta && delta > 10 { t.Errorf("超出容忍度: 实际 %d, 期望 %d (差: %.0f, 上限: %.0f)", actual, expected, delta, maxDelta) } }