Kafka Sink Transformer
AlexNet Inference
More information about the Custom Transformer service can be found at this 🔗link.
- Implement a custom transformer (`./model.py`) using the KServe API
1import os
2import argparse
3import json
4
5from typing import Dict, Union
6from kafka import KafkaProducer
7from cloudevents.http import CloudEvent
8from cloudevents.conversion import to_structured
9
10from kserve import (
11 Model,
12 ModelServer,
13 model_server,
14 logging,
15 InferRequest,
16 InferResponse,
17)
18
19from kserve.logging import logger
20from kserve.utils.utils import generate_uuid
21
# Module-level Kafka producer shared by all requests of this process.
# Values are JSON-serialized to UTF-8 bytes; the broker list comes from the
# KAFKA_BOOTSTRAP_SERVERS env var (falls back to a local broker for dev).
# NOTE(review): constructed at import time — the process fails fast if the
# brokers are unreachable; confirm that is the intended startup behavior.
kafka_producer = KafkaProducer(
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    bootstrap_servers=os.environ.get('KAFKA_BOOTSTRAP_SERVERS', 'localhost:9092')
)
26
class ImageTransformer(Model):
    """KServe transformer that forwards prediction results to Kafka.

    ``preprocess`` validates the incoming payload and propagates a request
    trace id into the request headers; ``postprocess`` wraps the predictor's
    response in a CloudEvent and publishes it (best-effort) to a Kafka topic.
    """

    def __init__(self, name: str):
        super().__init__(name, return_response_headers=True)
        # Payload/header key used to correlate a request with its prediction
        # result downstream. Read once at construction instead of on every
        # request; previously this was assigned inside preprocess(), so a
        # postprocess() call without a prior preprocess() raised AttributeError.
        self.request_trace_key = os.environ.get(
            'REQUEST_TRACE_KEY', 'algo.trace.requestId'
        )
        self.ready = True

    def preprocess(
        self, payload: Union[Dict, InferRequest], headers: Dict[str, str] = None
    ) -> Union[Dict, InferRequest]:
        """Validate the request payload and propagate the trace id.

        :param payload: incoming request body; expected to carry an
            ``instances`` key (and optionally the trace key).
        :param headers: mutable request headers supplied by KServe; may be None.
        :return: the ``{"instances": ...}`` dict passed on to the predictor.
        :raises ValueError: if the payload has no ``instances`` key.
        """
        logger.info("Received inputs %s", payload)
        logger.info("Received headers %s", headers)
        # 'instances' is required by the predictor regardless of tracing, so
        # check it unconditionally. (Previously this was only checked when the
        # trace key was absent, so a traced request without 'instances' died
        # with a bare KeyError below instead of a clear ValueError.)
        if "instances" not in payload:
            raise ValueError("'instances' key is missing from the request payload.")
        if self.request_trace_key not in payload:
            logger.error(
                "Request trace key '%s' not found in payload, you cannot trace the prediction result",
                self.request_trace_key,
            )
        elif headers is not None:
            # Propagate the trace id via headers so postprocess() can attach
            # it to the outgoing CloudEvent. Guarded: headers defaults to None
            # and item assignment on None raised TypeError before.
            headers[self.request_trace_key] = payload.get(self.request_trace_key)
        return {"instances": payload["instances"]}

    def postprocess(
        self,
        infer_response: Union[Dict, InferResponse],
        headers: Dict[str, str] = None,
        response_headers: Dict[str, str] = None,
    ) -> Union[Dict, InferResponse]:
        """Publish the prediction result to Kafka and return it unchanged.

        Publishing is best-effort: any Kafka failure is logged and swallowed
        so the HTTP response to the caller is never affected.
        """
        logger.info("postprocess headers: %s", headers)
        logger.info("postprocess response headers: %s", response_headers)
        logger.info("postprocess response: %s", infer_response)

        # KServe may pass headers=None; fall back to an empty mapping so the
        # .get() lookups below cannot raise AttributeError.
        headers = headers or {}
        attributes = {
            "source": "data-and-computing/kafka-sink-transformer",
            "type": "org.zhejianglab.zverse.data-and-computing.kafka-sink-transformer",
            "request-host": headers.get('host', 'unknown'),
            "kserve-isvc-name": headers.get('kserve-isvc-name', 'unknown'),
            "kserve-isvc-namespace": headers.get('kserve-isvc-namespace', 'unknown'),
            self.request_trace_key: headers.get(self.request_trace_key, 'unknown'),
        }

        # Structured-mode CloudEvent: to_structured returns (headers, body-bytes).
        _, cloudevent = to_structured(CloudEvent(attributes, infer_response))
        try:
            # NOTE(review): the quote replacement coerces repr-style payloads
            # into JSON-ish text; it corrupts data containing literal single
            # quotes — confirm the consumer actually requires this.
            kafka_producer.send(
                os.environ.get('KAFKA_TOPIC', 'test-topic'),
                value=cloudevent.decode('utf-8').replace("'", '"'),
            )
            kafka_producer.flush()
        except Exception as e:
            # Best-effort delivery: log and keep serving the HTTP response.
            logger.error("Failed to send message to Kafka: %s", e)
        return infer_response
