Richtige Methode zur Strukturierung eines Kontrollpunkts in SnakemakePython

Python-Programme
Anonymous
 Richtige Methode zur Strukturierung eines Kontrollpunkts in Snakemake

Post by Anonymous »

Mein Snakemake -Workflow braucht ewig, um die DAG zu bewerten. Ich stelle fest, dass dies darauf zurückzuführen ist, dass er den Kontrollpunkt während des Erstellens ausführen muss. Wenn Sie den Kontrollpunkt als Eingabe für eine Regel haben (obwohl ich denke, nicht die All -Regel). Workflow ist hier. Mein Verständnis ist, dass ich die Ausgabe dieser Regel in allen festlegen muss, um den Checkpoint in einer Regel auszulösen. Mein Ausgang ist jedoch ein dynamischer Satz von Dateien (somit die Notwendigkeit des Kontrollpunkts). Ich könnte einen Ordner angeben, aber wenn ich das tue, wird Snakemake nicht wissen, wie man einen unterbrochenen Workflow wieder aufnimmt, wenn der Ordner teilweise voll ist (da er seine Inhalte nicht bewertet). < /P>
Ich möchte auch sicherstellen, dass die Regeln so weit wie möglich zusammengefasst werden, damit Snakemake so viel wie möglich parallelisiert. < /P>
Ich schätze hier alle Tipps. Danke. < /P>

Code: Select all

###############################################################################
# SNAKEMAKE FILE
###############################################################################

# CONFIG AND GLOBALS
################################################################################

configfile: "config.yaml"
# config.yaml is expected to have the keys:

import os
import csv
import glob
from itertools import combinations
from snakemake.shell import shell
from pathlib import Path

# Ensure all required keys exist in config
required_keys = ["genome_dir", "barcode_map", "filter_threshold", "threads", "coverage", "output_dir", "bam_dir"]
missing_keys = [key for key in required_keys if key not in config]
if missing_keys:
raise KeyError(f"Missing required keys in config.yaml: {missing_keys}")

# Parse the barcode map for sample -> treatment
barcodes = []
treatments = []
with open(config["barcode_map"], "r") as f:
reader = csv.reader(f, delimiter="\t")
header = next(reader, None)  # skip header if present
for row in reader:
if len(row) >= 2:
barcodes.append(row[0])
treatments.append(row[1])

SAMPLES = barcodes
TREATMENTS_BY_SAMPLE = dict(zip(barcodes, treatments))
ALL_TREATMENTS = sorted(list(set(treatments)))

# Gather all .fna references
GENOME_FILES = sorted(glob.glob(os.path.join(config["genome_dir"], "*.fna")))
GENOMES = [Path(f).stem for f in GENOME_FILES]

# Make them paths
config["genome_dir"] = Path(config["genome_dir"])
config["output_dir"] = Path(config["output_dir"])
config["bam_dir"] = Path(config["bam_dir"])

################################################################################
# HELPER FUNCTIONS
################################################################################

def is_metagenome(genome_name):
"""Decide if a genome is a 'metagenome' by name."""
return "metagenome"  in genome_name.lower()

def get_genome_fasta(genome_name):
"""Return the full path of the .fna for a given genome name."""
return config["genome_dir"] / f"{genome_name}.fna"

def get_outdir(genome_name):
"""Return the output directory for a given genome."""
return config["output_dir"] / genome_name

def get_bam(sample, genome_name):
"""Return path to the BAM for a sample/genome."""
return config["bam_dir"] / genome_name / f"{sample}.bam"

def get_bed(sample, genome_name):
"""Return path to the raw (uncompressed) BED output of pileup."""
return get_outdir(genome_name) / f"{sample}.bed"

def get_bed_gz(sample, genome_name):
"""Return path to the compressed bed file."""
return get_bed(sample, genome_name).with_suffix(".bed.gz")

def get_summary(sample, genome_name):
"""Return path to the summary TSV for a given sample/genome."""
return get_outdir(genome_name) / f"summary_{sample}.tsv"

def get_sample_probs_dir(sample, genome_name):
"""Return path to the sample-probs directory."""
return get_outdir(genome_name) / f"sample_probs_{sample}"

def get_merged_bed(genome_name):
"""Return path to the uncompressed merged bed of all samples for a genome."""
return get_outdir(genome_name) / "all_samples.bed"

def get_merged_bed_gz(genome_name):
"""Return path to the compressed merged bed of all samples for a genome."""
return get_merged_bed(genome_name).with_suffix(".bed.gz")

def get_genome_sizes(genome_name):
"""Chrom sizes file for merging."""
return get_outdir(genome_name) / "sizes.tsv"

