Subsections of Serverless
Kserve
Subsections of Kserve
Install Kserve
Preliminary
Installation
Preliminary
1. Kubernetes is installed; if not, check 🔗link
2. Helm binary is installed; if not, check 🔗link
1.install from script directly
# Run the KServe 0.15 quick-install script.
# -f makes curl fail on HTTP errors so an error page is never piped into bash;
# -sS stays quiet but still prints errors; -L follows redirects.
curl -fsSL "https://raw.githubusercontent.com/kserve/kserve/release-0.15/hack/quick_install.sh" | bash
Installing Gateway API CRDs …
…
😀 Successfully installed Istio
😀 Successfully installed Cert Manager
😀 Successfully installed Knative
Preliminary
1. If you have only one node in your cluster, you need at least 6 CPUs, 6 GB of memory, and 30 GB of disk storage.
2. If you have multiple nodes in your cluster, for each node you need at least 2 CPUs, 4 GB of memory, and 20 GB of disk storage.
1.install knative serving CRD resources
kubectl apply -f https://github.com/knative/serving/releases/download/knative-v1.18.0/serving-crds.yaml
2.install knative serving components
kubectl apply -f https://github.com/knative/serving/releases/download/knative-v1.18.0/serving-core.yaml
# kubectl apply -f https://raw.githubusercontent.com/AaronYang0628/assets/refs/heads/main/knative/serving/release/download/knative-v1.18.0/serving-core.yaml
3.install network layer Istio
kubectl apply -l knative.dev/crd-install=true -f https://github.com/knative/net-istio/releases/download/knative-v1.18.0/istio.yaml
kubectl apply -f https://github.com/knative/net-istio/releases/download/knative-v1.18.0/istio.yaml
kubectl apply -f https://github.com/knative/net-istio/releases/download/knative-v1.18.0/net-istio.yaml
4.install cert manager
kubectl apply -f https://github.com/cert-manager/cert-manager/releases/download/v1.17.2/cert-manager.yaml
5.install kserve
kubectl apply --server-side -f https://github.com/kserve/kserve/releases/download/v0.15.0/kserve.yaml
kubectl apply --server-side -f https://github.com/kserve/kserve/releases/download/v0.15.0/kserve-cluster-resources.yaml
for more information, you can check 🔗https://artifacthub.io/packages/helm/prometheus-community/prometheus
Preliminary
1. Kubernetes is installed; if not, check 🔗link
2. ArgoCD is installed; if not, check 🔗link
3. Helm binary is installed; if not, check 🔗link
1.install gateway API CRDs
kubectl apply -f https://github.com/kubernetes-sigs/gateway-api/releases/download/v1.3.0/standard-install.yaml
2.install cert manager
following 🔗link to install cert manager
3.install istio system
following 🔗link to install three istio components (istio-base, istiod, istio-ingressgateway)
4.install Knative Operator
# Register the Knative Operator Helm chart as an Argo CD Application.
# The heredoc delimiter is quoted so the manifest is passed to kubectl verbatim.
kubectl -n argocd apply -f - <<'EOF'
apiVersion: argoproj.io/v1alpha1
kind: Application
metadata:
  name: knative-operator
spec:
  syncPolicy:
    syncOptions:
      - CreateNamespace=true
  project: default
  source:
    repoURL: https://knative.github.io/operator
    chart: knative-operator
    targetRevision: v1.18.1
    helm:
      releaseName: knative-operator
      # Helm values: operator and webhook images pulled through the m.daocloud.io mirror.
      values: |
        knative_operator:
          knative_operator:
            image: m.daocloud.io/gcr.io/knative-releases/knative.dev/operator/cmd/operator
            tag: v1.18.1
            resources:
              requests:
                cpu: 100m
                memory: 100Mi
              limits:
                cpu: 1000m
                memory: 1000Mi
          operator_webhook:
            image: m.daocloud.io/gcr.io/knative-releases/knative.dev/operator/cmd/webhook
            tag: v1.18.1
            resources:
              requests:
                cpu: 100m
                memory: 100Mi
              limits:
                cpu: 500m
                memory: 500Mi
  destination:
    server: https://kubernetes.default.svc
    namespace: knative-serving
EOF
5.sync by argocd
argocd app sync argocd/knative-operator
6.install knative serving (KnativeServing custom resource)
# Create the KnativeServing custom resource in the knative-serving namespace.
kubectl apply -f - <<'EOF'
apiVersion: operator.knative.dev/v1beta1
kind: KnativeServing
metadata:
  name: knative-serving
  namespace: knative-serving
spec:
  version: 1.18.0 # this is knative serving version
  config:
    domain:
      example.com: ""
EOF
7.install kserve CRD
# Register the kserve-crd Helm chart as an Argo CD Application.
# Server-side apply is requested both as an annotation and as a sync option.
kubectl -n argocd apply -f - <<'EOF'
apiVersion: argoproj.io/v1alpha1
kind: Application
metadata:
  name: kserve-crd
  annotations:
    argocd.argoproj.io/sync-options: ServerSideApply=true
    argocd.argoproj.io/compare-options: IgnoreExtraneous
spec:
  syncPolicy:
    syncOptions:
      - CreateNamespace=true
      - ServerSideApply=true
  project: default
  source:
    repoURL: https://aaronyang0628.github.io/helm-chart-mirror/charts
    chart: kserve-crd
    targetRevision: v0.15.2
    helm:
      releaseName: kserve-crd
  destination:
    server: https://kubernetes.default.svc
    namespace: kserve
EOF
8.sync by argocd
argocd app sync argocd/kserve-crd
9.install kserve Controller
# Register the kserve controller Helm chart as an Argo CD Application.
# The Helm values switch images to the m.daocloud.io mirror where one is listed,
# select Serverless deployment mode and set the gateway domain to example.com.
# NOTE(review): nesting of the values below was reconstructed from a flattened
# listing — confirm against the chart's values.yaml before applying.
kubectl -n argocd apply -f - <<'EOF'
apiVersion: argoproj.io/v1alpha1
kind: Application
metadata:
  name: kserve
  annotations:
    argocd.argoproj.io/sync-options: ServerSideApply=true
    argocd.argoproj.io/compare-options: IgnoreExtraneous
spec:
  syncPolicy:
    syncOptions:
      - CreateNamespace=true
      - ServerSideApply=true
  project: default
  source:
    repoURL: https://aaronyang0628.github.io/helm-chart-mirror/charts
    chart: kserve
    targetRevision: v0.15.2
    helm:
      releaseName: kserve
      values: |
        kserve:
          agent:
            image: m.daocloud.io/docker.io/kserve/agent
          router:
            image: m.daocloud.io/docker.io/kserve/router
          storage:
            image: m.daocloud.io/docker.io/kserve/storage-initializer
            s3:
              accessKeyIdName: AWS_ACCESS_KEY_ID
              secretAccessKeyName: AWS_SECRET_ACCESS_KEY
              endpoint: ""
              region: ""
              verifySSL: ""
              useVirtualBucket: ""
              useAnonymousCredential: ""
          controller:
            deploymentMode: "Serverless"
            rbacProxyImage: m.daocloud.io/quay.io/brancz/kube-rbac-proxy:v0.18.0
            rbacProxy:
              resources:
                limits:
                  cpu: 100m
                  memory: 300Mi
                requests:
                  cpu: 100m
                  memory: 300Mi
            gateway:
              domain: example.com
            image: m.daocloud.io/docker.io/kserve/kserve-controller
            resources:
              limits:
                cpu: 100m
                memory: 300Mi
              requests:
                cpu: 100m
                memory: 300Mi
          servingruntime:
            tensorflow:
              image: tensorflow/serving
              tag: 2.6.2
            mlserver:
              image: m.daocloud.io/docker.io/seldonio/mlserver
              tag: 1.5.0
            sklearnserver:
              image: m.daocloud.io/docker.io/kserve/sklearnserver
            xgbserver:
              image: m.daocloud.io/docker.io/kserve/xgbserver
            huggingfaceserver:
              image: m.daocloud.io/docker.io/kserve/huggingfaceserver
              devShm:
                enabled: false
                sizeLimit: ""
              hostIPC:
                enabled: false
            huggingfaceserver_multinode:
              shm:
                enabled: true
                sizeLimit: "3Gi"
            tritonserver:
              image: nvcr.io/nvidia/tritonserver
            pmmlserver:
              image: m.daocloud.io/docker.io/kserve/pmmlserver
            paddleserver:
              image: m.daocloud.io/docker.io/kserve/paddleserver
            lgbserver:
              image: m.daocloud.io/docker.io/kserve/lgbserver
            torchserve:
              image: pytorch/torchserve-kfs
              tag: 0.9.0
            art:
              image: m.daocloud.io/docker.io/kserve/art-explainer
          localmodel:
            enabled: false
            controller:
              image: m.daocloud.io/docker.io/kserve/kserve-localmodel-controller
            jobNamespace: kserve-localmodel-jobs
            agent:
              hostPath: /mnt/models
              image: m.daocloud.io/docker.io/kserve/kserve-localmodelnode-agent
          inferenceservice:
            resources:
              limits:
                cpu: "1"
                memory: "2Gi"
              requests:
                cpu: "1"
                memory: "2Gi"
  destination:
    server: https://kubernetes.default.svc
    namespace: kserve
EOF
10.sync by argocd
argocd app sync argocd/kserve
11.install knative eventing CRD
kubectl apply -f https://github.com/knative/eventing/releases/download/knative-v1.18.1/eventing-crds.yaml
12.install knative eventing
kubectl apply -f https://github.com/knative/eventing/releases/download/knative-v1.18.1/eventing-core.yaml
FAQ
Serving
Subsections of Serving
Inference
Subsections of Inference
First Pytorch ISVC
Mnist Inference
More Information about
mnist
service can be found 🔗link
- create a namespace
kubectl create namespace kserve-test
- deploy a sample
mnist
service
# Deploy the PyTorch (TorchServe) InferenceService "first-torchserve" into kserve-test.
kubectl apply -n kserve-test -f - <<'EOF'
apiVersion: "serving.kserve.io/v1beta1"
kind: "InferenceService"
metadata:
  name: "first-torchserve"
  namespace: kserve-test
spec:
  predictor:
    model:
      modelFormat:
        name: pytorch
      storageUri: gs://kfserving-examples/models/torchserve/image_classifier/v1
      resources:
        limits:
          memory: 4Gi
EOF
- Check
InferenceService
status
kubectl -n kserve-test get inferenceservices first-torchserve
After all pods are ready, you can access the service by using the following command
If the EXTERNAL-IP value is set, your environment has an external load balancer that you can use for the ingress gateway.
# External load balancer case: use the balancer IP together with the service
# port of the "http2" entry. The nodePort is only reachable on node addresses.
export INGRESS_HOST=$(kubectl -n istio-system get service istio-ingressgateway -o jsonpath='{.status.loadBalancer.ingress[0].ip}')
export INGRESS_PORT=$(kubectl -n istio-system get service istio-ingressgateway -o jsonpath='{.spec.ports[?(@.name=="http2")].port}')
If the EXTERNAL-IP value is none (or perpetually pending), your environment does not provide an external load balancer for the ingress gateway. In this case, you can access the gateway using the service’s node port.
export INGRESS_HOST=$(minikube ip)
export INGRESS_PORT=$(kubectl -n istio-system get service istio-ingressgateway -o jsonpath='{.spec.ports[?(@.name=="http2")].nodePort}')
# Alternative: port-forward the ingress gateway.
# Run the port-forward in a separate terminal, because it blocks while forwarding.
# It listens on localhost only, so INGRESS_HOST must be localhost, not the minikube IP.
kubectl port-forward --namespace istio-system svc/istio-ingressgateway 30080:80
export INGRESS_HOST=localhost
export INGRESS_PORT=30080
- Perform a prediction
First, prepare your inference input request inside a file:
wget -O ./mnist-input.json https://raw.githubusercontent.com/kserve/kserve/refs/heads/master/docs/samples/v1beta1/torchserve/v1/imgconv/input.json
- Invoke the service
SERVICE_HOSTNAME=$(kubectl -n kserve-test get inferenceservice first-torchserve -o jsonpath='{.status.url}' | cut -d "/" -f 3)
# http://first-torchserve.kserve-test.example.com
curl -v -H "Host: ${SERVICE_HOSTNAME}" -H "Content-Type: application/json" "http://${INGRESS_HOST}:${INGRESS_PORT}/v1/models/mnist:predict" -d @./mnist-input.json
First Custom Model
AlexNet Inference
More Information about
AlexNet
service can be found 🔗link
- Implement Custom Model using KServe API
import argparse
import base64
import io
import time

from fastapi.middleware.cors import CORSMiddleware
from torchvision import models, transforms
from typing import Dict
import torch
from PIL import Image

import kserve
from kserve import Model, ModelServer, logging
from kserve.model_server import app
from kserve.utils.utils import generate_uuid


class AlexNetModel(Model):
    """KServe custom model that serves a pretrained torchvision AlexNet.

    The model is loaded in the constructor; ``self.ready`` reflects whether
    loading completed.
    """

    def __init__(self, name: str):
        super().__init__(name, return_response_headers=True)
        self.name = name
        # Start as not-ready; load() flips the flag once loading succeeds.
        # (Resetting the flag *after* load() would leave a loaded model unready.)
        self.ready = False
        self.load()

    def load(self):
        """Instantiate AlexNet with pretrained weights and switch it to eval mode."""
        # IMAGENET1K_V1 is the weight set the deprecated `pretrained=True` flag selected.
        self.model = models.alexnet(weights=models.AlexNet_Weights.IMAGENET1K_V1)
        self.model.eval()
        # The ready flag is used by model ready endpoint for readiness probes,
        # set to True when model is loaded successfully without exceptions.
        self.ready = True

    async def predict(
        self,
        payload: Dict,
        headers: Dict[str, str] = None,
        response_headers: Dict[str, str] = None,
    ) -> Dict:
        """Classify the first instance of ``payload`` and return the top-5 scores.

        Args:
            payload: request body; the image is read from
                ``payload["instances"][0]["image"]["b64"]`` as base64 text.
            headers: request headers (unused here).
            response_headers: if given, a ``prediction-time-latency`` entry
                (milliseconds) is added to it.

        Returns:
            ``{"predictions": [...]}`` with the five highest softmax probabilities.
        """
        start = time.time()
        # Input follows the Tensorflow V1 HTTP API for binary values
        # https://www.tensorflow.org/tfx/serving/api_rest#encoding_binary_values
        img_data = payload["instances"][0]["image"]["b64"]
        raw_img_data = base64.b64decode(img_data)
        # Force 3-channel RGB so grayscale/RGBA inputs match the 3-channel Normalize below.
        input_image = Image.open(io.BytesIO(raw_img_data)).convert("RGB")
        preprocess = transforms.Compose([
            transforms.Resize(256),
            transforms.CenterCrop(224),
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406],
                                 std=[0.229, 0.224, 0.225]),
        ])
        input_tensor = preprocess(input_image).unsqueeze(0)
        # Inference only: no autograd bookkeeping needed.
        with torch.no_grad():
            output = self.model(input_tensor)
            # Keep the softmax result; calling it without assigning is a no-op.
            probabilities = torch.nn.functional.softmax(output, dim=1)
        values, top_5 = torch.topk(probabilities, 5)
        result = values.flatten().tolist()
        end = time.time()

        # Custom response headers can be added to the inference response
        if response_headers is not None:
            response_headers.update(
                {"prediction-time-latency": f"{round((end - start) * 1000, 9)}"}
            )

        return {"predictions": result}


parser = argparse.ArgumentParser(parents=[kserve.model_server.parser])
args, _ = parser.parse_known_args()

if __name__ == "__main__":
    # Configure kserve and uvicorn logger
    if args.configure_logging:
        logging.configure_logging(args.log_config_file)
    # The constructor already loads the model, so no second load() call is needed.
    model = AlexNetModel(args.model_name)
    # Custom middlewares can be added to the model
    app.add_middleware(
        CORSMiddleware,
        allow_origins=["*"],
        allow_credentials=True,
        allow_methods=["*"],
        allow_headers=["*"],
    )
    ModelServer().start([model])
- create
requirements.txt
kserve
torchvision==0.18.0
pillow>=10.3.0,<11.0.0
- create
Dockerfile
FROM m.daocloud.io/docker.io/library/python:3.11-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY model.py .
CMD ["python", "model.py", "--model_name=custom-model"]
- build and push custom docker image
# Build the image, then tag it by name. A hard-coded image ID only exists on
# the machine that produced it, so the tag step would fail anywhere else.
docker build -t ay-custom-model .
docker tag ay-custom-model docker-registry.lab.zverse.space/ay/ay-custom-model:latest
docker push docker-registry.lab.zverse.space/ay/ay-custom-model:latest
- create a namespace
kubectl create namespace kserve-test
- deploy a sample
custom-model
service
# Deploy the custom-container InferenceService "ay-custom-model" into kserve-test.
kubectl apply -n kserve-test -f - <<'EOF'
apiVersion: serving.kserve.io/v1beta1
kind: InferenceService
metadata:
  name: ay-custom-model
