Конфигурация GRPC egress#

EVTA поддерживает работу с помощью gRPC взаимодействия. Если в конфигурационном файле adapter.conf в поле egressEnabled установлено grpc, то EVTA позволяет обращаться извне к брокерам сообщений при помощи gRPC взаимодействия соответственно.

В конфигурации в блоке egressSubscribe указываются настройки egress, который позволяет вычитывать сообщения извне, а egressPublish, наоборот, отправлять сообщения. Для egressSubscribe в конфигурации должны быть указаны настройки для consumer, при типе egressPublish — для producer.

Настройки для брокера берутся из файла, указанного в urlConfig.

Пример заполненного файла urlConfig.json (путь до него указывается в файле adapter.conf)

{
  "fpss": {
    "Domain": {
      "Federation": {
        "SegmentA": {
          "SystemName": {
            "EventName": {
              "1": {
                "type": "kafka",
                "topic": "EventNameTopic",
                "properties": "/path/to/producer.properties"
              }
            },
            "OtherEventName": {
              "1": {
                "type": "kafka",
                "topic": "OtherEventNameTopic",
                "properties": "/path/to/other_producer.properties"
              }
            }
          }
        }
      }
    }
  }
}

Для настройки gRPC-взаимодействия используется протокол:

syntax = "proto3";

option java_multiple_files = true;
option java_package = "ru.sbt.ss.reactive.stream.proto";
option java_outer_classname = "EventServiceApi";

package ru.sbt.synapse.fpss.adapter.grpc.api;

service PublishService {
    rpc Connect (ConnectRequest) returns (Status) {
    }
    rpc Disconnect (DisconnectRequest) returns (Status) {
    }
    rpc Publish (stream EventWithId) returns (stream StatusWithId) {
    }
}

service SubscribeService {
    rpc Subscribe (EventRequest) returns (stream EventWithId) {
    }
    rpc Commit (CommitRequest) returns (Status) {}
}

message SubscribeSettings {
    string url = 1;
    bool sendMessagesForAllClients = 2;
    string kafkaGroupName = 3;
    Offset offset = 4;
    bool stopAfterEmptyPoll = 5;
}

enum Offset {
    GROUP = 0;
    BEGIN = 1;
    END = 2;
}

message ConnectRequest {
    string url = 1;
}

message DisconnectRequest {
}

message CommitRequest {
    SubscribeSettings subscribeSettings = 1;
    int32 id = 2;
}

message EventWithId {
    Event event = 1;
    int32 id = 2;
}

message Event {
    bytes event = 1;
    bytes eventKey = 2;
    repeated Header additionalHeaders = 3;
}

enum StatusCode {
    FAILED = 0;
    SUCCESS = 1;
}

message Status {
    StatusCode status = 1;
    string message = 2;
}

message StatusWithId {
    Status status = 1;
    int32 id = 2;
}

message EventRequest {
    SubscribeSettings subscribeSettings = 1;
}

message Header {
    string name = 1;
    string value = 2;
}

Для публикации сообщений используются три метода:

  • Connect — подключение к topic для публикации;

  • Disconnect — отключение от topic, в который ранее публиковали сообщения;

  • Publish — непосредственно публикация сообщений, публикация сообщений осуществляется потоком. Возвращает поток сообщений с 2 параметрами: статус сообщения и ID, по которому можно идентифицировать сообщение.

Для отправки gRPC запроса на публикацию необходимо написать код клиента.

Пример кода, написанного АС-клиентом, для осуществления публикации сообщений с использованием gRPC-взаимодействия:

import com.google.protobuf.ByteString;
import io.grpc.ManagedChannel;
import io.grpc.netty.shaded.io.grpc.netty.GrpcSslContexts;
import io.grpc.netty.shaded.io.grpc.netty.NegotiationType;
import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder;
import io.grpc.netty.shaded.io.netty.handler.ssl.util.InsecureTrustManagerFactory;
import io.grpc.stub.StreamObserver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import ru.sbt.ss.reactive.stream.proto.*;

import javax.net.ssl.KeyManagerFactory;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.file.Paths;
import java.security.KeyStore;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.security.UnrecoverableKeyException;
import java.security.cert.CertificateException;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;

