Skip to content

数据合并与拆分#

本章将介绍如何合并和拆分数据,以及在哪些情况下执行这些操作可能很有用。

数据合并#

在某些情况下,您可能需要合并(组合)并处理来自不同来源的数据。

数据合并可能涉及:

  • 从多个来源创建一个数据集
  • 在多个系统之间同步数据。这可能包括删除重复数据或在一个系统中的数据发生变化时更新另一个系统中的数据

单向同步 vs 双向同步

在单向同步中,数据仅在一个方向上同步。一个系统作为唯一真实数据源。当主系统中的信息发生变化时,它会自动在次要系统中更新;但如果次要系统中的信息发生变化,这些变更不会反映到主系统中。

在双向同步中,数据在两个方向(两个系统之间)同步。当任一系统中的信息发生变化时,它都会自动在另一个系统中更新。

这篇博客教程解释了如何在两个CRM系统之间进行单向和双向数据同步。

在n8n中,您可以使用Merge节点合并来自两个不同节点的数据,该节点提供多种合并选项:

请注意,组合 > 按字段合并需要您输入要匹配的字段。这些字段应在数据源之间包含相同的值,以便n8n能够正确匹配数据。在Merge节点中,它们被称为Input 1 FieldInput 2 Field

Merge节点中的属性输入字段
Merge节点中的属性输入字段

点表示法中的属性输入

如果您想在Merge节点参数Input 1 FieldInput 2 Field中引用嵌套值,需要以点表示法格式(作为文本,而不是表达式)输入属性键。

Note

您也可以在别名Join下找到Merge节点。如果您熟悉SQL连接操作,这个名称可能更直观。

合并数据练习#

构建一个工作流,将来自客户数据存储节点和代码节点的数据进行合并。

  1. 添加一个 Merge 节点,其中 Input 1 来自 Customer Datastore 节点Input 2 来自 Code 节点
  2. Customer Datastore 节点中,运行 Get All People 操作。
  3. Code 节点中,创建一个包含两个对象的数组,每个对象有三个属性:namelanguagecountry,其中 country 属性包含两个子属性 codename
    • 用客户数据库中两个角色的信息填充这些属性的值。
    • 例如,Jay Gatsby 的语言是英语,国家是美国。
  4. Merge 节点中,尝试不同的合并选项。
显示解决方案

这个练习的工作流如下所示:

合并数据的工作流练习
合并数据的工作流练习

如果使用 Keep Matches 选项以 name 作为匹配字段来合并数据,结果应该如下所示(注意此示例仅包含 Jay Gatsby;根据您选择的角色,您的结果可能有所不同):

使用保留匹配选项的 Merge 节点输出
使用保留匹配选项的 Merge 节点输出

要检查节点的配置,您可以复制下面的 JSON 工作流代码并粘贴到您的编辑器 UI 中:

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
{
"meta": {
	"templateCredsSetupCompleted": true,
	"instanceId": "cb484ba7b742928a2048bf8829668bed5b5ad9787579adea888f05980292a4a7"
},
"nodes": [
	{
	"parameters": {
		"mode": "combine",
		"mergeByFields": {
		"values": [
			{
			"field1": "name",
			"field2": "name"
			}
		]
		},
		"options": {}
	},
	"id": "578365f3-26dd-4fa6-9858-f0a5fdfc413b",
	"name": "Merge",
	"type": "n8n-nodes-base.merge",
	"typeVersion": 2.1,
	"position": [
		720,
		580
	]
	},
	{
	"parameters": {},
	"id": "71aa5aad-afdf-4f8a-bca0-34450eee8acc",
	"name": "When clicking \"Execute workflow\"",
	"type": "n8n-nodes-base.manualTrigger",
	"typeVersion": 1,
	"position": [
		260,
		560
	]
	},
	{
	"parameters": {
		"operation": "getAllPeople"
	},
	"id": "497174fe-3cab-4160-8103-78b44efd038d",
	"name": "Customer Datastore (n8n training)",
	"type": "n8n-nodes-base.n8nTrainingCustomerDatastore",
	"typeVersion": 1,
	"position": [
		500,
		460
	]
	},
	{
	"parameters": {
		"jsCode": "return [\n  {\n    'name': 'Jay Gatsby',\n    'language': 'English',\n    'country': {\n      'code': 'US',\n      'name': 'United States'\n    }\n    \n  }\n  \n];"
	},
	"id": "387e8a1e-e796-4f05-8e75-7ce25c786c5f",
	"name": "Code",
	"type": "n8n-nodes-base.code",
	"typeVersion": 2,
	"position": [
		500,
		720
	]
	}
],
"connections": {
	"When clicking \"Execute workflow\"": {
	"main": [
		[
		{
			"node": "Customer Datastore (n8n training)",
			"type": "main",
			"index": 0
		},
		{
			"node": "Code",
			"type": "main",
			"index": 0
		}
		]
	]
	},
	"Customer Datastore (n8n training)": {
	"main": [
		[
		{
			"node": "Merge",
			"type": "main",
			"index": 0
		}
		]
	]
	},
	"Code": {
	"main": [
		[
		{
			"node": "Merge",
			"type": "main",
			"index": 1
		}
		]
	]
	}
},
"pinData": {}
}

循环处理#

