Files
ledger/main.py
Bernard Siebens 78872fd4d6 Add ledger management service and API integration
- Introduced `LedgerService` to handle ledger file operations, queries, and snapshots.
- Added API routes for journal entry retrieval, account balances, and custom queries using FastAPI.
- Updated project dependencies to include `ruff` for linting, along with its configuration.
- Integrated `lifespan` for managing the lifecycle of `LedgerService`.
2026-01-24 21:45:19 +01:00

79 lines
1.8 KiB
Python

import datetime
import os
from contextlib import asynccontextmanager
from pathlib import Path
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel, Field
from services.ledger import LedgerService
def _ledger_path() -> Path:
env_path = os.getenv("LEDGER_FILE")
if env_path:
return Path(env_path)
return Path.cwd() / "ledger.beancount"
class QueryRequest(BaseModel):
query: str = Field(min_length=1)
@asynccontextmanager
async def lifespan(application: FastAPI):
ledger_path = _ledger_path()
service = LedgerService(ledger_path)
application.state.ledger_service = service
await service.start()
try:
yield
finally:
await service.stop()
app = FastAPI(lifespan=lifespan)
@app.get("/")
async def root():
return {"message": "Ledger API"}
@app.get("/status")
async def status():
service: LedgerService = app.state.ledger_service
snapshot = await service.snapshot()
return {
"ledger_path": str(service.ledger_path),
"last_loaded_at": snapshot.last_loaded_at.isoformat() + "Z",
"error_count": len(snapshot.errors),
"errors": [str(err) for err in snapshot.errors],
}
@app.get("/accounts")
async def accounts(on_date: datetime.date | None = None):
service: LedgerService = app.state.ledger_service
return await service.account_balances(on_date)
@app.get("/journal")
async def journal(limit: int | None = None):
service: LedgerService = app.state.ledger_service
return await service.journal_entries(limit)
@app.post("/query")
async def query(payload: QueryRequest):
service: LedgerService = app.state.ledger_service
try:
return await service.run_query(payload.query)
except Exception as exc:
raise HTTPException(status_code=400, detail=str(exc)) from exc