Skip to content

Developer Reference

testcompose.client.base_docker_client.BaseDockerClient (ABC)

Source code in testcompose/client/base_docker_client.py
class BaseDockerClient(ABC):
    def __init__(self, client_env_param: ClientFromEnv, client_url_param: ClientFromUrl) -> None:
        super(BaseDockerClient, self).__init__()
        _client_url_param: ClientFromUrl = ClientFromUrl()
        _client_env_param: ClientFromEnv = ClientFromEnv()

        if client_env_param:
            _client_env_param = client_env_param

        if client_url_param:
            _client_url_param = client_url_param

        self.docker_client = self._init_docker_client(
            client_url_param=_client_url_param, client_env_param=_client_env_param
        )

    @property
    def docker_client(self) -> DockerClient:
        return self._docker_client

    @docker_client.setter
    def docker_client(self, client: DockerClient) -> None:
        self._docker_client: DockerClient = client

    def _init_docker_client(
        self, *, client_url_param: ClientFromUrl, client_env_param: ClientFromEnv
    ) -> DockerClient:
        _docker_client: DockerClient = self._docker_client_from_env(client_env_param)
        if client_url_param.docker_host:
            _docker_client = self._docker_client_from_url(client_url_param)

        _docker_client.ping()
        return _docker_client

    def _docker_client_from_env(self, client_env_param: ClientFromEnv) -> DockerClient:
        return docker.from_env(
            version=client_env_param.version,
            timeout=client_env_param.timeout,
            max_pool_size=client_env_param.max_pool_size,
            use_ssh_client=client_env_param.use_ssh_client,
            ssl_version=client_env_param.ssl_version,
            assert_hostname=client_env_param.assert_hostname,
            environment=client_env_param.environment,
        )

    def _docker_client_from_url(self, client_url_param: ClientFromUrl) -> DockerClient:
        return DockerClient(
            base_url=client_url_param.docker_host,
            version=client_url_param.version,
            timeout=client_url_param.timeout,
            tls=client_url_param.tls,
            user_agent=client_url_param.user_agent,
            credstor_env=client_url_param.credstor_env,
            use_ssh_client=client_url_param.use_ssh_client,
            max_pool_size=client_url_param.max_pool_size,
        )

    def registry_login(self, login_credentials: Login) -> None:
        if login_credentials.registry:
            self.docker_client.login(**login_credentials.dict())

    def pull_docker_image(self, image_name: str) -> None:
        try:
            self.docker_client.images.get(name=image_name)
        except ImageNotFound:
            self.docker_client.images.pull(repository=image_name)
        except Exception:
            logger.error(traceback.format_exc())

docker_client: DockerClient property writable

pull_docker_image(self, image_name: str) -> None

Source code in testcompose/client/base_docker_client.py
def pull_docker_image(self, image_name: str) -> None:
    try:
        self.docker_client.images.get(name=image_name)
    except ImageNotFound:
        self.docker_client.images.pull(repository=image_name)
    except Exception:
        logger.error(traceback.format_exc())

registry_login(self, login_credentials: Login) -> None

Source code in testcompose/client/base_docker_client.py
def registry_login(self, login_credentials: Login) -> None:
    if login_credentials.registry:
        self.docker_client.login(**login_credentials.dict())

testcompose.configs.parse_config.TestConfigParser

Source code in testcompose/configs/parse_config.py
class TestConfigParser:
    @classmethod
    def parse_config(cls, file_name: str) -> ContainerServices:
        """parses and verifies test yaml config file

        Args:
            file_name (str): absolute path of the config file

        Raises:
            FileNotFoundError: when config file not present
            AttributeError: when config file is empty

        Returns:
            ConfigServices: A ConfigServices object with all named services in the config
        """
        if not os.path.exists(file_name):
            raise FileNotFoundError(f"Config file {file_name} does not exist!!")

        contents: Dict[str, Any] = dict()
        with open(file_name, 'r') as fh:
            contents = yaml.safe_load(fh)

        if not contents:
            raise AttributeError("Config content can not be empty")

        services: Dict[str, ContainerService] = dict()
        for service in contents["services"]:
            services.update({service["name"]: ContainerService(**service)})

        container_services: ContainerServices = ContainerServices(services=services)
        return container_services

parse_config(file_name: str) -> ContainerServices classmethod

parses and verifies test yaml config file

Parameters:

Name Type Description Default
file_name str

absolute path of the config file

required

Exceptions:

Type Description
FileNotFoundError

when config file not present

AttributeError

when config file is empty

Returns:

Type Description
ConfigServices

A ConfigServices object with all named services in the config

Source code in testcompose/configs/parse_config.py
@classmethod
def parse_config(cls, file_name: str) -> ContainerServices:
    """parses and verifies test yaml config file

    Args:
        file_name (str): absolute path of the config file

    Raises:
        FileNotFoundError: when config file not present
        AttributeError: when config file is empty

    Returns:
        ConfigServices: A ConfigServices object with all named services in the config
    """
    if not os.path.exists(file_name):
        raise FileNotFoundError(f"Config file {file_name} does not exist!!")

    contents: Dict[str, Any] = dict()
    with open(file_name, 'r') as fh:
        contents = yaml.safe_load(fh)

    if not contents:
        raise AttributeError("Config content can not be empty")

    services: Dict[str, ContainerService] = dict()
    for service in contents["services"]:
        services.update({service["name"]: ContainerService(**service)})

    container_services: ContainerServices = ContainerServices(services=services)
    return container_services

testcompose.configs.service_config.Config

This class consumes the model created from a config file. This is an important class that sets the precedence of how the different containers are to be started and stopped. Usually, the precedence are set correctly if the depends_on parameter of the config is set. Cyclic dependency will fail the test before it starts.

Parameters:

Name Type Description Default
test_services ConfigServices

model resulting from a parsed configuration file.

required
Source code in testcompose/configs/service_config.py
class Config:
    """This class consumes the model created from a config file.
    This is an important class that sets the precedence of how the
    different containers are to be started and stopped. Usually, the
    precedence are set correctly if the `depends_on` parameter of the
    config is set. Cyclic dependency will fail the test before it starts.

    Args:
        test_services (ConfigServices): model resulting from a parsed configuration file.
    """

    def __init__(self, test_services: ContainerServices) -> None:
        self._rank_test_services(test_services)

    @property
    def ranked_config_services(self) -> RankedContainerServices:
        """Object containing the ordered services from the config

        Returns:
            RankedServices: ranked container services
        """
        return self._ranked_it_services

    @ranked_config_services.setter
    def ranked_config_services(self, ranked_services: RankedContainerServices) -> None:
        self._ranked_it_services: RankedContainerServices = ranked_services

    def _rank_test_services(self, test_services: ContainerServices) -> None:
        """
        Args:
            test_services (ConfigServices): model resulting from a parsed configuration file.

        Raises:
            ValueError: raised if test_services is `null`
            AttributeError: raised if no concreate networking is provided
        """
        if not test_services:
            logger.error("Config content can not be Null")
            raise ValueError

        if not test_services.services:
            logger.error("No service was found in the provided config")
            raise ValueError

        _processed_containers: Dict[str, int] = self._compute_container_ranks(
            ranked_services=dict(),
            config_services=test_services,
        )

        _processed_containers_reversed: Dict[int, str] = {
            rank: service for service, rank in _processed_containers.items()
        }
        self.ranked_config_services = RankedContainerServices(ranked_services=_processed_containers_reversed)

    def _compute_container_ranks(
        self,
        *,
        ranked_services: Dict[str, int],
        config_services: ContainerServices,
    ) -> Dict[str, int]:
        """The main method that computes the ranking of the services specified
        in the config.

        Args:
            ranked_services (Dict[str, int]): dict container service name and their assigned ranks
            config_services (ConfigServices): config services generated from the supplied configuration file

        Raises:
            AttributeError: raised to prevent empty configuration properties to be passed to this function
            ValueError: rasied when cyclic dependency is detected

        Returns:
            Dict[str, int]: A list of ranked service models.
        """
        _ranked_services: Dict[str, int] = deepcopy(ranked_services)
        if not config_services:
            raise AttributeError("A valid config for test services must be provided")

        rank: int = len(ranked_services.keys())
        if len(config_services.services.keys()) == len(ranked_services.keys()):
            return ranked_services
        else:
            services: Dict[str, ContainerService] = {
                x: y for x, y in config_services.services.items() if x not in _ranked_services
            }
            for service_name, service in services.items():
                if not service.depends_on:
                    _ranked_services.update({service_name: rank})
                    rank += 1
                else:
                    if set(service.depends_on).issubset(
                        _ranked_services.keys()
                    ) and not self._check_cyclic_dependency(
                        [config_services.services[x] for x in _ranked_services], service_name
                    ):
                        _ranked_services.update({service_name: rank})
                        rank += 1
                    elif not set(service.depends_on).issubset(list(config_services.services.keys())):
                        raise AttributeError(
                            f"Invalid service name or dependencies detected: {service_name} <=> {set(service.depends_on)}"
                        )
            return self._compute_container_ranks(
                ranked_services=_ranked_services, config_services=config_services
            )

    @staticmethod
    def _check_cyclic_dependency(
        processed_services: List[ContainerService], dependent_service_name: str
    ) -> bool:
        for service in processed_services:
            if set([dependent_service_name]).issubset(service.depends_on):
                return True
        return False

