NHacker Next
Multi-tenant queues in Postgres (docs.hatchet.run)
ndriscoll 13 days ago [-]
You've got something wonky going on with that query plan for the 2nd partition by attempt. In particular the seq scan on tasks to do the `(tasks.id = eligible_tasks.id)` hash join seems odd. The filter on queued status in `CTE eligible_tasks` (and not in the last join) also seems weird. Is that plan for the same query in the article?

If you add an index on `group_key, id WHERE status = 'queued'` and remove the 2nd `WHERE tasks."status" = 'QUEUED'` (I believe that's redundant?), you might get a better plan. You'd want something to make sure you're not preferring one group_key as well.

I think you should be able to solve your problem with workers having zero tasks by moving the LIMIT into the second CTE?

It's also useful in practice to have something like a worker_id and timestamp and not just set status to RUNNING in case a worker gets stuck/dies and you need to unclaim the work.
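
A minimal in-memory sketch of that last point, assuming hypothetical `worker_id` and `claimed_at` fields (neither is in the article's schema) and a timeout picked by the operator. In Postgres this would be an `UPDATE` on the tasks table rather than a Python loop.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class Task:
    id: int
    status: str = "QUEUED"
    worker_id: Optional[str] = None     # hypothetical column: who claimed the task
    claimed_at: Optional[float] = None  # hypothetical column: claim time, epoch seconds


def claim(task: Task, worker_id: str, now: float) -> None:
    """Mark a task as running and record who took it and when."""
    task.status = "RUNNING"
    task.worker_id = worker_id
    task.claimed_at = now


def unclaim_stale(tasks: List[Task], now: float, timeout: float) -> List[int]:
    """Return tasks held longer than `timeout` seconds to the queue."""
    released = []
    for task in tasks:
        held_too_long = (
            task.status == "RUNNING"
            and task.claimed_at is not None
            and now - task.claimed_at > timeout
        )
        if held_too_long:
            task.status = "QUEUED"
            task.worker_id = None
            task.claimed_at = None
            released.append(task.id)
    return released
```

A periodic sweeper could call `unclaim_stale` so that work held by a dead worker becomes eligible again.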

abelanger 13 days ago [-]
Ah, great catch - I just pushed an update to match the query from the article. I wasn't looking at much except for the WindowAgg line, thank you!

I tried with a similar indexing strategy - it did make a very noticeable difference, breaking at about 40000 enqueued tasks instead of 25000. I left indexing out of the article because it can open up a different can of worms with performance degradation over a longer time horizon.

I also tried with an `ORDER BY RANDOM()` across group keys first, which does help with fairness but breaks the "deterministic" element.

ndriscoll 13 days ago [-]
With your updated plan, you have

> Hash Cond: (tasks.id = t1.id)

> -> Seq Scan on tasks (cost=0.00..254.60 rows=48 width=14) (actual time=0.566..10.550 rows=10000 loops=1)

> Filter: (status = 'QUEUED'::"TaskStatus")

So it's still doing a seq scan on tasks when I'd expect it to join using the PK. It must be getting tripped up by the redundant filter on queued status. Try removing that.

I ninja edited my previous comment, but if you move the LIMIT to the 2nd CTE, that should fix your issue with workers not getting work. If you do that and add the other index I think in principle it should be able to do everything by maintaining a priority queue of the heads of each partition (which are each pre-sorted by the index now). idk if pg does that though. If it does, then that portion of the query should be streamed, so you don't need to try to limit it early to avoid a sort of 10k elements when you only need 10. Then if you remove the redundant QUEUED check, it should be doing everything else through indexes.

Basically, if doing this manually I'd expect the "good" solution to do this in a way where starting from an index, each row is streamed (i.e. no full sort) with logn complexity. So I look at it from a perspective of "how do I get the database to do what I'd do by hand?"
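
To make the "by hand" version concrete, here is a rough Python sketch of the priority-queue-of-partition-heads idea. It assumes each group's task ids are already available in sorted order (as an index on `group_key, id` would provide) and says nothing about whether the Postgres planner actually does this. The function name and the sample data are made up for illustration.

```python
import heapq
from typing import Dict, List, Tuple


def fair_dequeue(queues: Dict[str, List[int]], limit: int) -> List[Tuple[str, int]]:
    """Pick up to `limit` tasks in round-robin order across groups.

    `queues` maps group_key -> task ids sorted ascending. The heap holds
    only the current head of each group, keyed by (rank within group, id),
    so no full sort of all tasks is needed.
    """
    heap = []
    iters = {}
    for group, ids in queues.items():
        it = iter(ids)
        first = next(it, None)
        if first is not None:
            heap.append((1, first, group))
            iters[group] = it
    heapq.heapify(heap)

    picked = []
    while heap and len(picked) < limit:
        rank, task_id, group = heapq.heappop(heap)
        picked.append((group, task_id))
        # Advance this group's cursor and push its next head, if any.
        nxt = next(iters[group], None)
        if nxt is not None:
            heapq.heappush(heap, (rank + 1, nxt, group))
    return picked
```

Each pop and push costs O(log g) for g groups with queued work, and only `limit` rows are ever pulled, which is the streaming behaviour described above.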

abelanger 13 days ago [-]
I created a gist with these recommendations - certainly an improvement, but I don't think it gets around the `WindowAgg` running across all 10k rows: https://gist.github.com/abelanger5/5c1a75755072239716cb587a2.... Does this accurately implement your suggestions?

Happy to try out other suggestions, but I haven't found a way to get a window function or `JOIN LATERAL` to scale in near-constant time for this queueing strategy.

ndriscoll 13 days ago [-]
It looks like now it does still only pull 100 rows out of the sort (so moving the limit into the 2nd cte didn't hurt). It isn't doing all 10000 rows now though, which is interesting. By any chance, do you have 9200 different tenants? If so that makes sense. What I suggested would work when you have a small number of tenants with queued work (it scales n log n with tenants with queued work, but log n with amount of tasks that a single tenant has queued). So if you're currently testing with many tenants queueing at once, you could see how it behaves with like 20 tenants where one has 9k items and the others have ~50 each. Then it sort of depends on how your distribution looks in practice to know whether that's acceptable.

You could also probably do tricks where individual workers filter to specific tenant IDs in the first CTE (e.g. filter group_key mod num_workers = worker_id) to reduce that cardinality if you expect it to be large. Or you could e.g. select 100 random group_keys as a first step and use that to filter the window, but then that needs another partial index on just `group_key where status = queued`.

Edit: I see it's still doing a seq scan on tasks. There is a PK on id, right? It knows there's only 100 rows from the rest of the query so it seems odd to me that it would decide to scan the table. You could try putting a hash index on id if it's refusing to use the btree index I guess. Or it might change its mind if you add 1M SUCCEEDED tasks or something.

Another thing to consider is that out of the box, pg's default config for the planner is tuned to like 20 year old hardware. You need to tweak the io costs for SSDs and tell it you have more RAM if you haven't done that. See e.g. https://pgtune.leopard.in.ua/ for better starting values.

plandis 13 days ago [-]
At a previous job we did something similar but ended up having workers first poll another table to determine which tenant to query against. We called these items tokens and they represented a finite amount of dedicated thread time for processing a specific tenant’s queue.

What this looked like was a worker thread would first query the token table for which tenant to process eligible tasks from, and then update the token to take a timed lock and during that time would solely process eligible tasks from a specific tenant.

This has some nice properties:

1. You can scale different tenants using different amounts of tokens which means different but controlled amounts of thread time.

2. It allows for locality on your worker thread. Within a specific tenant the processing was usually similar so any shared resources could be cached and reused after polling for additional eligible tasks from the tenant’s queue.
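
A rough in-memory sketch of the token idea, assuming a hypothetical `locked_until` field and lease length; the comment above does not describe the real schema. In a database the check-and-update would have to be a single atomic statement.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class Token:
    tenant_id: str
    locked_until: float = 0.0  # hypothetical field: lease expiry, epoch seconds


def acquire_token(tokens: List[Token], now: float, lease_seconds: float) -> Optional[Token]:
    """Take a timed lock on the first free token, or return None if all are leased."""
    for token in tokens:
        if token.locked_until <= now:
            token.locked_until = now + lease_seconds
            return token
    return None
```

Giving a tenant more rows in the token list gives it more concurrent worker threads, which is how property 1 would fall out of this sketch.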

magicalhippo 13 days ago [-]
Reminded me of the token bucket[1] algorithm. Good point about locality.

[1]: https://en.wikipedia.org/wiki/Token_bucket

klysm 13 days ago [-]
I like this approach a lot, but I’m unsure about time based vs number of items based fairness. I guess it really depends on the application.
jperras 13 days ago [-]
Is the `FOR UPDATE SKIP LOCKED` in the CTE necessary? Granted my understanding of Postgres row-level locking and its interaction with CTEs may be a bit lacking, but according to the docs[1]:

  The sub-statements in WITH are executed concurrently with each other and with the main query. Therefore, when using data-modifying statements in WITH, the order in which the specified updates actually happen is unpredictable. All the statements are executed with the same snapshot (see Chapter 13), so they cannot “see” one another's effects on the target tables.

1. https://www.postgresql.org/docs/current/queries-with.html#QU...
mslot 13 days ago [-]
Read committed mode (PostgreSQL's default) can get pretty funky.

If two transactions concurrently perform a SELECT (may be in a CTE) followed by an UPDATE, then they might see and try to update the same rows. That's often undesirable, for instance in the example of a queue where messages are supposed to arrive ~once. Serializable mode would "solve" the problem by letting one transaction fail, and expects the application to retry or otherwise deal with the consequences.

FOR UPDATE is a precision tool for working around read committed limitations. It ensures rows are locked by whichever transaction reads them first, such that the second reader blocks and (here's the funky part) when the first transaction is done it actually reads the latest row version instead of the one that was in the snapshot. That's semantically a bit weird, but nonetheless very useful, and actually matches how updates work in PostgreSQL.

The biggest issue with SELECT..FOR UPDATE is that it blocks waiting for concurrent updaters to finish, even if the rows no longer match its filter after the update. The SKIP LOCKED avoids all that by simply skipping the locked rows in the SELECT. Semantically even weirder, but very useful for queues.

jperras 13 days ago [-]
Ah, I see - the default transactional isolation level is what I wasn't accounting for.

Thanks for the explanation! Very much appreciated.

nosefrog 13 days ago [-]
Is a fair queue worth it vs spinning up more capacity? I've worked on multiple projects where we've ended up ripping out a queue and just spinning up more machines to handle the load synchronously instead.
vidarh 13 days ago [-]
More capacity won't address operations that the originator isn't willing to (or can't) hang around to wait for and/or that are long-running enough that restarts due to failures might be needed. That's the most immediate reason: Tasks where no amount of capacity will remove the need to have some form of queueing mechanism.

For complex enough workflows, queues are also often helpful at addressing potentially failing stages in ways that are easier to debug. But in that case you want your queue to be closer to a state machine where actually waiting in the queue for much time is the exception. You can just build a state machine for that too, ensuring the inputs to the stage about to execute are recorded in a durable, restartable way. But sometimes you may need more copies of the same type of job, and soon you have something that looks and smells much like a queue anyway.

Then lastly, spikes. But queues only really help with spikes if you still spin up more machines aggressively enough that the wait time doesn't get long enough to be perceived as just as bad as, or worse than, an immediate error, so it does make sense to ask your question.

A queue also doesn't need to be complex. If it gets complex, that increases the reasons to ask your question for that specific system. The same applies if it can potentially grow large (sometimes the solution to that is simply to refuse to queue if the queue exceeds a certain size or the processing time goes above a certain threshold).

Queues are great when appropriate, but they do often get used as a "solution" to a scaling problem that hasn't been sufficiently analyzed, which sounds like it might have been the case in your examples.

strken 13 days ago [-]
Is it a choice? Most projects I've worked on had times when they became overwhelmed with requests; a queue handles this case, but more capacity just makes it rarer. Ideally you want enough capacity to handle X% of requests within Y milliseconds and a queue to deal with the leftovers, and I suppose if your X is low enough then a fair queue becomes a necessity.
datascienced 13 days ago [-]
I guess a queue handles spikes, and is OK for operations that can be async, such as generating a PDF and emailing it, as opposed to, say, loading a web page.

A queue may give you time to scale up too?

time0ut 13 days ago [-]
Very cool. Bookmarked in case I ever need to do this.

I have implemented a transactional outbox in postgres using a simpler version of this plus a trigger to notify listening workers. It worked well and vastly outpaced the inserting processes. It easily handled several million tasks per hour without issue.

It is also nice the article showed the correct CTE based form of the query. It is possible to naively write the query without it and sometimes get way more tasks than you asked for when there are concurrent workers. I discovered that pretty quickly but it had me pulling my hair out…

jvolkman 13 days ago [-]
I have my queue workers maintain a list of task owners they've already processed, and prefer to get a task from an owner they've least-recently seen (or haven't seen) using `ORDER BY array_position(:seen_owner_ids, owner_id) desc`. Each new task's owner_id is inserted into the front of the list (and removed elsewhere if it exists).

But I have a relatively small number of possible `owner_id` values at any given time.
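
A small Python sketch of the same ordering, for illustration only. It mirrors `ORDER BY array_position(...) desc` in Postgres, where unseen owners (NULL position) sort first under `DESC`; breaking ties by task id is an assumption, not something stated above.

```python
from typing import List, Optional, Tuple


def pick_task(tasks: List[Tuple[int, str]], seen_owner_ids: List[str]) -> Optional[Tuple[int, str]]:
    """Pick a (task_id, owner_id) pair, preferring unseen owners, then least-recently seen.

    `seen_owner_ids` has the most recently processed owner at the front.
    """
    if not tasks:
        return None

    def sort_key(task: Tuple[int, str]):
        task_id, owner = task
        if owner in seen_owner_ids:
            # A larger list position means seen longer ago, so negate it.
            return (1, -seen_owner_ids.index(owner), task_id)
        return (0, 0, task_id)  # never seen: highest preference

    return min(tasks, key=sort_key)


def mark_seen(seen_owner_ids: List[str], owner: str) -> List[str]:
    """Move `owner` to the front of the list, removing any older entry."""
    return [owner] + [o for o in seen_owner_ids if o != owner]
```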

phibz 13 days ago [-]
Why not something like Kafka or Redis?
teraflop 13 days ago [-]
The most straightforward reason is that if you need a transactional database anyway, then moving the queue into the DB allows you to atomically en/dequeue messages at the same time as making other updates. Which can massively simplify your architecture because it eliminates an enormous category of possible failure modes. (Or it can massively improve your system correctness, if you didn't realize those failure modes were possible.)
dewey 13 days ago [-]
Also, most of the time you already have a database, so why not use that instead of adding another service to the pile?
klysm 13 days ago [-]
Using Kafka as a work queue is widely documented as a mistake. Using a single database gives you a lot of operational simplicity, and you get to skip a lot of distributed-systems problems when all your state is in a single system.
victor106 12 days ago [-]
> Using Kafka as a work queue is widely documented as a mistake

Could not find any good sources on this. Can you please provide any references?

perfectspiral 11 days ago [-]
This is a proposal to extend Kafka to (better) handle queueing use cases: https://cwiki.apache.org/confluence/display/KAFKA/KIP-932%3A...

"For example, a queue is perfect for a situation in which messages are independent work items that can be processed concurrently by a pool of applications, and individually retried or acknowledged as processing completes. This is much easier to achieve using a queue rather than a partitioned topic with a consumer group."

klysm 12 days ago [-]
Don’t have any off the top of my head, sorry. I could be making it up, but I’m pretty sure I remember reading that from several sources. IIRC it has to do with the fact that there isn’t really single-message acknowledgement?
hipadev23 13 days ago [-]
Because that’s sane, easy, and boring.
rco8786 12 days ago [-]
Interesting read, but it seems like the grouping challenge could be fairly trivially solved in the application layer by registering job types (groups) with the poller, and having the round robin logic there.
remram 12 days ago [-]

You mean having the worker "know" that it's Alice's turn, and pull 10 Alice tasks from the database? Doesn't seem right.

rco8786 12 days ago [-]
Why not
remram 11 days ago [-]
Because then you're doing 10 of Alice's tasks in a row (or 100 or however many you pull at once). What they specifically want is to alternate.
rco8786 10 days ago [-]
Right I’m saying you implement the specific round robin logic that you want in the app layer.

Obviously if they don’t want to work 10 Alice jobs in a row you wouldn’t write it that way.

If you want true round robin it's as simple as an ordered list, where you look at the last completed job type, and find the next job type in the list, grab it and go. It's also more explicit this way vs that fairly complex query that is just doing implicit round robin.

remram 10 days ago [-]
So you get the list of potentially hundreds of people from the database, and then loop and do thousands of queries to the database? Query for Alice get 1, query for Bob get 1, query for Charlie there's none, query for Damien there's none, query for Emily get 1, ... eventually you have 100 and you send that many queries again to mark them as RUNNING?

tbh it really irks me that you can read this well-written, 3000-word article and go "I would just solve it trivially by not doing that on the server or something". There's a reason this has to run in the database.

rco8786 9 days ago [-]
No I would not do something so obviously silly. I’m not sure why you’re making up dumb scenarios as a way to discredit my idea.

I’m not trying to discredit the article or trivialize it. I’m trying to have a discussion about a thought I had while reading, based on my own lived experience working with DB backed queues.

remram 9 days ago [-]
You say this but you did not give many clues as to what your thought actually is. I keep prodding but I'm getting nothing back except for those "no obviously I wouldn't do that".

Will you actually describe your thought or does it not fit in the margin of your comment?

rco8786 8 days ago [-]
You have a list of queue groups. You write code to round robin through them. Each time, you see if there is a job to be processed for that group. If there is, process it. If there's not, move to the next group in the list.
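
One possible reading of that description, sketched in Python with an in-memory dict standing in for the database. The function name and data layout are hypothetical, and each lookup into `pending` corresponds to one query per group, which is the cost raised earlier in this sub-thread.

```python
from collections import deque
from typing import Deque, Dict, List, Optional, Tuple


def next_job(
    groups: List[str],
    pending: Dict[str, Deque[str]],
    start: int,
) -> Tuple[Optional[str], int]:
    """Return (job, next_cursor), scanning groups round-robin from `start`.

    Groups with no pending job are skipped. If no group has work, the job
    is None and the cursor is returned unchanged.
    """
    n = len(groups)
    for offset in range(n):
        idx = (start + offset) % n
        queue = pending.get(groups[idx])  # stands in for one query per group
        if queue:
            return queue.popleft(), (idx + 1) % n
    return None, start
```
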
ucarion 13 days ago [-]
This is pretty fancy stuff! Sorry if I'm just not reading carefully enough, but does this approach account for tenants whose messages take longer to process, as opposed to a tenant that sends a larger volume of messages?
abelanger 13 days ago [-]
No, this assumes you've already split your tasks into quanta of approximately the same duration - so all tasks are weighted equally in terms of execution time. If the tasks have different weights, you might be looking at something like deficit round-robin [1], which I've looked at implementations of in RabbitMQ and which we're thinking about how to implement in PG as well [2].

[1] https://www.cs.bu.edu/fac/matta/Teaching/cs655-papers/DRR.pd...

[2] https://nithril.github.io/amqp/2015/07/05/fair-consuming-wit...
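
For readers unfamiliar with deficit round-robin, here is a minimal in-memory Python sketch of the general technique. It is not how Hatchet or RabbitMQ implement it; the per-task `cost`, the `quantum`, and the function name are assumptions made for illustration.

```python
from collections import deque
from typing import Deque, Dict, List, Tuple


def drr_schedule(
    queues: Dict[str, Deque[Tuple[str, int]]],
    quantum: int,
    rounds: int,
) -> List[str]:
    """Run deficit round-robin over per-tenant queues of (task_name, cost).

    Each round, a tenant with queued work earns `quantum` credit and runs
    tasks from the head of its queue while the head's cost fits in its
    accumulated deficit. Returns task names in execution order.
    """
    deficits = {tenant: 0 for tenant in queues}
    order = []
    for _ in range(rounds):
        for tenant, queue in queues.items():
            if not queue:
                deficits[tenant] = 0  # idle tenants do not bank credit
                continue
            deficits[tenant] += quantum
            while queue and queue[0][1] <= deficits[tenant]:
                name, cost = queue.popleft()
                deficits[tenant] -= cost
                order.append(name)
            if not queue:
                deficits[tenant] = 0
    return order
```

With a quantum of 2, a tenant with cost-3 tasks has to wait a round to accumulate enough credit, while a tenant with cost-1 tasks runs two per round, so execution time rather than task count is what gets balanced.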

ucarion 13 days ago [-]
Makes sense!

My gut tells me that it would often make sense to jump straight to shuffle sharding, where you'd converge on fair solutions dynamically, in a lot of cases. I'm looking forward to that follow-on article!

aranw 13 days ago [-]
Thanks for this write-up. Really interesting. I've built queues using Postgres before, but never anything this complex, so I'm sure this article will come in handy in the future!
mind-blight 13 days ago [-]
Super cool! I was looking at the self-hosted quickstart, and it looks like Docker Compose installs both Hatchet and RabbitMQ. Does Hatchet use RabbitMQ alongside Postgres?
abelanger 12 days ago [-]
Yeah we're using RabbitMQ for pub/sub (but are considering getting rid of it) and Postgres for the actual task queue. There's some more about that here: https://news.ycombinator.com/item?id=39643940.
wbeckler 13 days ago [-]
I love your animations! How did you do those?
abelanger 13 days ago [-]
Thank you! I've been using https://jitter.video with the Lottie exporter. It also has a Figma plugin so you can reuse components.
victor106 12 days ago [-]
Lovely animations.

Can you expand on this? Why did you have to use both Jitter and Lottie?

andrewstuart 13 days ago [-]
I wonder if Postgres row-level security (RLS), i.e. row-based access control, is another solution to this.
remram 12 days ago [-]
This isn't related to access control. Workers process jobs for every user, that's actually the source of the problem.