Subsections of Inference
First PyTorch ISVC
MNIST Inference
More information about the mnist service can be found at this 🔗link.
- create a namespace
kubectl create namespace kserve-test
- deploy a sample mnist service
kubectl apply -n kserve-test -f - <<EOF
apiVersion: "serving.kserve.io/v1beta1"
kind: "InferenceService"
metadata:
name: "first-torchserve"
namespace: kserve-test
spec:
predictor:
model:
modelFormat:
name: pytorch
storageUri: gs://kfserving-examples/models/torchserve/image_classifier/v1
resources:
limits:
memory: 4Gi
EOF
- Check the InferenceService status
kubectl -n kserve-test get inferenceservices first-torchserve
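If you would rather block until the service is ready than poll, a `kubectl wait` on the Ready condition does the same job (a minimal sketch; adjust the timeout to your environment):

```bash
# Block until the InferenceService reports Ready (or give up after 10 minutes)
kubectl -n kserve-test wait --for=condition=Ready inferenceservice/first-torchserve --timeout=600s
```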
After all pods are ready, you can access the service. First, determine the ingress host and port:
If the EXTERNAL-IP value is set, your environment has an external load balancer that you can use for the ingress gateway.
export INGRESS_HOST=$(kubectl -n istio-system get service istio-ingressgateway -o jsonpath='{.status.loadBalancer.ingress[0].ip}')
export INGRESS_PORT=$(kubectl -n istio-system get service istio-ingressgateway -o jsonpath='{.spec.ports[?(@.name=="http2")].nodePort}')
If the EXTERNAL-IP value is none (or perpetually pending), your environment does not provide an external load balancer for the ingress gateway. In this case, you can access the gateway using the service’s node port.
export INGRESS_HOST=$(minikube ip)
export INGRESS_PORT=$(kubectl -n istio-system get service istio-ingressgateway -o jsonpath='{.spec.ports[?(@.name=="http2")].nodePort}')
Alternatively, you can port-forward the ingress gateway to a local port:
kubectl port-forward --namespace istio-system svc/istio-ingressgateway 30080:80
export INGRESS_HOST=localhost
export INGRESS_PORT=30080
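Whichever option you used, you can quickly confirm the gateway is reachable before sending a prediction; any HTTP status code back (even a 404 for an unmatched host) means the ingress is answering:

```bash
# Print only the HTTP status code returned by the Istio ingress gateway
curl -s -o /dev/null -w "%{http_code}\n" "http://${INGRESS_HOST}:${INGRESS_PORT}/"
```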
- Perform a prediction
First, prepare your inference input request inside a file:
wget -O ./mnist-input.json https://raw.githubusercontent.com/kserve/kserve/refs/heads/master/docs/samples/v1beta1/torchserve/v1/imgconv/input.json
- Invoke the service
SERVICE_HOSTNAME=$(kubectl -n kserve-test get inferenceservice first-torchserve -o jsonpath='{.status.url}' | cut -d "/" -f 3)
# http://first-torchserve.kserve-test.example.com
curl -v -H "Host: ${SERVICE_HOSTNAME}" -H "Content-Type: application/json" "http://${INGRESS_HOST}:${INGRESS_PORT}/v1/models/mnist:predict" -d @./mnist-input.json
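If the prediction call fails, it helps to check model readiness through the KServe V1 protocol before debugging the payload itself; a response such as `{"name": "mnist", "ready": true}` confirms the model is loaded:

```bash
# V1 protocol per-model readiness check
curl -H "Host: ${SERVICE_HOSTNAME}" "http://${INGRESS_HOST}:${INGRESS_PORT}/v1/models/mnist"
```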
First Custom Model
AlexNet Inference
More information about the AlexNet service can be found at this 🔗link.
- Implement a custom model using the KServe API
import argparse
import base64
import io
import time

from fastapi.middleware.cors import CORSMiddleware
from torchvision import models, transforms
from typing import Dict
import torch
from PIL import Image

import kserve
from kserve import Model, ModelServer, logging
from kserve.model_server import app
from kserve.utils.utils import generate_uuid


class AlexNetModel(Model):
    def __init__(self, name: str):
        super().__init__(name, return_response_headers=True)
        self.name = name
        self.ready = False
        self.load()

    def load(self):
        self.model = models.alexnet(pretrained=True)
        self.model.eval()
        # The ready flag is used by model ready endpoint for readiness probes,
        # set to True when model is loaded successfully without exceptions.
        self.ready = True

    async def predict(
        self,
        payload: Dict,
        headers: Dict[str, str] = None,
        response_headers: Dict[str, str] = None,
    ) -> Dict:
        start = time.time()
        # Input follows the Tensorflow V1 HTTP API for binary values
        # https://www.tensorflow.org/tfx/serving/api_rest#encoding_binary_values
        img_data = payload["instances"][0]["image"]["b64"]
        raw_img_data = base64.b64decode(img_data)
        input_image = Image.open(io.BytesIO(raw_img_data))
        preprocess = transforms.Compose([
            transforms.Resize(256),
            transforms.CenterCrop(224),
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406],
                                 std=[0.229, 0.224, 0.225]),
        ])
        input_tensor = preprocess(input_image).unsqueeze(0)
        output = self.model(input_tensor)
        # Convert logits to probabilities before taking the top-5 scores
        output = torch.nn.functional.softmax(output, dim=1)
        values, top_5 = torch.topk(output, 5)
        result = values.flatten().tolist()
        end = time.time()
        response_id = generate_uuid()

        # Custom response headers can be added to the inference response
        if response_headers is not None:
            response_headers.update(
                {"prediction-time-latency": f"{round((end - start) * 1000, 9)}"}
            )

        return {"predictions": result}


parser = argparse.ArgumentParser(parents=[kserve.model_server.parser])
args, _ = parser.parse_known_args()

if __name__ == "__main__":
    # Configure kserve and uvicorn logger
    if args.configure_logging:
        logging.configure_logging(args.log_config_file)
    model = AlexNetModel(args.model_name)
    model.load()
    # Custom middlewares can be added to the model
    app.add_middleware(
        CORSMiddleware,
        allow_origins=["*"],
        allow_credentials=True,
        allow_methods=["*"],
        allow_headers=["*"],
    )
    ModelServer().start([model])
- create requirements.txt
kserve
torchvision==0.18.0
pillow>=10.3.0,<11.0.0
- create Dockerfile
FROM m.daocloud.io/docker.io/library/python:3.11-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY model.py .
CMD ["python", "model.py", "--model_name=custom-model"]
- build and push custom docker image
docker build -t ay-custom-model .
docker tag ay-custom-model:latest docker-registry.lab.zverse.space/ay/ay-custom-model:latest
docker push docker-registry.lab.zverse.space/ay/ay-custom-model:latest
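You can also smoke-test the image locally before deploying it to the cluster. The KServe ModelServer listens on port 8080 by default, so a sketch like the following (assuming the machine can download the AlexNet weights on first start) lets you hit the readiness endpoint directly:

```bash
# Run the custom model server locally ...
docker run --rm -p 8080:8080 ay-custom-model
# ... then, from another terminal, probe the V1 readiness endpoint
curl http://localhost:8080/v1/models/custom-model
```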
- create a namespace
kubectl create namespace kserve-test
- deploy a sample custom-model service
kubectl apply -n kserve-test -f - <<EOF
apiVersion: serving.kserve.io/v1beta1
kind: InferenceService
metadata:
  name: ay-custom-model
spec:
  predictor:
    containers:
      - name: kserve-container
        image: docker-registry.lab.zverse.space/ay/ay-custom-model:latest
EOF
- Check the InferenceService status
kubectl -n kserve-test get inferenceservices ay-custom-model
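If the service never becomes ready, the predictor pod logs usually explain why (for example an image pull failure or an exception while loading the AlexNet weights). Assuming the standard `serving.kserve.io/inferenceservice` pod label that KServe applies, a selector like this finds them:

```bash
# Tail the model server logs of the predictor pod(s) behind this InferenceService
kubectl -n kserve-test logs -l serving.kserve.io/inferenceservice=ay-custom-model -c kserve-container --tail=100
```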
After all pods are ready, you can access the service. First, determine the ingress host and port:
If the EXTERNAL-IP value is set, your environment has an external load balancer that you can use for the ingress gateway.
export INGRESS_HOST=$(kubectl -n istio-system get service istio-ingressgateway -o jsonpath='{.status.loadBalancer.ingress[0].ip}')
export INGRESS_PORT=$(kubectl -n istio-system get service istio-ingressgateway -o jsonpath='{.spec.ports[?(@.name=="http2")].nodePort}')
If the EXTERNAL-IP value is none (or perpetually pending), your environment does not provide an external load balancer for the ingress gateway. In this case, you can access the gateway using the service’s node port.
export INGRESS_HOST=$(minikube ip)
export INGRESS_PORT=$(kubectl -n istio-system get service istio-ingressgateway -o jsonpath='{.spec.ports[?(@.name=="http2")].nodePort}')
Alternatively, you can port-forward the ingress gateway to a local port:
kubectl port-forward --namespace istio-system svc/istio-ingressgateway 30080:80
export INGRESS_HOST=localhost
export INGRESS_PORT=30080
- Perform a prediction
First, prepare your inference input request inside a file:
wget -O ./alex-net-input.json https://kserve.github.io/website/0.15/modelserving/v1beta1/custom/custom_model/input.json
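The downloaded file follows the `{"instances": [{"image": {"b64": ...}}]}` shape that `model.py` reads in `predict`. If you want to test with your own picture instead, an equivalent request can be built by hand (a sketch; `my-image.jpg` is a placeholder and `base64 -w0` assumes GNU coreutils):

```bash
# Base64-encode a local image and wrap it in the request shape expected by model.py
IMG_B64=$(base64 -w0 ./my-image.jpg)
cat > ./alex-net-input.json <<EOF
{"instances": [{"image": {"b64": "${IMG_B64}"}}]}
EOF
```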
- Invoke the service
export SERVICE_HOSTNAME=$(kubectl -n kserve-test get inferenceservice ay-custom-model -o jsonpath='{.status.url}' | cut -d "/" -f 3)
# http://ay-custom-model.kserve-test.example.com
curl -v -H "Host: ${SERVICE_HOSTNAME}" -H "Content-Type: application/json" -X POST "http://${INGRESS_HOST}:${INGRESS_PORT}/v1/models/custom-model:predict" -d @./alex-net-input.json
First Model in MinIO
Inference Model in MinIO
More information about Deploy InferenceService with a saved model on S3 can be found at this 🔗link.
Create Service Account
=== “yaml”
apiVersion: v1
kind: ServiceAccount
metadata:
  name: sa
  annotations:
    eks.amazonaws.com/role-arn: arn:aws:iam::123456789012:role/s3access # replace with your IAM role ARN
    serving.kserve.io/s3-endpoint: s3.amazonaws.com # replace with your s3 endpoint e.g minio-service.kubeflow:9000
    serving.kserve.io/s3-usehttps: "1" # by default 1, if testing with minio you can set to 0
    serving.kserve.io/s3-region: "us-east-2"
    serving.kserve.io/s3-useanoncredential: "false" # omitting this is the same as false, if true will ignore provided credential and use anonymous credentials
=== “kubectl”
kubectl apply -f create-s3-sa.yaml
Create S3 Secret and attach to Service Account
Create a secret with your S3 user credentials. KServe reads the secret annotations to inject the S3 environment variables into the storage initializer or model agent so it can download the models from S3 storage.
Create S3 secret
=== “yaml”
apiVersion: v1
kind: Secret
metadata:
  name: s3creds
  annotations:
    serving.kserve.io/s3-endpoint: s3.amazonaws.com # replace with your s3 endpoint e.g minio-service.kubeflow:9000
    serving.kserve.io/s3-usehttps: "1" # by default 1, if testing with minio you can set to 0
    serving.kserve.io/s3-region: "us-east-2"
    serving.kserve.io/s3-useanoncredential: "false" # omitting this is the same as false, if true will ignore provided credential and use anonymous credentials
type: Opaque
stringData: # use `stringData` for raw credential string or `data` for base64 encoded string
  AWS_ACCESS_KEY_ID: XXXX
  AWS_SECRET_ACCESS_KEY: XXXXXXXX
Attach secret to a service account
=== “yaml”
apiVersion: v1
kind: ServiceAccount
metadata:
  name: sa
secrets:
  - name: s3creds
=== “kubectl”
kubectl apply -f create-s3-secret.yaml
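If you would rather not keep credentials in a YAML file, the same objects can be created from the command line; this sketch assumes a MinIO endpoint at `minio-service.kubeflow:9000` reachable over plain HTTP:

```bash
# Create the credential secret, add the KServe S3 annotations, and attach it to the service account
kubectl create secret generic s3creds \
  --from-literal=AWS_ACCESS_KEY_ID=XXXX \
  --from-literal=AWS_SECRET_ACCESS_KEY=XXXXXXXX
kubectl annotate secret s3creds \
  serving.kserve.io/s3-endpoint=minio-service.kubeflow:9000 \
  serving.kserve.io/s3-usehttps="0" \
  serving.kserve.io/s3-region=us-east-2
kubectl patch serviceaccount sa -p '{"secrets": [{"name": "s3creds"}]}'
```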
!!! note
If you are running KServe with Istio sidecars enabled, there can be a race condition between the Istio proxy being ready and the agent pulling models. This will result in a `tcp dial connection refused` error when the agent tries to download from S3.
To resolve it, Istio allows blocking the other containers in a pod until the proxy container is ready.
You can enable this by setting `proxy.holdApplicationUntilProxyStarts: true` in the `istio-sidecar-injector` configmap. The `proxy.holdApplicationUntilProxyStarts` flag was introduced in Istio 1.7 as an experimental feature and is turned off by default.
Deploy the model on S3 with InferenceService
Create the InferenceService with the S3 `storageUri` and the service account with the S3 credentials attached.
=== “New Schema”
```yaml
apiVersion: "serving.kserve.io/v1beta1"
kind: "InferenceService"
metadata:
name: "mnist-s3"
spec:
predictor:
serviceAccountName: sa
model:
modelFormat:
name: tensorflow
storageUri: "s3://kserve-examples/mnist"
```
=== “Old Schema”
```yaml
apiVersion: "serving.kserve.io/v1beta1"
kind: "InferenceService"
metadata:
name: "mnist-s3"
spec:
predictor:
serviceAccountName: sa
tensorflow:
storageUri: "s3://kserve-examples/mnist"
```
Apply the `mnist-s3.yaml`.
=== “kubectl”
kubectl apply -f mnist-s3.yaml
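Before running a prediction, you can confirm that the model was actually pulled from S3/MinIO by reading the storage-initializer init container logs of the predictor pod (a sketch, assuming the default `storage-initializer` container name and the standard InferenceService pod label):

```bash
# The storage initializer logs the download from s3://kserve-examples/mnist
kubectl logs -l serving.kserve.io/inferenceservice=mnist-s3 -c storage-initializer
```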
Run a prediction
Now, the ingress can be accessed at `${INGRESS_HOST}:${INGRESS_PORT}`, or follow the instructions above to find out the ingress IP and port.
SERVICE_HOSTNAME=$(kubectl get inferenceservice mnist-s3 -o jsonpath='{.status.url}' | cut -d "/" -f 3)
MODEL_NAME=mnist-s3
INPUT_PATH=@./input.json
curl -v -H "Host: ${SERVICE_HOSTNAME}" -H "Content-Type: application/json" http://${INGRESS_HOST}:${INGRESS_PORT}/v1/models/$MODEL_NAME:predict -d $INPUT_PATH
!!! success “Expected Output”
```{ .bash .no-copy }
Note: Unnecessary use of -X or --request, POST is already inferred.
* Trying 35.237.217.209...
* TCP_NODELAY set
* Connected to mnist-s3.default.35.237.217.209.xip.io (35.237.217.209) port 80 (#0)
> POST /v1/models/mnist-s3:predict HTTP/1.1
> Host: mnist-s3.default.35.237.217.209.xip.io
> User-Agent: curl/7.55.1
> Accept: */*
> Content-Length: 2052
> Content-Type: application/x-www-form-urlencoded
> Expect: 100-continue
>
< HTTP/1.1 100 Continue
* We are completely uploaded and fine
< HTTP/1.1 200 OK
< content-length: 251
< content-type: application/json
< date: Sun, 04 Apr 2021 20:06:27 GMT
< x-envoy-upstream-service-time: 5
< server: istio-envoy
<
* Connection #0 to host mnist-s3.default.35.237.217.209.xip.io left intact
{
"predictions": [
{
"predictions": [0.327352405, 2.00153053e-07, 0.0113353515, 0.203903764, 3.62863029e-05, 0.416683704, 0.000281196437, 8.36911859e-05, 0.0403052084, 1.82206513e-05],
"classes": 5
}
]
}
```
Kafka Sink Transformer
AlexNet Inference
More information about the Custom Transformer service can be found at this 🔗link.
- Implement a custom transformer ./model.py using the KServe API
import os
import argparse
import json

from typing import Dict, Union
from kafka import KafkaProducer
from cloudevents.http import CloudEvent
from cloudevents.conversion import to_structured

from kserve import (
    Model,
    ModelServer,
    model_server,
    logging,
    InferRequest,
    InferResponse,
)

from kserve.logging import logger
from kserve.utils.utils import generate_uuid

kafka_producer = KafkaProducer(
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    bootstrap_servers=os.environ.get('KAFKA_BOOTSTRAP_SERVERS', 'localhost:9092')
)

class ImageTransformer(Model):
    def __init__(self, name: str):
        super().__init__(name, return_response_headers=True)
        self.ready = True

    def preprocess(
        self, payload: Union[Dict, InferRequest], headers: Dict[str, str] = None
    ) -> Union[Dict, InferRequest]:
        logger.info("Received inputs %s", payload)
        logger.info("Received headers %s", headers)
        self.request_trace_key = os.environ.get('REQUEST_TRACE_KEY', 'algo.trace.requestId')
        if self.request_trace_key not in payload:
            logger.error("Request trace key '%s' not found in payload, you cannot trace the prediction result", self.request_trace_key)
            if "instances" not in payload:
                raise ValueError(
                    f"Request trace key '{self.request_trace_key}' not found in payload and 'instances' key is missing."
                )
        else:
            headers[self.request_trace_key] = payload.get(self.request_trace_key)

        return {"instances": payload["instances"]}

    def postprocess(
        self,
        infer_response: Union[Dict, InferResponse],
        headers: Dict[str, str] = None,
        response_headers: Dict[str, str] = None,
    ) -> Union[Dict, InferResponse]:
        logger.info("postprocess headers: %s", headers)
        logger.info("postprocess response headers: %s", response_headers)
        logger.info("postprocess response: %s", infer_response)

        attributes = {
            "source": "data-and-computing/kafka-sink-transformer",
            "type": "org.zhejianglab.zverse.data-and-computing.kafka-sink-transformer",
            "request-host": headers.get('host', 'unknown'),
            "kserve-isvc-name": headers.get('kserve-isvc-name', 'unknown'),
            "kserve-isvc-namespace": headers.get('kserve-isvc-namespace', 'unknown'),
            self.request_trace_key: headers.get(self.request_trace_key, 'unknown'),
        }

        _, cloudevent = to_structured(CloudEvent(attributes, infer_response))
        try:
            kafka_producer.send(os.environ.get('KAFKA_TOPIC', 'test-topic'), value=cloudevent.decode('utf-8').replace("'", '"'))
            kafka_producer.flush()
        except Exception as e:
            logger.error("Failed to send message to Kafka: %s", e)
        return infer_response

parser = argparse.ArgumentParser(parents=[model_server.parser])
args, _ = parser.parse_known_args()

if __name__ == "__main__":
    if args.configure_logging:
        logging.configure_logging(args.log_config_file)
    logging.logger.info("available model name: %s", args.model_name)
    logging.logger.info("all args: %s", args)
    model = ImageTransformer(args.model_name)
    ModelServer().start([model])
- modify ./pyproject.toml
[tool.poetry]
name = "custom_transformer"
version = "0.15.2"
description = "Custom Transformer Examples. Not intended for use outside KServe Frameworks Images."
authors = ["Dan Sun <dsun20@bloomberg.net>"]
license = "Apache-2.0"
packages = [
{ include = "*.py" }
]
[tool.poetry.dependencies]
python = ">=3.9,<3.13"
kserve = {path = "../kserve", develop = true}
pillow = "^10.3.0"
kafka-python = "^2.2.15"
cloudevents = "^1.11.1"
[[tool.poetry.source]]
name = "pytorch"
url = "https://download.pytorch.org/whl/cpu"
priority = "explicit"
[tool.poetry.group.test]
optional = true
[tool.poetry.group.test.dependencies]
pytest = "^7.4.4"
mypy = "^0.991"
[tool.poetry.group.dev]
optional = true
[tool.poetry.group.dev.dependencies]
black = { version = "~24.3.0", extras = ["colorama"] }
[tool.poetry-version-plugin]
source = "file"
file_path = "../VERSION"
[build-system]
requires = ["poetry-core>=1.0.0"]
build-backend = "poetry.core.masonry.api"
- prepare ../custom_transformer.Dockerfile
ARG PYTHON_VERSION=3.11
ARG BASE_IMAGE=python:${PYTHON_VERSION}-slim-bookworm
ARG VENV_PATH=/prod_venv
FROM ${BASE_IMAGE} AS builder
# Install Poetry
ARG POETRY_HOME=/opt/poetry
ARG POETRY_VERSION=1.8.3
RUN python3 -m venv ${POETRY_HOME} && ${POETRY_HOME}/bin/pip install poetry==${POETRY_VERSION}
ENV PATH="$PATH:${POETRY_HOME}/bin"
# Activate virtual env
ARG VENV_PATH
ENV VIRTUAL_ENV=${VENV_PATH}
RUN python3 -m venv $VIRTUAL_ENV
ENV PATH="$VIRTUAL_ENV/bin:$PATH"
COPY kserve/pyproject.toml kserve/poetry.lock kserve/
RUN cd kserve && poetry install --no-root --no-interaction --no-cache
COPY kserve kserve
RUN cd kserve && poetry install --no-interaction --no-cache
COPY custom_transformer/pyproject.toml custom_transformer/poetry.lock custom_transformer/
RUN cd custom_transformer && poetry install --no-root --no-interaction --no-cache
COPY custom_transformer custom_transformer
RUN cd custom_transformer && poetry install --no-interaction --no-cache
FROM ${BASE_IMAGE} AS prod
COPY third_party third_party
# Activate virtual env
ARG VENV_PATH
ENV VIRTUAL_ENV=${VENV_PATH}
ENV PATH="$VIRTUAL_ENV/bin:$PATH"
RUN useradd kserve -m -u 1000 -d /home/kserve
COPY --from=builder --chown=kserve:kserve $VIRTUAL_ENV $VIRTUAL_ENV
COPY --from=builder kserve kserve
COPY --from=builder custom_transformer custom_transformer
USER 1000
ENTRYPOINT ["python", "-m", "custom_transformer.model"]
- regenerate poetry.lock
poetry lock --no-update
- build and push custom docker image
cd python
podman build -t docker-registry.lab.zverse.space/data-and-computing/ay-dev/msg-transformer:dev9 -f custom_transformer.Dockerfile .
podman push docker-registry.lab.zverse.space/data-and-computing/ay-dev/msg-transformer:dev9
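The image built above is only the transformer; to use it, wire it into an InferenceService next to a predictor. The sketch below is an assumption-laden example rather than a verified manifest: it reuses the AlexNet predictor image from the earlier section, assumes a Kafka broker reachable at `kafka.default.svc.cluster.local:9092`, and sets the environment variables read by `model.py` (`KAFKA_BOOTSTRAP_SERVERS`, `KAFKA_TOPIC`, `REQUEST_TRACE_KEY`); the transformer's `--model_name` is set to match the predictor's model name so requests route correctly.

```bash
kubectl apply -n kserve-test -f - <<EOF
apiVersion: serving.kserve.io/v1beta1
kind: InferenceService
metadata:
  name: ay-model-with-kafka-sink
spec:
  predictor:
    containers:
      - name: kserve-container
        image: docker-registry.lab.zverse.space/ay/ay-custom-model:latest
  transformer:
    containers:
      - name: kserve-container
        image: docker-registry.lab.zverse.space/data-and-computing/ay-dev/msg-transformer:dev9
        args: ["--model_name", "custom-model"]
        env:
          - name: KAFKA_BOOTSTRAP_SERVERS
            value: kafka.default.svc.cluster.local:9092
          - name: KAFKA_TOPIC
            value: test-topic
          - name: REQUEST_TRACE_KEY
            value: algo.trace.requestId
EOF
```

Once deployed, each prediction sent to the transformer's `:predict` endpoint is forwarded to the predictor, and the response is also published to the Kafka topic as a CloudEvent, which you can verify with any Kafka consumer (for example `kcat -C -b <broker> -t test-topic`).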