-
Notifications
You must be signed in to change notification settings - Fork 753
/
Copy pathasync_http_client.py
135 lines (118 loc) · 4.4 KB
/
async_http_client.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
import logging
from typing import Dict, Optional, Tuple
from aiohttp import BasicAuth, ClientSession
from aiohttp_retry import ExponentialRetry, RetryClient
from twilio.http import AsyncHttpClient
from twilio.http.request import Request as TwilioRequest
from twilio.http.response import Response
_logger = logging.getLogger("twilio.async_http_client")
class AsyncTwilioHttpClient(AsyncHttpClient):
"""
General purpose asynchronous HTTP Client for interacting with the Twilio API
"""
def __init__(
self,
pool_connections: bool = True,
trace_configs=None,
timeout: Optional[float] = None,
logger: logging.Logger = _logger,
proxy_url: Optional[str] = None,
max_retries: Optional[int] = None,
):
"""
Constructor for the AsyncTwilioHttpClient
:param pool_connections: Creates a client session for making requests from.
:param trace_configs: Configuration used to trace request lifecycle events. See aiohttp library TraceConfig
documentation for more info.
:param timeout: Timeout for the requests (seconds)
:param logger
:param proxy_url: Proxy URL
:param max_retries: Maximum number of retries each request should attempt
"""
super().__init__(logger, True, timeout)
self.proxy_url = proxy_url
self.trace_configs = trace_configs
self.session = (
ClientSession(trace_configs=self.trace_configs)
if pool_connections
else None
)
if max_retries is not None:
retry_options = ExponentialRetry(attempts=max_retries)
self.session = RetryClient(
client_session=self.session, retry_options=retry_options
)
async def request(
self,
method: str,
url: str,
params: Optional[Dict[str, object]] = None,
data: Optional[Dict[str, object]] = None,
headers: Optional[Dict[str, str]] = None,
auth: Optional[Tuple[str, str]] = None,
timeout: Optional[float] = None,
allow_redirects: bool = False,
) -> Response:
"""
Make an asynchronous HTTP Request with parameters provided.
:param method: The HTTP method to use
:param url: The URL to request
:param params: Query parameters to append to the URL
:param data: Parameters to go in the body of the HTTP request
:param headers: HTTP Headers to send with the request
:param auth: Basic Auth arguments (username, password entries)
:param timeout: Socket/Read timeout for the request. Overrides the timeout if set on the client.
:param allow_redirects: Whether or not to allow redirects
See the requests documentation for explanation of all these parameters
:return: An http response
"""
if timeout is not None and timeout <= 0:
raise ValueError(timeout)
basic_auth = None
if auth is not None:
basic_auth = BasicAuth(login=auth[0], password=auth[1])
kwargs = {
"method": method.upper(),
"url": url,
"params": params,
"data": data,
"headers": headers,
"auth": basic_auth,
"timeout": timeout,
"allow_redirects": allow_redirects,
}
self.log_request(kwargs)
self._test_only_last_response = None
temp = False
session = None
if self.session:
session = self.session
else:
session = ClientSession()
temp = True
self._test_only_last_request = TwilioRequest(**kwargs)
response = await session.request(**kwargs)
self.log_response(response.status, response)
self._test_only_last_response = Response(
response.status, await response.text(), response.headers
)
if temp:
await session.close()
return self._test_only_last_response
async def close(self):
"""
Closes the HTTP client session
"""
if self.session:
await self.session.close()
async def __aenter__(self):
"""
Async context manager setup
"""
return self
async def __aexit__(self, *excinfo):
"""
Async context manager exit
"""
if self.session:
await self.session.close()