Использование API continuous query#

Сontinuous query (непрерывный запрос) — запрос, который отслеживает изменения данных в кеше. При запуске API continuous query пользователь получает уведомления обо всех изменениях данных, которые попадают в фильтр запросов.

Все события по обновлению данных передаются локальному слушателю, который нужно указать в запросе. API continuous query гарантирует передачу события локальному слушателю однократно (семантика exactly-once).

Чтобы сузить диапазон записей, которые проверяются на наличие обновлений, укажите удаленный фильтр.

Локальный слушатель#

При обновлении кеша (добавлении, удалении или обновлении записи) в локальный слушатель запроса отправляется соответствующее событие для последующих действий со стороны приложения. Локальный слушатель выполняется на узле, который запускает запрос. В данном случае под узлом подразумевается толстый клиент или узел, который выполняет пользовательские вычислительные задачи и сервисы. Подробнее об API continuous query тонких клиентов написано в подразделе «Тонкий клиент Java» раздела «Тонкие клиенты».

Если API continuous query запускается без локального слушателя, сгенерируется исключение.

Java:

IgniteCache<Integer, String> cache = ignite.getOrCreateCache("myCache");

ContinuousQuery<Integer, String> query = new ContinuousQuery<>();

query.setLocalListener(new CacheEntryUpdatedListener<Integer, String>() {

    @Override
    public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends String>> events)
        throws CacheEntryListenerException {
        // Обработка события.
    }
});

cache.query(query);

C#/.NET:

class LocalListener : ICacheEntryEventListener<int, string>
{
    public void OnEvent(IEnumerable<ICacheEntryEvent<int, string>> evts)
    {
        foreach (var cacheEntryEvent in evts)
        {
            // Обработка события.
        }
    }
}
public static void ContinuousQueryListenerDemo()
{
    var ignite = Ignition.Start(new IgniteConfiguration
    {
        DiscoverySpi = new TcpDiscoverySpi
        {
            LocalPort = 48500,
            LocalPortRange = 20,
            IpFinder = new TcpDiscoveryStaticIpFinder
            {
                Endpoints = new[]
                {
                    "127.0.0.1:48500..48520"
                }
            }
        }
    });
    var cache = ignite.GetOrCreateCache<int, string>("myCache");

    var query = new ContinuousQuery<int, string>(new LocalListener());

    var handle = cache.QueryContinuous(query);

    cache.Put(1, "1");
    cache.Put(2, "2");
}

C++:

/**
 * Класс слушателя.
 */
template<typename K, typename V>
class Listener : public event::CacheEntryEventListener<K, V>
{
public:
    /**
     * Конструктор по умолчанию.
     */
    Listener()
    {
        // Без операции.
    }

    /**
     * Callback-функция обработки события.
     *
     * @param evts События.
     * @param num Количество событий.
     */
    virtual void OnEvent(const CacheEntryEvent<K, V>* evts, uint32_t num)
    {
        for (uint32_t i = 0; i < num; ++i)
        {
            std::cout << "Queried entry [key=" << (evts[i].HasValue() ? evts[i].GetKey() : K())
                << ", val=" << (evts[i].HasValue() ? evts[i].GetValue() : V()) << ']'
                << std::endl;
        }
    }
};

int main()
{
    IgniteConfiguration cfg;
    cfg.springCfgPath = "/path/to/configuration.xml";

    Ignite ignite = Ignition::Start(cfg);

    Cache<int32_t, std::string> cache = ignite.GetOrCreateCache<int32_t, std::string>("myCache");

    // Создание пользовательского слушателя.
    Listener<int32_t, std::string> listener;

    // Создание API continuous query. 
    continuous::ContinuousQuery<int32_t, std::string> query(MakeReference(listener));

    continuous::ContinuousQueryHandle<int32_t, std::string> handle = cache.QueryContinuous(query);
}

Первичный запрос#

Можно создать первичный запрос, который будет выполняться до регистрации API continuous query в кластере и получения обновления данных. Чтобы указать первичный запрос, используйте метод  ContinuousQuery.setInitialQuery(…​).

Как и scan query (запрос сканирования), API continuous query выполняется с помощью метода query(), который возвращает курсор. Его можно использовать для итерации по результатам первичного запроса (после его установки).

Java:

IgniteCache<Integer, String> cache = ignite.getOrCreateCache("myCache");

ContinuousQuery<Integer, String> query = new ContinuousQuery<>();

// Установка необязательного первичного запроса.
// Запрос вернет записи для ключей больше 10.
query.setInitialQuery(new ScanQuery<>((k, v) -> k > 10));

