Source code for PyTrans.UrbanNetworkAnalysis.Frank_Wolfe

import networkx as nx
import scipy.integrate as integrate 
from scipy.optimize import minimize_scalar
import matplotlib.pyplot as plt

from PyTrans.UrbanNetworkAnalysis import TransportationNetworks as tn

[docs]class Run: """ Class of implementing Frank-Wolfe algorithm for networks privided from Transportation Networks for Research Core Team (https://github.com/bstabler/TransportationNetworks) Parameters ---------- link_file : string file path of network file, which containing various link information trip_file : string file path of trip table. An Origin label and then Origin node number, followed by Destination node numders and OD flow node_file : string file path of node file, which containing coordinates information of nodes SO: boolean True if objective is to find system optimal solution, False if objective is to find user equilibrium Attributes ---------- graph: networkx DiGraph graph of links when completing the algorithm network: nested dictionary dictionary of links information and history of Frank-Wolfe algorithm implementation by iteration fwResult: dictionary dictionary of theta (optimal move size) and objective function value over iterations Example ------- A Quick example #Set the paths of Transportation Networks file >>> directory = ".\\Data\\TransportationNetworks\\SiouxFalls\\" >>> link_file = '{}SiouxFalls_net.tntp'.format(directory) >>> trip_file = '{}SiouxFalls_trips.tntp'.format(directory) >>> node_file = '{}SiouxFalls_node.tntp'.format(directory) >>> SO = False \n #Implement Frank-Wolfe algorithm >>> fw = Run(link_file, trip_file, node_file, SO) >>> fw.showODFlow() >>> fw.showODFlowMap() """ def __init__(self, link_file, trip_file, node_file, SO): self.SO = SO nw = tn.Network(link_file, trip_file, node_file, self.SO) ## initialization self.network = {(u,v): {'t0':d['object'].t0, 'alpha':d['object'].alpha, \ 'beta':d['object'].beta, 'capa':d['object'].capacity, 'flow':[], \ 'auxiliary':[], 'cost':[]} for (u, v, d) in nw.graph.edges(data=True)} self.fwResult = {'theta':[], 'z':[]} nw.all_or_nothing_assignment() nw.update_linkcost() for linkKey, linkVal in self.network.items(): linkVal['cost'].append(nw.graph[linkKey[0]][linkKey[1]]['weight']) linkVal['auxiliary'].append(nw.graph[linkKey[0]][linkKey[1]]['object'].vol) linkVal['flow'].append(nw.graph[linkKey[0]][linkKey[1]]['object'].vol) ## iterations iterNum=0 iteration = True while iteration: iterNum += 1 nw.all_or_nothing_assignment() nw.update_linkcost() for linkKey, linkVal in self.network.items(): linkVal['auxiliary'].append(nw.graph[linkKey[0]][linkKey[1]]['object'].vol) theta = self.lineSearch() self.fwResult['theta'].append(theta) for linkKey, linkVal in self.network.items(): aux = linkVal['auxiliary'][-1] flow = linkVal['flow'][-1] linkVal['flow'].append(flow + theta*(aux-flow)) nw.graph[linkKey[0]][linkKey[1]]['object'].vol = flow + theta * (aux - flow) nw.graph[linkKey[0]][linkKey[1]]['object'].flow = flow + theta * (aux - flow) nw.update_linkcost() z=0 for linkKey, linkVal in self.network.items(): linkVal['cost'].append(nw.graph[linkKey[0]][linkKey[1]]['weight']) totalcost = nw.graph[linkKey[0]][linkKey[1]]['object'].get_objective_function() z+=totalcost self.fwResult['z'].append(z) if iterNum == 1: iteration = True else: if abs(self.fwResult['z'][-2] - self.fwResult['z'][-1]) <= 0.001 or iterNum==3000: iteration = False self.graph = nw.graph
[docs] def BPR(self, t0, xa, ca, alpha, beta): """ Method for calculating link travel time based on BPR function Parameters ---------- t0: float link travel time under free flow speed xa: float traffic link flow ca: float capacity of link alpha: float first BPR function parameter, usually 0.15 beta: float second BPR function parameter, usually 4.0 Return ------ ta: float link travel time under the current traffic flow """ ta = t0*(1+alpha*(xa/ca)**beta) return ta
[docs] def calculateZ(self, theta): """ Method for calculating objective function value Parameters ---------- theta: float optimal move size Return ------ float objective function value """ z = 0 for linkKey, linkVal in self.network.items(): t0 = linkVal['t0'] ca = linkVal['capa'] beta = linkVal['beta'] alpha = linkVal['alpha'] aux = linkVal['auxiliary'][-1] flow = linkVal['flow'][-1] if SO == False: z += integrate.quad(lambda x: self.BPR(t0, x, ca, alpha, beta), 0, flow+theta*(aux-flow))[0] elif SO == True: z += list(map(lambda x : x * self.BPR(t0, x, ca, alpha, beta), [flow+theta*(aux-flow)]))[0] return z
[docs] def lineSearch(self): """ Method for estimating theta (optimal move size) Return ------ float optimal move size (rate) between 0 and 1 """ theta = minimize_scalar(lambda x: self.calculateZ(x), bounds = (0,1), method = 'Bounded') return theta.x
[docs] def showODFlow(self): """ Method for presenting table of the optimal traffic assignment of the Frank-Wolfe algorithm procedure """ for (u, v, d) in self.graph.edges(data=True): print(u, ' -> ', v, ': ', d['object'].vol)
[docs] def showODFlowMap(self): """ Method for presenting the traffic assignment result on a map """ edgewidth = [d['object'].vol/5000 for (u, v, d) in self.graph.edges(data=True)] if node_file != None: plt.figure(num = 1, figsize=(10,10)) plt.axis('off') pos = nx.get_node_attributes(self.graph, "pos") nx.draw_networkx_edges(self.graph, pos, width=edgewidth) nx.draw_networkx_edge_labels(self.graph, pos, \ edge_labels={(u, v): round(d["object"].vol,0) for u, v, d in self.graph.edges(data=True)}, \ font_size=8, label_pos=0.3, alpha=0.) nx.draw_networkx_nodes(self.graph, pos, with_labels=True) nx.draw_networkx_labels(self.graph, pos, font_size=10) plt.show()
if __name__ == "__main__": directory = ".\\Data\\TransportationNetworks\\SiouxFalls\\" link_file = '{}SiouxFalls_net.tntp'.format(directory) trip_file = '{}SiouxFalls_trips.tntp'.format(directory) node_file = '{}SiouxFalls_node.tntp'.format(directory) SO = False fw = Run(link_file, trip_file, node_file, SO) fw.showODFlow() fw.showODFlowMap()