TensorFlow實現時間序列預測

常常會碰到各種各樣時間序列預測問題,如商場人流量的預測、商品價格的預測、股價的預測,等等。TensorFlow新引入了一個TensorFlow Time Series庫(以下簡稱為TFTS),它可以幫助在TensorFlow中快速搭建高性能的時間序列預測系統,並提供包括AR、LSTM在內的多個模型。

時間序列問題

一般而言,時間序列數據抽象為兩部分:觀察的時間點和觀察的值 (以商品價格為例,某年一月的價格為120元,二月的價格為130元,三月的價格為135元,四月的價格為132元。那麼觀察的時間點可以看作是1,2,3,4,而在各時間點上觀察到的數據的值為120,130,135,132) 。觀察的時間點可以不連續,比如二月的數據有缺失,那麼實際的觀察時間點為1,3,4,對應的數據為120,135,132。 所謂時間序列預測,是指預測某些未來的時間點上(如5,6)數據的值應該是多少 。

TFTS庫按照時間點+觀察值的方式對時間序列問題進行抽象包裝。觀察的 時間點用“times”表示 ,對應的 值用“values”表示 。在訓練模型時,輸入數據需要同時具有times和values兩個字段;在預測時,需要給定一些初始的數值,以及需要預測的時間點times。

讀入時間序列數據

在訓練模型之前,需要將時間序列數據讀入成Tensor的形式。 TFTS 庫中提供了兩個方便的讀取器

  • NumpyReader :用於從Numpy數組中讀入數據
  • CSVReader :用於從CSV文件中讀入數據

從Numpy數組中讀取時間序列

導入需要的包及函數

<code>

import

numpy

as

np

import

matplotlib matplotlib.use(

'agg'

)

import

matplotlib.pyplot

as

plt

import

tensorflow

as

tf

from

tensorflow.contrib.timeseries.python.timeseries

import

NumpyReader /<code>

接著,利用np.sin生成一個實驗用的時間序列數據。該時間序列數據實際上是在正弦曲線上加入了上升的趨勢和一些隨機的噪聲:

<code>x = np.array(range(

1000

)) noise = np.

random

.uniform(

-0.2

,

0.2

,

1000

) y = np.

sin

(np.

pi

* x /

100

) + x /

200.

+ noise plt.plot(x, y) plt.savefig(

'timeseries_y.jpg'

) /<code>
TensorFlow實現時間序列預測

橫座標對應變量“x”,縱座標對應變量“y”,它們分別對應之前提到過的“觀察的時間點”和“觀察到的值”。TFTS讀入x和y的方式非常簡單,請看下面的代碼:

<code>

data

= { tf.contrib.timeseries.TrainEvalFeatures.TIMES: x, tf.contrib.timeseries.TrainEvalFeatures.VALUES: y, } reader = NumpyReader(

data

) /<code>

首先把x和y變成Python中的字典(變量data)。上面的定義直接寫成“ data={‘times':x, ‘values':y} ”也是可以的。寫成比較複雜的形式是為了和源碼中的寫法保持一致。

得到的reader有一個read_full()方法,它的返回值是時間序列對應的Tensor,可以用下面的代碼進行試驗:

<code>

with

tf.Session() as sess:

full_data

=

reader.read_full()

coord

=

tf.train.Coordinator()

threads

=

tf.train.start_queue_runners(sess=sess, coord=coord)

print(sess.run(full_data))

coord.request_stop()

/<code>

在訓練時,通常不會使用整個數據集進行訓練,而是採用batch的形式。從reader出發,建立batch數據的方法也很簡單:

<code>train_input_fn = tf.contrib.timeseries.RandomWindowInputFn(
    reader, batch_size=2, window_size=10)
/<code>


tf.contrib.timeseries.RandomWindowInputFn會在reader的所有數據中,隨機選取窗口長度為window_size的序列,幷包裝成batch_size大小的batch數據。換句話說,一個batch內共有batch_size個序列,每個序列的長度為window_size。

以batch_size=2, window_size=10為例,可以打印出一個batch的數據:

<code>

with

tf.Session() as sess:

batch_data

=

train_input_fn.create_batch()

coord

=

tf.train.Coordinator()

threads

=

tf.train.start_queue_runners(sess=sess, coord=coord)

one_batch

=

sess.run(batch_data[0])

coord.request_stop()

:

', one_batch)

/<code>

原先的數據長度為1000的時間序列(x=np.array(range(1000))),使用
tf.contrib.timeseries.RandomWindowInputFn,並指定window_size=10, batch_size=2的功能是在這長度為1000的時間序列中,隨機選取長度為10的序列,並在每個batch裡包含兩個這樣的序列。這也可以從打印出的數據中看出來。

使用
tf.contrib.timeseries.RandomWindowInputFn返回的train_input_fn可以進行訓練了。這是在TFTS中讀入Numpy數組時間序列的基本方式。下面介紹如何讀入CSV格式的數據。

從CSV文件中讀取時間序列

有時,時間序列數據是存在CSV文件中的。當然可以將其先讀入為Numpy數組,再使用之前的方法處理。更方便的做法是使用
tf.contrib.timeseries.CSVReader讀入。數據文件 period_trend.csv

假設CSV文件的時間序列數據的形式為:

<code>1,

-0

.6656603714

2,

-0

.1164380359

3,0

.7398626488

4,0

.7368633029

5,0

.2289480898

... /<code>

CSV文件的第一列為時間點,第二列為該時間點上觀察到的值。將其讀入的方法為:

<code>

import

tensorflow

as

tf csv_file_name =

'./period_trend.csv'

reader = tf.contrib.timeseries.CSVReader(csv_file_name) /<code>

實際讀入的代碼只有一行,直接使用函數
tf.contrib.timeseries.CSVReader得到了reader。將reader中所有數據打印出來的方法和之前是一樣的:

<code>

with

tf.Session() as sess:

data

=

reader.read_full()

coord

=

tf.train.Coordinator()

threads

=

tf.train.start_queue_runners(sess=sess, coord=coord)

print(sess.run(data))

coord.request_stop()

/<code>

從reader出發,建立batch數據的train_input_fn的方法也完全相同:

<code>

train_input_fn

= tf.contrib.timeseries.RandomWindowInputFn(reader, batch_size=

4

, window_size=

16

) /<code>

最後,可以打印出兩個batch的數據進行測試:

<code>

with

tf.Session()

as

sess:

data

= train_input_fn.create_batch() coord = tf.train.Coordinator() threads = tf.train.start_queue_runners(sess=sess, coord=coord) batch1 = sess.run(

data

[

0

]) batch2 = sess.run(

data

[

0

]) coord.request_stop() print(

'batch1:'

, batch1) print(

'batch2:'

, batch2) /<code>

以上是TFTS庫中數據的讀取方式。總的來說, 從Numpy數組或者CSV文件出發構造一個reader,再利用reader生成batch數據。最後得到的Tensor為train_input_fn,這個train_input_fn會被當作訓練時的輸入 。

使用AR模型預測時間序列

AR模型的訓練

自迴歸模型(Autoregressive model,簡稱為AR模型)是統計學上處理時間序列模型的基本方法之一。TFTS中已經實現了一個自迴歸模型,我們只需要對其進行調用即可使用。

我們先定義出一個train_input_fn

<code>x = np.array(range(1000))
noise = np.random.uniform(-0.2, 0.2, 1000)
y = np.sin(np.pi * x / 100) + x / 200. + noise
plt.plot(x, y)
plt.savefig('timeseries_y.jpg')

data = {
    tf.contrib.timeseries.TrainEvalFeatures.TIMES: x,
    tf.contrib.timeseries.TrainEvalFeatures.VALUES: y,
}

reader = NumpyReader(data)

train_input_fn = tf.contrib.timeseries.RandomWindowInputFn(
    reader, batch_size=16, window_size=40)
/<code>

使用的時間序列數據如圖所示。

TensorFlow實現時間序列預測

定義AR模型:

<code>

ar

=

tf.contrib.timeseries.ARRegressor(

periodicities

=

200, input_window_size=30, output_window_size=10,

num_features

=

1,

loss

=

tf.contrib.timeseries.ARModel.NORMAL_LIKELIHOOD_LOSS)

/<code>

