"""Publisher API: Notifier (toast methods) and TaskTracker (context manager)."""
import asyncio
import logging
import time
from typing import Optional
from .bus import NotificationBus
from .state import (
TaskMilestone,
TaskStatus,
Toast,
ToastType,
TrackedTask,
)
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
class TaskTracker:
    """Handle for tracking a long-running task with milestones.

    Every mutation is forwarded to the notification bus through
    ``update_task``. Once the task reaches a terminal state (via
    ``complete``, ``fail`` or ``cancel``) the tracker is marked finished and
    every later call on it is silently ignored.

    This class does not define ``__enter__``/``__exit__`` itself; the
    context-manager protocol is added by ``_TaskTrackerContextManager``.
    """

    def __init__(self, bus: NotificationBus, task: TrackedTask):
        """Initialize tracker with a bus reference and task identity.

        Args:
            bus: Bus that holds the task and receives every update.
            task: Task to track; only its ``id`` is retained.
        """
        self._bus = bus
        self._task_id = task.id
        self._finished = False

    def step(self, message: str) -> None:
        """Add a named milestone and increment ``current_step``.

        Also sets the task status to ``RUNNING``. Does nothing when the
        tracker is finished or the task can no longer be found on the bus.

        Args:
            message: Text of the new milestone.
        """
        if self._finished:
            return
        current = self._get_task()
        if current is None:
            return
        self._bus.update_task(
            self._task_id,
            milestones=(*current.milestones, TaskMilestone(message=message)),
            current_step=current.current_step + 1,
            status=TaskStatus.RUNNING,
        )

    def set_progress(self, value: float) -> None:
        """Update continuous progress (0.0-1.0). Does NOT create a milestone.

        The value is forwarded to the bus unchanged; no range check is
        performed here.
        """
        if self._finished:
            return
        self._bus.update_task(self._task_id, progress=value)

    def update(self, title: str) -> None:
        """Update the task title."""
        if self._finished:
            return
        self._bus.update_task(self._task_id, title=title)

    def complete(self, message: Optional[str] = None) -> None:
        """Explicitly mark the task as completed.

        Sets the status to ``COMPLETED``, progress to ``1.0`` and stamps
        ``completed_at`` with the current time.

        Args:
            message: Optional closing milestone. It is appended only when it
                is non-empty and the task is still present on the bus;
                ``current_step`` is not incremented for it.
        """
        if self._finished:
            return
        changes = {"progress": 1.0}
        if message:
            current = self._get_task()
            if current is not None:
                changes["milestones"] = (
                    *current.milestones,
                    TaskMilestone(message=message),
                )
        self._finish(TaskStatus.COMPLETED, **changes)

    def fail(self, message: str) -> None:
        """Explicitly mark the task as failed.

        Args:
            message: Stored on the task as ``error_message``.
        """
        if self._finished:
            return
        self._finish(TaskStatus.FAILED, error_message=message)

    def cancel(self) -> None:
        """Explicitly mark the task as cancelled."""
        if self._finished:
            return
        self._finish(TaskStatus.CANCELLED)

    def _finish(self, status: TaskStatus, **changes) -> None:
        """Mark the tracker finished and publish the terminal update.

        Shared by ``complete``, ``fail`` and ``cancel`` so that every
        terminal transition stamps ``completed_at`` the same way.
        """
        self._finished = True
        self._bus.update_task(
            self._task_id,
            status=status,
            completed_at=time.time(),
            **changes,
        )

    def _get_task(self) -> Optional[TrackedTask]:
        """Return the current task state from the bus, or ``None`` if absent."""
        return next(
            (t for t in self._bus.tasks.value if t.id == self._task_id),
            None,
        )
class Notifier:
    """Main publisher API for notifications.

    Toast methods build a ``Toast`` of the matching ``ToastType`` and add it
    to the bus; ``track`` registers a task and returns a tracker for it.
    """

    def __init__(self, bus: NotificationBus):
        """Initialize notifier with a notification bus."""
        self._bus = bus

    def _publish(
        self, message: str, toast_type: ToastType, timeout: Optional[float]
    ) -> None:
        """Build a toast of ``toast_type`` and add it to the bus."""
        self._bus.add_toast(
            Toast(message=message, type=toast_type, timeout=timeout)
        )

    def success(self, message: str, *, timeout: Optional[float] = None) -> None:
        """Publish a success toast. ``timeout`` overrides the per-type default."""
        self._publish(message, ToastType.SUCCESS, timeout)

    def error(self, message: str, *, timeout: Optional[float] = None) -> None:
        """Publish an error toast. ``timeout`` overrides the per-type default."""
        self._publish(message, ToastType.ERROR, timeout)

    def warning(self, message: str, *, timeout: Optional[float] = None) -> None:
        """Publish a warning toast. ``timeout`` overrides the per-type default."""
        self._publish(message, ToastType.WARNING, timeout)

    def info(self, message: str, *, timeout: Optional[float] = None) -> None:
        """Publish an info toast. ``timeout`` overrides the per-type default."""
        self._publish(message, ToastType.INFO, timeout)

    def cancel(self, message: str, *, timeout: Optional[float] = None) -> None:
        """Publish a cancellation toast (gray). ``timeout`` overrides default."""
        self._publish(message, ToastType.CANCEL, timeout)

    def dismiss(self, toast_id: str) -> None:
        """Dismiss a toast by ID."""
        self._bus.remove_toast(toast_id)

    def track(
        self, title: str, total_steps: Optional[int] = None
    ) -> "_TaskTrackerContextManager":
        """Return a TaskTracker context manager for a long-running task.

        The task is added to the bus immediately, before the returned
        object is entered with ``with``.

        Args:
            title: Title of the new task.
            total_steps: Optional number of steps, passed to ``TrackedTask``.
        """
        task = TrackedTask(title=title, total_steps=total_steps)
        self._bus.add_task(task)
        return _TaskTrackerContextManager(self._bus, task)
