Конфигурация GRPC egress#
EVTA поддерживает работу с помощью gRPC взаимодействия. Если в конфигурационном файле adapter.conf в поле egressEnabled установлено grpc, то EVTA позволяет обращаться извне к брокерам сообщений при помощи gRPC взаимодействия соответственно.
В конфигурации в блоке egressSubscribe указываются настройки egress, который позволяет вычитывать сообщения извне, а egressPublish, наоборот, отправлять сообщения. Для egressSubscribe в конфигурации должны быть указаны настройки для consumer, при типе egressPublish — для producer.
Настройки для брокера берутся из файла, указанного в urlConfig.
Пример заполненного файла urlConfig.json (путь до него указывается в файле adapter.conf)
{
"fpss": {
"Domain": {
"Federation": {
"SegmentA": {
"SystemName": {
"EventName": {
"1": {
"type": "kafka",
"topic": "EventNameTopic",
"properties": "/path/to/producer.properties"
}
},
"OtherEventName": {
"1": {
"type": "kafka",
"topic": "OtherEventNameTopic",
"properties": "/path/to/other_producer.properties"
}
}
}
}
}
}
}
}
Для настройки gRPC-взаимодействия используется протокол:
syntax = "proto3";
option java_multiple_files = true;
option java_package = "ru.sbt.ss.reactive.stream.proto";
option java_outer_classname = "EventServiceApi";
package ru.sbt.synapse.fpss.adapter.grpc.api;
service PublishService {
rpc Connect (ConnectRequest) returns (Status) {
}
rpc Disconnect (DisconnectRequest) returns (Status) {
}
rpc Publish (stream EventWithId) returns (stream StatusWithId) {
}
}
service SubscribeService {
rpc Subscribe (EventRequest) returns (stream EventWithId) {
}
rpc Commit (CommitRequest) returns (Status) {}
}
message SubscribeSettings {
string url = 1;
bool sendMessagesForAllClients = 2;
string kafkaGroupName = 3;
Offset offset = 4;
bool stopAfterEmptyPoll = 5;
}
enum Offset {
GROUP = 0;
BEGIN = 1;
END = 2;
}
message ConnectRequest {
string url = 1;
}
message DisconnectRequest {
}
message CommitRequest {
SubscribeSettings subscribeSettings = 1;
int32 id = 2;
}
message EventWithId {
Event event = 1;
int32 id = 2;
}
message Event {
bytes event = 1;
bytes eventKey = 2;
repeated Header additionalHeaders = 3;
}
enum StatusCode {
FAILED = 0;
SUCCESS = 1;
}
message Status {
StatusCode status = 1;
string message = 2;
}
message StatusWithId {
Status status = 1;
int32 id = 2;
}
message EventRequest {
SubscribeSettings subscribeSettings = 1;
}
message Header {
string name = 1;
string value = 2;
}
Для публикации сообщений используются три метода:
Connect — подключение к topic для публикации;
Disconnect — отключение от topic, в который ранее публиковали сообщения;
Publish — непосредственно публикация сообщений, публикация сообщений осуществляется потоком. Возвращает поток сообщений с 2 параметрами: статус сообщения и ID, по которому можно идентифицировать сообщение.
Для отправки gRPC запроса на публикацию необходимо написать код клиента.
Пример кода, написанного АС-клиентом, для осуществления публикации сообщений с использованием gRPC-взаимодействия:
import com.google.protobuf.ByteString;
import io.grpc.ManagedChannel;
import io.grpc.netty.shaded.io.grpc.netty.GrpcSslContexts;
import io.grpc.netty.shaded.io.grpc.netty.NegotiationType;
import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder;
import io.grpc.netty.shaded.io.netty.handler.ssl.util.InsecureTrustManagerFactory;
import io.grpc.stub.StreamObserver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import ru.sbt.ss.reactive.stream.proto.*;
import javax.net.ssl.KeyManagerFactory;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.file.Paths;
import java.security.KeyStore;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.security.UnrecoverableKeyException;
import java.security.cert.CertificateException;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
public class PublishClient {
private static final Logger LOG = LoggerFactory.getLogger(PublishClient.class);
private static final String URL = "url";
public static void main(String[] args) throws InterruptedException, CertificateException, NoSuchAlgorithmException, KeyStoreException, IOException, UnrecoverableKeyException {
String keyStorePath = "/path/to/cert.jks";
String keyStorePassword = "passsword";
KeyStore keyStore = loadKeyStore(keyStorePath, keyStorePassword);
KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
kmf.init(keyStore, keyStorePassword.toCharArray());
PublishServiceGrpc.PublishServiceStub stub;
PublishServiceGrpc.PublishServiceBlockingStub blStub;
try {
//подключаемся к EVTA
ManagedChannel channel = NettyChannelBuilder
.forAddress("localhost", 9003)
.negotiationType(NegotiationType.TLS)
.sslContext(GrpcSslContexts.forClient()
.trustManager(InsecureTrustManagerFactory.INSTANCE)
.keyManager(kmf)
.protocols("TLSv1.3")
.build())
.build();
AtomicInteger count = new AtomicInteger(0);
stub = PublishServiceGrpc.newStub(channel);
blStub = PublishServiceGrpc.newBlockingStub(channel);
//описываем реакцию на наблюдаемый поток статусов и идентификаторов публикуемых сообщений
StreamObserver<StatusWithId> statusWithIdStreamObserver = new StreamObserver<StatusWithId>() {
@Override
public void onNext(StatusWithId value) {
count.incrementAndGet();
LOG.info("onNext: " + value.getStatus().getStatusValue() + " id: " + value.getId());
}
@Override
public void onError(Throwable t) {
LOG.error("onError: " + t.getMessage());
}
@Override
public void onCompleted() {
LOG.info("onCompleted");
Status res = blStub.disconnect(DisconnectRequest.newBuilder().build());
LOG.info("Client disconnected with status {}", res.getStatusValue());
}
};
//отправляем запрос для подключения к транспорту
Status res = blStub.connect(ConnectRequest.newBuilder().setUrl(URL).build());
LOG.info("Client connected with status {}", res.getStatusValue());
StreamObserver<EventWithId> streamObserver = stub.publish(statusWithIdStreamObserver);
while (count.get() < 10) {
//отправляем запрос с сообщением, которое мы хотим опубликовать
streamObserver.onNext(EventWithId.newBuilder()
.setEvent(Event.newBuilder()
.addAdditionalHeaders(Header.newBuilder().setName("test").setValue("value").build())
.setEvent(ByteString.copyFrom("{\"test\":\"testEvent\"}", Charset.defaultCharset()))
.setEventKey(ByteString.copyFrom("testKey", Charset.defaultCharset()))
.build())
.setId(ThreadLocalRandom.current().nextInt())
.build());
Thread.sleep(1000);
}
streamObserver.onCompleted();
//отключаемся от транспорта, к которому были подключены ранее
res = blStub.disconnect(DisconnectRequest.newBuilder().build());
LOG.info("Client disconnected with status {}", res.getStatusValue());
} catch (Exception e) {
e.printStackTrace();
}
}
private static KeyStore loadKeyStore(String keyStorePath, String keyStorePassword) throws KeyStoreException, IOException, CertificateException, NoSuchAlgorithmException {
KeyStore ks = KeyStore.getInstance("JKS");
FileInputStream ksStream = new FileInputStream((Paths.get(keyStorePath)).toFile());
ks.load(ksStream, keyStorePassword.toCharArray());
ksStream.close();
return ks;
}
}
Для подписки на сообщения используются 2 метода:
Subscribe — подписка на поток сообщений. Возвращает сообщение и его ID;
Commit — подтверждение о вычитке сообщений. Возвращает статус коммита.
Пример кода, написанного АС-клиентом, для осуществления подписки на сообщения с использованием gRPC-взаимодействия:
import io.grpc.Context;
import io.grpc.ManagedChannel;
import io.grpc.netty.shaded.io.grpc.netty.GrpcSslContexts;
import io.grpc.netty.shaded.io.grpc.netty.NegotiationType;
import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder;
import io.grpc.netty.shaded.io.netty.handler.ssl.util.InsecureTrustManagerFactory;
import io.grpc.stub.StreamObserver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import ru.sbt.ss.reactive.stream.proto.*;
import javax.net.ssl.KeyManagerFactory;
import java.io.FileInputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.file.Paths;
import java.security.KeyStore;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.security.UnrecoverableKeyException;
import java.security.cert.CertificateException;
import java.util.concurrent.atomic.AtomicInteger;
public class SubscribeClient {
private static final Logger LOG = LoggerFactory.getLogger(SubscribeClient.class);
private static final String URL = "fpss://Federation/System/TESTA/1";
public static void main(String[] args) throws InterruptedException, CertificateException, NoSuchAlgorithmException, KeyStoreException, IOException, UnrecoverableKeyException {
String keyStorePath = "/path/to/cert.jks";
String keyStorePassword = "password";
KeyStore keyStore = loadKeyStore(keyStorePath, keyStorePassword);
KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
kmf.init(keyStore, keyStorePassword.toCharArray());
AtomicInteger count = new AtomicInteger(0);
try {
//подключаемся к адаптеру
ManagedChannel channel = NettyChannelBuilder
.forAddress(new InetSocketAddress("localhost", 9014))
.negotiationType(NegotiationType.TLS)
.sslContext(GrpcSslContexts.forClient()
.trustManager(InsecureTrustManagerFactory.INSTANCE)
.keyManager(kmf)
.protocols("TLSv1.2")
.build())
.build();
SubscribeServiceGrpc.SubscribeServiceStub subStub = SubscribeServiceGrpc.newStub(channel);
SubscribeServiceGrpc.SubscribeServiceStub commitStub = SubscribeServiceGrpc.newStub(channel);
Context.CancellableContext cancellableContext = Context.current().withCancellation();
StreamObserver<Status> streamObserverStatus = new StreamObserver<Status>() {
@Override
public void onNext(Status value) {
LOG.info("successful commit");
}
@Override
public void onError(Throwable t) {
LOG.error("onError commit: " + t.getMessage());
}
@Override
public void onCompleted() {
LOG.info("onCompleted commit");
}
};
SubscribeSettings subscribeSettings = SubscribeSettings.newBuilder().setUrl(URL).build();
//описываем реакцию на наблюдаемый поток сообщений, вычитываемых адаптером из транспорта
StreamObserver<EventWithId> streamObserverEvent = new StreamObserver<EventWithId>() {
@Override
public void onNext(EventWithId value) {
LOG.info("onNext:" + value.getEvent().toString());
count.incrementAndGet();
//если адаптер запущен с значением autoCommit равным false, то при получении сообщений клиент должен подтвердить его
//для этого отправляем запрос с подтверждением с указанием id полученного сообщения
commitStub.commit(CommitRequest.newBuilder()
.setSubscribeSettings(subscribeSettings)
.setId(value.getId())
.build(),
streamObserverStatus);
}
@Override
public void onError(Throwable t) {
LOG.error("onError: " + t.getMessage());
}
@Override
public void onCompleted() {
LOG.info("onCompleted");
}
};
Runnable task = () -> {
//отправляем запрос на подписку
subStub.subscribe(EventRequest.newBuilder()
.setSubscribeSettings(subscribeSettings)
.build(), streamObserverEvent);
};
cancellableContext.run(task);
//вычитываем до отправки 10 коммитов
while (count.get() < 10) {
}
//завершаем прием сообщений клиентом
streamObserverEvent.onCompleted();
} catch (Exception e) {
e.printStackTrace();
}
}
private static KeyStore loadKeyStore(String keyStorePath, String keyStorePassword) throws KeyStoreException, IOException, CertificateException, NoSuchAlgorithmException {
KeyStore ks = KeyStore.getInstance("JKS");
FileInputStream ksStream = new FileInputStream((Paths.get(keyStorePath)).toFile());
ks.load(ksStream, keyStorePassword.toCharArray());
ksStream.close();
return ks;
}
}