跳转到内容
学习 > 学习文档
本文内容

5.2 量子+经典算法

【PM+CIM案例】使用PenaltyMethodSolver求解作业车间调度问题(JSP)

1. 问题背景

1.1 作业车间调度问题(JSP)

作业车间调度问题(Job Shop Scheduling Problem)是制造业中的经典优化问题,其目标是在多台机器上合理安排多个作业任务的加工顺序,使得整体加工时间(makespan)最小化。该问题具有NP-hard性质,在以下场景中具有重要应用价值:

  • 工厂生产排程
  • 云计算任务调度
  • 物流运输规划

1.2 参数定义

  • 任务数量:20个独立任务
  • 机器数量:5台并行机器
  • 任务持续时间:每个任务加工时间,这里通过 time%10 生成(保证最小为1)
  • 机器启动时间:每台机器的初始准备时间,取值为1到5的整数
  • 优化目标: (1)最小化最大机器完成时间(makespan) (2)平衡各机器负载(最小化完成时间方差)

2. QUBO模型与约束转换

2.1 QUBO模型基础

QUBO模型标准形式:

minxTQx

其中x是二进制变量向量,Q为系数矩阵。

2.2 变量定义

定义二元决策变量:

python
X = kw.qubo.ndarray((taskNums, machineNums), "X", kw.qubo.binary)

其中Xji=1表示任务j被分配到机器i。

2.3 构建目标函数

2.3.1 完成时间计算

每台机器i的完成时间包含:

  • 初始启动时间machine_start_time[i]
  • 分配任务的持续时间总和
python
J = [machine_start_time[i] + duration.dot(X[:, i]) for i in machineNums]
2.3.2 平衡目标

目标1:最小化完成时间的方差

python
J_mean = (总持续时间 + 总启动时间) / 机器数
obj = Σ[(J[i] - J_mean)^2] / 机器数

2.4 约束处理

硬约束:每个任务必须分配且只能分配到一个机器

python
for j in 所有任务:
    ΣX[j][i] = 1

通过惩罚项转化为QUBO:

python
约束表达式 = (1 - ΣX[j][i])^2 * lambda1

3. 混合求解框架解析

3.1 PenaltyMethodSolver架构

< img src="https://images-cloud.qboson.com/community/portal/202504/25/155650hhepth4c24cr2hh2.png" width="200" />

3.2 组件说明

  • 经典优化器:使用模拟退火(SA)进行初步求解
  • 循环控制器SolverLoopController管理迭代过程
  • 惩罚调整策略:动态调整约束惩罚系数

4. 代码实现详解

4.1 模型构建核心代码

python
# 初始化参数
taskNums = 20
machineNums = 5
penalty = 45

# 生成持续时间数组(示例数据)
duration = np.array([time%10 if time%10 else 1
                    for time in range(1, taskNums+1)])

# 定义QUBO变量
X = kw.qubo.ndarray((taskNums, machineNums), "X", kw.qubo.binary)

# 计算目标函数
J_mean = (np.sum(duration) + np.sum(machine_start_time)) / machineNums
J = [machine_start_time[i] + duration.dot(X[:,i])
     for i in range(machineNums)]
obj = kw.qubo.quicksum([(J[i]-J_mean)**2
                       for i in range(machineNums)])/machineNums

# 构建QUBO模型
qubo_model = kw.qubo.QuboModel()
qubo_model.set_objective(obj)

# 添加约束
for j in range(taskNums):
    qubo_model.add_constraint(
        (1 - kw.qubo.quicksum(X[j,i] for i in range(machineNums))) == 0,
        f"c{j}",
        penalty=penalty
    )

4.2 求解器配置

python
# 模拟退火参数设置
optimizer = kw.classical.SimulatedAnnealingOptimizer(
    initial_temperature=1e8,
    alpha=0.99,
    cutoff_temperature=0.001,
    iterations_per_t=2500
)

# 循环控制器
pm_controller = kw.common.SolverLoopController(max_repeat_step=5)

# 构建求解器
solver = kw.solver.PenaltyMethodSolver(optimizer, pm_controller)
sol_dict, _ = solver.solve_qubo(qubo_model)

5. 运行结果分析

5.1 典型输出示例

duration array:  [1 2 3 4 5 6 7 8 9 1 2 3 4 5 6 7 8 9 1 2]
machine_start_time:  [1 2 3 4 5]
所有机器结束时间: [55. 54. 53. 52. 55.]
最终结果:55
方差:1.04