ranked_config_services: RankedContainerServices property writable

Object containing the ordered services from the config

Returns:

Type Description
RankedServices

ranked container services

testcompose.configs.generate_template_config.GenerateConfigTemplate

Source code in testcompose/configs/generate_template_config.py
class GenerateConfigTemplate:
    def _remove_keys(self, keys: List[str], template_dict: Dict[str, Any]):
        _template = deepcopy(template_dict)
        for k in keys:
            _template.pop(k)
        return _template

    def _app_db_template(self) -> Dict[str, List[Dict[str, Any]]]:
        _app = self._remove_keys(["http_wait_parameters"], simple_app_or_db())
        _app["name"] = "app"
        _app["environment"] = {
            "DB_URL": "${db.db_user}:${db.db_password}@${db.container_hostname}:5432/${db.db_name}"
        }
        _app["depends_on"] = ["db"]
        _db = self._remove_keys(["http_wait_parameters", "depends_on"], simple_app_or_db())
        _db["name"] = "db"
        _db["environment"] = {"DB_USER": "user", "DB_PASSWORD": "very-secret", "DB_NAME": "some_db_name"}
        return {"services": [_app, _db]}

    def app_template(self) -> Dict[str, Any]:
        _template = self._remove_keys(["depends_on"], simple_app_or_db())
        _template['name'] = 'app'
        return {"services": [_template]}

    def db_template(self) -> Dict[str, Any]:
        _template = self._remove_keys(["http_wait_parameters", "depends_on"], simple_app_or_db())
        _template['name'] = 'db'
        return {"services": [_template]}

    def broker_template(self) -> Dict[str, Any]:
        return simple_broker()

    def app_db_template(self) -> Dict[str, Any]:
        return self._app_db_template()

    def app_broker_db_template(self) -> Dict[str, Any]:
        _services: List[Dict[str, Any]] = list()
        for x in self._app_db_template()['services']:
            app = deepcopy(x)
            if x.get('name') == 'app':
                app["depends_on"].append("broker")
            _services.append(app)

        _broker_template: Dict[str, Any] = simple_broker()
        _services.extend(_broker_template['services'])
        _broker_template['services'] = _services
        return _broker_template

    def app_broker_template(self) -> Dict[str, Any]:
        _app_template: Dict[str, Any] = simple_app_or_db()
        _app_template['depends_on'] = ['broker']
        _app_template['name'] = 'app'
        _broker_template: Dict[str, Any] = simple_broker()
        _services: List[Dict[str, Any]] = _broker_template['services']
        _services.append(_app_template)
        _broker_template['services'] = _services
        return _broker_template

    def broker_db_template(self) -> Dict[str, Any]:
        _db_template: Dict[str, Any] = simple_app_or_db()
        _db_template['name'] = 'db'
        _broker_template: Dict[str, Any] = simple_broker()
        _services: List[Dict[str, Any]] = _broker_template['services']
        _services.append(_db_template)
        _broker_template['services'] = _services
        return _broker_template

app_broker_db_template(self) -> Dict[str, Any]

Source code in testcompose/configs/generate_template_config.py
def app_broker_db_template(self) -> Dict[str, Any]:
    _services: List[Dict[str, Any]] = list()
    for x in self._app_db_template()['services']:
        app = deepcopy(x)
        if x.get('name') == 'app':
            app["depends_on"].append("broker")
        _services.append(app)

    _broker_template: Dict[str, Any] = simple_broker()
    _services.extend(_broker_template['services'])
    _broker_template['services'] = _services
    return _broker_template

app_broker_template(self) -> Dict[str, Any]

Source code in testcompose/configs/generate_template_config.py
def app_broker_template(self) -> Dict[str, Any]:
    _app_template: Dict[str, Any] = simple_app_or_db()
    _app_template['depends_on'] = ['broker']
    _app_template['name'] = 'app'
    _broker_template: Dict[str, Any] = simple_broker()
    _services: List[Dict[str, Any]] = _broker_template['services']
    _services.append(_app_template)
    _broker_template['services'] = _services
    return _broker_template

app_db_template(self) -> Dict[str, Any]

Source code in testcompose/configs/generate_template_config.py
def app_db_template(self) -> Dict[str, Any]:
    return self._app_db_template()

app_template(self) -> Dict[str, Any]

Source code in testcompose/configs/generate_template_config.py
def app_template(self) -> Dict[str, Any]:
    _template = self._remove_keys(["depends_on"], simple_app_or_db())
    _template['name'] = 'app'
    return {"services": [_template]}

broker_db_template(self) -> Dict[str, Any]

Source code in testcompose/configs/generate_template_config.py
def broker_db_template(self) -> Dict[str, Any]:
    _db_template: Dict[str, Any] = simple_app_or_db()
    _db_template['name'] = 'db'
    _broker_template: Dict[str, Any] = simple_broker()
    _services: List[Dict[str, Any]] = _broker_template['services']
    _services.append(_db_template)
    _broker_template['services'] = _services
    return _broker_template

broker_template(self) -> Dict[str, Any]

Source code in testcompose/configs/generate_template_config.py
def broker_template(self) -> Dict[str, Any]:
    return simple_broker()

db_template(self) -> Dict[str, Any]

Source code in testcompose/configs/generate_template_config.py
def db_template(self) -> Dict[str, Any]:
    _template = self._remove_keys(["http_wait_parameters", "depends_on"], simple_app_or_db())
    _template['name'] = 'db'
    return {"services": [_template]}

testcompose.containers.base_container.BaseContainer

Source code in testcompose/containers/base_container.py
class BaseContainer:
    def __init__(self) -> None:
        self._image_pull_policy: str = 'ALWAYS_PULL'

    @property
    def image(self) -> str:
        return self._image

    @image.setter
    def image(self, name: str) -> None:
        if not name:
            raise ValueError("A valid Image entity must be provided")
        self._image: str = name

    @property
    def command(self) -> str:
        return self._command

    @command.setter
    def command(self, command: str) -> None:
        self._command: str = command

    @property
    def entry_point(self) -> str:
        return self._entry_point

    @entry_point.setter
    def entry_point(self, entry_point: str) -> None:
        self._entry_point: str = entry_point

    @property
    def host_name(self) -> str:
        return self._host_name

    @host_name.setter
    def host_name(self, host_name: str) -> None:
        self._host_name: str = host_name

    @property
    def network(self) -> str:
        return self._network

    @network.setter
    def network(self, network: str) -> None:
        self._network: str = network

    @property
    def http_waiter(self) -> ContainerHttpWaitParameter:
        return self._http_waiter

    @http_waiter.setter
    def http_waiter(self, http_waiter: ContainerHttpWaitParameter) -> None:
        self._http_waiter: ContainerHttpWaitParameter = http_waiter

    @property
    def log_waiter(self) -> ContainerLogWaitParameter:
        return self._log_waiter

    @log_waiter.setter
    def log_waiter(self, log_waiter: ContainerLogWaitParameter) -> None:
        self._log_waiter: ContainerLogWaitParameter = log_waiter

    @property
    def ports(self) -> Dict[int, Any]:
        return self._ports

    @ports.setter
    def ports(self, ports: List[str]) -> None:
        self._ports: Dict[int, Any] = self._exposed_ports(ports)

    @property
    def container_environment_variables(self) -> Dict[str, Any]:
        return self._container_environment_variables

    @container_environment_variables.setter
    def container_environment_variables(self, env: Dict[str, Any]) -> None:
        self._container_environment_variables: Dict[str, Any] = deepcopy(env)

    @property
    def volumes(self) -> Dict[str, Dict[str, str]]:
        return self._volumes

    @volumes.setter
    def volumes(self, volumes: List[ContainerVolumeMap]) -> None:
        self._volumes: Dict[str, Dict[str, str]] = self._container_volumes(volumes)

    def with_service(
        self,
        service: ContainerService,
        processed_containers_services: Dict[str, RunningContainer],
        network: str,
    ) -> 'BaseContainer':
        """
        The initialization method that converts a config
        service into a Generic Container. It leverages other
        internal _with methods to assign the complete container
        properties.

        Args:
            service (Service): a config service
            processed_containers_services (Dict[str, Any]): a dict of a service that
                had already been initiated and the container is running.
            network (str): the test network name, to attach all containers to.

        Returns:
            BaseServiceContainer
        """
        self.image = service.image
        self.command = service.command
        (
            substituted_env_variables,
            modified_exposed_ports,
        ) = ContainerUtils.replace_container_config_placeholders(
            service_env_variables=service.environment,
            running_containers=processed_containers_services,
            service_name=service.name,
            exposed_ports=service.exposed_ports,
        )
        self.container_environment_variables = substituted_env_variables
        self.ports = modified_exposed_ports
        self.http_waiter = service.http_wait_parameters  # type: ignore
        self.volumes = service.volumes
        self.entry_point = service.entrypoint  # type: ignore
        self.log_waiter = service.log_wait_parameters  # type: ignore
        self.host_name = service.name
        self.network = network
        return self

    def _exposed_ports(self, ports: Optional[List[str]]) -> Dict[int, Any]:
        """List of exposed port to be assigned random port
        numbers on the host. Random ports are exposed to the
        host. A fixed port can be assigned on the host by providing
        the port in the format **[host_port:container_port]**

        Args:
            ports (Optional[List[str]]): list of container exposed ports
        """
        exposed_ports: Dict[int, Any] = dict()
        if ports:
            for port in ports:
                _ports: List[str] = self._generate_exposed_ports(re.sub(r"(\s)", "", port))
                for _port in _ports:
                    _split_port: List[str] = _port.split(":")
                    if len(_split_port) == 2:
                        exposed_ports[int(_split_port[1])] = int(_split_port[0])
                    else:
                        exposed_ports[int(_port)] = None
        return exposed_ports

    def _generate_exposed_ports(self, port: str) -> List[str]:
        matches: List[Any] = re.findall(r"\-", port)
        unprocessed_ports: List[str] = list()
        if len(matches) == 0:
            unprocessed_ports.append(port)
        elif len(matches) == 1:
            start, end = str(port).split("-")
            if int(end) <= int(start):
                raise AttributeError(
                    f"Start exposed port {start} must be less than end exposed port {end} for port ranges!"
                )
            unprocessed_ports = [str(x) for x in range(int(start), int(end) + 1)]
        else:
            raise AttributeError("Allowed exposed port format is host:container or port1 - port2")
        return unprocessed_ports

    def _container_volumes(
        self, volumes: Optional[List[ContainerVolumeMap]] = None
    ) -> Dict[str, Dict[str, str]]:
        """A list of volume mappings to be mounted in the container.

            VolumeMapping:
                host: host volume path or a docker volume name
                container: path to mount the host volume in the container
                mode: volume mode [ro|rw]
                source: source of the volume [filesystem|dockervolume]
                        filesystem: a file or directory to be mounted
                        dockervolume: a docker volume (existing or to be created)

        Args:
            volumes (Optional[List[VolumeMapping]]): Optional list of volumes to mount on the container
        """
        mapped_volumes: Dict[str, Dict[str, str]] = dict()
        if volumes:
            for vol in volumes:
                host_bind: Optional[str] = None
                if vol.source == VolumeSourceTypes.DOCKER_VOLUME_SOURCE:
                    host_bind = vol.host
                elif vol.source == VolumeSourceTypes.FILESYSTEM_SOURCE:
                    host_bind = str(pathlib.Path(vol.host).absolute())
                if not host_bind:
                    raise ValueError("Volume source can only be one of local|docker")
                mapped_volumes[host_bind] = {"bind": vol.container, "mode": vol.mode}
        return mapped_volumes

    @abstractmethod
    def start(self) -> None:
        raise NotImplementedError

    @abstractmethod
    def stop(self, force=True, delete_volume=True) -> None:
        raise NotImplementedError

command: str property writable

container_environment_variables: Dict[str, Any] property writable

entry_point: str property writable

host_name: str property writable

http_waiter: ContainerHttpWaitParameter property writable

image: str property writable

log_waiter: ContainerLogWaitParameter property writable

network: str property writable

ports: Dict[int, Any] property writable

volumes: Dict[str, Dict[str, str]] property writable

start(self) -> None

Source code in testcompose/containers/base_container.py
@abstractmethod
def start(self) -> None:
    raise NotImplementedError

stop(self, force = True, delete_volume = True) -> None

Source code in testcompose/containers/base_container.py
@abstractmethod
def stop(self, force=True, delete_volume=True) -> None:
    raise NotImplementedError

with_service(self, service: ContainerService, processed_containers_services: Dict[str, testcompose.models.container.running_container.RunningContainer], network: str) -> BaseContainer

The initialization method that converts a config service into a Generic Container. It leverages other internal _with methods to assign the complete container properties.

Parameters:

Name Type Description Default
service Service

a config service

required
processed_containers_services Dict[str, Any]

a dict of a service that had already been initiated and the container is running.

required
network str

the test network name, to attach all containers to.

required

Returns:

Type Description
BaseContainer

BaseServiceContainer

Source code in testcompose/containers/base_container.py
def with_service(
    self,
    service: ContainerService,
    processed_containers_services: Dict[str, RunningContainer],
    network: str,
) -> 'BaseContainer':
    """
    The initialization method that converts a config
    service into a Generic Container. It leverages other
    internal _with methods to assign the complete container
    properties.

    Args:
        service (Service): a config service
        processed_containers_services (Dict[str, Any]): a dict of a service that
            had already been initiated and the container is running.
        network (str): the test network name, to attach all containers to.

    Returns:
        BaseServiceContainer
    """
    self.image = service.image
    self.command = service.command
    (
        substituted_env_variables,
        modified_exposed_ports,
    ) = ContainerUtils.replace_container_config_placeholders(
        service_env_variables=service.environment,
        running_containers=processed_containers_services,
        service_name=service.name,
        exposed_ports=service.exposed_ports,
    )
    self.container_environment_variables = substituted_env_variables
    self.ports = modified_exposed_ports
    self.http_waiter = service.http_wait_parameters  # type: ignore
    self.volumes = service.volumes
    self.entry_point = service.entrypoint  # type: ignore
    self.log_waiter = service.log_wait_parameters  # type: ignore
    self.host_name = service.name
    self.network = network
    return self

testcompose.containers.container_network.ContainerNetwork

Network management for running container.This class ensures all containers in the same test belong to the same network. Contians utility to create and cleanup network.

Parameters:

Name Type Description Default
docker_client DockerClient

Docker client

required
network_name str

Name of test network

required
Source code in testcompose/containers/container_network.py
class ContainerNetwork:
    """Network management for running container.This class ensures
    all containers in the same test belong to the same network.
    Contians utility to create and cleanup network.

    Args:
        docker_client (DockerClient): Docker client
        network_name (str): Name of test network
    """

    def __init__(
        self, docker_client: DockerClient, network_name: str, labels: Dict[str, str] = dict()
    ) -> None:
        self._docker_client: DockerClient = docker_client
        self._assign_group_network(network_name, labels=labels)

    @property
    def network(self) -> Network:
        """Network Object

        Returns:
            docker.models.networks.Network: container network object
        """
        return self._container_network

    @network.setter
    def network(self, network: Network) -> None:
        self._container_network: Network = network

    @property
    def name(self) -> Optional[str]:
        return self.network.name

    @property
    def network_id(self) -> Optional[str]:
        return self.network.short_id

    def remove_network(self) -> None:
        try:
            if self.network.name not in ['bridge', 'none', 'host']:
                self.network.remove()
        except Exception as exc:
            print(f"Test network could not be removed. Still dangling ... {exc}")

    def _assign_group_network(
        self,
        network_name: str,
        labels: Dict[str, str] = dict(),
        driver: str = DefaultNeworkDrivers.DEFAULT_BRIDGE_NETWORK,
    ) -> None:
        try:
            self.network = (self._docker_client.networks.list(names=[network_name]))[0]  # type: ignore
        except Exception:
            self.network = self._docker_client.networks.create(
                name=network_name,
                driver=driver,
                check_duplicate=True,
                internal=False,
                labels=labels or None,
                enable_ipv6=False,
                attachable=True,
                scope='local',
            )  # type: ignore

name: Optional[str] property readonly

network: Network property writable

Network Object

Returns:

Type Description
docker.models.networks.Network

container network object

network_id: Optional[str] property readonly

remove_network(self) -> None

Source code in testcompose/containers/container_network.py
def remove_network(self) -> None:
    try:
        if self.network.name not in ['bridge', 'none', 'host']:
            self.network.remove()
    except Exception as exc:
        print(f"Test network could not be removed. Still dangling ... {exc}")

testcompose.containers.container_utils.ContainerUtils

