The Pipeline at a Glance#
A surveillance analytics pipeline reads video frames, runs object detection, tracks objects across frames, and applies rules – zone intrusion, line crossing, heatmap accumulation, event logging. YOLOv8 handles detection and tracking. The supervision library from Roboflow gives you zone polygons, line counters, and heatmap overlays without writing the geometry math yourself.
Install everything first:
```shell
pip install ultralytics supervision opencv-python numpy
```
Here is a minimal loop that reads a video file (or RTSP stream) and runs YOLOv8 tracking on every frame:
```python
import cv2
from ultralytics import YOLO

model = YOLO("yolov8n.pt")  # nano model; swap to yolov8s.pt or yolov8m.pt for accuracy
source = "parking_lot.mp4"  # works with RTSP too: "rtsp://admin:[email protected]:554/stream"

cap = cv2.VideoCapture(source)
while cap.isOpened():
    ret, frame = cap.read()
    if not ret:
        break
    # persist=True keeps track IDs stable across frames
    results = model.track(frame, persist=True, classes=[0, 2, 3, 5, 7], verbose=False)
    annotated = results[0].plot()
    cv2.imshow("Surveillance", annotated)
    if cv2.waitKey(1) & 0xFF == ord("q"):
        break

cap.release()
cv2.destroyAllWindows()
```
The classes filter restricts detection to person (0), car (2), motorcycle (3), bus (5), and truck (7) from the COCO dataset. This cuts down on noise from backpacks, traffic cones, and other irrelevant objects.
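If you prefer to build the classes list by name rather than by magic number, a small lookup works. This is a convenience sketch; the IDs below are the standard COCO indices used by the pretrained YOLOv8 weights:

```python
# Subset of the COCO class map used by the pretrained YOLOv8 weights.
COCO_IDS = {"person": 0, "bicycle": 1, "car": 2, "motorcycle": 3, "bus": 5, "truck": 7}

def class_filter(names):
    """Translate class names into the integer IDs model.track() expects."""
    return [COCO_IDS[name] for name in names]

print(class_filter(["person", "car", "motorcycle", "bus", "truck"]))  # [0, 2, 3, 5, 7]
```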
RTSP Streams and Video Files#
YOLOv8 and OpenCV both accept RTSP URLs directly. For IP cameras, the URL pattern is usually:
```
rtsp://<user>:<password>@<ip>:<port>/stream1
```
A few things to watch out for with RTSP:
- Latency: Set cap.set(cv2.CAP_PROP_BUFFERSIZE, 1) to reduce frame buffering. Without this, you process stale frames while the buffer fills.
- Reconnection: RTSP streams drop. Wrap your read loop with a reconnect:
```python
import cv2
import time

def open_stream(url):
    cap = cv2.VideoCapture(url)
    cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)
    return cap

url = "rtsp://admin:[email protected]:554/stream1"
cap = open_stream(url)
consecutive_failures = 0

while True:
    ret, frame = cap.read()
    if not ret:
        consecutive_failures += 1
        if consecutive_failures > 30:
            print("Stream lost. Reconnecting...")
            cap.release()
            time.sleep(5)
            cap = open_stream(url)
            consecutive_failures = 0
        continue  # skip processing on a failed read
    consecutive_failures = 0
    # ... process frame here
```
For video files, just pass the file path instead. The same pipeline works for both.
Zone-Based Intrusion Detection#
Zone detection answers the question: “Is anyone inside this area right now?” You define a polygon (the restricted zone), and on every frame you check which detections fall inside it.
The supervision library has a PolygonZone class that does the point-in-polygon math for you:
```python
import cv2
import numpy as np
from ultralytics import YOLO
import supervision as sv

model = YOLO("yolov8n.pt")

# Define a restricted zone as a polygon (x, y coordinates)
# These coordinates define a quadrilateral in the frame
zone_polygon = np.array([
    [200, 400],
    [600, 400],
    [600, 700],
    [200, 700],
])

cap = cv2.VideoCapture("warehouse.mp4")

zone = sv.PolygonZone(polygon=zone_polygon)
zone_annotator = sv.PolygonZoneAnnotator(zone=zone, color=sv.Color.RED, thickness=2)
box_annotator = sv.BoxAnnotator(thickness=2)

while cap.isOpened():
    ret, frame = cap.read()
    if not ret:
        break
    results = model.track(frame, persist=True, classes=[0], verbose=False)[0]
    detections = sv.Detections.from_ultralytics(results)
    # Check which detections are inside the zone
    zone_triggered = zone.trigger(detections=detections)
    intruder_count = int(zone_triggered.sum())
    # Annotate frame
    frame = box_annotator.annotate(scene=frame, detections=detections)
    frame = zone_annotator.annotate(scene=frame)
    if intruder_count > 0:
        cv2.putText(frame, f"ALERT: {intruder_count} intruder(s) in zone",
                    (10, 40), cv2.FONT_HERSHEY_SIMPLEX, 1.0, (0, 0, 255), 3)
    cv2.imshow("Zone Detection", frame)
    if cv2.waitKey(1) & 0xFF == ord("q"):
        break

cap.release()
cv2.destroyAllWindows()
```
zone.trigger() returns a boolean array – True for each detection whose anchor point falls inside the polygon. You can set the anchor point with the triggering_anchors parameter (defaults to bottom center of the bounding box, which works well for people standing on the ground).
To define your own zone polygon, open a frame in an image editor, note the corner coordinates, and plug them in. For production, you would store these as JSON config per camera.
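For intuition, the check PolygonZone performs is a standard point-in-polygon test. Here is a minimal ray-casting sketch; supervision has its own implementation, this is only to illustrate the geometry:

```python
# Ray casting: count how many polygon edges a horizontal ray from (x, y)
# crosses; an odd count means the point is inside.
def point_in_polygon(x, y, poly):
    inside = False
    n = len(poly)
    for i in range(n):
        x1, y1 = poly[i]
        x2, y2 = poly[(i + 1) % n]
        if (y1 > y) != (y2 > y):  # edge spans the ray's y level
            x_cross = x1 + (y - y1) * (x2 - x1) / (y2 - y1)
            if x < x_cross:
                inside = not inside
    return inside

zone = [(200, 400), (600, 400), (600, 700), (200, 700)]
print(point_in_polygon(400, 550, zone))  # True: inside the rectangle
print(point_in_polygon(100, 100, zone))  # False: outside
```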
Counting Objects Crossing a Line#
Line counting tracks how many objects cross a virtual line in each direction. This is the bread and butter of entry/exit counting at doors, gates, and hallways.
```python
import cv2
from ultralytics import YOLO
import supervision as sv

model = YOLO("yolov8n.pt")

# Define a horizontal counting line (start_point, end_point)
LINE_START = sv.Point(0, 500)
LINE_END = sv.Point(1280, 500)

cap = cv2.VideoCapture("entrance.mp4")

line_zone = sv.LineZone(start=LINE_START, end=LINE_END)
line_annotator = sv.LineZoneAnnotator(thickness=2)
box_annotator = sv.BoxAnnotator(thickness=2)
label_annotator = sv.LabelAnnotator(text_thickness=1, text_scale=0.5)

while cap.isOpened():
    ret, frame = cap.read()
    if not ret:
        break
    results = model.track(frame, persist=True, classes=[0, 2], verbose=False)[0]
    detections = sv.Detections.from_ultralytics(results)
    # Update line crossing counts
    line_zone.trigger(detections=detections)
    # Build labels with tracker IDs
    labels = []
    for i in range(len(detections)):
        class_id = int(detections.class_id[i]) if detections.class_id is not None else -1
        tracker_id = int(detections.tracker_id[i]) if detections.tracker_id is not None else -1
        class_name = model.names.get(class_id, "unknown")
        labels.append(f"#{tracker_id} {class_name}")
    # Annotate
    frame = box_annotator.annotate(scene=frame, detections=detections)
    frame = label_annotator.annotate(scene=frame, detections=detections, labels=labels)
    frame = line_annotator.annotate(scene=frame, line_counter=line_zone)
    cv2.putText(frame, f"In: {line_zone.in_count} Out: {line_zone.out_count}",
                (10, 40), cv2.FONT_HERSHEY_SIMPLEX, 1.2, (0, 255, 0), 3)
    cv2.imshow("Line Counter", frame)
    if cv2.waitKey(1) & 0xFF == ord("q"):
        break

cap.release()
cv2.destroyAllWindows()
```
The line divides the frame into two sides. Objects moving from one side to the other increment in_count or out_count depending on direction. Place the line perpendicular to the flow of traffic for best results.
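The direction logic boils down to a sign test: the 2D cross product tells which side of the line a tracked point is on, and a crossing is a sign flip between consecutive frames. A minimal sketch of that idea (not supervision's actual code):

```python
# Sign of the 2D cross product of (B - A) and (P - A): positive on one side
# of line AB, negative on the other, zero exactly on the line.
def side_of_line(px, py, ax, ay, bx, by):
    return (bx - ax) * (py - ay) - (by - ay) * (px - ax)

# Horizontal line from (0, 500) to (1280, 500), as in the example above
prev = side_of_line(640, 480, 0, 500, 1280, 500)  # previous frame position
curr = side_of_line(640, 520, 0, 500, 1280, 500)  # current frame position
crossed = (prev > 0) != (curr > 0)
print(crossed)  # True: the point moved across the line
```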
Heatmap Generation from Detection Tracks#
Heatmaps show where activity concentrates over time. You accumulate detection positions into a 2D array and overlay it on the frame. The supervision library has a HeatMapAnnotator that handles the blending:
```python
import cv2
from ultralytics import YOLO
import supervision as sv

model = YOLO("yolov8n.pt")
cap = cv2.VideoCapture("lobby.mp4")

heatmap_annotator = sv.HeatMapAnnotator(
    position=sv.Position.BOTTOM_CENTER,
    opacity=0.5,
    radius=40,
    kernel_size=25,
)

while cap.isOpened():
    ret, frame = cap.read()
    if not ret:
        break
    results = model.track(frame, persist=True, classes=[0], verbose=False)[0]
    detections = sv.Detections.from_ultralytics(results)
    # Accumulate and render heatmap
    frame = heatmap_annotator.annotate(scene=frame, detections=detections)
    cv2.imshow("Heatmap", frame)
    if cv2.waitKey(1) & 0xFF == ord("q"):
        break

cap.release()
cv2.destroyAllWindows()
```
The HeatMapAnnotator maintains an internal accumulator, so the heatmap builds up over time as more detections are processed. radius controls the size of each detection’s contribution, and kernel_size controls the Gaussian blur smoothing. Larger values give smoother, more spread-out heatmaps.
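Conceptually the accumulator is just a 2D grid that gets bumped around each detection's anchor point. A toy sketch, with a square neighborhood standing in for the Gaussian kernel:

```python
# Bump a 2D grid around a detection's anchor point. Repeated hits in the
# same area pile up, which is what produces the "hot" regions.
def accumulate(grid, x, y, radius=1):
    h, w = len(grid), len(grid[0])
    for gy in range(max(0, y - radius), min(h, y + radius + 1)):
        for gx in range(max(0, x - radius), min(w, x + radius + 1)):
            grid[gy][gx] += 1
    return grid

grid = [[0] * 8 for _ in range(6)]
for x, y in [(3, 2), (3, 2), (4, 2)]:  # repeated hits in the same area
    accumulate(grid, x, y)
print(grid[2][3])  # 3: hottest cell where the hits overlap
```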
To save a final heatmap image at the end of processing, grab the last annotated frame and write it out:
```python
cv2.imwrite("heatmap_result.jpg", frame)
```
Logging Detection Events with Timestamps#
For audit trails and post-incident review, write every detection event to a structured log file. This is more useful than video recording alone because you can query it – “show me all person detections in zone A between 2am and 4am.”
```python
import cv2
import csv
import datetime
from ultralytics import YOLO

model = YOLO("yolov8n.pt")
cap = cv2.VideoCapture("entrance.mp4")
fps = cap.get(cv2.CAP_PROP_FPS) or 30.0

log_file = open("detections.csv", "w", newline="")
writer = csv.writer(log_file)
writer.writerow(["timestamp", "frame_num", "track_id", "class", "confidence", "x1", "y1", "x2", "y2"])

frame_num = 0
start_time = datetime.datetime.now()

while cap.isOpened():
    ret, frame = cap.read()
    if not ret:
        break
    results = model.track(frame, persist=True, classes=[0, 2], verbose=False)[0]
    if results.boxes is not None and len(results.boxes) > 0:
        for box in results.boxes:
            x1, y1, x2, y2 = box.xyxy[0].tolist()
            conf = float(box.conf[0])
            cls = int(box.cls[0])
            class_name = model.names[cls]
            track_id = int(box.id[0]) if box.id is not None else -1
            # Calculate timestamp from frame number
            elapsed_seconds = frame_num / fps
            timestamp = start_time + datetime.timedelta(seconds=elapsed_seconds)
            writer.writerow([
                timestamp.isoformat(),
                frame_num,
                track_id,
                class_name,
                round(conf, 3),
                round(x1, 1),
                round(y1, 1),
                round(x2, 1),
                round(y2, 1),
            ])
    frame_num += 1

cap.release()
log_file.close()
print(f"Logged {frame_num} frames to detections.csv")
```
The CSV output looks like this:
```
timestamp,frame_num,track_id,class,confidence,x1,y1,x2,y2
2026-02-15T14:30:00.000000,0,1,person,0.912,150.3,200.1,280.7,550.4
2026-02-15T14:30:00.033333,1,1,person,0.908,152.1,201.3,282.4,551.2
2026-02-15T14:30:00.033333,1,2,car,0.875,400.0,300.0,700.5,500.3
```
You can load this into pandas for analysis, feed it to a dashboard, or trigger alerts when specific patterns appear (like a person detected in a restricted zone after hours).
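As a sketch of that kind of query using only the standard library (the pandas version is a one-liner, and the sample rows below are made up for illustration):

```python
import csv
import io
from datetime import datetime

# Hypothetical log excerpt in the CSV format produced above
SAMPLE = """timestamp,frame_num,track_id,class,confidence,x1,y1,x2,y2
2026-02-15T02:15:00,10,1,person,0.91,150,200,280,550
2026-02-15T14:30:00,900,2,car,0.88,400,300,700,500
2026-02-15T03:40:00,55,3,person,0.86,160,210,290,560
"""

def after_hours_people(csv_text, start_hour=2, end_hour=4):
    """Return person rows whose timestamp falls in [start_hour, end_hour)."""
    rows = csv.DictReader(io.StringIO(csv_text))
    return [r for r in rows
            if r["class"] == "person"
            and start_hour <= datetime.fromisoformat(r["timestamp"]).hour < end_hour]

hits = after_hours_people(SAMPLE)
print([r["track_id"] for r in hits])  # ['1', '3']
```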
Putting It All Together#
Here is the full pipeline that combines zone intrusion, line counting, heatmap, and logging into a single processing loop:
```python
import cv2
import csv
import datetime
import numpy as np
from ultralytics import YOLO
import supervision as sv

# Setup
model = YOLO("yolov8s.pt")  # small model, good balance of speed and accuracy
source = "facility.mp4"  # or RTSP URL
cap = cv2.VideoCapture(source)
fps = cap.get(cv2.CAP_PROP_FPS) or 30.0

# Zone intrusion polygon
zone_polygon = np.array([[100, 300], [500, 300], [500, 600], [100, 600]])
zone = sv.PolygonZone(polygon=zone_polygon)
zone_annotator = sv.PolygonZoneAnnotator(zone=zone, color=sv.Color.RED)

# Line counter
line_zone = sv.LineZone(start=sv.Point(0, 400), end=sv.Point(1280, 400))
line_annotator = sv.LineZoneAnnotator(thickness=2)

# Heatmap
heatmap_annotator = sv.HeatMapAnnotator(opacity=0.3, radius=30, kernel_size=25)

# Annotators
box_annotator = sv.BoxAnnotator(thickness=2)

# CSV logger
log_file = open("surveillance_log.csv", "w", newline="")
writer = csv.writer(log_file)
writer.writerow(["timestamp", "frame", "track_id", "class", "conf", "in_zone", "x1", "y1", "x2", "y2"])

frame_num = 0
start_time = datetime.datetime.now()

while cap.isOpened():
    ret, frame = cap.read()
    if not ret:
        break
    results = model.track(frame, persist=True, classes=[0, 2, 3, 5, 7], verbose=False)[0]
    detections = sv.Detections.from_ultralytics(results)
    # Zone check
    in_zone = zone.trigger(detections=detections)
    intruder_count = int(in_zone.sum())
    # Line crossing
    line_zone.trigger(detections=detections)
    # Log each detection
    elapsed = frame_num / fps
    ts = (start_time + datetime.timedelta(seconds=elapsed)).isoformat()
    for i in range(len(detections)):
        cid = int(detections.class_id[i]) if detections.class_id is not None else -1
        tid = int(detections.tracker_id[i]) if detections.tracker_id is not None else -1
        conf = float(detections.confidence[i]) if detections.confidence is not None else 0.0
        x1, y1, x2, y2 = detections.xyxy[i].tolist()
        writer.writerow([ts, frame_num, tid, model.names.get(cid, "unknown"),
                         round(conf, 3), bool(in_zone[i]), round(x1, 1),
                         round(y1, 1), round(x2, 1), round(y2, 1)])
    # Annotate frame
    frame = heatmap_annotator.annotate(scene=frame, detections=detections)
    frame = box_annotator.annotate(scene=frame, detections=detections)
    frame = zone_annotator.annotate(scene=frame)
    frame = line_annotator.annotate(scene=frame, line_counter=line_zone)
    # Status overlay
    status = f"In: {line_zone.in_count} Out: {line_zone.out_count} Zone: {intruder_count}"
    cv2.putText(frame, status, (10, 35), cv2.FONT_HERSHEY_SIMPLEX, 1.0, (0, 255, 0), 2)
    if intruder_count > 0:
        cv2.putText(frame, "ZONE ALERT", (10, 75),
                    cv2.FONT_HERSHEY_SIMPLEX, 1.0, (0, 0, 255), 3)
    cv2.imshow("Surveillance Pipeline", frame)
    if cv2.waitKey(1) & 0xFF == ord("q"):
        break
    frame_num += 1

cap.release()
log_file.close()
cv2.destroyAllWindows()
print(f"Processed {frame_num} frames. In: {line_zone.in_count}, Out: {line_zone.out_count}")
```
- Model size matters: yolov8n.pt runs at ~45 FPS on a decent GPU, yolov8s.pt at ~30 FPS, yolov8m.pt at ~18 FPS. Pick the smallest model that gives you acceptable accuracy for your scene.
- Skip frames: If you are processing recorded video and do not need every frame, process every 2nd or 3rd frame. Detection results do not change much between adjacent frames.
- GPU inference: YOLOv8 auto-detects CUDA. Verify with model.device after loading – it should show device(type='cuda', index=0).
- Resolution: Downscale high-resolution streams before inference. A 4K frame is overkill for detecting people. Resize to 1280x720 before passing to the model.
- Batch processing: For offline analysis, use model.track(source="video.mp4", stream=True) to process without holding all results in memory.
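The frame-skipping tip can be sketched as a small wrapper: run detection every Nth frame and carry the last result forward. The names here are illustrative, not a real API:

```python
# Run detect() only every `stride` frames; reuse the previous result for
# the frames in between. detect() stands in for model.track().
def process_video(frames, detect, stride=3):
    results = []
    last = None
    for i, frame in enumerate(frames):
        if i % stride == 0:
            last = detect(frame)
        results.append(last)
    return results

frames = ["f0", "f1", "f2", "f3", "f4", "f5"]
out = process_video(frames, detect=lambda f: f"det({f})", stride=3)
print(out)  # ['det(f0)', 'det(f0)', 'det(f0)', 'det(f3)', 'det(f3)', 'det(f3)']
```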
Common Errors and Fixes#
TypeError: 'NoneType' object is not subscriptable when accessing box.id
This happens when tracking is not enabled or when a detection has no assigned track ID yet. Always check for None:
```python
track_id = int(box.id[0]) if box.id is not None else -1
```
RTSP stream hangs or returns black frames
OpenCV’s RTSP handling can stall. Set the backend explicitly and reduce buffering:
```python
cap = cv2.VideoCapture("rtsp://...", cv2.CAP_FFMPEG)
cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)
```
If the stream still hangs, the camera may require specific transport settings. Try adding ?tcp to the RTSP URL or setting the environment variable OPENCV_FFMPEG_CAPTURE_OPTIONS=rtsp_transport;tcp before opening the stream.
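For example, forcing TCP transport via the environment variable in a shell session (it must be set before the process that opens the capture starts):

```shell
# Force TCP transport for OpenCV's FFmpeg backend, then launch the pipeline.
export OPENCV_FFMPEG_CAPTURE_OPTIONS="rtsp_transport;tcp"
echo "$OPENCV_FFMPEG_CAPTURE_OPTIONS"
```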
supervision version mismatch: PolygonZone.__init__() got an unexpected keyword argument
The supervision API changed between versions. The code above targets supervision>=0.18.0. Check your installed version with python -c "import supervision; print(supervision.__version__)". If you are on an older version, upgrade with pip install --upgrade supervision.
YOLOv8 model.track() not returning tracker IDs
Make sure you pass persist=True. Without it, the tracker resets on every call and never assigns stable IDs. Also, tracker IDs only appear after an object has been seen in enough consecutive frames (default warmup is 1 frame, but it varies by tracker config).
Out of GPU memory on high-resolution video
Resize frames before inference instead of feeding full resolution:
```python
frame_resized = cv2.resize(frame, (1280, 720))
results = model.track(frame_resized, persist=True, verbose=False)
```
Or switch to a smaller model (yolov8n.pt instead of yolov8m.pt).