Examples and code snippets accompany each section of this documentation to guide you through common tasks.

Authenticating with the Snowflake Connector

Getting Started

Snowflake offers drivers that let you perform operations on Snowflake from application code. Implementations exist for many languages, including Python and Node.js. To get started, authenticate using either a password or a private key, whichever method you prefer.

Examples

Connecting with a Private Key
import snowflake.connector
from snowflake.connector.cursor import SnowflakeCursor

# Connect using private key
def build_snowflake_cursor() -> SnowflakeCursor:
    conn = snowflake.connector.connect(
        user=SNOWFLAKE_USER,
        account=ACCOUNT_NAME,
        private_key=SNOWFLAKE_PRIVATE_KEY,
        warehouse=SNOWFLAKE_WAREHOUSE,
        database=SNOWFLAKE_DATABASE,
        schema=SCHEMA,
        role=SNOWFLAKE_ROLE,
    )
    return conn.cursor()
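
The `private_key` argument above is expected to hold the key material itself rather than a file path. As a minimal sketch, assuming an unencrypted PKCS#8 PEM key (the `-----BEGIN PRIVATE KEY-----` form), the DER bytes can be recovered with the standard library alone. The helper name `pem_to_der` is hypothetical, and an encrypted key would first need to be decrypted with a library such as `cryptography`.

```python
import base64

PEM_HEADER = "-----BEGIN PRIVATE KEY-----"
PEM_FOOTER = "-----END PRIVATE KEY-----"


def pem_to_der(pem_text: str) -> bytes:
    """Return the DER bytes inside an unencrypted PKCS#8 PEM private key.

    Hypothetical helper: it only strips the PEM armor and base64-decodes the
    body, so it cannot handle encrypted keys.
    """
    lines = [line.strip() for line in pem_text.strip().splitlines()]
    if not lines or lines[0] != PEM_HEADER or lines[-1] != PEM_FOOTER:
        raise ValueError("expected an unencrypted PKCS#8 PEM private key")
    # Join the base64 body lines and decode them strictly.
    return base64.b64decode("".join(lines[1:-1]), validate=True)
```

The returned bytes could then be passed as `private_key=pem_to_der(pem_text)` in the call above.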
Connecting with a Password
import snowflake.connector
from snowflake.connector.cursor import SnowflakeCursor

# Connect using password
def build_snowflake_cursor() -> SnowflakeCursor:
    conn = snowflake.connector.connect(
        user=SNOWFLAKE_USER,
        account=ACCOUNT_NAME,
        password=PASSWORD,
        warehouse=SNOWFLAKE_WAREHOUSE,
        database=SNOWFLAKE_DATABASE,
        schema=SCHEMA,
        role=SNOWFLAKE_ROLE,
    )
    return conn.cursor()
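
The uppercase names in both examples (`SNOWFLAKE_USER`, `ACCOUNT_NAME`, and so on) are placeholders for your own values. One possible way to supply them, sketched here under the assumption that they live in environment variables, is to collect them into a dictionary of keyword arguments. The helper name `connection_params_from_env` and the variable names `SNOWFLAKE_ACCOUNT` and `SNOWFLAKE_PASSWORD` are illustrative assumptions.

```python
import os
from collections.abc import Mapping

# Assumed mapping from connect() keyword to environment variable name.
ENV_VARS = {
    "user": "SNOWFLAKE_USER",
    "account": "SNOWFLAKE_ACCOUNT",
    "warehouse": "SNOWFLAKE_WAREHOUSE",
    "database": "SNOWFLAKE_DATABASE",
    "schema": "SNOWFLAKE_SCHEMA",
    "role": "SNOWFLAKE_ROLE",
}


def connection_params_from_env(env: Mapping[str, str] = os.environ) -> dict[str, str]:
    """Build keyword arguments for snowflake.connector.connect() from env vars."""
    missing = [name for name in ENV_VARS.values() if not env.get(name)]
    if missing:
        raise KeyError(f"missing environment variables: {', '.join(missing)}")
    params = {key: env[name] for key, name in ENV_VARS.items()}
    # Password authentication is assumed here; pass private_key instead if preferred.
    password = env.get("SNOWFLAKE_PASSWORD")
    if password:
        params["password"] = password
    return params
```

With this in place, a connection could be opened as `snowflake.connector.connect(**connection_params_from_env())`.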
Once you have established a connection, you can run this simple query to check that you are properly connected:
def sample_query(cursor: SnowflakeCursor) -> list[tuple]:
    query = "select 1"
    return cursor.execute(query).fetchall()

Streaming Data

A Snowflake Stream records new and changed rows in a table or view, which makes it a convenient way to deliver that data from Snowflake to any storage destination. Getting started takes two steps.

Getting Started

  1. Create a stream on any table or view using a Snowflake command.
CREATE STREAM example_db.example_schema.example_stream ON TABLE example_db.example_schema.example_table;
  2. Consume the stream using any DML action. Below is an example of exporting the stream to a stage.

Use a Snowflake Connector

from datetime import UTC, datetime
from snowflake.connector import SnowflakeConnection, connect


class SnowflakeStreamExporter:
    def __init__(
        self, snowflake: SnowflakeConnection, database: str, schema: str, stream: str, stage: str
    ):
        self.snowflake = snowflake
        self.database = database
        self.schema = schema
        self.stream = stream
        self.stage = stage

    def export_to_stage(self):
        cur_time = datetime.now(UTC).strftime("%Y%m%d_%H_%M_%S")
        query = f"""
            COPY INTO {self.stage}/{self.schema}/{self.stream}_{cur_time}
            FROM (select object_construct(*) as data from {self.database}.{self.schema}.{self.stream})
            FILE_FORMAT = (TYPE = JSON COMPRESSION = 'GZIP');
        """

        # Create the cursor before the try block so it always exists in `finally`.
        cursor = self.snowflake.cursor()
        try:
            results = cursor.execute(query).fetchall()
            for result in results:
                print(result)
        except Exception as e:
            print(f"Error during COPY INTO operation: {e}")
        finally:
            cursor.close()


if __name__ == "__main__":

    database = "db_name"
    schema = "schema_name"
    stream = "stream_name"
    stage = "@db_name.schema_name.stage_name"

    conn = connect(
        user="username",
        account="account_name",
        database=database,
        schema=schema,
        role="role_name",
        warehouse="warehouse_name",

        # Authenticate using password OR private_key. You only need to use one, not both.
        password="password",
        # private_key="private_key",
    )
    exporter = SnowflakeStreamExporter(conn, database, schema, stream, stage)
    exporter.export_to_stage()
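
Before running the export, it can be useful to check whether the stream currently holds any change records, so that empty exports are skipped. The sketch below assumes Snowflake's `SYSTEM$STREAM_HAS_DATA` function and a cursor like the one used above; the helper name `stream_has_data` is hypothetical.

```python
def stream_has_data(cursor, database: str, schema: str, stream: str) -> bool:
    """Return True if the stream has change records waiting to be consumed."""
    fq_stream = f"{database}.{schema}.{stream}"
    # The stream name is bound as a string parameter (default pyformat style).
    cursor.execute("select system$stream_has_data(%s)", (fq_stream,))
    row = cursor.fetchone()
    return bool(row and row[0])
```

A caller might guard the export with `if stream_has_data(conn.cursor(), database, schema, stream): exporter.export_to_stage()`.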

Unloading data directly from Snowflake

If you wish to export data directly from Snowflake to an external storage location, the data unloading commands are a good way to do so. Below is an example of using SQL to unload data from a Snowflake table into Parquet files. To get started, follow these steps:
  1. Create an external stage which points to the cloud bucket storage location you wish to unload the files to.
  2. Use the COPY INTO <location> command to copy the data from the Snowflake table into one or more files in the storage destination.
Note that unloading data this way can incur egress costs, depending on the destination the data is unloaded to.
copy into @STAGE_NAME/destination/path/
    from (
    select
        block_number,
        timestamp,
        datetime,
        block_hash,
        block_parent_hash,
        nonce,
        sha3_uncles,
        logs_bloom,
        transactions_root,
        state_root,
        receipts_root,
        miner,
        mix_hash,
        extra_data,
        difficulty,
        total_difficulty,
        size,
        gas_limit,
        gas_used,
        transaction_count,
        base_fee_per_gas,
        is_reorg,
        date_created,
        date_updated
    from ethereum.blocks
    where datetime >= '2024-06-01' and datetime < '2024-07-01'
)
partition by (
    'year=' ||
    to_char(date(date_trunc(year, datetime)), 'YYYY') || '/' ||
    'month=' ||
    to_char(date(date_trunc(month, datetime)), 'MM') || '/' ||
    'day=' ||
    lpad(to_char(extract(day from datetime)), 2, '0') || '/' ||
    'hour=' ||
    lpad(to_char(extract(hour from datetime)), 2, '0')
)
file_format = (type = parquet)
header = true
max_file_size = 5242880000
detailed_output = true
;
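
To locate the files after the unload, it helps to know which path each row's `datetime` value maps to. The following sketch mirrors the `partition by` expression above in Python; the function name `partition_prefix` is hypothetical, and the result is only as accurate as that mirroring.

```python
from datetime import datetime


def partition_prefix(ts: datetime) -> str:
    """Mirror the SQL partition expression: year=YYYY/month=MM/day=DD/hour=HH."""
    return f"year={ts:%Y}/month={ts:%m}/day={ts:%d}/hour={ts:%H}"


print(partition_prefix(datetime(2024, 6, 1, 5, 30)))  # year=2024/month=06/day=01/hour=05
```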

Backfilling a Snowflake Schema using the Connector

Another option for exporting data from Snowflake to any destination is to use the Snowflake Connector. The example below demonstrates backfilling an entire Snowflake schema to local files, but you can easily adapt it to export the data to any destination you prefer. One advantage of using this method is that you will not incur egress costs, as querying data from Snowflake is free.

Example Code
import os

import pandas as pd
import snowflake.connector
        sf_cur.execute(table_sql)
        for row in sf_cur.fetchall():
            table_list.append(row[0].lower())

    return table_list


def write_local_file(sf_conn, schema: str, table_name: str, chunksize: int = 100000, output_dir: str = "output"):
    row_count = 0
    tgt_table_name = f"{schema.lower()}.{table_name.lower()}"
    os.makedirs(os.path.join(output_dir, table_name), exist_ok=True)

    read_sql = jinjafy_sql(READ_TEMPLATE, {"tgt_table": tgt_table_name})
    print(read_sql.strip())

    chunk_count = 0
    for df in pd.read_sql(read_sql, sf_conn, chunksize=chunksize):
        file_name = f"{output_dir}/{table_name}/{table_name}_{chunk_count}.csv"
        row_count += len(df)
        df.columns = map(str.lower, df.columns)
        print(f"Writing chunk {chunk_count} ({row_count:,} rows so far)")
        df.to_csv(file_name, index=False)
        chunk_count += 1

    return row_count


class SnowflakeBackfillExporter:
    def __init__(
        self,
        snowflake: snowflake.connector.SnowflakeConnection,
        chunksize: int = 100000,
        output_dir: str = "output",
    ):
        self.snowflake = snowflake
        self.database = os.getenv("SNOWFLAKE_DATABASE")
        self.schema = os.getenv("SNOWFLAKE_SCHEMA")
        self.chunksize = chunksize
        self.output_dir = output_dir

    def run_backfill(self):
        # Reuse the connection passed to the constructor instead of opening a second one.
        sf_conn = self.snowflake

        row_count = table_count = 0

        # Get list of tables in Snowflake to export
        table_list = get_snowflake_tables_in_schema(sf_conn, self.schema)
        print(f"Tables in {self.schema}: {table_list}")

        for table_name in table_list:
            table_count += 1
            table_row_count = 0

            # Query and write data to local file
            # Alter this to export to desired destination
            table_row_count = write_local_file(
                sf_conn, self.schema, table_name=table_name, chunksize=self.chunksize, output_dir=self.output_dir
            )

            row_count += table_row_count
            print(f"Exported {table_row_count:,} rows from {table_name}\n")

        print(f"Exported {row_count:,} rows from {table_count} tables")

        sf_conn.close()


if __name__ == "__main__":
    sf_conn = get_snowflake_connection()
    exporter = SnowflakeBackfillExporter(sf_conn)
    exporter.run_backfill()
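
The example calls `get_snowflake_tables_in_schema`, of which only the final lines appear above. A minimal sketch of how such a helper might look follows, assuming the table names come from `information_schema.tables` and that `sf_conn` is an open connector connection. The query text and the `TABLE_SQL` name are assumptions, not the original code.

```python
# Assumed query: list base tables in one schema of the current database.
TABLE_SQL = """
    select table_name
    from information_schema.tables
    where table_schema = %s and table_type = 'BASE TABLE'
    order by table_name
"""


def get_snowflake_tables_in_schema(sf_conn, schema: str) -> list[str]:
    """Return lower-cased table names found in the given schema."""
    table_list = []
    with sf_conn.cursor() as sf_cur:
        # Unquoted identifiers are stored in upper case, so match on that.
        sf_cur.execute(TABLE_SQL, (schema.upper(),))
        for row in sf_cur.fetchall():
            table_list.append(row[0].lower())

    return table_list
```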
Notes
  1. The DATE_CREATED/DATE_UPDATED fields in our Snowflake tables allow users to know when any update has happened.
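
One way to use these fields is an incremental export that reads only rows changed since the previous run. The sketch below builds such a query; the function name `incremental_query`, the watermark handling, and the identifier check are assumptions made for illustration.

```python
import re
from datetime import datetime

# Accepts up to three dot-separated unquoted identifiers (db.schema.table).
_IDENTIFIER = re.compile(r"^[A-Za-z_][A-Za-z0-9_$]*(\.[A-Za-z_][A-Za-z0-9_$]*){0,2}$")


def incremental_query(table: str, last_run: datetime) -> tuple[str, tuple[datetime]]:
    """Return SQL and bind parameters selecting rows updated after last_run."""
    # Table names cannot be bound as parameters, so validate before formatting.
    if not _IDENTIFIER.match(table):
        raise ValueError(f"unexpected table name: {table!r}")
    sql = f"select * from {table} where date_updated > %s order by date_updated"
    return sql, (last_run,)
```

The result could be run with `cursor.execute(*incremental_query("ethereum.blocks", last_run))`, storing the largest `date_updated` seen as the watermark for the next run.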