Предварительная инициализация пула рабочих потоков для повторного использования объектов подключения (сокетов)
Мне нужно создать пул работников на Java, где каждый рабочий имеет свой собственный подключенный сокет; когда рабочий поток работает, он использует сокет, но сохраняет его открытым для повторного использования позже. Мы решили использовать этот подход, поскольку накладные расходы, связанные с созданием, подключением и уничтожением сокетов на разовой основе, потребовали слишком много накладных расходов, поэтому нам нужен метод, с помощью которого пул работников предварительно инициализируется их сокетным соединением, готовым к приступайте к работе, сохраняя ресурсы сокетов в безопасности от других потоков (сокеты не являются потокобезопасными), поэтому нам нужно что-то в этом направлении...
public class SocketTask implements Runnable {
Socket socket;
public SocketTask(){
//create + connect socket here
}
public void run(){
//use socket here
}
}
При запуске приложения мы хотим инициализировать рабочих и, надеюсь, сокет-подключения так или иначе...
MyWorkerPool pool = new MyWorkerPool();
for( int i = 0; i < 100; i++)
pool.addWorker( new WorkerThread());
Поскольку работа запрашивается приложением, мы отправляем задачи в рабочий пул для немедленного выполнения...
pool.queueWork( new SocketTask(..));
Обновлено с помощью рабочего кода
Основываясь на полезных комментариях от Gray и jontejj, у меня есть следующий код, работающий...
SocketTask
public class SocketTask implements Runnable {
private String workDetails;
private static final ThreadLocal<Socket> threadLocal =
new ThreadLocal<Socket>(){
@Override
protected Socket initialValue(){
return new Socket();
}
};
public SocketTask(String details){
this.workDetails = details;
}
public void run(){
Socket s = getSocket(); //gets from threadlocal
//send data on socket based on workDetails, etc.
}
public static Socket getSocket(){
return threadLocal.get();
}
}
ExecutorService
ExecutorService threadPool =
Executors.newFixedThreadPool(5, Executors.defaultThreadFactory());
int tasks = 15;
for( int i = 1; i <= tasks; i++){
threadPool.execute(new SocketTask("foobar-" + i));
}
Мне нравится этот подход по нескольким причинам...
- Сокеты - это локальные объекты (через ThreadLocal), доступные для выполняемых задач, устраняющие проблемы параллелизма.
- Сокеты создаются один раз и сохраняются открытыми, повторно используются при постановке новых задач, исключая создание или уничтожение служебных данных сокета.
Ответы
Ответ 1
Одной из идей было бы поставить Socket
в BlockingQueue
. Тогда всякий раз, когда вам нужен Socket
ваши потоки могут take()
из очереди, а когда они выполняются с помощью Socket
они put()
обратно в очередь.
public void run() {
Socket socket = socketQueue.take();
try {
// use the socket ...
} finally {
socketQueue.put(socket);
}
}
Это имеет дополнительные преимущества:
- Вы можете вернуться к использованию кода
ExecutorService
. - Вы можете отделить сокет от обработки результатов.
- Вам не требуется соответствие 1 к 1 для обработки потоков и сокетов. Но связь сокета может составлять 98% работы, поэтому, возможно, нет выигрыша.
- Когда все будет готово, а ваш
ExecutorService
завершен, вы можете отключить свои сокеты, просто отключив их и закрыв их.
Это добавляет дополнительные накладные расходы другого BlockingQueue
но если вы используете Socket
связь, вы этого не заметите.
мы не считаем, что ThreadFactory удовлетворяет наши потребности...
Я думаю, вы могли бы сделать эту работу, если бы использовали локаторы потоков. Ваша фабрика потоков создаст поток, который сначала откроет сокет, сохранит его в потоковом-локальном, а затем Runnable
arg, который выполняет всю работу с сокетом, деинсталлируя задания из внутренней очереди ExecutorService
. Как только это будет сделано, метод arg.run()
завершится, и вы сможете получить сокет из локального потока и закрыть его.
Что-то вроде следующего. Это немного грязно, но вы должны получить эту идею.
ExecutorService threadPool =
Executors.newFixedThreadPool(10,
new ThreadFactory() {
public Thread newThread(final Runnable r) {
Thread thread = new Thread(new Runnable() {
public void run() {
openSocketAndStoreInThreadLocal();
// our tasks would then get the socket from the thread-local
r.run();
getSocketFromThreadLocalAndCloseIt();
}
});
return thread;
}
}));
Таким образом, ваши задачи будут реализовывать Runnable
и выглядеть так:
public SocketWorker implements Runnable {
private final ThreadLocal<Socket> threadLocal;
public SocketWorker(ThreadLocal<Socket> threadLocal) {
this.threadLocal = threadLocal;
}
public void run() {
Socket socket = threadLocal.get();
// use the socket ...
}
}
Ответ 2
Я думаю, вы должны использовать ThreadLocal
package com.stackoverflow.q16680096;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class Main
{
public static void main(String[] args)
{
ExecutorService pool = Executors.newCachedThreadPool();
int nrOfConcurrentUsers = 100;
for(int i = 0; i < nrOfConcurrentUsers; i++)
{
pool.submit(new InitSocketTask());
}
// do stuff...
pool.submit(new Task());
}
}
package com.stackoverflow.q16680096;
import java.net.Socket;
public class InitSocketTask implements Runnable
{
public void run()
{
Socket socket = SocketPool.get();
// Do initial setup here
}
}
package com.stackoverflow.q16680096;
import java.net.Socket;
public final class SocketPool
{
private static final ThreadLocal<Socket> SOCKETS = new ThreadLocal<Socket>(){
@Override
protected Socket initialValue()
{
return new Socket(); // Pass in suitable arguments here...
}
};
public static Socket get()
{
return SOCKETS.get();
}
}
package com.stackoverflow.q16680096;
import java.net.Socket;
public class Task implements Runnable
{
public void run()
{
Socket socket = SocketPool.get();
// Do stuff with socket...
}
}
Где каждый поток получает свой собственный сокет.