Перейти к содержанию

taskiq_dashboard

TaskiqDashboard

TaskiqDashboard(
    api_token,
    storage_type='sqlite',
    database_dsn='sqlite+aiosqlite:///taskiq_dashboard.db',
    broker=None,
    scheduler=None,
    **uvicorn_kwargs,
)

Initialize Taskiq Dashboard application.

Parameters:

  • api_token (str) –

    Access token for securing the dashboard API.

  • storage_type (str, default: 'sqlite' ) –

    Type of the storage backend ('sqlite' or 'postgres').

  • database_dsn (str, default: 'sqlite+aiosqlite:///taskiq_dashboard.db' ) –

    URL for the database.

  • broker (AsyncBroker | None, default: None ) –

    Optional Taskiq broker instance to integrate with the dashboard.

  • scheduler (TaskiqScheduler | None, default: None ) –

    Optional Taskiq scheduler instance to integrate with the dashboard.

  • uvicorn_kwargs (Any, default: {} ) –

    Additional keyword arguments to pass to uvicorn.

Source code in taskiq_dashboard/interface/application.py
def __init__(
    self,
    api_token: str,
    storage_type: str = 'sqlite',
    database_dsn: str = 'sqlite+aiosqlite:///taskiq_dashboard.db',
    broker: AsyncBroker | None = None,
    scheduler: TaskiqScheduler | None = None,
    **uvicorn_kwargs: tp.Any,
) -> None:
    """Initialize Taskiq Dashboard application.

    Args:
        api_token: Access token for securing the dashboard API.
        storage_type: Type of the storage backend ('sqlite' or 'postgres').
            Any value other than 'sqlite' is treated as 'postgres'.
        database_dsn: URL for the database.
        broker: Optional Taskiq broker instance to integrate with the dashboard.
        scheduler: Optional Taskiq scheduler instance to integrate with the dashboard.
        uvicorn_kwargs: Additional keyword arguments to pass to uvicorn.
            They override the defaults defined below.
    """
    self.settings = get_settings()
    self.settings.api.token = SecretStr(api_token)

    # The dependency providers pick the backend by reading `settings.storage_type`,
    # so it has to be updated together with the backend-specific settings;
    # otherwise a requested 'postgres' storage would silently fall back to sqlite.
    if storage_type == 'sqlite':
        self.settings.storage_type = 'sqlite'
        self.settings.sqlite = SqliteSettings(dsn=database_dsn)  # type: ignore[call-arg]
    else:
        self.settings.storage_type = 'postgres'
        self.settings.postgres = PostgresSettings(dsn=database_dsn)  # type: ignore[call-arg]

    self.broker = broker
    self.scheduler = scheduler

    # Defaults for uvicorn; user-provided keyword arguments take precedence.
    self._uvicorn_kwargs: dict[str, tp.Any] = {
        'host': 'localhost',
        'port': 8000,
        'reload': False,
        'workers': 1,
        'lifespan': 'on',
        'proxy_headers': True,
        'forwarded_allow_ips': '*',
        'timeout_keep_alive': 60,
        'access_log': True,
    }
    self._uvicorn_kwargs.update(uvicorn_kwargs)

settings instance-attribute

settings = get_settings()

broker instance-attribute

broker = broker

scheduler instance-attribute

scheduler = scheduler

run

run()
Source code in taskiq_dashboard/interface/application.py
def run(self) -> None:
    """Build the FastAPI application and serve it with uvicorn.

    The broker and scheduler stored on this instance are attached to the
    application state before the server is started.
    """
    app = get_application()
    app.state.broker, app.state.scheduler = self.broker, self.scheduler
    uvicorn.run(app, **self._uvicorn_kwargs)  # type: ignore[arg-type]

api

application

lifespan async

lifespan(app)
Source code in taskiq_dashboard/api/application.py
@contextlib.asynccontextmanager
async def lifespan(app: fastapi.FastAPI) -> tp.AsyncGenerator[None, None]:
    """Run startup and shutdown steps around the application's lifetime.

    Startup: create the schema, mark in-progress and queued tasks as abandoned,
    start the broker and every schedule source (when configured).
    Shutdown: stop schedule sources, then the broker, then close the container.

    Args:
        app: Application whose ``state`` holds the dishka container, broker and scheduler.
    """
    state = app.state

    schema_service = await state.dishka_container.get(AbstractSchemaService)
    await schema_service.create_schema()

    # we probably missed events about these tasks during the downtime, so we need to mark them as abandoned
    task_repository = await state.dishka_container.get(AbstractTaskRepository)
    for stale_status in (TaskStatus.IN_PROGRESS, TaskStatus.QUEUED):
        await task_repository.batch_update(
            old_status=stale_status,
            new_status=TaskStatus.ABANDONED,
        )

    if state.broker is not None:
        await state.broker.startup()

    if state.scheduler is not None:
        for source in state.scheduler.sources:
            await source.startup()

    yield

    # Shutdown mirrors startup in reverse order.
    if state.scheduler is not None:
        for source in state.scheduler.sources:
            await source.shutdown()

    if state.broker is not None:
        await state.broker.shutdown()

    await state.dishka_container.close()

get_application

get_application()
Source code in taskiq_dashboard/api/application.py
def get_application() -> fastapi.FastAPI:
    """Create and configure the dashboard FastAPI application.

    Registers the routers, mounts static files under ``/static``, adds the
    access-token middleware and wires the dishka container into the app.

    Returns:
        The configured FastAPI application.
    """
    import pathlib  # noqa: PLC0415

    docs_path = '/docs'
    app = fastapi.FastAPI(
        title='Taskiq Dashboard',
        summary='Taskiq administration dashboard',
        docs_url=docs_path,
        lifespan=lifespan,
        exception_handlers={
            404: exception_handler__not_found,
        },
    )
    for router in (system_router, task_router, event_router, action_router, schedule_router):
        app.include_router(router=router)

    # Resolve the static directory next to this module instead of relying on the
    # current working directory, so the app also starts outside the repository root.
    static_directory = pathlib.Path(__file__).resolve().parent / 'static'
    app.mount('/static', StaticFiles(directory=static_directory), name='static')
    app.add_middleware(AccessTokenMiddleware)
    setup_dishka(container=dependencies.container, app=app)
    return app

middlewares

AccessTokenMiddleware

Bases: BaseHTTPMiddleware

