Есть ли способ предотвратить ClosedByInterruptException?

В следующем примере у меня есть один файл, который используется двумя потоками (в реальном примере у меня может быть любое количество потоков)

import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;

public class A {
    static volatile boolean running = true;

    public static void main(String[] args) throws IOException, InterruptedException {
        String name = "delete.me";
        new File(name).deleteOnExit();
        RandomAccessFile raf = new RandomAccessFile(name, "rw");
        FileChannel fc = raf.getChannel();

        Thread monitor = new Thread(() -> {
            try {
                while (running) {
                    System.out.println(name + " is " + (fc.size() >> 10) + " KB");

                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        System.out.println("Interrupted");
                        Thread.currentThread().interrupt();
                    }
                }
            } catch (IOException e) {
                System.err.println("Monitor thread died");
                e.printStackTrace();
            }
        });
        monitor.setDaemon(true);
        monitor.start();

        Thread writer = new Thread(() -> {
            ByteBuffer bb = ByteBuffer.allocateDirect(32);
            try {
                while (running) {
                    bb.position(0).limit(32);
                    fc.write(bb);

                    try {
                        Thread.sleep(10);
                    } catch (InterruptedException e) {
                        System.out.println("Interrupted");
                        Thread.currentThread().interrupt();
                    }
                }
            } catch (IOException e) {
                System.err.println("Writer thread died");
                e.printStackTrace();
            }
        });

        writer.setDaemon(true);
        writer.start();

        Thread.sleep(5000);
        monitor.interrupt();
        Thread.sleep(2000);
        running = false;
        raf.close();
    }
}

Скорее создавая RandomAccessFile и сопоставление памяти для каждого потока, у меня есть один файл и одно сопоставление памяти, разделяемое между потоками, но есть catch, если какой-либо поток прерывается, ресурс закрывается.

delete.me is 0 KB
delete.me is 2 KB
delete.me is 4 KB
delete.me is 6 KB
delete.me is 8 KB
Interrupted
Monitor thread died
java.nio.channels.ClosedByInterruptException
    at java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:202)
    at sun.nio.ch.FileChannelImpl.size(FileChannelImpl.java:315)
    at A.lambda$main$0(A.java:19)
    at java.lang.Thread.run(Thread.java:748)
Writer thread died
java.nio.channels.ClosedChannelException
    at sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:110)
    at sun.nio.ch.FileChannelImpl.write(FileChannelImpl.java:199)
    at A.lambda$main$1(A.java:41)
    at java.lang.Thread.run(Thread.java:748)

Есть ли способ предотвратить блокировку FileChannel только потому, что один поток, использующий его, был прерван?


EDIT Что я хочу избежать, так это, как я подозреваю, это не сработает для Java 9+

private void doNotCloseOnInterrupt(FileChannel fc) {
    try {
        Field field = AbstractInterruptibleChannel.class
                .getDeclaredField("interruptor");
        field.setAccessible(true);
        field.set(fc, (Interruptible) thread
                -> Jvm.warn().on(getClass(), fc + " not closed on interrupt"));
    } catch (Exception e) {
        Jvm.warn().on(getClass(), "Couldn't disable close on interrupt", e);
    }
}

BTW Вызов fc.size() возвращает размер, как ожидалось, с помощью вышеупомянутого взлома.

Ответы

Ответ 1

Поскольку вы сказали, что хотите "одно сопоставление памяти, разделяемое между потоками", такой проблемы вообще нет, так как отображение памяти не влияет на закрытие FileChannel. Фактически, это хорошая стратегия как можно скорее закрыть канал, чтобы уменьшить ресурсы, удерживаемые приложением.

Например

static volatile boolean running = true;

public static void main(String[] args) throws IOException {
    Path name = Paths.get("delete.me");
    MappedByteBuffer mapped;
    try(FileChannel fc1 = FileChannel.open(name, READ,WRITE,CREATE_NEW,DELETE_ON_CLOSE)) {
        mapped = fc1.map(FileChannel.MapMode.READ_WRITE, 0, 4096);
    }
    Thread thread1 = new Thread(() -> {
        LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(50));
        while(running && !Thread.interrupted()) {
            LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(100));
            byte[] b = new byte[5];
            mapped.position(4000);
            mapped.get(b);
            System.out.println("read "+new String(b, StandardCharsets.US_ASCII));
        }
    });
    thread1.setDaemon(true);
    thread1.start();
    Thread thread2 = new Thread(() -> {
        byte[] b = "HELLO".getBytes(StandardCharsets.US_ASCII);
        while(running && !Thread.interrupted()) {
            LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(100));
            mapped.position(4000);
            mapped.put(b);
            System.out.println("wrote "+new String(b, StandardCharsets.US_ASCII));
            byte b1 = b[0];
            System.arraycopy(b, 1, b, 0, b.length-1);
            b[b.length-1] = b1;
        }
        mapped.force();
    });
    thread2.setDaemon(true);
    thread2.start();
    LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(5));
    thread2.interrupt();
    LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(2));
    running = false;

Это демонстрирует, как потоки могут читать и записывать свои данные после закрытия канала, а прерывание потока записи не останавливает поток чтения.

Если вам необходимо выполнить операции FileChannel в дополнение к I/O с отображением памяти, нет проблем с использованием нескольких экземпляров FileChannel, поэтому закрытие одного канала не влияет на другое. Например

static volatile boolean running = true;

public static void main(String[] args) throws IOException {
    Path name = Paths.get("delete.me");
    try(FileChannel fc1 = FileChannel.open(name,READ,WRITE,CREATE_NEW,DELETE_ON_CLOSE);
        FileChannel fc2 = FileChannel.open(name,READ,WRITE)) {
        Thread thread1 = new Thread(() -> {
            LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(50));
            try {
                MappedByteBuffer mapped = fc1.map(FileChannel.MapMode.READ_WRITE, 0, 4096);
                while(running && !Thread.interrupted()) {
                    LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(100));
                    byte[] b = new byte[5];
                    mapped.position(4000);
                    mapped.get(b);
                    System.out.println("read from map "
                        +new String(b, StandardCharsets.US_ASCII)
                        +", file size "+fc1.size());
                }
            }catch(IOException ex) {
                ex.printStackTrace();
            }
        });
        thread1.setDaemon(true);
        thread1.start();
        Thread thread2 = new Thread(() -> {
            byte[] b = "HELLO".getBytes(StandardCharsets.US_ASCII);
            try {
                MappedByteBuffer mapped = fc2.map(FileChannel.MapMode.READ_WRITE, 0, 4096);
                fc2.position(4096);
                try {
                    while(running && !Thread.interrupted()) {
                        LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(100));
                        mapped.position(4000);
                        mapped.put(b);
                        System.out.println("wrote to mapped "
                            +new String(b, StandardCharsets.US_ASCII));
                        byte b1 = b[0];
                        System.arraycopy(b, 1, b, 0, b.length-1);
                        b[b.length-1] = b1;
                        fc2.write(ByteBuffer.wrap(b));
                    }
                } finally { mapped.force(); }
            }catch(IOException ex) {
                ex.printStackTrace();
            }
        });
        thread2.setDaemon(true);
        thread2.start();
        LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(5));
        thread2.interrupt();
        LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(2));
        running = false;
    }
}

Здесь прерывание одного потока закрывает его канал, но не влияет на другое. Кроме того, даже когда каждый поток получает свой собственный MappedByteBuffer из своего собственного канала, изменения просматриваются другим, даже без использования force(). Конечно, последнее определяется как зависящее от системы поведение, не гарантированное работать на каждой системе.

Но, как показано в первом примере, вы все равно можете создавать общие буферы только из одного из каналов в начале, одновременно выполняя операции ввода-вывода на другом канале, по одному на поток, и не имеет значения, будут ли и какие каналы закрыты, на него не влияют отображенные буферы.

Ответ 2

Вы можете использовать отражение для доступа к interruptor поля и незаконно получить sun.nio.ch.Interruptible типа класса оттуда, чтобы создать экземпляр прокси:

