Improve numeric arg handling and add dynamic worker spawning #56
Conversation
apply _forkrun_getVal to all user-provided numeric options so that any user-specified number can use a SI/IEC prefix
Fix _forkrun_getVal usage
Update forkrun.bash
make bash completions faster
Completely reworked the logic for how coprocs are dynamically spawned. Other various minor changes as well.
Co-authored-by: sourcery-ai[bot] <58596630+sourcery-ai[bot]@users.noreply.github.com>
Update forkrun.bash
Update forkrun.bash
Reviewer's Guide by Sourcery

This pull request enhances numeric argument handling in forkrun and adds dynamic worker spawning.

State diagram for dynamic worker spawning:

    stateDiagram-v2
        [*] --> Initial
        Initial --> Monitoring: Start with nProcs workers
        Monitoring --> SpawnWorker: System load < threshold AND queue depth < minimum
        SpawnWorker --> Monitoring: Update worker count
        Monitoring --> CheckLoad: Periodic check
        CheckLoad --> SpawnWorker: Load allows more workers
        CheckLoad --> Monitoring: Load too high
        Monitoring --> [*]: Quit signal
        note right of SpawnWorker
            New workers added based on:
            - CPU load
            - Queue depth
            - Current worker count
        end note
Makes a few variables local that had not yet been made local. Fix minor mistakes in documentation. Made pulling the file descriptor byte offset from procfs slightly more efficient.
…of arguments that forkrun will process before returning.
add new `-n` flag into help
Add -n option to limit input lines
Fix option parsing if options have whitespace characters
use `head` to speed up `-n <#>` flag when possible + fix whitespace option parsing
Co-authored-by: sourcery-ai[bot] <58596630+sourcery-ai[bot]@users.noreply.github.com>
Co-authored-by: sourcery-ai[bot] <58596630+sourcery-ai[bot]@users.noreply.github.com>
@sourcery-ai review
Hey @jkool702 - I've reviewed your changes and they look great!
Here's what I looked at during the review
- 🟡 General issues: 3 issues found
- 🟢 Security: all looks good
- 🟢 Testing: all looks good
- 🟡 Complexity: 1 issue found
- 🟢 Documentation: all looks good
Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.
lseek_builtin/lseek.c
    // check for exactly 2 args passed to lseek
    if (argc != 3) {
        fprintf(stderr, "\nIncorrect number of arguments.\nUSAGE: lseek <FD> <REL_OFFSET>\n");
    int whence = SEEK_CUR
issue (bug_risk): Remove duplicate whence variable declaration to prevent undefined behavior
README.md
1. the logic by which coprocs are dynamically spawned has been completely rewritten to improve performance and reliability when handling varying workloads.
2. all numeric commandline arguments now accept standard prefixes (k=1000, ki=1024, M=1000000, etc.)
3. the lseek loadable builtin has been recompiled and now can be used on x86_64 and aarch64 and riscv64 architectures
4. **BREAKING CHANGE**: the `-n` flag (which previously added ordering information to the output and implied `-k`) has been renamed to `-K`. The `-n` flag now implements a new feature - limiting the total number of lines that `forkrun` will process. `... | forkrun -n <#> ...` is basically equivalent to `... | head -n <#> | forkrun ...` (except that, unlike `head`, `forkrun -n <#> -d _` allows for this functionality with delimiters other than NULLs and newlines).
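Changelog item 2 does not show the parsing itself. A minimal sketch of how SI/IEC suffix expansion could work is below; the function name `getval_sketch` is hypothetical, only a few prefixes are shown, and the actual `_forkrun_getVal` implementation may differ.

```bash
# Hypothetical sketch of SI/IEC suffix expansion for numeric flags.
# The real _forkrun_getVal may handle more prefixes and edge cases.
getval_sketch() {
    local v=$1
    case "$v" in
        *[kK]i) echo $(( ${v%??} * 1024 )) ;;          # ki = 1024 (IEC)
        *Mi)    echo $(( ${v%??} * 1024 * 1024 )) ;;   # Mi = 1024^2
        *[kK])  echo $(( ${v%?} * 1000 )) ;;           # k  = 1000 (SI)
        *M)     echo $(( ${v%?} * 1000000 )) ;;        # M  = 10^6
        *)      echo "$v" ;;                           # plain integer
    esac
}

getval_sketch 4ki   # 4096
getval_sketch 2M    # 2000000
```

Two-character suffixes must be matched before one-character ones, otherwise `4ki` would be misread as `4k` followed by a stray `i`.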
question: Clarify the behavior of the -K flag.
The changelog mentions that `-n` was renamed to `-K`. Does `-K` now provide the old functionality of `-n` (adding ordering information and implying `-k`) in addition to its new functionality? Please clarify.
@@ -8,7 +8,7 @@ forkrun() {
 #
 # USAGE: printf '%s\n' "${args[@]}" | forkrun [-flags] [--] parFunc ["${args0[@]}"]
 #
-# LIST OF FLAGS: [-j|-P [-]<#>[,<#>,<#>]] [-t <path>] ( [-l <#>] | [-L <#[,#]>]] ) ( [-b <#>] | [-B <#>[,<#>]] ) [-d <char>] [-u <fd>] [-i] [-I] [-k] [-n] [-z|-0] [-s] [-S] [-p] [-D] [-N] [-U] [-v] [-h|-?]
+# LIST OF FLAGS: [-j|-P [-]<#>[,<#>,<#>]] [-t <path>] ( [-l <#>] | [-L <#[,#]>]] ) [-n <#>] ( [-b <#>] | [-B <#>[,<#>]] ) [-d <char>] [-u <fd>] [-i] [-I] [-k] [-K] [-z|-0] [-s] [-S] [-p] [-D] [-N] [-U] [-v] [-h|-?]
issue (complexity): Consider refactoring the dynamic worker spawning logic to separate worker management into a dedicated module and simplify the scaling algorithm using clearer thresholds.
The dynamic worker spawning implementation has grown overly complex. Consider refactoring to separate concerns:
- Extract worker management into a dedicated module:
    # worker_manager.sh
    manage_workers() {
        local target_workers min_workers max_workers
        local current_load queue_depth

        while true; do
            # Get metrics
            current_load=$(get_system_load)
            queue_depth=$(get_queue_depth)

            # Simple scaling decision
            if (( current_load < load_threshold && queue_depth > min_queue )); then
                add_workers "$target_workers"
            fi

            sleep "$check_interval"
        done
    }
- Simplify the scaling algorithm:
    calculate_target_workers() {
        local load=$1
        local queue=$2

        # Simple linear scaling based on load
        if (( load < 70 )); then
            echo $(( current_workers + 1 ))
        else
            echo "$current_workers"
        fi
    }
This separates monitoring from control logic and uses simpler thresholds instead of complex weighted averages. The functionality remains the same but the code becomes more maintainable.
The problem with this is that spawning more threads doesn't guarantee increasing system load, and in those cases it would always result in spawning the max number even though far fewer were needed. For example:

    { for nn in {1..1000}; do echo ${nn}; sleep 1; done; } | forkrun echo

Clearly this doesn't need many workers, but your simplified logic would spawn the maximum number allowed for it, and all but 1 would just sit idling the whole time.
You're right, @jkool702, my suggested simplification was too naive and wouldn't handle I/O-bound tasks like your example efficiently. Spawning more workers wouldn't increase CPU load in such cases, leading to unnecessary worker creation. Thanks for pointing that out!
I've reconsidered the approach. Instead of solely relying on CPU load, we should incorporate a metric that reflects the actual work being done by the workers. One option is to track the number of active workers. We can then adjust the scaling logic to consider both CPU load and the number of currently active workers.
Here's a revised suggestion for the `calculate_target_workers` function:

    calculate_target_workers() {
        local load=$1
        local queue=$2
        local active_workers=$(get_active_worker_count)
        local max_workers=$(( $(nproc) * 2 ))  # Example: Double the number of cores

        # Scale up if load is low and not all potential workers are active
        if (( load < 70 && active_workers < max_workers )); then
            echo $(( current_workers + 1 ))
        # Scale down if load is high and there are more than the minimum workers
        elif (( load > 80 && current_workers > min_workers )); then
            echo $(( current_workers - 1 ))
        else
            echo "$current_workers"
        fi
    }
This version introduces a few improvements:
- Active worker count: `get_active_worker_count` (which would need to be implemented) provides a measure of how many workers are actively processing tasks. This helps prevent over-spawning in I/O-bound scenarios.
- Max workers: A `max_workers` limit is introduced (calculated based on `nproc` in this example, but could be configurable). This prevents unbounded worker creation.
- Scale-down logic: The function now also includes logic to scale down the number of workers if the load is high and there are more than the minimum required workers.
This approach still maintains a degree of simplicity while addressing the issue you raised. It allows for more controlled scaling based on both system load and actual worker activity. What are your thoughts on this revised approach?
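One self-contained way to sanity-check the revised rule is to stub out the metrics. In the sketch below, `max_workers` is pinned to 8 and every worker is assumed active; these are illustrative values only, not forkrun's real defaults.

```bash
# Stubbed demo of the revised scaling rule. max_workers is fixed at 8
# here purely for the demo; the suggestion above derives it from nproc.
min_workers=1
current_workers=2

calculate_target_workers() {
    local load=$1 queue=$2
    local active_workers=$current_workers   # stub: assume all workers busy
    local max_workers=8                     # stub: fixed cap for the demo
    if (( load < 70 && active_workers < max_workers )); then
        echo $(( current_workers + 1 ))     # scale up
    elif (( load > 80 && current_workers > min_workers )); then
        echo $(( current_workers - 1 ))     # scale down
    else
        echo "$current_workers"             # hold steady
    fi
}

calculate_target_workers 50 0   # low load        -> 3
calculate_target_workers 90 0   # high load       -> 1
calculate_target_workers 75 0   # middling load   -> 2
```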
Co-authored-by: sourcery-ai[bot] <58596630+sourcery-ai[bot]@users.noreply.github.com>
improve unit testing script
@sourcery-ai review
Hey @jkool702 - I've reviewed your changes - here's some feedback:
Overall Comments:
- The dynamic worker spawning logic has been significantly improved to better handle varying workloads by considering both CPU usage and queue depth metrics. This should provide more efficient parallelization.
- Breaking change: The -n flag has been renamed to -K for output ordering info. The new -n flag now limits total lines processed, consistent with Unix conventions like head -n.
Here's what I looked at during the review
- 🟡 General issues: 4 issues found
- 🟢 Security: all looks good
- 🟢 Testing: all looks good
- 🟡 Complexity: 1 issue found
- 🟢 Documentation: all looks good
`forkrun`'s usage and syntax are very similar to `xargs` and `parallel`. However<sup>1</sup>, `forkrun`:

* offers more features than `xargs` and is approximately equal in speed to the fastest `xargs` invocation<sup>2</sup> (`forkrun` without any flags is functionally equivalent to `xargs -P $(nproc) -d $'\n'`),
* is considerably faster than `parallel` (over an order of magnitude faster in some cases)<sup>2</sup>, while still supporting many of the particularly useful "core" `parallel` features,
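The `xargs` half of the claimed equivalence can be exercised directly with plain GNU xargs (no forkrun needed); output is sorted because parallel workers finish in arbitrary order.

```bash
# The README describes forkrun's default behavior as equivalent to
# `xargs -P $(nproc) -d $'\n'`. The xargs side of that looks like:
printf '%s\n' a b c | xargs -P "$(nproc)" -d '\n' -n 1 echo | sort
# With -P, completion order is arbitrary; sort makes the demo deterministic.
```

Note that `-d '\n'` (GNU xargs delimiter syntax) is what makes each input line a single argument, matching forkrun's newline-delimited default.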
issue (typo): Typo: "equivilant" should be "equivalent"
**forkrun v1.5**: major changes include:
1. the logic by which coprocs are dynamically spawned has been completely rewritten to improve performance and reliability when handling varying workloads.
2. all numeric commandline arguments now accept standard prefixes (k=1000, ki=1024, M=1000000, etc.)
3. the lseek loadable builtin has been recompiled and now can be used on x86_64 and aarch64 and riscv64 architectures
suggestion (typo): Grammar: Use commas for better readability
"x86_64 and aarch64 and riscv64" could be improved to "x86_64, aarch64, and riscv64".
Suggested change:
- 3. the lseek loadable builtin has been recompiled and now can be used on x86_64 and aarch64 and riscv64 architectures
+ 3. the lseek loadable builtin has been recompiled and now can be used on x86_64, aarch64, and riscv64 architectures
@@ -8,7 +8,7 @@ forkrun() {
 #
 # USAGE: printf '%s\n' "${args[@]}" | forkrun [-flags] [--] parFunc ["${args0[@]}"]
 #
-# LIST OF FLAGS: [-j|-P [-]<#>[,<#>,<#>]] [-t <path>] ( [-l <#>] | [-L <#[,#]>]] ) ( [-b <#>] | [-B <#>[,<#>]] ) [-d <char>] [-u <fd>] [-i] [-I] [-k] [-n] [-z|-0] [-s] [-S] [-p] [-D] [-N] [-U] [-v] [-h|-?]
+# LIST OF FLAGS: [-j|-P [-]<#>[,<#>,<#>]] [-t <path>] ( [-l <#>] | [-L <#[,#]>]] ) [-n <#>] ( [-b <#>] | [-B <#>[,<#>]] ) [-d <char>] [-u <fd>] [-i] [-I] [-k] [-K] [-z|-0] [-s] [-S] [-p] [-D] [-N] [-U] [-v] [-h|-?]
issue (complexity): Consider extracting the worker spawn count calculation into a separate function to simplify the main loop and improve readability.
The dynamic worker spawning logic could be simplified by extracting the spawn decision into a separate function. This would improve maintainability while preserving the core functionality. Here's a suggested refactoring:
    _forkrun_calc_spawn_count() {
        local -i pLoad=$1 pLoadMax=$2 kkProcs=$3 nProcsMax=$4 pAdd

        # Calculate base number of workers to add based on system load
        pAdd=$(( ( pLoadMax - pLoad ) / pLoad1 ))

        # Apply limits
        (( pAdd < 1 )) && pAdd=0
        (( pAdd > (nProcsMax - kkProcs) )) && pAdd=$(( nProcsMax - kkProcs ))

        # Scale down as we approach max workers
        pAdd=$(( ( ( 4 * nProcsMax ) - ( 3 * kkProcs ) ) * pAdd / ( ( 8 * nProcsMax ) + ( 3 * kkProcs ) ) ))

        echo "$pAdd"
    }
Then simplify the main loop:
    while ! [[ -f "${tmpDir}"/.quit ]] && (( kkProcs < nProcsMax )); do
        # Get current load metrics
        mapfile -t pLOADA < <(_forkrun_get_load "${pLOADA0[@]}")

        # Skip if system load too high
        (( pLOADA > pLoad_max )) && continue

        # Calculate number of workers to add
        pAdd=$(_forkrun_calc_spawn_count "${pLOADA}" "${pLoad_max}" "${kkProcs}" "${nProcsMax}")

        # Spawn new workers if needed
        if (( pAdd > 0 )); then
            spawn_workers "${pAdd}" "${coprocSrcCode}"
            (( kkProcs += pAdd ))
            echo "${kkProcs}" >"${tmpDir}"/.nWorkers
        fi
    done
This refactoring:
- Extracts spawn count calculation into a focused function
- Simplifies the main loop flow
- Makes the code more maintainable
- Preserves the core dynamic spawning behavior
So, let me explain a bit of why the "spawning new worker coprocs" decision tree is so complex.

The decision is really just based on 2 main ideas:
1. if "time for N lines to arrive on stdin" is equal to or less than "time for a coproc to process N lines * number of worker coprocs" then don't spawn more workers, since they will just sit idle
2. spawn workers in groups until you hit the max worker count or hit maximum achievable system load. Determine group size based on an estimate of "load added per worker" and "distance between current load and max attainable load"

Complications with #1 are that the time to process N lines depends on the batch size (e.g., calling sha1sum once and passing it 1000 filenames is much faster than calling it 1000 times and passing it 1 filename each time). So, the "average time to process N lines" can only use timing data for a specific batch size of N lines.

Complications with #2 are that the "maximum attainable load" at the maximum coproc worker count will not always be near 100%. For example, calling

    printf '1%.0s\n' {1..1000} | forkrun -j 100 sleep

on my system only took up, on average, 10% of a single cpu to run.

So, forkrun dynamically changes the "target max load" parameter. In particular, if spawning another group of coprocs doesn't actually increase system load, the target max load is lowered considerably. If the target max load was previously lowered and spawning a group of coprocs once again does increase total system load, the target max load is slightly raised.

Do you have any suggestions for simplifying the code while still addressing these complications?
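The per-invocation overhead behind complication #1 can be demonstrated without timing anything, by counting how many times a command actually gets called at two batch sizes (a plain-xargs illustration, not forkrun's actual batching code):

```bash
# Count command invocations for 10 inputs at batch sizes 10 and 1.
# Each `sh -c 'echo call'` invocation prints exactly one line, so
# wc -l counts invocations; $(( )) strips any wc padding.
calls_batched=$(( $(seq 10 | xargs -n 10 sh -c 'echo call' _ | wc -l) ))
calls_single=$(( $(seq 10 | xargs -n 1 sh -c 'echo call' _ | wc -l) ))
echo "batched: ${calls_batched} invocation(s), unbatched: ${calls_single}"
# batched: 1 invocation(s), unbatched: 10
```

Ten times as many fork/exec cycles for the same work is exactly why timing data for one batch size cannot be reused for another.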
Thanks for explaining the complexities of the worker spawning logic. I understand the challenges involved in estimating processing time and managing system load dynamically. My initial suggestion for extracting the spawn count calculation was aimed at improving readability and maintainability, but I see now that it doesn't fully address the dynamic nature of the `pLoad_max` adjustments.

Given the intricacies you've described, a simple extraction might not be sufficient. Instead of a single function, consider a more structured approach: encapsulate the entire worker spawning and load management logic within a dedicated class or module. This would allow you to:

- Abstract the complexity: Hide the internal workings of load calculation, `pLoad_max` adjustments, and worker spawning within the class/module. This will make the main loop cleaner and easier to follow.
- Improve testability: A separate class/module can be tested independently, ensuring the complex logic works correctly under various scenarios. You can mock system load and timing data to simulate different conditions.
- Manage state effectively: The class/module can maintain internal state for `pLoad_max`, previous load measurements, and other relevant variables, making the logic more self-contained and less reliant on global variables.
- Introduce more sophisticated algorithms: With a dedicated structure, you can potentially implement more advanced load balancing and worker spawning algorithms without cluttering the main loop. For example, you could explore using a feedback loop based on actual processing times to dynamically adjust the number of workers.
Here's a conceptual outline of how a class-based approach might look (using pseudo-code):
    class WorkerSpawner:
        def __init__(self, max_workers, initial_load_target):
            # Initialize internal state

        def update_load_target(self, current_load, num_workers):
            # Implement logic to adjust pLoad_max

        def calculate_spawn_count(self, current_load):
            # Calculate number of workers to spawn based on current load and pLoad_max

        def spawn_workers(self, count):
            # Spawn the specified number of workers

    # In the main loop:
    spawner = WorkerSpawner(max_workers, initial_load_target)
    while ...:
        current_load = get_current_load()
        spawner.update_load_target(current_load, current_worker_count)
        spawn_count = spawner.calculate_spawn_count(current_load)
        spawner.spawn_workers(spawn_count)
        ...
This approach would require more refactoring, but it would significantly improve the organization, readability, and testability of the dynamic worker spawning logic. Let me know what you think of this approach, and I'm happy to discuss specific implementation details further.
Remember that this is written in bash, not in C. There aren't native class definitions. That said...

"encapsulate the entire worker spawning and load management logic within a dedicated class or module"

It sort of already is. It isn't a "class" or a "module", but the entirety of the code for spawning workers is in its own coproc (pQueue) which runs in parallel with the main loop and which is only spawned when dynamic coproc spawning is enabled (via a cmdline option).

Regarding "introducing more sophisticated algorithms": the example I gave about running sleep with forkrun made me realize that I should probably keep track of `time to run N lines / num workers` (i.e., the average time between 2 coprocs reading new data) and make sure that every time new workers are spawned that ratio is actually decreasing. This is tied to "how close we are to full system load" for cpu-bound tasks, but not necessarily for I/O bound tasks.
You're right, my apologies! I got carried away with the class analogy, forgetting we're working within bash's constraints. Thanks for reminding me. Using a separate coproc (pQueue) is a clever way to encapsulate the worker spawning logic within bash.
Your idea of tracking `time to run N lines / num workers` and ensuring it decreases when spawning new workers is excellent. This directly addresses the issue of I/O-bound tasks not necessarily reflecting load in CPU usage. It adds a more direct performance measurement into the feedback loop.

To integrate this new metric, you could modify the pQueue coproc to:
- Measure and store: Record the time taken to process N lines before and after spawning a new group of workers.
- Calculate the ratio: Compute `time to run N lines / num workers` for both measurements.
- Adjust spawning behavior: If the ratio hasn't decreased (or hasn't decreased sufficiently), adjust the spawning strategy. This might involve reducing the number of workers in the next group, pausing spawning for a longer period, or even killing some existing workers if the ratio has increased significantly. You could also use this information to dynamically adjust `pLoad_max` as you mentioned earlier.

Since the pQueue coproc already handles the spawning logic, adding this timing-based feedback mechanism within it would be a natural extension. It keeps the related logic together and avoids cluttering the main loop.
Would you like me to help you brainstorm specific bash commands or logic for implementing this timing and ratio calculation within the pQueue coproc? I can also offer suggestions on how to best integrate this new feedback mechanism into your existing spawning logic.
I think I'll implement this by replacing some of the existing scaling/reduction code for `pAdd` with:

    # t0 is avg time to run N lines with k0 worker coprocs
    # t1 is avg time to run N lines with k1 worker coprocs
    # k1 > k0
    pAdd=$(( ( pAdd * ( t0 * k1 - t1 * k0 ) ) / ( t0 * ( k1 - k0 ) ) ))
    (( pAdd < 0 )) && pAdd=0

This makes it so that if adding more workers didn't increase avg worker runtime (`t1 == t0`) then there is no reduction in `pAdd`. If time increased proportionally to the number of new coprocs (k1 - k0), meaning that the `time to run N lines / num workers` ratio was the same with k0 and k1 workers, then `pAdd` goes down to 0 and no new workers get added.
Note: this is equivalent to

    # r0 = t0 / k0
    # r1 = t1 / k1
    pAdd=$(( pAdd * ( 1 - r1/r0 ) / ( 1 - k0/k1 ) ))
    # k0/k1 is effectively r1/r0 in ideal circumstances where increasing worker count does not change average runtime per worker
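Plugging illustrative numbers into the proposed formula confirms the two limiting behaviors described above (the wrapper function `adjust_pAdd` and all values here are purely for demonstration):

```bash
# Worked example of the proposed pAdd adjustment.
# t0/t1: avg time for N lines with k0/k1 workers; pAdd: planned new workers.
adjust_pAdd() {
    local pAdd=$1 t0=$2 k0=$3 t1=$4 k1=$5
    pAdd=$(( ( pAdd * ( t0 * k1 - t1 * k0 ) ) / ( t0 * ( k1 - k0 ) ) ))
    (( pAdd < 0 )) && pAdd=0
    echo "$pAdd"
}

adjust_pAdd 10 100 4 100 8   # t1 == t0: no reduction        -> 10
adjust_pAdd 10 100 4 200 8   # t1/k1 == t0/k0: stop spawning -> 0
adjust_pAdd 10 100 4 150 8   # partial improvement           -> 5
```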
That's a clever approach! Using the ratio of execution times to directly adjust `pAdd` is elegant and avoids the need for separate state variables. Your formula effectively captures the desired behavior: no reduction if adding workers doesn't impact runtime, and complete halting of spawning if the ratio remains constant.
A few minor suggestions:
- Clarity: Add a comment explaining the logic behind the formula. While the derivation is clear to you now, it might be less obvious to someone else (or even to you in the future). Something like:

    # Adjust pAdd based on the change in per-worker runtime.
    # If adding workers doesn't improve throughput (t1*k0 >= t0*k1), reduce pAdd proportionally.
    pAdd=$(( ( pAdd * ( t0 * k1 - t1 * k0 ) ) / ( t0 * ( k1 - k0 ) ) ))
- Division by zero: While unlikely, `t0` or `(k1 - k0)` could theoretically be zero. Adding a check to prevent division by zero would make the code more robust:

    if (( t0 != 0 && k1 != k0 )); then
        pAdd=$(( ( pAdd * ( t0 * k1 - t1 * k0 ) ) / ( t0 * ( k1 - k0 ) ) ))
    fi
- Overflow: For very large values of `pAdd`, `t0`, or `k1`, the intermediate calculations could potentially overflow. Consider using `bc` for arbitrary-precision arithmetic if this is a concern. This might add a slight performance overhead, but it would guarantee correctness for extreme cases. For example:

    pAdd=$(echo "scale=0; ($pAdd * ($t0 * $k1 - $t1 * $k0)) / ($t0 * ($k1 - $k0))" | bc)
- Testing: With this new logic, it's crucial to test thoroughly. Try to simulate scenarios where:
- Adding workers improves throughput significantly.
- Adding workers has little or no impact.
- Adding workers actually degrades performance (due to excessive overhead).
By testing these edge cases, you can ensure the formula behaves as expected and fine-tune the parameters if necessary.
Overall, this looks like a promising improvement. Let me know if you have any further questions or would like to discuss testing strategies.
Regarding your suggestions:

- Agreed. I'll add comments explaining the logic.
- Division by 0 should be impossible here. k1 is guaranteed to be more than k0 (else that loop iteration would have already `continue`d and skipped this calculation). t0 is measured in microseconds (the time difference between 2 `$EPOCHREALTIME` calls), and just checking the time takes more than a microsecond. But a quick check that t0 is nonzero couldn't hurt.
- I don't think I have to worry too much about overflow, so long as this isn't run on a supercomputer where `k0` might be a million. For running on a typical PC I don't see `pAdd` nor `k0`/`k1` being more than 100 or so, and for that to overflow the average runtime `t0` would have to be longer than a human lifetime. I've worked hard to minimize forkrun's dependency list and I'd rather not add `bc` to it. I think it is more likely that someone without `bc` will want to run forkrun than that someone will run forkrun on a supercomputer.
- Agreed. I'll run it through some real-world tests.
Co-authored-by: sourcery-ai[bot] <58596630+sourcery-ai[bot]@users.noreply.github.com>
Co-authored-by: sourcery-ai[bot] <58596630+sourcery-ai[bot]@users.noreply.github.com>
Summary by Sourcery
Add dynamic worker spawning and improve numeric argument handling.
Enhancements:
Tests: