# -*- coding: utf-8 -*-
# wasp_general/network/clients/ftp.py
#
# Copyright (C) 2017 the wasp-general authors and contributors
# <see AUTHORS file>
#
# This file is part of wasp-general.
#
# Wasp-general is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Wasp-general is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with wasp-general. If not, see <http://www.gnu.org/licenses/>.
# TODO: document the code
# TODO: write tests for the code
# noinspection PyUnresolvedReferences
from wasp_general.version import __author__, __version__, __credits__, __license__, __copyright__, __email__
# noinspection PyUnresolvedReferences
from wasp_general.version import __status__
import ftplib
from wasp_general.network.clients.proto import WNetworkClientProto
from wasp_general.network.clients.base import WBasicNetworkClientProto
from wasp_general.network.clients.base import WBasicNetworkClientListDirCapability
from wasp_general.network.clients.base import WBasicNetworkClientChangeDirCapability
from wasp_general.network.clients.base import WBasicNetworkClientMakeDirCapability
from wasp_general.network.clients.base import WBasicNetworkClientCurrentDirCapability
from wasp_general.network.clients.base import WBasicNetworkClientUploadFileCapability
from wasp_general.network.clients.base import WBasicNetworkClientRemoveFileCapability
[docs]class WFTPClient(WBasicNetworkClientProto):
def __init__(self, uri):
WBasicNetworkClientProto.__init__(self, uri)
try:
ftp_args = {'host': uri.hostname()}
# TODO: FTP class in python3.6 has port argument. But 3.4 doesn't
'''
if uri.port() is not None:
ftp_args['port'] = uri.port()
'''
ftp_client = ftplib.FTP(**ftp_args)
login_args = {}
if uri.username() is not None:
login_args['user'] = uri.username()
if uri.password():
login_args['passwd'] = uri.password()
ftp_client.login(**login_args)
ftp_client.cwd(uri.path() if uri.path() is not None else '/')
self.__ftp_client = ftp_client
except (ftplib.error_perm, ftplib.error_proto, ftplib.error_reply, ftplib.error_temp):
raise WNetworkClientProto.ConnectionError(
'Unable to connect to %s' % str(uri)
)
except OSError: # no route to host and so on
raise WNetworkClientProto.ConnectionError(
'Unable to connect to %s' % str(uri)
)
[docs] def ftp_client(self):
return self.__ftp_client
def _close(self):
self.__ftp_client.close()
[docs] @classmethod
def scheme(cls):
return 'ftp'
[docs] @classmethod
def agent_capabilities(cls):
return WFTPClientListDirCapability, \
WFTPClientMakeDirCapability, \
WFTPClientChangeDirCapability, \
WFTPClientUploadFileCapability
[docs]class WFTPClientChangeDirCapability(WBasicNetworkClientChangeDirCapability):
[docs] def request(self, path, *args, exec_cmd=None, **kwargs):
try:
self.network_agent().ftp_client().cwd(path)
if exec_cmd is not None:
return self.network_agent().request(exec_cmd, *args, **kwargs)
return True
except (ftplib.error_perm, ftplib.error_proto, ftplib.error_reply, ftplib.error_temp):
if exec_cmd is not None:
return
return False
[docs]class WFTPClientCurrentDirCapability(WBasicNetworkClientCurrentDirCapability):
[docs] def request(self, *args, **kwargs):
return self.network_agent().ftp_client().pwd()
[docs]class WFTPClientListDirCapability(WBasicNetworkClientListDirCapability):
[docs] def request(self, *args, **kwargs):
try:
return tuple(self.network_agent().ftp_client().nlst())
except (ftplib.error_perm, ftplib.error_proto, ftplib.error_reply, ftplib.error_temp):
return
[docs]class WFTPClientMakeDirCapability(WBasicNetworkClientMakeDirCapability):
[docs] def request(self, directory_name, *args, **kwargs):
try:
return len(self.network_agent().ftp_client().mkd(directory_name)) > 0
except (ftplib.error_perm, ftplib.error_proto, ftplib.error_reply, ftplib.error_temp):
return False
[docs]class WFTPClientUploadFileCapability(WBasicNetworkClientUploadFileCapability):
[docs] def request(self, file_name, file_obj, *args, **kwargs):
try:
self.network_agent().ftp_client().storbinary('STOR ' + file_name, file_obj)
return True
except (ftplib.error_perm, ftplib.error_proto, ftplib.error_reply, ftplib.error_temp):
return False
[docs]class WFTPClientRemoveFileCapability(WBasicNetworkClientRemoveFileCapability):
[docs] def request(self, file_name, *args, **kwargs):
try:
self.network_agent().ftp_client().delete(file_name)
return True
except (ftplib.error_perm, ftplib.error_proto, ftplib.error_reply, ftplib.error_temp):
return False