پرش به مطلب اصلی

دیتا کاتالوگ چیست؟

دیتا کاتالوگ (Data Catalog) یک ابزار است که به سازمان‌ها کمک می‌کند تا داده‌های خود را به‌طور منظم و سازمان‌یافته ذخیره، شناسایی و مدیریت کنند. این کاتالوگ می‌تواند شامل متادیتا (اطلاعات اضافی درباره داده‌ها) باشد که به کاربران این امکان را می‌دهد به‌راحتی به داده‌ها دسترسی پیدا کنند، کیفیت داده‌ها را ارزیابی کنند، و از آن‌ها در تصمیم‌گیری‌های خود استفاده کنند.

از ویژگی‌های اصلی دیتا کاتالوگ می‌توان به موارد زیر اشاره کرد:

  • شناسایی و کشف داده‌ها: جست‌وجو میان داده‌ها و منابع مختلف
  • مدیریت متادیتا: ذخیره‌ی اطلاعاتی مانند نوع داده‌ها، فرمت‌ها، ساختار و تاریخ تغییرات داده
  • دسترس‌پذیری و همکاری: به‌اشتراک گذاری داده‌ها برای همکاری بین‌تیمی ساده‌تر

S3 Data Catalog آروان یک دیتا کاتالوگ مدیریت‌شده بر پایه‌ی Apache Iceberg است که به‌طور مستقیم داده‌ها و متادیتای آن‌ها را در داخل صندوقچه‌های فضای ابری ذخیره می‌کند. این سرویس یک رابط استاندارد Iceberg REST ارایه می‌دهد تا بتوانید از موتورهایی که پیش‌تر استفاده می‌کردید (مانند Spark و PyIceberg) به آن متصل شوید و از صندوقچه‌های خود مثل یک دیتابیس با ساختاری انعطاف‌پذیر استفاده کنید.

دیتا کاتالوگ آروان‌کلاد به راحتی امکان تبدیل صندوقچه‌های فضای ابری را به یک انبار داده (Data Warehouse) برای انواع پردازش‌های کاری تحلیلی از جمله تحلیل لاگ‌ها، هوش تجاری و پایپلاین‌های داده فراهم می‌کند.

Apache Iceberg چیست؟

Apache Iceberg یک فرمت جدول باز (Open Table Format) است که برای مدیریت داده‌های تحلیلی با مقیاس بزرگ که در فضای ابری (Object Storage) ذخیره شده‌اند طراحی شده است.

ویژگی‌های کلیدی آن موارد زیر است:

  • تراکنش‌های ACID: اطمینان از خواندن و نوشتن هم‌زمان و قابل اعتماد با یکپارچگی کامل داده‌ها.
  • متادیتای بهینه‌شده: جلوگیری از اسکن‌های کامل جدول با استفاده از متادیتای ایندکس‌شده برای اجرای سریع‌تر کوئری‌ها.
  • انعطاف‌پذیری ساختار داده (Schema Evolution): امکان اضافه کردن، تغییر نام دادن و حذف ستون‌ها بدون نیاز به بازنویسی داده‌ها.

Iceberg در حال حاضر به‌طور گسترده‌ای توسط موتورهایی مانند Apache Spark ،Trino ،Snowflake ،DuckDB و ClickHouse پشتیبانی می‌شود.

چرا به دیتا کاتالوگ نیاز دارید؟

اگرچه دیتا و متادیتای Iceberg به‌طور مستقیم در فضای ابری ذخیره می‌شود، لیست جدول‌ها و اشاره‌گرهای مربوط به متادیتای جاری باید به‌شکل مرکزی توسط یک دیتا کاتالوگ مدیریت شوند.

به‌طور مشابه، دیتا کاتالوگ‌ها با ایجاد دسترسی هماهنگ و منسجم این امکان را برای موتورهای جست‌وجو مختلف فراهم می‌کند تا به‌طور ایمن از همان جدول‌ها بخوانند و در آن‌ها بنویسند بدون این‌که تداخلی در داده‌ها یا خرابی اطلاعات ایجاد شود.

