Simple RNN Time Series Prediction

This article shows how to do time-series prediction with a plain (vanilla) RNN model.

Say we have a segment of a sine curve like the one shown below. We feed in the red portion and train the network to output the values of the next segment.

[Figure: a sine curve; the red segment marks the input window whose continuation the network learns to predict]

Let's first work out the shapes. Suppose we feed in 50 points at a time with a batch size of 1, and each point is a single value; the input shape is then [50, 1, 1], i.e. [seq_len, batch, feature]. Here we switch to the alternative layout with the batch dimension first, so the shape becomes [1, 50, 1]. You can read this as: 1 curve, 50 points in total, each point a single real number.
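PyTorch's `nn.RNN` with `batch_first=True` expects exactly this [batch, seq, feature] layout. As a quick sanity check (this snippet is my own illustration, not part of the original code), you can verify the shapes it produces:

```python
import torch
from torch import nn

rnn = nn.RNN(input_size=1, hidden_size=16, num_layers=1, batch_first=True)
x = torch.randn(1, 50, 1)  # 1 curve, 50 points, 1 real number per point
out, h = rnn(x)            # the initial hidden state defaults to zeros when omitted
print(out.shape)           # torch.Size([1, 50, 16]) -- [batch, seq, hidden_size]
print(h.shape)             # torch.Size([1, 1, 16])  -- [num_layers, batch, hidden_size]
```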

```python
from numpy.random import randint
import numpy as np
import torch
from torch import nn, optim
from matplotlib import pyplot as plt

num_time_steps = 50
input_size = 1
hidden_size = 16
output_size = 1
lr = 0.01

start = randint(3)  # a random integer in [0, 3)
time_steps = np.linspace(start, start + 10, num_time_steps)  # num_time_steps evenly spaced points
data = np.sin(time_steps)  # [50]
data = data.reshape(num_time_steps, -1)  # [50, 1]
x = torch.tensor(data[:-1]).float().view(1, num_time_steps - 1, 1)  # points 0~48
y = torch.tensor(data[1:]).float().view(1, num_time_steps - 1, 1)   # points 1~49
```

Geometrically, `start` is the x-coordinate of the left edge of the red box in the figure. We need a starting point from which to take the next 50 points, and if this starting point were the same on every iteration, the network would simply memorize it, so we randomize it each time.
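As a small illustration (using the same calls as the snippet above), here is how the random start shifts the sampling window:

```python
from numpy.random import randint
import numpy as np

start = randint(3)                               # an integer in [0, 3)
time_steps = np.linspace(start, start + 10, 50)  # 50 points spanning a window of length 10
print(time_steps[0], time_steps[-1])             # e.g. 2.0 12.0 when start happens to be 2
```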

`x` is the first 49 of the 50 data points. From each of these 49 points we predict the value one step ahead, obtaining ŷ, and then compare ŷ with y.
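In other words, x and y are the same series shifted by one step, so the target for each input is the very next sample. A minimal sketch of this alignment:

```python
import numpy as np

data = np.sin(np.linspace(0, 10, 50))
x_vals = data[:-1]  # inputs:  points 0~48
y_vals = data[1:]   # targets: points 1~49
# each input x_vals[i] should predict y_vals[i], i.e. data[i + 1]
print(x_vals[0], "->", y_vals[0])
```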

Next, we build the network architecture.

```python
class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.rnn = nn.RNN(
            input_size=input_size,
            hidden_size=hidden_size,
            num_layers=1,
            batch_first=True,
        )
        self.linear = nn.Linear(hidden_size, output_size)

    def forward(self, x, h0):
        out, h0 = self.rnn(x, h0)
        # [b, seq, h] => [seq, h]
        out = out.view(-1, hidden_size)
        out = self.linear(out)      # [seq, h] => [seq, 1]
        out = out.unsqueeze(dim=0)  # => [1, seq, 1]
        return out, h0
```

At its core is a simple RNN. Note the `batch_first` argument: since our data comes in with the batch dimension first, it must be set to True. After the RNN comes a Linear layer that maps the memory (hidden state) of size `hidden_size` down to `output_size=1`, since we only need a single value per time step to compare against y.
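To make the reshaping in `forward` concrete, here is a hedged shape trace (assuming the hyperparameters from the full listing below: input_size=1, hidden_size=16, output_size=1, and a sequence of 49 points):

```python
import torch

model = Net()                 # the class defined above
h0 = torch.zeros(1, 1, 16)    # [num_layers, batch, hidden_size]
out, h0 = model(torch.randn(1, 49, 1), h0)
print(out.shape)              # torch.Size([1, 49, 1]) -- same shape as y
```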

Then we define the training code.

```python
model = Net()
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr)

h0 = torch.zeros(1, 1, hidden_size)  # [num_layers, b, hidden_size]

for iter in range(6000):
    start = np.random.randint(3, size=1)[0]
    time_steps = np.linspace(start, start + 10, num_time_steps)
    data = np.sin(time_steps)
    data = data.reshape(num_time_steps, 1)
    x = torch.tensor(data[:-1]).float().view(1, num_time_steps - 1, 1)
    y = torch.tensor(data[1:]).float().view(1, num_time_steps - 1, 1)

    output, h0 = model(x, h0)
    h0 = h0.detach()  # detach the hidden state so gradients do not flow across iterations

    loss = criterion(output, y)
    model.zero_grad()
    loss.backward()
    optimizer.step()

    if iter % 100 == 0:
        print("Iteration: {} loss {}".format(iter, loss.item()))
```

Finally, the prediction part.

```python
predictions = []
input = x[:, 0, :]
for _ in range(x.shape[1]):
    input = input.view(1, 1, 1)
    (pred, h0) = model(input, h0)
    input = pred  # feed the previous output back in as the next input
    predictions.append(pred.detach().numpy().ravel()[0])
```

Suppose x has shape [b, seq, 1]; after x[:, 0, :] it becomes [b, 1]. As noted earlier, the batch size is 1, so input has shape [1, 1]; it is then reshaped to [1, 1, 1] to match the network's expected input dimensions.
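You can check this indexing directly (a small sketch, independent of the trained model):

```python
import torch

x = torch.randn(1, 49, 1)         # [b, seq, 1] with b = 1
first = x[:, 0, :]                # an integer index drops the seq dimension
print(first.shape)                # torch.Size([1, 1])
print(first.view(1, 1, 1).shape)  # torch.Size([1, 1, 1]) -- matches the RNN's expected input
```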

The third-to-last and second-to-last lines work as follows: first the initial value is fed in to get an output pred, then that pred is used as the next input, producing another pred, and so on in a loop; the previous step's output always becomes the next step's input.

The final output looks like this:

[Figure: the ground-truth sine points plotted alongside the model's predictions]

The complete code is as follows:

```python
from numpy.random import randint
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from matplotlib import pyplot as plt

num_time_steps = 50
input_size = 1
hidden_size = 16
output_size = 1
lr = 0.01

class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.rnn = nn.RNN(
            input_size=input_size,
            hidden_size=hidden_size,
            num_layers=1,
            batch_first=True,
        )
        self.linear = nn.Linear(hidden_size, output_size)

    def forward(self, x, h0):
        out, h0 = self.rnn(x, h0)
        # [b, seq, h] => [seq, h]
        out = out.view(-1, hidden_size)
        out = self.linear(out)
        out = out.unsqueeze(dim=0)
        return out, h0

model = Net()
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr)
h0 = torch.zeros(1, 1, hidden_size)

for iter in range(6000):
    start = randint(3)
    time_steps = np.linspace(start, start + 10, num_time_steps)
    data = np.sin(time_steps)
    data = data.reshape(num_time_steps, 1)
    x = torch.tensor(data[:-1]).float().view(1, num_time_steps - 1, 1)
    y = torch.tensor(data[1:]).float().view(1, num_time_steps - 1, 1)

    output, h0 = model(x, h0)
    h0 = h0.detach()

    loss = criterion(output, y)
    model.zero_grad()
    loss.backward()
    optimizer.step()

    if iter % 100 == 0:
        print("Iteration: {} loss {}".format(iter, loss.item()))

# generate one fresh curve for prediction
start = randint(3)
time_steps = np.linspace(start, start + 10, num_time_steps)
data = np.sin(time_steps)
data = data.reshape(num_time_steps, 1)
x = torch.tensor(data[:-1]).float().view(1, num_time_steps - 1, 1)
y = torch.tensor(data[1:]).float().view(1, num_time_steps - 1, 1)

predictions = []
input = x[:, 0, :]
for _ in range(x.shape[1]):
    input = input.view(1, 1, 1)
    (pred, h0) = model(input, h0)
    input = pred
    predictions.append(pred.detach().numpy().ravel()[0])

x = x.data.numpy().ravel()  # flatten
y = y.data.numpy()
plt.scatter(time_steps[:-1], x.ravel(), s=90)
plt.plot(time_steps[:-1], x.ravel())
plt.scatter(time_steps[1:], predictions)
plt.show()
```

