我正在尝试使用 Spring 云流将数据从 kafka 发送到 Rsocket,然后在 React 上表示数据
这是我的配置。
@Configuration
public class RsocketConsumerConfiguration {
@Bean
public Sinks.Many<Data> sender(){
return Sinks.many().multicast().directBestEffort();
}
}
@Controller 公共类 ServerController {
@Autowired
private Sinks.Many<Data> integer;
@MessageMapping("integer")
public Flux<Data> integer() {
return integer.asFlux();
}
@EnableBinding(IClientProcessor.class)
public class Listener {
@Autowired
private Sinks.Many<Data> integer;
@StreamListener(IClientProcessor.INTEGER)
public void integer(Data val) {
System.out.println(val);
integer.tryEmitNext(val);
}
}
let client = new RSocketClient({
transport: new RSocketWebSocketClient(
{
url: 'ws://localhost:7000/ws',
wsCreator: (url) => new WebSocket(url),
debug: true,
},
BufferEncoders,
),
setup: {
dataMimeType: "application/json",
metadataMimeType: MESSAGE_RSOCKET_COMPOSITE_METADATA.string,
keepAlive: 5000,
lifetime: 60000,
},
});
client
.then(rsocket => {
console.log("Connected to rsocket");
rsocket.requestStream({
metadata: Buffer.from(encodeCompositeMetadata([
[MESSAGE_RSOCKET_ROUTING, encodeRoute("integer")],
])),
})
.subscribe({
onSubscribe: s => {
s.request(2147483647)
},
onNext: (p) => {
let newData = {
time: new Date(JSON.parse(p.data).time).getUTCSeconds(),
integer: JSON.parse(p.data).integer
}
newData.integer >100?setInteger(currentData => [newData, ...currentData]):setInt(currentData => [newData, ...currentData])
console.log(newData)
},
onError: (e) => console.error(e),
onComplete: () => console.log("Done")
});
spring.cloud.stream.bindings.integer.destination=integer 无法在react应用程序中看到它。请指教。我做错了什么?