private void doNotCloseOnInterrupt(FileChannel fc) {
    try {
        Field field = AbstractInterruptibleChannel.class.getDeclaredField("interruptor");
        Class<?> interruptibleClass = field.getType();
        field.setAccessible(true);
        field.set(fc, Proxy.newProxyInstance(
                interruptibleClass.getClassLoader(), 
                new Class[] { interruptibleClass },
                new InterruptibleInvocationHandler()));
    } catch (final Exception e) {
        Jvm.warn().on(getClass(), "Couldn't disable close on interrupt", e);
    }
}

public class InterruptibleInvocationHandler implements InvocationHandler {
    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable
    {
        // TODO: Check method and handle accordingly
        return null;
    }
}

В Java9 это работает с одним предупреждением, так как оно выполняется по умолчанию с --illegal-access=permit.

Однако этот флаг может быть удален в будущих версиях, и наилучшим способом обеспечения этого долгосрочного действия является использование флага --add-opens:

--add-opens java.base/sun.nio.ch=your-module
--add-opens java.base/java.nio.channels.spi=your-module

Или, если вы не работаете с модулями (не рекомендуется):

--add-opens java.base/sun.nio.ch=ALL-UNNAMED
--add-opens java.base/java.nio.channels.spi=ALL-UNNAMED

Это работает с Java 9, Java 10 и текущей версией JDK 11 Early-Access Build (28 (2018/8/23)).

Ответ 3

Используя AsynchronousFileChannel, ClosedByInterruptException никогда не бросается. Кажется, что это не похоже на прерывание

Тест, выполненный с использованием jdk 1.8.0_72

import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.channels.CompletionHandler;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.atomic.AtomicLong;

public class A {
    static volatile boolean running = true;

    public static void main(String[] args) throws IOException, InterruptedException {
        String name = "delete.me";
        Path path = new File(name).toPath();
        AtomicLong position = new AtomicLong(0);

        AsynchronousFileChannel fc = AsynchronousFileChannel.open(path, 
                StandardOpenOption.CREATE_NEW, StandardOpenOption.DELETE_ON_CLOSE ,
                StandardOpenOption.READ, StandardOpenOption.WRITE,
                StandardOpenOption.WRITE, StandardOpenOption.SYNC);

        CompletionHandler<Integer, Object> handler =
                new CompletionHandler<Integer, Object>() {
                @Override
                public void completed(Integer result, Object attachment) {
                    //System.out.println(attachment + " completed with " + result + " bytes written");
                    position.getAndAdd(result);
                }
                @Override
                public void failed(Throwable e, Object attachment) {
                    System.err.println(attachment + " failed with:");
                    e.printStackTrace();
                }
            };

        Runnable monitorRun = () -> {
            try {
                while (running) {
                    System.out.println(name + " is " + (fc.size() >> 10) + " KB");

                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        System.out.println("Interrupted");
                        Thread.currentThread().interrupt();
                        System.out.println("Interrupt call failed so return");
                        return;
                    }
                }
            } catch (IOException e) {
                System.err.println("Monitor thread died");
                e.printStackTrace();
            }
        };

        Thread monitor = new Thread(monitorRun);
        monitor.setDaemon(true);
        monitor.start();

        Thread writer = new Thread(() -> {
            ByteBuffer bb = ByteBuffer.allocateDirect(32);
            try {
                while (running) {
                    bb.position(0).limit(32);
                    fc.write(bb,position.get(),null,handler);

                    try {
                        Thread.sleep(10);
                    } catch (InterruptedException e) {
                        System.out.println("Interrupted");
                        Thread.currentThread().interrupt();
                    }
                }
            } catch (Exception e) {
                System.err.println("Writer thread died");
                e.printStackTrace();
            }
        });

        writer.setDaemon(true);
        writer.start();

        Thread.sleep(5000);
        monitor.interrupt();
        Thread.sleep(2000);
        monitor = new Thread(monitorRun);
        monitor.start();
        Thread.sleep(5000);
        running = false;
        fc.close();
    }
}

Создайте следующий результат:

delete.me is 0 KB
delete.me is 3 KB
delete.me is 6 KB
delete.me is 9 KB
delete.me is 12 KB
Interrupted
Interrupt call failed so return
delete.me is 21 KB
delete.me is 24 KB
delete.me is 27 KB
delete.me is 30 KB
delete.me is 33 KB