👮‍♂️ Incoming data serialization

Note: The internal validator was removed in Panini v0.8.0. If you need incoming message validation, we recommend using dataclasses.

Reliable communication between microservices is crucial to a system's overall stability. To help with this, Panini v0.8.0 introduced dataclasses as a way to validate incoming messages.

Panini uses dataclasses to check the fields and value types of incoming JSON messages. We've tested this mostly with Pydantic dataclasses and also with Mashumaro.

To illustrate this, here is an example of a Panini application that uses Pydantic models to validate and deserialize incoming messages:

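import asyncio
from typing import Optional

from panini import app as panini_app
from pydantic import BaseModel

papp = panini_app.App(
    service_name="serializers",
    host="127.0.0.1",
    port=4222,
)

log = papp.logger


# Nested model used for the "key6" field
class SubTestData(BaseModel):
    subkey1: str
    subkey2: int


# Schema that every incoming message on the subject must satisfy
class TestMessage(BaseModel):
    key1: str
    key2: int
    key3: float
    key4: list
    key5: dict
    key6: SubTestData
    # Optional fields: may be omitted or explicitly set to None
    key7: Optional[int] = None
    key8: Optional[int] = None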


message = {
    "key1": "value1",
    "key2": 2,
    "key3": 3.0,
    "key4": [1, 2, 3, 4],
    "key5": {"1": 1, "2": 2, "3": 3, "4": 4, "5": 5},
    "key6": {"subkey1": "1", "subkey2": 2, "3": 3, "4": 4, "5": 5},
    "key7": None,
}

@papp.task()
async def publish_dataclass():
    for _ in range(10):
        message_dataclass = TestMessage(**message)
        await papp.publish(
            subject="some.publish.subject",
            message=message_dataclass
        )

@papp.listen("some.publish.subject", data_type=TestMessage)
async def receive_dataclass(msg):
    log.info(f"got message {msg.data}")
    await asyncio.sleep(1)


if __name__ == "__main__":
    papp.start()

For more information, read the Pydantic validation documentation.
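
To see what this kind of validation catches, the following minimal sketch runs a model on its own, outside Panini. It assumes only that Pydantic is installed. The `Order` model and the `check` helper are hypothetical names introduced for illustration, and the sketch does not show how Panini itself reports a failed validation.

```python
from typing import Optional

from pydantic import BaseModel, ValidationError


class Order(BaseModel):  # hypothetical model, not part of Panini
    item: str
    quantity: int
    note: Optional[str] = None


def check(payload: dict) -> bool:
    """Return True if the payload matches the Order schema."""
    try:
        Order(**payload)
    except ValidationError as exc:
        # exc.errors() lists each offending field and the reason
        print(f"rejected: {[err['loc'] for err in exc.errors()]}")
        return False
    return True


check({"item": "book", "quantity": 2})       # valid payload
check({"item": "book", "quantity": "many"})  # wrong type for quantity
check({"item": "book"})                      # quantity is missing
```

The first call passes. The other two are rejected, and the printed locations name the field that failed.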

Experimental: Custom incoming message serialization

Panini also accepts any callable as a data_type for custom processing. Here is an example of how to use it:

from panini.exceptions import MessageSchemaError
from panini import app as panini_app

app = panini_app.App(
    service_name="test_serializer_callable",
    host="127.0.0.1",
    port=4222,
)


def callable_validator(message):
    # Reject anything that is not a dict with a non-negative integer "data" field
    if type(message) is not dict:
        raise MessageSchemaError("message is not a dict")
    if "data" not in message:
        raise MessageSchemaError("'data' not in message")
    if type(message["data"]) is not int:
        raise MessageSchemaError("type(message['data']) is not int")
    if message["data"] < 0:
        raise MessageSchemaError(f"Value of field 'data' is {message['data']}, which is negative")
    # Custom processing step: the returned message carries the incremented value
    message["data"] += 1
    return message


@app.listen("test_validator.foo", data_type=callable_validator)
async def foo(msg):
    return {"success": True}


@app.listen("test_validator.foo-with-error-cb", data_type=callable_validator)
async def foo_with_error_cb(msg):
    return {"success": True}


@app.listen("test_validator.check")
async def check(msg):
    try:
        # The validator takes the message as a single positional argument
        callable_validator(msg.data)
    except MessageSchemaError:
        return {"success": False}

    return {"success": True}


if __name__ == "__main__":
    app.start()

Using a custom data_type to process messages is an experimental feature, so please use it with caution.
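
As a further illustration of the callable approach, here is a minimal sketch of a validator that converts a decoded message into a standard-library dataclass. The `Reading` class and the `to_reading` function are hypothetical names. The sketch raises `ValueError` so that it runs without Panini installed; in a Panini service you would presumably raise `MessageSchemaError`, as in the example above.

```python
from dataclasses import dataclass


@dataclass
class Reading:  # hypothetical message shape
    sensor: str
    value: float


def to_reading(message):
    """Validate a decoded JSON message and convert it to a Reading."""
    if not isinstance(message, dict):
        raise ValueError("message is not a dict")
    missing = {"sensor", "value"} - message.keys()
    if missing:
        raise ValueError(f"missing fields: {sorted(missing)}")
    if not isinstance(message["sensor"], str):
        raise ValueError("'sensor' is not a str")
    value = message["value"]
    # bool is a subclass of int, so exclude it explicitly
    if isinstance(value, bool) or not isinstance(value, (int, float)):
        raise ValueError("'value' is not a number")
    return Reading(sensor=message["sensor"], value=float(value))
```

A listener could then be registered with `data_type=to_reading`, assuming, as the example above suggests, that Panini passes the decoded message to the callable.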