參數:

  • periodicities :序列的規律性週期。在定義數據時使用的語句是“y=np.sin(np.pi * x /100)+x /200.+noise”,因此週期為200
  • input_window_size :模型每次輸入的值
  • output_window_size :模型每次輸出的值
  • num_features :表示在一個時間點上觀察到的數的維度。這裡每一步都是一個單獨的值,所以num_features=1
  • loss :指定採取哪一種損失,NORMAL_LIKELIHOOD_LOSS 或 SQUARED_LOSS
  • model_dir :模型訓練好後保存的地址,如果不指定的話,會隨機分配一個臨時地址

input_window_size和output_window_size加起來必須等於train_input_fn中總的window_size。總的window_size為40, input_window_size為30,output_window_size為10;也是說,一個batch內每個序列的長度為40,其中前30個數被當作模型的輸入值,後面10個數為這些輸入對應的目標輸出值。

使用變量ar的train方法可以直接進行訓練:

<code>

ar

.train

(input_fn=train_input_fn, steps=

6000

) /<code>

AR模型的驗證和預測

TFTS中驗證(evaluation)的含義是:使用訓練好的模型在原先的訓練集上進行計算,由此可以觀察到模型的擬合效果,對應的程序段是:

<code>evaluation_input_fn = tf.contrib.timeseries.WholeDatasetInputFn(reader)

keys of evaluation: [

'covariance'

,

'loss'

,

'mean'

,

'observed'

,

'start_tuple'

,

'times'

,

'global_step'

]

evaluation = ar.evaluate(input_fn=evaluation_input_fn, steps=1) /<code>

如果想要明白這裡的邏輯,首先要理解之前定義的AR模型:它每次都接收一個長度為30的輸入觀測序列,並輸出長度為10的預測序列。整個訓練集是一個長度為1000的序列,前30個數首先被當作“初始觀測序列”輸入到模型中,由此可以計算出下面10步的預測值。接著又會取30個數進行預測, 這30個數中有10個數是前一步的預測值 ,新得到的預測值又會變成下一步的輸入,依此類推。

最終得到970個預測值(970=1000-30,因為前30個數是沒辦法進行預測的)。970個預測值被記錄在 evaluation[‘mean'] 中。evaluation還有其他幾個鍵值,如 evaluation[‘times'] 表示evaluation[‘mean']對應的時間點, evaluation[‘loss'] 表示總的損失等等。

