Subsections of Inference

First Pytorch ISVC

Mnist Inference

More Information about mnist service can be found 🔗link

  1. create a namespace
kubectl create namespace kserve-test
  1. deploy a sample iris service
kubectl apply -n kserve-test -f - <<EOF
apiVersion: "serving.kserve.io/v1beta1"
kind: "InferenceService"
metadata:
  name: "first-torchserve"
  namespace: kserve-test
spec:
  predictor:
    model:
      modelFormat:
        name: pytorch
      storageUri: gs://kfserving-examples/models/torchserve/image_classifier/v1
      resources:
        limits:
          memory: 4Gi
EOF
  1. Check InferenceService status
kubectl -n kserve-test get inferenceservices first-torchserve 
kubectl -n kserve-test get pod
#NAME                                           READY   STATUS    RESTARTS   AGE
#first-torchserve-predictor-00001-deplo...      2/2     Running   0          25s

kubectl -n kserve-test get inferenceservices first-torchserve
#NAME           URL   READY     PREV   LATEST   PREVROLLEDOUTREVISION   LATESTREADYREVISION   AGE
#kserve-test   first-torchserve      http://first-torchserve.kserve-test.example.com   True           100                              first-torchserve-predictor-00001   2m59s

After all pods are ready, you can access the service by using the following command

Access By

If the EXTERNAL-IP value is set, your environment has an external load balancer that you can use for the ingress gateway.

export INGRESS_HOST=$(kubectl -n istio-system get service istio-ingressgateway -o jsonpath='{.status.loadBalancer.ingress[0].ip}')
export INGRESS_PORT=$(kubectl -n istio-system get service istio-ingressgateway -o jsonpath='{.spec.ports[?(@.name=="http2")].nodePort}')

If the EXTERNAL-IP value is none (or perpetually pending), your environment does not provide an external load balancer for the ingress gateway. In this case, you can access the gateway using the service’s node port.

export INGRESS_HOST=$(minikube ip)
export INGRESS_PORT=$(kubectl -n istio-system get service istio-ingressgateway -o jsonpath='{.spec.ports[?(@.name=="http2")].nodePort}')
export INGRESS_HOST=$(minikube ip)
kubectl port-forward --namespace istio-system svc/istio-ingressgateway 30080:80
export INGRESS_PORT=30080
  1. Perform a prediction First, prepare your inference input request inside a file:
wget -O ./mnist-input.json https://raw.githubusercontent.com/kserve/kserve/refs/heads/master/docs/samples/v1beta1/torchserve/v1/imgconv/input.json
ssh -i ~/.minikube/machines/minikube/id_rsa docker@$(minikube ip) -L "*:${INGRESS_PORT}:0.0.0.0:${INGRESS_PORT}" -N -f
  1. Invoke the service
