Micronaut 服務器發(fā)送事件

2023-03-07 14:38 更新

Micronaut HTTP 服務器支持使用事件 API 發(fā)出服務器發(fā)送事件 (SSE)。

要從服務器發(fā)出事件,請返回一個發(fā)出事件類型對象的 Reactive Streams Publisher。

發(fā)布者本身可以通過事件系統(tǒng)等從后臺任務發(fā)布事件。

想象一個新聞標題的事件流;你可以定義一個數(shù)據(jù)類如下:

Headline

 Java Groovy  Kotlin 
public class Headline {

    private String title;
    private String description;

    public Headline() {}

    public Headline(String title, String description) {
        this.title = title;
        this.description = description;
    }

    public String getTitle() {
        return title;
    }

    public String getDescription() {
        return description;
    }

    public void setTitle(String title) {
        this.title = title;
    }

    public void setDescription(String description) {
        this.description = description;
    }
}
class Headline {

    String title
    String description

    Headline() {}

    Headline(String title, String description) {
        this.title = title;
        this.description = description;
    }
}
class Headline {

    var title: String? = null
    var description: String? = null

    constructor()

    constructor(title: String, description: String) {
        this.title = title
        this.description = description
    }
}

要發(fā)出新聞標題事件,請使用您喜歡的任何 Reactive 庫編寫一個控制器,該控制器返回一個事件發(fā)布者實例。下面的示例通過 generate 方法使用 Project Reactor 的 Flux:

從控制器發(fā)布服務器發(fā)送的事件

 Java Groovy  Kotlin 
import io.micronaut.http.MediaType;
import io.micronaut.http.annotation.Controller;
import io.micronaut.http.annotation.Get;
import io.micronaut.http.sse.Event;
import io.micronaut.scheduling.TaskExecutors;
import io.micronaut.scheduling.annotation.ExecuteOn;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;

@Controller("/headlines")
public class HeadlineController {

    @ExecuteOn(TaskExecutors.IO)
    @Get(produces = MediaType.TEXT_EVENT_STREAM)
    public Publisher<Event<Headline>> index() { // (1)
        String[] versions = {"1.0", "2.0"}; // (2)
        return Flux.generate(() -> 0, (i, emitter) -> { // (3)
            if (i < versions.length) {
                emitter.next( // (4)
                    Event.of(new Headline("Micronaut " + versions[i] + " Released", "Come and get it"))
                );
            } else {
                emitter.complete(); // (5)
            }
            return ++i;
        });
    }
}
import io.micronaut.http.MediaType
import io.micronaut.http.annotation.Controller
import io.micronaut.http.annotation.Get
import io.micronaut.http.sse.Event
import io.micronaut.scheduling.TaskExecutors
import io.micronaut.scheduling.annotation.ExecuteOn
import org.reactivestreams.Publisher
import reactor.core.publisher.Flux

@Controller("/headlines")
class HeadlineController {

    @ExecuteOn(TaskExecutors.IO)
    @Get(produces = MediaType.TEXT_EVENT_STREAM)
    Publisher<Event<Headline>> index() { // (1)
        String[] versions = ["1.0", "2.0"] // (2)
        Flux.generate(() -> 0, (i, emitter) -> {
            if (i < versions.length) {
                emitter.next( // (4)
                        Event.of(new Headline("Micronaut ${versions[i]} Released", "Come and get it"))
                )
            } else {
                emitter.complete() // (5)
            }
            return i + 1
        })
    }
}
import io.micronaut.http.MediaType
import io.micronaut.http.annotation.Controller
import io.micronaut.http.annotation.Get
import io.micronaut.http.sse.Event
import io.micronaut.scheduling.TaskExecutors
import io.micronaut.scheduling.annotation.ExecuteOn
import org.reactivestreams.Publisher
import reactor.core.publisher.Flux
import reactor.core.publisher.SynchronousSink
import java.util.concurrent.Callable
import java.util.function.BiFunction


@Controller("/headlines")
class HeadlineController {

    @ExecuteOn(TaskExecutors.IO)
    @Get(produces = [MediaType.TEXT_EVENT_STREAM])
    fun index(): Publisher<Event<Headline>> { // (1)
        val versions = arrayOf("1.0", "2.0") // (2)
        return Flux.generate(
            { 0 },
            BiFunction { i: Int, emitter: SynchronousSink<Event<Headline>> ->  // (3)
                if (i < versions.size) {
                    emitter.next( // (4)
                        Event.of(
                            Headline(
                                "Micronaut " + versions[i] + " Released", "Come and get it"
                            )
                        )
                    )
                } else {
                    emitter.complete() // (5)
                }
                return@BiFunction i + 1
            })
    }
}
  1. 控制器方法返回事件的發(fā)布者

  2. 為每個版本的 Micronaut 發(fā)出一個標題

  3. Flux 類型的 generate 方法生成一個 Publisher。 generate 方法接受初始值和接受該值的 lambda 以及發(fā)射器。請注意,此示例與控制器操作在同一線程上執(zhí)行,但您可以使用 subscribeOn 或映射現(xiàn)有的“熱”Flux。

  4. Emitter 接口 onNext 方法發(fā)出 Event 類型的對象。 Event.of(ET) 工廠方法構造事件。

  5. Emitter 接口的 onComplete 方法指示何時完成發(fā)送服務器發(fā)送的事件。

您通常希望在單獨的執(zhí)行程序上安排 SSE 事件流。前面的示例使用@ExecuteOn 在 I/O 執(zhí)行器上執(zhí)行流。

上面的示例發(fā)回一個文本/事件流類型的響應,對于每個發(fā)出的事件,之前的標題類型將被轉換為 JSON,從而產生如下響應:

服務器發(fā)送的事件響應輸出

 data: {"title":"Micronaut 1.0 Released","description":"Come and get it"}
 data: {"title":"Micronaut 2.0 Released","description":"Come and get it"}

可以使用Event接口的方法自定義發(fā)回的Server Sent Event數(shù)據(jù),包括關聯(lián)事件id、注釋、重試超時時間等。


以上內容是否對您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號
微信公眾號

編程獅公眾號