The Quick Version

Video analytics means extracting structured data from video: how many people crossed a line, which zones are occupied, how long objects stay in an area. The pipeline is always the same — read frames, detect objects, track them across frames, and apply business logic.

1
pip install opencv-python ultralytics numpy
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import cv2
from ultralytics import YOLO

# Load the nano checkpoint (chosen for speed) and open the video source.
model = YOLO("yolov8n.pt")
cap = cv2.VideoCapture("parking_lot.mp4")  # 0 selects a webcam; an rtsp:// URL also works

while cap.isOpened():
    grabbed, frame = cap.read()
    if not grabbed:
        # End of the video, or the source stopped delivering frames.
        break

    # Detect and track in one call; the first (only) result belongs to this frame.
    detection = model.track(frame, persist=True, verbose=False)[0]
    annotated = detection.plot()

    # Tally detections per class id (0 = person, 2 = car).
    boxes = detection.boxes
    if boxes is not None and boxes.cls is not None:
        labels = [int(label) for label in boxes.cls.cpu().numpy()]
        people = labels.count(0)
        cars = labels.count(2)
        overlay = f"People: {people} Cars: {cars}"
        cv2.putText(annotated, overlay, (10, 30),
                    cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)

    cv2.imshow("Analytics", annotated)
    pressed = cv2.waitKey(1) & 0xFF
    if pressed == ord("q"):
        break

cap.release()
cv2.destroyAllWindows()

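That gives you real-time object detection with tracking IDs and a live count overlay. The persist=True flag tells model.track() to keep its tracker state between calls, so each object keeps a consistent ID from frame to frame. Ultralytics uses the BoT-SORT tracker by default; pass tracker="bytetrack.yaml" if you want ByteTrack instead.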

Zone-Based Monitoring

Counting objects in specific regions — like parking spaces, doorways, or restricted areas — requires defining polygonal zones and checking if detected objects fall inside them.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
import numpy as np
from dataclasses import dataclass, field

@dataclass
class Zone:
    """A named polygonal region of the frame plus its per-frame occupancy state.

    The polygon is normalised on construction to an ``(N, 2)`` ``int32`` array.
    ``np.array`` of Python ints is ``int64`` on most platforms, which OpenCV's
    drawing functions reject, so the conversion is done once here. Plain
    lists of ``(x, y)`` pairs and OpenCV's ``(N, 1, 2)`` contour layout are
    accepted as well.

    Raises:
        ValueError: if the polygon data cannot be arranged as (x, y) pairs.
    """
    # Label identifying the zone (shown in overlays).
    name: str
    # Polygon vertices in (x, y) pixel coordinates; (N, 2) int32 after init.
    polygon: np.ndarray
    # Number of detections currently inside the zone.
    count: int = 0
    # Tracking ids of the objects currently inside the zone.
    object_ids: set = field(default_factory=set)

    def __post_init__(self):
        # reshape(-1, 2) flattens contour-style (N, 1, 2) input and raises
        # ValueError when the data does not consist of coordinate pairs.
        self.polygon = np.asarray(self.polygon, dtype=np.int32).reshape(-1, 2)

# Zones to monitor; each polygon lists its vertices as (x, y) pixel coordinates.
zones = [
    Zone(
        name="Entrance",
        polygon=np.array([[100, 400], [300, 400], [300, 600], [100, 600]]),
    ),
    Zone(
        name="Parking A",
        polygon=np.array([[400, 200], [800, 200], [800, 500], [400, 500]]),
    ),
]

def point_in_zone(point: tuple, zone: Zone) -> bool:
    """Return True when ``point`` is inside ``zone``'s polygon or on its edge."""
    contour = zone.polygon.astype(np.float32)
    # measureDist=False: only the sign of the result is meaningful
    # (positive inside, zero on the edge, negative outside).
    signed = cv2.pointPolygonTest(contour, point, False)
    return signed >= 0

def get_center(box) -> tuple:
    """Return the bottom-centre of a bounding box as integer pixel coordinates.

    The bottom edge is used rather than the geometric centre because, for a
    person, it approximates where the feet touch the ground.
    """
    left, _top, right, bottom = box.xyxy[0].cpu().numpy()
    mid_x = (left + right) / 2
    return (int(mid_x), int(bottom))

def process_frame(results, zones: list[Zone]) -> list[Zone]:
    """Recompute every zone's occupancy from one frame of tracking results.

    Each zone's ``count`` and ``object_ids`` are cleared first, then every
    tracked detection whose bottom-centre point falls inside a zone is added
    to that zone. The same ``zones`` list is returned after being updated in
    place. If the frame has no boxes or no tracking ids, all zones stay empty.
    """
    # Occupancy is per frame, so start from a clean slate.
    for z in zones:
        z.count, z.object_ids = 0, set()

    detections = results[0].boxes
    if detections is None or detections.id is None:
        return zones

    for det in detections:
        anchor = get_center(det)
        obj_id = int(det.id[0].cpu().numpy())

        for z in zones:
            if not point_in_zone(anchor, z):
                continue
            z.count += 1
            z.object_ids.add(obj_id)

    return zones

def draw_zones(frame, zones: list[Zone]):
    """Draw each zone's outline and a "<name>: <count>" label onto ``frame``.

    Empty zones are drawn in green and occupied zones in orange (colours are
    BGR). OpenCV draws directly on ``frame``; the same frame is returned for
    convenience.
    """
    for zone in zones:
        color = (0, 255, 0) if zone.count == 0 else (0, 165, 255)
        # cv2.polylines only accepts 32-bit integer points, but np.array() of
        # Python ints is int64 on most platforms, so cast before drawing.
        outline = np.asarray(zone.polygon, dtype=np.int32)
        cv2.polylines(frame, [outline], True, color, 2)
        # Anchor the label at the mean of the vertices, passed as plain
        # Python ints so OpenCV does not have to parse numpy scalars.
        cx, cy = outline.mean(axis=0)
        cv2.putText(frame, f"{zone.name}: {zone.count}",
                    (int(cx), int(cy)), cv2.FONT_HERSHEY_SIMPLEX, 0.7, color, 2)
    return frame

This gives you per-zone occupancy counts with visual overlays: orange zones have objects in them, and green zones are empty.

Line Crossing Counter

Count objects crossing a boundary — useful for footfall counting, traffic monitoring, or throughput measurement.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
@dataclass
class LineCrossing:
    """Count tracked objects crossing the line through ``p1`` and ``p2``.

    The side a point lies on is the sign of the 2-D cross product of
    ``p2 - p1`` with ``center - p1``; positive is stored as "left" and
    negative as "right". A left-to-right transition increments ``in_count``
    and a right-to-left transition increments ``out_count``.

    Note: the last known side is kept for every track id ever seen and is
    never pruned by this class.
    """
    # End points of the counting line, as (x, y) pixel coordinates.
    p1: tuple
    p2: tuple
    # Number of left-to-right transitions observed.
    in_count: int = 0
    # Number of right-to-left transitions observed.
    out_count: int = 0
    # Last known side ("left" / "right") per track id.
    _prev_positions: dict = field(default_factory=dict)

    def update(self, track_id: int, center: tuple):
        """Record ``center`` for ``track_id`` and count a crossing if its side changed.

        The first observation of a track only records its side. A point lying
        exactly on the line is ignored: it belongs to neither side, and
        treating it as one would count an object that merely touches the
        line and steps back as both an "in" and an "out".
        """
        # Sign of the cross product tells which side of the line the point is on.
        side = ((self.p2[0] - self.p1[0]) * (center[1] - self.p1[1]) -
                (self.p2[1] - self.p1[1]) * (center[0] - self.p1[0]))
        if side == 0:
            # On the line itself: keep the previously recorded side.
            return
        current_side = "left" if side > 0 else "right"

        prev_side = self._prev_positions.get(track_id)
        if prev_side == "left" and current_side == "right":
            self.in_count += 1
        elif prev_side == "right" and current_side == "left":
            self.out_count += 1

        self._prev_positions[track_id] = current_side

# Usage in the main loop
counter = LineCrossing(p1=(0, 400), p2=(800, 400))

cap = cv2.VideoCapture("entrance.mp4")
model = YOLO("yolov8n.pt")

try:
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break

        results = model.track(frame, persist=True, classes=[0], verbose=False)  # track people only
        boxes = results[0].boxes

        # Tracking ids are required to follow an object from frame to frame.
        if boxes is not None and boxes.id is not None:
            for box in boxes:
                center = get_center(box)
                track_id = int(box.id[0].cpu().numpy())
                counter.update(track_id, center)

        # Draw the line and counts
        cv2.line(frame, counter.p1, counter.p2, (255, 0, 0), 2)
        cv2.putText(frame, f"In: {counter.in_count} Out: {counter.out_count}",
                    (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)

        cv2.imshow("Line Counter", frame)
        if cv2.waitKey(1) & 0xFF == ord("q"):
            break
finally:
    # Always release the capture and close the preview window, even if
    # detection raises part-way through the video.
    cap.release()
    cv2.destroyAllWindows()

The classes=[0] filter tells YOLO to only detect people, skipping cars, bags, and other objects. This reduces false crossings from irrelevant detections.

Processing RTSP Streams

Real deployments use IP cameras with RTSP streams. OpenCV handles these natively, but you need to deal with connection drops and frame buffering.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
import threading
import time

class RTSPStream:
    """Threaded RTSP reader that keeps only the most recent frame.

    A background daemon thread reads frames continuously, so the consumer
    never receives a queued, out-of-date frame. When the connection drops,
    the stale frame is discarded (``read()`` reports no frame) and the
    reader reconnects after ``reconnect_delay`` seconds.

    Args:
        url: Stream URL passed to ``cv2.VideoCapture``.
        reconnect_delay: Seconds to wait before reopening the stream after
            it fails to open or stops delivering frames.
    """

    def __init__(self, url: str, reconnect_delay: float = 2.0):
        self.url = url
        self.reconnect_delay = reconnect_delay
        self.frame = None
        self.running = False
        self.thread = None
        # Set by stop() so the reconnect wait can be interrupted immediately.
        self._stop_event = threading.Event()

    def start(self):
        """Start the background reader; does nothing extra if one is already alive."""
        self._stop_event.clear()
        self.running = True
        if self.thread is not None and self.thread.is_alive():
            # Avoid two readers competing for the same stream.
            return
        self.thread = threading.Thread(target=self._read_loop, daemon=True)
        self.thread.start()

    def _read_loop(self):
        """Read frames until stopped, reconnecting whenever the stream drops."""
        while self.running:
            cap = cv2.VideoCapture(self.url)
            try:
                while self.running and cap.isOpened():
                    ret, frame = cap.read()
                    if not ret:
                        break
                    self.frame = frame
            finally:
                cap.release()
            if self.running:
                # Connection lost: drop the last frame so consumers do not
                # keep processing a frozen image as if it were live.
                self.frame = None
                # Interruptible reconnect delay (returns early on stop()).
                self._stop_event.wait(self.reconnect_delay)

    def read(self):
        """Return ``(ok, frame)``; ``ok`` is False while no frame is available."""
        # Read the attribute once so both tuple items describe the same frame
        # even if the reader thread updates it concurrently.
        frame = self.frame
        return frame is not None, frame

    def stop(self, timeout: float = 5.0):
        """Signal the reader to stop and wait up to ``timeout`` seconds for it to exit."""
        self.running = False
        self._stop_event.set()
        thread = self.thread
        if thread is not None and thread is not threading.current_thread():
            thread.join(timeout)

# Usage
# Placeholder credentials and address: substitute your camera's own values.
stream = RTSPStream("rtsp://admin:password@192.168.1.100:554/stream1")
stream.start()
time.sleep(2)  # wait for first frame

try:
    while True:
        ret, frame = stream.read()
        if not ret:
            # No frame available yet; back off briefly instead of
            # spinning the CPU in a tight loop.
            time.sleep(0.01)
            continue
        # ... run detection on frame ...
finally:
    # Stop the background reader when the loop exits (e.g. Ctrl+C).
    stream.stop()

The threaded reader solves two problems: it prevents frame buffer buildup (you always get the latest frame, not a queued old one), and it handles disconnections by automatically reconnecting.

Common Errors and Fixes

Low FPS with large models

Switch from yolov8s.pt to yolov8n.pt (nano) for 3-5x speedup with moderate accuracy loss. Or process every Nth frame:

1
2
3
4
5
6
7
frame_count = 0
while cap.isOpened():
    ret, frame = cap.read()
    if not ret:
        # End of the video or a failed read: cap.isOpened() can stay true
        # here, so without this check the loop would never terminate.
        break
    frame_count += 1
    if frame_count % 3 != 0:  # process every 3rd frame
        continue
    # ... detection code ...

Tracking IDs keep resetting

Make sure you’re passing persist=True to model.track(). Without it, the tracker resets every frame and assigns new IDs.

RTSP stream lags behind real-time

OpenCV buffers RTSP frames internally. Use the threaded reader above, or call cap.set(cv2.CAP_PROP_BUFFERSIZE, 1) to minimize buffering (not every capture backend honors this property).

Memory grows over time

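The tracker’s internal state grows as it accumulates object histories. For long-running deployments, periodically reset the tracker (once per hour, or when the scene is empty). Note that the Ultralytics YOLO object does not appear to expose a model.reset() method; the tracker instances live on the predictor, so reset them with, for example, for tracker in model.predictor.trackers: tracker.reset(). Check this against the Ultralytics version you have installed.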

cv2.imshow doesn’t work on a headless server

Drop the display code entirely and write results to a file, database, or message queue instead. For debugging, save annotated frames periodically: cv2.imwrite(f"debug_{frame_count}.jpg", annotated).