SERVICE_HOSTNAME=$(kubectl -n kserve-test get inferenceservice first-torchserve  -o jsonpath='{.status.url}' | cut -d "/" -f 3)
# http://first-torchserve.kserve-test.example.com 
curl -v -H "Host: ${SERVICE_HOSTNAME}" -H "Content-Type: application/json" "http://${INGRESS_HOST}:${INGRESS_PORT}/v1/models/mnist:predict" -d @./mnist-input.json
*   Trying 192.168.58.2...
* TCP_NODELAY set
* Connected to 192.168.58.2 (192.168.58.2) port 32132 (#0)
> POST /v1/models/mnist:predict HTTP/1.1
> Host: my-torchserve.kserve-test.example.com
> User-Agent: curl/7.61.1
> Accept: */*
> Content-Type: application/json
> Content-Length: 401
> 
* upload completely sent off: 401 out of 401 bytes
< HTTP/1.1 200 OK
< content-length: 19
< content-type: application/json
< date: Mon, 09 Jun 2025 09:27:27 GMT
< server: istio-envoy
< x-envoy-upstream-service-time: 1128
< 
* Connection #0 to host 192.168.58.2 left intact
{"predictions":[2]}

First Custom Model

AlexNet Inference

More Information about AlexNet service can be found 🔗link

  1. Implement Custom Model using KServe API
 1import argparse
 2import base64
 3import io
 4import time
 5
 6from fastapi.middleware.cors import CORSMiddleware
 7from torchvision import models, transforms
 8from typing import Dict
 9import torch
10from PIL import Image
11
12import kserve
13from kserve import Model, ModelServer, logging
14from kserve.model_server import app
15from kserve.utils.utils import generate_uuid
16
17
18class AlexNetModel(Model):
19    def __init__(self, name: str):
20        super().__init__(name, return_response_headers=True)
21        self.name = name
22        self.load()
23        self.ready = False
24
25    def load(self):
26        self.model = models.alexnet(pretrained=True)
27        self.model.eval()
28        # The ready flag is used by model ready endpoint for readiness probes,
29        # set to True when model is loaded successfully without exceptions.
30        self.ready = True
31
32    async def predict(
33        self,
34        payload: Dict,
35        headers: Dict[str, str] = None,
36        response_headers: Dict[str, str] = None,
37    ) -> Dict:
38        start = time.time()
39        # Input follows the Tensorflow V1 HTTP API for binary values
40        # https://www.tensorflow.org/tfx/serving/api_rest#encoding_binary_values
41        img_data = payload["instances"][0]["image"]["b64"]
42        raw_img_data = base64.b64decode(img_data)
43        input_image = Image.open(io.BytesIO(raw_img_data))
44        preprocess = transforms.Compose([
45            transforms.Resize(256),
46            transforms.CenterCrop(224),
47            transforms.ToTensor(),
48            transforms.Normalize(mean=[0.485, 0.456, 0.406],
49                                 std=[0.229, 0.224, 0.225]),
50        ])
51        input_tensor = preprocess(input_image).unsqueeze(0)
52        output = self.model(input_tensor)
53        torch.nn.functional.softmax(output, dim=1)
54        values, top_5 = torch.topk(output, 5)
55        result = values.flatten().tolist()
56        end = time.time()
57        response_id = generate_uuid()
58
59        # Custom response headers can be added to the inference response
60        if response_headers is not None:
61            response_headers.update(
62                {"prediction-time-latency": f"{round((end - start) * 1000, 9)}"}
63            )
64
65        return {"predictions": result}
66
67
68parser = argparse.ArgumentParser(parents=[kserve.model_server.parser])
69args, _ = parser.parse_known_args()
70
71if __name__ == "__main__":
72    # Configure kserve and uvicorn logger
73    if args.configure_logging:
74        logging.configure_logging(args.log_config_file)
75    model = AlexNetModel(args.model_name)
76    model.load()
77    # Custom middlewares can be added to the model
78    app.add_middleware(
79        CORSMiddleware,
80        allow_origins=["*"],
81        allow_credentials=True,
82        allow_methods=["*"],
83        allow_headers=["*"],
84    )
85    ModelServer().start([model])
  1. create requirements.txt
kserve
torchvision==0.18.0
pillow>=10.3.0,<11.0.0
  1. create Dockerfile
FROM m.daocloud.io/docker.io/library/python:3.11-slim

WORKDIR /app

COPY requirements.txt .
RUN pip install --no-cache-dir  -r requirements.txt 

COPY model.py .

CMD ["python", "model.py", "--model_name=custom-model"]
  1. build and push custom docker image
docker build -t ay-custom-model .
docker tag ddfd0186813e docker-registry.lab.zverse.space/ay/ay-custom-model:latest
docker push docker-registry.lab.zverse.space/ay/ay-custom-model:latest
  1. create a namespace
kubectl create namespace kserve-test
  1. deploy a sample custom-model service
kubectl apply -n kserve-test -f - <<EOF
apiVersion: serving.kserve.io/v1beta1
kind: InferenceService
metadata:
  name: ay-custom-model
spec:
  predictor:
    containers:
      - name: kserve-container
        image: docker-registry.lab.zverse.space/ay/ay-custom-model:latest
EOF
  1. Check InferenceService status
kubectl -n kserve-test get inferenceservices ay-custom-model
kubectl -n kserve-test get pod
#NAME                                           READY   STATUS    RESTARTS   AGE
#ay-custom-model-predictor-00003-dcf4rk         2/2     Running   0        167m

kubectl -n kserve-test get inferenceservices ay-custom-model
#NAME           URL   READY     PREV   LATEST   PREVROLLEDOUTREVISION   LATESTREADYREVISION   AGE
#ay-custom-model   http://ay-custom-model.kserve-test.example.com   True           100                              ay-custom-model-predictor-00003   177m

After all pods are ready, you can access the service by using the following command

Access By

If the EXTERNAL-IP value is set, your environment has an external load balancer that you can use for the ingress gateway.

export INGRESS_HOST=$(kubectl -n istio-system get service istio-ingressgateway -o jsonpath='{.status.loadBalancer.ingress[0].ip}')
export INGRESS_PORT=$(kubectl -n istio-system get service istio-ingressgateway -o jsonpath='{.spec.ports[?(@.name=="http2")].nodePort}')

If the EXTERNAL-IP value is none (or perpetually pending), your environment does not provide an external load balancer for the ingress gateway. In this case, you can access the gateway using the service’s node port.

export INGRESS_HOST=$(minikube ip)
export INGRESS_PORT=$(kubectl -n istio-system get service istio-ingressgateway -o jsonpath='{.spec.ports[?(@.name=="http2")].nodePort}')
export INGRESS_HOST=$(minikube ip)
kubectl port-forward --namespace istio-system svc/istio-ingressgateway 30080:80
export INGRESS_PORT=30080
  1. Perform a prediction

First, prepare your inference input request inside a file:

wget -O ./alex-net-input.json https://kserve.github.io/website/0.15/modelserving/v1beta1/custom/custom_model/input.json
ssh -i ~/.minikube/machines/minikube/id_rsa docker@$(minikube ip) -L "*:${INGRESS_PORT}:0.0.0.0:${INGRESS_PORT}" -N -f
  1. Invoke the service
export SERVICE_HOSTNAME=$(kubectl -n kserve-test get inferenceservice ay-custom-model  -o jsonpath='{.status.url}' | cut -d "/" -f 3)
# http://ay-custom-model.kserve-test.example.com
curl -v -H "Host: ${SERVICE_HOSTNAME}" -H "Content-Type: application/json" -X POST "http://${INGRESS_HOST}:${INGRESS_PORT}/v1/models/custom-model:predict" -d @.//alex-net-input.json
*   Trying 192.168.58.2:30704...
* Connected to 192.168.58.2 (192.168.58.2) port 30704
> POST /v1/models/custom-model:predict HTTP/1.1
> Host: ay-custom-model.kserve-test.example.com
> User-Agent: curl/8.5.0
> Accept: */*
> Content-Type: application/json
> Content-Length: 105339
> 
* We are completely uploaded and fine
< HTTP/1.1 200 OK
< content-length: 110
< content-type: application/json
< date: Wed, 11 Jun 2025 03:38:30 GMT
< prediction-time-latency: 89.966773987
< server: istio-envoy
< x-envoy-upstream-service-time: 93
< 
* Connection #0 to host 192.168.58.2 left intact
{"predictions":[14.975619316101074,14.0368070602417,13.966034889221191,12.252280235290527,12.086270332336426]}

First Model In Minio

Inference Model In Minio

More Information about Deploy InferenceService with a saved model on S3 can be found 🔗link

Create Service Account

=== “yaml”

apiVersion: v1
kind: ServiceAccount
metadata:
  name: sa
  annotations:
    eks.amazonaws.com/role-arn: arn:aws:iam::123456789012:role/s3access # replace with your IAM role ARN
    serving.kserve.io/s3-endpoint: s3.amazonaws.com # replace with your s3 endpoint e.g minio-service.kubeflow:9000
    serving.kserve.io/s3-usehttps: "1" # by default 1, if testing with minio you can set to 0
    serving.kserve.io/s3-region: "us-east-2"
    serving.kserve.io/s3-useanoncredential: "false" # omitting this is the same as false, if true will ignore provided credential and use anonymous credentials

=== “kubectl”

kubectl apply -f create-s3-sa.yaml

Create S3 Secret and attach to Service Account

Create a secret with your S3 user credential, KServe reads the secret annotations to inject the S3 environment variables on storage initializer or model agent to download the models from S3 storage.

Create S3 secret

=== “yaml”

apiVersion: v1
kind: Secret
metadata:
  name: s3creds
  annotations:
     serving.kserve.io/s3-endpoint: s3.amazonaws.com # replace with your s3 endpoint e.g minio-service.kubeflow:9000
     serving.kserve.io/s3-usehttps: "1" # by default 1, if testing with minio you can set to 0
     serving.kserve.io/s3-region: "us-east-2"
     serving.kserve.io/s3-useanoncredential: "false" # omitting this is the same as false, if true will ignore provided credential and use anonymous credentials
type: Opaque
stringData: # use `stringData` for raw credential string or `data` for base64 encoded string
  AWS_ACCESS_KEY_ID: XXXX
  AWS_SECRET_ACCESS_KEY: XXXXXXXX

Attach secret to a service account

=== “yaml”

apiVersion: v1
kind: ServiceAccount
metadata:
  name: sa
secrets:
- name: s3creds

=== “kubectl”

kubectl apply -f create-s3-secret.yaml

!!! note If you are running kserve with istio sidecars enabled, there can be a race condition between the istio proxy being ready and the agent pulling models. This will result in a tcp dial connection refused error when the agent tries to download from s3.

To resolve it, istio allows the blocking of other containers in a pod until the proxy container is ready.

You can enabled this by setting `proxy.holdApplicationUntilProxyStarts: true` in `istio-sidecar-injector` configmap, `proxy.holdApplicationUntilProxyStarts` flag was introduced in Istio 1.7 as an experimental feature and is turned off by default.

Deploy the model on S3 with InferenceService

Create the InferenceService with the s3 storageUri and the service account with s3 credential attached.

=== “New Schema”

```yaml
apiVersion: "serving.kserve.io/v1beta1"
kind: "InferenceService"
metadata:
  name: "mnist-s3"
spec:
  predictor:
    serviceAccountName: sa
    model:
      modelFormat:
        name: tensorflow
      storageUri: "s3://kserve-examples/mnist"
```

=== “Old Schema”

```yaml
apiVersion: "serving.kserve.io/v1beta1"
kind: "InferenceService"
metadata:
  name: "mnist-s3"
spec:
  predictor:
    serviceAccountName: sa
    tensorflow:
      storageUri: "s3://kserve-examples/mnist"
```

Apply the autoscale-gpu.yaml.

=== “kubectl”

kubectl apply -f mnist-s3.yaml

Run a prediction

Now, the ingress can be accessed at ${INGRESS_HOST}:${INGRESS_PORT} or follow this instruction to find out the ingress IP and port.

SERVICE_HOSTNAME=$(kubectl get inferenceservice mnist-s3 -o jsonpath='{.status.url}' | cut -d "/" -f 3)

MODEL_NAME=mnist-s3
INPUT_PATH=@./input.json
curl -v -H "Host: ${SERVICE_HOSTNAME}" -H "Content-Type: application/json" http://${INGRESS_HOST}:${INGRESS_PORT}/v1/models/$MODEL_NAME:predict -d $INPUT_PATH

!!! success “Expected Output”

```{ .bash .no-copy }
Note: Unnecessary use of -X or --request, POST is already inferred.
*   Trying 35.237.217.209...
* TCP_NODELAY set
* Connected to mnist-s3.default.35.237.217.209.xip.io (35.237.217.209) port 80 (#0)
> POST /v1/models/mnist-s3:predict HTTP/1.1
> Host: mnist-s3.default.35.237.217.209.xip.io
> User-Agent: curl/7.55.1
> Accept: */*
> Content-Length: 2052
> Content-Type: application/x-www-form-urlencoded
> Expect: 100-continue
>
< HTTP/1.1 100 Continue
* We are completely uploaded and fine
< HTTP/1.1 200 OK
< content-length: 251
< content-type: application/json
< date: Sun, 04 Apr 2021 20:06:27 GMT
< x-envoy-upstream-service-time: 5
< server: istio-envoy
<
* Connection #0 to host mnist-s3.default.35.237.217.209.xip.io left intact
{
    "predictions": [
        {
            "predictions": [0.327352405, 2.00153053e-07, 0.0113353515, 0.203903764, 3.62863029e-05, 0.416683704, 0.000281196437, 8.36911859e-05, 0.0403052084, 1.82206513e-05],
            "classes": 5
        }
    ]
}
```

Kafka Sink Transformer

AlexNet Inference

More Information about Custom Transformer service can be found 🔗link

  1. Implement Custom Transformer ./model.py using Kserve API
 1import os
 2import argparse
 3import json
 4
 5from typing import Dict, Union
 6from kafka import KafkaProducer
 7from cloudevents.http import CloudEvent
 8from cloudevents.conversion import to_structured
 9
10from kserve import (
11    Model,
12    ModelServer,
13    model_server,
14    logging,
15    InferRequest,
16    InferResponse,
17)
18
19from kserve.logging import logger
20from kserve.utils.utils import generate_uuid
21
22kafka_producer = KafkaProducer(
23    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
24    bootstrap_servers=os.environ.get('KAFKA_BOOTSTRAP_SERVERS', 'localhost:9092')
25)
26
27class ImageTransformer(Model):
28    def __init__(self, name: str):
29        super().__init__(name, return_response_headers=True)
30        self.ready = True
31
32
33    def preprocess(
34        self, payload: Union[Dict, InferRequest], headers: Dict[str, str] = None
35    ) -> Union[Dict, InferRequest]:
36        logger.info("Received inputs %s", payload)
37        logger.info("Received headers %s", headers)
38        self.request_trace_key = os.environ.get('REQUEST_TRACE_KEY', 'algo.trace.requestId')
39        if self.request_trace_key not in payload:
40            logger.error("Request trace key '%s' not found in payload, you cannot trace the prediction result", self.request_trace_key)
41            if "instances" not in payload:
42                raise ValueError(
43                    f"Request trace key '{self.request_trace_key}' not found in payload and 'instances' key is missing."
44                )
45        else:
46            headers[self.request_trace_key] = payload.get(self.request_trace_key)
47   
48        return {"instances": payload["instances"]}
49
50    def postprocess(
51        self,
52        infer_response: Union[Dict, InferResponse],
53        headers: Dict[str, str] = None,
54        response_headers: Dict[str, str] = None,
55    ) -> Union[Dict, InferResponse]:
56        logger.info("postprocess headers: %s", headers)
57        logger.info("postprocess response headers: %s", response_headers)
58        logger.info("postprocess response: %s", infer_response)
59
60        attributes = {
61            "source": "data-and-computing/kafka-sink-transformer",
62            "type": "org.zhejianglab.zverse.data-and-computing.kafka-sink-transformer",
63            "request-host": headers.get('host', 'unknown'),
64            "kserve-isvc-name": headers.get('kserve-isvc-name', 'unknown'),
65            "kserve-isvc-namespace": headers.get('kserve-isvc-namespace', 'unknown'),
66            self.request_trace_key: headers.get(self.request_trace_key, 'unknown'),
67        }
68
69        _, cloudevent = to_structured(CloudEvent(attributes, infer_response))
70        try:
71            kafka_producer.send(os.environ.get('KAFKA_TOPIC', 'test-topic'), value=cloudevent.decode('utf-8').replace("'", '"'))
72            kafka_producer.flush()
73        except Exception as e:
74            logger.error("Failed to send message to Kafka: %s", e)
75        return infer_response
76
77parser = argparse.ArgumentParser(parents=[model_server.parser])
78args, _ = parser.parse_known_args()
79
80if __name__ == "__main__":
81    if args.configure_logging:
82        logging.configure_logging(args.log_config_file)
83    logging.logger.info("available model name: %s", args.model_name)
84    logging.logger.info("all args: %s", args.model_name)
85    model = ImageTransformer(args.model_name)
86    ModelServer().start([model])
  1. modify ./pyproject.toml
[tool.poetry]
name = "custom_transformer"
version = "0.15.2"
description = "Custom Transformer Examples. Not intended for use outside KServe Frameworks Images."
authors = ["Dan Sun <dsun20@bloomberg.net>"]
license = "Apache-2.0"
packages = [
    { include = "*.py" }
]

[tool.poetry.dependencies]
python = ">=3.9,<3.13"
kserve = {path = "../kserve", develop = true}
pillow = "^10.3.0"
kafka-python = "^2.2.15"
cloudevents = "^1.11.1"

[[tool.poetry.source]]
name = "pytorch"
url = "https://download.pytorch.org/whl/cpu"
priority = "explicit"

[tool.poetry.group.test]
optional = true

[tool.poetry.group.test.dependencies]
pytest = "^7.4.4"
mypy = "^0.991"

[tool.poetry.group.dev]
optional = true

[tool.poetry.group.dev.dependencies]
black = { version = "~24.3.0", extras = ["colorama"] }

[tool.poetry-version-plugin]
source = "file"
file_path = "../VERSION"

[build-system]
requires = ["poetry-core>=1.0.0"]
build-backend = "poetry.core.masonry.api"
  1. prepare ../custom_transformer.Dockerfile
ARG PYTHON_VERSION=3.11
ARG BASE_IMAGE=python:${PYTHON_VERSION}-slim-bookworm
ARG VENV_PATH=/prod_venv

FROM ${BASE_IMAGE} AS builder

# Install Poetry
ARG POETRY_HOME=/opt/poetry
ARG POETRY_VERSION=1.8.3

RUN python3 -m venv ${POETRY_HOME} && ${POETRY_HOME}/bin/pip install poetry==${POETRY_VERSION}
ENV PATH="$PATH:${POETRY_HOME}/bin"

# Activate virtual env
ARG VENV_PATH
ENV VIRTUAL_ENV=${VENV_PATH}
RUN python3 -m venv $VIRTUAL_ENV
ENV PATH="$VIRTUAL_ENV/bin:$PATH"

COPY kserve/pyproject.toml kserve/poetry.lock kserve/
RUN cd kserve && poetry install --no-root --no-interaction --no-cache
COPY kserve kserve
RUN cd kserve && poetry install --no-interaction --no-cache

COPY custom_transformer/pyproject.toml custom_transformer/poetry.lock custom_transformer/
RUN cd custom_transformer && poetry install --no-root --no-interaction --no-cache
COPY custom_transformer custom_transformer
RUN cd custom_transformer && poetry install --no-interaction --no-cache


FROM ${BASE_IMAGE} AS prod

COPY third_party third_party

# Activate virtual env
ARG VENV_PATH
ENV VIRTUAL_ENV=${VENV_PATH}
ENV PATH="$VIRTUAL_ENV/bin:$PATH"

RUN useradd kserve -m -u 1000 -d /home/kserve

COPY --from=builder --chown=kserve:kserve $VIRTUAL_ENV $VIRTUAL_ENV
COPY --from=builder kserve kserve
COPY --from=builder custom_transformer custom_transformer

USER 1000
ENTRYPOINT ["python", "-m", "custom_transformer.model"]
  1. regenerate poetry.lock
poetry lock --no-update
  1. build and push custom docker image
cd python
podman build -t docker-registry.lab.zverse.space/data-and-computing/ay-dev/msg-transformer:dev9 -f custom_transformer.Dockerfile .

podman push docker-registry.lab.zverse.space/data-and-computing/ay-dev/msg-transformer:dev9