// Обязательный локальный слушатель событий.
query.setLocalListener(events -> {
...
});

try (QueryCursor<Cache.Entry<Integer, String>> cursor = cache.query(query)) {
    // Итерация по записям, которые вернул первичный запрос.
    for (Cache.Entry<Integer, String> e : cursor)
        System.out.println("key=" + e.getKey() + ", val=" + e.getValue());
}

C++:

Cache<int32_t, std::string> cache = ignite.GetOrCreateCache<int32_t, std::string>("myCache");

// Пользовательский слушатель.
Listener<int32_t, std::string> listener;

// Объявление API continuous query.
continuous::ContinuousQuery<int32_t, std::string> query(MakeReference(listener));

// Объявление необязательного первичного запроса.
ScanQuery initialQuery = ScanQuery();

continuous::ContinuousQueryHandle<int32_t, std::string> handle = cache.QueryContinuous(query, initialQuery);

// Итерация по существующим данным, которые хранятся в кеше.
QueryCursor<int32_t, std::string> cursor = handle.GetInitialQueryCursor();

while (cursor.HasNext())
{
    std::cout << cursor.GetNext().GetKey() << std::endl;
}

Удаленный фильтр (Remote filter)#

Фильтр выполняется для каждого обновленного ключа и оценивает, нужно ли передавать информацию об обновлении локальному слушателю запроса. Если фильтр возвращает true, слушателю передается информация о событии.

Из соображений избыточности фильтр выполняется и для основных, и для резервных версий ключа (если настроены резервные копии партиций). Удаленный фильтр можно использовать как удаленный слушатель для событий обновления данных.

Java:

ContinuousQuery<Integer, String> qry = new ContinuousQuery<>();

qry.setLocalListener(events ->
    events.forEach(event -> System.out.format("Entry: key=[%s] value=[%s]\n", event.getKey(), event.getValue()))
);

qry.setRemoteFilterFactory(new Factory<CacheEntryEventFilter<Integer, String>>() {
    @Override
    public CacheEntryEventFilter<Integer, String> create() {
        return new CacheEntryEventFilter<Integer, String>() {
            @Override
            public boolean evaluate(CacheEntryEvent<? extends Integer, ? extends String> e) {
                System.out.format("the value for key [%s] was updated from [%s] to [%s]\n", e.getKey(), e.getOldValue(), e.getValue());
                return true;
            }
        };
    }
});

C#/.NET:

class LocalListener : ICacheEntryEventListener<int, string>
{
    public void OnEvent(IEnumerable<ICacheEntryEvent<int, string>> evts)
    {
        foreach (var cacheEntryEvent in evts)
        {
            // Обработка событий.
        }
    }
}
class RemoteFilter : ICacheEntryEventFilter<int, string>
{
    public bool Evaluate(ICacheEntryEvent<int, string> e)
    {
        if (e.Key == 1)
        {
            return false;
        }
        Console.WriteLine("the value for key {0} was updated from {1} to {2}", e.Key, e.OldValue, e.Value);
        return true;
    }
}
public static void ContinuousQueryFilterDemo()
{
    var ignite = Ignition.Start(new IgniteConfiguration
    {
        DiscoverySpi = new TcpDiscoverySpi
        {
            LocalPort = 48500,
            LocalPortRange = 20,
            IpFinder = new TcpDiscoveryStaticIpFinder
            {
                Endpoints = new[]
                {
                    "127.0.0.1:48500..48520"
                }
            }
        }
    });
    var cache = ignite.GetOrCreateCache<int, string>("myCache");

    var query = new ContinuousQuery<int, string>(new LocalListener(), new RemoteFilter());

    var handle = cache.QueryContinuous(query);

    cache.Put(1, "1");
    cache.Put(2, "2");
}

C++:

template<typename K, typename V>
struct RemoteFilter : event::CacheEntryEventFilter<int32_t, std::string>
{
    /**
     * Конструктор по умолчанию.
     */
    RemoteFilter()
    {
        // Без операции.
    }

    /**
     * Деструктор.
     */
    virtual ~RemoteFilter()
    {
        // Без операции.
    }

    /**
     * Callback-функция обработки события.
     *
     * @param event Событие.
     * @return Принимает значение `true`, если событие проходит фильтр.
     */
    virtual bool Process(const CacheEntryEvent<K, V>& event)
    {
        std::cout << "The value for key " << event.GetKey() <<
            " was updated from " << event.GetOldValue() << " to " << event.GetValue() << std::endl;
        return true;
    }
};

namespace ignite
{
    namespace binary
    {
        template<>
        struct BinaryType< RemoteFilter<int32_t, std::string> >
        {
            static int32_t GetTypeId()
            {
                return GetBinaryStringHashCode("RemoteFilter<int32_t,std::string>");
            }

            static void GetTypeName(std::string& dst)
            {
                dst = "RemoteFilter<int32_t,std::string>";

            }

            static int32_t GetFieldId(const char* name)
            {
                return GetBinaryStringHashCode(name);
            }

            static bool IsNull(const RemoteFilter<int32_t, std::string>&)
            {
                return false;
            }

            static void GetNull(RemoteFilter<int32_t, std::string>& dst)
            {
                dst = RemoteFilter<int32_t, std::string>();
            }

            static void Write(BinaryWriter& writer, const RemoteFilter<int32_t, std::string>& obj)
            {
                // Без операции.
            }

            static void Read(BinaryReader& reader, RemoteFilter<int32_t, std::string>& dst)
            {
                // Без операции.
            }
        };
    }
}

int main()
{
    IgniteConfiguration cfg;
    cfg.springCfgPath = "/path/to/configuration.xml";

    // Запуск узла.
    Ignite ignite = Ignition::Start(cfg);

    IgniteBinding binding = ignite.GetBinding();

    // Регистрация удаленного фильтра.
    binding.RegisterCacheEntryEventFilter<RemoteFilter<int32_t, std::string>>();

    // Кеш.
    Cache<int32_t, std::string> cache = ignite.GetOrCreateCache<int32_t, std::string>("myCache");

    // Регистрация пользовательского слушателя.
    Listener<int32_t, std::string> listener;

    // Объявление удаленного фильтра.
    RemoteFilter<int32_t, std::string> filter;

    // Объявление API continuous query.
    continuous::ContinuousQuery<int32_t, std::string> qry(MakeReference(listener), MakeReference(filter));
}

Внимание

Перед использованием удаленных фильтров убедитесь, что определения класса фильтров доступны на серверных узлах:

  • добавьте классы в classpath каждого серверного узла;

  • или включите функцию загрузки классов — подробнее о ней написано в подразделе Peer Class Loading раздела «Развертывание кода».

Преобразование результатов с помощью Remote transformer#

По умолчанию API continuous query отправляет весь обновленный объект данных локальному слушателю. Это может привести к большой загрузке сети (в зависимости от размера объекта). При этом приложениям часто не нужен весь объект и требуется только подмножество его полей.

Чтобы решить эту проблему, используйте API continuous query с преобразованием записей. Преобразователь (Remote transformer) — функция, которая выполняется на удаленных узлах для каждого обновленного объекта и возвращает только результаты преобразования.

Java:

IgniteCache<Integer, Person> cache = ignite.getOrCreateCache("myCache");

// API continuous query с преобразователем.
ContinuousQueryWithTransformer<Integer, Person, String> qry = new ContinuousQueryWithTransformer<>();

// Фабрика для создания преобразователей.
Factory factory = FactoryBuilder.factoryOf(
    // Вернет одно поле сложного объекта.
    // Только это поле передается в локальный слушатель событий.
    (IgniteClosure<CacheEntryEvent, String>)
        event -> ((Person)event.getValue()).getName()
);

qry.setRemoteTransformerFactory(factory);

// Слушатель, который получит преобразованные данные.
qry.setLocalListener(names -> {
    for (String name : names)
        System.out.println("New person name: " + name);
});

Внимание

Перед использованием преобразования записей убедитесь, что классы фабрики и реализация Remote transformer доступны на серверных узлах:

  • добавьте классы в classpath каждого серверного узла;

  • или включите функцию загрузки классов — подробнее о ней написано в подразделе Peer Class Loading раздела «Развертывание кода».

Гарантии доставки событий#

API continuous query передает события локальным слушателям клиентов однократно (семантика exactly-once).

Основные (primary) и резервные (backup) узлы обеспечивают очередь обновлений — она содержит обработанные API continuous query события, которые еще не доставили клиентам. Если происходит сбой основного узла или меняется топология кластера, каждый резервный узел отправляет содержимое данной очереди клиентам и следит за тем, чтобы все события доставили локальному слушателю клиента.

Каждая партиция DataGrid содержит специальный счетчик обновлений (update counter), который позволяет избежать дублирования уведомлений. Когда запись в партиции обновилась, счетчик для этой партиции увеличится на основных и резервных узлах. Значение счетчика также отправится клиентскому узлу вместе с уведомлением о событии. Таким образом клиент может пропускать уже обработанные события. Как только клиент подтверждает получение события, основные и резервные узлы удаляют запись о нем из своих очередей.