5.2 结果解读

  • Makespan:55时间单位,即最晚完成机器的结束时间
  • 方差:1.04,显示各机器负载高度平衡
  • 约束验证:所有任务均被分配到唯一机器

5.3 可视化分析

python
import matplotlib.pyplot as plt

plt.figure(figsize=(10,6))
plt.bar(range(machineNums), J_end)
plt.xlabel('Machine ID')
plt.ylabel('Completion Time')
plt.title('Machine Workload Distribution')
plt.show()

6. 方法与参数调优建议

6.1 惩罚系数选择

  • 过低惩罚(<40):约束无法满足
  • 过高惩罚(>1e4):目标函数被淹没
  • 经验范围:本问题建议在45-200之间调整

6.2 模拟退火参数优化

参数推荐值作用
初始温度1e6-1e8避免陷入局部最优
降温系数0.95-0.995平衡收敛速度与精度
每温度迭代次数1000-5000保证充分搜索

6.3 量子优化替代方案

python
# 切换为CIM光量子计算机
q_optimizer = kw.cim.CIMOptimizer(user_name='test',
                                      password='absd1232',
                                      cim_task_manager_domain='local',
                                      project_no="abc123")
solver = kw.solver.PenaltyMethodSolver(q_optimizer, pm_controller)

7. 扩展与应用场景

7.1 复杂约束扩展

  • 任务优先级:添加顺序约束项
  • 机器容量限制:引入新的约束条件
  • 动态调度:实时更新任务队列

7.2 行业应用

  1. 智能制造:优化工厂生产排程
  2. 云计算:虚拟机任务调度
  3. 物流运输:配送路径规划

7.3 性能提升方向

  • 分层求解:将问题分解为分配子问题和排序子问题
  • 混合编码:结合整数变量和连续变量
  • 量子经典混合:使用CIM等量子算法处理核心计算

结语

本案例展示了如何将经典调度问题转化为QUBO模型,并通过PenaltyMethodSolver进行高效求解。该方法兼具量子计算的发展潜力与经典算法的实用性,为复杂调度问题提供了新的解决思路。随着量子计算硬件的进步,此类混合算法将在工业优化领域发挥更大价值。

附:完整代码

python
import os
import sys
import numpy as np

base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.insert(0, os.path.join(base_dir, 'src/com/qboson'))
import kaiwu as kw


class CIMSolver:
    def prepareCIMMatrix(self, taskNums, machineNums, lambda1=10000):
        # np.random.seed(10)

        self.taskNums = taskNums
        self.machineNums = machineNums
        print('TaskNums:', self.taskNums)
        print('MachineNums:', self.machineNums)

        self.duration = np.array([time % 10 if time % 10 else 1 for time in range(1, self.taskNums + 1)])

        self.machine_start_time = np.array(range(1, self.machineNums + 1))

        self.X = kw.qubo.ndarray((taskNums, machineNums), "X", kw.qubo.binary)
        J_mean = (np.sum(self.duration) + np.sum(self.machine_start_time)) / self.machineNums

        J = [self.machine_start_time[i] + self.duration.dot(self.X[:, i]) for i in range(self.machineNums)]

        obj = kw.qubo.quicksum([(J[i] - J_mean) ** 2 for i in range(self.machineNums)]) / self.machineNums

        self.model = kw.qubo.QuboModel()
        self.model.set_objective(obj)

        for j in range(self.taskNums):
            self.model.add_constraint((1 - kw.qubo.quicksum([self.X[j][i] for i in range(self.machineNums)])) == 0,
                                      f"c{j}", penalty=lambda1)

        self.obj_ising = self.model.to_ising_model()
        self.matrix = self.obj_ising.get_ising()["ising"]

    def showResultDetail(self, sol_dict):
        unsatisfied_count, result_dict = self.model.verify_constraint(sol_dict)
        print('约束项不满足的个数:', unsatisfied_count)
        out_array = np.zeros([self.taskNums, self.machineNums])

        for key in sol_dict:
            m_index = int(key.split('[')[1].split(']')[0])
            n_index = int(key.split('[')[2].split(']')[0])
            out_array[m_index, n_index] = sol_dict[key]

        for y in range(self.machineNums):
            tmp = []
            for x in range(self.taskNums):
                if out_array[x, y] == 1:
                    tmp.append(x)
            # print('MACHINE:', y, 'TASK:', tmp)

        J_end = np.zeros(self.machineNums)
        for i in range(self.machineNums):
            ifturn_temp = kw.qubo.get_val(kw.qubo.quicksum(self.X[:, i]), sol_dict)
            ifturn = 1 if ifturn_temp > 0 else 0
            J_temp = self.duration.dot(self.X[:, i]) + self.machine_start_time[i] * ifturn
            J_end[i] = kw.qubo.get_val(J_temp, sol_dict)

        print("duration array: ", self.duration)
        print("machine_start_time: ", self.machine_start_time)
        print("所有机器结束时间:", J_end)
        print('最终结果:{}'.format(np.max(J_end)))
        print('方差:', np.var(J_end))
        return 0, np.max(J_end)  # todo: tmp

    def showResultDetail1(self, sol_dict):

        J_end = np.zeros(self.machineNums)
        for i in range(self.machineNums):
            ifturn_temp = kw.qubo.get_val(kw.qubo.quicksum(self.X[:, i]), sol_dict)
            ifturn = 1 if ifturn_temp > 0 else 0
            J_temp = self.duration.dot(self.X[:, i]) + self.machine_start_time[i] * ifturn
            J_end[i] = kw.qubo.get_val(J_temp, sol_dict)

        print("duration array: ", self.duration)
        print("machine_start_time: ", self.machine_start_time)
        print("所有机器结束时间:", J_end)
        print('最终结果:{}'.format(np.max(J_end)))
        print('方差:', np.var(J_end))
        return 0, np.max(J_end)  # todo: tmp


