Developer Reference
testcompose.client.base_docker_client.BaseDockerClient (ABC)
Source code in testcompose/client/base_docker_client.py
class BaseDockerClient(ABC):
def __init__(self, client_env_param: ClientFromEnv, client_url_param: ClientFromUrl) -> None:
super(BaseDockerClient, self).__init__()
_client_url_param: ClientFromUrl = ClientFromUrl()
_client_env_param: ClientFromEnv = ClientFromEnv()
if client_env_param:
_client_env_param = client_env_param
if client_url_param:
_client_url_param = client_url_param
self.docker_client = self._init_docker_client(
client_url_param=_client_url_param, client_env_param=_client_env_param
)
@property
def docker_client(self) -> DockerClient:
return self._docker_client
@docker_client.setter
def docker_client(self, client: DockerClient) -> None:
self._docker_client: DockerClient = client
def _init_docker_client(
self, *, client_url_param: ClientFromUrl, client_env_param: ClientFromEnv
) -> DockerClient:
_docker_client: DockerClient = self._docker_client_from_env(client_env_param)
if client_url_param.docker_host:
_docker_client = self._docker_client_from_url(client_url_param)
_docker_client.ping()
return _docker_client
def _docker_client_from_env(self, client_env_param: ClientFromEnv) -> DockerClient:
return docker.from_env(
version=client_env_param.version,
timeout=client_env_param.timeout,
max_pool_size=client_env_param.max_pool_size,
use_ssh_client=client_env_param.use_ssh_client,
ssl_version=client_env_param.ssl_version,
assert_hostname=client_env_param.assert_hostname,
environment=client_env_param.environment,
)
def _docker_client_from_url(self, client_url_param: ClientFromUrl) -> DockerClient:
return DockerClient(
base_url=client_url_param.docker_host,
version=client_url_param.version,
timeout=client_url_param.timeout,
tls=client_url_param.tls,
user_agent=client_url_param.user_agent,
credstor_env=client_url_param.credstor_env,
use_ssh_client=client_url_param.use_ssh_client,
max_pool_size=client_url_param.max_pool_size,
)
def registry_login(self, login_credentials: Login) -> None:
if login_credentials.registry:
self.docker_client.login(**login_credentials.dict())
def pull_docker_image(self, image_name: str) -> None:
try:
self.docker_client.images.get(name=image_name)
except ImageNotFound:
self.docker_client.images.pull(repository=image_name)
except Exception:
logger.error(traceback.format_exc())
docker_client: DockerClient
property
writable
pull_docker_image(self, image_name: str) -> None
Source code in testcompose/client/base_docker_client.py
def pull_docker_image(self, image_name: str) -> None:
try:
self.docker_client.images.get(name=image_name)
except ImageNotFound:
self.docker_client.images.pull(repository=image_name)
except Exception:
logger.error(traceback.format_exc())
registry_login(self, login_credentials: Login) -> None
Source code in testcompose/client/base_docker_client.py
def registry_login(self, login_credentials: Login) -> None:
if login_credentials.registry:
self.docker_client.login(**login_credentials.dict())
testcompose.configs.parse_config.TestConfigParser
Source code in testcompose/configs/parse_config.py
class TestConfigParser:
@classmethod
def parse_config(cls, file_name: str) -> ContainerServices:
"""parses and verifies test yaml config file
Args:
file_name (str): absolute path of the config file
Raises:
FileNotFoundError: when config file not present
AttributeError: when config file is empty
Returns:
ConfigServices: A ConfigServices object with all named services in the config
"""
if not os.path.exists(file_name):
raise FileNotFoundError(f"Config file {file_name} does not exist!!")
contents: Dict[str, Any] = dict()
with open(file_name, 'r') as fh:
contents = yaml.safe_load(fh)
if not contents:
raise AttributeError("Config content can not be empty")
services: Dict[str, ContainerService] = dict()
for service in contents["services"]:
services.update({service["name"]: ContainerService(**service)})
container_services: ContainerServices = ContainerServices(services=services)
return container_services
parse_config(file_name: str) -> ContainerServices
classmethod
parses and verifies test yaml config file
Parameters:
Name | Type | Description | Default |
---|---|---|---|
file_name |
str |
absolute path of the config file |
required |
Exceptions:
Type | Description |
---|---|
FileNotFoundError |
when config file not present |
AttributeError |
when config file is empty |
Returns:
Type | Description |
---|---|
ConfigServices |
A ConfigServices object with all named services in the config |
Source code in testcompose/configs/parse_config.py
@classmethod
def parse_config(cls, file_name: str) -> ContainerServices:
"""parses and verifies test yaml config file
Args:
file_name (str): absolute path of the config file
Raises:
FileNotFoundError: when config file not present
AttributeError: when config file is empty
Returns:
ConfigServices: A ConfigServices object with all named services in the config
"""
if not os.path.exists(file_name):
raise FileNotFoundError(f"Config file {file_name} does not exist!!")
contents: Dict[str, Any] = dict()
with open(file_name, 'r') as fh:
contents = yaml.safe_load(fh)
if not contents:
raise AttributeError("Config content can not be empty")
services: Dict[str, ContainerService] = dict()
for service in contents["services"]:
services.update({service["name"]: ContainerService(**service)})
container_services: ContainerServices = ContainerServices(services=services)
return container_services
testcompose.configs.service_config.Config
This class consumes the model created from a config file.
This is an important class that sets the precedence of how the
different containers are to be started and stopped. Usually, the
precedence are set correctly if the depends_on
parameter of the
config is set. Cyclic dependency will fail the test before it starts.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
test_services |
ConfigServices |
model resulting from a parsed configuration file. |
required |
Source code in testcompose/configs/service_config.py
class Config:
"""This class consumes the model created from a config file.
This is an important class that sets the precedence of how the
different containers are to be started and stopped. Usually, the
precedence are set correctly if the `depends_on` parameter of the
config is set. Cyclic dependency will fail the test before it starts.
Args:
test_services (ConfigServices): model resulting from a parsed configuration file.
"""
def __init__(self, test_services: ContainerServices) -> None:
self._rank_test_services(test_services)
@property
def ranked_config_services(self) -> RankedContainerServices:
"""Object containing the ordered services from the config
Returns:
RankedServices: ranked container services
"""
return self._ranked_it_services
@ranked_config_services.setter
def ranked_config_services(self, ranked_services: RankedContainerServices) -> None:
self._ranked_it_services: RankedContainerServices = ranked_services
def _rank_test_services(self, test_services: ContainerServices) -> None:
"""
Args:
test_services (ConfigServices): model resulting from a parsed configuration file.
Raises:
ValueError: raised if test_services is `null`
AttributeError: raised if no concreate networking is provided
"""
if not test_services:
logger.error("Config content can not be Null")
raise ValueError
if not test_services.services:
logger.error("No service was found in the provided config")
raise ValueError
_processed_containers: Dict[str, int] = self._compute_container_ranks(
ranked_services=dict(),
config_services=test_services,
)
_processed_containers_reversed: Dict[int, str] = {
rank: service for service, rank in _processed_containers.items()
}
self.ranked_config_services = RankedContainerServices(ranked_services=_processed_containers_reversed)
def _compute_container_ranks(
self,
*,
ranked_services: Dict[str, int],
config_services: ContainerServices,
) -> Dict[str, int]:
"""The main method that computes the ranking of the services specified
in the config.
Args:
ranked_services (Dict[str, int]): dict container service name and their assigned ranks
config_services (ConfigServices): config services generated from the supplied configuration file
Raises:
AttributeError: raised to prevent empty configuration properties to be passed to this function
ValueError: rasied when cyclic dependency is detected
Returns:
Dict[str, int]: A list of ranked service models.
"""
_ranked_services: Dict[str, int] = deepcopy(ranked_services)
if not config_services:
raise AttributeError("A valid config for test services must be provided")
rank: int = len(ranked_services.keys())
if len(config_services.services.keys()) == len(ranked_services.keys()):
return ranked_services
else:
services: Dict[str, ContainerService] = {
x: y for x, y in config_services.services.items() if x not in _ranked_services
}
for service_name, service in services.items():
if not service.depends_on:
_ranked_services.update({service_name: rank})
rank += 1
else:
if set(service.depends_on).issubset(
_ranked_services.keys()
) and not self._check_cyclic_dependency(
[config_services.services[x] for x in _ranked_services], service_name
):
_ranked_services.update({service_name: rank})
rank += 1
elif not set(service.depends_on).issubset(list(config_services.services.keys())):
raise AttributeError(
f"Invalid service name or dependencies detected: {service_name} <=> {set(service.depends_on)}"
)
return self._compute_container_ranks(
ranked_services=_ranked_services, config_services=config_services
)
@staticmethod
def _check_cyclic_dependency(
processed_services: List[ContainerService], dependent_service_name: str
) -> bool:
for service in processed_services:
if set([dependent_service_name]).issubset(service.depends_on):
return True
return False
ranked_config_services: RankedContainerServices
property
writable
Object containing the ordered services from the config
Returns:
Type | Description |
---|---|
RankedServices |
ranked container services |
testcompose.configs.generate_template_config.GenerateConfigTemplate
Source code in testcompose/configs/generate_template_config.py
class GenerateConfigTemplate:
def _remove_keys(self, keys: List[str], template_dict: Dict[str, Any]):
_template = deepcopy(template_dict)
for k in keys:
_template.pop(k)
return _template
def _app_db_template(self) -> Dict[str, List[Dict[str, Any]]]:
_app = self._remove_keys(["http_wait_parameters"], simple_app_or_db())
_app["name"] = "app"
_app["environment"] = {
"DB_URL": "${db.db_user}:${db.db_password}@${db.container_hostname}:5432/${db.db_name}"
}
_app["depends_on"] = ["db"]
_db = self._remove_keys(["http_wait_parameters", "depends_on"], simple_app_or_db())
_db["name"] = "db"
_db["environment"] = {"DB_USER": "user", "DB_PASSWORD": "very-secret", "DB_NAME": "some_db_name"}
return {"services": [_app, _db]}
def app_template(self) -> Dict[str, Any]:
_template = self._remove_keys(["depends_on"], simple_app_or_db())
_template['name'] = 'app'
return {"services": [_template]}
def db_template(self) -> Dict[str, Any]:
_template = self._remove_keys(["http_wait_parameters", "depends_on"], simple_app_or_db())
_template['name'] = 'db'
return {"services": [_template]}
def broker_template(self) -> Dict[str, Any]:
return simple_broker()
def app_db_template(self) -> Dict[str, Any]:
return self._app_db_template()
def app_broker_db_template(self) -> Dict[str, Any]:
_services: List[Dict[str, Any]] = list()
for x in self._app_db_template()['services']:
app = deepcopy(x)
if x.get('name') == 'app':
app["depends_on"].append("broker")
_services.append(app)
_broker_template: Dict[str, Any] = simple_broker()
_services.extend(_broker_template['services'])
_broker_template['services'] = _services
return _broker_template
def app_broker_template(self) -> Dict[str, Any]:
_app_template: Dict[str, Any] = simple_app_or_db()
_app_template['depends_on'] = ['broker']
_app_template['name'] = 'app'
_broker_template: Dict[str, Any] = simple_broker()
_services: List[Dict[str, Any]] = _broker_template['services']
_services.append(_app_template)
_broker_template['services'] = _services
return _broker_template
def broker_db_template(self) -> Dict[str, Any]:
_db_template: Dict[str, Any] = simple_app_or_db()
_db_template['name'] = 'db'
_broker_template: Dict[str, Any] = simple_broker()
_services: List[Dict[str, Any]] = _broker_template['services']
_services.append(_db_template)
_broker_template['services'] = _services
return _broker_template
app_broker_db_template(self) -> Dict[str, Any]
Source code in testcompose/configs/generate_template_config.py
def app_broker_db_template(self) -> Dict[str, Any]:
_services: List[Dict[str, Any]] = list()
for x in self._app_db_template()['services']:
app = deepcopy(x)
if x.get('name') == 'app':
app["depends_on"].append("broker")
_services.append(app)
_broker_template: Dict[str, Any] = simple_broker()
_services.extend(_broker_template['services'])
_broker_template['services'] = _services
return _broker_template
app_broker_template(self) -> Dict[str, Any]
Source code in testcompose/configs/generate_template_config.py
def app_broker_template(self) -> Dict[str, Any]:
_app_template: Dict[str, Any] = simple_app_or_db()
_app_template['depends_on'] = ['broker']
_app_template['name'] = 'app'
_broker_template: Dict[str, Any] = simple_broker()
_services: List[Dict[str, Any]] = _broker_template['services']
_services.append(_app_template)
_broker_template['services'] = _services
return _broker_template
app_db_template(self) -> Dict[str, Any]
Source code in testcompose/configs/generate_template_config.py
def app_db_template(self) -> Dict[str, Any]:
return self._app_db_template()
app_template(self) -> Dict[str, Any]
Source code in testcompose/configs/generate_template_config.py
def app_template(self) -> Dict[str, Any]:
_template = self._remove_keys(["depends_on"], simple_app_or_db())
_template['name'] = 'app'
return {"services": [_template]}
broker_db_template(self) -> Dict[str, Any]
Source code in testcompose/configs/generate_template_config.py
def broker_db_template(self) -> Dict[str, Any]:
_db_template: Dict[str, Any] = simple_app_or_db()
_db_template['name'] = 'db'
_broker_template: Dict[str, Any] = simple_broker()
_services: List[Dict[str, Any]] = _broker_template['services']
_services.append(_db_template)
_broker_template['services'] = _services
return _broker_template
broker_template(self) -> Dict[str, Any]
Source code in testcompose/configs/generate_template_config.py
def broker_template(self) -> Dict[str, Any]:
return simple_broker()
db_template(self) -> Dict[str, Any]
Source code in testcompose/configs/generate_template_config.py
def db_template(self) -> Dict[str, Any]:
_template = self._remove_keys(["http_wait_parameters", "depends_on"], simple_app_or_db())
_template['name'] = 'db'
return {"services": [_template]}
testcompose.containers.base_container.BaseContainer
Source code in testcompose/containers/base_container.py
class BaseContainer:
def __init__(self) -> None:
self._image_pull_policy: str = 'ALWAYS_PULL'
@property
def image(self) -> str:
return self._image
@image.setter
def image(self, name: str) -> None:
if not name:
raise ValueError("A valid Image entity must be provided")
self._image: str = name
@property
def command(self) -> str:
return self._command
@command.setter
def command(self, command: str) -> None:
self._command: str = command
@property
def entry_point(self) -> str:
return self._entry_point
@entry_point.setter
def entry_point(self, entry_point: str) -> None:
self._entry_point: str = entry_point
@property
def host_name(self) -> str:
return self._host_name
@host_name.setter
def host_name(self, host_name: str) -> None:
self._host_name: str = host_name
@property
def network(self) -> str:
return self._network
@network.setter
def network(self, network: str) -> None:
self._network: str = network
@property
def http_waiter(self) -> ContainerHttpWaitParameter:
return self._http_waiter
@http_waiter.setter
def http_waiter(self, http_waiter: ContainerHttpWaitParameter) -> None:
self._http_waiter: ContainerHttpWaitParameter = http_waiter
@property
def log_waiter(self) -> ContainerLogWaitParameter:
return self._log_waiter
@log_waiter.setter
def log_waiter(self, log_waiter: ContainerLogWaitParameter) -> None:
self._log_waiter: ContainerLogWaitParameter = log_waiter
@property
def ports(self) -> Dict[int, Any]:
return self._ports
@ports.setter
def ports(self, ports: List[str]) -> None:
self._ports: Dict[int, Any] = self._exposed_ports(ports)
@property
def container_environment_variables(self) -> Dict[str, Any]:
return self._container_environment_variables
@container_environment_variables.setter
def container_environment_variables(self, env: Dict[str, Any]) -> None:
self._container_environment_variables: Dict[str, Any] = deepcopy(env)
@property
def volumes(self) -> Dict[str, Dict[str, str]]:
return self._volumes
@volumes.setter
def volumes(self, volumes: List[ContainerVolumeMap]) -> None:
self._volumes: Dict[str, Dict[str, str]] = self._container_volumes(volumes)
def with_service(
self,
service: ContainerService,
processed_containers_services: Dict[str, RunningContainer],
network: str,
) -> 'BaseContainer':
"""
The initialization method that converts a config
service into a Generic Container. It leverages other
internal _with methods to assign the complete container
properties.
Args:
service (Service): a config service
processed_containers_services (Dict[str, Any]): a dict of a service that
had already been initiated and the container is running.
network (str): the test network name, to attach all containers to.
Returns:
BaseServiceContainer
"""
self.image = service.image
self.command = service.command
(
substituted_env_variables,
modified_exposed_ports,
) = ContainerUtils.replace_container_config_placeholders(
service_env_variables=service.environment,
running_containers=processed_containers_services,
service_name=service.name,
exposed_ports=service.exposed_ports,
)
self.container_environment_variables = substituted_env_variables
self.ports = modified_exposed_ports
self.http_waiter = service.http_wait_parameters # type: ignore
self.volumes = service.volumes
self.entry_point = service.entrypoint # type: ignore
self.log_waiter = service.log_wait_parameters # type: ignore
self.host_name = service.name
self.network = network
return self
def _exposed_ports(self, ports: Optional[List[str]]) -> Dict[int, Any]:
"""List of exposed port to be assigned random port
numbers on the host. Random ports are exposed to the
host. A fixed port can be assigned on the host by providing
the port in the format **[host_port:container_port]**
Args:
ports (Optional[List[str]]): list of container exposed ports
"""
exposed_ports: Dict[int, Any] = dict()
if ports:
for port in ports:
_ports: List[str] = self._generate_exposed_ports(re.sub(r"(\s)", "", port))
for _port in _ports:
_split_port: List[str] = _port.split(":")
if len(_split_port) == 2:
exposed_ports[int(_split_port[1])] = int(_split_port[0])
else:
exposed_ports[int(_port)] = None
return exposed_ports
def _generate_exposed_ports(self, port: str) -> List[str]:
matches: List[Any] = re.findall(r"\-", port)
unprocessed_ports: List[str] = list()
if len(matches) == 0:
unprocessed_ports.append(port)
elif len(matches) == 1:
start, end = str(port).split("-")
if int(end) <= int(start):
raise AttributeError(
f"Start exposed port {start} must be less than end exposed port {end} for port ranges!"
)
unprocessed_ports = [str(x) for x in range(int(start), int(end) + 1)]
else:
raise AttributeError("Allowed exposed port format is host:container or port1 - port2")
return unprocessed_ports
def _container_volumes(
self, volumes: Optional[List[ContainerVolumeMap]] = None
) -> Dict[str, Dict[str, str]]:
"""A list of volume mappings to be mounted in the container.
VolumeMapping:
host: host volume path or a docker volume name
container: path to mount the host volume in the container
mode: volume mode [ro|rw]
source: source of the volume [filesystem|dockervolume]
filesystem: a file or directory to be mounted
dockervolume: a docker volume (existing or to be created)
Args:
volumes (Optional[List[VolumeMapping]]): Optional list of volumes to mount on the container
"""
mapped_volumes: Dict[str, Dict[str, str]] = dict()
if volumes:
for vol in volumes:
host_bind: Optional[str] = None
if vol.source == VolumeSourceTypes.DOCKER_VOLUME_SOURCE:
host_bind = vol.host
elif vol.source == VolumeSourceTypes.FILESYSTEM_SOURCE:
host_bind = str(pathlib.Path(vol.host).absolute())
if not host_bind:
raise ValueError("Volume source can only be one of local|docker")
mapped_volumes[host_bind] = {"bind": vol.container, "mode": vol.mode}
return mapped_volumes
@abstractmethod
def start(self) -> None:
raise NotImplementedError
@abstractmethod
def stop(self, force=True, delete_volume=True) -> None:
raise NotImplementedError
command: str
property
writable
container_environment_variables: Dict[str, Any]
property
writable
entry_point: str
property
writable
host_name: str
property
writable
http_waiter: ContainerHttpWaitParameter
property
writable
image: str
property
writable
log_waiter: ContainerLogWaitParameter
property
writable
network: str
property
writable
ports: Dict[int, Any]
property
writable
volumes: Dict[str, Dict[str, str]]
property
writable
start(self) -> None
Source code in testcompose/containers/base_container.py
@abstractmethod
def start(self) -> None:
raise NotImplementedError
stop(self, force = True, delete_volume = True) -> None
Source code in testcompose/containers/base_container.py
@abstractmethod
def stop(self, force=True, delete_volume=True) -> None:
raise NotImplementedError
with_service(self, service: ContainerService, processed_containers_services: Dict[str, testcompose.models.container.running_container.RunningContainer], network: str) -> BaseContainer
The initialization method that converts a config service into a Generic Container. It leverages other internal _with methods to assign the complete container properties.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
service |
Service |
a config service |
required |
processed_containers_services |
Dict[str, Any] |
a dict of a service that had already been initiated and the container is running. |
required |
network |
str |
the test network name, to attach all containers to. |
required |
Returns:
Type | Description |
---|---|
BaseContainer |
BaseServiceContainer |
Source code in testcompose/containers/base_container.py
def with_service(
self,
service: ContainerService,
processed_containers_services: Dict[str, RunningContainer],
network: str,
) -> 'BaseContainer':
"""
The initialization method that converts a config
service into a Generic Container. It leverages other
internal _with methods to assign the complete container
properties.
Args:
service (Service): a config service
processed_containers_services (Dict[str, Any]): a dict of a service that
had already been initiated and the container is running.
network (str): the test network name, to attach all containers to.
Returns:
BaseServiceContainer
"""
self.image = service.image
self.command = service.command
(
substituted_env_variables,
modified_exposed_ports,
) = ContainerUtils.replace_container_config_placeholders(
service_env_variables=service.environment,
running_containers=processed_containers_services,
service_name=service.name,
exposed_ports=service.exposed_ports,
)
self.container_environment_variables = substituted_env_variables
self.ports = modified_exposed_ports
self.http_waiter = service.http_wait_parameters # type: ignore
self.volumes = service.volumes
self.entry_point = service.entrypoint # type: ignore
self.log_waiter = service.log_wait_parameters # type: ignore
self.host_name = service.name
self.network = network
return self
testcompose.containers.container_network.ContainerNetwork
Network management for running container.This class ensures all containers in the same test belong to the same network. Contians utility to create and cleanup network.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
docker_client |
DockerClient |
Docker client |
required |
network_name |
str |
Name of test network |
required |
Source code in testcompose/containers/container_network.py
class ContainerNetwork:
"""Network management for running container.This class ensures
all containers in the same test belong to the same network.
Contians utility to create and cleanup network.
Args:
docker_client (DockerClient): Docker client
network_name (str): Name of test network
"""
def __init__(
self, docker_client: DockerClient, network_name: str, labels: Dict[str, str] = dict()
) -> None:
self._docker_client: DockerClient = docker_client
self._assign_group_network(network_name, labels=labels)
@property
def network(self) -> Network:
"""Network Object
Returns:
docker.models.networks.Network: container network object
"""
return self._container_network
@network.setter
def network(self, network: Network) -> None:
self._container_network: Network = network
@property
def name(self) -> Optional[str]:
return self.network.name
@property
def network_id(self) -> Optional[str]:
return self.network.short_id
def remove_network(self) -> None:
try:
if self.network.name not in ['bridge', 'none', 'host']:
self.network.remove()
except Exception as exc:
print(f"Test network could not be removed. Still dangling ... {exc}")
def _assign_group_network(
self,
network_name: str,
labels: Dict[str, str] = dict(),
driver: str = DefaultNeworkDrivers.DEFAULT_BRIDGE_NETWORK,
) -> None:
try:
self.network = (self._docker_client.networks.list(names=[network_name]))[0] # type: ignore
except Exception:
self.network = self._docker_client.networks.create(
name=network_name,
driver=driver,
check_duplicate=True,
internal=False,
labels=labels or None,
enable_ipv6=False,
attachable=True,
scope='local',
) # type: ignore
name: Optional[str]
property
readonly
network: Network
property
writable
Network Object
Returns:
Type | Description |
---|---|
docker.models.networks.Network |
container network object |
network_id: Optional[str]
property
readonly
remove_network(self) -> None
Source code in testcompose/containers/container_network.py
def remove_network(self) -> None:
try:
if self.network.name not in ['bridge', 'none', 'host']:
self.network.remove()
except Exception as exc:
print(f"Test network could not be removed. Still dangling ... {exc}")
testcompose.containers.container_utils.ContainerUtils
Source code in testcompose/containers/container_utils.py
class ContainerUtils:
@staticmethod
def replace_container_config_placeholders(
service_env_variables: Dict[str, Any],
running_containers: Dict[str, RunningContainer],
service_name: str,
exposed_ports: List[str],
) -> Tuple[Dict[str, Any], List[str]]:
"""Utility method to replace placeholders in the service containers.
Placeholders are usually of the form *${container_name.containerenv_variable}*.
Args:
service_env_variables (Dict[str, Any]): Dict of config environment variables
running_containers Dict[str, RunningContainer]: Running container object
service_name (str): service name as specified in the config
exposed_ports (List[str]): container exposed ports
Raises:
ValueError: when a placeholder variable is not of the form service_name.variable_name
AttributeError: When a service name could not be found in the list of services obtained from the
provided config file.
Returns:
Tuple[Dict[str, Any], List[str]]: A tuple of `env_config` and `exposed_ports`
"""
pattern: str = "\\$\\{([^}]*)}"
substituted_env_variables: Dict[str, Any] = copy(service_env_variables)
modified_exposed_ports: List[str] = deepcopy(exposed_ports)
cmpl: Any = re.compile(pattern=pattern).findall
for k, v in service_env_variables.items():
if isinstance(v, str):
replaced_variable: str = v
for occurence in cmpl(v):
if len(str(occurence).split(".")) != 2:
raise ValueError
container_name, variable_name = str(occurence).split(".")
value = None
value, _exposed_ports = ContainerUtils._external_ports_variables(
running_containers,
service_name,
container_name,
variable_name,
modified_exposed_ports,
)
if _exposed_ports:
modified_exposed_ports = deepcopy(_exposed_ports)
replaced_variable = replaced_variable.replace(f"${{{occurence}}}", str(value))
substituted_env_variables[k] = replaced_variable
return substituted_env_variables, modified_exposed_ports
@staticmethod
def _get_free_host_port() -> str:
"""Get a free random port number from the container host
Returns:
str: port number
"""
_socket = socket.socket(family=socket.AF_INET, type=socket.SOCK_STREAM)
_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
_socket.settimeout(2)
_socket.bind(("", 0))
_, port = _socket.getsockname()
_socket.close()
return port
@staticmethod
def _external_ports_variables(
running_containers: Dict[str, RunningContainer],
service_name: str,
container_name: str,
variable_name: str,
exposed_ports: List[str] = list(),
) -> Tuple[Optional[str], List[str]]:
value: Optional[str] = None
_exposed_ports: List[str] = list()
if container_name.lower() == SupportedPlaceholders.SELF_HOST or variable_name.lower() in [
SupportedPlaceholders.CONTAINER_HOSTNAME,
SupportedPlaceholders.EXTERNAL_PORT,
SupportedPlaceholders.CONTAINER_HOST_ADDRESS,
]:
if (
container_name.lower() == SupportedPlaceholders.SELF_HOST
and variable_name.lower() == SupportedPlaceholders.CONTAINER_HOSTNAME
):
value = service_name
elif (
container_name.lower() != SupportedPlaceholders.SELF_HOST
and variable_name.lower() == SupportedPlaceholders.CONTAINER_HOSTNAME
):
value = container_name
else:
if variable_name.lower().startswith(SupportedPlaceholders.EXTERNAL_PORT):
value, _exposed_ports = ContainerUtils._external_port_variables(
variable_name, exposed_ports
)
elif variable_name.lower() == SupportedPlaceholders.CONTAINER_HOST_ADDRESS:
value = socket.gethostbyname(socket.gethostname())
else:
value = running_containers[
f"{container_name.lower()}"
].generic_container.container_environment_variables[f"{variable_name.upper()}"]
return value, _exposed_ports
@staticmethod
def _external_port_variables(variable_name: str, exposed_ports: List[str]) -> Tuple[str, List[str]]:
_exposed_ports: List[str] = deepcopy(exposed_ports)
container_port: str = re.sub(SupportedPlaceholders.EXTERNAL_PORT + "_", "", variable_name)
host_port: str = ContainerUtils._get_free_host_port()
if container_port and container_port not in exposed_ports:
raise AttributeError(
f"self.hostport_{container_port} must be a valid supplied exposed_ports value!"
)
_exposed_ports.remove(container_port)
_exposed_ports.append(f"{host_port}:{container_port}")
value: str = str(host_port)
return value, _exposed_ports
replace_container_config_placeholders(service_env_variables: Dict[str, Any], running_containers: Dict[str, testcompose.models.container.running_container.RunningContainer], service_name: str, exposed_ports: List[str]) -> Tuple[Dict[str, Any], List[str]]
staticmethod
Utility method to replace placeholders in the service containers. Placeholders are usually of the form ${container_name.containerenv_variable}.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
service_env_variables |
Dict[str, Any] |
Dict of config environment variables |
required |
running_containers |
Dict[str, RunningContainer] |
Running container object |
required |
service_name |
str |
service name as specified in the config |
required |
exposed_ports |
List[str] |
container exposed ports |
required |
Exceptions:
Type | Description |
---|---|
ValueError |
when a placeholder variable is not of the form service_name.variable_name |
AttributeError |
When a service name could not be found in the list of services obtained from the provided config file. |
Returns:
Type | Description |
---|---|
Tuple[Dict[str, Any], List[str]] |
A tuple of |
Source code in testcompose/containers/container_utils.py
@staticmethod
def replace_container_config_placeholders(
service_env_variables: Dict[str, Any],
running_containers: Dict[str, RunningContainer],
service_name: str,
exposed_ports: List[str],
) -> Tuple[Dict[str, Any], List[str]]:
"""Utility method to replace placeholders in the service containers.
Placeholders are usually of the form *${container_name.containerenv_variable}*.
Args:
service_env_variables (Dict[str, Any]): Dict of config environment variables
running_containers Dict[str, RunningContainer]: Running container object
service_name (str): service name as specified in the config
exposed_ports (List[str]): container exposed ports
Raises:
ValueError: when a placeholder variable is not of the form service_name.variable_name
AttributeError: When a service name could not be found in the list of services obtained from the
provided config file.
Returns:
Tuple[Dict[str, Any], List[str]]: A tuple of `env_config` and `exposed_ports`
"""
pattern: str = "\\$\\{([^}]*)}"
substituted_env_variables: Dict[str, Any] = copy(service_env_variables)
modified_exposed_ports: List[str] = deepcopy(exposed_ports)
cmpl: Any = re.compile(pattern=pattern).findall
for k, v in service_env_variables.items():
if isinstance(v, str):
replaced_variable: str = v
for occurence in cmpl(v):
if len(str(occurence).split(".")) != 2:
raise ValueError
container_name, variable_name = str(occurence).split(".")
value = None
value, _exposed_ports = ContainerUtils._external_ports_variables(
running_containers,
service_name,
container_name,
variable_name,
modified_exposed_ports,
)
if _exposed_ports:
modified_exposed_ports = deepcopy(_exposed_ports)
replaced_variable = replaced_variable.replace(f"${{{occurence}}}", str(value))
substituted_env_variables[k] = replaced_variable
return substituted_env_variables, modified_exposed_ports
testcompose.containers.generic_container.GenericContainer (BaseContainer)
Source code in testcompose/containers/generic_container.py
class GenericContainer(BaseContainer):
def __init__(self) -> None:
super().__init__()
@property
def container_label(self) -> str:
return self._container_label
@container_label.setter
def container_label(self, label: str) -> None:
self._container_label = label
@property
def container_network(self) -> ContainerNetwork:
return self._container_network
@container_network.setter
def container_network(self, network: ContainerNetwork) -> None:
self._container_network: ContainerNetwork = network
@property
def container(self) -> Container:
return self._container
@container.setter
def container(self, container: Container) -> None:
self._container: Container = container
@property
def container_attr(self) -> RunningContainerAttributes:
"""Running container attributes
Returns:
RunningContainerAttributes: Container attribute object
"""
return self._container_attr
@container_attr.setter
def container_attr(self, atrr: Dict[str, Any]) -> None:
"""Running container attributes. Execute reload() to refresh this
property.
Args:
atrr (RunningContainerAttributes): container attributes
"""
self._container_attr: RunningContainerAttributes = RunningContainerAttributes(**atrr)
def start(self, docker_client: DockerClient) -> Container:
"""Start a container"""
if not docker_client.ping():
raise RuntimeError("Docker Client not Running. Please check your docker settings and try again")
return docker_client.containers.run(
image=self.image,
command=self.command,
detach=True,
environment=self.container_environment_variables,
ports=self.ports,
volumes=self.volumes,
entrypoint=self.entry_point,
auto_remove=True,
remove=True,
network=self.network,
hostname=self.host_name,
labels=[self.container_label],
) # type: ignore
def check_container_health(self, docker_client: DockerClient, timeout: int = 120) -> None:
start_time: datetime = datetime.now()
while (datetime.now() - start_time).total_seconds() < timeout:
logger.info(
f"Waiting for containe:{self.container.name} status to \
change from {self.container.status} to {PossibleContainerStates.RUNNING}"
)
self.container.reload()
if self.container.status == PossibleContainerStates.RUNNING:
logger.info(f"Container:{self.container.name} status changed to {self.container.status}")
break
sleep(2)
self.reload(docker_client, self.get_container_id())
if self.container.status != PossibleContainerStates.RUNNING:
for line in self.container.logs(stream=True):
logger.debug(line.decode())
raise RuntimeError(f"Container is in an unwanted state {self.container.status}")
self.reload(docker_client, self.get_container_id())
LogWaiter.search_container_logs(docker_client, self.container, self.log_waiter)
self.reload(docker_client, self.get_container_id())
if self.http_waiter:
mapped_http_port: Dict[str, str] = dict()
mapped_http_port[str(self.http_waiter.http_port)] = self.get_exposed_port( # type: ignore
str(self.http_waiter.http_port)
)
EndpointWaiters.wait_for_http(
docker_client,
self.get_container_id(), # type: ignore
self.http_waiter,
mapped_http_port, # type: ignore
)
def stop(self, force=True, delete_volume=True) -> None:
"""Stop a running container
Args:
force (bool, optional): [description]. Defaults to True.
delete_volume (bool, optional): [description]. Defaults to True.
"""
try:
if self.container:
self.container.remove(v=delete_volume, force=force)
except APIError as exc:
logger.error(exc)
def reload(self, docker_client, container_id) -> None:
"""Reload the attributes of a running container"""
if is_container_still_running(docker_client, container_id):
self.container.reload()
self.container_attr = self.container.attrs # type: ignore
def get_exposed_port(self, port: str) -> Optional[str]:
"""Get host port bound to the container exposed port
Args:
port (str): container exposed port
Returns:
str: Host port bound to the container exposed port
"""
if not port:
return None
return self._get_mapped_container_ports([port])[port]
def _get_mapped_container_ports(self, exposed_ports: List[str]) -> Dict[str, str]:
"""Host port bound to the container returned as a k/v of the
container exposed port as key and the host bound port as the value.
Args:
ports (List[str]): List of container exposed port to be mapped to host port
Returns:
Dict[str, str]: Mapped container-host ports.
"""
mapped_ports: Dict[str, str] = dict()
ports: Dict[str, Any] = self.container_attr.NetworkSettings.Ports
for port in ports:
container_port: str = re.sub("[^0-9]", "", port)
if container_port in exposed_ports and ports[port] and isinstance(ports[port], list):
host_ports: ContainerMappedPorts = ContainerMappedPorts(**(ports[port][0]))
mapped_ports.update({container_port: host_ports.HostPort})
return mapped_ports
def get_container_id(self) -> Optional[str]:
"""Container id
Returns:
Optional[str]: Container Id
"""
if self.container:
return self.container.id
return None
def get_container_host_ip(self) -> str:
"""Container Host IP address
Returns:
str: container host IP Address
"""
return socket.gethostbyname(socket.gethostname())
def exe_command(self, command: Union[str, List[str]]) -> Tuple[int, ByteString]:
"""Execute a command inside a container after it has started running.
Args:
command (Union[str, List[str]]): command to run in the container
Raises:
RuntimeError: when the container object is not set
Returns:
Tuple[int, ByteString]: A tuple of (exit_code, output)
"""
if not self.container:
raise RuntimeError("Container must already be running to exec a command")
return self.container.exec_run(cmd=command)
container: Container
property
writable
container_attr: RunningContainerAttributes
property
writable
Running container attributes
Returns:
Type | Description |
---|---|
RunningContainerAttributes |
Container attribute object |
container_label: str
property
writable
container_network: ContainerNetwork
property
writable
check_container_health(self, docker_client: DockerClient, timeout: int = 120) -> None
Source code in testcompose/containers/generic_container.py
def check_container_health(self, docker_client: DockerClient, timeout: int = 120) -> None:
start_time: datetime = datetime.now()
while (datetime.now() - start_time).total_seconds() < timeout:
logger.info(
f"Waiting for containe:{self.container.name} status to \
change from {self.container.status} to {PossibleContainerStates.RUNNING}"
)
self.container.reload()
if self.container.status == PossibleContainerStates.RUNNING:
logger.info(f"Container:{self.container.name} status changed to {self.container.status}")
break
sleep(2)
self.reload(docker_client, self.get_container_id())
if self.container.status != PossibleContainerStates.RUNNING:
for line in self.container.logs(stream=True):
logger.debug(line.decode())
raise RuntimeError(f"Container is in an unwanted state {self.container.status}")
self.reload(docker_client, self.get_container_id())
LogWaiter.search_container_logs(docker_client, self.container, self.log_waiter)
self.reload(docker_client, self.get_container_id())
if self.http_waiter:
mapped_http_port: Dict[str, str] = dict()
mapped_http_port[str(self.http_waiter.http_port)] = self.get_exposed_port( # type: ignore
str(self.http_waiter.http_port)
)
EndpointWaiters.wait_for_http(
docker_client,
self.get_container_id(), # type: ignore
self.http_waiter,
mapped_http_port, # type: ignore
)
exe_command(self, command: Union[str, List[str]]) -> Tuple[int, ByteString]
Execute a command inside a container after it has started running.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
command |
Union[str, List[str]] |
command to run in the container |
required |
Exceptions:
Type | Description |
---|---|
RuntimeError |
when the container object is not set |
Returns:
Type | Description |
---|---|
Tuple[int, ByteString] |
A tuple of (exit_code, output) |
Source code in testcompose/containers/generic_container.py
def exe_command(self, command: Union[str, List[str]]) -> Tuple[int, ByteString]:
"""Execute a command inside a container after it has started running.
Args:
command (Union[str, List[str]]): command to run in the container
Raises:
RuntimeError: when the container object is not set
Returns:
Tuple[int, ByteString]: A tuple of (exit_code, output)
"""
if not self.container:
raise RuntimeError("Container must already be running to exec a command")
return self.container.exec_run(cmd=command)
get_container_host_ip(self) -> str
Container Host IP address
Returns:
Type | Description |
---|---|
str |
container host IP Address |
Source code in testcompose/containers/generic_container.py
def get_container_host_ip(self) -> str:
"""Container Host IP address
Returns:
str: container host IP Address
"""
return socket.gethostbyname(socket.gethostname())
get_container_id(self) -> Optional[str]
Container id
Returns:
Type | Description |
---|---|
Optional[str] |
Container Id |
Source code in testcompose/containers/generic_container.py
def get_container_id(self) -> Optional[str]:
"""Container id
Returns:
Optional[str]: Container Id
"""
if self.container:
return self.container.id
return None
get_exposed_port(self, port: str) -> Optional[str]
Get host port bound to the container exposed port
Parameters:
Name | Type | Description | Default |
---|---|---|---|
port |
str |
container exposed port |
required |
Returns:
Type | Description |
---|---|
str |
Host port bound to the container exposed port |
Source code in testcompose/containers/generic_container.py
def get_exposed_port(self, port: str) -> Optional[str]:
"""Get host port bound to the container exposed port
Args:
port (str): container exposed port
Returns:
str: Host port bound to the container exposed port
"""
if not port:
return None
return self._get_mapped_container_ports([port])[port]
reload(self, docker_client, container_id) -> None
Reload the attributes of a running container
Source code in testcompose/containers/generic_container.py
def reload(self, docker_client, container_id) -> None:
"""Reload the attributes of a running container"""
if is_container_still_running(docker_client, container_id):
self.container.reload()
self.container_attr = self.container.attrs # type: ignore
start(self, docker_client: DockerClient) -> Container
Start a container
Source code in testcompose/containers/generic_container.py
def start(self, docker_client: DockerClient) -> Container:
"""Start a container"""
if not docker_client.ping():
raise RuntimeError("Docker Client not Running. Please check your docker settings and try again")
return docker_client.containers.run(
image=self.image,
command=self.command,
detach=True,
environment=self.container_environment_variables,
ports=self.ports,
volumes=self.volumes,
entrypoint=self.entry_point,
auto_remove=True,
remove=True,
network=self.network,
hostname=self.host_name,
labels=[self.container_label],
) # type: ignore
stop(self, force = True, delete_volume = True) -> None
Stop a running container
Parameters:
Name | Type | Description | Default |
---|---|---|---|
force |
bool |
[description]. Defaults to True. |
True |
delete_volume |
bool |
[description]. Defaults to True. |
True |
Source code in testcompose/containers/generic_container.py
def stop(self, force=True, delete_volume=True) -> None:
"""Stop a running container
Args:
force (bool, optional): [description]. Defaults to True.
delete_volume (bool, optional): [description]. Defaults to True.
"""
try:
if self.container:
self.container.remove(v=delete_volume, force=force)
except APIError as exc:
logger.error(exc)
testcompose.models.container.supported_placeholders.SupportedPlaceholders
dataclass
SupportedPlaceholders(SELF_HOST: str = 'self', CONTAINER_HOSTNAME: str = 'container_hostname', EXTERNAL_PORT: str = 'external_port', CONTAINER_HOST_ADDRESS: str = 'container_host_address')
Source code in testcompose/models/container/supported_placeholders.py
@dataclass(frozen=True)
class SupportedPlaceholders:
SELF_HOST: str = 'self'
CONTAINER_HOSTNAME: str = 'container_hostname'
EXTERNAL_PORT: str = 'external_port'
CONTAINER_HOST_ADDRESS: str = 'container_host_address'
CONTAINER_HOSTNAME: str
dataclass-field
CONTAINER_HOST_ADDRESS: str
dataclass-field
EXTERNAL_PORT: str
dataclass-field
SELF_HOST: str
dataclass-field
testcompose.models.client.client_login.ClientFromEnv (BaseModel)
pydantic-model
Source code in testcompose/models/client/client_login.py
class ClientFromEnv(BaseModel):
use_ssh_client: bool = False
ssl_version: Optional[int] = None
assert_hostname: Optional[bool] = None
environment: Optional[Dict[str, Any]] = None
version: str = "auto"
timeout: Optional[int] = DEFAULT_TIMEOUT_SECONDS
max_pool_size: Optional[int] = DEFAULT_MAX_POOL_SIZE
@validator('version')
def validate_version(cls, v):
return v or "auto"
@validator('timeout')
def validate_timeout(cls, v):
return v or DEFAULT_TIMEOUT_SECONDS
@validator('max_pool_size')
def validate_max_pool_size(cls, v):
return v or DEFAULT_MAX_POOL_SIZE
assert_hostname: bool
pydantic-field
environment: Dict[str, Any]
pydantic-field
max_pool_size: int
pydantic-field
ssl_version: int
pydantic-field
timeout: int
pydantic-field
use_ssh_client: bool
pydantic-field
version: str
pydantic-field
validate_max_pool_size(v)
classmethod
Source code in testcompose/models/client/client_login.py
@validator('max_pool_size')
def validate_max_pool_size(cls, v):
return v or DEFAULT_MAX_POOL_SIZE
validate_timeout(v)
classmethod
Source code in testcompose/models/client/client_login.py
@validator('timeout')
def validate_timeout(cls, v):
return v or DEFAULT_TIMEOUT_SECONDS
validate_version(v)
classmethod
Source code in testcompose/models/client/client_login.py
@validator('version')
def validate_version(cls, v):
return v or "auto"
testcompose.models.client.client_login.ClientFromUrl (BaseModel)
pydantic-model
Source code in testcompose/models/client/client_login.py
class ClientFromUrl(BaseModel):
docker_host: Optional[str] = None
tls: Optional[bool] = None
user_agent: Optional[str] = None
credstor_env: Optional[Dict[str, Any]] = None
use_ssh_client: Optional[bool] = None
timeout: Optional[int] = DEFAULT_TIMEOUT_SECONDS
max_pool_size: Optional[int] = DEFAULT_MAX_POOL_SIZE
version: str = "auto"
@validator('version')
def validate_version(cls, v):
return v or "auto"
@validator('timeout')
def validate_timeout(cls, v):
return v or DEFAULT_TIMEOUT_SECONDS
@validator('max_pool_size')
def validate_max_pool_size(cls, v):
return v or DEFAULT_MAX_POOL_SIZE
credstor_env: Dict[str, Any]
pydantic-field
docker_host: str
pydantic-field
max_pool_size: int
pydantic-field
timeout: int
pydantic-field
tls: bool
pydantic-field
use_ssh_client: bool
pydantic-field
user_agent: str
pydantic-field
version: str
pydantic-field
validate_max_pool_size(v)
classmethod
Source code in testcompose/models/client/client_login.py
@validator('max_pool_size')
def validate_max_pool_size(cls, v):
return v or DEFAULT_MAX_POOL_SIZE
validate_timeout(v)
classmethod
Source code in testcompose/models/client/client_login.py
@validator('timeout')
def validate_timeout(cls, v):
return v or DEFAULT_TIMEOUT_SECONDS
validate_version(v)
classmethod
Source code in testcompose/models/client/client_login.py
@validator('version')
def validate_version(cls, v):
return v or "auto"
testcompose.models.client.registry_parameters.Login (BaseModel)
pydantic-model
Source code in testcompose/models/client/registry_parameters.py
class Login(BaseModel):
username: Optional[str] = None
password: Optional[str] = None
email: Optional[str] = None
registry: Optional[str] = None
reauth: Optional[bool] = False
dockercfg_path: Optional[str] = None
dockercfg_path: str
pydantic-field
email: str
pydantic-field
password: str
pydantic-field
reauth: bool
pydantic-field
registry: str
pydantic-field
username: str
pydantic-field
testcompose.models.bootstrap.container_service.ContainerService (BaseModel)
pydantic-model
Source code in testcompose/models/bootstrap/container_service.py
class ContainerService(BaseModel):
name: str
image: str
exposed_ports: List[str]
command: str = ''
environment: Dict[str, Any] = dict()
depends_on: List[str] = list()
volumes: List[ContainerVolumeMap] = list()
log_wait_parameters: Optional[ContainerLogWaitParameter] = None
http_wait_parameters: Optional[ContainerHttpWaitParameter] = None
https_wait_parameters: Optional[ContainerHttpWaitParameter] = None
entrypoint: Optional[str] = None
@validator('name')
def validate_service_name(cls, v):
if not v:
raise AttributeError("Container Service name is required")
return v
@validator('image')
def validate_image(cls, v):
if not v:
raise AttributeError("A valid image name is required")
return v
command: str
pydantic-field
depends_on: List[str]
pydantic-field
entrypoint: str
pydantic-field
environment: Dict[str, Any]
pydantic-field
exposed_ports: List[str]
pydantic-field
required
http_wait_parameters: ContainerHttpWaitParameter
pydantic-field
https_wait_parameters: ContainerHttpWaitParameter
pydantic-field
image: str
pydantic-field
required
log_wait_parameters: ContainerLogWaitParameter
pydantic-field
name: str
pydantic-field
required
volumes: List[testcompose.models.bootstrap.container_volume.ContainerVolumeMap]
pydantic-field
validate_image(v)
classmethod
Source code in testcompose/models/bootstrap/container_service.py
@validator('image')
def validate_image(cls, v):
if not v:
raise AttributeError("A valid image name is required")
return v
validate_service_name(v)
classmethod
Source code in testcompose/models/bootstrap/container_service.py
@validator('name')
def validate_service_name(cls, v):
if not v:
raise AttributeError("Container Service name is required")
return v
testcompose.models.bootstrap.container_service.ContainerServices (BaseModel)
pydantic-model
ConfigServices holds Dict of Service and their names
Parameters:
Name | Type | Description | Default |
---|---|---|---|
services |
Dict[name, Service] |
required |
Source code in testcompose/models/bootstrap/container_service.py
class ContainerServices(BaseModel):
"""
ConfigServices holds Dict of Service and their names
Args:
services: Dict[name, Service]
"""
services: Dict[str, ContainerService]
services: Dict[str, testcompose.models.bootstrap.container_service.ContainerService]
pydantic-field
required
testcompose.models.bootstrap.container_service.RankedContainerServices (BaseModel)
pydantic-model
RankedConfigServices holds a dict of services ranked in the order they are to be started.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
ranked_services |
Dict[rank, name] |
required |
Source code in testcompose/models/bootstrap/container_service.py
class RankedContainerServices(BaseModel):
"""
RankedConfigServices holds a dict of services ranked in the order they are to
be started.
Args:
ranked_services: Dict[rank, name]
"""
ranked_services: Dict[int, str] = dict()
ranked_services: Dict[int, str]
pydantic-field
testcompose.models.bootstrap.container_http_wait_parameter.ContainerHttpWaitParameter (BaseModel)
pydantic-model
Source code in testcompose/models/bootstrap/container_http_wait_parameter.py
class ContainerHttpWaitParameter(BaseModel):
http_port: int
response_status_code: int = 200
startup_delay_time_ms: int = 20000
end_point: str = '/'
use_https: bool = False
@validator('http_port')
def validate_http_port(cls, v) -> int:
if not v or not isinstance(v, int):
raise AttributeError("A valide Integer exposed Http port must be provided")
return v
@validator('end_point')
def validate_end_point(cls, v) -> str:
if not v or not isinstance(v, str):
raise AttributeError("A valide Http endpoint must be provided")
return v
@validator('response_status_code')
def validate_response_status_code(cls, v) -> int:
if not v or not isinstance(v, int):
raise AttributeError("A valide Integer Http response code must be provided")
return v
@validator('startup_delay_time_ms')
def validate_startup_delay_time_ms(cls, v) -> int:
if not v or not isinstance(v, int):
return 20000
return v
end_point: str
pydantic-field
http_port: int
pydantic-field
required
response_status_code: int
pydantic-field
startup_delay_time_ms: int
pydantic-field
use_https: bool
pydantic-field
validate_end_point(v) -> str
classmethod
Source code in testcompose/models/bootstrap/container_http_wait_parameter.py
@validator('end_point')
def validate_end_point(cls, v) -> str:
if not v or not isinstance(v, str):
raise AttributeError("A valide Http endpoint must be provided")
return v
validate_http_port(v) -> int
classmethod
Source code in testcompose/models/bootstrap/container_http_wait_parameter.py
@validator('http_port')
def validate_http_port(cls, v) -> int:
if not v or not isinstance(v, int):
raise AttributeError("A valide Integer exposed Http port must be provided")
return v
validate_response_status_code(v) -> int
classmethod
Source code in testcompose/models/bootstrap/container_http_wait_parameter.py
@validator('response_status_code')
def validate_response_status_code(cls, v) -> int:
if not v or not isinstance(v, int):
raise AttributeError("A valide Integer Http response code must be provided")
return v
validate_startup_delay_time_ms(v) -> int
classmethod
Source code in testcompose/models/bootstrap/container_http_wait_parameter.py
@validator('startup_delay_time_ms')
def validate_startup_delay_time_ms(cls, v) -> int:
if not v or not isinstance(v, int):
return 20000
return v
testcompose.models.bootstrap.container_log_wait_parameter.ContainerLogWaitParameter (BaseModel)
pydantic-model
Source code in testcompose/models/bootstrap/container_log_wait_parameter.py
class ContainerLogWaitParameter(BaseModel):
log_line_regex: str
wait_timeout_ms: int = 60000
poll_interval_ms: int = 10000
@validator('log_line_regex')
def validate_log_line_regex(cls, v):
if not v:
raise AttributeError("log_line_prefix must be set")
return v
log_line_regex: str
pydantic-field
required
poll_interval_ms: int
pydantic-field
wait_timeout_ms: int
pydantic-field
validate_log_line_regex(v)
classmethod
Source code in testcompose/models/bootstrap/container_log_wait_parameter.py
@validator('log_line_regex')
def validate_log_line_regex(cls, v):
if not v:
raise AttributeError("log_line_prefix must be set")
return v
testcompose.models.bootstrap.container_volume.VolumeSourceTypes
dataclass
VolumeSourceTypes(FILESYSTEM_SOURCE: str = 'filesystem', DOCKER_VOLUME_SOURCE: str = 'dockervolume')
Source code in testcompose/models/bootstrap/container_volume.py
@dataclass(frozen=True)
class VolumeSourceTypes:
FILESYSTEM_SOURCE: str = "filesystem"
DOCKER_VOLUME_SOURCE: str = "dockervolume"
DOCKER_VOLUME_SOURCE: str
dataclass-field
FILESYSTEM_SOURCE: str
dataclass-field
testcompose.models.bootstrap.container_volume.ContainerVolumeMap (BaseModel)
pydantic-model
Source code in testcompose/models/bootstrap/container_volume.py
class ContainerVolumeMap(BaseModel):
host: str
container: str
mode: str = 'ro'
source: str = VolumeSourceTypes.DOCKER_VOLUME_SOURCE
@validator('mode')
def validate_mode(cls, v) -> str:
assert str(v).lower() in ['ro', 'rw']
return v
@validator('source')
def validate_source(cls, v):
assert str(v).lower() in ['filesystem', 'dockervolume']
return v
@validator('host')
def validate_host(cls, v):
if not v:
raise AttributeError("Volume Host option can not be empty or None")
return v
@validator('container')
def validate_container(cls, v):
if not v:
raise AttributeError("Volume container option can not be empty or None")
return v
container: str
pydantic-field
required
host: str
pydantic-field
required
mode: str
pydantic-field
source: str
pydantic-field
validate_container(v)
classmethod
Source code in testcompose/models/bootstrap/container_volume.py
@validator('container')
def validate_container(cls, v):
if not v:
raise AttributeError("Volume container option can not be empty or None")
return v
validate_host(v)
classmethod
Source code in testcompose/models/bootstrap/container_volume.py
@validator('host')
def validate_host(cls, v):
if not v:
raise AttributeError("Volume Host option can not be empty or None")
return v
validate_mode(v) -> str
classmethod
Source code in testcompose/models/bootstrap/container_volume.py
@validator('mode')
def validate_mode(cls, v) -> str:
assert str(v).lower() in ['ro', 'rw']
return v
validate_source(v)
classmethod
Source code in testcompose/models/bootstrap/container_volume.py
@validator('source')
def validate_source(cls, v):
assert str(v).lower() in ['filesystem', 'dockervolume']
return v
testcompose.models.container.running_container_attributes.PossibleContainerStates
dataclass
PossibleContainerStates(RUNNING: str = 'running', EXITED: str = 'exited')
Source code in testcompose/models/container/running_container_attributes.py
@dataclass(frozen=True)
class PossibleContainerStates:
RUNNING: str = 'running'
EXITED: str = 'exited'
EXITED: str
dataclass-field
RUNNING: str
dataclass-field
testcompose.models.container.running_container_attributes.ContainerState (BaseModel)
pydantic-model
Source code in testcompose/models/container/running_container_attributes.py
class ContainerState(BaseModel):
Status: str
Running: bool
Paused: bool
Restarting: bool
OOMKilled: bool
Dead: bool
Pid: int
ExitCode: int
Error: str
StartedAt: str
FinishedAt: str
Dead: bool
pydantic-field
required
Error: str
pydantic-field
required
ExitCode: int
pydantic-field
required
FinishedAt: str
pydantic-field
required
OOMKilled: bool
pydantic-field
required
Paused: bool
pydantic-field
required
Pid: int
pydantic-field
required
Restarting: bool
pydantic-field
required
Running: bool
pydantic-field
required
StartedAt: str
pydantic-field
required
Status: str
pydantic-field
required
testcompose.models.container.running_container_attributes.RunningContainerAttributes (BaseModel)
pydantic-model
Source code in testcompose/models/container/running_container_attributes.py
class RunningContainerAttributes(BaseModel):
Id: str
State: ContainerState
Platform: str
NetworkSettings: ContainerNetworkSettings
Id: str
pydantic-field
required
NetworkSettings: ContainerNetworkSettings
pydantic-field
required
Platform: str
pydantic-field
required
State: ContainerState
pydantic-field
required
testcompose.models.network.network.NetworkComponents (BaseModel)
pydantic-model
Source code in testcompose/models/network/network.py
class NetworkComponents(BaseModel):
Aliases: Optional[List[str]] = None
NetworkID: str
EndpointID: str
Gateway: str
IPAddress: str
Aliases: List[str]
pydantic-field
EndpointID: str
pydantic-field
required
Gateway: str
pydantic-field
required
IPAddress: str
pydantic-field
required
NetworkID: str
pydantic-field
required
testcompose.models.network.network.ContainerMappedPorts (BaseModel)
pydantic-model
Source code in testcompose/models/network/network.py
class ContainerMappedPorts(BaseModel):
HostIp: str
HostPort: str
HostIp: str
pydantic-field
required
HostPort: str
pydantic-field
required
testcompose.models.network.network.ContainerNetworkSettings (BaseModel)
pydantic-model
Source code in testcompose/models/network/network.py
class ContainerNetworkSettings(BaseModel):
Ports: Dict[str, Any] = dict()
Networks: Dict[str, NetworkComponents]
Networks: Dict[str, testcompose.models.network.network.NetworkComponents]
pydantic-field
required
Ports: Dict[str, Any]
pydantic-field
testcompose.models.network.network.DefaultNeworkDrivers
dataclass
DefaultNeworkDrivers()
Source code in testcompose/models/network/network.py
@dataclass(frozen=True)
class DefaultNeworkDrivers:
DEFAULT_BRIDGE_NETWORK = 'bridge'
DEFAULT_HOST_NETWORK = 'host'
DEFAULT_NULL_NETWORK = 'null'
DEFAULT_BRIDGE_NETWORK
DEFAULT_HOST_NETWORK
DEFAULT_NULL_NETWORK
testcompose.waiters.endpoint_waiters.EndpointWaiters
Source code in testcompose/waiters/endpoint_waiters.py
class EndpointWaiters:
@staticmethod
def _get_container_host_ip() -> str:
"""The host IP where the container runs
Returns:
str: host IP
"""
return socket.gethostbyname(socket.gethostname())
@staticmethod
def _check_endpoint(
docker_client: DockerClient,
container_id: str,
wait_parameter: ContainerHttpWaitParameter,
exposed_ports: Dict[str, str],
) -> None:
"""Endpoint health-check for a container. A running service
with an exposed endpoint is queried and the response code is
checked with the expected response code.
Args:
http_port (str): container service port
status_code (int, optional): Defaults to 200.
end_point (str, optional): Provided service endpoint. Defaults to "/".
server_startup_time (int, optional): Expected wait time for the service to start. Defaults to 20.
Returns:
bool: Endpoint returned expected status code
"""
response_check: bool = True
for _ in range(0, 3):
sleep(wait_parameter.startup_delay_time_ms / 1000)
if not is_container_still_running(docker_client, container_id):
response_check = False
break
try:
host: str = EndpointWaiters._get_container_host_ip()
mapped_port: str = exposed_ports[str(wait_parameter.http_port)]
scheme: str = "https://" if wait_parameter.use_https else "http://"
site_url: str = scheme + f"{host}:{mapped_port}/{wait_parameter.end_point.lstrip('/')}"
response: Response = get(url=site_url.rstrip("/"))
if response.status_code == wait_parameter.response_status_code:
break
except Exception as exc:
response_check = False
logger.error("HTTP_CHECK_ERROR: %s", exc)
if not response_check:
raise RuntimeError(f"Http check on port {wait_parameter.http_port} failed")
return
@staticmethod
def wait_for_http(
docker_client: DockerClient,
container_id: str,
wait_parameter: ContainerHttpWaitParameter,
exposed_ports: Dict[str, str],
) -> None:
if wait_parameter:
EndpointWaiters._check_endpoint(docker_client, container_id, wait_parameter, exposed_ports)
wait_for_http(docker_client: DockerClient, container_id: str, wait_parameter: ContainerHttpWaitParameter, exposed_ports: Dict[str, str]) -> None
staticmethod
Source code in testcompose/waiters/endpoint_waiters.py
@staticmethod
def wait_for_http(
docker_client: DockerClient,
container_id: str,
wait_parameter: ContainerHttpWaitParameter,
exposed_ports: Dict[str, str],
) -> None:
if wait_parameter:
EndpointWaiters._check_endpoint(docker_client, container_id, wait_parameter, exposed_ports)
testcompose.waiters.log_waiters.LogWaiter
Source code in testcompose/waiters/log_waiters.py
class LogWaiter:
@staticmethod
def search_container_logs(
docker_client: DockerClient, container: Container, log_parameter: ContainerLogWaitParameter
) -> None:
"""Search for a given predicate in the container log. Useful to check if a
container is running and healthy
Args:
search_string (str): predicate to search in the log
timeout (float, optional): Defaults to 300.0.
interval (int, optional): Defaults to 1.
Raises:
ValueError: if a non string predicate is passed
Returns:
bool: True if the log contains the provided predicate
"""
if not log_parameter:
return
if not isinstance(log_parameter.log_line_regex, str):
raise ValueError
prog = re.compile(log_parameter.log_line_regex, re.MULTILINE).search
start: datetime = datetime.now()
output: Optional[Match[str]] = None
while (datetime.now() - start).total_seconds() < (log_parameter.wait_timeout_ms / 1000):
if not is_container_still_running(docker_client, container.id): # type: ignore
return
output = prog(container.logs().decode())
if output:
return
if (datetime.now() - start).total_seconds() > (log_parameter.wait_timeout_ms / 1000):
raise TimeoutError(
"container %s did not emit logs satisfying predicate in %.3f seconds"
% (container.name, float(log_parameter.wait_timeout_ms or 60000))
)
sleep(log_parameter.poll_interval_ms / 1000)
logger.info(container.logs().decode())
search_container_logs(docker_client: DockerClient, container: Container, log_parameter: ContainerLogWaitParameter) -> None
staticmethod
Search for a given predicate in the container log. Useful to check if a container is running and healthy
Parameters:
Name | Type | Description | Default |
---|---|---|---|
search_string |
str |
predicate to search in the log |
required |
timeout |
float |
Defaults to 300.0. |
required |
interval |
int |
Defaults to 1. |
required |
Exceptions:
Type | Description |
---|---|
ValueError |
if a non string predicate is passed |
Returns:
Type | Description |
---|---|
bool |
True if the log contains the provided predicate |
Source code in testcompose/waiters/log_waiters.py
@staticmethod
def search_container_logs(
docker_client: DockerClient, container: Container, log_parameter: ContainerLogWaitParameter
) -> None:
"""Search for a given predicate in the container log. Useful to check if a
container is running and healthy
Args:
search_string (str): predicate to search in the log
timeout (float, optional): Defaults to 300.0.
interval (int, optional): Defaults to 1.
Raises:
ValueError: if a non string predicate is passed
Returns:
bool: True if the log contains the provided predicate
"""
if not log_parameter:
return
if not isinstance(log_parameter.log_line_regex, str):
raise ValueError
prog = re.compile(log_parameter.log_line_regex, re.MULTILINE).search
start: datetime = datetime.now()
output: Optional[Match[str]] = None
while (datetime.now() - start).total_seconds() < (log_parameter.wait_timeout_ms / 1000):
if not is_container_still_running(docker_client, container.id): # type: ignore
return
output = prog(container.logs().decode())
if output:
return
if (datetime.now() - start).total_seconds() > (log_parameter.wait_timeout_ms / 1000):
raise TimeoutError(
"container %s did not emit logs satisfying predicate in %.3f seconds"
% (container.name, float(log_parameter.wait_timeout_ms or 60000))
)
sleep(log_parameter.poll_interval_ms / 1000)
logger.info(container.logs().decode())
testcompose.run_containers.RunContainers (BaseDockerClient)
Source code in testcompose/run_containers.py
class RunContainers(BaseDockerClient):
def __init_subclass__(cls, **kwargs) -> None:
if cls is not RunContainers:
raise TypeError("The class RunContainers can not be extended")
return super().__init_subclass__()
def __init__(
self,
config_services: ContainerServices,
ranked_services: RankedContainerServices,
registry_login_param=Login(),
env_param: ClientFromEnv = ClientFromEnv(),
url_param: ClientFromUrl = ClientFromUrl(),
) -> None:
super(RunContainers, self).__init__(client_env_param=env_param, client_url_param=url_param)
self._config_services: ContainerServices = config_services
self._ranked_config_services: RankedContainerServices = ranked_services
self.running_containers = RunningContainers()
self.registry_login(login_credentials=registry_login_param)
self._running_container_labels: List[str] = list()
@property
def running_containers(self) -> RunningContainers:
return self._running_containers
@running_containers.setter
def running_containers(self, containers: RunningContainers) -> None:
self._running_containers: RunningContainers = containers
@property
def unique_container_label(self) -> str:
return self._unique_container_label
@unique_container_label.setter
def unique_container_label(self, label: str) -> None:
self._unique_container_label: str = label
def __enter__(self) -> RunningContainers:
try:
return self.run_containers()
except Exception:
self.stop_running_containers()
return RunningContainers()
def __exit__(self, exc_type, exc_value, exc_tb) -> None:
try:
self.stop_running_containers()
if exc_tb and exc_type:
logger.info("%s[%s]: %s", exc_type, exc_value, exc_tb)
except Exception as exc:
logger.info(exc.with_traceback(None))
def run_containers(self) -> RunningContainers:
self.unique_container_label = uuid4().hex
network_name: str = f"{self.unique_container_label}_network"
processed_containers_services: Dict[str, RunningContainer] = dict()
self._running_container_labels.append(f"label={network_name}")
for rank in sorted(self._ranked_config_services.ranked_services.keys()):
service: ContainerService = self._config_services.services[
self._ranked_config_services.ranked_services[rank]
]
self.pull_docker_image(service.image)
generic_container: GenericContainer = GenericContainer()
generic_container.container_network = ContainerNetwork(
self.docker_client, network_name, labels={"label": network_name}
)
generic_container.with_service(
service,
processed_containers_services,
generic_container.container_network.name, # type: ignore
)
generic_container.container_label = f"{self.unique_container_label}_{service.name}"
self._running_container_labels.append(generic_container.container_label)
try:
log_wait_timeout = 120
if service.log_wait_parameters:
log_wait_timeout = int((service.log_wait_parameters.wait_timeout_ms or 120000) / 1000)
generic_container.container = generic_container.start(self.docker_client)
generic_container.check_container_health(self.docker_client, timeout=log_wait_timeout)
running_container: RunningContainer = RunningContainer(
service_name=service.name,
config_environment_variables=generic_container.container_environment_variables,
generic_container=generic_container,
)
processed_containers_services.update({service.name: running_container})
except Exception as exc:
logger.error(exc)
self.stop_running_containers()
raise APIError(exc)
logger.info("The following containers were started: %s", list(processed_containers_services.keys()))
self.running_containers = RunningContainers(running_containers=processed_containers_services)
return self.running_containers
def stop_running_containers(self) -> None:
self._running_container_labels.sort(reverse=True)
Housekeeping.perform_housekeeping(
docker_client=self.docker_client, labels=self._running_container_labels
)
running_containers: RunningContainers
property
writable
unique_container_label: str
property
writable
run_containers(self) -> RunningContainers
Source code in testcompose/run_containers.py
def run_containers(self) -> RunningContainers:
self.unique_container_label = uuid4().hex
network_name: str = f"{self.unique_container_label}_network"
processed_containers_services: Dict[str, RunningContainer] = dict()
self._running_container_labels.append(f"label={network_name}")
for rank in sorted(self._ranked_config_services.ranked_services.keys()):
service: ContainerService = self._config_services.services[
self._ranked_config_services.ranked_services[rank]
]
self.pull_docker_image(service.image)
generic_container: GenericContainer = GenericContainer()
generic_container.container_network = ContainerNetwork(
self.docker_client, network_name, labels={"label": network_name}
)
generic_container.with_service(
service,
processed_containers_services,
generic_container.container_network.name, # type: ignore
)
generic_container.container_label = f"{self.unique_container_label}_{service.name}"
self._running_container_labels.append(generic_container.container_label)
try:
log_wait_timeout = 120
if service.log_wait_parameters:
log_wait_timeout = int((service.log_wait_parameters.wait_timeout_ms or 120000) / 1000)
generic_container.container = generic_container.start(self.docker_client)
generic_container.check_container_health(self.docker_client, timeout=log_wait_timeout)
running_container: RunningContainer = RunningContainer(
service_name=service.name,
config_environment_variables=generic_container.container_environment_variables,
generic_container=generic_container,
)
processed_containers_services.update({service.name: running_container})
except Exception as exc:
logger.error(exc)
self.stop_running_containers()
raise APIError(exc)
logger.info("The following containers were started: %s", list(processed_containers_services.keys()))
self.running_containers = RunningContainers(running_containers=processed_containers_services)
return self.running_containers
stop_running_containers(self) -> None
Source code in testcompose/run_containers.py
def stop_running_containers(self) -> None:
self._running_container_labels.sort(reverse=True)
Housekeeping.perform_housekeeping(
docker_client=self.docker_client, labels=self._running_container_labels
)