# Licensed to the Software Freedom Conservancy (SFC) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The SFC licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import logging
import platform
import string
import warnings
from base64 import b64encode
from typing import Optional
from urllib import parse
from urllib.parse import urlparse
import urllib3
from selenium import __version__
from . import utils
from .client_config import ClientConfig
from .command import Command
from .errorhandler import ErrorCode
LOGGER = logging.getLogger(__name__)
remote_commands = {
Command.NEW_SESSION: ("POST", "/session"),
Command.QUIT: ("DELETE", "/session/$sessionId"),
Command.W3C_GET_CURRENT_WINDOW_HANDLE: ("GET", "/session/$sessionId/window"),
Command.W3C_GET_WINDOW_HANDLES: ("GET", "/session/$sessionId/window/handles"),
Command.GET: ("POST", "/session/$sessionId/url"),
Command.GO_FORWARD: ("POST", "/session/$sessionId/forward"),
Command.GO_BACK: ("POST", "/session/$sessionId/back"),
Command.REFRESH: ("POST", "/session/$sessionId/refresh"),
Command.W3C_EXECUTE_SCRIPT: ("POST", "/session/$sessionId/execute/sync"),
Command.W3C_EXECUTE_SCRIPT_ASYNC: ("POST", "/session/$sessionId/execute/async"),
Command.GET_CURRENT_URL: ("GET", "/session/$sessionId/url"),
Command.GET_TITLE: ("GET", "/session/$sessionId/title"),
Command.GET_PAGE_SOURCE: ("GET", "/session/$sessionId/source"),
Command.SCREENSHOT: ("GET", "/session/$sessionId/screenshot"),
Command.ELEMENT_SCREENSHOT: ("GET", "/session/$sessionId/element/$id/screenshot"),
Command.FIND_ELEMENT: ("POST", "/session/$sessionId/element"),
Command.FIND_ELEMENTS: ("POST", "/session/$sessionId/elements"),
Command.W3C_GET_ACTIVE_ELEMENT: ("GET", "/session/$sessionId/element/active"),
Command.FIND_CHILD_ELEMENT: ("POST", "/session/$sessionId/element/$id/element"),
Command.FIND_CHILD_ELEMENTS: ("POST", "/session/$sessionId/element/$id/elements"),
Command.CLICK_ELEMENT: ("POST", "/session/$sessionId/element/$id/click"),
Command.CLEAR_ELEMENT: ("POST", "/session/$sessionId/element/$id/clear"),
Command.GET_ELEMENT_TEXT: ("GET", "/session/$sessionId/element/$id/text"),
Command.SEND_KEYS_TO_ELEMENT: ("POST", "/session/$sessionId/element/$id/value"),
Command.GET_ELEMENT_TAG_NAME: ("GET", "/session/$sessionId/element/$id/name"),
Command.IS_ELEMENT_SELECTED: ("GET", "/session/$sessionId/element/$id/selected"),
Command.IS_ELEMENT_ENABLED: ("GET", "/session/$sessionId/element/$id/enabled"),
Command.GET_ELEMENT_RECT: ("GET", "/session/$sessionId/element/$id/rect"),
Command.GET_ELEMENT_ATTRIBUTE: ("GET", "/session/$sessionId/element/$id/attribute/$name"),
Command.GET_ELEMENT_PROPERTY: ("GET", "/session/$sessionId/element/$id/property/$name"),
Command.GET_ELEMENT_ARIA_ROLE: ("GET", "/session/$sessionId/element/$id/computedrole"),
Command.GET_ELEMENT_ARIA_LABEL: ("GET", "/session/$sessionId/element/$id/computedlabel"),
Command.GET_SHADOW_ROOT: ("GET", "/session/$sessionId/element/$id/shadow"),
Command.FIND_ELEMENT_FROM_SHADOW_ROOT: ("POST", "/session/$sessionId/shadow/$shadowId/element"),
Command.FIND_ELEMENTS_FROM_SHADOW_ROOT: ("POST", "/session/$sessionId/shadow/$shadowId/elements"),
Command.GET_ALL_COOKIES: ("GET", "/session/$sessionId/cookie"),
Command.ADD_COOKIE: ("POST", "/session/$sessionId/cookie"),
Command.GET_COOKIE: ("GET", "/session/$sessionId/cookie/$name"),
Command.DELETE_ALL_COOKIES: ("DELETE", "/session/$sessionId/cookie"),
Command.DELETE_COOKIE: ("DELETE", "/session/$sessionId/cookie/$name"),
Command.SWITCH_TO_FRAME: ("POST", "/session/$sessionId/frame"),
Command.SWITCH_TO_PARENT_FRAME: ("POST", "/session/$sessionId/frame/parent"),
Command.SWITCH_TO_WINDOW: ("POST", "/session/$sessionId/window"),
Command.NEW_WINDOW: ("POST", "/session/$sessionId/window/new"),
Command.CLOSE: ("DELETE", "/session/$sessionId/window"),
Command.GET_ELEMENT_VALUE_OF_CSS_PROPERTY: ("GET", "/session/$sessionId/element/$id/css/$propertyName"),
Command.EXECUTE_ASYNC_SCRIPT: ("POST", "/session/$sessionId/execute_async"),
Command.SET_TIMEOUTS: ("POST", "/session/$sessionId/timeouts"),
Command.GET_TIMEOUTS: ("GET", "/session/$sessionId/timeouts"),
Command.W3C_DISMISS_ALERT: ("POST", "/session/$sessionId/alert/dismiss"),
Command.W3C_ACCEPT_ALERT: ("POST", "/session/$sessionId/alert/accept"),
Command.W3C_SET_ALERT_VALUE: ("POST", "/session/$sessionId/alert/text"),
Command.W3C_GET_ALERT_TEXT: ("GET", "/session/$sessionId/alert/text"),
Command.W3C_ACTIONS: ("POST", "/session/$sessionId/actions"),
Command.W3C_CLEAR_ACTIONS: ("DELETE", "/session/$sessionId/actions"),
Command.SET_WINDOW_RECT: ("POST", "/session/$sessionId/window/rect"),
Command.GET_WINDOW_RECT: ("GET", "/session/$sessionId/window/rect"),
Command.W3C_MAXIMIZE_WINDOW: ("POST", "/session/$sessionId/window/maximize"),
Command.SET_SCREEN_ORIENTATION: ("POST", "/session/$sessionId/orientation"),
Command.GET_SCREEN_ORIENTATION: ("GET", "/session/$sessionId/orientation"),
Command.GET_NETWORK_CONNECTION: ("GET", "/session/$sessionId/network_connection"),
Command.SET_NETWORK_CONNECTION: ("POST", "/session/$sessionId/network_connection"),
Command.GET_LOG: ("POST", "/session/$sessionId/se/log"),
Command.GET_AVAILABLE_LOG_TYPES: ("GET", "/session/$sessionId/se/log/types"),
Command.CURRENT_CONTEXT_HANDLE: ("GET", "/session/$sessionId/context"),
Command.CONTEXT_HANDLES: ("GET", "/session/$sessionId/contexts"),
Command.SWITCH_TO_CONTEXT: ("POST", "/session/$sessionId/context"),
Command.FULLSCREEN_WINDOW: ("POST", "/session/$sessionId/window/fullscreen"),
Command.MINIMIZE_WINDOW: ("POST", "/session/$sessionId/window/minimize"),
Command.PRINT_PAGE: ("POST", "/session/$sessionId/print"),
Command.ADD_VIRTUAL_AUTHENTICATOR: ("POST", "/session/$sessionId/webauthn/authenticator"),
Command.REMOVE_VIRTUAL_AUTHENTICATOR: (
"DELETE",
"/session/$sessionId/webauthn/authenticator/$authenticatorId",
),
Command.ADD_CREDENTIAL: ("POST", "/session/$sessionId/webauthn/authenticator/$authenticatorId/credential"),
Command.GET_CREDENTIALS: ("GET", "/session/$sessionId/webauthn/authenticator/$authenticatorId/credentials"),
Command.REMOVE_CREDENTIAL: (
"DELETE",
"/session/$sessionId/webauthn/authenticator/$authenticatorId/credentials/$credentialId",
),
Command.REMOVE_ALL_CREDENTIALS: (
"DELETE",
"/session/$sessionId/webauthn/authenticator/$authenticatorId/credentials",
),
Command.SET_USER_VERIFIED: ("POST", "/session/$sessionId/webauthn/authenticator/$authenticatorId/uv"),
Command.UPLOAD_FILE: ("POST", "/session/$sessionId/se/file"),
Command.GET_DOWNLOADABLE_FILES: ("GET", "/session/$sessionId/se/files"),
Command.DOWNLOAD_FILE: ("POST", "/session/$sessionId/se/files"),
Command.DELETE_DOWNLOADABLE_FILES: ("DELETE", "/session/$sessionId/se/files"),
# Federated Credential Management (FedCM)
Command.GET_FEDCM_TITLE: ("GET", "/session/$sessionId/fedcm/gettitle"),
Command.GET_FEDCM_DIALOG_TYPE: ("GET", "/session/$sessionId/fedcm/getdialogtype"),
Command.GET_FEDCM_ACCOUNT_LIST: ("GET", "/session/$sessionId/fedcm/accountlist"),
Command.CLICK_FEDCM_DIALOG_BUTTON: ("POST", "/session/$sessionId/fedcm/clickdialogbutton"),
Command.CANCEL_FEDCM_DIALOG: ("POST", "/session/$sessionId/fedcm/canceldialog"),
Command.SELECT_FEDCM_ACCOUNT: ("POST", "/session/$sessionId/fedcm/selectaccount"),
Command.SET_FEDCM_DELAY: ("POST", "/session/$sessionId/fedcm/setdelayenabled"),
Command.RESET_FEDCM_COOLDOWN: ("POST", "/session/$sessionId/fedcm/resetcooldown"),
}
[docs]
class RemoteConnection:
"""A connection with the Remote WebDriver server.
Communicates with the server using the WebDriver wire protocol:
https://github.com/SeleniumHQ/selenium/wiki/JsonWireProtocol
"""
browser_name = None
# Keep backward compatibility for AppiumConnection - https://github.com/SeleniumHQ/selenium/issues/14694
import os
import socket
import certifi
_timeout = (
float(os.getenv("GLOBAL_DEFAULT_TIMEOUT", str(socket.getdefaulttimeout())))
if os.getenv("GLOBAL_DEFAULT_TIMEOUT") is not None
else socket.getdefaulttimeout()
)
_ca_certs = os.getenv("REQUESTS_CA_BUNDLE") if "REQUESTS_CA_BUNDLE" in os.environ else certifi.where()
_client_config: ClientConfig = None
system = platform.system().lower()
if system == "darwin":
system = "mac"
# Class variables for headers
extra_headers = None
user_agent = f"selenium/{__version__} (python {system})"
[docs]
@classmethod
def get_timeout(cls):
""":Returns:
Timeout value in seconds for all http requests made to the
Remote Connection
"""
warnings.warn(
"get_timeout() in RemoteConnection is deprecated, get timeout from ClientConfig instance instead",
DeprecationWarning,
stacklevel=2,
)
return cls._client_config.timeout
[docs]
@classmethod
def set_timeout(cls, timeout):
"""Override the default timeout.
:Args:
- timeout - timeout value for http requests in seconds
"""
warnings.warn(
"set_timeout() in RemoteConnection is deprecated, set timeout to ClientConfig instance in constructor instead",
DeprecationWarning,
stacklevel=2,
)
cls._client_config.timeout = timeout
[docs]
@classmethod
def reset_timeout(cls):
"""Reset the http request timeout to socket._GLOBAL_DEFAULT_TIMEOUT."""
warnings.warn(
"reset_timeout() in RemoteConnection is deprecated, use reset_timeout() in ClientConfig instance instead",
DeprecationWarning,
stacklevel=2,
)
cls._client_config.reset_timeout()
[docs]
@classmethod
def get_certificate_bundle_path(cls):
""":Returns:
Paths of the .pem encoded certificate to verify connection to
command executor. Defaults to certifi.where() or
REQUESTS_CA_BUNDLE env variable if set.
"""
warnings.warn(
"get_certificate_bundle_path() in RemoteConnection is deprecated, get ca_certs from ClientConfig instance instead",
DeprecationWarning,
stacklevel=2,
)
return cls._client_config.ca_certs
[docs]
@classmethod
def set_certificate_bundle_path(cls, path):
"""Set the path to the certificate bundle to verify connection to
command executor. Can also be set to None to disable certificate
validation.
:Args:
- path - path of a .pem encoded certificate chain.
"""
warnings.warn(
"set_certificate_bundle_path() in RemoteConnection is deprecated, set ca_certs to ClientConfig instance in constructor instead",
DeprecationWarning,
stacklevel=2,
)
cls._client_config.ca_certs = path
def _identify_http_proxy_auth(self):
parsed_url = urlparse(self._proxy_url)
if parsed_url.username and parsed_url.password:
return True
def _separate_http_proxy_auth(self):
parsed_url = urlparse(self._proxy_url)
proxy_without_auth = f"{parsed_url.scheme}://{parsed_url.hostname}:{parsed_url.port}"
auth = f"{parsed_url.username}:{parsed_url.password}"
return proxy_without_auth, auth
def _get_connection_manager(self):
pool_manager_init_args = {"timeout": self._client_config.timeout}
pool_manager_init_args.update(
self._client_config.init_args_for_pool_manager.get("init_args_for_pool_manager", {})
)
if self._client_config.ignore_certificates:
pool_manager_init_args["cert_reqs"] = "CERT_NONE"
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
elif self._client_config.ca_certs:
pool_manager_init_args["cert_reqs"] = "CERT_REQUIRED"
pool_manager_init_args["ca_certs"] = self._client_config.ca_certs
if self._proxy_url:
if self._proxy_url.lower().startswith("sock"):
from urllib3.contrib.socks import SOCKSProxyManager
return SOCKSProxyManager(self._proxy_url, **pool_manager_init_args)
if self._identify_http_proxy_auth():
self._proxy_url, self._basic_proxy_auth = self._separate_http_proxy_auth()
pool_manager_init_args["proxy_headers"] = urllib3.make_headers(proxy_basic_auth=self._basic_proxy_auth)
return urllib3.ProxyManager(self._proxy_url, **pool_manager_init_args)
return urllib3.PoolManager(**pool_manager_init_args)
def __init__(
self,
remote_server_addr: Optional[str] = None,
keep_alive: Optional[bool] = True,
ignore_proxy: Optional[bool] = False,
ignore_certificates: Optional[bool] = False,
init_args_for_pool_manager: Optional[dict] = None,
client_config: Optional[ClientConfig] = None,
):
self._client_config = client_config or ClientConfig(
remote_server_addr=remote_server_addr,
keep_alive=keep_alive,
ignore_certificates=ignore_certificates,
init_args_for_pool_manager=init_args_for_pool_manager,
)
# Keep backward compatibility for AppiumConnection - https://github.com/SeleniumHQ/selenium/issues/14694
RemoteConnection._timeout = self._client_config.timeout
RemoteConnection._ca_certs = self._client_config.ca_certs
RemoteConnection._client_config = self._client_config
RemoteConnection.extra_headers = self._client_config.extra_headers or RemoteConnection.extra_headers
RemoteConnection.user_agent = self._client_config.user_agent or RemoteConnection.user_agent
if remote_server_addr:
warnings.warn(
"setting remote_server_addr in RemoteConnection() is deprecated, set in ClientConfig instance instead",
DeprecationWarning,
stacklevel=2,
)
if not keep_alive:
warnings.warn(
"setting keep_alive in RemoteConnection() is deprecated, set in ClientConfig instance instead",
DeprecationWarning,
stacklevel=2,
)
if ignore_certificates:
warnings.warn(
"setting ignore_certificates in RemoteConnection() is deprecated, set in ClientConfig instance instead",
DeprecationWarning,
stacklevel=2,
)
if init_args_for_pool_manager:
warnings.warn(
"setting init_args_for_pool_manager in RemoteConnection() is deprecated, set in ClientConfig instance instead",
DeprecationWarning,
stacklevel=2,
)
if ignore_proxy:
warnings.warn(
"setting ignore_proxy in RemoteConnection() is deprecated, set in ClientConfig instance instead",
DeprecationWarning,
stacklevel=2,
)
self._proxy_url = None
else:
self._proxy_url = self._client_config.get_proxy_url()
if self._client_config.keep_alive:
self._conn = self._get_connection_manager()
self._commands = remote_commands
extra_commands = {}
[docs]
def add_command(self, name, method, url):
"""Register a new command."""
self._commands[name] = (method, url)
[docs]
def get_command(self, name: str):
"""Retrieve a command if it exists."""
return self._commands.get(name)
[docs]
def execute(self, command, params):
"""Send a command to the remote server.
Any path substitutions required for the URL mapped to the command should be
included in the command parameters.
:Args:
- command - A string specifying the command to execute.
- params - A dictionary of named parameters to send with the command as
its JSON payload.
"""
command_info = self._commands.get(command) or self.extra_commands.get(command)
assert command_info is not None, f"Unrecognised command {command}"
path_string = command_info[1]
path = string.Template(path_string).substitute(params)
substitute_params = {word[1:] for word in path_string.split("/") if word.startswith("$")} # remove dollar sign
if isinstance(params, dict) and substitute_params:
for word in substitute_params:
del params[word]
data = utils.dump_json(params)
url = f"{self._client_config.remote_server_addr}{path}"
trimmed = self._trim_large_entries(params)
LOGGER.debug("%s %s %s", command_info[0], url, str(trimmed))
return self._request(command_info[0], url, body=data)
def _request(self, method, url, body=None):
"""Send an HTTP request to the remote server.
:Args:
- method - A string for the HTTP method to send the request with.
- url - A string for the URL to send the request to.
- body - A string for request body. Ignored unless method is POST or PUT.
:Returns:
A dictionary with the server's parsed JSON response.
"""
parsed_url = parse.urlparse(url)
headers = self.get_remote_connection_headers(parsed_url, self._client_config.keep_alive)
auth_header = self._client_config.get_auth_header()
if auth_header:
headers.update(auth_header)
if body and method not in ("POST", "PUT"):
body = None
if self._client_config.keep_alive:
response = self._conn.request(method, url, body=body, headers=headers, timeout=self._client_config.timeout)
statuscode = response.status
else:
conn = self._get_connection_manager()
with conn as http:
response = http.request(method, url, body=body, headers=headers, timeout=self._client_config.timeout)
statuscode = response.status
data = response.data.decode("UTF-8")
LOGGER.debug("Remote response: status=%s | data=%s | headers=%s", response.status, data, response.headers)
try:
if 300 <= statuscode < 304:
return self._request("GET", response.headers.get("location", None))
if 399 < statuscode <= 500:
if statuscode == 401:
return {"status": statuscode, "value": "Authorization Required"}
return {"status": statuscode, "value": str(statuscode) if not data else data.strip()}
content_type = []
if response.headers.get("Content-Type", None):
content_type = response.headers.get("Content-Type", None).split(";")
if not any([x.startswith("image/png") for x in content_type]):
try:
data = utils.load_json(data.strip())
except ValueError:
if 199 < statuscode < 300:
status = ErrorCode.SUCCESS
else:
status = ErrorCode.UNKNOWN_ERROR
return {"status": status, "value": data.strip()}
# Some drivers incorrectly return a response
# with no 'value' field when they should return null.
if "value" not in data:
data["value"] = None
return data
data = {"status": 0, "value": data}
return data
finally:
LOGGER.debug("Finished Request")
response.close()
[docs]
def close(self):
"""Clean up resources when finished with the remote_connection."""
if hasattr(self, "_conn"):
self._conn.clear()
def _trim_large_entries(self, input_dict, max_length=100):
"""Truncate string values in a dictionary if they exceed max_length.
:param dict: Dictionary with potentially large values
:param max_length: Maximum allowed length of string values
:return: Dictionary with truncated string values
"""
output_dictionary = {}
for key, value in input_dict.items():
if isinstance(value, dict):
output_dictionary[key] = self._trim_large_entries(value, max_length)
elif isinstance(value, str) and len(value) > max_length:
output_dictionary[key] = value[:max_length] + "..."
else:
output_dictionary[key] = value
return output_dictionary