How to Build a Complete Polymarket Database

April 2026 · 10 min read
TL;DR: Download the full history as Parquet dumps, load them into PostgreSQL (or any database), then keep it updated in real time using the /events API or /ws/stream WebSocket. You'll have a complete, queryable copy of every Polymarket trade since 2022 — always up to date.

You have an API key and you've seen the Parquet dumps. Now what? This guide walks you through building a complete Polymarket database from scratch and keeping it synced in real time. By the end, you'll have every trade, position, split, merge, and redemption since 2022 in your own database, updated within seconds of on-chain activity.

Parquet dumps (full history) → Your database ← /events API (real-time gap fill)

What you'll build

The end result is a PostgreSQL database with 7 tables covering every on-chain Polymarket event:

Table                  Rows     Type
order_filled_events    ~865M    Append-only (INSERT)
positions              ~150M    Mutable (UPSERT)
payout_redemptions     ~93M     Append-only
position_splits        ~7.7M    Append-only
position_merges        ~7.8M    Append-only
position_conversions   ~1.8M    Append-only
markets                ~15K     Lookup (replace daily)

This is the same data that powers predmktdata's API — every fill, every position change, every payout, going back to 2022. The difference is it lives in your database, where you can query it however you want.

Step 1: Create the schema

Create a PostgreSQL database and set up the tables. All amounts are stored as BIGINT in raw on-chain units — divide by 1e6 when you query to get USDC values.

-- Create database, then connect to it before creating the tables
-- (in psql: \c polymarket)
CREATE DATABASE polymarket;

-- Event tables (append-only)
CREATE TABLE order_filled_events (
    timestamp      TIMESTAMPTZ,
    transaction_hash TEXT,
    block_number   BIGINT,
    maker          TEXT,
    taker          TEXT,
    maker_asset_id TEXT,
    taker_asset_id TEXT,
    maker_amount_filled BIGINT,
    taker_amount_filled BIGINT,
    fee            BIGINT,
    side           TEXT
);

CREATE TABLE payout_redemptions (
    timestamp      TIMESTAMPTZ,
    transaction_hash TEXT,
    block_number   BIGINT,
    redeemer       TEXT,
    condition_id   TEXT,
    payout         BIGINT
);

CREATE TABLE position_splits (
    timestamp      TIMESTAMPTZ,
    transaction_hash TEXT,
    block_number   BIGINT,
    stakeholder    TEXT,
    condition_id   TEXT,
    amount         BIGINT
);

CREATE TABLE position_merges (
    timestamp      TIMESTAMPTZ,
    transaction_hash TEXT,
    block_number   BIGINT,
    stakeholder    TEXT,
    condition_id   TEXT,
    amount         BIGINT
);

CREATE TABLE position_conversions (
    timestamp      TIMESTAMPTZ,
    transaction_hash TEXT,
    block_number   BIGINT,
    stakeholder    TEXT,
    condition_id   TEXT,
    index_set      BIGINT,
    amount         BIGINT
);

-- Positions (mutable — use UPSERT)
CREATE TABLE positions (
    user_address   TEXT,
    token_id       TEXT,
    amount         BIGINT,
    avg_price      BIGINT,
    realized_pnl   BIGINT,
    total_bought   BIGINT,
    last_block     BIGINT,
    PRIMARY KEY (user_address, token_id)
);

-- Markets lookup
CREATE TABLE markets (
    condition_id   TEXT PRIMARY KEY,
    question       TEXT,
    outcome_yes    TEXT,
    outcome_no     TEXT,
    yes_token_id   TEXT,
    no_token_id    TEXT,
    market_slug    TEXT,
    end_date_iso   TEXT,
    neg_risk       BOOLEAN
);

-- Sync cursor (tracks where you left off)
CREATE TABLE sync_state (
    key   TEXT PRIMARY KEY,
    value BIGINT
);
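Token IDs are stored as TEXT, not integers. They're uint256 values (up to 78 digits) that overflow BIGINT and every other fixed-width integer type. PostgreSQL's NUMERIC could hold them, but they are identifiers, not quantities, so always keep them as strings.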
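To make the size difference concrete, here is a minimal Python sketch. The helper name to_usdc is illustrative and not part of any API; it assumes the 6-decimal raw units described in Step 1.

```python
from decimal import Decimal

INT64_MAX = 2**63 - 1       # largest value a PostgreSQL BIGINT can hold
UINT256_MAX = 2**256 - 1    # largest possible token ID

def to_usdc(raw_amount: int) -> Decimal:
    """Convert a raw on-chain amount (6 decimals) to USDC without float rounding."""
    return Decimal(raw_amount) / Decimal(10**6)

# A token ID can have up to 78 digits; BIGINT tops out at 19
print(len(str(UINT256_MAX)))      # 78
print(UINT256_MAX > INT64_MAX)    # True

# Keep the ID as a string end to end; never pass it through float()
token_id = str(UINT256_MAX)
print(to_usdc(1_500_000))         # 1.5
```

Using Decimal rather than float for the division keeps large sums exact, which matters once you aggregate hundreds of millions of fills.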
Dumps vs. /events API columns: The Parquet dumps include a timestamp column and omit internal fields (log_index, exchange, source). The /events API includes those extra columns and names the timestamp block_timestamp. The schema above matches the dump columns. If you plan to also ingest from /events, add the extra columns or select only the dump columns when inserting.
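The second option (select only the dump columns) can be sketched as a small projection step. This is an illustration, not the API's official client: the function name to_dump_row and the sample values are made up, and it assumes /events rows arrive as dicts keyed by column name.

```python
# Dump columns for order_filled_events, in the same order as the schema above
FILL_COLUMNS = [
    "timestamp", "transaction_hash", "block_number", "maker", "taker",
    "maker_asset_id", "taker_asset_id", "maker_amount_filled",
    "taker_amount_filled", "fee", "side",
]

def to_dump_row(api_row: dict, columns: list[str]) -> tuple:
    """Project an /events row onto the dump columns, in schema order."""
    row = dict(api_row)  # copy so the caller's dict is not modified
    # /events names the timestamp block_timestamp; the dumps call it timestamp
    if "block_timestamp" in row:
        row["timestamp"] = row.pop("block_timestamp")
    # Extra fields (log_index, exchange, source) are simply never selected
    return tuple(row.get(col) for col in columns)

# Hypothetical /events row with made-up values
sample = {
    "block_timestamp": "2025-01-15T12:00:00Z", "transaction_hash": "0xabc",
    "block_number": "66000000", "maker": "0x1", "taker": "0x2",
    "maker_asset_id": "0", "taker_asset_id": "123", "maker_amount_filled": "500000",
    "taker_amount_filled": "1000000", "fee": "0", "side": "BUY",
    "log_index": "7", "exchange": "ctf", "source": "indexer",
}
print(to_dump_row(sample, FILL_COLUMNS)[:3])
```

Columns missing from the input come back as None, so a schema mismatch shows up as NULLs rather than as values shifted into the wrong column.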

Step 2: Download and load the dumps

The Parquet dumps contain the complete history. Each event table has one file per day (e.g., order_filled_events/20250115.parquet). Positions are a full snapshot (positions/positions_YYYYMMDD_<block>.parquet). Markets is a single file (markets/markets.parquet).
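Because the daily files follow a predictable table/YYYYMMDD.parquet pattern, you can check a file listing for gaps before loading. The sketch below assumes paths shaped like the examples above; the function name missing_days and the sample paths (including the block number) are made up for illustration.

```python
import re
from datetime import date, timedelta

# Matches daily event files such as order_filled_events/20250115.parquet
DAILY = re.compile(r"^(?P<table>\w+)/(?P<day>\d{8})\.parquet$")

def missing_days(paths: list[str], table: str) -> list[date]:
    """Return dates with no daily file between a table's first and last file."""
    days = set()
    for p in paths:
        m = DAILY.match(p)
        if m and m["table"] == table:
            d = m["day"]
            days.add(date(int(d[:4]), int(d[4:6]), int(d[6:])))
    if not days:
        return []
    first, last = min(days), max(days)
    span = (first + timedelta(n) for n in range((last - first).days + 1))
    return [d for d in span if d not in days]

paths = [
    "order_filled_events/20250114.parquet",
    "order_filled_events/20250116.parquet",
    "positions/positions_20250116_66000000.parquet",  # snapshot, ignored
    "markets/markets.parquet",                         # single file, ignored
]
print(missing_days(paths, "order_filled_events"))  # [datetime.date(2025, 1, 15)]
```

A reported gap is only a hint: a quiet table may legitimately have no file for a day, so confirm against the /dumps listing before treating it as an error.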

List available files

# See what's available
curl -s -H "x-api-key: YOUR_KEY" https://api.predmktdata.com/dumps | python3 -m json.tool

Download with a script

This Python script downloads all dump files in parallel. It skips files you already have, so you can run it daily to grab only new data.

import requests, os, concurrent.futures

API = "https://api.predmktdata.com"
KEY = os.environ["PREDMKTDATA_API_KEY"]
OUT = "./dumps"

# Get file list
resp = requests.get(f"{API}/dumps", headers={"x-api-key": KEY}).json()
files = resp["files"]

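def download(f):
    path = f"{OUT}/{f['path']}"
    if os.path.exists(path):
        return f"skip {f['path']}"
    os.makedirs(os.path.dirname(path), exist_ok=True)
    r = requests.get(f"{API}/dumps/{f['path']}",
                     headers={"x-api-key": KEY}, allow_redirects=True)
    r.raise_for_status()  # don't save an error response as a .parquet file
    # Write to a temporary name first, so an interrupted run never leaves a
    # partial file that the skip check above would treat as complete
    tmp = path + ".part"
    with open(tmp, "wb") as fh:
        fh.write(r.content)
    os.replace(tmp, path)
    return f"done {f['path']} ({len(r.content) / 1e6:.1f} MB)"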

# Download in parallel (8 threads)
with concurrent.futures.ThreadPoolExecutor(max_workers=8) as pool:
    for result in pool.map(download, files):
        print(result)
The full dump set is ~25 GB compressed (Parquet with ZSTD). order_filled_events is the largest table at ~18 GB. Downloading everything takes 10-30 minutes depending on your connection.
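With a download of this size, it is worth confirming that each file arrived whole before loading it. Every Parquet file starts and ends with the 4-byte marker PAR1, so a truncated download is easy to spot. This is a cheap sanity check only, not a full validation, and the function name looks_complete is illustrative.

```python
import os

PARQUET_MAGIC = b"PAR1"  # every Parquet file starts and ends with these 4 bytes

def looks_complete(path: str) -> bool:
    """Cheap integrity check: verify the Parquet magic bytes at both ends."""
    size = os.path.getsize(path)
    if size < 12:  # leading magic + 4-byte footer length + trailing magic
        return False
    with open(path, "rb") as fh:
        head = fh.read(4)
        fh.seek(-4, os.SEEK_END)
        tail = fh.read(4)
    return head == PARQUET_MAGIC and tail == PARQUET_MAGIC
```

A file that fails the check can simply be deleted; the download script will fetch it again on the next run because it no longer exists on disk.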

Load into PostgreSQL

DuckDB is the fastest way to load Parquet into PostgreSQL. It reads Parquet natively and can write directly to Postgres via its postgres extension.

# Install DuckDB if you don't have it
pip install duckdb

python3 <<'SCRIPT'
import duckdb

db = duckdb.connect()
db.execute("INSTALL postgres; LOAD postgres;")
db.execute("""
    ATTACH 'dbname=polymarket user=postgres' AS pg (TYPE POSTGRES);
""")

tables = {
    "order_filled_events": "dumps/order_filled_events/*.parquet",
    "payout_redemptions":  "dumps/payout_redemptions/*.parquet",
    "position_splits":     "dumps/position_splits/*.parquet",
    "position_merges":     "dumps/position_merges/*.parquet",
    "position_conversions": "dumps/position_conversions/*.parquet",
}

# Load event tables (append-only)
for table, pattern in tables.items():
    print(f"Loading {table}...")
    db.execute(f"INSERT INTO pg.{table} SELECT * FROM read_parquet('{pattern}')")
    count = db.execute(f"SELECT count(*) FROM pg.{table}").fetchone()[0]
    print(f"  {count:,} rows")

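# Positions: plain INSERT (table is empty on first load).
# Files are named positions_YYYYMMDD_<block>.parquet. Load only the newest
# snapshot: each is a full copy, so loading several would hit the primary key.
import glob
latest = sorted(glob.glob("dumps/positions/positions_*.parquet"))[-1]
print(f"Loading positions from {latest}...")
db.execute(f"INSERT INTO pg.positions SELECT * FROM read_parquet('{latest}')")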

# Markets: replace entirely
print("Loading markets...")
db.execute("DELETE FROM pg.markets")
db.execute("INSERT INTO pg.markets SELECT * FROM read_parquet('dumps/markets/markets.parquet')")

print("Done!")
SCRIPT

This loads the entire history in one shot. On a decent machine, expect ~30 minutes for the full 865M+ fills. The smaller tables load in seconds.

Step 3: Add indexes

Once the data is loaded, add indexes for the queries you'll run most. These are optional but make a huge difference on 865M rows.

-- Recommended indexes
CREATE INDEX idx_fills_block ON order_filled_events (block_number);
CREATE INDEX idx_fills_maker ON order_filled_events (maker);
CREATE INDEX idx_fills_taker ON order_filled_events (taker);
CREATE INDEX idx_fills_ts    ON order_filled_events (timestamp);

CREATE INDEX idx_pos_amount ON positions (amount);
CREATE INDEX idx_pos_block  ON positions (last_block);

CREATE INDEX idx_redeem_block ON payout_redemptions (block_number);

-- Markets: for joining token_id to market info
CREATE INDEX idx_markets_yes ON markets (yes_token_id);
CREATE INDEX idx_markets_no  ON markets (no_token_id);

Step 4: Keep it updated

Your database now has the full history. But Polymarket never stops — new trades happen every second. There are three ways to stay current, depending on your plan and latency needs.

Option A: Daily dumps (Lite plan)

The simplest approach. Run the download script from Step 2 once a day. It only downloads new files, so it takes seconds. Then load the new day's data:

# In your daily cron job:
# 1. Download new dump files
python3 download_dumps.py

# 2. Load today's events
python3 <<'SCRIPT'
import datetime, os
import duckdb

db = duckdb.connect()
db.execute("INSTALL postgres; LOAD postgres;")
db.execute("ATTACH 'dbname=polymarket' AS pg (TYPE POSTGRES);")

today = datetime.date.today().strftime("%Y%m%d")
for t in ["order_filled_events", "payout_redemptions",
          "position_splits", "position_merges", "position_conversions"]:
    f = f"dumps/{t}/{today}.parquet"
    if not os.path.exists(f):  # nothing downloaded for this day yet
        print(f"skip {f} (not found)")
        continue
    # Single quotes around the path: in SQL, double quotes denote identifiers
    db.execute(f"INSERT INTO pg.{t} SELECT * FROM read_parquet('{f}')")
SCRIPT

# 3. Refresh positions (UPSERT) and markets (replace)

This keeps you at most ~24 hours behind. Good enough for daily analytics, backtesting, and research.

Option B: Polling /events (Pro plan)

For near real-time sync. After the initial backfill, poll the /events endpoint every few seconds to get new events as they happen.

import requests, psycopg2, csv, io, time

API = "https://api.predmktdata.com"
KEY = "YOUR_KEY"
conn = psycopg2.connect("dbname=polymarket")

# Resume from the saved cursor; on the first run, start where the dumps left off
with conn.cursor() as cur:
    cur.execute("SELECT value FROM sync_state WHERE key = 'cursor'")
    row = cur.fetchone()
    if row is None:
        cur.execute("SELECT COALESCE(MAX(last_block), 0) FROM positions")
        row = cur.fetchone()
    cursor = row[0]

while True:
    after = cursor  # fetch every table from the same block in this round
    for table in ["order_filled_events", "positions", "payout_redemptions",
                  "position_splits", "position_merges", "position_conversions"]:
        r = requests.get(f"{API}/events", headers={"x-api-key": KEY},
                         params={"after_block": after, "tables": table},
                         timeout=30)
        r.raise_for_status()  # fail loudly instead of silently skipping blocks

        if r.text.strip():
            rows = list(csv.DictReader(io.StringIO(r.text)))
            if rows:
                # insert_rows is your own helper: INSERT for event tables,
                # UPSERT for positions
                insert_rows(conn, table, rows)

        # Track cursor from response headers
        cursor = max(cursor, int(r.headers.get("x-last-block", cursor)))

    # Save cursor
    with conn.cursor() as cur:
        cur.execute("""INSERT INTO sync_state (key, value) VALUES ('cursor', %s)
                       ON CONFLICT (key) DO UPDATE SET value = %s""", (cursor, cursor))
    conn.commit()

    time.sleep(5)  # Poll every 5 seconds
Check the x-reorg response header. If present, the chain reorganized and you need to roll back to the indicated block. Delete events with block_number > x-reorg and re-fetch from there.
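A rollback along these lines could look like the sketch below. It is runnable against in-memory SQLite as a stand-in for PostgreSQL, so it uses ? placeholders where psycopg2 would use %s. The function name rollback_to is illustrative, and the sketch assumes the x-reorg value is a plain block number. It covers the append-only event tables only: positions are mutable and have no per-event rows to delete, so they are left for the re-fetch to overwrite.

```python
import sqlite3

EVENT_TABLES = ["order_filled_events", "payout_redemptions", "position_splits",
                "position_merges", "position_conversions"]

def rollback_to(conn, reorg_block: int) -> int:
    """Delete events above reorg_block, reset the saved cursor, return it."""
    cur = conn.cursor()
    for table in EVENT_TABLES:
        # Table names come from the fixed list above, never from user input
        cur.execute(f"DELETE FROM {table} WHERE block_number > ?", (reorg_block,))
    cur.execute("UPDATE sync_state SET value = ? WHERE key = 'cursor'",
                (reorg_block,))
    conn.commit()  # deletes and cursor reset land in one transaction
    return reorg_block

# Demo against in-memory SQLite with made-up block numbers
conn = sqlite3.connect(":memory:")
for t in EVENT_TABLES:
    conn.execute(f"CREATE TABLE {t} (block_number INTEGER)")
conn.execute("CREATE TABLE sync_state (key TEXT PRIMARY KEY, value INTEGER)")
conn.execute("INSERT INTO sync_state VALUES ('cursor', 105)")
conn.executemany("INSERT INTO order_filled_events VALUES (?)",
                 [(100,), (103,), (105,)])
rollback_to(conn, 102)
print(conn.execute("SELECT block_number FROM order_filled_events").fetchall())  # [(100,)]
```

In the polling loop, this would run whenever "x-reorg" appears in r.headers, with the returned block becoming the next after_block.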

Option C: WebSocket firehose (Pro plan, lowest latency)

The /ws/stream WebSocket pushes every event to you the moment the indexer commits it. No polling, no gaps, and sub-second delivery once a batch is committed.

import asyncio, websockets, json, psycopg2

conn = psycopg2.connect("dbname=polymarket")

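# Get your sync cursor; on the first run, fall back to where the dumps left off
with conn.cursor() as cur:
    cur.execute("SELECT value FROM sync_state WHERE key = 'cursor'")
    row = cur.fetchone()
    if row is None:
        cur.execute("SELECT COALESCE(MAX(last_block), 0) FROM positions")
        row = cur.fetchone()
    cursor = row[0]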

async def stream():
    uri = (f"wss://api.predmktdata.com/ws/stream"
           f"?x_api_key=YOUR_KEY&start_block={cursor}")

    async with websockets.connect(uri) as ws:
        async for msg in ws:
            batch = json.loads(msg)

            if batch["type"] == "ping":
                continue

            if batch["type"] == "caught_up":
                print(f"Caught up at block {batch['block']}")
                continue

            # batch["type"] == "batch"
            # Contains: from_block, to_block, and arrays for each table
            for table in ["order_filled_events", "positions",
                          "payout_redemptions", "position_splits",
                          "position_merges", "position_conversions"]:
                rows = batch.get(table, [])
                if rows:
                    insert_rows(conn, table, rows)

            # Update cursor
            new_block = batch["to_block"]
            with conn.cursor() as cur:
                cur.execute("""INSERT INTO sync_state (key, value)
                               VALUES ('cursor', %s)
                               ON CONFLICT (key) DO UPDATE SET value = %s""",
                            (new_block, new_block))
            conn.commit()

asyncio.run(stream())

The stream starts by catching up from your start_block — it rapidly sends batches of historical data until you're at the chain tip. Then it switches to live mode, pushing new events every few seconds as the indexer commits them.
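The client code above handles catch-up and live batches the same way, so the per-message logic can be pulled out of the WebSocket loop and tested on its own. The sketch below assumes the message shapes used in the client code (type, to_block, one array per table); the function name handle_message and the on_rows callback are illustrative.

```python
import json

TABLES = ["order_filled_events", "positions", "payout_redemptions",
          "position_splits", "position_merges", "position_conversions"]

def handle_message(raw: str, cursor: int, on_rows) -> int:
    """Process one stream message and return the (possibly advanced) cursor."""
    msg = json.loads(raw)
    kind = msg.get("type")
    if kind in ("ping", "caught_up"):
        return cursor  # control messages carry no rows
    if kind != "batch":
        return cursor  # ignore message types this sketch does not know about
    for table in TABLES:
        rows = msg.get(table, [])
        if rows:
            on_rows(table, rows)  # e.g. write the rows to your database
    # Never move the cursor backwards, even if a batch arrives out of order
    return max(cursor, msg["to_block"])

# Usage with made-up messages: collect rows in a list instead of a database
seen = []
cursor = handle_message('{"type": "ping"}', 10, lambda t, r: seen.append((t, r)))
cursor = handle_message(
    json.dumps({"type": "batch", "from_block": 11, "to_block": 15,
                "order_filled_events": [{"block_number": 12}], "positions": []}),
    cursor, lambda t, r: seen.append((t, r)))
print(cursor, seen)
```

Keeping this function free of network and database calls also makes it easy to wrap the WebSocket connection in a reconnect loop that resumes from the returned cursor.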

Handling positions (UPSERT, not INSERT)

Event tables are append-only — just INSERT. But positions are mutable. The same wallet can buy more of the same token, and its position row gets updated. Always use UPSERT:

INSERT INTO positions (user_address, token_id, amount, avg_price,
                       realized_pnl, total_bought, last_block)
VALUES (%s, %s, %s, %s, %s, %s, %s)
ON CONFLICT (user_address, token_id) DO UPDATE SET
    amount = EXCLUDED.amount,
    avg_price = EXCLUDED.avg_price,
    realized_pnl = EXCLUDED.realized_pnl,
    total_bought = EXCLUDED.total_bought,
    last_block = EXCLUDED.last_block;
Markets should also be replaced, not appended. Delete the old data and re-insert the fresh snapshot, or use UPSERT on condition_id.
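The sync loops in Step 4 call an insert_rows helper without defining it. One possible shape is sketched below, assuming rows arrive as dicts keyed by column name and conn is a psycopg2-style connection. The column lists come from the schema in Step 1; the names COLUMNS, UPSERT_KEYS, and build_sql are illustrative.

```python
EVENT_HEAD = ["timestamp", "transaction_hash", "block_number"]
COLUMNS = {
    "order_filled_events": EVENT_HEAD + [
        "maker", "taker", "maker_asset_id", "taker_asset_id",
        "maker_amount_filled", "taker_amount_filled", "fee", "side"],
    "payout_redemptions": EVENT_HEAD + ["redeemer", "condition_id", "payout"],
    "position_splits": EVENT_HEAD + ["stakeholder", "condition_id", "amount"],
    "position_merges": EVENT_HEAD + ["stakeholder", "condition_id", "amount"],
    "position_conversions": EVENT_HEAD + [
        "stakeholder", "condition_id", "index_set", "amount"],
    "positions": ["user_address", "token_id", "amount", "avg_price",
                  "realized_pnl", "total_bought", "last_block"],
}
UPSERT_KEYS = {"positions": ["user_address", "token_id"]}  # mutable tables only

def build_sql(table: str) -> str:
    """Build INSERT for append-only tables, INSERT ... ON CONFLICT for positions."""
    cols = COLUMNS[table]
    sql = (f"INSERT INTO {table} ({', '.join(cols)}) "
           f"VALUES ({', '.join(['%s'] * len(cols))})")
    keys = UPSERT_KEYS.get(table)
    if keys:
        updates = ", ".join(f"{c} = EXCLUDED.{c}" for c in cols if c not in keys)
        sql += f" ON CONFLICT ({', '.join(keys)}) DO UPDATE SET {updates}"
    return sql

def insert_rows(conn, table: str, rows: list) -> None:
    """Write rows without committing, so the caller can commit rows and cursor together."""
    cols = COLUMNS[table]
    params = []
    for row in rows:
        row = dict(row)
        if "block_timestamp" in row:  # /events name for the dump's timestamp
            row["timestamp"] = row.pop("block_timestamp")
        params.append(tuple(row.get(c) for c in cols))
    with conn.cursor() as cur:
        cur.executemany(build_sql(table), params)
```

Leaving the commit to the caller means the new rows and the updated sync cursor are saved in the same transaction, so a crash between the two cannot leave them out of step.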

Verify your data

After loading, run a quick sanity check to make sure your row counts match the source:

-- Compare your counts to the /status endpoint
SELECT
    'order_filled_events' as table_name, count(*) FROM order_filled_events
UNION ALL SELECT
    'positions', count(*) FROM positions
UNION ALL SELECT
    'payout_redemptions', count(*) FROM payout_redemptions
UNION ALL SELECT
    'position_splits', count(*) FROM position_splits
UNION ALL SELECT
    'position_merges', count(*) FROM position_merges
UNION ALL SELECT
    'position_conversions', count(*) FROM position_conversions;

Then compare the results to what GET /status returns. They should be within a day's worth of data.
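The comparison itself is simple arithmetic once both sets of counts are in hand. The sketch below takes plain dicts so it makes no assumption about the shape of the /status response; the function names, the counts, and the per-table tolerances are all made up for illustration.

```python
def count_drift(local: dict, remote: dict) -> dict:
    """Rows the source has that the local copy lacks (negative = local is ahead)."""
    return {t: remote[t] - n for t, n in local.items() if t in remote}

def flag_tables(drift: dict, max_behind: dict) -> list:
    """Tables that are ahead of the source or further behind than allowed."""
    return sorted(t for t, d in drift.items()
                  if d < 0 or d > max_behind.get(t, 0))

# Made-up counts: local from the SQL above, remote adapted from GET /status
local = {"order_filled_events": 864_200_000, "position_splits": 7_700_500}
remote = {"order_filled_events": 865_000_000, "position_splits": 7_700_000}
# Rough guess at one day's worth of rows per table (hypothetical values)
max_behind = {"order_filled_events": 1_500_000, "position_splits": 20_000}

drift = count_drift(local, remote)
print(flag_tables(drift, max_behind))  # ['position_splits']
```

A table that is ahead of the source usually means the same file or batch was loaded twice, which is worth checking because the append-only tables have no unique key to reject duplicates.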

Example queries on your new database

Now you have the full dataset. Here are some full-history queries that are awkward to answer through an API but take a few lines of SQL:

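-- Top 10 wallets by total trading volume (USDC), counted on the maker side.
-- Assumes asset ID '0' marks the USDC leg of a fill, as in Polymarket's
-- exchange contract; the other leg is outcome tokens, not dollars.
SELECT maker,
       SUM(CASE WHEN maker_asset_id = '0' THEN maker_amount_filled
                ELSE taker_amount_filled END) / 1e6 AS volume_usdc
FROM order_filled_events
GROUP BY maker
ORDER BY volume_usdc DESC
LIMIT 10;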

-- Wallets with the highest realized PnL
SELECT user_address, SUM(realized_pnl) / 1e6 AS total_pnl_usdc
FROM positions
GROUP BY user_address
ORDER BY total_pnl_usdc DESC
LIMIT 20;

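-- Daily trading volume over the last month.
-- Assumes asset ID '0' marks the USDC leg of a fill, as in Polymarket's
-- exchange contract; the other leg is outcome tokens, not dollars.
SELECT timestamp::date AS day,
       SUM(CASE WHEN maker_asset_id = '0' THEN maker_amount_filled
                ELSE taker_amount_filled END) / 1e6 AS volume_usdc,
       COUNT(*) AS trades
FROM order_filled_events
WHERE timestamp > now() - interval '30 days'
GROUP BY day
ORDER BY day;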

-- Find which market a position belongs to
SELECT p.user_address, p.amount / 1e6 AS amount_usdc,
       m.question, m.outcome_yes
FROM positions p
JOIN markets m ON m.yes_token_id = p.token_id
                OR m.no_token_id = p.token_id
WHERE p.user_address = '0x...'
ORDER BY amount_usdc DESC;

Production tips

Ready to build your database?

Sign up, get your API key, and start downloading. Full history since 2022.