class _TaskTrackerContextManager(TaskTracker):
    """TaskTracker that also acts as a context manager.

    Entering marks the task ``RUNNING``. Leaving settles its final state
    from how the ``with`` block ended; exceptions are never suppressed.
    """

    def __enter__(self) -> TaskTracker:
        """Mark the task as running and return the tracker."""
        self._bus.update_task(self._task_id, status=TaskStatus.RUNNING)
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> bool:
        """Settle the task's final state when the ``with`` block ends.

        * Clean exit: the task is marked ``COMPLETED`` unless it was already
          finished explicitly.
        * ``asyncio.CancelledError``: the task is cancelled unless already
          finished. Cancellation is never reported as a failure and never
          produces a toast.
        * Any other exception: the task is marked ``FAILED`` and an error
          toast is published. If the task was already finished explicitly,
          the override happens only when its stored status is neither
          ``FAILED`` nor ``CANCELLED``.

        Returns:
            Always ``False``, so any exception propagates to the caller.
        """
        if exc_type is None:
            if not self._finished:
                self._finished = True
                # NOTE(review): unlike complete(), progress is not forced to
                # 1.0 on this implicit completion - confirm this is intended.
                self._bus.update_task(
                    self._task_id,
                    status=TaskStatus.COMPLETED,
                    completed_at=time.time(),
                )
            return False

        if issubclass(exc_type, asyncio.CancelledError):
            # cancel() is a no-op once finished, so a cancellation arriving
            # after an explicit complete()/fail() leaves that outcome alone.
            self.cancel()
            return False

        # Exceptions raised without arguments stringify to "", which would
        # produce a blank error message and toast; use the class name then.
        message = str(exc_val) or exc_type.__name__

        if not self._finished:
            self.fail(message)
            self._bus.add_toast(Toast(message=message, type=ToastType.ERROR))
            return False

        # Already explicitly finished, but the block still raised: override
        # to FAILED so the error is visible, unless the task is already
        # FAILED or CANCELLED.
        current = self._get_task()
        if current and current.status not in (
            TaskStatus.FAILED,
            TaskStatus.CANCELLED,
        ):
            # NOTE(review): completed_at is reset to None here, whereas
            # fail() stamps the current time - confirm this is intended.
            self._bus.update_task(
                self._task_id,
                status=TaskStatus.FAILED,
                error_message=message,
                completed_at=None,
            )
            self._bus.add_toast(Toast(message=message, type=ToastType.ERROR))
        return False
class _NoopTaskTracker:
    """TaskTracker that does nothing (used when no provider is mounted).

    Offers the same public methods as ``TaskTracker``; each one accepts its
    arguments, ignores them and returns ``None``.
    """

    def step(self, message: str) -> None:
        """No-op milestone."""

    def set_progress(self, value: float) -> None:
        """No-op progress update."""

    def update(self, title: str) -> None:
        """No-op title update."""

    def complete(self, message: Optional[str] = None) -> None:
        """No-op completion."""

    def fail(self, message: str) -> None:
        """No-op failure."""

    def cancel(self) -> None:
        """No-op cancellation."""
class _NoopTaskTrackerContextManager(_NoopTaskTracker):
    """No-op tracker that can also be used in a ``with`` statement."""

    def __enter__(self):
        """Return this tracker unchanged."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Do nothing; returning ``False`` lets any exception propagate."""
        return False
class NoopNotifier:
    """Notifier that does nothing (used when no provider is mounted).

    Offers the same public methods as ``Notifier``. Toast methods and
    ``dismiss`` ignore their arguments and return ``None``; ``track`` returns
    a tracker whose methods are all no-ops.
    """

    def success(self, message: str, *, timeout: Optional[float] = None) -> None:
        """No-op success toast."""

    def error(self, message: str, *, timeout: Optional[float] = None) -> None:
        """No-op error toast."""

    def warning(self, message: str, *, timeout: Optional[float] = None) -> None:
        """No-op warning toast."""

    def info(self, message: str, *, timeout: Optional[float] = None) -> None:
        """No-op info toast."""

    def cancel(self, message: str, *, timeout: Optional[float] = None) -> None:
        """No-op cancel toast."""

    def dismiss(self, toast_id: str) -> None:
        """No-op dismiss."""

    def track(
        self, title: str, total_steps: Optional[int] = None
    ) -> "_NoopTaskTrackerContextManager":
        """Return a no-op task tracker context manager."""
        return _NoopTaskTrackerContextManager()