seaweedfs/weed/pb/mq_agent.proto
Chris Lu 02773a6107
Some checks are pending
go: build dev binaries / cleanup (push) Waiting to run
go: build dev binaries / build_dev_linux_windows (amd64, linux) (push) Blocked by required conditions
go: build dev binaries / build_dev_linux_windows (amd64, windows) (push) Blocked by required conditions
go: build dev binaries / build_dev_darwin (amd64, darwin) (push) Blocked by required conditions
go: build dev binaries / build_dev_darwin (arm64, darwin) (push) Blocked by required conditions
docker: build dev containers / build-dev-containers (push) Waiting to run
End to End / FUSE Mount (push) Waiting to run
go: build binary / Build (push) Waiting to run
Ceph S3 tests / Ceph S3 tests (push) Waiting to run
Accumulated changes for message queue (#6600)
* rename

* set agent address

* refactor

* add agent sub

* pub messages

* grpc new client

* can publish records via agent

* send init message with session id

* fmt

* check cancelled request while waiting

* use sessionId

* handle possible nil stream

* subscriber process messages

* separate debug port

* use atomic int64

* less logs

* minor

* skip io.EOF

* rename

* remove unused

* use saved offsets

* do not reuse session, since always session id is new after restart

remove last active ts from SessionEntry

* simplify printing

* purge unused

* just proxy the subscription, skipping the session step

* adjust offset types

* subscribe offset type and possible value

* start after the known tsns

* avoid wrongly set startPosition

* move

* remove

* refactor

* typo

* fix

* fix changed path
2025-03-09 23:49:42 -07:00

83 lines
2.3 KiB
Protocol Buffer

syntax = "proto3";
package messaging_pb;
import "mq_schema.proto";
option go_package = "github.com/seaweedfs/seaweedfs/weed/pb/mq_agent_pb";
option java_package = "seaweedfs.mq_agent";
option java_outer_classname = "MessageQueueAgentProto";
//////////////////////////////////////////////////
service SeaweedMessagingAgent {
// Publishing
rpc StartPublishSession (StartPublishSessionRequest) returns (StartPublishSessionResponse) {
}
rpc ClosePublishSession (ClosePublishSessionRequest) returns (ClosePublishSessionResponse) {
}
rpc PublishRecord (stream PublishRecordRequest) returns (stream PublishRecordResponse) {
}
// Subscribing
rpc SubscribeRecord (stream SubscribeRecordRequest) returns (stream SubscribeRecordResponse) {
}
}
//////////////////////////////////////////////////
message StartPublishSessionRequest {
schema_pb.Topic topic = 1;
int32 partition_count = 2;
schema_pb.RecordType record_type = 3;
string publisher_name = 4;
}
message StartPublishSessionResponse {
string error = 1;
int64 session_id = 2;
}
message ClosePublishSessionRequest {
int64 session_id = 1;
}
message ClosePublishSessionResponse {
string error = 1;
}
//////////////////////////////////////////////////
message PublishRecordRequest {
int64 session_id = 1; // session_id is required for the first record
bytes key = 2;
schema_pb.RecordValue value = 3;
}
message PublishRecordResponse {
int64 ack_sequence = 1;
string error = 2;
}
//////////////////////////////////////////////////
message SubscribeRecordRequest {
message InitSubscribeRecordRequest {
string consumer_group = 1;
string consumer_group_instance_id = 2;
schema_pb.Topic topic = 4;
repeated schema_pb.PartitionOffset partition_offsets = 5;
schema_pb.OffsetType offset_type = 6;
int64 offset_ts_ns = 7;
string filter = 10;
int32 max_subscribed_partitions = 11;
int32 sliding_window_size = 12;
}
InitSubscribeRecordRequest init = 1;
int64 ack_sequence = 2;
bytes ack_key = 3;
}
message SubscribeRecordResponse {
bytes key = 2;
schema_pb.RecordValue value = 3;
int64 ts_ns = 4;
string error = 5;
bool is_end_of_stream = 6;
bool is_end_of_topic = 7;
}
//////////////////////////////////////////////////