Source code for wasp_general.network.web.headers

# -*- coding: utf-8 -*-
# wasp_general/network/web/headers.py
#
# Copyright (C) 2016 the wasp-general authors and contributors
# <see AUTHORS file>
#
# This file is part of wasp-general.
#
# Wasp-general is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Wasp-general is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with wasp-general.  If not, see <http://www.gnu.org/licenses/>.

# noinspection PyUnresolvedReferences
from wasp_general.version import __author__, __version__, __credits__, __license__, __copyright__, __email__
# noinspection PyUnresolvedReferences
from wasp_general.version import __status__

import re
from io import StringIO
import email.message
from wasp_general.verify import verify_type, verify_value
from wasp_general.network.web.cookies import WHTTPCookieJar
from wasp_general.network.web.re_statements import http_header_name


[docs]class WHTTPHeaders: """ Represent HTTP Headers as they are described in RFC 1945. Call :meth:`.WHTTPHeaders.ro` method to create read-only copy (in this state no changes are allowed) Cookies that are stored with :meth:`.WHTTPHeaders.set_cookie_jar` method have highest priority than cookies stored within constructor and/or :meth:`.WHTTPHeaders.add_header`. So in the conflict situation cookies saved within :meth:`.WHTTPHeaders.set_cookie_jar` must be used. Cookies that are stored in :meth:`.WHTTPHeaders.client_cookie_jar` are client cookies (i.e. read-only cookies). """ header_name_re = re.compile(http_header_name) """ Regexp is for HTTP header name checking """
[docs] @staticmethod def header_name_check(header_name): """ Check header name for validity. Return True if name is valid :param header_name: name to check :return: bool """ header_match = WHTTPHeaders.header_name_re.match(header_name.encode('us-ascii')) return len(header_name) > 0 and header_match is not None
def __init__(self, **kwargs): """ Construct new headers collection :param kwargs: dictionary of header names and corresponding values """ self.__headers = {} self.__ro_flag = False self.__normalization_mode = '1.0' self.__set_cookies = WHTTPCookieJar() for arg_name in kwargs.keys(): self.add_headers(arg_name, kwargs[arg_name])
[docs] def headers(self): """ Get specified header names :return: tuple of str """ return tuple(self.__headers.keys())
[docs] @verify_type('paranoid', header_name=str) @verify_value('paranoid', header_name=lambda x: WHTTPHeaders.header_name_check(x)) def remove_headers(self, header_name): """ Remove header by its name :param header_name: name of header to remove :return: None """ if self.__ro_flag: raise RuntimeError('ro') header_name = self.normalize_name(header_name) if header_name in self.__headers.keys(): self.__headers.pop(header_name)
[docs] @verify_type('paranoid', header_name=str) @verify_value('paranoid', header_name=lambda x: WHTTPHeaders.header_name_check(x)) @verify_type(value=str, values=str) def add_headers(self, header_name, value, *values): """ Add new header :param header_name: name of the header to add :param value: header value :param values: additional header values (in a result request/response must be concatenated by the coma \ or by the separate header string) :return: None """ if self.__ro_flag: raise RuntimeError('ro') header_name = self.normalize_name(header_name) if header_name not in self.__headers.keys(): self.__headers[header_name] = [value] else: self.__headers[header_name].append(value) for single_value in values: self.__headers[header_name].append(single_value)
[docs] @verify_type('paranoid', header_name=str, value=str, values=str) @verify_value('paranoid', header_name=lambda x: WHTTPHeaders.header_name_check(x)) def replace_headers(self, header_name, value, *values): """ Replace header value with specified value and/or values :param header_name: target header :param value: new header value :param values: additional header values (in a result request/response must be concatenated by the coma \ or by the separate header string) :return: None """ if self.__ro_flag: raise RuntimeError('ro') header_name = self.normalize_name(header_name) self.remove_headers(header_name) self.add_headers(header_name, value, *values)
[docs] @verify_type('paranoid', header_name=str) @verify_value('paranoid', header_name=lambda x: WHTTPHeaders.header_name_check(x)) def get_headers(self, header_name): """ Return header value by its name :param header_name: header name :return: tuple of str """ header_name = self.normalize_name(header_name) if header_name in self.__headers.keys(): return tuple(self.__headers[header_name])
@verify_type('paranoid', item=str) @verify_value('paranoid', item=lambda x: WHTTPHeaders.header_name_check(x)) def __getitem__(self, item): """ Return header value by its name :param item: header name :return: tuple of str """ return self.get_headers(item)
[docs] @verify_type(header_name=str) @verify_value(header_name=lambda x: WHTTPHeaders.header_name_check(x)) def normalize_name(self, header_name): """ Return header name as it is recommended (required) by corresponding http protocol. For protocol switching use :meth:`.WHTTPHeaders.switch_name_style` method. All current available protocols (0.9-2) compare header names in a case-insensitive fashion. However, previous protocol versions (0.9-1.1) recommends to use camel-case names like Foo or Foo-Bar. But HTTP/2 (RFC 7540) strictly requires lowercase only header names. :param header_name: name to convert :return: str """ if self.__normalization_mode in ['0.9', '1.0', '1.1']: return '-'.join([x.capitalize() for x in header_name.split('-')]) elif self.__normalization_mode == '2': return header_name.lower() raise RuntimeError('Internal error: unknown http protocol: %s' % self.__normalization_mode)
[docs] @verify_type(http_protocol_version=str) @verify_value(http_protocol_version=lambda x: x in ['0.9', '1.0', '1.1', '2']) def switch_name_style(self, http_protocol_version): """ Return object copy with header names saved as it is described in the given protocol version see :meth:`.WHTTPHeaders.normalize_name` :param http_protocol_version: target HTTP protocol version :return: WHTTPHeaders """ new_headers = WHTTPHeaders() new_headers.__normalization_mode = http_protocol_version names = self.headers() for name in names: new_headers.add_headers(name, *self.get_headers(name)) for cookie_name in self.__set_cookies.cookies(): new_headers.__set_cookies.add_cookie(self.__set_cookies[cookie_name].copy()) return new_headers
[docs] def ro(self): """ Return read-only copy of this object :return: WHTTPHeaders """ ro_headers = WHTTPHeaders() names = self.headers() for name in names: ro_headers.add_headers(name, *self.get_headers(name)) ro_headers.__cookies = self.__set_cookies.ro() ro_headers.__ro_flag = True return ro_headers
[docs] @verify_type('paranoid', value=(str, None)) def content_type(self, value=None): """ Set (replace) and or get "Content-Type" header value :param value: value to set (if specified) :return: None if header doesn't exist, otherwise - str """ content_type = self.normalize_name('Content-Type') if value is not None: self.replace_headers(content_type, value) if content_type in self.__headers.keys(): return self.__headers[content_type][0]
[docs] @classmethod @verify_type(http_code=str) def import_headers(cls, http_code): """ Create WHTTPHeaders by the given code. If code has 'Set-Cookie' headers, that headers are parsed, data are stored in internal cookie jar. At the end of parsing 'Set-Cookie' headers are removed from the result :param http_code: HTTP code to parse :return: WHTTPHeaders """ headers = WHTTPHeaders() message = email.message_from_file(StringIO(http_code)) for header_name, header_value in message.items(): headers.add_headers(header_name, header_value) cookie_header = headers.get_headers('Set-Cookie') if cookie_header is not None: for cookie_string in cookie_header: for single_cookie in WHTTPCookieJar.import_header_text(cookie_string): headers.set_cookie_jar().add_cookie(single_cookie) headers.remove_headers('Set-Cookie') return headers