Building a time series pipeline

This guide walks through building an automated pipeline that ingests end-of-day price data into Ptolemy time series values. We'll cover discovery, bulk ingestion, and efficient daily updates.

Before you start

A scalar time series value requires three identifiers: an instrument, a series type, and a timestamp. Before ingesting data, confirm these exist in your workspace:

Instruments
The instruments you want to price must exist. See the Sync Instruments guide if you haven’t loaded them yet.
Series type
The series type (e.g. close-price) must be created by an Editor in your workspace before you can ingest values.
Timestamp format
Timestamps must be ISO 8601 strings in UTC. For end-of-day data, use YYYY-MM-DDT00:00:00Z (midnight UTC on the trade date).
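
As a minimal sketch of the timestamp rule above, the helper below turns a trade date into the midnight-UTC string. The function name eod_timestamp is hypothetical and not part of any Ptolemy client; it uses only the Python standard library.

```python
from datetime import date, datetime, timezone


def eod_timestamp(trade_date: date) -> str:
    """Return midnight UTC on the trade date as an ISO 8601 string ending in Z."""
    midnight = datetime(
        trade_date.year, trade_date.month, trade_date.day, tzinfo=timezone.utc
    )
    # strftime keeps the trailing "Z" form used in this guide instead of "+00:00"
    return midnight.strftime("%Y-%m-%dT%H:%M:%SZ")


print(eod_timestamp(date(2026, 4, 15)))  # 2026-04-15T00:00:00Z
```

Building the string from a date object, rather than concatenating text, rejects invalid dates such as 2026-02-30 before they reach the API.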

Step 1 — Look up series type ID

Fetch the series type once and cache it. You’ll need the UUID (or identifier) for every value you create.

resp = client.get("/series/close-price").json()
series_type_id = resp["identifier"]  # or use resp["id"] for the UUID
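
One way to "fetch once and cache" is a small in-process cache, sketched below. The helper name get_series_type_id and the module-level dictionary are hypothetical, and the sketch assumes that other series types can be fetched from /series/<identifier> in the same way as close-price, which this guide does not confirm.

```python
from typing import Any, Dict

# Hypothetical in-process cache: series type identifier -> cached lookup result
_series_type_cache: Dict[str, str] = {}


def get_series_type_id(client: Any, identifier: str = "close-price") -> str:
    """Fetch the series type on first use and reuse the result afterwards."""
    if identifier not in _series_type_cache:
        # Assumption: the path pattern generalises from /series/close-price
        resp = client.get(f"/series/{identifier}").json()
        _series_type_cache[identifier] = resp["identifier"]  # or resp["id"] for the UUID
    return _series_type_cache[identifier]
```

The cache lives only for the lifetime of the process, so a long-running pipeline makes one lookup request per series type rather than one per value.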

Step 2 — Ingest daily prices

For each price record, POST a scalar time series value. Use the instrument identifier directly — Ptolemy resolves it to a UUID internally.

import concurrent.futures
import logging

logger = logging.getLogger(__name__)

def ingest_price(record):
    # POST one scalar value; Ptolemy resolves the ticker to a UUID
    return client.post("/time-series-values", json={
        "instrument_id": record["ticker"],
        "type_id": series_type_id,
        "timestamp": f"{record['date']}T00:00:00Z",
        "value": str(record["close"]),
    })

with concurrent.futures.ThreadPoolExecutor(max_workers=10) as pool:
    futures = [pool.submit(ingest_price, r) for r in price_records]
    for f in concurrent.futures.as_completed(futures):
        # Only raised exceptions are caught here; if your HTTP client does not
        # raise on error statuses, check the response status as well.
        if f.exception():
            logger.error(f.exception())

Keep concurrent requests to 10–20 to stay well within burst limits. For very large ingestion batches, consider the Import API instead.
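
The loop above logs failures but does not record which price record failed. A possible variation, sketched below, maps each future back to its record so that failed records can be rerun. The name ingest_all is hypothetical; ingest stands for any callable that posts one record, such as ingest_price.

```python
import concurrent.futures
from typing import Any, Callable, Dict, Iterable, List, Tuple

Record = Dict[str, Any]


def ingest_all(
    records: Iterable[Record],
    ingest: Callable[[Record], Any],
    max_workers: int = 10,
) -> List[Tuple[Record, BaseException]]:
    """Run ingest(record) concurrently and return (record, error) for each failure."""
    failures: List[Tuple[Record, BaseException]] = []
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as pool:
        # Remember which record each future belongs to
        future_to_record = {pool.submit(ingest, r): r for r in records}
        for future in concurrent.futures.as_completed(future_to_record):
            error = future.exception()
            if error is not None:
                failures.append((future_to_record[future], error))
    return failures
```

Calling ingest_all(price_records, ingest_price) returns an empty list when every request succeeds. As with the loop above, only raised exceptions count as failures.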

Step 3 — Update existing values

If you’re re-running the pipeline for a date that already has data (e.g. to correct a price), use PATCH instead of POST. Look up the existing value first:

# Find existing value
existing = client.post("/time-series-values/filter", json={
    "filter": {
        "and": [
            {"field": "instrument_id", "op": "eq", "value": "bhp.ax"},
            {"field": "type_id", "op": "eq", "value": "close-price"},
            {"field": "timestamp", "op": "eq", "value": "2026-04-15T00:00:00Z"},
        ]
    }
}).json()

if existing["data"]:
    value_id = existing["data"][0]["id"]
    client.patch(f"/time-series-values/{value_id}", json={"value": "45.82"})
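
The lookup and the update can be combined into a single "update or create" helper, sketched below using only the endpoints shown in this guide. The name upsert_value is hypothetical, and the sketch assumes that client behaves like the one used in the earlier steps (post and patch accept a json argument and return an object with a json() method).

```python
from typing import Any


def upsert_value(
    client: Any, instrument_id: str, type_id: str, timestamp: str, value: str
) -> Any:
    """PATCH the value if one already exists for this key, otherwise POST a new one."""
    key = {"instrument_id": instrument_id, "type_id": type_id, "timestamp": timestamp}
    # Look for an existing value with exactly this instrument, type, and timestamp
    existing = client.post("/time-series-values/filter", json={
        "filter": {
            "and": [{"field": f, "op": "eq", "value": v} for f, v in key.items()]
        }
    }).json()
    if existing["data"]:
        value_id = existing["data"][0]["id"]
        return client.patch(f"/time-series-values/{value_id}", json={"value": value})
    return client.post("/time-series-values", json={**key, "value": value})
```

This costs one extra request per value, so it suits correction runs better than first-time bulk loads where no values exist yet.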

Querying ingested data

Retrieve prices for a range of dates using the filter endpoint:

Close prices for BHP in April 2026

POST /time-series-values/filter
{
  "filter": {
    "and": [
      { "field": "instrument_id", "op": "eq", "value": "bhp.ax" },
      { "field": "type_id", "op": "eq", "value": "close-price" },
      { "field": "timestamp", "op": "gte", "value": "2026-04-01T00:00:00Z" },
      { "field": "timestamp", "op": "lte", "value": "2026-04-30T23:59:59Z" }
    ]
  },
  "sort": [{ "field": "timestamp", "direction": "asc" }]
}
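
For pipelines that query many instruments or date ranges, the request body can be built by a small function instead of being written out each time. The sketch below reproduces the same filter structure; the name range_filter is hypothetical.

```python
from typing import Any, Dict


def range_filter(instrument_id: str, type_id: str, start: str, end: str) -> Dict[str, Any]:
    """Build a filter body for start <= timestamp <= end, sorted oldest first."""
    return {
        "filter": {
            "and": [
                {"field": "instrument_id", "op": "eq", "value": instrument_id},
                {"field": "type_id", "op": "eq", "value": type_id},
                {"field": "timestamp", "op": "gte", "value": start},
                {"field": "timestamp", "op": "lte", "value": end},
            ]
        },
        "sort": [{"field": "timestamp", "direction": "asc"}],
    }


body = range_filter(
    "bhp.ax", "close-price", "2026-04-01T00:00:00Z", "2026-04-30T23:59:59Z"
)
# Usage, assuming the same client as in the earlier steps:
# rows = client.post("/time-series-values/filter", json=body).json()["data"]
```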