Most teams don’t need MLflow. They need a place to store model files, track which version is in production, and promote new versions without breaking anything. S3 and DynamoDB give you exactly that — artifact storage and metadata tracking — with zero servers to manage.

Here’s the DynamoDB table setup and a Python wrapper you can drop into any project today.

aws dynamodb create-table \
  --table-name model-registry \
  --attribute-definitions \
    AttributeName=model_name,AttributeType=S \
    AttributeName=version,AttributeType=N \
  --key-schema \
    AttributeName=model_name,KeyType=HASH \
    AttributeName=version,KeyType=RANGE \
  --billing-mode PAY_PER_REQUEST

aws s3 mb s3://my-model-registry-bucket

The table uses model_name as the partition key and version as the sort key. This lets you query all versions of a model efficiently and always grab the latest one.
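The "latest version" lookup this enables can be pictured without touching AWS at all: ScanIndexForward=False simply walks the sort key in descending order. A pure-Python simulation (the item dicts are hypothetical):

```python
# Hypothetical items sharing one partition key (model_name),
# in the ascending sort-key (version) order DynamoDB stores them
items = [
    {"model_name": "fraud-detector", "version": 1, "stage": "archived"},
    {"model_name": "fraud-detector", "version": 2, "stage": "production"},
    {"model_name": "fraud-detector", "version": 3, "stage": "staging"},
]

# query(ScanIndexForward=False, Limit=1) is equivalent to taking the
# highest sort-key value, i.e. the most recently registered version
latest = max(items, key=lambda i: i["version"])
print(latest["version"])  # 3
```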

DynamoDB Schema and Table Design

Each item in the table stores everything you need to find, evaluate, and load a model version:

Field         Type          Purpose
-----         ----          -------
model_name    String (PK)   Logical model name, e.g. fraud-detector
version       Number (SK)   Auto-incrementing version number
stage         String        One of staging, production, archived
s3_path       String        Full S3 URI to the artifact
metrics       Map           Accuracy, F1, loss — whatever you track
created_at    String        ISO 8601 timestamp
framework     String        pytorch, onnx, tensorflow, etc.
description   String        Free-text notes about this version
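Concretely, one registered version lands in the table as an item like this (the values are illustrative):

```python
from decimal import Decimal

# A hypothetical registry item for fraud-detector v3
example_item = {
    "model_name": "fraud-detector",  # partition key
    "version": 3,                    # sort key
    "stage": "production",
    "s3_path": "s3://my-model-registry-bucket/models/fraud-detector/v3/model.pt",
    "metrics": {"accuracy": Decimal("0.967"), "f1": Decimal("0.941")},
    "created_at": "2026-01-15T10:30:00+00:00",
    "framework": "pytorch",
    "description": "Trained on Q4 2025 transaction data",
}
```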

You also want a Global Secondary Index (GSI) on stage so you can query “give me the production model” without scanning the entire table:

aws dynamodb update-table \
  --table-name model-registry \
  --attribute-definitions \
    AttributeName=stage,AttributeType=S \
  --global-secondary-index-updates '[
    {
      "Create": {
        "IndexName": "stage-index",
        "KeySchema": [
          {"AttributeName": "model_name", "KeyType": "HASH"},
          {"AttributeName": "stage", "KeyType": "RANGE"}
        ],
        "Projection": {"ProjectionType": "ALL"}
      }
    }
  ]'

Note: the GSI uses model_name as the partition key and stage as the sort key, so you can query for a specific model at a specific stage in one call.

Python Model Registry Class

This is the core of the system. A single class that wraps boto3 for registering, promoting, listing, and loading models.

import boto3
import boto3.dynamodb.conditions  # Key/Attr helpers used in the queries below
from datetime import datetime, timezone
from decimal import Decimal


class ModelRegistry:
    def __init__(self, table_name="model-registry", bucket="my-model-registry-bucket", region="us-east-1"):
        self.dynamodb = boto3.resource("dynamodb", region_name=region)
        self.s3 = boto3.client("s3", region_name=region)
        self.table = self.dynamodb.Table(table_name)
        self.bucket = bucket

    def _next_version(self, model_name: str) -> int:
        response = self.table.query(
            KeyConditionExpression=boto3.dynamodb.conditions.Key("model_name").eq(model_name),
            ScanIndexForward=False,
            Limit=1,
        )
        if response["Items"]:
            return int(response["Items"][0]["version"]) + 1
        return 1

    def register(
        self,
        model_name: str,
        model_path: str,
        framework: str = "pytorch",
        metrics: dict | None = None,
        description: str = "",
    ) -> dict:
        version = self._next_version(model_name)
        s3_key = f"models/{model_name}/v{version}/{model_path.split('/')[-1]}"
        s3_uri = f"s3://{self.bucket}/{s3_key}"

        # Upload artifact to S3
        self.s3.upload_file(model_path, self.bucket, s3_key)

        # Convert float metrics to Decimal for DynamoDB
        safe_metrics = {}
        if metrics:
            safe_metrics = {k: Decimal(str(v)) for k, v in metrics.items()}

        item = {
            "model_name": model_name,
            "version": version,
            "stage": "staging",
            "s3_path": s3_uri,
            "metrics": safe_metrics,
            "framework": framework,
            "description": description,
            "created_at": datetime.now(timezone.utc).isoformat(),
        }

        self.table.put_item(Item=item)
        print(f"Registered {model_name} v{version} -> {s3_uri}")
        return item

    def promote(self, model_name: str, version: int, target_stage: str) -> None:
        valid_stages = {"staging", "production", "archived"}
        if target_stage not in valid_stages:
            raise ValueError(f"Stage must be one of {valid_stages}")

        # If promoting to production, archive the current production version first
        if target_stage == "production":
            current_prod = self.get_production(model_name)
            if current_prod:
                self.table.update_item(
                    Key={"model_name": model_name, "version": current_prod["version"]},
                    UpdateExpression="SET stage = :s",
                    ExpressionAttributeValues={":s": "archived"},
                )
                print(f"Archived {model_name} v{current_prod['version']}")

        self.table.update_item(
            Key={"model_name": model_name, "version": version},
            UpdateExpression="SET stage = :s",
            ExpressionAttributeValues={":s": target_stage},
        )
        print(f"Promoted {model_name} v{version} to {target_stage}")

    def get_production(self, model_name: str) -> dict | None:
        # Query the stage-index GSI so only production items are read,
        # instead of filtering through every version of the model
        response = self.table.query(
            IndexName="stage-index",
            KeyConditionExpression=(
                boto3.dynamodb.conditions.Key("model_name").eq(model_name)
                & boto3.dynamodb.conditions.Key("stage").eq("production")
            ),
        )
        items = response["Items"]
        return items[0] if items else None

    def list_versions(self, model_name: str, stage: str | None = None) -> list[dict]:
        key_expr = boto3.dynamodb.conditions.Key("model_name").eq(model_name)
        kwargs = {"KeyConditionExpression": key_expr, "ScanIndexForward": False}

        if stage:
            kwargs["FilterExpression"] = boto3.dynamodb.conditions.Attr("stage").eq(stage)

        response = self.table.query(**kwargs)
        return response["Items"]

    def download_model(self, model_name: str, version: int | None = None, dest_path: str = "./model") -> str:
        if version:
            response = self.table.get_item(Key={"model_name": model_name, "version": version})
            item = response.get("Item")
        else:
            item = self.get_production(model_name)

        if not item:
            raise ValueError(f"No model found for {model_name} v{version or 'production'}")

        s3_uri = item["s3_path"]
        s3_key = s3_uri.replace(f"s3://{self.bucket}/", "")
        filename = s3_key.split("/")[-1]
        local_path = f"{dest_path}/{filename}"

        self.s3.download_file(self.bucket, s3_key, local_path)
        print(f"Downloaded {model_name} v{item['version']} to {local_path}")
        return local_path

Usage Example

Register a freshly trained PyTorch model, then promote it:

registry = ModelRegistry(
    table_name="model-registry",
    bucket="my-model-registry-bucket",
    region="us-east-1",
)

# Register a new version
entry = registry.register(
    model_name="fraud-detector",
    model_path="./checkpoints/fraud_model_v3.pt",
    framework="pytorch",
    metrics={"accuracy": 0.967, "f1": 0.941, "auc_roc": 0.983},
    description="Trained on Q4 2025 transaction data, XGBoost features added",
)

# List all versions
versions = registry.list_versions("fraud-detector")
for v in versions:
    print(f"  v{v['version']} | {v['stage']} | {v['created_at']}")

# Promote to production (auto-archives the previous production version)
registry.promote("fraud-detector", version=3, target_stage="production")

# Fetch production model for inference
model_file = registry.download_model("fraud-detector", dest_path="./serving")

Uploading Different Artifact Types

The registry is framework-agnostic. S3 doesn’t care what you upload — .pt, .onnx, .h5, .pkl, a tarball with config files. For larger models, you might want to upload a directory as a zip:

import zipfile
import os
import tempfile


def bundle_model_dir(model_dir: str) -> str:
    """Zip a model directory into a single artifact for upload."""
    archive_path = os.path.join(tempfile.gettempdir(), "model_bundle.zip")
    with zipfile.ZipFile(archive_path, "w", zipfile.ZIP_DEFLATED) as zf:
        for root, dirs, files in os.walk(model_dir):
            for f in files:
                full_path = os.path.join(root, f)
                arc_name = os.path.relpath(full_path, model_dir)
                zf.write(full_path, arc_name)
    return archive_path


# Bundle a HuggingFace model directory and register it
bundle_path = bundle_model_dir("./fine-tuned-bert/")
registry.register(
    model_name="sentiment-classifier",
    model_path=bundle_path,
    framework="transformers",
    metrics={"accuracy": 0.923, "f1_macro": 0.911},
    description="Fine-tuned BERT-base on product reviews dataset",
)

For ONNX models, just point at the file directly:

registry.register(
    model_name="fraud-detector",
    model_path="./exports/fraud_model.onnx",
    framework="onnx",
    metrics={"accuracy": 0.965, "latency_ms": 12.4},
    description="ONNX export of v3, optimized for CPU inference",
)

CLI Script for Managing Models

Wrap the registry class in a CLI so your team can manage models from the terminal:

#!/usr/bin/env python3
"""cli.py — Command-line interface for the model registry."""

import argparse
import json
from model_registry import ModelRegistry


def main():
    parser = argparse.ArgumentParser(description="Model Registry CLI")
    sub = parser.add_subparsers(dest="command")

    # List versions
    ls = sub.add_parser("list", help="List model versions")
    ls.add_argument("model_name", help="Name of the model")
    ls.add_argument("--stage", choices=["staging", "production", "archived"])

    # Register a model
    reg = sub.add_parser("register", help="Register a new model version")
    reg.add_argument("model_name")
    reg.add_argument("model_path", help="Local path to model artifact")
    reg.add_argument("--framework", default="pytorch")
    reg.add_argument("--metrics", type=json.loads, default={})
    reg.add_argument("--description", default="")

    # Promote a version
    prom = sub.add_parser("promote", help="Promote a model version to a stage")
    prom.add_argument("model_name")
    prom.add_argument("version", type=int)
    prom.add_argument("stage", choices=["staging", "production", "archived"])

    # Download
    dl = sub.add_parser("download", help="Download a model version")
    dl.add_argument("model_name")
    dl.add_argument("--version", type=int, default=None)
    dl.add_argument("--dest", default="./model")

    args = parser.parse_args()
    registry = ModelRegistry()

    if args.command == "list":
        versions = registry.list_versions(args.model_name, stage=args.stage)
        if not versions:
            print(f"No versions found for {args.model_name}")
            return
        print(f"{'Version':<10} {'Stage':<14} {'Framework':<14} {'Created':<28} Description")
        print("-" * 90)
        for v in versions:
            print(
                f"v{v['version']:<9} {v['stage']:<14} {v['framework']:<14} "
                f"{v['created_at']:<28} {v.get('description', '')[:40]}"
            )

    elif args.command == "register":
        registry.register(
            model_name=args.model_name,
            model_path=args.model_path,
            framework=args.framework,
            metrics=args.metrics,
            description=args.description,
        )

    elif args.command == "promote":
        registry.promote(args.model_name, args.version, args.stage)

    elif args.command == "download":
        registry.download_model(args.model_name, version=args.version, dest_path=args.dest)

    else:
        parser.print_help()


if __name__ == "__main__":
    main()

Run it like this:

# Register a new model
python cli.py register fraud-detector ./checkpoints/model.pt \
  --framework pytorch \
  --metrics '{"accuracy": 0.967, "f1": 0.941}' \
  --description "Retrained on Jan 2026 data"

# List all versions
python cli.py list fraud-detector

# List only production versions
python cli.py list fraud-detector --stage production

# Promote version 4 to production
python cli.py promote fraud-detector 4 production

# Download the current production model
python cli.py download fraud-detector --dest ./serving

Common Errors and Fixes

botocore.exceptions.ClientError: ResourceNotFoundException

The DynamoDB table doesn’t exist yet. Run the create-table command from the top of this post. Double-check the region too: the ModelRegistry class defaults to us-east-1, while boto3 itself picks up a region from your AWS config or environment, so the client and the table can easily end up pointing at different regions.

Decimal conversion errors when storing metrics

DynamoDB doesn’t accept Python float types. The register method above converts metrics to Decimal(str(v)) automatically. If you’re writing items manually, always wrap floats:

from decimal import Decimal

# Wrong — DynamoDB rejects this
item = {"metrics": {"accuracy": 0.95}}

# Correct
item = {"metrics": {"accuracy": Decimal("0.95")}}
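The same conversion as a reusable helper (this mirrors what the register method does internally; the function name is mine):

```python
from decimal import Decimal


def to_dynamo_numbers(metrics: dict) -> dict:
    """Convert float metric values to Decimal, going through str so the
    stored value matches the printed digits rather than the binary float."""
    return {k: Decimal(str(v)) for k, v in metrics.items()}
```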

AccessDenied on S3 upload

Your IAM role or user needs s3:PutObject and s3:GetObject on the bucket, plus dynamodb:PutItem, dynamodb:GetItem, dynamodb:Query, and dynamodb:UpdateItem on the table. Here’s a minimal IAM policy:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": ["s3:PutObject", "s3:GetObject"],
      "Resource": "arn:aws:s3:::my-model-registry-bucket/*"
    },
    {
      "Effect": "Allow",
      "Action": ["dynamodb:PutItem", "dynamodb:GetItem", "dynamodb:Query", "dynamodb:UpdateItem"],
      "Resource": [
        "arn:aws:dynamodb:us-east-1:*:table/model-registry",
        "arn:aws:dynamodb:us-east-1:*:table/model-registry/index/*"
      ]
    }
  ]
}

Note the second Resource entry: querying the stage-index GSI requires dynamodb:Query on the index ARN, not just the table ARN.

Version number race conditions

The _next_version method queries the latest version and increments it. If two writers register simultaneously, they can get the same version number. DynamoDB’s put_item with a ConditionExpression closes the gap:

self.table.put_item(
    Item=item,
    ConditionExpression="attribute_not_exists(model_name) AND attribute_not_exists(version)",
)

This makes the write fail if the version already exists, and you can retry with a new version number.
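The retry itself is a small loop. In this sketch, VersionConflict stands in for the botocore ClientError (code ConditionalCheckFailedException) you would actually catch, and both function parameters are illustrative hooks for _next_version and the conditional put_item:

```python
class VersionConflict(Exception):
    """Stand-in for ClientError with code ConditionalCheckFailedException."""


def register_with_retry(next_version_fn, put_fn, max_attempts=5) -> int:
    """Re-read the latest version and retry the conditional write on collision."""
    for _ in range(max_attempts):
        version = next_version_fn()
        try:
            put_fn(version)  # conditional put_item; raises VersionConflict if taken
            return version
        except VersionConflict:
            continue  # another writer claimed this version; fetch a fresh number
    raise RuntimeError(f"registration failed after {max_attempts} attempts")
```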

Large model uploads timing out

For models bigger than a few hundred MB, use boto3’s multipart transfer configuration:

from boto3.s3.transfer import TransferConfig

config = TransferConfig(
    multipart_threshold=100 * 1024 * 1024,  # switch to multipart above 100 MB
    multipart_chunksize=100 * 1024 * 1024,
    max_concurrency=4,
)
# Inside ModelRegistry.register, pass the config to the upload call:
self.s3.upload_file(model_path, self.bucket, s3_key, Config=config)