alxolr

posts about software engineering craft

Rust bulk insert to PostgreSQL using sqlx

Reference

Sqlx doc reference: How can I bind an array to a VALUES() clause? How can I do bulk inserts?

Intro

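An example of how to insert a large number of rows into PostgreSQL using Rust and sqlx.

The idea is to bind one array per column and let PostgreSQL expand them back into rows with INSERT INTO ... SELECT * FROM UNNEST(...). The statement has a fixed number of bind parameters no matter how many rows are inserted.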
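
To make the idea concrete, here is a minimal plain-Rust model of the round trip, assuming a simplified two-column row. The `Row` type and the `to_columns` and `unnest` functions are hypothetical names used only for illustration; they are not part of sqlx or PostgreSQL. The client splits rows into one vector per column, and UNNEST walks those arrays in lockstep to produce one row per index.

```rust
/// Simplified row with two of the columns used in this article (illustrative only).
#[derive(Debug, Clone, PartialEq)]
struct Row {
    name: String,
    ref_1: i32,
}

/// Client side: split the rows into one vector per column.
/// These vectors are what would be bound as $1 and $2.
fn to_columns(rows: &[Row]) -> (Vec<String>, Vec<i32>) {
    let names = rows.iter().map(|r| r.name.clone()).collect();
    let ref_1s = rows.iter().map(|r| r.ref_1).collect();
    (names, ref_1s)
}

/// Rough model of `UNNEST(names, ref_1s)`: element i of every array forms row i.
/// Assumes equal-length inputs. PostgreSQL pads shorter arrays with NULLs,
/// whereas this zip-based model simply stops at the shortest input.
fn unnest(names: &[String], ref_1s: &[i32]) -> Vec<Row> {
    names
        .iter()
        .zip(ref_1s)
        .map(|(name, ref_1)| Row {
            name: name.clone(),
            ref_1: *ref_1,
        })
        .collect()
}
```

Calling `to_columns` and then `unnest` on the result gives back the original rows, which is the same shape of work the INSERT statement performs on the database side.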

Structure

Given the following structure of the users table:

CREATE TABLE IF NOT EXISTS users (
    id SERIAL PRIMARY KEY,
    name VARCHAR(128) NOT NULL,
    address JSONB NOT NULL,
    ref_1 INT4 NOT NULL,
    ref_2 INT8 NOT NULL,
    created_at TIMESTAMP NOT NULL,
    deleted_at TIMESTAMP
);

Rust code for the user structure and bulk inserts:

use sqlx::prelude::*;
use sqlx::types::JsonValue;
use sqlx::{Pool, Postgres};

struct UserForCreate {
    name: String,
    address: JsonValue,
    ref_1: i32, // this maps to INT4
    ref_2: i64, // this maps to INT8
    created_at: chrono::NaiveDateTime,
    deleted_at: Option<chrono::NaiveDateTime>,
}

#[derive(FromRow)]
struct Id {
    id: i32, // SERIAL is backed by INT4, which sqlx decodes as i32, not i64
}

async fn bulk_insert(db: &Pool<Postgres>, users: &[UserForCreate]) -> Vec<i32> {
    let names: Vec<String> = users.iter().map(|u| u.name.clone()).collect();
    let addresses: Vec<JsonValue> = users.iter().map(|u| u.address.clone()).collect();
    let ref_1s: Vec<i32> = users.iter().map(|u| u.ref_1).collect();
    let ref_2s: Vec<i64> = users.iter().map(|u| u.ref_2).collect();
    let created_ats: Vec<chrono::NaiveDateTime> = users.iter().map(|u| u.created_at).collect();
    let deleted_ats: Vec<Option<chrono::NaiveDateTime>> =
        users.iter().map(|u| u.deleted_at).collect();

    let sql = r"
            INSERT INTO users
             (
                name,
                address,
                ref_1,
                ref_2,
                created_at,
                deleted_at
            ) SELECT * FROM UNNEST(
                $1::VARCHAR(128)[],
                $2::JSONB[],
                $3::INT4[],
                $4::INT8[],
                $5::TIMESTAMP[],
                $6::TIMESTAMP[]
            ) RETURNING id";

    let rows = sqlx::query_as::<_, Id>(sql)
        .bind(names)
        .bind(addresses)
        .bind(ref_1s)
        .bind(ref_2s)
        .bind(created_ats)
        .bind(deleted_ats)
        .fetch_all(db)
        .await
        .unwrap();

    rows.into_iter().map(|r| r.id).collect()
}
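
Each call to `bulk_insert` copies every column into a fresh vector and sends the whole input as a single statement, so for very large inputs it may be worth splitting the work into batches. Below is a minimal sketch of that idea, not something the original code does. The `insert_in_batches` helper and its closure parameter are hypothetical; the closure is a synchronous stand-in for the async `bulk_insert`, which in real code would be awaited inside the loop.

```rust
/// Hypothetical helper: feeds `items` to `insert_batch` in slices of at most
/// `batch_size` elements and concatenates the ids each call returns.
/// `insert_batch` is a synchronous stand-in for the async `bulk_insert`.
fn insert_in_batches<T, Id, F>(items: &[T], batch_size: usize, mut insert_batch: F) -> Vec<Id>
where
    F: FnMut(&[T]) -> Vec<Id>,
{
    // `chunks` panics on a zero size, so fail early with a clearer message.
    assert!(batch_size > 0, "batch_size must be greater than zero");

    let mut ids = Vec::with_capacity(items.len());
    for batch in items.chunks(batch_size) {
        // In async code this line would be roughly:
        // ids.extend(bulk_insert(db, batch).await);
        ids.extend(insert_batch(batch));
    }
    ids
}
```

A suitable batch size depends on row width and database configuration, so it is best chosen by measuring. Note that separate batches are separate statements: unless they run inside one transaction, a failure partway through leaves the earlier batches inserted.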

I hope that this article was helpful. If you like it, please share it with your friends and leave a comment; I will gladly answer all the questions.
