# Source code for soar_sdk.abstract

from abc import abstractmethod
from collections.abc import AsyncIterable, Iterable, Mapping
from typing import Any, Generic, TypeVar

import httpx
from pydantic import field_validator
from pydantic.dataclasses import dataclass

from soar_sdk.action_results import ActionOutput
from soar_sdk.apis.artifact import Artifact
from soar_sdk.apis.container import Container
from soar_sdk.apis.vault import Vault

# Union of the value types accepted for a JSON request body. The container
# members are typed loosely (``Any``), so nesting is not checked recursively.
JSONType = dict[str, Any] | list[Any] | str | int | float | bool | None
# Type variable for an action's custom summary object; bound to ActionOutput,
# so only ActionOutput or its subclasses may be substituted.
SummaryType = TypeVar("SummaryType", bound=ActionOutput)


@dataclass
class SOARClientAuth:
    """Authentication credentials for the SOAR API.

    Only ``base_url`` is required; every credential field defaults to the
    empty string. ``base_url`` is normalized by :meth:`validate_phantom_url`
    so that it always carries an explicit scheme.
    """

    base_url: str
    username: str = ""
    password: str = ""
    user_session_token: str = ""
    # Broker authentication (populated when running on AB)
    user_hash_key: str = ""
    broker_ph_auth_token: str = ""

    @field_validator("base_url")
    @classmethod
    def validate_phantom_url(cls, value: str) -> str:
        """Validate and format the base URL for the SOAR API.

        Args:
            value: The base URL (or bare host) supplied by the caller.

        Returns:
            ``value`` unchanged when it already starts with ``http://`` or
            ``https://`` (compared case-insensitively); otherwise ``value``
            prefixed with ``https://``.
        """
        # URL schemes are case-insensitive, so "HTTPS://host" must be treated
        # as already having a scheme rather than being prefixed a second time.
        if value.lower().startswith(("http://", "https://")):
            return value
        return f"https://{value}"


