Общение в Netty Nio java
Я хочу создать систему связи с двумя клиентами и сервером в Netty nio. Более конкретно, во-первых, я хочу, чтобы два клиента подключались к серверу для отправки сообщения с сервера, а затем для возможности обмена данными между двумя клиентами. Я использую код приведенный в этом примере. Мои изменения в коде можно найти здесь: ссылка
Кажется, что channelRead в serverHandler работает, когда первый клиент подключен, поэтому он всегда возвращает 1, но когда второй клиент подключен, он не изменяется на 2. Как я могу правильно проверить сервер, когда оба клиента подключены к сервер? Как я могу читать это значение динамически из моей основной функции Клиента? Тогда, что является лучшим способом, чтобы оба клиента обменивались данными?
EDIT1: Похоже, что клиентская служба работает и закрывается напрямую, поэтому каждый раз, когда я запускаю новый NettyClient, подключается, но после этого соединение закрывается. Таким образом, счетчик всегда имеет значение от нуля до единицы. Поскольку я был проинструктирован в нижеприведенных комментариях, я тестировал его, используя telnet в том же порту, и счетчик, кажется, растет нормально, однако, с услугой NettyClient нет.
EDIT2: Кажется, что проблема у меня была от future.addListener(ChannelFutureListener.CLOSE);
, которая находилась в channelRead
в ProcessingHandler class
. Когда я прокомментировал это, кажется, что код работает. Однако я не уверен, каковы последствия прокомментированного этого. Более того, я хочу, чтобы моя основная функция клиента проверяла, когда обратное сообщение является конкретным двумя. Как я могу создать метод, ожидающий определенного сообщения с сервера, и тем временем он блокирует основные функции.
static EventLoopGroup workerGroup = new NioEventLoopGroup();
static Promise<Object> promise = workerGroup.next().newPromise();
public static void callClient() throws Exception {
String host = "localhost";
int port = 8080;
try {
Bootstrap b = new Bootstrap();
b.group(workerGroup);
b.channel(NioSocketChannel.class);
b.option(ChannelOption.SO_KEEPALIVE, true);
b.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new RequestDataEncoder(), new ResponseDataDecoder(), new ClientHandler(promise));
}
});
ChannelFuture f = b.connect(host, port).sync();
} finally {
//workerGroup.shutdownGracefully();
}
}
Я хочу, чтобы внутри основной функции вызывал метод и возвращал результат, а когда ему было 2, чтобы продолжить основные функции. Тем не менее, я не могу вызвать callClient внутри while, так как он будет запускать несколько раз один и тот же клиент.
callBack();
while (true) {
Object msg = promise.get();
System.out.println("Case1: the connected clients is not two");
int ret = Integer.parseInt(msg.toString());
if (ret == 2){
break;
}
}
System.out.println("Case2: the connected clients is two");
// proceed with the main functionality
Как обновить переменную обещания для первого клиента. Когда я запускаю двух клиентов, для первого клиента я всегда получал сообщение:
Случай 1: подключенные клиенты не два
кажется, что обещание не обновляется нормально, а для второго клиента я всегда получал:
Случай 2: подключенные клиенты - это два
Ответы
Ответ 1
Если моя память правильная, ChannelHandlerContext по одному на канал, и он может иметь несколько каналов ChannelHandlers в нем. Переменная ваших каналов - это переменная экземпляра класса вашего обработчика. И вы создаете новый экземпляр ProcessingHandler для каждого соединения. Таким образом, каждый будет иметь одно и только одно соединение в переменной channels
после инициализации - тот, для которого он был создан.
См. new ProcessingHandler()
в функции initChannel в коде сервера (NettyServer.java).
Вы можете сделать статическую переменную channels
так, чтобы она делилась между экземплярами ProcessingHandler. Или вы можете создать один экземпляр ProcessingHandler в другом месте (например, в качестве локальной переменной в функции run()
), а затем передать этот экземпляр на вызов addLast
вместо new ProcessingHandler()
.
Ответ 2
Почему размер каналов ChannelGroup всегда один. Даже если я подключу больше клиентов?
Поскольку дочерний ChannelInitializer
вызывается для каждого нового Channel
(клиента). Там вы создаете новый экземпляр ProcessingHandler
, поэтому каждый канал видит свой экземпляр ChannelGroup
.
Решение 1 - Атрибут канала
Используйте Attribute и свяжите его с Channel
.
Создайте атрибут где-нибудь (скажем, внутри класса Constants
):
public static final AttributeKey<ChannelGroup> CH_GRP_ATTR =
AttributeKey.valueOf(SomeClass.class.getName());
Теперь создайте ChannelGroup, которая будет использоваться всеми экземплярами ProcessingHandler
:
final ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
Обновите дочерний элемент ChannelInitializer
в NettyServer:
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(
new RequestDecoder(),
new ResponseDataEncoder(),
new ProcessingHandler());
ch.attr(Constants.CH_GRP_ATTR).set(channels);
}
Теперь вы можете получить доступ к экземпляру ChannelGroup внутри ваших обработчиков, например:
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
final ChannelGroup channels = ctx.channel().attr(Constants.CH_GRP_ATTR).get();
channels.add(ctx.channel());
Это будет работать, потому что каждый раз, когда подключается новый клиент, ChannelInitializer будет вызываться с той же ссылкой на ChannelGroup
.
Решение 2 - статическое поле
Если вы объявите ChannelGroup
статическим, все экземпляры класса будут видеть один и тот же экземпляр ChannelGroup
:
private static final ChannelGroup channels =
new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
Решение 3 - распространение общего экземпляра
Ввести параметр в конструктор ProcessingHandler
:
private final ChannelGroup channels;
public ProcessingHandler(ChannelGroup chg) {
this.channels = chg;
}
Теперь внутри класса NettyServer создайте экземпляр ChannelGroup
и распространите его на конструктор ProcessingHandler:
final ChannelGroup channels = new
DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(
new RequestDecoder(),
new ResponseDataEncoder(),
new ProcessingHandler(channels)); // <- here
}
Лично я выбрал бы первое решение, потому что
- Он четко связывает ChannelGroup с контекстом канала
- Вы можете получить доступ к той же ChannelGroup в других обработчиках.
- У вас может быть несколько экземпляров сервера (выполняется на другом порту, в пределах одной JVM)