Rust: DB connection threadpool

13 minute read Published: 2023-01-09

This is the 4th article in the series of learning more about Postgres and applying it to a Rust backend.

Here is an index of them all.

In this episode we are going to explore how to build and use a connection threadpool.

Using a DB connection threadpool is one of the first things likely to be planned in any application that will sustain a constant stream of users and therefore many DB connections. The reason is that opening a socket and establishing a DB session on top of it is always expensive (in terms of computational resources), so one solution is to create a pool of "ready-to-use" connections at application startup, keep them open and hand them out every time the application needs to query the database. Once the application has finished that specific piece of work with the DB (for example, creating a new user), the connection handle is returned to the pool for the next DB operation. All this is basically the job of a connection threadpool.

In my time using Rust I've used two solutions for a connection threadpool: a synchronous and an asynchronous pooler. They basically do the same thing, but the second one is preferable when the whole application is async; otherwise every use of a pooled connection is a blocking call that has to be moved to a separate thread to avoid stalling the async runtime. The whole point of an application that is asynchronous from top to bottom is to avoid spawning too many OS threads and instead rely on tasks (which have a different lifecycle that I'm not going to detail here) that can share the same thread.

Note: a common misconception is that a synchronous application is always slow and asynchronous programming is always TEH WAY TO GO. This is not always true. Spawning threads is not as expensive as it used to be (in the Linux kernel). Asynchronous programming is also a bit trickier, and one should evaluate whether the benefits are worth the cost. I know this sounds like hand-waving, but too often I've seen sync vs. async discussed like it's a dick-measuring contest. The reality is that every situation is different and what works for someone else might not be ideal for me.

For completeness, we will see both solutions.

§ Synchronous DB pool

To summarize the concept: the DB pool manager holds a pool of DB connections and hands one out every time the application asks for it. Getting a connection and running a query are blocking calls, so the application runs each DB operation in its own thread to keep the rest of the program from waiting.
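To make the concept concrete, here is a minimal sketch of a blocking pool that uses only the standard library. It is not how r2d2 is implemented; `FakeConn`, `Pool` and `Pooled` are hypothetical names made up for illustration, and the "connection" is just a struct with an id. The point is the lifecycle: connections are created up front, `get()` blocks until one is free, and the handle goes back to the pool when it is dropped.

```rust
use std::ops::Deref;
use std::sync::{Arc, Condvar, Mutex};

/// Stand-in for a real database connection (hypothetical type).
#[derive(Debug)]
pub struct FakeConn {
    pub id: usize,
}

struct Shared {
    idle: Mutex<Vec<FakeConn>>,
    available: Condvar,
}

/// A tiny fixed-size pool: all connections are created up front.
#[derive(Clone)]
pub struct Pool {
    shared: Arc<Shared>,
}

/// Handle given to the caller; returns the connection to the pool on drop.
pub struct Pooled {
    conn: Option<FakeConn>,
    shared: Arc<Shared>,
}

impl Pool {
    pub fn new(size: usize) -> Self {
        let idle = (0..size).map(|id| FakeConn { id }).collect();
        Pool {
            shared: Arc::new(Shared {
                idle: Mutex::new(idle),
                available: Condvar::new(),
            }),
        }
    }

    /// Blocks the calling thread until a connection is free.
    pub fn get(&self) -> Pooled {
        let mut idle = self.shared.idle.lock().unwrap();
        loop {
            if let Some(conn) = idle.pop() {
                return Pooled {
                    conn: Some(conn),
                    shared: Arc::clone(&self.shared),
                };
            }
            // Nothing idle: sleep until another thread returns a connection.
            idle = self.shared.available.wait(idle).unwrap();
        }
    }

    /// Number of connections currently sitting idle in the pool.
    pub fn idle_count(&self) -> usize {
        self.shared.idle.lock().unwrap().len()
    }
}

impl Deref for Pooled {
    type Target = FakeConn;
    fn deref(&self) -> &FakeConn {
        self.conn.as_ref().expect("connection present until drop")
    }
}

impl Drop for Pooled {
    fn drop(&mut self) {
        if let Some(conn) = self.conn.take() {
            self.shared.idle.lock().unwrap().push(conn);
            self.shared.available.notify_one();
        }
    }
}

fn main() {
    let pool = Pool::new(2);
    {
        let conn = pool.get();
        println!("using connection {}, idle: {}", conn.id, pool.idle_count());
    } // `conn` is dropped here and goes back to the pool
    println!("idle after return: {}", pool.idle_count());
}
```

A real pooler adds a lot on top of this (timeouts, health checks, reconnecting broken connections), which is exactly why we use a crate for it.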

Let's take, for example, a widely used Rust crate, r2d2, coupled with a Postgres database. Let's copy and paste the first example from the README.md of that crate.

First add the dependencies to our project Cargo.toml:

[dependencies]
r2d2 = "0.8.9"
r2d2_postgres = "0.18.1"

Note: although I have only 2 direct dependencies, together with their transitive dependencies (external crates pulled in automatically) they accounted for a total of 93 crates when I compiled the project. Keep this number in mind for later.

use std::thread;
use r2d2_postgres::{postgres::NoTls, PostgresConnectionManager};

fn main() {
    let manager = PostgresConnectionManager::new(
        "host=localhost user=postgres".parse().unwrap(),
        NoTls,
    );
    let pool = r2d2::Pool::new(manager).unwrap();
    let mut handles = vec![];

    for i in 0..10i32 {
        let pool = pool.clone();
        handles.push(thread::spawn(move || {
            eprintln!("Thread{}", i);
            let mut client = pool.get().unwrap();
            client.execute("INSERT INTO tbl (field) VALUES ($1)", &[&i]).unwrap();
        }));
    }
    for h in handles {
        h.join().unwrap();
    }
}

This code creates a pool and then spawns 10 threads, each of which executes an INSERT into a table. The main thread calls join on every handle, so it waits for all the threads to finish before exiting. After running this we will have 10 records in that table (try it!).

Let's tweak that code for a moment and add a huge delay at the start of each thread, so we have time to observe what is happening:

    ...
    // delay thread execution for 5 minutes 
    let delay = core::time::Duration::from_secs(300);
    for i in 0..10i32 {
        let pool = pool.clone();
        handles.push(thread::spawn(move || {
            eprintln!("Thread{}: waiting {}s", i, delay.as_secs());
            // Zzz....
            thread::sleep(delay);
            let mut client = pool.get().unwrap();
            client
                .execute("INSERT INTO library (shelf) VALUES ($1)", &[&i.to_string()])
                .unwrap();
        }));
    }
    ...

Now I'll re-run this code and observe that the application "hangs":

$ cargo run
...
Thread0: waiting 300s
Thread1: waiting 300s
Thread3: waiting 300s
Thread2: waiting 300s
Thread5: waiting 300s
Thread6: waiting 300s
Thread7: waiting 300s
Thread4: waiting 300s
Thread9: waiting 300s
Thread8: waiting 300s

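Check the open sockets (filtering out all the noise) and we see 10 open connections to Postgres, even though every thread is still sleeping: the pool opened them at startup, before any thread asked for one.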

$ sudo netstat -tanp | grep 5432 | grep ESTABLISHED | grep debug

tcp        0      0 127.0.0.1:48506         127.0.0.1:5432          ESTABLISHED 765750/target/debug
tcp        0      0 127.0.0.1:48502         127.0.0.1:5432          ESTABLISHED 765750/target/debug
tcp        0      0 127.0.0.1:48504         127.0.0.1:5432          ESTABLISHED 765750/target/debug
tcp        0      0 127.0.0.1:48496         127.0.0.1:5432          ESTABLISHED 765750/target/debug
tcp        0      0 127.0.0.1:48500         127.0.0.1:5432          ESTABLISHED 765750/target/debug
tcp        0      0 127.0.0.1:48494         127.0.0.1:5432          ESTABLISHED 765750/target/debug
tcp        0      0 127.0.0.1:48490         127.0.0.1:5432          ESTABLISHED 765750/target/debug
tcp        0      0 127.0.0.1:48498         127.0.0.1:5432          ESTABLISHED 765750/target/debug
tcp        0      0 127.0.0.1:48488         127.0.0.1:5432          ESTABLISHED 765750/target/debug
tcp        0      0 127.0.0.1:48492         127.0.0.1:5432          ESTABLISHED 765750/target/debug

Let's see htop:

 691906 user       20   0  8320  5268  3480 S  0.0  0.0  0:00.07 │     └─ bash
 765750 user       20   0  865M  5004  4372 S  0.0  0.0  0:00.10 │        └─ r2d2-postgres
 765949 user       20   0  865M  5004  4372 S  0.0  0.0  0:00.00 │           ├─ r2d2-postgres
 765950 user       20   0  865M  5004  4372 S  0.0  0.0  0:00.00 │           ├─ r2d2-postgres
 765951 user       20   0  865M  5004  4372 S  0.0  0.0  0:00.00 │           ├─ r2d2-postgres
 765964 user       20   0  865M  5004  4372 S  0.0  0.0  0:00.00 │           ├─ r2d2-postgres
 765965 user       20   0  865M  5004  4372 S  0.0  0.0  0:00.00 │           ├─ r2d2-postgres
 765966 user       20   0  865M  5004  4372 S  0.0  0.0  0:00.00 │           ├─ r2d2-postgres
 765967 user       20   0  865M  5004  4372 S  0.0  0.0  0:00.00 │           ├─ r2d2-postgres
 765968 user       20   0  865M  5004  4372 S  0.0  0.0  0:00.00 │           ├─ r2d2-postgres
 765969 user       20   0  865M  5004  4372 S  0.0  0.0  0:00.00 │           ├─ r2d2-postgres
 765970 user       20   0  865M  5004  4372 S  0.0  0.0  0:00.00 │           ├─ r2d2-postgres
 765973 user       20   0  865M  5004  4372 S  0.0  0.0  0:00.00 │           ├─ r2d2-postgres
 765974 user       20   0  865M  5004  4372 S  0.0  0.0  0:00.00 │           ├─ r2d2-postgres
 765975 user       20   0  865M  5004  4372 S  0.0  0.0  0:00.00 │           └─ r2d2-postgres

Let's pick the parent PID and check the thread count of our Rust process:

$ grep Threads /proc/765750/status
35:Threads:	14
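The same number can also be read from inside the process. The following is a small sketch, assuming a Linux system where `/proc/self/status` exists; `parse_thread_count` is a helper name I made up for this example.

```rust
use std::fs;

/// Extracts the value of the `Threads:` line from the text of a
/// `/proc/<pid>/status` file (Linux-specific format).
fn parse_thread_count(status: &str) -> Option<u32> {
    status
        .lines()
        .find_map(|line| line.strip_prefix("Threads:"))
        .and_then(|rest| rest.trim().parse().ok())
}

fn main() {
    // `/proc/self/status` describes the current process; Linux only.
    match fs::read_to_string("/proc/self/status") {
        Ok(status) => match parse_thread_count(&status) {
            Some(n) => println!("threads: {}", n),
            None => println!("no Threads line found"),
        },
        Err(err) => println!("cannot read /proc/self/status: {}", err),
    }
}
```

Calling something like this from the main thread while the workers are sleeping should report the same count we get from the shell.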

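Ok, cool! We now see what happens when we pop threads like candies: 14 threads, that is the main thread, our 10 spawned threads and (as far as I can tell) 3 worker threads that r2d2 keeps for its own housekeeping. Let's now see the corresponding async code.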

§ Asynchronous DB pool

The main difference between sync and async is that while the former spawns OS threads (involving the kernel scheduler and doing a lot of other stuff), the latter spawns tasks: lightweight units of work that are scheduled by the async runtime in user space, can share the same thread, and are polled until completion. The underlying implementation is more complex(~ish) and platform dependent, so for more information go and find some online documentation about async in Rust (I suggest this video).
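To give an idea of what "polled until completion" means, here is a toy executor written with the standard library only. It is a deliberately naive sketch and not how tokio works: a real runtime puts a task to sleep until its waker fires, while this one just keeps polling in a loop. `CountDown`, `count_down` and `run_all` are names invented for this example.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// A task is just a boxed future living on the heap.
type Task = Pin<Box<dyn Future<Output = &'static str>>>;

/// A future that returns `Pending` a fixed number of times before finishing.
struct CountDown {
    name: &'static str,
    remaining: u32,
}

impl Future for CountDown {
    type Output = &'static str;

    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
        if self.remaining == 0 {
            Poll::Ready(self.name)
        } else {
            self.remaining -= 1;
            Poll::Pending
        }
    }
}

/// Helper that builds a boxed task.
fn count_down(name: &'static str, remaining: u32) -> Task {
    Box::pin(CountDown { name, remaining })
}

/// Waker that does nothing: this toy executor simply polls in a loop.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Polls every task round-robin on the current thread until all are done.
/// Returns the task outputs in completion order.
fn run_all(mut tasks: Vec<Task>) -> Vec<&'static str> {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut finished = Vec::new();
    while !tasks.is_empty() {
        // Keep only the tasks that are still pending after this round.
        tasks.retain_mut(|task| match task.as_mut().poll(&mut cx) {
            Poll::Ready(name) => {
                finished.push(name);
                false
            }
            Poll::Pending => true,
        });
    }
    finished
}

fn main() {
    let tasks = vec![count_down("slow", 3), count_down("fast", 1)];
    for name in run_all(tasks) {
        println!("{} finished", name);
    }
}
```

Both tasks make progress on a single OS thread, and the one that needs fewer polls finishes first. No extra thread is ever created.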

What matters is that for our purposes someone else is doing all the heavy lifting and we just need to slightly tweak our code to make it async.

Well, "slightly".

At the time (we are talking about 2019) I evaluated three options for a threadpool: bb8, mobc and deadpool. The first one was not yet polished and the third seemed to do more than I needed. Eventually I settled on the second one: it was simple and I could understand its code.

This turned out to be very useful because later on the maintainer of mobc reorganized the crates around it in a way that forced me to pull some of that code into my application, so today I need you to see a bit of boilerplate. But no worries, it's just for my reference; you can skip this block of code and jump to the usage in the application.

The code that follows just implements the Manager trait. The trait requires two methods: connect(), which establishes the connection to the DB, and check(), which simply executes a "SELECT 1" to tell whether the connection is still alive.
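Before the real thing, here is a simplified, synchronous sketch of how I picture a pool using those two methods. It is not mobc's code: the `Manager` trait below is a hypothetical stand-in with the same shape, and `FakeConn`, `FakeManager` and `checkout` are made up for this example.

```rust
use std::cell::Cell;

/// Hypothetical, synchronous stand-in for a pool's manager trait.
trait Manager {
    type Connection;
    type Error;
    /// Opens a brand new connection.
    fn connect(&self) -> Result<Self::Connection, Self::Error>;
    /// Gives the connection back if it is still usable, an error otherwise.
    fn check(&self, conn: Self::Connection) -> Result<Self::Connection, Self::Error>;
}

/// Fake connection: `alive` simulates the outcome of a "SELECT 1".
struct FakeConn {
    id: u32,
    alive: bool,
}

struct FakeManager {
    next_id: Cell<u32>,
}

impl Manager for FakeManager {
    type Connection = FakeConn;
    type Error = String;

    fn connect(&self) -> Result<FakeConn, String> {
        let id = self.next_id.get();
        self.next_id.set(id + 1);
        Ok(FakeConn { id, alive: true })
    }

    fn check(&self, conn: FakeConn) -> Result<FakeConn, String> {
        if conn.alive {
            Ok(conn)
        } else {
            Err(format!("connection {} is dead", conn.id))
        }
    }
}

/// Hands out an idle connection that passes `check`, discarding the ones
/// that fail; opens a new connection when no idle one is left.
fn checkout<M: Manager>(
    manager: &M,
    idle: &mut Vec<M::Connection>,
) -> Result<M::Connection, M::Error> {
    while let Some(conn) = idle.pop() {
        if let Ok(conn) = manager.check(conn) {
            return Ok(conn);
        }
        // `check` failed: the broken connection has been dropped.
    }
    manager.connect()
}

fn main() {
    let manager = FakeManager { next_id: Cell::new(0) };
    let mut idle = vec![
        FakeConn { id: 100, alive: true },
        FakeConn { id: 101, alive: false },
    ];
    let conn = checkout(&manager, &mut idle).unwrap();
    println!("got connection {}", conn.id);
}
```

The dead connection is silently thrown away and the caller gets a healthy one, which is the whole reason check() exists.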

// `async_trait` is re-exported by mobc, so no extra dependency is needed
use mobc::{async_trait, Manager, Pool};
use tokio_postgres::tls::{MakeTlsConnect, TlsConnect};
// `NoTls` comes from tokio_postgres: the sync `postgres` crate is not a dependency here
use tokio_postgres::{Client, Config, Error, NoTls, Socket};

pub struct PgConnectionManager<Tls> {
    config: Config,
    tls: Tls,
}

impl<Tls> PgConnectionManager<Tls> {
    pub fn new(config: Config, tls: Tls) -> Self {
        Self { config, tls }
    }
}

#[async_trait]
impl<Tls> Manager for PgConnectionManager<Tls>
where
    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static,
    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
{
    type Connection = Client;
    type Error = Error;

    async fn connect(&self) -> Result<Self::Connection, Self::Error> {
        let tls = self.tls.clone();
        let (client, conn) = self.config.connect(tls).await?;
        mobc::spawn(conn);
        Ok(client)
    }

    async fn check(&self, conn: Self::Connection) -> Result<Self::Connection, Self::Error> {
        conn.simple_query("SELECT 1").await?;
        Ok(conn)
    }
}

And now let's see how to use this other threadpool. Basically it's the same code we have seen before, with some caveats:

// tokio runtime will use just one thread
#[tokio::main(flavor = "current_thread")]
async fn main() {
    let manager = PgConnectionManager::new(
        "host=localhost user=postgres"
            .parse()
            .unwrap(),
        NoTls,
    );
    let pool = Pool::builder().max_open(10).build(manager);
    let mut handles = vec![];
    // Create a delay of 5 minutes
    let delay = tokio::time::Duration::from_secs(300);
    for i in 0..10i32 {
        let pool = pool.clone();
        handles.push(tokio::spawn(async move {
            eprintln!("Task{}: waiting {}s", i, delay.as_secs());
            let client = pool.get().await.unwrap();
            // Zzzz...
            tokio::time::sleep(delay).await;
            client
                .execute("INSERT INTO library (shelf) VALUES ($1)", &[&i.to_string()])
                .await
                .unwrap();
        }));
    }
    for j in handles {
        j.await.unwrap();
    }
    eprintln!("Tasks: all done");
}

Ok, let's unpack a few interesting bits. First of all, we decorate the main function with the tokio::main attribute, forcing the runtime to use just one thread (btw, this is not cheating: tokio runs its tasks on a fixed number of worker threads, be it 1, 2, 3, 4, ..., it does not spawn a new one per task). I've also sprinkled the async and await keywords here and there because now the functions are not blocking and every query on the DB is a Future (i.e. it will finish "at some point in the future").

We also create the same delay as before to slow down execution and take some screenshots. One important thing to notice about the delay is that now we use tokio::time because we want to delay the execution of that specific Future only, not of the entire thread. We collect the task handles and .await each of them, just like we joined the threads before.

Let's compare the system stats. The number of database connections is the same as before, because every task goes to sleep right after getting its connection from the pool.

$ sudo netstat -tanp | grep 5432 | grep ESTA | grep target
tcp        0      0 127.0.0.1:48926         127.0.0.1:5432          ESTABLISHED 1030904/target/debu 
tcp        0      0 127.0.0.1:48924         127.0.0.1:5432          ESTABLISHED 1030904/target/debu 
tcp        0      0 127.0.0.1:48938         127.0.0.1:5432          ESTABLISHED 1030904/target/debu 
tcp        0      0 127.0.0.1:48940         127.0.0.1:5432          ESTABLISHED 1030904/target/debu 
tcp        0      0 127.0.0.1:48932         127.0.0.1:5432          ESTABLISHED 1030904/target/debu 
tcp        0      0 127.0.0.1:48936         127.0.0.1:5432          ESTABLISHED 1030904/target/debu 
tcp        0      0 127.0.0.1:48930         127.0.0.1:5432          ESTABLISHED 1030904/target/debu 
tcp        0      0 127.0.0.1:48942         127.0.0.1:5432          ESTABLISHED 1030904/target/debu 
tcp        0      0 127.0.0.1:48934         127.0.0.1:5432          ESTABLISHED 1030904/target/debu 
tcp        0      0 127.0.0.1:48944         127.0.0.1:5432          ESTABLISHED 1030904/target/debu

Number of threads, from htop:

 691906 user       20   0 10184  6200  4256 S  0.0  0.0  0:00.62 │     └─ bash
1030904 user       20   0 76116  5696  5008 S  0.0  0.0  0:00.13 │        └─ test-mobc
1030906 user       20   0 76116  5696  5008 S  0.0  0.0  0:00.00 │           └─ test-mobc

Thread count from the process status file:

$ grep Threads /proc/1030904/status
35:Threads:	2

The virtual memory (VIRT column) reported by htop is now also far lower than in the sync version (about 1/10th), while the resident memory (RES) is roughly the same. I would be cautious about crediting the async version for that (the code is quite different now). Asynchronous programming also forces you to use a lot more libraries. Here's a thought about this.

We are getting more and more used to an npm-style type of programming. Whether we are coding in Ruby, Python, Javascript, Rust ... we have package managers that make it extremely easy to compose software by pulling in dependencies. However, dependencies are liabilities: they are code outside of our control, and we should be careful and use external dependencies parsimoniously.

Why this digression? Because I wanted to demonstrate that my asynchronous example needs more dependencies than the previous, synchronous one, and I was surprised by what I found.

The crates pulled as direct dependencies for the asynchronous code snippet are:

[dependencies]
mobc = { version = "0.7.3", features = ["tokio"] }
tokio-postgres = "0.7.5"
tokio = "1.15.0"

I can shave off something by excluding optional tokio stuff I don't need:

[dependencies]
mobc = { version = "0.7.3", features = ["tokio"] }
tokio-postgres = "0.7.5"

[dependencies.tokio]
version = "1.15.0"
default-features = false
features = ["macros", "time"]

But the total crate count is ... almost the same as in our previous example (94 instead of 93). Why is that? Let's investigate the dependency tree of r2d2-postgres, the Postgres adaptor for r2d2:

[dependencies]
r2d2 = "0.8"
postgres = "0.19"

Looks fine. Let's see what the postgres crate pulls in.

[dependencies]
bytes = "1.0"
fallible-iterator = "0.2"
futures = "0.3"
tokio-postgres = { version = "0.7.2", path = "../tokio-postgres" }
tokio = { version = "1.0", features = ["rt", "time"] }
log = "0.4"

tokio is also here! Right in the base crate for using a Postgres DB in Rust. The reason is that (as I've learned) some crates use asynchronous libraries in "blocking mode" to provide a synchronous API; here is where this happens in the postgres crate.
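The pattern itself is easy to sketch. The postgres crate relies on a tokio runtime for this; the example below instead uses a hand-rolled `block_on` built on the standard library, just to show the shape of a synchronous facade over an async client. `AsyncClient`, `Client` and the fake `execute` are hypothetical and do no real I/O.

```rust
use std::future::Future;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

/// Wakes the blocked thread by unparking it.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Drives a future to completion on the current thread,
/// parking the thread whenever the future is not ready yet.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            Poll::Pending => thread::park(),
        }
    }
}

/// Hypothetical async client; a real one would do network I/O here.
struct AsyncClient;

impl AsyncClient {
    async fn execute(&self, query: &str) -> usize {
        // Pretend the number of affected rows is the query length.
        query.len()
    }
}

/// Synchronous facade: same operation, but the caller never sees a Future.
struct Client {
    inner: AsyncClient,
}

impl Client {
    fn execute(&self, query: &str) -> usize {
        block_on(self.inner.execute(query))
    }
}

fn main() {
    let client = Client { inner: AsyncClient };
    let rows = client.execute("SELECT 1");
    println!("rows: {}", rows);
}
```

From the outside `Client::execute` looks like any blocking call, but the async machinery is still compiled in underneath, which is how tokio ends up in the dependency tree of a "sync" crate.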

I'm thinking that the overall software complexity of writing a small query to the DB is getting out of my control.

§ Conclusions

And that's a wrap! Was that interesting? I hope so, because I'm not using any of this anymore! :D

A couple of months ago I migrated to sqlx, and this will be the topic of the next article: Using sqlx in a Rust project (subtitle: "if it works don't break unless there are good reasons to do so").