database - Very slow ingestion from python in QuestDB - Stack Overflow

I am starting with QuestDB. When I saw that the technology can ingest millions of rows per second, I thought: finally, I found what I need. I am trying the very simple example shown below. The system works for a few seconds, then it breaks with a connection failed error.

Can someone please help me understand what I am doing wrong? There are a lot of messages being streamed, but it's not even close to a million per second.

import os
import sys
from dataclasses import asdict
from typing import List

import pandas as pd
from polygon import WebSocketClient
from polygon.websocket.models import WebSocketMessage, EquityTrade
from questdb.ingress import Sender
from dotenv import load_dotenv
load_dotenv('../.env')

QDB_CLIENT_CONF=(
    f"{os.environ.get('QUESTDB_PROTOCOL')}::"
    f"addr={os.environ.get('QUESTDB_ADDR')}:{os.environ.get('QUESTDB_PORT')};"
    f"username={os.environ.get('QUESTDB_USERNAME')};password={os.environ.get('QUESTDB_PASSWORD')};"
)


def handle_message(messages: List[WebSocketMessage]):
    with Sender.from_conf(QDB_CLIENT_CONF) as sender:
        for message in messages:
            data = asdict(message)
            if isinstance(message, EquityTrade):
                symbols = {'event_type': 'T', 'symbol': data.pop('symbol')}
                # Convert timestamp fields to datetime objects
                for date_col in ['timestamp', 'trf_timestamp']:
                    if data[date_col] is not None:
                        data[date_col] = pd.to_datetime(data[date_col], unit='ms')

                # Convert conditions list to string if it exists
                if data['conditions'] is not None:
                    data['conditions'] = ','.join(map(str, data['conditions']))

                # Send data to QuestDB
                sender.row('equity_trade', symbols=symbols, columns=data, at=data['timestamp'])
                print(
                    f"{message.symbol.rjust(6)}\t"
                    f"{pd.Timestamp(message.timestamp, unit='ms')}\t"
                    f"{message.price:>12,.2f}\t"
                    f"{message.size:>8,d}"
                )

        sender.flush()


if __name__ == '__main__':
    try:
        client = WebSocketClient(
            api_key=os.getenv('POLYGON_API_KEY'),
            subscriptions=['T.*']
        )
        client.run(handle_message)
    except KeyboardInterrupt:
        print("\nShutting down...")
        sys.exit(0)

asked Mar 26 at 17:50 by Javier Ramirez

1 Answer


There are two big problems with the code above. The first is that Sender.from_conf(QDB_CLIENT_CONF) is being invoked for every batch of messages, establishing a new connection each time, which is very resource-consuming. You can instead open the connection once in the __main__ block of your script, so it gets reused.
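A minimal sketch of the reuse pattern: bind one long-lived sender into the callback instead of reconnecting per batch. The StubSender below is a stand-in I'm using only to show the wiring; in the real script you would pass the sender returned by Sender.from_conf(QDB_CLIENT_CONF), opened once in __main__.

```python
from typing import Callable, List

class StubSender:
    """Stand-in for questdb.ingress.Sender, used here only to show the
    wiring; it records rows instead of sending them over the network."""
    def __init__(self):
        self.rows = []

    def row(self, table, **kwargs):
        self.rows.append((table, kwargs))

def make_handler(sender) -> Callable[[List[dict]], None]:
    # Close over a single long-lived sender instead of opening a new
    # connection inside the callback for every batch of messages.
    def handle_message(messages: List[dict]) -> None:
        for data in messages:
            sender.row('equity_trade', columns=data)
    return handle_message

sender = StubSender()          # real script: Sender.from_conf(QDB_CLIENT_CONF)
handler = make_handler(sender)
handler([{'price': 1.0}, {'price': 2.0}])
# both rows went through the same sender instance
```

In the real script, wrapping the __main__ block in `with Sender.from_conf(QDB_CLIENT_CONF) as sender:` and passing `make_handler(sender)` to `client.run(...)` keeps one connection alive for the whole session and flushes pending rows automatically when the context manager exits.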

The second anti-pattern is calling flush() after every batch. With QuestDB it is more efficient to batch messages together. By default the client auto-flushes every 75K rows or every 1000 ms, whichever happens first. Unless you really need manual control, it is a best practice to rely on autoflush (its parameters can be configured if the defaults are not ideal for you) and to call flush() only once at the end of the script, to push any pending data before closing.
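If the defaults are not ideal, the autoflush thresholds can be set directly in the configuration string. A small sketch, assuming the client's auto_flush_rows / auto_flush_interval configuration keys; the build_conf helper itself is hypothetical, shown only to make the string format explicit:

```python
def build_conf(protocol: str, addr: str, port: int,
               auto_flush_rows: int = 75000,
               auto_flush_interval: int = 1000) -> str:
    # Assemble a QuestDB client configuration string; the auto_flush_*
    # keys override the client defaults (which the answer above states
    # as 75K rows / 1000 ms).
    return (
        f"{protocol}::addr={addr}:{port};"
        f"auto_flush_rows={auto_flush_rows};"
        f"auto_flush_interval={auto_flush_interval};"
    )

conf = build_conf('http', 'localhost', 9000)
# → "http::addr=localhost:9000;auto_flush_rows=75000;auto_flush_interval=1000;"
```

With autoflush tuned this way, the callback only needs to call sender.row(); the client batches and sends rows on its own schedule.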
