Python 利用 FFmpeg 修改 MP3 音质
一键安装 FFmpeg(Windows 批处理脚本,需以管理员身份运行)
@echo off
REM One-click FFmpeg installer for Windows.
REM Downloads the FFmpeg zip, extracts it to C:\ffmpeg and appends the bin
REM directory to the machine-level PATH.
REM Writing the machine-level PATH requires an elevated (Administrator) prompt.
setlocal

REM Replace with the actual FFmpeg download link.
set "ffmpeg_url=http://link.zhuazii.cn/down.php/75582a2b8500965fd2b2bd6aa0fc7ee0.zip"
set "ffmpeg_path=C:\ffmpeg\bin"
set "ffmpeg_zip=C:\ffmpeg.zip"

REM Remove any archive left over from an earlier run so the existence check
REM below reflects the result of this download and not a stale file.
if exist "%ffmpeg_zip%" del /f /q "%ffmpeg_zip%"

REM Download FFmpeg
echo Downloading FFmpeg...
powershell -NoProfile -Command "Invoke-WebRequest -Uri '%ffmpeg_url%' -OutFile '%ffmpeg_zip%'"

REM Check that the FFmpeg zip file exists
if not exist "%ffmpeg_zip%" (
    echo Failed to download FFmpeg.
    pause
    exit /b 1
)

REM Extract FFmpeg. -Force lets a re-run overwrite files from a previous extraction.
echo Extracting FFmpeg...
powershell -NoProfile -Command "Expand-Archive -Path '%ffmpeg_zip%' -DestinationPath 'C:\ffmpeg' -Force"

REM Check that the extracted FFmpeg bin directory exists
if not exist "%ffmpeg_path%" (
    echo Failed to extract FFmpeg.
    pause
    exit /b 1
)

REM Check whether the FFmpeg bin directory is already on PATH
echo %PATH% | findstr /i /l /c:"%ffmpeg_path%" >nul
if errorlevel 1 (
    REM Not present: append it to the machine-level PATH.
    REM The current value is read from the Machine scope rather than from the
    REM process PATH, so user-level entries are not copied into the system
    REM PATH and the 1024-character truncation of setx is avoided.
    powershell -NoProfile -Command "try { $machinePath = [Environment]::GetEnvironmentVariable('Path', 'Machine'); [Environment]::SetEnvironmentVariable('Path', $machinePath.TrimEnd(';') + ';%ffmpeg_path%', 'Machine') } catch { exit 1 }"
    if errorlevel 1 (
        echo Failed to update system PATH. Run this script as Administrator.
    ) else (
        echo FFmpeg bin directory added to system PATH.
    )
) else (
    echo FFmpeg bin directory is already in system PATH.
)
pause
Python 源代码
import os
import threading
from tkinter import Tk, Label, Entry, Button, filedialog, StringVar, scrolledtext, messagebox, IntVar
from tkinter.ttk import Checkbutton
from pydub import AudioSegment
from queue import Queue, Empty
from concurrent.futures import ThreadPoolExecutor, as_completed
# Queue that stores log messages: producers call log_queue.put(...),
# and log_to_gui() takes them off the queue and writes them to the log widget.
log_queue = Queue()
def process_file(file_path, output_folder, bitrate):
    """Re-encode a single MP3 file at the requested bitrate.

    The file is loaded with ``AudioSegment.from_mp3`` and exported as MP3
    into ``output_folder`` under the same base name as the source file.

    Args:
        file_path: Path of the source MP3 file.
        output_folder: Directory that receives the re-encoded file.
        bitrate: Bitrate string handed to ``export`` (e.g. ``'192k'``).

    Returns:
        A log line: ``'已处理 <name>'`` on success, or
        ``'错误: <name>: <reason>'`` when loading or exporting raises.
    """
    file_name = os.path.basename(file_path)
    try:
        audio = AudioSegment.from_mp3(file_path)
        output_path = os.path.join(output_folder, file_name)
        audio.export(output_path, format='mp3', bitrate=bitrate)
        return f'已处理 {file_name}'
    except Exception as e:
        # Name the failing file: results may be logged out of order (thread
        # pool), so a bare reason could not be attributed to any input.
        return f'错误: {file_name}: {e}'
def log_to_gui(text_log):
    """Forever move messages from ``log_queue`` into the log text widget.

    Each message is appended on its own line and the widget is scrolled to
    the end. The widget is switched to ``'normal'`` only while writing and
    set back to ``'disabled'`` afterwards. This function never returns.

    Args:
        text_log: Text widget offering ``configure``, ``insert`` and ``see``.
    """
    while True:
        try:
            # Wait at most one second for the next message, then poll again.
            line = log_queue.get(timeout=1)
        except Empty:
            continue
        else:
            text_log.configure(state='normal')
            text_log.insert('end', line + '\n')
            text_log.see('end')
            text_log.configure(state='disabled')
