Build Async Prediction Flow
Flow
flowchart LR
    A[User Curl] -->|HTTP| B{ISVC-Broker:Kafka}
    B -->|Subscribe| D[Trigger1]
    B -->|Subscribe| E[Kserve-Trigger]
    B -->|Subscribe| F[Trigger3]
    E --> G[Mnist Service]
    G -->|Kafka-Sink| B
Steps
1. Create Broker Setting
# Create the ConfigMap holding the Kafka settings (bootstrap address and
# default topic parameters) in the knative-eventing namespace.
# The heredoc delimiter is quoted so the manifest is passed to kubectl verbatim.
kubectl apply -f - <<'EOF'
apiVersion: v1
kind: ConfigMap
metadata:
  name: kafka-broker-config
  namespace: knative-eventing
data:
  default.topic.partitions: "10"
  default.topic.replication.factor: "1"
  bootstrap.servers: "kafka.database.svc.cluster.local:9092" # kafka service address
  # NOTE(review): retention.ms is expressed in milliseconds, so "3600" keeps
  # records for 3.6 s only -- confirm this is intended (one hour is "3600000").
  default.topic.config.retention.ms: "3600"
EOF
2. Create Broker
# Create the Kafka-class Broker "isvc-broker" in the kserve-test namespace.
# Its spec.config points at the kafka-broker-config ConfigMap in
# knative-eventing, which supplies the Kafka connection settings.
kubectl apply -f - <<'EOF'
apiVersion: eventing.knative.dev/v1
kind: Broker
metadata:
  annotations:
    eventing.knative.dev/broker.class: Kafka
  name: isvc-broker
  namespace: kserve-test
spec:
  config:
    apiVersion: v1
    kind: ConfigMap
    name: kafka-broker-config
    namespace: knative-eventing
EOF
3. Create Trigger
# Create the Trigger that subscribes to isvc-broker and forwards events whose
# CloudEvents "type" attribute matches the filter to the mnist predict endpoint
# of the prediction-and-sink service.
kubectl apply -f - <<'EOF'
apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
  name: kserve-trigger
  namespace: kserve-test
spec:
  broker: isvc-broker
  filter:
    attributes:
      # You can change this; it must equal the Ce-Type header sent to the broker.
      type: prediction-request-udf-attr
  subscriber:
    uri: http://prediction-and-sink.kserve-test.svc.cluster.local/v1/models/mnist:predict
EOF
4. Create InferenceService
# Deploy the InferenceService "prediction-and-sink": a PyTorch-format predictor
# loaded from the storageUri below, plus a custom transformer container that
# receives its Kafka settings through environment variables.
kubectl apply -f - <<'EOF'
apiVersion: serving.kserve.io/v1beta1
kind: InferenceService
metadata:
  name: prediction-and-sink
  namespace: kserve-test
spec:
  predictor:
    model:
      modelFormat:
        name: pytorch
      storageUri: gs://kfserving-examples/models/torchserve/image_classifier/v1
  transformer:
    containers:
      - image: docker-registry.lab.zverse.space/data-and-computing/ay-dev/msg-transformer:dev9
        name: kserve-container
        env:
          # NOTE(review): no port is given here, while bootstrap.servers in
          # kafka-broker-config uses :9092 -- confirm the image supplies a default.
          - name: KAFKA_BOOTSTRAP_SERVERS
            value: kafka.database.svc.cluster.local
          - name: KAFKA_TOPIC
            value: test-topic # result will be saved in this topic
          - name: REQUEST_TRACE_KEY
            value: test-trace-id # using this key to retrieve prediction result
        command:
          - "python"
          - "-m"
          - "model"
        args:
          - --model_name
          - mnist
EOF
[Optional] 5. Invoke InferenceService
- preparation
# Download the sample request body into ./mnist-input.json.
wget --output-document=./mnist-input.json \
  https://raw.githubusercontent.com/kserve/kserve/refs/heads/master/docs/samples/v1beta1/torchserve/v1/imgconv/input.json

# Names reused by the request command that follows.
SERVICE_NAME=prediction-and-sink
MODEL_NAME=mnist
# The leading "@" tells curl's -d option to read the body from this file.
INPUT_PATH=@./mnist-input.json

# Take the host part of the InferenceService URL: the third "/"-separated field
# of .status.url.
PLAIN_SERVICE_HOSTNAME=$(
  kubectl --namespace kserve-test get inferenceservice "${SERVICE_NAME}" \
    --output jsonpath='{.status.url}' |
    cut -d/ -f3
)
- fire!!
# Ingress address: a fixed host IP plus the nodePort of the "http2" port on the
# istio-ingressgateway service. Assignment and export are separate statements
# so the kubectl exit status is not hidden behind `export`.
INGRESS_HOST=192.168.100.112
INGRESS_PORT=$(kubectl -n istio-system get service istio-ingressgateway -o jsonpath='{.spec.ports[?(@.name=="http2")].nodePort}')
export INGRESS_HOST INGRESS_PORT

