使用 python 与 Adapter Eim 插件实时绘制音乐频谱

使用 python 与 pyaudio 包实时分析音频数据,通过 adapter eim 将数据发送给 Scratch,Scratch 收到数据后快速画图呈现。

Scratch 代码在此

import pyaudio
import numpy as np
import time 
from codelab_adapter_client import AdapterNode

class MyNode(AdapterNode):
    NODE_ID = "eim/list_data"

    def __init__(self):
        super().__init__()
    
    def send_data(self, content):
        message = self.message_template()
        message["payload"]["content"] = content
        self.publish(message)

node = MyNode()
node.receive_loop_as_thread()
time.sleep(0.1)

chunk = 1024           # 每次以 1024 个 sample 为 1 组(chunk)
num_of_bands = 10          # downsample 频率范围,合并分成 10 组呈现结果          
fs = 44100                 # 采样率

# 使用 pyaudio 从音频输入设备读取数据,每个 sample 以 16 bit 存储,单通道
pa = pyaudio.PyAudio()



def freq_index():
    end_index = []
    i = num_of_bands
    while i >0:
        end_index.append(int(chunk/2**i))
        i = i - 1

    start_index = end_index[:-1]
    start_index.insert(0,0)  
    
    return start_index, end_index        

start_index, end_index = freq_index()       


def callback(in_data, frame_count, time_info, status):
    data = np.fromstring(in_data, dtype=np.int16)    
    data_n = np.array(data)/(2**15)
    data_w = data_n*np.hanning(len(data_n))
    fft = np.abs(np.fft.fft(data_w))[0:int(chunk/2)]/chunk
    fft[1:]=2*fft[1:]


    out_li = [i for i in range(num_of_bands)]
    for i in np.arange(num_of_bands):
        fft_band = np.mean(fft[start_index[i]:end_index[i]])*10000       
        out_li[i]=float(fft_band)
        i=i+1
    
    
    node.send_data(out_li)
    return(None, pyaudio.paContinue)

stream = pa.open(format=pyaudio.paInt16, channels=1, rate=fs, input=True, frames_per_buffer=int(chunk), stream_callback=callback)

stream.start_stream()

while stream.is_active():
    time.sleep(1/fs*chunk)
    
stream.stop_stream()
stream.close()
p.terminate()




2赞

相比正文的脚本,以下版本有两处改动:

  • Python 代码会直接通过 EIM 在新的浏览器窗口中自动打开对应的 Scratch 项目并运行,只需手动全屏即可(保证 Adapter 已运行)

  • 使用 try 与 except 结构完成 cleanup,关掉 pyaudio 的 stream

import pyaudio
import numpy as np
import webbrowser
import time 
from codelab_adapter_client import AdapterNode


# EIM 初始化
class MyNode(AdapterNode):
    NODE_ID = "eim/bands"

    def __init__(self):
        super().__init__()
        self.is_ready = False
    
    def send_data(self, content):
        message = self.message_template()
        message["payload"]["content"] = content
        self.publish(message)
        
    def extension_message_handle(self, topic, payload):
        self.logger.info(f'the message payload from scratch: {payload}')
        content = payload["content"]
        if content == "ready":
            self.is_ready = True    
            
node = MyNode()
node.receive_loop_as_thread()
time.sleep(0.1)



# 在新的浏览器窗口自动打开对应 Scratch 项目,使用 EIM 发送消息使 Scratch 项目待命,同时等待项目加载成功后的返回消息
webbrowser.open('https://create.codelab.club/projects/9943/editor/', new=1)

print("Waiting for Scratch.")

while not node.is_ready:  
    node.send_data("go")
    time.sleep(0.5)

print("scratch is ready")
        
        

# pyaudio stream 配置参数       
chunk = 1024                  # 每次以 1024 个 sample 为 1 组(chunk)从音频流中读取数据
num_of_bands = 10             # downsample 频率范围,合并分成 10 组呈现结果          
fs = 44100                    # 采样率
num_channels = 1              # 单声道
pa_format = pyaudio.paInt16   # quantization 2**16



# 生成 10 个频段的起末索引值,合并求频段 fft 均值用
# 以 22050Hz(采样率的一半)算起,按照相继低一个八度(低八度音符的频率是比它高一个八度音符频率的一半)的规律取 bin 值,进而确定频段区间:22050 Hz,11025 Hz,5512 Hz,2756 Hz
# 1378 Hz,689 Hz,344 Hz,172 Hz,86 Hz,43 Hz,22 Hz,11 Hz,0 Hz
def freq_index():
    end_index = []
    i = num_of_bands
    while i >0:
        end_index.append(int(chunk/2**i))
        i = i - 1

    start_index = end_index[:-1]
    start_index.insert(0,0)  
    
    return start_index, end_index        

start_index, end_index = freq_index() 

pa = pyaudio.PyAudio()



# pyaudio 对音频流数据的读/写有两种方式,callback 和 block,这里使用 callback
try:   
    def callback(in_data, frame_count, time_info, status):
        data = np.fromstring(in_data, dtype=np.int16)               # 数据读取
        data_n = np.array(data)/(2**15)                             # 数据标准化,因为格式定义的是 pyaudio.paInt16,考虑数据的正负值,除以 2**15
        data_w = data_n*np.hanning(len(data_n))                     # hanning window
        fft = np.abs(np.fft.fft(data_w))[0:int(chunk/2)]/chunk      # fft,并对结果做 scaling
        fft[1:]=2*fft[1:]                                           # 
    
        out_li = [i for i in range(num_of_bands)]
        for i in np.arange(num_of_bands):
            fft_band = np.mean(fft[start_index[i]:end_index[i]])*10000       
            out_li[i]=float(fft_band)
            i=i+1   
        print(out_li)
        node.send_data(out_li)
        return(None, pyaudio.paContinue)

    stream = pa.open(format=pa_format, channels=num_channels, rate=fs, input=True, frames_per_buffer=int(chunk), stream_callback=callback)
    stream.start_stream()

    while stream.is_active():
        time.sleep(1/fs*chunk)

        

except KeyboardInterrupt:
    stream.stop_stream()
    stream.close()
    pa.terminate()
    print('interrupt by user') 

2赞