
A coding implementation of a real-time in-memory sensor alert pipeline in Google Colab using FastStream, RabbitMQ, TestRabbitBroker, and Pydantic

In this notebook, we demonstrate how to build a fully in-memory “sensor alert” pipeline in Google Colab using FastStream, a high-performance, Python-native stream-processing framework, and its RabbitMQ integration. By leveraging faststream.rabbit’s RabbitBroker and TestRabbitBroker, we simulate a message broker entirely in memory, with no external infrastructure. We define four stages: ingestion and validation, normalization, monitoring and alert generation, and archiving, with each stage’s payload described by Pydantic models (RawSensorData, NormalizedData, AlertData) to ensure data quality and type safety. Under the hood, Python’s asyncio drives the asynchronous message flow, while nest_asyncio enables nested event loops in Colab. We also use the standard logging module for traceable pipeline execution and pandas for inspecting the final results, making it easy to view the archived alerts as a DataFrame.

!pip install -q faststream[rabbit] nest_asyncio

We install FastStream with its RabbitMQ extra, which provides the core stream-processing framework and the broker connector, along with the nest_asyncio package, which allows nested asyncio event loops in environments like Colab. The -q flag keeps the installation output to a minimum.

import nest_asyncio, asyncio, logging
nest_asyncio.apply()

We import the nest_asyncio, asyncio, and logging modules, then call nest_asyncio.apply() to patch Python’s event loop so that nested asynchronous tasks can run inside Colab or Jupyter notebooks. Importing logging lets us instrument the pipeline with detailed runtime logs.

logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
logger = logging.getLogger("sensor_pipeline")

We configure Python’s built-in logging to emit INFO-level (and above) messages prefixed with a timestamp and severity, then create a dedicated logger named “sensor_pipeline” for structured log output from the streaming pipeline.

from faststream import FastStream
from faststream.rabbit import RabbitBroker, TestRabbitBroker
from pydantic import BaseModel, Field, validator
import pandas as pd
from typing import List

We bring in the FastStream core and its RabbitMQ connectors (RabbitBroker for a real broker, TestRabbitBroker for in-memory testing), Pydantic’s BaseModel, Field, and validator for declarative data validation, pandas for tabular inspection of the results, and Python’s List type to annotate our in-memory archive.

broker = RabbitBroker("amqp://guest:guest@localhost:5672/")
app    = FastStream(broker)

We instantiate a RabbitBroker pointing at a (local) RabbitMQ server via its AMQP URL, then create a FastStream application bound to that broker, establishing the messaging backbone for the pipeline stages.

class RawSensorData(BaseModel):
    sensor_id: str       = Field(..., examples=["sensor_1"])
    reading_celsius: float = Field(..., ge=-50, le=150, examples=[23.5])
   
    @validator("sensor_id")
    def must_start_with_sensor(cls, v):
        if not v.startswith("sensor_"):
            raise ValueError("sensor_id must start with 'sensor_'")
        return v


class NormalizedData(BaseModel):
    sensor_id: str
    reading_kelvin: float


class AlertData(BaseModel):
    sensor_id: str
    reading_kelvin: float
    alert: bool

These Pydantic models define the schema for each stage: RawSensorData enforces input validity (e.g., the allowed reading range and the sensor_ prefix), NormalizedData carries the Celsius-to-Kelvin conversion, and AlertData encapsulates the final alert payload, including the boolean alert flag, ensuring type-safe data flow throughout the pipeline.
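
To see these validation rules in action before wiring anything to the broker, you can run a quick optional check like the one below (not part of the original pipeline); it assumes the models above are already defined and raises pydantic.ValidationError for both a bad sensor_id prefix and an out-of-range reading.

from pydantic import ValidationError

# Bad prefix: rejected by the custom sensor_id validator.
try:
    RawSensorData(sensor_id="thermo_9", reading_celsius=23.5)
except ValidationError as e:
    print(e)

# Out-of-range reading: rejected by the ge/le bounds on reading_celsius.
try:
    RawSensorData(sensor_id="sensor_9", reading_celsius=500.0)
except ValidationError as e:
    print(e)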

archive: List[AlertData] = []


@broker.subscriber("sensor_input")
@broker.publisher("normalized_input")
async def ingest_and_validate(raw: RawSensorData) -> dict:
    logger.info(f"Ingested raw data: {raw.json()}")
    return raw.dict()


@broker.subscriber("normalized_input")
@broker.publisher("sensor_alert")
async def normalize(data: dict) -> dict:
    norm = NormalizedData(
        sensor_id=data["sensor_id"],
        reading_kelvin=data["reading_celsius"] + 273.15
    )
    logger.info(f"Normalized to Kelvin: {norm.json()}")
    return norm.dict()


ALERT_THRESHOLD_K = 323.15  
   
@broker.subscriber("sensor_alert")
@broker.publisher("archive_topic")
async def monitor(data: dict) -> dict:
    alert_flag = data["reading_kelvin"] > ALERT_THRESHOLD_K
    alert = AlertData(
        sensor_id=data["sensor_id"],
        reading_kelvin=data["reading_kelvin"],
        alert=alert_flag
    )
    logger.info(f"Monitor result: {alert.json()}")
    return alert.dict()


@broker.subscriber("archive_topic")
async def archive_data(payload: dict):
    rec = AlertData(**payload)
    archive.append(rec)
    logger.info(f"Archived: {rec.json()}")

The in-memory archive list collects all finalized alerts, while the four asynchronous functions are wired together via @broker.subscriber / @broker.publisher to form the pipeline stages. These handlers ingest and validate the raw sensor input, convert Celsius to Kelvin, check the alert threshold, and finally archive each AlertData record, logging at every step for full traceability.
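
Because each stage is just an async function under a thin decorator, you can also sanity-check the transformation logic directly, outside any broker. The snippet below is an optional smoke test (not part of the original tutorial) and assumes FastStream leaves the decorated handlers callable as plain coroutines:

# Optional: exercise the stage logic directly as plain coroutines.
async def _smoke_test():
    norm = await normalize({"sensor_id": "sensor_9", "reading_celsius": 50.0})
    assert abs(norm["reading_kelvin"] - 323.15) < 1e-9   # 50 °C == 323.15 K

    result = await monitor(norm)
    assert result["alert"] is False    # 323.15 K is not strictly above the threshold

    hot = await monitor({"sensor_id": "sensor_9", "reading_kelvin": 350.0})
    assert hot["alert"] is True        # 350 K exceeds 323.15 K

asyncio.run(_smoke_test())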

async def main():
    readings = [
        {"sensor_id": "sensor_1", "reading_celsius": 45.2},
        {"sensor_id": "sensor_2", "reading_celsius": 75.1},
        {"sensor_id": "sensor_3", "reading_celsius": 50.0},
    ]
    async with TestRabbitBroker(broker) as tb:
        for r in readings:
            await tb.publish(r, "sensor_input")
        await asyncio.sleep(0.1)
       
    df = pd.DataFrame([a.dict() for a in archive])
    print("nFinal Archived Alerts:")
    display(df)


asyncio.run(main())

Finally, the main coroutine publishes a set of sample sensor readings into the in-memory TestRabbitBroker, pauses briefly so each pipeline stage can run, and then collects the resulting AlertData records from the archive into a pandas DataFrame, making it easy to display and validate the end-to-end alert flow. asyncio.run(main()) kicks off the entire asynchronous demo in Colab.
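
As a variation on main(), FastStream’s test broker also patches each subscriber with a mock while the context manager is active, so you can assert on deliveries instead of only inspecting the archive list. The sketch below assumes the .mock attribute exposed by FastStream’s testing utilities and reuses the handlers defined earlier:

async def main_with_assertions():
    async with TestRabbitBroker(broker) as tb:
        await tb.publish({"sensor_id": "sensor_9", "reading_celsius": 80.0}, "sensor_input")
        await asyncio.sleep(0.1)
        # Inside the test context, each handler carries a mock we can assert on.
        ingest_and_validate.mock.assert_called_once()
        archive_data.mock.assert_called_once()

asyncio.run(main_with_assertions())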

In short, this tutorial shows how FastStream, combined with RabbitMQ abstractions and in-memory testing via TestRabbitBroker, can accelerate the development of real-time data pipelines without the overhead of deploying external brokers. With Pydantic handling schema validation, asyncio managing concurrency, and pandas enabling quick data analysis, this pattern provides a strong foundation for sensor monitoring, ETL tasks, or event-driven workflows. You can transition seamlessly to production by swapping in a live broker URL (RabbitMQ, Kafka, NATS, or Redis) and running the FastStream application as a standalone service (or behind your favorite ASGI server) in any standard Python environment.
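
For reference, moving off the in-memory test broker is mostly a configuration change. The sketch below uses placeholder file and URL names: point the same models and handlers at a live RabbitMQ instance and run the app as a long-lived service, for example with the FastStream CLI.

# pipeline.py -- same models and handlers as above, but against a real broker.
# The AMQP URL is a placeholder for your own deployment.
broker = RabbitBroker("amqp://user:password@rabbitmq.internal:5672/")
app    = FastStream(broker)

# Run from the shell as a long-lived service:
#   faststream run pipeline:app
# or, programmatically:
#   asyncio.run(app.run())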





Sana Hassan, a consulting intern at Marktechpost and a dual-degree student at IIT Madras, is passionate about applying technology and AI to address real-world challenges. With a keen interest in solving practical problems, he brings a fresh perspective to the intersection of AI and real-life solutions.
