python mp4转m3u8 低成本网站视频托管

通过 rclone 工具将视频上传到 Cloudflare R2 存储桶(赞美 Cloudflare)

利用 Cloudflare 提供的免费出站带宽成本对比:

我们的方案:300GB 视频 15TB 流量每月仅需 2.18 美元

传统视频托管服务:同等流量通常需要数千美元 技术细节:- 由于大多数浏览器不直接支持 HLS 流播放- 使用 MuxHQ 的 hls-video-element 组件来处理 HLS 播放

网站即可低成本播放视频内容

#!/usr/bin/env python3
import logging
import os
import shutil
import subprocess
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass
from fractions import Fraction
from pathlib import Path

# Configure logging
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)


@dataclass
class VideoVariant:
    resolution: str
    bitrate: str
    output_name: str
    profile: str
    level: str
    width: int
    height: int

    @property
    def bandwidth(self) -> int:
        """Calculate total bandwidth including audio"""
        video_bitrate = int(self.bitrate.rstrip("k")) * 1000
        audio_bitrate = 128000  # 128k audio
        return video_bitrate + audio_bitrate


class FFmpegNotFoundError(Exception):
    """Raised when FFmpeg or FFprobe are not found"""

    pass


class VideoProcessor:
    def __init__(self, input_file: Path, output_dir: Path | None = None):
        self.input_file = Path(input_file)
        self.output_dir = output_dir or Path.cwd()
        self.basename = self.input_file.stem
        self.output_path = self.output_dir / self.basename

        # Define video variants (resolution, bitrate, name, profile, level)
        self.variants = [
            VideoVariant("1280x720", "1200k", "720p", "main", "3.1", 1280, 720),
            VideoVariant("1920x1080", "2500k", "1080p", "high", "4.2", 1920, 1080),
            VideoVariant("3840x2160", "8000k", "2160p", "high", "5.1", 3840, 2160),
        ]

    @staticmethod
    def check_dependencies() -> None:
        """Verify FFmpeg and FFprobe are installed"""
        for cmd in ["ffmpeg", "ffprobe"]:
            if not shutil.which(cmd):
                raise FFmpegNotFoundError(f"{cmd} is not installed or not in PATH")

    def validate_input(self) -> None:
        """Validate input file exists and is accessible"""
        if not self.input_file.is_file():
            raise FileNotFoundError(f"Input file '{self.input_file}' does not exist")

    def setup_output_directory(self) -> None:
        """Setup output directory, removing if it exists"""
        if self.output_path.exists():
            logger.warning(f"Removing existing output directory: {self.output_path}")
            shutil.rmtree(self.output_path)
        self.output_path.mkdir(parents=True, exist_ok=True)

    def get_frame_rate(self) -> int:
        """Get frame rate from input video"""
        cmd = [
            "ffprobe",
            "-v",
            "0",
            "-of",
            "default=noprint_wrappers=1:nokey=1",
            "-select_streams",
            "v:0",
            "-show_entries",
            "stream=avg_frame_rate",
            str(self.input_file),
        ]

        try:
            output = subprocess.check_output(cmd, stderr=subprocess.PIPE).decode().strip()
            num, denom = map(int, output.split("/"))
            return int(Fraction(num, denom))
        except (subprocess.CalledProcessError, ValueError) as e:
            logger.error(f"Failed to get frame rate: {e}")
            return 30  # Default to 30 fps if detection fails

    def process_variant(self, variant: VideoVariant, gop_size: int) -> bool:
        """Process a single video variant"""
        output_playlist = self.output_path / f"{variant.output_name}.m3u8"
        segment_pattern = self.output_path / f"{variant.output_name}_%03d.ts"

        cmd = [
            "ffmpeg",
            "-y",
            "-i",
            str(self.input_file),
            "-c:v",
            "libx264",
            "-preset",
            "ultrafast",
            "-profile:v",
            variant.profile,
            "-level:v",
            variant.level,
            "-b:v",
            variant.bitrate,
            "-threads",
            "0",
            "-s",
            variant.resolution,
            "-c:a",
            "aac",
            "-b:a",
            "128k",
            "-ac",
            "2",
            "-g",
            str(gop_size),
            "-keyint_min",
            str(gop_size),
            "-sc_threshold",
            "0",
            "-force_key_frames",
            "expr:gte(t,n_forced*4)",
            "-hls_time",
            "4",
            "-hls_list_size",
            "0",
            "-hls_flags",
            "independent_segments",
            "-hls_segment_filename",
            str(segment_pattern),
            str(output_playlist),
        ]

        try:
            subprocess.run(cmd, check=True, capture_output=True)
            logger.info(f"Successfully processed {variant.output_name}")
            return True
        except subprocess.CalledProcessError as e:
            logger.error(f"Failed to process {variant.output_name}: {e.stderr.decode()}")
            return False

    def generate_master_playlist(self) -> None:
        """Generate the master HLS playlist"""
        master_playlist_path = self.output_path / "playlist.m3u8"

        with open(master_playlist_path, "w") as f:
            f.write("#EXTM3U\n")
            f.write("#EXT-X-VERSION:3\n")

            for variant in self.variants:
                f.write("\n")
                f.write(f"#EXT-X-STREAM-INF:BANDWIDTH={variant.bandwidth}," f"RESOLUTION={variant.resolution}\n")
                f.write(f"{variant.output_name}.m3u8\n")

        logger.info(f"Generated master playlist: {master_playlist_path}")

    def process(self) -> None:
        """Main processing function"""
        try:
            # Initial setup and validation
            self.check_dependencies()
            self.validate_input()
            self.setup_output_directory()

            # Get video parameters
            frame_rate = self.get_frame_rate()
            gop_size = frame_rate * 4  # 4-second GOP

            # Process variants in parallel
            max_workers = int(max((os.cpu_count() or 1) / 2, 1))
            with ThreadPoolExecutor(max_workers=max_workers) as executor:
                futures = [executor.submit(self.process_variant, variant, gop_size) for variant in self.variants]

                # Wait for all processes to complete
                if not all(future.result() for future in futures):
                    raise RuntimeError("One or more variants failed to process")

            # Generate master playlist
            self.generate_master_playlist()

            logger.info(f"Processing completed successfully. Output directory: {self.output_path}")

        except Exception as e:
            logger.error(f"Processing failed: {str(e)}")
            raise

作者:spike

分类: Python

创作时间:2025-01-25

更新时间:2025-01-28

联系方式放在中括号之中例如[[email protected]],回复评论在开头加上标号例如:#1