Docs/Guides/Sync Instruments Sync instruments from an external source
This guide walks through building a reliable synchronisation pipeline that keeps your Ptolemy instrument records up to date with an external data source such as a Bloomberg feed, a CSV from a vendor, or your own internal database.
Overview
The sync pattern has three steps:
1. Fetch source data
Pull the current record set from your external source. This might be a Bloomberg SFTP file, a vendor REST API, or a database query.
2. Upsert to Ptolemy
For each record, create or update the instrument in Ptolemy using identifier-based lookups. Field values are merged atomically.
3. Archive removals
For instruments no longer present in the source, archive (soft-delete) them in Ptolemy rather than hard-deleting, preserving historical data.
Step 1 — Discover existing instruments
Before syncing, fetch all existing Ptolemy instruments so you can diff against the source. Use the filter API with a large page size to iterate efficiently.
def get_all_instruments(client, type_id: str) -> dict[str, dict]:
instruments = {}
cursor = None
while True:
body = {"type_id": type_id, "limit": 500}
if cursor: body["after"] = cursor
resp = client.post("/instruments/filter", json=body).json()
for inst in resp["data"]:
instruments[inst["identifier"]] = inst
if not resp["meta"]["has_more"]: break
cursor = resp["meta"]["next_cursor"]
return instruments
Step 2 — Upsert changed records
Compare each source record against the current Ptolemy state. Create new instruments and update changed ones. Only send PATCH requests for records that have actually changed to avoid unnecessary API calls.
for record in source_records:
existing = ptolemy_instruments.get(record["identifier"])
if existing is None:
# New instrument — create it
client.post("/instruments", json={
"name": record["name"],
"identifier": record["identifier"],
"type_id": "equity",
"field_values": record["fields"]
}{)
elif has_changed(existing, record):
# Changed — update it
client.patch(f"/instruments/{record['identifier']}", json={
"field_values": record["fields"]
}{)
# No change — skip
Step 3 — Archive removed records
After processing all source records, archive any Ptolemy instruments that were not seen in the source feed. This indicates they’ve been delisted or removed upstream.
source_ids = {r["identifier"] for r in source_records}
for identifier, inst in ptolemy_instruments.items():
if identifier not in source_ids and inst["archived_at"] is None:
client.delete(f"/instruments/{identifier}")
logger.info(f"Archived {identifier} — not in source feed")
Archive rather than hard-delete. Archived instruments still have their time series history intact, letting you query historical prices even after a stock has been delisted.
Error handling and idempotency
Build your sync to be idempotent — running it twice should produce the same result as running it once. Key practices:
Catch and log, don't abort
Wrap each create/update in try/except. A single bad record shouldn’t stop the entire sync. Log failures and alert separately.
Retry on 429
Check the Retry-After header and sleep accordingly. Implement exponential backoff for 5xx responses.
Use identifiers, not UUIDs
Reference instruments by your own identifier (e.g. bhp.ax) so your sync is stable across re-runs without needing to store Ptolemy UUIDs.
Run on a schedule
Use a cron job or workflow scheduler (Airflow, Prefect) to run the sync on a regular cadence. Daily syncs are typical for reference data.