def get_motif_tsv(genome_name, contig=None):
"""Motif TSV output from modkit find-motifs for a single genome."""
if contig:
return get_motifs_dir(genome_name) / f"{config['coverage']}_{contig}_motifs.tsv"
elif not is_metagenome(genome_name):
return get_motifs_dir(genome_name) / f"{config['coverage']}_motifs.tsv"
else:
assert False, "Cannot get motif TSV for metagenome without contig"

def get_motifs_dir(genome_name):
"""Return the directory for all motifs in a single genome pipeline."""
return get_outdir(genome_name) / "motifs"

def get_motif_dir(genome_name, motif, contig=None):
"""Directory for a specific motif in a single genome pipeline."""
if contig:
return get_outdir(genome_name) / contig / motif
else:
return get_motifs_dir(genome_name) / motif

def get_location_bed(genome_name, motif, contig=None):
"""Return path to the motif location bed (contig or full)."""
return get_motif_dir(genome_name, motif, contig) / "location.bed"

def get_motif_bed(sample, genome_name, motif, contig=None):
"""Return path to the motif-based pileup bed for a sample."""
return get_motif_dir(genome_name, motif, contig) / f"{sample}.bed"

def get_dmr_dir(genome_name, motif, contig=None):
"""Return directory for DMR output."""
return get_motif_dir(genome_name, motif, contig) / "dmr"

def get_dmr_file(genome_name, motif, treatmentA, treatmentB, contig=None):
"""Return DMR bed file for a given pair of treatments."""
return get_dmr_dir(genome_name, motif,  contig) / f"{treatmentA}_vs_{treatmentB}.bed"

def get_fai(genome_name):
"""Return .fai index path for a given genome."""
return get_genome_fasta(genome_name).with_suffix(".fna.fai")

def get_contigs(genome_name):
"""Return a list of contigs for a genome."""
fai_path = get_fai(genome_name)
contigs = []
with open(fai_path) as f:
for line in f:
fields = line.strip().split("\t")
if fields:
contigs.append(fields[0])
return contigs

def get_motif_paths(genome_name):
"""Return all motif files for a given genome."""
if is_metagenome(genome_name):
return [get_motif_tsv(genome_name, contig=contig) for contig in get_contigs(genome_name)]
else:
return [get_motif_tsv(genome_name)]

def unpack_motifs(genome_name, contig=None):
"""
Given the motif TSV from find_motifs, parse the lines to discover (mod_code, motif, modpos).
Return them for expansion.
"""
# Trigger the checkpoint
if contig:
chk = checkpoints.find_motifs_contig.get(genome_name=genome_name, contig=contig)
else:
chk = checkpoints.find_motifs_single.get(genome_name=genome_name)

tsv = get_motif_tsv(genome_name, contig=contig)

motifs = []
with open(tsv) as f:
reader = csv.reader(f, delimiter="\t")
next(reader, None)  # skip header
for row in reader:
if len(row) >= 3:
mod_code, motif, modpos = row[0], row[1], row[2]
motifs.append((mod_code, motif, modpos))

return motifs

def get_location_beds(genome_name):
"""Returns paths of all motif location beds associated with this genome"""
res = []
if is_metagenome(genome_name):
for contig in get_contigs(genome_name):
for motif in unpack_motifs(genome_name, contig=contig):
res.append(get_location_bed(genome_name, motif[1], contig=contig))

else:
for motif in unpack_motifs(genome_name):
res.append(get_location_bed(genome_name, motif[1], contig=None))

return res

def get_motif_pileups(genome_name):
"""Returns paths of all motif pileups associated with this genome"""
res = []
if is_metagenome(genome_name):
for contig in get_contigs(genome_name):
for motif in unpack_motifs(genome_name, contig=contig):
for sample in SAMPLES:
res.append(get_motif_bed(sample, genome_name, motif[1], contig=contig))

else:
for motif in unpack_motifs(genome_name):
for sample in SAMPLES:
res.append(get_motif_bed(sample, genome_name, motif[1], contig=None))

return res

def get_dmr_paths(genome_name):
"""
For each pair of treatments, produce the DMR file name as a final target.
"""
# All pairwise combos of treatments
dmr_files = []
for (A, B) in combinations(ALL_TREATMENTS, 2):
if is_metagenome(genome_name):
for contig in get_contigs(genome_name):
for motif in unpack_motifs(genome_name, contig=contig):
dmr_files.append(get_dmr_file(genome_name, motif[1], A, B, contig=contig))
else:
for motif in unpack_motifs(genome_name):
dmr_files.append(get_dmr_file(genome_name, motif[1], A, B, contig=None))

return dmr_files

################################################################################
# GENOME INDEX
################################################################################