Source code in testcompose/containers/container_utils.py
class ContainerUtils:
    @staticmethod
    def replace_container_config_placeholders(
        service_env_variables: Dict[str, Any],
        running_containers: Dict[str, RunningContainer],
        service_name: str,
        exposed_ports: List[str],
    ) -> Tuple[Dict[str, Any], List[str]]:
        """Utility method to replace placeholders in the service containers.
        Placeholders are usually of the form *${container_name.containerenv_variable}*.

        Args:
            service_env_variables (Dict[str, Any]): Dict of config environment variables
            running_containers Dict[str, RunningContainer]: Running container object
            service_name (str): service name as specified in the config
            exposed_ports (List[str]): container exposed ports

        Raises:
            ValueError: when a placeholder variable is not of the form service_name.variable_name
            AttributeError: When a service name could not be found in the list of services obtained from the
                            provided config file.

        Returns:
            Tuple[Dict[str, Any], List[str]]: A tuple of `env_config` and `exposed_ports`
        """
        pattern: str = "\\$\\{([^}]*)}"
        substituted_env_variables: Dict[str, Any] = copy(service_env_variables)
        modified_exposed_ports: List[str] = deepcopy(exposed_ports)
        cmpl: Any = re.compile(pattern=pattern).findall
        for k, v in service_env_variables.items():
            if isinstance(v, str):
                replaced_variable: str = v
                for occurence in cmpl(v):
                    if len(str(occurence).split(".")) != 2:
                        raise ValueError
                    container_name, variable_name = str(occurence).split(".")
                    value = None
                    value, _exposed_ports = ContainerUtils._external_ports_variables(
                        running_containers,
                        service_name,
                        container_name,
                        variable_name,
                        modified_exposed_ports,
                    )
                    if _exposed_ports:
                        modified_exposed_ports = deepcopy(_exposed_ports)
                    replaced_variable = replaced_variable.replace(f"${{{occurence}}}", str(value))
                substituted_env_variables[k] = replaced_variable
        return substituted_env_variables, modified_exposed_ports

    @staticmethod
    def _get_free_host_port() -> str:
        """Get a free random port number from the container host

        Returns:
            str: port number
        """
        _socket = socket.socket(family=socket.AF_INET, type=socket.SOCK_STREAM)
        _socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
        _socket.settimeout(2)
        _socket.bind(("", 0))
        _, port = _socket.getsockname()
        _socket.close()
        return port

    @staticmethod
    def _external_ports_variables(
        running_containers: Dict[str, RunningContainer],
        service_name: str,
        container_name: str,
        variable_name: str,
        exposed_ports: List[str] = list(),
    ) -> Tuple[Optional[str], List[str]]:
        value: Optional[str] = None
        _exposed_ports: List[str] = list()
        if container_name.lower() == SupportedPlaceholders.SELF_HOST or variable_name.lower() in [
            SupportedPlaceholders.CONTAINER_HOSTNAME,
            SupportedPlaceholders.EXTERNAL_PORT,
            SupportedPlaceholders.CONTAINER_HOST_ADDRESS,
        ]:
            if (
                container_name.lower() == SupportedPlaceholders.SELF_HOST
                and variable_name.lower() == SupportedPlaceholders.CONTAINER_HOSTNAME
            ):
                value = service_name
            elif (
                container_name.lower() != SupportedPlaceholders.SELF_HOST
                and variable_name.lower() == SupportedPlaceholders.CONTAINER_HOSTNAME
            ):
                value = container_name
            else:
                if variable_name.lower().startswith(SupportedPlaceholders.EXTERNAL_PORT):
                    value, _exposed_ports = ContainerUtils._external_port_variables(
                        variable_name, exposed_ports
                    )
                elif variable_name.lower() == SupportedPlaceholders.CONTAINER_HOST_ADDRESS:
                    value = socket.gethostbyname(socket.gethostname())
        else:
            value = running_containers[
                f"{container_name.lower()}"
            ].generic_container.container_environment_variables[f"{variable_name.upper()}"]

        return value, _exposed_ports

    @staticmethod
    def _external_port_variables(variable_name: str, exposed_ports: List[str]) -> Tuple[str, List[str]]:
        _exposed_ports: List[str] = deepcopy(exposed_ports)
        container_port: str = re.sub(SupportedPlaceholders.EXTERNAL_PORT + "_", "", variable_name)
        host_port: str = ContainerUtils._get_free_host_port()
        if container_port and container_port not in exposed_ports:
            raise AttributeError(
                f"self.hostport_{container_port} must be a valid supplied exposed_ports value!"
            )
        _exposed_ports.remove(container_port)
        _exposed_ports.append(f"{host_port}:{container_port}")
        value: str = str(host_port)
        return value, _exposed_ports

replace_container_config_placeholders(service_env_variables: Dict[str, Any], running_containers: Dict[str, testcompose.models.container.running_container.RunningContainer], service_name: str, exposed_ports: List[str]) -> Tuple[Dict[str, Any], List[str]] staticmethod

Utility method to replace placeholders in the service containers. Placeholders are usually of the form ${container_name.containerenv_variable}.

Parameters:

Name Type Description Default
service_env_variables Dict[str, Any]

Dict of config environment variables

required
running_containers Dict[str, RunningContainer]

Running container object

required
service_name str

service name as specified in the config

required
exposed_ports List[str]

container exposed ports

required

Exceptions:

Type Description
ValueError

when a placeholder variable is not of the form service_name.variable_name

AttributeError

When a service name could not be found in the list of services obtained from the provided config file.

Returns:

Type Description
Tuple[Dict[str, Any], List[str]]

A tuple of env_config and exposed_ports

Source code in testcompose/containers/container_utils.py
@staticmethod
def replace_container_config_placeholders(
    service_env_variables: Dict[str, Any],
    running_containers: Dict[str, RunningContainer],
    service_name: str,
    exposed_ports: List[str],
) -> Tuple[Dict[str, Any], List[str]]:
    """Utility method to replace placeholders in the service containers.
    Placeholders are usually of the form *${container_name.containerenv_variable}*.

    Args:
        service_env_variables (Dict[str, Any]): Dict of config environment variables
        running_containers Dict[str, RunningContainer]: Running container object
        service_name (str): service name as specified in the config
        exposed_ports (List[str]): container exposed ports

    Raises:
        ValueError: when a placeholder variable is not of the form service_name.variable_name
        AttributeError: When a service name could not be found in the list of services obtained from the
                        provided config file.

    Returns:
        Tuple[Dict[str, Any], List[str]]: A tuple of `env_config` and `exposed_ports`
    """
    pattern: str = "\\$\\{([^}]*)}"
    substituted_env_variables: Dict[str, Any] = copy(service_env_variables)
    modified_exposed_ports: List[str] = deepcopy(exposed_ports)
    cmpl: Any = re.compile(pattern=pattern).findall
    for k, v in service_env_variables.items():
        if isinstance(v, str):
            replaced_variable: str = v
            for occurence in cmpl(v):
                if len(str(occurence).split(".")) != 2:
                    raise ValueError
                container_name, variable_name = str(occurence).split(".")
                value = None
                value, _exposed_ports = ContainerUtils._external_ports_variables(
                    running_containers,
                    service_name,
                    container_name,
                    variable_name,
                    modified_exposed_ports,
                )
                if _exposed_ports:
                    modified_exposed_ports = deepcopy(_exposed_ports)
                replaced_variable = replaced_variable.replace(f"${{{occurence}}}", str(value))
            substituted_env_variables[k] = replaced_variable
    return substituted_env_variables, modified_exposed_ports

testcompose.containers.generic_container.GenericContainer (BaseContainer)

Source code in testcompose/containers/generic_container.py
class GenericContainer(BaseContainer):
    def __init__(self) -> None:
        super().__init__()

    @property
    def container_label(self) -> str:
        return self._container_label

    @container_label.setter
    def container_label(self, label: str) -> None:
        self._container_label = label

    @property
    def container_network(self) -> ContainerNetwork:
        return self._container_network

    @container_network.setter
    def container_network(self, network: ContainerNetwork) -> None:
        self._container_network: ContainerNetwork = network

    @property
    def container(self) -> Container:
        return self._container

    @container.setter
    def container(self, container: Container) -> None:
        self._container: Container = container

    @property
    def container_attr(self) -> RunningContainerAttributes:
        """Running container attributes

        Returns:
            RunningContainerAttributes: Container attribute object
        """
        return self._container_attr

    @container_attr.setter
    def container_attr(self, atrr: Dict[str, Any]) -> None:
        """Running container attributes. Execute reload() to refresh this
        property.

        Args:
            atrr (RunningContainerAttributes): container attributes
        """
        self._container_attr: RunningContainerAttributes = RunningContainerAttributes(**atrr)

    def start(self, docker_client: DockerClient) -> Container:
        """Start a container"""
        if not docker_client.ping():
            raise RuntimeError("Docker Client not Running. Please check your docker settings and try again")

        return docker_client.containers.run(
            image=self.image,
            command=self.command,
            detach=True,
            environment=self.container_environment_variables,
            ports=self.ports,
            volumes=self.volumes,
            entrypoint=self.entry_point,
            auto_remove=True,
            remove=True,
            network=self.network,
            hostname=self.host_name,
            labels=[self.container_label],
        )  # type: ignore

    def check_container_health(self, docker_client: DockerClient, timeout: int = 120) -> None:
        start_time: datetime = datetime.now()
        while (datetime.now() - start_time).total_seconds() < timeout:
            logger.info(
                f"Waiting for containe:{self.container.name} status to \
                change from {self.container.status} to {PossibleContainerStates.RUNNING}"
            )
            self.container.reload()
            if self.container.status == PossibleContainerStates.RUNNING:
                logger.info(f"Container:{self.container.name} status changed to {self.container.status}")
                break
            sleep(2)

        self.reload(docker_client, self.get_container_id())
        if self.container.status != PossibleContainerStates.RUNNING:
            for line in self.container.logs(stream=True):
                logger.debug(line.decode())
            raise RuntimeError(f"Container is in an unwanted state {self.container.status}")

        self.reload(docker_client, self.get_container_id())

        LogWaiter.search_container_logs(docker_client, self.container, self.log_waiter)

        self.reload(docker_client, self.get_container_id())

        if self.http_waiter:
            mapped_http_port: Dict[str, str] = dict()
            mapped_http_port[str(self.http_waiter.http_port)] = self.get_exposed_port(  # type: ignore
                str(self.http_waiter.http_port)
            )
            EndpointWaiters.wait_for_http(
                docker_client,
                self.get_container_id(),  # type: ignore
                self.http_waiter,
                mapped_http_port,  # type: ignore
            )

    def stop(self, force=True, delete_volume=True) -> None:
        """Stop a running container
        Args:
            force (bool, optional): [description]. Defaults to True.
            delete_volume (bool, optional): [description]. Defaults to True.
        """
        try:
            if self.container:
                self.container.remove(v=delete_volume, force=force)
        except APIError as exc:
            logger.error(exc)

    def reload(self, docker_client, container_id) -> None:
        """Reload the attributes of a running container"""
        if is_container_still_running(docker_client, container_id):
            self.container.reload()
            self.container_attr = self.container.attrs  # type: ignore

    def get_exposed_port(self, port: str) -> Optional[str]:
        """Get host port bound to the container exposed port

        Args:
            port (str): container exposed port

        Returns:
            str: Host port bound to the container exposed port
        """
        if not port:
            return None
        return self._get_mapped_container_ports([port])[port]

    def _get_mapped_container_ports(self, exposed_ports: List[str]) -> Dict[str, str]:
        """Host port bound to the container returned as a k/v of the
        container exposed port as key and the host bound port as the value.

        Args:
            ports (List[str]): List of container exposed port to be mapped to host port

        Returns:
            Dict[str, str]: Mapped container-host ports.
        """
        mapped_ports: Dict[str, str] = dict()
        ports: Dict[str, Any] = self.container_attr.NetworkSettings.Ports
        for port in ports:
            container_port: str = re.sub("[^0-9]", "", port)
            if container_port in exposed_ports and ports[port] and isinstance(ports[port], list):
                host_ports: ContainerMappedPorts = ContainerMappedPorts(**(ports[port][0]))
                mapped_ports.update({container_port: host_ports.HostPort})
        return mapped_ports

    def get_container_id(self) -> Optional[str]:
        """Container id

        Returns:
            Optional[str]: Container Id
        """
        if self.container:
            return self.container.id
        return None

    def get_container_host_ip(self) -> str:
        """Container Host IP address

        Returns:
            str: container host IP Address
        """
        return socket.gethostbyname(socket.gethostname())

    def exe_command(self, command: Union[str, List[str]]) -> Tuple[int, ByteString]:
        """Execute a command inside a container after it has started running.

        Args:
            command (Union[str, List[str]]): command to run in the container

        Raises:
            RuntimeError: when the container object is not set

        Returns:
            Tuple[int, ByteString]: A tuple of (exit_code, output)
        """
        if not self.container:
            raise RuntimeError("Container must already be running to exec a command")
        return self.container.exec_run(cmd=command)

container: Container property writable

container_attr: RunningContainerAttributes property writable

Running container attributes

Returns:

Type Description
RunningContainerAttributes

Container attribute object

container_label: str property writable

container_network: ContainerNetwork property writable

check_container_health(self, docker_client: DockerClient, timeout: int = 120) -> None

Source code in testcompose/containers/generic_container.py
def check_container_health(self, docker_client: DockerClient, timeout: int = 120) -> None:
    start_time: datetime = datetime.now()
    while (datetime.now() - start_time).total_seconds() < timeout:
        logger.info(
            f"Waiting for containe:{self.container.name} status to \
            change from {self.container.status} to {PossibleContainerStates.RUNNING}"
        )
        self.container.reload()
        if self.container.status == PossibleContainerStates.RUNNING:
            logger.info(f"Container:{self.container.name} status changed to {self.container.status}")
            break
        sleep(2)

    self.reload(docker_client, self.get_container_id())
    if self.container.status != PossibleContainerStates.RUNNING:
        for line in self.container.logs(stream=True):
            logger.debug(line.decode())
        raise RuntimeError(f"Container is in an unwanted state {self.container.status}")

    self.reload(docker_client, self.get_container_id())

    LogWaiter.search_container_logs(docker_client, self.container, self.log_waiter)

    self.reload(docker_client, self.get_container_id())

    if self.http_waiter:
        mapped_http_port: Dict[str, str] = dict()
        mapped_http_port[str(self.http_waiter.http_port)] = self.get_exposed_port(  # type: ignore
            str(self.http_waiter.http_port)
        )
        EndpointWaiters.wait_for_http(
            docker_client,
            self.get_container_id(),  # type: ignore
            self.http_waiter,
            mapped_http_port,  # type: ignore
        )

exe_command(self, command: Union[str, List[str]]) -> Tuple[int, ByteString]

Execute a command inside a container after it has started running.

Parameters:

Name Type Description Default
command Union[str, List[str]]

command to run in the container

required

Exceptions:

Type Description
RuntimeError

when the container object is not set

Returns:

Type Description
Tuple[int, ByteString]

A tuple of (exit_code, output)

Source code in testcompose/containers/generic_container.py
def exe_command(self, command: Union[str, List[str]]) -> Tuple[int, ByteString]:
    """Execute a command inside a container after it has started running.

    Args:
        command (Union[str, List[str]]): command to run in the container

    Raises:
        RuntimeError: when the container object is not set

    Returns:
        Tuple[int, ByteString]: A tuple of (exit_code, output)
    """
    if not self.container:
        raise RuntimeError("Container must already be running to exec a command")
    return self.container.exec_run(cmd=command)

get_container_host_ip(self) -> str

Container Host IP address

Returns:

Type Description
str

container host IP Address

Source code in testcompose/containers/generic_container.py
def get_container_host_ip(self) -> str:
    """Container Host IP address

    Returns:
        str: container host IP Address
    """
    return socket.gethostbyname(socket.gethostname())

get_container_id(self) -> Optional[str]

Container id

Returns:

Type Description
Optional[str]

Container Id

Source code in testcompose/containers/generic_container.py
def get_container_id(self) -> Optional[str]:
    """Container id

    Returns:
        Optional[str]: Container Id
    """
    if self.container:
        return self.container.id
    return None

get_exposed_port(self, port: str) -> Optional[str]

Get host port bound to the container exposed port

Parameters:

Name Type Description Default
port str

container exposed port

required

Returns:

Type Description
str

Host port bound to the container exposed port

Source code in testcompose/containers/generic_container.py
def get_exposed_port(self, port: str) -> Optional[str]:
    """Get host port bound to the container exposed port

    Args:
        port (str): container exposed port

    Returns:
        str: Host port bound to the container exposed port
    """
    if not port:
        return None
    return self._get_mapped_container_ports([port])[port]

reload(self, docker_client, container_id) -> None

Reload the attributes of a running container

Source code in testcompose/containers/generic_container.py
def reload(self, docker_client, container_id) -> None:
    """Reload the attributes of a running container"""
    if is_container_still_running(docker_client, container_id):
        self.container.reload()
        self.container_attr = self.container.attrs  # type: ignore

start(self, docker_client: DockerClient) -> Container

Start a container

Source code in testcompose/containers/generic_container.py
def start(self, docker_client: DockerClient) -> Container:
    """Start a container"""
    if not docker_client.ping():
        raise RuntimeError("Docker Client not Running. Please check your docker settings and try again")

    return docker_client.containers.run(
        image=self.image,
        command=self.command,
        detach=True,
        environment=self.container_environment_variables,
        ports=self.ports,
        volumes=self.volumes,
        entrypoint=self.entry_point,
        auto_remove=True,
        remove=True,
        network=self.network,
        hostname=self.host_name,
        labels=[self.container_label],
    )  # type: ignore

stop(self, force = True, delete_volume = True) -> None

Stop a running container

Parameters:

Name Type Description Default
force bool

[description]. Defaults to True.

True
delete_volume bool

[description]. Defaults to True.

True
Source code in testcompose/containers/generic_container.py
def stop(self, force=True, delete_volume=True) -> None:
    """Stop a running container
    Args:
        force (bool, optional): [description]. Defaults to True.
        delete_volume (bool, optional): [description]. Defaults to True.
    """
    try:
        if self.container:
            self.container.remove(v=delete_volume, force=force)
    except APIError as exc:
        logger.error(exc)

testcompose.models.container.supported_placeholders.SupportedPlaceholders dataclass

SupportedPlaceholders(SELF_HOST: str = 'self', CONTAINER_HOSTNAME: str = 'container_hostname', EXTERNAL_PORT: str = 'external_port', CONTAINER_HOST_ADDRESS: str = 'container_host_address')

Source code in testcompose/models/container/supported_placeholders.py
@dataclass(frozen=True)
class SupportedPlaceholders:
    SELF_HOST: str = 'self'
    CONTAINER_HOSTNAME: str = 'container_hostname'
    EXTERNAL_PORT: str = 'external_port'
    CONTAINER_HOST_ADDRESS: str = 'container_host_address'

CONTAINER_HOSTNAME: str dataclass-field

CONTAINER_HOST_ADDRESS: str dataclass-field

EXTERNAL_PORT: str dataclass-field

SELF_HOST: str dataclass-field

testcompose.models.client.client_login.ClientFromEnv (BaseModel) pydantic-model

Source code in testcompose/models/client/client_login.py
class ClientFromEnv(BaseModel):
    use_ssh_client: bool = False
    ssl_version: Optional[int] = None
    assert_hostname: Optional[bool] = None
    environment: Optional[Dict[str, Any]] = None
    version: str = "auto"
    timeout: Optional[int] = DEFAULT_TIMEOUT_SECONDS
    max_pool_size: Optional[int] = DEFAULT_MAX_POOL_SIZE

    @validator('version')
    def validate_version(cls, v):
        return v or "auto"

    @validator('timeout')
    def validate_timeout(cls, v):
        return v or DEFAULT_TIMEOUT_SECONDS

    @validator('max_pool_size')
    def validate_max_pool_size(cls, v):
        return v or DEFAULT_MAX_POOL_SIZE

