Ответ 1
Наш подход - использовать конструктор-инъекцию сериализуемого factory в носик/болт. Затем выталкиватель/болт проконсультируется с factory в методе open/prepare. Единственная ответственность factory заключается в том, чтобы инкапсулировать получение зависимостей spout/bolt сериализуемым образом. Эта конструкция позволяет нашим модульным испытаниям вводить поддельные/испытательные/макетные заводы, которые при консультации обращаются к макетным сервисам. Таким образом, мы можем узко unit test выталкивать/болты с помощью mocks, например. Mockito.
Ниже приведен общий пример болта и тест для него. Я опустил реализацию factory UserNotificationFactory
, потому что это зависит от вашего приложения. Вы можете использовать сервисные локаторы для получения сервисов, сериализованной конфигурации, конфигурации, доступной для HDFS, или вообще любого способа получить правильные сервисы, если factory может сделать это после цикла serde. Вы должны охватить сериализацию этого класса.
Болт
public class NotifyUserBolt extends BaseBasicBolt {
public static final String NAME = "NotifyUser";
private static final String USER_ID_FIELD_NAME = "userId";
private final UserNotifierFactory factory;
transient private UserNotifier notifier;
public NotifyUserBolt(UserNotifierFactory factory) {
checkNotNull(factory);
this.factory = factory;
}
@Override
public void prepare(Map stormConf, TopologyContext context) {
notifier = factory.createUserNotifier();
}
@Override
public void execute(Tuple input, BasicOutputCollector collector) {
// This check ensures that the time-dependency imposed by Storm has been observed
checkState(notifier != null, "Unable to execute because user notifier is unavailable. Was this bolt successfully prepared?");
long userId = input.getLongByField(PreviousBolt.USER_ID_FIELD_NAME);
notifier.notifyUser(userId);
collector.emit(new Values(userId));
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields(USER_ID_FIELD_NAME));
}
}
Test
public class NotifyUserBoltTest {
private NotifyUserBolt bolt;
@Mock
private TopologyContext topologyContext;
@Mock
private UserNotifier notifier;
// This test implementation allows us to get the mock to the unit-under-test.
private class TestFactory implements UserNotifierFactory {
private final UserNotifier notifier;
private TestFactory(UserNotifier notifier) {
this.notifier = notifier;
}
@Override
public UserNotifier createUserNotifier() {
return notifier;
}
}
@Before
public void before() {
MockitoAnnotations.initMocks(this);
// The factory will return our mock `notifier`
bolt = new NotifyUserBolt(new TestFactory(notifier));
// Now the bolt is holding on to our mock and is under our control!
bolt.prepare(new Config(), topologyContext);
}
@Test
public void testExecute() {
long userId = 24;
Tuple tuple = mock(Tuple.class);
when(tuple.getLongByField(PreviousBolt.USER_ID_FIELD_NAME)).thenReturn(userId);
BasicOutputCollector collector = mock(BasicOutputCollector.class);
bolt.execute(tuple, collector);
// Here we just verify a call on `notifier`, but we could have stubbed out behavior befor
// the call to execute, too.
verify(notifier).notifyUser(userId);
verify(collector).emit(new Values(userId));
}
}