В современном программировании важно ускорять приложения, особенно при одновременном выполнении задач. При многопоточности разные части программы выполняются параллельно, это один из способов ускорения.
Но многопоточность чревата багами, например когда два потока обращаются к одним и тем же данным одновременно. Правилами владения Rust эти проблемы предотвращаются, параллельное выполнение становится безопаснее. Попробуем ускорить приложение, сохраняя код простым и безопасным.
Начнем со структуры проекта:
src/
├── main.rs
├── config.rs
├── db_schema.rs
├── db_order.rs
├── export_csv.rs
Cargo.toml
cargo new multi_schema_csv_export
Заменяем содержимое Cargo.toml вот этим:
[package]
name = "rust-multischema-csv-export"
version = "0.1.0"
edition = "2021"
[dependencies]
sqlx = { version = "0.6", features = ["runtime-tokio-native-tls", "postgres", "chrono"] }
tokio = { version = "1", features = ["full"] }
chrono = "0.4"
В каталоге src создаем файлы config.rs, db_schema.rs, db_order.rs, export_csv.rs и копируем этот код:
config.rs
В этом файле содержатся конфигурационные константы для приложений, хранится URL-адрес базы данных, легко обновляемый для различных сред. Централизацией конфигурирования упрощаются управление настройками и их изменение в приложениях.
pub const DATABASE_URL: &str = "postgres://username:password@localhost/database_name";
Теперь сделаем функцию для создания схем и таблиц, вставки образцов данных. Этими функциями настраиваются структуры баз данных, предоставляются тестовые данные для многосхемного сценария:
use sqlx::postgres::PgPool;
pub async fn create_schema_and_table(pool: &PgPool, schema_name: &str) -> Result<(), sqlx::Error> {
sqlx::query(&format!(
"DROP TABLE IF EXISTS {}.orders",
schema_name
))
.execute(pool)
.await?;
sqlx::query(&format!(
"CREATE TABLE IF NOT EXISTS {}.orders (
id SERIAL PRIMARY KEY,
customer_name TEXT NOT NULL,
order_date TIMESTAMP NOT NULL,
total_amount FLOAT8 NOT NULL
)",
schema_name
))
.execute(pool)
.await?;
Ok(())
}
pub async fn insert_sample_data(pool: &PgPool, schema_name: &str) -> Result<(), sqlx::Error> {
sqlx::query(&format!(
"INSERT INTO {}.orders (customer_name, order_date, total_amount) VALUES
('Alice', '2023-01-01 10:00:00', 100.50),
('Bob', '2023-01-02 11:00:00', 200.75),
('Charlie', '2023-01-03 12:00:00', 150.25)",
schema_name
))
.execute(pool)
.await?;
Ok(())
}
Теперь сделаем структуру Order и предоставим функцию для извлечения заказов из базы данных, этой struct
представляется структура данных заказа, в том числе названия схемы. При помощи fetch_orders извлекаются все заказы из заданной схемы, включая название схемы в результате, эта функция важна для многосхемного подхода, данные извлекаются ею из разных схем динамически.
use sqlx::postgres::{PgPool, PgRow};
use sqlx::Row;
use chrono::NaiveDateTime;
#[derive(Debug)]
pub struct Order {
pub id: i32,
pub customer_name: String,
pub order_date: NaiveDateTime,
pub total_amount: f64,
pub schema_name: String,
}
pub async fn fetch_orders(pool: &PgPool, schema_name: &str) -> Result<Vec<Order>, sqlx::Error> {
let query = format!(
r#"
SELECT
id,
customer_name,
order_date,
total_amount,
'{schema_name}' as schema_name
FROM {schema_name}.orders
"#,
schema_name = schema_name
);
let orders = sqlx::query(&query)
.bind(schema_name)
.map(|row: PgRow| Order {
id: row.get("id"),
customer_name: row.get("customer_name"),
order_date: row.get("order_date"),
total_amount: row.get("total_amount"),
schema_name: row.get("schema_name"),
})
.fetch_all(pool)
.await?;
Ok(orders)
}
export_csv.rs
Теперь сделаем функцию для экспорта данных заказа в CSV-файлы. В export_to_csv
принимается вектор структур Order, которые записываются в CSV-файл с названием схемы. Для каждой схемы создается файл, сначала записывается строка заголовка, а затем — каждый заказ в виде строки CSV.
use std::fs::File;
use std::io::Write;
use crate::db_order::Order;
pub fn export_to_csv(orders: Vec<Order>, schema_name: &str) -> std::io::Result<()> {
let filename = format!("{}_orders.csv", schema_name);
let mut file = File::create(filename)?;
writeln!(file, "id,customer_name,order_date,total_amount,schema_name")?;
for order in orders {
writeln!(
file,
"{},{},{},{},{}",
order.id,
order.customer_name,
order.order_date,
order.total_amount,
order.schema_name
)?;
}
Ok(())
}
Заменяем содержимое main.rs вот этим:
use sqlx::postgres::PgPoolOptions;
use std::sync::Arc;
use std::thread;
use tokio;
mod config;
mod db_schema;
mod db_order;
mod export_csv;
use db_order::fetch_orders;
use db_schema::{create_schema_and_table, insert_sample_data};
use export_csv::export_to_csv;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let pool = PgPoolOptions::new()
.max_connections(5)
.connect(&config::DATABASE_URL)
.await?;
let schemas = vec!["schema1", "schema2", "schema3", "schema4", "schema5"];
// Создаются схемы и таблицы, вставляются образцы данных
for schema in &schemas {
create_schema_and_table(&pool, schema).await?;
insert_sample_data(&pool, schema).await?;
}
let pool = Arc::new(pool);
let mut handles = vec![];
for schema in schemas {
let pool_clone = Arc::clone(&pool);
let schema_name = schema.to_string();
let handle = thread::spawn(move || {
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async {
println!("Processing schema: {}", schema_name);
match fetch_orders(&pool_clone, &schema_name).await {
Ok(orders) => {
println!("Fetched {} orders from {}", orders.len(), schema_name);
if let Err(e) = export_to_csv(orders, &schema_name) {
eprintln!("Error exporting CSV for {}: {}", schema_name, e);
} else {
println!("Successfully exported CSV for {}", schema_name);
}
}
Err(e) => eprintln!("Error fetching orders from {}: {}", schema_name, e),
}
});
});
handles.push(handle);
}
// Ожидается завершение всех потоков
for handle in handles {
handle.join().unwrap();
}
println!("All schemas processed and CSV files generated.");
Ok(())
}
Этот файл — точка входа приложений, здесь настраивается подключение к базе данных, создаются схемы и порождаются потоки для одновременной обработки каждой схемы. Каждым потоком заказы извлекаются из присвоенной ему схемы и экспортируются в CSV-файл. Функцией main
организуется весь процесс — от настройки базы данных до многопоточной обработки и экспорта в CSV.
Запускаем этот проект, обновив подключения к базе данных, связав main.rs с учетными данными PostgreSQL:
cargo run
Этим проектом на Rust демонстрируется мощный подход к выполнению операций в многосхемных базах данных и к параллельной обработке данных. Из строгой системы типов Rust, асинхронных возможностей и потоковой модели мы создали надежное приложение, которым эффективно параллельно обрабатываются данные многосхемных баз данных.
Читайте также:
- Понимание и реализация смарт-указателя Arc и мьютекса на Rust
- От отправителя к получателю: подход Rust к локальной передаче файлов
- Индексация строк в Rust и TypeScript в сравнениях
Читайте нас в Telegram, VK и Дзен
Перевод статьи Andriat Ratyanto: Multithreading in Rust: Unlocking Faster and More Efficient Applications