def process_files_thread(input_folder, output_folder, bitrate, use_multithreading):
    """Convert every ``.mp3`` file in ``input_folder`` and log the results.

    Each result string returned by ``process_file`` is put on ``log_queue``,
    followed by a final ``'处理完成。'`` message.

    Args:
        input_folder: Directory scanned (non-recursively) for files whose
            name ends with ``.mp3``, case-insensitively.
        output_folder: Directory passed through to ``process_file``.
        bitrate: Bitrate string passed through to ``process_file``.
        use_multithreading: Truthy to convert with a pool of four worker
            threads (results are logged in completion order); falsy to
            convert the files one after another.
    """
    try:
        mp3_names = [f for f in os.listdir(input_folder) if f.lower().endswith('.mp3')]
    except OSError as e:
        # Without this the worker would die silently and the GUI log would
        # show neither an error nor the completion message.
        log_queue.put(f'错误: 无法读取输入目录: {e}')
        return

    mp3_paths = [os.path.join(input_folder, name) for name in mp3_names]

    if use_multithreading:
        with ThreadPoolExecutor(max_workers=4) as executor:
            futures = [
                executor.submit(process_file, path, output_folder, bitrate)
                for path in mp3_paths
            ]
            for future in as_completed(futures):
                log_queue.put(future.result())
    else:
        for path in mp3_paths:
            log_queue.put(process_file(path, output_folder, bitrate))

    log_queue.put('处理完成。')
def process_files():
    """Validate the form fields and start the conversion in a background thread.

    Reads the input folder, output folder, bitrate and multithreading flag
    from the module-level widgets/variables. On a validation or directory
    creation failure an error line is put on ``log_queue`` and nothing is
    started. Otherwise the output folder is created if needed and
    ``process_files_thread`` is launched in a daemon thread.
    """
    # Strip surrounding whitespace so blank-looking fields are rejected and a
    # value such as ' 192 ' does not turn into an invalid ' 192 k'.
    input_folder = entry_input.get().strip()
    output_folder = entry_output.get().strip()
    bitrate = bitrate_var.get().strip()
    use_multithreading = var_multithreading.get()

    if not input_folder or not output_folder or not bitrate:
        # No trailing newline: log_to_gui appends one to every message.
        log_queue.put('错误:请填写所有字段。')
        return

    # Normalise the unit suffix so '192', '192k' and '192K' all become '192k'
    # (previously '192K' was turned into '192Kk').
    bitrate = bitrate.rstrip('kK') + 'k'

    try:
        os.makedirs(output_folder, exist_ok=True)
    except OSError as e:
        # Report in the GUI log instead of raising inside the Tk callback.
        log_queue.put(f'错误: 无法创建输出目录: {e}')
        return

    if use_multithreading:
        messagebox.showinfo("提示", "勾选了多线程处理,无法同步更新输出日志。")

    threading.Thread(
        target=process_files_thread,
        args=(input_folder, output_folder, bitrate, use_multithreading),
        daemon=True,
    ).start()
def browse_input():
    """Let the user pick the input folder and show it in ``entry_input``.

    If the dialog is cancelled (``askdirectory`` returns an empty value) the
    current entry text is left untouched instead of being erased.
    """
    folder_selected = filedialog.askdirectory()
    if not folder_selected:
        return
    entry_input.delete(0, 'end')
    entry_input.insert(0, folder_selected)
def browse_output():
    """Let the user pick the output folder and show it in ``entry_output``.

    If the dialog is cancelled (``askdirectory`` returns an empty value) the
    current entry text is left untouched instead of being erased.
    """
    folder_selected = filedialog.askdirectory()
    if not folder_selected:
        return
    entry_output.delete(0, 'end')
    entry_output.insert(0, folder_selected)
# ---- Main window ----
root = Tk()
root.title('MP3音质转换器')

# Captions for the three form rows (column 0).
for _row, _caption in enumerate(('输入目录:', '输出目录:', '比特率(例如:192k):')):
    Label(root, text=_caption).grid(row=_row, column=0)

# Folder entries (column 1, rows 0 and 1); read by process_files and filled
# in by the browse_* callbacks.
entry_input = Entry(root, width=50)
entry_input.grid(row=0, column=1)
entry_output = Entry(root, width=50)
entry_output.grid(row=1, column=1)

# Bitrate entry (row 2), backed by a StringVar that process_files reads.
bitrate_var = StringVar()
_bitrate_entry = Entry(root, textvariable=bitrate_var, width=20)
_bitrate_entry.grid(row=2, column=1)

# Checkbox selecting the four-worker thread pool.
var_multithreading = IntVar()
_pool_toggle = Checkbutton(root, text='四线程处理', variable=var_multithreading)
_pool_toggle.grid(row=3, column=1)

# Action buttons (column 2), one per form row.
_button_specs = (
    ('浏览输入', browse_input),
    ('浏览输出', browse_output),
    ('开始', process_files),
)
for _row, (_caption, _handler) in enumerate(_button_specs):
    Button(root, text=_caption, command=_handler).grid(row=_row, column=2)

# Read-only log area spanning all three columns.
text_log = scrolledtext.ScrolledText(root, width=60, height=10, state='disabled')
text_log.grid(row=4, column=0, columnspan=3, pady=10)

# Daemon thread that copies queued log messages into the log area.
_log_thread = threading.Thread(target=lambda: log_to_gui(text_log), daemon=True)
_log_thread.start()

root.mainloop()