Micronaut HTTP 服務器支持使用事件 API 發(fā)出服務器發(fā)送事件 (SSE)。
要從服務器發(fā)出事件,請返回一個發(fā)出事件類型對象的 Reactive Streams Publisher。
發(fā)布者本身可以通過事件系統(tǒng)等從后臺任務發(fā)布事件。
想象一個新聞標題的事件流;你可以定義一個數(shù)據(jù)類如下:
Headline
Java |
Groovy |
Kotlin |
public class Headline {
private String title;
private String description;
public Headline() {}
public Headline(String title, String description) {
this.title = title;
this.description = description;
}
public String getTitle() {
return title;
}
public String getDescription() {
return description;
}
public void setTitle(String title) {
this.title = title;
}
public void setDescription(String description) {
this.description = description;
}
}
|
class Headline {
String title
String description
Headline() {}
Headline(String title, String description) {
this.title = title;
this.description = description;
}
}
|
class Headline {
var title: String? = null
var description: String? = null
constructor()
constructor(title: String, description: String) {
this.title = title
this.description = description
}
}
|
要發(fā)出新聞標題事件,請使用您喜歡的任何 Reactive 庫編寫一個控制器,該控制器返回一個事件發(fā)布者實例。下面的示例通過 generate 方法使用 Project Reactor 的 Flux:
從控制器發(fā)布服務器發(fā)送的事件
Java |
Groovy |
Kotlin |
import io.micronaut.http.MediaType;
import io.micronaut.http.annotation.Controller;
import io.micronaut.http.annotation.Get;
import io.micronaut.http.sse.Event;
import io.micronaut.scheduling.TaskExecutors;
import io.micronaut.scheduling.annotation.ExecuteOn;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
@Controller("/headlines")
public class HeadlineController {
@ExecuteOn(TaskExecutors.IO)
@Get(produces = MediaType.TEXT_EVENT_STREAM)
public Publisher<Event<Headline>> index() { // (1)
String[] versions = {"1.0", "2.0"}; // (2)
return Flux.generate(() -> 0, (i, emitter) -> { // (3)
if (i < versions.length) {
emitter.next( // (4)
Event.of(new Headline("Micronaut " + versions[i] + " Released", "Come and get it"))
);
} else {
emitter.complete(); // (5)
}
return ++i;
});
}
}
|
import io.micronaut.http.MediaType
import io.micronaut.http.annotation.Controller
import io.micronaut.http.annotation.Get
import io.micronaut.http.sse.Event
import io.micronaut.scheduling.TaskExecutors
import io.micronaut.scheduling.annotation.ExecuteOn
import org.reactivestreams.Publisher
import reactor.core.publisher.Flux
@Controller("/headlines")
class HeadlineController {
@ExecuteOn(TaskExecutors.IO)
@Get(produces = MediaType.TEXT_EVENT_STREAM)
Publisher<Event<Headline>> index() { // (1)
String[] versions = ["1.0", "2.0"] // (2)
Flux.generate(() -> 0, (i, emitter) -> {
if (i < versions.length) {
emitter.next( // (4)
Event.of(new Headline("Micronaut ${versions[i]} Released", "Come and get it"))
)
} else {
emitter.complete() // (5)
}
return i + 1
})
}
}
|
import io.micronaut.http.MediaType
import io.micronaut.http.annotation.Controller
import io.micronaut.http.annotation.Get
import io.micronaut.http.sse.Event
import io.micronaut.scheduling.TaskExecutors
import io.micronaut.scheduling.annotation.ExecuteOn
import org.reactivestreams.Publisher
import reactor.core.publisher.Flux
import reactor.core.publisher.SynchronousSink
import java.util.concurrent.Callable
import java.util.function.BiFunction
@Controller("/headlines")
class HeadlineController {
@ExecuteOn(TaskExecutors.IO)
@Get(produces = [MediaType.TEXT_EVENT_STREAM])
fun index(): Publisher<Event<Headline>> { // (1)
val versions = arrayOf("1.0", "2.0") // (2)
return Flux.generate(
{ 0 },
BiFunction { i: Int, emitter: SynchronousSink<Event<Headline>> -> // (3)
if (i < versions.size) {
emitter.next( // (4)
Event.of(
Headline(
"Micronaut " + versions[i] + " Released", "Come and get it"
)
)
)
} else {
emitter.complete() // (5)
}
return@BiFunction i + 1
})
}
}
|
控制器方法返回事件的發(fā)布者
為每個版本的 Micronaut 發(fā)出一個標題
Flux 類型的 generate 方法生成一個 Publisher。 generate 方法接受初始值和接受該值的 lambda 以及發(fā)射器。請注意,此示例與控制器操作在同一線程上執(zhí)行,但您可以使用 subscribeOn 或映射現(xiàn)有的“熱”Flux。
Emitter 接口 onNext 方法發(fā)出 Event 類型的對象。 Event.of(ET) 工廠方法構造事件。
Emitter 接口的 onComplete 方法指示何時完成發(fā)送服務器發(fā)送的事件。
您通常希望在單獨的執(zhí)行程序上安排 SSE 事件流。前面的示例使用@ExecuteOn 在 I/O 執(zhí)行器上執(zhí)行流。
上面的示例發(fā)回一個文本/事件流類型的響應,對于每個發(fā)出的事件,之前的標題類型將被轉換為 JSON,從而產生如下響應:
服務器發(fā)送的事件響應輸出
data: {"title":"Micronaut 1.0 Released","description":"Come and get it"}
data: {"title":"Micronaut 2.0 Released","description":"Come and get it"}
可以使用Event接口的方法自定義發(fā)回的Server Sent Event數(shù)據(jù),包括關聯(lián)事件id、注釋、重試超時時間等。
更多建議: