技术交流28群

服务热线

135-6963-3175

微信服务号

activiti之引擎作业处理器源码分析 更新时间 2022-3-15 浏览2114次

主要涉及类:一系列JobHandler用于job的处理、JobManager用于job的入库调度激活挂起等操作、

                   asyncExecutor用于job的异步执行(通过线程池实现、和3个死循环的线程用于扫描job表操作调度job)

首先查看引擎配置初始化类ProcessEngineConfigurationImpl的init方法:

其中调用了
 //------------作业处理器相关---------------
initJobHandlers();//作业处理器集合(异步任务、定时触发任务、开始事件、定时挂起激活流程事件等、自定义等处理器)
initJobManager();//job任务管理器(用于创建或调度各种job的操作)
initAsyncExecutor();//异步执行器(基于ThreadPoolExecutor线程池和BlockingQueue阻塞队列)

第一个initJobHandlers(),用于各种任务处理器放入一个map中,方便使用时候通过key类型去获取。

第二个initJobManager(),进行JobManager的初始化,用于异步job、timerJob的创建和调度、挂起等操作。

                                    异步job通过异步执行器去执行。

第三个initAsyncExecutor(),初始化异步执行器(通过线程池实现)

public void initAsyncExecutor() {
    if (asyncExecutor == null) {
      DefaultAsyncJobExecutor defaultAsyncExecutor = new DefaultAsyncJobExecutor();
      // Message queue mode 队列模式 默认false
      defaultAsyncExecutor.setMessageQueueMode(asyncExecutorMessageQueueMode);
      // Thread pool config
      defaultAsyncExecutor.setCorePoolSize(asyncExecutorCorePoolSize);//核心线程数
      defaultAsyncExecutor.setMaxPoolSize(asyncExecutorMaxPoolSize);//最大线程数
      defaultAsyncExecutor.setKeepAliveTime(asyncExecutorThreadKeepAliveTime);//保持存活时间
      // Threadpool queue
      if (asyncExecutorThreadPoolQueue != null) {
        defaultAsyncExecutor.setThreadPoolQueue(asyncExecutorThreadPoolQueue);//阻塞队列
      }
      defaultAsyncExecutor.setQueueSize(asyncExecutorThreadPoolQueueSize);//阻塞队列大小
      // Acquisition wait time
      defaultAsyncExecutor.setDefaultTimerJobAcquireWaitTimeInMillis(asyncExecutorDefaultTimerJobAcquireWaitTime);//设置作业计时器查询间隔默认的等待时间10s
      defaultAsyncExecutor.setDefaultAsyncJobAcquireWaitTimeInMillis(asyncExecutorDefaultAsyncJobAcquireWaitTime);//异步作业查询间隔默认等待时间,默认10s
      // Queue full wait time
      defaultAsyncExecutor.setDefaultQueueSizeFullWaitTimeInMillis(asyncExecutorDefaultQueueSizeFullWaitTime);//队列已满时的等待时间默认0s
      // Job locking
      defaultAsyncExecutor.setTimerLockTimeInMillis(asyncExecutorTimerLockTimeInMillis);//定时执行器锁时间,默认5分钟
      defaultAsyncExecutor.setAsyncJobLockTimeInMillis(asyncExecutorAsyncJobLockTimeInMillis);//异步作业执行器锁时间,默认5分钟
      if (asyncExecutorLockOwner != null) {//执行器锁所有者
        defaultAsyncExecutor.setLockOwner(asyncExecutorLockOwner);
      }
      // Reset expired
      defaultAsyncExecutor.setResetExpiredJobsInterval(asyncExecutorResetExpiredJobsInterval);//重制过期任务间隔
      defaultAsyncExecutor.setResetExpiredJobsPageSize(asyncExecutorResetExpiredJobsPageSize);//重置过期任务查询条数
      // Shutdown
      defaultAsyncExecutor.setSecondsToWaitOnShutdown(asyncExecutorSecondsToWaitOnShutdown);//关闭前等待时间
      asyncExecutor = defaultAsyncExecutor;
    }
    asyncExecutor.setProcessEngineConfiguration(this);
    asyncExecutor.setAutoActivate(asyncExecutorActivate);//是否自动激活,默认true
  }

且在ProcessEngineImpl的构造方法里进行了asyncExecutor线程的启动:

public ProcessEngineImpl(ProcessEngineConfigurationImpl processEngineConfiguration) {
    //服务及配置注入
    this.processEngineConfiguration = processEngineConfiguration;
    this.name = processEngineConfiguration.getProcessEngineName();//默认为:default
    this.repositoryService = processEngineConfiguration.getRepositoryService();
    this.runtimeService = processEngineConfiguration.getRuntimeService();
    this.historicDataService = processEngineConfiguration.getHistoryService();
    this.identityService = processEngineConfiguration.getIdentityService();
    this.taskService = processEngineConfiguration.getTaskService();
    this.formService = processEngineConfiguration.getFormService();
    this.managementService = processEngineConfiguration.getManagementService();
    this.dynamicBpmnService = processEngineConfiguration.getDynamicBpmnService();
    this.asyncExecutor = processEngineConfiguration.getAsyncExecutor();
    this.commandExecutor = processEngineConfiguration.getCommandExecutor();
    this.sessionFactories = processEngineConfiguration.getSessionFactories();
    this.transactionContextFactory = processEngineConfiguration.getTransactionContextFactory();
    this.formEngineRepositoryService = processEngineConfiguration.getFormEngineRepositoryService();
    this.formEngineFormService = processEngineConfiguration.getFormEngineFormService();
    if (processEngineConfiguration.isUsingRelationalDatabase() && processEngineConfiguration.getDatabaseSchemaUpdate() != null) {
      //数据库脚本检查,根据配置的schema策略
      commandExecutor.execute(processEngineConfiguration.getSchemaCommandConfig(), new SchemaOperationsProcessEngineBuild());
    }
    if (name == null) {
      log.info("default activiti ProcessEngine created");
    } else {
      log.info("ProcessEngine {} created", name);
    }
    ProcessEngines.registerProcessEngine(this);//放入引擎缓存map(name:engine实例)
    if (asyncExecutor != null && asyncExecutor.isAutoActivate()) {//异步执行器状态激活,默认true
      asyncExecutor.start();//启动各种线程池和线程
    }
    if (processEngineConfiguration.getProcessEngineLifecycleListener() != null) {//引擎被创建监听器触发
      processEngineConfiguration.getProcessEngineLifecycleListener().onProcessEngineBuilt(this);
    }
    //更新缓存:实例表覆盖原始实例表bpmnModel缓存
    initModel(managementService);
    processEngineConfiguration.getEventDispatcher().dispatchEvent(ActivitiEventBuilder.createGlobalEvent(ActivitiEventType.ENGINE_CREATED));
  }

接下来我们查看DefaultAsyncJobExecutor的start及相关方法源码:

public boolean executeAsyncJob(final Job job) {
    if (isMessageQueueMode) {//使用基于消息队列的作业执行器运行时,这里不执行作业。
      // When running with a message queue based job executor,
      // the job is not executed here.
      return true;
    }
    Runnable runnable = null;
    if (isActive) {
      //基于job创建线程
      runnable = createRunnableForJob(job);
      try {
        executorService.execute(runnable);//放入线程池执行
      } catch (RejectedExecutionException e) {
       ......
     }
}
public void start() {
    if (isActive) {//默认false未激活
       return;
    }
    log.info("Starting up the default async job executor [{}].", getClass().getName());
    if (timerJobRunnable == null) {//timerJob线程(负责查询最新(<=now)一条的timerJob表数据并放入job表(所有线程都起来后也就是isActive=true后会涉及加锁set锁过期时间操作),删除timerJob表)
      timerJobRunnable = new AcquireTimerJobsRunnable(this, processEngineConfiguration.getJobManager());
    }
    if (resetExpiredJobsRunnable == null) {//new 重置过期job线程(查持有锁时间过期的job,并进行锁撤销:
                                            // 6版本:删除旧的job,插入新的job(新的不设置lock过期时间和id,lock持有者等);5版本:直接更新)
      resetExpiredJobsRunnable = new ResetExpiredJobsRunnable(this);
    }
    if (!isMessageQueueMode && asyncJobsDueRunnable == null) {
      asyncJobsDueRunnable = new AcquireAsyncJobsDueRunnable(this);
    }
    if (!isMessageQueueMode) {//非消息队列模式
      initAsyncJobExecutionThreadPool();//初始化异步线程池
      startJobAcquisitionThread();//启动异步可执行作业线程(开始真正的执行)
    }
    startTimerAcquisitionThread();//启动timerJob作业获取线程(作业搬运到job)
    startResetExpiredJobsThread();//启动重置过期job线程
    isActive = true;//设置为异步作业执行器为已激活
    executeTemporaryJobs();//执行临时作业
}
//初始化一个线程池
protected void initAsyncJobExecutionThreadPool() {
    if (threadPoolQueue == null) {
      log.info("Creating thread pool queue of size {}", queueSize);
      threadPoolQueue = new ArrayBlockingQueue<Runnable>(queueSize);
    }
    if (executorService == null) {
      log.info("Creating executor service with corePoolSize {}, maxPoolSize {} and keepAliveTime {}", corePoolSize, maxPoolSize, keepAliveTime);

      BasicThreadFactory threadFactory = new BasicThreadFactory.Builder().namingPattern("activiti-async-job-executor-thread-%d").build();
      executorService = new ThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveTime, TimeUnit.MILLISECONDS, threadPoolQueue, threadFactory);
    }
}
//启动异步线程池
protected void startJobAcquisitionThread() {
    if (asyncJobAcquisitionThread == null) {
      asyncJobAcquisitionThread = new Thread(asyncJobsDueRunnable);
    }
    asyncJobAcquisitionThread.start();
}
//启动timerJob线程
protected void startTimerAcquisitionThread() {
    if (timerJobAcquisitionThread == null) {
      timerJobAcquisitionThread = new Thread(timerJobRunnable);
    }
    timerJobAcquisitionThread.start();
}
//启动重置过期job线程
protected void startResetExpiredJobsThread() {
    if (resetExpiredJobThread == null) {
      resetExpiredJobThread = new Thread(resetExpiredJobsRunnable);
    }
    resetExpiredJobThread.start();
}

该类主要职责:

1、进行了线程池的创建和启动、线程放入线程池执行

2、启动AcquireTimerJobsRunnable 线程(timerJob作业搬运到job):

     timerJob线程(负责查询最新(<=now())一条的timerJob表数据并放入job表(所有线程都起来后也就是isActive=true后会涉及加锁set锁过期时间操作),删除timerJob表)

3、启动ResetExpiredJobsRunnable重置过期job线程

    重置过期job线程:(查持有锁时间过期的job,并进行锁撤销:

       6版本:删除旧的job,插入新的job(新的不设置lock过期时间和id,lock持有者等);

       5版本:直接更新)

4、启动AcquireAsyncJobsDueRunnable该线程里开始job真正的执行,通过步骤1异步处理器的线程池去执行线程

        :该步骤在线程池里最终调用了JobManager().execute(job);方法

该方法如下:

public void execute(Job job) {
    if (job instanceof JobEntity) {
      if (Job.JOB_TYPE_MESSAGE.equals(job.getJobType())) {//message
        executeMessageJob((JobEntity) job);
      } else if (Job.JOB_TYPE_TIMER.equals(job.getJobType())) {//timer
        executeTimerJob((JobEntity) job);
      }
      
    } else {
      throw new ActivitiException("Only jobs with type JobEntity are supported to be executed");
    }
  }

之后通过作业处理器集合取到相应的处理器进行了job的执行 。

如下代码:

Map<String, JobHandler> jobHandlers = processEngineConfiguration.getJobHandlers();
JobHandler jobHandler = jobHandlers.get(jobEntity.getJobHandlerType());
jobHandler.execute(jobEntity, jobEntity.getJobHandlerConfiguration(), execution, getCommandContext());

综上:

1、init用于jobHandlerMap、jobManager、asyncExecutor异步处理器的初始化

2、jobManager用于job的创建、挂起、数据库操作等。

3、AsyncExecutor异步处理器通过线程池和多个线程去扫表并进行job的调度,通过线程池去执行,并最终通过jobManager的的execute调用job对应的具体jobHandler去处理执行job。

说明:异步处理器中线程的启动在ProcessEngineImpl构造方法。

待补充....