Subsections of Serverless

Subsections of Kserve

Install Kserve

Preliminary

  • Kubernetes v1.30+ has been installed; if not, check 🔗link
  • Helm has been installed; if not, check 🔗link
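A quick sanity check for both prerequisites, assuming kubectl and helm are already on your PATH:

kubectl version
helm version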

Installation

Install By

Preliminary

1. Kubernetes has been installed; if not, check 🔗link


2. Helm binary has been installed; if not, check 🔗link


1. Install from the script directly

curl -s "https://raw.githubusercontent.com/kserve/kserve/release-0.15/hack/quick_install.sh" | bash
Expected Output

Installing Gateway API CRDs …

😀 Successfully installed Istio

😀 Successfully installed Cert Manager

😀 Successfully installed Knative

However, you will probably encounter some errors due to network issues, like this:

If so, reinstall the affected components:

export KSERVE_VERSION=v0.15.2
export deploymentMode=Serverless
helm upgrade --namespace kserve kserve-crd oci://ghcr.io/kserve/charts/kserve-crd --version $KSERVE_VERSION
helm upgrade --namespace kserve kserve oci://ghcr.io/kserve/charts/kserve --version $KSERVE_VERSION --set-string kserve.controller.deploymentMode="$deploymentMode"
# helm upgrade knative-operator --namespace knative-serving  https://github.com/knative/operator/releases/download/knative-v1.15.7/knative-operator-v1.15.7.tgz
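If the reinstall succeeds, the KServe controller should come up in the kserve namespace; a quick check, assuming the release names used above:

kubectl -n kserve get pods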

Preliminary

1. If you have only one node in your cluster, you need at least 6 CPUs, 6 GB of memory, and 30 GB of disk storage.


2. If you have multiple nodes in your cluster, for each node you need at least 2 CPUs, 4 GB of memory, and 20 GB of disk storage.


1. Install Knative Serving CRD resources

kubectl apply -f https://github.com/knative/serving/releases/download/knative-v1.18.0/serving-crds.yaml

2. Install Knative Serving core components

kubectl apply -f https://github.com/knative/serving/releases/download/knative-v1.18.0/serving-core.yaml
# kubectl apply -f https://raw.githubusercontent.com/AaronYang0628/assets/refs/heads/main/knative/serving/release/download/knative-v1.18.0/serving-core.yaml

3. Install the Istio networking layer

kubectl apply -l knative.dev/crd-install=true -f https://github.com/knative/net-istio/releases/download/knative-v1.18.0/istio.yaml
kubectl apply -f https://github.com/knative/net-istio/releases/download/knative-v1.18.0/istio.yaml
kubectl apply -f https://github.com/knative/net-istio/releases/download/knative-v1.18.0/net-istio.yaml

Monitor the Knative components until all of the components show a STATUS of Running or Completed.

kubectl get pods -n knative-serving

#NAME                                      READY   STATUS    RESTARTS   AGE
#3scale-kourier-control-54cc54cc58-mmdgq   1/1     Running   0          81s
#activator-67656dcbbb-8mftq                1/1     Running   0          97s
#autoscaler-df6856b64-5h4lc                1/1     Running   0          97s
#controller-788796f49d-4x6pm               1/1     Running   0          97s
#domain-mapping-65f58c79dc-9cw6d           1/1     Running   0          97s
#domainmapping-webhook-cc646465c-jnwbz     1/1     Running   0          97s
#webhook-859796bc7-8n5g2                   1/1     Running   0          96s

4. Install cert-manager

kubectl apply -f https://github.com/cert-manager/cert-manager/releases/download/v1.17.2/cert-manager.yaml
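Before installing KServe, you may want to wait until cert-manager is fully available; a minimal check, assuming the default cert-manager namespace:

kubectl -n cert-manager get pods
kubectl -n cert-manager wait --for=condition=Available deployment --all --timeout=300s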

5. Install KServe

kubectl apply --server-side -f https://github.com/kserve/kserve/releases/download/v0.15.0/kserve.yaml
kubectl apply --server-side -f https://github.com/kserve/kserve/releases/download/v0.15.0/kserve-cluster-resources.yaml
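A quick way to verify that the controller and the cluster-scoped runtimes were created, assuming the default kserve namespace:

kubectl -n kserve get pods
kubectl get clusterservingruntimes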
Reference

Preliminary

1. Kubernetes has been installed; if not, check 🔗link


2. ArgoCD has been installed; if not, check 🔗link


3. Helm binary has been installed; if not, check 🔗link


1. Install Gateway API CRDs

kubectl apply -f https://github.com/kubernetes-sigs/gateway-api/releases/download/v1.3.0/standard-install.yaml
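You can confirm the Gateway API CRDs are registered before continuing:

kubectl get crd | grep gateway.networking.k8s.io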

2. Install cert-manager

Reference

Follow 🔗link to install cert-manager

3. Install the Istio system

Reference

Follow 🔗link to install the three Istio components (istio-base, istiod, istio-ingressgateway)

4. Install Knative Operator

kubectl -n argocd apply -f - << EOF
apiVersion: argoproj.io/v1alpha1
kind: Application
metadata:
  name: knative-operator
spec:
  syncPolicy:
    syncOptions:
    - CreateNamespace=true
  project: default
  source:
    repoURL: https://knative.github.io/operator
    chart: knative-operator
    targetRevision: v1.18.1
    helm:
      releaseName: knative-operator
      values: |
        knative_operator:
          knative_operator:
            image: m.daocloud.io/gcr.io/knative-releases/knative.dev/operator/cmd/operator
            tag: v1.18.1
            resources:
              requests:
                cpu: 100m
                memory: 100Mi
              limits:
                cpu: 1000m
                memory: 1000Mi
          operator_webhook:
            image: m.daocloud.io/gcr.io/knative-releases/knative.dev/operator/cmd/webhook
            tag: v1.18.1
            resources:
              requests:
                cpu: 100m
                memory: 100Mi
              limits:
                cpu: 500m
                memory: 500Mi
  destination:
    server: https://kubernetes.default.svc
    namespace: knative-serving
EOF

5. Sync via ArgoCD

argocd app sync argocd/knative-operator
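Before creating the KnativeServing resource in the next step, you can confirm the operator is healthy; a quick check, assuming it was deployed into knative-serving as configured above:

argocd app get argocd/knative-operator
kubectl -n knative-serving get deployments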

6. Install the KnativeServing custom resource

kubectl apply -f - <<EOF
apiVersion: operator.knative.dev/v1beta1
kind: KnativeServing
metadata:
  name: knative-serving
  namespace: knative-serving
spec:
  version: 1.18.0 # this is knative serving version
  config:
    domain:
      example.com: ""
EOF

7. Install KServe CRDs

kubectl -n argocd apply -f - << EOF
apiVersion: argoproj.io/v1alpha1
kind: Application
metadata:
  name: kserve-crd
  annotations:
    argocd.argoproj.io/sync-options: ServerSideApply=true
    argocd.argoproj.io/compare-options: IgnoreExtraneous
spec:
  syncPolicy:
    syncOptions:
    - CreateNamespace=true
    - ServerSideApply=true
  project: default
  source:
    repoURL: https://aaronyang0628.github.io/helm-chart-mirror/charts
    chart: kserve-crd
    targetRevision: v0.15.2
    helm:
      releaseName: kserve-crd 
  destination:
    server: https://kubernetes.default.svc
    namespace: kserve
EOF
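Once the KnativeServing resource from step 6 has been reconciled, the pods in the knative-serving namespace should look roughly like the trimmed listing below; you can reproduce it with:

kubectl -n knative-serving get pods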
knative-serving    activator-cbf5b6b55-7gw8s                                 Running        116s
knative-serving    autoscaler-c5d454c88-nxrms                                Running        115s
knative-serving    autoscaler-hpa-6c966695c6-9ld24                           Running        113s
knative-serving    cleanup-serving-serving-1.18.0-45nhg                      Completed      113s
knative-serving    controller-84f96b7676-jjqfp                               Running        115s
knative-serving    net-istio-controller-574679cd5f-2sf4d                     Running        112s
knative-serving    net-istio-webhook-85c99487db-mmq7n                        Running        111s
knative-serving    storage-version-migration-serving-serving-1.18.0-k28vf    Completed      113s
knative-serving    webhook-75d4fb6db5-qqcwz                                  Running        114s

8. Sync via ArgoCD

argocd app sync argocd/kserve-crd
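After the sync completes, the KServe CRDs should be registered; a quick check:

kubectl get crd | grep serving.kserve.io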

9. Install KServe controller

kubectl -n argocd apply -f - << EOF
apiVersion: argoproj.io/v1alpha1
kind: Application
metadata:
  name: kserve
  annotations:
    argocd.argoproj.io/sync-options: ServerSideApply=true
    argocd.argoproj.io/compare-options: IgnoreExtraneous
spec:
  syncPolicy:
    syncOptions:
    - CreateNamespace=true
    - ServerSideApply=true
  project: default
  source:
    repoURL: https://aaronyang0628.github.io/helm-chart-mirror/charts
    chart: kserve
    targetRevision: v0.15.2
    helm:
      releaseName: kserve
      values: |
        kserve:
          agent:
            image: m.daocloud.io/docker.io/kserve/agent
          router:
            image: m.daocloud.io/docker.io/kserve/router
          storage:
            image: m.daocloud.io/docker.io/kserve/storage-initializer
            s3:
              accessKeyIdName: AWS_ACCESS_KEY_ID
              secretAccessKeyName: AWS_SECRET_ACCESS_KEY
              endpoint: ""
              region: ""
              verifySSL: ""
              useVirtualBucket: ""
              useAnonymousCredential: ""
          controller:
            deploymentMode: "Serverless"
            rbacProxyImage: m.daocloud.io/quay.io/brancz/kube-rbac-proxy:v0.18.0
            rbacProxy:
              resources:
                limits:
                  cpu: 100m
                  memory: 300Mi
                requests:
                  cpu: 100m
                  memory: 300Mi
            gateway:
              domain: example.com
            image: m.daocloud.io/docker.io/kserve/kserve-controller
            resources:
              limits:
                cpu: 100m
                memory: 300Mi
              requests:
                cpu: 100m
                memory: 300Mi
          servingruntime:
            tensorflow:
              image: tensorflow/serving
              tag: 2.6.2
            mlserver:
              image: m.daocloud.io/docker.io/seldonio/mlserver
              tag: 1.5.0
            sklearnserver:
              image: m.daocloud.io/docker.io/kserve/sklearnserver
            xgbserver:
              image: m.daocloud.io/docker.io/kserve/xgbserver
            huggingfaceserver:
              image: m.daocloud.io/docker.io/kserve/huggingfaceserver
              devShm:
                enabled: false
                sizeLimit: ""
              hostIPC:
                enabled: false
            huggingfaceserver_multinode:
              shm:
                enabled: true
                sizeLimit: "3Gi"
            tritonserver:
              image: nvcr.io/nvidia/tritonserver
            pmmlserver:
              image: m.daocloud.io/docker.io/kserve/pmmlserver
            paddleserver:
              image: m.daocloud.io/docker.io/kserve/paddleserver
            lgbserver:
              image: m.daocloud.io/docker.io/kserve/lgbserver
            torchserve:
              image: pytorch/torchserve-kfs
              tag: 0.9.0
            art:
              image: m.daocloud.io/docker.io/kserve/art-explainer
          localmodel:
            enabled: false
            controller:
              image: m.daocloud.io/docker.io/kserve/kserve-localmodel-controller
            jobNamespace: kserve-localmodel-jobs
            agent:
              hostPath: /mnt/models
              image: m.daocloud.io/docker.io/kserve/kserve-localmodelnode-agent
          inferenceservice:
            resources:
              limits:
                cpu: "1"
                memory: "2Gi"
              requests:
                cpu: "1"
                memory: "2Gi"
  destination:
    server: https://kubernetes.default.svc
    namespace: kserve
EOF
You may see an error like this while the KServe webhook is not ready yet:

Internal error occurred: failed calling webhook "clusterservingruntime.kserve-webhook-server.validator": failed to call webhook: Post "https://kserve-webhook-server-service.kserve.svc:443/validate-serving-kserve-io-v1alpha1-clusterservingruntime?timeout=10s": no endpoints available for service "kserve-webhook-server-service"

Just wait for a while, then resync; it will be fine.

10. Sync via ArgoCD

argocd app sync argocd/kserve
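Once the sync eventually succeeds, verify that the controller pods are running:

kubectl -n kserve get pods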

11. Install Knative Eventing CRDs

kubectl apply -f https://github.com/knative/eventing/releases/download/knative-v1.18.1/eventing-crds.yaml

12. Install Knative Eventing core

kubectl apply -f https://github.com/knative/eventing/releases/download/knative-v1.18.1/eventing-core.yaml
knative-eventing   eventing-controller-cc45869cd-fmhg8        1/1     Running       0          3m33s
knative-eventing   eventing-webhook-67fcc6959b-lktxd          1/1     Running       0          3m33s
knative-eventing   job-sink-7f5d754db-tbf2z                   1/1     Running       0          3m33s
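The listing above shows the knative-eventing namespace. Assuming a default installation, you can check the pods and the eventing CRDs with:

kubectl -n knative-eventing get pods
kubectl get crd | grep eventing.knative.dev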


Subsections of Serving

Subsections of Inference

First Pytorch ISVC

Mnist Inference

More Information about mnist service can be found 🔗link

  1. create a namespace
kubectl create namespace kserve-test
  1. deploy a sample torchserve mnist service
kubectl apply -n kserve-test -f - <<EOF
apiVersion: "serving.kserve.io/v1beta1"
kind: "InferenceService"
metadata:
  name: "first-torchserve"
  namespace: kserve-test
spec:
  predictor:
    model:
      modelFormat:
        name: pytorch
      storageUri: gs://kfserving-examples/models/torchserve/image_classifier/v1
      resources:
        limits:
          memory: 4Gi
EOF
  1. Check InferenceService status
kubectl -n kserve-test get inferenceservices first-torchserve 
kubectl -n kserve-test get pod
#NAME                                           READY   STATUS    RESTARTS   AGE
#first-torchserve-predictor-00001-deplo...      2/2     Running   0          25s

kubectl -n kserve-test get inferenceservices first-torchserve
#NAME               URL                                                READY   PREV   LATEST   PREVROLLEDOUTREVISION   LATESTREADYREVISION                AGE
#first-torchserve   http://first-torchserve.kserve-test.example.com   True           100                              first-torchserve-predictor-00001   2m59s

After all pods are ready, you can access the service by using the following command

Access By

If the EXTERNAL-IP value is set, your environment has an external load balancer that you can use for the ingress gateway.

export INGRESS_HOST=$(kubectl -n istio-system get service istio-ingressgateway -o jsonpath='{.status.loadBalancer.ingress[0].ip}')
export INGRESS_PORT=$(kubectl -n istio-system get service istio-ingressgateway -o jsonpath='{.spec.ports[?(@.name=="http2")].nodePort}')

If the EXTERNAL-IP value is none (or perpetually pending), your environment does not provide an external load balancer for the ingress gateway. In this case, you can access the gateway using the service’s node port.

export INGRESS_HOST=$(minikube ip)
export INGRESS_PORT=$(kubectl -n istio-system get service istio-ingressgateway -o jsonpath='{.spec.ports[?(@.name=="http2")].nodePort}')
export INGRESS_HOST=$(minikube ip)
kubectl port-forward --namespace istio-system svc/istio-ingressgateway 30080:80
export INGRESS_PORT=30080
  1. Perform a prediction

First, prepare your inference input request inside a file:
wget -O ./mnist-input.json https://raw.githubusercontent.com/kserve/kserve/refs/heads/master/docs/samples/v1beta1/torchserve/v1/imgconv/input.json
ssh -i ~/.minikube/machines/minikube/id_rsa docker@$(minikube ip) -L "*:${INGRESS_PORT}:0.0.0.0:${INGRESS_PORT}" -N -f
  1. Invoke the service
SERVICE_HOSTNAME=$(kubectl -n kserve-test get inferenceservice first-torchserve  -o jsonpath='{.status.url}' | cut -d "/" -f 3)
# http://first-torchserve.kserve-test.example.com 
curl -v -H "Host: ${SERVICE_HOSTNAME}" -H "Content-Type: application/json" "http://${INGRESS_HOST}:${INGRESS_PORT}/v1/models/mnist:predict" -d @./mnist-input.json
*   Trying 192.168.58.2...
* TCP_NODELAY set
* Connected to 192.168.58.2 (192.168.58.2) port 32132 (#0)
> POST /v1/models/mnist:predict HTTP/1.1
> Host: my-torchserve.kserve-test.example.com
> User-Agent: curl/7.61.1
> Accept: */*
> Content-Type: application/json
> Content-Length: 401
> 
* upload completely sent off: 401 out of 401 bytes
< HTTP/1.1 200 OK
< content-length: 19
< content-type: application/json
< date: Mon, 09 Jun 2025 09:27:27 GMT
< server: istio-envoy
< x-envoy-upstream-service-time: 1128
< 
* Connection #0 to host 192.168.58.2 left intact
{"predictions":[2]}

First Custom Model

AlexNet Inference

More Information about AlexNet service can be found 🔗link

  1. Implement Custom Model using KServe API
import argparse
import base64
import io
import time

from fastapi.middleware.cors import CORSMiddleware
from torchvision import models, transforms
from typing import Dict
import torch
from PIL import Image

import kserve
from kserve import Model, ModelServer, logging
from kserve.model_server import app
from kserve.utils.utils import generate_uuid


class AlexNetModel(Model):
    def __init__(self, name: str):
        super().__init__(name, return_response_headers=True)
        self.name = name
        self.ready = False
        self.load()

    def load(self):
        self.model = models.alexnet(pretrained=True)
        self.model.eval()
        # The ready flag is used by model ready endpoint for readiness probes,
        # set to True when model is loaded successfully without exceptions.
        self.ready = True

    async def predict(
        self,
        payload: Dict,
        headers: Dict[str, str] = None,
        response_headers: Dict[str, str] = None,
    ) -> Dict:
        start = time.time()
        # Input follows the Tensorflow V1 HTTP API for binary values
        # https://www.tensorflow.org/tfx/serving/api_rest#encoding_binary_values
        img_data = payload["instances"][0]["image"]["b64"]
        raw_img_data = base64.b64decode(img_data)
        input_image = Image.open(io.BytesIO(raw_img_data))
        preprocess = transforms.Compose([
            transforms.Resize(256),
            transforms.CenterCrop(224),
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406],
                                 std=[0.229, 0.224, 0.225]),
        ])
        input_tensor = preprocess(input_image).unsqueeze(0)
        output = self.model(input_tensor)
        torch.nn.functional.softmax(output, dim=1)
        values, top_5 = torch.topk(output, 5)
        result = values.flatten().tolist()
        end = time.time()
        response_id = generate_uuid()

        # Custom response headers can be added to the inference response
        if response_headers is not None:
            response_headers.update(
                {"prediction-time-latency": f"{round((end - start) * 1000, 9)}"}
            )

        return {"predictions": result}


parser = argparse.ArgumentParser(parents=[kserve.model_server.parser])
args, _ = parser.parse_known_args()

if __name__ == "__main__":
    # Configure kserve and uvicorn logger
    if args.configure_logging:
        logging.configure_logging(args.log_config_file)
    model = AlexNetModel(args.model_name)
    model.load()
    # Custom middlewares can be added to the model
    app.add_middleware(
        CORSMiddleware,
        allow_origins=["*"],
        allow_credentials=True,
        allow_methods=["*"],
        allow_headers=["*"],
    )
    ModelServer().start([model])
  1. create requirements.txt
kserve
torchvision==0.18.0
pillow>=10.3.0,<11.0.0
  1. create Dockerfile
FROM m.daocloud.io/docker.io/library/python:3.11-slim

WORKDIR /app

COPY requirements.txt .
RUN pip install --no-cache-dir  -r requirements.txt 

COPY model.py .

CMD ["python", "model.py", "--model_name=custom-model"]
  1. build and push custom docker image
docker build -t ay-custom-model .
docker tag ay-custom-model:latest docker-registry.lab.zverse.space/ay/ay-custom-model:latest
docker push docker-registry.lab.zverse.space/ay/ay-custom-model:latest
  1. create a namespace
kubectl create namespace kserve-test
  1. deploy a sample custom-model service
kubectl apply -n kserve-test -f - <<EOF
apiVersion: serving.kserve.io/v1beta1
kind: InferenceService
metadata:
  name: ay-custom-model
spec:
  predictor:
    containers:
      - name: kserve-container
        image: docker-registry.lab.zverse.space/ay/ay-custom-model:latest
EOF
  1. Check InferenceService status
kubectl -n kserve-test get inferenceservices ay-custom-model
kubectl -n kserve-test get pod
#NAME                                           READY   STATUS    RESTARTS   AGE
#ay-custom-model-predictor-00003-dcf4rk         2/2     Running   0        167m

kubectl -n kserve-test get inferenceservices ay-custom-model
#NAME           URL   READY     PREV   LATEST   PREVROLLEDOUTREVISION   LATESTREADYREVISION   AGE
#ay-custom-model   http://ay-custom-model.kserve-test.example.com   True           100                              ay-custom-model-predictor-00003   177m

After all pods are ready, you can access the service by using the following command

Access By

If the EXTERNAL-IP value is set, your environment has an external load balancer that you can use for the ingress gateway.

export INGRESS_HOST=$(kubectl -n istio-system get service istio-ingressgateway -o jsonpath='{.status.loadBalancer.ingress[0].ip}')
export INGRESS_PORT=$(kubectl -n istio-system get service istio-ingressgateway -o jsonpath='{.spec.ports[?(@.name=="http2")].nodePort}')

If the EXTERNAL-IP value is none (or perpetually pending), your environment does not provide an external load balancer for the ingress gateway. In this case, you can access the gateway using the service’s node port.

export INGRESS_HOST=$(minikube ip)
export INGRESS_PORT=$(kubectl -n istio-system get service istio-ingressgateway -o jsonpath='{.spec.ports[?(@.name=="http2")].nodePort}')
export INGRESS_HOST=$(minikube ip)
kubectl port-forward --namespace istio-system svc/istio-ingressgateway 30080:80
export INGRESS_PORT=30080
  1. Perform a prediction

First, prepare your inference input request inside a file:

wget -O ./alex-net-input.json https://kserve.github.io/website/0.15/modelserving/v1beta1/custom/custom_model/input.json
ssh -i ~/.minikube/machines/minikube/id_rsa docker@$(minikube ip) -L "*:${INGRESS_PORT}:0.0.0.0:${INGRESS_PORT}" -N -f
  1. Invoke the service
export SERVICE_HOSTNAME=$(kubectl -n kserve-test get inferenceservice ay-custom-model  -o jsonpath='{.status.url}' | cut -d "/" -f 3)
# http://ay-custom-model.kserve-test.example.com
curl -v -H "Host: ${SERVICE_HOSTNAME}" -H "Content-Type: application/json" -X POST "http://${INGRESS_HOST}:${INGRESS_PORT}/v1/models/custom-model:predict" -d @.//alex-net-input.json
*   Trying 192.168.58.2:30704...
* Connected to 192.168.58.2 (192.168.58.2) port 30704
> POST /v1/models/custom-model:predict HTTP/1.1
> Host: ay-custom-model.kserve-test.example.com
> User-Agent: curl/8.5.0
> Accept: */*
> Content-Type: application/json
> Content-Length: 105339
> 
* We are completely uploaded and fine
< HTTP/1.1 200 OK
< content-length: 110
< content-type: application/json
< date: Wed, 11 Jun 2025 03:38:30 GMT
< prediction-time-latency: 89.966773987
< server: istio-envoy
< x-envoy-upstream-service-time: 93
< 
* Connection #0 to host 192.168.58.2 left intact
{"predictions":[14.975619316101074,14.0368070602417,13.966034889221191,12.252280235290527,12.086270332336426]}

First Model In Minio

Inference Model In Minio

More Information about Deploy InferenceService with a saved model on S3 can be found 🔗link

Create Service Account

Save the following as create-s3-sa.yaml:

apiVersion: v1
kind: ServiceAccount
metadata:
  name: sa
  annotations:
    eks.amazonaws.com/role-arn: arn:aws:iam::123456789012:role/s3access # replace with your IAM role ARN
    serving.kserve.io/s3-endpoint: s3.amazonaws.com # replace with your s3 endpoint e.g minio-service.kubeflow:9000
    serving.kserve.io/s3-usehttps: "1" # by default 1, if testing with minio you can set to 0
    serving.kserve.io/s3-region: "us-east-2"
    serving.kserve.io/s3-useanoncredential: "false" # omitting this is the same as false, if true will ignore provided credential and use anonymous credentials

Then apply it:

kubectl apply -f create-s3-sa.yaml

Create S3 Secret and attach to Service Account

Create a secret with your S3 user credentials. KServe reads the secret annotations to inject the S3 environment variables into the storage initializer or model agent so they can download the models from S3 storage.

Create S3 secret

Save the following as create-s3-secret.yaml:

apiVersion: v1
kind: Secret
metadata:
  name: s3creds
  annotations:
     serving.kserve.io/s3-endpoint: s3.amazonaws.com # replace with your s3 endpoint e.g minio-service.kubeflow:9000
     serving.kserve.io/s3-usehttps: "1" # by default 1, if testing with minio you can set to 0
     serving.kserve.io/s3-region: "us-east-2"
     serving.kserve.io/s3-useanoncredential: "false" # omitting this is the same as false, if true will ignore provided credential and use anonymous credentials
type: Opaque
stringData: # use `stringData` for raw credential string or `data` for base64 encoded string
  AWS_ACCESS_KEY_ID: XXXX
  AWS_SECRET_ACCESS_KEY: XXXXXXXX

Attach secret to a service account


apiVersion: v1
kind: ServiceAccount
metadata:
  name: sa
secrets:
- name: s3creds

Then apply it:

kubectl apply -f create-s3-secret.yaml

Note: If you are running KServe with Istio sidecars enabled, there can be a race condition between the Istio proxy becoming ready and the agent pulling models. This results in a TCP dial connection refused error when the agent tries to download from S3.

To resolve it, istio allows the blocking of other containers in a pod until the proxy container is ready.

You can enable this by setting `proxy.holdApplicationUntilProxyStarts: true` in the `istio-sidecar-injector` configmap. The `proxy.holdApplicationUntilProxyStarts` flag was introduced in Istio 1.7 as an experimental feature and is turned off by default.
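If you prefer not to edit the injector configmap directly, the same setting can be applied mesh-wide or per workload. The sketch below assumes istioctl manages your installation; the per-pod annotation is standard Istio, but verify it against your Istio version:

# mesh-wide, via istioctl (assumes an istioctl-managed installation)
istioctl install --set meshConfig.defaultConfig.holdApplicationUntilProxyStarts=true

# or per workload, by adding this annotation to the pod template:
#   proxy.istio.io/config: '{ "holdApplicationUntilProxyStarts": true }'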

Deploy the model on S3 with InferenceService

Create the InferenceService with the s3 storageUri and the service account with s3 credential attached.

New Schema:

apiVersion: "serving.kserve.io/v1beta1"
kind: "InferenceService"
metadata:
  name: "mnist-s3"
spec:
  predictor:
    serviceAccountName: sa
    model:
      modelFormat:
        name: tensorflow
      storageUri: "s3://kserve-examples/mnist"

Old Schema:

apiVersion: "serving.kserve.io/v1beta1"
kind: "InferenceService"
metadata:
  name: "mnist-s3"
spec:
  predictor:
    serviceAccountName: sa
    tensorflow:
      storageUri: "s3://kserve-examples/mnist"

Apply the mnist-s3.yaml.


kubectl apply -f mnist-s3.yaml

Run a prediction

Now, the ingress can be accessed at ${INGRESS_HOST}:${INGRESS_PORT} or follow this instruction to find out the ingress IP and port.

SERVICE_HOSTNAME=$(kubectl get inferenceservice mnist-s3 -o jsonpath='{.status.url}' | cut -d "/" -f 3)

MODEL_NAME=mnist-s3
INPUT_PATH=@./input.json
curl -v -H "Host: ${SERVICE_HOSTNAME}" -H "Content-Type: application/json" http://${INGRESS_HOST}:${INGRESS_PORT}/v1/models/$MODEL_NAME:predict -d $INPUT_PATH

Expected Output

Note: Unnecessary use of -X or --request, POST is already inferred.
*   Trying 35.237.217.209...
* TCP_NODELAY set
* Connected to mnist-s3.default.35.237.217.209.xip.io (35.237.217.209) port 80 (#0)
> POST /v1/models/mnist-s3:predict HTTP/1.1
> Host: mnist-s3.default.35.237.217.209.xip.io
> User-Agent: curl/7.55.1
> Accept: */*
> Content-Length: 2052
> Content-Type: application/x-www-form-urlencoded
> Expect: 100-continue
>
< HTTP/1.1 100 Continue
* We are completely uploaded and fine
< HTTP/1.1 200 OK
< content-length: 251
< content-type: application/json
< date: Sun, 04 Apr 2021 20:06:27 GMT
< x-envoy-upstream-service-time: 5
< server: istio-envoy
<
* Connection #0 to host mnist-s3.default.35.237.217.209.xip.io left intact
{
    "predictions": [
        {
            "predictions": [0.327352405, 2.00153053e-07, 0.0113353515, 0.203903764, 3.62863029e-05, 0.416683704, 0.000281196437, 8.36911859e-05, 0.0403052084, 1.82206513e-05],
            "classes": 5
        }
    ]
}

Kafka Sink Transformer

AlexNet Inference

More Information about Custom Transformer service can be found 🔗link

  1. Implement Custom Transformer ./model.py using the KServe API
import os
import argparse
import json

from typing import Dict, Union
from kafka import KafkaProducer
from cloudevents.http import CloudEvent
from cloudevents.conversion import to_structured

from kserve import (
    Model,
    ModelServer,
    model_server,
    logging,
    InferRequest,
    InferResponse,
)

from kserve.logging import logger
from kserve.utils.utils import generate_uuid

kafka_producer = KafkaProducer(
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    bootstrap_servers=os.environ.get('KAFKA_BOOTSTRAP_SERVERS', 'localhost:9092')
)

class ImageTransformer(Model):
    def __init__(self, name: str):
        super().__init__(name, return_response_headers=True)
        self.ready = True


    def preprocess(
        self, payload: Union[Dict, InferRequest], headers: Dict[str, str] = None
    ) -> Union[Dict, InferRequest]:
        logger.info("Received inputs %s", payload)
        logger.info("Received headers %s", headers)
        self.request_trace_key = os.environ.get('REQUEST_TRACE_KEY', 'algo.trace.requestId')
        if self.request_trace_key not in payload:
            logger.error("Request trace key '%s' not found in payload, you cannot trace the prediction result", self.request_trace_key)
            if "instances" not in payload:
                raise ValueError(
                    f"Request trace key '{self.request_trace_key}' not found in payload and 'instances' key is missing."
                )
        else:
            headers[self.request_trace_key] = payload.get(self.request_trace_key)

        return {"instances": payload["instances"]}

    def postprocess(
        self,
        infer_response: Union[Dict, InferResponse],
        headers: Dict[str, str] = None,
        response_headers: Dict[str, str] = None,
    ) -> Union[Dict, InferResponse]:
        logger.info("postprocess headers: %s", headers)
        logger.info("postprocess response headers: %s", response_headers)
        logger.info("postprocess response: %s", infer_response)

        attributes = {
            "source": "data-and-computing/kafka-sink-transformer",
            "type": "org.zhejianglab.zverse.data-and-computing.kafka-sink-transformer",
            "request-host": headers.get('host', 'unknown'),
            "kserve-isvc-name": headers.get('kserve-isvc-name', 'unknown'),
            "kserve-isvc-namespace": headers.get('kserve-isvc-namespace', 'unknown'),
            self.request_trace_key: headers.get(self.request_trace_key, 'unknown'),
        }

        _, cloudevent = to_structured(CloudEvent(attributes, infer_response))
        try:
            kafka_producer.send(os.environ.get('KAFKA_TOPIC', 'test-topic'), value=cloudevent.decode('utf-8').replace("'", '"'))
            kafka_producer.flush()
        except Exception as e:
            logger.error("Failed to send message to Kafka: %s", e)
        return infer_response

parser = argparse.ArgumentParser(parents=[model_server.parser])
args, _ = parser.parse_known_args()

if __name__ == "__main__":
    if args.configure_logging:
        logging.configure_logging(args.log_config_file)
    logging.logger.info("available model name: %s", args.model_name)
    logging.logger.info("all args: %s", args.model_name)
    model = ImageTransformer(args.model_name)
    ModelServer().start([model])
  1. modify ./pyproject.toml
[tool.poetry]
name = "custom_transformer"
version = "0.15.2"
description = "Custom Transformer Examples. Not intended for use outside KServe Frameworks Images."
authors = ["Dan Sun <dsun20@bloomberg.net>"]
license = "Apache-2.0"
packages = [
    { include = "*.py" }
]

[tool.poetry.dependencies]
python = ">=3.9,<3.13"
kserve = {path = "../kserve", develop = true}
pillow = "^10.3.0"
kafka-python = "^2.2.15"
cloudevents = "^1.11.1"

[[tool.poetry.source]]
name = "pytorch"
url = "https://download.pytorch.org/whl/cpu"
priority = "explicit"

[tool.poetry.group.test]
optional = true

[tool.poetry.group.test.dependencies]
pytest = "^7.4.4"
mypy = "^0.991"

[tool.poetry.group.dev]
optional = true

[tool.poetry.group.dev.dependencies]
black = { version = "~24.3.0", extras = ["colorama"] }

[tool.poetry-version-plugin]
source = "file"
file_path = "../VERSION"

[build-system]
requires = ["poetry-core>=1.0.0"]
build-backend = "poetry.core.masonry.api"
  1. prepare ../custom_transformer.Dockerfile
ARG PYTHON_VERSION=3.11
ARG BASE_IMAGE=python:${PYTHON_VERSION}-slim-bookworm
ARG VENV_PATH=/prod_venv

FROM ${BASE_IMAGE} AS builder

# Install Poetry
ARG POETRY_HOME=/opt/poetry
ARG POETRY_VERSION=1.8.3

RUN python3 -m venv ${POETRY_HOME} && ${POETRY_HOME}/bin/pip install poetry==${POETRY_VERSION}
ENV PATH="$PATH:${POETRY_HOME}/bin"

# Activate virtual env
ARG VENV_PATH
ENV VIRTUAL_ENV=${VENV_PATH}
RUN python3 -m venv $VIRTUAL_ENV
ENV PATH="$VIRTUAL_ENV/bin:$PATH"

COPY kserve/pyproject.toml kserve/poetry.lock kserve/
RUN cd kserve && poetry install --no-root --no-interaction --no-cache
COPY kserve kserve
RUN cd kserve && poetry install --no-interaction --no-cache

COPY custom_transformer/pyproject.toml custom_transformer/poetry.lock custom_transformer/
RUN cd custom_transformer && poetry install --no-root --no-interaction --no-cache
COPY custom_transformer custom_transformer
RUN cd custom_transformer && poetry install --no-interaction --no-cache


FROM ${BASE_IMAGE} AS prod

COPY third_party third_party

# Activate virtual env
ARG VENV_PATH
ENV VIRTUAL_ENV=${VENV_PATH}
ENV PATH="$VIRTUAL_ENV/bin:$PATH"

RUN useradd kserve -m -u 1000 -d /home/kserve

COPY --from=builder --chown=kserve:kserve $VIRTUAL_ENV $VIRTUAL_ENV
COPY --from=builder kserve kserve
COPY --from=builder custom_transformer custom_transformer

USER 1000
ENTRYPOINT ["python", "-m", "custom_transformer.model"]
  1. regenerate poetry.lock
poetry lock --no-update
  1. build and push custom docker image
cd python
podman build -t docker-registry.lab.zverse.space/data-and-computing/ay-dev/msg-transformer:dev9 -f custom_transformer.Dockerfile .

podman push docker-registry.lab.zverse.space/data-and-computing/ay-dev/msg-transformer:dev9

Subsections of Generative

First Generative Service

A KServe InferenceService relies on Knative Serving (autoscaling and canary rollout), Istio (traffic management and security), and a storage system (S3/GCS/PVC).

Deploy an Inference Service with a Single YAML
apiVersion: "serving.kserve.io/v1beta1"
kind: "InferenceService"
metadata:
  name: "sklearn-iris"
  namespace: kserve-test
spec:
  predictor:
    model:
      modelFormat:
        name: sklearn
      resources: {}
      storageUri: "gs://kfserving-examples/models/sklearn/1.0/model"

Check the InferenceService and the Istio ingress gateway

kubectl -n kserve-test get inferenceservices sklearn-iris 
kubectl -n istio-system get svc istio-ingressgateway 
export INGRESS_HOST=$(minikube ip)
export INGRESS_PORT=$(kubectl -n istio-system get service istio-ingressgateway -o jsonpath='{.spec.ports[?(@.name=="http2")].nodePort}')
SERVICE_HOSTNAME=$(kubectl -n kserve-test get inferenceservice sklearn-iris  -o jsonpath='{.status.url}' | cut -d "/" -f 3)
# http://sklearn-iris.kserve-test.example.com 
curl -v -H "Host: ${SERVICE_HOSTNAME}" -H "Content-Type: application/json" "http://${INGRESS_HOST}:${INGRESS_PORT}/v1/models/sklearn-iris:predict" -d @./iris-input.json

How to deploy your own ML model

apiVersion: serving.kserve.io/v1beta1
kind: InferenceService
metadata:
  name: huggingface-llama3
  namespace: kserve-test
  annotations:
    serving.kserve.io/deploymentMode: RawDeployment
    serving.kserve.io/autoscalerClass: none
spec:
  predictor:
    model:
      modelFormat:
        name: huggingface
      storageUri: pvc://llama-3-8b-pvc/hf/8b_instruction_tuned
    workerSpec:
      pipelineParallelSize: 2
      tensorParallelSize: 1
      containers:
      - name: worker-container
        resources:
          requests:
            nvidia.com/gpu: "8"

check https://kserve.github.io/website/0.15/modelserving/v1beta1/llm/huggingface/multi-node/#workerspec-and-servingruntime

Canary Policy

KServe supports canary rollouts for inference services. Canary rollouts allow for a new version of an InferenceService to receive a percentage of traffic. Kserve supports a configurable canary rollout strategy with multiple steps. The rollout strategy can also be implemented to rollback to the previous revision if a rollout step fails.

KServe automatically tracks the last good revision that was rolled out with 100% traffic. The canaryTrafficPercent field in the component’s spec needs to be set with the percentage of traffic that should be routed to the new revision. KServe will then automatically split the traffic between the last good revision and the revision that is currently being rolled out according to the canaryTrafficPercent value.

When the first revision of an InferenceService is deployed, it receives 100% of the traffic. When multiple revisions are deployed, as in step 2, and the canary rollout strategy is configured to route 10% of the traffic to the new revision, 90% of the traffic goes to the LatestRolledoutRevision. If an unhealthy or bad revision is applied, traffic is not routed to it. In step 3, the rollout strategy promotes the LatestReadyRevision from step 2 to the LatestRolledoutRevision. Since it is now promoted, the LatestRolledoutRevision gets 100% of the traffic and is fully rolled out. If a rollback needs to happen, 100% of the traffic is pinned to the previous healthy/good revision, the PreviousRolledoutRevision.

Figures: Canary Rollout Strategy Steps 1-2; Canary Rollout Strategy Step 3.

Reference

For more information, see Canary Rollout.

Subsections of Canary Policy

Rollout Example

Create the InferenceService

Follow the First Inference Service tutorial. Set up a namespace kserve-test and create an InferenceService.

After rolling out the first model, 100% traffic goes to the initial model with service revision 1.

kubectl -n kserve-test get isvc sklearn-iris
NAME       URL              READY   PREV   LATEST   PREVROLLEDOUTREVISION   LATESTREADYREVISION                AGE
sklearn-iris   http://sklearn-iris.kserve-test.example.com   True           100                              sklearn-iris-predictor-00001   46s

Apply Canary Rollout Strategy

  • Add the canaryTrafficPercent field to the predictor component
  • Update the storageUri to use a new/updated model.
kubectl apply -n kserve-test -f - <<EOF
apiVersion: "serving.kserve.io/v1beta1"
kind: "InferenceService"
metadata:
  name: "sklearn-iris"
  namespace: kserve-test
spec:
  predictor:
    canaryTrafficPercent: 10
    model:
      args: ["--enable_docs_url=True"]
      modelFormat:
        name: sklearn
      resources: {}
      runtime: kserve-sklearnserver
      storageUri: "gs://kfserving-examples/models/sklearn/1.0/model-2"
EOF

After rolling out the canary model, traffic is split between the latest ready revision 2 and the previously rolled out revision 1.

kubectl -n kserve-test get isvc sklearn-iris
NAME       URL              READY   PREV   LATEST   PREVROLLEDOUTREVISION   LATESTREADYREVISION                AGE
sklearn-iris   http://sklearn-iris.kserve-test.example.com   True    90     10       sklearn-iris-predictor-00002   sklearn-iris-predictor-00003   19h

Check the running pods; you should now see two pods running, one for the old model and one for the new model, with 10% of the traffic routed to the new model. Notice that revision 1 contains 0002 in its name, while revision 2 contains 0003.

kubectl get pods 

NAME                                                        READY   STATUS    RESTARTS   AGE
sklearn-iris-predictor-00002-deployment-c7bb6c685-ktk7r     2/2     Running   0          71m
sklearn-iris-predictor-00003-deployment-8498d947-fpzcg      2/2     Running   0          20m

Run a prediction

Follow the next two steps (Determine the ingress IP and ports and Perform inference) in the First Inference Service tutorial.

Send more requests to the InferenceService to observe the 10% of traffic that routes to the new revision.

Promote the canary model

If the canary model is healthy/passes your tests,

you can promote it by removing the canaryTrafficPercent field and re-applying the InferenceService custom resource with the same name sklearn-iris

kubectl apply -n kserve-test -f - <<EOF
apiVersion: "serving.kserve.io/v1beta1"
kind: "InferenceService"
metadata:
  name: "sklearn-iris"
  namespace: kserve-test
spec:
  predictor:
    model:
      args: ["--enable_docs_url=True"]
      modelFormat:
        name: sklearn
      resources: {}
      runtime: kserve-sklearnserver
      storageUri: "gs://kfserving-examples/models/sklearn/1.0/model-2"
EOF

Now all traffic goes to revision 2 for the new model.

kubectl get isvc sklearn-iris
NAME       URL                                   READY   PREV   LATEST   PREVROLLEDOUTREVISION   LATESTREADYREVISION                AGE
sklearn-iris   http://sklearn-iris.kserve-test.example.com   True           100                              sklearn-iris-predictor-00002   17m

The pods for revision generation 1 automatically scale down to 0 as they are no longer receiving traffic.

kubectl get pods -l serving.kserve.io/inferenceservice=sklearn-iris
NAME                                                           READY   STATUS        RESTARTS   AGE
sklearn-iris-predictor-00001-deployment-66c5f5b8d5-gmfvj   1/2     Terminating   0          17m
sklearn-iris-predictor-00002-deployment-5bd9ff46f8-shtzd   2/2     Running       0          15m

Rollback and pin the previous model

You can pin the previous model (model v1, for example) by setting the canaryTrafficPercent to 0 for the current model (model v2, for example). This rolls back from model v2 to model v1 and decreases model v2’s traffic to zero.

Apply the custom resource to set model v2’s traffic to 0%.

kubectl apply -n kserve-test -f - <<EOF
apiVersion: "serving.kserve.io/v1beta1"
kind: "InferenceService"
metadata:
  name: "sklearn-iris"
spec:
  predictor:
    canaryTrafficPercent: 0
    model:
      modelFormat:
        name: sklearn
      storageUri: "gs://kfserving-examples/models/sklearn/1.0/model-2"
EOF

Check the traffic split, now 100% traffic goes to the previous good model (model v1) for revision generation 1.

kubectl get isvc sklearn-iris
NAME       URL                                   READY   PREV   LATEST   PREVROLLEDOUTREVISION              LATESTREADYREVISION                AGE
sklearn-iris   http://sklearn-iris.kserve-test.example.com   True    100    0        sklearn-iris-predictor-00002   sklearn-iris-predictor-00003   18m

The previous revision (model v1) now receives 100% of the traffic on its pods, while the new model (model v2) receives 0%.

kubectl get pods -l serving.kserve.io/inferenceservice=sklearn-iris

NAME                                                       READY   STATUS        RESTARTS   AGE
sklearn-iris-predictor-00002-deployment-66c5f5b8d5-gmfvj   1/2     Running       0          35s
sklearn-iris-predictor-00003-deployment-5bd9ff46f8-shtzd   2/2     Running       0          16m

Route traffic using a tag

You can enable tag based routing by adding the annotation serving.kserve.io/enable-tag-routing, so traffic can be explicitly routed to the canary model (model v2) or the old model (model v1) via a tag in the request URL.

Apply model v2 with canaryTrafficPercent: 10 and serving.kserve.io/enable-tag-routing: "true".

kubectl apply -n kserve-test -f - <<EOF
apiVersion: "serving.kserve.io/v1beta1"
kind: "InferenceService"
metadata:
  name: "sklearn-iris"
  annotations:
    serving.kserve.io/enable-tag-routing: "true"
spec:
  predictor:
    canaryTrafficPercent: 10
    model:
      modelFormat:
        name: sklearn
      storageUri: "gs://kfserving-examples/models/sklearn/1.0/model-2"
EOF

Check the InferenceService status to get the canary and previous model URL.

kubectl get isvc sklearn-iris -ojsonpath="{.status.components.predictor}"  | jq

The output should look like

{
    "address": {
    "url": "http://sklearn-iris-predictor-.kserve-test.svc.cluster.local"
    },
    "latestCreatedRevision": "sklearn-iris-predictor--00003",
    "latestReadyRevision": "sklearn-iris-predictor--00003",
    "latestRolledoutRevision": "sklearn-iris-predictor--00001",
    "previousRolledoutRevision": "sklearn-iris-predictor--00001",
    "traffic": [
    {
        "latestRevision": true,
        "percent": 10,
        "revisionName": "sklearn-iris-predictor--00003",
        "tag": "latest",
        "url": "http://latest-sklearn-iris-predictor-.kserve-test.example.com"
    },
    {
        "latestRevision": false,
        "percent": 90,
        "revisionName": "sklearn-iris-predictor--00001",
        "tag": "prev",
        "url": "http://prev-sklearn-iris-predictor-.kserve-test.example.com"
    }
    ],
    "url": "http://sklearn-iris-predictor-.kserve-test.example.com"
}

Since we updated the annotation on the InferenceService, model v2 now corresponds to sklearn-iris-predictor--00003.

You can now send requests explicitly to the new model or the previous model by using the tag in the request URL. Use the curl command from Perform inference and add latest- or prev- to the predictor hostname to send a tag-based request.

For example, set the model name and use the following commands to send traffic to each service based on the latest or prev tag.

curl the latest revision

MODEL_NAME=sklearn-iris
curl -v -H "Host: latest-${MODEL_NAME}-predictor-.kserve-test.example.com" -H "Content-Type: application/json" http://${INGRESS_HOST}:${INGRESS_PORT}/v1/models/$MODEL_NAME:predict -d @./iris-input.json

or curl the previous revision

curl -v -H "Host: prev-${MODEL_NAME}-predictor-.kserve-test.example.com" -H "Content-Type: application/json" http://${INGRESS_HOST}:${INGRESS_PORT}/v1/models/$MODEL_NAME:predict -d @./iris-input.json

Auto Scaling

Soft Limit

You can configure the InferenceService with the annotation autoscaling.knative.dev/target to set a soft limit. The soft limit is a targeted limit rather than a strictly enforced bound; in particular, if there is a sudden burst of requests, this value can be exceeded.

apiVersion: "serving.kserve.io/v1beta1"
kind: "InferenceService"
metadata:
  name: "sklearn-iris"
  namespace: kserve-test
  annotations:
    autoscaling.knative.dev/target: "5"
spec:
  predictor:
    model:
      args: ["--enable_docs_url=True"]
      modelFormat:
        name: sklearn
      resources: {}
      runtime: kserve-sklearnserver
      storageUri: "gs://kfserving-examples/models/sklearn/1.0/model"

Hard Limit

You can also configure the InferenceService with the containerConcurrency field to set a hard limit. The hard limit is an enforced upper bound. If concurrency reaches the hard limit, surplus requests are buffered and must wait until enough capacity is free to execute them.

apiVersion: "serving.kserve.io/v1beta1"
kind: "InferenceService"
metadata:
  name: "sklearn-iris"
  namespace: kserve-test
spec:
  predictor:
    containerConcurrency: 5
    model:
      args: ["--enable_docs_url=True"]
      modelFormat:
        name: sklearn
      resources: {}
      runtime: kserve-sklearnserver
      storageUri: "gs://kfserving-examples/models/sklearn/1.0/model"

Scale with QPS

apiVersion: "serving.kserve.io/v1beta1"
kind: "InferenceService"
metadata:
  name: "sklearn-iris"
  namespace: kserve-test
spec:
  predictor:
    scaleTarget: 1
    scaleMetric: qps
    model:
      args: ["--enable_docs_url=True"]
      modelFormat:
        name: sklearn
      resources: {}
      runtime: kserve-sklearnserver
      storageUri: "gs://kfserving-examples/models/sklearn/1.0/model"

Scale with GPU

apiVersion: "serving.kserve.io/v1beta1"
kind: "InferenceService"
metadata:
  name: "flowers-sample-gpu"
  namespace: kserve-test
spec:
  predictor:
    scaleTarget: 1
    scaleMetric: concurrency
    model:
      modelFormat:
        name: tensorflow
      storageUri: "gs://kfserving-examples/models/tensorflow/flowers"
      runtimeVersion: "2.6.2-gpu"
      resources:
        limits:
          nvidia.com/gpu: 1

Enable Scale To Zero

apiVersion: "serving.kserve.io/v1beta1"
kind: "InferenceService"
metadata:
  name: "sklearn-iris"
  namespace: kserve-test
spec:
  predictor:
    minReplicas: 0
    model:
      args: ["--enable_docs_url=True"]
      modelFormat:
        name: sklearn
      resources: {}
      runtime: kserve-sklearnserver
      storageUri: "gs://kfserving-examples/models/sklearn/1.0/model"

Prepare Concurrent Requests Container

# export INGRESS_PORT=$(kubectl -n istio-system get service istio-ingressgateway -o jsonpath='{.spec.ports[?(@.name=="http2")].nodePort}')
podman run --rm \
      -v /root/kserve/iris-input.json:/tmp/iris-input.json \
      --privileged \
      -e INGRESS_HOST=$(minikube ip) \
      -e INGRESS_PORT=32132 \
      -e MODEL_NAME=sklearn-iris \
      -e INPUT_PATH=/tmp/iris-input.json \
      -e SERVICE_HOSTNAME=sklearn-iris.kserve-test.example.com \
      -it m.daocloud.io/docker.io/library/golang:1.22  bash -c "go install github.com/rakyll/hey@latest; bash"

Fire

Send traffic for 30 seconds, maintaining 100 concurrent in-flight requests.

hey -z 30s -c 100 -m POST -host ${SERVICE_HOSTNAME} -D $INPUT_PATH http://${INGRESS_HOST}:${INGRESS_PORT}/v1/models/$MODEL_NAME:predict
Summary:
  Total:        30.1390 secs
  Slowest:      0.5015 secs
  Fastest:      0.0252 secs
  Average:      0.1451 secs
  Requests/sec: 687.3483
  
  Total data:   4371076 bytes
  Size/request: 211 bytes

Response time histogram:
  0.025 [1]     |
  0.073 [14]    |
  0.120 [33]    |
  0.168 [19363] |■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■
  0.216 [1171]  |■■
  0.263 [28]    |
  0.311 [6]     |
  0.359 [0]     |
  0.406 [0]     |
  0.454 [0]     |
  0.502 [100]   |


Latency distribution:
  10% in 0.1341 secs
  25% in 0.1363 secs
  50% in 0.1388 secs
  75% in 0.1462 secs
  90% in 0.1587 secs
  95% in 0.1754 secs
  99% in 0.1968 secs

Details (average, fastest, slowest):
  DNS+dialup:   0.0000 secs, 0.0252 secs, 0.5015 secs
  DNS-lookup:   0.0000 secs, 0.0000 secs, 0.0000 secs
  req write:    0.0000 secs, 0.0000 secs, 0.0005 secs
  resp wait:    0.1451 secs, 0.0251 secs, 0.5015 secs
  resp read:    0.0000 secs, 0.0000 secs, 0.0003 secs

Status code distribution:
  [500] 20716 responses

Reference

For more information, please refer to the KPA documentation.

Subsections of Knative

Subsections of Eventing

Broker

The Knative Broker is the core component of the Knative Eventing system. Its main role is to act as the hub for event routing and distribution, providing decoupled, reliable event delivery between event producers (event sources) and event consumers (services).

The key roles of the Knative Broker are explained below:

Event ingestion hub:

The Broker is the entry point where event streams converge. Various event sources (such as Kafka topics, HTTP sources, Cloud Pub/Sub, GitHub webhooks, timers, custom sources, and so on) send their events to the Broker.

Event producers only need to know the Broker's address; they do not need to care which consumers eventually exist or where they run.

Event storage and buffering:

A Broker is usually backed by a persistent messaging system (such as Apache Kafka, Google Cloud Pub/Sub, RabbitMQ, NATS Streaming, or the in-memory InMemoryChannel implementation). This provides:

Persistence: events are not lost before consumers process them (depending on the underlying channel implementation).

Buffering: when consumers are temporarily unavailable or cannot keep up with the rate at which events are produced, the Broker buffers events, avoiding event loss or overwhelming the producers/consumers.

Retries: if a consumer fails to process an event, the Broker can redeliver it (usually in combination with the retry policies of Triggers and Subscriptions).

Decoupling event sources from event consumers:

This is one of the Broker's most important roles. Event sources are only responsible for sending events to the Broker and have no knowledge of which services will consume them.

Event consumers declare which events they are interested in by creating a Trigger against the Broker. Consumers only need to know that the Broker exists; they do not need to know which specific source produced an event.

This decoupling greatly improves the system's flexibility and maintainability:

Independent evolution: event sources and consumers can be added, removed, or modified independently, as long as they follow the Broker's contract.

Dynamic routing: events are routed dynamically to different consumers based on event attributes (such as type and source), without modifying producer or consumer code.

Multicast: the same event can be consumed by several different consumers at once (one event -> Broker -> multiple matching Triggers -> multiple services).

Event filtering and routing (via Triggers):

The Broker itself does not implement complex filtering logic; filtering and routing are handled by Trigger resources.

A Trigger resource is bound to a specific Broker.

A Trigger defines:

Subscriber: the address of the target service (a Knative Service, Kubernetes Service, Channel, and so on).

Filter: conditions on event attributes (mainly type and source, plus other extensible attributes). Only events that satisfy the conditions are routed by the Broker through that Trigger to the corresponding subscriber.

After the Broker receives an event, it checks the filters of all Triggers bound to it. For every matching Trigger, the Broker sends the event to the subscriber that Trigger specifies (a minimal filtered Trigger is sketched at the end of this overview).

Standard event interface:

The Broker follows the CloudEvents specification; the events it receives and delivers are all in CloudEvents format. This gives events from different sources and the consumers that process them a unified format standard, which simplifies integration.

Multi-tenancy and namespace isolation:

Brokers are typically deployed in a specific Kubernetes namespace, and multiple Brokers can be created within one namespace.

This allows event streams to be isolated per team, application, or environment (such as dev and staging) within the same cluster. Each team/application manages the Brokers and Triggers in its own namespace.

Summary analogy:

You can think of the Knative Broker as a highly intelligent postal sorting center:

Receiving letters (events): letters (events) from all over the world (different event sources) are mailed to the sorting center (Broker).

Storing letters: the sorting center has a warehouse (persistence/buffering) to hold letters temporarily, making sure they are safe and not lost.

Sorting rules (Triggers): the sorting center employs many sorters (Triggers). Each sorter handles letters of a particular type or from a particular region (filtering based on event attributes).

Delivering letters: when a sorter (Trigger) finds letters (events) that match its rules, it delivers them to the right recipient's (subscriber service's) doorstep.

Decoupling: the sender (event source) only needs to know the address of the sorting center (Broker) and does not need to know who or where the recipients (consumers) are. A recipient (consumer) only needs to give its address to the sorter responsible for its kind of letters (by creating a Trigger) and does not care who sent them. The sorting center (Broker) and the sorters (Triggers) take care of the complex routing in between.

The core value a Broker provides:

Loose coupling: event producers and consumers are fully decoupled.

Flexibility: consumers can be added or removed dynamically, and routing rules can be changed dynamically (by modifying, creating, or deleting Triggers).

Reliability: event persistence and retry mechanisms (depending on the underlying implementation).

Scalability: the Broker and the consumers can each scale independently.

Standardization: based on CloudEvents.

Simplified development: developers focus on business logic (producing or consuming events) without having to build complex event-bus infrastructure themselves.
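As referenced above, here is a minimal filtered Trigger. This is only a sketch: it assumes a Broker named first-broker and a Knative Service named event-display in the kserve-test namespace (both are created later in this chapter), and it forwards only events whose type attribute is test.type.

kubectl apply -f - <<EOF
apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
  name: filtered-display-trigger
  namespace: kserve-test
spec:
  broker: first-broker
  filter:
    attributes:
      type: test.type
  subscriber:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: event-display
EOF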

Subsections of Broker

Install Kafka Broker

About


  • Source: curl, KafkaSource
  • Broker
  • Trigger
  • Sink: ksvc, isvc

Install a Channel (messaging) layer

kubectl apply -f https://github.com/knative-extensions/eventing-kafka-broker/releases/download/knative-v1.18.0/eventing-kafka-controller.yaml
configmap/kafka-broker-config created
configmap/kafka-channel-config created
customresourcedefinition.apiextensions.k8s.io/kafkachannels.messaging.knative.dev created
customresourcedefinition.apiextensions.k8s.io/consumers.internal.kafka.eventing.knative.dev created
customresourcedefinition.apiextensions.k8s.io/consumergroups.internal.kafka.eventing.knative.dev created
customresourcedefinition.apiextensions.k8s.io/kafkasinks.eventing.knative.dev created
customresourcedefinition.apiextensions.k8s.io/kafkasources.sources.knative.dev created
clusterrole.rbac.authorization.k8s.io/eventing-kafka-source-observer created
configmap/config-kafka-source-defaults created
configmap/config-kafka-autoscaler created
configmap/config-kafka-features created
configmap/config-kafka-leader-election created
configmap/kafka-config-logging created
configmap/config-namespaced-broker-resources created
configmap/config-tracing configured
clusterrole.rbac.authorization.k8s.io/knative-kafka-addressable-resolver created
clusterrole.rbac.authorization.k8s.io/knative-kafka-channelable-manipulator created
clusterrole.rbac.authorization.k8s.io/kafka-controller created
serviceaccount/kafka-controller created
clusterrolebinding.rbac.authorization.k8s.io/kafka-controller created
clusterrolebinding.rbac.authorization.k8s.io/kafka-controller-addressable-resolver created
deployment.apps/kafka-controller created
clusterrole.rbac.authorization.k8s.io/kafka-webhook-eventing created
serviceaccount/kafka-webhook-eventing created
clusterrolebinding.rbac.authorization.k8s.io/kafka-webhook-eventing created
mutatingwebhookconfiguration.admissionregistration.k8s.io/defaulting.webhook.kafka.eventing.knative.dev created
mutatingwebhookconfiguration.admissionregistration.k8s.io/pods.defaulting.webhook.kafka.eventing.knative.dev created
secret/kafka-webhook-eventing-certs created
validatingwebhookconfiguration.admissionregistration.k8s.io/validation.webhook.kafka.eventing.knative.dev created
deployment.apps/kafka-webhook-eventing created
service/kafka-webhook-eventing created
kubectl apply -f https://github.com/knative-extensions/eventing-kafka-broker/releases/download/knative-v1.18.0/eventing-kafka-channel.yaml
configmap/config-kafka-channel-data-plane created
clusterrole.rbac.authorization.k8s.io/knative-kafka-channel-data-plane created
serviceaccount/knative-kafka-channel-data-plane created
clusterrolebinding.rbac.authorization.k8s.io/knative-kafka-channel-data-plane created
statefulset.apps/kafka-channel-dispatcher created
deployment.apps/kafka-channel-receiver created
service/kafka-channel-ingress created

Install a Broker layer

kubectl apply -f https://github.com/knative-extensions/eventing-kafka-broker/releases/download/knative-v1.18.0/eventing-kafka-broker.yaml
configmap/config-kafka-broker-data-plane created
clusterrole.rbac.authorization.k8s.io/knative-kafka-broker-data-plane created
serviceaccount/knative-kafka-broker-data-plane created
clusterrolebinding.rbac.authorization.k8s.io/knative-kafka-broker-data-plane created
statefulset.apps/kafka-broker-dispatcher created
deployment.apps/kafka-broker-receiver created
service/kafka-broker-ingress created
Reference

Please check the StatefulSets:

root@ay-k3s01:~# kubectl -n knative-eventing  get sts
NAME                       READY   AGE
kafka-broker-dispatcher    1/1     19m
kafka-channel-dispatcher   0/0     22m

Some of the StatefulSet replicas are 0; please check them and scale up if needed (see the command below).
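If kafka-channel-dispatcher stays at 0/0 and you actually use KafkaChannels, one possible fix is to scale it up manually; a sketch, assuming a single replica is sufficient for your workload:

kubectl -n knative-eventing scale statefulset kafka-channel-dispatcher --replicas=1
kubectl -n knative-eventing get sts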

[Optional] Install Eventing extensions

  • kafka sink
kubectl apply -f https://github.com/knative-extensions/eventing-kafka-broker/releases/download/knative-v1.18.0/eventing-kafka-sink.yaml
Reference

for more information, you can check 🔗https://knative.dev/docs/eventing/sinks/kafka-sink/

  • kafka source
kubectl apply -f https://github.com/knative-extensions/eventing-kafka-broker/releases/download/knative-v1.18.0/eventing-kafka-source.yaml
Reference

for more information, you can check 🔗https://knative.dev/docs/eventing/sources/kafka-source/

Display Broker Message

Flow

flowchart LR
    A[Curl] -->|HTTP| B{Broker}
    B -->|Subscribe| D[Trigger1]
    B -->|Subscribe| E[Trigger2]
    B -->|Subscribe| F[Trigger3]
    E --> G[Display Service]

Steps

1. Create Broker Setting

kubectl apply -f - <<EOF
apiVersion: v1
kind: ConfigMap
metadata:
  name: kafka-broker-config
  namespace: knative-eventing
data:
  default.topic.partitions: "10"
  default.topic.replication.factor: "1"
  bootstrap.servers: "kafka.database.svc.cluster.local:9092" #kafka service address
  default.topic.config.retention.ms: "3600"
EOF
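Note that Kafka interprets retention.ms in milliseconds, so "3600" keeps records for roughly 3.6 seconds; if one hour was intended, use "3600000". Once a Broker referencing this ConfigMap exists (created in the next step), you can inspect the topic the data plane provisioned. This sketch reuses the kafka-client-tools deployment used later on this page and assumes the topic naming pattern knative-broker-<namespace>-<broker-name>:

kubectl -n database exec -it deployment/kafka-client-tools -- bash -c \
  'kafka-configs.sh --bootstrap-server $BOOTSTRAP_SERVER --command-config $CLIENT_CONFIG_FILE --describe --entity-type topics --entity-name knative-broker-kserve-test-first-broker'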

2. Create Broker

kubectl apply -f - <<EOF
apiVersion: eventing.knative.dev/v1
kind: Broker
metadata:
  annotations:
    eventing.knative.dev/broker.class: Kafka
  name: first-broker
  namespace: kserve-test
spec:
  config:
    apiVersion: v1
    kind: ConfigMap
    name: kafka-broker-config
    namespace: knative-eventing
EOF

Optionally, the Broker spec can also declare spec.delivery.deadLetterSink so that undeliverable events are forwarded to a fallback service; the isvc-broker example later on this page shows this.
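Before wiring a Trigger, confirm the Broker has become ready (the URL column should point at the kafka-broker-ingress service; exact columns may vary by Knative version):

kubectl -n kserve-test get broker first-broker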

3. Create Trigger

kubectl apply -f - <<EOF
apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
  name: display-service-trigger
  namespace: kserve-test
spec:
  broker: first-broker
  subscriber:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: event-display
EOF

4. Create Sink Service (Display Message)

kubectl apply -f - <<EOF
apiVersion: serving.knative.dev/v1
kind: Service
metadata:
  name: event-display
  namespace: kserve-test
spec:
  template:
    spec:
      containers:
        - image: gcr.io/knative-releases/knative.dev/eventing/cmd/event_display
EOF
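The Trigger above references event-display as its subscriber, so this Knative Service must be ready before events can be delivered; you can verify it with:

kubectl -n kserve-test get ksvc event-display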

5. Test

kubectl run curl-test --image=curlimages/curl -it --rm --restart=Never -- \
  -v "http://kafka-broker-ingress.knative-eventing.svc.cluster.local/kserve-test/first-broker" \
  -X POST \
  -H "Ce-Id: $(date +%s)" \
  -H "Ce-Specversion: 1.0" \
  -H "Ce-Type: test.type" \
  -H "Ce-Source: curl-test" \
  -H "Content-Type: application/json" \
  -d '{"test": "Broker is working"}'

6. Check message

kubectl -n kserve-test logs -f deploy/event-display-00001-deployment 
2025/07/02 09:01:25 Failed to read tracing config, using the no-op default: empty json tracing config
☁️  cloudevents.Event
Context Attributes,
  specversion: 1.0
  type: test.type
  source: curl-test
  id: 1751446880
  datacontenttype: application/json
Extensions,
  knativekafkaoffset: 6
  knativekafkapartition: 6
Data,
  {
    "test": "Broker is working"
  }

Kafka Broker Invokes ISVC

1. Prepare RBAC

  • create cluster role to access CRD isvc
kubectl apply -f - <<EOF
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: kserve-access-for-knative
rules:
- apiGroups: ["serving.kserve.io"]
  resources: ["inferenceservices", "inferenceservices/status"]
  verbs: ["get", "list", "watch"]
EOF
  • create rolebinding and grant privileges
kubectl apply -f - <<EOF
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: kafka-controller-kserve-access
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: kserve-access-for-knative
subjects:
- kind: ServiceAccount
  name: kafka-controller
  namespace: knative-eventing
EOF
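To confirm the binding took effect, you can ask the API server whether the kafka-controller service account is now allowed to read InferenceServices, using kubectl's impersonation support:

kubectl auth can-i get inferenceservices.serving.kserve.io \
  --as=system:serviceaccount:knative-eventing:kafka-controller -n kserve-test
# expected output: yes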

2. Create Broker Setting

kubectl apply -f - <<EOF
apiVersion: v1
kind: ConfigMap
metadata:
  name: kafka-broker-config
  namespace: knative-eventing
data:
  default.topic.partitions: "10"
  default.topic.replication.factor: "1"
  bootstrap.servers: "kafka.database.svc.cluster.local:9092" #kafka service address
  default.topic.config.retention.ms: "3600"
EOF

3. Create Broker

kubectl apply -f - <<EOF
apiVersion: eventing.knative.dev/v1
kind: Broker
metadata:
  annotations:
    eventing.knative.dev/broker.class: Kafka
  name: isvc-broker
  namespace: kserve-test
spec:
  config:
    apiVersion: v1
    kind: ConfigMap
    name: kafka-broker-config
    namespace: knative-eventing
  delivery:
    deadLetterSink:
      ref:
        apiVersion: serving.knative.dev/v1
        kind: Service
        name: event-display
EOF

4. Create InferenceService

Reference

You can create the first-torchserve InferenceService by following 🔗link; a minimal sketch is shown below.
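If you have not created it yet, the following is a minimal sketch of such an InferenceService, using the same model format and storageUri as the transformer example later on this page; adjust it to your environment:

kubectl apply -f - <<EOF
apiVersion: serving.kserve.io/v1beta1
kind: InferenceService
metadata:
  name: first-torchserve
  namespace: kserve-test
spec:
  predictor:
    model:
      modelFormat:
        name: pytorch
      storageUri: gs://kfserving-examples/models/torchserve/image_classifier/v1
EOF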

5. Create Trigger

kubectl apply -f - << EOF
apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
  name: kserve-trigger
  namespace: kserve-test
spec:
  broker: isvc-broker
  filter:
    attributes:
      type: prediction-request
  subscriber:
    uri: http://first-torchserve.kserve-test.svc.cluster.local/v1/models/mnist:predict
EOF
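Before sending traffic, check that the Trigger resolved its subscriber and reports Ready (the SUBSCRIBER_URI column should show the mnist :predict endpoint):

kubectl -n kserve-test get trigger kserve-trigger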

6. Test

Normally, we can invoke first-torchserve directly by executing

export MASTER_IP=192.168.100.112
export ISTIO_INGRESS_PORT=$(kubectl -n istio-system get service istio-ingressgateway -o jsonpath='{.spec.ports[?(@.name=="http2")].nodePort}')
export SERVICE_HOSTNAME=$(kubectl -n kserve-test get inferenceservice first-torchserve  -o jsonpath='{.status.url}' | cut -d "/" -f 3)
# http://first-torchserve.kserve-test.example.com 
curl -v -H "Host: ${SERVICE_HOSTNAME}" -H "Content-Type: application/json" "http://${MASTER_IP}:${ISTIO_INGRESS_PORT}/v1/models/mnist:predict" -d @./mnist-input.json

Now, you can invoke the model through the Kafka broker by executing

export KAFKA_BROKER_INGRESS_PORT=$(kubectl -n knative-eventing get service kafka-broker-ingress -o jsonpath='{.spec.ports[?(@.name=="http-container")].nodePort}')
curl -v "http://${MASTER_IP}:${KAFKA_BROKER_INGRESS_PORT}/kserve-test/isvc-broker" \
  -X POST \
  -H "Ce-Id: $(date +%s)" \
  -H "Ce-Specversion: 1.0" \
  -H "Ce-Type: prediction-request" \
  -H "Ce-Source: event-producer" \
  -H "Content-Type: application/json" \
  -d @./mnist-input.json 

Please check Kafka:

# list all topics; the broker topic is named knative-broker-kserve-test-isvc-broker (pattern: knative-broker-<namespace>-<broker-name>)
kubectl -n database exec -it deployment/kafka-client-tools -- bash -c \
    'kafka-topics.sh --bootstrap-server $BOOTSTRAP_SERVER --command-config $CLIENT_CONFIG_FILE --list'
# retrieve msg from that topic
kubectl -n database exec -it deployment/kafka-client-tools -- bash -c \
  'kafka-console-consumer.sh --bootstrap-server $BOOTSTRAP_SERVER --consumer.config $CLIENT_CONFIG_FILE --topic knative-broker-kserve-test-isvc-broker --from-beginning'

And then, you should see:

{
    "instances": [
        {
            "data": "iVBORw0KGgoAAAANSUhEUgAAABwAAAAcCAAAAABXZoBIAAAAw0lEQVR4nGNgGFggVVj4/y8Q2GOR83n+58/fP0DwcSqmpNN7oOTJw6f+/H2pjUU2JCSEk0EWqN0cl828e/FIxvz9/9cCh1zS5z9/G9mwyzl/+PNnKQ45nyNAr9ThMHQ/UG4tDofuB4bQIhz6fIBenMWJQ+7Vn7+zeLCbKXv6z59NOPQVgsIcW4QA9YFi6wNQLrKwsBebW/68DJ388Nun5XFocrqvIFH59+XhBAxThTfeB0r+vP/QHbuDCgr2JmOXoSsAAKK7bU3vISS4AAAAAElFTkSuQmCC"
        }
    ]
}
{
    "predictions": [
        2
    ]
}

Subsections of Plugin

Subsections of Eventing Kafka Broker

Prepare Dev Environment

  1. update go -> 1.24

  2. install ko -> 0.18.0

go install github.com/google/ko@latest
# wget https://github.com/ko-build/ko/releases/download/v0.18.0/ko_0.18.0_Linux_x86_64.tar.gz
# tar -xzf ko_0.18.0_Linux_x86_64.tar.gz  -C /usr/local/bin/ko
# cp /usr/local/bin/ko/ko /root/bin
  3. install protoc -> 30.2
PB_REL="https://github.com/protocolbuffers/protobuf/releases"
curl -LO $PB_REL/download/v30.2/protoc-30.2-linux-x86_64.zip
# mkdir -p ${HOME}/bin/
mkdir -p /usr/local/bin/protoc
unzip protoc-30.2-linux-x86_64.zip -d /usr/local/bin/protoc
cp /usr/local/bin/protoc/bin/protoc /root/bin
# export PATH="$PATH:/root/bin"
rm -rf protoc-30.2-linux-x86_64.zip
  4. install protoc-gen-go -> 1.5.4
go install google.golang.org/protobuf/cmd/protoc-gen-go@latest
export GOPATH=/usr/local/go/bin
  5. clone the source repositories
mkdir -p ${GOPATH}/src/knative.dev
cd ${GOPATH}/src/knative.dev
git clone git@github.com:knative/eventing.git # clone eventing repo
git clone git@github.com:AaronYang0628/eventing-kafka-broker.git
cd eventing-kafka-broker
git remote add upstream https://github.com/knative-extensions/eventing-kafka-broker.git
git remote set-url --push upstream no_push
export KO_DOCKER_REPO=docker-registry.lab.zverse.space/data-and-computing/ay-dev
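After the toolchain is in place, a quick sanity check of the installed versions (the flags shown are the standard ones for each tool):

go version               # expect go1.24.x
ko version               # expect 0.18.x
protoc --version         # expect libprotoc 30.2
protoc-gen-go --version  # prints whatever version @latest installed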

Build Async Prediction Flow

Flow

flowchart LR
    A[User Curl] -->|HTTP| B{ISVC-Broker:Kafka}
    B -->|Subscribe| D[Trigger1]
    B -->|Subscribe| E[Kserve-Trigger]
    B -->|Subscribe| F[Trigger3]
    E --> G[Mnist Service]
    G --> |Kafka-Sink| B

Steps

1. Create Broker Setting

kubectl apply -f - <<EOF
apiVersion: v1
kind: ConfigMap
metadata:
  name: kafka-broker-config
  namespace: knative-eventing
data:
  default.topic.partitions: "10"
  default.topic.replication.factor: "1"
  bootstrap.servers: "kafka.database.svc.cluster.local:9092" #kafka service address
  default.topic.config.retention.ms: "3600"
EOF

2. Create Broker

kubectl apply -f - <<EOF
apiVersion: eventing.knative.dev/v1
kind: Broker
metadata:
  annotations:
    eventing.knative.dev/broker.class: Kafka
  name: isvc-broker
  namespace: kserve-test
spec:
  config:
    apiVersion: v1
    kind: ConfigMap
    name: kafka-broker-config
    namespace: knative-eventing
EOF

3. Create Trigger

kubectl apply -f - << EOF
apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
  name: kserve-trigger
  namespace: kserve-test
spec:
  broker: isvc-broker
  filter:
    attributes:
      type: prediction-request-udf-attr # you can change this
  subscriber:
    uri: http://prediction-and-sink.kserve-test.svc.cluster.local/v1/models/mnist:predict
EOF

4. Create InferenceService

kubectl apply -f - <<EOF
apiVersion: serving.kserve.io/v1beta1
kind: InferenceService
metadata:
  name: prediction-and-sink
  namespace: kserve-test
spec:
  predictor:
    model:
      modelFormat:
        name: pytorch
      storageUri: gs://kfserving-examples/models/torchserve/image_classifier/v1
  transformer:
    containers:
      - image: docker-registry.lab.zverse.space/data-and-computing/ay-dev/msg-transformer:dev9
        name: kserve-container
        env:
        - name: KAFKA_BOOTSTRAP_SERVERS
          value: kafka.database.svc.cluster.local
        - name: KAFKA_TOPIC
          value: test-topic # the prediction result will be published to this topic
        - name: REQUEST_TRACE_KEY
          value: test-trace-id # use this key to correlate the prediction result with its request
        command:
          - "python"
          - "-m"
          - "model"
        args:
          - --model_name
          - mnist
EOF
root@ay-k3s01:~# kubectl -n kserve-test get pod
NAME                                                              READY   STATUS    RESTARTS   AGE
prediction-and-sink-predictor-00001-deployment-f64bb76f-jqv4m     2/2     Running   0          3m46s
prediction-and-sink-transformer-00001-deployment-76cccd867lksg9   2/2     Running   0          4m3s

The source code of the docker-registry.lab.zverse.space/data-and-computing/ay-dev/msg-transformer:dev9 image can be found 🔗here

[Optional] 5. Invoke InferenceService

  • preparation
wget -O ./mnist-input.json https://raw.githubusercontent.com/kserve/kserve/refs/heads/master/docs/samples/v1beta1/torchserve/v1/imgconv/input.json
SERVICE_NAME=prediction-and-sink
MODEL_NAME=mnist
INPUT_PATH=@./mnist-input.json
PLAIN_SERVICE_HOSTNAME=$(kubectl -n kserve-test get inferenceservice $SERVICE_NAME -o jsonpath='{.status.url}' | cut -d "/" -f 3)
  • fire!!
export INGRESS_HOST=192.168.100.112
export INGRESS_PORT=$(kubectl -n istio-system get service istio-ingressgateway -o jsonpath='{.spec.ports[?(@.name=="http2")].nodePort}')
curl -v -H "Host: ${PLAIN_SERVICE_HOSTNAME}" -H "Content-Type: application/json" -d $INPUT_PATH http://${INGRESS_HOST}:${INGRESS_PORT}/v1/models/$MODEL_NAME:predict
curl -v -H "Host: ${PLAIN_SERVICE_HOSTNAME}" -H "Content-Type: application/json" -d $INPUT_PATH http://${INGRESS_HOST}:${INGRESS_PORT}/v1/models/$MODEL_NAME:predict
*   Trying 192.168.100.112:31855...
* Connected to 192.168.100.112 (192.168.100.112) port 31855
> POST /v1/models/mnist:predict HTTP/1.1
> Host: prediction-and-sink.kserve-test.ay.test.dev
> User-Agent: curl/8.5.0
> Accept: */*
> Content-Type: application/json
> Content-Length: 401
> 
< HTTP/1.1 200 OK
< content-length: 19
< content-type: application/json
< date: Wed, 02 Jul 2025 08:55:05 GMT,Wed, 02 Jul 2025 08:55:04 GMT
< server: istio-envoy
< x-envoy-upstream-service-time: 209
< 
* Connection #0 to host 192.168.100.112 left intact
{"predictions":[2]}

6. Invoke Broker

  • preparation
cat > image-with-trace-id.json << EOF
{
    "test-trace-id": "16ec3446-48d6-422e-9926-8224853e84a7",
    "instances": [
        {
            "data": "iVBORw0KGgoAAAANSUhEUgAAABwAAAAcCAAAAABXZoBIAAAAw0lEQVR4nGNgGFggVVj4/y8Q2GOR83n+58/fP0DwcSqmpNN7oOTJw6f+/H2pjUU2JCSEk0EWqN0cl828e/FIxvz9/9cCh1zS5z9/G9mwyzl/+PNnKQ45nyNAr9ThMHQ/UG4tDofuB4bQIhz6fIBenMWJQ+7Vn7+zeLCbKXv6z59NOPQVgsIcW4QA9YFi6wNQLrKwsBebW/68DJ388Nun5XFocrqvIFH59+XhBAxThTfeB0r+vP/QHbuDCgr2JmOXoSsAAKK7bU3vISS4AAAAAElFTkSuQmCC"
        }
    ]
}
EOF
  • fire!!
export MASTER_IP=192.168.100.112
export KAFKA_BROKER_INGRESS_PORT=$(kubectl -n knative-eventing get service kafka-broker-ingress -o jsonpath='{.spec.ports[?(@.name=="http-container")].nodePort}')
curl -v "http://${MASTER_IP}:${KAFKA_BROKER_INGRESS_PORT}/kserve-test/isvc-broker" \
  -X POST \
  -H "Ce-Id: $(date +%s)" \
  -H "Ce-Specversion: 1.0" \
  -H "Ce-Type: prediction-request-udf-attr" \
  -H "Ce-Source: event-producer" \
  -H "Content-Type: application/json" \
  -d @./image-with-trace-id.json 
  • check input data in kafka topic knative-broker-kserve-test-isvc-broker
kubectl -n database exec -it deployment/kafka-client-tools -- bash -c \
  'kafka-console-consumer.sh --bootstrap-server $BOOTSTRAP_SERVER --consumer.config $CLIENT_CONFIG_FILE --topic knative-broker-kserve-test-isvc-broker --from-beginning'
{
    "test-trace-id": "16ec3446-48d6-422e-9926-8224853e84a7",
    "instances": [
    {
        "data": "iVBORw0KGgoAAAANSUhEUgAAABwAAAAcCAAAAABXZoBIAAAAw0lEQVR4nGNgGFggVVj4/y8Q2GOR83n+58/fP0DwcSqmpNN7oOTJw6f+/H2pjUU2JCSEk0EWqN0cl828e/FIxvz9/9cCh1zS5z9/G9mwyzl/+PNnKQ45nyNAr9ThMHQ/UG4tDofuB4bQIhz6fIBenMWJQ+7Vn7+zeLCbKXv6z59NOPQVgsIcW4QA9YFi6wNQLrKwsBebW/68DJ388Nun5XFocrqvIFH59+XhBAxThTfeB0r+vP/QHbuDCgr2JmOXoSsAAKK7bU3vISS4AAAAAElFTkSuQmCC"
    }]
}
{
    "predictions": [2] // result will be saved in this topic as well
}
  • check response result in kafka topic test-topic
kubectl -n database exec -it deployment/kafka-client-tools -- bash -c \
  'kafka-console-consumer.sh --bootstrap-server $BOOTSTRAP_SERVER --consumer.config $CLIENT_CONFIG_FILE --topic test-topic --from-beginning'
{
    "specversion": "1.0",
    "id": "822e3115-0185-4752-9967-f408dda72004",
    "source": "data-and-computing/kafka-sink-transformer",
    "type": "org.zhejianglab.zverse.data-and-computing.kafka-sink-transformer",
    "time": "2025-07-02T08:57:04.133497+00:00",
    "data":
    {
        "predictions": [2]
    },
    "request-host": "prediction-and-sink-transformer.kserve-test.svc.cluster.local",
    "kserve-isvc-name": "prediction-and-sink",
    "kserve-isvc-namespace": "kserve-test",
    "test-trace-id": "16ec3446-48d6-422e-9926-8224853e84a7"
}
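Because the transformer copies the REQUEST_TRACE_KEY field (test-trace-id here) into every event it emits, you can correlate an asynchronous response with its original request by filtering the sink topic. A small sketch using the same client tooling; --timeout-ms simply makes the consumer exit once the topic is drained, and the grep prints only the matching line of the pretty-printed JSON:

kubectl -n database exec deployment/kafka-client-tools -- bash -c \
  'kafka-console-consumer.sh --bootstrap-server $BOOTSTRAP_SERVER --consumer.config $CLIENT_CONFIG_FILE --topic test-topic --from-beginning --timeout-ms 10000' \
  | grep '16ec3446-48d6-422e-9926-8224853e84a7'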