assert_hostname: bool pydantic-field

environment: Dict[str, Any] pydantic-field

max_pool_size: int pydantic-field

ssl_version: int pydantic-field

timeout: int pydantic-field

use_ssh_client: bool pydantic-field

version: str pydantic-field

validate_max_pool_size(v) classmethod

Source code in testcompose/models/client/client_login.py
@validator('max_pool_size')
def validate_max_pool_size(cls, v):
    return v or DEFAULT_MAX_POOL_SIZE

validate_timeout(v) classmethod

Source code in testcompose/models/client/client_login.py
@validator('timeout')
def validate_timeout(cls, v):
    return v or DEFAULT_TIMEOUT_SECONDS

validate_version(v) classmethod

Source code in testcompose/models/client/client_login.py
@validator('version')
def validate_version(cls, v):
    return v or "auto"

testcompose.models.client.client_login.ClientFromUrl (BaseModel) pydantic-model

Source code in testcompose/models/client/client_login.py
class ClientFromUrl(BaseModel):
    docker_host: Optional[str] = None
    tls: Optional[bool] = None
    user_agent: Optional[str] = None
    credstor_env: Optional[Dict[str, Any]] = None
    use_ssh_client: Optional[bool] = None
    timeout: Optional[int] = DEFAULT_TIMEOUT_SECONDS
    max_pool_size: Optional[int] = DEFAULT_MAX_POOL_SIZE
    version: str = "auto"

    @validator('version')
    def validate_version(cls, v):
        return v or "auto"

    @validator('timeout')
    def validate_timeout(cls, v):
        return v or DEFAULT_TIMEOUT_SECONDS

    @validator('max_pool_size')
    def validate_max_pool_size(cls, v):
        return v or DEFAULT_MAX_POOL_SIZE

credstor_env: Dict[str, Any] pydantic-field

docker_host: str pydantic-field

max_pool_size: int pydantic-field

timeout: int pydantic-field

tls: bool pydantic-field

use_ssh_client: bool pydantic-field

user_agent: str pydantic-field

version: str pydantic-field

validate_max_pool_size(v) classmethod

Source code in testcompose/models/client/client_login.py
@validator('max_pool_size')
def validate_max_pool_size(cls, v):
    return v or DEFAULT_MAX_POOL_SIZE

validate_timeout(v) classmethod

Source code in testcompose/models/client/client_login.py
@validator('timeout')
def validate_timeout(cls, v):
    return v or DEFAULT_TIMEOUT_SECONDS

validate_version(v) classmethod

Source code in testcompose/models/client/client_login.py
@validator('version')
def validate_version(cls, v):
    return v or "auto"

testcompose.models.client.registry_parameters.Login (BaseModel) pydantic-model

Source code in testcompose/models/client/registry_parameters.py
class Login(BaseModel):
    username: Optional[str] = None
    password: Optional[str] = None
    email: Optional[str] = None
    registry: Optional[str] = None
    reauth: Optional[bool] = False
    dockercfg_path: Optional[str] = None

dockercfg_path: str pydantic-field

email: str pydantic-field

password: str pydantic-field

reauth: bool pydantic-field

registry: str pydantic-field

username: str pydantic-field

testcompose.models.bootstrap.container_service.ContainerService (BaseModel) pydantic-model

Source code in testcompose/models/bootstrap/container_service.py
class ContainerService(BaseModel):
    name: str
    image: str
    exposed_ports: List[str]
    command: str = ''
    environment: Dict[str, Any] = dict()
    depends_on: List[str] = list()
    volumes: List[ContainerVolumeMap] = list()
    log_wait_parameters: Optional[ContainerLogWaitParameter] = None
    http_wait_parameters: Optional[ContainerHttpWaitParameter] = None
    https_wait_parameters: Optional[ContainerHttpWaitParameter] = None
    entrypoint: Optional[str] = None

    @validator('name')
    def validate_service_name(cls, v):
        if not v:
            raise AttributeError("Container Service name is required")
        return v

    @validator('image')
    def validate_image(cls, v):
        if not v:
            raise AttributeError("A valid image name is required")
        return v

command: str pydantic-field

depends_on: List[str] pydantic-field

entrypoint: str pydantic-field

environment: Dict[str, Any] pydantic-field

exposed_ports: List[str] pydantic-field required

http_wait_parameters: ContainerHttpWaitParameter pydantic-field

https_wait_parameters: ContainerHttpWaitParameter pydantic-field

image: str pydantic-field required

log_wait_parameters: ContainerLogWaitParameter pydantic-field

name: str pydantic-field required

volumes: List[testcompose.models.bootstrap.container_volume.ContainerVolumeMap] pydantic-field

validate_image(v) classmethod

Source code in testcompose/models/bootstrap/container_service.py
@validator('image')
def validate_image(cls, v):
    if not v:
        raise AttributeError("A valid image name is required")
    return v

validate_service_name(v) classmethod

Source code in testcompose/models/bootstrap/container_service.py
@validator('name')
def validate_service_name(cls, v):
    if not v:
        raise AttributeError("Container Service name is required")
    return v

testcompose.models.bootstrap.container_service.ContainerServices (BaseModel) pydantic-model

ConfigServices holds Dict of Service and their names

Parameters:

Name Type Description Default
services

Dict[name, Service]

required
Source code in testcompose/models/bootstrap/container_service.py
class ContainerServices(BaseModel):
    """
    ConfigServices holds Dict of Service and their names
    Args:
        services: Dict[name, Service]
    """

    services: Dict[str, ContainerService]

services: Dict[str, testcompose.models.bootstrap.container_service.ContainerService] pydantic-field required

testcompose.models.bootstrap.container_service.RankedContainerServices (BaseModel) pydantic-model

RankedConfigServices holds a dict of services ranked in the order they are to be started.

Parameters:

Name Type Description Default
ranked_services

Dict[rank, name]

required
Source code in testcompose/models/bootstrap/container_service.py
class RankedContainerServices(BaseModel):
    """
    RankedConfigServices holds a dict of services ranked in the order they are to
    be started.
    Args:
        ranked_services: Dict[rank, name]
    """

    ranked_services: Dict[int, str] = dict()

ranked_services: Dict[int, str] pydantic-field

testcompose.models.bootstrap.container_http_wait_parameter.ContainerHttpWaitParameter (BaseModel) pydantic-model

Source code in testcompose/models/bootstrap/container_http_wait_parameter.py
class ContainerHttpWaitParameter(BaseModel):
    http_port: int
    response_status_code: int = 200
    startup_delay_time_ms: int = 20000
    end_point: str = '/'
    use_https: bool = False

    @validator('http_port')
    def validate_http_port(cls, v) -> int:
        if not v or not isinstance(v, int):
            raise AttributeError("A valide Integer exposed Http port must be provided")
        return v

    @validator('end_point')
    def validate_end_point(cls, v) -> str:
        if not v or not isinstance(v, str):
            raise AttributeError("A valide Http endpoint must be provided")
        return v

    @validator('response_status_code')
    def validate_response_status_code(cls, v) -> int:
        if not v or not isinstance(v, int):
            raise AttributeError("A valide Integer Http response code must be provided")
        return v

    @validator('startup_delay_time_ms')
    def validate_startup_delay_time_ms(cls, v) -> int:
        if not v or not isinstance(v, int):
            return 20000
        return v

end_point: str pydantic-field

http_port: int pydantic-field required

response_status_code: int pydantic-field

startup_delay_time_ms: int pydantic-field

use_https: bool pydantic-field

validate_end_point(v) -> str classmethod

Source code in testcompose/models/bootstrap/container_http_wait_parameter.py
@validator('end_point')
def validate_end_point(cls, v) -> str:
    if not v or not isinstance(v, str):
        raise AttributeError("A valide Http endpoint must be provided")
    return v

validate_http_port(v) -> int classmethod

Source code in testcompose/models/bootstrap/container_http_wait_parameter.py
@validator('http_port')
def validate_http_port(cls, v) -> int:
    if not v or not isinstance(v, int):
        raise AttributeError("A valide Integer exposed Http port must be provided")
    return v

validate_response_status_code(v) -> int classmethod

Source code in testcompose/models/bootstrap/container_http_wait_parameter.py
@validator('response_status_code')
def validate_response_status_code(cls, v) -> int:
    if not v or not isinstance(v, int):
        raise AttributeError("A valide Integer Http response code must be provided")
    return v

validate_startup_delay_time_ms(v) -> int classmethod

Source code in testcompose/models/bootstrap/container_http_wait_parameter.py
@validator('startup_delay_time_ms')
def validate_startup_delay_time_ms(cls, v) -> int:
    if not v or not isinstance(v, int):
        return 20000
    return v

testcompose.models.bootstrap.container_log_wait_parameter.ContainerLogWaitParameter (BaseModel) pydantic-model

Source code in testcompose/models/bootstrap/container_log_wait_parameter.py
class ContainerLogWaitParameter(BaseModel):
    log_line_regex: str
    wait_timeout_ms: int = 60000
    poll_interval_ms: int = 10000

    @validator('log_line_regex')
    def validate_log_line_regex(cls, v):
        if not v:
            raise AttributeError("log_line_prefix must be set")
        return v

