Demo entry 6644131

Submitted by anonymous on Oct 01, 2017 at 22:44
Language: Python 3. Code size: 8.8 kB.

import datetime
import numpy as np

# List of assignment states
states_list = ['a,b,c', 'a,b', 'a,c', 'b,c', '(a,b),c', '(b,a),c', '(a,c),b', '(c,a),b', '(b,c),a', '(c,b),a', '(a,b)',
               '(b,a)', '(a,c)', '(c,a)', '(c,b)', '(b,c)', '(a,b,c)', '(a,c,b)', '(b,a,c)', '(c,b,a)', '(c,a,b)', '(b,c,a)']
# List of assignment actions
actions = ['ga', 'gb', 'gc', 'at', 'bt', 'ct', 'ab', 'ac', 'ba', 'bc', 'ca', 'cb']
goal_state = '(a,c,b)'

gamma = 0.9
eps = 0.000001

value_iteration = True
policy_iteration = True

# Initialize the given dictionary with zero values for every (state, action, state) triple (S x A x S)
def dict_propositions(d, states, acts):
    for s_from in states:
        d[s_from] = {}
        for a in acts:
            d[s_from][a] = {}
            for s_to in states:
                d[s_from][a][s_to] = 0


# Get the transition probability from s_from to s_to while executing action a (S x A x S --> R)
def t(s_from, a, s_to):
    s_from = s_from.replace(' ', '')
    a = a.replace(' ', '')
    s_to = s_to.replace(' ', '')
    return t_dict[s_from][a][s_to]


# Set the transition probability from s_from to s_to under action a to the given value
def set_t(s_from, a, s_to, value):
    s_from = s_from.replace(' ', '')
    a = a.replace(' ', '')
    s_to = s_to.replace(' ', '')
    t_dict[s_from][a][s_to] = value


# Compute the expected immediate reward for a state-action pair:
# reaching the goal state yields +100, unlikely transitions (p < 0.5) are
# penalised with -10, and all other transitions cost -1.
#----- EXPLAIN IN REPORT ------#
def exp_r(s, a):
    expected_r = 0
    for s_to, p in t_dict[s][a].items():
        if s_to == goal_state:
            expected_r += p * 100
        elif p < 0.5:
            expected_r += p * -10
        else:
            expected_r += p * -1
    return expected_r
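# Worked examples (sketch; probabilities taken from set_data() below):
#   exp_r('(a,c)', 'bc') = 1.0 * 100 = 100.0  (the only successor is the goal state)
#   exp_r('a,b,c', 'ga') = 0.8 * -1 + 0.1 * -10 + 0.1 * -10 = -2.8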


def set_data():
    set_t('a,b,c', 'ga', 'b,c', 0.8)
    set_t('a,b,c', 'ga', 'a,c', 0.1)
    set_t('a,b,c', 'ga', 'a,b', 0.1)
    set_t('a,b,c', 'gb', 'b,c', 0.1)
    set_t('a,b,c', 'gb', 'a,c', 0.8)
    set_t('a,b,c', 'gb', 'a,b', 0.1)
    set_t('a,b,c', 'gc', 'b,c', 0.1)
    set_t('a,b,c', 'gc', 'a,c', 0.1)
    set_t('a,b,c', 'gc', 'a,b', 0.8)

    set_t('a,b', 'ct', 'a,b,c', 1.0)
    set_t('a,b', 'cb', '(b,c),a', 0.9)
    set_t('a,b', 'cb', '(a,c),b', 0.1)
    set_t('a,b', 'ca', '(b,c),a', 0.1)
    set_t('a,b', 'ca', '(a,c),b', 0.9)

    set_t('a,c', 'bt', 'a,b,c', 1.0)
    set_t('a,c', 'ba', '(a,b),c', 0.9)
    set_t('a,c', 'ba', '(c,b),a', 0.1)
    set_t('a,c', 'bc', '(a,b),c', 0.1)
    set_t('a,c', 'bc', '(c,b),a', 0.9)

    set_t('b,c', 'at', 'a,b,c', 1.0)
    set_t('b,c', 'ab', '(c,a),b', 0.1)
    set_t('b,c', 'ab', '(b,a),c', 0.9)
    set_t('b,c', 'ac', '(c,a),b', 0.9)
    set_t('b,c', 'ac', '(b,a),c', 0.1)

    set_t('(a,b),c', 'gb', 'a,c', 0.9)
    set_t('(a,b),c', 'gb', '(a,b)', 0.1)
    set_t('(a,b),c', 'gc', 'a,c', 0.1)
    set_t('(a,b),c', 'gc', '(a,b)', 0.9)

    set_t('(b,a),c', 'ga', 'b,c', 0.9)
    set_t('(b,a),c', 'ga', '(b,a)', 0.1)
    set_t('(b,a),c', 'gc', 'b,c', 0.1)
    set_t('(b,a),c', 'gc', '(b,a)', 0.9)

    set_t('(a,c),b', 'gb', '(a,c)', 0.9)
    set_t('(a,c),b', 'gb', 'a,b', 0.1)
    set_t('(a,c),b', 'gc', '(a,c)', 0.1)
    set_t('(a,c),b', 'gc', 'a,b', 0.9)

    set_t('(c,a),b', 'gb', '(c,a)', 0.9)
    set_t('(c,a),b', 'gb', 'b,c', 0.1)
    set_t('(c,a),b', 'ga', 'b,c', 0.9)
    set_t('(c,a),b', 'ga', '(c,a)', 0.1)

    set_t('(b,c),a', 'ga', '(b,c)', 0.9)
    set_t('(b,c),a', 'ga', 'a,b', 0.1)
    set_t('(b,c),a', 'gc', '(b,c)', 0.1)
    set_t('(b,c),a', 'gc', 'a,b', 0.9)

    set_t('(c,b),a', 'ga', '(c,b)', 0.9)
    set_t('(c,b),a', 'ga', 'a,c', 0.1)
    set_t('(c,b),a', 'gb', 'a,c', 0.9)
    set_t('(c,b),a', 'gb', '(c,b)', 0.1)

    set_t('(a,b)', 'ct', '(a,b),c', 1)
    set_t('(a,b)', 'cb', '(a,b,c)', 1)

    set_t('(b,a)', 'ct', '(b,a),c', 1)
    set_t('(b,a)', 'ca', '(b,a,c)', 1)

    set_t('(a,c)', 'bt', '(a,c),b', 1)
    set_t('(a,c)', 'bc', '(a,c,b)', 1)

    set_t('(c,a)', 'bt', '(c,a),b', 1)
    set_t('(c,a)', 'ba', '(c,a,b)', 1)

    set_t('(c,b)', 'at', '(c,b),a', 1)
    set_t('(c,b)', 'ab', '(c,b,a)', 1)

    set_t('(b,c)', 'at', '(b,c),a', 1)
    set_t('(b,c)', 'ac', '(b,c,a)', 1)

    set_t('(a,b,c)', 'gc', '(a,b)', 1)
    set_t('(b,a,c)', 'gc', '(b,a)', 1)
    set_t('(c,b,a)', 'ga', '(c,b)', 1)
    set_t('(c,a,b)', 'gb', '(c,a)', 1)
    set_t('(b,c,a)', 'ga', '(b,c)', 1)

    return t_dict


# ------------------------------------------------------------------------------------------

t_dict = {}
U_opt = {}
Pi = {}

dict_propositions(t_dict, states_list, actions)
set_data()
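
# Optional sanity check (a small sketch, not part of the original entry): for every
# state-action pair the outgoing probabilities should sum to 1, or to 0 for pairs
# that set_data() never defines.
for s_chk in states_list:
    for a_chk in actions:
        p_sum = sum(t_dict[s_chk][a_chk].values())
        assert p_sum == 0 or abs(p_sum - 1) < 1e-9, (s_chk, a_chk, p_sum)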

# Initialize all utilities to 0 and create an empty policy dictionary
for s_from in states_list:
    U_opt[s_from] = 0
    Pi[s_from] = ''

iter_counter = 0
start_time = datetime.datetime.now()

# value iteration algorithm
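# Bellman backup applied below: U_{i+1}(s) = max_a [ R(s,a) + gamma * sum_{s'} T(s,a,s') * U_i(s') ],
# repeated until the largest utility change (delta) drops below eps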
while value_iteration:

    iter_counter += 1
    delta = 0

    # Copy of optimal utilities in the previous iteration
    U_i = U_opt.copy()
    for s in states_list:

        # For every state, initialize an empty action and set the max_value to -inf
        max_action = ''
        max_value = -float('inf')

        # Find the action returning the highest expected utility when leaving state s
        for a in actions:

            # Compute the expected utility of executing action a in state s
            total = 0
            for s1 in states_list:
                total += t(s, a, s1) * U_i[s1]

            # If a higher utility is found than the one stored for state s, update the optimal utility and corresponding action
            if (exp_r(s, a) + gamma * total) > max_value:
                max_value = exp_r(s, a) + gamma * total
                max_action = a

        # If the maximum utility found in the current iteration exceeds the stored value, update the utility and policy
        if max_value > U_opt[s]:
            U_opt[s] = max_value
            Pi[s] = max_action

        # Update delta to the maximum distance between the new and old utility over all states
        if abs(U_opt[s] - U_i[s]) > delta:
            delta = abs(U_opt[s] - U_i[s])

    # Stop the algorithm if the maximum utility change between two successive iterations is small enough,
    # i.e. delta is smaller than epsilon (or exactly zero)
    if delta < eps or delta == 0:
        break

end_time = datetime.datetime.now()

print('\nVALUE ITERATION')
print('\n' + str(iter_counter) + ' steps were required in ' + str((end_time - start_time).total_seconds()) + ' seconds')
print("\nOptimal utility: ", U_opt)
print("Optimal policy: ", Pi)

# policy iteration
# Re-initialize all utilities to 0 and set every state's action to the first action in the actions list ('ga')
for s in states_list:
    U_opt[s] = 0
    Pi[s] = actions[0]

# Computing the optimal utilities for some policy Pi and updating U_opt
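# Policy evaluation: solve the linear system (I - gamma * T_pi) U = R_pi, where
# T_pi[s][s'] = t(s, Pi[s], s') and R_pi[s] = exp_r(s, Pi[s])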
def opt_u():
    a = []
    b = []
    for s in states_list:
        b.append(exp_r(s, Pi[s]))
        row = []
        for s1 in states_list:
            if s == s1:
                row.append(1 - (gamma * t(s, Pi[s], s1)))
            else:
                row.append(-gamma * t(s, Pi[s], s1))

        a.append(row)

    # Solve the linear system and map the solution back to a per-state dictionary
    solution = list(np.linalg.solve(np.array(a), np.array(b)))
    for s in states_list:
        U_opt[s] = solution[states_list.index(s)]

iter_counter = 0
start_time = datetime.datetime.now()

# Policy iteration algorithm
while policy_iteration:

    iter_counter += 1
    opt_u()                 # Update current optimal utilities
    new_policy = dict(Pi)   # Make copy of current policy

    for s in states_list:
        for a in actions:

            if a != Pi[s]:  # No need to re-evaluate the action already prescribed by the current policy
                total = 0
                for s1 in states_list:
                    total += t(s, a, s1) * U_opt[s1]
                other_utility = exp_r(s, a) + (gamma * total)

                if other_utility > U_opt[s]:
                    # If some other action yields a higher utility, store it in the copy.
                    # Don't modify Pi directly: all changes are collected in new_policy first,
                    # so the comparison below can detect whether the current policy (Pi) has changed.
                    new_policy[s] = a

    if Pi == new_policy:
        # Policy has converged; clear the action for the goal state (no action is needed there)
        Pi[goal_state] = ''
        break
    else:
        Pi = dict(new_policy)

end_time = datetime.datetime.now()

print('\nPOLICY ITERATION')
print('\n' + str(iter_counter) + ' steps were required in ' + str((end_time - start_time).total_seconds()) + ' seconds')
print("\nOptimal utility: ", U_opt)
print("Optimal policy: ", Pi)
