
Data Engineering with Python

Chapter 2. Building Infrastructure

Make build (docker build python image & docker compose up)

์ฑ…์—์„œ๋Š” Airflow, NiFi, PostgreSQL, Elasticsearch, Kibana, Kafka, Spark ๋“ฑ ๋ชจ์กฐ๋ฆฌ ๋‹ค ๋กœ์ปฌ ํ™˜๊ฒฝ์—์„œ ์„ค์น˜ํ•ด์„œ ์‹ค์Šตํ•œ๋‹ค. ํ•˜์ง€๋งŒ ์ด๋Š” ๋‚ด๊ฐ€ ์•„์ฃผ ์‹ซ์–ดํ•˜๋Š” ์ƒํ™ฉ์ด๋ฏ€๋กœ ๋‹น์—ฐํ•˜๊ฒŒ Docker๋ฅผ ํ™œ์šฉํ•ด์„œ ํ™˜๊ฒฝ์„ ๊ตฌ์ถ•ํ–ˆ๋‹ค.

๋ฒ„์ „ ๊ด€๋ฆฌ

Software / hardware      Version / image
Python                   3.12.8
NiFi                     apache/nifi:1.28.0
PostgreSQL               postgres:13
Elasticsearch            elasticsearch:7.17.28
Kibana                   kibana:7.17.28

app-py3.12 โœ˜ {seilylook} ๐Ÿ€ make build
==============================================
Exporting Python dependencies to requirements.txt...
==============================================
poetry export -f requirements.txt --output requirements.txt --without-hashes --with dev
Warning: poetry-plugin-export will not be installed by default in a future version of Poetry.
In order to avoid a breaking change and make your automation forward-compatible, please install poetry-plugin-export explicitly. See https://python-poetry.org/docs/plugins/#using-plugins for details on how to install a plugin.
To disable this warning run 'poetry config warnings.export false'.


==============================================
Building Docker image python-app:latest...
==============================================
docker build -t python-app:latest .
[+] Building 1.7s (20/20) FINISHED                                                                                               docker:desktop-linux
 => [internal] load build definition from Dockerfile                                                                                             0.0s
 => => transferring dockerfile: 1.14kB                                                                                                           0.0s
 => [internal] load metadata for docker.io/library/python:3.12-slim                                                                              1.5s
 => [auth] library/python:pull token for registry-1.docker.io                                                                                    0.0s
 => [internal] load .dockerignore                                                                                                                0.0s
 => => transferring context: 2B                                                                                                                  0.0s
 => [internal] load build context                                                                                                                0.0s
 => => transferring context: 33.38kB                                                                                                             0.0s
 => [builder 1/5] FROM docker.io/library/python:3.12-slim@sha256:aaa3f8cb64dd64e5f8cb6e58346bdcfa410a108324b0f28f1a7cc5964355b211                0.0s
 => CACHED [stage-1  2/10] RUN apt-get update &&     apt-get install -y --no-install-recommends     default-jdk     procps     wget     libpq5   0.0s
 => CACHED [stage-1  3/10] WORKDIR /app                                                                                                          0.0s
 => CACHED [builder 2/5] WORKDIR /app                                                                                                            0.0s
 => CACHED [builder 3/5] RUN apt-get update &&     apt-get install -y --no-install-recommends     build-essential     libpq-dev     python3-dev  0.0s
 => CACHED [builder 4/5] COPY requirements.txt .                                                                                                 0.0s
 => CACHED [builder 5/5] RUN pip wheel --no-cache-dir --no-deps --wheel-dir /app/wheels -r requirements.txt                                      0.0s
 => CACHED [stage-1  4/10] COPY --from=builder /app/wheels /wheels                                                                               0.0s
 => CACHED [stage-1  5/10] COPY --from=builder /app/requirements.txt .                                                                           0.0s
 => CACHED [stage-1  6/10] RUN pip install --no-cache /wheels/*                                                                                  0.0s
 => [stage-1  7/10] COPY src/ src/                                                                                                               0.0s
 => [stage-1  8/10] COPY tests/ tests/                                                                                                           0.0s
 => [stage-1  9/10] COPY data/ data/                                                                                                             0.0s
 => [stage-1 10/10] COPY conf/ conf/                                                                                                             0.0s
 => exporting to image                                                                                                                           0.0s
 => => exporting layers                                                                                                                          0.0s
 => => writing image sha256:a7003da4b1bb9a04c9d98b7a386e9c28c84efe83df2ae20e99abc16cedf9e3fb                                                     0.0s
 => => naming to docker.io/library/python-app:latest                                                                                             0.0s

View build details: docker-desktop://dashboard/build/desktop-linux/desktop-linux/4go6dhhdb6ahlu1m1xmndgsou

What's next:
    View a summary of image vulnerabilities and recommendations โ†’ docker scout quickview 


==============================================
Constructing Docker Containers...
==============================================
docker compose up -d
WARN[0000] The "AIRFLOW_UID" variable is not set. Defaulting to a blank string. 
WARN[0000] The "AIRFLOW_UID" variable is not set. Defaulting to a blank string. 
WARN[0000] /Users/seilylook/Development/Book/Data_Engineering_with_Python/docker-compose.yml: the attribute `version` is obsolete, it will be ignored, please remove it to avoid potential confusion 
[+] Running 10/10
 โœ” Container postgres           Healthy                                                                                                          4.3s 
 โœ” Container elasticsearch      Healthy                                                                                                          3.6s 
 โœ” Container redis              Healthy                                                                                                          4.3s 
 โœ” Container kibana             Running                                                                                                          0.0s 
 โœ” Container airflow-init       Exited                                                                                                           7.8s 
 โœ” Container python-app         Started                                                                                                          3.8s 
 โœ” Container airflow-triggerer  Running                                                                                                          0.0s 
 โœ” Container airflow-webserver  Running                                                                                                          0.0s 
 โœ” Container airflow-scheduler  Running                                                                                                          0.0s 
 โœ” Container airflow-worker     Running                                                                                                          0.0s 


==============================================
Waiting for PostgreSQL to start...
==============================================
=====================================
Initializing PostgreSQL...
=====================================
chmod +x ./scripts/init_postgresql.sh
./scripts/init_postgresql.sh
dataengineering ๋ฐ์ดํ„ฐ๋ฒ ์ด์Šค ์ƒ์„ฑ ์ค‘...
ERROR:  database "dataengineering" already exists
Successfully copied 2.05kB to postgres:/tmp/create_tables.sql
ํ…Œ์ด๋ธ” ์ƒ์„ฑ ์ค‘...
psql:/tmp/create_tables.sql:10: NOTICE:  relation "users" already exists, skipping
CREATE TABLE
        List of relations
 Schema | Name  | Type  |  Owner  
--------+-------+-------+---------
 public | users | table | airflow
(1 row)

                                    Table "public.users"
 Column |          Type          | Collation | Nullable |              Default              
--------+------------------------+-----------+----------+-----------------------------------
 id     | integer                |           | not null | nextval('users_id_seq'::regclass)
 name   | character varying(100) |           | not null | 
 street | character varying(200) |           |          | 
 city   | character varying(100) |           |          | 
 zip    | character varying(10)  |           |          | 
 lng    | numeric(10,6)          |           |          | 
 lat    | numeric(10,6)          |           |          | 
Indexes:
    "users_pkey" PRIMARY KEY, btree (id)

๊ถŒํ•œ ๋ถ€์—ฌ ์ค‘...
GRANT
๋ฐ์ดํ„ฐ๋ฒ ์ด์Šค์™€ ํ…Œ์ด๋ธ”์ด ์„ฑ๊ณต์ ์œผ๋กœ ์ƒ์„ฑ๋˜์—ˆ์Šต๋‹ˆ๋‹ค.
๋ฐ์ดํ„ฐ๋ฒ ์ด์Šค ์—ฐ๊ฒฐ ํ…Œ์ŠคํŠธ ์ค‘...
/var/run/postgresql:5432 - accepting connections
๋ฐ์ดํ„ฐ๋ฒ ์ด์Šค๊ฐ€ ์ •์ƒ์ ์œผ๋กœ ์‘๋‹ตํ•ฉ๋‹ˆ๋‹ค.




==============================================
Waiting for Elasticsearch to start...
==============================================
=====================================
Initializing Elasticsearch...
=====================================
chmod +x ./scripts/init_elasticsearch.sh
./scripts/init_elasticsearch.sh
Elasticsearch๊ฐ€ ์ค€๋น„๋  ๋•Œ๊นŒ์ง€ ๋Œ€๊ธฐ ์ค‘...
Elasticsearch๊ฐ€ ์ค€๋น„๋˜์—ˆ์Šต๋‹ˆ๋‹ค.
users ์ธ๋ฑ์Šค ์ƒ์„ฑ ์ค‘...
{"error":{"root_cause":[{"type":"resource_already_exists_exception","reason":"index [users/AW57UPmxTYyW3G-GdL6lHw] already exists","index_uuid":"AW57UPmxTYyW3G-GdL6lHw","index":"users"}],"type":"resource_already_exists_exception","reason":"index [users/AW57UPmxTYyW3G-GdL6lHw] already exists","index_uuid":"AW57UPmxTYyW3G-GdL6lHw","index":"users"},"status":400}users ์ธ๋ฑ์Šค๊ฐ€ ์„ฑ๊ณต์ ์œผ๋กœ ์ƒ์„ฑ๋˜์—ˆ์Šต๋‹ˆ๋‹ค.
์ธ๋ฑ์Šค ๋ชฉ๋ก ํ™•์ธ:
health status index                                                              uuid                   pri rep docs.count docs.deleted store.size pri.store.size dataset.size
green  open   .internal.alerts-transform.health.alerts-default-000001            MSc-VAFyQHG9tGk2TxwbGg   1   0          0            0       249b           249b         249b
green  open   .internal.alerts-observability.logs.alerts-default-000001          Bief08evQ_SX_3UrwZOzwQ   1   0          0            0       249b           249b         249b
green  open   .internal.alerts-observability.uptime.alerts-default-000001        N-ptFAoNTYeF7OHGtOqZFw   1   0          0            0       249b           249b         249b
green  open   .internal.alerts-ml.anomaly-detection.alerts-default-000001        JL9JINenTS-XMJClxKXrYA   1   0          0            0       249b           249b         249b
green  open   .internal.alerts-observability.slo.alerts-default-000001           rFwL_h62QZmQx8kkXFZIyA   1   0          0            0       249b           249b         249b
green  open   .internal.alerts-default.alerts-default-000001                     6xFM1NqvTc--9BJ7MlhCpA   1   0          0            0       249b           249b         249b
green  open   .internal.alerts-observability.apm.alerts-default-000001           D1xM5G2DTPeF25kkR6_KWQ   1   0          0            0       249b           249b         249b
green  open   users                                                              AW57UPmxTYyW3G-GdL6lHw   1   0       1000            0    229.1kb        229.1kb      229.1kb
green  open   .internal.alerts-observability.metrics.alerts-default-000001       IO4Li-8sS6-AH8aSsvEqLg   1   0          0            0       249b           249b         249b
green  open   .internal.alerts-ml.anomaly-detection-health.alerts-default-000001 95sA140IQ2qCyErtmHKTEg   1   0          0            0       249b           249b         249b
green  open   .internal.alerts-observability.threshold.alerts-default-000001     TV3IWC9fQUuYWQO-56_5sA   1   0          0            0       249b           249b         249b
green  open   .internal.alerts-security.alerts-default-000001                    VE-qcLiURZy7h_i1hb96Lw   1   0          0            0       249b           249b         249b
green  open   .internal.alerts-stack.alerts-default-000001                       _PEhTLb4TJ2bYAwiv6kz8w   1   0          0            0       249b           249b         249b

Make test

app-py3.12 {seilylook} ๐Ÿ€ make test 
=======================
Running tests with pytest...
=======================
mkdir -p target
docker run --rm -v /Users/seilylook/Development/Book/Data_Engineering_with_Python/app/target:/app/target python-app:latest /bin/bash -c \
                'for test_file in $(find tests -name "*.py" ! -name "__init__.py"); do \
                        base_name=$(basename $test_file .py); \
                        pytest $test_file --junitxml=target/$base_name.xml; \
                done'
============================= test session starts ==============================
platform linux -- Python 3.12.9, pytest-8.3.4, pluggy-1.5.0
rootdir: /app
plugins: Faker-36.1.1, time-machine-2.16.0, anyio-4.8.0
collected 7 items

tests/test_progress_bar.py .......                                       [100%]

------------ generated xml file: /app/target/test_progress_bar.xml -------------
============================== 7 passed in 0.06s ===============================
============================= test session starts ==============================
platform linux -- Python 3.12.9, pytest-8.3.4, pluggy-1.5.0
rootdir: /app
plugins: Faker-36.1.1, time-machine-2.16.0, anyio-4.8.0
collected 9 items

tests/test_data_generator.py .........                                   [100%]

----------- generated xml file: /app/target/test_data_generator.xml ------------
============================== 9 passed in 0.03s ===============================
============================= test session starts ==============================
platform linux -- Python 3.12.9, pytest-8.3.4, pluggy-1.5.0
rootdir: /app
plugins: Faker-36.1.1, time-machine-2.16.0, anyio-4.8.0
collected 0 items

----------------- generated xml file: /app/target/conftest.xml -----------------
============================ no tests ran in 0.00s =============================
============================= test session starts ==============================
platform linux -- Python 3.12.9, pytest-8.3.4, pluggy-1.5.0
rootdir: /app
plugins: Faker-36.1.1, time-machine-2.16.0, anyio-4.8.0
collected 7 items

tests/test_postgres_connector.py .......                                 [100%]

--------- generated xml file: /app/target/test_postgres_connector.xml ----------
============================== 7 passed in 0.20s ===============================
============================= test session starts ==============================
platform linux -- Python 3.12.9, pytest-8.3.4, pluggy-1.5.0
rootdir: /app
plugins: Faker-36.1.1, time-machine-2.16.0, anyio-4.8.0
collected 9 items

tests/test_database_config.py .........                                  [100%]

----------- generated xml file: /app/target/test_database_config.xml -----------
============================== 9 passed in 0.02s ===============================
============================= test session starts ==============================
platform linux -- Python 3.12.9, pytest-8.3.4, pluggy-1.5.0
rootdir: /app
plugins: Faker-36.1.1, time-machine-2.16.0, anyio-4.8.0
collected 14 items

tests/test_database_pytest.py ..............                             [100%]

----------- generated xml file: /app/target/test_database_pytest.xml -----------
============================== 14 passed in 0.19s ==============================
============================= test session starts ==============================
platform linux -- Python 3.12.9, pytest-8.3.4, pluggy-1.5.0
rootdir: /app
plugins: Faker-36.1.1, time-machine-2.16.0, anyio-4.8.0
collected 8 items

tests/test_elasticsearch_connector.py ........                           [100%]

------- generated xml file: /app/target/test_elasticsearch_connector.xml -------
============================== 8 passed in 0.18s ===============================

Make start

app-py3.12 {seilylook} ๐Ÿ€ make start
=========================
Starting the application...
=========================
python -m src.main
2025-02-28 17:34:59,269 - root - INFO - ๋ฐ์ดํ„ฐ ์ƒ์„ฑ ๋ฐ ์ €์žฅ ํ”„๋กœ์„ธ์Šค ์‹œ์ž‘
2025-02-28 17:34:59,269 - root - INFO - ๋ฐ์ดํ„ฐ์…‹์ด ์ด๋ฏธ ์กด์žฌํ•ฉ๋‹ˆ๋‹ค: data/raw/test_data.csv
2025-02-28 17:34:59,285 - src.utils.connection - INFO - PostgreSQL ์—ฐ๊ฒฐ ์„ฑ๊ณต!
2025-02-28 17:34:59,286 - src.utils.connection - INFO - Elasticsearch ํด๋ผ์ด์–ธํŠธ ์ƒ์„ฑ ์™„๋ฃŒ
2025-02-28 17:34:59,290 - elastic_transport.transport - INFO - GET http://localhost:9200/ [status:200 duration:0.004s]
2025-02-28 17:34:59,290 - src.utils.connection - INFO - Elasticsearch ์—ฐ๊ฒฐ ์„ฑ๊ณต! ๋ฒ„์ „: 8.17.2
2025-02-28 17:34:59,290 - root - INFO - Postgresql ์ƒํƒœ: ์—ฐ๊ฒฐ๋จ
2025-02-28 17:34:59,290 - root - INFO - Elasticsearch ์ƒํƒœ: ์—ฐ๊ฒฐ๋จ
2025-02-28 17:34:59,295 - root - INFO - PostgreSQL: 1000๊ฐœ ๋ ˆ์ฝ”๋“œ๋ฅผ data/raw/test_data.csv์—์„œ ์ฝ์—ˆ์Šต๋‹ˆ๋‹ค
2025-02-28 17:34:59,330 - src.database.repository - INFO - Bulk inserted 1000 users
2025-02-28 17:34:59,331 - root - INFO - PostgreSQL์— 1000๊ฐœ ๋ ˆ์ฝ”๋“œ ์ €์žฅ ์™„๋ฃŒ
2025-02-28 17:34:59,331 - root - INFO - PostgreSQL์— 1000๊ฐœ ๋ ˆ์ฝ”๋“œ ์ €์žฅ๋จ
2025-02-28 17:34:59,333 - root - INFO - Elasticsearch: 1000๊ฐœ ๋ ˆ์ฝ”๋“œ๋ฅผ data/raw/test_data.csv์—์„œ ์ฝ์—ˆ์Šต๋‹ˆ๋‹ค
2025-02-28 17:34:59,335 - src.utils.connection - INFO - Elasticsearch ํด๋ผ์ด์–ธํŠธ ์ƒ์„ฑ ์™„๋ฃŒ
2025-02-28 17:34:59,408 - elastic_transport.transport - INFO - PUT http://localhost:9200/_bulk?refresh=true [status:200 duration:0.068s]
2025-02-28 17:34:59,410 - src.database.repository - INFO - Elasticsearch์— 1000๊ฐœ ๋ฌธ์„œ ๋ฒŒํฌ ์ €์žฅ ์™„๋ฃŒ
2025-02-28 17:34:59,410 - root - INFO - Elasticsearch์— 1000๊ฐœ ๋ ˆ์ฝ”๋“œ ์ €์žฅ ์™„๋ฃŒ
2025-02-28 17:34:59,410 - root - INFO - Elasticsearch์— 1000๊ฐœ ๋ ˆ์ฝ”๋“œ ์ €์žฅ๋จ
2025-02-28 17:34:59,414 - root - INFO - PostgreSQL์—์„œ 5๊ฐœ ๋ ˆ์ฝ”๋“œ ์กฐํšŒ ์™„๋ฃŒ
2025-02-28 17:34:59,414 - root - INFO - PostgreSQL ๋ฐ์ดํ„ฐ ํ™•์ธ (์ƒ˜ํ”Œ 5๊ฐœ):
2025-02-28 17:34:59,414 - root - INFO -   ๋ ˆ์ฝ”๋“œ 1: {'id': 1, 'name': 'Whitney Olson', 'street': '1791 Pittman Overpass', 'city': 'Lake Jason', 'zip': '48870', 'lng': Decimal('114.735089'), 'lat': Decimal('45.235433')}
2025-02-28 17:34:59,414 - root - INFO -   ๋ ˆ์ฝ”๋“œ 2: {'id': 2, 'name': 'David Smith', 'street': '0474 Julian Station', 'city': 'West Sophia', 'zip': '72976', 'lng': Decimal('94.204753'), 'lat': Decimal('-88.761862')}
2025-02-28 17:34:59,414 - root - INFO -   ๋ ˆ์ฝ”๋“œ 3: {'id': 3, 'name': 'Mr. Jason Hughes MD', 'street': '7351 Robinson Underpass', 'city': 'Stephaniebury', 'zip': '8702', 'lng': Decimal('-87.282108'), 'lat': Decimal('12.763472')}
2025-02-28 17:34:59,414 - root - INFO -   ๋ ˆ์ฝ”๋“œ 4: {'id': 4, 'name': 'John Johnson', 'street': '8304 Cooper Mews', 'city': 'Candicefort', 'zip': '87821', 'lng': Decimal('-169.562279'), 'lat': Decimal('-53.845951')}
2025-02-28 17:34:59,414 - root - INFO -   ๋ ˆ์ฝ”๋“œ 5: {'id': 5, 'name': 'Gregory Harrison', 'street': '0866 Lee Expressway Suite 888', 'city': 'Dianaport', 'zip': '14219', 'lng': Decimal('-30.874919'), 'lat': Decimal('84.261251')}
2025-02-28 17:34:59,414 - src.utils.connection - INFO - Elasticsearch ํด๋ผ์ด์–ธํŠธ ์ƒ์„ฑ ์™„๋ฃŒ
2025-02-28 17:34:59,419 - elastic_transport.transport - INFO - POST http://localhost:9200/users/_search [status:200 duration:0.005s]
2025-02-28 17:34:59,420 - root - INFO - Elasticsearch์—์„œ 5๊ฐœ ๋ ˆ์ฝ”๋“œ ์กฐํšŒ ์™„๋ฃŒ
2025-02-28 17:34:59,420 - root - INFO - Elasticsearch ๋ฐ์ดํ„ฐ ํ™•์ธ (์ƒ˜ํ”Œ 5๊ฐœ):
2025-02-28 17:34:59,420 - root - INFO -   ๋ ˆ์ฝ”๋“œ 1: {'name': 'Whitney Olson', 'age': 26, 'street': '1791 Pittman Overpass', 'city': 'Lake Jason', 'state': 'Idaho', 'zip': 48870, 'lng': 114.735089, 'lat': 45.2354325}
2025-02-28 17:34:59,420 - root - INFO -   ๋ ˆ์ฝ”๋“œ 2: {'name': 'David Smith', 'age': 28, 'street': '0474 Julian Station', 'city': 'West Sophia', 'state': 'Arizona', 'zip': 72976, 'lng': 94.204753, 'lat': -88.761862}
2025-02-28 17:34:59,420 - root - INFO -   ๋ ˆ์ฝ”๋“œ 3: {'name': 'Mr. Jason Hughes MD', 'age': 70, 'street': '7351 Robinson Underpass', 'city': 'Stephaniebury', 'state': 'Mississippi', 'zip': 8702, 'lng': -87.282108, 'lat': 12.763472}
2025-02-28 17:34:59,420 - root - INFO -   ๋ ˆ์ฝ”๋“œ 4: {'name': 'John Johnson', 'age': 41, 'street': '8304 Cooper Mews', 'city': 'Candicefort', 'state': 'Rhode Island', 'zip': 87821, 'lng': -169.562279, 'lat': -53.845951}
2025-02-28 17:34:59,420 - root - INFO -   ๋ ˆ์ฝ”๋“œ 5: {'name': 'Gregory Harrison', 'age': 24, 'street': '0866 Lee Expressway Suite 888', 'city': 'Dianaport', 'state': 'New Jersey', 'zip': 14219, 'lng': -30.874919, 'lat': 84.261251}
2025-02-28 17:34:59,420 - root - INFO - ๋ฐ์ดํ„ฐ ์ƒ์„ฑ ๋ฐ ์ €์žฅ ํ”„๋กœ์„ธ์Šค ์™„๋ฃŒ

Chapter 3. Reading and Writing Files

Handling files using NiFi processors

์•ž์„œ make start๋ฅผ ํ†ตํ•ด Faker๋ฅผ ํ™œ์šฉํ•ด ํ…Œ์ŠคํŠธ ๋ฐ์ดํ„ฐ๋ฅผ ์ƒ์„ฑํ•œ๋‹ค. ์ด ๋ฐ์ดํ„ฐ๋Š” ๋‹ค์Œ ๋””๋ ‰ํ† ๋ฆฌ์— ์กด์žฌํ•œ๋‹ค.

  • local: /app/data/raw/test_data.csv

  • NiFi container: /opt/nifi/nifi-current/data/raw/test_data.csv
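
For reference, the generator boils down to writing Faker records to a CSV file. A minimal sketch, assuming the faker package and the field names seen in the stored documents; the repo's actual src/ module may differ:

import csv

from faker import Faker

fake = Faker()


def generate_test_data(path: str, rows: int = 1000) -> None:
    """Write `rows` fake user records to a CSV file."""
    fieldnames = ["name", "age", "street", "city", "state", "zip", "lng", "lat"]
    with open(path, "w", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()
        for _ in range(rows):
            writer.writerow(
                {
                    "name": fake.name(),
                    "age": fake.random_int(min=18, max=80),
                    "street": fake.street_address(),
                    "city": fake.city(),
                    "state": fake.state(),
                    "zip": fake.zipcode(),
                    "lng": float(fake.longitude()),
                    "lat": float(fake.latitude()),
                }
            )


if __name__ == "__main__":
    generate_test_data("data/raw/test_data.csv")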

# Nifi container ์‹คํ–‰
app-py3.12 {seilylook} ๐Ÿš€ docker exec -i -t nifi /bin/bash

# Container์— ์›๋ณธ ๋ฐ์ดํ„ฐ๊ฐ€ ์กด์žฌํ•˜๋Š”์ง€ ํ™•์ธ
nifi@e92527995ead:/opt/nifi/nifi-current$ ls data/raw/
test_data.csv

# 40์„ธ ์ด์ƒ ์‚ฌ๋žŒ๋“ค Query ํ•˜๊ณ  Name ๊ธฐ์ค€์œผ๋กœ ์ €์žฅ
nifi@e92527995ead:/opt/nifi/nifi-current/data/processed$ ls -al
total 84
drwxr-xr-x 12 nifi nifi  384 Mar  7 07:17  .
drwxr-xr-x  4 root root 4096 Mar  7 07:05  ..
-rw-r--r--  1 nifi nifi 5637 Mar  7 07:17 'Amber Taylor'
-rw-r--r--  1 nifi nifi 5284 Mar  7 07:17 'Charles Arnold'
-rw-r--r--  1 nifi nifi 5789 Mar  7 07:17 'Corey Hardin'
-rw-r--r--  1 nifi nifi 6580 Mar  7 07:17 'Ebony Miller'
-rw-r--r--  1 nifi nifi 6030 Mar  7 07:17 'Grant Garrison'
-rw-r--r--  1 nifi nifi 5108 Mar  7 07:17 'Kristina Parker'
-rw-r--r--  1 nifi nifi 5444 Mar  7 07:17 'Nicholas Baker MD'
-rw-r--r--  1 nifi nifi 5277 Mar  7 07:17 'Phillip Love'
-rw-r--r--  1 nifi nifi 6180 Mar  7 07:17 'Whitney Barnes'
-rw-r--r--  1 nifi nifi 5438 Mar  7 07:17 'Zachary Cohen'

Processors ์„ค์ •

์ฑ… 63p๋ฅผ ์ฐธ์กฐํ•ด์„œ Processors์™€ ๊ฐ Processors๋“ค์˜ Properties๋ฅผ ์„ค์ •ํ–ˆ๋‹ค. ์›๋ณธ ๋ฐ์ดํ„ฐ๋Š” Row๊ฐ€ 1000๊ฐœ์ด๋‹ค. SplitRecord Processor์˜ Records Per Split๋ฅผ 100์œผ๋กœ ์„ค์ •ํ–ˆ๊ธฐ ๋•Œ๋ฌธ์— /opt/nifi/nifi-current/data/processed ์— ์žˆ๋Š” ๊ฒฐ๊ณผ ํŒŒ์ผ๋“ค์„ ๋ณด๋ฉด 10๊ฐœ์ž„์„ ํ™•์ธํ•  ์ˆ˜ ์žˆ๋‹ค.

๋ฌธ์ œ ๋ฐ ํ•ด๊ฒฐ

  1. Nifi container ์ƒ์„ฑ

์ดˆ๊ธฐ์— apache/nifi:latest ๋ฒ„์ „์œผ๋กœ image pull์„ ์ˆ˜ํ–‰ํ•˜๋‹ˆ version 2๋ถ€ํ„ฐ ์ž๋™์œผ๋กœ https:๋กœ ์—ฐ๊ฒฐ๋˜๋„๋ก ์ •์˜๋˜์–ด ์žˆ์—ˆ๋‹ค. ๊ทธ๋ž˜์„œ ์ด์ „๊นŒ์ง€ Nifi๋ฅผ ๋นผ๊ณ  ๋‚˜๋จธ์ง€(postgresql, elasticsearch, airflow)๋งŒ์„ docker container๋กœ ๋งŒ๋“ค์—ˆ์œผ๋‚˜ ๋ฌธ์ œ๋ฅผ ํ•ด๊ฒฐํ•˜๊ณ  ์‹ถ์–ด ์—ฌ๋Ÿฌ๊ฐ€์ง€ ์‹œ๋„๋ฅผ ํ•˜๋‹ค๊ฐ€ ๋งˆ์ง€๋ง‰ ์ˆ˜๋‹จ์œผ๋กœ version์„ 1.28.0(๋ฒ„์ „์„ ๋‚ฎ์ถœ ๋–„ ์ฃผ์˜ํ•  ์ ์€ OS/ARCH == linux/arm64 ๋ฅผ ์ง€์›ํ•˜๋Š” docker image ์ธ์ง€ ํ™•์ธํ•ด์•ผ ํ•œ๋‹ค. ๋‚ด MAC์€ arm64์ด๊ธฐ ๋•Œ๋ฌธ์ด๋‹ค.) ์œผ๋กœ ๋‚ฎ์ถ”๋‹ˆ ์ •์ƒ์ ์œผ๋กœ http: port๋ฅผ ์‚ฌ์šฉํ•ด์„œ ์ ‘๊ทผํ•  ์ˆ˜ ์žˆ์—ˆ๋‹ค.

  2. NiFi processors

Creating the NiFi processors and setting their properties matched the book, so that part caused no major trouble. The SplitRecord stage kept failing, though, and the cause was the RELATIONSHIPS configuration: for every processor, not only the relationships you want (e.g. success, splits, over.40, matched) but also the unexpected ones such as failure and unmatched must be set to terminate or retry. Put simply, no processor should be left with a warning (!) icon at the top.

Chapter 4. Working with Databases

Inserting data into Databases

make build๋ฅผ ์‹คํ–‰ํ•˜๋ฉด Makefile์—์„œ ์ž‘์„ฑํ•œ ๋Œ€๋กœ init_elasticsearch.sh, init_postgresql.sh๋ฅผ ์‹คํ–‰ ์‹œํ‚จ๋‹ค.

  • init_elasticsearch.sh
#!/bin/bash

# Elasticsearch ์ปจํ…Œ์ด๋„ˆ๊ฐ€ ์‹คํ–‰ ์ค‘์ธ์ง€ ํ™•์ธ
if ! docker ps | grep -q "elasticsearch"; then
    echo "Elasticsearch ์ปจํ…Œ์ด๋„ˆ๊ฐ€ ์‹คํ–‰๋˜๊ณ  ์žˆ์ง€ ์•Š์Šต๋‹ˆ๋‹ค."
    exit 1
fi

# Elasticsearch๊ฐ€ ์ค€๋น„๋  ๋•Œ๊นŒ์ง€ ๋Œ€๊ธฐ
echo "Elasticsearch๊ฐ€ ์ค€๋น„๋  ๋•Œ๊นŒ์ง€ ๋Œ€๊ธฐ ์ค‘..."
until curl --silent --output /dev/null --fail --max-time 5 http://localhost:9200; do
    printf '.'
    sleep 5
done
echo "Elasticsearch๊ฐ€ ์ค€๋น„๋˜์—ˆ์Šต๋‹ˆ๋‹ค."

# users ์ธ๋ฑ์Šค ์„ค์ •
echo "users ์ธ๋ฑ์Šค ์ƒ์„ฑ ์ค‘..."
curl -X PUT "localhost:9200/users" -H 'Content-Type: application/json' -d'
{
  "settings": {
    "number_of_shards": 1,
    "number_of_replicas": 0
  },
  "mappings": {
    "properties": {
      "name": { "type": "text" },
      "street": { "type": "text" },
      "city": { "type": "text" },
      "zip": { "type": "keyword" },
      "lng": { "type": "double" },
      "lat": { "type": "double" }
    }
  }
}
'

# ๊ฒฐ๊ณผ ํ™•์ธ
if [ $? -eq 0 ]; then
    echo "users ์ธ๋ฑ์Šค๊ฐ€ ์„ฑ๊ณต์ ์œผ๋กœ ์ƒ์„ฑ๋˜์—ˆ์Šต๋‹ˆ๋‹ค."
else
    echo "users ์ธ๋ฑ์Šค ์ƒ์„ฑ ์ค‘ ์˜ค๋ฅ˜๊ฐ€ ๋ฐœ์ƒํ–ˆ์Šต๋‹ˆ๋‹ค."
    exit 1
fi

# ์ธ๋ฑ์Šค ํ™•์ธ
echo "์ธ๋ฑ์Šค ๋ชฉ๋ก ํ™•์ธ:"
curl -X GET "localhost:9200/_cat/indices?v"
  • init_postgresql.sh
#!/bin/bash

# PostgreSQL ์ปจํ…Œ์ด๋„ˆ๊ฐ€ ์‹คํ–‰ ์ค‘์ธ์ง€ ํ™•์ธ
if ! docker ps | grep -q "postgres"; then
    echo "PostgreSQL ์ปจํ…Œ์ด๋„ˆ๊ฐ€ ์‹คํ–‰๋˜๊ณ  ์žˆ์ง€ ์•Š์Šต๋‹ˆ๋‹ค."
    exit 1
fi

# ๋จผ์ € ๋ฐ์ดํ„ฐ๋ฒ ์ด์Šค ์กด์žฌ ์—ฌ๋ถ€ ํ™•์ธ ๋ฐ ์ƒ์„ฑ
echo "dataengineering ๋ฐ์ดํ„ฐ๋ฒ ์ด์Šค ์ƒ์„ฑ ์ค‘..."
docker exec -i postgres bash -c 'PGPASSWORD=airflow psql -U airflow -d airflow -c "CREATE DATABASE dataengineering;"'

# ์ด์ œ dataengineering ๋ฐ์ดํ„ฐ๋ฒ ์ด์Šค์— ํ…Œ์ด๋ธ” ์ƒ์„ฑ
cat << 'EOF' > /tmp/create_tables.sql
-- users ํ…Œ์ด๋ธ”์ด ์กด์žฌํ•˜์ง€ ์•Š์œผ๋ฉด ์ƒ์„ฑ
CREATE TABLE IF NOT EXISTS users (
    id SERIAL PRIMARY KEY,
    name VARCHAR(100) NOT NULL,
    street VARCHAR(200),
    city VARCHAR(100),
    zip VARCHAR(10),
    lng DECIMAL(10, 6),
    lat DECIMAL(10, 6)
);

-- ํ…Œ์ด๋ธ”์ด ์„ฑ๊ณต์ ์œผ๋กœ ์ƒ์„ฑ๋˜์—ˆ๋Š”์ง€ ํ™•์ธ
\dt
-- ํ…Œ์ด๋ธ” ๊ตฌ์กฐ ํ™•์ธ
\d users
EOF

# PostgreSQL ์ปจํ…Œ์ด๋„ˆ์— SQL ํŒŒ์ผ ๋ณต์‚ฌ
docker cp /tmp/create_tables.sql postgres:/tmp/create_tables.sql

# PostgreSQL ์ปจํ…Œ์ด๋„ˆ ๋‚ด์—์„œ SQL ์Šคํฌ๋ฆฝํŠธ ์‹คํ–‰ (dataengineering ๋ฐ์ดํ„ฐ๋ฒ ์ด์Šค์— ์ง์ ‘ ์—ฐ๊ฒฐ)
echo "ํ…Œ์ด๋ธ” ์ƒ์„ฑ ์ค‘..."
docker exec -i postgres bash -c 'PGPASSWORD=airflow psql -U airflow -d dataengineering -f /tmp/create_tables.sql'

# ๊ถŒํ•œ ๋ถ€์—ฌ
echo "๊ถŒํ•œ ๋ถ€์—ฌ ์ค‘..."
docker exec -i postgres bash -c 'PGPASSWORD=airflow psql -U airflow -d dataengineering -c "GRANT ALL PRIVILEGES ON ALL TABLES IN SCHEMA public TO airflow; GRANT ALL PRIVILEGES ON ALL SEQUENCES IN SCHEMA public TO airflow;"'

# ์‹คํ–‰ ๊ฒฐ๊ณผ ํ™•์ธ
if [ $? -eq 0 ]; then
    echo "๋ฐ์ดํ„ฐ๋ฒ ์ด์Šค์™€ ํ…Œ์ด๋ธ”์ด ์„ฑ๊ณต์ ์œผ๋กœ ์ƒ์„ฑ๋˜์—ˆ์Šต๋‹ˆ๋‹ค."
    # ์ž„์‹œ ํŒŒ์ผ ์‚ญ์ œ
    rm /tmp/create_tables.sql
    docker exec postgres rm /tmp/create_tables.sql
else
    echo "์˜ค๋ฅ˜๊ฐ€ ๋ฐœ์ƒํ–ˆ์Šต๋‹ˆ๋‹ค."
    # ์ž„์‹œ ํŒŒ์ผ ์‚ญ์ œ
    rm /tmp/create_tables.sql
    docker exec postgres rm /tmp/create_tables.sql
    exit 1
fi

# ๋ฐ์ดํ„ฐ๋ฒ ์ด์Šค ์—ฐ๊ฒฐ ํ…Œ์ŠคํŠธ
echo "๋ฐ์ดํ„ฐ๋ฒ ์ด์Šค ์—ฐ๊ฒฐ ํ…Œ์ŠคํŠธ ์ค‘..."
docker exec postgres pg_isready -U airflow -d dataengineering

if [ $? -eq 0 ]; then
    echo "๋ฐ์ดํ„ฐ๋ฒ ์ด์Šค๊ฐ€ ์ •์ƒ์ ์œผ๋กœ ์‘๋‹ตํ•ฉ๋‹ˆ๋‹ค."
else
    echo "๋ฐ์ดํ„ฐ๋ฒ ์ด์Šค ์—ฐ๊ฒฐ ํ…Œ์ŠคํŠธ์— ์‹คํŒจํ–ˆ์Šต๋‹ˆ๋‹ค."
    exit 1
fi
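
One caveat with init_elasticsearch.sh: curl without --fail exits 0 even when Elasticsearch answers HTTP 400, which is why the logs show resource_already_exists_exception immediately followed by the success message. An idempotent alternative, sketched here with the elasticsearch Python client (an assumption; the script itself sticks to curl):

from elasticsearch import Elasticsearch

MAPPINGS = {
    "properties": {
        "name": {"type": "text"},
        "street": {"type": "text"},
        "city": {"type": "text"},
        "zip": {"type": "keyword"},
        "lng": {"type": "double"},
        "lat": {"type": "double"},
    }
}


def ensure_users_index(es: Elasticsearch) -> None:
    # Create the index only if it is missing, so repeated `make build` runs stay clean
    if not es.indices.exists(index="users"):
        es.indices.create(
            index="users",
            settings={"number_of_shards": 1, "number_of_replicas": 0},
            mappings=MAPPINGS,
        )


if __name__ == "__main__":
    ensure_users_index(Elasticsearch("http://localhost:9200"))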

์ด๋ฅผ ํ†ตํ•ด ๊ธฐ๋ณธ ๋ฐ์ดํ„ฐ๋ฅผ ์ƒ์„ฑํ•˜๊ณ  ๋‘๊ฐœ์˜ ๋ฐ์ดํ„ฐ๋ฅผ PostgresSQL, ElasticSearch์— ๊ฐ๊ฐ ์ €์žฅํ•œ๋‹ค. ์•„๋ž˜์˜ Log์„ ํ†ตํ•ด ์ •์ƒ์ ์œผ๋กœ ์ €์žฅ๋˜์—ˆ์Œ์„ ํ™•์ธํ•  ์ˆ˜ ์žˆ๋‹ค.

==============================================
Waiting for PostgreSQL to start...
==============================================
=====================================
Initializing PostgreSQL...
=====================================
chmod +x ./scripts/init_postgresql.sh
./scripts/init_postgresql.sh
dataengineering ๋ฐ์ดํ„ฐ๋ฒ ์ด์Šค ์ƒ์„ฑ ์ค‘...
ERROR:  database "dataengineering" already exists
Successfully copied 2.05kB to postgres:/tmp/create_tables.sql
ํ…Œ์ด๋ธ” ์ƒ์„ฑ ์ค‘...
psql:/tmp/create_tables.sql:10: NOTICE:  relation "users" already exists, skipping
CREATE TABLE
        List of relations
 Schema | Name  | Type  |  Owner  
--------+-------+-------+---------
 public | users | table | airflow
(1 row)

                                    Table "public.users"
 Column |          Type          | Collation | Nullable |              Default              
--------+------------------------+-----------+----------+-----------------------------------
 id     | integer                |           | not null | nextval('users_id_seq'::regclass)
 name   | character varying(100) |           | not null | 
 street | character varying(200) |           |          | 
 city   | character varying(100) |           |          | 
 zip    | character varying(10)  |           |          | 
 lng    | numeric(10,6)          |           |          | 
 lat    | numeric(10,6)          |           |          | 
Indexes:
    "users_pkey" PRIMARY KEY, btree (id)

๊ถŒํ•œ ๋ถ€์—ฌ ์ค‘...
GRANT
๋ฐ์ดํ„ฐ๋ฒ ์ด์Šค์™€ ํ…Œ์ด๋ธ”์ด ์„ฑ๊ณต์ ์œผ๋กœ ์ƒ์„ฑ๋˜์—ˆ์Šต๋‹ˆ๋‹ค.
๋ฐ์ดํ„ฐ๋ฒ ์ด์Šค ์—ฐ๊ฒฐ ํ…Œ์ŠคํŠธ ์ค‘...
/var/run/postgresql:5432 - accepting connections
๋ฐ์ดํ„ฐ๋ฒ ์ด์Šค๊ฐ€ ์ •์ƒ์ ์œผ๋กœ ์‘๋‹ตํ•ฉ๋‹ˆ๋‹ค.




==============================================
Waiting for Elasticsearch to start...
==============================================
=====================================
Initializing Elasticsearch...
=====================================
chmod +x ./scripts/init_elasticsearch.sh
./scripts/init_elasticsearch.sh
Elasticsearch๊ฐ€ ์ค€๋น„๋  ๋•Œ๊นŒ์ง€ ๋Œ€๊ธฐ ์ค‘...
Elasticsearch๊ฐ€ ์ค€๋น„๋˜์—ˆ์Šต๋‹ˆ๋‹ค.
users ์ธ๋ฑ์Šค ์ƒ์„ฑ ์ค‘...
{"error":{"root_cause":[{"type":"resource_already_exists_exception","reason":"index [users/AW57UPmxTYyW3G-GdL6lHw] already exists","index_uuid":"AW57UPmxTYyW3G-GdL6lHw","index":"users"}],"type":"resource_already_exists_exception","reason":"index [users/AW57UPmxTYyW3G-GdL6lHw] already exists","index_uuid":"AW57UPmxTYyW3G-GdL6lHw","index":"users"},"status":400}users ์ธ๋ฑ์Šค๊ฐ€ ์„ฑ๊ณต์ ์œผ๋กœ ์ƒ์„ฑ๋˜์—ˆ์Šต๋‹ˆ๋‹ค.
์ธ๋ฑ์Šค ๋ชฉ๋ก ํ™•์ธ:
health status index                                                              uuid                   pri rep docs.count docs.deleted store.size pri.store.size dataset.size
green  open   .internal.alerts-transform.health.alerts-default-000001            MSc-VAFyQHG9tGk2TxwbGg   1   0          0            0       249b           249b         249b
green  open   .internal.alerts-observability.logs.alerts-default-000001          Bief08evQ_SX_3UrwZOzwQ   1   0          0            0       249b           249b         249b
green  open   .internal.alerts-observability.uptime.alerts-default-000001        N-ptFAoNTYeF7OHGtOqZFw   1   0          0            0       249b           249b         249b
green  open   .internal.alerts-ml.anomaly-detection.alerts-default-000001        JL9JINenTS-XMJClxKXrYA   1   0          0            0       249b           249b         249b
green  open   .internal.alerts-observability.slo.alerts-default-000001           rFwL_h62QZmQx8kkXFZIyA   1   0          0            0       249b           249b         249b
green  open   .internal.alerts-default.alerts-default-000001                     6xFM1NqvTc--9BJ7MlhCpA   1   0          0            0       249b           249b         249b
green  open   .internal.alerts-observability.apm.alerts-default-000001           D1xM5G2DTPeF25kkR6_KWQ   1   0          0            0       249b           249b         249b
green  open   users                                                              AW57UPmxTYyW3G-GdL6lHw   1   0       1000            0    229.1kb        229.1kb      229.1kb
green  open   .internal.alerts-observability.metrics.alerts-default-000001       IO4Li-8sS6-AH8aSsvEqLg   1   0          0            0       249b           249b         249b
green  open   .internal.alerts-ml.anomaly-detection-health.alerts-default-000001 95sA140IQ2qCyErtmHKTEg   1   0          0            0       249b           249b         249b
green  open   .internal.alerts-observability.threshold.alerts-default-000001     TV3IWC9fQUuYWQO-56_5sA   1   0          0            0       249b           249b         249b
green  open   .internal.alerts-security.alerts-default-000001                    VE-qcLiURZy7h_i1hb96Lw   1   0          0            0       249b           249b         249b
green  open   .internal.alerts-stack.alerts-default-000001                       _PEhTLb4TJ2bYAwiv6kz8w   1   0          0            0       249b           249b         249b
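
For reference, the PostgreSQL side of the load reduces to a bulk insert of the CSV rows. A minimal sketch assuming psycopg2 and the credentials from the init scripts; the repo's repository layer wraps something similar:

import csv

import psycopg2
from psycopg2.extras import execute_values


def bulk_insert_users(csv_path: str) -> int:
    """Insert every CSV row into the users table in a single statement."""
    with open(csv_path, newline="") as f:
        rows = [
            (r["name"], r["street"], r["city"], r["zip"], r["lng"], r["lat"])
            for r in csv.DictReader(f)
        ]
    conn = psycopg2.connect(
        host="localhost",
        port=5432,
        dbname="dataengineering",
        user="airflow",
        password="airflow",
    )
    with conn, conn.cursor() as cur:
        # execute_values expands all tuples into one multi-row INSERT
        execute_values(
            cur,
            "INSERT INTO users (name, street, city, zip, lng, lat) VALUES %s",
            rows,
        )
    conn.close()
    return len(rows)


if __name__ == "__main__":
    print(bulk_insert_users("data/raw/test_data.csv"))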

Create index pattern in Elasticsearch

PostgreSQL UI๋ฅผ ์œ„ํ•œ pgAdmin์€ ๋”ฐ๋กœ ์„ค์น˜ํ•˜์ง€ ์•Š์•˜๊ธฐ ๋•Œ๋ฌธ์— psql์„ ํ†ตํ•ด ํ™•์ธํ•˜๋Š” ๊ฒƒ์œผ๋กœ ํ•˜๊ณ  ElasticSearch์— ๋ฐ์ดํ„ฐ๊ฐ€ ์ ์ ˆํžˆ ์ €์žฅ๋˜์—ˆ๋Š”์ง€ ํ™•์ธํ•˜๊ธฐ ์œ„ํ•ด ์•ž์„œ ์„ค์น˜ํ•œ Kibana๋ฅผ ์ด์šฉํ•œ๋‹ค. init_elasticsearch.sh์—์„œ users index๋ฅผ ์ƒ์„ฑํ•˜๊ณ  ์ดˆ๊ธฐ ํ…Œ์ŠคํŠธ ๋ฐ์ดํ„ฐ๋ฅผ ์ €์žฅํ•ด์ฃผ์—ˆ๋‹ค.

http://localhost:5601์— ์ ‘๊ทผํ•˜๊ณ , ๋‹ค์Œ์˜ ๊ณผ์ •์„ ํ†ตํ•ด์„œ ๋ฐ์ดํ„ฐ๊ฐ€ ์ •์ƒ์ ์œผ๋กœ ElasticSearch์— ์ €์žฅ๋˜์—ˆ๋Š”์ง€ ํ™•์ธํ•ด์ค€๋‹ค.

  1. ์™ผ์ชฝ Toolbar์—์„œ Analytics -> Discover ํด๋ฆญ

  1. Create index pattern ํด๋ฆญ

  1. ์˜ค๋ฅธ์ชฝ ํ™”๋ฉด์„ ๋ณด๋ฉด ์•ž์„œ init_elasticsearch.sh๋ฅผ ํ†ตํ•ด ์ƒ์„ฑํ•œ users index๊ฐ€ ๋ณด์ธ๋‹ค. ๊ทธ๋ ‡๊ธฐ์— ์™ผ์ชฝ์— Name์— uses๋ฅผ ๋„ฃ์–ด์ค€๋‹ค.

  1. index๊ฐ€ ์—ฐ๊ฒฐ๋˜์—ˆ๊ธฐ์— field type๋“ค์„ ํ™•์ธํ•  ์ˆ˜ ์žˆ๋‹ค.

  1. ์™ผ์ชฝ Toolbardptj Discover ํด๋ฆญ. ์ €์žฅ๋œ ๊ฐ’๋“ค์„ ํ™•์ธํ•  ์ˆ˜ ์žˆ๋‹ค.

Building data pipelines in Apache Airflow

  1. DAG ์ž‘์„ฑ
import os
import datetime as dt
from datetime import timedelta
import logging
import pandas as pd
import psycopg2
from psycopg2.extras import DictCursor

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.exceptions import AirflowException
from elasticsearch import Elasticsearch

# ๋กœ๊น… ์„ค์ •
logger = logging.getLogger(__name__)

# ๋ชจ๋“ˆ ๊ฐ€์ ธ์˜ค๊ธฐ ์‹œ๋„ - ์ž„ํฌํŠธ ๊ฒฝ๋กœ ๋ฌธ์ œ ํ•ด๊ฒฐ
import sys
import importlib.util
import inspect

# ํ˜„์žฌ ํŒŒ์ผ์˜ ์ ˆ๋Œ€ ๊ฒฝ๋กœ
current_file = inspect.getfile(inspect.currentframe())
# ํ˜„์žฌ ๋””๋ ‰ํ† ๋ฆฌ (airflow/dags)
current_dir = os.path.dirname(os.path.abspath(current_file))
# Airflow ํ™ˆ ๋””๋ ‰ํ† ๋ฆฌ (/opt/airflow)
airflow_home = os.path.dirname(os.path.dirname(current_dir))
# app ๋””๋ ‰ํ† ๋ฆฌ ๊ฒฝ๋กœ
app_dir = os.path.join(airflow_home, "app")

# Python ๊ฒฝ๋กœ์— app ๋””๋ ‰ํ† ๋ฆฌ ์ถ”๊ฐ€
if app_dir not in sys.path:
    sys.path.insert(0, app_dir)
    logger.info(f"Python ๊ฒฝ๋กœ์— ์ถ”๊ฐ€๋จ: {app_dir}")

# ๊ฒฝ๋กœ ๋””๋ฒ„๊น…
logger.info(f"Python ๊ฒฝ๋กœ: {sys.path}")
logger.info(f"ํ˜„์žฌ ๋””๋ ‰ํ† ๋ฆฌ: {os.getcwd()}")
logger.info(f"ํ˜„์žฌ ํŒŒ์ผ: {current_file}")
logger.info(f"app ๋””๋ ‰ํ† ๋ฆฌ ๊ฒฝ๋กœ: {app_dir}")

# ๋ชจ๋“ˆ ๊ฐ€์ ธ์˜ค๊ธฐ ์‹œ๋„
USE_REPOSITORY_PATTERN = False
try:
    from src.database.repository import RepositoryFactory
    from src.config.database import PostgresConfig, ElasticsearchConfig
    from src.utils.connection import PostgresConnector, ElasticsearchConnector

    # ์„ฑ๊ณต์ ์œผ๋กœ ๊ฐ€์ ธ์™”๋Š”์ง€ ํ™•์ธ
    if all(
        [
            RepositoryFactory,
            PostgresConfig,
            ElasticsearchConfig,
            PostgresConnector,
            ElasticsearchConnector,
        ]
    ):
        USE_REPOSITORY_PATTERN = True
        logger.info("Repository ๋ชจ๋“ˆ ๊ฐ€์ ธ์˜ค๊ธฐ ์„ฑ๊ณต")
except ImportError as e:
    logger.error(f"Repository ๋ชจ๋“ˆ ๊ฐ€์ ธ์˜ค๊ธฐ ์‹คํŒจ: {e}")
    logger.info("์ง์ ‘ ๊ตฌํ˜„์œผ๋กœ ๋Œ€์ฒดํ•ฉ๋‹ˆ๋‹ค")

# DAG ์„ค์ •
default_args = {
    "owner": "Se Hyeon Kim",
    "start_date": dt.datetime(2025, 3, 1),
    "retries": 3,
    "retry_delay": dt.timedelta(minutes=5),
    "email_on_failure": True,
    "email_on_retry": False,
}

# PostgreSQL ์—ฐ๊ฒฐ ์ •๋ณด
PG_HOST = "postgres"  # Docker Compose์˜ ์„œ๋น„์Šค ์ด๋ฆ„
PG_PORT = 5432
PG_DATABASE = "airflow"
PG_USER = "airflow"
PG_PASSWORD = "airflow"

# Elasticsearch ์—ฐ๊ฒฐ ์ •๋ณด
ES_HOST = "elasticsearch"  # Docker Compose์˜ ์„œ๋น„์Šค ์ด๋ฆ„
ES_PORT = 9200


def extract_from_postgresql(**context):
    """PostgreSQL์—์„œ ์‚ฌ์šฉ์ž ๋ฐ์ดํ„ฐ๋ฅผ ์ถ”์ถœํ•˜์—ฌ CSV ํŒŒ์ผ๋กœ ์ €์žฅ"""
    global USE_REPOSITORY_PATTERN  # ์ „์—ญ ๋ณ€์ˆ˜๋กœ ์„ ์–ธ

    try:
        if USE_REPOSITORY_PATTERN:
            # Repository ํŒจํ„ด์„ ์‚ฌ์šฉํ•˜์—ฌ ๋ฐ์ดํ„ฐ ์ ‘๊ทผ
            try:
                logger.info("Repository ํŒจํ„ด์œผ๋กœ PostgreSQL ์ ‘๊ทผ ์‹œ๋„")
                postgres_config = PostgresConfig()
                postgres_connector = PostgresConnector(config=postgres_config)
                repository = RepositoryFactory.create(
                    "postgresql", connector=postgres_connector
                )

                # ์—ฐ๊ฒฐ ํ™•์ธ
                if not repository.check_connection():
                    raise Exception("PostgreSQL ์—ฐ๊ฒฐ์— ์‹คํŒจํ–ˆ์Šต๋‹ˆ๋‹ค.")

                # ๋ฐ์ดํ„ฐ ์กฐํšŒ
                users = repository.get_all(limit=1000)

                if not users:
                    logger.warning("PostgreSQL์—์„œ ์กฐํšŒ๋œ ์‚ฌ์šฉ์ž ์—†์Œ")
                    return False

                # DataFrame์œผ๋กœ ๋ณ€ํ™˜
                df = pd.DataFrame(users)
                logger.info(
                    f"Repository ํŒจํ„ด์œผ๋กœ {len(df)}๋ช…์˜ ์‚ฌ์šฉ์ž ๋ฐ์ดํ„ฐ ์ถ”์ถœ ์™„๋ฃŒ"
                )
            except Exception as repo_error:
                logger.error(f"Repository ํŒจํ„ด ์‚ฌ์šฉ ์‹คํŒจ: {repo_error}")
                logger.info("์ง์ ‘ DB ์—ฐ๊ฒฐ๋กœ ๋Œ€์ฒดํ•ฉ๋‹ˆ๋‹ค")
                # Repository ํŒจํ„ด ์‹คํŒจ ์‹œ ์ง์ ‘ ์—ฐ๊ฒฐ๋กœ ๋Œ€์ฒด
                USE_REPOSITORY_PATTERN = False
                raise repo_error

        if not USE_REPOSITORY_PATTERN:
            # ์ง์ ‘ psycopg2๋กœ PostgreSQL์— ์—ฐ๊ฒฐ
            conn_string = f"host={PG_HOST} port={PG_PORT} dbname={PG_DATABASE} user={PG_USER} password={PG_PASSWORD}"
            logger.info(f"PostgreSQL ์ง์ ‘ ์—ฐ๊ฒฐ: {conn_string}")

            conn = psycopg2.connect(conn_string)

            with conn.cursor(cursor_factory=DictCursor) as cur:
                # users ํ…Œ์ด๋ธ”์ด ์—†์œผ๋ฏ€๋กœ ๋‹ค๋ฅธ ํ…Œ์ด๋ธ” ์‚ฌ์šฉ (์˜ˆ: dag_run)
                cur.execute("SELECT * FROM dag_run LIMIT 100")
                rows = cur.fetchall()

                if not rows:
                    logger.warning("PostgreSQL์—์„œ ์กฐํšŒ๋œ ๋ฐ์ดํ„ฐ ์—†์Œ")
                    return False

                # ๋ฐ์ดํ„ฐ๋ฅผ ๋”•์…”๋„ˆ๋ฆฌ ๋ฆฌ์ŠคํŠธ๋กœ ๋ณ€ํ™˜
                data = [dict(row) for row in rows]
                df = pd.DataFrame(data)

            conn.close()

        # CSV ์ €์žฅ
        output_path = "/tmp/postgresql_users.csv"
        df.to_csv(output_path, index=False)
        context["ti"].xcom_push(key="csv_path", value=output_path)

        logger.info(f"PostgreSQL์—์„œ {len(df)}๊ฐœ์˜ ๋ฐ์ดํ„ฐ ์ถ”์ถœ ์™„๋ฃŒ")
        return True
    except Exception as e:
        logger.error(f"PostgreSQL ๋ฐ์ดํ„ฐ ์ถ”์ถœ ์‹คํŒจ: {e}")

        # ํ…Œ์ŠคํŠธ ๋ฐ์ดํ„ฐ ์ƒ์„ฑ (์‹ค์ œ DB ์—ฐ๊ฒฐ์ด ์—†๋Š” ๊ฒฝ์šฐ๋ฅผ ์œ„ํ•œ ๋ฐฑ์—…)
        try:
            logger.info("ํ…Œ์ŠคํŠธ ๋ฐ์ดํ„ฐ๋ฅผ ์ƒ์„ฑํ•ฉ๋‹ˆ๋‹ค")
            test_data = [
                {
                    "id": 1,
                    "name": "User 1",
                    "street": "Street 1",
                    "city": "City 1",
                    "zip": "11111",
                    "lng": "1.1",
                    "lat": "1.1",
                },
                {
                    "id": 2,
                    "name": "User 2",
                    "street": "Street 2",
                    "city": "City 2",
                    "zip": "22222",
                    "lng": "2.2",
                    "lat": "2.2",
                },
                {
                    "id": 3,
                    "name": "User 3",
                    "street": "Street 3",
                    "city": "City 3",
                    "zip": "33333",
                    "lng": "3.3",
                    "lat": "3.3",
                },
            ]
            df = pd.DataFrame(test_data)
            output_path = "/tmp/postgresql_users.csv"
            df.to_csv(output_path, index=False)
            context["ti"].xcom_push(key="csv_path", value=output_path)
            logger.info("ํ…Œ์ŠคํŠธ ๋ฐ์ดํ„ฐ ์ƒ์„ฑ ์™„๋ฃŒ")
            return True
        except Exception as test_error:
            logger.error(f"ํ…Œ์ŠคํŠธ ๋ฐ์ดํ„ฐ ์ƒ์„ฑ ์‹คํŒจ: {test_error}")
            raise AirflowException(f"๋ฐ์ดํ„ฐ ์ถ”์ถœ ์˜ค๋ฅ˜: {str(e)}")


def load_to_elasticsearch(**context):
    """CSV ํŒŒ์ผ ๋ฐ์ดํ„ฐ๋ฅผ Elasticsearch์— ์ ์žฌ"""
    global USE_REPOSITORY_PATTERN  # ์ „์—ญ ๋ณ€์ˆ˜๋กœ ์„ ์–ธ

    try:
        # CSV ํŒŒ์ผ ๊ฒฝ๋กœ ๊ฐ€์ ธ์˜ค๊ธฐ
        ti = context["ti"]
        csv_path = ti.xcom_pull(task_ids="extract_postgresql_data", key="csv_path")

        if not csv_path:
            raise AirflowException(
                "์ด์ „ ํƒœ์Šคํฌ์—์„œ CSV ํŒŒ์ผ ๊ฒฝ๋กœ๋ฅผ ๊ฐ€์ ธ์˜ฌ ์ˆ˜ ์—†์Šต๋‹ˆ๋‹ค."
            )

        # CSV ํŒŒ์ผ ์ฝ๊ธฐ
        df = pd.read_csv(csv_path)

        if df.empty:
            logger.warning("์ ์žฌํ•  ์‚ฌ์šฉ์ž ๋ฐ์ดํ„ฐ๊ฐ€ ์—†์Šต๋‹ˆ๋‹ค")
            return False

        if USE_REPOSITORY_PATTERN:
            # Repository ํŒจํ„ด์„ ์‚ฌ์šฉํ•˜์—ฌ Elasticsearch์— ์ ์žฌ
            try:
                logger.info("Repository ํŒจํ„ด์œผ๋กœ Elasticsearch ์ ‘๊ทผ ์‹œ๋„")
                es_config = ElasticsearchConfig()
                es_connector = ElasticsearchConnector(config=es_config)
                repository = RepositoryFactory.create(
                    "elasticsearch",
                    connector=es_connector,
                    index="users_from_postgresql",
                )

                # ์—ฐ๊ฒฐ ํ™•์ธ
                if not repository.check_connection():
                    raise Exception("Elasticsearch ์—ฐ๊ฒฐ์— ์‹คํŒจํ–ˆ์Šต๋‹ˆ๋‹ค.")

                # ๋ฐ์ดํ„ฐ ๋ณ€ํ™˜ ๋ฐ ์ ์žฌ
                records = df.to_dict("records")
                inserted_count = repository.bulk_save(records)

                logger.info(f"Elasticsearch์— {inserted_count}๊ฐœ ๋ฌธ์„œ ์ ์žฌ ์™„๋ฃŒ")
                return True
            except Exception as repo_error:
                logger.error(f"Repository ํŒจํ„ด ์‚ฌ์šฉ ์‹คํŒจ: {repo_error}")
                logger.info("์ง์ ‘ Elasticsearch ์—ฐ๊ฒฐ๋กœ ๋Œ€์ฒดํ•ฉ๋‹ˆ๋‹ค")
                # ์‹คํŒจ ์‹œ ์ง์ ‘ ์—ฐ๊ฒฐ๋กœ ๋Œ€์ฒด
                USE_REPOSITORY_PATTERN = False
                raise repo_error

        if not USE_REPOSITORY_PATTERN:
            # Elasticsearch ์ง์ ‘ ์—ฐ๊ฒฐ
            es_url = f"http://{ES_HOST}:{ES_PORT}"
            logger.info(f"Elasticsearch ์ง์ ‘ ์—ฐ๊ฒฐ: {es_url}")

            es = Elasticsearch([es_url])

            # ์—ฐ๊ฒฐ ํ™•์ธ
            if not es.ping():
                logger.error("Elasticsearch ์—ฐ๊ฒฐ ์‹คํŒจ")
                logger.info(
                    "์ž‘์—…์„ ์™„๋ฃŒ๋œ ๊ฒƒ์œผ๋กœ ํ‘œ์‹œํ•ฉ๋‹ˆ๋‹ค (์‹ค์ œ ๋ฐ์ดํ„ฐ๋Š” ์ ์žฌ๋˜์ง€ ์•Š์Œ)"
                )
                return True  # ํ…Œ์ŠคํŠธ ํ™˜๊ฒฝ์—์„œ๋Š” ์„ฑ๊ณต์œผ๋กœ ์ฒ˜๋ฆฌ

            # ๋ฐ์ดํ„ฐ ๋ณ€ํ™˜ ๋ฐ ์ ์žฌ
            index_name = "users_from_postgresql"
            bulk_data = []
            for _, row in df.iterrows():
                bulk_data.append({"index": {"_index": index_name}})
                bulk_data.append(row.to_dict())

            if bulk_data:
                es.bulk(operations=bulk_data, refresh=True)

            logger.info(f"Elasticsearch์— {len(df)}๊ฐœ ๋ฌธ์„œ ์ ์žฌ ์™„๋ฃŒ")
            return True

    except Exception as e:
        logger.error(f"Elasticsearch ๋ฐ์ดํ„ฐ ์ ์žฌ ์‹คํŒจ: {e}")
        # ๊ฐœ๋ฐœ/ํ…Œ์ŠคํŠธ ํ™˜๊ฒฝ์—์„œ๋Š” ์ด ์˜ค๋ฅ˜๋ฅผ ๋ฌด์‹œํ•˜๊ณ  ์ง„ํ–‰
        logger.info("Elasticsearch ์ ์žฌ์— ์‹คํŒจํ–ˆ์ง€๋งŒ ์ž‘์—…์„ ์™„๋ฃŒ๋œ ๊ฒƒ์œผ๋กœ ํ‘œ์‹œํ•ฉ๋‹ˆ๋‹ค")
        return True


# DAG ์ •์˜
with DAG(
    dag_id="user_data_transfer",
    default_args=default_args,
    description="PostgreSQL ์‚ฌ์šฉ์ž ๋ฐ์ดํ„ฐ๋ฅผ Elasticsearch๋กœ ์ „์†ก",
    schedule_interval=timedelta(hours=1),
    catchup=False,
    tags=["postgresql", "elasticsearch", "user_data"],
) as dag:

    extract_task = PythonOperator(
        task_id="extract_postgresql_data",
        python_callable=extract_from_postgresql,
    )

    load_task = PythonOperator(
        task_id="load_elasticsearch_data",
        python_callable=load_to_elasticsearch,
    )

    # ํƒœ์Šคํฌ ์˜์กด์„ฑ ์„ค์ •
    extract_task >> load_task
  1. ์‹คํ–‰ ๊ฒฐ๊ณผ

  • extract_postgresql_data
ac7a746c901c
 โ–ถ Log message source details
[2025-03-08, 04:04:08 UTC] {local_task_job_runner.py:123} โ–ถ Pre task execution logs
[2025-03-08, 04:04:08 UTC] {main.py:128} INFO - PostgreSQL ์ง์ ‘ ์—ฐ๊ฒฐ: host=postgres port=5432 dbname=*** user=*** password=***
[2025-03-08, 04:04:08 UTC] {main.py:152} INFO - PostgreSQL์—์„œ 1๊ฐœ์˜ ๋ฐ์ดํ„ฐ ์ถ”์ถœ ์™„๋ฃŒ
[2025-03-08, 04:04:08 UTC] {python.py:240} INFO - Done. Returned value was: True
[2025-03-08, 04:04:08 UTC] {taskinstance.py:341} โ–ถ Post task execution logs
  • load_elasticsearch_data
ac7a746c901c
 โ–ถ Log message source details
[2025-03-08, 04:04:09 UTC] {local_task_job_runner.py:123} โ–ถ Pre task execution logs
[2025-03-08, 04:04:09 UTC] {main.py:253} INFO - Elasticsearch ์ง์ ‘ ์—ฐ๊ฒฐ: http://elasticsearch:9200
[2025-03-08, 04:04:09 UTC] {_transport.py:349} INFO - HEAD http://elasticsearch:9200/ [status:200 duration:0.003s]
[2025-03-08, 04:04:09 UTC] {_transport.py:349} INFO - PUT http://elasticsearch:9200/_bulk?refresh=true [status:200 duration:0.096s]
[2025-03-08, 04:04:09 UTC] {main.py:275} INFO - Elasticsearch์— 1๊ฐœ ๋ฌธ์„œ ์ ์žฌ ์™„๋ฃŒ
[2025-03-08, 04:04:09 UTC] {python.py:240} INFO - Done. Returned value was: True
[2025-03-08, 04:04:09 UTC] {taskinstance.py:341} โ–ถ Post task execution logs
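
To confirm the DAG's output outside the Airflow UI, a quick count against the target index works; again a sketch assuming the elasticsearch Python client:

from elasticsearch import Elasticsearch

es = Elasticsearch("http://localhost:9200")

# The DAG writes to users_from_postgresql; each run upserts the extracted rows
print(es.count(index="users_from_postgresql")["count"])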

Handling databases with NiFi processors

Extracting data from PostgreSQL

app-py3.12 {seilylook} โ˜•๏ธ curl -X GET "localhost:9200/_cat/indices?v"                                          
health status index                            uuid                   pri rep docs.count docs.deleted store.size pri.store.size
green  open   .geoip_databases                 7wlLmCbzT-68QQtczL1XlQ   1   0         39            0     36.2mb         36.2mb
green  open   .kibana_task_manager_7.17.28_001 32xC4NKOQCWKlK_Lta3KuA   1   0         17         1206    267.3kb        267.3kb
yellow open   fromnifi                         7wEUB-_YTI6ZpVo9nVu0Fg   1   1     450400            0    189.9mb        189.9mb
green  open   .kibana_7.17.28_001              lUIvEKPPQwiGzG94W_LaVA   1   0         15            0      2.3mb          2.3mb
green  open   .apm-custom-link                 guOxO9XJSdmOvvEuTyOdnA   1   0          0            0       227b           227b
green  open   .apm-agent-configuration         77OEbeUsSsW8bGctTzmD1A   1   0          0            0       227b           227b
green  open   users                            iI4uC4UiRPyFBgts1aAB7w   1   0       1000            0    231.9kb        231.9kb

๋ฌธ์ œ ํ•ด๊ฒฐ

Docker ํ™˜๊ฒฝ์—์„œ Postgres, Elasticsearch, Nifi๋ฅผ ์‹คํ–‰ํ•˜๋Š” ์ƒํƒœ์ด๊ธฐ ๋•Œ๋ฌธ์— ๋‹ค์Œ๊ณผ ๊ฐ™์ด ๋‹ค๋ฅด๊ฒŒ ๋”ฐ๋กœ ์„ธํŒ…์ด ํ•„์š”ํ•˜๋‹ค.

When wiring up the PostgreSQL and Elasticsearch connections, Postgres needs a JDBC jar. I downloaded the jar for the desired version from the official PostgreSQL site and saved it under nifi/drivers locally. In docker-compose this directory is mounted into the nifi container, so the container can see the PostgreSQL jar.

services:
  nifi:
    image: apache/nifi:1.28.0
    container_name: nifi
    restart: always
    ports:
      - "9300:9300"
    environment:
      - NIFI_WEB_HTTP_HOST=0.0.0.0
      - NIFI_WEB_HTTP_PORT=9300
      - NIFI_WEB_PROXY_HOST=localhost:9300
      - SINGLE_USER_CREDENTIALS_USERNAME=nifi
      - SINGLE_USER_CREDENTIALS_PASSWORD=nifipassword
    volumes:
      - nifi-system-data:/opt/nifi/nifi-current/system-data  # internal system data
      - ./logs/nifi:/opt/nifi/nifi-current/logs
      - nifi-conf:/opt/nifi/nifi-current/conf
      - ./nifi/data/raw:/opt/nifi/nifi-current/data/raw      # source data
      - ./nifi/data/processed:/opt/nifi/nifi-current/data/processed  # processed data
      - ./nifi/templates:/opt/nifi/nifi-current/templates
      - ./nifi/drivers:/opt/nifi/nifi-current/lib/custom-drivers

์˜ฌ๋ฐ”๋ฅด๊ฒŒ mount ๋˜์—ˆ๋Š”์ง€ ํ™•์ธํ•ด์ค€๋‹ค.

app-py3.12 {seilylook} โ˜•๏ธ docker exec -i -t nifi /bin/bash    

nifi@d8ffbc7cb03c:/opt/nifi/nifi-current$ cd lib/custom-drivers/
nifi@d8ffbc7cb03c:/opt/nifi/nifi-current/lib/custom-drivers$ ls
postgresql-42.7.5.jar

์ด์–ด์„œ DBCPConnectionPool ์„œ๋น„์Šค๋ฅผ ๋‹ค์Œ๊ณผ ๊ฐ™์ด ์„ค์ •ํ•ด์ค€๋‹ค.

  • Database Connection URL:

jdbc:postgresql://postgres:5432/dataengineering

  • Database Driver Class Name:

org.postgresql.Driver

  • Database Driver Locations:

/opt/nifi/nifi-current/lib/custom-drivers/postgresql-42.7.5.jar

  • Database User:

airflow

  • Password:

airflow

๋งˆ์ง€๋ง‰์œผ๋กœ PutElasticsearchHtttp processor์˜ properties์—์„œ URL์™€ Port๋ฅผ ๋‹ค์Œ์œผ๋กœ ์„ค์ •ํ•ด์ค€๋‹ค.

http://elasticsearch:9200

Chapter 6. Building a 311 Data Pipeline

Building the data pipeline

NiFi data extraction & Load ๋ฌธ์ œ ํ•ด๊ฒฐ ๊ณผ์ •

๋ฌธ์ œ ์ƒํ™ฉ

NiFi ๋ฐ์ดํ„ฐ ํŒŒ์ดํ”„๋ผ์ธ์—์„œ ๋‹ค์Œ๊ณผ ๊ฐ™์€ ์›Œํฌํ”Œ๋กœ์šฐ๋ฅผ ๊ตฌ์„ฑํ–ˆ์Šต๋‹ˆ๋‹ค:

  1. ExecuteScript (1์ฐจ): ์™ธ๋ถ€ API์—์„œ ๋ฐ์ดํ„ฐ๋ฅผ ๊ฐ€์ ธ์˜ค๋Š” ์Šคํฌ๋ฆฝํŠธ
  2. SplitJSON: ๊ฐ€์ ธ์˜จ ๋ฐ์ดํ„ฐ๋ฅผ ๊ฐœ๋ณ„ ๋ ˆ์ฝ”๋“œ๋กœ ๋ถ„ํ• 
  3. ExecuteScript (2์ฐจ): ๊ฐ ๋ ˆ์ฝ”๋“œ์— ์ถ”๊ฐ€ ํ•„๋“œ ์ƒ์„ฑ ๋ฐ ๋ฐ์ดํ„ฐ ๋ณ€ํ™˜
  4. EvaluateJsonPath: JSON์—์„œ id ํ•„๋“œ ์ถ”์ถœ
  5. PutElasticsearchHTTP: Elasticsearch์— ๋ฐ์ดํ„ฐ ์ €์žฅ

๊ทธ๋Ÿฌ๋‚˜ ๋งˆ์ง€๋ง‰ PutElasticsearchHTTP ํ”„๋กœ์„ธ์„œ์—์„œ ๋‹ค์Œ ์˜ค๋ฅ˜๊ฐ€ ๋ฐœ์ƒํ–ˆ์Šต๋‹ˆ๋‹ค:

Failed to process Flowfile due to failed to parse, transfering to failure

๋กœ๊ทธ๋ฅผ ์ž์„ธํžˆ ์‚ดํŽด๋ณด๋‹ˆ ๋‹ค์Œ๊ณผ ๊ฐ™์€ ๊ตฌ์ฒด์ ์ธ ์˜ค๋ฅ˜ ๋ฉ”์‹œ์ง€๋ฅผ ํ™•์ธํ•  ์ˆ˜ ์žˆ์—ˆ์Šต๋‹ˆ๋‹ค:

Index operation upsert requires a valid identifier value from a flow file attribute, transferring to failure.
๋ฌธ์ € ๋ถ„์„

๋กœ๊ทธ ๋ถ„์„ ๊ฒฐ๊ณผ, ๋‹ค์Œ ์‚ฌํ•ญ์„ ํ™•์ธํ–ˆ์Šต๋‹ˆ๋‹ค:

  1. ์›์ธ: PutElasticsearchHTTP ํ”„๋กœ์„ธ์„œ๊ฐ€ 'upsert' ์ž‘์—…์„ ์ˆ˜ํ–‰ํ•˜๊ธฐ ์œ„ํ•ด ํ•„์š”ํ•œ ์‹๋ณ„์ž(ID) ๊ฐ’์ด ์˜ฌ๋ฐ”๋ฅด๊ฒŒ ์„ค์ •๋˜์ง€ ์•Š์•˜์Šต๋‹ˆ๋‹ค.

  2. ์„ธ๋ถ€ ์‚ฌํ•ญ:

    • FlowFile ํฌ๊ธฐ๊ฐ€ ๋ชจ๋‘ 8๋ฐ”์ดํŠธ๋กœ, ์‹ค์ œ JSON ๋ฐ์ดํ„ฐ๊ฐ€ ์•„๋‹ ๊ฐ€๋Šฅ์„ฑ์ด ๋†’์•˜์Šต๋‹ˆ๋‹ค.
    • EvaluateJsonPath ํ”„๋กœ์„ธ์„œ์—์„œ id๋ฅผ ์ถ”์ถœํ•˜๊ณ  ์žˆ์—ˆ์ง€๋งŒ, ์ด ๊ฐ’์ด PutElasticsearchHTTP ํ”„๋กœ์„ธ์„œ์—์„œ ์ธ์‹ํ•˜๋Š” ํ˜•์‹์œผ๋กœ ์„ค์ •๋˜์ง€ ์•Š์•˜์Šต๋‹ˆ๋‹ค.
ํ•ด๊ฒฐ ๋ฐฉ๋ฒ•
  1. PutElasticsearchHTTP ํ”„๋กœ์„ธ์„œ ์„ค์ • ์ˆ˜์ • PutElasticsearchHTTP ํ”„๋กœ์„ธ์„œ์˜ ์„ค์ •์„ ๋‹ค์Œ๊ณผ ๊ฐ™์ด ์ˆ˜์ •ํ–ˆ์Šต๋‹ˆ๋‹ค:

    • Identifier Attribute: id (EvaluateJsonPath์—์„œ ์ถ”์ถœํ•œ ์†์„ฑ๋ช…๊ณผ ์ผ์น˜ํ•˜๋„๋ก ์„ค์ •)
    • Index Operation: ๊ธฐ์กด upsert ์„ค์ • ํ™•์ธ
    • Type: Elasticsearch ๋ฒ„์ „์— ๋งž๊ฒŒ ์„ค์ • (7.x ์ด์ƒ์˜ ๊ฒฝ์šฐ ๋น„์›Œ๋‘๊ฑฐ๋‚˜ _doc ์‚ฌ์šฉ)
  2. UpdateAttribute ํ”„๋กœ์„ธ์„œ ์ถ”๊ฐ€ EvaluateJsonPath์™€ PutElasticsearchHTTP ์‚ฌ์ด์— UpdateAttribute ํ”„๋กœ์„ธ์„œ๋ฅผ ์ถ”๊ฐ€ํ•˜์—ฌ ID ๊ฐ’์„ ๋ช…์‹œ์ ์œผ๋กœ ์„ค์ •ํ–ˆ์Šต๋‹ˆ๋‹ค:

    • ์ƒˆ ํ”„๋กœ์„ธ์„œ: UpdateAttribute

    • ์†์„ฑ ์„ค์ •:

      • ์†์„ฑ๋ช…: elasticsearch.id
      • ๊ฐ’: ${id} (EvaluateJsonPath์—์„œ ์ถ”์ถœํ•œ id ์†์„ฑ ์‚ฌ์šฉ)
  3. PutElasticsearchHTTP ํ”„๋กœ์„ธ์„œ ์„ค์ • ์ถ”๊ฐ€ ์ˆ˜์ • UpdateAttribute ์ถ”๊ฐ€ ํ›„, PutElasticsearchHTTP ํ”„๋กœ์„ธ์„œ์˜ ์„ค์ •์„ ๋‹ค์‹œ ์ˆ˜์ •ํ–ˆ์Šต๋‹ˆ๋‹ค:

    • Identifier Attribute: elasticsearch.id (UpdateAttribute์—์„œ ์„ค์ •ํ•œ ์†์„ฑ๋ช…์œผ๋กœ ๋ณ€๊ฒฝ)
  4. ๋””๋ฒ„๊น…์„ ์œ„ํ•œ LogAttribute ํ™œ์šฉ ๋ฌธ์ œ ํ•ด๊ฒฐ ๊ณผ์ •์—์„œ ๋‹ค์Œ๊ณผ ๊ฐ™์ด LogAttribute ํ”„๋กœ์„ธ์„œ๋ฅผ ํ™œ์šฉํ•˜์—ฌ ๋ฐ์ดํ„ฐ ํ๋ฆ„์„ ๋ชจ๋‹ˆํ„ฐ๋งํ–ˆ์Šต๋‹ˆ๋‹ค:

    • LogAttribute ์„ค์ •:

      • Log Level: INFO ๋˜๋Š” DEBUG
      • Log Payload: true (FlowFile ๋‚ด์šฉ์„ ํ•จ๊ป˜ ๋กœ๊น…)
      • Attributes to Log: all attributes (๋ชจ๋“  ์†์„ฑ ๋กœ๊น…)
๊ฒฐ๊ณผ

Elasticsearch์— ์ •์ƒ์ ์œผ๋กœ ์ €์žฅ๋œ ๊ฒƒ์„ ํ™•์ธํ•  ์ˆ˜ ์žˆ์—ˆ์Šต๋‹ˆ๋‹ค.

app-py3.12 {seilylook} ๐Ÿš€ curl -X GET "localhost:9200/_cat/indices?v"

health status index                            uuid                   pri rep docs.count docs.deleted store.size pri.store.size
green  open   .geoip_databases                 qhv1z7QnTAit-MJzq4nVxw   1   0         39            0     36.7mb         36.7mb
green  open   .kibana_task_manager_7.17.28_001 PkOLULcPR02nuAckO0GZQg   1   0         17          228      141kb          141kb
green  open   .apm-custom-link                 bCszQPeqTm6YGadNEOdDbw   1   0          0            0       227b           227b
green  open   .kibana_7.17.28_001              frGA6utWSFSA-o3uovgPBw   1   0         11            0      2.3mb          2.3mb
yellow open   scf                              L7BtsSUNQXK1FxOEmM_tPA   1   1       5000            0      1.7mb          1.7mb
green  open   .apm-agent-configuration         EKRPRn9aRbepTGm81kxlXQ   1   0          0            0       227b           227b
green  open   users                            hFnaau-_TtaDclMcC5Ocpw   1   0          0            0       227b           227b

Chapter 12. Building a Kafka Cluster

In stream processing, the data may be infinite and incomplete at the time of a query. One of the leading tools for handling streaming data is Apache Kafka. Kafka lets you send data in real time to topics, which consumers can then read and process.

Creating ZooKeeper and Kafka clusters

services:
  # Zookeeper service
  zookeeper:
    image: confluentinc/cp-zookeeper:7.4.0
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    volumes:
      - zookeeper-data:/var/lib/zookeeper/data
      - zookeeper-log:/var/lib/zookeeper/log
      - ./logs/zookeeper:/var/log/zookeeper
    healthcheck:
      test: ["CMD", "nc", "-z", "localhost", "2181"]
      interval: 30s
      timeout: 10s
      retries: 5
      start_period: 30s
    networks:
      - data-platform
  # Kafka Broker 1
  kafka1:
    image: confluentinc/cp-kafka:7.4.0
    container_name: kafka1
    ports:
      - "9092:9092"
      - "29092:29092"
    depends_on:
      zookeeper:
        condition: service_healthy
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka1:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
      KAFKA_DEFAULT_REPLICATION_FACTOR: 3
      KAFKA_MIN_INSYNC_REPLICAS: 2
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 2
      KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
      KAFKA_LOG4J_ROOT_LOGLEVEL: INFO
      KAFKA_TOOLS_LOG4J_LOGLEVEL: INFO
    volumes:
      - kafka1-data:/var/lib/kafka/data
      - ./logs/kafka1:/var/log/kafka
    healthcheck:
      test: ["CMD", "kafka-topics", "--bootstrap-server", "localhost:9092", "--list"]
      interval: 30s
      timeout: 10s
      retries: 5
      start_period: 45s
    networks:
      - data-platform

  # Kafka Broker 2
  kafka2:
    image: confluentinc/cp-kafka:7.4.0
    container_name: kafka2
    ports:
      - "9093:9093"
      - "29093:29093"
    depends_on:
      zookeeper:
        condition: service_healthy
    environment:
      KAFKA_BROKER_ID: 2
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka2:9093,PLAINTEXT_HOST://localhost:29093
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
      KAFKA_DEFAULT_REPLICATION_FACTOR: 3
      KAFKA_MIN_INSYNC_REPLICAS: 2
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 2
      KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
      KAFKA_LOG4J_ROOT_LOGLEVEL: INFO
      KAFKA_TOOLS_LOG4J_LOGLEVEL: INFO
    volumes:
      - kafka2-data:/var/lib/kafka/data
      - ./logs/kafka2:/var/log/kafka
    healthcheck:
      test: ["CMD", "kafka-topics", "--bootstrap-server", "localhost:9093", "--list"]
      interval: 30s
      timeout: 10s
      retries: 5
      start_period: 45s
    networks:
      - data-platform

  # Kafka Broker 3
  kafka3:
    image: confluentinc/cp-kafka:7.4.0
    container_name: kafka3
    ports:
      - "9094:9094"
      - "29094:29094"
    depends_on:
      zookeeper:
        condition: service_healthy
    environment:
      KAFKA_BROKER_ID: 3
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka3:9094,PLAINTEXT_HOST://localhost:29094
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
      KAFKA_DEFAULT_REPLICATION_FACTOR: 3
      KAFKA_MIN_INSYNC_REPLICAS: 2
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 2
      KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
      KAFKA_LOG4J_ROOT_LOGLEVEL: INFO
      KAFKA_TOOLS_LOG4J_LOGLEVEL: INFO
    volumes:
      - kafka3-data:/var/lib/kafka/data
      - ./logs/kafka3:/var/log/kafka
    healthcheck:
      test: ["CMD", "kafka-topics", "--bootstrap-server", "localhost:9094", "--list"]
      interval: 30s
      timeout: 10s
      retries: 5
      start_period: 45s
    networks:
      - data-platform

  # Kafka-UI
  kafka-ui:
    image: provectuslabs/kafka-ui:v0.7.2
    container_name: kafka-ui
    ports:
      - "8989:8080"
    depends_on:
      - kafka1
      - kafka2
      - kafka3
    environment:
      KAFKA_CLUSTERS_0_NAME: data-platform-cluster
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka1:9092,kafka2:9093,kafka3:9094
      KAFKA_CLUSTERS_0_ZOOKEEPER: zookeeper:2181
      KAFKA_CLUSTERS_0_METRICS_PORT: 9997
    restart: always
    networks:
      - data-platform
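
The services above reference named volumes and a data-platform network. If those are not already declared elsewhere in the full compose file, they need top-level definitions; a minimal sketch assuming local volume drivers and a bridge network:

volumes:
  zookeeper-data:
  zookeeper-log:
  kafka1-data:
  kafka2-data:
  kafka3-data:

networks:
  data-platform:
    driver: bridge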

Testing the Kafka cluster

  1. Create a topic
docker exec -i -t kafka1 bash

kafka-topics --bootstrap-server kafka1:9092,kafka2:9093,kafka3:9094 --create --topic {TOPIC_NAME} --partitions 3 --replication-factor 3

Created topic dataengineering.

  1. Topics ํ™•์ธ
kafka-topics --bootstrap-server kafka1:9092 --list

dataengineering

kafka-topics --bootstrap-server kafka1:9092 --describe --topic dataengineering

Topic: dataengineering  TopicId: FhQDsGqqQVaJARfp--T_tw PartitionCount: 3       ReplicationFactor: 3    Configs: min.insync.replicas=2
        Topic: dataengineering  Partition: 0    Leader: 3       Replicas: 3,2,1 Isr: 3,2,1
        Topic: dataengineering  Partition: 1    Leader: 1       Replicas: 1,3,2 Isr: 1,3,2
        Topic: dataengineering  Partition: 2    Leader: 2       Replicas: 2,1,3 Isr: 2,1,3
  3. Produce messages
kafka-console-producer --bootstrap-server kafka1:9092,kafka2:9093,kafka3:9094 --topic dataengineering

> ์•ˆ๋…•ํ•˜์„ธ์š” ๋ฉ”์‹œ์ง€ 1์ž…๋‹ˆ๋‹ค.
> ์•ˆ๋…•ํ•˜์„ธ์š” ๋ฉ”์‹œ์ง€ 2์ž…๋‹ˆ๋‹ค.
> {"name": "ํ…Œ์ŠคํŠธ", "value": 123}
  4. Read messages

Open a new terminal.

kafka-console-consumer --bootstrap-server kafka1:9092,kafka2:9093,kafka3:9094 --topic dataengineering --from-beginning

์•ˆ๋…•ํ•˜์„ธ์š” ๋ฉ”์‹œ์ง€ 1์ž…๋‹ˆ๋‹ค.
์•ˆ๋…•ํ•˜์„ธ์š” ๋ฉ”์‹œ์ง€ 2์ž…๋‹ˆ๋‹ค.
{"name": "ํ…Œ์ŠคํŠธ", "value": 123}

Processed a total of 4 messages

Producing and consuming with Python

Writing a Kafka producer in Python
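
The repo's producer code isn't reproduced here; below is a minimal sketch of a CSV-to-Kafka producer using confluent-kafka, which the librdkafka-style "%3|...|FAIL|csv-producer#producer-1" errors further down suggest the project uses. The file path, topic name, and client.id mirror the logs; everything else is an assumption.

import csv
import json

from confluent_kafka import Producer

producer = Producer(
    {
        # Externally mapped ports; see the troubleshooting notes below for
        # why a host-side client must not use kafka1:9092 directly.
        "bootstrap.servers": "localhost:29092,localhost:29093,localhost:29094",
        "client.id": "csv-producer",
    }
)

def delivery_report(err, msg):
    # Invoked once per message when the broker acks it (or the send fails).
    if err is not None:
        print(f"Delivery failed: {err}")

with open("data/raw/test_data.csv", newline="", encoding="utf-8") as f:
    for i, row in enumerate(csv.DictReader(f), start=1):
        producer.produce(
            "users",
            value=json.dumps(row).encode("utf-8"),
            callback=delivery_report,
        )
        producer.poll(0)  # serve delivery callbacks without blocking
        if i % 100 == 0:
            print(f"{i} messages queued...")

producer.flush()  # block until every queued message is delivered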

app-py3.12 {seilylook} ๐Ÿ’ก  ๎‚ฐ ~/Development/Book/Data_Engineering_with_Python/app ๎‚ฐ ๎‚  main ยฑ ๎‚ฐ make start
=========================
Starting the application...
=========================
python -m src.main
2025-03-20 21:36:56,208 - root - INFO - ๋ฐ์ดํ„ฐ์…‹์ด ์ด๋ฏธ ์กด์žฌํ•ฉ๋‹ˆ๋‹ค: data/raw/test_data.csv
2025-03-20 21:36:56,208 - root - INFO - ========================
2025-03-20 21:36:56,208 - root - INFO - Kafka Topic & Message ์ƒ์„ฑ
2025-03-20 21:36:56,208 - root - INFO - ========================
2025-03-20 21:36:56,227 - src.services.data_streaming - INFO - Kafka ํด๋Ÿฌ์Šคํ„ฐ ์—ฐ๊ฒฐ ๋ฐ ํ† ํ”ฝ ํ™•์ธ ์ค‘...
2025-03-20 21:36:56,255 - src.services.data_streaming - WARNING - 'users' ํ† ํ”ฝ์ด ์กด์žฌํ•˜์ง€ ์•Š์Šต๋‹ˆ๋‹ค. ์ž๋™ ์ƒ์„ฑ๋  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.
2025-03-20 21:36:56,255 - src.services.data_streaming - INFO - 'data/raw/test_data.csv' ํŒŒ์ผ ์ฒ˜๋ฆฌ ์‹œ์ž‘
2025-03-20 21:36:56,261 - src.services.data_streaming - INFO - 100๊ฐœ ๋ฉ”์‹œ์ง€ ์ฒ˜๋ฆฌ ์ค‘...
2025-03-20 21:36:56,264 - src.services.data_streaming - INFO - 200๊ฐœ ๋ฉ”์‹œ์ง€ ์ฒ˜๋ฆฌ ์ค‘...
2025-03-20 21:36:56,267 - src.services.data_streaming - INFO - 300๊ฐœ ๋ฉ”์‹œ์ง€ ์ฒ˜๋ฆฌ ์ค‘...
2025-03-20 21:36:56,270 - src.services.data_streaming - INFO - 400๊ฐœ ๋ฉ”์‹œ์ง€ ์ฒ˜๋ฆฌ ์ค‘...
2025-03-20 21:36:56,272 - src.services.data_streaming - INFO - 500๊ฐœ ๋ฉ”์‹œ์ง€ ์ฒ˜๋ฆฌ ์ค‘...
2025-03-20 21:36:56,275 - src.services.data_streaming - INFO - 600๊ฐœ ๋ฉ”์‹œ์ง€ ์ฒ˜๋ฆฌ ์ค‘...
2025-03-20 21:36:56,277 - src.services.data_streaming - INFO - 700๊ฐœ ๋ฉ”์‹œ์ง€ ์ฒ˜๋ฆฌ ์ค‘...
2025-03-20 21:36:56,280 - src.services.data_streaming - INFO - 800๊ฐœ ๋ฉ”์‹œ์ง€ ์ฒ˜๋ฆฌ ์ค‘...
2025-03-20 21:36:56,282 - src.services.data_streaming - INFO - 900๊ฐœ ๋ฉ”์‹œ์ง€ ์ฒ˜๋ฆฌ ์ค‘...
2025-03-20 21:36:56,285 - src.services.data_streaming - INFO - 1000๊ฐœ ๋ฉ”์‹œ์ง€ ์ฒ˜๋ฆฌ ์ค‘...
2025-03-20 21:36:58,524 - src.services.data_streaming - INFO - ์ด 1000๊ฐœ ๋ฉ”์‹œ์ง€๊ฐ€ 'users' ํ† ํ”ฝ์œผ๋กœ ์ „์†ก๋˜์—ˆ์Šต๋‹ˆ๋‹ค.
๋ฌธ์„ธ ์ƒํ™ฉ
  1. DNS ํ•ด์„ ์‹คํŒจ
%3|1742473184.651|FAIL|csv-producer#producer-1| [thrd:kafka2:9093/bootstrap]: kafka2:9093/bootstrap: Failed to resolve 'kafka2:9093': nodename nor servname provided, or not known (after 3ms in state CONNECT, 1 identical error(s) suppressed)
  2. Connection timeout
%4|1742473953.566|FAIL|csv-producer#producer-1| [thrd:172.18.0.7:9093/bootstrap]: 172.18.0.7:9093/bootstrap: Connection setup timed out in state CONNECT (after 30030ms in state CONNECT)
%4|1742473954.564|FAIL|csv-producer#producer-1| [thrd:172.18.0.6:9092/bootstrap]: 172.18.0.6:9092/bootstrap: Connection setup timed out in state CONNECT (after 30028ms in state CONNECT)
%4|1742473955.569|FAIL|csv-producer#producer-1| [thrd:172.18.0.8:9094/bootstrap]: 172.18.0.8:9094/bootstrap: Connection setup timed out in state CONNECT (after 30029ms in state CONNECT)
๋ฌธ์ œ ์›์ธ
  1. kafka ์ด์ค‘ ๋ฆฌ์Šค๋„ˆ ์„ค์ •

kafka๋Š” Docker ์ปจํ…Œ์ด๋„ˆ ํ™˜๊ฒฝ์—์„œ ๋‘ ๊ฐ€์ง€ ๋ฆฌ์Šค๋„ˆ๋ฅผ ์‚ฌ์šฉํ•œ๋‹ค.

  • ๋‚ด๋ถ€ ํ†ต์‹ ์šฉ ๋ฆฌ์Šค๋„ˆ: PLAINTEXT://kafka1:9092

    • ์ปจํ‹ฐ์—๋„ˆ ๊ฐ„ ๋‚ด๋ถ€ ํ†ต์‹ ์— ์‚ฌ์šฉ๋จ

    • Docker ๋‚ด๋ถ€ DNS๋กœ ํ•ด์„๋˜์–ด์•ผ ํ•จ

  • ์™ธ๋ถ€ ์ ‘๊ทผ์šฉ ๋ฆฌ์Šค๋„ˆ: PLAINTEXT_HOST://localhost:29092

    • ํ˜ธ์ŠคํŠธ ๋จธ์‹ ์—์„œ ์ ‘๊ทผํ•  ๋•Œ ์‚ฌ์šฉ๋จ

    • ์™ธ๋ถ€๋กœ ํฌํŠธ๊ฐ€ ๋…ธ์ถœ๋จ

  1. ๊ตฌ์ฒด์ ์ธ ์˜ค๋ฅ˜ ์›์ธ
  • DNS ํ•ด์„ ์‹คํŒจ

    • ํด๋ผ์ด์–ธํŠธ๊ฐ€ kafka1:9092, kafka2:9093, kafka3:9094์™€ ๊ฐ™์€ ํ˜ธ์ŠคํŠธ๋ช…์„ IP์ฃผ์†Œ๋กœ ํ•ด์„ํ•˜์ง€ ๋ชปํ•จ

    • ์ด๋Š” Docker ๋„คํŠธ์›Œํฌ ์™ธ๋ถ€์—์„œ ์ ‘๊ทผํ•˜๊ฑฐ๋‚˜, DNS ์„ค์ •์ด ์ œ๋Œ€๋กœ ๋˜์ง€ ์•Š์€ ๊ฒฝ์šฐ ๋ฐœ์ƒ.

  • ์—ฐ๊ฒฐ ์‹œ๊ฐ„ ์ดˆ๊ณผ

    • IP ์ฃผ์†Œ๋Š” ํ•ด์„๋˜์—ˆ์œผ๋‚˜ ์‹ค์ œ TCP ์—ฐ๊ฒฐ์ด ์ด๋ฃจ์–ด์ง€์ง€ ์•Š์Œ

    • ์ด๋Š” ๋ณดํ†ต ๋ฐฉํ™”๋ฒฝ ๋ฌธ์ œ, ๋„คํŠธ์›Œํฌ ๋ถ„๋ฆฌ, ๋˜๋Š” Kafka ์„ค์ • ๋ฌธ์ œ๋กœ ๋ฐœ์ƒ
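
These quick checks are a suggested diagnostic, not from the book; they assume a macOS/Linux host with nc available and the Confluent images used above:

# From the host: kafka1 should NOT resolve outside the Docker network
ping -c 1 kafka1

# From the host: the externally mapped port should accept a TCP connection
nc -z localhost 29092

# From inside the network: the internal listener should answer
docker exec kafka1 kafka-broker-api-versions --bootstrap-server kafka1:9092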

Resolution
  1. Change the access method: from internal to external ports

Switch the client to the brokers' externally exposed ports (29092, 29093, 29094)

# Before
bootstrap_servers = "kafka1:9092,kafka2:9093,kafka3:9094"

# After
bootstrap_servers = "localhost:29092,localhost:29093,localhost:29094"
  1. ํ•ด๊ฒฐ ์›๋ฆฌ
  • ๋‚ด๋ถ€ ํฌํŠธ(9092, 9093, 9094):

    • Docker ๋„คํŠธ์›Œํฌ ๋‚ด์—์„œ๋งŒ ์ ‘๊ทผ ๊ฐ€๋Šฅ

    • ์ปจํ…Œ์ด๋„ˆ ๊ฐ„ ์ง์ ‘ ํ†ต์‹ ์— ์‚ฌ์šฉ

  • ์™ธ๋ถ€ ํฌํŠธ(29092, 29093, 29094):

    • ํ˜ธ์ŠคํŠธ ๋จธ์‹ ์„ ํ†ตํ•ด ์ ‘๊ทผ

    • Docker ์ปจํ…Œ์ด๋„ˆ ์™ธ๋ถ€์—์„œ๋„ ์ ‘๊ทผ ๊ฐ€๋Šฅ

    • localhost๋กœ ๋ผ์šฐํŒ…๋จ

  3. Checking the Kafka settings in Docker Compose
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka1:9092,PLAINTEXT_HOST://localhost:29092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
  • ADVERTISED_LISTENERS: the connection endpoints Kafka advertises to clients
  • LISTENER_SECURITY_PROTOCOL_MAP: the security protocol for each listener
  • INTER_BROKER_LISTENER_NAME: the listener used for inter-broker communication
Conclusion
  • Inside the same Docker network: use the service name and internal port (kafka1:9092)

  • From the host or another network: use localhost and the external port (localhost:29092); the sketch below shows one way to switch between the two without code changes

Writing a Kafka consumer in Python
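
Again, a minimal sketch assuming confluent-kafka; the group.id and the JSON deserialization are illustrative, not taken from the repo:

import json

from confluent_kafka import Consumer

consumer = Consumer(
    {
        "bootstrap.servers": "localhost:29092,localhost:29093,localhost:29094",
        "group.id": "users-consumer",     # hypothetical consumer group
        "auto.offset.reset": "earliest",  # read the topic from the beginning
    }
)
consumer.subscribe(["users"])

try:
    while True:
        msg = consumer.poll(1.0)  # wait up to 1s for the next message
        if msg is None:
            continue
        if msg.error():
            print(f"Consumer error: {msg.error()}")
            continue
        record = json.loads(msg.value().decode("utf-8"))
        print(f"Received: {record}")
finally:
    consumer.close()  # commit offsets and leave the group cleanly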

app-py3.12 โœ˜ {seilylook} ๐Ÿ’ก  ๎‚ฐ ~/Development/Book/Data_Engineering_with_Python/app ๎‚ฐ ๎‚  main ยฑ ๎‚ฐ make start
=========================
Starting the application...
=========================
python -m src.main
2025-03-20 23:02:45,286 - root - INFO - ๋ฐ์ดํ„ฐ์…‹์ด ์ด๋ฏธ ์กด์žฌํ•ฉ๋‹ˆ๋‹ค: data/raw/test_data.csv
2025-03-20 23:02:45,286 - root - INFO - ========================
2025-03-20 23:02:45,286 - root - INFO - Kafka Topic & Message ์ƒ์„ฑ
2025-03-20 23:02:45,286 - root - INFO - ========================
2025-03-20 23:02:45,303 - src.services.data_streaming - INFO - ํ† ํ”ฝ ๊ตฌ๋… ์‹œ์ž‘: users
2025-03-20 23:02:45,303 - src.services.data_streaming - INFO - ๋ฉ”์‹œ์ง€ ์†Œ๋น„ ์‹œ์ž‘...
2025-03-20 23:02:49,507 - root - INFO - Received: {'name': 'Kristina Parker', 'age': 68, 'street': '34674 Miller Overpass', 'city': 'Randallfurt', 'state': 'Maryland', 'zip': 40293, 'lng': 161.665903, 'lat': -87.125185}
2025-03-20 23:02:49,507 - root - INFO - Received: {'name': 'Johnathan Lawson', 'age': 19, 'street': '95990 Williams Shore Apt. 829', 'city': 'Webbside', 'state': 'Maine', 'zip': 15543, 'lng': 146.494403, 'lat': -73.700935}
2025-03-20 23:02:49,507 - root - INFO - Received: {'name': 'Rose Carpenter', 'age': 68, 'street': '444 Joseph Station', 'city': 'Pattersonside', 'state': 'New Mexico', 'zip': 79242, 'lng': 0.048327, 'lat': 74.385104}
2025-03-20 23:02:49,507 - root - INFO - Received: {'name': 'Kimberly Santiago', 'age': 39, 'street': '7635 Peterson Spur Apt. 396', 'city': 'Tinaborough', 'state': 'Nevada', 'zip': 66267, 'lng': -38.278099, 'lat': -36.354147}
2025-03-20 23:02:49,507 - root - INFO - Received: {'name': 'Wendy Murphy', 'age': 75, 'street': '35166 Ashlee Mills', 'city': 'Lawsonview', 'state': 'Massachusetts', 'zip': 30520, 'lng': -137.345477, 'lat': 35.262674}
2025-03-20 23:02:49,507 - root - INFO - Received: {'name': 'Michael Lin', 'age': 18, 'street': '13086 Hall Pass', 'city': 'East Jay', 'state': 'New York', 'zip': 49686, 'lng': -52.411619, 'lat': -5.883704}
2025-03-20 23:02:49,507 - root - INFO - Received: {'name': 'Wesley Watts', 'age': 61, 'street': '4541 Roth Brook Apt. 538', 'city': 'Hensleyland', 'state': 'Maine', 'zip': 70629, 'lng': 137.051209, 'lat': -35.1061065}
2025-03-20 23:02:49,507 - root - INFO - Received: {'name': 'Dennis Wolfe', 'age': 37, 'street': '474 Jones Plaza', 'city': 'Wardville', 'state': 'Minnesota', 'zip': 70795, 'lng': 19.632934, 'lat': -81.602252}
2025-03-20 23:02:49,508 - root - INFO - Received: {'name': 'Sharon Chandler', 'age': 21, 'street': '696 Michael Valleys Apt. 412', 'city': 'Lauraton', 'state': 'New Jersey', 'zip': 19419, 'lng': 14.510882, 'lat': 65.1203075}
2025-03-20 23:02:49,508 - root - INFO - Received: {'name': 'Amanda Mcmahon', 'age': 34, 'street': '96470 Cobb Hollow', 'city': 'Albertberg', 'state': 'Louisiana', 'zip': 22483, 'lng': -8.723311, 'lat': 27.196991}
2025-03-20 23:02:49,508 - root - INFO - Received: {'name': 'Peter Nguyen', 'age': 68, 'street': '15478 Dylan Crescent', 'city': 'North Katrinashire', 'state': 'New Jersey', 'zip': 96223, 'lng': 26.947073, 'lat': -9.097944}
2025-03-20 23:02:49,508 - root - INFO - Received: {'name': 'Matthew Robbins', 'age': 43, 'street': '4211 Brittany Field Suite 605', 'city': 'South Rebeccaborough', 'state': 'Delaware', 'zip': 19879, 'lng': 100.065663, 'lat': 54.933101}
2025-03-20 23:02:49,508 - root - INFO - Received: {'name': 'Michael Wilcox', 'age': 33, 'street': '018 Leon Alley', 'city': 'Johnmouth', 'state': 'New Mexico', 'zip': 73338, 'lng': -19.245506, 'lat': 26.5704125}
2025-03-20 23:02:49,508 - root - INFO - Received: {'name': 'Amanda Williams', 'age': 75, 'street': '44981 Rebecca Bypass', 'city': 'North Joseph', 'state': 'South Carolina', 'zip': 66529, 'lng': -24.771468, 'lat': 14.545032}
