import asyncio import datetime from dataclasses import dataclass from pathlib import Path from typing import Any, Iterable from beancount import loader from beancount.core import amount, convert, data, inventory from beancount.ops import summarize from beanquery import query as bql def _amount_to_dict(amt: amount.Amount) -> dict[str, Any]: return {"number": str(amt.number), "currency": amt.currency} def _inventory_to_list(inv: inventory.Inventory) -> list[dict[str, str]]: units_inv = inv.reduce(convert.get_units) return [ _amount_to_dict(position.units) for position in sorted(units_inv, key=lambda p: p.units.currency) ] def _date_to_str(value: datetime.date | None = None) -> str | None: return value.isoformat() if value else None def _entry_base(entry: data.Directive) -> dict[str, Any]: return { "type": entry.__class__.__name__, "date": _date_to_str(entry.date), "meta": dict(entry.meta or {}), } def _posting_to_dict(posting: data.Posting) -> dict[str, Any]: cost = None if posting.cost: cost = { "number": str(posting.cost.number), "currency": posting.cost.currency, "date": _date_to_str(posting.cost.date), "label": posting.cost.label, } return { "account": posting.account, "units": _amount_to_dict(posting.units) if posting.units else None, "cost": cost, "price": _amount_to_dict(posting.price) if posting.price else None, "flag": posting.flag, "meta": dict(posting.meta or {}), } def entry_to_dict(entry: data.Directive) -> dict[str, Any]: """ Converts a financial entry object to its dictionary representation. This function handles various types of entries (e.g., Transaction, Balance, Open, Close, Note, Document, Price, and Event) by extracting relevant attributes and formatting them appropriately. The dictionary returned serves as a structured representation of the entry. :param entry: The financial entry object to be converted. It can be an instance of the following types: - data.Transaction: Represents a financial transaction with attributes like flag, payee, narration, tags, links, and postings. - data.Balance: Represents a balance directive with attributes like account and amount. - data.Open: Represents an account opening directive with attributes such as account, currencies, and booking. - data.Close: Represents an account closing directive with attributes like account. - data.Note: Represents an account note with attributes like account and comment. - data.Document: Represents a document associated with an account, including account and filename attributes. - data.Price: Represents a price directive with attributes such as currency and amount. - data.Event: Represents an event directive with attributes like type and description. :type entry: data.Directive :return: A dictionary containing the extracted and formatted attributes of the input entry, structured according to its type. :rtype: dict[str, Any] """ base = _entry_base(entry) if isinstance(entry, data.Transaction): base.update( { "flag": entry.flag, "payee": entry.payee, "narration": entry.narration, "tags": entry.tags, "links": entry.links, "postings": [_posting_to_dict(posting) for posting in entry.postings], } ) elif isinstance(entry, data.Balance): base.update({"account": entry.account, "amount": _amount_to_dict(entry.amount)}) elif isinstance(entry, data.Open): base.update( { "account": entry.account, "currencies": sorted(entry.currencies or []), "booking": str(entry.booking) if entry.booking else None, } ) elif isinstance(entry, data.Close): base.update({"account": entry.account}) elif isinstance(entry, data.Note): base.update({"account": entry.account, "comment": entry.comment}) elif isinstance(entry, data.Document): base.update({"account": entry.account, "filename": entry.filename}) elif isinstance(entry, data.Price): base.update( {"currency": entry.currency, "amount": _amount_to_dict(entry.amount)} ) elif isinstance(entry, data.Event): base.update({"type": entry.type, "description": entry.description}) return base def query_to_dict( types: Iterable[Any], rows: Iterable[Iterable[Any]] ) -> dict[str, Any]: """ Converts the given types and rows into a dictionary with column names and their corresponding row data. This function takes types to determine the column names and rows to populate the data for each column. Conversion is applied for specific data types if identified. :param types: Iterable of column type descriptors. Each descriptor could be a tuple where the first element represents the column name or a string representing the column name directly. :param rows: Iterable of rows, where each row is itself an iterable of values. Each row corresponds to the data for one record, with each value mapped to a corresponding column name. :return: A dictionary with two keys: "columns" and "rows". - The "columns" key maps to an iterable of column names as strings. - The "rows" key maps to a list of dictionaries, where each dictionary represents a row of values, with keys corresponding to column names. :rtype: dict[str, Any] """ column_names = [] for entry in types: if isinstance(entry, tuple) and entry: column_names.append(entry[0]) else: column_names.append(str(entry)) result_rows = [] for row in rows: row_dict = {} for name, value in zip(column_names, row): if isinstance(value, amount.Amount): row_dict[name] = _amount_to_dict(value) elif isinstance(value, inventory.Inventory): row_dict[name] = _inventory_to_list(value) elif isinstance(value, datetime.date): row_dict[name] = value.isoformat() else: row_dict[name] = value result_rows.append(row_dict) return {"columns": column_names, "rows": result_rows} @dataclass class LedgerSnapshot: entries: list[data.Directive] options_map: dict[str, Any] errors: list[data.BeancountError] last_loaded_at: datetime.datetime mtime: float class LedgerService: def __init__(self, ledger_path: Path, reload_interval: float = 2.0) -> None: self.ledger_path = ledger_path self.reload_interval = reload_interval self._lock = asyncio.Lock() self._snapshot: LedgerSnapshot | None = None self._stop_event = asyncio.Event() async def start(self) -> None: await self.refresh(force=True) asyncio.create_task(self._refresh_loop()) async def stop(self) -> None: self._stop_event.set() async def _refresh_loop(self) -> None: while not self._stop_event.is_set(): await self.refresh(force=False) try: await asyncio.wait_for( self._stop_event.wait(), timeout=self.reload_interval ) except asyncio.TimeoutError: continue async def refresh(self, force: bool = False) -> None: """ Asynchronously refreshes the ledger data by checking for updates on the file's last modified time and reloading the file data if necessary. A force-refresh option is available to reload the data irrespective of modifications to the file. :param force: A flag indicating whether to force a refresh, even if no updates are present in the file. Defaults to False. :return: None """ async with self._lock: mtime = self._get_mtime() if not force and self._snapshot and mtime == self._snapshot.mtime: return entries, errors, options_map = await asyncio.to_thread( loader.load_file, str(self.ledger_path) ) self._snapshot = LedgerSnapshot( entries, options_map, errors, last_loaded_at=datetime.datetime.now(datetime.UTC), mtime=mtime, ) async def snapshot(self) -> LedgerSnapshot: """ Captures the current state of the ledger as a snapshot. The snapshot is created after refreshing the ledger to ensure the data is up-to-date. If the snapshot is not available or has not been loaded, an exception will be raised. :raises RuntimeError: If the ledger snapshot is not loaded yet. :return: A ledger snapshot representing the current state. :rtype: LedgerSnapshot """ await self.refresh() if not self._snapshot: raise RuntimeError("Ledger snapshot not loaded yet") return self._snapshot async def run_query(self, query: str) -> dict[str, Any]: """ Executes a Beancount Query Language (BQL) query asynchronously and returns the results in a dictionary format. :param query: The BQL query to be executed. :type query: str :return: A dictionary containing the query's results, with column names mapped to their respective values. :rtype: dict[str, Any] """ snapshot = await self.snapshot() types, rows = await asyncio.to_thread( bql.run_query, snapshot.entries, snapshot.options_map, query ) return query_to_dict(types, rows) async def account_balances( self, on_date: datetime.date | None = None ) -> list[dict[str, Any]]: """ Get the account balances as of a specific date or the latest snapshot if no date is provided. This method retrieves a summary of account balances, either on a specified date or based on the latest available snapshot. The data is organized and returned in the form of a list of dictionaries, each representing an account and its corresponding balance. :param on_date: The date for which to compute account balances. If None, the latest snapshot is used for computation of balances. :type on_date: datetime.date or None :return: A list of dictionaries, where each dictionary contains an account name and its associated balance information. :rtype: list[dict[str, Any]] """ snapshot = await self.snapshot() balances, _ = summarize.balance_by_account(snapshot.entries, on_date) results = [] for account_name in sorted(balances.keys()): results.append( { "account": account_name, "balance": _inventory_to_list(balances[account_name]), } ) return results async def journal_entries(self, limit: int | None = None) -> list[dict[str, Any]]: """ Fetches a list of journal entries in dictionary format, with an optional limit on the number of recent transactions to return. This function retrieves all journal entries from the snapshot, filters transactions specifically of type `Transaction`, and then converts each entry into a dictionary representation. :param limit: An optional integer specifying the maximum number of recent transactions to include in the returned list. If not provided, all transactions will be returned. :return: A list of dictionaries where each dictionary represents a journal entry of type `Transaction`. :rtype: list[dict[str, Any]] """ snapshot = await self.snapshot() transactions = [ entry for entry in snapshot.entries if isinstance(entry, data.Transaction) ] if limit: transactions = transactions[-limit:] return [entry_to_dict(entry) for entry in transactions] def _get_mtime(self) -> float: if not self.ledger_path.exists(): self.ledger_path.parent.mkdir(parents=True, exist_ok=True) self.ledger_path.touch() return self.ledger_path.stat().st_mtime