76
# Reuse KServe's model_server CLI parser so the standard flags (model name,
# port, workers, log config, ...) are all accepted by this script.
parser = argparse.ArgumentParser(parents=[model_server.parser])
args, _ = parser.parse_known_args()

if __name__ == "__main__":
    if args.configure_logging:
        logging.configure_logging(args.log_config_file)
    logging.logger.info("available model name: %s", args.model_name)
    # Bug fix: this line previously logged args.model_name a second time
    # instead of the full argument namespace the message promises.
    logging.logger.info("all args: %s", args)
    model = ImageTransformer(args.model_name)
    ModelServer().start([model])
- modify
./pyproject.toml
[tool.poetry]
name = "custom_transformer"
version = "0.15.2"
description = "Custom Transformer Examples. Not intended for use outside KServe Frameworks Images."
authors = ["Dan Sun <dsun20@bloomberg.net>"]
license = "Apache-2.0"
packages = [
{ include = "*.py" }
]
[tool.poetry.dependencies]
python = ">=3.9,<3.13"
kserve = {path = "../kserve", develop = true}
pillow = "^10.3.0"
kafka-python = "^2.2.15"
cloudevents = "^1.11.1"
[[tool.poetry.source]]
name = "pytorch"
url = "https://download.pytorch.org/whl/cpu"
priority = "explicit"
[tool.poetry.group.test]
optional = true
[tool.poetry.group.test.dependencies]
pytest = "^7.4.4"
mypy = "^0.991"
[tool.poetry.group.dev]
optional = true
[tool.poetry.group.dev.dependencies]
black = { version = "~24.3.0", extras = ["colorama"] }
[tool.poetry-version-plugin]
source = "file"
file_path = "../VERSION"
[build-system]
requires = ["poetry-core>=1.0.0"]
build-backend = "poetry.core.masonry.api"
- prepare
../custom_transformer.Dockerfile
ARG PYTHON_VERSION=3.11
ARG BASE_IMAGE=python:${PYTHON_VERSION}-slim-bookworm
ARG VENV_PATH=/prod_venv
# ---- Builder stage: install Poetry and resolve dependencies into a venv ----
FROM ${BASE_IMAGE} AS builder
# Install Poetry
ARG POETRY_HOME=/opt/poetry
ARG POETRY_VERSION=1.8.3
RUN python3 -m venv ${POETRY_HOME} && ${POETRY_HOME}/bin/pip install poetry==${POETRY_VERSION}
ENV PATH="$PATH:${POETRY_HOME}/bin"
# Activate virtual env
ARG VENV_PATH
ENV VIRTUAL_ENV=${VENV_PATH}
RUN python3 -m venv $VIRTUAL_ENV
ENV PATH="$VIRTUAL_ENV/bin:$PATH"
# Copy lock/manifest files first and install deps without the project source,
# so dependency layers are cached across source-only changes.
COPY kserve/pyproject.toml kserve/poetry.lock kserve/
RUN cd kserve && poetry install --no-root --no-interaction --no-cache
COPY kserve kserve
RUN cd kserve && poetry install --no-interaction --no-cache
COPY custom_transformer/pyproject.toml custom_transformer/poetry.lock custom_transformer/
RUN cd custom_transformer && poetry install --no-root --no-interaction --no-cache
COPY custom_transformer custom_transformer
RUN cd custom_transformer && poetry install --no-interaction --no-cache
# ---- Production stage: copy only the populated venv and sources ----
FROM ${BASE_IMAGE} AS prod
COPY third_party third_party
# Activate virtual env
ARG VENV_PATH
ENV VIRTUAL_ENV=${VENV_PATH}
ENV PATH="$VIRTUAL_ENV/bin:$PATH"
# Run as a non-root user for container hardening.
RUN useradd kserve -m -u 1000 -d /home/kserve
COPY --from=builder --chown=kserve:kserve $VIRTUAL_ENV $VIRTUAL_ENV
COPY --from=builder kserve kserve
COPY --from=builder custom_transformer custom_transformer
USER 1000
ENTRYPOINT ["python", "-m", "custom_transformer.model"]
- regenerate poetry.lock
poetry lock --no-update
- build and push custom docker image
cd python
podman build -t docker-registry.lab.zverse.space/data-and-computing/ay-dev/msg-transformer:dev9 -f custom_transformer.Dockerfile .
podman push docker-registry.lab.zverse.space/data-and-computing/ay-dev/msg-transformer:dev9