为了账号安全,请及时绑定邮箱和手机立即绑定

如何使用 Python 接收“icecast”互联网广播流以立即播放?

如何使用 Python 接收“icecast”互联网广播流以立即播放?

开心每一天1111 2022-04-27 16:11:16
我想要一个互联网音频/广播流(特别是Longplayer,点击直接流 URL)并用 python 播放它。最好是后台运行,以便脚本能够继续运行其主循环。(例如作为游戏背景音乐或其他东西,虽然 Pyglet、PyGame 等可能为此提供自己的工具。)我已经看到了一些可能已过时的示例,这些示例使用录制互联网广播requests并将其转储到文件中,但这并不是我想要的,而且答案的评论似乎有关于requests存在问题的争论?(见这里)我愿意使用任何你可以使用的包pip,只要它适用于 Python 3.X。(目前使用 3.6 纯粹是因为我还没有努力安装 3.7)重申一下,我不想保存流,只需立即播放(或者在需要时使用缓冲?)回给用户。这最好是不阻塞脚本,我想这需要多线程/多处理,但这对于播放来说是次要的。)
查看完整描述

2 回答

?
吃鸡游戏

TA贡献1829条经验 获得超7个赞

正如这些看似简单的问题似乎总是如此,魔鬼在细节中。我最终写了一些代码来解决这个问题。pip 依赖项可以使用python3 -m pip install ffmpeg-python PyOpenAL. 代码的工作流程可以分为两步:

  1. 代码必须从在线流中下载 mp3 文件数据的二进制块,并将它们转换为原始 PCM 数据(基本上是签名的 uint16_t 幅度值)以进行播放。这是使用ffmpeg-python 库完成的,它是FFmpeg的包装器。此包装器在单独的进程中运行 FFmpeg,因此此处不会发生阻塞。

  2. 然后代码必须将这些块排队以进行播放。这是使用PyOpenAL完成的,它是OpenAL的包装器。在创建设备和上下文以启用音频播放后,将创建一个 3d 定位的源。这个源不断地与缓冲区(模拟“环形缓冲区”)一起排队,这些缓冲区填充了从 FFmpeg 输入的数据。这与第一步在单独的线程上运行,使下载新音频块独立于音频块播放运行。

这是该代码的样子(带有一些注释)。如果您对代码或此答案的任何其他部分有任何疑问,请告诉我。

import ctypes

import ffmpeg

import numpy as np

from openal.al import *

from openal.alc import *

from queue import Queue, Empty

from threading import Thread

import time

from urllib.request import urlopen


def init_audio():

    #Create an OpenAL device and context.

    device_name = alcGetString(None, ALC_DEFAULT_DEVICE_SPECIFIER)

    device = alcOpenDevice(device_name)

    context = alcCreateContext(device, None)

    alcMakeContextCurrent(context)

    return (device, context)


def create_audio_source():

    #Create an OpenAL source.

    source = ctypes.c_uint()

    alGenSources(1, ctypes.pointer(source))

    return source


def create_audio_buffers(num_buffers):

    #Create a ctypes array of OpenAL buffers.

    buffers = (ctypes.c_uint * num_buffers)()

    buffers_ptr = ctypes.cast(

        ctypes.pointer(buffers), 

        ctypes.POINTER(ctypes.c_uint),

    )

    alGenBuffers(num_buffers, buffers_ptr)

    return buffers_ptr


def fill_audio_buffer(buffer_id, chunk):

    #Fill an OpenAL buffer with a chunk of PCM data.

    alBufferData(buffer_id, AL_FORMAT_STEREO16, chunk, len(chunk), 44100)


def get_audio_chunk(process, chunk_size):

    #Fetch a chunk of PCM data from the FFMPEG process.

    return process.stdout.read(chunk_size)


def play_audio(process):

    #Queues up PCM chunks for playing through OpenAL

    num_buffers = 4

    chunk_size = 8192

    device, context = init_audio()

    source = create_audio_source()

    buffers = create_audio_buffers(num_buffers)


    #Initialize the OpenAL buffers with some chunks

    for i in range(num_buffers):

        buffer_id = ctypes.c_uint(buffers[i])

        chunk = get_audio_chunk(process, chunk_size)

        fill_audio_buffer(buffer_id, chunk)


    #Queue the OpenAL buffers into the OpenAL source and start playing sound!

    alSourceQueueBuffers(source, num_buffers, buffers)

    alSourcePlay(source)

    num_used_buffers = ctypes.pointer(ctypes.c_int())


    while True:

        #Check if any buffers are used up/processed and refill them with data.

        alGetSourcei(source, AL_BUFFERS_PROCESSED, num_used_buffers)

        if num_used_buffers.contents.value != 0:

            used_buffer_id = ctypes.c_uint()

            used_buffer_ptr = ctypes.pointer(used_buffer_id)

            alSourceUnqueueBuffers(source, 1, used_buffer_ptr)

            chunk = get_audio_chunk(process, chunk_size)

            fill_audio_buffer(used_buffer_id, chunk)

            alSourceQueueBuffers(source, 1, used_buffer_ptr)


if __name__ == "__main__":    

    url = "http://icecast.spc.org:8000/longplayer"


    #Run FFMPEG in a separate process using subprocess, so it is non-blocking

    process = (

        ffmpeg

        .input(url)

        .output("pipe:", format='s16le', acodec='pcm_s16le', ac=2, ar=44100, loglevel="quiet")

        .run_async(pipe_stdout=True)

    )


    #Run audio playing OpenAL code in a separate thread

    thread = Thread(target=play_audio, args=(process,), daemon=True)

    thread.start()


    #Some example code to show that this is not being blocked by the audio.

    start = time.time()

    while True:

        print(time.time() - start)


查看完整回答
反对 回复 2022-04-27
?
慕姐8265434

TA贡献1813条经验 获得超2个赞

使用pyminiaudio : (它提供了一个 icecast 流源类):


import miniaudio


def title_printer(client: miniaudio.IceCastClient, new_title: str) -> None:

    print("Stream title: ", new_title)


with miniaudio.IceCastClient("http://icecast.spc.org:8000/longplayer",

        update_stream_title=title_printer) as source:

    print("Connected to internet stream, audio format:", source.audio_format.name)

    print("Station name: ", source.station_name)

    print("Station genre: ", source.station_genre)

    print("Press <enter> to quit playing.\n")

    stream = miniaudio.stream_any(source, source.audio_format)

    with miniaudio.PlaybackDevice() as device:

        device.start(stream)

        input()   # wait for user input, stream plays in background


查看完整回答
反对 回复 2022-04-27
  • 2 回答
  • 0 关注
  • 152 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信