فعال‌سازی دیتا کاتالوگ آروان‌کلاد

برای استفاده از دیتا کاتالوگ آروان، وارد پنل کاربری و بخش فضای ابری شوید. سپس روی قابلیت «دیتا کاتالوگ» کلیک کنید و در این صفحه، دکمه‌ی «دیتا کاتالوگ» را بزنید.

در صفحه‌ی جدید، باید مشخصات دیتا کاتالوگ را وارد کنید.

ابتدا یک نام برای کاتالوگ تعیین کنید و سپس صندوقچه‌ی آن را انتخاب کنید. اگر از پیش صندوقچه‌ای ندارید، در همین صفحه می‌توانید یک صندوقچه‌ی جدید بسازید. هم‌چنین برای بخش کلیدهای دسترسی، باید حداقل یک کلید دسترسی ماشین‌یوزر ایجاد کرده باشید. راهنمای ماشین‌یوزر به شما در این مسیر کمک می‌کند.

با کلیک روی دکمه‌ی «ایجاد» کاتالوگ ساخته شده و می‌توانید با استفاده از مقادیر Catalog URI ،Warehouse Name و کلید دسترسی خود ابزارهایتان را پیکربندی کنید.

اکنون می‌توانید با هر ابزاری که فرمت جدول‌باز را پشتیبانی کند از کاتالوگ استفاده کنید. مانند:

  • PySpark
  • PyIceberg
  • DuckDB
  • Trino

ابزارهای پردازش داده

از محبوب‌ترین ابزارهای پردازش داده که می‌‌توانید برای استفاده از دیتا کاتالوگ انتخاب کنید، PyIceberg و PySpark هستند.

استفاده از PyIceberg برای اتصال به دیتا کاتالوگ

پیش از شروع نیاز است کتاب‌خانه‌های زیر را نصب کنید:

  • s3fs

  • PyIceberg

    برای نصب از دستور زیر استفاده کنید:

    pip install pyiceberg
  • PyArrow

    برای نصب از دستور زیر استفاده کنید:

    pip install pyarrow

نمونه کد استفاده از PyIceberg

برای اتصال به دیتا کاتالوگ و ایجاد جدول‌های Iceberg می‌توانید از نمونه‌کد زیر استفاده کنید:

import pyarrow as pa
from pyiceberg.catalog.rest import RestCatalog
from pyiceberg.exceptions import NamespaceAlreadyExistsError, TableAlreadyExistsError
import string
import random
# Define catalog connection details
WAREHOUSE = "<WAREHOUSE_NAME>"
CATALOG_URI = "https://<WAREHOUSE_NAME>.bucketcatalog.arvanstorage.ir/iceberg/"
TOKEN = "apikey XXXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX"
ACCESS_KEY = "XXXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX"
SECRET_ACCESS_KEY = "XXXXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX"
# This configuration can be adjusted based on your needs. For large batch requests,
# if you encounter timeouts, you can increase the timeout values.
S3_CONNECT_TIMEOUT = "30"
S3_REQUEST_TIMEOUT = "30"
# Connect to ArvanCloud Data Catalog
catalog = RestCatalog(
name="my_catalog",
uri= CATALOG_URI,
warehouse= WAREHOUSE,
token=TOKEN,
**{
"s3.connect-timeout": S3_CONNECT_TIMEOUT,
"s3.request-timeout": S3_REQUEST_TIMEOUT,
"type": "rest",
"s3.access-key-id": ACCESS_KEY,
"s3.secret-access-key": SECRET_ACCESS_KEY
}
)
# Create arvan namespace if it doesn't exist
try:
catalog.create_namespace("arvan")
except NamespaceAlreadyExistsError:
pass
# Define table schema
schema = pa.schema([
("id", pa.int64()),
("name", pa.string())
])
# Create empty Iceberg table
test_table = ("arvan", "my_table")
try:
table = catalog.create_table(
test_table,
schema=schema,
)
except TableAlreadyExistsError:
table = catalog.load_table(test_table)
# Create PyArrow table with data
data = pa.table({
"id": [1, 2, 3, 4, 5],
"name": ["Rostam", "Sohrab", "Esfandiar", "Kaykhosrow", "Siyavash"],
})
# Write data to the table
with table.transaction() as txn:
txn.append(data)
# Load the existing Iceberg table
table = catalog.load_table(test_table)
# Read data from table with limit
result = table.scan(limit=10).to_arrow() # Removed row_filter=None
# Print results
print(result)

استفاده از PySpark برای اتصال به دیتا کاتالوگ

پیش از شروع نیاز است کتاب‌خانه‌های زیر را نصب کنید:

  • Java 11

  • PySpark

    برای نصب از دستور زیر استفاده کنید:

    pip install pyspark==3.5.0

نمونه کد استفاده از PySpark

برای اتصال به دیتا کاتالوگ و اجرای کوئری‌ها می‌توانید از نمونه‌کد زیر استفاده کنید:

from pyspark.sql import SparkSession
import uuid
import time
import random
import string
import time
# Define catalog connection details
WAREHOUSE = "<WAREHOUSE_NAME>"
CATALOG_URI = "https://<WAREHOUSE_NAME>.bucketcatalog.arvanstorage.ir/iceberg/"
TOKEN = "apikey XXXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX"
# Initialize Spark session with Iceberg configurations
spark = SparkSession.builder \
.appName("IcebergDataCatalogExample") \
.config('spark.jars.packages',
'org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.6.1,'
'org.apache.iceberg:iceberg-aws-bundle:1.6.1') \
.config("spark.sql.extensions",
"org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
.config("spark.sql.catalog.my_catalog", "org.apache.iceberg.spark.SparkCatalog") \
.config("spark.sql.catalog.my_catalog.type", "rest") \
.config("spark.sql.catalog.my_catalog.uri", CATALOG_URI) \
.config("spark.sql.catalog.my_catalog.warehouse", WAREHOUSE) \
.config("spark.sql.catalog.my_catalog.token", TOKEN) \
.getOrCreate()
try:
# Step 1: Use catalog and create schema
print("Step 1: Creating schema")
spark.sql("USE my_catalog")
spark.sql(f"CREATE NAMESPACE IF NOT EXISTS default")

print("Available schemas:")
schemas = spark.sql("SHOW NAMESPACES IN my_catalog").collect()
for row in schemas:
print(f" - {row['namespace']}")
# Step 2: Create table
print("\nStep 2: Creating table")
spark.sql(f"""
CREATE TABLE IF NOT EXISTS default.my_table (
id BIGINT,
name STRING
)
USING iceberg
""")
tables = spark.sql(f"SHOW TABLES IN default").collect()
print("Tables in schema:")
for row in tables:
print(f" - {row['tableName']}")
# Create a simple DataFrame
df = spark.createDataFrame(
[(1, "Rostam"), (2, "Sohrab"), (3, "Esfandiar"),(4, "Kaykhosrow"),(5, "Siyavash"),(6, "Faramarz")],
["id", "name"]
)
# Write the DataFrame to the Iceberg table
df.write \
.format("iceberg") \
.mode("append") \
.save("default.my_table")


# Read the data back from the Iceberg table
result_df = spark.read \
.format("iceberg") \
.load("default.my_table")

data_df = result_df.limit(5)
# Show the data
data = data_df.collect() # Collect the data from your DataFrame

for row in data:
# Print each row's data into the file
print(f"ID: {row['id']}, Name: {row['name']}\n")
except Exception as e:
print(f"Error occurred: {str(e)}")