Compare commits

...

22 Commits

Author SHA1 Message Date
Andy Eschbacher
9805171a7b adds rounding to output results 2017-05-19 09:25:23 -04:00
Andy Eschbacher
d1172b7858 minor updates 2017-05-19 09:24:52 -04:00
Andy Eschbacher
5a623e7903 adds outputs for fixed assignments (costs not yet output) 2017-05-18 22:34:20 -04:00
Andy Eschbacher
588e8b413f adds amount ferried between pairs as output value 2017-05-18 20:33:14 -04:00
Andy Eschbacher
fe803d0b4b init to only include Optim class 2017-05-18 13:04:06 -04:00
Andy Eschbacher
7675a60dee updates to not consider already paired source/drains 2017-05-18 13:03:18 -04:00
Andy Eschbacher
39b7b09272 adds extra providers and extra options for optim problem 2017-05-18 13:02:16 -04:00
Andy Eschbacher
b0614aefc6 [mvp] fractional optimization 2017-05-08 23:47:17 -04:00
Andy Eschbacher
ddfdd87044 update execute to rely on UPDATE for templating values in query 2017-05-05 08:56:33 -04:00
Andy Eschbacher
3a042ee2ff adds initial version of distance matrix function 2017-05-04 17:26:45 -04:00
Andy Eschbacher
59466a69bc moves functions to rely on queries instead of tqblenames 2017-04-24 14:40:21 -04:00
Andy Eschbacher
b0e7253dbb adds basic tests for cvxopt function 2017-04-10 17:14:58 -04:00
Andy Eschbacher
faa3eb9434 cleans check constraint code 2017-04-10 11:59:17 -04:00
Andy Eschbacher
afb845f29c data type fixes and __init__ refactoring 2017-04-03 10:03:20 -04:00
Andy Eschbacher
7b84427d02 pylint updates 2017-04-03 09:04:51 -04:00
Andy Eschbacher
78f72d25e6 refactoring 2017-03-30 15:11:12 -04:00
Andy Eschbacher
cc555cb322 updates internal variable names 2017-03-30 09:55:14 -04:00
Andy Eschbacher
b8b7acdb2d refactoring to limit the number of attributes 2017-03-29 18:55:49 -04:00
Andy Eschbacher
dc8cc0e1c7 expose model parameters 2017-03-29 15:20:46 -04:00
Andy Eschbacher
c9af838b07 basic working version of mvp 2017-03-29 14:39:22 -04:00
Andy Eschbacher
a601ea0642 optim code 2017-03-28 18:22:26 -04:00
Andy Eschbacher
d871704885 stubs out mvp 2017-03-28 18:09:43 -04:00
7 changed files with 653 additions and 8 deletions

View File

@@ -0,0 +1,26 @@
CREATE OR REPLACE FUNCTION
CDB_OptimAssignments(source text,
drain text,
drain_capacity text,
source_production text,
marginal_cost text,
dist_matrix_query text,
dist_rate numeric DEFAULT 0.15,
dist_threshold numeric DEFAULT null)
RETURNS table(drain_id bigint, source_id int, cost numeric, amount numeric) AS $$
from crankshaft.optimization import Optim
def cast_val(val):
return float(val) if val is not None else None
params = {'dist_rate': cast_val(dist_rate),
'dist_threshold': cast_val(dist_threshold)}
optim = Optim(source, drain, dist_matrix_query, drain_capacity,
source_production, marginal_cost, **params)
x = optim.output()
return x
$$ LANGUAGE plpythonu;

View File

