Earlier in this series, you learned that reactive programming can be defined as working with asynchronous streams of data. A network request is an asynchronous operation and that makes reactive programming an excellent match for networking.

Take Cloudy as an example. Cloudy sends a request to the weather API and is notified when that request completes, successfully or unsuccessfully. The URLSession API has built-in support for the Combine framework. In this episode, we use Combine to interact with the weather API. We cover a lot of ground in the next few episodes. Let's get started.

Creating a Data Task Publisher

Open RootViewModel.swift and navigate to the fetchWeatherData(for:) method. We start with a clean slate. We no longer ask the shared URL session for a data task.

// MARK: - Helper Methods

private func fetchWeatherData(for location: CLLocation) {
    // Cancel In Progress Data Task
    weatherDataTask?.cancel()

    // Helpers
    let latitude = location.coordinate.latitude
    let longitude = location.coordinate.longitude

    // Create URL
    let url = WeatherServiceRequest(latitude: latitude, longitude: longitude).url
}

Instead, we ask the shared URL session for a data task publisher by invoking the dataTaskPublisher(for:) method, passing in the URL object of the request. The URLSession class also defines a method with the same name that accepts a URLRequest object.

// MARK: - Helper Methods

private func fetchWeatherData(for location: CLLocation) {
    // Cancel In Progress Data Task
    weatherDataTask?.cancel()

    // Helpers
    let latitude = location.coordinate.latitude
    let longitude = location.coordinate.longitude

    // Create URL
    let url = WeatherServiceRequest(latitude: latitude, longitude: longitude).url

    // Create Data Task Publisher
    URLSession.shared.dataTaskPublisher(for: url)
}

The Output type of the data task publisher is a tuple with two values, a Data object and a URLResponse object. The Failure type of the data task publisher is URLError. Let's keep it simple for now. Because we are only interested in the Data object, we apply the map operator to transform the values emitted by the data task publisher.

// MARK: - Helper Methods

private func fetchWeatherData(for location: CLLocation) {
    // Cancel In Progress Data Task
    weatherDataTask?.cancel()

    // Helpers
    let latitude = location.coordinate.latitude
    let longitude = location.coordinate.longitude

    // Create URL
    let url = WeatherServiceRequest(latitude: latitude, longitude: longitude).url

    // Create Data Task Publisher
    URLSession.shared.dataTaskPublisher(for: url)
        .map { $0.data }
}

To decode the response, we apply the decode operator. This convenience operator decodes the Data object. The decode(type:decoder:) method accepts the type of the decoded data as its first argument and a decoder, an object that conforms to the TopLevelDecoder protocol, as its second argument. Because we expect a JSON response, we pass a JSONDecoder instance as the second argument.

// MARK: - Helper Methods

private func fetchWeatherData(for location: CLLocation) {
    // Cancel In Progress Data Task
    weatherDataTask?.cancel()

    // Helpers
    let latitude = location.coordinate.latitude
    let longitude = location.coordinate.longitude

    // Create URL
    let url = WeatherServiceRequest(latitude: latitude, longitude: longitude).url

    // Create Data Task Publisher
    URLSession.shared.dataTaskPublisher(for: url)
        .map { $0.data }
        .decode(type: WeatherData.self, decoder: JSONDecoder())
}

We invoke the sink(receiveCompletion:receiveValue:) method to subscribe to the resulting publisher and store the subscription in the set of subscriptions. Because the Failure type of the resulting publisher is Error, the sink(receiveCompletion:receiveValue:) method accepts two closures. The first closure is executed when the publisher completes, with or without an error. The second closure is executed every time the publisher emits a value. This is the first time we work with a publisher whose Failure type isn't equal to Never. This means we need to handle the error if the publisher completes with an error. I talk more about that later.

// MARK: - Helper Methods

private func fetchWeatherData(for location: CLLocation) {
    // Cancel In Progress Data Task
    weatherDataTask?.cancel()

    // Helpers
    let latitude = location.coordinate.latitude
    let longitude = location.coordinate.longitude

    // Create URL
    let url = WeatherServiceRequest(latitude: latitude, longitude: longitude).url

    // Create Data Task Publisher
    URLSession.shared.dataTaskPublisher(for: url)
        .map { $0.data }
        .decode(type: WeatherData.self, decoder: JSONDecoder())
        .sink { completion in
        
        } receiveValue: { weatherData in
            
        }
        .store(in: &subscriptions)
}

In the completion closure, the view model switches on the completion object, an object of type Subscribers.Completion that defines two cases, finished if the publisher completed without an error and failure if the publisher completed due to an error. The associated value of the failure case is the error that caused the publisher to terminate. We print the error to the console in the failure case.

// MARK: - Helper Methods

private func fetchWeatherData(for location: CLLocation) {
    // Cancel In Progress Data Task
    weatherDataTask?.cancel()

    // Helpers
    let latitude = location.coordinate.latitude
    let longitude = location.coordinate.longitude

    // Create URL
    let url = WeatherServiceRequest(latitude: latitude, longitude: longitude).url

    // Create Data Task Publisher
    URLSession.shared.dataTaskPublisher(for: url)
        .map { $0.data }
        .decode(type: WeatherData.self, decoder: JSONDecoder())
        .sink { completion in
            switch completion {
            case .finished:
                break
            case .failure(let error):
                print("Unable to Fetch Weather Data \(error)")
            }
        } receiveValue: { weatherData in
            
        }
        .store(in: &subscriptions)
}

In the value closure, we define a capture list to weakly reference self and use the WeatherData object to create a WeatherDataState object. We pass the WeatherDataState object to the send(_:) method of weatherDataStateSubject.

// MARK: - Helper Methods

private func fetchWeatherData(for location: CLLocation) {
    // Cancel In Progress Data Task
    weatherDataTask?.cancel()

    // Helpers
    let latitude = location.coordinate.latitude
    let longitude = location.coordinate.longitude

    // Create URL
    let url = WeatherServiceRequest(latitude: latitude, longitude: longitude).url

    // Create Data Task Publisher
    URLSession.shared.dataTaskPublisher(for: url)
        .map { $0.data }
        .decode(type: WeatherData.self, decoder: JSONDecoder())
        .sink { completion in
            switch completion {
            case .finished:
                break
            case .failure(let error):
                print("Unable to Fetch Weather Data \(error)")
            }
        } receiveValue: { [weak self] weatherData in
            self?.weatherDataStateSubject.send(.data(weatherData))
        }
        .store(in: &subscriptions)
}

Before we build and run the application, we need to make a small change. The JSONDecoder instance the view model passes to the decode(type:decoder) method isn't properly configured for the weather API. This becomes clear if we take a look at the implementation of the didFetchWeatherData(data:response:error:) method. The solution is easy. Before we create the data task publisher, we create a JSONDecoder instance and set its dateDecodingStrategy property to secondsSince1970. We pass the JSONDecoder instance to the decode(type:decoder) method.

// MARK: - Helper Methods

private func fetchWeatherData(for location: CLLocation) {
    // Cancel In Progress Data Task
    weatherDataTask?.cancel()

    // Helpers
    let latitude = location.coordinate.latitude
    let longitude = location.coordinate.longitude

    // Create URL
    let url = WeatherServiceRequest(latitude: latitude, longitude: longitude).url
    
    // Create JSON Decoder
    let decoder = JSONDecoder()

    // Configure JSON Decoder
    decoder.dateDecodingStrategy = .secondsSince1970

    // Create Data Task Publisher
    URLSession.shared.dataTaskPublisher(for: url)
        .map { $0.data }
        .decode(type: WeatherData.self, decoder: decoder)
        .sink { completion in
            switch completion {
            case .finished:
                break
            case .failure(let error):
                print("Unable to Fetch Weather Data \(error)")
            }
        } receiveValue: { [weak self] weatherData in
            self?.weatherDataStateSubject.send(.data(weatherData))
        }
        .store(in: &subscriptions)
}

Build and run the application to see the result. Cloudy doesn't display weather data and Xcode throws a runtime error. What is happening?

Xcode throws a runtime error.

Working with Schedulers

The completion and value closures of the sink(receiveCompletion:receiveValue:) method are executed on a background thread. This isn't surprising because a URL session performs its work on a background thread, making sure the main thread isn't blocked and the application stays responsive.

Because weatherDataStateSubject drives the user interface of the application, we are trying to update the user interface from a background thread. Remember that the user interface should always be updated from the main thread. That is why Xcode throws a runtime error and Cloudy doesn't display the weather data it fetched from the weather API.

We could use Grand Central Dispatch to resolve this problem, but Combine provides a more elegant solution.

We need to make sure the completion and value closures of the sink(receiveCompletion:receiveValue:) method are executed on the main thread. Combine makes this trivial with the receive operator. The receive(on:) method accepts an object that conforms to the Scheduler protocol. We cover schedulers in more detail later in this series. What you need to know for now is that a scheduler defines on which thread the completion and value closures are executed. Let me show you how that works.

We apply the receive operator to the resulting publisher of the decode operator. The receive(on:) method accepts a scheduler, an object that conforms to the Scheduler protocol. The DispatchQueue class conforms to the Scheduler protocol, which means we can pass a reference to the main dispatch queue to the receive(on:) method.

// MARK: - Helper Methods

private func fetchWeatherData(for location: CLLocation) {
    // Cancel In Progress Data Task
    weatherDataTask?.cancel()

    // Helpers
    let latitude = location.coordinate.latitude
    let longitude = location.coordinate.longitude

    // Create URL
    let url = WeatherServiceRequest(latitude: latitude, longitude: longitude).url
    
    // Create JSON Decoder
    let decoder = JSONDecoder()

    // Configure JSON Decoder
    decoder.dateDecodingStrategy = .secondsSince1970

    // Create Data Task Publisher
    URLSession.shared.dataTaskPublisher(for: url)
        .map { $0.data }
        .decode(type: WeatherData.self, decoder: decoder)
        .receive(on: DispatchQueue.main)
        .sink { completion in
            switch completion {
            case .finished:
                break
            case .failure(let error):
                print("Unable to Fetch Weather Data \(error)")
            }
        } receiveValue: { [weak self] weatherData in
            self?.weatherDataStateSubject.send(.data(weatherData))
        }
        .store(in: &subscriptions)
}

Let me summarize what changed. Combine uses the scheduler that is passed to the receive(on:) method to execute the completion and value closures of the sink(receiveCompletion:receiveValue:) method. Because the scheduler we pass to the receive(on:) method is a reference to the main dispatch queue, the completion and value closures are executed on the main thread.

Build and run the application to see the result. Xcode no longer throws a runtime error and Cloudy displays the weather data it fetched from the weather API.

What's Next?

Fetching and decoding the weather data is concise and declarative thanks to Combine. Schedulers make it straightforward to define when and where a block of work is executed. We cover schedulers in more detail later in this series.

The next episode focuses on error handling, an important aspect of reactive programming. The current implementation needs some work. Printing the error to the console doesn't result in a great user experience. We fix that in the next episode.