spec:
  predictor:
    containers:
      - name: kserve-container
        image: docker-registry.lab.zverse.space/ay/ay-custom-model:latest
EOF
- Check
InferenceService
status
kubectl -n kserve-test get inferenceservices ay-custom-model
After all pods are ready, you can access the service by using the following command
If the EXTERNAL-IP value is set, your environment has an external load balancer that you can use for the ingress gateway.
# External load balancer case: use the balancer IP together with the service
# port of the "http2" entry. The nodePort is only reachable on node addresses.
export INGRESS_HOST=$(kubectl -n istio-system get service istio-ingressgateway -o jsonpath='{.status.loadBalancer.ingress[0].ip}')
export INGRESS_PORT=$(kubectl -n istio-system get service istio-ingressgateway -o jsonpath='{.spec.ports[?(@.name=="http2")].port}')
If the EXTERNAL-IP value is none (or perpetually pending), your environment does not provide an external load balancer for the ingress gateway. In this case, you can access the gateway using the service’s node port.
export INGRESS_HOST=$(minikube ip)
export INGRESS_PORT=$(kubectl -n istio-system get service istio-ingressgateway -o jsonpath='{.spec.ports[?(@.name=="http2")].nodePort}')
# Alternative: port-forward the ingress gateway.
# Run the port-forward in a separate terminal, because it blocks while forwarding.
# It listens on localhost only, so INGRESS_HOST must be localhost, not the minikube IP.
kubectl port-forward --namespace istio-system svc/istio-ingressgateway 30080:80
export INGRESS_HOST=localhost
export INGRESS_PORT=30080
- Perform a prediction
First, prepare your inference input request inside a file:
wget -O ./alex-net-input.json https://kserve.github.io/website/0.15/modelserving/v1beta1/custom/custom_model/input.json
- Invoke the service
export SERVICE_HOSTNAME=$(kubectl -n kserve-test get inferenceservice ay-custom-model -o jsonpath='{.status.url}' | cut -d "/" -f 3)
# http://ay-custom-model.kserve-test.example.com
# POST the prepared payload to the custom model through the ingress gateway.
curl -v -H "Host: ${SERVICE_HOSTNAME}" -H "Content-Type: application/json" -X POST "http://${INGRESS_HOST}:${INGRESS_PORT}/v1/models/custom-model:predict" -d @./alex-net-input.json
First Model In Minio
Inference Model In Minio
More Information about
Deploy InferenceService with a saved model on S3
can be found 🔗link
Create Service Account
=== “yaml”
# ServiceAccount annotated with the S3 endpoint settings.
apiVersion: v1
kind: ServiceAccount
metadata:
  name: sa
  annotations:
    eks.amazonaws.com/role-arn: arn:aws:iam::123456789012:role/s3access # replace with your IAM role ARN
    serving.kserve.io/s3-endpoint: s3.amazonaws.com # replace with your s3 endpoint e.g minio-service.kubeflow:9000
    serving.kserve.io/s3-usehttps: "1" # by default 1, if testing with minio you can set to 0
    serving.kserve.io/s3-region: "us-east-2"
    serving.kserve.io/s3-useanoncredential: "false" # omitting this is the same as false, if true will ignore provided credential and use anonymous credentials
=== “kubectl”
kubectl apply -f create-s3-sa.yaml
Create S3 Secret and attach to Service Account
Create a secret with your S3 user credential, KServe
reads the secret annotations to inject the S3 environment variables on storage initializer or model agent to download the models from S3 storage.
Create S3 secret
=== “yaml”
# Secret holding the S3 credentials; the annotations carry the endpoint settings.
apiVersion: v1
kind: Secret
metadata:
  name: s3creds
  annotations:
    serving.kserve.io/s3-endpoint: s3.amazonaws.com # replace with your s3 endpoint e.g minio-service.kubeflow:9000
    serving.kserve.io/s3-usehttps: "1" # by default 1, if testing with minio you can set to 0
    serving.kserve.io/s3-region: "us-east-2"
    serving.kserve.io/s3-useanoncredential: "false" # omitting this is the same as false, if true will ignore provided credential and use anonymous credentials
type: Opaque
stringData: # use `stringData` for raw credential string or `data` for base64 encoded string
  AWS_ACCESS_KEY_ID: XXXX
  AWS_SECRET_ACCESS_KEY: XXXXXXXX
Attach secret to a service account
=== “yaml”
# ServiceAccount "sa" with the s3creds secret attached.
apiVersion: v1
kind: ServiceAccount
metadata:
  name: sa
secrets:
  - name: s3creds
=== “kubectl”
kubectl apply -f create-s3-secret.yaml
!!! note
If you are running kserve with istio sidecars enabled, there can be a race condition between the istio proxy being ready and the agent pulling models. This will result in a tcp dial connection refused
error when the agent tries to download from s3.
To resolve it, istio allows the blocking of other containers in a pod until the proxy container is ready.
You can enable this by setting `proxy.holdApplicationUntilProxyStarts: true` in the `istio-sidecar-injector` configmap. The `proxy.holdApplicationUntilProxyStarts` flag was introduced in Istio 1.7 as an experimental feature and is turned off by default.
Deploy the model on S3 with InferenceService
Create the InferenceService with the s3 storageUri
and the service account with s3 credential attached.
=== “New Schema”
```yaml
# InferenceService (new schema): TensorFlow model pulled from S3 using service account "sa".
apiVersion: "serving.kserve.io/v1beta1"
kind: "InferenceService"
metadata:
  name: "mnist-s3"
spec:
  predictor:
    serviceAccountName: sa
    model:
      modelFormat:
        name: tensorflow
      storageUri: "s3://kserve-examples/mnist"
```
=== “Old Schema”
```yaml
# InferenceService (old schema): TensorFlow model pulled from S3 using service account "sa".
apiVersion: "serving.kserve.io/v1beta1"
kind: "InferenceService"
metadata:
  name: "mnist-s3"
spec:
  predictor:
    serviceAccountName: sa
    tensorflow:
      storageUri: "s3://kserve-examples/mnist"
```
Apply the mnist-s3.yaml
.
=== “kubectl”
kubectl apply -f mnist-s3.yaml
Run a prediction
Now, the ingress can be accessed at ${INGRESS_HOST}:${INGRESS_PORT}
or follow this instruction
to find out the ingress IP and port.
# Resolve the service host from the InferenceService URL, then send the request.
# The URL and the payload argument are quoted so they reach curl as single words.
SERVICE_HOSTNAME=$(kubectl get inferenceservice mnist-s3 -o jsonpath='{.status.url}' | cut -d "/" -f 3)
MODEL_NAME=mnist-s3
INPUT_PATH=@./input.json
curl -v -H "Host: ${SERVICE_HOSTNAME}" -H "Content-Type: application/json" "http://${INGRESS_HOST}:${INGRESS_PORT}/v1/models/${MODEL_NAME}:predict" -d "${INPUT_PATH}"
!!! success “Expected Output”
```{ .bash .no-copy }
Note: Unnecessary use of -X or --request, POST is already inferred.
* Trying 35.237.217.209...
* TCP_NODELAY set
* Connected to mnist-s3.default.35.237.217.209.xip.io (35.237.217.209) port 80 (#0)
> POST /v1/models/mnist-s3:predict HTTP/1.1
> Host: mnist-s3.default.35.237.217.209.xip.io
> User-Agent: curl/7.55.1
> Accept: */*
> Content-Length: 2052
> Content-Type: application/x-www-form-urlencoded
> Expect: 100-continue
>
< HTTP/1.1 100 Continue
* We are completely uploaded and fine
< HTTP/1.1 200 OK
< content-length: 251
< content-type: application/json
< date: Sun, 04 Apr 2021 20:06:27 GMT
< x-envoy-upstream-service-time: 5
< server: istio-envoy
<
* Connection #0 to host mnist-s3.default.35.237.217.209.xip.io left intact
{
"predictions": [
{
"predictions": [0.327352405, 2.00153053e-07, 0.0113353515, 0.203903764, 3.62863029e-05, 0.416683704, 0.000281196437, 8.36911859e-05, 0.0403052084, 1.82206513e-05],
"classes": 5
}
]
}
```
Kafka Sink Transformer
AlexNet Inference
More Information about
Custom Transformer
service can be found 🔗link
- Implement Custom Transformer
./model.py
using Kserve API
import os
import argparse
import json

from typing import Dict, Union
from kafka import KafkaProducer
from cloudevents.http import CloudEvent
from cloudevents.conversion import to_structured

from kserve import (
    Model,
    ModelServer,
    model_server,
    logging,
    InferRequest,
    InferResponse,
)

from kserve.logging import logger
from kserve.utils.utils import generate_uuid

# Module-level producer; bootstrap servers come from KAFKA_BOOTSTRAP_SERVERS
# (default localhost:9092). Values are JSON-serialized and UTF-8 encoded.
kafka_producer = KafkaProducer(
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    bootstrap_servers=os.environ.get('KAFKA_BOOTSTRAP_SERVERS', 'localhost:9092')
)


class ImageTransformer(Model):
    """Transformer that forwards instances to the predictor and publishes each
    inference response to Kafka as a structured CloudEvent."""

    def __init__(self, name: str):
        super().__init__(name, return_response_headers=True)
        # Resolve the trace key once so postprocess() can rely on it even when
        # preprocess() has not run on this instance.
        self.request_trace_key = os.environ.get('REQUEST_TRACE_KEY', 'algo.trace.requestId')
        self.ready = True

    def preprocess(
        self, payload: Union[Dict, InferRequest], headers: Dict[str, str] = None
    ) -> Union[Dict, InferRequest]:
        """Copy the trace id from the payload into the headers and keep only "instances".

        Raises:
            ValueError: when both the trace key and "instances" are missing.
        """
        logger.info("Received inputs %s", payload)
        logger.info("Received headers %s", headers)
        if self.request_trace_key not in payload:
            logger.error("Request trace key '%s' not found in payload, you cannot trace the prediction result", self.request_trace_key)
            if "instances" not in payload:
                raise ValueError(
                    f"Request trace key '{self.request_trace_key}' not found in payload and 'instances' key is missing."
                )
        elif headers is not None:
            # headers defaults to None; only record the trace id when a dict was passed.
            headers[self.request_trace_key] = payload.get(self.request_trace_key)

        return {"instances": payload["instances"]}

    def postprocess(
        self,
        infer_response: Union[Dict, InferResponse],
        headers: Dict[str, str] = None,
        response_headers: Dict[str, str] = None,
    ) -> Union[Dict, InferResponse]:
        """Publish the response to Kafka (best effort) and return it unchanged."""
        logger.info("postprocess headers: %s", headers)
        logger.info("postprocess response headers: %s", response_headers)
        logger.info("postprocess response: %s", infer_response)

        # Tolerate a missing headers dict: every attribute falls back to 'unknown'.
        request_headers = headers if headers is not None else {}
        attributes = {
            "source": "data-and-computing/kafka-sink-transformer",
            "type": "org.zhejianglab.zverse.data-and-computing.kafka-sink-transformer",
            "request-host": request_headers.get('host', 'unknown'),
            "kserve-isvc-name": request_headers.get('kserve-isvc-name', 'unknown'),
            "kserve-isvc-namespace": request_headers.get('kserve-isvc-namespace', 'unknown'),
            self.request_trace_key: request_headers.get(self.request_trace_key, 'unknown'),
        }

        _, cloudevent = to_structured(CloudEvent(attributes, infer_response))
        try:
            # NOTE(review): the event body is sent as a string and JSON-encoded again by
            # the producer's serializer; the quote replacement also rewrites apostrophes
            # inside data. Kept as-is for consumers — confirm the intended wire format.
            kafka_producer.send(os.environ.get('KAFKA_TOPIC', 'test-topic'), value=cloudevent.decode('utf-8').replace("'", '"'))
            kafka_producer.flush()
        except Exception as e:
            # Best effort: a Kafka failure is logged and must not fail the inference call.
            logger.error("Failed to send message to Kafka: %s", e)
        return infer_response


parser = argparse.ArgumentParser(parents=[model_server.parser])
args, _ = parser.parse_known_args()

if __name__ == "__main__":
    if args.configure_logging:
        logging.configure_logging(args.log_config_file)
    logging.logger.info("available model name: %s", args.model_name)
    # Log the full argument namespace (previously this printed model_name again).
    logging.logger.info("all args: %s", args)
    model = ImageTransformer(args.model_name)
    ModelServer().start([model])
- modify
./pyproject.toml
[tool.poetry]
name = "custom_transformer"
version = "0.15.2"
description = "Custom Transformer Examples. Not intended for use outside KServe Frameworks Images."
authors = ["Dan Sun <dsun20@bloomberg.net>"]
license = "Apache-2.0"
packages = [
{ include = "*.py" }
]
[tool.poetry.dependencies]
python = ">=3.9,<3.13"
kserve = {path = "../kserve", develop = true}
pillow = "^10.3.0"
kafka-python = "^2.2.15"
cloudevents = "^1.11.1"
[[tool.poetry.source]]
name = "pytorch"
url = "https://download.pytorch.org/whl/cpu"
priority = "explicit"
[tool.poetry.group.test]
optional = true
[tool.poetry.group.test.dependencies]
pytest = "^7.4.4"
mypy = "^0.991"
[tool.poetry.group.dev]
optional = true
[tool.poetry.group.dev.dependencies]
black = { version = "~24.3.0", extras = ["colorama"] }
[tool.poetry-version-plugin]
source = "file"
file_path = "../VERSION"
[build-system]
requires = ["poetry-core>=1.0.0"]
build-backend = "poetry.core.masonry.api"
- prepare
../custom_transformer.Dockerfile
ARG PYTHON_VERSION=3.11
ARG BASE_IMAGE=python:${PYTHON_VERSION}-slim-bookworm
ARG VENV_PATH=/prod_venv
FROM ${BASE_IMAGE} AS builder
# Install Poetry
ARG POETRY_HOME=/opt/poetry
ARG POETRY_VERSION=1.8.3
RUN python3 -m venv ${POETRY_HOME} && ${POETRY_HOME}/bin/pip install poetry==${POETRY_VERSION}
ENV PATH="$PATH:${POETRY_HOME}/bin"
# Activate virtual env
ARG VENV_PATH
ENV VIRTUAL_ENV=${VENV_PATH}
RUN python3 -m venv $VIRTUAL_ENV
ENV PATH="$VIRTUAL_ENV/bin:$PATH"
COPY kserve/pyproject.toml kserve/poetry.lock kserve/
RUN cd kserve && poetry install --no-root --no-interaction --no-cache
COPY kserve kserve
RUN cd kserve && poetry install --no-interaction --no-cache
COPY custom_transformer/pyproject.toml custom_transformer/poetry.lock custom_transformer/
RUN cd custom_transformer && poetry install --no-root --no-interaction --no-cache
COPY custom_transformer custom_transformer
RUN cd custom_transformer && poetry install --no-interaction --no-cache
FROM ${BASE_IMAGE} AS prod
COPY third_party third_party
# Activate virtual env
ARG VENV_PATH
ENV VIRTUAL_ENV=${VENV_PATH}
ENV PATH="$VIRTUAL_ENV/bin:$PATH"
RUN useradd kserve -m -u 1000 -d /home/kserve
COPY --from=builder --chown=kserve:kserve $VIRTUAL_ENV $VIRTUAL_ENV
COPY --from=builder kserve kserve
COPY --from=builder custom_transformer custom_transformer
USER 1000
ENTRYPOINT ["python", "-m", "custom_transformer.model"]
- regenerate poetry.lock
poetry lock --no-update
- build and push custom docker image
cd python
podman build -t docker-registry.lab.zverse.space/data-and-computing/ay-dev/msg-transformer:dev9 -f custom_transformer.Dockerfile .
podman push docker-registry.lab.zverse.space/data-and-computing/ay-dev/msg-transformer:dev9
Generative
Subsections of Generative
First Generative Service
B --> C[[Knative Serving]] --> D[自动扩缩容/灰度发布]
B --> E[[Istio]] --> F[流量管理/安全]
B --> G[[存储系统]] --> H[S3/GCS/PVC]
### 单YAML部署推理服务
```yaml
# InferenceService "sklearn-iris": scikit-learn model loaded from GCS.
apiVersion: "serving.kserve.io/v1beta1"
kind: "InferenceService"
metadata:
  name: "sklearn-iris"
  namespace: kserve-test
spec:
  predictor:
    model:
      modelFormat:
        name: sklearn
      resources: {}
      storageUri: "gs://kfserving-examples/models/sklearn/1.0/model"
```
check CRD
kubectl -n kserve-test get inferenceservices sklearn-iris
kubectl -n istio-system get svc istio-ingressgateway
export INGRESS_HOST=$(minikube ip)
export INGRESS_PORT=$(kubectl -n istio-system get service istio-ingressgateway -o jsonpath='{.spec.ports[?(@.name=="http2")].nodePort}')
SERVICE_HOSTNAME=$(kubectl -n kserve-test get inferenceservice sklearn-iris -o jsonpath='{.status.url}' | cut -d "/" -f 3)
# http://sklearn-iris.kserve-test.example.com
curl -v -H "Host: ${SERVICE_HOSTNAME}" -H "Content-Type: application/json" "http://${INGRESS_HOST}:${INGRESS_PORT}/v1/models/sklearn-iris:predict" -d @./iris-input.json
How to deploy your own ML model
# InferenceService "huggingface-llama3": RawDeployment mode with the autoscaler
# disabled; the model comes from a PVC and a workerSpec is defined.
apiVersion: serving.kserve.io/v1beta1
kind: InferenceService
metadata:
  name: huggingface-llama3
  namespace: kserve-test
  annotations:
    serving.kserve.io/deploymentMode: RawDeployment
    serving.kserve.io/autoscalerClass: none
