HttpClient 接口構成了低級 API 的基礎。此接口聲明有助于簡化執(zhí)行 HTTP 請求和接收響應的方法。
HttpClient 接口中的大多數(shù)方法都返回 Reactive Streams Publisher 實例,這并不總是最有用的接口。
Micronaut 的 Reactor HTTP Client 依賴項附帶一個名為 ReactorHttpClient 的子接口。它提供了返回 Project Reactor Flux 類型的 HttpClient 接口的變體。
發(fā)送您的第一個 HTTP 請求
獲取 HttpClient
有幾種方法可以獲取對 HttpClient 的引用。最常見的是使用 Client 注釋。例如:
注入 HTTP 客戶端
@Client("https://api.twitter.com/1.1") @Inject HttpClient httpClient;
上面的示例注入了一個以 Twitter API 為目標的客戶端。
@field:Client("\${myapp.api.twitter.url}") @Inject lateinit var httpClient: HttpClient
上面的 Kotlin 示例使用配置路徑注入了一個以 Twitter API 為目標的客戶端。請注意“\${path.to.config}”上所需的轉義(反斜杠),這是由于 Kotlin 字符串插值所必需的。
Client 注釋也是一個自定義范圍,用于管理 HttpClient 實例的創(chuàng)建并確保它們在應用程序關閉時停止。
您傳遞給 Client 注釋的值可以是以下之一:
另一種創(chuàng)建 HttpClient 的方法是使用 HttpClient 的靜態(tài)創(chuàng)建方法,但是不推薦使用這種方法,因為您必須確保手動關閉客戶端,當然創(chuàng)建的客戶端不會發(fā)生依賴注入。
執(zhí)行 HTTP GET
使用 HttpClient 時,通常有兩種感興趣的方法。第一個是 retrieve,它執(zhí)行一個 HTTP 請求并以您作為 Publisher 請求的任何類型(默認為 String)返回正文。
retrieve 方法接受一個 HttpRequest 或一個字符串 URI 到您希望請求的端點。
以下示例顯示如何使用 retrieve 執(zhí)行 HTTP GET 并將響應主體作為字符串接收:
使用檢索
Java |
Groovy |
Kotlin |
String uri = UriBuilder.of("/hello/{name}")
.expand(Collections.singletonMap("name", "John"))
.toString();
assertEquals("/hello/John", uri);
String result = client.toBlocking().retrieve(uri);
assertEquals("Hello John", result);
|
when:
String uri = UriBuilder.of("/hello/{name}")
.expand(name: "John")
then:
"/hello/John" == uri
when:
String result = client.toBlocking().retrieve(uri)
then:
"Hello John" == result
|
val uri = UriBuilder.of("/hello/{name}")
.expand(Collections.singletonMap("name", "John"))
.toString()
uri shouldBe "/hello/John"
val result = client.toBlocking().retrieve(uri)
result shouldBe "Hello John"
|
請注意,在此示例中,出于說明目的,我們調用 toBlocking() 以返回客戶端的阻塞版本。但是,在生產(chǎn)代碼中,您不應該這樣做,而應該依賴 Micronaut HTTP 服務器的非阻塞特性。
例如,以下 @Controller 方法以非阻塞方式調用另一個端點:
不阻塞地使用 HTTP 客戶端
Java |
Groovy |
Kotlin |
import io.micronaut.http.annotation.Body;
import io.micronaut.http.annotation.Controller;
import io.micronaut.http.annotation.Get;
import io.micronaut.http.annotation.Post;
import io.micronaut.http.annotation.Status;
import io.micronaut.http.client.HttpClient;
import io.micronaut.http.client.annotation.Client;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Mono;
import io.micronaut.core.async.annotation.SingleResult;
import static io.micronaut.http.HttpRequest.GET;
import static io.micronaut.http.HttpStatus.CREATED;
import static io.micronaut.http.MediaType.TEXT_PLAIN;
@Get("/hello/{name}")
@SingleResult
Publisher<String> hello(String name) { // (1)
return Mono.from(httpClient.retrieve(GET("/hello/" + name))); // (2)
}
|
import io.micronaut.http.annotation.Body
import io.micronaut.http.annotation.Controller
import io.micronaut.http.annotation.Get
import io.micronaut.http.annotation.Post
import io.micronaut.http.annotation.Status
import io.micronaut.http.client.HttpClient
import io.micronaut.http.client.annotation.Client
import org.reactivestreams.Publisher
import io.micronaut.core.async.annotation.SingleResult
import reactor.core.publisher.Mono
import static io.micronaut.http.HttpRequest.GET
import static io.micronaut.http.HttpStatus.CREATED
import static io.micronaut.http.MediaType.TEXT_PLAIN
@Get("/hello/{name}")
@SingleResult
Publisher<String> hello(String name) { // (1)
Mono.from(httpClient.retrieve( GET("/hello/" + name))) // (2)
}
|
import io.micronaut.http.HttpRequest.GET
import io.micronaut.http.HttpStatus.CREATED
import io.micronaut.http.MediaType.TEXT_PLAIN
import io.micronaut.http.annotation.Body
import io.micronaut.http.annotation.Controller
import io.micronaut.http.annotation.Get
import io.micronaut.http.annotation.Post
import io.micronaut.http.annotation.Status
import io.micronaut.http.client.HttpClient
import io.micronaut.http.client.annotation.Client
import org.reactivestreams.Publisher
import reactor.core.publisher.Flux
import io.micronaut.core.async.annotation.SingleResult
@Get("/hello/{name}")
@SingleResult
internal fun hello(name: String): Publisher<String> { // (1)
return Flux.from(httpClient.retrieve(GET<Any>("/hello/$name")))
.next() // (2)
}
|
hello 方法返回一個 Mono,它可能會或可能不會發(fā)出一個項目。如果未發(fā)出某個項目,則返回 404。
檢索方法被調用,它返回一個 Flux。這有一個 firstElement 方法返回第一個發(fā)出的項目或什么都不返回
使用 Reactor(如果您愿意,也可以使用 RxJava),您可以輕松高效地編寫多個 HTTP 客戶端調用,而不會阻塞(這會限制您的應用程序的吞吐量和可擴展性)。
調試/跟蹤 HTTP 客戶端
要調試從 HTTP 客戶端發(fā)送和接收的請求,您可以通過 logback.xml 文件啟用跟蹤日志記錄:
logback.xml
<logger name="io.micronaut.http.client" level="TRACE"/>
客戶端特定調試/跟蹤
要啟用特定于客戶端的日志記錄,您可以為所有 HTTP 客戶端配置默認記錄器。您還可以使用特定于客戶端的配置為不同的客戶端配置不同的記錄器。例如,在您的配置文件(例如 application.yml)中:
Properties |
Yaml |
Toml |
Groovy |
Hocon |
JSON |
micronaut.http.client.logger-name=mylogger
micronaut.http.services.otherClient.logger-name=other.client
|
micronaut:
http:
client:
logger-name: mylogger
services:
otherClient:
logger-name: other.client
|
[micronaut]
[micronaut.http]
[micronaut.http.client]
logger-name="mylogger"
[micronaut.http.services]
[micronaut.http.services.otherClient]
logger-name="other.client"
|
micronaut {
http {
client {
loggerName = "mylogger"
}
services {
otherClient {
loggerName = "other.client"
}
}
}
}
|
{
micronaut {
http {
client {
logger-name = "mylogger"
}
services {
otherClient {
logger-name = "other.client"
}
}
}
}
}
|
{
"micronaut": {
"http": {
"client": {
"logger-name": "mylogger"
},
"services": {
"otherClient": {
"logger-name": "other.client"
}
}
}
}
}
|
然后在 logback.xml 中啟用日志記錄:
logback.xml
<logger name="mylogger" level="DEBUG"/>
<logger name="other.client" level="TRACE"/>
自定義 HTTP 請求
前面的示例演示了使用 HttpRequest 接口的靜態(tài)方法來構造 MutableHttpRequest 實例。顧名思義,MutableHttpRequest 可以改變,包括添加標頭、自定義請求正文等的能力。例如:
傳遞一個 HttpRequest 來檢索
Java |
Groovy |
Kotlin |
Flux<String> response = Flux.from(client.retrieve(
GET("/hello/John")
.header("X-My-Header", "SomeValue")
));
|
Flux<String> response = Flux.from(client.retrieve(
GET("/hello/John")
.header("X-My-Header", "SomeValue")
))
|
val response = client.retrieve(
GET<Any>("/hello/John")
.header("X-My-Header", "SomeValue")
)
|
上面的示例在發(fā)送之前向響應添加一個標頭(X-My-Header)。 MutableHttpRequest 接口有更多方便的方法,可以很容易地以常用的方式修改請求。
讀取 JSON 響應
微服務通常使用 JSON 等消息編碼格式。 Micronaut 的 HTTP 客戶端利用 Jackson 進行 JSON 解析,因此 Jackson 可以解碼的任何類型都可以作為第二個參數(shù)傳遞給檢索。
例如,考慮以下返回 JSON 響應的 @Controller 方法:
從控制器返回 JSON
Java |
Groovy |
Kotlin |
@Get("/greet/{name}")
Message greet(String name) {
return new Message("Hello " + name);
}
|
@Get("/greet/{name}")
Message greet(String name) {
new Message("Hello $name")
}
|
@Get("/greet/{name}")
internal fun greet(name: String): Message {
return Message("Hello $name")
}
|
上面的方法返回一個 Message 類型的 POJO,如下所示:
Message POJO
Java |
Groovy |
Kotlin |
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
public class Message {
private final String text;
@JsonCreator
public Message(@JsonProperty("text") String text) {
this.text = text;
}
public String getText() {
return text;
}
}
|
import com.fasterxml.jackson.annotation.JsonCreator
import com.fasterxml.jackson.annotation.JsonProperty
class Message {
final String text
@JsonCreator
Message(@JsonProperty("text") String text) {
this.text = text
}
}
|
import com.fasterxml.jackson.annotation.JsonCreator
import com.fasterxml.jackson.annotation.JsonProperty
class Message @JsonCreator
constructor(@param:JsonProperty("text") val text: String)
|
Jackson注解用于映射構造函數(shù)
在客戶端,您可以調用此端點并使用 retrieve 方法將 JSON 解碼為映射,如下所示:
將響應主體解碼為 Map
Java |
Groovy |
Kotlin |
response = Flux.from(client.retrieve(
GET("/greet/John"),
Argument.of(Map.class, String.class, String.class) // (1)
));
|
response = Flux.from(client.retrieve(
GET("/greet/John"),
Argument.of(Map, String, String) // (1)
))
|
var response: Flux<Map<*, *>> = Flux.from(client.retrieve(
GET<Any>("/greet/John"), Map::class.java
))
|
Argument.of 方法返回一個 Map,其中鍵和值類型為 String
雖然檢索 JSON 作為映射可能是可取的,但通常您希望將對象解碼為 POJO。為此,請改為傳遞類型:
將響應主體解碼為 POJO
Java |
Groovy |
Kotlin |
Flux<Message> response = Flux.from(client.retrieve(
GET("/greet/John"), Message.class
));
assertEquals("Hello John", response.blockFirst().getText());
|
when:
Flux<Message> response = Flux.from(client.retrieve(
GET("/greet/John"), Message
))
then:
"Hello John" == response.blockFirst().getText()
|
val response = Flux.from(client.retrieve(
GET<Any>("/greet/John"), Message::class.java
))
response.blockFirst().text shouldBe "Hello John"
|
請注意如何在客戶端和服務器上使用相同的 Java 類型。這意味著您通常會定義一個公共 API 項目,在該項目中定義用于定義 API 的接口和類型。
解碼其他內容類型
如果您與之通信的服務器使用非 JSON 的自定義內容類型,默認情況下 Micronaut 的 HTTP 客戶端將不知道如何解碼這種類型。
要解決此問題,請將 MediaTypeCodec 注冊為一個 bean,它會被自動拾取并用于解碼(或編碼)消息。
接收完整的 HTTP 響應
有時僅接收響應主體是不夠的,您還需要響應中的其他信息,例如標頭、cookie 等。在這種情況下,不要使用 retrieve 方法,而是使用 exchange 方法:
接收完整的 HTTP 響應
Java |
Groovy |
Kotlin |
Flux<HttpResponse<Message>> call = Flux.from(client.exchange(
GET("/greet/John"), Message.class // (1)
));
HttpResponse<Message> response = call.blockFirst();
Optional<Message> message = response.getBody(Message.class); // (2)
// check the status
assertEquals(HttpStatus.OK, response.getStatus()); // (3)
// check the body
assertTrue(message.isPresent());
assertEquals("Hello John", message.get().getText());
|
when:
Flux<HttpResponse<Message>> call = Flux.from(client.exchange(
GET("/greet/John"), Message // (1)
))
HttpResponse<Message> response = call.blockFirst();
Optional<Message> message = response.getBody(Message) // (2)
// check the status
then:
HttpStatus.OK == response.getStatus() // (3)
// check the body
message.isPresent()
"Hello John" == message.get().getText()
|
val call = client.exchange(
GET<Any>("/greet/John"), Message::class.java // (1)
)
val response = Flux.from(call).blockFirst()
val message = response.getBody(Message::class.java) // (2)
// check the status
response.status shouldBe HttpStatus.OK // (3)
// check the body
message.isPresent shouldBe true
message.get().text shouldBe "Hello John"
|
交換方法接收 HttpResponse
使用響應的 getBody(..) 方法檢索正文
可以檢查響應的其他方面,例如 HttpStatus
上面的示例接收完整的 HttpResponse,您可以從中獲取標頭和其他有用信息。
發(fā)布請求正文
到目前為止,所有示例都使用了相同的 HTTP 方法,即 GET。 HttpRequest 接口具有適用于所有不同 HTTP 方法的工廠方法。下表總結了它們:
還存在一個創(chuàng)建方法來構造任何 HttpMethod 類型的請求。由于 POST、PUT 和 PATCH 方法需要主體,因此需要第二個參數(shù),即主體對象。
以下示例演示了如何發(fā)送簡單的 String 正文:
發(fā)送字符串正文
Java |
Groovy |
Kotlin |
Flux<HttpResponse<String>> call = Flux.from(client.exchange(
POST("/hello", "Hello John") // (1)
.contentType(MediaType.TEXT_PLAIN_TYPE)
.accept(MediaType.TEXT_PLAIN_TYPE), // (2)
String.class // (3)
));
|
Flux<HttpResponse<String>> call = Flux.from(client.exchange(
POST("/hello", "Hello John") // (1)
.contentType(MediaType.TEXT_PLAIN_TYPE)
.accept(MediaType.TEXT_PLAIN_TYPE), // (2)
String // (3)
))
|
val call = client.exchange(
POST("/hello", "Hello John") // (1)
.contentType(MediaType.TEXT_PLAIN_TYPE)
.accept(MediaType.TEXT_PLAIN_TYPE), String::class.java // (3)
)
|
使用POST方法;第一個參數(shù)是 URI,第二個是主體
內容類型和接受類型設置為text/plain(默認為application/json)
預期的響應類型是 String
發(fā)送 JSON
前面的示例發(fā)送純文本。要發(fā)送 JSON,將要編碼的對象傳遞給 JSON(無論是 Map 還是 POJO),只要 Jackson 能夠對其進行編碼。
例如,您可以從上一節(jié)創(chuàng)建一個 Message 并將其傳遞給 POST 方法:
Sending a JSON body
Java |
Groovy |
Kotlin |
Flux<HttpResponse<Message>> call = Flux.from(client.exchange(
POST("/greet", new Message("Hello John")), // (1)
Message.class // (2)
));
|
Flux<HttpResponse<Message>> call = Flux.from(client.exchange(
POST("/greet", new Message("Hello John")), // (1)
Message // (2)
))
|
val call = client.exchange(
POST("/greet", Message("Hello John")), Message::class.java // (2)
)
|
創(chuàng)建 Message 實例并將其傳遞給 POST 方法
同一個類解碼響應
在上面的示例中,以下 JSON 作為請求的主體發(fā)送:
Resulting JSON
{"text":"Hello John"}
可以使用 Jackson Annotations 自定義 JSON。
使用 URI 模板
如果在 URI 中包含對象的某些屬性,則可以使用 URI 模板。
例如,假設您有一個帶有 title 屬性的 Book 類。您可以在 URI 模板中包含標題,然后從 Book 的實例中填充它。例如:
Sending a JSON body with a URI template
Java |
Groovy |
Kotlin |
Flux<HttpResponse<Book>> call = Flux.from(client.exchange(
POST("/amazon/book/{title}", new Book("The Stand")),
Book.class
));
|
Flux<HttpResponse<Book>> call = client.exchange(
POST("/amazon/book/{title}", new Book("The Stand")),
Book
);
|
val call = client.exchange(
POST("/amazon/book/{title}", Book("The Stand")),
Book::class.java
)
|
在上述情況下,title 屬性包含在 URI 中。
發(fā)送表單數(shù)據(jù)
您還可以將 POJO 或地圖編碼為表單數(shù)據(jù)而不是 JSON。只需在發(fā)布請求中將內容類型設置為 application/x-www-form-urlencoded:
Sending a Form Data
Java |
Groovy |
Kotlin |
Flux<HttpResponse<Book>> call = Flux.from(client.exchange(
POST("/amazon/book/{title}", new Book("The Stand"))
.contentType(MediaType.APPLICATION_FORM_URLENCODED),
Book.class
));
|
Flux<HttpResponse<Book>> call = client.exchange(
POST("/amazon/book/{title}", new Book("The Stand"))
.contentType(MediaType.APPLICATION_FORM_URLENCODED),
Book
)
|
val call = client.exchange(
POST("/amazon/book/{title}", Book("The Stand"))
.contentType(MediaType.APPLICATION_FORM_URLENCODED),
Book::class.java
)
|
請注意,Jackson 也可以綁定表單數(shù)據(jù),因此要自定義綁定過程,請使用 Jackson 注釋。
多部分客戶端上傳
Micronaut HTTP 客戶端支持多部分請求。要構建多部分請求,請將內容類型設置為 multipart/form-data 并將正文設置為 MultipartBody 的實例。
例如:
Creating the body
Java |
Groovy |
Kotlin |
import io.micronaut.http.client.multipart.MultipartBody;
String toWrite = "test file";
File file = File.createTempFile("data", ".txt");
FileWriter writer = new FileWriter(file);
writer.write(toWrite);
writer.close();
MultipartBody requestBody = MultipartBody.builder() // (1)
.addPart( // (2)
"data",
file.getName(),
MediaType.TEXT_PLAIN_TYPE,
file
).build(); // (3)
|
import io.micronaut.http.multipart.CompletedFileUpload
import io.micronaut.http.multipart.StreamingFileUpload
import io.micronaut.http.client.multipart.MultipartBody
import org.reactivestreams.Publisher
File file = new File(uploadDir, "data.txt")
file.text = "test file"
file.createNewFile()
MultipartBody requestBody = MultipartBody.builder() // (1)
.addPart( // (2)
"data",
file.name,
MediaType.TEXT_PLAIN_TYPE,
file
).build() // (3)
|
import io.micronaut.http.client.multipart.MultipartBody
val toWrite = "test file"
val file = File.createTempFile("data", ".txt")
val writer = FileWriter(file)
writer.write(toWrite)
writer.close()
val requestBody = MultipartBody.builder() // (1)
.addPart( // (2)
"data",
file.name,
MediaType.TEXT_PLAIN_TYPE,
file
).build() // (3)
|
創(chuàng)建一個 MultipartBody 構建器,用于向主體添加部件。
將一個部分添加到正文中,在本例中是一個文件。此方法在 MultipartBody.Builder 中有不同的變體。
build 方法將構建器中的所有部件組裝成一個 MultipartBody。至少需要一個部分。
創(chuàng)建請求
Java |
Groovy |
Kotlin |
HttpRequest.POST("/multipart/upload", requestBody) // (1)
.contentType(MediaType.MULTIPART_FORM_DATA_TYPE) // (2)
|
HttpRequest.POST("/multipart/upload", requestBody) // (1)
.contentType(MediaType.MULTIPART_FORM_DATA_TYPE) // (2)
|
HttpRequest.POST("/multipart/upload", requestBody) // (1)
.contentType(MediaType.MULTIPART_FORM_DATA_TYPE) // (2)
|
具有不同類型數(shù)據(jù)的多部分請求正文。
將請求的內容類型標頭設置為 multipart/form-data。
通過 HTTP 流式傳輸 JSON
Micronaut 的 HTTP 客戶端支持通過 ReactorStreamingHttpClient 接口通過 HTTP 流式傳輸數(shù)據(jù),該接口包括特定于流式傳輸?shù)姆椒?,包括?/p>
表 1. HTTP 流媒體方法
方法 |
描述 |
dataStream(HttpRequest<I> request)
|
將數(shù)據(jù)流作為 ByteBuffer 的 Flux 返回
|
exchangeStream(HttpRequest<I> request)
|
返回包裝 ByteBuffer 的 Flux 的 HttpResponse
|
jsonStream(HttpRequest<I> request)
|
返回一個非阻塞的 JSON 對象流
|
要使用 JSON 流,請在服務器上聲明一個控制器方法,該方法返回 JSON 對象的 application/x-json-stream。例如:
Streaming JSON on the Server
Java |
Groovy |
Kotlin |
import io.micronaut.http.MediaType;
import io.micronaut.http.annotation.Controller;
import io.micronaut.http.annotation.Get;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.time.Duration;
import java.time.ZonedDateTime;
import java.time.temporal.ChronoUnit;
@Get(value = "/headlines", processes = MediaType.APPLICATION_JSON_STREAM) // (1)
Publisher<Headline> streamHeadlines() {
return Mono.fromCallable(() -> { // (2)
Headline headline = new Headline();
headline.setText("Latest Headline at " + ZonedDateTime.now());
return headline;
}).repeat(100) // (3)
.delayElements(Duration.of(1, ChronoUnit.SECONDS)); // (4)
}
|
import io.micronaut.http.MediaType
import io.micronaut.http.annotation.Controller
import io.micronaut.http.annotation.Get
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import java.time.Duration
import java.time.ZonedDateTime
import java.time.temporal.ChronoUnit
import java.util.concurrent.TimeUnit
@Get(value = "/headlines", processes = MediaType.APPLICATION_JSON_STREAM) // (1)
Flux<Headline> streamHeadlines() {
Mono.fromCallable({ // (2)
new Headline(text: "Latest Headline at ${ZonedDateTime.now()}")
}).repeat(100) // (3)
.delayElements(Duration.of(1, ChronoUnit.SECONDS)) // (4)
}
|
import io.micronaut.http.MediaType.APPLICATION_JSON_STREAM
import io.micronaut.http.annotation.Controller
import io.micronaut.http.annotation.Get
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import java.time.Duration
import java.time.ZonedDateTime
import java.time.temporal.ChronoUnit
import java.util.concurrent.TimeUnit.SECONDS
@Get(value = "/headlines", processes = [APPLICATION_JSON_STREAM]) // (1)
internal fun streamHeadlines(): Flux<Headline> {
return Mono.fromCallable { // (2)
val headline = Headline()
headline.text = "Latest Headline at ${ZonedDateTime.now()}"
headline
}.repeat(100) // (3)
.delayElements(Duration.of(1, ChronoUnit.SECONDS)) // (4)
}
|
streamHeadlines 方法產(chǎn)生 application/x-json-stream
Flux 是從 Callable 函數(shù)創(chuàng)建的(注意函數(shù)內不會發(fā)生阻塞,所以這沒問題,否則你應該訂閱 I/O 線程池)。
Flux 重復 100 次
Flux 發(fā)射物品,每個物品之間有 1 秒的延遲
服務器不必用 Micronaut 編寫,任何支持 JSON 流的服務器都可以。
然后在客戶端上,使用 jsonStream 訂閱流,每次服務器發(fā)出 JSON 對象時,客戶端都會解碼并使用它:
在客戶端流式傳輸 JSON
Java |
Groovy |
Kotlin |
Flux<Headline> headlineStream = Flux.from(client.jsonStream(
GET("/streaming/headlines"), Headline.class)); // (1)
CompletableFuture<Headline> future = new CompletableFuture<>(); // (2)
headlineStream.subscribe(new Subscriber<Headline>() {
@Override
public void onSubscribe(Subscription s) {
s.request(1); // (3)
}
@Override
public void onNext(Headline headline) {
System.out.println("Received Headline = " + headline.getText());
future.complete(headline); // (4)
}
@Override
public void onError(Throwable t) {
future.completeExceptionally(t); // (5)
}
@Override
public void onComplete() {
// no-op // (6)
}
});
|
Flux<Headline> headlineStream = Flux.from(client.jsonStream(
GET("/streaming/headlines"), Headline)) // (1)
CompletableFuture<Headline> future = new CompletableFuture<>() // (2)
headlineStream.subscribe(new Subscriber<Headline>() {
@Override
void onSubscribe(Subscription s) {
s.request(1) // (3)
}
@Override
void onNext(Headline headline) {
println "Received Headline = $headline.text"
future.complete(headline) // (4)
}
@Override
void onError(Throwable t) {
future.completeExceptionally(t) // (5)
}
@Override
void onComplete() {
// no-op // (6)
}
})
|
val headlineStream = client.jsonStream(
GET<Any>("/streaming/headlines"), Headline::class.java) // (1)
val future = CompletableFuture<Headline>() // (2)
headlineStream.subscribe(object : Subscriber<Headline> {
override fun onSubscribe(s: Subscription) {
s.request(1) // (3)
}
override fun onNext(headline: Headline) {
println("Received Headline = ${headline.text!!}")
future.complete(headline) // (4)
}
override fun onError(t: Throwable) {
future.completeExceptionally(t) // (5)
}
override fun onComplete() {
// no-op // (6)
}
})
|
jsonStream 方法返回一個 Flux
CompletableFuture 用于接收值,但是您對每個發(fā)出的項目執(zhí)行的操作是特定于應用程序的
訂閱請求單個項目。您可以使用訂閱來調節(jié)背壓和需求。
onNext 方法在一個項目被發(fā)出時被調用
發(fā)生錯誤時調用 onError 方法
onComplete 方法在所有 Headline 實例發(fā)出后被調用
請注意,上例中的服務器和客戶端都不執(zhí)行任何阻塞 I/O。
配置 HTTP 客戶端
所有客戶端的全局配置
默認的 HTTP 客戶端配置是一個名為 DefaultHttpClientConfiguration 的配置屬性,它允許為所有 HTTP 客戶端配置默認行為。例如,在您的配置文件中(例如 application.yml):
更改默認 HTTP 客戶端配置
Properties |
Yaml |
Toml |
Groovy |
Hocon |
JSON |
micronaut.http.client.read-timeout=5s
|
micronaut:
http:
client:
read-timeout: 5s
|
[micronaut]
[micronaut.http]
[micronaut.http.client]
read-timeout="5s"
|
micronaut {
http {
client {
readTimeout = "5s"
}
}
}
|
{
micronaut {
http {
client {
read-timeout = "5s"
}
}
}
}
|
{
"micronaut": {
"http": {
"client": {
"read-timeout": "5s"
}
}
}
}
|
上面的示例設置了 HttpClientConfiguration 類的 readTimeout 屬性。
客戶端特定配置
要為每個客戶端單獨配置,有幾個選項。您可以在配置文件(例如 application.yml)中手動配置服務發(fā)現(xiàn)并應用每個客戶端配置:
手動配置 HTTP 服務
Properties |
Yaml |
Toml |
Groovy |
Hocon |
JSON |
micronaut.http.services.foo.urls[0]=http://foo1
micronaut.http.services.foo.urls[1]=http://foo2
micronaut.http.services.foo.read-timeout=5s
|
micronaut:
http:
services:
foo:
urls:
- http://foo1
- http://foo2
read-timeout: 5s
|
[micronaut]
[micronaut.http]
[micronaut.http.services]
[micronaut.http.services.foo]
urls=[
"http://foo1",
"http://foo2"
]
read-timeout="5s"
|
micronaut {
http {
services {
foo {
urls = ["http://foo1", "http://foo2"]
readTimeout = "5s"
}
}
}
}
|
{
micronaut {
http {
services {
foo {
urls = ["http://foo1", "http://foo2"]
read-timeout = "5s"
}
}
}
}
}
|
{
"micronaut": {
"http": {
"services": {
"foo": {
"urls": ["http://foo1", "http://foo2"],
"read-timeout": "5s"
}
}
}
}
}
|
警告:此客戶端配置可以與 @Client 注釋結合使用,通過直接注入 HttpClient 或在客戶端界面上使用。在任何情況下,注釋上的所有其他屬性都將被忽略,除了服務 id。
然后,注入指定的客戶端配置:
注入 HTTP 客戶端
@Client("foo") @Inject ReactorHttpClient httpClient;
您還可以定義一個從 HttpClientConfiguration 擴展的 bean,并確保 javax.inject.Named 注釋適當?shù)孛?
定義 HTTP 客戶端配置 bean
@Named("twitter")
@Singleton
class TwitterHttpClientConfiguration extends HttpClientConfiguration {
public TwitterHttpClientConfiguration(ApplicationConfiguration configuration) {
super(configuration);
}
}
如果您使用服務發(fā)現(xiàn)使用 @Client 注入名為 twitter 的服務,則將選擇此配置:
注入 HTTP 客戶端
@Client("twitter") @Inject ReactorHttpClient httpClient;
或者,如果您不使用服務發(fā)現(xiàn),則可以使用 @Client 的配置成員來引用特定類型:
注入 HTTP 客戶端
@Client(value = "https://api.twitter.com/1.1",
configuration = TwitterHttpClientConfiguration.class)
@Inject
ReactorHttpClient httpClient;
使用 HTTP 客戶端連接池
處理大量請求的客戶端將受益于啟用 HTTP 客戶端連接池。以下配置為 foo 客戶端啟用池化:
手動配置 HTTP 服務
Properties |
Yaml |
Toml |
Groovy |
Hocon |
JSON |
micronaut.http.services.foo.urls[0]=http://foo1
micronaut.http.services.foo.urls[1]=http://foo2
micronaut.http.services.foo.pool.enabled=true
micronaut.http.services.foo.pool.max-connections=50
|
micronaut:
http:
services:
foo:
urls:
- http://foo1
- http://foo2
pool:
enabled: true
max-connections: 50
|
[micronaut]
[micronaut.http]
[micronaut.http.services]
[micronaut.http.services.foo]
urls=[
"http://foo1",
"http://foo2"
]
[micronaut.http.services.foo.pool]
enabled=true
max-connections=50
|
micronaut {
http {
services {
foo {
urls = ["http://foo1", "http://foo2"]
pool {
enabled = true
maxConnections = 50
}
}
}
}
}
|
{
micronaut {
http {
services {
foo {
urls = ["http://foo1", "http://foo2"]
pool {
enabled = true
max-connections = 50
}
}
}
}
}
}
|
{
"micronaut": {
"http": {
"services": {
"foo": {
"urls": ["http://foo1", "http://foo2"],
"pool": {
"enabled": true,
"max-connections": 50
}
}
}
}
}
}
|
有關可用池配置選項的詳細信息,請參閱 ConnectionPoolConfiguration 的 API。
配置事件循環(huán)組
默認情況下,Micronaut 為工作線程和所有 HTTP 客戶端線程共享一個通用的 Netty EventLoopGroup。
這個 EventLoopGroup 可以通過 micronaut.netty.event-loops.default 屬性進行配置:
配置默認事件循環(huán)
Properties |
Yaml |
Toml |
Groovy |
Hocon |
JSON |
micronaut.netty.event-loops.default.num-threads=10
micronaut.netty.event-loops.default.prefer-native-transport=true
|
micronaut:
netty:
event-loops:
default:
num-threads: 10
prefer-native-transport: true
|
[micronaut]
[micronaut.netty]
[micronaut.netty.event-loops]
[micronaut.netty.event-loops.default]
num-threads=10
prefer-native-transport=true
|
micronaut {
netty {
eventLoops {
'default' {
numThreads = 10
preferNativeTransport = true
}
}
}
}
|
{
micronaut {
netty {
event-loops {
default {
num-threads = 10
prefer-native-transport = true
}
}
}
}
}
|
{
"micronaut": {
"netty": {
"event-loops": {
"default": {
"num-threads": 10,
"prefer-native-transport": true
}
}
}
}
}
|
您還可以使用 micronaut.netty.event-loops 設置來配置一個或多個額外的事件循環(huán)。下表總結了屬性:
表 1. DefaultEventLoopGroupConfiguration 的配置屬性
屬性 |
類型 |
描述 |
micronaut.netty.event-loops.*.num-threads
|
int
|
|
micronaut.netty.event-loops.*.io-ratio
|
java.lang.Integer
|
|
micronaut.netty.event-loops.*.prefer-native-transport
|
boolean
|
|
micronaut.netty.event-loops.*.executor
|
java.lang.String
|
|
micronaut.netty.event-loops.*.shutdown-quiet-period
|
java.time.Duration
|
|
micronaut.netty.event-loops.*.shutdown-timeout
|
java.time.Duration
|
|
例如,如果您與 HTTP 客戶端的交互涉及 CPU 密集型工作,則可能值得為一個或所有客戶端配置一個單獨的 EventLoopGroup。
以下示例配置了一個名為“other”的附加事件循環(huán)組,其中包含 10 個線程:
配置額外的事件循環(huán)
Properties |
Yaml |
Toml |
Groovy |
Hocon |
JSON |
micronaut.netty.event-loops.other.num-threads=10
micronaut.netty.event-loops.other.prefer-native-transport=true
|
micronaut:
netty:
event-loops:
other:
num-threads: 10
prefer-native-transport: true
|
[micronaut]
[micronaut.netty]
[micronaut.netty.event-loops]
[micronaut.netty.event-loops.other]
num-threads=10
prefer-native-transport=true
|
micronaut {
netty {
eventLoops {
other {
numThreads = 10
preferNativeTransport = true
}
}
}
}
|
{
micronaut {
netty {
event-loops {
other {
num-threads = 10
prefer-native-transport = true
}
}
}
}
}
|
{
"micronaut": {
"netty": {
"event-loops": {
"other": {
"num-threads": 10,
"prefer-native-transport": true
}
}
}
}
}
|
配置附加事件循環(huán)后,您可以更改 HTTP 客戶端配置以使用它:
改變客戶端使用的事件循環(huán)組
Properties |
Yaml |
Toml |
Groovy |
Hocon |
JSON |
micronaut.http.client.event-loop-group=other
|
micronaut:
http:
client:
event-loop-group: other
|
[micronaut]
[micronaut.http]
[micronaut.http.client]
event-loop-group="other"
|
micronaut {
http {
client {
eventLoopGroup = "other"
}
}
}
|
{
micronaut {
http {
client {
event-loop-group = "other"
}
}
}
}
|
{
"micronaut": {
"http": {
"client": {
"event-loop-group": "other"
}
}
}
}
|
錯誤響應
如果返回代碼為 400 或更高的 HTTP 響應,則會創(chuàng)建 HttpClientResponseException。異常包含原始響應。如何拋出異常取決于方法的返回類型。
對于阻塞客戶端,拋出異常并應由調用者捕獲和處理。對于反應式客戶端,異常作為錯誤通過發(fā)布者傳遞。
綁定錯誤
如果請求成功,您通常希望使用端點并綁定到 POJO,如果發(fā)生錯誤則綁定到不同的 POJO。以下示例顯示如何調用具有成功和錯誤類型的交換。
Java |
Groovy |
Kotlin |
@Controller("/books")
public class BooksController {
@Get("/{isbn}")
public HttpResponse find(String isbn) {
if (isbn.equals("1680502395")) {
Map<String, Object> m = new HashMap<>();
m.put("status", 401);
m.put("error", "Unauthorized");
m.put("message", "No message available");
m.put("path", "/books/" + isbn);
return HttpResponse.status(HttpStatus.UNAUTHORIZED).body(m);
}
return HttpResponse.ok(new Book("1491950358", "Building Microservices"));
}
}
|
@Controller("/books")
class BooksController {
@Get("/{isbn}")
HttpResponse find(String isbn) {
if (isbn == "1680502395") {
Map<String, Object> m = [
status : 401,
error : "Unauthorized",
message: "No message available",
path : "/books/" + isbn]
return HttpResponse.status(HttpStatus.UNAUTHORIZED).body(m)
}
return HttpResponse.ok(new Book("1491950358", "Building Microservices"))
}
}
|
@Controller("/books")
class BooksController {
@Get("/{isbn}")
fun find(isbn: String): HttpResponse<*> {
if (isbn == "1680502395") {
val m = mapOf(
"status" to 401,
"error" to "Unauthorized",
"message" to "No message available",
"path" to "/books/$isbn"
)
return HttpResponse.status<Any>(HttpStatus.UNAUTHORIZED).body(m)
}
return HttpResponse.ok(Book("1491950358", "Building Microservices"))
}
}
|
Java |
Groovy |
Kotlin |
@Test
public void afterAnHttpClientExceptionTheResponseBodyCanBeBoundToAPOJO() {
try {
client.toBlocking().exchange(HttpRequest.GET("/books/1680502395"),
Argument.of(Book.class), // (1)
Argument.of(CustomError.class)); // (2)
} catch (HttpClientResponseException e) {
assertEquals(HttpStatus.UNAUTHORIZED, e.getResponse().getStatus());
Optional<CustomError> jsonError = e.getResponse().getBody(CustomError.class);
assertTrue(jsonError.isPresent());
assertEquals(401, jsonError.get().status);
assertEquals("Unauthorized", jsonError.get().error);
assertEquals("No message available", jsonError.get().message);
assertEquals("/books/1680502395", jsonError.get().path);
}
}
|
def "after an HttpClientException the response body can be bound to a POJO"() {
when:
client.toBlocking().exchange(HttpRequest.GET("/books/1680502395"),
Argument.of(Book), // (1)
Argument.of(CustomError)) // (2)
then:
def e = thrown(HttpClientResponseException)
e.response.status == HttpStatus.UNAUTHORIZED
when:
Optional<CustomError> jsonError = e.response.getBody(CustomError)
then:
jsonError.isPresent()
jsonError.get().status == 401
jsonError.get().error == 'Unauthorized'
jsonError.get().message == 'No message available'
jsonError.get().path == '/books/1680502395'
}
|
"after an httpclient exception the response body can be bound to a POJO" {
try {
client.toBlocking().exchange(HttpRequest.GET<Any>("/books/1680502395"),
Argument.of(Book::class.java), // (1)
Argument.of(CustomError::class.java)) // (2)
} catch (e: HttpClientResponseException) {
e.response.status shouldBe HttpStatus.UNAUTHORIZED
}
}
|
- 成功類型
- 錯誤類型
更多建議: