Thứ năm, 05/04/2018 | 00:00 GMT+7

Cách đồng bộ hóa dữ liệu đã chuyển đổi từ MongoDB sang Elasticsearch với Transporter trên Ubuntu 16.04

Transporter là một công cụ open-souce để di chuyển dữ liệu qua các repodata khác nhau. Các nhà phát triển thường viết kịch bản một lần cho các việc như di chuyển dữ liệu qua database , di chuyển dữ liệu từ file sang database hoặc ngược lại, nhưng sử dụng một công cụ như Transporter có một số lợi thế.

Trong Transporter, bạn xây dựng các đường ống dẫn , xác định stream dữ liệu từ một nguồn (nơi dữ liệu được đọc) đến phần chìm (nơi dữ liệu được ghi). Nguồn và phần chìm có thể là database SQL hoặc NoSQL, file phẳng hoặc các tài nguyên khác. Transporter sử dụng bộ điều hợp , là phần mở rộng có thể cắm được, để giao tiếp với các tài nguyên này và dự án bao gồm một số bộ điều hợp cho database phổ biến theo mặc định.

Ngoài dữ liệu di chuyển, Transporter cũng cho phép bạn thay đổi dữ liệu khi nó di chuyển qua đường ống sử dụng máy biến áp . Giống như bộ điều hợp, có một số máy biến áp được bao gồm theo mặc định. Bạn cũng có thể viết biến thế của riêng mình để tùy chỉnh sửa đổi dữ liệu .

Trong hướng dẫn này, ta sẽ giới thiệu cho các bạn một ví dụ về việc di chuyển và xử lý dữ liệu từ database MongoDB sang Elasticsearch bằng cách sử dụng các bộ điều hợp tích hợp của Transporter và một máy biến áp tùy chỉnh được viết bằng JavaScript.

Yêu cầu

Để làm theo hướng dẫn này, bạn cần :

Đường ống vận chuyển được viết bằng JavaScript. Bạn sẽ không cần bất kỳ kiến thức hoặc kinh nghiệm nào về JavaScript trước để làm theo hướng dẫn này, nhưng bạn có thể tìm hiểu thêm trong các hướng dẫn JavaScript này .

Bước 1 - Cài đặt Trình vận chuyển

Transporter cung cấp mã binary cho hầu hết các hệ điều hành phổ biến. Quá trình cài đặt Ubuntu bao gồm hai bước: download bản binary Linux và làm cho nó có thể thực thi được.

Đầu tiên, hãy lấy liên kết cho version mới nhất từ trang phát hành mới nhất của Transporter trên GitHub . Sao chép liên kết kết thúc bằng -linux-amd64 . Hướng dẫn này sử dụng v0.5.2, version mới nhất tại thời điểm viết bài.

Tải file binary vào folder chính của bạn.

  • cd
  • wget https://github.com/compose/transporter/releases/download/v0.5.2/transporter-0.5.2-linux-amd64

Di chuyển nó vào /usr/local/bin hoặc folder cài đặt bạn muốn .

  • mv transporter-*-linux-amd64 /usr/local/bin/transporter

Sau đó, làm cho nó có thể thực thi để bạn có thể chạy nó.

  • chmod +x /usr/local/bin/transporter

Bạn có thể kiểm tra xem Transporter có được cài đặt chính xác hay không bằng cách chạy file binary .

  • transporter

Bạn sẽ thấy kết quả trợ giúp sử dụng và số version :

Output
USAGE transporter <command> [flags] COMMANDS run run pipeline loaded from a file . . . VERSION 0.5.2

Để sử dụng Transporter để di chuyển dữ liệu từ MongoDB sang Elasticsearch, ta cần hai thứ: dữ liệu trong MongoDB mà ta muốn di chuyển và một đường ống cho Transporter biết cách di chuyển nó. Bước tiếp theo tạo một số dữ liệu ví dụ, nhưng nếu bạn đã có database MongoDB mà bạn muốn di chuyển, bạn có thể bỏ qua bước tiếp theo và chuyển thẳng sang Bước 3.

Bước 2 - Thêm dữ liệu mẫu vào MongoDB (Tùy chọn)

Trong bước này, ta sẽ tạo một database mẫu với một bộ sưu tập duy nhất trong MongoDB và thêm một vài tài liệu vào bộ sưu tập đó. Sau đó, trong phần còn lại của hướng dẫn, ta sẽ di chuyển và chuyển đổi dữ liệu ví dụ này với một đường ống Transporter.

Đầu tiên, kết nối với database MongoDB của bạn.

  • mongo

Điều này sẽ thay đổi dấu nhắc của bạn thành mongo> , cho biết rằng bạn đang sử dụng MongoDB shell.

Từ đây, chọn một database để làm việc. Ta sẽ gọi my_application ta .

  • use my_application

Trong MongoDB , bạn không cần phải tạo database hoặc tập hợp một cách rõ ràng. Khi bạn bắt đầu thêm dữ liệu vào database mà bạn đã chọn theo tên, database đó sẽ tự động được tạo.

Vì vậy, để tạo database my_application , hãy lưu hai tài liệu vào bộ sưu tập users của nó: một tài liệu đại diện cho Sammy Shark và một tài liệu đại diện cho Gilly Glowfish. Đây sẽ là dữ liệu thử nghiệm của ta .

  • db.users.save({"firstName": "Sammy", "lastName": "Shark"});
  • db.users.save({"firstName": "Gilly", "lastName": "Glowfish"});

Sau khi thêm tài liệu, bạn có thể truy vấn bộ sưu tập users để xem profile của bạn .

  • db.users.find().pretty();

Đầu ra sẽ giống với kết quả bên dưới, nhưng các cột _id sẽ khác. MongoDB tự động thêm ID đối tượng để xác định duy nhất các tài liệu trong một bộ sưu tập.

output
{ "_id" : ObjectId("59299ac7f80b31254a916456"), "firstName" : "Sammy", "lastName" : "Shark" } { "_id" : ObjectId("59299ac7f80b31254a916457"), "firstName" : "Gilly", "lastName" : "Glowfish" }

Nhấn CTRL+C để thoát khỏi shell MongoDB.

Tiếp theo, hãy tạo một đường ống Transporter để di chuyển dữ liệu này từ MongoDB sang Elasticsearch.

Bước 3 - Tạo một đường ống cơ bản

Đường ống trong Transporter được xác định bởi file JavaScript có tên là pipeline.js theo mặc định. Lệnh init tạo một tệp cấu hình cơ bản trong đúng folder , được cung cấp nguồn và tệp chìm.

Khởi tạo một pipeline.js khởi động.js với MongoDB làm nguồn và Elasticsearch làm phần chìm.

  • transporter init mongodb elasticsearch

Bạn sẽ thấy kết quả sau:

Output
Writing pipeline.js...

Bạn sẽ không cần phải sửa đổi pipeline.js cho bước này, nhưng hãy xem nó hoạt động như thế nào.

Tệp trông giống như thế này, nhưng bạn cũng có thể xem nội dung của file bằng lệnh cat pipeline.js , less pipeline.js (thoát less bằng cách nhấn q ) hoặc bằng cách mở nó bằng editor yêu thích của bạn.