spec:
  predictor:
    model:
      modelFormat:
        name: huggingface
      storageUri: pvc://llama-3-8b-pvc/hf/8b_instruction_tuned
    workerSpec:
      pipelineParallelSize: 2
      tensorParallelSize: 1
      containers:
        - name: worker-container
          resources:
            requests:
              nvidia.com/gpu: "8"
Canary Policy
KServe supports canary rollouts for inference services. Canary rollouts allow for a new version of an InferenceService to receive a percentage of traffic. Kserve supports a configurable canary rollout strategy with multiple steps. The rollout strategy can also be implemented to rollback to the previous revision if a rollout step fails.
KServe automatically tracks the last good revision that was rolled out with 100% traffic. The canaryTrafficPercent
field in the component’s spec needs to be set with the percentage of traffic that should be routed to the new revision. KServe will then automatically split the traffic between the last good revision and the revision that is currently being rolled out according to the canaryTrafficPercent
value.
When the first revision of an InferenceService
is deployed, it will receive 100% of the traffic. When multiple revisions are deployed, as in step 2, and the canary rollout strategy is configured to route 10% of the traffic to the new revision, 90% of the traffic will go to the LatestRolledoutRevision
. If there is an unhealthy or bad revision applied, traffic will not be routed to that bad revision. In step 3, the rollout strategy promotes the LatestReadyRevision
from step 2 to the LatestRolledoutRevision
. Since it is now promoted, the LatestRolledoutRevision
gets 100% of the traffic and is fully rolled out. If a rollback needs to happen, 100% of the traffic will be pinned to the previous healthy/good revision- the PreviousRolledoutRevision
.
Reference
For more information, see Canary Rollout.
Subsections of Canary Policy
Rollout Example
Create the InferenceService
Follow the First Inference Service
tutorial. Set up a namespace kserve-test
and create an InferenceService.
After rolling out the first model, 100% traffic goes to the initial model with service revision 1.
kubectl -n kserve-test get isvc sklearn-iris
Apply Canary Rollout Strategy
- Add the
canaryTrafficPercent
field to the predictor component
- Update the
storageUri
to use a new/updated model.
# Roll out the updated model (model-2) as a canary receiving 10% of the traffic.
kubectl apply -n kserve-test -f - <<'EOF'
apiVersion: "serving.kserve.io/v1beta1"
kind: "InferenceService"
metadata:
  name: "sklearn-iris"
  namespace: kserve-test
spec:
  predictor:
    canaryTrafficPercent: 10
    model:
      args: ["--enable_docs_url=True"]
      modelFormat:
        name: sklearn
      resources: {}
      runtime: kserve-sklearnserver
      storageUri: "gs://kfserving-examples/models/sklearn/1.0/model-2"
EOF
After rolling out the canary model, traffic is split between the latest ready revision 2 and the previously rolled out revision 1.
kubectl -n kserve-test get isvc sklearn-iris
Check the running pods; you should now see two pods running, one for the old and one for the new model, and 10% of the traffic is routed to
the new model. Notice revision 1 contains 0002
in its name, while revision 2 contains 0003
.
# The InferenceService was applied to kserve-test, so list the pods there.
kubectl -n kserve-test get pods
NAME READY STATUS RESTARTS AGE
sklearn-iris-predictor-00002-deployment-c7bb6c685-ktk7r 2/2 Running 0 71m
sklearn-iris-predictor-00003-deployment-8498d947-fpzcg 2/2 Running 0 20m
Run a prediction
Follow the next two steps (Determine the ingress IP and ports and Perform inference) in the First Inference Service tutorial.
Send more requests to the InferenceService
to observe the 10% of traffic that routes to the new revision.
Promote the canary model
If the canary model is healthy/passes your tests,
you can promote it by removing the canaryTrafficPercent
field and re-applying the InferenceService
custom resource with the same name sklearn-iris
# Promote the canary: re-apply the same InferenceService without canaryTrafficPercent.
kubectl apply -n kserve-test -f - <<'EOF'
apiVersion: "serving.kserve.io/v1beta1"
kind: "InferenceService"
metadata:
  name: "sklearn-iris"
  namespace: kserve-test
spec:
  predictor:
    model:
      args: ["--enable_docs_url=True"]
      modelFormat:
        name: sklearn
      resources: {}
      runtime: kserve-sklearnserver
      storageUri: "gs://kfserving-examples/models/sklearn/1.0/model-2"
EOF
Now all traffic goes to the revision 2 for the new model.
# The InferenceService lives in kserve-test, so query that namespace.
kubectl -n kserve-test get isvc sklearn-iris
NAME URL READY PREV LATEST PREVROLLEDOUTREVISION LATESTREADYREVISION AGE
sklearn-iris http://sklearn-iris.kserve-test.example.com True 100 sklearn-iris-predictor-00002 17m
The pods for revision generation 1 automatically scale down to 0 as they no longer receive traffic.
# The InferenceService pods run in kserve-test, so query that namespace.
kubectl -n kserve-test get pods -l serving.kserve.io/inferenceservice=sklearn-iris
NAME READY STATUS RESTARTS AGE
sklearn-iris-predictor-00001-deployment-66c5f5b8d5-gmfvj 1/2 Terminating 0 17m
sklearn-iris-predictor-00002-deployment-5bd9ff46f8-shtzd 2/2 Running 0 15m
Rollback and pin the previous model
You can pin the previous model (model v1, for example) by setting the canaryTrafficPercent
to 0 for the current
model (model v2, for example). This rolls back from model v2 to model v1 and decreases model v2’s traffic to zero.
Apply the custom resource to set model v2’s traffic to 0%.
# Roll back: set canaryTrafficPercent to 0 on the InferenceService that points at model-2.
kubectl apply -n kserve-test -f - <<'EOF'
apiVersion: "serving.kserve.io/v1beta1"
kind: "InferenceService"
metadata:
  name: "sklearn-iris"
spec:
  predictor:
    canaryTrafficPercent: 0
    model:
      modelFormat:
        name: sklearn
      storageUri: "gs://kfserving-examples/models/sklearn/1.0/model-2"
