5.2 量子+经典算法
【PM+CIM案例】使用PenaltyMethodSolver求解作业车间调度问题(JSP)
1. 问题背景
1.1 作业车间调度问题(JSP)
作业车间调度问题(Job Shop Scheduling Problem)是制造业中的经典优化问题,其目标是在多台机器上合理安排多个作业任务的加工顺序,使得整体加工时间(makespan)最小化。该问题具有NP-hard性质,在以下场景中具有重要应用价值:
- 工厂生产排程
- 云计算任务调度
- 物流运输规划
1.2 参数定义
- 任务数量:20个独立任务
- 机器数量:5台并行机器
- 任务持续时间:每个任务加工时间,这里通过
time%10
生成(保证最小为1) - 机器启动时间:每台机器的初始准备时间,取值为1到5的整数
- 优化目标: (1)最小化最大机器完成时间(makespan) (2)平衡各机器负载(最小化完成时间方差)
2. QUBO模型与约束转换
2.1 QUBO模型基础
QUBO模型标准形式:
其中
2.2 变量定义
定义二元决策变量:
python
X = kw.qubo.ndarray((taskNums, machineNums), "X", kw.qubo.binary)
其中
2.3 构建目标函数
2.3.1 完成时间计算
每台机器i的完成时间包含:
- 初始启动时间
machine_start_time[i]
- 分配任务的持续时间总和
python
J = [machine_start_time[i] + duration.dot(X[:, i]) for i in machineNums]
2.3.2 平衡目标
目标1:最小化完成时间的方差
python
J_mean = (总持续时间 + 总启动时间) / 机器数
obj = Σ[(J[i] - J_mean)^2] / 机器数
2.4 约束处理
硬约束:每个任务必须分配且只能分配到一个机器
python
for j in 所有任务:
ΣX[j][i] = 1
通过惩罚项转化为QUBO:
python
约束表达式 = (1 - ΣX[j][i])^2 * lambda1
3. 混合求解框架解析
3.1 PenaltyMethodSolver架构
< img src="https://images-cloud.qboson.com/community/portal/202504/25/155650hhepth4c24cr2hh2.png" width="200" />
3.2 组件说明
- 经典优化器:使用模拟退火(SA)进行初步求解
- 循环控制器:
SolverLoopController
管理迭代过程 - 惩罚调整策略:动态调整约束惩罚系数
4. 代码实现详解
4.1 模型构建核心代码
python
# 初始化参数
taskNums = 20
machineNums = 5
penalty = 45
# 生成持续时间数组(示例数据)
duration = np.array([time%10 if time%10 else 1
for time in range(1, taskNums+1)])
# 定义QUBO变量
X = kw.qubo.ndarray((taskNums, machineNums), "X", kw.qubo.binary)
# 计算目标函数
J_mean = (np.sum(duration) + np.sum(machine_start_time)) / machineNums
J = [machine_start_time[i] + duration.dot(X[:,i])
for i in range(machineNums)]
obj = kw.qubo.quicksum([(J[i]-J_mean)**2
for i in range(machineNums)])/machineNums
# 构建QUBO模型
qubo_model = kw.qubo.QuboModel()
qubo_model.set_objective(obj)
# 添加约束
for j in range(taskNums):
qubo_model.add_constraint(
(1 - kw.qubo.quicksum(X[j,i] for i in range(machineNums))) == 0,
f"c{j}",
penalty=penalty
)
4.2 求解器配置
python
# 模拟退火参数设置
optimizer = kw.classical.SimulatedAnnealingOptimizer(
initial_temperature=1e8,
alpha=0.99,
cutoff_temperature=0.001,
iterations_per_t=2500
)
# 循环控制器
pm_controller = kw.common.SolverLoopController(max_repeat_step=5)
# 构建求解器
solver = kw.solver.PenaltyMethodSolver(optimizer, pm_controller)
sol_dict, _ = solver.solve_qubo(qubo_model)
5. 运行结果分析
5.1 典型输出示例
duration array: [1 2 3 4 5 6 7 8 9 1 2 3 4 5 6 7 8 9 1 2]
machine_start_time: [1 2 3 4 5]
所有机器结束时间: [55. 54. 53. 52. 55.]
最终结果:55
方差:1.04
5.2 结果解读
- Makespan:55时间单位,即最晚完成机器的结束时间
- 方差:1.04,显示各机器负载高度平衡
- 约束验证:所有任务均被分配到唯一机器
5.3 可视化分析
python
import matplotlib.pyplot as plt
plt.figure(figsize=(10,6))
plt.bar(range(machineNums), J_end)
plt.xlabel('Machine ID')
plt.ylabel('Completion Time')
plt.title('Machine Workload Distribution')
plt.show()
6. 方法与参数调优建议
6.1 惩罚系数选择
- 过低惩罚(<40):约束无法满足
- 过高惩罚(>1e4):目标函数被淹没
- 经验范围:本问题建议在45-200之间调整
6.2 模拟退火参数优化
参数 | 推荐值 | 作用 |
---|---|---|
初始温度 | 1e6-1e8 | 避免陷入局部最优 |
降温系数 | 0.95-0.995 | 平衡收敛速度与精度 |
每温度迭代次数 | 1000-5000 | 保证充分搜索 |
6.3 量子优化替代方案
python
# 切换为CIM光量子计算机
q_optimizer = kw.cim.CIMOptimizer(user_name='test',
password='absd1232',
cim_task_manager_domain='local',
project_no="abc123")
solver = kw.solver.PenaltyMethodSolver(q_optimizer, pm_controller)
7. 扩展与应用场景
7.1 复杂约束扩展
- 任务优先级:添加顺序约束项
- 机器容量限制:引入新的约束条件
- 动态调度:实时更新任务队列
7.2 行业应用
- 智能制造:优化工厂生产排程
- 云计算:虚拟机任务调度
- 物流运输:配送路径规划
7.3 性能提升方向
- 分层求解:将问题分解为分配子问题和排序子问题
- 混合编码:结合整数变量和连续变量
- 量子经典混合:使用CIM等量子算法处理核心计算
结语
本案例展示了如何将经典调度问题转化为QUBO模型,并通过PenaltyMethodSolver进行高效求解。该方法兼具量子计算的发展潜力与经典算法的实用性,为复杂调度问题提供了新的解决思路。随着量子计算硬件的进步,此类混合算法将在工业优化领域发挥更大价值。
附:完整代码
python
import os
import sys
import numpy as np
base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.insert(0, os.path.join(base_dir, 'src/com/qboson'))
import kaiwu as kw
class CIMSolver:
def prepareCIMMatrix(self, taskNums, machineNums, lambda1=10000):
# np.random.seed(10)
self.taskNums = taskNums
self.machineNums = machineNums
print('TaskNums:', self.taskNums)
print('MachineNums:', self.machineNums)
self.duration = np.array([time % 10 if time % 10 else 1 for time in range(1, self.taskNums + 1)])
self.machine_start_time = np.array(range(1, self.machineNums + 1))
self.X = kw.qubo.ndarray((taskNums, machineNums), "X", kw.qubo.binary)
J_mean = (np.sum(self.duration) + np.sum(self.machine_start_time)) / self.machineNums
J = [self.machine_start_time[i] + self.duration.dot(self.X[:, i]) for i in range(self.machineNums)]
obj = kw.qubo.quicksum([(J[i] - J_mean) ** 2 for i in range(self.machineNums)]) / self.machineNums
self.model = kw.qubo.QuboModel()
self.model.set_objective(obj)
for j in range(self.taskNums):
self.model.add_constraint((1 - kw.qubo.quicksum([self.X[j][i] for i in range(self.machineNums)])) == 0,
f"c{j}", penalty=lambda1)
self.obj_ising = self.model.to_ising_model()
self.matrix = self.obj_ising.get_ising()["ising"]
def showResultDetail(self, sol_dict):
unsatisfied_count, result_dict = self.model.verify_constraint(sol_dict)
print('约束项不满足的个数:', unsatisfied_count)
out_array = np.zeros([self.taskNums, self.machineNums])
for key in sol_dict:
m_index = int(key.split('[')[1].split(']')[0])
n_index = int(key.split('[')[2].split(']')[0])
out_array[m_index, n_index] = sol_dict[key]
for y in range(self.machineNums):
tmp = []
for x in range(self.taskNums):
if out_array[x, y] == 1:
tmp.append(x)
# print('MACHINE:', y, 'TASK:', tmp)
J_end = np.zeros(self.machineNums)
for i in range(self.machineNums):
ifturn_temp = kw.qubo.get_val(kw.qubo.quicksum(self.X[:, i]), sol_dict)
ifturn = 1 if ifturn_temp > 0 else 0
J_temp = self.duration.dot(self.X[:, i]) + self.machine_start_time[i] * ifturn
J_end[i] = kw.qubo.get_val(J_temp, sol_dict)
print("duration array: ", self.duration)
print("machine_start_time: ", self.machine_start_time)
print("所有机器结束时间:", J_end)
print('最终结果:{}'.format(np.max(J_end)))
print('方差:', np.var(J_end))
return 0, np.max(J_end) # todo: tmp
def showResultDetail1(self, sol_dict):
J_end = np.zeros(self.machineNums)
for i in range(self.machineNums):
ifturn_temp = kw.qubo.get_val(kw.qubo.quicksum(self.X[:, i]), sol_dict)
ifturn = 1 if ifturn_temp > 0 else 0
J_temp = self.duration.dot(self.X[:, i]) + self.machine_start_time[i] * ifturn
J_end[i] = kw.qubo.get_val(J_temp, sol_dict)
print("duration array: ", self.duration)
print("machine_start_time: ", self.machine_start_time)
print("所有机器结束时间:", J_end)
print('最终结果:{}'.format(np.max(J_end)))
print('方差:', np.var(J_end))
return 0, np.max(J_end) # todo: tmp
if __name__ == '__main__':
kw.license.init(user_id="3098067335119083", sdk_code="7HUicCDbmhepmuoOt4uf5U15xO4rJN")
kw.common.set_log_level("INFO")
taskNums = 20
machineNums = 5
penalty = 45
print('TaskNums:', taskNums)
print('MachineNums:', machineNums)
duration = np.array([time % 10 if time % 10 else 1 for time in range(1, taskNums + 1)])
machine_start_time = np.array(range(1, machineNums + 1))
X = kw.qubo.ndarray((taskNums, machineNums), "X", kw.qubo.binary)
J_mean = (np.sum(duration) + np.sum(machine_start_time)) / machineNums
J = [machine_start_time[i] + duration.dot(X[:, i]) for i in range(machineNums)]
obj = kw.qubo.quicksum([(J[i] - J_mean) ** 2 for i in range(machineNums)]) / machineNums
qubo_model = kw.qubo.QuboModel()
qubo_model.set_objective(obj)
for j in range(taskNums):
qubo_model.add_constraint((1 - kw.qubo.quicksum([X[j][i] for i in range(machineNums)])) == 0,
f"c{j}", penalty=penalty)
optimizer = kw.classical.SimulatedAnnealingOptimizer(initial_temperature=1e8,
alpha=0.99,
cutoff_temperature=0.001,
iterations_per_t=2500)
pm_controller = kw.common.SolverLoopController(max_repeat_step=5)
solver = kw.solver.PenaltyMethodSolver(optimizer, pm_controller)
sol_dict, _ = solver.solve_qubo(qubo_model)
if sol_dict is not None:
unsatisfied_count, result_dict = qubo_model.verify_constraint(sol_dict)
print('约束项不满足的个数:', unsatisfied_count)
out_array = np.zeros([taskNums, machineNums])
for key in sol_dict:
m_index = int(key.split('[')[1].split(']')[0])
n_index = int(key.split('[')[2].split(']')[0])
out_array[m_index, n_index] = sol_dict[key]
for y in range(machineNums):
tmp = []
for x in range(taskNums):
if out_array[x, y] == 1:
tmp.append(x)
J_end = np.zeros(machineNums)
for i in range(machineNums):
#ifturn_temp = kw.qubo.get_val(kw.qubo.quicksum(X[:, i].tolist()), sol_dict)
ifturn_temp = kw.core.get_val(X[:, i].sum(), sol_dict)
ifturn = 1 if ifturn_temp > 0 else 0
J_temp = duration.dot(X[:, i]) + machine_start_time[i] * ifturn
J_end[i] = kw.core.get_val(J_temp, sol_dict)
print("duration array: ", duration)
print("machine_start_time: ", machine_start_time)
print("所有机器结束时间:", J_end)
print('最终结果:{}'.format(np.max(J_end)))
print('方差:', np.var(J_end))