Overview
PostgreSQL integration for Taskiq with support for asyncpg, psqlpy, psycopg and aiopg drivers.
Motivation¶
There are too many libraries for PostgreSQL and Taskiq integration. Although they have different view on interface and different functionality. To address this issue I created this library with a common interface for most popular PostgreSQL drivers that handle similarity across functionality of:
- result backends;
- brokers;
- schedule sources.
Installation¶
Depending on your preferred PostgreSQL driver, you can install this library with the corresponding extra:
Quick start¶
Basic task processing¶
-
Define your broker:
# broker_example.py import asyncio from taskiq_pg.asyncpg import AsyncpgBroker, AsyncpgResultBackend dsn = "postgres://taskiq_postgres:look_in_vault@localhost:5432/taskiq_postgres" broker = AsyncpgBroker(dsn).with_result_backend(AsyncpgResultBackend(dsn)) @broker.task("solve_all_problems") async def best_task_ever() -> None: """Solve all problems in the world.""" await asyncio.sleep(2) print("All problems are solved!") async def main(): await broker.startup() task = await best_task_ever.kiq() print(await task.wait_result()) await broker.shutdown() if __name__ == "__main__": asyncio.run(main())# broker_example.py import asyncio from taskiq_pg.psqlpy import PSQLPyBroker, PSQLPyResultBackend dsn = "postgres://taskiq_postgres:look_in_vault@localhost:5432/taskiq_postgres" broker = PSQLPyBroker(dsn).with_result_backend(PSQLPyResultBackend(dsn)) @broker.task("solve_all_problems") async def best_task_ever() -> None: """Solve all problems in the world.""" await asyncio.sleep(2) print("All problems are solved!") async def main(): await broker.startup() task = await best_task_ever.kiq() print(await task.wait_result()) await broker.shutdown() if __name__ == "__main__": asyncio.run(main())# broker_example.py import asyncio from taskiq_pg.psycopg import PsycopgBroker, PsycopgResultBackend dsn = "postgres://taskiq_postgres:look_in_vault@localhost:5432/taskiq_postgres" broker = PsycopgBroker(dsn).with_result_backend(PsycopgResultBackend(dsn)) @broker.task("solve_all_problems") async def best_task_ever() -> None: """Solve all problems in the world.""" await asyncio.sleep(2) print("All problems are solved!") async def main(): await broker.startup() task = await best_task_ever.kiq() print(await task.wait_result()) await broker.shutdown() if __name__ == "__main__": asyncio.run(main())# broker_example.py import asyncio from taskiq_pg.aiopg import AiopgBroker, AiopgResultBackend dsn = "postgres://taskiq_postgres:look_in_vault@localhost:5432/taskiq_postgres" broker = AiopgBroker(dsn).with_result_backend(AiopgResultBackend(dsn)) @broker.task("solve_all_problems") async def best_task_ever() -> None: """Solve all problems in the world.""" await asyncio.sleep(2) print("All problems are solved!") async def main(): await broker.startup() task = await best_task_ever.kiq() print(await task.wait_result()) await broker.shutdown() if __name__ == "__main__": asyncio.run(main()) -
Start a worker to process tasks (by default taskiq runs two instances of worker):
-
Run
broker_example.pyfile to send a task to the worker:
Your experience with other drivers will be pretty similar. Just change the import statement and that's it.
Task scheduling¶
-
Define your broker and schedule source:
# scheduler_example.py import asyncio from taskiq import TaskiqScheduler from taskiq_pg.asyncpg import AsyncpgBroker, AsyncpgScheduleSource dsn = "postgres://taskiq_postgres:look_in_vault@localhost:5432/taskiq_postgres" broker = AsyncpgBroker(dsn) scheduler = TaskiqScheduler( broker=broker, sources=[AsyncpgScheduleSource( dsn=dsn, broker=broker, )], ) @broker.task( task_name="solve_all_problems", schedule=[ { "cron": "*/1 * * * *", # type: str, either cron or time should be specified. "cron_offset": None, # type: str | None, can be omitted. "time": None, # type: datetime | None, either cron or time should be specified. "args": [], # type list[Any] | None, can be omitted. "kwargs": {}, # type: dict[str, Any] | None, can be omitted. "labels": {}, # type: dict[str, Any] | None, can be omitted. }, ], ) async def best_task_ever() -> None: """Solve all problems in the world.""" await asyncio.sleep(2) print("All problems are solved!")# scheduler_example.py import asyncio from taskiq import TaskiqScheduler from taskiq_pg.psqlpy import PSQLPyBroker, PSQLPyScheduleSource dsn = "postgres://taskiq_postgres:look_in_vault@localhost:5432/taskiq_postgres" broker = PSQLPyBroker(dsn) scheduler = TaskiqScheduler( broker=broker, sources=[PSQLPyScheduleSource( dsn=dsn, broker=broker, )], ) @broker.task( task_name="solve_all_problems", schedule=[ { "cron": "*/1 * * * *", # type: str, either cron or time should be specified. "cron_offset": None, # type: str | None, can be omitted. "time": None, # type: datetime | None, either cron or time should be specified. "args": [], # type list[Any] | None, can be omitted. "kwargs": {}, # type: dict[str, Any] | None, can be omitted. "labels": {}, # type: dict[str, Any] | None, can be omitted. }, ], ) async def best_task_ever() -> None: """Solve all problems in the world.""" await asyncio.sleep(2) print("All problems are solved!")# scheduler_example.py import asyncio from taskiq import TaskiqScheduler from taskiq_pg.psycopg import PsycopgBroker, PsycopgScheduleSource dsn = "postgres://taskiq_postgres:look_in_vault@localhost:5432/taskiq_postgres" broker = PsycopgBroker(dsn) scheduler = TaskiqScheduler( broker=broker, sources=[PsycopgScheduleSource( dsn=dsn, broker=broker, )], ) @broker.task( task_name="solve_all_problems", schedule=[ { "cron": "*/1 * * * *", # type: str, either cron or time should be specified. "cron_offset": None, # type: str | None, can be omitted. "time": None, # type: datetime | None, either cron or time should be specified. "args": [], # type list[Any] | None, can be omitted. "kwargs": {}, # type: dict[str, Any] | None, can be omitted. "labels": {}, # type: dict[str, Any] | None, can be omitted. }, ], ) async def best_task_ever() -> None: """Solve all problems in the world.""" await asyncio.sleep(2) print("All problems are solved!")# scheduler_example.py import asyncio from taskiq import TaskiqScheduler from taskiq_pg.aiopg import AiopgBroker, AiopgScheduleSource dsn = "postgres://taskiq_postgres:look_in_vault@localhost:5432/taskiq_postgres" broker = AiopgBroker(dsn) scheduler = TaskiqScheduler( broker=broker, sources=[AiopgScheduleSource( dsn=dsn, broker=broker, )], ) @broker.task( task_name="solve_all_problems", schedule=[ { "cron": "*/1 * * * *", # type: str, either cron or time should be specified. "cron_offset": None, # type: str | None, can be omitted. "time": None, # type: datetime | None, either cron or time should be specified. "args": [], # type list[Any] | None, can be omitted. "kwargs": {}, # type: dict[str, Any] | None, can be omitted. "labels": {}, # type: dict[str, Any] | None, can be omitted. }, ], ) async def best_task_ever() -> None: """Solve all problems in the world.""" await asyncio.sleep(2) print("All problems are solved!") -
Start worker processes:
-
Run scheduler process:
