Đang tải...

Vượt Giới Hạn Payload 2MB Của Temporal Bằng In-Memory Batch Chunking

25/05/2026
7 phút đọc
Vượt Giới Hạn Payload 2MB Của Temporal Bằng In-Memory Batch Chunking
📋 Bối Cảnh Nghiệp Vụ Đây không phải là luồng "tạo khách hàng mới" — người dùng vẫn được đăng ký realtime qua app/web như bình thường. Vấn đề nằm ở...

📋 Bối Cảnh Nghiệp Vụ Đây không phải là luồng "tạo khách hàng mới" — người dùng vẫn được đăng ký realtime qua app/web như bình thường. Vấn đề nằm ở chỗ sau khi chuyển đổi từ một nền tảng thứ ba (WebEngage), chúng tôi phát hiện hàng chục nghìn khách hàng hiện hữu bị thiếu sự kiện REGISTER trong event store mới, khiến hệ thống loyalty xử lý như thể họ chưa từng đăng ký. Đội ngũ Ops cần một giải pháp để backfill hàng loạt các event bị thiếu này từ một file CSV chứa các số điện thoại bị ảnh hưởng.

🔥 Vấn đề: Chúng tôi đã xây dựng pipeline backfill này để xử lý file CSV với hơn 50.000 số điện thoại. Ban đầu, Temporal Workflow phân tích file và cố gắng truyền toàn bộ danh sách vào một Activity worker duy nhất trong một lần gọi. Ngay lập tức, pipeline bị crash với lỗi vượt quá kích thước payload gRPC từ Temporal Server.

🔍 Nguyên nhân cốt lõi: Temporal áp dụng giới hạn mặc định 2MB payload cho tất cả các tham số input/output truyền qua gRPC. Giới hạn này được thiết kế có chủ ý: nó giúp tối giản lịch sử thực thi workflow, giảm thiểu chi phí mạng, và ngăn chặn worker bị tràn bộ nhớ (heap memory) trong quá trình replay trạng thái. Việc cố gắng truyền một mảng dữ liệu khổng lồ trong một cuộc gọi RPC duy nhất là một anti-pattern nghiêm trọng trong hệ thống phân tán.

✅ Cách khắc phục: Thay vì nhồi nhét toàn bộ mảng dữ liệu khổng lồ trực tiếp vào Activity, chúng tôi đã triển khai kỹ thuật in-memory batch chunking (chia lô trong bộ nhớ) ngay bên trong logic của Workflow. Chúng tôi chia nhỏ danh sách số điện thoại thành các batch nhỏ, nhẹ (100 item mỗi batch), và gọi tuần tự đến Activity worker.

Dưới đây là cách chúng tôi triển khai logic chunking một cách an toàn và tối ưu:

package com.example.temporal;

import com.google.common.collect.Lists;
import io.temporal.activity.ActivityOptions;
import io.temporal.workflow.Workflow;
import java.time.Duration;
import java.util.List;

public class CsvImportWorkflowImpl implements CsvImportWorkflow {

 // Sử dụng Logger an toàn của Temporal để tránh log trùng lặp khi replay
 private static final org.slf4j.Logger log = Workflow.getLogger(CsvImportWorkflowImpl.class);

 private final CsvImportActivities activities = Workflow.newActivityStub(
 CsvImportActivities.class,
 ActivityOptions.newBuilder()
 .setStartToCloseTimeout(Duration.ofSeconds(60))
 .build());

 @Override
 public void importCsv(List<String> phoneNumbers) {
 log.info("Bắt đầu workflow Import CSV. Tổng số bản ghi: {}", phoneNumbers.size());
 
 // Chia collection lớn thành các lô nhỏ sử dụng thư viện Guava
 List<List<String>> batches = Lists.partition(phoneNumbers, 100);
 
 for (List<String> batch : batches) {
 log.info("Đang import batch có kích thước: {}", batch.size());
 activities.importBatch(batch); // Lệnh gọi gRPC an toàn (< 10 KB)
 }
 
 activities.notifyTelegram("Quá trình import dữ liệu hoàn tất thành công.");
 }
}

💡 Bài Học Rút Ra (Takeaway):

Không bao giờ coi các tham số Activity như một bộ đệm nhớ cục bộ (local memory buffer). Luôn giữ các payload gRPC ở mức nhẹ nhất (lý tưởng là dưới 100KB) để đảm bảo thực thi hiệu suất cao. Tận dụng chunking trong bộ nhớ bên trong Workflow để chia nhỏ các giao dịch số lượng lớn. Điều này giúp giữ payload mạng nhỏ gọn và thiết lập các ranh giới rollback rõ ràng khi có lỗi xảy ra. ⚠️ Các Giới Hạn Đã Biết (Known Limitations) Cách tiếp cận chunking trong bộ nhớ này hoạt động tốt ở quy mô hiện tại của chúng tôi (~50K số điện thoại ≈ 600KB), nhưng nó có hai giới hạn kiến trúc mà bạn cần lưu ý trước khi áp dụng vào hệ thống của mình:

  1. Input của Workflow cũng bị giới hạn bởi payload limit Giới hạn 2MB của gRPC áp dụng cho cả input của Workflow, không chỉ riêng Activity. Trong triển khai của chúng tôi, toàn bộ danh sách số điện thoại được truyền làm tham số Workflow qua WorkflowClient.start(workflow::importCsv, phoneNumbers). Việc chia nhỏ (chunking) bên trong Workflow chỉ giảm kích thước của mỗi lần gọi Activity — nhưng payload ban đầu kích hoạt Workflow vẫn là toàn bộ collection. Khi quy mô vượt ~150K+ bản ghi, nó sẽ vượt quá giới hạn và Workflow sẽ crash trước khi bất kỳ logic chunking nào kịp thực thi.

