Seaweed Message Queue
Introduction
Seaweed Message Queue (SMQ) is a distributed messaging system built on top of SeaweedFS. It provides:
- Structured messages: every message must conform to a schema
- Streamed publishing with asynchronous acknowledgements
- Message storage in Parquet files, supporting both streaming and batch reads
- Disaggregated storage
- Scalable, stateless message brokers
Architecture
The system consists of three main components:
- Message Queue Agent: A gRPC server that provides a simplified interface for clients
- Message Queue Brokers: Stateless brokers that handle message routing and storage
- SeaweedFS: The underlying storage system that persists messages in Parquet format
Publishers => gRPC Publish APIs => Agent => Brokers => Agent => gRPC Subscribe APIs => Subscribers
                                               ^
                                               |
                                               v
                                           SeaweedFS
The Agent can run either on the server side or as a sidecar on each client.
Features
Core Features
- Structured Messages: SeaweedFS stores unstructured data files, while Seaweed Message Queue stores structured messages
- Messages stored in SeaweedFS can be converted into Parquet files, saving disk space through more efficient columnar compression
- Messages in Parquet files can be streamed via Seaweed message queue brokers
- Parquet files can also be read in batches directly from SeaweedFS
Publishing Features
- Messages published successfully are acknowledged asynchronously
- Partition-based message routing
- Schema validation for message structure
Subscribing Features
- Message consumption offsets are tracked and persisted on the server side
- Consumer APIs can process messages in parallel while still ensuring serial processing of messages with the same key
- Configurable sliding window for concurrent message processing
- Ability to start consuming from specific timestamps or offsets
Usage Examples
Starting the Services
- Start a Message Queue Broker:
weed mq.broker -port=17777 -master=localhost:9333
- Start a Message Queue Agent:
weed mq.agent -port=16777 -broker=localhost:17777
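Both commands assume a SeaweedFS cluster (master, volume server, and filer) is already running. For a quick local test, one way to start everything in a single process is the all-in-one server command (a minimal sketch; the port value is illustrative):
weed server -master.port=9333 -filer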
Defining Message Schema
Messages in SMQ must have a defined schema. Here's an example of defining a message type:
import (
    "github.com/seaweedfs/seaweedfs/weed/mq/schema"
    "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
)

type MyRecord struct {
    Key    []byte
    Field1 []byte
    Field2 string
    Field3 int32
    Field4 int64
    Field5 float32
    Field6 float64
    Field7 bool
}

func MyRecordType() *schema_pb.RecordType {
    return schema.RecordTypeBegin().
        WithField("key", schema.TypeBytes).
        WithField("field1", schema.TypeBytes).
        WithField("field2", schema.TypeString).
        WithField("field3", schema.TypeInt32).
        WithField("field4", schema.TypeInt64).
        WithField("field5", schema.TypeFloat).
        WithField("field6", schema.TypeDouble).
        WithField("field7", schema.TypeBoolean).
        RecordTypeEnd()
}
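The publish and subscribe examples below call MyRecord.ToRecordValue() and FromRecordValue(), which convert between the Go struct and the schema_pb.RecordValue wire format. A minimal sketch of these helpers, assuming the record-value builder in the schema package and the generated getters in schema_pb behave as named here:

// ToRecordValue converts the Go struct into a schema_pb.RecordValue,
// setting each field under the name declared in MyRecordType().
func (r *MyRecord) ToRecordValue() *schema_pb.RecordValue {
    return schema.RecordBegin().
        SetBytes("key", r.Key).
        SetBytes("field1", r.Field1).
        SetString("field2", r.Field2).
        SetInt32("field3", r.Field3).
        SetInt64("field4", r.Field4).
        SetFloat("field5", r.Field5).
        SetDouble("field6", r.Field6).
        SetBool("field7", r.Field7).
        RecordEnd()
}

// FromRecordValue rebuilds the Go struct from a received RecordValue.
func FromRecordValue(recordValue *schema_pb.RecordValue) *MyRecord {
    fields := recordValue.Fields
    return &MyRecord{
        Key:    fields["key"].GetBytesValue(),
        Field1: fields["field1"].GetBytesValue(),
        Field2: fields["field2"].GetStringValue(),
        Field3: fields["field3"].GetInt32Value(),
        Field4: fields["field4"].GetInt64Value(),
        Field5: fields["field5"].GetFloatValue(),
        Field6: fields["field6"].GetDoubleValue(),
        Field7: fields["field7"].GetBoolValue(),
    }
}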
Publishing Messages
// Create a publish session
session, err := agent_client.NewPublishSession(
    "localhost:16777", // agent address
    schema.NewSchema("my_namespace", "my_topic", MyRecordType()),
    6,            // partition count
    "publisher1", // client name
)
if err != nil {
    log.Fatal(err)
}

// Publish a message
myRecord := &MyRecord{
    Key:    []byte("key1"),
    Field1: []byte("value1"),
    Field2: "string value",
    Field3: 123,
    Field4: 456,
    Field5: 1.23,
    Field6: 4.56,
    Field7: true,
}
err = session.PublishMessageRecord(myRecord.Key, myRecord.ToRecordValue())
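Note that, as described under Publishing Features above, successful publishes are acknowledged asynchronously; a nil error from PublishMessageRecord means the message was handed off for streaming, not that the acknowledgement has already arrived.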
Subscribing to Messages
// Create a subscribe session
session, err := agent_client.NewSubscribeSession(
    "localhost:16777", // agent address
    &agent_client.SubscribeOption{
        ConsumerGroup:           "my-group",
        ConsumerGroupInstanceId: "consumer1",
        Topic:                   topic.NewTopic("my_namespace", "my_topic"),
        OffsetType:              schema_pb.OffsetType_RESUME_OR_EARLIEST,
        MaxSubscribedPartitions: 3,  // maximum number of partitions this consumer instance subscribes to
        SlidingWindowSize:       16, // concurrently process up to 16 messages with different message keys
    },
)
if err != nil {
    log.Fatal(err)
}

// Subscribe to messages
session.SubscribeMessageRecord(
    func(key []byte, recordValue *schema_pb.RecordValue) {
        record := FromRecordValue(recordValue)
        fmt.Printf("Received: %+v\n", record)
    },
    func() {
        fmt.Println("Subscription completed")
    },
)
Configuration
Broker Configuration
- -port: gRPC server port (default: 17777)
- -master: comma-separated master servers
- -filerGroup: share metadata with other filers in the same group
- -dataCenter: prefer volumes in this data center
- -rack: prefer volumes in this rack
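For example, a broker pinned to a specific data center and rack (the dc1 and rack1 values are illustrative):
weed mq.broker -port=17777 -master=localhost:9333 -dataCenter=dc1 -rack=rack1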
Agent Configuration
- -port: gRPC server port (default: 16777)
- -broker: comma-separated message queue brokers
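For example, an agent connected to two brokers (the addresses are illustrative):
weed mq.agent -port=16777 -broker=localhost:17777,localhost:17778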