Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions assignment/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ class Assignment(models.Model):
created_at = models.DateTimeField(auto_now_add=True)
delivery_locations = models.JSONField() # List of [longitude, latitude]
total_load = models.PositiveIntegerField()
optimized_distance = models.PositiveIntegerField(null=True, blank=True) # Store distance in meters

def __str__(self):
return f"Assignment #{self.id} to {self.vehicle.vehicle_id}"
84 changes: 81 additions & 3 deletions assignment/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,28 +3,106 @@
from .models import Assignment
from .serializers import AssignmentSerializer
from fleet.models import Vehicle
from route_optimizer.optimizer import RouteOptimizer
from django.conf import settings
import logging

# Set up logging
logger = logging.getLogger(__name__)

class AssignmentViewSet(viewsets.ModelViewSet):
queryset = Assignment.objects.all()
serializer_class = AssignmentSerializer

def create(self, request, *args, **kwargs):
# Log the raw payload for debugging
logger.debug("Received payload: %s", request.data)

# Check for deliveries
deliveries = request.data.get("deliveries")
if not deliveries:
logger.error("No deliveries provided")
return Response({"error": "Deliveries required"}, status=400)

# Validate deliveries format
if not isinstance(deliveries, list):
logger.error("Deliveries must be a list")
return Response({"error": "Deliveries must be a list"}, status=400)

for delivery in deliveries:
location = delivery.get("location")
load = delivery.get("load")
if not isinstance(location, list) or len(location) != 2:
logger.error("Invalid location format: %s", delivery)
return Response({"error": f"Invalid location format for delivery: {delivery}"}, status=400)
if not all(isinstance(coord, (int, float)) for coord in location):
logger.error("Location coordinates must be numbers: %s", delivery)
return Response({"error": f"Location coordinates must be numbers: {delivery}"}, status=400)
if not isinstance(load, (int, float)) or load <= 0:
logger.error("Invalid load: %s", delivery)
return Response({"error": f"Invalid load for delivery: {delivery}"}, status=400)

# Calculate total load
total_load = sum(d.get("load", 0) for d in deliveries)
logger.debug("Total load: %s", total_load)

# Find an available vehicle
vehicle = Vehicle.objects.filter(status="available", capacity__gte=total_load).first()
if not vehicle:
logger.error("No available vehicle for load: %s", total_load)
return Response({"error": "No available vehicle for the load"}, status=400)
logger.debug("Selected vehicle: %s", vehicle.vehicle_id)

# Prepare data for RouteOptimizer
depot = getattr(settings, 'DEPOT_COORDINATES', [77.58, 12.96])
locations = [
{'id': 'depot', 'coordinates': depot, 'load': 0}
] + [
{
'id': f'loc_{i}',
'coordinates': d.get("location"),
'load': d.get("load")
}
for i, d in enumerate(deliveries)
]
logger.debug("RouteOptimizer locations: %s", locations)

# Run RouteOptimizer
try:
optimizer = RouteOptimizer(locations, [vehicle.capacity])
routes = optimizer.solve()
if not routes:
logger.error("No feasible route found for locations: %s", locations)
return Response({"error": "No feasible route found"}, status=400)

# Extract the optimized route
optimized_route = routes[0]['route']
optimized_distance = routes[0]['distance']
logger.debug("Optimized route: %s, distance: %s", optimized_route, optimized_distance)

# Map optimized route to coordinates, excluding depot
location_map = {loc['id']: loc['coordinates'] for loc in locations}
delivery_locations = [location_map[loc_id] for loc_id in optimized_route[1:-1]]

except ValueError as e:
logger.error("RouteOptimizer validation error: %s", str(e))
return Response({"error": f"Invalid input for route optimization: {str(e)}"}, status=400)
except Exception as e:
logger.error("RouteOptimizer error: %s", str(e))
return Response({"error": f"Route optimization failed: {str(e)}"}, status=500)

# Mark vehicle as assigned
vehicle.status = "assigned"
vehicle.save()

# Create Assignment
assignment = Assignment.objects.create(
vehicle=vehicle,
delivery_locations=[d["location"] for d in deliveries],
total_load=total_load
delivery_locations=delivery_locations,
total_load=total_load,
optimized_distance=optimized_distance
)
logger.debug("Created assignment: %s", assignment)

