Subsections of ☸️Kubernetes
Prepare k8s Cluster
To build a K8s cluster, you can choose one of the following methods.
Install Kubectl
Build Cluster
Prerequisites
Hardware Requirements:
- At least 2 GB of RAM per machine (minimum 1 GB)
- 2 CPUs on the master node
- Full network connectivity among all machines (public or private network)
Operating System:
- Ubuntu 20.04/18.04, CentOS 7/8, or any other supported Linux distribution.
Network Requirements:
- Unique hostname, MAC address, and product_uuid for each node.
- Certain ports need to be open (e.g., 6443, 2379-2380, 10250, 10251, 10252, 10255, etc.)
Disable Swap:
sudo swapoff -a
Steps to Set Up a Kubernetes Cluster
- Prepare Your Servers: update the package index and install the necessary packages on all your nodes (both master and worker):
sudo apt-get update
sudo apt-get install -y apt-transport-https ca-certificates curl
Add the Kubernetes APT Repository
curl -s https://packages.cloud.google.com/apt/doc/apt-key.gpg | sudo apt-key add -
cat <<EOF | sudo tee /etc/apt/sources.list.d/kubernetes.list
deb http://apt.kubernetes.io/ kubernetes-xenial main
EOF
Install kubeadm, kubelet, and kubectl
sudo apt-get update
sudo apt-get install -y kubelet kubeadm kubectl
sudo apt-mark hold kubelet kubeadm kubectl
- Initialize the Master Node: on the master node, initialize the Kubernetes control plane:
sudo kubeadm init --pod-network-cidr=192.168.0.0/16
The --pod-network-cidr flag is used to set the Pod network range. You might need to adjust this based on your network provider.
Set up Local kubeconfig
mkdir -p $HOME/.kube
sudo cp -i /etc/kubernetes/admin.conf $HOME/.kube/config
sudo chown $(id -u):$(id -g) $HOME/.kube/config
- Install a Pod Network Add-on: you can install a network add-on such as Flannel, Calico, or Weave. For example, to install Calico:
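The exact manifest depends on the Calico release; a commonly used form (the version tag here is an assumption, check the Calico docs for the current one):
kubectl apply -f https://raw.githubusercontent.com/projectcalico/calico/v3.27.0/manifests/calico.yaml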
- Join Worker Nodes to the Cluster: on each worker node, run the kubeadm join command provided at the end of the kubeadm init output on the master node. It will look something like this:
sudo kubeadm join <master-ip>:6443 --token <token> --discovery-token-ca-cert-hash sha256:<hash>
If you lost the join command, you can create a new token on the master node:
sudo kubeadm token create --print-join-command
- Verify the Cluster: once all nodes have joined, you can verify the cluster status from the master node:
kubectl get nodes
This command should list all your nodes with the status “Ready”.
Subsections of Prepare k8s Cluster
Kind
Preliminary
The Kind binary must be installed; if not, check 🔗link
Hardware Requirements:
- At least 2 GB of RAM per machine (minimum 1 GB)
- 2 CPUs on the master node
- Full network connectivity among all machines (public or private network)
Operating System:
- Ubuntu 22.04/14.04, CentOS 7/8, or any other supported Linux distribution.
Network Requirements:
- Unique hostname, MAC address, and product_uuid for each node.
- Certain ports need to be open (e.g., 6443, 2379-2380, 10250, 10251, 10252, 10255, etc.)
Customize your cluster
Creating a Kubernetes cluster is as simple as kind create cluster
kind create cluster --name test
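If you need more than a single-node cluster, kind also accepts a configuration file. A minimal sketch (node roles and count are illustrative):
# kind-multi-node.yaml -- one control-plane node plus two workers
kind: Cluster
apiVersion: kind.x-k8s.io/v1alpha4
nodes:
  - role: control-plane
  - role: worker
  - role: worker
kind create cluster --name test --config kind-multi-node.yaml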
Reference
and then you can visit https://kind.sigs.k8s.io/docs/user/quick-start/ for more detail.
K3s
Preliminary
- The server needs at least 2 cores and 2 GB of RAM
- Each agent needs at least 1 core and 512 MB of RAM
Operating System:
- K3s is expected to work on most modern Linux systems.
Network Requirements:
- The K3s server needs port 6443 to be accessible by all nodes.
- If you wish to utilize the metrics server, all nodes must be accessible to each other on port 10250.
Init server
curl -sfL https://rancher-mirror.rancher.cn/k3s/k3s-install.sh | INSTALL_K3S_MIRROR=cn sh -s - server --cluster-init --flannel-backend=vxlan --node-taint "node-role.kubernetes.io/control-plane=true:NoSchedule"
Get token
cat /var/lib/rancher/k3s/server/node-token
Join worker
curl -sfL https://rancher-mirror.rancher.cn/k3s/k3s-install.sh | INSTALL_K3S_MIRROR=cn K3S_URL=https://<master-ip>:6443 K3S_TOKEN=<join-token> sh -
Copy kubeconfig
mkdir -p $HOME/.kube
cp /etc/rancher/k3s/k3s.yaml $HOME/.kube/config
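To confirm the agent has joined, run the following on the server (or anywhere the copied kubeconfig is available); both nodes should show up:
kubectl get nodes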
Minikube
Preliminary
The Minikube binary must be installed; if not, check 🔗link
Hardware Requirements:
- At least 2 GB of RAM per machine (minimum 1 GB)
- 2 CPUs on the master node
- Full network connectivity among all machines (public or private network)
Operating System:
- Ubuntu 20.04/18.04, CentOS 7/8, or any other supported Linux distribution.
Network Requirements:
- Unique hostname, MAC address, and product_uuid for each node.
- Certain ports need to be open (e.g., 6443, 2379-2380, 10250, 10251, 10252, 10255, etc.)
[Optional] Disable aegis service and reboot system for Aliyun
sudo systemctl disable aegis && sudo reboot
Customize your cluster
minikube start --driver=podman --image-mirror-country=cn --kubernetes-version=v1.33.1 --image-repository=registry.cn-hangzhou.aliyuncs.com/google_containers --cpus=6 --memory=20g --disk-size=50g --force
Restart minikube
minikube stop && minikube start
Add alias
alias kubectl="minikube kubectl --"
Forward
ssh -i ~/.minikube/machines/minikube/id_rsa docker@$(minikube ip) -L '*:30443:0.0.0.0:30443' -N -f
and then you can visit https://minikube.sigs.k8s.io/docs/start/ for more detail.
FAQ
Command
Subsections of Command
Kubectl CheatSheet
Switch Context
- use different config
kubectl --kubeconfig /root/.kube/config_ack get pod
Resource
create resource
kubectl create -n <$namespace> -f <$file_url>
helm install <$resource_id> <$resource_id> \
  --namespace <$namespace> \
  --create-namespace \
  --version <$version> \
  --repo <$repo_url> \
  --values resource.values.yaml \
  --atomic
debug resource
kubectl -n <$namespace> describe <$resource_id>
- logging resource
kubectl -n <$namespace> logs -f <$resource_id>
- port forwarding resource
kubectl -n <$namespace> port-forward <$resource_id> --address 0.0.0.0 8080:80 # local:pod
- delete all resource under specific namespace
kubectl delete all --all -n <$namespace>
- delete error pods
kubectl -n <$namespace> delete pods --field-selector status.phase=Failed
- force delete
kubectl -n <$namespace> delete pod <$resource_id> --force --grace-period=0
- opening a Bash Shell inside a Pod
kubectl -n <$namespace> exec -it <$resource_id> -- bash
- copy secret to another namespace
kubectl -n <$namespaceA> get secret <$secret_name> -o json \
| jq 'del(.metadata["namespace","creationTimestamp","resourceVersion","selfLink","uid"])' \
| kubectl -n <$namespaceB> apply -f -
- copy secret to another name
kubectl -n <$namespace> get secret <$old_secret_name> -o json | \
jq 'del(.metadata["namespace","creationTimestamp","resourceVersion","selfLink","uid","ownerReferences","annotations","labels"]) | .metadata.name = "<$new_secret_name>"' | \
kubectl apply -n <$namespace> -f -
- delete all completed job
kubectl delete jobs -n <$namespace> --field-selector status.successful=1
Nodes
- add taint
kubectl taint nodes <$node_ip> <key:value>
- remove taint
kubectl taint nodes <$node_ip> <key>-
- show info extract by json path
kubectl get nodes -o jsonpath='{.items[*].spec.podCIDR}'
Deploy
- show rollout history
kubectl -n <$namespace> rollout history deploy/<$deploy_resource_id>
undo rollout
kubectl -n <$namespace> rollout undo deploy <$deploy_resource_id> --to-revision=1
Helm Chart CheatSheet
Finding Charts
helm search hub wordpress
Adding Repositories
helm repo add ay-helm-mirror https://aaronyang0628.github.io/helm-chart-mirror/charts
helm repo update
Showing Chart Values
helm show values bitnami/wordpress
Packaging Charts
helm package --dependency-update --destination /tmp/ /root/metadata-operator/environments/helm/metadata-environment/charts
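The resulting .tgz archive can then be installed directly; the chart file name below is hypothetical:
helm install <$resource_id> /tmp/<$chart_name>-<$version>.tgz --namespace <$namespace> --create-namespace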
Container
- Docker
To install Docker, refer to Docker Installation.
- Podman
To install Podman, refer to Podman Installation.
Subsections of Container
CheatSheet
- remove specific image
podman rmi <$image_id>
- remove all <none> images
podman rmi `podman images | grep '<none>' | awk '{print $3}'`
- remove all stopped containers
podman container prune
- remove all docker images not used
podman image prune
sudo podman volume prune
- find ip address of a container
podman inspect --format='{{.NetworkSettings.IPAddress}}' minio-server
- exec into container
podman exec -it <$container_id> /bin/bash
- run with environment
podman run -d --replace
-p 18123:8123 -p 19000:9000 \
--name clickhouse-server \
-e ALLOW_EMPTY_PASSWORD=yes \
--ulimit nofile=262144:262144 \
quay.m.daocloud.io/kryptonite/clickhouse-docker-rootless:20.9.3.45
--ulimit nofile=262144:262144
: 262144 is the maximum number of open file descriptors allowed for processes in the container.
ulimit
is a Linux shell command (admin access is required to raise hard limits) used to see, set, or limit the resource usage of the current user, such as the number of open file descriptors for each process or the maximum number of user processes.
- login registry
podman login --tls-verify=false --username=ascm-org-1710208820455 cr.registry.res.cloud.zhejianglab.com -p 'xxxx'
- tag image
podman tag 76fdac66291c cr.registry.res.cloud.zhejianglab.com/ay-dev/datahub-s3-fits:1.0.0
- push image
podman push cr.registry.res.cloud.zhejianglab.com/ay-dev/datahub-s3-fits:1.0.0
- remove specific image
docker rmi <$image_id>
- remove all <none> images
docker rmi `docker images | grep '<none>' | awk '{print $3}'`
- remove all stopped containers
docker container prune
- remove all docker images not used
docker image prune
- find ip address of a container
docker inspect --format='{{.NetworkSettings.IPAddress}}' minio-server
- exec into container
docker exec -it <$container_id> /bin/bash
- run with environment
docker run -d -p 18123:8123 -p 19000:9000 --name clickhouse-server -e ALLOW_EMPTY_PASSWORD=yes --ulimit nofile=262144:262144 quay.m.daocloud.io/kryptonite/clickhouse-docker-rootless:20.9.3.45
--ulimit nofile=262144:262144
: 262144 is the maximum number of open file descriptors allowed for processes in the container.
copy file
Copy a local file into container
docker cp ./some_file CONTAINER:/work
or copy files from container to local path
docker cp CONTAINER:/var/logs/ /tmp/app_logs
load a volume
docker run --rm \
--entrypoint bash \
-v $PWD/data:/app:ro \
-it docker.io/minio/mc:latest \
-c "mc --insecure alias set minio https://oss-cn-hangzhou-zjy-d01-a.ops.cloud.zhejianglab.com/ g83B2sji1CbAfjQO 2h8NisFRELiwOn41iXc6sgufED1n1A \
&& mc --insecure ls minio/csst-prod/ \
&& mc --insecure mb --ignore-existing minio/csst-prod/crp-test \
&& mc --insecure cp /app/modify.pdf minio/csst-prod/crp-test/ \
&& mc --insecure ls --recursive minio/csst-prod/"
Template
Subsections of Template
DevContainer Template
Subsections of DevContainer Template
Java 21 + Go 1.24
prepare .devcontainer.json
{
"name": "Go & Java DevContainer",
"build": {
"dockerfile": "Dockerfile"
},
"mounts": [
"source=/root/.kube/config,target=/root/.kube/config,type=bind",
"source=/root/.minikube/profiles/minikube/client.crt,target=/root/.minikube/profiles/minikube/client.crt,type=bind",
"source=/root/.minikube/profiles/minikube/client.key,target=/root/.minikube/profiles/minikube/client.key,type=bind",
"source=/root/.minikube/ca.crt,target=/root/.minikube/ca.crt,type=bind"
],
"customizations": {
"vscode": {
"extensions": [
"golang.go",
"vscjava.vscode-java-pack",
"redhat.java",
"vscjava.vscode-maven",
"Alibaba-Cloud.tongyi-lingma",
"vscjava.vscode-java-debug",
"vscjava.vscode-java-dependency",
"vscjava.vscode-java-test"
]
}
},
"remoteUser": "root",
"postCreateCommand": "go version && java -version && mvn -v"
}
prepare Dockerfile
FROM m.daocloud.io/docker.io/ubuntu:24.04
ENV DEBIAN_FRONTEND=noninteractive
RUN apt-get update && \
apt-get install -y --no-install-recommends \
ca-certificates \
curl \
git \
wget \
gnupg \
vim \
lsb-release \
apt-transport-https \
&& apt-get clean \
&& rm -rf /var/lib/apt/lists/*
# install OpenJDK 21
RUN mkdir -p /etc/apt/keyrings && \
wget -qO - https://packages.adoptium.net/artifactory/api/gpg/key/public | gpg --dearmor -o /etc/apt/keyrings/adoptium.gpg && \
echo "deb [signed-by=/etc/apt/keyrings/adoptium.gpg arch=amd64] https://packages.adoptium.net/artifactory/deb $(awk -F= '/^VERSION_CODENAME/{print$2}' /etc/os-release) main" | tee /etc/apt/sources.list.d/adoptium.list > /dev/null && \
apt-get update && \
apt-get install -y temurin-21-jdk && \
apt-get clean && \
rm -rf /var/lib/apt/lists/*
# set java env
ENV JAVA_HOME=/usr/lib/jvm/temurin-21-jdk-amd64
# install maven
ARG MAVEN_VERSION=3.9.10
RUN wget https://dlcdn.apache.org/maven/maven-3/${MAVEN_VERSION}/binaries/apache-maven-${MAVEN_VERSION}-bin.tar.gz -O /tmp/maven.tar.gz && \
mkdir -p /opt/maven && \
tar -C /opt/maven -xzf /tmp/maven.tar.gz --strip-components=1 && \
rm /tmp/maven.tar.gz
ENV MAVEN_HOME=/opt/maven
ENV PATH="${MAVEN_HOME}/bin:${PATH}"
# install go 1.24.4
ARG GO_VERSION=1.24.4
RUN wget https://dl.google.com/go/go${GO_VERSION}.linux-amd64.tar.gz -O /tmp/go.tar.gz && \
tar -C /usr/local -xzf /tmp/go.tar.gz && \
rm /tmp/go.tar.gz
# set go env
ENV GOROOT=/usr/local/go
ENV GOPATH=/go
ENV PATH="${GOROOT}/bin:${GOPATH}/bin:${PATH}"
# install other binaries
ARG KUBECTL_VERSION=v1.33.0
RUN wget https://files.m.daocloud.io/dl.k8s.io/release/${KUBECTL_VERSION}/bin/linux/amd64/kubectl -O /tmp/kubectl && \
chmod u+x /tmp/kubectl && \
mv -f /tmp/kubectl /usr/local/bin/kubectl
ARG HELM_VERSION=v3.13.3
RUN wget https://files.m.daocloud.io/get.helm.sh/helm-${HELM_VERSION}-linux-amd64.tar.gz -O /tmp/helm-${HELM_VERSION}-linux-amd64.tar.gz && \
mkdir -p /opt/helm && \
tar -C /opt/helm -xzf /tmp/helm-${HELM_VERSION}-linux-amd64.tar.gz && \
rm /tmp/helm-${HELM_VERSION}-linux-amd64.tar.gz
ENV HELM_HOME=/opt/helm/linux-amd64
ENV PATH="${HELM_HOME}:${PATH}"
USER root
WORKDIR /workspace
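With both files in place, you can open the folder in VS Code and choose "Reopen in Container", or, assuming the Dev Containers CLI is installed, build and start the container from a terminal:
# build the image and start the dev container for the current folder
devcontainer up --workspace-folder .
# run a command inside it to verify the toolchain
devcontainer exec --workspace-folder . go version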
DEV
Subsections of DEV
Devpod
Preliminary
1. Get provider config
# just copy ~/.kube/config
for example, the original config
apiVersion: v1
clusters:
- cluster:
certificate-authority: <$file_path>
extensions:
- extension:
provider: minikube.sigs.k8s.io
version: v1.33.0
name: cluster_info
server: https://<$minikube_ip>:8443
name: minikube
contexts:
- context:
cluster: minikube
extensions:
- extension:
provider: minikube.sigs.k8s.io
version: v1.33.0
name: context_info
namespace: default
user: minikube
name: minikube
current-context: minikube
kind: Config
preferences: {}
users:
- name: minikube
user:
client-certificate: <$file_path>
client-key: <$file_path>
you need to rename clusters.cluster.certificate-authority, clusters.cluster.server, users.user.client-certificate, and users.user.client-key.
clusters.cluster.certificate-authority -> clusters.cluster.certificate-authority-data
clusters.cluster.server -> ip set to `localhost`
users.user.client-certificate -> users.user.client-certificate-data
users.user.client-key -> users.user.client-key-data
the data you paste after each key should be base64-encoded
cat <$file_path> | base64
then the modified config file should look like this:
apiVersion: v1
clusters:
- cluster:
certificate-authority-data: xxxxxxxxxxxxxx
extensions:
- extension:
provider: minikube.sigs.k8s.io
version: v1.33.0
name: cluster_info
server: https://127.0.0.1:8443
name: minikube
contexts:
- context:
cluster: minikube
extensions:
- extension:
provider: minikube.sigs.k8s.io
version: v1.33.0
name: context_info
namespace: default
user: minikube
name: minikube
current-context: minikube
kind: Config
preferences: {}
users:
- name: minikube
user:
client-certificate-data: xxxxxxxxxxxx
client-key-data: xxxxxxxxxxxxxxxx
then forward the minikube port on your own PC
#where you host minikube
MACHINE_IP_ADDRESS=10.200.60.102
USER=ayay
MINIKUBE_IP_ADDRESS=$(ssh -o 'UserKnownHostsFile /dev/null' $USER@$MACHINE_IP_ADDRESS '$HOME/bin/minikube ip')
ssh -o 'UserKnownHostsFile /dev/null' $USER@$MACHINE_IP_ADDRESS -L "*:8443:$MINIKUBE_IP_ADDRESS:8443" -N -f
2. Create workspace
- get git repo link
- choose appropriate provider
- choose ide type and version
- and go!
Useful Command
for more information, you can check 🔗link to install kubectl
How to use it in devpod
Everything works fine.
When you are inside the pod and using kubectl, you should change clusters.cluster.server in ~/.kube/config to https://<$minikube_ip>:8443.
- exec into devpod
kubectl -n devpod exec -it <$resource_id> -c devpod -- bin/bash
- add DNS item
10.aaa.bbb.ccc gitee.zhejianglab.com
- shutdown ssh tunnel
# check if port 8443 is already open
netstat -aon | findstr "8443"
# find PID
ps | grep ssh
# kill the process
taskkill /PID <$PID> /T /F
# check if port 8443 is already open
netstat -aon | grep "8443"
# find PID
ps | grep ssh
# kill the process
kill -9 <$PID>
Dev Container
write .devcontainer.json
Deploy
Operator
Subsections of Operator
KubeBuilder
Basic
Kubebuilder is an SDK for building Kubernetes APIs with CRDs. It mainly:
- is built on top of controller-runtime and client-go
- provides an extensible API framework that makes it easy to develop CRDs, Controllers, and Admission Webhooks from scratch to extend Kubernetes
- also provides scaffolding to initialize a CRD project and auto-generate boilerplate code and configuration
Architecture
Main.go
import (
_ "k8s.io/client-go/plugin/pkg/client/auth"
ctrl "sigs.k8s.io/controller-runtime"
)
// nolint:gocyclo
func main() {
...
mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{})
...
if err = (&controller.GuestbookReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "Guestbook")
os.Exit(1)
}
...
if os.Getenv("ENABLE_WEBHOOKS") != "false" {
if err = webhookwebappv1.SetupGuestbookWebhookWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create webhook", "webhook", "Guestbook")
os.Exit(1)
}
}
Manager
The Manager is the core component. It coordinates multiple controllers and handles caching, clients, leader election, and more; see https://github.com/kubernetes-sigs/controller-runtime/blob/v0.20.0/pkg/manager/manager.go
- The Client is responsible for talking to the Kubernetes API Server, operating on resource objects, and reading/writing the cache. It comes in two kinds:
- Reader: reads from the Cache first to avoid hitting the API Server too often; Get results are cached
- Writer: supports write operations (Create, Update, Delete, Patch) that go directly to the API Server
- Informers are a core component provided by client-go; they watch the API Server for change events (Create/Update/Delete) on specific resource types (such as Pods, Deployments, or custom CRDs).
- The Client relies on the Informer mechanism to keep the cache in sync: when a resource changes in the API Server, the Informer periodically updates the local cache so that subsequent reads return the latest data.
- Cache
- The Cache watches the API Server for resource changes through the built-in client's ListWatcher mechanism.
- Events are written into the local cache (e.g. the Indexer), avoiding frequent calls to the API Server.
- The Cache's job is to reduce direct requests to the API Server while still letting controllers quickly read the latest state of resources.
- Event
The Kubernetes API Server pushes resource change events over long-lived HTTP connections, and client-go's Informer listens for these messages.
- Event: an event is the message passed between the API Server and a Controller. It carries the resource type, resource name, and event type (ADDED, MODIFIED, DELETED), and is converted into requests, check link
- API Server → Manager's Informer → Cache → Controller's Watch → Predicate filter → WorkQueue → Controller's Reconcile() method (a Predicate sketch follows below)
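As a sketch of the Predicate step in that flow (using controller-runtime's builder and predicate packages; the GenerationChangedPredicate is just one common choice, and webappv1 is the Guestbook API package from the example below):
import (
    ctrl "sigs.k8s.io/controller-runtime"
    "sigs.k8s.io/controller-runtime/pkg/builder"
    "sigs.k8s.io/controller-runtime/pkg/predicate"
)

// Only enqueue reconcile requests when .spec changes (metadata.generation bumps),
// so status-only updates never reach the WorkQueue.
func (r *GuestbookReconciler) SetupWithManager(mgr ctrl.Manager) error {
    return ctrl.NewControllerManagedBy(mgr).
        For(&webappv1.Guestbook{}, builder.WithPredicates(predicate.GenerationChangedPredicate{})).
        Named("guestbook").
        Complete(r)
}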
Controller
It’s a controller’s job to ensure that, for any given object, the actual state of the world matches the desired state in the object. Each controller focuses on one root Kind, but may interact with other Kinds.
func (r *GuestbookReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
...
}
func (r *GuestbookReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&webappv1.Guestbook{}).
Named("guestbook").
Complete(r)
}
If you wanna build your own controller, please check https://github.com/kubernetes/community/blob/master/contributors/devel/sig-api-machinery/controllers.md
Each Controller registers the resource types it cares about with the Manager at initialization time (for example, declaring interest in Pod resources via Owns(&v1.Pod{})).
Based on this registration, the Manager creates the corresponding Informers and Watches for those resources, check link
When a resource change event occurs, the Informer takes the event from the cache, and a Predicate (filter) decides whether it should trigger the reconciliation logic.
If the event passes the filter, the Controller puts it on the WorkQueue, and eventually the user-implemented Reconcile() function is called to handle it, check link
func (c *Controller[request]) Start(ctx context.Context) error {
c.ctx = ctx
queue := c.NewQueue(c.Name, c.RateLimiter)
c.Queue = &priorityQueueWrapper[request]{TypedRateLimitingInterface: queue}
err := func() error {
// start to sync event sources
if err := c.startEventSources(ctx); err != nil {
return err
}
for i := 0; i < c.MaxConcurrentReconciles; i++ {
go func() {
for c.processNextWorkItem(ctx) {
}
}()
}
}()
c.LogConstructor(nil).Info("All workers finished")
}
func (c *Controller[request]) processNextWorkItem(ctx context.Context) bool {
obj, priority, shutdown := c.Queue.GetWithPriority()
c.reconcileHandler(ctx, obj, priority)
}
Webhook
Webhooks are a mechanism to intercept requests to the Kubernetes API server. They can be used to validate, mutate, or even proxy requests.
func (d *GuestbookCustomDefaulter) Default(ctx context.Context, obj runtime.Object) error {}
func (v *GuestbookCustomValidator) ValidateCreate(ctx context.Context, obj runtime.Object) (admission.Warnings, error) {}
func (v *GuestbookCustomValidator) ValidateUpdate(ctx context.Context, oldObj, newObj runtime.Object) (admission.Warnings, error) {}
func (v *GuestbookCustomValidator) ValidateDelete(ctx context.Context, obj runtime.Object) (admission.Warnings, error) {}
func SetupGuestbookWebhookWithManager(mgr ctrl.Manager) error {
return ctrl.NewWebhookManagedBy(mgr).For(&webappv1.Guestbook{}).
WithValidator(&GuestbookCustomValidator{}).
WithDefaulter(&GuestbookCustomDefaulter{}).
Complete()
}
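The scaffolding for these webhook hooks is generated by kubebuilder; for the Guestbook example above, the command would look like this (flags as described in the kubebuilder book):
kubebuilder create webhook --group webapp --version v1 --kind Guestbook --defaulting --programmatic-validation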
Links
Subsections of KubeBuilder
Quick Start
Prerequisites
- go version v1.23.0+
- docker version 17.03+.
- kubectl version v1.11.3+.
- Access to a Kubernetes v1.11.3+ cluster.
Installation
# download kubebuilder and install locally.
curl -L -o kubebuilder "https://go.kubebuilder.io/dl/latest/$(go env GOOS)/$(go env GOARCH)"
chmod +x kubebuilder && sudo mv kubebuilder /usr/local/bin/
Create A Project
mkdir -p ~/projects/guestbook
cd ~/projects/guestbook
kubebuilder init --domain my.domain --repo my.domain/guestbook
Create An API
kubebuilder create api --group webapp --version v1 --kind Guestbook
Prepare a K8s Cluster
minikube start --kubernetes-version=v1.27.10 --image-mirror-country=cn --image-repository=registry.cn-hangzhou.aliyuncs.com/google_containers --cpus=4 --memory=4g --disk-size=50g --force
Install CRDs
check installed crds in k8s
kubectl get crds
install guestbook crd in k8s
cd ~/projects/guestbook
make install
uninstall CRDs
make uninstall
make undeploy
Deploy to cluster
make docker-build IMG=aaron666/guestbook-operator:test
make docker-build docker-push IMG=<some-registry>/<project-name>:tag
make deploy IMG=<some-registry>/<project-name>:tag
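Once the controller is deployed, you can create an instance of the CRD from the generated sample manifests (assuming the default kubebuilder project layout):
kubectl apply -k config/samples/
kubectl get guestbooks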
Operator-SDK
Links
Proxy
Subsections of Proxy
Daocloud
1. install container tools
systemctl stop firewalld && systemctl disable firewalld
sudo dnf install -y podman
podman run -d -P m.daocloud.io/docker.io/library/nginx
KubeVPN
1.install krew
- download and install krew
- Add the $HOME/.krew/bin directory to your PATH environment variable.
export PATH="${KREW_ROOT:-$HOME/.krew}/bin:$PATH"
- Run kubectl krew to check the installation
kubectl krew list
2. Download from kubevpn source from github
kubectl krew index add kubevpn https://gitclone.com/github.com/kubenetworks/kubevpn.git
kubectl krew install kubevpn/kubevpn
kubectl kubevpn
3. Deploy VPN in some cluster
Use a different kubeconfig to access a different cluster and deploy the VPN in that Kubernetes cluster.
kubectl kubevpn connect
Your terminal should look like this:
➜ ~ kubectl kubevpn connect
Password:
Starting connect
Getting network CIDR from cluster info...
Getting network CIDR from CNI...
Getting network CIDR from services...
Labeling Namespace default
Creating ServiceAccount kubevpn-traffic-manager
Creating Roles kubevpn-traffic-manager
Creating RoleBinding kubevpn-traffic-manager
Creating Service kubevpn-traffic-manager
Creating MutatingWebhookConfiguration kubevpn-traffic-manager
Creating Deployment kubevpn-traffic-manager
Pod kubevpn-traffic-manager-66d969fd45-9zlbp is Pending
Container Reason Message
control-plane ContainerCreating
vpn ContainerCreating
webhook ContainerCreating
Pod kubevpn-traffic-manager-66d969fd45-9zlbp is Running
Container Reason Message
control-plane ContainerRunning
vpn ContainerRunning
webhook ContainerRunning
Forwarding port...
Connected tunnel
Adding route...
Configured DNS service
+----------------------------------------------------------+
| Now you can access resources in the kubernetes cluster ! |
+----------------------------------------------------------+
already connected to the cluster network; use the command kubectl kubevpn status to check the status
➜ ~ kubectl kubevpn status
ID Mode Cluster Kubeconfig Namespace Status Netif
0 full ops-dev /root/.kube/zverse_config data-and-computing Connected utun0
use pod productpage-788df7ff7f-jpkcs with IP 172.29.2.134
➜ ~ kubectl get pods -o wide
NAME AGE IP NODE NOMINATED NODE GATES
authors-dbb57d856-mbgqk 7d23h 172.29.2.132 192.168.0.5 <none>
details-7d8b5f6bcf-hcl4t 61d 172.29.0.77 192.168.104.255 <none>
kubevpn-traffic-manager-66d969fd45-9zlbp 74s 172.29.2.136 192.168.0.5 <none>
productpage-788df7ff7f-jpkcs 61d 172.29.2.134 192.168.0.5 <none>
ratings-77b6cd4499-zvl6c 61d 172.29.0.86 192.168.104.255 <none>
reviews-85c88894d9-vgkxd 24d 172.29.2.249 192.168.0.5 <none>
use ping to test the connection, seems good
➜ ~ ping 172.29.2.134
PING 172.29.2.134 (172.29.2.134): 56 data bytes
64 bytes from 172.29.2.134: icmp_seq=0 ttl=63 time=55.727 ms
64 bytes from 172.29.2.134: icmp_seq=1 ttl=63 time=56.270 ms
64 bytes from 172.29.2.134: icmp_seq=2 ttl=63 time=55.228 ms
64 bytes from 172.29.2.134: icmp_seq=3 ttl=63 time=54.293 ms
^C
--- 172.29.2.134 ping statistics ---
4 packets transmitted, 4 packets received, 0.0% packet loss
round-trip min/avg/max/stddev = 54.293/55.380/56.270/0.728 ms
use service productpage with IP 172.21.10.49
➜ ~ kubectl get services -o wide
NAME TYPE CLUSTER-IP PORT(S) SELECTOR
authors ClusterIP 172.21.5.160 9080/TCP app=authors
details ClusterIP 172.21.6.183 9080/TCP app=details
kubernetes ClusterIP 172.21.0.1 443/TCP <none>
kubevpn-traffic-manager ClusterIP 172.21.2.86 84xxxxxx0/TCP app=kubevpn-traffic-manager
productpage ClusterIP 172.21.10.49 9080/TCP app=productpage
ratings ClusterIP 172.21.3.247 9080/TCP app=ratings
reviews ClusterIP 172.21.8.24 9080/TCP app=reviews
use the curl command to test the service connection
➜ ~ curl 172.21.10.49:9080
<!DOCTYPE html>
<html>
<head>
<title>Simple Bookstore App</title>
<meta charset="utf-8">
<meta http-equiv="X-UA-Compatible" content="IE=edge">
<meta name="viewport" content="width=device-width, initial-scale=1">
seems good too~
Serverless
Subsections of Serverless
Kserve
Subsections of Kserve
Install Kserve
Preliminary
Installation
Preliminary
1. Kubernetes has been installed; if not, check 🔗link
2. Helm binary has been installed; if not, check 🔗link
1.install from script directly
curl -s "https://raw.githubusercontent.com/kserve/kserve/release-0.15/hack/quick_install.sh" | bash
Installing Gateway API CRDs …
…
😀 Successfully installed Istio
😀 Successfully installed Cert Manager
😀 Successfully installed Knative
Preliminary
1. If you have only one node in your cluster, you need at least 6 CPUs, 6 GB of memory, and 30 GB of disk storage.
2. If you have multiple nodes in your cluster, each node needs at least 2 CPUs, 4 GB of memory, and 20 GB of disk storage.
1.install knative serving CRD resources
kubectl apply -f https://github.com/knative/serving/releases/download/knative-v1.18.0/serving-crds.yaml
2.install knative serving components
kubectl apply -f https://github.com/knative/serving/releases/download/knative-v1.18.0/serving-core.yaml
# kubectl apply -f https://raw.githubusercontent.com/AaronYang0628/assets/refs/heads/main/knative/serving/release/download/knative-v1.18.0/serving-core.yaml
3.install network layer Istio
kubectl apply -l knative.dev/crd-install=true -f https://github.com/knative/net-istio/releases/download/knative-v1.18.0/istio.yaml
kubectl apply -f https://github.com/knative/net-istio/releases/download/knative-v1.18.0/istio.yaml
kubectl apply -f https://github.com/knative/net-istio/releases/download/knative-v1.18.0/net-istio.yaml
4.install cert manager
kubectl apply -f https://github.com/cert-manager/cert-manager/releases/download/v1.17.2/cert-manager.yaml
5.install kserve
kubectl apply --server-side -f https://github.com/kserve/kserve/releases/download/v0.15.0/kserve.yaml
kubectl apply --server-side -f https://github.com/kserve/kserve/releases/download/v0.15.0/kserve-cluster-resources.yaml
for more information, you can check 🔗https://artifacthub.io/packages/helm/prometheus-community/prometheus
Preliminary
1. Kubernetes has been installed; if not, check 🔗link
2. ArgoCD has been installed; if not, check 🔗link
3. Helm binary has been installed; if not, check 🔗link
1.install gateway API CRDs
kubectl apply -f https://github.com/kubernetes-sigs/gateway-api/releases/download/v1.3.0/standard-install.yaml
2.install cert manager
following 🔗link to install cert manager
3.install istio system
following 🔗link to install three istio components (istio-base, istiod, istio-ingressgateway)
4.install Knative Operator
kubectl -n argocd apply -f - << EOF
apiVersion: argoproj.io/v1alpha1
kind: Application
metadata:
name: knative-operator
spec:
syncPolicy:
syncOptions:
- CreateNamespace=true
project: default
source:
repoURL: https://knative.github.io/operator
chart: knative-operator
targetRevision: v1.18.1
helm:
releaseName: knative-operator
values: |
knative_operator:
knative_operator:
image: m.daocloud.io/gcr.io/knative-releases/knative.dev/operator/cmd/operator
tag: v1.18.1
resources:
requests:
cpu: 100m
memory: 100Mi
limits:
cpu: 1000m
memory: 1000Mi
operator_webhook:
image: m.daocloud.io/gcr.io/knative-releases/knative.dev/operator/cmd/webhook
tag: v1.18.1
resources:
requests:
cpu: 100m
memory: 100Mi
limits:
cpu: 500m
memory: 500Mi
destination:
server: https://kubernetes.default.svc
namespace: knative-serving
EOF
5.sync by argocd
argocd app sync argocd/knative-operator
6.install kserve serving CRD
kubectl apply -f - <<EOF
apiVersion: operator.knative.dev/v1beta1
kind: KnativeServing
metadata:
name: knative-serving
namespace: knative-serving
spec:
version: 1.18.0 # this is knative serving version
config:
domain:
example.com: ""
EOF
7.install kserve CRD
kubectl -n argocd apply -f - << EOF
apiVersion: argoproj.io/v1alpha1
kind: Application
metadata:
name: kserve-crd
annotations:
argocd.argoproj.io/sync-options: ServerSideApply=true
argocd.argoproj.io/compare-options: IgnoreExtraneous
spec:
syncPolicy:
syncOptions:
- CreateNamespace=true
- ServerSideApply=true
project: default
source:
repoURL: https://aaronyang0628.github.io/helm-chart-mirror/charts
chart: kserve-crd
targetRevision: v0.15.2
helm:
releaseName: kserve-crd
destination:
server: https://kubernetes.default.svc
namespace: kserve
EOF
8.sync by argocd
argocd app sync argocd/kserve-crd
9.install kserve Controller
kubectl -n argocd apply -f - << EOF
apiVersion: argoproj.io/v1alpha1
kind: Application
metadata:
name: kserve
annotations:
argocd.argoproj.io/sync-options: ServerSideApply=true
argocd.argoproj.io/compare-options: IgnoreExtraneous
spec:
syncPolicy:
syncOptions:
- CreateNamespace=true
- ServerSideApply=true
project: default
source:
repoURL: https://aaronyang0628.github.io/helm-chart-mirror/charts
chart: kserve
targetRevision: v0.15.2
helm:
releaseName: kserve
values: |
kserve:
agent:
image: m.daocloud.io/docker.io/kserve/agent
router:
image: m.daocloud.io/docker.io/kserve/router
storage:
image: m.daocloud.io/docker.io/kserve/storage-initializer
s3:
accessKeyIdName: AWS_ACCESS_KEY_ID
secretAccessKeyName: AWS_SECRET_ACCESS_KEY
endpoint: ""
region: ""
verifySSL: ""
useVirtualBucket: ""
useAnonymousCredential: ""
controller:
deploymentMode: "Serverless"
rbacProxyImage: m.daocloud.io/quay.io/brancz/kube-rbac-proxy:v0.18.0
rbacProxy:
resources:
limits:
cpu: 100m
memory: 300Mi
requests:
cpu: 100m
memory: 300Mi
gateway:
domain: example.com
image: m.daocloud.io/docker.io/kserve/kserve-controller
resources:
limits:
cpu: 100m
memory: 300Mi
requests:
cpu: 100m
memory: 300Mi
servingruntime:
tensorflow:
image: tensorflow/serving
tag: 2.6.2
mlserver:
image: m.daocloud.io/docker.io/seldonio/mlserver
tag: 1.5.0
sklearnserver:
image: m.daocloud.io/docker.io/kserve/sklearnserver
xgbserver:
image: m.daocloud.io/docker.io/kserve/xgbserver
huggingfaceserver:
image: m.daocloud.io/docker.io/kserve/huggingfaceserver
devShm:
enabled: false
sizeLimit: ""
hostIPC:
enabled: false
huggingfaceserver_multinode:
shm:
enabled: true
sizeLimit: "3Gi"
tritonserver:
image: nvcr.io/nvidia/tritonserver
pmmlserver:
image: m.daocloud.io/docker.io/kserve/pmmlserver
paddleserver:
image: m.daocloud.io/docker.io/kserve/paddleserver
lgbserver:
image: m.daocloud.io/docker.io/kserve/lgbserver
torchserve:
image: pytorch/torchserve-kfs
tag: 0.9.0
art:
image: m.daocloud.io/docker.io/kserve/art-explainer
localmodel:
enabled: false
controller:
image: m.daocloud.io/docker.io/kserve/kserve-localmodel-controller
jobNamespace: kserve-localmodel-jobs
agent:
hostPath: /mnt/models
image: m.daocloud.io/docker.io/kserve/kserve-localmodelnode-agent
inferenceservice:
resources:
limits:
cpu: "1"
memory: "2Gi"
requests:
cpu: "1"
memory: "2Gi"
destination:
server: https://kubernetes.default.svc
namespace: kserve
EOF
10.sync by argocd
argocd app sync argocd/kserve
11.install kserve eventing CRD
kubectl apply -f https://github.com/knative/eventing/releases/download/knative-v1.18.1/eventing-crds.yaml
12.install kserve eventing
kubectl apply -f https://github.com/knative/eventing/releases/download/knative-v1.18.1/eventing-core.yaml
FAQ
Serving
Subsections of Serving
Inference
Subsections of Inference
First Pytorch ISVC
Mnist Inference
More information about the mnist service can be found 🔗link
- create a namespace
kubectl create namespace kserve-test
- deploy a sample mnist service
kubectl apply -n kserve-test -f - <<EOF
apiVersion: "serving.kserve.io/v1beta1"
kind: "InferenceService"
metadata:
name: "first-torchserve"
namespace: kserve-test
spec:
predictor:
model:
modelFormat:
name: pytorch
storageUri: gs://kfserving-examples/models/torchserve/image_classifier/v1
resources:
limits:
memory: 4Gi
EOF
- Check InferenceService status
kubectl -n kserve-test get inferenceservices first-torchserve
After all pods are ready, you can access the service by using the following command
If the EXTERNAL-IP value is set, your environment has an external load balancer that you can use for the ingress gateway.
export INGRESS_HOST=$(kubectl -n istio-system get service istio-ingressgateway -o jsonpath='{.status.loadBalancer.ingress[0].ip}')
export INGRESS_PORT=$(kubectl -n istio-system get service istio-ingressgateway -o jsonpath='{.spec.ports[?(@.name=="http2")].nodePort}')
If the EXTERNAL-IP value is none (or perpetually pending), your environment does not provide an external load balancer for the ingress gateway. In this case, you can access the gateway using the service’s node port.
export INGRESS_HOST=$(minikube ip)
export INGRESS_PORT=$(kubectl -n istio-system get service istio-ingressgateway -o jsonpath='{.spec.ports[?(@.name=="http2")].nodePort}')
export INGRESS_HOST=$(minikube ip)
kubectl port-forward --namespace istio-system svc/istio-ingressgateway 30080:80
export INGRESS_PORT=30080
- Perform a prediction: first, prepare your inference input request inside a file:
wget -O ./mnist-input.json https://raw.githubusercontent.com/kserve/kserve/refs/heads/master/docs/samples/v1beta1/torchserve/v1/imgconv/input.json
- Invoke the service
SERVICE_HOSTNAME=$(kubectl -n kserve-test get inferenceservice first-torchserve -o jsonpath='{.status.url}' | cut -d "/" -f 3)
# http://first-torchserve.kserve-test.example.com
curl -v -H "Host: ${SERVICE_HOSTNAME}" -H "Content-Type: application/json" "http://${INGRESS_HOST}:${INGRESS_PORT}/v1/models/mnist:predict" -d @./mnist-input.json
First Custom Model
AlexNet Inference
More information about the AlexNet service can be found 🔗link
- Implement Custom Model using KServe API
import argparse
import base64
import io
import time

from fastapi.middleware.cors import CORSMiddleware
from torchvision import models, transforms
from typing import Dict
import torch
from PIL import Image

import kserve
from kserve import Model, ModelServer, logging
from kserve.model_server import app
from kserve.utils.utils import generate_uuid


class AlexNetModel(Model):
    def __init__(self, name: str):
        super().__init__(name, return_response_headers=True)
        self.name = name
        self.load()
        self.ready = False

    def load(self):
        self.model = models.alexnet(pretrained=True)
        self.model.eval()
        # The ready flag is used by model ready endpoint for readiness probes,
        # set to True when model is loaded successfully without exceptions.
        self.ready = True

    async def predict(
        self,
        payload: Dict,
        headers: Dict[str, str] = None,
        response_headers: Dict[str, str] = None,
    ) -> Dict:
        start = time.time()
        # Input follows the Tensorflow V1 HTTP API for binary values
        # https://www.tensorflow.org/tfx/serving/api_rest#encoding_binary_values
        img_data = payload["instances"][0]["image"]["b64"]
        raw_img_data = base64.b64decode(img_data)
        input_image = Image.open(io.BytesIO(raw_img_data))
        preprocess = transforms.Compose([
            transforms.Resize(256),
            transforms.CenterCrop(224),
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406],
                                 std=[0.229, 0.224, 0.225]),
        ])
        input_tensor = preprocess(input_image).unsqueeze(0)
        output = self.model(input_tensor)
        torch.nn.functional.softmax(output, dim=1)
        values, top_5 = torch.topk(output, 5)
        result = values.flatten().tolist()
        end = time.time()
        response_id = generate_uuid()

        # Custom response headers can be added to the inference response
        if response_headers is not None:
            response_headers.update(
                {"prediction-time-latency": f"{round((end - start) * 1000, 9)}"}
            )

        return {"predictions": result}


parser = argparse.ArgumentParser(parents=[kserve.model_server.parser])
args, _ = parser.parse_known_args()

if __name__ == "__main__":
    # Configure kserve and uvicorn logger
    if args.configure_logging:
        logging.configure_logging(args.log_config_file)
    model = AlexNetModel(args.model_name)
    model.load()
    # Custom middlewares can be added to the model
    app.add_middleware(
        CORSMiddleware,
        allow_origins=["*"],
        allow_credentials=True,
        allow_methods=["*"],
        allow_headers=["*"],
    )
    ModelServer().start([model])
- create requirements.txt
kserve
torchvision==0.18.0
pillow>=10.3.0,<11.0.0
- create Dockerfile
FROM m.daocloud.io/docker.io/library/python:3.11-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY model.py .
CMD ["python", "model.py", "--model_name=custom-model"]
- build and push custom docker image
docker build -t ay-custom-model .
docker tag ddfd0186813e docker-registry.lab.zverse.space/ay/ay-custom-model:latest
docker push docker-registry.lab.zverse.space/ay/ay-custom-model:latest
- create a namespace
kubectl create namespace kserve-test
- deploy a sample custom-model service
kubectl apply -n kserve-test -f - <<EOF
apiVersion: serving.kserve.io/v1beta1
kind: InferenceService
metadata:
name: ay-custom-model
spec:
predictor:
containers:
- name: kserve-container
image: docker-registry.lab.zverse.space/ay/ay-custom-model:latest
EOF
- Check InferenceService status
kubectl -n kserve-test get inferenceservices ay-custom-model
After all pods are ready, you can access the service by using the following command
If the EXTERNAL-IP value is set, your environment has an external load balancer that you can use for the ingress gateway.
export INGRESS_HOST=$(kubectl -n istio-system get service istio-ingressgateway -o jsonpath='{.status.loadBalancer.ingress[0].ip}')
export INGRESS_PORT=$(kubectl -n istio-system get service istio-ingressgateway -o jsonpath='{.spec.ports[?(@.name=="http2")].nodePort}')
If the EXTERNAL-IP value is none (or perpetually pending), your environment does not provide an external load balancer for the ingress gateway. In this case, you can access the gateway using the service’s node port.
export INGRESS_HOST=$(minikube ip)
export INGRESS_PORT=$(kubectl -n istio-system get service istio-ingressgateway -o jsonpath='{.spec.ports[?(@.name=="http2")].nodePort}')
export INGRESS_HOST=$(minikube ip)
kubectl port-forward --namespace istio-system svc/istio-ingressgateway 30080:80
export INGRESS_PORT=30080
- Perform a prediction
First, prepare your inference input request inside a file:
wget -O ./alex-net-input.json https://kserve.github.io/website/0.15/modelserving/v1beta1/custom/custom_model/input.json
- Invoke the service
export SERVICE_HOSTNAME=$(kubectl -n kserve-test get inferenceservice ay-custom-model -o jsonpath='{.status.url}' | cut -d "/" -f 3)
# http://ay-custom-model.kserve-test.example.com
curl -v -H "Host: ${SERVICE_HOSTNAME}" -H "Content-Type: application/json" -X POST "http://${INGRESS_HOST}:${INGRESS_PORT}/v1/models/custom-model:predict" -d @./alex-net-input.json
First Model In Minio
Inference Model In Minio
More information about Deploy InferenceService with a saved model on S3 can be found 🔗link
Create Service Account
=== “yaml”
apiVersion: v1
kind: ServiceAccount
metadata:
name: sa
annotations:
eks.amazonaws.com/role-arn: arn:aws:iam::123456789012:role/s3access # replace with your IAM role ARN
serving.kserve.io/s3-endpoint: s3.amazonaws.com # replace with your s3 endpoint e.g minio-service.kubeflow:9000
serving.kserve.io/s3-usehttps: "1" # by default 1, if testing with minio you can set to 0
serving.kserve.io/s3-region: "us-east-2"
serving.kserve.io/s3-useanoncredential: "false" # omitting this is the same as false, if true will ignore provided credential and use anonymous credentials
=== “kubectl”
kubectl apply -f create-s3-sa.yaml
Create S3 Secret and attach to Service Account
Create a secret with your S3 user credential. KServe reads the secret annotations to inject the S3 environment variables into the storage initializer or model agent to download the models from S3 storage.
Create S3 secret
=== “yaml”
apiVersion: v1
kind: Secret
metadata:
name: s3creds
annotations:
serving.kserve.io/s3-endpoint: s3.amazonaws.com # replace with your s3 endpoint e.g minio-service.kubeflow:9000
serving.kserve.io/s3-usehttps: "1" # by default 1, if testing with minio you can set to 0
serving.kserve.io/s3-region: "us-east-2"
serving.kserve.io/s3-useanoncredential: "false" # omitting this is the same as false, if true will ignore provided credential and use anonymous credentials
type: Opaque
stringData: # use `stringData` for raw credential string or `data` for base64 encoded string
AWS_ACCESS_KEY_ID: XXXX
AWS_SECRET_ACCESS_KEY: XXXXXXXX
Attach secret to a service account
=== “yaml”
apiVersion: v1
kind: ServiceAccount
metadata:
name: sa
secrets:
- name: s3creds
=== “kubectl”
kubectl apply -f create-s3-secret.yaml
!!! note
If you are running kserve with istio sidecars enabled, there can be a race condition between the istio proxy being ready and the agent pulling models. This will result in a tcp dial connection refused
error when the agent tries to download from s3.
To resolve it, istio allows the blocking of other containers in a pod until the proxy container is ready.
You can enabled this by setting `proxy.holdApplicationUntilProxyStarts: true` in `istio-sidecar-injector` configmap, `proxy.holdApplicationUntilProxyStarts` flag was introduced in Istio 1.7 as an experimental feature and is turned off by default.
Deploy the model on S3 with InferenceService
Create the InferenceService with the s3 storageUri
and the service account with s3 credential attached.
=== “New Schema”
```yaml
apiVersion: "serving.kserve.io/v1beta1"
kind: "InferenceService"
metadata:
name: "mnist-s3"
spec:
predictor:
serviceAccountName: sa
model:
modelFormat:
name: tensorflow
storageUri: "s3://kserve-examples/mnist"
```
=== “Old Schema”
```yaml
apiVersion: "serving.kserve.io/v1beta1"
kind: "InferenceService"
metadata:
name: "mnist-s3"
spec:
predictor:
serviceAccountName: sa
tensorflow:
storageUri: "s3://kserve-examples/mnist"
```
Apply the mnist-s3.yaml.
=== “kubectl”
kubectl apply -f mnist-s3.yaml
Run a prediction
Now, the ingress can be accessed at ${INGRESS_HOST}:${INGRESS_PORT}
or follow this instruction
to find out the ingress IP and port.
SERVICE_HOSTNAME=$(kubectl get inferenceservice mnist-s3 -o jsonpath='{.status.url}' | cut -d "/" -f 3)
MODEL_NAME=mnist-s3
INPUT_PATH=@./input.json
curl -v -H "Host: ${SERVICE_HOSTNAME}" -H "Content-Type: application/json" http://${INGRESS_HOST}:${INGRESS_PORT}/v1/models/$MODEL_NAME:predict -d $INPUT_PATH
!!! success “Expected Output”
```{ .bash .no-copy }
Note: Unnecessary use of -X or --request, POST is already inferred.
* Trying 35.237.217.209...
* TCP_NODELAY set
* Connected to mnist-s3.default.35.237.217.209.xip.io (35.237.217.209) port 80 (#0)
> POST /v1/models/mnist-s3:predict HTTP/1.1
> Host: mnist-s3.default.35.237.217.209.xip.io
> User-Agent: curl/7.55.1
> Accept: */*
> Content-Length: 2052
> Content-Type: application/x-www-form-urlencoded
> Expect: 100-continue
>
< HTTP/1.1 100 Continue
* We are completely uploaded and fine
< HTTP/1.1 200 OK
< content-length: 251
< content-type: application/json
< date: Sun, 04 Apr 2021 20:06:27 GMT
< x-envoy-upstream-service-time: 5
< server: istio-envoy
<
* Connection #0 to host mnist-s3.default.35.237.217.209.xip.io left intact
{
"predictions": [
{
"predictions": [0.327352405, 2.00153053e-07, 0.0113353515, 0.203903764, 3.62863029e-05, 0.416683704, 0.000281196437, 8.36911859e-05, 0.0403052084, 1.82206513e-05],
"classes": 5
}
]
}
```
Kafka Sink Transformer
AlexNet Inference
More information about the Custom Transformer service can be found 🔗link
- Implement the Custom Transformer ./model.py using the KServe API
import os
import argparse
import json

from typing import Dict, Union
from kafka import KafkaProducer
from cloudevents.http import CloudEvent
from cloudevents.conversion import to_structured

from kserve import (
    Model,
    ModelServer,
    model_server,
    logging,
    InferRequest,
    InferResponse,
)

from kserve.logging import logger
from kserve.utils.utils import generate_uuid

kafka_producer = KafkaProducer(
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    bootstrap_servers=os.environ.get('KAFKA_BOOTSTRAP_SERVERS', 'localhost:9092')
)

class ImageTransformer(Model):
    def __init__(self, name: str):
        super().__init__(name, return_response_headers=True)
        self.ready = True


    def preprocess(
        self, payload: Union[Dict, InferRequest], headers: Dict[str, str] = None
    ) -> Union[Dict, InferRequest]:
        logger.info("Received inputs %s", payload)
        logger.info("Received headers %s", headers)
        self.request_trace_key = os.environ.get('REQUEST_TRACE_KEY', 'algo.trace.requestId')
        if self.request_trace_key not in payload:
            logger.error("Request trace key '%s' not found in payload, you cannot trace the prediction result", self.request_trace_key)
            if "instances" not in payload:
                raise ValueError(
                    f"Request trace key '{self.request_trace_key}' not found in payload and 'instances' key is missing."
                )
        else:
            headers[self.request_trace_key] = payload.get(self.request_trace_key)

        return {"instances": payload["instances"]}

    def postprocess(
        self,
        infer_response: Union[Dict, InferResponse],
        headers: Dict[str, str] = None,
        response_headers: Dict[str, str] = None,
    ) -> Union[Dict, InferResponse]:
        logger.info("postprocess headers: %s", headers)
        logger.info("postprocess response headers: %s", response_headers)
        logger.info("postprocess response: %s", infer_response)

        attributes = {
            "source": "data-and-computing/kafka-sink-transformer",
            "type": "org.zhejianglab.zverse.data-and-computing.kafka-sink-transformer",
            "request-host": headers.get('host', 'unknown'),
            "kserve-isvc-name": headers.get('kserve-isvc-name', 'unknown'),
            "kserve-isvc-namespace": headers.get('kserve-isvc-namespace', 'unknown'),
            self.request_trace_key: headers.get(self.request_trace_key, 'unknown'),
        }

        _, cloudevent = to_structured(CloudEvent(attributes, infer_response))
        try:
            kafka_producer.send(os.environ.get('KAFKA_TOPIC', 'test-topic'), value=cloudevent.decode('utf-8').replace("'", '"'))
            kafka_producer.flush()
        except Exception as e:
            logger.error("Failed to send message to Kafka: %s", e)
        return infer_response

parser = argparse.ArgumentParser(parents=[model_server.parser])
args, _ = parser.parse_known_args()

if __name__ == "__main__":
    if args.configure_logging:
        logging.configure_logging(args.log_config_file)
    logging.logger.info("available model name: %s", args.model_name)
    logging.logger.info("all args: %s", args.model_name)
    model = ImageTransformer(args.model_name)
    ModelServer().start([model])
- modify ./pyproject.toml
[tool.poetry]
name = "custom_transformer"
version = "0.15.2"
description = "Custom Transformer Examples. Not intended for use outside KServe Frameworks Images."
authors = ["Dan Sun <dsun20@bloomberg.net>"]
license = "Apache-2.0"
packages = [
{ include = "*.py" }
]
[tool.poetry.dependencies]
python = ">=3.9,<3.13"
kserve = {path = "../kserve", develop = true}
pillow = "^10.3.0"
kafka-python = "^2.2.15"
cloudevents = "^1.11.1"
[[tool.poetry.source]]
name = "pytorch"
url = "https://download.pytorch.org/whl/cpu"
priority = "explicit"
[tool.poetry.group.test]
optional = true
[tool.poetry.group.test.dependencies]
pytest = "^7.4.4"
mypy = "^0.991"
[tool.poetry.group.dev]
optional = true
[tool.poetry.group.dev.dependencies]
black = { version = "~24.3.0", extras = ["colorama"] }
[tool.poetry-version-plugin]
source = "file"
file_path = "../VERSION"
[build-system]
requires = ["poetry-core>=1.0.0"]
build-backend = "poetry.core.masonry.api"
- prepare ../custom_transformer.Dockerfile
ARG PYTHON_VERSION=3.11
ARG BASE_IMAGE=python:${PYTHON_VERSION}-slim-bookworm
ARG VENV_PATH=/prod_venv
FROM ${BASE_IMAGE} AS builder
# Install Poetry
ARG POETRY_HOME=/opt/poetry
ARG POETRY_VERSION=1.8.3
RUN python3 -m venv ${POETRY_HOME} && ${POETRY_HOME}/bin/pip install poetry==${POETRY_VERSION}
ENV PATH="$PATH:${POETRY_HOME}/bin"
# Activate virtual env
ARG VENV_PATH
ENV VIRTUAL_ENV=${VENV_PATH}
RUN python3 -m venv $VIRTUAL_ENV
ENV PATH="$VIRTUAL_ENV/bin:$PATH"
COPY kserve/pyproject.toml kserve/poetry.lock kserve/
RUN cd kserve && poetry install --no-root --no-interaction --no-cache
COPY kserve kserve
RUN cd kserve && poetry install --no-interaction --no-cache
COPY custom_transformer/pyproject.toml custom_transformer/poetry.lock custom_transformer/
RUN cd custom_transformer && poetry install --no-root --no-interaction --no-cache
COPY custom_transformer custom_transformer
RUN cd custom_transformer && poetry install --no-interaction --no-cache
FROM ${BASE_IMAGE} AS prod
COPY third_party third_party
# Activate virtual env
ARG VENV_PATH
ENV VIRTUAL_ENV=${VENV_PATH}
ENV PATH="$VIRTUAL_ENV/bin:$PATH"
RUN useradd kserve -m -u 1000 -d /home/kserve
COPY --from=builder --chown=kserve:kserve $VIRTUAL_ENV $VIRTUAL_ENV
COPY --from=builder kserve kserve
COPY --from=builder custom_transformer custom_transformer
USER 1000
ENTRYPOINT ["python", "-m", "custom_transformer.model"]
- regenerate poetry.lock
poetry lock --no-update
- build and push custom docker image
cd python
podman build -t docker-registry.lab.zverse.space/data-and-computing/ay-dev/msg-transformer:dev9 -f custom_transformer.Dockerfile .
podman push docker-registry.lab.zverse.space/data-and-computing/ay-dev/msg-transformer:dev9
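To put the transformer in front of a predictor, reference the image in the transformer section of an InferenceService; the sketch below is only an illustration (the model, namespace, and Kafka address are placeholders, not values from this guide):
apiVersion: serving.kserve.io/v1beta1
kind: InferenceService
metadata:
  name: sklearn-iris-msg
  namespace: kserve-test
spec:
  transformer:
    containers:
      - name: kserve-container
        image: docker-registry.lab.zverse.space/data-and-computing/ay-dev/msg-transformer:dev9
        env:
          - name: KAFKA_BOOTSTRAP_SERVERS
            value: kafka.kafka.svc.cluster.local:9092   # placeholder
          - name: KAFKA_TOPIC
            value: test-topic
          - name: REQUEST_TRACE_KEY
            value: algo.trace.requestId
  predictor:
    model:
      modelFormat:
        name: sklearn
      storageUri: "gs://kfserving-examples/models/sklearn/1.0/model"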
Generative
Subsections of Generative
First Generative Service
[Architecture diagram: the InferenceService builds on Knative Serving (autoscaling / canary releases), Istio (traffic management / security), and the storage system (S3/GCS/PVC).]
Deploy an inference service with a single YAML
apiVersion: "serving.kserve.io/v1beta1"
kind: "InferenceService"
metadata:
name: "sklearn-iris"
namespace: kserve-test
spec:
predictor:
model:
modelFormat:
name: sklearn
resources: {}
storageUri: "gs://kfserving-examples/models/sklearn/1.0/model"
check CRD
kubectl -n kserve-test get inferenceservices sklearn-iris
kubectl -n istio-system get svc istio-ingressgateway
export INGRESS_HOST=$(minikube ip)
export INGRESS_PORT=$(kubectl -n istio-system get service istio-ingressgateway -o jsonpath='{.spec.ports[?(@.name=="http2")].nodePort}')
SERVICE_HOSTNAME=$(kubectl -n kserve-test get inferenceservice sklearn-iris -o jsonpath='{.status.url}' | cut -d "/" -f 3)
# http://sklearn-iris.kserve-test.example.com
curl -v -H "Host: ${SERVICE_HOSTNAME}" -H "Content-Type: application/json" "http://${INGRESS_HOST}:${INGRESS_PORT}/v1/models/sklearn-iris:predict" -d @./iris-input.json
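The iris-input.json referenced above follows the V1 inference protocol; a typical payload for the sklearn iris sample looks like this:
{
  "instances": [
    [6.8, 2.8, 4.8, 1.4],
    [6.0, 3.4, 4.5, 1.6]
  ]
}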
How to deploy your own ML model
apiVersion: serving.kserve.io/v1beta1
kind: InferenceService
metadata:
name: huggingface-llama3
namespace: kserve-test
annotations:
serving.kserve.io/deploymentMode: RawDeployment
serving.kserve.io/autoscalerClass: none
spec:
predictor:
model:
modelFormat:
name: huggingface
storageUri: pvc://llama-3-8b-pvc/hf/8b_instruction_tuned
workerSpec:
pipelineParallelSize: 2
tensorParallelSize: 1
containers:
- name: worker-container
resources:
requests:
nvidia.com/gpu: "8"
Canary Policy
KServe supports canary rollouts for inference services. Canary rollouts allow for a new version of an InferenceService to receive a percentage of traffic. Kserve supports a configurable canary rollout strategy with multiple steps. The rollout strategy can also be implemented to rollback to the previous revision if a rollout step fails.
KServe automatically tracks the last good revision that was rolled out with 100% traffic. The canaryTrafficPercent
field in the component’s spec needs to be set with the percentage of traffic that should be routed to the new revision. KServe will then automatically split the traffic between the last good revision and the revision that is currently being rolled out according to the canaryTrafficPercent
value.
When the first revision of an InferenceService
is deployed, it will receive 100% of the traffic. When multiple revisions are deployed, as in step 2, and the canary rollout strategy is configured to route 10% of the traffic to the new revision, 90% of the traffic will go to the LatestRolledoutRevision
. If there is an unhealthy or bad revision applied, traffic will not be routed to that bad revision. In step 3, the rollout strategy promotes the LatestReadyRevision
from step 2 to the LatestRolledoutRevision
. Since it is now promoted, the LatestRolledoutRevision
gets 100% of the traffic and is fully rolled out. If a rollback needs to happen, 100% of the traffic will be pinned to the previous healthy/good revision- the PreviousRolledoutRevision
.
Reference
For more information, see Canary Rollout.
Subsections of Canary Policy
Rollout Example
Create the InferenceService
Follow the First Inference Service
tutorial. Set up a namespace kserve-test
and create an InferenceService.
After rolling out the first model, 100% traffic goes to the initial model with service revision 1.
kubectl -n kserve-test get isvc sklearn-iris
Apply Canary Rollout Strategy
- Add the canaryTrafficPercent field to the predictor component
- Update the storageUri to use a new/updated model.
kubectl apply -n kserve-test -f - <<EOF
apiVersion: "serving.kserve.io/v1beta1"
kind: "InferenceService"
metadata:
name: "sklearn-iris"
namespace: kserve-test
spec:
predictor:
canaryTrafficPercent: 10
model:
args: ["--enable_docs_url=True"]
modelFormat:
name: sklearn
resources: {}
runtime: kserve-sklearnserver
storageUri: "gs://kfserving-examples/models/sklearn/1.0/model-2"
EOF
After rolling out the canary model, traffic is split between the latest ready revision 2 and the previously rolled out revision 1.
kubectl -n kserve-test get isvc sklearn-iris
Check the running pods; you should now see two pods running for the old and new model, with 10% of the traffic routed to the new model. Notice revision 1 contains 0002 in its name, while revision 2 contains 0003.
kubectl get pods
NAME READY STATUS RESTARTS AGE
sklearn-iris-predictor-00002-deployment-c7bb6c685-ktk7r 2/2 Running 0 71m
sklearn-iris-predictor-00003-deployment-8498d947-fpzcg 2/2 Running 0 20m
Run a prediction
Follow the next two steps (Determine the ingress IP and ports and Perform inference) in the First Inference Service tutorial.
Send more requests to the InferenceService
to observe the 10% of traffic that routes to the new revision.
Promote the canary model
If the canary model is healthy/passes your tests,
you can promote it by removing the canaryTrafficPercent
field and re-applying the InferenceService
custom resource with the same name sklearn-iris
kubectl apply -n kserve-test -f - <<EOF
apiVersion: "serving.kserve.io/v1beta1"
kind: "InferenceService"
metadata:
name: "sklearn-iris"
namespace: kserve-test
spec:
predictor:
model:
args: ["--enable_docs_url=True"]
modelFormat:
name: sklearn
resources: {}
runtime: kserve-sklearnserver
storageUri: "gs://kfserving-examples/models/sklearn/1.0/model-2"
EOF
Now all traffic goes to revision 2 for the new model.
kubectl get isvc sklearn-iris
NAME URL READY PREV LATEST PREVROLLEDOUTREVISION LATESTREADYREVISION AGE
sklearn-iris http://sklearn-iris.kserve-test.example.com True 100 sklearn-iris-predictor-00002 17m
The pods for revision generation 1 automatically scales down to 0 as it is no longer getting the traffic.
kubectl get pods -l serving.kserve.io/inferenceservice=sklearn-iris
NAME READY STATUS RESTARTS AGE
sklearn-iris-predictor-00001-deployment-66c5f5b8d5-gmfvj 1/2 Terminating 0 17m
sklearn-iris-predictor-00002-deployment-5bd9ff46f8-shtzd 2/2 Running 0 15m
Rollback and pin the previous model
You can pin the previous model (model v1, for example) by setting the canaryTrafficPercent
to 0 for the current
model (model v2, for example). This rolls back from model v2 to model v1 and decreases model v2’s traffic to zero.
Apply the custom resource to set model v2’s traffic to 0%.
kubectl apply -n kserve-test -f - <<EOF
apiVersion: "serving.kserve.io/v1beta1"
kind: "InferenceService"
metadata:
name: "sklearn-iris"
spec:
predictor:
canaryTrafficPercent: 0
model:
modelFormat:
name: sklearn
storageUri: "gs://kfserving-examples/models/sklearn/1.0/model-2"
EOF
Check the traffic split; now 100% of the traffic goes to the previous good model (model v1) for revision generation 1.
kubectl get isvc sklearn-iris
NAME URL READY PREV LATEST PREVROLLEDOUTREVISION LATESTREADYREVISION AGE
sklearn-iris http://sklearn-iris.kserve-test.example.com True 100 0 sklearn-iris-predictor-00002 sklearn-iris-predictor-00003 18m
The previous revision (model v1) now serves 100% of the traffic, while the new model (model v2) serves 0%.
kubectl get pods -l serving.kserve.io/inferenceservice=sklearn-iris
NAME READY STATUS RESTARTS AGE
sklearn-iris-predictor-00002-deployment-66c5f5b8d5-gmfvj 1/2 Running 0 35s
sklearn-iris-predictor-00003-deployment-5bd9ff46f8-shtzd 2/2 Running 0 16m
Route traffic using a tag
You can enable tag based routing by adding the annotation serving.kserve.io/enable-tag-routing
, so traffic can be
explicitly routed to the canary model (model v2) or the old model (model v1) via a tag in the request URL.
Apply model v2 with canaryTrafficPercent: 10
and serving.kserve.io/enable-tag-routing: "true"
.
kubectl apply -n kserve-test -f - <<EOF
apiVersion: "serving.kserve.io/v1beta1"
kind: "InferenceService"
metadata:
name: "sklearn-iris"
annotations:
serving.kserve.io/enable-tag-routing: "true"
spec:
predictor:
canaryTrafficPercent: 10
model:
modelFormat:
name: sklearn
storageUri: "gs://kfserving-examples/models/sklearn/1.0/model-2"
EOF
Check the InferenceService status to get the canary and previous model URL.
kubectl get isvc sklearn-iris -ojsonpath="{.status.components.predictor}" | jq
The output lists the predictor’s traffic entries, including the tagged URLs for the latest and previous revisions.
Since we updated the annotation on the InferenceService, model v2 now corresponds to sklearn-iris-predictor-00003.
You can now send the request explicitly to the new model or the previous model by using the tag in the request URL. Use
the curl command
from Perform inference and
add latest-
or prev-
to the model name to send a tag based request.
For example, set the model name and use the following commands to send traffic to each service based on the latest
or prev
tag.
curl the latest revision
MODEL_NAME=sklearn-iris
curl -v -H "Host: latest-${MODEL_NAME}-predictor.kserve-test.example.com" -H "Content-Type: application/json" http://${INGRESS_HOST}:${INGRESS_PORT}/v1/models/$MODEL_NAME:predict -d @./iris-input.json
or curl the previous revision
curl -v -H "Host: prev-${MODEL_NAME}-predictor.kserve-test.example.com" -H "Content-Type: application/json" http://${INGRESS_HOST}:${INGRESS_PORT}/v1/models/$MODEL_NAME:predict -d @./iris-input.json
Auto Scaling
Soft Limit
You can configure an InferenceService with the annotation autoscaling.knative.dev/target to set a soft limit. The soft limit is a targeted value rather than a strictly enforced bound; if there is a sudden burst of requests, it can be exceeded.
apiVersion: "serving.kserve.io/v1beta1"
kind: "InferenceService"
metadata:
name: "sklearn-iris"
namespace: kserve-test
annotations:
autoscaling.knative.dev/target: "5"
spec:
predictor:
model:
args: ["--enable_docs_url=True"]
modelFormat:
name: sklearn
resources: {}
runtime: kserve-sklearnserver
storageUri: "gs://kfserving-examples/models/sklearn/1.0/model"
Hard Limit
You can also configure an InferenceService with the field containerConcurrency to set a hard limit. The hard limit is an enforced upper bound: if concurrency reaches it, surplus requests are buffered and must wait until enough capacity is free to execute them.
apiVersion: "serving.kserve.io/v1beta1"
kind: "InferenceService"
metadata:
name: "sklearn-iris"
namespace: kserve-test
spec:
predictor:
containerConcurrency: 5
model:
args: ["--enable_docs_url=True"]
modelFormat:
name: sklearn
resources: {}
runtime: kserve-sklearnserver
storageUri: "gs://kfserving-examples/models/sklearn/1.0/model"
Scale with QPS
apiVersion: "serving.kserve.io/v1beta1"
kind: "InferenceService"
metadata:
name: "sklearn-iris"
namespace: kserve-test
spec:
predictor:
scaleTarget: 1
scaleMetric: qps
model:
args: ["--enable_docs_url=True"]
modelFormat:
name: sklearn
resources: {}
runtime: kserve-sklearnserver
storageUri: "gs://kfserving-examples/models/sklearn/1.0/model"
Scale with GPU
apiVersion: "serving.kserve.io/v1beta1"
kind: "InferenceService"
metadata:
name: "flowers-sample-gpu"
namespace: kserve-test
spec:
predictor:
scaleTarget: 1
scaleMetric: concurrency
model:
modelFormat:
name: tensorflow
storageUri: "gs://kfserving-examples/models/tensorflow/flowers"
runtimeVersion: "2.6.2-gpu"
resources:
limits:
nvidia.com/gpu: 1
Enable Scale To Zero
apiVersion: "serving.kserve.io/v1beta1"
kind: "InferenceService"
metadata:
name: "sklearn-iris"
namespace: kserve-test
spec:
predictor:
minReplicas: 0
model:
args: ["--enable_docs_url=True"]
modelFormat:
name: sklearn
resources: {}
runtime: kserve-sklearnserver
storageUri: "gs://kfserving-examples/models/sklearn/1.0/model"
Prepare Concurrent Requests Container
# export INGRESS_PORT=$(kubectl -n istio-system get service istio-ingressgateway -o jsonpath='{.spec.ports[?(@.name=="http2")].nodePort}')
podman run --rm \
-v /root/kserve/iris-input.json:/tmp/iris-input.json \
--privileged \
-e INGRESS_HOST=$(minikube ip) \
-e INGRESS_PORT=32132 \
-e MODEL_NAME=sklearn-iris \
-e INPUT_PATH=/tmp/iris-input.json \
-e SERVICE_HOSTNAME=sklearn-iris.kserve-test.example.com \
-it m.daocloud.io/docker.io/library/golang:1.22 bash -c "go install github.com/rakyll/hey@latest; bash"
Fire
Send traffic for 30 seconds while maintaining 100 concurrent in-flight requests.
hey -z 30s -c 100 -m POST -host ${SERVICE_HOSTNAME} -D $INPUT_PATH http://${INGRESS_HOST}:${INGRESS_PORT}/v1/models/$MODEL_NAME:predict
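While hey is running, you can watch the autoscaler add pods from a second terminal (reusing the label selector shown earlier on this page):
kubectl -n kserve-test get pods -l serving.kserve.io/inferenceservice=sklearn-iris -w
When the load stops and the stable window expires, the extra pods scale back down (all the way to zero if minReplicas is 0).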
Reference
For more information, please refer to the KPA documentation.
Knative
Subsections of Knative
Eventing
Subsections of Eventing
Broker
The Knative Broker is the core component of the Knative Eventing system. Its main role is to act as the hub for event routing and distribution, providing decoupled, reliable event delivery between event producers (event sources) and event consumers (services).
The key roles of the Knative Broker are detailed below.
Event ingestion point:
The Broker is the entry point where event streams converge. Event sources of all kinds (Kafka topics, HTTP sources, Cloud Pub/Sub, GitHub webhooks, timers, custom sources, and so on) send their events to the Broker.
Event producers only need to know the Broker’s address; they do not need to care which consumers exist or where they run.
Event storage and buffering:
The Broker is usually backed by a persistent messaging system (such as Apache Kafka, Google Cloud Pub/Sub, RabbitMQ, NATS Streaming, or the in-memory InMemoryChannel). This provides:
Persistence: events are not lost before consumers process them (depending on the underlying channel implementation).
Buffering: when consumers are temporarily unavailable or cannot keep up with the event production rate, the Broker buffers events, avoiding event loss and protecting producers and consumers from being overwhelmed.
Retries: if a consumer fails to process an event, the Broker can redeliver it (usually in combination with the retry policies of the Trigger and Subscription).
Decoupling event sources from event consumers:
This is one of the Broker’s most important roles. Event sources are only responsible for sending events to the Broker and are completely unaware of which services will consume them.
Event consumers declare which events they are interested in by creating Triggers against the Broker. Consumers only need to know that the Broker exists; they do not need to know which specific source produced an event.
This decoupling greatly improves the flexibility and maintainability of the system:
Independent evolution: event sources and consumers can be added, removed, or modified independently, as long as they honor the Broker’s contract.
Dynamic routing: events are routed dynamically to different consumers based on event attributes (such as type and source) without changing producer or consumer code.
Multicast: the same event can be consumed by multiple different consumers at once (one event -> Broker -> multiple matching Triggers -> multiple services).
Event filtering and routing (via Trigger):
The Broker itself does not implement complex filtering logic; filtering and routing are handled by Trigger resources.
A Trigger resource is bound to a specific Broker.
A Trigger defines:
Subscriber: the address of the target service (a Knative Service, Kubernetes Service, Channel, and so on).
Filter: conditions on event attributes (mainly type and source, plus other extensible attributes). Only events matching the filter are routed by the Broker through that Trigger to its subscriber.
When the Broker receives an event, it evaluates the filters of every Trigger bound to it, and for each matching Trigger it delivers the event to that Trigger’s subscriber.
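As a minimal sketch of such a filter (the names below are illustrative; the concrete Triggers used in this guide appear in the following subsections), a Trigger that only forwards events of a given type and source looks like:
kubectl apply -f - <<EOF
apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
  name: example-filtered-trigger
  namespace: kserve-test
spec:
  broker: first-broker
  filter:
    attributes:
      type: test.type      # only CloudEvents with this ce-type are delivered
      source: curl-test    # optionally also match on ce-source
  subscriber:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: event-display
EOF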
Standard event interface:
The Broker follows the CloudEvents specification; all events it receives and delivers are in CloudEvents format. This gives events from different sources and consumers a single, unified format and simplifies integration.
Multi-tenancy and namespace isolation:
Brokers are typically deployed into specific Kubernetes namespaces, and a single namespace can host multiple Brokers.
This makes it possible to isolate event flows for different teams, applications, or environments (such as dev and staging) within the same cluster; each team or application manages the Brokers and Triggers in its own namespace.
An analogy:
Think of the Knative Broker as a highly intelligent postal sorting center:
Receiving letters (events): letters (events) from all over the world (different event sources) are mailed to the sorting center (the Broker).
Storing letters: the sorting center has a warehouse (persistence/buffering) that temporarily holds letters so that none are lost.
Sorting rules (Triggers): the sorting center employs many sorters (Triggers); each sorter is responsible for letters of a particular type or from a particular region (filtering on event attributes).
Delivering letters: each sorter (Trigger) picks up the letters (events) that match its rules and delivers them to the right recipient (the subscriber service).
Decoupling: the sender (event source) only needs to know the sorting center’s (Broker’s) address and never needs to know who or where the recipient (consumer) is; the recipient (consumer) only needs to give its address to the sorter that handles its kind of letters (by creating a Trigger) and does not care who sent them. The sorting center (Broker) and the sorters (Triggers) take care of the complex routing in between.
Core value the Broker provides:
Loose coupling: event producers and consumers are fully decoupled.
Flexibility: consumers can be added or removed dynamically, and routing rules can be changed on the fly (by creating, modifying, or deleting Triggers).
Reliability: event persistence and retry mechanisms (depending on the underlying implementation).
Scalability: the Broker and the consumers can each scale independently.
Standardization: based on CloudEvents.
Simpler development: developers focus on business logic (producing or consuming events) instead of building their own event-bus infrastructure.
Subsections of Broker
Install Kafka Broker
About
- Source: curl, KafkaSource
- Broker
- Trigger
- Sink: ksvc, isvc
Install a Channel (messaging) layer
kubectl apply -f https://github.com/knative-extensions/eventing-kafka-broker/releases/download/knative-v1.18.0/eventing-kafka-controller.yaml
kubectl apply -f https://github.com/knative-extensions/eventing-kafka-broker/releases/download/knative-v1.18.0/eventing-kafka-channel.yaml
Install a Broker layer
kubectl apply -f https://github.com/knative-extensions/eventing-kafka-broker/releases/download/knative-v1.18.0/eventing-kafka-broker.yaml
For more information, see 🔗https://knative.dev/docs/eventing/brokers/broker-types/kafka-broker/
[Optional] Install Eventing extensions
- kafka sink
kubectl apply -f https://github.com/knative-extensions/eventing-kafka-broker/releases/download/knative-v1.18.0/eventing-kafka-sink.yaml
For more information, see 🔗https://knative.dev/docs/eventing/sinks/kafka-sink/
- kafka source
kubectl apply -f https://github.com/knative-extensions/eventing-kafka-broker/releases/download/knative-v1.18.0/eventing-kafka-source.yaml
For more information, see 🔗https://knative.dev/docs/eventing/sources/kafka-source/
Display Broker Message
Flow
flowchart LR
    A[Curl] -->|HTTP| B{Broker}
    B -->|Subscribe| D[Trigger1]
    B -->|Subscribe| E[Trigger2]
    B -->|Subscribe| F[Trigger3]
    E --> G[Display Service]
Steps
1. Create Broker Setting
kubectl apply -f - <<EOF
apiVersion: v1
kind: ConfigMap
metadata:
name: kafka-broker-config
namespace: knative-eventing
data:
default.topic.partitions: "10"
default.topic.replication.factor: "1"
bootstrap.servers: "kafka.database.svc.cluster.local:9092" #kafka service address
default.topic.config.retention.ms: "3600"
EOF
2. Create Broker
kubectl apply -f - <<EOF
apiVersion: eventing.knative.dev/v1
kind: Broker
metadata:
annotations:
eventing.knative.dev/broker.class: Kafka
name: first-broker
namespace: kserve-test
spec:
config:
apiVersion: v1
kind: ConfigMap
name: kafka-broker-config
namespace: knative-eventing
EOF
3. Create Trigger
kubectl apply -f - <<EOF
apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
name: display-service-trigger
namespace: kserve-test
spec:
broker: first-broker
subscriber:
ref:
apiVersion: serving.knative.dev/v1
kind: Service
name: event-display
EOF
4. Create Sink Service (Display Message)
kubectl apply -f - <<EOF
apiVersion: serving.knative.dev/v1
kind: Service
metadata:
name: event-display
namespace: kserve-test
spec:
template:
spec:
containers:
- image: gcr.io/knative-releases/knative.dev/eventing/cmd/event_display
EOF
5. Test
kubectl run curl-test --image=curlimages/curl -it --rm --restart=Never -- \
-v "http://kafka-broker-ingress.knative-eventing.svc.cluster.local/kserve-test/first-broker" \
-X POST \
-H "Ce-Id: $(date +%s)" \
-H "Ce-Specversion: 1.0" \
-H "Ce-Type: test.type" \
-H "Ce-Source: curl-test" \
-H "Content-Type: application/json" \
-d '{"test": "Broker is working"}'
6. Check message
kubectl -n kserve-test logs -f deploy/event-display-00001-deployment
Kafka Broker Invoke ISVC
1. Prepare RBAC
- create cluster role to access CRD isvc
kubectl apply -f - <<EOF
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
name: kserve-access-for-knative
rules:
- apiGroups: ["serving.kserve.io"]
resources: ["inferenceservices", "inferenceservices/status"]
verbs: ["get", "list", "watch"]
EOF
- create rolebinding and grant privileges
kubectl apply -f - <<EOF
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
name: kafka-controller-kserve-access
roleRef:
apiGroup: rbac.authorization.k8s.io
kind: ClusterRole
name: kserve-access-for-knative
subjects:
- kind: ServiceAccount
name: kafka-controller
namespace: knative-eventing
EOF
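Optionally, you can verify that the service account can read InferenceServices with an impersonated access check:
kubectl auth can-i get inferenceservices.serving.kserve.io \
  --as=system:serviceaccount:knative-eventing:kafka-controller -n kserve-test
# expected output: yes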
2. Create Broker Setting
kubectl apply -f - <<EOF
apiVersion: v1
kind: ConfigMap
metadata:
name: kafka-broker-config
namespace: knative-eventing
data:
default.topic.partitions: "10"
default.topic.replication.factor: "1"
bootstrap.servers: "kafka.database.svc.cluster.local:9092" #kafka service address
default.topic.config.retention.ms: "3600"
EOF
3. Create Broker
kubectl apply -f - <<EOF
apiVersion: eventing.knative.dev/v1
kind: Broker
metadata:
annotations:
eventing.knative.dev/broker.class: Kafka
name: isvc-broker
namespace: kserve-test
spec:
config:
apiVersion: v1
kind: ConfigMap
name: kafka-broker-config
namespace: knative-eventing
delivery:
deadLetterSink:
ref:
apiVersion: serving.knative.dev/v1
kind: Service
name: event-display
EOF
4. Create InferenceService
You can create the isvc first-torchserve service by following this 🔗link
5. Create Trigger
kubectl apply -f - << EOF
apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
name: kserve-trigger
namespace: kserve-test
spec:
broker: isvc-broker
filter:
attributes:
type: prediction-request
subscriber:
uri: http://first-torchserve.kserve-test.svc.cluster.local/v1/models/mnist:predict
EOF
6. Test
Normally, we can invoke first-torchserve by executing
export MASTER_IP=192.168.100.112
export ISTIO_INGRESS_PORT=$(kubectl -n istio-system get service istio-ingressgateway -o jsonpath='{.spec.ports[?(@.name=="http2")].nodePort}')
export SERVICE_HOSTNAME=$(kubectl -n kserve-test get inferenceservice first-torchserve -o jsonpath='{.status.url}' | cut -d "/" -f 3)
# http://first-torchserve.kserve-test.example.com
curl -v -H "Host: ${SERVICE_HOSTNAME}" -H "Content-Type: application/json" "http://${MASTER_IP}:${ISTIO_INGRESS_PORT}/v1/models/mnist:predict" -d @./mnist-input.json
Now, you can access the model through the Kafka broker by executing
export KAFKA_BROKER_INGRESS_PORT=$(kubectl -n knative-eventing get service kafka-broker-ingress -o jsonpath='{.spec.ports[?(@.name=="http-container")].nodePort}')
curl -v "http://${MASTER_IP}:${KAFKA_BROKER_INGRESS_PORT}/kserve-test/isvc-broker" \
-X POST \
-H "Ce-Id: $(date +%s)" \
-H "Ce-Specversion: 1.0" \
-H "Ce-Type: prediction-request" \
-H "Ce-Source: event-producer" \
-H "Content-Type: application/json" \
-d @./mnist-input.json
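If a prediction request cannot be delivered to the InferenceService, the Broker hands the failed event to its deadLetterSink (the event-display service referenced in step 3). Assuming event-display is deployed as in the previous section, its logs are a quick way to debug delivery problems:
kubectl -n kserve-test logs -f deploy/event-display-00001-deployment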
Plugin
Subsections of Plugin
Eventing Kafka Broker
Subsections of Eventing Kafka Broker
Prepare Dev Environment
- update go -> 1.24
- install ko -> 0.18.0
go install github.com/google/ko@latest
# wget https://github.com/ko-build/ko/releases/download/v0.18.0/ko_0.18.0_Linux_x86_64.tar.gz
# tar -xzf ko_0.18.0_Linux_x86_64.tar.gz -C /usr/local/bin/ko
# cp /usr/local/bin/ko/ko /root/bin
- protoc
PB_REL="https://github.com/protocolbuffers/protobuf/releases"
curl -LO $PB_REL/download/v30.2/protoc-30.2-linux-x86_64.zip
# mkdir -p ${HOME}/bin/
mkdir -p /usr/local/bin/protoc
unzip protoc-30.2-linux-x86_64.zip -d /usr/local/bin/protoc
cp /usr/local/bin/protoc/bin/protoc /root/bin
# export PATH="$PATH:/root/bin"
rm -rf protoc-30.2-linux-x86_64.zip
- protoc-gen-go -> 1.5.4
go install google.golang.org/protobuf/cmd/protoc-gen-go@latest
export GOPATH=/usr/local/go/bin
- clone the source repositories
mkdir -p ${GOPATH}/src/knative.dev
cd ${GOPATH}/src/knative.dev
git clone git@github.com:knative/eventing.git # clone eventing repo
git clone git@github.com:AaronYang0628/eventing-kafka-broker.git
cd eventing-kafka-broker
git remote add upstream https://github.com/knative-extensions/eventing-kafka-broker.git
git remote set-url --push upstream no_push
export KO_DOCKER_REPO=docker-registry.lab.zverse.space/data-and-computing/ay-dev
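Before building, a quick sanity check of the toolchain can save time (the expected versions match the assumptions above):
go version              # expect go1.24.x
ko version              # expect v0.18.x
protoc --version        # expect libprotoc 30.2
protoc-gen-go --version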
Build Async Prediction Flow
Flow
flowchart LR
    A[User Curl] -->|HTTP| B{ISVC-Broker:Kafka}
    B -->|Subscribe| D[Trigger1]
    B -->|Subscribe| E[Kserve-Trigger]
    B -->|Subscribe| F[Trigger3]
    E --> G[Mnist Service]
    G -->|Kafka-Sink| B
Steps
1. Create Broker Setting
kubectl apply -f - <<EOF
apiVersion: v1
kind: ConfigMap
metadata:
name: kafka-broker-config
namespace: knative-eventing
data:
default.topic.partitions: "10"
default.topic.replication.factor: "1"
bootstrap.servers: "kafka.database.svc.cluster.local:9092" #kafka service address
default.topic.config.retention.ms: "3600"
EOF
2. Create Broker
kubectl apply -f - <<EOF
apiVersion: eventing.knative.dev/v1
kind: Broker
metadata:
annotations:
eventing.knative.dev/broker.class: Kafka
name: isvc-broker
namespace: kserve-test
spec:
config:
apiVersion: v1
kind: ConfigMap
name: kafka-broker-config
namespace: knative-eventing
EOF
3. Create Trigger
kubectl apply -f - << EOF
apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
name: kserve-trigger
namespace: kserve-test
spec:
broker: isvc-broker
filter:
attributes:
type: prediction-request-udf-attr # you can change this
subscriber:
uri: http://prediction-and-sink.kserve-test.svc.cluster.local/v1/models/mnist:predict
EOF
4. Create InferenceService
kubectl apply -f - <<EOF
apiVersion: serving.kserve.io/v1beta1
kind: InferenceService
metadata:
  name: prediction-and-sink
  namespace: kserve-test
spec:
  predictor:
    model:
      modelFormat:
        name: pytorch
      storageUri: gs://kfserving-examples/models/torchserve/image_classifier/v1
  transformer:
    containers:
      - image: docker-registry.lab.zverse.space/data-and-computing/ay-dev/msg-transformer:dev9
        name: kserve-container
        env:
          - name: KAFKA_BOOTSTRAP_SERVERS
            value: kafka.database.svc.cluster.local
          - name: KAFKA_TOPIC
            value: test-topic # result will be saved in this topic
          - name: REQUEST_TRACE_KEY
            value: test-trace-id # use this key to retrieve the prediction result
        command:
          - "python"
          - "-m"
          - "model"
        args:
          - --model_name
          - mnist
EOF
[Optional] 5. Invoke InferenceService
- preparation
wget -O ./mnist-input.json https://raw.githubusercontent.com/kserve/kserve/refs/heads/master/docs/samples/v1beta1/torchserve/v1/imgconv/input.json
SERVICE_NAME=prediction-and-sink
MODEL_NAME=mnist
INPUT_PATH=@./mnist-input.json
PLAIN_SERVICE_HOSTNAME=$(kubectl -n kserve-test get inferenceservice $SERVICE_NAME -o jsonpath='{.status.url}' | cut -d "/" -f 3)
- fire!!
export INGRESS_HOST=192.168.100.112
export INGRESS_PORT=$(kubectl -n istio-system get service istio-ingressgateway -o jsonpath='{.spec.ports[?(@.name=="http2")].nodePort}')
curl -v -H "Host: ${PLAIN_SERVICE_HOSTNAME}" -H "Content-Type: application/json" -d $INPUT_PATH http://${INGRESS_HOST}:${INGRESS_PORT}/v1/models/$MODEL_NAME:predict
6. Invoke Broker
- preparation
cat > image-with-trace-id.json << EOF
{
"test-trace-id": "16ec3446-48d6-422e-9926-8224853e84a7",
"instances": [
{
"data": "iVBORw0KGgoAAAANSUhEUgAAABwAAAAcCAAAAABXZoBIAAAAw0lEQVR4nGNgGFggVVj4/y8Q2GOR83n+58/fP0DwcSqmpNN7oOTJw6f+/H2pjUU2JCSEk0EWqN0cl828e/FIxvz9/9cCh1zS5z9/G9mwyzl/+PNnKQ45nyNAr9ThMHQ/UG4tDofuB4bQIhz6fIBenMWJQ+7Vn7+zeLCbKXv6z59NOPQVgsIcW4QA9YFi6wNQLrKwsBebW/68DJ388Nun5XFocrqvIFH59+XhBAxThTfeB0r+vP/QHbuDCgr2JmOXoSsAAKK7bU3vISS4AAAAAElFTkSuQmCC"
}
]
}
EOF
- fire!!
export MASTER_IP=192.168.100.112
export KAFKA_BROKER_INGRESS_PORT=$(kubectl -n knative-eventing get service kafka-broker-ingress -o jsonpath='{.spec.ports[?(@.name=="http-container")].nodePort}')
curl -v "http://${MASTER_IP}:${KAFKA_BROKER_INGRESS_PORT}/kserve-test/isvc-broker" \
-X POST \
-H "Ce-Id: $(date +%s)" \
-H "Ce-Specversion: 1.0" \
-H "Ce-Type: prediction-request-udf-attr" \
-H "Ce-Source: event-producer" \
-H "Content-Type: application/json" \
-d @./image-with-trace-id.json
- check the input data in the kafka topic knative-broker-kserve-test-isvc-broker
kubectl -n database exec -it deployment/kafka-client-tools -- bash -c \
'kafka-console-consumer.sh --bootstrap-server $BOOTSTRAP_SERVER --consumer.config $CLIENT_CONFIG_FILE --topic knative-broker-kserve-test-isvc-broker --from-beginning'
- check the response result in the kafka topic test-topic
kubectl -n database exec -it deployment/kafka-client-tools -- bash -c \
'kafka-console-consumer.sh --bootstrap-server $BOOTSTRAP_SERVER --consumer.config $CLIENT_CONFIG_FILE --topic test-topic --from-beginning'
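If the transformer embeds the trace id configured via REQUEST_TRACE_KEY in each result message (as the comments in step 4 suggest), you can, for example, filter the consumer output for the id used above:
kubectl -n database exec deployment/kafka-client-tools -- bash -c \
  'kafka-console-consumer.sh --bootstrap-server $BOOTSTRAP_SERVER --consumer.config $CLIENT_CONFIG_FILE --topic test-topic --from-beginning | grep "16ec3446-48d6-422e-9926-8224853e84a7"'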