import logging
import typing
from lxml import etree
from requests_toolbelt.multipart.decoder import MultipartDecoder
from zeep import ns, plugins, wsa
from zeep.exceptions import Fault, TransportError, XMLSyntaxError
from zeep.loader import parse_xml
from zeep.utils import as_qname, get_media_type, qname_attr
from zeep.wsdl.attachments import MessagePack
from zeep.wsdl.definitions import Binding, Operation
from zeep.wsdl.messages import DocumentMessage, RpcMessage
from zeep.wsdl.messages.xop import process_xop
from zeep.wsdl.utils import etree_to_string, url_http_to_https
if typing.TYPE_CHECKING:
from zeep.wsdl.wsdl import Definition
logger = logging.getLogger(__name__)
class SoapBinding(Binding):
"""Soap 1.1/1.2 binding"""
def __init__(self, wsdl, name, port_name, transport, default_style):
"""The SoapBinding is the base class for the Soap11Binding and
Soap12Binding.
:param wsdl:
:type wsdl:
:param name:
:type name: string
:param port_name:
:type port_name: string
:param transport:
:type transport: zeep.transports.Transport
:param default_style:
"""
super().__init__(wsdl, name, port_name)
self.transport = transport
self.default_style = default_style
@classmethod
def match(cls, node):
"""Check if this binding instance should be used to parse the given
node.
:param node: The node to match against
:type node: lxml.etree._Element
"""
soap_node = node.find("soap:binding", namespaces=cls.nsmap)
return soap_node is not None
def create_message(self, operation, *args, **kwargs):
envelope, http_headers = self._create(operation, args, kwargs)
return envelope
def _create(self, operation, args, kwargs, client=None, options=None):
"""Create the XML document to send to the server.
Note that this generates the soap envelope without the wsse applied.
"""
operation_obj = self.get(operation)
if not operation_obj:
raise ValueError("Operation %r not found" % operation)
# Create the SOAP envelope
serialized = operation_obj.create(*args, **kwargs)
self._set_http_headers(serialized, operation_obj)
envelope = serialized.content
http_headers = serialized.headers
# Apply ws-addressing
if client:
if not options:
options = client.service._binding_options
if operation_obj.abstract.wsa_action:
envelope, http_headers = wsa.WsAddressingPlugin().egress(
envelope, http_headers, operation_obj, options
)
# Apply plugins
envelope, http_headers = plugins.apply_egress(
client, envelope, http_headers, operation_obj, options
)
# Apply WSSE
if client.wsse:
if isinstance(client.wsse, list):
for wsse in client.wsse:
envelope, http_headers = wsse.apply(envelope, http_headers)
else:
envelope, http_headers = client.wsse.apply(envelope, http_headers)
# Add extra http headers from the setings object
if client.settings.extra_http_headers:
http_headers.update(client.settings.extra_http_headers)
return envelope, http_headers
def send(self, client, options, operation, args, kwargs):
"""Called from the service
:param client: The client with which the operation was called
:type client: zeep.client.Client
:param options: The binding options
:type options: dict
:param operation: The operation object from which this is a reply
:type operation: zeep.wsdl.definitions.Operation
:param args: The args to pass to the operation
:type args: tuple
:param kwargs: The kwargs to pass to the operation
:type kwargs: dict
"""
envelope, http_headers = self._create(
operation, args, kwargs, client=client, options=options
)
response = client.transport.post_xml(options["address"], envelope, http_headers)
operation_obj = self.get(operation)
# If the client wants to return the raw data then let's do that.
if client.settings.raw_response:
return response
return self.process_reply(client, operation_obj, response)
async def send_async(self, client, options, operation, args, kwargs):
"""Called from the async service
:param client: The client with which the operation was called
:type client: zeep.client.Client
:param options: The binding options
:type options: dict
:param operation: The operation object from which this is a reply
:type operation: zeep.wsdl.definitions.Operation
:param args: The args to pass to the operation
:type args: tuple
:param kwargs: The kwargs to pass to the operation
:type kwargs: dict
"""
envelope, http_headers = self._create(
operation, args, kwargs, client=client, options=options
)
response = await client.transport.post_xml(
options["address"], envelope, http_headers
)
if client.settings.raw_response:
return response
operation_obj = self.get(operation)
return self.process_reply(client, operation_obj, response)
def process_reply(self, client, operation, response):
"""Process the XML reply from the server.
:param client: The client with which the operation was called
:type client: zeep.client.Client
:param operation: The operation object from which this is a reply
:type operation: zeep.wsdl.definitions.Operation
:param response: The response object returned by the remote server
:type response: requests.Response
"""
if response.status_code in (201, 202) and not response.content:
return None
elif response.status_code != 200 and not response.content:
raise TransportError(
"Server returned HTTP status %d (no content available)"
% response.status_code,
status_code=response.status_code,
)
content_type = response.headers.get("Content-Type", "text/xml")
media_type = get_media_type(content_type)
message_pack = None
# If the reply is a multipart/related then we need to retrieve all the
# parts
if media_type == "multipart/related":
decoder = MultipartDecoder(
response.content, content_type, response.encoding or "utf-8"
)
content = decoder.parts[0].content
if len(decoder.parts) > 1:
message_pack = MessagePack(parts=decoder.parts[1:])
else:
content = response.content
try:
doc = parse_xml(content, self.transport, settings=client.settings)
except XMLSyntaxError as exc:
raise TransportError(
"Server returned response (%s) with invalid XML: %s.\nContent: %r"
% (response.status_code, exc, response.content),
status_code=response.status_code,
content=response.content,
)
# Check if this is an XOP message which we need to decode first
if message_pack:
if process_xop(doc, message_pack):
message_pack = None
if client.wsse:
client.wsse.verify(doc)
doc, http_headers = plugins.apply_ingress(
client, doc, response.headers, operation
)
# If the response code is not 200 or if there is a Fault node available
# then assume that an error occured.
fault_node = doc.find("soap-env:Body/soap-env:Fault", namespaces=self.nsmap)
if response.status_code != 200 or fault_node is not None:
return self.process_error(doc, operation)
result = operation.process_reply(doc)
if message_pack:
message_pack._set_root(result)
return message_pack
return result
def process_error(self, doc, operation):
raise NotImplementedError
def process_service_port(self, xmlelement, force_https=False):
address_node = xmlelement.find("soap:address", namespaces=self.nsmap)
if address_node is None:
logger.debug("No valid soap:address found for service")
return
# Force the usage of HTTPS when the force_https boolean is true
location = address_node.get("location")
if force_https and location:
location = url_http_to_https(location)
if location != address_node.get("location"):
logger.warning("Forcing soap:address location to HTTPS")
return {"address": location}
@classmethod
def parse(cls, definitions, xmlelement):
"""
Definition::
*
<-- extensibility element (1) --> *
*
<-- extensibility element (2) --> *
?
<-- extensibility element (3) -->
?
<-- extensibility element (4) --> *
*
<-- extensibility element (5) --> *
"""
name = qname_attr(xmlelement, "name", definitions.target_namespace)
port_name = qname_attr(xmlelement, "type", definitions.target_namespace)
# The soap:binding element contains the transport method and
# default style attribute for the operations.
soap_node = xmlelement.find("soap:binding", namespaces=cls.nsmap)
transport = soap_node.get("transport")
supported_transports = [
"http://schemas.xmlsoap.org/soap/http",
"http://www.w3.org/2003/05/soap/bindings/HTTP/",
]
if transport not in supported_transports:
raise NotImplementedError(
"The binding transport %s is not supported (only soap/http)"
% (transport)
)
default_style = soap_node.get("style", "document")
obj = cls(definitions.wsdl, name, port_name, transport, default_style)
for node in xmlelement.findall("wsdl:operation", namespaces=cls.nsmap):
operation = SoapOperation.parse(definitions, node, obj, nsmap=cls.nsmap)
obj._operation_add(operation)
return obj
class Soap11Binding(SoapBinding):
nsmap = {
"soap": ns.SOAP_11,
"soap-env": ns.SOAP_ENV_11,
"wsdl": ns.WSDL,
"xsd": ns.XSD,
}
def process_error(self, doc, operation):
fault_node = doc.find("soap-env:Body/soap-env:Fault", namespaces=self.nsmap)
if fault_node is None:
raise Fault(
message="Unknown fault occured",
code=None,
actor=None,
detail=etree_to_string(doc),
)
def get_text(name):
child = fault_node.find(name, namespaces=fault_node.nsmap)
if child is not None:
return child.text
raise Fault(
message=get_text("faultstring"),
code=get_text("faultcode"),
actor=get_text("faultactor"),
detail=fault_node.find("detail", namespaces=fault_node.nsmap),
)
def _set_http_headers(self, serialized, operation):
serialized.headers["Content-Type"] = "text/xml; charset=utf-8"
class Soap12Binding(SoapBinding):
nsmap = {
"soap": ns.SOAP_12,
"soap-env": ns.SOAP_ENV_12,
"wsdl": ns.WSDL,
"xsd": ns.XSD,
}
def process_error(self, doc, operation):
fault_node = doc.find("soap-env:Body/soap-env:Fault", namespaces=self.nsmap)
if fault_node is None:
raise Fault(
message="Unknown fault occured",
code=None,
actor=None,
detail=etree_to_string(doc),
)
def get_text(name):
child = fault_node.find(name)
if child is not None:
return child.text
message = fault_node.findtext(
"soap-env:Reason/soap-env:Text", namespaces=self.nsmap
)
code = fault_node.findtext(
"soap-env:Code/soap-env:Value", namespaces=self.nsmap
)
# Extract the fault subcodes. These can be nested, as in subcodes can
# also contain other subcodes.
subcodes = []
subcode_element = fault_node.find(
"soap-env:Code/soap-env:Subcode", namespaces=self.nsmap
)
while subcode_element is not None:
subcode_value_element = subcode_element.find(
"soap-env:Value", namespaces=self.nsmap
)
subcode_qname = as_qname(
subcode_value_element.text, subcode_value_element.nsmap, None
)
subcodes.append(subcode_qname)
subcode_element = subcode_element.find(
"soap-env:Subcode", namespaces=self.nsmap
)
# TODO: We should use the fault message as defined in the wsdl.
detail_node = fault_node.find("soap-env:Detail", namespaces=self.nsmap)
raise Fault(
message=message,
code=code,
actor=None,
detail=detail_node,
subcodes=subcodes,
)
def _set_http_headers(self, serialized, operation):
serialized.headers["Content-Type"] = "; ".join(
[
"application/soap+xml",
"charset=utf-8",
'action="%s"' % operation.soapaction,
]
)
class SoapOperation(Operation):
"""Represent's an operation within a specific binding."""
def __init__(self, name, binding, nsmap, soapaction, style):
super().__init__(name, binding)
self.nsmap = nsmap
self.soapaction = soapaction
self.style = style
def process_reply(self, envelope):
envelope_qname = etree.QName(self.nsmap["soap-env"], "Envelope")
if envelope.tag != envelope_qname:
raise XMLSyntaxError(
(
"The XML returned by the server does not contain a valid "
+ "{%s}Envelope root element. The root element found is %s "
)
% (envelope_qname.namespace, envelope.tag)
)
if self.output:
return self.output.deserialize(envelope)
@classmethod
def parse(cls, definitions, xmlelement, binding, nsmap):
"""
Definition::
*
?
?
?
<-- extensibility element (4) --> *
*
<-- extensibility element (5) --> *
Example::
"""
name = xmlelement.get("name")
# The soap:operation element is required for soap/http bindings
# and may be omitted for other bindings.
soap_node = xmlelement.find("soap:operation", namespaces=binding.nsmap)
action = None
if soap_node is not None:
action = soap_node.get("soapAction")
style = soap_node.get("style", binding.default_style)
else:
style = binding.default_style
obj = cls(name, binding, nsmap, action, style)
if style == "rpc":
message_class = RpcMessage
else:
message_class = DocumentMessage
for node in xmlelement:
tag_name = etree.QName(node.tag).localname
if tag_name not in ("input", "output", "fault"):
continue
msg = message_class.parse(
definitions=definitions,
xmlelement=node,
operation=obj,
nsmap=nsmap,
type=tag_name,
)
if tag_name == "fault":
obj.faults[msg.name] = msg
else:
setattr(obj, tag_name, msg)
return obj
def resolve(self, definitions: "Definition"):
super().resolve(definitions)
for name, fault in self.faults.items():
if name in self.abstract.fault_messages:
fault.resolve(definitions, self.abstract.fault_messages[name])
if self.output:
self.output.resolve(definitions, self.abstract.output_message)
if self.input:
self.input.resolve(definitions, self.abstract.input_message)