Source code for wasp_general.cli.cli

# -*- coding: utf-8 -*-
# wasp_general/cli/cli.py
#
# Copyright (C) 2016 the wasp-general authors and contributors
# <see AUTHORS file>
#
# This file is part of wasp-general.
#
# Wasp-general is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Wasp-general is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with wasp-general.  If not, see <http://www.gnu.org/licenses/>.

# TODO: document the code
# TODO: write tests for the code

# noinspection PyUnresolvedReferences
from wasp_general.version import __author__, __version__, __credits__, __license__, __copyright__, __email__
# noinspection PyUnresolvedReferences
from wasp_general.version import __status__

import traceback

from abc import ABCMeta, abstractmethod
from copy import deepcopy

from wasp_general.verify import verify_type, verify_value
from wasp_general.command.command import WCommandSet, WCommandResultProto


[docs]class WConsoleHistory: """ Simple console history implementation """ def __init__(self): self.__history = [] self.__history_position = None
[docs] def size(self): """ Returns history entries count :return: int """ return len(self.__history)
[docs] @verify_type(pos=(int, None)) @verify_value(pos=lambda x: x is None or x >= 0) def position(self, pos=None): """ Get current and/or set history cursor position :param pos: if value is not None, then current position is set to pos and new value is returned :return: int or None (if position have never been changed) """ if pos is not None: if pos >= len(self.__history): raise IndexError('History position is out of bound') self.__history_position = pos return self.__history_position
[docs] @verify_type(value=str) def add(self, value): """ Add new record to history. Record will be added to the end :param value: new record :return: int record position in history """ index = len(self.__history) self.__history.append(value) return index
[docs] @verify_type(position=int) def entry(self, position): """ Get record from history by record position :param position: record position :return: str """ return self.__history[position]
[docs] @verify_type(value=str, position=(int, None)) @verify_value(position=lambda x: x is None or x >= 0) def update(self, value, position): """ Change record in this history :param value: new record to save :param position: record position to change :return: None """ self.__history[position] = value
[docs]class WConsoleProto(metaclass=ABCMeta): """ Basic class for console implementation. It has non-changeable and changeable history (:class:`.WConsoleHistory`). One stores previous entered rows, other one helps to entered new row by editing previous one. """ def __init__(self): self.__history_mode = False self.__history = WConsoleHistory() self.__editable_history = None self.__current_row = None self.__prompt_show = None
[docs] def history(self): """ Return changeable history :return: WConsoleHistory or None """ return self.__editable_history
[docs] @verify_type(mode_value=(bool, None)) def history_mode(self, mode_value=None): """ Get and/or set current history mode. History mode defines what row will be changed with :meth:`.WConsoleProto.update_row` or can be got by :meth:`.WConsoleProto.row` call. If history mode disabled, then :meth:`.WConsoleProto.update_row` and :meth:`.WConsoleProto.row` affects current row prompt. If history mode is enabled, then :meth:`.WConsoleProto.update_row` and :meth:`.WConsoleProto.row` affects current entry in history :class:`.WConsoleHistory` (entry at :meth:`.WConsoleHistory.position`) History mode is turned off by default. :param mode_value: True value enables history mode. False - disables. None - do nothing :return: bool """ if mode_value is not None: self.__history_mode = mode_value return self.__history_mode
[docs] def start_session(self): """ Start new session and prepare environment for new row editing process :return: None """ self.__current_row = '' self.__history_mode = False self.__editable_history = deepcopy(self.__history) self.__prompt_show = True self.refresh_window()
[docs] def fin_session(self): """ Finalize current session :return: None """ self.__prompt_show = False self.__history.add(self.row()) self.exec(self.row())
[docs] @verify_type(value=str) def update_row(self, value): """ Change row :param value: new row :return: None """ if not self.__history_mode: self.__current_row = value else: self.history().update(value, self.history().position())
[docs] def row(self): """ Get row :return: str """ if not self.__history_mode: return self.__current_row else: return self.history().entry(self.history().position())
[docs] def prompt_show(self): """ Return flag, that shows, whether to display prompt and current command at the window end, or not :return: bool """ return self.__prompt_show
[docs] @abstractmethod def prompt(self): """ Return prompt, that would be printed before row. Prompt length must be the same within every session :return: str """ raise NotImplementedError('This method is abstract')
[docs] @abstractmethod def refresh_window(self): """ Refresh current screen. Simple clear and redraw should work :return: None """ raise NotImplementedError('This method is abstract')
[docs] @abstractmethod @verify_type(row=str) def exec(self, row): """ Must execute given command :param row: command to execute :return: None """ raise NotImplementedError('This method is abstract')
[docs]class WConsoleWindowProto(metaclass=ABCMeta): """ Basic class for console window implementation """ @verify_type(console=WConsoleProto) def __init__(self, console): """ Create new console window :param console: console, that this window is linked to """ self.__console = console self.__previous_data = '' self.__cursor_position = 0 if self.width() < 2: raise RuntimeError('Invalid width. Minimum windows width is 2') if self.height() < 2: raise RuntimeError('Invalid height. Minimum windows height is 2')
[docs] @abstractmethod def width(self): """ Get window width. If windows width was changed - window must be refreshed via :meth:`.WConsoleWindowProto.refresh` :return: int """ raise NotImplementedError('This method is abstract')
[docs] @abstractmethod def height(self): """ Get window height. If windows height was changed - window must be refreshed via :meth:`.WConsoleWindowProto.refresh` :return: int """ raise NotImplementedError('This method is abstract')
[docs] @abstractmethod def clear(self): """ Clear window and remove every symbol it has :return: None """ raise NotImplementedError('This method is abstract')
[docs] @abstractmethod @verify_type(line_index=int, line=str) @verify_value(line_index=lambda x: x >= 0) def write_line(self, line_index, line): """ Write string on specified line :param line_index: line index to display :param line: string to display (must fit windows width) :return: """ raise NotImplementedError('This method is abstract')
[docs] @abstractmethod @verify_type(y=int, x=int) @verify_value(x=lambda x: x >= 0, y=lambda x: x >= 0) def set_cursor(self, y, x): """ Set input cursor in window to specified coordinates. 0, 0 - is top left coordinates :param y: vertical coordinates, 0 - top, bottom - positive value :param x: horizontal coordinates, 0 - left, right - positive value :return: """ raise NotImplementedError('This method is abstract')
[docs] @verify_type(prompt_show=bool) def refresh(self, prompt_show=True): """ Refresh current window. Clear current window and redraw it with one of drawers :param prompt_show: flag, that specifies, whether to show prompt and current row at the windows end, or not :return: None """ raise NotImplementedError('This method is abstract')
[docs] @verify_type(previous_data=bool, prompt=bool, console_row=bool, console_row_to_cursor=bool) @verify_type(console_row_from_cursor=bool) def data( self, previous_data=False, prompt=False, console_row=False, console_row_to_cursor=False, console_row_from_cursor=False ): """ Return output data. Flags specifies what data to append. If no flags was specified nul-length string returned :param previous_data: If True, then previous output appends :param prompt: If True, then console prompt appends. If console_row or console_row_to_cursor is True, \ then this value is omitted :param console_row: If True, then console prompt and current input appends. :param console_row_to_cursor: If True, then console prompt and current input till cursor appends. \ If console_row is True, then this value is omitted :param console_row_from_cursor: If True, then current input from cursor appends. \ If console_row is True, then this value is omitted :return: str """ result = '' if previous_data: result += self.__previous_data if prompt or console_row or console_row_to_cursor: result += self.console().prompt() if console_row or (console_row_from_cursor and console_row_to_cursor): result += self.console().row() elif console_row_to_cursor: result += self.console().row()[:self.cursor()] elif console_row_from_cursor: result += self.console().row()[self.cursor():] return result
[docs] @verify_type('paranoid', previous_data=bool, prompt=bool, console_row=bool, console_row_to_cursor=bool) @verify_type('paranoid', console_row_from_cursor=bool) def list_data( self, previous_data=False, prompt=False, console_row=False, console_row_to_cursor=False, console_row_from_cursor=False ): """ Return list of strings. Where each string is fitted to windows width. Parameters are the same as they are in :meth:`.WConsoleWindow.data` method :return: list of str """ return self.split(self.data( previous_data, prompt, console_row, console_row_to_cursor, console_row_from_cursor ))
[docs] def console(self): """ Return linked console :return: WConsoleProto """ return self.__console
[docs] @verify_type('paranoid', start_position=int) @verify_value('paranoid', start_position=lambda x: x >= 0) @verify_type(data=list) def write_data(self, data, start_position=0): """ Write data from the specified line :param data: string to write, each one on new line :param start_position: starting line :return: """ if len(data) > self.height(): raise ValueError('Data too long (too many strings)') for i in range(len(data)): self.write_line(start_position + i, data[i])
[docs] @verify_type(pos=(None, int)) @verify_value(pos=lambda x: x is None or x >= 0) def cursor(self, pos=None): """ Set and/or get relative cursor position. Defines cursor position in current input row. :param pos: if value is not None, then current cursor position is set to this value and the same value \ is returned :return: int """ if pos is not None: self.__cursor_position = pos return self.__cursor_position
[docs] def commit(self): """ Store current input row. Keep current input row as previous output :return: None """ self.__previous_data += (self.data(console_row=True) + '\n')
[docs] @verify_type(data=str) def split(self, data): """ Split data into list of string, each (self.width() - 1) length or less. If nul-length string specified then empty list is returned :param data: data to split :return: list of str """ line = deepcopy(data) line_width = (self.width() - 1) lines = [] while len(line): new_line = line[:line_width] new_line_pos = new_line.find('\n') if new_line_pos >= 0: new_line = line[:new_line_pos] line = line[(new_line_pos + 1):] else: line = line[line_width:] lines.append(new_line) return lines
[docs] @verify_type(feedback=str, cr=bool) def write_feedback(self, feedback, cr=True): """ Store feedback. Keep specified feedback as previous output :param feedback: data to store :param cr: whether to write carriage return to the end or not :return: None """ self.__previous_data += feedback if cr is True: self.__previous_data += '\n'
[docs] @verify_type(length=int) @verify_value(length=lambda x: x >= 0) def truncate_feedback(self, length): """ Remove data from feedback (removes text from previous output) :param length: string length to remove (including required cr-characters) :return: None """ self.__previous_data = self.__previous_data[:-length]
[docs]class WConsoleDrawerProto(metaclass=ABCMeta): """ Basic class that helps displaying console content """
[docs] @abstractmethod @verify_type(window=WConsoleWindowProto, prompt_show=bool) def suitable(self, window, prompt_show=True): """ Check if this class can display console content :param window: window that should be drawn :param prompt_show: same as 'prompt_show' in :meth:`.WConsoleWindowProto.refresh` method :return: bool (True if this class can draw console content, False - if it can not) """ raise NotImplementedError('This method is abstract')
[docs] @abstractmethod @verify_type(window=WConsoleWindowProto, prompt_show=bool) def draw(self, window, prompt_show=True): """ Display console content on console window :param window: windows to draw :param prompt_show: same as 'prompt_show' in :meth:`.WConsoleWindowProto.refresh` method :return: None """ raise NotImplementedError('This method is abstract')
[docs]class WConsoleWindowBase(WConsoleWindowProto, metaclass=ABCMeta): """ Basic class for console window implementation """ @verify_type(console=WConsoleProto, drawers=WConsoleDrawerProto) def __init__(self, console, *drawers): """ :param drawers: drawers to use """ WConsoleWindowProto.__init__(self, console) self.__drawers = [] self.__drawers.extend(drawers)
[docs] @verify_type('paranoid', prompt_show=bool) def refresh(self, prompt_show=True): """ Refresh current window. Clear current window and redraw it with one of drawers :param prompt_show: flag, that specifies, whether to show prompt and current row at the windows end, or not :return: None """ self.clear() for drawer in self.__drawers: if drawer.suitable(self, prompt_show=prompt_show): drawer.draw(self, prompt_show=prompt_show) return raise RuntimeError('No suitable drawer was found')
[docs]class WConsoleBase(WConsoleProto): @verify_type(command_set=(WCommandSet, None)) def __init__(self, command_set=None): WConsoleProto.__init__(self) self.__command_set = command_set if command_set is not None else WCommandSet()
[docs] def command_set(self): return self.__command_set
[docs] def window(self): raise NotImplementedError('This method is abstract')
[docs] def start_session(self): """ :meth:`.WConsoleProto.start_session` implementation. Sets cursor to 0 position before session :return: None """ self.window().cursor(0) WConsoleProto.start_session(self)
[docs] def fin_session(self): """ :meth:`.WConsoleProto.fin_session` implementation. Commits current input row :return: None """ self.window().commit() WConsoleProto.fin_session(self)
[docs] @verify_type('paranoid', result=str, cr=bool) def write(self, result, cr=True): """ Shortcut for self.window().write_feedback(result) call :param result: same as feedback in :meth:`WConsoleWindowProto.write_feedback` :param cr: same as cr in :meth:`WConsoleWindowProto.write_feedback` :return: None """ self.window().write_feedback(result, cr=cr)
[docs] @verify_type('paranoid', length=int) @verify_value('paranoid', length=lambda x: x >= 0) def truncate(self, length): """ Shortcut for self.window().truncate_feedback(result) call :param length: same as length in :meth:`WConsoleWindowProto.truncate_feedback` :return: None """ self.window().truncate_feedback(length)
[docs] def refresh_window(self): """ Shortcut for self.window().refresh() call :return: None """ self.window().refresh(prompt_show=self.prompt_show())
[docs] def prompt(self): """ :meth:`.WConsoleProto.prompt` implementation """ return '> '
[docs] @verify_type(result=WCommandResultProto) def handle_result(self, result): result = str(result) if len(result) > 0: self.write(result)
[docs] @verify_type(e=Exception) def handle_exception(self, e): if isinstance(e, WCommandSet.NoCommandFound): self.write('Error: no suitable command found') else: self.write('Internal error. Traceback and exception information:') self.write(traceback.format_exc())
[docs] @verify_type('paranoid', row=str) def exec(self, row): command_set = self.command_set() try: self.handle_result(command_set.exec(row)) except Exception as e: self.handle_exception(e)