rule index_genome:
"""
Create FASTA index if it doesn't exist.
"""
input:
fasta = lambda wc: get_genome_fasta(wc.genome_name)
output:
config["genome_dir"] / "{genome_name}.fna.fai"
threads:  config["threads"]
shell:
"samtools faidx {input.fasta}"

################################################################################
# RULES FOR PILEUP, SUMMARY, SAMPLE-PROBS, COMPRESS
################################################################################

rule modkit_pileup:
"""
Generate the BED file from each sample's BAM using modkit pileup.
"""
input:
bam = lambda wc: get_bam(wc.sample, wc.genome_name),
fai = lambda wc: get_fai(wc.genome_name)
output:
bed = config["output_dir"] / "{genome_name}" / "{sample}.bed"
threads: config["threads"]
params:
filter_threshold = config["filter_threshold"]
shell:
...rule code....

rule modkit_summary:
"""
Generate a summary TSV for each sample.
"""
input:
bam = lambda wc: get_bam(wc.sample, wc.genome_name),
fai = lambda wc: get_fai(wc.genome_name)
output:
tsv = config["output_dir"] / "{genome_name}" / "summary_{sample}.tsv"
threads: config["threads"]
params:
filter_threshold = config["filter_threshold"]
shell:
...rule code....

rule modkit_sample_probs:
"""
Generate sample-probs directory for each sample.
"""
input:
bam = lambda wc: get_bam(wc.sample, wc.genome_name),
fai = lambda wc: get_fai(wc.genome_name)
output:
directory = directory(config["output_dir"] / "{genome_name}" / "sample_probs_{sample}")
threads: config["threads"]
shell:
...rule code....

rule compress_and_index_bed:
"""
Compress (.gz) and tabix-index the raw bed files.
"""
input:
bed = lambda wc: get_bed(wc.sample, wc.genome_name)
output:
gz = config["output_dir"] / "{genome_name}" / "{sample}.bed.gz",
tbi = config["output_dir"] / "{genome_name}" / "{sample}.bed.gz.tbi"
threads: config["threads"]
shell:
...rule code....

################################################################################
# MERGE BED FILES
################################################################################

rule compute_genome_sizes:
"""
Compute chromosome sizes from the FASTA for merging coverage.
"""
input:
fasta = lambda wc: get_genome_fasta(wc.genome_name),
fai = lambda wc: get_fai(wc.genome_name)
output:
sizes = config["output_dir"] / "{genome_name}" / "sizes.tsv"
run:
...rule code....

rule merge_beds:
"""
Merge all sample bed.gz files into a single all_samples.bed for each genome.
"""
input:
sizes = lambda wc: get_genome_sizes(wc.genome_name),
bedgz = lambda wc: [get_bed_gz(s, wc.genome_name) for s in SAMPLES],
output:
merged = config["output_dir"] / "{genome_name}" / "all_samples.bed",
merged_gz = config["output_dir"] / "{genome_name}" / "all_samples.bed.gz",
merged_tbi = config["output_dir"] / "{genome_name}" / "all_samples.bed.gz.tbi"
threads: config["threads"]
shell:
...rule code....

################################################################################
# FIND MOTIFS (CHECKPOINT)
################################################################################

checkpoint find_motifs_contig:
input:
merged_gz = lambda wc: get_merged_bed_gz(wc.genome_name),
fasta = lambda wc: get_genome_fasta(wc.genome_name),
output:
out_file = config["output_dir"] / "{genome_name}" / "motifs"  / f"{config['coverage']}_{{contig}}_motifs.tsv"
threads: config["threads"]
params:
coverage = config["coverage"]
run:
...rule code....

checkpoint find_motifs_single:
input:
merged_gz = lambda wc: get_merged_bed_gz(wc.genome_name),
fasta = lambda wc: get_genome_fasta(wc.genome_name),
output:
out_file = config["output_dir"] / "{genome_name}" / "motifs" / f"{config['coverage']}_motifs.tsv"
threads: config["threads"]
params:
coverage = config["coverage"]
run:
...rule code....

################################################################################
# CREATE MOTIF LOCATION.BED
################################################################################

rule create_motif_location_bed_single:
"""
Create location.bed for a given motif (standard genomes).
"""
input:
fasta = lambda wc: get_genome_fasta(wc.genome_name),
output:
bed = config["output_dir"] / "{genome_name}" / "{motif}" / "location.bed"
threads: config["threads"]
params:
motif  = lambda wc: wc.motif,
modpos = lambda wc: get_motif_mod_pos(wc.genome_name, params.motif)
run:
...rule code....

rule create_motif_location_bed_contig:
"""
Create location.bed for a given motif in a metagenome (for a specific contig).
"""
input:
fasta = lambda wc: get_genome_fasta(wc.genome_name),
output:
bed = config["output_dir"] / "{genome_name}" / "{contig}" / "{motif}" / "location.bed"
threads: config["threads"]
params:
motif  = lambda wc: wc.motif,
modpos = lambda wc: get_motif_mod_pos(wc.genome_name, params.motif, contig=wc.contig)
run:
...rule code....

################################################################################
# MOTIF PILEUP PER SAMPLE
################################################################################

rule motif_pileup_single:
"""
For each motif, run modkit pileup with --motif for each sample,
optionally restricted to a specific contig if this is a metagenome.
"""
input:
bam = lambda wc: get_bam(wc.sample, wc.genome_name),
fasta = lambda wc: get_genome_fasta(wc.genome_name),
loc = lambda wc: get_location_bed(wc.genome_name, wc.motif, contig=None)
output:
bed = config["output_dir"] / "{genome_name}" / "{motif}" / "{sample}.bed"
threads: config["threads"]
params:
motif = lambda wc: wc.motif,
modpos = lambda wc: get_motif_mod_pos(wc.genome_name, params.motif, contig=None),
filter_threshold = config["filter_threshold"]
shell:
...rule code....

rule motif_pileup_contig:
"""
For each motif, run modkit pileup with --motif for each sample,
optionally restricted to a specific contig if this is a metagenome.
"""
input:
bam = lambda wc: get_bam(wc.sample, wc.genome_name),
fasta = lambda wc: get_genome_fasta(wc.genome_name),
loc = lambda wc: get_location_bed(wc.genome_name, wc.motif, contig=wc.contig)
output:
bed = config["output_dir"] / "{genome_name}" / "{contig}" / "{motif}" / "{sample}.bed"
threads: config["threads"]
params:
motif = lambda wc: wc.motif,
modpos = lambda wc: get_motif_mod_pos(wc.genome_name, params.motif, contig=wc.contig),
filter_threshold = config["filter_threshold"]
shell:
...rule code....

################################################################################
# DMR COMPARISONS
################################################################################

rule dmr_pair_single:
"""
Compare two treatments for DMR.  Then filter results with bedtools intersect.
"""
input:
fasta = lambda wc: get_genome_fasta(wc.genome_name),
loc = lambda wc: get_location_bed(wc.genome_name, wc.motif, contig=None)
# We gather sample bed gz from the top-level pipeline.
# But for the modkit dmr pair, we want the compressed all-sample bed from each treatment.
# Actually, we do pairwise combining of each sample bed in each treatment.
# We can gather them on the fly or inputfunction.
output:
dmr = config["output_dir"] / "{genome_name}" / "{motif}" / "dmr" / "{treatmentA}_vs_{treatmentB}.bed"
threads: config["threads"]
params:
coverage = config["coverage"]
run:
...rule code....

rule dmr_pair_contig:
"""
Compare two treatments for DMR. Then filter results with bedtools intersect.
"""
input:
fasta = lambda wc: get_genome_fasta(wc.genome_name),
loc = lambda wc: get_location_bed(wc.genome_name, wc.motif, contig=wc.contig)
output:
dmr = config["output_dir"] / "{genome_name}" / "{contig}" / "{motif}" / "dmr" / "{treatmentA}_vs_{treatmentB}.bed"
threads: config["threads"]
params:
coverage = config["coverage"]
run:
...rule code....

################################################################################
# RULE: ALL
################################################################################

rule all:
"""
Final target: we want everything done for all genomes, all motifs, all pairs, etc.
Using Snakemake's checkpoint and dynamic expansions, our final 'all' rule will be
the presence of all possible DMR files. But we'll rely on the checkpoint logic
to discover motifs.
"""
input:
[get_fai(g) for g in GENOMES],
[get_summary(s, g) for s in SAMPLES for g in GENOMES],
[get_sample_probs_dir(s, g) for s in SAMPLES for g in GENOMES],
[get_merged_bed_gz(g) for g in GENOMES],

# These could be collapsed to just get_dmr_paths, but provising the rest is more efficient
[path for genome in GENOMES for path in get_motif_paths(genome)],
# lambda wc: [get_location_beds(g) for g in GENOMES],
# lambda wc: [get_motif_pileups(g) for g in GENOMES],
lambda wc: [get_dmr_paths(g) for g in GENOMES]
default_target: True

ruleorder: index_genome > modkit_pileup > modkit_summary > modkit_sample_probs > compress_and_index_bed > compute_genome_sizes > merge_beds > find_motifs_single > create_motif_location_bed_single  > motif_pileup_single > dmr_pair_single > find_motifs_contig > create_motif_location_bed_contig > motif_pileup_contig > dmr_pair_contig

Quick Reply

Change Text Case: 
   
  • Similar Topics
    Replies
    Views
    Last post