PostgreSQL is your friend, ORM is not
Postgres is amazing. It’s powerful, efficient, rock-solid, and genuinely one of the finest pieces of software engineering ever created. And it’s also much more than a traditional relational database. It’s a complete, rich ecosystem packed into a single engine.
Beyond the classic CRUD operations everyone expects, Postgres lets you do far more. You can store and query JSONB fields as if they were structured tables, and you can even build efficient indexes on top of them. It blurs the line between relational and semi-structured data in a way that’s both elegant and practical.
CREATE TABLE IF NOT EXISTS videos (
    id UUID NOT NULL PRIMARY KEY,
    profile_id UUID NOT NULL REFERENCES profiles (id) ON DELETE CASCADE,
    title TEXT,
    state VARCHAR(32) NOT NULL,
    metadata JSONB NOT NULL,
    created_at TIMESTAMPTZ DEFAULT now() NOT NULL,
    updated_at TIMESTAMPTZ DEFAULT now() NOT NULL
);
CREATE INDEX idx_videos_tags_gin
ON videos USING GIN ((metadata->'tags') jsonb_path_ops);

SELECT * FROM videos WHERE metadata->'tags' @> '["personal"]';

You can easily implement a simple yet efficient fuzzy text search using the pg_trgm extension, so you probably don't need that Elasticsearch instance for your mid-sized project.
CREATE EXTENSION IF NOT EXISTS pg_trgm;
CREATE INDEX IF NOT EXISTS idx_videos_title_trgm
ON videos USING GIN (lower(title) gin_trgm_ops)
WHERE state = 'LIVE' AND title IS NOT NULL;

SELECT * FROM videos
WHERE
    title ILIKE '%' || $1 || '%'
    OR similarity(title, $1) > 0.12;

And you have pg_partman for automated partitioning!
CREATE EXTENSION pg_partman;

CREATE TABLE events (
    id UUID NOT NULL,
    created_at TIMESTAMPTZ DEFAULT now() NOT NULL,
    payload JSONB,
    PRIMARY KEY (id, created_at)
) PARTITION BY RANGE (created_at);

SELECT partman.create_parent(
    p_parent_table := 'public.events',
    p_control := 'created_at',
    p_type := 'native',
    p_interval := 'monthly'
);

UPDATE partman.part_config
SET premake = 3,
    retention = '12 months'
WHERE parent_table = 'public.events';

SELECT partman.run_maintenance();

There are many more features worth your attention: views and materialised views, window functions for efficient sliding aggregations, CTEs for clearer complex queries, etc.
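As a quick illustration, a CTE combined with a window function answers a classic "top N per group" question in one declarative query. This is a hypothetical sketch against the videos table above:

```sql
-- Latest three videos per profile, in a single query.
-- row_number() ranks rows inside each profile_id partition.
WITH ranked AS (
    SELECT
        id,
        profile_id,
        title,
        row_number() OVER (
            PARTITION BY profile_id
            ORDER BY created_at DESC
        ) AS rn
    FROM videos
)
SELECT id, profile_id, title
FROM ranked
WHERE rn <= 3;
```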
One of the most underrated features in Postgres is the FOR UPDATE SKIP LOCKED mechanism, combined with Postgres’ native LISTEN/NOTIFY pub/sub. Using these two together, you can build robust concurrent processing that scales horizontally, coordinates work safely, and never loses a record.
Here’s how it looks on the Postgres side — it simply emits a notification on each insert to a dedicated channel. Your workers can subscribe to it and react instantly.
CREATE TABLE IF NOT EXISTS tasks (
    id UUID NOT NULL PRIMARY KEY,
    task_name VARCHAR(128) NOT NULL,
    payload JSONB,
    state VARCHAR(32) NOT NULL,
    created_at TIMESTAMPTZ DEFAULT now() NOT NULL,
    updated_at TIMESTAMPTZ DEFAULT now() NOT NULL
);

CREATE OR REPLACE FUNCTION notify_task()
RETURNS TRIGGER AS $$
DECLARE
    payload JSON;
BEGIN
    IF NEW.state = 'PENDING' THEN
        payload = json_build_object(
            'id', NEW.id,
            'task_name', NEW.task_name,
            'created_at', NEW.created_at
        );
        PERFORM pg_notify('tasks', payload::text);
    END IF;
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER trg_task_notify
AFTER INSERT ON tasks
FOR EACH ROW EXECUTE FUNCTION notify_task();
It fires native pg_notify() on every INSERT of a PENDING task, keeping your repository implementation clean and single-purposed.
class TaskRepositoryAdapter:
    async def add(self, task: Task, connection: Connection | None = None) -> None:
        query = """
            INSERT INTO tasks (
                id,
                task_name,
                payload,
                state,
                created_at,
                updated_at
            ) VALUES ($1, $2, $3, $4, $5, $6)
        """
        await self.db.execute(
            query=query,
            args=[
                task.id,
                task.task_name,
                dumps(task.payload) if task.payload else None,
                task.state,
                task.created_at,
                task.updated_at,
            ],
            connection=connection,
        )

Here's a simplified consumer that listens for notifications and runs the corresponding task. In real life there's more logic, of course (retry strategy, logging, health checks, etc.), but the essence is this:
- a long-lived connection stays open to listen on the notification channel
- another short-lived connection is used to lock the task row, update its state, and then run the actual work right after
class TaskConsumerAdapter:
    async def listen(self) -> None:
        self._notification_conn = await self.db.pool.acquire()
        await self._notification_conn.add_listener("tasks", self._on_notification)

    async def _on_notification(self, connection: Connection, pid: int, channel: str, payload: str) -> None:
        task_data = loads(payload)
        task_id = UUID(task_data["id"])
        asyncio.create_task(self._take_task(task_id))

    async def _take_task(self, task_id: UUID) -> None:
        async with self.db.pool.acquire() as conn, conn.transaction():
            # locks the selected row; returns None if another worker got there first
            task = await self.task_repo.get_pending(task_id=task_id, connection=conn)
            if task is None:
                return
            # marks the new state while the row is still locked
            await self.task_repo.update(
                task.change_state(TaskState.PROCESSING),
                connection=conn,
            )
            consumer = self.consumers[task.task_name]
            await self._process_task(consumer, task)

    async def _process_task(self, consumer: TaskConsumer, task: Task) -> None:
        next_state = TaskState.FAILED  # let's start pessimistically
        try:
            await asyncio.wait_for(consumer.process(task=task), timeout=1800)
            next_state = TaskState.DONE
        except asyncio.TimeoutError:
            next_state = TaskState.PENDING  # mark for retry
        finally:
            await self.task_repo.update(task.change_state(next_state))

The entire distribution logic is controlled by just one line:
class TaskRepositoryAdapter:
    async def get_pending(self, task_id: UUID, connection: Connection | None = None) -> Task | None:
        query = """
            SELECT * FROM tasks
            WHERE id = $1
                AND state = $2
            FOR UPDATE SKIP LOCKED
        """
        rec = await self.db.fetchone(query=query, args=[task_id, TaskState.PENDING], connection=connection)
        return TaskFactories.task_from_record(rec)

And it literally means:
- FOR UPDATE locks the selected rows
- SKIP LOCKED makes others skip already-locked rows
This way it's safe to add more consumers later: they'll simply compete for the next available pending task, naturally distributing the work among themselves.
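Since NOTIFY delivery is best-effort (notifications emitted while a worker is offline are not replayed), a common companion is a periodic polling sweep built on the same locking primitive. A hypothetical batch-claim query might look like this:

```sql
-- Claim up to 10 pending tasks in one round trip; concurrent workers
-- skip rows already locked by someone else instead of blocking on them.
UPDATE tasks
SET state = 'PROCESSING',
    updated_at = now()
WHERE id IN (
    SELECT id
    FROM tasks
    WHERE state = 'PENDING'
    ORDER BY created_at
    LIMIT 10
    FOR UPDATE SKIP LOCKED
)
RETURNING id, task_name, payload;
```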
We use this approach in Hypha to register and process video-transcoding tasks. It’s simple and reliable.
A setup like this can comfortably handle on the order of 10k messages per second, which is more than enough in most cases.
If you need higher throughput or stronger delivery guarantees, that’s when you bring in tools like Kafka, RabbitMQ, or Redis Streams. But for the vast majority of applications, a properly designed App + Postgres combo is more than enough. Using this alone already helps reduce the number of moving parts and the overall infrastructure complexity.
Know Postgres. Use Postgres. Postgres is your friend.
ORM is not your friend
TL;DR: avoid ORMs; you don't need one.
Surprisingly, many people don't see the difference between a query builder and an ORM, so let's clarify.
An ORM is an Object–Relational Mapper. It maps database tables to classes and rows to objects. It lets you use a language-specific DSL instead of SQL, "hiding" SQL "complexity" behind its own abstractions.
When you fetch a row with asyncpg, you get back a Record — a Python object representing a Postgres row. Is that an ORM? No. An ORM goes much further.
- it performs bidirectional mapping between relational data and domain objects
- it tracks object identity
- it synchronises state changes
- it flushes updates automatically
- it maintains internal caches and a unit of work
A lot of complex work happens here. But do we really need all of it? Why? How is it actually helpful?
In practice, all these "features for free" are more like "shooting yourself in the foot for free". You get N+1 queries, accidental writes on flush, hidden internal caches of loaded entities, and a lot of invisible behaviour you never asked for.
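To make the N+1 problem concrete, here is a schematic, ORM-agnostic illustration (the QueryCounter class is invented for the demo): lazily loading a relation per parent object turns one logical fetch into N+1 round trips.

```python
class QueryCounter:
    """Toy stand-in for a DB session that records every query it runs."""

    def __init__(self) -> None:
        self.queries: list[str] = []

    def run(self, sql: str) -> list:
        self.queries.append(sql)
        return []


db = QueryCounter()

# one query for the parent rows...
videos = db.run("SELECT id FROM videos LIMIT 100")

# ...then a lazily-triggered query per parent when the relation is touched
for video_id in range(100):
    db.run(f"SELECT * FROM tags WHERE video_id = {video_id}")

print(len(db.queries))  # 101 round trips where a single JOIN (or ANY($1)) would do
```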
Speaking of hidden SQL complexity, let's compare. Here is a raw SQL query that fetches a list of videos matching a condition:
SELECT id, title
FROM videos
WHERE state = 'PUBLISHED'
AND metadata->'tags' ? 'python'
AND created_at >= NOW() - INTERVAL '30 days'
ORDER BY created_at DESC
LIMIT 20 OFFSET 0;

The same query using SQLAlchemy:
tag_filter = func.jsonb_exists(Video.metadata["tags"], "python")
query = (
    session.query(Video.id, Video.title)
    .filter(Video.state == "PUBLISHED")
    .filter(tag_filter)
    .filter(Video.created_at >= func.now() - text("INTERVAL '30 days'"))
    .order_by(Video.created_at.desc())
    .limit(20)
    .offset(0)
)
results = query.all()

Roughly the same query in Django ORM:
now_30_days_ago = timezone.now() - timedelta(days=30)
qs = (
    Video.objects
    .filter(state="PUBLISHED")
    .filter(metadata__tags__contains=["python"])
    .filter(created_at__gte=now_30_days_ago)
    .order_by("-created_at")
    [:20]
)

"Roughly" because Django ORM doesn't support the JSONB ? operator. And if you need real SQL intervals, Django pushes you towards raw expressions or Func() wrappers. Of course, you have to know the tool really well to pull off tricks like this:
videos = Video.objects.annotate(
    seven_days_ago=Func(
        Value('7 days'),
        function='NOW() - INTERVAL',
    )
).filter(created_at__gt=F('seven_days_ago'))

But that's more about expressiveness and implementation limitations. The deeper issue lies elsewhere, so let's take a step back.
What we actually need is simple — persist aggregates and restore them later, right?
That responsibility belongs inside the aggregate’s repository. And yes, that can (and should) be done directly in plain SQL.
SQL is already an excellent DSL for relational data
It might look like this:
@dataclass(kw_only=True)
class Video:
    id: UUID = field(default_factory=uuid7)
    owner_id: HyphaID
    title: str | None = None
    playlist: VideoPlaylist | None = None
    video_tracks: list[VideoPlaylist] = field(default_factory=list)
    audio_tracks: list[AudioPlaylist] = field(default_factory=list)
    state: VideoState
    created_at: DateTime = field(default_factory=now)
    updated_at: DateTime = field(default_factory=now)
class VideoRepositoryAdapter:
    async def add(self, video: Video, connection: Connection | None = None) -> None:
        # handy pattern to implement a 'unit of work'
        if connection:
            await self._add(video, connection=connection)
        else:
            async with self.db.pool.acquire() as conn, conn.transaction():
                await self._add(video, connection=conn)

    async def _add(self, video: Video, connection: Connection) -> None:
        query = """
            INSERT INTO videos (
                id,
                profile_id,
                title,
                state,
                created_at,
                updated_at
            )
            VALUES ($1, $2, $3, $4, $5, $6);
        """
        await self.db.execute(
            query=query,
            args=[
                video.id,
                video.profile_id,
                video.title,
                video.state,
                video.created_at,
                video.updated_at,
            ],
            connection=connection,
        )
        # some aggregate attributes are stored in a separate table,
        # but all inserts still happen within the same transaction
        traits_query = """
            INSERT INTO video_traits (
                id,
                video_id,
                trait_type,
                trait_data,
                created_at
            ) VALUES ($1, $2, $3, $4, $5)
        """
        if video.playlist:
            playlist_trait = video.playlist
            await self.db.execute(
                query=traits_query,
                args=[
                    playlist_trait.id,
                    video.id,
                    VideoTraitType.MASTER_PLAYLIST,
                    playlist_trait.serialize(),
                    video.created_at,
                ],
                connection=connection,
            )
        for video_track_trait in video.video_tracks:
            await self.db.execute(
                query=traits_query,
                args=[
                    video_track_trait.id,
                    video.id,
                    VideoTraitType.STAGING_VIDEO,
                    video_track_trait.serialize(),
                    video.created_at,
                ],
                connection=connection,
            )
        for audio_track_trait in video.audio_tracks:
            await self.db.execute(
                query=traits_query,
                args=[
                    audio_track_trait.id,
                    video.id,
                    VideoTraitType.STAGING_AUDIO,
                    audio_track_trait.serialize(),
                    video.created_at,
                ],
                connection=connection,
            )
    async def get_multi(
        self,
        *,
        owner_id: HyphaID,
        states: list[VideoState] | None = None,
        limit: int = 10,
        offset: int = 0,
        connection: Connection | None = None,
    ) -> list[Video]:
        select_query = """
            SELECT
                v.id,
                p.public_id AS owner_id,
                v.title,
                v.state,
                v.created_at,
                v.updated_at
            FROM videos v
        """
        query_args = [owner_id, limit, offset]
        conditions = ["p.public_id = $1"]
        if states:
            query_args.append(states)
            conditions.append(f"v.state = ANY(${len(query_args)})")
        # all parts together
        select_query = f"""
            {select_query}
            JOIN profiles AS p ON v.profile_id = p.id
            WHERE {" AND ".join(conditions)}
            ORDER BY v.updated_at DESC
            LIMIT $2 OFFSET $3;
        """
        recs = await self.db.fetchmany(
            query=select_query,
            args=query_args,
            connection=connection,
        )
        if not recs:
            return []
        # the most practical way to fetch all the traits we need:
        # only one extra query for N videos
        traits_query = """
            SELECT
                video_id,
                jsonb_agg(
                    jsonb_build_object(
                        'id', id,
                        'trait_type', trait_type,
                        'trait_data', trait_data
                    )
                ) AS traits
            FROM video_traits
            WHERE video_id = ANY($1)
            GROUP BY video_id;
        """
        trait_recs = await self.db.fetchmany(
            query=traits_query,
            args=[[rec["id"] for rec in recs]],
            connection=connection,
        )
        return list(VideoFactories.videos_from_records(recs, trait_recs))
class VideoFactories:
    @classmethod
    def videos_from_records(cls, recs: list[Record], trait_recs: list[Record]) -> Iterator[Video]:
        for rec in recs:
            video_attrs = {
                "id": UUID(str(rec["id"])),
                "owner_id": HyphaID(rec["owner_id"]),
                "title": rec["title"],
                "state": VideoState(rec["state"]),
                "created_at": pendulum.instance(rec["created_at"]),
                "updated_at": pendulum.instance(rec["updated_at"]),
            }
            # enrich the aggregate with data from traits
            ...
            yield Video(**video_attrs)

Quite a few very important details here.
- The aggregate can be persisted in more than one DB table
It’s fairly common to use several tables to persist an aggregate for the sake of flexibility and query efficiency. You may notice the video_traits table, which stores “schemaless” objects (parts of the aggregate), in addition to the main videos table, which stores the more surface-level values.
- SQL tables don’t necessarily map 1-to-1 to the domain model
In Hypha, we rarely have a one-to-one match between aggregates and tables. Using an ORM would mean translating domain models to ORM models and back again just to reassemble the final shape. If you follow the snippets carefully, you’ll notice this applies to both the underlying tables and the naming conventions. The database schema uses a generic profile_id column name (closer to the actual table name) while the Video aggregate uses owner_id, a more precise term for the specific bounded context.
- Raw SQL is not a sin
It might be shocking to some of you, but that's exactly what you need. Your repository is your aggregate-scoped query builder. There's nothing wrong with having SQL strings there: it doesn't break Separation of Concerns, and it follows Locality of Behaviour perfectly. It's also safe, because we rely on driver-native query parameterisation for every argument.
Yes, you must be careful here with all the commas and quotes. Yes, you must follow a certain self-discipline. But you write it once, test it well, and reuse it everywhere afterward. In return, you get:
- full control over the resulting SQL, allowing you to use the full power of SQL as a relational data DSL
- transparency and flexibility in how you build queries
- scoped, highly-efficient, purpose-built queries
There are two clear concerns experienced developers might raise here, though: how to deal with really big, complex analytical queries, and what to do about composability and reusability?
Surprisingly, the “workarounds” are just as simple and straightforward as the overall solution itself.
When a query starts growing far beyond the physical screen and makes the assembly logic harder to follow, you can template it. Literally move the query out of the method and into a Jinja template. The template can be as rich as you need, with parent CTEs as a base and conditional parameterisation. It’s a practical, pragmatic way to manage extra-long, heavy analytical queries.
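As a sketch (the file name and flag names are made up), only the query's structure goes into the template, while values stay as driver-level parameters:

```sql
-- videos_report.sql.j2 — a hypothetical Jinja-templated query.
-- Conditional blocks shape the SQL; $1/$2 placeholders still carry
-- the values, so parameterisation is untouched.
WITH base AS (
    SELECT v.id, v.title, v.state, v.created_at
    FROM videos v
    {% if join_owner %}
    JOIN profiles p ON v.profile_id = p.id
    {% endif %}
)
SELECT id, title
FROM base
WHERE state = $1
{% if only_recent %}
    AND created_at >= now() - INTERVAL '30 days'
{% endif %}
ORDER BY created_at DESC
LIMIT $2;
```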
Here comes the second one — how to reuse parts of the query? ORMs shine here because of method chaining, letting you extract common parts as a basis for other queries, right?
Even though this is possible (and often encouraged), blindly following the DRY principle can easily lead you to violate another, more important one:
Prefer Locality of Behaviour over DRY
By splitting the query chain into parts, you end up spreading the query composition across the repository (in the best case) or across several modules (in the worst), undermining the main advantage of the Locality of Behaviour principle — keeping behaviour scoped and contained.
If you start noticing repetitive parts of your SQL across several methods and feel the natural urge to refactor them into something reusable — stop right there and don’t. The optimisation might look more concise on the screen, but it won’t be in your head, ultimately making it harder to follow and maintain.
It’s perfectly fine to have repetitions across queries. It may take a bit longer to review and update the affected repositories and services when underlying schemas or base conditions change, but that process is infrequent and very straightforward. Your tests should reveal any gaps and guide you.
In return: each query remains isolated and atomic, with nothing external able to break it or implicitly change its behaviour. You can always review it visually without extra hopping and rebuild the mental context immediately. And that’s what truly matters.
No Alchemy needed when you have real Chemistry.