PostgreSQL is your friend, ORM is not

Postgres is amazing. It’s powerful, efficient, rock-solid, and genuinely one of the finest pieces of software engineering ever created. And it’s also much more than a traditional relational database. It’s a complete, rich ecosystem packed into a single engine.

Beyond the classic CRUD operations everyone expects, Postgres lets you do far more. You can store and query JSONB fields as if they were structured tables, and you can even build efficient indexes on top of them. It blurs the line between relational and semi-structured data in a way that’s both elegant and practical.

CREATE TABLE IF NOT EXISTS videos (
    id           UUID NOT NULL PRIMARY KEY,
    profile_id   UUID NOT NULL REFERENCES profiles (id) ON DELETE CASCADE,
    title        TEXT,
    state        VARCHAR(32) NOT NULL,
    metadata     JSONB NOT NULL,
    created_at   TIMESTAMPTZ DEFAULT now() NOT NULL,
    updated_at   TIMESTAMPTZ DEFAULT now() NOT NULL
);
 
CREATE INDEX idx_videos_tags_gin
ON videos USING GIN ((metadata->'tags') jsonb_path_ops);
SELECT * FROM videos WHERE metadata->'tags' @> '["personal"]';

You can implement a simple yet efficient fuzzy-text search using the pg_trgm extension (and for classic full-text search, Postgres ships tsvector/tsquery out of the box). So you probably don’t need that Elasticsearch instance for your mid-sized project.

CREATE EXTENSION IF NOT EXISTS pg_trgm;
 
CREATE INDEX IF NOT EXISTS idx_videos_title_trgm
ON videos USING GIN (lower(title) gin_trgm_ops)
WHERE state = 'LIVE' AND title IS NOT NULL;
SELECT * FROM videos
WHERE state = 'LIVE'  -- must repeat the partial-index predicate to use the index
  AND (
      lower(title) LIKE '%' || lower($1) || '%'
      OR similarity(title, $1) > 0.12
  );

And you have pg_partman for automated partitioning!

CREATE SCHEMA IF NOT EXISTS partman;
CREATE EXTENSION pg_partman SCHEMA partman;
 
CREATE TABLE events (
    id          UUID NOT NULL,
    created_at  TIMESTAMPTZ DEFAULT now() NOT NULL,
    payload     JSONB,
    -- the partition key must be part of the primary key
    PRIMARY KEY (id, created_at)
) PARTITION BY RANGE (created_at);
 
SELECT partman.create_parent(
    p_parent_table := 'public.events',
    p_control      := 'created_at',
    p_type         := 'native',
    p_interval     := 'monthly'
);
 
UPDATE partman.part_config
   SET premake = 3,
       retention = '12 months'
 WHERE parent_table = 'public.events';
 
SELECT partman.run_maintenance();

There are many more features worth your attention: views and materialised views, window functions for efficient sliding aggregations, CTEs for clearer complex queries, etc.
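
Window functions and CTEs are plain standard SQL, so a tiny runnable illustration fits in a few lines. The sketch below uses Python’s built-in sqlite3 only because it is self-contained; the WITH and OVER syntax shown is the same you would write against Postgres, and the plays table and its data are invented for the example.

```python
import sqlite3

# Standard SQL: a CTE feeding a window function. Syntax is shared by
# Postgres and SQLite; the table and data are made up for illustration.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE plays (video_id TEXT, day TEXT, views INTEGER)")
conn.executemany(
    "INSERT INTO plays VALUES (?, ?, ?)",
    [("a", "2024-01-01", 10), ("a", "2024-01-02", 30), ("b", "2024-01-01", 5)],
)

rows = conn.execute("""
    WITH daily AS (                      -- CTE: name an intermediate result
        SELECT video_id, day, SUM(views) AS views
        FROM plays
        GROUP BY video_id, day
    )
    SELECT
        video_id,
        day,
        SUM(views) OVER (                -- window function: running total
            PARTITION BY video_id
            ORDER BY day
        ) AS running_views
    FROM daily
    ORDER BY video_id, day
""").fetchall()

for row in rows:
    print(row)
```

The running total needs no self-join and no application-side accumulation; the window clause does the sliding aggregation inside the engine.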

One of the most underrated features in Postgres is the FOR UPDATE SKIP LOCKED mechanism, combined with Postgres’ native LISTEN/NOTIFY pub/sub. Using these two together, you can build robust concurrent processing that scales horizontally, coordinates work safely, and never loses a record: the task table remains the source of truth, since notifications themselves are fire-and-forget.

Here’s how it looks on the Postgres side — it simply emits a notification on each insert to a dedicated channel. Your workers can subscribe to it and react instantly.

CREATE TABLE IF NOT EXISTS tasks (
    id            UUID NOT NULL PRIMARY KEY,
    task_name     VARCHAR(128) NOT NULL,
    payload       JSONB,
    state         VARCHAR(32) NOT NULL,
    created_at    TIMESTAMPTZ DEFAULT now() NOT NULL,
    updated_at    TIMESTAMPTZ DEFAULT now() NOT NULL
);
 
CREATE OR REPLACE FUNCTION notify_task()
RETURNS TRIGGER AS $$
DECLARE
    payload JSON;
BEGIN
    IF NEW.state = 'PENDING' THEN
        payload = json_build_object(
            'id', NEW.id,
            'task_name', NEW.task_name,
            'created_at', NEW.created_at
        );
        PERFORM pg_notify('tasks', payload::text);
    END IF;
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;
 
CREATE TRIGGER trg_task_notify
AFTER INSERT ON tasks
FOR EACH ROW EXECUTE FUNCTION notify_task();
 

It fires a native pg_notify() on every INSERT for you, keeping your repository implementation clean and single-purpose.

class TaskRepositoryAdapter:
 
    async def add(self, task: Task, connection: Connection | None = None) -> None:
        query = """
            INSERT INTO tasks (
                id,
                task_name,
                payload,
                state,
                created_at,
                updated_at
            ) VALUES ($1, $2, $3, $4, $5, $6)
        """
        await self.db.execute(
            query=query,
            args=[
                task.id,
                task.task_name,
                dumps(task.payload) if task.payload else None,
                task.state,
                task.created_at,
                task.updated_at,
            ],
            connection=connection,
        )

Here’s a simplified consumer that listens for notifications and runs the corresponding task. In real life there’s more logic, of course (retry strategy, logging, health checks, etc.), but the essence is this:

  • long-lived connection stays open to listen on the notification channel
  • another short-lived connection is used to lock the task row, update its state, and then run the actual work right after

class TaskConsumerAdapter:
 
    async def listen(self) -> None:
        self._notification_conn = await self.db.pool.acquire()
        await self._notification_conn.add_listener("tasks", self._on_notification)
 
    async def _on_notification(self, connection: Connection, pid: int, channel: str, payload: str) -> None:
        task_data = loads(payload)
        task_id = UUID(task_data["id"])
        # NB: real code should keep a reference to this task so it isn't garbage-collected
        asyncio.create_task(self._take_task(task_id))
 
    async def _take_task(self, task_id: UUID) -> None:
        async with self.db.pool.acquire() as conn, conn.transaction():
            # locks the selected row, or returns None if a competitor already holds it
            task = await self.task_repo.get_pending(task_id=task_id, connection=conn)
            if task is None:
                return
            # marks the status while the row is still locked
            await self.task_repo.update(
                task.change_state(TaskState.PROCESSING),
                connection=conn
            )

        consumer = self.consumers[task.task_name]
        await self._process_task(consumer, task)
 
    async def _process_task(self, consumer: TaskConsumer, task: Task) -> None:
        next_state = TaskState.FAILED  # let's start pessimistically
        try:
            await asyncio.wait_for(consumer.process(task=task), timeout=1800)
            next_state = TaskState.DONE
        except asyncio.TimeoutError:
            next_state = TaskState.PENDING  # mark to retry
        finally:
            await self.task_repo.update(task.change_state(next_state))

The entire distribution logic is controlled by just one line:

class TaskRepositoryAdapter:
 
    async def get_pending(self, task_id: UUID, connection: Connection | None = None) -> Task | None:
        query = """
            SELECT * FROM tasks
            WHERE id = $1
            AND state = $2
            FOR UPDATE SKIP LOCKED
        """
        rec = await self.db.fetchone(query=query, args=[task_id, TaskState.PENDING], connection=connection)

        return TaskFactories.task_from_record(rec) if rec else None

Which literally means:

  • FOR UPDATE locks the selected rows
  • SKIP LOCKED makes others skip already-locked rows

This way it’s safe to add more consumers later: they’ll simply compete for the next available pending task, naturally distributing work among themselves.
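
As a loose analogy (pure Python, no database, every name invented for the sketch), here is the claim-then-skip behaviour in miniature: each worker atomically claims a pending task, claimed tasks are invisible to everyone else, and adding workers just spreads the same work across more hands.

```python
import threading

# A pure-Python analogy of FOR UPDATE SKIP LOCKED: workers compete to
# claim pending tasks; a claimed task is skipped by everyone else, so
# every task is processed exactly once no matter how many workers run.
tasks = {f"task-{i}": "PENDING" for i in range(20)}
claim_lock = threading.Lock()
processed = []

def claim_next():
    # the "SELECT ... FOR UPDATE SKIP LOCKED" step, in miniature:
    # atomically pick one pending task and mark it taken
    with claim_lock:
        for task_id, state in tasks.items():
            if state == "PENDING":
                tasks[task_id] = "PROCESSING"
                return task_id
    return None

def worker():
    while (task_id := claim_next()) is not None:
        processed.append(task_id)   # stand-in for the real work
        tasks[task_id] = "DONE"

workers = [threading.Thread(target=worker) for _ in range(4)]
for w in workers:
    w.start()
for w in workers:
    w.join()

print(len(processed))  # 20: each task handled exactly once
```

In Postgres the row lock plays the role of `claim_lock`, and the transaction boundary decides when a claim becomes visible to the others.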

We use this approach in Hypha to register and process video-transcoding tasks. It’s simple and reliable.

On typical hardware, Postgres can handle around 10k queue messages per second this way, which is more than enough in most cases.

If you need higher throughput or stronger delivery guarantees, that’s when you bring in tools like Kafka, RabbitMQ, or Redis Streams. But for the vast majority of applications, a properly designed App + Postgres combo is more than enough. Using this alone already helps reduce the number of moving parts and the overall infrastructure complexity.

Know Postgres. Use Postgres. Postgres is your friend.

ORM is not your friend

TL;DR: avoid ORMs, you don’t need an ORM.

Surprisingly, many people don’t see the difference between a Query Builder and an ORM, so let’s clarify.

An ORM is an Object–Relational Mapper. It maps database tables to classes and rows to objects. It lets you use a language-specific DSL instead of SQL, “hiding” SQL “complexity” behind its own abstractions.

When you fetch a row with asyncpg, you get back a Record — a Python object representing a Postgres row. Is that an ORM? No. An ORM goes much further.

  • it performs bidirectional mapping between relational data and domain objects
  • it tracks object identity
  • it synchronises state changes
  • it flushes updates automatically
  • it maintains internal caches and a unit of work
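
To make the identity-map-plus-unit-of-work machinery concrete, here is a deliberately tiny sketch of it in plain Python. `Session`, its fields, and the behaviour are hypothetical simplifications for illustration, not any real ORM’s API:

```python
# A minimal sketch of what an ORM's identity map does behind your back:
# two fetches of the same primary key return the *same* object, served
# from a hidden cache, and the session remembers what it has loaded.
class Session:
    def __init__(self, rows: dict):
        self._rows = rows            # stands in for the database
        self._identity_map = {}      # hidden cache of loaded entities
        self._dirty = set()          # unit of work: objects awaiting flush

    def get(self, pk):
        if pk in self._identity_map:        # cache hit:
            return self._identity_map[pk]   # no query would be issued
        obj = dict(self._rows[pk])
        self._identity_map[pk] = obj
        return obj

    def mark_dirty(self, obj):
        self._dirty.add(id(obj))

session = Session({1: {"id": 1, "title": "intro"}})

a = session.get(1)
b = session.get(1)
print(a is b)  # True: the second get never touched the "database"
```

Every one of those conveniences is also a place where state can silently diverge from what is actually in the database.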

A lot of complex work happens here. But do we really need all of it? Why? How is it actually helpful?

In practice, all these “features for free” are more like “shooting yourself in the foot for free”. You get N+1 queries, accidental writes on flush, hidden internal caches of loaded entities, and a lot of invisible behaviour you never asked for.
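
The N+1 problem is easy to reproduce in miniature. The sketch below uses Python’s sqlite3 purely so it runs self-contained; the tables and data are invented. The lazy, ORM-style loop issues one query per parent row, while the explicit JOIN does the whole job in one:

```python
import sqlite3

# The N+1 problem in miniature: fetching N parents and lazily loading
# each child list costs N+1 round-trips, where a single JOIN would do.
conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE videos (id INTEGER PRIMARY KEY, title TEXT);
    CREATE TABLE traits (video_id INTEGER, data TEXT);
    INSERT INTO videos VALUES (1, 'a'), (2, 'b'), (3, 'c');
    INSERT INTO traits VALUES (1, 'x'), (2, 'y'), (3, 'z');
""")

queries = 0

def run(sql, args=()):
    global queries
    queries += 1                      # count every round-trip
    return conn.execute(sql, args).fetchall()

# ORM-style lazy loading: 1 query for the list, then 1 per row
videos = run("SELECT id, title FROM videos")
for vid, _title in videos:
    run("SELECT data FROM traits WHERE video_id = ?", (vid,))
n_plus_one = queries                  # 4 queries for 3 videos

# Explicit SQL: one query
queries = 0
run("""
    SELECT v.id, v.title, t.data
    FROM videos v JOIN traits t ON t.video_id = v.id
""")
explicit = queries                    # 1 query

print(n_plus_one, explicit)
```

With an ORM, the lazy variant is often what you get by default, and nothing in the calling code hints at the extra round-trips.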

Speaking of hidden SQL complexity, let’s compare. Here is the raw SQL to fetch a list of videos matching a condition:

SELECT id, title
FROM videos
WHERE state = 'PUBLISHED'
  AND metadata->'tags' ? 'python'
  AND created_at >= NOW() - INTERVAL '30 days'
ORDER BY created_at DESC
LIMIT 20 OFFSET 0;

The same query using SQLAlchemy:

tag_filter = func.jsonb_exists(Video.metadata["tags"], "python")
 
query = (
    session.query(Video.id, Video.title)
    .filter(Video.state == "PUBLISHED")
    .filter(tag_filter)
    .filter(Video.created_at >= func.now() - text("INTERVAL '30 days'"))
    .order_by(Video.created_at.desc())
    .limit(20)
    .offset(0)
)
 
results = query.all()

Roughly the same query in Django ORM:

now_30_days_ago = timezone.now() - timedelta(days=30)
 
qs = (
    Video.objects
    .filter(state="PUBLISHED")
    .filter(metadata__tags__contains=["python"])
    .filter(created_at__gte=now_30_days_ago)
    .order_by("-created_at")
    [:20]
)

“Roughly” because Django ORM doesn’t support the JSONB ? operator. And if you need real SQL intervals, Django pushes you towards raw expressions or Func() wrappers. Of course, you have to know the tool really well to pull off tricks like this:

videos = Video.objects.annotate(
    seven_days_ago=Func(
        Value('7 days'),
        function='NOW() - INTERVAL',
    )
).filter(created_at__gt=F('seven_days_ago'))

But that’s more about expressiveness and implementation limitations. The deeper issue lies elsewhere, so let’s take a step back.

What we actually need is simple — persist aggregates and restore them later, right?

That responsibility belongs inside the aggregate’s repository. And yes, that can (and should) be done directly in plain SQL.

SQL is already an excellent DSL for relational data

It might look like this:

@dataclass(kw_only=True)
class Video:
    id: UUID = field(default_factory=uuid7)
    owner_id: HyphaID
 
    title: str | None = None
 
    playlist: VideoPlaylist | None = None
    video_tracks: list[VideoPlaylist] = field(default_factory=list)
    audio_tracks: list[AudioPlaylist] = field(default_factory=list)
 
    state: VideoState
 
    created_at: DateTime = field(default_factory=now)
    updated_at: DateTime = field(default_factory=now)
 
 
class VideoRepositoryAdapter:
 
    async def add(self, video: Video, connection: Connection | None = None) -> None:
        # handy pattern to implement a 'unit of work'
        if connection:
            await self._add(video, connection=connection)
        else:
            async with self.db.pool.acquire() as conn, conn.transaction():
                await self._add(video, connection=conn)
 
    async def _add(self, video: Video, connection: Connection) -> None:
        query = """
            INSERT INTO videos (
                id,
                profile_id,
                title,
                state,
                created_at,
                updated_at
            )
            VALUES (
                $1,
                $2,
                $3,
                $4,
                $5,
                $6
            );
        """
        await self.db.execute(
            query=query,
            args=[
                video.id,
                video.owner_id,  # persisted in the profile_id column; naming differs per bounded context
                video.title,
                video.state,
                video.created_at,
                video.updated_at,
            ],
            connection=connection,
        )
 
        # some aggregate attributes are stored in a separate table,
        # but all inserts still happen within the same transaction
 
        traits_query = """
            INSERT INTO video_traits (
                id,
                video_id,
                trait_type,
                trait_data,
                created_at
            ) VALUES ($1, $2, $3, $4, $5)
        """
 
        if video.playlist:
            playlist_trait = video.playlist
            await self.db.execute(
                query=traits_query,
                args=[
                    playlist_trait.id,
                    video.id,
                    VideoTraitType.MASTER_PLAYLIST,
                    playlist_trait.serialize(),
                    video.created_at,
                ],
                connection=connection,
            )
 
        for video_track_trait in video.video_tracks:
            await self.db.execute(
                query=traits_query,
                args=[
                    video_track_trait.id,
                    video.id,
                    VideoTraitType.STAGING_VIDEO,
                    video_track_trait.serialize(),
                    video.created_at,
                ],
                connection=connection,
            )
 
        for audio_track_trait in video.audio_tracks:
            await self.db.execute(
                query=traits_query,
                args=[
                    audio_track_trait.id,
                    video.id,
                    VideoTraitType.STAGING_AUDIO,
                    audio_track_trait.serialize(),
                    video.created_at,
                ],
                connection=connection,
            )
 
    async def get_multi(
        self,
        *,
        owner_id: HyphaID,
        states: list[VideoState] | None = None,
        limit: int = 10,
        offset: int = 0,
        connection: Connection | None = None,
    ) -> list[Video]:
        select_query = """
            SELECT
                v.id,
                p.public_id as owner_id,
                v.title,
                v.state,
                v.created_at,
                v.updated_at
            FROM videos v
        """
        query_args = [owner_id, limit, offset]
        conditions = ["p.public_id = $1"]
 
        if states:
            query_args.append(states)
            conditions.append(f"v.state = ANY(${len(query_args)})")
 
        # all parts together
        select_query = f"""
            {select_query}
            JOIN profiles AS p ON v.profile_id = p.id
            WHERE {" AND ".join(conditions)}
            ORDER BY v.updated_at DESC
            LIMIT $2 OFFSET $3;
        """
 
        recs = await self.db.fetchmany(
            query=select_query,
            args=query_args,
            connection=connection,
        )
        if not recs:
            return []
 
        # the most practical way to fetch all traits we need,
        # only one extra query for N videos
        traits_query = """
            SELECT
                video_id,
                jsonb_agg(
                    jsonb_build_object(
                        'id', id,
                        'trait_type', trait_type,
                        'trait_data', trait_data
                    )
                ) AS traits
            FROM video_traits
            WHERE video_id = ANY($1)
            GROUP BY video_id;
        """
 
        trait_recs = await self.db.fetchmany(
            query=traits_query,
            args=[[rec["id"] for rec in recs]],
            connection=connection,
        )
 
        return list(VideoFactories.videos_from_records(recs, trait_recs))
 
 
class VideoFactories:
    @classmethod
    def videos_from_records(cls, recs: list[Record], trait_recs: list[Record]) -> Iterator[Video]:
        for rec in recs:
            video_attrs = {
                "id": UUID(str(rec["id"])),
                "owner_id": HyphaID(rec["owner_id"]),
                "title": rec["title"],
                "state": VideoState(rec["state"]),
                "created_at": pendulum.instance(rec["created_at"]),
                "updated_at": pendulum.instance(rec["updated_at"]),
            }
            # enrich the aggregate with data from traits
            ...
            yield Video(**video_attrs)

Quite a few very important details here.

  • The aggregate can be persisted in more than one DB table

It’s fairly common to use several tables to persist an aggregate for the sake of flexibility and query efficiency. You may notice the video_traits table, which stores “schemaless” objects (parts of the aggregate), in addition to the main videos table, which stores the more surface-level values.

  • SQL tables don’t necessarily map 1-to-1 to the domain model

In Hypha, we rarely have a one-to-one match between aggregates and tables. Using an ORM would mean translating domain models to ORM models and back again just to reassemble the final shape. If you follow the snippets carefully, you’ll notice this applies to both the underlying tables and the naming conventions. The database schema uses a generic profile_id column name (closer to the actual table name) while the Video aggregate uses owner_id, a more precise term for the specific bounded context.

  • Raw SQL is not a sin

It might be shocking to some of you, but that’s exactly what you need. Your repository is your aggregate-scoped query builder. There’s nothing wrong with having SQL strings there: it doesn’t break Separation of Concerns, and it follows Locality of Behaviour perfectly. It’s also safe, because arguments always go through driver-native parameter binding, never string interpolation.
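
A quick, self-contained demonstration of why parameter binding makes raw SQL safe. It is shown with Python’s sqlite3 and `?` placeholders, since that runs anywhere; asyncpg’s `$1` placeholders behave the same way, and the table is made up:

```python
import sqlite3

# Bound parameters travel out-of-band: the driver never splices the
# value into the query text, so hostile input is just data.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE videos (id INTEGER PRIMARY KEY, title TEXT)")
conn.execute("INSERT INTO videos VALUES (1, 'legit')")

hostile = "' OR '1'='1"

# Driver-native binding: the hostile string is matched literally
safe = conn.execute(
    "SELECT * FROM videos WHERE title = ?", (hostile,)
).fetchall()
print(safe)  # [] -- no row has that literal title

# String interpolation (never do this): the payload rewrites the query
unsafe = conn.execute(
    f"SELECT * FROM videos WHERE title = '{hostile}'"
).fetchall()
print(unsafe)  # [(1, 'legit')] -- the injection matched everything
```

The SQL strings in a repository are only as dangerous as the way arguments reach them; with placeholders throughout, they are inert text.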

Yes, you must be careful here with all the commas and quotes. Yes, you must follow a certain self-discipline. But you write it once, test it well, and reuse it everywhere afterward. In return, you get:

  • full control over the resulting SQL, allowing you to use the full power of SQL as a relational data DSL
  • transparency and flexibility in how you build queries
  • scoped, highly-efficient, purpose-built queries

There are two clear concerns experienced developers might raise here, though: how to deal with really big, complex analytical queries, and what to do about composability and reusability?

Surprisingly, the “workarounds” are just as simple and straightforward as the overall solution itself.

When a query starts growing far beyond the physical screen and makes the assembly logic harder to follow, you can template it. Literally move the query out of the method and into a Jinja template. The template can be as rich as you need, with parent CTEs as a base and conditional parameterisation. It’s a practical, pragmatic way to manage extra-long, heavy analytical queries.
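
Here is a sketch of that idea with the Jinja machinery stubbed out by plain `str.format`, so the snippet stays dependency-free: the query skeleton lives in one place (in real code, a `.sql.j2` file rendered by Jinja), optional blocks are toggled per call, and argument values still travel through driver binding, never through the template.

```python
from textwrap import dedent

# The query skeleton, with a parent CTE as the base and one optional
# block. Only the *shape* of the SQL is templated; $1/$2/$3 remain
# driver-bound parameters. Table and column names are illustrative.
BASE = dedent("""
    WITH recent AS (
        SELECT id, title, state, created_at
        FROM videos
        WHERE created_at >= now() - INTERVAL '30 days'
    )
    SELECT id, title
    FROM recent
    WHERE state = $1
    {extra_conditions}
    ORDER BY created_at DESC
    LIMIT $2;
""")

def render(with_title_filter: bool = False) -> str:
    extra = "AND title ILIKE '%' || $3 || '%'" if with_title_filter else ""
    return BASE.format(extra_conditions=extra)

print(render())
print(render(with_title_filter=True))
```

A real Jinja template adds loops, nested conditionals, and includes on top of this, but the division of labour is the same: the template shapes the text, the driver carries the values.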

Here comes the second one — how to reuse parts of the query? ORMs shine here because of method chaining, letting you extract common parts as a basis for other queries, right?

Even though this is possible (and often encouraged), blindly following the DRY principle can easily lead you to violate another, more important one:

Prefer Locality of Behaviour over DRY

By splitting the query chain into parts, you end up spreading the query composition across the repository (in the best case) or across several modules (in the worst), undermining the main advantage of the Locality of Behaviour principle — keeping behaviour scoped and contained.

If you start noticing repetitive parts of your SQL across several methods and feel the natural urge to refactor them into something reusable — stop right there and don’t. The optimisation might look more concise on the screen, but it won’t be in your head, ultimately making it harder to follow and maintain.

It’s perfectly fine to have repetitions across queries. It may take a bit longer to review and update the affected repositories and services when underlying schemas or base conditions change, but that process is infrequent and very straightforward. Your tests should reveal any gaps and guide you.

In return: each query remains isolated and atomic, with nothing external able to break it or implicitly change its behaviour. You can always review it visually without extra hopping and rebuild the mental context immediately. And that’s what truly matters.

No Alchemy needed when you have real Chemistry.