import inspect
import json
import sys
from typing import Any, Optional, Union, Callable
from soar_sdk.asset import BaseAsset
from soar_sdk.input_spec import InputSpecification
from soar_sdk.compat import (
MIN_PHANTOM_VERSION,
PythonVersion,
)
from soar_sdk.shims.phantom_common.app_interface.app_interface import SoarRestClient
from soar_sdk.shims.phantom_common.encryption.encryption_manager_factory import (
platform_encryption_backend,
)
from soar_sdk.abstract import SOARClient, SOARClientAuth
from soar_sdk.action_results import ActionResult
from soar_sdk.actions_manager import ActionsManager
from soar_sdk.app_cli_runner import AppCliRunner
from soar_sdk.meta.webhooks import WebhookMeta
from soar_sdk.params import Params
from soar_sdk.app_client import AppClient
from soar_sdk.action_results import ActionOutput
from soar_sdk.logging import getLogger
from soar_sdk.types import Action
from soar_sdk.webhooks.routing import Router
from soar_sdk.webhooks.models import WebhookRequest, WebhookResponse
from soar_sdk.exceptions import ActionRegistrationError
import uuid
from soar_sdk.decorators import (
ConnectivityTestDecorator,
ActionDecorator,
ViewHandlerDecorator,
OnPollDecorator,
WebhookDecorator,
GenericActionDecorator,
)
def is_valid_uuid(value: str) -> bool:
"""Validates if a string is a valid UUID."""
try:
return str(uuid.UUID(value)).lower() == value.lower()
except ValueError:
return False
[docs]
class App:
"""Main application class for SOAR connectors.
This class provides the foundation for building SOAR connectors. It handles action registration, asset
management, test connectivity, polling, and webhook functionality.
The App class serves as the central coordinator for all app functionality,
providing decorators for action registration and managing the lifecycle of
SOAR operations.
Args:
name: Human-readable name of the app.
app_type: Type of the app (e.g., "investigative", "corrective").
logo: Path to the app's logo image.
logo_dark: Path to the app's dark theme logo image.
product_vendor: Vendor of the product this app integrates with.
product_name: Name of the product this app integrates with.
publisher: Publisher of the app.
appid: Unique UUID identifier for the app.
python_version: List of supported Python versions. Defaults to all supported versions.
min_phantom_version: Minimum required SOAR version. Defaults to configured minimum.
fips_compliant: Whether the app is FIPS compliant. Defaults to False.
asset_cls: Asset class to use for configuration. Defaults to BaseAsset.
Raises:
ValueError: If appid is not a valid UUID.
Example:
>>> app = App(
... name="My SOAR App",
... app_type="investigative",
... logo="logo.png",
... logo_dark="logo_dark.png",
... product_vendor="Acme Corp",
... product_name="Security Platform",
... publisher="My Company",
... appid="12345678-1234-5678-9012-123456789012",
... )
"""
[docs]
def __init__(
self,
*,
name: str,
app_type: str,
logo: str,
logo_dark: str,
product_vendor: str,
product_name: str,
publisher: str,
appid: str,
python_version: Optional[list[PythonVersion]] = None,
min_phantom_version: str = MIN_PHANTOM_VERSION,
fips_compliant: bool = False,
asset_cls: type[BaseAsset] = BaseAsset,
) -> None:
self.asset_cls = asset_cls
self._raw_asset_config: dict[str, Any] = {}
self.__logger = getLogger()
if not is_valid_uuid(appid):
raise ValueError(f"Appid is not a valid uuid: {appid}")
if python_version is None:
python_version = PythonVersion.all()
self.app_meta_info = {
"name": name,
"type": app_type,
"logo": logo,
"logo_dark": logo_dark,
"product_vendor": product_vendor,
"product_name": product_name,
"publisher": publisher,
"python_version": python_version,
"min_phantom_version": min_phantom_version,
"fips_compliant": fips_compliant,
"appid": appid,
}
self.actions_manager: ActionsManager = ActionsManager()
self.soar_client: SOARClient = AppClient()
def get_actions(self) -> dict[str, Action]:
"""Returns the list of actions registered in the app."""
return self.actions_manager.get_actions()
def cli(self) -> None:
"""This is just a handy shortcut for reducing imports in the main app code.
It uses AppRunner to run locally app the same way as main() in the legacy
connectors.
"""
runner = AppCliRunner(self)
runner.run()
def handle(self, raw_input_data: str, handle: Optional[int] = None) -> str:
"""Runs handling of the input data on connector.
NOTE: handle is actually a pointer address to spawn's internal state.
In versions of SOAR >6.4.1, handle will not be passed to the app.
"""
input_data = InputSpecification.parse_obj(json.loads(raw_input_data))
self._raw_asset_config = input_data.config.get_asset_config()
# Decrypt sensitive fields in the asset configuration
asset_id = input_data.asset_id
for field in self.asset_cls.fields_requiring_decryption():
if field in self._raw_asset_config:
self._raw_asset_config[field] = platform_encryption_backend.decrypt(
self._raw_asset_config[field], str(asset_id)
)
self.__logger.handler.set_handle(handle)
soar_auth = App.create_soar_client_auth_object(input_data)
self.soar_client.update_client(soar_auth, input_data.asset_id)
return self.actions_manager.handle(input_data, handle=handle)
@staticmethod
def create_soar_client_auth_object(
input_data: InputSpecification,
) -> SOARClientAuth:
"""Creates a SOARClientAuth object based on the input data.
This is used to authenticate the SOAR client before running actions.
"""
if input_data.user_session_token:
return SOARClientAuth(
user_session_token=input_data.user_session_token,
base_url=ActionsManager.get_soar_base_url(),
)
elif input_data.soar_auth:
return SOARClientAuth(
username=input_data.soar_auth.username,
password=input_data.soar_auth.password,
base_url=input_data.soar_auth.phantom_url,
)
else:
return SOARClientAuth(base_url=ActionsManager.get_soar_base_url())
__call__ = handle # the app instance can be called for ease of use by spawn3
@property
def asset(self) -> BaseAsset:
"""Returns the asset instance for the app."""
if not hasattr(self, "_asset"):
self._asset = self.asset_cls.parse_obj(self._raw_asset_config)
return self._asset
[docs]
def register_action(
self,
/,
action: Callable,
*,
name: Optional[str] = None,
identifier: Optional[str] = None,
description: Optional[str] = None,
verbose: str = "",
action_type: str = "generic", # TODO: consider introducing enum type for that
read_only: bool = True,
params_class: Optional[type[Params]] = None,
output_class: Optional[type[ActionOutput]] = None,
view_handler: Optional[Callable] = None,
view_template: Optional[str] = None,
versions: str = "EQ(*)",
) -> Action:
"""Dynamically register an action function defined in another module.
This method allows an app to dynamically import and register an action function
that is defined in a separate module. It provides a programmatic way to register
actions without using decorators directly on the action function.
Args:
action: Function import for the action. Must be a callable function that
follows SOAR action function conventions and is imported from another module.
name: Human-readable name for the action. If not provided, defaults
to the function name with underscores replaced by spaces.
identifier: Unique identifier for the action. If not provided, defaults
to the function name.
description: Brief description of what the action does. Used in the
app manifest and UI.
verbose: Detailed description or usage information for the action.
action_type: Type of action (e.g., "generic", "investigate", "correct").
Defaults to "generic".
read_only: Whether the action only reads data without making changes.
Defaults to True for safety.
params_class: Pydantic model class for validating action parameters.
If not provided, uses generic parameter validation.
output_class: Pydantic model class for structuring action output.
If not provided, uses generic output format.
view_handler: Optional raw view handler function to associate with this action.
Will be automatically decorated with the view_handler decorator.
view_template: Template name to use with the view handler. Only
relevant if view_handler is provided.
versions: Version constraint string for when this action is available.
Defaults to "EQ(*)" (all versions).
Returns:
The registered Action instance with all metadata and handlers configured.
Raises:
ActionRegistrationError: If view_handler is provided but cannot be
found in its original module for replacement.
Example:
>>> from my_actions_module import my_action_function
>>> from my_views_module import my_view_handler
>>>
>>> action = app.register_action(
... my_action_function,
... name="Dynamic Action",
... description="Action imported from another module",
... view_handler=my_view_handler,
... view_template="custom_template.html",
... )
"""
if view_handler:
decorated_view_handler = self.view_handler(template=view_template)(
view_handler
)
# Replace the function in its original module with the decorated version
if hasattr(view_handler, "__module__") and view_handler.__module__:
import sys
if (
original_module := sys.modules.get(view_handler.__module__)
) and hasattr(original_module, view_handler.__name__):
setattr(
original_module, view_handler.__name__, decorated_view_handler
)
else:
raise ActionRegistrationError(
f"View handler {view_handler.__name__} not found in its module {view_handler.__module__}"
)
view_handler = decorated_view_handler
return self.action(
name=name,
identifier=identifier,
description=description,
verbose=verbose,
action_type=action_type,
read_only=read_only,
params_class=params_class,
output_class=output_class,
view_handler=view_handler,
versions=versions,
)(action)
[docs]
def action(
self,
*,
name: Optional[str] = None,
identifier: Optional[str] = None,
description: Optional[str] = None,
verbose: str = "",
action_type: str = "generic", # TODO: consider introducing enum type for that
read_only: bool = True,
params_class: Optional[type[Params]] = None,
output_class: Optional[type[ActionOutput]] = None,
view_handler: Optional[Callable] = None,
versions: str = "EQ(*)",
) -> ActionDecorator:
"""Decorator for registering an action function.
This decorator marks a function as an action handler for the app. For more
information on how to write action functions, see the follow actions documentation:
- :doc:`Action Anatomy </actions/action_anatomy>`
- :doc:`Action Parameters </actions/action_params>`
- :doc:`Action Outputs </actions/action_outputs>`
"""
return ActionDecorator(
app=self,
name=name,
identifier=identifier,
description=description,
verbose=verbose,
action_type=action_type,
read_only=read_only,
params_class=params_class,
output_class=output_class,
view_handler=view_handler,
versions=versions,
)
[docs]
def test_connectivity(self) -> ConnectivityTestDecorator:
"""Decorator for registering a test connectivity function.
This decorator marks a function as the test connectivity action for the app.
Test connectivity is used to verify that the app can successfully connect to
its configured external service or API. Only one test connectivity function
is allowed per app.
Returns:
ConnectivityTestDecorator: A decorator instance that handles test
connectivity registration.
Example:
>>> @app.test_connectivity()
... def test_connectivity_handler(self, asset: Asset):
... logger.info(f"testing connectivity against {asset.base_url}")
Note:
The test connectivity function should not return anything or raise an exception if it fails.
"""
return ConnectivityTestDecorator(self)
[docs]
def on_poll(self) -> OnPollDecorator:
"""Decorator for the on_poll action.
The decorated function must be a generator (using yield) or return an Iterator that yields Container and/or Artifact objects. Only one on_poll action is allowed per app.
Usage:
If a Container is yielded first, all subsequent Artifacts will be added to that container unless they already have a `container_id`.
If an `Artifact` is yielded without a container and no `container_id` is set, it will be skipped.
Example:
>>> @app.on_poll()
... def on_poll(
... params: OnPollParams, client: SOARClient, asset: Asset
... ) -> Iterator[Union[Container, Artifact]]:
... yield Container(
... name="Network Alerts",
... description="Some network-related alerts",
... severity="medium",
... )
"""
return OnPollDecorator(self)
[docs]
def view_handler(
self,
*,
template: Optional[str] = None,
) -> ViewHandlerDecorator:
"""Decorator for custom view functions with output parsing and template rendering.
The decorated function receives parsed ActionOutput objects and can return either a dict for template rendering, HTML string, or component data model.
If a template is provided, dict results will be rendered using the template. Component type is automatically inferred from the return type annotation.
For more information on custom views, see the following :doc:`custom views documentation </custom_views/index>`:
Example:
>>> @app.view_handler(template="my_template.html")
... def my_view(outputs: List[MyActionOutput]) -> dict:
... return {"data": outputs[0].some_field}
>>> @app.view_handler()
... def my_chart_view(outputs: List[MyActionOutput]) -> PieChartData:
... return PieChartData(
... title="Chart",
... labels=["A", "B"],
... values=[1, 2],
... colors=["red", "blue"],
... )
"""
return ViewHandlerDecorator(self, template=template)
[docs]
def generic_action(
self, output_class: Optional[type[ActionOutput]] = None
) -> GenericActionDecorator:
"""Decorator for registering a generic action function.
This decorator marks a function as the generic action for the app. Generic action is used to call any endpoint of the underlying API service this app implements.
Only one generic action is allowed per app. The function you define needs to accept at least one parameter of type `GenericActionParams` and can accept any other parameters you need.
Other useful parameters to accept are the SOARClient and the asset.
Returns:
GenericActionDecorator: A decorator instance that handles generic action registration.
Example:
>>> @app.generic_action()
... def http_action(
... self, params: GenericActionParams, asset: Asset
... ) -> GenericActionOutput:
... logger.info(f"testing connectivity against {asset.base_url}")
... return GenericActionOutput(
... status_code=200,
... response_body=f"Base url is {asset.base_url}",
... )
Note:
The generic action function should return either a GenericActionOutput object or an output class derived from ActionOutput/GenericActionOutput.
"""
return GenericActionDecorator(self, output_class=output_class)
@staticmethod
def _validate_params_class(
action_name: str,
spec: inspect.FullArgSpec,
params_class: Optional[type[Params]] = None,
) -> type[Params]:
"""Validates the class used for params argument of the action.
Ensures the class is defined and provided as it is also used for building
the manifest JSON file.
"""
# validating params argument
validated_params_class = params_class or Params
if params_class is None:
# try to fetch from the function args typehints
if not len(spec.args):
raise TypeError(
"Action function must accept at least the params positional argument"
)
params_arg = spec.args[0]
annotated_params_type: Optional[type] = spec.annotations.get(params_arg)
if annotated_params_type is None:
raise TypeError(
f"Action {action_name} has no params type set. "
"The params argument must provide type which is derived "
"from Params class"
)
if issubclass(annotated_params_type, Params):
validated_params_class = annotated_params_type
else:
raise TypeError(
f"Proper params type for action {action_name} is not derived from Params class."
)
return validated_params_class
def _build_magic_args(self, function: Callable, **kwargs: object) -> dict[str, Any]:
"""Builds the auto-magic optional arguments for an action function.
This is used to pass the soar client and asset to the action function, when requested.
"""
# The reason we wrap values in callables is to avoid evaluating any lazy attributes
# (like asset) unless they're actually going to be used in the action function.
magic_args: dict[str, Union[object, Callable[[], object]]] = {
"soar": self.soar_client,
"asset": lambda: self.asset,
}
sig = inspect.signature(function)
for name, value_or_getter in magic_args.items():
given_value = kwargs.pop(name, None)
if name in sig.parameters:
# Give the original kwargs precedence over the magic args
value = (
value_or_getter() if callable(value_or_getter) else value_or_getter
)
kwargs[name] = given_value or value
return kwargs
@staticmethod
def _validate_params(params: Params, action_name: str) -> Params:
"""Validates input params, checking them against the use of proper Params class inheritance.
This is automatically covered by AppClient, but can be also useful for when
using in testing with mocked SOARClient implementation.
"""
if not isinstance(params, Params):
raise TypeError(
f"Provided params are not inheriting from Params class for action {action_name}"
)
return params
@staticmethod
def _adapt_action_result(
result: Union[ActionOutput, ActionResult, tuple[bool, str], bool],
actions_manager: ActionsManager,
action_params: Optional[Params] = None,
) -> bool:
"""Handles multiple ways of returning response from action.
The simplest result can be returned from the action as a tuple of success
boolean value and an extra message to add.
For backward compatibility, it also supports returning ActionResult object as
in the legacy Connectors.
"""
if isinstance(result, ActionOutput):
output_dict = result.dict()
param_dict = action_params.dict() if action_params else None
result = ActionResult(
status=True,
message=result.generate_action_summary_message(),
param=param_dict,
)
result.add_data(output_dict)
if isinstance(result, ActionResult):
actions_manager.add_result(result)
return result.get_status()
if isinstance(result, tuple) and 2 <= len(result) <= 3:
action_result = ActionResult(*result)
actions_manager.add_result(action_result)
return result[0]
return False
@staticmethod
def _dev_skip_in_pytest(function: Callable, inner: Action) -> None:
"""When running pytest, all actions with a name starting with test_ will be treated as test.
This method will mark them as to be skipped.
"""
if "pytest" in sys.modules and function.__name__.startswith("test_"):
# importing locally to not require this package in the runtime requirements
import pytest
pytest.mark.skip(inner)
webhook_meta: Optional[WebhookMeta] = None
webhook_router: Optional[Router] = None
[docs]
def enable_webhooks(
self,
default_requires_auth: bool = True,
default_allowed_headers: Optional[list[str]] = None,
default_ip_allowlist: Optional[list[str]] = None,
) -> "App":
"""Enable webhook functionality for the app.
This method configures the app to handle incoming webhook requests by setting
up the security and routing configurations.
Args:
default_requires_auth: Whether webhooks require authentication by default.
default_allowed_headers: List of HTTP headers allowed in webhook requests.
default_ip_allowlist: List of IP addresses/CIDR blocks allowed to send webhooks.
Defaults to ["0.0.0.0/0", "::/0"] (allow all).
Returns:
The App instance for method chaining.
Example:
>>> app.enable_webhooks(
... default_requires_auth=True,
... default_allowed_headers=["X-Custom-Header"],
... default_ip_allowlist=["192.168.1.0/24"],
... )
"""
if default_allowed_headers is None:
default_allowed_headers = []
if default_ip_allowlist is None:
default_ip_allowlist = ["0.0.0.0/0", "::/0"]
self.webhook_meta = WebhookMeta(
handler=None, # The handler is set by the ManifestProcessor when generating the final manifest
requires_auth=default_requires_auth,
allowed_headers=default_allowed_headers,
ip_allowlist=default_ip_allowlist,
)
self.webhook_router = Router()
return self
def webhook(
self, url_pattern: str, allowed_methods: Optional[list[str]] = None
) -> WebhookDecorator:
"""Decorator for registering a webhook handler."""
return WebhookDecorator(self, url_pattern, allowed_methods)
def handle_webhook(
self,
method: str,
headers: dict[str, str],
path_parts: list[str],
query: dict[str, Union[str, list[str], None]],
body: Optional[str],
asset: dict,
soar_rest_client: SoarRestClient,
) -> dict:
"""Handles the incoming webhook request."""
if self.webhook_router is None:
raise RuntimeError("Webhooks are not enabled for this app.")
self._raw_asset_config = asset
_, soar_auth_token = soar_rest_client.session.headers["Cookie"].split("=")
asset_id = soar_rest_client.asset_id
soar_base_url = soar_rest_client.base_url
soar_auth = SOARClientAuth(
user_session_token=soar_auth_token,
base_url=soar_base_url,
)
self.soar_client.update_client(soar_auth, asset_id)
normalized_query = {}
for key, value in query.items():
# Normalize query parameters to always be a list
# This is needed because SOAR prior to 7.0.0 used to flatten query parameters to the last item per key
# SOAR 7.0.0+ will normalize all query parameters to lists, with an "empty" parameter expressed as a list containing an empty string
if value is None:
normalized_query[key] = [""]
elif isinstance(value, list):
normalized_query[key] = value
else:
normalized_query[key] = [value]
request = WebhookRequest(
method=method,
headers=headers,
path_parts=path_parts,
query=normalized_query,
body=body,
asset=self.asset,
soar_auth_token=soar_auth_token,
soar_base_url=soar_base_url,
asset_id=asset_id,
)
response = self.webhook_router.handle_request(request)
if not isinstance(response, WebhookResponse):
raise TypeError(
f"Webhook handler must return a WebhookResponse, got {type(response)}"
)
return response.dict()