Руководство прикладного разработчика#
Термины и определения#
Термины:
Термин |
Определение |
|---|---|
Докат |
Попытка исполнения задания |
Задание |
Операция, исполнение которой может быть запланировано |
Интенсивность выдачи заданий на докат |
Функциональность, ограничивающая количество заданий, выдаваемых на исполнение, в промежуток времени |
Конфигурация |
Совокупность параметров однократного типа задания |
Однократное задание |
Вид задания, позволяющий выполнять операцию до тех пор, пока она не будет успешно обработана, или не будет превышено количество попыток доката |
Периодическое задание |
Вид задания, позволяющий выполнять операцию регулярно по расписанию |
Тип задания |
Наименование группы заданий, которая идентифицирует исполняемую операцию |
Экспоненциальный докат |
Функциональность, позволяющая настраивать время между попытками доката задания |
Сокращения:
Сокращение |
Расшифровка |
|---|---|
КМ |
Клиентский модуль |
K8s |
Kubernetes |
OSE |
OpenShift Enterprise |
Системные требования#
Системные требования смотрите в документе «Руководство по установке», в разделе «Системные требования».
Подключение и конфигурирование#
Определение вида задания#

Описание объекта задания типа UUID - задание, параметром которого является идентификатор обрабатываемого объекта (используется, когда для процесса обработки задания необходим только идентификатор объекта).
Имя параметра
Тип
Описание
Ограничения
taskType
String
Тип задания. Строковая константа, однозначно определяющая тип задания. По этому значению будет выбираться обработчик
Обязательно, длина не более 64 символов
processedObjectUUID
String
Уникальный идентификатор обрабатываемого объекта
Обязательно, длина не более 64 символов
scheduledStartTime
Long
Плановое время обработки в миллисекундах, прошедших с начала UNIX epoch
Необязательно
clientId
String
Идентификатор клиента в (при наличии клиентской сессии при регистрации задания можно не передавать в явном виде, Platform V Asynchronous Tasks возьмет из сессии)
Обязательно, если replicateAndDoNotExecuteFromSI=true
originClientId
String
Идентификатор клиента в родном блоке (при наличии клиентской сессии при регистрации задания можно не передавать в явном виде, Platform V Asynchronous Tasks возьмет из сессии)
Обязательно, если replicateAndDoNotExecuteFromSI=true
clientIdType
String
Тип идентификатора (при наличии клиентской сессии при регистрации задания можно не передавать в явном виде, Platform V Asynchronous Tasks возьмет из сессии)
Обязательно, если replicateAndDoNotExecuteFromSI=true
block
String
Код блока (при наличии клиентской сессии при регистрации задания можно не передавать в явном виде, Platform V Asynchronous Tasks возьмет из сессии)
Обязательно, если replicateAndDoNotExecuteFromSI=true
originBlock
String
Код текущего блока (при наличии клиентской сессии при регистрации задания можно не передавать в явном виде, Platform V Asynchronous Tasks возьмет из сессии)
Обязательно, если replicateAndDoNotExecuteFromSI=true
doNotReplicateFromSI
boolean
Флаг - не реплицировать из Stand-In блока (по умолчанию false)
Необязательно
replicateAndDoNotExecuteFromSI
boolean
Флаг - строго реплицировать в основной блок из Stand-In и не исполнять в Stand-In (по умолчанию false)
Необязательно
trackingId
String
Идентификатор для отслеживания отложенного задания (заполняется КМ Platform V Asynchronous Tasks)
Необязательно
taskVersion
String
Версия задания
Необязательно, длина не более 64 символов
taskMetaInfo
Map<String,Object>
Метаинформация о задании
Необязательно, длина не более 2000 символов
placementTenantCode
String
Код потребителя (заполняется КМ Platform V Asynchronous Tasks)
Обязательно
initiatorTenantCode
String
Код потребителя инициатора (для случаев, когда потребителю, размещающему задание в Platform V Asynchronous Tasks из одного тенанта, необходимо пометить то, что данная задача была инициирована из другого тенанта)
Необязательно
Описание объекта задания типа INFO - задание, параметром которого является строка 2000 символов (используется, когда для процесса обработки задания необходимы дополнительные параметры помимо или вместо идентификатора объекта).
Имя параметра
Тип
Описание
Ограничения
taskType
String
Тип задания. Строковая константа, однозначно определяющая тип задания. По этому значению будет выбираться обработчик
Обязательно, длина не более 64 символов
processedObjectInfo
String
Метаданные (описание обрабатываемого объекта), необходимые процессу обработки
Обязательно, длина не более 2000 символов
scheduledStartTime
Long
Плановое время обработки в миллисекундах, прошедших с начала UNIX epoch
Необязательно
clientId
String
Идентификатор клиента (при наличии клиентской сессии при регистрации задания можно не передавать в явном виде, Platform V Asynchronous Tasks возьмет из сессии)
Обязательно, если replicateAndDoNotExecuteFromSI=true
originClientId
String
Идентификатор клиента в родном блоке (при наличии клиентской сессии при регистрации задания можно не передавать в явном виде, Platform V Asynchronous Tasks возьмет из сессии)
Обязательно, если replicateAndDoNotExecuteFromSI=true
clientIdType
String
Тип идентификатора (при наличии клиентской сессии при регистрации задания можно не передавать в явном виде, Platform V Asynchronous Tasks возьмет из сессии)
Обязательно, если replicateAndDoNotExecuteFromSI=true
block
String
Код блока (при наличии клиентской сессии при регистрации задания можно не передавать в явном виде, Platform V Asynchronous Tasks возьмет из сессии)
Обязательно, если replicateAndDoNotExecuteFromSI=true
originBlock
String
Код текущего блока (при наличии клиентской сессии при регистрации задания можно не передавать в явном виде, Platform V Asynchronous Tasks возьмет из сессии)
Обязательно, если replicateAndDoNotExecuteFromSI=true
doNotReplicateFromSI
boolean
Флаг - не реплицировать из Stand-In блока (по умолчанию false)
Необязательно
replicateAndDoNotExecuteFromSI
boolean
Флаг - строго реплицировать в основной блок из Stand-In и не исполнять в Stand-In (по умолчанию false)
Необязательно
trackingId
String
Идентификатор для отслеживания отложенного задания (заполняется КМ Platform V Asynchronous Tasks)
Необязательно
taskVersion
String
Версия задания
Необязательно, длина не более 64 символов
taskMetaInfo
Map<String,Object>
Метаинформация о задании
Необязательно, длина не более 2000 символов
placementTenantCode
String
Код потребителя (заполняется КМ Platform V Asynchronous Tasks)
Обязательно
initiatorTenantCode
String
Код потребителя инициатора (для случаев, когда потребителю, размещающему задание в Platform V Asynchronous Tasks из одного тенанта, необходимо пометить то, что данная задача была инициирована из другого тенанта)
Необязательно
В Platform V Asynchronous Tasks размещение критичной информации (учетные записи, пароли, персональные данные и прочее) запрещено.
В Platform V Asynchronous Tasks реализована поддержка нескольких версий сервиса. При параллельном функционировании нескольких версий Platform V Asynchronous Tasks в одном блоке обработчик потребителя вычитывает задания с любого сервера Platform V Asynchronous Tasks в блоке независимо от того, на каком сервере какой версии они были зарегистрированы. В случае если потребителю необходимо выполнять задания одного taskType на серверах определенных версий, то для версионирования заданий необходимо использовать атрибут taskVersion или заводить новый taskType.
Подключение Platform V Asynchronous Tasks#
Зависимости необходимые для работы клиентского модуля Platform V Asynchronous Tasks#
API Platform V Asynchronous Tasks
<dependency> <groupId>ru.sbrf.ufs.scheduler</groupId> <artifactId>ufs-platform-scheduler-api</artifactId> <version>${scheduler.api.version}</version> </dependency>Starters
<dependencies> <!--PACMAN (CFGA) продукта Platform V Frontend Std (#FS)--> <dependency> <groupId>ru.sbrf.ufs.platform</groupId> <artifactId>ufs-platform-config-spring-boot-starter</artifactId> </dependency> <!-- CORE --> <dependency> <groupId>ru.sbrf.ufs.platform</groupId> <artifactId>environment-spring-boot-starter</artifactId> </dependency> <dependency> <groupId>ru.sbrf.ufs.platform</groupId> <artifactId>rest-app-jersey-spring-boot-starter</artifactId> </dependency> <!-- HTTPCLIENT --> <dependency> <groupId>ru.sbrf.ufs.platform</groupId> <artifactId>httpclient-spring-boot-starter</artifactId> </dependency> <!-- HC --> <dependency> <groupId>ru.sbrf.ufs.healthcheck</groupId> <artifactId>ufs-healthcheck-spring-boot-starter</artifactId> </dependency> <!-- Объединенный мониторинг Unimon (MONA) продукта Platform V Monitor (OPM) --> <!-- optional <dependency> <groupId>ru.sbrf.ufs.monitoring.spring-boot</groupId> <artifactId>monitoring-spring-boot-starter</artifactId> </dependency>--> <dependency> <groupId>ru.sbrf.ufs.platform</groupId> <artifactId>ufs-platform-monitoring-api</artifactId> </dependency> <!-- ЕФС.Stand In (STDE) продукта Platform V Frontend Std (#FS)--> <!-- optional <dependency> <groupId>ru.sbrf.ufs.standin</groupId> <artifactId>ufs-si-spring-boot-starter</artifactId> </dependency>--> <dependency> <groupId>ru.sbrf.ufs.standin</groupId> <artifactId>ufs-si-client-api</artifactId> </dependency> <!-- IM --> <!-- нужен для стартера ЕФС.Stand In (STDE) продукта Platform V Frontend Std (#FS) <dependency> <groupId>ru.sbrf.ufs.integration</groupId> <artifactId>integration-module-spring-boot-starter</artifactId> </dependency>--> </dependencies>Implementation
<dependencies> <!-- CORE --> <dependency> <groupId>ru.sbrf.ufs.platform</groupId> <artifactId>ufs-platform-core</artifactId> </dependency> <dependency> <groupId>ru.sbrf.ufs.platform</groupId> <artifactId>ufs-platform-api</artifactId> </dependency> <dependency> <groupId>ru.sbrf.ufs.platform</groupId> <artifactId>ufs-platform-spring</artifactId> </dependency> <dependency> <groupId>ru.sbrf.ufs.platform</groupId> <artifactId>ufs-platform-json-mapper</artifactId> </dependency> <!-- PACMAN (CFGA) продукта Platform V Frontend Std (#FS) --> <dependency> <groupId>ru.sbrf.ufs.platform</groupId> <artifactId>ufs-platform-config</artifactId> </dependency> <!-- HTTPCLIENT --> <dependency> <groupId>ru.sbrf.ufs.platform</groupId> <artifactId>ufs-platform-httpclient-impl</artifactId> </dependency> <dependency> <groupId>ru.sbrf.ufs.platform</groupId> <artifactId>ufs-platform-httpclient-spring</artifactId> </dependency> <!-- HC --> <dependency> <groupId>ru.sbrf.ufs.healthcheck</groupId> <artifactId>healthcheck-control</artifactId> </dependency> <!-- IM --> <dependency> <groupId>ru.sbrf.ufs.integration</groupId> <artifactId>integration-module-api</artifactId> </dependency> <!-- ЕФС.Stand In (STDE) продукта Platform V Frontend Std (#FS) --> <dependency> <groupId>ru.sbrf.ufs.standin</groupId> <artifactId>ufs-si-client-api</artifactId> </dependency> <dependency> <groupId>ru.sbrf.ufs.standin</groupId> <artifactId>ufs-si-client</artifactId> </dependency> <!-- Объединенный мониторинг Unimon (MONA) продукта Platform V Monitor (OPM) --> <dependency> <groupId>ru.sbrf.ufs.platform</groupId> <artifactId>ufs-platform-monitoring-api</artifactId> </dependency> <dependency> <groupId>ru.sbrf.ufs.monitoring</groupId> <artifactId>monitoring-module</artifactId> </dependency> <dependency> <groupId>ru.sbrf.ufs.monitoring</groupId> <artifactId>monitoring-session</artifactId> </dependency> </dependencies>
Подключение#
registrator-spring-boot-starter - только регистрация заданий.
<dependency> <groupId>ru.sbrf.ufs.scheduler</groupId> <artifactId>task-scheduler-registrator-spring-boot-starter</artifactId> <version>${delayed.background.tasks.version}</version> </dependency>Spring Property:
Имя
Значение
Значение по умолчанию
Описание
ufs.task-scheduler.registrator.client.mode
LOCAL/REMOTE
-
Если задан, то стартер не смотрит на профили Spring. Следовательно, проект может быть запущен на одном профиле, а стартер на другом
ufs.task-scheduler.registrator.client.taskVersion
NONE
Строковый идентификатор версии задания, определяется потребителем произвольно. Используется в случае необходимости одновременной работы разных версий потребителя, в которых используются разные версии клиентского модуля Platform V Asynchronous Tasks
ufs.task-scheduler.registrator.client.enabled
true
Включение/выключение стартера
Стартер запустится в профиле PRODUCTION если:
Задан профиль PRODUCTION/PROD/PROM.
Профиль не задан.
Стартер запустится в профиле DEVELOP если:
Задан профиль DEVELOP/DEV и не задан профиль PRODUCTION.
executor-spring-boot-starter - только обработка однократных и периодических заданий.
<dependency> <groupId>ru.sbrf.ufs.scheduler</groupId> <artifactId>task-scheduler-executor-spring-boot-starter</artifactId> <version>${delayed.background.tasks.version}</version> </dependency>Spring Property:
Имя
Значения
Значение по умолчанию
Описание
ufs.task-scheduler.executor.client.mode
LOCAL/REMOTE
-
Если задан, то стартер не смотрит на профили Spring. Следовательно, проект может быть запущен на одном профиле, а стартер на другом
ufs.task-scheduler.executor.client.threadCount
Integer
10
Размер пула потоков для выполнения прикладных обработчиков заданий
ufs.task-scheduler.executor.client.enabled
true/false
true
Включение/выключение стартера. Если не задан, то стартер смотрит на наличие аннотации @TaskMeta
ufs.task-scheduler.executor.client.use-external-request-template
true/false
false
Использование внешнего RequestTemplate. Для правильной работы необходимо создать bean ru.sbrf.ufs.scheduler.config.TaskSchedulerRequestTemplate (маркерная обертка над шаблоном, т.к. в приложении может использоваться сразу несколько RequestTemplate).
Стартер запустится в профиле PRODUCTION если:
Задан профиль PRODUCTION/PROD/PROM.
Профиль не задан.
Стартер запустится в профиле DEVELOP если:
Задан профиль DEVELOP/DEV и не задан профиль PRODUCTION.
Пример необходимых beans для использования внешнего requestTemplate:
@Configuration @AutoConfigureBefore(ConfigServiceAutoConfiguration.class) public class RequestTemplateConfiguration { @Bean public RequestTemplate requestTemplate() { return RequestTemplateBuilder.builder() .attributeNames() .attributeValues() .build(); } @Bean public TaskSchedulerRequestTemplate taskSchedulerRequestTemplate() { return TaskSchedulerRequestTemplate.of(requestTemplate()); } }
Режим заглушки (stubmode)#
Основная задача этого режима - дать возможность разработчику протестировать поведение обработчика заданий локально.
Как включить: описать spring property (application.yml / application.properties / @PropertySource), для каждого модуля свое название: ufs.task-scheduler.registrator.client.mode=local, ufs.task-scheduler.executor.client.mode=local. Присутствует зависимость от профилей, когда эти свойства не указаны.
Принцип работы:
Все интеграции с бэкендом Platform V Asynchronous Tasks выключены. Регистрация заданий, поиск заданий на исполнение - все операции работают с обычным словарем. Если подключены несколько модулей (внутри одного приложения) в режиме заглушки (регистратор и исполнитель), то все операции работают с одним и тем же словарем.
Ограничения:
не учитывается конфигурация;
при поиске заданий не учитывается режим блока;
нет перерегистрации заданий;
поле scheduledStartTime (указывается при регистрации) не учитывается при исполнении заданий - задание будет выдано на обработку сразу, как только за ним придет обработчик по своему расписанию;
не работает канальная изоляция (поле placementTenantCode) - поиск заданий на обработку осуществляется по паре taskType/taskVersion;
если у вас 2 разных приложения для регистрации и исполнения, то режим заглушки будет работать для каждого приложения отдельно.
Миграция на текущую версию#
Добавлена возможность приостановить выполнение задания, передав результат обработки задания SUSPEND.
ru.sbrf.ufs.scheduler.background.tasks.delayed.api.entities.DelayedBackgroundTaskResultBuilderImpl#createSuspendResult()
Возобновление обработки приостановленного задания будет доступно через консоль администратора Platform V Asynchronous Tasks.
Добавлена возможность передать с результатом обработки задания информацию о причине невыполнения задания.
ru.sbrf.ufs.scheduler.background.tasks.delayed.api.entities.DelayedBackgroundTaskResultBuilderImpl#createSuspendResult(java.lang.String)
ru.sbrf.ufs.scheduler.background.tasks.delayed.api.entities.DelayedBackgroundTaskResultBuilderImpl#createRetryResult(java.lang.Long, java.lang.String)
Длина строки с причиной ограничена, не может быть больше 2000 символов. Причина невыполнения задания будет отображаться в консоли администратора Platform V Asynchronous Tasks.
Быстрый старт#
Для однократных заданий необходимо реализовать сервис регистрации заданий и обработчик однократных заданий, для периодических - обработчик периодических заданий.
Подключение Platform V Asynchronous Tasks#
Executor Starter
<dependency>
<groupId>ru.sbrf.ufs.scheduler</groupId>
<artifactId>task-scheduler-executor-spring-boot-starter</artifactId>
<version>${actual.version}</version>
</dependency>
Registrator Starter
<dependency>
<groupId>ru.sbrf.ufs.scheduler</groupId>
<artifactId>task-scheduler-registrator-spring-boot-starter</artifactId>
<version>${actual.version}</version>
</dependency>
Регистрация задания#
Инжектировать bean Platform V Asynchronous Tasks для регистрации заданий:
@Autowired DelayedBackgroundTaskRegistrationService registrationService;Создать однократное задание:
UUID-based - задание, параметром которого является идентификатор обрабатываемого объекта (используется, когда для процесса обработки задания необходим только идентификатор объекта).
Для создания задания используется DelayedBackgroundTaskLinkImpl#builder(String, String).
public class MyRegistrationClass { @Autowired DelayedBackgroundTaskRegistrationService registrationService; public void myMethodWithUUID() { /* Задание со ссылкой на обрабатываемый объект (UUID). Рекомендуемый способ работы с докатом. */ DelayedBackgroundTaskLink link = DelayedBackgroundTaskLinkImpl .builder("task_scheduler.example.taskWithUUID", "Object1UUID") .withScheduledStartTime(System.currentTimeMillis() + 5000L) .build(); registrationService.registerDelayedBackgroundTask(link); } public void myMethodWithUUIDAndMetaInfo() { DelayedBackgroundTaskLink link = DelayedBackgroundTaskLinkImpl .builder("task_scheduler.example.taskWithUUID", "Object1UUID") .withScheduledStartTime(System.currentTimeMillis() + 5000L) // Добавление параметров осуществляется с помощью методов .withTaskMetaInfoParameter("key", "value") .withTaskMetaInfoParameters(Collections.singletonMap("key", (Object) "value")) .build(); registrationService.registerDelayedBackgroundTask(link); } /* Если у вас не заполнено поле идентификатора документа в платформенном RequestContext, то для сохранения сквозного идентификатора документа необходимо воспользоваться ru.sbrf.ufs.platform.core.env.RequestContext#setOperationHistoryId */ private void setOperationHistoryId() { // нужно обязательно проверять что 32 символа в uuid'e, иначе произойдет ошибка регистрации env.getRequestContext().setOperationHistoryId(UUID.randomUUID().toString().substring(0, 32)); } }INFO-based - задание, параметром которого является строка 2000 символов (используется, когда для процесса обработки задания необходимы дополнительные параметры помимо или вместо идентификатора объекта).
Для создания используйте SimpleDelayedBackgroundTaskImpl#builder(String, String).
public class MyRegistrationClass { @Autowired DelayedBackgroundTaskRegistrationService registrationService; public void myMethodWithINFO() { SimpleDelayedBackgroundTask link = SimpleDelayedBackgroundTaskImpl .builder("task_scheduler.example.taskWithINFO", "Long Metadata String (plain, JSON, XML, YAML, whatever)") .withScheduledStartTime(System.currentTimeMillis() + 5000L) .build(); registrationService.registerSimpleDelayedBackgroundTask(link); } public void myMethodWithINFOAndMetaInfo() { SimpleDelayedBackgroundTask link = SimpleDelayedBackgroundTaskImpl .builder("task_scheduler.example.taskWithINFO", "Long Metadata String (plain, JSON, XML, YAML, whatever)") .withScheduledStartTime(System.currentTimeMillis() + 5000L) // Добавление параметров осуществляется с помощью методов .withTaskMetaInfoParameter("key", "value") .withTaskMetaInfoParameters(Collections.singletonMap("key", (Object) "value")) .build(); registrationService.registerSimpleDelayedBackgroundTask(link); } /* Если у вас не заполнено поле идентификатора документа в платформенном RequestContext, то для сохранения сквозного идентификатора документа необходимо воспользоваться ru.sbrf.ufs.platform.core.env.RequestContext#setOperationHistoryId */ private void setOperationHistoryId() { // нужно обязательно проверять что 32 символа в uuid'e, иначе произойдет ошибка регистрации env.getRequestContext().setOperationHistoryId(UUID.randomUUID().toString().substring(0, 32)); } }
Реализация обработчика заданий#
Прикладные обработчики необходимо объявлять наследниками классов АПИ:
Периодические задания реализуют ru.sbrf.ufs.scheduler.background.tasks.delayed.api.TaskExecutor.
Использовать аннотацию @TaskMeta на классах наследниках: InfoBasedTaskProcessor/UUIDBasedTaskProcessor для однократных заданий и TaskExecutor для периодических заданий.
Атрибуты @TaskMeta
Имя |
Обязателен |
Значение по умолчанию |
Описание |
|---|---|---|---|
taskType |
да |
- |
Тип обрабатываемого задания |
taskVersion |
нет |
«» |
Версия обрабатываемого задания |
Создание прикладного обработчика периодических заданий
Обработчик заданий должен реализовать интерфейс TaskExecutor.
Класс необходимо пометить аннотацией @Component.
Реализация может быть как конечным обработчиком задания, так и промежуточным диспетчером, ориентирующимся на context.getTaskType() - тип сработавшего задания.
Пример реализации для периодических заданий:
@TaskMeta(taskType = "task_scheduler.example.periodic_task") @Component public class PeriodicTaskExecutor implements TaskExecutor { private static final Logger LOGGER = LoggerFactory.getLogger(PeriodicTaskExecutor.class); @Override public TaskExecutionResult execute(TaskExecutionContext context) { LOGGER.info("I'm worked!"); return new TaskExecutionResultBuilderImpl(context).withStatus(ExecutionStatus.COMPLETED).build(); } }Создание прикладного обработчика однократных заданий

UUID-based - Однократные задания со ссылкой на объект наследуются от: ru.sbrf.ufs.scheduler.background.tasks.delayed.client.services.UUIDBasedTaskProcessor
. Пример реализации обработчика для заданий с UUID:
@TaskMeta(taskType = "task_scheduler.example.taskWithUUID") @Component public class UUIDBasedTaskProcessorExample extends UUIDBasedTaskProcessor<Document> { private static final Logger LOGGER = LoggerFactory.getLogger(UUIDBasedTaskProcessorExample.class); @Override public Document findProcessedObject(String processedObjectUUID) { return workTypeService.getById(Long.parseLong(processedObjectUUID)); } @Override public boolean accept(Document processedObject) { double rnd = Math.random() * 2; if (rnd <= 1) { LOGGER.info("Обработка объекта запрещена."); return false; } else { LOGGER.info("Обработка объекта разрешена."); return true; } } @Override public DelayedBackgroundTaskResult createNotAcceptedResult(Document processedObject, Long attempt) { return new DelayedBackgroundTaskResultBuilderImpl().createRetryResult(System.currentTimeMillis() + 20_000L); } @Override public DelayedBackgroundTaskResult process(Document processedObject, Long attempt) { double rnd = Math.random() * 2; DelayedBackgroundTaskResultBuilder resultBuilder = new DelayedBackgroundTaskResultBuilderImpl(); if (rnd <= 1) { LOGGER.info("Успешно выполненное задание"); return resultBuilder.createFinishResult(); } else { LOGGER.info("Не успешно выполненное задание"); return resultBuilder.createRetryResult(20_000L); } } @Override public long getDefaultTimeShift() { return 5_000L; } }INFO-based - Однократные задания с метаданными наследуются от: ru.sbrf.ufs.scheduler.background.tasks.delayed.client.services.INFOBasedTaskProcessor
. Пример реализации обработчика для заданий с INFO:
@TaskMeta(taskType = "task_scheduler.example.taskWithINFO") @Component public class UUIDBasedTaskProcessorExample extends InfoBasedTaskProcessor { private static final Logger LOGGER = LoggerFactory.getLogger(UUIDBasedTaskProcessorExample.class); @Override public DelayedBackgroundTaskResult process(String processedObject, Long attempt) { double rnd = Math.random() * 2; DelayedBackgroundTaskResultBuilder resultBuilder = new DelayedBackgroundTaskResultBuilderImpl(); if (rnd <= 1) { LOGGER.info("Успешно выполнение задание"); return resultBuilder.createFinishResult(); } else { LOGGER.info("Не успешно выполненное задание"); return resultBuilder.createRetryResult(20_000L); } } @Override public long getDefaultTimeShift() { return 5000L; } }Пример работы с параметрами task MetaInfo в обработчиках
@TaskMeta(taskType = "task_scheduler.example.taskWithUUID") @Component public class UUIDBasedTaskProcessorExample extends UUIDBasedTaskProcessor<Document> { private static final Logger LOGGER = LoggerFactory.getLogger(UUIDBasedTaskProcessorExample.class); {...} @Override public DelayedBackgroundTaskResult process(Document processedObject, Long attempt) { // Параметры из TASK_META_INFO передаются обратно на сторону клиента в виде мапы, получить доступ к которой можно, вызвав метод // ru.sbrf.ufs.scheduler.background.tasks.delayed.client.services.TaskProcessor#getTaskMetaInfo Map<String, Object> taskMetaInfo = getTaskMetaInfo(); // Для получения параметров необходимо указать ключ используемый при регистрации задания с заполнением данной информации taskMetaInfo.get("key") return new DelayedBackgroundTaskResultBuilderImpl().createFinishResult(); } @Override public long getDefaultTimeShift() { return 5000L; } }
Настройка параметров#
Для регистрации заданий:
Наименование
Источник ресурса
Вид задания
Обязательный
Описание
Значение по умолчанию
Тип
Разрез
platform.task.scheduler.rest.client.timeout
SUP
Однократные
Нет
Таймаут соединения (в мс.)
3000, где 0 - бесконечный таймаут
LONG
SUBSYSTEM CHANNEL
platform.task.scheduler.rest.client.connection.timeout
SUP
Однократные
Нет
Таймаут на установку соединения (в мс.)
500, где 0 - бесконечный таймаут
LONG
SUBSYSTEM CHANNEL
platform.task.scheduler.rest.client.max.total.connections
SUP
Однократные
Нет
Максимальное число соединений в пуле
10
LONG
SUBSYSTEM CHANNEL
platform.task.scheduler.rest.client.max.per.route
SUP
Однократные
Нет
Максимальное число соединений на URL
'platform.errors.rest.client.max.total.connections'
LONG
SUBSYSTEM CHANNEL
platform.task.scheduler.rest.client.header.server.ip
SUP
Однократные
Нет
Имя http-заголовка для передачи ip-адреса клиента
ufs-client-ip
STRING
SUBSYSTEM CHANNEL
Для обработки заданий:
Наименование
Источник ресурса
Вид задания
Обязательный
Описание
Значение по умолчанию
Тип
Разрез
<тип задания>
SUP
Однократные и периодические
Да
Расписание по выполнению заданий. Ключом является тип задания. Значение - строка, содержащая расписание в виде CRON-выражения (например, «0/20 * * * * ?») или интервала в секундах (например, «20»). Рекомендуем использовать именно интервал в секундах для равномерного распределения заданий на сервера-обработчики
10 минут
STRING
SUBSYSTEM CHANNEL
task_scheduler.standin.modes.<тип задания>
SUP
Однократные и периодические
Нет
Список режимов Stand-In, в которых возможна обработка этого типа заданий. Пустой список означает, что обработка доступна в любом режиме
Пустой список
List STRING
SUBSYSTEM CHANNEL
task_scheduler.schedule_reloader
SUP
Однократные и периодические
Нет
Задание по периодическому перечитыванию расписания с целью актуализации. Значение - строка, содержащая расписание в виде CRON-выражения (например, «0/20 * * * * ?») или интервала в секундах (например, «20»)
10 минут
STRING
SUBSYSTEM CHANNEL
task_scheduler.topology_info_reloader
SUP
Однократные
Нет
Задание по периодическому перечитыванию информации о топологии c целью актуализации. Значение - строка, содержащая расписание в виде интервала в секундах (например, «900»)
15 минут
STRING
SUBSYSTEM CHANNEL
task_scheduler.standin.modes.task_scheduler.topology_info_reloader
SUP
Однократные
Нет
Список режимов Stand-In, при которых работает task_scheduler.topology_info_reloader
NORMAL, INERT, PASSIVE_STANDIN, PASIIVE_INERT
List STRING
SUBSYSTEM CHANNEL
task_scheduler.single_task.max.tasks.<тип задания>
SUP
Однократные
Нет
Максимальное количество однократных заданий, выбираемых из БД для одного цикла движка однократных заданий
20
LONG
SUBSYSTEM CHANNEL
task_scheduler.single_task.max.executors.<тип задания>
SUP
Однократные
Нет
Максимальное количество работающих одновременно на одном узле обработчиков однократных заданий для одного типа заданий. Обратите внимание, что максимальная сумма всех обработчиков по всем вашим типам заданий не должна превышать пула потоков на сфере
10
LONG
SUBSYSTEM CHANNEL
task_scheduler.single_task.max.attempts.<тип задания>
SUP
Однократные
Нет
Максимальное количество попыток обработки одного и того же задания
100
LONG
SUBSYSTEM CHANNEL
platform.task.scheduler.rest.client.timeout
SUP
Однократные
Нет
Таймаут соединения (в мс.)
3000, где 0 - бесконечный таймаут
LONG
SUBSYSTEM CHANNEL
platform.task.scheduler.rest.client.connection.timeout
SUP
Однократные
Нет
Таймаут на установку соединения (в мс.)
500, где 0 - бесконечный таймаут
LONG
SUBSYSTEM CHANNEL
platform.task.scheduler.rest.client.max.total.connections
SUP
Однократные
Нет
Максимальное число соединений в пуле
10
LONG
SUBSYSTEM CHANNEL
platform.task.scheduler.rest.client.max.per.route
SUP
Однократные
Нет
Максимальное число соединений на URL
'platform.errors.rest.client.max.total.connections'
LONG
SUBSYSTEM CHANNEL
platform.task.scheduler.rest.client.header.server.ip
SUP
Однократные
Нет
Имя http-заголовка для передачи ip-адреса клиента
ufs-client-ip
STRING
SUBSYSTEM CHANNEL
Настройка импорта конфигураций через pipeline для однократных заданий#
Файл импорта должен быть включен в поставку каждого потребителя Platform V Asynchronous Tasks и должен содержать конфигурации всех однократных типов заданий, регистрируемых или обрабатываемых потребителем.
Для импорта конфигураций типов заданий необходимо:
создать файл с именем configs.json
поместить файл в папку /package/conf/data/task_scheduler/
при раскатке дистрибутива на стенды через pipeline выбрать playbook: IMPORT_TASK_SCHEDULER_PARAMS
Параметры файла:
параметр |
тип |
описание |
рекомендации по заполнению |
обязательность |
|---|---|---|---|---|
version |
string |
версия схемы |
текущее значение: 2.0 |
+ |
taskType |
string |
тип задания |
учитывается регистр |
+ |
taskVersion |
string |
версия задания |
- |
|
description |
string |
описание типа задания |
такое описание чтобы сотруднику сопровождения было понятно, что это за задание, какую функцию выполняет |
- |
taskProvider |
string |
подсистема, регистрирующая задания |
значение: subsystem подсистемы, регистрирующей задания данного типа, учитывается регистр |
+ |
taskConsumer |
string |
подсистема, обрабатывающая задания |
значение: subsystem подсистемы, обрабатывающей задания данного типа, учитывается регистр |
+ |
maxIntensity |
integer |
максимальное количество заданий, выдаваемых за период maxIntensityPeriod |
для ограничения количества выдаваемых на докат заданий в единицу времени, возможные значения для интервала: «DAY», «HOUR», «MINUTE», «SECOND» |
+ если заполнено поле maxIntensityPeriod |
maxIntensityPeriod |
enum |
период выдачи заданий (в секунду, в минуту, в час, в день) |
+ если заполнено поле maxIntensity |
|
exponentialRetryEnabled |
boolean |
флаг включения использования экспоненциального доката |
для управления интервалами между попытками доката |
- |
ratiosOfExponentialRetry |
string |
список коэффициентов для расчета интервалов между попытками с использованием экспоненциального доката |
коэффициент - целое число, коэффициенты должны быть разделены «;» |
+ если exponentialRetryEnabled=true |
timeLimitRetry |
integer |
предел времени между попытками с использованием экспоненциального доката |
+ если exponentialRetryEnabled=true |
|
task_scheduler.single_task.cancelable |
boolean |
возможность отмены задания в АРМ администратора |
- |
|
task_scheduler.single_task.shift_ignore |
boolean |
запрет на перепланирование задания |
- |
|
task_scheduler.single_task.restart_ignore |
boolean |
запрет на ручную перерегистрацию задания в АРМ Администратора |
- |
В случае отсутствия в файле параметра с типом boolean при записи в БД для него будет установлено значение false.
Работа экспоненциального доката#
Каждый коэффициент применяется к определенной попытке доката, например введены коэффициенты: 2;5;10. Коэффициент 2 будет применен к попытке, где attempt=1, коэффициент 5 - где attempt=2, коэффициент 10 - где attempt=3. В случае если количество коэффициентов меньше значения параметра максимального количества попыток обработки задания, то для тех попыток доката, у которых нет соответствующего коэффициента, будет применяться последний коэффициент.
Расчет планового времени выполнения попытки доката с учетом коэффициентов:
Коэффициент будет применяться к интервалу, рассчитанному как разность между переданным плановым временем исполнения очередной попытки доката и временем БД Platform V Asynchronous Tasks.
Плановое время выполнения попытки будет рассчитано как сумма времени БД и полученного интервала, умноженного на соответствующий коэффициент, причем произведение полученного интервала и коэффициента не будет превышать значения timeLimitRetry.
Необходимо учитывать то, что если плановое время исполнения не передано или оно меньше, чем время БД, то оно переопределяется Platform V Asynchronous Tasks на (текущее время БД + 5000 мс), и в случае использования экспоненциального доката 5000мс будет умножаться на соответствующий коэффициент.
json schema
{
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {
"version": {
"type": "string",
"maxLength": 3,
"description": "версия схемы"
},
"configs": {
"type": "array",
"minItems": 1,
"items": {
"type": "object",
"properties": {
"taskType": {
"type": "string",
"maxLength": 64,
"description": "тип задания"
},
"taskVersion": {
"type": "string",
"maxLength": 64,
"description": "версия задания"
},
"description": {
"type": "string",
"maxLength": 200,
"description": "описание задания"
},
"taskProvider": {
"type": "string",
"maxLength": 64,
"description": "подсистема, регистрирующая задания"
},
"taskConsumer": {
"type": "string",
"maxLength": 64,
"description": "подсистема, обрабатывающая задания"
},
"maxIntensity": {
"type": "integer",
"minimum": 1,
"maximum": 99999999,
"description": "максимальная интенсивность"
},
"maxIntensityPeriod": {
"enum": [
"DAY",
"HOUR",
"MINUTE",
"SECOND"
],
"description": "период максимальной интенсивности"
},
"exponentialRetryEnabled": {
"type": "boolean",
"description": "флаг включения использования экспоненциального доката"
},
"ratiosOfExponentialRetry": {
"type": "string",
"pattern": "^([1-9]([0-9]*))?((;)([1-9]([0-9]*)))*$",
"maxLength": 200,
"description": "список коэффициентов для расчета интервалов между попытками с использованием экспоненциального доката"
},
"timeLimitRetry": {
"type": "integer",
"minimum": 1,
"description": "предел времени между попытками с использованием экспоненциального доката"
},
"task_scheduler.single_task.cancelable": {
"type": "boolean",
"description": "возможность отмены задания в АРМ администратора"
},
"task_scheduler.single_task.shift_ignore": {
"type": "boolean",
"description": "запрет на перепланирование задания"
}
"task_scheduler.single_task.restart_ignore": {
"type": "boolean",
"description": "запрет на ручную перерегистрацию задания в АРМ Администратора"
}
},
"if": {
"properties": {
"exponentialRetryEnabled": { "const": true }
},
"required": ["exponentialRetryEnabled"]
},
"then": {
"required": [
"ratiosOfExponentialRetry",
"timeLimitRetry"
]
},
"required": [
"taskType"
],
"dependencies": {
"maxIntensity": {
"required": [
"maxIntensityPeriod"
]
},
"maxIntensityPeriod": {
"required": [
"maxIntensity"
]
}
},
"additionalProperties": false
}
}
},
"required": [
"version",
"configs"
],
"additionalProperties": false
}
шаблон configs.json
{
"version":"2.0",
"configs": [
{
"taskType": "tt1",
"taskVersion": "V1",
"description": "Описание tt1 на кириллице",
"taskProvider": "taskProvider",
"taskConsumer": "taskConsumer",
"maxIntensity": 10,
"maxIntensityPeriod": "DAY",
"exponentialRetryEnabled": true,
"ratiosOfExponentialRetry": "1;2;3",
"timeLimitRetry": 1200000,
"task_scheduler.single_task.cancelable": true,
"task_scheduler.single_task.shift_ignore": true,
"task_scheduler.single_task.restart_ignore": true
}
]
}
Проверка корректности работы#
Варианты проверки регистрации однократных заданий#
Наличие записи в логе:
Зарегистрировано задание с параметрами:DelayedBackgroundTaskParametersContainer{taskType=task_scheduler.example.taskWithUUID, ...}Наличие задания при поиске через консоль администратора.
Наличие записи с заданием в таблице БД.
Варианты проверки исполнения задания#
Периодическое задание
Наличие записей в логе:
Регистрация обработчика <package.className> для задания task_scheduler.example.periodic_taskПодписка на задания типа task_scheduler.example.periodic_task завершена успешноСтарт задания. Контекст выполнения: TaskExecutionContextImpl{... , taskType=task_scheduler.example.periodic_task, ....}Задание выполнено. Результат: TaskExecutionResult{executionUuid=... , status= ... , retryingTime=...}. Контекст выполнения: TaskExecutionContextImpl{... , taskType=task_scheduler.example.periodic_task, ...}Однократные задания
UUID-based
Наличие записей в логе:
Регистрация обработчика ru.sbrf.ufs.scheduler.client.impl.SingleTaskExecutionDispatcherImpl для задания task_scheduler.example.taskWithUUIDПодписка на задания типа task_scheduler.example.taskWithUUID завершена успешноНачало обработки задания UUIDBasedDelayedBackgroundTaskJournalItem{... , taskType=task_scheduler.example.taskWithUUID, ...}Выполнение задания UUIDBasedDelayedBackgroundTaskJournalItem{... , taskType=task_scheduler.example.taskWithUUID, ...} закончилось с вердиктом <вами возвращенный результат(FINISH или RETRY)>Задание UUIDBasedDelayedBackgroundTaskJournalItem{... , taskType=task_scheduler.example.taskWithUUID, ...} обработаноПроверка статуса задания через консоль администратора.
Проверка статуса задания в таблице БД.
INFO-based
Наличие записей в логе:
Регистрация обработчика ru.sbrf.ufs.scheduler.client.impl.SingleTaskExecutionDispatcherImpl для задания task_scheduler.example.taskWithINFOПодписка на задания типа task_scheduler.example.taskWithINFO завершена успешноНачало обработки задания InfoBasedDelayedBackgroundTaskJournalItem{... , taskType=task_scheduler.example.taskWithINFO, ...}Выполнение задания InfoBasedDelayedBackgroundTaskJournalItem{... , taskType=task_scheduler.example.taskWithINFO, ... } закончилось с вердиктом <вами возвращенный результат(FINISH или RETRY)>Задание InfoBasedDelayedBackgroundTaskJournalItem{... , taskType=task_scheduler.example.taskWithINFO, ... } обработаноПроверка статуса задания через консоль администратора.
Проверка статуса задания в таблице БД.
Использование программного компонента#
Регистрация задания в Platform V Asynchronous Tasks с последующей обработкой задания в следующих случаях:
Интеграция с внешней системой завершилась неуспешно и требуется ее повторить.
Операцию необходимо выполнить спустя некоторое время.
Выполнение периодического задания при необходимости исполнять одну и ту же операцию с определенным интервалом.
Часто встречающиеся проблемы и пути их устранения#
Зарегистрированное задание не обрабатывается.
Проверить режим работы блока. Выдача заданий на докат происходит только в режимах NORMAL, PASSIVE_STANDIN.
Проверить сколько заданий в очереди на обработку - в статусе WAIT и с меньшим scheduledStartTime. Задания выдаются на докат в порядке очереди с наименьшим scheduledStartTime.
Проверить, работают ли приложения - обработчики заданий.
Проверить конфигурацию обработчика заданий. Если при регистрации задания передается taskVersion, то и обработчик этих заданий должен содержать атрибут taskVersion.
Задания не успевают обрабатываться, происходит накопление заданий в Platform V Asynchronous Tasks.
Необходимо настроить параметры расписания типа задания и размера пачки заданий (task_scheduler.single_task.max.tasks.<тип задания>) в соответствии с необходимой скоростью обработки заданий. Например, расписание 20 секунд и размер пачки 20 заданий обеспечивают скорость обработки 1 tps.
Периодическое задание запускается одновременно на всех серверах.
Platform V Asynchronous Tasks не предоставляет синхронизацию периодических заданий. Это нужно делать своими силами, например через блокировку в БД.
При исполнении задания возникает ошибка Session not bounded to http request. Check auth settings.
Исполнение задания выполняется асинхронно, поэтому в этот момент не существует сессии клиента и ее атрибутов. Необходимо исключить обращение к компоненту Сессионные данные (SUSD) продукта Platform V SessionsData (SUS) при исполнении задания.
При запросах в Platform V Asynchronous Tasks периодически возникает ошибка SocketTimeoutException: Read timed out.
Необходимо увеличить значения параметров platform.task.scheduler.rest.client.timeout и platform.task.scheduler.rest.client.connection.timeout для своего приложения.
Зарегистрированное задание не реплицировалось в основной блок.
Проверить параметры задания: должен быть указан корректный код родного блока и передан флаг репликации (replicateAndDoNotExecuteFromSI:true).
Проверить, включена ли обратная репликация в консоли компонента Репликатор ЕФС.Stand In (STDE) продукта Platform V Frontend Std (#FS).
На 5 попытку исполнения задания настроили бизнес-логику - перевод нашего документа в БД в статус SOME_STATE_2, но 5 попытка исполнения до нас не дошла и документ завис в статусе SOME_STATE_1, что делать?
Задача Platform V Asynchronous Tasks состоит в том, чтобы обеспечить исполнение задания путем доставки нескольких попыток клиенту, но Platform V Asynchronous Tasks не обеспечивает доставку каждой попытки в отдельности. Поэтому нельзя завязывать бизнес-логику на попытку исполнения, т.к. в рамках повторных попыток исполнения задания любая из попыток может быть не получена Клиентским модулем.