Stream 介紹
Stream和Future類似,但是Future對應的是一個item的狀態的變化,而Stream則是類似於iterator,在結束之前能夠得到多個值。或者我們可以簡單的理解為,Stream是由一系列的Future組成,我們可以從Stream讀取各個Future的結果,直到Stream結束。
Stream trait定義
定義如下:
<code>trait
Stream
{type
Item
;fn
poll_next
(self
: PinmutSelf
>, lw: &LocalWaker) -> Poll<Option
>; } /<code>
poll_next函數有三種可能的返回值,分別如下:
- Poll::Pending 說明下一個值還沒有就緒,仍然需要等待。
- Poll::Ready(Some(val)) 已經就緒,成功返回一個值,程序可以通過調用poll_next再獲取下一個值。
- Poll::Ready(None) 表示Stream已經結束,不應該再調用poll_next。
迭代
和同步的Iterator類似,Stream可以迭代處理其中的值,如使用map, filter, fold, try_map, try_filter, and try_fold等。但是Stream不支持使用for,而while let和 next/try_next則是允許的。 例子如下:
<code>async
fn
sum_with_next
(mut
stream: Pinmutdyn
Streami32>>) ->i32
{use
futures::stream::StreamExt;let
mut
sum =0
;while
let
Some
(item) = stream.next().await
{ sum += item; } sum }async
fn
sum_with_try_next
(mut
stream: Pinmutdyn
StreamResult<i32
, io::Error>>>, ) ->Result
<i32
, io::Error> {use
futures::stream::TryStreamExt;let
mut
sum =0
;while
let
Some
(item) = stream.try_next().await
? { sum += item; }Ok
(sum) } /<code>
併發
上面的使用的迭代處理,如果我們要併發的處理流,則應該使用for_each_concurrent和 try_for_each_concurrent,示例如下:
<code>async
fn
jump_around
(mut
stream: Pinmutdyn
StreamResult<u8
, io::Error>>>, ) ->Result
{use
futures::stream::TryStreamExt;const
MAX_CONCURRENT_JUMPERS:usize
=100
; stream.try_for_each_concurrent(MAX_CONCURRENT_JUMPERS, |num|async
move
{ jump_n_times(num).await
?; report_n_jumps(num).await
?;Ok
(()) }).await
?;Ok
(()) } /<code>
Rust異步編程