Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions release-notes.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
Release notes:

Unreleased
- adds TaskSeq.foldWhile and TaskSeq.foldWhileAsync: fold with early termination via a (state, element) -> bool predicate (takeWhile-style, exclusive). When the predicate returns false, iteration halts without folding that element; no further elements are enumerated.
- test: add SideEffects module and ImmTaskSeq variant tests to TaskSeq.ChunkBy.Tests.fs, improving coverage for chunkBy and chunkByAsync
- fixes: `Async.bind` signature corrected from `(Async<'T> -> Async<'U>)` to `('T -> Async<'U>)` to match standard monadic bind semantics (same as `Task.bind`); the previous signature made the function effectively equivalent to direct application
- refactor: simplify splitAt 'rest' taskSeq to use while!, removing redundant go2 mutable and manual MoveNextAsync pre-advance
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
<Compile Include="TaskSeq.FindIndex.Tests.fs" />
<Compile Include="TaskSeq.Find.Tests.fs" />
<Compile Include="TaskSeq.Fold.Tests.fs" />
<Compile Include="TaskSeq.FoldWhile.Tests.fs" />
<Compile Include="TaskSeq.Scan.Tests.fs" />
<Compile Include="TaskSeq.MapFold.Tests.fs" />
<Compile Include="TaskSeq.Reduce.Tests.fs" />
Expand Down
258 changes: 258 additions & 0 deletions src/FSharp.Control.TaskSeq.Test/TaskSeq.FoldWhile.Tests.fs
Original file line number Diff line number Diff line change
@@ -0,0 +1,258 @@
module TaskSeq.Tests.FoldWhile

open Xunit
open FsUnit.Xunit

open FSharp.Control

//
// TaskSeq.foldWhile
// TaskSeq.foldWhileAsync
//
// Semantics match TaskSeq.takeWhile: the predicate is evaluated against (state, element)
// before that element is folded in. When the predicate returns false, iteration halts
// without folding that element, and no further elements are enumerated.
//

module EmptySeq =
[<Fact>]
let ``Null source is invalid`` () =
assertNullArg
<| fun () -> TaskSeq.foldWhile (fun _ _ -> true) (fun _ item -> item + 1) 0 null

assertNullArg
<| fun () -> TaskSeq.foldWhileAsync (fun _ _ -> Task.fromResult true) (fun _ item -> Task.fromResult (item + 1)) 0 null

[<Theory; ClassData(typeof<TestEmptyVariants>)>]
let ``TaskSeq-foldWhile returns initial state when empty`` variant = task {
let! result =
Gen.getEmptyVariant variant
|> TaskSeq.foldWhile (fun _ _ -> true) (fun _ item -> item + 1) -1

result |> should equal -1
}

[<Theory; ClassData(typeof<TestEmptyVariants>)>]
let ``TaskSeq-foldWhileAsync returns initial state when empty`` variant = task {
let! result =
Gen.getEmptyVariant variant
|> TaskSeq.foldWhileAsync (fun _ _ -> Task.fromResult true) (fun _ item -> Task.fromResult (item + 1)) -1

result |> should equal -1
}

[<Theory; ClassData(typeof<TestEmptyVariants>)>]
let ``TaskSeq-foldWhile does not call predicate or folder when empty`` variant = task {
let mutable predicateCalled = false
let mutable folderCalled = false

let! _ =
Gen.getEmptyVariant variant
|> TaskSeq.foldWhile
(fun _ _ ->
predicateCalled <- true
true)
(fun state _ ->
folderCalled <- true
state)
0

predicateCalled |> should be False
folderCalled |> should be False
}

[<Theory; ClassData(typeof<TestEmptyVariants>)>]
let ``TaskSeq-foldWhileAsync does not call predicate or folder when empty`` variant = task {
let mutable predicateCalled = false
let mutable folderCalled = false

let! _ =
Gen.getEmptyVariant variant
|> TaskSeq.foldWhileAsync
(fun _ _ -> task {
predicateCalled <- true
return true
})
(fun state _ -> task {
folderCalled <- true
return state
})
0

predicateCalled |> should be False
folderCalled |> should be False
}

module Functionality =
[<Fact>]
let ``TaskSeq-foldWhile with always-true predicate behaves like fold`` () = task {
let! result =
TaskSeq.ofList [ 1; 2; 3; 4; 5 ]
|> TaskSeq.foldWhile (fun _ _ -> true) (fun acc item -> acc + item) 0

result |> should equal 15
}

[<Fact>]
let ``TaskSeq-foldWhileAsync with always-true predicate behaves like foldAsync`` () = task {
let! result =
TaskSeq.ofList [ 1; 2; 3; 4; 5 ]
|> TaskSeq.foldWhileAsync (fun _ _ -> Task.fromResult true) (fun acc item -> Task.fromResult (acc + item)) 0

result |> should equal 15
}

