Skip to content

implement mongo streams to monitor new jobs #8

@WolfgangWenzel

Description

@WolfgangWenzel

import asyncio
from odmantic import AIOEngine, Model
from motor.motor_asyncio import AsyncIOMotorClient

class Item(Model):
name: str

async def main():
client = AsyncIOMotorClient("mongodb://localhost:27017/?replicaSet=rs0")
engine = AIOEngine(motor_client=client, database="mydb")

# Get the underlying Motor collection for your model
coll = engine.get_collection(Item)

pipeline = [
    {"$match": {"operationType": {"$in": ["insert", "update", "replace", "delete"]}}}
]

# full_document="updateLookup" fetches the post-update document for updates
async with coll.watch(pipeline, full_document="updateLookup") as stream:
    async for change in stream:
        print("op:", change["operationType"], "key:", change["documentKey"])

        # If you want the updated document (for insert/replace and updateLookup):
        doc = change.get("fullDocument")
        if doc:
            # Turn the raw dict into an ODMantic model instance
            item = Item.model_validate(doc)
            print("as model:", item)

asyncio.run(main())

Metadata

Metadata

Labels

No labels
No labels

Type

No type

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions