Most teams don’t need MLflow. They need a place to store model files, track which version is in production, and promote new versions without breaking anything. S3 and DynamoDB give you exactly that — artifact storage and metadata tracking — with zero servers to manage.
Here’s the DynamoDB table setup and a Python wrapper you can drop into any project today.
```bash
aws dynamodb create-table \
  --table-name model-registry \
  --attribute-definitions \
    AttributeName=model_name,AttributeType=S \
    AttributeName=version,AttributeType=N \
  --key-schema \
    AttributeName=model_name,KeyType=HASH \
    AttributeName=version,KeyType=RANGE \
  --billing-mode PAY_PER_REQUEST

aws s3 mb s3://my-model-registry-bucket
```
The table uses model_name as the partition key and version as the sort key. This lets you query all versions of a model efficiently and always grab the latest one.
## DynamoDB Schema and Table Design
Each item in the table stores everything you need to find, evaluate, and load a model version:
| Field | Type | Purpose |
|---|---|---|
| `model_name` | String (PK) | Logical model name, e.g. `fraud-detector` |
| `version` | Number (SK) | Auto-incrementing version number |
| `stage` | String | One of `staging`, `production`, `archived` |
| `s3_path` | String | Full S3 URI to the artifact |
| `metrics` | Map | Accuracy, F1, loss — whatever you track |
| `created_at` | String | ISO 8601 timestamp |
| `framework` | String | `pytorch`, `onnx`, `tensorflow`, etc. |
| `description` | String | Free-text notes about this version |
You also want a Global Secondary Index (GSI) on stage so you can query “give me the production model” without scanning the entire table:
```bash
aws dynamodb update-table \
  --table-name model-registry \
  --attribute-definitions \
    AttributeName=stage,AttributeType=S \
  --global-secondary-index-updates '[
    {
      "Create": {
        "IndexName": "stage-index",
        "KeySchema": [
          {"AttributeName": "model_name", "KeyType": "HASH"},
          {"AttributeName": "stage", "KeyType": "RANGE"}
        ],
        "Projection": {"ProjectionType": "ALL"}
      }
    }
  ]'
```
Note: the GSI uses model_name as the partition key and stage as the sort key, so you can query for a specific model at a specific stage in one call.
## Python Model Registry Class

This is the core of the system: a single class that wraps boto3 for registering, promoting, listing, and loading models.
```python
import os
import boto3
from boto3.dynamodb.conditions import Key, Attr
from datetime import datetime, timezone
from decimal import Decimal


class ModelRegistry:
    def __init__(self, table_name="model-registry", bucket="my-model-registry-bucket", region="us-east-1"):
        self.dynamodb = boto3.resource("dynamodb", region_name=region)
        self.s3 = boto3.client("s3", region_name=region)
        self.table = self.dynamodb.Table(table_name)
        self.bucket = bucket

    def _next_version(self, model_name: str) -> int:
        response = self.table.query(
            KeyConditionExpression=Key("model_name").eq(model_name),
            ScanIndexForward=False,  # newest version first
            Limit=1,
        )
        if response["Items"]:
            return int(response["Items"][0]["version"]) + 1
        return 1

    def register(
        self,
        model_name: str,
        model_path: str,
        framework: str = "pytorch",
        metrics: dict = None,
        description: str = "",
    ) -> dict:
        version = self._next_version(model_name)
        s3_key = f"models/{model_name}/v{version}/{os.path.basename(model_path)}"
        s3_uri = f"s3://{self.bucket}/{s3_key}"

        # Upload artifact to S3
        self.s3.upload_file(model_path, self.bucket, s3_key)

        # Convert float metrics to Decimal -- DynamoDB rejects Python floats
        safe_metrics = {}
        if metrics:
            safe_metrics = {k: Decimal(str(v)) for k, v in metrics.items()}

        item = {
            "model_name": model_name,
            "version": version,
            "stage": "staging",
            "s3_path": s3_uri,
            "metrics": safe_metrics,
            "framework": framework,
            "description": description,
            "created_at": datetime.now(timezone.utc).isoformat(),
        }
        self.table.put_item(Item=item)
        print(f"Registered {model_name} v{version} -> {s3_uri}")
        return item

    def promote(self, model_name: str, version: int, target_stage: str) -> None:
        valid_stages = {"staging", "production", "archived"}
        if target_stage not in valid_stages:
            raise ValueError(f"Stage must be one of {valid_stages}")

        # If promoting to production, archive the current production version first
        if target_stage == "production":
            current_prod = self.get_production(model_name)
            if current_prod:
                self.table.update_item(
                    Key={"model_name": model_name, "version": current_prod["version"]},
                    UpdateExpression="SET stage = :s",
                    ExpressionAttributeValues={":s": "archived"},
                )
                print(f"Archived {model_name} v{current_prod['version']}")

        self.table.update_item(
            Key={"model_name": model_name, "version": version},
            UpdateExpression="SET stage = :s",
            ExpressionAttributeValues={":s": target_stage},
        )
        print(f"Promoted {model_name} v{version} to {target_stage}")

    def get_production(self, model_name: str) -> dict | None:
        response = self.table.query(
            KeyConditionExpression=Key("model_name").eq(model_name),
            FilterExpression=Attr("stage").eq("production"),
        )
        items = response["Items"]
        return items[0] if items else None

    def list_versions(self, model_name: str, stage: str = None) -> list[dict]:
        kwargs = {
            "KeyConditionExpression": Key("model_name").eq(model_name),
            "ScanIndexForward": False,  # newest first
        }
        if stage:
            kwargs["FilterExpression"] = Attr("stage").eq(stage)
        return self.table.query(**kwargs)["Items"]

    def download_model(self, model_name: str, version: int = None, dest_path: str = "./model") -> str:
        if version:
            response = self.table.get_item(Key={"model_name": model_name, "version": version})
            item = response.get("Item")
        else:
            item = self.get_production(model_name)
        if not item:
            raise ValueError(f"No model found for {model_name} v{version or 'production'}")

        s3_uri = item["s3_path"]
        s3_key = s3_uri.replace(f"s3://{self.bucket}/", "")
        filename = s3_key.split("/")[-1]
        os.makedirs(dest_path, exist_ok=True)
        local_path = os.path.join(dest_path, filename)
        self.s3.download_file(self.bucket, s3_key, local_path)
        print(f"Downloaded {model_name} v{item['version']} to {local_path}")
        return local_path
```
## Usage Example
Register a freshly trained PyTorch model, then promote it:
```python
registry = ModelRegistry(
    table_name="model-registry",
    bucket="my-model-registry-bucket",
    region="us-east-1",
)

# Register a new version
entry = registry.register(
    model_name="fraud-detector",
    model_path="./checkpoints/fraud_model_v3.pt",
    framework="pytorch",
    metrics={"accuracy": 0.967, "f1": 0.941, "auc_roc": 0.983},
    description="Trained on Q4 2025 transaction data, XGBoost features added",
)

# List all versions
versions = registry.list_versions("fraud-detector")
for v in versions:
    print(f"  v{v['version']} | {v['stage']} | {v['created_at']}")

# Promote to production (auto-archives the previous production version)
registry.promote("fraud-detector", version=3, target_stage="production")

# Fetch the production model for inference
model_file = registry.download_model("fraud-detector", dest_path="./serving")
```
## Uploading Different Artifact Types
The registry is framework-agnostic. S3 doesn’t care what you upload — .pt, .onnx, .h5, .pkl, a tarball with config files. For larger models, you might want to upload a directory as a zip:
```python
import os
import tempfile
import zipfile


def bundle_model_dir(model_dir: str) -> str:
    """Zip a model directory into a single artifact for upload."""
    archive_path = os.path.join(tempfile.gettempdir(), "model_bundle.zip")
    with zipfile.ZipFile(archive_path, "w", zipfile.ZIP_DEFLATED) as zf:
        for root, dirs, files in os.walk(model_dir):
            for f in files:
                full_path = os.path.join(root, f)
                arc_name = os.path.relpath(full_path, model_dir)
                zf.write(full_path, arc_name)
    return archive_path


# Bundle a HuggingFace model directory and register it
bundle_path = bundle_model_dir("./fine-tuned-bert/")
registry.register(
    model_name="sentiment-classifier",
    model_path=bundle_path,
    framework="transformers",
    metrics={"accuracy": 0.923, "f1_macro": 0.911},
    description="Fine-tuned BERT-base on product reviews dataset",
)
```
For ONNX models, just point at the file directly:
```python
registry.register(
    model_name="fraud-detector",
    model_path="./exports/fraud_model.onnx",
    framework="onnx",
    metrics={"accuracy": 0.965, "latency_ms": 12.4},
    description="ONNX export of v3, optimized for CPU inference",
)
```
## CLI Script for Managing Models
Wrap the registry class in a CLI so your team can manage models from the terminal:
```python
#!/usr/bin/env python3
"""cli.py — Command-line interface for the model registry."""
import argparse
import json

from model_registry import ModelRegistry


def main():
    parser = argparse.ArgumentParser(description="Model Registry CLI")
    sub = parser.add_subparsers(dest="command")

    # List versions
    ls = sub.add_parser("list", help="List model versions")
    ls.add_argument("model_name", help="Name of the model")
    ls.add_argument("--stage", choices=["staging", "production", "archived"])

    # Register a model
    reg = sub.add_parser("register", help="Register a new model version")
    reg.add_argument("model_name")
    reg.add_argument("model_path", help="Local path to model artifact")
    reg.add_argument("--framework", default="pytorch")
    reg.add_argument("--metrics", type=json.loads, default={})
    reg.add_argument("--description", default="")

    # Promote a version
    prom = sub.add_parser("promote", help="Promote a model version to a stage")
    prom.add_argument("model_name")
    prom.add_argument("version", type=int)
    prom.add_argument("stage", choices=["staging", "production", "archived"])

    # Download
    dl = sub.add_parser("download", help="Download a model version")
    dl.add_argument("model_name")
    dl.add_argument("--version", type=int, default=None)
    dl.add_argument("--dest", default="./model")

    args = parser.parse_args()
    registry = ModelRegistry()

    if args.command == "list":
        versions = registry.list_versions(args.model_name, stage=args.stage)
        if not versions:
            print(f"No versions found for {args.model_name}")
            return
        print(f"{'Version':<10} {'Stage':<14} {'Framework':<14} {'Created':<28} Description")
        print("-" * 90)
        for v in versions:
            print(
                f"v{v['version']:<9} {v['stage']:<14} {v['framework']:<14} "
                f"{v['created_at']:<28} {v.get('description', '')[:40]}"
            )
    elif args.command == "register":
        registry.register(
            model_name=args.model_name,
            model_path=args.model_path,
            framework=args.framework,
            metrics=args.metrics,
            description=args.description,
        )
    elif args.command == "promote":
        registry.promote(args.model_name, args.version, args.stage)
    elif args.command == "download":
        registry.download_model(args.model_name, version=args.version, dest_path=args.dest)
    else:
        parser.print_help()


if __name__ == "__main__":
    main()
```
Run it like this:
```bash
# Register a new model
python cli.py register fraud-detector ./checkpoints/model.pt \
  --framework pytorch \
  --metrics '{"accuracy": 0.967, "f1": 0.941}' \
  --description "Retrained on Jan 2026 data"

# List all versions
python cli.py list fraud-detector

# List only production versions
python cli.py list fraud-detector --stage production

# Promote version 4 to production
python cli.py promote fraud-detector 4 production

# Download the current production model
python cli.py download fraud-detector --dest ./serving
```
## Common Errors and Fixes
**`botocore.exceptions.ClientError: ResourceNotFoundException`**

The DynamoDB table doesn't exist yet. Run the create-table command from the top of this post, and double-check the region: the `ModelRegistry` class here defaults to `us-east-1`, but your table might live elsewhere.
**Decimal conversion errors when storing metrics**
DynamoDB doesn’t accept Python float types. The register method above converts metrics to Decimal(str(v)) automatically. If you’re writing items manually, always wrap floats:
```python
from decimal import Decimal

# Wrong — DynamoDB rejects Python floats
item = {"metrics": {"accuracy": 0.95}}

# Correct
item = {"metrics": {"accuracy": Decimal("0.95")}}
```
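Reading items back has the mirror-image problem: DynamoDB returns numbers as `Decimal`, which `json.dumps` refuses to serialize. A small helper (a sketch, not part of the registry class) converts them recursively:

```python
import json
from decimal import Decimal

def decimals_to_floats(obj):
    """Recursively convert Decimal values so an item can be JSON-serialized."""
    if isinstance(obj, Decimal):
        return float(obj)
    if isinstance(obj, dict):
        return {k: decimals_to_floats(v) for k, v in obj.items()}
    if isinstance(obj, list):
        return [decimals_to_floats(v) for v in obj]
    return obj

item = {"metrics": {"accuracy": Decimal("0.95")}, "version": Decimal("3")}
print(json.dumps(decimals_to_floats(item)))  # {"metrics": {"accuracy": 0.95}, "version": 3.0}
```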
**`AccessDenied` on S3 upload**
Your IAM role or user needs s3:PutObject on the bucket and dynamodb:PutItem / dynamodb:Query on the table. Here’s a minimal IAM policy:
```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": ["s3:PutObject", "s3:GetObject"],
      "Resource": "arn:aws:s3:::my-model-registry-bucket/*"
    },
    {
      "Effect": "Allow",
      "Action": ["dynamodb:PutItem", "dynamodb:GetItem", "dynamodb:Query", "dynamodb:UpdateItem"],
      "Resource": "arn:aws:dynamodb:us-east-1:*:table/model-registry"
    }
  ]
}
```
**Version number race conditions**

The `_next_version` method queries the latest version and increments it. If two writers register simultaneously, they can get the same version number. A `ConditionExpression` on `put_item` catches the collision:
```python
self.table.put_item(
    Item=item,
    ConditionExpression="attribute_not_exists(model_name) AND attribute_not_exists(version)",
)
```
This makes the write fail if the version already exists, and you can retry with a new version number.
**Large model uploads timing out**
For models bigger than a few hundred MB, use boto3’s multipart upload config:
```python
from boto3.s3.transfer import TransferConfig

config = TransferConfig(
    multipart_threshold=100 * 1024 * 1024,  # switch to multipart above 100 MB
    multipart_chunksize=100 * 1024 * 1024,
    max_concurrency=4,
)
self.s3.upload_file(model_path, self.bucket, s3_key, Config=config)
```