Ответ 1
Можно обновить AbstractInboundFileSynchronizer
, чтобы распознать обновленные файлы, но он хрупкий, и вы столкнулись с другими проблемами.
Обновление 13/Ноябрь/2016: Обнаружено, как получить временные метки изменений в секундах.
Основная проблема с обновлением AbstractInboundFileSynchronizer
заключается в том, что у него есть методы setter, но нет (защищенные) getter-методы. Если в будущем методы-сеттеры сделают что-то умное, обновленная версия, представленная здесь, сломается.
Основная проблема с обновлением файлов в локальном каталоге - concurrency: если вы обрабатываете локальный файл одновременно с получением обновления, вы можете столкнуться со всеми неприятностями. Легкий выход - переместить локальный файл в (временный) каталог обработки, чтобы обновление можно было получить в виде нового файла, что, в свою очередь, устраняет необходимость обновления AbstractInboundFileSynchronizer
. См. Также временную метку Camel примечания.
По умолчанию FTP-серверы предоставляют временные метки модификации в минутах. Для тестирования я обновил FTP-клиент, чтобы использовать команду MLSD, которая предоставляет временные метки изменения в секундах (и миллисекундах, если вам повезет), но не все FTP-серверы поддерживают это.
Как упоминалось в Spring ссылка FTP, фильтр локального файла должен быть FileSystemPersistentAcceptOnceFileListFilter
, чтобы обеспечить сбор локальных файлов когда изменяется временная метка изменения.
Ниже моей версии обновленного AbstractInboundFileSynchronizer
, за которым следуют некоторые классы тестов, которые я использовал.
public class FtpUpdatingFileSynchronizer extends FtpInboundFileSynchronizer {
protected final Log logger = LogFactory.getLog(this.getClass());
private volatile Expression localFilenameGeneratorExpression;
private volatile EvaluationContext evaluationContext;
private volatile boolean deleteRemoteFiles;
private volatile String remoteFileSeparator = "/";
private volatile boolean preserveTimestamp;
public FtpUpdatingFileSynchronizer(SessionFactory<FTPFile> sessionFactory) {
super(sessionFactory);
setPreserveTimestamp(true);
}
@Override
public void setLocalFilenameGeneratorExpression(Expression localFilenameGeneratorExpression) {
super.setLocalFilenameGeneratorExpression(localFilenameGeneratorExpression);
this.localFilenameGeneratorExpression = localFilenameGeneratorExpression;
}
@Override
public void setIntegrationEvaluationContext(EvaluationContext evaluationContext) {
super.setIntegrationEvaluationContext(evaluationContext);
this.evaluationContext = evaluationContext;
}
@Override
public void setDeleteRemoteFiles(boolean deleteRemoteFiles) {
super.setDeleteRemoteFiles(deleteRemoteFiles);
this.deleteRemoteFiles = deleteRemoteFiles;
}
@Override
public void setRemoteFileSeparator(String remoteFileSeparator) {
super.setRemoteFileSeparator(remoteFileSeparator);
this.remoteFileSeparator = remoteFileSeparator;
}
@Override
public void setPreserveTimestamp(boolean preserveTimestamp) {
// updated
Assert.isTrue(preserveTimestamp, "for updating timestamps must be preserved");
super.setPreserveTimestamp(preserveTimestamp);
this.preserveTimestamp = preserveTimestamp;
}
@Override
protected void copyFileToLocalDirectory(String remoteDirectoryPath, FTPFile remoteFile, File localDirectory,
Session<FTPFile> session) throws IOException {
String remoteFileName = this.getFilename(remoteFile);
String localFileName = this.generateLocalFileName(remoteFileName);
String remoteFilePath = (remoteDirectoryPath != null
? (remoteDirectoryPath + this.remoteFileSeparator + remoteFileName)
: remoteFileName);
if (!this.isFile(remoteFile)) {
if (this.logger.isDebugEnabled()) {
this.logger.debug("cannot copy, not a file: " + remoteFilePath);
}
return;
}
// start update
File localFile = new File(localDirectory, localFileName);
boolean update = false;
if (localFile.exists()) {
if (this.getModified(remoteFile) > localFile.lastModified()) {
this.logger.info("Updating local file " + localFile);
update = true;
} else {
this.logger.info("File already exists: " + localFile);
return;
}
}
// end update
String tempFileName = localFile.getAbsolutePath() + this.getTemporaryFileSuffix();
File tempFile = new File(tempFileName);
OutputStream outputStream = new BufferedOutputStream(new FileOutputStream(tempFile));
try {
session.read(remoteFilePath, outputStream);
} catch (Exception e) {
if (e instanceof RuntimeException) {
throw (RuntimeException) e;
}
else {
throw new MessagingException("Failure occurred while copying from remote to local directory", e);
}
} finally {
try {
outputStream.close();
}
catch (Exception ignored2) {
}
}
// updated
if (update && !localFile.delete()) {
throw new MessagingException("Unable to delete local file [" + localFile + "] for update.");
}
if (tempFile.renameTo(localFile)) {
if (this.deleteRemoteFiles) {
session.remove(remoteFilePath);
if (this.logger.isDebugEnabled()) {
this.logger.debug("deleted " + remoteFilePath);
}
}
// updated
this.logger.info("Stored file locally: " + localFile);
} else {
// updated
throw new MessagingException("Unable to rename temporary file [" + tempFile + "] to [" + localFile + "]");
}
if (this.preserveTimestamp) {
localFile.setLastModified(getModified(remoteFile));
}
}
private String generateLocalFileName(String remoteFileName) {
if (this.localFilenameGeneratorExpression != null) {
return this.localFilenameGeneratorExpression.getValue(this.evaluationContext, remoteFileName, String.class);
}
return remoteFileName;
}
}
Следуя некоторым из тестовых классов, которые я использовал.
Я использовал зависимости org.springframework.integration:spring-integration-ftp:4.3.5.RELEASE
и org.apache.ftpserver:ftpserver-core:1.0.6
(плюс обычные зависимости для регистрации и тестирования).
public class TestFtpSync {
static final Logger log = LoggerFactory.getLogger(TestFtpSync.class);
static final String FTP_ROOT_DIR = "target" + File.separator + "ftproot";
// org.apache.ftpserver:ftpserver-core:1.0.6
static FtpServer server;
@BeforeClass
public static void startServer() throws FtpException {
File ftpRoot = new File (FTP_ROOT_DIR);
ftpRoot.mkdirs();
TestUserManager userManager = new TestUserManager(ftpRoot.getAbsolutePath());
FtpServerFactory serverFactory = new FtpServerFactory();
serverFactory.setUserManager(userManager);
ListenerFactory factory = new ListenerFactory();
factory.setPort(4444);
serverFactory.addListener("default", factory.createListener());
server = serverFactory.createServer();
server.start();
}
@AfterClass
public static void stopServer() {
if (server != null) {
server.stop();
}
}
File ftpFile = Paths.get(FTP_ROOT_DIR, "test1.txt").toFile();
File ftpFile2 = Paths.get(FTP_ROOT_DIR, "test2.txt").toFile();
@Test
public void syncDir() {
// org.springframework.integration:spring-integration-ftp:4.3.5.RELEASE
AnnotationConfigApplicationContext ctx = new AnnotationConfigApplicationContext();
try {
ctx.register(FtpSyncConf.class);
ctx.refresh();
PollableChannel msgChannel = ctx.getBean("inputChannel", PollableChannel.class);
for (int j = 0; j < 2; j++) {
for (int i = 0; i < 2; i++) {
storeFtpFile();
}
for (int i = 0; i < 4; i++) {
fetchMessage(msgChannel);
}
}
} catch (Exception e) {
throw new AssertionError("FTP test failed.", e);
} finally {
ctx.close();
cleanup();
}
}
boolean tswitch = true;
void storeFtpFile() throws IOException, InterruptedException {
File f = (tswitch ? ftpFile : ftpFile2);
tswitch = !tswitch;
log.info("Writing message " + f.getName());
Files.write(f.toPath(), ("Hello " + System.currentTimeMillis()).getBytes());
}
Message<?> fetchMessage(PollableChannel msgChannel) {
log.info("Fetching message.");
Message<?> msg = msgChannel.receive(1000L);
if (msg == null) {
log.info("No message.");
} else {
log.info("Have a message: " + msg);
}
return msg;
}
void cleanup() {
delFile(ftpFile);
delFile(ftpFile2);
File d = new File(FtpSyncConf.LOCAL_DIR);
if (d.isDirectory()) {
for (File f : d.listFiles()) {
delFile(f);
}
}
log.info("Finished cleanup");
}
void delFile(File f) {
if (f.isFile()) {
if (f.delete()) {
log.info("Deleted " + f);
} else {
log.error("Cannot delete file " + f);
}
}
}
}
public class MlistFtpSessionFactory extends AbstractFtpSessionFactory<MlistFtpClient> {
@Override
protected MlistFtpClient createClientInstance() {
return new MlistFtpClient();
}
}
public class MlistFtpClient extends FTPClient {
@Override
public FTPFile[] listFiles(String pathname) throws IOException {
return super.mlistDir(pathname);
}
}
@EnableIntegration
@Configuration
public class FtpSyncConf {
private static final Logger log = LoggerFactory.getLogger(FtpSyncConf.class);
public static final String LOCAL_DIR = "/tmp/received";
@Bean(name = "ftpMetaData")
public ConcurrentMetadataStore ftpMetaData() {
return new SimpleMetadataStore();
}
@Bean(name = "localMetaData")
public ConcurrentMetadataStore localMetaData() {
return new SimpleMetadataStore();
}
@Bean(name = "ftpFileSyncer")
public FtpUpdatingFileSynchronizer ftpFileSyncer(
@Qualifier("ftpMetaData") ConcurrentMetadataStore metadataStore) {
MlistFtpSessionFactory ftpSessionFactory = new MlistFtpSessionFactory();
ftpSessionFactory.setHost("localhost");
ftpSessionFactory.setPort(4444);
ftpSessionFactory.setUsername("demo");
ftpSessionFactory.setPassword("demo");
FtpPersistentAcceptOnceFileListFilter fileFilter = new FtpPersistentAcceptOnceFileListFilter(metadataStore, "ftp");
fileFilter.setFlushOnUpdate(true);
FtpUpdatingFileSynchronizer ftpFileSync = new FtpUpdatingFileSynchronizer(ftpSessionFactory);
ftpFileSync.setFilter(fileFilter);
// ftpFileSync.setDeleteRemoteFiles(true);
return ftpFileSync;
}
@Bean(name = "syncFtp")
@InboundChannelAdapter(value = "inputChannel", poller = @Poller(fixedDelay = "500", maxMessagesPerPoll = "1"))
public MessageSource<File> syncChannel(
@Qualifier("localMetaData") ConcurrentMetadataStore metadataStore,
@Qualifier("ftpFileSyncer") FtpUpdatingFileSynchronizer ftpFileSync) throws Exception {
FtpInboundFileSynchronizingMessageSource messageSource = new FtpInboundFileSynchronizingMessageSource(ftpFileSync);
File receiveDir = new File(LOCAL_DIR);
receiveDir.mkdirs();
messageSource.setLocalDirectory(receiveDir);
messageSource.setLocalFilter(new FileSystemPersistentAcceptOnceFileListFilter(metadataStore, "local"));
log.info("Message source bean created.");
return messageSource;
}
@Bean(name = "inputChannel")
public PollableChannel inputChannel() {
QueueChannel channel = new QueueChannel();
log.info("Message channel bean created.");
return channel;
}
}
/**
* Copied from https://github.com/spring-projects/spring-integration-samples/tree/master/basic/ftp/src/test/java/org/springframework/integration/samples/ftp/support
* @author Gunnar Hillert
*
*/
public class TestUserManager extends AbstractUserManager {
private BaseUser testUser;
private BaseUser anonUser;
private static final String TEST_USERNAME = "demo";
private static final String TEST_PASSWORD = "demo";
public TestUserManager(String homeDirectory) {
super("admin", new ClearTextPasswordEncryptor());
testUser = new BaseUser();
testUser.setAuthorities(Arrays.asList(new Authority[] {new ConcurrentLoginPermission(1, 1), new WritePermission()}));
testUser.setEnabled(true);
testUser.setHomeDirectory(homeDirectory);
testUser.setMaxIdleTime(10000);
testUser.setName(TEST_USERNAME);
testUser.setPassword(TEST_PASSWORD);
anonUser = new BaseUser(testUser);
anonUser.setName("anonymous");
}
public User getUserByName(String username) throws FtpException {
if(TEST_USERNAME.equals(username)) {
return testUser;
} else if(anonUser.getName().equals(username)) {
return anonUser;
}
return null;
}
public String[] getAllUserNames() throws FtpException {
return new String[] {TEST_USERNAME, anonUser.getName()};
}
public void delete(String username) throws FtpException {
throw new UnsupportedOperationException("Deleting of FTP Users is not supported.");
}
public void save(User user) throws FtpException {
throw new UnsupportedOperationException("Saving of FTP Users is not supported.");
}
public boolean doesExist(String username) throws FtpException {
return (TEST_USERNAME.equals(username) || anonUser.getName().equals(username)) ? true : false;
}
public User authenticate(Authentication authentication) throws AuthenticationFailedException {
if(UsernamePasswordAuthentication.class.isAssignableFrom(authentication.getClass())) {
UsernamePasswordAuthentication upAuth = (UsernamePasswordAuthentication) authentication;
if(TEST_USERNAME.equals(upAuth.getUsername()) && TEST_PASSWORD.equals(upAuth.getPassword())) {
return testUser;
}
if(anonUser.getName().equals(upAuth.getUsername())) {
return anonUser;
}
} else if(AnonymousAuthentication.class.isAssignableFrom(authentication.getClass())) {
return anonUser;
}
return null;
}
}
Обновление 15/Ноябрь/2016: Примечание по конфигурации xml.
xml-элемент inbound-channel-adapter
напрямую связан с FtpInboundFileSynchronizer
через org.springframework.integration.ftp.config.FtpInboundChannelAdapterParser
через FtpNamespaceHandler
через spring-integration-ftp-4.3.5.RELEASE.jar!/META-INF/spring.handlers
.
Следуя справочному руководству xml-custom, указание пользовательского FtpNamespaceHandler
в локальном файле META-INF/spring.handlers
должно позволить вам использовать FtpUpdatingFileSynchronizer
вместо FtpInboundFileSynchronizer
. Это не сработало для меня с модульными тестами, хотя правильное решение, вероятно, потребует создания дополнительных/измененных xsd файлов, так что обычный inbound-channel-adapter
использует обычный FtpInboundFileSynchronizer
, а специальный inbound-updating-channel-adapter
использует FtpUpdatingFileSynchronizer
. Выполнение этого правильно немного не подходит для этого ответа.
Быстрый взлом может помочь вам начать. Вы можете перезаписать по умолчанию FtpNamespaceHandler
, создав в своем локальном проекте пакет org.springframework.integration.ftp.config
и class FtpNamespaceHandler
. Содержание, показанное ниже:
package org.springframework.integration.ftp.config;
public class FtpNamespaceHandler extends org.springframework.integration.config.xml.AbstractIntegrationNamespaceHandler {
@Override
public void init() {
System.out.println("Initializing FTP updating file synchronizer.");
// one updated line below, rest copied from original FtpNamespaceHandler
registerBeanDefinitionParser("inbound-channel-adapter", new MyFtpInboundChannelAdapterParser());
registerBeanDefinitionParser("inbound-streaming-channel-adapter",
new FtpStreamingInboundChannelAdapterParser());
registerBeanDefinitionParser("outbound-channel-adapter", new FtpOutboundChannelAdapterParser());
registerBeanDefinitionParser("outbound-gateway", new FtpOutboundGatewayParser());
}
}
package org.springframework.integration.ftp.config;
import org.springframework.integration.file.remote.synchronizer.InboundFileSynchronizer;
import org.springframework.integration.ftp.config.FtpInboundChannelAdapterParser;
public class MyFtpInboundChannelAdapterParser extends FtpInboundChannelAdapterParser {
@Override
protected Class<? extends InboundFileSynchronizer> getInboundFileSynchronizerClass() {
System.out.println("Returning updating file synchronizer.");
return FtpUpdatingFileSynchronizer.class;
}
}
Также добавьте preserve-timestamp="true"
в xml файл, чтобы предотвратить новый IllegalArgumentException: for updating timestamps must be preserved
.