Subsections of Eventing

Broker

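Knative Broker is the core component of Knative Eventing. It acts as the hub for event routing and distribution, providing decoupled, reliable event delivery between event producers (event sources) and event consumers (services).

The key roles of a Knative Broker are described below.

Event ingress:

The Broker is the entry point where event streams converge. Event sources (Kafka topics, HTTP sources, Cloud Pub/Sub, GitHub webhooks, timers, custom sources, and so on) send their events to the Broker.

An event producer only needs to know the Broker's address; it does not need to know which consumers exist or where they are.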

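Event storage and buffering:

A Broker is usually backed by a messaging system such as Apache Kafka, Google Cloud Pub/Sub, RabbitMQ, or NATS Streaming. An in-memory implementation (InMemoryChannel) is also available, but it does not persist events. The backing system provides:

Persistence: events are not lost before consumers have processed them (the strength of this guarantee depends on the underlying channel implementation).

Buffering: when a consumer is temporarily unavailable or cannot keep up with the rate of incoming events, the Broker buffers events so that they are not lost and neither producers nor consumers are overwhelmed.

Retry: if a consumer fails to process an event, the Broker can redeliver it (usually governed by the retry policy of the Trigger or Subscription).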

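Decoupling event sources from event consumers:

This is one of the Broker's most important roles. An event source only sends events to the Broker and has no knowledge of which services consume them.

An event consumer declares which events it is interested in by creating a Trigger on the Broker. The consumer only needs to know that the Broker exists, not which source produced an event.

This decoupling makes the system considerably more flexible and maintainable:

Independent evolution: event sources and consumers can be added, removed, or changed independently, as long as they honor the Broker's contract.

Dynamic routing: events are routed to different consumers based on event attributes (such as type and source) without changing producer or consumer code.

Fan-out: the same event can be consumed by several consumers at once (one event -> Broker -> multiple matching Triggers -> multiple services).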

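Event filtering and routing (through Triggers):

The Broker itself does not define filtering logic. Filtering and routing rules are declared by Trigger resources.

A Trigger is bound to one specific Broker.

A Trigger defines:

Subscriber: the address of the target (a Knative Service, a Kubernetes Service, a Channel, and so on).

Filter: a condition on event attributes (mainly type and source, plus extension attributes). Only events that satisfy the condition are routed through this Trigger to its subscriber.

When the Broker receives an event, it evaluates the filters of all Triggers bound to it. For every Trigger that matches, the Broker sends the event to that Trigger's subscriber.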
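The matching step described above can be sketched in shell. This is only an illustration of exact-match attribute filtering, not Knative's implementation: the function name `trigger_matches` and the space-separated `key=value` encoding of attributes are assumptions made for the sketch.

```shell
#!/usr/bin/env bash
# Sketch of exact-match attribute filtering (illustrative, not Knative code).
# Both arguments are space-separated key=value pairs; values must not contain spaces.
trigger_matches() {
  local filter="$1" event="$2" pair
  for pair in $filter; do
    case " $event " in
      *" $pair "*) ;;      # this filter attribute is present with the same value
      *) return 1 ;;       # missing or different value: the Trigger does not match
    esac
  done
  return 0                 # every filter attribute matched (an empty filter matches all)
}

event="type=prediction-request source=event-producer"
if trigger_matches "type=prediction-request" "$event"; then
  echo "deliver to subscriber"
fi
```

A Broker would conceptually run such a check once per Trigger and deliver the event to every subscriber whose filter matches.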

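A standard event interface:

The Broker follows the CloudEvents specification: the events it receives and delivers are in CloudEvents format. This gives events from different sources, and the consumers that process them, a common format and simplifies integration.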
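As a concrete illustration of the format: when a CloudEvent is sent over HTTP in binary content mode, each context attribute travels as a `Ce-` prefixed header and the payload is the request body. The sketch below prints such a header set; the helper name `ce_headers` and its argument order are assumptions, while the attribute values are the ones used in the test step later in this guide.

```shell
#!/usr/bin/env bash
# Print the HTTP headers of a CloudEvent in binary content mode.
# Arguments: id, type, source. Hypothetical helper for illustration only.
ce_headers() {
  local id="$1" ev_type="$2" src="$3"
  printf 'Ce-Id: %s\n' "$id"
  printf 'Ce-Specversion: 1.0\n'
  printf 'Ce-Type: %s\n' "$ev_type"
  printf 'Ce-Source: %s\n' "$src"
  printf 'Content-Type: application/json\n'
}

ce_headers "1751446880" "test.type" "curl-test"
```

The four `Ce-` headers correspond to the required CloudEvents attributes id, specversion, type, and source.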

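Multi-tenancy and namespace isolation:

A Broker is deployed in a specific Kubernetes namespace, and one namespace can contain several Brokers.

This makes it possible to isolate event streams for different teams, applications, or environments (such as dev and staging) within the same cluster. Each team or application manages the Brokers and Triggers in its own namespace.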

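An analogy:

Think of a Knative Broker as a smart postal sorting center:

Receiving mail (events): letters (events) from all over the world (different event sources) are sent to the sorting center (Broker).

Storing mail: the sorting center has a warehouse (persistence and buffering) that holds letters temporarily so that none are lost.

Sorting rules (Triggers): the sorting center employs many sorters (Triggers). Each sorter handles letters of a particular kind or from a particular region (filtering on event attributes).

Delivering mail: a sorter (Trigger) picks out the letters (events) that match its rules and delivers them to the right recipient (subscriber service).

Decoupling: the sender (event source) only needs the address of the sorting center (Broker) and does not need to know who the recipients (consumers) are or where they live. A recipient (consumer) only gives its address to the sorter responsible for its kind of mail (by creating a Trigger) and does not care who sent the letter. The sorting center (Broker) and the sorters (Triggers) take care of the routing in between.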

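Core value of the Broker:

Loose coupling: event producers and consumers are fully decoupled.

Flexibility: consumers can be added or removed and routing rules changed at runtime (by creating, modifying, or deleting Triggers).

Reliability: event persistence and retry (depending on the underlying implementation).

Scalability: the Broker and the consumers scale independently.

Standardization: built on CloudEvents.

Simpler development: developers focus on business logic (producing or consuming events) instead of building their own event bus infrastructure.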

Subsections of Broker

Install Kafka Broker

About


  • Source: curl, KafkaSource
  • Broker
  • Trigger
  • Sink: ksvc, isvc

Install a Channel (messaging) layer

kubectl apply -f https://github.com/knative-extensions/eventing-kafka-broker/releases/download/knative-v1.18.0/eventing-kafka-controller.yaml
configmap/kafka-broker-config created
configmap/kafka-channel-config created
customresourcedefinition.apiextensions.k8s.io/kafkachannels.messaging.knative.dev created
customresourcedefinition.apiextensions.k8s.io/consumers.internal.kafka.eventing.knative.dev created
customresourcedefinition.apiextensions.k8s.io/consumergroups.internal.kafka.eventing.knative.dev created
customresourcedefinition.apiextensions.k8s.io/kafkasinks.eventing.knative.dev created
customresourcedefinition.apiextensions.k8s.io/kafkasources.sources.knative.dev created
clusterrole.rbac.authorization.k8s.io/eventing-kafka-source-observer created
configmap/config-kafka-source-defaults created
configmap/config-kafka-autoscaler created
configmap/config-kafka-features created
configmap/config-kafka-leader-election created
configmap/kafka-config-logging created
configmap/config-namespaced-broker-resources created
configmap/config-tracing configured
clusterrole.rbac.authorization.k8s.io/knative-kafka-addressable-resolver created
clusterrole.rbac.authorization.k8s.io/knative-kafka-channelable-manipulator created
clusterrole.rbac.authorization.k8s.io/kafka-controller created
serviceaccount/kafka-controller created
clusterrolebinding.rbac.authorization.k8s.io/kafka-controller created
clusterrolebinding.rbac.authorization.k8s.io/kafka-controller-addressable-resolver created
deployment.apps/kafka-controller created
clusterrole.rbac.authorization.k8s.io/kafka-webhook-eventing created
serviceaccount/kafka-webhook-eventing created
clusterrolebinding.rbac.authorization.k8s.io/kafka-webhook-eventing created
mutatingwebhookconfiguration.admissionregistration.k8s.io/defaulting.webhook.kafka.eventing.knative.dev created
mutatingwebhookconfiguration.admissionregistration.k8s.io/pods.defaulting.webhook.kafka.eventing.knative.dev created
secret/kafka-webhook-eventing-certs created
validatingwebhookconfiguration.admissionregistration.k8s.io/validation.webhook.kafka.eventing.knative.dev created
deployment.apps/kafka-webhook-eventing created
service/kafka-webhook-eventing created
kubectl apply -f https://github.com/knative-extensions/eventing-kafka-broker/releases/download/knative-v1.18.0/eventing-kafka-channel.yaml
configmap/config-kafka-channel-data-plane created
clusterrole.rbac.authorization.k8s.io/knative-kafka-channel-data-plane created
serviceaccount/knative-kafka-channel-data-plane created
clusterrolebinding.rbac.authorization.k8s.io/knative-kafka-channel-data-plane created
statefulset.apps/kafka-channel-dispatcher created
deployment.apps/kafka-channel-receiver created
service/kafka-channel-ingress created

Install a Broker layer

kubectl apply -f https://github.com/knative-extensions/eventing-kafka-broker/releases/download/knative-v1.18.0/eventing-kafka-broker.yaml
configmap/config-kafka-broker-data-plane created
clusterrole.rbac.authorization.k8s.io/knative-kafka-broker-data-plane created
serviceaccount/knative-kafka-broker-data-plane created
clusterrolebinding.rbac.authorization.k8s.io/knative-kafka-broker-data-plane created
statefulset.apps/kafka-broker-dispatcher created
deployment.apps/kafka-broker-receiver created
service/kafka-broker-ingress created
Reference

Check the StatefulSets:

root@ay-k3s01:~# kubectl -n knative-eventing  get sts
NAME                       READY   AGE
kafka-broker-dispatcher    1/1     19m
kafka-channel-dispatcher   0/0     22m

Some StatefulSets may report 0 replicas (here kafka-channel-dispatcher); check whether that is expected in your setup.
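This check can be scripted with a small filter. The sketch below reads `kubectl get sts` output on stdin and prints the StatefulSets whose desired replica count is 0; the function name `zero_replica_sts` is an assumption. Whether 0 replicas is a problem depends on the setup: a dispatcher may simply be scaled down while nothing uses it, so treat the result as a prompt to investigate rather than as a failure.

```shell
#!/usr/bin/env bash
# Print the names of StatefulSets whose READY column shows a desired count of 0.
# Input: the table printed by `kubectl get sts` (header line plus one row per StatefulSet).
zero_replica_sts() {
  awk 'NR > 1 { split($2, r, "/"); if (r[2] + 0 == 0) print $1 }'
}

# Example with the output shown above; against a cluster you would pipe in
#   kubectl -n knative-eventing get sts
printf '%s\n' \
  'NAME                       READY   AGE' \
  'kafka-broker-dispatcher    1/1     19m' \
  'kafka-channel-dispatcher   0/0     22m' | zero_replica_sts
```

For the sample output this prints `kafka-channel-dispatcher`.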

[Optional] Install Eventing extensions

  • kafka sink
kubectl apply -f https://github.com/knative-extensions/eventing-kafka-broker/releases/download/knative-v1.18.0/eventing-kafka-sink.yaml
Reference

For more information, see https://knative.dev/docs/eventing/sinks/kafka-sink/

  • kafka source
kubectl apply -f https://github.com/knative-extensions/eventing-kafka-broker/releases/download/knative-v1.18.0/eventing-kafka-source.yaml
Reference

For more information, see https://knative.dev/docs/eventing/sources/kafka-source/

Display Broker Message

Flow

flowchart LR
    A[Curl] -->|HTTP| B{Broker}
    B -->|Subscribe| D[Trigger1]
    B -->|Subscribe| E[Trigger2]
    B -->|Subscribe| F[Trigger3]
    E --> G[Display Service]

Steps

1. Create Broker Setting

kubectl apply -f - <<EOF
apiVersion: v1
kind: ConfigMap
metadata:
  name: kafka-broker-config
  namespace: knative-eventing
data:
  default.topic.partitions: "10"
  default.topic.replication.factor: "1"
  bootstrap.servers: "kafka.database.svc.cluster.local:9092" #kafka service address
  default.topic.config.retention.ms: "3600"
EOF
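Kafka's `retention.ms` topic setting is expressed in milliseconds, so the value "3600" above keeps messages for only 3.6 seconds. If a retention of one hour was intended (an assumption; the source does not say), the value can be computed as sketched below. `hours_to_ms` is a hypothetical helper name.

```shell
#!/usr/bin/env bash
# Convert a retention period in whole hours to milliseconds for retention.ms.
hours_to_ms() {
  echo $(( $1 * 60 * 60 * 1000 ))
}

hours_to_ms 1   # prints 3600000
```

The result would then be used as the quoted string value of `default.topic.config.retention.ms`.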

2. Create Broker

kubectl apply -f - <<EOF
apiVersion: eventing.knative.dev/v1
kind: Broker
metadata:
  annotations:
    eventing.knative.dev/broker.class: Kafka
  name: first-broker
  namespace: kserve-test
spec:
  config:
    apiVersion: v1
    kind: ConfigMap
    name: kafka-broker-config
    namespace: knative-eventing
EOF

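Optionally, a deadLetterSink can be set under spec.delivery so that events that cannot be delivered are forwarded to a fallback service.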

3. Create Trigger

kubectl apply -f - <<EOF
apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
  name: display-service-trigger
  namespace: kserve-test
spec:
  broker: first-broker
  subscriber:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: event-display
EOF

4. Create Sink Service (Display Message)

kubectl apply -f - <<EOF
apiVersion: serving.knative.dev/v1
kind: Service
metadata:
  name: event-display
  namespace: kserve-test
spec:
  template:
    spec:
      containers:
        - image: gcr.io/knative-releases/knative.dev/eventing/cmd/event_display
EOF

5. Test

kubectl run curl-test --image=curlimages/curl -it --rm --restart=Never -- \
  -v "http://kafka-broker-ingress.knative-eventing.svc.cluster.local/kserve-test/first-broker" \
  -X POST \
  -H "Ce-Id: $(date +%s)" \
  -H "Ce-Specversion: 1.0" \
  -H "Ce-Type: test.type" \
  -H "Ce-Source: curl-test" \
  -H "Content-Type: application/json" \
  -d '{"test": "Broker is working"}'
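The URL in the command above follows the pattern `http://<ingress host>/<namespace>/<broker name>`. A small helper makes the pattern explicit; the function name `broker_url` is an assumption, and the default host is the in-cluster address used in this guide.

```shell
#!/usr/bin/env bash
# Build the ingress URL of a Kafka Broker from its namespace and name.
# The optional third argument overrides the ingress host (for example a node IP and port).
broker_url() {
  local namespace="$1" broker="$2"
  local host="${3:-kafka-broker-ingress.knative-eventing.svc.cluster.local}"
  printf 'http://%s/%s/%s\n' "$host" "$namespace" "$broker"
}

broker_url kserve-test first-broker
```

The printed value can be passed to curl in place of the literal URL.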

6. Check message

kubectl -n kserve-test logs -f deploy/event-display-00001-deployment 
2025/07/02 09:01:25 Failed to read tracing config, using the no-op default: empty json tracing config
☁️  cloudevents.Event
Context Attributes,
  specversion: 1.0
  type: test.type
  source: curl-test
  id: 1751446880
  datacontenttype: application/json
Extensions,
  knativekafkaoffset: 6
  knativekafkapartition: 6
Data,
  {
    "test": "Broker is working"
  }

Kafka Broker Invoke ISVC

1. Prepare RBAC

  • create a ClusterRole that allows reading InferenceService (isvc) resources
kubectl apply -f - <<EOF
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: kserve-access-for-knative
rules:
- apiGroups: ["serving.kserve.io"]
  resources: ["inferenceservices", "inferenceservices/status"]
  verbs: ["get", "list", "watch"]
EOF
  • create a ClusterRoleBinding that grants this role to the kafka-controller service account
kubectl apply -f - <<EOF
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: kafka-controller-kserve-access
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: kserve-access-for-knative
subjects:
- kind: ServiceAccount
  name: kafka-controller
  namespace: knative-eventing
EOF

2. Create Broker Setting

kubectl apply -f - <<EOF
apiVersion: v1
kind: ConfigMap
metadata:
  name: kafka-broker-config
  namespace: knative-eventing
data:
  default.topic.partitions: "10"
  default.topic.replication.factor: "1"
  bootstrap.servers: "kafka.database.svc.cluster.local:9092" #kafka service address
  default.topic.config.retention.ms: "3600"
EOF

3. Create Broker

kubectl apply -f - <<EOF
apiVersion: eventing.knative.dev/v1
kind: Broker
metadata:
  annotations:
    eventing.knative.dev/broker.class: Kafka
  name: isvc-broker
  namespace: kserve-test
spec:
  config:
    apiVersion: v1
    kind: ConfigMap
    name: kafka-broker-config
    namespace: knative-eventing
  delivery:
    deadLetterSink:
      ref:
        apiVersion: serving.knative.dev/v1
        kind: Service
        name: event-display
EOF

4. Create InferenceService

Reference

You can create the InferenceService first-torchserve by following 🔗link

5. Create Trigger

kubectl apply -f - << EOF
apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
  name: kserve-trigger
  namespace: kserve-test
spec:
  broker: isvc-broker
  filter:
    attributes:
      type: prediction-request
  subscriber:
    uri: http://first-torchserve.kserve-test.svc.cluster.local/v1/models/mnist:predict
EOF

6. Test

Normally, first-torchserve is invoked directly by executing

export MASTER_IP=192.168.100.112
export ISTIO_INGRESS_PORT=$(kubectl -n istio-system get service istio-ingressgateway -o jsonpath='{.spec.ports[?(@.name=="http2")].nodePort}')
export SERVICE_HOSTNAME=$(kubectl -n kserve-test get inferenceservice first-torchserve  -o jsonpath='{.status.url}' | cut -d "/" -f 3)
# http://first-torchserve.kserve-test.example.com 
curl -v -H "Host: ${SERVICE_HOSTNAME}" -H "Content-Type: application/json" "http://${MASTER_IP}:${ISTIO_INGRESS_PORT}/v1/models/mnist:predict" -d @./mnist-input.json

Now you can reach the model through the broker by executing

export KAFKA_BROKER_INGRESS_PORT=$(kubectl -n knative-eventing get service kafka-broker-ingress -o jsonpath='{.spec.ports[?(@.name=="http-container")].nodePort}')
curl -v "http://${MASTER_IP}:${KAFKA_BROKER_INGRESS_PORT}/kserve-test/isvc-broker" \
  -X POST \
  -H "Ce-Id: $(date +%s)" \
  -H "Ce-Specversion: 1.0" \
  -H "Ce-Type: prediction-request" \
  -H "Ce-Source: event-producer" \
  -H "Content-Type: application/json" \
  -d @./mnist-input.json 

Then check Kafka:

# list all topics and find the one ending in `isvc-broker` -> knative-broker-kserve-test-isvc-broker
kubectl -n database exec -it deployment/kafka-client-tools -- bash -c \
    'kafka-topics.sh --bootstrap-server $BOOTSTRAP_SERVER --command-config $CLIENT_CONFIG_FILE --list'
# retrieve msg from that topic
kubectl -n database exec -it deployment/kafka-client-tools -- bash -c \
  'kafka-console-consumer.sh --bootstrap-server $BOOTSTRAP_SERVER --consumer.config $CLIENT_CONFIG_FILE --topic knative-broker-kserve-test-isvc-broker --from-beginning'
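In this setup the broker's topic is named `knative-broker-<namespace>-<broker name>`, as the listing step shows. Assuming that naming holds in your installation (it may differ if topic naming has been customized), the topic name can be derived instead of looked up. `broker_topic` is a hypothetical helper name.

```shell
#!/usr/bin/env bash
# Derive the Kafka topic name of a broker, assuming the naming seen in this guide.
broker_topic() {
  printf 'knative-broker-%s-%s\n' "$1" "$2"
}

broker_topic kserve-test isvc-broker
```

The output can be supplied as the `--topic` argument of `kafka-console-consumer.sh`.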

You should then see

{
    "instances": [
        {
            "data": "iVBORw0KGgoAAAANSUhEUgAAABwAAAAcCAAAAABXZoBIAAAAw0lEQVR4nGNgGFggVVj4/y8Q2GOR83n+58/fP0DwcSqmpNN7oOTJw6f+/H2pjUU2JCSEk0EWqN0cl828e/FIxvz9/9cCh1zS5z9/G9mwyzl/+PNnKQ45nyNAr9ThMHQ/UG4tDofuB4bQIhz6fIBenMWJQ+7Vn7+zeLCbKXv6z59NOPQVgsIcW4QA9YFi6wNQLrKwsBebW/68DJ388Nun5XFocrqvIFH59+XhBAxThTfeB0r+vP/QHbuDCgr2JmOXoSsAAKK7bU3vISS4AAAAAElFTkSuQmCC"
        }
    ]
}
{
    "predictions": [
        2
    ]
}

Subsections of Plugin

Subsections of Eventing Kafka Broker

Prepare Dev Environment

  1. update go -> 1.24

  2. install ko -> 0.18.0

go install github.com/google/ko@latest
# wget https://github.com/ko-build/ko/releases/download/v0.18.0/ko_0.18.0_Linux_x86_64.tar.gz
# tar -xzf ko_0.18.0_Linux_x86_64.tar.gz  -C /usr/local/bin/ko
# cp /usr/local/bin/ko/ko /root/bin
  3. protoc
PB_REL="https://github.com/protocolbuffers/protobuf/releases"
curl -LO $PB_REL/download/v30.2/protoc-30.2-linux-x86_64.zip
# mkdir -p ${HOME}/bin/
mkdir -p /usr/local/bin/protoc
unzip protoc-30.2-linux-x86_64.zip -d /usr/local/bin/protoc
cp /usr/local/bin/protoc/bin/protoc /root/bin
# export PATH="$PATH:/root/bin"
rm -rf protoc-30.2-linux-x86_64.zip
  4. protoc-gen-go -> 1.5.4
go install google.golang.org/protobuf/cmd/protoc-gen-go@latest
export GOPATH=/usr/local/go/bin
  5. clone the source code
mkdir -p ${GOPATH}/src/knative.dev
cd ${GOPATH}/src/knative.dev
git clone git@github.com:knative/eventing.git # clone eventing repo
git clone git@github.com:AaronYang0628/eventing-kafka-broker.git
cd eventing-kafka-broker
git remote add upstream https://github.com/knative-extensions/eventing-kafka-broker.git
git remote set-url --push upstream no_push
export KO_DOCKER_REPO=docker-registry.lab.zverse.space/data-and-computing/ay-dev

Build Async Prediction Flow

Flow

flowchart LR
    A[User Curl] -->|HTTP| B{ISVC-Broker:Kafka}
    B -->|Subscribe| D[Trigger1]
    B -->|Subscribe| E[Kserve-Trigger]
    B -->|Subscribe| F[Trigger3]
    E --> G[Mnist Service]
    G --> |Kafka-Sink| B

Steps

1. Create Broker Setting

kubectl apply -f - <<EOF
apiVersion: v1
kind: ConfigMap
metadata:
  name: kafka-broker-config
  namespace: knative-eventing
data:
  default.topic.partitions: "10"
  default.topic.replication.factor: "1"
  bootstrap.servers: "kafka.database.svc.cluster.local:9092" #kafka service address
  default.topic.config.retention.ms: "3600"
EOF

2. Create Broker

kubectl apply -f - <<EOF
apiVersion: eventing.knative.dev/v1
kind: Broker
metadata:
  annotations:
    eventing.knative.dev/broker.class: Kafka
  name: isvc-broker
  namespace: kserve-test
spec:
  config:
    apiVersion: v1
    kind: ConfigMap
    name: kafka-broker-config
    namespace: knative-eventing
EOF

3. Create Trigger

kubectl apply -f - << EOF
apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
  name: kserve-trigger
  namespace: kserve-test
spec:
  broker: isvc-broker
  filter:
    attributes:
      type: prediction-request-udf-attr # you can change this
  subscriber:
    uri: http://prediction-and-sink.kserve-test.svc.cluster.local/v1/models/mnist:predict
EOF

4. Create InferenceService

kubectl apply -f - <<EOF
apiVersion: serving.kserve.io/v1beta1
kind: InferenceService
metadata:
  name: prediction-and-sink
  namespace: kserve-test
spec:
  predictor:
    model:
      modelFormat:
        name: pytorch
      storageUri: gs://kfserving-examples/models/torchserve/image_classifier/v1
  transformer:
    containers:
      - image: docker-registry.lab.zverse.space/data-and-computing/ay-dev/msg-transformer:dev9
        name: kserve-container
        env:
        - name: KAFKA_BOOTSTRAP_SERVERS
          value: kafka.database.svc.cluster.local
        - name: KAFKA_TOPIC
          value: test-topic # result will be saved in this topic
        - name: REQUEST_TRACE_KEY
          value: test-trace-id # using this key to retrieve prediction result
        command:
          - "python"
          - "-m"
          - "model"
        args:
          - --model_name
          - mnist
EOF
root@ay-k3s01:~# kubectl -n kserve-test get pod
NAME                                                              READY   STATUS    RESTARTS   AGE
prediction-and-sink-predictor-00001-deployment-f64bb76f-jqv4m     2/2     Running   0          3m46s
prediction-and-sink-transformer-00001-deployment-76cccd867lksg9   2/2     Running   0          4m3s

The source code of the docker-registry.lab.zverse.space/data-and-computing/ay-dev/msg-transformer:dev9 image can be found 🔗here

[Optional] 5. Invoke InferenceService

  • preparation
wget -O ./mnist-input.json https://raw.githubusercontent.com/kserve/kserve/refs/heads/master/docs/samples/v1beta1/torchserve/v1/imgconv/input.json
SERVICE_NAME=prediction-and-sink
MODEL_NAME=mnist
INPUT_PATH=@./mnist-input.json
PLAIN_SERVICE_HOSTNAME=$(kubectl -n kserve-test get inferenceservice $SERVICE_NAME -o jsonpath='{.status.url}' | cut -d "/" -f 3)
  • fire!!
export INGRESS_HOST=192.168.100.112
export INGRESS_PORT=$(kubectl -n istio-system get service istio-ingressgateway -o jsonpath='{.spec.ports[?(@.name=="http2")].nodePort}')
curl -v -H "Host: ${PLAIN_SERVICE_HOSTNAME}" -H "Content-Type: application/json" -d $INPUT_PATH http://${INGRESS_HOST}:${INGRESS_PORT}/v1/models/$MODEL_NAME:predict
*   Trying 192.168.100.112:31855...
* Connected to 192.168.100.112 (192.168.100.112) port 31855
> POST /v1/models/mnist:predict HTTP/1.1
> Host: prediction-and-sink.kserve-test.ay.test.dev
> User-Agent: curl/8.5.0
> Accept: */*
> Content-Type: application/json
> Content-Length: 401
> 
< HTTP/1.1 200 OK
< content-length: 19
< content-type: application/json
< date: Wed, 02 Jul 2025 08:55:05 GMT,Wed, 02 Jul 2025 08:55:04 GMT
< server: istio-envoy
< x-envoy-upstream-service-time: 209
< 
* Connection #0 to host 192.168.100.112 left intact
{"predictions":[2]}

6. Invoke Broker

  • preparation
cat > image-with-trace-id.json << EOF
{
    "test-trace-id": "16ec3446-48d6-422e-9926-8224853e84a7",
    "instances": [
        {
            "data": "iVBORw0KGgoAAAANSUhEUgAAABwAAAAcCAAAAABXZoBIAAAAw0lEQVR4nGNgGFggVVj4/y8Q2GOR83n+58/fP0DwcSqmpNN7oOTJw6f+/H2pjUU2JCSEk0EWqN0cl828e/FIxvz9/9cCh1zS5z9/G9mwyzl/+PNnKQ45nyNAr9ThMHQ/UG4tDofuB4bQIhz6fIBenMWJQ+7Vn7+zeLCbKXv6z59NOPQVgsIcW4QA9YFi6wNQLrKwsBebW/68DJ388Nun5XFocrqvIFH59+XhBAxThTfeB0r+vP/QHbuDCgr2JmOXoSsAAKK7bU3vISS4AAAAAElFTkSuQmCC"
        }
    ]
}
EOF
  • fire!!
export MASTER_IP=192.168.100.112
export KAFKA_BROKER_INGRESS_PORT=$(kubectl -n knative-eventing get service kafka-broker-ingress -o jsonpath='{.spec.ports[?(@.name=="http-container")].nodePort}')
curl -v "http://${MASTER_IP}:${KAFKA_BROKER_INGRESS_PORT}/kserve-test/isvc-broker" \
  -X POST \
  -H "Ce-Id: $(date +%s)" \
  -H "Ce-Specversion: 1.0" \
  -H "Ce-Type: prediction-request-udf-attr" \
  -H "Ce-Source: event-producer" \
  -H "Content-Type: application/json" \
  -d @./image-with-trace-id.json 
  • check input data in kafka topic knative-broker-kserve-test-isvc-broker
kubectl -n database exec -it deployment/kafka-client-tools -- bash -c \
  'kafka-console-consumer.sh --bootstrap-server $BOOTSTRAP_SERVER --consumer.config $CLIENT_CONFIG_FILE --topic knative-broker-kserve-test-isvc-broker --from-beginning'
{
    "test-trace-id": "16ec3446-48d6-422e-9926-8224853e84a7",
    "instances": [
    {
        "data": "iVBORw0KGgoAAAANSUhEUgAAABwAAAAcCAAAAABXZoBIAAAAw0lEQVR4nGNgGFggVVj4/y8Q2GOR83n+58/fP0DwcSqmpNN7oOTJw6f+/H2pjUU2JCSEk0EWqN0cl828e/FIxvz9/9cCh1zS5z9/G9mwyzl/+PNnKQ45nyNAr9ThMHQ/UG4tDofuB4bQIhz6fIBenMWJQ+7Vn7+zeLCbKXv6z59NOPQVgsIcW4QA9YFi6wNQLrKwsBebW/68DJ388Nun5XFocrqvIFH59+XhBAxThTfeB0r+vP/QHbuDCgr2JmOXoSsAAKK7bU3vISS4AAAAAElFTkSuQmCC"
    }]
}
{
    "predictions": [2] // result will be saved in this topic as well
}
  • check response result in kafka topic test-topic
kubectl -n database exec -it deployment/kafka-client-tools -- bash -c \
  'kafka-console-consumer.sh --bootstrap-server $BOOTSTRAP_SERVER --consumer.config $CLIENT_CONFIG_FILE --topic test-topic --from-beginning'
{
    "specversion": "1.0",
    "id": "822e3115-0185-4752-9967-f408dda72004",
    "source": "data-and-computing/kafka-sink-transformer",
    "type": "org.zhejianglab.zverse.data-and-computing.kafka-sink-transformer",
    "time": "2025-07-02T08:57:04.133497+00:00",
    "data":
    {
        "predictions": [2]
    },
    "request-host": "prediction-and-sink-transformer.kserve-test.svc.cluster.local",
    "kserve-isvc-name": "prediction-and-sink",
    "kserve-isvc-namespace": "kserve-test",
    "test-trace-id": "16ec3446-48d6-422e-9926-8224853e84a7"
}