taskiq_pg
¶
aiopg
¶
AiopgResultBackend
¶
AiopgResultBackend(
dsn="postgres://postgres:postgres@localhost:5432/postgres",
keep_results=True,
table_name="taskiq_results",
field_for_task_id="VarChar",
serializer=None,
*,
pool=None,
**connect_kwargs,
)
Bases: BasePostgresResultBackend
Result backend for TaskIQ based on Aiopg.
Construct a new AiopgResultBackend.
Parameters:
-
dsn(Callable[[], str] | str | None, default:'postgres://postgres:postgres@localhost:5432/postgres') –PostgreSQL connection string or callable. Can be None if pool is provided.
-
keep_results(bool, default:True) –Whether to keep results after reading.
-
table_name(str, default:'taskiq_results') –Table to store results in.
-
field_for_task_id(Literal['VarChar', 'Text', 'Uuid'], default:'VarChar') –Column type for task_id.
-
serializer(TaskiqSerializer | None, default:None) –Serializer for task results.
-
pool(Pool | None, default:None) –An existing connection pool to reuse.
-
**connect_kwargs(Any, default:{}) –Extra kwargs for connection pool creation.
Source code in src/taskiq_pg/aiopg/result_backend.py
startup
async
¶
Initialize the result backend.
Construct new connection pool (if not provided externally) and create new table for results if not exists.
Source code in src/taskiq_pg/aiopg/result_backend.py
shutdown
async
¶
Close the connection pool if created by this backend.
set_result
async
¶
Set result to the PostgreSQL table.
Parameters:
-
task_id(Any) –ID of the task.
-
result(TaskiqResult[_ReturnType]) –result of the task.
Source code in src/taskiq_pg/aiopg/result_backend.py
is_result_ready
async
¶
Return whether the result is ready.
Parameters:
-
task_id(Any) –ID of the task.
Returns:
-
bool(bool) –True if the result is ready else False.
Source code in src/taskiq_pg/aiopg/result_backend.py
get_result
async
¶
Retrieve result from the task.
:param task_id: task's id. :param with_logs: if True it will download task's logs. :raises ResultIsMissingError: if there is no result when trying to get it. :return: TaskiqResult.
Source code in src/taskiq_pg/aiopg/result_backend.py
set_progress
async
¶
Saves progress.
:param task_id: task's id. :param progress: progress of execution.
Source code in src/taskiq_pg/aiopg/result_backend.py
get_progress
async
¶
Gets progress.
:param task_id: task's id.
Source code in src/taskiq_pg/aiopg/result_backend.py
AiopgScheduleSource
¶
AiopgScheduleSource(
broker,
dsn="postgresql://postgres:postgres@localhost:5432/postgres",
table_name="taskiq_schedules",
*,
pool=None,
**connect_kwargs,
)
Bases: BasePostgresScheduleSource
Schedule source that uses aiopg to store schedules in PostgreSQL.
Construct a new AiopgScheduleSource.
Parameters:
-
broker(AsyncBroker) –The TaskIQ broker instance.
-
dsn(str | Callable[[], str], default:'postgresql://postgres:postgres@localhost:5432/postgres') –PostgreSQL connection string or callable. Ignored in pool mode.
-
table_name(str, default:'taskiq_schedules') –Table to store schedules in.
-
pool(Pool | None, default:None) –An existing connection pool to reuse.
-
**connect_kwargs(Any, default:{}) –Extra kwargs for connection pool creation.
Source code in src/taskiq_pg/aiopg/schedule_source.py
extract_scheduled_tasks_from_broker
¶
Extract schedules from tasks that were registered in broker.
Returns:
-
list[ScheduledTask]–A list of ScheduledTask instances extracted from the task's labels.
Source code in src/taskiq_pg/_internal/schedule_source.py
startup
async
¶
Initialize the schedule source.
Construct new connection pool (if not provided externally), create new table for schedules if not exists and fill table with schedules from task labels.
Source code in src/taskiq_pg/aiopg/schedule_source.py
shutdown
async
¶
Close the connection pool if it was created by this schedule source.
get_schedules
async
¶
Fetch schedules from the database.
Source code in src/taskiq_pg/aiopg/schedule_source.py
add_schedule
async
¶
Add a new schedule.
Parameters:
-
schedule(ScheduledTask) –schedule to add.
Source code in src/taskiq_pg/aiopg/schedule_source.py
delete_schedule
async
¶
Method to delete schedule by id.
This is useful for schedule cancelation.
Parameters:
-
schedule_id(str) –id of schedule to delete.
Source code in src/taskiq_pg/aiopg/schedule_source.py
post_send
async
¶
broker
¶
queries
¶
CREATE_TABLE_QUERY
module-attribute
¶
CREATE_TABLE_QUERY = "\nCREATE TABLE IF NOT EXISTS {} (\n task_id {} UNIQUE,\n result BYTEA,\n progress BYTEA\n)\n"
ADD_PROGRESS_COLUMN_QUERY
module-attribute
¶
CREATE_INDEX_QUERY
module-attribute
¶
INSERT_RESULT_QUERY
module-attribute
¶
INSERT_RESULT_QUERY = "\nINSERT INTO {} VALUES (%s, %s, NULL)\nON CONFLICT (task_id)\nDO UPDATE\nSET result = %s\n"
INSERT_PROGRESS_QUERY
module-attribute
¶
INSERT_PROGRESS_QUERY = "\nINSERT INTO {} VALUES (%s, NULL, %s)\nON CONFLICT (task_id)\nDO UPDATE\nSET progress = %s\n"
SELECT_PROGRESS_QUERY
module-attribute
¶
IS_RESULT_EXISTS_QUERY
module-attribute
¶
IS_RESULT_EXISTS_QUERY = "\nSELECT EXISTS(\n SELECT 1 FROM {} WHERE task_id = %s and result IS NOT NULL\n)\n"
SELECT_RESULT_QUERY
module-attribute
¶
DELETE_RESULT_QUERY
module-attribute
¶
CREATE_SCHEDULES_TABLE_QUERY
module-attribute
¶
CREATE_SCHEDULES_TABLE_QUERY = "\nCREATE TABLE IF NOT EXISTS {} (\n id UUID PRIMARY KEY,\n task_name VARCHAR(100) NOT NULL,\n schedule JSONB NOT NULL,\n created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),\n updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()\n);\n"
INSERT_SCHEDULE_QUERY
module-attribute
¶
INSERT_SCHEDULE_QUERY = "\nINSERT INTO {} (id, task_name, schedule)\nVALUES (%s, %s, %s)\nON CONFLICT (id) DO UPDATE\nSET task_name = EXCLUDED.task_name,\n schedule = EXCLUDED.schedule,\n updated_at = NOW();\n"
SELECT_SCHEDULES_QUERY
module-attribute
¶
DELETE_SCHEDULE_QUERY
module-attribute
¶
result_backend
¶
AiopgResultBackend
¶
AiopgResultBackend(
dsn="postgres://postgres:postgres@localhost:5432/postgres",
keep_results=True,
table_name="taskiq_results",
field_for_task_id="VarChar",
serializer=None,
*,
pool=None,
**connect_kwargs,
)
Bases: BasePostgresResultBackend
Result backend for TaskIQ based on Aiopg.
Construct a new AiopgResultBackend.
Parameters:
-
dsn(Callable[[], str] | str | None, default:'postgres://postgres:postgres@localhost:5432/postgres') –PostgreSQL connection string or callable. Can be None if pool is provided.
-
keep_results(bool, default:True) –Whether to keep results after reading.
-
table_name(str, default:'taskiq_results') –Table to store results in.
-
field_for_task_id(Literal['VarChar', 'Text', 'Uuid'], default:'VarChar') –Column type for task_id.
-
serializer(TaskiqSerializer | None, default:None) –Serializer for task results.
-
pool(Pool | None, default:None) –An existing connection pool to reuse.
-
**connect_kwargs(Any, default:{}) –Extra kwargs for connection pool creation.
Source code in src/taskiq_pg/aiopg/result_backend.py
startup
async
¶
Initialize the result backend.
Construct new connection pool (if not provided externally) and create new table for results if not exists.
Source code in src/taskiq_pg/aiopg/result_backend.py
shutdown
async
¶
Close the connection pool if created by this backend.
set_result
async
¶
Set result to the PostgreSQL table.
Parameters:
-
task_id(Any) –ID of the task.
-
result(TaskiqResult[_ReturnType]) –result of the task.
Source code in src/taskiq_pg/aiopg/result_backend.py
is_result_ready
async
¶
Return whether the result is ready.
Parameters:
-
task_id(Any) –ID of the task.
Returns:
-
bool(bool) –True if the result is ready else False.
Source code in src/taskiq_pg/aiopg/result_backend.py
get_result
async
¶
Retrieve result from the task.
:param task_id: task's id. :param with_logs: if True it will download task's logs. :raises ResultIsMissingError: if there is no result when trying to get it. :return: TaskiqResult.
Source code in src/taskiq_pg/aiopg/result_backend.py
set_progress
async
¶
Saves progress.
:param task_id: task's id. :param progress: progress of execution.
Source code in src/taskiq_pg/aiopg/result_backend.py
get_progress
async
¶
Gets progress.
:param task_id: task's id.
Source code in src/taskiq_pg/aiopg/result_backend.py
schedule_source
¶
AiopgScheduleSource
¶
AiopgScheduleSource(
broker,
dsn="postgresql://postgres:postgres@localhost:5432/postgres",
table_name="taskiq_schedules",
*,
pool=None,
**connect_kwargs,
)
Bases: BasePostgresScheduleSource
Schedule source that uses aiopg to store schedules in PostgreSQL.
Construct a new AiopgScheduleSource.
Parameters:
-
broker(AsyncBroker) –The TaskIQ broker instance.
-
dsn(str | Callable[[], str], default:'postgresql://postgres:postgres@localhost:5432/postgres') –PostgreSQL connection string or callable. Ignored in pool mode.
-
table_name(str, default:'taskiq_schedules') –Table to store schedules in.
-
pool(Pool | None, default:None) –An existing connection pool to reuse.
-
**connect_kwargs(Any, default:{}) –Extra kwargs for connection pool creation.
Source code in src/taskiq_pg/aiopg/schedule_source.py
startup
async
¶
Initialize the schedule source.
Construct new connection pool (if not provided externally), create new table for schedules if not exists and fill table with schedules from task labels.
Source code in src/taskiq_pg/aiopg/schedule_source.py
shutdown
async
¶
Close the connection pool if it was created by this schedule source.
get_schedules
async
¶
Fetch schedules from the database.
Source code in src/taskiq_pg/aiopg/schedule_source.py
add_schedule
async
¶
Add a new schedule.
Parameters:
-
schedule(ScheduledTask) –schedule to add.
Source code in src/taskiq_pg/aiopg/schedule_source.py
delete_schedule
async
¶
Method to delete schedule by id.
This is useful for schedule cancelation.
Parameters:
-
schedule_id(str) –id of schedule to delete.
Source code in src/taskiq_pg/aiopg/schedule_source.py
post_send
async
¶
extract_scheduled_tasks_from_broker
¶
Extract schedules from tasks that were registered in broker.
Returns:
-
list[ScheduledTask]–A list of ScheduledTask instances extracted from the task's labels.
Source code in src/taskiq_pg/_internal/schedule_source.py
asyncpg
¶
AsyncpgBroker
¶
AsyncpgBroker(
dsn: str | Callable[[], str] = ...,
result_backend: AsyncResultBackend[_T] | None = ...,
task_id_generator: Callable[[], str] | None = ...,
channel_name: str = ...,
table_name: str = ...,
max_retry_attempts: int = ...,
read_kwargs: dict[str, Any] | None = ...,
write_kwargs: dict[str, Any] | None = ...,
*,
write_pool: None = ...,
read_connection: None = ...,
)
AsyncpgBroker(
dsn: str | Callable[[], str] = ...,
result_backend: AsyncResultBackend[_T] | None = ...,
task_id_generator: Callable[[], str] | None = ...,
channel_name: str = ...,
table_name: str = ...,
max_retry_attempts: int = ...,
read_kwargs: dict[str, Any] | None = ...,
write_kwargs: dict[str, Any] | None = ...,
*,
write_pool: Pool,
read_connection: None = ...,
)
AsyncpgBroker(
dsn: str | Callable[[], str] = ...,
result_backend: AsyncResultBackend[_T] | None = ...,
task_id_generator: Callable[[], str] | None = ...,
channel_name: str = ...,
table_name: str = ...,
max_retry_attempts: int = ...,
read_kwargs: dict[str, Any] | None = ...,
write_kwargs: dict[str, Any] | None = ...,
*,
write_pool: None = ...,
read_connection: Connection,
)
AsyncpgBroker(
dsn: str | Callable[[], str] = ...,
result_backend: AsyncResultBackend[_T] | None = ...,
task_id_generator: Callable[[], str] | None = ...,
channel_name: str = ...,
table_name: str = ...,
max_retry_attempts: int = ...,
read_kwargs: dict[str, Any] | None = ...,
write_kwargs: dict[str, Any] | None = ...,
*,
write_pool: Pool,
read_connection: Connection,
)
AsyncpgBroker(
dsn="postgresql://postgres:postgres@localhost:5432/postgres",
result_backend=None,
task_id_generator=None,
channel_name="taskiq",
table_name="taskiq_messages",
max_retry_attempts=5,
read_kwargs=None,
write_kwargs=None,
*,
write_pool=None,
read_connection=None,
)
Bases: BasePostgresBroker
Broker that uses asyncpg as driver and PostgreSQL with LISTEN/NOTIFY mechanism.
Construct a new AsyncpgBroker.
Parameters:
-
dsn(str | Callable[[], str], default:'postgresql://postgres:postgres@localhost:5432/postgres') –PostgreSQL connection string or a callable returning one.
-
result_backend(AsyncResultBackend[_T] | None, default:None) –Custom result backend.
-
task_id_generator(Callable[[], str] | None, default:None) –Custom task_id generator.
-
channel_name(str, default:'taskiq') –Name of the LISTEN/NOTIFY channel.
-
table_name(str, default:'taskiq_messages') –Name of the table used to store messages.
-
max_retry_attempts(int, default:5) –Maximum number of message processing attempts.
-
read_kwargs(dict[str, Any] | None, default:None) –Extra kwargs forwarded to
asyncpg.connect(). -
write_kwargs(dict[str, Any] | None, default:None) –Extra kwargs forwarded to
asyncpg.create_pool(). -
write_pool(Pool | None, default:None) –An existing connection pool to reuse for writes.
-
read_connection(Connection | None, default:None) –An existing connection to reuse for LISTEN.
Source code in src/taskiq_pg/asyncpg/broker.py
dsn
property
¶
startup
async
¶
Initialize the broker.
Source code in src/taskiq_pg/asyncpg/broker.py
shutdown
async
¶
Close all connections on shutdown.
Source code in src/taskiq_pg/asyncpg/broker.py
kick
async
¶
Send message to the channel.
Inserts the message into the database and sends a NOTIFY.
:param message: Message to send.
Source code in src/taskiq_pg/asyncpg/broker.py
listen
async
¶
Listen to the channel.
Yields messages as they are received.
:yields: AckableMessage instances.
Source code in src/taskiq_pg/asyncpg/broker.py
AsyncpgResultBackend
¶
AsyncpgResultBackend(
dsn="postgres://postgres:postgres@localhost:5432/postgres",
keep_results=True,
table_name="taskiq_results",
field_for_task_id="VarChar",
serializer=None,
*,
pool=None,
**connect_kwargs,
)
Bases: BasePostgresResultBackend
Result backend for TaskIQ based on asyncpg.
Construct a new AsyncpgResultBackend.
Parameters:
-
dsn(Callable[[], str] | str | None, default:'postgres://postgres:postgres@localhost:5432/postgres') –PostgreSQL connection string or callable. Can be None if pool is provided.
-
keep_results(bool, default:True) –Whether to keep results after reading.
-
table_name(str, default:'taskiq_results') –Table to store results in.
-
field_for_task_id(Literal['VarChar', 'Text', 'Uuid'], default:'VarChar') –Column type for task_id.
-
serializer(TaskiqSerializer | None, default:None) –Serializer for task results.
-
pool(Pool[Any] | None, default:None) –An existing connection pool to reuse.
-
**connect_kwargs(Any, default:{}) –Extra kwargs for connection pool creation.
Source code in src/taskiq_pg/asyncpg/result_backend.py
startup
async
¶
Initialize the result backend.
Construct new connection pool (if not provided externally) and create new table for results if not exists.
Source code in src/taskiq_pg/asyncpg/result_backend.py
shutdown
async
¶
Close the connection pool if it was created by this backend.
set_result
async
¶
Set result to the PostgreSQL table.
:param task_id: ID of the task. :param result: result of the task.
Source code in src/taskiq_pg/asyncpg/result_backend.py
is_result_ready
async
¶
Returns whether the result is ready.
:param task_id: ID of the task. :returns: True if the result is ready else False.
Source code in src/taskiq_pg/asyncpg/result_backend.py
get_result
async
¶
Retrieve result from the task.
:param task_id: task's id. :param with_logs: if True it will download task's logs. (deprecated in taskiq) :raises ResultIsMissingError: if there is no result when trying to get it. :return: TaskiqResult.
Source code in src/taskiq_pg/asyncpg/result_backend.py
set_progress
async
¶
Saves progress.
:param task_id: task's id. :param progress: progress of execution.
Source code in src/taskiq_pg/asyncpg/result_backend.py
get_progress
async
¶
Gets progress.
:param task_id: task's id.
Source code in src/taskiq_pg/asyncpg/result_backend.py
AsyncpgScheduleSource
¶
AsyncpgScheduleSource(
broker,
dsn="postgresql://postgres:postgres@localhost:5432/postgres",
table_name="taskiq_schedules",
*,
pool=None,
**connect_kwargs,
)
Bases: BasePostgresScheduleSource
Schedule source that uses asyncpg to store schedules in PostgreSQL.
Construct a new AsyncpgScheduleSource.
Parameters:
-
broker(AsyncBroker) –The TaskIQ broker instance.
-
dsn(str | Callable[[], str], default:'postgresql://postgres:postgres@localhost:5432/postgres') –PostgreSQL connection string or callable. Ignored in pool mode.
-
table_name(str, default:'taskiq_schedules') –Table to store schedules in.
-
pool(Pool[Record] | None, default:None) –An existing connection pool to reuse.
-
**connect_kwargs(Any, default:{}) –Extra kwargs for connection pool creation.
Source code in src/taskiq_pg/asyncpg/schedule_source.py
extract_scheduled_tasks_from_broker
¶
Extract schedules from tasks that were registered in broker.
Returns:
-
list[ScheduledTask]–A list of ScheduledTask instances extracted from the task's labels.
Source code in src/taskiq_pg/_internal/schedule_source.py
startup
async
¶
Initialize the schedule source.
Construct new connection pool (if not provided externally), create new table for schedules if not exists and fill table with schedules from task labels.
Source code in src/taskiq_pg/asyncpg/schedule_source.py
shutdown
async
¶
Close the connection pool if it created by this schedule source.
get_schedules
async
¶
Fetch schedules from the database.
Source code in src/taskiq_pg/asyncpg/schedule_source.py
add_schedule
async
¶
Add a new schedule.
Parameters:
-
schedule(ScheduledTask) –schedule to add.
Source code in src/taskiq_pg/asyncpg/schedule_source.py
delete_schedule
async
¶
Method to delete schedule by id.
This is useful for schedule cancelation.
Parameters:
-
schedule_id(str) –id of schedule to delete.
Source code in src/taskiq_pg/asyncpg/schedule_source.py
post_send
async
¶
broker
¶
AsyncpgBroker
¶
AsyncpgBroker(
dsn: str | Callable[[], str] = ...,
result_backend: AsyncResultBackend[_T] | None = ...,
task_id_generator: Callable[[], str] | None = ...,
channel_name: str = ...,
table_name: str = ...,
max_retry_attempts: int = ...,
read_kwargs: dict[str, Any] | None = ...,
write_kwargs: dict[str, Any] | None = ...,
*,
write_pool: None = ...,
read_connection: None = ...,
)
AsyncpgBroker(
dsn: str | Callable[[], str] = ...,
result_backend: AsyncResultBackend[_T] | None = ...,
task_id_generator: Callable[[], str] | None = ...,
channel_name: str = ...,
table_name: str = ...,
max_retry_attempts: int = ...,
read_kwargs: dict[str, Any] | None = ...,
write_kwargs: dict[str, Any] | None = ...,
*,
write_pool: Pool,
read_connection: None = ...,
)
AsyncpgBroker(
dsn: str | Callable[[], str] = ...,
result_backend: AsyncResultBackend[_T] | None = ...,
task_id_generator: Callable[[], str] | None = ...,
channel_name: str = ...,
table_name: str = ...,
max_retry_attempts: int = ...,
read_kwargs: dict[str, Any] | None = ...,
write_kwargs: dict[str, Any] | None = ...,
*,
write_pool: None = ...,
read_connection: Connection,
)
AsyncpgBroker(
dsn: str | Callable[[], str] = ...,
result_backend: AsyncResultBackend[_T] | None = ...,
task_id_generator: Callable[[], str] | None = ...,
channel_name: str = ...,
table_name: str = ...,
max_retry_attempts: int = ...,
read_kwargs: dict[str, Any] | None = ...,
write_kwargs: dict[str, Any] | None = ...,
*,
write_pool: Pool,
read_connection: Connection,
)
AsyncpgBroker(
dsn="postgresql://postgres:postgres@localhost:5432/postgres",
result_backend=None,
task_id_generator=None,
channel_name="taskiq",
table_name="taskiq_messages",
max_retry_attempts=5,
read_kwargs=None,
write_kwargs=None,
*,
write_pool=None,
read_connection=None,
)
Bases: BasePostgresBroker
Broker that uses asyncpg as driver and PostgreSQL with LISTEN/NOTIFY mechanism.
Construct a new AsyncpgBroker.
Parameters:
-
dsn(str | Callable[[], str], default:'postgresql://postgres:postgres@localhost:5432/postgres') –PostgreSQL connection string or a callable returning one.
-
result_backend(AsyncResultBackend[_T] | None, default:None) –Custom result backend.
-
task_id_generator(Callable[[], str] | None, default:None) –Custom task_id generator.
-
channel_name(str, default:'taskiq') –Name of the LISTEN/NOTIFY channel.
-
table_name(str, default:'taskiq_messages') –Name of the table used to store messages.
-
max_retry_attempts(int, default:5) –Maximum number of message processing attempts.
-
read_kwargs(dict[str, Any] | None, default:None) –Extra kwargs forwarded to
asyncpg.connect(). -
write_kwargs(dict[str, Any] | None, default:None) –Extra kwargs forwarded to
asyncpg.create_pool(). -
write_pool(Pool | None, default:None) –An existing connection pool to reuse for writes.
-
read_connection(Connection | None, default:None) –An existing connection to reuse for LISTEN.
Source code in src/taskiq_pg/asyncpg/broker.py
dsn
property
¶
startup
async
¶
Initialize the broker.
Source code in src/taskiq_pg/asyncpg/broker.py
shutdown
async
¶
Close all connections on shutdown.
Source code in src/taskiq_pg/asyncpg/broker.py
kick
async
¶
Send message to the channel.
Inserts the message into the database and sends a NOTIFY.
:param message: Message to send.
Source code in src/taskiq_pg/asyncpg/broker.py
listen
async
¶
Listen to the channel.
Yields messages as they are received.
:yields: AckableMessage instances.
Source code in src/taskiq_pg/asyncpg/broker.py
queries
¶
CREATE_TABLE_QUERY
module-attribute
¶
CREATE_TABLE_QUERY = "\nCREATE TABLE IF NOT EXISTS {} (\n task_id {} UNIQUE,\n result BYTEA,\n progress BYTEA\n)\n"
ADD_PROGRESS_COLUMN_QUERY
module-attribute
¶
CREATE_INDEX_QUERY
module-attribute
¶
INSERT_RESULT_QUERY
module-attribute
¶
INSERT_RESULT_QUERY = "\nINSERT INTO {} VALUES ($1, $2, NULL)\nON CONFLICT (task_id)\nDO UPDATE\nSET result = $2\n"
INSERT_PROGRESS_QUERY
module-attribute
¶
INSERT_PROGRESS_QUERY = "\nINSERT INTO {} VALUES ($1, NULL, $2)\nON CONFLICT (task_id)\nDO UPDATE\nSET progress = $2\n"
SELECT_PROGRESS_QUERY
module-attribute
¶
IS_RESULT_EXISTS_QUERY
module-attribute
¶
IS_RESULT_EXISTS_QUERY = "\nSELECT EXISTS(\n SELECT 1 FROM {} WHERE task_id = $1 and result IS NOT NULL\n)\n"
SELECT_RESULT_QUERY
module-attribute
¶
DELETE_RESULT_QUERY
module-attribute
¶
CREATE_MESSAGE_TABLE_QUERY
module-attribute
¶
CREATE_MESSAGE_TABLE_QUERY = "\nCREATE TABLE IF NOT EXISTS {} (\n id SERIAL PRIMARY KEY,\n task_id VARCHAR NOT NULL,\n task_name VARCHAR NOT NULL,\n message TEXT NOT NULL,\n labels JSONB NOT NULL,\n status TEXT NOT NULL DEFAULT 'pending',\n created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()\n);\n"
INSERT_MESSAGE_QUERY
module-attribute
¶
INSERT_MESSAGE_QUERY = "\nINSERT INTO {} (task_id, task_name, message, labels)\nVALUES ($1, $2, $3, $4)\nRETURNING id\n"
CLAIM_MESSAGE_QUERY
module-attribute
¶
CLAIM_MESSAGE_QUERY = "UPDATE {} SET status = 'processing' WHERE id = $1 AND status = 'pending' RETURNING id, message"
CREATE_SCHEDULES_TABLE_QUERY
module-attribute
¶
CREATE_SCHEDULES_TABLE_QUERY = "\nCREATE TABLE IF NOT EXISTS {} (\n id UUID PRIMARY KEY,\n task_name VARCHAR(100) NOT NULL,\n schedule JSONB NOT NULL,\n created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),\n updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()\n);\n"
INSERT_SCHEDULE_QUERY
module-attribute
¶
INSERT_SCHEDULE_QUERY = "\nINSERT INTO {} (id, task_name, schedule)\nVALUES ($1, $2, $3)\nON CONFLICT (id) DO UPDATE\nSET task_name = EXCLUDED.task_name,\n schedule = EXCLUDED.schedule,\n updated_at = NOW();\n"
SELECT_SCHEDULES_QUERY
module-attribute
¶
DELETE_SCHEDULE_QUERY
module-attribute
¶
result_backend
¶
AsyncpgResultBackend
¶
AsyncpgResultBackend(
dsn="postgres://postgres:postgres@localhost:5432/postgres",
keep_results=True,
table_name="taskiq_results",
field_for_task_id="VarChar",
serializer=None,
*,
pool=None,
**connect_kwargs,
)
Bases: BasePostgresResultBackend
Result backend for TaskIQ based on asyncpg.
Construct a new AsyncpgResultBackend.
Parameters:
-
dsn(Callable[[], str] | str | None, default:'postgres://postgres:postgres@localhost:5432/postgres') –PostgreSQL connection string or callable. Can be None if pool is provided.
-
keep_results(bool, default:True) –Whether to keep results after reading.
-
table_name(str, default:'taskiq_results') –Table to store results in.
-
field_for_task_id(Literal['VarChar', 'Text', 'Uuid'], default:'VarChar') –Column type for task_id.
-
serializer(TaskiqSerializer | None, default:None) –Serializer for task results.
-
pool(Pool[Any] | None, default:None) –An existing connection pool to reuse.
-
**connect_kwargs(Any, default:{}) –Extra kwargs for connection pool creation.
Source code in src/taskiq_pg/asyncpg/result_backend.py
startup
async
¶
Initialize the result backend.
Construct new connection pool (if not provided externally) and create new table for results if not exists.
Source code in src/taskiq_pg/asyncpg/result_backend.py
shutdown
async
¶
Close the connection pool if it was created by this backend.
set_result
async
¶
Set result to the PostgreSQL table.
:param task_id: ID of the task. :param result: result of the task.
Source code in src/taskiq_pg/asyncpg/result_backend.py
is_result_ready
async
¶
Returns whether the result is ready.
:param task_id: ID of the task. :returns: True if the result is ready else False.
Source code in src/taskiq_pg/asyncpg/result_backend.py
get_result
async
¶
Retrieve result from the task.
:param task_id: task's id. :param with_logs: if True it will download task's logs. (deprecated in taskiq) :raises ResultIsMissingError: if there is no result when trying to get it. :return: TaskiqResult.
Source code in src/taskiq_pg/asyncpg/result_backend.py
set_progress
async
¶
Saves progress.
:param task_id: task's id. :param progress: progress of execution.
Source code in src/taskiq_pg/asyncpg/result_backend.py
get_progress
async
¶
Gets progress.
:param task_id: task's id.
Source code in src/taskiq_pg/asyncpg/result_backend.py
schedule_source
¶
AsyncpgScheduleSource
¶
AsyncpgScheduleSource(
broker,
dsn="postgresql://postgres:postgres@localhost:5432/postgres",
table_name="taskiq_schedules",
*,
pool=None,
**connect_kwargs,
)
Bases: BasePostgresScheduleSource
Schedule source that uses asyncpg to store schedules in PostgreSQL.
Construct a new AsyncpgScheduleSource.
Parameters:
-
broker(AsyncBroker) –The TaskIQ broker instance.
-
dsn(str | Callable[[], str], default:'postgresql://postgres:postgres@localhost:5432/postgres') –PostgreSQL connection string or callable. Ignored in pool mode.
-
table_name(str, default:'taskiq_schedules') –Table to store schedules in.
-
pool(Pool[Record] | None, default:None) –An existing connection pool to reuse.
-
**connect_kwargs(Any, default:{}) –Extra kwargs for connection pool creation.
Source code in src/taskiq_pg/asyncpg/schedule_source.py
startup
async
¶
Initialize the schedule source.
Construct new connection pool (if not provided externally), create new table for schedules if not exists and fill table with schedules from task labels.
Source code in src/taskiq_pg/asyncpg/schedule_source.py
shutdown
async
¶
Close the connection pool if it created by this schedule source.
get_schedules
async
¶
Fetch schedules from the database.
Source code in src/taskiq_pg/asyncpg/schedule_source.py
add_schedule
async
¶
Add a new schedule.
Parameters:
-
schedule(ScheduledTask) –schedule to add.
Source code in src/taskiq_pg/asyncpg/schedule_source.py
delete_schedule
async
¶
Method to delete schedule by id.
This is useful for schedule cancelation.
Parameters:
-
schedule_id(str) –id of schedule to delete.
Source code in src/taskiq_pg/asyncpg/schedule_source.py
post_send
async
¶
extract_scheduled_tasks_from_broker
¶
Extract schedules from tasks that were registered in broker.
Returns:
-
list[ScheduledTask]–A list of ScheduledTask instances extracted from the task's labels.
Source code in src/taskiq_pg/_internal/schedule_source.py
exceptions
¶
DatabaseConnectionError
¶
Bases: BaseTaskiqPgError
Error if cannot connect to PostgreSQL.
ResultIsMissingError
¶
Bases: BaseTaskiqPgError
Error if cannot retrieve result from PostgreSQL.
psqlpy
¶
PSQLPyBroker
¶
PSQLPyBroker(
dsn: str | Callable[[], str] = ...,
result_backend: AsyncResultBackend[_T] | None = ...,
task_id_generator: Callable[[], str] | None = ...,
channel_name: str = ...,
table_name: str = ...,
max_retry_attempts: int = ...,
read_kwargs: dict[str, Any] | None = ...,
write_kwargs: dict[str, Any] | None = ...,
*,
write_pool: None = ...,
read_connection: None = ...,
)
PSQLPyBroker(
dsn: str | Callable[[], str] = ...,
result_backend: AsyncResultBackend[_T] | None = ...,
task_id_generator: Callable[[], str] | None = ...,
channel_name: str = ...,
table_name: str = ...,
max_retry_attempts: int = ...,
read_kwargs: dict[str, Any] | None = ...,
write_kwargs: dict[str, Any] | None = ...,
*,
write_pool: ConnectionPool,
read_connection: None = ...,
)
PSQLPyBroker(
dsn: str | Callable[[], str] = ...,
result_backend: AsyncResultBackend[_T] | None = ...,
task_id_generator: Callable[[], str] | None = ...,
channel_name: str = ...,
table_name: str = ...,
max_retry_attempts: int = ...,
read_kwargs: dict[str, Any] | None = ...,
write_kwargs: dict[str, Any] | None = ...,
*,
write_pool: None = ...,
read_connection: Connection,
)
PSQLPyBroker(
dsn: str | Callable[[], str] = ...,
result_backend: AsyncResultBackend[_T] | None = ...,
task_id_generator: Callable[[], str] | None = ...,
channel_name: str = ...,
table_name: str = ...,
max_retry_attempts: int = ...,
read_kwargs: dict[str, Any] | None = ...,
write_kwargs: dict[str, Any] | None = ...,
*,
write_pool: ConnectionPool,
read_connection: Connection,
)
PSQLPyBroker(
dsn="postgresql://postgres:postgres@localhost:5432/postgres",
result_backend=None,
task_id_generator=None,
channel_name="taskiq",
table_name="taskiq_messages",
max_retry_attempts=5,
read_kwargs=None,
write_kwargs=None,
*,
write_pool=None,
read_connection=None,
)
Bases: BasePostgresBroker
Broker that uses PostgreSQL and PSQLPy with LISTEN/NOTIFY.
Construct a new PSQLPyBroker.
Parameters:
-
dsn(str | Callable[[], str], default:'postgresql://postgres:postgres@localhost:5432/postgres') –PostgreSQL connection string or callable.
-
result_backend(AsyncResultBackend[_T] | None, default:None) –Custom result backend.
-
task_id_generator(Callable[[], str] | None, default:None) –Custom task_id generator.
-
channel_name(str, default:'taskiq') –Name of the LISTEN/NOTIFY channel.
-
table_name(str, default:'taskiq_messages') –Name of the table used to store messages.
-
max_retry_attempts(int, default:5) –Maximum number of message processing attempts.
-
read_kwargs(dict[str, Any] | None, default:None) –Extra kwargs forwarded to
psqlpy.connect() -
write_kwargs(dict[str, Any] | None, default:None) –Extra kwargs forwarded to
psqlpy.ConnectionPool() -
write_pool(ConnectionPool | None, default:None) –An existing pool to reuse for writes.
-
read_connection(Connection | None, default:None) –An existing connection to reuse for LISTEN.
Source code in src/taskiq_pg/psqlpy/broker.py
dsn
property
¶
startup
async
¶
Initialize the broker.
Source code in src/taskiq_pg/psqlpy/broker.py
shutdown
async
¶
Close all connections on shutdown.
Source code in src/taskiq_pg/psqlpy/broker.py
kick
async
¶
Send message to the channel.
Inserts the message into the database and sends a NOTIFY.
:param message: Message to send.
Source code in src/taskiq_pg/psqlpy/broker.py
listen
async
¶
Listen to the channel.
Yields messages as they are received.
:yields: AckableMessage instances.
Source code in src/taskiq_pg/psqlpy/broker.py
PSQLPyResultBackend
¶
PSQLPyResultBackend(
dsn="postgres://postgres:postgres@localhost:5432/postgres",
keep_results=True,
table_name="taskiq_results",
field_for_task_id="VarChar",
serializer=None,
*,
pool=None,
**connect_kwargs,
)
Bases: BasePostgresResultBackend
Result backend for TaskIQ based on PSQLPy.
Construct a new PSQLPyResultBackend.
Parameters:
-
dsn(Callable[[], str] | str | None, default:'postgres://postgres:postgres@localhost:5432/postgres') –PostgreSQL connection string or callable. Can be None if pool is provided.
-
keep_results(bool, default:True) –Whether to keep results after reading.
-
table_name(str, default:'taskiq_results') –Table to store results in.
-
field_for_task_id(Literal['VarChar', 'Text', 'Uuid'], default:'VarChar') –Column type for task_id.
-
serializer(TaskiqSerializer | None, default:None) –Serializer for task results.
-
pool(ConnectionPool | None, default:None) –An existing connection pool to reuse.
-
**connect_kwargs(Any, default:{}) –Extra kwargs for connection pool creation.
Source code in src/taskiq_pg/psqlpy/result_backend.py
startup
async
¶
Initialize the result backend.
Construct new connection pool (if not provided externally) and create new table for results if not exists.
Source code in src/taskiq_pg/psqlpy/result_backend.py
shutdown
async
¶
Close the connection pool if it was created by this result backend.
set_result
async
¶
Set result to the PostgreSQL table.
:param task_id: ID of the task. :param result: result of the task.
Source code in src/taskiq_pg/psqlpy/result_backend.py
is_result_ready
async
¶
Returns whether the result is ready.
:param task_id: ID of the task.
:returns: True if the result is ready else False.
Source code in src/taskiq_pg/psqlpy/result_backend.py
get_result
async
¶
Retrieve result from the task.
:param task_id: task's id. :param with_logs: if True it will download task's logs. :raises ResultIsMissingError: if there is no result when trying to get it. :return: TaskiqResult.
Source code in src/taskiq_pg/psqlpy/result_backend.py
set_progress
async
¶
Saves progress.
:param task_id: task's id. :param progress: progress of execution.
Source code in src/taskiq_pg/psqlpy/result_backend.py
get_progress
async
¶
Gets progress.
:param task_id: task's id.
Source code in src/taskiq_pg/psqlpy/result_backend.py
PSQLPyScheduleSource
¶
PSQLPyScheduleSource(
broker,
dsn="postgresql://postgres:postgres@localhost:5432/postgres",
table_name="taskiq_schedules",
*,
pool=None,
**connect_kwargs,
)
Bases: BasePostgresScheduleSource
Schedule source that uses psqlpy to store schedules in PostgreSQL.
Construct a new PSQLPyScheduleSource.
Parameters:
-
broker(AsyncBroker) –The TaskIQ broker instance.
-
dsn(str | Callable[[], str], default:'postgresql://postgres:postgres@localhost:5432/postgres') –PostgreSQL connection string or callable. Ignored in pool mode.
-
table_name(str, default:'taskiq_schedules') –Table to store schedules in.
-
pool(ConnectionPool | None, default:None) –An existing connection pool to reuse.
-
**connect_kwargs(Any, default:{}) –Extra kwargs for connection pool creation.
Source code in src/taskiq_pg/psqlpy/schedule_source.py
extract_scheduled_tasks_from_broker
¶
Extract schedules from tasks that were registered in broker.
Returns:
-
list[ScheduledTask]–A list of ScheduledTask instances extracted from the task's labels.
Source code in src/taskiq_pg/_internal/schedule_source.py
startup
async
¶
Initialize the schedule source.
Construct new connection pool (if not provided externally), create new table for schedules if not exists and fill table with schedules from task labels.
Source code in src/taskiq_pg/psqlpy/schedule_source.py
shutdown
async
¶
Close the connection pool if was created by this schedule source.
get_schedules
async
¶
Fetch schedules from the database.
Source code in src/taskiq_pg/psqlpy/schedule_source.py
add_schedule
async
¶
Add a new schedule.
Parameters:
-
schedule(ScheduledTask) –schedule to add.
Source code in src/taskiq_pg/psqlpy/schedule_source.py
delete_schedule
async
¶
Method to delete schedule by id.
This is useful for schedule cancelation.
Parameters:
-
schedule_id(str) –id of schedule to delete.
Source code in src/taskiq_pg/psqlpy/schedule_source.py
post_send
async
¶
broker
¶
MessageRow
dataclass
¶
PSQLPyBroker
¶
PSQLPyBroker(
dsn: str | Callable[[], str] = ...,
result_backend: AsyncResultBackend[_T] | None = ...,
task_id_generator: Callable[[], str] | None = ...,
channel_name: str = ...,
table_name: str = ...,
max_retry_attempts: int = ...,
read_kwargs: dict[str, Any] | None = ...,
write_kwargs: dict[str, Any] | None = ...,
*,
write_pool: None = ...,
read_connection: None = ...,
)
PSQLPyBroker(
dsn: str | Callable[[], str] = ...,
result_backend: AsyncResultBackend[_T] | None = ...,
task_id_generator: Callable[[], str] | None = ...,
channel_name: str = ...,
table_name: str = ...,
max_retry_attempts: int = ...,
read_kwargs: dict[str, Any] | None = ...,
write_kwargs: dict[str, Any] | None = ...,
*,
write_pool: ConnectionPool,
read_connection: None = ...,
)
PSQLPyBroker(
dsn: str | Callable[[], str] = ...,
result_backend: AsyncResultBackend[_T] | None = ...,
task_id_generator: Callable[[], str] | None = ...,
channel_name: str = ...,
table_name: str = ...,
max_retry_attempts: int = ...,
read_kwargs: dict[str, Any] | None = ...,
write_kwargs: dict[str, Any] | None = ...,
*,
write_pool: None = ...,
read_connection: Connection,
)
PSQLPyBroker(
dsn: str | Callable[[], str] = ...,
result_backend: AsyncResultBackend[_T] | None = ...,
task_id_generator: Callable[[], str] | None = ...,
channel_name: str = ...,
table_name: str = ...,
max_retry_attempts: int = ...,
read_kwargs: dict[str, Any] | None = ...,
write_kwargs: dict[str, Any] | None = ...,
*,
write_pool: ConnectionPool,
read_connection: Connection,
)
PSQLPyBroker(
dsn="postgresql://postgres:postgres@localhost:5432/postgres",
result_backend=None,
task_id_generator=None,
channel_name="taskiq",
table_name="taskiq_messages",
max_retry_attempts=5,
read_kwargs=None,
write_kwargs=None,
*,
write_pool=None,
read_connection=None,
)
Bases: BasePostgresBroker
Broker that uses PostgreSQL and PSQLPy with LISTEN/NOTIFY.
Construct a new PSQLPyBroker.
Parameters:
-
dsn(str | Callable[[], str], default:'postgresql://postgres:postgres@localhost:5432/postgres') –PostgreSQL connection string or callable.
-
result_backend(AsyncResultBackend[_T] | None, default:None) –Custom result backend.
-
task_id_generator(Callable[[], str] | None, default:None) –Custom task_id generator.
-
channel_name(str, default:'taskiq') –Name of the LISTEN/NOTIFY channel.
-
table_name(str, default:'taskiq_messages') –Name of the table used to store messages.
-
max_retry_attempts(int, default:5) –Maximum number of message processing attempts.
-
read_kwargs(dict[str, Any] | None, default:None) –Extra kwargs forwarded to
psqlpy.connect() -
write_kwargs(dict[str, Any] | None, default:None) –Extra kwargs forwarded to
psqlpy.ConnectionPool() -
write_pool(ConnectionPool | None, default:None) –An existing pool to reuse for writes.
-
read_connection(Connection | None, default:None) –An existing connection to reuse for LISTEN.
Source code in src/taskiq_pg/psqlpy/broker.py
dsn
property
¶
startup
async
¶
Initialize the broker.
Source code in src/taskiq_pg/psqlpy/broker.py
shutdown
async
¶
Close all connections on shutdown.
Source code in src/taskiq_pg/psqlpy/broker.py
kick
async
¶
Send message to the channel.
Inserts the message into the database and sends a NOTIFY.
:param message: Message to send.
Source code in src/taskiq_pg/psqlpy/broker.py
listen
async
¶
Listen to the channel.
Yields messages as they are received.
:yields: AckableMessage instances.
Source code in src/taskiq_pg/psqlpy/broker.py
queries
¶
CREATE_TABLE_QUERY
module-attribute
¶
CREATE_TABLE_QUERY = "\nCREATE TABLE IF NOT EXISTS {} (\n task_id {} UNIQUE,\n result BYTEA,\n progress BYTEA\n)\n"
ADD_PROGRESS_COLUMN_QUERY
module-attribute
¶
CREATE_INDEX_QUERY
module-attribute
¶
INSERT_RESULT_QUERY
module-attribute
¶
INSERT_RESULT_QUERY = "\nINSERT INTO {} VALUES ($1, $2, NULL)\nON CONFLICT (task_id)\nDO UPDATE\nSET result = $2\n"
INSERT_PROGRESS_QUERY
module-attribute
¶
INSERT_PROGRESS_QUERY = "\nINSERT INTO {} VALUES ($1, NULL, $2)\nON CONFLICT (task_id)\nDO UPDATE\nSET progress = $2\n"
SELECT_PROGRESS_QUERY
module-attribute
¶
IS_RESULT_EXISTS_QUERY
module-attribute
¶
IS_RESULT_EXISTS_QUERY = "\nSELECT EXISTS(\n SELECT 1 FROM {} WHERE task_id = $1 and result IS NOT NULL\n)\n"
SELECT_RESULT_QUERY
module-attribute
¶
DELETE_RESULT_QUERY
module-attribute
¶
CREATE_MESSAGE_TABLE_QUERY
module-attribute
¶
CREATE_MESSAGE_TABLE_QUERY = "\nCREATE TABLE IF NOT EXISTS {} (\n id SERIAL PRIMARY KEY,\n task_id VARCHAR NOT NULL,\n task_name VARCHAR NOT NULL,\n message TEXT NOT NULL,\n labels JSONB NOT NULL,\n status TEXT NOT NULL DEFAULT 'pending',\n created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()\n);\n"
INSERT_MESSAGE_QUERY
module-attribute
¶
INSERT_MESSAGE_QUERY = "\nINSERT INTO {} (task_id, task_name, message, labels)\nVALUES ($1, $2, $3, $4)\nRETURNING id\n"
CLAIM_MESSAGE_QUERY
module-attribute
¶
CLAIM_MESSAGE_QUERY = "UPDATE {} SET status = 'processing' WHERE id = $1 AND status = 'pending' RETURNING *"
CREATE_SCHEDULES_TABLE_QUERY
module-attribute
¶
CREATE_SCHEDULES_TABLE_QUERY = "\nCREATE TABLE IF NOT EXISTS {} (\n id UUID PRIMARY KEY,\n task_name VARCHAR(100) NOT NULL,\n schedule JSONB NOT NULL,\n created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),\n updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()\n);\n"
INSERT_SCHEDULE_QUERY
module-attribute
¶
INSERT_SCHEDULE_QUERY = "\nINSERT INTO {} (id, task_name, schedule)\nVALUES ($1, $2, $3)\nON CONFLICT (id) DO UPDATE\nSET task_name = EXCLUDED.task_name,\n schedule = EXCLUDED.schedule,\n updated_at = NOW();\n"
SELECT_SCHEDULES_QUERY
module-attribute
¶
DELETE_SCHEDULE_QUERY
module-attribute
¶
result_backend
¶
PSQLPyResultBackend
¶
PSQLPyResultBackend(
dsn="postgres://postgres:postgres@localhost:5432/postgres",
keep_results=True,
table_name="taskiq_results",
field_for_task_id="VarChar",
serializer=None,
*,
pool=None,
**connect_kwargs,
)
Bases: BasePostgresResultBackend
Result backend for TaskIQ based on PSQLPy.
Construct a new PSQLPyResultBackend.
Parameters:
-
dsn(Callable[[], str] | str | None, default:'postgres://postgres:postgres@localhost:5432/postgres') –PostgreSQL connection string or callable. Can be None if pool is provided.
-
keep_results(bool, default:True) –Whether to keep results after reading.
-
table_name(str, default:'taskiq_results') –Table to store results in.
-
field_for_task_id(Literal['VarChar', 'Text', 'Uuid'], default:'VarChar') –Column type for task_id.
-
serializer(TaskiqSerializer | None, default:None) –Serializer for task results.
-
pool(ConnectionPool | None, default:None) –An existing connection pool to reuse.
-
**connect_kwargs(Any, default:{}) –Extra kwargs for connection pool creation.
Source code in src/taskiq_pg/psqlpy/result_backend.py
startup
async
¶
Initialize the result backend.
Construct new connection pool (if not provided externally) and create new table for results if not exists.
Source code in src/taskiq_pg/psqlpy/result_backend.py
shutdown
async
¶
Close the connection pool if it was created by this result backend.
set_result
async
¶
Set result to the PostgreSQL table.
:param task_id: ID of the task. :param result: result of the task.
Source code in src/taskiq_pg/psqlpy/result_backend.py
is_result_ready
async
¶
Returns whether the result is ready.
:param task_id: ID of the task.
:returns: True if the result is ready else False.
Source code in src/taskiq_pg/psqlpy/result_backend.py
get_result
async
¶
Retrieve result from the task.
:param task_id: task's id. :param with_logs: if True it will download task's logs. :raises ResultIsMissingError: if there is no result when trying to get it. :return: TaskiqResult.
Source code in src/taskiq_pg/psqlpy/result_backend.py
set_progress
async
¶
Saves progress.
:param task_id: task's id. :param progress: progress of execution.
Source code in src/taskiq_pg/psqlpy/result_backend.py
get_progress
async
¶
Gets progress.
:param task_id: task's id.
Source code in src/taskiq_pg/psqlpy/result_backend.py
schedule_source
¶
PSQLPyScheduleSource
¶
PSQLPyScheduleSource(
broker,
dsn="postgresql://postgres:postgres@localhost:5432/postgres",
table_name="taskiq_schedules",
*,
pool=None,
**connect_kwargs,
)
Bases: BasePostgresScheduleSource
Schedule source that uses psqlpy to store schedules in PostgreSQL.
Construct a new PSQLPyScheduleSource.
Parameters:
-
broker(AsyncBroker) –The TaskIQ broker instance.
-
dsn(str | Callable[[], str], default:'postgresql://postgres:postgres@localhost:5432/postgres') –PostgreSQL connection string or callable. Ignored in pool mode.
-
table_name(str, default:'taskiq_schedules') –Table to store schedules in.
-
pool(ConnectionPool | None, default:None) –An existing connection pool to reuse.
-
**connect_kwargs(Any, default:{}) –Extra kwargs for connection pool creation.
Source code in src/taskiq_pg/psqlpy/schedule_source.py
startup
async
¶
Initialize the schedule source.
Construct new connection pool (if not provided externally), create new table for schedules if not exists and fill table with schedules from task labels.
Source code in src/taskiq_pg/psqlpy/schedule_source.py
shutdown
async
¶
Close the connection pool if was created by this schedule source.
get_schedules
async
¶
Fetch schedules from the database.
Source code in src/taskiq_pg/psqlpy/schedule_source.py
add_schedule
async
¶
Add a new schedule.
Parameters:
-
schedule(ScheduledTask) –schedule to add.
Source code in src/taskiq_pg/psqlpy/schedule_source.py
delete_schedule
async
¶
Method to delete schedule by id.
This is useful for schedule cancelation.
Parameters:
-
schedule_id(str) –id of schedule to delete.
Source code in src/taskiq_pg/psqlpy/schedule_source.py
post_send
async
¶
extract_scheduled_tasks_from_broker
¶
Extract schedules from tasks that were registered in broker.
Returns:
-
list[ScheduledTask]–A list of ScheduledTask instances extracted from the task's labels.
Source code in src/taskiq_pg/_internal/schedule_source.py
psycopg
¶
PsycopgBroker
¶
PsycopgBroker(
dsn: str | Callable[[], str] = ...,
result_backend: AsyncResultBackend[_T] | None = ...,
task_id_generator: Callable[[], str] | None = ...,
channel_name: str = ...,
table_name: str = ...,
max_retry_attempts: int = ...,
read_kwargs: dict[str, Any] | None = ...,
write_kwargs: dict[str, Any] | None = ...,
*,
write_pool: None = ...,
read_connection: None = ...,
)
PsycopgBroker(
dsn: str | Callable[[], str] = ...,
result_backend: AsyncResultBackend[_T] | None = ...,
task_id_generator: Callable[[], str] | None = ...,
channel_name: str = ...,
table_name: str = ...,
max_retry_attempts: int = ...,
read_kwargs: dict[str, Any] | None = ...,
write_kwargs: dict[str, Any] | None = ...,
*,
write_pool: AsyncConnectionPool,
read_connection: None = ...,
)
PsycopgBroker(
dsn: str | Callable[[], str] = ...,
result_backend: AsyncResultBackend[_T] | None = ...,
task_id_generator: Callable[[], str] | None = ...,
channel_name: str = ...,
table_name: str = ...,
max_retry_attempts: int = ...,
read_kwargs: dict[str, Any] | None = ...,
write_kwargs: dict[str, Any] | None = ...,
*,
write_pool: None = ...,
read_connection: AsyncConnection,
)
PsycopgBroker(
dsn: str | Callable[[], str] = ...,
result_backend: AsyncResultBackend[_T] | None = ...,
task_id_generator: Callable[[], str] | None = ...,
channel_name: str = ...,
table_name: str = ...,
max_retry_attempts: int = ...,
read_kwargs: dict[str, Any] | None = ...,
write_kwargs: dict[str, Any] | None = ...,
*,
write_pool: AsyncConnectionPool,
read_connection: AsyncConnection,
)
PsycopgBroker(
dsn="postgresql://postgres:postgres@localhost:5432/postgres",
result_backend=None,
task_id_generator=None,
channel_name="taskiq",
table_name="taskiq_messages",
max_retry_attempts=5,
read_kwargs=None,
write_kwargs=None,
*,
write_pool=None,
read_connection=None,
)
Bases: BasePostgresBroker
Broker that uses PostgreSQL and psycopg with LISTEN/NOTIFY.
Construct a new PsycopgBroker.
Parameters:
-
dsn(str | Callable[[], str], default:'postgresql://postgres:postgres@localhost:5432/postgres') –PostgreSQL connection string or callable.
-
result_backend(AsyncResultBackend[_T] | None, default:None) –Custom result backend.
-
task_id_generator(Callable[[], str] | None, default:None) –Custom task_id generator.
-
channel_name(str, default:'taskiq') –Name of the LISTEN/NOTIFY channel.
-
table_name(str, default:'taskiq_messages') –Name of the table used to store messages.
-
max_retry_attempts(int, default:5) –Maximum number of message processing attempts.
-
read_kwargs(dict[str, Any] | None, default:None) –Extra kwargs forwarded to
AsyncConnection.connect(). -
write_kwargs(dict[str, Any] | None, default:None) –Extra kwargs forwarded to
AsyncConnectionPool(). -
write_pool(AsyncConnectionPool | None, default:None) –An existing connection pool to reuse for writes.
-
read_connection(AsyncConnection | None, default:None) –An existing connection to reuse for LISTEN.
Source code in src/taskiq_pg/psycopg/broker.py
dsn
property
¶
startup
async
¶
Initialize the broker.
Source code in src/taskiq_pg/psycopg/broker.py
shutdown
async
¶
Close all connections on shutdown.
Source code in src/taskiq_pg/psycopg/broker.py
kick
async
¶
Send message to the channel.
Inserts the message into the database and sends a NOTIFY.
:param message: Message to send.
Source code in src/taskiq_pg/psycopg/broker.py
listen
async
¶
Listen to the channel.
Yields messages as they are received.
:yields: AckableMessage instances.
Source code in src/taskiq_pg/psycopg/broker.py
PsycopgResultBackend
¶
PsycopgResultBackend(
dsn="postgres://postgres:postgres@localhost:5432/postgres",
keep_results=True,
table_name="taskiq_results",
field_for_task_id="VarChar",
serializer=None,
*,
pool=None,
**connect_kwargs,
)
Bases: BasePostgresResultBackend
Result backend for TaskIQ based on psycopg.
Construct a new PsycopgResultBackend.
Parameters:
-
dsn(Callable[[], str] | str | None, default:'postgres://postgres:postgres@localhost:5432/postgres') –PostgreSQL connection string or callable. Can be None if pool is provided.
-
keep_results(bool, default:True) –Whether to keep results after reading.
-
table_name(str, default:'taskiq_results') –Table to store results in.
-
field_for_task_id(Literal['VarChar', 'Text', 'Uuid'], default:'VarChar') –Column type for task_id.
-
serializer(TaskiqSerializer | None, default:None) –Serializer for task results.
-
pool(AsyncConnectionPool | None, default:None) –An existing connection pool to reuse.
-
**connect_kwargs(Any, default:{}) –Extra kwargs for connection pool creation.
Source code in src/taskiq_pg/psycopg/result_backend.py
startup
async
¶
Initialize the result backend.
Construct new connection pool (if not provided externally) and create new table for results if not exists.
Source code in src/taskiq_pg/psycopg/result_backend.py
shutdown
async
¶
Close the connection pool (only if owned by this backend).
set_result
async
¶
Set result to the PostgreSQL table.
:param task_id: ID of the task. :param result: result of the task.
Source code in src/taskiq_pg/psycopg/result_backend.py
is_result_ready
async
¶
Returns whether the result is ready.
:param task_id: ID of the task.
:returns: True if the result is ready else False.
Source code in src/taskiq_pg/psycopg/result_backend.py
get_result
async
¶
Retrieve result from the task.
:param task_id: task's id. :param with_logs: if True it will download task's logs. :raises ResultIsMissingError: if there is no result when trying to get it. :return: TaskiqResult.
Source code in src/taskiq_pg/psycopg/result_backend.py
set_progress
async
¶
Saves progress.
:param task_id: task's id. :param progress: progress of execution.
Source code in src/taskiq_pg/psycopg/result_backend.py
get_progress
async
¶
Gets progress.
:param task_id: task's id.
Source code in src/taskiq_pg/psycopg/result_backend.py
PsycopgScheduleSource
¶
PsycopgScheduleSource(
broker,
dsn="postgresql://postgres:postgres@localhost:5432/postgres",
table_name="taskiq_schedules",
*,
pool=None,
**connect_kwargs,
)
Bases: BasePostgresScheduleSource
Schedule source that uses psycopg to store schedules in PostgreSQL.
Construct a new PsycopgScheduleSource.
Parameters:
-
broker(AsyncBroker) –The TaskIQ broker instance.
-
dsn(str | Callable[[], str], default:'postgresql://postgres:postgres@localhost:5432/postgres') –PostgreSQL connection string or callable. Ignored in pool mode.
-
table_name(str, default:'taskiq_schedules') –Table to store schedules in.
-
pool(AsyncConnectionPool | None, default:None) –An existing connection pool to reuse.
-
**connect_kwargs(Any, default:{}) –Extra kwargs for connection pool creation.
Source code in src/taskiq_pg/psycopg/schedule_source.py
extract_scheduled_tasks_from_broker
¶
Extract schedules from tasks that were registered in broker.
Returns:
-
list[ScheduledTask]–A list of ScheduledTask instances extracted from the task's labels.
Source code in src/taskiq_pg/_internal/schedule_source.py
startup
async
¶
Initialize the schedule source.
Construct new connection pool (if not provided externally), create new table for schedules if not exists and fill table with schedules from task labels.
Source code in src/taskiq_pg/psycopg/schedule_source.py
shutdown
async
¶
Close the connection pool (only if owned by this source).
get_schedules
async
¶
Fetch schedules from the database.
Source code in src/taskiq_pg/psycopg/schedule_source.py
add_schedule
async
¶
Add a new schedule.
Parameters:
-
schedule(ScheduledTask) –schedule to add.
Source code in src/taskiq_pg/psycopg/schedule_source.py
delete_schedule
async
¶
Method to delete schedule by id.
This is useful for schedule cancelation.
Parameters:
-
schedule_id(str) –id of schedule to delete.
Source code in src/taskiq_pg/psycopg/schedule_source.py
post_send
async
¶
broker
¶
PsycopgBroker
¶
PsycopgBroker(
dsn: str | Callable[[], str] = ...,
result_backend: AsyncResultBackend[_T] | None = ...,
task_id_generator: Callable[[], str] | None = ...,
channel_name: str = ...,
table_name: str = ...,
max_retry_attempts: int = ...,
read_kwargs: dict[str, Any] | None = ...,
write_kwargs: dict[str, Any] | None = ...,
*,
write_pool: None = ...,
read_connection: None = ...,
)
PsycopgBroker(
dsn: str | Callable[[], str] = ...,
result_backend: AsyncResultBackend[_T] | None = ...,
task_id_generator: Callable[[], str] | None = ...,
channel_name: str = ...,
table_name: str = ...,
max_retry_attempts: int = ...,
read_kwargs: dict[str, Any] | None = ...,
write_kwargs: dict[str, Any] | None = ...,
*,
write_pool: AsyncConnectionPool,
read_connection: None = ...,
)
PsycopgBroker(
dsn: str | Callable[[], str] = ...,
result_backend: AsyncResultBackend[_T] | None = ...,
task_id_generator: Callable[[], str] | None = ...,
channel_name: str = ...,
table_name: str = ...,
max_retry_attempts: int = ...,
read_kwargs: dict[str, Any] | None = ...,
write_kwargs: dict[str, Any] | None = ...,
*,
write_pool: None = ...,
read_connection: AsyncConnection,
)
PsycopgBroker(
dsn: str | Callable[[], str] = ...,
result_backend: AsyncResultBackend[_T] | None = ...,
task_id_generator: Callable[[], str] | None = ...,
channel_name: str = ...,
table_name: str = ...,
max_retry_attempts: int = ...,
read_kwargs: dict[str, Any] | None = ...,
write_kwargs: dict[str, Any] | None = ...,
*,
write_pool: AsyncConnectionPool,
read_connection: AsyncConnection,
)
PsycopgBroker(
dsn="postgresql://postgres:postgres@localhost:5432/postgres",
result_backend=None,
task_id_generator=None,
channel_name="taskiq",
table_name="taskiq_messages",
max_retry_attempts=5,
read_kwargs=None,
write_kwargs=None,
*,
write_pool=None,
read_connection=None,
)
Bases: BasePostgresBroker
Broker that uses PostgreSQL and psycopg with LISTEN/NOTIFY.
Construct a new PsycopgBroker.
Parameters:
-
dsn(str | Callable[[], str], default:'postgresql://postgres:postgres@localhost:5432/postgres') –PostgreSQL connection string or callable.
-
result_backend(AsyncResultBackend[_T] | None, default:None) –Custom result backend.
-
task_id_generator(Callable[[], str] | None, default:None) –Custom task_id generator.
-
channel_name(str, default:'taskiq') –Name of the LISTEN/NOTIFY channel.
-
table_name(str, default:'taskiq_messages') –Name of the table used to store messages.
-
max_retry_attempts(int, default:5) –Maximum number of message processing attempts.
-
read_kwargs(dict[str, Any] | None, default:None) –Extra kwargs forwarded to
AsyncConnection.connect(). -
write_kwargs(dict[str, Any] | None, default:None) –Extra kwargs forwarded to
AsyncConnectionPool(). -
write_pool(AsyncConnectionPool | None, default:None) –An existing connection pool to reuse for writes.
-
read_connection(AsyncConnection | None, default:None) –An existing connection to reuse for LISTEN.
Source code in src/taskiq_pg/psycopg/broker.py
dsn
property
¶
startup
async
¶
Initialize the broker.
Source code in src/taskiq_pg/psycopg/broker.py
shutdown
async
¶
Close all connections on shutdown.
Source code in src/taskiq_pg/psycopg/broker.py
kick
async
¶
Send message to the channel.
Inserts the message into the database and sends a NOTIFY.
:param message: Message to send.
Source code in src/taskiq_pg/psycopg/broker.py
listen
async
¶
Listen to the channel.
Yields messages as they are received.
:yields: AckableMessage instances.
Source code in src/taskiq_pg/psycopg/broker.py
queries
¶
CREATE_TABLE_QUERY
module-attribute
¶
CREATE_TABLE_QUERY = "\nCREATE TABLE IF NOT EXISTS {} (\n task_id {} UNIQUE,\n result BYTEA,\n progress BYTEA\n)\n"
ADD_PROGRESS_COLUMN_QUERY
module-attribute
¶
CREATE_INDEX_QUERY
module-attribute
¶
INSERT_RESULT_QUERY
module-attribute
¶
INSERT_RESULT_QUERY = "\nINSERT INTO {} VALUES (%s, %s)\nON CONFLICT (task_id)\nDO UPDATE\nSET result = EXCLUDED.result;\n"
INSERT_PROGRESS_QUERY
module-attribute
¶
INSERT_PROGRESS_QUERY = "\nINSERT INTO {} VALUES (%s, NULL, %s)\nON CONFLICT (task_id)\nDO UPDATE\nSET progress = %s\n"
SELECT_PROGRESS_QUERY
module-attribute
¶
IS_RESULT_EXISTS_QUERY
module-attribute
¶
IS_RESULT_EXISTS_QUERY = "\nSELECT EXISTS(\n SELECT 1 FROM {} WHERE task_id = %s and result IS NOT NULL\n);\n"
SELECT_RESULT_QUERY
module-attribute
¶
DELETE_RESULT_QUERY
module-attribute
¶
CREATE_MESSAGE_TABLE_QUERY
module-attribute
¶
CREATE_MESSAGE_TABLE_QUERY = "\nCREATE TABLE IF NOT EXISTS {} (\n id SERIAL PRIMARY KEY,\n task_id VARCHAR NOT NULL,\n task_name VARCHAR NOT NULL,\n message TEXT NOT NULL,\n labels JSONB NOT NULL,\n status TEXT NOT NULL DEFAULT 'pending',\n created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()\n);\n"
INSERT_MESSAGE_QUERY
module-attribute
¶
INSERT_MESSAGE_QUERY = "\nINSERT INTO {} (task_id, task_name, message, labels)\nVALUES (%s, %s, %s, %s)\nRETURNING id\n"
CLAIM_MESSAGE_QUERY
module-attribute
¶
CLAIM_MESSAGE_QUERY = "UPDATE {} SET status = 'processing' WHERE id = %s AND status = 'pending' RETURNING *"
CREATE_SCHEDULES_TABLE_QUERY
module-attribute
¶
CREATE_SCHEDULES_TABLE_QUERY = "\nCREATE TABLE IF NOT EXISTS {} (\n id UUID PRIMARY KEY,\n task_name VARCHAR(100) NOT NULL,\n schedule JSONB NOT NULL,\n created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),\n updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()\n);\n"
INSERT_SCHEDULE_QUERY
module-attribute
¶
INSERT_SCHEDULE_QUERY = "\nINSERT INTO {} (id, task_name, schedule)\nVALUES (%s, %s, %s)\nON CONFLICT (id) DO UPDATE\nSET task_name = EXCLUDED.task_name,\n schedule = EXCLUDED.schedule,\n updated_at = NOW();\n"
SELECT_SCHEDULES_QUERY
module-attribute
¶
DELETE_SCHEDULE_QUERY
module-attribute
¶
result_backend
¶
PsycopgResultBackend
¶
PsycopgResultBackend(
dsn="postgres://postgres:postgres@localhost:5432/postgres",
keep_results=True,
table_name="taskiq_results",
field_for_task_id="VarChar",
serializer=None,
*,
pool=None,
**connect_kwargs,
)
Bases: BasePostgresResultBackend
Result backend for TaskIQ based on psycopg.
Construct a new PsycopgResultBackend.
Parameters:
-
dsn(Callable[[], str] | str | None, default:'postgres://postgres:postgres@localhost:5432/postgres') –PostgreSQL connection string or callable. Can be None if pool is provided.
-
keep_results(bool, default:True) –Whether to keep results after reading.
-
table_name(str, default:'taskiq_results') –Table to store results in.
-
field_for_task_id(Literal['VarChar', 'Text', 'Uuid'], default:'VarChar') –Column type for task_id.
-
serializer(TaskiqSerializer | None, default:None) –Serializer for task results.
-
pool(AsyncConnectionPool | None, default:None) –An existing connection pool to reuse.
-
**connect_kwargs(Any, default:{}) –Extra kwargs for connection pool creation.
Source code in src/taskiq_pg/psycopg/result_backend.py
startup
async
¶
Initialize the result backend.
Construct new connection pool (if not provided externally) and create new table for results if not exists.
Source code in src/taskiq_pg/psycopg/result_backend.py
shutdown
async
¶
Close the connection pool (only if owned by this backend).
set_result
async
¶
Set result to the PostgreSQL table.
:param task_id: ID of the task. :param result: result of the task.
Source code in src/taskiq_pg/psycopg/result_backend.py
is_result_ready
async
¶
Returns whether the result is ready.
:param task_id: ID of the task.
:returns: True if the result is ready else False.
Source code in src/taskiq_pg/psycopg/result_backend.py
get_result
async
¶
Retrieve result from the task.
:param task_id: task's id. :param with_logs: if True it will download task's logs. :raises ResultIsMissingError: if there is no result when trying to get it. :return: TaskiqResult.
Source code in src/taskiq_pg/psycopg/result_backend.py
set_progress
async
¶
Saves progress.
:param task_id: task's id. :param progress: progress of execution.
Source code in src/taskiq_pg/psycopg/result_backend.py
get_progress
async
¶
Gets progress.
:param task_id: task's id.
Source code in src/taskiq_pg/psycopg/result_backend.py
schedule_source
¶
PsycopgScheduleSource
¶
PsycopgScheduleSource(
broker,
dsn="postgresql://postgres:postgres@localhost:5432/postgres",
table_name="taskiq_schedules",
*,
pool=None,
**connect_kwargs,
)
Bases: BasePostgresScheduleSource
Schedule source that uses psycopg to store schedules in PostgreSQL.
Construct a new PsycopgScheduleSource.
Parameters:
-
broker(AsyncBroker) –The TaskIQ broker instance.
-
dsn(str | Callable[[], str], default:'postgresql://postgres:postgres@localhost:5432/postgres') –PostgreSQL connection string or callable. Ignored in pool mode.
-
table_name(str, default:'taskiq_schedules') –Table to store schedules in.
-
pool(AsyncConnectionPool | None, default:None) –An existing connection pool to reuse.
-
**connect_kwargs(Any, default:{}) –Extra kwargs for connection pool creation.
Source code in src/taskiq_pg/psycopg/schedule_source.py
startup
async
¶
Initialize the schedule source.
Construct new connection pool (if not provided externally), create new table for schedules if not exists and fill table with schedules from task labels.
Source code in src/taskiq_pg/psycopg/schedule_source.py
shutdown
async
¶
Close the connection pool (only if owned by this source).
get_schedules
async
¶
Fetch schedules from the database.
Source code in src/taskiq_pg/psycopg/schedule_source.py
add_schedule
async
¶
Add a new schedule.
Parameters:
-
schedule(ScheduledTask) –schedule to add.
Source code in src/taskiq_pg/psycopg/schedule_source.py
delete_schedule
async
¶
Method to delete schedule by id.
This is useful for schedule cancelation.
Parameters:
-
schedule_id(str) –id of schedule to delete.
Source code in src/taskiq_pg/psycopg/schedule_source.py
post_send
async
¶
extract_scheduled_tasks_from_broker
¶
Extract schedules from tasks that were registered in broker.
Returns:
-
list[ScheduledTask]–A list of ScheduledTask instances extracted from the task's labels.