log_line_regex: str pydantic-field required

poll_interval_ms: int pydantic-field

wait_timeout_ms: int pydantic-field

validate_log_line_regex(v) classmethod

Source code in testcompose/models/bootstrap/container_log_wait_parameter.py
@validator('log_line_regex')
def validate_log_line_regex(cls, v):
    if not v:
        raise AttributeError("log_line_prefix must be set")
    return v

testcompose.models.bootstrap.container_volume.VolumeSourceTypes dataclass

VolumeSourceTypes(FILESYSTEM_SOURCE: str = 'filesystem', DOCKER_VOLUME_SOURCE: str = 'dockervolume')

Source code in testcompose/models/bootstrap/container_volume.py
@dataclass(frozen=True)
class VolumeSourceTypes:
    FILESYSTEM_SOURCE: str = "filesystem"
    DOCKER_VOLUME_SOURCE: str = "dockervolume"

DOCKER_VOLUME_SOURCE: str dataclass-field

FILESYSTEM_SOURCE: str dataclass-field

testcompose.models.bootstrap.container_volume.ContainerVolumeMap (BaseModel) pydantic-model

Source code in testcompose/models/bootstrap/container_volume.py
class ContainerVolumeMap(BaseModel):
    host: str
    container: str
    mode: str = 'ro'
    source: str = VolumeSourceTypes.DOCKER_VOLUME_SOURCE

    @validator('mode')
    def validate_mode(cls, v) -> str:
        assert str(v).lower() in ['ro', 'rw']
        return v

    @validator('source')
    def validate_source(cls, v):
        assert str(v).lower() in ['filesystem', 'dockervolume']
        return v

    @validator('host')
    def validate_host(cls, v):
        if not v:
            raise AttributeError("Volume Host option can not be empty or None")
        return v

    @validator('container')
    def validate_container(cls, v):
        if not v:
            raise AttributeError("Volume container option can not be empty or None")
        return v

container: str pydantic-field required

host: str pydantic-field required

mode: str pydantic-field

source: str pydantic-field

validate_container(v) classmethod

Source code in testcompose/models/bootstrap/container_volume.py
@validator('container')
def validate_container(cls, v):
    if not v:
        raise AttributeError("Volume container option can not be empty or None")
    return v

validate_host(v) classmethod

Source code in testcompose/models/bootstrap/container_volume.py
@validator('host')
def validate_host(cls, v):
    if not v:
        raise AttributeError("Volume Host option can not be empty or None")
    return v

validate_mode(v) -> str classmethod

Source code in testcompose/models/bootstrap/container_volume.py
@validator('mode')
def validate_mode(cls, v) -> str:
    assert str(v).lower() in ['ro', 'rw']
    return v

validate_source(v) classmethod

Source code in testcompose/models/bootstrap/container_volume.py
@validator('source')
def validate_source(cls, v):
    assert str(v).lower() in ['filesystem', 'dockervolume']
    return v

testcompose.models.container.running_container_attributes.PossibleContainerStates dataclass

PossibleContainerStates(RUNNING: str = 'running', EXITED: str = 'exited')

Source code in testcompose/models/container/running_container_attributes.py
@dataclass(frozen=True)
class PossibleContainerStates:
    RUNNING: str = 'running'
    EXITED: str = 'exited'

EXITED: str dataclass-field

RUNNING: str dataclass-field

testcompose.models.container.running_container_attributes.ContainerState (BaseModel) pydantic-model

Source code in testcompose/models/container/running_container_attributes.py
class ContainerState(BaseModel):
    Status: str
    Running: bool
    Paused: bool
    Restarting: bool
    OOMKilled: bool
    Dead: bool
    Pid: int
    ExitCode: int
    Error: str
    StartedAt: str
    FinishedAt: str

Dead: bool pydantic-field required

Error: str pydantic-field required

ExitCode: int pydantic-field required

FinishedAt: str pydantic-field required

OOMKilled: bool pydantic-field required

Paused: bool pydantic-field required

Pid: int pydantic-field required

Restarting: bool pydantic-field required

Running: bool pydantic-field required

StartedAt: str pydantic-field required

Status: str pydantic-field required

testcompose.models.container.running_container_attributes.RunningContainerAttributes (BaseModel) pydantic-model

Source code in testcompose/models/container/running_container_attributes.py
class RunningContainerAttributes(BaseModel):
    Id: str
    State: ContainerState
    Platform: str
    NetworkSettings: ContainerNetworkSettings

Id: str pydantic-field required

NetworkSettings: ContainerNetworkSettings pydantic-field required

Platform: str pydantic-field required

State: ContainerState pydantic-field required

testcompose.models.network.network.NetworkComponents (BaseModel) pydantic-model

Source code in testcompose/models/network/network.py
class NetworkComponents(BaseModel):
    Aliases: Optional[List[str]] = None
    NetworkID: str
    EndpointID: str
    Gateway: str
    IPAddress: str

Aliases: List[str] pydantic-field

EndpointID: str pydantic-field required

Gateway: str pydantic-field required

IPAddress: str pydantic-field required

NetworkID: str pydantic-field required

testcompose.models.network.network.ContainerMappedPorts (BaseModel) pydantic-model

Source code in testcompose/models/network/network.py
class ContainerMappedPorts(BaseModel):
    HostIp: str
    HostPort: str

HostIp: str pydantic-field required

HostPort: str pydantic-field required

testcompose.models.network.network.ContainerNetworkSettings (BaseModel) pydantic-model

Source code in testcompose/models/network/network.py
class ContainerNetworkSettings(BaseModel):
    Ports: Dict[str, Any] = dict()
    Networks: Dict[str, NetworkComponents]

Networks: Dict[str, testcompose.models.network.network.NetworkComponents] pydantic-field required

Ports: Dict[str, Any] pydantic-field

testcompose.models.network.network.DefaultNeworkDrivers dataclass

DefaultNeworkDrivers()

Source code in testcompose/models/network/network.py
@dataclass(frozen=True)
class DefaultNeworkDrivers:
    DEFAULT_BRIDGE_NETWORK = 'bridge'
    DEFAULT_HOST_NETWORK = 'host'
    DEFAULT_NULL_NETWORK = 'null'

DEFAULT_BRIDGE_NETWORK

DEFAULT_HOST_NETWORK

DEFAULT_NULL_NETWORK

testcompose.waiters.endpoint_waiters.EndpointWaiters

Source code in testcompose/waiters/endpoint_waiters.py
class EndpointWaiters:
    @staticmethod
    def _get_container_host_ip() -> str:
        """The host IP where the container runs

        Returns:
            str: host IP
        """
        return socket.gethostbyname(socket.gethostname())

    @staticmethod
    def _check_endpoint(
        docker_client: DockerClient,
        container_id: str,
        wait_parameter: ContainerHttpWaitParameter,
        exposed_ports: Dict[str, str],
    ) -> None:
        """Endpoint health-check for a container. A running service
        with an exposed endpoint is queried and the response code is
        checked with the expected response code.

        Args:
            http_port (str): container service port
            status_code (int, optional): Defaults to 200.
            end_point (str, optional): Provided service endpoint. Defaults to "/".
            server_startup_time (int, optional): Expected wait time for the service to start. Defaults to 20.

        Returns:
            bool: Endpoint returned expected status code
        """
        response_check: bool = True
        for _ in range(0, 3):
            sleep(wait_parameter.startup_delay_time_ms / 1000)
            if not is_container_still_running(docker_client, container_id):
                response_check = False
                break
            try:
                host: str = EndpointWaiters._get_container_host_ip()
                mapped_port: str = exposed_ports[str(wait_parameter.http_port)]
                scheme: str = "https://" if wait_parameter.use_https else "http://"
                site_url: str = scheme + f"{host}:{mapped_port}/{wait_parameter.end_point.lstrip('/')}"
                response: Response = get(url=site_url.rstrip("/"))
                if response.status_code == wait_parameter.response_status_code:
                    break
            except Exception as exc:
                response_check = False
                logger.error("HTTP_CHECK_ERROR: %s", exc)
        if not response_check:
            raise RuntimeError(f"Http check on port {wait_parameter.http_port} failed")
        return

    @staticmethod
    def wait_for_http(
        docker_client: DockerClient,
        container_id: str,
        wait_parameter: ContainerHttpWaitParameter,
        exposed_ports: Dict[str, str],
    ) -> None:
        if wait_parameter:
            EndpointWaiters._check_endpoint(docker_client, container_id, wait_parameter, exposed_ports)

wait_for_http(docker_client: DockerClient, container_id: str, wait_parameter: ContainerHttpWaitParameter, exposed_ports: Dict[str, str]) -> None staticmethod

Source code in testcompose/waiters/endpoint_waiters.py
@staticmethod
def wait_for_http(
    docker_client: DockerClient,
    container_id: str,
    wait_parameter: ContainerHttpWaitParameter,
    exposed_ports: Dict[str, str],
) -> None:
    if wait_parameter:
        EndpointWaiters._check_endpoint(docker_client, container_id, wait_parameter, exposed_ports)

testcompose.waiters.log_waiters.LogWaiter