serializer = self.get_serializer(assignment)
return Response(serializer.data, status=status.HTTP_201_CREATED)
return Response(serializer.data, status=status.HTTP_201_CREATED)
3 changes: 3 additions & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,6 @@ six==1.17.0
sqlparse==0.5.3
tzdata==2025.2
uritemplate==4.1.1
flask==2.0.3
requests==2.28.1
werkzeug==2.0.3
2 changes: 1 addition & 1 deletion route_optimizer/apps.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,4 @@ def ready(self):
Perform initialization tasks when the app is ready.
"""
# Import any signals or startup tasks here
pass
pass
111 changes: 111 additions & 0 deletions route_optimizer/optimizer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
from ortools.constraint_solver import routing_enums_pb2
from ortools.constraint_solver import pywrapcp
import math

class RouteOptimizer:
def __init__(self, locations, vehicle_capacities):
self.locations = locations
self.vehicle_capacities = vehicle_capacities
self.depot_index = 0

def validate_inputs(self):
if not self.locations:
raise ValueError("No delivery locations provided")
if self.locations[0]['load'] != 0:
raise ValueError("Depot must have 0 load")
if len(self.vehicle_capacities) < 1:
raise ValueError("At least one vehicle required")

def haversine_distance(self, lat1, lon1, lat2, lon2):
R = 6371 # Earth's radius in kilometers
lat1, lon1, lat2, lon2 = map(math.radians, [lat1, lon1, lat2, lon2])
dlat = lat2 - lat1
dlon = lon2 - lon1
a = math.sin(dlat/2)**2 + math.cos(lat1) * math.cos(lat2) * math.sin(dlon/2)**2
c = 2 * math.asin(math.sqrt(a))
distance = R * c * 1000 # Convert to meters
return round(distance)

def create_distance_matrix(self):
matrix = []
for loc1 in self.locations:
row = []
lat1, lon1 = loc1['coordinates']
for loc2 in self.locations:
lat2, lon2 = loc2['coordinates']
distance = self.haversine_distance(lat1, lon1, lat2, lon2)
row.append(distance)
matrix.append(row)
return matrix

def solve(self):
self.validate_inputs()
data = {
'distance_matrix': self.create_distance_matrix(),
'loads': [loc['load'] for loc in self.locations],
'vehicle_capacities': self.vehicle_capacities,
'num_vehicles': len(self.vehicle_capacities),
'depot': self.depot_index
}

manager = pywrapcp.RoutingIndexManager(
len(data['distance_matrix']),
data['num_vehicles'],
data['depot']
)
routing = pywrapcp.RoutingModel(manager)

def distance_callback(from_index, to_index):
from_node = manager.IndexToNode(from_index)
to_node = manager.IndexToNode(to_index)
return int(data['distance_matrix'][from_node][to_node])

transit_callback_idx = routing.RegisterTransitCallback(distance_callback)
routing.SetArcCostEvaluatorOfAllVehicles(transit_callback_idx)

def load_callback(from_index):
return data['loads'][manager.IndexToNode(from_index)]

load_callback_idx = routing.RegisterUnaryTransitCallback(load_callback)
routing.AddDimensionWithVehicleCapacity(
load_callback_idx,
0, # null capacity slack
data['vehicle_capacities'],
True,
'Capacity'
)

search_params = pywrapcp.DefaultRoutingSearchParameters()
search_params.first_solution_strategy = (
routing_enums_pb2.FirstSolutionStrategy.GLOBAL_CHEAPEST_ARC
)
search_params.time_limit.FromSeconds(10)
solution = routing.SolveWithParameters(search_params)
return self.format_solution(data, manager, routing, solution)

def format_solution(self, data, manager, routing, solution):
if not solution:
return None

routes = []
for vehicle_id in range(data['num_vehicles']):
index = routing.Start(vehicle_id)
route = []
route_distance = 0
route_load = 0
while not routing.IsEnd(index):
node = manager.IndexToNode(index)
route.append(self.locations[node]['id'])
previous_index = index
index = solution.Value(routing.NextVar(index))
next_node = manager.IndexToNode(index)
route_distance += data['distance_matrix'][node][next_node]
route_load += data['loads'][node]
route.append(self.locations[self.depot_index]['id'])
routes.append({
'vehicle_id': vehicle_id,
'route': route,
'distance': route_distance,
'load': route_load
})
return routes
98 changes: 98 additions & 0 deletions route_optimizer/tests.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
from django.test import TestCase

# Create your tests here.
from route_optimizer.optimizer import RouteOptimizer

class RouteOptimizerTest(TestCase):
def setUp(self):
"""Set up test data for RouteOptimizer tests."""
self.simple_locations = [
{'id': 'depot', 'coordinates': [0, 0], 'load': 0},
{'id': 'A', 'coordinates': [0.018, 0.018], 'load': 1},
{'id': 'B', 'coordinates': [0.027, 0.027], 'load': 2}
]
self.vehicle_capacities = [3]
self.sri_lanka_locations = [
{'id': 'Warehouse_Colombo', 'coordinates': [6.9271, 79.8612], 'load': 0},
{'id': 'Restaurant_Kandy', 'coordinates': [7.2906, 80.6337], 'load': 3},
{'id': 'Cafe_Galle', 'coordinates': [6.0535, 80.2210], 'load': 2}
]

def test_successful_optimization_simple(self):
"""Test route optimization with simple locations."""
optimizer = RouteOptimizer(self.simple_locations, self.vehicle_capacities)
result = optimizer.solve()

self.assertIsNotNone(result, "Expected a valid solution")
self.assertEqual(len(result), 1, "Expected one route")
self.assertEqual(result[0]['route'][0], 'depot', "Route must start at depot")
self.assertEqual(result[0]['route'][-1], 'depot', "Route must end at depot")
self.assertEqual(set(result[0]['route'][1:-1]), {'A', 'B'}, "Route must visit A and B")
self.assertEqual(result[0]['load'], 3, "Expected total load of 3")
self.assertGreater(result[0]['distance'], 0, "Distance must be non-zero")

def test_successful_optimization_sri_lanka(self):
"""Test route optimization with Sri Lanka locations."""
optimizer = RouteOptimizer(self.sri_lanka_locations, [5])
result = optimizer.solve()

self.assertIsNotNone(result, "Expected a valid solution")
self.assertEqual(len(result), 1, "Expected one route")
self.assertEqual(result[0]['route'][0], 'Warehouse_Colombo', "Route must start at depot")
self.assertEqual(result[0]['route'][-1], 'Warehouse_Colombo', "Route must end at depot")
self.assertEqual(set(result[0]['route'][1:-1]), {'Restaurant_Kandy', 'Cafe_Galle'},
"Route must visit Kandy and Galle")
self.assertEqual(result[0]['load'], 5, "Expected total load of 5")
self.assertGreaterEqual(result[0]['distance'], 10000, "Distance must be significant")

def test_insufficient_vehicle_capacity(self):
"""Test optimization with insufficient vehicle capacity."""
optimizer = RouteOptimizer(self.simple_locations, [1]) # Capacity too low
result = optimizer.solve()

self.assertIsNone(result, "Expected no solution due to insufficient capacity")

def test_invalid_depot_load(self):
"""Test initialization with non-zero depot load."""
invalid_locations = self.simple_locations.copy()
invalid_locations[0]['load'] = 1
optimizer = RouteOptimizer(invalid_locations, self.vehicle_capacities)

with self.assertRaises(ValueError) as context:
optimizer.solve()
self.assertEqual(str(context.exception), "Depot must have 0 load")

def test_empty_locations(self):
"""Test initialization with empty locations list."""
optimizer = RouteOptimizer([], self.vehicle_capacities)

with self.assertRaises(ValueError) as context:
optimizer.solve()
self.assertEqual(str(context.exception), "No delivery locations provided")

def test_no_vehicles(self):
"""Test initialization with no vehicles."""
optimizer = RouteOptimizer(self.simple_locations, [])

with self.assertRaises(ValueError) as context:
optimizer.solve()
self.assertEqual(str(context.exception), "At least one vehicle required")

def test_haversine_distance(self):
"""Test Haversine distance calculation."""
optimizer = RouteOptimizer(self.sri_lanka_locations, self.vehicle_capacities)
distance = optimizer.haversine_distance(6.9271, 79.8612, 7.2906, 80.6337) # Colombo to Kandy

self.assertGreater(distance, 93000, "Distance should be more than 93km")
self.assertLess(distance, 95000, "Distance should be less than 95km")

def test_distance_matrix(self):
"""Test distance matrix creation."""
optimizer = RouteOptimizer(self.simple_locations, self.vehicle_capacities)
matrix = optimizer.create_distance_matrix()

self.assertEqual(len(matrix), 3, "Matrix should have 3 rows")
self.assertEqual(len(matrix[0]), 3, "Matrix should have 3 columns")
self.assertEqual(matrix[0][0], 0, "Depot to depot distance should be 0")
self.assertEqual(matrix[1][2], matrix[2][1], "Matrix should be symmetric")
self.assertGreater(matrix[1][2], 0, "Distance between A and B should be non-zero")
Loading