pipe.js
var source = mongodb({   "uri": "${MONGODB_URI}"   // "timeout": "30s",   // "tail": false,   // "ssl": false,   // "cacerts": ["/path/to/cert.pem"],   // "wc": 1,   // "fsync": false,   // "bulk": false,   // "collection_filters": "{}",   // "read_preference": "Primary" })  var sink = elasticsearch({   "uri": "${ELASTICSEARCH_URI}"   // "timeout": "10s", // defaults to 30s   // "aws_access_key": "ABCDEF", // used for signing requests to AWS Elasticsearch service   // "aws_access_secret": "ABCDEF" // used for signing requests to AWS Elasticsearch service   // "parent_id": "elastic_parent" // defaults to "elastic_parent" parent identifier for Elasticsearch })  t.Source("source", source, "/.*/").Save("sink", sink, "/.*/") 

Các dòng bắt đầu bằng var sourcevar sink xác định các biến JavaScript cho các bộ điều hợp MongoDB và Elasticsearch tương ứng. Ta sẽ xác định MONGODB_URIELASTICSEARCH_URI biến môi trường mà các adapter cần sau này trong bước này.

Các dòng bắt đầu bằng // là comment . Chúng nêu bật một số tùy chọn cấu hình phổ biến mà bạn có thể đặt cho đường dẫn của bạn , nhưng ta không sử dụng chúng cho đường dẫn cơ bản mà ta đang tạo ở đây.

Dòng cuối cùng nối nguồn và bồn rửa. Người transporter biến hoặc t cho phép ta truy cập vào đường ống của ta . Ta sử dụng .Source().Save() chức năng để thêm nguồn và chìm bằng cách sử dụng sourcesink biến xác định trước đó trong file.

Đối số thứ ba cho các hàm Source()Save()namespace. Chuyển /.*/ làm đối số cuối cùng nghĩa là ta muốn chuyển tất cả dữ liệu từ MongoDB và lưu nó dưới cùng một không gian tên trong Elasticsearch.

Trước khi có thể chạy đường ống này, ta cần đặt các biến môi trường cho URI MongoDBURI Elasticsearch . Trong ví dụ mà ta đang sử dụng, cả hai đều được lưu trữ local với cài đặt mặc định, nhưng hãy đảm bảo bạn tùy chỉnh các tùy chọn này nếu bạn đang sử dụng các version MongoDB hoặc Elasticsearch hiện có.

  • export MONGODB_URI='mongodb://localhost/my_application'
  • export ELASTICSEARCH_URI='http://localhost:9200/my_application'

Bây giờ ta đã sẵn sàng để chạy đường ống.

  • transporter run pipeline.js

Bạn sẽ thấy kết quả kết thúc như thế này:

Output
. . . INFO[0001] metrics source records: 2 path=source ts=1522942118483391242 INFO[0001] metrics source/sink records: 2 path="source/sink" ts=1522942118483395960 INFO[0001] exit map[source:mongodb sink:elasticsearch] ts=1522942118483396878

Trong dòng thứ hai và thứ ba đến dòng cuối cùng, kết quả này cho biết rằng có 2 bản ghi hiện diện trong nguồn và 2 bản ghi đã được chuyển đến bồn rửa.

Để xác nhận cả hai bản ghi đã được xử lý, bạn có thể truy vấn Elasticsearch để biết nội dung của database my_application , hiện sẽ tồn tại.

  • curl $ELASTICSEARCH_URI/_search?pretty=true

Tham số ?pretty=true giúp kết quả dễ đọc hơn:

Output
{ "took" : 5, "timed_out" : false, "_shards" : { "total" : 5, "successful" : 5, "skipped" : 0, "failed" : 0 }, "hits" : { "total" : 2, "max_score" : 1.0, "hits" : [ { "_index" : "my_application", "_type" : "users", "_id" : "5ac63e9c6687d9f638ced4fe", "_score" : 1.0, "_source" : { "firstName" : "Gilly", "lastName" : "Glowfish" } }, { "_index" : "my_application", "_type" : "users", "_id" : "5ac63e986687d9f638ced4fd", "_score" : 1.0, "_source" : { "firstName" : "Sammy", "lastName" : "Shark" } } ] } }

Database và bộ sưu tập trong MongoDB tương tự như các index và loại trong Elasticsearch. Với suy nghĩ đó, bạn sẽ thấy:

  • Trường _index được đặt thành my_application, tên của database MongoDB ban đầu).
  • Trường _type được đặt cho users, tên của bộ sưu tập MongoDB.
  • Các firstNamelastName lĩnh vực điền với “Sammy” “Shark” và “Gilly” “Glowfish”, tương ứng.

Điều này xác nhận cả hai bản ghi từ MongoDB đã được xử lý thành công thông qua Transporter và được tải vào Elasticsearch. Để xây dựng dựa trên đường dẫn cơ bản này, ta sẽ thêm một bước xử lý trung gian có thể biến đổi dữ liệu đầu vào.

Bước 4 - Tạo máy biến áp

Như tên cho thấy, máy biến áp sửa đổi dữ liệu nguồn trước khi tải nó vào bồn rửa. Ví dụ: chúng cho phép bạn thêm trường mới, xóa trường hoặc thay đổi dữ liệu của trường. Transporter đi kèm với một số máy biến áp được định nghĩa cũng như hỗ trợ cho các máy biến áp tùy chỉnh.

Thông thường, máy biến áp tùy chỉnh được viết dưới dạng hàm JavaScript và được lưu trong một file riêng biệt. Để sử dụng chúng, bạn thêm một tham chiếu đến file biến áp trong pipeline.js Transporter bao gồm cả công cụ JavaScript Otto và Goja. Vì Goja mới hơn và thường nhanh hơn, ta sẽ sử dụng nó ở đây. Sự khác biệt về chức năng duy nhất là cú pháp.

Tạo một file có tên là transform.js , ta sẽ sử dụng file này để viết hàm chuyển đổi của bạn .

  • nano transform.js

Đây là hàm ta sẽ sử dụng, sẽ tạo ra một trường mới có tên là fullName , giá trị của nó sẽ là trường firstNamelastName nối với nhau, được phân tách bằng dấu cách (như Sammy Shark ).

biến đổi.js
function transform(msg) {     msg.data.fullName = msg.data.firstName + " " + msg.data.lastName;     return msg } 

Hãy xem qua các dòng của file này:

  • Dòng đầu tiên của file , function transform(msg),định nghĩa hàm .
  • msg là một đối tượng JavaScript chứa các chi tiết của tài liệu nguồn. Ta sử dụng đối tượng này để truy cập dữ liệu đi qua đường ống.
  • Dòng đầu tiên của hàm nối hai trường hiện có và gán giá trị đó cho trường fullName mới.
  • Dòng cuối cùng của hàm trả về đối tượng msg mới được sửa đổi để phần còn lại của đường dẫn sử dụng.

Lưu và đóng file .

Tiếp theo, ta cần sửa đổi đường ống để sử dụng máy biến áp này. Mở file pipeline.js để chỉnh sửa.

  • nano pipeline.js

Trong dòng cuối cùng, ta cần thêm một lệnh gọi vào hàm Transform() để thêm biến áp vào đường ống giữa các lệnh gọi đến Source()Save() , như sau:

~ / transporter / pipe.js
. . . t.Source("source", source, "/.*/") .Transform(goja({"filename": "transform.js"})) .Save("sink", sink, "/.*/") 

Đối số được truyền cho Transform() là kiểu biến đổi, trong trường hợp này là Goja. Sử dụng hàm goja , ta chỉ định tên file của máy biến áp bằng đường dẫn tương đối của nó.

Lưu và đóng file . Trước khi ta chạy lại đường ống để kiểm tra máy biến áp, hãy xóa dữ liệu hiện có trong Elasticsearch từ thử nghiệm trước.

  • curl -XDELETE $ELASTICSEARCH_URI

Bạn sẽ thấy kết quả này thông báo sự thành công của lệnh.

Output
{"acknowledged":true}

Bây giờ chạy lại đường ống.

  • transporter run pipeline.js

Đầu ra sẽ trông rất giống với thử nghiệm trước đó và bạn có thể xem trong vài dòng cuối cùng liệu đường ống có hoàn thành như trước hay không. Để chắc chắn, ta có thể kiểm tra lại Elasticsearch để xem dữ liệu có tồn tại ở định dạng mà ta mong đợi hay không.

  • curl $ELASTICSEARCH_URI/_search?pretty=true

Bạn có thể thấy trường fullName trong kết quả mới:

Output
{ "took" : 9, "timed_out" : false, "_shards" : { "total" : 5, "successful" : 5, "skipped" : 0, "failed" : 0 }, "hits" : { "total" : 2, "max_score" : 1.0, "hits" : [ { "_index" : "my_application", "_type" : "users", "_id" : "5ac63e9c6687d9f638ced4fe", "_score" : 1.0, "_source" : { "firstName" : "Gilly", "fullName" : "Gilly Glowfish", "lastName" : "Glowfish" } }, { "_index" : "my_application", "_type" : "users", "_id" : "5ac63e986687d9f638ced4fd", "_score" : 1.0, "_source" : { "firstName" : "Sammy", "fullName" : "Sammy Shark", "lastName" : "Shark" } } ] } }

Lưu ý trường fullName đã được thêm vào cả hai tài liệu với các giá trị được đặt chính xác. Với điều này, bây giờ ta biết cách thêm các phép biến đổi tùy chỉnh vào đường ống Transporter.

Kết luận

Bạn đã xây dựng một đường ống vận chuyển cơ bản với một máy biến áp để sao chép và sửa đổi dữ liệu từ MongoDB sang Elasticsearch. Bạn có thể áp dụng các phép biến đổi phức tạp hơn theo cùng một cách, chuỗi nhiều phép biến đổi trong cùng một đường dẫn và hơn thế nữa. MongoDB và Elasticsearch chỉ là hai trong số các bộ điều hợp Transporter hỗ trợ. Nó cũng hỗ trợ các file phẳng, database SQL như Postgres và nhiều nguồn dữ liệu khác.

Bạn có thể xem dự án Transporter trên GitHub để được cập nhật những thay đổi mới nhất trong API và truy cập wiki Transporter để biết thêm thông tin chi tiết về cách sử dụng bộ điều hợp, máy biến áp và các tính năng khác của Máy biến áp.


Tags:

Các tin liên quan

Cách triển khai trang web Jekyll bằng Git Hooks trên Ubuntu 16.04
2018-03-29
Cách chặn nỗ lực đăng nhập SSH không mong muốn với PyFilter trên Ubuntu 16.04
2018-03-27
Cách tự động triển khai ứng dụng Laravel với Trình triển khai trên Ubuntu 16.04
2018-03-23
Cách thiết lập trang web phát triển Jekyll trên Ubuntu 16.04
2018-03-20
Cách cài đặt Ruby on Rails với rbenv trên Ubuntu 16.04
2018-03-15
Cách cài đặt Node.js trên Ubuntu 16.04
2018-03-07
Cách cài đặt và bảo mật Memcached trên Ubuntu 16.04
2018-03-06
Cách cài đặt Buildbot trên Ubuntu 16.04
2018-03-06
Cách quản lý an toàn bí mật với HashiCorp Vault trên Ubuntu 16.04
2018-02-28
Cách bảo mật Roundcube trên Ubuntu 16.04
2018-02-24