Merge commit '7621e2f8dec938cf48181c8b10afc9b01f444e68' into beta

This commit is contained in:
Ilya Laktyushin
2025-12-06 02:17:48 +04:00
commit 8344b97e03
28070 changed files with 7995182 additions and 0 deletions
@@ -0,0 +1,467 @@
import Foundation
import Postbox
import MtProtoKit
import SwiftSignalKit
import TelegramApi
private func roundUp(_ value: Int, to multiple: Int) -> Int {
if multiple == 0 {
return value
}
let remainder = value % multiple
if remainder == 0 {
return value
}
return value + multiple - remainder
}
enum UploadPartError {
case generic
case invalidMedia
}
private func wrapMethodBody(_ body: (FunctionDescription, Buffer, DeserializeFunctionResponse<Api.Bool>), useCompression: Bool) -> (FunctionDescription, Buffer, DeserializeFunctionResponse<Api.Bool>) {
if useCompression {
if let compressed = MTGzip.compress(body.1.makeData()) {
if compressed.count < body.1.size {
let os = MTOutputStream()
os.write(0x3072cfa1 as Int32)
os.writeBytes(compressed)
return (body.0, Buffer(data: os.currentBytes()), body.2)
}
}
}
return body
}
class Download: NSObject, MTRequestMessageServiceDelegate {
let datacenterId: Int
let isCdn: Bool
let context: MTContext
let mtProto: MTProto
let requestService: MTRequestMessageService
let useRequestTimeoutTimers: Bool
private let logPrefix = Atomic<String?>(value: nil)
private var shouldKeepConnectionDisposable: Disposable?
init(queue: Queue, datacenterId: Int, isMedia: Bool, isCdn: Bool, context: MTContext, masterDatacenterId: Int, usageInfo: MTNetworkUsageCalculationInfo?, shouldKeepConnection: Signal<Bool, NoError>, useRequestTimeoutTimers: Bool) {
self.datacenterId = datacenterId
self.isCdn = isCdn
self.context = context
self.useRequestTimeoutTimers = useRequestTimeoutTimers
var requiredAuthToken: Any?
var authTokenMasterDatacenterId: Int = 0
if !isCdn && datacenterId != masterDatacenterId {
authTokenMasterDatacenterId = masterDatacenterId
requiredAuthToken = Int(datacenterId) as NSNumber
}
self.mtProto = MTProto(context: self.context, datacenterId: datacenterId, usageCalculationInfo: usageInfo, requiredAuthToken: requiredAuthToken, authTokenMasterDatacenterId: authTokenMasterDatacenterId)
let logPrefix = self.logPrefix
self.mtProto.getLogPrefix = {
return logPrefix.with { $0 }
}
self.mtProto.cdn = isCdn
self.mtProto.useTempAuthKeys = self.context.useTempAuthKeys && !isCdn
self.mtProto.media = isMedia
self.requestService = MTRequestMessageService(context: self.context)
self.requestService.forceBackgroundRequests = true
super.init()
self.requestService.delegate = self
self.mtProto.add(self.requestService)
let mtProto = self.mtProto
self.shouldKeepConnectionDisposable = (shouldKeepConnection |> distinctUntilChanged |> deliverOn(queue)).start(next: { [weak mtProto] value in
if let mtProto = mtProto {
if value {
Logger.shared.log("Network", "Resume worker network connection")
mtProto.resume()
} else {
Logger.shared.log("Network", "Pause worker network connection")
mtProto.pause()
}
}
})
}
deinit {
self.mtProto.remove(self.requestService)
self.mtProto.stop()
self.mtProto.finalizeSession()
self.shouldKeepConnectionDisposable?.dispose()
}
func requestMessageServiceAuthorizationRequired(_ requestMessageService: MTRequestMessageService!) {
self.context.updateAuthTokenForDatacenter(withId: self.datacenterId, authToken: nil)
self.context.authTokenForDatacenter(withIdRequired: self.datacenterId, authToken:self.mtProto.requiredAuthToken, masterDatacenterId: self.mtProto.authTokenMasterDatacenterId)
}
static func uploadPart(multiplexedManager: MultiplexedRequestManager, datacenterId: Int, consumerId: Int64, tag: MediaResourceFetchTag?, fileId: Int64, index: Int, data: Data, asBigPart: Bool, bigTotalParts: Int? = nil, useCompression: Bool = false, onFloodWaitError: ((String) -> Void)? = nil) -> Signal<Void, UploadPartError> {
let saveFilePart: (FunctionDescription, Buffer, DeserializeFunctionResponse<Api.Bool>)
if asBigPart {
let totalParts: Int32
if let bigTotalParts = bigTotalParts, bigTotalParts > 0 && bigTotalParts < Int32.max {
totalParts = Int32(bigTotalParts)
} else {
totalParts = -1
}
saveFilePart = Api.functions.upload.saveBigFilePart(fileId: fileId, filePart: Int32(index), fileTotalParts: totalParts, bytes: Buffer(data: data))
} else {
saveFilePart = Api.functions.upload.saveFilePart(fileId: fileId, filePart: Int32(index), bytes: Buffer(data: data))
}
return multiplexedManager.request(to: .main(datacenterId), consumerId: consumerId, resourceId: nil, data: wrapMethodBody(saveFilePart, useCompression: useCompression), tag: tag, continueInBackground: true, onFloodWaitError: onFloodWaitError, expectedResponseSize: nil)
|> mapError { error -> UploadPartError in
if error.errorCode == 400 {
return .invalidMedia
} else {
return .generic
}
}
|> mapToSignal { _ -> Signal<Void, UploadPartError> in
return .complete()
}
}
func uploadPart(fileId: Int64, index: Int, data: Data, asBigPart: Bool, bigTotalParts: Int? = nil, useCompression: Bool = false, onFloodWaitError: ((String) -> Void)? = nil) -> Signal<Void, UploadPartError> {
return Signal<Void, MTRpcError> { subscriber in
let request = MTRequest()
var saveFilePart: (FunctionDescription, Buffer, DeserializeFunctionResponse<Api.Bool>)
if asBigPart {
let totalParts: Int32
if let bigTotalParts = bigTotalParts {
totalParts = Int32(bigTotalParts)
} else {
totalParts = -1
}
saveFilePart = Api.functions.upload.saveBigFilePart(fileId: fileId, filePart: Int32(index), fileTotalParts: totalParts, bytes: Buffer(data: data))
} else {
saveFilePart = Api.functions.upload.saveFilePart(fileId: fileId, filePart: Int32(index), bytes: Buffer(data: data))
}
saveFilePart = wrapMethodBody(saveFilePart, useCompression: useCompression)
request.setPayload(saveFilePart.1.makeData() as Data, metadata: WrappedRequestMetadata(metadata: WrappedFunctionDescription(saveFilePart.0), tag: nil), shortMetadata: WrappedRequestShortMetadata(shortMetadata: WrappedShortFunctionDescription(saveFilePart.0)), responseParser: { response in
if let result = saveFilePart.2.parse(Buffer(data: response)) {
return BoxedMessage(result)
}
return nil
})
request.dependsOnPasswordEntry = false
request.shouldContinueExecutionWithErrorContext = { errorContext in
guard let errorContext = errorContext else {
return true
}
if let onFloodWaitError, errorContext.floodWaitSeconds > 0, let errorText = errorContext.floodWaitErrorText {
onFloodWaitError(errorText)
}
return true
}
request.completed = { (boxedResponse, timestamp, error) -> () in
if let error = error {
subscriber.putError(error)
} else {
subscriber.putCompletion()
}
}
let internalId: Any! = request.internalId
self.requestService.add(request)
return ActionDisposable {
self.requestService.removeRequest(byInternalId: internalId)
}
} |> `catch` { value -> Signal<Void, UploadPartError> in
if value.errorCode == 400 {
return .fail(.invalidMedia)
} else {
return .fail(.generic)
}
}
}
func webFilePart(location: Api.InputWebFileLocation, offset: Int, length: Int) -> Signal<Data, NoError> {
return Signal<Data, MTRpcError> { subscriber in
let request = MTRequest()
request.expectedResponseSize = Int32(length)
var updatedLength = roundUp(length, to: 4096)
while updatedLength % 4096 != 0 || 1048576 % updatedLength != 0 {
updatedLength += 1
}
let data = Api.functions.upload.getWebFile(location: location, offset: Int32(offset), limit: Int32(updatedLength))
request.setPayload(data.1.makeData() as Data, metadata: WrappedRequestMetadata(metadata: WrappedFunctionDescription(data.0), tag: nil), shortMetadata: WrappedRequestShortMetadata(shortMetadata: WrappedFunctionDescription(data.0)), responseParser: { response in
if let result = data.2.parse(Buffer(data: response)) {
return BoxedMessage(result)
}
return nil
})
request.dependsOnPasswordEntry = false
request.needsTimeoutTimer = self.useRequestTimeoutTimers
request.shouldContinueExecutionWithErrorContext = { errorContext in
return true
}
request.completed = { (boxedResponse, timestamp, error) -> () in
if let error = error {
subscriber.putError(error)
} else {
if let result = (boxedResponse as! BoxedMessage).body as? Api.upload.WebFile {
switch result {
case .webFile(_, _, _, _, let bytes):
subscriber.putNext(bytes.makeData())
}
subscriber.putCompletion()
}
else {
subscriber.putError(MTRpcError(errorCode: 500, errorDescription: "TL_VERIFICATION_ERROR"))
}
}
}
let internalId: Any! = request.internalId
self.requestService.add(request)
return ActionDisposable {
self.requestService.removeRequest(byInternalId: internalId)
}
} |> retryRequest
}
func part(location: Api.InputFileLocation, offset: Int64, length: Int) -> Signal<Data, NoError> {
return Signal<Data, MTRpcError> { subscriber in
let request = MTRequest()
request.expectedResponseSize = Int32(length)
var updatedLength = roundUp(length, to: 4096)
while updatedLength % 4096 != 0 || 1048576 % updatedLength != 0 {
updatedLength += 1
}
let data = Api.functions.upload.getFile(flags: 0, location: location, offset: offset, limit: Int32(updatedLength))
request.setPayload(data.1.makeData() as Data, metadata: WrappedRequestMetadata(metadata: WrappedFunctionDescription(data.0), tag: nil), shortMetadata: WrappedRequestShortMetadata(shortMetadata: WrappedShortFunctionDescription(data.0)), responseParser: { response in
if let result = data.2.parse(Buffer(data: response)) {
return BoxedMessage(result)
}
return nil
})
request.dependsOnPasswordEntry = false
request.needsTimeoutTimer = self.useRequestTimeoutTimers
request.shouldContinueExecutionWithErrorContext = { errorContext in
return true
}
request.completed = { (boxedResponse, timestamp, error) -> () in
if let error = error {
subscriber.putError(error)
} else {
if let result = (boxedResponse as! BoxedMessage).body as? Api.upload.File {
switch result {
case let .file(_, _, bytes):
subscriber.putNext(bytes.makeData())
case .fileCdnRedirect:
break
}
subscriber.putCompletion()
}
else {
subscriber.putError(MTRpcError(errorCode: 500, errorDescription: "TL_VERIFICATION_ERROR"))
}
}
}
let internalId: Any! = request.internalId
self.requestService.add(request)
return ActionDisposable {
self.requestService.removeRequest(byInternalId: internalId)
}
}
|> retryRequest
}
func request<T>(_ data: (FunctionDescription, Buffer, DeserializeFunctionResponse<T>), expectedResponseSize: Int32? = nil, automaticFloodWait: Bool = true, onFloodWaitError: ((String) -> Void)? = nil) -> Signal<T, MTRpcError> {
return Signal { subscriber in
let request = MTRequest()
request.expectedResponseSize = expectedResponseSize ?? 0
request.setPayload(data.1.makeData() as Data, metadata: WrappedRequestMetadata(metadata: WrappedFunctionDescription(data.0), tag: nil), shortMetadata: WrappedRequestShortMetadata(shortMetadata: WrappedShortFunctionDescription(data.0)), responseParser: { response in
if let result = data.2.parse(Buffer(data: response)) {
return BoxedMessage(result)
}
return nil
})
request.dependsOnPasswordEntry = false
request.needsTimeoutTimer = self.useRequestTimeoutTimers
request.shouldContinueExecutionWithErrorContext = { errorContext in
guard let errorContext = errorContext else {
return true
}
if let onFloodWaitError, errorContext.floodWaitSeconds > 0, let errorText = errorContext.floodWaitErrorText {
onFloodWaitError(errorText)
}
if errorContext.floodWaitSeconds > 0 && !automaticFloodWait {
return false
}
return true
}
request.completed = { (boxedResponse, timestamp, error) -> () in
if let error = error {
subscriber.putError(error)
} else {
if let result = (boxedResponse as! BoxedMessage).body as? T {
subscriber.putNext(result)
subscriber.putCompletion()
}
else {
subscriber.putError(MTRpcError(errorCode: 500, errorDescription: "TL_VERIFICATION_ERROR"))
}
}
}
let internalId: Any! = request.internalId
self.requestService.add(request)
return ActionDisposable {
self.requestService.removeRequest(byInternalId: internalId)
}
}
}
func requestWithAdditionalData<T>(_ data: (FunctionDescription, Buffer, DeserializeFunctionResponse<T>), automaticFloodWait: Bool = true, onFloodWaitError: ((String) -> Void)? = nil, failOnServerErrors: Bool = false, expectedResponseSize: Int32? = nil) -> Signal<(T, Double), (MTRpcError, Double)> {
return Signal { subscriber in
let request = MTRequest()
request.expectedResponseSize = expectedResponseSize ?? 0
request.setPayload(data.1.makeData() as Data, metadata: WrappedRequestMetadata(metadata: WrappedFunctionDescription(data.0), tag: nil), shortMetadata: WrappedRequestShortMetadata(shortMetadata: WrappedShortFunctionDescription(data.0)), responseParser: { response in
if let result = data.2.parse(Buffer(data: response)) {
return BoxedMessage(result)
}
return nil
})
request.dependsOnPasswordEntry = false
request.needsTimeoutTimer = self.useRequestTimeoutTimers
request.shouldContinueExecutionWithErrorContext = { errorContext in
guard let errorContext = errorContext else {
return true
}
if let onFloodWaitError, errorContext.floodWaitSeconds > 0, let errorText = errorContext.floodWaitErrorText {
onFloodWaitError(errorText)
}
if errorContext.floodWaitSeconds > 0 && !automaticFloodWait {
return false
}
if errorContext.internalServerErrorCount > 0 && failOnServerErrors {
return false
}
return true
}
request.completed = { (boxedResponse, info, error) -> () in
if let error = error {
subscriber.putError((error, info?.timestamp ?? 0.0))
} else {
if let result = (boxedResponse as! BoxedMessage).body as? T {
subscriber.putNext((result, info?.timestamp ?? 0.0))
subscriber.putCompletion()
}
else {
subscriber.putError((MTRpcError(errorCode: 500, errorDescription: "TL_VERIFICATION_ERROR"), info?.timestamp ?? 0.0))
}
}
}
let internalId: Any! = request.internalId
self.requestService.add(request)
return ActionDisposable {
self.requestService.removeRequest(byInternalId: internalId)
}
}
}
func rawRequest(_ data: (FunctionDescription, Buffer, (Buffer) -> Any?), automaticFloodWait: Bool = true, onFloodWaitError: ((String) -> Void)? = nil, failOnServerErrors: Bool = false, logPrefix: String = "", expectedResponseSize: Int32? = nil) -> Signal<(Any, NetworkResponseInfo), (MTRpcError, Double)> {
let requestService = self.requestService
return Signal { subscriber in
let request = MTRequest()
request.expectedResponseSize = expectedResponseSize ?? 0
request.setPayload(data.1.makeData() as Data, metadata: WrappedRequestMetadata(metadata: WrappedFunctionDescription(data.0), tag: nil), shortMetadata: WrappedRequestShortMetadata(shortMetadata: WrappedShortFunctionDescription(data.0)), responseParser: { response in
if let result = data.2(Buffer(data: response)) {
return BoxedMessage(result)
}
return nil
})
request.dependsOnPasswordEntry = false
request.needsTimeoutTimer = self.useRequestTimeoutTimers
request.shouldContinueExecutionWithErrorContext = { errorContext in
guard let errorContext = errorContext else {
return true
}
if let onFloodWaitError, errorContext.floodWaitSeconds > 0, let errorText = errorContext.floodWaitErrorText {
onFloodWaitError(errorText)
}
if errorContext.floodWaitSeconds > 0 && !automaticFloodWait {
return false
}
if errorContext.internalServerErrorCount > 0 && failOnServerErrors {
return false
}
return true
}
request.completed = { (boxedResponse, info, error) -> () in
if let error = error {
subscriber.putError((error, info?.timestamp ?? 0))
} else {
let mappedInfo = NetworkResponseInfo(
timestamp: info?.timestamp ?? 0.0,
networkType: info?.networkType == 0 ? .wifi : .cellular,
networkDuration: info?.duration ?? 0.0
)
subscriber.putNext(((boxedResponse as! BoxedMessage).body, mappedInfo))
subscriber.putCompletion()
}
}
let internalId: Any! = request.internalId
requestService.add(request)
return ActionDisposable { [weak requestService] in
requestService?.removeRequest(byInternalId: internalId)
}
}
}
}
@@ -0,0 +1,35 @@
import Foundation
import Postbox
import SwiftSignalKit
import MtProtoKit
public func fetchHttpResource(url: String, preserveExactUrl: Bool = false) -> Signal<MediaResourceDataFetchResult, MediaResourceDataFetchError> {
var urlString: String? = url
if !preserveExactUrl {
urlString = url.addingPercentEncoding(withAllowedCharacters: CharacterSet.urlQueryAllowed)
}
if let urlString, let url = URL(string: urlString) {
let signal = MTHttpRequestOperation.data(forHttpUrl: url)!
return Signal { subscriber in
subscriber.putNext(.reset)
let disposable = signal.start(next: { next in
if let response = next as? MTHttpResponse {
let fetchResult: MediaResourceDataFetchResult = .dataPart(resourceOffset: 0, data: response.data, range: 0 ..< Int64(response.data.count), complete: true)
subscriber.putNext(fetchResult)
subscriber.putCompletion()
} else {
subscriber.putError(.generic)
}
}, error: { _ in
subscriber.putError(.generic)
}, completed: {
})
return ActionDisposable {
disposable?.dispose()
}
}
} else {
return .never()
}
}
File diff suppressed because it is too large Load Diff
File diff suppressed because it is too large Load Diff
File diff suppressed because it is too large Load Diff
@@ -0,0 +1,541 @@
import Foundation
import Postbox
import TelegramApi
import SwiftSignalKit
import MtProtoKit
import CryptoUtils
import ManagedFile
private typealias SignalKitTimer = SwiftSignalKit.Timer
private struct UploadPart {
let fileId: Int64
let index: Int
let data: Data
let bigTotalParts: Int?
let bigPart: Bool
}
private func md5(_ data: Data) -> Data {
return data.withUnsafeBytes { rawBytes -> Data in
let bytes = rawBytes.baseAddress!
return CryptoMD5(bytes, Int32(data.count))
}
}
private final class MultipartUploadState {
let aesKey: Data
var aesIv: Data
var effectiveSize: Int64 = 0
init(encryptionKey: SecretFileEncryptionKey?) {
if let encryptionKey = encryptionKey {
self.aesKey = encryptionKey.aesKey
self.aesIv = encryptionKey.aesIv
} else {
self.aesKey = Data()
self.aesIv = Data()
}
}
func transformHeader(data: Data) -> Data {
assert(self.aesKey.isEmpty)
self.effectiveSize += Int64(data.count)
return data
}
func transform(data: Data) -> Data {
if self.aesKey.count != 0 {
var encryptedData = data
var paddingSize = 0
while (encryptedData.count + paddingSize) % 16 != 0 {
paddingSize += 1
}
if paddingSize != 0 {
encryptedData.count = encryptedData.count + paddingSize
}
let encryptedDataCount = encryptedData.count
encryptedData.withUnsafeMutableBytes { rawBytes -> Void in
let bytes = rawBytes.baseAddress!.assumingMemoryBound(to: UInt8.self)
if paddingSize != 0 {
arc4random_buf(bytes.advanced(by: encryptedDataCount - paddingSize), paddingSize)
}
self.aesIv.withUnsafeMutableBytes { rawIv -> Void in
let iv = rawIv.baseAddress!.assumingMemoryBound(to: UInt8.self)
MTAesEncryptBytesInplaceAndModifyIv(bytes, encryptedDataCount, self.aesKey, iv)
}
}
self.effectiveSize += Int64(encryptedData.count)
return encryptedData
} else {
self.effectiveSize += Int64(data.count)
return data
}
}
func finalize() -> Int64 {
return self.effectiveSize
}
}
private struct MultipartIntermediateResult {
let id: Int64
let partCount: Int32
let md5Digest: String
let size: Int64
let bigTotalParts: Int?
}
private enum MultipartUploadData {
case resourceData(MediaResourceData)
case data(Data)
var size: Int64 {
switch self {
case let .resourceData(data):
return data.size
case let .data(data):
return Int64(data.count)
}
}
var complete: Bool {
switch self {
case let .resourceData(data):
return data.complete
case .data:
return true
}
}
}
private enum HeaderPartState {
case notStarted
case uploading
case ready
}
private final class MultipartUploadManager {
let parallelParts: Int
var defaultPartSize: Int64
var bigTotalParts: Int?
var bigParts: Bool
private let forceNoBigParts: Bool
private let useLargerParts: Bool
let queue = Queue()
let fileId: Int64
let dataSignal: Signal<MultipartUploadData, NoError>
var committedOffset: Int64
let uploadPart: (UploadPart) -> Signal<Void, UploadPartError>
let progress: (Float) -> Void
let completed: (MultipartIntermediateResult?) -> Void
var uploadingParts: [Int64: (Int64, Disposable)] = [:]
var uploadedParts: [Int64: Int64] = [:]
let dataDisposable = MetaDisposable()
var resourceData: MultipartUploadData?
var headerPartState: HeaderPartState
let state: MultipartUploadState
init(headerSize: Int32, data: Signal<MultipartUploadData, NoError>, encryptionKey: SecretFileEncryptionKey?, hintFileSize: Int64?, hintFileIsLarge: Bool, forceNoBigParts: Bool, useLargerParts: Bool, increaseParallelParts: Bool, uploadPart: @escaping (UploadPart) -> Signal<Void, UploadPartError>, progress: @escaping (Float) -> Void, completed: @escaping (MultipartIntermediateResult?) -> Void) {
self.dataSignal = data
var fileId: Int64 = 0
arc4random_buf(&fileId, 8)
self.fileId = fileId
if increaseParallelParts {
self.parallelParts = 30
} else {
self.parallelParts = 3
}
self.forceNoBigParts = forceNoBigParts
self.useLargerParts = useLargerParts
self.state = MultipartUploadState(encryptionKey: encryptionKey)
self.committedOffset = 0
self.uploadPart = uploadPart
self.progress = progress
self.completed = completed
if headerSize == 0 {
self.headerPartState = .ready
} else {
self.headerPartState = .notStarted
}
if let hintFileSize = hintFileSize, hintFileSize > 10 * 1024 * 1024, !forceNoBigParts {
self.defaultPartSize = 512 * 1024
self.bigTotalParts = Int((hintFileSize / self.defaultPartSize) + (hintFileSize % self.defaultPartSize == 0 ? 0 : 1))
self.bigParts = true
} else if hintFileIsLarge, !forceNoBigParts {
self.defaultPartSize = 512 * 1024
self.bigTotalParts = nil
self.bigParts = true
} else if useLargerParts {
self.bigParts = false
self.defaultPartSize = 256 * 1024
self.bigTotalParts = nil
} else {
self.bigParts = false
self.defaultPartSize = 128 * 1024
self.bigTotalParts = nil
}
}
deinit {
let uploadingParts = self.uploadingParts
let dataDisposable = self.dataDisposable
self.queue.async {
for (_, (_, disposable)) in uploadingParts {
disposable.dispose()
}
dataDisposable.dispose()
}
}
func start() {
self.queue.async {
self.dataDisposable.set((self.dataSignal
|> deliverOn(self.queue)).startStrict(next: { [weak self] data in
if let strongSelf = self {
strongSelf.resourceData = data
strongSelf.checkState()
}
}))
}
}
func cancel() {
self.queue.async {
for (_, (_, disposable)) in self.uploadingParts {
disposable.dispose()
}
}
}
func checkState() {
if let resourceData = self.resourceData, resourceData.complete && resourceData.size != 0 {
if self.committedOffset == 0 && self.uploadedParts.isEmpty && self.uploadingParts.isEmpty {
if resourceData.size > 10 * 1024 * 1024, !self.forceNoBigParts {
self.defaultPartSize = 512 * 1024
self.bigTotalParts = Int(resourceData.size / self.defaultPartSize) + (resourceData.size % self.defaultPartSize == 0 ? 0 : 1)
self.bigParts = true
} else {
self.bigParts = false
if self.useLargerParts {
self.defaultPartSize = 256 * 1024
} else {
self.defaultPartSize = 16 * 1024
}
self.bigTotalParts = nil
}
}
}
var updatedCommittedOffset = false
for offset in self.uploadedParts.keys.sorted() {
if offset == self.committedOffset {
let partSize = self.uploadedParts[offset]!
self.committedOffset += partSize
updatedCommittedOffset = true
let _ = self.uploadedParts.removeValue(forKey: offset)
}
}
if updatedCommittedOffset {
if let resourceData = self.resourceData, resourceData.complete && resourceData.size != 0 {
self.progress(Float(self.committedOffset) / Float(resourceData.size))
}
}
if let resourceData = self.resourceData, resourceData.complete, self.committedOffset >= resourceData.size {
switch self.headerPartState {
case .ready:
let effectiveSize = self.state.finalize()
let effectivePartCount = Int32(effectiveSize / self.defaultPartSize + (effectiveSize % self.defaultPartSize == 0 ? 0 : 1))
var currentBigTotalParts = self.bigTotalParts
if self.bigParts {
currentBigTotalParts = Int(resourceData.size / self.defaultPartSize) + (resourceData.size % self.defaultPartSize == 0 ? 0 : 1)
}
self.completed(MultipartIntermediateResult(id: self.fileId, partCount: effectivePartCount, md5Digest: "", size: resourceData.size, bigTotalParts: currentBigTotalParts))
case .notStarted:
let partOffset: Int64 = 0
let partSize = min(resourceData.size - partOffset, self.defaultPartSize)
let partIndex = Int(partOffset / self.defaultPartSize)
let fileData: Data?
switch resourceData {
case let .resourceData(data):
fileData = try? Data(contentsOf: URL(fileURLWithPath: data.path), options: [.alwaysMapped])
case let .data(data):
fileData = data
}
if let fileData = fileData {
let partData = self.state.transformHeader(data: fileData.subdata(in: Int(partOffset) ..< Int(partOffset + partSize)))
var currentBigTotalParts: Int? = nil
if self.bigParts {
let totalParts = (resourceData.size / self.defaultPartSize) + (resourceData.size % self.defaultPartSize == 0 ? 0 : 1)
currentBigTotalParts = Int(totalParts)
}
self.headerPartState = .uploading
let part = self.uploadPart(UploadPart(fileId: self.fileId, index: partIndex, data: partData, bigTotalParts: currentBigTotalParts, bigPart: self.bigParts))
|> deliverOn(self.queue)
self.uploadingParts[0] = (partSize, part.startStrict(error: { [weak self] _ in
self?.completed(nil)
}, completed: { [weak self] in
if let strongSelf = self {
strongSelf.uploadingParts.removeValue(forKey: 0)?.1.dispose()
strongSelf.headerPartState = .ready
strongSelf.checkState()
}
}))
}
case .uploading:
break
}
} else if let resourceData = self.resourceData, self.state.aesKey.isEmpty || resourceData.complete {
while uploadingParts.count < self.parallelParts {
switch self.headerPartState {
case .notStarted:
if self.committedOffset == 0, !resourceData.complete {
self.committedOffset += self.defaultPartSize
}
case .ready, .uploading:
break
}
var nextOffset = self.committedOffset
for (offset, (size, _)) in self.uploadingParts {
nextOffset = max(nextOffset, offset + size)
}
for (offset, partSize) in self.uploadedParts {
nextOffset = max(nextOffset, offset + partSize)
}
let partOffset = nextOffset
let partSize = min(resourceData.size - partOffset, self.defaultPartSize)
if nextOffset < resourceData.size && partSize > 0 && (resourceData.complete || partSize == self.defaultPartSize) {
let partIndex = Int(partOffset / self.defaultPartSize)
let partData: Data?
switch resourceData {
case let .resourceData(data):
if let file = ManagedFile(queue: nil, path: data.path, mode: .read) {
let _ = file.seek(position: Int64(partOffset))
let data = file.readData(count: Int(partSize))
if data.count == partSize {
partData = data
} else {
partData = nil
}
} else {
partData = nil
}
case let .data(data):
if data.count >= partOffset + partSize {
partData = data.subdata(in: Int(partOffset) ..< Int(partOffset + partSize))
} else {
partData = nil
}
}
if let partData = partData {
let partData = self.state.transform(data: partData)
var currentBigTotalParts = self.bigTotalParts
if self.bigParts && resourceData.complete && partOffset + partSize == resourceData.size {
currentBigTotalParts = Int(resourceData.size / self.defaultPartSize) + (resourceData.size % self.defaultPartSize == 0 ? 0 : 1)
}
let part = self.uploadPart(UploadPart(fileId: self.fileId, index: partIndex, data: partData, bigTotalParts: currentBigTotalParts, bigPart: self.bigParts))
|> deliverOn(self.queue)
if partIndex == 0 {
switch self.headerPartState {
case .notStarted:
self.headerPartState = .uploading
case .ready, .uploading:
break
}
}
self.uploadingParts[nextOffset] = (partSize, part.startStrict(error: { [weak self] _ in
self?.completed(nil)
}, completed: { [weak self] in
if let strongSelf = self {
strongSelf.uploadingParts.removeValue(forKey: nextOffset)?.1.dispose()
strongSelf.uploadedParts[partOffset] = partSize
if partIndex == 0 {
strongSelf.headerPartState = .ready
}
strongSelf.checkState()
}
}))
} else {
self.completed(nil)
}
} else {
break
}
}
}
}
}
enum MultipartUploadResult {
case progress(Float)
case inputFile(Api.InputFile)
case inputSecretFile(Api.InputEncryptedFile, Int64, SecretFileEncryptionKey)
}
public enum MultipartUploadSource {
case resource(MediaResourceReference)
case data(Data)
case custom(Signal<MediaResourceData, NoError>)
case tempFile(TempBoxFile)
}
enum MultipartUploadError {
case generic
}
func multipartUpload(network: Network, postbox: Postbox, source: MultipartUploadSource, encrypt: Bool, tag: MediaResourceFetchTag?, hintFileSize: Int64?, hintFileIsLarge: Bool, forceNoBigParts: Bool, useLargerParts: Bool = false, increaseParallelParts: Bool = false, useMultiplexedRequests: Bool = true, useCompression: Bool = false) -> Signal<MultipartUploadResult, MultipartUploadError> {
enum UploadInterface {
case download(Download)
case multiplexed(manager: MultiplexedRequestManager, datacenterId: Int, consumerId: Int64)
}
let uploadInterface: Signal<UploadInterface, NoError>
if useMultiplexedRequests {
uploadInterface = .single(.multiplexed(manager: network.multiplexedRequestManager, datacenterId: network.datacenterId, consumerId: Int64.random(in: Int64.min ... Int64.max)))
} else {
uploadInterface = network.upload(tag: tag)
|> map { download -> UploadInterface in
return .download(download)
}
}
return uploadInterface
|> mapToSignalPromotingError { uploadInterface -> Signal<MultipartUploadResult, MultipartUploadError> in
return Signal { subscriber in
var encryptionKey: SecretFileEncryptionKey?
if encrypt {
var aesKey = Data()
aesKey.count = 32
var aesIv = Data()
aesIv.count = 32
aesKey.withUnsafeMutableBytes { rawBytes -> Void in
let bytes = rawBytes.baseAddress!
arc4random_buf(bytes, 32)
}
aesIv.withUnsafeMutableBytes { rawBytes -> Void in
let bytes = rawBytes.baseAddress!
arc4random_buf(bytes, 32)
}
encryptionKey = SecretFileEncryptionKey(aesKey: aesKey, aesIv: aesIv)
}
let dataSignal: Signal<MultipartUploadData, NoError>
let headerSize: Int32
let fetchedResource: Signal<Void, FetchResourceError>
switch source {
case let .resource(resource):
dataSignal = postbox.mediaBox.resourceData(resource.resource, option: .incremental(waitUntilFetchStatus: true)) |> map { MultipartUploadData.resourceData($0) }
headerSize = resource.resource.headerSize
fetchedResource = fetchedMediaResource(mediaBox: postbox.mediaBox, userLocation: .other, userContentType: .other, reference: resource)
|> map { _ in }
case let .tempFile(file):
if let size = fileSize(file.path) {
dataSignal = .single(.resourceData(MediaResourceData(path: file.path, offset: 0, size: size, complete: true)))
headerSize = 0
fetchedResource = .complete()
} else {
subscriber.putError(.generic)
return EmptyDisposable
}
case let .data(data):
dataSignal = .single(.data(data))
headerSize = 0
fetchedResource = .complete()
case let .custom(signal):
headerSize = 1024
dataSignal = signal
|> map { data in
print("**data \(data) \(data.complete)")
return MultipartUploadData.resourceData(data)
}
fetchedResource = .complete()
}
let onFloodWaitError: (String) -> Void = { [weak network] error in
guard let network else {
return
}
if error.hasPrefix("FLOOD_PREMIUM_WAIT") {
network.addNetworkSpeedLimitedEvent(event: .upload)
}
}
let manager = MultipartUploadManager(headerSize: headerSize, data: dataSignal, encryptionKey: encryptionKey, hintFileSize: hintFileSize, hintFileIsLarge: hintFileIsLarge, forceNoBigParts: forceNoBigParts, useLargerParts: useLargerParts, increaseParallelParts: increaseParallelParts, uploadPart: { part in
switch uploadInterface {
case let .download(download):
return download.uploadPart(fileId: part.fileId, index: part.index, data: part.data, asBigPart: part.bigPart, bigTotalParts: part.bigTotalParts, useCompression: useCompression, onFloodWaitError: onFloodWaitError)
case let .multiplexed(multiplexed, datacenterId, consumerId):
return Download.uploadPart(multiplexedManager: multiplexed, datacenterId: datacenterId, consumerId: consumerId, tag: nil, fileId: part.fileId, index: part.index, data: part.data, asBigPart: part.bigPart, bigTotalParts: part.bigTotalParts, useCompression: useCompression, onFloodWaitError: onFloodWaitError)
}
}, progress: { progress in
subscriber.putNext(.progress(progress))
}, completed: { result in
if let result = result {
if let encryptionKey = encryptionKey {
let keyDigest = md5(encryptionKey.aesKey + encryptionKey.aesIv)
var fingerprint: Int32 = 0
keyDigest.withUnsafeBytes { rawBytes -> Void in
let bytes = rawBytes.baseAddress!.assumingMemoryBound(to: UInt8.self)
withUnsafeMutableBytes(of: &fingerprint, { ptr -> Void in
let uintPtr = ptr.baseAddress!.assumingMemoryBound(to: UInt8.self)
uintPtr[0] = bytes[0] ^ bytes[4]
uintPtr[1] = bytes[1] ^ bytes[5]
uintPtr[2] = bytes[2] ^ bytes[6]
uintPtr[3] = bytes[3] ^ bytes[7]
})
}
if let _ = result.bigTotalParts {
let inputFile = Api.InputEncryptedFile.inputEncryptedFileBigUploaded(id: result.id, parts: result.partCount, keyFingerprint: fingerprint)
subscriber.putNext(.inputSecretFile(inputFile, result.size, encryptionKey))
} else {
let inputFile = Api.InputEncryptedFile.inputEncryptedFileUploaded(id: result.id, parts: result.partCount, md5Checksum: result.md5Digest, keyFingerprint: fingerprint)
subscriber.putNext(.inputSecretFile(inputFile, result.size, encryptionKey))
}
} else {
if let _ = result.bigTotalParts {
let inputFile = Api.InputFile.inputFileBig(id: result.id, parts: result.partCount, name: "file.jpg")
subscriber.putNext(.inputFile(inputFile))
} else {
let inputFile = Api.InputFile.inputFile(id: result.id, parts: result.partCount, name: "file.jpg", md5Checksum: result.md5Digest)
subscriber.putNext(.inputFile(inputFile))
}
}
subscriber.putCompletion()
} else {
subscriber.putError(.generic)
}
})
manager.start()
let fetchedResourceDisposable = fetchedResource.start(error: { _ in
subscriber.putError(.generic)
})
return ActionDisposable {
manager.cancel()
fetchedResourceDisposable.dispose()
}
}
}
}
@@ -0,0 +1,400 @@
import Foundation
import TelegramApi
import Postbox
import SwiftSignalKit
import MtProtoKit
enum MultiplexedRequestTarget: Equatable, Hashable, CustomStringConvertible {
case main(Int)
case cdn(Int)
var description: String {
switch self {
case let .main(id):
return "dc\(id)"
case let .cdn(id):
return "cdn\(id)"
}
}
}
private struct MultiplexedRequestTargetKey: Equatable, Hashable {
let target: MultiplexedRequestTarget
let continueInBackground: Bool
}
private final class RequestData {
let id: Int32
let consumerId: Int64
let resourceId: String?
let target: MultiplexedRequestTarget
let functionDescription: FunctionDescription
let payload: Buffer
let tag: MediaResourceFetchTag?
let continueInBackground: Bool
let automaticFloodWait: Bool
let onFloodWaitError: ((String) -> Void)?
let expectedResponseSize: Int32?
let deserializeResponse: (Buffer) -> Any?
let completed: (Any, NetworkResponseInfo) -> Void
let error: (MTRpcError, Double) -> Void
init(id: Int32, consumerId: Int64, resourceId: String?, target: MultiplexedRequestTarget, functionDescription: FunctionDescription, payload: Buffer, tag: MediaResourceFetchTag?, continueInBackground: Bool, automaticFloodWait: Bool, onFloodWaitError: ((String) -> Void)?, expectedResponseSize: Int32?, deserializeResponse: @escaping (Buffer) -> Any?, completed: @escaping (Any, NetworkResponseInfo) -> Void, error: @escaping (MTRpcError, Double) -> Void) {
self.id = id
self.consumerId = consumerId
self.resourceId = resourceId
self.target = target
self.functionDescription = functionDescription
self.tag = tag
self.continueInBackground = continueInBackground
self.automaticFloodWait = automaticFloodWait
self.onFloodWaitError = onFloodWaitError
self.expectedResponseSize = expectedResponseSize
self.payload = payload
self.deserializeResponse = deserializeResponse
self.completed = completed
self.error = error
}
}
private final class ExecutingRequestData {
let requestId: Int32
let disposable: Disposable
init(requestId: Int32, disposable: Disposable) {
self.requestId = requestId
self.disposable = disposable
}
}
private final class RequestTargetContext {
let id: Int32
let worker: Download
var requests: [ExecutingRequestData]
init(id: Int32, worker: Download) {
self.id = id
self.worker = worker
self.requests = []
}
}
private struct MultiplexedRequestTargetTimerKey: Equatable, Hashable {
let key: MultiplexedRequestTargetKey
let id: Int32
}
private typealias SignalKitTimer = SwiftSignalKit.Timer
struct NetworkResponseInfo {
var timestamp: Double
var networkType: NetworkStatsContext.NetworkType
var networkDuration: Double
}
private final class MultiplexedRequestManagerContext {
final class RequestManagerPriorityContext {
var resourceCounters: [String: Bag<Int>] = [:]
}
private let queue: Queue
private let takeWorker: (MultiplexedRequestTarget, MediaResourceFetchTag?, Bool) -> Download?
private let priorityContext = RequestManagerPriorityContext()
private var queuedRequests: [RequestData] = []
private var nextId: Int32 = 0
private var targetContexts: [MultiplexedRequestTargetKey: [RequestTargetContext]] = [:]
private var emptyTargetDisposables: [MultiplexedRequestTargetTimerKey: Disposable] = [:]
init(queue: Queue, takeWorker: @escaping (MultiplexedRequestTarget, MediaResourceFetchTag?, Bool) -> Download?) {
self.queue = queue
self.takeWorker = takeWorker
}
deinit {
for targetContextList in self.targetContexts.values {
for targetContext in targetContextList {
for request in targetContext.requests {
request.disposable.dispose()
}
}
}
for disposable in emptyTargetDisposables.values {
disposable.dispose()
}
}
func pushPriority(resourceId: String, priority: Int) -> Disposable {
let queue = self.queue
let counters: Bag<Int>
if let current = self.priorityContext.resourceCounters[resourceId] {
counters = current
} else {
counters = Bag()
self.priorityContext.resourceCounters[resourceId] = counters
}
let index = counters.add(priority)
self.updateState()
return ActionDisposable { [weak self, weak counters] in
queue.async {
guard let `self` = self else {
return
}
if let current = self.priorityContext.resourceCounters[resourceId], current === counters {
current.remove(index)
if current.isEmpty {
self.priorityContext.resourceCounters.removeValue(forKey: resourceId)
}
self.updateState()
}
}
}
}
func request(to target: MultiplexedRequestTarget, consumerId: Int64, resourceId: String?, data: (FunctionDescription, Buffer, (Buffer) -> Any?), tag: MediaResourceFetchTag?, continueInBackground: Bool, automaticFloodWait: Bool, onFloodWaitError: ((String) -> Void)? = nil, expectedResponseSize: Int32?, completed: @escaping (Any, NetworkResponseInfo) -> Void, error: @escaping (MTRpcError, Double) -> Void) -> Disposable {
let targetKey = MultiplexedRequestTargetKey(target: target, continueInBackground: continueInBackground)
let requestId = self.nextId
self.nextId += 1
self.queuedRequests.append(RequestData(id: requestId, consumerId: consumerId, resourceId: resourceId, target: target, functionDescription: data.0, payload: data.1, tag: tag, continueInBackground: continueInBackground, automaticFloodWait: automaticFloodWait, onFloodWaitError: onFloodWaitError, expectedResponseSize: expectedResponseSize, deserializeResponse: { buffer in
return data.2(buffer)
}, completed: { result, info in
completed(result, info)
}, error: { e, timestamp in
error(e, timestamp)
}))
self.updateState()
let queue = self.queue
return ActionDisposable { [weak self] in
queue.async {
guard let strongSelf = self else {
return
}
for i in 0 ..< strongSelf.queuedRequests.count {
if strongSelf.queuedRequests[i].id == requestId {
strongSelf.queuedRequests.remove(at: i)
break
}
}
if strongSelf.targetContexts[targetKey] != nil {
outer: for targetContext in strongSelf.targetContexts[targetKey]! {
for i in 0 ..< targetContext.requests.count {
if targetContext.requests[i].requestId == requestId {
targetContext.requests[i].disposable.dispose()
targetContext.requests.remove(at: i)
break outer
}
}
}
}
strongSelf.updateState()
}
}
}
private func updateState() {
let maxRequestsPerWorker = 3
let maxWorkersPerTarget = 4
for request in self.queuedRequests.sorted(by: { lhs, rhs in
let lhsPriority = lhs.resourceId.flatMap { id in
if let counters = self.priorityContext.resourceCounters[id] {
return counters.copyItems().max() ?? 0
} else {
return 0
}
} ?? 0
let rhsPriority = rhs.resourceId.flatMap { id in
if let counters = self.priorityContext.resourceCounters[id] {
return counters.copyItems().max() ?? 0
} else {
return 0
}
} ?? 0
if lhsPriority != rhsPriority {
return lhsPriority > rhsPriority
}
return lhs.id < rhs.id
}) {
let targetKey = MultiplexedRequestTargetKey(target: request.target, continueInBackground: request.continueInBackground)
if self.targetContexts[targetKey] == nil {
self.targetContexts[targetKey] = []
}
var selectedContext: RequestTargetContext?
for targetContext in self.targetContexts[targetKey]! {
if targetContext.requests.count < maxRequestsPerWorker {
selectedContext = targetContext
break
}
}
if selectedContext == nil && self.targetContexts[targetKey]!.count < maxWorkersPerTarget {
if let worker = self.takeWorker(request.target, request.tag, request.continueInBackground) {
let contextId = self.nextId
self.nextId += 1
let targetContext = RequestTargetContext(id: contextId, worker: worker)
self.targetContexts[targetKey]!.append(targetContext)
selectedContext = targetContext
} else {
Logger.shared.log("MultiplexedRequestManager", "couldn't take worker")
}
}
if let selectedContext = selectedContext {
let disposable = MetaDisposable()
let requestId = request.id
selectedContext.requests.append(ExecutingRequestData(requestId: requestId, disposable: disposable))
let queue = self.queue
disposable.set(selectedContext.worker.rawRequest((request.functionDescription, request.payload, request.deserializeResponse), automaticFloodWait: request.automaticFloodWait, onFloodWaitError: request.onFloodWaitError, expectedResponseSize: request.expectedResponseSize).start(next: { [weak self, weak selectedContext] result, info in
queue.async {
guard let strongSelf = self else {
return
}
if let selectedContext = selectedContext {
for i in 0 ..< selectedContext.requests.count {
if selectedContext.requests[i].requestId == requestId {
selectedContext.requests.remove(at: i)
break
}
}
}
request.completed(result, info)
strongSelf.updateState()
}
}, error: { [weak self, weak selectedContext] error, timestamp in
queue.async {
guard let strongSelf = self else {
return
}
request.error(error, timestamp)
if let selectedContext = selectedContext {
for i in 0 ..< selectedContext.requests.count {
if selectedContext.requests[i].requestId == requestId {
selectedContext.requests.remove(at: i)
break
}
}
}
strongSelf.updateState()
}
}))
if let requestIndex = self.queuedRequests.firstIndex(where: { $0 === request }) {
self.queuedRequests.remove(at: requestIndex)
}
continue
}
}
self.checkEmptyContexts()
}
private func checkEmptyContexts() {
for (targetKey, contexts) in self.targetContexts {
for context in contexts {
let key = MultiplexedRequestTargetTimerKey(key: targetKey, id: context.id)
if context.requests.isEmpty {
if self.emptyTargetDisposables[key] == nil {
let disposable = MetaDisposable()
self.emptyTargetDisposables[key] = disposable
disposable.set((Signal<Never, NoError>.complete()
|> delay(20 * 60, queue: self.queue)
|> deliverOn(self.queue)).start(completed: { [weak self] in
guard let strongSelf = self else {
return
}
strongSelf.emptyTargetDisposables.removeValue(forKey: key)
if strongSelf.targetContexts[targetKey] != nil {
for i in 0 ..< strongSelf.targetContexts[targetKey]!.count {
if strongSelf.targetContexts[targetKey]![i].id == key.id {
strongSelf.targetContexts[targetKey]!.remove(at: i)
break
}
}
}
}))
}
} else {
if let disposable = self.emptyTargetDisposables[key] {
disposable.dispose()
self.emptyTargetDisposables.removeValue(forKey: key)
}
}
}
}
}
}
final class MultiplexedRequestManager {
private let queue = Queue()
private let context: QueueLocalObject<MultiplexedRequestManagerContext>
init(takeWorker: @escaping (MultiplexedRequestTarget, MediaResourceFetchTag?, Bool) -> Download?) {
let queue = self.queue
self.context = QueueLocalObject(queue: self.queue, generate: {
return MultiplexedRequestManagerContext(queue: queue, takeWorker: takeWorker)
})
}
func pushPriority(resourceId: String, priority: Int) -> Disposable {
let disposable = MetaDisposable()
self.context.with { context in
disposable.set(context.pushPriority(resourceId: resourceId, priority: priority))
}
return disposable
}
func request<T>(to target: MultiplexedRequestTarget, consumerId: Int64, resourceId: String?, data: (FunctionDescription, Buffer, DeserializeFunctionResponse<T>), tag: MediaResourceFetchTag?, continueInBackground: Bool, automaticFloodWait: Bool = true, onFloodWaitError: ((String) -> Void)? = nil, expectedResponseSize: Int32?) -> Signal<T, MTRpcError> {
return Signal { subscriber in
let disposable = MetaDisposable()
self.context.with { context in
disposable.set(context.request(to: target, consumerId: consumerId, resourceId: resourceId, data: (data.0, data.1, { buffer in
return data.2.parse(buffer)
}), tag: tag, continueInBackground: continueInBackground, automaticFloodWait: automaticFloodWait, onFloodWaitError: onFloodWaitError, expectedResponseSize: expectedResponseSize, completed: { result, _ in
if let result = result as? T {
subscriber.putNext(result)
subscriber.putCompletion()
} else {
subscriber.putError(MTRpcError(errorCode: 500, errorDescription: "TL_VERIFICATION_ERROR"))
}
}, error: { error, _ in
subscriber.putError(error)
}))
}
return disposable
}
}
func requestWithAdditionalInfo<T>(to target: MultiplexedRequestTarget, consumerId: Int64, resourceId: String?, data: (FunctionDescription, Buffer, DeserializeFunctionResponse<T>), tag: MediaResourceFetchTag?, continueInBackground: Bool, automaticFloodWait: Bool = true, onFloodWaitError: ((String) -> Void)? = nil, expectedResponseSize: Int32?) -> Signal<(T, NetworkResponseInfo), (MTRpcError, Double)> {
return Signal { subscriber in
let disposable = MetaDisposable()
self.context.with { context in
disposable.set(context.request(to: target, consumerId: consumerId, resourceId: resourceId, data: (data.0, data.1, { buffer in
return data.2.parse(buffer)
}), tag: tag, continueInBackground: continueInBackground, automaticFloodWait: automaticFloodWait, onFloodWaitError: onFloodWaitError, expectedResponseSize: expectedResponseSize, completed: { result, info in
if let result = result as? T {
subscriber.putNext((result, info))
subscriber.putCompletion()
} else {
subscriber.putError((MTRpcError(errorCode: 500, errorDescription: "TL_VERIFICATION_ERROR"), info.timestamp))
}
}, error: { error, timestamp in
subscriber.putError((error, timestamp))
}))
}
return disposable
}
}
}
File diff suppressed because it is too large Load Diff
@@ -0,0 +1,356 @@
import Foundation
import Network
import MtProtoKit
import SwiftSignalKit
@available(iOS 12.0, macOS 14.0, *)
final class NetworkFrameworkTcpConnectionInterface: NSObject, MTTcpConnectionInterface {
private struct ReadRequest {
let length: Int
let tag: Int
}
private final class ExecutingReadRequest {
let request: ReadRequest
var data: Data
var readyLength: Int = 0
init(request: ReadRequest) {
self.request = request
self.data = Data(count: request.length)
}
}
private final class Impl {
private let queue: Queue
private weak var delegate: MTTcpConnectionInterfaceDelegate?
private let delegateQueue: DispatchQueue
private let requestChunkLength: Int
private var connection: NWConnection?
private var reportedDisconnection: Bool = false
private var currentInterfaceIsWifi: Bool = true
private var connectTimeoutTimer: SwiftSignalKit.Timer?
private var usageCalculationInfo: MTNetworkUsageCalculationInfo?
private var networkUsageManager: MTNetworkUsageManager?
private var readRequests: [ReadRequest] = []
private var currentReadRequest: ExecutingReadRequest?
init(
queue: Queue,
delegate: MTTcpConnectionInterfaceDelegate,
delegateQueue: DispatchQueue
) {
self.queue = queue
self.delegate = delegate
self.delegateQueue = delegateQueue
self.requestChunkLength = 256 * 1024
}
deinit {
}
func setUsageCalculationInfo(_ usageCalculationInfo: MTNetworkUsageCalculationInfo?) {
if self.usageCalculationInfo !== usageCalculationInfo {
self.usageCalculationInfo = usageCalculationInfo
if let usageCalculationInfo = usageCalculationInfo {
self.networkUsageManager = MTNetworkUsageManager(info: usageCalculationInfo)
} else {
self.networkUsageManager = nil
}
}
}
func connect(host: String, port: UInt16, timeout: Double) {
if self.connection != nil {
assertionFailure("A connection already exists")
return
}
let host = NWEndpoint.Host(host)
let port = NWEndpoint.Port(rawValue: port)!
let tcpOptions = NWProtocolTCP.Options()
tcpOptions.noDelay = true
tcpOptions.enableKeepalive = true
tcpOptions.keepaliveIdle = 5
tcpOptions.keepaliveCount = 2
tcpOptions.keepaliveInterval = 5
tcpOptions.enableFastOpen = true
let parameters = NWParameters(tls: nil, tcp: tcpOptions)
let connection = NWConnection(host: host, port: port, using: parameters)
self.connection = connection
let queue = self.queue
connection.stateUpdateHandler = { [weak self] state in
queue.async {
self?.stateUpdated(state: state)
}
}
connection.pathUpdateHandler = { [weak self] path in
queue.async {
guard let self = self else {
return
}
if path.usesInterfaceType(.cellular) {
self.currentInterfaceIsWifi = false
} else {
self.currentInterfaceIsWifi = true
}
}
}
connection.viabilityUpdateHandler = { [weak self] isViable in
queue.async {
guard let self = self else {
return
}
if !isViable {
self.cancelWithError(error: nil)
}
}
}
/*connection.betterPathUpdateHandler = { [weak self] hasBetterPath in
queue.async {
guard let self = self else {
return
}
if hasBetterPath {
self.cancelWithError(error: nil)
}
}
}*/
self.connectTimeoutTimer = SwiftSignalKit.Timer(timeout: timeout, repeat: false, completion: { [weak self] in
guard let self = self else {
return
}
self.connectTimeoutTimer = nil
self.cancelWithError(error: nil)
}, queue: self.queue)
self.connectTimeoutTimer?.start()
connection.start(queue: self.queue.queue)
self.processReadRequests()
}
private func stateUpdated(state: NWConnection.State) {
switch state {
case .ready:
if let path = self.connection?.currentPath {
if path.usesInterfaceType(.cellular) {
self.currentInterfaceIsWifi = false
} else {
self.currentInterfaceIsWifi = true
}
}
if let connectTimeoutTimer = connectTimeoutTimer {
self.connectTimeoutTimer = nil
connectTimeoutTimer.invalidate()
}
let delegate = self.delegate
self.delegateQueue.async { [weak delegate] in
if let delegate = delegate {
delegate.connectionInterfaceDidConnect()
}
}
case let .failed(error):
self.cancelWithError(error: error)
default:
break
}
}
func write(data: Data) {
guard let connection = self.connection else {
Logger.shared.log("NetworkFrameworkTcpConnectionInterface", "write called while connection == nil")
return
}
connection.send(content: data, completion: .contentProcessed({ _ in
}))
self.networkUsageManager?.addOutgoingBytes(UInt(data.count), interface: self.currentInterfaceIsWifi ? MTNetworkUsageManagerInterfaceOther : MTNetworkUsageManagerInterfaceWWAN)
}
func read(length: Int, timeout: Double, tag: Int) {
self.readRequests.append(NetworkFrameworkTcpConnectionInterface.ReadRequest(length: length, tag: tag))
self.processReadRequests()
}
private func processReadRequests() {
if self.currentReadRequest != nil {
return
}
if self.readRequests.isEmpty {
return
}
let readRequest = self.readRequests.removeFirst()
let currentReadRequest = ExecutingReadRequest(request: readRequest)
self.currentReadRequest = currentReadRequest
self.processCurrentRead()
}
private func processCurrentRead() {
guard let currentReadRequest = self.currentReadRequest else {
return
}
guard let connection = self.connection else {
print("Connection not ready")
return
}
let requestChunkLength = min(self.requestChunkLength, currentReadRequest.request.length - currentReadRequest.readyLength)
if requestChunkLength == 0 {
self.currentReadRequest = nil
let delegate = self.delegate
let currentInterfaceIsWifi = self.currentInterfaceIsWifi
self.delegateQueue.async { [weak delegate] in
if let delegate = delegate {
delegate.connectionInterfaceDidRead(currentReadRequest.data, withTag: currentReadRequest.request.tag, networkType: currentInterfaceIsWifi ? 0 : 1)
}
}
self.processReadRequests()
} else {
connection.receive(minimumIncompleteLength: requestChunkLength, maximumLength: requestChunkLength, completion: { [weak self] data, context, isComplete, error in
guard let self = self, let currentReadRequest = self.currentReadRequest else {
return
}
if let data = data {
self.networkUsageManager?.addIncomingBytes(UInt(data.count), interface: self.currentInterfaceIsWifi ? MTNetworkUsageManagerInterfaceOther : MTNetworkUsageManagerInterfaceWWAN)
if data.count != 0 && data.count <= currentReadRequest.request.length - currentReadRequest.readyLength {
currentReadRequest.data.withUnsafeMutableBytes { currentBuffer in
guard let currentBytes = currentBuffer.baseAddress?.assumingMemoryBound(to: UInt8.self) else {
return
}
data.copyBytes(to: currentBytes.advanced(by: currentReadRequest.readyLength), count: data.count)
}
currentReadRequest.readyLength += data.count
let tag = currentReadRequest.request.tag
let readCount = data.count
let delegate = self.delegate
self.delegateQueue.async { [weak delegate] in
if let delegate = delegate {
delegate.connectionInterfaceDidReadPartialData(ofLength: UInt(readCount), tag: tag)
}
}
self.processCurrentRead()
} else {
self.cancelWithError(error: error)
}
if isComplete && data.count == 0 {
self.cancelWithError(error: nil)
}
} else {
self.cancelWithError(error: error)
}
})
}
}
private func cancelWithError(error: Error?) {
if let connectTimeoutTimer = self.connectTimeoutTimer {
self.connectTimeoutTimer = nil
connectTimeoutTimer.invalidate()
}
if !self.reportedDisconnection {
self.reportedDisconnection = true
let delegate = self.delegate
self.delegateQueue.async { [weak delegate] in
if let delegate = delegate {
delegate.connectionInterfaceDidDisconnectWithError(error)
}
}
}
if let connection = self.connection {
self.connection = nil
connection.cancel()
}
}
func disconnect() {
self.cancelWithError(error: nil)
}
func resetDelegate() {
self.delegate = nil
}
}
private static let sharedQueue = Queue(name: "NetworkFrameworkTcpConnectionInteface")
private let queue: Queue
private let impl: QueueLocalObject<Impl>
init(delegate: MTTcpConnectionInterfaceDelegate, delegateQueue: DispatchQueue) {
let queue = NetworkFrameworkTcpConnectionInterface.sharedQueue
self.queue = queue
self.impl = QueueLocalObject(queue: queue, generate: {
return Impl(queue: queue, delegate: delegate, delegateQueue: delegateQueue)
})
}
func setGetLogPrefix(_ getLogPrefix: (() -> String)?) {
}
func setUsageCalculationInfo(_ usageCalculationInfo: MTNetworkUsageCalculationInfo?) {
self.impl.with { impl in
impl.setUsageCalculationInfo(usageCalculationInfo)
}
}
func connect(toHost inHost: String, onPort port: UInt16, viaInterface inInterface: String?, withTimeout timeout: TimeInterval, error errPtr: NSErrorPointer) -> Bool {
self.impl.with { impl in
impl.connect(host: inHost, port: port, timeout: timeout)
}
return true
}
func write(_ data: Data) {
self.impl.with { impl in
impl.write(data: data)
}
}
func readData(toLength length: UInt, withTimeout timeout: TimeInterval, tag: Int) {
self.impl.with { impl in
impl.read(length: Int(length), timeout: timeout, tag: tag)
}
}
func disconnect() {
self.impl.with { impl in
impl.disconnect()
}
}
func resetDelegate() {
self.impl.with { impl in
impl.resetDelegate()
}
}
}
@@ -0,0 +1,124 @@
import Foundation
import SwiftSignalKit
import Postbox
final class NetworkStatsContext {
enum NetworkType: Int32 {
case wifi = 0
case cellular = 1
}
struct DownloadEvent {
let networkType: NetworkType
let datacenterId: Int32
let size: Double
let networkDuration: Double
let issueDuration: Double
init(
networkType: NetworkType,
datacenterId: Int32,
size: Double,
networkDuration: Double,
issueDuration: Double
) {
self.networkType = networkType
self.datacenterId = datacenterId
self.size = size
self.networkDuration = networkDuration
self.issueDuration = issueDuration
}
}
private struct TargetKey: Hashable {
let networkType: NetworkType
let datacenterId: Int32
init(networkType: NetworkType, datacenterId: Int32) {
self.networkType = networkType
self.datacenterId = datacenterId
}
}
private final class AverageStats {
var networkBps: Double = 0.0
var issueDuration: Double = 0.0
var networkDelay: Double = 0.0
var count: Int = 0
var size: Int64 = 0
}
private final class Impl {
let queue: Queue
weak var postbox: Postbox?
var averageTargetStats: [TargetKey: AverageStats] = [:]
init(queue: Queue, postbox: Postbox?) {
self.queue = queue
self.postbox = postbox
}
func add(downloadEvents: [DownloadEvent]) {
for event in downloadEvents {
if event.networkDuration == 0.0 {
continue
}
let targetKey = TargetKey(networkType: event.networkType, datacenterId: event.datacenterId)
let averageStats: AverageStats
if let current = self.averageTargetStats[targetKey] {
averageStats = current
} else {
averageStats = AverageStats()
self.averageTargetStats[targetKey] = averageStats
}
averageStats.count += 1
averageStats.issueDuration += event.issueDuration
averageStats.networkDelay += event.issueDuration - event.networkDuration
averageStats.networkBps += event.size / event.networkDuration
averageStats.size += Int64(event.size)
}
self.maybeFlushStats()
}
private func maybeFlushStats() {
var removeKeys: [TargetKey] = []
for (targetKey, averageStats) in self.averageTargetStats {
if averageStats.count >= 1000 || averageStats.size >= 4 * 1024 * 1024 {
if let postbox = self.postbox {
addAppLogEvent(postbox: postbox, type: "download", data: .dictionary([
"n": .number(Double(targetKey.networkType.rawValue)),
"d": .number(Double(targetKey.datacenterId)),
"b": .number(averageStats.networkBps / Double(averageStats.count)),
"nd": .number(averageStats.networkDelay / Double(averageStats.count))
]))
}
removeKeys.append(targetKey)
}
}
for key in removeKeys {
self.averageTargetStats.removeValue(forKey: key)
}
}
}
private static let sharedQueue = Queue(name: "NetworkStatsContext")
private let queue: Queue
private let impl: QueueLocalObject<Impl>
init(postbox: Postbox) {
let queue = NetworkStatsContext.sharedQueue
self.queue = queue
self.impl = QueueLocalObject(queue: queue, generate: { [weak postbox] in
return Impl(queue: queue, postbox: postbox)
})
}
func add(downloadEvents: [DownloadEvent]) {
self.impl.with { impl in
impl.add(downloadEvents: downloadEvents)
}
}
}
@@ -0,0 +1,163 @@
import Foundation
import SwiftSignalKit
import MtProtoKit
import Reachability
#if os(iOS)
import CoreTelephony
#endif
#if os(iOS)
public enum CellularNetworkType {
case unknown
case gprs
case edge
case thirdG
case lte
}
extension CellularNetworkType {
init(accessTechnology: String) {
switch accessTechnology {
case CTRadioAccessTechnologyGPRS:
self = .gprs
case CTRadioAccessTechnologyEdge, CTRadioAccessTechnologyCDMA1x:
self = .edge
case CTRadioAccessTechnologyLTE:
self = .lte
case CTRadioAccessTechnologyWCDMA, CTRadioAccessTechnologyHSDPA, CTRadioAccessTechnologyHSUPA, CTRadioAccessTechnologyCDMAEVDORev0, CTRadioAccessTechnologyCDMAEVDORevA, CTRadioAccessTechnologyCDMAEVDORevB, CTRadioAccessTechnologyeHRPD:
self = .thirdG
default:
self = .unknown
}
}
}
#endif
public enum NetworkType: Equatable {
case none
case wifi
#if os(iOS)
case cellular(CellularNetworkType)
#endif
}
extension NetworkType {
#if os(iOS)
init(internalType: Reachability.NetworkType, cellularType: CellularNetworkType) {
switch internalType {
case .none:
self = .none
case .wifi:
self = .wifi
case .cellular:
self = .cellular(cellularType)
}
}
#else
init(internalType: Reachability.NetworkType) {
switch internalType {
case .none:
self = .none
case .wifi, .cellular:
self = .wifi
}
}
#endif
}
private final class NetworkTypeManagerImpl {
let queue: Queue
let updated: (NetworkType) -> Void
var networkTypeDisposable: Disposable?
var currentNetworkType: Reachability.NetworkType?
var networkType: NetworkType?
#if os(iOS)
var currentCellularType: CellularNetworkType
var cellularTypeObserver: NSObjectProtocol?
#endif
init(queue: Queue, updated: @escaping (NetworkType) -> Void) {
self.queue = queue
self.updated = updated
#if os(iOS)
let accessTechnology = CTTelephonyNetworkInfo().serviceCurrentRadioAccessTechnology?.values.first ?? ""
self.currentCellularType = CellularNetworkType(accessTechnology: accessTechnology)
self.cellularTypeObserver = NotificationCenter.default.addObserver(forName: NSNotification.Name.CTServiceRadioAccessTechnologyDidChange, object: nil, queue: nil, using: { [weak self] notification in
queue.async {
guard let strongSelf = self else {
return
}
let accessTechnology = CTTelephonyNetworkInfo().serviceCurrentRadioAccessTechnology?.values.first ?? ""
let cellularType = CellularNetworkType(accessTechnology: accessTechnology)
if strongSelf.currentCellularType != cellularType {
strongSelf.currentCellularType = cellularType
if let currentNetworkType = strongSelf.currentNetworkType {
let networkType = NetworkType(internalType: currentNetworkType, cellularType: cellularType)
if strongSelf.networkType != networkType {
strongSelf.networkType = networkType
strongSelf.updated(networkType)
}
}
}
}
})
#endif
let networkTypeDisposable = MetaDisposable()
self.networkTypeDisposable = networkTypeDisposable
networkTypeDisposable.set((Reachability.networkType
|> deliverOn(queue)).start(next: { [weak self] networkStatus in
guard let strongSelf = self else {
return
}
if strongSelf.currentNetworkType != networkStatus {
strongSelf.currentNetworkType = networkStatus
let networkType: NetworkType
#if os(iOS)
networkType = NetworkType(internalType: networkStatus, cellularType: strongSelf.currentCellularType)
#else
networkType = NetworkType(internalType: networkStatus)
#endif
if strongSelf.networkType != networkType {
strongSelf.networkType = networkType
updated(networkType)
}
}
}))
}
func stop() {
self.networkTypeDisposable?.dispose()
#if os(iOS)
if let observer = self.cellularTypeObserver {
NotificationCenter.default.removeObserver(observer, name: NSNotification.Name.CTServiceRadioAccessTechnologyDidChange, object: nil)
}
#endif
}
}
func currentNetworkType() -> Signal<NetworkType, NoError> {
return Signal { subscriber in
let queue = Queue()
let disposable = MetaDisposable()
queue.async {
let impl = QueueLocalObject(queue: queue, generate: {
return NetworkTypeManagerImpl(queue: queue, updated: { value in
subscriber.putNext(value)
})
})
disposable.set(ActionDisposable {
impl.with({ impl in
impl.stop()
})
})
}
return disposable
}
}
@@ -0,0 +1,129 @@
import Foundation
import SwiftSignalKit
import MtProtoKit
public enum ProxyServerStatus: Equatable {
case checking
case notAvailable
case available(Double)
}
private final class ProxyServerItemContext {
private var disposable: Disposable?
var value: ProxyServerStatus = .checking
init(queue: Queue, context: MTContext, datacenterId: Int, server: ProxyServerSettings, updated: @escaping (ProxyServerStatus) -> Void) {
self.disposable = (Signal<ProxyServerStatus, NoError> { subscriber in
let disposable = MTProxyConnectivity.pingProxy(with: context, datacenterId: datacenterId, settings: server.mtProxySettings).start(next: { next in
if let next = next as? MTProxyConnectivityStatus {
if !next.reachable {
subscriber.putNext(.notAvailable)
} else {
subscriber.putNext(.available(next.roundTripTime))
}
}
})
return ActionDisposable {
disposable?.dispose()
}
} |> runOn(queue)).start(next: { status in
updated(status)
})
}
deinit {
self.disposable?.dispose()
}
}
final class ProxyServersStatusesImpl {
private let queue: Queue
private var contexts: [ProxyServerSettings: ProxyServerItemContext] = [:]
private var serversDisposable: Disposable?
private var currentValues: [ProxyServerSettings: ProxyServerStatus] = [:] {
didSet {
self.values.set(.single(self.currentValues))
}
}
let values = Promise<[ProxyServerSettings: ProxyServerStatus]>([:])
init(queue: Queue, network: Network, servers: Signal<[ProxyServerSettings], NoError>) {
self.queue = queue
self.serversDisposable = (servers
|> deliverOn(self.queue)).start(next: { [weak self] servers in
if let strongSelf = self {
let validKeys = Set<ProxyServerSettings>(servers)
for key in validKeys {
if strongSelf.contexts[key] == nil {
let context = ProxyServerItemContext(queue: strongSelf.queue, context: network.context, datacenterId: network.datacenterId, server: key, updated: { value in
queue.async {
if let strongSelf = self {
strongSelf.contexts[key]?.value = value
strongSelf.updateValues()
}
}
})
strongSelf.contexts[key] = context
}
}
var removeKeys: [ProxyServerSettings] = []
for (key, _) in strongSelf.contexts {
if !validKeys.contains(key) {
removeKeys.append(key)
}
}
for key in removeKeys {
let _ = strongSelf.contexts.removeValue(forKey: key)
}
if !removeKeys.isEmpty {
strongSelf.updateValues()
}
}
})
}
deinit {
self.serversDisposable?.dispose()
}
private func updateValues() {
assert(self.queue.isCurrent())
var values: [ProxyServerSettings: ProxyServerStatus] = [:]
for (key, context) in self.contexts {
values[key] = context.value
}
self.currentValues = values
}
}
public final class ProxyServersStatuses {
private let impl: QueueLocalObject<ProxyServersStatusesImpl>
public init(network: Network, servers: Signal<[ProxyServerSettings], NoError>) {
let queue = Queue()
self.impl = QueueLocalObject(queue: queue, generate: {
return ProxyServersStatusesImpl(queue: queue, network: network, servers: servers)
})
}
public func statuses() -> Signal<[ProxyServerSettings: ProxyServerStatus], NoError> {
return Signal { subscriber in
let disposable = MetaDisposable()
self.impl.with { impl in
disposable.set(impl.values.get().start(next: { value in
subscriber.putNext(value)
}))
}
return ActionDisposable {
self.impl.with({ _ in })
disposable.dispose()
}
}
}
}