public class PublishClient {
    private static final Logger LOG = LoggerFactory.getLogger(PublishClient.class);
    private static final String URL = "url";

    public static void main(String[] args) throws InterruptedException, CertificateException, NoSuchAlgorithmException, KeyStoreException, IOException, UnrecoverableKeyException {
        String keyStorePath = "/path/to/cert.jks";
        String keyStorePassword = "passsword";
        KeyStore keyStore = loadKeyStore(keyStorePath, keyStorePassword);

        KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
        kmf.init(keyStore, keyStorePassword.toCharArray());

        PublishServiceGrpc.PublishServiceStub stub;
        PublishServiceGrpc.PublishServiceBlockingStub blStub;

        try {
            //подключаемся к EVTA
            ManagedChannel channel = NettyChannelBuilder
                    .forAddress("localhost", 9003)
                    .negotiationType(NegotiationType.TLS)
                    .sslContext(GrpcSslContexts.forClient()
                            .trustManager(InsecureTrustManagerFactory.INSTANCE)
                            .keyManager(kmf)
                            .protocols("TLSv1.3")
                            .build())
                    .build();
            AtomicInteger count = new AtomicInteger(0);
            stub = PublishServiceGrpc.newStub(channel);
            blStub = PublishServiceGrpc.newBlockingStub(channel);

            //описываем реакцию на наблюдаемый поток статусов и идентификаторов публикуемых сообщений
            StreamObserver<StatusWithId> statusWithIdStreamObserver = new StreamObserver<StatusWithId>() {
                @Override
                public void onNext(StatusWithId value) {
                    count.incrementAndGet();
                    LOG.info("onNext: " + value.getStatus().getStatusValue() + " id: " + value.getId());
                }

                @Override
                public void onError(Throwable t) {
                    LOG.error("onError: " + t.getMessage());
                }

                @Override
                public void onCompleted() {
                    LOG.info("onCompleted");
                    Status res = blStub.disconnect(DisconnectRequest.newBuilder().build());
                    LOG.info("Client disconnected with status {}", res.getStatusValue());
                }
            };

            //отправляем запрос для подключения к транспорту
            Status res = blStub.connect(ConnectRequest.newBuilder().setUrl(URL).build());
            LOG.info("Client connected with status {}", res.getStatusValue());

            StreamObserver<EventWithId> streamObserver = stub.publish(statusWithIdStreamObserver);

            while (count.get() < 10) {
                //отправляем запрос с сообщением, которое мы хотим опубликовать
                streamObserver.onNext(EventWithId.newBuilder()
                        .setEvent(Event.newBuilder()
                                .addAdditionalHeaders(Header.newBuilder().setName("test").setValue("value").build())
                                .setEvent(ByteString.copyFrom("{\"test\":\"testEvent\"}", Charset.defaultCharset()))
                                .setEventKey(ByteString.copyFrom("testKey", Charset.defaultCharset()))
                                .build())
                        .setId(ThreadLocalRandom.current().nextInt())
                        .build());
                Thread.sleep(1000);
            }

            streamObserver.onCompleted();

            //отключаемся от транспорта, к которому были подключены ранее
            res = blStub.disconnect(DisconnectRequest.newBuilder().build());
            LOG.info("Client disconnected with status {}", res.getStatusValue());

        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private static KeyStore loadKeyStore(String keyStorePath, String keyStorePassword) throws KeyStoreException, IOException, CertificateException, NoSuchAlgorithmException {
        KeyStore ks = KeyStore.getInstance("JKS");
        FileInputStream ksStream = new FileInputStream((Paths.get(keyStorePath)).toFile());
        ks.load(ksStream, keyStorePassword.toCharArray());
        ksStream.close();
        return ks;
    }
}

Для подписки на сообщения используются 2 метода:

  • Subscribe — подписка на поток сообщений. Возвращает сообщение и его ID;

  • Commit — подтверждение о вычитке сообщений. Возвращает статус коммита.

Пример кода, написанного АС-клиентом, для осуществления подписки на сообщения с использованием gRPC-взаимодействия:

import io.grpc.Context;
import io.grpc.ManagedChannel;
import io.grpc.netty.shaded.io.grpc.netty.GrpcSslContexts;
import io.grpc.netty.shaded.io.grpc.netty.NegotiationType;
import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder;
import io.grpc.netty.shaded.io.netty.handler.ssl.util.InsecureTrustManagerFactory;
import io.grpc.stub.StreamObserver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import ru.sbt.ss.reactive.stream.proto.*;

import javax.net.ssl.KeyManagerFactory;
import java.io.FileInputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.file.Paths;
import java.security.KeyStore;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.security.UnrecoverableKeyException;
import java.security.cert.CertificateException;
import java.util.concurrent.atomic.AtomicInteger;

public class SubscribeClient {
    private static final Logger LOG = LoggerFactory.getLogger(SubscribeClient.class);
    private static final String URL = "fpss://Federation/System/TESTA/1";

    public static void main(String[] args) throws InterruptedException, CertificateException, NoSuchAlgorithmException, KeyStoreException, IOException, UnrecoverableKeyException {

        String keyStorePath = "/path/to/cert.jks";
        String keyStorePassword = "password";
        KeyStore keyStore = loadKeyStore(keyStorePath, keyStorePassword);

        KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
        kmf.init(keyStore, keyStorePassword.toCharArray());

        AtomicInteger count = new AtomicInteger(0);

        try {
            //подключаемся к адаптеру
            ManagedChannel channel = NettyChannelBuilder
                    .forAddress(new InetSocketAddress("localhost", 9014))
                    .negotiationType(NegotiationType.TLS)
                    .sslContext(GrpcSslContexts.forClient()
                            .trustManager(InsecureTrustManagerFactory.INSTANCE)
                            .keyManager(kmf)
                            .protocols("TLSv1.2")
                            .build())
                    .build();

            SubscribeServiceGrpc.SubscribeServiceStub subStub = SubscribeServiceGrpc.newStub(channel);
            SubscribeServiceGrpc.SubscribeServiceStub commitStub = SubscribeServiceGrpc.newStub(channel);
            Context.CancellableContext cancellableContext = Context.current().withCancellation();
            StreamObserver<Status> streamObserverStatus = new StreamObserver<Status>() {
                @Override
                public void onNext(Status value) {
                    LOG.info("successful commit");
                }

                @Override
                public void onError(Throwable t) {
                    LOG.error("onError commit: " + t.getMessage());
                }

                @Override
                public void onCompleted() {
                    LOG.info("onCompleted commit");
                }
            };

            SubscribeSettings subscribeSettings = SubscribeSettings.newBuilder().setUrl(URL).build();

            //описываем реакцию на наблюдаемый поток сообщений, вычитываемых адаптером из транспорта
            StreamObserver<EventWithId> streamObserverEvent = new StreamObserver<EventWithId>() {
                @Override
                public void onNext(EventWithId value) {
                    LOG.info("onNext:" + value.getEvent().toString());
                    count.incrementAndGet();

                    //если адаптер запущен с значением autoCommit равным false, то при получении сообщений клиент должен подтвердить его
                    //для этого отправляем запрос с подтверждением с указанием id полученного сообщения
                    commitStub.commit(CommitRequest.newBuilder()
                            .setSubscribeSettings(subscribeSettings)
                            .setId(value.getId())
                            .build(),
                            streamObserverStatus);
                }

                @Override
                public void onError(Throwable t) {
                    LOG.error("onError: " + t.getMessage());
                }

                @Override
                public void onCompleted() {
                    LOG.info("onCompleted");
                }
            };

            Runnable task = () -> {
                //отправляем запрос на подписку
                subStub.subscribe(EventRequest.newBuilder()
                        .setSubscribeSettings(subscribeSettings)
                        .build(), streamObserverEvent);
            };
            cancellableContext.run(task);
            //вычитываем до отправки 10 коммитов
            while (count.get() < 10) {

            }
            //завершаем прием сообщений клиентом
            streamObserverEvent.onCompleted();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private static KeyStore loadKeyStore(String keyStorePath, String keyStorePassword) throws KeyStoreException, IOException, CertificateException, NoSuchAlgorithmException {
        KeyStore ks = KeyStore.getInstance("JKS");
        FileInputStream ksStream = new FileInputStream((Paths.get(keyStorePath)).toFile());
        ks.load(ksStream, keyStorePassword.toCharArray());
        ksStream.close();
        return ks;
    }
}