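My company needed a job that downloads txt files from an FTP server and writes their contents into SQL Server. These are my notes on the part that fetches the files from the FTP server. I'm no expert and don't know the internals of FTP; this only solves the scenario I ran into at work, and I hope it helps anyone who needs it. Experts, please go easy on me.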
Add the dependency to pom.xml
<dependency>
<groupId>commons-net</groupId>
<artifactId>commons-net</artifactId>
<version>3.6</version>
</dependency>
Fill in the configuration in application.yml
ftp:
  hostName: 192.168.240.99
  port: 88
  userName: root
  passWord: root
  defaultLocalPath: D:/home # local directory that downloaded files are saved to
  defaultRemotePath: /home/ # directory on the FTP server
In the project's config package, create an FTP configuration class. The FTP settings are loaded into it when the project starts.
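import lombok.Data;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

@Data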
@Component("ftpConfig")
public class FtpConfig {
/**
* ftp server host
*/
@Value("${ftp.hostName}")
private String hostName;
/**
* ftp server port
*/
@Value("${ftp.port}")
private Integer port;
/**
* ftp login username
*/
@Value("${ftp.userName}")
private String userName;
/**
* ftp login password
*/
@Value("${ftp.passWord}")
private String passWord;
/**
* default local path
*/
@Value("${ftp.defaultLocalPath}")
private String defaultLocalPath;
/**
* ftp server default path
*/
@Value("${ftp.defaultRemotePath}")
private String defaultRemotePath;
}
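Create a factory class that creates and destroys FTPClient instances and checks whether a client is still alive. If you are unsure what one of the FTPClient API calls does, look it up; the available documentation explains them in detail.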
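import lombok.extern.slf4j.Slf4j;
import org.apache.commons.net.ftp.FTPClient;
import org.apache.commons.net.ftp.FTPReply;
import org.springframework.stereotype.Component;

@Slf4j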
@Component
public class FtpClientFactory {
private static final String ENCODING = "utf-8";
private static final int TIMEOUT = 10 * 1000;
private final FtpConfig ftpConfig;
public FtpClientFactory(FtpConfig ftpConfig) {
this.ftpConfig = ftpConfig;
}
    public FTPClient makeClient() {
        FTPClient ftpClient = new FTPClient();
        ftpClient.setConnectTimeout(TIMEOUT);
        // The control encoding must be set before connect() to take effect
        ftpClient.setControlEncoding(ENCODING);
        try {
            // connect ftp server
            ftpClient.connect(ftpConfig.getHostName(), ftpConfig.getPort());
            // login ftp server and check the reply
            boolean loggedIn = ftpClient.login(ftpConfig.getUserName(), ftpConfig.getPassWord());
            if (!loggedIn || !FTPReply.isPositiveCompletion(ftpClient.getReplyCode())) {
                log.error("The login to the FTP server failed.");
                // close the connection that was opened before the failed login
                destroyClient(ftpClient);
                return null;
            }
            log.info("The login to the FTP server is successful.");
            // File Type
            ftpClient.setFileType(FTPClient.BINARY_FILE_TYPE);
            // Passive Mode (the original called enterLocalActiveMode() here, contradicting this comment)
            ftpClient.enterLocalPassiveMode();
        } catch (Exception e) {
            log.error("factory make client exception", e);
            destroyClient(ftpClient);
            // never hand a disconnected client back to the caller
            return null;
        }
        return ftpClient;
    }
public void destroyClient(FTPClient ftpClient) {
try {
if (ftpClient != null && ftpClient.isConnected()) {
ftpClient.logout();
}
} catch (Exception e) {
log.error("ftpClient logout exception", e);
} finally {
try {
if (ftpClient != null) {
ftpClient.disconnect();
}
} catch (Exception e2) {
log.error("ftpClient disconnect exception", e2);
}
}
}
public boolean validateClient(FTPClient ftpClient) {
try {
return ftpClient.sendNoOp();
} catch (Exception e) {
log.error("ftpClient validate exception", e);
}
return false;
}
}
Once an FTPClient can be built, you can wrap the FTP operations in an FtpUtils utility class.
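import lombok.extern.slf4j.Slf4j;
import org.apache.commons.net.ftp.FTP;
import org.apache.commons.net.ftp.FTPClient;
import org.apache.commons.net.ftp.FTPFile;

import javax.servlet.http.HttpServletResponse; // jakarta.servlet.http on Spring Boot 3
import java.io.BufferedInputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.List;

// CommonException and HttpStatusCode are the project's own classes and are not shown in this post
@Slf4j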
public class FtpUtils {
private static final String SLASH_STR = "/";
private static final String NOT_FIND_DIRECTORY = "Can not find the directory in ftp server.";
private static final String NOT_FIND_FILE = "Can not find the file in the path.";
private static final String NOT_FIND_LOCAL_PATH = "Can not find the local path.";
private static final String LOCAL_PATH_NOT_DIRECTORY = "The local path is not a directory.";
/**
* traverse all files under the directory recursively
*
* @param ftpClient ftp client
* @param fileList file list
* @param pathName directory to be traversed, must start and end with a slash(/)
*/
public static List<String> listFile(FTPClient ftpClient, List<String> fileList, String pathName) {
if (pathName.startsWith(SLASH_STR) && pathName.endsWith(SLASH_STR)) {
//change working directory
changeWorkingDirectory(ftpClient, pathName);
FTPFile[] files = new FTPFile[0];
try {
files = ftpClient.listFiles();
} catch (IOException e) {
log.error("Get ftp server file list error.", e);
}
for (FTPFile file : files) {
if (file.isFile()) {
fileList.add(pathName + file.getName());
} else if (file.isDirectory()) {
/*
* This judgment needs to be added.
* FTP will include the directory under the project file directory(./)
* and the directory under the directory one level up from the project
* file directory(../) in the recursion.
* It will cause a cycle, so need to filter it out.
*/
if (!".".equals(file.getName()) && !"..".equals(file.getName())) {
listFile(ftpClient, fileList, pathName + file.getName() + SLASH_STR);
}
}
}
}
return fileList;
}
/**
* traverse the specified type file under the directory recursively
*
* @param ftpClient ftp client
* @param fileList file list
* @param pathName directory to be traversed, must start and end with a slash(/)
* @param ext the extension of the file
*/
public static List<String> listFileByExt(FTPClient ftpClient, List<String> fileList, String pathName, String ext) {
if (pathName.startsWith(SLASH_STR) && pathName.endsWith(SLASH_STR)) {
FTPFile[] files = new FTPFile[0];
try {
//change working directory
changeWorkingDirectory(ftpClient, pathName);
files = ftpClient.listFiles();
} catch (IOException e) {
log.error("Get ftp server file list error.", e);
}
for (FTPFile file : files) {
if (file.isFile()) {
if (file.getName().endsWith(ext)) {
fileList.add(pathName + file.getName());
}
} else if (file.isDirectory()) {
if (!".".equals(file.getName()) && !"..".equals(file.getName())) {
listFileByExt(ftpClient, fileList, pathName + file.getName() + SLASH_STR, ext);
}
}
}
}
return fileList;
}
/**
* upload file
*
* @param ftpClient ftp client
* @param pathname ftp server path
* @param fileName upload file name
* @param inputStream inputStream
* @return operation result
*/
    public static boolean uploadFile(FTPClient ftpClient, String pathname, String fileName, InputStream inputStream) {
        boolean flag = false;
        // try-with-resources closes the stream exactly once, even on failure
        try (InputStream in = inputStream) {
            log.info("start upload file");
            ftpClient.setFileType(FTP.BINARY_FILE_TYPE);
            ftpClient.enterLocalPassiveMode();
            // create the target directory if needed; the extra makeDirectory(pathname) call was redundant
            createDirectory(ftpClient, pathname);
            // storeFile reports failure through its return value, not an exception
            flag = ftpClient.storeFile(fileName, in);
            if (flag) {
                log.info("upload file successful");
            } else {
                log.error("failed to upload file, server reply: {}", ftpClient.getReplyString());
            }
        } catch (Exception e) {
            log.error("failed to upload file", e);
        }
        return flag;
    }
/**
* change working directory
*
* @param ftpClient ftp client
* @param directory ftp directory
* @return operation result
*/
    public static boolean changeWorkingDirectory(FTPClient ftpClient, String directory) {
        try {
            if (ftpClient.changeWorkingDirectory(directory)) {
                log.info("enter " + directory + " successfully!");
                return true;
            }
            log.error("enter " + directory + " failed!");
        } catch (IOException ioe) {
            // an I/O error used to be swallowed and reported as success
            log.error("enter " + directory + " failed!", ioe);
        }
        throw new CommonException(HttpStatusCode.BAD_REQUEST, NOT_FIND_DIRECTORY);
    }
/**
* Create multilayer directory. If the directory already exists on the FTP server, do not create it. If no, create it.
*
* @param ftpClient ftp client
* @param remote remote path
* @throws IOException io exception
*/
    public static void createDirectory(FTPClient ftpClient, String remote) throws IOException {
        // Fast path: the directory already exists. The client is called directly because the
        // changeWorkingDirectory helper in this class throws when the directory is missing.
        if (remote.isEmpty() || ftpClient.changeWorkingDirectory(remote)) {
            return;
        }
        // Absolute paths are built from the root, relative ones from the current directory
        if (remote.startsWith(SLASH_STR)) {
            ftpClient.changeWorkingDirectory(SLASH_STR);
        }
        // No charset conversion of the names: the control encoding is already set on the client
        for (String subDirectory : remote.split(SLASH_STR)) {
            if (subDirectory.isEmpty()) {
                continue;
            }
            // Enter the segment, creating it first when it does not exist yet
            if (!ftpClient.changeWorkingDirectory(subDirectory)) {
                if (!makeDirectory(ftpClient, subDirectory) || !ftpClient.changeWorkingDirectory(subDirectory)) {
                    log.error("create directory[" + subDirectory + "]failed");
                    throw new IOException("Can not create directory " + subDirectory);
                }
            }
        }
    }
/**
* Check whether files on the FTP server exist
*
* @param ftpClient ftp client
* @param path path name
* @return operation result
* @throws IOException io exception
*/
public static boolean existFile(FTPClient ftpClient, String path) throws IOException {
boolean flag = false;
FTPFile[] ftpFileArr = ftpClient.listFiles(path);
if (ftpFileArr.length > 0) {
flag = true;
}
return flag;
}
/**
* Check whether local path exist
*
* @param path local path name
*/
public static void existLocalPath(String path) {
File directory = new File(path);
if (!directory.exists()) {
throw new CommonException(HttpStatusCode.BAD_REQUEST, NOT_FIND_LOCAL_PATH);
}
if (!directory.isDirectory()) {
throw new CommonException(HttpStatusCode.BAD_REQUEST, LOCAL_PATH_NOT_DIRECTORY);
}
}
/**
* create directory
*
* @param ftpClient ftp client
* @param dir dir name
* @return operation result
*/
    public static boolean makeDirectory(FTPClient ftpClient, String dir) {
        // default to false so that an exception is not reported as success
        boolean flag = false;
        try {
            flag = ftpClient.makeDirectory(dir);
            if (flag) {
                log.info("create directory " + dir + " successful!");
            } else {
                log.error("create directory " + dir + " failed!");
            }
        } catch (Exception e) {
            log.error("create directory " + dir + " failed!", e);
        }
        return flag;
    }
/**
* download all files in the given path, not including subdirectories
*
* @param ftpClient ftp client
* @param pathName FTP server path name
* @param localPath local file path
* @return operation result
*/
    public static boolean downloadFileByPath(FTPClient ftpClient, String pathName, String localPath) {
        boolean flag = true;
        //switch ftp directory
        changeWorkingDirectory(ftpClient, pathName);
        //check local path
        existLocalPath(localPath);
        try {
            log.info("start to download file");
            FTPFile[] ftpFiles = ftpClient.listFiles();
            for (FTPFile file : ftpFiles) {
                if (!file.isFile()) {
                    continue;
                }
                File localFile = new File(localPath, file.getName());
                // try-with-resources closes each stream even when retrieveFile throws
                try (OutputStream os = new FileOutputStream(localFile)) {
                    // retrieveFile reports failure through its return value
                    if (!ftpClient.retrieveFile(file.getName(), os)) {
                        log.error("download {} failed, server reply: {}", file.getName(), ftpClient.getReplyString());
                        flag = false;
                    }
                }
            }
        } catch (Exception e) {
            log.error("download file failed", e);
            flag = false;
        }
        if (flag) {
            log.info("download file successful");
        }
        return flag;
    }
/**
* download file
*
* @param ftpClient ftp client
* @param pathName FTP server path name
* @param fileName file name
* @param localPath local file path
* @return operation result
*/
    public static boolean downloadFile(FTPClient ftpClient, String pathName, String fileName, String localPath) {
        boolean flag = false;
        //switch ftp directory
        changeWorkingDirectory(ftpClient, pathName);
        //check local path
        existLocalPath(localPath);
        try {
            log.info("start to download file");
            FTPFile[] ftpFiles = ftpClient.listFiles();
            for (FTPFile file : ftpFiles) {
                if (file.isFile() && fileName.equalsIgnoreCase(file.getName())) {
                    File localFile = new File(localPath, file.getName());
                    // try-with-resources closes the stream even when retrieveFile throws
                    try (OutputStream os = new FileOutputStream(localFile)) {
                        // retrieveFile reports failure through its return value
                        flag = ftpClient.retrieveFile(file.getName(), os);
                    }
                    break;
                }
            }
        } catch (Exception e) {
            log.error("download file failed", e);
        }
        if (!flag) {
            throw new CommonException(HttpStatusCode.BAD_REQUEST, NOT_FIND_FILE);
        }
        return true;
    }
/**
* delete file
*
* @param ftpClient ftp client
* @param pathname FTP server path name
* @param filename file name
* @return operation result
*/
    public static boolean deleteFile(FTPClient ftpClient, String pathname, String filename) {
        boolean flag = false;
        try {
            log.info("start to delete file");
            // switch ftp directory, then delete; both calls report failure through their return value
            flag = ftpClient.changeWorkingDirectory(pathname) && ftpClient.deleteFile(filename);
            if (flag) {
                log.info("delete file successful");
            } else {
                log.error("delete file failed, server reply: {}", ftpClient.getReplyString());
            }
        } catch (Exception e) {
            log.error("delete file failed", e);
        }
        return flag;
    }
/**
* write inputStream to HttpServletResponse
*
* @param input inputStream
* @param response HttpServletResponse
* @return operation result
*/
public static boolean inputToResponse(InputStream input, HttpServletResponse response) {
boolean flag = false;
byte[] buffer = new byte[2048];
BufferedInputStream bis = null;
try {
bis = new BufferedInputStream(input);
OutputStream os = response.getOutputStream();
int i = bis.read(buffer);
while (i != -1) {
os.write(buffer, 0, i);
i = bis.read(buffer);
}
flag = true;
} catch (Exception e) {
log.error("write stream to response failed", e);
} finally {
try {
if (bis != null) {
bis.close();
}
} catch (IOException e) {
log.error("close input stream failed", e);
}
}
return flag;
}
}
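createDirectory walks the remote path one segment at a time, which is easier to follow when the walk is seen on its own. The following is a minimal sketch that only computes which directories such a walk would visit, so it runs without an FTP server. `RemotePathPrefixes` and `prefixes` are hypothetical names introduced for illustration; they are not part of FtpUtils or Commons Net.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of FtpUtils: lists the directories a
// segment-by-segment walk over a remote path would visit.
class RemotePathPrefixes {

    /** Returns every cumulative directory prefix of the path, outermost first. */
    static List<String> prefixes(String remote) {
        List<String> result = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        for (String segment : remote.split("/")) {
            if (segment.isEmpty()) {
                continue; // skip the empty pieces produced by leading or doubled slashes
            }
            current.append('/').append(segment);
            result.add(current.toString());
        }
        return result;
    }

    public static void main(String[] args) {
        // prints [/home, /home/data, /home/data/2024]
        System.out.println(prefixes("/home/data/2024/"));
    }
}
```

Each entry is a directory that has to exist before the next one can be entered, which is why the directories are created from the outside in.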
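At this point the Java code for working with the FTP server is ready to use. If you need to access the FTP server frequently, consider an FTPClient pool, which saves some of the cost of opening and closing clients. You can build a client pool like this: FtpClientFactory creates the FTPClient instances, and they are placed in a blocking queue (BlockingQueue<FTPClient> pool).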
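import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.net.ftp.FTPClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PreDestroy; // jakarta.annotation on Spring Boot 3
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

@Slf4j
@Data
// Registered as a bean so it can be injected into FtpClientKeepAlive and so @PreDestroy is honored
@Component
public class FtpClientPool {
    private static final int DEFAULT_POOL_SIZE = 16;
    private static final int TIMEOUT = 10;
    private BlockingQueue<FTPClient> pool;
    private FtpClientFactory factory;

    // With two constructors, Spring has to be told which one to use
    @Autowired
    public FtpClientPool(FtpClientFactory factory) {
        this(factory, DEFAULT_POOL_SIZE);
    }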
public FtpClientPool(FtpClientFactory factory, int size) {
this.factory = factory;
this.pool = new ArrayBlockingQueue<>(size);
initPool(size);
}
    /**
     * Initialize the pool
     *
     * @param maxPoolSize max pool size
     */
    private void initPool(int maxPoolSize) {
        try {
            int count = 0;
            while (count < maxPoolSize) {
                FTPClient client = factory.makeClient();
                // makeClient can return null on failure, and a BlockingQueue rejects null elements
                if (client != null) {
                    pool.offer(client, TIMEOUT, TimeUnit.SECONDS);
                }
                count++;
            }
        } catch (Exception e) {
            log.error("init ftp connection pool failed", e);
        }
    }
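    public FTPClient borrowClient() {
        FTPClient client = null;
        try {
            client = pool.take();
        } catch (InterruptedException e) {
            // restore the interrupt flag so callers can still observe it
            Thread.currentThread().interrupt();
            log.error("Get connection from ftp connection pool error.", e);
        }
        if (client == null) {
            client = factory.makeClient();
        } else if (!factory.validateClient(client)) {
            // the stale client was already taken out of the queue, so it only needs destroying
            factory.destroyClient(client);
            client = factory.makeClient();
        }
        // A borrowed client must not sit in the queue at the same time;
        // the caller hands it back through returnClient when done.
        return client;
    }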
/**
* Return the client, or destroy it if it cannot be returned within 10 seconds
*
* @param ftpClient ftp client
*/
public void returnClient(FTPClient ftpClient) {
try {
if (ftpClient != null && !pool.offer(ftpClient, TIMEOUT, TimeUnit.SECONDS)) {
factory.destroyClient(ftpClient);
}
} catch (Exception e) {
log.error("Failed to return client.", e);
}
}
/**
* Invalidate the client
*
* @param ftpClient ftp client
*/
public void invalidateClient(FTPClient ftpClient) {
pool.remove(ftpClient);
factory.destroyClient(ftpClient);
}
    @PreDestroy
    public void close() {
        FTPClient client;
        // poll() returns null once the queue is empty, so this loop never blocks
        while ((client = pool.poll()) != null) {
            factory.destroyClient(client);
        }
    }
}
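A borrowed client has to go back to the pool exactly once, even when the work in between throws. The following is a minimal sketch of that borrow, use, return cycle. It uses a generic stand-in type instead of FTPClient so that it runs without an FTP server; `BorrowReturnSketch`, `execute`, and `idleCount` are hypothetical names and not part of the classes in this post.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;

// Hypothetical stand-in for FtpClientPool; T plays the role of FTPClient.
class BorrowReturnSketch<T> {
    private final BlockingQueue<T> idle;
    private final Supplier<T> factory;
    private final Predicate<T> validator;

    BorrowReturnSketch(int size, Supplier<T> factory, Predicate<T> validator) {
        this.idle = new ArrayBlockingQueue<>(size);
        this.factory = factory;
        this.validator = validator;
        for (int i = 0; i < size; i++) {
            idle.offer(factory.get());
        }
    }

    /** Borrows a resource, runs the task with it, and always hands it back. */
    <R> R execute(Function<T, R> task) {
        T resource;
        try {
            // Wait a bounded time instead of blocking forever on an empty pool.
            resource = idle.poll(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for a resource", e);
        }
        if (resource == null) {
            throw new IllegalStateException("no idle resource within 10 seconds");
        }
        if (!validator.test(resource)) {
            // A real pool would also destroy the stale resource here.
            resource = factory.get();
        }
        try {
            return task.apply(resource);
        } finally {
            // The slot freed by poll() is still free, so offer() succeeds.
            idle.offer(resource);
        }
    }

    int idleCount() {
        return idle.size();
    }

    public static void main(String[] args) {
        BorrowReturnSketch<String> pool = new BorrowReturnSketch<>(2, () -> "client", c -> true);
        int length = pool.execute(String::length);
        System.out.println(length + " " + pool.idleCount()); // prints "6 2"
    }
}
```

Putting the return in a finally block means callers cannot forget it, and a bounded poll avoids hanging forever if the pool has been drained.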
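If you use the connection pool, be aware that when an FTPClient sits idle for a long time, the FTP server assumes the client is no longer in use and the client becomes invalid. So you need to check periodically whether the clients in the pool are still valid and destroy the ones that are not. One option is a scheduled task that runs the check. For convenience I simply used an endless loop here; in production, go with a scheduled task or some better approach.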
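import lombok.extern.slf4j.Slf4j;
import org.apache.commons.net.ftp.FTPClient;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct; // jakarta.annotation on Spring Boot 3
import java.util.Iterator;
import java.util.concurrent.BlockingQueue;

@Slf4j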
@Component
public class FtpClientKeepAlive {
private static final int CYCLE_TIME = 30 * 1000;
private KeepAliveThread keepAliveThread;
private final FtpClientPool ftpClientPool;
public FtpClientKeepAlive(FtpClientPool ftpClientPool) {
this.ftpClientPool = ftpClientPool;
}
    @PostConstruct
    public void init() {
        // Start the heartbeat detection thread
        if (keepAliveThread == null) {
            keepAliveThread = new KeepAliveThread();
            Thread thread = new Thread(keepAliveThread, "ftp-keep-alive");
            // daemon thread, so the endless loop does not block JVM shutdown
            thread.setDaemon(true);
            thread.start();
        }
    }

    class KeepAliveThread implements Runnable {
        @Override
        public void run() {
            while (!Thread.currentThread().isInterrupted()) {
                BlockingQueue<FTPClient> pool = ftpClientPool.getPool();
                if (pool != null) {
                    for (FTPClient ftpClient : pool) {
                        checkClient(ftpClient);
                    }
                }
                try {
                    Thread.sleep(CYCLE_TIME);
                } catch (InterruptedException e) {
                    log.error("ftp sleep exception", e);
                    // restore the flag so the loop condition ends the thread
                    Thread.currentThread().interrupt();
                }
            }
        }

        // Probe one client; a failed or throwing NOOP removes only that client
        private void checkClient(FTPClient ftpClient) {
            try {
                boolean result = ftpClient.sendNoOp();
                log.debug("heart beat result: {}", result);
                if (!result) {
                    ftpClientPool.invalidateClient(ftpClient);
                }
            } catch (Exception e) {
                log.error("ftp heart beat exception", e);
                ftpClientPool.invalidateClient(ftpClient);
            }
        }
    }
}
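For the scheduled-task option mentioned above, one possibility is the JDK's ScheduledExecutorService. The following is a minimal sketch under assumptions: `ScheduledHealthCheck`, `sweep`, and the `isAlive` probe are hypothetical names, and a generic type stands in for FTPClient so that the example runs without an FTP server. With a real pool the probe would wrap something like sendNoOp(), and evicted clients would also need to be destroyed.

```java
import java.util.Collection;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;

// Hypothetical replacement for the endless loop; T plays the role of FTPClient.
class ScheduledHealthCheck<T> implements AutoCloseable {
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor(runnable -> {
                Thread thread = new Thread(runnable, "pool-health-check");
                thread.setDaemon(true); // do not keep the JVM alive for the check
                return thread;
            });
    private final Collection<T> idle;
    private final Predicate<T> isAlive;

    ScheduledHealthCheck(Collection<T> idle, Predicate<T> isAlive) {
        this.idle = idle;
        this.isAlive = isAlive;
    }

    /** Runs sweep() repeatedly, waiting the given delay between runs. */
    void start(long delay, TimeUnit unit) {
        scheduler.scheduleWithFixedDelay(this::sweep, delay, delay, unit);
    }

    /** One pass: drop every idle entry that fails the liveness probe. */
    void sweep() {
        try {
            idle.removeIf(isAlive.negate());
        } catch (RuntimeException e) {
            // An exception escaping a scheduled task cancels all later runs,
            // so it is reported here instead of being rethrown.
            System.err.println("health check failed: " + e);
        }
    }

    @Override
    public void close() {
        scheduler.shutdownNow();
    }

    public static void main(String[] args) {
        BlockingQueue<String> idle = new ArrayBlockingQueue<>(4);
        idle.add("alive-1");
        idle.add("dead-1");
        idle.add("alive-2");
        try (ScheduledHealthCheck<String> check =
                     new ScheduledHealthCheck<>(idle, name -> name.startsWith("alive"))) {
            check.sweep(); // call once directly so the demo does not depend on timing
            System.out.println(idle); // prints [alive-1, alive-2]
        }
    }
}
```

Compared with a hand-written loop, the executor handles the timing and can be shut down cleanly through close().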