Background #
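IIoT (Industrial Internet of Things) architectures are typically distributed and asynchronous, with communication driven by events such as the publication of messages (and the corresponding subscriptions). These asynchronous architectures improve scalability and tolerance to change, but they also raise interoperability issues, because explicit knowledge of the internal structure of messages and of their categorization (topics) is diluted among the elements of the architecture.
In fact, REST APIs faced the same problem until the industry came together and proposed a standard way to define the structure and schema of synchronous APIs: OpenAPI (derived from Swagger).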
Introduction #
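For asynchronous architectures, AsyncAPI, inspired by OpenAPI, emerged to address this problem:
AsyncAPI provides a specification that allows you to define message-driven APIs in a machine-readable format. It is protocol-agnostic, so you can use it for APIs that work over Kafka, MQTT, AMQP, WebSockets, STOMP, etc. The specification is very similar to OpenAPI/Swagger, so if you are familiar with it, AsyncAPI should be easy for you.
In AsyncAPI, the specification of an API can be written in YAML or JSON and can describe, for example, the message brokers, the topics of interest, or the different message formats associated with each topic. However, AsyncAPI is still at an early stage of development, and the AsyncAPI tooling market is underdeveloped, mostly limited to generating documentation for human consumption.
The initial AsyncAPI contribution is the approach shown in the figure above.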
AsyncAPI Toolkit #
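As the figure above shows, the AsyncAPI team extended this initial framework. Based on the AsyncAPI specification, an AsyncAPI JSON grammar was developed in Xtext; this grammar validates message-driven API definitions that conform to the AsyncAPI specification. From the same grammar, Xtext automatically generates the corresponding AsyncAPI metamodel and all the tooling (an editor with content assist, a parser, etc.) needed to easily create AsyncAPI JSON definitions and turn them into AsyncAPI models that conform to the AsyncAPI metamodel.
With the AsyncAPI metamodel and an API specification as a conforming model, the workflow continues by running an M2T (model-to-text) transformation that generates an internal DSL. Currently, the AsyncAPI Toolkit supports Java and generates a library that offers a fluent API to help developers create, publish, and receive well-formed messages.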
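To make the M2T step more concrete, the following is a minimal sketch of a model-to-text transformation written by hand in plain Java. It is only an illustration under stated assumptions: the class `M2TSketch`, the method `generateBuilder`, and the use of a simple name-to-type map as the "model" are hypothetical and are not the toolkit's actual metamodel or templates.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical, hand-written model-to-text (M2T) sketch; not the toolkit's real templates.
final class M2TSketch {

    // Emits the source text of a small fluent builder for a message whose
    // properties are given as (friendly name -> Java type) pairs.
    static String generateBuilder(String messageName, Map<String, String> properties) {
        StringBuilder out = new StringBuilder();
        out.append("public class ").append(messageName).append("Builder {\n");
        for (Map.Entry<String, String> p : properties.entrySet()) {
            // Field name: friendly name with a lower-case first letter.
            String field = Character.toLowerCase(p.getKey().charAt(0)) + p.getKey().substring(1);
            out.append("    private ").append(p.getValue()).append(' ').append(field).append(";\n");
            out.append("    public ").append(messageName).append("Builder with").append(p.getKey())
               .append('(').append(p.getValue()).append(" value) { this.").append(field)
               .append(" = value; return this; }\n");
        }
        out.append("}\n");
        return out.toString();
    }

    public static void main(String[] args) {
        Map<String, String> props = new LinkedHashMap<>();
        props.put("ProtocolVersion", "int");
        props.put("Content", "String");
        System.out.print(generateBuilder("Request", props));
    }
}
```

The point of the sketch is the direction of the transformation: the input is a model element, the output is text that happens to be Java source. A real generator would walk the full metamodel instance rather than a flat map.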
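Notably, since these architectures are message-based, data modeling plays a crucial role. For this reason, the workflow above includes an alternative (graphical) concrete syntax focused on modeling the messages to be exchanged. It can be used to bootstrap an AsyncAPI JSON definition, which can then be refined manually.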
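Importing / Modeling an AsyncAPI Specification #
First, we created an Xtext grammar based on the AsyncAPI specification. From this grammar we automatically generated an Ecore metamodel, together with a set of editors and Eclipse-based tools. These editors allow creating JSON-based specifications of message-driven APIs using AsyncAPI. Specifications created with these editors are automatically parsed and reified as instances of the AsyncAPI metamodel.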
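Once a specification is available as an object model rather than raw JSON text, tools can work with it directly. The sketch below illustrates the idea with two hand-written classes that loosely mirror a server and its variables, and a helper that expands a templated server URL. All names here (`ServerModelSketch`, `Server`, `Variable`, `resolveUrl`) are assumptions made for illustration; they are not the Ecore-generated classes of the toolkit.

```java
import java.util.List;
import java.util.Map;

// Hypothetical, simplified stand-ins for two metamodel classes.
final class ServerModelSketch {

    static final class Variable {
        final String defaultValue;
        final List<String> allowedValues;

        Variable(String defaultValue, List<String> allowedValues) {
            this.defaultValue = defaultValue;
            this.allowedValues = allowedValues;
        }
    }

    static final class Server {
        final String url;
        final String scheme;
        final Map<String, Variable> variables;

        Server(String url, String scheme, Map<String, Variable> variables) {
            this.url = url;
            this.scheme = scheme;
            this.variables = variables;
        }
    }

    // Replaces each {name} placeholder in the URL with the override if one is
    // given, otherwise with the variable's default. Values outside a non-empty
    // enum list are rejected.
    static String resolveUrl(Server server, Map<String, String> overrides) {
        String result = server.url;
        for (Map.Entry<String, Variable> e : server.variables.entrySet()) {
            Variable v = e.getValue();
            String value = overrides.getOrDefault(e.getKey(), v.defaultValue);
            if (!v.allowedValues.isEmpty() && !v.allowedValues.contains(value)) {
                throw new IllegalArgumentException(
                        "Value '" + value + "' is not allowed for variable " + e.getKey());
            }
            result = result.replace("{" + e.getKey() + "}", value);
        }
        return result;
    }

    public static void main(String[] args) {
        Server server = new Server("broker.url:{port}", "mqtt",
                Map.of("port", new Variable("1883", List.of("1883", "8883"))));
        System.out.println(resolveUrl(server, Map.of()));
        System.out.println(resolveUrl(server, Map.of("port", "8883")));
    }
}
```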
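Generating code to easily process messages from an AsyncAPI specification #
In addition, the prototype can generate Java code that supports creating and serializing JSON-based message payloads according to the modeled AsyncAPI (including nested JSON objects). Arrays, however, are not yet supported. The following excerpt shows an example of an AsyncAPI specification supported by the prototype: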
{
  "asyncapi": "1.2.0",
  "info": {
    "title": "Sample AsyncAPI specification",
    "version": "0.1.0"
  },
  "servers": [
    {
      "url": "broker.url:{port}",
      "scheme": "mqtt",
      "description": "This is an example description",
      "variables": {
        "port": {
          "default": "1883",
          "enum": [ "1883", "8883" ]
        }
      }
    }
  ],
  "topics": {
    "messages/device2controller": {
      "publish": { "$ref" : "#/components/messages/request" }
    }
  },
  "components": {
    "schemas": {
      "protocol_version": {
        "title": "Protocol version",
        "type": "integer",
        "default": 2,
        "x-friendly-name": "ProtocolVersion"
      },
      "id": {
        "title": "ID",
        "type": "string",
        "format": "XXXXXX YY ZZZZZZ W"
      },
      "status": {
        "title": "Status",
        "type": "string",
        "enum": ["OK", "ERROR"],
        "x-friendly-name" : "Status"
      },
      "environment": {
        "title": "Environment",
        "type": "string",
        "enum": ["DEV", "STAG", "PROD"],
        "x-friendly-name" : "Environment"
      }
    },
    "messages" : {
      "request" : {
        "summary" : "Request connectivity.",
        "description": "Request connectivity when status changes",
        "payload": {
          "type": "object",
          "properties": {
            "P": { "$ref": "#/components/schemas/protocol_version" },
            "ID": { "$ref": "#/components/schemas/id" },
            "E": { "$ref": "#/components/schemas/environment" },
            "M": {
              "x-friendly-name" : "Message",
              "properties": {
                "S": { "$ref": "#/components/schemas/status" },
                "C": {
                  "title": "Content",
                  "type": "string",
                  "x-friendly-name": "Content"
                }
              }
            }
          }
        }
      }
    }
  }
}
From the specification above, messages such as the following can be created:
package tests;

import messages.device2controller.Request;
import messages.device2controller.Request.Payload.Environment;
import messages.device2controller.Request.Payload.Message;
import messages.device2controller.Request.Payload.PayloadBuilder;
import messages.device2controller.Request.Payload.Message.Status;

public class Test {
    public static void main(String[] args) {
        PayloadBuilder builder = Request.payloadBuilder();
        Request.Payload payload = builder
            .withProtocolVersion(2)
            .withEnvironment(Environment.DEV)
            .withID("id")
            .withMessage(
                Message.newBuilder()
                    .withStatus(Status.OK)
                    .withContent("Content")
                    .build()
            ).build();
        System.out.println(payload.toJson(true));
        System.out.println(Request.Payload.fromJson(payload.toJson()).toJson(true));
    }
}
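For readers who want a feel for what sits behind such a fluent API, here is a small hand-written sketch of a builder that serializes a nested payload to JSON. It is not the code the toolkit generates: the class names (`PayloadSketch`, `Payload`, `Builder`, `Message`), the omission of the environment field, the requirement that a message be set, and the string-concatenation serializer are all simplifying assumptions for illustration.

```java
// Hypothetical, hand-written sketch of a fluent payload builder;
// the toolkit's generated classes may look quite different.
final class PayloadSketch {

    enum Status { OK, ERROR }

    static final class Message {
        final Status status;
        final String content;

        Message(Status status, String content) {
            this.status = status;
            this.content = content;
        }

        String toJson() {
            return "{\"S\":\"" + status + "\",\"C\":\"" + escape(content) + "\"}";
        }
    }

    static final class Payload {
        final int protocolVersion;
        final String id;
        final Message message;

        private Payload(Builder b) {
            this.protocolVersion = b.protocolVersion;
            this.id = b.id;
            this.message = b.message;
        }

        static Builder builder() { return new Builder(); }

        // Short JSON keys (P, ID, M) follow the payload schema of the example.
        String toJson() {
            return "{\"P\":" + protocolVersion + ",\"ID\":\"" + escape(id)
                    + "\",\"M\":" + message.toJson() + "}";
        }
    }

    static final class Builder {
        private int protocolVersion = 2; // mirrors "default": 2 in the example schema
        private String id = "";
        private Message message;

        Builder withProtocolVersion(int v) { this.protocolVersion = v; return this; }
        Builder withID(String v) { this.id = v; return this; }
        Builder withMessage(Message v) { this.message = v; return this; }

        Payload build() {
            // Requiring a message is a choice of this sketch, not of the specification.
            if (message == null) throw new IllegalStateException("message must be set");
            return new Payload(this);
        }
    }

    // Escapes backslashes and double quotes only; enough for this sketch.
    static String escape(String s) {
        return s.replace("\\", "\\\\").replace("\"", "\\\"");
    }

    public static void main(String[] args) {
        Payload payload = Payload.builder()
                .withID("id")
                .withMessage(new Message(Status.OK, "Content"))
                .build();
        System.out.println(payload.toJson());
    }
}
```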
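Generating a new AsyncAPI from an Ecore model #
So far we have assumed that you either already have an AsyncAPI file to import or will create one using our AsyncAPI editor. In fact, there is a third option: take an existing Ecore model and generate a skeleton AsyncAPI specification from it.
The generator creates a reusable JSON schema for each domain class. Channels are created from annotated EClasses. Host information can also be specified through EAnnotations.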
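As a rough illustration of the "one reusable schema per domain class" idea, the sketch below turns a class name and its attributes into a JSON Schema fragment. It deliberately avoids the EMF API so that it stays self-contained: the flat attribute map, the type mapping in `jsonType`, and the names `SchemaSkeletonSketch` and `toSchema` are assumptions for illustration and do not reflect how the actual generator reads EClasses or EAnnotations.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical sketch: domain class -> JSON Schema fragment, without EMF.
final class SchemaSkeletonSketch {

    // Assumed mapping from a few Java-like attribute types to JSON Schema types.
    static String jsonType(String attributeType) {
        switch (attributeType) {
            case "int":
            case "long":
                return "integer";
            case "double":
            case "float":
                return "number";
            case "boolean":
                return "boolean";
            default:
                return "string";
        }
    }

    // Builds one entry suitable for a "components" / "schemas" section.
    static String toSchema(String className, Map<String, String> attributes) {
        StringJoiner props = new StringJoiner(", ");
        for (Map.Entry<String, String> a : attributes.entrySet()) {
            props.add("\"" + a.getKey() + "\": { \"type\": \"" + jsonType(a.getValue()) + "\" }");
        }
        return "\"" + className + "\": { \"type\": \"object\", \"properties\": { " + props + " } }";
    }

    public static void main(String[] args) {
        Map<String, String> attrs = new LinkedHashMap<>();
        attrs.put("value", "double");
        attrs.put("unit", "String");
        System.out.println(toSchema("Reading", attrs));
    }
}
```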
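Despite its limitations, obtaining a JSON-based representation of an Ecore model has several advantages:
- it lets developers and architects create a usable AsyncAPI definition without in-depth knowledge of the specification, while keeping the modeling environment simple and manageable;
- it lets people unfamiliar with modeling still comply with the AsyncAPI specification;
- it lets experienced developers and architects refine and complete architectural details that cannot be easily captured in Ecore.
To integrate the data model into the proposed development workflow, an Ecore-to-AsyncAPI model-to-model (M2M) transformation and an AsyncAPI-to-JSON M2T transformation were defined.
Example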
Resources #
- tutorial: https://modeling-languages.com/asyncapi-modeling-editor-code-generator/
- A model-based approach for developing event-driven architectures with AsyncAPI
- Model-driven development of asynchronous message-driven architectures with AsyncAPI
Grammar #
grammar io.github.abelgomez.asyncapi.AsyncApi hidden(WS)
generate asyncApi "http://io.github.abelgomez/asyncapi/AsyncApi"
import "http://www.eclipse.org/emf/2002/Ecore" as ecore
AsyncAPI:
{AsyncAPI} '{' (
( '"asyncapi"' ':' version=VersionNumber ','? )
& ( '"info"' ':' info=Info ','? )
& ( '"servers"' ':' '{' servers+=Server (',' servers+=Server)* '}' ','? )?
& ( '"channels"' ':' '{' channels+=Channel (',' channels+=Channel)* '}' ','? )?
& ( '"components"' ':' components=Components ','? )?
& ( '"x-sla"' ':' sla=Sla ','? )?
// & ( GenericJsonTuple ','? )*
) '}';
Info:
{Info} '{' (
( '"title"' ':' title=AnyString ','? )
& ( '"version"' ':' version=AnyString ','? )
& ( '"description"' ':' description=AnyString ','? )?
& ( '"termsOfService"' ':' termsOfService=AnyString ','? )?
& ( '"contact"' ':' contact=Contact ','? )?
& ( '"license"' ':' license=License ','? )?
& ( '"x-basePackage"' ':' basePackage=AnyString ','? )?
// & ( GenericJsonTuple ','? )*
) '}';
Contact:
{Contact} '{' (
( '"name"' ':' name=AnyString ','? )?
& ( '"url"' ':' url=AnyString ','? )?
& ( '"email"' ':' email=AnyString ','? )?
// & ( GenericJsonTuple ','? )*
) '}';
License:
{License} '{' (
( '"name"' ':' name=AnyString ','? )?
& ( '"url"' ':' url=AnyString ','? )?
// & ( GenericJsonTuple ','? )*
) '}';
Server:
{Server} name=AnyString ':' '{' (
( '"url"' ':' url=AnyString ','? )
& ( '"protocol"' ':' protocol=Protocol ','? )
& ( '"description"' ':' description=AnyString ','? )?
& ( '"variables"' ':' '{' variables+=Variable (',' variables+=Variable)* '}' ','? )?
& ( '"x-isMonitored"' ':' isMonitored=Boolean ','? )?
// & ( GenericJsonTuple ','? )*
) '}';
Variable:
{Variable} name=AnyString ':' '{' (
( '"description"' ':' description=AnyString ','? )?
& ( '"default"' ':' default=AnyString ','? )?
& ( '"enum"' ':' '[' ^enum+=AnyString (',' ^enum+=AnyString)* ']' ','? )?
// & ( GenericJsonTuple ','? )*
) '}';
Channel:
{Channel} name=AnyString ':' '{' (
( '"description"' ':' description=AnyString ','? )?
& ( '"publish"' ':' publish=Operation ','? )?
& ( '"subscribe"' ':' subscribe=Operation ','? )?
& ( '"parameters"' ':' '{' parameters+=NamedParameter (',' parameters+=NamedParameter)* '}' ','? )?
& ( '"x-title"' ':' title=AnyString ','? )?
// & ( GenericJsonTuple ','? )*
) '}';
Operation:
{Operation} '{' (
( '"operationId"' ':' operationId=AnyString ','? )?
& ( '"summary"' ':' summary=AnyString ','? )?
& ( '"description"' ':' description=AnyString ','? )?
& ( '"message"' ':' message=AbstractMessage ','? )?
& ( '"traits"' ':' '[' traits+=AbstractOperationTrait ( ',' traits+=AbstractOperationTrait )* ']' ','? )?
// & ( GenericJsonTuple ','? )*
) '}';
AbstractMessage:
Reference | Message;
Message:
{Message} '{' (
( '"name"' ':' name=AnyString ','? )?
& ( '"title"' ':' title=AnyString ','? )?
& ( '"summary"' ':' summary=AnyString ','? )?
& ( '"description"' ':' description=AnyString ','? )?
& ( '"deprecated"' ':' deprecated=Boolean ','? )?
& ( '"headers"' ':' headers=AbstractSchema ','? )?
& ( '"tags"' ':' '[' tags+=Tag ( ',' tags+=Tag )* ']' ','? )?
& ( '"payload"' ':' payload=AbstractSchema ','? )?
& ( '"traits"' ':' '[' traits+=AbstractMessageTrait ( ',' traits+=AbstractMessageTrait )* ']' ','? )?
& ( '"x-identifier"' ':' identifier=MessageIdentifier )?
// & ( GenericJsonTupleButRef ','? )*
) '}';
NamedMessage:
{NamedMessage} name=AnyString ':' message=AbstractMessage;
Tag:
{Tag} '{' (
('"name"' ':' name=AnyString ','?)?
& ('"description"' ':' description=AnyString ','?)?
// & ( GenericJsonTuple ','? )*
) '}';
AbstractSchema:
Reference | Schema;
Schema:
{Schema} '{' (
( '"title"' ':' title=AnyString ','? )?
& ( '"type"' ':' type=JsonType ','? )?
& ( '"description"' ':' description=AnyString ','? )?
& ( '"format"' ':' format=AnyString ','? )?
& ( '"minimum"' ':' minimum=INT ','? )?
& ( '"maximum"' ':' maximum=INT ','? )?
& ( '"minItems"' ':' minItems=INT ','? )?
& ( '"maxItems"' ':' maxItems=INT ','? )?
& ( '"default"' ':' default=PrimitiveValue','? )?
& ( '"properties"' ':' '{' properties+=NamedSchema (',' properties+=NamedSchema)* '}' ','? )?
& ( '"enum"' ':' '[' ^enum+=PrimitiveValue (',' ^enum+=PrimitiveValue)* ']' ','? )?
& ( '"items"' ':' items=AbstractSchema ','? )?
& ( '"required"' ':' '[' required+=AnyString (',' required+=AnyString)* ']' ','? )?
// & ( GenericJsonTupleButRef ','? )*
) '}';
NamedSchema:
{NamedSchema} name=AnyString ':' schema=AbstractSchema;
AbstractParameter:
Reference | Parameter;
Parameter:
{Parameter} '{' (
( '"description"' ':' description=AnyString ','? )?
& ( '"schema"' ':' schema=AbstractSchema ','? )?
& ( '"location"' ':' location=AnyString ','? )?
// & ( GenericJsonTupleButRef ','? )*
) '}';
NamedParameter:
{NamedParameter} name=AnyString ':' parameter=AbstractParameter;
AbstractOperationTrait:
Reference | OperationTrait;
OperationTrait:
{OperationTrait} '{' (
( '"operationId"' ':' operationId=AnyString ','? )?
& ( '"summary"' ':' summary=AnyString ','? )?
& ( '"description"' ':' description=AnyString ','? )?
// & ( GenericJsonTupleButRef ','? )*
) '}';
NamedOperationTrait:
{NamedOperationTrait} name=AnyString ':' operationTrait=AbstractOperationTrait;
AbstractMessageTrait:
Reference | MessageTrait;
MessageTrait:
{MessageTrait} '{' (
( '"summary"' ':' summary=AnyString ','? )?
& ( '"description"' ':' description=AnyString ','? )?
& ( '"deprecated"' ':' deprecated=Boolean ','? )?
& ( '"headers"' ':' headers=AbstractSchema ','? )?
& ( '"tags"' ':' '[' tags+=Tag ( ',' tags+=Tag )* ']' ','? )?
// & ( GenericJsonTupleButRef ','? )*
) '}';
NamedMessageTrait:
{NamedMessageTrait} name=AnyString ':' messageTrait=AbstractMessageTrait;
Components:
{Components} '{' (
( '"schemas"' ':' '{' schemas+=NamedSchema (',' schemas+=NamedSchema)* '}' ','? )?
& ( '"messages"' ':' '{' messages+=NamedMessage (',' messages+=NamedMessage)* '}' ','? )?
& ( '"parameters"' ':' '{' parameters+=NamedParameter (',' parameters+=NamedParameter)* '}' ','? )?
& ( '"operationTraits"' ':' '{' operationTraits+=NamedOperationTrait (',' operationTraits+=NamedOperationTrait)* '}' ','? )?
& ( '"messageTraits"' ':' '{' messageTraits+=NamedMessageTrait (',' messageTraits+=NamedMessageTrait)* '}' ','? )?
& ( '"x-qosMetrics"' ':' '[' qosMetrics+=QoSMetric (',' qosMetrics+=QoSMetric)* ']' ','? )?
// & ( GenericJsonTupleButRef ','? )*
) '}';
Sla:
{Sla} '{' (
( '"guaranteeTerm"' ':' guaranteeTerm+=GuaranteeTerm (',' guaranteeTerm+=GuaranteeTerm)* )
) '}';
GuaranteeTerm:
{GuaranteeTerm} '{' (
( '"scopes"' ':' '{' scopes+=Scope (',' scopes+=Scope)* '}' ',' )
( '"qualifyingConditions"' ':' '{' qualifyingConditions+=QualifyingCondition (',' qualifyingConditions+=QualifyingCondition)* '}'',')?
( '"slos"' ':' '{' slos+=Slo (',' slos+=Slo)* '}')
) '}';
Scope:
{Scope}(
name=AnyString ':' reference= [Channel|AnyString]
);
QualifyingCondition:
{QualifyingCondition} name=AnyString ':' condition=BooleanExpression
;
Slo:
{Slo} name=AnyString ':' condition=BooleanExpression
;
AbstractQoSMetric:
QoSMetricReference | QoSMetric;
QoSMetricReference:
metric= [QoSMetric|AnyString]
;
QoSMetric:
('{'
(
( '"name"' ':' name=AnyString ','? )
& ( '"metricType"' ':' metricType=QoSMetricType ','? )
& ( '"description"' ':' description=AnyString ','? )?
& ( '"unit"' ':' unit=QoSMetricUnit ','? )
& ( '"groupedByEvent"' ':' groupedByEvent=Boolean ','? )
) (DerivedQoSMetric)? // This goes at the very end because Xtext complains that an unassigned rule call cannot appear inside an unordered group.
'}');
DerivedQoSMetric:
{DerivedQoSMetric}(
'"derivedQoSMetricDefinition"' ':' '{' (
('"window"' ':' window = AnyString ','? )
& ('"windowUnit"' ':' windowUnit = WindowUnit ','? )
& ('"aggregationFunction"' ':' aggregationFunction = AggregationFunction ','? )
) '}' )
;
BooleanExpression:
AndExpression | OrExpression | ComparisonExpression;
AndExpression:
{AndExpression} '{' '"AND"' ':' '[' conditions+=BooleanExpression (',' conditions+=BooleanExpression)* ']' '}' ;
OrExpression:
{OrExpression} '{' '"OR"' ':' '[' conditions+=BooleanExpression (',' conditions+=BooleanExpression)* ']' '}';
ComparisonExpression:
{ComparisonExpression}
'{'
'"qosMetric"' ':' qosMetric = AbstractQoSMetric ','
'"operator"' ':' operator = Operator ','
'"value"' ':' value = AnyString
'}'
;
Reference:
{Reference} '{' '"$ref"' ':' uri=AnyString '}';
//GenericJsonExpression:
// PrimitiveValue
// | GenericJsonObject
// | GenericJsonArray;
//
//GenericJsonObject:
// '{' '}' | '{' GenericJsonTuple (',' GenericJsonTuple)* '}';
//
//GenericJsonArray:
// '[' ']' | '[' GenericJsonExpression (',' GenericJsonExpression)* ']';
//
//GenericJsonTuple: AnyString ':' GenericJsonExpression;
//
//GenericJsonTupleButRef: AnyStringButRef ':' GenericJsonExpression;
enum WindowUnit:
seconds = '"seconds"'
| minutes = '"minutes"'
| hours = '"hours"'
| days = '"days"'
| messages = '"messages"'
;
enum AggregationFunction:
AVG = '"AVG"'
| MEDIAN = '"MEDIAN"'
| MAX = '"MAX"'
| MIN = '"MIN"'
;
enum QoSMetricType:
availability = '"availability"'
| bandwith = '"bandwith"'
| cpu = '"cpu"'
| capacity = '"capacity"'
| disaster = '"disaster"' | resiliance = '"resiliance"'
| discoverability = '"discoverability"'
| documentation = '"documentation"'
| exception_handling = '"exception_handling"'
| expected_failures = '"expected_failures"'
| failover = '"failover"'
| jitter = '"jitter"'
| latency = '"latency"'
| load_balancing = '"load_balancing"'
| maximum_throughput = '"maximum_throughput"'
| memory_aapacity = '"memory_aapacity"'
| packet_loss = '"packet_loss"'
| precision = '"precision"'
| probability_of_correctness = '"probability_of_correctness"'
| round_trip_time = '"round_trip_time"'
| throughput = '"throughput"'
| time_to_tail = '"time_to_tail"'
| time_to_tepair = '"time_to_tepair"'
| type_consistency = '"type_consistency"'
| uptime = '"uptime"'
| up_to_dateness = '"up-to-dateness"'
;
enum QoSMetricUnit:
milliseconds = '"milliseconds"'
| seconds = '"seconds"'
| minutes = '"minutes"'
| hours = '"hours"'
| null = '"null"'
;
enum Operator:
greater = '">"'
| greater_equal = '">="'
| equal = '"="'
| less_equal = '"<="'
| less = '"<"'
;
enum JsonType:
string = '"string"'
| number = '"number"'
| integer = '"integer"'
| boolean = '"boolean"'
| object = '"object"'
| array = '"array"'
| any = '"any"'
| null = '"null"';
enum Boolean:
_false = "false"
| _true = "true";
enum VersionNumber:
_200 = '"2.0.0"';
enum MessageIdentifier:
none ='"none"'
| generated = '"generated"'
| md5 = '"md5"'
| sha256 = '"sha-256"';
enum Protocol:
amqp = '"amqp"'
| amqps = '"amqps"'
| http = '"http"'
| https = '"https"'
| jms = '"jms"'
| kafka = '"kafka"'
| kafka_secure = '"kafka-secure"'
| mqtt = '"mqtt"'
| secure_mqtt = '"secure-mqtt"'
| ws = '"ws"'
| wss = '"wss"'
| stomp = '"stomp"'
| stomps = '"stomps"';
PrimitiveValue:
AnyString
| "true"
| "false"
| INT;
AnyStringButRef:
STRING
| Keyword;
AnyString:
STRING
| '"$ref"'
| Keyword;
terminal ID:
'^'?('a'..'z'|'A'..'Z'|'_') ('a'..'z'|'A'..'Z'|'_'|'0'..'9')*;
terminal INT returns ecore::EInt:
('0'..'9')+;
terminal STRING:
'"' ( '\\' . | !('\\'|'"') )* '"'
| "'" ( '\\' . | !('\\'|"'") )* "'";
terminal WS:
(' '|'\t'|'\r'|'\n')+;
Keyword:
'"2.0.0"'
| '"<"'
| '"<="'
| '"="'
| '">"'
| '">="'
| '"AND"'
| '"AVG"'
| '"MAX"'
| '"MEDIAN"'
| '"MIN"'
| '"OR"'
| '"aggregationFunction"'
| '"amqp"'
| '"amqps"'
| '"any"'
| '"array"'
| '"asyncapi"'
| '"availability"'
| '"bandwith"'
| '"boolean"'
| '"capacity"'
| '"channels"'
| '"components"'
| '"contact"'
| '"cpu"'
| '"dataType"'
| '"days"'
| '"default"'
| '"deprecated"'
| '"derivedQoSMetricDefinition"'
| '"description"'
| '"disaster"'
| '"discoverability"'
| '"documentation"'
| '"email"'
| '"enum"'
| '"exception_handling"'
| '"expected_failures"'
| '"failover"'
| '"format"'
| '"groupedByEvent"'
| '"guaranteeTerm"'
| '"headers"'
| '"hours"'
| '"http"'
| '"https"'
| '"info"'
| '"integer"'
| '"items"'
| '"jitter"'
| '"jms"'
| '"kafka"'
| '"kafka-secure"'
| '"latency"'
| '"license"'
| '"load_balancing"'
| '"location"'
| '"maxItems"'
| '"maximum"'
| '"maximum_throughput"'
| '"memory_aapacity"'
| '"message"'
| '"messageTraits"'
| '"messages"'
| '"metricType"'
| '"milliseconds"'
| '"minItems"'
| '"minimum"'
| '"minutes"'
| '"mqtt"'
| '"mqtts"'
| '"name"'
| '"null"'
| '"number"'
| '"object"'
| '"operationId"'
| '"operationTraits"'
| '"operator"'
| '"packet_loss"'
| '"parameters"'
| '"payload"'
| '"precision"'
| '"probability_of_correctness"'
| '"properties"'
| '"protocol"'
| '"publish"'
| '"qosMetric"'
| '"qualifyingConditions"'
| '"required"'
| '"resiliance"'
| '"round_trip_time"'
| '"schema"'
| '"schemas"'
| '"scopes"'
| '"seconds"'
| '"secure-mqtt"'
| '"servers"'
| '"slos"'
| '"stomp"'
| '"stomps"'
| '"string"'
| '"subscribe"'
| '"summary"'
| '"tags"'
| '"termsOfService"'
| '"throughput"'
| '"time_to_tail"'
| '"time_to_tepair"'
| '"title"'
| '"traits"'
| '"type"'
| '"type_consistency"'
| '"unit"'
| '"up-to-dateness"'
| '"uptime"'
| '"url"'
| '"value"'
| '"variables"'
| '"version"'
| '"window"'
| '"windowUnit"'
| '"ws"'
| '"wss"'
| '"x-basePackage"'
| '"x-qosMetrics"'
| '"x-sla"'
| '"x-title"';
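The `BooleanExpression`, `AndExpression`, `OrExpression` and `ComparisonExpression` rules above describe a small expression tree for SLA conditions. The following sketch shows one way such a tree could be evaluated against measured metric values. It is an illustration only: the class `SloSketch`, its factory methods, and the choice to treat the compared value as a number (the grammar stores it as a string) are assumptions, and the toolkit's own runtime handling of SLAs is not shown in this article.

```java
import java.util.List;
import java.util.Map;

// Hypothetical evaluator for the AND / OR / comparison structure of the grammar.
final class SloSketch {

    interface Expr {
        boolean eval(Map<String, Double> measured);
    }

    // Mirrors ComparisonExpression: a metric name, an operator and a value.
    static Expr comparison(String metric, String operator, double value) {
        return measured -> {
            Double actual = measured.get(metric);
            if (actual == null) {
                throw new IllegalArgumentException("No measurement for metric " + metric);
            }
            switch (operator) { // same operator literals as the Operator enum
                case ">":  return actual > value;
                case ">=": return actual >= value;
                case "=":  return actual.doubleValue() == value;
                case "<=": return actual <= value;
                case "<":  return actual < value;
                default:   throw new IllegalArgumentException("Unknown operator " + operator);
            }
        };
    }

    // Mirrors AndExpression: true only if every nested condition holds.
    static Expr and(List<Expr> conditions) {
        return measured -> conditions.stream().allMatch(c -> c.eval(measured));
    }

    // Mirrors OrExpression: true if at least one nested condition holds.
    static Expr or(List<Expr> conditions) {
        return measured -> conditions.stream().anyMatch(c -> c.eval(measured));
    }

    public static void main(String[] args) {
        Expr slo = and(List.of(
                comparison("latency", "<", 200),
                comparison("uptime", ">=", 99.9)));
        System.out.println(slo.eval(Map.of("latency", 120.0, "uptime", 99.95)));
    }
}
```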