[<Fact>]
let ``TaskSeq-foldWhile is left-associative like fold`` () = task {
let! result =
TaskSeq.ofList [ "b"; "c"; "d" ]
|> TaskSeq.foldWhile (fun _ _ -> true) (fun acc item -> acc + item) "a"

result |> should equal "abcd"
}

[<Fact>]
let ``TaskSeq-foldWhileAsync is left-associative like foldAsync`` () = task {
let! result =
TaskSeq.ofList [ "b"; "c"; "d" ]
|> TaskSeq.foldWhileAsync (fun _ _ -> Task.fromResult true) (fun acc item -> Task.fromResult (acc + item)) "a"

result |> should equal "abcd"
}

module Halt =
[<Fact>]
let ``TaskSeq-foldWhile stops immediately when predicate is false on first element`` () = task {
let mutable predicateCalls = 0
let mutable folderCalls = 0

let! result =
TaskSeq.ofList [ 1; 2; 3; 4; 5 ]
|> TaskSeq.foldWhile
(fun _ _ ->
predicateCalls <- predicateCalls + 1
false)
(fun _ item ->
folderCalls <- folderCalls + 1
item)
0

result |> should equal 0
predicateCalls |> should equal 1
folderCalls |> should equal 0
}

[<Fact>]
let ``TaskSeq-foldWhileAsync stops immediately when predicate is false on first element`` () = task {
let mutable predicateCalls = 0
let mutable folderCalls = 0

let! result =
TaskSeq.ofList [ 1; 2; 3; 4; 5 ]
|> TaskSeq.foldWhileAsync
(fun _ _ -> task {
predicateCalls <- predicateCalls + 1
return false
})
(fun _ item -> task {
folderCalls <- folderCalls + 1
return item
})
0

result |> should equal 0
predicateCalls |> should equal 1
folderCalls |> should equal 0
}

[<Fact>]
let ``TaskSeq-foldWhile halts mid-sequence without folding the halting element`` () = task {
// Sum while adding the next element would keep the total <= 5. Once adding
// the element would overshoot, stop — that element is NOT folded in.
let mutable predicateCalls = 0
let mutable folderCalls = 0

let! result =
TaskSeq.ofList [ 1; 2; 3; 4; 5 ]
|> TaskSeq.foldWhile
(fun acc item ->
predicateCalls <- predicateCalls + 1
acc + item <= 5)
(fun acc item ->
folderCalls <- folderCalls + 1
acc + item)
0

// 1 (ok, total 1), 2 (ok, total 3), 3 (would make 6 > 5, stop)
result |> should equal 3
predicateCalls |> should equal 3
folderCalls |> should equal 2
}

[<Fact>]
let ``TaskSeq-foldWhileAsync halts mid-sequence without folding the halting element`` () = task {
let mutable predicateCalls = 0
let mutable folderCalls = 0

let! result =
TaskSeq.ofList [ 1; 2; 3; 4; 5 ]
|> TaskSeq.foldWhileAsync
(fun acc item -> task {
predicateCalls <- predicateCalls + 1
return acc + item <= 5
})
(fun acc item -> task {
folderCalls <- folderCalls + 1
return acc + item
})
0

result |> should equal 3
predicateCalls |> should equal 3
folderCalls |> should equal 2
}

[<Fact>]
let ``TaskSeq-foldWhile does not enumerate past the halting element`` () = task {
// Source has a side effect per pulled element; halt on the 3rd pull.
let mutable pulled = 0

let source = taskSeq {
for i in 1..5 do
pulled <- pulled + 1
yield i
}

let! _ =
source
|> TaskSeq.foldWhile (fun _ item -> item < 3) (fun acc item -> acc + item) 0

// Pull 1 (ok), pull 2 (ok), pull 3 (predicate false, stop) — must not pull 4.
pulled |> should equal 3
}

[<Fact>]
let ``TaskSeq-foldWhileAsync does not enumerate past the halting element`` () = task {
let mutable pulled = 0

let source = taskSeq {
for i in 1..5 do
pulled <- pulled + 1
yield i
}

let! _ =
source
|> TaskSeq.foldWhileAsync (fun _ item -> Task.fromResult (item < 3)) (fun acc item -> Task.fromResult (acc + item)) 0

pulled |> should equal 3
}

[<Fact>]
let ``TaskSeq-foldWhile that never halts is equivalent to fold`` () = task {
let! result =
TaskSeq.ofList [ 1; 2; 3; 4; 5 ]
|> TaskSeq.foldWhile (fun _ item -> item <= 10) (fun acc item -> acc + item) 0

result |> should equal 15
}
4 changes: 4 additions & 0 deletions src/FSharp.Control.TaskSeq/TaskSeq.fs
Original file line number Diff line number Diff line change
Expand Up @@ -540,6 +540,10 @@ type TaskSeq private () =
static member compareWithAsync comparer source1 source2 = Internal.compareWithAsync comparer source1 source2
static member fold folder state source = Internal.fold (FolderAction folder) state source
static member foldAsync folder state source = Internal.fold (AsyncFolderAction folder) state source
static member foldWhile predicate folder state source = Internal.foldWhile predicate folder state source

static member foldWhileAsync predicate folder state source = Internal.foldWhileAsync predicate folder state source

static member scan folder state source = Internal.scan (FolderAction folder) state source
static member scanAsync folder state source = Internal.scan (AsyncFolderAction folder) state source
static member reduce folder source = Internal.reduce (FolderAction folder) source
Expand Down
45 changes: 45 additions & 0 deletions src/FSharp.Control.TaskSeq/TaskSeq.fsi
Original file line number Diff line number Diff line change
Expand Up @@ -1865,6 +1865,51 @@ type TaskSeq =
static member foldAsync:
folder: ('State -> 'T -> #Task<'State>) -> state: 'State -> source: TaskSeq<'T> -> Task<'State>

/// <summary>
/// Applies the function <paramref name="folder" /> to each element in the task sequence, threading an
/// accumulator of type <paramref name="'State" /> through the computation, for as long as
/// <paramref name="predicate" /> returns <c>true</c>. The predicate is evaluated against the current
/// state and next element before that element is folded in; once it returns <c>false</c> the element
/// is not folded, iteration stops, and no further elements of the input are enumerated.
/// If either function is asynchronous, consider using <see cref="TaskSeq.foldWhileAsync" />.
/// </summary>
///
/// <param name="predicate">A function that, given the current state and next element, returns <c>true</c> to keep folding or <c>false</c> to stop.</param>
/// <param name="folder">A function that updates the state with each element from the sequence.</param>
/// <param name="state">The initial state.</param>
/// <param name="source">The input sequence.</param>
/// <returns>The state object after iteration halted, or after the whole sequence was consumed.</returns>
/// <exception cref="T:ArgumentNullException">Thrown when the input task sequence is null.</exception>
static member foldWhile:
predicate: ('State -> 'T -> bool) ->
folder: ('State -> 'T -> 'State) ->
state: 'State ->
source: TaskSeq<'T> ->
Task<'State>

/// <summary>
/// Applies the asynchronous function <paramref name="folder" /> to each element in the task sequence,
/// threading an accumulator of type <paramref name="'State" /> through the computation, for as long as
/// the asynchronous <paramref name="predicate" /> returns <c>true</c>. The predicate is evaluated
/// against the current state and next element before that element is folded in; once it returns
/// <c>false</c> the element is not folded, iteration stops, and no further elements of the input are
/// enumerated.
/// If both functions are synchronous, consider using <see cref="TaskSeq.foldWhile" />.
/// </summary>
///
/// <param name="predicate">An async function that, given the current state and next element, returns <c>true</c> to keep folding or <c>false</c> to stop.</param>
/// <param name="folder">An async function that updates the state with each element from the sequence.</param>
/// <param name="state">The initial state.</param>
/// <param name="source">The input sequence.</param>
/// <returns>The state object after iteration halted, or after the whole sequence was consumed.</returns>
/// <exception cref="T:ArgumentNullException">Thrown when the input task sequence is null.</exception>
static member foldWhileAsync:
predicate: ('State -> 'T -> #Task<bool>) ->
folder: ('State -> 'T -> #Task<'State>) ->
state: 'State ->
source: TaskSeq<'T> ->
Task<'State>

/// <summary>
/// Like <see cref="TaskSeq.fold" />, but returns the sequence of intermediate results and the final result.
/// The first element of the output sequence is always the initial state. If the input task sequence
Expand Down
47 changes: 47 additions & 0 deletions src/FSharp.Control.TaskSeq/TaskSeqInternal.fs
Original file line number Diff line number Diff line change
Expand Up @@ -409,6 +409,53 @@ module internal TaskSeqInternal =
return result
}

let foldWhile predicate folder initial (source: TaskSeq<_>) =
checkNonNull (nameof source) source

task {
use e = source.GetAsyncEnumerator CancellationToken.None
let mutable result = initial
let mutable running = true

while running do
let! hasNext = e.MoveNextAsync()

if hasNext then
if predicate result e.Current then
result <- folder result e.Current
else
running <- false
else
running <- false

return result
}

let foldWhileAsync predicate folder initial (source: TaskSeq<_>) =
checkNonNull (nameof source) source

task {
use e = source.GetAsyncEnumerator CancellationToken.None
let mutable result = initial
let mutable running = true

while running do
let! hasNext = e.MoveNextAsync()

if hasNext then
let! keepGoing = predicate result e.Current

if keepGoing then
let! newState = folder result e.Current
result <- newState
else
running <- false
else
running <- false

return result
}

let scan folder initial (source: TaskSeq<_>) =
checkNonNull (nameof source) source

Expand Down
Loading