用Rust編寫web server,實現線程池的清除

在之前用線程池實現的web server中,每個工作線程中通過loop進行循環,從channel的接收端等待任務,然後執行。但是代碼中沒有提供一種機制,來通知這些工作線程結束。本節就是在之前的基礎上,來實現線程池對象的正確清除。

通過為ThreadPool實現Drop trait來實現線程池對象清除

修改Worker如下:

<code>struct Worker {
id: usize,
thread: Option<:joinhandle>>,
}/<code>
<code>impl Worker {
fn new(id: usize, receiver: Arc<mutex>>>) -> Worker {
// --snip--

Worker {
id,
thread: Some(thread),
}
}
}/<mutex>/<code>

為ThreadPool實現Drop:

<code>impl Drop for ThreadPool {
fn drop(&mut self) {
for worker in &mut self.workers {
println!("Shutting down worker {}", worker.id);

if let Some(thread) = worker.thread.take() {//知識點:Option的take方法
thread.join().unwrap();
}
}
}
}/<code>

通知worker線程結束

修改發送內容的格式:

<code>enum Message {
NewJob(Job),
Terminate,
}/<code>

修改ThreadPool:

<code>pub struct ThreadPool {
workers: Vec<worker>,
sender: mpsc::Sender<message>,
}

// --snip--

impl ThreadPool {
// --snip--

pub fn execute(&self, f: F)
where
F: FnOnce() + Send + 'static
{
let job = Box::new(f);

self.sender.send(Message::NewJob(job)).unwrap();
}
}
/<message>/<worker>/<code>

修改Worker:

<code>impl Worker {
fn new(id: usize, receiver: Arc<mutex>>>) ->
Worker {

let thread = thread::spawn(move ||{
loop {
let message = receiver.lock().unwrap().recv().unwrap();

match message {
Message::NewJob(job) => {
println!("Worker {} got a job; executing.", id);

job();
},
Message::Terminate => {
println!("Worker {} was told to terminate.", id);

break;
},
}
}
});

Worker {

id,
thread: Some(thread),
}
}
}/<mutex>/<code>

再在ThreadPool的Drop實現中添加發送結束信息:

<code>impl Drop for ThreadPool {
fn drop(&mut self) {
println!("Sending terminate message to all workers.");

for _ in &mut self.workers {
self.sender.send(Message::Terminate).unwrap();
}

println!("Shutting down all workers.");

for worker in &mut self.workers {
println!("Shutting down worker {}", worker.id);

if let Some(thread) = worker.thread.take() {
thread.join().unwrap();
}
}
}
}/<code>

修改主函數:

<code>//src/main.rs
fn main() {
let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
let pool = ThreadPool::new(4);

for stream in listener.incoming().take(2) {//take 方法定義於 Iterator trait,這裡限制循環最多頭 2 次
let stream = stream.unwrap();

pool.execute(|| {
handle_connection(stream);
});
}

println!("Shutting down.");
}/<code>

總結

web server這個例子來自於The Rust這本教材,通過這個例子,我們能基本對如何用Rust寫一個簡單的線程池有一個瞭解,用到的知識點主要是channel、thread、Arc等知識點。

後續我們還將為大家寫一個簡單的區塊鏈的例子。

關注令狐一衝,關注區塊鏈和Rust。


分享到:


相關文章: