
[Help] After a Pipe is started, data is only synchronized after manually running flush #17972

@coniferous-cmd

Description


Search before asking

  • I searched in the issues and found nothing similar.

Motivation

IoTDB version

IoTDB 2.0.8

Problem

The Pipe was created and started normally, and data is continuously being written to the source database.

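However, newly written data is not synchronized to the target through the Pipe right away. Only after manually executing:

flush

is the previously written data synchronized.

It appears that the Pipe only detects new data after a TsFile flush, rather than synchronizing writes that are still in memory in real time.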

Steps to reproduce

  1. Create the Pipe (custom processor plugin ConsoleProcessor)
package com.xlkh.tsdb.iotdb.plugin.processor;

import com.xlkh.tsdb.iotdb.Globals;
import com.xlkh.tsdb.iotdb.util.TableRowUtil;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.iotdb.pipe.api.PipeProcessor;
import org.apache.iotdb.pipe.api.collector.EventCollector;
import org.apache.iotdb.pipe.api.customizer.configuration.PipeProcessorRuntimeConfiguration;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ConsoleProcessor implements PipeProcessor {

  private static final Logger log = LoggerFactory.getLogger(ConsoleProcessor.class);
  private static final ConcurrentSkipListMap<Long, Map<String, String>> payloads = new ConcurrentSkipListMap<>();

  private static final AtomicInteger counter = new AtomicInteger(0);

  @Override
  public void validate(PipeParameterValidator pipeParameterValidator) throws Exception {

  }

  @Override
  public void customize(PipeParameters pipeParameters, PipeProcessorRuntimeConfiguration pipeProcessorRuntimeConfiguration) throws Exception {
    counter.set(Integer.parseInt(pipeParameters.getAttribute().getOrDefault("count", "200000")));
  }

  @Override
  public void process(TabletInsertionEvent tabletInsertionEvent, EventCollector eventCollector)
      throws Exception {
    tabletInsertionEvent.processTablet(((tablet, collector) -> {
      TableRowUtil.toMap(tablet).forEach((ts, fields) ->
          payloads.merge(ts, fields, (oldVal, newVal) -> {
            oldVal.putAll(newVal);
            return oldVal;
          }));
    }));

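    // payloads.firstKey() throws NoSuchElementException when the buffer is empty
    // (e.g. an event with no rows), so read the head entry with a null check instead.
    Entry<Long, Map<String, String>> head = payloads.firstEntry();
    if (head != null) {
      Long timestamp = head.getKey();
      log.info("payloads size: {} - {}", timestamp, head.getValue().size());

      // Emit the earliest timestamp once it has accumulated `count` fields.
      if (head.getValue().size() >= counter.get()) {
        // remove(timestamp) takes exactly the entry checked above; pollFirstEntry()
        // could return a different entry if an earlier timestamp was inserted meanwhile.
        Map<String, String> fields = payloads.remove(timestamp);
        if (fields != null) {
          Globals.EXECUTOR.submit(() -> {
            log.info("time: {} - {}", timestamp, System.currentTimeMillis());
            log.info("payloads size: {}", payloads.size());
            log.info("{}", Globals.gson.toJson(fields));
          });
        }
      }
    }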

    eventCollector.collect(tabletInsertionEvent);
  }

  @Override
  public void process(Event event, EventCollector eventCollector) throws Exception {
    eventCollector.collect(event);
  }

  @Override
  public void close() throws Exception {
    payloads.clear();
    counter.set(200000);
  }

}
  2. Start the Pipe
DROP PIPE IF EXISTS console_pipe;
DROP PIPEPLUGIN IF EXISTS console_processor;

CREATE
PIPEPLUGIN console_processor AS 'com.xlkh.tsdb.iotdb.plugin.processor.ConsoleProcessor'
USING URI 'file:///home/iotdb/apache-iotdb-2.0.8-all-bin/ext/pipe/iotdb-plugin-ext.jar';

CREATE
PIPE console_pipe
WITH SOURCE (
  'source' = 'iotdb-source',
  'source.mode' = 'stream',
  'source.pattern' = 'root.s1_5w',
  'history.enable'='false',
  'realtime.enable'='true',
)
WITH PROCESSOR (
  'count' = '200000',
  'processor' = 'console_processor',
)
WITH SINK (
  'sink' = 'do-nothing-sink',
);
  3. Write data continuously
INSERT INTO root.test.d1(time, s1) VALUES (NOW(), 190);
  4. Execute
flush

The data that had not been synchronized before immediately starts being synchronized.
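To separate the processor's own buffering from the Pipe's event delivery, the merge-and-threshold logic of ConsoleProcessor can be modeled without any IoTDB dependency. This is a minimal sketch under stated assumptions: BufferModel and accept() are hypothetical names, and each accept() call stands in for one TabletInsertionEvent whose rows were already converted to timestamp -> (column -> value), which is the shape the reporter's TableRowUtil.toMap is assumed to return. It models only the processor's logging condition; it says nothing about when IoTDB actually hands events to the processor.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;

/** Hypothetical stand-alone model of ConsoleProcessor's buffering (no IoTDB classes). */
public class BufferModel {

  // timestamp -> (column -> value), ordered by timestamp like the processor's payloads map
  private final ConcurrentSkipListMap<Long, Map<String, String>> payloads =
      new ConcurrentSkipListMap<>();
  private final int count; // plays the role of the processor's 'count' parameter

  public BufferModel(int count) {
    this.count = count;
  }

  /**
   * Merges one event's rows, then returns the earliest row if it has reached `count`
   * fields (removing it from the buffer), or null if nothing is ready yet.
   */
  public Map<String, String> accept(Map<Long, Map<String, String>> rows) {
    rows.forEach((ts, fields) ->
        // Copy into a mutable map so later merges can add columns to it.
        payloads.merge(ts, new ConcurrentHashMap<>(fields), (oldVal, newVal) -> {
          oldVal.putAll(newVal);
          return oldVal;
        }));

    // Like the processor, only the earliest timestamp is checked.
    Map.Entry<Long, Map<String, String>> head = payloads.firstEntry();
    if (head == null || head.getValue().size() < count) {
      return null;
    }
    return payloads.remove(head.getKey());
  }

  /** Number of timestamps still waiting in the buffer. */
  public int buffered() {
    return payloads.size();
  }

  public static void main(String[] args) {
    BufferModel model = new BufferModel(2);
    // One field at t=1 (like the single-column INSERT in step 3): below the threshold.
    System.out.println(model.accept(Map.of(1L, Map.of("s1", "190"))));
    // A second field for t=1 reaches the threshold, so that row is emitted.
    System.out.println(model.accept(Map.of(1L, Map.of("s2", "191"))));
    System.out.println(model.buffered());
  }
}
```

Because only the earliest timestamp is compared against count, an incomplete earliest row holds back every later row, even rows that are already complete. With the default count of 200000 and single-column inserts such as the one in step 3, the "time: ..." log line would not fire in this model no matter how events are delivered.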

Additional information

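  • The Pipe status is normal
  • The Pipe reports no errors
  • The problem can be reproduced consistently
  • Writes to the source database have been confirmed to succeed
  • I would like to confirm whether this is a configuration issue or a known Pipe defect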

Solution

No response

Alternatives

No response

Are you willing to submit a PR?

  • I'm willing to submit a PR!

Metadata


Assignees

No one assigned

    Labels

    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests
