Kafka Sink Transformer

AlexNet Inference

More information about the Custom Transformer service can be found at this 🔗link.

  1. Implement the custom transformer in ./model.py using the KServe API; a sample client request is sketched after the listing.
```python
import os
import argparse
import json

from typing import Dict, Union
from kafka import KafkaProducer
from cloudevents.http import CloudEvent
from cloudevents.conversion import to_structured

from kserve import (
    Model,
    ModelServer,
    model_server,
    logging,
    InferRequest,
    InferResponse,
)

from kserve.logging import logger
from kserve.utils.utils import generate_uuid

kafka_producer = KafkaProducer(
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    bootstrap_servers=os.environ.get('KAFKA_BOOTSTRAP_SERVERS', 'localhost:9092')
)

class ImageTransformer(Model):
    def __init__(self, name: str):
        super().__init__(name, return_response_headers=True)
        self.ready = True

    def preprocess(
        self, payload: Union[Dict, InferRequest], headers: Dict[str, str] = None
    ) -> Union[Dict, InferRequest]:
        logger.info("Received inputs %s", payload)
        logger.info("Received headers %s", headers)
        self.request_trace_key = os.environ.get('REQUEST_TRACE_KEY', 'algo.trace.requestId')
        if self.request_trace_key not in payload:
            logger.error(
                "Request trace key '%s' not found in payload; the prediction result cannot be traced",
                self.request_trace_key,
            )
            if "instances" not in payload:
                raise ValueError(
                    f"Request trace key '{self.request_trace_key}' not found in payload and 'instances' key is missing."
                )
        else:
            headers[self.request_trace_key] = payload.get(self.request_trace_key)

        return {"instances": payload["instances"]}

    def postprocess(
        self,
        infer_response: Union[Dict, InferResponse],
        headers: Dict[str, str] = None,
        response_headers: Dict[str, str] = None,
    ) -> Union[Dict, InferResponse]:
        logger.info("postprocess headers: %s", headers)
        logger.info("postprocess response headers: %s", response_headers)
        logger.info("postprocess response: %s", infer_response)

        attributes = {
            "source": "data-and-computing/kafka-sink-transformer",
            "type": "org.zhejianglab.zverse.data-and-computing.kafka-sink-transformer",
            "request-host": headers.get('host', 'unknown'),
            "kserve-isvc-name": headers.get('kserve-isvc-name', 'unknown'),
            "kserve-isvc-namespace": headers.get('kserve-isvc-namespace', 'unknown'),
            self.request_trace_key: headers.get(self.request_trace_key, 'unknown'),
        }

        _, cloudevent = to_structured(CloudEvent(attributes, infer_response))
        try:
            kafka_producer.send(
                os.environ.get('KAFKA_TOPIC', 'test-topic'),
                value=cloudevent.decode('utf-8').replace("'", '"'),
            )
            kafka_producer.flush()
        except Exception as e:
            logger.error("Failed to send message to Kafka: %s", e)
        return infer_response

parser = argparse.ArgumentParser(parents=[model_server.parser])
args, _ = parser.parse_known_args()

if __name__ == "__main__":
    if args.configure_logging:
        logging.configure_logging(args.log_config_file)
    logging.logger.info("available model name: %s", args.model_name)
    logging.logger.info("all args: %s", args)
    model = ImageTransformer(args.model_name)
    ModelServer().start([model])
```
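
Once the transformer is deployed in front of the AlexNet predictor, a request that carries the trace key alongside the instances exercises the preprocess logic above. A minimal client sketch; the endpoint, host header, and image payload are placeholders rather than values from this example:

```python
import requests

# Hypothetical ingress URL and model name; substitute your own InferenceService values.
url = "http://<ingress-host>/v1/models/custom-model:predict"
headers = {"Host": "<isvc-name>.<namespace>.example.com"}

payload = {
    # Trace id that preprocess() copies into the request headers and that
    # postprocess() attaches to the CloudEvent (default REQUEST_TRACE_KEY).
    "algo.trace.requestId": "req-12345",
    # Placeholder AlexNet input; use whatever input format your predictor expects.
    "instances": [{"image": {"b64": "<base64-encoded JPEG>"}}],
}

resp = requests.post(url, headers=headers, json=payload)
print(resp.status_code, resp.json())
```
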
  1. Modify ./pyproject.toml so the image ships the kafka-python and cloudevents dependencies used by model.py; a quick local check of the CloudEvent envelope is sketched after the file.
```toml
[tool.poetry]
name = "custom_transformer"
version = "0.15.2"
description = "Custom Transformer Examples. Not intended for use outside KServe Frameworks Images."
authors = ["Dan Sun <dsun20@bloomberg.net>"]
license = "Apache-2.0"
packages = [
    { include = "*.py" }
]

[tool.poetry.dependencies]
python = ">=3.9,<3.13"
kserve = {path = "../kserve", develop = true}
pillow = "^10.3.0"
kafka-python = "^2.2.15"
cloudevents = "^1.11.1"

[[tool.poetry.source]]
name = "pytorch"
url = "https://download.pytorch.org/whl/cpu"
priority = "explicit"

[tool.poetry.group.test]
optional = true

[tool.poetry.group.test.dependencies]
pytest = "^7.4.4"
mypy = "^0.991"

[tool.poetry.group.dev]
optional = true

[tool.poetry.group.dev.dependencies]
black = { version = "~24.3.0", extras = ["colorama"] }

[tool.poetry-version-plugin]
source = "file"
file_path = "../VERSION"

[build-system]
requires = ["poetry-core>=1.0.0"]
build-backend = "poetry.core.masonry.api"
```
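
With these dependencies installed, the CloudEvent envelope that postprocess publishes can be inspected locally without a Kafka broker. A minimal sketch reusing the source and type attributes from model.py with a made-up payload (model.py adds the request-host and trace-id attributes the same way):

```python
import json

from cloudevents.http import CloudEvent
from cloudevents.conversion import to_structured

# Attribute values copied from model.py; the prediction payload is made up.
attributes = {
    "source": "data-and-computing/kafka-sink-transformer",
    "type": "org.zhejianglab.zverse.data-and-computing.kafka-sink-transformer",
}
headers, body = to_structured(CloudEvent(attributes, {"predictions": [281]}))

print(headers)                    # content-type for structured-mode CloudEvents
print(json.loads(body.decode()))  # id and time are filled in by the SDK
```
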
  1. Prepare ../custom_transformer.Dockerfile:
```dockerfile
ARG PYTHON_VERSION=3.11
ARG BASE_IMAGE=python:${PYTHON_VERSION}-slim-bookworm
ARG VENV_PATH=/prod_venv

FROM ${BASE_IMAGE} AS builder

# Install Poetry
ARG POETRY_HOME=/opt/poetry
ARG POETRY_VERSION=1.8.3

RUN python3 -m venv ${POETRY_HOME} && ${POETRY_HOME}/bin/pip install poetry==${POETRY_VERSION}
ENV PATH="$PATH:${POETRY_HOME}/bin"

# Activate virtual env
ARG VENV_PATH
ENV VIRTUAL_ENV=${VENV_PATH}
RUN python3 -m venv $VIRTUAL_ENV
ENV PATH="$VIRTUAL_ENV/bin:$PATH"

COPY kserve/pyproject.toml kserve/poetry.lock kserve/
RUN cd kserve && poetry install --no-root --no-interaction --no-cache
COPY kserve kserve
RUN cd kserve && poetry install --no-interaction --no-cache

COPY custom_transformer/pyproject.toml custom_transformer/poetry.lock custom_transformer/
RUN cd custom_transformer && poetry install --no-root --no-interaction --no-cache
COPY custom_transformer custom_transformer
RUN cd custom_transformer && poetry install --no-interaction --no-cache


FROM ${BASE_IMAGE} AS prod

COPY third_party third_party

# Activate virtual env
ARG VENV_PATH
ENV VIRTUAL_ENV=${VENV_PATH}
ENV PATH="$VIRTUAL_ENV/bin:$PATH"

RUN useradd kserve -m -u 1000 -d /home/kserve

COPY --from=builder --chown=kserve:kserve $VIRTUAL_ENV $VIRTUAL_ENV
COPY --from=builder kserve kserve
COPY --from=builder custom_transformer custom_transformer

USER 1000
ENTRYPOINT ["python", "-m", "custom_transformer.model"]
```
  1. Regenerate poetry.lock so it matches the updated pyproject.toml:
```shell
poetry lock --no-update
```
  1. Build and push the custom Docker image; a consumer sketch for verifying the Kafka sink follows the commands.
```shell
cd python
podman build -t docker-registry.lab.zverse.space/data-and-computing/ay-dev/msg-transformer:dev9 -f custom_transformer.Dockerfile .

podman push docker-registry.lab.zverse.space/data-and-computing/ay-dev/msg-transformer:dev9
```
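
After the image is wired into an InferenceService as the transformer (with KAFKA_BOOTSTRAP_SERVERS, KAFKA_TOPIC, and optionally REQUEST_TRACE_KEY set in its environment), messages arriving on the topic can be checked with a small consumer. A sketch assuming the defaults from model.py (localhost:9092, test-topic):

```python
import json

from kafka import KafkaConsumer

# Broker and topic assume the defaults in model.py; point these at your cluster.
consumer = KafkaConsumer(
    "test-topic",
    bootstrap_servers="localhost:9092",
    auto_offset_reset="earliest",
    value_deserializer=lambda m: json.loads(m.decode("utf-8")),
)

for message in consumer:
    # The producer json-encodes the already-serialized CloudEvent, so the value
    # may arrive as a JSON string; decode once more in that case.
    event = message.value
    if isinstance(event, str):
        event = json.loads(event)
    print(event)
```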