EOF
Check the traffic split, now 100% traffic goes to the previous good model (model v1) for revision generation 1.
# The InferenceService lives in kserve-test, so query that namespace.
kubectl -n kserve-test get isvc sklearn-iris
NAME URL READY PREV LATEST PREVROLLEDOUTREVISION LATESTREADYREVISION AGE
sklearn-iris http://sklearn-iris.kserve-test.example.com True 100 0 sklearn-iris-predictor-00002 sklearn-iris-predictor-00003 18m
The previous revision (model v1) now receives 100% of the traffic on its pods, while the new model (model v2) receives 0%.
# The InferenceService pods run in kserve-test, so query that namespace.
kubectl -n kserve-test get pods -l serving.kserve.io/inferenceservice=sklearn-iris
NAME READY STATUS RESTARTS AGE
sklearn-iris-predictor-00002-deployment-66c5f5b8d5-gmfvj 1/2 Running 0 35s
sklearn-iris-predictor-00003-deployment-5bd9ff46f8-shtzd 2/2 Running 0 16m
Route traffic using a tag
You can enable tag based routing by adding the annotation serving.kserve.io/enable-tag-routing
, so traffic can be
explicitly routed to the canary model (model v2) or the old model (model v1) via a tag in the request URL.
Apply model v2 with canaryTrafficPercent: 10
and serving.kserve.io/enable-tag-routing: "true"
.
# Apply model-2 as a 10% canary with the tag-routing annotation enabled.
kubectl apply -n kserve-test -f - <<'EOF'
apiVersion: "serving.kserve.io/v1beta1"
kind: "InferenceService"
metadata:
  name: "sklearn-iris"
  annotations:
    serving.kserve.io/enable-tag-routing: "true"
spec:
  predictor:
    canaryTrafficPercent: 10
    model:
      modelFormat:
        name: sklearn
      storageUri: "gs://kfserving-examples/models/sklearn/1.0/model-2"
EOF
Check the InferenceService status to get the canary and previous model URL.
# Print the predictor component status; the InferenceService lives in kserve-test.
kubectl -n kserve-test get isvc sklearn-iris -ojsonpath="{.status.components.predictor}" | jq
The output should look like
Since we updated the annotation on the InferenceService
, model v2 now corresponds to sklearn-iris-predictor--00003
.
You can now send the request explicitly to the new model or the previous model by using the tag in the request URL. Use
the curl command
from Perform inference and
add latest-
or prev-
to the model name to send a tag based request.
For example, set the model name and use the following commands to send traffic to each service based on the latest
or prev
tag.
curl the latest revision
MODEL_NAME=sklearn-iris
curl -v -H "Host: latest-${MODEL_NAME}-predictor.kserve-test.example.com" -H "Content-Type: application/json" http://${INGRESS_HOST}:${INGRESS_PORT}/v1/models/$MODEL_NAME:predict -d @./iris-input.json
or curl the previous revision
curl -v -H "Host: prev-${MODEL_NAME}-predictor.kserve-test.example.com" -H "Content-Type: application/json" http://${INGRESS_HOST}:${INGRESS_PORT}/v1/models/$MODEL_NAME:predict -d @./iris-input.json
Auto Scaling
Soft Limit
You can configure InferenceService with annotation autoscaling.knative.dev/target
for a soft limit. The soft limit is a targeted limit rather than a strictly enforced bound, particularly if there is a sudden burst of requests, this value can be exceeded.
apiVersion: "serving.kserve.io/v1beta1"
kind: "InferenceService"
metadata:
name: "sklearn-iris"
namespace: kserve-test
annotations:
autoscaling.knative.dev/target: "5"
spec:
predictor:
model:
args: ["--enable_docs_url=True"]
modelFormat:
name: sklearn
resources: {}
runtime: kserve-sklearnserver
storageUri: "gs://kfserving-examples/models/sklearn/1.0/model"
Hard Limit
You can also configure InferenceService with field containerConcurrency
for a hard limit. The hard limit is an enforced upper bound. If concurrency reaches the hard limit, surplus requests will be buffered and must wait until enough capacity is free to execute the requests.
apiVersion: "serving.kserve.io/v1beta1"
kind: "InferenceService"
metadata:
name: "sklearn-iris"
namespace: kserve-test
spec:
predictor:
containerConcurrency: 5
model:
args: ["--enable_docs_url=True"]
modelFormat:
name: sklearn
resources: {}
runtime: kserve-sklearnserver
storageUri: "gs://kfserving-examples/models/sklearn/1.0/model"
Scale with QPS
apiVersion: "serving.kserve.io/v1beta1"
kind: "InferenceService"
metadata:
name: "sklearn-iris"
namespace: kserve-test
spec:
predictor:
scaleTarget: 1
scaleMetric: qps
model:
args: ["--enable_docs_url=True"]
modelFormat:
name: sklearn
resources: {}
runtime: kserve-sklearnserver
storageUri: "gs://kfserving-examples/models/sklearn/1.0/model"
Scale with GPU
apiVersion: "serving.kserve.io/v1beta1"
kind: "InferenceService"
metadata:
name: "flowers-sample-gpu"
namespace: kserve-test
spec:
predictor:
scaleTarget: 1
scaleMetric: concurrency
model:
modelFormat:
name: tensorflow
storageUri: "gs://kfserving-examples/models/tensorflow/flowers"
runtimeVersion: "2.6.2-gpu"
resources:
limits:
nvidia.com/gpu: 1
Enable Scale To Zero
apiVersion: "serving.kserve.io/v1beta1"
kind: "InferenceService"
metadata:
name: "sklearn-iris"
namespace: kserve-test
spec:
predictor:
minReplicas: 0
model:
args: ["--enable_docs_url=True"]
modelFormat:
name: sklearn
resources: {}
runtime: kserve-sklearnserver
storageUri: "gs://kfserving-examples/models/sklearn/1.0/model"
Prepare Concurrent Requests Container
# export INGRESS_PORT=$(kubectl -n istio-system get service istio-ingressgateway -o jsonpath='{.spec.ports[?(@.name=="http2")].nodePort}')
podman run --rm \
-v /root/kserve/iris-input.json:/tmp/iris-input.json \
--privileged \
-e INGRESS_HOST=$(minikube ip) \
-e INGRESS_PORT=32132 \
-e MODEL_NAME=sklearn-iris \
-e INPUT_PATH=/tmp/iris-input.json \
-e SERVICE_HOSTNAME=sklearn-iris.kserve-test.example.com \
-it m.daocloud.io/docker.io/library/golang:1.22 bash -c "go install github.com/rakyll/hey@latest; bash"
Fire
Send traffic in 30-second spurts maintaining 100 in-flight requests.
hey -z 30s -c 100 -m POST -host ${SERVICE_HOSTNAME} -D $INPUT_PATH http://${INGRESS_HOST}:${INGRESS_PORT}/v1/models/$MODEL_NAME:predict
Reference
For more information, please refer to the KPA documentation.
Knative
Subsections of Knative
Eventing
Subsections of Eventing
Broker
Knative Broker 是 Knative Eventing 系统的核心组件,它的主要作用是充当事件路由和分发的中枢,在事件生产者(事件源)和事件消费者(服务)之间提供解耦、可靠的事件传输。
以下是 Knative Broker 的关键作用详解:
事件接收中心:
Broker 是事件流汇聚的入口点。各种事件源(如 Kafka 主题、HTTP 源、Cloud Pub/Sub、GitHub Webhooks、定时器、自定义源等)将事件发送到 Broker。
事件生产者只需知道 Broker 的地址,无需关心最终有哪些消费者或消费者在哪里。
事件存储与缓冲:
Broker 通常基于持久化的消息系统实现(如 Apache Kafka, Google Cloud Pub/Sub, RabbitMQ, NATS Streaming 或内存实现 InMemoryChannel)。这提供了:
持久化: 确保事件在消费者处理前不会丢失(取决于底层通道实现)。
缓冲: 当消费者暂时不可用或处理速度跟不上事件产生速度时,Broker 可以缓冲事件,避免事件丢失或压垮生产者/消费者。
重试: 如果消费者处理事件失败,Broker 可以重新投递事件(通常需要结合 Trigger 和 Subscription 的重试策略)。
解耦事件源和事件消费者:
这是 Broker 最重要的作用之一。事件源只负责将事件发送到 Broker,完全不知道有哪些服务会消费这些事件。
事件消费者通过创建 Trigger 向 Broker 声明它对哪些事件感兴趣。消费者只需知道 Broker 的存在,无需知道事件是从哪个具体源产生的。
这种解耦极大提高了系统的灵活性和可维护性:
独立演进: 可以独立添加、移除或修改事件源或消费者,只要它们遵循 Broker 的契约。
动态路由: 基于事件属性(如 type, source)动态路由事件到不同的消费者,无需修改生产者或消费者代码。
多播: 同一个事件可以被多个不同的消费者同时消费(一个事件 -> Broker -> 多个匹配的 Trigger -> 多个服务)。
事件过滤与路由(通过 Trigger):
Broker 本身不直接处理复杂的过滤逻辑。过滤和路由是由 Trigger 资源实现的。
Trigger 资源绑定到特定的 Broker。
Trigger 定义了:
订阅者: 目标服务(Knative Service、Kubernetes Service、Channel 等)的地址。
过滤器: 基于事件属性(主要是 type 和 source,以及其他可扩展属性)的条件表达式。只有满足条件的事件才会被 Broker 通过该 Trigger 路由到对应的订阅者。
Broker 接收事件后,会检查所有绑定到它的 Trigger 的过滤器。对于每一个匹配的 Trigger,Broker 都会将事件发送到该 Trigger 指定的订阅者。
提供标准事件接口:
Broker 遵循 CloudEvents 规范,它接收和传递的事件都是 CloudEvents 格式的。这为不同来源的事件和不同消费者的处理提供了统一的格式标准,简化了集成。
多租户和命名空间隔离:
Broker 通常部署在 Kubernetes 的特定命名空间中。一个命名空间内可以创建多个 Broker。
这允许在同一个集群内为不同的团队、应用或环境(如 dev, staging)隔离事件流。每个团队/应用可以管理自己命名空间内的 Broker 和 Trigger。
总结比喻:
可以把 Knative Broker 想象成一个高度智能的邮局分拣中心:
接收信件(事件): 来自世界各地(不同事件源)的信件(事件)都寄到这个分拣中心(Broker)。
存储信件: 分拣中心有仓库(持久化/缓冲)临时存放信件,确保信件安全不丢失。
分拣规则(Trigger): 分拣中心里有很多分拣员(Trigger)。每个分拣员负责特定类型或来自特定地区的信件(基于事件属性过滤)。
投递信件: 分拣员(Trigger)找到符合自己负责规则的信件(事件),就把它们投递到正确的收件人(订阅者服务)家门口。
解耦: 寄信人(事件源)只需要知道分拣中心(Broker)的地址,完全不需要知道收信人(消费者)是谁、在哪里。收信人(消费者)只需要告诉分拣中心里负责自己这类信件的分拣员(创建 Trigger)自己的地址,不需要关心信是谁寄来的。分拣中心(Broker)和分拣员(Trigger)负责中间的复杂路由工作。
Broker 带来的核心价值:
松耦合: 彻底解耦事件生产者和消费者。
灵活性: 动态添加/移除消费者,动态改变路由规则(通过修改/创建/删除 Trigger)。
可靠性: 提供事件持久化和重试机制(依赖底层实现)。
可伸缩性: Broker 和消费者都可以独立伸缩。
标准化: 基于 CloudEvents。
简化开发: 开发者专注于业务逻辑(生产事件或消费事件),无需自己搭建复杂的事件总线基础设施。
Subsections of Broker
Install Kafka Broker
About
- Source, curl, kafkaSource,
- Broker
- Trigger
- Sink: ksvc, isvc
Install a Channel (messaging) layer
kubectl apply -f https://github.com/knative-extensions/eventing-kafka-broker/releases/download/knative-v1.18.0/eventing-kafka-controller.yaml
kubectl apply -f https://github.com/knative-extensions/eventing-kafka-broker/releases/download/knative-v1.18.0/eventing-kafka-channel.yaml
Install a Broker layer
kubectl apply -f https://github.com/knative-extensions/eventing-kafka-broker/releases/download/knative-v1.18.0/eventing-kafka-broker.yaml
for more information, you can check 🔗https://knative.dev/docs/eventing/brokers/broker-types/kafka-broker/
[Optional] Install Eventing extensions
- kafka sink
kubectl apply -f https://github.com/knative-extensions/eventing-kafka-broker/releases/download/knative-v1.18.0/eventing-kafka-sink.yaml
for more information, you can check 🔗https://knative.dev/docs/eventing/sinks/kafka-sink/
- kafka source
kubectl apply -f https://github.com/knative-extensions/eventing-kafka-broker/releases/download/knative-v1.18.0/eventing-kafka-source.yaml
for more information, you can check 🔗https://knative.dev/docs/eventing/sources/kafka-source/
Display Broker Message
Flow
flowchart LR A[Curl] -->|HTTP| B{Broker} B -->|Subscribe| D[Trigger1] B -->|Subscribe| E[Trigger2] B -->|Subscribe| F[Trigger3] E --> G[Display Service]
Steps
1. Create Broker Setting
kubectl apply -f - <<EOF
apiVersion: v1
kind: ConfigMap
metadata:
name: kafka-broker-config
namespace: knative-eventing
data:
default.topic.partitions: "10"
default.topic.replication.factor: "1"
bootstrap.servers: "kafka.database.svc.cluster.local:9092" #kafka service address
default.topic.config.retention.ms: "3600"
EOF
2. Create Broker
kubectl apply -f - <<EOF
apiVersion: eventing.knative.dev/v1
kind: Broker
metadata:
annotations:
eventing.knative.dev/broker.class: Kafka
name: first-broker
namespace: kserve-test
spec:
config:
apiVersion: v1
kind: ConfigMap
name: kafka-broker-config
namespace: knative-eventing
EOF
deadletterSink:
3. Create Trigger
kubectl apply -f - <<EOF
apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
name: display-service-trigger
namespace: kserve-test
spec:
broker: first-broker
subscriber:
ref:
apiVersion: serving.knative.dev/v1
kind: Service
name: event-display
EOF
4. Create Sink Service (Display Message)
kubectl apply -f - <<EOF
apiVersion: serving.knative.dev/v1
kind: Service
metadata:
name: event-display
namespace: kserve-test
spec:
template:
spec:
containers:
- image: gcr.io/knative-releases/knative.dev/eventing/cmd/event_display
EOF
5. Test
kubectl run curl-test --image=curlimages/curl -it --rm --restart=Never -- \
-v "http://kafka-broker-ingress.knative-eventing.svc.cluster.local/kserve-test/first-broker" \
-X POST \
-H "Ce-Id: $(date +%s)" \
-H "Ce-Specversion: 1.0" \
-H "Ce-Type: test.type" \
-H "Ce-Source: curl-test" \
-H "Content-Type: application/json" \
-d '{"test": "Broker is working"}'
6. Check message
kubectl -n kserve-test logs -f deploy/event-display-00001-deployment
Kafka Broker Invoke ISVC
1. Prepare RBAC
- create cluster role to access CRD isvc
kubectl apply -f - <<EOF
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
name: kserve-access-for-knative
rules:
- apiGroups: ["serving.kserve.io"]
resources: ["inferenceservices", "inferenceservices/status"]
verbs: ["get", "list", "watch"]
EOF
- create rolebinding and grant privileges
kubectl apply -f - <<EOF
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
name: kafka-controller-kserve-access
roleRef:
apiGroup: rbac.authorization.k8s.io
kind: ClusterRole
name: kserve-access-for-knative
subjects:
- kind: ServiceAccount
name: kafka-controller
namespace: knative-eventing
EOF
2. Create Broker Setting
kubectl apply -f - <<EOF
apiVersion: v1
kind: ConfigMap
metadata:
name: kafka-broker-config
namespace: knative-eventing
data:
default.topic.partitions: "10"
default.topic.replication.factor: "1"
bootstrap.servers: "kafka.database.svc.cluster.local:9092" #kafka service address
default.topic.config.retention.ms: "3600"
EOF
3. Create Broker
kubectl apply -f - <<EOF
apiVersion: eventing.knative.dev/v1
kind: Broker
metadata:
annotations:
eventing.knative.dev/broker.class: Kafka
name: isvc-broker
namespace: kserve-test
spec:
config:
apiVersion: v1
kind: ConfigMap
name: kafka-broker-config
namespace: knative-eventing
delivery:
deadLetterSink:
ref:
apiVersion: serving.knative.dev/v1
kind: Service
name: event-display
EOF
4. Create InferenceService
you can create isvc first-torchserve
service, by following 🔗link
5. Create Trigger
kubectl apply -f - << EOF
apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
name: kserve-trigger
namespace: kserve-test
spec:
broker: isvc-broker
filter:
attributes:
type: prediction-request
subscriber:
uri: http://first-torchserve.kserve-test.svc.cluster.local/v1/models/mnist:predict
EOF
6. Test
Normally, we can invoke
first-torchserve
by executing
export MASTER_IP=192.168.100.112
export ISTIO_INGRESS_PORT=$(kubectl -n istio-system get service istio-ingressgateway -o jsonpath='{.spec.ports[?(@.name=="http2")].nodePort}')
export SERVICE_HOSTNAME=$(kubectl -n kserve-test get inferenceservice first-torchserve -o jsonpath='{.status.url}' | cut -d "/" -f 3)
# http://first-torchserve.kserve-test.example.com
curl -v -H "Host: ${SERVICE_HOSTNAME}" -H "Content-Type: application/json" "http://${MASTER_IP}:${ISTIO_INGRESS_PORT}/v1/models/mnist:predict" -d @./mnist-input.json
Now, you can access model by executing
export KAFKA_BROKER_INGRESS_PORT=$(kubectl -n knative-eventing get service kafka-broker-ingress -o jsonpath='{.spec.ports[?(@.name=="http-container")].nodePort}')
curl -v "http://${MASTER_IP}:${KAFKA_BROKER_INGRESS_PORT}/kserve-test/isvc-broker" \
-X POST \
-H "Ce-Id: $(date +%s)" \
-H "Ce-Specversion: 1.0" \
-H "Ce-Type: prediction-request" \
-H "Ce-Source: event-producer" \
-H "Content-Type: application/json" \
-d @./mnist-input.json
Plugin
Subsections of Plugin
Eventing Kafka Broker
Subsections of Eventing Kafka Broker
Prepare Dev Environment
update go -> 1.24
install
ko
-> 0.18.0
go install github.com/google/ko@latest
# wget https://github.com/ko-build/ko/releases/download/v0.18.0/ko_0.18.0_Linux_x86_64.tar.gz
# tar -xzf ko_0.18.0_Linux_x86_64.tar.gz -C /usr/local/bin/ko
# cp /usr/local/bin/ko/ko /root/bin
- protoc
PB_REL="https://github.com/protocolbuffers/protobuf/releases"
curl -LO $PB_REL/download/v30.2/protoc-30.2-linux-x86_64.zip
# mkdir -p ${HOME}/bin/
mkdir -p /usr/local/bin/protoc
unzip protoc-30.2-linux-x86_64.zip -d /usr/local/bin/protoc
cp /usr/local/bin/protoc/bin/protoc /root/bin
# export PATH="$PATH:/root/bin"
rm -rf protoc-30.2-linux-x86_64.zip
- protoc-gen-go -> 1.5.4
go install google.golang.org/protobuf/cmd/protoc-gen-go@latest
export GOPATH=/usr/local/go/bin
- copy some code
mkdir -p ${GOPATH}/src/knative.dev
cd ${GOPATH}/src/knative.dev
git clone git@github.com:knative/eventing.git # clone eventing repo
git clone git@github.com:AaronYang0628/eventing-kafka-broker.git
cd eventing-kafka-broker
git remote add upstream https://github.com/knative-extensions/eventing-kafka-broker.git
git remote set-url --push upstream no_push
export KO_DOCKER_REPO=docker-registry.lab.zverse.space/data-and-computing/ay-dev
Build Async Prediction Flow
Flow
flowchart LR A[User Curl] -->|HTTP| B{ISVC-Broker:Kafka} B -->|Subscribe| D[Trigger1] B -->|Subscribe| E[Kserve-Trigger] B -->|Subscribe| F[Trigger3] E --> G[Mnist Service] G --> |Kafka-Sink| B
Steps
1. Create Broker Setting
kubectl apply -f - <<EOF
apiVersion: v1
kind: ConfigMap
metadata:
name: kafka-broker-config
namespace: knative-eventing
data:
default.topic.partitions: "10"
default.topic.replication.factor: "1"
bootstrap.servers: "kafka.database.svc.cluster.local:9092" #kafka service address
default.topic.config.retention.ms: "3600"
EOF
2. Create Broker
kubectl apply -f - <<EOF
apiVersion: eventing.knative.dev/v1
kind: Broker
metadata:
annotations:
eventing.knative.dev/broker.class: Kafka
name: isvc-broker
namespace: kserve-test
spec:
config:
apiVersion: v1
kind: ConfigMap
name: kafka-broker-config
namespace: knative-eventing
EOF
3. Create Trigger
kubectl apply -f - << EOF
apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
name: kserve-trigger
namespace: kserve-test
spec:
broker: isvc-broker
filter:
attributes:
type: prediction-request-udf-attr # you can change this
subscriber:
uri: http://prediction-and-sink.kserve-test.svc.cluster.local/v1/models/mnist:predict
EOF
4. Create InferenceService
kubectl apply -f - <<EOF
apiVersion: serving.kserve.io/v1beta1
kind: InferenceService
metadata:
  name: prediction-and-sink
  namespace: kserve-test