Source code in testcompose/waiters/log_waiters.py
class LogWaiter:
    @staticmethod
    def search_container_logs(
        docker_client: DockerClient, container: Container, log_parameter: ContainerLogWaitParameter
    ) -> None:
        """Search for a given predicate in the container log. Useful to check if a
        container is running and healthy

        Args:
            search_string (str): predicate to search in the log
            timeout (float, optional): Defaults to 300.0.
            interval (int, optional): Defaults to 1.

        Raises:
            ValueError: if a non string predicate is passed

        Returns:
            bool: True if the log contains the provided predicate
        """
        if not log_parameter:
            return

        if not isinstance(log_parameter.log_line_regex, str):
            raise ValueError

        prog = re.compile(log_parameter.log_line_regex, re.MULTILINE).search
        start: datetime = datetime.now()
        output: Optional[Match[str]] = None
        while (datetime.now() - start).total_seconds() < (log_parameter.wait_timeout_ms / 1000):
            if not is_container_still_running(docker_client, container.id):  # type: ignore
                return
            output = prog(container.logs().decode())
            if output:
                return
            if (datetime.now() - start).total_seconds() > (log_parameter.wait_timeout_ms / 1000):
                raise TimeoutError(
                    "container %s did not emit logs satisfying predicate in %.3f seconds"
                    % (container.name, float(log_parameter.wait_timeout_ms or 60000))
                )
            sleep(log_parameter.poll_interval_ms / 1000)
        logger.info(container.logs().decode())

search_container_logs(docker_client: DockerClient, container: Container, log_parameter: ContainerLogWaitParameter) -> None staticmethod

Search for a given predicate in the container log. Useful to check if a container is running and healthy

Parameters:

Name Type Description Default
search_string str

predicate to search in the log

required
timeout float

Defaults to 300.0.

required
interval int

Defaults to 1.

required

Exceptions:

Type Description
ValueError

if a non string predicate is passed

Returns:

Type Description
bool

True if the log contains the provided predicate

Source code in testcompose/waiters/log_waiters.py
@staticmethod
def search_container_logs(
    docker_client: DockerClient, container: Container, log_parameter: ContainerLogWaitParameter
) -> None:
    """Search for a given predicate in the container log. Useful to check if a
    container is running and healthy

    Args:
        search_string (str): predicate to search in the log
        timeout (float, optional): Defaults to 300.0.
        interval (int, optional): Defaults to 1.

    Raises:
        ValueError: if a non string predicate is passed

    Returns:
        bool: True if the log contains the provided predicate
    """
    if not log_parameter:
        return

    if not isinstance(log_parameter.log_line_regex, str):
        raise ValueError

    prog = re.compile(log_parameter.log_line_regex, re.MULTILINE).search
    start: datetime = datetime.now()
    output: Optional[Match[str]] = None
    while (datetime.now() - start).total_seconds() < (log_parameter.wait_timeout_ms / 1000):
        if not is_container_still_running(docker_client, container.id):  # type: ignore
            return
        output = prog(container.logs().decode())
        if output:
            return
        if (datetime.now() - start).total_seconds() > (log_parameter.wait_timeout_ms / 1000):
            raise TimeoutError(
                "container %s did not emit logs satisfying predicate in %.3f seconds"
                % (container.name, float(log_parameter.wait_timeout_ms or 60000))
            )
        sleep(log_parameter.poll_interval_ms / 1000)
    logger.info(container.logs().decode())

testcompose.run_containers.RunContainers (BaseDockerClient)

Source code in testcompose/run_containers.py
class RunContainers(BaseDockerClient):
    def __init_subclass__(cls, **kwargs) -> None:
        if cls is not RunContainers:
            raise TypeError("The class RunContainers can not be extended")
        return super().__init_subclass__()

    def __init__(
        self,
        config_services: ContainerServices,
        ranked_services: RankedContainerServices,
        registry_login_param=Login(),
        env_param: ClientFromEnv = ClientFromEnv(),
        url_param: ClientFromUrl = ClientFromUrl(),
    ) -> None:
        super(RunContainers, self).__init__(client_env_param=env_param, client_url_param=url_param)
        self._config_services: ContainerServices = config_services
        self._ranked_config_services: RankedContainerServices = ranked_services
        self.running_containers = RunningContainers()
        self.registry_login(login_credentials=registry_login_param)
        self._running_container_labels: List[str] = list()

    @property
    def running_containers(self) -> RunningContainers:
        return self._running_containers

    @running_containers.setter
    def running_containers(self, containers: RunningContainers) -> None:
        self._running_containers: RunningContainers = containers

    @property
    def unique_container_label(self) -> str:
        return self._unique_container_label

    @unique_container_label.setter
    def unique_container_label(self, label: str) -> None:
        self._unique_container_label: str = label

    def __enter__(self) -> RunningContainers:
        try:
            return self.run_containers()
        except Exception:
            self.stop_running_containers()
            return RunningContainers()

    def __exit__(self, exc_type, exc_value, exc_tb) -> None:
        try:
            self.stop_running_containers()
            if exc_tb and exc_type:
                logger.info("%s[%s]: %s", exc_type, exc_value, exc_tb)
        except Exception as exc:
            logger.info(exc.with_traceback(None))

    def run_containers(self) -> RunningContainers:
        self.unique_container_label = uuid4().hex
        network_name: str = f"{self.unique_container_label}_network"
        processed_containers_services: Dict[str, RunningContainer] = dict()
        self._running_container_labels.append(f"label={network_name}")

        for rank in sorted(self._ranked_config_services.ranked_services.keys()):
            service: ContainerService = self._config_services.services[
                self._ranked_config_services.ranked_services[rank]
            ]
            self.pull_docker_image(service.image)
            generic_container: GenericContainer = GenericContainer()
            generic_container.container_network = ContainerNetwork(
                self.docker_client, network_name, labels={"label": network_name}
            )
            generic_container.with_service(
                service,
                processed_containers_services,
                generic_container.container_network.name,  # type: ignore
            )
            generic_container.container_label = f"{self.unique_container_label}_{service.name}"
            self._running_container_labels.append(generic_container.container_label)
            try:
                log_wait_timeout = 120
                if service.log_wait_parameters:
                    log_wait_timeout = int((service.log_wait_parameters.wait_timeout_ms or 120000) / 1000)
                generic_container.container = generic_container.start(self.docker_client)
                generic_container.check_container_health(self.docker_client, timeout=log_wait_timeout)
                running_container: RunningContainer = RunningContainer(
                    service_name=service.name,
                    config_environment_variables=generic_container.container_environment_variables,
                    generic_container=generic_container,
                )
                processed_containers_services.update({service.name: running_container})
            except Exception as exc:
                logger.error(exc)
                self.stop_running_containers()
                raise APIError(exc)
        logger.info("The following containers were started: %s", list(processed_containers_services.keys()))
        self.running_containers = RunningContainers(running_containers=processed_containers_services)
        return self.running_containers

    def stop_running_containers(self) -> None:
        self._running_container_labels.sort(reverse=True)
        Housekeeping.perform_housekeeping(
            docker_client=self.docker_client, labels=self._running_container_labels
        )

running_containers: RunningContainers property writable

unique_container_label: str property writable

run_containers(self) -> RunningContainers

Source code in testcompose/run_containers.py
def run_containers(self) -> RunningContainers:
    self.unique_container_label = uuid4().hex
    network_name: str = f"{self.unique_container_label}_network"
    processed_containers_services: Dict[str, RunningContainer] = dict()
    self._running_container_labels.append(f"label={network_name}")

    for rank in sorted(self._ranked_config_services.ranked_services.keys()):
        service: ContainerService = self._config_services.services[
            self._ranked_config_services.ranked_services[rank]
        ]
        self.pull_docker_image(service.image)
        generic_container: GenericContainer = GenericContainer()
        generic_container.container_network = ContainerNetwork(
            self.docker_client, network_name, labels={"label": network_name}
        )
        generic_container.with_service(
            service,
            processed_containers_services,
            generic_container.container_network.name,  # type: ignore
        )
        generic_container.container_label = f"{self.unique_container_label}_{service.name}"
        self._running_container_labels.append(generic_container.container_label)
        try:
            log_wait_timeout = 120
            if service.log_wait_parameters:
                log_wait_timeout = int((service.log_wait_parameters.wait_timeout_ms or 120000) / 1000)
            generic_container.container = generic_container.start(self.docker_client)
            generic_container.check_container_health(self.docker_client, timeout=log_wait_timeout)
            running_container: RunningContainer = RunningContainer(
                service_name=service.name,
                config_environment_variables=generic_container.container_environment_variables,
                generic_container=generic_container,
            )
            processed_containers_services.update({service.name: running_container})
        except Exception as exc:
            logger.error(exc)
            self.stop_running_containers()
            raise APIError(exc)
    logger.info("The following containers were started: %s", list(processed_containers_services.keys()))
    self.running_containers = RunningContainers(running_containers=processed_containers_services)
    return self.running_containers

stop_running_containers(self) -> None

Source code in testcompose/run_containers.py
def stop_running_containers(self) -> None:
    self._running_container_labels.sort(reverse=True)
    Housekeeping.perform_housekeeping(
        docker_client=self.docker_client, labels=self._running_container_labels
    )