taskiq_pg
aiopg
AiopgResultBackend
AiopgResultBackend(
dsn="postgres://postgres:postgres@localhost:5432/postgres",
keep_results=True,
table_name="taskiq_results",
field_for_task_id="VarChar",
serializer=None,
**connect_kwargs,
)
Bases: BasePostgresResultBackend
Result backend for TaskIQ based on aiopg.
Construct a new result backend.
Parameters:
- dsn (Callable[[], str] | str | None, default: 'postgres://postgres:postgres@localhost:5432/postgres') – connection string to PostgreSQL, or a callable returning one.
- keep_results (bool, default: True) – if True, results are not removed from the database after reading.
- table_name (str, default: 'taskiq_results') – name of the table used to store results.
- field_for_task_id (Literal['VarChar', 'Text', 'Uuid'], default: 'VarChar') – column type used to store task_id.
- serializer (TaskiqSerializer | None, default: None) – serializer used to serialize/deserialize task results.
- connect_kwargs (Any, default: {}) – additional arguments for creating the connection pool.
Source code in src/taskiq_pg/_internal/result_backend.py
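Since dsn accepts a callable, the connection string can be resolved at call time rather than at import time, which is handy when credentials rotate or come from the environment. A minimal sketch of such a factory (the variable names and env keys here are illustrative, not part of taskiq_pg):

```python
import os

def build_dsn() -> str:
    """Resolve the PostgreSQL DSN at call time, so credentials
    changed in the environment are picked up on reconnect."""
    user = os.environ.get("PG_USER", "postgres")
    password = os.environ.get("PG_PASSWORD", "postgres")
    host = os.environ.get("PG_HOST", "localhost")
    db = os.environ.get("PG_DB", "postgres")
    return f"postgres://{user}:{password}@{host}:5432/{db}"

# The factory itself (not its result) would be passed as dsn,
# e.g. AiopgResultBackend(dsn=build_dsn).
```
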
startup (async)
Initialize the result backend.
Constructs a new connection pool and creates the results table if it does not exist.
Source code in src/taskiq_pg/aiopg/result_backend.py
shutdown (async)
set_result (async)
Save the result into the PostgreSQL table.
Parameters:
- task_id (Any) – ID of the task.
- result (TaskiqResult[_ReturnType]) – result of the task.
Source code in src/taskiq_pg/aiopg/result_backend.py
is_result_ready (async)
Return whether the result is ready.
Parameters:
- task_id (Any) – ID of the task.
Returns:
- bool – True if the result is ready, else False.
Source code in src/taskiq_pg/aiopg/result_backend.py
get_result (async)
Retrieve the result of a task.
Parameters:
- task_id – the task's id.
- with_logs – if True, also fetch the task's logs.
Raises:
- ResultIsMissingError – if there is no result when trying to get it.
Returns:
- TaskiqResult.
Source code in src/taskiq_pg/aiopg/result_backend.py
AiopgScheduleSource
AiopgScheduleSource(
broker,
dsn="postgresql://postgres:postgres@localhost:5432/postgres",
table_name="taskiq_schedules",
**connect_kwargs,
)
Bases: BasePostgresScheduleSource
Schedule source that uses aiopg to store schedules in PostgreSQL.
Initialize the PostgreSQL scheduler source.
Sets up a scheduler source that stores scheduled tasks in a PostgreSQL database, allowing persistent storage and retrieval of schedules across application restarts.
Parameters:
- broker (AsyncBroker) – the TaskIQ broker instance used to find and manage tasks.
- dsn (str | Callable[[], str], default: 'postgresql://postgres:postgres@localhost:5432/postgres') – PostgreSQL connection string.
- table_name (str, default: 'taskiq_schedules') – name of the table used to store scheduled tasks; created automatically if it doesn't exist.
- **connect_kwargs (Any, default: {}) – additional keyword arguments passed to the database connection pool.
Source code in src/taskiq_pg/_internal/schedule_source.py
extract_scheduled_tasks_from_broker
Extract schedules from tasks registered in the broker.
Returns:
- list[ScheduledTask] – ScheduledTask instances extracted from the tasks' labels.
Source code in src/taskiq_pg/_internal/schedule_source.py
startup (async)
Initialize the schedule source.
Constructs a new connection pool, creates the schedules table if it does not exist, and fills it with schedules from task labels.
Source code in src/taskiq_pg/aiopg/schedule_source.py
shutdown (async)
get_schedules (async)
Fetch schedules from the database.
Source code in src/taskiq_pg/aiopg/schedule_source.py
add_schedule (async)
Add a new schedule.
Parameters:
- schedule (ScheduledTask) – schedule to add.
Source code in src/taskiq_pg/aiopg/schedule_source.py
delete_schedule (async)
Delete a schedule by id. This is useful for schedule cancellation.
Parameters:
- schedule_id (str) – id of the schedule to delete.
Source code in src/taskiq_pg/aiopg/schedule_source.py
post_send (async)
broker
queries
CREATE_TABLE_QUERY (module-attribute)
CREATE_INDEX_QUERY (module-attribute)
INSERT_RESULT_QUERY (module-attribute)
INSERT_RESULT_QUERY = "\nINSERT INTO {} VALUES (%s, %s)\nON CONFLICT (task_id)\nDO UPDATE\nSET result = %s\n"
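Note the placeholder style: the aiopg queries use psycopg2-style %s markers, and since %s markers cannot be reused, the result value appears twice in the upsert. The {} slot is filled with the table name via str.format before execution; the driver then binds the parameters, presumably as (task_id, result, result). A quick illustration of the string mechanics (the query text is copied from above; the rest is a sketch):

```python
# Query text as defined in taskiq_pg.aiopg.queries.
INSERT_RESULT_QUERY = (
    "\nINSERT INTO {} VALUES (%s, %s)\n"
    "ON CONFLICT (task_id)\nDO UPDATE\nSET result = %s\n"
)

# The backend substitutes the table name first...
query = INSERT_RESULT_QUERY.format("taskiq_results")
# ...leaving three %s markers for the driver to bind.
assert "INSERT INTO taskiq_results" in query
assert query.count("%s") == 3
```
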
IS_RESULT_EXISTS_QUERY (module-attribute)
SELECT_RESULT_QUERY (module-attribute)
DELETE_RESULT_QUERY (module-attribute)
CREATE_SCHEDULES_TABLE_QUERY (module-attribute)
CREATE_SCHEDULES_TABLE_QUERY = "\nCREATE TABLE IF NOT EXISTS {} (\n id UUID PRIMARY KEY,\n task_name VARCHAR(100) NOT NULL,\n schedule JSONB NOT NULL,\n created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),\n updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()\n);\n"
INSERT_SCHEDULE_QUERY (module-attribute)
INSERT_SCHEDULE_QUERY = "\nINSERT INTO {} (id, task_name, schedule)\nVALUES (%s, %s, %s)\nON CONFLICT (id) DO UPDATE\nSET task_name = EXCLUDED.task_name,\n schedule = EXCLUDED.schedule,\n updated_at = NOW();\n"
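The ON CONFLICT (id) DO UPDATE clause makes this an upsert: inserting a schedule whose id already exists overwrites its task_name and schedule and bumps updated_at, rather than failing. The same semantics modeled with a plain dict (a conceptual sketch, not taskiq_pg code):

```python
def upsert_schedule(table: dict, sched_id: str, task_name: str, schedule: dict) -> None:
    """In-memory model of INSERT ... ON CONFLICT (id) DO UPDATE:
    new ids are inserted, existing ids are overwritten in place."""
    table[sched_id] = {"task_name": task_name, "schedule": schedule}

table: dict = {}
upsert_schedule(table, "a1", "tasks.cleanup", {"cron": "* * * * *"})
upsert_schedule(table, "a1", "tasks.cleanup", {"cron": "0 * * * *"})  # same id: update
assert len(table) == 1
assert table["a1"]["schedule"]["cron"] == "0 * * * *"
```
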
SELECT_SCHEDULES_QUERY (module-attribute)
DELETE_SCHEDULE_QUERY (module-attribute)
asyncpg
AsyncpgBroker
AsyncpgBroker(
dsn="postgresql://postgres:postgres@localhost:5432/postgres",
result_backend=None,
task_id_generator=None,
channel_name="taskiq",
table_name="taskiq_messages",
max_retry_attempts=5,
read_kwargs=None,
write_kwargs=None,
)
Bases: BasePostgresBroker
Broker that uses asyncpg as the driver and PostgreSQL's LISTEN/NOTIFY mechanism.
Construct a new broker.
Parameters:
- dsn (str | Callable[[], str], default: 'postgresql://postgres:postgres@localhost:5432/postgres') – connection string to PostgreSQL, or a callable returning one.
- result_backend (AsyncResultBackend[_T] | None, default: None) – custom result backend.
- task_id_generator (Callable[[], str] | None, default: None) – custom task_id generator.
- channel_name (str, default: 'taskiq') – name of the channel to listen on.
- table_name (str, default: 'taskiq_messages') – name of the table used to store messages.
- max_retry_attempts (int, default: 5) – maximum number of message processing attempts.
- read_kwargs (dict[str, Any] | None, default: None) – additional arguments for read connection creation.
- write_kwargs (dict[str, Any] | None, default: None) – additional arguments for write pool creation.
Source code in src/taskiq_pg/_internal/broker.py
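task_id_generator must be a zero-argument callable that returns a unique string per task. A custom generator can embed a service prefix so task ids are easy to grep in logs; the prefix scheme below is purely illustrative, with uniqueness still coming from the UUID part:

```python
import uuid

def make_task_id() -> str:
    # "billing-" is an arbitrary example prefix for log correlation;
    # uuid4().hex supplies the 32-char unique portion.
    return f"billing-{uuid.uuid4().hex}"

# Passed as AsyncpgBroker(task_id_generator=make_task_id).
```
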
dsn (property)
startup (async)
Initialize the broker.
Source code in src/taskiq_pg/asyncpg/broker.py
shutdown (async)
Close all connections on shutdown.
Source code in src/taskiq_pg/asyncpg/broker.py
kick (async)
Send a message to the channel.
Inserts the message into the database and sends a NOTIFY.
Parameters:
- message – message to send.
Source code in src/taskiq_pg/asyncpg/broker.py
listen (async)
Listen to the channel, yielding messages as they are received.
Yields:
- AckableMessage instances.
Source code in src/taskiq_pg/asyncpg/broker.py
AsyncpgResultBackend
AsyncpgResultBackend(
dsn="postgres://postgres:postgres@localhost:5432/postgres",
keep_results=True,
table_name="taskiq_results",
field_for_task_id="VarChar",
serializer=None,
**connect_kwargs,
)
Bases: BasePostgresResultBackend
Result backend for TaskIQ based on asyncpg.
Construct a new result backend.
Parameters:
- dsn (Callable[[], str] | str | None, default: 'postgres://postgres:postgres@localhost:5432/postgres') – connection string to PostgreSQL, or a callable returning one.
- keep_results (bool, default: True) – if True, results are not removed from the database after reading.
- table_name (str, default: 'taskiq_results') – name of the table used to store results.
- field_for_task_id (Literal['VarChar', 'Text', 'Uuid'], default: 'VarChar') – column type used to store task_id.
- serializer (TaskiqSerializer | None, default: None) – serializer used to serialize/deserialize task results.
- connect_kwargs (Any, default: {}) – additional arguments for creating the connection pool.
Source code in src/taskiq_pg/_internal/result_backend.py
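The serializer parameter lets you override how results are encoded before being written to the table. taskiq serializers expose a dumpb/loadb pair (value to bytes and back); the class below is a minimal JSON-based sketch of that shape written from memory, so verify it against taskiq's actual TaskiqSerializer interface before relying on it:

```python
import json

class JsonSerializer:
    """Minimal sketch of a taskiq-style serializer using JSON.
    Mirrors the assumed dumpb/loadb interface of TaskiqSerializer."""

    def dumpb(self, value) -> bytes:
        # Encode any JSON-compatible value to bytes for storage.
        return json.dumps(value).encode()

    def loadb(self, data: bytes):
        # Decode stored bytes back into a Python value.
        return json.loads(data)

s = JsonSerializer()
assert s.loadb(s.dumpb({"ok": True})) == {"ok": True}
```

It would be passed as AsyncpgResultBackend(serializer=JsonSerializer()), with the caveat above about the exact interface.
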
startup (async)
Initialize the result backend.
Constructs a new connection pool and creates the results table if it does not exist.
Source code in src/taskiq_pg/asyncpg/result_backend.py
shutdown (async)
set_result (async)
Save the result into the PostgreSQL table.
Parameters:
- task_id – ID of the task.
- result – result of the task.
Source code in src/taskiq_pg/asyncpg/result_backend.py
is_result_ready (async)
Return whether the result is ready.
Parameters:
- task_id – ID of the task.
Returns:
- True if the result is ready, else False.
Source code in src/taskiq_pg/asyncpg/result_backend.py
get_result (async)
Retrieve the result of a task.
Parameters:
- task_id – the task's id.
- with_logs – if True, also fetch the task's logs (deprecated in taskiq).
Raises:
- ResultIsMissingError – if there is no result when trying to get it.
Returns:
- TaskiqResult.
Source code in src/taskiq_pg/asyncpg/result_backend.py
AsyncpgScheduleSource
AsyncpgScheduleSource(
broker,
dsn="postgresql://postgres:postgres@localhost:5432/postgres",
table_name="taskiq_schedules",
**connect_kwargs,
)
Bases: BasePostgresScheduleSource
Schedule source that uses asyncpg to store schedules in PostgreSQL.
Initialize the PostgreSQL scheduler source.
Sets up a scheduler source that stores scheduled tasks in a PostgreSQL database, allowing persistent storage and retrieval of schedules across application restarts.
Parameters:
- broker (AsyncBroker) – the TaskIQ broker instance used to find and manage tasks.
- dsn (str | Callable[[], str], default: 'postgresql://postgres:postgres@localhost:5432/postgres') – PostgreSQL connection string.
- table_name (str, default: 'taskiq_schedules') – name of the table used to store scheduled tasks; created automatically if it doesn't exist.
- **connect_kwargs (Any, default: {}) – additional keyword arguments passed to the database connection pool.
Source code in src/taskiq_pg/_internal/schedule_source.py
extract_scheduled_tasks_from_broker
Extract schedules from tasks registered in the broker.
Returns:
- list[ScheduledTask] – ScheduledTask instances extracted from the tasks' labels.
Source code in src/taskiq_pg/_internal/schedule_source.py
startup (async)
Initialize the schedule source.
Constructs a new connection pool, creates the schedules table if it does not exist, and fills it with schedules from task labels.
Source code in src/taskiq_pg/asyncpg/schedule_source.py
shutdown (async)
get_schedules (async)
Fetch schedules from the database.
Source code in src/taskiq_pg/asyncpg/schedule_source.py
add_schedule (async)
Add a new schedule.
Parameters:
- schedule (ScheduledTask) – schedule to add.
Source code in src/taskiq_pg/asyncpg/schedule_source.py
delete_schedule (async)
Delete a schedule by id. This is useful for schedule cancellation.
Parameters:
- schedule_id (str) – id of the schedule to delete.
Source code in src/taskiq_pg/asyncpg/schedule_source.py
post_send (async)
queries
CREATE_TABLE_QUERY (module-attribute)
CREATE_INDEX_QUERY (module-attribute)
INSERT_RESULT_QUERY (module-attribute)
INSERT_RESULT_QUERY = "\nINSERT INTO {} VALUES ($1, $2)\nON CONFLICT (task_id)\nDO UPDATE\nSET result = $2\n"
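Unlike the aiopg variant, asyncpg uses numbered $n placeholders, which can be reused: $2 appears in both the VALUES list and the DO UPDATE clause, so the upsert needs only two bind parameters rather than three. The string mechanics (query text copied from above):

```python
# Query text as defined in taskiq_pg.asyncpg.queries.
INSERT_RESULT_QUERY = (
    "\nINSERT INTO {} VALUES ($1, $2)\n"
    "ON CONFLICT (task_id)\nDO UPDATE\nSET result = $2\n"
)

query = INSERT_RESULT_QUERY.format("taskiq_results")
# $2 is reused, so two parameters suffice: (task_id, result).
assert query.count("$2") == 2
assert "$3" not in query
```
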
IS_RESULT_EXISTS_QUERY (module-attribute)
SELECT_RESULT_QUERY (module-attribute)
DELETE_RESULT_QUERY (module-attribute)
CREATE_MESSAGE_TABLE_QUERY (module-attribute)
CREATE_MESSAGE_TABLE_QUERY = "\nCREATE TABLE IF NOT EXISTS {} (\n id SERIAL PRIMARY KEY,\n task_id VARCHAR NOT NULL,\n task_name VARCHAR NOT NULL,\n message TEXT NOT NULL,\n labels JSONB NOT NULL,\n status TEXT NOT NULL DEFAULT 'pending',\n created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()\n);\n"
INSERT_MESSAGE_QUERY (module-attribute)
INSERT_MESSAGE_QUERY = "\nINSERT INTO {} (task_id, task_name, message, labels)\nVALUES ($1, $2, $3, $4)\nRETURNING id\n"
CLAIM_MESSAGE_QUERY (module-attribute)
CLAIM_MESSAGE_QUERY = "UPDATE {} SET status = 'processing' WHERE id = $1 AND status = 'pending' RETURNING id, message"
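CLAIM_MESSAGE_QUERY is a compare-and-set: it flips a message from 'pending' to 'processing' and returns a row only when the status check matched, so two workers racing for the same message cannot both claim it. The same idea modeled in plain Python (a conceptual sketch of the semantics, not taskiq_pg code):

```python
def claim(messages: dict, msg_id: int):
    """Claim a pending message; return it, or None if it was
    already claimed. Mirrors UPDATE ... SET status = 'processing'
    WHERE id = $1 AND status = 'pending' RETURNING ..."""
    row = messages.get(msg_id)
    if row is None or row["status"] != "pending":
        return None
    row["status"] = "processing"
    return row

msgs = {1: {"status": "pending", "message": "payload"}}
assert claim(msgs, 1) is not None   # first worker wins the claim
assert claim(msgs, 1) is None       # second claim finds it taken
```

In the real queries the atomicity comes from the single UPDATE statement; PostgreSQL evaluates the WHERE check and the status change as one operation.
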
CREATE_SCHEDULES_TABLE_QUERY (module-attribute)
CREATE_SCHEDULES_TABLE_QUERY = "\nCREATE TABLE IF NOT EXISTS {} (\n id UUID PRIMARY KEY,\n task_name VARCHAR(100) NOT NULL,\n schedule JSONB NOT NULL,\n created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),\n updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()\n);\n"
INSERT_SCHEDULE_QUERY (module-attribute)
INSERT_SCHEDULE_QUERY = "\nINSERT INTO {} (id, task_name, schedule)\nVALUES ($1, $2, $3)\nON CONFLICT (id) DO UPDATE\nSET task_name = EXCLUDED.task_name,\n schedule = EXCLUDED.schedule,\n updated_at = NOW();\n"
SELECT_SCHEDULES_QUERY (module-attribute)
DELETE_SCHEDULE_QUERY (module-attribute)
exceptions
DatabaseConnectionError
Bases: BaseTaskiqPgError
Raised when a connection to PostgreSQL cannot be established.
ResultIsMissingError
Bases: BaseTaskiqPgError
Raised when a result cannot be retrieved from PostgreSQL.
psqlpy
PSQLPyBroker
PSQLPyBroker(
dsn="postgresql://postgres:postgres@localhost:5432/postgres",
result_backend=None,
task_id_generator=None,
channel_name="taskiq",
table_name="taskiq_messages",
max_retry_attempts=5,
read_kwargs=None,
write_kwargs=None,
)
Bases: BasePostgresBroker
Broker that uses PostgreSQL and PSQLPy with LISTEN/NOTIFY.
Construct a new broker.
Parameters:
- dsn (str | Callable[[], str], default: 'postgresql://postgres:postgres@localhost:5432/postgres') – connection string to PostgreSQL, or a callable returning one.
- result_backend (AsyncResultBackend[_T] | None, default: None) – custom result backend.
- task_id_generator (Callable[[], str] | None, default: None) – custom task_id generator.
- channel_name (str, default: 'taskiq') – name of the channel to listen on.
- table_name (str, default: 'taskiq_messages') – name of the table used to store messages.
- max_retry_attempts (int, default: 5) – maximum number of message processing attempts.
- read_kwargs (dict[str, Any] | None, default: None) – additional arguments for read connection creation.
- write_kwargs (dict[str, Any] | None, default: None) – additional arguments for write pool creation.
Source code in src/taskiq_pg/_internal/broker.py
dsn (property)
startup (async)
Initialize the broker.
Source code in src/taskiq_pg/psqlpy/broker.py
shutdown (async)
Close all connections on shutdown.
Source code in src/taskiq_pg/psqlpy/broker.py
kick (async)
Send a message to the channel.
Inserts the message into the database and sends a NOTIFY.
Parameters:
- message – message to send.
Source code in src/taskiq_pg/psqlpy/broker.py
listen (async)
Listen to the channel, yielding messages as they are received.
Yields:
- AckableMessage instances.
Source code in src/taskiq_pg/psqlpy/broker.py
PSQLPyResultBackend
PSQLPyResultBackend(
dsn="postgres://postgres:postgres@localhost:5432/postgres",
keep_results=True,
table_name="taskiq_results",
field_for_task_id="VarChar",
serializer=None,
**connect_kwargs,
)
Bases: BasePostgresResultBackend
Result backend for TaskIQ based on PSQLPy.
Construct a new result backend.
Parameters:
- dsn (Callable[[], str] | str | None, default: 'postgres://postgres:postgres@localhost:5432/postgres') – connection string to PostgreSQL, or a callable returning one.
- keep_results (bool, default: True) – if True, results are not removed from the database after reading.
- table_name (str, default: 'taskiq_results') – name of the table used to store results.
- field_for_task_id (Literal['VarChar', 'Text', 'Uuid'], default: 'VarChar') – column type used to store task_id.
- serializer (TaskiqSerializer | None, default: None) – serializer used to serialize/deserialize task results.
- connect_kwargs (Any, default: {}) – additional arguments for creating the connection pool.
Source code in src/taskiq_pg/_internal/result_backend.py
startup (async)
Initialize the result backend.
Constructs a new connection pool and creates the results table if it does not exist.
Source code in src/taskiq_pg/psqlpy/result_backend.py
shutdown (async)
set_result (async)
Save the result into the PostgreSQL table.
Parameters:
- task_id – ID of the task.
- result – result of the task.
Source code in src/taskiq_pg/psqlpy/result_backend.py
is_result_ready (async)
Return whether the result is ready.
Parameters:
- task_id – ID of the task.
Returns:
- True if the result is ready, else False.
Source code in src/taskiq_pg/psqlpy/result_backend.py
get_result (async)
Retrieve the result of a task.
Parameters:
- task_id – the task's id.
- with_logs – if True, also fetch the task's logs.
Raises:
- ResultIsMissingError – if there is no result when trying to get it.
Returns:
- TaskiqResult.
Source code in src/taskiq_pg/psqlpy/result_backend.py
PSQLPyScheduleSource
PSQLPyScheduleSource(
broker,
dsn="postgresql://postgres:postgres@localhost:5432/postgres",
table_name="taskiq_schedules",
**connect_kwargs,
)
Bases: BasePostgresScheduleSource
Schedule source that uses psqlpy to store schedules in PostgreSQL.
Initialize the PostgreSQL scheduler source.
Sets up a scheduler source that stores scheduled tasks in a PostgreSQL database, allowing persistent storage and retrieval of schedules across application restarts.
Parameters:
- broker (AsyncBroker) – the TaskIQ broker instance used to find and manage tasks.
- dsn (str | Callable[[], str], default: 'postgresql://postgres:postgres@localhost:5432/postgres') – PostgreSQL connection string.
- table_name (str, default: 'taskiq_schedules') – name of the table used to store scheduled tasks; created automatically if it doesn't exist.
- **connect_kwargs (Any, default: {}) – additional keyword arguments passed to the database connection pool.
Source code in src/taskiq_pg/_internal/schedule_source.py
extract_scheduled_tasks_from_broker
Extract schedules from tasks registered in the broker.
Returns:
- list[ScheduledTask] – ScheduledTask instances extracted from the tasks' labels.
Source code in src/taskiq_pg/_internal/schedule_source.py
startup (async)
Initialize the schedule source.
Constructs a new connection pool, creates the schedules table if it does not exist, and fills it with schedules from task labels.
Source code in src/taskiq_pg/psqlpy/schedule_source.py
shutdown (async)
get_schedules (async)
Fetch schedules from the database.
Source code in src/taskiq_pg/psqlpy/schedule_source.py
add_schedule (async)
Add a new schedule.
Parameters:
- schedule (ScheduledTask) – schedule to add.
Source code in src/taskiq_pg/psqlpy/schedule_source.py
delete_schedule (async)
Delete a schedule by id. This is useful for schedule cancellation.
Parameters:
- schedule_id (str) – id of the schedule to delete.
Source code in src/taskiq_pg/psqlpy/schedule_source.py
post_send (async)
broker
MessageRow (dataclass)
queries
CREATE_TABLE_QUERY (module-attribute)
CREATE_INDEX_QUERY (module-attribute)
INSERT_RESULT_QUERY (module-attribute)
INSERT_RESULT_QUERY = "\nINSERT INTO {} VALUES ($1, $2)\nON CONFLICT (task_id)\nDO UPDATE\nSET result = $2\n"
IS_RESULT_EXISTS_QUERY (module-attribute)
SELECT_RESULT_QUERY (module-attribute)
DELETE_RESULT_QUERY (module-attribute)
CREATE_MESSAGE_TABLE_QUERY (module-attribute)
CREATE_MESSAGE_TABLE_QUERY = "\nCREATE TABLE IF NOT EXISTS {} (\n id SERIAL PRIMARY KEY,\n task_id VARCHAR NOT NULL,\n task_name VARCHAR NOT NULL,\n message TEXT NOT NULL,\n labels JSONB NOT NULL,\n status TEXT NOT NULL DEFAULT 'pending',\n created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()\n);\n"
INSERT_MESSAGE_QUERY (module-attribute)
INSERT_MESSAGE_QUERY = "\nINSERT INTO {} (task_id, task_name, message, labels)\nVALUES ($1, $2, $3, $4)\nRETURNING id\n"
CLAIM_MESSAGE_QUERY (module-attribute)
CLAIM_MESSAGE_QUERY = "UPDATE {} SET status = 'processing' WHERE id = $1 AND status = 'pending' RETURNING *"
CREATE_SCHEDULES_TABLE_QUERY (module-attribute)
CREATE_SCHEDULES_TABLE_QUERY = "\nCREATE TABLE IF NOT EXISTS {} (\n id UUID PRIMARY KEY,\n task_name VARCHAR(100) NOT NULL,\n schedule JSONB NOT NULL,\n created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),\n updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()\n);\n"
INSERT_SCHEDULE_QUERY (module-attribute)
INSERT_SCHEDULE_QUERY = "\nINSERT INTO {} (id, task_name, schedule)\nVALUES ($1, $2, $3)\nON CONFLICT (id) DO UPDATE\nSET task_name = EXCLUDED.task_name,\n schedule = EXCLUDED.schedule,\n updated_at = NOW();\n"
SELECT_SCHEDULES_QUERY (module-attribute)
DELETE_SCHEDULE_QUERY (module-attribute)
result_backend
¶
PSQLPyResultBackend
¶
PSQLPyResultBackend(
dsn="postgres://postgres:postgres@localhost:5432/postgres",
keep_results=True,
table_name="taskiq_results",
field_for_task_id="VarChar",
serializer=None,
**connect_kwargs,
)
Bases: BasePostgresResultBackend
Result backend for TaskIQ based on PSQLPy.
Construct new result backend.
Parameters:
-
dsn(Callable[[], str] | str | None, default:'postgres://postgres:postgres@localhost:5432/postgres') –connection string to PostgreSQL, or callable returning one.
-
keep_results(bool, default:True) –whether to keep results in the database after they are read.
-
table_name(str, default:'taskiq_results') –name of the table to store results.
-
field_for_task_id(Literal['VarChar', 'Text', 'Uuid'], default:'VarChar') –type of the field to store task_id.
-
serializer(TaskiqSerializer | None, default:None) –serializer class to serialize/deserialize result from task.
-
connect_kwargs(Any, default:{}) –additional arguments for creating connection pool.
Source code in src/taskiq_pg/_internal/result_backend.py
startup
async
¶
Initialize the result backend.
Constructs a new connection pool and creates the results table if it does not already exist.
Source code in src/taskiq_pg/psqlpy/result_backend.py
shutdown
async
¶
set_result
async
¶
Set the task result in the PostgreSQL table.
Parameters:
-
task_id(Any) –ID of the task.
-
result(TaskiqResult[_ReturnType]) –result of the task.
Source code in src/taskiq_pg/psqlpy/result_backend.py
is_result_ready
async
¶
Return whether the result is ready.
Parameters:
-
task_id(Any) –ID of the task.
Returns:
-
bool(bool) –True if the result is ready else False.
Source code in src/taskiq_pg/psqlpy/result_backend.py
get_result
async
¶
Retrieve the result of a task.
Parameters:
-
task_id(Any) –ID of the task.
-
with_logs(bool) –if True, the task's logs are fetched as well.
Raises:
-
ResultIsMissingError–if there is no result when trying to get it.
Returns:
-
TaskiqResult[_ReturnType]–the task's result.
Source code in src/taskiq_pg/psqlpy/result_backend.py
schedule_source
¶
PSQLPyScheduleSource
¶
PSQLPyScheduleSource(
broker,
dsn="postgresql://postgres:postgres@localhost:5432/postgres",
table_name="taskiq_schedules",
**connect_kwargs,
)
Bases: BasePostgresScheduleSource
Schedule source that uses psqlpy to store schedules in PostgreSQL.
Initialize the PostgreSQL scheduler source.
Sets up a scheduler source that stores scheduled tasks in a PostgreSQL database, so schedules persist and can be retrieved across application restarts.
Parameters:
-
dsn(str | Callable[[], str], default:'postgresql://postgres:postgres@localhost:5432/postgres') –PostgreSQL connection string, or a callable returning one.
-
table_name(str, default:'taskiq_schedules') –Name of the table to store scheduled tasks. Will be created automatically if it doesn't exist.
-
broker(AsyncBroker) –The TaskIQ broker instance used for finding and managing tasks.
-
**connect_kwargs(Any, default:{}) –Additional keyword arguments passed to the database connection pool.
Source code in src/taskiq_pg/_internal/schedule_source.py
startup
async
¶
Initialize the schedule source.
Constructs a new connection pool, creates the schedules table if it does not already exist, and fills it with schedules taken from task labels.
Source code in src/taskiq_pg/psqlpy/schedule_source.py
shutdown
async
¶
get_schedules
async
¶
Fetch schedules from the database.
Source code in src/taskiq_pg/psqlpy/schedule_source.py
add_schedule
async
¶
Add a new schedule.
Parameters:
-
schedule(ScheduledTask) –schedule to add.
Source code in src/taskiq_pg/psqlpy/schedule_source.py
delete_schedule
async
¶
Delete a schedule by its ID.
This is useful for schedule cancellation.
Parameters:
-
schedule_id(str) –id of schedule to delete.
Source code in src/taskiq_pg/psqlpy/schedule_source.py
post_send
async
¶
extract_scheduled_tasks_from_broker
¶
Extract schedules from tasks that were registered in the broker.
Returns:
-
list[ScheduledTask]–A list of ScheduledTask instances extracted from the task's labels.
Source code in src/taskiq_pg/_internal/schedule_source.py
psycopg
¶
PsycopgBroker
¶
PsycopgBroker(
dsn="postgresql://postgres:postgres@localhost:5432/postgres",
result_backend=None,
task_id_generator=None,
channel_name="taskiq",
table_name="taskiq_messages",
max_retry_attempts=5,
read_kwargs=None,
write_kwargs=None,
)
Bases: BasePostgresBroker
Broker that uses PostgreSQL and psycopg with LISTEN/NOTIFY.
Construct a new broker.
Parameters:
-
dsn(str | Callable[[], str], default:'postgresql://postgres:postgres@localhost:5432/postgres') –connection string to PostgreSQL, or callable returning one.
-
result_backend(AsyncResultBackend[_T] | None, default:None) –Custom result backend.
-
task_id_generator(Callable[[], str] | None, default:None) –Custom task_id generator.
-
channel_name(str, default:'taskiq') –Name of the channel to listen on.
-
table_name(str, default:'taskiq_messages') –Name of the table to store messages.
-
max_retry_attempts(int, default:5) –Maximum number of message processing attempts.
-
read_kwargs(dict[str, Any] | None, default:None) –Additional arguments for read connection creation.
-
write_kwargs(dict[str, Any] | None, default:None) –Additional arguments for write pool creation.
Source code in src/taskiq_pg/_internal/broker.py
dsn
property
¶
startup
async
¶
Initialize the broker.
Source code in src/taskiq_pg/psycopg/broker.py
shutdown
async
¶
Close all connections on shutdown.
Source code in src/taskiq_pg/psycopg/broker.py
kick
async
¶
Send message to the channel.
Inserts the message into the database and sends a NOTIFY.
:param message: Message to send.
Source code in src/taskiq_pg/psycopg/broker.py
listen
async
¶
Listen to the channel.
Yields messages as they are received.
:yields: AckableMessage instances.
Source code in src/taskiq_pg/psycopg/broker.py
PsycopgResultBackend
¶
PsycopgResultBackend(
dsn="postgres://postgres:postgres@localhost:5432/postgres",
keep_results=True,
table_name="taskiq_results",
field_for_task_id="VarChar",
serializer=None,
**connect_kwargs,
)
Bases: BasePostgresResultBackend
Result backend for TaskIQ based on psycopg.
Construct new result backend.
Parameters:
-
dsn(Callable[[], str] | str | None, default:'postgres://postgres:postgres@localhost:5432/postgres') –connection string to PostgreSQL, or callable returning one.
-
keep_results(bool, default:True) –whether to keep results in the database after they are read.
-
table_name(str, default:'taskiq_results') –name of the table to store results.
-
field_for_task_id(Literal['VarChar', 'Text', 'Uuid'], default:'VarChar') –type of the field to store task_id.
-
serializer(TaskiqSerializer | None, default:None) –serializer class to serialize/deserialize result from task.
-
connect_kwargs(Any, default:{}) –additional arguments for creating connection pool.
Source code in src/taskiq_pg/_internal/result_backend.py
startup
async
¶
Initialize the result backend.
Constructs a new connection pool and creates the results table if it does not already exist.
Source code in src/taskiq_pg/psycopg/result_backend.py
shutdown
async
¶
set_result
async
¶
Set the task result in the PostgreSQL table.
Parameters:
-
task_id(Any) –ID of the task.
-
result(TaskiqResult[_ReturnType]) –result of the task.
Source code in src/taskiq_pg/psycopg/result_backend.py
is_result_ready
async
¶
Return whether the result is ready.
Parameters:
-
task_id(Any) –ID of the task.
Returns:
-
bool(bool) –True if the result is ready else False.
Source code in src/taskiq_pg/psycopg/result_backend.py
get_result
async
¶
Retrieve the result of a task.
Parameters:
-
task_id(Any) –ID of the task.
-
with_logs(bool) –if True, the task's logs are fetched as well.
Raises:
-
ResultIsMissingError–if there is no result when trying to get it.
Returns:
-
TaskiqResult[_ReturnType]–the task's result.
Source code in src/taskiq_pg/psycopg/result_backend.py
PsycopgScheduleSource
¶
PsycopgScheduleSource(
broker,
dsn="postgresql://postgres:postgres@localhost:5432/postgres",
table_name="taskiq_schedules",
**connect_kwargs,
)
Bases: BasePostgresScheduleSource
Schedule source that uses psycopg to store schedules in PostgreSQL.
Initialize the PostgreSQL scheduler source.
Sets up a scheduler source that stores scheduled tasks in a PostgreSQL database, so schedules persist and can be retrieved across application restarts.
Parameters:
-
dsn(str | Callable[[], str], default:'postgresql://postgres:postgres@localhost:5432/postgres') –PostgreSQL connection string, or a callable returning one.
-
table_name(str, default:'taskiq_schedules') –Name of the table to store scheduled tasks. Will be created automatically if it doesn't exist.
-
broker(AsyncBroker) –The TaskIQ broker instance used for finding and managing tasks.
-
**connect_kwargs(Any, default:{}) –Additional keyword arguments passed to the database connection pool.
Source code in src/taskiq_pg/_internal/schedule_source.py
extract_scheduled_tasks_from_broker
¶
Extract schedules from tasks that were registered in the broker.
Returns:
-
list[ScheduledTask]–A list of ScheduledTask instances extracted from the task's labels.
Source code in src/taskiq_pg/_internal/schedule_source.py
startup
async
¶
Initialize the schedule source.
Constructs a new connection pool, creates the schedules table if it does not already exist, and fills it with schedules taken from task labels.
Source code in src/taskiq_pg/psycopg/schedule_source.py
shutdown
async
¶
get_schedules
async
¶
Fetch schedules from the database.
Source code in src/taskiq_pg/psycopg/schedule_source.py
add_schedule
async
¶
Add a new schedule.
Parameters:
-
schedule(ScheduledTask) –schedule to add.
Source code in src/taskiq_pg/psycopg/schedule_source.py
delete_schedule
async
¶
Delete a schedule by its ID.
This is useful for schedule cancellation.
Parameters:
-
schedule_id(str) –id of schedule to delete.
Source code in src/taskiq_pg/psycopg/schedule_source.py
post_send
async
¶
broker
¶
PsycopgBroker
¶
PsycopgBroker(
dsn="postgresql://postgres:postgres@localhost:5432/postgres",
result_backend=None,
task_id_generator=None,
channel_name="taskiq",
table_name="taskiq_messages",
max_retry_attempts=5,
read_kwargs=None,
write_kwargs=None,
)
Bases: BasePostgresBroker
Broker that uses PostgreSQL and psycopg with LISTEN/NOTIFY.
Construct a new broker.
Parameters:
-
dsn(str | Callable[[], str], default:'postgresql://postgres:postgres@localhost:5432/postgres') –connection string to PostgreSQL, or callable returning one.
-
result_backend(AsyncResultBackend[_T] | None, default:None) –Custom result backend.
-
task_id_generator(Callable[[], str] | None, default:None) –Custom task_id generator.
-
channel_name(str, default:'taskiq') –Name of the channel to listen on.
-
table_name(str, default:'taskiq_messages') –Name of the table to store messages.
-
max_retry_attempts(int, default:5) –Maximum number of message processing attempts.
-
read_kwargs(dict[str, Any] | None, default:None) –Additional arguments for read connection creation.
-
write_kwargs(dict[str, Any] | None, default:None) –Additional arguments for write pool creation.
Source code in src/taskiq_pg/_internal/broker.py
dsn
property
¶
startup
async
¶
Initialize the broker.
Source code in src/taskiq_pg/psycopg/broker.py
shutdown
async
¶
Close all connections on shutdown.
Source code in src/taskiq_pg/psycopg/broker.py
kick
async
¶
Send message to the channel.
Inserts the message into the database and sends a NOTIFY.
:param message: Message to send.
Source code in src/taskiq_pg/psycopg/broker.py
listen
async
¶
Listen to the channel.
Yields messages as they are received.
:yields: AckableMessage instances.
Source code in src/taskiq_pg/psycopg/broker.py
queries
¶
CREATE_TABLE_QUERY
module-attribute
¶
CREATE_INDEX_QUERY
module-attribute
¶
INSERT_RESULT_QUERY
module-attribute
¶
INSERT_RESULT_QUERY = "\nINSERT INTO {} VALUES (%s, %s)\nON CONFLICT (task_id)\nDO UPDATE\nSET result = EXCLUDED.result;\n"
IS_RESULT_EXISTS_QUERY
module-attribute
¶
SELECT_RESULT_QUERY
module-attribute
¶
DELETE_RESULT_QUERY
module-attribute
¶
CREATE_MESSAGE_TABLE_QUERY
module-attribute
¶
CREATE_MESSAGE_TABLE_QUERY = "\nCREATE TABLE IF NOT EXISTS {} (\n id SERIAL PRIMARY KEY,\n task_id VARCHAR NOT NULL,\n task_name VARCHAR NOT NULL,\n message TEXT NOT NULL,\n labels JSONB NOT NULL,\n status TEXT NOT NULL DEFAULT 'pending',\n created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()\n);\n"
INSERT_MESSAGE_QUERY
module-attribute
¶
INSERT_MESSAGE_QUERY = "\nINSERT INTO {} (task_id, task_name, message, labels)\nVALUES (%s, %s, %s, %s)\nRETURNING id\n"
CLAIM_MESSAGE_QUERY
module-attribute
¶
CLAIM_MESSAGE_QUERY = "UPDATE {} SET status = 'processing' WHERE id = %s AND status = 'pending' RETURNING *"
CREATE_SCHEDULES_TABLE_QUERY
module-attribute
¶
CREATE_SCHEDULES_TABLE_QUERY = "\nCREATE TABLE IF NOT EXISTS {} (\n id UUID PRIMARY KEY,\n task_name VARCHAR(100) NOT NULL,\n schedule JSONB NOT NULL,\n created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),\n updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()\n);\n"
INSERT_SCHEDULE_QUERY
module-attribute
¶
INSERT_SCHEDULE_QUERY = "\nINSERT INTO {} (id, task_name, schedule)\nVALUES (%s, %s, %s)\nON CONFLICT (id) DO UPDATE\nSET task_name = EXCLUDED.task_name,\n schedule = EXCLUDED.schedule,\n updated_at = NOW();\n"
SELECT_SCHEDULES_QUERY
module-attribute
¶
DELETE_SCHEDULE_QUERY
module-attribute
¶
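Note the placeholder style: these psycopg templates bind values with `%s` where the psqlpy templates use `$1`, `$2`; only the table name goes through `str.format`. A runnable sketch with the `CLAIM_MESSAGE_QUERY` literal shown above:

```python
# psycopg-style template: table name via str.format, values via %s.
CLAIM_MESSAGE_QUERY = (
    "UPDATE {} SET status = 'processing' "
    "WHERE id = %s AND status = 'pending' RETURNING *"
)

query = CLAIM_MESSAGE_QUERY.format("taskiq_messages")
print(query)
# Later executed with a bound parameter, e.g.
#   await cur.execute(query, (message_id,))
```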
result_backend
¶
PsycopgResultBackend
¶
PsycopgResultBackend(
dsn="postgres://postgres:postgres@localhost:5432/postgres",
keep_results=True,
table_name="taskiq_results",
field_for_task_id="VarChar",
serializer=None,
**connect_kwargs,
)
Bases: BasePostgresResultBackend
Result backend for TaskIQ based on psycopg.
Construct new result backend.
Parameters:
-
dsn(Callable[[], str] | str | None, default:'postgres://postgres:postgres@localhost:5432/postgres') –connection string to PostgreSQL, or callable returning one.
-
keep_results(bool, default:True) –whether to keep results in the database after they are read.
-
table_name(str, default:'taskiq_results') –name of the table to store results.
-
field_for_task_id(Literal['VarChar', 'Text', 'Uuid'], default:'VarChar') –type of the field to store task_id.
-
serializer(TaskiqSerializer | None, default:None) –serializer class to serialize/deserialize result from task.
-
connect_kwargs(Any, default:{}) –additional arguments for creating connection pool.
Source code in src/taskiq_pg/_internal/result_backend.py
startup
async
¶
Initialize the result backend.
Constructs a new connection pool and creates the results table if it does not already exist.
Source code in src/taskiq_pg/psycopg/result_backend.py
shutdown
async
¶
set_result
async
¶
Set the task result in the PostgreSQL table.
Parameters:
-
task_id(Any) –ID of the task.
-
result(TaskiqResult[_ReturnType]) –result of the task.
Source code in src/taskiq_pg/psycopg/result_backend.py
is_result_ready
async
¶
Return whether the result is ready.
Parameters:
-
task_id(Any) –ID of the task.
Returns:
-
bool(bool) –True if the result is ready else False.
Source code in src/taskiq_pg/psycopg/result_backend.py
get_result
async
¶
Retrieve the result of a task.
Parameters:
-
task_id(Any) –ID of the task.
-
with_logs(bool) –if True, the task's logs are fetched as well.
Raises:
-
ResultIsMissingError–if there is no result when trying to get it.
Returns:
-
TaskiqResult[_ReturnType]–the task's result.
Source code in src/taskiq_pg/psycopg/result_backend.py
schedule_source
¶
PsycopgScheduleSource
¶
PsycopgScheduleSource(
broker,
dsn="postgresql://postgres:postgres@localhost:5432/postgres",
table_name="taskiq_schedules",
**connect_kwargs,
)
Bases: BasePostgresScheduleSource
Schedule source that uses psycopg to store schedules in PostgreSQL.
Initialize the PostgreSQL scheduler source.
Sets up a scheduler source that stores scheduled tasks in a PostgreSQL database, so schedules persist and can be retrieved across application restarts.
Parameters:
-
dsn(str | Callable[[], str], default:'postgresql://postgres:postgres@localhost:5432/postgres') –PostgreSQL connection string, or a callable returning one.
-
table_name(str, default:'taskiq_schedules') –Name of the table to store scheduled tasks. Will be created automatically if it doesn't exist.
-
broker(AsyncBroker) –The TaskIQ broker instance used for finding and managing tasks.
-
**connect_kwargs(Any, default:{}) –Additional keyword arguments passed to the database connection pool.
Source code in src/taskiq_pg/_internal/schedule_source.py
startup
async
¶
Initialize the schedule source.
Constructs a new connection pool, creates the schedules table if it does not already exist, and fills it with schedules taken from task labels.
Source code in src/taskiq_pg/psycopg/schedule_source.py
shutdown
async
¶
get_schedules
async
¶
Fetch schedules from the database.
Source code in src/taskiq_pg/psycopg/schedule_source.py
add_schedule
async
¶
Add a new schedule.
Parameters:
-
schedule(ScheduledTask) –schedule to add.
Source code in src/taskiq_pg/psycopg/schedule_source.py
delete_schedule
async
¶
Delete a schedule by its ID.
This is useful for schedule cancellation.
Parameters:
-
schedule_id(str) –id of schedule to delete.
Source code in src/taskiq_pg/psycopg/schedule_source.py
post_send
async
¶
extract_scheduled_tasks_from_broker
¶
Extract schedules from tasks that were registered in the broker.
Returns:
-
list[ScheduledTask]–A list of ScheduledTask instances extracted from the task's labels.