forked from Cernewein/heating-RL-agent
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathutils.py
127 lines (104 loc) · 4.33 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
import torch
from vars import *
from collections import namedtuple
import random
from environment import Building
import numpy as np
import copy
# Taken from
# https://github.com/pytorch/tutorials/blob/master/intermediate_source/reinforcement_q_learning.py
Transition = namedtuple('Transition',
('state', 'action', 'next_state', 'reward'))
import os
import pickle as pkl
class Normalizer():
"""
Normalizes the input data by computing an online variance and mean
"""
def __init__(self, num_inputs):
self.n = torch.zeros(num_inputs).to(device)
self.mean = torch.zeros(num_inputs).to(device)
self.mean_diff = torch.zeros(num_inputs).to(device)
self.var = torch.zeros(num_inputs).to(device)
def observe(self, x):
self.n += 1.
last_mean = self.mean.clone()
self.mean += (x-self.mean)/self.n
self.mean_diff += (x-last_mean)*(x-self.mean)
self.var = torch.clamp(self.mean_diff/self.n, min=1e-2)
def normalize(self, inputs):
obs_std = torch.sqrt(self.var).to(device)
return (inputs - self.mean)/obs_std
class ReplayMemory(object):
"""
This class serves as storage capability for the replay memory. It stores the Transition tuple
(state, action, next_state, reward) that can later be used by a DQN agent for learning based on experience replay.
:param capacity: The size of the replay memory
:type capacity: Integer
"""
def __init__(self, capacity):
self.capacity = capacity
self.memory = []
self.position = 0
def push(self, *args):
"""Saves a transition. (the transition tuple)"""
if len(self.memory) < self.capacity:
self.memory.append(None)
self.memory[self.position] = Transition(*args)
self.position = (self.position + 1) % self.capacity
def sample(self, batch_size):
"""
Randomly selects batch_size elements from the memory.
:param batch_size: The wanted batch size
:type batch_size: Integer
:return:
"""
return random.sample(self.memory, batch_size)
def __len__(self):
return len(self.memory)
class BasicController():
def __init__(self, number_time_steps, dynamic, model_name):
self.number_time_steps = number_time_steps
self.building = Building(dynamic, eval=True)
self.temperatures = []
self.costs = []
self.action = 0
self.model_name = model_name
def control(self):
"""
Represents a very basic control mechanism that is used as baseline for comparision. It heats until T=T_max
and then turns the heating off until T_min is reached
:param number_time_steps:
:return:
"""
for _ in range(self.number_time_steps):
if self.building.inside_temperature > T_MAX - 1 / TEMPERATURE_ROUNDING:
self.action = 0
elif self.building.inside_temperature < T_MIN + 1 / TEMPERATURE_ROUNDING:
self.action = 1
self.building.step(self.action)
self.temperatures.append(self.building.inside_temperature)
self.costs.append(self.action*NOMINAL_HEAT_PUMP_POWER*self.building.price/1e6*TIME_STEP_SIZE/3600)
with open(os.getcwd() + '/data/output/' + self.model_name + '_costs_basic.pkl', 'wb') as f:
pkl.dump(self.costs, f)
with open(os.getcwd() + '/data/output/' + self.model_name + '_temperatures_basic.pkl', 'wb') as f:
pkl.dump(self.temperatures, f)
class OUNoise:
"""Ornstein-Uhlenbeck process.
Taken from https://github.com/udacity/deep-reinforcement-learning/blob/master/ddpg-pendulum/ddpg_agent.py"""
def __init__(self, size, seed, mu=0., theta=0.15, sigma=0.2):
"""Initialize parameters and noise process."""
self.mu = mu * np.ones(size)
self.theta = theta
self.sigma = sigma
self.seed = random.seed(seed)
self.reset()
def reset(self):
"""Reset the internal state (= noise) to mean (mu)."""
self.state = copy.copy(self.mu)
def sample(self):
"""Update internal state and return it as a noise sample."""
x = self.state
dx = self.theta * (self.mu - x) + self.sigma * np.array([random.random() for i in range(len(x))])
self.state = x + dx
return self.state