dispatch async
dispatch(request, call_next)
Source code in taskiq_dashboard/api/middlewares.py
async def dispatch(self, request: Request, call_next: tp.Callable[[Request], tp.Awaitable[Response]]) -> Response:
    """Require a valid ``access-token`` header for every path under ``/api/``.

    Requests to other paths are passed through untouched. Rejected requests get
    a 401 JSON response of the form ``{"detail": ...}``.

    Args:
        request: Incoming request.
        call_next: Callable that forwards the request to the rest of the application.

    Returns:
        The downstream response, or a 401 response when the token is missing or wrong.
    """
    import json  # noqa: PLC0415
    import secrets  # noqa: PLC0415

    if not request.url.path.startswith('/api/'):
        return await call_next(request)

    def unauthorized(detail: str) -> Response:
        # An HTTPException raised from a middleware is not processed by the
        # application's exception handlers and ends up as a 500 error,
        # so the 401 response is built and returned explicitly.
        return Response(
            content=json.dumps({'detail': detail}),
            status_code=401,
            media_type='application/json',
        )

    token = request.headers.get('access-token')
    if not token:
        return unauthorized('Missing or invalid Authorization header')

    expected_token = get_settings().api.token.get_secret_value()
    # Constant-time comparison; bytes are used because compare_digest rejects non-ASCII str.
    if not secrets.compare_digest(token.encode(), expected_token.encode()):
        return unauthorized('Invalid access token')
    return await call_next(request)

routers

action_router module-attribute

action_router = APIRouter(
    prefix='/actions',
    tags=['Action'],
    route_class=DishkaRoute,
)

event_router module-attribute

event_router = APIRouter(
    prefix='/api/tasks',
    tags=['Event'],
    route_class=DishkaRoute,
)

schedule_router module-attribute

schedule_router = APIRouter(
    prefix='/schedules',
    tags=['Schedule'],
    route_class=DishkaRoute,
)

system_router module-attribute

system_router = APIRouter(
    tags=['System'], route_class=DishkaRoute
)

task_router module-attribute

task_router = APIRouter(
    prefix='', tags=['Tasks'], route_class=DishkaRoute
)

action

router module-attribute
router = APIRouter(
    prefix='/actions',
    tags=['Action'],
    route_class=DishkaRoute,
)
logger module-attribute
logger = getLogger(__name__)
handle_task_run async
handle_task_run(request, task_name)
Source code in taskiq_dashboard/api/routers/action.py
@router.post(
    '/run/{task_name}',
    name='Kick task',
)
async def handle_task_run(
    request: fastapi.Request,
    task_name: str,
) -> Response:
    """Kick the broker task registered under ``task_name`` with a fresh task ID.

    Responds with 400 when no broker is configured, 404 when the broker does not
    know the task, and 204 once the task has been sent.
    """
    log_extra = {'task_name': task_name}

    broker: AsyncBroker | None = request.app.state.broker
    if broker is None:
        logger.error('No broker configured to handle task kick', extra=log_extra)
        return Response(status_code=status.HTTP_400_BAD_REQUEST, content=b'No broker configured')

    task = broker.find_task(task_name)
    if not task:
        logger.error('Task not found in broker', extra=log_extra)
        return Response(status_code=status.HTTP_404_NOT_FOUND, content=b'Task not found')

    new_task_id = str(uuid.uuid4())
    kicker = task.kicker().with_task_id(new_task_id)
    await kicker.kiq()

    return Response(status_code=status.HTTP_204_NO_CONTENT)
handle_task_rerun async
handle_task_rerun(request, task_id, repository)
Source code in taskiq_dashboard/api/routers/action.py
@router.post(
    '/rerun/{task_id}',
    name='Rerun task',
)
async def handle_task_rerun(
    request: fastapi.Request,
    task_id: uuid.UUID,
    repository: dishka_fastapi.FromDishka[AbstractTaskRepository],
) -> Response:
    """Send a stored task to the broker again under a new task ID.

    The stored task's labels, args and kwargs are reused. Responds with 400 when
    no broker is configured and 404 when the task is missing from the repository
    or from the broker; otherwise renders the notification partial.
    """
    broker: AsyncBroker | None = request.app.state.broker
    if broker is None:
        logger.error('No broker configured to handle task kick', extra={'task_id': task_id})
        return Response(status_code=status.HTTP_400_BAD_REQUEST, content=b'No broker configured')

    existing_task = await repository.get_task_by_id(task_id)
    if existing_task is None:
        logger.error('Task not found in repository', extra={'task_id': str(task_id)})
        return Response(status_code=status.HTTP_404_NOT_FOUND, content=b'Task not found')

    task = broker.find_task(existing_task.name)
    if not task:
        logger.error('Task not found in broker', extra={'task_name': existing_task.name})
        return Response(status_code=status.HTTP_404_NOT_FOUND, content=b'Task not found')

    new_task_id = str(uuid.uuid4())
    kicker = task.kicker().with_task_id(new_task_id).with_labels(**existing_task.labels)
    await kicker.kiq(*existing_task.args, **existing_task.kwargs)

    return jinja_templates.TemplateResponse(
        'partial/notification.html',
        {
            'request': request,
            'message': (
                f"""
                Task rerun started with ID
                <a class="underline hover:ctp-text-lavander" href="/tasks/{new_task_id}">
                    {new_task_id}.
                </a>
                """
            ),
        },
        status_code=status.HTTP_200_OK,
    )
handle_task_delete async
handle_task_delete(task_id, repository)
Source code in taskiq_dashboard/api/routers/action.py
@router.get(
    '/delete/{task_id}',
    name='Delete task',
)
async def handle_task_delete(
    task_id: uuid.UUID,
    repository: dishka_fastapi.FromDishka[AbstractTaskRepository],
) -> Response:
    """Delete the task with the given ID and redirect to the root page."""
    await repository.delete_task(task_id)
    redirect_to_root = RedirectResponse(url='/', status_code=status.HTTP_307_TEMPORARY_REDIRECT)
    return redirect_to_root

event

router module-attribute
router = APIRouter(
    prefix='/api/tasks',
    tags=['Event'],
    route_class=DishkaRoute,
)
logger module-attribute
logger = getLogger(__name__)
handle_task_event async
handle_task_event(task_id, event, task_repository, body)

Handle task events from TaskiqAdminMiddleware.

This endpoint receives task events such as 'queued', 'started', and 'executed' from the TaskiqAdminMiddleware. It processes the event based on the task ID and event type.

Parameters:

  • task_id (UUID) –

    The unique identifier of the task.

  • event (Literal['queued', 'started', 'executed']) –

    The type of event (e.g., 'queued', 'started', 'executed').

Source code in taskiq_dashboard/api/routers/event.py
@router.post(
    '/{task_id}/{event}',
    name='Receive task event',
)
async def handle_task_event(
    task_id: uuid.UUID,
    event: tp.Annotated[tp.Literal['queued', 'started', 'executed'], fastapi.Path(title='Event type')],
    task_repository: dishka_fastapi.FromDishka[AbstractTaskRepository],
    body: tp.Annotated[dict[str, tp.Any], fastapi.Body(title='Event data')],
) -> Response:
    """
    Handle task events from TaskiqAdminMiddleware.

    A 'queued' event creates the task record; 'started' and 'executed' events
    update it. The request body is validated against the model matching the event.

    Args:
        task_id: The unique identifier of the task.
        event: The type of event ('queued', 'started' or 'executed').
        task_repository: Repository used to store the task state.
        body: Raw event payload.
    """
    log_extra = {'task_id': task_id}
    if event == 'queued':
        queued = QueuedTask.model_validate(body)
        await task_repository.create_task(task_id, queued)
        logger.info('Task queued event', extra=log_extra)
    elif event == 'started':
        started = StartedTask.model_validate(body)
        await task_repository.update_task(task_id, started)
        logger.info('Task started event', extra=log_extra)
    elif event == 'executed':
        executed = ExecutedTask.model_validate(body)
        await task_repository.update_task(task_id, executed)
        logger.info('Task executed event', extra=log_extra)
    return Response(status_code=status.HTTP_204_NO_CONTENT)

exception_handlers

exception_handler__not_found async
exception_handler__not_found(request, __)
Source code in taskiq_dashboard/api/routers/exception_handlers.py
async def exception_handler__not_found(
    request: fastapi.Request,
    __: fastapi.HTTPException,
) -> HTMLResponse:
    """Render the ``404.html`` page for a request that ended in a 404 error.

    Args:
        request: Request that triggered the error.
        __: The raised exception (unused).

    Returns:
        The rendered 404 page with HTTP status 404.
    """
    return jinja_templates.TemplateResponse(
        '404.html',
        {'request': request},
        # Without an explicit status the template response defaults to 200,
        # turning every "not found" into a successful response.
        status_code=404,
    )

schedule

router module-attribute
router = APIRouter(
    prefix='/schedules',
    tags=['Schedule'],
    route_class=DishkaRoute,
)
logger module-attribute
logger = getLogger(__name__)
ScheduleFilter

Bases: BaseModel

limit class-attribute instance-attribute
limit = 30
offset class-attribute instance-attribute
offset = 0
handle_schedule_list async
handle_schedule_list(request, query, hx_request=False)
Source code in taskiq_dashboard/api/routers/schedule.py
@router.get(
    '/',
    name='Schedule list view',
    response_class=HTMLResponse,
)
async def handle_schedule_list(
    request: fastapi.Request,
    query: tp.Annotated[ScheduleFilter, fastapi.Query(...)],
    hx_request: tp.Annotated[bool, fastapi.Header(description='Request from htmx')] = False,  # noqa: FBT002
) -> HTMLResponse:
    """Render the list of schedules collected from the scheduler's sources.

    Renders the 404 page when no scheduler is configured. For htmx requests only
    the list partial is rendered and an ``HX-Push-Url`` header is added.
    """
    scheduler: TaskiqScheduler | None = request.app.state.scheduler
    if not scheduler:
        return jinja_templates.TemplateResponse(
            name='404.html',
            context={
                'request': request,
                'message': 'Scheduler not configured.',
            },
            status_code=status.HTTP_404_NOT_FOUND,
        )

    collected: list[dict[str, tp.Any]] = []
    wanted_count = query.offset + query.limit
    for source in sorted(scheduler.sources, key=id):
        fetched = await source.get_schedules()
        rows = sorted(
            (schedule.model_dump() for schedule in fetched),
            key=lambda row: row['schedule_id'],
        )
        for row in rows:
            row['source'] = source.__class__.__name__
            row['source_id'] = id(source)
        collected += rows
        # Stop querying further sources once the requested page is covered.
        if len(collected) >= wanted_count:
            break

    headers: dict[str, str] = {}
    template_name = 'partial/schedule_list.html' if hx_request else 'schedule_page.html'
    if hx_request:
        push_url = '/schedules/?' + urlencode(query.model_dump(exclude={'limit', 'offset'}))
        headers = {'HX-Push-Url': push_url}

    # NOTE(review): the slice has no upper bound, so more than `limit` rows may
    # reach the template - confirm whether the template relies on that.
    return jinja_templates.TemplateResponse(
        name=template_name,
        context={
            'request': request,
            'schedules': collected[query.offset :],
            'limit': query.limit,
            'offset': query.offset,
        },
        headers=headers,
        status_code=status.HTTP_200_OK,
    )
handle_schedule_details async
handle_schedule_details(request, schedule_id)
Source code in taskiq_dashboard/api/routers/schedule.py
@router.get(
    '/{schedule_id}',
    name='Schedule details view',
    response_class=HTMLResponse,
)
async def handle_schedule_details(
    request: fastapi.Request,
    schedule_id: str,
) -> HTMLResponse:
    """Render the details page of the first schedule whose ID matches ``schedule_id``.

    Renders the 404 page when no scheduler is configured or no source has the schedule.
    """

    def not_found(message: str) -> HTMLResponse:
        return jinja_templates.TemplateResponse(
            name='404.html',
            context={
                'request': request,
                'message': message,
            },
            status_code=status.HTTP_404_NOT_FOUND,
        )

    scheduler: TaskiqScheduler | None = request.app.state.scheduler
    if not scheduler:
        return not_found('Scheduler not configured.')

    wanted_id = str(schedule_id)
    for source in scheduler.sources:
        for candidate in await source.get_schedules():
            if candidate.schedule_id != wanted_id:
                continue
            details = candidate.model_dump()
            details['source'] = source.__class__.__name__
            details['source_id'] = id(source)
            return jinja_templates.TemplateResponse(
                name='schedule_details.html',
                context={
                    'request': request,
                    'schedule': details,
                },
                status_code=status.HTTP_200_OK,
            )

    return not_found('Schedule not found.')

system

router module-attribute
router = APIRouter(tags=['System'], route_class=DishkaRoute)
HealthCheckResponse

Bases: BaseModel

status instance-attribute
status
app_name instance-attribute
app_name
get_liveness async
get_liveness()
Source code in taskiq_dashboard/api/routers/system.py
@router.get('/liveness', name='liveness', summary='Проверка работоспособности сервиса')
async def get_liveness() -> HealthCheckResponse:
    """Report that the service process is alive."""
    response = HealthCheckResponse(status='alive', app_name='taskiq dashboard')
    return response
get_readiness async
get_readiness()
Source code in taskiq_dashboard/api/routers/system.py
@router.get('/readiness', name='readiness', summary='Проверка готовности обслуживать входящие запросы')
async def get_readiness() -> HealthCheckResponse:
    """Report that the service is ready; no dependency is actually checked."""
    # TODO: maybe add "select 1" to database
    response = HealthCheckResponse(status='ready', app_name='taskiq dashboard')
    return response

task

router module-attribute
router = APIRouter(
    prefix='', tags=['Tasks'], route_class=DishkaRoute
)
TaskFilter

Bases: BaseModel

q class-attribute instance-attribute
q = ''
status class-attribute instance-attribute
status = None
limit class-attribute instance-attribute
limit = 30
offset class-attribute instance-attribute
offset = 0
sort_by class-attribute instance-attribute
sort_by = 'started_at'
sort_order class-attribute instance-attribute
sort_order = 'desc'
model_config class-attribute instance-attribute
model_config = ConfigDict(extra='ignore')
validate_status classmethod
validate_status(value)
Source code in taskiq_dashboard/api/routers/task.py
@pydantic.field_validator('status', mode='before')
@classmethod
def validate_status(
    cls,
    value: TaskStatus | str | None,
) -> TaskStatus | None:
    """Turn the literal string ``'null'`` into ``None``; pass other values through."""
    is_null_marker = isinstance(value, str) and value == 'null'
    return None if is_null_marker else value  # type: ignore[return-value]
serialize_status
serialize_status(value)
Source code in taskiq_dashboard/api/routers/task.py
@pydantic.field_serializer('status', mode='plain')
def serialize_status(
    self,
    value: TaskStatus | None,
) -> str | None:
    """Serialize the status as the string of its value, or ``'null'`` when unset."""
    return 'null' if value is None else str(value.value)
search_tasks async
search_tasks(request, repository, query, hx_request=False)
Source code in taskiq_dashboard/api/routers/task.py
@router.get(
    '/',
    name='Task list view',
    response_class=HTMLResponse,
)
async def search_tasks(
    request: fastapi.Request,
    repository: dishka_fastapi.FromDishka[AbstractTaskRepository],
    query: tp.Annotated[TaskFilter, fastapi.Query(...)],
    hx_request: tp.Annotated[bool, fastapi.Header(description='Request from htmx')] = False,  # noqa: FBT002
) -> HTMLResponse:
    """Render the task list filtered, sorted and paginated by ``query``.

    For htmx requests only the list partial is rendered and an ``HX-Push-Url``
    header carrying the filter (without ``limit`` and ``offset``) is added.
    """
    found_tasks = await repository.find_tasks(
        name=query.q,
        status=query.status,
        limit=query.limit,
        offset=query.offset,
        sort_by=query.sort_by,
        sort_order=query.sort_order,
    )

    headers: dict[str, str] = {}
    if hx_request:
        push_url = '/?' + urlencode(query.model_dump(exclude={'limit', 'offset'}))
        headers = {'HX-Push-Url': push_url}
    template_name = 'partial/task_list.html' if hx_request else 'home.html'

    context = {
        'request': request,
        'results': [task.model_dump() for task in found_tasks],
        **query.model_dump(),
    }
    return jinja_templates.TemplateResponse(template_name, context, headers=headers)
task_details async
task_details(request, repository, task_id)

Display detailed information for a specific task.

Source code in taskiq_dashboard/api/routers/task.py
@router.get(
    '/tasks/{task_id:uuid}',
    name='Task details view',
    response_class=HTMLResponse,
)
async def task_details(
    request: fastapi.Request,
    repository: dishka_fastapi.FromDishka[AbstractTaskRepository],
    task_id: uuid.UUID,
) -> HTMLResponse:
    """
    Render the details page of a single task, or the 404 page when it does not exist.
    """
    found_task = await repository.get_task_by_id(task_id)
    if found_task is None:
        not_found_context = {
            'request': request,
            'message': f'Task with ID {task_id} not found',
        }
        return jinja_templates.TemplateResponse(
            name='404.html',
            context=not_found_context,
            status_code=404,
        )

    # The template receives both the task object and its JSON dump.
    serialized_task = json.dumps(found_task.model_dump(mode='json'))
    broker_configured = request.app.state.broker is not None

    return jinja_templates.TemplateResponse(
        name='task_details.html',
        context={
            'request': request,
            'task': found_task,
            'task_json': serialized_task,
            'enable_actions': broker_configured,
            'enable_additional_actions': False,  # Placeholder for future features like retries with different args
        },
    )

templates

jinja_templates module-attribute

jinja_templates = Jinja2Templates(
    directory='taskiq_dashboard/api/templates'
)

dependencies

container module-attribute

container = make_async_container(TaskiqDashboardProvider())

TaskiqDashboardProvider

TaskiqDashboardProvider(scope=APP)

Bases: Provider

Source code in taskiq_dashboard/dependencies.py
def __init__(self, scope: Scope = Scope.APP) -> None:
    """Create the provider, forwarding ``scope`` (``Scope.APP`` by default) to the base class."""
    super().__init__(scope=scope)

provide_settings

provide_settings()
Source code in taskiq_dashboard/dependencies.py
@provide
def provide_settings(self) -> Settings:
    """Provide the settings object returned by ``get_settings()``."""
    return get_settings()

provide_session_provider async

provide_session_provider(settings)
Source code in taskiq_dashboard/dependencies.py
@provide
async def provide_session_provider(
    self,
    settings: Settings,
) -> tp.AsyncGenerator[AsyncPostgresSessionProvider, tp.Any]:
    """Yield a session provider for the configured storage and close it afterwards.

    Postgres settings are used when ``settings.storage_type`` is ``'postgres'``,
    sqlite settings otherwise.
    """
    if settings.storage_type == 'postgres':
        provider = AsyncPostgresSessionProvider(connection_settings=settings.postgres)
    else:
        provider = AsyncPostgresSessionProvider(connection_settings=settings.sqlite)
    yield provider
    await provider.close()

