Threads

  • Rust provides built-in support for concurrent programming through its standard library module std::thread.

  • We can spawn threads using std::thread::spawn, which takes a closure and runs it in a new thread (a minimal sketch is included after the references below).

  • In a multi-threaded program, if we need shared mutable state, we can use:

    • Arc<T>: An atomic reference-counted smart pointer for shared ownership across threads.

    • Mutex<T>: A synchronization primitive for mutual exclusion to safely access shared data.

  • Rust provides channels for thread communication in the std::sync::mpsc module.

    • mpsc::channel creates a transmitter (Sender) and a receiver (Receiver).

    • Messages are sent via the transmitter and received via the receiver.

    • The Sender can be cloned to send to the same channel multiple times, but only one Receiver is supported.

  • References:

    • Using Threads to Run Code Simultaneously

    • Using Message Passing to Transfer Data Between Threads

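  • A minimal sketch of these pieces together (my own example, not part of the Rustlings exercises): spawn a thread, send a value back over an mpsc channel, and collect the closure's return value with join.

    use std::{sync::mpsc, thread};

    fn main() {
        let (tx, rx) = mpsc::channel();

        // Spawn a thread that sends a value back over the channel.
        // The closure's return value is later collected via join().
        let handle = thread::spawn(move || {
            tx.send(42).unwrap();
            "done"
        });

        // Receive the value sent by the spawned thread.
        let value = rx.recv().unwrap();

        // join() blocks until the thread finishes and yields its return value.
        let result = handle.join().unwrap();

        println!("received {value}, thread returned {result}");
    }
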
threads1.rs

// This program spawns multiple threads that each runs for at least 250ms, and
// each thread returns how much time it took to complete. The program should
// wait until all the spawned threads have finished and should collect their
// return values into a vector.

use std::{
    thread,
    time::{Duration, Instant},
};

fn main() {
    let mut handles = Vec::new();
    for i in 0..10 {
        let handle = thread::spawn(move || {
            let start = Instant::now();
            thread::sleep(Duration::from_millis(250));
            println!("Thread {i} done");
            start.elapsed().as_millis()
        });
        handles.push(handle);
    }

    let mut results = Vec::new();
    for handle in handles {
        // Collect the result of each thread into the `results` vector
        // using the `JoinHandle` returned by `thread::spawn`.
        results.push(handle.join().unwrap());
    }

    if results.len() != 10 {
        panic!("Oh no! Some thread isn't done yet!");
    }

    println!();
    for (i, result) in results.into_iter().enumerate() {
        println!("Thread {i} took {result}ms");
    }
}
  • In this exercise we just need to wait for each spawned thread to finish, get its result, and push it into the results vector.

  • We can wait for a thread to finish using the join method (an iterator-based variation is sketched after this list).

    pub fn join(self) -> Result<T>: Waits for the associated thread to finish. This function will return immediately if the associated thread has already finished.

    results.push(handle.join().unwrap());
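  • As a variation (my own sketch, not part of the exercise), the same join-and-collect step can be written with iterator adapters instead of an explicit loop:

    use std::{
        thread,
        time::{Duration, Instant},
    };

    fn main() {
        // Spawn a few timed threads, similar to the exercise above.
        let handles: Vec<_> = (0..3)
            .map(|_| {
                thread::spawn(|| {
                    let start = Instant::now();
                    thread::sleep(Duration::from_millis(250));
                    start.elapsed().as_millis()
                })
            })
            .collect();

        // join() each handle; map + collect replaces the explicit for loop.
        let results: Vec<u128> = handles
            .into_iter()
            .map(|handle| handle.join().unwrap())
            .collect();

        println!("{results:?}");
    }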

threads2.rs

// Building on the last exercise, we want all of the threads to complete their
// work. But this time, the spawned threads need to be in charge of updating a
// shared value: `JobStatus.jobs_done`

use std::{
    sync::{Arc, Mutex},
    thread,
    time::Duration,
};

struct JobStatus {
    jobs_done: u32,
}

fn main() {
    // Use Arc and Mutex
    let status = Arc::new(Mutex::new(JobStatus { jobs_done: 0 }));

    let mut handles = Vec::new();
    for _ in 0..10 {
        let status_shared = Arc::clone(&status);
        let handle = thread::spawn(move || {
            thread::sleep(Duration::from_millis(250));

            // Lock and update jobs_done
            status_shared.lock().unwrap().jobs_done += 1
        });
        handles.push(handle);
    }

    // Waiting for all jobs to complete.
    for handle in handles {
        handle.join().unwrap();
    }

    // Print the value of `JobStatus.jobs_done`.
    println!("Jobs done: {}", status.lock().unwrap().jobs_done);
}
  • In this exercise, using Arc alone doesn't work because we also need mutability.

  • So we also need Mutex, a mutual exclusion primitive useful for protecting shared data (a standalone sketch of the locking pattern follows after this list).

    Mutex will block threads waiting for the lock to become available. The mutex can be created via a new constructor. Each mutex has a type parameter which represents the data that it is protecting. The data can only be accessed through the RAII guards returned from lock and try_lock, which guarantees that the data is only ever accessed when the mutex is locked.

  • First we wrap the shared state in Arc<Mutex<...>> like this:

    let status = Arc::new(Mutex::new(JobStatus { jobs_done: 0 }));
  • Then inside the spawned thread we acquire the lock and update jobs_done like this:

    status_shared.lock().unwrap().jobs_done += 1
  • To read the value for printing, we lock the mutex in the same way:

    println!("Jobs done: {}", status.lock().unwrap().jobs_done);

threads3.rs

use std::{sync::mpsc, thread, time::Duration};

struct Queue {
    first_half: Vec<u32>,
    second_half: Vec<u32>,
}

impl Queue {
    fn new() -> Self {
        Self {
            first_half: vec![1, 2, 3, 4, 5],
            second_half: vec![6, 7, 8, 9, 10],
        }
    }
}

fn send_tx(q: Queue, tx: mpsc::Sender<u32>) {
    // Clone the sender
    let tx1 = tx.clone();
    thread::spawn(move || {
        for val in q.first_half {
            println!("Sending {val:?}");
            tx1.send(val).unwrap();
            thread::sleep(Duration::from_millis(250));
        }
    });

    thread::spawn(move || {
        for val in q.second_half {
            println!("Sending {val:?}");
            tx.send(val).unwrap();
            thread::sleep(Duration::from_millis(250));
        }
    });
}

fn main() {
    // You can optionally experiment here.
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn threads3() {
        let (tx, rx) = mpsc::channel();
        let queue = Queue::new();

        send_tx(queue, tx);

        let mut received = Vec::with_capacity(10);
        for value in rx {
            received.push(value);
        }

        received.sort();
        assert_eq!(received, [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
    }
}
  • In this exercise we learn how to use channels.

  • Because we use two threads to send data, we need to clone the Sender and use the clone in the first thread (a sketch showing why the receiving loop terminates follows below).

    The Sender can be cloned to send to the same channel multiple times, but only one Receiver is supported.

    let tx1 = tx.clone();
    thread::spawn(move || {
        for val in q.first_half {
            println!("Sending {val:?}");
            tx1.send(val).unwrap();
            thread::sleep(Duration::from_millis(250));
        }
    });
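  • One detail worth making explicit (my own sketch, not part of the exercise): iterating over the Receiver only ends once every Sender, the original and all clones, has been dropped. In the exercise this happens when both spawned threads finish, which is why the test's for loop over rx terminates.

    use std::{sync::mpsc, thread};

    fn main() {
        let (tx, rx) = mpsc::channel();
        let tx2 = tx.clone();

        // Each thread takes ownership of one sender and drops it when it ends.
        thread::spawn(move || tx.send(1).unwrap());
        thread::spawn(move || tx2.send(2).unwrap());

        // This loop ends only after both senders have been dropped,
        // which closes the channel.
        for value in rx {
            println!("got {value}");
        }
    }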