Skip to content

Workflows

The workflows module provides Prefect v3 workflows for data ingestion and metadata management. It includes workflows for ingesting sequencer data, updating metadata, and more.

Key Features

  • Prefect v3 workflows for orchestrating complex tasks
  • Sequencer-specific workflows for different sequencer types
  • Metadata extraction and management workflows
  • Error handling and notifications for workflow failures
  • Integration with iRODS for data management

Available Workflows

The following workflows are available:

  • Ingest Sequencer Runs: Ingests sequencer runs of a specific type
  • Ingest All Sequencer Runs: Ingests sequencer runs of all supported types
  • Update Run Metadata: Updates metadata for a sequencer run in iRODS

Basic Usage

Running Workflows Directly

You can run the workflows directly using Python:

from rodrunner.config import get_config
from rodrunner.workflows.ingest import ingest_sequencer_runs

# Load the configuration
config = get_config()

# Run the ingest workflow
results = ingest_sequencer_runs(
    config=config,
    sequencer_type="miseq",
    root_dir="/path/to/sequencer/miseq"
)

# Print the results
for result in results:
    print(f"Run: {result['run_dir']}, Success: {result['success']}")

Running Workflows via the API

You can also run the workflows via the API:

curl -X POST http://localhost:8000/workflows/run \
  -H "Content-Type: application/json" \
  -d '{
    "workflow_name": "Ingest Sequencer Runs",
    "parameters": {
      "sequencer_type": "miseq",
      "root_dir": "/path/to/sequencer/miseq"
    }
  }'

Workflow Details

Ingest Sequencer Runs

The ingest_sequencer_runs workflow ingests sequencer runs of a specific type:

from rodrunner.config import get_config
from rodrunner.workflows.ingest import ingest_sequencer_runs

# Load the configuration
config = get_config()

# Run the ingest workflow
results = ingest_sequencer_runs(
    config=config,
    sequencer_type="miseq",
    root_dir="/path/to/sequencer/miseq",
    completion_indicator="RTAComplete.txt"
)

Parameters: - config: Application configuration - sequencer_type: Type of sequencer (miseq, nextseq, novaseq, etc.) - root_dir: Root directory to search for sequencer runs (defaults to config.sequencer.base_dir) - completion_indicator: Filename that indicates a completed run (defaults to "RTAComplete.txt")

Returns: - List of dictionaries with the results of the ingestion

Ingest All Sequencer Runs

The ingest_all_sequencer_runs workflow ingests sequencer runs of all supported types:

from rodrunner.config import get_config
from rodrunner.workflows.ingest import ingest_all_sequencer_runs

# Load the configuration
config = get_config()

# Run the ingest workflow
results = ingest_all_sequencer_runs(
    config=config,
    root_dir="/path/to/sequencer",
    completion_indicator="RTAComplete.txt"
)

Parameters: - config: Application configuration - root_dir: Root directory to search for sequencer runs (defaults to config.sequencer.base_dir) - completion_indicator: Filename that indicates a completed run (defaults to "RTAComplete.txt")

Returns: - Dictionary mapping sequencer types to lists of ingestion results

Update Run Metadata

The update_run_metadata workflow updates metadata for a sequencer run in iRODS:

from rodrunner.config import get_config
from rodrunner.workflows.metadata import update_run_metadata

# Load the configuration
config = get_config()

# Run the metadata update workflow
result = update_run_metadata(
    config=config,
    irods_path="/tempZone/home/rods/sequencer/miseq/220101_M00001_0001_000000000-A1B2C",
    sequencer_type="miseq"
)

Parameters: - config: Application configuration - irods_path: Path to the sequencer run in iRODS - sequencer_type: Type of sequencer (miseq, nextseq, novaseq, etc.)

Returns: - Dictionary with the result of the metadata update

Advanced Usage

Scheduling Workflows

You can schedule workflows to run at specific intervals using Prefect:

from prefect import flow
from prefect.deployments import Deployment
from prefect.server.schemas.schedules import CronSchedule

from rodrunner.config import get_config
from rodrunner.workflows.ingest import ingest_all_sequencer_runs

# Create a deployment
deployment = Deployment.build_from_flow(
    flow=ingest_all_sequencer_runs,
    name="scheduled-ingest",
    schedule=CronSchedule(cron="0 0 * * *"),  # Run daily at midnight
    parameters={
        "config": get_config(),
        "root_dir": "/path/to/sequencer"
    }
)

# Apply the deployment
deployment.apply()

Customizing Workflows

You can customize the workflows by creating your own flows that call the existing tasks:

from prefect import flow, task
from rodrunner.config import get_config
from rodrunner.tasks.filesystem import find_sequencer_runs_task
from rodrunner.tasks.parsing import parse_sequencer_run
from rodrunner.irods.client import iRODSClient

@task
def process_run(run_dir, config):
    # Parse the run metadata
    metadata = parse_sequencer_run(run_dir)

    # Create an iRODS client
    irods_client = iRODSClient(config.irods)

    # Upload the run to iRODS
    irods_path = f"/tempZone/home/rods/sequencer/{os.path.basename(run_dir)}"
    irods_client.upload_directory(run_dir, irods_path, metadata=metadata)

    return {
        "run_dir": run_dir,
        "irods_path": irods_path,
        "success": True
    }

@flow
def custom_ingest_workflow(config, sequencer_type, root_dir):
    # Find sequencer runs
    run_dirs = find_sequencer_runs_task(
        root_dir=root_dir,
        sequencer_type=sequencer_type
    )

    # Process each run
    results = []
    for run_dir in run_dirs:
        result = process_run(run_dir, config)
        results.append(result)

    return results

Error Handling and Notifications

You can add error handling and notifications to your workflows:

from prefect import flow, task
from prefect.tasks import task_input_hash
from datetime import timedelta
import smtplib
from email.message import EmailMessage

@task(cache_key_fn=task_input_hash, cache_expiration=timedelta(hours=1))
def send_notification(subject, message, to_email):
    msg = EmailMessage()
    msg.set_content(message)
    msg['Subject'] = subject
    msg['From'] = 'noreply@example.com'
    msg['To'] = to_email

    with smtplib.SMTP('smtp.example.com', 587) as server:
        server.starttls()
        server.login('username', 'password')
        server.send_message(msg)

    return True

@flow
def ingest_with_notifications(config, sequencer_type, root_dir, notify_email):
    try:
        # Run the ingest workflow
        results = ingest_sequencer_runs(
            config=config,
            sequencer_type=sequencer_type,
            root_dir=root_dir
        )

        # Count successes and failures
        successes = sum(1 for r in results if r.get('success', False))
        failures = len(results) - successes

        # Send notification
        subject = f"Ingest Workflow Completed: {successes} succeeded, {failures} failed"
        message = f"Ingest workflow for {sequencer_type} completed.\n\n"
        message += f"Successes: {successes}\n"
        message += f"Failures: {failures}\n\n"

        if failures > 0:
            message += "Failed runs:\n"
            for result in results:
                if not result.get('success', False):
                    message += f"- {result.get('run_dir', 'Unknown')}: {result.get('error', 'Unknown error')}\n"

        send_notification(subject, message, notify_email)

        return results

    except Exception as e:
        # Send error notification
        subject = f"Ingest Workflow Failed: {str(e)}"
        message = f"Ingest workflow for {sequencer_type} failed with error:\n\n{str(e)}"
        send_notification(subject, message, notify_email)

        raise

Examples

Complete Ingest and Metadata Update Workflow

from rodrunner.config import get_config
from rodrunner.workflows.ingest import ingest_sequencer_runs
from rodrunner.workflows.metadata import update_run_metadata
from prefect import flow

@flow
def complete_ingest_workflow(sequencer_type, root_dir):
    # Load the configuration
    config = get_config()

    # Run the ingest workflow
    ingest_results = ingest_sequencer_runs(
        config=config,
        sequencer_type=sequencer_type,
        root_dir=root_dir
    )

    # Update metadata for each successful ingest
    metadata_results = []
    for result in ingest_results:
        if result.get('success', False) and 'irods_path' in result:
            metadata_result = update_run_metadata(
                config=config,
                irods_path=result['irods_path'],
                sequencer_type=sequencer_type
            )
            metadata_results.append(metadata_result)

    return {
        "ingest_results": ingest_results,
        "metadata_results": metadata_results
    }

# Run the workflow
results = complete_ingest_workflow("miseq", "/path/to/sequencer/miseq")
print(results)

Parallel Processing of Multiple Sequencer Types

from rodrunner.config import get_config
from rodrunner.workflows.ingest import ingest_sequencer_runs
from prefect import flow, task
from concurrent.futures import ThreadPoolExecutor

@task
def ingest_sequencer_type(config, sequencer_type, root_dir):
    return {
        "sequencer_type": sequencer_type,
        "results": ingest_sequencer_runs(
            config=config,
            sequencer_type=sequencer_type,
            root_dir=f"{root_dir}/{sequencer_type}"
        )
    }

@flow
def parallel_ingest_workflow(root_dir, sequencer_types):
    # Load the configuration
    config = get_config()

    # Process each sequencer type in parallel
    with ThreadPoolExecutor(max_workers=len(sequencer_types)) as executor:
        futures = []
        for sequencer_type in sequencer_types:
            future = executor.submit(
                ingest_sequencer_type.fn,
                config=config,
                sequencer_type=sequencer_type,
                root_dir=root_dir
            )
            futures.append(future)

        # Collect results
        results = {}
        for future in futures:
            result = future.result()
            results[result["sequencer_type"]] = result["results"]

    return results

# Run the workflow
results = parallel_ingest_workflow(
    root_dir="/path/to/sequencer",
    sequencer_types=["miseq", "novaseq", "pacbio"]
)
print(results)

API Reference

For detailed API documentation, see the Workflows API Reference.