provide_task_service

provide_task_service(settings, session_provider)
Source code in taskiq_dashboard/dependencies.py
@provide
def provide_task_service(
    self,
    settings: Settings,
    session_provider: AsyncPostgresSessionProvider,
) -> AbstractTaskRepository:
    """Provide a task repository bound to the table model of the configured storage."""
    if settings.storage_type == 'postgres':
        return TaskRepository(session_provider=session_provider, task_model=PostgresTask)
    return TaskRepository(session_provider=session_provider, task_model=SqliteTask)

provide_schema_service

provide_schema_service(settings, session_provider)
Source code in taskiq_dashboard/dependencies.py
@provide
def provide_schema_service(
    self,
    settings: Settings,
    session_provider: AsyncPostgresSessionProvider,
) -> AbstractSchemaService:
    """Provide a schema service using the table name of the configured storage."""
    uses_postgres = settings.storage_type == 'postgres'
    table_name = 'taskiq_dashboard__tasks' if uses_postgres else 'tasks'
    return SchemaService(session_provider=session_provider, table_name=table_name)

infrastructure

APISettings

Bases: BaseSettings

host class-attribute instance-attribute

host = '0.0.0.0'

port class-attribute instance-attribute

port = 8000

token class-attribute instance-attribute

token = SecretStr('supersecret')

model_config class-attribute instance-attribute

model_config = SettingsConfigDict(extra='allow')

PostgresSettings

Bases: BaseSettings

Настройки для подключения к PostgreSQL.

driver class-attribute instance-attribute

driver = 'postgresql+asyncpg'

host class-attribute instance-attribute

host = 'localhost'

port class-attribute instance-attribute

port = 5432

user class-attribute instance-attribute

user = 'taskiq-dashboard'

password class-attribute instance-attribute

password = SecretStr('look_in_vault')

database class-attribute instance-attribute

database = 'taskiq-dashboard'

min_pool_size class-attribute instance-attribute

min_pool_size = 1

max_pool_size class-attribute instance-attribute

max_pool_size = 5

dsn property

dsn

Возвращает строку подключения к PostgreSQL составленную из параметров класса.

Пример использования с asyncpg:

>>> import asyncpg
>>> async def create_pool(settings: PostgresSettings) -> asyncpg.pool.Pool:
...     return await asyncpg.create_pool(
...         dsn=settings.dsn.get_secret_value(),
...         min_size=settings.min_pool_size,
...         max_size=settings.max_pool_size,
...         statement_cache_size=0,
...     )

Пример использования с SQLAlchemy:

>>> import sqlalchemy.ext.asyncio
>>> async def create_engine(settings: PostgresSettings) -> sqlalchemy.ext.asyncio.AsyncEngine:
...     return sqlalchemy.ext.asyncio.create_async_engine(
...         settings.dsn.get_secret_value()
...     )

model_config class-attribute instance-attribute

model_config = SettingsConfigDict(extra='ignore')

Settings

Bases: BaseSettings

api class-attribute instance-attribute

api = APISettings()

storage_type class-attribute instance-attribute

storage_type = 'sqlite'

postgres class-attribute instance-attribute

postgres = PostgresSettings()

sqlite class-attribute instance-attribute

sqlite = SqliteSettings()

model_config class-attribute instance-attribute

model_config = SettingsConfigDict(
    env_nested_delimiter='__',
    env_prefix='TASKIQ_DASHBOARD__',
    env_file=('conf/.env', getenv('ENV_FILE', '.env')),
    env_file_encoding='utf-8',
)

SqliteSettings

Bases: BaseSettings

driver class-attribute instance-attribute

driver = 'sqlite+aiosqlite'

file_path class-attribute instance-attribute

file_path = 'taskiq_dashboard.db'

dsn property

dsn

model_config class-attribute instance-attribute

model_config = SettingsConfigDict(extra='ignore')

get_settings cached

get_settings()
Source code in taskiq_dashboard/infrastructure/settings.py
@cache
def get_settings() -> Settings:
    """Return the ``Settings`` instance; ``@cache`` makes every call return the same object."""
    return Settings()

database

schemas

sa_metadata module-attribute
sa_metadata = MetaData(
    naming_convention={
        'ix': 'ix_%(column_0_label)s',
        'uq': 'uq_%(table_name)s_%(column_0_name)s',
        'ck': 'ck_%(table_name)s_%(constraint_name)s',
        'fk': 'fk_%(table_name)s_%(column_0_name)s_%(referred_table_name)s',
        'pk': 'pk_%(table_name)s',
    }
)
BaseTableSchema
PostgresTask

Bases: BaseTableSchema

id class-attribute instance-attribute
id = mapped_column(
    UUID(as_uuid=True), primary_key=True, default=uuid4
)
name class-attribute instance-attribute
name = mapped_column(TEXT, nullable=False)
status class-attribute instance-attribute
status = mapped_column(Integer, nullable=False)
worker class-attribute instance-attribute
worker = mapped_column(TEXT, nullable=False)
args class-attribute instance-attribute
args = mapped_column(JSONB, nullable=False, default='[]')
kwargs class-attribute instance-attribute
kwargs = mapped_column(JSONB, nullable=False, default='{}')
labels class-attribute instance-attribute
labels = mapped_column(JSONB, nullable=False, default='{}')
result class-attribute instance-attribute
result = mapped_column(JSONB, nullable=True, default=None)
error class-attribute instance-attribute
error = mapped_column(TEXT, nullable=True, default=None)
queued_at class-attribute instance-attribute
queued_at = mapped_column(
    DateTime(timezone=True), nullable=False, default=now
)
started_at class-attribute instance-attribute
started_at = mapped_column(
    DateTime(timezone=True), nullable=True
)
finished_at class-attribute instance-attribute
finished_at = mapped_column(
    DateTime(timezone=True), nullable=True
)
SqliteTask

Bases: BaseTableSchema

id class-attribute instance-attribute
id = mapped_column(
    Uuid(as_uuid=True), primary_key=True, default=uuid4
)
name class-attribute instance-attribute
name = mapped_column(TEXT, nullable=False)
status class-attribute instance-attribute
status = mapped_column(INTEGER, nullable=False)
worker class-attribute instance-attribute
worker = mapped_column(TEXT, nullable=False)
args class-attribute instance-attribute
args = mapped_column(JSON, nullable=False, default='[]')
kwargs class-attribute instance-attribute
kwargs = mapped_column(JSON, nullable=False, default='{}')
labels class-attribute instance-attribute
labels = mapped_column(JSON, nullable=False, default='{}')
result class-attribute instance-attribute
result = mapped_column(JSON, nullable=True, default=None)
error class-attribute instance-attribute
error = mapped_column(TEXT, nullable=True, default=None)
queued_at class-attribute instance-attribute
queued_at = mapped_column(
    DateTime(timezone=True), nullable=False, default=now
)
started_at class-attribute instance-attribute
started_at = mapped_column(
    DateTime(timezone=True), nullable=True
)
finished_at class-attribute instance-attribute
finished_at = mapped_column(
    DateTime(timezone=True), nullable=True
)

session_provider

AsyncPostgresSessionProvider
AsyncPostgresSessionProvider(connection_settings)
Source code in taskiq_dashboard/infrastructure/database/session_provider.py
def __init__(
    self,
    connection_settings: PostgresSettings | SqliteSettings,
) -> None:
    """Create the async engine and session factory for the given connection settings.

    Pool and asyncpg connection options are applied only for ``PostgresSettings``.

    Args:
        connection_settings: Settings providing the DSN (and pool sizes for Postgres).
    """
    engine_parameters: dict[str, tp.Any] = {'echo': False}

    if isinstance(connection_settings, PostgresSettings):
        min_pool_size = connection_settings.min_pool_size
        engine_parameters['pool_size'] = min_pool_size
        engine_parameters['max_overflow'] = connection_settings.max_pool_size - min_pool_size
        engine_parameters['execution_options'] = {'prepare': False}
        # for connection through pgbouncer
        engine_parameters['connect_args'] = {
            'statement_cache_size': 0,
            'prepared_statement_cache_size': 0,
            'prepared_statement_name_func': lambda: f'__asyncpg_{uuid.uuid4()}__',
        }

    dsn = connection_settings.dsn.get_secret_value()
    self._engine = sa_async.create_async_engine(dsn, **engine_parameters)
    self._session_factory = sa_async.async_sessionmaker(
        bind=self._engine,
        expire_on_commit=False,
        class_=sa_async.AsyncSession,
    )
session async
session()

Create and manage a new AsyncSession.

Usage

async with session_provider.session() as session:
    # use session for database operations
    result = await session.execute(...)

Source code in taskiq_dashboard/infrastructure/database/session_provider.py
@asynccontextmanager
async def session(self) -> tp.AsyncGenerator[sa_async.AsyncSession, None]:
    """
    Create and manage a new AsyncSession.

    The session is committed when the ``async with`` body finishes normally,
    rolled back (and the exception re-raised) when the body raises an
    ``Exception``, and closed in every case.

    Usage:
        async with session_provider.session() as session:
            # use session for database operations
            result = await session.execute(...)
    """
    session = self._session_factory()
    try:
        yield session
        # Reached only when the caller's block completed without raising.
        await session.commit()
    except Exception:
        # Undo pending changes, then let the original error propagate.
        await session.rollback()
        raise
    finally:
        await session.close()
close async
close()

Close the engine and release all connections.

Source code in taskiq_dashboard/infrastructure/database/session_provider.py
async def close(self) -> None:
    """Close the engine and release all connections by disposing the underlying engine."""
    await self._engine.dispose()

services

schema_service

SchemaService
SchemaService(
    session_provider, table_name='taskiq_dashboard__tasks'
)

Bases: AbstractSchemaService

Source code in taskiq_dashboard/infrastructure/services/schema_service.py
def __init__(
    self,
    session_provider: AsyncPostgresSessionProvider,
    table_name: str = 'taskiq_dashboard__tasks',
) -> None:
    """Store the session provider and the name of the tasks table to create.

    Args:
        session_provider: Provider of database sessions used to run the DDL.
        table_name: Name of the tasks table; it is interpolated into the
            CREATE TABLE statement, so it must come from trusted configuration.
    """
    self._session_provider = session_provider
    self._table_name = table_name
create_schema async
create_schema()
Source code in taskiq_dashboard/infrastructure/services/schema_service.py
async def create_schema(self) -> None:
    """Create the tasks table if it does not already exist.

    The table name is interpolated directly into the DDL text (both as the
    table name and as part of the primary-key constraint name), and the
    statement is executed inside a session from the session provider.
    """
    ddl = f"""
    CREATE TABLE IF NOT EXISTS {self._table_name} (
        id UUID NOT NULL,
        name TEXT NOT NULL,
        status INTEGER NOT NULL,
        worker TEXT NOT NULL,
        args JSONB NOT NULL DEFAULT '[]',
        kwargs JSONB NOT NULL DEFAULT '{{}}',
        labels JSONB NOT NULL DEFAULT '{{}}',
        result JSONB DEFAULT NULL,
        error TEXT DEFAULT NULL,
        queued_at TIMESTAMP WITH TIME ZONE,
        started_at TIMESTAMP WITH TIME ZONE,
        finished_at TIMESTAMP WITH TIME ZONE,
        CONSTRAINT pk_{self._table_name} PRIMARY KEY (id)
    );
    """
    async with self._session_provider.session() as db_session:
        statement = sa.text(ddl)
        await db_session.execute(statement)

task_service

TaskRepository
TaskRepository(session_provider, task_model)

Bases: AbstractTaskRepository

Source code in taskiq_dashboard/infrastructure/services/task_service.py
def __init__(
    self,
    session_provider: AsyncPostgresSessionProvider,
    task_model: type[PostgresTask] | type[SqliteTask],
) -> None:
    """Store the session provider and the ORM model used for task queries.

    Args:
        session_provider: Provider used to open database sessions.
        task_model: Task model class that queries are built against.
    """
    self._session_provider, self.task = session_provider, task_model
task instance-attribute
task = task_model
find_tasks async
find_tasks(
    name=None,
    status=None,
    sort_by=None,
    sort_order='desc',
    limit=30,
    offset=0,
)
Source code in taskiq_dashboard/infrastructure/services/task_service.py
async def find_tasks(  # noqa: PLR0913
    self,
    name: str | None = None,
    status: TaskStatus | None = None,
    sort_by: tp.Literal['started_at', 'finished_at'] | None = None,
    sort_order: tp.Literal['asc', 'desc'] = 'desc',
    limit: int = 30,
    offset: int = 0,
) -> list[Task]:
    """Return a page of tasks, optionally filtered and sorted.

    Args:
        name: Case-insensitive substring to match against the task name.
            Ignored when empty or only one character long; surrounding
            whitespace is stripped before matching.
        status: When given, only tasks with this status are returned.
        sort_by: Column to order by; no ordering is applied when omitted.
        sort_order: ``'asc'`` for ascending; any other value sorts descending.
        limit: Maximum number of rows to return.
        offset: Number of rows to skip.

    Returns:
        The matching rows validated into ``Task`` objects.

    Raises:
        ValueError: If ``sort_by`` is a non-empty value other than
            ``'started_at'`` or ``'finished_at'``.
    """
    # Resolve the sort column up front so an invalid value fails before any query is built.
    sort_column = None
    if sort_by:
        if sort_by == 'finished_at':
            sort_column = self.task.finished_at
        elif sort_by == 'started_at':
            sort_column = self.task.started_at
        else:
            # Exceptions do not interpolate %-style arguments; format the message explicitly.
            message = f'Unsupported sort_by value: {sort_by}'
            raise ValueError(message)

    query = sa.select(self.task)
    if name and len(name) > 1:
        search_pattern = f'%{name.strip()}%'
        query = query.where(self.task.name.ilike(search_pattern))
    if status is not None:
        query = query.where(self.task.status == status.value)
    if sort_column is not None:
        query = query.order_by(sort_column.asc()) if sort_order == 'asc' else query.order_by(sort_column.desc())
    query = query.limit(limit).offset(offset)
    async with self._session_provider.session() as session:
        result = await session.execute(query)
        task_schemas = result.scalars().all()
    return [Task.model_validate(task) for task in task_schemas]
get_task_by_id async
get_task_by_id(task_id)
Source code in taskiq_dashboard/infrastructure/services/task_service.py
async def get_task_by_id(self, task_id: uuid.UUID) -> Task | None:
    """Look up a single task by its identifier.

    Args:
        task_id: Identifier of the task to fetch.

    Returns:
        The row validated into a ``Task``, or ``None`` when no row matches.
    """
    statement = sa.select(self.task).where(self.task.id == task_id)
    async with self._session_provider.session() as db_session:
        rows = await db_session.execute(statement)
        record = rows.scalar_one_or_none()

    return Task.model_validate(record) if record else None
create_task async
create_task(task_id, task_arguments)
Source code in taskiq_dashboard/infrastructure/services/task_service.py
async def create_task(
    self,
    task_id: uuid.UUID,
    task_arguments: QueuedTask,
) -> None:
    """Insert a new task row with the ``QUEUED`` status.

    Args:
        task_id: Identifier to store for the new task.
        task_arguments: Queued-task payload supplying the name, worker,
            args, kwargs, labels and queue timestamp.
    """
    row = {
        'id': task_id,
        'name': task_arguments.task_name,
        'status': TaskStatus.QUEUED.value,
        'worker': task_arguments.worker,
        'args': task_arguments.args,
        'kwargs': task_arguments.kwargs,
        'labels': task_arguments.labels,
        'queued_at': task_arguments.queued_at,
    }
    statement = sa.insert(self.task).values(**row)
    async with self._session_provider.session() as db_session:
        await db_session.execute(statement)
update_task async
update_task(task_id, task_arguments)
Source code in taskiq_dashboard/infrastructure/services/task_service.py
async def update_task(
    self,
    task_id: uuid.UUID,
    task_arguments: StartedTask | ExecutedTask,
) -> None:
    """Update a task row from a "started" or "executed" event.

    A ``StartedTask`` marks the row ``IN_PROGRESS`` and refreshes its start
    time, args, kwargs, labels, name and worker. Any other payload marks the
    row ``FAILURE`` when it carries an error and ``COMPLETED`` otherwise,
    and stores the finish time, result and error.

    Args:
        task_id: Identifier of the task to update.
        task_arguments: Event payload describing the new task state.
    """
    statement = sa.update(self.task).where(self.task.id == task_id)

    if isinstance(task_arguments, StartedTask):
        changes = {
            'status': TaskStatus.IN_PROGRESS.value,
            'started_at': task_arguments.started_at,
            'args': task_arguments.args,
            'kwargs': task_arguments.kwargs,
            'labels': task_arguments.labels,
            'name': task_arguments.task_name,
            'worker': task_arguments.worker,
        }
    else:
        outcome = TaskStatus.COMPLETED if task_arguments.error is None else TaskStatus.FAILURE
        changes = {
            'status': outcome.value,
            'finished_at': task_arguments.finished_at,
            # Only the 'return_value' entry of the payload's return_value mapping is stored.
            'result': task_arguments.return_value.get('return_value'),
            'error': task_arguments.error,
        }
    statement = statement.values(**changes)
    async with self._session_provider.session() as db_session:
        await db_session.execute(statement)
batch_update async
batch_update(old_status, new_status)
Source code in taskiq_dashboard/infrastructure/services/task_service.py
async def batch_update(
    self,
    old_status: TaskStatus,
    new_status: TaskStatus,
) -> None:
    """Move every task that currently has ``old_status`` to ``new_status``.

    Args:
        old_status: Status that selects the rows to update.
        new_status: Status written to the selected rows.
    """
    statement = sa.update(self.task)
    statement = statement.where(self.task.status == old_status.value)
    statement = statement.values(status=new_status.value)
    async with self._session_provider.session() as db_session:
        await db_session.execute(statement)
delete_task async
delete_task(task_id)
Source code in taskiq_dashboard/infrastructure/services/task_service.py
async def delete_task(
    self,
    task_id: uuid.UUID,
) -> None:
    """Delete the task row with the given identifier.

    Args:
        task_id: Identifier of the task to remove.
    """
    matches_id = self.task.id == task_id
    statement = sa.delete(self.task).where(matches_id)
    async with self._session_provider.session() as db_session:
        await db_session.execute(statement)

settings

PostgresSettings

Bases: BaseSettings

Настройки для подключения к PostgreSQL.

driver class-attribute instance-attribute
driver = 'postgresql+asyncpg'
host class-attribute instance-attribute
host = 'localhost'
port class-attribute instance-attribute
port = 5432
user class-attribute instance-attribute
user = 'taskiq-dashboard'
password class-attribute instance-attribute
password = SecretStr('look_in_vault')
database class-attribute instance-attribute
database = 'taskiq-dashboard'
min_pool_size class-attribute instance-attribute
min_pool_size = 1
max_pool_size class-attribute instance-attribute
max_pool_size = 5
dsn property
dsn

Возвращает строку подключения к PostgreSQL составленную из параметров класса.

Пример использования с asyncpg:

>>> import asyncpg
>>> async def create_pool(settings: Settings) -> asyncpg.pool.Pool:
>>>     return await asyncpg.create_pool(
>>>            dsn=settings.postgres.dsn.get_secret_value(),
>>>            min_size=settings.postgres.min_pool_size,
>>>            max_size=settings.postgres.max_pool_size,
>>>            statement_cache_size=settings.postgres.statement_cache_size,
>>>     )

Пример использования с SQLAlchemy:

>>> import sqlalchemy
>>> async def create_pool(settings: Settings) -> sqlalchemy.ext.asyncio.AsyncEngine:
>>>     return sqlalchemy.ext.asyncio.create_async_engine(
>>>         settings.postgres.dsn.get_secret_value()
>>>     )
model_config class-attribute instance-attribute
model_config = SettingsConfigDict(extra='ignore')

SqliteSettings

Bases: BaseSettings

driver class-attribute instance-attribute
driver = 'sqlite+aiosqlite'
file_path class-attribute instance-attribute
file_path = 'taskiq_dashboard.db'
dsn property
dsn
model_config class-attribute instance-attribute
model_config = SettingsConfigDict(extra='ignore')

APISettings

Bases: BaseSettings

host class-attribute instance-attribute
host = '0.0.0.0'
port class-attribute instance-attribute
port = 8000
token class-attribute instance-attribute
token = SecretStr('supersecret')
model_config class-attribute instance-attribute
model_config = SettingsConfigDict(extra='allow')

Settings

Bases: BaseSettings

api class-attribute instance-attribute
api = APISettings()
storage_type class-attribute instance-attribute
storage_type = 'sqlite'
postgres class-attribute instance-attribute
postgres = PostgresSettings()
sqlite class-attribute instance-attribute
sqlite = SqliteSettings()
model_config class-attribute instance-attribute
model_config = SettingsConfigDict(
    env_nested_delimiter='__',
    env_prefix='TASKIQ_DASHBOARD__',
    env_file=('conf/.env', getenv('ENV_FILE', '.env')),
    env_file_encoding='utf-8',
)

get_settings cached

get_settings()
Source code in taskiq_dashboard/infrastructure/settings.py
@cache
def get_settings() -> Settings:
    """Build the application settings once and reuse the cached instance afterwards."""
    settings = Settings()
    return settings

interface

application

TaskiqDashboard

TaskiqDashboard(
    api_token,
    storage_type='sqlite',
    database_dsn='sqlite+aiosqlite:///taskiq_dashboard.db',
    broker=None,
    scheduler=None,
    **uvicorn_kwargs,
)

Initialize Taskiq Dashboard application.

Parameters:

  • api_token (str) –

    Access token for securing the dashboard API.

  • storage_type (str, default: 'sqlite' ) –

    Type of the storage backend ('sqlite' or 'postgres').

  • database_dsn (str, default: 'sqlite+aiosqlite:///taskiq_dashboard.db' ) –

    URL for the database.

  • broker (AsyncBroker | None, default: None ) –

    Optional Taskiq broker instance to integrate with the dashboard.

  • scheduler (TaskiqScheduler | None, default: None ) –

    Optional Taskiq scheduler instance to integrate with the dashboard.

  • uvicorn_kwargs (Any, default: {} ) –

    Additional keyword arguments to pass to uvicorn.

Source code in taskiq_dashboard/interface/application.py
def __init__(
    self,
    api_token: str,
    storage_type: str = 'sqlite',
    database_dsn: str = 'sqlite+aiosqlite:///taskiq_dashboard.db',
    broker: AsyncBroker | None = None,
    scheduler: TaskiqScheduler | None = None,
    **uvicorn_kwargs: tp.Any,
) -> None:
    """Configure the dashboard settings and collect the uvicorn options.

    The settings object returned by ``get_settings()`` is modified in place:
    the API token is replaced, and either the SQLite or the Postgres settings
    section is rebuilt with ``database_dsn``.

    Args:
        api_token: Access token for securing the dashboard API.
        storage_type: Storage backend; ``'sqlite'`` selects SQLite and any
            other value is handled as Postgres.
        database_dsn: URL for the database.
        broker: Optional Taskiq broker instance to integrate with the dashboard.
        scheduler: Optional Taskiq scheduler instance to integrate with the dashboard.
        uvicorn_kwargs: Extra keyword arguments for uvicorn; they override
            the defaults assembled here.
    """
    settings = get_settings()
    settings.api.token = SecretStr(api_token)
    self.settings = settings

    if storage_type != 'sqlite':
        settings.postgres = PostgresSettings(dsn=database_dsn)  # type: ignore[call-arg]
    else:
        settings.sqlite = SqliteSettings(dsn=database_dsn)  # type: ignore[call-arg]

    self.broker, self.scheduler = broker, scheduler

    uvicorn_defaults = {
        'host': 'localhost',
        'port': 8000,
        'reload': False,
        'workers': 1,
        'lifespan': 'on',
        'proxy_headers': True,
        'forwarded_allow_ips': '*',
        'timeout_keep_alive': 60,
        'access_log': True,
    }
    # Caller-supplied options take precedence over the defaults.
    self._uvicorn_kwargs = {**uvicorn_defaults, **uvicorn_kwargs}
settings instance-attribute
settings = get_settings()
broker instance-attribute
broker = broker
scheduler instance-attribute
scheduler = scheduler
run
run()
Source code in taskiq_dashboard/interface/application.py
def run(self) -> None:
    """Start the dashboard web application under uvicorn.

    The configured broker and scheduler are attached to the application
    state before uvicorn is launched with the stored keyword arguments.
    """
    app = get_application()
    state = app.state
    state.broker = self.broker
    state.scheduler = self.scheduler
    uvicorn.run(
        app,
        **self._uvicorn_kwargs,  # type: ignore[arg-type]
    )