Kafka Broker Invoke ISVC
1. Prepare RBAC
- Create a ClusterRole that grants read access to the KServe InferenceService (isvc) custom resource
# Create a ClusterRole with read-only (get/list/watch) access to KServe
# InferenceService objects and their status subresource.
# The heredoc delimiter is quoted so the shell passes the manifest through
# verbatim, with no variable or command expansion.
kubectl apply -f - <<'EOF'
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: kserve-access-for-knative
rules:
  - apiGroups: ["serving.kserve.io"]
    resources: ["inferenceservices", "inferenceservices/status"]
    verbs: ["get", "list", "watch"]
EOF
- Create a ClusterRoleBinding that grants this ClusterRole to the kafka-controller service account
# Bind the kserve-access-for-knative ClusterRole to the kafka-controller
# ServiceAccount in the knative-eventing namespace.
# The heredoc delimiter is quoted so the manifest is passed through verbatim.
kubectl apply -f - <<'EOF'
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: kafka-controller-kserve-access
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: kserve-access-for-knative
subjects:
  - kind: ServiceAccount
    name: kafka-controller
    namespace: knative-eventing
EOF
2. Create Broker Setting
# Create the ConfigMap that the Kafka-class Broker references for its topic
# defaults and the Kafka bootstrap address.
# The heredoc delimiter is quoted so the manifest is passed through verbatim.
kubectl apply -f - <<'EOF'
apiVersion: v1
kind: ConfigMap
metadata:
  name: kafka-broker-config
  namespace: knative-eventing
data:
  default.topic.partitions: "10"
  default.topic.replication.factor: "1"
  # Kafka service address (host:port); adjust to your Kafka deployment.
  bootstrap.servers: "kafka.database.svc.cluster.local:9092"
  # NOTE(review): Kafka's retention.ms is expressed in milliseconds, so "3600"
  # is 3.6 seconds. Confirm this is intended and not meant to be one hour
  # ("3600000").
  default.topic.config.retention.ms: "3600"
EOF
3. Create Broker
# Create a Kafka-class Broker named isvc-broker in the kserve-test namespace.
# It is configured from the kafka-broker-config ConfigMap in knative-eventing.
# The heredoc delimiter is quoted so the manifest is passed through verbatim.
kubectl apply -f - <<'EOF'
apiVersion: eventing.knative.dev/v1
kind: Broker
metadata:
  annotations:
    eventing.knative.dev/broker.class: Kafka
  name: isvc-broker
  namespace: kserve-test
spec:
  config:
    apiVersion: v1
    kind: ConfigMap
    name: kafka-broker-config
    namespace: knative-eventing
  delivery:
    # Events that cannot be delivered are forwarded to this sink.
    # NOTE(review): no namespace is given on the ref, so this presumably
    # expects a Knative Service named event-display in the Broker's own
    # namespace (kserve-test) - confirm it exists before applying.
    deadLetterSink:
      ref:
        apiVersion: serving.knative.dev/v1
        kind: Service
        name: event-display
EOF
4. Create InferenceService
Reference: you can first create the `first-torchserve` InferenceService by following 🔗link
5. Create Trigger
# Create a Trigger on isvc-broker that forwards every event whose CloudEvents
# "type" attribute is prediction-request to the mnist predict endpoint of the
# first-torchserve service.
# The heredoc delimiter is quoted so the manifest is passed through verbatim.
kubectl apply -f - <<'EOF'
apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
  name: kserve-trigger
  namespace: kserve-test
spec:
  broker: isvc-broker
  filter:
    attributes:
      type: prediction-request
  subscriber:
    uri: http://first-torchserve.kserve-test.svc.cluster.local/v1/models/mnist:predict
EOF
6. Test
Normally, we can invoke `first-torchserve` directly through the Istio ingress gateway by executing:
# Call the mnist model directly through the Istio ingress gateway's NodePort.

# Address used to reach the cluster's NodePort services; adjust for your cluster.
export MASTER_IP=192.168.100.112

# Assign first and export afterwards, so the exit status of the command
# substitution is not masked by export's own exit status.
ISTIO_INGRESS_PORT=$(kubectl -n istio-system get service istio-ingressgateway \
  -o jsonpath='{.spec.ports[?(@.name=="http2")].nodePort}')
export ISTIO_INGRESS_PORT

# .status.url looks like http://first-torchserve.kserve-test.example.com;
# the third "/"-separated field is the bare host name.
SERVICE_HOSTNAME=$(kubectl -n kserve-test get inferenceservice first-torchserve \
  -o jsonpath='{.status.url}' | cut -d "/" -f 3)
export SERVICE_HOSTNAME

# ${VAR:?} aborts the command with a clear message instead of sending a
# request to a malformed URL when a lookup above returned nothing.
curl -v \
  -H "Host: ${SERVICE_HOSTNAME:?could not resolve the InferenceService hostname}" \
  -H "Content-Type: application/json" \
  "http://${MASTER_IP}:${ISTIO_INGRESS_PORT:?could not resolve the istio-ingressgateway http2 nodePort}/v1/models/mnist:predict" \
  -d @./mnist-input.json
Now you can also reach the model through the Kafka broker by executing:
# Post a CloudEvent to the broker ingress at /<namespace>/<broker-name>.
# Its Ce-Type is prediction-request, the same string used in the URL that
# invokes the model. Relies on MASTER_IP already being set in the current shell.

# Assign first and export afterwards, so the exit status of the command
# substitution is not masked by export's own exit status.
KAFKA_BROKER_INGRESS_PORT=$(kubectl -n knative-eventing get service kafka-broker-ingress \
  -o jsonpath='{.spec.ports[?(@.name=="http-container")].nodePort}')
export KAFKA_BROKER_INGRESS_PORT

# ${VAR:?} aborts the command with a clear message instead of sending a
# request to a malformed URL when a value is missing.
curl -v "http://${MASTER_IP:?MASTER_IP is not set}:${KAFKA_BROKER_INGRESS_PORT:?could not resolve the kafka-broker-ingress http-container nodePort}/kserve-test/isvc-broker" \
  -X POST \
  -H "Ce-Id: $(date +%s)" \
  -H "Ce-Specversion: 1.0" \
  -H "Ce-Type: prediction-request" \
  -H "Ce-Source: event-producer" \
  -H "Content-Type: application/json" \
  -d @./mnist-input.json