class SOARClient(Generic[SummaryType]):
    """An API interface for interacting with the Splunk SOAR Platform.

    Subclasses supply the concrete HTTP client and the vault / artifact /
    container API objects, plus the summary and message accessors. This base
    class provides thin ``get`` / ``post`` / ``put`` / ``delete`` wrappers
    around ``self.client`` and the Enterprise Security findings helpers that
    are built on top of ``post``.
    """

    # NOTE(review): the only base class is ``Generic``, which does not use
    # ``abc.ABCMeta``; the ``@abstractmethod`` markers below therefore document
    # intent but are not enforced at instantiation time. Confirm whether that
    # is intentional before switching to ``abc.ABC``.

    # Upper bound on the number of findings accepted by create_findings_bulk.
    MAX_BULK_FINDINGS = 500

    @property
    @abstractmethod
    def client(self) -> httpx.Client:
        """Generic HTTP client. Subclasses must define."""
        pass

    @property
    @abstractmethod
    def vault(self) -> Vault:
        """Object governing interaction with the SOAR Vault API. Subclasses must define."""
        pass

    @property
    @abstractmethod
    def artifact(self) -> Artifact:
        """Object governing interaction with the SOAR artifact API. Subclasses must define."""
        pass

    @property
    @abstractmethod
    def container(self) -> Container:
        """Object governing interaction with the SOAR container API. Subclasses must define."""
        pass

    @abstractmethod
    def get_executing_container_id(self) -> int:
        """Return the current Container ID passed in the Connector Run Action JSON."""
        pass

    @abstractmethod
    def get_asset_id(self) -> str:
        """Return the current Asset ID passed in the Connector Run Action JSON."""
        pass

    def create_finding(self, finding: dict[str, Any]) -> dict[str, Any]:
        """Create a single finding in ES via the SOAR proxy.

        Args:
            finding: The finding payload, sent as the JSON request body.

        Returns:
            The decoded JSON body of the response.

        Raises:
            httpx.HTTPStatusError: If the response has an error status
                (raised by ``post`` via ``raise_for_status``).
        """
        response = self.post(
            "/rest/enterprise_security/findings", json=finding, timeout=30.0
        )
        return response.json()

    def create_findings_bulk(
        self,
        findings: list[dict[str, Any]],
        container_ids: list[int | None] | None = None,
    ) -> dict[str, Any]:
        """Create findings in ES via the SOAR bulk endpoint.

        Accepts up to 500 findings per call. When ``container_ids`` is
        provided, the backend links pre-existing containers to the created
        findings instead of creating new ones.

        Returns the bulk response containing ``status``, ``created``,
        ``failed``, ``findings`` (list of IDs), ``container_ids`` (list of
        SOAR container IDs), and ``errors``.

        Args:
            findings: The finding payloads to create. An empty list short-
                circuits and returns a synthetic "success" response without
                issuing a request.
            container_ids: Optional container IDs to send alongside the
                findings.

        Raises:
            ValueError: If more than ``MAX_BULK_FINDINGS`` findings are given.
            httpx.HTTPStatusError: If the response has an error status.
        """
        if not findings:
            # Nothing to send: mirror the shape of a real bulk response.
            return {
                "status": "success",
                "created": 0,
                "failed": 0,
                "findings": [],
                "container_ids": [],
                "errors": [],
            }

        if len(findings) > self.MAX_BULK_FINDINGS:
            raise ValueError(
                f"Maximum {self.MAX_BULK_FINDINGS} findings per request, got {len(findings)}"
            )

        endpoint = "/rest/enterprise_security/findings/bulk_create"
        # Scale the timeout with the batch size (5s per finding), never below 30s.
        timeout = max(30.0, len(findings) * 5.0)

        # Without container IDs the body is the bare list of findings; with
        # them, both lists are wrapped in an object.
        payload: dict[str, Any] | list = findings
        if container_ids is not None:
            payload = {"findings": findings, "container_ids": container_ids}

        response = self.post(endpoint, json=payload, timeout=timeout)
        return response.json()

    def upload_finding_attachment(
        self,
        finding_id: str,
        file_name: str,
        data: bytes,
        source_type: str = "Incident",
        is_raw_email: bool = True,
    ) -> dict[str, Any]:
        """Upload an attachment to a finding via the SOAR proxy.

        Args:
            finding_id: ID of the finding; interpolated into the endpoint path.
            file_name: Name to send for the attachment.
            data: Raw attachment bytes; sent base64-encoded.
            source_type: Value sent as ``source_type`` in the request body.
            is_raw_email: Value sent as ``is_raw_email`` in the request body.

        Returns:
            The decoded JSON body of the response.

        Raises:
            httpx.HTTPStatusError: If the response has an error status.
        """
        import base64

        endpoint = f"/rest/enterprise_security/findings/{finding_id}/files"
        # JSON cannot carry raw bytes, so the content travels as base64 text.
        encoded_data = base64.b64encode(data).decode("utf-8")
        response = self.post(
            endpoint,
            json={
                "file_name": file_name,
                "data": encoded_data,
                "source_type": source_type,
                "is_raw_email": is_raw_email,
            },
            timeout=30.0,
        )
        return response.json()

    # NOTE(review): in the four HTTP wrappers below the ``timeout`` default of
    # ``None`` is forwarded to httpx as-is (unlike ``auth``, which falls back
    # to ``httpx.USE_CLIENT_DEFAULT``). httpx documents ``timeout=None`` as
    # "no timeout" — confirm this is the intended default.

    def get(
        self,
        endpoint: str,
        *,
        params: dict[str, Any] | httpx.QueryParams | None = None,
        headers: dict[str, str] | None = None,
        cookies: dict[str, str] | None = None,
        timeout: float | httpx.Timeout | None = None,
        auth: httpx.Auth | tuple[str, str] | None = None,
        follow_redirects: bool = False,
        extensions: Mapping[str, Any] | None = None,
    ) -> httpx.Response:
        """Perform a GET request to the specific endpoint using the SOAR client.

        All keyword arguments are forwarded to ``self.client.get``; a falsy
        ``auth`` is replaced by ``httpx.USE_CLIENT_DEFAULT``.

        Returns:
            The response, after ``raise_for_status()`` has succeeded.

        Raises:
            httpx.HTTPStatusError: If the response has an error status.
        """
        response = self.client.get(
            endpoint,
            params=params,
            headers=headers,
            cookies=cookies,
            timeout=timeout,
            auth=auth or httpx.USE_CLIENT_DEFAULT,
            follow_redirects=follow_redirects,
            extensions=extensions,
        )
        response.raise_for_status()
        return response

    def post(
        self,
        endpoint: str,
        *,
        content: str | bytes | Iterable[bytes] | AsyncIterable[bytes] | None = None,
        data: Mapping[str, Any] | None = None,
        files: dict[str, Any] | None = None,
        json: JSONType | None = None,
        params: dict[str, Any] | None = None,
        headers: dict[str, str] | None = None,
        cookies: dict[str, str] | None = None,
        auth: httpx.Auth | tuple[str, str] | None = None,
        timeout: float | httpx.Timeout | None = None,
        follow_redirects: bool = True,
        extensions: Mapping[str, Any] | None = None,
    ) -> httpx.Response:
        """Perform a POST request to the specific endpoint using the SOAR client.

        A ``Referer`` header built from the client's base URL and ``endpoint``
        is always set, overriding any ``Referer`` supplied in ``headers``.
        The caller's ``headers`` mapping is not modified.

        Returns:
            The response, after ``raise_for_status()`` has succeeded.

        Raises:
            httpx.HTTPStatusError: If the response has an error status.
        """
        # Build a fresh dict so the caller's mapping is never mutated.
        # NOTE(review): an endpoint with a leading "/" yields "//" here.
        headers = {
            **(headers or {}),
            "Referer": f"{self.client.base_url}/{endpoint}",
        }
        response = self.client.post(
            endpoint,
            headers=headers,
            content=content,
            data=data,
            files=files,
            json=json,
            params=params,
            cookies=cookies,
            auth=auth or httpx.USE_CLIENT_DEFAULT,
            timeout=timeout,
            follow_redirects=follow_redirects,
            extensions=extensions,
        )
        response.raise_for_status()
        return response

    def put(
        self,
        endpoint: str,
        *,
        content: str | bytes | Iterable[bytes] | AsyncIterable[bytes] | None = None,
        data: Mapping[str, Any] | None = None,
        files: dict[str, Any] | None = None,
        json: JSONType | None = None,
        params: dict[str, Any] | None = None,
        headers: dict[str, str] | None = None,
        cookies: dict[str, str] | None = None,
        auth: httpx.Auth | tuple[str, str] | None = None,
        timeout: float | httpx.Timeout | None = None,
        follow_redirects: bool = True,
        extensions: Mapping[str, Any] | None = None,
    ) -> httpx.Response:
        """Perform a PUT request to the specific endpoint using the SOAR client.

        A ``Referer`` header built from the client's base URL and ``endpoint``
        is always set, overriding any ``Referer`` supplied in ``headers``.
        The caller's ``headers`` mapping is not modified.

        Returns:
            The response, after ``raise_for_status()`` has succeeded.

        Raises:
            httpx.HTTPStatusError: If the response has an error status.
        """
        # Build a fresh dict so the caller's mapping is never mutated.
        # NOTE(review): an endpoint with a leading "/" yields "//" here.
        headers = {
            **(headers or {}),
            "Referer": f"{self.client.base_url}/{endpoint}",
        }
        response = self.client.put(
            endpoint,
            headers=headers,
            content=content,
            data=data,
            files=files,
            json=json,
            params=params,
            cookies=cookies,
            auth=auth or httpx.USE_CLIENT_DEFAULT,
            timeout=timeout,
            follow_redirects=follow_redirects,
            extensions=extensions,
        )
        response.raise_for_status()
        return response

    def delete(
        self,
        endpoint: str,
        *,
        params: dict[str, Any] | httpx.QueryParams | None = None,
        headers: dict[str, str] | None = None,
        cookies: dict[str, str] | None = None,
        auth: httpx.Auth | tuple[str, str] | None = None,
        timeout: float | httpx.Timeout | None = None,
        follow_redirects: bool = False,
        extensions: Mapping[str, Any] | None = None,
    ) -> httpx.Response:
        """Perform a DELETE request to the specific endpoint using the SOAR client.

        A ``Referer`` header built from the client's base URL and ``endpoint``
        is always set, overriding any ``Referer`` supplied in ``headers``.
        The caller's ``headers`` mapping is not modified.

        Returns:
            The response, after ``raise_for_status()`` has succeeded.

        Raises:
            httpx.HTTPStatusError: If the response has an error status.
        """
        # Build a fresh dict so the caller's mapping is never mutated.
        # NOTE(review): an endpoint with a leading "/" yields "//" here.
        headers = {
            **(headers or {}),
            "Referer": f"{self.client.base_url}/{endpoint}",
        }
        response = self.client.delete(
            endpoint,
            params=params,
            headers=headers,
            cookies=cookies,
            auth=auth or httpx.USE_CLIENT_DEFAULT,
            timeout=timeout,
            follow_redirects=follow_redirects,
            extensions=extensions,
        )
        response.raise_for_status()
        return response

    def get_soar_base_url(self) -> str:
        """Get the base URL for the running SOAR system.

        This base implementation returns a fixed placeholder URL.

        Example:
            https://splunk.soar/
        """
        return "https://localhost:9999/"

    @abstractmethod
    def update_client(
        self, soar_auth: SOARClientAuth, asset_id: str, container_id: int = 0
    ) -> None:
        """Hook to update the SOAR API client before any actions run with the input data.

        An example of what this function might do is authenticate the API client.
        """
        pass

    @abstractmethod
    def set_summary(self, summary: SummaryType) -> None:
        """Set the custom summary object for the action run."""
        pass

    @abstractmethod
    def set_message(self, message: str) -> None:
        """Set the summary message for the action run."""
        pass

    @abstractmethod
    def get_summary(self) -> SummaryType | None:
        """Get the summary for the action run."""
        pass

    @abstractmethod
    def get_message(self) -> str:
        """Get the summary message for the action run."""
        pass