Build an Async Prediction Flow

Flow

flowchart LR
    %% Events posted to the Kafka-backed broker fan out to every subscribed trigger;
    %% kserve-trigger forwards matching events to the MNIST InferenceService.
    A[User Curl] -->|HTTP| B{ISVC-Broker:Kafka}
    B -->|Subscribe| D[Trigger1]
    B -->|Subscribe| E[Kserve-Trigger]
    B -->|Subscribe| F[Trigger3]
    E --> G[Mnist Service]
    G --> |Kafka-Sink| B

Steps

1. Create the Kafka Broker ConfigMap

# Shared configuration for Kafka-backed Knative brokers. The Broker created in
# step 2 points at this ConfigMap via spec.config.
kubectl apply -f - <<'EOF'
apiVersion: v1
kind: ConfigMap
metadata:
  name: kafka-broker-config
  namespace: knative-eventing
data:
  # Settings for the Kafka topic created for each broker.
  default.topic.partitions: "10"
  default.topic.replication.factor: "1"
  bootstrap.servers: "kafka.database.svc.cluster.local:9092" # Kafka service address (host:port)
  # retention.ms is in milliseconds: 3600000 ms = 1 hour ("3600" would be only 3.6 s).
  default.topic.config.retention.ms: "3600000"
EOF

2. Create Broker

# Broker backed by Kafka: the broker.class annotation selects the Kafka
# implementation, and spec.config references the kafka-broker-config ConfigMap
# in knative-eventing.
kubectl apply -f - <<'EOF'
apiVersion: eventing.knative.dev/v1
kind: Broker
metadata:
  name: isvc-broker
  namespace: kserve-test
  annotations:
    eventing.knative.dev/broker.class: Kafka
spec:
  config:
    kind: ConfigMap
    apiVersion: v1
    namespace: knative-eventing
    name: kafka-broker-config
EOF

3. Create Trigger

# Subscribe the InferenceService's predict endpoint to isvc-broker, delivering
# only events whose CloudEvent type matches the filter below.
kubectl apply -f - <<'EOF'
apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
  namespace: kserve-test
  name: kserve-trigger
spec:
  subscriber:
    uri: http://prediction-and-sink.kserve-test.svc.cluster.local/v1/models/mnist:predict
  filter:
    attributes:
      # Free to change, but the producer's Ce-Type header must use the same value.
      type: prediction-request-udf-attr
  broker: isvc-broker
EOF

4. Create InferenceService

 # InferenceService: a PyTorch (TorchServe) MNIST predictor fronted by a custom
# transformer image; the transformer's env config names the Kafka topic where
# prediction results are saved.
kubectl apply -f - <<'EOF'
apiVersion: serving.kserve.io/v1beta1
kind: InferenceService
metadata:
  name: prediction-and-sink
  namespace: kserve-test
spec:
  predictor:
    model:
      modelFormat:
        name: pytorch
      storageUri: gs://kfserving-examples/models/torchserve/image_classifier/v1
  transformer:
    containers:
      - image: docker-registry.lab.zverse.space/data-and-computing/ay-dev/msg-transformer:dev9
        name: kserve-container
        env:
        - name: KAFKA_BOOTSTRAP_SERVERS
          value: kafka.database.svc.cluster.local
        - name: KAFKA_TOPIC
          value: test-topic # result will be saved in this topic
        - name: REQUEST_TRACE_KEY
          value: test-trace-id # use this key to retrieve the prediction result
        command:
          - "python"
          - "-m"
          - "model"
        args:
          - --model_name
          - mnist
EOF
root@ay-k3s01:~# kubectl -n kserve-test get pod
NAME                                                              READY   STATUS    RESTARTS   AGE
prediction-and-sink-predictor-00001-deployment-f64bb76f-jqv4m     2/2     Running   0          3m46s
prediction-and-sink-transformer-00001-deployment-76cccd867lksg9   2/2     Running   0          4m3s

The source code of the docker-registry.lab.zverse.space/data-and-computing/ay-dev/msg-transformer:dev9 image can be found 🔗here.

[Optional] 5. Invoke the InferenceService Directly

  • preparation
# Download a sample MNIST request and resolve the InferenceService's external hostname.
wget -O ./mnist-input.json https://raw.githubusercontent.com/kserve/kserve/refs/heads/master/docs/samples/v1beta1/torchserve/v1/imgconv/input.json
SERVICE_NAME=prediction-and-sink
MODEL_NAME=mnist
# The leading '@' makes curl -d read the request body from this file.
INPUT_PATH=@./mnist-input.json
# status.url has the form http://<host>; field 3 of a '/' split is the <host> used for the Host header.
PLAIN_SERVICE_HOSTNAME=$(kubectl -n kserve-test get inferenceservice "$SERVICE_NAME" -o jsonpath='{.status.url}' | cut -d "/" -f 3)
  • fire!!
# Address of a cluster node; istio-ingressgateway is reached through its NodePort.
export INGRESS_HOST=192.168.100.112
# Assign separately from export so a failing kubectl is not masked.
INGRESS_PORT=$(kubectl -n istio-system get service istio-ingressgateway -o jsonpath='{.spec.ports[?(@.name=="http2")].nodePort}')
export INGRESS_PORT
# Send the prediction request once; the Host header routes it to the InferenceService.
curl -v \
  -H "Host: ${PLAIN_SERVICE_HOSTNAME}" \
  -H "Content-Type: application/json" \
  -d "$INPUT_PATH" \
  "http://${INGRESS_HOST}:${INGRESS_PORT}/v1/models/${MODEL_NAME}:predict"
# Example output:
*   Trying 192.168.100.112:31855...
* Connected to 192.168.100.112 (192.168.100.112) port 31855
> POST /v1/models/mnist:predict HTTP/1.1
> Host: prediction-and-sink.kserve-test.ay.test.dev
> User-Agent: curl/8.5.0
> Accept: */*
> Content-Type: application/json
> Content-Length: 401
> 
< HTTP/1.1 200 OK
< content-length: 19
< content-type: application/json
< date: Wed, 02 Jul 2025 08:55:05 GMT,Wed, 02 Jul 2025 08:55:04 GMT
< server: istio-envoy
< x-envoy-upstream-service-time: 209
< 
* Connection #0 to host 192.168.100.112 left intact
{"predictions":[2]}

6. Invoke the Broker

  • preparation
# Write a request payload that carries the trace key (test-trace-id) alongside
# the base64-encoded image; the quoted delimiter keeps the body literal.
cat <<'EOF' > image-with-trace-id.json
{
    "test-trace-id": "16ec3446-48d6-422e-9926-8224853e84a7",
    "instances": [
        {
            "data": "iVBORw0KGgoAAAANSUhEUgAAABwAAAAcCAAAAABXZoBIAAAAw0lEQVR4nGNgGFggVVj4/y8Q2GOR83n+58/fP0DwcSqmpNN7oOTJw6f+/H2pjUU2JCSEk0EWqN0cl828e/FIxvz9/9cCh1zS5z9/G9mwyzl/+PNnKQ45nyNAr9ThMHQ/UG4tDofuB4bQIhz6fIBenMWJQ+7Vn7+zeLCbKXv6z59NOPQVgsIcW4QA9YFi6wNQLrKwsBebW/68DJ388Nun5XFocrqvIFH59+XhBAxThTfeB0r+vP/QHbuDCgr2JmOXoSsAAKK7bU3vISS4AAAAAElFTkSuQmCC"
        }
    ]
}
EOF
  • fire!!
# Address of a cluster node; the kafka-broker-ingress service is reached through its NodePort.
export MASTER_IP=192.168.100.112
# Assign separately from export so a failing kubectl is not masked.
KAFKA_BROKER_INGRESS_PORT=$(kubectl -n knative-eventing get service kafka-broker-ingress -o jsonpath='{.spec.ports[?(@.name=="http-container")].nodePort}')
export KAFKA_BROKER_INGRESS_PORT
# CloudEvents requires source+id to be unique per event. A seconds-resolution
# timestamp repeats when two events are sent within the same second, so prefer
# a UUID and fall back to the timestamp only if no UUID source is available.
CE_ID=$(uuidgen 2>/dev/null || cat /proc/sys/kernel/random/uuid 2>/dev/null || date +%s)
# Ce-Type must equal the kserve-trigger filter (prediction-request-udf-attr)
# for the event to be delivered to the InferenceService.
curl -v "http://${MASTER_IP}:${KAFKA_BROKER_INGRESS_PORT}/kserve-test/isvc-broker" \
  -X POST \
  -H "Ce-Id: ${CE_ID}" \
  -H "Ce-Specversion: 1.0" \
  -H "Ce-Type: prediction-request-udf-attr" \
  -H "Ce-Source: event-producer" \
  -H "Content-Type: application/json" \
  -d @./image-with-trace-id.json
  • check the input data in Kafka topic knative-broker-kserve-test-isvc-broker
# Replay the broker's backing topic from the start. The single quotes defer
# expansion of $BOOTSTRAP_SERVER and $CLIENT_CONFIG_FILE to the shell inside the pod.
kubectl exec -it -n database deployment/kafka-client-tools -- \
  bash -c 'kafka-console-consumer.sh --bootstrap-server $BOOTSTRAP_SERVER --consumer.config $CLIENT_CONFIG_FILE --topic knative-broker-kserve-test-isvc-broker --from-beginning'
{
    "test-trace-id": "16ec3446-48d6-422e-9926-8224853e84a7",
    "instances": [
    {
        "data": "iVBORw0KGgoAAAANSUhEUgAAABwAAAAcCAAAAABXZoBIAAAAw0lEQVR4nGNgGFggVVj4/y8Q2GOR83n+58/fP0DwcSqmpNN7oOTJw6f+/H2pjUU2JCSEk0EWqN0cl828e/FIxvz9/9cCh1zS5z9/G9mwyzl/+PNnKQ45nyNAr9ThMHQ/UG4tDofuB4bQIhz6fIBenMWJQ+7Vn7+zeLCbKXv6z59NOPQVgsIcW4QA9YFi6wNQLrKwsBebW/68DJ388Nun5XFocrqvIFH59+XhBAxThTfeB0r+vP/QHbuDCgr2JmOXoSsAAKK7bU3vISS4AAAAAElFTkSuQmCC"
    }]
}
{
    "predictions": [2] // result will be saved in this topic as well
}
  • check the response result in Kafka topic test-topic
# Replay test-topic (the KAFKA_TOPIC given to the transformer) from the start.
# The single quotes defer expansion of $BOOTSTRAP_SERVER and $CLIENT_CONFIG_FILE
# to the shell inside the pod.
kubectl exec -it -n database deployment/kafka-client-tools -- \
  bash -c 'kafka-console-consumer.sh --bootstrap-server $BOOTSTRAP_SERVER --consumer.config $CLIENT_CONFIG_FILE --topic test-topic --from-beginning'
{
    "specversion": "1.0",
    "id": "822e3115-0185-4752-9967-f408dda72004",
    "source": "data-and-computing/kafka-sink-transformer",
    "type": "org.zhejianglab.zverse.data-and-computing.kafka-sink-transformer",
    "time": "2025-07-02T08:57:04.133497+00:00",
    "data":
    {
        "predictions": [2]
    },
    "request-host": "prediction-and-sink-transformer.kserve-test.svc.cluster.local",
    "kserve-isvc-name": "prediction-and-sink",
    "kserve-isvc-namespace": "kserve-test",
    "test-trace-id": "16ec3446-48d6-422e-9926-8224853e84a7"
}