Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 11 additions & 11 deletions ecoli/processes/transcript_elongation.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,27 +62,28 @@ class TranscriptElongation(PartitionedProcess):

defaults:
- rnaPolymeraseElongationRateDict (dict): Dictionary of elongation rate
set points for different media environments.
set points for different media environments.
- rnaIds (array[str]) : array of names for each TU
- rnaLengths (array[int]) : array of lengths for each TU
(in nucleotides?)
(in nucleotides?)
- rnaSequences (2D array[int]) : Array with the nucleotide sequences
of each TU. This is in the form of a 2D array where each row is a
TU, and each column is a position in the TU's sequence. Nucleotides
are stored as an index {0, 1, 2, 3}, and the row is padded with
-1's on the right to indicate where the sequence ends.
of each TU. This is in the form of a 2D array where each row is a
TU, and each column is a position in the TU's sequence. Nucleotides
are stored as an index {0, 1, 2, 3}, and the row is padded with
-1's on the right to indicate where the sequence ends.
- ntWeights (array[float]): Array of nucleotide weights
- endWeight (array[float]): ???,
- endWeight (array[float]): Additional mass added when an RNA
transcript is completed (termination/end-group mass adjustment),
- replichore_lengths (array[int]): lengths of replichores
(in nucleotides?),
(in nucleotides?),
- is_mRNA (array[bool]): Mask for mRNAs
- ppi (str): ID of PPI
- inactive_RNAP (str): ID of inactive RNAP
- ntp_ids (list[str]): IDs of NTPs (A, C, G, U)
- variable_elongation (bool): Whether to use variable elongation.
False by default.
False by default.
- make_elongation_rates: Function to make elongation rates, of the
form: lambda random, rates, timestep, variable: rates
form: lambda random, rates, timestep, variable: rates
"""

name = NAME
Expand Down Expand Up @@ -151,7 +152,6 @@ def __init__(self, parameters=None):
self.make_elongation_rates = self.parameters["make_elongation_rates"]

self.polymerized_ntps = self.parameters["polymerized_ntps"]
self.charged_trna_names = self.parameters["charged_trnas"]

# Attenuation
self.trna_attenuation = self.parameters["trna_attenuation"]
Expand Down
21 changes: 18 additions & 3 deletions runscripts/analysis.py
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,16 @@ def build_query_strings(
var_id = row[var_id_idx]
id_to_variants[data_id].add((exp_id, var_id))

# Extract lower-level filter conditions (columns not in id_cols) so
# they are still applied in the per-subset SQL passed to analysis scripts.
lower_level_parts = []
if duckdb_filter:
for condition in duckdb_filter.split(" AND "):
col_name = condition.strip().split(" ")[0]
if col_name not in id_cols:
lower_level_parts.append(condition.strip())
lower_level_filter = " AND ".join(lower_level_parts)

for data_id, variant_set in id_to_variants.items():
data_filters = []
curr_outdir = os.path.abspath(outdir)
Expand All @@ -350,10 +360,15 @@ def build_query_strings(
data_filters.append(f"{col}={col_val}")
os.makedirs(curr_outdir, exist_ok=True)
data_filters = " AND ".join(data_filters)
full_filter = (
f"{data_filters} AND {lower_level_filter}"
if lower_level_filter
else data_filters
)
query_strings[data_filters] = (
f"SELECT * FROM ({history_sql}) WHERE {data_filters}",
f"SELECT * FROM ({config_sql}) WHERE {data_filters}",
f"SELECT * FROM ({success_sql}) WHERE {data_filters}",
f"SELECT * FROM ({history_sql}) WHERE {full_filter}",
f"SELECT * FROM ({config_sql}) WHERE {full_filter}",
f"SELECT * FROM ({success_sql}) WHERE {full_filter}",
curr_outdir,
variant_set,
)
Expand Down
4 changes: 3 additions & 1 deletion runscripts/container/Singularity
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@ From: ghcr.io/astral-sh/uv@sha256:2702577da32d9c3d55c8073e07d4476cd7402c085efbf7
tar -xf /repo.tar -C /vEcoli
rm /repo.tar
fi
apt-get update && apt-get install -y gcc procps nano curl g++
# No need for ca-certificates and unzip since not installing AWS CLI
apt-get update && apt-get install -y gcc procps nano curl g++ \
&& apt-get clean && rm -rf /var/lib/apt/lists/*
cd /vEcoli
# Source shared uv environment variables
. /vEcoli/singularity-env.sh
Expand Down
157 changes: 44 additions & 113 deletions runscripts/nextflow/config.template
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ params {
proc.waitFor()
return proc.text.trim()
} catch (Exception e) {
return ''
System.err.println("WARNING: Failed to read gcloud compute/region: ${e.message}")
return 'GCLOUD_CONFIG_ERROR'
}
}()
google_project = {
Expand All @@ -35,7 +36,8 @@ params {
proc.waitFor()
return proc.text.trim()
} catch (Exception e) {
return ''
System.err.println("WARNING: Failed to read gcloud project: ${e.message}")
return 'GCLOUD_CONFIG_ERROR'
}
}()
}
Expand All @@ -47,46 +49,43 @@ trace.sep = ','
trace.fields = 'name,native_id,status,submit,start,complete,duration,realtime,exit,%cpu,%mem,rss,peak_rss,error_action,attempt,cpu_model,workdir'
trace.file = "trace--${params.experimentId}--" + new java.text.SimpleDateFormat("yyyy-MM-dd--HH-mm-ss").format(new Date()) + ".csv"

def scaledMemory = { task, baseMem ->
    // Memory request for a task attempt: baseMem (in GB) multiplied by a
    // retry factor. If the last exit status was 137 (OOM), the factor is the
    // attempt number; otherwise memory was not the issue last time, so the
    // request is not scaled any further.
    //
    // Using task.attempt - 1 in the non-OOM case is inexact when a process
    // fails more than once for different reasons. For example, a process can
    // fail due to OOM (137), fail its first retry due to pre-emption (50001),
    // then fail its second retry due to OOM (137). Its next attempt is then
    // given as much memory as if it had failed from OOM three times instead
    // of twice. Overallocating is considered better than underallocating
    // here. Users who see retries caused by resource underallocation should
    // bump the base resource requests in their config files.
    def attemptFactor = (task.exitStatus == 137) ? task.attempt : Math.max(1, task.attempt - 1)
    1.GB * baseMem * attemptFactor
}

def scaleTimeForRetry = { task, baseTime ->
    // Time limit for a task attempt: baseTime (in hours) multiplied by a
    // retry factor. If the last exit status was 140, the factor is the
    // attempt number; otherwise time was not the issue last time, so the
    // limit is not scaled any further.
    def attemptFactor = (task.exitStatus == 140) ? task.attempt : Math.max(1, task.attempt - 1)
    1.h * baseTime * attemptFactor
}

process {
withLabel: parca {
cpus = params.parca_cpus
memory = {
if ( task.exitStatus == 137 ) {
1.GB * params.parca_mem * task.attempt
} else {
// Do not scale further if memory was not the issue last time
// Using task.attempt - 1 is inexact if a process experiences
// multiple failures. For example, a process can fail due to OOM
// (137), fail its first retry due to pre-emption (50001), then
// fail its second retry due to OOM (137). On its third attempt, it
// will get memory equivalent to if it had failed from OOM three
// times instead of twice. I feel it is better to overallocate than
// underallocate. Additionally, if users are seeing retries due to
// resource underallocation, they should just bump the base resource
// requests in their config files.
1.GB * params.parca_mem * Math.max(1, task.attempt - 1)
}
}
memory = { scaledMemory(task, params.parca_mem) }
}
withLabel: analysis {
cpus = params.analysis_cpus
memory = {
if ( task.exitStatus == 137 ) {
1.GB * params.analysis_mem * task.attempt
} else {
// Do not scale further if mem was not issue last time
1.GB * params.analysis_mem * Math.max(1, task.attempt - 1)
}
}
}
memory = {
if ( task.exitStatus == 137 ) {
1.GB * params.sim_mem * task.attempt
} else {
// Do not scale further if mem was not issue last time
1.GB * params.sim_mem * Math.max(1, task.attempt - 1)
}
memory = { scaledMemory(task, params.analysis_mem) }
}
memory = { scaledMemory(task, params.sim_mem) }
maxRetries = 3
cpus = params.sim_cpus
}
Expand Down Expand Up @@ -156,54 +155,20 @@ profiles {
// required to make time directives only apply to SLURM profiles
withLabel: parca {
cpus = params.parca_cpus
memory = {
if ( task.exitStatus == 137 ) {
1.GB * params.parca_mem * task.attempt
} else {
1.GB * params.parca_mem * Math.max(1, task.attempt - 1)
}
}
time = {
if ( task.exitStatus == 140 ) {
1.h * params.parca_time * task.attempt
} else {
// Do not scale further if time was not issue last time
1.h * params.parca_time * Math.max(1, task.attempt - 1)
}
}
memory = { scaledMemory(task, params.parca_mem) }
time = { scaleTimeForRetry(task, params.parca_time) }
}
withLabel: analysis {
cpus = params.analysis_cpus
memory = {
if ( task.exitStatus == 137 ) {
1.GB * params.analysis_mem * task.attempt
} else {
// Do not scale further if mem was not issue last time
1.GB * params.analysis_mem * Math.max(1, task.attempt - 1)
}
}
time = {
if ( task.exitStatus == 140 ) {
1.h * params.analysis_time * task.attempt
} else {
// Do not scale further if time was not issue last time
1.h * params.analysis_time * Math.max(1, task.attempt - 1)
}
}
memory = { scaledMemory(task, params.analysis_mem) }
time = { scaleTimeForRetry(task, params.analysis_time) }
}
container = params.container_image
containerOptions = "-B ${params.publishDir}:${params.publishDir} -B ${launchDir}:${launchDir} ${params.slurm_env}"
queue = params.queue
clusterOptions = params.cluster_options
executor = 'slurm'
time = {
if ( task.exitStatus == 140 ) {
1.h * params.sim_time * task.attempt
} else {
// Do not scale further if time was not issue last time
1.h * params.sim_time * Math.max(1, task.attempt - 1)
}
}
time = { scaleTimeForRetry(task, params.sim_time) }
errorStrategy = {
// Codes: 137 (OOM), 140 (SLURM job limits), 143 (SLURM preemption)
// Default value for exitStatus is max integer value, this
Expand Down Expand Up @@ -236,24 +201,11 @@ profiles {
withLabel: parca {
// Nextflow does not merge withLabel directives so must repeat
cpus = params.parca_cpus
memory = {
if ( task.exitStatus == 137 ) {
1.GB * params.parca_mem * task.attempt
} else {
1.GB * params.parca_mem * Math.max(1, task.attempt - 1)
}
}
memory = { scaledMemory(task, params.parca_mem) }
executor = 'slurm'
queue = params.queue
clusterOptions = params.cluster_options
time = {
if ( task.exitStatus == 140 ) {
1.h * params.parca_time * task.attempt
} else {
// Do not scale further if time was not issue last time
1.h * params.parca_time * Math.max(1, task.attempt - 1)
}
}
time = { scaleTimeForRetry(task, params.parca_time) }
}
// Creating variants happens before the HyperQueue workers
// are allocated and requires a separate SLURM job
Expand All @@ -266,22 +218,8 @@ profiles {
// stalling sims and allow custom resource requests
withLabel: analysis {
cpus = params.analysis_cpus
memory = {
if ( task.exitStatus == 137 ) {
1.GB * params.analysis_mem * task.attempt
} else {
// Do not scale further if mem was not issue last time
1.GB * params.analysis_mem * Math.max(1, task.attempt - 1)
}
}
time = {
if ( task.exitStatus == 140 ) {
1.h * params.analysis_time * task.attempt
} else {
// Do not scale further if time was not issue last time
1.h * params.analysis_time * Math.max(1, task.attempt - 1)
}
}
memory = { scaledMemory(task, params.analysis_mem) }
time = { scaleTimeForRetry(task, params.analysis_time) }
executor = 'slurm'
queue = params.queue
clusterOptions = params.cluster_options
Expand All @@ -291,14 +229,7 @@ profiles {
containerOptions = "-B ${params.publishDir}:${params.publishDir} -B ${launchDir}:${launchDir} ${params.slurm_env}"
// Use Nextflow's retry logic instead of HyperQueue's built-in logic
clusterOptions = '--crash-limit=1'
time = {
if ( task.exitStatus == 140 ) {
1.h * params.sim_time * task.attempt
} else {
// Do not scale further if time was not issue last time
1.h * params.sim_time * Math.max(1, task.attempt - 1)
}
}
time = { scaleTimeForRetry(task, params.sim_time) }
errorStrategy = {
((task.exitStatus in [137, 140, 143, Integer.MAX_VALUE])
&& (task.attempt <= task.maxRetries)) ? 'retry' : 'ignore'
Expand Down
Loading
Loading