Micronaut 服務(wù)器發(fā)送事件

2023-03-07 14:38 更新

Micronaut HTTP 服務(wù)器支持使用事件 API 發(fā)出服務(wù)器發(fā)送事件 (SSE)。

要從服務(wù)器發(fā)出事件,請(qǐng)返回一個(gè)發(fā)出事件類型對(duì)象的 Reactive Streams Publisher。

發(fā)布者本身可以通過事件系統(tǒng)等從后臺(tái)任務(wù)發(fā)布事件。

想象一個(gè)新聞標(biāo)題的事件流;你可以定義一個(gè)數(shù)據(jù)類如下:

Headline

 Java Groovy  Kotlin 
public class Headline {

    private String title;
    private String description;

    public Headline() {}

    public Headline(String title, String description) {
        this.title = title;
        this.description = description;
    }

    public String getTitle() {
        return title;
    }

    public String getDescription() {
        return description;
    }

    public void setTitle(String title) {
        this.title = title;
    }

    public void setDescription(String description) {
        this.description = description;
    }
}
class Headline {

    String title
    String description

    Headline() {}

    Headline(String title, String description) {
        this.title = title;
        this.description = description;
    }
}
class Headline {

    var title: String? = null
    var description: String? = null

    constructor()

    constructor(title: String, description: String) {
        this.title = title
        this.description = description
    }
}

要發(fā)出新聞標(biāo)題事件,請(qǐng)使用您喜歡的任何 Reactive 庫編寫一個(gè)控制器,該控制器返回一個(gè)事件發(fā)布者實(shí)例。下面的示例通過 generate 方法使用 Project Reactor 的 Flux:

從控制器發(fā)布服務(wù)器發(fā)送的事件

 Java Groovy  Kotlin 
import io.micronaut.http.MediaType;
import io.micronaut.http.annotation.Controller;
import io.micronaut.http.annotation.Get;
import io.micronaut.http.sse.Event;
import io.micronaut.scheduling.TaskExecutors;
import io.micronaut.scheduling.annotation.ExecuteOn;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;

@Controller("/headlines")
public class HeadlineController {

    @ExecuteOn(TaskExecutors.IO)
    @Get(produces = MediaType.TEXT_EVENT_STREAM)
    public Publisher<Event<Headline>> index() { // (1)
        String[] versions = {"1.0", "2.0"}; // (2)
        return Flux.generate(() -> 0, (i, emitter) -> { // (3)
            if (i < versions.length) {
                emitter.next( // (4)
                    Event.of(new Headline("Micronaut " + versions[i] + " Released", "Come and get it"))
                );
            } else {
                emitter.complete(); // (5)
            }
            return ++i;
        });
    }
}
import io.micronaut.http.MediaType
import io.micronaut.http.annotation.Controller
import io.micronaut.http.annotation.Get
import io.micronaut.http.sse.Event
import io.micronaut.scheduling.TaskExecutors
import io.micronaut.scheduling.annotation.ExecuteOn
import org.reactivestreams.Publisher
import reactor.core.publisher.Flux

@Controller("/headlines")
class HeadlineController {

    @ExecuteOn(TaskExecutors.IO)
    @Get(produces = MediaType.TEXT_EVENT_STREAM)
    Publisher<Event<Headline>> index() { // (1)
        String[] versions = ["1.0", "2.0"] // (2)
        Flux.generate(() -> 0, (i, emitter) -> {
            if (i < versions.length) {
                emitter.next( // (4)
                        Event.of(new Headline("Micronaut ${versions[i]} Released", "Come and get it"))
                )
            } else {
                emitter.complete() // (5)
            }
            return i + 1
        })
    }
}
import io.micronaut.http.MediaType
import io.micronaut.http.annotation.Controller
import io.micronaut.http.annotation.Get
import io.micronaut.http.sse.Event
import io.micronaut.scheduling.TaskExecutors
import io.micronaut.scheduling.annotation.ExecuteOn
import org.reactivestreams.Publisher
import reactor.core.publisher.Flux
import reactor.core.publisher.SynchronousSink
import java.util.concurrent.Callable
import java.util.function.BiFunction


@Controller("/headlines")
class HeadlineController {

    @ExecuteOn(TaskExecutors.IO)
    @Get(produces = [MediaType.TEXT_EVENT_STREAM])
    fun index(): Publisher<Event<Headline>> { // (1)
        val versions = arrayOf("1.0", "2.0") // (2)
        return Flux.generate(
            { 0 },
            BiFunction { i: Int, emitter: SynchronousSink<Event<Headline>> ->  // (3)
                if (i < versions.size) {
                    emitter.next( // (4)
                        Event.of(
                            Headline(
                                "Micronaut " + versions[i] + " Released", "Come and get it"
                            )
                        )
                    )
                } else {
                    emitter.complete() // (5)
                }
                return@BiFunction i + 1
            })
    }
}
  1. 控制器方法返回事件的發(fā)布者

  2. 為每個(gè)版本的 Micronaut 發(fā)出一個(gè)標(biāo)題

  3. Flux 類型的 generate 方法生成一個(gè) Publisher。 generate 方法接受初始值和接受該值的 lambda 以及發(fā)射器。請(qǐng)注意,此示例與控制器操作在同一線程上執(zhí)行,但您可以使用 subscribeOn 或映射現(xiàn)有的“熱”Flux。

  4. Emitter 接口 onNext 方法發(fā)出 Event 類型的對(duì)象。 Event.of(ET) 工廠方法構(gòu)造事件。

  5. Emitter 接口的 onComplete 方法指示何時(shí)完成發(fā)送服務(wù)器發(fā)送的事件。

您通常希望在單獨(dú)的執(zhí)行程序上安排 SSE 事件流。前面的示例使用@ExecuteOn 在 I/O 執(zhí)行器上執(zhí)行流。

上面的示例發(fā)回一個(gè)文本/事件流類型的響應(yīng),對(duì)于每個(gè)發(fā)出的事件,之前的標(biāo)題類型將被轉(zhuǎn)換為 JSON,從而產(chǎn)生如下響應(yīng):

服務(wù)器發(fā)送的事件響應(yīng)輸出

 data: {"title":"Micronaut 1.0 Released","description":"Come and get it"}
 data: {"title":"Micronaut 2.0 Released","description":"Come and get it"}

可以使用Event接口的方法自定義發(fā)回的Server Sent Event數(shù)據(jù),包括關(guān)聯(lián)事件id、注釋、重試超時(shí)時(shí)間等。


以上內(nèi)容是否對(duì)您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號(hào)
微信公眾號(hào)

編程獅公眾號(hào)