Планирование выполнения заданий#
Когда задания поступают на целевой узел, они передаются в пул потоков и планируются к выполнению в произвольном порядке. Порядок выполнения заданий можно изменить в настройках интерфейса CollisionSpi. С его помощью можно контролировать план обработки заданий на каждом узле.
В DataGrid есть несколько реализаций интерфейса CollisionSpi:
FifoQueueCollisionSpi— упорядочение по FIFO («первым пришел — первым ушел») в нескольких потоках. Реализация по умолчанию.PriorityQueueCollisionSpi— упорядочение по приоритетам.JobStealingFailoverSpi— реализация для включения перехвата заданий. Подробнее о нем написано в разделе «Балансировка нагрузки».
Чтобы включить конкретный Collision SPI, измените свойство IgniteConfiguration.collisionSpi.
Упорядочение по FIFO#
В реализации FifoQueueCollisionSpi задания выполняются в порядке поступления в нескольких потоках. Количество потоков настраивается с помощью параметра parallelJobsNumber. Значение по умолчанию превышает количество ядер процессоров в 2 раза.
XML:
<bean class="org.apache.ignite.configuration.IgniteConfiguration">
<property name="collisionSpi">
<bean class="org.apache.ignite.spi.collision.fifoqueue.FifoQueueCollisionSpi">
<!-- Выполняйте по одному заданию за раз. -->
<property name="parallelJobsNumber" value="1"/>
</bean>
</property>
</bean>
Java:
FifoQueueCollisionSpi colSpi = new FifoQueueCollisionSpi();
// Выполняйте задания последовательно, по одному за раз.
// Для этого установите количество параллельных заданий равным 1.
colSpi.setParallelJobsNumber(1);
IgniteConfiguration cfg = new IgniteConfiguration();
// Переопределите Collision SPI по умолчанию.
cfg.setCollisionSpi(colSpi);
// Запустите узел.
Ignite ignite = Ignition.start(cfg);
Упорядочение по приоритетам#
Чтобы установить приоритеты отдельным заданиям, используйте PriorityQueueCollisionSpi. Задания с более высоким приоритетом выполняются раньше заданий с более низким. Также можно указать количество потоков для обработки заданий.
XML:
<bean class="org.apache.ignite.configuration.IgniteConfiguration">
<property name="collisionSpi">
<bean class="org.apache.ignite.spi.collision.priorityqueue.PriorityQueueCollisionSpi">
<!-- При необходимости измените количество параллельных заданий.
Значение по умолчанию превышает количество ядер процессоров в 2 раза. -->
<property name="parallelJobsNumber" value="5"/>
</bean>
</property>
</bean>
Java:
PriorityQueueCollisionSpi colSpi = new PriorityQueueCollisionSpi();
// При необходимости измените количество параллельных заданий.
// Значение по умолчанию превышает количество ядер процессоров в 2 раза.
colSpi.setParallelJobsNumber(5);
IgniteConfiguration cfg = new IgniteConfiguration();
// Переопределите Collision SPI по умолчанию.
cfg.setCollisionSpi(colSpi);
// Запустите узел.
Ignite ignite = Ignition.start(cfg);
Приоритеты по задачам устанавливаются в их сессиях с помощью атрибута grid.task.priority. Если задаче не назначили приоритет, по умолчанию используется значение 0.
Подробнее о сессиях задач написано в разделе MapReduce API.
public class MyUrgentTask extends ComputeTaskSplitAdapter<Object, Object> {
// Автоматическое внедрение сессии задачи.
@TaskSessionResource
private ComputeTaskSession taskSes = null;
@Override
protected Collection<ComputeJob> split(int gridSize, Object arg) {
// Установите высокий приоритет для задачи.
taskSes.setAttribute("grid.task.priority", 10);
List<ComputeJob> jobs = new ArrayList<>(gridSize);
for (int i = 1; i <= gridSize; i++) {
jobs.add(new ComputeJobAdapter() {
@Override
public Object execute() throws IgniteException {
// Здесь нужно написать пользовательскую реализацию.
return null;
}
});
}
// Данные задания будут выполняться с более высоким приоритетом.
return jobs;
}
@Override
public Object reduce(List<ComputeJobResult> results) throws IgniteException {
return null;
}
}