This is the 4th article in the series on learning more about Postgres and applying it to a Rust backend.
Here is an index of them all.
In this episode we are going to explore how to build and use a connection threadpool.
Using a DB connection threadpool is one of the first things likely to be planned in any application that will sustain a constant stream of users, and therefore many DB connections. The reason is that establishing a connection (opening a socket, doing the handshake) is expensive in terms of computational resources, so one solution is to create a pool of "ready-to-use" connections at application startup, keep them alive, and hand one out every time the application needs to query the database. Once the application has finished that specific piece of work with the DB (for example, creating a new user), the connection handle is returned to the pool for the next DB operation. This, in essence, is the job of a connection threadpool.
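To make the lifecycle concrete, here is a conceptual sketch. Pool and its methods are hypothetical names, just to illustrate the acquire/use/return cycle, not any real crate's API:

// Hypothetical pool API, for illustration only:
let pool = Pool::new("host=localhost user=postgres", 10); // open 10 connections up front
let conn = pool.get();              // borrow an already-open connection (no new socket)
conn.execute("INSERT INTO tbl (field) VALUES ($1)", &[&42]);
drop(conn);                         // the handle goes back to the pool, ready for reuse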
In my time using Rust I've used two solutions for a connection threadpool: a synchronous and an asynchronous pooler. They basically do the same thing, but the second one is preferable when the whole application is async; otherwise you would need to spawn a new thread every time a connection from the pool is used. The whole point of an application that is asynchronous from top to bottom is to avoid spawning too many OS threads and to rely instead on tasks (which have a different lifecycle that I'm not going to detail here) that live inside the same thread.
Note: a common misconception is that a synchronous application is always slow and that asynchronous programming is always TEH WAY TO GO. This is not always true. Spawning threads is not as expensive as it used to be (at least in the Linux kernel). Asynchronous programming is also a bit more tricky, and you should evaluate whether the cost is worth the benefits. I know this sounds like hand-waving, but I've too often seen sync vs. async discussed like it's a dick-measuring contest - the reality is that every situation is different, and what works for someone else might not be ideal for me.
For completeness, we will see both solutions.
§ Synchronous DB pool
To summarize the concept: the DB pool manager holds a set of ready DB connections, and every time the application asks for one, the pooler hands it out. Since getting and using a connection blocks the calling thread, the application spawns a thread per unit of work so the rest of the program won't be blocked.
Let's look, for example, at a widely used Rust crate, r2d2, coupled with a Postgres database. We'll copy and paste the first example from the README.md of that crate.
First, add the dependencies to our project's Cargo.toml:
[dependencies]
r2d2 = "0.8.9"
r2d2_postgres = "0.18.1"
Note: although I have only 2 direct dependencies, the transitive dependencies they pull in accounted for a total of 93 crates (external dependencies pulled in automatically) when I compiled the project. Keep this number in mind for later.
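If you want to reproduce the count yourself, one way (assuming a cargo recent enough to ship the built-in tree subcommand) is:
$ cargo tree --prefix none | sort -u | wc -l
The exact figure may differ slightly depending on crate versions and how you count.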
use std::thread;

use r2d2_postgres::{postgres::NoTls, PostgresConnectionManager};

fn main() {
    let manager = PostgresConnectionManager::new(
        "host=localhost user=postgres".parse().unwrap(),
        NoTls,
    );
    let pool = r2d2::Pool::new(manager).unwrap();

    let mut handles = vec![];
    for i in 0..10i32 {
        let pool = pool.clone();
        handles.push(thread::spawn(move || {
            eprintln!("Thread{}", i);
            let mut client = pool.get().unwrap();
            client
                .execute("INSERT INTO tbl (field) VALUES ($1)", &[&i])
                .unwrap();
        }));
    }
    for h in handles {
        h.join().unwrap();
    }
}
This code creates a pool, then spawns 10 threads, each executing an INSERT into a table. All the threads are join()ed so the main thread waits for them to finish. After running this we will have 10 records in that table (try it!).
Let's tweak that code for a moment and add a huge delay inside each spawned thread, so we have time to observe what is happening:
...
    // delay thread execution for 5 minutes
    let delay = core::time::Duration::from_secs(300);
    for i in 0..10i32 {
        let pool = pool.clone();
        handles.push(thread::spawn(move || {
            eprintln!("Thread{}: waiting {}s", i, delay.as_secs());
            // Zzz....
            thread::sleep(delay);
            let mut client = pool.get().unwrap();
            client
                .execute("INSERT INTO library (shelf) VALUES ($1)", &[&i.to_string()])
                .unwrap();
        }));
    }
...
Now I'll re-run this code and observe that the application "hangs":
$ cargo run
...
Thread0: waiting 300s
Thread1: waiting 300s
Thread3: waiting 300s
Thread2: waiting 300s
Thread5: waiting 300s
Thread6: waiting 300s
Thread7: waiting 300s
Thread4: waiting 300s
Thread9: waiting 300s
Thread8: waiting 300s
Checking the open sockets (and filtering out all the noise), we see 10 open connections to Postgres:
$ sudo netstat -tanp | grep 5432 | grep ESTABLISHED | grep debug
tcp 0 0 127.0.0.1:48506 127.0.0.1:5432 ESTABLISHED 765750/target/debug
tcp 0 0 127.0.0.1:48502 127.0.0.1:5432 ESTABLISHED 765750/target/debug
tcp 0 0 127.0.0.1:48504 127.0.0.1:5432 ESTABLISHED 765750/target/debug
tcp 0 0 127.0.0.1:48496 127.0.0.1:5432 ESTABLISHED 765750/target/debug
tcp 0 0 127.0.0.1:48500 127.0.0.1:5432 ESTABLISHED 765750/target/debug
tcp 0 0 127.0.0.1:48494 127.0.0.1:5432 ESTABLISHED 765750/target/debug
tcp 0 0 127.0.0.1:48490 127.0.0.1:5432 ESTABLISHED 765750/target/debug
tcp 0 0 127.0.0.1:48498 127.0.0.1:5432 ESTABLISHED 765750/target/debug
tcp 0 0 127.0.0.1:48488 127.0.0.1:5432 ESTABLISHED 765750/target/debug
tcp 0 0 127.0.0.1:48492 127.0.0.1:5432 ESTABLISHED 765750/target/debug
Let's see htop:
691906 user 20 0 8320 5268 3480 S 0.0 0.0 0:00.07 │ └─ bash
765750 user 20 0 865M 5004 4372 S 0.0 0.0 0:00.10 │ └─ r2d2-postgres
765949 user 20 0 865M 5004 4372 S 0.0 0.0 0:00.00 │ ├─ r2d2-postgres
765950 user 20 0 865M 5004 4372 S 0.0 0.0 0:00.00 │ ├─ r2d2-postgres
765951 user 20 0 865M 5004 4372 S 0.0 0.0 0:00.00 │ ├─ r2d2-postgres
765964 user 20 0 865M 5004 4372 S 0.0 0.0 0:00.00 │ ├─ r2d2-postgres
765965 user 20 0 865M 5004 4372 S 0.0 0.0 0:00.00 │ ├─ r2d2-postgres
765966 user 20 0 865M 5004 4372 S 0.0 0.0 0:00.00 │ ├─ r2d2-postgres
765967 user 20 0 865M 5004 4372 S 0.0 0.0 0:00.00 │ ├─ r2d2-postgres
765968 user 20 0 865M 5004 4372 S 0.0 0.0 0:00.00 │ ├─ r2d2-postgres
765969 user 20 0 865M 5004 4372 S 0.0 0.0 0:00.00 │ ├─ r2d2-postgres
765970 user 20 0 865M 5004 4372 S 0.0 0.0 0:00.00 │ ├─ r2d2-postgres
765973 user 20 0 865M 5004 4372 S 0.0 0.0 0:00.00 │ ├─ r2d2-postgres
765974 user 20 0 865M 5004 4372 S 0.0 0.0 0:00.00 │ ├─ r2d2-postgres
765975 user 20 0 865M 5004 4372 S 0.0 0.0 0:00.00 │ └─ r2d2-postgres
Let's pick the parent PID and check the thread count of our Rust process:
$ grep Threads /proc/765750/status
35:Threads: 14
Ok, cool! We now see what happens when we pop threads like candy: 14 threads in total, i.e. the main thread, our 10 workers, plus a few background threads that r2d2 itself spawns for connection management. Let's now look at the corresponding async code.
§ Asynchronous DB pool
The main difference between sync and async is that while the former spawns OS threads (involving the kernel scheduler and a lot of other machinery), the latter spawns Tasks - lightweight units of work living inside the same thread - that are polled until completion. The underlying implementation is more complex(~ish) and platform dependent, so for more information go and find some online documentation about async in Rust (I suggest this video).
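To get a feel for how cheap tasks are, here is a standalone sketch (assuming tokio 1.x; unrelated to the pool code below) that runs ten thousand tasks on a single OS thread:

// all of these tasks are scheduled on one OS thread ("current_thread" flavor)
#[tokio::main(flavor = "current_thread")]
async fn main() {
    let mut handles = Vec::new();
    for i in 0..10_000i64 {
        // each spawn creates a cheap task, not an OS thread
        handles.push(tokio::spawn(async move { i * 2 }));
    }
    let mut sum = 0i64;
    for h in handles {
        sum += h.await.unwrap();
    }
    eprintln!("sum = {}", sum); // 10,000 tasks polled to completion, one thread
}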
What matters for our purposes is that someone else is doing all the heavy lifting and we just need to slightly tweak our code to make it async.
Well, "slightly".
At the time (we are talking about 2019) I had evaluated three options for a threadpool: bb8, mobc and deadpool. The first one was not yet polished and the third looked like it did more than I needed. Eventually I settled on the second one: it was simple and I could understand its code.
This turned out to be very useful, because later on the maintainer of mobc reorganized the crates around mobc in a way that forced me to pull some of its code into my application, so today I need you to see a bit of boilerplate. But no worries, it's just for my reference; you can skip this block of code and jump to the usage in the application.
The code that follows just implements the Manager trait. The trait requires two methods to be implemented: connect(), which establishes the connection to the DB, and check(), which simply executes a "SELECT 1" to tell whether the connection is still alive.
use async_trait::async_trait;
use mobc::{Manager, Pool};
use tokio_postgres::tls::{MakeTlsConnect, TlsConnect};
use tokio_postgres::{Client, Config, Error, NoTls, Socket};

pub struct PgConnectionManager<Tls> {
    config: Config,
    tls: Tls,
}

impl<Tls> PgConnectionManager<Tls> {
    pub fn new(config: Config, tls: Tls) -> Self {
        Self { config, tls }
    }
}

#[async_trait]
impl<Tls> Manager for PgConnectionManager<Tls>
where
    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static,
    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
{
    type Connection = Client;
    type Error = Error;

    async fn connect(&self) -> Result<Self::Connection, Self::Error> {
        let tls = self.tls.clone();
        let (client, conn) = self.config.connect(tls).await?;
        // the connection object does the actual I/O with the DB:
        // spawn it as a background task so the client can make progress
        mobc::spawn(conn);
        Ok(client)
    }

    async fn check(&self, conn: Self::Connection) -> Result<Self::Connection, Self::Error> {
        conn.simple_query("SELECT 1").await?;
        Ok(conn)
    }
}
And now let's see how to use this other threadpool. Basically it's the same code we have seen before, with some caveats:
// the tokio runtime will use just one thread
#[tokio::main(flavor = "current_thread")]
async fn main() {
    let manager = PgConnectionManager::new(
        "host=localhost user=postgres".parse().unwrap(),
        NoTls,
    );
    let pool = Pool::builder().max_open(10).build(manager);

    let mut handles = vec![];
    // create a delay of 5 minutes
    let delay = tokio::time::Duration::from_secs(300);
    for i in 0..10i32 {
        let pool = pool.clone();
        handles.push(tokio::spawn(async move {
            eprintln!("Task{}: waiting {}s", i, delay.as_secs());
            let client = pool.get().await.unwrap();
            // Zzzz...
            tokio::time::sleep(delay).await;
            client
                .execute("INSERT INTO library (shelf) VALUES ($1)", &[&i.to_string()])
                .await
                .unwrap();
        }));
    }
    for j in handles {
        j.await.unwrap();
    }
    eprintln!("Tasks: all done");
}
Ok, let's unpack a few interesting bits. First of all, we decorate the main function with an attribute of the tokio runtime, forcing it to use just one thread (by the way, this is not cheating: tokio can only use a fixed number of OS threads, be it 1, 2, 3, 4, ..., but never "any"). I've also sprinkled async and await keywords here and there, because the function is now non-blocking and every query on the DB is a Future (i.e. it will finish "at some point in the future").
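For comparison, here is what pinning the multi-threaded flavor to a fixed worker count looks like (a sketch; the flavor and worker_threads arguments are standard tokio 1.x attribute options):

// the multi-threaded scheduler also uses a fixed-size pool of OS threads,
// here explicitly set to 4 workers
#[tokio::main(flavor = "multi_thread", worker_threads = 4)]
async fn main() {
    // ... same body as the current_thread version ...
}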
We also create the same delay as before to slow down execution and take some screenshots. One important thing to notice is that the delay now comes from tokio::time, because we want to delay only the execution of that specific Future, not of the entire thread. We collect these Futures and .await them all, just like before we join()ed the threads.
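To see why this matters, here is a standalone sketch (assuming tokio 1.x, not part of the pool example): on a current_thread runtime, std::thread::sleep would freeze every task, while tokio::time::sleep only suspends the one that called it.

use std::time::Duration;

#[tokio::main(flavor = "current_thread")]
async fn main() {
    let slow = tokio::spawn(async {
        // yields to the scheduler: other tasks keep running during this second
        tokio::time::sleep(Duration::from_secs(1)).await;
        eprintln!("slow task done");
    });
    let fast = tokio::spawn(async {
        // with std::thread::sleep(..) in the task above, this line would only
        // print after the whole runtime thread woke up again
        eprintln!("fast task done");
    });
    slow.await.unwrap();
    fast.await.unwrap();
}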
Let's compare the system stats. The number of database connections is the same as before, because I block each Future just after it has acquired its connection:
$ sudo netstat -tanp | grep 5432 | grep ESTA | grep target
tcp 0 0 127.0.0.1:48926 127.0.0.1:5432 ESTABLISHED 1030904/target/debu
tcp 0 0 127.0.0.1:48924 127.0.0.1:5432 ESTABLISHED 1030904/target/debu
tcp 0 0 127.0.0.1:48938 127.0.0.1:5432 ESTABLISHED 1030904/target/debu
tcp 0 0 127.0.0.1:48940 127.0.0.1:5432 ESTABLISHED 1030904/target/debu
tcp 0 0 127.0.0.1:48932 127.0.0.1:5432 ESTABLISHED 1030904/target/debu
tcp 0 0 127.0.0.1:48936 127.0.0.1:5432 ESTABLISHED 1030904/target/debu
tcp 0 0 127.0.0.1:48930 127.0.0.1:5432 ESTABLISHED 1030904/target/debu
tcp 0 0 127.0.0.1:48942 127.0.0.1:5432 ESTABLISHED 1030904/target/debu
tcp 0 0 127.0.0.1:48934 127.0.0.1:5432 ESTABLISHED 1030904/target/debu
tcp 0 0 127.0.0.1:48944 127.0.0.1:5432 ESTABLISHED 1030904/target/debu
Number of threads, from htop:
691906 user 20 0 10184 6200 4256 S 0.0 0.0 0:00.62 │ └─ bash
1030904 user 20 0 76116 5696 5008 S 0.0 0.0 0:00.13 │ └─ test-mobc
1030906 user 20 0 76116 5696 5008 S 0.0 0.0 0:00.00 │ └─ test-mobc
Number of threads from the process status file:
$ grep Threads /proc/1030904/status
35:Threads: 2
The memory footprint reported by htop is now also dramatically lower than in the sync version (about a tenth), but I would be cautious about attributing that to the async version itself (the code is quite different now). Asynchronous programming also forces you to use a lot more libraries. Here's a thought about this.
We are getting more and more used to an npm-style type of programming. Whether we are coding in Ruby, Python, Javascript, Rust ... we have package managers that make it extremely easy to compose software by pulling in dependencies. However, dependencies are liabilities: they are code outside of our control, and we should be careful and use external dependencies parsimoniously.
Why this digression? Because I got a surprise when I tried to demonstrate that my asynchronous example needs more dependencies than the synchronous one from before.
The crates pulled as direct dependencies for the asynchronous code snippet are:
[dependencies]
mobc = { version = "0.7.3", features = ["tokio"] }
tokio-postgres = "0.7.5"
tokio = "1.15.0"
I can shave something off by excluding the optional tokio features I don't need:
[dependencies]
mobc = { version = "0.7.3", features = ["tokio"] }
tokio-postgres = "0.7.5"
[dependencies.tokio]
version = "1.15.0"
default-features = false
features = ["macros", "time"]
But the total crate count is ... almost the same as in our previous example (94 instead of 93). Why is that? Let's investigate the dependencies of r2d2-postgres, the Postgres adaptor for r2d2:
[dependencies]
r2d2 = "0.8"
postgres = "0.19"
Looks fine. Let's see what the postgres crate pulls in:
[dependencies]
bytes = "1.0"
fallible-iterator = "0.2"
futures = "0.3"
tokio-postgres = { version = "0.7.2", path = "../tokio-postgres" }
tokio = { version = "1.0", features = ["rt", "time"] }
log = "0.4"
tokio is also here, right in the base crate for using a Postgres DB in Rust! The reason is that (as I've learned) some crates use asynchronous libraries in "blocking mode" to provide a synchronous API; here is where this happens in the postgres crate.
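The general pattern looks roughly like this (a minimal sketch, assuming tokio 1.x and tokio-postgres 0.7; the real postgres crate is more sophisticated): build a small runtime and drive the async client with block_on.

use tokio::runtime::Builder;
use tokio_postgres::{Error, NoTls};

fn query_sync() -> Result<(), Error> {
    // for brevity we build a single-threaded runtime per call;
    // a real wrapper would build it once and reuse it
    let runtime = Builder::new_current_thread().enable_all().build().unwrap();
    runtime.block_on(async {
        let (client, connection) =
            tokio_postgres::connect("host=localhost user=postgres", NoTls).await?;
        // the connection future must be polled for the client to make progress
        tokio::spawn(connection);
        client.simple_query("SELECT 1").await?;
        Ok(())
    })
}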
I'm starting to think that the overall software complexity behind writing a small query to the DB is running out of my control.
§ Conclusions
And that's a wrap! Was that interesting? I hope so, because I'm not using any of this anymore! :D
A couple of months ago I migrated to sqlx, and that will be the topic of the next article: Using sqlx in a Rust project (subtitle: "if it works, don't break it unless there are good reasons to do so").