🖖 Middlewares

A "middleware" (in the common sense) is a function that runs on every request before it is processed by a message handler, and on every response before it is returned.

Panini middleware works similarly to middleware in Django or Flask-like frameworks, and it can also be applied to outgoing messages.

Example of an app with such middleware:

import time
from panini import app as panini_app
from panini.middleware import Middleware

app = panini_app.App(
    service_name="another_panini_app",
    host="127.0.0.1",
    port=4222,
)

class ProcessTimeMiddleware(Middleware):
    async def send_request(self, subject: str, message, request_func, *args, **kwargs):
        """
        measure request processing time and add to a response
        """
        start_time = time.time()
        response = await request_func(subject, message, *args, **kwargs)
        process_time = time.time() - start_time
        response["process-time"] = str(process_time)
        return response

@app.task(interval=1)
async def publish_periodically():
    message = {"request_type":"Bitcoin price"}
    result = await app.request(subject="some.request.subject", message=message)
    # received response: {"bitcoin-price": 100000, "process-time": '0.0051320'}
    print(f"requested Bitcoin price, result: {result}")

# Listen on the same subject the task above sends its request to.
@app.listen("some.request.subject")
async def receive_messages(msg):
    print("received request for Bitcoin price")
    bitcoin_price = 100000
    # response to send: {"bitcoin-price": 100000}
    return {"bitcoin-price": bitcoin_price}

if __name__ == "__main__":
    app.add_middleware(ProcessTimeMiddleware)
    app.start()

As you can see, ProcessTimeMiddleware measures the request time and adds it to the response in the process-time field.

Of course, in a real deployment the listener would live in another microservice. The task and the listener share one microservice here only to keep the example simple.

Middleware in Detail

Panini has two core operations for communication between microservices: send and listen. Using Panini Middleware, you can apply modifications to both of these operations before and after they are called.

How it works:

  1. It takes each operation in your application.
  2. It can then do something to that operation or run any needed code (before the operation is called).
  3. It passes the operation to be processed by the rest of the application.
  4. It then takes the response generated by the application.
  5. It can do something to that response or run any needed code (after an operation is called).
  6. It returns the response.
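
The six steps above can be sketched without a NATS connection. This is only an illustration of the flow, not Panini's actual implementation: fake_request stands in for the real request function, and LoggingMiddleware and events are hypothetical names introduced for this example.

```python
import asyncio

events = []  # records the order in which the steps run


async def fake_request(subject, message):
    # Hypothetical stand-in for the real operation (step 3).
    events.append("operation")
    return {"echo": message, "subject": subject}


class LoggingMiddleware:
    # Hypothetical middleware that mirrors the send_request signature.
    async def send_request(self, subject, message, request_func, *args, **kwargs):
        events.append("before")  # step 2: code that runs before the operation
        # Steps 3 and 4: pass the operation on and take its response.
        response = await request_func(subject, message, *args, **kwargs)
        events.append("after")  # step 5: code that runs after the operation
        response["seen-by-middleware"] = True
        return response  # step 6


async def main():
    middleware = LoggingMiddleware()
    return await middleware.send_request("some.subject", {"a": 1}, fake_request)


response = asyncio.run(main())
print(events)
print(response)
```

The middleware never talks to the broker itself. It only decides what happens around the call to request_func.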

Middleware Interface

The basic interface of Panini Middleware looks like this:

class Middleware:
    def __init__(self, *args, **kwargs):
        pass

    async def send_publish(self, subject: str, message, publish_func, *args, **kwargs):
        """
        :param subject: str
        :param message: any of supported types
        :param publish_func: Callable for publish
        :return: None
        """

    async def listen_publish(self, msg, callback):
        """
        :param msg: Msg
        :param callback: Callable, that will be called on receive message
        :return: None
        """

    async def send_request(self, subject: str, message, request_func, *args, **kwargs):
        """
        :param subject: str
        :param message: any of supported types
        :param request_func: Callable for request
        :return: any of supported types
        """

    async def listen_request(self, msg, callback):
        """
        :param msg: Msg
        :param callback: Callable, that will be called on receive message
        :return: any of supported types
        """

    # and composed functions for better user experience:
    async def send_any(self, subject: str, message, send_func, *args, **kwargs):
        """
        :param subject: str
        :param message: any of supported types
        :param send_func: Callable for send
        :return: None or any of supported types
        """

    async def listen_any(self, msg, callback):
        """
        :param msg: Msg
        :param callback: Callable, that will be called on receive message
        :return: None or any of supported types
        """

Note that send_any handles both send_request and send_publish when they are not implemented, but it does not override them if they exist. The same applies to listen_any, which covers listen_request and listen_publish.
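
One way to picture this fallback rule is a small resolver that prefers the specific method and uses send_any only when the specific one was not overridden. The sketch below is an assumption about how such a rule could be expressed, not Panini's internal code. BaseMiddleware, is_overridden, and pick_send_request_handler are hypothetical names.

```python
class BaseMiddleware:
    # Simplified stand-in for the interface above; method bodies are omitted.
    async def send_request(self, subject, message, request_func, *args, **kwargs):
        raise NotImplementedError

    async def send_any(self, subject, message, send_func, *args, **kwargs):
        raise NotImplementedError


def is_overridden(middleware, name):
    # A method counts as implemented only if a subclass replaced the base version.
    return getattr(type(middleware), name) is not getattr(BaseMiddleware, name)


def pick_send_request_handler(middleware):
    # Hypothetical helper: choose which method handles an outgoing request.
    if is_overridden(middleware, "send_request"):
        return middleware.send_request
    if is_overridden(middleware, "send_any"):
        return middleware.send_any
    return None


class OnlyAny(BaseMiddleware):
    async def send_any(self, subject, message, send_func, *args, **kwargs):
        return await send_func(subject, message, *args, **kwargs)


class Both(OnlyAny):
    async def send_request(self, subject, message, request_func, *args, **kwargs):
        return await request_func(subject, message, *args, **kwargs)


chosen_for_only_any = pick_send_request_handler(OnlyAny()).__name__
chosen_for_both = pick_send_request_handler(Both()).__name__
print(chosen_for_only_any)  # send_any
print(chosen_for_both)      # send_request
```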

Examples

ProcessTimeMiddleware

Let's look at the first example in this section. This middleware adds a process-time parameter to the response. The process-time parameter contains the time in seconds it takes to send and process the request. The send_request function receives:

  • The subject and message, as in a common request function.
  • A function request_func that receives the subject, message, and args/kwargs.
    • This function sends a request to the corresponding subject with the provided message.
    • This function also calls any other middlewares (if they exist) inside the app.
    • Then, it returns the response.
  • You can further modify the response before returning it.

import time

from panini.middleware import Middleware

class ProcessTimeMiddleware(Middleware):
    async def send_request(self, subject: str, message, request_func, *args, **kwargs):
        start_time = time.time()
        response = await request_func(subject, message, *args, **kwargs)
        process_time = time.time() - start_time
        response["process-time"] = str(process_time)
        return response

To register the middleware, add ProcessTimeMiddleware before app.start():

app.add_middleware(ProcessTimeMiddleware)

💡

Please note that you should only use the async interface for creating Middlewares, as written in the examples. The synchronous interface does not support middlewares. However, add_middleware is called synchronously, as in the example above.

TestingMiddleware

Imagine you want to easily switch to test mode in your application, which means:

  • You want every message to go to a different subject (the same subject, but with a test prefix)
  • You want to remove some meaningful data from requests & responses

from panini.middleware import Middleware

class TestingMiddleware(Middleware):
    def __init__(self, meaningful_key):
        self.meaningful_key = meaningful_key

    async def send_any(self, subject: str, message, send_func, *args, **kwargs):
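        # Redirect every outgoing message to a test-prefixed subject.
        subject = "test." + subject
        if self.meaningful_key in message:
            del message[self.meaningful_key]

        response = await send_func(subject, message, *args, **kwargs)

        # A publish returns None, so only clean up dict responses.
        if isinstance(response, dict) and self.meaningful_key in response:
            del response[self.meaningful_key]
        return response

    async def listen_any(self, msg, callback):
        if self.meaningful_key in msg.data:
            del msg.data[self.meaningful_key]

        response = await callback(msg)

        # A publish listener may return None, so only clean up dict responses.
        if isinstance(response, dict) and self.meaningful_key in response:
            del response[self.meaningful_key]
        return response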

Then, add the TestingMiddleware to your application and specify the meaningful_key parameter like this:

app.add_middleware(TestingMiddleware, "meaningful_key")
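
To see the effect of such a middleware without a running broker, you can call its send_any method directly with a stub in place of the real send function. The sketch below is self-contained and makes some assumptions: SketchTestingMiddleware is a stand-alone class that mirrors the send_any logic instead of inheriting from Panini's Middleware, and stub_send, sent, and the "secret" key are hypothetical names.

```python
import asyncio


class SketchTestingMiddleware:
    # Stand-alone class mirroring the send_any logic, without the Panini base class.
    def __init__(self, meaningful_key):
        self.meaningful_key = meaningful_key

    async def send_any(self, subject, message, send_func, *args, **kwargs):
        subject = "test." + subject
        if self.meaningful_key in message:
            del message[self.meaningful_key]

        response = await send_func(subject, message, *args, **kwargs)

        # Only dict responses can carry the key; a publish returns None.
        if isinstance(response, dict) and self.meaningful_key in response:
            del response[self.meaningful_key]
        return response


sent = []  # what the stub was asked to send


async def stub_send(subject, message):
    # Hypothetical stand-in for the real send function.
    sent.append((subject, dict(message)))
    return {"status": "ok", "secret": "s3cr3t"}


middleware = SketchTestingMiddleware("secret")
response = asyncio.run(
    middleware.send_any("orders.new", {"id": 7, "secret": "abc"}, stub_send)
)
print(sent)
print(response)
```

The stub receives the prefixed subject and a message without the key, and the caller receives a response without the key.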

Built-in Middlewares

You can use some built-in middlewares for common cases that are already implemented in Panini:

  • ErrorMiddleware - calls the callback when an error occurs
  • NATSTimeoutMiddleware - logs the NATS timeout
  • PrometheusMonitoringMiddleware - measures the performance of the app; for more details, see here

Hierarchical Middlewares

If you want to create hierarchical middlewares with more than one level of inheritance, remember to call the overridden methods of your base middleware explicitly through super():

class FooMiddleware(Middleware):
    async def send_publish(self, subject: str, message, publish_func, *args, **kwargs):
        print("In Foo Middleware: publish")
        await publish_func(subject, message, *args, **kwargs)

class BarMiddleware(FooMiddleware):
    async def send_request(self, subject: str, message, request_func, *args, **kwargs):
        print("In Bar Middleware: request")
        return await request_func(subject, message, *args, **kwargs)

    async def send_publish(self, subject: str, message, publish_func, *args, **kwargs):
        # Forward publish_func as well; the parent method needs it to publish.
        return await super().send_publish(subject, message, publish_func, *args, **kwargs)
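
The call order in such a hierarchy can be checked without a broker. The following sketch assumes an empty stand-in for the Middleware base class and a hypothetical fake_publish function. It only shows that the child method runs first, then the parent method, and finally the publish function that was passed down the chain.

```python
import asyncio

calls = []  # records the order of the calls


class Middleware:
    # Empty stand-in for panini.middleware.Middleware.
    pass


class FooMiddleware(Middleware):
    async def send_publish(self, subject, message, publish_func, *args, **kwargs):
        calls.append("Foo.send_publish")
        await publish_func(subject, message, *args, **kwargs)


class BarMiddleware(FooMiddleware):
    async def send_publish(self, subject, message, publish_func, *args, **kwargs):
        calls.append("Bar.send_publish")
        # publish_func must be forwarded so the parent can do the actual publish.
        return await super().send_publish(subject, message, publish_func, *args, **kwargs)


async def fake_publish(subject, message):
    # Hypothetical stand-in for the real publish function.
    calls.append(f"published to {subject}")


asyncio.run(BarMiddleware().send_publish("some.subject", {"a": 1}, fake_publish))
print(calls)
```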