Rust 異步編程,Stream 介紹

Stream 介紹

Stream和Future類似,但是Future對應的是一個item的狀態的變化,而Stream則是類似於iterator,在結束之前能夠得到多個值。或者我們可以簡單的理解為,Stream是由一系列的Future組成,我們可以從Stream讀取各個Future的結果,直到Stream結束。

Stream trait定義

定義如下:

<code>

trait

Stream

{

type

Item

;

fn

poll_next

(

self

: Pinmut

Self

>, lw: &LocalWaker) -> Poll<

Option

>; } /<code>

poll_next函數有三種可能的返回值,分別如下:

  • Poll::Pending 說明下一個值還沒有就緒,仍然需要等待。
  • Poll::Ready(Some(val)) 已經就緒,成功返回一個值,程序可以通過調用poll_next再獲取下一個值。
  • Poll::Ready(None) 表示Stream已經結束,不應該再調用poll_next。

迭代

和同步的Iterator類似,Stream可以迭代處理其中的值,如使用map, filter, fold, try_map, try_filter, and try_fold等。但是Stream不支持使用for,而while let和 next/try_next則是允許的。 例子如下:

<code>

async

fn

sum_with_next

(

mut

stream: Pinmut

dyn

Streami32>>) ->

i32

{

use

futures::stream::StreamExt;

let

mut

sum =

0

;

while

let

Some

(item) = stream.next().

await

{ sum += item; } sum }

async

fn

sum_with_try_next

(

mut

stream: Pinmut

dyn

StreamResult<

i32

, io::Error>>>, ) ->

Result

<

i32

, io::Error> {

use

futures::stream::TryStreamExt;

let

mut

sum =

0

;

while

let

Some

(item) = stream.try_next().

await

? { sum += item; }

Ok

(sum) } /<code>

併發

上面的使用的迭代處理,如果我們要併發的處理流,則應該使用for_each_concurrent和 try_for_each_concurrent,示例如下:

<code>

async

fn

jump_around

(

mut

stream: Pinmut

dyn

StreamResult<

u8

, io::Error>>>, ) ->

Result

{

use

futures::stream::TryStreamExt;

const

MAX_CONCURRENT_JUMPERS:

usize

=

100

; stream.try_for_each_concurrent(MAX_CONCURRENT_JUMPERS, |num|

async

move

{ jump_n_times(num).

await

?; report_n_jumps(num).

await

?;

Ok

(()) }).

await

?;

Ok

(()) } /<code>

Rust異步編程


分享到:


相關文章: