Implementation here.
In this example, the backend sends a stream of values taken from a temperature sensor. Existing values never change; only new measurements can appear.
In the schema, there is a paginated query to fetch values for existing measurements and a subscription for new ones:
type Temperature {
  value: Float!
  timestamp: AWSDateTime!
}

type TemperatureConnection {
  items: [Temperature!]!
  nextToken: String
}

type Subscription {
  temperature: Temperature
}

type Query {
  temperatures(nextToken: String): TemperatureConnection!
}
As the query can return more items than the client wants to show, the client needs to halt pagination at some point. In this example, the cut-off point is 5 minutes: measurements older than that are not fetched.
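The cut-off could be sketched roughly as follows. This is not the article's implementation: `fetchPage` is a hypothetical helper that resolves to an object shaped like `TemperatureConnection`, and the sketch assumes the backend returns measurements newest first, so pagination can stop at the first page that contains an item older than the cut-off.

```javascript
// Cut-off window: 5 minutes, as in the example.
const CUTOFF_MS = 5 * 60 * 1000;

// fetchPage(nextToken) is a hypothetical function that resolves to
// { items, nextToken }, mirroring the TemperatureConnection type.
// Assumption: pages are ordered newest first.
async function fetchRecent(fetchPage, now = Date.now()) {
  const cutoff = now - CUTOFF_MS;
  const result = [];
  let nextToken;
  do {
    const page = await fetchPage(nextToken);
    // Keep only the measurements inside the cut-off window.
    const recent = page.items.filter(
      (item) => new Date(item.timestamp).getTime() >= cutoff
    );
    result.push(...recent);
    // Stop when this page contained an item older than the cut-off,
    // or when the backend reports no further pages.
    const reachedCutoff = recent.length < page.items.length;
    nextToken = reachedCutoff ? null : page.nextToken;
  } while (nextToken);
  return result;
}
```

If the backend orders items differently, the stop condition would need to change; the filter on the timestamp still applies.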
Using the implementation is similar to the previous example: it returns an observable that pushes items as they become available:
tempUpdateStream({
  opened: () => setSubscriptionStatus(true),
  closed: () => setSubscriptionStatus(false),
}).subscribe((list) => updateTemperature(list));
We'll handle two scenarios: one that pushes only the latest value, for example to show the "current temperature", and one that pushes all measurements after the cut-off point. Both build upon the function above.
The easiest is the latter: push all updates. Here, we'll only need a mergeMap that emits the values one-by-one from the list:
tempUpdateStream({...}).pipe(
  mergeMap((items) => items),
).subscribe(...)
To get only the latest values, we'll use a scan combined with a distinctUntilChanged:
tempUpdateStream({...}).pipe(
  scan((acc, items) =>
    [...(acc ? [acc] : []), ...items].sort(sorter).reverse()[0]
  , undefined),
  distinctUntilChanged(),
).subscribe(...)
The scan keeps track of the latest measurement and emits it once for every incoming list, even when that list contains nothing newer. Then the distinctUntilChanged removes these duplicates.
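To see why the duplicates appear and how they are dropped, the same logic can be traced in plain JavaScript without RxJS. This is only an illustrative sketch: `sorter` is not shown in the article, so the comparator below (oldest first, by timestamp) is an assumption, and `simulate` is a hypothetical stand-in for the scan and distinctUntilChanged pair, using the `===` comparison that distinctUntilChanged applies by default.

```javascript
// Assumed comparator (not shown in the article): oldest first.
const sorter = (a, b) =>
  new Date(a.timestamp).getTime() - new Date(b.timestamp).getTime();

// The reducer passed to scan: the newest of the previous result and the new items.
const latest = (acc, items) =>
  [...(acc ? [acc] : []), ...items].sort(sorter).reverse()[0];

// Hypothetical stand-in for scan + distinctUntilChanged over a list of batches.
function simulate(batches) {
  const emitted = [];
  let acc;
  for (const items of batches) {
    acc = latest(acc, items);
    const previous = emitted[emitted.length - 1];
    // Emit only when the value differs from the last emitted one (=== check).
    if (emitted.length === 0 || previous !== acc) {
      emitted.push(acc);
    }
  }
  return emitted;
}

const batches = [
  [
    { value: 20.5, timestamp: "2024-01-01T00:01:00.000Z" },
    { value: 21.0, timestamp: "2024-01-01T00:03:00.000Z" },
  ],
  // Older than the current latest: the reducer returns the same object again.
  [{ value: 20.8, timestamp: "2024-01-01T00:02:00.000Z" }],
  [{ value: 21.4, timestamp: "2024-01-01T00:04:00.000Z" }],
];

// Two emissions: the 00:03 measurement, then the 00:04 one.
console.log(simulate(batches).map((m) => m.timestamp));
```

The second batch produces no emission because the reducer returns the very same object as before, which is why the default reference comparison is enough here.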