Инструмент Netty EventLoop для детерминированного выполнения запланированных задач

Я хотел бы использовать инструмент Netty EventLoop для того, чтобы:

  • Запуск задач в детерминированном порядке.
  • Возьмите крайний срок для запланированных задач.
  • Быстрая перемотка виртуальных часов, запуск выполнения заданных задач.

Я знаю о EmbeddedChannel и я использую его в некоторых тестах. Но то, что я хочу, - это что-то среднее между модульным тестированием и интеграционным тестированием, которое остается слепым относительно некоторых угловых случаев. В сценариях с подключением и повторным подключением и пинга много запланированных задач. Я мог бы добавить детерминизм с огромными задержками, но я не хочу, чтобы мои тесты подождали секунды или больше. Таким образом, инструмент Netty EventLoop выглядит как решение.

Я уже написал код, который имеет смысл хотя бы для меня.

  • Я ScheduledFutureTask#nanoTime чтобы вернуть мое значение.
  • Я NioEventLoopGroup поэтому у меня есть сроки выполнения задач.
  • Я ScheduledFutureTask#nanoTime значение, возвращаемое ScheduledFutureTask#nanoTime.

ScheduledFutureTask#nanoTime что код Netty использует только значение, возвращаемое ScheduledFutureTask#nanoTime (отличный дизайн!), Поэтому это очень ограниченное изменение. Я использую ByteBuddy, чтобы избежать копирования кода Netty, но это не важно.

Очень простой тест, такой как InstrumentedNioEventLoopGroupTest возникает при планировании только 1 задачи, поскольку AbstractScheduledEventExecutor#pollScheduledTask(long) имеет нулевую очередь.

Я обнаружил, что каждый NioEventLoop имеет свою собственную очередь задач, а опрос в очереди может не произойти, потому что NioEventLoopGroup ожидает, что Selector будет сигнализировать что-то, что имеет смысл. Поэтому я увеличил количество потоков NioEventLoopGroup до 2. Я также попытался установить ioRatio в 1 и запланировать больше задач, не имея лучших результатов. Используя отладчик, кажется, что мои задачи всегда "падают" в очередь задач, которая не опрошена.

Любая идея сделать эту работу? Я использую Netty 4.1.24.Final.

ScheduledFutureTaskHack.java

package com.otcdlink.chiron.integration.harness;

import com.otcdlink.chiron.toolbox.ToStringTools;
import net.bytebuddy.ByteBuddy;
import net.bytebuddy.agent.ByteBuddyAgent;
import net.bytebuddy.dynamic.loading.ClassReloadingStrategy;
import net.bytebuddy.implementation.MethodDelegation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.concurrent.Future;
import java.util.function.LongSupplier;

import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import static net.bytebuddy.matcher.ElementMatchers.isPackagePrivate;
import static net.bytebuddy.matcher.ElementMatchers.isStatic;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.returns;
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;

/**
 *
 * Got the delegation working with the help of
 * https://www.infoq.com/articles/Easily-Create-Java-Agents-with-ByteBuddy
 */
final class ScheduledFutureTaskHack {

  private static final Logger LOGGER = LoggerFactory.getLogger( ScheduledFutureTaskHack.class ) ;

  private static final Class< ? > SCHEDULEDFUTURETASK_CLASS ;
  private static final Method SCHEDULEDFUTURETASK_NANOTIME_METHOD ;
  private static final Method SCHEDULEDFUTURETASK_DEADLINENANOS_METHOD ;
  private static final Field SCHEDULEDFUTURETASK_DEADLINENANOS_FIELD ;
  private static final Field SCHEDULEDFUTURETASK_STARTTIME_FIELD ;
  static {
    try {
      SCHEDULEDFUTURETASK_CLASS = Class.forName( "io.netty.util.concurrent.ScheduledFutureTask" ) ;
      SCHEDULEDFUTURETASK_NANOTIME_METHOD =
          SCHEDULEDFUTURETASK_CLASS.getDeclaredMethod( "nanoTime" ) ;
      SCHEDULEDFUTURETASK_NANOTIME_METHOD.setAccessible( true ) ;
      SCHEDULEDFUTURETASK_DEADLINENANOS_METHOD =
          SCHEDULEDFUTURETASK_CLASS.getDeclaredMethod( "deadlineNanos") ;
      SCHEDULEDFUTURETASK_DEADLINENANOS_METHOD.setAccessible( true ) ;
      SCHEDULEDFUTURETASK_DEADLINENANOS_FIELD =
          SCHEDULEDFUTURETASK_CLASS.getDeclaredField( "deadlineNanos" ) ;
      SCHEDULEDFUTURETASK_DEADLINENANOS_FIELD.setAccessible( true ) ;
      SCHEDULEDFUTURETASK_STARTTIME_FIELD =
          SCHEDULEDFUTURETASK_CLASS.getDeclaredField( "START_TIME" ) ;
      SCHEDULEDFUTURETASK_STARTTIME_FIELD.setAccessible( true ) ;
    } catch( ClassNotFoundException | NoSuchMethodException | NoSuchFieldException e ) {
      throw new Error( e ) ;
    }
  }

  /**
   * Everything is this class must be visible from the redefined class.
   */
  @SuppressWarnings( "unused" )
  public static final class StaticMethodDelegate {
    /**
     * Calls to {@link io.netty.util.concurrent.ScheduledFutureTask#nanoTime()} are redirected
     * to this method.
     * Sadly we can't use parameter annotated with {@link @This} or something giving a hint
     * about the call context. It looks like a consequence of JVMTI reload not supporting method
     * addition (adding a parameter would imply creating a new method).
     */
    public static long nanoTime() {
      final long supplied = longSupplier.getAsLong() ;
      LOGGER.debug( "Called " + StaticMethodDelegate.class.getSimpleName() + "#nanoTime(), " +
          "returns " + supplied + "." ) ;
      return supplied ;
    }

  }

  private static LongSupplier longSupplier = null ;

  static void install( final LongSupplier longSupplier ) {
    install( longSupplier, true ) ;
  }

  /**
   *
   * @param longSupplier
   * @param suppliedNanosRelativeToClassloadingTime if {@code true}, supplied nanoseconds are
   *     relative to {@link io.netty.util.concurrent.ScheduledFutureTask#START_TIME}.
   *     Original behavior of the hacked method is to substract
   *     {@link io.netty.util.concurrent.ScheduledFutureTask#START_TIME} from value returned
   *     by {@link System#nanoTime()} (probably to make number more readable and reduce the risk
   *     of an overflow). During tests we prefer to not care about start time so there is this
   *     option to add it automatically.
   */
  static void install(
      final LongSupplier longSupplier,
      final boolean suppliedNanosRelativeToClassloadingTime
  ) {
    checkState( ScheduledFutureTaskHack.longSupplier == null ) ;
    if( suppliedNanosRelativeToClassloadingTime ) {
      final long startTime = START_TIME ;
      LOGGER.debug(
          "Installing with value of " +
          SCHEDULEDFUTURETASK_STARTTIME_FIELD.toGenericString() +
          " = " + startTime + " automatically added to the values supplied."
      ) ;
      class AdjustedLongSupplier implements LongSupplier {
        @Override
        public long getAsLong() {
          return longSupplier.getAsLong() + startTime ;
        }
        @Override
        public String toString() {
          return ToStringTools.getNiceClassName( this ) + "{startTime=" + startTime + "}" ;
        }
      }
      ScheduledFutureTaskHack.longSupplier = new AdjustedLongSupplier() ;
    } else {
      ScheduledFutureTaskHack.longSupplier = checkNotNull( longSupplier ) ;
    }
    ByteBuddyAgent.install() ;
    LOGGER.info( "Successfully installed ByteBuddy Agent." ) ;
    redefineClass() ;
    LOGGER.info( "Successfully redefined static method implementation." ) ;
  }