if __name__ == '__main__':

    kw.license.init(user_id="3098067335119083", sdk_code="7HUicCDbmhepmuoOt4uf5U15xO4rJN")

    kw.common.set_log_level("INFO")

    taskNums = 20
    machineNums = 5
    penalty = 45
    print('TaskNums:', taskNums)
    print('MachineNums:', machineNums)

    duration = np.array([time % 10 if time % 10 else 1 for time in range(1, taskNums + 1)])
    machine_start_time = np.array(range(1, machineNums + 1))
    X = kw.qubo.ndarray((taskNums, machineNums), "X", kw.qubo.binary)
    J_mean = (np.sum(duration) + np.sum(machine_start_time)) / machineNums
    J = [machine_start_time[i] + duration.dot(X[:, i]) for i in range(machineNums)]
    obj = kw.qubo.quicksum([(J[i] - J_mean) ** 2 for i in range(machineNums)]) / machineNums
    qubo_model = kw.qubo.QuboModel()
    qubo_model.set_objective(obj)
    for j in range(taskNums):
        qubo_model.add_constraint((1 - kw.qubo.quicksum([X[j][i] for i in range(machineNums)])) == 0,
                                  f"c{j}", penalty=penalty)

    optimizer = kw.classical.SimulatedAnnealingOptimizer(initial_temperature=1e8,
                                                         alpha=0.99,
                                                         cutoff_temperature=0.001,
                                                         iterations_per_t=2500)
    pm_controller = kw.common.SolverLoopController(max_repeat_step=5)
    solver = kw.solver.PenaltyMethodSolver(optimizer, pm_controller)
    sol_dict, _ = solver.solve_qubo(qubo_model)

    if sol_dict is not None:
        unsatisfied_count, result_dict = qubo_model.verify_constraint(sol_dict)
        print('约束项不满足的个数:', unsatisfied_count)
        out_array = np.zeros([taskNums, machineNums])

        for key in sol_dict:
            m_index = int(key.split('[')[1].split(']')[0])
            n_index = int(key.split('[')[2].split(']')[0])
            out_array[m_index, n_index] = sol_dict[key]

        for y in range(machineNums):
            tmp = []
            for x in range(taskNums):
                if out_array[x, y] == 1:
                    tmp.append(x)

        J_end = np.zeros(machineNums)
        for i in range(machineNums):
            #ifturn_temp = kw.qubo.get_val(kw.qubo.quicksum(X[:, i].tolist()), sol_dict)
            ifturn_temp = kw.core.get_val(X[:, i].sum(), sol_dict)
            ifturn = 1 if ifturn_temp > 0 else 0
            J_temp = duration.dot(X[:, i]) + machine_start_time[i] * ifturn
            J_end[i] = kw.core.get_val(J_temp, sol_dict)

        print("duration array: ", duration)
        print("machine_start_time: ", machine_start_time)
        print("所有机器结束时间:", J_end)
        print('最终结果:{}'.format(np.max(J_end)))
        print('方差:', np.var(J_end))

基于 MIT 许可发布