# Source code for treq.client

from __future__ import absolute_import, division, print_function

import mimetypes
import uuid

from io import BytesIO

from twisted.internet.interfaces import IProtocol
from twisted.internet.defer import Deferred
from twisted.python.components import proxyForInterface
from twisted.python.compat import _PY3, unicode
from twisted.python.filepath import FilePath
from hyperlink import parse as _parse_url

from twisted.web.http_headers import Headers
from twisted.web.iweb import IBodyProducer, IResponse

from twisted.web.client import (
    FileBodyProducer,
    RedirectAgent,
    BrowserLikeRedirectAgent,
    ContentDecoderAgent,
    GzipDecoder,
    CookieAgent
)

from twisted.python.components import registerAdapter
from json import dumps as json_dumps

from treq._utils import default_reactor
from treq.auth import add_auth
from treq import multipart
from treq.response import _Response
from requests.cookies import merge_cookies

if _PY3:
    from urllib.parse import urlencode as _urlencode

    def urlencode(query, doseq):
        return _urlencode(query, doseq).encode('ascii')
    from http.cookiejar import CookieJar, Cookie
else:
    from cookielib import CookieJar, Cookie
    from urllib import urlencode

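# Note (illustrative): the Python 3 wrapper above encodes the result so that
# both branches of this shim return bytes, e.g.:
#
#     urlencode({'a': 'b'}, doseq=True)  # == b'a=b'
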
try:
    # The old location was quixotically deprecated and might actually be
    # removed in 3.10, maybe.
    #
    # See https://github.com/html5lib/html5lib-python/issues/419 for more of
    # this tale of woe.
    from collections.abc import Mapping
except ImportError:
    from collections import Mapping


def _scoped_cookiejar_from_dict(url_object, cookie_dict):
    """
    Create a CookieJar from a dictionary whose cookies are all scoped to the
    given URL's origin.

    @note: This does not scope the cookies to any particular path, only the
        host, port, and scheme of the given URL.
    """
    cookie_jar = CookieJar()
    if cookie_dict is None:
        return cookie_jar
    for k, v in cookie_dict.items():
        secure = url_object.scheme == 'https'
        port_specified = not (
            (url_object.scheme == "https" and url_object.port == 443)
            or (url_object.scheme == "http" and url_object.port == 80)
        )
        port = str(url_object.port)
        domain = url_object.host
        netscape_domain = domain if '.' in domain else domain + '.local'

        cookie_jar.set_cookie(
            Cookie(
                # Scoping
                domain=netscape_domain,
                port=port,
                secure=secure,
                port_specified=port_specified,

                # Contents
                name=k,
                value=v,

                # Constant/always-the-same stuff
                version=0,
                path="/",
                expires=None,
                discard=False,
                comment=None,
                comment_url=None,
                rfc2109=False,
                path_specified=False,
                domain_specified=False,
                domain_initial_dot=False,
                rest={},  # nonstandard attributes; the stdlib expects a mapping
            )
        )
    return cookie_jar
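
# Usage sketch (illustrative, not part of treq's API): scoping a plain
# cookie dict to an origin parsed with hyperlink.
#
#     from hyperlink import parse
#     jar = _scoped_cookiejar_from_dict(
#         parse(u"https://example.com"), {u"session": u"abc123"})
#     [(c.name, c.domain) for c in jar]
#     # [(u'session', u'example.com')]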


class _BodyBufferingProtocol(proxyForInterface(IProtocol)):
    """
    A protocol proxy that appends each chunk of the response body to
    C{buffer} as it is delivered to the wrapped protocol, and fires
    C{finished} with the termination reason when the connection is lost.
    """
    def __init__(self, original, buffer, finished):
        self.original = original
        self.buffer = buffer
        self.finished = finished

    def dataReceived(self, data):
        self.buffer.append(data)
        self.original.dataReceived(data)

    def connectionLost(self, reason):
        self.original.connectionLost(reason)
        self.finished.errback(reason)


class _BufferedResponse(proxyForInterface(IResponse)):
    """
    An L{IResponse} proxy that buffers the response body as it is first
    delivered, so that C{deliverBody} may be called any number of times;
    protocols that attach later have the buffered body replayed to them.
    """
    def __init__(self, original):
        self.original = original
        self._buffer = []
        self._waiters = []
        self._waiting = None
        self._finished = False
        self._reason = None

    def _deliverWaiting(self, reason):
        self._reason = reason
        self._finished = True
        for waiter in self._waiters:
            for segment in self._buffer:
                waiter.dataReceived(segment)
            waiter.connectionLost(reason)

    def deliverBody(self, protocol):
        if self._waiting is None and not self._finished:
            self._waiting = Deferred()
            self._waiting.addBoth(self._deliverWaiting)
            self.original.deliverBody(
                _BodyBufferingProtocol(
                    protocol,
                    self._buffer,
                    self._waiting
                )
            )
        elif self._finished:
            for segment in self._buffer:
                protocol.dataReceived(segment)
            protocol.connectionLost(self._reason)
        else:
            self._waiters.append(protocol)
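
# Sketch of why this exists (illustrative; ``response`` and ``chunks`` are
# assumed names): a bare IResponse allows only a single deliverBody() call,
# while the buffered wrapper replays the body to any later consumer.
#
#     buffered = _BufferedResponse(response)
#     d1 = treq.collect(buffered, chunks.append)    # first delivery, buffered
#     d2 = treq.collect(buffered, chunks2.append)   # replayed from the buffer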


class HTTPClient(object):
    def __init__(self, agent, cookiejar=None,
                 data_to_body_producer=IBodyProducer):
        self._agent = agent
        if cookiejar is None:
            cookiejar = CookieJar()
        self._cookiejar = cookiejar
        self._data_to_body_producer = data_to_body_producer
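
    # Construction sketch (illustrative, an assumption about typical use):
    # an HTTPClient is normally built around a twisted.web.client.Agent;
    # treq's module-level functions construct one for you.
    #
    #     from twisted.internet import reactor
    #     from twisted.web.client import Agent
    #     client = HTTPClient(Agent(reactor))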

    def get(self, url, **kwargs):
        """
        See :func:`treq.get()`.
        """
        return self.request('GET', url, **kwargs)

    def put(self, url, data=None, **kwargs):
        """
        See :func:`treq.put()`.
        """
        return self.request('PUT', url, data=data, **kwargs)

    def patch(self, url, data=None, **kwargs):
        """
        See :func:`treq.patch()`.
        """
        return self.request('PATCH', url, data=data, **kwargs)

    def post(self, url, data=None, **kwargs):
        """
        See :func:`treq.post()`.
        """
        return self.request('POST', url, data=data, **kwargs)

    def head(self, url, **kwargs):
        """
        See :func:`treq.head()`.
        """
        return self.request('HEAD', url, **kwargs)

    def delete(self, url, **kwargs):
        """
        See :func:`treq.delete()`.
        """
        return self.request('DELETE', url, **kwargs)

    def request(self, method, url, **kwargs):
        """
        See :func:`treq.request()`.
        """
        method = method.encode('ascii').upper()

        if isinstance(url, unicode):
            parsed_url = _parse_url(url)
        else:
            parsed_url = _parse_url(url.decode('ascii'))

        # Join parameters provided in the URL
        # and the ones passed as argument.
        params = kwargs.get('params')
        if params:
            parsed_url = parsed_url.replace(
                query=parsed_url.query + tuple(_coerced_query_params(params))
            )

        url = parsed_url.to_uri().to_text().encode('ascii')

        # Convert the headers dictionary to
        # Twisted's raw headers format.
        headers = kwargs.get('headers')
        if headers:
            if isinstance(headers, dict):
                h = Headers({})
                for k, v in headers.items():
                    if isinstance(v, (bytes, unicode)):
                        h.addRawHeader(k, v)
                    elif isinstance(v, list):
                        h.setRawHeaders(k, v)
                headers = h
        else:
            headers = Headers({})

        # Here we choose the right producer
        # based on the parameters passed in.
        bodyProducer = None
        data = kwargs.get('data')
        files = kwargs.get('files')
        # Since json=None needs to be serialized as 'null', we need to
        # explicitly check kwargs for this key.
        has_json = 'json' in kwargs

        if files:
            # If the files keyword is present we will issue a
            # multipart/form-data request, as it is better suited to cases
            # with files and/or large objects.
            files = list(_convert_files(files))
            boundary = str(uuid.uuid4()).encode('ascii')
            headers.setRawHeaders(
                b'content-type', [
                    b'multipart/form-data; boundary=' + boundary])
            if data:
                data = _convert_params(data)
            else:
                data = []

            bodyProducer = multipart.MultiPartProducer(
                data + files, boundary=boundary)
        elif data:
            # Otherwise stick to the x-www-form-urlencoded format,
            # as it's generally faster for smaller requests.
            if isinstance(data, (dict, list, tuple)):
                headers.setRawHeaders(
                    b'content-type', [b'application/x-www-form-urlencoded'])
                data = urlencode(data, doseq=True)
            bodyProducer = self._data_to_body_producer(data)
        elif has_json:
            # If data is sent as JSON, set the Content-Type to
            # 'application/json'.
            headers.setRawHeaders(
                b'content-type', [b'application/json; charset=UTF-8'])
            content = kwargs['json']
            json = json_dumps(content, separators=(u',', u':')).encode('utf-8')
            bodyProducer = self._data_to_body_producer(json)

        cookies = kwargs.get('cookies', {})

        if not isinstance(cookies, CookieJar):
            cookies = _scoped_cookiejar_from_dict(parsed_url, cookies)

        cookies = merge_cookies(self._cookiejar, cookies)
        wrapped_agent = CookieAgent(self._agent, cookies)

        if kwargs.get('allow_redirects', True):
            if kwargs.get('browser_like_redirects', False):
                wrapped_agent = BrowserLikeRedirectAgent(wrapped_agent)
            else:
                wrapped_agent = RedirectAgent(wrapped_agent)

        wrapped_agent = ContentDecoderAgent(wrapped_agent,
                                            [(b'gzip', GzipDecoder)])

        auth = kwargs.get('auth')
        if auth:
            wrapped_agent = add_auth(wrapped_agent, auth)

        d = wrapped_agent.request(
            method, url, headers=headers, bodyProducer=bodyProducer)

        timeout = kwargs.get('timeout')
        if timeout:
            delayedCall = default_reactor(kwargs.get('reactor')).callLater(
                timeout, d.cancel)

            def gotResult(result):
                if delayedCall.active():
                    delayedCall.cancel()
                return result

            d.addBoth(gotResult)

        if not kwargs.get('unbuffered', False):
            d.addCallback(_BufferedResponse)

        return d.addCallback(_Response, cookies)
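

# Usage sketch (illustrative): request() accepts the same keyword arguments
# as treq.request(), e.g. query parameters, a JSON body, and a timeout:
#
#     d = client.request(
#         'POST', 'https://example.com/api',
#         params={'page': 1},
#         json={'name': 'value'},
#         timeout=30,
#     )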


def _convert_params(params):
    if hasattr(params, "iteritems"):
        return list(sorted(params.iteritems()))
    elif hasattr(params, "items"):
        return list(sorted(params.items()))
    elif isinstance(params, (tuple, list)):
        return list(params)
    else:
        raise ValueError("Unsupported format")


def _convert_files(files):
    """
    Files can be passed in a variety of formats:

        * ``{'file': open("bla.f")}``
        * ``{'file': (name, open("bla.f"))}``
        * ``{'file': (name, content-type, open("bla.f"))}``
        * Anything that has an ``iteritems`` method, e.g. a MultiDict:
          ``MultiDict([(name, open()), (name, open())])``

    Our goal is to standardize it to the unified form of:

        * ``[(param, (file name, content type, producer))]``
    """
    if hasattr(files, "iteritems"):
        files = files.iteritems()
    elif hasattr(files, "items"):
        files = files.items()

    for param, val in files:
        file_name, content_type, fobj = (None, None, None)
        if isinstance(val, tuple):
            if len(val) == 2:
                file_name, fobj = val
            elif len(val) == 3:
                file_name, content_type, fobj = val
        else:
            fobj = val
            if hasattr(fobj, "name"):
                file_name = FilePath(fobj.name).basename()

        if not content_type:
            content_type = _guess_content_type(file_name)

        yield (param, (file_name, content_type, IBodyProducer(fobj)))


def _coerced_query_params(params):
    """
    Carefully coerce *params* in the same way as `urllib.parse.urlencode()`.

    Parameter names and values are coerced to unicode. As a special case,
    `bytes` are decoded as ASCII.

    :param params: A mapping or sequence of (name, value) two-tuples. The
        value may be a list or tuple of multiple values. Names and values
        may be pretty much any type.

    :returns: A generator that yields two-tuples containing text strings.
    :rtype: Iterator[Tuple[Text, Text]]
    """
    if isinstance(params, Mapping):
        items = params.items()
    else:
        items = params

    for key, values in items:
        if isinstance(key, bytes):
            key = key.decode('ascii')
        elif not isinstance(key, unicode):
            key = unicode(key)

        if not isinstance(values, (list, tuple)):
            values = [values]
        for value in values:
            if isinstance(value, bytes):
                value = value.decode('ascii')
            elif not isinstance(value, unicode):
                value = unicode(value)
            yield key, value


def _from_bytes(orig_bytes):
    return FileBodyProducer(BytesIO(orig_bytes))


def _from_file(orig_file):
    return FileBodyProducer(orig_file)


def _guess_content_type(filename):
    if filename:
        guessed = mimetypes.guess_type(filename)[0]
    else:
        guessed = None
    return guessed or 'application/octet-stream'


registerAdapter(_from_bytes, bytes, IBodyProducer)
registerAdapter(_from_file, BytesIO, IBodyProducer)

if not _PY3:
    from StringIO import StringIO
    registerAdapter(_from_file, StringIO, IBodyProducer)

    # Suppress lint failure on Python 3.
    registerAdapter(_from_file, file, IBodyProducer)  # noqa: F821
else:
    import io

    # file()/open() equivalent on Python 3.
    registerAdapter(_from_file, io.BufferedReader, IBodyProducer)
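

# Adapter sketch (illustrative): the registerAdapter() calls above are what
# let plain bytes and file-like objects be passed as ``data=``; adapting
# them to IBodyProducer yields a FileBodyProducer, e.g.:
#
#     producer = IBodyProducer(b"some body bytes")
#     isinstance(producer, FileBodyProducer)  # True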