@@ -0,0 +1,44 @@
-- Calculate the distance matrix using underlying road network
-- Sample usage:
-- select * from cdb_distancematrix('drain_table'::regclass,
-- 'source_table'::regclass)
CREATE OR REPLACE FUNCTION CDB_DistanceMatrix(
origin_table regclass,
destination_table regclass,
transit_mode text DEFAULT 'car'
)
RETURNS TABLE(origin_id bigint, destination_id bigint,
the_geom geometry(geometry, 4326),
length_km numeric, duration_sec numeric)
AS $$
BEGIN
RETURN QUERY
EXECUTE format('
WITH pairs AS (
SELECT
o."cartodb_id" AS origin_id,
d."cartodb_id" AS destination_id,
o."the_geom" AS origin_point,
d."the_geom" AS destination_point
FROM
(SELECT * FROM %I) AS o,
(SELECT * FROM %I) AS d),
results AS (
SELECT
origin_id,
destination_id,
(cdb_route_point_to_point(origin_point,
destination_point,
$1)).*
FROM pairs)
SELECT
origin_id::bigint AS origin_id,
destination_id::bigint AS destination_id,
shape AS the_geom,
length::numeric AS length_km,
duration::numeric AS duration_sec
FROM results;', origin_table, destination_table)
USING transit_mode;
RETURN;
END;
$$ LANGUAGE plpgsql;

View File

@@ -4,3 +4,4 @@ import crankshaft.clustering
import crankshaft.space_time_dynamics
import crankshaft.segmentation
import analysis_data_provider
import crankshaft.optimization

View File

@@ -1,9 +1,11 @@
"""class for fetching data"""
import plpy
import pysal_utils as pu
import numpy as np
class AnalysisDataProvider:
class AnalysisDataProvider(object):
"""Analysis providers for crankshaft functions. These rely on database
access through `plpy`"""
def get_getis(self, w_type, params):
"""fetch data for getis ord's g"""
try:
@@ -14,7 +16,7 @@ class AnalysisDataProvider:
return pu.empty_zipped_array(4)
else:
return result
except plpy.SPIError, err:
except plpy.SPIError as err:
plpy.error('Analysis failed: %s' % err)
def get_markov(self, w_type, params):
@@ -27,7 +29,7 @@ class AnalysisDataProvider:
return pu.empty_zipped_array(4)
return data
except plpy.SPIError, err:
except plpy.SPIError as err:
plpy.error('Analysis failed: %s' % err)
def get_moran(self, w_type, params):
@@ -40,8 +42,8 @@ class AnalysisDataProvider:
if len(data) == 0:
return pu.empty_zipped_array(2)
return data
except plpy.SPIError, err:
plpy.error('Analysis failed: %s' % e)
except plpy.SPIError as err:
plpy.error('Analysis failed: %s' % err)
return pu.empty_zipped_array(2)
def get_nonspatial_kmeans(self, query):
@@ -49,7 +51,7 @@ class AnalysisDataProvider:
try:
data = plpy.execute(query)
return data
except plpy.SPIError, err:
except plpy.SPIError as err:
plpy.error('Analysis failed: %s' % err)
def get_spatial_kmeans(self, params):
@@ -63,5 +65,154 @@ class AnalysisDataProvider:
try:
data = plpy.execute(query)
return data
except plpy.SPIError, err:
except plpy.SPIError as err:
plpy.error('Analysis failed: %s' % err)
def get_column(self, subquery, column, dtype=float, id_col='cartodb_id',
condition=None):
"""
Retrieve the column from the specified table from a connected
PostgreSQL database.
Args:
subquery (str): subquery to retrieve column from
column (str): column to retrieve
dtype (type): data type in column (e.g, float, int, str)
id_col (str, optional): Column name for index. Defaults to
`cartodb_id`.
Returns:
numpy.array: column from table as a NumPy array
"""
query = '''
SELECT array_agg("{column}" ORDER BY "{id_col}" ASC) as col
FROM ({subquery}) As _wrap {filter}
'''.format(subquery=subquery,
column=column,
id_col=id_col,
filter='WHERE {}'.format(condition) if condition else '')
resp = plpy.execute(query)
return np.array(resp[0]['col'], dtype=dtype)
def get_reduced_column(self, drain_query, capacity,
source_query, amount,
dtype=float, id_col='cartodb_id'):
"""
Retrieve the column from the specified table from a connected
PostgreSQL database.
Args:
source_query (str): source_query to retrieve column from
column (str): column to retrieve
dtype (type): data type in column (e.g, float, int, str)
id_col (str, optional): Column name for index. Defaults to
`cartodb_id`.
Returns:
numpy.array: column from table as a NumPy array
"""
query = '''
WITH cte AS (
SELECT
d."{capacity}" - coalesce(s."source_claimed", 0) As
reduced_capacity,
d."{id_col}"
FROM
({drain_query}) As d
LEFT JOIN
(SELECT
"drain_id",
sum("{amount}") As source_claimed
FROM ({source_query}) As _wrap
GROUP BY "drain_id") As s
ON
d."{id_col}" = s."drain_id"
)
SELECT
array_agg("reduced_capacity"
ORDER BY "{id_col}" ASC) As col
FROM cte
'''.format(capacity=capacity,
id_col=id_col,
drain_query=drain_query,
amount=amount,
source_query=source_query)
resp = plpy.execute(query)
return np.array(resp[0]['col'], dtype=dtype)
def get_distance_matrix(self, table, origin_ids, destination_ids):
"""Transforms a SQL table origin-destination table into a distance
matrix.
:param query: Table that has the data needed for building the
distance matrix. Query should have the following columns:
- origin_id (int)
- destination_id (int)
- length_km (numeric)
:type query: str
:param origin_ids: List of origin IDs
:type origin_ids: list of ints
:param destination_ids: List of origin IDs
:type destination_ids: list of ints
:returns: 2D array of distances from all origins to all destinations
:rtype: numpy.array
"""
try:
resp = plpy.execute('''
SELECT "origin_id", "destination_id", "length_km"
FROM (SELECT * FROM "{table}") as _wrap
'''.format(table=table))
except plpy.SPIError as err:
plpy.error("Failed to build distance matrix: {}".format(err))
pairs = {(row['origin_id'], row['destination_id']): row['length_km']
for row in resp}
distance_matrix = np.array([
pairs[(origin, destination)]
for destination in destination_ids
for origin in origin_ids
])
return np.array(distance_matrix,
dtype=float).reshape((len(destination_ids),
len(origin_ids)))
def get_pairwise_distances(self, drain_query, source_query,
id_col='cartodb_id'):
"""Retuns the pairwise distances between row i and j for all i in
drain_query and j in source_query
Args:
drain_query (str): Query that exposes the `the_geom` and
`cartodb_id` (or what is specified in `id_col`) of the dataset
for 'drain' locations
source_query (str): Query that exposes the `the_geom` and
`cartodb_id` (or what is specified in `id_col`) of the dataset
for 'source' locations
id_col (str, optional): Column name for table index. Defaults to
`cartodb_id`.
Returns:
numpy.array: A len(s) by len(d) array of distances from source i to
drain j
"""
query = '''
SELECT array_agg(ST_Distance(d."the_geom"::geography,
s."the_geom"::geography) / 1000.0
ORDER BY d."{id_col}" ASC) as dist
FROM ({drain_query}) AS d, ({source_query}) AS s
GROUP BY s."{id_col}"
ORDER BY s."{id_col}" ASC
'''.format(drain_query=drain_query,
source_query=source_query,
id_col=id_col)
resp = plpy.execute(query)
# len(s) x len(d) matrix
return np.array([np.array(row['dist'], dtype=float)
for row in resp], dtype=float)

View File

@@ -0,0 +1 @@
from optim import Optim

View File

@@ -0,0 +1,301 @@
"""optimization"""
import plpy
import numpy as np
import cvxopt
from cvxopt import solvers
from crankshaft.analysis_data_provider import AnalysisDataProvider
class Optim(object):
"""Linear optimization class for logistics cost minimization
Optimization for logistics
based on models:
- source_amount * (marginal_cost + transport_cost * distance)
"""
def __init__(self, source_query, drain_query, dist_matrix_table,
capacity_column, production_column, marginal_column,
**kwargs):
# set data provider - defaults to SQL database access
self.data_provider = kwargs.get('data_provider',
AnalysisDataProvider())
# model parameters
self.model_params = {
'dist_cost': kwargs.get('dist_cost', 0.15),
'dist_threshold': kwargs.get('dist_threshold', None),
'solver': kwargs.get('solver', 'glpk')}
self._check_model_params()
# database ids
self.ids = {
'drain_free': self.data_provider.get_column(
drain_query,
'cartodb_id',
id_col='cartodb_id',
dtype=int),
'source_free': self.data_provider.get_column(
source_query,
'cartodb_id',
dtype=int,
condition='drain_id is null'),
'source_fixed': self.data_provider.get_column(
source_query,
'cartodb_id',
dtype=int,
condition='drain_id is not null'),
'drain_fixed': self.data_provider.get_column(
source_query,
'drain_id',
dtype=int,
condition='drain_id is not null'
)}
# model data
self.model_data = {
'drain_capacity': self.data_provider.get_reduced_column(
drain_query,
capacity_column,
source_query,
production_column,
id_col='cartodb_id',
dtype=int),
'source_amount': self.data_provider.get_column(
source_query,
production_column,
condition='drain_id is null'),
'source_amount_fixed': self.data_provider.get_column(
source_query,
production_column,
condition='drain_id is not null'),
'marginal_cost': self.data_provider.get_column(
drain_query,
marginal_column),
'distance': self.data_provider.get_distance_matrix(
dist_matrix_table,
self.ids['source_free'],
self.ids['drain_free']),
'distance_fixed': self.data_provider.get_distance_matrix(
dist_matrix_table,
self.ids['source_fixed'],
self.ids['drain_fixed']
)}
self.model_data['cost'] = self.calc_cost()
self.n_sources = len(self.ids['source_free'])
self.n_drains = len(self.ids['drain_free'])
def _check_constraints(self):
"""Check if inputs are within constraints"""
total_capacity = self.model_data['drain_capacity'].sum()
total_amount = self.model_data['source_amount'].sum()
if total_amount > total_capacity:
raise ValueError("Solution not possible. Drain capacity is "
"smaller than total source production.")
elif total_capacity <= 0:
raise ValueError("Capacity must be greater than zero")
plpy.notice('Capacity: {total_capacity}, '
'Amount: {total_amount} '
'({perc}%)'.format(total_capacity=total_capacity,
total_amount=total_amount,
perc=100.0 * total_amount / total_capacity))
return None
def _check_model_params(self):
"""Ensure model parameters are well formed"""
if (self.model_params['dist_threshold'] <= 0 and
self.model_params['dist_threshold'] is not None):
raise ValueError("`dist_threshold` must be greater than zero")
if (self.model_params['dist_cost'] is None or
self.model_params['dist_cost'] < 0):
raise ValueError("`dist_cost` must be greater than zero")
if self.model_params['solver'] not in (None, 'glpk'):
raise ValueError("`solver` must be one of 'glpk' (default) "
"or None.")
return None
def output(self):
"""Output the calculated 'optimal' assignments if solution is not infeasible.
:returns: List of source id/drain id pairs and the associated cost of
transport from source to drain
:rtype: List of tuples
"""
# retrieve fractional assignments
assignments = self.optim()
# crosswalks for matrix index -> cartodb_id
drain_id_crosswalk = {}
for idx, cid in enumerate(self.ids['drain_free']):
# matrix index -> cartodb_id
drain_id_crosswalk[idx] = cid
source_id_crosswalk = {}
for idx, cid in enumerate(self.ids['source_free']):
# matrix index -> cartodb_id
source_id_crosswalk[idx] = cid
# find non-zero entries
source_index, drain_index = np.nonzero(assignments)
# returns:
# - drain_id
# - source_id
# - cost of that pairing
# - amount sent via that pairing
assigned_costs = [(
drain_id_crosswalk[drain_index[idx]],
source_id_crosswalk[source_val],
self.model_data['cost'][drain_index[idx], source_val],
round(self.model_data['source_amount'][source_val] *
assignments[source_val, drain_index[idx]], 6),
False
)
for idx, source_val in enumerate(source_index)]
# Fixed vals:
# - self.ids['source_fixed']
# - self.ids['drain_fixed']
# -
fixed_costs = self.fixed_values()
# plpy.notice("FIXED COSTS: {}".format(fixed_costs))
return assigned_costs + fixed_costs
def fixed_values(self):
"""Return the fixed source IDs, drain IDs, costs for transport, and the
amount that is transported.
"""
margins = {k: val for k, val in zip(self.ids['drain_free'],
self.model_data['marginal_cost'])}
self.model_data['marginal_cost_fixed'] = [margins[d]
for d in self.ids['drain_fixed']]
fixed_costs = self.calc_cost(source='source_amount_fixed',
distance='distance_fixed',
margin='marginal_cost_fixed')
# cost = [fixed_costs[self.ids['drain_fixed'][idx], source_val]
# for idx, source_val in enumerate(self.ids['source_fixed'])]
return zip(self.ids['drain_fixed'],
self.ids['source_fixed'],
[1.] * len(self.ids['source_fixed']),
self.model_data['source_amount_fixed'],
[True] * len(self.ids['drain_fixed']))
def cost_func(self, distance, waste, marginal):
"""
cost equation
:param distance: distance (in km)
:type distance: float
:param waste: number of tons of waste. This was previously calculated
as self.model_params['amount_per_unit'] * number of people minus the recycle_rate
:type waste: numeric
:param marginal: intrinsic cost per ton of a plant
:type marginal: numeric
:returns: cost
:rtype: numeric
Note: dist_cost is the cost per ton (e.g., 0.15 GBP/ton)
"""
return waste * (marginal + self.model_params['dist_cost'] * distance)
def calc_cost(self, source='source_amount', distance='distance',
margin='marginal_cost'):
"""
Populate an d x s matrix according to the cost equation
:returns: d x s matrix of costs from area i to plant j
:rtype: numpy.array
"""
costs = np.array(
[self.cost_func(dist,
self.model_data[source][pair[1]],
self.model_data[margin][pair[0]])
for pair, dist in np.ndenumerate(self.model_data[distance])])
return costs.reshape(self.model_data[distance].shape)
def optim(self):
"""solve linear optimization problem
Equations of the form:
minimize c'*x by assigning x values
subject to G*x <= h
A*x = b
0 <= x[k] <= 1
:returns: Fractional assignments array (of 1s and 0s) of shape c.T.
Value at position (i, j) corresponds to the fraction of source
`i`'s supply to drain `j`.
:rtype: numpy.array
"""
n_pairings = self.n_sources * self.n_drains
# ---
# costs
# elements chosen to minimize sum
cost = np.nan_to_num(self.model_data['cost'])
cost = cvxopt.matrix(cost.ravel('F'))
# ---
# equality constraint variables
# each area is serviced once
A = cvxopt.spmatrix(1.,
[i // self.n_drains
for i in range(n_pairings)],
range(n_pairings), tc='d')
b = cvxopt.matrix([1.] * self.n_sources, tc='d')
# make nan's in cost impossible
if np.isnan(self.model_data['distance']).any():
i_vals, j_vals = np.where(np.isnan(self.model_data['distance']))
for idx, i_val in enumerate(i_vals):
i = int(i_val)
j = int(i_val * self.n_drains + j_vals[idx])
A[i, j] = 0
# knock out values above distance threshold
if self.model_params['dist_threshold']:
j_vals, i_vals = np.where(self.model_data['distance'] >
self.model_params['dist_threshold'])
for idx, ival in enumerate(i_vals):
A[int(ival), int(ival * self.n_drains + j_vals[idx])] = 0
# ---
# inequality constraint variables
# each plant never goes over capacity
drain_capacity = cvxopt.matrix([
cvxopt.matrix(self.model_data['drain_capacity'], tc='d'),
cvxopt.matrix([1.] * n_pairings, tc='d'),
cvxopt.matrix([0.] * n_pairings, tc='d')
])
# inequality maxima
ineq_maxs = cvxopt.sparse([
cvxopt.spmatrix(
np.repeat(self.model_data['source_amount'], self.n_drains),
[i % self.n_drains for i in range(n_pairings)],
range(n_pairings), tc='d'),
cvxopt.spmatrix(1.,
range(n_pairings),
range(n_pairings)),
cvxopt.spmatrix(-1.,
range(n_pairings),
range(n_pairings))
], tc='d')
for var in (cost, ineq_maxs, drain_capacity, A, b):
plpy.notice('size: {}'.format(var.size))
plpy.notice('{}, {}, {}'.format(n_pairings, self.n_sources, self.n_drains))
# solve
sol = solvers.lp(c=cost, G=ineq_maxs, h=drain_capacity,
A=A, b=b, solver=self.model_params['solver'])
if sol['status'] != 'optimal':
raise Exception("No solution possible: {}".format(sol))
# NOTE: assignments needs to be shaped like self.model_data['cost'].T
return np.array(sol['x'],
dtype=float)\
.flatten()\
.reshape((self.model_data['cost'].shape[1],
self.model_data['cost'].shape[0]))

View File

@@ -0,0 +1,121 @@
"""Unit tests for the optimizaiton module"""
import unittest
import numpy as np
from crankshaft.optimization import Optim
from crankshaft.analysis_data_provider import AnalysisDataProvider
import cvxopt
# suppress cvxopt GLPK messages
cvxopt.glpk.options['msg_lev'] = 'GLP_MSG_OFF'
class RawDataProvider(AnalysisDataProvider):
"""Raw data provider for testing purposes"""
def __init__(self, raw_data):
self.raw_data = raw_data
def get_column(self, table, column, dtype=float):
"""Returns requested 'column' of data"""
if column != 'cartodb_id':
return np.array(self.raw_data[column], dtype=dtype)
elif table == 'drain_table':
return np.arange(1, len(self.raw_data['capacity_col']) + 1)
elif table == 'source_table':
return np.arange(1, len(self.raw_data['production_col']) + 1)
def get_pairwise_distances(self, _source, _drain):
"""Returns pairwise distances"""
return np.array(self.raw_data['pairwise'], dtype=float)
class OptimTest(unittest.TestCase):
"""Testing class for Optimization module"""
def setUp(self):
# self.data = json.loads(
# open(fixture_file('optim.json')).read())
# capacity ~ 0.01 * production given waste_per_person of 0.01
# so capacity_col = [9, 31] / 100
self.data = {
'all_right': {"production_col": [10, 10, 10],
"capacity_col": [0.09, 0.31],
"marginal_col": [5, 5],
"pairwise": [[1, 2, 3], [3, 2, 1]]},
'all_left': {"production_col": [10, 10, 10],
"capacity_col": [0.31, 0.09],
"marginal_col": [5, 5],
"pairwise": [[1, 2, 3], [3, 2, 1]]},
'2left': {"production_col": [10, 10, 10],
"capacity_col": [0.21, 0.11],
"marginal_col": [5, 5],
"pairwise": [[1, 2, 3], [3, 2, 1]]},
'infeasible': {"production_col": [10, 10, 10],
"capacity_col": [0.19, 0.11],
"marginal_col": [5, 5],
"pairwise": [[1, 2, 3], [3, 2, 1]]}}
self.params = {'waste_per_person': 0.01,
'recycle_rate': 0.0,
'dist_rate': 0.15,
'dist_threshold': None,
'data_provider': None}
self.args = ('drain_table', 'source_table',
'capacity_col', 'production_col',
'marginal_col')
# print(self.model_data)
# print(self.model_params)
def test_optim_output(self):
"""Test Optim().output"""
outputs = {'all_right': [2, 2, 2],
'all_left': [1, 1, 1],
'2left': [1, 1, 2],
'infeasible': None}
for k in self.data:
if k == 'infeasible':
continue
self.params['data_provider'] = RawDataProvider(self.data[k])
optim = Optim(*self.args, **self.params)
out_vals = optim.output()
drain_ids = [row[0] for row in out_vals]
print(drain_ids)
print(k)
self.assertTrue(drain_ids == outputs[k])
return True
def test_check_constraints(self):
"""Test optim._check_constraints"""
for k in self.data:
self.params['data_provider'] = RawDataProvider(self.data[k])
print(k)
try:
optim = Optim(*self.args, **self.params)
# pylint: disable=protected-access
constraint_check = optim._check_constraints() is None
except ValueError as err:
# if infeasible, catch and say it's acceptable
print(k)
if k == 'infeasible':
constraint_check = True
print(constraint_check)
else:
raise ValueError(err)
self.assertTrue(constraint_check)
# def test_check_model_params(self):
# """Test model param defaults are correctly formed"""
# for k in self.data:
# self.params['data_provider'] = RawDataProvider(self.data[k])
# optim = Optim(*self.args, **self.params)
# # pylint: disable=protected-access
# model_check = optim._check_model_params() is None
# self.assertTrue(model_check)
def test_optim(self):
"""Test optim.optim method"""
# assert False
pass