# POST the sample input to the model's predict endpoint through the ingress.
# The Host header carries the InferenceService hostname. Every expansion is
# quoted, and ${MODEL_NAME} is braced so the name ends before ":predict".
curl -v \
  -H "Host: ${PLAIN_SERVICE_HOSTNAME}" \
  -H "Content-Type: application/json" \
  -d "${INPUT_PATH}" \
  "http://${INGRESS_HOST}:${INGRESS_PORT}/v1/models/${MODEL_NAME}:predict"
6. Invoke Broker
- preparation
# Write the request payload to image-with-trace-id.json in the current
# directory. The top-level "test-trace-id" key has the same name as the
# REQUEST_TRACE_KEY value configured on the transformer container.
# The delimiter is quoted so the payload is written exactly as typed.
cat <<'JSON' > image-with-trace-id.json
{
"test-trace-id": "16ec3446-48d6-422e-9926-8224853e84a7",
"instances": [
{
"data": "iVBORw0KGgoAAAANSUhEUgAAABwAAAAcCAAAAABXZoBIAAAAw0lEQVR4nGNgGFggVVj4/y8Q2GOR83n+58/fP0DwcSqmpNN7oOTJw6f+/H2pjUU2JCSEk0EWqN0cl828e/FIxvz9/9cCh1zS5z9/G9mwyzl/+PNnKQ45nyNAr9ThMHQ/UG4tDofuB4bQIhz6fIBenMWJQ+7Vn7+zeLCbKXv6z59NOPQVgsIcW4QA9YFi6wNQLrKwsBebW/68DJ388Nun5XFocrqvIFH59+XhBAxThTfeB0r+vP/QHbuDCgr2JmOXoSsAAKK7bU3vISS4AAAAAElFTkSuQmCC"
}
]
}
JSON
- fire!!
# Broker ingress address: a fixed host IP plus the nodePort of the
# "http-container" port on the kafka-broker-ingress service.
MASTER_IP=192.168.100.112
KAFKA_BROKER_INGRESS_PORT=$(kubectl -n knative-eventing get service kafka-broker-ingress -o jsonpath='{.spec.ports[?(@.name=="http-container")].nodePort}')
export MASTER_IP KAFKA_BROKER_INGRESS_PORT

# POST the payload file to the isvc-broker in namespace kserve-test. The Ce-*
# headers carry the CloudEvents attributes: Ce-Id is the current epoch second,
# and Ce-Type is the value the kserve-trigger filter matches on.
curl --verbose "http://${MASTER_IP}:${KAFKA_BROKER_INGRESS_PORT}/kserve-test/isvc-broker" \
  --request POST \
  --header "Ce-Id: $(date +%s)" \
  --header "Ce-Specversion: 1.0" \
  --header "Ce-Type: prediction-request-udf-attr" \
  --header "Ce-Source: event-producer" \
  --header "Content-Type: application/json" \
  --data @./image-with-trace-id.json
- check input data in kafka topic
knative-broker-kserve-test-isvc-broker
# Consume topic knative-broker-kserve-test-isvc-broker from the beginning to
# inspect the data posted to the broker. The command string is single-quoted so
# $BOOTSTRAP_SERVER and $CLIENT_CONFIG_FILE are expanded by the bash started
# inside the kafka-client-tools deployment, not by the local shell.
kubectl --namespace database exec --stdin --tty deployment/kafka-client-tools -- \
  bash -c 'kafka-console-consumer.sh --bootstrap-server $BOOTSTRAP_SERVER --consumer.config $CLIENT_CONFIG_FILE --topic knative-broker-kserve-test-isvc-broker --from-beginning'
- check response result in kafka topic
test-topic
# Consume test-topic (the KAFKA_TOPIC configured on the transformer) from the
# beginning to inspect the prediction results. The command string is
# single-quoted so $BOOTSTRAP_SERVER and $CLIENT_CONFIG_FILE are expanded by the
# bash started inside the kafka-client-tools deployment, not by the local shell.
kubectl --namespace database exec --stdin --tty deployment/kafka-client-tools -- \
  bash -c 'kafka-console-consumer.sh --bootstrap-server $BOOTSTRAP_SERVER --consumer.config $CLIENT_CONFIG_FILE --topic test-topic --from-beginning'