Building a RESTful API for Azure Cosmos DB using FastAPI and Python
A simple API built with FastAPI that lets the user load data into Azure Cosmos DB and update or delete it.
import os

from azure.cosmos import CosmosClient, exceptions
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
# ASGI application object; the route decorators in this file register their
# handlers on it.
app = FastAPI()
class Item(BaseModel):
    """Request body schema for the item endpoints."""

    # Document identifier supplied by the client.
    id: str
    # Free-form JSON object carried alongside the identifier.
    data: dict
# Set up the Cosmos DB connection.
# Each setting may be supplied through an environment variable so that the
# real endpoint and account key never need to be committed to source control.
# The original placeholder strings are kept as defaults, so behavior is
# unchanged when the variables are not set.
cosmosdb_url = os.environ.get("COSMOS_DB_URL", "YOUR_COSMOS_DB_URL")
cosmosdb_key = os.environ.get("COSMOS_DB_KEY", "YOUR_COSMOS_DB_KEY")
cosmosdb_database = os.environ.get(
    "COSMOS_DB_DATABASE", "YOUR_COSMOS_DB_DATABASE"
)
cosmosdb_container = os.environ.get(
    "COSMOS_DB_CONTAINER", "YOUR_COSMOS_DB_CONTAINER"
)

# NOTE(review): ``azure.cosmos.CosmosClient`` is the synchronous client, so
# calls made through ``container`` block the calling thread until the service
# responds - confirm this is acceptable for the ``async def`` handlers.
client = CosmosClient(cosmosdb_url, cosmosdb_key)
database = client.get_database_client(cosmosdb_database)
container = database.get_container_client(cosmosdb_container)
@app.post("/items")
async def add_item(item: Item):
    """Create a new Cosmos DB document from the request body.

    Args:
        item: Validated request payload; its ``dict()`` form is what gets
            stored.

    Returns:
        ``{"status": "success", "item": <document returned by Cosmos DB>}``.

    Raises:
        HTTPException: 409 when a document with the same id already exists,
            500 for any other Cosmos DB service error.
    """
    try:
        new_item = container.create_item(item.dict())
    except exceptions.CosmosResourceExistsError as e:
        # A duplicate id is a client-side conflict, not a server fault.
        raise HTTPException(status_code=409, detail=str(e)) from e
    except exceptions.CosmosHttpResponseError as e:
        # ``azure.cosmos.exceptions`` defines no ``CosmosError``; the service
        # error base class is ``CosmosHttpResponseError``.
        raise HTTPException(status_code=500, detail=str(e)) from e
    return {"status": "success", "item": new_item}
@app.put("/items/{item_id}")
async def update_item(item_id: str, item: Item):
    """Merge the request's ``data`` into an existing document and save it.

    The stored document is read, the keys of ``item.data`` are merged into its
    top level, and the result replaces the stored document.

    Args:
        item_id: Id of the document to update. It is also passed as the
            partition key value (assumes the container is partitioned on
            ``/id`` - TODO confirm).
        item: Validated request payload; only ``item.data`` is applied.

    Returns:
        ``{"status": "success", "item": <document returned by Cosmos DB>}``.

    Raises:
        HTTPException: 404 when no document with ``item_id`` exists,
            500 for any other Cosmos DB service error.
    """
    try:
        current_item = container.read_item(item_id, partition_key=item_id)
        current_item.update(item.data)
        # The payload must not be able to re-target the write: if ``data``
        # carried its own "id" key, the replace would address another document.
        current_item["id"] = item_id
        updated_item = container.replace_item(item_id, current_item)
    except exceptions.CosmosResourceNotFoundError as e:
        raise HTTPException(status_code=404, detail=str(e)) from e
    except exceptions.CosmosHttpResponseError as e:
        # ``azure.cosmos.exceptions`` defines no ``CosmosError``; the service
        # error base class is ``CosmosHttpResponseError``.
        raise HTTPException(status_code=500, detail=str(e)) from e
    return {"status": "success", "item": updated_item}
@app.delete("/items/{item_id}")
async def delete_item(item_id: str):
    """Delete the document whose id is ``item_id``.

    Args:
        item_id: Id of the document to delete. It is also passed as the
            partition key value (assumes the container is partitioned on
            ``/id`` - TODO confirm).

    Returns:
        ``{"status": "success", "item_id": item_id}``.

    Raises:
        HTTPException: 404 when no document with ``item_id`` exists,
            500 for any other Cosmos DB service error.
    """
    try:
        container.delete_item(item_id, partition_key=item_id)
    except exceptions.CosmosResourceNotFoundError as e:
        raise HTTPException(status_code=404, detail=str(e)) from e
    except exceptions.CosmosHttpResponseError as e:
        # ``azure.cosmos.exceptions`` defines no ``CosmosError``; the service
        # error base class is ``CosmosHttpResponseError``.
        raise HTTPException(status_code=500, detail=str(e)) from e
    return {"status": "success", "item_id": item_id}
if __name__ == "__main__":
    # uvicorn is imported here so it is only required when this file is
    # executed directly as a script.
    import uvicorn

    # Serve the app on all network interfaces, port 8000.
    uvicorn.run(
        app,
        host="0.0.0.0",
        port=8000,
    )