spec:
  predictor:
    model:
      modelFormat:
        name: pytorch
      storageUri: gs://kfserving-examples/models/torchserve/image_classifier/v1
  transformer:
    containers:
      - image: docker-registry.lab.zverse.space/data-and-computing/ay-dev/msg-transformer:dev9
        name: kserve-container
        env:
          - name: KAFKA_BOOTSTRAP_SERVERS
            value: kafka.database.svc.cluster.local
          - name: KAFKA_TOPIC
            value: test-topic # result will be saved in this topic
          - name: REQUEST_TRACE_KEY
            value: test-trace-id # using this key to retrieve prediction result
        command:
          - "python"
          - "-m"
          - "model"
        args:
          - --model_name
          - mnist
EOF
[Optional] 5. Invoke InferenceService
- preparation
wget -O ./mnist-input.json https://raw.githubusercontent.com/kserve/kserve/refs/heads/master/docs/samples/v1beta1/torchserve/v1/imgconv/input.json
SERVICE_NAME=prediction-and-sink
MODEL_NAME=mnist
INPUT_PATH=@./mnist-input.json
PLAIN_SERVICE_HOSTNAME=$(kubectl -n kserve-test get inferenceservice $SERVICE_NAME -o jsonpath='{.status.url}' | cut -d "/" -f 3)
- fire!!
export INGRESS_HOST=192.168.100.112
export INGRESS_PORT=$(kubectl -n istio-system get service istio-ingressgateway -o jsonpath='{.spec.ports[?(@.name=="http2")].nodePort}')
curl -v -H "Host: ${PLAIN_SERVICE_HOSTNAME}" -H "Content-Type: application/json" -d $INPUT_PATH http://${INGRESS_HOST}:${INGRESS_PORT}/v1/models/$MODEL_NAME:predict
6. Invoke Broker
- preparation
cat > image-with-trace-id.json << EOF
{
"test-trace-id": "16ec3446-48d6-422e-9926-8224853e84a7",
"instances": [
{
"data": "iVBORw0KGgoAAAANSUhEUgAAABwAAAAcCAAAAABXZoBIAAAAw0lEQVR4nGNgGFggVVj4/y8Q2GOR83n+58/fP0DwcSqmpNN7oOTJw6f+/H2pjUU2JCSEk0EWqN0cl828e/FIxvz9/9cCh1zS5z9/G9mwyzl/+PNnKQ45nyNAr9ThMHQ/UG4tDofuB4bQIhz6fIBenMWJQ+7Vn7+zeLCbKXv6z59NOPQVgsIcW4QA9YFi6wNQLrKwsBebW/68DJ388Nun5XFocrqvIFH59+XhBAxThTfeB0r+vP/QHbuDCgr2JmOXoSsAAKK7bU3vISS4AAAAAElFTkSuQmCC"
}
]
}
EOF
- fire!!
export MASTER_IP=192.168.100.112
export KAFKA_BROKER_INGRESS_PORT=$(kubectl -n knative-eventing get service kafka-broker-ingress -o jsonpath='{.spec.ports[?(@.name=="http-container")].nodePort}')
curl -v "http://${MASTER_IP}:${KAFKA_BROKER_INGRESS_PORT}/kserve-test/isvc-broker" \
-X POST \
-H "Ce-Id: $(date +%s)" \
-H "Ce-Specversion: 1.0" \
-H "Ce-Type: prediction-request-udf-attr" \
-H "Ce-Source: event-producer" \
-H "Content-Type: application/json" \
-d @./image-with-trace-id.json
- check input data in kafka topic
knative-broker-kserve-test-isvc-broker
kubectl -n database exec -it deployment/kafka-client-tools -- bash -c \
'kafka-console-consumer.sh --bootstrap-server $BOOTSTRAP_SERVER --consumer.config $CLIENT_CONFIG_FILE --topic knative-broker-kserve-test-isvc-broker --from-beginning'
- check response result in kafka topic
test-topic
kubectl -n database exec -it deployment/kafka-client-tools -- bash -c \
'kafka-console-consumer.sh --bootstrap-server $BOOTSTRAP_SERVER --consumer.config $CLIENT_CONFIG_FILE --topic test-topic --from-beginning'