You have an API key and you've seen the Parquet dumps. Now what? This guide walks you through building a complete Polymarket database from scratch and keeping it synced in real time. By the end, you'll have every trade, position, split, merge, and redemption since 2022 in your own database, updated within seconds of on-chain activity.
The end result is a PostgreSQL database with 7 tables covering every on-chain Polymarket event:
| Table | Rows | Type |
|---|---|---|
| order_filled_events | ~865M | Append-only (INSERT) |
| positions | ~150M | Mutable (UPSERT) |
| payout_redemptions | ~93M | Append-only |
| position_splits | ~7.7M | Append-only |
| position_merges | ~7.8M | Append-only |
| position_conversions | ~1.8M | Append-only |
| markets | ~15K | Lookup (replace daily) |
This is the same data that powers predmktdata's API — every fill, every position change, every payout, going back to 2022. The difference is it lives in your database, where you can query it however you want.
Create a PostgreSQL database and set up the tables. All amounts are stored as BIGINT in raw on-chain units — divide by 1e6 when you query to get USDC values.
```sql
-- Create database
CREATE DATABASE polymarket;

-- Event tables (append-only)
CREATE TABLE order_filled_events (
    timestamp            TIMESTAMPTZ,
    transaction_hash     TEXT,
    block_number         BIGINT,
    maker                TEXT,
    taker                TEXT,
    maker_asset_id       TEXT,
    taker_asset_id       TEXT,
    maker_amount_filled  BIGINT,
    taker_amount_filled  BIGINT,
    fee                  BIGINT,
    side                 TEXT
);

CREATE TABLE payout_redemptions (
    timestamp        TIMESTAMPTZ,
    transaction_hash TEXT,
    block_number     BIGINT,
    redeemer         TEXT,
    condition_id     TEXT,
    payout           BIGINT
);

CREATE TABLE position_splits (
    timestamp        TIMESTAMPTZ,
    transaction_hash TEXT,
    block_number     BIGINT,
    stakeholder      TEXT,
    condition_id     TEXT,
    amount           BIGINT
);

CREATE TABLE position_merges (
    timestamp        TIMESTAMPTZ,
    transaction_hash TEXT,
    block_number     BIGINT,
    stakeholder      TEXT,
    condition_id     TEXT,
    amount           BIGINT
);

CREATE TABLE position_conversions (
    timestamp        TIMESTAMPTZ,
    transaction_hash TEXT,
    block_number     BIGINT,
    stakeholder      TEXT,
    condition_id     TEXT,
    index_set        BIGINT,
    amount           BIGINT
);

-- Positions (mutable — use UPSERT)
CREATE TABLE positions (
    user_address TEXT,
    token_id     TEXT,
    amount       BIGINT,
    avg_price    BIGINT,
    realized_pnl BIGINT,
    total_bought BIGINT,
    last_block   BIGINT,
    PRIMARY KEY (user_address, token_id)
);

-- Markets lookup
CREATE TABLE markets (
    condition_id TEXT PRIMARY KEY,
    question     TEXT,
    outcome_yes  TEXT,
    outcome_no   TEXT,
    yes_token_id TEXT,
    no_token_id  TEXT,
    market_slug  TEXT,
    end_date_iso TEXT,
    neg_risk     BOOLEAN
);

-- Sync cursor (tracks where you left off)
CREATE TABLE sync_state (
    key   TEXT PRIMARY KEY,
    value BIGINT
);
```
Token IDs are TEXT, not integers. They're uint256 values (up to 78 decimal digits) that overflow every native numeric type. Always keep them as strings.
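To see why, here's a quick check (the token ID below is an illustrative made-up value, not a real market token):

```python
# A token ID is a uint256 and can run to 78 decimal digits.
# This hypothetical value already blows past the 64-bit BIGINT range,
# so it has to be stored as TEXT and compared as a string.
token_id = "21742633143463906290569050155826241533067272736897614950488156847949938836455"

BIGINT_MAX = 2**63 - 1  # PostgreSQL BIGINT upper bound

print(int(token_id) > BIGINT_MAX)  # True: would overflow BIGINT
```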
Note on schema differences: the dumps use a timestamp column and omit internal fields (log_index, exchange, source). The /events API includes those extra columns and names the timestamp block_timestamp. The schema above matches the dump columns. If you plan to also ingest from /events, add the extra columns or select only the dump columns when inserting.
The Parquet dumps contain the complete history. Each event table has one file per day (e.g., order_filled_events/20250115.parquet). Positions are a full snapshot (positions/positions_YYYYMMDD_<block>.parquet). Markets is a single file (markets/markets.parquet).
```bash
# See what's available
curl -s -H "x-api-key: YOUR_KEY" https://api.predmktdata.com/dumps | python3 -m json.tool
```
This Python script downloads all dump files in parallel. It skips files you already have, so you can run it daily to grab only new data.
```python
import requests, os, concurrent.futures

API = "https://api.predmktdata.com"
KEY = os.environ["PREDMKTDATA_API_KEY"]
OUT = "./dumps"

# Get file list
resp = requests.get(f"{API}/dumps", headers={"x-api-key": KEY}).json()
files = resp["files"]

def download(f):
    path = f"{OUT}/{f['path']}"
    if os.path.exists(path):
        return f"skip {f['path']}"
    os.makedirs(os.path.dirname(path), exist_ok=True)
    r = requests.get(f"{API}/dumps/{f['path']}",
                     headers={"x-api-key": KEY}, allow_redirects=True)
    with open(path, "wb") as fh:
        fh.write(r.content)
    return f"done {f['path']} ({len(r.content) / 1e6:.1f} MB)"

# Download in parallel (8 threads)
with concurrent.futures.ThreadPoolExecutor(max_workers=8) as pool:
    for result in pool.map(download, files):
        print(result)
```
order_filled_events is the largest table at ~18 GB. Downloading everything takes 10-30 minutes depending on your connection.
DuckDB is the fastest way to load Parquet into PostgreSQL. It reads Parquet natively and can write directly to Postgres via its postgres extension.
```bash
# Install DuckDB if you don't have it
pip install duckdb

python3 <<'SCRIPT'
import duckdb

db = duckdb.connect()
db.execute("INSTALL postgres; LOAD postgres;")
db.execute("""
    ATTACH 'dbname=polymarket user=postgres' AS pg (TYPE POSTGRES);
""")

tables = {
    "order_filled_events": "dumps/order_filled_events/*.parquet",
    "payout_redemptions": "dumps/payout_redemptions/*.parquet",
    "position_splits": "dumps/position_splits/*.parquet",
    "position_merges": "dumps/position_merges/*.parquet",
    "position_conversions": "dumps/position_conversions/*.parquet",
}

# Load event tables (append-only)
for table, pattern in tables.items():
    print(f"Loading {table}...")
    db.execute(f"INSERT INTO pg.{table} SELECT * FROM read_parquet('{pattern}')")
    count = db.execute(f"SELECT count(*) FROM pg.{table}").fetchone()[0]
    print(f"  {count:,} rows")

# Positions: INSERT (table is empty on first load)
# File is named positions_YYYYMMDD_<block>.parquet
print("Loading positions...")
db.execute("""
    INSERT INTO pg.positions
    SELECT * FROM read_parquet('dumps/positions/positions_*.parquet')
""")

# Markets: replace entirely
print("Loading markets...")
db.execute("DELETE FROM pg.markets")
db.execute("INSERT INTO pg.markets SELECT * FROM read_parquet('dumps/markets/markets.parquet')")

print("Done!")
SCRIPT
```
This loads the entire history in one shot. On a decent machine, expect ~30 minutes for the full 865M+ fills. The smaller tables load in seconds.
Once the data is loaded, add indexes for the queries you'll run most. These are optional but make a huge difference on 865M rows.
```sql
-- Essential indexes
CREATE INDEX idx_fills_block ON order_filled_events (block_number);
CREATE INDEX idx_fills_maker ON order_filled_events (maker);
CREATE INDEX idx_fills_taker ON order_filled_events (taker);
CREATE INDEX idx_fills_ts    ON order_filled_events (timestamp);

CREATE INDEX idx_pos_amount  ON positions (amount);
CREATE INDEX idx_pos_block   ON positions (last_block);

CREATE INDEX idx_redeem_block ON payout_redemptions (block_number);

-- Markets: for joining token_id to market info
CREATE INDEX idx_markets_yes ON markets (yes_token_id);
CREATE INDEX idx_markets_no  ON markets (no_token_id);
```
Your database now has the full history. But Polymarket never stops — new trades happen every second. There are three ways to stay current, depending on your plan and latency needs.
The simplest approach. Run the download script from Step 2 once a day. It only downloads new files, so it takes seconds. Then load the new day's data:
```bash
# In your daily cron job:

# 1. Download new dump files
python3 download_dumps.py

# 2. Load today's events
python3 -c "
import duckdb, datetime
db = duckdb.connect()
db.execute('INSTALL postgres; LOAD postgres;')
db.execute(\"ATTACH 'dbname=polymarket' AS pg (TYPE POSTGRES);\")
today = datetime.date.today().strftime('%Y%m%d')
for t in ['order_filled_events', 'payout_redemptions', 'position_splits',
          'position_merges', 'position_conversions']:
    f = f'dumps/{t}/{today}.parquet'
    db.execute(f\"INSERT INTO pg.{t} SELECT * FROM read_parquet('{f}')\")
"

# 3. Refresh positions (UPSERT) and markets (replace)
```
This keeps you at most ~24 hours behind. Good enough for daily analytics, backtesting, and research.
For near real-time sync. After the initial backfill, poll the /events endpoint every few seconds to get new events as they happen.
```python
import requests, psycopg2, csv, io, time

API = "https://api.predmktdata.com"
KEY = "YOUR_KEY"
conn = psycopg2.connect("dbname=polymarket")

# Start from where the dumps left off
with conn.cursor() as cur:
    cur.execute("SELECT COALESCE(MAX(last_block), 0) FROM positions")
    cursor = cur.fetchone()[0]

while True:
    for table in ["order_filled_events", "positions", "payout_redemptions",
                  "position_splits", "position_merges", "position_conversions"]:
        r = requests.get(f"{API}/events",
                         headers={"x-api-key": KEY},
                         params={"after_block": cursor, "tables": table})
        if r.status_code == 200 and r.text.strip():
            reader = csv.DictReader(io.StringIO(r.text))
            rows = list(reader)
            if rows:
                # Insert into your database
                insert_rows(conn, table, rows)

        # Track cursor from response headers
        new_cursor = int(r.headers.get("x-last-block", cursor))
        cursor = max(cursor, new_cursor)

    # Save cursor
    with conn.cursor() as cur:
        cur.execute("""INSERT INTO sync_state (key, value) VALUES ('cursor', %s)
                       ON CONFLICT (key) DO UPDATE SET value = %s""",
                    (cursor, cursor))
    conn.commit()

    time.sleep(5)  # Poll every 5 seconds
```
Watch for the x-reorg response header. If present, the chain reorganized and you need to roll back to the indicated block. Delete events with block_number > x-reorg and re-fetch from there.
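The rollback itself is mechanical: delete everything past the reorg point and rewind the cursor so the next poll re-fetches it. A minimal sketch (handle_reorg and EVENT_TABLES are my names, not part of the API; it works with any DB-API connection such as psycopg2 or sqlite3):

```python
EVENT_TABLES = [
    "order_filled_events", "payout_redemptions",
    "position_splits", "position_merges", "position_conversions",
]

def handle_reorg(conn, reorg_block):
    """Delete events past the reorg point and rewind the sync cursor.

    reorg_block comes from the x-reorg header; blocks after it are no
    longer canonical. int() guards the f-string interpolation.
    """
    reorg_block = int(reorg_block)
    cur = conn.cursor()
    for table in EVENT_TABLES:
        cur.execute(f"DELETE FROM {table} WHERE block_number > {reorg_block}")
    # Rewind the cursor so the next poll re-fetches from the reorg point
    cur.execute(f"UPDATE sync_state SET value = {reorg_block} WHERE key = 'cursor'")
    conn.commit()
```

Positions don't need explicit rollback here: since the feed sends full current position rows, the UPSERTs on replay overwrite any stale values.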
The /ws/stream WebSocket pushes every event to you the moment the indexer commits it. No polling, no gaps, sub-second latency.
```python
import asyncio, websockets, json, psycopg2

conn = psycopg2.connect("dbname=polymarket")

# Get your sync cursor
with conn.cursor() as cur:
    cur.execute("SELECT value FROM sync_state WHERE key = 'cursor'")
    row = cur.fetchone()
    cursor = row[0] if row else 0

async def stream():
    uri = (f"wss://api.predmktdata.com/ws/stream"
           f"?x_api_key=YOUR_KEY&start_block={cursor}")
    async with websockets.connect(uri) as ws:
        async for msg in ws:
            batch = json.loads(msg)
            if batch["type"] == "ping":
                continue
            if batch["type"] == "caught_up":
                print(f"Caught up at block {batch['block']}")
                continue

            # batch["type"] == "batch"
            # Contains: from_block, to_block, and arrays for each table
            for table in ["order_filled_events", "positions", "payout_redemptions",
                          "position_splits", "position_merges", "position_conversions"]:
                rows = batch.get(table, [])
                if rows:
                    insert_rows(conn, table, rows)

            # Update cursor
            new_block = batch["to_block"]
            with conn.cursor() as cur:
                cur.execute("""INSERT INTO sync_state (key, value) VALUES ('cursor', %s)
                               ON CONFLICT (key) DO UPDATE SET value = %s""",
                            (new_block, new_block))
            conn.commit()

asyncio.run(stream())
```
The stream starts by catching up from your start_block — it rapidly sends batches of historical data until you're at the chain tip. Then it switches to live mode, pushing new events every few seconds as the indexer commits them.
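WebSocket connections do drop eventually. Because the cursor is persisted in sync_state, you can wrap the stream in a reconnect loop and resume from where you left off (assuming you move the cursor read inside stream() so each reconnect picks up the latest value). A capped exponential backoff sketch; backoff_delays is my helper name, not part of any library:

```python
import time

def backoff_delays(base=1.0, cap=30.0):
    """Yield reconnect delays: 1s, 2s, 4s, ... capped at `cap` seconds."""
    delay = base
    while True:
        yield min(delay, cap)
        delay *= 2

# Usage sketch: reconnect forever, resuming from the saved cursor each time
# for delay in backoff_delays():
#     try:
#         asyncio.run(stream())  # stream() re-reads sync_state on entry
#     except Exception as e:
#         print(f"stream dropped: {e}; retrying in {delay:.0f}s")
#         time.sleep(delay)
```

A cap matters: without one, a long outage would push retry gaps into hours and leave you far behind the chain tip for no reason.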
Event tables are append-only — just INSERT. But positions are mutable. The same wallet can buy more of the same token, and its position row gets updated. Always use UPSERT:
```sql
INSERT INTO positions (user_address, token_id, amount, avg_price,
                       realized_pnl, total_bought, last_block)
VALUES (%s, %s, %s, %s, %s, %s, %s)
ON CONFLICT (user_address, token_id) DO UPDATE SET
    amount       = EXCLUDED.amount,
    avg_price    = EXCLUDED.avg_price,
    realized_pnl = EXCLUDED.realized_pnl,
    total_bought = EXCLUDED.total_bought,
    last_block   = EXCLUDED.last_block;
```
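Both sync scripts above call an insert_rows helper that isn't shown. One possible shape, a sketch rather than a reference implementation: it builds the statement from the dict rows the CSV and JSON feeds produce, and applies the UPSERT only to positions. The paramstyle argument is there so the same code runs against psycopg2 (%s) or sqlite3 (?):

```python
def insert_rows(conn, table, rows, paramstyle="%s"):
    """Insert a batch of dict rows; UPSERT for the mutable positions table."""
    if not rows:
        return
    cols = list(rows[0].keys())
    placeholders = ", ".join([paramstyle] * len(cols))
    sql = f"INSERT INTO {table} ({', '.join(cols)}) VALUES ({placeholders})"
    if table == "positions":
        # Update every non-key column on conflict
        updates = ", ".join(f"{c} = EXCLUDED.{c}" for c in cols
                            if c not in ("user_address", "token_id"))
        sql += f" ON CONFLICT (user_address, token_id) DO UPDATE SET {updates}"
    cur = conn.cursor()  # plain cursor: sqlite3 cursors aren't context managers
    cur.executemany(sql, [[row[c] for c in cols] for row in rows])
```

Note it takes column names from the first row, so every row in a batch must have the same keys, which holds for both the CSV and the WebSocket payloads.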
The markets table needs no UPSERT logic: it's small and replaced wholesale each refresh, keyed by condition_id.
After loading, run a quick sanity check to make sure your row counts match the source:
```sql
-- Compare your counts to the /status endpoint
SELECT 'order_filled_events' AS table_name, count(*) FROM order_filled_events
UNION ALL SELECT 'positions',            count(*) FROM positions
UNION ALL SELECT 'payout_redemptions',   count(*) FROM payout_redemptions
UNION ALL SELECT 'position_splits',      count(*) FROM position_splits
UNION ALL SELECT 'position_merges',      count(*) FROM position_merges
UNION ALL SELECT 'position_conversions', count(*) FROM position_conversions;
```
Then compare the results to what GET /status returns. They should be within a day's worth of data.
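If you want to automate the check, a small comparison helper works; this assumes you've already turned both sides into dicts of table name to row count (the exact /status response shape isn't shown here, so adapt the parsing to what it actually returns):

```python
def count_drift(local_counts, status_counts, tolerance=0.01):
    """Return tables whose local count differs from the remote count by
    more than `tolerance` (a fraction of the remote count)."""
    drifted = {}
    for table, remote in status_counts.items():
        local = local_counts.get(table, 0)
        if remote and abs(local - remote) / remote > tolerance:
            drifted[table] = (local, remote)
    return drifted
```

A 1% default tolerance roughly matches the "within a day's worth of data" expectation for the daily-dump workflow; tighten it if you're running the WebSocket stream.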
Now you have the full dataset. Here are some things you can do that aren't possible with any API:
```sql
-- Top 10 wallets by total trading volume (USDC)
SELECT maker, SUM(maker_amount_filled) / 1e6 AS volume_usdc
FROM order_filled_events
GROUP BY maker
ORDER BY volume_usdc DESC
LIMIT 10;

-- Wallets with the highest realized PnL
SELECT user_address, SUM(realized_pnl) / 1e6 AS total_pnl_usdc
FROM positions
GROUP BY user_address
ORDER BY total_pnl_usdc DESC
LIMIT 20;

-- Daily trading volume over the last month
SELECT timestamp::date AS day,
       SUM(maker_amount_filled) / 1e6 AS volume_usdc,
       COUNT(*) AS trades
FROM order_filled_events
WHERE timestamp > now() - interval '30 days'
GROUP BY day
ORDER BY day;

-- Find which market a position belongs to
SELECT p.user_address, p.amount / 1e6 AS amount_usdc, m.question, m.outcome_yes
FROM positions p
JOIN markets m ON m.yes_token_id = p.token_id OR m.no_token_id = p.token_id
WHERE p.user_address = '0x...'
ORDER BY amount_usdc DESC;
```
A few tips for running this in production:

- Use COPY for bulk loads. For the initial backfill, COPY (or DuckDB's bulk insert) is 10-100x faster than individual INSERTs.
- Partition order_filled_events. For PostgreSQL, consider partitioning by month or quarter. It makes date-range queries and maintenance much faster on 865M+ rows.
- Monitor sync lag. Alert if sync_state.value falls more than a few hundred blocks behind /status's head_block.
- Resume cleanly. On restart, pick up from the last to_block saved in your sync_state.

Sign up, get your API key, and start downloading. Full history since 2022.
Get started