The Quick Version
Video analytics means extracting structured data from video: how many people crossed a line, which zones are occupied, how long objects stay in an area. The pipeline is always the same — read frames, detect objects, track them across frames, and apply business logic.
1
| pip install opencv-python ultralytics numpy
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import cv2
from ultralytics import YOLO

model = YOLO("yolov8n.pt")  # nano model for speed
cap = cv2.VideoCapture("parking_lot.mp4")  # or 0 for webcam, or rtsp:// URL

while cap.isOpened():
    ok, frame = cap.read()
    if not ok:
        break

    # persist=True keeps the tracker's state between calls so IDs stay stable.
    results = model.track(frame, persist=True, verbose=False)
    annotated = results[0].plot()

    # Count objects by class (COCO indices: 0 = person, 2 = car).
    boxes = results[0].boxes
    if boxes is not None and boxes.cls is not None:
        classes = boxes.cls.cpu().numpy()
        people = sum(int(c) == 0 for c in classes)
        cars = sum(int(c) == 2 for c in classes)
        cv2.putText(annotated, f"People: {people} Cars: {cars}",
                    (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)

    cv2.imshow("Analytics", annotated)
    if cv2.waitKey(1) & 0xFF == ord("q"):
        break

cap.release()
cv2.destroyAllWindows()
|
That gives you real-time object detection with tracking IDs and a live count overlay. The persist=True flag keeps the tracker's state between successive track() calls, so each object keeps a consistent ID across frames (Ultralytics defaults to the BoT-SORT tracker; ByteTrack can be selected with tracker="bytetrack.yaml").
Zone-Based Monitoring
Counting objects in specific regions — like parking spaces, doorways, or restricted areas — requires defining polygonal zones and checking if detected objects fall inside them.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
| import numpy as np
from dataclasses import dataclass, field
@dataclass
class Zone:
    """A named polygonal region whose occupancy is recounted every frame."""
    name: str
    polygon: np.ndarray  # Nx2 array of points
    count: int = 0  # detections inside the zone on the current frame
    object_ids: set[int] = field(default_factory=set)  # tracker IDs seen in the zone this frame
# Define zones as polygon vertices (x, y)
# NOTE(review): coordinates are in pixels and assume a fixed camera position
# and resolution — re-measure them if either changes.
zones = [
    Zone("Entrance", np.array([[100, 400], [300, 400], [300, 600], [100, 600]])),
    Zone("Parking A", np.array([[400, 200], [800, 200], [800, 500], [400, 500]])),
]
def point_in_zone(point: tuple, zone: Zone) -> bool:
    """Return True if *point* (x, y) lies inside or on the edge of the zone's polygon.

    cv2.pointPolygonTest expects float coordinates (a Point2f), so cast
    explicitly — integer pixel coordinates from detections raise a type
    error on some OpenCV builds.
    """
    pt = (float(point[0]), float(point[1]))
    result = cv2.pointPolygonTest(zone.polygon.astype(np.float32), pt, False)
    # >= 0 counts points exactly on the boundary as inside.
    return result >= 0
def get_center(box) -> tuple:
    """Return the bottom-center of the bounding box (feet position for people)."""
    coords = box.xyxy[0].cpu().numpy()
    left, top, right, bottom = coords
    mid_x = (left + right) / 2
    return (int(mid_x), int(bottom))
def process_frame(results, zones: list[Zone]) -> list[Zone]:
    """Recompute each zone's occupancy from the current frame's tracked detections."""
    # Occupancy is per-frame state: clear it before counting.
    for z in zones:
        z.count = 0
        z.object_ids = set()

    detections = results[0].boxes
    # Nothing detected, or the tracker hasn't assigned IDs yet — all zones stay empty.
    if detections is None or detections.id is None:
        return zones

    for det in detections:
        anchor = get_center(det)
        tid = int(det.id[0].cpu().numpy())
        for z in zones:
            if point_in_zone(anchor, z):
                z.count += 1
                z.object_ids.add(tid)
    return zones
def draw_zones(frame, zones: list[Zone]):
    """Overlay each zone's polygon and current count on the frame."""
    occupied_color = (0, 165, 255)  # orange: something is inside
    empty_color = (0, 255, 0)       # green: zone is empty
    for z in zones:
        color = empty_color if z.count == 0 else occupied_color
        cv2.polylines(frame, [z.polygon], True, color, 2)
        # Put the label at the polygon's centroid.
        cx, cy = z.polygon.mean(axis=0).astype(int)
        cv2.putText(frame, f"{z.name}: {z.count}",
                    (int(cx), int(cy)), cv2.FONT_HERSHEY_SIMPLEX, 0.7, color, 2)
    return frame
|
This gives you per-zone occupancy counts with visual overlays. Orange zones have objects in them, green zones are empty.
Line Crossing Counter
Count objects crossing a boundary — useful for footfall counting, traffic monitoring, or throughput measurement.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
@dataclass
class LineCrossing:
    """Count objects crossing a line defined by two points."""
    p1: tuple
    p2: tuple
    in_count: int = 0
    out_count: int = 0
    _prev_positions: dict = field(default_factory=dict)

    def update(self, track_id: int, center: tuple):
        """Classify the point's side of the line and count side-to-side transitions."""
        ax, ay = self.p1
        bx, by = self.p2
        cx, cy = center
        # Sign of the 2D cross product (p2 - p1) x (center - p1)
        # tells which side of the line the point is on.
        cross = (bx - ax) * (cy - ay) - (by - ay) * (cx - ax)
        current_side = "left" if cross > 0 else "right"

        previous_side = self._prev_positions.get(track_id)
        if previous_side == "left" and current_side == "right":
            self.in_count += 1
        elif previous_side == "right" and current_side == "left":
            self.out_count += 1
        self._prev_positions[track_id] = current_side
# Usage in the main loop
counter = LineCrossing(p1=(0, 400), p2=(800, 400))
cap = cv2.VideoCapture("entrance.mp4")
model = YOLO("yolov8n.pt")

while cap.isOpened():
    grabbed, frame = cap.read()
    if not grabbed:
        break

    # classes=[0] restricts detection to people; other COCO classes are skipped.
    results = model.track(frame, persist=True, classes=[0], verbose=False)  # track people only
    detections = results[0].boxes
    if detections is not None and detections.id is not None:
        for det in detections:
            counter.update(int(det.id[0].cpu().numpy()), get_center(det))

    # Draw the line and counts
    cv2.line(frame, counter.p1, counter.p2, (255, 0, 0), 2)
    cv2.putText(frame, f"In: {counter.in_count} Out: {counter.out_count}",
                (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)

    cv2.imshow("Line Counter", frame)
    if cv2.waitKey(1) & 0xFF == ord("q"):
        break

cap.release()
|
The classes=[0] filter tells YOLO to only detect people, skipping cars, bags, and other objects. This reduces false crossings from irrelevant detections.
Processing RTSP Streams
Real deployments use IP cameras with RTSP streams. OpenCV handles these natively, but you need to deal with connection drops and frame buffering.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
| import threading
import time
class RTSPStream:
    """Threaded RTSP reader that always exposes the latest decoded frame.

    A daemon thread drains the stream as fast as frames arrive, so read()
    returns the most recent frame rather than a stale buffered one, and
    the reader reconnects automatically after a dropped connection.
    """

    def __init__(self, url: str):
        self.url = url
        self.frame = None    # latest decoded frame; written only by the reader thread
        self.running = False
        self.thread = None

    def start(self):
        """Launch the background reader thread (daemon: won't block interpreter exit)."""
        self.running = True
        self.thread = threading.Thread(target=self._read_loop, daemon=True)
        self.thread.start()

    def _read_loop(self):
        # Outer loop reconnects; inner loop drains frames until the
        # connection drops or stop() is called.
        while self.running:
            cap = cv2.VideoCapture(self.url)
            while self.running and cap.isOpened():
                ret, frame = cap.read()
                if not ret:
                    break  # connection lost — fall through and reconnect
                self.frame = frame
            cap.release()
            time.sleep(2)  # reconnect delay

    def read(self):
        """Return (has_frame, frame).

        Snapshot self.frame once so the truth check and the returned value
        refer to the same frame even while the reader thread keeps writing.
        """
        frame = self.frame
        return frame is not None, frame

    def stop(self):
        """Signal the reader thread to exit and wait briefly for it to finish."""
        self.running = False
        if self.thread is not None:
            # cap.read() can block for a while, so cap the wait rather
            # than hanging the caller indefinitely.
            self.thread.join(timeout=5)
# Usage
stream = RTSPStream("rtsp://admin:[email protected]:554/stream1")
stream.start()
time.sleep(2)  # wait for first frame

while True:
    ret, frame = stream.read()
    if not ret:
        # No frame yet (still connecting or reconnecting) — back off briefly
        # instead of spinning a CPU core on the empty read.
        time.sleep(0.05)
        continue
    # ... run detection on frame ...
|
The threaded reader solves two problems: it prevents frame buffer buildup (you always get the latest frame, not a queued old one), and it handles disconnections by automatically reconnecting.
Common Errors and Fixes
Low FPS with large models
Switch from yolov8s.pt to yolov8n.pt (nano) for 3-5x speedup with moderate accuracy loss. Or process every Nth frame:
1
2
3
4
5
6
7
frame_count = 0
while cap.isOpened():
    ret, frame = cap.read()
    if not ret:
        # End of stream/failed read — without this check the loop
        # spins forever once the video runs out.
        break
    frame_count += 1
    if frame_count % 3 != 0:  # process every 3rd frame
        continue
    # ... detection code ...
|
Tracking IDs keep resetting
Make sure you’re passing persist=True to model.track(). Without it, the tracker resets every frame and assigns new IDs.
RTSP stream lags behind real-time
OpenCV buffers RTSP frames internally. Use the threaded reader above, or set cap.set(cv2.CAP_PROP_BUFFERSIZE, 1) to minimize buffering.
Memory grows over time
The tracker’s internal state grows as it accumulates object histories. For long-running deployments, periodically reset the tracker with model.reset() (once per hour or when the scene is empty).
cv2.imshow doesn’t work on a headless server
Drop the display code entirely and write results to a file, database, or message queue instead. For debugging, save annotated frames periodically: cv2.imwrite(f"debug_{frame_count}.jpg", annotated).