相关推荐recommended
超详细||YOLOv8基础教程(环境搭建,训练,测试,部署看一篇就够)(在推理视频中添加FPS信息)
作者:mmseoamin日期:2024-01-25

一、YOLOv8环境搭建

这篇文章将跳过基础的深度学习环境的搭建,如果没有完成的可以看我的这篇博客:超详细||深度学习环境搭建记录cuda+anaconda+pytorch+pycharm-CSDN博客

1. 在github上下载源码:

ONNX > OpenVINO > CoreML > TFLite">GitHub - ultralytics/ultralytics: NEW - YOLOv8 🚀 in PyTorch > ONNX > OpenVINO > CoreML > TFLite超详细||YOLOv8基础教程(环境搭建,训练,测试,部署看一篇就够)(在推理视频中添加FPS信息),第1张

2. 安装ultralytics(YOLOv8改名为ultralytics)

这里有两种方式安装ultralytics

  • 直接使用CLI
    pip install ultralytics
    
    • 使用requirements.txt安装,这种方法是在上面下载的源码处安装,方便对yolov8进行改进
      cd ultralytics
      pip install -r requirements.txt
      

      3. 安装wandb

      pip install wandb

      登录自己的wandb账号

      wandb login

      二、开始训练

      1. 构建数据集

      数据集要严格按照下面的目录格式,image的格式为jpg,label的格式为txt,对应的image和label的名字要一致

      Dataset
      	└─images
      	   └─train
             └─val
      	└─labels
      	   └─train
             └─val

      2. 创建一个dataset.yaml文件

      更换自己的image train和image val的地址,labels地址不用,它会自动索引

      将classes改为自己的类别,从0开始

      path: ../datasets/coco128  # dataset root dir
      train: images/train2017  # train images (relative to 'path') 128 images
      val: images/train2017  # val images (relative to 'path') 128 images
      test:  # test images (optional)
      # Classes
      names:
        0: person
        1: bicycle
        2: car
        3: motorcycle
        4: airplane
        5: bus
        6: train
        7: truck
        8: boat

      3. 新建一个train.py,修改相关参数,运行即可开始训练

      from ultralytics import YOLO
      if __name__ == '__main__':
          # Load a model
          model = YOLO(r'\ultralytics\detection\yolov8n\yolov8n.yaml')  # 不使用预训练权重训练
          # model = YOLO(r'yolov8p.yaml').load("yolov8n.pt")  # 使用预训练权重训练
          # Trainparameters ----------------------------------------------------------------------------------------------
          model.train(
              data=r'\ultralytics\detection\dataset\appledata.yaml',
              epochs= 30 , # (int) number of epochs to train for
              patience= 50 , # (int) epochs to wait for no observable improvement for early stopping of training
              batch= 8 , # (int) number of images per batch (-1 for AutoBatch)
              imgsz= 320 , # (int) size of input images as integer or w,h
              save= True , # (bool) save train checkpoints and predict results
              save_period= -1, # (int) Save checkpoint every x epochs (disabled if < 1)
              cache= False , # (bool) True/ram, disk or False. Use cache for data loading
              device= 0 , # (int | str | list, optional) device to run on, i.e. cuda device=0 or device=0,1,2,3 or device=cpu
              workers= 16 , # (int) number of worker threads for data loading (per RANK if DDP)
              project= 'result', # (str, optional) project name
              name= 'yolov8n' ,# (str, optional) experiment name, results saved to 'project/name' directory
              exist_ok= False , # (bool) whether to overwrite existing experiment
              pretrained= False , # (bool | str) whether to use a pretrained model (bool) or a model to load weights from (str)
              optimizer= 'SGD',  # (str) optimizer to use, choices=[SGD, Adam, Adamax, AdamW, NAdam, RAdam, RMSProp, auto]
              verbose= True ,# (bool) whether to print verbose output
              seed= 0 , # (int) random seed for reproducibility
              deterministic= True , # (bool) whether to enable deterministic mode
              single_cls= True , # (bool) train multi-class data as single-class
              rect= False  ,# (bool) rectangular training if mode='train' or rectangular validation if mode='val'
              cos_lr= False , # (bool) use cosine learning rate scheduler
              close_mosaic= 0,  # (int) disable mosaic augmentation for final epochs
              resume= False , # (bool) resume training from last checkpoint
              amp= False,  # (bool) Automatic Mixed Precision (AMP) training, choices=[True, False], True runs AMP check
              fraction= 1.0 , # (float) dataset fraction to train on (default is 1.0, all images in train set)
              profile= False,  # (bool) profile ONNX and TensorRT speeds during training for loggers
              # Segmentation
              overlap_mask= True , # (bool) masks should overlap during training (segment train only)
              mask_ratio= 4,  # (int) mask downsample ratio (segment train only)
              # Classification
              dropout= 0.0,  # (float) use dropout regularization (classify train only)
              # Hyperparameters ----------------------------------------------------------------------------------------------
              lr0=0.01,  # (float) initial learning rate (i.e. SGD=1E-2, Adam=1E-3)
              lrf=0.01,  # (float) final learning rate (lr0 * lrf)
              momentum=0.937,  # (float) SGD momentum/Adam beta1
              weight_decay=0.0005,  # (float) optimizer weight decay 5e-4
              warmup_epochs=3.0,  # (float) warmup epochs (fractions ok)
              warmup_momentum=0.8,  # (float) warmup initial momentum
              warmup_bias_lr=0.1,  # (float) warmup initial bias lr
              box=7.5,  # (float) box loss gain
              cls=0.5,  # (float) cls loss gain (scale with pixels)
              dfl=1.5,  # (float) dfl loss gain
              pose=12.0,  # (float) pose loss gain
              kobj=1.0,  # (float) keypoint obj loss gain
              label_smoothing=0.0,  # (float) label smoothing (fraction)
              nbs=64,  # (int) nominal batch size
              hsv_h=0.015,  # (float) image HSV-Hue augmentation (fraction)
              hsv_s=0.7,  # (float) image HSV-Saturation augmentation (fraction)
              hsv_v=0.4,  # (float) image HSV-Value augmentation (fraction)
              degrees=0.0,  # (float) image rotation (+/- deg)
              translate=0.1,  # (float) image translation (+/- fraction)
              scale=0.5,  # (float) image scale (+/- gain)
              shear=0.0,  # (float) image shear (+/- deg)
              perspective=0.0,  # (float) image perspective (+/- fraction), range 0-0.001
              flipud=0.0,  # (float) image flip up-down (probability)
              fliplr=0.5,  # (float) image flip left-right (probability)
              mosaic=1.0,  # (float) image mosaic (probability)
              mixup=0.0,  # (float) image mixup (probability)
              copy_paste=0.0,  # (float) segment copy-paste (probability)
                      )
      

      三、测试与验证

      1. 新建一个test.py, 这个可以打印网路信息,参数量以及FLOPs,还有每一层网络的信息

      from ultralytics import YOLO
      if __name__ == '__main__':
          # Load a model
          model = YOLO(r'\ultralytics\detection\yolov8n\yolov8n.yaml')  # build a new model from YAML
          model.info()

      2. 新建一个val.py,这个可以打印模型在验证集上的结果,如mAP,推理速度等

      from ultralytics import YOLO
      if __name__ == '__main__':
          # Load a model
          model = YOLO(r'\ultralytics\detection\yolov8n\result\yolov8n4\weights\best.pt')  # build a new model from YAML
          # Validate the model
          model.val(
              val=True,  # (bool) validate/test during training
              data=r'\ultralytics\detection\dataset\appledata.yaml',
              split='val',  # (str) dataset split to use for validation, i.e. 'val', 'test' or 'train'
              batch=1,  # 测试速度时一般设置为 1 ,设置越大速度越快。   (int) number of images per batch (-1 for AutoBatch)
              imgsz=320,  # (int) size of input images as integer or w,h
              device=0,  # (int | str | list, optional) device to run on, i.e. cuda device=0 or device=0,1,2,3 or device=cpu
              workers=8,  # (int) number of worker threads for data loading (per RANK if DDP)
              save_json=False,  # (bool) save results to JSON file
              save_hybrid=False,  # (bool) save hybrid version of labels (labels + additional predictions)
              conf=0.001,  # (float, optional) object confidence threshold for detection (default 0.25 predict, 0.001 val)
              iou=0.7,  # (float) intersection over union (IoU) threshold for NMS
              project='val',  # (str, optional) project name
              name='',  # (str, optional) experiment name, results saved to 'project/name' directory
              max_det=300,  # (int) maximum number of detections per image
              half=True,  # (bool) use half precision (FP16)
              dnn=False,  # (bool) use OpenCV DNN for ONNX inference
              plots=True,  # (bool) save plots during train/val
          )

      3. 新建一个predict.py,这个可以根据训练好的权重文件进行推理,权重文件格式支持pt,onnx等,支持图片,视频,摄像头等进行推理

      from ultralytics import YOLO
      if __name__ == '__main__':
          # Load a model
          model = YOLO(r'\deploy\yolov8n.pt')  # pretrained YOLOv8n model
          model.predict(
              source=r'\deploy\output_video.mp4',
              save=False,  # save predict results
              imgsz=320,  # (int) size of input images as integer or w,h
              conf=0.25,  # object confidence threshold for detection (default 0.25 predict, 0.001 val)
              iou=0.7,  # # intersection over union (IoU) threshold for NMS
              show=True,  # show results if possible
              project='',  # (str, optional) project name
              name='',  # (str, optional) experiment name, results saved to 'project/name' directory
              save_txt=False,  # save results as .txt file
              save_conf=True,  # save results with confidence scores
              save_crop=False,  # save cropped images with results
              show_labels=True,  # show object labels in plots
              show_conf=True,  # show object confidence scores in plots
              vid_stride=1,  # video frame-rate stride
              line_width=1,  # bounding box thickness (pixels)
              visualize=False,  # visualize model features
              augment=False,  # apply image augmentation to prediction sources
              agnostic_nms=False,  # class-agnostic NMS
              retina_masks=False,  # use high-resolution segmentation masks
              boxes=True,  # Show boxes in segmentation predictions
          )

      四、onnx模型部署

      1. 新建一个export.py,将pt文件转化为onnx文件

      from ultralytics import YOLO
      # Load a model
      model = YOLO('/ultralytics/weight file/yolov8n.pt')  # load a custom trained model
      # Export the model
      model.export(format='onnx')

      2. 将onnx文件添加到之前提到的predict.py中,进行推理。

      3. 如果想在推理的视频中添加FPS信息,请把ultralytics/engine/predictor.py替换为下面的代码。

      # Ultralytics YOLO 🚀, AGPL-3.0 license
      """
      Run prediction on images, videos, directories, globs, YouTube, webcam, streams, etc.
      Usage - sources:
          $ yolo mode=predict model=yolov8n.pt source=0                               # webcam
       img.jpg                         # image
       vid.mp4                         # video
       screen                          # screenshot
       path/                           # directory
       list.txt                        # list of images
       list.streams                    # list of streams
       'path/*.jpg'                    # glob
       'https://youtu.be/Zgi9g1ksQHc'  # YouTube
       'rtsp://example.com/media.mp4'  # RTSP, RTMP, HTTP stream
      Usage - formats:
          $ yolo mode=predict model=yolov8n.pt                 # PyTorch
                                    yolov8n.torchscript        # TorchScript
                                    yolov8n.onnx               # ONNX Runtime or OpenCV DNN with dnn=True
                                    yolov8n_openvino_model     # OpenVINO
                                    yolov8n.engine             # TensorRT
                                    yolov8n.mlmodel            # CoreML (macOS-only)
                                    yolov8n_saved_model        # TensorFlow SavedModel
                                    yolov8n.pb                 # TensorFlow GraphDef
                                    yolov8n.tflite             # TensorFlow Lite
                                    yolov8n_edgetpu.tflite     # TensorFlow Edge TPU
                                    yolov8n_paddle_model       # PaddlePaddle
      """
      import platform
      from pathlib import Path
      import cv2
      import numpy as np
      import torch
      from ultralytics.cfg import get_cfg
      from ultralytics.data import load_inference_source
      from ultralytics.data.augment import LetterBox, classify_transforms
      from ultralytics.nn.autobackend import AutoBackend
      from ultralytics.utils import DEFAULT_CFG, LOGGER, MACOS, SETTINGS, WINDOWS, callbacks, colorstr, ops
      from ultralytics.utils.checks import check_imgsz, check_imshow
      from ultralytics.utils.files import increment_path
      from ultralytics.utils.torch_utils import select_device, smart_inference_mode
      STREAM_WARNING = """
          WARNING ⚠️ stream/video/webcam/dir predict source will accumulate results in RAM unless `stream=True` is passed,
          causing potential out-of-memory errors for large sources or long-running streams/videos.
          Usage:
              results = model(source=..., stream=True)  # generator of Results objects
              for r in results:
                  boxes = r.boxes  # Boxes object for bbox outputs
                  masks = r.masks  # Masks object for segment masks outputs
                  probs = r.probs  # Class probabilities for classification outputs
      """
      class BasePredictor:
          """
          BasePredictor
          A base class for creating predictors.
          Attributes:
              args (SimpleNamespace): Configuration for the predictor.
              save_dir (Path): Directory to save results.
              done_warmup (bool): Whether the predictor has finished setup.
              model (nn.Module): Model used for prediction.
              data (dict): Data configuration.
              device (torch.device): Device used for prediction.
              dataset (Dataset): Dataset used for prediction.
              vid_path (str): Path to video file.
              vid_writer (cv2.VideoWriter): Video writer for saving video output.
              data_path (str): Path to data.
          """
          def __init__(self, cfg=DEFAULT_CFG, overrides=None, _callbacks=None):
              """
              Initializes the BasePredictor class.
              Args:
                  cfg (str, optional): Path to a configuration file. Defaults to DEFAULT_CFG.
                  overrides (dict, optional): Configuration overrides. Defaults to None.
              """
              self.args = get_cfg(cfg, overrides)
              self.save_dir = self.get_save_dir()
              if self.args.conf is None:
                  self.args.conf = 0.25  # default conf=0.25
              self.done_warmup = False
              if self.args.show:
                  self.args.show = check_imshow(warn=True)
              # Usable if setup is done
              self.model = None
              self.data = self.args.data  # data_dict
              self.imgsz = None
              self.device = None
              self.dataset = None
              self.vid_path, self.vid_writer = None, None
              self.plotted_img = None
              self.data_path = None
              self.source_type = None
              self.batch = None
              self.results = None
              self.transforms = None
              self.callbacks = _callbacks or callbacks.get_default_callbacks()
              self.txt_path = None
              callbacks.add_integration_callbacks(self)
          def get_save_dir(self):
              project = self.args.project or Path(SETTINGS['runs_dir']) / self.args.task
              name = self.args.name or f'{self.args.mode}'
              return increment_path(Path(project) / name, exist_ok=self.args.exist_ok)
          def preprocess(self, im):
              """Prepares input image before inference.
              Args:
                  im (torch.Tensor | List(np.ndarray)): BCHW for tensor, [(HWC) x B] for list.
              """
              not_tensor = not isinstance(im, torch.Tensor)
              if not_tensor:
                  im = np.stack(self.pre_transform(im))
                  im = im[..., ::-1].transpose((0, 3, 1, 2))  # BGR to RGB, BHWC to BCHW, (n, 3, h, w)
                  im = np.ascontiguousarray(im)  # contiguous
                  im = torch.from_numpy(im)
              img = im.to(self.device)
              img = img.half() if self.model.fp16 else img.float()  # uint8 to fp16/32
              if not_tensor:
                  img /= 255  # 0 - 255 to 0.0 - 1.0
              return img
          def inference(self, im, *args, **kwargs):
              visualize = increment_path(self.save_dir / Path(self.batch[0][0]).stem,
                                         mkdir=True) if self.args.visualize and (not self.source_type.tensor) else False
              return self.model(im, augment=self.args.augment, visualize=visualize)
          def pre_transform(self, im):
              """Pre-transform input image before inference.
              Args:
                  im (List(np.ndarray)): (N, 3, h, w) for tensor, [(h, w, 3) x N] for list.
              Return: A list of transformed imgs.
              """
              same_shapes = all(x.shape == im[0].shape for x in im)
              auto = same_shapes and self.model.pt
              return [LetterBox(self.imgsz, auto=auto, stride=self.model.stride)(image=x) for x in im]
          def write_results(self, idx, results, batch):
              """Write inference results to a file or directory."""
              p, im, _ = batch
              log_string = ''
              if len(im.shape) == 3:
                  im = im[None]  # expand for batch dim
              if self.source_type.webcam or self.source_type.from_img or self.source_type.tensor:  # batch_size >= 1
                  log_string += f'{idx}: '
                  frame = self.dataset.count
              else:
                  frame = getattr(self.dataset, 'frame', 0)
              self.data_path = p
              self.txt_path = str(self.save_dir / 'labels' / p.stem) + ('' if self.dataset.mode == 'image' else f'_{frame}')
              log_string += '%gx%g ' % im.shape[2:]  # print string
              result = results[idx]
              log_string += result.verbose()
              if self.args.save or self.args.show:  # Add bbox to image
                  plot_args = {
                      'line_width': self.args.line_width,
                      'boxes': self.args.boxes,
                      'conf': self.args.show_conf,
                      'labels': self.args.show_labels}
                  if not self.args.retina_masks:
                      plot_args['im_gpu'] = im[idx]
                  self.plotted_img = result.plot(**plot_args)
              # Write
              if self.args.save_txt:
                  result.save_txt(f'{self.txt_path}.txt', save_conf=self.args.save_conf)
              if self.args.save_crop:
                  result.save_crop(save_dir=self.save_dir / 'crops',
                                   file_name=self.data_path.stem + ('' if self.dataset.mode == 'image' else f'_{frame}'))
              return log_string
          def postprocess(self, preds, img, orig_imgs):
              """Post-processes predictions for an image and returns them."""
              return preds
          def __call__(self, source=None, model=None, stream=False, *args, **kwargs):
              """Performs inference on an image or stream."""
              self.stream = stream
              if stream:
                  return self.stream_inference(source, model, *args, **kwargs)
              else:
                  return list(self.stream_inference(source, model, *args, **kwargs))  # merge list of Result into one
          def predict_cli(self, source=None, model=None):
              """Method used for CLI prediction. It uses always generator as outputs as not required by CLI mode."""
              gen = self.stream_inference(source, model)
              for _ in gen:  # running CLI inference without accumulating any outputs (do not modify)
                  pass
          def setup_source(self, source):
              """Sets up source and inference mode."""
              self.imgsz = check_imgsz(self.args.imgsz, stride=self.model.stride, min_dim=2)  # check image size
              self.transforms = getattr(self.model.model, 'transforms', classify_transforms(
                  self.imgsz[0])) if self.args.task == 'classify' else None
              self.dataset = load_inference_source(source=source, imgsz=self.imgsz, vid_stride=self.args.vid_stride)
              self.source_type = self.dataset.source_type
              if not getattr(self, 'stream', True) and (self.dataset.mode == 'stream' or  # streams
         len(self.dataset) > 1000 or  # images
         any(getattr(self.dataset, 'video_flag', [False]))):  # videos
                  LOGGER.warning(STREAM_WARNING)
              self.vid_path, self.vid_writer = [None] * self.dataset.bs, [None] * self.dataset.bs
          @smart_inference_mode()
          def stream_inference(self, source=None, model=None, *args, **kwargs):
              """Streams real-time inference on camera feed and saves results to file."""
              if self.args.verbose:
                  LOGGER.info('')
              # Setup model
              if not self.model:
                  self.setup_model(model)
              # Setup source every time predict is called
              self.setup_source(source if source is not None else self.args.source)
              # Check if save_dir/ label file exists
              if self.args.save or self.args.save_txt:
                  (self.save_dir / 'labels' if self.args.save_txt else self.save_dir).mkdir(parents=True, exist_ok=True)
              # Warmup model
              if not self.done_warmup:
                  self.model.warmup(imgsz=(1 if self.model.pt or self.model.triton else self.dataset.bs, 3, *self.imgsz))
                  self.done_warmup = True
              self.seen, self.windows, self.batch, self.profilers = 0, [], None, (ops.Profile(), ops.Profile(), ops.Profile())
              self.run_callbacks('on_predict_start')
              for batch in self.dataset:
                  self.run_callbacks('on_predict_batch_start')
                  self.batch = batch
                  path, im0s, vid_cap, s = batch
                  # Preprocess
                  with self.profilers[0]:
                      im = self.preprocess(im0s)
                  # Inference
                  with self.profilers[1]:
                      preds = self.inference(im, *args, **kwargs)
                  # Postprocess
                  with self.profilers[2]:
                      self.results = self.postprocess(preds, im, im0s)
                  self.run_callbacks('on_predict_postprocess_end')
                  # Visualize, save, write results
                  n = len(im0s)
                  for i in range(n):
                      self.seen += 1
                      self.results[i].speed = {
                          'preprocess': self.profilers[0].dt * 1E3 / n,
                          'inference': self.profilers[1].dt * 1E3 / n,
                          'postprocess': self.profilers[2].dt * 1E3 / n}
                      p, im0 = path[i], None if self.source_type.tensor else im0s[i].copy()
                      p = Path(p)
                      if self.args.verbose or self.args.save or self.args.save_txt or self.args.show:
                          s += self.write_results(i, self.results, (p, im, im0))
                      if self.args.save or self.args.save_txt:
                          self.results[i].save_dir = self.save_dir.__str__()
                      if self.args.show and self.plotted_img is not None:
                          self.show(p)
                      if self.args.save and self.plotted_img is not None:
                          self.save_preds(vid_cap, i, str(self.save_dir / p.name))
                  self.run_callbacks('on_predict_batch_end')
                  yield from self.results
                  # Print time (inference-not only)
                  if self.args.verbose:
                      LOGGER.info(f'{s}{(self.profilers[0].dt+self.profilers[1].dt+self.profilers[2].dt) * 1E3:.2f}ms')
              # Release assets
              if isinstance(self.vid_writer[-1], cv2.VideoWriter):
                  self.vid_writer[-1].release()  # release final video writer
              # Print results
              if self.args.verbose and self.seen:
                  t = tuple(x.t / self.seen * 1E3 for x in self.profilers)  # speeds per image
                  LOGGER.info(f'Speed: %.2fms preprocess, %.2fms inference, %.2fms postprocess per image at shape '
                              f'{(1, 3, *im.shape[2:])}' % t)
              if self.args.save or self.args.save_txt or self.args.save_crop:
                  nl = len(list(self.save_dir.glob('labels/*.txt')))  # number of labels
                  s = f"\n{nl} label{'s' * (nl > 1)} saved to {self.save_dir / 'labels'}" if self.args.save_txt else ''
                  LOGGER.info(f"Results saved to {colorstr('bold', self.save_dir)}{s}")
              self.run_callbacks('on_predict_end')
          def setup_model(self, model, verbose=True):
              """Initialize YOLO model with given parameters and set it to evaluation mode."""
              self.model = AutoBackend(model or self.args.model,
                                       device=select_device(self.args.device, verbose=verbose),
                                       dnn=self.args.dnn,
                                       data=self.args.data,
                                       fp16=self.args.half,
                                       fuse=True,
                                       verbose=verbose)
              self.device = self.model.device  # update device
              self.args.half = self.model.fp16  # update half
              self.model.eval()
          def show(self, p):
              """Display an image in a window using OpenCV imshow()."""
              
              im0 = self.plotted_img
              
              #--------------------后添加-----------------------------
              str_FPS = "FPS: %.2f" % (1. / (self.profilers[0].dt + self.profilers[1].dt + self.profilers[2].dt))
              cv2.putText(im0, str_FPS, (50, 50), cv2.FONT_HERSHEY_COMPLEX, 1, (0, 255, 0), 3)
              # --------------------后添加-----------------------------
              
              if platform.system() == 'Linux' and p not in self.windows:
                  self.windows.append(p)
                  cv2.namedWindow(str(p), cv2.WINDOW_NORMAL | cv2.WINDOW_KEEPRATIO)  # allow window resize (Linux)
                  cv2.resizeWindow(str(p), im0.shape[1], im0.shape[0])
              cv2.imshow(str(p), im0)
              cv2.waitKey(500 if self.batch[3].startswith('image') else 1)  # 1 millisecond
          def save_preds(self, vid_cap, idx, save_path):
              """Save video predictions as mp4 at specified path."""
              im0 = self.plotted_img
              # Save imgs
              if self.dataset.mode == 'image':
                  cv2.imwrite(save_path, im0)
              else:  # 'video' or 'stream'
                  if self.vid_path[idx] != save_path:  # new video
                      self.vid_path[idx] = save_path
                      if isinstance(self.vid_writer[idx], cv2.VideoWriter):
                          self.vid_writer[idx].release()  # release previous video writer
                      if vid_cap:  # video
                          fps = int(vid_cap.get(cv2.CAP_PROP_FPS))  # integer required, floats produce error in MP4 codec
                          w = int(vid_cap.get(cv2.CAP_PROP_FRAME_WIDTH))
                          h = int(vid_cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
                      else:  # stream
                          fps, w, h = 30, im0.shape[1], im0.shape[0]
                      suffix = '.mp4' if MACOS else '.avi' if WINDOWS else '.avi'
                      fourcc = 'avc1' if MACOS else 'WMV2' if WINDOWS else 'MJPG'
                      save_path = str(Path(save_path).with_suffix(suffix))
                      self.vid_writer[idx] = cv2.VideoWriter(save_path, cv2.VideoWriter_fourcc(*fourcc), fps, (w, h))
                  self.vid_writer[idx].write(im0)
          def run_callbacks(self, event: str):
              """Runs all registered callbacks for a specific event."""
              for callback in self.callbacks.get(event, []):
                  callback(self)
          def add_callback(self, event: str, func):
              """
              Add callback
              """
              self.callbacks[event].append(func)