Hardware-decoding RTSP streams with NVIDIA's VPF library in Python and displaying them with OpenCV
A multi-threaded application for decoding and processing video streams
As video processing technology keeps advancing, more and more applications need to decode and process video streams. In this article we present a multi-threaded Python application that decodes and processes several RTSP video streams at once, using GPU acceleration to speed up the work.
The application relies on a few key Python libraries and tools, including PyNvCodec (the Python bindings of NVIDIA's VideoProcessingFramework, VPF), OpenCV, and PyCUDA. By offloading decoding to the GPU, it makes full use of the compute power of modern graphics cards and achieves efficient video decoding and processing.
Multi-threaded decoding
In this application we use Python's concurrent.futures library for multi-threaded decoding. Each video stream is decoded in its own thread, so several streams can be handled at the same time without the threads getting in each other's way: most of their time is spent waiting on the FFmpeg pipe and on the GPU decoder rather than executing Python code.
from concurrent.futures import ThreadPoolExecutor

# ...

# Create a thread pool with one worker per stream
pool = ThreadPoolExecutor(max_workers=len(urls))
futures = []

# Submit one decode task per video stream
index = 0
for url in urls:
    future = pool.submit(decode_rtsp_stream, index, url, gpuID)
    futures.append(future)
    index += 1

# Wait for all tasks to finish
pool.shutdown()

# Fetch the result of each task (this re-raises any exception from the thread)
for future in futures:
    future.result()
Video decoding and processing
Video decoding is the core function of this application. We use the PyNvCodec library for decoding, so the heavy lifting runs on the GPU's hardware decoder.
def decode_rtsp_stream(thread_index: int, url: str, gpu_id: int):
    # Probe the stream parameters (resolution, codec, pixel format, ...)
    params = get_stream_params(url)
    # ...
    # Create the NvDecoder instance
    nvdec = nvc.PyNvDecoder(w, h, f, c, g)
    # ...
    while True:
        # Read encoded data from the FFmpeg pipe
        bits = proc.stdout.read(read_size)
        # ...
        # Decode a video frame
        surf = nvdec.DecodeSurfaceFromPacket(enc_packet, pkt_data)
        # ...
        # Convert the color space and download the surface to host memory
        cvtSurface = nv_cvt.Execute(surf, cc_ctx)
        success = nv_down.DownloadSingleSurface(cvtSurface, data)
        # ...
        # Display the decoded frame
        cv2.imshow(str(thread_index), new_data)
        cv2.waitKey(1)
        # ...
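One detail the excerpt above glosses over is how much data to read from the FFmpeg pipe on each iteration. In the full listing in the next section, read_size starts at a fixed 4096 bytes; after a frame is decoded, the packet metadata returned by the decoder (pkt_data.bsl, which appears to hold the size of the bitstream consumed for that frame) is used to shrink the read size so that each read roughly matches one encoded packet. The lines below are lifted from the full code, with comments added:

read_size = 4096  # initial guess before any packet has been decoded
# ...
# pkt_data.bsl reports the bitstream size consumed for the decoded frame;
# shrink the read size toward it so each pipe read delivers roughly one packet.
if pkt_data.bsl < read_size:
    read_size = pkt_data.bsl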
Full code
This kind of application is useful in video surveillance, real-time video analytics, video encoding/decoding, and related areas. Thanks to multi-threaded decoding and GPU acceleration, it can handle multiple high-resolution streams and still display and process them smoothly when real-time behavior is required.
import os
import sys
import subprocess
import json
import PyNvCodec as nvc
import numpy as np
from io import BytesIO
from multiprocessing import Process
import uuid
import time
from concurrent.futures import ThreadPoolExecutor
import cv2
import pycuda.gpuarray as gpuarray
# import PytorchNvCodec as pnvc  # only needed by surface_to_tensor() below
import torch
import torchvision.transforms as T


def add_cuda_dll_directories():
    if os.name == "nt":
        cuda_path = os.environ.get("CUDA_PATH")
        if cuda_path:
            os.add_dll_directory(cuda_path)
        else:
            print("CUDA_PATH environment variable is not set.", file=sys.stderr)
            exit(1)

        sys_path = os.environ.get("PATH")
        if sys_path:
            paths = sys_path.split(";")
            for path in paths:
                if os.path.isdir(path) and path != '.':
                    os.add_dll_directory(path)
        else:
            print("PATH environment variable is not set.", file=sys.stderr)
            exit(1)


def surface_to_tensor(surface: nvc.Surface) -> torch.Tensor:
    """
    Converts planar rgb surface to cuda float tensor.
    """
    if surface.Format() != nvc.PixelFormat.RGB_PLANAR:
        raise RuntimeError("Surface shall be of RGB_PLANAR pixel format")

    surf_plane = surface.PlanePtr()
    img_tensor = pnvc.DptrToTensor(
        surf_plane.GpuMem(),
        surf_plane.Width(),
        surf_plane.Height(),
        surf_plane.Pitch(),
        surf_plane.ElemSize(),
    )
    if img_tensor is None:
        raise RuntimeError("Can not export to tensor.")

    img_tensor.resize_(3, int(surf_plane.Height() / 3), surf_plane.Width())
    img_tensor = img_tensor.type(dtype=torch.cuda.FloatTensor)
    img_tensor = torch.divide(img_tensor, 255.0)
    img_tensor = torch.clamp(img_tensor, 0.0, 1.0)

    return img_tensor


def get_stream_params(url: str):
    cmd = [
        "ffprobe",
        "-v",
        "quiet",
        "-print_format",
        "json",
        "-show_format",
        "-show_streams",
        url,
    ]
    proc = subprocess.Popen(cmd, stdout=subprocess.PIPE)
    stdout = proc.communicate()[0]

    bio = BytesIO(stdout)
    json_out = json.load(bio)

    params = {}
    if not "streams" in json_out:
        return {}

    for stream in json_out["streams"]:
        if stream["codec_type"] == "video":
            params["width"] = stream["width"]
            params["height"] = stream["height"]
            params["framerate"] = float(eval(stream["avg_frame_rate"]))

            codec_name = stream["codec_name"]
            is_h264 = True if codec_name == "h264" else False
            is_hevc = True if codec_name == "hevc" else False
            if not is_h264 and not is_hevc:
                raise ValueError(
                    "Unsupported codec: "
                    + codec_name
                    + ". Only H.264 and HEVC are supported in this sample."
                )
            else:
                params["codec"] = (
                    nvc.CudaVideoCodec.H264 if is_h264 else nvc.CudaVideoCodec.HEVC
                )

            pix_fmt = stream["pix_fmt"]
            is_yuv420 = pix_fmt == "yuv420p"
            is_yuv444 = pix_fmt == "yuv444p"

            # YUVJ420P and YUVJ444P are deprecated but still wide spread, so handle
            # them as well. They also indicate JPEG color range.
            is_yuvj420 = pix_fmt == "yuvj420p"
            is_yuvj444 = pix_fmt == "yuvj444p"

            if is_yuvj420:
                is_yuv420 = True
                params["color_range"] = nvc.ColorRange.JPEG
            if is_yuvj444:
                is_yuv444 = True
                params["color_range"] = nvc.ColorRange.JPEG

            if not is_yuv420 and not is_yuv444:
                raise ValueError(
                    "Unsupported pixel format: "
                    + pix_fmt
                    + ". Only YUV420 and YUV444 are supported in this sample."
                )
            else:
                params["format"] = (
                    nvc.PixelFormat.NV12 if is_yuv420 else nvc.PixelFormat.YUV444
                )

            # Color range default option. We may have set when parsing
            # pixel format, so check first.
            if "color_range" not in params:
                params["color_range"] = nvc.ColorRange.MPEG
            # Check actual value.
            if "color_range" in stream:
                color_range = stream["color_range"]
                if color_range == "pc" or color_range == "jpeg":
                    params["color_range"] = nvc.ColorRange.JPEG

            # Color space default option:
            params["color_space"] = nvc.ColorSpace.BT_601
            # Check actual value.
            if "color_space" in stream:
                color_space = stream["color_space"]
                if color_space == "bt709":
                    params["color_space"] = nvc.ColorSpace.BT_709

            return params
    return {}


def decode_rtsp_stream(thread_index: int, url: str, gpu_id: int):
    params = get_stream_params(url)

    if not len(params):
        raise ValueError("Can not get " + url + " streams params")

    w = params["width"]
    h = params["height"]
    f = params["format"]
    c = params["codec"]
    framerate = params["framerate"]
    g = gpu_id

    if nvc.CudaVideoCodec.H264 == c:
        codec_name = "h264"
    elif nvc.CudaVideoCodec.HEVC == c:
        codec_name = "hevc"
    bsf_name = codec_name + "_mp4toannexb,dump_extra=all"

    cmd = [
        "ffmpeg",
        "-hide_banner",
        "-i",
        url,
        "-c:v",
        "copy",
        "-bsf:v",
        bsf_name,
        "-f",
        codec_name,
        "pipe:1",
    ]
    proc = subprocess.Popen(cmd, stdout=subprocess.PIPE)

    nvdec = nvc.PyNvDecoder(w, h, f, c, g)

    read_size = 4096
    rt = 0
    fd = 0

    t0 = time.time()
    print("running stream")

    # nv_cvt = nvc.PySurfaceConverter(
    #     w, h, self.nvYuv.Format(), nvc.PixelFormat.RGB, 0
    # )
    nv_cvt = nvc.PySurfaceConverter(w, h, nvc.PixelFormat.NV12, nvc.PixelFormat.BGR, g)
    cc_ctx = nvc.ColorspaceConversionContext(
        params["color_space"], params["color_range"]
    )
    nv_down = nvc.PySurfaceDownloader(w, h, nv_cvt.Format(), g)
    data = np.zeros((w * h, 3), np.uint8)
    empty_count = 0

    while True:
        t1 = time.time()
        if not read_size:
            read_size = int(rt / fd)
            rt = read_size
            fd = 1

        bits = proc.stdout.read(read_size)
        if not len(bits):
            print("Can't read data from pipe")
            break
        else:
            rt += len(bits)

        enc_packet = np.frombuffer(buffer=bits, dtype=np.uint8)
        pkt_data = nvc.PacketData()
        try:
            surf = nvdec.DecodeSurfaceFromPacket(enc_packet, pkt_data)
            if not surf.Empty():
                fd += 1
                if pkt_data.bsl < read_size:
                    read_size = pkt_data.bsl
                cvtSurface = nv_cvt.Execute(surf, cc_ctx)
                success = nv_down.DownloadSingleSurface(cvtSurface, data)
                if success:
                    new_data = data.reshape((h, w, 3))
                    cv2.imshow(str(thread_index), new_data)
                    cv2.waitKey(1)
            else:
                empty_count += 1
                if empty_count > framerate * 30:
                    print("surf is Empty too many times > " + str(framerate * 30))
                    nvdec = nvc.PyNvDecoder(w, h, f, c, g)
                    empty_count = 0

        except nvc.HwResetException:
            nvdec = nvc.PyNvDecoder(w, h, f, c, g)
            empty_count = 0
            continue
        t2 = time.time()
        # print((t2 - t1) * 1000)


if __name__ == "__main__":
    add_cuda_dll_directories()

    print("This sample decodes multiple videos in parallel on given GPU.")
    print("It doesn't do anything beside decoding, output isn't saved.")
    print("Usage: SampleDecodeRTSP.py $gpu_id $url1 ... $urlN .")

    if len(sys.argv) < 3:
        print("Provide gpu ID and input URL(s).")
        exit(1)

    gpuID = int(sys.argv[1])
    urls = sys.argv[2:]

    pool = ThreadPoolExecutor(max_workers=len(urls))
    futures = []
    index = 0
    for url in urls:
        future = pool.submit(decode_rtsp_stream, index, url, gpuID)
        futures.append(future)
        index += 1

    pool.shutdown()
    for future in futures:
        future.result()
Running the script
python rtsp_decoder.py 0 rtsp://admin:a1234567@10.10.16.26:554/Streaming/Channels/101?transportmode=multicast
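The first argument is the GPU ID, followed by one or more RTSP URLs; each URL gets its own decoding thread and display window. For example, to decode two cameras in parallel on GPU 0 (the second URL here is a placeholder):

python rtsp_decoder.py 0 rtsp://admin:a1234567@10.10.16.26:554/Streaming/Channels/101 rtsp://<second-camera-url>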
Installing VPF
Building the VideoProcessingFramework library on Windows 11 (random_2011's blog on CSDN)
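Apart from building from source as described in the post above, recent versions of VPF can usually also be installed straight from the GitHub repository with pip (a CUDA toolkit and FFmpeg still need to be present on the system). The exact command may change between releases, so check the project README, but it looks roughly like:

pip install git+https://github.com/NVIDIA/VideoProcessingFramework

The optional PytorchNvCodec module used by the surface_to_tensor helper is provided by the same project and installed separately.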
Original source: https://blog.csdn.net/q317379184/article/details/132174203