在某些情况下,您可能需要对数组中的每个元素或每条数据执行相同的操作(例如向通讯录中的每个联系人发送消息)。用技术术语来说,您需要遍历数据(使用循环)。

n8n 通常会自动处理这种重复性操作,因为节点会对每个数据项运行一次,因此您无需在工作流中构建循环。

不过,存在一些节点和操作的例外情况,这些情况需要您在工作流中构建循环。

要在 n8n 工作流中创建循环,您需要将一个节点的输出连接到前一个节点的输入,并添加一个 If 节点 来检查何时停止循环。

分批处理数据#

如果您需要处理大量传入数据、多次执行 Code 节点 或避免 API 速率限制,最好将数据分割成批次(组)来处理这些批次。

对于这些处理过程,请使用 Loop Over Items 节点。该节点将输入数据分割成指定大小的批次,并在每次迭代时返回预定数量的数据。

Loop Over Items 节点的执行

Loop Over Items 节点 在所有传入项被分割成批次并传递给工作流中的下一个节点后停止执行,因此不需要添加 If 节点 来停止循环。

循环/批量处理练习#

构建一个工作流,用于读取 Medium 和 dev.to 的 RSS 订阅源。该工作流应包含三个节点:

  1. 一个代码节点,返回 Medium (https://medium.com/feed/n8n-io) 和 dev.to (https://dev.to/feed/n8n) 的 RSS 订阅源 URL。
  2. 一个循环处理项节点,设置 Batch Size: 1,接收来自代码节点RSS读取节点的输入并遍历各项。
  3. 一个RSS读取节点,获取 Medium RSS 订阅源的 URL,通过表达式传递:{{ $json.url }}
    • RSS读取节点例外节点之一,它只处理接收到的第一项,因此需要使用循环处理项节点来遍历多个项。
显示解决方案
  1. 添加一个代码节点。可以通过多种方式格式化代码,其中一种方式是:
    • 模式设置为 Run Once for All Items
    • 语言设置为 JavaScript
    • 复制以下代码并粘贴到 JavaScript 代码编辑器中:
       1
       2
       3
       4
       5
       6
       7
       8
       9
      10
      11
      12
      13
      let urls = [
          {
              json: {
              url: 'https://medium.com/feed/n8n-io'
              }
          },
          {
          json: {
              url: 'https://dev.to/feed/n8n'
              } 
          }
      ]
      return urls;
      
  2. 添加一个循环处理项节点连接到代码节点
    • 批量大小设置为 1
  3. 循环处理项节点会自动添加一个名为 "Replace Me" 的节点。将该节点替换为RSS读取节点
    • URL设置为使用来自代码节点的 URL:{{ $json.url }}

本练习的工作流如下所示:

从两个博客获取RSS订阅源的工作流
从两个博客获取RSS订阅源的工作流

要检查节点的配置,您可以复制下面的 JSON 工作流代码并粘贴到您的编辑器 UI 中:

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
{
"meta": {
    "templateCredsSetupCompleted": true,
    "instanceId": "cb484ba7b742928a2048bf8829668bed5b5ad9787579adea888f05980292a4a7"
},
"nodes": [
    {
    "parameters": {},
    "id": "ed8dc090-ae8c-4db6-a93b-0fa873015c25",
    "name": "When clicking \"Execute workflow\"",
    "type": "n8n-nodes-base.manualTrigger",
    "typeVersion": 1,
    "position": [
        460,
        460
    ]
    },
    {
    "parameters": {
        "jsCode": "let urls = [\n  {\n    json: {\n      url: 'https://medium.com/feed/n8n-io'\n    }\n  },\n  {\n   json: {\n     url: 'https://dev.to/feed/n8n'\n   } \n  }\n]\n\nreturn urls;"
    },
    "id": "1df2a9bf-f970-4e04-b906-92dbbc9e8d3a",
    "name": "Code",
    "type": "n8n-nodes-base.code",
    "typeVersion": 2,
    "position": [
        680,
        460
    ]
    },
    {
    "parameters": {
        "options": {}
    },
    "id": "3cce249a-0eab-42e2-90e3-dbdf3684e012",
    "name": "Loop Over Items",
    "type": "n8n-nodes-base.splitInBatches",
    "typeVersion": 3,
    "position": [
        900,
        460
    ]
    },
    {
    "parameters": {
        "url": "={{ $json.url }}",
        "options": {}
    },
    "id": "50e1c1dc-9a5d-42d3-b7c0-accc31636aa6",
    "name": "RSS Read",
    "type": "n8n-nodes-base.rssFeedRead",
    "typeVersion": 1,
    "position": [
        1120,
        460
    ]
    }
],
"connections": {
    "When clicking \"Execute workflow\"": {
    "main": [
        [
        {
            "node": "Code",
            "type": "main",
            "index": 0
        }
        ]
    ]
    },
    "Code": {
    "main": [
        [
        {
            "node": "Loop Over Items",
            "type": "main",
            "index": 0
        }
        ]
    ]
    },
    "Loop Over Items": {
    "main": [
        null,
        [
        {
            "node": "RSS Read",
            "type": "main",
            "index": 0
        }
        ]
    ]
    },
    "RSS Read": {
    "main": [
        [
        {
            "node": "Loop Over Items",
            "type": "main",
            "index": 0
        }
        ]
    ]
    }
},
"pinData": {}
}