evaluation[‘start_tuple']會被用於之後的預測中,它相當於最後30步的輸出值和對應的時間點。以此為起點,可以對1000步以後的值進行預測,對應的代碼為:

<code>  = 

tuple(ar.predict(

input_fn

=

tf.contrib.timeseries.predict_continuation_input_fn(

steps=250)))

/<code>

這裡的代碼在1000步之後又向後預測了250個時間點。對應的值保存在predictions[‘mean']中。可以把觀測到的值、模型擬合的值、預測值用下面的代碼畫出來:

<code>

plt

.figure

(figsize=(

15

,

5

))

plt

.plot

(data[

'times'

].reshape(-

1

), data[

'values'

].reshape(-

1

), label=

'origin'

)

plt

.plot

(evaluation[

'times'

].reshape(-

1

), evaluation[

'mean'

].reshape(-

1

), label=

'evaluation'

)

plt

.plot

(predictions[

'times'

].reshape(-

1

), predictions[

'mean'

].reshape(-

1

), label=

'prediction'

)

plt

.xlabel

(

'time_step'

)

plt

.ylabel

(

'values'

)

plt

.legend

(loc=

4

)

plt

.savefig

(

'predict_result.jpg'

) /<code>
TensorFlow實現時間序列預測

前1000步模型原始觀測值的曲線和模型擬合值非常接近,說明模型擬合得已經比較好了,1000步之後的預測也合情合理。

<code># coding: utf-

8

from __future__

import

print_function

import

numpy

as

np

import

matplotlib matplotlib.use(

'agg'

)

import

matplotlib.pyplot

as

plt

import

tensorflow

as

tf from tensorflow.contrib.timeseries.python.timeseries

import

NumpyReader def main(_): x = np.array(range(

1000

)) noise = np.random.uniform(-

0.2

,

0.2

,

1000

) y = np.sin(np.pi * x /

100

) + x /

200

. + noise plt.plot(x, y) plt.savefig(

'timeseries_y.jpg'

)

data

= { tf.contrib.timeseries.TrainEvalFeatures.TIMES: x, tf.contrib.timeseries.TrainEvalFeatures.VALUES: y, } reader = NumpyReader(

data

) train_input_fn = tf.contrib.timeseries.RandomWindowInputFn( reader, batch_size=

16

, window_size=

40

) ar = tf.contrib.timeseries.ARRegressor( periodicities=

200

, input_window_size=

30

, output_window_size=

10

, num_features=

1

, loss=tf.contrib.timeseries.ARModel.NORMAL_LIKELIHOOD_LOSS) ar.train(input_fn=train_input_fn, steps=

6000

) evaluation_input_fn = tf.contrib.timeseries.WholeDatasetInputFn(reader) # keys of evaluation: [

'covariance'

,

'loss'

,

'mean'

,

'observed'

,

'start_tuple'

,

'times'

,

'global_step'

] evaluation = ar.evaluate(input_fn=evaluation_input_fn, steps=

1

) (predictions,) = tuple(ar.predict( input_fn=tf.contrib.timeseries.predict_continuation_input_fn( evaluation, steps=

250

))) plt.figure(figsize=(

15

,

5

)) plt.plot(

data

[

'times'

].reshape(-

1

),

data

[

'values'

].reshape(-

1

), label=

'origin'

) plt.plot(evaluation[

'times'

].reshape(-

1

), evaluation[

'mean'

].reshape(-

1

), label=

'evaluation'

) plt.plot(predictions[

'times'

].reshape(-

1

), predictions[

'mean'

].reshape(-

1

), label=

'prediction'

) plt.xlabel(

'time_step'

) plt.ylabel(

'values'

) plt.legend(loc=

4

) plt.savefig(

'predict_result.jpg'

)

if

__name__ ==

'__main__'

: tf.logging.set_verbosity(tf.logging.INFO) tf.app.run() 示例完整代碼/<code>

使用LSTM模型預測時間序列

為了使用LSTM模型,需要先使用TFTS庫對其進行定義。

單變量時間序列預測

同樣,用函數加噪聲的方法模擬生成時間序列數據:

<code>x = np.array(range(

1000

)) noise = np.

random

.uniform(

-0.2

,

0.2

,

1000

) y = np.

sin

(np.

pi

* x /

50

) + np.

cos

(np.

pi

* x /

50

) + np.

sin

(np.

pi

* x /

25

) + noise data = { tf.contrib.timeseries.TrainEvalFeatures.TIMES: x, tf.contrib.timeseries.TrainEvalFeatures.VALUES: y, } reader = NumpyReader(data) train_input_fn = tf.contrib.timeseries.RandomWindowInputFn( reader, batch_size=

4

, window_size=

100

) /<code>

得到y和x後,使用NumpyReader讀入為Tensor形式,接著用
tf.contrib.timeseries.RandomWindowInputFn將其變為batch訓練數據。一個batch中有4個隨機選取的序列,每個序列的長度為100。

接下來定義一個LSTM模型:

<code>

estimator

=

ts_estimators.TimeSeriesRegressor(

model

=

_LSTMModel(num_features=1, num_units=128),

optimizer

=

tf.train.AdamOptimizer(0.001))

/<code>

num_features=1表示單變量時間序列,即每個時間點上觀察到的量只是一個單獨的數值,num_units=128表示使用隱層為128大小的LSTM模型。

訓練、驗證和預測的方法都和之前類似。在訓練時,在已有的1000步的觀察量的基礎上向後預測200步:

<code>estimator.train(input_fn=train_input_fn, steps=2000)     
evaluation_input_fn = tf.contrib.timeseries.WholeDatasetInputFn(reader)   
evaluation = estimator.evaluate(input_fn=evaluation_input_fn, steps=1)   
 
(predictions,) = tuple(estimator.predict(
    input_fn=tf.contrib.timeseries.predict_continuation_input_fn(
        evaluation, steps=200)))
/<code>

將驗證、預測的結果取出並畫成示意圖,畫出的圖像會保存成“predict_result.jpg”文件:

<code>observed_times = evaluation[

"times"

][

0

] observed = evaluation[

"observed"

][

0, :, :

] evaluated_times = evaluation[

"times"

][

0

] evaluated = evaluation[

"mean"

][

0

] predicted_times = predictions['times'] predicted = predictions["mean"] plt.figure(figsize=(15, 5)) plt.axvline(999, linestyle="dotted", linewidth=4, color='r') observed

_lines = plt.plot(observed_

times, observed, label="observation", color="k") evaluated

_lines = plt.plot(evaluated_

times, evaluated, label="evaluation", color="g") predicted

_lines = plt.plot(predicted_

times, predicted, label="prediction", color="r") plt.legend(handles=[observed

_lines[0], evaluated_

lines[0], predicted_lines[0]],

loc="upper left")

plt.savefig('predict_result.jpg') /<code>

預測效果如圖15-4所示,橫座標為時間軸,前1000步是訓練數據,1000~1200步是模型預測的值。

TensorFlow實現時間序列預測

<code>

import

numpy

as

np

import

tensorflow

as

tf

from

tensorflow.contrib.timeseries.python.timeseries

import

estimators

as

ts_estimators

from

tensorflow.contrib.timeseries.python.timeseries

import

model

as

ts_model

from

tensorflow.contrib.timeseries.python.timeseries

import

NumpyReader

import

matplotlib matplotlib.use(

"agg"

)

import

matplotlib.pyplot

as

plt

class

_LSTMModel

(ts_model.SequentialTimeSeriesModel)

:

"""A time series model-building example using an RNNCell."""

def

__init__

(self, num_units, num_features, dtype=tf.float32)

:

"""Initialize/configure the model object. Note that we do not start graph building here. Rather, this object is a configurable factory for TensorFlow graphs which are run by an Estimator. Args: num_units: The number of units in the model's LSTMCell. num_features: The dimensionality of the time series (features per timestep). dtype: The floating point data type to use. """

super(_LSTMModel, self).__init__( train_output_names=[

"mean"

], predict_output_names=[

"mean"

], num_features=num_features, dtype=dtype) self._num_units = num_units self._lstm_cell =

None

self._lstm_cell_run =

None

self._predict_from_lstm_output =

None

def

initialize_graph

(self, input_statistics)

:

"""Save templates for components, which can then be used repeatedly. This method is called every time a new graph is created. It's safe to start adding ops to the current default graph here, but the graph should be constructed from scratch. Args: input_statistics: A math_utils.InputStatistics object. """

super(_LSTMModel, self).initialize_graph(input_statistics=input_statistics) self._lstm_cell = tf.nn.rnn_cell.LSTMCell(num_units=self._num_units) self._lstm_cell_run = tf.make_template( name_=

"lstm_cell"

, func_=self._lstm_cell, create_scope_now_=

True

) self._predict_from_lstm_output = tf.make_template( name_=

"predict_from_lstm_output"

, func_=

lambda

inputs: tf.layers.dense(inputs=inputs, units=self.num_features), create_scope_now_=

True

)

def

get_start_state

(self)

:

"""Return initial state for the time series model."""

return

( tf.zeros([], dtype=tf.int64), tf.zeros([self.num_features], dtype=self.dtype), [tf.squeeze(state_element, axis=

0

)

for

state_element

in

self._lstm_cell.zero_state(batch_size=

1

, dtype=self.dtype)])

def

_transform

(self, data)

:

"""Normalize data based on input statistics to encourage stable training."""

mean, variance = self._input_statistics.overall_feature_moments

return

(data - mean) / variance

def

_de_transform

(self, data)

:

"""Transform data back to the input scale."""

mean, variance = self._input_statistics.overall_feature_moments

return

data * variance + mean

def

_filtering_step

(self, current_times, current_values, state, predictions)

:

"""Update model state based on observations. Note that we don't do much here aside from computing a loss. In this case it's easier to update the RNN state in _prediction_step, since that covers running the RNN both on observations (from this method) and our own predictions. This distinction can be important for probabilistic models, where repeatedly predicting without filtering should lead to low-confidence predictions. Args: current_times: A [batch size] integer Tensor. current_values: A [batch size, self.num_features] floating point Tensor with new observations. state: The model's state tuple. predictions: The output of the previous `_prediction_step`. Returns: A tuple of new state and a predictions dictionary updated to include a loss (note that we could also return other measures of goodness of fit, although only "loss" will be optimized). """

state_from_time, prediction, lstm_state = state

with

tf.control_dependencies( [tf.assert_equal(current_times, state_from_time)]): transformed_values = self._transform(current_values) predictions[

"loss"

] = tf.reduce_mean( (prediction - transformed_values) **

2

, axis=

-1

) new_state_tuple = (current_times, transformed_values, lstm_state)

return

(new_state_tuple, predictions)

def

_prediction_step

(self, current_times, state)

:

"""Advance the RNN state using a previous observation or prediction."""

_, previous_observation_or_prediction, lstm_state = state lstm_output, new_lstm_state = self._lstm_cell_run( inputs=previous_observation_or_prediction, state=lstm_state) next_prediction = self._predict_from_lstm_output(lstm_output) new_state_tuple = (current_times, next_prediction, new_lstm_state)

return

new_state_tuple, {

"mean"

: self._de_transform(next_prediction)}

def

_imputation_step

(self, current_times, state)

:

"""Advance model state across a gap."""

return

state

def

_exogenous_input_step

( self, current_times, current_exogenous_regressors, state)

:

"""Update model state based on exogenous regressors."""

raise

NotImplementedError(

"Exogenous inputs are not implemented for this example."

)

if

__name__ ==

'__main__'

: tf.logging.set_verbosity(tf.logging.INFO) x = np.array(range(

1000

)) noise = np.random.uniform(

-0.2

,

0.2

,

1000

) y = np.sin(np.pi * x /

50

) + np.cos(np.pi * x /

50

) + np.sin(np.pi * x /

25

) + noise data = { tf.contrib.timeseries.TrainEvalFeatures.TIMES: x, tf.contrib.timeseries.TrainEvalFeatures.VALUES: y, } reader = NumpyReader(data) train_input_fn = tf.contrib.timeseries.RandomWindowInputFn( reader, batch_size=

4

, window_size=

100

) estimator = ts_estimators.TimeSeriesRegressor( model=_LSTMModel(num_features=

1

, num_units=

128

), optimizer=tf.train.AdamOptimizer(

0.001

)) estimator.train(input_fn=train_input_fn, steps=

2000

) evaluation_input_fn = tf.contrib.timeseries.WholeDatasetInputFn(reader) evaluation = estimator.evaluate(input_fn=evaluation_input_fn, steps=

1

) (predictions,) = tuple(estimator.predict( input_fn=tf.contrib.timeseries.predict_continuation_input_fn( evaluation, steps=

200

))) observed_times = evaluation[

"times"

][

0

] observed = evaluation[

"observed"

][

0

, :, :] evaluated_times = evaluation[

"times"

][

0

] evaluated = evaluation[

"mean"

][

0

] predicted_times = predictions[

'times'

] predicted = predictions[

"mean"

] plt.figure(figsize=(

15

,

5

)) plt.axvline(

999

, linestyle=

"dotted"

, linewidth=

4

, color=

'r'

) observed_lines = plt.plot(observed_times, observed, label=

"observation"

, color=

"k"

) evaluated_lines = plt.plot(evaluated_times, evaluated, label=

"evaluation"

, color=

"g"

) predicted_lines = plt.plot(predicted_times, predicted, label=

"prediction"

, color=

"r"

) plt.legend(handles=[observed_lines[

0

], evaluated_lines[

0

], predicted_lines[

0

]], loc=

"upper left"

) plt.savefig(

'predict_result.jpg'

) LSTM單變量完整代碼/<code>

多變量時間序列預測

所謂多變量時間序列,是指在每個時間點上的觀測量有多個值。在 multivariate_periods.csv 文件中,保存了一個多變量時間序列的數據:

<code>

0

0.926906299771

1.99107237682

2.56546245685

3.07914768197

4.04839057867

1

0.108010001864

1.41645361423

2.1686839775

2.94963962176

4.1263503303

2

-0.800567600028

1.0172132907

1.96434754116

2.99885333086

4.04300485864

3

0.0607042871898

0.719540073421

1.9765012584

2.89265588817

4.0951014426

...

99

0.987764008058

1.85581989607

2.84685706149

2.94760204892

6.0212151724

/<code>

這個CSV文件的第一列是觀察時間點,除此之外,每一行還有5個數,表示在這個時間點上觀察到的數據。換句話說,時間序列上每一步都是一個5維的向量。

使用TFTS讀入該CSV文件的方法為:

<code>csv_file_name = path.join(

"./data/multivariate_periods.csv"

) reader = tf.contrib.timeseries.CSVReader( csv_file_name, column_names=((tf.contrib.timeseries.TrainEvalFeatures.TIMES,) + (tf.contrib.timeseries.TrainEvalFeatures.VALUES,) * 5)) train_input_fn = tf.contrib.timeseries.RandomWindowInputFn( reader, batch_size=4, window_size=32) /<code>

與之前的讀入相比,唯一的區別是column_names參數。它告訴TFTS在CSV文件中,哪些列表示時間,哪些列表示觀測量。

接下來定義LSTM模型:

<code>

estimator

=

ts_estimators.TimeSeriesRegressor(

model

=

_LSTMModel(num_features=5, num_units=128),

optimizer

=

tf.train.AdamOptimizer(0.001))

/<code>

區別在於使用num_features=5而不是1,原因在於每個時間點上的觀測量是一個5維向量。

訓練、驗證、預測及畫圖的代碼與之前比較類似,最後的運行結果圖所示

TensorFlow實現時間序列預測

使用LSTM預測多變量時間序列

前100步是訓練數據,一條線代表觀測量在一個維度上的取值。100步之後為預測值。

<code>

from

os

import

path

import

tensorflow

as

tf

from

tensorflow.contrib.timeseries.python.timeseries

import

estimators

as

ts_estimators

from

tensorflow.contrib.timeseries.python.timeseries

import

model

as

ts_model

import

matplotlib matplotlib.use(

"agg"

)

import

matplotlib.pyplot

as

plt

class

_LSTMModel

(ts_model.SequentialTimeSeriesModel)

:

"""A time series model-building example using an RNNCell."""

def

__init__

(self, num_units, num_features, dtype=tf.float32)

:

"""Initialize/configure the model object. Note that we do not start graph building here. Rather, this object is a configurable factory for TensorFlow graphs which are run by an Estimator. Args: num_units: The number of units in the model's LSTMCell. num_features: The dimensionality of the time series (features per timestep). dtype: The floating point data type to use. """

super(_LSTMModel, self).__init__( train_output_names=[

"mean"

], predict_output_names=[

"mean"

], num_features=num_features, dtype=dtype) self._num_units = num_units self._lstm_cell =

None

self._lstm_cell_run =

None

self._predict_from_lstm_output =

None

def

initialize_graph

(self, input_statistics)

:

"""Save templates for components, which can then be used repeatedly. This method is called every time a new graph is created. It's safe to start adding ops to the current default graph here, but the graph should be constructed from scratch. Args: input_statistics: A math_utils.InputStatistics object. """

super(_LSTMModel, self).initialize_graph(input_statistics=input_statistics) self._lstm_cell = tf.nn.rnn_cell.LSTMCell(num_units=self._num_units) self._lstm_cell_run = tf.make_template( name_=

"lstm_cell"

, func_=self._lstm_cell, create_scope_now_=

True

) self._predict_from_lstm_output = tf.make_template( name_=

"predict_from_lstm_output"

, func_=

lambda

inputs: tf.layers.dense(inputs=inputs, units=self.num_features), create_scope_now_=

True

)

def

get_start_state

(self)

:

"""Return initial state for the time series model."""

return

( tf.zeros([], dtype=tf.int64), tf.zeros([self.num_features], dtype=self.dtype), [tf.squeeze(state_element, axis=

0

)

for

state_element

in

self._lstm_cell.zero_state(batch_size=

1

, dtype=self.dtype)])

def

_transform

(self, data)

:

"""Normalize data based on input statistics to encourage stable training."""

mean, variance = self._input_statistics.overall_feature_moments

return

(data - mean) / variance

def

_de_transform

(self, data)

:

"""Transform data back to the input scale."""

mean, variance = self._input_statistics.overall_feature_moments

return

data * variance + mean

def

_filtering_step

(self, current_times, current_values, state, predictions)

:

"""Update model state based on observations. Note that we don't do much here aside from computing a loss. In this case it's easier to update the RNN state in _prediction_step, since that covers running the RNN both on observations (from this method) and our own predictions. This distinction can be important for probabilistic models, where repeatedly predicting without filtering should lead to low-confidence predictions. Args: current_times: A [batch size] integer Tensor. current_values: A [batch size, self.num_features] floating point Tensor with new observations. state: The model's state tuple. predictions: The output of the previous `_prediction_step`. Returns: A tuple of new state and a predictions dictionary updated to include a loss (note that we could also return other measures of goodness of fit, although only "loss" will be optimized). """

state_from_time, prediction, lstm_state = state

with

tf.control_dependencies( [tf.assert_equal(current_times, state_from_time)]): transformed_values = self._transform(current_values) predictions[

"loss"

] = tf.reduce_mean( (prediction - transformed_values) **

2

, axis=

-1

) new_state_tuple = (current_times, transformed_values, lstm_state)

return

(new_state_tuple, predictions)

def

_prediction_step

(self, current_times, state)

:

"""Advance the RNN state using a previous observation or prediction."""

_, previous_observation_or_prediction, lstm_state = state lstm_output, new_lstm_state = self._lstm_cell_run( inputs=previous_observation_or_prediction, state=lstm_state) next_prediction = self._predict_from_lstm_output(lstm_output) new_state_tuple = (current_times, next_prediction, new_lstm_state)

return

new_state_tuple, {

"mean"

: self._de_transform(next_prediction)}

def

_imputation_step

(self, current_times, state)

:

"""Advance model state across a gap."""

return

state

def

_exogenous_input_step

( self, current_times, current_exogenous_regressors, state)

:

"""Update model state based on exogenous regressors."""

raise

NotImplementedError(

"Exogenous inputs are not implemented for this example."

)

if

__name__ ==

'__main__'

: tf.logging.set_verbosity(tf.logging.INFO) csv_file_name = path.join(

"./data/multivariate_periods.csv"

) reader = tf.contrib.timeseries.CSVReader( csv_file_name, column_names=((tf.contrib.timeseries.TrainEvalFeatures.TIMES,) + (tf.contrib.timeseries.TrainEvalFeatures.VALUES,) *

5

)) train_input_fn = tf.contrib.timeseries.RandomWindowInputFn( reader, batch_size=

4

, window_size=

32

) estimator = ts_estimators.TimeSeriesRegressor( model=_LSTMModel(num_features=

5

, num_units=

128

), optimizer=tf.train.AdamOptimizer(

0.001

)) estimator.train(input_fn=train_input_fn, steps=

200

) evaluation_input_fn = tf.contrib.timeseries.WholeDatasetInputFn(reader) evaluation = estimator.evaluate(input_fn=evaluation_input_fn, steps=

1

) (predictions,) = tuple(estimator.predict( input_fn=tf.contrib.timeseries.predict_continuation_input_fn( evaluation, steps=

100

))) observed_times = evaluation[

"times"

][

0

] observed = evaluation[

"observed"

][

0

, :, :] evaluated_times = evaluation[

"times"

][

0

] evaluated = evaluation[

"mean"

][

0

] predicted_times = predictions[

'times'

] predicted = predictions[

"mean"

] plt.figure(figsize=(

15

,

5

)) plt.axvline(

99

, linestyle=

"dotted"

, linewidth=

4

, color=

'r'

) observed_lines = plt.plot(observed_times, observed, label=

"observation"

, color=

"k"

) evaluated_lines = plt.plot(evaluated_times, evaluated, label=

"evaluation"

, color=

"g"

) predicted_lines = plt.plot(predicted_times, predicted, label=

"prediction"

, color=

"r"

) plt.legend(handles=[observed_lines[

0

], evaluated_lines[

0

], predicted_lines[

0

]], loc=

"upper left"

) plt.savefig(

'predict_result.jpg'

) LSTM多變量完整代碼/<code>


分享到:


相關文章: