taskiq_pg

aiopg

AiopgResultBackend

AiopgResultBackend(
    dsn="postgres://postgres:postgres@localhost:5432/postgres",
    keep_results=True,
    table_name="taskiq_results",
    field_for_task_id="VarChar",
    serializer=None,
    **connect_kwargs,
)

Bases: BasePostgresResultBackend

Result backend for TaskIQ based on Aiopg.

Construct new result backend.

Parameters:

  • dsn (Callable[[], str] | str | None, default: 'postgres://postgres:postgres@localhost:5432/postgres' ) –

    connection string to PostgreSQL, or callable returning one.

  • keep_results (bool, default: True ) –

    flag to not remove results from the database after reading.

  • table_name (str, default: 'taskiq_results' ) –

    name of the table to store results.

  • field_for_task_id (Literal['VarChar', 'Text', 'Uuid'], default: 'VarChar' ) –

    type of the field to store task_id.

  • serializer (TaskiqSerializer | None, default: None ) –

    serializer class to serialize/deserialize result from task.

  • connect_kwargs (Any, default: {} ) –

    additional arguments for creating connection pool.

Source code in src/taskiq_pg/_internal/result_backend.py
def __init__(
    self,
    dsn: tp.Callable[[], str] | str | None = "postgres://postgres:postgres@localhost:5432/postgres",
    keep_results: bool = True,
    table_name: str = "taskiq_results",
    field_for_task_id: tp.Literal["VarChar", "Text", "Uuid"] = "VarChar",
    serializer: TaskiqSerializer | None = None,
    **connect_kwargs: tp.Any,
) -> None:
    """
    Construct new result backend.

    Args:
        dsn: connection string to PostgreSQL, or callable returning one.
        keep_results: flag to not remove results from the database after reading.
        table_name: name of the table to store results.
        field_for_task_id: type of the field to store task_id.
        serializer: serializer class to serialize/deserialize result from task.
        connect_kwargs: additional arguments for creating connection pool.

    """
    self._dsn: tp.Final = dsn
    self.keep_results: tp.Final = keep_results
    self.table_name: tp.Final = table_name
    self.field_for_task_id: tp.Final = field_for_task_id
    self.connect_kwargs: tp.Final = connect_kwargs
    self.serializer = serializer or PickleSerializer()
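
A minimal construction sketch; the import path follows the source layout shown on this page and the DSN is a placeholder:

from taskiq_pg.aiopg import AiopgResultBackend

result_backend = AiopgResultBackend(
    dsn="postgres://app:secret@localhost:5432/app",  # placeholder DSN
    keep_results=True,            # keep rows after results are read
    table_name="taskiq_results",
    field_for_task_id="Uuid",     # store task ids in a UUID column
)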

keep_results instance-attribute

keep_results = keep_results

table_name instance-attribute

table_name = table_name

field_for_task_id instance-attribute

field_for_task_id = field_for_task_id

connect_kwargs instance-attribute

connect_kwargs = connect_kwargs

serializer instance-attribute

serializer = serializer or PickleSerializer()

dsn property

dsn

Get the DSN string.

Returns the DSN string or None if not set.

startup async

startup()

Initialize the result backend.

Construct new connection pool and create new table for results if not exists.

Source code in src/taskiq_pg/aiopg/result_backend.py
async def startup(self) -> None:
    """
    Initialize the result backend.

    Construct new connection pool
    and create new table for results if not exists.
    """
    try:
        self._database_pool = await create_pool(
            self.dsn,
            **self.connect_kwargs,
        )

        async with self._database_pool.acquire() as connection, connection.cursor() as cursor:
            await cursor.execute(
                queries.CREATE_TABLE_QUERY.format(
                    self.table_name,
                    self.field_for_task_id,
                ),
            )
            await cursor.execute(
                queries.CREATE_INDEX_QUERY.format(
                    self.table_name,
                    self.table_name,
                ),
            )
    except Exception as error:
        raise exceptions.DatabaseConnectionError(str(error)) from error

shutdown async

shutdown()

Close the connection pool.

Source code in src/taskiq_pg/aiopg/result_backend.py
async def shutdown(self) -> None:
    """Close the connection pool."""
    if getattr(self, "_database_pool", None) is not None:
        self._database_pool.close()

set_result async

set_result(task_id, result)

Set result to the PostgreSQL table.

Parameters:

  • task_id (Any) –

    ID of the task.

  • result (TaskiqResult[_ReturnType]) –

    result of the task.

Source code in src/taskiq_pg/aiopg/result_backend.py
async def set_result(
    self,
    task_id: tp.Any,
    result: TaskiqResult[ReturnType],
) -> None:
    """
    Set result to the PostgreSQL table.

    Args:
        task_id (Any): ID of the task.
        result (TaskiqResult[_ReturnType]):  result of the task.

    """
    dumped_result = self.serializer.dumpb(result)
    async with self._database_pool.acquire() as connection, connection.cursor() as cursor:
        await cursor.execute(
            queries.INSERT_RESULT_QUERY.format(
                self.table_name,
            ),
            (
                task_id,
                dumped_result,
                dumped_result,
            ),
        )

is_result_ready async

is_result_ready(task_id)

Return whether the result is ready.

Parameters:

  • task_id (Any) –

    ID of the task.

Returns:

  • bool ( bool ) –

    True if the result is ready else False.

Source code in src/taskiq_pg/aiopg/result_backend.py
async def is_result_ready(
    self,
    task_id: tp.Any,
) -> bool:
    """
    Return whether the result is ready.

    Args:
        task_id (Any): ID of the task.

    Returns:
        bool: True if the result is ready else False.

    """
    async with self._database_pool.acquire() as connection, connection.cursor() as cursor:
        await cursor.execute(
            queries.IS_RESULT_EXISTS_QUERY.format(
                self.table_name,
            ),
            (task_id,),
        )
        result = await cursor.fetchone()
        return bool(result[0]) if result else False

get_result async

get_result(task_id, with_logs=False)

Retrieve result from the task.

Parameters:

  • task_id (Any) –

    task's id.

  • with_logs (bool, default: False ) –

    if True it will download task's logs.

Raises:

  • ResultIsMissingError –

    if there is no result when trying to get it.

Returns:

  • TaskiqResult[ReturnType] –

    TaskiqResult.

Source code in src/taskiq_pg/aiopg/result_backend.py
async def get_result(
    self,
    task_id: tp.Any,
    with_logs: bool = False,
) -> TaskiqResult[ReturnType]:
    """
    Retrieve result from the task.

    :param task_id: task's id.
    :param with_logs: if True it will download task's logs.
    :raises ResultIsMissingError: if there is no result when trying to get it.
    :return: TaskiqResult.
    """
    async with self._database_pool.acquire() as connection, connection.cursor() as cursor:
        await cursor.execute(
            queries.SELECT_RESULT_QUERY.format(
                self.table_name,
            ),
            (task_id,),
        )
        result = await cursor.fetchone()

        if not result:
            msg = f"Cannot find record with task_id = {task_id} in PostgreSQL"
            raise exceptions.ResultIsMissingError(
                msg,
            )

        result_in_bytes: bytes = result[0]

        if not self.keep_results:
            await cursor.execute(
                queries.DELETE_RESULT_QUERY.format(
                    self.table_name,
                ),
                (task_id,),
            )

        taskiq_result: TaskiqResult[ReturnType] = self.serializer.loadb(
            result_in_bytes,
        )

        if not with_logs:
            taskiq_result.log = None

        return taskiq_result
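
Putting the lifecycle together: a sketch that drives the backend directly (normally the broker does this for you); the DSN and task id are placeholders:

import asyncio

from taskiq_pg.aiopg import AiopgResultBackend

async def main() -> None:
    backend = AiopgResultBackend(dsn="postgres://app:secret@localhost:5432/app")
    await backend.startup()  # creates the pool, table and index
    try:
        if await backend.is_result_ready("some-task-id"):
            result = await backend.get_result("some-task-id")
            print(result.return_value)
    finally:
        await backend.shutdown()

asyncio.run(main())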

AiopgScheduleSource

AiopgScheduleSource(
    broker,
    dsn="postgresql://postgres:postgres@localhost:5432/postgres",
    table_name="taskiq_schedules",
    **connect_kwargs,
)

Bases: BasePostgresScheduleSource

Schedule source that uses aiopg to store schedules in PostgreSQL.

Initialize the PostgreSQL scheduler source.

Sets up a scheduler source that stores scheduled tasks in a PostgreSQL database. This scheduler source manages task schedules, allowing for persistent storage and retrieval of scheduled tasks across application restarts.

Parameters:

  • dsn (str | Callable[[], str], default: 'postgresql://postgres:postgres@localhost:5432/postgres' ) –

    PostgreSQL connection string

  • table_name (str, default: 'taskiq_schedules' ) –

    Name of the table to store scheduled tasks. Will be created automatically if it doesn't exist.

  • broker (AsyncBroker) –

    The TaskIQ broker instance used to find and manage tasks.

  • **connect_kwargs (Any, default: {} ) –

    Additional keyword arguments passed to the database connection pool.

Source code in src/taskiq_pg/_internal/schedule_source.py
def __init__(
    self,
    broker: AsyncBroker,
    dsn: str | tp.Callable[[], str] = "postgresql://postgres:postgres@localhost:5432/postgres",
    table_name: str = "taskiq_schedules",
    **connect_kwargs: tp.Any,
) -> None:
    """
    Initialize the PostgreSQL scheduler source.

    Sets up a scheduler source that stores scheduled tasks in a PostgreSQL database.
    This scheduler source manages task schedules, allowing for persistent storage and retrieval of scheduled tasks
    across application restarts.

    Args:
        dsn: PostgreSQL connection string
        table_name: Name of the table to store scheduled tasks. Will be created automatically if it doesn't exist.
        broker: The TaskIQ broker instance to use for finding and managing tasks.
            Required if startup_schedule is provided.
        **connect_kwargs: Additional keyword arguments passed to the database connection pool.

    """
    self._broker: tp.Final = broker
    self._dsn: tp.Final = dsn
    self._table_name: tp.Final = table_name
    self._connect_kwargs: tp.Final = connect_kwargs
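
A wiring sketch: the source is typically handed to a TaskiqScheduler alongside the broker (here broker is assumed to be defined elsewhere in your application, and the DSN is a placeholder):

from taskiq import TaskiqScheduler

from taskiq_pg.aiopg import AiopgScheduleSource

source = AiopgScheduleSource(
    broker,  # your application's broker
    dsn="postgresql://app:secret@localhost:5432/app",  # placeholder DSN
)
scheduler = TaskiqScheduler(broker=broker, sources=[source])

The scheduler can then be started with taskiq's standard scheduler CLI, pointing at the module that defines scheduler.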

dsn property

dsn

Get the DSN string.

Returns the DSN string or None if not set.

extract_scheduled_tasks_from_broker

extract_scheduled_tasks_from_broker()

Extract schedules from tasks that were registered in broker.

Returns:

  • list[ScheduledTask]

    A list of ScheduledTask instances extracted from the task's labels.

Source code in src/taskiq_pg/_internal/schedule_source.py
def extract_scheduled_tasks_from_broker(self) -> list[ScheduledTask]:
    """
    Extract schedules from tasks that were registered in broker.

    Returns:
        A list of ScheduledTask instances extracted from the task's labels.
    """
    scheduled_tasks_for_creation: list[ScheduledTask] = []
    for task_name, task in self._broker.get_all_tasks().items():
        if "schedule" not in task.labels:
            logger.debug("Task %s has no schedule, skipping", task_name)
            continue
        if not isinstance(task.labels["schedule"], list):
            logger.warning(
                "Schedule for task %s is not a list, skipping",
                task_name,
            )
            continue
        for schedule in task.labels["schedule"]:
            try:
                new_schedule = ScheduledTask.model_validate(
                    {
                        "task_name": task_name,
                        "labels": schedule.get("labels", {}),
                        "args": schedule.get("args", []),
                        "kwargs": schedule.get("kwargs", {}),
                        "schedule_id": str(uuid.uuid4()),
                        "cron": schedule.get("cron", None),
                        "cron_offset": schedule.get("cron_offset", None),
                        "time": schedule.get("time", None),
                    },
                )
                scheduled_tasks_for_creation.append(new_schedule)
            except ValidationError:  # noqa: PERF203
                logger.exception(
                    "Schedule for task %s is not valid, skipping",
                    task_name,
                )
                continue
    return scheduled_tasks_for_creation
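
For a task to be picked up here, its schedule label must be a list of dicts using the keys read above (cron, cron_offset, time, args, kwargs, labels). A sketch, with broker assumed to be defined elsewhere:

@broker.task(
    schedule=[
        {"cron": "*/10 * * * *", "args": [42], "kwargs": {"flag": True}},
    ],
)
async def my_periodic_task(value: int, flag: bool = False) -> None:
    ...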

startup async

startup()

Initialize the schedule source.

Construct new connection pool, create new table for schedules if not exists and fill table with schedules from task labels.

Source code in src/taskiq_pg/aiopg/schedule_source.py
async def startup(self) -> None:
    """
    Initialize the schedule source.

    Construct new connection pool, create new table for schedules if not exists
    and fill table with schedules from task labels.
    """
    try:
        self._database_pool = await create_pool(
            dsn=self.dsn,
            **self._connect_kwargs,
        )
        async with self._database_pool.acquire() as connection, connection.cursor() as cursor:
            await cursor.execute(CREATE_SCHEDULES_TABLE_QUERY.format(self._table_name))
        scheduled_tasks_for_creation = self.extract_scheduled_tasks_from_broker()
        await self._update_schedules_on_startup(scheduled_tasks_for_creation)
    except Exception as error:
        raise exceptions.DatabaseConnectionError(str(error)) from error

shutdown async

shutdown()

Close the connection pool.

Source code in src/taskiq_pg/aiopg/schedule_source.py
async def shutdown(self) -> None:
    """Close the connection pool."""
    if getattr(self, "_database_pool", None) is not None:
        self._database_pool.close()

get_schedules async

get_schedules()

Fetch schedules from the database.

Source code in src/taskiq_pg/aiopg/schedule_source.py
async def get_schedules(self) -> list["ScheduledTask"]:
    """Fetch schedules from the database."""
    async with self._database_pool.acquire() as connection, connection.cursor() as cursor:
        await cursor.execute(
            SELECT_SCHEDULES_QUERY.format(self._table_name),
        )
        schedules, rows = [], await cursor.fetchall()
    for schedule_id, task_name, schedule in rows:
        schedules.append(
            ScheduledTask.model_validate(
                {
                    "schedule_id": str(schedule_id),
                    "task_name": task_name,
                    "labels": schedule["labels"],
                    "args": schedule["args"],
                    "kwargs": schedule["kwargs"],
                    "cron": schedule["cron"],
                    "cron_offset": schedule["cron_offset"],
                    "time": schedule["time"],
                },
            ),
        )
    return schedules

add_schedule async

add_schedule(schedule)

Add a new schedule.

Parameters:

  • schedule (ScheduledTask) –

    schedule to add.

Source code in src/taskiq_pg/aiopg/schedule_source.py
async def add_schedule(self, schedule: "ScheduledTask") -> None:
    """
    Add a new schedule.

    Args:
        schedule: schedule to add.
    """
    async with self._database_pool.acquire() as connection, connection.cursor() as cursor:
        await cursor.execute(
            INSERT_SCHEDULE_QUERY.format(self._table_name),
            [
                schedule.schedule_id,
                schedule.task_name,
                schedule.model_dump_json(
                    exclude={"schedule_id", "task_name"},
                ),
            ],
        )

delete_schedule async

delete_schedule(schedule_id)

Method to delete schedule by id.

This is useful for schedule cancellation.

Parameters:

  • schedule_id (str) –

    id of schedule to delete.

Source code in src/taskiq_pg/aiopg/schedule_source.py
async def delete_schedule(self, schedule_id: str) -> None:
    """
    Method to delete schedule by id.

    This is useful for schedule cancelation.

    Args:
        schedule_id: id of schedule to delete.
    """
    async with self._database_pool.acquire() as connection, connection.cursor() as cursor:
        await cursor.execute(
            DELETE_SCHEDULE_QUERY.format(self._table_name),
            [schedule_id],
        )

post_send async

post_send(task)

Delete a task after it's completed.

Source code in src/taskiq_pg/aiopg/schedule_source.py
async def post_send(self, task: ScheduledTask) -> None:
    """Delete a task after it's completed."""
    if task.time is not None:
        await self.delete_schedule(task.schedule_id)

broker

queries

CREATE_TABLE_QUERY module-attribute

CREATE_TABLE_QUERY = "\nCREATE TABLE IF NOT EXISTS {} (\n    task_id {} UNIQUE,\n    result BYTEA\n)\n"
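
These templates are plain str.format strings; a quick sketch of how the backend renders this one with its defaults (table_name="taskiq_results", field_for_task_id="VarChar"):

from taskiq_pg.aiopg.queries import CREATE_TABLE_QUERY  # import path assumed from this page

print(CREATE_TABLE_QUERY.format("taskiq_results", "VarChar"))
# CREATE TABLE IF NOT EXISTS taskiq_results (
#     task_id VarChar UNIQUE,
#     result BYTEA
# )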

CREATE_INDEX_QUERY module-attribute

CREATE_INDEX_QUERY = "\nCREATE INDEX IF NOT EXISTS {}_task_id_idx ON {} USING HASH (task_id)\n"

INSERT_RESULT_QUERY module-attribute

INSERT_RESULT_QUERY = "\nINSERT INTO {} VALUES (%s, %s)\nON CONFLICT (task_id)\nDO UPDATE\nSET result = %s\n"

IS_RESULT_EXISTS_QUERY module-attribute

IS_RESULT_EXISTS_QUERY = "\nSELECT EXISTS(\n    SELECT 1 FROM {} WHERE task_id = %s\n)\n"

SELECT_RESULT_QUERY module-attribute

SELECT_RESULT_QUERY = (
    "\nSELECT result FROM {} WHERE task_id = %s\n"
)

DELETE_RESULT_QUERY module-attribute

DELETE_RESULT_QUERY = (
    "\nDELETE FROM {} WHERE task_id = %s\n"
)

CREATE_SCHEDULES_TABLE_QUERY module-attribute

CREATE_SCHEDULES_TABLE_QUERY = "\nCREATE TABLE IF NOT EXISTS {} (\n    id UUID PRIMARY KEY,\n    task_name VARCHAR(100) NOT NULL,\n    schedule JSONB NOT NULL,\n    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),\n    updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()\n);\n"

INSERT_SCHEDULE_QUERY module-attribute

INSERT_SCHEDULE_QUERY = "\nINSERT INTO {} (id, task_name, schedule)\nVALUES (%s, %s, %s)\nON CONFLICT (id) DO UPDATE\nSET task_name = EXCLUDED.task_name,\n    schedule = EXCLUDED.schedule,\n    updated_at = NOW();\n"

SELECT_SCHEDULES_QUERY module-attribute

SELECT_SCHEDULES_QUERY = (
    "\nSELECT id, task_name, schedule\nFROM {};\n"
)

DELETE_ALL_SCHEDULES_QUERY module-attribute

DELETE_ALL_SCHEDULES_QUERY = '\nDELETE FROM {};\n'

DELETE_SCHEDULE_QUERY module-attribute

DELETE_SCHEDULE_QUERY = '\nDELETE FROM {} WHERE id = %s;\n'

result_backend

AiopgResultBackend

See the AiopgResultBackend documentation earlier on this page.

schedule_source

logger module-attribute

logger = getLogger('taskiq_pg.aiopg_schedule_source')

AiopgScheduleSource

See the AiopgScheduleSource documentation earlier on this page.

asyncpg

AsyncpgBroker

AsyncpgBroker(
    dsn="postgresql://postgres:postgres@localhost:5432/postgres",
    result_backend=None,
    task_id_generator=None,
    channel_name="taskiq",
    table_name="taskiq_messages",
    max_retry_attempts=5,
    read_kwargs=None,
    write_kwargs=None,
)

Bases: BasePostgresBroker

Broker that uses asyncpg as the driver and PostgreSQL's LISTEN/NOTIFY mechanism.

Construct a new broker.

Parameters:

  • dsn (str | Callable[[], str], default: 'postgresql://postgres:postgres@localhost:5432/postgres' ) –

    connection string to PostgreSQL, or callable returning one.

  • result_backend (AsyncResultBackend[_T] | None, default: None ) –

    Custom result backend.

  • task_id_generator (Callable[[], str] | None, default: None ) –

    Custom task_id generator.

  • channel_name (str, default: 'taskiq' ) –

    Name of the channel to listen on.

  • table_name (str, default: 'taskiq_messages' ) –

    Name of the table to store messages.

  • max_retry_attempts (int, default: 5 ) –

    Maximum number of message processing attempts.

  • read_kwargs (dict[str, Any] | None, default: None ) –

    Additional arguments for read connection creation.

  • write_kwargs (dict[str, Any] | None, default: None ) –

    Additional arguments for write pool creation.

Source code in src/taskiq_pg/_internal/broker.py
def __init__(  # noqa: PLR0913
    self,
    dsn: str | tp.Callable[[], str] = "postgresql://postgres:postgres@localhost:5432/postgres",
    result_backend: AsyncResultBackend[_T] | None = None,
    task_id_generator: tp.Callable[[], str] | None = None,
    channel_name: str = "taskiq",
    table_name: str = "taskiq_messages",
    max_retry_attempts: int = 5,
    read_kwargs: dict[str, tp.Any] | None = None,
    write_kwargs: dict[str, tp.Any] | None = None,
) -> None:
    """
    Construct a new broker.

    Args:
        dsn: connection string to PostgreSQL, or callable returning one.
        result_backend: Custom result backend.
        task_id_generator: Custom task_id generator.
        channel_name: Name of the channel to listen on.
        table_name: Name of the table to store messages.
        max_retry_attempts: Maximum number of message processing attempts.
        read_kwargs: Additional arguments for read connection creation.
        write_kwargs: Additional arguments for write pool creation.

    """
    super().__init__(
        result_backend=result_backend,
        task_id_generator=task_id_generator,
    )
    self._dsn: str | tp.Callable[[], str] = dsn
    self.channel_name: str = channel_name
    self.table_name: str = table_name
    self.read_kwargs: dict[str, tp.Any] = read_kwargs or {}
    self.write_kwargs: dict[str, tp.Any] = write_kwargs or {}
    self.max_retry_attempts: int = max_retry_attempts
    self._queue: asyncio.Queue[str] | None = None
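
A minimal wiring sketch; the import paths follow this page's source layout and the DSN is a placeholder:

from taskiq_pg.asyncpg import AsyncpgBroker, AsyncpgResultBackend

dsn = "postgresql://app:secret@localhost:5432/app"  # placeholder DSN
broker = AsyncpgBroker(dsn=dsn).with_result_backend(
    AsyncpgResultBackend(dsn=dsn),
)

@broker.task
async def add(a: int, b: int) -> int:
    return a + b

Workers are then launched with taskiq's standard worker CLI, pointing at the module that defines broker.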

channel_name instance-attribute

channel_name = channel_name

table_name instance-attribute

table_name = table_name

read_kwargs instance-attribute

read_kwargs = read_kwargs or {}

write_kwargs instance-attribute

write_kwargs = write_kwargs or {}

max_retry_attempts instance-attribute

max_retry_attempts = max_retry_attempts

dsn property

dsn

Get the DSN string.

Returns:

  • str

    The DSN string, or None if the DSN isn't set yet.

startup async

startup()

Initialize the broker.

Source code in src/taskiq_pg/asyncpg/broker.py
async def startup(self) -> None:
    """Initialize the broker."""
    await super().startup()

    self._read_conn = await asyncpg.connect(self.dsn, **self.read_kwargs)
    self._write_pool = await asyncpg.create_pool(self.dsn, **self.write_kwargs)

    if self._read_conn is None:
        msg = "_read_conn not initialized"
        raise RuntimeError(msg)

    async with self._write_pool.acquire() as conn:
        await conn.execute(CREATE_MESSAGE_TABLE_QUERY.format(self.table_name))

    await self._read_conn.add_listener(self.channel_name, self._notification_handler)
    self._queue = asyncio.Queue()

shutdown async

shutdown()

Close all connections on shutdown.

Source code in src/taskiq_pg/asyncpg/broker.py
async def shutdown(self) -> None:
    """Close all connections on shutdown."""
    await super().shutdown()
    if self._read_conn is not None:
        await self._read_conn.remove_listener(self.channel_name, self._notification_handler)
        await self._read_conn.close()
    if self._write_pool is not None:
        await self._write_pool.close()

kick async

kick(message)

Send message to the channel.

Inserts the message into the database and sends a NOTIFY.

Parameters:

  • message (BrokerMessage) –

    Message to send.

Source code in src/taskiq_pg/asyncpg/broker.py
async def kick(self, message: BrokerMessage) -> None:
    """
    Send message to the channel.

    Inserts the message into the database and sends a NOTIFY.

    :param message: Message to send.
    """
    if self._write_pool is None:
        msg = "Please run startup before kicking."
        raise ValueError(msg)

    async with self._write_pool.acquire() as conn:
        # Insert the message into the database
        message_inserted_id = tp.cast(
            "int",
            await conn.fetchval(
                INSERT_MESSAGE_QUERY.format(self.table_name),
                message.task_id,
                message.task_name,
                message.message.decode(),
                json.dumps(message.labels),
            ),
        )

        delay_value = message.labels.get("delay")
        if delay_value is not None:
            delay_seconds = int(delay_value)
            _ = asyncio.create_task(  # noqa: RUF006
                self._schedule_notification(message_inserted_id, delay_seconds),
            )
        else:
            # Send a NOTIFY with the message ID as payload
            _ = await conn.execute(
                f"NOTIFY {self.channel_name}, '{message_inserted_id}'",
            )
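
Because kick reads the delay label (interpreted as whole seconds via int(delay_value)), a delayed send can be expressed with taskiq's kicker API; a sketch reusing the hypothetical add task from the broker example above:

# Inside an async function: deliver roughly 30 seconds from now
# instead of notifying immediately.
await add.kicker().with_labels(delay=30).kiq(1, 2)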

listen async

listen()

Listen to the channel.

Yields messages as they are received.

Yields:

  • AckableMessage –

    AckableMessage instances.

Source code in src/taskiq_pg/asyncpg/broker.py
async def listen(self) -> AsyncGenerator[AckableMessage, None]:
    """
    Listen to the channel.

    Yields messages as they are received.

    :yields: AckableMessage instances.
    """
    if self._write_pool is None:
        msg = "Call startup before starting listening."
        raise ValueError(msg)
    if self._queue is None:
        msg = "Startup did not initialize the queue."
        raise ValueError(msg)

    while True:
        try:
            payload = await self._queue.get()
            message_id = int(payload)
            async with self._write_pool.acquire() as conn:
                claimed = await conn.fetchrow(
                    CLAIM_MESSAGE_QUERY.format(self.table_name),
                    message_id,
                )
            if claimed is None:
                continue
            message_str = claimed["message"]
            if not isinstance(message_str, str):
                msg = "message is not a string"
                raise TypeError(msg)
            message_data = message_str.encode()

            async def ack(*, _message_id: int = message_id) -> None:
                if self._write_pool is None:
                    msg = "Call startup before starting listening."
                    raise ValueError(msg)

                async with self._write_pool.acquire() as conn:
                    _ = await conn.execute(
                        DELETE_MESSAGE_QUERY.format(self.table_name),
                        _message_id,
                    )

            yield AckableMessage(data=message_data, ack=ack)
        except Exception:
            logger.exception("Error processing message")
            continue

AsyncpgResultBackend

AsyncpgResultBackend(
    dsn="postgres://postgres:postgres@localhost:5432/postgres",
    keep_results=True,
    table_name="taskiq_results",
    field_for_task_id="VarChar",
    serializer=None,
    **connect_kwargs,
)

Bases: BasePostgresResultBackend

Result backend for TaskIQ based on asyncpg.

Construct new result backend.

Parameters:

  • dsn (Callable[[], str] | str | None, default: 'postgres://postgres:postgres@localhost:5432/postgres' ) –

    connection string to PostgreSQL, or callable returning one.

  • keep_results (bool, default: True ) –

    flag to not remove results from the database after reading.

  • table_name (str, default: 'taskiq_results' ) –

    name of the table to store results.

  • field_for_task_id (Literal['VarChar', 'Text', 'Uuid'], default: 'VarChar' ) –

    type of the field to store task_id.

  • serializer (TaskiqSerializer | None, default: None ) –

    serializer class to serialize/deserialize result from task.

  • connect_kwargs (Any, default: {} ) –

    additional arguments for creating connection pool.

Source code in src/taskiq_pg/_internal/result_backend.py
def __init__(
    self,
    dsn: tp.Callable[[], str] | str | None = "postgres://postgres:postgres@localhost:5432/postgres",
    keep_results: bool = True,
    table_name: str = "taskiq_results",
    field_for_task_id: tp.Literal["VarChar", "Text", "Uuid"] = "VarChar",
    serializer: TaskiqSerializer | None = None,
    **connect_kwargs: tp.Any,
) -> None:
    """
    Construct new result backend.

    Args:
        dsn: connection string to PostgreSQL, or callable returning one.
        keep_results: flag to not remove results from the database after reading.
        table_name: name of the table to store results.
        field_for_task_id: type of the field to store task_id.
        serializer: serializer class to serialize/deserialize result from task.
        connect_kwargs: additional arguments for creating connection pool.

    """
    self._dsn: tp.Final = dsn
    self.keep_results: tp.Final = keep_results
    self.table_name: tp.Final = table_name
    self.field_for_task_id: tp.Final = field_for_task_id
    self.connect_kwargs: tp.Final = connect_kwargs
    self.serializer = serializer or PickleSerializer()

keep_results instance-attribute

keep_results = keep_results

table_name instance-attribute

table_name = table_name

field_for_task_id instance-attribute

field_for_task_id = field_for_task_id

connect_kwargs instance-attribute

connect_kwargs = connect_kwargs

serializer instance-attribute

serializer = serializer or PickleSerializer()

dsn property

dsn

Get the DSN string.

Returns the DSN string or None if not set.

startup async

startup()

Initialize the result backend.

Construct new connection pool and create new table for results if not exists.

Source code in src/taskiq_pg/asyncpg/result_backend.py
async def startup(self) -> None:
    """
    Initialize the result backend.

    Construct new connection pool and create new table for results if not exists.
    """
    _database_pool = await asyncpg.create_pool(
        dsn=self.dsn,
        **self.connect_kwargs,
    )
    self._database_pool = _database_pool

    await self._database_pool.execute(
        CREATE_TABLE_QUERY.format(
            self.table_name,
            self.field_for_task_id,
        ),
    )
    await self._database_pool.execute(
        CREATE_INDEX_QUERY.format(
            self.table_name,
            self.table_name,
        ),
    )

shutdown async

shutdown()

Close the connection pool.

Source code in src/taskiq_pg/asyncpg/result_backend.py
async def shutdown(self) -> None:
    """Close the connection pool."""
    if getattr(self, "_database_pool", None) is not None:
        await self._database_pool.close()

set_result async

set_result(task_id, result)

Set result to the PostgreSQL table.

Parameters:

  • task_id (str) –

    ID of the task.

  • result (TaskiqResult[ReturnType]) –

    result of the task.

Source code in src/taskiq_pg/asyncpg/result_backend.py
async def set_result(
    self,
    task_id: str,
    result: TaskiqResult[ReturnType],
) -> None:
    """
    Set result to the PostgreSQL table.

    :param task_id: ID of the task.
    :param result: result of the task.
    """
    _ = await self._database_pool.execute(
        INSERT_RESULT_QUERY.format(
            self.table_name,
        ),
        task_id,
        self.serializer.dumpb(model_dump(result)),
    )

is_result_ready async

is_result_ready(task_id)

Returns whether the result is ready.

Parameters:

  • task_id (str) –

    ID of the task.

Returns:

  • bool –

    True if the result is ready else False.

Source code in src/taskiq_pg/asyncpg/result_backend.py
async def is_result_ready(self, task_id: str) -> bool:
    """
    Returns whether the result is ready.

    :param task_id: ID of the task.
    :returns: True if the result is ready else False.
    """
    return tp.cast(
        "bool",
        await self._database_pool.fetchval(
            IS_RESULT_EXISTS_QUERY.format(
                self.table_name,
            ),
            task_id,
        ),
    )

get_result async

get_result(task_id, with_logs=False)

Retrieve result from the task.

Parameters:

  • task_id (str) –

    task's id.

  • with_logs (bool, default: False ) –

    if True it will download task's logs (deprecated in taskiq).

Raises:

  • ResultIsMissingError –

    if there is no result when trying to get it.

Returns:

  • TaskiqResult[ReturnType] –

    TaskiqResult.

Source code in src/taskiq_pg/asyncpg/result_backend.py
async def get_result(
    self,
    task_id: str,
    with_logs: bool = False,
) -> TaskiqResult[ReturnType]:
    """
    Retrieve result from the task.

    :param task_id: task's id.
    :param with_logs: if True it will download task's logs. (deprecated in taskiq)
    :raises ResultIsMissingError: if there is no result when trying to get it.
    :return: TaskiqResult.
    """
    result_in_bytes = tp.cast(
        "bytes",
        await self._database_pool.fetchval(
            SELECT_RESULT_QUERY.format(
                self.table_name,
            ),
            task_id,
        ),
    )
    if not self.keep_results:
        await self._database_pool.execute(
            DELETE_RESULT_QUERY.format(
                self.table_name,
            ),
            task_id,
        )
    taskiq_result: tp.Final = model_validate(
        TaskiqResult[ReturnType],
        self.serializer.loadb(result_in_bytes),
    )
    if not with_logs:
        taskiq_result.log = None
    return taskiq_result
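
In application code the backend is usually driven indirectly, through the task handle that kiq returns; a sketch reusing the hypothetical add task from the broker example above:

import asyncio

async def main() -> None:
    task = await add.kiq(1, 2)                   # enqueue via the broker
    result = await task.wait_result(timeout=10)  # poll the result backend
    print(result.return_value)                   # 3, once a worker has processed it

asyncio.run(main())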

AsyncpgScheduleSource

AsyncpgScheduleSource(
    broker,
    dsn="postgresql://postgres:postgres@localhost:5432/postgres",
    table_name="taskiq_schedules",
    **connect_kwargs,
)

Bases: BasePostgresScheduleSource

Schedule source that uses asyncpg to store schedules in PostgreSQL.

Initialize the PostgreSQL scheduler source.

Sets up a scheduler source that stores scheduled tasks in a PostgreSQL database. This scheduler source manages task schedules, allowing for persistent storage and retrieval of scheduled tasks across application restarts.

Parameters:

  • dsn (str | Callable[[], str], default: 'postgresql://postgres:postgres@localhost:5432/postgres' ) –

    PostgreSQL connection string

  • table_name (str, default: 'taskiq_schedules' ) –

    Name of the table to store scheduled tasks. Will be created automatically if it doesn't exist.

  • broker (AsyncBroker) –

    The TaskIQ broker instance used to find and manage tasks.

  • **connect_kwargs (Any, default: {} ) –

    Additional keyword arguments passed to the database connection pool.

Source code in src/taskiq_pg/_internal/schedule_source.py
def __init__(
    self,
    broker: AsyncBroker,
    dsn: str | tp.Callable[[], str] = "postgresql://postgres:postgres@localhost:5432/postgres",
    table_name: str = "taskiq_schedules",
    **connect_kwargs: tp.Any,
) -> None:
    """
    Initialize the PostgreSQL scheduler source.

    Sets up a scheduler source that stores scheduled tasks in a PostgreSQL database.
    This scheduler source manages task schedules, allowing for persistent storage and retrieval of scheduled tasks
    across application restarts.

    Args:
        dsn: PostgreSQL connection string
        table_name: Name of the table to store scheduled tasks. Will be created automatically if it doesn't exist.
        broker: The TaskIQ broker instance to use for finding and managing tasks.
            Required if startup_schedule is provided.
        **connect_kwargs: Additional keyword arguments passed to the database connection pool.

    """
    self._broker: tp.Final = broker
    self._dsn: tp.Final = dsn
    self._table_name: tp.Final = table_name
    self._connect_kwargs: tp.Final = connect_kwargs

dsn property

dsn

Get the DSN string.

Returns the DSN string or None if not set.

extract_scheduled_tasks_from_broker

extract_scheduled_tasks_from_broker()

Extract schedules from tasks that were registered in broker.

Returns:

  • list[ScheduledTask]

    A list of ScheduledTask instances extracted from the task's labels.

Source code in src/taskiq_pg/_internal/schedule_source.py
def extract_scheduled_tasks_from_broker(self) -> list[ScheduledTask]:
    """
    Extract schedules from tasks that were registered in broker.

    Returns:
        A list of ScheduledTask instances extracted from the task's labels.
    """
    scheduled_tasks_for_creation: list[ScheduledTask] = []
    for task_name, task in self._broker.get_all_tasks().items():
        if "schedule" not in task.labels:
            logger.debug("Task %s has no schedule, skipping", task_name)
            continue
        if not isinstance(task.labels["schedule"], list):
            logger.warning(
                "Schedule for task %s is not a list, skipping",
                task_name,
            )
            continue
        for schedule in task.labels["schedule"]:
            try:
                new_schedule = ScheduledTask.model_validate(
                    {
                        "task_name": task_name,
                        "labels": schedule.get("labels", {}),
                        "args": schedule.get("args", []),
                        "kwargs": schedule.get("kwargs", {}),
                        "schedule_id": str(uuid.uuid4()),
                        "cron": schedule.get("cron", None),
                        "cron_offset": schedule.get("cron_offset", None),
                        "time": schedule.get("time", None),
                    },
                )
                scheduled_tasks_for_creation.append(new_schedule)
            except ValidationError:  # noqa: PERF203
                logger.exception(
                    "Schedule for task %s is not valid, skipping",
                    task_name,
                )
                continue
    return scheduled_tasks_for_creation

startup async

startup()

Initialize the schedule source.

Construct new connection pool, create new table for schedules if not exists and fill table with schedules from task labels.

Source code in src/taskiq_pg/asyncpg/schedule_source.py
async def startup(self) -> None:
    """
    Initialize the schedule source.

    Construct new connection pool, create new table for schedules if not exists
    and fill table with schedules from task labels.
    """
    self._database_pool = await asyncpg.create_pool(
        dsn=self.dsn,
        **self._connect_kwargs,
    )
    await self._database_pool.execute(
        CREATE_SCHEDULES_TABLE_QUERY.format(
            self._table_name,
        ),
    )
    scheduled_tasks_for_creation = self.extract_scheduled_tasks_from_broker()
    await self._update_schedules_on_startup(scheduled_tasks_for_creation)

shutdown async

shutdown()

Close the connection pool.

Source code in src/taskiq_pg/asyncpg/schedule_source.py
async def shutdown(self) -> None:
    """Close the connection pool."""
    if getattr(self, "_database_pool", None) is not None:
        await self._database_pool.close()

get_schedules async

get_schedules()

Fetch schedules from the database.

Source code in src/taskiq_pg/asyncpg/schedule_source.py
async def get_schedules(self) -> list["ScheduledTask"]:
    """Fetch schedules from the database."""
    async with self._database_pool.acquire() as conn:
        rows_with_schedules = await conn.fetch(
            SELECT_SCHEDULES_QUERY.format(self._table_name),
        )
    schedules = []
    for row in rows_with_schedules:
        schedule = json.loads(row["schedule"])
        schedules.append(
            ScheduledTask.model_validate(
                {
                    "schedule_id": str(row["id"]),
                    "task_name": row["task_name"],
                    "labels": schedule["labels"],
                    "args": schedule["args"],
                    "kwargs": schedule["kwargs"],
                    "cron": schedule["cron"],
                    "cron_offset": schedule["cron_offset"],
                    "time": schedule["time"],
                },
            ),
        )
    return schedules

add_schedule async

add_schedule(schedule)

Add a new schedule.

Parameters:

  • schedule (ScheduledTask) –

    schedule to add.

Source code in src/taskiq_pg/asyncpg/schedule_source.py
async def add_schedule(self, schedule: "ScheduledTask") -> None:
    """
    Add a new schedule.

    Args:
        schedule: schedule to add.
    """
    await self._database_pool.execute(
        INSERT_SCHEDULE_QUERY.format(self._table_name),
        str(schedule.schedule_id),
        schedule.task_name,
        schedule.model_dump_json(
            exclude={"schedule_id", "task_name"},
        ),
    )
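
Schedules need not come only from task labels; they can also be added at runtime. A sketch constructing a ScheduledTask by hand (field names mirror the model_validate calls above; source is an AsyncpgScheduleSource that has been started, and the task name is hypothetical):

import uuid

from taskiq import ScheduledTask

await source.add_schedule(
    ScheduledTask(
        schedule_id=str(uuid.uuid4()),
        task_name="my_periodic_task",  # hypothetical task name
        labels={},
        args=[],
        kwargs={},
        cron="0 * * * *",  # hourly
    ),
)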

delete_schedule async

delete_schedule(schedule_id)

Method to delete schedule by id.

This is useful for schedule cancellation.

Parameters:

  • schedule_id (str) –

    id of schedule to delete.

Source code in src/taskiq_pg/asyncpg/schedule_source.py
async def delete_schedule(self, schedule_id: str) -> None:
    """
    Method to delete schedule by id.

    This is useful for schedule cancelation.

    Args:
        schedule_id: id of schedule to delete.
    """
    await self._database_pool.execute(
        DELETE_SCHEDULE_QUERY.format(self._table_name),
        schedule_id,
    )

post_send async

post_send(task)

Delete a task after it's completed.

Source code in src/taskiq_pg/asyncpg/schedule_source.py
async def post_send(self, task: ScheduledTask) -> None:
    """Delete a task after it's completed."""
    if task.time is not None:
        await self.delete_schedule(task.schedule_id)

broker

logger module-attribute

logger = getLogger('taskiq.asyncpg_broker')

AsyncpgBroker

AsyncpgBroker(
    dsn="postgresql://postgres:postgres@localhost:5432/postgres",
    result_backend=None,
    task_id_generator=None,
    channel_name="taskiq",
    table_name="taskiq_messages",
    max_retry_attempts=5,
    read_kwargs=None,
    write_kwargs=None,
)

Bases: BasePostgresBroker

Broker that uses asyncpg as driver and PostgreSQL with LISTEN/NOTIFY mechanism.

Construct a new broker.

Parameters:

  • dsn (str | Callable[[], str], default: 'postgresql://postgres:postgres@localhost:5432/postgres' ) –

    connection string to PostgreSQL, or callable returning one.

  • result_backend (AsyncResultBackend[_T] | None, default: None ) –

    Custom result backend.

  • task_id_generator (Callable[[], str] | None, default: None ) –

    Custom task_id generator.

  • channel_name (str, default: 'taskiq' ) –

    Name of the channel to listen on.

  • table_name (str, default: 'taskiq_messages' ) –

    Name of the table to store messages.

  • max_retry_attempts (int, default: 5 ) –

    Maximum number of message processing attempts.

  • read_kwargs (dict[str, Any] | None, default: None ) –

    Additional arguments for read connection creation.

  • write_kwargs (dict[str, Any] | None, default: None ) –

    Additional arguments for write pool creation.

Source code in src/taskiq_pg/_internal/broker.py
def __init__(  # noqa: PLR0913
    self,
    dsn: str | tp.Callable[[], str] = "postgresql://postgres:postgres@localhost:5432/postgres",
    result_backend: AsyncResultBackend[_T] | None = None,
    task_id_generator: tp.Callable[[], str] | None = None,
    channel_name: str = "taskiq",
    table_name: str = "taskiq_messages",
    max_retry_attempts: int = 5,
    read_kwargs: dict[str, tp.Any] | None = None,
    write_kwargs: dict[str, tp.Any] | None = None,
) -> None:
    """
    Construct a new broker.

    Args:
        dsn: connection string to PostgreSQL, or callable returning one.
        result_backend: Custom result backend.
        task_id_generator: Custom task_id generator.
        channel_name: Name of the channel to listen on.
        table_name: Name of the table to store messages.
        max_retry_attempts: Maximum number of message processing attempts.
        read_kwargs: Additional arguments for read connection creation.
        write_kwargs: Additional arguments for write pool creation.

    """
    super().__init__(
        result_backend=result_backend,
        task_id_generator=task_id_generator,
    )
    self._dsn: str | tp.Callable[[], str] = dsn
    self.channel_name: str = channel_name
    self.table_name: str = table_name
    self.read_kwargs: dict[str, tp.Any] = read_kwargs or {}
    self.write_kwargs: dict[str, tp.Any] = write_kwargs or {}
    self.max_retry_attempts: int = max_retry_attempts
    self._queue: asyncio.Queue[str] | None = None

channel_name instance-attribute

channel_name = channel_name

table_name instance-attribute

table_name = table_name

read_kwargs instance-attribute

read_kwargs = read_kwargs or {}

write_kwargs instance-attribute

write_kwargs = write_kwargs or {}

max_retry_attempts instance-attribute

max_retry_attempts = max_retry_attempts

dsn property

dsn

Get the DSN string.

Returns:

  • str

    A string with the DSN, or None if the DSN isn't set yet.

startup async
startup()

Initialize the broker.

Source code in src/taskiq_pg/asyncpg/broker.py
async def startup(self) -> None:
    """Initialize the broker."""
    await super().startup()

    self._read_conn = await asyncpg.connect(self.dsn, **self.read_kwargs)
    self._write_pool = await asyncpg.create_pool(self.dsn, **self.write_kwargs)

    if self._read_conn is None:
        msg = "_read_conn not initialized"
        raise RuntimeError(msg)

    async with self._write_pool.acquire() as conn:
        await conn.execute(CREATE_MESSAGE_TABLE_QUERY.format(self.table_name))

    await self._read_conn.add_listener(self.channel_name, self._notification_handler)
    self._queue = asyncio.Queue()
shutdown async
shutdown()

Close all connections on shutdown.

Source code in src/taskiq_pg/asyncpg/broker.py
async def shutdown(self) -> None:
    """Close all connections on shutdown."""
    await super().shutdown()
    if self._read_conn is not None:
        await self._read_conn.remove_listener(self.channel_name, self._notification_handler)
        await self._read_conn.close()
    if self._write_pool is not None:
        await self._write_pool.close()
kick async
kick(message)

Send message to the channel.

Inserts the message into the database and sends a NOTIFY.

:param message: Message to send.

Source code in src/taskiq_pg/asyncpg/broker.py
async def kick(self, message: BrokerMessage) -> None:
    """
    Send message to the channel.

    Inserts the message into the database and sends a NOTIFY.

    :param message: Message to send.
    """
    if self._write_pool is None:
        msg = "Please run startup before kicking."
        raise ValueError(msg)

    async with self._write_pool.acquire() as conn:
        # Insert the message into the database
        message_inserted_id = tp.cast(
            "int",
            await conn.fetchval(
                INSERT_MESSAGE_QUERY.format(self.table_name),
                message.task_id,
                message.task_name,
                message.message.decode(),
                json.dumps(message.labels),
            ),
        )

        delay_value = message.labels.get("delay")
        if delay_value is not None:
            delay_seconds = int(delay_value)
            _ = asyncio.create_task(  # noqa: RUF006
                self._schedule_notification(message_inserted_id, delay_seconds),
            )
        else:
            # Send a NOTIFY with the message ID as payload
            _ = await conn.execute(
                f"NOTIFY {self.channel_name}, '{message_inserted_id}'",
            )
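
The delay label checked above is the hook for delayed delivery: the row is inserted immediately, but the NOTIFY is postponed by _schedule_notification. A sketch using taskiq's kicker API, where add_one stands for any task registered on this broker:

# Row is inserted now; the NOTIFY fires roughly 5 seconds later.
await add_one.kicker().with_labels(delay="5").kiq(41)
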
listen async
listen()

Listen to the channel.

Yields messages as they are received.

:yields: AckableMessage instances.

Source code in src/taskiq_pg/asyncpg/broker.py
async def listen(self) -> AsyncGenerator[AckableMessage, None]:
    """
    Listen to the channel.

    Yields messages as they are received.

    :yields: AckableMessage instances.
    """
    if self._write_pool is None:
        msg = "Call startup before starting listening."
        raise ValueError(msg)
    if self._queue is None:
        msg = "Startup did not initialize the queue."
        raise ValueError(msg)

    while True:
        try:
            payload = await self._queue.get()
            message_id = int(payload)
            async with self._write_pool.acquire() as conn:
                claimed = await conn.fetchrow(
                    CLAIM_MESSAGE_QUERY.format(self.table_name),
                    message_id,
                )
            if claimed is None:
                continue
            message_str = claimed["message"]
            if not isinstance(message_str, str):
                msg = "message is not a string"
                raise TypeError(msg)
            message_data = message_str.encode()

            async def ack(*, _message_id: int = message_id) -> None:
                if self._write_pool is None:
                    msg = "Call startup before starting listening."
                    raise ValueError(msg)

                async with self._write_pool.acquire() as conn:
                    _ = await conn.execute(
                        DELETE_MESSAGE_QUERY.format(self.table_name),
                        _message_id,
                    )

            yield AckableMessage(data=message_data, ack=ack)
        except Exception:
            logger.exception("Error processing message")
            continue
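
Putting the pieces together: kick inserts and notifies, a worker claims the row in listen, and the result backend stores the return value. A minimal end-to-end sketch; the top-level import path is an assumption (adjust it to match your installed layout):

import asyncio

from taskiq_pg import AsyncpgBroker, AsyncpgResultBackend  # import path assumed

broker = AsyncpgBroker(
    dsn="postgresql://postgres:postgres@localhost:5432/postgres",
).with_result_backend(
    AsyncpgResultBackend(dsn="postgres://postgres:postgres@localhost:5432/postgres"),
)


@broker.task
async def add_one(value: int) -> int:
    return value + 1


async def main() -> None:
    await broker.startup()
    task = await add_one.kiq(41)  # INSERT + NOTIFY
    result = await task.wait_result(timeout=10)
    print(result.return_value)  # 42, once a worker has processed the message
    await broker.shutdown()


if __name__ == "__main__":
    asyncio.run(main())

The result only arrives if a worker process (for example, taskiq worker my_module:broker) is consuming the queue.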

queries

CREATE_TABLE_QUERY module-attribute

CREATE_TABLE_QUERY = "\nCREATE TABLE IF NOT EXISTS {} (\n    task_id {} UNIQUE,\n    result BYTEA\n)\n"

CREATE_INDEX_QUERY module-attribute

CREATE_INDEX_QUERY = "\nCREATE INDEX IF NOT EXISTS {}_task_id_idx ON {} USING HASH (task_id)\n"

INSERT_RESULT_QUERY module-attribute

INSERT_RESULT_QUERY = "\nINSERT INTO {} VALUES ($1, $2)\nON CONFLICT (task_id)\nDO UPDATE\nSET result = $2\n"

IS_RESULT_EXISTS_QUERY module-attribute

IS_RESULT_EXISTS_QUERY = "\nSELECT EXISTS(\n    SELECT 1 FROM {} WHERE task_id = $1\n)\n"

SELECT_RESULT_QUERY module-attribute

SELECT_RESULT_QUERY = (
    "\nSELECT result FROM {} WHERE task_id = $1\n"
)

DELETE_RESULT_QUERY module-attribute

DELETE_RESULT_QUERY = (
    "\nDELETE FROM {} WHERE task_id = $1\n"
)

CREATE_MESSAGE_TABLE_QUERY module-attribute

CREATE_MESSAGE_TABLE_QUERY = "\nCREATE TABLE IF NOT EXISTS {} (\n    id SERIAL PRIMARY KEY,\n    task_id VARCHAR NOT NULL,\n    task_name VARCHAR NOT NULL,\n    message TEXT NOT NULL,\n    labels JSONB NOT NULL,\n    status TEXT NOT NULL DEFAULT 'pending',\n    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()\n);\n"

INSERT_MESSAGE_QUERY module-attribute

INSERT_MESSAGE_QUERY = "\nINSERT INTO {} (task_id, task_name, message, labels)\nVALUES ($1, $2, $3, $4)\nRETURNING id\n"

CLAIM_MESSAGE_QUERY module-attribute

CLAIM_MESSAGE_QUERY = "UPDATE {} SET status = 'processing' WHERE id = $1 AND status = 'pending' RETURNING id, message"

DELETE_MESSAGE_QUERY module-attribute

DELETE_MESSAGE_QUERY = 'DELETE FROM {} WHERE id = $1'

CREATE_SCHEDULES_TABLE_QUERY module-attribute

CREATE_SCHEDULES_TABLE_QUERY = "\nCREATE TABLE IF NOT EXISTS {} (\n    id UUID PRIMARY KEY,\n    task_name VARCHAR(100) NOT NULL,\n    schedule JSONB NOT NULL,\n    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),\n    updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()\n);\n"

INSERT_SCHEDULE_QUERY module-attribute

INSERT_SCHEDULE_QUERY = "\nINSERT INTO {} (id, task_name, schedule)\nVALUES ($1, $2, $3)\nON CONFLICT (id) DO UPDATE\nSET task_name = EXCLUDED.task_name,\n    schedule = EXCLUDED.schedule,\n    updated_at = NOW();\n"

SELECT_SCHEDULES_QUERY module-attribute

SELECT_SCHEDULES_QUERY = (
    "\nSELECT id, task_name, schedule\nFROM {};\n"
)

DELETE_ALL_SCHEDULES_QUERY module-attribute

DELETE_ALL_SCHEDULES_QUERY = '\nDELETE FROM {};\n'

DELETE_SCHEDULE_QUERY module-attribute

DELETE_SCHEDULE_QUERY = '\nDELETE FROM {} WHERE id = $1;\n'
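
Read together, the message queries define the lifecycle: kick inserts a row with status 'pending', listen claims it by moving it to 'processing', and ack deletes it. The claim step is what keeps two workers from taking the same message; substituting the default table name makes that visible (module path assumed from this page):

from taskiq_pg.asyncpg.queries import CLAIM_MESSAGE_QUERY  # import path assumed

# Only a row that is still 'pending' matches, so a concurrent worker's
# UPDATE hits zero rows and its fetchrow returns None.
print(CLAIM_MESSAGE_QUERY.format("taskiq_messages"))
# -> UPDATE taskiq_messages SET status = 'processing' WHERE id = $1 AND status = 'pending' RETURNING id, message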

result_backend

AsyncpgResultBackend

AsyncpgResultBackend(
    dsn="postgres://postgres:postgres@localhost:5432/postgres",
    keep_results=True,
    table_name="taskiq_results",
    field_for_task_id="VarChar",
    serializer=None,
    **connect_kwargs,
)

Bases: BasePostgresResultBackend

Result backend for TaskIQ based on asyncpg.

Construct new result backend.

Parameters:

  • dsn (Callable[[], str] | str | None, default: 'postgres://postgres:postgres@localhost:5432/postgres' ) –

    connection string to PostgreSQL, or callable returning one.

  • keep_results (bool, default: True ) –

    flag to not remove results from the database after reading.

  • table_name (str, default: 'taskiq_results' ) –

    name of the table to store results.

  • field_for_task_id (Literal['VarChar', 'Text', 'Uuid'], default: 'VarChar' ) –

    type of the field to store task_id.

  • serializer (TaskiqSerializer | None, default: None ) –

    serializer class to serialize/deserialize result from task.

  • connect_kwargs (Any, default: {} ) –

    additional arguments for creating connection pool.

Source code in src/taskiq_pg/_internal/result_backend.py
def __init__(
    self,
    dsn: tp.Callable[[], str] | str | None = "postgres://postgres:postgres@localhost:5432/postgres",
    keep_results: bool = True,
    table_name: str = "taskiq_results",
    field_for_task_id: tp.Literal["VarChar", "Text", "Uuid"] = "VarChar",
    serializer: TaskiqSerializer | None = None,
    **connect_kwargs: tp.Any,
) -> None:
    """
    Construct new result backend.

    Args:
        dsn: connection string to PostgreSQL, or callable returning one.
        keep_results: flag to not remove results from the database after reading.
        table_name: name of the table to store results.
        field_for_task_id: type of the field to store task_id.
        serializer: serializer class to serialize/deserialize result from task.
        connect_kwargs: additional arguments for creating connection pool.

    """
    self._dsn: tp.Final = dsn
    self.keep_results: tp.Final = keep_results
    self.table_name: tp.Final = table_name
    self.field_for_task_id: tp.Final = field_for_task_id
    self.connect_kwargs: tp.Final = connect_kwargs
    self.serializer = serializer or PickleSerializer()

keep_results instance-attribute

keep_results = keep_results

table_name instance-attribute

table_name = table_name

field_for_task_id instance-attribute

field_for_task_id = field_for_task_id

connect_kwargs instance-attribute

connect_kwargs = connect_kwargs

serializer instance-attribute

serializer = serializer or PickleSerializer()

dsn property

dsn

Get the DSN string.

Returns the DSN string or None if not set.

startup async
startup()

Initialize the result backend.

Construct new connection pool and create new table for results if not exists.

Source code in src/taskiq_pg/asyncpg/result_backend.py
async def startup(self) -> None:
    """
    Initialize the result backend.

    Construct new connection pool and create new table for results if not exists.
    """
    _database_pool = await asyncpg.create_pool(
        dsn=self.dsn,
        **self.connect_kwargs,
    )
    self._database_pool = _database_pool

    await self._database_pool.execute(
        CREATE_TABLE_QUERY.format(
            self.table_name,
            self.field_for_task_id,
        ),
    )
    await self._database_pool.execute(
        CREATE_INDEX_QUERY.format(
            self.table_name,
            self.table_name,
        ),
    )
shutdown async
shutdown()

Close the connection pool.

Source code in src/taskiq_pg/asyncpg/result_backend.py
async def shutdown(self) -> None:
    """Close the connection pool."""
    if getattr(self, "_database_pool", None) is not None:
        await self._database_pool.close()
set_result async
set_result(task_id, result)

Set result to the PostgreSQL table.

:param task_id: ID of the task.
:param result: result of the task.

Source code in src/taskiq_pg/asyncpg/result_backend.py
async def set_result(
    self,
    task_id: str,
    result: TaskiqResult[ReturnType],
) -> None:
    """
    Set result to the PostgreSQL table.

    :param task_id: ID of the task.
    :param result: result of the task.
    """
    _ = await self._database_pool.execute(
        INSERT_RESULT_QUERY.format(
            self.table_name,
        ),
        task_id,
        self.serializer.dumpb(model_dump(result)),
    )
is_result_ready async
is_result_ready(task_id)

Returns whether the result is ready.

:param task_id: ID of the task.
:returns: True if the result is ready else False.

Source code in src/taskiq_pg/asyncpg/result_backend.py
async def is_result_ready(self, task_id: str) -> bool:
    """
    Returns whether the result is ready.

    :param task_id: ID of the task.
    :returns: True if the result is ready else False.
    """
    return tp.cast(
        "bool",
        await self._database_pool.fetchval(
            IS_RESULT_EXISTS_QUERY.format(
                self.table_name,
            ),
            task_id,
        ),
    )
get_result async
get_result(task_id, with_logs=False)

Retrieve result from the task.

:param task_id: task's id.
:param with_logs: if True it will download task's logs. (deprecated in taskiq)
:raises ResultIsMissingError: if there is no result when trying to get it.
:return: TaskiqResult.

Source code in src/taskiq_pg/asyncpg/result_backend.py
async def get_result(
    self,
    task_id: str,
    with_logs: bool = False,
) -> TaskiqResult[ReturnType]:
    """
    Retrieve result from the task.

    :param task_id: task's id.
    :param with_logs: if True it will download task's logs. (deprecated in taskiq)
    :raises ResultIsMissingError: if there is no result when trying to get it.
    :return: TaskiqResult.
    """
    result_in_bytes = tp.cast(
        "bytes",
        await self._database_pool.fetchval(
            SELECT_RESULT_QUERY.format(
                self.table_name,
            ),
            task_id,
        ),
    )
    if not self.keep_results:
        await self._database_pool.execute(
            DELETE_RESULT_QUERY.format(
                self.table_name,
            ),
            task_id,
        )
    taskiq_result: tp.Final = model_validate(
        TaskiqResult[ReturnType],
        self.serializer.loadb(result_in_bytes),
    )
    if not with_logs:
        taskiq_result.log = None
    return taskiq_result
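
The backend can also be driven directly, for instance to poll for a result from outside a worker. A minimal sketch inside an async context; the import path and the task_id variable are assumptions:

from taskiq_pg import AsyncpgResultBackend  # import path assumed

backend = AsyncpgResultBackend(
    dsn="postgres://postgres:postgres@localhost:5432/postgres",
    keep_results=False,  # delete each result row after the first read
)
await backend.startup()
if await backend.is_result_ready(task_id):  # task_id from an earlier kiq call
    result = await backend.get_result(task_id)
    print(result.return_value)
await backend.shutdown()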

schedule_source

logger module-attribute

logger = getLogger('taskiq_pg.asyncpg_schedule_source')

AsyncpgScheduleSource

AsyncpgScheduleSource(
    broker,
    dsn="postgresql://postgres:postgres@localhost:5432/postgres",
    table_name="taskiq_schedules",
    **connect_kwargs,
)

Bases: BasePostgresScheduleSource

Schedule source that uses asyncpg to store schedules in PostgreSQL.

Initialize the PostgreSQL scheduler source.

Sets up a scheduler source that stores scheduled tasks in a PostgreSQL database. This scheduler source manages task schedules, allowing for persistent storage and retrieval of scheduled tasks across application restarts.

Parameters:

  • dsn (str | Callable[[], str], default: 'postgresql://postgres:postgres@localhost:5432/postgres' ) –

    PostgreSQL connection string

  • table_name (str, default: 'taskiq_schedules' ) –

    Name of the table to store scheduled tasks. Will be created automatically if it doesn't exist.

  • broker (AsyncBroker) –

    The TaskIQ broker instance to use for finding and managing tasks. Required if startup_schedule is provided.

  • **connect_kwargs (Any, default: {} ) –

    Additional keyword arguments passed to the database connection pool.

Source code in src/taskiq_pg/_internal/schedule_source.py
def __init__(
    self,
    broker: AsyncBroker,
    dsn: str | tp.Callable[[], str] = "postgresql://postgres:postgres@localhost:5432/postgres",
    table_name: str = "taskiq_schedules",
    **connect_kwargs: tp.Any,
) -> None:
    """
    Initialize the PostgreSQL scheduler source.

    Sets up a scheduler source that stores scheduled tasks in a PostgreSQL database.
    This scheduler source manages task schedules, allowing for persistent storage and retrieval of scheduled tasks
    across application restarts.

    Args:
        dsn: PostgreSQL connection string
        table_name: Name of the table to store scheduled tasks. Will be created automatically if it doesn't exist.
        broker: The TaskIQ broker instance to use for finding and managing tasks.
            Required if startup_schedule is provided.
        **connect_kwargs: Additional keyword arguments passed to the database connection pool.

    """
    self._broker: tp.Final = broker
    self._dsn: tp.Final = dsn
    self._table_name: tp.Final = table_name
    self._connect_kwargs: tp.Final = connect_kwargs
dsn property
dsn

Get the DSN string.

Returns the DSN string or None if not set.

startup async
startup()

Initialize the schedule source.

Construct new connection pool, create new table for schedules if not exists and fill table with schedules from task labels.

Source code in src/taskiq_pg/asyncpg/schedule_source.py
async def startup(self) -> None:
    """
    Initialize the schedule source.

    Construct new connection pool, create new table for schedules if not exists
    and fill table with schedules from task labels.
    """
    self._database_pool = await asyncpg.create_pool(
        dsn=self.dsn,
        **self._connect_kwargs,
    )
    await self._database_pool.execute(
        CREATE_SCHEDULES_TABLE_QUERY.format(
            self._table_name,
        ),
    )
    scheduled_tasks_for_creation = self.extract_scheduled_tasks_from_broker()
    await self._update_schedules_on_startup(scheduled_tasks_for_creation)
shutdown async
shutdown()

Close the connection pool.

Source code in src/taskiq_pg/asyncpg/schedule_source.py
async def shutdown(self) -> None:
    """Close the connection pool."""
    if getattr(self, "_database_pool", None) is not None:
        await self._database_pool.close()
get_schedules async
get_schedules()

Fetch schedules from the database.

Source code in src/taskiq_pg/asyncpg/schedule_source.py
async def get_schedules(self) -> list["ScheduledTask"]:
    """Fetch schedules from the database."""
    async with self._database_pool.acquire() as conn:
        rows_with_schedules = await conn.fetch(
            SELECT_SCHEDULES_QUERY.format(self._table_name),
        )
    schedules = []
    for row in rows_with_schedules:
        schedule = json.loads(row["schedule"])
        schedules.append(
            ScheduledTask.model_validate(
                {
                    "schedule_id": str(row["id"]),
                    "task_name": row["task_name"],
                    "labels": schedule["labels"],
                    "args": schedule["args"],
                    "kwargs": schedule["kwargs"],
                    "cron": schedule["cron"],
                    "cron_offset": schedule["cron_offset"],
                    "time": schedule["time"],
                },
            ),
        )
    return schedules
add_schedule async
add_schedule(schedule)

Add a new schedule.

Parameters:

  • schedule (ScheduledTask) –

    schedule to add.

Source code in src/taskiq_pg/asyncpg/schedule_source.py
async def add_schedule(self, schedule: "ScheduledTask") -> None:
    """
    Add a new schedule.

    Args:
        schedule: schedule to add.
    """
    await self._database_pool.execute(
        INSERT_SCHEDULE_QUERY.format(self._table_name),
        str(schedule.schedule_id),
        schedule.task_name,
        schedule.model_dump_json(
            exclude={"schedule_id", "task_name"},
        ),
    )
delete_schedule async
delete_schedule(schedule_id)

Method to delete schedule by id.

This is useful for schedule cancellation.

Parameters:

  • schedule_id (str) –

    id of schedule to delete.

Source code in src/taskiq_pg/asyncpg/schedule_source.py
async def delete_schedule(self, schedule_id: str) -> None:
    """
    Method to delete schedule by id.

    This is useful for schedule cancellation.

    Args:
        schedule_id: id of schedule to delete.
    """
    await self._database_pool.execute(
        DELETE_SCHEDULE_QUERY.format(self._table_name),
        schedule_id,
    )
post_send async
post_send(task)

Delete a task after it's completed.

Source code in src/taskiq_pg/asyncpg/schedule_source.py
async def post_send(self, task: ScheduledTask) -> None:
    """Delete a task after it's completed."""
    if task.time is not None:
        await self.delete_schedule(task.schedule_id)
extract_scheduled_tasks_from_broker
extract_scheduled_tasks_from_broker()

Extract schedules from tasks that were registered in broker.

Returns:

  • list[ScheduledTask]

    A list of ScheduledTask instances extracted from the task's labels.

Source code in src/taskiq_pg/_internal/schedule_source.py
def extract_scheduled_tasks_from_broker(self) -> list[ScheduledTask]:
    """
    Extract schedules from tasks that were registered in broker.

    Returns:
        A list of ScheduledTask instances extracted from the task's labels.
    """
    scheduled_tasks_for_creation: list[ScheduledTask] = []
    for task_name, task in self._broker.get_all_tasks().items():
        if "schedule" not in task.labels:
            logger.debug("Task %s has no schedule, skipping", task_name)
            continue
        if not isinstance(task.labels["schedule"], list):
            logger.warning(
                "Schedule for task %s is not a list, skipping",
                task_name,
            )
            continue
        for schedule in task.labels["schedule"]:
            try:
                new_schedule = ScheduledTask.model_validate(
                    {
                        "task_name": task_name,
                        "labels": schedule.get("labels", {}),
                        "args": schedule.get("args", []),
                        "kwargs": schedule.get("kwargs", {}),
                        "schedule_id": str(uuid.uuid4()),
                        "cron": schedule.get("cron", None),
                        "cron_offset": schedule.get("cron_offset", None),
                        "time": schedule.get("time", None),
                    },
                )
                scheduled_tasks_for_creation.append(new_schedule)
            except ValidationError:  # noqa: PERF203
                logger.exception(
                    "Schedule for task %s is not valid, skipping",
                    task_name,
                )
                continue
    return scheduled_tasks_for_creation
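
Because startup() syncs these extracted schedules into the table, a cron schedule can live as a label on the task itself. A minimal wiring sketch; the import paths are assumptions:

from taskiq import TaskiqScheduler

from taskiq_pg import AsyncpgBroker, AsyncpgScheduleSource  # import path assumed

broker = AsyncpgBroker(dsn="postgresql://postgres:postgres@localhost:5432/postgres")
source = AsyncpgScheduleSource(broker=broker)


@broker.task(schedule=[{"cron": "*/5 * * * *"}])  # parsed on source.startup()
async def heartbeat() -> None:
    ...


scheduler = TaskiqScheduler(broker=broker, sources=[source])

Running the scheduler (for example, taskiq scheduler my_module:scheduler) then kicks heartbeat every five minutes.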

exceptions

BaseTaskiqPgError

Bases: Exception

Base error for all possible exceptions in the lib.

DatabaseConnectionError

Bases: BaseTaskiqPgError

Error raised when a connection to PostgreSQL cannot be established.

ResultIsMissingError

Bases: BaseTaskiqPgError

Error raised when a result cannot be retrieved from PostgreSQL.
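
A small handling sketch; the import path is an assumption, and backend / task_id stand for a started result backend and a known task id:

from taskiq_pg.exceptions import ResultIsMissingError  # import path assumed

try:
    result = await backend.get_result(task_id)  # inside an async function
except ResultIsMissingError:
    result = None  # not written yet, or already removed (keep_results=False)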

psqlpy

PSQLPyBroker

PSQLPyBroker(
    dsn="postgresql://postgres:postgres@localhost:5432/postgres",
    result_backend=None,
    task_id_generator=None,
    channel_name="taskiq",
    table_name="taskiq_messages",
    max_retry_attempts=5,
    read_kwargs=None,
    write_kwargs=None,
)

Bases: BasePostgresBroker

Broker that uses PostgreSQL and PSQLPy with LISTEN/NOTIFY.

Construct a new broker.

Parameters:

  • dsn (str | Callable[[], str], default: 'postgresql://postgres:postgres@localhost:5432/postgres' ) –

    connection string to PostgreSQL, or callable returning one.

  • result_backend (AsyncResultBackend[_T] | None, default: None ) –

    Custom result backend.

  • task_id_generator (Callable[[], str] | None, default: None ) –

    Custom task_id generator.

  • channel_name (str, default: 'taskiq' ) –

    Name of the channel to listen on.

  • table_name (str, default: 'taskiq_messages' ) –

    Name of the table to store messages.

  • max_retry_attempts (int, default: 5 ) –

    Maximum number of message processing attempts.

  • read_kwargs (dict[str, Any] | None, default: None ) –

    Additional arguments for read connection creation.

  • write_kwargs (dict[str, Any] | None, default: None ) –

    Additional arguments for write pool creation.

Source code in src/taskiq_pg/_internal/broker.py
def __init__(  # noqa: PLR0913
    self,
    dsn: str | tp.Callable[[], str] = "postgresql://postgres:postgres@localhost:5432/postgres",
    result_backend: AsyncResultBackend[_T] | None = None,
    task_id_generator: tp.Callable[[], str] | None = None,
    channel_name: str = "taskiq",
    table_name: str = "taskiq_messages",
    max_retry_attempts: int = 5,
    read_kwargs: dict[str, tp.Any] | None = None,
    write_kwargs: dict[str, tp.Any] | None = None,
) -> None:
    """
    Construct a new broker.

    Args:
        dsn: connection string to PostgreSQL, or callable returning one.
        result_backend: Custom result backend.
        task_id_generator: Custom task_id generator.
        channel_name: Name of the channel to listen on.
        table_name: Name of the table to store messages.
        max_retry_attempts: Maximum number of message processing attempts.
        read_kwargs: Additional arguments for read connection creation.
        write_kwargs: Additional arguments for write pool creation.

    """
    super().__init__(
        result_backend=result_backend,
        task_id_generator=task_id_generator,
    )
    self._dsn: str | tp.Callable[[], str] = dsn
    self.channel_name: str = channel_name
    self.table_name: str = table_name
    self.read_kwargs: dict[str, tp.Any] = read_kwargs or {}
    self.write_kwargs: dict[str, tp.Any] = write_kwargs or {}
    self.max_retry_attempts: int = max_retry_attempts
    self._queue: asyncio.Queue[str] | None = None

channel_name instance-attribute

channel_name = channel_name

table_name instance-attribute

table_name = table_name

read_kwargs instance-attribute

read_kwargs = read_kwargs or {}

write_kwargs instance-attribute

write_kwargs = write_kwargs or {}

max_retry_attempts instance-attribute

max_retry_attempts = max_retry_attempts

dsn property

dsn

Get the DSN string.

Returns:

  • str

    A string with the DSN, or None if the DSN isn't set yet.

startup async

startup()

Initialize the broker.

Source code in src/taskiq_pg/psqlpy/broker.py
async def startup(self) -> None:
    """Initialize the broker."""
    await super().startup()
    self._read_conn = await psqlpy.connect(
        dsn=self.dsn,
        **self.read_kwargs,
    )
    self._write_pool = psqlpy.ConnectionPool(
        dsn=self.dsn,
        **self.write_kwargs,
    )

    # create messages table if it doesn't exist
    async with self._write_pool.acquire() as conn:
        await conn.execute(CREATE_MESSAGE_TABLE_QUERY.format(self.table_name))

    # listen to notification channel
    self._listener = self._write_pool.listener()
    await self._listener.add_callback(self.channel_name, self._notification_handler)
    await self._listener.startup()
    self._listener.listen()

    self._queue = asyncio.Queue()

shutdown async

shutdown()

Close all connections on shutdown.

Source code in src/taskiq_pg/psqlpy/broker.py
async def shutdown(self) -> None:
    """Close all connections on shutdown."""
    await super().shutdown()
    if self._read_conn is not None:
        self._read_conn.close()
    if self._write_pool is not None:
        self._write_pool.close()
    if self._listener is not None:
        self._listener.abort_listen()
        await self._listener.shutdown()

kick async

kick(message)

Send message to the channel.

Inserts the message into the database and sends a NOTIFY.

:param message: Message to send.

Source code in src/taskiq_pg/psqlpy/broker.py
async def kick(self, message: BrokerMessage) -> None:
    """
    Send message to the channel.

    Inserts the message into the database and sends a NOTIFY.

    :param message: Message to send.
    """
    async with self._write_pool.acquire() as conn:
        # insert message into db table
        message_inserted_id = tp.cast(
            "int",
            await conn.fetch_val(
                INSERT_MESSAGE_QUERY.format(self.table_name),
                [
                    message.task_id,
                    message.task_name,
                    message.message.decode(),
                    JSONB(message.labels),
                ],
            ),
        )

        delay_value = tp.cast("str | None", message.labels.get("delay"))
        if delay_value is not None:
            delay_seconds = int(delay_value)
            asyncio.create_task(  # noqa: RUF006
                self._schedule_notification(message_inserted_id, delay_seconds),
            )
        else:
            # Send NOTIFY with message ID as payload
            _ = await conn.execute(
                f"NOTIFY {self.channel_name}, '{message_inserted_id}'",
            )

listen async

listen()

Listen to the channel.

Yields messages as they are received.

:yields: AckableMessage instances.

Source code in src/taskiq_pg/psqlpy/broker.py
async def listen(self) -> AsyncGenerator[AckableMessage, None]:
    """
    Listen to the channel.

    Yields messages as they are received.

    :yields: AckableMessage instances.
    """
    while True:
        try:
            payload = await self._queue.get()
            message_id = int(payload)  # payload is the message id
            try:
                async with self._write_pool.acquire() as conn:
                    claimed_message = await conn.fetch_row(
                        CLAIM_MESSAGE_QUERY.format(self.table_name),
                        [message_id],
                    )
            except ConnectionExecuteError:  # message was claimed by another worker
                continue
            message_row_result = tp.cast(
                "MessageRow",
                tp.cast("object", claimed_message.as_class(MessageRow)),
            )
            message_data = message_row_result.message.encode()

            async def ack(*, _message_id: int = message_id) -> None:
                async with self._write_pool.acquire() as conn:
                    _ = await conn.execute(
                        DELETE_MESSAGE_QUERY.format(self.table_name),
                        [_message_id],
                    )

            yield AckableMessage(data=message_data, ack=ack)
        except Exception:
            logger.exception("Error processing message")
            continue
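
Apart from the driver underneath, PSQLPyBroker is a drop-in counterpart of the asyncpg broker. A minimal sketch; the import path is an assumption:

from taskiq_pg import PSQLPyBroker, PSQLPyResultBackend  # import path assumed

broker = PSQLPyBroker(
    dsn="postgresql://postgres:postgres@localhost:5432/postgres",
).with_result_backend(
    PSQLPyResultBackend(dsn="postgres://postgres:postgres@localhost:5432/postgres"),
)


@broker.task
async def ping() -> str:
    return "pong"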

PSQLPyResultBackend

PSQLPyResultBackend(
    dsn="postgres://postgres:postgres@localhost:5432/postgres",
    keep_results=True,
    table_name="taskiq_results",
    field_for_task_id="VarChar",
    serializer=None,
    **connect_kwargs,
)

Bases: BasePostgresResultBackend

Result backend for TaskIQ based on PSQLPy.

Construct new result backend.

Parameters:

  • dsn (Callable[[], str] | str | None, default: 'postgres://postgres:postgres@localhost:5432/postgres' ) –

    connection string to PostgreSQL, or callable returning one.

  • keep_results (bool, default: True ) –

    flag to not remove results from the database after reading.

  • table_name (str, default: 'taskiq_results' ) –

    name of the table to store results.

  • field_for_task_id (Literal['VarChar', 'Text', 'Uuid'], default: 'VarChar' ) –

    type of the field to store task_id.

  • serializer (TaskiqSerializer | None, default: None ) –

    serializer class to serialize/deserialize result from task.

  • connect_kwargs (Any, default: {} ) –

    additional arguments for creating connection pool.

Source code in src/taskiq_pg/_internal/result_backend.py
def __init__(
    self,
    dsn: tp.Callable[[], str] | str | None = "postgres://postgres:postgres@localhost:5432/postgres",
    keep_results: bool = True,
    table_name: str = "taskiq_results",
    field_for_task_id: tp.Literal["VarChar", "Text", "Uuid"] = "VarChar",
    serializer: TaskiqSerializer | None = None,
    **connect_kwargs: tp.Any,
) -> None:
    """
    Construct new result backend.

    Args:
        dsn: connection string to PostgreSQL, or callable returning one.
        keep_results: flag to not remove results from the database after reading.
        table_name: name of the table to store results.
        field_for_task_id: type of the field to store task_id.
        serializer: serializer class to serialize/deserialize result from task.
        connect_kwargs: additional arguments for creating connection pool.

    """
    self._dsn: tp.Final = dsn
    self.keep_results: tp.Final = keep_results
    self.table_name: tp.Final = table_name
    self.field_for_task_id: tp.Final = field_for_task_id
    self.connect_kwargs: tp.Final = connect_kwargs
    self.serializer = serializer or PickleSerializer()

keep_results instance-attribute

keep_results = keep_results

table_name instance-attribute

table_name = table_name

field_for_task_id instance-attribute

field_for_task_id = field_for_task_id

connect_kwargs instance-attribute

connect_kwargs = connect_kwargs

serializer instance-attribute

serializer = serializer or PickleSerializer()

dsn property

dsn

Get the DSN string.

Returns the DSN string or None if not set.

startup async

startup()

Initialize the result backend.

Construct new connection pool and create new table for results if not exists.

Source code in src/taskiq_pg/psqlpy/result_backend.py
async def startup(self) -> None:
    """
    Initialize the result backend.

    Construct new connection pool
    and create new table for results if not exists.
    """
    self._database_pool = ConnectionPool(
        dsn=self.dsn,
        **self.connect_kwargs,
    )
    connection = await self._database_pool.connection()
    await connection.execute(
        querystring=CREATE_TABLE_QUERY.format(
            self.table_name,
            self.field_for_task_id,
        ),
    )
    await connection.execute(
        querystring=CREATE_INDEX_QUERY.format(
            self.table_name,
            self.table_name,
        ),
    )

shutdown async

shutdown()

Close the connection pool.

Source code in src/taskiq_pg/psqlpy/result_backend.py
async def shutdown(self) -> None:
    """Close the connection pool."""
    if getattr(self, "_database_pool", None) is not None:
        self._database_pool.close()

set_result async

set_result(task_id, result)

Set result to the PostgreSQL table.

:param task_id: ID of the task.
:param result: result of the task.

Source code in src/taskiq_pg/psqlpy/result_backend.py
async def set_result(
    self,
    task_id: str,
    result: TaskiqResult[ReturnType],
) -> None:
    """
    Set result to the PostgreSQL table.

    :param task_id: ID of the task.
    :param result: result of the task.
    """
    connection = await self._database_pool.connection()
    await connection.execute(
        querystring=INSERT_RESULT_QUERY.format(
            self.table_name,
        ),
        parameters=[
            task_id,
            self.serializer.dumpb(model_dump(result)),
        ],
    )

is_result_ready async

is_result_ready(task_id)

Returns whether the result is ready.

:param task_id: ID of the task.

:returns: True if the result is ready else False.

Source code in src/taskiq_pg/psqlpy/result_backend.py
async def is_result_ready(self, task_id: str) -> bool:
    """
    Returns whether the result is ready.

    :param task_id: ID of the task.

    :returns: True if the result is ready else False.
    """
    connection: tp.Final = await self._database_pool.connection()
    return tp.cast(
        "bool",
        await connection.fetch_val(
            querystring=IS_RESULT_EXISTS_QUERY.format(
                self.table_name,
            ),
            parameters=[task_id],
        ),
    )

get_result async

get_result(task_id, with_logs=False)

Retrieve result from the task.

:param task_id: task's id.
:param with_logs: if True it will download task's logs.
:raises ResultIsMissingError: if there is no result when trying to get it.
:return: TaskiqResult.

Source code in src/taskiq_pg/psqlpy/result_backend.py
async def get_result(
    self,
    task_id: str,
    with_logs: bool = False,
) -> TaskiqResult[ReturnType]:
    """
    Retrieve result from the task.

    :param task_id: task's id.
    :param with_logs: if True it will download task's logs.
    :raises ResultIsMissingError: if there is no result when trying to get it.
    :return: TaskiqResult.
    """
    connection: tp.Final = await self._database_pool.connection()
    try:
        result_in_bytes: tp.Final[bytes] = await connection.fetch_val(
            querystring=SELECT_RESULT_QUERY.format(
                self.table_name,
            ),
            parameters=[task_id],
        )
    except BaseConnectionError as exc:
        msg = f"Cannot find record with task_id = {task_id} in PostgreSQL"
        raise ResultIsMissingError(msg) from exc

    if not self.keep_results:
        await connection.execute(
            querystring=DELETE_RESULT_QUERY.format(
                self.table_name,
            ),
            parameters=[task_id],
        )

    taskiq_result: tp.Final = model_validate(
        TaskiqResult[ReturnType],
        self.serializer.loadb(result_in_bytes),
    )

    if not with_logs:
        taskiq_result.log = None

    return taskiq_result

PSQLPyScheduleSource

PSQLPyScheduleSource(
    broker,
    dsn="postgresql://postgres:postgres@localhost:5432/postgres",
    table_name="taskiq_schedules",
    **connect_kwargs,
)

Bases: BasePostgresScheduleSource

Schedule source that uses psqlpy to store schedules in PostgreSQL.

Initialize the PostgreSQL scheduler source.

Sets up a scheduler source that stores scheduled tasks in a PostgreSQL database. This scheduler source manages task schedules, allowing for persistent storage and retrieval of scheduled tasks across application restarts.

Parameters:

  • dsn (str | Callable[[], str], default: 'postgresql://postgres:postgres@localhost:5432/postgres' ) –

    PostgreSQL connection string

  • table_name (str, default: 'taskiq_schedules' ) –

    Name of the table to store scheduled tasks. Will be created automatically if it doesn't exist.

  • broker (AsyncBroker) –

    The TaskIQ broker instance to use for finding and managing tasks. Required if startup_schedule is provided.

  • **connect_kwargs (Any, default: {} ) –

    Additional keyword arguments passed to the database connection pool.

Source code in src/taskiq_pg/_internal/schedule_source.py
def __init__(
    self,
    broker: AsyncBroker,
    dsn: str | tp.Callable[[], str] = "postgresql://postgres:postgres@localhost:5432/postgres",
    table_name: str = "taskiq_schedules",
    **connect_kwargs: tp.Any,
) -> None:
    """
    Initialize the PostgreSQL scheduler source.

    Sets up a scheduler source that stores scheduled tasks in a PostgreSQL database.
    This scheduler source manages task schedules, allowing for persistent storage and retrieval of scheduled tasks
    across application restarts.

    Args:
        dsn: PostgreSQL connection string
        table_name: Name of the table to store scheduled tasks. Will be created automatically if it doesn't exist.
        broker: The TaskIQ broker instance to use for finding and managing tasks.
            Required if startup_schedule is provided.
        **connect_kwargs: Additional keyword arguments passed to the database connection pool.

    """
    self._broker: tp.Final = broker
    self._dsn: tp.Final = dsn
    self._table_name: tp.Final = table_name
    self._connect_kwargs: tp.Final = connect_kwargs

dsn property

dsn

Get the DSN string.

Returns the DSN string or None if not set.

extract_scheduled_tasks_from_broker

extract_scheduled_tasks_from_broker()

Extract schedules from tasks that were registered in broker.

Returns:

  • list[ScheduledTask]

    A list of ScheduledTask instances extracted from the task's labels.

Source code in src/taskiq_pg/_internal/schedule_source.py
def extract_scheduled_tasks_from_broker(self) -> list[ScheduledTask]:
    """
    Extract schedules from tasks that were registered in broker.

    Returns:
        A list of ScheduledTask instances extracted from the task's labels.
    """
    scheduled_tasks_for_creation: list[ScheduledTask] = []
    for task_name, task in self._broker.get_all_tasks().items():
        if "schedule" not in task.labels:
            logger.debug("Task %s has no schedule, skipping", task_name)
            continue
        if not isinstance(task.labels["schedule"], list):
            logger.warning(
                "Schedule for task %s is not a list, skipping",
                task_name,
            )
            continue
        for schedule in task.labels["schedule"]:
            try:
                new_schedule = ScheduledTask.model_validate(
                    {
                        "task_name": task_name,
                        "labels": schedule.get("labels", {}),
                        "args": schedule.get("args", []),
                        "kwargs": schedule.get("kwargs", {}),
                        "schedule_id": str(uuid.uuid4()),
                        "cron": schedule.get("cron", None),
                        "cron_offset": schedule.get("cron_offset", None),
                        "time": schedule.get("time", None),
                    },
                )
                scheduled_tasks_for_creation.append(new_schedule)
            except ValidationError:  # noqa: PERF203
                logger.exception(
                    "Schedule for task %s is not valid, skipping",
                    task_name,
                )
                continue
    return scheduled_tasks_for_creation

startup async

startup()

Initialize the schedule source.

Construct new connection pool, create new table for schedules if not exists and fill table with schedules from task labels.

Source code in src/taskiq_pg/psqlpy/schedule_source.py
async def startup(self) -> None:
    """
    Initialize the schedule source.

    Construct new connection pool, create new table for schedules if not exists
    and fill table with schedules from task labels.
    """
    self._database_pool = ConnectionPool(
        dsn=self.dsn,
        **self._connect_kwargs,
    )
    async with self._database_pool.acquire() as connection:
        await connection.execute(
            CREATE_SCHEDULES_TABLE_QUERY.format(
                self._table_name,
            ),
        )
    scheduled_tasks_for_creation = self.extract_scheduled_tasks_from_broker()
    await self._update_schedules_on_startup(scheduled_tasks_for_creation)

shutdown async

shutdown()

Close the connection pool.

Source code in src/taskiq_pg/psqlpy/schedule_source.py
async def shutdown(self) -> None:
    """Close the connection pool."""
    if getattr(self, "_database_pool", None) is not None:
        self._database_pool.close()

get_schedules async

get_schedules()

Fetch schedules from the database.

Source code in src/taskiq_pg/psqlpy/schedule_source.py
async def get_schedules(self) -> list["ScheduledTask"]:
    """Fetch schedules from the database."""
    async with self._database_pool.acquire() as connection:
        rows_with_schedules = await connection.fetch(
            SELECT_SCHEDULES_QUERY.format(self._table_name),
        )
    schedules = []
    for row in rows_with_schedules.result():
        schedule = row["schedule"]
        schedules.append(
            ScheduledTask.model_validate(
                {
                    "schedule_id": str(row["id"]),
                    "task_name": row["task_name"],
                    "labels": schedule["labels"],
                    "args": schedule["args"],
                    "kwargs": schedule["kwargs"],
                    "cron": schedule["cron"],
                    "cron_offset": schedule["cron_offset"],
                    "time": schedule["time"],
                },
            ),
        )
    return schedules

add_schedule async

add_schedule(schedule)

Add a new schedule.

Parameters:

  • schedule (ScheduledTask) –

    schedule to add.

Source code in src/taskiq_pg/psqlpy/schedule_source.py
async def add_schedule(self, schedule: "ScheduledTask") -> None:
    """
    Add a new schedule.

    Args:
        schedule: schedule to add.
    """
    async with self._database_pool.acquire() as connection:
        schedule_dict = schedule.model_dump(
            mode="json",
            exclude={"schedule_id", "task_name"},
        )
        await connection.execute(
            INSERT_SCHEDULE_QUERY.format(self._table_name),
            [
                uuid.UUID(schedule.schedule_id),
                schedule.task_name,
                JSONB(schedule_dict),
            ]
        )

delete_schedule async

delete_schedule(schedule_id)

Method to delete schedule by id.

This is useful for schedule cancellation.

Parameters:

  • schedule_id (str) –

    id of schedule to delete.

Source code in src/taskiq_pg/psqlpy/schedule_source.py
async def delete_schedule(self, schedule_id: str) -> None:
    """
    Method to delete schedule by id.

    This is useful for schedule cancellation.

    Args:
        schedule_id: id of schedule to delete.
    """
    async with self._database_pool.acquire() as connection:
        await connection.execute(
            DELETE_SCHEDULE_QUERY.format(self._table_name),
            [uuid.UUID(schedule_id)],
        )

post_send async

post_send(task)

Delete a task after it's completed.

Source code in src/taskiq_pg/psqlpy/schedule_source.py
async def post_send(self, task: ScheduledTask) -> None:
    """Delete a task after it's completed."""
    if task.time is not None:
        await self.delete_schedule(task.schedule_id)
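
Wiring mirrors the asyncpg schedule source: hand the broker to the source, register the source with TaskiqScheduler, and startup() creates the table and syncs schedule labels. A minimal sketch; the import paths are assumptions:

from taskiq import TaskiqScheduler

from taskiq_pg import PSQLPyBroker, PSQLPyScheduleSource  # import path assumed

broker = PSQLPyBroker(dsn="postgresql://postgres:postgres@localhost:5432/postgres")
scheduler = TaskiqScheduler(
    broker=broker,
    sources=[PSQLPyScheduleSource(broker=broker)],
)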

broker

logger module-attribute

logger = getLogger('taskiq.psqlpy_broker')

MessageRow dataclass

MessageRow(
    id,
    task_id,
    task_name,
    message,
    labels,
    status,
    created_at,
)

Message in db table.

id instance-attribute

id

task_id instance-attribute

task_id

task_name instance-attribute

task_name

message instance-attribute

message

labels instance-attribute

labels

status instance-attribute

status

created_at instance-attribute

created_at

PSQLPyBroker

PSQLPyBroker(
    dsn="postgresql://postgres:postgres@localhost:5432/postgres",
    result_backend=None,
    task_id_generator=None,
    channel_name="taskiq",
    table_name="taskiq_messages",
    max_retry_attempts=5,
    read_kwargs=None,
    write_kwargs=None,
)

Bases: BasePostgresBroker

Broker that uses PostgreSQL and PSQLPy with LISTEN/NOTIFY.

Construct a new broker.

Parameters:

  • dsn (str | Callable[[], str], default: 'postgresql://postgres:postgres@localhost:5432/postgres' ) –

    connection string to PostgreSQL, or callable returning one.

  • result_backend (AsyncResultBackend[_T] | None, default: None ) –

    Custom result backend.

  • task_id_generator (Callable[[], str] | None, default: None ) –

    Custom task_id generator.

  • channel_name (str, default: 'taskiq' ) –

    Name of the channel to listen on.

  • table_name (str, default: 'taskiq_messages' ) –

    Name of the table to store messages.

  • max_retry_attempts (int, default: 5 ) –

    Maximum number of message processing attempts.

  • read_kwargs (dict[str, Any] | None, default: None ) –

    Additional arguments for read connection creation.

  • write_kwargs (dict[str, Any] | None, default: None ) –

    Additional arguments for write pool creation.

Source code in src/taskiq_pg/_internal/broker.py
def __init__(  # noqa: PLR0913
    self,
    dsn: str | tp.Callable[[], str] = "postgresql://postgres:postgres@localhost:5432/postgres",
    result_backend: AsyncResultBackend[_T] | None = None,
    task_id_generator: tp.Callable[[], str] | None = None,
    channel_name: str = "taskiq",
    table_name: str = "taskiq_messages",
    max_retry_attempts: int = 5,
    read_kwargs: dict[str, tp.Any] | None = None,
    write_kwargs: dict[str, tp.Any] | None = None,
) -> None:
    """
    Construct a new broker.

    Args:
        dsn: connection string to PostgreSQL, or callable returning one.
        result_backend: Custom result backend.
        task_id_generator: Custom task_id generator.
        channel_name: Name of the channel to listen on.
        table_name: Name of the table to store messages.
        max_retry_attempts: Maximum number of message processing attempts.
        read_kwargs: Additional arguments for read connection creation.
        write_kwargs: Additional arguments for write pool creation.

    """
    super().__init__(
        result_backend=result_backend,
        task_id_generator=task_id_generator,
    )
    self._dsn: str | tp.Callable[[], str] = dsn
    self.channel_name: str = channel_name
    self.table_name: str = table_name
    self.read_kwargs: dict[str, tp.Any] = read_kwargs or {}
    self.write_kwargs: dict[str, tp.Any] = write_kwargs or {}
    self.max_retry_attempts: int = max_retry_attempts
    self._queue: asyncio.Queue[str] | None = None

channel_name instance-attribute

channel_name = channel_name

table_name instance-attribute

table_name = table_name

read_kwargs instance-attribute

read_kwargs = read_kwargs or {}

write_kwargs instance-attribute

write_kwargs = write_kwargs or {}

max_retry_attempts instance-attribute

max_retry_attempts = max_retry_attempts

dsn property

dsn

Get the DSN string.

Returns:

  • str

    A string with the DSN, or None if the DSN isn't set yet.

startup async
startup()

Initialize the broker.

Source code in src/taskiq_pg/psqlpy/broker.py
async def startup(self) -> None:
    """Initialize the broker."""
    await super().startup()
    self._read_conn = await psqlpy.connect(
        dsn=self.dsn,
        **self.read_kwargs,
    )
    self._write_pool = psqlpy.ConnectionPool(
        dsn=self.dsn,
        **self.write_kwargs,
    )

    # create messages table if it doesn't exist
    async with self._write_pool.acquire() as conn:
        await conn.execute(CREATE_MESSAGE_TABLE_QUERY.format(self.table_name))

    # listen to notification channel
    self._listener = self._write_pool.listener()
    await self._listener.add_callback(self.channel_name, self._notification_handler)
    await self._listener.startup()
    self._listener.listen()

    self._queue = asyncio.Queue()
shutdown async
shutdown()

Close all connections on shutdown.

Source code in src/taskiq_pg/psqlpy/broker.py
async def shutdown(self) -> None:
    """Close all connections on shutdown."""
    await super().shutdown()
    if self._read_conn is not None:
        self._read_conn.close()
    if self._write_pool is not None:
        self._write_pool.close()
    if self._listener is not None:
        self._listener.abort_listen()
        await self._listener.shutdown()
kick async
kick(message)

Send message to the channel.

Inserts the message into the database and sends a NOTIFY.

:param message: Message to send.

Source code in src/taskiq_pg/psqlpy/broker.py
async def kick(self, message: BrokerMessage) -> None:
    """
    Send message to the channel.

    Inserts the message into the database and sends a NOTIFY.

    :param message: Message to send.
    """
    async with self._write_pool.acquire() as conn:
        # insert message into db table
        message_inserted_id = tp.cast(
            "int",
            await conn.fetch_val(
                INSERT_MESSAGE_QUERY.format(self.table_name),
                [
                    message.task_id,
                    message.task_name,
                    message.message.decode(),
                    JSONB(message.labels),
                ],
            ),
        )

        delay_value = tp.cast("str | None", message.labels.get("delay"))
        if delay_value is not None:
            delay_seconds = int(delay_value)
            asyncio.create_task(  # noqa: RUF006
                self._schedule_notification(message_inserted_id, delay_seconds),
            )
        else:
            # Send NOTIFY with message ID as payload
            _ = await conn.execute(
                f"NOTIFY {self.channel_name}, '{message_inserted_id}'",
            )
listen async
listen()

Listen to the channel.

Yields messages as they are received.

:yields: AckableMessage instances.

Source code in src/taskiq_pg/psqlpy/broker.py
async def listen(self) -> AsyncGenerator[AckableMessage, None]:
    """
    Listen to the channel.

    Yields messages as they are received.

    :yields: AckableMessage instances.
    """
    while True:
        try:
            payload = await self._queue.get()
            message_id = int(payload)  # payload is the message id
            try:
                async with self._write_pool.acquire() as conn:
                    claimed_message = await conn.fetch_row(
                        CLAIM_MESSAGE_QUERY.format(self.table_name),
                        [message_id],
                    )
            except ConnectionExecuteError:  # message was claimed by another worker
                continue
            message_row_result = tp.cast(
                "MessageRow",
                tp.cast("object", claimed_message.as_class(MessageRow)),
            )
            message_data = message_row_result.message.encode()

            async def ack(*, _message_id: int = message_id) -> None:
                async with self._write_pool.acquire() as conn:
                    _ = await conn.execute(
                        DELETE_MESSAGE_QUERY.format(self.table_name),
                        [_message_id],
                    )

            yield AckableMessage(data=message_data, ack=ack)
        except Exception:
            logger.exception("Error processing message")
            continue

queries

CREATE_TABLE_QUERY module-attribute

CREATE_TABLE_QUERY = "\nCREATE TABLE IF NOT EXISTS {} (\n    task_id {} UNIQUE,\n    result BYTEA\n)\n"

CREATE_INDEX_QUERY module-attribute

CREATE_INDEX_QUERY = "\nCREATE INDEX IF NOT EXISTS {}_task_id_idx ON {} USING HASH (task_id)\n"

INSERT_RESULT_QUERY module-attribute

INSERT_RESULT_QUERY = "\nINSERT INTO {} VALUES ($1, $2)\nON CONFLICT (task_id)\nDO UPDATE\nSET result = $2\n"

IS_RESULT_EXISTS_QUERY module-attribute

IS_RESULT_EXISTS_QUERY = "\nSELECT EXISTS(\n    SELECT 1 FROM {} WHERE task_id = $1\n)\n"

SELECT_RESULT_QUERY module-attribute

SELECT_RESULT_QUERY = (
    "\nSELECT result FROM {} WHERE task_id = $1\n"
)

DELETE_RESULT_QUERY module-attribute

DELETE_RESULT_QUERY = (
    "\nDELETE FROM {} WHERE task_id = $1\n"
)

CREATE_MESSAGE_TABLE_QUERY module-attribute

CREATE_MESSAGE_TABLE_QUERY = "\nCREATE TABLE IF NOT EXISTS {} (\n    id SERIAL PRIMARY KEY,\n    task_id VARCHAR NOT NULL,\n    task_name VARCHAR NOT NULL,\n    message TEXT NOT NULL,\n    labels JSONB NOT NULL,\n    status TEXT NOT NULL DEFAULT 'pending',\n    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()\n);\n"

INSERT_MESSAGE_QUERY module-attribute

INSERT_MESSAGE_QUERY = "\nINSERT INTO {} (task_id, task_name, message, labels)\nVALUES ($1, $2, $3, $4)\nRETURNING id\n"

CLAIM_MESSAGE_QUERY module-attribute

CLAIM_MESSAGE_QUERY = "UPDATE {} SET status = 'processing' WHERE id = $1 AND status = 'pending' RETURNING *"

DELETE_MESSAGE_QUERY module-attribute

DELETE_MESSAGE_QUERY = 'DELETE FROM {} WHERE id = $1'

CREATE_SCHEDULES_TABLE_QUERY module-attribute

CREATE_SCHEDULES_TABLE_QUERY = "\nCREATE TABLE IF NOT EXISTS {} (\n    id UUID PRIMARY KEY,\n    task_name VARCHAR(100) NOT NULL,\n    schedule JSONB NOT NULL,\n    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),\n    updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()\n);\n"

INSERT_SCHEDULE_QUERY module-attribute

INSERT_SCHEDULE_QUERY = "\nINSERT INTO {} (id, task_name, schedule)\nVALUES ($1, $2, $3)\nON CONFLICT (id) DO UPDATE\nSET task_name = EXCLUDED.task_name,\n    schedule = EXCLUDED.schedule,\n    updated_at = NOW();\n"

SELECT_SCHEDULES_QUERY module-attribute

SELECT_SCHEDULES_QUERY = (
    "\nSELECT id, task_name, schedule\nFROM {};\n"
)

DELETE_ALL_SCHEDULES_QUERY module-attribute

DELETE_ALL_SCHEDULES_QUERY = '\nDELETE FROM {};\n'

DELETE_SCHEDULE_QUERY module-attribute

DELETE_SCHEDULE_QUERY = '\nDELETE FROM {} WHERE id = $1;\n'
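
These templates receive the table name (and, for CREATE_TABLE_QUERY, the task_id column type) through str.format, while the $1/$2 placeholders are left for the driver to bind at execution time. A small illustration:

# str.format fills in identifiers; $1/$2 remain driver-bound parameters
query = INSERT_RESULT_QUERY.format("taskiq_results")
# -> INSERT INTO taskiq_results VALUES ($1, $2)
#    ON CONFLICT (task_id) DO UPDATE SET result = $2
# later executed as, e.g.:
#   await connection.execute(querystring=query, parameters=[task_id, result_bytes])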

result_backend

PSQLPyResultBackend

PSQLPyResultBackend(
    dsn="postgres://postgres:postgres@localhost:5432/postgres",
    keep_results=True,
    table_name="taskiq_results",
    field_for_task_id="VarChar",
    serializer=None,
    **connect_kwargs,
)

Bases: BasePostgresResultBackend

Result backend for TaskIQ based on PSQLPy.

Construct new result backend.

Parameters:

  • dsn (Callable[[], str] | str | None, default: 'postgres://postgres:postgres@localhost:5432/postgres' ) –

    connection string to PostgreSQL, or callable returning one.

  • keep_results (bool, default: True ) –

    flag to not remove results from the database after reading.

  • table_name (str, default: 'taskiq_results' ) –

    name of the table to store results.

  • field_for_task_id (Literal['VarChar', 'Text', 'Uuid'], default: 'VarChar' ) –

    type of the field to store task_id.

  • serializer (TaskiqSerializer | None, default: None ) –

    serializer class to serialize/deserialize result from task.

  • connect_kwargs (Any, default: {} ) –

    additional arguments for creating connection pool.

Source code in src/taskiq_pg/_internal/result_backend.py
def __init__(
    self,
    dsn: tp.Callable[[], str] | str | None = "postgres://postgres:postgres@localhost:5432/postgres",
    keep_results: bool = True,
    table_name: str = "taskiq_results",
    field_for_task_id: tp.Literal["VarChar", "Text", "Uuid"] = "VarChar",
    serializer: TaskiqSerializer | None = None,
    **connect_kwargs: tp.Any,
) -> None:
    """
    Construct new result backend.

    Args:
        dsn: connection string to PostgreSQL, or callable returning one.
        keep_results: flag to not remove results from the database after reading.
        table_name: name of the table to store results.
        field_for_task_id: type of the field to store task_id.
        serializer: serializer class to serialize/deserialize result from task.
        connect_kwargs: additional arguments for creating connection pool.

    """
    self._dsn: tp.Final = dsn
    self.keep_results: tp.Final = keep_results
    self.table_name: tp.Final = table_name
    self.field_for_task_id: tp.Final = field_for_task_id
    self.connect_kwargs: tp.Final = connect_kwargs
    self.serializer = serializer or PickleSerializer()
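
A minimal construction sketch; field_for_task_id="Uuid" is purely illustrative, and the table plus its hash index are created later by startup:

from taskiq_pg.psqlpy import PSQLPyResultBackend

result_backend = PSQLPyResultBackend(
    dsn="postgres://postgres:postgres@localhost:5432/postgres",
    keep_results=True,           # keep rows after they are read
    table_name="taskiq_results",
    field_for_task_id="Uuid",    # store task_id as UUID instead of VARCHAR
)
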
keep_results instance-attribute
keep_results = keep_results
table_name instance-attribute
table_name = table_name
field_for_task_id instance-attribute
field_for_task_id = field_for_task_id
connect_kwargs instance-attribute
connect_kwargs = connect_kwargs
serializer instance-attribute
serializer = serializer or PickleSerializer()
dsn property
dsn

Get the DSN string.

Returns the DSN string or None if not set.

startup async
startup()

Initialize the result backend.

Construct a new connection pool and create the results table if it does not exist.

Source code in src/taskiq_pg/psqlpy/result_backend.py
async def startup(self) -> None:
    """
    Initialize the result backend.

    Construct new connection pool
    and create new table for results if not exists.
    """
    self._database_pool = ConnectionPool(
        dsn=self.dsn,
        **self.connect_kwargs,
    )
    connection = await self._database_pool.connection()
    await connection.execute(
        querystring=CREATE_TABLE_QUERY.format(
            self.table_name,
            self.field_for_task_id,
        ),
    )
    await connection.execute(
        querystring=CREATE_INDEX_QUERY.format(
            self.table_name,
            self.table_name,
        ),
    )
shutdown async
shutdown()

Close the connection pool.

Source code in src/taskiq_pg/psqlpy/result_backend.py
async def shutdown(self) -> None:
    """Close the connection pool."""
    if getattr(self, "_database_pool", None) is not None:
        self._database_pool.close()
set_result async
set_result(task_id, result)

Set result to the PostgreSQL table.

:param task_id: ID of the task.

:param result: result of the task.

Source code in src/taskiq_pg/psqlpy/result_backend.py
async def set_result(
    self,
    task_id: str,
    result: TaskiqResult[ReturnType],
) -> None:
    """
    Set result to the PostgreSQL table.

    :param task_id: ID of the task.
    :param result: result of the task.
    """
    connection = await self._database_pool.connection()
    await connection.execute(
        querystring=INSERT_RESULT_QUERY.format(
            self.table_name,
        ),
        parameters=[
            task_id,
            self.serializer.dumpb(model_dump(result)),
        ],
    )
is_result_ready async
is_result_ready(task_id)

Returns whether the result is ready.

:param task_id: ID of the task.

:returns: True if the result is ready else False.

Source code in src/taskiq_pg/psqlpy/result_backend.py
async def is_result_ready(self, task_id: str) -> bool:
    """
    Returns whether the result is ready.

    :param task_id: ID of the task.

    :returns: True if the result is ready else False.
    """
    connection: tp.Final = await self._database_pool.connection()
    return tp.cast(
        "bool",
        await connection.fetch_val(
            querystring=IS_RESULT_EXISTS_QUERY.format(
                self.table_name,
            ),
            parameters=[task_id],
        ),
    )
get_result async
get_result(task_id, with_logs=False)

Retrieve result from the task.

:param task_id: task's id.

:param with_logs: if True it will download task's logs.

:raises ResultIsMissingError: if there is no result when trying to get it.

:return: TaskiqResult.

Source code in src/taskiq_pg/psqlpy/result_backend.py
async def get_result(
    self,
    task_id: str,
    with_logs: bool = False,
) -> TaskiqResult[ReturnType]:
    """
    Retrieve result from the task.

    :param task_id: task's id.
    :param with_logs: if True it will download task's logs.
    :raises ResultIsMissingError: if there is no result when trying to get it.
    :return: TaskiqResult.
    """
    connection: tp.Final = await self._database_pool.connection()
    try:
        result_in_bytes: tp.Final[bytes] = await connection.fetch_val(
            querystring=SELECT_RESULT_QUERY.format(
                self.table_name,
            ),
            parameters=[task_id],
        )
    except BaseConnectionError as exc:
        msg = f"Cannot find record with task_id = {task_id} in PostgreSQL"
        raise ResultIsMissingError(msg) from exc

    if not self.keep_results:
        await connection.execute(
            querystring=DELETE_RESULT_QUERY.format(
                self.table_name,
            ),
            parameters=[task_id],
        )

    taskiq_result: tp.Final = model_validate(
        TaskiqResult[ReturnType],
        self.serializer.loadb(result_in_bytes),
    )

    if not with_logs:
        taskiq_result.log = None

    return taskiq_result
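
Note that with keep_results=False the DELETE above runs on the first successful read, so a result can only be fetched once; per the contract documented here, a second call raises ResultIsMissingError:

# inside an async function, given a started backend with keep_results=False:
result = await backend.get_result(task_id)  # returns the TaskiqResult
result = await backend.get_result(task_id)  # raises ResultIsMissingError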

schedule_source

logger module-attribute

logger = getLogger('taskiq_pg.psqlpy_schedule_source')

PSQLPyScheduleSource

PSQLPyScheduleSource(
    broker,
    dsn="postgresql://postgres:postgres@localhost:5432/postgres",
    table_name="taskiq_schedules",
    **connect_kwargs,
)

Bases: BasePostgresScheduleSource

Schedule source that uses psqlpy to store schedules in PostgreSQL.

Initialize the PostgreSQL scheduler source.

Sets up a scheduler source that stores scheduled tasks in a PostgreSQL database, so schedules persist and can be retrieved across application restarts.

Parameters:

  • dsn (str | Callable[[], str], default: 'postgresql://postgres:postgres@localhost:5432/postgres' ) –

    PostgreSQL connection string

  • table_name (str, default: 'taskiq_schedules' ) –

    Name of the table to store scheduled tasks. Will be created automatically if it doesn't exist.

  • broker (AsyncBroker) –

    The TaskIQ broker instance to use for finding and managing tasks.

  • **connect_kwargs (Any, default: {} ) –

    Additional keyword arguments passed to the database connection pool.

Source code in src/taskiq_pg/_internal/schedule_source.py
def __init__(
    self,
    broker: AsyncBroker,
    dsn: str | tp.Callable[[], str] = "postgresql://postgres:postgres@localhost:5432/postgres",
    table_name: str = "taskiq_schedules",
    **connect_kwargs: tp.Any,
) -> None:
    """
    Initialize the PostgreSQL scheduler source.

    Sets up a scheduler source that stores scheduled tasks in a PostgreSQL database.
    This scheduler source manages task schedules, allowing for persistent storage and retrieval of scheduled tasks
    across application restarts.

    Args:
        dsn: PostgreSQL connection string
        table_name: Name of the table to store scheduled tasks. Will be created automatically if it doesn't exist.
        broker: The TaskIQ broker instance to use for finding and managing tasks.
            Required if startup_schedule is provided.
        **connect_kwargs: Additional keyword arguments passed to the database connection pool.

    """
    self._broker: tp.Final = broker
    self._dsn: tp.Final = dsn
    self._table_name: tp.Final = table_name
    self._connect_kwargs: tp.Final = connect_kwargs
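
A wiring sketch, assuming PSQLPyBroker from the same package; on startup the source copies schedules from task labels into the table, and the scheduler then reads them from there:

from taskiq import TaskiqScheduler
from taskiq_pg.psqlpy import PSQLPyBroker, PSQLPyScheduleSource

broker = PSQLPyBroker(dsn="postgresql://postgres:postgres@localhost:5432/postgres")
source = PSQLPyScheduleSource(
    broker,
    dsn="postgresql://postgres:postgres@localhost:5432/postgres",
)
scheduler = TaskiqScheduler(broker=broker, sources=[source])
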
dsn property
dsn

Get the DSN string.

Returns the DSN string or None if not set.

startup async
startup()

Initialize the schedule source.

Construct a new connection pool, create the schedules table if it does not exist, and fill it with schedules from task labels.

Source code in src/taskiq_pg/psqlpy/schedule_source.py
async def startup(self) -> None:
    """
    Initialize the schedule source.

    Construct new connection pool, create new table for schedules if not exists
    and fill table with schedules from task labels.
    """
    self._database_pool = ConnectionPool(
        dsn=self.dsn,
        **self._connect_kwargs,
    )
    async with self._database_pool.acquire() as connection:
        await connection.execute(
            CREATE_SCHEDULES_TABLE_QUERY.format(
                self._table_name,
            ),
        )
    scheduled_tasks_for_creation = self.extract_scheduled_tasks_from_broker()
    await self._update_schedules_on_startup(scheduled_tasks_for_creation)
shutdown async
shutdown()

Close the connection pool.

Source code in src/taskiq_pg/psqlpy/schedule_source.py
async def shutdown(self) -> None:
    """Close the connection pool."""
    if getattr(self, "_database_pool", None) is not None:
        self._database_pool.close()
get_schedules async
get_schedules()

Fetch schedules from the database.

Source code in src/taskiq_pg/psqlpy/schedule_source.py
async def get_schedules(self) -> list["ScheduledTask"]:
    """Fetch schedules from the database."""
    async with self._database_pool.acquire() as connection:
        rows_with_schedules = await connection.fetch(
            SELECT_SCHEDULES_QUERY.format(self._table_name),
        )
    schedules = []
    for row in rows_with_schedules.result():
        schedule = row["schedule"]
        schedules.append(
            ScheduledTask.model_validate(
                {
                    "schedule_id": str(row["id"]),
                    "task_name": row["task_name"],
                    "labels": schedule["labels"],
                    "args": schedule["args"],
                    "kwargs": schedule["kwargs"],
                    "cron": schedule["cron"],
                    "cron_offset": schedule["cron_offset"],
                    "time": schedule["time"],
                },
            ),
        )
    return schedules
add_schedule async
add_schedule(schedule)

Add a new schedule.

Parameters:

  • schedule (ScheduledTask) –

    schedule to add.

Source code in src/taskiq_pg/psqlpy/schedule_source.py
async def add_schedule(self, schedule: "ScheduledTask") -> None:
    """
    Add a new schedule.

    Args:
        schedule: schedule to add.
    """
    async with self._database_pool.acquire() as connection:
        schedule_dict = schedule.model_dump(
            mode="json",
            exclude={"schedule_id", "task_name"},
        )
        await connection.execute(
            INSERT_SCHEDULE_QUERY.format(self._table_name),
            [
                uuid.UUID(schedule.schedule_id),
                schedule.task_name,
                JSONB(schedule_dict),
            ]
        )
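
A sketch of adding a one-off schedule at runtime; ScheduledTask comes from taskiq (import path assumed here), and the task name is hypothetical. post_send below removes such time-based schedules after dispatch:

import uuid
from datetime import datetime, timedelta, timezone

from taskiq import ScheduledTask  # import path assumed

# inside an async function, given a started schedule source `source`:
one_off = ScheduledTask(
    schedule_id=str(uuid.uuid4()),
    task_name="my_module:heartbeat",  # hypothetical task name
    labels={},
    args=[],
    kwargs={},
    time=datetime.now(timezone.utc) + timedelta(minutes=5),
)
await source.add_schedule(one_off)
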
delete_schedule async
delete_schedule(schedule_id)

Method to delete schedule by id.

This is useful for schedule cancellation.

Parameters:

  • schedule_id (str) –

    id of schedule to delete.

Source code in src/taskiq_pg/psqlpy/schedule_source.py
async def delete_schedule(self, schedule_id: str) -> None:
    """
    Method to delete schedule by id.

    This is useful for schedule cancellation.

    Args:
        schedule_id: id of schedule to delete.
    """
    async with self._database_pool.acquire() as connection:
        await connection.execute(
            DELETE_SCHEDULE_QUERY.format(self._table_name),
            [uuid.UUID(schedule_id)],
        )
post_send async
post_send(task)

Delete a task after it's completed.

Source code in src/taskiq_pg/psqlpy/schedule_source.py
async def post_send(self, task: ScheduledTask) -> None:
    """Delete a task after it's completed."""
    if task.time is not None:
        await self.delete_schedule(task.schedule_id)
extract_scheduled_tasks_from_broker
extract_scheduled_tasks_from_broker()

Extract schedules from tasks that were registered in broker.

Returns:

  • list[ScheduledTask]

    A list of ScheduledTask instances extracted from the task's labels.

Source code in src/taskiq_pg/_internal/schedule_source.py
def extract_scheduled_tasks_from_broker(self) -> list[ScheduledTask]:
    """
    Extract schedules from tasks that were registered in broker.

    Returns:
        A list of ScheduledTask instances extracted from the task's labels.
    """
    scheduled_tasks_for_creation: list[ScheduledTask] = []
    for task_name, task in self._broker.get_all_tasks().items():
        if "schedule" not in task.labels:
            logger.debug("Task %s has no schedule, skipping", task_name)
            continue
        if not isinstance(task.labels["schedule"], list):
            logger.warning(
                "Schedule for task %s is not a list, skipping",
                task_name,
            )
            continue
        for schedule in task.labels["schedule"]:
            try:
                new_schedule = ScheduledTask.model_validate(
                    {
                        "task_name": task_name,
                        "labels": schedule.get("labels", {}),
                        "args": schedule.get("args", []),
                        "kwargs": schedule.get("kwargs", {}),
                        "schedule_id": str(uuid.uuid4()),
                        "cron": schedule.get("cron", None),
                        "cron_offset": schedule.get("cron_offset", None),
                        "time": schedule.get("time", None),
                    },
                )
                scheduled_tasks_for_creation.append(new_schedule)
            except ValidationError:  # noqa: PERF203
                logger.exception(
                    "Schedule for task %s is not valid, skipping",
                    task_name,
                )
                continue
    return scheduled_tasks_for_creation
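
The method expects the schedule label to be a list of dicts carrying the keys validated above. As a concrete sketch (broker here is any taskiq broker with the task registered), a declaration like this is picked up and written to the schedules table on startup:

@broker.task(
    schedule=[
        {"cron": "*/5 * * * *"},  # every five minutes
        {"time": "2030-01-01T00:00:00+00:00", "args": [1, 2]},  # one-off run
    ],
)
async def add(a: int, b: int) -> int:
    return a + b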

psycopg

PsycopgBroker

PsycopgBroker(
    dsn="postgresql://postgres:postgres@localhost:5432/postgres",
    result_backend=None,
    task_id_generator=None,
    channel_name="taskiq",
    table_name="taskiq_messages",
    max_retry_attempts=5,
    read_kwargs=None,
    write_kwargs=None,
)

Bases: BasePostgresBroker

Broker that uses PostgreSQL and psycopg with LISTEN/NOTIFY.

Construct a new broker.

Parameters:

  • dsn (str | Callable[[], str], default: 'postgresql://postgres:postgres@localhost:5432/postgres' ) –

    connection string to PostgreSQL, or callable returning one.

  • result_backend (AsyncResultBackend[_T] | None, default: None ) –

    Custom result backend.

  • task_id_generator (Callable[[], str] | None, default: None ) –

    Custom task_id generator.

  • channel_name (str, default: 'taskiq' ) –

    Name of the channel to listen on.

  • table_name (str, default: 'taskiq_messages' ) –

    Name of the table to store messages.

  • max_retry_attempts (int, default: 5 ) –

    Maximum number of message processing attempts.

  • read_kwargs (dict[str, Any] | None, default: None ) –

    Additional arguments for read connection creation.

  • write_kwargs (dict[str, Any] | None, default: None ) –

    Additional arguments for write pool creation.

Source code in src/taskiq_pg/_internal/broker.py
def __init__(  # noqa: PLR0913
    self,
    dsn: str | tp.Callable[[], str] = "postgresql://postgres:postgres@localhost:5432/postgres",
    result_backend: AsyncResultBackend[_T] | None = None,
    task_id_generator: tp.Callable[[], str] | None = None,
    channel_name: str = "taskiq",
    table_name: str = "taskiq_messages",
    max_retry_attempts: int = 5,
    read_kwargs: dict[str, tp.Any] | None = None,
    write_kwargs: dict[str, tp.Any] | None = None,
) -> None:
    """
    Construct a new broker.

    Args:
        dsn: connection string to PostgreSQL, or callable returning one.
        result_backend: Custom result backend.
        task_id_generator: Custom task_id generator.
        channel_name: Name of the channel to listen on.
        table_name: Name of the table to store messages.
        max_retry_attempts: Maximum number of message processing attempts.
        read_kwargs: Additional arguments for read connection creation.
        write_kwargs: Additional arguments for write pool creation.

    """
    super().__init__(
        result_backend=result_backend,
        task_id_generator=task_id_generator,
    )
    self._dsn: str | tp.Callable[[], str] = dsn
    self.channel_name: str = channel_name
    self.table_name: str = table_name
    self.read_kwargs: dict[str, tp.Any] = read_kwargs or {}
    self.write_kwargs: dict[str, tp.Any] = write_kwargs or {}
    self.max_retry_attempts: int = max_retry_attempts
    self._queue: asyncio.Queue[str] | None = None
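
A construction sketch showing where the two kwarg dicts go: read_kwargs feed the dedicated LISTEN connection and write_kwargs the write pool (the pool options shown belong to psycopg's AsyncConnectionPool and are illustrative):

from taskiq_pg.psycopg import PsycopgBroker

broker = PsycopgBroker(
    dsn="postgresql://postgres:postgres@localhost:5432/postgres",
    channel_name="taskiq",
    table_name="taskiq_messages",
    write_kwargs={"min_size": 1, "max_size": 10},  # AsyncConnectionPool options
)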

channel_name instance-attribute

channel_name = channel_name

table_name instance-attribute

table_name = table_name

read_kwargs instance-attribute

read_kwargs = read_kwargs or {}

write_kwargs instance-attribute

write_kwargs = write_kwargs or {}

max_retry_attempts instance-attribute

max_retry_attempts = max_retry_attempts

dsn property

dsn

Get the DSN string.

Returns:

  • str

    The DSN string, or None if it isn't set yet.

startup async

startup()

Initialize the broker.

Source code in src/taskiq_pg/psycopg/broker.py
async def startup(self) -> None:
    """Initialize the broker."""
    await super().startup()
    self._read_conn = await AsyncConnection.connect(
        conninfo=self.dsn,
        **self.read_kwargs,
        autocommit=True,
        cursor_factory=AsyncRawCursor,
    )
    self._write_pool = AsyncConnectionPool(
        conninfo=self.dsn if self.dsn is not None else "",
        open=False,
        **self.write_kwargs,
    )
    await self._write_pool.open()

    async with self._write_pool.connection() as connection, connection.cursor() as cursor:
        await cursor.execute(sql.SQL(CREATE_MESSAGE_TABLE_QUERY).format(sql.Identifier(self.table_name)))

    await self._read_conn.execute(sql.SQL("LISTEN {}").format(sql.Identifier(self.channel_name)))
    self._notifies_iter = self._read_conn.notifies()

shutdown async

shutdown()

Close all connections on shutdown.

Source code in src/taskiq_pg/psycopg/broker.py
async def shutdown(self) -> None:
    """Close all connections on shutdown."""
    await super().shutdown()
    if self._notifies_iter is not None:
        with suppress(RuntimeError):  # RuntimeError: aclose(): asynchronous generator is already running
            await self._notifies_iter.aclose()  # type: ignore[attr-defined]
    if self._read_conn is not None:
        await self._read_conn.notifies().aclose()
        await self._read_conn.close()
    if self._write_pool is not None:
        await self._write_pool.close()

kick async

kick(message)

Send message to the channel.

Inserts the message into the database and sends a NOTIFY.

:param message: Message to send.

Source code in src/taskiq_pg/psycopg/broker.py
async def kick(self, message: BrokerMessage) -> None:
    """
    Send message to the channel.

    Inserts the message into the database and sends a NOTIFY.

    :param message: Message to send.
    """
    async with self._write_pool.connection() as connection, connection.cursor() as cursor:
        # insert message into db table
        await cursor.execute(
            sql.SQL(INSERT_MESSAGE_QUERY).format(sql.Identifier(self.table_name)),
            [
                message.task_id,
                message.task_name,
                message.message.decode(),
                json.dumps(message.labels),
            ],
        )
        row = await cursor.fetchone()
        if row is None:
            msg = "failed to insert message"
            raise RuntimeError(msg)
        message_inserted_id = int(row[0])

        delay_value = tp.cast("str | None", message.labels.get("delay"))
        if delay_value is not None:
            delay_seconds = int(delay_value)
            await self._schedule_notification(message_inserted_id, delay_seconds)
        else:
            # Send NOTIFY with message ID as payload
            await cursor.execute(
                sql.SQL("NOTIFY {}, {}").format(
                    sql.Identifier(self.channel_name),
                    sql.Literal(str(message_inserted_id)),
                ),
            )
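
Delays are therefore driven by an ordinary taskiq label; a sketch using the kicker API, with the label value parsed above as whole seconds:

# inside an async function, with a task `add` registered on this broker:
task = await add.kicker().with_labels(delay="10").kiq(1, 2)
# the NOTIFY for this message is deferred ~10 seconds via _schedule_notification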

listen async

listen()

Listen to the channel.

Yields messages as they are received.

:yields: AckableMessage instances.

Source code in src/taskiq_pg/psycopg/broker.py
async def listen(self) -> AsyncGenerator[AckableMessage, None]:
    """
    Listen to the channel.

    Yields messages as they are received.

    :yields: AckableMessage instances.
    """
    while True:
        async for message_id_str in self._listen_context():
            message_id = int(message_id_str)  # payload is the message id
            try:
                async with self._write_pool.connection() as connection, connection.cursor() as cursor:
                    await cursor.execute(
                        sql.SQL(CLAIM_MESSAGE_QUERY).format(sql.Identifier(self.table_name)),
                        [message_id],
                    )
                    claimed_message = await cursor.fetchone()
                    if claimed_message is None:
                        continue
            except psycopg.OperationalError:  # message was claimed by another worker
                continue
            message_str = claimed_message[3]
            if not isinstance(message_str, str):
                msg = "Message is not a string"
                raise TypeError(msg)
            message_data = message_str.encode()

            async def ack(*, _message_id: int = message_id) -> None:
                async with self._write_pool.connection() as connection, connection.cursor() as cursor:
                    await cursor.execute(
                        sql.SQL(DELETE_MESSAGE_QUERY).format(sql.Identifier(self.table_name)),
                        [_message_id],
                    )

            yield AckableMessage(data=message_data, ack=ack)

PsycopgResultBackend

PsycopgResultBackend(
    dsn="postgres://postgres:postgres@localhost:5432/postgres",
    keep_results=True,
    table_name="taskiq_results",
    field_for_task_id="VarChar",
    serializer=None,
    **connect_kwargs,
)

Bases: BasePostgresResultBackend

Result backend for TaskIQ based on psycopg.

Construct new result backend.

Parameters:

  • dsn (Callable[[], str] | str | None, default: 'postgres://postgres:postgres@localhost:5432/postgres' ) –

    connection string to PostgreSQL, or callable returning one.

  • keep_results (bool, default: True ) –

    flag to not remove results from the database after reading.

  • table_name (str, default: 'taskiq_results' ) –

    name of the table to store results.

  • field_for_task_id (Literal['VarChar', 'Text', 'Uuid'], default: 'VarChar' ) –

    type of the field to store task_id.

  • serializer (TaskiqSerializer | None, default: None ) –

    serializer class to serialize/deserialize result from task.

  • connect_kwargs (Any, default: {} ) –

    additional arguments for creating connection pool.

Source code in src/taskiq_pg/_internal/result_backend.py
def __init__(
    self,
    dsn: tp.Callable[[], str] | str | None = "postgres://postgres:postgres@localhost:5432/postgres",
    keep_results: bool = True,
    table_name: str = "taskiq_results",
    field_for_task_id: tp.Literal["VarChar", "Text", "Uuid"] = "VarChar",
    serializer: TaskiqSerializer | None = None,
    **connect_kwargs: tp.Any,
) -> None:
    """
    Construct new result backend.

    Args:
        dsn: connection string to PostgreSQL, or callable returning one.
        keep_results: flag to not remove results from the database after reading.
        table_name: name of the table to store results.
        field_for_task_id: type of the field to store task_id.
        serializer: serializer class to serialize/deserialize result from task.
        connect_kwargs: additional arguments for creating connection pool.

    """
    self._dsn: tp.Final = dsn
    self.keep_results: tp.Final = keep_results
    self.table_name: tp.Final = table_name
    self.field_for_task_id: tp.Final = field_for_task_id
    self.connect_kwargs: tp.Final = connect_kwargs
    self.serializer = serializer or PickleSerializer()

keep_results instance-attribute

keep_results = keep_results

table_name instance-attribute

table_name = table_name

field_for_task_id instance-attribute

field_for_task_id = field_for_task_id

connect_kwargs instance-attribute

connect_kwargs = connect_kwargs

serializer instance-attribute

serializer = serializer or PickleSerializer()

dsn property

dsn

Get the DSN string.

Returns the DSN string or None if not set.

startup async

startup()

Initialize the result backend.

Construct a new connection pool and create the results table if it does not exist.

Source code in src/taskiq_pg/psycopg/result_backend.py
async def startup(self) -> None:
    """
    Initialize the result backend.

    Construct new connection pool
    and create new table for results if not exists.
    """
    self._database_pool = AsyncConnectionPool(
        conninfo=self.dsn if self.dsn is not None else "",
        open=False,
        **self.connect_kwargs,
    )
    await self._database_pool.open()
    async with self._database_pool.connection() as connection, connection.cursor() as cursor:
        await cursor.execute(
            query=sql.SQL(CREATE_TABLE_QUERY).format(
                sql.Identifier(self.table_name),
                sql.SQL(self.field_for_task_id),
            ),
        )
        await cursor.execute(
            query=sql.SQL(CREATE_INDEX_QUERY).format(
                sql.Identifier(self.table_name + "_task_id_idx"),
                sql.Identifier(self.table_name),
            ),
        )
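
Unlike the psqlpy backend, identifiers here are spliced in with psycopg's sql module rather than plain str.format, which quotes them safely. A small illustration:

from psycopg import sql

stmt = sql.SQL("CREATE INDEX IF NOT EXISTS {} ON {} USING HASH (task_id)").format(
    sql.Identifier("taskiq_results_task_id_idx"),
    sql.Identifier("taskiq_results"),
)
# with an open connection `conn`:
#   stmt.as_string(conn)
# -> 'CREATE INDEX IF NOT EXISTS "taskiq_results_task_id_idx" ON "taskiq_results" USING HASH (task_id)'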

shutdown async

shutdown()

Close the connection pool.

Source code in src/taskiq_pg/psycopg/result_backend.py
async def shutdown(self) -> None:
    """Close the connection pool."""
    if getattr(self, "_database_pool", None) is not None:
        await self._database_pool.close()

set_result async

set_result(task_id, result)

Set result to the PostgreSQL table.

:param task_id: ID of the task.

:param result: result of the task.

Source code in src/taskiq_pg/psycopg/result_backend.py
async def set_result(
    self,
    task_id: str,
    result: TaskiqResult[ReturnType],
) -> None:
    """
    Set result to the PostgreSQL table.

    :param task_id: ID of the task.
    :param result: result of the task.
    """
    async with self._database_pool.connection() as connection, connection.cursor() as cursor:
        await cursor.execute(
            query=sql.SQL(INSERT_RESULT_QUERY).format(
                sql.Identifier(self.table_name),
            ),
            params=[
                task_id,
                self.serializer.dumpb(model_dump(result)),
            ],
        )

is_result_ready async

is_result_ready(task_id)

Returns whether the result is ready.

:param task_id: ID of the task.

:returns: True if the result is ready else False.

Source code in src/taskiq_pg/psycopg/result_backend.py
async def is_result_ready(self, task_id: str) -> bool:
    """
    Returns whether the result is ready.

    :param task_id: ID of the task.

    :returns: True if the result is ready else False.
    """
    async with self._database_pool.connection() as connection, connection.cursor() as cursor:
        execute_result = await cursor.execute(
            query=sql.SQL(IS_RESULT_EXISTS_QUERY).format(
                sql.Identifier(self.table_name),
            ),
            params=[task_id],
        )
        row = await execute_result.fetchone()
        return bool(row and row[0])

get_result async

get_result(task_id, with_logs=False)

Retrieve result from the task.

:param task_id: task's id.

:param with_logs: if True it will download task's logs.

:raises ResultIsMissingError: if there is no result when trying to get it.

:return: TaskiqResult.

Source code in src/taskiq_pg/psycopg/result_backend.py
async def get_result(
    self,
    task_id: str,
    with_logs: bool = False,
) -> TaskiqResult[ReturnType]:
    """
    Retrieve result from the task.

    :param task_id: task's id.
    :param with_logs: if True it will download task's logs.
    :raises ResultIsMissingError: if there is no result when trying to get it.
    :return: TaskiqResult.
    """
    async with self._database_pool.connection() as connection, connection.cursor() as cursor:
        execute_result = await cursor.execute(
            query=sql.SQL(SELECT_RESULT_QUERY).format(
                sql.Identifier(self.table_name),
            ),
            params=[task_id],
        )
        result = await execute_result.fetchone()
        if result is None:
            msg = f"Cannot find record with task_id = {task_id} in PostgreSQL"
            raise ResultIsMissingError(msg)
        result_in_bytes: tp.Final = result[0]

        if not self.keep_results:
            await cursor.execute(
                query=sql.SQL(DELETE_RESULT_QUERY).format(
                    sql.Identifier(self.table_name),
                ),
                params=[task_id],
            )

        taskiq_result: tp.Final = model_validate(
            TaskiqResult[ReturnType],
            self.serializer.loadb(result_in_bytes),
        )

        if not with_logs:
            taskiq_result.log = None

        return taskiq_result

PsycopgScheduleSource

PsycopgScheduleSource(
    broker,
    dsn="postgresql://postgres:postgres@localhost:5432/postgres",
    table_name="taskiq_schedules",
    **connect_kwargs,
)

Bases: BasePostgresScheduleSource

Schedule source that uses psycopg to store schedules in PostgreSQL.

Initialize the PostgreSQL scheduler source.

Sets up a scheduler source that stores scheduled tasks in a PostgreSQL database, so schedules persist and can be retrieved across application restarts.

Parameters:

  • dsn (str | Callable[[], str], default: 'postgresql://postgres:postgres@localhost:5432/postgres' ) –

    PostgreSQL connection string

  • table_name (str, default: 'taskiq_schedules' ) –

    Name of the table to store scheduled tasks. Will be created automatically if it doesn't exist.

  • broker (AsyncBroker) –

    The TaskIQ broker instance to use for finding and managing tasks.

  • **connect_kwargs (Any, default: {} ) –

    Additional keyword arguments passed to the database connection pool.

Source code in src/taskiq_pg/_internal/schedule_source.py
def __init__(
    self,
    broker: AsyncBroker,
    dsn: str | tp.Callable[[], str] = "postgresql://postgres:postgres@localhost:5432/postgres",
    table_name: str = "taskiq_schedules",
    **connect_kwargs: tp.Any,
) -> None:
    """
    Initialize the PostgreSQL scheduler source.

    Sets up a scheduler source that stores scheduled tasks in a PostgreSQL database.
    This scheduler source manages task schedules, allowing for persistent storage and retrieval of scheduled tasks
    across application restarts.

    Args:
        dsn: PostgreSQL connection string
        table_name: Name of the table to store scheduled tasks. Will be created automatically if it doesn't exist.
        broker: The TaskIQ broker instance to use for finding and managing tasks.
            Required if startup_schedule is provided.
        **connect_kwargs: Additional keyword arguments passed to the database connection pool.

    """
    self._broker: tp.Final = broker
    self._dsn: tp.Final = dsn
    self._table_name: tp.Final = table_name
    self._connect_kwargs: tp.Final = connect_kwargs

dsn property

dsn

Get the DSN string.

Returns the DSN string or None if not set.

extract_scheduled_tasks_from_broker

extract_scheduled_tasks_from_broker()

Extract schedules from tasks that were registered in broker.

Returns:

  • list[ScheduledTask]

    A list of ScheduledTask instances extracted from the task's labels.

Source code in src/taskiq_pg/_internal/schedule_source.py
def extract_scheduled_tasks_from_broker(self) -> list[ScheduledTask]:
    """
    Extract schedules from tasks that were registered in broker.

    Returns:
        A list of ScheduledTask instances extracted from the task's labels.
    """
    scheduled_tasks_for_creation: list[ScheduledTask] = []
    for task_name, task in self._broker.get_all_tasks().items():
        if "schedule" not in task.labels:
            logger.debug("Task %s has no schedule, skipping", task_name)
            continue
        if not isinstance(task.labels["schedule"], list):
            logger.warning(
                "Schedule for task %s is not a list, skipping",
                task_name,
            )
            continue
        for schedule in task.labels["schedule"]:
            try:
                new_schedule = ScheduledTask.model_validate(
                    {
                        "task_name": task_name,
                        "labels": schedule.get("labels", {}),
                        "args": schedule.get("args", []),
                        "kwargs": schedule.get("kwargs", {}),
                        "schedule_id": str(uuid.uuid4()),
                        "cron": schedule.get("cron", None),
                        "cron_offset": schedule.get("cron_offset", None),
                        "time": schedule.get("time", None),
                    },
                )
                scheduled_tasks_for_creation.append(new_schedule)
            except ValidationError:  # noqa: PERF203
                logger.exception(
                    "Schedule for task %s is not valid, skipping",
                    task_name,
                )
                continue
    return scheduled_tasks_for_creation

startup async

startup()

Initialize the schedule source.

Construct a new connection pool, create the schedules table if it does not exist, and fill it with schedules from task labels.

Source code in src/taskiq_pg/psycopg/schedule_source.py
async def startup(self) -> None:
    """
    Initialize the schedule source.

    Construct new connection pool, create new table for schedules if not exists
    and fill table with schedules from task labels.
    """
    self._database_pool = AsyncConnectionPool(
        conninfo=self.dsn if self.dsn is not None else "",
        open=False,
        **self._connect_kwargs,
    )
    await self._database_pool.open()

    async with self._database_pool.connection() as connection, connection.cursor() as cursor:
        await cursor.execute(
            sql.SQL(CREATE_SCHEDULES_TABLE_QUERY).format(sql.Identifier(self._table_name)),
        )
    scheduled_tasks_for_creation = self.extract_scheduled_tasks_from_broker()
    await self._update_schedules_on_startup(scheduled_tasks_for_creation)

shutdown async

shutdown()

Close the connection pool.

Source code in src/taskiq_pg/psycopg/schedule_source.py
async def shutdown(self) -> None:
    """Close the connection pool."""
    if getattr(self, "_database_pool", None) is not None:
        await self._database_pool.close()

get_schedules async

get_schedules()

Fetch schedules from the database.

Source code in src/taskiq_pg/psycopg/schedule_source.py
async def get_schedules(self) -> list["ScheduledTask"]:
    """Fetch schedules from the database."""
    schedules = []
    async with self._database_pool.connection() as connection, connection.cursor() as cursor:
        rows_with_schedules = await cursor.execute(
            sql.SQL(SELECT_SCHEDULES_QUERY).format(sql.Identifier(self._table_name)),
        )
        rows = await rows_with_schedules.fetchall()
        for schedule_id, task_name, schedule in rows:
            schedules.append(
                ScheduledTask.model_validate(
                    {
                        "schedule_id": str(schedule_id),
                        "task_name": task_name,
                        "labels": schedule["labels"],
                        "args": schedule["args"],
                        "kwargs": schedule["kwargs"],
                        "cron": schedule["cron"],
                        "cron_offset": schedule["cron_offset"],
                        "time": schedule["time"],
                    },
                ),
            )
    return schedules

add_schedule async

add_schedule(schedule)

Add a new schedule.

Parameters:

  • schedule (ScheduledTask) –

    schedule to add.

Source code in src/taskiq_pg/psycopg/schedule_source.py
async def add_schedule(self, schedule: "ScheduledTask") -> None:
    """
    Add a new schedule.

    Args:
        schedule: schedule to add.
    """
    async with self._database_pool.connection() as connection, connection.cursor() as cursor:
        await cursor.execute(
            sql.SQL(INSERT_SCHEDULE_QUERY).format(sql.Identifier(self._table_name)),
            [
                uuid.UUID(schedule.schedule_id),
                schedule.task_name,
                schedule.model_dump_json(
                    exclude={"schedule_id", "task_name"},
                ),
            ]
        )

delete_schedule async

delete_schedule(schedule_id)

Method to delete schedule by id.

This is useful for schedule cancellation.

Parameters:

  • schedule_id (str) –

    id of schedule to delete.

Source code in src/taskiq_pg/psycopg/schedule_source.py
async def delete_schedule(self, schedule_id: str) -> None:
    """
    Method to delete schedule by id.

    This is useful for schedule cancellation.

    Args:
        schedule_id: id of schedule to delete.
    """
    async with self._database_pool.connection() as connection, connection.cursor() as cursor:
        await cursor.execute(
            sql.SQL(DELETE_SCHEDULE_QUERY).format(sql.Identifier(self._table_name)),
            [schedule_id],
        )

post_send async

post_send(task)

Delete a task after it's completed.

Source code in src/taskiq_pg/psycopg/schedule_source.py
async def post_send(self, task: ScheduledTask) -> None:
    """Delete a task after it's completed."""
    if task.time is not None:
        await self.delete_schedule(task.schedule_id)

broker

logger module-attribute

logger = getLogger('taskiq.psycopg_broker')

PsycopgBroker

PsycopgBroker(
    dsn="postgresql://postgres:postgres@localhost:5432/postgres",
    result_backend=None,
    task_id_generator=None,
    channel_name="taskiq",
    table_name="taskiq_messages",
    max_retry_attempts=5,
    read_kwargs=None,
    write_kwargs=None,
)

Bases: BasePostgresBroker

Broker that uses PostgreSQL and psycopg with LISTEN/NOTIFY.

Construct a new broker.

Parameters:

  • dsn (str | Callable[[], str], default: 'postgresql://postgres:postgres@localhost:5432/postgres' ) –

    connection string to PostgreSQL, or callable returning one.

  • result_backend (AsyncResultBackend[_T] | None, default: None ) –

    Custom result backend.

  • task_id_generator (Callable[[], str] | None, default: None ) –

    Custom task_id generator.

  • channel_name (str, default: 'taskiq' ) –

    Name of the channel to listen on.

  • table_name (str, default: 'taskiq_messages' ) –

    Name of the table to store messages.

  • max_retry_attempts (int, default: 5 ) –

    Maximum number of message processing attempts.

  • read_kwargs (dict[str, Any] | None, default: None ) –

    Additional arguments for read connection creation.

  • write_kwargs (dict[str, Any] | None, default: None ) –

    Additional arguments for write pool creation.

Source code in src/taskiq_pg/_internal/broker.py
def __init__(  # noqa: PLR0913
    self,
    dsn: str | tp.Callable[[], str] = "postgresql://postgres:postgres@localhost:5432/postgres",
    result_backend: AsyncResultBackend[_T] | None = None,
    task_id_generator: tp.Callable[[], str] | None = None,
    channel_name: str = "taskiq",
    table_name: str = "taskiq_messages",
    max_retry_attempts: int = 5,
    read_kwargs: dict[str, tp.Any] | None = None,
    write_kwargs: dict[str, tp.Any] | None = None,
) -> None:
    """
    Construct a new broker.

    Args:
        dsn: connection string to PostgreSQL, or callable returning one.
        result_backend: Custom result backend.
        task_id_generator: Custom task_id generator.
        channel_name: Name of the channel to listen on.
        table_name: Name of the table to store messages.
        max_retry_attempts: Maximum number of message processing attempts.
        read_kwargs: Additional arguments for read connection creation.
        write_kwargs: Additional arguments for write pool creation.

    """
    super().__init__(
        result_backend=result_backend,
        task_id_generator=task_id_generator,
    )
    self._dsn: str | tp.Callable[[], str] = dsn
    self.channel_name: str = channel_name
    self.table_name: str = table_name
    self.read_kwargs: dict[str, tp.Any] = read_kwargs or {}
    self.write_kwargs: dict[str, tp.Any] = write_kwargs or {}
    self.max_retry_attempts: int = max_retry_attempts
    self._queue: asyncio.Queue[str] | None = None
channel_name instance-attribute
channel_name = channel_name
table_name instance-attribute
table_name = table_name
read_kwargs instance-attribute
read_kwargs = read_kwargs or {}
write_kwargs instance-attribute
write_kwargs = write_kwargs or {}
max_retry_attempts instance-attribute
max_retry_attempts = max_retry_attempts
dsn property
dsn

Get the DSN string.

Returns:

  • str

    The DSN string, or None if it isn't set yet.

startup async
startup()

Initialize the broker.

Source code in src/taskiq_pg/psycopg/broker.py
async def startup(self) -> None:
    """Initialize the broker."""
    await super().startup()
    self._read_conn = await AsyncConnection.connect(
        conninfo=self.dsn,
        **self.read_kwargs,
        autocommit=True,
        cursor_factory=AsyncRawCursor,
    )
    self._write_pool = AsyncConnectionPool(
        conninfo=self.dsn if self.dsn is not None else "",
        open=False,
        **self.write_kwargs,
    )
    await self._write_pool.open()

    async with self._write_pool.connection() as connection, connection.cursor() as cursor:
        await cursor.execute(sql.SQL(CREATE_MESSAGE_TABLE_QUERY).format(sql.Identifier(self.table_name)))

    await self._read_conn.execute(sql.SQL("LISTEN {}").format(sql.Identifier(self.channel_name)))
    self._notifies_iter = self._read_conn.notifies()
shutdown async
shutdown()

Close all connections on shutdown.

Source code in src/taskiq_pg/psycopg/broker.py
async def shutdown(self) -> None:
    """Close all connections on shutdown."""
    await super().shutdown()
    if self._notifies_iter is not None:
        with suppress(RuntimeError):  # RuntimeError: aclose(): asynchronous generator is already running
            await self._notifies_iter.aclose()  # type: ignore[attr-defined]
    if self._read_conn is not None:
        await self._read_conn.notifies().aclose()
        await self._read_conn.close()
    if self._write_pool is not None:
        await self._write_pool.close()
kick async
kick(message)

Send message to the channel.

Inserts the message into the database and sends a NOTIFY.

:param message: Message to send.

Source code in src/taskiq_pg/psycopg/broker.py
async def kick(self, message: BrokerMessage) -> None:
    """
    Send message to the channel.

    Inserts the message into the database and sends a NOTIFY.

    :param message: Message to send.
    """
    async with self._write_pool.connection() as connection, connection.cursor() as cursor:
        # insert message into db table
        await cursor.execute(
            sql.SQL(INSERT_MESSAGE_QUERY).format(sql.Identifier(self.table_name)),
            [
                message.task_id,
                message.task_name,
                message.message.decode(),
                json.dumps(message.labels),
            ],
        )
        row = await cursor.fetchone()
        if row is None:
            msg = "failed to insert message"
            raise RuntimeError(msg)
        message_inserted_id = int(row[0])

        delay_value = tp.cast("str | None", message.labels.get("delay"))
        if delay_value is not None:
            delay_seconds = int(delay_value)
            await self._schedule_notification(message_inserted_id, delay_seconds)
        else:
            # Send NOTIFY with message ID as payload
            await cursor.execute(
                sql.SQL("NOTIFY {}, {}").format(
                    sql.Identifier(self.channel_name),
                    sql.Literal(str(message_inserted_id)),
                ),
            )
listen async
listen()

Listen to the channel.

Yields messages as they are received.

:yields: AckableMessage instances.

Source code in src/taskiq_pg/psycopg/broker.py
async def listen(self) -> AsyncGenerator[AckableMessage, None]:
    """
    Listen to the channel.

    Yields messages as they are received.

    :yields: AckableMessage instances.
    """
    while True:
        async for message_id_str in self._listen_context():
            message_id = int(message_id_str)  # payload is the message id
            try:
                async with self._write_pool.connection() as connection, connection.cursor() as cursor:
                    await cursor.execute(
                        sql.SQL(CLAIM_MESSAGE_QUERY).format(sql.Identifier(self.table_name)),
                        [message_id],
                    )
                    claimed_message = await cursor.fetchone()
                    if claimed_message is None:
                        continue
            except psycopg.OperationalError:  # message was claimed by another worker
                continue
            message_str = claimed_message[3]
            if not isinstance(message_str, str):
                msg = "Message is not a string"
                raise TypeError(msg)
            message_data = message_str.encode()

            async def ack(*, _message_id: int = message_id) -> None:
                async with self._write_pool.connection() as connection, connection.cursor() as cursor:
                    await cursor.execute(
                        sql.SQL(DELETE_MESSAGE_QUERY).format(sql.Identifier(self.table_name)),
                        [_message_id],
                    )

            yield AckableMessage(data=message_data, ack=ack)

queries

CREATE_TABLE_QUERY module-attribute

CREATE_TABLE_QUERY = "\nCREATE TABLE IF NOT EXISTS {} (\n    task_id {} UNIQUE,\n    result BYTEA\n)\n"

CREATE_INDEX_QUERY module-attribute

CREATE_INDEX_QUERY = "\nCREATE INDEX IF NOT EXISTS {} ON {} USING HASH (task_id)\n"

INSERT_RESULT_QUERY module-attribute

INSERT_RESULT_QUERY = "\nINSERT INTO {} VALUES (%s, %s)\nON CONFLICT (task_id)\nDO UPDATE\nSET result = EXCLUDED.result;\n"

IS_RESULT_EXISTS_QUERY module-attribute

IS_RESULT_EXISTS_QUERY = "\nSELECT EXISTS(\n    SELECT 1 FROM {} WHERE task_id = %s\n);\n"

SELECT_RESULT_QUERY module-attribute

SELECT_RESULT_QUERY = (
    "\nSELECT result FROM {} WHERE task_id = %s;\n"
)

DELETE_RESULT_QUERY module-attribute

DELETE_RESULT_QUERY = (
    "\nDELETE FROM {} WHERE task_id = %s;\n"
)

CREATE_MESSAGE_TABLE_QUERY module-attribute

CREATE_MESSAGE_TABLE_QUERY = "\nCREATE TABLE IF NOT EXISTS {} (\n    id SERIAL PRIMARY KEY,\n    task_id VARCHAR NOT NULL,\n    task_name VARCHAR NOT NULL,\n    message TEXT NOT NULL,\n    labels JSONB NOT NULL,\n    status TEXT NOT NULL DEFAULT 'pending',\n    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()\n);\n"

INSERT_MESSAGE_QUERY module-attribute

INSERT_MESSAGE_QUERY = "\nINSERT INTO {} (task_id, task_name, message, labels)\nVALUES (%s, %s, %s, %s)\nRETURNING id\n"

CLAIM_MESSAGE_QUERY module-attribute

CLAIM_MESSAGE_QUERY = "UPDATE {} SET status = 'processing' WHERE id = %s AND status = 'pending' RETURNING *"

DELETE_MESSAGE_QUERY module-attribute

DELETE_MESSAGE_QUERY = 'DELETE FROM {} WHERE id = %s'

CREATE_SCHEDULES_TABLE_QUERY module-attribute

CREATE_SCHEDULES_TABLE_QUERY = "\nCREATE TABLE IF NOT EXISTS {} (\n    id UUID PRIMARY KEY,\n    task_name VARCHAR(100) NOT NULL,\n    schedule JSONB NOT NULL,\n    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),\n    updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()\n);\n"

INSERT_SCHEDULE_QUERY module-attribute

INSERT_SCHEDULE_QUERY = "\nINSERT INTO {} (id, task_name, schedule)\nVALUES (%s, %s, %s)\nON CONFLICT (id) DO UPDATE\nSET task_name = EXCLUDED.task_name,\n    schedule = EXCLUDED.schedule,\n    updated_at = NOW();\n"

SELECT_SCHEDULES_QUERY module-attribute

SELECT_SCHEDULES_QUERY = (
    "\nSELECT id, task_name, schedule\nFROM {};\n"
)

DELETE_ALL_SCHEDULES_QUERY module-attribute

DELETE_ALL_SCHEDULES_QUERY = '\nDELETE FROM {};\n'

DELETE_SCHEDULE_QUERY module-attribute

DELETE_SCHEDULE_QUERY = '\nDELETE FROM {} WHERE id = %s;\n'
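
These mirror the psqlpy templates above, but use psycopg's %s placeholders, and the {} slots are filled with sql.Identifier rather than plain str.format. A minimal sketch:

from psycopg import sql

stmt = sql.SQL(DELETE_RESULT_QUERY).format(sql.Identifier("taskiq_results"))
# executed later as:
#   await cursor.execute(stmt, ["some-task-id"])  # the value binds to %s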

result_backend

PsycopgResultBackend

PsycopgResultBackend(
    dsn="postgres://postgres:postgres@localhost:5432/postgres",
    keep_results=True,
    table_name="taskiq_results",
    field_for_task_id="VarChar",
    serializer=None,
    **connect_kwargs,
)

Bases: BasePostgresResultBackend

Result backend for TaskIQ based on psycopg.

Construct new result backend.

Parameters:

  • dsn (Callable[[], str] | str | None, default: 'postgres://postgres:postgres@localhost:5432/postgres' ) –

    connection string to PostgreSQL, or callable returning one.

  • keep_results (bool, default: True ) –

    flag to not remove results from the database after reading.

  • table_name (str, default: 'taskiq_results' ) –

    name of the table to store results.

  • field_for_task_id (Literal['VarChar', 'Text', 'Uuid'], default: 'VarChar' ) –

    type of the field to store task_id.

  • serializer (TaskiqSerializer | None, default: None ) –

    serializer class to serialize/deserialize result from task.

  • connect_kwargs (Any, default: {} ) –

    additional arguments for creating connection pool.

Source code in src/taskiq_pg/_internal/result_backend.py
def __init__(
    self,
    dsn: tp.Callable[[], str] | str | None = "postgres://postgres:postgres@localhost:5432/postgres",
    keep_results: bool = True,
    table_name: str = "taskiq_results",
    field_for_task_id: tp.Literal["VarChar", "Text", "Uuid"] = "VarChar",
    serializer: TaskiqSerializer | None = None,
    **connect_kwargs: tp.Any,
) -> None:
    """
    Construct new result backend.

    Args:
        dsn: connection string to PostgreSQL, or callable returning one.
        keep_results: flag to not remove results from the database after reading.
        table_name: name of the table to store results.
        field_for_task_id: type of the field to store task_id.
        serializer: serializer class to serialize/deserialize result from task.
        connect_kwargs: additional arguments for creating connection pool.

    """
    self._dsn: tp.Final = dsn
    self.keep_results: tp.Final = keep_results
    self.table_name: tp.Final = table_name
    self.field_for_task_id: tp.Final = field_for_task_id
    self.connect_kwargs: tp.Final = connect_kwargs
    self.serializer = serializer or PickleSerializer()
keep_results instance-attribute
keep_results = keep_results
table_name instance-attribute
table_name = table_name
field_for_task_id instance-attribute
field_for_task_id = field_for_task_id
connect_kwargs instance-attribute
connect_kwargs = connect_kwargs
serializer instance-attribute
serializer = serializer or PickleSerializer()
dsn property
dsn

Get the DSN string.

Returns the DSN string or None if not set.

startup async
startup()

Initialize the result backend.

Construct a new connection pool and create the results table if it does not exist.

Source code in src/taskiq_pg/psycopg/result_backend.py
async def startup(self) -> None:
    """
    Initialize the result backend.

    Construct new connection pool
    and create new table for results if not exists.
    """
    self._database_pool = AsyncConnectionPool(
        conninfo=self.dsn if self.dsn is not None else "",
        open=False,
        **self.connect_kwargs,
    )
    await self._database_pool.open()
    async with self._database_pool.connection() as connection, connection.cursor() as cursor:
        await cursor.execute(
            query=sql.SQL(CREATE_TABLE_QUERY).format(
                sql.Identifier(self.table_name),
                sql.SQL(self.field_for_task_id),
            ),
        )
        await cursor.execute(
            query=sql.SQL(CREATE_INDEX_QUERY).format(
                sql.Identifier(self.table_name + "_task_id_idx"),
                sql.Identifier(self.table_name),
            ),
        )
shutdown async
shutdown()

Close the connection pool.

Source code in src/taskiq_pg/psycopg/result_backend.py
async def shutdown(self) -> None:
    """Close the connection pool."""
    if getattr(self, "_database_pool", None) is not None:
        await self._database_pool.close()
set_result async
set_result(task_id, result)

Set result to the PostgreSQL table.

:param task_id: ID of the task.

:param result: result of the task.

Source code in src/taskiq_pg/psycopg/result_backend.py
async def set_result(
    self,
    task_id: str,
    result: TaskiqResult[ReturnType],
) -> None:
    """
    Set result to the PostgreSQL table.

    :param task_id: ID of the task.
    :param result: result of the task.
    """
    async with self._database_pool.connection() as connection, connection.cursor() as cursor:
        await cursor.execute(
            query=sql.SQL(INSERT_RESULT_QUERY).format(
                sql.Identifier(self.table_name),
            ),
            params=[
                task_id,
                self.serializer.dumpb(model_dump(result)),
            ],
        )
is_result_ready async
is_result_ready(task_id)

Returns whether the result is ready.

:param task_id: ID of the task.

:returns: True if the result is ready else False.

Source code in src/taskiq_pg/psycopg/result_backend.py
async def is_result_ready(self, task_id: str) -> bool:
    """
    Returns whether the result is ready.

    :param task_id: ID of the task.

    :returns: True if the result is ready else False.
    """
    async with self._database_pool.connection() as connection, connection.cursor() as cursor:
        execute_result = await cursor.execute(
            query=sql.SQL(IS_RESULT_EXISTS_QUERY).format(
                sql.Identifier(self.table_name),
            ),
            params=[task_id],
        )
        row = await execute_result.fetchone()
        return bool(row and row[0])
get_result async
get_result(task_id, with_logs=False)

Retrieve result from the task.

:param task_id: task's id.

:param with_logs: if True it will download task's logs.

:raises ResultIsMissingError: if there is no result when trying to get it.

:return: TaskiqResult.

Source code in src/taskiq_pg/psycopg/result_backend.py
async def get_result(
    self,
    task_id: str,
    with_logs: bool = False,
) -> TaskiqResult[ReturnType]:
    """
    Retrieve result from the task.

    :param task_id: task's id.
    :param with_logs: if True it will download task's logs.
    :raises ResultIsMissingError: if there is no result when trying to get it.
    :return: TaskiqResult.
    """
    async with self._database_pool.connection() as connection, connection.cursor() as cursor:
        execute_result = await cursor.execute(
            query=sql.SQL(SELECT_RESULT_QUERY).format(
                sql.Identifier(self.table_name),
            ),
            params=[task_id],
        )
        result = await execute_result.fetchone()
        if result is None:
            msg = f"Cannot find record with task_id = {task_id} in PostgreSQL"
            raise ResultIsMissingError(msg)
        result_in_bytes: tp.Final = result[0]

        if not self.keep_results:
            await cursor.execute(
                query=sql.SQL(DELETE_RESULT_QUERY).format(
                    sql.Identifier(self.table_name),
                ),
                params=[task_id],
            )

        taskiq_result: tp.Final = model_validate(
            TaskiqResult[ReturnType],
            self.serializer.loadb(result_in_bytes),
        )

        if not with_logs:
            taskiq_result.log = None

        return taskiq_result
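A hedged read-side sketch tying the lookups together; note that with keep_results=False the read itself deletes the row, so a second get_result for the same task_id will raise. The ResultIsMissingError import path is assumed to be taskiq.exceptions:

from taskiq.exceptions import ResultIsMissingError


async def read_result(backend, task_id: str):
    try:
        # with_logs=True keeps the log field instead of nulling it out
        return await backend.get_result(task_id, with_logs=True)
    except ResultIsMissingError:
        return None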

schedule_source

logger module-attribute

logger = getLogger('taskiq_pg.psycopg_schedule_source')

PsycopgScheduleSource

PsycopgScheduleSource(
    broker,
    dsn="postgresql://postgres:postgres@localhost:5432/postgres",
    table_name="taskiq_schedules",
    **connect_kwargs,
)

Bases: BasePostgresScheduleSource

Schedule source that uses psycopg to store schedules in PostgreSQL.

Initialize the PostgreSQL scheduler source.

Sets up a schedule source that stores scheduled tasks in a PostgreSQL table, giving you persistent storage and retrieval of schedules across application restarts.

Parameters:

  • dsn (str | Callable[[], str], default: 'postgresql://postgres:postgres@localhost:5432/postgres' ) –

    PostgreSQL connection string, or a callable returning one.

  • table_name (str, default: 'taskiq_schedules' ) –

    Name of the table to store scheduled tasks. Will be created automatically if it doesn't exist.

  • broker (AsyncBroker) –

    The TaskIQ broker instance to use for finding and managing tasks.

  • **connect_kwargs (Any, default: {} ) –

    Additional keyword arguments passed to the database connection pool.

Source code in src/taskiq_pg/_internal/schedule_source.py
def __init__(
    self,
    broker: AsyncBroker,
    dsn: str | tp.Callable[[], str] = "postgresql://postgres:postgres@localhost:5432/postgres",
    table_name: str = "taskiq_schedules",
    **connect_kwargs: tp.Any,
) -> None:
    """
    Initialize the PostgreSQL scheduler source.

    Sets up a schedule source that stores scheduled tasks in a PostgreSQL table,
    giving you persistent storage and retrieval of schedules across application restarts.

    Args:
        dsn: PostgreSQL connection string, or a callable returning one.
        table_name: Name of the table to store scheduled tasks. Will be created automatically if it doesn't exist.
        broker: The TaskIQ broker instance to use for finding and managing tasks.
        **connect_kwargs: Additional keyword arguments passed to the database connection pool.

    """
    self._broker: tp.Final = broker
    self._dsn: tp.Final = dsn
    self._table_name: tp.Final = table_name
    self._connect_kwargs: tp.Final = connect_kwargs
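A minimal wiring sketch, assuming the taskiq_pg.psycopg import path suggested by the source paths on this page; the scheduler side is stock taskiq:

from taskiq import InMemoryBroker, TaskiqScheduler

from taskiq_pg.psycopg import PsycopgScheduleSource  # path assumed from this page

broker = InMemoryBroker()
source = PsycopgScheduleSource(
    broker,
    dsn="postgresql://postgres:postgres@localhost:5432/postgres",
)
scheduler = TaskiqScheduler(broker=broker, sources=[source])


@broker.task(schedule=[{"cron": "*/5 * * * *"}])
async def heartbeat() -> None:
    print("tick")

# run with: taskiq scheduler my_module:scheduler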
dsn property
dsn

Get the DSN string.

Returns the DSN string or None if not set.

startup async
startup()

Initialize the schedule source.

Construct new connection pool, create new table for schedules if not exists and fill table with schedules from task labels.

Source code in src/taskiq_pg/psycopg/schedule_source.py
async def startup(self) -> None:
    """
    Initialize the schedule source.

    Construct new connection pool, create new table for schedules if not exists
    and fill table with schedules from task labels.
    """
    self._database_pool = AsyncConnectionPool(
        conninfo=self.dsn if self.dsn is not None else "",
        open=False,
        **self._connect_kwargs,
    )
    await self._database_pool.open()

    async with self._database_pool.connection() as connection, connection.cursor() as cursor:
        await cursor.execute(
            sql.SQL(CREATE_SCHEDULES_TABLE_QUERY).format(sql.Identifier(self._table_name)),
        )
    scheduled_tasks_for_creation = self.extract_scheduled_tasks_from_broker()
    await self._update_schedules_on_startup(scheduled_tasks_for_creation)
shutdown async
shutdown()

Close the connection pool.

Source code in src/taskiq_pg/psycopg/schedule_source.py
async def shutdown(self) -> None:
    """Close the connection pool."""
    if getattr(self, "_database_pool", None) is not None:
        await self._database_pool.close()
get_schedules async
get_schedules()

Fetch schedules from the database.

Source code in src/taskiq_pg/psycopg/schedule_source.py
async def get_schedules(self) -> list["ScheduledTask"]:
    """Fetch schedules from the database."""
    schedules = []
    async with self._database_pool.connection() as connection, connection.cursor() as cursor:
        rows_with_schedules = await cursor.execute(
            sql.SQL(SELECT_SCHEDULES_QUERY).format(sql.Identifier(self._table_name)),
        )
        rows = await rows_with_schedules.fetchall()
        for schedule_id, task_name, schedule in rows:
            schedules.append(
                ScheduledTask.model_validate(
                    {
                        "schedule_id": str(schedule_id),
                        "task_name": task_name,
                        "labels": schedule["labels"],
                        "args": schedule["args"],
                        "kwargs": schedule["kwargs"],
                        "cron": schedule["cron"],
                        "cron_offset": schedule["cron_offset"],
                        "time": schedule["time"],
                    },
                ),
            )
    return schedules
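For orientation, each fetched row unpacks into an id, a task name, and a single JSONB document carrying everything else; the shape below is inferred from the unpacking above and is illustrative only:

import uuid

row = (
    uuid.uuid4(),           # schedule_id
    "my_module:heartbeat",  # task_name
    {                       # schedule: one JSONB document per row
        "labels": {},
        "args": [],
        "kwargs": {},
        "cron": "*/5 * * * *",
        "cron_offset": None,
        "time": None,
    },
)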
add_schedule async
add_schedule(schedule)

Add a new schedule.

Parameters:

  • schedule (ScheduledTask) –

    schedule to add.

Source code in src/taskiq_pg/psycopg/schedule_source.py
async def add_schedule(self, schedule: "ScheduledTask") -> None:
    """
    Add a new schedule.

    Args:
        schedule: schedule to add.
    """
    async with self._database_pool.connection() as connection, connection.cursor() as cursor:
        await cursor.execute(
            sql.SQL(INSERT_SCHEDULE_QUERY).format(sql.Identifier(self._table_name)),
            [
                uuid.UUID(schedule.schedule_id),
                schedule.task_name,
                schedule.model_dump_json(
                    exclude={"schedule_id", "task_name"},
                ),
            ],
        )
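A hedged sketch of adding a one-off schedule by hand (ScheduledTask is taskiq's model; a time-based schedule fires once and is then removed by post_send()):

import uuid
from datetime import datetime, timedelta, timezone

from taskiq.scheduler.scheduled_task import ScheduledTask


async def schedule_once(source) -> None:
    await source.add_schedule(
        ScheduledTask(
            schedule_id=str(uuid.uuid4()),
            task_name="my_module:heartbeat",
            labels={},
            args=[],
            kwargs={},
            time=datetime.now(timezone.utc) + timedelta(minutes=10),
        ),
    )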
delete_schedule async
delete_schedule(schedule_id)

Method to delete schedule by id.

This is useful for schedule cancellation.

Parameters:

  • schedule_id (str) –

    id of schedule to delete.

Source code in src/taskiq_pg/psycopg/schedule_source.py
async def delete_schedule(self, schedule_id: str) -> None:
    """
    Method to delete schedule by id.

    This is useful for schedule cancellation.

    Args:
        schedule_id: id of schedule to delete.
    """
    async with self._database_pool.connection() as connection, connection.cursor() as cursor:
        await cursor.execute(
            sql.SQL(DELETE_SCHEDULE_QUERY).format(sql.Identifier(self._table_name)),
            [schedule_id],
        )
post_send async
post_send(task)

Delete a one-off schedule after its task has been sent.

Source code in src/taskiq_pg/psycopg/schedule_source.py
async def post_send(self, task: ScheduledTask) -> None:
    """Delete a task after it's completed."""
    if task.time is not None:
        await self.delete_schedule(task.schedule_id)
extract_scheduled_tasks_from_broker
extract_scheduled_tasks_from_broker()

Extract schedules from tasks that were registered in broker.

Returns:

  • list[ScheduledTask]

    A list of ScheduledTask instances extracted from the task's labels.

Source code in src/taskiq_pg/_internal/schedule_source.py
def extract_scheduled_tasks_from_broker(self) -> list[ScheduledTask]:
    """
    Extract schedules from tasks that were registered in broker.

    Returns:
        A list of ScheduledTask instances extracted from the task's labels.
    """
    scheduled_tasks_for_creation: list[ScheduledTask] = []
    for task_name, task in self._broker.get_all_tasks().items():
        if "schedule" not in task.labels:
            logger.debug("Task %s has no schedule, skipping", task_name)
            continue
        if not isinstance(task.labels["schedule"], list):
            logger.warning(
                "Schedule for task %s is not a list, skipping",
                task_name,
            )
            continue
        for schedule in task.labels["schedule"]:
            try:
                new_schedule = ScheduledTask.model_validate(
                    {
                        "task_name": task_name,
                        "labels": schedule.get("labels", {}),
                        "args": schedule.get("args", []),
                        "kwargs": schedule.get("kwargs", {}),
                        "schedule_id": str(uuid.uuid4()),
                        "cron": schedule.get("cron", None),
                        "cron_offset": schedule.get("cron_offset", None),
                        "time": schedule.get("time", None),
                    },
                )
                scheduled_tasks_for_creation.append(new_schedule)
            except ValidationError:  # noqa: PERF203
                logger.exception(
                    "Schedule for task %s is not valid, skipping",
                    task_name,
                )
                continue
    return scheduled_tasks_for_creation
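What this method consumes is a "schedule" label holding a list of dicts; taskiq's task decorator attaches extra keyword arguments as labels, so a hedged sketch of declaring such schedules looks like this:

from taskiq import InMemoryBroker

broker = InMemoryBroker()


@broker.task(
    schedule=[
        {"cron": "0 * * * *", "args": [1], "kwargs": {"x": 2}},  # every hour
        {"time": "2030-01-01T00:00:00+00:00"},                   # one-off
    ],
)
async def hourly_job(a: int, x: int = 0) -> None:
    ...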