Source code for selenium.webdriver.remote.remote_connection

# Licensed to the Software Freedom Conservancy (SFC) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The SFC licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

import logging
import platform
import string
import warnings
from base64 import b64encode
from typing import Optional
from urllib import parse
from urllib.parse import urlparse

import urllib3

from selenium import __version__

from . import utils
from .client_config import ClientConfig
from .command import Command
from .errorhandler import ErrorCode

LOGGER = logging.getLogger(__name__)

remote_commands = {
    Command.NEW_SESSION: ("POST", "/session"),
    Command.QUIT: ("DELETE", "/session/$sessionId"),
    Command.W3C_GET_CURRENT_WINDOW_HANDLE: ("GET", "/session/$sessionId/window"),
    Command.W3C_GET_WINDOW_HANDLES: ("GET", "/session/$sessionId/window/handles"),
    Command.GET: ("POST", "/session/$sessionId/url"),
    Command.GO_FORWARD: ("POST", "/session/$sessionId/forward"),
    Command.GO_BACK: ("POST", "/session/$sessionId/back"),
    Command.REFRESH: ("POST", "/session/$sessionId/refresh"),
    Command.W3C_EXECUTE_SCRIPT: ("POST", "/session/$sessionId/execute/sync"),
    Command.W3C_EXECUTE_SCRIPT_ASYNC: ("POST", "/session/$sessionId/execute/async"),
    Command.GET_CURRENT_URL: ("GET", "/session/$sessionId/url"),
    Command.GET_TITLE: ("GET", "/session/$sessionId/title"),
    Command.GET_PAGE_SOURCE: ("GET", "/session/$sessionId/source"),
    Command.SCREENSHOT: ("GET", "/session/$sessionId/screenshot"),
    Command.ELEMENT_SCREENSHOT: ("GET", "/session/$sessionId/element/$id/screenshot"),
    Command.FIND_ELEMENT: ("POST", "/session/$sessionId/element"),
    Command.FIND_ELEMENTS: ("POST", "/session/$sessionId/elements"),
    Command.W3C_GET_ACTIVE_ELEMENT: ("GET", "/session/$sessionId/element/active"),
    Command.FIND_CHILD_ELEMENT: ("POST", "/session/$sessionId/element/$id/element"),
    Command.FIND_CHILD_ELEMENTS: ("POST", "/session/$sessionId/element/$id/elements"),
    Command.CLICK_ELEMENT: ("POST", "/session/$sessionId/element/$id/click"),
    Command.CLEAR_ELEMENT: ("POST", "/session/$sessionId/element/$id/clear"),
    Command.GET_ELEMENT_TEXT: ("GET", "/session/$sessionId/element/$id/text"),
    Command.SEND_KEYS_TO_ELEMENT: ("POST", "/session/$sessionId/element/$id/value"),
    Command.GET_ELEMENT_TAG_NAME: ("GET", "/session/$sessionId/element/$id/name"),
    Command.IS_ELEMENT_SELECTED: ("GET", "/session/$sessionId/element/$id/selected"),
    Command.IS_ELEMENT_ENABLED: ("GET", "/session/$sessionId/element/$id/enabled"),
    Command.GET_ELEMENT_RECT: ("GET", "/session/$sessionId/element/$id/rect"),
    Command.GET_ELEMENT_ATTRIBUTE: ("GET", "/session/$sessionId/element/$id/attribute/$name"),
    Command.GET_ELEMENT_PROPERTY: ("GET", "/session/$sessionId/element/$id/property/$name"),
    Command.GET_ELEMENT_ARIA_ROLE: ("GET", "/session/$sessionId/element/$id/computedrole"),
    Command.GET_ELEMENT_ARIA_LABEL: ("GET", "/session/$sessionId/element/$id/computedlabel"),
    Command.GET_SHADOW_ROOT: ("GET", "/session/$sessionId/element/$id/shadow"),
    Command.FIND_ELEMENT_FROM_SHADOW_ROOT: ("POST", "/session/$sessionId/shadow/$shadowId/element"),
    Command.FIND_ELEMENTS_FROM_SHADOW_ROOT: ("POST", "/session/$sessionId/shadow/$shadowId/elements"),
    Command.GET_ALL_COOKIES: ("GET", "/session/$sessionId/cookie"),
    Command.ADD_COOKIE: ("POST", "/session/$sessionId/cookie"),
    Command.GET_COOKIE: ("GET", "/session/$sessionId/cookie/$name"),
    Command.DELETE_ALL_COOKIES: ("DELETE", "/session/$sessionId/cookie"),
    Command.DELETE_COOKIE: ("DELETE", "/session/$sessionId/cookie/$name"),
    Command.SWITCH_TO_FRAME: ("POST", "/session/$sessionId/frame"),
    Command.SWITCH_TO_PARENT_FRAME: ("POST", "/session/$sessionId/frame/parent"),
    Command.SWITCH_TO_WINDOW: ("POST", "/session/$sessionId/window"),
    Command.NEW_WINDOW: ("POST", "/session/$sessionId/window/new"),
    Command.CLOSE: ("DELETE", "/session/$sessionId/window"),
    Command.GET_ELEMENT_VALUE_OF_CSS_PROPERTY: ("GET", "/session/$sessionId/element/$id/css/$propertyName"),
    Command.EXECUTE_ASYNC_SCRIPT: ("POST", "/session/$sessionId/execute_async"),
    Command.SET_TIMEOUTS: ("POST", "/session/$sessionId/timeouts"),
    Command.GET_TIMEOUTS: ("GET", "/session/$sessionId/timeouts"),
    Command.W3C_DISMISS_ALERT: ("POST", "/session/$sessionId/alert/dismiss"),
    Command.W3C_ACCEPT_ALERT: ("POST", "/session/$sessionId/alert/accept"),
    Command.W3C_SET_ALERT_VALUE: ("POST", "/session/$sessionId/alert/text"),
    Command.W3C_GET_ALERT_TEXT: ("GET", "/session/$sessionId/alert/text"),
    Command.W3C_ACTIONS: ("POST", "/session/$sessionId/actions"),
    Command.W3C_CLEAR_ACTIONS: ("DELETE", "/session/$sessionId/actions"),
    Command.SET_WINDOW_RECT: ("POST", "/session/$sessionId/window/rect"),
    Command.GET_WINDOW_RECT: ("GET", "/session/$sessionId/window/rect"),
    Command.W3C_MAXIMIZE_WINDOW: ("POST", "/session/$sessionId/window/maximize"),
    Command.SET_SCREEN_ORIENTATION: ("POST", "/session/$sessionId/orientation"),
    Command.GET_SCREEN_ORIENTATION: ("GET", "/session/$sessionId/orientation"),
    Command.GET_NETWORK_CONNECTION: ("GET", "/session/$sessionId/network_connection"),
    Command.SET_NETWORK_CONNECTION: ("POST", "/session/$sessionId/network_connection"),
    Command.GET_LOG: ("POST", "/session/$sessionId/se/log"),
    Command.GET_AVAILABLE_LOG_TYPES: ("GET", "/session/$sessionId/se/log/types"),
    Command.CURRENT_CONTEXT_HANDLE: ("GET", "/session/$sessionId/context"),
    Command.CONTEXT_HANDLES: ("GET", "/session/$sessionId/contexts"),
    Command.SWITCH_TO_CONTEXT: ("POST", "/session/$sessionId/context"),
    Command.FULLSCREEN_WINDOW: ("POST", "/session/$sessionId/window/fullscreen"),
    Command.MINIMIZE_WINDOW: ("POST", "/session/$sessionId/window/minimize"),
    Command.PRINT_PAGE: ("POST", "/session/$sessionId/print"),
    Command.ADD_VIRTUAL_AUTHENTICATOR: ("POST", "/session/$sessionId/webauthn/authenticator"),
    Command.REMOVE_VIRTUAL_AUTHENTICATOR: (
        "DELETE",
        "/session/$sessionId/webauthn/authenticator/$authenticatorId",
    ),
    Command.ADD_CREDENTIAL: ("POST", "/session/$sessionId/webauthn/authenticator/$authenticatorId/credential"),
    Command.GET_CREDENTIALS: ("GET", "/session/$sessionId/webauthn/authenticator/$authenticatorId/credentials"),
    Command.REMOVE_CREDENTIAL: (
        "DELETE",
        "/session/$sessionId/webauthn/authenticator/$authenticatorId/credentials/$credentialId",
    ),
    Command.REMOVE_ALL_CREDENTIALS: (
        "DELETE",
        "/session/$sessionId/webauthn/authenticator/$authenticatorId/credentials",
    ),
    Command.SET_USER_VERIFIED: ("POST", "/session/$sessionId/webauthn/authenticator/$authenticatorId/uv"),
    Command.UPLOAD_FILE: ("POST", "/session/$sessionId/se/file"),
    Command.GET_DOWNLOADABLE_FILES: ("GET", "/session/$sessionId/se/files"),
    Command.DOWNLOAD_FILE: ("POST", "/session/$sessionId/se/files"),
    Command.DELETE_DOWNLOADABLE_FILES: ("DELETE", "/session/$sessionId/se/files"),
    # Federated Credential Management (FedCM)
    Command.GET_FEDCM_TITLE: ("GET", "/session/$sessionId/fedcm/gettitle"),
    Command.GET_FEDCM_DIALOG_TYPE: ("GET", "/session/$sessionId/fedcm/getdialogtype"),
    Command.GET_FEDCM_ACCOUNT_LIST: ("GET", "/session/$sessionId/fedcm/accountlist"),
    Command.CLICK_FEDCM_DIALOG_BUTTON: ("POST", "/session/$sessionId/fedcm/clickdialogbutton"),
    Command.CANCEL_FEDCM_DIALOG: ("POST", "/session/$sessionId/fedcm/canceldialog"),
    Command.SELECT_FEDCM_ACCOUNT: ("POST", "/session/$sessionId/fedcm/selectaccount"),
    Command.SET_FEDCM_DELAY: ("POST", "/session/$sessionId/fedcm/setdelayenabled"),
    Command.RESET_FEDCM_COOLDOWN: ("POST", "/session/$sessionId/fedcm/resetcooldown"),
}


[docs] class RemoteConnection: """A connection with the Remote WebDriver server. Communicates with the server using the WebDriver wire protocol: https://github.com/SeleniumHQ/selenium/wiki/JsonWireProtocol """ browser_name = None # Keep backward compatibility for AppiumConnection - https://github.com/SeleniumHQ/selenium/issues/14694 import os import socket import certifi _timeout = ( float(os.getenv("GLOBAL_DEFAULT_TIMEOUT", str(socket.getdefaulttimeout()))) if os.getenv("GLOBAL_DEFAULT_TIMEOUT") is not None else socket.getdefaulttimeout() ) _ca_certs = os.getenv("REQUESTS_CA_BUNDLE") if "REQUESTS_CA_BUNDLE" in os.environ else certifi.where() _client_config: ClientConfig = None system = platform.system().lower() if system == "darwin": system = "mac" # Class variables for headers extra_headers = None user_agent = f"selenium/{__version__} (python {system})"
[docs] @classmethod def get_timeout(cls): """:Returns: Timeout value in seconds for all http requests made to the Remote Connection """ warnings.warn( "get_timeout() in RemoteConnection is deprecated, get timeout from ClientConfig instance instead", DeprecationWarning, stacklevel=2, ) return cls._client_config.timeout
[docs] @classmethod def set_timeout(cls, timeout): """Override the default timeout. :Args: - timeout - timeout value for http requests in seconds """ warnings.warn( "set_timeout() in RemoteConnection is deprecated, set timeout to ClientConfig instance in constructor instead", DeprecationWarning, stacklevel=2, ) cls._client_config.timeout = timeout
[docs] @classmethod def reset_timeout(cls): """Reset the http request timeout to socket._GLOBAL_DEFAULT_TIMEOUT.""" warnings.warn( "reset_timeout() in RemoteConnection is deprecated, use reset_timeout() in ClientConfig instance instead", DeprecationWarning, stacklevel=2, ) cls._client_config.reset_timeout()
[docs] @classmethod def get_certificate_bundle_path(cls): """:Returns: Paths of the .pem encoded certificate to verify connection to command executor. Defaults to certifi.where() or REQUESTS_CA_BUNDLE env variable if set. """ warnings.warn( "get_certificate_bundle_path() in RemoteConnection is deprecated, get ca_certs from ClientConfig instance instead", DeprecationWarning, stacklevel=2, ) return cls._client_config.ca_certs
[docs] @classmethod def set_certificate_bundle_path(cls, path): """Set the path to the certificate bundle to verify connection to command executor. Can also be set to None to disable certificate validation. :Args: - path - path of a .pem encoded certificate chain. """ warnings.warn( "set_certificate_bundle_path() in RemoteConnection is deprecated, set ca_certs to ClientConfig instance in constructor instead", DeprecationWarning, stacklevel=2, ) cls._client_config.ca_certs = path
[docs] @classmethod def get_remote_connection_headers(cls, parsed_url, keep_alive=False): """Get headers for remote request. :Args: - parsed_url - The parsed url - keep_alive (Boolean) - Is this a keep-alive connection (default: False) """ headers = { "Accept": "application/json", "Content-Type": "application/json;charset=UTF-8", "User-Agent": cls.user_agent, } if parsed_url.username: warnings.warn( "Embedding username and password in URL could be insecure, use ClientConfig instead", stacklevel=2 ) base64string = b64encode(f"{parsed_url.username}:{parsed_url.password}".encode()) headers.update({"Authorization": f"Basic {base64string.decode()}"}) if keep_alive: headers.update({"Connection": "keep-alive"}) if cls.extra_headers: headers.update(cls.extra_headers) return headers
def _identify_http_proxy_auth(self): parsed_url = urlparse(self._proxy_url) if parsed_url.username and parsed_url.password: return True def _separate_http_proxy_auth(self): parsed_url = urlparse(self._proxy_url) proxy_without_auth = f"{parsed_url.scheme}://{parsed_url.hostname}:{parsed_url.port}" auth = f"{parsed_url.username}:{parsed_url.password}" return proxy_without_auth, auth def _get_connection_manager(self): pool_manager_init_args = {"timeout": self._client_config.timeout} pool_manager_init_args.update( self._client_config.init_args_for_pool_manager.get("init_args_for_pool_manager", {}) ) if self._client_config.ignore_certificates: pool_manager_init_args["cert_reqs"] = "CERT_NONE" urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) elif self._client_config.ca_certs: pool_manager_init_args["cert_reqs"] = "CERT_REQUIRED" pool_manager_init_args["ca_certs"] = self._client_config.ca_certs if self._proxy_url: if self._proxy_url.lower().startswith("sock"): from urllib3.contrib.socks import SOCKSProxyManager return SOCKSProxyManager(self._proxy_url, **pool_manager_init_args) if self._identify_http_proxy_auth(): self._proxy_url, self._basic_proxy_auth = self._separate_http_proxy_auth() pool_manager_init_args["proxy_headers"] = urllib3.make_headers(proxy_basic_auth=self._basic_proxy_auth) return urllib3.ProxyManager(self._proxy_url, **pool_manager_init_args) return urllib3.PoolManager(**pool_manager_init_args) def __init__( self, remote_server_addr: Optional[str] = None, keep_alive: Optional[bool] = True, ignore_proxy: Optional[bool] = False, ignore_certificates: Optional[bool] = False, init_args_for_pool_manager: Optional[dict] = None, client_config: Optional[ClientConfig] = None, ): self._client_config = client_config or ClientConfig( remote_server_addr=remote_server_addr, keep_alive=keep_alive, ignore_certificates=ignore_certificates, init_args_for_pool_manager=init_args_for_pool_manager, ) # Keep backward compatibility for AppiumConnection - https://github.com/SeleniumHQ/selenium/issues/14694 RemoteConnection._timeout = self._client_config.timeout RemoteConnection._ca_certs = self._client_config.ca_certs RemoteConnection._client_config = self._client_config RemoteConnection.extra_headers = self._client_config.extra_headers or RemoteConnection.extra_headers RemoteConnection.user_agent = self._client_config.user_agent or RemoteConnection.user_agent if remote_server_addr: warnings.warn( "setting remote_server_addr in RemoteConnection() is deprecated, set in ClientConfig instance instead", DeprecationWarning, stacklevel=2, ) if not keep_alive: warnings.warn( "setting keep_alive in RemoteConnection() is deprecated, set in ClientConfig instance instead", DeprecationWarning, stacklevel=2, ) if ignore_certificates: warnings.warn( "setting ignore_certificates in RemoteConnection() is deprecated, set in ClientConfig instance instead", DeprecationWarning, stacklevel=2, ) if init_args_for_pool_manager: warnings.warn( "setting init_args_for_pool_manager in RemoteConnection() is deprecated, set in ClientConfig instance instead", DeprecationWarning, stacklevel=2, ) if ignore_proxy: warnings.warn( "setting ignore_proxy in RemoteConnection() is deprecated, set in ClientConfig instance instead", DeprecationWarning, stacklevel=2, ) self._proxy_url = None else: self._proxy_url = self._client_config.get_proxy_url() if self._client_config.keep_alive: self._conn = self._get_connection_manager() self._commands = remote_commands extra_commands = {}
[docs] def add_command(self, name, method, url): """Register a new command.""" self._commands[name] = (method, url)
[docs] def get_command(self, name: str): """Retrieve a command if it exists.""" return self._commands.get(name)
[docs] def execute(self, command, params): """Send a command to the remote server. Any path substitutions required for the URL mapped to the command should be included in the command parameters. :Args: - command - A string specifying the command to execute. - params - A dictionary of named parameters to send with the command as its JSON payload. """ command_info = self._commands.get(command) or self.extra_commands.get(command) assert command_info is not None, f"Unrecognised command {command}" path_string = command_info[1] path = string.Template(path_string).substitute(params) substitute_params = {word[1:] for word in path_string.split("/") if word.startswith("$")} # remove dollar sign if isinstance(params, dict) and substitute_params: for word in substitute_params: del params[word] data = utils.dump_json(params) url = f"{self._client_config.remote_server_addr}{path}" trimmed = self._trim_large_entries(params) LOGGER.debug("%s %s %s", command_info[0], url, str(trimmed)) return self._request(command_info[0], url, body=data)
def _request(self, method, url, body=None): """Send an HTTP request to the remote server. :Args: - method - A string for the HTTP method to send the request with. - url - A string for the URL to send the request to. - body - A string for request body. Ignored unless method is POST or PUT. :Returns: A dictionary with the server's parsed JSON response. """ parsed_url = parse.urlparse(url) headers = self.get_remote_connection_headers(parsed_url, self._client_config.keep_alive) auth_header = self._client_config.get_auth_header() if auth_header: headers.update(auth_header) if body and method not in ("POST", "PUT"): body = None if self._client_config.keep_alive: response = self._conn.request(method, url, body=body, headers=headers, timeout=self._client_config.timeout) statuscode = response.status else: conn = self._get_connection_manager() with conn as http: response = http.request(method, url, body=body, headers=headers, timeout=self._client_config.timeout) statuscode = response.status data = response.data.decode("UTF-8") LOGGER.debug("Remote response: status=%s | data=%s | headers=%s", response.status, data, response.headers) try: if 300 <= statuscode < 304: return self._request("GET", response.headers.get("location", None)) if 399 < statuscode <= 500: if statuscode == 401: return {"status": statuscode, "value": "Authorization Required"} return {"status": statuscode, "value": str(statuscode) if not data else data.strip()} content_type = [] if response.headers.get("Content-Type", None): content_type = response.headers.get("Content-Type", None).split(";") if not any([x.startswith("image/png") for x in content_type]): try: data = utils.load_json(data.strip()) except ValueError: if 199 < statuscode < 300: status = ErrorCode.SUCCESS else: status = ErrorCode.UNKNOWN_ERROR return {"status": status, "value": data.strip()} # Some drivers incorrectly return a response # with no 'value' field when they should return null. if "value" not in data: data["value"] = None return data data = {"status": 0, "value": data} return data finally: LOGGER.debug("Finished Request") response.close()
[docs] def close(self): """Clean up resources when finished with the remote_connection.""" if hasattr(self, "_conn"): self._conn.clear()
def _trim_large_entries(self, input_dict, max_length=100): """Truncate string values in a dictionary if they exceed max_length. :param dict: Dictionary with potentially large values :param max_length: Maximum allowed length of string values :return: Dictionary with truncated string values """ output_dictionary = {} for key, value in input_dict.items(): if isinstance(value, dict): output_dictionary[key] = self._trim_large_entries(value, max_length) elif isinstance(value, str) and len(value) > max_length: output_dictionary[key] = value[:max_length] + "..." else: output_dictionary[key] = value return output_dictionary