Перейти к содержанию

Overview

PyPI - Python Version PyPI Checks


PostgreSQL integration for Taskiq with support for asyncpg, psqlpy, psycopg and aiopg drivers.

Motivation

There are too many libraries for PostgreSQL and Taskiq integration. Although they have different view on interface and different functionality. To address this issue I created this library with a common interface for most popular PostgreSQL drivers that handle similarity across functionality of:

  • result backends;
  • brokers;
  • schedule sources.

Installation

Depending on your preferred PostgreSQL driver, you can install this library with the corresponding extra:

pip install taskiq-postgres[asyncpg]
pip install taskiq-postgres[psqlpy]
pip install taskiq-postgres[psycopg]
pip install taskiq-postgres[aiopg]

Quick start

Basic task processing

  1. Define your broker:

    # broker_example.py
    import asyncio
    from taskiq_pg.asyncpg import AsyncpgBroker, AsyncpgResultBackend
    
    
    dsn = "postgres://taskiq_postgres:look_in_vault@localhost:5432/taskiq_postgres"
    broker = AsyncpgBroker(dsn).with_result_backend(AsyncpgResultBackend(dsn))
    
    
    @broker.task("solve_all_problems")
    async def best_task_ever() -> None:
        """Solve all problems in the world."""
        await asyncio.sleep(2)
        print("All problems are solved!")
    
    
    async def main():
        await broker.startup()
        task = await best_task_ever.kiq()
        print(await task.wait_result())
        await broker.shutdown()
    
    
    if __name__ == "__main__":
        asyncio.run(main())
    
    # broker_example.py
    import asyncio
    from taskiq_pg.psqlpy import PSQLPyBroker, PSQLPyResultBackend
    
    
    dsn = "postgres://taskiq_postgres:look_in_vault@localhost:5432/taskiq_postgres"
    broker = PSQLPyBroker(dsn).with_result_backend(PSQLPyResultBackend(dsn))
    
    
    @broker.task("solve_all_problems")
    async def best_task_ever() -> None:
        """Solve all problems in the world."""
        await asyncio.sleep(2)
        print("All problems are solved!")
    
    
    async def main():
        await broker.startup()
        task = await best_task_ever.kiq()
        print(await task.wait_result())
        await broker.shutdown()
    
    
    if __name__ == "__main__":
        asyncio.run(main())
    
    # broker_example.py
    import asyncio
    from taskiq_pg.psycopg import PsycopgBroker, PsycopgResultBackend
    
    
    dsn = "postgres://taskiq_postgres:look_in_vault@localhost:5432/taskiq_postgres"
    broker = PsycopgBroker(dsn).with_result_backend(PsycopgResultBackend(dsn))
    
    
    @broker.task("solve_all_problems")
    async def best_task_ever() -> None:
        """Solve all problems in the world."""
        await asyncio.sleep(2)
        print("All problems are solved!")
    
    
    async def main():
        await broker.startup()
        task = await best_task_ever.kiq()
        print(await task.wait_result())
        await broker.shutdown()
    
    
    if __name__ == "__main__":
        asyncio.run(main())
    
    # broker_example.py
    import asyncio
    from taskiq_pg.aiopg import AiopgBroker, AiopgResultBackend
    
    
    dsn = "postgres://taskiq_postgres:look_in_vault@localhost:5432/taskiq_postgres"
    broker = AiopgBroker(dsn).with_result_backend(AiopgResultBackend(dsn))
    
    
    @broker.task("solve_all_problems")
    async def best_task_ever() -> None:
        """Solve all problems in the world."""
        await asyncio.sleep(2)
        print("All problems are solved!")
    
    
    async def main():
        await broker.startup()
        task = await best_task_ever.kiq()
        print(await task.wait_result())
        await broker.shutdown()
    
    
    if __name__ == "__main__":
        asyncio.run(main())
    
  2. Start a worker to process tasks (by default taskiq runs two instances of worker):

    taskiq worker broker_example:broker
    
  3. Run broker_example.py file to send a task to the worker:

    python broker_example.py
    

Your experience with other drivers will be pretty similar. Just change the import statement and that's it.

Task scheduling

  1. Define your broker and schedule source:

    # scheduler_example.py
    import asyncio
    from taskiq import TaskiqScheduler
    from taskiq_pg.asyncpg import AsyncpgBroker, AsyncpgScheduleSource
    
    
    dsn = "postgres://taskiq_postgres:look_in_vault@localhost:5432/taskiq_postgres"
    broker = AsyncpgBroker(dsn)
    scheduler = TaskiqScheduler(
        broker=broker,
        sources=[AsyncpgScheduleSource(
            dsn=dsn,
            broker=broker,
        )],
    )
    
    
    @broker.task(
        task_name="solve_all_problems",
        schedule=[
            {
                "cron": "*/1 * * * *",  # type: str, either cron or time should be specified.
                "cron_offset": None,  # type: str | None, can be omitted.
                "time": None,  # type: datetime | None, either cron or time should be specified.
                "args": [], # type list[Any] | None, can be omitted.
                "kwargs": {}, # type: dict[str, Any] | None, can be omitted.
                "labels": {}, # type: dict[str, Any] | None, can be omitted.
            },
        ],
    )
    async def best_task_ever() -> None:
        """Solve all problems in the world."""
        await asyncio.sleep(2)
        print("All problems are solved!")
    
    # scheduler_example.py
    import asyncio
    from taskiq import TaskiqScheduler
    from taskiq_pg.psqlpy import PSQLPyBroker, PSQLPyScheduleSource
    
    
    dsn = "postgres://taskiq_postgres:look_in_vault@localhost:5432/taskiq_postgres"
    broker = PSQLPyBroker(dsn)
    scheduler = TaskiqScheduler(
        broker=broker,
        sources=[PSQLPyScheduleSource(
            dsn=dsn,
            broker=broker,
        )],
    )
    
    
    @broker.task(
        task_name="solve_all_problems",
        schedule=[
            {
                "cron": "*/1 * * * *",  # type: str, either cron or time should be specified.
                "cron_offset": None,  # type: str | None, can be omitted.
                "time": None,  # type: datetime | None, either cron or time should be specified.
                "args": [], # type list[Any] | None, can be omitted.
                "kwargs": {}, # type: dict[str, Any] | None, can be omitted.
                "labels": {}, # type: dict[str, Any] | None, can be omitted.
            },
        ],
    )
    async def best_task_ever() -> None:
        """Solve all problems in the world."""
        await asyncio.sleep(2)
        print("All problems are solved!")
    
    # scheduler_example.py
    import asyncio
    from taskiq import TaskiqScheduler
    from taskiq_pg.psycopg import PsycopgBroker, PsycopgScheduleSource
    
    
    dsn = "postgres://taskiq_postgres:look_in_vault@localhost:5432/taskiq_postgres"
    broker = PsycopgBroker(dsn)
    scheduler = TaskiqScheduler(
        broker=broker,
        sources=[PsycopgScheduleSource(
            dsn=dsn,
            broker=broker,
        )],
    )
    
    
    @broker.task(
        task_name="solve_all_problems",
        schedule=[
            {
                "cron": "*/1 * * * *",  # type: str, either cron or time should be specified.
                "cron_offset": None,  # type: str | None, can be omitted.
                "time": None,  # type: datetime | None, either cron or time should be specified.
                "args": [], # type list[Any] | None, can be omitted.
                "kwargs": {}, # type: dict[str, Any] | None, can be omitted.
                "labels": {}, # type: dict[str, Any] | None, can be omitted.
            },
        ],
    )
    async def best_task_ever() -> None:
        """Solve all problems in the world."""
        await asyncio.sleep(2)
        print("All problems are solved!")
    
    # scheduler_example.py
    import asyncio
    from taskiq import TaskiqScheduler
    from taskiq_pg.aiopg import AiopgBroker, AiopgScheduleSource
    
    
    dsn = "postgres://taskiq_postgres:look_in_vault@localhost:5432/taskiq_postgres"
    broker = AiopgBroker(dsn)
    scheduler = TaskiqScheduler(
        broker=broker,
        sources=[AiopgScheduleSource(
            dsn=dsn,
            broker=broker,
        )],
    )
    
    
    @broker.task(
        task_name="solve_all_problems",
        schedule=[
            {
                "cron": "*/1 * * * *",  # type: str, either cron or time should be specified.
                "cron_offset": None,  # type: str | None, can be omitted.
                "time": None,  # type: datetime | None, either cron or time should be specified.
                "args": [], # type list[Any] | None, can be omitted.
                "kwargs": {}, # type: dict[str, Any] | None, can be omitted.
                "labels": {}, # type: dict[str, Any] | None, can be omitted.
            },
        ],
    )
    async def best_task_ever() -> None:
        """Solve all problems in the world."""
        await asyncio.sleep(2)
        print("All problems are solved!")
    
  2. Start worker processes:

    taskiq worker scheduler_example:broker
    
  3. Run scheduler process:

    taskiq scheduler scheduler_example:scheduler