"""
Command line interface for photon-mosaic.
"""
import argparse
import importlib.resources as pkg_resources
import logging
import os
import subprocess
from datetime import datetime
from pathlib import Path
import yaml
from photon_mosaic import get_snakefile_path
def create_argument_parser():
"""Create and configure the argument parser for the CLI.
Returns
-------
argparse.ArgumentParser
Configured argument parser with all CLI options
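
Examples
--------
A minimal sketch of parsing a few flags; the values shown here are
illustrative, not defaults:

>>> parser = create_argument_parser()
>>> args = parser.parse_args(["--jobs", "4", "--dry-run"])
>>> args.jobs
'4'
>>> args.dry_run
True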
"""
parser = argparse.ArgumentParser(
description="Run the photon-mosaic Snakemake pipeline."
)
parser.add_argument(
"--config",
default=None,
help="Path to your config.yaml file.",
)
parser.add_argument(
"--raw_data_base",
default=None,
help="Override raw_data_base in config file",
)
parser.add_argument(
"--processed_data_base",
default=None,
help="Override processed_data_base in config file",
)
parser.add_argument(
"--jobs", default="1", help="Number of parallel jobs to run"
)
parser.add_argument(
"--dry-run", action="store_true", help="Perform a dry run"
)
parser.add_argument(
"--forcerun",
default=None,
help="Force re-run of a specific rule",
)
parser.add_argument(
"--rerun-incomplete",
action="store_true",
help="Rerun any incomplete jobs",
)
parser.add_argument(
"--latency-wait",
default=10,
help="Time to wait before checking if output files are ready",
)
parser.add_argument(
"--log-level",
default="INFO",
help="Log level",
)
parser.add_argument(
"--verbose",
"-v",
action="store_true",
help="Enable verbose output (default is quiet mode)",
)
parser.add_argument(
"extra",
nargs=argparse.REMAINDER,
help="Additional arguments to snakemake",
)
parser.add_argument(
"--reset-config",
action="store_true",
help="Reset the config file to the default values",
)
return parser
def ensure_default_config(reset_config=False):
"""Ensure the default config file exists, create if missing or
reset requested.

Parameters
----------
reset_config : bool
Whether to reset the config file to defaults

Returns
-------
Path
Path to the default config file
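
Examples
--------
A minimal sketch; the first call copies the packaged defaults to
``~/.photon_mosaic/config.yaml`` (skipped here because it writes to the
home directory):

>>> ensure_default_config().name  # doctest: +SKIP
'config.yaml'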
"""
logger = logging.getLogger(__name__)
default_config_dir = Path.home() / ".photon_mosaic"
default_config_path = default_config_dir / "config.yaml"
if not default_config_path.exists() or reset_config:
logger.debug("Creating default config file")
default_config_dir.mkdir(parents=True, exist_ok=True)
source_config_path = pkg_resources.files("photon_mosaic").joinpath(
"workflow", "config.yaml"
)
with (
source_config_path.open("rb") as src,
open(default_config_path, "wb") as dst,
):
dst.write(src.read())
return default_config_path
def load_and_process_config(args):
"""Load configuration file and apply CLI overrides.
Parameters
----------
args : argparse.Namespace
Parsed command line arguments

Returns
-------
tuple[dict, Path]
Configuration dictionary and path to config file used
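
Examples
--------
A minimal sketch with a hypothetical namespace standing in for parsed CLI
arguments; the paths are placeholders:

>>> from argparse import Namespace
>>> args = Namespace(config=None, reset_config=False,
...                  raw_data_base="/data/raw",
...                  processed_data_base="/data/processed")
>>> config, used_path = load_and_process_config(args)  # doctest: +SKIP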
"""
logger = logging.getLogger(__name__)
# Ensure default config exists
default_config_path = ensure_default_config(args.reset_config)
# Determine which config to use
if args.config is not None:
logger.debug(f"Using config file: {args.config}")
config_path = Path(args.config)
else:
logger.debug("Using default config file")
config_path = default_config_path
# Load config
with open(config_path, "r") as f:
config = yaml.safe_load(f)
logger.debug(f"Loaded config from {config_path}:")
logger.debug(f"use_slurm: {config.get('use_slurm', 'NOT SET')}")
logger.debug(f"slurm config: {config.get('slurm', 'NOT SET')}")
# Apply CLI overrides
if args.raw_data_base is not None:
logger.debug(f"Overriding raw_data_base to: {args.raw_data_base}")
config["raw_data_base"] = args.raw_data_base
else:
logger.debug(
f"Using raw_data_base from config file: {config['raw_data_base']}"
)
if args.processed_data_base is not None:
logger.debug(
f"Overriding processed_data_base to: {args.processed_data_base}"
)
config["processed_data_base"] = args.processed_data_base
else:
logger.debug(
"Using processed_data_base from config file: "
f"{config['processed_data_base']}"
)
# Process paths
raw_data_base = Path(config["raw_data_base"]).resolve()
processed_data_base = Path(config["processed_data_base"]).resolve()
# Append derivatives to the processed_data_base if
# it doesn't end with /derivatives
if processed_data_base.name != "derivatives":
processed_data_base = processed_data_base / "derivatives"
config["processed_data_base"] = str(processed_data_base)
# Update default config file if it's the one being used
if args.config is None:
update_default_config(
default_config_path, raw_data_base, processed_data_base
)
return config, config_path
def update_default_config(config_path, raw_data_base, processed_data_base):
"""Update the default config file with new base paths while
preserving comments.

Parameters
----------
config_path : Path
Path to the config file to update
raw_data_base : Path
Raw data base path
processed_data_base : Path
Processed data base path
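
Examples
--------
A minimal sketch with placeholder paths:

>>> update_default_config(
...     Path.home() / ".photon_mosaic" / "config.yaml",
...     Path("/data/raw"),
...     Path("/data/processed/derivatives"),
... )  # doctest: +SKIP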
"""
with open(config_path, "r") as f:
config_lines = f.readlines()
with open(config_path, "w") as f:
for line in config_lines:
if line.startswith("processed_data_base:"):
f.write(f"processed_data_base: {processed_data_base}\n")
elif line.startswith("raw_data_base:"):
f.write(f"raw_data_base: {raw_data_base}\n")
else:
f.write(line)
def setup_output_directories(processed_data_base):
"""Create necessary output directories for the pipeline.
Parameters
----------
processed_data_base : Path
Base path for processed data

Returns
-------
tuple[Path, Path, Path]
Paths to output directory, logs directory, and configs directory
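
Examples
--------
A minimal sketch; this creates ``photon-mosaic/``, ``photon-mosaic/logs/``
and ``photon-mosaic/configs/`` under the given base (placeholder path):

>>> out_dir, logs_dir, configs_dir = setup_output_directories(
...     Path("/data/processed/derivatives")
... )  # doctest: +SKIP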
"""
logger = logging.getLogger(__name__)
output_dir = processed_data_base / "photon-mosaic"
logger.debug(f"Creating output directory: {output_dir}")
logs_dir = output_dir / "logs"
configs_dir = output_dir / "configs"
# Create directories with explicit permissions (rwxr-xr-x)
# This ensures SLURM jobs can access these directories
output_dir.mkdir(parents=True, exist_ok=True)
os.chmod(output_dir, 0o755)
logs_dir.mkdir(exist_ok=True)
os.chmod(logs_dir, 0o755)
configs_dir.mkdir(exist_ok=True)
os.chmod(configs_dir, 0o755)
return output_dir, logs_dir, configs_dir
def save_timestamped_config(config, configs_dir):
"""Save configuration with timestamp for reproducibility.
Parameters
----------
config : dict
Configuration dictionary to save
configs_dir : Path
Directory to save config files

Returns
-------
tuple[str, Path]
Timestamp string and path to saved config file
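
Examples
--------
A minimal sketch with a toy config dictionary and a placeholder directory:

>>> timestamp, saved_path = save_timestamped_config(
...     {"raw_data_base": "/data/raw"}, Path("/data/configs")
... )  # doctest: +SKIP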
"""
logger = logging.getLogger(__name__)
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
config_filename = f"config_{timestamp}.yaml"
config_path = configs_dir / config_filename
with open(config_path, "w") as f:
logger.debug(f"Saving config to: {config_path}")
yaml.dump(config, f)
# Ensure the config file is readable by all users (rw-r--r--)
# This is critical for SLURM jobs that need to read this file
# when Snakemake re-runs itself in each SLURM job context
os.chmod(config_path, 0o644)
logger.debug("Set config file permissions to 644 for SLURM job access")
return timestamp, config_path
def build_snakemake_command(args, config_path):
"""Build the base snakemake command with common arguments.
Parameters
----------
args : argparse.Namespace
Parsed command line arguments
config_path : Path
Path to the config file to use

Returns
-------
list[str]
Snakemake command as list of strings
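
Examples
--------
A minimal sketch using a hypothetical namespace in place of parsed CLI
arguments; the resulting list starts with ``["snakemake", "--snakefile",
...]`` and ends with ``--quiet`` because ``verbose`` is False:

>>> from argparse import Namespace
>>> args = Namespace(jobs="4", dry_run=True, forcerun=None,
...                  rerun_incomplete=False, latency_wait=10, verbose=False)
>>> cmd = build_snakemake_command(args, Path("config.yaml"))  # doctest: +SKIP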
"""
logger = logging.getLogger(__name__)
snakefile_path = get_snakefile_path()
logger.debug(f"Launching snakemake with snakefile: {snakefile_path}")
cmd = [
"snakemake",
"--snakefile",
str(snakefile_path),
"--jobs",
str(args.jobs),
"--configfile",
str(config_path),
]
if args.dry_run:
cmd.append("--dry-run")
if args.forcerun:
cmd.extend(["--forcerun", args.forcerun])
if args.rerun_incomplete:
cmd.append("--rerun-incomplete")
if args.latency_wait:
cmd.extend(["--latency-wait", str(args.latency_wait)])
if not args.verbose:
# Default to quiet mode unless --verbose is specified
cmd.append("--quiet")
return cmd
def execute_pipeline_with_retry(cmd, log_path):
"""Execute the snakemake pipeline with automatic
unlock and retry on lock errors.

Parameters
----------
cmd : list[str]
Snakemake command to execute
log_path : Path
Path to save execution logs

Returns
-------
int
Return code from the snakemake execution
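
Examples
--------
A minimal sketch with a hypothetical command and a placeholder log path:

>>> rc = execute_pipeline_with_retry(
...     ["snakemake", "--snakefile", "Snakefile", "--jobs", "1"],
...     Path("/data/logs/snakemake.log"),
... )  # doctest: +SKIP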
"""
logger = logging.getLogger(__name__)
with open(log_path, "w") as logfile:
logger.debug(f"Saving logs to: {log_path}")
logger.info(f"Launching snakemake with command: {' '.join(cmd)}")
result = subprocess.run(cmd, stdout=logfile, stderr=logfile)
# If workflow is locked, automatically unlock and retry
if result.returncode != 0:
# Check if the error is related to locking by reading the log
with open(log_path, "r") as log_read:
log_content = log_read.read()
if "lock" in log_content.lower():
logger.warning(
"Workflow appears to be locked. "
"Attempting automatic unlock and retry..."
)
# Create unlock command
unlock_cmd = cmd.copy()
unlock_cmd.append("--unlock")
# Run unlock command
with open(log_path, "a") as logfile_append:
logfile_append.write("\n--- Auto-unlock attempt ---\n")
unlock_result = subprocess.run(
unlock_cmd,
stdout=logfile_append,
stderr=logfile_append,
)
# If unlock succeeded, retry the original command
if unlock_result.returncode == 0:
logger.info(
"Automatic unlock successful. "
"Retrying pipeline execution..."
)
with open(log_path, "a") as logfile_append:
logfile_append.write(
"\n--- Retry after unlock ---\n"
)
result = subprocess.run(
cmd,
stdout=logfile_append,
stderr=logfile_append,
)
else:
logger.error(
"Automatic unlock failed. "
"Please check the log file for details."
)
return result.returncode
def main():
"""Run the photon-mosaic Snakemake pipeline for automated and reproducible
analysis of multiphoton calcium imaging datasets.

This pipeline integrates Suite2p for image registration and signal
extraction, with a standardized output folder structure following
the NeuroBlueprint specification. It is designed for labs that
store their data on servers connected to HPC clusters and want to
batch-process multiple imaging sessions in parallel.

Command Line Arguments
----------------------
--config : str, optional
Path to your config.yaml file. If not provided, uses
~/.photon_mosaic/config.yaml.
--raw_data_base : str, optional
Override raw_data_base in config file (path to raw imaging data).
--processed_data_base : str, optional
Override processed_data_base in config file (path for processed
outputs).
--jobs : str, default="1"
Number of parallel jobs to run.
--dry-run : bool, optional
Perform a dry run to preview the workflow without executing.
--forcerun : str, optional
Force re-run of a specific rule (e.g., 'suite2p').
--rerun-incomplete : bool, optional
Rerun any incomplete jobs.
--latency-wait : int, default=10
Seconds to wait for output files to appear after a job finishes.
--log-level : str, default="INFO"
Log level.
--verbose, -v : flag, optional
Enable verbose output (default is quiet mode).
--reset-config : flag, optional
Reset the config file to the default values.
extra : list
Additional arguments to pass to snakemake.

Notes
-----
The pipeline will:
1. Create a timestamped config file in derivatives/photon-mosaic/configs/
2. Save execution logs in derivatives/photon-mosaic/logs/
3. Process all TIFF files found in the raw data directory
4. Generate standardized outputs following NeuroBlueprint specification
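
Examples
--------
Typical invocations of the console entry point (the command name
``photon-mosaic`` is assumed here; the paths are placeholders)::

    photon-mosaic --raw_data_base /data/raw \
        --processed_data_base /data/processed --jobs 4

Preview the workflow without executing it::

    photon-mosaic --dry-run --verbose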
"""
# Parse command line arguments
parser = create_argument_parser()
args = parser.parse_args()
# Set up logging
logging.basicConfig(level=args.log_level)
logger = logging.getLogger(__name__)
logger.debug("Starting photon-mosaic CLI")
# Load and process configuration
config, _ = load_and_process_config(args)
# Set up output directories
processed_data_base = Path(config["processed_data_base"])
output_dir, logs_dir, configs_dir = setup_output_directories(
processed_data_base
)
# Save timestamped config for reproducibility
timestamp, config_path = save_timestamped_config(config, configs_dir)
# Build snakemake command
cmd = build_snakemake_command(args, config_path)
# Configure SLURM execution if enabled
cmd = configure_slurm_execution(cmd, config)
# Add extra arguments if provided
if args.extra:
cmd.extend(args.extra)
# Execute pipeline with automatic retry on lock errors
log_filename = f"snakemake_{timestamp}.log"
log_path = logs_dir / log_filename
return_code = execute_pipeline_with_retry(cmd, log_path)
# Report final status
if return_code == 0:
logging.info("Snakemake pipeline completed successfully.")
else:
logger.error(
f"Snakemake pipeline failed with exit code {return_code}. "
f"Check the log file at {log_path} for details."
)