Building a RESTful API for Azure Cosmos DB using FastAPI and Python

A simple API built using FastAPI that allows the user to load data into CosmosDb

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from azure.cosmos import CosmosClient, exceptions

app = FastAPI()

class Item(BaseModel):
    id: str
    data: dict

# Set up the Cosmos DB connection
cosmosdb_url = "YOUR_COSMOS_DB_URL"
cosmosdb_key = "YOUR_COSMOS_DB_KEY"
cosmosdb_database = "YOUR_COSMOS_DB_DATABASE"
cosmosdb_container = "YOUR_COSMOS_DB_CONTAINER"

client = CosmosClient(cosmosdb_url, cosmosdb_key)
database = client.get_database_client(cosmosdb_database)
container = database.get_container_client(cosmosdb_container)

@app.post("/items")
async def add_item(item: Item):
    try:
        new_item = container.create_item(item.dict())
        return {"status": "success", "item": new_item}
    except exceptions.CosmosError as e:
        raise HTTPException(status_code=500, detail=str(e))

@app.put("/items/{item_id}")
async def update_item(item_id: str, item: Item):
    try:
        current_item = container.read_item(item_id, partition_key=item_id)
        current_item.update(item.data)
        updated_item = container.replace_item(current_item, current_item)
        return {"status": "success", "item": updated_item}
    except exceptions.CosmosError as e:
        raise HTTPException(status_code=500, detail=str(e))

@app.delete("/items/{item_id}")
async def delete_item(item_id: str):
    try:
        container.delete_item(item_id, partition_key=item_id)
        return {"status": "success", "item_id": item_id}
    except exceptions.CosmosError as e:
        raise HTTPException(status_code=500, detail=str(e))

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)