Cách khắc phục chuẩn Production:

Upload file CSV lên object storage (S3, GCS, hoặc MinIO), sau đó chỉ truyền tham chiếu file (ví dụ: S3 URI) cho Workflow. Activity đầu tiên sẽ tải xuống và phân tích file, rồi trả về danh sách ID để Workflow thực hiện phân lô.

❌ Hiện tại: Truyền toàn bộ danh sách làm input của Workflow WorkflowClient.start(workflow::importCsv, phoneNumbers);

✅ Tối ưu: Chỉ truyền đường dẫn tham chiếu, để Activity tự lấy dữ liệu WorkflowClient.start(workflow::importCsv, "s3://bucket/imports/job-123.csv");

  1. Chưa có ContinueAsNew cho history lớn Với 50K item và batch size là 100, Workflow tạo ra ~500 lần gọi Activity. Mỗi Activity sinh ra ít nhất 3 event (ActivityTaskScheduled, ActivityTaskStarted, ActivityTaskCompleted), tổng cộng ~1.500 event trong lịch sử thực thi Workflow. Temporal khuyến nghị giữ history dưới 50K event (và đưa ra cảnh báo ở mức 10K). Với dataset lớn hơn đáng kể, bạn nên gọi Workflow.continueAsNew() sau mỗi N batch để reset lịch sử và tránh giảm hiệu năng khi Worker replay.
List<List<String>> batches = Lists.partition(phoneNumbers, 100);
int batchCount = 0;

for (int i = 0; i < batches.size(); i++) {
 activities.importBatch(batches.get(i));
 batchCount++;

 // Reset history sau mỗi 200 batch để tránh phình to lịch sử
 if (batchCount % 200 == 0 && i < batches.size() - 1) {
 // Truyền các phần tử còn lại cho lần chạy workflow tiếp theo
 List<String> remaining = phoneNumbers.subList((i + 1) * 100, phoneNumbers.size());
 Workflow.continueAsNew(remaining);
 }
}

Bạn đã bao giờ đụng phải giới hạn payload của Temporal trong môi trường production chưa? Cách bạn xử lý việc xử lý hàng loạt (batch processing) ở quy mô lớn trong các durable workflow là gì — dùng object storage, Child Workflows, hay một phương pháp nào khác? Hãy cùng thảo luận dưới phần bình luận nhé!

📚 Nguồn: Viblo

Bình luận

0 bình luận

Email không hiển thị công khai.

Chưa có bình luận nào. Hãy là người đầu tiên bình luận.

Chia sẻ bài viết

Cần tư vấn?

Liên hệ với chúng tôi để được hỗ trợ

Liên hệ ngay

Bài viết liên quan

Proxy hoạt động ở tầng nào trong mô hình TCP/IP? HTTP Proxy Và SOCKS5 nằm ở đâu?
09/06/2026

Proxy hoạt động ở tầng nào trong mô hình TCP/IP? HTTP Proxy Và SOCKS5 nằm ở đâu?

Proxy hoạt động ở tầng nào? Sau khi đã đi qua các tầng mạng như Physical Layer, Data Link Layer, Internet Layer, Transport Layer và Application Layer, ta có thể nhìn Proxy rõ ...

Đọc thêm
Red Team RAG: Khi mỗi pipeline là một đường hầm tối – Phần 2: Đầu độc dòng chảy – Từ ingestion đến sụp đổ
09/06/2026

Red Team RAG: Khi mỗi pipeline là một đường hầm tối – Phần 2: Đầu độc dòng chảy – Từ ingestion đến sụp đổ

## Lời mở đầu: Bạn đã vào hầm. Bây giờ, hãy đầu độc dòng nước. Ở phần 1, chúng ta đã đứng trước **cửa hầm**, học cách đọc bản đồ pipeline RAG, v...

Đọc thêm
Vì sao giá trị truyền thống luôn được đặt lên hàng đầu
09/06/2026

Vì sao giá trị truyền thống luôn được đặt lên hàng đầu

Giá trị truyền thống không chỉ là yếu tố mang tính hoài niệm, mà còn đóng vai trò nền tảng trong việc định hình bản sắc và chiều sâu của một công trình ...

Đọc thêm

Bắt đầu dự án của bạn

Hãy để Flash Dev đồng hành cùng bạn

Liên hệ ngay