  private static void redefineClass() {
    new ByteBuddy()
        .redefine( SCHEDULEDFUTURETASK_CLASS )
        .method( named( "nanoTime" )
            .and( isStatic() )
            .and( isPackagePrivate() )
            .and( takesArguments( 0 ) )
            .and( returns( long.class ) )
        )
        .intercept( MethodDelegation.to( StaticMethodDelegate.class ) )
        .make()
        .load( ScheduledFutureTaskHack.class.getClassLoader(), ClassReloadingStrategy.fromInstalledAgent() )
    ;
  }

  /**
   * Invokes method replacing {@link io.netty.util.concurrent.ScheduledFutureTask#nanoTime()}.
   */
  public static long invokeNanoTime() {
    try {
      return ( long ) SCHEDULEDFUTURETASK_NANOTIME_METHOD.invoke( null ) ;
    } catch( IllegalAccessException | InvocationTargetException e ) {
      throw new Error( e ) ;
    }
  }

  /**
   * The {@link io.netty.util.concurrent.ScheduledFutureTask#deadlineNanos()} method returns
   * the value made from {@link System#nanoTime()},
   * minus {@link io.netty.util.concurrent.ScheduledFutureTask#START_TIME},
   * plus the delay before executing the task.
   */
  public static Long invokeDeadlineNanos( final Future future ) {
    try {
      if( SCHEDULEDFUTURETASK_DEADLINENANOS_METHOD.getDeclaringClass()
          .isAssignableFrom( future.getClass() )
      ) {
        return ( long ) SCHEDULEDFUTURETASK_DEADLINENANOS_METHOD.invoke( future ) ;
      } else {
        return null ;
      }
    } catch( IllegalAccessException | InvocationTargetException e ) {
      throw new Error(
          "Could not access method " + SCHEDULEDFUTURETASK_DEADLINENANOS_METHOD + " in " + future,
          e
      ) ;
    }
  }

  private static long readStartTime() {
    try {
      return ( long ) SCHEDULEDFUTURETASK_STARTTIME_FIELD.get( null ) ;
    } catch( IllegalAccessException e ) {
      throw new Error(
          "Could not access static field " + SCHEDULEDFUTURETASK_STARTTIME_FIELD,
          e
      ) ;
    }
  }

  public static final long START_TIME = readStartTime() ;


}

ScheduledFutureTaskHackTest.java

package com.otcdlink.chiron.integration.harness;

import com.otcdlink.chiron.toolbox.ToStringTools;
import com.otcdlink.chiron.toolbox.netty.NettyTools;
import io.netty.channel.nio.NioEventLoopGroup;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import static org.assertj.core.api.Assertions.assertThat;

public class ScheduledFutureTaskHackTest {

  @Test
  public void fastForward() throws InterruptedException {

    final AtomicLong nanotimeHolder = new AtomicLong( 0 ) ;
    ScheduledFutureTaskHack.install( nanotimeHolder::get ) ;
    final long startTime = hackedNanoTime() ;

    final NioEventLoopGroup nioEventLoopGroup = new NioEventLoopGroup() ;
    final Semaphore scheduledTaskCompleted = new Semaphore( 0 ) ;
    nioEventLoopGroup.schedule(
        () -> {
          scheduledTaskCompleted.release() ;
          LOGGER.info( "Scheduled task completed." ) ;
        },
        1,
        TimeUnit.HOURS
    ) ;
    LOGGER.info( "Scheduled task for in 1 hour, now fast-forwarding Netty clock ..." ) ;

    // Test fails when disabling fast-forward below.
    nanotimeHolder.set( startTime + TimeUnit.HOURS.toNanos( 1 ) + 1 ) ;
    Thread.sleep( 1000 ) ;
    hackedNanoTime() ;
    // Amazingly Netty detected clock change and ran the task!
    assertThat( scheduledTaskCompleted.tryAcquire( 1, TimeUnit.SECONDS ) )
        .describedAs( "Scheduled task should have completed within 1 second" )
        .isTrue()
    ;

  }


// =======
// Fixture
// =======

  private static final Logger LOGGER = LoggerFactory.getLogger(
      ScheduledFutureTaskHackTest.class ) ;

  static {
    NettyTools.forceNettyClassesToLoad() ;
  }

  private static long hackedNanoTime() {
    final long nanoTime = ScheduledFutureTaskHack.invokeNanoTime() ;
    LOGGER.info(
        ToStringTools.getNiceName( ScheduledFutureTaskHack.StaticMethodDelegate.class ) +
        "#nanoTime(): " + nanoTime + "."
    ) ;
    return nanoTime ;
  }

}

InstrumentedNioEventLoopGroup.java

package com.otcdlink.chiron.integration.harness;

import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.ScheduledFuture;

import javax.annotation.Nonnull;
import java.time.Instant;
import java.util.concurrent.Callable;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

import static com.google.common.base.Preconditions.checkNotNull;

class InstrumentedNioEventLoopGroup extends NioEventLoopGroup {

  /**
   * Consume the value obtained from
   * {@link io.netty.util.concurrent.ScheduledFutureTask#deadlineNanos()}.
   * This is hardly mappable to an exact {@link Instant} (even if the Java flavor retains
   * nanoseconds) but this is enough to compare with {@link System#nanoTime()}.
   */
  private final Consumer< Long > scheduledTaskMomentConsumer ;

  public InstrumentedNioEventLoopGroup(
      final ThreadFactory threadFactory,
      final Consumer< Long > scheduledTaskMomentConsumer
  ) {
    // Need 2 threads because one will block on Socket Selector if there is no IO,
    // so we add one to poll Tasks.
    super( 2, threadFactory ) ;
    this.scheduledTaskMomentConsumer = checkNotNull( scheduledTaskMomentConsumer ) ;
  }

  private < FUTURE extends Future > FUTURE recordDeadlineNanos( final FUTURE future ) {
    final Long deadlineNanos = ScheduledFutureTaskHack.invokeDeadlineNanos( future ) ;
    if( deadlineNanos != null ) {
      scheduledTaskMomentConsumer.accept( deadlineNanos ) ;
    }
    return future ;
  }


  @Nonnull
  @Override
  public Future< ? > submit( final Runnable task ) {
    return recordDeadlineNanos( super.submit( task ) ) ;
  }

  @Nonnull
  @Override
  public < T > Future< T > submit(
      final Runnable task,
      final T result
  ) {
    return recordDeadlineNanos( super.submit( task, result ) ) ;
  }

  @Nonnull
  @Override
  public < T > Future< T > submit( final Callable< T > task ) {
    return recordDeadlineNanos( super.submit( task ) ) ;
  }

  @Nonnull
  @Override
  public ScheduledFuture< ? > schedule(
      final Runnable command,
      final long delay,
      final TimeUnit unit
  ) {
    return recordDeadlineNanos( super.schedule( command, delay, unit ) ) ;
  }

  @Nonnull
  @Override
  public < V > ScheduledFuture< V > schedule(
      final Callable< V > callable,
      final long delay,
      final TimeUnit unit
  ) {
    return recordDeadlineNanos( super.schedule( callable, delay, unit ) ) ;
  }

  @Nonnull
  @Override
  public ScheduledFuture< ? > scheduleAtFixedRate(
      final Runnable command,
      final long initialDelay,
      final long period,
      final TimeUnit unit
  ) {
    return recordDeadlineNanos(
        super.scheduleAtFixedRate( command, initialDelay, period, unit ) ) ;
  }

  @Nonnull
  @Override
  public ScheduledFuture< ? > scheduleWithFixedDelay(
      final Runnable command,
      final long initialDelay,
      final long delay,
      final TimeUnit unit
  ) {
    return recordDeadlineNanos(
        super.scheduleWithFixedDelay( command, initialDelay, delay, unit ) ) ;
  }
}

InstrumentedNioEventLoopGroupTest.java

package com.otcdlink.chiron.integration.harness;

import com.otcdlink.chiron.toolbox.concurrent.ExecutorTools;
import com.otcdlink.chiron.toolbox.netty.NettyTools;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.concurrent.ScheduledFuture;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import static org.assertj.core.api.Assertions.assertThat;

public class InstrumentedNioEventLoopGroupTest {

  @Test
  public void recordAndAdjust() throws InterruptedException {

    final int delay = 10 ;
    final TimeUnit timeUnit = TimeUnit.SECONDS ;

    final AtomicLong nanoInstantSupplier = new AtomicLong() ;
    ScheduledFutureTaskHack.install( nanoInstantSupplier::get ) ;

    final List< Long > taskDeadlineRecorder = Collections.synchronizedList( new ArrayList<>() ) ;
    final InstrumentedNioEventLoopGroup executor = new InstrumentedNioEventLoopGroup(
        ExecutorTools.newThreadFactory( "executor" ), taskDeadlineRecorder::add ) ;
    executor.setIoRatio( 1 ) ;  // Silly but worth trying to see what can get wrong.

    final Semaphore doneSemaphore = new Semaphore( 0 ) ;
    final ScheduledFuture< ? > scheduledFuture1 =
        executor.schedule( ( Runnable ) doneSemaphore::release, delay, timeUnit ) ;
    LOGGER.info( "Scheduled " + scheduledFuture1 + "." ) ;

    assertThat( taskDeadlineRecorder ).hasSize( 1 ) ;
    final Long nanoTime = taskDeadlineRecorder.get( 0 ) - ScheduledFutureTaskHack.START_TIME ;
    LOGGER.info( "Recorded " + nanoTime + " as nanoTime deadline for next task." ) ;

    assertThat( nanoTime ).isEqualTo( timeUnit.toNanos( delay ) ) ;
    final long pastDeadline = nanoTime + 1 ;
    nanoInstantSupplier.set( pastDeadline ) ;
    LOGGER.info(
        "Did set nanoTime to " + pastDeadline + ", past to Task deadline. " +
        "Invocation of hacked nanoTime() returns " +
        ScheduledFutureTaskHack.invokeNanoTime() + "."
    ) ;
    LOGGER.info( "Now waiting for task completion ..." ) ;
    assertThat( doneSemaphore.tryAcquire( 3, TimeUnit.SECONDS ) ).isTrue() ;
  }

  /**
   * Fails when ran after {@link #recordAndAdjust()} because JUnit doesn't reload classes for
   * each method inside a test class.
   */
  @Test
  public void noInstrumentation() throws InterruptedException {
    final NioEventLoopGroup executor =
        new NioEventLoopGroup( 1, ExecutorTools.newThreadFactory( "executor" ) ) ;
    final Semaphore doneSemaphore = new Semaphore( 0 ) ;
    executor.submit( () -> LOGGER.info( "Plain submission works!" ) ) ;
    final ScheduledFuture< ? > scheduledFuture =
        executor.schedule( ( Runnable ) doneSemaphore::release, 1, TimeUnit.SECONDS ) ;
    LOGGER.info( "Scheduled " + scheduledFuture + "." ) ;
    assertThat( doneSemaphore.tryAcquire( 3, TimeUnit.SECONDS ) ).isTrue() ;
  }


// =======
// Fixture
// =======

  private static final Logger LOGGER =
      LoggerFactory.getLogger( InstrumentedNioEventLoopGroupTest.class ) ;

  static {
    NettyTools.forceNettyClassesToLoad() ;
  }

}

Я являюсь автором Chiron Framework, основанной на WebSocket сетевой каркас с чистым Java-клиентом и неблокирующей двухфакторной аутентификацией. Он много использует Netty. К сожалению, существует множество тестов на основе JMockit, которые не выполняются надежно, потому что порядок выполнения может быть недетерминированным (это проблема с каждой частью кода, который планирует задачи).

Ответы

Ответ 1

О, ребята, это было так очевидно: я переопределяю каждый метод schedule*, накапливаю Runnable/Callable и другие параметры в объекте данных и добавляю его в некоторую очередь. Затем я запускаю выполнение задачи из теста.

Поскольку я код, который создает задачи, принадлежит мне, я украшаю каждую задачу (которая является ссылкой на метод) с помощью интерфейса тегирования. Тогда тест может проверить, выполняет ли он ожидаемую задачу.