Files
2026-03-07 18:19:16 +01:00

192 lines
6.1 KiB
Swift

import Foundation
import SwiftSignalKit
public func transformValue<T, E, R>(_ f: @escaping(T) -> R) -> (Signal<T, E>) -> Signal<R, E> {
return map(f)
}
public func transformValueToSignal<T, R, E>(_ f: @escaping(T) -> Signal<R, E>) -> (Signal<T, E>) -> Signal<R, E> {
return mapToSignal(f)
}
public func convertSignalWithNoErrorToSignalWithError<T, R, E>(_ f: @escaping(T) -> Signal<R, E>) -> (Signal<T, NoError>) -> Signal<R, E> {
return mapToSignalPromotingError(f)
}
public func ignoreSignalErrors<T, E>(onError: ((E) -> Void)? = nil) -> (Signal<T, E>) -> Signal<T, NoError> {
return { signal in
return signal |> `catch` { error in
// Log the error using the provided callback, if any
onError?(error)
// Returning a signal that completes without errors
return Signal { subscriber in
subscriber.putCompletion()
return EmptyDisposable
}
}
}
}
// Wrapper for non-Error types
public struct SignalError<E>: Error {
public let error: E
public init(_ error: E) {
self.error = error
}
}
public struct SignalCompleted: Error {}
// Extension for Signals
// NoError can be marked a
// try? await signal.awaitable()
public extension Signal {
func awaitable(file: String = #file, line: Int = #line) async throws -> T {
return try await withCheckedThrowingContinuation { continuation in
var disposable: Disposable?
let hasResumed = Atomic<Bool>(value: false)
disposable = self.start(
next: { value in
if !hasResumed.with({ $0 }) {
let _ = hasResumed.swap(true)
continuation.resume(returning: value)
} else {
#if DEBUG
// Consider using awaitableStream() or |> take(1)
assertionFailure("awaitable Signal emitted more than one value. \(file):\(line)")
#endif
}
disposable?.dispose()
},
error: { error in
if !hasResumed.with({ $0 }) {
let _ = hasResumed.swap(true)
if let error = error as? Error {
continuation.resume(throwing: error)
} else {
continuation.resume(throwing: SignalError(error))
}
} else {
#if DEBUG
// I don't even know what we should consider here. awaitableStream?
assertionFailure("awaitable Signal emitted an error after a value. \(file):\(line)")
#endif
}
disposable?.dispose()
},
completed: {
if !hasResumed.with({ $0 }) {
let _ = hasResumed.swap(true)
continuation.resume(throwing: SignalCompleted())
}
disposable?.dispose()
}
)
}
}
func task() async throws -> T {
let disposable = MetaDisposable()
return try await withTaskCancellationHandler(operation: {
return try await withCheckedThrowingContinuation { continuation in
disposable.set((self |> take(1)).startStandalone(
next: { value in
continuation.resume(returning: value)
},
error: { err in
continuation.resume(throwing: SignalError(err))
}
))
}
}, onCancel: {
disposable.dispose()
})
}
func stream() -> AsyncThrowingStream<T, Error> {
return AsyncThrowingStream { continuation in
let disposable = self.startStandalone(
next: { value in
continuation.yield(value)
},
error: { err in
continuation.finish(throwing: SignalError(err))
},
completed: {
continuation.finish()
}
)
continuation.onTermination = { _ in
disposable.dispose()
}
}
}
}
public extension Signal where E == NoError {
func task() async -> T {
return await self.get()
}
func stream() -> AsyncStream<T> {
return AsyncStream { continuation in
let disposable = self.startStandalone(
next: { value in
continuation.yield(value)
},
completed: {
continuation.finish()
}
)
continuation.onTermination = { _ in
disposable.dispose()
}
}
}
}
// Extension for general Signal types - AsyncStream support
public extension Signal {
func awaitableStream() -> AsyncStream<T> {
return AsyncStream { continuation in
let disposable = self.start(
next: { value in
continuation.yield(value)
},
error: { _ in
continuation.finish()
},
completed: {
continuation.finish()
}
)
continuation.onTermination = { @Sendable _ in
disposable.dispose()
}
}
}
}
// Extension for NoError Signal types - AsyncStream support
public extension Signal where E == NoError {
func awaitableStream() -> AsyncStream<T> {
return AsyncStream { continuation in
let disposable = self.start(
next: { value in
continuation.yield(value)
},
completed: {
continuation.finish()
}
)
continuation.onTermination = { @Sendable _ in
disposable.dispose()
}
}
}
}