The Pipeline at a Glance

A surveillance analytics pipeline reads video frames, runs object detection, tracks objects across frames, and applies rules – zone intrusion, line crossing, heatmap accumulation, event logging. YOLOv8 handles detection and tracking. The supervision library from Roboflow gives you zone polygons, line counters, and heatmap overlays without writing the geometry math yourself.

Install everything first:

pip install ultralytics supervision opencv-python numpy

Here is a minimal loop that reads a video file (or RTSP stream) and runs YOLOv8 tracking on every frame:

import cv2
from ultralytics import YOLO

model = YOLO("yolov8n.pt")  # nano model, swap to yolov8s.pt or yolov8m.pt for accuracy
source = "parking_lot.mp4"  # works with RTSP too: "rtsp://admin:[email protected]:554/stream"

cap = cv2.VideoCapture(source)

while cap.isOpened():
    ret, frame = cap.read()
    if not ret:
        break

    # persist=True keeps track IDs stable across frames
    results = model.track(frame, persist=True, classes=[0, 2, 3, 5, 7], verbose=False)
    annotated = results[0].plot()

    cv2.imshow("Surveillance", annotated)
    if cv2.waitKey(1) & 0xFF == ord("q"):
        break

cap.release()
cv2.destroyAllWindows()

The classes filter restricts detection to person (0), car (2), motorcycle (3), bus (5), and truck (7) from the COCO dataset. This cuts down on noise from backpacks, traffic cones, and other irrelevant objects.
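If you need a different class set, model.names exposes the full ID-to-name mapping as a dict, so you never have to hard-code IDs from memory. A small helper (hypothetical, shown here with a trimmed, self-contained sample of the COCO mapping rather than a live model) can turn names into the IDs that classes= expects:

```python
def class_ids_for(names: dict, wanted: list) -> list:
    """Map class names to the integer IDs expected by model.track(classes=...)."""
    name_to_id = {name: cid for cid, name in names.items()}
    return [name_to_id[w] for w in wanted]

# In practice you would pass model.names here; this trimmed COCO
# subset is hard-coded so the example is self-contained.
coco_subset = {0: "person", 2: "car", 3: "motorcycle", 5: "bus", 7: "truck"}
print(class_ids_for(coco_subset, ["person", "car", "truck"]))  # [0, 2, 7]
```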

RTSP Streams and Video Files

YOLOv8 and OpenCV both accept RTSP URLs directly. For IP cameras, the URL pattern is usually:

rtsp://<user>:<password>@<ip>:<port>/stream1

A few things to watch out for with RTSP:

  • Latency: Set cap.set(cv2.CAP_PROP_BUFFERSIZE, 1) to reduce frame buffering (note that this property is honored by some capture backends and silently ignored by others). Without it, you process stale frames while the buffer fills.
  • Reconnection: RTSP streams drop. Wrap your read loop with a reconnect:
import cv2
import time

def open_stream(url):
    cap = cv2.VideoCapture(url)
    cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)
    return cap

url = "rtsp://admin:[email protected]:554/stream1"
cap = open_stream(url)

consecutive_failures = 0

while True:
    ret, frame = cap.read()
    if not ret:
        consecutive_failures += 1
        if consecutive_failures > 30:
            print("Stream lost. Reconnecting...")
            cap.release()
            time.sleep(5)
            cap = open_stream(url)
            consecutive_failures = 0
        continue

    consecutive_failures = 0
    # ... process frame here

For video files, just pass the file path instead. The same pipeline works for both.

Zone-Based Intrusion Detection

Zone detection answers the question: “Is anyone inside this area right now?” You define a polygon (the restricted zone), and on every frame you check which detections fall inside it.

The supervision library has a PolygonZone class that does the point-in-polygon math for you:

import cv2
import numpy as np
from ultralytics import YOLO
import supervision as sv

model = YOLO("yolov8n.pt")

# Define a restricted zone as a polygon (x, y coordinates)
# These coordinates define a quadrilateral in the frame
zone_polygon = np.array([
    [200, 400],
    [600, 400],
    [600, 700],
    [200, 700],
])

cap = cv2.VideoCapture("warehouse.mp4")

zone = sv.PolygonZone(polygon=zone_polygon)
zone_annotator = sv.PolygonZoneAnnotator(zone=zone, color=sv.Color.RED, thickness=2)
box_annotator = sv.BoxAnnotator(thickness=2)

while cap.isOpened():
    ret, frame = cap.read()
    if not ret:
        break

    results = model.track(frame, persist=True, classes=[0], verbose=False)[0]
    detections = sv.Detections.from_ultralytics(results)

    # Check which detections are inside the zone
    zone_triggered = zone.trigger(detections=detections)
    intruder_count = int(zone_triggered.sum())

    # Annotate frame
    frame = box_annotator.annotate(scene=frame, detections=detections)
    frame = zone_annotator.annotate(scene=frame)

    if intruder_count > 0:
        cv2.putText(frame, f"ALERT: {intruder_count} intruder(s) in zone",
                    (10, 40), cv2.FONT_HERSHEY_SIMPLEX, 1.0, (0, 0, 255), 3)

    cv2.imshow("Zone Detection", frame)
    if cv2.waitKey(1) & 0xFF == ord("q"):
        break

cap.release()
cv2.destroyAllWindows()

zone.trigger() returns a boolean array – True for each detection whose anchor point falls inside the polygon. You can set the anchor point with the triggering_anchors parameter (defaults to bottom center of the bounding box, which works well for people standing on the ground).

To define your own zone polygon, open a frame in an image editor, note the corner coordinates, and plug them in. For production, you would store these as JSON config per camera.
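As a sketch of that per-camera config approach (the file layout and key names here are made up for illustration), a loader might look like:

```python
import json

import numpy as np


def load_zones(config_path: str) -> dict:
    """Load per-camera zone polygons from a JSON config file.

    Expected (hypothetical) layout:
    {"cam_entrance": [[200, 400], [600, 400], [600, 700], [200, 700]]}
    """
    with open(config_path) as f:
        raw = json.load(f)
    # One int32 (N, 2) array of polygon vertices per camera name
    return {camera: np.array(points, dtype=np.int32) for camera, points in raw.items()}
```

Each resulting array can be passed straight to sv.PolygonZone(polygon=...).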

Counting Objects Crossing a Line

Line counting tracks how many objects cross a virtual line in each direction. This is the bread and butter of entry/exit counting at doors, gates, and hallways.

import cv2
import numpy as np
from ultralytics import YOLO
import supervision as sv

model = YOLO("yolov8n.pt")

# Define a horizontal counting line (start_point, end_point)
LINE_START = sv.Point(0, 500)
LINE_END = sv.Point(1280, 500)

cap = cv2.VideoCapture("entrance.mp4")

line_zone = sv.LineZone(start=LINE_START, end=LINE_END)
line_annotator = sv.LineZoneAnnotator(thickness=2)
box_annotator = sv.BoxAnnotator(thickness=2)
label_annotator = sv.LabelAnnotator(text_thickness=1, text_scale=0.5)

while cap.isOpened():
    ret, frame = cap.read()
    if not ret:
        break

    results = model.track(frame, persist=True, classes=[0, 2], verbose=False)[0]
    detections = sv.Detections.from_ultralytics(results)

    # Update line crossing counts
    line_zone.trigger(detections=detections)

    # Build labels with tracker IDs
    labels = []
    for i in range(len(detections)):
        class_id = int(detections.class_id[i]) if detections.class_id is not None else -1
        tracker_id = int(detections.tracker_id[i]) if detections.tracker_id is not None else -1
        class_name = model.names.get(class_id, "unknown")
        labels.append(f"#{tracker_id} {class_name}")

    # Annotate
    frame = box_annotator.annotate(scene=frame, detections=detections)
    frame = label_annotator.annotate(scene=frame, detections=detections, labels=labels)
    frame = line_annotator.annotate(scene=frame, line_counter=line_zone)

    cv2.putText(frame, f"In: {line_zone.in_count}  Out: {line_zone.out_count}",
                (10, 40), cv2.FONT_HERSHEY_SIMPLEX, 1.2, (0, 255, 0), 3)

    cv2.imshow("Line Counter", frame)
    if cv2.waitKey(1) & 0xFF == ord("q"):
        break

cap.release()
cv2.destroyAllWindows()

The line divides the frame into two sides. Objects moving from one side to the other increment in_count or out_count depending on direction. Place the line perpendicular to the flow of traffic for best results.

Heatmap Generation from Detection Tracks

Heatmaps show where activity concentrates over time. You accumulate detection positions into a 2D array and overlay it on the frame. The supervision library has a HeatMapAnnotator that handles the blending:

import cv2
from ultralytics import YOLO
import supervision as sv

model = YOLO("yolov8n.pt")
cap = cv2.VideoCapture("lobby.mp4")

heatmap_annotator = sv.HeatMapAnnotator(
    position=sv.Position.BOTTOM_CENTER,
    opacity=0.5,
    radius=40,
    kernel_size=25,
)

while cap.isOpened():
    ret, frame = cap.read()
    if not ret:
        break

    results = model.track(frame, persist=True, classes=[0], verbose=False)[0]
    detections = sv.Detections.from_ultralytics(results)

    # Accumulate and render heatmap
    frame = heatmap_annotator.annotate(scene=frame, detections=detections)

    cv2.imshow("Heatmap", frame)
    if cv2.waitKey(1) & 0xFF == ord("q"):
        break

cap.release()
cv2.destroyAllWindows()

The HeatMapAnnotator maintains an internal accumulator, so the heatmap builds up over time as more detections are processed. radius controls the size of each detection’s contribution, and kernel_size controls the Gaussian blur smoothing. Larger values give smoother, more spread-out heatmaps.
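If you want the raw accumulator rather than the annotator's rendering – say, to export activity statistics per region – the same idea is easy to sketch with plain NumPy. This is a simplified stand-in, not the annotator's actual implementation: it bins (x, y) anchor points into a coarse count grid instead of stamping blurred circles onto a full-resolution canvas.

```python
import numpy as np


class HeatmapAccumulator:
    """Accumulate detection anchor positions into a coarse 2D count grid."""

    def __init__(self, width: int, height: int, cell: int = 20):
        self.cell = cell
        # One bin per `cell`-pixel square of the frame
        self.grid = np.zeros((height // cell + 1, width // cell + 1), dtype=np.float64)

    def add(self, points):
        """points: iterable of (x, y) pixel coordinates, e.g. bottom-center anchors."""
        for x, y in points:
            self.grid[int(y) // self.cell, int(x) // self.cell] += 1

    def normalized(self) -> np.ndarray:
        """Return the grid scaled to [0, 1] for visualization or export."""
        peak = self.grid.max()
        return self.grid / peak if peak > 0 else self.grid
```

On each frame you would feed add() the anchor coordinates of the current detections (supervision can provide these per anchor position), then save or plot normalized() once at the end.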

For saving a final heatmap image at the end of processing, grab the last annotated frame and write it:

cv2.imwrite("heatmap_result.jpg", frame)

Logging Detection Events with Timestamps

For audit trails and post-incident review, write every detection event to a structured log file. This is more useful than video recording alone because you can query it – “show me all person detections in zone A between 2am and 4am.”

52
import cv2
import csv
import datetime
from ultralytics import YOLO

model = YOLO("yolov8n.pt")
cap = cv2.VideoCapture("entrance.mp4")
fps = cap.get(cv2.CAP_PROP_FPS) or 30.0

log_file = open("detections.csv", "w", newline="")
writer = csv.writer(log_file)
writer.writerow(["timestamp", "frame_num", "track_id", "class", "confidence", "x1", "y1", "x2", "y2"])

frame_num = 0
start_time = datetime.datetime.now()

while cap.isOpened():
    ret, frame = cap.read()
    if not ret:
        break

    results = model.track(frame, persist=True, classes=[0, 2], verbose=False)[0]

    if results.boxes is not None and len(results.boxes) > 0:
        for box in results.boxes:
            x1, y1, x2, y2 = box.xyxy[0].tolist()
            conf = float(box.conf[0])
            cls = int(box.cls[0])
            class_name = model.names[cls]
            track_id = int(box.id[0]) if box.id is not None else -1

            # Calculate timestamp from frame number
            elapsed_seconds = frame_num / fps
            timestamp = start_time + datetime.timedelta(seconds=elapsed_seconds)

            writer.writerow([
                timestamp.isoformat(),
                frame_num,
                track_id,
                class_name,
                round(conf, 3),
                round(x1, 1),
                round(y1, 1),
                round(x2, 1),
                round(y2, 1),
            ])

    frame_num += 1

cap.release()
log_file.close()
print(f"Processed {frame_num} frames; events written to detections.csv")

The CSV output looks like this:

timestamp,frame_num,track_id,class,confidence,x1,y1,x2,y2
2026-02-15T14:30:00.000000,0,1,person,0.912,150.3,200.1,280.7,550.4
2026-02-15T14:30:00.033333,1,1,person,0.908,152.1,201.3,282.4,551.2
2026-02-15T14:30:00.033333,1,2,car,0.875,400.0,300.0,700.5,500.3

You can load this into pandas for analysis, feed it to a dashboard, or trigger alerts when specific patterns appear (like a person detected in a restricted zone after hours).
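Even without pandas, the standard library can answer queries like the "2am to 4am" one above. A minimal sketch (column names match the CSV header shown earlier):

```python
import csv
import datetime


def query_detections(path, class_name, start, end):
    """Yield log rows for one class whose timestamp falls in [start, end)."""
    with open(path, newline="") as f:
        for row in csv.DictReader(f):
            ts = datetime.datetime.fromisoformat(row["timestamp"])
            if row["class"] == class_name and start <= ts < end:
                yield row
```

For example, list(query_detections("detections.csv", "person", datetime.datetime(2026, 2, 15, 2), datetime.datetime(2026, 2, 15, 4))) returns every person detection logged between 2am and 4am on that date.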

Putting It All Together

Here is the full pipeline that combines zone intrusion, line counting, heatmap, and logging into a single processing loop:

import cv2
import csv
import datetime
import numpy as np
from ultralytics import YOLO
import supervision as sv

# Setup
model = YOLO("yolov8s.pt")  # small model, good balance of speed and accuracy
source = "facility.mp4"      # or RTSP URL
cap = cv2.VideoCapture(source)
fps = cap.get(cv2.CAP_PROP_FPS) or 30.0

# Zone intrusion polygon
zone_polygon = np.array([[100, 300], [500, 300], [500, 600], [100, 600]])
zone = sv.PolygonZone(polygon=zone_polygon)
zone_annotator = sv.PolygonZoneAnnotator(zone=zone, color=sv.Color.RED)

# Line counter
line_zone = sv.LineZone(start=sv.Point(0, 400), end=sv.Point(1280, 400))
line_annotator = sv.LineZoneAnnotator(thickness=2)

# Heatmap
heatmap_annotator = sv.HeatMapAnnotator(opacity=0.3, radius=30, kernel_size=25)

# Annotators
box_annotator = sv.BoxAnnotator(thickness=2)

# CSV logger
log_file = open("surveillance_log.csv", "w", newline="")
writer = csv.writer(log_file)
writer.writerow(["timestamp", "frame", "track_id", "class", "conf", "in_zone", "x1", "y1", "x2", "y2"])

frame_num = 0
start_time = datetime.datetime.now()

while cap.isOpened():
    ret, frame = cap.read()
    if not ret:
        break

    results = model.track(frame, persist=True, classes=[0, 2, 3, 5, 7], verbose=False)[0]
    detections = sv.Detections.from_ultralytics(results)

    # Zone check
    in_zone = zone.trigger(detections=detections)
    intruder_count = int(in_zone.sum())

    # Line crossing
    line_zone.trigger(detections=detections)

    # Log each detection
    elapsed = frame_num / fps
    ts = (start_time + datetime.timedelta(seconds=elapsed)).isoformat()
    for i in range(len(detections)):
        cid = int(detections.class_id[i]) if detections.class_id is not None else -1
        tid = int(detections.tracker_id[i]) if detections.tracker_id is not None else -1
        conf = float(detections.confidence[i]) if detections.confidence is not None else 0.0
        x1, y1, x2, y2 = detections.xyxy[i].tolist()
        writer.writerow([ts, frame_num, tid, model.names.get(cid, "unknown"),
                         round(conf, 3), bool(in_zone[i]), round(x1, 1),
                         round(y1, 1), round(x2, 1), round(y2, 1)])

    # Annotate frame
    frame = heatmap_annotator.annotate(scene=frame, detections=detections)
    frame = box_annotator.annotate(scene=frame, detections=detections)
    frame = zone_annotator.annotate(scene=frame)
    frame = line_annotator.annotate(scene=frame, line_counter=line_zone)

    # Status overlay
    status = f"In: {line_zone.in_count} Out: {line_zone.out_count} Zone: {intruder_count}"
    cv2.putText(frame, status, (10, 35), cv2.FONT_HERSHEY_SIMPLEX, 1.0, (0, 255, 0), 2)

    if intruder_count > 0:
        cv2.putText(frame, "ZONE ALERT", (10, 75),
                    cv2.FONT_HERSHEY_SIMPLEX, 1.0, (0, 0, 255), 3)

    cv2.imshow("Surveillance Pipeline", frame)
    if cv2.waitKey(1) & 0xFF == ord("q"):
        break

    frame_num += 1

cap.release()
log_file.close()
cv2.destroyAllWindows()
print(f"Processed {frame_num} frames. In: {line_zone.in_count}, Out: {line_zone.out_count}")

Performance Tips

  • Model size matters: as a rough guide on a midrange GPU, yolov8n.pt runs around 45 FPS, yolov8s.pt around 30 FPS, and yolov8m.pt around 18 FPS – exact numbers depend on hardware and input resolution. Pick the smallest model that gives you acceptable accuracy for your scene.
  • Skip frames: If you are processing recorded video and do not need every frame, process every 2nd or 3rd frame. Detection results do not change much between adjacent frames.
  • GPU inference: YOLOv8 auto-detects CUDA. Verify with model.device after loading – it should show device(type='cuda', index=0).
  • Resolution: Downscale high-resolution streams before inference. A 4K frame is overkill for detecting people. Resize to 1280x720 before passing to the model.
  • Streaming mode: For offline analysis, use model.track(source="video.mp4", stream=True) to get a generator that yields results frame by frame instead of accumulating them all in memory.
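The frame-skipping tip above is a small change to the read loop. This standalone sketch shows the pattern with the capture and the model stubbed out as plain callables (read_frame stands in for cap.read(), detect for model.track()), including the usual trick of reusing the last detections for skipped frames:

```python
def process_video(read_frame, detect, stride: int = 3):
    """Run `detect` on every `stride`-th frame, reusing the last result
    for the frames in between. `read_frame` returns None at end of stream."""
    results = []
    last = None
    idx = 0
    while (frame := read_frame()) is not None:
        if idx % stride == 0:
            last = detect(frame)      # inference only on every stride-th frame
        results.append(last)          # skipped frames reuse the previous detections
        idx += 1
    return results
```

With stride=3 this cuts inference cost to a third while still producing one result per frame, at the price of detections lagging by up to two frames.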

Common Errors and Fixes

TypeError: 'NoneType' object is not subscriptable when accessing box.id

This happens when tracking is not enabled or when a detection has no assigned track ID yet. Always check for None:

track_id = int(box.id[0]) if box.id is not None else -1

RTSP stream hangs or returns black frames

OpenCV’s RTSP handling can stall. Set the backend explicitly and reduce buffering:

cap = cv2.VideoCapture("rtsp://...", cv2.CAP_FFMPEG)
cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)

If the stream still hangs, the camera may require specific transport settings. Try adding ?tcp to the RTSP URL or setting the environment variable OPENCV_FFMPEG_CAPTURE_OPTIONS=rtsp_transport;tcp before opening the stream.
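When setting that environment variable from Python rather than the shell, it must be in place before the capture is opened; a minimal sketch:

```python
import os

# Must be set before cv2.VideoCapture opens the stream, or FFmpeg
# will not pick it up for that capture.
os.environ["OPENCV_FFMPEG_CAPTURE_OPTIONS"] = "rtsp_transport;tcp"

# import cv2
# cap = cv2.VideoCapture("rtsp://...", cv2.CAP_FFMPEG)
```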

supervision version mismatch: PolygonZone.__init__() got an unexpected keyword argument

The supervision API changed between versions. The code above targets supervision>=0.18.0. Check your version:

pip show supervision

If you are on an older version, upgrade with pip install --upgrade supervision.

YOLOv8 model.track() not returning tracker IDs

Make sure you pass persist=True. Without it, the tracker state resets on every call and IDs are never stable. Also, a tracker ID is only assigned once an object has been confirmed across enough consecutive frames, so brand-new objects may briefly report no ID; the exact warmup depends on the tracker configuration.

Out of GPU memory on high-resolution video

Resize frames before inference instead of feeding full resolution:

frame_resized = cv2.resize(frame, (1280, 720))
results = model.track(frame_resized, persist=True, verbose=False)

Or switch to a smaller model (yolov8n.pt instead of yolov8m.pt).