Я пишу Netty клиент и серверное приложение, которое будет записывать статистику JVM GC в базу данных временных рядов для анализа около 300 серверов. Однако мой трубопровод бросает много из этих исключений:StreamCorruptedException возникает при декодировании объекта
10/02/2012 10:14:23.415 New I/O server worker #2-2 ERROR GCEventsCollector - netty error
java.io.StreamCorruptedException: invalid type code: 6E
at java.io.ObjectInputStream.readObject0(Unknown Source)
at java.io.ObjectInputStream.defaultReadFields(Unknown Source)
at java.io.ObjectInputStream.readSerialData(Unknown Source)
at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
at java.io.ObjectInputStream.readObject0(Unknown Source)
at java.io.ObjectInputStream.readObject(Unknown Source)
at org.jboss.netty.handler.codec.serialization.ObjectDecoder.decode(ObjectDecoder.java:94)
at org.jboss.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:282)
at org.jboss.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:216)
at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:274)
at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:261)
at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:349)
at org.jboss.netty.channel.socket.nio.NioWorker.processSelectedKeys(NioWorker.java:280)
at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:200)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)
Похоже, это создает OutputStream, но один уже существует - так кидает, что конкретное исключение. Он появляется в моей среде AIT, где> 300 серверов подключаются, но не в DEV, где только один агент подключается.
Я подозреваю, что это либо ошибка в декодере объекта, либо где-то есть неправильный код. Пожалуйста, может кто-нибудь объяснить, почему это происходит?
Вот коллектор:
общественного класса GCEventsCollector расширяет AbstractDataCollector { защищенный статический окончательный Logger Регистратор = Logger.getLogger (GCEventsCollector.class); private static final ExecutorService WORKERS = Executors.newCachedThreadPool(); private static final ChannelGroup GROUP = new DefaultChannelGroup ("gc-events"); закрытый конечный порт; закрытый окончательный бутстрап ServerBootstrap;
public GCEventsCollector(final int port) {
logger.info("Creating GC Events collector on port " + port);
this.port = port;
this.bootstrap = newServerBootstrap(port);
}
/**
* Creates a bootstrap for creating bindings to sockets. Each channel has a pipeline, which contains the
* logic for handling a message such as encoding, decoding, buffering, etc.
*
* @param port port of socket
* @return configured bootstrap
*/
private ServerBootstrap newServerBootstrap(int port) {
ExecutorService bossExecutor = Executors.newCachedThreadPool();
ExecutorService workerExecutor = Executors.newCachedThreadPool();
NioServerSocketChannelFactory channelFactory =
new NioServerSocketChannelFactory(bossExecutor, workerExecutor);
ServerBootstrap bootstrap = new ServerBootstrap(channelFactory);
ChannelHandler collectorHandler = new CollectorHandler();
bootstrap.setPipelineFactory(new CollectorPipelineFactory(collectorHandler));
bootstrap.setOption("localAddress", new InetSocketAddress(port));
return bootstrap;
}
protected KDBCategory[] getKDBCategories() {
return new KDBCategory[] { KDBCategory.GC_EVENTS };
}
/**
* Bind to a socket to accept messages
*
* @throws Exception
*/
public void doStart() throws Exception {
Channel channel = bootstrap.bind();
GROUP.add(channel);
}
/**
* Disconnect the channel to stop accepting messages and wait until disconnected
*
* @throws Exception
*/
public void doStop() throws Exception {
logger.info("disconnecting");
GROUP.close().awaitUninterruptibly();
}
class CollectorHandler extends SimpleChannelHandler {
@Override
public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e)
throws Exception {
super.channelOpen(ctx, e);
GROUP.add(e.getChannel());
}
@Override
public void channelConnected(ChannelHandlerContext ctx,
ChannelStateEvent e) throws Exception {
super.channelConnected(ctx, e);
logger.info("channel connected");
}
@Override
public void channelDisconnected(ChannelHandlerContext ctx,
ChannelStateEvent e) throws Exception {
super.channelDisconnected(ctx, e);
logger.info("channel disconnected");
}
@Override
public void messageReceived(ChannelHandlerContext ctx, final MessageEvent e) throws Exception {
if (logger.isDebugEnabled()) {
logger.debug("Received GcStats event: " + e.toString());
}
WORKERS.execute(new Runnable() {
public void run() {
saveData(KDBCategory.GC_EVENTS, (GcEventsPersister) e.getMessage());
}
});
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
logger.error("netty error", e.getCause());
}
}
private static class CollectorPipelineFactory implements ChannelPipelineFactory {
private final ChannelHandler handler;
private CollectorPipelineFactory(ChannelHandler handler) {
this.handler = handler;
}
@Override
public ChannelPipeline getPipeline() throws Exception {
return Channels.pipeline(new ObjectDecoder(), handler);
}
}
}
Вот агент:
public class GCEventsAgent {
private final static Logger logger = Logger.getLogger(GCEventsAgent.class);
private static final ExecutorService bosses = Executors.newCachedThreadPool();
private static final ExecutorService workers = Executors.newCachedThreadPool();
private static final Timer timer = new HashedWheelTimer();
private static final String localHostName;
private static final ParseExceptionListener exceptionListener = new ExceptionListener();
static {
String name = "";
try {
name = InetAddress.getLocalHost().getHostName();
} catch (UnknownHostException e) {
logger.error("cannot retrieve local host name", e);
}
localHostName = name;
}
public static void main(final String[] args) {
checkArgument(args.length >= 3, "Usage: GCEventsAgent [log4j cfg] [mba cfg] [server cfg] [process 1] ... [process n]");
System.setProperty("log4j.configuration", "file:log4j.properties");
final String log4jConfig = args[0];
DOMConfigurator.configure(log4jConfig);
final String mbaConfig = args[1];
final String serverConfigPath = args[2];
final ServerConfig serverCfg = new ServerConfig(serverConfigPath);
setup(serverCfg, args);
}
private static void setup(ServerConfig cfg, String[] args) {
final String host = cfg.getParameter(String.class, "host");
final int port = cfg.getParameter(Integer.class, "port");
if (args.length == 3)
configurePolling(cfg, host, port);
else
configureProcesses(cfg, args, host, port);
}
private static void configureProcesses(final ServerConfig cfg,
final String[] args,
final String host,
final int port) {
final List<File> logFiles = logFiles(cfg, args);
logger.info("Initializing GC Agent for [" + host + ":" + port + "]");
final NioClientSocketChannelFactory channelFactory =
new NioClientSocketChannelFactory(bosses, workers);
final ClientBootstrap bootstrap = new ClientBootstrap(channelFactory);
bootstrap.setOption("remoteAddress", new InetSocketAddress(host, port));
final GCParserFactory parserFactory = new DefaultParserFactory();
final AgentProcessHandler agentHandler =
new AgentProcessHandler(bootstrap, logFiles, parserFactory);
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
@Override
public ChannelPipeline getPipeline() throws Exception {
return Channels.pipeline(new ObjectEncoder(), agentHandler);
}
});
bootstrap.connect().awaitUninterruptibly();
}
private static void configurePolling(ServerConfig cfg, String host, int port) {
final int frequency = cfg.getParameter(Integer.class, "frequency");
final NioClientSocketChannelFactory channelFactory =
new NioClientSocketChannelFactory(newCachedThreadPool(), newCachedThreadPool());
final ClientBootstrap bootstrap = new ClientBootstrap(channelFactory);
bootstrap.setOption("remoteAddress", new InetSocketAddress(host, port));
final GcParserSupplier parserSupplier = new GcParserSupplier();
final ConcurrentMap<File, Tailer> tailerMap = Maps.newConcurrentMap();
final ParseExceptionListener exceptionListener = new ExceptionListener();
final Set<File> discoveredSet = Sets.newHashSet();
final File directory = new File(cfg.getParameter(String.class, "logDirectory"));
final TailManager tailManager =
new TailManager(discoveredSet, tailerMap, parserSupplier, exceptionListener, localHostName);
final DetectionTask detectionTask = new DetectionTask(directory, discoveredSet, tailManager);
final FileWatcher fileWatcher =
new FileWatcher(Executors.newScheduledThreadPool(1), detectionTask, frequency);
final Timer timer = new HashedWheelTimer();
final EfxAgentHandler agentHandler =
new EfxAgentHandler(bootstrap, tailManager, fileWatcher, timer);
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
@Override public ChannelPipeline getPipeline() throws Exception {
return Channels.pipeline(new ObjectEncoder(), agentHandler);
}
});
bootstrap.connect().awaitUninterruptibly();
}
private static List<File> logFiles(ServerConfig cfg, String[] args) {
String logDir = cfg.getParameter(String.class, "logDirectory");
final int n = args.length;
List<File> logFiles = new ArrayList<File>(n-3);
for (int i = 3; i < n; i++) {
String filePath = logDir + args[i] + ".gc.log";
try {
File file = new File(filePath);
if (!file.exists()) {
logger.info("missing log file so creating empty file at path: " + filePath);
File dir = file.getParentFile();
dir.mkdirs();
if (file.createNewFile())
logger.info("successfully created empty file at path: " + filePath);
}
logFiles.add(file);
} catch (IOException e) {
logger.error("error creating log file at path: " + filePath);
}
}
return logFiles;
}
static final class AgentPauseListener implements GCEventListener<CMSType, CMSHeap> {
private final Channel channel;
private final GcEventsPersister.Builder builder;
private byte state;
private AgentPauseListener(Channel channel,
GcEventsPersister.Builder builder) {
this.channel = channel;
this.builder = builder;
}
@Override
public void onPause(PauseDetail<CMSType> pauseDetail) {
logger.info("onPause");
checkState(state == 0x00 || state == 0x01);
builder
.collectionType(pauseDetail.getType() == null
? null : pauseDetail.getType().toString())
.start(new Instant(pauseDetail.getStartTimestamp()))
.end(new Instant(pauseDetail.getEndTimestamp()))
.pause(pauseDetail.getType() == null
? false : pauseDetail.getType().isPause())
.duration(pauseDetail.getPauseMillis());
if (state == 0x00)
channel.write(builder.build());
else
state |= 0x02;
}
@Override
public void onHeapBefore(HeapDetail<CMSHeap> details) {
logger.info("onHeapBefore");
checkState(state == 0x00);
builder.heapBefore(used(details)).heapBeforeTotal(total(details));
state |= 0x01;
}
@Override
public void onHeapAfter(HeapDetail<CMSHeap> details) {
logger.info("onHeapAfter");
checkState(state == 0x03);
builder.heapAfter(used(details)).heapAfterTotal(total(details));
channel.write(builder.build());
state = 0x00;
}
private final long used(HeapDetail<CMSHeap> details) {
return used(details, CMSHeap.PAR_NEW_GENERATION)
+ used(details, CMSHeap.CMS_GENERATION)
+ used(details, CMSHeap.CMS_PERM_GENERATION);
}
private final long used(HeapDetail<CMSHeap> heapDetail,
CMSHeap gen) {
final Map<CMSHeap, HeapDetail.HeapMetric> sizes = heapDetail.getSizes();
final long used = sizes.get(gen).getUsed();
logger.info("used = " + used);
return used;
}
private final long total(HeapDetail<CMSHeap> details) {
return total(details, CMSHeap.PAR_NEW_GENERATION)
+ total(details, CMSHeap.CMS_GENERATION)
+ total(details, CMSHeap.CMS_PERM_GENERATION);
}
private final long total(HeapDetail<CMSHeap> heapDetail,
CMSHeap gen) {
final Map<CMSHeap, HeapDetail.HeapMetric> sizes = heapDetail.getSizes();
return sizes.get(gen).getTotal();
}
}
static final class ExceptionListener implements ParseExceptionListener {
@Override
public void onParseError(int lineNo, String input, String error) {
logger.error("error parsing: " + lineNo + " - " + input + " - " + error);
}
}
static final class ReconnectTask implements TimerTask {
private final ClientBootstrap bootstrap;
ReconnectTask(ClientBootstrap bootstrap) {
this.bootstrap = bootstrap;
}
@Override
public void run(Timeout timeout) throws Exception {
bootstrap.connect();
}
}
static class AgentProcessHandler extends SimpleChannelHandler {
private final ClientBootstrap bootstrap;
private final List<File> logFiles;
private final GCParserFactory parserFactory;
private final Set<Tailer> tailers = new HashSet<Tailer>(4);
public AgentProcessHandler(ClientBootstrap bootstrap,
List<File> logFiles,
GCParserFactory parserFactory) {
this.bootstrap = bootstrap;
this.logFiles = logFiles;
this.parserFactory = parserFactory;
}
@Override
public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e)
throws Exception {
logger.info("channel connected");
for (File logFile : logFiles) {
logger.info("setting up scraper for logfile: " + logFile);
GCParser parser = parserFactory.getParser();
GcEventsPersister.Builder builder =
new GcEventsPersister.Builder(logFile.getName(), localHostName);
GCEventListener gcEventListener =
new AgentPauseListener(e.getChannel(), builder);
TailerListener listener =
new LogLineListener(parser, gcEventListener, exceptionListener);
Tailer tailer = Tailer.create(logFile, listener, 1000L, true);
tailers.add(tailer);
}
}
@Override
public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e)
throws Exception {
logger.error("channel disconnected");
stopTailers();
scheduleReconnect();
}
private void scheduleReconnect() {
timer.newTimeout(new ReconnectTask(bootstrap), 5L, TimeUnit.SECONDS);
}
private final void stopTailers() {
for (Tailer tailer : tailers) {
tailer.stop();
}
tailers.clear();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
throws Exception {
final Throwable cause = e.getCause();
logger.error(cause);
if (cause instanceof ConnectException) {
stopTailers();
scheduleReconnect();
}
}
};
private static class LogLineListener extends TailerListenerAdapter {
private final GCParser parser;
private final GCEventListener pauseListener;
private final ParseExceptionListener exceptionLister;
public LogLineListener(GCParser parser,
GCEventListener listener,
ParseExceptionListener exceptionLister) {
this.parser = parser;
this.pauseListener = listener;
this.exceptionLister = exceptionLister;
}
@Override
public void handle(String line) {
logger.info("handle(String line)");
parser.matchLine(line, pauseListener, exceptionLister);
}
}
private static final class GcParserSupplier
implements Supplier<GCParser<CMSType, CMSHeap>> {
@Override public GCParser<CMSType, CMSHeap> get() {
return new CMSParser();
}
}
private static final class TailManager implements FileHandler {
private final Set<File> discoveredSet;
private final ConcurrentMap<File, Tailer> tailerMap;
private final Supplier<GCParser<CMSType, CMSHeap>> parserSupplier;
private final ParseExceptionListener exceptionListener;
private final String host;
private volatile boolean go;
private TailManager(final Set<File> discoveredSet,
final ConcurrentMap<File, Tailer> tailerMap,
final Supplier<GCParser<CMSType, CMSHeap>> parserSupplier,
final ParseExceptionListener exceptionListener,
final String host) {
this.discoveredSet = discoveredSet;
this.tailerMap = tailerMap;
this.parserSupplier = parserSupplier;
this.exceptionListener = exceptionListener;
this.host = host;
}
public void stop() {
go = false;
for (Tailer tailer : tailerMap.values())
tailer.stop();
tailerMap.clear();
}
public void start() {
go = true;
}
@Override public void onNew(final File file,
final Channel channel) {
checkState(go);
GCParser<CMSType, CMSHeap> parser = parserSupplier.get();
String fileName = file.getName();
GcEventsPersister.Builder builder =
new GcEventsPersister.Builder(fileName, host);
AgentPauseListener eventListener =
new AgentPauseListener(channel, builder);
Function<Void, Void> removeTail = new Function<Void, Void>() {
@Override
public Void apply(@Nullable final Void input) {
final Tailer tailer = tailerMap.remove(file);
tailer.stop();
discoveredSet.remove(file);
return null;
}
};
GcTailAdaptor listener =
new GcTailAdaptor(logger, parser, eventListener, exceptionListener, removeTail);
tailerMap.put(file, Tailer.create(file, listener, 1000L, true));
}
@Override public void onDelete(File file, Channel channel) {
checkState(go);
final Tailer tailer = tailerMap.remove(file);
tailer.stop();
}
}
static class EfxAgentHandler extends SimpleChannelHandler {
private final ClientBootstrap bootstrap;
private final TailManager tailManager;
private final FileWatcher fileWatcher;
private final Timer timer;
public EfxAgentHandler(ClientBootstrap bootstrap,
TailManager tailManager,
FileWatcher fileWatcher,
Timer timer) {
this.bootstrap = bootstrap;
this.tailManager = tailManager;
this.fileWatcher = fileWatcher;
this.timer = timer;
}
@Override public void channelConnected(ChannelHandlerContext ctx,
ChannelStateEvent e) throws Exception {
logger.info("channel connected");
tailManager.start();
fileWatcher.start(e.getChannel());
}
@Override public void channelDisconnected(ChannelHandlerContext ctx,
ChannelStateEvent e) throws Exception {
logger.error("channel disconnected");
tailManager.stop();
fileWatcher.stop();
scheduleReconnect();
}
private void scheduleReconnect() {
timer.newTimeout(new ReconnectTask(bootstrap), 5L, TimeUnit.SECONDS);
}
@Override public void exceptionCaught(ChannelHandlerContext ctx,
ExceptionEvent e) throws Exception {
final Throwable cause = e.getCause();
logger.error(cause);
if (cause instanceof ConnectException) {
tailManager.stop();
scheduleReconnect();
}
}
}
static final class GcTailAdaptor extends TailerListenerAdapter {
private final Logger logger;
private final GCParser parser;
private final GCEventListener eventListener;
private final ParseExceptionListener exceptionListener;
private final Function<Void, Void> removeTail;
private volatile long lastTail;
GcTailAdaptor(final Logger logger,
final GCParser parser,
final GCEventListener eventListener,
final ParseExceptionListener exceptionListener,
final Function<Void, Void> removeTail) {
this.logger = logger;
this.parser = parser;
this.eventListener = eventListener;
this.exceptionListener = exceptionListener;
this.removeTail = removeTail;
}
@Override public void handle(String line) {
lastTail();
parser.matchLine(line, eventListener, exceptionListener);
}
private final void lastTail() {
final long t = System.currentTimeMillis();
if (lastTail == 0L) {
lastTail = t;
return;
}
if ((t-lastTail)>=1800000L)
removeTail.apply(null);
}
@Override public void handle(Exception ex) {
logger.error(ex);
}
}
@VisibleForTesting
final static class DetectionTask implements Runnable {
private final File directory;
private final Set<File> discovered;
private final FileHandler handler;
private volatile Channel channel;
DetectionTask(final File directory,
final Set<File> discovered,
final FileHandler handler) {
this.discovered = discovered;
this.handler = handler;
this.directory = directory;
}
public void setChannel(Channel channel) {
this.channel = channel;
}
public boolean removeStaleFile(File file) {
checkArgument(discovered.contains(file),
"file is not discovered so cannot be stale");
return discovered.remove(file);
}
public void run() {
final File[] files = directory.listFiles();
for (int i=0, n=files.length; i<n; i++) {
final File file = files[i];
synchronized (discovered) {
if (!discovered.contains(file)) {
discovered.add(file);
handler.onNew(file, channel);
}
}
}
final ImmutableSet<File> logFiles = ImmutableSet.copyOf(files);
final ImmutableSet<File> diff = Sets.difference(discovered, logFiles).immutableCopy();
for (File file : diff) {
discovered.remove(file);
handler.onDelete(file, channel);
}
}
}
@VisibleForTesting static interface FileHandler {
void onNew(File file, Channel channel);
void onDelete(File file, Channel channel);
}
@VisibleForTesting
final static class FileWatcher {
private final ScheduledExecutorService executor;
private final DetectionTask detectionTask;
private final int frequency;
private volatile ScheduledFuture<?> task;
@VisibleForTesting
FileWatcher(ScheduledExecutorService executor,
DetectionTask detectionTask,
int frequency) {
this.executor = executor;
this.detectionTask = detectionTask;
this.frequency = frequency;
}
public void start(Channel chanel) {
task = executor.scheduleAtFixedRate(detectionTask, 0L, frequency, TimeUnit.SECONDS);
detectionTask.setChannel(chanel);
}
public void stop() {
task.cancel(true);
detectionTask.setChannel(null);
}
public static FileWatcher on(File directory,
FileHandler handler,
int frequency) {
checkNotNull(directory);
checkNotNull(handler);
checkArgument(directory.isDirectory(), "file is not a directory");
checkArgument(directory.canRead(), "no read access to directory");
checkArgument(0 < frequency, "frequency must be > 0");
final HashSet<File> objects = Sets.newHashSet();
final DetectionTask task = new DetectionTask(directory, objects, handler);
final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
return new FileWatcher(executorService, task, frequency);
}
}
}
Показать код, пожалуйста. –
Отредактировано для добавления агента и кода коллекционера – algolicious