You are viewing the preview version of this book
Click here for the full version.

Values over time

Code example

Implementation here.

In this example, the backend sends a stream of values taken from a temperature sensor. In this case, values never change, only new measurements can appear.

In the schema, there is a paginated query to fetch values for existing measurements and a subscription for new ones:

type Temperature {
  value: Float!
  timestamp: AWSDateTime!
}

type TemperatureConnection {
  items: [Temperature!]!
  nextToken: String
}

type Subscription {
  temperature: Temperature
}

type Query {
  temperatures(nextToken: String): TemperatureConnection!
}

As the query can return more items than the client might want to show, it needs to halt pagination at some point. In this example, that cut-off point will be 5 minutes.

Implementation

The usage to the implementation will be similar to the previous example: an observable that pushes items as they are available:

tempUpdateStream({
  opened: () => setSubscriptionStatus(true),
  closed: () => setSubscriptionStatus(false),
}).subscribe((list) => updateTemperature(list));

We'll handle two scenarios: one that pushes only new values, for example to show the "current temperature" value, and one that pushes all measurements after the cut-off point. Both will build upon the above function.

The easiest is the latter: push all updates. Here, we'll only need a mergeMap that emits the values one-by-one from the list:

tempUpdateStream({...}).pipe(
  mergeMap((items) => items),
).subscribe(...)
The graph showing all recent measurements

To get only the latest values, we'll use a scan combined with a distinctUntilChanged:

tempUpdateStream({...}).pipe(
  scan((acc, items) =>
    [...(acc ? [acc] : []), ...items].sort(sorter).reverse()[0]
  , undefined),
  distinctUntilChanged(),
).subscribe(...)

The scan keeps track of the latest value and emits it for each value. Then the distinctUntilChanged removes the duplicates.

The graph updating only with newest values where missed updates are skipped
tempUpdateStream function

There is more, but you've reached the end of this preview
Read this and all other chapters in full and get lifetime access to:
  • all future updates
  • full web-based access
  • PDF and Epub versions